Use tracing crate instead of eprintln.
finnbear committed Jul 14, 2023
1 parent 94f6b5e commit 5dbe08f
Showing 9 changed files with 72 additions and 85 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
@@ -12,7 +12,7 @@ required-features = ["maelstrom"]

[features]
default = ["maelstrom"]
- maelstrom = ["async-maelstrom", "env_logger", "log", "async-trait"]
+ maelstrom = ["async-maelstrom", "async-trait", "tracing-subscriber"]

[dependencies]
bitcode = { version = "0.4.0", features = ["serde"] }
@@ -25,12 +25,13 @@ futures = "0.3.28"
pin-project-lite = "0.2.9"
derive_more = "0.99.17"
async-maelstrom = { version = "0.1.2", optional = true }
- env_logger = { version = "0", optional = true }
- log = { version = "0.4", optional = true }
+ tracing = "0.1.37"
+ tracing-subscriber = { version = "0.3.17", optional = true, features = ["env-filter"]}
async-trait = { version = "0", optional = true }

[dev-dependencies]
pprof = { version = "0.11.1", features = ["flamegraph"] }
+ tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}

[profile.release]
debug-assertions = true
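
The env-filter feature enabled on tracing-subscriber above is what lets the subscriber build its filter from the RUST_LOG environment variable. A minimal sketch of wiring that up, illustrative only (the EnvFilter plumbing is an assumption, not code from this commit):

    // Illustrative sketch, not from this commit: combine the stderr writer
    // used in src/bin/maelstrom.rs with an EnvFilter so verbosity can be
    // tuned at runtime via RUST_LOG (for example RUST_LOG=trace).
    use tracing::{info, trace};
    use tracing_subscriber::EnvFilter;

    fn main() {
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .with_writer(std::io::stderr)
            .init();

        info!("starting");             // emitted when RUST_LOG allows the info level
        trace!("fine-grained detail"); // emitted only when RUST_LOG allows trace
    }
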
27 changes: 13 additions & 14 deletions src/bin/maelstrom.rs
@@ -8,7 +8,7 @@ use async_maelstrom::process::{ProcNet, Process};
use async_maelstrom::runtime::Runtime;
use async_maelstrom::{Id, Status};
use async_trait::async_trait;
- use log::info;
+ use tracing::{info, trace, warn};
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -142,7 +142,7 @@ impl Transport<TapirReplica<K, V>> for Maelstrom {
is_reply_to: None,
do_reply_to: Some(reply),
};
- eprintln!("{id} sending {message:?} to {address}");
+ trace!("{id} sending {message:?} to {address}");
let inner = Arc::clone(&self.inner);
async move {
loop {
@@ -180,7 +180,7 @@ impl Transport<TapirReplica<K, V>> for Maelstrom {
do_reply_to: None,
is_reply_to: None,
};
- //eprintln!("{} do-sending {message:?} to {address}", self.id);
+ trace!("{} do-sending {message:?} to {address}", self.id);
let src = self.id.to_string();
let txq = self.inner.net.txq.clone();
tokio::spawn(async move {
@@ -258,10 +258,9 @@ impl Process<LinKv, Wrapper> for KvNode {

let (transport, inner) = self.inner.as_ref().unwrap();
loop {
- eprintln!("RECEIVING");
match transport.inner.net.rxq.recv().await {
Ok(Msg { src, body, .. }) => {
- eprintln!("received {body:?} from {src}");
+ trace!("received {body:?} from {src}");
let transport = transport.clone();
let inner = inner.clone();
tokio::spawn(async move {
@@ -270,10 +269,7 @@
if let Some(reply) = app.is_reply_to {
let mut requests = transport.inner.requests.lock().unwrap();
if let Some(sender) = requests.remove(&reply) {
- eprintln!("is reply");
let _ = sender.send(app.message);
- } else {
- eprintln!("duplicate reply");
}
} else if let KvNodeInner::Replica(replica) = &inner {
if let Some(response) =
@@ -288,14 +284,14 @@
is_reply_to: app.do_reply_to,
}),
};
- eprintln!("sending response {response:?}");
+ trace!("sending response {response:?}");
let _ =
transport.inner.net.txq.send(response).await.unwrap();
} else {
- eprintln!("NO RESPONSE");
+ trace!("NO RESPONSE");
}
} else {
- eprintln!("(was unsolicited)");
+ trace!("(was unsolicited)");
}
}
Body::Workload(work) => {
@@ -485,7 +481,7 @@ impl Process<LinKv, Wrapper> for KvNode {
}
} else {
// Proxy...
- eprintln!("Proxying...");
+ trace!("Proxying...");
let _ = transport
.inner
.net
@@ -507,7 +503,7 @@
});
}
Err(_) => {
- eprintln!("shutting down recv");
+ warn!("shutting down recv");
return Ok(());
} // Runtime is shutting down.
};
@@ -518,7 +514,10 @@
#[tokio::main]
async fn main() -> Status {
// Log to stderr where Maelstrom will capture it
- env_logger::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .init();

info!("starting");

let process: KvNode = Default::default();
11 changes: 1 addition & 10 deletions src/ir/client.rs
@@ -302,7 +302,6 @@ impl<U: ReplicaUpcalls, T: Transport<U>> Client<U, T> {
continue;
}

- // println!("finalizing to membership: {:?}", sync.membership);
for address in &sync.view.membership {
inner
.transport
@@ -433,10 +432,9 @@ impl<U: ReplicaUpcalls, T: Transport<U>> Client<U, T> {

let membership_size = sync.view.membership.size();
let finalized = get_finalized(&results);
- //println!("checking quorum: {}", finalized.is_some());

if finalized.is_none() && let Some((_, result)) = get_quorum(membership_size, &results, true) {
// Fast path.
- // eprintln!("doing fast path");
for address in &sync.view.membership {
inner.transport.do_send(
address,
@@ -466,12 +464,6 @@ impl<U: ReplicaUpcalls, T: Transport<U>> Client<U, T> {
})
{
// Slow path.
- /*
- eprintln!(
- "doing SLOW path finalized={}",
- reply_consensus_view.is_none()
- );
- */
let future = join(sync.view.membership.iter().map(|address| {
(
address,
@@ -487,7 +479,6 @@ impl<U: ReplicaUpcalls, T: Transport<U>> Client<U, T> {

Some((result, reply_consensus_view, future))
} else {
- // println!("no quorum");
None
}
};
54 changes: 20 additions & 34 deletions src/ir/replica.rs
@@ -13,6 +13,7 @@ use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
+ use tracing::{warn, info, trace, trace_span};

#[derive(Debug)]
pub enum Status {
@@ -208,7 +209,7 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
.retain(|a, _| sync.view.membership.contains(*a));

if sync.changed_view_recently {
- eprintln!("{:?} skipping view change", inner.transport.address());
+ trace!("{:?} skipping view change", inner.transport.address());
sync.changed_view_recently = false;
}
/* else if sync
@@ -227,7 +228,7 @@
}
sync.view.number.0 += 1;

- eprintln!(
+ info!(
"{:?} timeout sending do view change {}",
inner.transport.address(),
sync.view.number.0
@@ -288,6 +289,8 @@
}

pub fn receive(&self, address: T::Address, message: Message<U, T>) -> Option<Message<U, T>> {
+ let _span = trace_span!("recv", address = ?self.address()).entered();

let mut sync = self.inner.sync.lock().unwrap();
let sync = &mut *sync;

@@ -303,14 +306,12 @@
result,
view: sync.view.clone(),
}));
- } else {
- eprintln!("{:?} abnormal", self.inner.transport.address());
}
}
Message::<U, T>::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
- eprintln!("ancient relative to {:?}", sync.view.number);
+ warn!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U, T>::ReplyInconsistent(ReplyInconsistent {
op_id,
view: sync.view.clone(),
@@ -341,7 +342,7 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
Message::<U, T>::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
- eprintln!("ancient relative to {:?}", sync.view.number);
+ warn!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U, T>::ReplyConsensus(ReplyConsensus {
op_id,
view: sync.view.clone(),
@@ -369,8 +370,6 @@
view: sync.view.clone(),
result_state: Some((result, state)),
}));
- } else {
- eprintln!("{:?} abnormal", self.inner.transport.address());
}
}
Message::<U, T>::FinalizeInconsistent(FinalizeInconsistent { op_id }) => {
@@ -390,7 +389,7 @@
sync.upcalls.finalize_consensus(&entry.op, &entry.result);
} else if cfg!(debug_assertions) && entry.result != result {
// For diagnostic purposes.
- eprintln!("warning: tried to finalize consensus with {result:?} when {:?} was already finalized", entry.result);
+ warn!("tried to finalize consensus with {result:?} when {:?} was already finalized", entry.result);
}

// Send `Confirm` regardless; the view number gives the
@@ -403,7 +402,6 @@
}
}
Message::<U, T>::DoViewChange(msg) => {
- //eprintln!("{:?} receiving dvt {:?} and have {:?} / {:?}", self.inner.transport.address(), msg.view.number, sync.view.number, sync.status);
if msg.view.number > sync.view.number
|| (msg.view.number == sync.view.number && sync.status.is_view_changing())
{
@@ -419,14 +417,6 @@
);
}

- /*
- eprintln!(
- "index = {:?} , leader index = {:?}",
- self.index,
- sync.view.leader_index()
- );
- */

if self.inner.transport.address() == sync.view.leader() && msg.addendum.is_some() {
debug_assert!(!msg.from_client);
let msg_view_number = msg.view.number;
@@ -454,7 +444,7 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
);

if matching.clone().count() >= sync.latest_normal_view.membership.size().f_plus_one() {
- eprintln!("{:?} DOING VIEW CHANGE", self.inner.transport.address());
+ info!("changing to {:?}", msg_view_number);
{
let latest_normal_view =
matching
@@ -475,8 +465,8 @@
.map(|(_, r)| r.addendum.as_ref().unwrap().record.clone())
.collect::<Vec<_>>();

- eprintln!(
- "have {} latest ({:?})",
+ trace!(
+ "have {} latest records ({:?})",
latest_records.len(),
sync
.outstanding_do_view_changes
@@ -570,17 +560,14 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
}

if let Some(majority_result_in_d) = majority_result_in_d {
- eprintln!("merge majority replied {:?} to {op_id:?}", majority_result_in_d);
+ trace!("merge majority replied {:?} to {op_id:?}", majority_result_in_d);
d.insert(op_id, (entries[0].op.clone(), majority_result_in_d));
} else {
- eprintln!("merge no majority for {op_id:?}; deciding among {:?}", entries.iter().map(|entry| (entry.result.clone(), entry.state)).collect::<Vec<_>>());
+ trace!("merge no majority for {op_id:?}; deciding among {:?}", entries.iter().map(|entry| (entry.result.clone(), entry.state)).collect::<Vec<_>>());
u.extend(entries.into_iter().map(|e| (op_id, e.op, e.result)));
}
}

- // println!("d = {d:?}");
- // println!("u = {u:?}");

{
let sync = &mut *sync;
sync.upcalls.sync(&sync.record, &R);
@@ -642,8 +629,8 @@
);
}
}
- } else if let Some(leader_record) = sync.leader_record.as_ref() && leader_record.view.number >= msg.view.number {
- println!("{:?} sending leader record to help catch up {address:?}", self.address());
+ } else if !msg.from_client && let Some(leader_record) = sync.leader_record.as_ref() && leader_record.view.number >= msg.view.number {
+ warn!("{:?} sending leader record to help catch up {address:?}", self.address());
self.inner.transport.do_send(address, leader_record.clone());
}
}
@@ -654,7 +641,7 @@
if view.number > sync.view.number
|| (view.number == sync.view.number && !sync.status.is_normal())
{
- eprintln!("{:?} starting view {:?} (was {:?} in {:?})", self.inner.transport.address(), view.number, sync.status, sync.view.number);
+ info!("starting view {:?} (was {:?} in {:?})", view.number, sync.status, sync.view.number);
sync.upcalls.sync(&sync.record, &new_record);
sync.record = new_record.clone();
sync.status = Status::Normal;
@@ -666,13 +653,12 @@
}
}
Message::<U, T>::AddMember(AddMember{address}) => {
- eprintln!("{:?} recv add member {address:?}", self.inner.transport.address());
if sync.status.is_normal() && sync.view.membership.get_index(address).is_none() {
if !sync.view.membership.contains(self.inner.transport.address()) {
// TODO: Expand coverage.
return None;
}
- eprintln!("{:?} acting on add member {address:?}", self.inner.transport.address());
+ info!("adding member {address:?}");

sync.status = Status::ViewChanging;
sync.view.number.0 += 3;
@@ -691,12 +677,11 @@
}
}
Message::<U, T>::RemoveMember(RemoveMember{address}) => {
- eprintln!("{:?} recv remove member {address:?}", self.inner.transport.address());
if sync.status.is_normal() && sync.view.membership.get_index(address).is_some() && sync.view.membership.len() > 1 && address != self.inner.transport.address() {
if !sync.view.membership.contains(self.inner.transport.address()) {
return None;
}
- eprintln!("{:?} acting on remove member {address:?}", self.inner.transport.address());
+ info!("removing member {address:?}");
sync.status = Status::ViewChanging;
sync.view.number.0 += 3;

@@ -714,7 +699,8 @@
}
}
_ => {
- eprintln!("unexpected message");
+ debug_assert!(false);
+ warn!("unexpected message");
}
}
None
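
For context on the trace_span!("recv", address = ?self.address()).entered() line added above: the guard returned by .entered() keeps the span current until it is dropped, so every trace!/info!/warn! event emitted while the message is handled is recorded inside the "recv" span and automatically carries the replica's address. A standalone sketch (the handle function and its arguments are hypothetical, for illustration only):

    use tracing::{trace, trace_span};

    // Hypothetical handler, used only to illustrate span scoping.
    fn handle(address: u64, payload: &str) {
        // The guard returned by .entered() keeps the "recv" span current
        // until it is dropped at the end of this scope.
        let _span = trace_span!("recv", address = ?address).entered();

        trace!(payload = %payload, "handling message"); // recorded inside "recv"
    } // _span dropped here; the span is exited
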
7 changes: 3 additions & 4 deletions src/occ/store.rs
@@ -5,6 +5,7 @@ use crate::{
MvccStore,
};
use serde::{Deserialize, Serialize};
+ use tracing::trace;
use std::{
borrow::Borrow,
collections::{hash_map::Entry, BTreeMap, HashMap},
@@ -168,8 +169,6 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
}
}

- // println!("pr = {prepared_reads:?}, pw = {prepared_writes:?}");

let result = self.occ_check(&transaction, commit);

// Avoid logical mutation in dry run.
@@ -304,7 +303,7 @@
commit: TS,
finalized: bool,
) {
- eprintln!("preparing {id:?} at {commit:?} (fin = {finalized})");
+ trace!("preparing {id:?} at {commit:?} (fin = {finalized})");
match self.prepared.entry(id) {
Entry::Vacant(vacant) => {
vacant.insert((commit, transaction.clone(), finalized));
Expand Down Expand Up @@ -341,7 +340,7 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {

pub fn remove_prepared(&mut self, id: TransactionId) -> bool {
if let Some((commit, transaction, finalized)) = self.prepared.remove(&id) {
- eprintln!("removing prepared {id:?} at {commit:?} (fin = {finalized})");
+ trace!("removing prepared {id:?} at {commit:?} (fin = {finalized})");
self.remove_prepared_inner(transaction, commit);
true
} else {
(diffs for the remaining 4 changed files are not shown)
