
Commit

WIP: pop this & describe changes
thedodd committed Oct 24, 2021
1 parent a3f6801 commit 8c2f0b7
Showing 6 changed files with 129 additions and 41 deletions.
7 changes: 5 additions & 2 deletions hadron-operator/src/k8s/scheduler.rs
@@ -379,7 +379,7 @@ impl Controller {
match self.streams.get(name.as_ref()) {
None => (),
Some(stream) => {
let mut new_sts = self.build_stream_statefulset(&stream);
let mut new_sts = self.build_stream_statefulset(stream);
new_sts = match self.create_statefulset(new_sts).await {
Ok(new_sts) => new_sts,
Err(err) => {
@@ -486,6 +486,8 @@ impl Controller {
let selector = spec.selector.get_or_insert_with(Default::default);
set_cannonical_labels(selector);
selector.insert(LABEL_HADRON_RS_STREAM.into(), stream.name().into());
spec.cluster_ip = Some("None".into());
spec.type_ = Some("ClusterIP".into());
spec.ports = Some(vec![
ServicePort {
name: Some("client-port".into()),
@@ -608,6 +610,7 @@ impl Controller {
type_: Some("RollingUpdate".into()),
rolling_update: None,
});
spec.service_name = stream.name().into();
spec.replicas = Some(stream.spec.partitions as i32);
spec.selector = LabelSelector {
match_labels: Some(labels.clone()),
@@ -826,7 +829,7 @@ impl Controller {
}
let api: Api<Secret> = Api::namespaced(self.client.clone(), &self.config.namespace);
let params = kube::api::PostParams::default();
timeout(API_TIMEOUT, api.create(&params, &secret))
timeout(API_TIMEOUT, api.create(&params, secret))
.await
.context("timeout while creating secret")?
.context("error creating secret")
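For context on the scheduler changes above, here is a minimal, hypothetical sketch (illustrative helper name and port value, using the k8s-openapi types already seen in this diff) of the Service shape the new fields produce: a headless ClusterIP Service (clusterIP: None). Pointing the StatefulSet at it via spec.service_name gives each stream partition pod a stable per-pod DNS name.

use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

/// Hypothetical illustration only: a headless ClusterIP Service for a stream's StatefulSet.
fn headless_service_sketch(stream_name: &str, namespace: &str) -> Service {
    Service {
        metadata: ObjectMeta {
            name: Some(stream_name.to_string()),
            namespace: Some(namespace.to_string()),
            ..ObjectMeta::default()
        },
        spec: Some(ServiceSpec {
            // "None" marks the Service as headless: no virtual IP, DNS resolves to the pods.
            cluster_ip: Some("None".into()),
            type_: Some("ClusterIP".into()),
            ports: Some(vec![ServicePort {
                name: Some("client-port".into()),
                port: 7000, // hypothetical port; the real value comes from the controller's config
                ..ServicePort::default()
            }]),
            ..ServiceSpec::default()
        }),
        ..Service::default()
    }
}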
12 changes: 0 additions & 12 deletions hadron-stream/src/database.rs
@@ -17,8 +17,6 @@ pub const DEFAULT_DATA_PATH: &str = "/usr/local/hadron/db";
const TREE_STREAM: &str = "stream";
/// The DB tree prefix used for pipelines.
const TREE_PIPELINE_PREFIX: &str = "pipelines";
/// The DB tree prefix used for pipeline metadata.
const TREE_PIPELINE_METADATA: &str = "pipelines_metadata";

/// The default path to use for data storage.
pub fn default_data_path() -> String {
@@ -87,14 +85,4 @@ impl Database {
.and_then(|res| res.map_err(|err| ShutdownError(anyhow!("could not open DB tree {} {}", &name, err))))?;
Ok(tree)
}

/// Get a handle to the DB tree for a pipeline partition's metadata.
pub async fn get_pipeline_tree_metadata(&self, name: &str) -> ShutdownResult<Tree> {
let name = format!("{}/{}", TREE_PIPELINE_METADATA, name);
let (db, ivname) = (self.inner.db.clone(), IVec::from(name.as_str()));
let tree = Self::spawn_blocking(move || -> Result<Tree> { Ok(db.open_tree(ivname)?) })
.await
.and_then(|res| res.map_err(|err| ShutdownError(anyhow!("could not open DB tree {} {}", &name, err))))?;
Ok(tree)
}
}
2 changes: 2 additions & 0 deletions hadron-stream/src/main.rs
@@ -11,6 +11,8 @@ mod pipeline;
mod server;
mod stream;
mod utils;
#[cfg(test)]
mod utils_test;
mod watchers;

use std::io::Write;
36 changes: 18 additions & 18 deletions hadron-stream/src/pipeline/mod.rs
@@ -31,7 +31,7 @@ const DEFAULT_MAX_PARALLEL: u32 = 50;
///
/// NOTE: this does not necessarily indicate that the pipeline for this key has actually been
/// executed, but only that it has been prepared for execution.
const KEY_LAST_OFFSET_PROCESSED: &str = "/meta/last_offset_processed";
const KEY_LAST_OFFSET_PROCESSED: &[u8; 1] = b"l";
/// A metadata key prefix used for tracking active pipeline instances.
///
/// Active instances are keyed as `a{offset}` where `{offset}` is the offset
@@ -53,8 +53,6 @@ pub struct PipelineCtl {
_db: Database,
/// The database tree for storing this pipeline's instance records.
tree: Tree,
/// The database tree for this pipeline's metadata storage.
tree_metadata: Tree,
/// The database tree of this pipeline's source stream; which is only ever used for reading.
tree_stream: Tree,
/// The data model of the pipeline with which this controller is associated.
@@ -97,17 +95,15 @@ impl PipelineCtl {
shutdown_tx: broadcast::Sender<()>, events_tx: mpsc::Sender<PipelineCtlMsg>, events_rx: mpsc::Receiver<PipelineCtlMsg>,
) -> Result<Self> {
let tree = db.get_pipeline_tree(pipeline.name()).await?;
let tree_metadata = db.get_pipeline_tree_metadata(pipeline.name()).await?;
let stream_tree = db.get_stream_tree().await?;
let stream_offset = *stream_signal.borrow();
let (last_offset_processed, active_pipelines) =
recover_pipeline_state(&tree, &tree_metadata, &stream_tree, pipeline.clone(), stream_offset).await?;
recover_pipeline_state(tree.clone(), stream_tree.clone(), pipeline.clone(), stream_offset).await?;

Ok(Self {
config,
_db: db,
tree,
tree_metadata,
tree_stream: stream_tree,
pipeline,
partition,
@@ -564,24 +560,27 @@ impl PipelineCtl {
self.is_fetching_stream_data = true;
tokio::spawn(Self::try_fetch_stream_data(
self.tree_stream.clone(),
self.tree_metadata.clone(),
self.tree.clone(),
self.last_offset_processed,
self.pipeline.clone(),
self.max_parallel(),
self.events_tx.clone(),
));
}

#[tracing::instrument(level = "trace", skip(tree_stream, tree_metadata, last_offset_processed, pipeline, max_parallel, tx))]
#[tracing::instrument(level = "trace", skip(tree_stream, tree_pipeline, last_offset_processed, pipeline, max_parallel, tx))]
async fn try_fetch_stream_data(
tree_stream: Tree, tree_metadata: Tree, last_offset_processed: u64, pipeline: Arc<Pipeline>, max_parallel: u32,
tree_stream: Tree, tree_pipeline: Tree, last_offset_processed: u64, pipeline: Arc<Pipeline>, max_parallel: u32,
tx: mpsc::Sender<PipelineCtlMsg>,
) {
tracing::debug!("fetching stream data for pipeline");
let data_res = Database::spawn_blocking(move || -> Result<FetchStreamRecords> {
// Iterate over the records of the stream up to the maximum parallel allowed.
let (mut metadata_batch, mut new_instances) = (sled::Batch::default(), Vec::with_capacity(max_parallel as usize));
let (mut pipeline_batch, mut new_instances) = (sled::Batch::default(), Vec::with_capacity(max_parallel as usize));
let (start, mut last_processed, mut count) = (&utils::encode_u64(last_offset_processed + 1), last_offset_processed, 0);
// TODO: need to iterate the tree correctly only scanning events.
// TODO: ensure the pipeline tree is used correctly below with prefixes.
// TODO: store the root event in the pipeline as well, to avoid issues with compaction.
let stream = tree_stream.range::<_, std::ops::RangeFrom<&[u8]>>(start..);
for event_res in stream {
// Decode the records offset.
@@ -596,7 +595,7 @@ impl PipelineCtl {
}

// Construct pipeline instance & add to batch.
metadata_batch.insert(&utils::encode_byte_prefix(PREFIX_META_ACTIVE_INSTANCES, offset), &key);
pipeline_batch.insert(&utils::encode_byte_prefix(PREFIX_META_ACTIVE_INSTANCES, offset), &key);
let inst = ActivePipelineInstance {
root_event,
root_event_offset: offset,
@@ -613,9 +612,9 @@ impl PipelineCtl {
}

// Apply the batch of changes.
metadata_batch.insert(KEY_LAST_OFFSET_PROCESSED, &utils::encode_u64(last_processed));
let _res = tree_metadata
.apply_batch(metadata_batch)
pipeline_batch.insert(KEY_LAST_OFFSET_PROCESSED, &utils::encode_u64(last_processed));
let _res = tree_pipeline
.apply_batch(pipeline_batch)
.context("error applying metadata batch while fetching stream data for pipeline")?;

Ok(FetchStreamRecords {
@@ -641,12 +640,11 @@ impl PipelineCtl {
/// - The output of each stage of a pipeline instance is recorded under `/s/{instance}/{stage}` where
/// `{instance}` is the input stream record's offset and `{stage}` is the name of the pipeline stage.
async fn recover_pipeline_state(
pipeline_tree: &Tree, metadata_tree: &Tree, stream_tree: &Tree, pipeline: Arc<Pipeline>, stream_latest_offset: u64,
pipeline_tree: Tree, stream_tree: Tree, pipeline: Arc<Pipeline>, stream_latest_offset: u64,
) -> Result<(u64, BTreeMap<u64, ActivePipelineInstance>)> {
let (pipeline_tree, metadata_tree, stream_tree) = (pipeline_tree.clone(), metadata_tree.clone(), stream_tree.clone());
let val = Database::spawn_blocking(move || -> Result<(u64, BTreeMap<u64, ActivePipelineInstance>)> {
// Fetch last source stream offset to have been processed by this pipeline.
let last_offset = metadata_tree
let last_offset = pipeline_tree
.get(KEY_LAST_OFFSET_PROCESSED)
.context("error fetching pipeline last offset processed key")?
.map(|val| utils::decode_u64(&val))
@@ -660,13 +658,15 @@ async fn recover_pipeline_state(
});

// Fetch active instances.
let active_instances = metadata_tree.scan_prefix(PREFIX_META_ACTIVE_INSTANCES).values().try_fold(
let active_instances = pipeline_tree.scan_prefix(PREFIX_META_ACTIVE_INSTANCES).values().try_fold(
BTreeMap::new(),
|mut acc, val| -> Result<BTreeMap<u64, ActivePipelineInstance>> {
let offset_ivec = val.context(ERR_ITER_FAILURE)?;
let offset = utils::decode_u64(&offset_ivec).context("error decoding active pipeline offset")?;

// Fetch the stream event which triggered this pipeline instance.
// TODO: refactor this. The root event needs to be stored in the pipeline tree as well so that
// compaction does not cause complications.
let root_event = stream_tree
.get(&offset_ivec)
.context("error fetching pipeline instance root event")?
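For orientation, a small self-contained sketch (plain sled, hypothetical helper name and key values) of the persistence pattern try_fetch_stream_data uses above: newly activated instance records and the updated last-processed offset are staged in a single sled::Batch and applied to the pipeline tree atomically, so the two can never drift apart across a crash.

use sled::{Batch, Tree};

// Assumed key layout, mirroring PREFIX_META_ACTIVE_INSTANCES and KEY_LAST_OFFSET_PROCESSED above.
const PREFIX_ACTIVE_INSTANCES: u8 = b'a';
const KEY_LAST_OFFSET_PROCESSED: &[u8] = b"l";

/// Stage a run of newly activated instance offsets plus the last-processed marker as one batch.
fn stage_fetched_offsets(tree: &Tree, offsets: &[u64]) -> sled::Result<()> {
    if offsets.is_empty() {
        return Ok(());
    }
    let mut batch = Batch::default();
    let mut last_processed = 0u64;
    for &offset in offsets {
        // Key: prefix byte followed by the big-endian offset, so instances stay in offset order.
        let mut key = vec![PREFIX_ACTIVE_INSTANCES];
        key.extend_from_slice(&offset.to_be_bytes());
        batch.insert(key, offset.to_be_bytes().to_vec());
        last_processed = offset;
    }
    // The marker rides in the same batch as the instance records: both land or neither does.
    batch.insert(KEY_LAST_OFFSET_PROCESSED, &last_processed.to_be_bytes()[..]);
    tree.apply_batch(batch)
}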
21 changes: 12 additions & 9 deletions hadron-stream/src/stream/mod.rs
@@ -49,22 +49,25 @@ use crate::models::stream::Subscription;
use crate::stream::subscriber::StreamSubCtlMsg;
use crate::utils;

/*
TODO:
- [x] simplify PREFIX_STREAM_SUBS & PREFIX_STREAM_SUB_OFFSETS with minimal `{byte}` prefixes
and use IVec::from<Iterator> to build IVec keys.
- store the stream's last written offset under the KEY_STREAM_LAST_WRITTEN_OFFSET key.
- store all stream entries using a `{byte}` prefix.
- [x] collapse the metadata stream into the standard stream tree.
*/

/// The key prefix used for storing stream events.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_EVENT: &[u8; 1] = b"e";
/// The key prefix used for storing stream event timestamps.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_TS: &[u8; 1] = b"t";
/// The database key prefix used for storing stream subscriber data.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_SUBS: &[u8; 1] = b"s";
/// The database key prefix used for storing stream subscriber offsets.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_SUB_OFFSETS: &[u8; 1] = b"o";
/// The key used to store the last written offset for the stream.
const KEY_STREAM_LAST_WRITTEN_OFFSET: &[u8; 1] = b"l";
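The NOTE comments above point at the `utils::encode_byte_prefix*` helpers. A minimal sketch of the assumed scheme, consistent with the equivalence checks in the new utils_test.rs below: a single prefix byte followed by the offset's big-endian bytes, so lexicographic key order matches numeric offset order and prefix scans stay within one key family.

// Sketch of the assumed `utils::encode_byte_prefix`: `{prefix}{offset as big-endian u64}`.
fn encode_byte_prefix(prefix: &[u8; 1], offset: u64) -> [u8; 9] {
    let mut key = [0u8; 9];
    key[0] = prefix[0];
    key[1..].copy_from_slice(&offset.to_be_bytes());
    key
}

fn main() {
    // Within a prefix, big-endian bytes keep keys in numeric offset order under byte-wise comparison.
    assert!(encode_byte_prefix(b"e", 2) < encode_byte_prefix(b"e", 300));
    // Across prefixes, key families never interleave: every `e` key sorts before every `t` key.
    assert!(encode_byte_prefix(b"e", u64::MAX) < encode_byte_prefix(b"t", 0));
}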
92 changes: 92 additions & 0 deletions hadron-stream/src/utils_test.rs
@@ -0,0 +1,92 @@
use anyhow::{Context, Result};

use crate::config::Config;
use crate::database::Database;
use crate::utils;

const ERR_MSG_ITER: &str = "error iterating scanned data";
const NUM_ENTRIES: u64 = 1_001;
const PREFIX_A: &[u8; 1] = b"a";
/// We use this prefix in tests because it falls in the middle of the lexicographical sort order.
const PREFIX_B: &[u8; 1] = b"b";
const PREFIX_C: &[u8; 1] = b"c";

#[tokio::test]
async fn test_exhaustive_scan_prefix_and_range_behavior() -> Result<()> {
let (config, _tmpdir) = Config::new_test()?;
let db = Database::new(config.clone()).await?;
let tree = db.get_stream_tree().await?;

// Load data distributed across three key prefixes which are used to assert correctness of
// range scans and prefix scans, which depend upon the correctness of key encoding.
load_data(&tree)?;

// Assert that prefix scan finds the correct amount of data.
let mut count = 0;
for kv_res in tree.scan_prefix(PREFIX_B) {
let (key, val) = kv_res.context(ERR_MSG_ITER)?;
if key[0] != PREFIX_B[0] {
println!("bad key prefix: got {}; expected: {};", key[0], PREFIX_B[0]);
} else {
count += 1;
}
let _key = utils::decode_u64(&key[1..])?;
let _val = utils::decode_u64(&val)?;
}
assert_eq!(count, NUM_ENTRIES, "expected scan_prefix to find {} entries, got {}", NUM_ENTRIES, count);

// Assert that range scans preserve sort order based on our key prefix strategy.
let (start, stop, mut count, mut current_offset) = (PREFIX_B, PREFIX_C, 0, 0u64);
for kv_res in tree.range::<_, std::ops::Range<&[u8]>>(start..stop) {
let (key, val) = kv_res.context(ERR_MSG_ITER)?;
if key[0] != PREFIX_B[0] {
println!("bad key prefix: got {}; expected: {};", key[0], &PREFIX_B[0]);
} else {
count += 1;
}
let key = utils::decode_u64(&key[1..])?;
let val = utils::decode_u64(&val)?;
assert_eq!(
key, current_offset,
"db.range with prefix iterated out of order, expected key {} got {}",
current_offset, key
);
assert_eq!(
val, current_offset,
"db.range with prefix iterated out of order, expected val {} got {}",
current_offset, val
);
current_offset += 1;
}
assert_eq!(count, NUM_ENTRIES, "expected range to find {} entries, got {}", NUM_ENTRIES, count);

Ok(())
}

#[test]
fn test_ivec_and_encoding_compat() {
let i0 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(u64::MIN.to_be_bytes().iter().copied()));
let i1 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(1u64.to_be_bytes().iter().copied()));
let i2 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(10u64.to_be_bytes().iter().copied()));

let e0 = utils::encode_byte_prefix(PREFIX_B, u64::MIN);
let e1 = utils::encode_byte_prefix(PREFIX_B, 1u64);
let e2 = utils::encode_byte_prefix(PREFIX_B, 10u64);

assert_eq!(&i0, &e0, "ivec slice i0 is different from encoded slice:\n{:?}\n{:?}", &i0, &e0);
assert_eq!(&i1, &e1, "ivec slice i1 is different from encoded slice:\n{:?}\n{:?}", &i1, &e1);
assert_eq!(&i2, &e2, "ivec slice i2 is different from encoded slice:\n{:?}\n{:?}", &i2, &e2);
}

fn load_data(db: &sled::Tree) -> Result<()> {
for prefix in [PREFIX_A, PREFIX_B, PREFIX_C] {
let mut batch = sled::Batch::default();
for offset in 0..NUM_ENTRIES {
let key = utils::encode_byte_prefix(prefix, offset);
batch.insert(&key, &utils::encode_u64(offset));
}
db.apply_batch(batch).context("error inserting data")?;
}
db.flush().context("error flushing data")?;
Ok(())
}
