Skip to content

concurrentes-fiuba/actix-async-handler

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Actix Async Handler

An attribute macro to support writing async message handlers for Actix actors

Using this macro you can convert this example

fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
    AtomicResponse::new(Box::pin(
        async {}
            .into_actor(self)
            .map(|_, this, _| {
                this.0 = 30;
            })
            .then(|_, this, _| {
                sleep(Duration::from_secs(3)).into_actor(this)
            })
            .map(|_, this, _| {
                this.0 -= 1;
                this.0
            }),
    ))
}

into a much more readable async handler

async fn handle(&mut self, _msg: Msg, _ctx: &mut Context<Self>) -> Self::Result {
    self.0 = 30;
    sleep(Duration::from_secs(3)).await;
    self.0 -= 1;
    self.0
}

Usage

Add actix_async_handler as dev dependency.

cargo add --dev actix_async_handler

If you intend to use loops, also add futures as dependency

cargo add futures

When implementing an async handler, annotate it with the #[async_handler] attribute like

#[async_handler]
impl Handler<Msg> for MyActor {
    type Result = u64; // or whatever your message handler returns, no enclosing ResponseActFuture or AtomicFuture needed 
    async fn handle(&mut self, _msg: Msg, _ctx: &mut Context<Self>) -> Self::Result {
        // your handler code, for example
        self.other_actor_addr.send(OtherMsg()).await // yay! we can use await
    }

that's it! Enjoy.

By default, the returned future will be an AtomicFuture, so your actor won't handle any other incoming messages until fully resolves any awaited calls. This is the behavior that mostly respects the Hewitt's original model, letting you abstract the await-ness in your code and use it exactly like a sync version would do. If you rather let your actor process messages in between awaits, you can change it to be a ResponseActFuture by annotating your handler with #[async_handler(non_atomic)] instead.

Known Limitations

Known list of language features that won't be correctly translated, and hopefully workarounds that may exist.

Chained operations on await results

The following code is not translated well (yet)

let result = self.delegate_actor_addr.send(MyMsg).await.or_else(0) + 3

Isolate the awaitable call to its own expression instead

let await_result = self.delegate_actor_addr.send(MyMsg).await;
let result = await_result.or_else(0) + 3

If expressions

Mutating variables inside if expressions

The following code won't work as expected

let mut result = None;

if some_condition {
    let returned_value = self.delegate_actor.send(message).await;
    result = returned_value.ok();
}

println!("{}", result); // Always prints None regardless of some_condition and returned_value

The async_handler macro translates your async code to a "pyramid of doom" in order to correctly move the latest value of your variables.

For example, a code like this

let a = call_a().await;
let b = call_b(a).await;
let c = call_c(b).await;
println!("{}, {}, {}", a, b, c)

becomes (simplified)

wrap_future(call_a())
    .then(move |__res, __self, __ctx| {
        let a = __res;
        wrap_future(call_b(a))    
            .then(move |__res, __self, __ctx| {
                let b = __res;
                wrap_future(call_c(b))
                    .then(move |__res, __self, __ctx| {
                        let c = __res;
                        println!("{}, {}, {}", a, b, c)
                    })
            })
    })

This way the latest lines are the innermost in the then chain, and as such are moving the correct values for the scope variables.

The problem arises when you are using an if condition. Here as we have different branches, then is applied externally.

For the first example, the translated code would look like (again simplified)

    let mut result = None;

    (if some_condition {
        wrap_future(self.delegate_actor.send(message))
            .then(move |__res, __self, __ctx| {
                let returned_value = __res;
                result = returned_value.ok(); // updates the local copy of result, useless
            } 
    } else {
        wrap_future(fut::ready(())) // both if branches need to return a future. 
    }).then(move |__res, __self, __ctx| {
        println!("{}", result);
    })

The then for the lines after the if is put outside the conditional chain, and as such captures the original variable value. Hence, the value stays the original from the point of view of the print.

To overcome this issue, you should make your condition always return what you need to be updated.

In the code above, you should do instead

let mut result = None;

result = if some_condition {
    let returned_value = self.delegate_actor.send(message).await;
    returned_value.ok()
}

println!("{}", result);

If you have multiple variables you wish to update, you could pack them in a tuple

let mut a = 0, mut b = 0, mut c = 0;

(a, b, c) = if some_condition {
    a = call_a().await;
    b = call_b(b).await;
    c = call_c(c).await;
    (a, b, c)
} else {
    (a, b, c) // return the defaults. It is mandatory to have an else
}

Need for explicitly setting a return type for if expressions

This doesn't compile

let result = if some_condition {
    let a = call_a().await
    a.ok()
} else {
    None
}

As the translation code is not smart enough to figure the returned type of a.ok()

instead you should hit the compiler on the type like:

let result: Option<CallAResultType> = if some_condition {
    let a = call_a().await // image return type to be Result<CallAResultType, Err>
    a.ok()
} else {
    None
}

Early returning inside if expressions

This code wouldn't do what you expect

if some_early_exit_condition {
    call_a().await;
    return;
}

call_b(a).await;
...

As the then chain is external to the closure containing the if, it won't avoid the code after the await to be executed.

Write an else block containing the rest instead

if some_early_exit_condition {
    call_a().await;
} else {
    call_b(a).await;
    ... // rest of the code
}

Previous declaration of result variable

This fails to compile with Cannot assign to `a` as it is not declared mutable

let a;

if condition {
    a = call_a().await;
}

Given you cannot really use a for anything outside the then block, simply declare it local. If you want to "return" the result of the await call, refer to Mutating variables inside if expressions

match expressions

awaits inside match expressions are not currently supported. Replace them with chained if let expressions instead like

match action {
    Move(x, y) => call_move_async(x, y).await,
    Talk(msg) => say_async(msg).await,
    _ => println!("unknown action");
}

becomes

if let Move(x, y) = action {
    call_move_async(x, y).await
} else if let Talk(msg) = action {
    say_async(msg).await
} else {
    println!("unknown action");
}

Loops

In the case of loops containing awaits in their blocks

  • for loops are the only ones currently supported.
    • while loops only depending on the actor state in the condition could be easily implemented by take_whileing an infinite stream (wrap_stream(futures::stream::iter(iter::repeat())))
    • while loops depending on scope accumulators (i.e. let mut i = 0; while i < 3 { i += 1}) should require to create a TryActorStream, in particular TryFold; to be able to pass the current accumulator value to the condition expression closure.
  • The iterator used in the for expression will be moved, so you may need to .clone() it if you want to keep a ref of the iterable (for instance when iterating over a field in your actor's state)
  • break and continue are not supported. continue should be easy to implement for the immediate level by replacing it with an early return. break would require to create a TryActorStream, in particular TryFold; same as with while.
  • The for expression can't have an await clause itself. Extract it into a variable first.
  • Currently, you can't iterate a stream, though with some generics magic maybe we could spare expecting an IntoIterator and also accept a Stream directly in the for expression. But probably it is a bad idea to do it inside a message handler anyway. You should use Actor::add_stream instead.

Using variables mutated inside, after the loop

As with conditionals, variables are moved inside the for block. If you need to keep using the updated value for them after the loop, we support an assignment syntax for loops like

let mut i;
i = for other_actor in self.other_actors {
    i += 1;
    other_actor.send(i).await;
}
println!("{}", i)

This is valid rust syntax, but regular for loops always return unit. In this case the macro gets "smart" and given the internal for impl is actually a Fold, the return value would be the one from accumulator; which purposely will be autofilled with whatever variable names you put in the assignment.

If you happen to need multiple you should

let mut i, j;
(i, j) = for other_actor in self.other_actors {
    i += 1;
    j = i + 1;
    other_actor.send(i).await;
}
println!("{}, {}", i, j)