From 818da1062b2e0c1e32c055a65fd8f22d916be8d7 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 6 Nov 2024 15:09:02 -0300 Subject: [PATCH] fix: mirror compatibility with old versions (#4239) * fix: mirror compatibility with old versions * fix: workaround to align public api to get version encoding --- .../src/partition/spec.rs | 2 + .../src/topic/spec.rs | 43 +------------------ .../src/spu_api/update_replica.rs | 2 + 3 files changed, 6 insertions(+), 41 deletions(-) diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 929c3cca63..7f5d27f9d2 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -211,6 +211,7 @@ pub struct HomePartitionConfig { feature = "use_serde", serde(default, skip_serializing_if = "crate::is_false") )] + #[fluvio(min_version = 18)] pub source: bool, } @@ -236,6 +237,7 @@ pub struct RemotePartitionConfig { feature = "use_serde", serde(default, skip_serializing_if = "crate::is_false") )] + #[fluvio(min_version = 18)] pub target: bool, } diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 855998402f..ed23143bdb 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -790,7 +790,7 @@ cfg_if::cfg_if! { } } -#[derive(Default, Debug, Clone, Eq, PartialEq)] +#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)] #[cfg_attr( feature = "use_serde", derive(serde::Serialize, serde::Deserialize), @@ -803,49 +803,10 @@ pub struct HomeMirrorInner { feature = "use_serde", serde(skip_serializing_if = "crate::is_false", default) )] + #[fluvio(min_version = 18)] pub source: bool, // source of mirror } -impl Encoder for HomeMirrorInner { - fn write_size(&self, version: i16) -> usize { - if version < 18 { - self.partitions.write_size(version) - } else { - self.partitions.write_size(version) + self.source.write_size(version) - } - } - - fn encode( - &self, - dest: &mut T, - version: fluvio_protocol::Version, - ) -> std::result::Result<(), std::io::Error> - where - T: bytes::BufMut, - { - if version < 18 { - self.partitions.encode(dest, version)?; - } else { - self.partitions.encode(dest, version)?; - self.source.encode(dest, version)?; - } - Ok(()) - } -} - -impl Decoder for HomeMirrorInner { - fn decode(&mut self, src: &mut T, version: i16) -> std::result::Result<(), std::io::Error> - where - T: bytes::Buf, - { - self.partitions.decode(src, version)?; - if version >= 18 { - self.source.decode(src, version)?; - } - Ok(()) - } -} - impl From> for HomeMirrorConfig { fn from(partitions: Vec) -> Self { Self(HomeMirrorInner { diff --git a/crates/fluvio-controlplane/src/spu_api/update_replica.rs b/crates/fluvio-controlplane/src/spu_api/update_replica.rs index dff62d0ae8..6dc6116946 100644 --- a/crates/fluvio-controlplane/src/spu_api/update_replica.rs +++ b/crates/fluvio-controlplane/src/spu_api/update_replica.rs @@ -11,6 +11,8 @@ pub type UpdateReplicaRequest = ControlPlaneRequest; impl Request for UpdateReplicaRequest { const API_KEY: u16 = InternalSpuApi::UpdateReplica as u16; + const DEFAULT_API_VERSION: i16 = 18; // align with pubic api to get version encoding + const MIN_API_VERSION: i16 = 0; type Response = UpdateReplicaResponse; }