Skip to content

Commit

Permalink
readme
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Sep 7, 2024
1 parent 02424ed commit 0864c67
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 29 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ See [examples](examples/) or [integration testing](src/tests.rs) for basic usage
### Server

Most operations are performed on `Stream` and `Sink`. There will be some options in [opts](src/opts.rs).
The implementation details are obscured, and you can only see a very high level of abstraction, including the `Error` type, which is just `std::io::Error`.

Keep polling `incoming` because it also serves as the router to every connections.
Apply `Sink::poll_flush` to IO will trigger to flush all pending packets, `ACK`/`NACK`, and stale packets.
Apply `Sink::poll_close` to IO will ensure that all data is received by the peer before returning (i.e It may keep resending infinitely.).
Apply `Sink::poll_flush` to IO will trigger to flush all pending packets, `ACK`/`NACK`, and stale packets. So you have to call `poll_flush` periodically. You can configure the [flush strategy](src/opts.rs) you want.
Apply `Sink::poll_close` to IO will ensure that all data is received by the peer before returning. It may keep resending infinitely unless you cancel the task. So you'd better set a timeout for each `poll_close`.

> [!NOTE]
> All calculations are lazy. You need to decide how long to flush once, and how long to wait when closing before considering the peer is disconnected.
> All calculations are lazy. The state will not update if you do not poll it.
```rust
use bytes::Bytes;
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ fn display(spans: Vec<SpanRecord>) {
if list.is_empty() {
continue;
}
println!("{}", trace_id.0);
println!("trace_id: {}", trace_id.0);
let l = &list[&SpanId::default()];
for (i, root) in l.iter().enumerate() {
dfs(&list, &spans_map, *root, 0, i == l.len() - 1);
Expand Down
5 changes: 3 additions & 2 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::client::handler::online::HandleOnline;
use crate::codec::frame::Framed;
use crate::codec::{Decoded, Encoded};
use crate::guard::HandleOutgoing;
use crate::link::{Router, TransferLink};
use crate::link::{Route, TransferLink};
use crate::opts::Ping;
use crate::state::{IncomingStateManage, OutgoingStateManage};
use crate::utils::Logged;
Expand Down Expand Up @@ -57,10 +57,11 @@ impl ConnectTo for TokioUdpSocket {
.frame_encoded(peer.mtu, config.codec_config(), Arc::clone(&link))
.manage_outgoing_state(None);

let (mut router, route) = Router::new(Arc::clone(&link));
let (mut router, route) = Route::new(Arc::clone(&link));

tokio::spawn(async move {
while let Some(pack) = incoming.next().await {
// deliver the packet actively so that we do not miss ACK/NACK packets to advance the outgoing state
router.deliver(pack);
}
});
Expand Down
3 changes: 3 additions & 0 deletions src/estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ impl RFC6298Impl {

/// The current RTO estimation.
pub(crate) fn rto(&self) -> Duration {
// TODO:
// RFC6298 2.4 suggests a minimum of 1 second, which may be
// a conservative choice for some applications.
cmp::max(
self.get() + cmp::max(TIMER_GRANULARITY, 4 * self.var),
Duration::from_secs(1),
Expand Down
5 changes: 5 additions & 0 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ impl ResendMap {

/// `process_stales` collect all stale frames into buffer and remove the expired entries
fn process_stales(&mut self, buffer: &mut VecDeque<Frame>) {
if self.map.is_empty() {
return;
}

let now = Instant::now();
if now < self.last_record_expired_at {
trace!(
Expand All @@ -384,6 +388,7 @@ impl ResendMap {
}
});
debug_assert!(min_expired_at > now);
// update the last record expired at
self.last_record_expired_at = min_expired_at;

let len = self.map.len();
Expand Down
22 changes: 13 additions & 9 deletions src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) type SharedLink = Arc<TransferLink>;

/// Transfer data and task between stream and sink.
pub(crate) struct TransferLink {
// incoming ack with receive timestamp
incoming_ack: ConcurrentQueue<(AckOrNack, Instant)>,
incoming_nack: ConcurrentQueue<AckOrNack>,
forward_waking: AtomicBool,
Expand Down Expand Up @@ -98,17 +99,19 @@ impl TransferLink {
dropped.total_cnt()
);
}
// wake up after sends ack
// wake up after receiving an ack
if self.should_waking() {
let c_id = ConnId::new(self.role.guid(), self.peer.guid);
let mut cnt = 0;
for waker in Reactor::get().cancel_all_timers(c_id) {
// safe to panic
waker.wake();
debug!(
"[{}] wake up a certain waker after receives ack on connection: {c_id:?}",
self.role
);
cnt += 1;
}
debug!(
"[{}] wake up {cnt} wakers after receives ack on connection: {c_id:?}",
self.role
);
}
}

Expand Down Expand Up @@ -173,14 +176,14 @@ impl TransferLink {
}
}

/// Router for incoming packets
pub(crate) struct Router {
/// A route for incoming packets
pub(crate) struct Route {
router_tx: Sender<FrameSet<FramesMut>>,
link: SharedLink,
seq_read: u24,
}

impl Router {
impl Route {
pub(crate) fn new(link: SharedLink) -> (Self, impl Stream<Item = FrameSet<FramesMut>>) {
let (router_tx, router_rx) = async_channel::unbounded();
(
Expand All @@ -200,7 +203,8 @@ impl Router {
}
match pack {
connected::Packet::FrameSet(frames) => {
// TODO: use lock free concurrent queue to avoid lock
// TODO: use lock free concurrent queue to buffer the outgoing ack/nack to avoid
// locking the mutex

self.link.outgoing_ack.lock().push(Reverse(frames.seq_num));

Expand Down
3 changes: 2 additions & 1 deletion src/packet/connected/frame_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ impl FrameBody {
}),
PackType::DisconnectNotification => Ok(Self::DisconnectNotification),
PackType::DetectLostConnections => Ok(Self::DetectLostConnections),
_ => Ok(Self::User(buf)),
_ => Ok(Self::User(buf)), /* we rely on the user to handle this even it is not a user
* packet */
}
}

Expand Down
27 changes: 23 additions & 4 deletions src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ macro_rules! read_buf {

pub(in crate::packet) use read_buf;

const VALID_FLAG: u8 = 0b1000_0000;
const ACK_FLAG: u8 = 0b1100_0000;
const NACK_FLAG: u8 = 0b1010_0000;
const VALID_FLAG: u8 = 0b1000_0000; // A valid user frame
const ACK_FLAG: u8 = 0b1100_0000; // A valid user frame which contains an ACK frame
const NACK_FLAG: u8 = 0b1010_0000; // A valid user frame which contains a NACK frame

const PARTED_FLAG: u8 = 0b0001_0000;
const CONTINUOUS_SEND_FLAG: u8 = 0b0000_1000;
Expand All @@ -38,7 +38,7 @@ pub(crate) const FRAGMENT_PART_SIZE: usize = 10;
/// others are encapsulated in a `FrameSet` data packet and appear as the first byte of the body
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[allow(dead_code)] // may used in future
#[allow(dead_code)] // We do not want follow the whole RakNet design
pub(crate) enum PackType {
ConnectedPing = 0x00,
UnconnectedPing1 = 0x01,
Expand All @@ -50,6 +50,17 @@ pub(crate) enum PackType {
OpenConnectionRequest2 = 0x07,
OpenConnectionReply2 = 0x08,
ConnectionRequest = 0x09,

// Some packet related to security, we do not support them now
RemoteSystemRequiresPublicKey = 0x0A,
OurSystemRequiresSecurity = 0x0B,
PublicKeyMismatch = 0x0C,
OutOfBandInternal = 0x0D,
// Related to `_WITH_ACK_RECEIPT` packets
SndReceiptAcked = 0x0E,
SndReceiptLoss = 0x0F,

// User packet types
ConnectionRequestAccepted = 0x10,
ConnectionRequestFailed = 0x11,
AlreadyConnected = 0x12,
Expand All @@ -58,6 +69,7 @@ pub(crate) enum PackType {
DisconnectNotification = 0x15,
ConnectionLost = 0x16,
ConnectionBanned = 0x17,
InvalidPassword = 0x18,
IncompatibleProtocolVersion = 0x19,
IpRecentlyConnected = 0x1a,
Timestamp = 0x1b,
Expand Down Expand Up @@ -85,13 +97,20 @@ impl PackType {
0x08 => Ok(PackType::OpenConnectionReply2),
0x09 => Ok(PackType::ConnectionRequest),
0x10 => Ok(PackType::ConnectionRequestAccepted),
0x0A => Ok(PackType::RemoteSystemRequiresPublicKey),
0x0B => Ok(PackType::OurSystemRequiresSecurity),
0x0C => Ok(PackType::PublicKeyMismatch),
0x0D => Ok(PackType::OutOfBandInternal),
0x0E => Ok(PackType::SndReceiptAcked),
0x0F => Ok(PackType::SndReceiptLoss),
0x11 => Ok(PackType::ConnectionRequestFailed),
0x12 => Ok(PackType::AlreadyConnected),
0x13 => Ok(PackType::NewIncomingConnection),
0x14 => Ok(PackType::NoFreeIncomingConnections),
0x15 => Ok(PackType::DisconnectNotification),
0x16 => Ok(PackType::ConnectionLost),
0x17 => Ok(PackType::ConnectionBanned),
0x18 => Ok(PackType::InvalidPassword),
0x19 => Ok(PackType::IncompatibleProtocolVersion),
0x1a => Ok(PackType::IpRecentlyConnected),
0x1b => Ok(PackType::Timestamp),
Expand Down
15 changes: 8 additions & 7 deletions src/server/incoming/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{Config, MakeIncoming};
use crate::codec::frame::Framed;
use crate::codec::{Decoded, Encoded};
use crate::guard::HandleOutgoing;
use crate::link::{Router, TransferLink};
use crate::link::{Route, TransferLink};
use crate::opts::TraceInfo;
use crate::server::handler::offline::OfflineHandler;
use crate::server::handler::online::HandleOnline;
Expand All @@ -32,7 +32,7 @@ pin_project! {
offline: OfflineHandler<Framed<Arc<TokioUdpSocket>>>,
config: Config,
socket: Arc<TokioUdpSocket>,
routers: HashMap<SocketAddr, Router>,
router: HashMap<SocketAddr, Route>,
close_events: Arc<ConcurrentQueue<SocketAddr>>,
}
}
Expand All @@ -55,7 +55,7 @@ impl MakeIncoming for TokioUdpSocket {
),
socket,
config,
routers: HashMap::new(),
router: HashMap::new(),
close_events: Arc::new(ConcurrentQueue::unbounded()),
}
}
Expand All @@ -72,9 +72,10 @@ impl Stream for Incoming {

let role = this.config.server_role();
for ev in this.close_events.try_iter() {
this.routers
this.router
.remove(&ev)
.expect("closed a non-exist connection");
// TODO: could we keep the connection alive for a while? 0-RTT handshake?
this.offline.as_mut().disconnect(&ev);
debug!("[{role}] connection closed: {ev}");
}
Expand All @@ -83,17 +84,17 @@ impl Stream for Incoming {
let Some((pack, peer)) = ready!(this.offline.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
if let Some(entry) = this.routers.get_mut(&peer.addr) {
if let Some(entry) = this.router.get_mut(&peer.addr) {
if !entry.deliver(pack) {
error!("[{role}] connection was dropped before closed");
}
continue;
}

let link = TransferLink::new_arc(role, peer);
let (mut entry, route) = Router::new(Arc::clone(&link));
let (mut entry, route) = Route::new(Arc::clone(&link));
entry.deliver(pack);
this.routers.insert(peer.addr, entry);
this.router.insert(peer.addr, entry);

let dst = Framed::new(Arc::clone(this.socket), this.config.max_mtu as usize)
.handle_outgoing(Arc::clone(&link), this.config.send_buf_cap, peer, role)
Expand Down
3 changes: 1 addition & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use log::info;
use tokio::net::UdpSocket;

use crate::client::{self, ConnectTo};
use crate::opts::{FlushStrategy, TraceInfo};
use crate::opts::FlushStrategy;
use crate::server::{self, MakeIncoming};
use crate::utils::tests::test_trace_log_setup;
use crate::{Message, Reliability};
Expand Down Expand Up @@ -60,7 +60,6 @@ async fn test_tokio_udp_works() {
tokio::select! {
Some(data) = reader.next() => {
sender.feed(Message::new(Reliability::Reliable, 0, data)).await.unwrap();
info!("last trace id: {:?}", reader.last_trace_id());
}
_ = ticker.tick() => {
sender.flush().await.unwrap();
Expand Down

0 comments on commit 0864c67

Please sign in to comment.