Skip to content

Commit

Permalink
refactor ack
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 5, 2024
1 parent 7dc57f4 commit b0b6aeb
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 327 deletions.
13 changes: 11 additions & 2 deletions src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ impl Acknowledgement {
}

pub(crate) fn filter_incoming_ack<F, S>(
self: SharedAck,
self: &SharedAck,
frame: F,
) -> impl Stream<Item = FrameSet<S>>
where
F: Stream<Item = connected::Packet<S>>,
{
IncomingAckFilter {
frame,
ack: Arc::clone(&self),
ack: Arc::clone(self),
}
}

Expand All @@ -82,6 +82,10 @@ impl Acknowledgement {
self.outgoing_nack_tx.send(seq_num);
}

pub(crate) fn outgoing_nack_batch(&self, t: impl IntoIterator<Item = u24>) {
self.outgoing_nack_tx.send_batch(t);
}

// Clear all acknowledged frames
pub(crate) fn poll_ack(&self, resend: &mut ResendMap) {
for ack in self.incoming_ack_rx.try_iter() {
Expand Down Expand Up @@ -116,6 +120,11 @@ impl Acknowledgement {
pub(crate) fn poll_outgoing_nack(&self, mtu: u16) -> Option<AckOrNack> {
AckOrNack::extend_from(self.outgoing_nack_rx.recv_batch(), mtu)
}

// Return whether the outgoing buffer is empty
pub(crate) fn empty(&self) -> bool {
self.outgoing_ack_rx.is_empty() && self.outgoing_nack_rx.is_empty()
}
}

pin_project! {
Expand Down
35 changes: 12 additions & 23 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use tokio::net::UdpSocket as TokioUdpSocket;
use tokio_util::udp::UdpFramed;

use super::ConnectTo;
use crate::ack::Acknowledgement;
use crate::client::handler::offline::HandleOffline;
use crate::client::handler::online::HandleOnline;
use crate::codec::tokio::Codec;
use crate::codec::{Decoded, Encoded};
use crate::errors::Error;
use crate::guard::{HandleIncoming, HandleOutgoingAck};
use crate::guard::HandleOutgoing;
use crate::io::{IOImpl, IO};
use crate::utils::{priority_mpsc, Logged};
use crate::utils::Logged;
use crate::{PeerContext, RoleContext};

impl ConnectTo for TokioUdpSocket {
Expand All @@ -24,15 +25,7 @@ impl ConnectTo for TokioUdpSocket {
config: super::Config,
) -> Result<impl IO, Error> {
let socket = Arc::new(self);

let (incoming_ack_tx, incoming_ack_rx) = flume::unbounded();
let (incoming_nack_tx, incoming_nack_rx) = flume::unbounded();

let (outgoing_ack_tx, outgoing_ack_rx) = priority_mpsc::unbounded();
let (outgoing_nack_tx, outgoing_nack_rx) = priority_mpsc::unbounded();

let mut lookups = addrs.to_socket_addrs()?;

let addr = loop {
if let Some(addr) = lookups.next() {
if socket.connect(addr).await.is_ok() {
Expand All @@ -43,12 +36,11 @@ impl ConnectTo for TokioUdpSocket {
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "invalid address").into());
};

let ack = Acknowledgement::new_arc(RoleContext::Client);

let write = UdpFramed::new(Arc::clone(&socket), Codec)
.handle_outgoing(
incoming_ack_rx,
incoming_nack_rx,
outgoing_ack_rx,
outgoing_nack_rx,
Arc::clone(&ack),
config.send_buf_cap,
PeerContext {
addr,
Expand All @@ -58,19 +50,16 @@ impl ConnectTo for TokioUdpSocket {
)
.frame_encoded(config.mtu, config.codec_config());

let io = UdpFramed::new(socket, Codec)
let incoming = UdpFramed::new(socket, Codec)
.logged_err(|err| {
debug!("[client] got codec error: {err} when decode offline frames");
})
.handle_offline(addr, config.offline_config())
.await?
.handle_incoming(incoming_ack_tx, incoming_nack_tx)
.decoded(
config.codec_config(),
outgoing_ack_tx,
outgoing_nack_tx,
RoleContext::Client,
)
.await?;

let io = ack
.filter_incoming_ack(incoming)
.decoded(config.codec_config(), Arc::clone(&ack), RoleContext::Client)
.handle_online(write, addr, config.client_guid)
.await?;

Expand Down
34 changes: 12 additions & 22 deletions src/codec/decoder/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use futures::{ready, Stream, StreamExt};
use minitrace::local::LocalSpan;
use pin_project_lite::pin_project;

use crate::ack::SharedAck;
use crate::errors::CodecError;
use crate::packet::connected::{FrameSet, Frames};
use crate::utils::{priority_mpsc, u24, BitVecQueue};
use crate::utils::{u24, BitVecQueue};

/// The deduplication window. For each connect, the maximum size is
/// 2 ^ (8 * 3) / 8 / 1024 / 1024 = 2MB.
Expand Down Expand Up @@ -60,32 +61,24 @@ pin_project! {
// Limit the maximum reliable_frame_index gap for a connection. 0 means no limit.
max_gap: usize,
window: DuplicateWindow,
outgoing_ack_tx: priority_mpsc::Sender<u24>,
ack: SharedAck,
}
}

pub(crate) trait Deduplicated: Sized {
fn deduplicated(
self,
max_gap: usize,
outgoing_ack_tx: priority_mpsc::Sender<u24>,
) -> Dedup<Self>;
fn deduplicated(self, max_gap: usize, ack: SharedAck) -> Dedup<Self>;
}

impl<F, B> Deduplicated for F
where
F: Stream<Item = Result<FrameSet<Frames<B>>, CodecError>>,
{
fn deduplicated(
self,
max_gap: usize,
outgoing_ack_tx: priority_mpsc::Sender<u24>,
) -> Dedup<Self> {
fn deduplicated(self, max_gap: usize, ack: SharedAck) -> Dedup<Self> {
Dedup {
frame: self,
max_gap,
window: DuplicateWindow::default(),
outgoing_ack_tx,
ack,
}
}
}
Expand Down Expand Up @@ -125,7 +118,7 @@ where
return Poll::Ready(Some(Ok(frame_set)));
}
// all duplicated, send ack back
this.outgoing_ack_tx.send(frame_set.seq_num);
this.ack.outgoing_ack(frame_set.seq_num);
}
}
}
Expand All @@ -141,6 +134,7 @@ mod test {
use indexmap::IndexSet;

use super::*;
use crate::ack::Acknowledgement;
use crate::errors::CodecError;
use crate::packet::connected::{Flags, Frame, FrameSet};

Expand Down Expand Up @@ -224,12 +218,11 @@ mod test {
}
};
tokio::pin!(frame);
let (outgoing_ack_tx, _rx) = priority_mpsc::unbounded();
let mut dedup = Dedup {
frame: frame.map(Ok),
max_gap: 100,
window: DuplicateWindow::default(),
outgoing_ack_tx,
ack: Acknowledgement::new_arc(crate::RoleContext::Server),
};

assert_eq!(dedup.next().await.unwrap().unwrap(), frame_set(0..64));
Expand All @@ -253,12 +246,11 @@ mod test {
}
};
tokio::pin!(frame);
let (outgoing_ack_tx, _rx) = priority_mpsc::unbounded();
let mut dedup = Dedup {
frame: frame.map(Ok),
max_gap: 100,
window: DuplicateWindow::default(),
outgoing_ack_tx,
ack: Acknowledgement::new_arc(crate::RoleContext::Server),
};
assert_eq!(dedup.next().await.unwrap().unwrap(), frame_set([0]));
assert_eq!(dedup.next().await.unwrap().unwrap(), frame_set([101]));
Expand All @@ -278,12 +270,11 @@ mod test {
}
};
tokio::pin!(frame);
let (outgoing_ack_tx, _rx) = priority_mpsc::unbounded();
let mut dedup = Dedup {
frame: frame.map(Ok),
max_gap: 100,
window: DuplicateWindow::default(),
outgoing_ack_tx,
ack: Acknowledgement::new_arc(crate::RoleContext::Server),
};
assert_eq!(
dedup.next().await.unwrap().unwrap(),
Expand Down Expand Up @@ -314,12 +305,11 @@ mod test {
}
};
tokio::pin!(frame);
let (outgoing_ack_tx, _rx) = priority_mpsc::unbounded();
let mut dedup = Dedup {
frame: frame.map(Ok),
max_gap: scale,
window: DuplicateWindow::default(),
outgoing_ack_tx,
ack: Acknowledgement::new_arc(crate::RoleContext::Server),
};
assert_eq!(dedup.next().await.unwrap().unwrap(), frame_set(idx1_set));

Expand Down
Loading

0 comments on commit b0b6aeb

Please sign in to comment.