Skip to content

Commit

Permalink
clean log
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 31, 2024
1 parent 92c018e commit 0b7f4ef
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 39 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ indexmap = "2"
reqwest = "0.12"
tokio = { version = "1", features = ["full"] }

# TODO: Remove it when tokio-macros release its new version
[patch.crates-io]
tokio-macros = { git = "https://github.com/tokio-rs/tokio.git", rev = "833ee027d0ec44d88765157ec98b6809f0070169" }

[features]
default = ["tokio-udp"]
tokio-udp = ["dep:tokio"]
Expand Down
9 changes: 7 additions & 2 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use fastrace::Span;
use futures::StreamExt;
use log::{error, trace};
use tokio::net::UdpSocket as TokioUdpSocket;

use super::ConnectTo;
Expand All @@ -15,7 +16,7 @@ use crate::guard::HandleOutgoing;
use crate::io::{Ping, SeparatedIO, IO};
use crate::link::{Router, TransferLink};
use crate::state::{IncomingStateManage, OutgoingStateManage};
use crate::utils::TraceStreamExt;
use crate::utils::{Logged, TraceStreamExt};

impl ConnectTo for TokioUdpSocket {
async fn connect_to(
Expand Down Expand Up @@ -61,7 +62,11 @@ impl ConnectTo for TokioUdpSocket {
});

let src = route
.frame_decoded(config.codec_config(), role, peer)
.frame_decoded(config.codec_config())
.logged(
move |frame| trace!("[{role}] received {frame:?} from {peer}"),
move |err| error!("[{role}] decode error: {err} from {peer}"),
)
.manage_incoming_state()
.handle_online(addr, config.client_guid, Arc::clone(&link))
.enter_on_item(Span::noop);
Expand Down
36 changes: 8 additions & 28 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::task::{Context, Poll};

use bytes::BytesMut;
use futures::{Sink, Stream, StreamExt};
use log::{debug, trace};

use self::decoder::{BodyDecoded, DeFragmented, Deduplicated, Ordered, TracePending};
use self::encoder::{BodyEncoded, Fragmented};
use crate::errors::CodecError;
use crate::link::SharedLink;
use crate::packet::connected::{Frame, FrameBody, FrameSet, FramesMut};
use crate::utils::Logged;
use crate::{Message, Peer, Role};
use crate::Message;

/// Codec config
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -72,34 +71,20 @@ pub(crate) trait AsyncSocket: Unpin {
/// Frames pipeline decoder
/// It will convert the stream of raw frames into defragmented, deduplicated and ordered frames.
pub(crate) trait Decoded {
fn frame_decoded(self, config: Config, role: Role, peer: Peer)
-> impl Stream<Item = FrameBody>;
fn frame_decoded(self, config: Config) -> impl Stream<Item = Result<FrameBody, CodecError>>;
}

impl<F> Decoded for F
where
F: Stream<Item = FrameSet<FramesMut>>,
{
fn frame_decoded(
self,
config: Config,
role: Role,
peer: Peer,
) -> impl Stream<Item = FrameBody> {
fn frame_decoded(self, config: Config) -> impl Stream<Item = Result<FrameBody, CodecError>> {
self.map(Ok)
.trace_pending()
.deduplicated()
.defragmented(config.max_parted_size, config.max_parted_count)
.ordered(config.max_channels)
.body_decoded()
.logged(
move |pack| {
trace!("[{role}] received packet: {:?} from {peer}", pack);
},
move |err| {
debug!("[{role}] got codec error: {err} when pipelining packets from {peer}");
},
)
}
}

Expand Down Expand Up @@ -139,7 +124,7 @@ pub mod micro_bench {

use super::{Config, Decoded, FrameSet, FramesMut, Stream};
use crate::packet::connected::{Flags, Fragment, Frame, Ordered};
use crate::{Peer, Reliability, Role};
use crate::Reliability;

#[derive(Debug, Clone)]
pub struct Options {
Expand Down Expand Up @@ -262,14 +247,11 @@ pub mod micro_bench {

let config = self.config;
let data = self.data.clone();
let link = TransferLink::new_arc(Role::test_server());

let stream = self
.into_stream()
.frame_decoded(config, link, Role::test_server());
let stream = self.into_stream().frame_decoded(config);
#[futures_async_stream::for_await]
for res in stream {
let body = match res {
let body = match res.unwrap() {
crate::packet::connected::FrameBody::User(body) => body,
_ => unreachable!("unexpected decoded result"),
};
Expand All @@ -280,9 +262,7 @@ pub mod micro_bench {
#[allow(clippy::semicolon_if_nothing_returned)]
pub async fn bench_decoded(self) {
let config = self.config;
let stream =
self.into_stream()
.frame_decoded(config, Role::test_server(), Peer::test());
let stream = self.into_stream().frame_decoded(config);
#[futures_async_stream::for_await]
for _r in stream {}
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ enum Role {
}

impl Role {
#[cfg(any(test, feature = "micro-bench"))]
#[cfg(test)]
fn test_server() -> Self {
Role::Server { guid: 114514 }
}
Expand Down Expand Up @@ -130,7 +130,7 @@ struct Peer {
}

impl Peer {
#[cfg(any(test, feature = "micro-bench"))]
#[cfg(test)]
fn test() -> Self {
Self {
guid: 114514,
Expand Down
10 changes: 7 additions & 3 deletions src/server/incoming/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use concurrent_queue::ConcurrentQueue;
use fastrace::collector::SpanContext;
use fastrace::Span;
use futures::Stream;
use log::{debug, error};
use log::{debug, error, trace};
use pin_project_lite::pin_project;
use tokio::net::UdpSocket as TokioUdpSocket;

Expand All @@ -21,7 +21,7 @@ use crate::link::{Router, TransferLink};
use crate::server::handler::offline::OfflineHandler;
use crate::server::handler::online::HandleOnline;
use crate::state::{CloseOnDrop, IncomingStateManage, OutgoingStateManage};
use crate::utils::TraceStreamExt;
use crate::utils::{Logged, TraceStreamExt};

pin_project! {
struct Incoming {
Expand Down Expand Up @@ -90,7 +90,11 @@ impl Stream for Incoming {
)));

let src = route
.frame_decoded(this.config.codec_config(), role, peer)
.frame_decoded(this.config.codec_config())
.logged(
move |frame| trace!("[{role}] received {frame:?} from {peer}"),
move |err| error!("[{role}] decode error: {err} from {peer}"),
)
.manage_incoming_state()
.handle_online(role, peer.addr, Arc::clone(&link))
.enter_on_item(move || {
Expand Down

0 comments on commit 0b7f4ef

Please sign in to comment.