Skip to content

Commit

Permalink
Merge pull request #123 from plaindocs/async
Browse files Browse the repository at this point in the history
Minor edits
  • Loading branch information
jonathanpallant authored Nov 14, 2024
2 parents 84d0372 + 3d434b8 commit 465b122
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 38 deletions.
17 changes: 9 additions & 8 deletions exercise-book/src/async-chat/accept_loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ First of all, let's add required import boilerplate:
# extern crate tokio;
use std::future::Future; // 1
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, // 1
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, // 2
net::{tcp::OwnedWriteHalf, TcpListener, TcpStream, ToSocketAddrs}, // 3
sync::{mpsc, oneshot},
task, // 2
task, // 4
};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; // 4
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; // 5
```

1. Import some traits required to work with futures and streams.
2. The `task` module roughly corresponds to the `std::thread` module, but tasks are much lighter weight.
A single thread can run many tasks.
1. Import traits required to work with futures.
2. Import traits required to work with streams.
3. For the socket type, we use `TcpListener` from `tokio`, which is similar to the sync `std::net::TcpListener`, but is non-blocking and uses `async` API.
4. We will skip implementing detailled error handling in this example.
4. The `task` module roughly corresponds to the `std::thread` module, but tasks are much lighter weight.
A single thread can run many tasks.
5. We will skip implementing detailed error handling in this example.
To propagate the errors, we will use a boxed error trait object.
Do you know that there's `From<&'_ str> for Box<dyn Error>` implementation in stdlib, which allows you to use strings with `?` operator?

Expand Down Expand Up @@ -66,7 +67,7 @@ pub(crate) async fn main() -> Result<()> {
}
```

The crucial thing to realise that is in Rust, unlike other languages, calling an async function does **not** run any code.
The crucial thing to realise is that in Rust, unlike in other languages, calling an async function does **not** run any code.
Async functions only construct futures, which are inert state machines.
To start stepping through the future state-machine in an async function, you should use `.await`.
In a non-async function, a way to execute a future is to hand it to the executor.
2 changes: 1 addition & 1 deletion exercise-book/src/async-chat/all_together.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Gluing all together
## Gluing it all together

At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat.

Expand Down
18 changes: 8 additions & 10 deletions exercise-book/src/async-chat/clean_shutdown.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
## Clean Shutdown

One of the problems of the current implementation is that it doesn't handle graceful shutdown.
If we break from the accept loop for some reason, all in-flight tasks are just dropped on the floor.
One of the problems with the current implementation is that it doesn't handle graceful shutdown.
If we break from the accept loop for some reason, all in-flight tasks are just dropped.

We will intercept `Ctrl-C`.

A more correct shutdown sequence would be:
Instead, let's intercept `Ctrl-C` and implment a more correct shutdown sequence:

1. Stop accepting new clients
2. Notify the readers we're not accepting new messages
Expand All @@ -26,7 +24,7 @@ However, we never wait for broker and writers, which might cause some messages t
We also need to notify all readers that we are going to stop accepting messages. Here, we use `tokio::sync::Notify`.

Let's first add the notification feature to the readers.
We have to start using `select!` here to work
We have to start using `select!` here to work
```rust,ignore
async fn connection_loop(broker: Sender<Event>, stream: TcpStream, shutdown: Arc<Notify>) -> Result<()> {
// ...
Expand All @@ -43,7 +41,7 @@ async fn connection_loop(broker: Sender<Event>, stream: TcpStream, shutdown: Arc
.map(|name| name.trim().to_string())
.collect();
let msg: String = msg.trim().to_string();
broker
.send(Event::Message {
from: name.clone(),
Expand Down Expand Up @@ -97,7 +95,7 @@ Let's add Ctrl-C handling and waiting to the server.
# {
# unimplemented!()
# }
#
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
Expand Down Expand Up @@ -167,15 +165,15 @@ And to the broker:
# ) -> Result<()> {
# Ok(())
# }
#
#
async fn broker_loop(mut events: Receiver<Event>) {
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
loop {
let event = match events.recv().await {
Some(event) => event,
None => break,
};
};
match event {
Event::Message { from, to, msg } => {
// ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

So how do we make sure that messages read in `connection_loop` flow into the relevant `connection_writer_loop`?
We should somehow maintain a `peers: HashMap<String, Sender<String>>` map which allows a client to find destination channels.
However, this map would be a bit of shared mutable state, so we'll have to wrap an `RwLock` over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message.
However, this map would be a bit of shared mutable state, so we'll have to wrap an `RwLock` over it and answer tough questions about what should happen if the client joins at the same moment as it receives a message.

One trick to make reasoning about state simpler is by taking inspiration from the actor model.
We can create a dedicated broker task which owns the `peers` map and communicates with other tasks using channels.
Expand Down
16 changes: 8 additions & 8 deletions exercise-book/src/async-chat/handling_disconnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ If the read side finishes we will notify the write side that it should stop as w
That is, we need to add an ability to signal shutdown for the writer task.

One way to approach this is a `shutdown: Receiver<()>` channel.
There's a more minimal solution however, which makes clever use of RAII.
There's a more minimal solution however, which makes clever use of [RAII (Resource Acquisition Is Initialization)](https://doc.rust-lang.org/rust-by-example/scope/raii.html).
Closing a channel is a synchronization event, so we don't need to send a shutdown message, we can just drop the sender.
This way, we statically guarantee that we issue shutdown exactly once, even if we early return via `?` or panic.

Expand Down Expand Up @@ -50,7 +50,7 @@ enum Event {
NewPeer {
name: String,
stream: OwnedWriteHalf,
shutdown: oneshot::Receiver<()>,
shutdown: oneshot::Receiver<()>, // 1
},
Message {
from: String,
Expand All @@ -65,12 +65,12 @@ async fn connection_loop(broker: Sender<Event>, stream: TcpStream) -> Result<()>
# let mut lines = reader.lines();
# let name: String = String::new();
// ...
let (_shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let (_shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); // 3
broker
.send(Event::NewPeer {
name: name.clone(),
stream: writer,
shutdown: shutdown_receiver,
shutdown: shutdown_receiver, // 2
})
.unwrap();
// ...
Expand All @@ -89,15 +89,15 @@ We use the `select` macro for this purpose:
async fn connection_writer_loop(
messages: &mut Receiver<String>,
stream: &mut OwnedWriteHalf,
mut shutdown: oneshot::Receiver<()>,
mut shutdown: oneshot::Receiver<()>, // 1
) -> Result<()> {
loop {
loop { // 2
tokio::select! {
msg = messages.recv() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break,
},
_ = &mut shutdown => break
_ = &mut shutdown => break // 3
}
}
Expand Down Expand Up @@ -164,4 +164,4 @@ async fn broker_loop(mut events: Receiver<Event>) {
drop(disconnect_sender);
while let Some((_name, _pending_messages)) = disconnect_receiver.recv().await {}
}
```
```
9 changes: 5 additions & 4 deletions exercise-book/src/async-chat/implementing_a_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut lines_from_server = BufReader::new(reader).lines(); // 2
let mut lines_from_stdin = BufReader::new(stdin()).lines(); // 2
let mut lines_from_stdin = BufReader::new(stdin()).lines(); // 3
loop {
tokio::select! { // 3
tokio::select! { // 4
line = lines_from_server.next_line() => match line {
Ok(Some(line)) => {
println!("{}", line);
Expand All @@ -57,5 +57,6 @@ async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
```

1. Here we split `TcpStream` into read and write halves.
2. We create a stream of lines for both the socket and stdin.
3. In the main select loop, we print the lines we receive from the server and send the lines we read from the console.
2. We create a stream of lines for the socket.
2. We create a stream of lines for stdin.
4. In the main select loop, we print the lines we receive from the server and send the lines we read from the console.
12 changes: 6 additions & 6 deletions exercise-book/src/async-chat/receiving_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ We need to:
2. interpret the first line as a login
3. parse the rest of the lines as a `login: message`

We highly recommend to go past this quick, this is a lot of protocol minutia.
We recommend that you speed through this quickly, it is mostly a lot of uninteresting protocol minutia.

```rust,ignore
# extern crate tokio;
Expand All @@ -29,7 +29,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
loop {
let (stream, _socket_addr) = listener.accept().await?;
println!("Accepting from: {}", stream.peer_addr()?);
let _handle = task::spawn(connection_loop(stream));
let _handle = task::spawn(connection_loop(stream)); // 1
}
Ok(())
}
Expand Down Expand Up @@ -83,7 +83,7 @@ async fn connection_loop(stream: TcpStream) -> Result<()> {

## Managing Errors

One serious problem in the above solution is that, while we correctly propagate errors in the `connection_loop`, we just drop the error on the floor afterwards!
One issue with the previous solution is that while we correctly propagate errors in the `connection_loop`, we just drop the error afterwards!
That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
We can "fix" it by waiting for the task to be joined, like this:

Expand All @@ -107,11 +107,11 @@ handle.await?
The `.await` waits until the client finishes, and `?` propagates the result.

There are two problems with this solution however!
*First*, because we immediately await the client, we can only handle one client at a time, and that completely defeats the purpose of async!
*Second*, if a client encounters an IO error, the whole server immediately exits.
- *First*, because we immediately await the client, we can only handle one client at a time, and that completely defeats the purpose of async!
- *Second*, if a client encounters an IO error, the whole server immediately exits.
That is, a flaky internet connection of one peer brings down the whole chat room!

A correct way to handle client errors in this case is log them, and continue serving other clients.
The correct way to handle client errors in this case is log them, and continue serving other clients.
So let's use a helper function for this:

```rust,ignore
Expand Down

0 comments on commit 465b122

Please sign in to comment.