From 92c018e2bf490fba934be5da67193627fbe0dc41 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 1 Aug 2024 03:10:05 +0800 Subject: [PATCH] trace log on packets Signed-off-by: iGxnon --- src/client/conn/tokio.rs | 2 +- src/codec/encoder/fragment.rs | 96 ++++++++++++++++++++--------------- src/codec/mod.rs | 29 ++++++----- src/server/incoming/tokio.rs | 2 +- src/utils/log.rs | 12 ++--- 5 files changed, 79 insertions(+), 62 deletions(-) diff --git a/src/client/conn/tokio.rs b/src/client/conn/tokio.rs index 2a6ba87..3ccfb1a 100644 --- a/src/client/conn/tokio.rs +++ b/src/client/conn/tokio.rs @@ -61,7 +61,7 @@ impl ConnectTo for TokioUdpSocket { }); let src = route - .frame_decoded(config.codec_config(), role) + .frame_decoded(config.codec_config(), role, peer) .manage_incoming_state() .handle_online(addr, config.client_guid, Arc::clone(&link)) .enter_on_item(Span::noop); diff --git a/src/codec/encoder/fragment.rs b/src/codec/encoder/fragment.rs index 9b6f696..15572c0 100644 --- a/src/codec/encoder/fragment.rs +++ b/src/codec/encoder/fragment.rs @@ -213,35 +213,45 @@ mod test { let dst = DstSink::default().fragmented(50, 8); tokio::pin!(dst); // 1 - dst.as_mut().start_send(Message::new( - Reliability::ReliableOrdered, - 0, - Bytes::from_static(b"hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::ReliableOrdered, + 0, + Bytes::from_static(b"hello world"), + )) + .unwrap(); // 2 - dst.as_mut().start_send(Message::new( - Reliability::ReliableOrdered, - 1, - Bytes::from_static(b"hello world, hello world, hello world, hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::ReliableOrdered, + 1, + Bytes::from_static(b"hello world, hello world, hello world, hello world"), + )) + .unwrap(); // 1 - dst.as_mut().start_send(Message::new( - Reliability::Reliable, - 0, - Bytes::from_static(b"hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::Reliable, + 0, + Bytes::from_static(b"hello world"), + )) + .unwrap(); // 2 - dst.as_mut().start_send(Message::new( - Reliability::Unreliable, // adjust to reliable - 0, - Bytes::from_static(b"hello world, hello world, hello world, hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::Unreliable, // adjust to reliable + 0, + Bytes::from_static(b"hello world, hello world, hello world, hello world"), + )) + .unwrap(); // 1 - dst.as_mut().start_send(Message::new( - Reliability::Unreliable, - 0, - Bytes::from_static(b"hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::Unreliable, + 0, + Bytes::from_static(b"hello world"), + )) + .unwrap(); assert_eq!(dst.order_write_index[0].to_u32(), 1); // 1 message on channel 0 requires ordering, next ordered frame index is 1 assert_eq!(dst.order_write_index[1].to_u32(), 1); // 1 message on channel 1 requires ordering, next ordered frame index is 1 @@ -258,22 +268,26 @@ mod test { fn test_fragmented_panic() { let dst = DstSink::default().fragmented(50, 8); tokio::pin!(dst); - dst.as_mut().start_send(Message::new( - Reliability::ReliableOrdered, - 100, - Bytes::from_static(b"hello world"), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::ReliableOrdered, + 100, + Bytes::from_static(b"hello world"), + )) + .unwrap(); } #[test] fn test_fragmented_fulfill_one_packet() { let dst = DstSink::default().fragmented(50, 8); tokio::pin!(dst); - dst.as_mut().start_send(Message::new( - Reliability::ReliableOrdered, - 0, - Bytes::from_iter(std::iter::repeat(0xfe).take(50 - FRAME_SET_HEADER_SIZE - 10)), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::ReliableOrdered, + 0, + Bytes::from_iter(std::iter::repeat(0xfe).take(50 - FRAME_SET_HEADER_SIZE - 10)), + )) + .unwrap(); assert_eq!(dst.frame.buf.len(), 1); assert!(dst.frame.buf[0].fragment.is_none()); assert_eq!(dst.frame.buf[0].size(), 50 - FRAME_SET_HEADER_SIZE); @@ -283,11 +297,13 @@ mod test { fn test_fragmented_split_packet() { let dst = DstSink::default().fragmented(50, 8); tokio::pin!(dst); - dst.as_mut().start_send(Message::new( - Reliability::ReliableOrdered, - 0, - Bytes::from_iter(std::iter::repeat(0xfe).take(50)), - )).unwrap(); + dst.as_mut() + .start_send(Message::new( + Reliability::ReliableOrdered, + 0, + Bytes::from_iter(std::iter::repeat(0xfe).take(50)), + )) + .unwrap(); assert_eq!(dst.frame.buf.len(), 2); let mut fragment = dst.frame.buf[0].fragment.unwrap(); let r = dst.frame.buf[0].flags.reliability.size(); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 9bb627b..f6fc946 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -24,7 +24,7 @@ use self::encoder::{BodyEncoded, Fragmented}; use crate::link::SharedLink; use crate::packet::connected::{Frame, FrameBody, FrameSet, FramesMut}; use crate::utils::Logged; -use crate::{Message, Role}; +use crate::{Message, Peer, Role}; /// Codec config #[derive(Clone, Copy, Debug)] @@ -72,26 +72,32 @@ pub(crate) trait AsyncSocket: Unpin { /// Frames pipeline decoder /// It will convert the stream of raw frames into defragmented, deduplicated and ordered frames. pub(crate) trait Decoded { - fn frame_decoded(self, config: Config, role: Role) -> impl Stream; + fn frame_decoded(self, config: Config, role: Role, peer: Peer) + -> impl Stream; } impl Decoded for F where F: Stream>, { - fn frame_decoded(self, config: Config, role: Role) -> impl Stream { + fn frame_decoded( + self, + config: Config, + role: Role, + peer: Peer, + ) -> impl Stream { self.map(Ok) .trace_pending() .deduplicated() .defragmented(config.max_parted_size, config.max_parted_count) .ordered(config.max_channels) .body_decoded() - .logged_all( + .logged( move |pack| { - trace!("[{role}] received packet: {:?}", pack); + trace!("[{role}] received packet: {:?} from {peer}", pack); }, move |err| { - debug!("[{role}] got codec error: {err} when pipelining packets"); + debug!("[{role}] got codec error: {err} when pipelining packets from {peer}"); }, ) } @@ -132,9 +138,8 @@ pub mod micro_bench { use rand::{Rng, SeedableRng}; use super::{Config, Decoded, FrameSet, FramesMut, Stream}; - use crate::link::TransferLink; use crate::packet::connected::{Flags, Fragment, Frame, Ordered}; - use crate::{Reliability, Role}; + use crate::{Peer, Reliability, Role}; #[derive(Debug, Clone)] pub struct Options { @@ -275,11 +280,9 @@ pub mod micro_bench { #[allow(clippy::semicolon_if_nothing_returned)] pub async fn bench_decoded(self) { let config = self.config; - let link = TransferLink::new_arc(Role::test_server()); - - let stream = self - .into_stream() - .frame_decoded(config, link, Role::test_server()); + let stream = + self.into_stream() + .frame_decoded(config, Role::test_server(), Peer::test()); #[futures_async_stream::for_await] for _r in stream {} } diff --git a/src/server/incoming/tokio.rs b/src/server/incoming/tokio.rs index dce6934..9ee993e 100644 --- a/src/server/incoming/tokio.rs +++ b/src/server/incoming/tokio.rs @@ -90,7 +90,7 @@ impl Stream for Incoming { ))); let src = route - .frame_decoded(this.config.codec_config(), role) + .frame_decoded(this.config.codec_config(), role, peer) .manage_incoming_state() .handle_online(role, peer.addr, Arc::clone(&link)) .enter_on_item(move || { diff --git a/src/utils/log.rs b/src/utils/log.rs index 61dcdad..fefb765 100644 --- a/src/utils/log.rs +++ b/src/utils/log.rs @@ -5,7 +5,7 @@ use futures::{Sink, Stream}; use pin_project_lite::pin_project; pub(crate) trait Logged: Sized { - fn logged_all( + fn logged( self, ok_f: impl Fn(&T) + Send + Sync + 'static, err_f: impl Fn(&E) + Send + Sync + 'static, @@ -16,7 +16,7 @@ impl Logged for F where F: Stream>, { - fn logged_all( + fn logged( self, ok_f: impl Fn(&T) + Send + Sync + 'static, err_f: impl Fn(&E) + Send + Sync + 'static, @@ -24,7 +24,7 @@ where Log { source: self, err_f: Box::new(err_f), - ok_f: Some(Box::new(ok_f)), + ok_f: Box::new(ok_f), } } } @@ -34,7 +34,7 @@ pin_project! { pub(crate) struct Log { #[pin] source: F, - ok_f: Option>, + ok_f: Box, err_f: Box, } } @@ -59,9 +59,7 @@ where continue; } }; - if let Some(ok_f) = this.ok_f { - ok_f(&v); - } + (*this.ok_f)(&v); return Poll::Ready(Some(v)); } }