Skip to content

Commit

Permalink
Make job id's usable outside project (#2246)
Browse files Browse the repository at this point in the history
  • Loading branch information
soininen authored Aug 23, 2023
2 parents 96ad355 + 8464fed commit 8b74d3b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
31 changes: 19 additions & 12 deletions spinetoolbox/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class SpineToolboxProject(MetaObject):
specification_saved = Signal(str, str)
"""Emitted after a specification has been saved."""

LOCAL_EXECUTION_JOB_ID = "1"

def __init__(self, toolbox, p_dir, plugin_specs, app_settings, settings, logger):
"""
Args:
Expand Down Expand Up @@ -978,16 +980,16 @@ def _execute_dags(self, dags, execution_permits_list):
if self._engine_workers:
self._logger.msg_error.emit("Execution already in progress.")
return
self.job_id = self.prepare_remote_execution()
if not self.job_id:
job_id = self.prepare_remote_execution()
if not job_id:
self.project_execution_finished.emit()
return
settings = make_settings_dict_for_engine(self._app_settings)
darker_fg_color = QColor(FG_COLOR).darker().name()
darker = lambda x: f'<span style="color: {darker_fg_color}">{x}</span>'
for k, (dag, execution_permits) in enumerate(zip(dags, execution_permits_list)):
dag_identifier = f"{k + 1}/{len(dags)}"
worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings)
worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings, job_id)
if worker is None:
continue
self._logger.msg.emit("<b>Starting DAG {0}</b>".format(dag_identifier))
Expand All @@ -1003,14 +1005,15 @@ def _execute_dags(self, dags, execution_permits_list):
for worker in self._engine_workers:
worker.start()

def create_engine_worker(self, dag, execution_permits, dag_identifier, settings):
def create_engine_worker(self, dag, execution_permits, dag_identifier, settings, job_id):
"""Creates and returns a SpineEngineWorker to execute given *validated* dag.
Args:
dag (DiGraph): The dag
execution_permits (dict): mapping item names to a boolean indicating whether to execute it or skip it
dag_identifier (str): A string identifying the dag, for logging
settings (dict): project and app settings to send to the spine engine.
job_id (str): job id
Returns:
SpineEngineWorker
Expand Down Expand Up @@ -1045,7 +1048,7 @@ def create_engine_worker(self, dag, execution_permits, dag_identifier, settings)
"settings": settings,
"project_dir": self.project_dir.replace(os.sep, "/"),
}
worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, self.job_id)
worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, job_id)
return worker

def _handle_engine_worker_finished(self, worker):
Expand All @@ -1068,7 +1071,7 @@ def _handle_engine_worker_finished(self, worker):
for item, direction, state in finished_worker.successful_executions:
item.handle_execution_successful(direction, state)
finished_worker.clean_up()
self.finalize_remote_execution()
self.finalize_remote_execution(worker.job_id)
self._engine_workers.clear()
self.project_execution_finished.emit()

Expand Down Expand Up @@ -1418,11 +1421,11 @@ def prepare_remote_execution(self):
"""Pings the server and sends the project as a zip-file to server.
Returns:
str: Job Id if server is ready for remote execution, empty string if something went wrong or "1" if
local execution is enabled.
str: Job Id if server is ready for remote execution, empty string if something went wrong
or LOCAL_EXECUTION_JOB_ID if local execution is enabled.
"""
if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true":
return "1" # Something that isn't False
return self.LOCAL_EXECUTION_JOB_ID
host, port, sec_model, sec_folder = self._toolbox.engine_server_settings()
if not host:
self._logger.msg_error.emit(
Expand Down Expand Up @@ -1469,8 +1472,12 @@ def prepare_remote_execution(self):
engine_client.close()
return job_id

def finalize_remote_execution(self):
"""Sends a request to server to remove the project directory and removes the project ZIP file from client."""
def finalize_remote_execution(self, job_id):
"""Sends a request to server to remove the project directory and removes the project ZIP file from client.y
Args:
job_id (str): job id
"""
if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true":
return
host, port, sec_model, sec_folder = self._toolbox.engine_server_settings()
Expand All @@ -1481,7 +1488,7 @@ def finalize_remote_execution(self):
f"Server is not responding. {e}. " f"Check settings in <b>File->Settings->Engine</b>."
)
return
engine_client.remove_project_from_server(self.job_id)
engine_client.remove_project_from_server(job_id)
engine_client.close()
project_zip_file = os.path.abspath(os.path.join(self.project_dir, os.pardir, PROJECT_ZIP_FILENAME + ".zip"))
if not os.path.isfile(project_zip_file):
Expand Down
7 changes: 6 additions & 1 deletion spinetoolbox/spine_engine_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections,
project_items (dict): mapping from project item name to :class:`ProjectItem`
connections (dict): mapping from jump name to :class:`LoggingConnection` or :class:`LoggingJump`
logger (LoggerInterface): a logger
job_id: Job Id for remote execution
job_id (str): Job id for remote execution
"""
super().__init__()
self._engine_data = engine_data
exec_remotely = engine_data["settings"].get("engineSettings/remoteExecutionEnabled", "false") == "true"
self._job_id = job_id
self._engine_mngr = make_engine_manager(exec_remotely, job_id)
self.dag = dag
self.dag_identifier = dag_identifier
Expand All @@ -148,6 +149,10 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections,
self.moveToThread(self._thread)
self._thread.started.connect(self.do_work)

@property
def job_id(self):
return self._job_id

@property
def engine_data(self):
"""Engine data dictionary."""
Expand Down

0 comments on commit 8b74d3b

Please sign in to comment.