Skip to content

Commit

Permalink
use std error
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 31, 2024
1 parent 34ed251 commit 85807e3
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 263 deletions.
4 changes: 2 additions & 2 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;
use std::net::ToSocketAddrs;

use super::handler::offline;
use crate::errors::Error;
use crate::io::{Ping, IO};
use crate::{codec, RoleContext};

Expand Down Expand Up @@ -130,5 +130,5 @@ pub trait ConnectTo: Sized {
self,
addr: impl ToSocketAddrs,
config: Config,
) -> Result<impl IO + Ping, Error>;
) -> io::Result<impl IO + Ping>;
}
8 changes: 5 additions & 3 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::client::handler::offline::OfflineHandler;
use crate::client::handler::online::HandleOnline;
use crate::codec::frame::Framed;
use crate::codec::{Decoded, Encoded};
use crate::errors::Error;
use crate::guard::HandleOutgoing;
use crate::io::{Ping, SeparatedIO, IO};
use crate::link::{Router, TransferLink};
Expand All @@ -24,7 +23,7 @@ impl ConnectTo for TokioUdpSocket {
self,
addrs: impl ToSocketAddrs,
config: super::Config,
) -> Result<impl IO + Ping, Error> {
) -> io::Result<impl IO + Ping> {
let socket = Arc::new(self);
let mut lookups = addrs.to_socket_addrs()?;
let addr = loop {
Expand All @@ -34,7 +33,10 @@ impl ConnectTo for TokioUdpSocket {
}
continue;
}
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "invalid address").into());
return Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"invalid address",
));
};

let mut incoming = OfflineHandler::new(
Expand Down
71 changes: 18 additions & 53 deletions src/client/handler/offline.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use futures::{Future, Sink, SinkExt, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;

use crate::errors::{CodecError, Error};
use crate::packet::connected::{self, FramesMut};
use crate::packet::{unconnected, Packet};
use crate::RoleContext;
Expand All @@ -31,7 +30,7 @@ pin_project! {
impl<F> OfflineHandler<F>
where
F: Stream<Item = (Packet<FramesMut>, SocketAddr)>
+ Sink<(unconnected::Packet, SocketAddr), Error = CodecError>
+ Sink<(unconnected::Packet, SocketAddr), Error = io::Error>
+ Unpin,
{
pub(crate) fn new(frame: F, server_addr: SocketAddr, config: Config) -> Self {
Expand Down Expand Up @@ -63,48 +62,31 @@ enum State {
impl<F> Future for OfflineHandler<F>
where
F: Stream<Item = (Packet<FramesMut>, SocketAddr)>
+ Sink<(unconnected::Packet, SocketAddr), Error = CodecError>
+ Sink<(unconnected::Packet, SocketAddr), Error = io::Error>
+ Unpin,
{
type Output = Result<impl Stream<Item = connected::Packet<FramesMut>>, Error>;
type Output = Result<impl Stream<Item = connected::Packet<FramesMut>>, io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let frame = this.frame.as_mut().unwrap();
loop {
match this.state {
State::SendOpenConnReq1(pack) => {
if let Err(err) = ready!(frame.poll_ready_unpin(cx)) {
debug!(
"[{}] SendingOpenConnectionRequest1 poll_ready error: {err}, retrying",
this.role
);
continue;
}
if let Err(err) = frame.start_send_unpin((pack.clone(), *this.server_addr)) {
debug!(
"[{}] SendingOpenConnectionRequest1 start_send error: {err}, retrying",
this.role
);
continue;
}
ready!(frame.poll_ready_unpin(cx))?;
frame.start_send_unpin((pack.clone(), *this.server_addr))?;
*this.state = State::SendOpenConnReq1Flush;
}
State::SendOpenConnReq1Flush => {
if let Err(err) = ready!(frame.poll_flush_unpin(cx)) {
debug!(
"[{}] SendingOpenConnectionRequest1 poll_flush error: {err}, retrying",
this.role
);
continue;
}
ready!(frame.poll_flush_unpin(cx))?;
*this.state = State::WaitOpenConnReply1;
}
State::WaitOpenConnReply1 => {
// TODO: Add timeout
let Some((pack, addr)) = ready!(frame.poll_next_unpin(cx)) else {
return Poll::Ready(Err(Error::ConnectionClosed));
};
let (pack, addr) = ready!(frame.poll_next_unpin(cx)).ok_or(io::Error::new(
io::ErrorKind::ConnectionReset,
"connection reset by peer",
))?;
if addr != *this.server_addr {
continue;
}
Expand All @@ -123,37 +105,20 @@ where
*this.state = State::SendOpenConnReq2(next);
}
State::SendOpenConnReq2(pack) => {
if let Err(err) = ready!(frame.poll_ready_unpin(cx)) {
debug!(
"[{}] SendOpenConnectionRequest2 poll_ready error: {err}, retrying",
this.role
);
continue;
}
if let Err(err) = frame.start_send_unpin((pack.clone(), *this.server_addr)) {
debug!(
"[{}] SendOpenConnectionRequest2 start_send error: {err}, retrying",
this.role
);
continue;
}
ready!(frame.poll_ready_unpin(cx))?;
frame.start_send_unpin((pack.clone(), *this.server_addr))?;
*this.state = State::SendOpenConnReq2Flush;
}
State::SendOpenConnReq2Flush => {
if let Err(err) = ready!(frame.poll_flush_unpin(cx)) {
debug!(
"[{}] SendOpenConnectionRequest2 poll_flush error: {err}, retrying",
this.role
);
continue;
}
ready!(frame.poll_flush_unpin(cx))?;
*this.state = State::WaitOpenConnReply2;
}
State::WaitOpenConnReply2 => {
// TODO: Add timeout
let Some((pack, addr)) = ready!(frame.poll_next_unpin(cx)) else {
return Poll::Ready(Err(Error::ConnectionClosed));
};
let (pack, addr) = ready!(frame.poll_next_unpin(cx)).ok_or(io::Error::new(
io::ErrorKind::ConnectionReset,
"connection reset by peer",
))?;
if addr != *this.server_addr {
continue;
}
Expand Down
6 changes: 3 additions & 3 deletions src/codec/decoder/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use fastrace::local::LocalSpan;
use fastrace::Event;
use futures::{ready, Stream, StreamExt};
use futures::Stream;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
Expand Down Expand Up @@ -38,7 +38,7 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

let Some(frame_set) = ready!(this.frame.poll_next_unpin(cx)?) else {
let Some(frame_set) = ready!(this.frame.as_mut().poll_next(cx)?) else {
return Poll::Ready(None);
};

Expand Down
6 changes: 3 additions & 3 deletions src/codec/decoder/dedup.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use fastrace::Span;
use futures::{ready, Stream, StreamExt};
use futures::Stream;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
Expand Down Expand Up @@ -88,7 +88,7 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let Some(mut frame_set) = ready!(this.frame.poll_next_unpin(cx)?) else {
let Some(mut frame_set) = ready!(this.frame.as_mut().poll_next(cx)?) else {
return Poll::Ready(None);
};
this.span.get_or_insert_with(|| {
Expand Down
6 changes: 3 additions & 3 deletions src/codec/decoder/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use bytes::BufMut;
use fastrace::{Event, Span};
use futures::{ready, Stream, StreamExt};
use futures::Stream;
use lru::LruCache;
use pin_project_lite::pin_project;

Expand Down Expand Up @@ -104,7 +104,7 @@ where
this.span.take();
return Poll::Ready(Some(Ok(frame_set)));
}
let Some(frame_set) = ready!(this.frame.poll_next_unpin(cx)?) else {
let Some(frame_set) = ready!(this.frame.as_mut().poll_next(cx)?) else {
return Poll::Ready(None);
};
this.span.get_or_insert_with(|| {
Expand Down
6 changes: 3 additions & 3 deletions src/codec/decoder/ordered.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use bytes::Buf;
use fastrace::{Event, Span};
use futures::{ready, Stream, StreamExt};
use futures::Stream;
use log::warn;
use pin_project_lite::pin_project;

Expand Down Expand Up @@ -89,7 +89,7 @@ where
}
}

let Some(frame_set) = ready!(this.frame.poll_next_unpin(cx)?) else {
let Some(frame_set) = ready!(this.frame.as_mut().poll_next(cx)?) else {
return Poll::Ready(None);
};
this.span.get_or_insert_with(|| {
Expand Down
16 changes: 8 additions & 8 deletions src/codec/encoder/body.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::BytesMut;
use futures::Sink;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
use crate::link::SharedLink;
use crate::packet::connected::FrameBody;
use crate::{Message, Reliability};
Expand All @@ -25,7 +25,7 @@ pub(crate) trait BodyEncoded: Sized {

impl<F> BodyEncoded for F
where
F: Sink<Message, Error = CodecError>,
F: Sink<Message, Error = io::Error>,
{
fn body_encoded(self, link: SharedLink) -> BodyEncoder<Self> {
BodyEncoder { frame: self, link }
Expand Down Expand Up @@ -59,13 +59,13 @@ fn encode(body: FrameBody) -> Message {

impl<F> BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
F: Sink<Message, Error = io::Error>,
{
/// Empty the link buffer all the frame body, insure the frame is ready to send
pub(crate) fn poll_empty(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), CodecError>> {
) -> Poll<Result<(), io::Error>> {
let mut this = self.project();
if this.link.frame_body_empty() {
return Poll::Ready(Ok(()));
Expand All @@ -85,9 +85,9 @@ where

impl<F> Sink<Message> for BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
F: Sink<Message, Error = io::Error>,
{
type Error = CodecError;
type Error = io::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::<FrameBody>::poll_ready(self, cx)
Expand All @@ -109,9 +109,9 @@ where

impl<F> Sink<FrameBody> for BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
F: Sink<Message, Error = io::Error>,
{
type Error = CodecError;
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_empty(cx))?;
Expand Down
Loading

0 comments on commit 85807e3

Please sign in to comment.