Add a parameter to allow for "async for" iteration over data items, awaiting each #217

Open
github-actions bot opened this issue Nov 9, 2024 · 0 comments
github-actions bot commented Nov 9, 2024

call of the underlying asynchronous function instead of running all calls concurrently, as is currently implemented. This would be useful for cases where the processing of each data item needs to be done sequentially, e.g. when the processing involves modifying a shared resource that cannot be accessed concurrently. This would still allow asynchronous processing of the job as a whole, e.g. in context of other jobs.
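As a minimal sketch of the difference, using plain asyncio rather than Omnipy's actual API (the function name `process_all` and the `sequential` flag are invented for illustration):

```python
import asyncio
from typing import Awaitable, Callable


async def process_all(items: list[str],
                      func: Callable[[str], Awaitable[str]],
                      sequential: bool = False) -> list[str]:
    if sequential:
        # Proposed behaviour: await each call before starting the next, so a shared
        # resource is never touched by two calls at the same time. With an async
        # iterator over the dataset this would be an `async for` loop.
        results = []
        for item in items:
            results.append(await func(item))
        return results

    # Current behaviour: schedule all calls at once and run them concurrently.
    return list(await asyncio.gather(*(func(item) for item in items)))
```

Even in the sequential mode the job as a whole remains a coroutine, so it can still be scheduled alongside other jobs on the event loop. The code below is the excerpt from the source file surrounding the TODO.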

            params[0].default != Parameter.empty or \
            params[0].kind not in [Parameter.POSITIONAL_ONLY,
                                   Parameter.POSITIONAL_OR_KEYWORD]:
        raise ValueError('Parameter "iterate_over_data_files" is set to True, '
                         'but the job function has no arguments without default values. '
                         'Such a first argument will be replaced with a corresponding '
                         'Dataset arg to be iterated over')
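# The check above requires the decorated job function's first parameter to be a plain
# positional parameter without a default value, since that parameter is replaced by the
# Dataset to be iterated over. Illustrative signatures (hypothetical): a function like
# `def my_func(number: int, factor: int = 2)` passes, while `def my_func(*, number: int = 0)`
# raises the ValueError above.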


def _create_dataset_cls(data_file_type: InputTypeT) -> type[IsDataset]:
    if is_model_subclass(data_file_type):
        return Dataset[data_file_type]  # type: ignore[return-value, valid-type]
    else:
        return Dataset[Model[data_file_type]]  # type: ignore[return-value, valid-type]
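# _create_dataset_cls wraps a plain type in Model before parametrizing Dataset, e.g.
# (hypothetical illustration) `str` becomes Dataset[Model[str]], while a type that is
# already a Model subclass, say MyTableModel, is used directly as Dataset[MyTableModel].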


# Classes

# TODO: Data files -> data items throughout, e.g. iterate_over_data_items??

# TODO: Add a parameter to allow for "async for" iteration over data items, awaiting each
#       call of the underlying asynchronous function instead of running all calls concurrently,
#       as is currently implemented. This would be useful for cases where the processing of
#       each data item needs to be done sequentially, e.g. when the processing involves
#       modifying a shared resource that cannot be accessed concurrently. This would still
#       allow asynchronous processing of the job as a whole, e.g. in context of other jobs.


class IterateFuncJobBaseMixin:
    def __init__(  # noqa: C901
        self,
        *,
        iterate_over_data_files: bool = False,
        output_dataset_param: str | None = None,
        output_dataset_cls: type[IsDataset] | None = None,
    ):
        self_as_plain_func_arg_job_base = cast(IsPlainFuncArgJobBase, self)

        self._iterate_over_data_files = iterate_over_data_files
        self._input_dataset_type: type | None = None
        self._output_dataset_param = output_dataset_param
        self._output_dataset_cls = output_dataset_cls
        self._output_dataset_param_in_func: inspect.Parameter | None = None

        if not isinstance(self.iterate_over_data_files, bool):
            raise ValueError(
                'Value of "iterate_over_data_files" parameter must be bool (True/False), '
                f'not "{iterate_over_data_files}"')

        if not iterate_over_data_files:
            if output_dataset_param is not None:
                raise ValueError('Output dataset parameter can only be set when '
                                 '"iterate_over_data_files" is True')
            if output_dataset_cls is not None:
                raise ValueError(
                    'Output dataset class can only be set when "iterate_over_data_files" is True')

        if iterate_over_data_files:
            job_func = self_as_plain_func_arg_job_base._job_func
            if job_func.__name__ != '_omnipy_iterate_func':

                _check_job_func_parameters(job_func)
                self._generate_new_signature_for_iteration(job_func)

                def _sync_iterate_over_data_files_decorator(call_func: Callable):
                    def _omnipy_iterate_func(
                        dataset: InputDatasetT,
                        *args: object,
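
To make the requested behaviour concrete, here is a rough, hypothetical sketch of how an async counterpart of the wrapper above could expose both modes. This is not Omnipy's implementation: `async_iterate_decorator` and `await_each_call` are invented names, and the dataset is simplified to a plain dict.

```python
import asyncio
from typing import Any, Awaitable, Callable


def async_iterate_decorator(
    call_func: Callable[..., Awaitable[Any]],
    await_each_call: bool = False,  # the parameter proposed in this issue (name invented)
) -> Callable[..., Awaitable[dict[str, Any]]]:
    async def _iterate_func(items: dict[str, Any],
                            *args: object,
                            **kwargs: object) -> dict[str, Any]:
        if await_each_call:
            # Proposed: iterate over the items one by one, awaiting each call of the
            # underlying async function before starting the next, so that a shared
            # resource is never modified by two calls concurrently.
            return {key: await call_func(value, *args, **kwargs)
                    for key, value in items.items()}

        # Current behaviour: start all calls at once and gather their results concurrently.
        keys = list(items)
        results = await asyncio.gather(
            *(call_func(items[key], *args, **kwargs) for key in keys))
        return dict(zip(keys, results))

    return _iterate_func
```

In the `await_each_call` branch the wrapper itself is still a coroutine, so other jobs can run while it awaits each item, which matches the "asynchronous processing of the job as a whole" described in the TODO.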
github-actions bot added the todo label Nov 9, 2024