Skip to content

Commit

Permalink
try to fix issue ANSSI-FR#3
Browse files Browse the repository at this point in the history
  • Loading branch information
github-af committed Jul 10, 2024
1 parent ddd9eda commit 7667db5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 30 deletions.
49 changes: 22 additions & 27 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Worker that decodes RaptorQ packets into protocol messages

use std::{cmp::Ordering, thread::yield_now};

use crate::{protocol, receive};

pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::Error> {
pub(crate) fn start<F>(
receiver: &receive::Receiver<F>,
nb_decoding_threads: u8,
) -> Result<(), receive::Error> {
let encoding_block_size = receiver.object_transmission_info.transfer_length();

loop {
Expand All @@ -23,38 +24,32 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

match decoder.decode(packets) {
None => {
log::warn!("lost block {block_id}");
log::error!("lost block {block_id}, synchronization lost");
continue;
}
Some(block) => {
log::trace!("block {} decoded with {} bytes!", block_id, block.len());

let mut retry_cnt = 0;
log::trace!("block {block_id} decoded with {} bytes!", block.len());

loop {
let mut retried = 0;
let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");
match block_id.cmp(&to_receive) {
Ordering::Equal => {
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
}
Ordering::Greater => {
// Thread is too late, drop the packet and kill the current job
log::warn!("Dropping the packet {block_id}");

if *to_receive == block_id {
// The decoded block is the expected one, dispatching it
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
} else {
// The decoded block is not the expected one
// Retrying until all decoding threads had one chance to dispatch their block
retried += 1;
if nb_decoding_threads < retried {
// All decoding threads should have had one chance to dispatch their block
log::warn!("dropping block {block_id} after trying to dispatch it {retried} times");
break;
}
Ordering::Less => {
if retry_cnt < 10 {
retry_cnt +=1;
yield_now();
} else {
break;
}

}
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/receive/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
protocol::MessageType::Start => {
let (client_sendq, client_recvq) =
crossbeam_channel::unbounded::<protocol::Message>();

active_transfers.insert(client_id, client_sendq);

receiver.to_clients.send((client_id, client_recvq))?;
}

Expand Down
4 changes: 3 additions & 1 deletion src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ where
for i in 0..self.config.nb_decoding_threads {
thread::Builder::new()
.name(format!("decoding_{i}"))
.spawn_scoped(scope, || decoding::start(self))?;
.spawn_scoped(scope, || {
decoding::start(self, self.config.nb_decoding_threads)
})?;
}

thread::Builder::new()
Expand Down

0 comments on commit 7667db5

Please sign in to comment.