Source code for bayesmark.experiment_analysis

# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Perform analysis to compare different optimizers across problems.
"""
import json
import logging
import warnings
from collections import OrderedDict

import numpy as np
import xarray as xr

import bayesmark.constants as cc
import bayesmark.quantiles as qt
import bayesmark.xr_util as xru
from bayesmark.cmd_parse import CmdArgs, general_parser, parse_args, serializable_dict
from bayesmark.constants import (
    ITER,
    LB_MEAN,
    LB_MED,
    LB_NORMED_MEAN,
    METHOD,
    NORMED_MEAN,
    NORMED_MED,
    OBJECTIVE,
    PERF_BEST,
    PERF_CLIP,
    PERF_MEAN,
    PERF_MED,
    SUGGEST,
    TEST_CASE,
    TRIAL,
    UB_MEAN,
    UB_MED,
    UB_NORMED_MEAN,
)
from bayesmark.experiment_aggregate import validate_agg_perf
from bayesmark.experiment_baseline import do_baseline
from bayesmark.np_util import cummin, linear_rescale
from bayesmark.serialize import XRSerializer
from bayesmark.signatures import analyze_signature_pair
from bayesmark.stats import t_EB

# Mathematical settings
EVAL_Q = 0.5  # Evaluate based on median loss across n_trials
ALPHA = 0.05  # ==> 95% CIs

logger = logging.getLogger(__name__)


def get_perf_array(evals, evals_visible):
    """Get the actual (e.g., generalization) loss over iterations.

    Parameters
    ----------
    evals : :class:`numpy:numpy.ndarray` of shape (n_iter, n_batch, n_trials)
        The actual loss (e.g., generalization) for a given experiment.
    evals_visible : :class:`numpy:numpy.ndarray` of shape (n_iter, n_batch, n_trials)
        The observable loss (e.g., validation) for a given experiment.

    Returns
    -------
    perf_array : :class:`numpy:numpy.ndarray` of shape (n_iter, n_trials)
        The best performance so far at iteration i from `evals`, where the best has been selected according to
        `evals_visible`.
    """
    n_iter, _, n_trials = evals.shape

    assert evals.size > 0, "perf array not supported for empty arrays"
    assert evals_visible.shape == evals.shape
    assert not np.any(np.isnan(evals))
    assert not np.any(np.isnan(evals_visible))

    # At each iteration, pick the suggestion that looks best according to the visible loss
    idx = np.argmin(evals_visible, axis=1)
    perf_array = np.take_along_axis(evals, idx[:, None, :], axis=1).squeeze(axis=1)
    assert perf_array.shape == (n_iter, n_trials)

    visible_perf_array = np.min(evals_visible, axis=1)
    assert visible_perf_array.shape == (n_iter, n_trials)

    # Take the running minimum, where the best-so-far is selected according to the visible loss
    perf_array = cummin(perf_array, visible_perf_array)
    return perf_array
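
# --- Illustrative sketch (not part of the original bayesmark module) ---
# `_demo_get_perf_array` below shows how `get_perf_array` pairs the visible (validation)
# loss with the actual (generalization) loss: at each iteration the suggestion with the
# lowest visible loss is selected, and the best-so-far is tracked according to the visible
# loss. The shapes and random data are hypothetical; this helper is never called by the
# module itself.
def _demo_get_perf_array(n_iter=8, n_batch=3, n_trials=4, seed=0):
    rng = np.random.RandomState(seed)
    evals = rng.rand(n_iter, n_batch, n_trials)  # actual (e.g., generalization) loss
    evals_visible = rng.rand(n_iter, n_batch, n_trials)  # observable (e.g., validation) loss
    perf = _demo_perf = get_perf_array(evals, evals_visible)
    assert perf.shape == (n_iter, n_trials)
    return _demo_perf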

def compute_aggregates(perf_da, baseline_ds, visible_perf_da=None):
    """Aggregate function evaluations in the experiments to get performance summaries of each method.

    Parameters
    ----------
    perf_da : :class:`xarray:xarray.DataArray`
        Aggregate experimental results with each function evaluation in the experiments according to true loss
        (e.g., generalization). `perf_da` has dimensions ``(ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)`` and is
        assumed to have no nan values.
    baseline_ds : :class:`xarray:xarray.Dataset`
        Dataset with baseline performance. It has variables ``(PERF_MED, PERF_MEAN, PERF_CLIP, PERF_BEST)`` with
        dimensions ``(ITER, TEST_CASE)``, ``(ITER, TEST_CASE)``, ``(TEST_CASE,)``, and ``(TEST_CASE,)``,
        respectively. `PERF_MED` is a baseline of performance based on random search when using medians to
        summarize performance. Likewise, `PERF_MEAN` is for means. `PERF_CLIP` is an upper bound to clip poor
        performance when using the mean. `PERF_BEST` is an estimate of the global minimum.
    visible_perf_da : :class:`xarray:xarray.DataArray`
        Aggregate experimental results with each function evaluation in the experiments according to visible loss
        (e.g., validation). `visible_perf_da` has dimensions ``(ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)`` and is
        assumed to have no nan values. If `None`, we set ``visible_perf_da = perf_da``.

    Returns
    -------
    agg_result : :class:`xarray:xarray.Dataset`
        Dataset with summary of performance for each method and test case combination. Contains variables:
        ``(PERF_MED, LB_MED, UB_MED, NORMED_MED, PERF_MEAN, LB_MEAN, UB_MEAN, NORMED_MEAN)``
        each with dimensions ``(ITER, METHOD, TEST_CASE)``. `PERF_MED` is a median summary of performance with
        `LB_MED` and `UB_MED` as error bars. `NORMED_MED` is a rescaled `PERF_MED` so we expect the optimal
        performance is 0, and random search gives 1 at all `ITER`. Likewise, `PERF_MEAN`, `LB_MEAN`, `UB_MEAN`,
        `NORMED_MEAN` are for mean performance.
    summary : :class:`xarray:xarray.Dataset`
        Dataset with overall summary of performance of each method. Contains variables
        ``(PERF_MED, LB_MED, UB_MED, PERF_MEAN, LB_MEAN, UB_MEAN)``
        each with dimensions ``(ITER, METHOD)``. It also contains ``NORMED_MEAN`` with error bars
        ``LB_NORMED_MEAN`` and ``UB_NORMED_MEAN``, which are the mean scores rescaled by the mean
        random-search baseline score.
    """
    validate_agg_perf(perf_da, min_trial=1)

    assert isinstance(baseline_ds, xr.Dataset)
    assert tuple(baseline_ds[PERF_BEST].dims) == (TEST_CASE,)
    assert tuple(baseline_ds[PERF_CLIP].dims) == (TEST_CASE,)
    assert tuple(baseline_ds[PERF_MED].dims) == (ITER, TEST_CASE)
    assert tuple(baseline_ds[PERF_MEAN].dims) == (ITER, TEST_CASE)
    assert xru.coord_compat((perf_da, baseline_ds), (ITER, TEST_CASE))
    assert not any(np.any(np.isnan(baseline_ds[kk].values)) for kk in baseline_ds)

    # Now actually get the aggregate performance numbers per test case
    agg_result = xru.ds_like(
        perf_da,
        (PERF_MED, LB_MED, UB_MED, NORMED_MED, PERF_MEAN, LB_MEAN, UB_MEAN, NORMED_MEAN),
        (ITER, METHOD, TEST_CASE),
    )
    baseline_mean_da = xru.only_dataarray(xru.ds_like(perf_da, ["ref"], (ITER, TEST_CASE)))
    # Using values here since it is just clearer to get raw items than an xr object for func_name
    for func_name in perf_da.coords[TEST_CASE].values:
        rand_perf_med = baseline_ds[PERF_MED].sel({TEST_CASE: func_name}, drop=True).values
        rand_perf_mean = baseline_ds[PERF_MEAN].sel({TEST_CASE: func_name}, drop=True).values
        best_opt = baseline_ds[PERF_BEST].sel({TEST_CASE: func_name}, drop=True).values
        base_clip_val = baseline_ds[PERF_CLIP].sel({TEST_CASE: func_name}, drop=True).values

        assert np.all(np.diff(rand_perf_med) <= 0), "Baseline should be decreasing with iteration"
        assert np.all(np.diff(rand_perf_mean) <= 0), "Baseline should be decreasing with iteration"
        assert np.all(rand_perf_med > best_opt)
        assert np.all(rand_perf_mean > best_opt)
        assert np.all(rand_perf_mean <= base_clip_val)

        baseline_mean_da.loc[{TEST_CASE: func_name}] = linear_rescale(
            rand_perf_mean, best_opt, base_clip_val, 0.0, 1.0, enforce_bounds=False
        )

        for method_name in perf_da.coords[METHOD].values:
            # Take the minimum over all suggestions at a given iter + sanity check perf_da
            curr_da = perf_da.sel({METHOD: method_name, TEST_CASE: func_name}, drop=True)
            assert curr_da.dims == (ITER, SUGGEST, TRIAL)

            if visible_perf_da is None:
                perf_array = get_perf_array(curr_da.values, curr_da.values)

                # Sanity check: with no separate visible loss this reduces to a plain cumulative minimum
                curr_da_ = perf_da.sel({METHOD: method_name, TEST_CASE: func_name}, drop=True).min(dim=SUGGEST)
                assert curr_da_.dims == (ITER, TRIAL)
                perf_array_ = np.minimum.accumulate(curr_da_.values, axis=0)
                assert np.allclose(perf_array, perf_array_)
            else:
                curr_visible_da = visible_perf_da.sel({METHOD: method_name, TEST_CASE: func_name}, drop=True)
                assert curr_visible_da.dims == (ITER, SUGGEST, TRIAL)

                perf_array = get_perf_array(curr_da.values, curr_visible_da.values)

            # Compute median perf and CI on it
            med_perf, LB, UB = qt.quantile_and_CI(perf_array, EVAL_Q, alpha=ALPHA)
            assert med_perf.shape == rand_perf_med.shape
            agg_result[PERF_MED].loc[{TEST_CASE: func_name, METHOD: method_name}] = med_perf
            agg_result[LB_MED].loc[{TEST_CASE: func_name, METHOD: method_name}] = LB
            agg_result[UB_MED].loc[{TEST_CASE: func_name, METHOD: method_name}] = UB

            # Now store normed version, which is better for aggregation
            normed = linear_rescale(med_perf, best_opt, rand_perf_med, 0.0, 1.0, enforce_bounds=False)
            agg_result[NORMED_MED].loc[{TEST_CASE: func_name, METHOD: method_name}] = normed

            # Store normed mean version
            normed = linear_rescale(perf_array, best_opt, base_clip_val, 0.0, 1.0, enforce_bounds=False)
            # Also, clip the score from below at -1 to limit max influence of a single run on the final average
            normed = np.clip(normed, -1.0, 1.0)
            normed = np.mean(normed, axis=1)
            agg_result[NORMED_MEAN].loc[{TEST_CASE: func_name, METHOD: method_name}] = normed

            # Compute mean perf and CI on it
            perf_array = np.minimum(base_clip_val, perf_array)
            mean_perf = np.mean(perf_array, axis=1)
            assert mean_perf.shape == rand_perf_mean.shape
            EB = t_EB(perf_array, alpha=ALPHA, axis=1)
            assert EB.shape == rand_perf_mean.shape
            agg_result[PERF_MEAN].loc[{TEST_CASE: func_name, METHOD: method_name}] = mean_perf
            agg_result[LB_MEAN].loc[{TEST_CASE: func_name, METHOD: method_name}] = mean_perf - EB
            agg_result[UB_MEAN].loc[{TEST_CASE: func_name, METHOD: method_name}] = mean_perf + EB
    assert not any(np.any(np.isnan(agg_result[kk].values)) for kk in agg_result)

    # Compute summary score over all test cases, summarize performance of each method
    summary = xru.ds_like(
        perf_da,
        (PERF_MED, LB_MED, UB_MED, PERF_MEAN, LB_MEAN, UB_MEAN, NORMED_MEAN, LB_NORMED_MEAN, UB_NORMED_MEAN),
        (ITER, METHOD),
    )

    summary[PERF_MED], summary[LB_MED], summary[UB_MED] = xr.apply_ufunc(
        qt.quantile_and_CI,
        agg_result[NORMED_MED],
        input_core_dims=[[TEST_CASE]],
        kwargs={"q": EVAL_Q, "alpha": ALPHA},
        output_core_dims=[[], [], []],
    )

    summary[PERF_MEAN] = agg_result[NORMED_MEAN].mean(dim=TEST_CASE)
    EB = xr.apply_ufunc(t_EB, agg_result[NORMED_MEAN], input_core_dims=[[TEST_CASE]])
    summary[LB_MEAN] = summary[PERF_MEAN] - EB
    summary[UB_MEAN] = summary[PERF_MEAN] + EB

    normalizer = baseline_mean_da.mean(dim=TEST_CASE)
    summary[NORMED_MEAN] = summary[PERF_MEAN] / normalizer
    summary[LB_NORMED_MEAN] = summary[LB_MEAN] / normalizer
    summary[UB_NORMED_MEAN] = summary[UB_MEAN] / normalizer

    assert all(tuple(summary[kk].dims) == (ITER, METHOD) for kk in summary)
    return agg_result, summary
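
# --- Illustrative sketch (not part of the original bayesmark module) ---
# The normed scores above come from `linear_rescale(perf, best_opt, rand_perf, 0.0, 1.0)`,
# which maps the estimated global optimum to 0 and the random-search baseline to 1, i.e.,
# normed = (perf - best_opt) / (rand_perf - best_opt). `_demo_normed_score` spells this out
# with hypothetical numbers; it is never called by the module itself.
def _demo_normed_score():
    perf = np.array([0.25, 0.40, 0.10])  # hypothetical method performance values
    best_opt = 0.1  # hypothetical estimate of the global optimum
    rand_perf = 0.4  # hypothetical random-search baseline
    normed = linear_rescale(perf, best_opt, rand_perf, 0.0, 1.0, enforce_bounds=False)
    assert np.allclose(normed, (perf - best_opt) / (rand_perf - best_opt))
    return normed  # -> [0.5, 1.0, 0.0]: 0 at the optimum, 1 at the random-search baseline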

def main():
    """See README for instructions on calling analysis.
    """
    description = "Analyze results from aggregated studies"
    args = parse_args(general_parser(description))

    # Metric used on leaderboard
    leaderboard_metric = cc.VISIBLE_TO_OPT

    logger.setLevel(logging.INFO)  # Note this is the module-wide logger
    if args[CmdArgs.verbose]:
        logger.addHandler(logging.StreamHandler())

    # Load in the eval data and sanity check
    perf_ds, meta = XRSerializer.load_derived(args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.EVAL_RESULTS)
    logger.info("Meta data from source file: %s" % str(meta["args"]))

    # Check if there is a baselines file, otherwise make one
    if cc.BASELINE not in XRSerializer.get_derived_keys(args[CmdArgs.db_root], db=args[CmdArgs.db]):
        warnings.warn("Baselines not found. Need to construct baseline.")
        do_baseline(args)

    # Load in baseline scores data and sanity check (including compatibility with eval data)
    baseline_ds, meta_ref = XRSerializer.load_derived(args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.BASELINE)
    logger.info("Baseline data from source ref file: %s" % str(meta_ref["args"]))

    # Check test case signatures match between eval data and baseline data
    sig_errs, signatures = analyze_signature_pair(meta["signature"], meta_ref["signature"])
    logger.info("Signature errors:\n%s" % sig_errs.to_string())
    print(json.dumps({"exp-anal sig errors": sig_errs.T.to_dict()}))

    # Subset baseline to only the test cases run in the experiments
    test_cases_run = perf_ds.coords[TEST_CASE].values.tolist()
    assert set(test_cases_run) <= set(
        baseline_ds.coords[TEST_CASE].values.tolist()
    ), "Data set contains test cases not found in baseline."
    baseline_ds = baseline_ds.sel({TEST_CASE: test_cases_run})

    # Also subset iterations to allow shorter runs
    iters_run = perf_ds.coords[ITER].values.tolist()
    assert set(iters_run) <= set(
        baseline_ds.coords[ITER].values.tolist()
    ), "Data set does not have the same batch size as the baseline, or has too many iterations."
    baseline_ds = baseline_ds.sel({ITER: iters_run})

    # Do the actual computation
    perf_visible = perf_ds[cc.VISIBLE_TO_OPT]
    agg_result = OrderedDict()
    summary = OrderedDict()
    for metric_for_scoring in sorted(perf_ds):
        perf_da = perf_ds[metric_for_scoring]
        baseline_ds_ = baseline_ds.sel({OBJECTIVE: metric_for_scoring}, drop=True)
        agg_result[(metric_for_scoring,)], summary[(metric_for_scoring,)] = compute_aggregates(
            perf_da, baseline_ds_, perf_visible
        )
    agg_result = xru.ds_concat(agg_result, dims=(cc.OBJECTIVE,))
    summary = xru.ds_concat(summary, dims=(cc.OBJECTIVE,))

    for metric_for_scoring in sorted(perf_ds):
        # Print summary by problem
        # Recall that:
        # ...     summary[PERF_MEAN] = agg_result[NORMED_MEAN].mean(dim=TEST_CASE)
        # ...     summary[NORMED_MEAN] = summary[PERF_MEAN] / normalizer
        # where normalizer is constant across all problems and optimizers
        print("Scores by problem (JSON):\n")
        agg_df = agg_result[NORMED_MEAN].sel({cc.OBJECTIVE: metric_for_scoring}, drop=True)[{ITER: -1}].to_pandas().T
        print(json.dumps({metric_for_scoring: agg_df.to_dict()}))
        print("\n")

        final_score = summary[PERF_MED].sel({cc.OBJECTIVE: metric_for_scoring}, drop=True)[{ITER: -1}]
        logger.info("median score @ %d:\n%s" % (summary.sizes[ITER], xru.da_to_string(final_score)))
        final_score = summary[PERF_MEAN].sel({cc.OBJECTIVE: metric_for_scoring}, drop=True)[{ITER: -1}]
        logger.info("mean score @ %d:\n%s" % (summary.sizes[ITER], xru.da_to_string(final_score)))

        print("Final scores (JSON):\n")
        print(json.dumps({metric_for_scoring: final_score.to_series().to_dict()}))
        print("\n")

        final_score = summary[NORMED_MEAN].sel({cc.OBJECTIVE: metric_for_scoring}, drop=True)[{ITER: -1}]
        logger.info("normed mean score @ %d:\n%s" % (summary.sizes[ITER], xru.da_to_string(final_score)))

    # Now saving results
    meta = {"args": serializable_dict(args), "signature": signatures}
    XRSerializer.save_derived(agg_result, meta, args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.PERF_RESULTS)
    XRSerializer.save_derived(summary, meta, args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.MEAN_SCORE)

    final_msg = xru.da_to_string(
        100 * (1.0 - summary[PERF_MEAN].sel({cc.OBJECTIVE: leaderboard_metric}, drop=True)[{ITER: -1}])
    )
    logger.info("-" * 20)
    logger.info("Final score `100 x (1-loss)` for leaderboard:\n%s" % final_msg)


if __name__ == "__main__":
    main()  # pragma: main