Skip to content

Commit

Permalink
feat: ack_first added deprecated and docstrings to Ack_Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Rusich90 committed Nov 19, 2024
1 parent d64f9f5 commit ad8d6b4
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 9 deletions.
52 changes: 46 additions & 6 deletions faststream/confluent/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
overload,
)

from typing_extensions import Doc, override
from typing_extensions import Doc, deprecated, override

from faststream._internal.broker.abc_broker import ABCBroker
from faststream._internal.constants import EMPTY
Expand Down Expand Up @@ -161,7 +161,13 @@ def subscriber(
periodically committed in the background.
""",
),
] = True,
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = EMPTY,
auto_commit_interval_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -428,7 +434,13 @@ def subscriber(
periodically committed in the background.
""",
),
] = True,
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = EMPTY,
auto_commit_interval_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -695,7 +707,13 @@ def subscriber(
periodically committed in the background.
""",
),
] = True,
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = EMPTY,
auto_commit_interval_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -965,7 +983,13 @@ def subscriber(
periodically committed in the background.
""",
),
] = True,
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = EMPTY,
auto_commit_interval_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -1130,7 +1154,23 @@ def subscriber(
"SpecificationDefaultSubscriber",
"SpecificationBatchSubscriber",
]:
if ack_policy is AckPolicy.ACK_FIRST:
if (
auto_commit is not EMPTY and auto_commit
and ack_policy is not EMPTY and ack_policy is not ack_policy.ACK_FIRST
) or (
auto_commit is not EMPTY and not auto_commit
and ack_policy is ack_policy.ACK_FIRST
):
msg = "You can't use conflict settings ('auto_commit' and 'ack_policy')"
raise SetupError(msg)

if auto_commit is not EMPTY and auto_commit and ack_policy is EMPTY:
ack_policy = AckPolicy.DO_NOTHING

elif auto_commit is not EMPTY and not auto_commit and ack_policy is EMPTY:
ack_policy = AckPolicy.REJECT_ON_ERROR

elif ack_policy is AckPolicy.ACK_FIRST:
auto_commit = True
ack_policy = AckPolicy.DO_NOTHING

Expand Down
24 changes: 24 additions & 0 deletions faststream/confluent/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1089,6 +1095,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1469,6 +1481,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1862,6 +1880,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down
8 changes: 7 additions & 1 deletion faststream/confluent/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Union,
)

from typing_extensions import Doc
from typing_extensions import Doc, deprecated

from faststream._internal.broker.router import (
ArgsContainer,
Expand Down Expand Up @@ -247,6 +247,12 @@ def __init__(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down
26 changes: 25 additions & 1 deletion faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from aiokafka import ConsumerRecord
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from typing_extensions import Doc, override
from typing_extensions import Doc, deprecated, override

from faststream._internal.broker.abc_broker import ABCBroker
from faststream._internal.constants import EMPTY
Expand Down Expand Up @@ -167,6 +167,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -533,6 +539,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -899,6 +911,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1268,6 +1286,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down
24 changes: 24 additions & 0 deletions faststream/kafka/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1196,6 +1202,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -1682,6 +1694,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down Expand Up @@ -2171,6 +2189,12 @@ def subscriber(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down
8 changes: 7 additions & 1 deletion faststream/kafka/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)

from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from typing_extensions import Doc
from typing_extensions import Doc, deprecated

from faststream._internal.broker.router import (
ArgsContainer,
Expand Down Expand Up @@ -255,6 +255,12 @@ def __init__(
periodically committed in the background.
""",
),
deprecated(
"""
This option is deprecated and will be removed in 0.7.0 release.
Please, use `ack_policy=AckPolicy.ACK_FIRST` instead.
""",
),
] = True,
auto_commit_interval_ms: Annotated[
int,
Expand Down
9 changes: 9 additions & 0 deletions faststream/middlewares/acknowledgement/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@

class AckPolicy(str, Enum):
ACK_FIRST = "ack_first"
"""Ack message on consume"""

ACK = "ack"
"""Ack message after all process"""

REJECT_ON_ERROR = "reject_on_error"
"""Reject message on unhandled exceptions"""

NACK_ON_ERROR = "nack_on_error"
"""Nack message on unhandled exceptions"""

DO_NOTHING = "do_nothing"
"""Not create AcknowledgementMiddleware"""

0 comments on commit ad8d6b4

Please sign in to comment.