Skip to content

Commit

Permalink
Merge pull request #185 from dthulke/localengine-scheduling
Browse files Browse the repository at this point in the history
Improving the scheduling logic of the local engine
  • Loading branch information
dthulke authored Apr 29, 2024
2 parents 923a693 + 19f0b2b commit 0da764a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- The memory_allocation_type in the slurm engine now defaults to per_node instead of per_cpu for consistency with other engines
- Improved the scheduling approach of the LocalEngine

### Added
- Support for async workflow definitions. If await tk.async_run(obj) is called, sisyphus will wait until all Path objects inside of obj are available
Expand Down
25 changes: 13 additions & 12 deletions sisyphus/localengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def start_engine(self):
if self.started:
return
# control input
self.input_queue = queue.Queue()
self.runnable_tasks = sync_object([])
self.waiting_tasks = sync_object({})

# control output / which tasks are currently running
Expand Down Expand Up @@ -194,21 +194,20 @@ def release_resources(self, rqmt, selected_devices):

@tools.default_handle_exception_interrupt_main_thread
def run(self):
next_task = None
try:
while self.running.value:
self.check_finished_tasks()

wait = True # wait if no new job is started
# get next task
logging.debug("Check for new task (Free resources %s)" % self.free_resources)
with self.waiting_tasks as waiting_tasks: # get object for synchronisation
if next_task is None and not self.input_queue.empty():
next_task = self.input_queue.get()
logging.debug("Found new task: %s" % str(next_task))
# check runnable tasks
logging.debug("Check for new tasks (Free resources %s)" % self.free_resources)
# get object for synchronisation
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
runnable_task_idx = 0

# run next task if the capacities are available
if next_task is not None:
while runnable_task_idx < len(runnable_tasks):
next_task = runnable_tasks[runnable_task_idx]
with self.running_tasks as running_tasks:
# if enough free resources => run job
if self.enough_free_resources(next_task.rqmt):
Expand All @@ -225,8 +224,10 @@ def run(self):
# Start job:
process = self.start_task(next_task, selected_gpus)
running_tasks[name] = (process, next_task, selected_gpus)
next_task = None
del runnable_tasks[runnable_task_idx]
wait = False
else:
runnable_task_idx += 1

if wait:
# check only once per second for new jobs
Expand Down Expand Up @@ -255,8 +256,8 @@ def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
call_with_id += ["--redirect_output"]

task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
with self.waiting_tasks as waiting_tasks:
self.input_queue.put(task)
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
runnable_tasks.append(task)
waiting_tasks[(name, task_id)] = task
return ENGINE_NAME, socket.gethostname()

Expand Down

0 comments on commit 0da764a

Please sign in to comment.