Skip to content

Commit

Permalink
Merge pull request #658 from isidentical/fsspec-aio-policy
Browse files Browse the repository at this point in the history
async: always use selector loop policy
  • Loading branch information
martindurant authored Jun 7, 2021
2 parents e63145d + ef284c5 commit c66cbed
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Changelog
Dev
---

Other

- The ``fsspec.asyn.get_loop()`` will always return a loop of a selector policy.


2021.05.0
---------

Expand Down
21 changes: 20 additions & 1 deletion fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import inspect
import os
import re
import sys
import threading
from contextlib import contextmanager
from glob import has_magic

from .exceptions import FSTimeoutError
Expand Down Expand Up @@ -87,6 +89,22 @@ def wrapper(*args, **kwargs):
return wrapper


@contextmanager
def _selector_policy():
original_policy = asyncio.get_event_loop_policy()
try:
if (
sys.version_info >= (3, 8)
and os.name == "nt"
and hasattr(asyncio, "WindowsSelectorEventLoopPolicy")
):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

yield
finally:
asyncio.set_event_loop_policy(original_policy)


def get_loop():
"""Create or return the default fsspec IO loop
Expand All @@ -97,7 +115,8 @@ def get_loop():
# repeat the check just in case the loop got filled between the
# previous two calls from another thread
if loop[0] is None:
loop[0] = asyncio.new_event_loop()
with _selector_policy():
loop[0] = asyncio.new_event_loop()
th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
th.daemon = True
th.start()
Expand Down
20 changes: 20 additions & 0 deletions fsspec/tests/test_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import inspect
import os
import sys
import time

Expand Down Expand Up @@ -112,3 +113,22 @@ async def main(**kwargs):

monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 4)
assert sum(asyncio.run(main())) == 32 # override


@pytest.mark.skipif(os.name != "nt", reason="only for windows")
def test_windows_policy():
from asyncio.windows_events import SelectorEventLoop

loop = fsspec.asyn.get_loop()
policy = asyncio.get_event_loop_policy()

# Ensure that the created loop always uses selector policy
assert isinstance(loop, SelectorEventLoop)

# Ensure that the global policy is not changed and it is
# set to the default one. This is important since the
# get_loop() method will temporarily override the policy
# with the one which uses selectors on windows, so this
# check ensures that we are restoring the old policy back
# after our change.
assert isinstance(policy, asyncio.DefaultEventLoopPolicy)

0 comments on commit c66cbed

Please sign in to comment.