Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: #1874 support workers for ASGI FastStream #1936

Merged
merged 5 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ search:
- [get_log_level](api/faststream/cli/utils/logs/get_log_level.md)
- [set_log_level](api/faststream/cli/utils/logs/set_log_level.md)
- parser
- [is_bind_arg](api/faststream/cli/utils/parser/is_bind_arg.md)
- [parse_cli_args](api/faststream/cli/utils/parser/parse_cli_args.md)
- [remove_prefix](api/faststream/cli/utils/parser/remove_prefix.md)
- confluent
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.utils.parser.is_bind_arg
32 changes: 32 additions & 0 deletions docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ uvicorn main:app

It does nothing but launch the app itself as an **ASGI lifespan**.

!!! note
If you want to run your app using several workers, you need to use something else than `uvicorn`.
```shell
faststream run main:app --workers 4
```
```shell
gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4
```
```shell
granian --interface asgi main:app --workers 4
```
```shell
hypercorn main:app --workers 4
```


### ASGI Routes

It doesn't look very helpful, so let's add some **HTTP** endpoints.
Expand Down Expand Up @@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi(
```shell
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
```
This possibility built on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI.
We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py).

## Other ASGI Compatibility

Expand Down Expand Up @@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker)
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))
```

!!! tip
You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default

```shell
faststream run main:app --bind unix:/tmp/socket.sock
```
```shell
faststream run main:app --bind fd://2
```
You can use multiple binds if you want
```shell
faststream run main:app --bind 0.0.0.0:8000 '[::]:8000'
```
65 changes: 46 additions & 19 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -146,25 +147,51 @@ async def run(
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
) -> None:
import uvicorn

if not run_extra_options:
run_extra_options = {}
port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type]
workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type]
host = str(run_extra_options.pop("host", "localhost"))
fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type]
config = uvicorn.Config(
self,
host=host,
port=port,
log_level=log_level,
workers=workers,
fd=fd if fd != -1 else None,
**run_extra_options,
)
server = uvicorn.Server(config)
await server.serve()
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
4 changes: 2 additions & 2 deletions faststream/broker/fastapi/get_dependant.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
lambda x: isinstance(x, FieldInfo),
p.field_info.metadata or (),
),
Field(**field_data),
Field(**field_data), # type: ignore[pydantic-field,unused-ignore]
)

else:
Expand All @@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
"le": info.field_info.le,
}
)
f = Field(**field_data)
f = Field(**field_data) # type: ignore[pydantic-field,unused-ignore]

params_unique[p.name] = (
info.annotation,
Expand Down
13 changes: 12 additions & 1 deletion faststream/cli/utils/parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import re
from functools import reduce
from typing import TYPE_CHECKING, Dict, List, Tuple

if TYPE_CHECKING:
from faststream.types import SettingField


def is_bind_arg(arg: str) -> bool:
"""Determine whether the received argument refers to --bind.

bind arguments are like: 0.0.0.0:8000, [::]:8000, fd://2, /tmp/socket.sock

"""
bind_regex = re.compile(r":\d+$|:/+\d|:/[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+")
return bool(bind_regex.search(arg))


def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
"""Parses command line arguments."""
extra_kwargs: Dict[str, SettingField] = {}
Expand All @@ -22,7 +33,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
),
"-",
]:
if ":" in item:
if ":" in item and not is_bind_arg(item):
app = item

else:
Expand Down
8 changes: 4 additions & 4 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ def __init__(
asyncapi_url = str(amqp_url)

# respect ascynapi_url argument scheme
builded_asyncapi_url = urlparse(asyncapi_url)
self.virtual_host = builded_asyncapi_url.path
built_asyncapi_url = urlparse(asyncapi_url)
self.virtual_host = built_asyncapi_url.path
if protocol is None:
protocol = builded_asyncapi_url.scheme
protocol = built_asyncapi_url.scheme

super().__init__(
url=str(amqp_url),
Expand All @@ -268,7 +268,7 @@ def __init__(
# AsyncAPI args
description=description,
asyncapi_url=asyncapi_url,
protocol=protocol or builded_asyncapi_url.scheme,
protocol=protocol or built_asyncapi_url.scheme,
protocol_version=protocol_version,
security=security,
tags=tags,
Expand Down
34 changes: 25 additions & 9 deletions tests/cli/utils/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from faststream.cli.utils.parser import parse_cli_args
from faststream.cli.utils.parser import is_bind_arg, parse_cli_args

APPLICATION = "module:app"

Expand All @@ -23,19 +23,20 @@
)
ARG6 = ("--some-key",)
ARG7 = ("--k7", "1", "2", "--k7", "3")
ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2")


@pytest.mark.parametrize(
"args",
( # noqa: PT007
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION),
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION),
),
)
def test_custom_argument_parsing(args: Tuple[str]):
Expand All @@ -49,4 +50,19 @@ def test_custom_argument_parsing(args: Tuple[str]):
"k5": ["1", "1"],
"some_key": True,
"k7": ["1", "2", "3"],
"bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"],
}


@pytest.mark.parametrize(
"args", ["0.0.0.0:8000", "[::]:8000", "fd://2", "unix:/tmp/socket.sock"]
)
def test_bind_arg(args: str):
assert is_bind_arg(args) is True


@pytest.mark.parametrize(
"args", ["main:app", "src.main:app", "examples.nats.e01_basic:app2"]
)
def test_not_bind_arg(args: str):
assert is_bind_arg(args) is False
Loading