Skip to content

Commit

Permalink
rebase: cargo clippy and fmt fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
digikata authored and fraidev committed Mar 21, 2024
1 parent d535448 commit df5a3a2
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 81 deletions.
2 changes: 0 additions & 2 deletions crates/fluvio-cluster/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ use spu::SpuCmd;
use start::StartOpt;
use status::StatusOpt;


pub use self::error::ClusterCliError;


use anyhow::Result;

use fluvio_extension_common as common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl ExportOpt {
let partition = mirror_config
.partitions()
.iter()
.position(|rc| &rc.remote_cluster == &self.mirror);
.position(|rc| rc.remote_cluster == self.mirror);

if partition.is_none() {
return Err(anyhow::anyhow!(
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane/src/remote_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ mod metadata {
fn convert_status_from_k8(
status: Self::Status,
) -> <Self::K8Spec as fluvio_stream_model::k8_types::Spec>::Status {
status.into()
status
}

fn into_k8(self) -> Self::K8Spec {
self.into()
self
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane/src/upstream_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ mod metadata {
fn convert_status_from_k8(
status: Self::Status,
) -> <Self::K8Spec as fluvio_stream_model::k8_types::Spec>::Status {
status.into()
status
}

fn into_k8(self) -> Self::K8Spec {
self.into()
self
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub(crate) async fn update_replica_map_for_assigned_topic<C: MetadataItem>(
}
}


/// values for next state
#[derive(Default, Debug)]
pub(crate) struct TopicNextState<C: MetadataItem> {
Expand Down Expand Up @@ -255,7 +254,6 @@ impl<C: MetadataItem> TopicNextState<C> {
validate_mirror_topic_parameter(mirror_config)
}
TopicResolution::Pending | TopicResolution::InsufficientResources => {

let partitions = mirror_config.partition_count();
// create pseudo normal replica map
let computed_param = TopicReplicaParam {
Expand All @@ -264,7 +262,9 @@ impl<C: MetadataItem> TopicNextState<C> {
..Default::default()
};

let replica_map = scheduler.generate_replica_map_for_topic(&computed_param).await;
let replica_map = scheduler
.generate_replica_map_for_topic(&computed_param)
.await;
if replica_map.scheduled() {
debug!(
topic = %topic.key(),
Expand Down
59 changes: 31 additions & 28 deletions crates/fluvio-spu/src/control_plane/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,38 +117,41 @@ impl ScDispatcher<FileReplica> {
async fn dispatch_loop(mut self) {
let mut backoff = create_backoff();

loop {
debug!(loop_count = self.metrics.get_loop_count(), "starting loop");

let mut socket = self.create_socket_to_sc(&mut backoff).await;
info!(
local_spu_id=%self.ctx.local_spu_id(),
"established connection to sc for spu",
);
// clippy: this loop never loops due to final match break/break
// loop {
debug!(loop_count = self.metrics.get_loop_count(), "starting loop");

let mut socket = self.create_socket_to_sc(&mut backoff).await;
info!(
local_spu_id=%self.ctx.local_spu_id(),
"established connection to sc for spu",
);

// register and exit on error
let _ = match self.send_spu_registration(&mut socket).await {
Ok(status) => status,
Err(err) => {
print_cli_err!(format!(
"spu registration failed with sc due to error: {err}"
));
break;
}
};
// register and exit on error
let _ = match self.send_spu_registration(&mut socket).await {
Ok(status) => status,
Err(err) => {
print_cli_err!(format!(
"spu registration failed with sc due to error: {err}"
));
warn!("spu registration failed with sc due to error: {err}");
return;
// break;
}
};

// continuously process updates from and send back status to SC
info!("starting sc request loop");
match self.request_loop(socket).await {
Ok(_) => {
break;
}
Err(err) => {
warn!(?err, "error connecting to sc, waiting before reconnecting",);
break;
}
// continuously process updates from and send back status to SC
info!("starting sc request loop");
match self.request_loop(socket).await {
Ok(_) => {
// break;
}
Err(err) => {
warn!(?err, "error connecting to sc, waiting before reconnecting",);
// break;
}
}
// }
}

#[instrument(
Expand Down
53 changes: 29 additions & 24 deletions crates/fluvio-spu/src/mirroring/source/controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
fmt,
time::Duration,
sync::{
Arc,
atomic::{AtomicU64, Ordering, AtomicI64},
},
time::Duration,
};

use futures_util::StreamExt;
Expand Down Expand Up @@ -322,26 +322,31 @@ where
debug!(new_target_leo, "updating target leo from uninitialized");
self.state.metrics.update_target_leo(new_target_leo);
}
if new_target_leo > leader_leo {
// target leo should never be greater than leader's leo
warn!(
leader_leo,
new_target_leo, "target has more records, this should not happen, this is error"
);
return Err(anyhow!("target's leo: {new_target_leo} > leader's leo: {leader_leo} this should not happen, this is error"));
} else if new_target_leo < leader_leo {
debug!(
new_target_leo,
leader_leo, "target has less records, need to refresh target"
);
self.state.metrics.update_target_leo(new_target_leo);
Ok(true)
} else {
debug!(
new_target_leo,
"target has same records, no need to refresh target"
);
Ok(false)
match new_target_leo.cmp(&leader_leo) {
std::cmp::Ordering::Greater => {
// target leo should never be greater than leader's leo
warn!(
leader_leo,
new_target_leo,
"target has more records, this should not happen, this is error"
);
return Err(anyhow!("target's leo: {new_target_leo} > leader's leo: {leader_leo} this should not happen, this is error"));
}
std::cmp::Ordering::Less => {
debug!(
new_target_leo,
leader_leo, "target has less records, need to refresh target"
);
self.state.metrics.update_target_leo(new_target_leo);
Ok(true)
}
std::cmp::Ordering::Equal => {
debug!(
new_target_leo,
"target has same records, no need to refresh target"
);
Ok(false)
}
}
}

Expand Down Expand Up @@ -431,7 +436,7 @@ where
backoff: &mut ExponentialBackoff,
upstream_cluster: &UpstreamClusterSpec,
) -> (FluvioSocket, bool) {
let tlspolicy = option_tlspolicy(&upstream_cluster);
let tlspolicy = option_tlspolicy(upstream_cluster);

loop {
self.state.metrics.increase_conn_count();
Expand All @@ -447,7 +452,7 @@ where
match DomainConnector::try_from(tlspolicy.clone()) {
Ok(connector) => {
// box connector?
FluvioSocket::connect_with_connector(&endpoint, &(*connector)).await
FluvioSocket::connect_with_connector(endpoint, &(*connector)).await
}
Err(err) => {
error!(
Expand All @@ -460,7 +465,7 @@ where
}
// FluvioSocket::connect(&endpoint)
} else {
FluvioSocket::connect(&endpoint).await
FluvioSocket::connect(endpoint).await
};
match res {
Ok(socket) => {
Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-spu/src/mirroring/targt/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ impl MirrorTargetHandler {
};

debug!("sending offset info: {:#?}", offset_request);
let req_msg =
RequestMessage::new_request(offset_request).set_client_id(format!("mirror target"));
let req_msg = RequestMessage::new_request(offset_request).set_client_id("mirror target");

sink.send_request(&req_msg).await?;

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-spu/src/mirroring/test/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl ReplicaConfig {

replica.mirror = Some(PartitionMirrorConfig::Target(TargetPartitionConfig {
remote_cluster: remote_cluster_name.to_string(),
source_replica: ReplicaKey::new(self.source_topic.clone(), 0 as u32).to_string(),
source_replica: ReplicaKey::new(self.source_topic.clone(), 0u32).to_string(),
}));

replica
Expand Down Expand Up @@ -225,7 +225,7 @@ impl ReplicaConfig {
let mut remote_clusters = vec![];

for (partition_id, remote_cluster) in self.remote_clusters.iter().enumerate() {
let replica = self.target_replica(&remote_cluster, partition_id as PartitionId);
let replica = self.target_replica(remote_cluster, partition_id as PartitionId);

let remote_cluster = RemoteCluster {
name: remote_cluster.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-spu/src/mirroring/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn test_mirroring_new_records() {
let target_gctx = target_builder.init_mirror_target().await;
let target_replica0 = target_gctx
.leaders_state()
.get(&ReplicaKey::new("temp", 0 as u32))
.get(&ReplicaKey::new("temp", 0u32))
.await
.expect("leader");
assert_eq!(
Expand Down Expand Up @@ -58,7 +58,7 @@ async fn test_mirroring_new_records() {
// check 2nd target replica
let target_replica1 = target_gctx
.leaders_state()
.get(&ReplicaKey::new("temp", 1 as u32))
.get(&ReplicaKey::new("temp", 1u32))
.await
.expect("2nd targert");

Expand Down
15 changes: 5 additions & 10 deletions crates/fluvio-spu/src/replication/leader/leaders_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,11 @@ where
let read = self.read().await;
for (_replica_key, state) in read.iter() {
let replica_config = state.get_replica();
if let Some(mirror) = &replica_config.mirror {
match mirror {
PartitionMirrorConfig::Target(target) => {
if target.remote_cluster == remote_cluster
&& target.source_replica == source_replica
{
return Some(state.clone());
}
}
_ => {}
if let Some(PartitionMirrorConfig::Target(target)) = &replica_config.mirror {
if target.remote_cluster == remote_cluster
&& target.source_replica == source_replica
{
return Some(state.clone());
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-spu/src/replication/leader/replica_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ where
if let Some(dedup) = &state.replica.deduplication {
debug!(?state.replica.deduplication, "init leader smartmodule context");
let dedup_filter = dedup_to_invocation(dedup);
let mut sm_ctx = SmartModuleContext::try_from(vec![dedup_filter], COMMON_VERSION, &ctx)
let mut sm_ctx = SmartModuleContext::try_from(vec![dedup_filter], COMMON_VERSION, ctx)
.await?
.ok_or_else(|| anyhow::anyhow!("SmartModule context is required here"))?;
sm_ctx
Expand Down Expand Up @@ -517,13 +517,13 @@ where
///
/// // case 2: follower offset is same as previous
/// // leader: leo: 2, hw: 2, follower: leo: 1, hw: 1
/// // Input: leo: 1, hw:1,
/// // Input: leo: 1, hw:1,
/// // Expect, no status but follower sync
/// //
/// // case 3: different follower offset
/// // leader: leo: 3, hw: 3, follower: leo: 1, hw: 1
/// // Input: leo: 2, hw: 2,
/// // Expect, status change, follower sync
/// // Expect, status change, follower sync
///
/// Simple HW mark calculation (assume LRS = 2) which is find minimum offset values that satisfy
/// Assume: Leader leo = 10, hw = 2,
Expand Down

0 comments on commit df5a3a2

Please sign in to comment.