Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async: always use selector loop policy #658

Merged
merged 4 commits into from
Jun 7, 2021

Conversation

isidentical
Copy link
Member

Resolves #656

@martindurant
Copy link
Member

I'll have to defer to you on this one, I don't know much about this loop choice.
Should we still allow the user to be able to give their own default factory, or is it enough that they can create their own loop in whatever way they desire.

(failure appears to be because of conda server - rerunning)

@isidentical
Copy link
Member Author

Have to admit that doing this for all get_loop() calls is a bit pre-emptive, though AFAIK there is nothing wrong with it since the selector was the default up to 3.8, and then the default switched to proactor (which supports pipes). The reason that I avoided doing this on filesystem level (on s3fs, gcsfs, adlfs) (all async filesystems that use aiohttp need this, see aio-libs/aiohttp#4536, aio-libs/aiohttp#4719, ...) is that it seemed too much of replication (there are multiple get_loop() calls per implementation, so we'd need to add an option to each one of them, e.g use_selector=...).

Should we still allow the user to be able to give their own default factory, or is it enough that they can create their own loop in whatever way they desire.

They can definitely create their own loop in whatever policy they choose, and pass it as super().__init__(loop=loop) though the fsspec.get_loop() does a bit more then just a bare asyncio.get_new_event_loop() so I don't know whether this is something trivial or not.

@martindurant
Copy link
Member

I'll merge this, see if there's any knock-on effects later.

@martindurant martindurant merged commit c66cbed into fsspec:master Jun 7, 2021
@efiop
Copy link
Member

efiop commented Jan 26, 2023

For the record: we've received a feedback from a user on windows that this breaks fsspec/sshfs#29

File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\site-packages\sshfs\spec.py", line 82, in _connect
client = await self._stack.enter_async_context(_raw_client)
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 619, in enter_async_context
result = await _cm_type.aenter(cm)
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\site-packages\asyncssh\misc.py", line 274, in aenter
self._coro_result = await self._coro
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\site-packages\asyncssh\connection.py", line 7834, in connect
return await asyncio.wait_for(
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 408, in wait_for
return await fut
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\site-packages\asyncssh\connection.py", line 433, in _connect
conn = await _open_proxy(loop, proxy_command, conn_factory)
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\site-packages\asyncssh\connection.py", line 352, in _open_proxy
_, tunnel = await loop.subprocess_exec(_ProxyCommandTunnel, *command)
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1670, in subprocess_exec
transport = await self._make_subprocess_transport(
File "C:\Users\alex\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError

Using that sshfs issue to monitor this, looks like we'll need to reconsider this change.

@martindurant
Copy link
Member

Thanks @efiop . I don't know much about it, but happy to consider any change you think is needed. Of course, better if it can be fixed on your end :)

@riklopfer
Copy link

Is there a recommended way to use the default event loop policy from the standard lib? This is what I have currently, which works, but feels a little verbose.

I also wonder if the issues originally reported by @isidentical have been resolved in more recent python versions...

try:
    import s3fs

    _loop = None

    if _loop is None:
        with fsspec.asyn.get_lock():
            if _loop is None:
                _loop = asyncio.new_event_loop()
                th = threading.Thread(target=_loop.run_forever, name="fsspecIO")
                th.daemon = True
                th.start()

    class DefaultEventLoopS3FS(s3fs.S3FileSystem):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs, loop=_loop)

    for protocol in fsspec.available_protocols():
        try:
            cls = fsspec.get_filesystem_class(protocol)
        except ImportError:
            continue

        if cls == s3fs.S3FileSystem:
            fsspec.register_implementation(protocol, DefaultEventLoopS3FS, clobber=True)


except ImportError:
    s3fs = None

@martindurant
Copy link
Member

Some little coding could allow fsspec.asyn._selector_policy to be configurable, then you don't have to do any subclassing and re-registering. As it is, you can easily set that module's attribute to a (context manager) function of your choice.

@riklopfer
Copy link

Some little coding could allow fsspec.asyn._selector_policy to be configurable, then you don't have to do any subclassing and re-registering.

Setting fsspec.asyn._selector_policy directly works for me! Is there any reason not to do this instead of my verbose code aside from that member being protected?

@martindurant
Copy link
Member

Patching _selector_policy is fine, it will only apply to the current python process.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

async: use WindowsSelectorEventLoopPolicy for 3.8+
4 participants