from itertools import cycle
import numpy as np
from .decorators import flatten_batch, normalize_args
from .utils import shift_seed
# pylint: disable=no-value-for-parameter
@flatten_batch(bdim=-1)
def multiply_std_dev(ensemble, multiple: float):
    """
    Rescale an ensemble's spread about its mean by a factor of 'multiple'.

    :param ensemble: ensembles are over the last dimension only.
    :param multiple: factor by which the standard deviation is changed;
        1 leaves the ensemble untouched.
    """
    if multiple == 1:
        # nothing to do — avoid the mean computation entirely
        return ensemble
    center = np.mean(ensemble, axis=-1, keepdims=True)
    # blend each member towards/away from the per-row mean
    return multiple * ensemble + (1 - multiple) * center
@flatten_batch(bdim=-1)
def augment_ensemble(preds, N: int, multiple=1.0, rng=None):
    """
    Pad an ensemble of predictions out to N members per variable.

    Extra members are drawn from a multivariate normal distribution with
    the same mean and covariances (within the last batch dimension) as
    the existing members.

    :param preds: ensemble over the last dimension.
    :param N: desired ensemble size; if the ensemble already has at
        least N members it is returned unchanged (apart from the
        'multiple' rescaling).
    :param multiple: returns an ensemble with 'multiple' times the
        covariance of the original.
    :param rng: optional numpy Generator used for sampling.
    """
    if multiple != 1:
        preds = multiply_std_dev(preds, multiple)
    current_size = preds.shape[-1]
    if N <= current_size:
        return preds
    extra = ensemble_from_covariance(
        np.mean(preds, axis=-1),
        covariance_from_ensemble(preds),
        N - current_size,
        rng,
    )
    return np.concatenate((preds, extra), axis=-1)
# @flatten_batch(bdim=-2)
def covariance_from_ensemble(preds, weights=None):
    """
    Estimate a covariance matrix from an ensemble of predictions.

    :param preds: 2-D array of shape (n_vars, ensemble_size), or a 3-D
        array of shape (n_vars, ensemble_size, n_features).
    :param weights: optional per-feature weights (e.g. shape
        (n_features,)) multiplied into preds before the covariance is
        computed.
    :return: covariance matrix of shape (n_vars, n_vars).
    """
    if weights is not None:
        preds = preds * weights
    if preds.ndim == 2:
        # rows are variables, columns are ensemble observations
        return np.cov(preds)
    # 3-D case: remove each variable's per-feature ensemble mean, then
    # flatten (ensemble, feature) into a single observation axis.
    # p0.shape = (n_vars, ensemble_size * n_features)
    p0 = (preds - preds.mean(axis=1, keepdims=True)).reshape(len(preds), -1)
    # Outer products summed over the observation axis, normalized by the
    # ensemble size (axis 1 of preds).
    # BUG FIX: the reduction must run over the flattened observation
    # axis (axis=-1) so the output has shape (n_vars, n_vars); the
    # original summed axis=1 of the (n_vars, n_vars, obs) product,
    # producing a wrongly-shaped (n_vars, obs) result.
    return (p0[None, ...] * p0[:, None, ...]).sum(axis=-1) / preds.shape[1]
def std_dev_from_ensemble(preds, weights=None):
    """
    Per-variable standard deviation of a 2-D ensemble.

    :param preds: array of shape (n_vars, ensemble_size).
    :param weights: currently ignored; present to mirror
        covariance_from_ensemble's signature.
    :raises NotImplementedError: if preds is not 2-dimensional.
    """
    if preds.ndim != 2:
        raise NotImplementedError
    return np.std(preds, axis=1)
# @flatten_batch(bdim=-2, to_flatten=2, degree)
def ensemble_from_covariance(mean, cov, ensemble_size, rng=None, random_seed=None):
    """
    Draw an ensemble from a multivariate normal distribution.

    :param mean: mean vector, shape (n_vars,).
    :param cov: covariance matrix, shape (n_vars, n_vars).
    :param ensemble_size: number of ensemble members to draw.
    :param rng: optional numpy Generator; if None, a new one is created
        from random_seed.
    :param random_seed: seed used only when rng is None.
    :return: array of shape (n_vars, ensemble_size); samples are drawn
        with the sample axis first, then moved to the last axis to match
        the ensemble-over-last-dimension convention.
    """
    if rng is None:
        rng = np.random.default_rng(random_seed)
    return np.moveaxis(
        rng.multivariate_normal(mean, cov, size=ensemble_size),
        -2,
        -1,
    )
@flatten_batch(bdim=-1)
@normalize_args(euclidean=True)
def similarity_gaussian(X, relevant_features=None, normalize=True, scale=1.0):
    """
    Similarity matrix from a Gaussian function of pairwise distances.

    Restricts X to 'relevant_features' (last axis) when given, computes
    the pairwise Euclidean distance matrix, and maps it through
    exp(-d**2 / (2 * scale)).

    NOTE(review): the original docstring described 'scale' as the
    Gaussian's standard deviation and mentioned a normalization step,
    but the code divides by (2 * scale) — i.e. 'scale' behaves as a
    variance — and the 'normalize' flag is not used in this body
    (it may be consumed by the decorators) — confirm intended semantics.
    """
    if relevant_features is not None:
        X = X[..., relevant_features]
    # TODO: if normalize: ...
    pairwise_diff = X[..., :, None, :] - X[..., None, :, :]
    dist = np.linalg.norm(pairwise_diff, axis=-1)
    return np.exp(-np.square(dist) / (2 * scale))
@flatten_batch(bdim=-1)
@normalize_args(euclidean=True)
def similarity_exp(X, relevant_features=None, normalize=True, scale=1.0):
    """
    Similarity matrix from an exponential decay of pairwise distances:
    exp(-d / scale), after optionally restricting X to
    'relevant_features' along the last axis.

    NOTE(review): 'normalize' is not used in this body (it may be
    consumed by the decorators) — confirm.
    """
    # TODO: if normalize ...
    if relevant_features is not None:
        X = X[..., relevant_features]
    pairwise_diff = X[..., :, None, :] - X[..., None, :, :]
    dist = np.linalg.norm(pairwise_diff, axis=-1)
    return np.exp(-dist / scale)
@flatten_batch(bdim=-1)
@normalize_args
def similarity_cosine(X, relevant_indices=None, normalize=True):
    """
    Pairwise cosine-similarity matrix of the rows of X.

    Restricts X to 'relevant_indices' along the feature (last) axis,
    scales each row to unit length (all-zero rows are left as zeros),
    and returns the matrix of pairwise dot products.

    :param X: array whose last axis holds features.
    :param relevant_indices: optional feature indices to restrict to.
    :param normalize: not used in this body (row normalization is
        always performed; the flag may be consumed by the decorators).
    """
    if relevant_indices is not None:
        # BUG FIX: select features along the last axis, consistent with
        # similarity_gaussian / similarity_exp; the original indexed the
        # leading axis, which sliced samples rather than features.
        X = X[..., relevant_indices]
    norms = np.linalg.norm(X, axis=-1)
    norms[norms == 0] = 1  # avoid division by zero for all-zero rows
    X = (X.T / norms).T
    return np.sum(X[..., None, :, :] * X[..., :, None, :], axis=-1)
def covariance_from_similarity(similarity, variance):
    """
    Build a covariance matrix from a similarity matrix and variances.

    The similarity matrix is treated as a correlation matrix, so entry
    (i, j) of the result is sigma_i * similarity_ij * sigma_j.

    :param similarity: correlation-like matrix, shape (..., n, n).
    :param variance: per-variable variances, shape (..., n).
    :return: covariance matrix, shape (..., n, n).
    """
    sigma = np.sqrt(variance)
    return sigma[..., :, None] * similarity * sigma[..., None, :]
@flatten_batch(bdim=-2)
def joint_entropy(X):
    """
    Empirical joint entropy (in bits) per batch of class predictions.

    Not regularized: with a finite ensemble this tends to underestimate
    the entropy, i.e. samples may actually be less correlated than they
    appear from this calculation.

    :param X: ensemble of class predictions, of shape
        ... x batch_size x ensemble_size
    """
    n_members = X.shape[-1]
    result = np.zeros(X.shape[0])
    for row, sample in enumerate(X):
        # count occurrences of each distinct joint outcome
        _, counts = np.unique(sample, axis=-1, return_counts=True)
        p = counts / n_members
        result[row] = -np.sum(p * np.log2(p))
    return result
@flatten_batch(bdim=-1)
def joint_entropy_regularized(X, n_classes=None, rng=None, random_seed=None):
    """
    Regularized joint entropy of an ensemble of class predictions.

    NOTE(review): this implementation appears unfinished — it sets up
    sub-sampling machinery ('sub_batch_size', 'sub_indices') but never
    computes or returns an entropy, falling off the end (returning
    None) except in the fewer-than-two-classes case.

    :param X: ensemble of class predictions, of shape
        ... x batch_size x ensemble_size
    :param n_classes: to avoid computing it, you can
        pass in the number of classes.
    :param rng: optional numpy Generator; created from random_seed
        if None.
    :param random_seed: seed used only when rng is None.
    """
    if rng is None:
        rng = np.random.default_rng(shift_seed(random_seed, 1234567))
    if n_classes == 1:  # will I continue with this 1-classes distinction??
        n_classes = 2
    if n_classes is None:
        n_classes = len(np.unique(X))
    if n_classes < 2:
        # If we don't have at least 2 distinct classes, there is no information
        return np.zeros(X.shape[0])
    assert X.shape[-1] >= np.square(
        n_classes
    ), f"Must have ensemble of size at least {np.square(n_classes)}, whereas yours is size {X.shape[-1]}."
    batch_size = X.shape[-2]
    # for now, always take sub-batches of size so that in principle
    # the ensemble could find all the information
    sub_batch_size = n_classes ** int(np.log(batch_size) / np.log(n_classes))
    # create subsampling indices
    # BUG FIX: np.zeros requires an integer size; the original used true
    # division ('/ 2'), which passes a float and raises TypeError.
    sub_indices = np.zeros(batch_size * (batch_size - 1) // 2)
def group_sizes(size, group_size):
    """
    Partition 'size' items into groups of at most 'group_size'.

    Full groups are created first; any remainder becomes a final group,
    grown by borrowing one item at a time (round-robin) from the earlier
    groups until the remainder is at least as large as the group it
    would borrow from next.

    :return: list of group sizes summing to 'size'.
    """
    n_full, remainder = divmod(size, group_size)
    sizes = [group_size] * n_full
    if remainder > 0:
        # rebalance: shrink full groups one item at a time until the
        # remainder group is no smaller than the next donor
        for idx in cycle(range(n_full)):
            if sizes[idx] <= remainder:
                break
            sizes[idx] -= 1
            remainder += 1
        sizes.append(remainder)
    return sizes
def group_edges(size, group_size):
    """
    (start, end) index pairs delimiting the groups produced by
    group_sizes(size, group_size); ends are exclusive and the pairs
    cover range(size) contiguously.
    """
    sizes = group_sizes(size, group_size)
    bounds = [0]
    for s in sizes:
        bounds.append(bounds[-1] + s)
    return list(zip(bounds[:-1], bounds[1:]))