diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 7e8d7a833..2cdafc0b7 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -28,7 +28,7 @@ use frame_support::{ traits::{Currency, Get, OneSessionHandler}, }; use frame_system::{ - offchain::{AppCrypto, CreateSignedTransaction, SendSignedTransaction, Signer}, + offchain::{Account, AppCrypto, CreateSignedTransaction, SendSignedTransaction, Signer}, pallet_prelude::*, }; use itertools::Itertools; @@ -565,12 +565,14 @@ pub mod pallet { } } + #[derive(Clone)] pub struct CustomerBatch { pub(crate) batch_index: BatchIndex, pub(crate) payers: Vec<(NodePubKey, BucketId, BucketUsage)>, pub(crate) batch_proof: MMRProof, } + #[derive(Clone)] pub struct ProviderBatch { pub(crate) batch_index: BatchIndex, pub(crate) payees: Vec<(NodePubKey, NodeUsage)>, @@ -737,20 +739,20 @@ pub mod pallet { return; } - let verification_key = unwrap_or_log_error!( + let verification_account = unwrap_or_log_error!( Self::collect_verification_pub_key(), "โŒ Error collecting validator verification key" ); let signer = Signer::::any_account() - .with_filter(vec![verification_key.clone()]); + .with_filter(vec![verification_account.public.clone()]); if !signer.can_sign() { log::error!("๐Ÿšจ OCW signer is not available"); return; } - Self::store_verification_account_id(verification_key.clone().into_account()); + Self::store_verification_account_id(verification_account.public.clone().into_account()); let clusters_ids = unwrap_or_log_error!( T::ClusterManager::get_clusters(ClusterStatus::Activated), @@ -759,518 +761,92 @@ pub mod pallet { log::info!("๐ŸŽก {:?} of 'Activated' clusters found", clusters_ids.len()); for cluster_id in clusters_ids { - let batch_size = T::MAX_PAYOUT_BATCH_SIZE; let mut errors: Vec = Vec::new(); - let dac_era_result = Self::process_dac_era(&cluster_id, None, batch_size.into()); + let validation_result = + Self::start_validation_phase(&cluster_id, &verification_account, &signer); - match dac_era_result { - Ok(Some(( - era_activity, - payers_merkle_root_hash, - payees_merkle_root_hash, - payers_batch_merkle_root_hashes, - payees_batch_merkle_root_hashes, - ))) => { - log::info!( - "๐Ÿญ๐Ÿš€ Processing era_id: {:?} for cluster_id: {:?}", - era_activity.clone(), - cluster_id - ); - - let results = signer.send_signed_transaction(|_account| { - Call::set_prepare_era_for_payout { - cluster_id, - era_activity: era_activity.clone(), - payers_merkle_root_hash, - payees_merkle_root_hash, - payers_batch_merkle_root_hashes: payers_batch_merkle_root_hashes - .clone(), - payees_batch_merkle_root_hashes: payees_batch_merkle_root_hashes - .clone(), - } - }); - - for (_, res) in &results { - match res { - Ok(()) => { - log::info!( - "๐Ÿญโ›ณ๏ธ Merkle roots posted on-chain for cluster_id: {:?}, era: {:?}", - cluster_id, - era_activity.clone() - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒ Error to post merkle roots on-chain for cluster_id: {:?}, era: {:?}: {:?}", - cluster_id, - era_activity.clone(), - e - ); - // Extrinsic call failed - errors.push(OCWError::PrepareEraTransactionError { - cluster_id, - era_id: era_activity.id, - payers_merkle_root_hash, - payees_merkle_root_hash, - }); - }, - } - } - }, - Ok(None) => { - log::info!("๐Ÿญโ„น๏ธ No eras for DAC process for cluster_id: {:?}", cluster_id); - }, - Err(process_errors) => { - errors.extend(process_errors); - }, - }; - - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_billing_report(&cluster_id) { - Ok(Some((era_id, start_era, end_era))) => { - log::info!( - "๐Ÿญ๐Ÿš€ process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}, start_era: {:?}, end_era: {:?} ", - cluster_id, - era_id, - start_era, - end_era - ); - let results = signer.send_signed_transaction(|_account| { - Call::begin_billing_report { cluster_id, era_id, start_era, end_era } - }); - - for (_, res) in &results { - match res { - Ok(()) => { - log::info!( - "๐Ÿญ๐Ÿ„โ€ Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒ Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push(OCWError::BeginBillingReportTransactionError { - cluster_id, - era_id, - }); - }, - } - } - }, - Ok(None) => { - log::info!("๐ŸญโŒ No era for payout for cluster_id: {:?}", cluster_id); - }, - Err(e) => { - errors.push(e); - }, - } - - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_charging_customers(&cluster_id, batch_size.into()) { - Ok(Some((era_id, max_batch_index))) => { - log::info!( - "๐Ÿญ๐ŸŽ prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::begin_charging_customers { cluster_id, era_id, max_batch_index } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿš€ Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒ Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push(OCWError::BeginChargingCustomersTransactionError { - cluster_id, - era_id, - }); - }, - } - } else { - log::error!("๐ŸญโŒ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::error!( - "๐Ÿญ๐Ÿฆ€ No era for begin_charging_customers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => errors.extend(e), + if let Err(errs) = validation_result { + errors.extend(errs); } - // todo! factor out as macro as this is repetitive - match Self::prepare_send_charging_customers_batch(&cluster_id, batch_size.into()) { - Ok(Some((era_id, batch_payout))) => { - log::info!( - "๐ŸŽ prepare_send_charging_customers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); + let payouts_result = + Self::start_payouts_phase(&cluster_id, &verification_account, &signer); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::send_charging_customers_batch { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - payers: batch_payout.payers.clone(), - batch_proof: batch_payout.batch_proof.clone(), - } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿš€ Sent send_charging_customers_batch successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒ Error to post send_charging_customers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push( - OCWError::SendChargingCustomersBatchTransactionError { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - }, - ); - }, - } - } else { - log::error!("๐ŸญโŒ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "๐Ÿญ๐Ÿฆ€ No era for send_charging_customers_batch for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.extend(e); - }, + if let Err(errs) = payouts_result { + errors.extend(errs); } - // todo! factor out as macro as this is repetitive - match Self::prepare_end_charging_customers(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( - "๐Ÿญ๐Ÿ“prepare_end_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_charging_customers { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿ“Sent end_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒError to post end_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push(OCWError::EndChargingCustomersTransactionError { - cluster_id, - era_id, - }); - }, - } - } else { - log::error!("๐ŸญโŒNo account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "๐Ÿญ๐Ÿ“No era for end_charging_customers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.push(e); - }, - } - - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_rewarding_providers(&cluster_id, batch_size.into()) { - Ok(Some((era_id, max_batch_index, total_node_usage))) => { - log::info!( - "๐Ÿญ๐Ÿ“prepare_begin_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); + Self::submit_errors(&errors, &verification_account, &signer); + } - if let Some((_, res)) = - signer.send_signed_transaction(|_acc| Call::begin_rewarding_providers { - cluster_id, - era_id, - max_batch_index, - total_node_usage: total_node_usage.clone(), - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿ“Sent begin_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒError to post begin_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push( - OCWError::BeginRewardingProvidersTransactionError { - cluster_id, - era_id, - }, - ); - }, - } - } else { - log::error!("๐ŸญโŒNo account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "๐Ÿญ๐Ÿ“No era for begin_rewarding_providers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.extend(e); - }, - } + // Allow the next invocation of the offchain worker hook to run. + local_storage_clear(StorageKind::PERSISTENT, IS_RUNNING_KEY); + } + } - // todo! factor out as macro as this is repetitive - match Self::prepare_send_rewarding_providers_batch(&cluster_id, batch_size.into()) { - Ok(Some((era_id, batch_payout))) => { - log::info!( - "๐ŸŽ prepare_send_rewarding_providers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); + macro_rules! define_payout_step_function { + ( + $func_name:ident, + $prepare_fn:ident, + $call_variant:expr, + $log_prefix:literal, + $error_variant:expr, + $extract_prepare_data:expr + ) => { + #[allow(clippy::redundant_closure_call)] + pub(crate) fn $func_name( + cluster_id: &ClusterId, + account: &Account, + signer: &Signer, + ) -> Result<(), Vec> { + match Self::$prepare_fn(&cluster_id) { + Ok(Some(prepared_data)) => { + let log_data = $extract_prepare_data(&prepared_data); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::send_rewarding_providers_batch { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - payees: batch_payout.payees.clone(), - batch_proof: batch_payout.batch_proof.clone(), - } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿš€ Sent send_rewarding_providers_batch successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - }, - Err(e) => { - log::error!( - "๐Ÿฆ€ Error to post send_rewarding_providers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push( - OCWError::SendRewardingProvidersBatchTransactionError { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - }, - ); - }, - } - } else { - log::error!("๐Ÿฆ€ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { log::info!( - "๐Ÿฆ€ No era for send_rewarding_providers_batch for cluster_id: {:?}", - cluster_id + concat!($log_prefix, " Initializing '{}' call for cluster_id: {:?}, era_id: {:?}"), + stringify!($func_name), + cluster_id, + log_data, ); - }, - Err(e) => { - errors.extend(e); - }, - } - // todo! factor out as macro as this is repetitive - match Self::prepare_end_rewarding_providers(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( - "๐Ÿญ๐Ÿ“prepare_end_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); + let call = $call_variant(cluster_id, prepared_data.clone()); + let result = signer.send_single_signed_transaction(account, call); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_rewarding_providers { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿ“Sent end_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", + match result { + Some(Ok(_)) => { + log::info!( + concat!($log_prefix, " Successfully sent '{}' call for cluster_id: {:?}, era_id: {:?}"), + stringify!($func_name), cluster_id, - era_id + log_data, ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒError to post end_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push(OCWError::EndRewardingProvidersTransactionError { - cluster_id, - era_id, - }); - }, + Ok(()) } - } else { - log::error!("๐ŸญโŒNo account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "๐Ÿญ๐Ÿ“No era for end_rewarding_providers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.push(e); - }, - } - - // todo! factor out as macro as this is repetitive - match Self::prepare_end_billing_report(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( - "๐Ÿญ๐Ÿ“prepare_end_billing_report processed successfully for cluster_id: {:?}, era_id: {:?}", - cluster_id, - era_id - ); - - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_billing_report { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( - "๐Ÿญ๐Ÿ“Sent end_billing_report successfully for cluster_id: {:?}, era_id: {:?}", + _ => { + log::error!( + concat!($log_prefix, " Failed to send '{}' call for cluster_id: {:?}, era_id: {:?}"), + stringify!($func_name), cluster_id, - era_id + log_data, ); - }, - Err(e) => { - log::error!( - "๐ŸญโŒError to post end_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", - cluster_id, - era_id, - e - ); - // Extrinsic call failed - errors.push(OCWError::EndBillingReportTransactionError { - cluster_id, - era_id, - }); - }, + Err(vec![$error_variant(cluster_id, prepared_data)]) } - } else { - log::error!("๐ŸญโŒNo account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); } - }, + } Ok(None) => { log::info!( - "๐Ÿญ๐Ÿ“No era for end_billing_report for cluster_id: {:?}", - cluster_id + concat!($log_prefix, " Skipping '{}' call as there is no era for payout for cluster_id: {:?}"), + stringify!($func_name), + cluster_id, ); - }, - Err(e) => { - errors.push(e); - }, - } - - if !errors.is_empty() { - let results = signer.send_signed_transaction(|_account| { - Call::emit_consensus_errors { errors: errors.clone() } - }); - - for (_, res) in &results { - match res { - Ok(()) => { - log::info!("โœ… Successfully submitted emit_consensus_errors tx") - }, - Err(_) => log::error!("๐ŸญโŒ Failed to submit emit_consensus_errors tx"), - } + Ok(()) } + Err(errs) => Err(errs), } } - - // Allow the next invocation of the offchain worker hook to run. - local_storage_clear(StorageKind::PERSISTENT, IS_RUNNING_KEY); - } + }; } impl Pallet { @@ -1305,15 +881,14 @@ pub mod pallet { pub(crate) fn process_dac_era( cluster_id: &ClusterId, era_id_to_process: Option, - batch_size: usize, ) -> Result< Option<(EraActivity, ActivityHash, ActivityHash, Vec, Vec)>, Vec, > { - log::info!("๐Ÿš€ Processing dac data for cluster_id: {:?}", cluster_id); + let batch_size = T::MAX_PAYOUT_BATCH_SIZE; let dac_nodes = Self::get_dac_nodes(cluster_id).map_err(|_| { - log::error!("๐ŸญโŒ Error retrieving dac nodes to validate cluster {:?}", cluster_id); + log::error!("โŒ Error retrieving dac nodes to validate cluster {:?}", cluster_id); vec![OCWError::FailedToFetchDacNodes] })?; @@ -1331,6 +906,12 @@ pub mod pallet { } }; + log::info!( + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ Start processing DAC for cluster_id: {:?} era_id; {:?}", + cluster_id, + era_activity.id + ); + // todo: move to cluster protocol parameters let dac_redundancy_factor = T::DAC_REDUNDANCY_FACTOR; let aggregators_quorum = T::AggregatorsQuorum::get(); @@ -1365,7 +946,7 @@ pub mod pallet { customer_activity_hashes.clone().into_iter().map(hex::encode).collect(); log::info!( - "๐Ÿง—โ€ Customer Activity hashes for ClusterId: {:?} EraId: {:?} is: {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ Customer Activity hashes for cluster_id: {:?} era_id: {:?} is: {:?}", cluster_id, era_activity.id, customer_activity_hashes_string @@ -1373,7 +954,7 @@ pub mod pallet { let customers_activity_batch_roots = Self::convert_to_batch_merkle_roots( cluster_id, era_activity.id, - Self::split_to_batches(&total_buckets_usage, batch_size), + Self::split_to_batches(&total_buckets_usage, batch_size.into()), ) .map_err(|err| vec![err])?; @@ -1382,7 +963,7 @@ pub mod pallet { for (pos, batch_root) in customer_batch_roots_string.iter().enumerate() { log::info!( - "๐Ÿง—โ€ Customer Activity batches for ClusterId: {:?} EraId: {:?} is: batch {:?} with root {:?} for activities {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธโ€ Customer Activity batches for cluster_id: {:?} era_id: {:?} is: batch {:?} with root {:?} for activities {:?}", cluster_id, era_activity.id, pos + 1, @@ -1399,7 +980,7 @@ pub mod pallet { .map_err(|err| vec![err])?; log::info!( - "๐Ÿง—โ€ Customer Activity batches tree for ClusterId: {:?} EraId: {:?} is: batch with root {:?} for activities {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธโ€ Customer Activity batches tree for cluster_id: {:?} era_id: {:?} is: batch with root {:?} for activities {:?}", cluster_id, era_activity.id, hex::encode(customers_activity_root), @@ -1424,7 +1005,7 @@ pub mod pallet { node_activity_hashes.clone().into_iter().map(hex::encode).collect(); log::info!( - "๐Ÿง—โ€ Node Activity hashes for ClusterId: {:?} EraId: {:?} is: {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ Node Activity hashes for cluster_id: {:?} era_id: {:?} is: {:?}", cluster_id, era_activity.id, node_activity_hashes_string @@ -1433,7 +1014,7 @@ pub mod pallet { let nodes_activity_batch_roots = Self::convert_to_batch_merkle_roots( cluster_id, era_activity.id, - Self::split_to_batches(&total_nodes_usage, batch_size), + Self::split_to_batches(&total_nodes_usage, batch_size.into()), ) .map_err(|err| vec![err])?; @@ -1442,7 +1023,7 @@ pub mod pallet { for (pos, batch_root) in nodes_activity_batch_roots_string.iter().enumerate() { log::info!( - "๐Ÿง—โ€ Node Activity batches for ClusterId: {:?} EraId: {:?} is: batch {:?} with root {:?} for activities {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ Node Activity batches for cluster_id: {:?} era_id: {:?} are: batch {:?} with root {:?} for activities {:?}", cluster_id, era_activity.id, pos + 1, @@ -1456,7 +1037,7 @@ pub mod pallet { .map_err(|err| vec![err])?; log::info!( - "๐Ÿง—โ€ Node Activity batches tree for ClusterId: {:?} EraId: {:?} is: batch with root {:?} for activities {:?}", + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ Node Activity batches tree for cluster_id: {:?} era_id: {:?} are: batch with root {:?} for activities {:?}", cluster_id, era_activity.id, hex::encode(nodes_activity_root), @@ -1473,7 +1054,11 @@ pub mod pallet { nodes_activity_root, &nodes_activity_batch_roots, ); - log::info!("๐Ÿ™‡โ€ Dac data processing completed for cluster_id: {:?}", cluster_id); + log::info!( + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธโ€ End processing DAC for cluster_id: {:?} era_id: {:?}", + cluster_id, + era_activity.id + ); Ok(Some(( era_activity, customers_activity_root, @@ -1483,6 +1068,262 @@ pub mod pallet { ))) } + pub(crate) fn start_validation_phase( + cluster_id: &ClusterId, + verification_account: &Account, + signer: &Signer, + ) -> Result<(), Vec> { + let validation_output = Self::process_dac_era(cluster_id, None)?; + + match validation_output { + Some(( + era_activity, + payers_merkle_root_hash, + payees_merkle_root_hash, + payers_batch_merkle_root_hashes, + payees_batch_merkle_root_hashes, + )) => { + let call = Call::set_prepare_era_for_payout { + cluster_id: *cluster_id, + era_activity: era_activity.clone(), + payers_merkle_root_hash, + payees_merkle_root_hash, + payers_batch_merkle_root_hashes: payers_batch_merkle_root_hashes.clone(), + payees_batch_merkle_root_hashes: payees_batch_merkle_root_hashes.clone(), + }; + + let result = signer.send_single_signed_transaction(verification_account, call); + + match result { + Some(Ok(_)) => { + log::info!( + "๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ DAC Validation merkle roots posted on-chain for cluster_id: {:?}, era: {:?}", + cluster_id, + era_activity.clone() + ); + Ok(()) + }, + _ => Err(vec![OCWError::PrepareEraTransactionError { + cluster_id: *cluster_id, + era_id: era_activity.id, + payers_merkle_root_hash, + payees_merkle_root_hash, + }]), + } + }, + None => { + log::info!("๐Ÿ‘๏ธโ€๐Ÿ—จ๏ธ No eras for DAC processing for cluster_id: {:?}", cluster_id); + Ok(()) + }, + } + } + + pub(crate) fn start_payouts_phase( + cluster_id: &ClusterId, + account: &Account, + signer: &Signer, + ) -> Result<(), Vec> { + let mut errors: Vec = Vec::new(); + + if let Err(errs) = Self::step_begin_billing_report(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_begin_charging_customers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_send_charging_customers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_end_charging_customers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_begin_rewarding_providers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_send_rewarding_providers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_end_rewarding_providers(cluster_id, account, signer) { + errors.extend(errs); + } + + if let Err(errs) = Self::step_end_billing_report(cluster_id, account, signer) { + errors.extend(errs); + } + + Self::submit_errors(&errors, account, signer); + + Ok(()) + } + + define_payout_step_function!( + step_begin_billing_report, + prepare_begin_billing_report, + |cluster_id: &ClusterId, (era_id, start_era, end_era)| Call::begin_billing_report { + cluster_id: *cluster_id, + era_id, + start_era, + end_era, + }, + "๐Ÿ—“๏ธ", + |cluster_id: &ClusterId, (era_id, _, _)| OCWError::BeginBillingReportTransactionError { + cluster_id: *cluster_id, + era_id, + }, + |prepared_data: &(DdcEra, _, _)| prepared_data.0 + ); + + define_payout_step_function!( + step_begin_charging_customers, + prepare_begin_charging_customers, + |cluster_id: &ClusterId, (era_id, max_batch_index)| Call::begin_charging_customers { + cluster_id: *cluster_id, + era_id, + max_batch_index, + }, + "๐Ÿ“ฅ", + |cluster_id: &ClusterId, (era_id, _)| { + OCWError::BeginChargingCustomersTransactionError { cluster_id: *cluster_id, era_id } + }, + |prepared_data: &(DdcEra, _)| prepared_data.0 + ); + + define_payout_step_function!( + step_send_charging_customers, + prepare_send_charging_customers_batch, + |cluster_id: &ClusterId, (era_id, batch_payout): (DdcEra, CustomerBatch)| { + Call::send_charging_customers_batch { + cluster_id: *cluster_id, + era_id, + batch_index: batch_payout.batch_index, + payers: batch_payout.payers.clone(), + batch_proof: batch_payout.batch_proof.clone(), + } + }, + "๐Ÿงพ", + |cluster_id: &ClusterId, (era_id, batch_payout): (DdcEra, CustomerBatch)| { + OCWError::SendChargingCustomersBatchTransactionError { + cluster_id: *cluster_id, + era_id, + batch_index: batch_payout.batch_index, + } + }, + |prepared_data: &(DdcEra, _)| prepared_data.0 + ); + + define_payout_step_function!( + step_end_charging_customers, + prepare_end_charging_customers, + |cluster_id: &ClusterId, era_id| Call::end_charging_customers { + cluster_id: *cluster_id, + era_id + }, + "๐Ÿ“ช", + |cluster_id: &ClusterId, era_id| OCWError::EndChargingCustomersTransactionError { + cluster_id: *cluster_id, + era_id, + }, + |prepared_data: &DdcEra| *prepared_data + ); + + define_payout_step_function!( + step_begin_rewarding_providers, + prepare_begin_rewarding_providers, + |cluster_id: &ClusterId, + (era_id, max_batch_index, total_node_usage): (DdcEra, u16, NodeUsage)| { + Call::begin_rewarding_providers { + cluster_id: *cluster_id, + era_id, + max_batch_index, + total_node_usage: total_node_usage.clone(), + } + }, + "๐Ÿ“ค", + |cluster_id: &ClusterId, (era_id, _, _)| { + OCWError::BeginRewardingProvidersTransactionError { + cluster_id: *cluster_id, + era_id, + } + }, + |prepared_data: &(DdcEra, _, _)| prepared_data.0 + ); + + define_payout_step_function!( + step_send_rewarding_providers, + prepare_send_rewarding_providers_batch, + |cluster_id: &ClusterId, (era_id, batch_payout): (DdcEra, ProviderBatch)| { + Call::send_rewarding_providers_batch { + cluster_id: *cluster_id, + era_id, + batch_index: batch_payout.batch_index, + payees: batch_payout.payees.clone(), + batch_proof: batch_payout.batch_proof.clone(), + } + }, + "๐Ÿ’ธ", + |cluster_id: &ClusterId, (era_id, batch_payout): (DdcEra, ProviderBatch)| { + OCWError::SendRewardingProvidersBatchTransactionError { + cluster_id: *cluster_id, + era_id, + batch_index: batch_payout.batch_index, + } + }, + |prepared_data: &(DdcEra, _)| prepared_data.0 + ); + + define_payout_step_function!( + step_end_rewarding_providers, + prepare_end_rewarding_providers, + |cluster_id: &ClusterId, era_id| Call::end_rewarding_providers { + cluster_id: *cluster_id, + era_id + }, + "๐Ÿ“ญ", + |cluster_id: &ClusterId, era_id| OCWError::EndRewardingProvidersTransactionError { + cluster_id: *cluster_id, + era_id, + }, + |prepared_data: &DdcEra| *prepared_data + ); + + define_payout_step_function!( + step_end_billing_report, + prepare_end_billing_report, + |cluster_id: &ClusterId, era_id| Call::end_billing_report { + cluster_id: *cluster_id, + era_id + }, + "๐Ÿงฎ", + |cluster_id: &ClusterId, era_id| OCWError::EndBillingReportTransactionError { + cluster_id: *cluster_id, + era_id, + }, + |prepared_data: &DdcEra| *prepared_data + ); + + pub(crate) fn submit_errors( + errors: &Vec, + verification_account: &Account, + signer: &Signer, + ) { + if !errors.is_empty() { + let call = Call::emit_consensus_errors { errors: errors.to_owned() }; + let result = signer.send_single_signed_transaction(verification_account, call); + + if let Some(Ok(_)) = result { + log::info!("โŒ Successfully sent 'emit_consensus_errors' call"); + } else { + log::error!("โŒ Failed to send 'emit_consensus_errors' call"); + }; + } + } + pub(crate) fn get_total_usage( cluster_id: &ClusterId, era_id: DdcEra, @@ -1927,13 +1768,13 @@ pub mod pallet { #[allow(dead_code)] pub(crate) fn prepare_begin_billing_report( cluster_id: &ClusterId, - ) -> Result, OCWError> { + ) -> Result, Vec> { Ok(Self::get_era_for_payout(cluster_id, EraValidationStatus::ReadyForPayout)) + .map_err(|e| vec![e]) } pub(crate) fn prepare_begin_charging_customers( cluster_id: &ClusterId, - batch_size: usize, ) -> Result, Vec> { if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) @@ -1955,7 +1796,7 @@ pub mod pallet { } else { let era_activity = EraActivity { id: era_id, start, end }; - let _ = Self::process_dac_era(cluster_id, Some(era_activity), batch_size)?; + let _ = Self::process_dac_era(cluster_id, Some(era_activity))?; if let Some((_, _, customers_activity_batch_roots, _, _, _)) = Self::fetch_validation_activities::< @@ -1999,8 +1840,9 @@ pub mod pallet { pub(crate) fn prepare_send_charging_customers_batch( cluster_id: &ClusterId, - batch_size: usize, ) -> Result, Vec> { + let batch_size = T::MAX_PAYOUT_BATCH_SIZE; + if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { @@ -2021,7 +1863,7 @@ pub mod pallet { { Self::fetch_charging_activities( cluster_id, - batch_size, + batch_size.into(), era_id, customers_total_activity, customers_activity_batch_roots, @@ -2029,7 +1871,7 @@ pub mod pallet { } else { let era_activity = EraActivity { id: era_id, start, end }; - let _ = Self::process_dac_era(cluster_id, Some(era_activity), batch_size)?; + let _ = Self::process_dac_era(cluster_id, Some(era_activity))?; if let Some(( customers_total_activity, @@ -2045,7 +1887,7 @@ pub mod pallet { { Self::fetch_charging_activities( cluster_id, - batch_size, + batch_size.into(), era_id, customers_total_activity, customers_activity_batch_roots, @@ -2142,7 +1984,7 @@ pub mod pallet { pub(crate) fn prepare_end_charging_customers( cluster_id: &ClusterId, - ) -> Result, OCWError> { + ) -> Result, Vec> { if let Some((era_id, _start, _end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { @@ -2158,7 +2000,6 @@ pub mod pallet { pub(crate) fn prepare_begin_rewarding_providers( cluster_id: &ClusterId, - batch_size: usize, ) -> Result, Vec> { if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) @@ -2189,7 +2030,7 @@ pub mod pallet { } else { let era_activity = EraActivity { id: era_id, start, end }; - let _ = Self::process_dac_era(cluster_id, Some(era_activity), batch_size)?; + let _ = Self::process_dac_era(cluster_id, Some(era_activity))?; if let Some(( _, @@ -2258,8 +2099,9 @@ pub mod pallet { pub(crate) fn prepare_send_rewarding_providers_batch( cluster_id: &ClusterId, - batch_size: usize, ) -> Result, Vec> { + let batch_size = T::MAX_PAYOUT_BATCH_SIZE; + if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { @@ -2274,7 +2116,7 @@ pub mod pallet { { Self::fetch_reward_provider_batch( cluster_id, - batch_size, + batch_size.into(), era_id, nodes_total_activity, nodes_activity_batch_roots, @@ -2282,7 +2124,7 @@ pub mod pallet { } else { let era_activity = EraActivity { id: era_id, start, end }; - let _ = Self::process_dac_era(cluster_id, Some(era_activity), batch_size)?; + let _ = Self::process_dac_era(cluster_id, Some(era_activity))?; if let Some(( _, @@ -2298,7 +2140,7 @@ pub mod pallet { { Self::fetch_reward_provider_batch( cluster_id, - batch_size, + batch_size.into(), era_id, nodes_total_activity, nodes_activity_batch_roots, @@ -2395,7 +2237,7 @@ pub mod pallet { pub(crate) fn prepare_end_rewarding_providers( cluster_id: &ClusterId, - ) -> Result, OCWError> { + ) -> Result, Vec> { if let Some((era_id, _start, _end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { @@ -2411,7 +2253,7 @@ pub mod pallet { pub(crate) fn prepare_end_billing_report( cluster_id: &ClusterId, - ) -> Result, OCWError> { + ) -> Result, Vec> { if let Some((era_id, _start, _end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { @@ -2428,21 +2270,24 @@ pub mod pallet { format!("offchain::activities::{:?}::{:?}", cluster_id, era_id).into_bytes() } - pub(crate) fn collect_verification_pub_key() -> Result { + pub(crate) fn collect_verification_pub_key() -> Result, OCWError> { let session_verification_keys = >::RuntimeAppPublic::all() .into_iter() - .filter_map(|key| { + .enumerate() + .filter_map(|(i, key)| { let generic_public = >::GenericPublic::from(key); let public_key: T::Public = generic_public.into(); let account_id = public_key.clone().into_account(); + if >::get().contains(&account_id) { - Option::Some(public_key) + let account = Account::new(i, account_id, public_key); + Option::Some(account) } else { Option::None } @@ -2559,7 +2404,7 @@ pub mod pallet { Vec, )> { log::info!( - "๐Ÿ  Off-chain validation_activities cache hit for ClusterId: {:?} EraId: {:?}", + "๐Ÿ  Off-chain validation_activities cache hit for cluster_id: {:?} era_id: {:?}", cluster_id, era_id );