Skip to content

Commit

Permalink
Refactor raise_min_prepare_time into shard client, fmt.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Sep 27, 2023
1 parent 5dbe08f commit ba0521b
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/bin/maelstrom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use async_maelstrom::process::{ProcNet, Process};
use async_maelstrom::runtime::Runtime;
use async_maelstrom::{Id, Status};
use async_trait::async_trait;
use tracing::{info, trace, warn};
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand All @@ -23,6 +22,7 @@ use tapirs::{
IrMembership, IrMessage, IrReplica, TapirClient, TapirReplica, TapirTransport, Transport,
};
use tokio::spawn;
use tracing::{info, trace, warn};

type K = String;
type V = String;
Expand Down
2 changes: 1 addition & 1 deletion src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tracing::{warn, info, trace, trace_span};
use tracing::{info, trace, trace_span, warn};

#[derive(Debug)]
pub enum Status {
Expand Down
2 changes: 1 addition & 1 deletion src/occ/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use crate::{
MvccStore,
};
use serde::{Deserialize, Serialize};
use tracing::trace;
use std::{
borrow::Borrow,
collections::{hash_map::Entry, BTreeMap, HashMap},
fmt::Debug,
hash::Hash,
ops::{Bound, Deref, DerefMut},
};
use tracing::trace;

#[derive(Serialize, Deserialize)]
pub struct Store<K, V, TS> {
Expand Down
2 changes: 1 addition & 1 deletion src/tapir/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
};
use futures::future::join_all;
use rand::{thread_rng, Rng};
use tracing::trace;
use std::{
collections::HashMap,
future::Future,
Expand All @@ -16,6 +15,7 @@ use std::{
time::Duration,
};
use tokio::select;
use tracing::trace;

pub struct Client<K: Key, V: Value, T: TapirTransport<K, V>> {
inner: Arc<Mutex<Inner<K, V, T>>>,
Expand Down
68 changes: 19 additions & 49 deletions src/tapir/replica.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use super::{Key, ShardNumber, Timestamp, Value, CO, CR, IO, UO, UR};
use crate::ir::ReplyUnlogged;
use crate::tapir::ShardClient;
use crate::util::vectorize;
use crate::{
IrClient, IrMembership, IrMembershipSize, IrOpId, IrRecord, IrReplicaUpcalls, OccPrepareResult,
OccStore, OccTransaction, OccTransactionId, TapirTransport,
IrClientId, IrMembership, IrMembershipSize, IrOpId, IrRecord, IrReplicaUpcalls,
OccPrepareResult, OccStore, OccTransaction, OccTransactionId, TapirTransport,
};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use tracing::{trace, warn};
use std::task::Context;
use std::time::Duration;
use std::{collections::HashMap, future::Future, hash::Hash};
use tokio::time::timeout;
use tracing::{trace, warn};

/// Diverge from TAPIR and don't maintain a no-vote list. Instead, wait for a
/// view change to syncronize each participant shard's prepare result and then
Expand Down Expand Up @@ -67,56 +68,23 @@ impl<K: Key, V: Value> Replica<K, V> {

async move {
let mut participants = HashMap::new();
let client_id = IrClientId::new();
for shard in transaction.participants() {
let membership = transport.shard_addresses(shard).await;
participants.insert(shard, IrClient::new(membership, transport.clone()));
participants.insert(
shard,
ShardClient::new(client_id, shard, membership, transport.clone()),
);
}

let min_prepares = join_all(participants.values().map(|client| {
client.invoke_consensus(
CO::RaiseMinPrepareTime {
time: commit.time + 1,
},
|results, size| {
let times = results.iter().filter_map(|(r, c)| {
if let CR::RaiseMinPrepareTime { time } = r {
Some((*time, *c))
} else {
debug_assert!(false);
None
}
});

// Find a time that a quorum of replicas agree on.
CR::RaiseMinPrepareTime {
time: times
.clone()
.filter(|&(time, _)| {
times
.clone()
.filter(|&(t, _)| t >= time)
.map(|(_, c)| c)
.sum::<usize>()
>= size.f_plus_one()
})
.map(|(t, _)| t)
.max()
.unwrap_or_else(|| {
debug_assert!(false);
0
}),
}
},
)
}))
let min_prepares = join_all(
participants
.values()
.map(|client| client.raise_min_prepare_time(commit.time + 1)),
)
.await;

if min_prepares.into_iter().any(|min_prepare| {
let CR::RaiseMinPrepareTime { time: min_prepare_time } = min_prepare else {
debug_assert!(false);
return true;
};

if min_prepares.into_iter().any(|min_prepare_time| {
if commit.time >= min_prepare_time {
// Not ready.
return true;
Expand Down Expand Up @@ -166,7 +134,7 @@ impl<K: Key, V: Value> Replica<K, V> {
}

let results = join_all(participants.values().map(|client| {
let (future, membership) = client.invoke_unlogged_joined(UO::CheckPrepare {
let (future, membership) = client.inner.invoke_unlogged_joined(UO::CheckPrepare {
transaction_id,
commit,
});
Expand Down Expand Up @@ -204,6 +172,7 @@ impl<K: Key, V: Value> Replica<K, V> {
async move {
if ok {
client
.inner
.invoke_inconsistent(IO::Commit {
transaction_id,
transaction,
Expand All @@ -212,6 +181,7 @@ impl<K: Key, V: Value> Replica<K, V> {
.await
} else {
client
.inner
.invoke_inconsistent(IO::Abort {
transaction_id,
commit: Some(commit),
Expand Down
46 changes: 45 additions & 1 deletion src/tapir/shard_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::future::Future;

pub struct ShardClient<K: Key, V: Value, T: Transport<Replica<K, V>>> {
shard: ShardNumber,
inner: IrClient<Replica<K, V>, T>,
pub(crate) inner: IrClient<Replica<K, V>, T>,
}

impl<K: Key, V: Value, T: Transport<Replica<K, V>>> Clone for ShardClient<K, V, T> {
Expand Down Expand Up @@ -146,4 +146,48 @@ impl<K: Key, V: Value, T: Transport<Replica<K, V>>> ShardClient<K, V, T> {
}
})
}

pub fn raise_min_prepare_time(&self, time: u64) -> impl Future<Output = u64> + Send {
let future =
self.inner
.invoke_consensus(CO::RaiseMinPrepareTime { time }, |results, size| {
let times = results.iter().filter_map(|(r, c)| {
if let CR::RaiseMinPrepareTime { time } = r {
Some((*time, *c))
} else {
debug_assert!(false);
None
}
});

// Find a time that a quorum of replicas agree on.
CR::RaiseMinPrepareTime {
time: times
.clone()
.filter(|&(time, _)| {
times
.clone()
.filter(|&(t, _)| t >= time)
.map(|(_, c)| c)
.sum::<usize>()
>= size.f_plus_one()
})
.map(|(t, _)| t)
.max()
.unwrap_or_else(|| {
debug_assert!(false);
0
}),
}
});
async move {
match future.await {
CR::RaiseMinPrepareTime { time } => time,
_ => {
debug_assert!(false);
0
}
}
}
}
}
6 changes: 5 additions & 1 deletion src/tapir/tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ use std::{
use tokio::time::timeout;

fn init_tracing() {
let _ = tracing::subscriber::set_global_default(tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).finish());
let _ = tracing::subscriber::set_global_default(
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish(),
);
}

type K = i64;
Expand Down
2 changes: 1 addition & 1 deletion src/transport/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
};
use rand::{thread_rng, Rng};
use serde::{de::DeserializeOwned, Serialize};
use tracing::{trace, warn};
use std::{
collections::HashMap,
fmt::Debug,
Expand All @@ -14,6 +13,7 @@ use std::{
sync::{Arc, Mutex, RwLock},
time::{Duration, SystemTime},
};
use tracing::{trace, warn};

const LOG: bool = true;

Expand Down

0 comments on commit ba0521b

Please sign in to comment.