Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add taskvine temp/url handling to parsl/taskvine executor #3355

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
17 changes: 12 additions & 5 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ def __init__(self,
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
working_dir: str = '.',
storage_access: Optional[List[Staging]] = None):

self.working_dir = working_dir

# Set worker launch option for this executor
if worker_launch_method == 'factory' or worker_launch_method == 'manual':
provider = None
Expand Down Expand Up @@ -215,9 +218,9 @@ def __create_data_and_logging_dirs(self):

# Create directories for data and results
log_dir = os.path.join(run_dir, self.label)
self._function_data_dir = os.path.join(run_dir, self.label, "function_data")
self._function_data_dir = os.path.join("/tmp/function_data/", self.label, str(uuid.uuid4()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a change to say "taskvine is going to manage movements of all these files off this local temporary directory"? if so, is that something that works independently of this PR #3355? (eg. if you did it as a different PR, is that what would happen without any other changes?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not in the scope of this PR. I can go ahead and separate the two. This will work independently, and potentially be a benefit to others.

While I was developing this temp file effort I discovered my testing application was writing thousands of function data files to the shared file system, so it got thrown in here for the time being.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then I think this definitely makes sense to turn into its own PR.

I think you need to consider who owns /tmp/function_data on a shared submit host (it's probably not you, but the first user who ever ran taskvine on this machine) - so maybe this direction should be something like /tmp/parsl-taskvine-{username} so both the guilty user and the guilty software are clearly identified.

os.makedirs(log_dir)
os.makedirs(self._function_data_dir)
os.makedirs(self._function_data_dir, exist_ok=True)

# put TaskVine logs outside of a Parsl run as TaskVine caches between runs while
# Parsl does not.
Expand Down Expand Up @@ -363,6 +366,7 @@ def submit(self, func, resource_specification, *args, **kwargs):

# Also consider any *arg that looks like a file as an input:
input_files += [self._register_file(f) for f in args if isinstance(f, File)]
logger.debug(f"registered input files {input_files}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for debug logs in Parsl, @khk-globus and I have been pushing on a style that defers string interpolation until the log system has decided if it really wants to render that message - often that's not the case and rendering can be expensive. that format looks like this:

logger.debug("Creating process with command '%s'", cmd)

rather than using Python-level string formatting (either f-strings or "".format(


for kwarg, maybe_file in kwargs.items():
# Add appropriate input and output files from "stdout" and "stderr" keyword arguments
Expand Down Expand Up @@ -504,15 +508,18 @@ def _register_file(self, parsl_file):
if parsl_file.scheme == 'file' or \
(parsl_file.local_path and os.path.exists(parsl_file.local_path)):
to_stage = not os.path.isabs(parsl_file.filepath)

return ParslFileToVine(parsl_file.filepath, to_stage, to_cache)
return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, stage=to_stage, cache=to_cache, protocol=parsl_file.scheme)
else:
# we must stage url and temp files
ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, stage=True, cache=to_cache, protocol=parsl_file.scheme)
return ptv

def _std_output_to_vine(self, fdname, stdfspec):
"""Find the name of the file that will contain stdout or stderr and
return a ParslFileToVine with it. These files are never cached"""
fname, mode = putils.get_std_fname_mode(fdname, stdfspec)
to_stage = not os.path.isabs(fname)
return ParslFileToVine(fname, stage=to_stage, cache=False)
return ParslFileToVine(fname, fname, stage=to_stage, cache=False, protocol="file")

def _prepare_package(self, fn, extra_pkgs):
""" Look at source code of apps to figure out their package depedencies
Expand Down
25 changes: 21 additions & 4 deletions parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file,
t.add_environment(poncho_env_file)


def _handle_file_declaration_protocol(m, spec):
if "http" in spec.protocol:
return m.declare_url(spec.parsl_name, cache=spec.cache, peer_transfer=True)
elif spec.protocol == "taskvinetemp":
return m.declare_temp()
else:
return m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)


@wrap_with_logs
def _taskvine_submit_wait(ready_task_queue=None,
finished_task_queue=None,
Expand Down Expand Up @@ -364,18 +373,26 @@ def _taskvine_submit_wait(ready_task_queue=None,
if spec.parsl_name in parsl_file_name_to_vine_file:
task_in_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_in_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)
task_in_file = _handle_file_declaration_protocol(m, spec)
parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file
t.add_input(task_in_file, spec.parsl_name)
logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.remote_name == '':
t.add_input(task_in_file, spec.parsl_name)
else:
t.add_input(task_in_file, spec.remote_name)

for spec in task.output_files:
if spec.stage:
if spec.parsl_name in parsl_file_name_to_vine_file:
task_out_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)
task_out_file = _handle_file_declaration_protocol(m, spec)
parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file
t.add_output(task_out_file, spec.parsl_name)
logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.remote_name == '':
t.add_output(task_out_file, spec.parsl_name)
else:
t.add_output(task_out_file, spec.remote_name)

# Submit the task to the TaskVine object
logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t))
Expand Down
35 changes: 35 additions & 0 deletions parsl/executors/taskvine/taskvine_staging_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
from concurrent.futures import Future
from typing import Optional

from parsl.app.futures import DataFuture
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)

known_url_schemes = ["http", "https", "taskvinetemp"]


class TaskVineStaging(Staging, RepresentationMixin):

def can_stage_in(self, file):
logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file)))
return file.scheme in known_url_schemes

def can_stage_out(self, file):
logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file)))
return file.scheme in known_url_schemes

def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]:
if file.scheme in ["taskvinetemp", "https", "http"]:
file.local_path = file.url.split('/')[-1]
logger.debug("Task vine staging provider stage in for {}".format(repr(file)))
return None

def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]:
if file.scheme in ["taskvinetemp", "https", "http"]:
file.local_path = file.url.split('/')[-1]
logger.debug("Task vine staging provider stage out for {}".format(repr(file)))
return None
6 changes: 5 additions & 1 deletion parsl/executors/taskvine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ class ParslFileToVine:
"""
def __init__(self,
parsl_name: str, # name of file
remote_name: str, # name of file if url
stage: bool, # whether TaskVine should know about this file
cache: bool # whether TaskVine should cache this file
cache: bool, # whether TaskVine should cache this file
protocol: str, # protocol if url
):
self.parsl_name = parsl_name
self.stage = stage
self.cache = cache
self.remote_name = remote_name
self.protocol = protocol


def run_parsl_function(map_file, function_file, argument_file, result_file):
Expand Down
5 changes: 4 additions & 1 deletion parsl/tests/configs/taskvine_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.zip import ZipFileStaging
from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig
from parsl.executors.taskvine.taskvine_staging_provider import TaskVineStaging


def fresh_config():
return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
worker_launch_method='factory')])
worker_launch_method='factory',
storage_access=[FTPInTaskStaging(), TaskVineStaging(), NoOpFileStaging(), ZipFileStaging()])])
Loading