Source code for bayesmark.np_util

# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities that could be included in `numpy` but aren't.
"""
import numpy as np

# Largest value accepted as a numpy seed: seeds must lie in
# [0, 2**32 - 1] = [0, uint32 max], so this is the inclusive upper limit.
SEED_MAX_INCL = np.iinfo(np.uint32).max

# Access default numpy rng in way that is short and sphinx friendly: this is
# the object the module-level function `np.random.random` is bound to. It is
# used as the default `random` argument by the functions in this module.
random = np.random.random.__self__


def random_seed(random=random):
    """Sample an integer usable as a seed for :class:`numpy:numpy.random.RandomState`.

    Parameters
    ----------
    random : :class:`numpy:numpy.random.RandomState`
        Random stream the seed is drawn from.

    Returns
    -------
    seed : int
        The result of ``random.randint(0, SEED_MAX_INCL)``. With a numpy stream
        the upper limit is exclusive, so the value lies in ``[0, 2**32-1)``.
    """
    # The upper limit of `randint` is exclusive in numpy but inclusive in the
    # Python standard library. Passing the largest valid seed as the limit is
    # therefore safe for both conventions; the only cost is that a numpy
    # stream can never produce 2**32-1 itself, which is harmless.
    return random.randint(0, SEED_MAX_INCL)
def shuffle_2d(X, random=random):
    """Shuffle a 2D array in place, both across rows and inside each row.

    This extends :func:`numpy:numpy.random.shuffle` to two dimensions. The
    input is modified directly and nothing is returned.

    Parameters
    ----------
    X : :class:`numpy:numpy.ndarray` of shape (n, m)
        Array-like 2D data to shuffle in place. The order of the rows is
        permuted first, then the elements inside every row are permuted.
    random : :class:`numpy:numpy.random.RandomState`
        Random stream used for all of the shuffling.
    """
    # First permute the rows as whole units ...
    random.shuffle(X)
    # ... then permute the contents of each row independently.
    for row in X:
        random.shuffle(row)
def strat_split(X, n_splits, inplace=False, random=random):
    """Randomly partition the items of `X` into groups, stratified by row.

    Parameters
    ----------
    X : :class:`numpy:numpy.ndarray` of shape (n, m)
        Data to split randomly into groups. Every group should receive the
        same number +/-1 of elements from each row.
    n_splits : int
        Number of groups to produce. Must be positive.
    inplace : bool
        If true, `X` itself is shuffled; otherwise a copy is shuffled and `X`
        is left untouched.
    random : :class:`numpy:numpy.random.RandomState`
        Random stream to use for reproducibility.

    Returns
    -------
    Y : list(:class:`numpy:numpy.ndarray`)
        Stratified split of `X` where each entry of `Y` contains the same
        number +/-1 of elements from each row of `X`. It is a list of arrays
        because the groups may differ in length.
    """
    # Arguably, this function could go in stats
    assert np.ndim(X) == 2
    assert n_splits > 0

    data = X if inplace else np.array(X, copy=True)
    shuffle_2d(data, random=random)

    # Column-major flattening (like data.T.ravel()) interleaves the rows, so
    # consecutive chunks draw evenly from every row.
    interleaved = np.ravel(data, order="F")
    groups = np.array_split(interleaved, n_splits)

    # Just for good measure shuffle the groups too, probably not needed.
    shuffle_2d(groups, random=random)
    return groups
def isclose_lte(x, y):
    """Element-wise approximate less-than-or-equal test (lte, ``x <= y``).

    An element passes if ``x <= y`` holds exactly or if `x` and `y` are close
    according to :func:`numpy:numpy.isclose`. Shapes of all input variables
    must be broadcast compatible.

    Parameters
    ----------
    x : :class:`numpy:numpy.ndarray`
        Lower limit in ``<=`` check.
    y : :class:`numpy:numpy.ndarray`
        Upper limit in ``<=`` check.

    Returns
    -------
    lte : :class:`numpy:numpy.ndarray` of bool
        Element-wise result, true where ``x <= y`` is approximately true. No
        reduction is applied; wrap in :func:`numpy:numpy.all` for a single
        answer.
    """
    # Use np.less_equal (not the `<=` operator) so the result is always a
    # numpy type, even when both inputs are plain Python scalars.
    strictly_ok = np.less_equal(x, y)
    nearly_equal = np.isclose(x, y)
    return np.logical_or(strictly_ok, nearly_equal)
def clip_chk(x, lb, ub, allow_nan=False):
    """Clip `x` into ``[lb, ub]`` like :func:`numpy:numpy.clip`, after checking
    that every element is already inside the bounds up to
    :func:`numpy:numpy.isclose` tolerance.

    Shapes of all input variables must be broadcast compatible.

    Parameters
    ----------
    x : :class:`numpy:numpy.ndarray`
        Array containing elements to clip.
    lb : :class:`numpy:numpy.ndarray`
        Lower limit in clip.
    ub : :class:`numpy:numpy.ndarray`
        Upper limit in clip.
    allow_nan : bool
        If true, ``nan`` elements in `x` are exempt from the bounds check
        instead of triggering an assertion failure.

    Returns
    -------
    x : :class:`numpy:numpy.ndarray`
        An array with the elements of `x`, but where values < `lb` are
        replaced with `lb`, and those > `ub` with `ub`.
    """
    assert np.all(lb <= ub)  # np.clip does not do this check

    x = np.asarray(x)

    # These are asserts not exceptions since clip_chk most used internally.
    # The lower bound is verified first, then the upper bound.
    for lower, upper in ((lb, x), (x, ub)):
        within = isclose_lte(lower, upper)
        if allow_nan:
            within = within | np.isnan(x)
        assert np.all(within)

    return np.clip(x, lb, ub)
def snap_to(x, fixed_val=None):
    """Snap input `x` to the `fixed_val` unless `fixed_val` is `None`, where `x` is returned.

    Parameters
    ----------
    x : :class:`numpy:numpy.ndarray`
        Array containing elements to snap.
    fixed_val : :class:`numpy:numpy.ndarray` or None
        Values to be returned if `x` is close, otherwise an error is raised. If `fixed_val` is `None`, `x` is
        returned.

    Returns
    -------
    fixed_val : :class:`numpy:numpy.ndarray`
        Snapped to value of `x`: `fixed_val` broadcast to the shape of `x`, or `x` itself when `fixed_val` is
        `None`.

    Raises
    ------
    ValueError
        If `x` is neither exactly equal nor close (:func:`numpy:numpy.allclose`) to `fixed_val`.
    """
    if fixed_val is None:
        return x

    # Include == for discrete types where allclose doesn't work. The exact comparison comes first so that
    # allclose is only attempted when the values are not already identical.
    if not (np.all(x == fixed_val) or np.allclose(x, fixed_val)):
        raise ValueError("Expected fixed value %s, got %s." % (repr(fixed_val), repr(x)))

    # Return the canonical value in the shape of the input.
    fixed_val = np.broadcast_to(fixed_val, np.shape(x))
    return fixed_val
def linear_rescale(X, lb0, ub0, lb1, ub1, enforce_bounds=True):
    """Map `X` affinely from the interval ``[lb0, ub0]`` onto ``[lb1, ub1]``.

    Shapes of all input variables must be broadcast compatible.

    Parameters
    ----------
    X : :class:`numpy:numpy.ndarray`
        Array containing elements to rescale.
    lb0 : :class:`numpy:numpy.ndarray`
        Current lower bound of `X`.
    ub0 : :class:`numpy:numpy.ndarray`
        Current upper bound of `X`. Must be strictly greater than `lb0`.
    lb1 : :class:`numpy:numpy.ndarray`
        Desired lower bound of `X`.
    ub1 : :class:`numpy:numpy.ndarray`
        Desired upper bound of `X`. Must be at least `lb1`.
    enforce_bounds : bool
        If True, perform input bounds check (and clipping if slight violation) on the input `X` and again on
        the output. This argument is not meant to be vectorized like the other input variables.

    Returns
    -------
    X : :class:`numpy:numpy.ndarray`
        Elements of input `X` after linear rescaling.
    """
    # Every bound has to be finite for the affine map to be well defined.
    for bound in (lb0, lb1, ub0, ub1):
        assert np.all(np.isfinite(bound))
    # The source interval must have positive width (it is the divisor below);
    # the target interval may be degenerate.
    assert np.all(lb0 < ub0)
    assert np.all(lb1 <= ub1)

    slope = np.true_divide(ub1 - lb1, ub0 - lb0)
    assert np.all(slope >= 0)

    if not enforce_bounds:
        return slope * (X - lb0) + lb1

    # Checking the input against its bounds also flags any non-finite X.
    X = clip_chk(X, lb0, ub0)
    # Clip again so round-off cannot push the output outside the new bounds.
    return clip_chk(slope * (X - lb0) + lb1, lb1, ub1)
def argmin_2d(X):
    """Take the arg minimum of a 2D array.

    Parameters
    ----------
    X : :class:`numpy:numpy.ndarray` of shape (n, m)
        Non-empty 2D array to search.

    Returns
    -------
    ii : int
        Row index of the minimum element.
    jj : int
        Column index of the minimum element.
    """
    assert X.size > 0, "argmin of empty array not defined"

    # argmin yields a position in the flattened array; convert it back to a
    # (row, column) pair. Unpacking into two names requires X to be 2D.
    flat_pos = X.argmin()
    row, col = np.unravel_index(flat_pos, X.shape)
    return row, col
def cummin(x_val, x_key):
    """Get the cumulative minimum of `x_val` when ranked according to `x_key`.

    For every position along axis 0 this returns the element of `x_val` at the
    row where `x_key` has attained its running minimum so far (the most recent
    such row when there are ties).

    Parameters
    ----------
    x_val : :class:`numpy:numpy.ndarray` of shape (n, d)
        The array to get the cumulative minimum of along axis 0.
    x_key : :class:`numpy:numpy.ndarray` of shape (n, d)
        The array for ranking elements as to what is the minimum. Must not
        contain ``nan``.

    Returns
    -------
    c_min : :class:`numpy:numpy.ndarray` of shape (n, d)
        The cumulative minimum array.
    """
    assert x_val.shape == x_key.shape
    assert x_val.ndim == 2
    assert not np.any(np.isnan(x_key)), "cummin not defined for nan key"

    n_rows = x_val.shape[0]

    # Running minimum of the key down each column.
    running_min = np.minimum.accumulate(x_key, axis=0)

    # Keep the row index wherever the key matches the running minimum (a new
    # or tied best) and 0 elsewhere; a running maximum then carries the index
    # of the latest best row forward.
    row_ids = np.arange(n_rows)[:, None]
    best_here = np.where(x_key <= running_min, row_ids, 0)
    best_so_far = np.maximum.accumulate(best_here, axis=0)

    return np.take_along_axis(x_val, best_so_far, axis=0)