-
-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recommend a way to load/reload settings in async environment #479
Comments
Here's a short proof that I wrote for fun and to illustrate the problem with more depth and room for interaction. Imagine there is a task group with 10000 async tasks that first load the newest config from the disk and then sleep for 0 seconds (and then check how much they actually slept). import asyncio
import time
from collections.abc import Awaitable, Callable
from concurrent.futures import ProcessPoolExecutor
from pydantic_settings import BaseSettings
max_sleeping_error = 0.0
class SomeSettings(
BaseSettings,
env_file=".env",
toml_file="config.toml",
extra="allow",
):
pass
async def without_executor() -> None:
# simply load the settings, which is blocking.
SomeSettings()
async def with_executor() -> None:
# load the settings in a worker thread, which isn't blocking (creating a thread is blocking though)
await asyncio.to_thread(SomeSettings)
async def measure_sleeping(callback: Callable[..., Awaitable[None]], sleep_for: float = 0.0) -> None:
global max_sleeping_error
# callback loads the settings
await callback()
started = time.perf_counter()
# this lasts longer and longer as more and more blocking calls occur
await asyncio.sleep(sleep_for)
slept_for = time.perf_counter() - started
max_sleeping_error = max(max_sleeping_error, slept_for - sleep_for)
async def run_test(callback: Callable[..., Awaitable[None]]) -> None:
async with asyncio.TaskGroup() as tg:
for _ in range(10000):
tg.create_task(measure_sleeping(callback))
def test_without_executor() -> None:
# check how late asyncio.sleep() calls will be if we dont use an executor
asyncio.run(run_test(callback=without_executor))
return max_sleeping_error
def test_with_executor() -> None:
# check how late asyncio.sleep() calls will be if we use an executor
asyncio.run(run_test(callback=with_executor))
return max_sleeping_error
if __name__ == "__main__":
with ProcessPoolExecutor() as pool:
fut_with_executor = pool.submit(test_with_executor)
fut_without_executor = pool.submit(test_without_executor)
print("[with executor]")
print("max sleeping error:", fut_with_executor.result(), end="\n\n")
print("[without executor]")
print("max sleeping error:", fut_without_executor.result()) # can even exceed 1s! If you run that, you'll see that some of the late tasks which use the executor make the event loop maximally stretch the time needed to wait for |
Thanks @bswck for the issue and explanation. Let's document it at the end of our docs before In-place reloading |
This is how I load/reload my settings in an asynchronous app:
The reason why I'm doing it this way is because the
Settings
settings model hastoml_file
specified in its model config which causes the initializer to directly interact with the blocking disk I/O when resolving config values:pydantic-settings/pydantic_settings/sources.py
Lines 1993 to 1996 in 6fe3bd1
Calls to
open()
are blocking and halt the entire event loop for possibly longer than regular statements betweenawait
s in typical coroutines, which can lead to unsound effects.Let's create an API for loading configuration asynchronously or hint the users in the docs (possibly here and here) to use
asyncio.to_thread
/loop.run_in_executor
in order to use a worker thread that can make the entire I/O code non-blocking, wrapped in a future and correctly awaited in a coroutine.The big question is whether loading the config from sources is currently thread-safe (I'm guessing so).
The text was updated successfully, but these errors were encountered: