Skip to content

Commit

Permalink
feat: reverse home to edge mirroring (#4231)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Nov 4, 2024
1 parent cdb0fdf commit 2511b94
Show file tree
Hide file tree
Showing 51 changed files with 1,380 additions and 190 deletions.
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,24 +166,24 @@ k8-diff = { version = "0.1.2" }
trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" }

# Internal fluvio dependencies
fluvio = { version = "0.23.3", path = "crates/fluvio" }
fluvio = { version = "0.24.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-channel = { path = "crates/fluvio-channel" }
fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.3.2", path = "crates/fluvio-compression", default-features = false }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.29.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-controlplane-metadata = { version = "0.30.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-package-index = { version = "0.7.6", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.11.0", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.24.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-protocol = { version = "0.12.0", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.25.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-smartengine = { version = "0.8.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.7.4", path = "crates/fluvio-smartmodule", default-features = false }
fluvio-smartmodule = { version = "0.8.0", path = "crates/fluvio-smartmodule", default-features = false }
fluvio-socket = { version = "0.14.9", path = "crates/fluvio-socket", default-features = false }
fluvio-spu-schema = { version = "0.16.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-spu-schema = { version = "0.17.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-storage = { path = "crates/fluvio-storage" }
fluvio-stream-dispatcher = { version = "0.13.2", path = "crates/fluvio-stream-dispatcher" }
fluvio-stream-model = { version = "0.11.3", path = "crates/fluvio-stream-model", default-features = false }
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-cli/src/client/topic/add_mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct AddMirrorOpt {
topic: String,
/// Remote cluster to add
remote: String,
/// if set, it will mirror from home to remote
#[arg(long)]
home_to_remote: bool,
}

impl AddMirrorOpt {
Expand All @@ -24,6 +27,7 @@ impl AddMirrorOpt {

let request = AddMirror {
remote_cluster: self.remote.clone(),
home_to_mirror: self.home_to_remote,
};

let action = UpdateTopicAction::AddMirror(request);
Expand Down
19 changes: 15 additions & 4 deletions crates/fluvio-cli/src/client/topic/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ pub struct CreateTopicOpt {
/// or inside the topic configuration file in YAML format.
#[arg(short, long, value_name = "PATH", conflicts_with = "config-arg")]
config: Option<PathBuf>,

/// signify that this topic can be mirror from home to edge
#[arg(long)]
home_to_remote: bool,
}

impl CreateTopicOpt {
Expand Down Expand Up @@ -163,7 +167,10 @@ impl CreateTopicOpt {
&topic_name,
)?)
} else if let Some(mirror_assign_file) = &self.mirror_apply {
let config = MirrorConfig::read_from_json_file(mirror_assign_file, &topic_name)?;
let mut config = MirrorConfig::read_from_json_file(mirror_assign_file, &topic_name)?;

config.set_home_to_remote(self.home_to_remote)?;

let targets = match config {
MirrorConfig::Home(ref c) => c
.partitions()
Expand Down Expand Up @@ -199,7 +206,9 @@ impl CreateTopicOpt {

ReplicaSpec::Mirror(config)
} else if self.mirror {
let mirror_map = MirrorConfig::Home(HomeMirrorConfig::from(vec![]));
let mut home_mirror = HomeMirrorConfig::from(vec![]);
home_mirror.source = self.home_to_remote;
let mirror_map = MirrorConfig::Home(home_mirror);
ReplicaSpec::Mirror(mirror_map)
} else {
ReplicaSpec::Computed(TopicReplicaParam {
Expand Down Expand Up @@ -349,14 +358,16 @@ mod load {
partitions[0],
HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
}
);
assert_eq!(
partitions[1],
HomePartitionConfig {
remote_cluster: "boat2".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
}
);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.29.1"
version = "0.30.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand All @@ -17,6 +17,7 @@ use_serde = ["serde","semver/serde", "bytesize/serde", "humantime-serde", "serde
k8 = ["use_serde", "fluvio-stream-model/k8"]

[dependencies]
cfg-if = { workspace = true }
thiserror = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/fluvio-controlplane-metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub mod store {
pub use fluvio_stream_model::store::*;
}

#[cfg(feature = "use_serde")]
pub(crate) fn is_false(b: &bool) -> bool {
!b
}

#[cfg(feature = "k8")]
pub use fluvio_stream_model::k8_types;

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/src/mirror/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod test_v1_spec {
type K8RemoteSpec = K8Obj<MirrorSpec>;

#[test]
fn read_k8_mirror_json() {
fn read_k8_mirror_json_v1() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_mirror_v1.json").expect("spec"));
let cluster: K8RemoteSpec = serde_json::from_reader(reader).expect("failed to parse topic");
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-controlplane-metadata/src/mirror/status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::time::Duration;
use fluvio_protocol::{Encoder, Decoder};

#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -101,7 +100,7 @@ pub struct ConnectionStat {

impl MirrorStatus {
#[cfg(feature = "use_serde")]
pub fn last_seen(&self, since: Duration) -> String {
pub fn last_seen(&self, since: std::time::Duration) -> String {
use humantime_serde::re::humantime;

let since_sec = since.as_secs();
Expand Down Expand Up @@ -153,6 +152,8 @@ impl std::fmt::Display for ConnectionStatus {

#[cfg(test)]
mod test {
use std::time::Duration;

use super::*;

#[test]
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-controlplane-metadata/src/partition/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ mod test_spec {
mirror,
PartitionMirrorConfig::Home(HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
})
);
}
Expand Down
57 changes: 51 additions & 6 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
#![allow(clippy::assign_op_pattern)]

//!
//! # Partition Spec
//!
//!
use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::{link::ErrorCode, Decoder, Encoder};

use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};

Expand Down Expand Up @@ -82,7 +78,24 @@ impl PartitionSpec {

pub fn mirror_string(&self) -> String {
if let Some(mirror) = &self.mirror {
mirror.external_cluster()
let external = mirror.external_cluster();
match mirror {
PartitionMirrorConfig::Remote(remote) => {
if remote.target {
format!("{}(to-home)", external)
} else {
format!("{}(from-home)", external)
}
}

PartitionMirrorConfig::Home(home) => {
if home.source {
format!("{}(from-remote)", external)
} else {
format!("{}(to-remote)", external)
}
}
}
} else {
"".to_owned()
}
Expand Down Expand Up @@ -149,9 +162,30 @@ impl PartitionMirrorConfig {
}
}

#[deprecated(since = "0.29.1")]
pub fn is_home_mirror(&self) -> bool {
matches!(self, Self::Home(_))
}

/// check whether this mirror should accept traffic
pub fn accept_traffic(&self) -> Option<ErrorCode> {
match self {
Self::Remote(r) => {
if r.target {
Some(ErrorCode::MirrorProduceFromRemoteNotAllowed)
} else {
None
}
}
Self::Home(h) => {
if h.source {
None
} else {
Some(ErrorCode::MirrorProduceFromHome)
}
}
}
}
}

impl std::fmt::Display for PartitionMirrorConfig {
Expand All @@ -172,6 +206,12 @@ impl std::fmt::Display for PartitionMirrorConfig {
pub struct HomePartitionConfig {
pub remote_cluster: String,
pub remote_replica: String,
// if this is set, home will be mirror instead of
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "crate::is_false")
)]
pub source: bool,
}

impl std::fmt::Display for HomePartitionConfig {
Expand All @@ -192,6 +232,11 @@ pub struct RemotePartitionConfig {
#[cfg_attr(feature = "use_serde", serde(default))]
pub home_spu_id: SpuId,
pub home_spu_endpoint: String,
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "crate::is_false")
)]
pub target: bool,
}

impl std::fmt::Display for RemotePartitionConfig {
Expand Down
34 changes: 31 additions & 3 deletions crates/fluvio-controlplane-metadata/src/topic/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,33 @@ mod test_spec {
}

#[test]
fn read_k8_topic_partition_mirror_json() {
fn read_k8_topic_partition_mirror_json_v1() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_topic_mirror_down_v1.json").expect("spec"));
let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic");
assert_eq!(topic.metadata.name, "downstream-topic");
assert_eq!(
topic.spec.replicas().to_owned(),
ReplicaSpec::Mirror(MirrorConfig::Home(
vec![
HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string(),
..Default::default()
},
HomePartitionConfig {
remote_cluster: "boat2".to_string(),
remote_replica: "boats-0".to_string(),
..Default::default()
}
]
.into()
))
);
}

#[test]
fn read_k8_topic_partition_mirror_json_v2() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_topic_mirror_down_v2.json").expect("spec"));
let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic");
Expand All @@ -61,11 +87,13 @@ mod test_spec {
vec![
HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
},
HomePartitionConfig {
remote_cluster: "boat2".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
}
]
.into()
Expand Down
Loading

0 comments on commit 2511b94

Please sign in to comment.