Pipeline storage has been updated
Pipeline data storage has been updated to use a single tree, with a more
efficient indexing scheme based on the lexicographical ordering of encoded keys.
thedodd committed Oct 25, 2021
1 parent e035cc1 commit dfbb047
Showing 3 changed files with 62 additions and 47 deletions.
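
For context on the new scheme: sled compares keys as raw byte slices, so encoding offsets as fixed-width big-endian integers makes numeric order and lexicographical order coincide. A minimal standalone sketch (not part of this commit, plain `std` only) of the property the `utils::encode_*` helpers depend on:

```rust
// Big-endian (network order) encoding preserves numeric ordering when
// keys are compared byte-by-byte, which is how sled compares them.
fn main() {
    assert!(1u64.to_be_bytes() < 10u64.to_be_bytes());
    assert!(255u64.to_be_bytes() < 256u64.to_be_bytes());
    // Little-endian would break this: 256 encodes as [0, 1, 0, ...] and
    // sorts *before* 1, which encodes as [1, 0, 0, ...].
    assert!(256u64.to_le_bytes() < 1u64.to_le_bytes());
}
```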
78 changes: 36 additions & 42 deletions hadron-stream/src/pipeline/mod.rs
@@ -22,21 +22,24 @@ use crate::database::Database;
use crate::error::{RpcResult, ShutdownError, ShutdownResult, ERR_DB_FLUSH, ERR_ITER_FAILURE};
use crate::futures::LivenessStream;
use crate::grpc::{Event, PipelineSubscribeRequest, PipelineSubscribeRequestAction, PipelineSubscribeResponse};
use crate::stream::PREFIX_STREAM_EVENT;
use crate::utils;
use hadron_core::crd::{Pipeline, PipelineStartPointLocation, RequiredMetadata};

// TODO: change all keys & key prefixes to use `b'{val}'` as u8 bytes.

const DEFAULT_MAX_PARALLEL: u32 = 50;
/// A pipeline metadata key used to track the last offset of the source stream to have been
/// transformed into a pipeline instance for pipeline processing.
///
/// NOTE: this does not necessarily indicate that the pipeline for this key has actually been
/// executed, but only that it has been prepared for execution.
const KEY_LAST_OFFSET_PROCESSED: &[u8; 1] = b"l";
/// A metadata key prefix used for tracking active pipeline instances.
/// A key prefix used for tracking active pipeline instances.
///
/// Active instances are keyed as `a{offset}` where `{offset}` is the offset
/// of the event from the source stream. The value is the offset.
const PREFIX_META_ACTIVE_INSTANCES: &[u8; 1] = b"a";
/// of the event from the source stream. The value is the corresponding root event.
const PREFIX_ACTIVE_INSTANCES: &[u8; 1] = b"a";
/// The key prefix under which pipeline stage outputs are stored.
const PREFIX_PIPELINE_STAGE_OUTPUTS: &[u8; 1] = b"o";

@@ -97,8 +100,7 @@ impl PipelineCtl {
let tree = db.get_pipeline_tree(pipeline.name()).await?;
let stream_tree = db.get_stream_tree().await?;
let stream_offset = *stream_signal.borrow();
let (last_offset_processed, active_pipelines) =
recover_pipeline_state(tree.clone(), stream_tree.clone(), pipeline.clone(), stream_offset).await?;
let (last_offset_processed, active_pipelines) = recover_pipeline_state(tree.clone(), pipeline.clone(), stream_offset).await?;

Ok(Self {
config,
@@ -534,15 +536,16 @@ impl PipelineCtl {
return Ok(());
}
};
let key = format!(
"{}{}/{}",
unsafe { std::str::from_utf8_unchecked(PREFIX_PIPELINE_STAGE_OUTPUTS) },
offset,
&*stage_name,
let key = utils::ivec_from_iter(
PREFIX_PIPELINE_STAGE_OUTPUTS
.iter()
.copied()
.chain(utils::encode_u64(offset))
.chain(stage_name.as_bytes().iter().copied()),
);
let event_bytes = utils::encode_model(&event)?;
pipeline_tree
.insert(key.as_bytes(), event_bytes.as_slice())
.insert(key, event_bytes.as_slice())
.context("error recording pipeline stage output on disk")
.map_err(ShutdownError::from)?;
pipeline_tree
@@ -577,15 +580,16 @@ impl PipelineCtl {
let data_res = Database::spawn_blocking(move || -> Result<FetchStreamRecords> {
// Iterate over the records of the stream up to the maximum parallel allowed.
let (mut pipeline_batch, mut new_instances) = (sled::Batch::default(), Vec::with_capacity(max_parallel as usize));
let (start, mut last_processed, mut count) = (&utils::encode_u64(last_offset_processed + 1), last_offset_processed, 0);
// TODO: need to iterate the tree correctly only scanning events.
// TODO: ensure the pipeline tree is used correctly below with prefixes.
// TODO: store the root event in the pipeline as well, to avoid issues with compaction.
let stream = tree_stream.range::<_, std::ops::RangeFrom<&[u8]>>(start..);
for event_res in stream {
let (mut last_processed, mut count) = (last_offset_processed, 0);
let (start, stop) = (
&utils::encode_byte_prefix(PREFIX_STREAM_EVENT, last_offset_processed + 1),
&utils::encode_byte_prefix(PREFIX_STREAM_EVENT, u64::MAX),
);
let kv_iter = tree_stream.range::<_, std::ops::RangeInclusive<&[u8]>>(start..=stop);
for event_res in kv_iter {
// Decode the record's offset.
let (key, root_event_bytes) = event_res.context(ERR_ITER_FAILURE)?;
let offset = utils::decode_u64(key.as_ref())?;
let offset = utils::decode_u64(&key[1..])?;
last_processed = offset;
let root_event: Event = utils::decode_model(root_event_bytes.as_ref()).context("error decoding event from storage")?;

@@ -595,7 +599,7 @@
}

// Construct pipeline instance & add to batch.
pipeline_batch.insert(&utils::encode_byte_prefix(PREFIX_META_ACTIVE_INSTANCES, offset), &key);
pipeline_batch.insert(&utils::encode_byte_prefix(PREFIX_ACTIVE_INSTANCES, offset), &root_event_bytes);
let inst = ActivePipelineInstance {
root_event,
root_event_offset: offset,
@@ -616,6 +620,7 @@
let _res = tree_pipeline
.apply_batch(pipeline_batch)
.context("error applying metadata batch while fetching stream data for pipeline")?;
let _res = tree_pipeline.flush().context(ERR_DB_FLUSH)?;

Ok(FetchStreamRecords {
last_offset_processed: last_processed,
@@ -635,12 +640,12 @@
/// The pipeline tree records pipeline instances/executions based on the input stream's
/// offset, which provides easy "exactly once" consumption of the input stream. In the pipeline
/// tree:
/// - The key for a pipeline instance will be roughly `/i/{instance}/`, where `{instance}` is the
/// input stream record's offset. The value stored here is top-level metadata of the pipeline instance.
/// - The output of each stage of a pipeline instance is recorded under `/s/{instance}/{stage}` where
/// `{instance}` is the input stream record's offset and `{stage}` is the name of the pipeline stage.
/// - The key for a pipeline instance will be roughly `a{offset}`, where `{offset}` is the
/// source stream record's offset. The value stored here is a copy of the root event from the stream.
/// - The output of each stage of a pipeline instance is recorded under `o{instance}{stage}` where
/// `{instance}` is the source stream record's offset and `{stage}` is the name of the pipeline stage.
async fn recover_pipeline_state(
pipeline_tree: Tree, stream_tree: Tree, pipeline: Arc<Pipeline>, stream_latest_offset: u64,
pipeline_tree: Tree, pipeline: Arc<Pipeline>, stream_latest_offset: u64,
) -> Result<(u64, BTreeMap<u64, ActivePipelineInstance>)> {
let val = Database::spawn_blocking(move || -> Result<(u64, BTreeMap<u64, ActivePipelineInstance>)> {
// Fetch last source stream offset to have been processed by this pipeline.
@@ -658,33 +663,22 @@ async fn recover_pipeline_state(
});

// Fetch active instances.
let active_instances = pipeline_tree.scan_prefix(PREFIX_META_ACTIVE_INSTANCES).values().try_fold(
let active_instances = pipeline_tree.scan_prefix(PREFIX_ACTIVE_INSTANCES).try_fold(
BTreeMap::new(),
|mut acc, val| -> Result<BTreeMap<u64, ActivePipelineInstance>> {
let offset_ivec = val.context(ERR_ITER_FAILURE)?;
let offset = utils::decode_u64(&offset_ivec).context("error decoding active pipeline offset")?;

// Fetch the stream event which triggered this pipeline instance.
// TODO: refactor this. The root event needs to be stored in the pipeline tree as well so that
// compaction does not cause complications.
let root_event = stream_tree
.get(&offset_ivec)
.context("error fetching pipeline instance root event")?
.map(|data| -> Result<Event> { utils::decode_model::<Event>(data.as_ref()).context("error decoding event from storage") })
.transpose()?
.context("source event of pipeline instance not found")?;
|mut acc, kv_res| -> Result<BTreeMap<u64, ActivePipelineInstance>> {
let (key, val) = kv_res.context(ERR_ITER_FAILURE)?;
let offset = utils::decode_u64(&key[1..]).context("error decoding active pipeline offset")?;
let root_event: Event = utils::decode_model(&val).context("error decoding event from storage")?;

// Iterate over all outputs currently recorded for this pipeline instance.
// See `try_record_delivery_response`, these are keyed as `/s/{offset}/{stage_name}`.
// See `try_record_delivery_response`, these are keyed as `o{offset}{stage}`.
let mut outputs = HashMap::new();
for iter_res in pipeline_tree.scan_prefix(&utils::encode_byte_prefix(PREFIX_PIPELINE_STAGE_OUTPUTS, offset)) {
let (key, val) = iter_res.context(ERR_ITER_FAILURE)?;
let key = std::str::from_utf8(&key).context("data corruption: all keys should be valid utf8")?;
let stage = key.split('/').last().unwrap_or("");
let stage = std::str::from_utf8(&key[9..]).context("data corruption: stage name for pipeline stage output is not valid utf8")?;
let output = Event::decode(val.as_ref()).context("error decoding pipeline stage output")?;
outputs.insert(stage.into(), output);
}

let inst = ActivePipelineInstance {
root_event,
root_event_offset: offset,
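
As a reading aid for the key layout above — a single pipeline tree holding `a{offset}` entries for active instances and `o{offset}{stage}` entries for stage outputs — the following sketch makes the byte layout explicit. The helper is illustrative, not the crate's actual `utils` API, but it shows why the recovery code can take the offset from `key[1..]` and the stage name from `key[9..]`:

```rust
// Illustrative only: the real code builds keys with utils::encode_byte_prefix
// and utils::ivec_from_iter; this helper just spells out the byte layout.
fn stage_output_key(offset: u64, stage: &str) -> Vec<u8> {
    let mut key = vec![b'o'];                     // 1-byte prefix
    key.extend_from_slice(&offset.to_be_bytes()); // 8-byte big-endian offset
    key.extend_from_slice(stage.as_bytes());      // variable-length stage name
    key
}

fn main() {
    let key = stage_output_key(42, "deploy");
    // Prefix byte + 8 offset bytes = 9 bytes, hence the [1..9] / [9..] splits.
    let offset = u64::from_be_bytes(key[1..9].try_into().unwrap());
    let stage = std::str::from_utf8(&key[9..]).unwrap();
    assert_eq!((offset, stage), (42, "deploy"));
}
```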
10 changes: 5 additions & 5 deletions hadron-stream/src/stream/mod.rs
@@ -53,24 +53,24 @@ use crate::utils;
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_EVENT: &[u8; 1] = b"e";
pub const PREFIX_STREAM_EVENT: &[u8; 1] = b"e";
/// The key prefix used for storing stream event timestamps.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_TS: &[u8; 1] = b"t";
pub const PREFIX_STREAM_TS: &[u8; 1] = b"t";
/// The database key prefix used for storing stream subscriber data.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_SUBS: &[u8; 1] = b"s";
pub const PREFIX_STREAM_SUBS: &[u8; 1] = b"s";
/// The database key prefix used for storing stream subscriber offsets.
///
/// NOTE: in order to preserve lexicographical ordering of keys, it is important to always use
/// the `utils::encode_byte_prefix*` methods.
const PREFIX_STREAM_SUB_OFFSETS: &[u8; 1] = b"o";
pub const PREFIX_STREAM_SUB_OFFSETS: &[u8; 1] = b"o";
/// The key used to store the last written offset for the stream.
const KEY_STREAM_LAST_WRITTEN_OFFSET: &[u8; 1] = b"l";
pub const KEY_STREAM_LAST_WRITTEN_OFFSET: &[u8; 1] = b"l";

const ERR_DECODING_STREAM_META_GROUP_NAME: &str = "error decoding stream meta group name from storage";

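
These prefixes are now `pub` so the pipeline module can bound its stream scans, as in the `fetch_stream_records` hunk earlier: an inclusive range from `e{last_processed + 1}` to `e{u64::MAX}` can contain only event keys, because every other prefix byte in the stream tree (`l`, `o`, `s`, `t`) sorts after `e`. A small sketch of that bounding logic (assumed shape, mirroring what `utils::encode_byte_prefix` appears to do):

```rust
// Build a 9-byte key: 1 prefix byte followed by a big-endian u64.
fn encode_byte_prefix(prefix: u8, offset: u64) -> [u8; 9] {
    let mut key = [0u8; 9];
    key[0] = prefix;
    key[1..].copy_from_slice(&offset.to_be_bytes());
    key
}

fn main() {
    let start = encode_byte_prefix(b'e', 101);     // first offset still to process
    let stop = encode_byte_prefix(b'e', u64::MAX); // greatest possible event key
    assert!(start < stop);
    // The first byte decides the comparison, so keys under the other
    // stream-tree prefixes fall outside the inclusive range entirely.
    assert!(encode_byte_prefix(b't', 0) > stop); // timestamps sort after events
    assert!(&b"l"[..] > &stop[..]);              // so does the metadata key "l"
}
```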
21 changes: 21 additions & 0 deletions hadron-stream/src/utils_test.rs
@@ -68,6 +68,13 @@ fn test_ivec_and_encoding_compat() {
let i0 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(u64::MIN.to_be_bytes().iter().copied()));
let i1 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(1u64.to_be_bytes().iter().copied()));
let i2 = utils::ivec_from_iter(PREFIX_B.iter().copied().chain(10u64.to_be_bytes().iter().copied()));
let i3 = utils::ivec_from_iter(
PREFIX_B
.iter()
.copied()
.chain(utils::encode_u64(10))
.chain("stage_name".as_bytes().iter().copied()),
);

let e0 = utils::encode_byte_prefix(PREFIX_B, u64::MIN);
let e1 = utils::encode_byte_prefix(PREFIX_B, 1u64);
@@ -76,6 +83,20 @@
assert_eq!(&i0, &e0, "ivec slice i0 is different from encoded slice:\n{:?}\n{:?}", &i0, &e0);
assert_eq!(&i1, &e1, "ivec slice i1 is different from encoded slice:\n{:?}\n{:?}", &i1, &e1);
assert_eq!(&i2, &e2, "ivec slice i2 is different from encoded slice:\n{:?}\n{:?}", &i2, &e2);
assert_eq!(
&i3[..9],
&e2,
"the first 9 bytes of i3 do not match the byte encoded prefix:\n{:?}\n{:?}",
&i3[..9],
&e2
);
assert_eq!(
&i3[9..],
b"stage_name",
"the last 10 bytes of i3 do not match the expected stage name:\n{:?}\n{:?}",
&i3[9..],
b"stage_name"
);
}

fn load_data(db: &sled::Tree) -> Result<()> {
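
Putting the pieces together, the single-tree layout can be exercised end to end against a throwaway sled tree. A hedged sketch, not code from this repository — key construction is inlined, and only basic sled APIs (`Config`, `open_tree`, `insert`, `scan_prefix`) are used:

```rust
// Assumes a dependency on sled (e.g. sled = "0.34"). Illustrative only.
fn key(prefix: u8, offset: u64) -> Vec<u8> {
    let mut k = vec![prefix];
    k.extend_from_slice(&offset.to_be_bytes());
    k
}

fn main() -> sled::Result<()> {
    let db = sled::Config::new().temporary(true).open()?;
    let tree = db.open_tree("pipeline")?;

    // One tree holds both active instances ('a' + offset -> root event)
    // and stage outputs ('o' + offset + stage -> output event).
    for offset in [3u64, 1, 2] {
        tree.insert(key(b'a', offset), b"root-event".to_vec())?;
    }
    tree.insert([key(b'o', 2), b"stage_name".to_vec()].concat(), b"out".to_vec())?;

    // scan_prefix visits only 'a' keys, and big-endian offsets come back in
    // numeric order regardless of insertion order.
    let offsets: Vec<u64> = tree
        .scan_prefix([b'a'])
        .map(|kv| {
            let (k, _v) = kv.expect("iteration failed");
            u64::from_be_bytes(k[1..9].try_into().expect("bad key length"))
        })
        .collect();
    assert_eq!(offsets, vec![1, 2, 3]);
    Ok(())
}
```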
