From 759706cd120e7672bc136d67b548981aaf52bacf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D0=BD=D1=81=D1=82=D0=B0=D0=BD=D1=82=D0=B8?= =?UTF-8?q?=D0=BD=20=D0=91=D0=B5=D0=B4=D1=80=D0=B8=D1=86=D0=BA=D0=B8=D0=B9?= Date: Sat, 11 Sep 2021 16:30:37 +0600 Subject: [PATCH] Separate senders for packets and channels --- src/lib.rs | 7 +++++-- src/transport.rs | 32 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 697d5bb..a605cbc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -303,6 +303,7 @@ impl NetworkResource { task_pool.clone(), packet_rx, server_socket.get_sender(), + server_socket.get_sender(), address, ), )); @@ -340,7 +341,8 @@ impl NetworkResource { socket } }; - let sender = client_socket.get_sender(); + let packet_sender = client_socket.get_sender(); + let channel_sender = client_socket.get_sender(); self.pending_connections .lock() @@ -348,7 +350,8 @@ impl NetworkResource { .push(Box::new(transport::ClientConnection::new( self.task_pool.clone(), client_socket, - sender, + packet_sender, + channel_sender, ))); } diff --git a/src/transport.rs b/src/transport.rs index 704d4ef..f750bd0 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -109,7 +109,8 @@ pub struct ServerConnection { task_pool: TaskPool, packet_rx: crossbeam_channel::Receiver>, - sender: Option, + packet_sender: ServerSender, + channel_sender: Option, client_address: SocketAddr, stats: Arc>, @@ -128,13 +129,15 @@ impl ServerConnection { pub fn new( task_pool: TaskPool, packet_rx: crossbeam_channel::Receiver>, - sender: ServerSender, + packet_sender: ServerSender, + channel_sender: ServerSender, client_address: SocketAddr, ) -> Self { ServerConnection { task_pool, packet_rx, - sender: Some(sender), + packet_sender, + channel_sender: Some(channel_sender), client_address, stats: Arc::new(RwLock::new(PacketStats::default())), channels: None, @@ -157,10 +160,7 @@ impl Connection for ServerConnection { fn send(&mut self, payload: Packet) -> Result<(), Box> { self.stats.write().expect("stats lock poisoned").add_tx(payload.len()); block_on( - self.sender - .as_mut() - .unwrap() - .send(ServerPacket::new(self.client_address, payload.to_vec())), + self.packet_sender.send(ServerPacket::new(self.client_address, payload.to_vec())), ) } @@ -201,7 +201,7 @@ impl Connection for ServerConnection { let (channels_rx, mut channels_tx) = multiplexer.start(); self.channels_rx = Some(channels_rx); - let mut sender = self.sender.take().unwrap(); + let mut sender = self.channel_sender.take().unwrap(); let client_address = self.client_address; let stats = self.stats.clone(); self.channels_task = Some(self.task_pool.spawn(async move { @@ -229,7 +229,8 @@ pub struct ClientConnection { task_pool: TaskPool, socket: Box, - sender: Option, + packet_sender: ClientSender, + channel_sender: Option, stats: Arc>, channels: Option, @@ -246,12 +247,14 @@ impl ClientConnection { pub fn new( task_pool: TaskPool, socket: Box, - sender: ClientSender, + packet_sender: ClientSender, + channel_sender: ClientSender, ) -> Self { ClientConnection { task_pool, socket, - sender: Some(sender), + packet_sender, + channel_sender: Some(channel_sender), stats: Arc::new(RwLock::new(PacketStats::default())), channels: None, channels_rx: None, @@ -277,10 +280,7 @@ impl Connection for ClientConnection { fn send(&mut self, payload: Packet) -> Result<(), Box> { self.stats.write().expect("stats lock poisoned").add_tx(payload.len()); - self.sender - .as_mut() - .unwrap() - .send(ClientPacket::new(payload.to_vec())) + self.packet_sender.send(ClientPacket::new(payload.to_vec())) } fn receive(&mut self) -> Option> { @@ -307,7 +307,7 @@ impl Connection for ClientConnection { let (channels_rx, mut channels_tx) = multiplexer.start(); self.channels_rx = Some(channels_rx); - let mut sender = self.sender.take().unwrap(); + let mut sender = self.channel_sender.take().unwrap(); let stats = self.stats.clone(); #[allow(unused_variables)] let channels_task = self.task_pool.spawn(async move {