Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: CLI ASGI #1950

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 28 additions & 41 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import logging
import traceback
from contextlib import asynccontextmanager
Expand All @@ -6,7 +7,6 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -44,6 +44,15 @@
)


def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
if port := params.get("port"):
params["port"] = int(port)
if fd := params.get("fd"):
params["fd"] = int(fd)
return params



class AsgiFastStream(Application):
def __init__(
self,
Expand Down Expand Up @@ -148,50 +157,28 @@ async def run(
sleep_time: float = 0.1,
) -> None:
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()
run_extra_options = cast_uvicorn_params(run_extra_options or {})

uvicorn_config_params = set(inspect.signature(uvicorn.Config).parameters.keys())

config = uvicorn.Config(
app=self,
log_level=log_level,
**{
key: v
for key, v in run_extra_options.items()
if key in uvicorn_config_params
}, # type: ignore[arg-type]
)

server = uvicorn.Server(config)
await server.serve()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
16 changes: 12 additions & 4 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from faststream import FastStream
from faststream.__about__ import __version__
from faststream._internal.application import Application
from faststream.asgi.app import AsgiFastStream
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level
Expand Down Expand Up @@ -146,17 +147,24 @@ def run(
).run()

elif workers > 1:
from faststream.cli.supervisors.multiprocess import Multiprocess

if isinstance(app_obj, FastStream):
from faststream.cli.supervisors.multiprocess import Multiprocess
Multiprocess(
target=_run,
args=(*args, logging.DEBUG),
workers=workers,
).run()
elif isinstance(app_obj, AsgiFastStream):
from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess
ASGIMultiprocess(
target=app,
args=args,
workers=workers,
).run()
else:
args[1]["workers"] = workers
_run(*args)
raise typer.BadParameter(
f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
)

else:
_run_imported_app(
Expand Down
32 changes: 32 additions & 0 deletions faststream/cli/supervisors/asgi_multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import inspect
from typing import Dict, Tuple

from faststream.asgi.app import cast_uvicorn_params


class ASGIMultiprocess:
def __init__(self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int) -> None:
_, uvicorn_kwargs, is_factory, log_level = args
self._target = target
self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {})
self._workers = workers
self._is_factory = is_factory
self._log_level = log_level

def run(self) -> None:
try:
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys())

uvicorn.run(
self._target,
factory=self._is_factory,
workers=self._workers,
log_level=self._log_level,
**{key: v for key, v in self._uvicorn_kwargs.items() if key in uvicorn_params}
)
3 changes: 3 additions & 0 deletions faststream/cli/utils/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def import_from_string(
else:
raise typer.BadParameter(f'"{instance}" is not a factory')

if callable(instance) and not is_factory:
raise typer.BadParameter("Please, use --factory option for callable object")

return module_path, instance


Expand Down
Loading