diff --git a/README.md b/README.md index 8fafc23..98f81a1 100644 --- a/README.md +++ b/README.md @@ -36,13 +36,14 @@ See [examples](examples/) or [integration testing](src/tests.rs) for basic usage ### Server Most operations are performed on `Stream` and `Sink`. There will be some options in [opts](src/opts.rs). +The implementation details are obscured, and you can only see a very high level of abstraction, including the `Error` type, which is just `std::io::Error`. Keep polling `incoming` because it also serves as the router to every connections. -Apply `Sink::poll_flush` to IO will trigger to flush all pending packets, `ACK`/`NACK`, and stale packets. -Apply `Sink::poll_close` to IO will ensure that all data is received by the peer before returning (i.e It may keep resending infinitely.). +Apply `Sink::poll_flush` to IO will trigger to flush all pending packets, `ACK`/`NACK`, and stale packets. So you have to call `poll_flush` periodically. You can configure the [flush strategy](src/opts.rs) you want. +Apply `Sink::poll_close` to IO will ensure that all data is received by the peer before returning. It may keep resending infinitely unless you cancel the task. So you'd better set a timeout for each `poll_close`. > [!NOTE] -> All calculations are lazy. You need to decide how long to flush once, and how long to wait when closing before considering the peer is disconnected. +> All calculations are lazy. The state will not update if you do not poll it. ```rust use bytes::Bytes; diff --git a/examples/tracing.rs b/examples/tracing.rs index 8f2af27..89550e7 100644 --- a/examples/tracing.rs +++ b/examples/tracing.rs @@ -177,7 +177,7 @@ fn display(spans: Vec) { if list.is_empty() { continue; } - println!("{}", trace_id.0); + println!("trace_id: {}", trace_id.0); let l = &list[&SpanId::default()]; for (i, root) in l.iter().enumerate() { dfs(&list, &spans_map, *root, 0, i == l.len() - 1); diff --git a/src/client/conn/tokio.rs b/src/client/conn/tokio.rs index 377f9b5..407b056 100644 --- a/src/client/conn/tokio.rs +++ b/src/client/conn/tokio.rs @@ -13,7 +13,7 @@ use crate::client::handler::online::HandleOnline; use crate::codec::frame::Framed; use crate::codec::{Decoded, Encoded}; use crate::guard::HandleOutgoing; -use crate::link::{Router, TransferLink}; +use crate::link::{Route, TransferLink}; use crate::opts::Ping; use crate::state::{IncomingStateManage, OutgoingStateManage}; use crate::utils::Logged; @@ -57,10 +57,11 @@ impl ConnectTo for TokioUdpSocket { .frame_encoded(peer.mtu, config.codec_config(), Arc::clone(&link)) .manage_outgoing_state(None); - let (mut router, route) = Router::new(Arc::clone(&link)); + let (mut router, route) = Route::new(Arc::clone(&link)); tokio::spawn(async move { while let Some(pack) = incoming.next().await { + // deliver the packet actively so that we do not miss ACK/NACK packets to advance the outgoing state router.deliver(pack); } }); diff --git a/src/estimator.rs b/src/estimator.rs index c9f763a..a04827c 100644 --- a/src/estimator.rs +++ b/src/estimator.rs @@ -42,6 +42,9 @@ impl RFC6298Impl { /// The current RTO estimation. pub(crate) fn rto(&self) -> Duration { + // TODO: + // RFC6298 2.4 suggests a minimum of 1 second, which may be + // a conservative choice for some applications. cmp::max( self.get() + cmp::max(TIMER_GRANULARITY, 4 * self.var), Duration::from_secs(1), diff --git a/src/guard.rs b/src/guard.rs index b3c0018..16947f4 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -362,6 +362,10 @@ impl ResendMap { /// `process_stales` collect all stale frames into buffer and remove the expired entries fn process_stales(&mut self, buffer: &mut VecDeque) { + if self.map.is_empty() { + return; + } + let now = Instant::now(); if now < self.last_record_expired_at { trace!( @@ -384,6 +388,7 @@ impl ResendMap { } }); debug_assert!(min_expired_at > now); + // update the last record expired at self.last_record_expired_at = min_expired_at; let len = self.map.len(); diff --git a/src/link.rs b/src/link.rs index 6f779cb..d6b05f1 100644 --- a/src/link.rs +++ b/src/link.rs @@ -19,6 +19,7 @@ pub(crate) type SharedLink = Arc; /// Transfer data and task between stream and sink. pub(crate) struct TransferLink { + // incoming ack with receive timestamp incoming_ack: ConcurrentQueue<(AckOrNack, Instant)>, incoming_nack: ConcurrentQueue, forward_waking: AtomicBool, @@ -98,17 +99,19 @@ impl TransferLink { dropped.total_cnt() ); } - // wake up after sends ack + // wake up after receiving an ack if self.should_waking() { let c_id = ConnId::new(self.role.guid(), self.peer.guid); + let mut cnt = 0; for waker in Reactor::get().cancel_all_timers(c_id) { // safe to panic waker.wake(); - debug!( - "[{}] wake up a certain waker after receives ack on connection: {c_id:?}", - self.role - ); + cnt += 1; } + debug!( + "[{}] wake up {cnt} wakers after receives ack on connection: {c_id:?}", + self.role + ); } } @@ -173,14 +176,14 @@ impl TransferLink { } } -/// Router for incoming packets -pub(crate) struct Router { +/// A route for incoming packets +pub(crate) struct Route { router_tx: Sender>, link: SharedLink, seq_read: u24, } -impl Router { +impl Route { pub(crate) fn new(link: SharedLink) -> (Self, impl Stream>) { let (router_tx, router_rx) = async_channel::unbounded(); ( @@ -200,7 +203,8 @@ impl Router { } match pack { connected::Packet::FrameSet(frames) => { - // TODO: use lock free concurrent queue to avoid lock + // TODO: use lock free concurrent queue to buffer the outgoing ack/nack to avoid + // locking the mutex self.link.outgoing_ack.lock().push(Reverse(frames.seq_num)); diff --git a/src/packet/connected/frame_set.rs b/src/packet/connected/frame_set.rs index 963e735..272ddc1 100644 --- a/src/packet/connected/frame_set.rs +++ b/src/packet/connected/frame_set.rs @@ -390,7 +390,8 @@ impl FrameBody { }), PackType::DisconnectNotification => Ok(Self::DisconnectNotification), PackType::DetectLostConnections => Ok(Self::DetectLostConnections), - _ => Ok(Self::User(buf)), + _ => Ok(Self::User(buf)), /* we rely on the user to handle this even it is not a user + * packet */ } } diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 04f5ba7..3d8364a 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -19,9 +19,9 @@ macro_rules! read_buf { pub(in crate::packet) use read_buf; -const VALID_FLAG: u8 = 0b1000_0000; -const ACK_FLAG: u8 = 0b1100_0000; -const NACK_FLAG: u8 = 0b1010_0000; +const VALID_FLAG: u8 = 0b1000_0000; // A valid user frame +const ACK_FLAG: u8 = 0b1100_0000; // A valid user frame which contains an ACK frame +const NACK_FLAG: u8 = 0b1010_0000; // A valid user frame which contains a NACK frame const PARTED_FLAG: u8 = 0b0001_0000; const CONTINUOUS_SEND_FLAG: u8 = 0b0000_1000; @@ -38,7 +38,7 @@ pub(crate) const FRAGMENT_PART_SIZE: usize = 10; /// others are encapsulated in a `FrameSet` data packet and appear as the first byte of the body #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] -#[allow(dead_code)] // may used in future +#[allow(dead_code)] // We do not want follow the whole RakNet design pub(crate) enum PackType { ConnectedPing = 0x00, UnconnectedPing1 = 0x01, @@ -50,6 +50,17 @@ pub(crate) enum PackType { OpenConnectionRequest2 = 0x07, OpenConnectionReply2 = 0x08, ConnectionRequest = 0x09, + + // Some packet related to security, we do not support them now + RemoteSystemRequiresPublicKey = 0x0A, + OurSystemRequiresSecurity = 0x0B, + PublicKeyMismatch = 0x0C, + OutOfBandInternal = 0x0D, + // Related to `_WITH_ACK_RECEIPT` packets + SndReceiptAcked = 0x0E, + SndReceiptLoss = 0x0F, + + // User packet types ConnectionRequestAccepted = 0x10, ConnectionRequestFailed = 0x11, AlreadyConnected = 0x12, @@ -58,6 +69,7 @@ pub(crate) enum PackType { DisconnectNotification = 0x15, ConnectionLost = 0x16, ConnectionBanned = 0x17, + InvalidPassword = 0x18, IncompatibleProtocolVersion = 0x19, IpRecentlyConnected = 0x1a, Timestamp = 0x1b, @@ -85,6 +97,12 @@ impl PackType { 0x08 => Ok(PackType::OpenConnectionReply2), 0x09 => Ok(PackType::ConnectionRequest), 0x10 => Ok(PackType::ConnectionRequestAccepted), + 0x0A => Ok(PackType::RemoteSystemRequiresPublicKey), + 0x0B => Ok(PackType::OurSystemRequiresSecurity), + 0x0C => Ok(PackType::PublicKeyMismatch), + 0x0D => Ok(PackType::OutOfBandInternal), + 0x0E => Ok(PackType::SndReceiptAcked), + 0x0F => Ok(PackType::SndReceiptLoss), 0x11 => Ok(PackType::ConnectionRequestFailed), 0x12 => Ok(PackType::AlreadyConnected), 0x13 => Ok(PackType::NewIncomingConnection), @@ -92,6 +110,7 @@ impl PackType { 0x15 => Ok(PackType::DisconnectNotification), 0x16 => Ok(PackType::ConnectionLost), 0x17 => Ok(PackType::ConnectionBanned), + 0x18 => Ok(PackType::InvalidPassword), 0x19 => Ok(PackType::IncompatibleProtocolVersion), 0x1a => Ok(PackType::IpRecentlyConnected), 0x1b => Ok(PackType::Timestamp), diff --git a/src/server/incoming/tokio.rs b/src/server/incoming/tokio.rs index 606d093..a0bb9f8 100644 --- a/src/server/incoming/tokio.rs +++ b/src/server/incoming/tokio.rs @@ -18,7 +18,7 @@ use super::{Config, MakeIncoming}; use crate::codec::frame::Framed; use crate::codec::{Decoded, Encoded}; use crate::guard::HandleOutgoing; -use crate::link::{Router, TransferLink}; +use crate::link::{Route, TransferLink}; use crate::opts::TraceInfo; use crate::server::handler::offline::OfflineHandler; use crate::server::handler::online::HandleOnline; @@ -32,7 +32,7 @@ pin_project! { offline: OfflineHandler>>, config: Config, socket: Arc, - routers: HashMap, + router: HashMap, close_events: Arc>, } } @@ -55,7 +55,7 @@ impl MakeIncoming for TokioUdpSocket { ), socket, config, - routers: HashMap::new(), + router: HashMap::new(), close_events: Arc::new(ConcurrentQueue::unbounded()), } } @@ -72,9 +72,10 @@ impl Stream for Incoming { let role = this.config.server_role(); for ev in this.close_events.try_iter() { - this.routers + this.router .remove(&ev) .expect("closed a non-exist connection"); + // TODO: could we keep the connection alive for a while? 0-RTT handshake? this.offline.as_mut().disconnect(&ev); debug!("[{role}] connection closed: {ev}"); } @@ -83,7 +84,7 @@ impl Stream for Incoming { let Some((pack, peer)) = ready!(this.offline.as_mut().poll_next(cx)) else { return Poll::Ready(None); }; - if let Some(entry) = this.routers.get_mut(&peer.addr) { + if let Some(entry) = this.router.get_mut(&peer.addr) { if !entry.deliver(pack) { error!("[{role}] connection was dropped before closed"); } @@ -91,9 +92,9 @@ impl Stream for Incoming { } let link = TransferLink::new_arc(role, peer); - let (mut entry, route) = Router::new(Arc::clone(&link)); + let (mut entry, route) = Route::new(Arc::clone(&link)); entry.deliver(pack); - this.routers.insert(peer.addr, entry); + this.router.insert(peer.addr, entry); let dst = Framed::new(Arc::clone(this.socket), this.config.max_mtu as usize) .handle_outgoing(Arc::clone(&link), this.config.send_buf_cap, peer, role) diff --git a/src/tests.rs b/src/tests.rs index 0ec33ed..3bbce10 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -9,7 +9,7 @@ use log::info; use tokio::net::UdpSocket; use crate::client::{self, ConnectTo}; -use crate::opts::{FlushStrategy, TraceInfo}; +use crate::opts::FlushStrategy; use crate::server::{self, MakeIncoming}; use crate::utils::tests::test_trace_log_setup; use crate::{Message, Reliability}; @@ -60,7 +60,6 @@ async fn test_tokio_udp_works() { tokio::select! { Some(data) = reader.next() => { sender.feed(Message::new(Reliability::Reliable, 0, data)).await.unwrap(); - info!("last trace id: {:?}", reader.last_trace_id()); } _ = ticker.tick() => { sender.flush().await.unwrap();