-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix blocking worker queue when scheduling in parallel #3013
base: master
Are you sure you want to change the base?
Conversation
test/scheduler_test.py
Outdated
i = luigi.IntParameter() | ||
|
||
tasks = [UnpicklableTask(i=i) for i in range(5)] | ||
self.assertFalse(self.schedule_parallel(tasks)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line never terminates as can be seen in the first build log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 043b4c6.
This seems like a really interesting improvement, is there any chance of this making it into luigi? In addition, I would be also very interested in the two improvements mentioned above, could these be included in luigi without the PR suggested here? |
I am not a maintainer, but I think it would be great to get this in. We also use parallel scheduling to cut down latency until first task execution. |
I essentially have all changes I mentioned above ready in personal branches (and then work kicked in and couldn't follow up on this anymore). @dlstadther If you agree with the ideas above, I would open PRs. |
I'm good with the submission of a fix to resolve this issue. Note that I don't utilize the Luigi project for work anymore, so i can only review as an outsider now. I'll do my best to be responsive, but times might be slow. Also, Parallel scheduling is optional behaviour and is not enabled by default. |
I committed the fix of the loop doing the completeness checks during initial scheduling. The main change is that queue objects are no longer required for storing the results of IMHO, this simplifies the code quite a lot and allows to add the 2 additional features I mentioned in the PR description (in future PRs), but I understand that changes to the core require a lot of scrutiny. @dlstadther Any idea who we can include for additional review? The tests pass, but for some reason the coverage drops by ~8% and I fail to see where / how exactly ... |
With the usual disclaimer that I am not a maintainer, I am familiar with the code and would like this PR to get in, so I'll contribute a review. It overall looks good. The polling itches a bit, since it might add up to measurable CPU usage for long tasks, but I don't have a suggestion for a way to avoid it that wouldn't add complexity. Luigi is built on a concept of polling, so I guess we should not be too picky. :-) I'll add a few nitpicks inline. |
break | ||
else: | ||
time.sleep(wait_interval) | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The for/else + continue makes the flow unclear. How about packing up the rest of the loop into a new method and calling it before break
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have an optional comment and a question. Assuming no impact, i'm good to approve. (Sorry for taking a month to return to this review when I said I'd be a week)!
""" | ||
|
||
def __init__(self, func, args=None, kwargs=None): | ||
super(SyncResult, self).__init__() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
luigi isn't officially supporting py 2.7 anymore so this could just be super().__init__()
, but totally optional
try: | ||
is_complete = task.complete() | ||
except Exception: | ||
is_complete = TracebackWrapper(traceback.format_exc()) | ||
out_queue.put((task, is_complete)) | ||
return task.complete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the potential for hitting an exception here? Was the previous catch never caught and thus pointless? Or are we introducing the opportunity here for an unhandled exception?
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions. |
Hi,
this PR is meant to fix* and improve the parallel scheduling of tasks. To demonstrate what is currently failing (* therefore the fix), I added the WIP label and only added one commit with a simple test that currently, against my expectation, does not pass but blocks the entire process.
Description
The blocking occurs when parallel scheduling is enabled but a task is not picklable. The scheduling is mostly implemented in
Worker.add
,luigi/luigi/worker.py
Lines 730 to 774 in cf2abbd
where processes of a pool put the results of task completeness checks back into a queue. However, when the task is not picklable, the queue is never filled so that
queue.get()
will block forever without being able to access exceptions raised in the process. Instead, the exception is hold by theAsyncResult
object returned byapply_async
which is not used inWorker.add
, so there is currently no handle to catch these cases.Possible solution
IMHO, the root cause for this issue is the difference between the actual
multiprocessing.Pool.apply_async
(returning an async result object) and the customSingleProcessPool.apply_async
(instantly returning the result of the function call). The latter should produce something like aSyncResult
with the same API asAsyncResult
. Following this, one should rely on its built-in functionality to wrap exceptions (re-raised when callingget()
) and a lot of the custom exception handling in worker.py would be obsolete (such asAsyncCompletionException
andTracebackWrapper
) which simplifies things quite a lot.I already implemented this (all tests pass) and will push it if you agree :)
I know that you guys are (understandably) very cautious when it comes to changes in core code, but I think the parallel scheduling could really benefit from this change / fix.
Motivation and Context
Our dependency trees sometimes take O(h) to build as our completeness checks require a lot of remote resources. Therefore, we make heavy use of the parallel scheduling feature. Besides this issue/PR, we plan to submit two additional PRs in the future:
interface.build(many_tasks_in_a_list, ...)
is not using the parallel scheduling at all (because of this loop).Have you tested this? If so, how?
Yes, the test is included and will pass once the parallel scheduling is fixed.