Skip to content

Commit

Permalink
trace log on packets
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 31, 2024
1 parent 9611e86 commit 92c018e
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ConnectTo for TokioUdpSocket {
});

let src = route
.frame_decoded(config.codec_config(), role)
.frame_decoded(config.codec_config(), role, peer)
.manage_incoming_state()
.handle_online(addr, config.client_guid, Arc::clone(&link))
.enter_on_item(Span::noop);
Expand Down
96 changes: 56 additions & 40 deletions src/codec/encoder/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,35 +213,45 @@ mod test {
let dst = DstSink::default().fragmented(50, 8);
tokio::pin!(dst);
// 1
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_static(b"hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_static(b"hello world"),
))
.unwrap();
// 2
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
1,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::ReliableOrdered,
1,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
))
.unwrap();
// 1
dst.as_mut().start_send(Message::new(
Reliability::Reliable,
0,
Bytes::from_static(b"hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::Reliable,
0,
Bytes::from_static(b"hello world"),
))
.unwrap();
// 2
dst.as_mut().start_send(Message::new(
Reliability::Unreliable, // adjust to reliable
0,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::Unreliable, // adjust to reliable
0,
Bytes::from_static(b"hello world, hello world, hello world, hello world"),
))
.unwrap();
// 1
dst.as_mut().start_send(Message::new(
Reliability::Unreliable,
0,
Bytes::from_static(b"hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::Unreliable,
0,
Bytes::from_static(b"hello world"),
))
.unwrap();

assert_eq!(dst.order_write_index[0].to_u32(), 1); // 1 message on channel 0 requires ordering, next ordered frame index is 1
assert_eq!(dst.order_write_index[1].to_u32(), 1); // 1 message on channel 1 requires ordering, next ordered frame index is 1
Expand All @@ -258,22 +268,26 @@ mod test {
fn test_fragmented_panic() {
let dst = DstSink::default().fragmented(50, 8);
tokio::pin!(dst);
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
100,
Bytes::from_static(b"hello world"),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::ReliableOrdered,
100,
Bytes::from_static(b"hello world"),
))
.unwrap();
}

#[test]
fn test_fragmented_fulfill_one_packet() {
let dst = DstSink::default().fragmented(50, 8);
tokio::pin!(dst);
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50 - FRAME_SET_HEADER_SIZE - 10)),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50 - FRAME_SET_HEADER_SIZE - 10)),
))
.unwrap();
assert_eq!(dst.frame.buf.len(), 1);
assert!(dst.frame.buf[0].fragment.is_none());
assert_eq!(dst.frame.buf[0].size(), 50 - FRAME_SET_HEADER_SIZE);
Expand All @@ -283,11 +297,13 @@ mod test {
fn test_fragmented_split_packet() {
let dst = DstSink::default().fragmented(50, 8);
tokio::pin!(dst);
dst.as_mut().start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50)),
)).unwrap();
dst.as_mut()
.start_send(Message::new(
Reliability::ReliableOrdered,
0,
Bytes::from_iter(std::iter::repeat(0xfe).take(50)),
))
.unwrap();
assert_eq!(dst.frame.buf.len(), 2);
let mut fragment = dst.frame.buf[0].fragment.unwrap();
let r = dst.frame.buf[0].flags.reliability.size();
Expand Down
29 changes: 16 additions & 13 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use self::encoder::{BodyEncoded, Fragmented};
use crate::link::SharedLink;
use crate::packet::connected::{Frame, FrameBody, FrameSet, FramesMut};
use crate::utils::Logged;
use crate::{Message, Role};
use crate::{Message, Peer, Role};

/// Codec config
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -72,26 +72,32 @@ pub(crate) trait AsyncSocket: Unpin {
/// Frames pipeline decoder
/// It will convert the stream of raw frames into defragmented, deduplicated and ordered frames.
pub(crate) trait Decoded {
fn frame_decoded(self, config: Config, role: Role) -> impl Stream<Item = FrameBody>;
fn frame_decoded(self, config: Config, role: Role, peer: Peer)
-> impl Stream<Item = FrameBody>;
}

impl<F> Decoded for F
where
F: Stream<Item = FrameSet<FramesMut>>,
{
fn frame_decoded(self, config: Config, role: Role) -> impl Stream<Item = FrameBody> {
fn frame_decoded(
self,
config: Config,
role: Role,
peer: Peer,
) -> impl Stream<Item = FrameBody> {
self.map(Ok)
.trace_pending()
.deduplicated()
.defragmented(config.max_parted_size, config.max_parted_count)
.ordered(config.max_channels)
.body_decoded()
.logged_all(
.logged(
move |pack| {
trace!("[{role}] received packet: {:?}", pack);
trace!("[{role}] received packet: {:?} from {peer}", pack);
},
move |err| {
debug!("[{role}] got codec error: {err} when pipelining packets");
debug!("[{role}] got codec error: {err} when pipelining packets from {peer}");
},
)
}
Expand Down Expand Up @@ -132,9 +138,8 @@ pub mod micro_bench {
use rand::{Rng, SeedableRng};

use super::{Config, Decoded, FrameSet, FramesMut, Stream};
use crate::link::TransferLink;
use crate::packet::connected::{Flags, Fragment, Frame, Ordered};
use crate::{Reliability, Role};
use crate::{Peer, Reliability, Role};

#[derive(Debug, Clone)]
pub struct Options {
Expand Down Expand Up @@ -275,11 +280,9 @@ pub mod micro_bench {
#[allow(clippy::semicolon_if_nothing_returned)]
pub async fn bench_decoded(self) {
let config = self.config;
let link = TransferLink::new_arc(Role::test_server());

let stream = self
.into_stream()
.frame_decoded(config, link, Role::test_server());
let stream =
self.into_stream()
.frame_decoded(config, Role::test_server(), Peer::test());
#[futures_async_stream::for_await]
for _r in stream {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/incoming/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Stream for Incoming {
)));

let src = route
.frame_decoded(this.config.codec_config(), role)
.frame_decoded(this.config.codec_config(), role, peer)
.manage_incoming_state()
.handle_online(role, peer.addr, Arc::clone(&link))
.enter_on_item(move || {
Expand Down
12 changes: 5 additions & 7 deletions src/utils/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::{Sink, Stream};
use pin_project_lite::pin_project;

pub(crate) trait Logged<T, E>: Sized {
fn logged_all(
fn logged(
self,
ok_f: impl Fn(&T) + Send + Sync + 'static,
err_f: impl Fn(&E) + Send + Sync + 'static,
Expand All @@ -16,15 +16,15 @@ impl<F, T, E> Logged<T, E> for F
where
F: Stream<Item = Result<T, E>>,
{
fn logged_all(
fn logged(
self,
ok_f: impl Fn(&T) + Send + Sync + 'static,
err_f: impl Fn(&E) + Send + Sync + 'static,
) -> Log<Self, T, E> {
Log {
source: self,
err_f: Box::new(err_f),
ok_f: Some(Box::new(ok_f)),
ok_f: Box::new(ok_f),
}
}
}
Expand All @@ -34,7 +34,7 @@ pin_project! {
pub(crate) struct Log<F, T, E> {
#[pin]
source: F,
ok_f: Option<Box<dyn Fn(&T) + Send + Sync>>,
ok_f: Box<dyn Fn(&T) + Send + Sync>,
err_f: Box<dyn Fn(&E) + Send + Sync>,
}
}
Expand All @@ -59,9 +59,7 @@ where
continue;
}
};
if let Some(ok_f) = this.ok_f {
ok_f(&v);
}
(*this.ok_f)(&v);
return Poll::Ready(Some(v));
}
}
Expand Down

0 comments on commit 92c018e

Please sign in to comment.