Fixed Start and End Era issue (#381)
## Description
<!-- Describe what change this PR is implementing -->

## Types of Changes
Please select the branch type you are merging and fill in the relevant
template.
<!--- Check the following box with an x if the following applies: -->
- [ ] Hotfix
- [ ] Release
- [ ] Fix or Feature

## Fix or Feature
<!--- Check the following box with an x if the following applies: -->

### Types of Changes
<!--- What types of changes does your code introduce? -->
- [ ] Tech Debt (Code improvements)
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Dependency upgrade (A change in substrate or any 3rd party crate
version)

### Migrations and Hooks
<!--- Check the following box with an x if the following applies: -->
- [ ] This change requires a runtime migration.
- [ ] Modifies `on_initialize`
- [ ] Modifies `on_finalize`

### Checklist for Fix or Feature
<!--- All boxes need to be checked. Follow this checklist before
requesting PR review -->
- [ ] Change has been tested locally.
- [ ] Change adds / updates tests if applicable.
- [ ] Changelog doc updated.
- [ ] `spec_version` has been incremented.
- [ ] `network-relayer`'s
[events](https://github.com/Cerebellum-Network/network-relayer/blob/dev-cere/shared/substrate/events.go)
have been updated according to the blockchain events if applicable.
- [ ] All CI checks have passed successfully.

## Checklist for Hotfix
<!--- All boxes need to be checked. Follow this checklist before
requesting PR review -->
- [ ] Changelog has been updated.
- [ ] Crate version has been updated.
- [ ] `spec_version` has been incremented.
- [ ] Transaction version has been updated if required.
- [ ] Pull Request to `dev` has been created.
- [ ] Pull Request to `staging` has been created.
- [ ] `network-relayer`'s
[events](https://github.com/Cerebellum-Network/network-relayer/blob/dev-cere/shared/substrate/events.go)
have been updated according to the blockchain events if applicable.
- [ ] All CI checks have passed successfully.

## Checklist for Release
<!--- All boxes need to be checked. Follow this checklist before
requesting PR review -->
- [ ] Change has been deployed to Devnet.
- [ ] Change has been tested in Devnet.
- [ ] Change has been deployed to Qanet.
- [ ] Change has been tested in Qanet.
- [ ] Change has been deployed to Testnet.
- [ ] Change has been tested in Testnet.
- [ ] Changelog has been updated.
- [ ] Crate version has been updated.
- [ ] Spec version has been updated.
- [ ] Transaction version has been updated if required.
- [ ] All CI checks have passed successfully.
ayushmishra2005 authored Jul 2, 2024
1 parent 5b223eb commit bcbee27
Showing 3 changed files with 108 additions and 85 deletions.
6 changes: 4 additions & 2 deletions pallets/ddc-payouts/src/lib.rs
@@ -319,7 +319,8 @@ pub mod pallet {
end_era: i64,
) -> DispatchResult {
let caller = ensure_signed(origin)?;
ensure!(T::ValidatorVisitor::is_ocw_validator(caller), Error::<T>::Unauthorised);
ensure!(T::ValidatorVisitor::is_ocw_validator(caller), Error::<T>::Unauthorised); //
// todo! need to refactor this

ensure!(
ActiveBillingReports::<T>::try_get(cluster_id, era).is_err(),
@@ -353,7 +354,8 @@
max_batch_index: BatchIndex,
) -> DispatchResult {
let caller = ensure_signed(origin)?;
ensure!(T::ValidatorVisitor::is_ocw_validator(caller), Error::<T>::Unauthorised);
ensure!(T::ValidatorVisitor::is_ocw_validator(caller), Error::<T>::Unauthorised); //
// todo! need to refactor this

ensure!(max_batch_index < MaxBatchesCount::get(), Error::<T>::BatchIndexOverflow);

144 changes: 84 additions & 60 deletions pallets/ddc-verification/src/lib.rs
@@ -426,8 +426,21 @@ pub mod pallet {
}

/// Era activity of a node.
#[derive(Serialize, Copy, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct EraActivity {
#[derive(
Debug,
Serialize,
Deserialize,
Clone,
Hash,
Ord,
PartialOrd,
PartialEq,
Eq,
TypeInfo,
Encode,
Decode,
)]
pub struct EraActivity {
/// Era id.
pub id: DdcEra,
pub start: i64,
@@ -550,8 +563,9 @@ pub mod pallet {
}

if Self::fetch_current_validator().is_err() {
log::info!("🏄‍ Setting current validator..."); // todo! add public key
let _ = signer.send_signed_transaction(|account| {
log::info!("🏄‍ Setting current validator... {:?}", account.id);

Self::store_current_validator(account.id.encode());

Call::set_current_validator {}
@@ -582,17 +596,17 @@ pub mod pallet {
Self::process_dac_data(&cluster_id, None, &dac_nodes, min_nodes, batch_size.into());

match processed_dac_data {
Ok(Some((era_id, payers_merkle_root_hash, payees_merkle_root_hash))) => {
Ok(Some((era_activity, payers_merkle_root_hash, payees_merkle_root_hash))) => {
log::info!(
"🚀 Processing era_id: {:?} for cluster_id: {:?}",
era_id,
era_activity.clone(),
cluster_id
);

let results = signer.send_signed_transaction(|_account| {
Call::set_prepare_era_for_payout {
cluster_id,
era_id,
era_activity: era_activity.clone(),
payers_merkle_root_hash,
payees_merkle_root_hash,
}
@@ -602,22 +616,22 @@ pub mod pallet {
match res {
Ok(()) => {
log::info!(
"⛳️ Merkle roots posted on-chain for cluster_id: {:?}, era_id: {:?}",
"⛳️ Merkle roots posted on-chain for cluster_id: {:?}, era: {:?}",
cluster_id,
era_id
era_activity.clone()
);
},
Err(e) => {
log::error!(
"🦀 Error to post merkle roots on-chain for cluster_id: {:?}, era_id: {:?}: {:?}",
"🦀 Error to post merkle roots on-chain for cluster_id: {:?}, era: {:?}: {:?}",
cluster_id,
era_id,
era_activity.clone(),
e
);
// Extrinsic call failed
errors.push(OCWError::PrepareEraTransactionError {
cluster_id,
era_id,
era_id: era_activity.id,
payers_merkle_root_hash,
payees_merkle_root_hash,
});
@@ -637,26 +651,28 @@ pub mod pallet {
match Self::prepare_begin_billing_report(&cluster_id) {
Ok(Some((era_id, start_era, end_era))) => {
log::info!(
"🚀 process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}",
"🚀 process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}, start_era: {:?}, end_era: {:?} ",
cluster_id,
era_id
era_id,
start_era,
end_era
);

if let Some((_, res)) = signer.send_signed_transaction(|_acc| {
let results = signer.send_signed_transaction(|_account| {
Call::begin_billing_report { cluster_id, era_id, start_era, end_era }
}) {
});

for (_, res) in &results {
match res {
Ok(_) => {
// Extrinsic call succeeded
Ok(()) => {
log::info!(
"Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}",
"🏄‍ Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}",
cluster_id,
era_id
);
},
Err(e) => {
log::error!(
"Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}",
"🦀 Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}",
cluster_id,
era_id,
e
@@ -668,13 +684,10 @@ pub mod pallet {
});
},
}
} else {
log::error!("No account available to sign the transaction");
errors.push(OCWError::NoAvailableSigner);
}
},
Ok(None) => {
log::info!("No era for payout for cluster_id: {:?}", cluster_id);
log::info!("🦀 No era for payout for cluster_id: {:?}", cluster_id);
},
Err(e) => {
errors.push(e);
@@ -685,7 +698,7 @@ pub mod pallet {
match Self::prepare_begin_charging_customers(&cluster_id) {
Ok(Some((era_id, max_batch_index))) => {
log::info!(
"prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}",
"🎁 prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}",
cluster_id,
era_id
);
@@ -697,14 +710,14 @@ pub mod pallet {
Ok(_) => {
// Extrinsic call succeeded
log::info!(
"Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}",
"🚀 Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}",
cluster_id,
era_id
);
},
Err(e) => {
log::error!(
"Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}",
"🦀 Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}",
cluster_id,
era_id,
e
@@ -717,13 +730,13 @@ pub mod pallet {
},
}
} else {
log::error!("No account available to sign the transaction");
log::error!("🦀 No account available to sign the transaction");
errors.push(OCWError::NoAvailableSigner);
}
},
Ok(None) => {
log::info!(
"No era for begin_charging_customers for cluster_id: {:?}",
"🦀 No era for begin_charging_customers for cluster_id: {:?}",
cluster_id
);
},
@@ -959,31 +972,32 @@ pub mod pallet {
dac_nodes: &[(NodePubKey, StorageNodeParams)],
min_nodes: u16,
batch_size: usize,
) -> Result<Option<(DdcEra, ActivityHash, ActivityHash)>, Vec<OCWError>> {
) -> Result<Option<(EraActivity, ActivityHash, ActivityHash)>, Vec<OCWError>> {
log::info!("🚀 Processing dac data for cluster_id: {:?}", cluster_id);
if dac_nodes.len().ilog2() < min_nodes.into() {
return Err(vec![OCWError::NotEnoughDACNodes { num_nodes: min_nodes }]);
}

let era_id = if let Some(era_id) = era_id_to_process {
era_id
let era_activity = if let Some(era_id) = era_id_to_process {
EraActivity { id: era_id, start: Default::default(), end: Default::default() }
} else {
match Self::get_era_for_validation(cluster_id, dac_nodes) {
Ok(Some(era_id)) => era_id,
Ok(Some(era_activity)) => era_activity,
Ok(None) => return Ok(None),
Err(err) => return Err(vec![err]),
}
};

let nodes_usage = Self::fetch_nodes_usage_for_era(cluster_id, era_id, dac_nodes)
.map_err(|err| vec![err])?;
let nodes_usage =
Self::fetch_nodes_usage_for_era(cluster_id, era_activity.id, dac_nodes)
.map_err(|err| vec![err])?;
let customers_usage =
Self::fetch_customers_usage_for_era(cluster_id, era_id, dac_nodes)
Self::fetch_customers_usage_for_era(cluster_id, era_activity.id, dac_nodes)
.map_err(|err| vec![err])?;

let customers_activity_in_consensus = Self::get_consensus_for_activities(
cluster_id,
era_id,
era_activity.id,
&customers_usage,
min_nodes,
Percent::from_percent(T::MAJORITY),
@@ -999,7 +1013,7 @@ pub mod pallet {

let nodes_activity_in_consensus = Self::get_consensus_for_activities(
cluster_id,
era_id,
era_activity.id,
&nodes_usage,
min_nodes,
Percent::from_percent(T::MAJORITY),
@@ -1015,7 +1029,7 @@ pub mod pallet {

Self::store_validation_activities(
cluster_id,
era_id,
era_activity.id,
&customers_activity_in_consensus,
customers_activity_root,
&customers_activity_batch_roots,
Expand All @@ -1024,7 +1038,7 @@ pub mod pallet {
&nodes_activity_batch_roots,
);
log::info!("🙇‍ Dac data processing completed for cluster_id: {:?}", cluster_id);
Ok(Some((era_id, customers_activity_root, nodes_activity_root)))
Ok(Some((era_activity, customers_activity_root, nodes_activity_root)))
}

// let batches = split into batches customers_activity_in_consensus
@@ -1493,7 +1507,7 @@ pub mod pallet {
// todo! this needs to be rewriten - too complex and inefficient
cluster_id: &ClusterId,
dac_nodes: &[(NodePubKey, StorageNodeParams)],
) -> Result<Option<DdcEra>, OCWError> {
) -> Result<Option<EraActivity>, OCWError> {
let current_validator_data = Self::fetch_current_validator()?;

let current_validator = T::AccountId::decode(&mut &current_validator_data[..]).unwrap();
@@ -1503,17 +1517,15 @@ pub mod pallet {

let all_ids = Self::fetch_processed_era_for_node(cluster_id, dac_nodes)?;

let ids_greater_than_last_validated_era: Vec<DdcEra> = all_ids
let ids_greater_than_last_validated_era: Vec<EraActivity> = all_ids
.iter()
.flat_map(|eras| {
eras.iter().cloned().filter(|ids| ids.id > last_validated_era).map(|era| era.id)
})
.flat_map(|eras| eras.iter().filter(|&ids| ids.id > last_validated_era).cloned())
.sorted()
.collect::<Vec<DdcEra>>();
.collect::<Vec<EraActivity>>();

let mut grouped_data: Vec<(u32, DdcEra)> = Vec::new();
let mut grouped_data: Vec<(u32, EraActivity)> = Vec::new();
for (key, chunk) in
&ids_greater_than_last_validated_era.into_iter().chunk_by(|elt| *elt)
&ids_greater_than_last_validated_era.into_iter().chunk_by(|elt| elt.clone())
{
grouped_data.push((chunk.count() as u32, key));
}
@@ -1522,9 +1534,9 @@ pub mod pallet {
.into_iter()
.filter(|(v, _)| *v == dac_nodes.len() as u32)
.map(|(_, id)| id)
.collect::<Vec<DdcEra>>();
.collect::<Vec<EraActivity>>();

Ok(all_node_eras.iter().cloned().min())
Ok(all_node_eras.iter().cloned().min_by_key(|n| n.id))
}

/// Determines if a consensus is reached for a set of activities based on a specified
@@ -1868,7 +1880,7 @@ pub mod pallet {
pub fn set_prepare_era_for_payout(
origin: OriginFor<T>,
cluster_id: ClusterId,
era_id: DdcEra,
era_activity: EraActivity,
payers_merkle_root_hash: ActivityHash,
payees_merkle_root_hash: ActivityHash,
) -> DispatchResult {
@@ -1877,7 +1889,7 @@ pub mod pallet {
//ensure!(Self::is_ocw_validator(caller.clone()), Error::<T>::Unauthorised); // todo!
// need to refactor this Retrieve or initialize the EraValidation
let mut era_validation = {
let era_validations = <EraValidations<T>>::get(cluster_id, era_id);
let era_validations = <EraValidations<T>>::get(cluster_id, era_activity.id);

if era_validations.is_none() {
EraValidation {
@@ -1913,16 +1925,24 @@ pub mod pallet {
era_validation.payers_merkle_root_hash = payers_merkle_root_hash;
era_validation.payees_merkle_root_hash = payees_merkle_root_hash;
era_validation.status = EraValidationStatus::ReadyForPayout;
era_validation.start_era = era_activity.start;
era_validation.end_era = era_activity.end;

should_deposit_ready_event = true;
}

// Update the EraValidations storage
<EraValidations<T>>::insert(cluster_id, era_id, era_validation);
<EraValidations<T>>::insert(cluster_id, era_activity.id, era_validation);
if should_deposit_ready_event {
Self::deposit_event(Event::<T>::EraValidationReady { cluster_id, era_id });
Self::deposit_event(Event::<T>::EraValidationReady {
cluster_id,
era_id: era_activity.id,
});
} else {
Self::deposit_event(Event::<T>::EraValidationNotReady { cluster_id, era_id });
Self::deposit_event(Event::<T>::EraValidationNotReady {
cluster_id,
era_id: era_activity.id,
});
}

Ok(())
Expand Down Expand Up @@ -2134,12 +2154,15 @@ pub mod pallet {
end_era: i64,
) -> DispatchResult {
let sender = ensure_signed(origin)?;
ensure!(Self::is_ocw_validator(sender.clone()), Error::<T>::Unauthorised);
//ensure!(Self::is_ocw_validator(sender.clone()), Error::<T>::Unauthorised); // todo!
// need to refactor this
T::PayoutVisitor::begin_billing_report(sender, cluster_id, era_id, start_era, end_era)?;

let mut era_validation = <EraValidations<T>>::get(cluster_id, era_id).unwrap(); // should exist
era_validation.status = EraValidationStatus::PayoutInProgress;
<EraValidations<T>>::insert(cluster_id, era_id, era_validation);
EraValidations::<T>::mutate(cluster_id, era_id, |maybe_era_validations| {
if let Some(ref mut era_validation) = maybe_era_validations {
era_validation.status = EraValidationStatus::PayoutInProgress
}
});

Ok(())
}
@@ -2153,7 +2176,8 @@ pub mod pallet {
max_batch_index: BatchIndex,
) -> DispatchResult {
let sender = ensure_signed(origin)?;
ensure!(Self::is_ocw_validator(sender.clone()), Error::<T>::Unauthorised);
//ensure!(Self::is_ocw_validator(sender.clone()), Error::<T>::Unauthorised); // todo!
// need to refactor this
T::PayoutVisitor::begin_charging_customers(sender, cluster_id, era_id, max_batch_index)
}

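For reference, here is a minimal, self-contained sketch of the shape of this change, not the pallet's actual code: `EraActivity` now carries the era's `start`/`end` timestamps and derives the SCALE codec traits, so the off-chain worker can submit them via `set_prepare_era_for_payout` and the chain can persist them into the `EraValidation` record that `begin_billing_report` later consumes. The `DdcEra`/`ActivityHash` aliases, the trimmed-down `EraValidation`, and the free function `prepare_era_for_payout` are illustrative simplifications; the sketch assumes the `parity-scale-codec` crate (imported as `codec`, with the `derive` feature) and `scale-info`.

```rust
use codec::{Decode, Encode};
use scale_info::TypeInfo;

type DdcEra = u32;            // assumption: era id type
type ActivityHash = [u8; 32]; // assumption: 32-byte Merkle root hash

/// Era activity as aggregated by the off-chain worker (now SCALE-encodable,
/// so it can be passed as an extrinsic argument).
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, TypeInfo)]
pub struct EraActivity {
    pub id: DdcEra,
    pub start: i64, // era start timestamp
    pub end: i64,   // era end timestamp
}

/// Trimmed-down stand-in for the on-chain validation record.
#[derive(Debug, Default, Clone, Encode, Decode, TypeInfo)]
pub struct EraValidation {
    pub payers_merkle_root_hash: ActivityHash,
    pub payees_merkle_root_hash: ActivityHash,
    pub start_era: i64,
    pub end_era: i64,
}

/// Roughly what `set_prepare_era_for_payout` now does with the extra fields
/// (storage access, signature checks, and event emission elided).
fn prepare_era_for_payout(
    era_validation: &mut EraValidation,
    era_activity: &EraActivity,
    payers_root: ActivityHash,
    payees_root: ActivityHash,
) {
    era_validation.payers_merkle_root_hash = payers_root;
    era_validation.payees_merkle_root_hash = payees_root;
    // New in this commit: record the era boundaries so that a later
    // `begin_billing_report` call can use the correct start/end timestamps.
    era_validation.start_era = era_activity.start;
    era_validation.end_era = era_activity.end;
}

fn main() {
    let era = EraActivity { id: 42, start: 1_719_700_000, end: 1_719_786_400 };
    let mut validation = EraValidation::default();
    prepare_era_for_payout(&mut validation, &era, [0u8; 32], [1u8; 32]);
    assert_eq!(validation.start_era, era.start);
    assert_eq!(validation.end_era, era.end);
}
```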