Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: init concurrent rpc for rabbit broker #1605

Open
wants to merge 2 commits into
base: feat/rmq-update
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1325,
"line_number": 1376,
"is_secret": false
}
],
Expand Down Expand Up @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-06-10T09:56:52Z"
"generated_at": "2024-07-18T16:41:25Z"
}
2 changes: 1 addition & 1 deletion CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
identity and expression, level of experience, education, socioeconomic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.

Expand Down
31 changes: 30 additions & 1 deletion docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,35 @@ hide:
---

# Release Notes
## 0.5.14

### What's Changed
* Update Release Notes for 0.5.13 by @faststream-release-notes-updater in [#1548](https://github.com/airtai/faststream/pull/1548){.external-link target="_blank"}
* Add allow_auto_create_topics to make automatic topic creation configurable by [@kumaranvpl](https://github.com/kumaranvpl){.external-link target="_blank"} in [#1556](https://github.com/airtai/faststream/pull/1556){.external-link target="_blank"}


**Full Changelog**: [#0.5.13...0.5.14](https://github.com/airtai/faststream/compare/0.5.13...0.5.14){.external-link target="_blank"}

## 0.5.13

### What's Changed

* feat: nats filter JS subscription support by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1519](https://github.com/airtai/faststream/pull/1519){.external-link target="_blank"}
* fix: correct RabbitExchange processing by OTEL in broker.publish case by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1521](https://github.com/airtai/faststream/pull/1521){.external-link target="_blank"}
* fix: correct Nats ObjectStorage get file behavior inside watch subscriber by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1523](https://github.com/airtai/faststream/pull/1523){.external-link target="_blank"}
* Resolve Issue 1386, Add rpc_prefix by [@aKardasz](https://github.com/aKardasz){.external-link target="_blank"} in [#1484](https://github.com/airtai/faststream/pull/1484){.external-link target="_blank"}
* fix: correct spans linking in batches case by [@draincoder](https://github.com/draincoder){.external-link target="_blank"} in [#1532](https://github.com/airtai/faststream/pull/1532){.external-link target="_blank"}
* fix (#1539): correct anyio.create_memory_object_stream annotation by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1541](https://github.com/airtai/faststream/pull/1541){.external-link target="_blank"}
* fix: correct publish_coverage CI by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1536](https://github.com/airtai/faststream/pull/1536){.external-link target="_blank"}
* Add NatsBroker.new_inbox() by [@maxalbert](https://github.com/maxalbert){.external-link target="_blank"} in [#1543](https://github.com/airtai/faststream/pull/1543){.external-link target="_blank"}
* fix (#1544): correct Redis message nack & reject signature by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1546](https://github.com/airtai/faststream/pull/1546){.external-link target="_blank"}

### New Contributors
* [@aKardasz](https://github.com/aKardasz){.external-link target="_blank"} made their first contribution in [#1484](https://github.com/airtai/faststream/pull/1484){.external-link target="_blank"}
* [@maxalbert](https://github.com/maxalbert){.external-link target="_blank"} made their first contribution in [#1543](https://github.com/airtai/faststream/pull/1543){.external-link target="_blank"}

**Full Changelog**: [#0.5.12...0.5.13](https://github.com/airtai/faststream/compare/0.5.12...0.5.13){.external-link target="_blank"}

## 0.5.12

### What's Changed
Expand Down Expand Up @@ -324,7 +353,7 @@ You can find more information about it in the official [**aiokafka** doc](https:

`pattern` option was added too, but it is still experimental and does not support `Path`

3. [`Path`](https://faststream.airt.ai/latest/nats/message/#subject-pattern-access) feature performance was increased. Also, `Path` is suitable for NATS `PullSub` batch subscribtion as well now.
3. [`Path`](https://faststream.airt.ai/latest/nats/message/#subject-pattern-access) feature performance was increased. Also, `Path` is suitable for NATS `PullSub` batch subscription as well now.

```python
from faststream import NatsBroker, PullSub
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
max_channel_pool_size: Annotated[
int,
Doc("Max channel pool size"),
] = 1,
] = 2, # NOTE: because we're sharing channels between consumers and producers
) -> None:
security_args = parse_security(security)

Expand Down
4 changes: 2 additions & 2 deletions faststream/rabbit/broker/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
publisher_confirms: bool,
on_return_raises: bool,
) -> None:
self._connection_pool: "Pool[RobustConnection]" = Pool(
self._connection_pool: Pool[RobustConnection] = Pool(
lambda: connect_robust(
url=url,
timeout=timeout,
Expand All @@ -36,7 +36,7 @@ def __init__(
max_size=connection_pool_size,
)

self._channel_pool: "Pool[RobustChannel]" = Pool(
self._channel_pool: Pool[RobustChannel] = Pool(
lambda: self._get_channel(
channel_number=channel_number,
publisher_confirms=publisher_confirms,
Expand Down
48 changes: 40 additions & 8 deletions faststream/rabbit/helpers/declarer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING, Dict, Optional, cast
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Tuple, cast

if TYPE_CHECKING:
import aio_pika
Expand All @@ -12,8 +12,13 @@ class RabbitDeclarer:
"""An utility class to declare RabbitMQ queues and exchanges."""

__connection_manager: "ConnectionManager"
__queues: Dict["RabbitQueue", "aio_pika.RobustQueue"]
__exchanges: Dict["RabbitExchange", "aio_pika.RobustExchange"]
__queues: Dict[
Tuple[Optional["aio_pika.RobustChannel"], "RabbitQueue"], "aio_pika.RobustQueue"
]
__exchanges: Dict[
Tuple[Optional["aio_pika.RobustChannel"], "RabbitExchange"],
"aio_pika.RobustExchange",
]

def __init__(self, connection_manager: "ConnectionManager") -> None:
self.__connection_manager = connection_manager
Expand All @@ -28,14 +33,18 @@ async def declare_queue(
channel: Optional["aio_pika.RobustChannel"] = None,
) -> "aio_pika.RobustQueue":
"""Declare a queue."""
if (queue_obj := self.__queues.get(queue)) is None:
# NOTE: It would return the queue linked to another channel if it was already declared
# unless the channel is part of the key
if (queue_obj := self.__queues.get((channel, queue))) is None:
async with AsyncExitStack() as stack:
if channel is None:
channel = await stack.enter_async_context(
self.__connection_manager.acquire_channel()
)
if (channel, queue) in self.__queues:
return self.__queues[(channel, queue)]

self.__queues[queue] = queue_obj = cast(
self.__queues[(channel, queue)] = queue_obj = cast(
"aio_pika.RobustQueue",
await channel.declare_queue(
name=queue.name,
Expand All @@ -59,20 +68,24 @@ async def declare_exchange(
channel: Optional["aio_pika.RobustChannel"] = None,
) -> "aio_pika.RobustExchange":
"""Declare an exchange, parent exchanges and bind them each other."""
if exch := self.__exchanges.get(exchange):
# NOTE: It would return the queue linked to another channel if it was already declared
# unless the channel is part of the key
if exch := self.__exchanges.get((channel, exchange)):
return exch

async with AsyncExitStack() as stack:
if channel is None:
channel = await stack.enter_async_context(
self.__connection_manager.acquire_channel()
)
if (channel, exchange) in self.__exchanges:
return self.__exchanges[(channel, exchange)]

if not exchange.name:
return channel.default_exchange

else:
self.__exchanges[exchange] = exch = cast(
self.__exchanges[(channel, exchange)] = exch = cast(
"aio_pika.RobustExchange",
await channel.declare_exchange(
name=exchange.name,
Expand Down Expand Up @@ -102,3 +115,22 @@ async def declare_exchange(
)

return exch # type: ignore[return-value]

@asynccontextmanager
async def declare_queue_scope(
self,
queue: "RabbitQueue",
passive: bool = False,
*,
channel: Optional["aio_pika.RobustChannel"] = None,
) -> AsyncGenerator["aio_pika.RobustQueue", None]:
"""Declare a queue and return it with a context manager."""
async with AsyncExitStack() as stack:
if channel is None:
channel = await stack.enter_async_context(
self.__connection_manager.acquire_channel()
)

yield await self.declare_queue(
queue=queue, passive=passive, channel=channel
)
104 changes: 58 additions & 46 deletions faststream/rabbit/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from contextlib import asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
AsyncGenerator,
Optional,
Type,
Tuple,
Union,
cast,
)
Expand All @@ -17,13 +19,15 @@
from faststream.exceptions import WRONG_PUBLISH_ARGS
from faststream.rabbit.parser import AioPikaParser
from faststream.rabbit.schemas import RABBIT_REPLY, RabbitExchange
from faststream.utils.functions import fake_context, timeout_scope
from faststream.utils.classes import Singleton
from faststream.utils.functions import (
fake_context_yielding,
timeout_scope,
)

if TYPE_CHECKING:
from types import TracebackType

import aiormq
from aio_pika import IncomingMessage, RobustChannel, RobustQueue
from aio_pika import IncomingMessage, RobustChannel
from aio_pika.abc import DateType, HeadersType, TimeoutType
from anyio.streams.memory import MemoryObjectReceiveStream

Expand All @@ -50,8 +54,7 @@ def __init__(
decoder: Optional["CustomCallable"],
) -> None:
self.declarer = declarer

self._rpc_lock = anyio.Lock()
self.rpc_manager = _RPCManager(declarer=declarer)

default_parser = AioPikaParser()
self._parser = resolve_custom_func(parser, default_parser.parse_message)
Expand Down Expand Up @@ -86,24 +89,25 @@ async def publish( # type: ignore[override]
) -> Optional[Any]:
"""Publish a message to a RabbitMQ queue."""
context: AsyncContextManager[
Optional[MemoryObjectReceiveStream[IncomingMessage]]
Union[
Tuple[MemoryObjectReceiveStream[IncomingMessage], RobustChannel],
Tuple[None, None],
]
]
channel: Optional["RobustChannel"]
channel: Optional[RobustChannel]
response_queue: Optional[MemoryObjectReceiveStream[IncomingMessage]]

if rpc:
if reply_to is not None:
raise WRONG_PUBLISH_ARGS

rmq_queue = await self.declarer.declare_queue(RABBIT_REPLY)
channel = cast("RobustChannel", rmq_queue.channel)
context = _RPCCallback(self._rpc_lock, rmq_queue)
context = self.rpc_manager()
reply_to = RABBIT_REPLY.name

else:
channel = None
context = fake_context()
context = fake_context_yielding(with_yield=(None, None))

async with context as response_queue:
async with context as (response_queue, channel):
r = await self._publish(
message=message,
exchange=exchange,
Expand Down Expand Up @@ -197,37 +201,45 @@ async def _publish(
)


class _RPCCallback:
"""A class provides an RPC lock."""

def __init__(self, lock: "anyio.Lock", callback_queue: "RobustQueue") -> None:
self.lock = lock
self.queue = callback_queue
class _RPCManager(Singleton):
"""A class that provides an RPC lock."""

async def __aenter__(self) -> "MemoryObjectReceiveStream[IncomingMessage]":
(
send_response_stream,
receive_response_stream,
) = anyio.create_memory_object_stream[AbstractIncomingMessage](
max_buffer_size=1
)
await self.lock.acquire()

self.consumer_tag = await self.queue.consume(
callback=send_response_stream.send,
no_ack=True,
)

return cast(
"MemoryObjectReceiveStream[IncomingMessage]",
receive_response_stream,
)
def __init__(self, declarer: "RabbitDeclarer") -> None:
self.declarer = declarer

async def __aexit__(
@asynccontextmanager
async def __call__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional["TracebackType"] = None,
) -> None:
self.lock.release()
await self.queue.cancel(self.consumer_tag)
) -> AsyncGenerator[
Tuple[
"MemoryObjectReceiveStream[IncomingMessage]",
"RobustChannel",
],
None,
]:
# NOTE: this allows us to make sure the channel is only used by a single
# RPC call at a time, however, if the channel pool is used for both consuming
# and producing, they will be blocked by each other
async with self.declarer.declare_queue_scope(RABBIT_REPLY) as queue:
consumer_tag = None
try:
(
send_response_stream,
receive_response_stream,
) = anyio.create_memory_object_stream[AbstractIncomingMessage](
max_buffer_size=1
)
consumer_tag = await queue.consume(
callback=send_response_stream.send, # type: ignore[arg-type]
no_ack=True,
)
yield (
cast(
"MemoryObjectReceiveStream[IncomingMessage]",
receive_response_stream,
),
cast("RobustChannel", queue.channel),
)
finally:
if consumer_tag is not None:
await queue.cancel(consumer_tag) # type: ignore[index]
9 changes: 9 additions & 0 deletions faststream/utils/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ async def fake_context(*args: Any, **kwargs: Any) -> AsyncIterator[None]:
yield None


@asynccontextmanager
async def fake_context_yielding(
*args: Any,
with_yield: F_Return = None, # type: ignore[assignment]
**kwargs: Any,
) -> AsyncIterator[F_Return]:
yield with_yield


@contextmanager
def sync_fake_context(*args: Any, **kwargs: Any) -> Iterator[None]:
yield None
Expand Down
Loading