Skip to content

Commit

Permalink
split io on server
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 15, 2024
1 parent 4c719a8 commit 898702e
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 303 deletions.
4 changes: 2 additions & 2 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::codec::tokio::Codec;
use crate::codec::{Decoded, Encoded};
use crate::errors::Error;
use crate::guard::HandleOutgoing;
use crate::io::{IOImpl, IO};
use crate::io::{MergedIO, IO};
use crate::link::TransferLink;
use crate::utils::{Logged, StreamExt};
use crate::PeerContext;
Expand Down Expand Up @@ -69,6 +69,6 @@ impl ConnectTo for TokioUdpSocket {
.await?
.enter_on_item(Span::noop);

Ok(IOImpl::new(io))
Ok(MergedIO::new(io))
}
}
7 changes: 4 additions & 3 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,16 @@ where
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// insure all frames are sent
// TODO: resend with a proper threshold or timeout here
// insure all frames are received by the peer
// TODO: resend with a proper threshold or timeout here instead of infinite waiting
while !self.resend.is_empty() {
ready!(self.as_mut().try_empty(cx))?;
debug_assert!(self.buf.is_empty() && self.link.flush_empty());
// wait for the next resend
ready!(self.resend.poll_wait(cx));
}
self.project().frame.poll_close(cx)
}
}

// TODO: test
// TODO: test
126 changes: 118 additions & 8 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,28 @@ pub trait IO:
}

pin_project! {
/// The detailed implementation of [`crate::io::IO`]
pub(crate) struct IOImpl<IO> {
pub(crate) struct MergedIO<IO> {
#[pin]
io: IO,
default_reliability: Reliability,
default_order_channel: u8,
}
}

impl<IO> IOImpl<IO>
impl<IO> MergedIO<IO>
where
IO: Stream<Item = Bytes> + Sink<Message, Error = Error> + TraceInfo + Send,
{
pub(crate) fn new(io: IO) -> Self {
IOImpl {
MergedIO {
io,
default_reliability: Reliability::ReliableOrdered,
default_order_channel: 0,
}
}
}

impl<IO> Stream for IOImpl<IO>
impl<IO> Stream for MergedIO<IO>
where
IO: Stream<Item = Bytes>,
{
Expand All @@ -62,7 +61,7 @@ where
}
}

impl<IO> Sink<Bytes> for IOImpl<IO>
impl<IO> Sink<Bytes> for MergedIO<IO>
where
IO: Sink<Message, Error = Error>,
{
Expand All @@ -86,7 +85,7 @@ where
}
}

impl<IO> Sink<Message> for IOImpl<IO>
impl<IO> Sink<Message> for MergedIO<IO>
where
IO: Sink<Message, Error = Error>,
{
Expand All @@ -109,7 +108,7 @@ where
}
}

impl<IO> crate::io::IO for IOImpl<IO>
impl<IO> crate::io::IO for MergedIO<IO>
where
IO: Sink<Message, Error = Error> + Stream<Item = Bytes> + TraceInfo + Send,
{
Expand All @@ -134,3 +133,114 @@ where
self.io.get_last_trace_id()
}
}

pin_project! {
pub(crate) struct SplittedIO<I, O> {
#[pin]
src: I,
#[pin]
dst: O,
default_reliability: Reliability,
default_order_channel: u8,
}
}

impl<I, O> SplittedIO<I, O>
where
I: Stream<Item = Bytes> + TraceInfo + Send,
O: Sink<Message, Error = Error> + Send,
{
pub(crate) fn new(src: I, dst: O) -> Self {
SplittedIO {
src,
dst,
default_reliability: Reliability::ReliableOrdered,
default_order_channel: 0,
}
}
}

impl<I, O> Stream for SplittedIO<I, O>
where
I: Stream<Item = Bytes>,
{
type Item = Bytes;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().src.poll_next(cx)
}
}

impl<I, O> Sink<Bytes> for SplittedIO<I, O>
where
O: Sink<Message, Error = Error>,
{
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::<Message>::poll_ready(self, cx)
}

fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let msg = Message::new(self.default_reliability, self.default_order_channel, item);
Sink::<Message>::start_send(self, msg)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::<Message>::poll_flush(self, cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::<Message>::poll_close(self, cx)
}
}

impl<I, O> Sink<Message> for SplittedIO<I, O>
where
O: Sink<Message, Error = Error>,
{
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().dst.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
self.project().dst.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().dst.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().dst.poll_close(cx)
}
}

impl<I, O> crate::io::IO for SplittedIO<I, O>
where
O: Sink<Message, Error = Error> + Send,
I: Stream<Item = Bytes> + TraceInfo + Send,
{
fn set_default_reliability(&mut self, reliability: Reliability) {
self.default_reliability = reliability;
}

fn get_default_reliability(&self) -> Reliability {
self.default_reliability
}

fn set_default_order_channel(&mut self, order_channel: u8) {
self.default_order_channel = order_channel;
}

fn get_default_order_channel(&self) -> u8 {
self.default_order_channel
}

/// Get the last `trace_id` after polling Bytes form it, used for end to end tracing
fn last_trace_id(&self) -> Option<TraceId> {
self.src.get_last_trace_id()
}
}
3 changes: 1 addition & 2 deletions src/server/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ where
return Poll::Ready(Some((pack, peer.clone())));
}
debug!(
"[{}] ignore connected packet {:?} from unconnected client {addr}",
"[{}] ignore packet {:?} from unconnected client {addr}",
this.role,
pack.pack_type()
);
// TODO: Send DETECT_LOST_CONNECTION ?
*this.state = OfflineState::SendingPrepare(Some((
Self::make_connection_request_failed(this.config),
addr,
Expand Down
Loading

0 comments on commit 898702e

Please sign in to comment.