Skip to content

Commit

Permalink
wake on connection
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 31, 2024
1 parent 85807e3 commit 9611e86
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 254 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async-channel = "2.3.1"
bytes = "1"
concurrent-queue = "2.5.0"
fastrace = "0.6"
futures = "0.3"
futures = { version = "0.3.5", default-features = false }
futures-async-stream = "0.2"
log = "0.4"
lru = "0.12"
Expand Down
6 changes: 0 additions & 6 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU8, Ordering};

use bytes::Bytes;
use futures::{SinkExt, StreamExt};
Expand Down Expand Up @@ -30,8 +29,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
let io = incoming.next().await.unwrap();
tokio::spawn(async move {
static ORDER_CHANNEL: AtomicU8 = AtomicU8::new(0);

tokio::pin!(io);
println!("[server] set default reliability to Reliable");
io.as_mut().set_default_reliability(Reliability::Reliable);
Expand All @@ -48,9 +45,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
.send()
.await
.unwrap();
let channel = ORDER_CHANNEL.fetch_add(1, Ordering::Relaxed);
println!("[server] assign order channel: {}", channel);
io.as_mut().set_default_order_channel(channel);
io.send(res.bytes().await.unwrap()).await.unwrap();
continue;
}
Expand Down
6 changes: 3 additions & 3 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::ToSocketAddrs;

use super::handler::offline;
use crate::io::{Ping, IO};
use crate::{codec, RoleContext};
use crate::{codec, Role};

/// Connection implementation by using tokio's UDP framework
#[cfg(feature = "tokio-udp")]
Expand Down Expand Up @@ -117,8 +117,8 @@ impl Config {
}
}

fn client_role(&self) -> RoleContext {
RoleContext::Client {
fn client_role(&self) -> Role {
Role::Client {
guid: self.client_guid,
}
}
Expand Down
26 changes: 7 additions & 19 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::io::{Ping, SeparatedIO, IO};
use crate::link::{Router, TransferLink};
use crate::state::{IncomingStateManage, OutgoingStateManage};
use crate::utils::TraceStreamExt;
use crate::PeerContext;

impl ConnectTo for TokioUdpSocket {
async fn connect_to(
Expand All @@ -39,25 +38,18 @@ impl ConnectTo for TokioUdpSocket {
));
};

let mut incoming = OfflineHandler::new(
let (mut incoming, peer) = OfflineHandler::new(
Framed::new(Arc::clone(&socket), config.mtu as usize), // TODO: discover MTU
addr,
config.offline_config(),
)
.await?;
let role = config.client_role();

let link = TransferLink::new_arc(config.client_role());
let dst = Framed::new(Arc::clone(&socket), config.mtu as usize)
.handle_outgoing(
Arc::clone(&link),
config.send_buf_cap,
PeerContext {
addr,
mtu: config.mtu,
},
config.client_role(),
)
.frame_encoded(config.mtu, config.codec_config(), Arc::clone(&link))
let link = TransferLink::new_arc(role, peer);
let dst = Framed::new(Arc::clone(&socket), peer.mtu as usize)
.handle_outgoing(Arc::clone(&link), config.send_buf_cap, peer, role)
.frame_encoded(peer.mtu, config.codec_config(), Arc::clone(&link))
.manage_outgoing_state(None);

let (mut router, route) = Router::new(Arc::clone(&link));
Expand All @@ -69,11 +61,7 @@ impl ConnectTo for TokioUdpSocket {
});

let src = route
.frame_decoded(
config.codec_config(),
Arc::clone(&link),
config.client_role(),
)
.frame_decoded(config.codec_config(), role)
.manage_incoming_state()
.handle_online(addr, config.client_guid, Arc::clone(&link))
.enter_on_item(Span::noop);
Expand Down
27 changes: 18 additions & 9 deletions src/client/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pin_project_lite::pin_project;

use crate::packet::connected::{self, FramesMut};
use crate::packet::{unconnected, Packet};
use crate::RoleContext;
use crate::{Peer, Role};

#[derive(Debug, Clone, Copy)]
pub(crate) struct Config {
Expand All @@ -23,7 +23,7 @@ pin_project! {
state: State,
server_addr: SocketAddr,
config: Config,
role: RoleContext,
role: Role,
}
}

Expand All @@ -42,7 +42,7 @@ where
mtu: config.mtu,
}),
server_addr,
role: RoleContext::Client {
role: Role::Client {
guid: config.client_guid,
},
config,
Expand All @@ -65,7 +65,7 @@ where
+ Sink<(unconnected::Packet, SocketAddr), Error = io::Error>
+ Unpin,
{
type Output = Result<impl Stream<Item = connected::Packet<FramesMut>>, io::Error>;
type Output = Result<(impl Stream<Item = connected::Packet<FramesMut>>, Peer), io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Expand Down Expand Up @@ -124,14 +124,23 @@ where
}
match pack {
Packet::Unconnected(unconnected::Packet::OpenConnectionReply2 {
server_guid: guid,
..
}) => {}
}) => {
return Poll::Ready(Ok((
FilterConnected {
frame: this.frame.take().unwrap(),
server_addr: *this.server_addr,
},
Peer {
addr: *this.server_addr,
mtu: this.config.mtu,
guid,
},
)))
}
_ => continue,
};
return Poll::Ready(Ok(FilterConnected {
frame: this.frame.take().unwrap(),
server_addr: *this.server_addr,
}));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/client/handler/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use pin_project_lite::pin_project;
use crate::link::SharedLink;
use crate::packet::connected::FrameBody;
use crate::utils::timestamp;
use crate::RoleContext;
use crate::Role;

pub(crate) trait HandleOnline: Sized {
fn handle_online(
Expand Down Expand Up @@ -41,7 +41,7 @@ where
state: State::WaitConnRes,
addr,
link,
role: RoleContext::Client { guid: client_guid },
role: Role::Client { guid: client_guid },
}
}
}
Expand All @@ -53,7 +53,7 @@ pin_project! {
state: State,
addr: SocketAddr,
link: SharedLink,
role: RoleContext,
role: Role,
}
}

Expand Down
50 changes: 7 additions & 43 deletions src/codec/decoder/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use lru::LruCache;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
use crate::link::SharedLink;
use crate::packet::connected::{Fragment, Frame, FrameMut, FrameSet, FramesMut};

const DEFAULT_DEFRAGMENT_BUF_SIZE: usize = 512;
Expand Down Expand Up @@ -55,36 +54,24 @@ pin_project! {
// users sending a large number of parted IDs.
parts: LruCache<u16, BinaryHeap<FramePart>>,
buffer: VecDeque<FrameSet<Frame>>,
link: SharedLink,
span: Option<Span>,
}
}

pub(crate) trait DeFragmented: Sized {
fn defragmented(
self,
limit_size: u32,
limit_parted: usize,
link: SharedLink,
) -> DeFragment<Self>;
fn defragmented(self, limit_size: u32, limit_parted: usize) -> DeFragment<Self>;
}

impl<F> DeFragmented for F
where
F: Stream<Item = Result<FrameSet<FramesMut>, CodecError>>,
{
fn defragmented(
self,
limit_size: u32,
limit_parted: usize,
link: SharedLink,
) -> DeFragment<Self> {
fn defragmented(self, limit_size: u32, limit_parted: usize) -> DeFragment<Self> {
DeFragment {
frame: self,
limit_size,
parts: LruCache::new(NonZeroUsize::new(limit_parted).expect("limit_parted > 0")),
buffer: VecDeque::with_capacity(DEFAULT_DEFRAGMENT_BUF_SIZE),
link,
span: None,
}
}
Expand Down Expand Up @@ -124,8 +111,6 @@ where
{
// promise that parted_index is always less than parted_size
if parted_index >= parted_size {
// perhaps network bit-flips
this.link.outgoing_nack(frame_set.seq_num);
let err = format!(
"parted_index {} >= parted_size {}",
parted_index, parted_size
Expand Down Expand Up @@ -194,7 +179,6 @@ mod test {

use super::*;
use crate::errors::CodecError;
use crate::link::TransferLink;
use crate::packet::connected::{Flags, Fragment, Frame, FrameSet, FramesMut};

fn frame_set<'a, T: AsRef<str> + 'a>(
Expand Down Expand Up @@ -253,11 +237,7 @@ mod test {
};

tokio::pin!(frame);
let mut frag = frame.map(Ok).defragmented(
0,
512,
TransferLink::new_arc(crate::RoleContext::test_server()),
);
let mut frag = frame.map(Ok).defragmented(0, 512);
let set = frag.next().await.unwrap().unwrap();

// frames should be merged
Expand All @@ -280,11 +260,7 @@ mod test {
}
};
tokio::pin!(frame);
let mut frag = frame.map(Ok).defragmented(
20,
512,
TransferLink::new_arc(crate::RoleContext::test_server()),
);
let mut frag = frame.map(Ok).defragmented(20, 512);
assert!(matches!(
frag.next().await.unwrap(),
Err(CodecError::PartedFrame(..))
Expand Down Expand Up @@ -313,11 +289,7 @@ mod test {
};

tokio::pin!(frame);
let mut frag = frame.map(Ok).defragmented(
0,
2,
TransferLink::new_arc(crate::RoleContext::test_server()),
);
let mut frag = frame.map(Ok).defragmented(0, 2);
assert!(frag.next().await.is_none());
assert_eq!(frag.parts.len(), 2);
assert_eq!(frag.parts.peek(&0).unwrap().len(), 2);
Expand All @@ -341,11 +313,7 @@ mod test {
};

tokio::pin!(frame);
let mut frag = frame.map(Ok).defragmented(
0,
2,
TransferLink::new_arc(crate::RoleContext::test_server()),
);
let mut frag = frame.map(Ok).defragmented(0, 2);

{
let set = frag.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -382,11 +350,7 @@ mod test {
};

tokio::pin!(frame);
let mut frag = frame.map(Ok).defragmented(
0,
1,
TransferLink::new_arc(crate::RoleContext::test_server()),
);
let mut frag = frame.map(Ok).defragmented(0, 1);

let set = frag.next().await.unwrap().unwrap();
assert_eq!(
Expand Down
16 changes: 8 additions & 8 deletions src/codec/encoder/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,31 +217,31 @@ mod test {
Reliability::ReliableOrdered,
0,
Bytes::from_static(b"hello world"),
));
)).unwrap();
// 2
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
1,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
));
)).unwrap();
// 1
dst.as_mut().start_send(Message::new(
Reliability::Reliable,
0,
Bytes::from_static(b"hello world"),
));
)).unwrap();
// 2
dst.as_mut().start_send(Message::new(
Reliability::Unreliable, // adjust to reliable
0,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
));
)).unwrap();
// 1
dst.as_mut().start_send(Message::new(
Reliability::Unreliable,
0,
Bytes::from_static(b"hello world"),
));
)).unwrap();

assert_eq!(dst.order_write_index[0].to_u32(), 1); // 1 message on channel 0 requires ordering, next ordered frame index is 1
assert_eq!(dst.order_write_index[1].to_u32(), 1); // 1 message on channel 1 requires ordering, next ordered frame index is 1
Expand All @@ -262,7 +262,7 @@ mod test {
Reliability::ReliableOrdered,
100,
Bytes::from_static(b"hello world"),
));
)).unwrap();
}

#[test]
Expand All @@ -273,7 +273,7 @@ mod test {
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50 - FRAME_SET_HEADER_SIZE - 10)),
));
)).unwrap();
assert_eq!(dst.frame.buf.len(), 1);
assert!(dst.frame.buf[0].fragment.is_none());
assert_eq!(dst.frame.buf[0].size(), 50 - FRAME_SET_HEADER_SIZE);
Expand All @@ -287,7 +287,7 @@ mod test {
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50)),
));
)).unwrap();
assert_eq!(dst.frame.buf.len(), 2);
let mut fragment = dst.frame.buf[0].fragment.unwrap();
let r = dst.frame.buf[0].flags.reliability.size();
Expand Down
Loading

0 comments on commit 9611e86

Please sign in to comment.