Skip to content

Commit

Permalink
remove merged io
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 15, 2024
1 parent 898702e commit 1fd2cd3
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 313 deletions.
20 changes: 11 additions & 9 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use crate::codec::tokio::Codec;
use crate::codec::{Decoded, Encoded};
use crate::errors::Error;
use crate::guard::HandleOutgoing;
use crate::io::{MergedIO, IO};
use crate::io::{SplittedIO, IO};
use crate::link::TransferLink;
use crate::utils::{Logged, StreamExt};
use crate::state::{IncomingStateManage, OutgoingStateManage};
use crate::utils::{Logged, TraceStreamExt};
use crate::PeerContext;

impl ConnectTo for TokioUdpSocket {
Expand All @@ -39,7 +40,7 @@ impl ConnectTo for TokioUdpSocket {

let ack = TransferLink::new_arc(config.client_role());

let write = UdpFramed::new(Arc::clone(&socket), Codec)
let dst = UdpFramed::new(Arc::clone(&socket), Codec)
.handle_outgoing(
Arc::clone(&ack),
config.send_buf_cap,
Expand All @@ -49,26 +50,27 @@ impl ConnectTo for TokioUdpSocket {
},
config.client_role(),
)
.frame_encoded(config.mtu, config.codec_config(), Arc::clone(&ack));
.frame_encoded(config.mtu, config.codec_config(), Arc::clone(&ack))
.manage_outgoing_state();

let incoming = UdpFramed::new(socket, Codec)
.logged_err(|err| {
debug!("[client] got codec error: {err} when decode offline frames");
debug!("codec error: {err} when decode offline frames");
})
.handle_offline(addr, config.offline_config())
.await?;

let io = ack
let src = ack
.filter_incoming_ack(incoming)
.frame_decoded(
config.codec_config(),
Arc::clone(&ack),
config.client_role(),
)
.handle_online(write, addr, config.client_guid)
.await?
.manage_incoming_state()
.handle_online(addr, config.client_guid, Arc::clone(&ack))
.enter_on_item(Span::noop);

Ok(MergedIO::new(io))
Ok(SplittedIO::new(src, dst))
}
}
242 changes: 59 additions & 183 deletions src/client/handler/online.rs
Original file line number Diff line number Diff line change
@@ -1,250 +1,126 @@
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use futures::{Future, Sink, SinkExt, Stream, StreamExt};
use futures::Stream;
use log::debug;
use pin_project_lite::pin_project;

use crate::errors::{CodecError, Error};
use crate::link::SharedLink;
use crate::packet::connected::FrameBody;
use crate::Message;
use crate::RoleContext;

pub(crate) trait HandleOnline: Sized {
fn handle_online<O>(
fn handle_online(
self,
write: O,
addr: SocketAddr,
client_guid: u64,
) -> OnlineHandler<Self, O>
where
O: Sink<FrameBody, Error = CodecError>;
link: SharedLink,
) -> OnlineHandler<Self>;
}

impl<F> HandleOnline for F
where
F: Stream<Item = FrameBody>,
{
fn handle_online<O>(
fn handle_online(
self,
write: O,
addr: SocketAddr,
client_guid: u64,
) -> OnlineHandler<Self, O>
where
O: Sink<FrameBody, Error = CodecError>,
{
link: SharedLink,
) -> OnlineHandler<Self> {
link.send_frame_body(FrameBody::ConnectionRequest {
client_guid,
request_timestamp: timestamp(),
use_encryption: false,
});
OnlineHandler {
read: Some(self),
write: Some(write),
state: State::SendConnectionRequest(Some(FrameBody::ConnectionRequest {
client_guid,
request_timestamp: timestamp(),
use_encryption: false,
})),
frame: self,
state: State::WaitConnRes,
addr,
link,
role: RoleContext::Client { guid: client_guid },
}
}
}

pin_project! {
pub(crate) struct OnlineHandler<I, O> {
read: Option<I>,
write: Option<O>,
pub(crate) struct OnlineHandler<F> {
#[pin]
frame: F,
state: State,
addr: SocketAddr,
link: SharedLink,
role: RoleContext,
}
}

enum State {
SendConnectionRequest(Option<FrameBody>),
WaitConnectionRequestReply(Option<FrameBody>),
SendNewIncomingConnection(FrameBody),
WaitConnRes,
Connected,
}

impl<I, O> Future for OnlineHandler<I, O>
impl<F> Stream for OnlineHandler<F>
where
I: Stream<Item = FrameBody> + Unpin,
O: Sink<FrameBody, Error = CodecError> + Sink<Message, Error = CodecError> + Unpin,
F: Stream<Item = FrameBody>,
{
type Output = Result<impl Stream<Item = Bytes> + Sink<Message, Error = Error>, Error>;
type Item = Bytes;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let write = this.write.as_mut().unwrap();
let read = this.read.as_mut().unwrap();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match this.state {
State::SendConnectionRequest(pack) => {
if let Err(err) = ready!(SinkExt::<FrameBody>::poll_ready_unpin(write, cx)) {
debug!("[client] SendConnectionRequest poll_ready error: {err}, retrying");
continue;
}
if let Err(err) = write.start_send_unpin(pack.clone().unwrap()) {
debug!("[client] SendConnectionRequest start_send error: {err}, retrying");
continue;
}
if let Err(err) = ready!(SinkExt::<FrameBody>::poll_flush_unpin(write, cx)) {
debug!("[client] SendConnectionRequest poll_flush error: {err}, retrying");
continue;
}
*this.state = State::WaitConnectionRequestReply(pack.take());
}
State::WaitConnectionRequestReply(pack) => {
let Some(body) = ready!(read.poll_next_unpin(cx)) else {
return Poll::Ready(Err(Error::ConnectionClosed));
State::WaitConnRes => {
let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
match body {
FrameBody::ConnectionRequestAccepted {
if let FrameBody::ConnectionRequestAccepted {
system_addresses,
accepted_timestamp,
..
} = body
{
this.link.send_frame_body(FrameBody::NewIncomingConnection {
server_address: *this.addr,
system_addresses,
request_timestamp: timestamp(),
accepted_timestamp,
..
} => {
*this.state = State::SendNewIncomingConnection(
FrameBody::NewIncomingConnection {
server_address: *this.addr,
system_addresses,
request_timestamp: timestamp(),
accepted_timestamp,
},
);
}
_ => {
debug!("[client] got unexpected packet {body:?} on WaitConnectionRequestReply, fallback to SendConnectionRequest");
*this.state = State::SendConnectionRequest(pack.take());
}
}
}
State::SendNewIncomingConnection(pack) => {
if let Err(err) = ready!(SinkExt::<FrameBody>::poll_ready_unpin(write, cx)) {
});
*this.state = State::Connected;
debug!(
"[client] SendNewIncomingConnection poll_ready error: {err}, retrying"
"[{}] connected to server {addr:?}",
this.role,
addr = this.addr
);
continue;
}
if let Err(err) = write.start_send_unpin(pack.clone()) {
debug!(
"[client] SendNewIncomingConnection start_send error: {err}, retrying"
);
continue;
}
if let Err(err) = ready!(SinkExt::<FrameBody>::poll_flush_unpin(write, cx)) {
debug!(
"[client] SendNewIncomingConnection poll_flush error: {err}, retrying"
);
continue;
}
return Poll::Ready(Ok(FilterIO {
read: this.read.take().unwrap(),
write: this.write.take().unwrap(),
state: FilterIOState::Serving,
}));
debug!("[{}] ignore packet {body:?} on WaitConnRes", this.role);
}
}
}
}
}

pin_project! {
struct FilterIO<I, O> {
read: I,
write: O,
state: FilterIOState,
}
}

enum FilterIOState {
Serving,
SendPing,
Closed,
}

impl<I, O> Stream for FilterIO<I, O>
where
I: Stream<Item = FrameBody> + Unpin,
O: Sink<FrameBody, Error = CodecError> + Unpin,
{
type Item = Bytes;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
loop {
match this.state {
FilterIOState::Serving => {
let Some(body) = ready!(this.read.poll_next_unpin(cx)) else {
*this.state = FilterIOState::Closed;
continue;
State::Connected => {
let Some(body) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
match body {
FrameBody::DisconnectNotification => *this.state = FilterIOState::Closed,
FrameBody::DetectLostConnections => *this.state = FilterIOState::SendPing,
FrameBody::DetectLostConnections => {
this.link.send_frame_body(FrameBody::ConnectedPing {
client_timestamp: timestamp(),
});
}
FrameBody::User(data) => return Poll::Ready(Some(data)),
_ => {
debug!("[client] ignore packet {body:?} on Connected",);
debug!("[{}] ignore packet {body:?} on Connected", this.role);
}
}
}
FilterIOState::SendPing => {
if let Err(err) = ready!(this.write.poll_ready_unpin(cx)) {
debug!("[client] SendPing poll_ready error: {err}, retrying");
continue;
}
if let Err(err) = this.write.start_send_unpin(FrameBody::ConnectedPing {
client_timestamp: timestamp(),
}) {
debug!("[client] SendPing start_send error: {err}, retrying");
continue;
}
if let Err(err) = ready!(this.write.poll_flush_unpin(cx)) {
debug!("[client] SendPing poll_flush error: {err}, retrying");
continue;
}
*this.state = FilterIOState::Serving;
}
FilterIOState::Closed => return Poll::Ready(None),
}
}
}
}

impl<I, O> Sink<Message> for FilterIO<I, O>
where
O: Sink<Message, Error = CodecError> + Unpin,
{
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
if matches!(*this.state, FilterIOState::Closed) {
return Poll::Ready(Err(Error::ConnectionClosed));
}
ready!(this.write.poll_ready_unpin(cx))?;
Poll::Ready(Ok(()))
}

fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
self.project().write.start_send_unpin(item)?;
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.project().write.poll_flush_unpin(cx))?;
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
ready!(this.write.poll_close_unpin(cx))?;
*this.state = FilterIOState::Closed;
Poll::Ready(Ok(()))
}
}

fn timestamp() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
2 changes: 1 addition & 1 deletion src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ where
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// insure all frames are received by the peer
// TODO: resend with a proper threshold or timeout here instead of infinite waiting
while !self.resend.is_empty() {
while !self.resend.is_empty() || !self.buf.is_empty() || !self.link.flush_empty() {
ready!(self.as_mut().try_empty(cx))?;
debug_assert!(self.buf.is_empty() && self.link.flush_empty());
// wait for the next resend
Expand Down
Loading

0 comments on commit 1fd2cd3

Please sign in to comment.