Skip to content

Commit

Permalink
refactor: remove needless unsafe Send Sync implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 23, 2024
1 parent 13fff5e commit b509436
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 117 deletions.
2 changes: 1 addition & 1 deletion bindings/python/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct Column {
pub datatype: DataType,
pub nullable: bool,
pub primary_key: bool,
pub(crate) value: Arc<dyn Any>,
pub(crate) value: Arc<dyn Any + Send + Sync>,
}

unsafe impl Send for Column {}
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl DataType {
DataType::Bytes => Vec::<u8>::default().to_object(py),
}
}
pub(crate) fn none_value(&self) -> Arc<dyn Any> {
pub(crate) fn none_value(&self) -> Arc<dyn Any + Send + Sync> {
match self {
DataType::UInt8 => Arc::new(Option::<u8>::None),
DataType::UInt16 => Arc::new(Option::<u16>::None),
Expand Down
34 changes: 22 additions & 12 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,29 @@ pub(crate) fn to_dict(
dict
}

pub(crate) fn to_key(py: Python, datatype: &DataType, key: Py<PyAny>) -> Arc<dyn Any> {
pub(crate) fn to_key(
py: Python,
datatype: &DataType,
key: Py<PyAny>,
) -> Arc<dyn Any + Send + Sync> {
match datatype {
DataType::UInt8 => Arc::new(key.extract::<u8>(py).unwrap()) as Arc<dyn Any>,
DataType::UInt16 => Arc::new(key.extract::<u16>(py).unwrap()) as Arc<dyn Any>,
DataType::UInt32 => Arc::new(key.extract::<u32>(py).unwrap()) as Arc<dyn Any>,
DataType::UInt64 => Arc::new(key.extract::<u64>(py).unwrap()) as Arc<dyn Any>,
DataType::Int8 => Arc::new(key.extract::<i8>(py).unwrap()) as Arc<dyn Any>,
DataType::Int16 => Arc::new(key.extract::<i16>(py).unwrap()) as Arc<dyn Any>,
DataType::Int32 => Arc::new(key.extract::<i32>(py).unwrap()) as Arc<dyn Any>,
DataType::Int64 => Arc::new(key.extract::<i64>(py).unwrap()) as Arc<dyn Any>,
DataType::String => Arc::new(key.extract::<String>(py).unwrap()) as Arc<dyn Any>,
DataType::Boolean => Arc::new(key.extract::<bool>(py).unwrap()) as Arc<dyn Any>,
DataType::Bytes => Arc::new(key.extract::<Vec<u8>>(py).unwrap()) as Arc<dyn Any>,
DataType::UInt8 => Arc::new(key.extract::<u8>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::UInt16 => Arc::new(key.extract::<u16>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::UInt32 => Arc::new(key.extract::<u32>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::UInt64 => Arc::new(key.extract::<u64>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::Int8 => Arc::new(key.extract::<i8>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::Int16 => Arc::new(key.extract::<i16>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::Int32 => Arc::new(key.extract::<i32>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::Int64 => Arc::new(key.extract::<i64>(py).unwrap()) as Arc<dyn Any + Send + Sync>,
DataType::String => {
Arc::new(key.extract::<String>(py).unwrap()) as Arc<dyn Any + Send + Sync>
}
DataType::Boolean => {
Arc::new(key.extract::<bool>(py).unwrap()) as Arc<dyn Any + Send + Sync>
}
DataType::Bytes => {
Arc::new(key.extract::<Vec<u8>>(py).unwrap()) as Arc<dyn Any + Send + Sync>
}
}
}

Expand Down
195 changes: 120 additions & 75 deletions src/record/runtime/array.rs

Large diffs are not rendered by default.

21 changes: 9 additions & 12 deletions src/record/runtime/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ impl ColumnDesc {
#[derive(Clone)]
pub struct Column {
pub datatype: Datatype,
pub value: Arc<dyn Any>,
pub value: Arc<dyn Any + Send + Sync>,
pub is_nullable: bool,
pub name: String,
}

unsafe impl Send for Column {}
unsafe impl Sync for Column {}

impl Column {
pub fn new(datatype: Datatype, name: String, value: Arc<dyn Any>, is_nullable: bool) -> Self {
pub fn new(
datatype: Datatype,
name: String,
value: Arc<dyn Any + Send + Sync>,
is_nullable: bool,
) -> Self {
Self {
datatype,
name,
Expand Down Expand Up @@ -102,7 +104,6 @@ impl PartialOrd for Column {
}
}

#[macro_export]
macro_rules! implement_col {
([], $({$Type:ty, $Datatype:ident}), *) => {
impl Ord for Column {
Expand Down Expand Up @@ -168,7 +169,6 @@ macro_rules! implement_col {
};
}

#[macro_export]
macro_rules! implement_key_col {
($({$Type:ident, $Datatype:ident, $Array:ident}), *) => {
impl Key for Column {
Expand Down Expand Up @@ -224,7 +224,6 @@ impl<'r> KeyRef<'r> for Column {
}
}

#[macro_export]
macro_rules! implement_decode_col {
([], $({$Type:ty, $Datatype:ident}), *) => {
impl Decode for Column {
Expand All @@ -248,8 +247,8 @@ macro_rules! implement_decode_col {
DecodeError::Fusio(error) => error,
DecodeError::Inner(error) => fusio::Error::Other(Box::new(error)),
},
)?) as Arc<dyn Any>,
false => Arc::new(<$Type>::decode(reader).await?) as Arc<dyn Any>,
)?) as Arc<dyn Any + Send + Sync>,
false => Arc::new(<$Type>::decode(reader).await?) as Arc<dyn Any + Send + Sync>,
},
)*
};
Expand All @@ -265,7 +264,6 @@ macro_rules! implement_decode_col {
}
}

#[macro_export]
macro_rules! implement_encode_col {
([], $({$Type:ty, $Datatype:ident}), *) => {
impl Encode for Column {
Expand Down Expand Up @@ -374,7 +372,6 @@ impl From<&Column> for Field {
}
}

#[macro_export]
macro_rules! for_datatype {
($macro:tt $(, $x:tt)*) => {
$macro! {
Expand Down
5 changes: 1 addition & 4 deletions src/record/runtime/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl DynRecord {
pub(crate) fn empty_record(column_descs: Vec<ColumnDesc>, primary_index: usize) -> DynRecord {
let mut columns = vec![];
for desc in column_descs.iter() {
let value: Arc<dyn Any> = match desc.datatype {
let value: Arc<dyn Any + Send + Sync> = match desc.datatype {
Datatype::UInt8 => match desc.is_nullable {
true => Arc::<Option<u8>>::new(None),
false => Arc::new(u8::default()),
Expand Down Expand Up @@ -274,9 +274,6 @@ impl Record for DynRecord {
}
}

unsafe impl Send for DynRecord {}
unsafe impl Sync for DynRecord {}

#[cfg(test)]
pub(crate) mod test {
use std::sync::Arc;
Expand Down
21 changes: 9 additions & 12 deletions src/record/runtime/record_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,32 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> {
let v = col.as_string::<i32>();

if primary_index == idx - 2 {
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any>
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any + Send + Sync>
} else {
let value = (!v.is_null(offset) && projection_mask.leaf_included(idx))
.then_some(v.value(offset).to_owned());
Arc::new(value) as Arc<dyn Any>
Arc::new(value) as Arc<dyn Any + Send + Sync>
}
}
Datatype::Boolean => {
let v = col.as_boolean();

if primary_index == idx - 2 {
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any>
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any + Send + Sync>
} else {
let value = (!v.is_null(offset) && projection_mask.leaf_included(idx))
.then_some(v.value(offset).to_owned());
Arc::new(value) as Arc<dyn Any>
Arc::new(value) as Arc<dyn Any + Send + Sync>
}
}
Datatype::Bytes => {
let v = col.as_binary::<i32>();
if primary_index == idx - 2 {
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any>
Arc::new(v.value(offset).to_owned()) as Arc<dyn Any + Send + Sync>
} else {
let value = (!v.is_null(offset) && projection_mask.leaf_included(idx))
.then_some(v.value(offset).to_owned());
Arc::new(value) as Arc<dyn Any>
Arc::new(value) as Arc<dyn Any + Send + Sync>
}
}
};
Expand Down Expand Up @@ -241,21 +241,18 @@ impl<'r> DynRecordRef<'r> {
idx: usize,
projection_mask: &'r parquet::arrow::ProjectionMask,
primary: bool,
) -> Arc<dyn Any>
) -> Arc<dyn Any + Send + Sync>
where
T: ArrowPrimitiveType,
{
let v = col.as_primitive::<T>();

if primary {
Arc::new(v.value(offset)) as Arc<dyn Any>
Arc::new(v.value(offset)) as Arc<dyn Any + Send + Sync>
} else {
let value = (!v.is_null(offset) && projection_mask.leaf_included(idx))
.then_some(v.value(offset));
Arc::new(value) as Arc<dyn Any>
Arc::new(value) as Arc<dyn Any + Send + Sync>
}
}
}

unsafe impl<'r> Send for DynRecordRef<'r> {}
unsafe impl<'r> Sync for DynRecordRef<'r> {}

0 comments on commit b509436

Please sign in to comment.