Skip to content

Commit

Permalink
try to fix issue ANSSI-FR#3
Browse files Browse the repository at this point in the history
  • Loading branch information
github-af committed Jul 10, 2024
1 parent ddd9eda commit ce5014b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 37 deletions.
13 changes: 9 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl From<io::Error> for Error {

pub(crate) enum MessageType {
Heartbeat,
LostSynchronization,
Start,
Data,
Abort,
Expand All @@ -66,6 +67,7 @@ impl MessageType {
fn serialized(self) -> u8 {
match self {
Self::Heartbeat => ID_HEARTBEAT,
Self::LostSynchronization => ID_LOST_SYNCHRONIZATION,
Self::Start => ID_START,
Self::Data => ID_DATA,
Self::Abort => ID_ABORT,
Expand All @@ -78,6 +80,7 @@ impl fmt::Display for MessageType {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
Self::Heartbeat => write!(fmt, "Heartbeat"),
Self::LostSynchronization => write!(fmt, "LostSynchronization"),
Self::Start => write!(fmt, "Start"),
Self::Data => write!(fmt, "Data"),
Self::Abort => write!(fmt, "Abort"),
Expand All @@ -87,10 +90,11 @@ impl fmt::Display for MessageType {
}

const ID_HEARTBEAT: u8 = 0x00;
const ID_START: u8 = 0x01;
const ID_DATA: u8 = 0x02;
const ID_ABORT: u8 = 0x03;
const ID_END: u8 = 0x04;
const ID_LOST_SYNCHRONIZATION: u8 = 0x01;
const ID_START: u8 = 0x02;
const ID_DATA: u8 = 0x03;
const ID_ABORT: u8 = 0x04;
const ID_END: u8 = 0x05;

pub(crate) type ClientId = u32;

Expand Down Expand Up @@ -150,6 +154,7 @@ impl Message {
pub(crate) fn message_type(&self) -> Result<MessageType, Error> {
match self.0.get(4) {
Some(&ID_HEARTBEAT) => Ok(MessageType::Heartbeat),
Some(&ID_LOST_SYNCHRONIZATION) => Ok(MessageType::LostSynchronization),
Some(&ID_START) => Ok(MessageType::Start),
Some(&ID_DATA) => Ok(MessageType::Data),
Some(&ID_ABORT) => Ok(MessageType::Abort),
Expand Down
58 changes: 27 additions & 31 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Worker that decodes RaptorQ packets into protocol messages

use std::{cmp::Ordering, thread::yield_now};

use crate::{protocol, receive};

pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::Error> {
Expand All @@ -23,38 +21,36 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

match decoder.decode(packets) {
None => {
log::warn!("lost block {block_id}");
log::error!("lost block {block_id}, synchronization lost");

let message =
protocol::Message::new(protocol::MessageType::LostSynchronization, 0, 0, None);

receiver.to_dispatch.send(message)?;

continue;
}
Some(block) => {
log::trace!("block {} decoded with {} bytes!", block_id, block.len());

let mut retry_cnt = 0;

loop {
let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");
match block_id.cmp(&to_receive) {
Ordering::Equal => {
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
}
Ordering::Greater => {
// Thread is too late, drop the packet and kill the current job
log::warn!("Dropping the packet {block_id}");
break;
}
Ordering::Less => {
if retry_cnt < 10 {
retry_cnt +=1;
yield_now();
} else {
break;
}

}
log::trace!("block {block_id} decoded with {} bytes!", block.len());

let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");

if *to_receive == block_id {
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
} else {
if to_receive.wrapping_sub(1) == block_id {
log::warn!("receiving block {block_id} from the past, assuming it was already decoded successfully");
break;
} else {
log::error!("receiving unknown block {block_id}, synchronization lost");
let message =
protocol::Message::new(protocol::MessageType::LostSynchronization, 0, 0, None);
receiver.to_dispatch.send(message)?;
break;
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/receive/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,24 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
continue;
}

protocol::MessageType::LostSynchronization => {
log::error!("synchronization lost, aborting all transfers");
for (client_id, client_sendq) in active_transfers.into_iter() {
let message =
protocol::Message::new(protocol::MessageType::Abort, 0, client_id, None);
if let Err(e) = client_sendq.send(message) {
log::warn!("failed to send abnort to client {client_id:x}: {e}");
}
failed_transfers.insert(client_id);
}
active_transfers = BTreeMap::new();
continue;
}

protocol::MessageType::Start => {
let (client_sendq, client_recvq) =
crossbeam_channel::unbounded::<protocol::Message>();

active_transfers.insert(client_id, client_sendq);

receiver.to_clients.send((client_id, client_recvq))?;
}

Expand Down

0 comments on commit ce5014b

Please sign in to comment.