Skip to content

Commit

Permalink
segment list atomic swap
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jun 9, 2024
1 parent 2a36278 commit dc4c9c6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 54 deletions.
126 changes: 73 additions & 53 deletions src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,69 +153,89 @@ impl SegmentManifest {
Ok(m)
}

pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
// TODO: atomic swap
/// Modifies the level manifest atomically.
pub(crate) fn atomic_swap<F: Fn(&mut HashMap<SegmentId, Arc<Segment>>)>(
&self,
f: F,
) -> crate::Result<()> {
// NOTE: Create a copy of the levels we can operate on
// without mutating the current level manifest
// If persisting to disk fails, this way the level manifest
// is unchanged
let mut prev_segments = self.segments.write().expect("lock is poisoned");

let mut working_copy = prev_segments.clone();

f(&mut working_copy);

let ids = working_copy.keys().copied().collect::<Vec<_>>();

Self::write_to_disk(&self.path, &ids)?;
*prev_segments = working_copy;

let mut lock = self.segments.write().expect("lock is poisoned");
lock.retain(|x, _| !ids.contains(x));
Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
log::trace!("Swapped vLog segment list to: {ids:?}");

Ok(())
}

pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
// TODO: atomic swap
pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
self.atomic_swap(|recipe| {
recipe.retain(|x, _| !ids.contains(x));
})
}

let mut lock = self.segments.write().expect("lock is poisoned");
pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
let writers = writer.finish()?;

for writer in writers {
if writer.item_count == 0 {
log::trace!(
"Writer at {:?} has written no data, deleting empty vLog segment file",
writer.path
);
if let Err(e) = std::fs::remove_file(&writer.path) {
log::warn!(
"Could not delete empty vLog segment file at {:?}: {e:?}",
self.atomic_swap(move |recipe| {
for writer in &writers {
if writer.item_count == 0 {
log::trace!(
"Writer at {:?} has written no data, deleting empty vLog segment file",
writer.path
);
};
continue;
}
if let Err(e) = std::fs::remove_file(&writer.path) {
log::warn!(
"Could not delete empty vLog segment file at {:?}: {e:?}",
writer.path
);
};
continue;
}

let segment_id = writer.segment_id;

lock.insert(
segment_id,
Arc::new(Segment {
id: segment_id,
path: writer.path,
meta: Metadata {
item_count: writer.item_count,
compressed_bytes: writer.written_blob_bytes,
total_uncompressed_bytes: writer.uncompressed_bytes,
key_range: KeyRange::new((
writer
.first_key
.clone()
.expect("should have written at least 1 item"),
writer
.last_key
.clone()
.expect("should have written at least 1 item"),
)),
},
gc_stats: GcStats::default(),
}),
);

log::debug!(
"Created segment #{segment_id:?} ({} items, {} userdata bytes)",
writer.item_count,
writer.uncompressed_bytes,
);
}
let segment_id = writer.segment_id;

Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
recipe.insert(
segment_id,
Arc::new(Segment {
id: segment_id,
path: writer.path.clone(),
meta: Metadata {
item_count: writer.item_count,
compressed_bytes: writer.written_blob_bytes,
total_uncompressed_bytes: writer.uncompressed_bytes,
key_range: KeyRange::new((
writer
.first_key
.clone()
.expect("should have written at least 1 item"),
writer
.last_key
.clone()
.expect("should have written at least 1 item"),
)),
},
gc_stats: GcStats::default(),
}),
);

log::debug!(
"Created segment #{segment_id:?} ({} items, {} userdata bytes)",
writer.item_count,
writer.uncompressed_bytes,
);
}
})
}

fn write_to_disk<P: AsRef<Path>>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/segment/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct Metadata {
/// true size in bytes (if no compression were used)
pub total_uncompressed_bytes: u64,

// TODO:
// TODO: 1.0.0
///// What type of compression is used
// pub compression: CompressionType,
/// Key range
Expand Down

0 comments on commit dc4c9c6

Please sign in to comment.