Skip to content

Commit

Permalink
fix: mirror compatibility with old versions (#4239)
Browse files Browse the repository at this point in the history
* fix: mirror compatibility with old versions

* fix: workaround to align public api to get version encoding
  • Loading branch information
fraidev authored Nov 6, 2024
1 parent 5401b0a commit 818da10
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 41 deletions.
2 changes: 2 additions & 0 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub struct HomePartitionConfig {
feature = "use_serde",
serde(default, skip_serializing_if = "crate::is_false")
)]
#[fluvio(min_version = 18)]
pub source: bool,
}

Expand All @@ -236,6 +237,7 @@ pub struct RemotePartitionConfig {
feature = "use_serde",
serde(default, skip_serializing_if = "crate::is_false")
)]
#[fluvio(min_version = 18)]
pub target: bool,
}

Expand Down
43 changes: 2 additions & 41 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ cfg_if::cfg_if! {
}
}

#[derive(Default, Debug, Clone, Eq, PartialEq)]
#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
Expand All @@ -803,49 +803,10 @@ pub struct HomeMirrorInner {
feature = "use_serde",
serde(skip_serializing_if = "crate::is_false", default)
)]
#[fluvio(min_version = 18)]
pub source: bool, // source of mirror
}

impl Encoder for HomeMirrorInner {
fn write_size(&self, version: i16) -> usize {
if version < 18 {
self.partitions.write_size(version)
} else {
self.partitions.write_size(version) + self.source.write_size(version)
}
}

fn encode<T>(
&self,
dest: &mut T,
version: fluvio_protocol::Version,
) -> std::result::Result<(), std::io::Error>
where
T: bytes::BufMut,
{
if version < 18 {
self.partitions.encode(dest, version)?;
} else {
self.partitions.encode(dest, version)?;
self.source.encode(dest, version)?;
}
Ok(())
}
}

impl Decoder for HomeMirrorInner {
fn decode<T>(&mut self, src: &mut T, version: i16) -> std::result::Result<(), std::io::Error>
where
T: bytes::Buf,
{
self.partitions.decode(src, version)?;
if version >= 18 {
self.source.decode(src, version)?;
}
Ok(())
}
}

impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
fn from(partitions: Vec<HomePartitionConfig>) -> Self {
Self(HomeMirrorInner {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-controlplane/src/spu_api/update_replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub type UpdateReplicaRequest = ControlPlaneRequest<Replica>;

impl Request for UpdateReplicaRequest {
const API_KEY: u16 = InternalSpuApi::UpdateReplica as u16;
const DEFAULT_API_VERSION: i16 = 18; // align with pubic api to get version encoding
const MIN_API_VERSION: i16 = 0;
type Response = UpdateReplicaResponse;
}

Expand Down

0 comments on commit 818da10

Please sign in to comment.