Skip to content

Commit

Permalink
other file staging types
Browse files Browse the repository at this point in the history
  • Loading branch information
colinthomas-z80 committed May 24, 2024
1 parent 2865342 commit 8bcfe4a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
10 changes: 8 additions & 2 deletions parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,10 @@ def _taskvine_submit_wait(ready_task_queue=None,
task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file
logger.debug("Adding input file {}, {} to TaskVine".format(task_in_file, task.executor_id))
t.add_input(task_in_file, spec.netloc)
if spec.netloc != '':
t.add_input(task_in_file, spec.netloc)
else:
t.add_input(task_in_file, spec.parsl_name)

for spec in task.output_files:
if spec.stage:
Expand All @@ -388,7 +391,10 @@ def _taskvine_submit_wait(ready_task_queue=None,
task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file
logger.debug("Adding output file {}, {} to TaskVine".format(task_out_file, task.executor_id))
t.add_output(task_out_file, spec.netloc)
if spec.netloc != '':
t.add_output(task_out_file, spec.netloc)
else:
t.add_output(task_out_file, spec.parsl_name)

# Submit the task to the TaskVine object
logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t))
Expand Down
8 changes: 3 additions & 5 deletions parsl/executors/taskvine/stub_staging_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

logger = logging.getLogger(__name__)

known_url_schemes = ["file", "http", "https", "taskvinetemp"]
known_url_schemes = ["http", "https", "taskvinetemp"]


class StubStaging(Staging, RepresentationMixin):
Expand All @@ -23,15 +23,13 @@ def can_stage_out(self, file):
return file.scheme in known_url_schemes

def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]:
if file.netloc == '':
file.netloc = file.filename
if file.scheme in ["taskvinetemp", "https", "http"]:
file.netloc = file.filename
file.local_path = file.netloc
return None

def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]:
if file.netloc == '':
file.netloc = file.filename
if file.scheme in ["taskvinetemp", "https", "http"]:
file.netloc = file.filename
file.local_path = file.netloc
return None
6 changes: 3 additions & 3 deletions parsl/tests/configs/taskvine_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from parsl.executors.taskvine import TaskVineExecutor
from parsl.executors.taskvine import TaskVineManagerConfig

from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.executors.taskvine.stub_staging_provider import StubStaging
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.zip import ZipFileStaging


def fresh_config():
return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
worker_launch_method='factory',
storage_access=[FTPInTaskStaging(), StubStaging()])])
storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])])

0 comments on commit 8bcfe4a

Please sign in to comment.