#!/usr/bin/env python
"""A multivariate Gaussian Process emulator through principal components compression.
This code provides a convenient way of emulating a multivariate model output (e.g. a
time series, or a spatial/temporal stack) by reducing it through principal component
analysis. Each of the selected PCs is then modelled through a Gaussian Process

Usage
-----

We need two arrays: one that stores the multivariate model outputs (`X`) and one
that stores the model parameters that produced said outputs (`y`). The sizes of
these arrays are important: `X` is `( N_train, N_full )` (i.e., `N_train` rows and
`N_full` columns), and `y` is `( N_train, N_params )`. Then simply

.. code-block:: python

   emulator = MultivariateEmulator(X=X, y=y)
   prediction, unc, der_prediction = emulator.predict(y[0])  # Say

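For instance, one could emulate a toy model (here, a cosine whose amplitude and
frequency are the two parameters; the model and array sizes are purely
illustrative):

.. code-block:: python

   import numpy as np
   n_train, n_full = 100, 200
   t = np.linspace(0, 1, n_full)
   y = np.random.uniform(low=[0.5, 1.0], high=[2.0, 10.0], size=(n_train, 2))
   X = np.array([a * np.cos(2 * np.pi * f * t) for a, f in y])
   emulator = MultivariateEmulator(X=X, y=y)
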
Saving emulators for future use
--------------------------------

If you want to save an emulator for re-use, you can do that easily by using the
`dump_emulator` method with a filename (the method also takes some ancillary
metadata arguments). You can then instantiate the `MultivariateEmulator` class
setting only the `dump` keyword in the constructor to the saved filename, and
the emulator will be recreated.
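
For instance (the filename is illustrative; `model_name`, `sza`, `vza` and
`raa` are required by the method's signature but are not stored in the file):

.. code-block:: python

   emulator.dump_emulator("my_emulator.npz", "toy_model", 30, 0, 0)
   # ... later, possibly in a different session ...
   emulator = MultivariateEmulator(dump="my_emulator.npz")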

"""


import numpy as np

from .GaussianProcess import GaussianProcess


class MultivariateEmulator(object):
    def __init__(
        self,
        dump=None,
        X=None,
        y=None,
        hyperparams=None,
        model="",
        sza=0,
        vza=0,
        raa=0,
        thresh=0.98,
        n_tries=5,
    ):
        """Constructor

        The constructor takes an array of model outputs `X`, and an
        array of the parameters that served as the inputs for these
        model runs, `y`. The sizes of these two arrays are
        `( N_train, N_full )` for `X` and `( N_train, N_params )` for
        `y`. `X` is decomposed into its PCs, and the first
        `self.n_pcs` components that explain `thresh` of the variance
        are selected. If `hyperparams` is set to `None`, normal
        training of the individual `self.n_pcs` GPs is carried out.
        If `hyperparams` is specified and it is the right shape,
        `( N_params + 2, self.n_pcs )`, then there's no need to train
        the emulators (which might be more efficient).

        Parameters
        ----------
        dump: str
            A filename that will be read to populate X, y and
            hyperparams
        X: array ( N_train, N_full )
            The modelled output array for training
        y: array ( N_train, N_params )
            The corresponding training parameters for `X`
        hyperparams: array ( N_params + 2, N_PCs )
            The hyperparameters for the relevant GPs
        thresh: float
            The threshold at where to cutoff the percentage of
            variance explained.
        n_tries: int
            Number of attempts at training each GP (passed on to
            `learn_hyperparameters`).
        """
        # `model`, `sza`, `vza` and `raa` are accepted for interface
        # compatibility but are not used here.
        basis_functions = None
        if dump is not None:
            if X is None and y is None:
                if dump.find(".h5") > 0 or dump.find(".hdf5") > 0:
                    raise IOError("I can't be bothered working with HDF5 files")
                # `str.find` returns -1 when the substring is missing, so
                # test the result explicitly rather than relying on truthiness
                elif dump.find(".npz") >= 0:
                    self.emulator_file = dump
                    f = np.load(dump)
                    X = f["X"]
                    y = f["y"]
                    hyperparams = f["hyperparams"]
                    thresh = f["thresh"]
                    if "basis_functions" in dict(f):
                        basis_functions = f["basis_functions"]
                        n_pcs = f["n_pcs"]
                    f.close()
            else:
                raise ValueError("You specified both a dump file and X and y")
        else:
            if X is None or y is None:
                raise ValueError("Need to specify both X and y")
            else:
                assert X.shape[0] == y.shape[0]
                assert X.ndim == 2
                assert y.ndim == 2
        self.X_train = X
        self.y_train = y
        self.thresh = thresh
        if basis_functions is None:
            print("Decomposing the input dataset into basis functions...")
            self.calculate_decomposition(X, thresh)
            print("Done!\n ====> Using %d basis functions" % self.n_pcs)
            basis_functions = self.basis_functions
            n_pcs = self.n_pcs
        self.n_pcs = n_pcs
        self.basis_functions = basis_functions
        if hyperparams is not None:
            assert (y.shape[1] + 2 == hyperparams.shape[0]) and (
                self.n_pcs == hyperparams.shape[1]
            )
        self.train_emulators(X, y, hyperparams=hyperparams, n_tries=n_tries)

    def dump_emulator(self, fname, model_name, sza, vza, raa):
        """Save emulator to file for reuse

        Saves the emulator to a file (`.npz` format) for reuse.

        Parameters
        ----------
        fname: str
            The output filename
        model_name: str
            A model identifier (accepted for interface compatibility;
            not stored in the file)
        sza, vza, raa: float
            Acquisition geometry angles (cast to int, but not stored
            in the file)
        """
        sza = int(sza)
        vza = int(vza)
        raa = int(raa)
        if fname.find(".npz") < 0 and (
            fname.find("h5") >= 0 or fname.find(".hdf") >= 0
        ):
            raise IOError("I can't be bothered working with HDF5 files")
        else:
            np.savez_compressed(
                fname,
                X=self.X_train,
                y=self.y_train,
                hyperparams=self.hyperparams,
                thresh=self.thresh,
                basis_functions=self.basis_functions,
                n_pcs=self.n_pcs,
            )
            print("Emulator safely saved")

    def calculate_decomposition(self, X, thresh):
        """Does PCA decomposition

        This simply does a PCA decomposition using the SVD. Note that
        if `X` is very large, more efficient methods of doing this
        might be required. The number of PCs to retain is selected as
        those required to explain `thresh` of the total variance.

        Parameters
        ----------
        X: array ( N_train, N_full )
            The modelled output array for training
        thresh: float
            The threshold at where to cutoff the percentage of
            variance explained.
        """
        U, s, V = np.linalg.svd(X, full_matrices=False)
        # Cumulative fraction of the singular value spectrum
        pcnt_var_explained = s.cumsum() / s.sum()
        self.basis_functions = V[pcnt_var_explained <= thresh]
        self.n_pcs = np.sum(pcnt_var_explained <= thresh)

    def train_emulators(self, X, y, hyperparams, n_tries=2):
        """Train the emulators

        This sets up the required emulators. If necessary
        (`hyperparams` is set to None), it will train the emulators.

        Parameters
        ----------
        X: array ( N_train, N_full )
            The modelled output array for training
        y: array ( N_train, N_params )
            The corresponding training parameters for `X`
        hyperparams: array ( N_params + 2, N_PCs )
            The hyperparameters for the relevant GPs
        """
        self.emulators = []
        # Project the training outputs onto the retained basis functions;
        # each row of `train_data` holds the weights for one PC.
        train_data = self.compress(X)
        self.hyperparams = np.zeros((2 + y.shape[1], self.n_pcs))
        for i in range(self.n_pcs):
            self.emulators.append(
                GaussianProcess(np.atleast_2d(y), train_data[i])
            )
            if hyperparams is None:
                print("\tFitting GP for basis function %d" % i)
                self.hyperparams[:, i] = self.emulators[i].learn_hyperparameters(
                    n_tries=n_tries
                )[1]
            else:
                self.hyperparams[:, i] = hyperparams[:, i]
                self.emulators[i]._set_params(hyperparams[:, i])

    def hessian(self, x):
        """A method to approximate the Hessian.

        This method builds on the fact that the spectral emulator is a
        linear combination of individual emulators. Therefore, we can
        calculate the Hessian of the spectral emulator as the sum of
        the individual Hessians, each scaled by the relevant spectral
        basis function.
        """
        the_hessian = np.zeros((len(x), len(x), len(self.basis_functions[0])))
        x = np.atleast_2d(x)
        for i in range(self.n_pcs):
            # Calculate the Hessian of the i-th PC weight
            this_hessian = self.emulators[i].hessian(x)
            # Add this Hessian contribution once it's been scaled by the
            # relevant basis function.
            the_hessian = (
                the_hessian
                + this_hessian.squeeze()[:, :, None] * self.basis_functions[i]
            )
        return the_hessian

    def compress(self, X):
        """Project full-rank vectors into the PC basis"""
        return X.dot(self.basis_functions.T).T

    def predict(self, y, do_unc=True, do_deriv=True):
        """Prediction of input vector

        The individual GPs predict the PC weights, and these are used
        to reconstruct the value of the function at a point `y`.
        Additionally, the derivative of the function is also
        calculated. This is returned as a `( N_params, N_full )` array
        (i.e., it needs to be reduced along axis 1).

        Parameters
        ----------
        y: array
            The value of the prediction point
        do_deriv: bool
            Whether derivatives are required or not
        do_unc: bool
            Whether to calculate the uncertainty or not

        Returns
        -------
        A tuple with the predicted mean, predicted variance and
        partial derivatives. If any of the latter two elements have
        been switched off by `do_deriv` or `do_unc`, they'll be
        returned as `None`.
        """
        fwd = np.zeros(self.basis_functions[0].shape[0])
        y = np.atleast_2d(y)  # Just in case
        deriv = None
        unc = None
        if do_deriv:
            deriv = np.zeros((y.shape[1], self.basis_functions.shape[1]))
        if do_unc:
            unc = np.zeros_like(fwd)
        for i in range(self.n_pcs):
            pred_mu, pred_var, grad = self.emulators[i].predict(
                y, do_unc=do_unc, do_deriv=do_deriv
            )
            fwd += pred_mu * self.basis_functions[i]
            if do_deriv:
                deriv += (grad.squeeze() * self.basis_functions[i][:, None]).T
            if do_unc:
                unc += pred_var * self.basis_functions[i]
        try:
            return fwd.squeeze(), unc.squeeze(), deriv
        except AttributeError:
            return fwd.squeeze(), unc, deriv


### if __name__ == "__main__":
###     # Read LUT to test these bits (assumes matplotlib.pyplot imported as plt)
###     f = np.load("test_LUT.npz")
###     angles = f["angles"]
###     train_params = f["train_params"]
###     train_brf = f["train_brf"]
###
###     def unpack(params):
###         """Input a dictionary and output keys and array"""
###         inputs = []
###         keys = np.sort(list(params.keys()))
###         for i, k in enumerate(keys):
###             inputs.append(params[k])
###         inputs = np.array(inputs).T
###         return inputs, keys
###
###     def pack(inputs, keys):
###         """Input keys and array and output dict"""
###         params = {}
###         for i, k in enumerate(keys):
###             params[k] = inputs[i]
###         return params
###
###     train_paramsoot, keys = unpack(train_params.tolist())
###     mv_em = MultivariateEmulator(X=train_brf, y=train_paramsoot)
###     y_test = np.array([1.0, 0.5, 0.5, 1.65, 0.5, 0.5, 0.73, 0.89, 0.44, 0.42, 0.1])
###     hypers = mv_em.hyperparams
###     mv_em2 = MultivariateEmulator(X=train_brf, y=train_paramsoot, hyperparams=hypers)
###     y_arr = y_test * 1
###     for i in range(8):
###         y_arr[-1] = 0.05 + 0.1 * i
###         plt.plot(mv_em.predict(y_arr)[0], "-r", lw=2)
###         plt.plot(mv_em2.predict(y_arr)[0], "-k", lw=1)
###     mv_em.dump_emulator("emulator1.npz")
###     plt.figure()
###     new_em = MultivariateEmulator(dump="emulator1.npz")
###     for i in range(8):
###         y_arr[-1] = 0.05 + 0.1 * i
###         plt.plot(mv_em.predict(y_arr)[0], "-r", lw=2)
###         plt.plot(new_em.predict(y_arr)[0], "-k", lw=1)
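

# What follows is a minimal, self-contained smoke test sketch, not part of the
# package itself. The toy "model" (a cosine whose amplitude and frequency are
# the two input parameters) and all names, sizes and filenames below are
# purely illustrative assumptions.
if __name__ == "__main__":
    np.random.seed(42)
    n_train, n_full = 100, 200
    t = np.linspace(0.0, 1.0, n_full)
    # Draw training parameters (amplitude in [0.5, 2], frequency in [1, 10])
    # and evaluate the toy model to build the training output array.
    y_train = np.random.uniform(low=[0.5, 1.0], high=[2.0, 10.0], size=(n_train, 2))
    X_train = np.array([a * np.cos(2.0 * np.pi * f * t) for a, f in y_train])
    # Train: decompose X into its PCs and fit one GP per retained component
    em = MultivariateEmulator(X=X_train, y=y_train, thresh=0.98)
    # Predict at a training point and check the reconstruction error
    mean, unc, deriv = em.predict(y_train[0])
    print("RMSE at a training point:", np.sqrt(np.mean((mean - X_train[0]) ** 2)))
    # Save and reload; the extra metadata arguments are required by the
    # method's signature but are not stored in the file
    em.dump_emulator("demo_emulator.npz", "toy_model", 0, 0, 0)
    em2 = MultivariateEmulator(dump="demo_emulator.npz")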