Skip to content

Commit

Permalink
Remove retry from block and add acknowledgement to getting start
Browse files Browse the repository at this point in the history
  • Loading branch information
sheldygg committed Nov 22, 2024
1 parent f269339 commit b2ef81e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 61 deletions.
24 changes: 24 additions & 0 deletions docs/docs/en/getting-started/acknowlegment.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Acknowledgment

Since unexpected errors may occur during message processing, **FastStream** has an `ack_policy` parameter.

`AckPolicy` have 4 variants:

- `ACK` means that the message will be acked anyway.

- `NACK_ON_ERROR` means that the message will be nacked if an error occurs during processing and consumer will receive this message one more time.

- `REJECT_ON_ERROR` means that the message will be rejected if an error occurs during processing and consumer will not receive this message again.

- `DO_NOTHING` in this case *FastStream* will do nothing with the message. You must ack/nack/reject the message manually.


You must provide this parameter when initializing the subscriber.

```python linenums="1" hl_lines="5" title="main.py"
from faststream import AckPolicy
from faststream.nats import NatsBroker

broker = NatsBroker()
@broker.subscriber(ack_policy=AckPolicy.REJECT_ON_ERROR)
```
23 changes: 0 additions & 23 deletions docs/docs/en/nats/jetstream/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,6 @@ In most cases, **FastStream** automatically acknowledges (*acks*) messages on yo

However, there are situations where you might want to use different acknowledgement logic.

## Retries

If you prefer to use a *nack* instead of a *reject* when there's an error in message processing, you can specify the `retry` flag in the `#!python @broker.subscriber(...)` method, which is responsible for error handling logic.

By default, this flag is set to `False`, indicating that if an error occurs during message processing, the message can still be retrieved from the queue:

```python
@broker.subscriber("test", retry=False) # don't handle exceptions
async def base_handler(body: str):
...
```

If this flag is set to `True`, the message will be *nack*ed and placed back in the queue each time an error occurs. In this scenario, the message can be processed by another consumer (if there are several of them) or by the same one:

```python
@broker.subscriber("test", retry=True) # try again indefinitely
async def base_handler(body: str):
...
```

!!! tip
For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}

## Manual Acknowledgement

If you want to acknowledge a message manually, you can get access directly to the message object via the [Context](../../getting-started/context/existed.md){.internal-link} and call the method.
Expand Down
38 changes: 0 additions & 38 deletions docs/docs/en/rabbit/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,6 @@ In most cases, **FastStream** automatically acknowledges (*acks*) messages on yo

However, there are situations where you might want to use a different acknowledgement logic.

## Retries

If you prefer to use a *nack* instead of a *reject* when there's an error in message processing, you can specify the `retry` flag in the `#!python @broker.subscriber(...)` method, which is responsible for error handling logic.

By default, this flag is set to `False`, indicating that if an error occurs during message processing, the message can still be retrieved from the queue:

```python
@broker.subscriber("test", retry=False) # don't handle exceptions
async def base_handler(body: str):
...
```

If this flag is set to `True`, the message will be *nack*ed and placed back in the queue each time an error occurs. In this scenario, the message can be processed by another consumer (if there are several of them) or by the same one:

```python
@broker.subscriber("test", retry=True) # try again indefinitely
async def base_handler(body: str):
...
```

If the `retry` flag is set to an `int`, the message will be placed back in the queue, and the number of retries will be limited to this number:

```python
@broker.subscriber("test", retry=3) # make up to 3 attempts
async def base_handler(body: str):
...
```

!!! tip
**FastStream** identifies the message by its `message_id`. To make this option work, you should manually set this field on the producer side (if your library doesn't set it automatically).

!!! bug
At the moment, attempts are counted only by the current consumer. If the message goes to another consumer, it will have its own counter.
Subsequently, this logic will be reworked.

!!! tip
For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}

## Manual acknowledgement

If you want to acknowledge a message manually, you can get access directly to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.
Expand Down

0 comments on commit b2ef81e

Please sign in to comment.