"""Predictor implementations."""
import copy
import logging
from abc import ABC, abstractmethod
from time import time
from typing import Callable, List, Tuple, Union
import numpy as np
from scipy.stats import pearsonr
try:
import sklearn.gaussian_process as skl_gp
import sklearn.linear_model as skl_lm
import sklearn.model_selection as skl_ms
import sklearn.neural_network as skl_nn
except ImportError:
skl_lm = skl_gp = skl_nn = skl_ms = None
logger = logging.getLogger("ABC.Predictor")
[docs]
class Predictor(ABC):
"""Generic predictor model class.
A predictor should define:
* `fit(x, y, w=None)` to fit the model on a sample of data x and outputs y,
where x has shape (n_sample, n_feature), and
y has shape (n_sample, n_out).
Further, gets as a third argument the sample weights
if `weight_samples` is set. Not all predictors support this.
* `predict(X)` to predict outputs of shape (n_out,), where X has shape
(n_sample, n_feature).
"""
[docs]
@abstractmethod
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
"""Fit the predictor to labeled data.
Parameters
----------
x: Samples, shape (n_sample, n_feature).
y: Targets, shape (n_sample, n_out).
w: Weights, shape (n_sample,).
"""
[docs]
@abstractmethod
def predict(self, x: np.ndarray, normalize: bool = False) -> np.ndarray:
"""Predict outputs using the model.
Parameters
----------
x:
Samples, shape (n_sample, n_feature) or (n_feature,).
normalize:
Whether outputs should be normalized, or on the original scale.
Returns
-------
y: Predicted targets, shape (n_sample, n_out).
"""
def wrap_fit_log(fit):
"""Wrapper for fit logging."""
def wrapped_fun(self, x: np.ndarray, y: np.ndarray, w: np.ndarray):
start_time = time()
# actual fitting
ret = fit(self, x, y, w)
logger.info(f"Fitted {self} in {time() - start_time:.2f}s")
if self.log_pearson:
# shape: n_sample, n_out
y_pred = self.predict(x)
coeffs = [
pearsonr(y[:, i], y_pred[:, i])[0]
for i in range(y_pred.shape[1])
]
logger.info(
" ".join(
[
"Pearson correlations:",
*[f"{coeff:.3f}" for coeff in coeffs],
]
),
)
return ret
return wrapped_fun
[docs]
class SimplePredictor(Predictor):
"""Wrapper around generic predictor routines."""
[docs]
def __init__(
self,
predictor,
normalize_features: bool = True,
normalize_labels: bool = True,
joint: bool = True,
weight_samples: bool = False,
log_pearson: bool = True,
):
"""
Parameters
----------
predictor:
Predictor model to use, fulfilling the predictor contract.
normalize_features:
Whether to apply z-score normalization to the input data.
normalize_labels:
Whether to apply z-score normalization to the parameters.
joint:
Whether the predictor learns one model for all targets, or
separate models per target.
weight_samples:
Whether to use importance sampling weights. Not that not all
predictors may support weighted samples.
log_pearson:
Whether to log Pearson correlation coefficients after fitting.
"""
self.predictor = predictor
# only used if not joint
self.single_predictors: Union[List, None] = None
self.normalize_features: bool = normalize_features
self.normalize_labels: bool = normalize_labels
self.joint: bool = joint
self.weight_samples: bool = weight_samples
self.log_pearson: bool = log_pearson
# indices to use
self.use_ixs: Union[np.ndarray, None] = None
# z-score normalization coefficients
self.mean_x: Union[np.ndarray, None] = None
self.std_x: Union[np.ndarray, None] = None
self.mean_y: Union[np.ndarray, None] = None
self.std_y: Union[np.ndarray, None] = None
[docs]
@wrap_fit_log
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
"""Fit the predictor to labeled data.
Parameters
----------
x: Samples, shape (n_sample, n_feature).
y: Targets, shape (n_sample, n_out).
w: Weights, shape (n_sample,).
"""
# remove trivial features
self.set_use_ixs(x=x)
x = x[:, self.use_ixs]
# normalize features
if self.normalize_features:
self.mean_x = np.mean(x, axis=0)
self.std_x = np.std(x, axis=0)
x = (x - self.mean_x) / self.std_x
# normalize labels
if self.normalize_labels:
self.mean_y = np.mean(y, axis=0)
self.std_y = np.std(y, axis=0)
y = (y - self.mean_y) / self.std_y
if self.joint:
# fit a model with all parameters as joint response variables
if self.weight_samples:
self.predictor.fit(x, y, w)
else:
self.predictor.fit(x, y)
else:
# set up predictors
if self.single_predictors is None:
n_par = y.shape[1]
self.single_predictors: List = [
copy.deepcopy(self.predictor) for _ in range(n_par)
]
# fit a model for each parameter separately
for predictor, y_ in zip(self.single_predictors, y.T):
if self.weight_samples:
predictor.fit(x, y_, w)
else:
predictor.fit(x, y_)
[docs]
def predict(self, x: np.ndarray, normalize: bool = False) -> np.ndarray:
"""Predict outputs using the model.
Parameters
----------
x:
Samples, shape (n_sample, n_feature) or (n_feature,).
normalize:
Whether outputs should be normalized, or on the original scale.
Returns
-------
y: Predicted targets, shape (n_sample, n_out).
"""
# to 2d matrix
if x.ndim == 1:
x = x.reshape(1, -1)
# remove trivial features
x = x[:, self.use_ixs]
# normalize features
if self.normalize_features:
x = (x - self.mean_x) / self.std_x
if self.joint:
y = self.predictor.predict(x)
else:
y = [
predictor.predict(x).flatten()
for predictor in self.single_predictors
]
y = np.array(y).T
if not normalize and self.normalize_labels:
y = y * self.std_y + self.mean_y
# some predictors may return flat arrays if n_out == 1
if y.ndim == 1:
y = y.reshape(-1, 1)
return y
[docs]
def set_use_ixs(self, x: np.ndarray, log: bool = True) -> None:
"""Set feature indices to use.
Parameters
----------
x: Feature matrix, shape (n_sample, n_feature).
log: Whether to log.
"""
# remove trivial features
self.use_ixs = np.any(x != x[0], axis=0)
# log omitted indices
if log and not self.use_ixs.all():
logger.info(
"Ignore trivial features "
f"{list(np.flatnonzero(~self.use_ixs))}"
)
def __repr__(self) -> str:
rep = f"<{self.__class__.__name__} predictor={self.predictor}"
# print everything that is customized
if not self.normalize_features:
rep += f" normalize_features={self.normalize_features}"
if not self.normalize_labels:
rep += f" normalize_labels={self.normalize_labels}"
if not self.joint:
rep += f" joint={self.joint}"
if self.weight_samples:
rep += f" weight_samples={self.weight_samples}"
return rep + ">"
[docs]
class LinearPredictor(SimplePredictor):
"""Linear predictor model.
Based on [#fearnheadprangle2012]_.
.. [#fearnheadprangle2012]
Fearnhead, Paul, and Dennis Prangle.
"Constructing summary statistics for approximate Bayesian computation:
Semiāautomatic approximate Bayesian computation."
Journal of the Royal Statistical Society: Series B
(Statistical Methodology) 74.3 (2012): 419-474.
"""
[docs]
def __init__(
self,
normalize_features: bool = True,
normalize_labels: bool = True,
joint: bool = True,
weight_samples: bool = False,
log_pearson: bool = True,
**kwargs,
):
# check installation
if skl_lm is None:
raise ImportError(
"This predictor requires an installation of scikit-learn. "
"Install e.g. via `pip install pyabc[scikit-learn]`"
)
predictor = skl_lm.LinearRegression(**kwargs)
super().__init__(
predictor=predictor,
normalize_features=normalize_features,
normalize_labels=normalize_labels,
joint=joint,
weight_samples=weight_samples,
log_pearson=log_pearson,
)
[docs]
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
super().fit(x, y, w)
# log
if self.joint:
logger.debug(
"Linear regression coefficients (n_target, n_feature):\n"
f"{self.predictor.coef_}"
)
else:
for i_pred, predictor in enumerate(self.single_predictors):
logger.debug(
"Linear regression coefficients (n_target, n_feature):\n"
f"for predictor {i_pred}: {predictor.coef_}"
)
[docs]
class LassoPredictor(SimplePredictor):
"""Lasso (least absolute shrinkage and selection) model.
Linear model with l1 regularization.
"""
[docs]
def __init__(
self,
normalize_features: bool = True,
normalize_labels: bool = True,
joint: bool = True,
log_pearson: bool = True,
**kwargs,
):
"""Additional keyword arguments are passed on to the model."""
# check installation
if skl_lm is None:
raise ImportError(
"This predictor requires an installation of scikit-learn. "
"Install e.g. via `pip install pyabc[scikit-learn]`"
)
predictor = skl_lm.Lasso(**kwargs)
super().__init__(
predictor=predictor,
normalize_features=normalize_features,
normalize_labels=normalize_labels,
joint=joint,
weight_samples=False,
log_pearson=log_pearson,
)
[docs]
class GPPredictor(SimplePredictor):
"""Gaussian process model.
Similar to [#borowska2021]_.
.. [#borowska2021]
Borowska, Agnieszka, Diana Giurghita, and Dirk Husmeier.
"Gaussian process enhanced semi-automatic approximate Bayesian
computation: parameter inference in a stochastic differential equation
system for chemotaxis."
Journal of Computational Physics 429 (2021): 109999.
"""
[docs]
def __init__(
self,
kernel: Union[Callable, skl_gp.kernels.Kernel] = None,
normalize_features: bool = True,
normalize_labels: bool = True,
joint: bool = True,
log_pearson: bool = True,
**kwargs,
):
"""
Parameters
----------
kernel:
Covariance kernel. Can be either a kernel, o
"""
# check installation
if skl_gp is None:
raise ImportError(
"This predictor requires an installation of scikit-learn. "
"Install e.g. via `pip install pyabc[scikit-learn]`"
)
# default kernel
if kernel is None:
kernel = GPKernelHandle()
self.kernel: Union[Callable, skl_gp.kernels.Kernel] = kernel
self.kwargs = kwargs
super().__init__(
predictor=None,
normalize_features=normalize_features,
normalize_labels=normalize_labels,
joint=joint,
weight_samples=False,
log_pearson=log_pearson,
)
[docs]
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
# need to recreate the model
# set indices to keep
self.set_use_ixs(x=x, log=False)
# kernel
kernel = self.kernel
if not isinstance(kernel, skl_gp.kernels.Kernel):
n_in = sum(self.use_ixs)
kernel = kernel(n_in)
self.predictor = skl_gp.GaussianProcessRegressor(
kernel=kernel, **self.kwargs
)
super().fit(x=x, y=y, w=w)
[docs]
class GPKernelHandle:
"""Convenience class for Gaussian process kernel construction.
Allows to create kernels depending on problem dimensions.
"""
# kernels supporting features specific length scales
ARD_KERNELS = ["RBF", "Matern"]
[docs]
def __init__(
self,
kernels: List[str] = None,
kernel_kwargs: List[dict] = None,
ard: bool = True,
):
"""
Parameters
----------
kernels:
Names of `sklearn.kernel` covariance kernels. Defaults to a
radial basis function (a.k.a. squared exponential) kernel "RBF"
and a "WhiteKernel" to explain noise in the data. The resulting
kernel is the sum of all kernels.
kernel_kwargs:
Optional arguments passed to the kernel constructors.
ard:
Automatic relevance determination by assigning a separate length
scale per input variable. Only supported by some kernels,
currently "RBF" and "Matern".
If set to True, the capable kernels are automatically informed.
It the underlying scitki-learn toolbox extends support, this
list needs to be updated.
"""
if kernels is None:
kernels = ["RBF", "WhiteKernel"]
self.kernels: List[str] = kernels
if kernel_kwargs is None:
kernel_kwargs = [{} for _ in self.kernels]
self.kernel_kwargs = kernel_kwargs
self.ard: bool = ard
[docs]
def __call__(self, n_in: int) -> "skl_gp.kernels.Kernel":
"""
Parameters
----------
n_in: Input (feature) dimension.
Returns
-------
kernel: Kernel created from inputs.
"""
kernels = [
getattr(skl_gp.kernels, kernel)(
length_scale=np.ones(n_in), **kernel_kwargs
)
if self.ard and kernel in GPKernelHandle.ARD_KERNELS
else getattr(skl_gp.kernels, kernel)(**kernel_kwargs)
for kernel, kernel_kwargs in zip(self.kernels, self.kernel_kwargs)
]
return sum(kernels)
[docs]
class MLPPredictor(SimplePredictor):
"""Multi-layer perceptron regressor predictor.
See e.g. [#jiang2017]_.
.. [#jiang2017]
Jiang, Bai, et al.
"Learning summary statistic for approximate Bayesian computation via
deep neural network."
Statistica Sinica (2017): 1595-1618.
"""
[docs]
def __init__(
self,
normalize_features: bool = True,
normalize_labels: bool = True,
joint: bool = True,
hidden_layer_sizes: Union[Tuple[int, ...], Callable] = None,
log_pearson: bool = True,
**kwargs,
):
"""Additional keyword arguments are passed on to the model.
Parameters
----------
hidden_layer_sizes:
Network hidden layer sizes. Can be either a tuple of ints, or
a callable taking input dimension, output dimension, and number
of samples and returning a tuple of ints.
The :class:`HiddenLayerSize` provides some useful defaults.
"""
# check installation
if skl_nn is None:
raise ImportError(
"This predictor requires an installation of scikit-learn. "
"Install e.g. via `pip install pyabc[scikit-learn]`"
)
if hidden_layer_sizes is None:
hidden_layer_sizes = HiddenLayerHandle()
self.hidden_layer_sizes = hidden_layer_sizes
self.kwargs = {
'solver': 'lbfgs',
'max_iter': 10000,
'early_stopping': True,
}
self.kwargs.update(kwargs)
super().__init__(
predictor=None,
normalize_features=normalize_features,
normalize_labels=normalize_labels,
joint=joint,
weight_samples=False,
log_pearson=log_pearson,
)
[docs]
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
# need to recreate the model
# set indices to keep
self.set_use_ixs(x=x, log=False)
# hidden layer sizes
hidden_layer_sizes = self.hidden_layer_sizes
if callable(hidden_layer_sizes):
n_in, n_out, n_sample = sum(self.use_ixs), y.shape[1], x.shape[0]
hidden_layer_sizes = hidden_layer_sizes(n_in, n_out, n_sample)
self.predictor = skl_nn.MLPRegressor(
hidden_layer_sizes=hidden_layer_sizes, **self.kwargs
)
super().fit(x=x, y=y, w=w)
[docs]
class HiddenLayerHandle:
"""Convenience class for various layer size strategies.
Allows to define sizes depending on problem dimensions.
"""
HEURISTIC = "heuristic"
MEAN = "mean"
MAX = "max"
METHODS = [HEURISTIC, MEAN, MAX]
[docs]
def __init__(
self,
method: Union[str, List[str]] = MEAN,
n_layer: int = 1,
max_size: int = np.inf,
alpha: float = 1.0,
):
"""
Parameters
----------
method:
Method to use. Can be any of:
* "heuristic" bases the number of neurons on the number of samples
to avoid overfitting. See
https://stats.stackexchange.com/questions/181.
* "mean" takes the mean of input and output dimension.
* "max" takes the maximum of input and output dimension.
Additionally, a list of methods can be passed, in which case
the minimum over all is used.
n_layer:
Number of layers.
max_size:
Maximum layer size. Applied to all strategies.
alpha:
Factor used in "heuristic". The higher, the fewer neurons.
Recommended is a value in the range 2-10.
"""
if isinstance(method, str):
method = [method]
for m in method:
if m not in HiddenLayerHandle.METHODS:
raise ValueError(
f"Method {m} must be in {HiddenLayerHandle.METHODS}"
)
self.method = method
self.n_layer = n_layer
self.max_size = max_size
self.alpha = alpha
[docs]
def __call__(
self,
n_in: int,
n_out: int,
n_sample: int,
) -> Tuple[int, ...]:
"""
Parameters
----------
n_in: Input (feature) dimension.
n_out: Output (target) dimension.
n_sample: Number of samples.
Returns
-------
hidden_layer_sizes: Tuple of hidden layer sizes.
"""
neurons_arr = []
for method in self.method:
if method == HiddenLayerHandle.HEURISTIC:
# number of neurons
neurons = n_sample / (self.alpha * (n_in + n_out))
# divide by number of layers
neurons /= self.n_layer
elif method == HiddenLayerHandle.MEAN:
neurons = 0.5 * (n_in + n_out)
elif method == HiddenLayerHandle.MAX:
neurons = max(n_in, n_out)
else:
raise ValueError(f"Did not recognize method {self.method}.")
neurons_arr.append(neurons)
# take minimum over proposed values
neurons_per_layer = min(neurons_arr)
# cap
neurons_per_layer = min(neurons_per_layer, self.max_size)
# only >=2 dim makes sense, round
neurons_per_layer = int(max(2.0, neurons_per_layer))
layer_sizes = tuple(neurons_per_layer for _ in range(self.n_layer))
logger.info(f"Layer sizes: {layer_sizes}")
return layer_sizes
[docs]
class ModelSelectionPredictor(Predictor):
"""Model selection over a set of predictors.
Picks the model with minimum k-fold cross valdation score and retrains on
full data set.
"""
CROSS_VALIDATION = "cross_validation"
TRAIN_TEST_SPLIT = "train_test_split"
SPLIT_METHODS = [CROSS_VALIDATION, TRAIN_TEST_SPLIT]
[docs]
def __init__(
self,
predictors: List[Predictor],
split_method: str = TRAIN_TEST_SPLIT,
n_splits: int = 5,
test_size: float = 0.2,
f_score: Callable = None,
):
"""
Parameters
----------
predictors:
Set of predictors over which to perform model selection.
split_method:
Method how to split the data set into training and test data,
can be "cross_validation" for a full `n_splits` fold cross
validation, or "train_test_split" for a single separation of a
test set of size `test_size`.
n_splits:
Number of splits to use in k-fold cross validation.
test_size:
Fraction of samples to randomly pick as test set, when using a
single training and test set.
f_score:
Score function to assess prediction quality. Defaults to
root mean square error normalized by target standard variation.
Takes arguments y1, y2, std for prediction, ground truth, and
standard variation, and returns the score as a float.
"""
super().__init__()
self.predictors: List[Predictor] = predictors
if split_method not in ModelSelectionPredictor.SPLIT_METHODS:
raise ValueError(
f"Split method {split_method} must be in "
f"{ModelSelectionPredictor.SPLIT_METHODS}",
)
self.split_method: str = split_method
self.n_splits: int = n_splits
self.test_size: float = test_size
if f_score is None:
self.f_score = root_mean_square_error
# holds the chosen predictor model
self.chosen_one: Union[Predictor, None] = None
[docs]
def fit(self, x: np.ndarray, y: np.ndarray, w: np.ndarray = None) -> None:
# output normalization
std_y = np.std(y, axis=0)
n_sample = x.shape[0]
if self.split_method == ModelSelectionPredictor.CROSS_VALIDATION:
# k-fold cross validation
kfold = skl_ms.KFold(n_splits=self.n_splits, shuffle=True)
splits = kfold.split(np.arange(n_sample))
n_splits = self.n_splits
else:
# a single training and test set
splits = skl_ms.train_test_split(
np.arange(n_sample), test_size=self.test_size
)
# as iterable
splits = [splits]
n_splits = 1
# scores on training and test set
scores_test = np.empty((len(self.predictors), n_splits))
# for debugging
scores_train = np.empty((len(self.predictors), n_splits))
# iterate over training and test sets
for i_split, (train_ixs, test_ixs) in enumerate(splits):
# training and test sets
x_train, y_train = x[train_ixs], y[train_ixs]
w_train = w
if w is not None:
w_train = w[train_ixs]
x_test, y_test = x[test_ixs], y[test_ixs]
# iterate over predictors
for i_pred, predictor in enumerate(self.predictors):
# fit predictor on training set
predictor.fit(x=x_train, y=y_train, w=w_train)
# test score of z-score normalized variables
y_test_pred = predictor.predict(x=x_test)
scores_test[i_pred, i_split] = self.f_score(
y1=y_test_pred,
y2=y_test,
sigma=std_y,
)
# for debugging, log training scores
y_train_pred = predictor.predict(x=x_train)
scores_train[i_pred, i_split] = self.f_score(
y1=y_train_pred,
y2=y_train,
sigma=std_y,
)
# the score of a predictor is the mean over all folds
scores_test = np.mean(scores_test, axis=1)
scores_train = np.mean(scores_train, axis=1)
# logging
for predictor, score_train, score_test in zip(
self.predictors, scores_train, scores_test
):
logger.info(
f"Test score {score_test:.3e} (train {score_train:.3e}) for "
f"{predictor}"
)
# best predictor has minimum score
self.chosen_one = self.predictors[np.argmin(scores_test)]
# retrain chosen model on full data set
self.chosen_one.fit(x=x, y=y, w=w)
[docs]
def predict(self, x: np.ndarray, normalize: bool = False) -> np.ndarray:
# predict from the chosen model
return self.chosen_one.predict(x=x, normalize=normalize)
[docs]
def root_mean_square_error(
y1: np.ndarray,
y2: np.ndarray,
sigma: Union[np.ndarray, float],
) -> float:
"""Root mean square error of `y1 - y2 / sigma`.
Parameters
----------
y1: Model simulations, shape (n_sample, n_par).
y2: Ground truth values, shape (n_sample, n_par).
sigma: Normalizations, shape (n_sample,) or (1,).
Returns
-------
val: The normalized root mean square error value.
"""
return np.sqrt(
np.sum(
((y1 - y2) / sigma) ** 2,
)
/ y1.size,
)
[docs]
def root_mean_square_relative_error(
y1: np.ndarray,
y2: np.ndarray,
) -> float:
"""Root mean square relative error of `(y1 - y2) / y2`.
Note that this may behave badly for ground truth parameters close to 0.
Parameters
----------
y1: Model simulations.
y2: Ground truth values.
Returns
-------
val: The normalized root mean square relative error value.
"""
return np.sqrt(
np.sum(
((y1 - y2) / y2) ** 2,
)
/ y1.size,
)