From 0b7f4efef65d4fc014522fbb00f4e486b8f06c35 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 1 Aug 2024 03:25:51 +0800 Subject: [PATCH] clean log Signed-off-by: iGxnon --- Cargo.toml | 4 ---- src/client/conn/tokio.rs | 9 +++++++-- src/codec/mod.rs | 36 ++++++++---------------------------- src/lib.rs | 4 ++-- src/server/incoming/tokio.rs | 10 +++++++--- 5 files changed, 24 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b763d5..8f30a30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,10 +34,6 @@ indexmap = "2" reqwest = "0.12" tokio = { version = "1", features = ["full"] } -# TODO: Remove it when tokio-macros release its new version -[patch.crates-io] -tokio-macros = { git = "https://github.com/tokio-rs/tokio.git", rev = "833ee027d0ec44d88765157ec98b6809f0070169" } - [features] default = ["tokio-udp"] tokio-udp = ["dep:tokio"] diff --git a/src/client/conn/tokio.rs b/src/client/conn/tokio.rs index 3ccfb1a..ddd819b 100644 --- a/src/client/conn/tokio.rs +++ b/src/client/conn/tokio.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use fastrace::Span; use futures::StreamExt; +use log::{error, trace}; use tokio::net::UdpSocket as TokioUdpSocket; use super::ConnectTo; @@ -15,7 +16,7 @@ use crate::guard::HandleOutgoing; use crate::io::{Ping, SeparatedIO, IO}; use crate::link::{Router, TransferLink}; use crate::state::{IncomingStateManage, OutgoingStateManage}; -use crate::utils::TraceStreamExt; +use crate::utils::{Logged, TraceStreamExt}; impl ConnectTo for TokioUdpSocket { async fn connect_to( @@ -61,7 +62,11 @@ impl ConnectTo for TokioUdpSocket { }); let src = route - .frame_decoded(config.codec_config(), role, peer) + .frame_decoded(config.codec_config()) + .logged( + move |frame| trace!("[{role}] received {frame:?} from {peer}"), + move |err| error!("[{role}] decode error: {err} from {peer}"), + ) .manage_incoming_state() .handle_online(addr, config.client_guid, Arc::clone(&link)) .enter_on_item(Span::noop); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index f6fc946..252bb55 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -17,14 +17,13 @@ use std::task::{Context, Poll}; use bytes::BytesMut; use futures::{Sink, Stream, StreamExt}; -use log::{debug, trace}; use self::decoder::{BodyDecoded, DeFragmented, Deduplicated, Ordered, TracePending}; use self::encoder::{BodyEncoded, Fragmented}; +use crate::errors::CodecError; use crate::link::SharedLink; use crate::packet::connected::{Frame, FrameBody, FrameSet, FramesMut}; -use crate::utils::Logged; -use crate::{Message, Peer, Role}; +use crate::Message; /// Codec config #[derive(Clone, Copy, Debug)] @@ -72,34 +71,20 @@ pub(crate) trait AsyncSocket: Unpin { /// Frames pipeline decoder /// It will convert the stream of raw frames into defragmented, deduplicated and ordered frames. pub(crate) trait Decoded { - fn frame_decoded(self, config: Config, role: Role, peer: Peer) - -> impl Stream; + fn frame_decoded(self, config: Config) -> impl Stream>; } impl Decoded for F where F: Stream>, { - fn frame_decoded( - self, - config: Config, - role: Role, - peer: Peer, - ) -> impl Stream { + fn frame_decoded(self, config: Config) -> impl Stream> { self.map(Ok) .trace_pending() .deduplicated() .defragmented(config.max_parted_size, config.max_parted_count) .ordered(config.max_channels) .body_decoded() - .logged( - move |pack| { - trace!("[{role}] received packet: {:?} from {peer}", pack); - }, - move |err| { - debug!("[{role}] got codec error: {err} when pipelining packets from {peer}"); - }, - ) } } @@ -139,7 +124,7 @@ pub mod micro_bench { use super::{Config, Decoded, FrameSet, FramesMut, Stream}; use crate::packet::connected::{Flags, Fragment, Frame, Ordered}; - use crate::{Peer, Reliability, Role}; + use crate::Reliability; #[derive(Debug, Clone)] pub struct Options { @@ -262,14 +247,11 @@ pub mod micro_bench { let config = self.config; let data = self.data.clone(); - let link = TransferLink::new_arc(Role::test_server()); - let stream = self - .into_stream() - .frame_decoded(config, link, Role::test_server()); + let stream = self.into_stream().frame_decoded(config); #[futures_async_stream::for_await] for res in stream { - let body = match res { + let body = match res.unwrap() { crate::packet::connected::FrameBody::User(body) => body, _ => unreachable!("unexpected decoded result"), }; @@ -280,9 +262,7 @@ pub mod micro_bench { #[allow(clippy::semicolon_if_nothing_returned)] pub async fn bench_decoded(self) { let config = self.config; - let stream = - self.into_stream() - .frame_decoded(config, Role::test_server(), Peer::test()); + let stream = self.into_stream().frame_decoded(config); #[futures_async_stream::for_await] for _r in stream {} } diff --git a/src/lib.rs b/src/lib.rs index 410d28b..7a73cc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,7 +100,7 @@ enum Role { } impl Role { - #[cfg(any(test, feature = "micro-bench"))] + #[cfg(test)] fn test_server() -> Self { Role::Server { guid: 114514 } } @@ -130,7 +130,7 @@ struct Peer { } impl Peer { - #[cfg(any(test, feature = "micro-bench"))] + #[cfg(test)] fn test() -> Self { Self { guid: 114514, diff --git a/src/server/incoming/tokio.rs b/src/server/incoming/tokio.rs index 9ee993e..5be7dbf 100644 --- a/src/server/incoming/tokio.rs +++ b/src/server/incoming/tokio.rs @@ -8,7 +8,7 @@ use concurrent_queue::ConcurrentQueue; use fastrace::collector::SpanContext; use fastrace::Span; use futures::Stream; -use log::{debug, error}; +use log::{debug, error, trace}; use pin_project_lite::pin_project; use tokio::net::UdpSocket as TokioUdpSocket; @@ -21,7 +21,7 @@ use crate::link::{Router, TransferLink}; use crate::server::handler::offline::OfflineHandler; use crate::server::handler::online::HandleOnline; use crate::state::{CloseOnDrop, IncomingStateManage, OutgoingStateManage}; -use crate::utils::TraceStreamExt; +use crate::utils::{Logged, TraceStreamExt}; pin_project! { struct Incoming { @@ -90,7 +90,11 @@ impl Stream for Incoming { ))); let src = route - .frame_decoded(this.config.codec_config(), role, peer) + .frame_decoded(this.config.codec_config()) + .logged( + move |frame| trace!("[{role}] received {frame:?} from {peer}"), + move |err| error!("[{role}] decode error: {err} from {peer}"), + ) .manage_incoming_state() .handle_online(role, peer.addr, Arc::clone(&link)) .enter_on_item(move || {