From 38bbef025d819f8b6eefa92163d44f7a7ae2f8be Mon Sep 17 00:00:00 2001 From: Hans Dembinski Date: Fri, 16 Sep 2022 10:29:02 +0200 Subject: [PATCH] restructuring files (#23) --- src/jacobi/__init__.py | 3 +- src/jacobi/{core.py => _jacobi.py} | 349 ++++++----------------------- src/jacobi/_propagate.py | 215 ++++++++++++++++++ src/jacobi/_typing.py | 11 + 4 files changed, 292 insertions(+), 286 deletions(-) rename src/jacobi/{core.py => _jacobi.py} (53%) create mode 100644 src/jacobi/_propagate.py create mode 100644 src/jacobi/_typing.py diff --git a/src/jacobi/__init__.py b/src/jacobi/__init__.py index d6cd8e8..5a31d72 100644 --- a/src/jacobi/__init__.py +++ b/src/jacobi/__init__.py @@ -1,6 +1,7 @@ """Fast numerical derivatives for analytic functions with arbitrary round-off error.""" -from .core import jacobi, propagate +from ._propagate import propagate +from ._jacobi import jacobi from ._version import version as __version__ # noqa __all__ = ["jacobi", "propagate"] diff --git a/src/jacobi/core.py b/src/jacobi/_jacobi.py similarity index 53% rename from src/jacobi/core.py rename to src/jacobi/_jacobi.py index 36f5498..dcb28e0 100644 --- a/src/jacobi/core.py +++ b/src/jacobi/_jacobi.py @@ -1,77 +1,6 @@ -"""Core functions of jacobi.""" - import numpy as np import typing as _tp - -_T = _tp.TypeVar("_T") - - -class _Indexable(_tp.Iterable, _tp.Sized, _tp.Generic[_T]): - """Indexable type for mypy.""" - - def __getitem__(self, idx: int) -> _T: - """Get item at index idx.""" - ... # pragma: no cover - - -def _steps(p, step, maxiter): - h0, factor = step - h = p * h0 - if h == 0: - h = h0 - return h * factor ** np.arange(maxiter) - - -def _derive(mode, f0, f, x, i, h, args): - x1 = x.copy() - x2 = x.copy() - if mode == 0: - x1[i] += h - x2[i] -= h - return (f(x1, *args) - f(x2, *args)) * (0.5 / h) - h = h * mode - x1[i] += h - x2[i] += 2 * h - f1 = f(x1, *args) - f2 = f(x2, *args) - return (-3 * f0 + 4 * f1 - f2) * (0.5 / h) - - -def _first(method, f0, f, x, i, h, args): - norm = 0.5 / h - f1 = None - f2 = None - if method is None or method == 0: - x1 = x.copy() - x2 = x.copy() - x1[i] -= h - x2[i] += h - f1 = f(x1, *args) - f2 = f(x2, *args) - if method is None: - if np.any(np.isnan(f1)): # forward method - method = 1 - elif np.any(np.isnan(f2)): # backward method - method = -1 - else: - method = 0 - if method == 0: - return method, None, (f1 - f2) * norm - if f0 is None: - f0 = f(x, *args) - if method == -1: - h = -h - norm = -norm - if f1 is None: - x1 = x.copy() - x1[i] += h - f1 = f(x1, *args) - elif method == 1: - f1 = f2 - x2 = x.copy() - x2[i] += 2 * h - f2 = f(x2, *args) - return method, f0, (-3 * f0 + 4 * f1 - f2) * norm +from ._typing import Indexable as _Indexable def jacobi( @@ -166,7 +95,7 @@ def jacobi( x_indices = x_indices[mask] nx = len(x_indices) - if isinstance(diagnostic, dict): + if diagnostic is not None: diagnostic["method"] = np.zeros(nx, dtype=np.int8) diagnostic["iteration"] = np.zeros(len(x_indices), dtype=np.uint8) @@ -182,7 +111,7 @@ def jacobi( # if method is None, auto-detect for each x[k] md, f0, r = _first(method, f0, fn, x, k, h[0], args) - if diagnostic: + if diagnostic is not None: diagnostic["method"][ik] = md if md != 0 and step is None: @@ -199,21 +128,22 @@ def jacobi( if jac is None: jac = np.empty(r_shape + (nx,), dtype=r.dtype) err = np.empty(r_shape + (nx,), dtype=r.dtype) - if diagnostic: + if diagnostic is not None: diagnostic["call"] = np.zeros((nr, nx), dtype=np.uint8) - if diagnostic: + if diagnostic is not None: diagnostic["call"][:, ik] = 2 if md == 0 else 3 for i in range(1, len(h)): fdi = _derive(md, f0, fn, x, k, h[i], args) fdi = np.reshape(fdi, -1) fd.append(fdi if i == 1 else fdi[todo]) - if diagnostic: + if diagnostic is not None: diagnostic["call"][todo, ik] += 2 diagnostic["iteration"][ik] += 1 - # polynomial fit with one extra degree of freedom + # polynomial fit with one extra degree of freedom; + # use latest maxgrad + 1 data points grad = min(i - 1, maxgrad) start = i - (grad + 1) stop = i + 1 @@ -247,11 +177,11 @@ def jacobi( jac[..., ik] = r.reshape(r_shape) err[..., ik] = re.reshape(r_shape) - if diagnostic: + if diagnostic is not None: diagnostic["call"].shape = r_shape + (nx,) if squeeze: - if diagnostic: + if diagnostic is not None: diagnostic["call"] = np.squeeze(diagnostic["call"]) jac = np.squeeze(jac) err = np.squeeze(err) @@ -259,212 +189,61 @@ def jacobi( return jac, err -def propagate( - fn: _tp.Callable, - x: _tp.Union[float, _Indexable[float]], - cov: _tp.Union[float, _Indexable[float], _Indexable[_Indexable[float]]], - *args, - **kwargs, -) -> _tp.Tuple[np.ndarray, np.ndarray]: - """ - Numerically propagates the covariance of function inputs to function outputs. - - The function computes C' = J C J^T, where C is the covariance matrix of the input, - C' the matrix of the output, and J is the Jacobi matrix of first derivatives of the - mapping function fn. The Jacobi matrix is computed numerically. - - Parameters - ---------- - fn: callable - Function that computes r = fn(x, [y, ...]). The arguments of the function are - each allowed to be scalars or one-dimensional arrays. If the function accepts - several arguments, their uncertainties are treated as uncorrelated. - Functions that accept several correlated arguments must be wrapped, see examples. - The result of the function may be a scalar or a one-dimensional array with a - different lenth as the input. - x: float or array-like with shape (N,) - Input vector. An array-like is converted before passing it to the callable. - cov: float or array-like with shape (N,) or shape(N, N) - Covariance matrix of input vector. If the array is one-dimensional, it is - interpreted as the diagonal of a covariance matrix with zero off-diagonal - elements. - *args: - If the function accepts several arguments that are mutually independent, these - is possible to pass those values and covariance matrices pairwise, see examples. - **kwargs: - Extra arguments are passed to :func:`jacobi`. - - Returns - ------- - y, ycov - y is the result of fn(x). - ycov is the propagated covariance matrix. - If ycov is a matrix, unless y is a number. In that case, ycov is also - reduced to a number. - - Examples - -------- - General error propagation maps input vectors to output vectors:: - - def fn(x): - return x ** 2 + 1 - - x = [1, 2] - xcov = [[3, 1], - [1, 4]] - - y, ycov = propagate(fn, x, xcov) - - In the previous example, the function y = fn(x) treats all x values independently, - so the Jacobian computed from fn(x) has zero off-diagonal entries. In this case, - one can speed up the calculation significantly with a special keyword:: - - # same result as before, but faster and uses much less memory - y, ycov = propagate(fn, x, xcov, diagonal=True) - - If the function accepts several arguments, their uncertainties are treated as - uncorrelated:: - - def fn(x, y): - return x + y - - x = 1 - y = 2 - xcov = 2 - ycov = 3 - - z, zcov = propagate(fn, x, xcov, y, ycov) - - Functions that accept several correlated arguments must be wrapped:: - - def fn(x, y): - return x + y - - x = 1 - y = 2 - sigma_x = 3 - sigma_y = 4 - rho_xy = 0.5 - - r = [x, y] - cov_xy = rho_xy * sigma_x * sigma_y - rcov = [[sigma_x ** 2, cov_xy], [cov_xy, sigma_y ** 2]] - - def fn_wrapped(r): - return fn(r[0], r[1]) - - z, zcov = propagate(fn_wrapped, r, rcov) - - See Also - -------- - jacobi - """ - if args: - if len(args) % 2 != 0: - raise ValueError("number of extra positional arguments must be even") - - args_a = [np.asarray(_) for _ in ((x, cov) + args)] - x_parts = args_a[::2] - cov_parts = args_a[1::2] - y_a = np.asarray(fn(*x_parts)) - return _propagate_independent(fn, y_a, x_parts, cov_parts, **kwargs) - - x_a = np.asarray(x) - cov_a = np.asarray(cov) - y_a = np.asarray(fn(x_a)) - - if x_a.ndim > 1: - raise ValueError("x must have dimension 0 or 1") - - if kwargs.get("diagonal", False): - return _propagate_diagonal(fn, y_a, x_a, cov_a, **kwargs) - - if cov_a.ndim > 2: - raise ValueError("cov must have dimension 0, 1, or 2") - - return _propagate_full(fn, y_a, x_a, cov_a, **kwargs) - - -def _propagate_full(fn, y: np.ndarray, x: np.ndarray, xcov: np.ndarray, **kwargs): - x_a = np.atleast_1d(x) - - _check_x_xcov_compatibility(x_a, xcov) - - jac = np.asarray(jacobi(fn, x_a, **kwargs)[0]) - - y_len = len(y) if y.ndim == 1 else 1 - - if jac.ndim == 1: - jac = jac.reshape((y_len, len(x_a))) - assert np.ndim(jac) == 2 - - ycov = _jac_cov_product(jac, xcov) - - if y.ndim == 0: - ycov = np.squeeze(ycov) - - return y, ycov - - -def _propagate_diagonal(fn, y: np.ndarray, x: np.ndarray, xcov: np.ndarray, **kwargs): - x_a = np.atleast_1d(x) - - _check_x_xcov_compatibility(x_a, xcov) - - jac = np.asarray(jacobi(fn, x_a, **kwargs)[0]) - assert jac.ndim <= 1 - - ycov = _jac_cov_product(jac, xcov) - - if y.ndim == 0: - assert ycov.ndim == 0 - - return y, ycov - - -def _propagate_independent( - fn, - y: np.ndarray, - x_parts: _tp.List[np.ndarray], - xcov_parts: _tp.List[np.ndarray], - **kwargs, -): - ycov = 0 - - for i, x in enumerate(x_parts): - rest = x_parts[:i] + x_parts[i + 1 :] - - def wrapped(x, *rest): - args = rest[:i] + (x,) + rest[i:] - return fn(*args) - - x_a = np.atleast_1d(x) - xcov = xcov_parts[i] - _check_x_xcov_compatibility(x_a, xcov) - - jac = np.asarray(jacobi(wrapped, x_a, *rest, **kwargs)[0]) - ycov += _jac_cov_product(jac, xcov) - - if y.ndim == 0: - ycov = np.squeeze(ycov) - - return y, ycov +def _steps(p, step, maxiter): + h0, factor = step + h = p * h0 + if h == 0: + h = h0 + return h * factor ** np.arange(maxiter) -def _jac_cov_product(jac: np.ndarray, xcov: np.ndarray): - if xcov.ndim == 2: - return np.einsum( - "i,j,ij -> ij" if jac.ndim == 1 else "ij,kl,jl", jac, jac, xcov - ) - elif jac.ndim == 2: - if xcov.ndim == 1: - return np.einsum("ij,kj,j", jac, jac, xcov) - return np.einsum("ij,kj", jac, jac) * xcov - assert jac.ndim < 2 and xcov.ndim < 2 - return xcov * jac**2 +def _derive(mode, f0, f, x, i, h, args): + x1 = x.copy() + x2 = x.copy() + if mode == 0: + x1[i] += h + x2[i] -= h + return (f(x1, *args) - f(x2, *args)) * (0.5 / h) + h = h * mode + x1[i] += h + x2[i] += 2 * h + f1 = f(x1, *args) + f2 = f(x2, *args) + return (-3 * f0 + 4 * f1 - f2) * (0.5 / h) -def _check_x_xcov_compatibility(x: np.ndarray, xcov: np.ndarray): - if xcov.ndim > 0 and len(xcov) != (len(x) if x.ndim == 1 else 1): - # this works for 1D and 2D xcov - raise ValueError("x and cov have incompatible shapes") +def _first(method, f0, f, x, i, h, args): + norm = 0.5 / h + f1 = None + f2 = None + if method is None or method == 0: + x1 = x.copy() + x2 = x.copy() + x1[i] -= h + x2[i] += h + f1 = f(x1, *args) + f2 = f(x2, *args) + if method is None: + if np.any(np.isnan(f1)): # forward method + method = 1 + elif np.any(np.isnan(f2)): # backward method + method = -1 + else: + method = 0 + if method == 0: + return method, None, (f1 - f2) * norm + if f0 is None: + f0 = f(x, *args) + if method == -1: + h = -h + norm = -norm + if f1 is None: + x1 = x.copy() + x1[i] += h + f1 = f(x1, *args) + elif method == 1: + f1 = f2 + x2 = x.copy() + x2[i] += 2 * h + f2 = f(x2, *args) + return method, f0, (-3 * f0 + 4 * f1 - f2) * norm diff --git a/src/jacobi/_propagate.py b/src/jacobi/_propagate.py new file mode 100644 index 0000000..070c388 --- /dev/null +++ b/src/jacobi/_propagate.py @@ -0,0 +1,215 @@ +import typing as _tp +from ._typing import Indexable as _Indexable +from ._jacobi import jacobi +import numpy as np + + +def propagate( + fn: _tp.Callable, + x: _tp.Union[float, _Indexable[float]], + cov: _tp.Union[float, _Indexable[float], _Indexable[_Indexable[float]]], + *args, + **kwargs, +) -> _tp.Tuple[np.ndarray, np.ndarray]: + """ + Numerically propagates the covariance of function inputs to function outputs. + + The function computes C' = J C J^T, where C is the covariance matrix of the input, + C' the matrix of the output, and J is the Jacobi matrix of first derivatives of the + mapping function fn. The Jacobi matrix is computed numerically. + + Parameters + ---------- + fn: callable + Function that computes r = fn(x, [y, ...]). The arguments of the function are + each allowed to be scalars or one-dimensional arrays. If the function accepts + several arguments, their uncertainties are treated as uncorrelated. + Functions that accept several correlated arguments must be wrapped, see examples. + The result of the function may be a scalar or a one-dimensional array with a + different lenth as the input. + x: float or array-like with shape (N,) + Input vector. An array-like is converted before passing it to the callable. + cov: float or array-like with shape (N,) or shape(N, N) + Covariance matrix of input vector. If the array is one-dimensional, it is + interpreted as the diagonal of a covariance matrix with zero off-diagonal + elements. + *args: + If the function accepts several arguments that are mutually independent, these + is possible to pass those values and covariance matrices pairwise, see examples. + **kwargs: + Extra arguments are passed to :func:`jacobi`. + + Returns + ------- + y, ycov + y is the result of fn(x). + ycov is the propagated covariance matrix. + If ycov is a matrix, unless y is a number. In that case, ycov is also + reduced to a number. + + Examples + -------- + General error propagation maps input vectors to output vectors:: + + def fn(x): + return x ** 2 + 1 + + x = [1, 2] + xcov = [[3, 1], + [1, 4]] + + y, ycov = propagate(fn, x, xcov) + + In the previous example, the function y = fn(x) treats all x values independently, + so the Jacobian computed from fn(x) has zero off-diagonal entries. In this case, + one can speed up the calculation significantly with a special keyword:: + + # same result as before, but faster and uses much less memory + y, ycov = propagate(fn, x, xcov, diagonal=True) + + If the function accepts several arguments, their uncertainties are treated as + uncorrelated:: + + def fn(x, y): + return x + y + + x = 1 + y = 2 + xcov = 2 + ycov = 3 + + z, zcov = propagate(fn, x, xcov, y, ycov) + + Functions that accept several correlated arguments must be wrapped:: + + def fn(x, y): + return x + y + + x = 1 + y = 2 + sigma_x = 3 + sigma_y = 4 + rho_xy = 0.5 + + r = [x, y] + cov_xy = rho_xy * sigma_x * sigma_y + rcov = [[sigma_x ** 2, cov_xy], [cov_xy, sigma_y ** 2]] + + def fn_wrapped(r): + return fn(r[0], r[1]) + + z, zcov = propagate(fn_wrapped, r, rcov) + + See Also + -------- + jacobi + """ + if args: + if len(args) % 2 != 0: + raise ValueError("number of extra positional arguments must be even") + + args_a = [np.asarray(_) for _ in ((x, cov) + args)] + x_parts = args_a[::2] + cov_parts = args_a[1::2] + y_a = np.asarray(fn(*x_parts)) + return _propagate_independent(fn, y_a, x_parts, cov_parts, **kwargs) + + x_a = np.asarray(x) + cov_a = np.asarray(cov) + y_a = np.asarray(fn(x_a)) + + if x_a.ndim > 1: + raise ValueError("x must have dimension 0 or 1") + + if kwargs.get("diagonal", False): + return _propagate_diagonal(fn, y_a, x_a, cov_a, **kwargs) + + if cov_a.ndim > 2: + raise ValueError("cov must have dimension 0, 1, or 2") + + return _propagate_full(fn, y_a, x_a, cov_a, **kwargs) + + +def _propagate_full(fn, y: np.ndarray, x: np.ndarray, xcov: np.ndarray, **kwargs): + x_a = np.atleast_1d(x) + + _check_x_xcov_compatibility(x_a, xcov) + + jac = np.asarray(jacobi(fn, x_a, **kwargs)[0]) + + y_len = len(y) if y.ndim == 1 else 1 + + if jac.ndim == 1: + jac = jac.reshape((y_len, len(x_a))) + assert np.ndim(jac) == 2 + + ycov = _jac_cov_product(jac, xcov) + + if y.ndim == 0: + ycov = np.squeeze(ycov) + + return y, ycov + + +def _propagate_diagonal(fn, y: np.ndarray, x: np.ndarray, xcov: np.ndarray, **kwargs): + x_a = np.atleast_1d(x) + + _check_x_xcov_compatibility(x_a, xcov) + + jac = np.asarray(jacobi(fn, x_a, **kwargs)[0]) + assert jac.ndim <= 1 + + ycov = _jac_cov_product(jac, xcov) + + if y.ndim == 0: + assert ycov.ndim == 0 + + return y, ycov + + +def _propagate_independent( + fn, + y: np.ndarray, + x_parts: _tp.List[np.ndarray], + xcov_parts: _tp.List[np.ndarray], + **kwargs, +): + ycov = 0 + + for i, x in enumerate(x_parts): + rest = x_parts[:i] + x_parts[i + 1 :] + + def wrapped(x, *rest): + args = rest[:i] + (x,) + rest[i:] + return fn(*args) + + x_a = np.atleast_1d(x) + xcov = xcov_parts[i] + _check_x_xcov_compatibility(x_a, xcov) + + jac = np.asarray(jacobi(wrapped, x_a, *rest, **kwargs)[0]) + ycov += _jac_cov_product(jac, xcov) + + if y.ndim == 0: + ycov = np.squeeze(ycov) + + return y, ycov + + +def _jac_cov_product(jac: np.ndarray, xcov: np.ndarray): + if xcov.ndim == 2: + return np.einsum( + "i,j,ij -> ij" if jac.ndim == 1 else "ij,kl,jl", jac, jac, xcov + ) + elif jac.ndim == 2: + if xcov.ndim == 1: + return np.einsum("ij,kj,j", jac, jac, xcov) + return np.einsum("ij,kj", jac, jac) * xcov + assert jac.ndim < 2 and xcov.ndim < 2 + return xcov * jac**2 + + +def _check_x_xcov_compatibility(x: np.ndarray, xcov: np.ndarray): + if xcov.ndim > 0 and len(xcov) != (len(x) if x.ndim == 1 else 1): + # this works for 1D and 2D xcov + raise ValueError("x and cov have incompatible shapes") diff --git a/src/jacobi/_typing.py b/src/jacobi/_typing.py new file mode 100644 index 0000000..2d3c54b --- /dev/null +++ b/src/jacobi/_typing.py @@ -0,0 +1,11 @@ +from typing import Iterable, Sized, Generic, TypeVar + +T = TypeVar("T") + + +class Indexable(Iterable, Sized, Generic[T]): + """Indexable type for mypy.""" + + def __getitem__(self, idx: int) -> T: + """Get item at index idx.""" + ... # pragma: no cover