Source code for bayesmark.experiment_aggregate

# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Aggregate the results of many studies to prepare analysis.
"""
import json
import logging
from collections import Counter

import numpy as np
import xarray as xr

import bayesmark.constants as cc
import bayesmark.xr_util as xru
from bayesmark.cmd_parse import CmdArgs, agg_parser, parse_args, serializable_dict, unserializable_dict
from bayesmark.constants import ARG_DELIM, EVAL_RESULTS, ITER, METHOD, SUGGEST, TEST_CASE, TIME_RESULTS, TRIAL
from bayesmark.serialize import XRSerializer
from bayesmark.signatures import analyze_signatures
from bayesmark.sklearn_funcs import SklearnModel
from bayesmark.util import str_join_safe

logger = logging.getLogger(__name__)


def validate_time(all_time):
    """Validate the aggregated time data set."""
    assert isinstance(all_time, xr.Dataset)
    assert all_time[cc.SUGGEST_PHASE].dims == (ITER,)
    assert all_time[cc.EVAL_PHASE].dims == (ITER, SUGGEST)
    assert all_time[cc.OBS_PHASE].dims == (ITER,)
    assert xru.is_simple_coords(all_time.coords, min_side=1)

def validate_perf(perf_da):
    """Validate the input eval data arrays."""
    assert isinstance(perf_da, xr.DataArray)
    assert perf_da.dims == (ITER, SUGGEST)
    assert xru.is_simple_coords(perf_da.coords)
    assert not np.any(np.isnan(perf_da.values))

def validate_agg_perf(perf_da, min_trial=1):
    """Validate the aggregated eval data set."""
    assert isinstance(perf_da, xr.DataArray)
    assert perf_da.dims == (ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)
    assert xru.is_simple_coords(perf_da.coords, dims=(ITER, SUGGEST, TRIAL))
    assert not np.any(np.isnan(perf_da.values))
    assert perf_da.sizes[TRIAL] >= min_trial

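# --- Editor's illustration (not part of the original module): a minimal sketch of a
# timing Dataset with the layout validate_time() expects. It assumes toy random data
# and 0-based integer coordinates, on the assumption that such coordinates satisfy
# xru.is_simple_coords. The helper name is hypothetical.
def _example_valid_time_ds(n_iter=3, n_suggest=2):
    coords = {ITER: np.arange(n_iter), SUGGEST: np.arange(n_suggest)}
    toy = xr.Dataset(
        {
            cc.SUGGEST_PHASE: ((ITER,), np.random.rand(n_iter)),
            cc.EVAL_PHASE: ((ITER, SUGGEST), np.random.rand(n_iter, n_suggest)),
            cc.OBS_PHASE: ((ITER,), np.random.rand(n_iter)),
        },
        coords=coords,
    )
    validate_time(toy)  # passes for this shape/coordinate layout
    return toy
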
def summarize_time(all_time):
    """Transform a single timing dataset from an experiment into a form better for aggregation.

    Parameters
    ----------
    all_time : :class:`xarray:xarray.Dataset`
        Dataset with variables ``(SUGGEST_PHASE, EVAL_PHASE, OBS_PHASE)`` which have dimensions ``(ITER,)``,
        ``(ITER, SUGGEST)``, and ``(ITER,)``, respectively. The variable `EVAL_PHASE` has the function evaluation
        time for each parallel suggestion.

    Returns
    -------
    time_summary : :class:`xarray:xarray.Dataset`
        Dataset with variables ``(SUGGEST_PHASE, OBS_PHASE, EVAL_PHASE_MAX, EVAL_PHASE_SUM)`` which all have
        dimensions ``(ITER,)``. The maximum `EVAL_PHASE_MAX` is relevant for wall clock time, while `EVAL_PHASE_SUM`
        is relevant for CPU time.
    """
    validate_time(all_time)

    time_summary = xr.Dataset(coords=all_time.coords)
    time_summary[cc.SUGGEST_PHASE] = all_time[cc.SUGGEST_PHASE]
    time_summary[cc.OBS_PHASE] = all_time[cc.OBS_PHASE]
    time_summary[cc.EVAL_PHASE_MAX] = all_time[cc.EVAL_PHASE].max(dim=SUGGEST)
    time_summary[cc.EVAL_PHASE_SUM] = all_time[cc.EVAL_PHASE].sum(dim=SUGGEST)
    return time_summary

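# --- Editor's illustration (not part of the original module): the core of summarize_time()
# is a max/sum reduction over the SUGGEST dimension, shown here with plain xarray on a
# hand-made EVAL_PHASE-like array (toy numbers, hypothetical helper name).
def _example_eval_phase_reduction():
    eval_phase = xr.DataArray(
        [[1.0, 4.0], [2.0, 3.0]],
        dims=(ITER, SUGGEST),
        coords={ITER: [0, 1], SUGGEST: [0, 1]},
    )
    wall_clock = eval_phase.max(dim=SUGGEST)  # like EVAL_PHASE_MAX -> [4.0, 3.0]
    cpu_total = eval_phase.sum(dim=SUGGEST)  # like EVAL_PHASE_SUM -> [5.0, 5.0]
    return wall_clock, cpu_total
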
def concat_experiments(all_experiments, ravel=False):
    """Aggregate the Datasets from a series of experiments into a combined Dataset.

    Parameters
    ----------
    all_experiments : typing.Iterable
        Iterable (possibly from a generator) with the Datasets from each experiment. Each item in `all_experiments`
        is a pair containing ``(meta_data, data)``. See `load_experiments` for details on these variables.
    ravel : bool
        If true, ravel all studies to store batch suggestions as if they were serial.

    Returns
    -------
    all_perf : :class:`xarray:xarray.Dataset`
        Dataset containing all of the `perf_ds` from the experiments. The meta-data from the experiments are included
        as extra dimensions. `all_perf` has dimensions ``(ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)``. To convert the
        `uuid` to a trial, there must be an equal number of repetitions in the experiments for each `TEST_CASE`,
        `METHOD` combination. Likewise, all of the experiments need an equal number of `ITER` and `SUGGEST`. If
        `ravel` is true, then `SUGGEST` is singleton.
    all_time : :class:`xarray:xarray.Dataset`
        Dataset containing all of the `time_ds` from the experiments. The new dimensions are
        ``(ITER, TEST_CASE, METHOD, TRIAL)``. It has the variables produced by `summarize_time`.
    all_suggest : dict(str, :class:`xarray:xarray.Dataset`)
        Dictionary mapping each test case to a Dataset containing all of its `suggest_ds` from the experiments. Each
        Dataset has dimensions ``(ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)``, with `TEST_CASE` singleton.
    all_sigs : dict(str, list(list(float)))
        Aggregate of all experiment signatures.
    """
    all_perf = {}
    all_time = {}
    all_suggest = {}
    all_sigs = {}
    trial_counter = Counter()

    for (test_case, optimizer, uuid), (perf_ds, time_ds, suggest_ds, sig) in all_experiments:
        if ravel:
            raise NotImplementedError("ravel is deprecated. Just reshape in analysis steps instead.")

        case_key = (test_case, optimizer, trial_counter[(test_case, optimizer)])
        trial_counter[(test_case, optimizer)] += 1

        # Process perf data
        assert all(perf_ds[kk].dims == (ITER, SUGGEST) for kk in perf_ds)
        all_perf[case_key] = perf_ds

        # Process time data
        all_time[case_key] = summarize_time(time_ds)

        # Process suggestion data
        all_suggest_curr = all_suggest.setdefault(test_case, {})
        all_suggest_curr[case_key] = suggest_ds

        # Handle the signatures
        all_sigs.setdefault(test_case, []).append(sig)
    assert min(trial_counter.values()) == max(trial_counter.values()), "Uneven number of trials per test case"

    # Now need to concat dict of datasets into single dataset
    all_perf = xru.ds_concat(all_perf, dims=(TEST_CASE, METHOD, TRIAL))
    assert all(all_perf[kk].dims == (ITER, SUGGEST, TEST_CASE, METHOD, TRIAL) for kk in all_perf)
    assert not any(
        np.any(np.isnan(all_perf[kk].values)) for kk in all_perf
    ), "Missing combinations of method and test case"

    all_time = xru.ds_concat(all_time, dims=(TEST_CASE, METHOD, TRIAL))
    assert all(all_time[kk].dims == (ITER, TEST_CASE, METHOD, TRIAL) for kk in all_time)
    assert not any(np.any(np.isnan(all_time[kk].values)) for kk in all_time)
    assert xru.coord_compat((all_perf, all_time), (ITER, TEST_CASE, METHOD, TRIAL))

    for test_case in all_suggest:
        all_suggest[test_case] = xru.ds_concat(all_suggest[test_case], dims=(TEST_CASE, METHOD, TRIAL))
        assert all(
            all_suggest[test_case][kk].dims == (ITER, SUGGEST, TEST_CASE, METHOD, TRIAL)
            for kk in all_suggest[test_case]
        )
        assert not any(np.any(np.isnan(all_suggest[test_case][kk].values)) for kk in all_suggest[test_case])
        assert xru.coord_compat((all_perf, all_suggest[test_case]), (ITER, METHOD, TRIAL))
        assert all_suggest[test_case].coords[TEST_CASE].shape == (1,), "test case should be singleton"

    return all_perf, all_time, all_suggest, all_sigs

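# --- Editor's illustration (not part of the original module): conceptually, xru.ds_concat
# stacks the per-study Datasets along new dimensions such as TRIAL, much like xr.concat
# over a new dimension; this toy sketch (hypothetical helper, toy numbers) shows only the
# TRIAL case, while bayesmark also tracks TEST_CASE and METHOD in the same way.
def _example_concat_idea():
    study_a = xr.Dataset({"loss": ((ITER,), np.array([0.9, 0.5]))}, coords={ITER: np.arange(2)})
    study_b = xr.Dataset({"loss": ((ITER,), np.array([0.8, 0.4]))}, coords={ITER: np.arange(2)})
    # Two repeats of the same (test case, method) pair become trials 0 and 1
    combined = xr.concat([study_a, study_b], dim=TRIAL).assign_coords(**{TRIAL: np.arange(2)})
    return combined  # "loss" now has dims (TRIAL, ITER) in this toy example
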
def load_experiments(uuid_list, db_root, dbid):  # pragma: io
    """Generator to load the results of the experiments.

    Parameters
    ----------
    uuid_list : list(uuid.UUID)
        List of UUIDs corresponding to experiments to load.
    db_root : str
        Root location for data store as requested by the serializer used.
    dbid : str
        Name of the data store as requested by the serializer used.

    Yields
    ------
    meta_data : (str, str, str)
        The `meta_data` contains a `tuple` of `str` with ``test_case, optimizer, uuid``.
    data : (:class:`xarray:xarray.Dataset`, :class:`xarray:xarray.Dataset`, :class:`xarray:xarray.Dataset`, list(float))
        The `data` contains a tuple of ``(perf_ds, time_ds, suggest_ds, sig)``. The `perf_ds` is a
        :class:`xarray:xarray.Dataset` containing the evaluation results with dimensions ``(ITER, SUGGEST)``; each
        variable is an objective. The `time_ds` is a :class:`xarray:xarray.Dataset` containing the timing results of
        the form accepted by `summarize_time`. The coordinates must be compatible with `perf_ds`. The `suggest_ds` is
        a :class:`xarray:xarray.Dataset` containing the inputs to the function evaluations; each variable is a
        function input. Finally, `sig` contains the `test_case` signature and must be `list(float)`.
    """
    uuids_seen = set()
    for uuid_ in uuid_list:
        logger.info(uuid_.hex)

        # Load perf and timing data
        perf_ds, meta = XRSerializer.load(db_root, db=dbid, key=cc.EVAL, uuid_=uuid_)
        time_ds, meta_t = XRSerializer.load(db_root, db=dbid, key=cc.TIME, uuid_=uuid_)
        assert meta == meta_t, "meta data should match between time and eval files"
        suggest_ds, meta_t = XRSerializer.load(db_root, db=dbid, key=cc.SUGGEST_LOG, uuid_=uuid_)
        assert meta == meta_t, "meta data should match between suggest and eval files"

        # Get signature to pass out as well
        _, sig = meta["signature"]
        logger.info(meta)
        logger.info(sig)

        # Build the new indices for combined data, this could be put in a function for easier testing
        eval_args = unserializable_dict(meta["args"])

        # Unpack meta-data
        test_case = SklearnModel.test_case_str(
            eval_args[CmdArgs.classifier], eval_args[CmdArgs.data], eval_args[CmdArgs.metric]
        )
        optimizer = str_join_safe(
            ARG_DELIM, (eval_args[CmdArgs.optimizer], eval_args[CmdArgs.opt_rev], eval_args[CmdArgs.rev])
        )
        args_uuid = eval_args[CmdArgs.uuid]

        # Check UUID sanity
        assert isinstance(args_uuid, str)
        assert args_uuid == uuid_.hex, "UUID meta-data does not match filename"
        assert args_uuid not in uuids_seen, "uuids being reused between studies"
        uuids_seen.add(args_uuid)

        # Return key -> data so this generator can be iterated over in dict like manner
        meta_data = (test_case, optimizer, args_uuid)
        data = (perf_ds, time_ds, suggest_ds, sig)
        yield meta_data, data

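# --- Editor's illustration (not part of the original module): a hedged sketch of driving
# the loader by hand. The helper name is hypothetical, and `my_db_root`/`my_dbid` must point
# at a data store previously written by the bayesmark experiment launcher.
def _example_iterate_experiments(my_db_root, my_dbid):
    uuids = XRSerializer.get_uuids(my_db_root, db=my_dbid, key=cc.EVAL)
    for (test_case, optimizer, uuid_hex), (perf_ds, time_ds, suggest_ds, sig) in load_experiments(
        uuids, my_db_root, my_dbid
    ):
        # Just peek at what each study contains
        print(test_case, optimizer, uuid_hex, dict(perf_ds.sizes))
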
def main():
    """See README for instructions on calling aggregate."""
    description = "Aggregate study results across functions and optimizers"
    args = parse_args(agg_parser(description))

    logger.setLevel(logging.INFO)  # Note this is the module-wide logger
    if args[CmdArgs.verbose]:
        logger.addHandler(logging.StreamHandler())

    # Get list of UUIDs
    uuid_list = XRSerializer.get_uuids(args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.EVAL)
    uuid_list_ = XRSerializer.get_uuids(args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.TIME)
    assert uuid_list == uuid_list_, "UUID list does not match between time and eval results"
    uuid_list_ = XRSerializer.get_uuids(args[CmdArgs.db_root], db=args[CmdArgs.db], key=cc.SUGGEST_LOG)
    assert uuid_list == uuid_list_, "UUID list does not match between suggest log and eval results"

    # Get iterator of all experiment data dumps, load in and process, and concat
    data_G = load_experiments(uuid_list, args[CmdArgs.db_root], args[CmdArgs.db])
    all_perf, all_time, all_suggest, all_sigs = concat_experiments(data_G, ravel=args[CmdArgs.ravel])

    # Check the concatenated signatures are coherent
    sig_errs, signatures_median = analyze_signatures(all_sigs)
    logger.info("Signature errors:\n%s" % sig_errs.to_string())
    print(json.dumps({"exp-agg sig errors": sig_errs.T.to_dict()}))

    # Dump and save it all out
    logger.info("saving")
    meta = {"args": serializable_dict(args), "signature": signatures_median}
    XRSerializer.save_derived(all_perf, meta, args[CmdArgs.db_root], db=args[CmdArgs.db], key=EVAL_RESULTS)
    XRSerializer.save_derived(all_time, meta, args[CmdArgs.db_root], db=args[CmdArgs.db], key=TIME_RESULTS)
    for test_case, ds in all_suggest.items():
        XRSerializer.save_derived(ds, meta, args[CmdArgs.db_root], db=args[CmdArgs.db], key=test_case)
    logger.info("done")


if __name__ == "__main__":
    main()  # pragma: main
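# --- Editor's usage note (not part of the original module): main() is normally reached via
# the `bayesmark-agg` console script. The flag names below are assumed from the bayesmark
# README quick start and should be checked against `bayesmark-agg --help`; the helper name
# is hypothetical.
def _example_cli_call(db_root, dbid):
    import subprocess

    subprocess.run(["bayesmark-agg", "-dir", db_root, "-b", dbid], check=True)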