diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 76356b15e..cf5efbc8c 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -591,14 +591,12 @@ pub mod pallet { let cluster_id = unwrap_or_log_error!( Self::get_cluster_to_validate(), - "🏭❌ Error retrieving cluster to validate" /* todo! consistent emojis in logs - * with 2 icons, one for OCW 🏭, - * other is logπŸ“‹/error❌ */ + "🏭❌ Error retrieving cluster to validate" ); let dac_nodes = unwrap_or_log_error!( Self::get_dac_nodes(&cluster_id), - "πŸ¦€πŸ¦€πŸ¦€πŸ¦€ Error retrieving dac nodes to validate πŸ¦€πŸ¦€πŸ¦€πŸ¦€" + "🏭❌ Error retrieving dac nodes to validate" ); let min_nodes = T::MIN_DAC_NODES_FOR_CONSENSUS; @@ -611,7 +609,7 @@ pub mod pallet { match processed_dac_data { Ok(Some((era_activity, payers_merkle_root_hash, payees_merkle_root_hash))) => { log::info!( - "πŸš€ Processing era_id: {:?} for cluster_id: {:?}", + "πŸ­πŸš€ Processing era_id: {:?} for cluster_id: {:?}", era_activity.clone(), cluster_id ); @@ -629,14 +627,14 @@ pub mod pallet { match res { Ok(()) => { log::info!( - "⛳️ Merkle roots posted on-chain for cluster_id: {:?}, era: {:?}", + "πŸ­β›³οΈ Merkle roots posted on-chain for cluster_id: {:?}, era: {:?}", cluster_id, era_activity.clone() ); }, Err(e) => { log::error!( - "πŸ¦€ Error to post merkle roots on-chain for cluster_id: {:?}, era: {:?}: {:?}", + "🏭❌ Error to post merkle roots on-chain for cluster_id: {:?}, era: {:?}: {:?}", cluster_id, era_activity.clone(), e @@ -653,7 +651,7 @@ pub mod pallet { } }, Ok(None) => { - log::info!("ℹ️ No eras for DAC process for cluster_id: {:?}", cluster_id); + log::info!("πŸ­β„ΉοΈ No eras for DAC process for cluster_id: {:?}", cluster_id); }, Err(process_errors) => { errors.extend(process_errors); @@ -664,7 +662,7 @@ pub mod pallet { match Self::prepare_begin_billing_report(&cluster_id) { Ok(Some((era_id, start_era, end_era))) => { log::info!( - "πŸš€ process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}, start_era: {:?}, end_era: {:?} ", + "πŸ­πŸš€ process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}, start_era: {:?}, end_era: {:?} ", cluster_id, era_id, start_era, @@ -678,14 +676,14 @@ pub mod pallet { match res { Ok(()) => { log::info!( - "πŸ„β€ Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ„β€ Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "πŸ¦€ Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌ Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -700,7 +698,7 @@ pub mod pallet { } }, Ok(None) => { - log::info!("πŸ¦€ No era for payout for cluster_id: {:?}", cluster_id); + log::info!("🏭❌ No era for payout for cluster_id: {:?}", cluster_id); }, Err(e) => { errors.push(e); @@ -716,7 +714,7 @@ pub mod pallet { ) { Ok(Some((era_id, max_batch_index))) => { log::info!( - "🎁 prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", + "🏭🎁 prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -728,14 +726,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "πŸš€ Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸš€ Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "πŸ¦€ Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌ Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -748,13 +746,13 @@ pub mod pallet { }, } } else { - log::error!("πŸ¦€ No account available to sign the transaction"); + log::error!("🏭❌ No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { - log::info!( - "πŸ¦€ No era for begin_charging_customers for cluster_id: {:?}", + log::error!( + "πŸ­πŸ¦€ No era for begin_charging_customers for cluster_id: {:?}", cluster_id ); }, @@ -762,10 +760,15 @@ pub mod pallet { } // todo! factor out as macro as this is repetitive - match Self::prepare_send_charging_customers_batch(&cluster_id, batch_size.into()) { + match Self::prepare_send_charging_customers_batch( + &cluster_id, + batch_size.into(), + &dac_nodes, + min_nodes, + ) { Ok(Some((era_id, batch_payout))) => { log::info!( - "🎁 prepare_send_charging_customers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", + "🏭🎁 prepare_send_charging_customers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -782,14 +785,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "πŸš€ Sent send_charging_customers_batch successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸš€ Sent send_charging_customers_batch successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "πŸ¦€ Error to post send_charging_customers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌ Error to post send_charging_customers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -803,18 +806,18 @@ pub mod pallet { }, } } else { - log::error!("πŸ¦€ No account available to sign the transaction"); + log::error!("🏭❌ No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { log::info!( - "πŸ¦€ No era for send_charging_customers_batch for cluster_id: {:?}", + "πŸ­πŸ¦€ No era for send_charging_customers_batch for cluster_id: {:?}", cluster_id ); }, Err(e) => { - errors.push(e); + errors.extend(e); }, } @@ -822,7 +825,7 @@ pub mod pallet { match Self::prepare_end_charging_customers(&cluster_id) { Ok(Some(era_id)) => { log::info!( - "prepare_end_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“prepare_end_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -834,14 +837,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "Sent end_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“Sent end_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "Error to post end_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌Error to post end_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -854,13 +857,13 @@ pub mod pallet { }, } } else { - log::error!("No account available to sign the transaction"); + log::error!("🏭❌No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { log::info!( - "No era for end_charging_customers for cluster_id: {:?}", + "πŸ­πŸ“No era for end_charging_customers for cluster_id: {:?}", cluster_id ); }, @@ -870,10 +873,15 @@ pub mod pallet { } // todo! factor out as macro as this is repetitive - match Self::prepare_begin_rewarding_providers(&cluster_id) { + match Self::prepare_begin_rewarding_providers( + &cluster_id, + &dac_nodes, + min_nodes, + batch_size.into(), + ) { Ok(Some((era_id, max_batch_index, total_node_usage))) => { log::info!( - "prepare_begin_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“prepare_begin_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -889,14 +897,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "Sent begin_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“Sent begin_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "Error to post begin_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌Error to post begin_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -909,23 +917,28 @@ pub mod pallet { }, } } else { - log::error!("No account available to sign the transaction"); + log::error!("🏭❌No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { log::info!( - "No era for begin_rewarding_providers for cluster_id: {:?}", + "πŸ­πŸ“No era for begin_rewarding_providers for cluster_id: {:?}", cluster_id ); }, Err(e) => { - errors.push(e); + errors.extend(e); }, } // todo! factor out as macro as this is repetitive - match Self::prepare_send_rewarding_providers_batch(&cluster_id, batch_size.into()) { + match Self::prepare_send_rewarding_providers_batch( + &cluster_id, + batch_size.into(), + &dac_nodes, + min_nodes, + ) { Ok(Some((era_id, batch_payout))) => { log::info!( "🎁 prepare_send_rewarding_providers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", @@ -980,7 +993,7 @@ pub mod pallet { ); }, Err(e) => { - errors.push(e); + errors.extend(e); }, } @@ -988,7 +1001,7 @@ pub mod pallet { match Self::prepare_end_rewarding_providers(&cluster_id) { Ok(Some(era_id)) => { log::info!( - "prepare_end_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“prepare_end_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -1000,14 +1013,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "Sent end_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“Sent end_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "Error to post end_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌Error to post end_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -1020,13 +1033,13 @@ pub mod pallet { }, } } else { - log::error!("No account available to sign the transaction"); + log::error!("🏭❌No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { log::info!( - "No era for end_rewarding_providers for cluster_id: {:?}", + "πŸ­πŸ“No era for end_rewarding_providers for cluster_id: {:?}", cluster_id ); }, @@ -1039,7 +1052,7 @@ pub mod pallet { match Self::prepare_end_billing_report(&cluster_id) { Ok(Some(era_id)) => { log::info!( - "prepare_end_billing_report processed successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“prepare_end_billing_report processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); @@ -1051,14 +1064,14 @@ pub mod pallet { Ok(_) => { // Extrinsic call succeeded log::info!( - "Sent end_billing_report successfully for cluster_id: {:?}, era_id: {:?}", + "πŸ­πŸ“Sent end_billing_report successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); }, Err(e) => { log::error!( - "Error to post end_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", + "🏭❌Error to post end_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e @@ -1071,12 +1084,15 @@ pub mod pallet { }, } } else { - log::error!("No account available to sign the transaction"); + log::error!("🏭❌No account available to sign the transaction"); errors.push(OCWError::NoAvailableSigner); } }, Ok(None) => { - log::info!("No era for end_billing_report for cluster_id: {:?}", cluster_id); + log::info!( + "πŸ­πŸ“No era for end_billing_report for cluster_id: {:?}", + cluster_id + ); }, Err(e) => { errors.push(e); @@ -1091,7 +1107,7 @@ pub mod pallet { for (_, res) in &results { match res { Ok(()) => log::info!("βœ… Successfully submitted emit_consensus_errors tx"), - Err(_) => log::error!("πŸ¦€ Failed to submit emit_consensus_errors tx"), + Err(_) => log::error!("🏭❌ Failed to submit emit_consensus_errors tx"), } } } @@ -1218,7 +1234,6 @@ pub mod pallet { era_id, customers_activity_batch_roots, ) - .map_err(|err| vec![err])?; } else { let era_activity = EraActivity { id: era_id, start, end }; @@ -1239,36 +1254,42 @@ pub mod pallet { era_id, customers_activity_batch_roots, ) - .map_err(|err| vec![err])?; + } else { + Ok(None) } } + } else { + Ok(None) } + } else { + Ok(None) } - Ok(None) } pub(crate) fn fetch_customer_activity( cluster_id: &ClusterId, era_id: DdcEra, customers_activity_batch_roots: Vec, - ) -> Result, OCWError> { + ) -> Result, Vec> { if let Some(max_batch_index) = customers_activity_batch_roots.len().checked_sub(1) // -1 cause payout expects max_index, not length { let max_batch_index: u16 = max_batch_index.try_into().map_err(|_| { - OCWError::BatchIndexConversionFailed { cluster_id: *cluster_id, era_id } + vec![OCWError::BatchIndexConversionFailed { cluster_id: *cluster_id, era_id }] })?; Ok(Some((era_id, max_batch_index))) } else { - Err(OCWError::EmptyCustomerActivity { cluster_id: *cluster_id, era_id }) + Err(vec![OCWError::EmptyCustomerActivity { cluster_id: *cluster_id, era_id }]) } } pub(crate) fn prepare_send_charging_customers_batch( cluster_id: &ClusterId, batch_size: usize, - ) -> Result)>, OCWError> { - if let Some((era_id, _start, _end)) = + dac_nodes: &[(NodePubKey, StorageNodeParams)], + min_nodes: u16, + ) -> Result)>, Vec> { + if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { if T::PayoutVisitor::get_billing_report_status(cluster_id, era_id) == @@ -1284,90 +1305,131 @@ pub mod pallet { )) = Self::fetch_validation_activities::( cluster_id, era_id, ) { - let batch_index = T::PayoutVisitor::get_next_customer_batch_for_payment( - cluster_id, era_id, - ) - .map_err(|_| OCWError::BillingReportDoesNotExist { - cluster_id: *cluster_id, + Self::fetch_charging_activities( + cluster_id, + batch_size, era_id, - })?; + customers_activity_in_consensus, + customers_activity_batch_roots, + ) + } else { + let era_activity = EraActivity { id: era_id, start, end }; + + let _ = Self::process_dac_data( + cluster_id, + Some(era_activity), + dac_nodes, + min_nodes, + batch_size, + )?; - if let Some(index) = batch_index { - let i: usize = index.into(); - // todo! store batched activity to avoid splitting it again each time - let customers_activity_batched = Self::split_to_batches( - &customers_activity_in_consensus, + if let Some(( + customers_activity_in_consensus, + _, + customers_activity_batch_roots, + _, + _, + _, + )) = Self::fetch_validation_activities::( + cluster_id, era_id, + ) { + Self::fetch_charging_activities( + cluster_id, batch_size, - ); - - let batch_root = customers_activity_batch_roots[i]; - let store = MemStore::default(); - let mut mmr: MMR< - ActivityHash, - MergeActivityHash, - &MemStore, - > = MemMMR::<_, MergeActivityHash>::new(0, &store); - - let leaf_position_map: Vec<(ActivityHash, u64)> = - customers_activity_batch_roots - .iter() - .map(|a| (*a, mmr.push(*a).unwrap())) - .collect(); - - let leaf_position: Vec<(u64, ActivityHash)> = leaf_position_map - .iter() - .filter(|&(l, _)| l == &batch_root) - .map(|&(ref l, p)| (p, *l)) - .collect(); - let position: Vec = - leaf_position.clone().into_iter().map(|(p, _)| p).collect(); - - let proof = mmr - .gen_proof(position) - .map_err(|_| OCWError::FailedToCreateMerkleProof { - cluster_id: *cluster_id, - era_id, - })? - .proof_items() - .to_vec(); - - let batch_proof = MMRProof { - mmr_size: mmr.mmr_size(), - proof, - leaf_with_position: leaf_position[0], - }; - return Ok(Some(( era_id, - CustomerBatch { - batch_index: index, - payers: customers_activity_batched[i] - .iter() - .map(|activity| { - let account_id = T::AccountId::decode( - &mut &activity.customer_id.as_bytes()[..], - ) - .unwrap(); - let customer_usage = CustomerUsage { - transferred_bytes: activity.transferred_bytes, - stored_bytes: activity.stored_bytes, - number_of_puts: activity.number_of_puts, - number_of_gets: activity.number_of_gets, - }; - (account_id, activity.bucket_id, customer_usage) - }) - .collect(), - batch_proof, - }, - ))); + customers_activity_in_consensus, + customers_activity_batch_roots, + ) } else { - return Ok(None); + Ok(None) } - } /*else { - // todo! no data - reconstruct the data from DAC - }*/ + } + } else { + Ok(None) } + } else { + Ok(None) + } + } + + fn fetch_charging_activities( + cluster_id: &ClusterId, + batch_size: usize, + era_id: DdcEra, + customers_activity_in_consensus: Vec, + customers_activity_batch_roots: Vec, + ) -> Result)>, Vec> { + let batch_index = T::PayoutVisitor::get_next_customer_batch_for_payment( + cluster_id, era_id, + ) + .map_err(|_| { + vec![OCWError::BillingReportDoesNotExist { cluster_id: *cluster_id, era_id }] + })?; + + if let Some(index) = batch_index { + let i: usize = index.into(); + // todo! store batched activity to avoid splitting it again each time + let customers_activity_batched = + Self::split_to_batches(&customers_activity_in_consensus, batch_size); + + let batch_root = customers_activity_batch_roots[i]; + let store = MemStore::default(); + let mut mmr: MMR> = + MemMMR::<_, MergeActivityHash>::new(0, &store); + + let leaf_position_map: Vec<(ActivityHash, u64)> = customers_activity_batch_roots + .iter() + .map(|a| (*a, mmr.push(*a).unwrap())) + .collect(); + + let leaf_position: Vec<(u64, ActivityHash)> = leaf_position_map + .iter() + .filter(|&(l, _)| l == &batch_root) + .map(|&(ref l, p)| (p, *l)) + .collect(); + let position: Vec = + leaf_position.clone().into_iter().map(|(p, _)| p).collect(); + + let proof = mmr + .gen_proof(position) + .map_err(|_| OCWError::FailedToCreateMerkleProof { + cluster_id: *cluster_id, + era_id, + }) + .map_err(|e| vec![e])? + .proof_items() + .to_vec(); + + let batch_proof = MMRProof { + mmr_size: mmr.mmr_size(), + proof, + leaf_with_position: leaf_position[0], + }; + Ok(Some(( + era_id, + CustomerBatch { + batch_index: index, + payers: customers_activity_batched[i] + .iter() + .map(|activity| { + let account_id = + T::AccountId::decode(&mut &activity.customer_id.as_bytes()[..]) + .unwrap(); + let customer_usage = CustomerUsage { + transferred_bytes: activity.transferred_bytes, + stored_bytes: activity.stored_bytes, + number_of_puts: activity.number_of_puts, + number_of_gets: activity.number_of_gets, + }; + (account_id, activity.bucket_id, customer_usage) + }) + .collect(), + batch_proof, + }, + ))) + } else { + Ok(None) } - Ok(None) } pub(crate) fn prepare_end_charging_customers( @@ -1388,8 +1450,11 @@ pub mod pallet { pub(crate) fn prepare_begin_rewarding_providers( cluster_id: &ClusterId, - ) -> Result, OCWError> { - if let Some((era_id, _start, _end)) = + dac_nodes: &[(NodePubKey, StorageNodeParams)], + min_nodes: u16, + batch_size: usize, + ) -> Result, Vec> { + if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { if T::PayoutVisitor::get_billing_report_status(cluster_id, era_id) == @@ -1405,54 +1470,93 @@ pub mod pallet { )) = Self::fetch_validation_activities::( cluster_id, era_id, ) { - if let Some(max_batch_index) = - nodes_activity_batch_roots.len().checked_sub(1) - // -1 cause payout expects max_index, not length - { - let max_batch_index: u16 = - max_batch_index.try_into().map_err(|_| { - OCWError::BatchIndexConversionFailed { - cluster_id: *cluster_id, - era_id, - } - })?; - - let total_node_usage = nodes_activity_in_consensus.into_iter().fold( - NodeUsage { - transferred_bytes: 0, - stored_bytes: 0, - number_of_puts: 0, - number_of_gets: 0, - }, - |mut acc, activity| { - acc.transferred_bytes += activity.transferred_bytes; - acc.stored_bytes += activity.stored_bytes; - acc.number_of_puts += activity.number_of_puts; - acc.number_of_gets += activity.number_of_gets; - acc - }, - ); - - return Ok(Some((era_id, max_batch_index, total_node_usage))); - } else { - return Err(OCWError::EmptyCustomerActivity { - cluster_id: *cluster_id, + Self::fetch_reward_activities( + cluster_id, + era_id, + nodes_activity_in_consensus, + nodes_activity_batch_roots, + ) + } else { + let era_activity = EraActivity { id: era_id, start, end }; + + let _ = Self::process_dac_data( + cluster_id, + Some(era_activity), + dac_nodes, + min_nodes, + batch_size, + )?; + + if let Some(( + _, + _, + _, + nodes_activity_in_consensus, + _, + nodes_activity_batch_roots, + )) = Self::fetch_validation_activities::( + cluster_id, era_id, + ) { + Self::fetch_reward_activities( + cluster_id, era_id, - }); + nodes_activity_in_consensus, + nodes_activity_batch_roots, + ) + } else { + Ok(None) } - } /*else { - // todo! no data - reconstruct the data from DAC - }*/ + } + } else { + Ok(None) } + } else { + Ok(None) + } + } + + fn fetch_reward_activities( + cluster_id: &ClusterId, + era_id: DdcEra, + nodes_activity_in_consensus: Vec, + nodes_activity_batch_roots: Vec, + ) -> Result, Vec> { + if let Some(max_batch_index) = nodes_activity_batch_roots.len().checked_sub(1) + // -1 cause payout expects max_index, not length + { + let max_batch_index: u16 = max_batch_index.try_into().map_err(|_| { + vec![OCWError::BatchIndexConversionFailed { cluster_id: *cluster_id, era_id }] + })?; + + let total_node_usage = nodes_activity_in_consensus.into_iter().fold( + NodeUsage { + transferred_bytes: 0, + stored_bytes: 0, + number_of_puts: 0, + number_of_gets: 0, + }, + |mut acc, activity| { + acc.transferred_bytes += activity.transferred_bytes; + acc.stored_bytes += activity.stored_bytes; + acc.number_of_puts += activity.number_of_puts; + acc.number_of_gets += activity.number_of_gets; + acc + }, + ); + + Ok(Some((era_id, max_batch_index, total_node_usage))) + } else { + Err(vec![OCWError::EmptyCustomerActivity { cluster_id: *cluster_id, era_id }]) } - Ok(None) } pub(crate) fn prepare_send_rewarding_providers_batch( cluster_id: &ClusterId, batch_size: usize, - ) -> Result)>, OCWError> { - if let Some((era_id, _start, _end)) = + dac_nodes: &[(NodePubKey, StorageNodeParams)], + min_nodes: u16, + ) -> Result)>, Vec> { + if let Some((era_id, start, end)) = Self::get_era_for_payout(cluster_id, EraValidationStatus::PayoutInProgress) { if T::PayoutVisitor::get_billing_report_status(cluster_id, era_id) == @@ -1468,90 +1572,134 @@ pub mod pallet { )) = Self::fetch_validation_activities::( cluster_id, era_id, ) { - let batch_index = T::PayoutVisitor::get_next_provider_batch_for_payment( - cluster_id, era_id, - ) - .map_err(|_| OCWError::BillingReportDoesNotExist { - cluster_id: *cluster_id, + Self::fetch_reward_provider_batch( + cluster_id, + batch_size, era_id, - })?; - - if let Some(index) = batch_index { - let i: usize = index.into(); - // todo! store batched activity to avoid splitting it again each time - let nodes_activity_batched = - Self::split_to_batches(&nodes_activity_in_consensus, batch_size); - - let batch_root = nodes_activity_batch_roots[i]; - let store = MemStore::default(); - let mut mmr: MMR< - ActivityHash, - MergeActivityHash, - &MemStore, - > = MemMMR::<_, MergeActivityHash>::new(0, &store); - - let leaf_position_map: Vec<(ActivityHash, u64)> = - nodes_activity_batch_roots - .iter() - .map(|a| (*a, mmr.push(*a).unwrap())) - .collect(); - - let leaf_position: Vec<(u64, ActivityHash)> = leaf_position_map - .iter() - .filter(|&(l, _)| l == &batch_root) - .map(|&(ref l, p)| (p, *l)) - .collect(); - let position: Vec = - leaf_position.clone().into_iter().map(|(p, _)| p).collect(); - - let proof = mmr - .gen_proof(position) - .map_err(|_| OCWError::FailedToCreateMerkleProof { - cluster_id: *cluster_id, - era_id, - })? - .proof_items() - .to_vec(); - - // todo! attend [i] through get(i).ok_or() - // todo! attend accountid conversion - let batch_proof = MMRProof { - mmr_size: mmr.mmr_size(), - proof, - leaf_with_position: leaf_position[0], - }; - return Ok(Some(( + nodes_activity_in_consensus, + nodes_activity_batch_roots, + ) + } else { + let era_activity = EraActivity { id: era_id, start, end }; + + let _ = Self::process_dac_data( + cluster_id, + Some(era_activity), + dac_nodes, + min_nodes, + batch_size, + )?; + + if let Some(( + _, + _, + _, + nodes_activity_in_consensus, + _, + nodes_activity_batch_roots, + )) = Self::fetch_validation_activities::( + cluster_id, era_id, + ) { + Self::fetch_reward_provider_batch( + cluster_id, + batch_size, era_id, - ProviderBatch { - batch_index: index, - payees: nodes_activity_batched[i] - .iter() - .map(|activity| { - let provider_id = T::AccountId::decode( - &mut &activity.provider_id.as_bytes()[..], - ) - .unwrap(); - let node_usage = NodeUsage { - transferred_bytes: activity.transferred_bytes, - stored_bytes: activity.stored_bytes, - number_of_puts: activity.number_of_puts, - number_of_gets: activity.number_of_gets, - }; - (provider_id, node_usage) - }) - .collect(), - batch_proof, - }, - ))); + nodes_activity_in_consensus, + nodes_activity_batch_roots, + ) } else { - return Ok(None); + Ok(None) } - } /*else { - // todo! no data - reconstruct the data from DAC - }*/ + } + } else { + Ok(None) } + } else { + Ok(None) + } + } + + fn fetch_reward_provider_batch( + cluster_id: &ClusterId, + batch_size: usize, + era_id: DdcEra, + nodes_activity_in_consensus: Vec, + nodes_activity_batch_roots: Vec, + ) -> Result)>, Vec> { + let batch_index = T::PayoutVisitor::get_next_provider_batch_for_payment( + cluster_id, era_id, + ) + .map_err(|_| { + vec![OCWError::BillingReportDoesNotExist { cluster_id: *cluster_id, era_id }] + })?; + + if let Some(index) = batch_index { + let i: usize = index.into(); + // todo! store batched activity to avoid splitting it again each time + let nodes_activity_batched = + Self::split_to_batches(&nodes_activity_in_consensus, batch_size); + + let batch_root = nodes_activity_batch_roots[i]; + let store = MemStore::default(); + let mut mmr: MMR> = + MemMMR::<_, MergeActivityHash>::new(0, &store); + + let leaf_position_map: Vec<(ActivityHash, u64)> = nodes_activity_batch_roots + .iter() + .map(|a| (*a, mmr.push(*a).unwrap())) + .collect(); + + let leaf_position: Vec<(u64, ActivityHash)> = leaf_position_map + .iter() + .filter(|&(l, _)| l == &batch_root) + .map(|&(ref l, p)| (p, *l)) + .collect(); + let position: Vec = + leaf_position.clone().into_iter().map(|(p, _)| p).collect(); + + let proof = mmr + .gen_proof(position) + .map_err(|_| { + vec![OCWError::FailedToCreateMerkleProof { + cluster_id: *cluster_id, + era_id, + }] + })? + .proof_items() + .to_vec(); + + // todo! attend [i] through get(i).ok_or() + // todo! attend accountid conversion + let batch_proof = MMRProof { + mmr_size: mmr.mmr_size(), + proof, + leaf_with_position: leaf_position[0], + }; + Ok(Some(( + era_id, + ProviderBatch { + batch_index: index, + payees: nodes_activity_batched[i] + .iter() + .map(|activity| { + let provider_id = + T::AccountId::decode(&mut &activity.provider_id.as_bytes()[..]) + .unwrap(); + let node_usage = NodeUsage { + transferred_bytes: activity.transferred_bytes, + stored_bytes: activity.stored_bytes, + number_of_puts: activity.number_of_puts, + number_of_gets: activity.number_of_gets, + }; + (provider_id, node_usage) + }) + .collect(), + batch_proof, + }, + ))) + } else { + Ok(None) } - Ok(None) } pub(crate) fn prepare_end_rewarding_providers(