import sys
import numpy as np
import scipy.linalg
from scipy.optimize import newton
from ..decorators import flatten_batch, get_defaults_from_self
from ..stats import covariance_from_similarity, similarity_exp
from ..utils import concatenate
from .selector import UncertaintySelector, optimize_batch
# pylint: disable=invalid-name
@flatten_batch(bdim=-2, to_flatten=2)
def solve_triangular_torch(A, b, lower=False, trans=0, **kwargs):
"""Solve a linear system using Pytorch."""
import torch
A = torch.transpose(torch.tensor(A), -2, -1) if trans == 1 or trans == "T" else torch.tensor(A)
return torch.linalg.solve_triangular(
A, b if isinstance(b, torch.Tensor) else torch.tensor(b), upper=not lower, **kwargs
)
# pylint: disable=invalid-name
@flatten_batch(bdim=-2, to_flatten=2)
def solve_triangular_scipy(A, b, **kwargs):
"""Solve a linear system using SciPy."""
soln = np.empty(b.shape)
for i in range(A.shape[0]):
soln[i] = scipy.linalg.solve_triangular(A[i], b[i], check_finite=False, **kwargs)
return soln
HAS_TORCH = None
def solve_triangular(*args, **kwargs):
global HAS_TORCH
if HAS_TORCH is None:
HAS_TORCH = "torch" in sys.modules
return (solve_triangular_torch if HAS_TORCH else solve_triangular_scipy)(
*args, **kwargs
)
[docs]class CovarianceSelector(UncertaintySelector):
"""
Batch selector which looks for batches with large total covariance, i.e., large
joint entropy.
:param model: An instance of :class:`models.CovarianceRegressor`. Will be used to
determine prediction covariances for proposed batches.
:param samples: The sample pool to select from. Can be a numpy-style addressable
array (with first dimension indexing samples, and other dimensions indexing
features)---note that :class:`data.Dataset` serves this purpose---or an instance of
:class:`sample_generation.SampleGenerator`, in which case the `num_samples` parameter is
in effect.
:param num_samples: If a :class:`SampleGenerator` has been provided via the `samples`
parameter, then at the start of a call to :meth:`.select`, `num_samples`
samples will be drawn from the `SampleGenerator`, or as many samples as the
:class:`SampleGenerator` can provide, whichever is less. Defaults to :Inf:, i.e., draws
as many samples as available.
:param batch_size: Size of the batch to select.
:param regularization: The diagonal of the covariance matrix will be multiplied by
(1 + regularization), after being computed by the model. This ensures that
the coviarance matrix is positive definite (as long as all the covariances
are positive). Defaults to .05
This parameter is particularly important if the covariance is computed from
an ensemble of models, and the ensemble is not very large: for a given batch
of N samples, and a model ensemble size of M, the distribution of predictions
will consist of M points in an N-dimensional space. If M < N, the covariance
of the batch predictions is sure to have determinant 0, and no comparisons
can be made (without regularization). Even if M >= N, a relatively small
ensemble size can produce numerical instability in the covariances, which
regularization smooths out.
:param normalize: If True, scales the (co)variances by the inverse-square-length
of the embedding vector (retrieved by a call to `model.embedding`), modulo a
small constant. This prevents the algorithm from just seeking out those inputs
which give large embedding vectors. Defaults to False.
If the model has not implemented an `.embedding` method, setting
`normalize = True` will raise a `NotImplementedError` when you call
:meth:`.select`. :class:`LaplaceApproxRegressor` and subclasses have implemented
:meth:`.embedding`, as have some others.
:param normalize_epsilon: In the normalization step described above, variances are
divided by |embedding_length|² + ε, where
ε = normalize_epsilon * MEAN_SQUARE(all embedding lengths)
Defaults to 1e-3. Should be related to measurement error.
:param similarity: The effective covariance matrix (before regularization) will be
(1 - similarity) * covariance (computed from the model) + similarity * S,
where S is a "synthetic" covariance computed from a similarity matrix, as
follows:
First, a similarity matrix is computed: each feature dimension in the data, X,
will be normalized to have variance of 1. Then, a euclidean distance matrix is
computed for the whole dataset (divided by sqrt(N), where N is the number of
feature dimensions). Then, this distance metric is passed into a decaying
exponential, with 1/e-life equal to the parameter 'similarity_scale'. This
gives a positive-definite similarity matrix, with ones on the diagonal.
Second, the similarity matrix is interpreted as a correlation matrix.
Variances are taken from the model (i.e., copied from the diagonal of the
covariance matrix). Together, these data determine a covariance matrix, which
will be combined with the model covariance in the given proportion.
Defaults to 0.
:param similarity_scale: This tunes the correlation matrix in the similarity
computation above. The pairwise euclidean distances in normalized feature
space are passed into a exponential with 1/e-life equal to similarity_scale.
So, a smaller value for similarity_scale will give smaller off-diagonal
entries on the correlation/covariance matrix.
If similarity_scale is set to 'auto', then a scale is chosen to match the
mean squares of the similarities and the correlations (from the model).
:param prior: Specifies a "prior probability" for each sample. The covariance
matrix (after the application of similarity) will be multiplied by the priors
such that, if C_ij is a covariance and p_i, p_j are the corresponding priors,
C'_ij = C_ij p_i p_j. This is a covenient way of introducing factors
other than covariance into the ranking. 'prior' may be an array of numbers
(of size num_samples), or a function (applied to the samples), or one of the
following:
'prediction': calculates a prior from the quantile of the predicted
performance (not the uncertainties). 'prior_scale' sets the power
this quantile is raised to.
Defaults to the constant value 1.
:param prior_scale: The prior will be raised to this power before applying it to
the covariance. Defaults to 1.
:param prefilter: Reduces the incoming sample pool before applying batch
selection. Selects the subset of the samples which have the highest
std_dev * prior score. If 0 < prefilter < 1, takes this fraction of the
sample pool. If prefilter >= 1, takes this many samples. Since batch
selection computes and stores the size-N^2 covariance matrix for the whole
sample pool, it should work with at most around 10,000 samples. Prefiltering
can work with much larger pools, since it only needs to compute N standard
deviations. Therefore, a practical strategy is to take a sample pool about
5 times as big as you can handle covariances for, then narrow down to only
the top 20% individual scores before batch selection. Narrowing to much less
than 20% risks changing what will ultimately be the optimal batch.
:param random_seed: A random seed for deterministic behaviour.
"""
def __init__(
self,
model=None,
samples=None,
num_samples=float("inf"),
batch_size=1,
normalize=False,
normalize_epsilon=1e-3,
regularization=0.05,
similarity=0,
similarity_scale="auto",
prior=1,
prior_scale=1,
prefilter=None,
random_seed=None,
fast_opt=True,
n_rounds=10,
**kwargs
):
super().__init__(
model=model,
# labelled_samples=labelled_samples,
samples=samples,
num_samples=num_samples,
batch_size=batch_size,
)
self.normalize = normalize
self.normalize_epsilon = normalize_epsilon
self.regularization = regularization
self.similarity = similarity
self.similarity_scale = similarity_scale
self.rng = np.random.default_rng(random_seed)
self.prior = prior
self.prior_scale = prior_scale
self.prefilter = prefilter
self.n_rounds = n_rounds
self.fast_opt = fast_opt
@get_defaults_from_self
def _select(
self,
samples=None,
fixed_samples=None,
batch_size=None,
prior=None,
fixed_prior=None,
verbose=False,
**kwargs
):
if fixed_samples is not None:
n_fixed = len(fixed_samples)
samples = concatenate(fixed_samples, samples)
else:
n_fixed = 0
# get prediction covariances
if verbose:
print("Computing covariance matrix for sample pool...")
cov = self.model.covariance(samples)
# smooth out the covariance with a similarity term
if self.similarity:
if verbose:
print("Computing similarity matrix...")
if self.similarity_scale == "auto":
# similarity is scaled so that it has the same mean square
# as the correlation
# only subsamples
sim_1 = similarity_exp(samples, scale=1)
std_dev = np.sqrt(cov.diagonal())
corr = cov / std_dev[None, :] / std_dev[:, None]
corr_mean = np.mean(np.square(corr))
log_s1 = np.log(sim_1)
def f(x):
return np.mean(np.power(sim_1, 2 * x)) - corr_mean
def df(x):
return 2 * np.mean(np.power(sim_1, 2 * x) * log_s1)
x = newton(f, 1.0, df, tol=corr_mean * 1e-4, full_output=True, disp=False)[0]
sim = np.power(sim_1, x)
else:
sim = similarity_exp(samples, scale=self.similarity_scale)
cov = (1 - self.similarity) * cov + self.similarity * covariance_from_similarity(
sim, cov.diagonal()
)
# normalize covariance for embedding vector size
if self.normalize and hasattr(self.model, "embedding"):
if verbose:
print("Normalizing covariances by embedding norms...")
emb_norm2 = (self.model.embedding(samples) ** 2).sum(axis=-1)
eps = self.normalize_epsilon * emb_norm2.mean()
prior = prior / np.sqrt(emb_norm2 + eps)
# apply the prior
cov = prior[None, :] * cov * prior[:, None]
epsilon = np.mean(np.diag(cov)) * 1e-8
# This actually returns 1/2 the log-determinant of the covariance
def cov_fn(indices):
# select sub-matrix of covariance matrix
i_0, i_1 = np.broadcast_arrays(indices[..., None, :], indices[..., :, None])
cov_batch = cov[i_0, i_1]
assert cov_batch.shape == (*indices.shape, indices.shape[-1])
# regularize to avoid non-positive matrices
# (eg., when a sample is repeated in a batch)
batch_indices = np.arange(indices.shape[-1])
cov_batch[..., batch_indices, batch_indices] += (
cov_batch[..., batch_indices, batch_indices] * self.regularization + epsilon
)
# compute log-determinant using cholesky decomposition
R = np.linalg.cholesky(cov_batch)
return np.sum(np.log(np.diagonal(R, axis1=-2, axis2=-1)), axis=-1)
# This is a weaker version...
def cov_fn_opt_step(indices, samples):
# select sub-matrix of covariance matrix
i_0, i_1 = np.broadcast_arrays(indices[..., None, :], indices[..., :, None])
cov_batch = cov[i_0, i_1]
assert cov_batch.shape == (*indices.shape, indices.shape[-1])
# regularize to avoid non-positive matrices
# (eg., when a sample is repeated in a batch)
batch_indices = np.arange(indices.shape[-1])
cov_batch[..., batch_indices, batch_indices] *= 1 + self.regularization
# pick out covariances of each batch w.r.t. the whole sample_space
C10 = cov[indices, :]
assert C10.shape == (*indices.shape, len(samples))
L00 = np.linalg.cholesky(cov_batch)
L10 = np.asarray(solve_triangular(L00, C10, lower=True))
assert L10.shape == (*indices.shape, len(samples))
return np.swapaxes(np.diag(cov) - np.sum(L10**2, axis=1), 0, 1)
if verbose:
print("Optimizing batches...")
batch_indices = optimize_batch(
cov_fn,
batch_size,
len(samples),
n_fixed=n_fixed,
scoring_opt_step=cov_fn_opt_step if self.fast_opt else None,
n_rounds=self.n_rounds,
random_seed=self.rng.integers(1e8),
# n_tuples=None, n_best=None,
verbose=verbose,
)
return batch_indices