diff --git a/spinetoolbox/project.py b/spinetoolbox/project.py index 701515d02..66b6a3b29 100644 --- a/spinetoolbox/project.py +++ b/spinetoolbox/project.py @@ -108,6 +108,8 @@ class SpineToolboxProject(MetaObject): specification_saved = Signal(str, str) """Emitted after a specification has been saved.""" + LOCAL_EXECUTION_JOB_ID = "1" + def __init__(self, toolbox, p_dir, plugin_specs, app_settings, settings, logger): """ Args: @@ -978,8 +980,8 @@ def _execute_dags(self, dags, execution_permits_list): if self._engine_workers: self._logger.msg_error.emit("Execution already in progress.") return - self.job_id = self.prepare_remote_execution() - if not self.job_id: + job_id = self.prepare_remote_execution() + if not job_id: self.project_execution_finished.emit() return settings = make_settings_dict_for_engine(self._app_settings) @@ -987,7 +989,7 @@ def _execute_dags(self, dags, execution_permits_list): darker = lambda x: f'{x}' for k, (dag, execution_permits) in enumerate(zip(dags, execution_permits_list)): dag_identifier = f"{k + 1}/{len(dags)}" - worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings) + worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings, job_id) if worker is None: continue self._logger.msg.emit("Starting DAG {0}".format(dag_identifier)) @@ -1003,7 +1005,7 @@ def _execute_dags(self, dags, execution_permits_list): for worker in self._engine_workers: worker.start() - def create_engine_worker(self, dag, execution_permits, dag_identifier, settings): + def create_engine_worker(self, dag, execution_permits, dag_identifier, settings, job_id): """Creates and returns a SpineEngineWorker to execute given *validated* dag. Args: @@ -1011,6 +1013,7 @@ def create_engine_worker(self, dag, execution_permits, dag_identifier, settings) execution_permits (dict): mapping item names to a boolean indicating whether to execute it or skip it dag_identifier (str): A string identifying the dag, for logging settings (dict): project and app settings to send to the spine engine. + job_id (str): job id Returns: SpineEngineWorker @@ -1045,7 +1048,7 @@ def create_engine_worker(self, dag, execution_permits, dag_identifier, settings) "settings": settings, "project_dir": self.project_dir.replace(os.sep, "/"), } - worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, self.job_id) + worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, job_id) return worker def _handle_engine_worker_finished(self, worker): @@ -1068,7 +1071,7 @@ def _handle_engine_worker_finished(self, worker): for item, direction, state in finished_worker.successful_executions: item.handle_execution_successful(direction, state) finished_worker.clean_up() - self.finalize_remote_execution() + self.finalize_remote_execution(worker.job_id) self._engine_workers.clear() self.project_execution_finished.emit() @@ -1418,11 +1421,11 @@ def prepare_remote_execution(self): """Pings the server and sends the project as a zip-file to server. Returns: - str: Job Id if server is ready for remote execution, empty string if something went wrong or "1" if - local execution is enabled. + str: Job Id if server is ready for remote execution, empty string if something went wrong + or LOCAL_EXECUTION_JOB_ID if local execution is enabled. """ if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true": - return "1" # Something that isn't False + return self.LOCAL_EXECUTION_JOB_ID host, port, sec_model, sec_folder = self._toolbox.engine_server_settings() if not host: self._logger.msg_error.emit( @@ -1469,8 +1472,12 @@ def prepare_remote_execution(self): engine_client.close() return job_id - def finalize_remote_execution(self): - """Sends a request to server to remove the project directory and removes the project ZIP file from client.""" + def finalize_remote_execution(self, job_id): + """Sends a request to server to remove the project directory and removes the project ZIP file from client.y + + Args: + job_id (str): job id + """ if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true": return host, port, sec_model, sec_folder = self._toolbox.engine_server_settings() @@ -1481,7 +1488,7 @@ def finalize_remote_execution(self): f"Server is not responding. {e}. " f"Check settings in File->Settings->Engine." ) return - engine_client.remove_project_from_server(self.job_id) + engine_client.remove_project_from_server(job_id) engine_client.close() project_zip_file = os.path.abspath(os.path.join(self.project_dir, os.pardir, PROJECT_ZIP_FILENAME + ".zip")) if not os.path.isfile(project_zip_file): diff --git a/spinetoolbox/spine_engine_worker.py b/spinetoolbox/spine_engine_worker.py index bcc222333..c047ddde5 100644 --- a/spinetoolbox/spine_engine_worker.py +++ b/spinetoolbox/spine_engine_worker.py @@ -128,11 +128,12 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections, project_items (dict): mapping from project item name to :class:`ProjectItem` connections (dict): mapping from jump name to :class:`LoggingConnection` or :class:`LoggingJump` logger (LoggerInterface): a logger - job_id: Job Id for remote execution + job_id (str): Job id for remote execution """ super().__init__() self._engine_data = engine_data exec_remotely = engine_data["settings"].get("engineSettings/remoteExecutionEnabled", "false") == "true" + self._job_id = job_id self._engine_mngr = make_engine_manager(exec_remotely, job_id) self.dag = dag self.dag_identifier = dag_identifier @@ -148,6 +149,10 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections, self.moveToThread(self._thread) self._thread.started.connect(self.do_work) + @property + def job_id(self): + return self._job_id + @property def engine_data(self): """Engine data dictionary."""