Skip to content

Commit

Permalink
mpmc: Block drop fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tower120 committed Nov 20, 2024
1 parent 8e076a0 commit 6b55357
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 16 deletions.
11 changes: 11 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## 0.2.1
### Fix
- `mpmc` block destructor could go out-of-bounds for `needs_drop` objects.

### Changed
- `mpmc::Writer` now unconditionally `Send`able.

### Added
- Better test coverage.


## 0.2.0

New mpmc algorithm. Previous algorithm was based on block write counters - block's
Expand Down
9 changes: 7 additions & 2 deletions src/block.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::{mem, ptr};
use std::{cmp, mem, ptr};
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ops::Deref;
use std::ptr::{null_mut, NonNull};
use std::sync::atomic;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use branch_hints::unlikely;

Expand Down Expand Up @@ -46,6 +47,8 @@ pub(crate) struct Block<T> {
///
/// It's len for writers. Readers use `bit_blocks` for getting
/// actual block len.
///
/// Will be >= BLOCK_SIZE after block is fully written.
// Aligning with cache-line size gives us +10% perf.
pub len : CacheLineAlign<AtomicUsize>,
use_count : AtomicUsize, // When decreases to 0 - frees itself
Expand Down Expand Up @@ -93,7 +96,7 @@ impl<T> Block<T>{

// drop mem
if mem::needs_drop::<T>() {
let len = this.as_ref().len.load(Ordering::Acquire);
let len = cmp::min(this.as_ref().len.load(Ordering::Acquire), BLOCK_SIZE);
let mem = this.as_mut().mem.get_mut();
for i in 0..len {
ptr::drop_in_place(mem.get_unchecked_mut(i).assume_init_mut());
Expand All @@ -116,6 +119,8 @@ impl<T> Block<T>{
// Release instead of AcqRel, because we'll drop this at 0
let prev = this.as_ref().use_count.fetch_sub(1, Ordering::Release);
if prev == 1 {
// See Arc::drop implementation, for this fence rationale.
atomic::fence(Ordering::Acquire);
Self::drop_this(this);
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,20 @@ pub mod mpmc;
pub mod spmc;

mod reader;
pub use reader::*;
pub use reader::*;

#[cfg(test)]
mod test{
#[derive(Clone, PartialEq)]
pub struct StringWrapper(String);
impl From<usize> for StringWrapper{
fn from(value: usize) -> Self {
Self(String::from(format!("{value}")))
}
}
impl From<StringWrapper> for usize{
fn from(value: StringWrapper) -> Self {
value.0.parse().unwrap()
}
}
}
23 changes: 15 additions & 8 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ impl<T> Drop for Queue<T> {
pub struct Writer<T> {
block: BlockArc<T>,
event_queue: Arc<Queue<T>>
}
}

unsafe impl<T> Send for Writer<T>{}

impl<T> Writer<T> {
#[inline]
fn fast_forward_to_last_block(&mut self, max_jumps: usize) -> Result<(), ()> {
Expand Down Expand Up @@ -375,6 +378,7 @@ mod test_mpmc{
use crate::block::BLOCK_SIZE;
use crate::LendingReader;
use crate::mpmc::Queue;
use crate::test::StringWrapper;

#[test]
fn test_mpmc() {
Expand All @@ -390,14 +394,16 @@ mod test_mpmc{

let mut vec = Vec::new();
while let Some(value) = reader.next() {
//println!("{value}");
vec.push(value.clone());
}
assert_equal(vec, 0..COUNT);
}

fn test_mpmc_mt(wt: usize, rt: usize, len: usize) {
let queue: Arc<Queue<usize>> = Default::default();
fn test_mpmc_mt<Value>(wt: usize, rt: usize, len: usize)
where
Value: From<usize> + Into<usize> + Clone + 'static,
{
let queue: Arc<Queue<Value>> = Default::default();

let mut joins = Vec::new();

Expand All @@ -406,11 +412,11 @@ mod test_mpmc{
for _ in 0..rt {
let mut reader = queue.reader();
joins.push(std::thread::spawn(move || {
let mut sum = 0;
let mut sum: usize = 0;
let mut i = 0;
loop {
if let Some(value) = reader.next() {
sum += value;
sum += value.clone().into();

i += 1;
if i == len {
Expand All @@ -428,7 +434,7 @@ mod test_mpmc{
let mut writer = queue.writer();
joins.push(std::thread::spawn(move || {
for i in t*messages..(t+1)*messages {
writer.push(i);
writer.push(i.into());
}
}));
}
Expand All @@ -449,7 +455,8 @@ mod test_mpmc{
let wt = rng.gen_range(1..=MAX_THREADS);
let rt = rng.gen_range(1..=MAX_THREADS);
let len = rng.gen_range(0..RANGE) / wt * wt;
test_mpmc_mt(wt, rt, len);
test_mpmc_mt::<usize>(wt, rt, len);
test_mpmc_mt::<StringWrapper>(wt, rt, len);
}
}
}
15 changes: 10 additions & 5 deletions src/spmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,13 @@ mod test{
use crate::block::BLOCK_SIZE;
use crate::spmc::Queue;
use crate::LendingReader;
use crate::test::StringWrapper;

fn test_spmc_mt(rt: usize, len: usize) {
let queue: Arc<spin::Mutex<Queue<usize>>> = Default::default();
fn test_spmc_mt<Value>(rt: usize, len: usize)
where
Value: From<usize> + Into<usize> + Clone + 'static,
{
let queue: Arc<spin::Mutex<Queue<Value>>> = Default::default();

let mut joins = Vec::new();

Expand All @@ -162,7 +166,7 @@ mod test{
let mut i = 0;
loop {
if let Some(value) = reader.next() {
sum += value;
sum += value.clone().into();

i += 1;
if i == len {
Expand All @@ -176,7 +180,7 @@ mod test{

joins.push(std::thread::spawn(move || {
for i in 0..len{
queue.lock().push(i);
queue.lock().push(i.into());
}
}));

Expand All @@ -195,7 +199,8 @@ mod test{
for _ in 0..REPEATS {
let rt = rng.gen_range(1..=MAX_THREADS);
let len = rng.gen_range(0..RANGE);
test_spmc_mt(rt, len);
test_spmc_mt::<usize>(rt, len);
test_spmc_mt::<StringWrapper>(rt, len);
}
}
}

0 comments on commit 6b55357

Please sign in to comment.