diff --git a/src/manifest.rs b/src/manifest.rs index 71bb6a2..1bf5c78 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -153,69 +153,89 @@ impl SegmentManifest { Ok(m) } - pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> { - // TODO: atomic swap + /// Modifies the level manifest atomically. + pub(crate) fn atomic_swap>)>( + &self, + f: F, + ) -> crate::Result<()> { + // NOTE: Create a copy of the levels we can operate on + // without mutating the current level manifest + // If persisting to disk fails, this way the level manifest + // is unchanged + let mut prev_segments = self.segments.write().expect("lock is poisoned"); + + let mut working_copy = prev_segments.clone(); + + f(&mut working_copy); + + let ids = working_copy.keys().copied().collect::>(); + + Self::write_to_disk(&self.path, &ids)?; + *prev_segments = working_copy; - let mut lock = self.segments.write().expect("lock is poisoned"); - lock.retain(|x, _| !ids.contains(x)); - Self::write_to_disk(&self.path, &lock.keys().copied().collect::>()) + log::trace!("Swapped vLog segment list to: {ids:?}"); + + Ok(()) } - pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { - // TODO: atomic swap + pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> { + self.atomic_swap(|recipe| { + recipe.retain(|x, _| !ids.contains(x)); + }) + } - let mut lock = self.segments.write().expect("lock is poisoned"); + pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { let writers = writer.finish()?; - for writer in writers { - if writer.item_count == 0 { - log::trace!( - "Writer at {:?} has written no data, deleting empty vLog segment file", - writer.path - ); - if let Err(e) = std::fs::remove_file(&writer.path) { - log::warn!( - "Could not delete empty vLog segment file at {:?}: {e:?}", + self.atomic_swap(move |recipe| { + for writer in &writers { + if writer.item_count == 0 { + log::trace!( + "Writer at {:?} has written no data, deleting empty vLog segment file", writer.path ); - }; - continue; - } + if let Err(e) = std::fs::remove_file(&writer.path) { + log::warn!( + "Could not delete empty vLog segment file at {:?}: {e:?}", + writer.path + ); + }; + continue; + } - let segment_id = writer.segment_id; - - lock.insert( - segment_id, - Arc::new(Segment { - id: segment_id, - path: writer.path, - meta: Metadata { - item_count: writer.item_count, - compressed_bytes: writer.written_blob_bytes, - total_uncompressed_bytes: writer.uncompressed_bytes, - key_range: KeyRange::new(( - writer - .first_key - .clone() - .expect("should have written at least 1 item"), - writer - .last_key - .clone() - .expect("should have written at least 1 item"), - )), - }, - gc_stats: GcStats::default(), - }), - ); - - log::debug!( - "Created segment #{segment_id:?} ({} items, {} userdata bytes)", - writer.item_count, - writer.uncompressed_bytes, - ); - } + let segment_id = writer.segment_id; - Self::write_to_disk(&self.path, &lock.keys().copied().collect::>()) + recipe.insert( + segment_id, + Arc::new(Segment { + id: segment_id, + path: writer.path.clone(), + meta: Metadata { + item_count: writer.item_count, + compressed_bytes: writer.written_blob_bytes, + total_uncompressed_bytes: writer.uncompressed_bytes, + key_range: KeyRange::new(( + writer + .first_key + .clone() + .expect("should have written at least 1 item"), + writer + .last_key + .clone() + .expect("should have written at least 1 item"), + )), + }, + gc_stats: GcStats::default(), + }), + ); + + log::debug!( + "Created segment #{segment_id:?} ({} items, {} userdata bytes)", + writer.item_count, + writer.uncompressed_bytes, + ); + } + }) } fn write_to_disk>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> { diff --git a/src/segment/meta.rs b/src/segment/meta.rs index ae5d506..36ebb4d 100644 --- a/src/segment/meta.rs +++ b/src/segment/meta.rs @@ -19,7 +19,7 @@ pub struct Metadata { /// true size in bytes (if no compression were used) pub total_uncompressed_bytes: u64, - // TODO: + // TODO: 1.0.0 ///// What type of compression is used // pub compression: CompressionType, /// Key range