diff --git a/changelog.txt b/changelog.txt index 63c7af8cd..d9403b8cb 100644 --- a/changelog.txt +++ b/changelog.txt @@ -7,13 +7,15 @@ The **signac** package follows `semantic versioning `_. Version 2 ========= -[2.2.0] -- 2023-xx-xx +[2.2.0] -- 2024-xx-xx --------------------- Added +++++ - Official support for Python 3.12 (#957). + - ``Job.cached_statepoint`` - cached and read only access to job state points. Faster than + ``Job.statepoint`` (#975). Changed +++++++ @@ -21,6 +23,7 @@ Changed - Restrict allowable tar file features in Python 3.12 (#957). - linked views now can contain spaces and other characters except directory separators (#926). - linked views now can be created on Windows, if 'Developer mode' is enabled (#430). + - Increase performance for many usage patterns (#975). Fixed +++++ diff --git a/doc/api.rst b/doc/api.rst index 4a89a5a55..41734ed39 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -78,6 +78,7 @@ The Job class .. autosummary:: + Job.cached_statepoint Job.clear Job.close Job.data diff --git a/signac/job.py b/signac/job.py index c26e396b8..c3a637ff0 100644 --- a/signac/job.py +++ b/signac/job.py @@ -11,6 +11,7 @@ import shutil from copy import deepcopy from threading import RLock +from types import MappingProxyType from typing import FrozenSet from synced_collections.backends.collection_json import ( @@ -248,7 +249,8 @@ class Job: Jobs can be opened by ``statepoint`` or ``id_``. If both values are provided, it is the user's responsibility to ensure that the values - correspond. + correspond. Set ``directory_known`` to ``True`` when the job directory + is known to exist - this skips some expensive isdir checks. Parameters ---------- @@ -258,6 +260,8 @@ class Job: State point for the job. (Default value = None) id_ : str, optional The job identifier. (Default value = None) + directory_known : bool, optional + Set to true when the job directory is known to exist. (Default value = False) """ @@ -274,30 +278,34 @@ class Job: KEY_DATA = "signac_data" "The job's datastore key." - def __init__(self, project, statepoint=None, id_=None): + def __init__(self, project, statepoint=None, id_=None, directory_known=False): self._project = project self._lock = RLock() self._initialize_lazy_properties() + self._directory_known = directory_known if statepoint is None and id_ is None: raise ValueError("Either statepoint or id_ must be provided.") elif statepoint is not None: - self._statepoint_requires_init = False try: self._id = calc_id(statepoint) if id_ is None else id_ except TypeError: raise KeyTypeError - self._statepoint = _StatePointDict( - jobs=[self], filename=self._statepoint_filename, data=statepoint - ) - # Update the project's state point cache immediately if opened by state point - self._project._register(self.id, statepoint) + self._cached_statepoint = statepoint + self._statepoint_requires_init = True else: # Only an id was provided. State point will be loaded lazily. self._id = id_ self._statepoint_requires_init = True + # Fetch the cached statepoint from the project's cache. Don't load it + # from disk on a cache miss (will be loaded on demand). + try: + self._cached_statepoint = project._sp_cache[id_] + except KeyError: + self._cached_statepoint = None + def _initialize_lazy_properties(self): """Initialize all properties that are designed to be loaded lazily.""" with self._lock: @@ -334,7 +342,7 @@ def __str__(self): def __repr__(self): return "{}(project={}, statepoint={})".format( - self.__class__.__name__, repr(self._project), self.statepoint + self.__class__.__name__, repr(self._project), self.cached_statepoint ) @property @@ -406,6 +414,33 @@ def update_statepoint(self, update, overwrite=False): statepoint.update(update) self.statepoint = statepoint + @property + def cached_statepoint(self): + """Get a copy of the job's state point as a read-only mapping. + + :py:attr:`cached_statepoint` uses the state point cache to provide fast access to + the job's state point for reading. + + .. note:: + + Create and update the state point cache by calling + :py:meth:`project.update_cache ` + or running ``signac update-cache`` on the command line. + + .. seealso:: + + Use :py:attr:`statepoint` to modify the job's state point. + + Returns + ------- + Mapping + Returns the job's state point. + """ + if self._cached_statepoint is None: + self._cached_statepoint = self._project._get_statepoint(self._id) + + return MappingProxyType(self._cached_statepoint) + @property def statepoint(self): """Get or set the job's state point. @@ -416,6 +451,11 @@ def statepoint(self): `Modifying the State Point `_. + .. tip:: + + Use :py:attr:`cached_statepoint` for fast read-only access to the + state point. + .. warning:: The state point object behaves like a dictionary in most cases, @@ -443,14 +483,25 @@ def statepoint(self): """ with self._lock: if self._statepoint_requires_init: - # Load state point data lazily (on access). - self._statepoint = _StatePointDict( - jobs=[self], filename=self._statepoint_filename - ) - statepoint = self._statepoint.load(self.id) + if self._cached_statepoint is None: + # Load state point data lazily (on access). + self._statepoint = _StatePointDict( + jobs=[self], + filename=self._statepoint_filename, + ) + statepoint = self._statepoint.load(self.id) + + # Update the project's state point cache when loaded lazily + self._project._register(self.id, statepoint) + self._cached_statepoint = statepoint + else: + # Create _StatePointDict lazily with a known statepoint dict. + self._statepoint = _StatePointDict( + jobs=[self], + filename=self._statepoint_filename, + data=self._cached_statepoint, + ) - # Update the project's state point cache when loaded lazily - self._project._register(self.id, statepoint) self._statepoint_requires_init = False return self._statepoint @@ -510,7 +561,7 @@ def document(self): """ with self._lock: if self._document is None: - self.init() + self.init(validate_statepoint=False) fn_doc = os.path.join(self.path, self.FN_DOCUMENT) self._document = BufferedJSONAttrDict( filename=fn_doc, write_concern=True @@ -591,9 +642,9 @@ def stores(self): """ with self._lock: if self._stores is None: - self.init() + self.init(validate_statepoint=False) self._stores = H5StoreManager(self.path) - return self.init()._stores + return self._stores @property def data(self): @@ -640,7 +691,7 @@ def project(self): """ return self._project - def init(self, force=False): + def init(self, force=False, validate_statepoint=True): """Initialize the job's workspace directory. This function will do nothing if the directory and the job state point @@ -656,6 +707,10 @@ def init(self, force=False): Overwrite any existing state point files, e.g., to repair them if they got corrupted (Default value = False). + validate_statepoint : bool, optional + When True (the default), load the job state point and ensure that it matches + the id. When False, exit early when the job directory exists. + Returns ------- Job @@ -671,6 +726,15 @@ def init(self, force=False): """ with self._lock: try: + # Fast early exit when not validating. + if not validate_statepoint: + if self._directory_known: + return self + + if os.path.isdir(self.path): + self._directory_known = True + return self + # Attempt early exit if the state point file exists and is valid. try: statepoint = self.statepoint.load(self.id) @@ -687,6 +751,8 @@ def init(self, force=False): ) raise + self._directory_known = True + # The state point save will not overwrite an existing file on # disk unless force is True, so the subsequent load will catch # when a preexisting invalid file was present. @@ -760,6 +826,8 @@ def remove(self): self._document = None self._stores = None + self._directory_known = False + def move(self, project): """Move this job to project. @@ -899,7 +967,7 @@ def open(self): """ self._cwd.append(os.getcwd()) - self.init() + self.init(validate_statepoint=False) logger.info(f"Enter workspace '{self.path}'.") os.chdir(self.path) diff --git a/signac/project.py b/signac/project.py index d0cb27938..6ead1c356 100644 --- a/signac/project.py +++ b/signac/project.py @@ -15,6 +15,7 @@ from collections import defaultdict from collections.abc import Iterable from contextlib import contextmanager +from copy import deepcopy from datetime import timedelta from itertools import groupby from multiprocessing.pool import ThreadPool @@ -203,6 +204,7 @@ def __init__(self, path=None): # can be used to re-open a job by id as long as that id remains in the # cache. self._sp_cache = {} + self._sp_cache_read = False self._sp_cache_misses = 0 self._sp_cache_warned = False self._sp_cache_miss_warning_threshold = self.config.get( @@ -517,13 +519,18 @@ def open_job(self, statepoint=None, id=None): than one match. """ + if not self._sp_cache_read: + # Read the cache from disk on the first call. + self._read_cache() + self._sp_cache_read = True + if statepoint is None and id is None: raise ValueError("Must provide statepoint or id.") elif statepoint is not None and id is not None: raise ValueError("Either statepoint or id must be provided, but not both.") elif statepoint is not None: # Second best case (Job will update self._sp_cache on init) - return Job(project=self, statepoint=statepoint) + return Job(project=self, statepoint=deepcopy(statepoint)) try: # Optimal case (id is in the state point cache) return Job(project=self, statepoint=self._sp_cache[id], id_=id) @@ -545,7 +552,7 @@ def open_job(self, statepoint=None, id=None): elif not self._contains_job_id(id): # id does not exist in the project data space raise KeyError(id) - return Job(project=self, id_=id) + return Job(project=self, id_=id, directory_known=True) def _job_dirs(self): """Generate ids of jobs in the workspace. @@ -712,6 +719,20 @@ def find_jobs(self, filter=None): See :ref:`signac find ` for the command line equivalent. + .. tip:: + + To find a single job given a state point, use `open_job` with O(1) cost. + + .. tip:: + + To find many groups of jobs, use your own code to loop through the project + once and build multiple matching lists. + + .. warning:: + + `find_jobs` costs O(N) each time it is called. It applies the filter to + every job in the workspace. + Parameters ---------- filter : Mapping, optional @@ -835,13 +856,20 @@ def _register(self, id_, statepoint): """ self._sp_cache[id_] = statepoint - def _get_statepoint_from_workspace(self, job_id): + def _get_statepoint_from_workspace(self, job_id, validate=True): """Attempt to read the state point from the workspace. Parameters ---------- job_id : str Identifier of the job. + validate : bool + When True, validate that any state point read from disk matches the job_id. + + Raises + ------ + :class:`signac.errors.JobsCorruptedError` + When one or more jobs are identified as corrupted. """ # Performance-critical path. We can rely on the project workspace, job @@ -850,7 +878,11 @@ def _get_statepoint_from_workspace(self, job_id): fn_statepoint = os.sep.join((self.workspace, job_id, Job.FN_STATE_POINT)) try: with open(fn_statepoint, "rb") as statepoint_file: - return json.loads(statepoint_file.read().decode()) + statepoint = json.loads(statepoint_file.read().decode()) + if validate and calc_id(statepoint) != job_id: + raise JobsCorruptedError([job_id]) + + return statepoint except (OSError, ValueError) as error: if os.path.isdir(os.sep.join((self.workspace, job_id))): logger.error( @@ -861,7 +893,7 @@ def _get_statepoint_from_workspace(self, job_id): raise JobsCorruptedError([job_id]) raise KeyError(job_id) - def _get_statepoint(self, job_id): + def _get_statepoint(self, job_id, validate=True): """Get the state point associated with a job id. The state point is retrieved from the internal cache, from @@ -871,6 +903,9 @@ def _get_statepoint(self, job_id): ---------- job_id : str A job id to get the state point for. + validate : bool + When True, validate that any state point read from disk matches the job_id. + Returns ------- @@ -886,10 +921,10 @@ def _get_statepoint(self, job_id): corrupted. """ - if not self._sp_cache: - # Triggers if no state points have been added to the cache, and all - # the values are None. + if not self._sp_cache_read: + # Read the cache from disk on the first call. self._read_cache() + self._sp_cache_read = True try: # State point cache hit return self._sp_cache[job_id] @@ -902,10 +937,10 @@ def _get_statepoint(self, job_id): ): logger.debug( "High number of state point cache misses. Consider " - "to update cache with the Project.update_cache() method." + "updating the cache by running `signac update-cache`." ) self._sp_cache_warned = True - statepoint = self._get_statepoint_from_workspace(job_id) + statepoint = self._get_statepoint_from_workspace(job_id, validate) # Update the project's state point cache from this cache miss self._sp_cache[job_id] = statepoint return statepoint @@ -1237,11 +1272,7 @@ def check(self): logger.info("Checking workspace for corruption...") for job_id in self._find_job_ids(): try: - statepoint = self._get_statepoint(job_id) - if calc_id(statepoint) != job_id: - corrupted.append(job_id) - else: - self.open_job(statepoint).init() + self._get_statepoint_from_workspace(job_id) except JobsCorruptedError as error: corrupted.extend(error.job_ids) if corrupted: @@ -1277,7 +1308,7 @@ def repair(self, job_ids=None): for job_id in job_ids: try: # First, check if we can look up the state point. - statepoint = self._get_statepoint(job_id) + statepoint = self._get_statepoint(job_id, validate=False) # Check if state point and id correspond. correct_id = calc_id(statepoint) if correct_id != job_id: @@ -1406,7 +1437,6 @@ def update_cache(self): including iteration and filtering or selection are expected to be significantly faster after calling this function, especially for large data spaces. - """ logger.info("Update cache...") start = time.time() @@ -1610,7 +1640,7 @@ def get_job(cls, path=None): project = cls.get_project(os.path.join(job_path, os.pardir)) # Return the matched job id from the found project - return project.open_job(id=job_id) + return Job(project=project, id_=job_id, directory_known=True) def __getstate__(self): state = dict(self.__dict__) @@ -1667,7 +1697,9 @@ def __init__(self, project, ids): self._ids_iterator = iter(ids) def __next__(self): - return self._project.open_job(id=next(self._ids_iterator)) + return Job( + project=self._project, id_=next(self._ids_iterator), directory_known=True + ) def __iter__(self): return type(self)(self._project, self._ids) @@ -1681,6 +1713,12 @@ class JobsCursor: Enables simple iteration and grouping operations. + .. warning:: + + `JobsCursor` caches the jobs that match the filter. Call `Project.find_jobs` + again to update the search after making changes to jobs or the workspace + that would change the result of the search. + Parameters ---------- project : :class:`~signac.Project` @@ -1700,6 +1738,42 @@ def __init__(self, project, filter=None): if self._filter == {}: self._filter = None + # Cache for matching ids. + self._id_cache = None + self._id_set_cache = None + + @property + def _ids(self): + """List of job ids that match the filter. + + Populated on first use, then cached in subsequent calls. + + Returns + ------- + list[str] + Job ids that match the filter. + """ + if self._id_cache is None: + self._id_cache = self._project._find_job_ids(self._filter) + + return self._id_cache + + @property + def _id_set(self): + """Set of job ids that match the filter. + + Populated on first use, then cached in subsequent calls. + + Returns + ------- + set[str] + Job ids that match the filter. + """ + if self._id_set_cache is None: + self._id_set_cache = set(self._ids) + + return self._id_set_cache + def __eq__(self, other): return self._project == other._project and self._filter == other._filter @@ -1708,7 +1782,7 @@ def __len__(self): if self._filter: # We use the standard function for determining job ids if and only if # any of the two filter is provided. - return len(self._project._find_job_ids(self._filter)) + return len(self._ids) else: # Without filters, we can simply return the length of the whole project. return len(self._project) @@ -1727,25 +1801,14 @@ def __contains__(self, job): True if the job matches the filter criteria and is initialized for this project. """ - if job not in self._project: - # Exit early if the job is not in the project. This is O(1). - return False if self._filter: - # We use the standard function for determining job ids if a filter - # is provided. This is O(N) and could be optimized by caching the - # ids of state points that match a state point filter. Caching the - # matches for a document filter is not safe because the document - # can change. - return job.id in self._project._find_job_ids(self._filter) - # Without filters, we can simply check if the job is in the project. - # By the early-exit condition, we know the job must be contained. - return True + return job.id in self._id_set + + return job in self._project def __iter__(self): # Code duplication here for improved performance. - return _JobsCursorIterator( - self._project, self._project._find_job_ids(self._filter) - ) + return _JobsCursorIterator(self._project, self._ids) def groupby(self, key=None, default=None): """Group jobs according to one or more state point or document parameters. @@ -1845,7 +1908,7 @@ def keyfunction(job): else: def keyfunction(job): - return job.sp[stripped_key] + return job.cached_statepoint[stripped_key] else: if _is_doc_key(key): @@ -1856,7 +1919,7 @@ def keyfunction(job): else: def keyfunction(job): - return job.sp.get(stripped_key, default) + return job.cached_statepoint.get(stripped_key, default) elif isinstance(key, Iterable): sp_keys = [] @@ -1875,7 +1938,7 @@ def keyfunction(job): def keyfunction(job): return tuple( - [job.sp[k] for k in sp_keys] + [job.cached_statepoint[k] for k in sp_keys] + [job.document[k] for k in doc_keys] ) @@ -1883,7 +1946,7 @@ def keyfunction(job): def keyfunction(job): return tuple( - [job.sp.get(k, default) for k in sp_keys] + [job.cached_statepoint.get(k, default) for k in sp_keys] + [job.document.get(k, default) for k in doc_keys] ) @@ -2002,7 +2065,7 @@ def _export_sp_and_doc(job): tuple with prefixed state point or document key and values. """ - for key, value in _flatten(job.statepoint).items(): + for key, value in _flatten(job.cached_statepoint).items(): prefixed_key = sp_prefix + key if usecols(prefixed_key): yield prefixed_key, value diff --git a/tests/test_job.py b/tests/test_job.py index 7cf8d6aa6..20cf7bf69 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -308,7 +308,7 @@ def test_interface_job_identity_change(self): def test_interface_nested_kws(self): with pytest.raises(InvalidKeyError): - job = self.open_job({"a.b.c": 0}) + job = self.open_job({"a.b.c": 0}).statepoint job = self.open_job(dict(a=dict(b=dict(c=2)))) assert job.sp.a.b.c == 2 @@ -492,6 +492,43 @@ class A: with pytest.raises(TypeError): job.doc = {key: "test"} + def test_cached_statepoint_read_only(self): + statepoint = {"a": 0, "b": 1, "dict": {"value": "string"}} + job = self.open_job(statepoint=statepoint) + job.init() + + assert "a" in job.cached_statepoint + assert "b" in job.cached_statepoint + assert "c" not in job.cached_statepoint + assert "dict" in job.cached_statepoint + assert job.cached_statepoint["a"] == 0 + assert job.cached_statepoint["b"] == 1 + assert job.cached_statepoint["dict"] == {"value": "string"} + with pytest.raises(KeyError): + job.cached_statepoint["c"] + assert list(job.cached_statepoint.keys()) == ["a", "b", "dict"] + + with pytest.raises(TypeError): + job.cached_statepoint["c"] = 2 + + def test_cached_statepoint_lazy_init(self): + statepoint = {"a": 0} + job = self.project.open_job(statepoint=statepoint) + job.init() + id_ = job.id + + # Clear the cache to force a lazy load of the cached statepoint + self.project._sp_cache.clear() + job = self.project.open_job(id=id_) + job.cached_statepoint + + def test_no_args_error(self): + with pytest.raises(ValueError): + self.project.open_job() + + with pytest.raises(ValueError): + Job(project=self.project) + class TestConfig(TestJobBase): def test_config_str(self): diff --git a/tests/test_numpy_integration.py b/tests/test_numpy_integration.py index 28275fe85..f64e1ddca 100644 --- a/tests/test_numpy_integration.py +++ b/tests/test_numpy_integration.py @@ -22,6 +22,7 @@ def test_store_number_in_sp_and_doc(self): b = numpy.float64(i) if i % 2 else numpy.float32(i) with pytest.warns(NumpyConversionWarning): job = self.project.open_job(dict(a=a)) + job.statepoint with pytest.warns(NumpyConversionWarning): job.doc.b = b numpy.testing.assert_equal(job.doc.b, b) diff --git a/tests/test_project.py b/tests/test_project.py index da546ede0..2b75e2eb5 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -513,7 +513,11 @@ def test_rename_workspace(self): with pytest.raises(JobsCorruptedError): self.project.check() # ... we reinitialize the initial job, ... - job.init() + try: + job.init() + except JobsCorruptedError: + # ... which raises the JobsCorruptedError in update_cache + pass with pytest.raises(JobsCorruptedError): # ... which means the repair attempt must fail. self.project.repair()