Skip to content

Commit

Permalink
Fix issue 3: limit block dispatch retries (#14)
Browse files Browse the repository at this point in the history
* revert cccecec

* fix issue #3

* refactor decode/dispatch for new reordering thread
  • Loading branch information
github-af authored Jul 29, 2024
1 parent 4b34be0 commit 2047632
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 222 deletions.
7 changes: 0 additions & 7 deletions doc/parameters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,6 @@ Then, on the logical level, fountain codes operates on blocks. If blocks reorder
The default value for an encoding block is 60000, and repair block size is defaulted to 10% of this value (6000).
See the :ref:`Tweaking parameters` chapter for more details on how to choose optimal values for your particular use case and devices.

Because of how UDP works, blocks may sometimes be received in a wildly different order: you may start receiving block number 12 before you finished receiving all packets for block number 10. Because of that, lidi will keep track of a given amount of blocks before considering them lost (up to 8 by default).

.. code-block::
--reblock_retention_window <nb_blocks>
on the receiver side
Multiplexing
------------

Expand Down
14 changes: 0 additions & 14 deletions src/bin/diode-receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ struct Config {
encoding_block_size: u64,
repair_block_size: u32,
udp_buffer_size: u32,
reblock_retention_window: u8,
flush_timeout: time::Duration,
nb_decoding_threads: u8,
to: ClientConfig,
Expand Down Expand Up @@ -97,14 +96,6 @@ fn command_args() -> Config {
.value_parser(clap::value_parser!(u32).range(..1073741824))
.help("Size of UDP socket recv buffer"),
)
.arg(
Arg::new("reblock_retention_window")
.long("reblock_retention_window")
.value_name("reblock_retention_window")
.default_value("8")
.value_parser(clap::value_parser!(u8).range(1..128))
.help("Higher value increases resilience to blocks getting mixed up"),
)
.arg(
Arg::new("flush_timeout")
.long("flush_timeout")
Expand Down Expand Up @@ -147,9 +138,6 @@ fn command_args() -> Config {
let nb_decoding_threads = *args.get_one::<u8>("nb_decoding_threads").expect("default");
let encoding_block_size = *args.get_one::<u64>("encoding_block_size").expect("default");
let udp_buffer_size = *args.get_one::<u32>("udp_buffer_size").expect("default");
let reblock_retention_window = *args
.get_one::<u8>("reblock_retention_window")
.expect("default");
let repair_block_size = *args.get_one::<u32>("repair_block_size").expect("default");
let flush_timeout = time::Duration::from_millis(
args.get_one::<NonZeroU64>("flush_timeout")
Expand Down Expand Up @@ -182,7 +170,6 @@ fn command_args() -> Config {
encoding_block_size,
repair_block_size,
udp_buffer_size,
reblock_retention_window,
flush_timeout,
to,
heartbeat,
Expand Down Expand Up @@ -251,7 +238,6 @@ fn main() {
encoding_block_size: config.encoding_block_size,
repair_block_size: config.repair_block_size,
udp_buffer_size: config.udp_buffer_size,
reblock_retention_window: config.reblock_retention_window,
flush_timeout: config.flush_timeout,
nb_decoding_threads: config.nb_decoding_threads,
heartbeat_interval: config.heartbeat,
Expand Down
6 changes: 5 additions & 1 deletion src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,15 @@ impl Message {

impl fmt::Display for Message {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let msg_type = match self.message_type() {
Err(e) => format!("UNKNOWN {e}"),
Ok(t) => t.to_string(),
};
write!(
fmt,
"client {:x} message = {} data = {} byte(s)",
self.client_id(),
self.message_type().map_err(|_| fmt::Error)?,
msg_type,
self.payload_len()
)
}
Expand Down
31 changes: 17 additions & 14 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
loop {
let (block_id, packets) = receiver.for_decoding.recv()?;

let packets = match packets {
None => {
log::warn!("synchronization lost received, propagating");
// Sending lost synchronization signal to reorder thread
receiver.to_reordering.send((block_id, None))?;
continue;
}
Some(packets) => packets,
};

log::trace!(
"trying to decode block {block_id} with {} packets",
packets.len()
Expand All @@ -21,22 +31,15 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

match decoder.decode(packets) {
None => {
log::warn!("lost block {block_id}");
continue;
log::error!("lost block {block_id}, synchronization lost");
// Sending lost synchronization signal to reorder thread
receiver.to_reordering.send((block_id, None))?;
}
Some(block) => {
log::trace!("block {} decoded with {} bytes!", block_id, block.len());

loop {
let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");
if *to_receive == block_id {
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
}
}
log::trace!("block {block_id} decoded with {} bytes!", block.len());
receiver
.to_reordering
.send((block_id, Some(protocol::Message::deserialize(block))))?;
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion src/receive/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
receiver.for_dispatch.recv()?
};

let message = match message {
Some(m) => m,
None => {
// Synchonization has been lost
// Marking all active transfers as failed
for (client_id, client_sendq) in active_transfers {
let message = protocol::Message::new(
protocol::MessageType::Abort,
receiver.to_buffer_size as u32,
client_id,
None,
);

if let Err(e) = client_sendq.send(message) {
log::error!("failed to send payload to client {client_id:x}: {e}");
}

failed_transfers.insert(client_id);
ended_transfers.insert(client_id, client_sendq);
}
active_transfers = BTreeMap::new();
continue;
}
};

log::trace!("received {message}");

let client_id = message.client_id();
Expand All @@ -46,7 +71,13 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
continue;
}

let message_type = message.message_type()?;
let message_type = match message.message_type() {
Err(e) => {
log::error!("message of UNKNOWN type received ({e}), dropping it");
continue;
}
Ok(mt) => mt,
};

let mut will_end = false;

Expand Down
56 changes: 38 additions & 18 deletions src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use std::{
io::{self, Write},
net,
os::fd::AsRawFd,
sync, thread, time,
thread, time,
};

mod client;
mod clients;
mod decoding;
mod dispatch;
mod reblock;
mod reordering;
mod udp;

pub struct Config {
Expand All @@ -39,7 +40,6 @@ pub struct Config {
pub encoding_block_size: u64,
pub repair_block_size: u32,
pub udp_buffer_size: u32,
pub reblock_retention_window: u8,
pub flush_timeout: time::Duration,
pub nb_decoding_threads: u8,
pub heartbeat_interval: Option<time::Duration>,
Expand All @@ -62,8 +62,9 @@ impl Config {
pub enum Error {
Io(io::Error),
SendPackets(crossbeam_channel::SendError<Vec<raptorq::EncodingPacket>>),
SendBlockPackets(crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>),
SendMessage(crossbeam_channel::SendError<protocol::Message>),
SendBlockPackets(crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>),
SendBlockMessage(crossbeam_channel::SendError<(u8, Option<protocol::Message>)>),
SendMessage(crossbeam_channel::SendError<Option<protocol::Message>>),
SendClients(
crossbeam_channel::SendError<(
protocol::ClientId,
Expand All @@ -81,6 +82,7 @@ impl fmt::Display for Error {
Self::Io(e) => write!(fmt, "I/O error: {e}"),
Self::SendPackets(e) => write!(fmt, "crossbeam send packets error: {e}"),
Self::SendBlockPackets(e) => write!(fmt, "crossbeam send block packets error: {e}"),
Self::SendBlockMessage(e) => write!(fmt, "crossbeam send block/message error: {e}"),
Self::SendMessage(e) => write!(fmt, "crossbeam send message error: {e}"),
Self::SendClients(e) => write!(fmt, "crossbeam send client error: {e}"),
Self::Receive(e) => write!(fmt, "crossbeam receive error: {e}"),
Expand All @@ -102,15 +104,21 @@ impl From<crossbeam_channel::SendError<Vec<raptorq::EncodingPacket>>> for Error
}
}

impl From<crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>> for Error {
fn from(e: crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>) -> Self {
impl From<crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>> for Error {
fn from(e: crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>) -> Self {
Self::SendBlockPackets(e)
}
}

impl From<crossbeam_channel::SendError<protocol::Message>> for Error {
fn from(e: crossbeam_channel::SendError<protocol::Message>) -> Self {
Self::SendMessage(e)
impl From<crossbeam_channel::SendError<(u8, Option<protocol::Message>)>> for Error {
fn from(oe: crossbeam_channel::SendError<(u8, Option<protocol::Message>)>) -> Self {
Self::SendBlockMessage(oe)
}
}

impl From<crossbeam_channel::SendError<Option<protocol::Message>>> for Error {
fn from(oe: crossbeam_channel::SendError<Option<protocol::Message>>) -> Self {
Self::SendMessage(oe)
}
}

Expand Down Expand Up @@ -158,13 +166,16 @@ pub struct Receiver<F> {
pub(crate) to_buffer_size: usize,
pub(crate) from_max_messages: u16,
pub(crate) multiplex_control: semaphore::Semaphore,
pub(crate) block_to_receive: sync::Mutex<u8>,
pub(crate) resync_needed_block_id: crossbeam_utils::atomic::AtomicCell<(bool, u8)>,
pub(crate) to_reblock: crossbeam_channel::Sender<Vec<raptorq::EncodingPacket>>,
pub(crate) for_reblock: crossbeam_channel::Receiver<Vec<raptorq::EncodingPacket>>,
pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Vec<raptorq::EncodingPacket>)>,
pub(crate) for_decoding: crossbeam_channel::Receiver<(u8, Vec<raptorq::EncodingPacket>)>,
pub(crate) to_dispatch: crossbeam_channel::Sender<protocol::Message>,
pub(crate) for_dispatch: crossbeam_channel::Receiver<protocol::Message>,
pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Option<Vec<raptorq::EncodingPacket>>)>,
pub(crate) for_decoding:
crossbeam_channel::Receiver<(u8, Option<Vec<raptorq::EncodingPacket>>)>,
pub(crate) to_reordering: crossbeam_channel::Sender<(u8, Option<protocol::Message>)>,
pub(crate) for_reordering: crossbeam_channel::Receiver<(u8, Option<protocol::Message>)>,
pub(crate) to_dispatch: crossbeam_channel::Sender<Option<protocol::Message>>,
pub(crate) for_dispatch: crossbeam_channel::Receiver<Option<protocol::Message>>,
pub(crate) to_clients: crossbeam_channel::Sender<(
protocol::ClientId,
crossbeam_channel::Receiver<protocol::Message>,
Expand Down Expand Up @@ -199,13 +210,16 @@ where

let multiplex_control = semaphore::Semaphore::new(config.nb_clients as usize);

let block_to_receive = sync::Mutex::new(0);
let resync_needed_block_id = crossbeam_utils::atomic::AtomicCell::default();

let (to_reblock, for_reblock) =
crossbeam_channel::unbounded::<Vec<raptorq::EncodingPacket>>();
let (to_decoding, for_decoding) =
crossbeam_channel::unbounded::<(u8, Vec<raptorq::EncodingPacket>)>();
let (to_dispatch, for_dispatch) = crossbeam_channel::unbounded::<protocol::Message>();
crossbeam_channel::unbounded::<(u8, Option<Vec<raptorq::EncodingPacket>>)>();
let (to_reordering, for_reordering) =
crossbeam_channel::unbounded::<(u8, Option<protocol::Message>)>();
let (to_dispatch, for_dispatch) =
crossbeam_channel::unbounded::<Option<protocol::Message>>();

let (to_clients, for_clients) = crossbeam_channel::bounded::<(
protocol::ClientId,
Expand All @@ -218,11 +232,13 @@ where
to_buffer_size,
from_max_messages,
multiplex_control,
block_to_receive,
resync_needed_block_id,
to_reblock,
for_reblock,
to_decoding,
for_decoding,
to_reordering,
for_reordering,
to_dispatch,
for_dispatch,
to_clients,
Expand Down Expand Up @@ -273,6 +289,10 @@ where
.name("dispatch".to_string())
.spawn_scoped(scope, || dispatch::start(self))?;

thread::Builder::new()
.name("reordering".to_string())
.spawn_scoped(scope, || reordering::start(self))?;

for i in 0..self.config.nb_decoding_threads {
thread::Builder::new()
.name(format!("decoding_{i}"))
Expand Down
Loading

0 comments on commit 2047632

Please sign in to comment.