Commit f269339
chore: merge main
Lancetnik committed Nov 22, 2024
2 parents 7bd03af + 9bc7a05
Showing 36 changed files with 411 additions and 1,397 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/pr_tests.yaml
@@ -30,9 +30,7 @@ jobs:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v5
         with:
-          python-version: |
-            3.9
-            3.10
+          python-version: "3.12"
       - name: Set $PY environment variable
         run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
       - uses: actions/cache@v4

1,246 changes: 0 additions & 1,246 deletions docs/docs/SUMMARY.md

This file was deleted.

@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber

@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.kafka.subscriber.usecase.ConcurrentDefaultSubscriber

4 changes: 2 additions & 2 deletions faststream/_internal/fastapi/get_dependant.py
@@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
                     lambda x: isinstance(x, FieldInfo),
                     p.field_info.metadata or (),
                 ),
-                Field(**field_data),  # type: ignore[pydantic-field]
+                Field(**field_data),
             )

         else:
@@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
                     "le": info.field_info.le,
                 },
             )
-            f = Field(**field_data)  # type: ignore[pydantic-field]
+            f = Field(**field_data)

             params_unique[p.name] = (
                 info.annotation,

8 changes: 4 additions & 4 deletions faststream/_internal/subscriber/mixins.py
@@ -12,8 +12,8 @@


 class TasksMixin(SubscriberUsecase[Any]):
-    def __init__(self, **kwargs: Any) -> None:
-        super().__init__(**kwargs)
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
         self.tasks: list[asyncio.Task[Any]] = []

     def add_task(self, coro: Coroutine[Any, Any, Any]) -> None:
@@ -36,7 +36,7 @@ class ConcurrentMixin(TasksMixin):

     def __init__(
         self,
-        *,
+        *args: Any,
         max_workers: int,
         **kwargs: Any,
     ) -> None:
@@ -47,7 +47,7 @@ def __init__(
         )
         self.limiter = anyio.Semaphore(max_workers)

-        super().__init__(**kwargs)
+        super().__init__(*args, **kwargs)

     def start_consume_task(self) -> None:
         self.add_task(self._serve_consume_queue())

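The mixin change above swaps keyword-only forwarding for full `*args, **kwargs` forwarding. A minimal, self-contained sketch (with stand-in classes, not FastStream's real ones) of why cooperative `__init__` chains need this:

```python
import asyncio
from typing import Any


class Base:
    def __init__(self, name: str) -> None:
        self.name = name


class TasksMixin(Base):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)  # pass everything up the MRO untouched
        self.tasks: list[asyncio.Task[Any]] = []


class ConcurrentMixin(TasksMixin):
    def __init__(self, *args: Any, max_workers: int, **kwargs: Any) -> None:
        self.limit = max_workers  # consume only the argument this class owns
        super().__init__(*args, **kwargs)


c = ConcurrentMixin("demo", max_workers=4)  # positional "demo" reaches Base
print(c.name, c.limit, c.tasks)  # demo 4 []
```

With the old `*,` signature, the positional argument could not pass through the chain and `ConcurrentMixin("demo", max_workers=4)` would raise `TypeError`.
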
2 changes: 0 additions & 2 deletions faststream/_internal/subscriber/proto.py
@@ -20,7 +20,6 @@
     from faststream._internal.types import (
         BrokerMiddleware,
         CustomCallable,
-        Filter,
         SubscriberMiddleware,
     )
     from faststream.message import StreamMessage
@@ -92,7 +91,6 @@ async def get_one(
     def add_call(
         self,
         *,
-        filter_: "Filter[Any]",
         parser_: "CustomCallable",
         decoder_: "CustomCallable",
         middlewares_: Iterable["SubscriberMiddleware[Any]"],

22 changes: 1 addition & 21 deletions faststream/_internal/subscriber/utils.py
@@ -13,7 +13,7 @@
 )

 import anyio
-from typing_extensions import Literal, Self, overload
+from typing_extensions import Self

 from faststream._internal.types import MsgType
 from faststream._internal.utils.functions import return_input, to_async
@@ -31,26 +31,6 @@
     from faststream.middlewares import BaseMiddleware


-@overload
-async def process_msg(
-    msg: Literal[None],
-    middlewares: Iterable["BaseMiddleware"],
-    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
-    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
-    source_type: SourceType = SourceType.CONSUME,
-) -> None: ...
-
-
-@overload
-async def process_msg(
-    msg: MsgType,
-    middlewares: Iterable["BaseMiddleware"],
-    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
-    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
-    source_type: SourceType = SourceType.CONSUME,
-) -> "StreamMessage[MsgType]": ...
-
-
 async def process_msg(
     msg: Optional[MsgType],
     middlewares: Iterable["BaseMiddleware"],

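The two deleted stubs used `@overload` to tell the type checker that `process_msg` returns `None` for a `None` message and a `StreamMessage` otherwise. A minimal sketch of that narrowing pattern (generic names, not the FastStream signature):

```python
from typing import Optional, overload


@overload
def parse(raw: None) -> None: ...
@overload
def parse(raw: bytes) -> str: ...


def parse(raw: Optional[bytes]) -> Optional[str]:
    # Single runtime implementation; the overloads above exist only for
    # the type checker and are erased at runtime.
    if raw is None:
        return None
    return raw.decode()


assert parse(None) is None   # checker infers None here
assert parse(b"hi") == "hi"  # ...and str here, with no Optional to unwrap
```

Dropping the stubs trades that caller-side narrowing for a plain `Optional` return type.
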
2 changes: 1 addition & 1 deletion faststream/confluent/broker/broker.py
@@ -58,7 +58,7 @@
 Partition = TypeVar("Partition")


-class KafkaBroker(
+class KafkaBroker(  # type: ignore[misc]
     KafkaRegistrator,
     BrokerUsecase[
         Union[

94 changes: 47 additions & 47 deletions faststream/confluent/broker/registrator.py
@@ -50,10 +50,10 @@ class KafkaRegistrator(
 ):
     """Includable to KafkaBroker router."""

-    _subscribers: list[
+    _subscribers: list[  # type: ignore[assignment]
         Union["SpecificationBatchSubscriber", "SpecificationDefaultSubscriber"]
     ]
-    _publishers: list[
+    _publishers: list[  # type: ignore[assignment]
         Union["SpecificationBatchPublisher", "SpecificationDefaultPublisher"]
     ]

@@ -1154,52 +1154,50 @@ def subscriber(
             msg = "You should install `group_id` with manual commit mode"
             raise SetupError(msg)

-        subscriber = super().subscriber(
-            create_subscriber(
-                *topics,
-                polling_interval=polling_interval,
-                partitions=partitions,
-                batch=batch,
-                max_records=max_records,
-                group_id=group_id,
-                connection_data={
-                    "group_instance_id": group_instance_id,
-                    "fetch_max_wait_ms": fetch_max_wait_ms,
-                    "fetch_max_bytes": fetch_max_bytes,
-                    "fetch_min_bytes": fetch_min_bytes,
-                    "max_partition_fetch_bytes": max_partition_fetch_bytes,
-                    "auto_offset_reset": auto_offset_reset,
-                    "enable_auto_commit": auto_commit,
-                    "auto_commit_interval_ms": auto_commit_interval_ms,
-                    "check_crcs": check_crcs,
-                    "partition_assignment_strategy": partition_assignment_strategy,
-                    "max_poll_interval_ms": max_poll_interval_ms,
-                    "session_timeout_ms": session_timeout_ms,
-                    "heartbeat_interval_ms": heartbeat_interval_ms,
-                    "isolation_level": isolation_level,
-                },
-                is_manual=not auto_commit,
-                # subscriber args
-                ack_policy=ack_policy,
-                no_ack=no_ack,
-                no_reply=no_reply,
-                broker_middlewares=self.middlewares,
-                broker_dependencies=self._dependencies,
-                # Specification
-                title_=title,
-                description_=description,
-                include_in_schema=self._solve_include_in_schema(include_in_schema),
-            ),
+        subscriber = create_subscriber(
+            *topics,
+            polling_interval=polling_interval,
+            partitions=partitions,
+            batch=batch,
+            max_records=max_records,
+            group_id=group_id,
+            connection_data={
+                "group_instance_id": group_instance_id,
+                "fetch_max_wait_ms": fetch_max_wait_ms,
+                "fetch_max_bytes": fetch_max_bytes,
+                "fetch_min_bytes": fetch_min_bytes,
+                "max_partition_fetch_bytes": max_partition_fetch_bytes,
+                "auto_offset_reset": auto_offset_reset,
+                "enable_auto_commit": auto_commit,
+                "auto_commit_interval_ms": auto_commit_interval_ms,
+                "check_crcs": check_crcs,
+                "partition_assignment_strategy": partition_assignment_strategy,
+                "max_poll_interval_ms": max_poll_interval_ms,
+                "session_timeout_ms": session_timeout_ms,
+                "heartbeat_interval_ms": heartbeat_interval_ms,
+                "isolation_level": isolation_level,
+            },
+            is_manual=not auto_commit,
+            # subscriber args
+            ack_policy=ack_policy,
+            no_ack=no_ack,
+            no_reply=no_reply,
+            broker_middlewares=self.middlewares,
+            broker_dependencies=self._dependencies,
+            # Specification
+            title_=title,
+            description_=description,
+            include_in_schema=self._solve_include_in_schema(include_in_schema),
         )

         if batch:
-            return cast("SpecificationBatchSubscriber", subscriber).add_call(
-                parser_=parser or self._parser,
-                decoder_=decoder or self._decoder,
-                dependencies_=dependencies,
-                middlewares_=middlewares,
-            )
-        return cast("SpecificationDefaultSubscriber", subscriber).add_call(
+            subscriber = cast("SpecificationBatchSubscriber", subscriber)
+        else:
+            subscriber = cast("SpecificationDefaultSubscriber", subscriber)
+
+        subscriber = super().subscriber(subscriber)  # type: ignore[arg-type,assignment]
+
+        return subscriber.add_call(
             parser_=parser or self._parser,
             decoder_=decoder or self._decoder,
             dependencies_=dependencies,
@@ -1535,6 +1533,8 @@ def publisher(
         )

         if batch:
-            return cast("SpecificationBatchPublisher", super().publisher(publisher))
+            publisher = cast("SpecificationBatchPublisher", publisher)
+        else:
+            publisher = cast("SpecificationDefaultPublisher", publisher)

-        return cast("SpecificationDefaultPublisher", super().publisher(publisher))
+        return super().publisher(publisher)

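The rewritten `subscriber()` and `publisher()` follow the same shape: build the object, narrow it with `cast()` based on the `batch` flag, register it once through `super()`, then chain the call setup, instead of duplicating the tail in both branches. A stripped-down sketch of that flow with hypothetical stand-in classes:

```python
from typing import Union, cast


class DefaultSub:
    def add_call(self) -> "DefaultSub":
        return self


class BatchSub(DefaultSub):
    pass


def create_sub(batch: bool) -> Union[DefaultSub, BatchSub]:
    return BatchSub() if batch else DefaultSub()


def register(sub: DefaultSub) -> DefaultSub:
    return sub  # stand-in for super().subscriber(...)


def subscriber(batch: bool) -> DefaultSub:
    sub = create_sub(batch)
    # cast() only changes the static type; there is no runtime check,
    # which is why the real code pairs registration with a `# type: ignore`.
    sub = cast(BatchSub, sub) if batch else cast(DefaultSub, sub)
    return register(sub).add_call()  # register once, configure once


print(type(subscriber(batch=True)).__name__)   # BatchSub
print(type(subscriber(batch=False)).__name__)  # DefaultSub
```
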
2 changes: 1 addition & 1 deletion faststream/confluent/client.py
@@ -383,7 +383,7 @@ async def getmany(
     ) -> tuple[Message, ...]:
         """Consumes a batch of messages from Kafka and groups them by topic and partition."""
         raw_messages: list[Optional[Message]] = await call_or_await(
-            self.consumer.consume,
+            self.consumer.consume,  # type: ignore[arg-type]
             num_messages=max_records or 10,
             timeout=timeout,
         )

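`call_or_await` lets the client run the blocking `Consumer.consume` call without stalling the event loop; the new `# type: ignore[arg-type]` only silences a signature mismatch the checker reports for the callable argument. A rough sketch of what such a helper does (an assumption about its behavior, not FastStream's actual implementation):

```python
import functools
import inspect
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar, Union

import anyio.to_thread

T = TypeVar("T")


async def call_or_await(
    func: Union[Callable[..., T], Callable[..., Awaitable[T]]],
    *args: Any,
    **kwargs: Any,
) -> T:
    # Await async callables directly; run sync ones in a worker thread so a
    # blocking call such as Consumer.consume cannot block the event loop.
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    return await anyio.to_thread.run_sync(functools.partial(func, *args, **kwargs))


# e.g. raw = await call_or_await(consumer.consume, num_messages=10, timeout=0.1)
```
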
2 changes: 1 addition & 1 deletion faststream/confluent/fastapi/fastapi.py
@@ -561,7 +561,7 @@ def __init__(
             graceful_timeout=graceful_timeout,
             decoder=decoder,
             parser=parser,
-            middlewares=middlewares,
+            middlewares=middlewares,  # type: ignore[arg-type]
             schema_url=schema_url,
             setup_state=setup_state,
             # logger options

21 changes: 15 additions & 6 deletions faststream/confluent/publisher/factory.py
@@ -5,6 +5,7 @@
     Literal,
     Optional,
     Union,
+    cast,
     overload,
 )

@@ -68,8 +69,9 @@ def create_publisher(
     headers: Optional[dict[str, str]],
     reply_to: str,
     # Publisher args
-    broker_middlewares: Iterable[
-        "BrokerMiddleware[Union[tuple[ConfluentMsg, ...], ConfluentMsg]]"
+    broker_middlewares: Union[
+        Iterable["BrokerMiddleware[ConfluentMsg]"],
+        Iterable["BrokerMiddleware[tuple[ConfluentMsg, ...]]"],
     ],
     middlewares: Iterable["PublisherMiddleware"],
     # Specification args
@@ -92,8 +94,9 @@ def create_publisher(
     headers: Optional[dict[str, str]],
     reply_to: str,
     # Publisher args
-    broker_middlewares: Iterable[
-        "BrokerMiddleware[Union[tuple[ConfluentMsg, ...], ConfluentMsg]]"
+    broker_middlewares: Union[
+        Iterable["BrokerMiddleware[ConfluentMsg]"],
+        Iterable["BrokerMiddleware[tuple[ConfluentMsg, ...]]"],
     ],
     middlewares: Iterable["PublisherMiddleware"],
     # Specification args
@@ -115,7 +118,10 @@ def create_publisher(
             partition=partition,
             headers=headers,
             reply_to=reply_to,
-            broker_middlewares=broker_middlewares,
+            broker_middlewares=cast(
+                Iterable["BrokerMiddleware[tuple[ConfluentMsg, ...]]"],
+                broker_middlewares,
+            ),
             middlewares=middlewares,
             schema_=schema_,
             title_=title_,
@@ -130,7 +136,10 @@ def create_publisher(
         partition=partition,
         headers=headers,
         reply_to=reply_to,
-        broker_middlewares=broker_middlewares,
+        broker_middlewares=cast(
+            Iterable["BrokerMiddleware[ConfluentMsg]"],
+            broker_middlewares,
+        ),
         middlewares=middlewares,
         schema_=schema_,
         title_=title_,

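Replacing `Iterable["BrokerMiddleware[Union[...]]"]` with a `Union` of two `Iterable`s is about invariance: a generic parameterized with one member of a union is not a subtype of the same generic parameterized with the whole union, so each branch has to `cast()` to the variant it actually constructs. A compact sketch of the pattern (stand-in `Middleware` class, not the real one):

```python
from collections.abc import Iterable
from typing import Generic, TypeVar, Union, cast

T = TypeVar("T")


class Middleware(Generic[T]):
    """Stand-in for BrokerMiddleware[T]; invariant in T."""


def create_publisher(
    batch: bool,
    middlewares: Union[
        Iterable[Middleware[bytes]],
        Iterable[Middleware[tuple[bytes, ...]]],
    ],
) -> str:
    if batch:
        # Narrow to the batch variant; cast() costs nothing at runtime.
        mws = cast(Iterable[Middleware[tuple[bytes, ...]]], middlewares)
        return f"batch publisher, {len(list(mws))} middleware(s)"
    mws = cast(Iterable[Middleware[bytes]], middlewares)
    return f"default publisher, {len(list(mws))} middleware(s)"


print(create_publisher(True, [Middleware()]))
print(create_publisher(False, [Middleware()]))
```
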
2 changes: 1 addition & 1 deletion faststream/confluent/router.py
@@ -507,7 +507,7 @@ def __init__(
             # basic args
             prefix=prefix,
             dependencies=dependencies,
-            middlewares=middlewares,
+            middlewares=middlewares,  # type: ignore[arg-type]
             parser=parser,
             decoder=decoder,
             include_in_schema=include_in_schema,