From 6e8cfc8539dcd9519db7f4ed1c90cdaa2cde86fe Mon Sep 17 00:00:00 2001
From: David Thulke
Date: Fri, 19 Apr 2024 19:24:20 +0200
Subject: [PATCH 1/5] improves the scheduling logic of the local engine

---
 CHANGELOG.md            |  1 +
 sisyphus/localengine.py | 12 +++++++++---
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f2de78..5fdb0eb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 - The memory_allocation_type in the slurm engine now default to per_node instead of per_cpu for consistency with other engines
+- Improved the scheduling approach of the LocalEngine
 
 ### Added
 - Support for async workflow definitions. If await tk.async_run(obj) is called sisyphus will wait until all Path objects inside of obj are available

diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py
index 7de9a42..4187d82 100644
--- a/sisyphus/localengine.py
+++ b/sisyphus/localengine.py
@@ -203,13 +203,15 @@ def run(self):
                 # get next task
                 logging.debug("Check for new task (Free resources %s)" % self.free_resources)
                 with self.waiting_tasks as waiting_tasks:  # get object for synchronisation
-                    if next_task is None and not self.input_queue.empty():
+                    next_task = None
+                    if not self.input_queue.empty():
                         next_task = self.input_queue.get()
                         logging.debug("Found new task: %s" % str(next_task))
 
                 # run next task if the capacities are available
                 if next_task is not None:
                     with self.running_tasks as running_tasks:
+                        wait = False
                         # if enough free resources => run job
                         if self.enough_free_resources(next_task.rqmt):
                             selected_gpus = self.reserve_resources(next_task.rqmt)
@@ -225,8 +227,12 @@ def run(self):
                             # Start job:
                             process = self.start_task(next_task, selected_gpus)
                             running_tasks[name] = (process, next_task, selected_gpus)
-                            next_task = None
-                            wait = False
+                        else:
+                            # Put next_task at end of queue and try to schedule the next one
+                            if self.input_queue.empty():
+                                # Wait if this is the only task in the queue
+                                wait = True
+                            self.input_queue.put(next_task)
 
                 if wait:
                     # check only once per second for new jobs
                     # if no job has been started
                     time.sleep(1)

From 7f2747f957ba784d7530d9546d11d52a0d7cf015 Mon Sep 17 00:00:00 2001
From: David Thulke
Date: Fri, 19 Apr 2024 19:35:06 +0200
Subject: [PATCH 2/5] LocalEngine: adds wait after full queue was checked

---
 sisyphus/localengine.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py
index 4187d82..002607e 100644
--- a/sisyphus/localengine.py
+++ b/sisyphus/localengine.py
@@ -194,7 +194,7 @@ def release_resources(self, rqmt, selected_devices):
 
     @tools.default_handle_exception_interrupt_main_thread
     def run(self):
-        next_task = None
+        checked_tasks_since_last_wait = 0
         try:
             while self.running.value:
                 self.check_finished_tasks()
@@ -229,7 +229,8 @@ def run(self):
                         else:
                             # Put next_task at end of queue and try to schedule the next one
-                            if self.input_queue.empty():
+                            checked_tasks_since_last_wait += 1
+                            if checked_tasks_since_last_wait > self.input_queue.qsize():
                                 # Wait if this is the only task in the queue
                                 wait = True
                             self.input_queue.put(next_task)
@@ -238,6 +239,7 @@ def run(self):
                 # check only once per second for new jobs
                 # if no job has been started
                 time.sleep(1)
+                checked_tasks_since_last_wait = 0
         except KeyboardInterrupt:
             # KeyboardInterrupt is handled in manager
             pass
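The scheduling idea of patches 1 and 2 in plain terms: pop a task from the queue, start it if its resource requirements fit the currently free resources, otherwise put it back at the end of the queue and try the next one, and sleep only once the whole queue has been checked without starting anything. The stand-alone sketch below illustrates that loop; Task, rqmt, fits and the resource bookkeeping are simplified placeholders, not the actual sisyphus LocalEngine API.

import queue
import time
from dataclasses import dataclass
from typing import Dict


@dataclass
class Task:
    name: str
    rqmt: Dict[str, int]  # resource requirements, e.g. {"cpu": 2, "mem": 4}


def fits(rqmt: Dict[str, int], free: Dict[str, int]) -> bool:
    # A task can start if every requested resource is still available.
    return all(free.get(k, 0) >= v for k, v in rqmt.items())


def run_loop(tasks: "queue.Queue[Task]", free: Dict[str, int]) -> None:
    checked_since_last_wait = 0
    while not tasks.empty():
        task = tasks.get()
        wait = False  # patch 1: keep scheduling as long as there is something left to check
        if fits(task.rqmt, free):
            for k, v in task.rqmt.items():
                free[k] -= v  # reserve resources and "start" the task
            print("started", task.name)
        else:
            checked_since_last_wait += 1
            if checked_since_last_wait > tasks.qsize():
                wait = True  # patch 2: the whole queue was checked without a start
            tasks.put(task)  # requeue at the end and try the next task
        if wait:
            time.sleep(1)  # in the real engine, finished jobs release resources meanwhile
            checked_since_last_wait = 0


if __name__ == "__main__":
    q: "queue.Queue[Task]" = queue.Queue()
    for t in (Task("a", {"cpu": 2}), Task("b", {"cpu": 1}), Task("c", {"cpu": 1})):
        q.put(t)
    run_loop(q, {"cpu": 4})  # all three tasks fit, so the loop drains the queue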
From 43470da1975fc8e61877f5b6d866aa24a912cb91 Mon Sep 17 00:00:00 2001
From: David Thulke
Date: Sun, 21 Apr 2024 14:07:35 +0200
Subject: [PATCH 3/5] LocalEngine: replace input queue with list to keep fifo scheduling behaviour

---
 sisyphus/localengine.py | 31 +++++++++++++------------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py
index 002607e..44150ac 100644
--- a/sisyphus/localengine.py
+++ b/sisyphus/localengine.py
@@ -99,7 +99,7 @@ def start_engine(self):
         if self.started:
             return
         # control input
-        self.input_queue = queue.Queue()
+        self.runnable_tasks = sync_object([])
         self.waiting_tasks = sync_object({})
 
         # control output / which tasks are currently running
@@ -194,7 +194,6 @@ def release_resources(self, rqmt, selected_devices):
 
     @tools.default_handle_exception_interrupt_main_thread
     def run(self):
-        checked_tasks_since_last_wait = 0
         try:
             while self.running.value:
                 self.check_finished_tasks()
@@ -202,16 +201,15 @@ def run(self):
                 wait = True  # wait if no new job is started
                 # get next task
                 logging.debug("Check for new task (Free resources %s)" % self.free_resources)
-                with self.waiting_tasks as waiting_tasks:  # get object for synchronisation
-                    next_task = None
-                    if not self.input_queue.empty():
-                        next_task = self.input_queue.get()
-                        logging.debug("Found new task: %s" % str(next_task))
+                # get object for synchronisation
+                with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
+                    total_runnable_tasks = len(runnable_tasks)
+                    runnable_task_idx = 0
 
                 # run next task if the capacities are available
-                if next_task is not None:
+                while runnable_task_idx < total_runnable_tasks:
+                    next_task = runnable_tasks[runnable_task_idx]
                     with self.running_tasks as running_tasks:
-                        wait = False
                         # if enough free resources => run job
                         if self.enough_free_resources(next_task.rqmt):
                             selected_gpus = self.reserve_resources(next_task.rqmt)
@@ -227,19 +225,16 @@ def run(self):
                             # Start job:
                             process = self.start_task(next_task, selected_gpus)
                             running_tasks[name] = (process, next_task, selected_gpus)
+                            del runnable_tasks[runnable_task_idx]
+                            total_runnable_tasks -= 1
+                            wait = False
                         else:
-                            # Put next_task at end of queue and try to schedule the next one
-                            checked_tasks_since_last_wait += 1
-                            if checked_tasks_since_last_wait > self.input_queue.qsize():
-                                # Wait if this is the only task in the queue
-                                wait = True
-                            self.input_queue.put(next_task)
+                            runnable_task_idx += 1
 
                 if wait:
                     # check only once per second for new jobs
                     # if no job has been started
                     time.sleep(1)
-                    checked_tasks_since_last_wait = 0
         except KeyboardInterrupt:
             # KeyboardInterrupt is handled in manager
             pass
@@ -263,8 +258,8 @@ def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
             call_with_id += ["--redirect_output"]
 
             task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
-            with self.waiting_tasks as waiting_tasks:
-                self.input_queue.put(task)
+            with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
+                runnable_tasks.append(task)
                 waiting_tasks[(name, task_id)] = task
 
         return ENGINE_NAME, socket.gethostname()
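Both the new runnable_tasks list and the existing waiting_tasks dict are wrapped in sync_object and used as context managers (with self.runnable_tasks as runnable_tasks:), so the wrapped container is only touched while a lock is held. The sisyphus implementation of sync_object is not part of this diff; the snippet below is only a rough sketch of how such a wrapper could look, to make the pattern readable.

import threading


class SyncObject:
    """Guard an arbitrary object with a lock; `with wrapper as obj:` yields the object."""

    def __init__(self, obj):
        self._obj = obj
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self._obj

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.release()
        return False  # do not swallow exceptions raised inside the with block


# Usage mirroring the patch: a plain list keeps submission (FIFO) order and can be
# scanned in place, which a queue.Queue get/put cycle cannot do without reordering
# the remaining tasks.
runnable_tasks = SyncObject([])
with runnable_tasks as tasks:
    tasks.append("task 1")  # submit_call side
with runnable_tasks as tasks:
    first = tasks[0]        # scheduler side can look at any position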
From 39f55f03236e3d045b11a86fdd28b94938a9411b Mon Sep 17 00:00:00 2001
From: David Thulke
Date: Sun, 21 Apr 2024 14:13:56 +0200
Subject: [PATCH 4/5] LocalEngine: update comments

---
 sisyphus/localengine.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py
index 44150ac..ee34aba 100644
--- a/sisyphus/localengine.py
+++ b/sisyphus/localengine.py
@@ -199,8 +199,8 @@ def run(self):
                 self.check_finished_tasks()
 
                 wait = True  # wait if no new job is started
-                # get next task
-                logging.debug("Check for new task (Free resources %s)" % self.free_resources)
+                # check runnable tasks
+                logging.debug("Check for new tasks (Free resources %s)" % self.free_resources)
                 # get object for synchronisation
                 with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
                     total_runnable_tasks = len(runnable_tasks)
                     runnable_task_idx = 0

From 19f0b2bd75746c5a6fc044ead7811f64118cc5a0 Mon Sep 17 00:00:00 2001
From: David Thulke
Date: Wed, 24 Apr 2024 18:18:01 +0200
Subject: [PATCH 5/5] LocalEngine: use len of runnable_tasks instead of total_runnable_tasks

---
 sisyphus/localengine.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py
index ee34aba..a754d47 100644
--- a/sisyphus/localengine.py
+++ b/sisyphus/localengine.py
@@ -203,11 +203,10 @@ def run(self):
                 logging.debug("Check for new tasks (Free resources %s)" % self.free_resources)
                 # get object for synchronisation
                 with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
-                    total_runnable_tasks = len(runnable_tasks)
                     runnable_task_idx = 0
 
                 # run next task if the capacities are available
-                while runnable_task_idx < total_runnable_tasks:
+                while runnable_task_idx < len(runnable_tasks):
                     next_task = runnable_tasks[runnable_task_idx]
                     with self.running_tasks as running_tasks:
                         # if enough free resources => run job
@@ -226,7 +225,6 @@ def run(self):
                             process = self.start_task(next_task, selected_gpus)
                             running_tasks[name] = (process, next_task, selected_gpus)
                             del runnable_tasks[runnable_task_idx]
-                            total_runnable_tasks -= 1
                             wait = False
                         else:
                             runnable_task_idx += 1
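After patch 5, one scheduling pass walks the runnable list front to back, starts every task whose requirements fit the free resources, skips (but keeps) the ones that do not, and sleeps only if nothing could be started. Below is a simplified stand-alone sketch of that behaviour; Task, rqmt, fits and the resource handling are placeholders, not the actual sisyphus LocalEngine API.

import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Task:
    name: str
    rqmt: Dict[str, int]  # resource requirements, e.g. {"cpu": 2}


def fits(rqmt: Dict[str, int], free: Dict[str, int]) -> bool:
    return all(free.get(k, 0) >= v for k, v in rqmt.items())


def schedule_once(runnable: List[Task], free: Dict[str, int]) -> bool:
    """One pass over the FIFO list; returns True if at least one task was started."""
    started_any = False
    idx = 0
    while idx < len(runnable):
        task = runnable[idx]
        if fits(task.rqmt, free):
            for k, v in task.rqmt.items():
                free[k] -= v  # reserve resources and "start" the task
            print("started", task.name)
            del runnable[idx]  # do not advance idx: the next task slides into this slot
            started_any = True
        else:
            idx += 1  # keep the task queued in place, just look at the next one
    return started_any


if __name__ == "__main__":
    free = {"cpu": 4}
    runnable = [Task("big", {"cpu": 4}), Task("small", {"cpu": 1})]
    if not schedule_once(runnable, free):
        time.sleep(1)  # back off, as the engine does when nothing was started
    print("still queued:", [t.name for t in runnable])  # "small" waits for free resources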