diff --git a/doc/parameters.rst b/doc/parameters.rst index 1de49b0..b8bfcc8 100644 --- a/doc/parameters.rst +++ b/doc/parameters.rst @@ -132,13 +132,6 @@ Then, on the logical level, fountain codes operates on blocks. If blocks reorder The default value for an encoding block is 60000, and repair block size is defaulted to 10% of this value (6000). See the :ref:`Tweaking parameters` chapter for more details on how to choose optimal values for your particular use case and devices. -Because of how UDP works, blocks may sometimes be received in a wildly different order: you may start receiving block number 12 before you finished receiving all packets for block number 10. Because of that, lidi will keep track of a given amount of blocks before considering them lost (up to 8 by default). - -.. code-block:: - - --reblock_retention_window - on the receiver side - Multiplexing ------------ diff --git a/src/bin/diode-receive.rs b/src/bin/diode-receive.rs index 2ac413b..b1803a7 100644 --- a/src/bin/diode-receive.rs +++ b/src/bin/diode-receive.rs @@ -18,7 +18,6 @@ struct Config { encoding_block_size: u64, repair_block_size: u32, udp_buffer_size: u32, - reblock_retention_window: u8, flush_timeout: time::Duration, nb_decoding_threads: u8, to: ClientConfig, @@ -97,14 +96,6 @@ fn command_args() -> Config { .value_parser(clap::value_parser!(u32).range(..1073741824)) .help("Size of UDP socket recv buffer"), ) - .arg( - Arg::new("reblock_retention_window") - .long("reblock_retention_window") - .value_name("reblock_retention_window") - .default_value("8") - .value_parser(clap::value_parser!(u8).range(1..128)) - .help("Higher value increases resilience to blocks getting mixed up"), - ) .arg( Arg::new("flush_timeout") .long("flush_timeout") @@ -147,9 +138,6 @@ fn command_args() -> Config { let nb_decoding_threads = *args.get_one::("nb_decoding_threads").expect("default"); let encoding_block_size = *args.get_one::("encoding_block_size").expect("default"); let udp_buffer_size = *args.get_one::("udp_buffer_size").expect("default"); - let reblock_retention_window = *args - .get_one::("reblock_retention_window") - .expect("default"); let repair_block_size = *args.get_one::("repair_block_size").expect("default"); let flush_timeout = time::Duration::from_millis( args.get_one::("flush_timeout") @@ -182,7 +170,6 @@ fn command_args() -> Config { encoding_block_size, repair_block_size, udp_buffer_size, - reblock_retention_window, flush_timeout, to, heartbeat, @@ -251,7 +238,6 @@ fn main() { encoding_block_size: config.encoding_block_size, repair_block_size: config.repair_block_size, udp_buffer_size: config.udp_buffer_size, - reblock_retention_window: config.reblock_retention_window, flush_timeout: config.flush_timeout, nb_decoding_threads: config.nb_decoding_threads, heartbeat_interval: config.heartbeat, diff --git a/src/protocol.rs b/src/protocol.rs index 83435d3..378c650 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -183,11 +183,15 @@ impl Message { impl fmt::Display for Message { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + let msg_type = match self.message_type() { + Err(e) => format!("UNKNOWN {e}"), + Ok(t) => t.to_string(), + }; write!( fmt, "client {:x} message = {} data = {} byte(s)", self.client_id(), - self.message_type().map_err(|_| fmt::Error)?, + msg_type, self.payload_len() ) } diff --git a/src/receive/decoding.rs b/src/receive/decoding.rs index 0322bbb..835a2f8 100644 --- a/src/receive/decoding.rs +++ b/src/receive/decoding.rs @@ -8,6 +8,16 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E loop { let (block_id, packets) = receiver.for_decoding.recv()?; + let packets = match packets { + None => { + log::warn!("synchronization lost received, propagating"); + // Sending lost synchronization signal to reorder thread + receiver.to_reordering.send((block_id, None))?; + continue; + } + Some(packets) => packets, + }; + log::trace!( "trying to decode block {block_id} with {} packets", packets.len() @@ -21,22 +31,15 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E match decoder.decode(packets) { None => { - log::warn!("lost block {block_id}"); - continue; + log::error!("lost block {block_id}, synchronization lost"); + // Sending lost synchronization signal to reorder thread + receiver.to_reordering.send((block_id, None))?; } Some(block) => { - log::trace!("block {} decoded with {} bytes!", block_id, block.len()); - - loop { - let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock"); - if *to_receive == block_id { - receiver - .to_dispatch - .send(protocol::Message::deserialize(block))?; - *to_receive = to_receive.wrapping_add(1); - break; - } - } + log::trace!("block {block_id} decoded with {} bytes!", block.len()); + receiver + .to_reordering + .send((block_id, Some(protocol::Message::deserialize(block))))?; } } } diff --git a/src/receive/dispatch.rs b/src/receive/dispatch.rs index ade2cfc..84be913 100644 --- a/src/receive/dispatch.rs +++ b/src/receive/dispatch.rs @@ -38,6 +38,31 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E receiver.for_dispatch.recv()? }; + let message = match message { + Some(m) => m, + None => { + // Synchonization has been lost + // Marking all active transfers as failed + for (client_id, client_sendq) in active_transfers { + let message = protocol::Message::new( + protocol::MessageType::Abort, + receiver.to_buffer_size as u32, + client_id, + None, + ); + + if let Err(e) = client_sendq.send(message) { + log::error!("failed to send payload to client {client_id:x}: {e}"); + } + + failed_transfers.insert(client_id); + ended_transfers.insert(client_id, client_sendq); + } + active_transfers = BTreeMap::new(); + continue; + } + }; + log::trace!("received {message}"); let client_id = message.client_id(); @@ -46,7 +71,13 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E continue; } - let message_type = message.message_type()?; + let message_type = match message.message_type() { + Err(e) => { + log::error!("message of UNKNOWN type received ({e}), dropping it"); + continue; + } + Ok(mt) => mt, + }; let mut will_end = false; diff --git a/src/receive/mod.rs b/src/receive/mod.rs index e2e8b3e..6c002cd 100644 --- a/src/receive/mod.rs +++ b/src/receive/mod.rs @@ -22,7 +22,7 @@ use std::{ io::{self, Write}, net, os::fd::AsRawFd, - sync, thread, time, + thread, time, }; mod client; @@ -30,6 +30,7 @@ mod clients; mod decoding; mod dispatch; mod reblock; +mod reordering; mod udp; pub struct Config { @@ -39,7 +40,6 @@ pub struct Config { pub encoding_block_size: u64, pub repair_block_size: u32, pub udp_buffer_size: u32, - pub reblock_retention_window: u8, pub flush_timeout: time::Duration, pub nb_decoding_threads: u8, pub heartbeat_interval: Option, @@ -62,8 +62,9 @@ impl Config { pub enum Error { Io(io::Error), SendPackets(crossbeam_channel::SendError>), - SendBlockPackets(crossbeam_channel::SendError<(u8, Vec)>), - SendMessage(crossbeam_channel::SendError), + SendBlockPackets(crossbeam_channel::SendError<(u8, Option>)>), + SendBlockMessage(crossbeam_channel::SendError<(u8, Option)>), + SendMessage(crossbeam_channel::SendError>), SendClients( crossbeam_channel::SendError<( protocol::ClientId, @@ -81,6 +82,7 @@ impl fmt::Display for Error { Self::Io(e) => write!(fmt, "I/O error: {e}"), Self::SendPackets(e) => write!(fmt, "crossbeam send packets error: {e}"), Self::SendBlockPackets(e) => write!(fmt, "crossbeam send block packets error: {e}"), + Self::SendBlockMessage(e) => write!(fmt, "crossbeam send block/message error: {e}"), Self::SendMessage(e) => write!(fmt, "crossbeam send message error: {e}"), Self::SendClients(e) => write!(fmt, "crossbeam send client error: {e}"), Self::Receive(e) => write!(fmt, "crossbeam receive error: {e}"), @@ -102,15 +104,21 @@ impl From>> for Error } } -impl From)>> for Error { - fn from(e: crossbeam_channel::SendError<(u8, Vec)>) -> Self { +impl From>)>> for Error { + fn from(e: crossbeam_channel::SendError<(u8, Option>)>) -> Self { Self::SendBlockPackets(e) } } -impl From> for Error { - fn from(e: crossbeam_channel::SendError) -> Self { - Self::SendMessage(e) +impl From)>> for Error { + fn from(oe: crossbeam_channel::SendError<(u8, Option)>) -> Self { + Self::SendBlockMessage(oe) + } +} + +impl From>> for Error { + fn from(oe: crossbeam_channel::SendError>) -> Self { + Self::SendMessage(oe) } } @@ -158,13 +166,16 @@ pub struct Receiver { pub(crate) to_buffer_size: usize, pub(crate) from_max_messages: u16, pub(crate) multiplex_control: semaphore::Semaphore, - pub(crate) block_to_receive: sync::Mutex, + pub(crate) resync_needed_block_id: crossbeam_utils::atomic::AtomicCell<(bool, u8)>, pub(crate) to_reblock: crossbeam_channel::Sender>, pub(crate) for_reblock: crossbeam_channel::Receiver>, - pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Vec)>, - pub(crate) for_decoding: crossbeam_channel::Receiver<(u8, Vec)>, - pub(crate) to_dispatch: crossbeam_channel::Sender, - pub(crate) for_dispatch: crossbeam_channel::Receiver, + pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Option>)>, + pub(crate) for_decoding: + crossbeam_channel::Receiver<(u8, Option>)>, + pub(crate) to_reordering: crossbeam_channel::Sender<(u8, Option)>, + pub(crate) for_reordering: crossbeam_channel::Receiver<(u8, Option)>, + pub(crate) to_dispatch: crossbeam_channel::Sender>, + pub(crate) for_dispatch: crossbeam_channel::Receiver>, pub(crate) to_clients: crossbeam_channel::Sender<( protocol::ClientId, crossbeam_channel::Receiver, @@ -199,13 +210,16 @@ where let multiplex_control = semaphore::Semaphore::new(config.nb_clients as usize); - let block_to_receive = sync::Mutex::new(0); + let resync_needed_block_id = crossbeam_utils::atomic::AtomicCell::default(); let (to_reblock, for_reblock) = crossbeam_channel::unbounded::>(); let (to_decoding, for_decoding) = - crossbeam_channel::unbounded::<(u8, Vec)>(); - let (to_dispatch, for_dispatch) = crossbeam_channel::unbounded::(); + crossbeam_channel::unbounded::<(u8, Option>)>(); + let (to_reordering, for_reordering) = + crossbeam_channel::unbounded::<(u8, Option)>(); + let (to_dispatch, for_dispatch) = + crossbeam_channel::unbounded::>(); let (to_clients, for_clients) = crossbeam_channel::bounded::<( protocol::ClientId, @@ -218,11 +232,13 @@ where to_buffer_size, from_max_messages, multiplex_control, - block_to_receive, + resync_needed_block_id, to_reblock, for_reblock, to_decoding, for_decoding, + to_reordering, + for_reordering, to_dispatch, for_dispatch, to_clients, @@ -273,6 +289,10 @@ where .name("dispatch".to_string()) .spawn_scoped(scope, || dispatch::start(self))?; + thread::Builder::new() + .name("reordering".to_string()) + .spawn_scoped(scope, || reordering::start(self))?; + for i in 0..self.config.nb_decoding_threads { thread::Builder::new() .name(format!("decoding_{i}")) diff --git a/src/receive/reblock.rs b/src/receive/reblock.rs index 2d11550..768659b 100644 --- a/src/receive/reblock.rs +++ b/src/receive/reblock.rs @@ -2,52 +2,6 @@ //! reordering use crate::{protocol, receive}; -use std::collections::HashMap; - -const BLOCK_GAP_WARNING_THRESHOLD: u8 = 2; - -// Discards queues in Hashmap queues_by_block_id, where block_id is not between -// (leading_block_id - window_size) and (leading_block_id). -// Raises warnings for each discarded queue containing partial data (len < nb_normal_packets) -fn discard_queues_outside_retention_window( - leading_block_id: u8, - window_size: u8, - queues_by_block_id: &mut HashMap>, - nb_normal_packets: usize, -) { - queues_by_block_id.retain(|&k, v| { - let retain = is_in_retention_window(k, leading_block_id, window_size); - if !retain && (v.len() < nb_normal_packets) { - log::warn!("discarding incomplete block {k} (currently on block {leading_block_id})") - } - retain - }); -} - -// Returns true if block_id is between (leading_block_id - window_size) and -// leading_block_id; otherwise, returns false. -fn is_in_retention_window(block_id: u8, leading_block_id: u8, window_size: u8) -> bool { - is_in_wrapped_interval( - block_id, - ( - leading_block_id.wrapping_sub(window_size - 1), - leading_block_id, - ), - ) -} - -// Returns true if value is between (leading_block_id - window_size) and -// leading_block_id; otherwise, returns false. -fn is_in_wrapped_interval(value: u8, interval: (u8, u8)) -> bool { - let (lower_bound, upper_bound) = interval; - if lower_bound < upper_bound { - // continuous interval like within 32-48 - lower_bound <= value && value <= upper_bound - } else { - // wrapped interval like (0-8 or 248-255) - value <= upper_bound || lower_bound <= value - } -} pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::Error> { let nb_normal_packets = protocol::nb_encoding_packets(&receiver.object_transmission_info); @@ -55,16 +9,12 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E &receiver.object_transmission_info, receiver.config.repair_block_size, ); - let reblock_retention_window = receiver.config.reblock_retention_window; - - let mut next_block_id_overwrites_leading = true; + let mut desynchro = true; let capacity = nb_normal_packets as usize + nb_repair_packets as usize; - - let mut leading_block_id: u8 = 0; - let mut next_sendable_block_id: u8 = 0; - let mut queues_by_block_id: HashMap> = - HashMap::with_capacity(reblock_retention_window as usize); + let mut prev_queue: Option> = None; + let mut queue = Vec::with_capacity(capacity); + let mut block_id = 0; loop { let packets = match receiver @@ -72,9 +22,27 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E .recv_timeout(receiver.config.flush_timeout) { Err(crossbeam_channel::RecvTimeoutError::Timeout) => { - log::trace!("timeout while waiting for next packets"); - queues_by_block_id.clear(); - next_block_id_overwrites_leading = true; + let qlen = queue.len(); + if 0 < qlen { + // no more traffic but ongoing block, trying to decode + if nb_normal_packets as usize <= qlen { + log::debug!("flushing block {block_id} with {qlen} packets"); + receiver.to_decoding.send((block_id, Some(queue)))?; + block_id = block_id.wrapping_add(1); + } else { + log::debug!( + "not enough packets ({qlen} packets) to decode block {block_id}" + ); + log::warn!("lost block {block_id}"); + receiver.to_decoding.send((block_id, None))?; + desynchro = true; + } + queue = Vec::with_capacity(capacity); + prev_queue = None; + } else { + // without data for some time we reset the current block_id + desynchro = true; + } continue; } Err(e) => return Err(receive::Error::from(e)), @@ -83,131 +51,63 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E for packet in packets { let payload_id = packet.payload_id(); - let packet_block_id = payload_id.source_block_number(); - - if next_block_id_overwrites_leading { - log::debug!("new leading block id: ({packet_block_id})"); - leading_block_id = packet_block_id; - next_sendable_block_id = packet_block_id; - next_block_id_overwrites_leading = false; - } else if !is_in_retention_window( - packet_block_id, - leading_block_id, - reblock_retention_window, - ) { - log::debug!("new leading block id: {packet_block_id} (was {leading_block_id})"); - if !is_in_wrapped_interval( - packet_block_id, - ( - leading_block_id, - leading_block_id + BLOCK_GAP_WARNING_THRESHOLD, - ), - ) { - log::warn!("large gap in block sequence (received {packet_block_id} while on block {leading_block_id})") - } - leading_block_id = packet_block_id; - - log::debug!("discarding all packets for blocks outside new retention window"); - discard_queues_outside_retention_window( - leading_block_id, - reblock_retention_window, - &mut queues_by_block_id, - nb_normal_packets as usize, - ); + let message_block_id = payload_id.source_block_number(); - // update next_sendable_block_id if now outside window - if !is_in_retention_window( - next_sendable_block_id, - leading_block_id, - reblock_retention_window, - ) { - next_sendable_block_id = - leading_block_id.wrapping_sub(reblock_retention_window - 1); - log::debug!("bumped next_sendable_block_id to {next_sendable_block_id}"); + if desynchro { + block_id = message_block_id; + receiver.resync_needed_block_id.store((true, block_id)); + desynchro = false; + } - // check sendable queues - loop { - let queue = queues_by_block_id - .entry(next_sendable_block_id) - .or_insert(Vec::with_capacity(capacity)); - let qlen = queue.len(); - let queue_packet_id = next_sendable_block_id; + if message_block_id == block_id { + log::trace!("queueing in block {block_id}"); + queue.push(packet); + continue; + } - if (nb_normal_packets as usize) > qlen { - break; - } - log::debug!("trying to decode block {queue_packet_id} with {qlen} packets"); + if message_block_id.wrapping_add(1) == block_id { + //packet is from previous block; is this block parked ? + if let Some(mut pqueue) = prev_queue { + pqueue.push(packet); + if nb_normal_packets as usize <= pqueue.len() { + //now there is enough packets to decode it receiver .to_decoding - .send((queue_packet_id, queue.to_vec()))?; - next_sendable_block_id = next_sendable_block_id.wrapping_add(1); + .send((message_block_id, Some(pqueue)))?; + prev_queue = None; + } else { + prev_queue = Some(pqueue); } } + continue; } - // push packet into queue - let mut queue = queues_by_block_id - .entry(packet_block_id) - .or_insert(Vec::with_capacity(capacity)); - let mut qlen = queue.len(); - let mut queue_packet_id = packet_block_id; + if message_block_id != block_id.wrapping_add(1) { + log::warn!("discarding packet with block_id {message_block_id} (current block_id is {block_id})"); + continue; + } - log::trace!("queueing packet for block {packet_block_id}"); - queue.push(packet); + //this is the first packet of the next block - // send block if enough packets - while nb_normal_packets as usize == qlen { - if next_sendable_block_id != queue_packet_id { - log::debug!("ready to decode block {queue_packet_id} with {qlen} packets, but still waiting on block {next_sendable_block_id}"); - break; + if nb_normal_packets as usize <= queue.len() { + //enough packets in the current block to decode it + receiver.to_decoding.send((block_id, Some(queue)))?; + if prev_queue.is_some() { + log::warn!("lost block {}", block_id.wrapping_sub(1)); } - - log::debug!("trying to decode block {queue_packet_id} with {qlen} packets"); - receiver - .to_decoding - .send((queue_packet_id, queue.to_vec()))?; - - next_sendable_block_id = next_sendable_block_id.wrapping_add(1); - - // check if next queue is ready to send - queue = queues_by_block_id - .entry(next_sendable_block_id) - .or_insert(Vec::with_capacity(capacity)); - qlen = queue.len(); - queue_packet_id = next_sendable_block_id; + prev_queue = None; + } else { + //not enough packet, parking the current block + prev_queue = Some(queue); } - } - } -} -#[cfg(test)] -mod tests { - use super::is_in_wrapped_interval; + //starting the next block + + block_id = message_block_id; - #[test] - fn test_is_in_wrapped_interval() { - for (value, lower_bound, upper_bound, expected_result) in vec![ - (0, 0, 0, true), - (0, 0, 255, true), - // continuous interval - (29, 30, 40, false), - (30, 30, 40, true), - (31, 30, 40, true), - (40, 30, 40, true), - (41, 30, 40, false), - // wrapping - (29, 40, 30, true), - (30, 40, 30, true), - (31, 40, 30, false), - (40, 40, 30, true), - (41, 40, 30, true), - // edge cases - (0, 255, 0, true), - (3, 255, 0, false), - (255, 255, 0, true), - ] { - let res = is_in_wrapped_interval(value, (lower_bound, upper_bound)); - assert_eq!(expected_result, res, "expected {expected_result}; got {res}. value: {value}; lower_bound: {lower_bound}; upper_bound: {upper_bound}"); + log::trace!("queueing in block {block_id}"); + queue = Vec::with_capacity(capacity); + queue.push(packet); } } } diff --git a/src/receive/reordering.rs b/src/receive/reordering.rs new file mode 100644 index 0000000..5fa551f --- /dev/null +++ b/src/receive/reordering.rs @@ -0,0 +1,62 @@ +//! Worker that reorders received messages according to block numbers + +use crate::receive; + +pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::Error> { + let mut block_to_receive = 0; + let mut pending_messages = [const { None }; u8::MAX as usize + 1]; + + loop { + let (block_id, message) = receiver.for_reordering.recv()?; + + if message.is_none() { + // Synchronization lost, dropping everything + log::warn!("synchronization lost received, dropping everything, propagating it"); + pending_messages.fill_with(|| None); + receiver.to_dispatch.send(None)?; + continue; + } + + let (resync_needed, resync_block_id) = receiver.resync_needed_block_id.take(); + + if resync_needed { + log::debug!("forced resynchronization"); + if pending_messages.iter().any(Option::is_some) { + log::warn!("forced resynchronization with pending messages, dropping everything"); + pending_messages.fill_with(|| None); + } + block_to_receive = resync_block_id; + } + + log::debug!("received block {block_id}, expecting block {block_to_receive}"); + + if block_to_receive == block_id { + let message = if pending_messages[block_to_receive as usize].is_some() { + // a message was already pending + // using the old one, storing the newly received one + pending_messages[block_to_receive as usize] + .replace(message) + .expect("infallible") + } else { + // no message was pending, using the newly received one + message + }; + + receiver.to_dispatch.send(message)?; + block_to_receive = block_to_receive.wrapping_add(1); + + // flushing as much as possible further pending blocks + while let Some(message) = pending_messages[block_to_receive as usize].take() { + receiver.to_dispatch.send(message)?; + block_to_receive = block_to_receive.wrapping_add(1); + } + } else if pending_messages[block_id as usize] + .replace(message) + .is_some() + { + log::error!("received a new block {block_id} but existing one was not sent to dispatch, synchronization lost, dropping everything"); + pending_messages.fill_with(|| None); + receiver.to_dispatch.send(None)?; + } + } +}