diff --git a/src/client/conn/tokio.rs b/src/client/conn/tokio.rs index 4e1bc4b..bc96275 100644 --- a/src/client/conn/tokio.rs +++ b/src/client/conn/tokio.rs @@ -14,7 +14,7 @@ use crate::codec::tokio::Codec; use crate::codec::{Decoded, Encoded}; use crate::errors::Error; use crate::guard::HandleOutgoing; -use crate::io::{IOImpl, IO}; +use crate::io::{MergedIO, IO}; use crate::link::TransferLink; use crate::utils::{Logged, StreamExt}; use crate::PeerContext; @@ -69,6 +69,6 @@ impl ConnectTo for TokioUdpSocket { .await? .enter_on_item(Span::noop); - Ok(IOImpl::new(io)) + Ok(MergedIO::new(io)) } } diff --git a/src/guard.rs b/src/guard.rs index 807ab13..b5a9c95 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -229,15 +229,16 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // insure all frames are sent - // TODO: resend with a proper threshold or timeout here + // insure all frames are received by the peer + // TODO: resend with a proper threshold or timeout here instead of infinite waiting while !self.resend.is_empty() { ready!(self.as_mut().try_empty(cx))?; debug_assert!(self.buf.is_empty() && self.link.flush_empty()); + // wait for the next resend ready!(self.resend.poll_wait(cx)); } self.project().frame.poll_close(cx) } } -// TODO: test \ No newline at end of file +// TODO: test diff --git a/src/io.rs b/src/io.rs index 911e71e..69063d8 100644 --- a/src/io.rs +++ b/src/io.rs @@ -29,8 +29,7 @@ pub trait IO: } pin_project! { - /// The detailed implementation of [`crate::io::IO`] - pub(crate) struct IOImpl { + pub(crate) struct MergedIO { #[pin] io: IO, default_reliability: Reliability, @@ -38,12 +37,12 @@ pin_project! { } } -impl IOImpl +impl MergedIO where IO: Stream + Sink + TraceInfo + Send, { pub(crate) fn new(io: IO) -> Self { - IOImpl { + MergedIO { io, default_reliability: Reliability::ReliableOrdered, default_order_channel: 0, @@ -51,7 +50,7 @@ where } } -impl Stream for IOImpl +impl Stream for MergedIO where IO: Stream, { @@ -62,7 +61,7 @@ where } } -impl Sink for IOImpl +impl Sink for MergedIO where IO: Sink, { @@ -86,7 +85,7 @@ where } } -impl Sink for IOImpl +impl Sink for MergedIO where IO: Sink, { @@ -109,7 +108,7 @@ where } } -impl crate::io::IO for IOImpl +impl crate::io::IO for MergedIO where IO: Sink + Stream + TraceInfo + Send, { @@ -134,3 +133,114 @@ where self.io.get_last_trace_id() } } + +pin_project! { + pub(crate) struct SplittedIO { + #[pin] + src: I, + #[pin] + dst: O, + default_reliability: Reliability, + default_order_channel: u8, + } +} + +impl SplittedIO +where + I: Stream + TraceInfo + Send, + O: Sink + Send, +{ + pub(crate) fn new(src: I, dst: O) -> Self { + SplittedIO { + src, + dst, + default_reliability: Reliability::ReliableOrdered, + default_order_channel: 0, + } + } +} + +impl Stream for SplittedIO +where + I: Stream, +{ + type Item = Bytes; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().src.poll_next(cx) + } +} + +impl Sink for SplittedIO +where + O: Sink, +{ + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::::poll_ready(self, cx) + } + + fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { + let msg = Message::new(self.default_reliability, self.default_order_channel, item); + Sink::::start_send(self, msg) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::::poll_flush(self, cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::::poll_close(self, cx) + } +} + +impl Sink for SplittedIO +where + O: Sink, +{ + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().dst.poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + self.project().dst.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().dst.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().dst.poll_close(cx) + } +} + +impl crate::io::IO for SplittedIO +where + O: Sink + Send, + I: Stream + TraceInfo + Send, +{ + fn set_default_reliability(&mut self, reliability: Reliability) { + self.default_reliability = reliability; + } + + fn get_default_reliability(&self) -> Reliability { + self.default_reliability + } + + fn set_default_order_channel(&mut self, order_channel: u8) { + self.default_order_channel = order_channel; + } + + fn get_default_order_channel(&self) -> u8 { + self.default_order_channel + } + + /// Get the last `trace_id` after polling Bytes form it, used for end to end tracing + fn last_trace_id(&self) -> Option { + self.src.get_last_trace_id() + } +} diff --git a/src/server/handler/offline.rs b/src/server/handler/offline.rs index f65f6c1..e8e0916 100644 --- a/src/server/handler/offline.rs +++ b/src/server/handler/offline.rs @@ -150,11 +150,10 @@ where return Poll::Ready(Some((pack, peer.clone()))); } debug!( - "[{}] ignore connected packet {:?} from unconnected client {addr}", + "[{}] ignore packet {:?} from unconnected client {addr}", this.role, pack.pack_type() ); - // TODO: Send DETECT_LOST_CONNECTION ? *this.state = OfflineState::SendingPrepare(Some(( Self::make_connection_request_failed(this.config), addr, diff --git a/src/server/handler/online.rs b/src/server/handler/online.rs index b403c14..702aebc 100644 --- a/src/server/handler/online.rs +++ b/src/server/handler/online.rs @@ -1,315 +1,154 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::ops::Deref; use std::pin::Pin; use std::task::{ready, Context, Poll}; -use std::time::{SystemTime, UNIX_EPOCH}; use bytes::Bytes; -use flume::Sender; -use futures::{Sink, Stream}; -use log::{debug, error}; +use futures::Stream; +use log::debug; use pin_project_lite::pin_project; -use crate::errors::{CodecError, Error}; +use crate::link::SharedLink; use crate::packet::connected::FrameBody; use crate::packet::unconnected; -use crate::Message; +use crate::RoleContext; pub(crate) trait HandleOnline: Sized { - fn handle_online( + fn handle_online( self, - write: O, - raw_write: RO, - client_addr: SocketAddr, server_guid: u64, - drop_notifier: Sender, - ) -> OnlineHandler - where - O: Sink, - RO: Sink; + client_addr: SocketAddr, + link: SharedLink, + ) -> OnlineHandler; } impl HandleOnline for F where F: Stream, { - fn handle_online( + fn handle_online( self, - write: O, - raw_write: RO, - client_addr: SocketAddr, server_guid: u64, - drop_notifier: Sender, - ) -> OnlineHandler - where - O: Sink, - RO: Sink, - { + client_addr: SocketAddr, + link: SharedLink, + ) -> OnlineHandler { OnlineHandler { - state: State::HandshakePhase1, + frame: self, server_guid, - client_addr: AddrDropGuard { - client_addr, - drop_notifier, - }, - read: self, - write, - raw_write, + client_addr, + state: HandshakeState::WaitConnRequest, + link, + role: RoleContext::Server { guid: server_guid }, } } } pin_project! { - pub(crate) struct OnlineHandler { - state: State, - server_guid: u64, - client_addr: AddrDropGuard, + pub(crate) struct OnlineHandler { #[pin] - read: I, - #[pin] - write: O, - #[pin] - raw_write: RO, - } -} - -struct AddrDropGuard { - client_addr: SocketAddr, - drop_notifier: Sender, -} - -impl Deref for AddrDropGuard { - type Target = SocketAddr; - - fn deref(&self) -> &Self::Target { - &self.client_addr - } -} - -impl Drop for AddrDropGuard { - fn drop(&mut self) { - if let Err(err) = self.drop_notifier.try_send(self.client_addr) { - error!( - "[server] cannot send address {} into drop_notifier, err: {err}", - self.client_addr - ); - return; - } - debug!("[server] connection from {} is dropped", self.client_addr); + frame: F, + server_guid: u64, + client_addr: SocketAddr, + state: HandshakeState, + link: SharedLink, + role: RoleContext, } } -enum State { - HandshakePhase1, - SendAccept(Option), - SendAcceptFlush, - SendFailed(Option), - SendFailedFlush, - HandshakePhase2, +enum HandshakeState { + WaitConnRequest, + WaitNewIncomingConn, Connected, - SendPong(Option), - SendPongFlush, - Closed, } -impl Stream for OnlineHandler +impl Stream for OnlineHandler where - I: Stream, - O: Sink, - RO: Sink, + F: Stream, { type Item = Bytes; - #[allow(clippy::cognitive_complexity)] // not bad fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { match this.state { - State::HandshakePhase1 => { - let Some(body) = ready!(this.read.as_mut().poll_next(cx)) else { - *this.state = State::Closed; - continue; + HandshakeState::WaitConnRequest => { + let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else { + return Poll::Ready(None); }; - - match body { - FrameBody::ConnectionRequest { - request_timestamp, - use_encryption, - .. - } => { - if use_encryption { - *this.state = State::SendFailed(Some( - unconnected::Packet::ConnectionRequestFailed { - magic: (), - server_guid: *this.server_guid, - }, - )); - } else { - let system_addr = if this.client_addr.is_ipv6() { - SocketAddr::new( - std::net::IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), - 0, - ) - } else { - SocketAddr::new( - std::net::IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - 0, - ) - }; - *this.state = - State::SendAccept(Some(FrameBody::ConnectionRequestAccepted { - client_address: **this.client_addr, - system_index: 0, - system_addresses: [system_addr; 20], - request_timestamp, - accepted_timestamp: timestamp(), - })); - } - } - _ => { - debug!("[server] ignore packet {body:?} on HandshakePhase1",); + if let FrameBody::ConnectionRequest { + request_timestamp, + use_encryption, + .. + } = body + { + if use_encryption { + this.link.send_unconnected( + unconnected::Packet::ConnectionRequestFailed { + magic: (), + server_guid: *this.server_guid, + }, + ); + continue; } - }; - } - State::SendAccept(pack) => { - if let Err(err) = ready!(this.write.as_mut().poll_ready(cx)) { - debug!("[server] SendAccept poll_ready error: {err}, fallback to HandshakePhase1"); - *this.state = State::HandshakePhase1; - continue; - } - if let Err(err) = this.write.as_mut().start_send(pack.take().unwrap()) { - debug!("[server] SendAccept start_send error: {err}, fallback to HandshakePhase1"); - *this.state = State::HandshakePhase1; - continue; - } - *this.state = State::SendAcceptFlush; - } - State::SendAcceptFlush => { - if let Err(err) = ready!(this.write.as_mut().poll_flush(cx)) { - debug!( - "[server] SendAcceptFlush poll_flush error: {err}, fallback to HandshakePhase1" - ); - *this.state = State::HandshakePhase1; + let system_addr = if this.client_addr.is_ipv6() { + SocketAddr::new( + std::net::IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), + 0, + ) + } else { + SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) + }; + this.link + .send_frame_body(FrameBody::ConnectionRequestAccepted { + client_address: *this.client_addr, + system_index: 0, + system_addresses: [system_addr; 20], + request_timestamp, + accepted_timestamp: timestamp(), + }); + *this.state = HandshakeState::WaitNewIncomingConn; continue; } - *this.state = State::HandshakePhase2; + debug!("[{}] ignore packet {body:?} on WaitConnRequest", this.role); } - State::SendFailed(pack) => { - if let Err(err) = ready!(this.raw_write.as_mut().poll_ready(cx)) { - debug!("[server] SendFailed poll_ready error: {err}, fallback to HandshakePhase1"); - *this.state = State::HandshakePhase1; - continue; - } - if let Err(err) = this.raw_write.as_mut().start_send(pack.take().unwrap()) { - debug!("[server] SendFailed start_send error: {err}, fallback to HandshakePhase1"); - *this.state = State::HandshakePhase1; + HandshakeState::WaitNewIncomingConn => { + let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else { + return Poll::Ready(None); + }; + if let FrameBody::NewIncomingConnection { .. } = body { + debug!("[{}] accept new incoming connection", this.role); + *this.state = HandshakeState::Connected; continue; } - *this.state = State::SendFailedFlush; - } - State::SendFailedFlush => { - if let Err(err) = ready!(this.raw_write.as_mut().poll_flush(cx)) { - debug!("[server] SendFailedFlush poll_flush error: {err}"); - } - *this.state = State::HandshakePhase1; + debug!( + "[{}] ignore packet {body:?} on WaitNewIncomingConn", + this.role + ); } - State::HandshakePhase2 => { - let Some(body) = ready!(this.read.as_mut().poll_next(cx)) else { - *this.state = State::Closed; - continue; - }; - match body { - FrameBody::NewIncomingConnection { .. } => { - debug!("[server] connections finished handshake"); - *this.state = State::Connected; - } - _ => { - debug!("[server] ignore packet {body:?} on HandshakePhase2"); - } - }; - } - State::Connected => { - let Some(body) = ready!(this.read.as_mut().poll_next(cx)) else { - *this.state = State::Closed; - continue; + HandshakeState::Connected => { + let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else { + return Poll::Ready(None); }; match body { FrameBody::ConnectedPing { client_timestamp } => { - *this.state = State::SendPong(Some(FrameBody::ConnectedPong { + this.link.send_frame_body(FrameBody::ConnectedPong { client_timestamp, server_timestamp: timestamp(), - })); - } - FrameBody::DisconnectNotification => { - *this.state = State::Closed; + }); } FrameBody::User(data) => return Poll::Ready(Some(data)), _ => { - debug!("[server] ignore packet {body:?} on Connected",); + debug!("[{}] ignore packet {body:?} on Connected", this.role); } } } - State::SendPong(pack) => { - if let Err(err) = ready!(this.write.as_mut().poll_ready(cx)) { - debug!("[server] SendPong poll_ready error: {err}"); - *this.state = State::Connected; - continue; - } - if let Err(err) = this.write.as_mut().start_send(pack.take().unwrap()) { - debug!("[server] SendPong start_send error: {err}"); - *this.state = State::Connected; - continue; - } - *this.state = State::SendPongFlush; - } - State::SendPongFlush => { - if let Err(err) = ready!(this.write.as_mut().poll_flush(cx)) { - debug!("[server] SendPongFlush poll_flush error: {err}"); - } - *this.state = State::Connected; - } - State::Closed => return Poll::Ready(None), } } } } -impl Sink for OnlineHandler -where - O: Sink, -{ - type Error = Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if matches!(*this.state, State::Closed) { - return Poll::Ready(Err(Error::ConnectionClosed)); - } - ready!(this.write.poll_ready(cx))?; - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { - self.project().write.start_send(item)?; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.project().write.poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - ready!(this.write.poll_close(cx))?; - *this.state = State::Closed; - Poll::Ready(Ok(())) - } -} - fn timestamp() -> i64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as i64 } diff --git a/src/server/incoming/tokio.rs b/src/server/incoming/tokio.rs index f126f8e..dd1e0a0 100644 --- a/src/server/incoming/tokio.rs +++ b/src/server/incoming/tokio.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use std::task::{ready, Context, Poll}; use flume::{Receiver, Sender}; -use futures::{SinkExt, Stream}; -use log::{debug, error, info}; +use futures::Stream; +use log::{debug, error}; use minitrace::collector::SpanContext; use minitrace::Span; use pin_project_lite::pin_project; @@ -18,13 +18,14 @@ use crate::codec::tokio::Codec; use crate::codec::{Decoded, Encoded}; use crate::errors::CodecError; use crate::guard::HandleOutgoing; -use crate::io::{IOImpl, IO}; +use crate::io::{SplittedIO, IO}; use crate::link::TransferLink; -use crate::packet::connected::{self, Frames, FramesMut}; -use crate::packet::{unconnected, Packet}; +use crate::packet::connected::{self, FramesMut}; +use crate::packet::Packet; use crate::server::handler::offline; use crate::server::handler::offline::HandleOffline; use crate::server::handler::online::HandleOnline; +use crate::state::{IncomingStateManage, OutgoingStateManage}; use crate::utils::{Log, Logged, StreamExt}; type OfflineHandler = offline::OfflineHandler< @@ -60,7 +61,7 @@ impl MakeIncoming for TokioUdpSocket { Incoming { offline: UdpFramed::new(Arc::clone(&socket), Codec) .logged_err(|err| { - debug!("[server] got codec error: {err} when decode offline frames"); + debug!("codec error: {err} when decode offline frames"); }) .handle_offline(config.offline_config()), socket, @@ -85,49 +86,38 @@ impl Stream for Incoming { }; if let Some(router_tx) = this.router.get_mut(&peer.addr) { if router_tx.send(pack).is_err() { - error!("[server] connection was dropped before closed"); + error!("connection was dropped before closed"); this.router.remove(&peer.addr); this.offline.as_mut().disconnect(&peer.addr); } continue; } - info!("[server] new incoming from {}", peer.addr); let (router_tx, router_rx) = flume::unbounded(); router_tx.send(pack).unwrap(); this.router.insert(peer.addr, router_tx); - let ack = TransferLink::new_arc(this.config.server_role()); + let link = TransferLink::new_arc(this.config.server_role()); - let write = UdpFramed::new(Arc::clone(this.socket), Codec) + let dst = UdpFramed::new(Arc::clone(this.socket), Codec) .handle_outgoing( - Arc::clone(&ack), + Arc::clone(&link), this.config.send_buf_cap, peer.clone(), this.config.server_role(), ) - .frame_encoded(peer.mtu, this.config.codec_config(), Arc::clone(&ack)); + .frame_encoded(peer.mtu, this.config.codec_config(), Arc::clone(&link)) + .manage_outgoing_state(); - let raw_write = UdpFramed::new(Arc::clone(this.socket), Codec).with( - move |input: unconnected::Packet| async move { - Ok((Packet::::Unconnected(input), peer.addr)) - }, - ); - - let io = ack + let src = link .filter_incoming_ack(router_rx.into_stream()) .frame_decoded( this.config.codec_config(), - Arc::clone(&ack), + Arc::clone(&link), this.config.server_role(), ) - .handle_online( - write, - raw_write, - peer.addr, - this.config.sever_guid, - this.drop_notifier.clone(), - ) + .manage_incoming_state() + .handle_online(this.config.sever_guid, peer.addr, Arc::clone(&link)) .enter_on_item(move || { Span::root("conn", SpanContext::random()).with_properties(|| { [ @@ -137,7 +127,7 @@ impl Stream for Incoming { }) }); - return Poll::Ready(Some(IOImpl::new(io))); + return Poll::Ready(Some(SplittedIO::new(src, dst))); } } } diff --git a/src/state.rs b/src/state.rs index 9109b78..fe68865 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,11 +1,12 @@ //! State management for the connection. +//! Perform the 4-ways handshake for the connection close. //! Reflect the operation in the APIs of Sink and Stream when the connection stops. use std::pin::Pin; use std::task::{ready, Context, Poll}; use futures::{Sink, Stream}; -use log::warn; +use log::{debug, warn}; use pin_project_lite::pin_project; use crate::errors::{CodecError, Error}; @@ -15,10 +16,10 @@ use crate::Message; enum OutgoingState { // before sending DisconnectNotification Connecting, + FirstCloseWait, FinWait, // after sending DisconnectNotification - Fin, - CloseWait, + SecondCloseWait, Closed, } @@ -30,7 +31,10 @@ enum IncomingState { impl OutgoingState { #[inline(always)] fn before_finish(&self) -> bool { - matches!(self, OutgoingState::Connecting | OutgoingState::FinWait) + matches!( + self, + OutgoingState::Connecting | OutgoingState::FirstCloseWait | OutgoingState::FinWait + ) } } @@ -67,6 +71,14 @@ where pub(crate) trait IncomingStateManage: Sized { /// Manage the incoming state of the connection. + /// + /// It will yield None when it receives the `DisconnectNotification`. And will continue to + /// return None in the following. + /// + /// You have to repeatedly `poll_next` after receiving `DisconnectNotification` from + /// the peer. This will ensure that the ack you sent to acknowledge the `DisconnectNotification` + /// can be received by the the peer (i.e. ensuring that the the peer's `poll_close` call + /// returns successfully). fn manage_incoming_state(self) -> impl Stream; } @@ -103,7 +115,7 @@ where } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if !self.state.before_finish() { + if matches!(self.state, OutgoingState::Closed) { return Poll::Ready(Err(Error::ConnectionClosed)); } self.project().frame.poll_flush(cx).map_err(Into::into) @@ -116,21 +128,24 @@ where } loop { match this.state { - OutgoingState::Connecting | OutgoingState::FinWait => { + OutgoingState::Connecting => { + *this.state = OutgoingState::FirstCloseWait; + } + OutgoingState::FirstCloseWait => { + // first wait all stales packets to receive by the peer + ready!(this.frame.as_mut().poll_close(cx)?); *this.state = OutgoingState::FinWait; + } + OutgoingState::FinWait => { + // then send the DisconnectNotification ready!(this.frame.as_mut().poll_ready(cx)?); - this.frame .as_mut() .start_send(FrameBody::DisconnectNotification)?; - *this.state = OutgoingState::Fin; - } - OutgoingState::Fin => { - ready!(this.frame.as_mut().poll_flush(cx)?); - *this.state = OutgoingState::CloseWait; + *this.state = OutgoingState::SecondCloseWait; } - OutgoingState::CloseWait => { - // is this like TCP? + OutgoingState::SecondCloseWait => { + // second wait the DisconnectNotification to receive by the peer ready!(this.frame.as_mut().poll_close(cx)?); *this.state = OutgoingState::Closed; } @@ -173,15 +188,22 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); if matches!(this.state, IncomingState::Closed) { + debug!("state closed, poll_next to deliver ack"); + // Poll the frame even if the state is closed to because the peer can send the + // DisconnectNotification as it did not receive ack. + // This will trigger the ack of the DisconnectNotification to be delivered. + let _ = this.frame.as_mut().poll_next(cx); // ignore pending return Poll::Ready(None); } let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else { - // this is weird, UDP will not be closed by the remote, but we regard it as closed - warn!("Connection closed by the remote"); + // This happens when the incoming router is dropped on server side. + // On client side, the connection cannot be closed by UDP, this is unreachable. + warn!("Router dropped"); *this.state = IncomingState::Closed; return Poll::Ready(None); }; if matches!(body, FrameBody::DisconnectNotification) { + // The peer no longer sends any data. *this.state = IncomingState::Closed; return Poll::Ready(None); }