Skip to content

Commit

Permalink
Remove infinite retry policy from find tail in partition processor
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 13, 2024
1 parent 0cc9816 commit 96f4bfc
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 27 deletions.
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
return Ok(latest_tail);
}
// We might have been sealed by external node and the sequencer is unaware. In this
// case, we run the a check seal task to determine if we suspect that sealing is
// case, we run a check seal task to determine if we suspect that sealing is
// happening.
let result = CheckSealTask::run(
&self.my_params,
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl RequestPump {
let _ = reciprocal.prepare(sequencer_state).try_send();
}

/// Infailable handle_append method
/// Infallible handle_append method
#[instrument(
level="trace",
skip_all,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<T: TransportConnect> SequencerAppender<T> {
Ok(Some(result)) => result,
Ok(None) => break, // no more tasks
Err(elapsed) => {
// if we have already ackowledged this append, it's okay to retire.
// if we have already acknowledged this append, it's okay to retire.
if self.commit_resolver.is_none() {
tracing::debug!(%pending_servers, %wave, ?spread, ?elapsed, responses=?checker, "Some servers didn't store this batch, but append was committed, giving up");
return SequencerAppenderState::Done;
Expand Down
34 changes: 11 additions & 23 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use restate_types::net::partition_processor::{
InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
};
use restate_types::retries::RetryPolicy;
use restate_types::time::MillisSinceEpoch;
use restate_types::{invocation, GenerationalNodeId};
use restate_wal_protocol::control::AnnounceLeader;
Expand Down Expand Up @@ -265,7 +264,7 @@ where
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<()> {
debug!("Starting the partition processor");
info!("Starting the partition processor");
let res = self.run_inner().await;

// Drain control_rx
Expand Down Expand Up @@ -293,25 +292,14 @@ where

self.status.last_applied_log_lsn = Some(last_applied_lsn);

// todo make this configurable
let find_tail_retry_policy = RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
None,
Some(Duration::from_secs(10)),
);

// It can happen that the log is currently unavailable. That's why we need to retry.
// todo if being stuck expose the state to the controller to allow it to make a control decision
let current_tail = find_tail_retry_policy
.retry(|| {
self.bifrost.find_tail(
LogId::from(self.partition_id),
FindTailAttributes::default(),
)
})
.await
.expect("we should be retrying indefinitely");
// propagate errors and let the PPM handle error retries
let current_tail = self
.bifrost
.find_tail(
LogId::from(self.partition_id),
FindTailAttributes::default(),
)
.await?;

debug!(
last_applied_lsn = %last_applied_lsn,
Expand Down Expand Up @@ -379,8 +367,6 @@ where
.is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query))))
});

info!("PartitionProcessor starting up.");

// avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
let mut status_update_timer =
tokio::time::interval(Duration::from_millis(500 + rand::random::<u64>() % 524));
Expand All @@ -399,6 +385,8 @@ where
let mut action_collector = ActionCollector::default();
let mut command_buffer = Vec::with_capacity(self.max_command_batch_size);

info!("PartitionProcessor starting event loop.");

loop {
tokio::select! {
_ = &mut cancellation => break,
Expand Down
2 changes: 1 addition & 1 deletion server/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn replicated_loglet() -> googletest::Result<()> {
3,
);

let regex: Regex = "PartitionProcessor starting up".parse()?;
let regex: Regex = "Starting the partition processor".parse()?;
let mut partition_processors_starting_up: Vec<_> =
(1..=3).map(|idx| nodes[idx].lines(regex.clone())).collect();

Expand Down

0 comments on commit 96f4bfc

Please sign in to comment.