Problems with background threads #53

Open
mxmlnkn opened this issue Oct 5, 2024 · 3 comments

@mxmlnkn (Contributor) commented Oct 5, 2024

I am trying to use this library with FUSE. However, I have noticed:

  • The FUSE "fork into background" functionality results in unkillable I/O hangs because SSHFileSystem seems to start background threads. I ran into the same problem in rapidgzip with its thread pool; there I added a method to join all threads so that the process can be forked into the background, and the thread pool is restarted automatically on first use afterwards. It would be nice to have a similar API for SSHFileSystem (see the sketch after this list).
  • When an error occurs, my program does not quit anymore once an SSHFileSystem has been created. I have to press Ctrl+C / send SIGINT for it to quit. I suspect this is also caused by background threads that somehow block. Maybe I am just not closing something correctly...
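
To illustrate the first point, here is a minimal sketch of the join-then-restart pattern I mean (all names are hypothetical, this is not the actual rapidgzip code):

```python
import concurrent.futures
import threading

class ForkSafePool:
    """Sketch: a thread pool that can be joined before a fork and is
    lazily restarted on first use afterwards."""

    def __init__(self, max_workers=4):
        self._max_workers = max_workers
        self._pool = None
        self._lock = threading.Lock()

    def submit(self, function, *args, **kwargs):
        # Transparently restarts the pool if it was joined before a fork.
        with self._lock:
            if self._pool is None:
                self._pool = concurrent.futures.ThreadPoolExecutor(self._max_workers)
            return self._pool.submit(function, *args, **kwargs)

    def join_threads(self):
        # Call right before forking so that no threads get copied into
        # the child. The pool is recreated on the next submit() call.
        with self._lock:
            if self._pool is not None:
                self._pool.shutdown(wait=True)
                self._pool = None
```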

The running threads according to import threading; threading.enumerate() are: MainThread, fsspecIO, asyncio_0. Maybe I am running into #42. Then again, looking at the code, it does not seem like there is any logic for joining or canceling that thread at all!?

On further inspection, the program also hangs inside sshfs.file.SSHFile.close when I call it manually. That call hangs in SSHFile._close, which hangs in fsspec.asyn.sync_wrapper, which hangs in this loop.

It would already be helpful if SSHFile.close forwarded kwargs like all the other methods that are initialized with __mirror_method, so that, for example, timeout=3 could be specified. Some kind of timeout should then also be used in __exit__.
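
For illustration, a kwargs-forwarding variant could look roughly like this; mirror_method is a hypothetical stand-in for __mirror_method, and it leans on the fact that fsspec.asyn.sync() already accepts a timeout argument:

```python
from fsspec.asyn import sync

def mirror_method(async_name):
    # Hypothetical stand-in for __mirror_method that forwards kwargs,
    # so that e.g. file.close(timeout=3) becomes possible.
    def wrapper(self, *args, timeout=None, **kwargs):
        # fsspec.asyn.sync() supports timeout= and raises FSTimeoutError
        # if the coroutine does not finish in time.
        return sync(self.loop, getattr(self, async_name), *args,
                    timeout=timeout, **kwargs)
    return wrapper
```

With that, close = mirror_method("_close") on the file class would accept file.close(timeout=3) and raise FSTimeoutError instead of hanging forever.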

@martindurant (Member)

> for which I added a method to join all threads,

fsspec might find this useful. We have an unreliable guard against fork in general, but perhaps a function prefork() which makes sure nothing unsafe (threads, event loops, open files) is copied out of the main process?
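
As a rough sketch (prefork() does not exist yet, and a real version should tear down the unsafe state instead of merely complaining):

```python
import threading

def prefork():
    # Hypothetical helper: verify that nothing unsafe would be copied
    # into a forked child process.
    extra = [t for t in threading.enumerate() if t is not threading.main_thread()]
    if extra:
        raise RuntimeError(f"Refusing to fork with live background threads: {extra}")
```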

@mxmlnkn (Contributor, Author) commented Oct 7, 2024

> fsspec might find this useful. We have an unreliable guard against fork in general, but perhaps a function prefork() which makes sure nothing unsafe (threads, event loops, open files) is copied out of the main process?

Sounds good.

I don't think that opened files are a problem, at least not as long as they are not asynchronous, see man fork. I did not have problems with those; I open some files before the fork and they still work after the fork.

> The child inherits copies of the parent's set of open file
> descriptors. Each file descriptor in the child refers to the
> same open file description (see open(2)) as the corresponding
> file descriptor in the parent. This means that the two file
> descriptors share open file status flags, file offset, and
> signal-driven I/O attributes (see the description of F_SETOWN
> and F_SETSIG in fcntl(2)).
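
A minimal demo of that behavior (POSIX only; the path is just an example):

```python
import os

# A file opened before fork() stays usable in the child because the
# child inherits the parent's file descriptor.
with open("/etc/hostname", "rb") as file:
    pid = os.fork()
    if pid == 0:  # child
        print("child read:", file.read())
        os._exit(0)  # exit the child without unwinding further
    os.waitpid(pid, 0)  # parent only waits; fd and offset are shared
```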

Here is the libfuse wiki mentioning the problem with threads and fork and recommending to start such threads in the init callback.
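
With fusepy, for example, that recommendation might look roughly like this (hypothetical and heavily abridged):

```python
from fuse import FUSE, Operations  # fusepy

import sshfs

class SshfsMount(Operations):
    def __init__(self, host):
        self.host = host
        self.fs = None  # Do NOT create it here; its threads would die in the fork.

    def init(self, path):
        # Called by libfuse *after* daemonizing, so the background
        # threads are started in the process that keeps running.
        self.fs = sshfs.SSHFileSystem(self.host)

    def readdir(self, path, fh):
        return ['.', '..'] + [name.rsplit('/', 1)[-1]
                              for name in self.fs.ls(path, detail=False)]

FUSE(SshfsMount("example.com"), "/mnt/point", foreground=False)
```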

Btw, currently I simply have this check to avoid hangs. (Argh, and while linking this, I noticed that the printDebug >= 1 test should not be part of the if condition. It used to be a simple warning but now is an error. That's how this happened.)
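
The check boils down to something like the following hypothetical reconstruction (with the printDebug test already moved out of the raising condition):

```python
import threading

def check_fork_safety(printDebug=1):
    # Refuse to daemonize while background threads are alive, because
    # they would silently not exist in the forked child.
    if threading.active_count() > 1:
        if printDebug >= 1:
            print("[Warning] Found running background threads:",
                  threading.enumerate())
        raise RuntimeError("Cannot fork into background with running threads!")
```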

@martindurant (Member)

> I simply have this check

It is good practice, I suppose, but you still need remediation for the case where you have threads but then want to fork. Note that even a console IPython session has extra threads, which are only a problem if you try to do anything with them; they don't exist in the child:

```
In [1]: import multiprocess

In [2]: def f(_):
   ...:     import threading
   ...:     return str(threading.enumerate())
   ...:

In [3]: f(0)
Out[3]: '[<_MainThread(MainThread, started 8383734016)>, <HistorySavingThread(IPythonHistorySavingThread, started 6113292288)>]'

In [4]: pool = multiprocess.Pool()

In [5]: pool.map(f, [0])
Out[5]: ['[<_MainThread(MainThread, started 8383734016)>]']
```

(If you pass a thread object to the child, you can still check whether it is in threading.enumerate(), which works by ID, or call .is_alive() on it.)
