diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 2c5b534d9..532a4948c 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -4,6 +4,11 @@ Changelog Dev --- +Other + +- The ``fsspec.asyn.get_loop()`` will always return a loop of a selector policy. + + 2021.05.0 --------- diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 969d68002..a4940716d 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -4,7 +4,9 @@ import inspect import os import re +import sys import threading +from contextlib import contextmanager from glob import has_magic from .exceptions import FSTimeoutError @@ -87,6 +89,22 @@ def wrapper(*args, **kwargs): return wrapper +@contextmanager +def _selector_policy(): + original_policy = asyncio.get_event_loop_policy() + try: + if ( + sys.version_info >= (3, 8) + and os.name == "nt" + and hasattr(asyncio, "WindowsSelectorEventLoopPolicy") + ): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + yield + finally: + asyncio.set_event_loop_policy(original_policy) + + def get_loop(): """Create or return the default fsspec IO loop @@ -97,7 +115,8 @@ def get_loop(): # repeat the check just in case the loop got filled between the # previous two calls from another thread if loop[0] is None: - loop[0] = asyncio.new_event_loop() + with _selector_policy(): + loop[0] = asyncio.new_event_loop() th = threading.Thread(target=loop[0].run_forever, name="fsspecIO") th.daemon = True th.start() diff --git a/fsspec/tests/test_async.py b/fsspec/tests/test_async.py index 84cf5c4cf..b783cd7f4 100644 --- a/fsspec/tests/test_async.py +++ b/fsspec/tests/test_async.py @@ -1,5 +1,6 @@ import asyncio import inspect +import os import sys import time @@ -112,3 +113,22 @@ async def main(**kwargs): monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 4) assert sum(asyncio.run(main())) == 32 # override + + +@pytest.mark.skipif(os.name != "nt", reason="only for windows") +def test_windows_policy(): + from asyncio.windows_events import SelectorEventLoop + + loop = fsspec.asyn.get_loop() + policy = asyncio.get_event_loop_policy() + + # Ensure that the created loop always uses selector policy + assert isinstance(loop, SelectorEventLoop) + + # Ensure that the global policy is not changed and it is + # set to the default one. This is important since the + # get_loop() method will temporarily override the policy + # with the one which uses selectors on windows, so this + # check ensures that we are restoring the old policy back + # after our change. + assert isinstance(policy, asyncio.DefaultEventLoopPolicy)