Skip to content

Commit

Permalink
Add more warning's to nats subscription factory (#1907)
Browse files Browse the repository at this point in the history
* Add dynaconf example for nats

* Update index

* :)

* Add type for new_value and uncomment

* Update dynaconf.md

* docs: polish markup

* Proofread howto docs page

* Add more warning's to nats subscription factory

* docs: generate API References

* Add `validate_input_for_warnings` function

* docs: generate API References

* refactor: merge options to config object

* refactor: use correct defaults

* Moreeeee warnings

* Show warnings for core subscription without category

* Upd warning text

* chore: correct text

* Pre-final variant

* Add warnings for nats core subscriber

* docs: generate API References

* Add end-line

* Add return type

* refactor: polish NATS warnings

* refactor: polish conditions

* Proofread warning messages

* Add missing full stop

* Fix linting issues

---------

Co-authored-by: Nikita Pastukhov <diementros@yandex.ru>
Co-authored-by: Kumaran Rajendhiran <kumaran@airt.ai>
Co-authored-by: sheldygg <sheldygg@users.noreply.github.com>
Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
  • Loading branch information
5 people authored Nov 11, 2024
1 parent 0d4c237 commit bd188c7
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 30 deletions.
4 changes: 2 additions & 2 deletions faststream/nats/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def subscriber( # type: ignore[override]
Doc("Enable Heartbeats for a consumer to detect failures."),
] = None,
flow_control: Annotated[
bool,
Optional[bool],
Doc("Enable Flow Control for a consumer."),
] = False,
] = None,
deliver_policy: Annotated[
Optional["api.DeliverPolicy"],
Doc("Deliver Policy to be used for subscription."),
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,9 @@ def subscriber( # type: ignore[override]
Doc("Enable Heartbeats for a consumer to detect failures."),
] = None,
flow_control: Annotated[
bool,
Optional[bool],
Doc("Enable Flow Control for a consumer."),
] = False,
] = None,
deliver_policy: Annotated[
Optional["api.DeliverPolicy"],
Doc("Deliver Policy to be used for subscription."),
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ def __init__(
Doc("Enable Heartbeats for a consumer to detect failures."),
] = None,
flow_control: Annotated[
bool,
Optional[bool],
Doc("Enable Flow Control for a consumer."),
] = False,
] = None,
deliver_policy: Annotated[
Optional["api.DeliverPolicy"],
Doc("Deliver Policy to be used for subscription."),
Expand Down
237 changes: 213 additions & 24 deletions faststream/nats/subscriber/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
DEFAULT_SUB_PENDING_BYTES_LIMIT,
DEFAULT_SUB_PENDING_MSGS_LIMIT,
)
from nats.js.api import ConsumerConfig
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import (
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
Expand Down Expand Up @@ -46,7 +46,7 @@ def create_subscriber(
config: Optional["api.ConsumerConfig"],
ordered_consumer: bool,
idle_heartbeat: Optional[float],
flow_control: bool,
flow_control: Optional[bool],
deliver_policy: Optional["api.DeliverPolicy"],
headers_only: Optional[bool],
# pull args
Expand Down Expand Up @@ -79,18 +79,39 @@ def create_subscriber(
"AsyncAPIKeyValueWatchSubscriber",
"AsyncAPIObjStoreWatchSubscriber",
]:
if pull_sub is not None and stream is None:
raise SetupError("Pull subscriber can be used only with a stream")

if not subject and not config:
raise SetupError("You must provide either `subject` or `config` option.")
_validate_input_for_misconfigure(
subject=subject,
queue=queue,
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit,
max_msgs=max_msgs,
durable=durable,
config=config,
ordered_consumer=ordered_consumer,
idle_heartbeat=idle_heartbeat,
flow_control=flow_control,
deliver_policy=deliver_policy,
headers_only=headers_only,
pull_sub=pull_sub,
kv_watch=kv_watch,
obj_watch=obj_watch,
ack_first=ack_first,
max_workers=max_workers,
stream=stream,
)

config = config or ConsumerConfig(filter_subjects=[])
if config.durable_name is None:
config.durable_name = durable
if config.idle_heartbeat is None:
config.idle_heartbeat = idle_heartbeat
if config.headers_only is None:
config.headers_only = headers_only
if config.deliver_policy is DeliverPolicy.ALL:
config.deliver_policy = deliver_policy or DeliverPolicy.ALL

if stream:
# TODO: pull & queue warning
# TODO: push & durable warning

# Both JS Subscribers
extra_options: AnyDict = {
"pending_msgs_limit": pending_msgs_limit
or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
Expand All @@ -101,9 +122,11 @@ def create_subscriber(
}

if pull_sub is not None:
# JS Pull Subscriber
extra_options.update({"inbox_prefix": inbox_prefix})

else:
# JS Push Subscriber
extra_options.update(
{
"ordered_consumer": ordered_consumer,
Expand All @@ -116,6 +139,7 @@ def create_subscriber(
)

else:
# Core Subscriber
extra_options = {
"pending_msgs_limit": pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT,
"pending_bytes_limit": pending_bytes_limit
Expand All @@ -124,13 +148,6 @@ def create_subscriber(
}

if obj_watch is not None:
if max_workers > 1:
warnings.warn(
"`max_workers` has no effect for ObjectValue subscriber.",
RuntimeWarning,
stacklevel=3,
)

return AsyncAPIObjStoreWatchSubscriber(
subject=subject,
config=config,
Expand All @@ -143,13 +160,6 @@ def create_subscriber(
)

if kv_watch is not None:
if max_workers > 1:
warnings.warn(
"`max_workers` has no effect for KeyValue subscriber.",
RuntimeWarning,
stacklevel=3,
)

return AsyncAPIKeyValueWatchSubscriber(
subject=subject,
config=config,
Expand Down Expand Up @@ -306,3 +316,182 @@ def create_subscriber(
description_=description_,
include_in_schema=include_in_schema,
)


def _validate_input_for_misconfigure(
subject: str,
queue: str, # default ""
pending_msgs_limit: Optional[int],
pending_bytes_limit: Optional[int],
max_msgs: int, # default 0
durable: Optional[str],
config: Optional["api.ConsumerConfig"],
ordered_consumer: bool, # default False
idle_heartbeat: Optional[float],
flow_control: Optional[bool],
deliver_policy: Optional["api.DeliverPolicy"],
headers_only: Optional[bool],
pull_sub: Optional["PullSub"],
kv_watch: Optional["KvWatch"],
obj_watch: Optional["ObjWatch"],
ack_first: bool, # default False
max_workers: int, # default 1
stream: Optional["JStream"],
) -> None:
if not subject and not config:
raise SetupError("You must provide either the `subject` or `config` option.")

if stream and kv_watch:
raise SetupError(
"You can't use both the `stream` and `kv_watch` options simultaneously."
)

if stream and obj_watch:
raise SetupError(
"You can't use both the `stream` and `obj_watch` options simultaneously."
)

if kv_watch and obj_watch:
raise SetupError(
"You can't use both the `kv_watch` and `obj_watch` options simultaneously."
)

if pull_sub and not stream:
raise SetupError(
"The pull subscriber can only be used with the `stream` option."
)

if max_msgs > 0 and any((stream, kv_watch, obj_watch)):
warnings.warn(
"The `max_msgs` option can be used only with a NATS Core Subscriber.",
RuntimeWarning,
stacklevel=4,
)

if not stream:
if obj_watch or kv_watch:
# Obj/Kv Subscriber
if pending_msgs_limit is not None:
warnings.warn(
message="The `pending_msgs_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if pending_bytes_limit is not None:
warnings.warn(
message="The `pending_bytes_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if queue:
warnings.warn(
message="The `queue` option can be used only with JetStream Push or Core Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if max_workers > 1:
warnings.warn(
message="The `max_workers` option can be used only with JetStream (Pull/Push) or Core Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

# Core/Obj/Kv Subscriber
if durable:
warnings.warn(
message="The `durable` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if config is not None:
warnings.warn(
message="The `config` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer:
warnings.warn(
message="The `ordered_consumer` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if idle_heartbeat is not None:
warnings.warn(
message="The `idle_heartbeat` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if flow_control:
warnings.warn(
message="The `flow_control` option can be used only with JetStream Push Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if deliver_policy:
warnings.warn(
message="The `deliver_policy` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if headers_only:
warnings.warn(
message="The `headers_only` option can be used only with JetStream (Pull/Push) Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if ack_first:
warnings.warn(
message="The `ack_first` option can be used only with JetStream Push Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

else:
# JetStream Subscribers
if pull_sub:
if queue:
warnings.warn(
message="The `queue` option has no effect with JetStream Pull Subscription. You probably wanted to use the `durable` option instead.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer:
warnings.warn(
"The `ordered_consumer` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
RuntimeWarning,
stacklevel=4,
)

if ack_first:
warnings.warn(
message="The `ack_first` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

if flow_control:
warnings.warn(
message="The `flow_control` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
category=RuntimeWarning,
stacklevel=4,
)

else:
# JS PushSub
if durable is not None:
warnings.warn(
message="The JetStream Push consumer with the `durable` option can't be scaled horizontally across multiple instances. You probably wanted to use the `queue` option instead. Also, we strongly recommend using the Jetstream PullSubsriber with the `durable` option as the default.",
category=RuntimeWarning,
stacklevel=4,
)

0 comments on commit bd188c7

Please sign in to comment.