# Source code for alien.models.models

from abc import ABCMeta, abstractmethod

import numpy as np
from numpy.typing import ArrayLike

from ..data import Dataset, DictDataset
from ..decorators import flatten_batch, abstract_group, get_Xy, get_defaults_from_self
from ..stats import covariance_from_ensemble, ensemble_from_covariance, std_dev_from_ensemble
from ..utils import shift_seed, ranges, join
from ..config import INIT_SEED_INCREMENT

# pylint: disable=import-outside-toplevel


class Model(metaclass=ABCMeta):
    """
    Abstract base class for wrapping a model. Implementers must provide
    prediction and fitting (training) methods.

    Parameters
    ----------
    X
        You may provide training data at the time of initialization. You may
        do so by passing `X` and `y` parameters, or by passing a combined
        `data` (from which the model will extract `data.X` and `data.y`, if
        available, otherwise `data[:-1]` and `data[-1]`). You may instead pass
        in the training data when you call :meth:`.fit`.
    y
    data
    shape
        Specifies the `.shape` of the feature space. This will be set
        automatically if you provide training data.
    random_seed
        Random seed for those models that need it.
    init_seed
        Random seed for initializing model weights. This is stored, and after
        each call to :meth:`.initialize`, it is incremented by
        `INIT_SEED_INCREMENT`.
    reinitialize
        Whether to reinitialize model weights before each :meth:`.fit`.
        Defaults to `True`.
    ensemble_size
        Sets the ensemble size. This parameter is used by
        :meth:`.predict_ensemble` to determine how many observations to
        produce. It is also used by some ensemble models (eg.,
        :class:`RandomForestRegressor` and :class:`CatBoostRegressor`) to set
        the size of their ensemble of estimators.
    """

    def __init__(
        self,
        X=None,
        y=None,
        data=None,
        random_seed=None,
        reinitialize=True,
        init_seed=None,
        shape=None,
        ensemble_size=40,
        **kwargs,
    ):
        super().__init__()

        # Training data may arrive as a combined `data`, as X-only (treated as
        # a combined dataset), or as a separate X/y pair.
        if data is not None:
            self.data = data
            assert (X is None) and (y is None), "Only pass X,y *or* data to Model constructor"
        elif X is not None and y is None:
            self.data = X
        else:
            self._data = None
            self.X, self.y = X, y

        self.shape = shape
        self.ensemble_size = ensemble_size

        self.random_seed = random_seed
        self.rng = np.random.default_rng(random_seed)
        self.reinitialize = reinitialize
        # Derive a distinct weight-initialization seed unless one was given.
        self.init_seed = (
            shift_seed(random_seed, INIT_SEED_INCREMENT) if init_seed is None else init_seed
        )
        self.trained = False
[docs] @abstractmethod def predict(self, X): """ Applies the model to input(s) X (with the last self.ndim axes corresponding to each sample), and returns prediction(s). """
[docs] def predict_samples(self, X, n=1): """ Makes a prediction for for the batch X, randomly selected from this model's posterior distribution. Gives an ensemble of predictions, with shape `(len(X), n)`. """ return join(self.predict_samples(X) for _ in range(n))
[docs] @abstract_group('fit') @get_Xy @get_defaults_from_self def fit(self, X=None, y=None, reinitialize=None, fit_uncertainty=True, **kwargs): """ Fits the model to the given training data. If `X` and `y` are not specified, this method looks for `self.X` and `self.y`. If :meth:`.fit` finds an `X` but not a `y`, it treats `X` as a combined dataset `data`, and then uses `X, y = data.X, data.y`. If we can't find `data.X` and `data.y`, we instead use `X, y = data[:-1], data[-1]`. :meth:`.fit` should also fit any accompanying uncertainty model. :param reinitialize: If `True`, reinitializes model weights before fitting. If `False`, starts training from previous weight values. If not specified, uses `self.reinitialize`) :param fit_uncertainty: If `True`, a call to :meth:`fit` will also call :meth:`fit_uncertainty`. Defaults to `True`. """ if reinitialize: self.initialize() self.fit_model(X=X, y=y, **kwargs) if fit_uncertainty: self.fit_uncertainty(X=X, y=y)
[docs] @abstract_group('fit') def fit_model(self, X=None, y=None, **kwargs): """ Fit just the model component, and not the uncertainties (if these are computed separately) """
[docs] def fit_uncertainty(self, X=None, y=None): """ Fit just the uncertainties (if these need additional fitting beyond just the model) """ if hasattr(self, 'fit_laplace'): self.fit_laplace(X=X, y=y)
@property def data(self): if self._data is None and self.X is not None: self._data = DictDataset({"X": self.X, "y": self.y}) return self._data @data.setter def data(self, data): if data is None: self._data, self.X, self.y = None, None, None return if not isinstance(data, Dataset): data = Dataset.from_data(data) self._data = data try: self.X, self.y = data.X, data.y except AttributeError: self.X, self.y = None, None @property def shape(self): """ The shape of the feature space. Can either be specified directly, or inferred from training data, in which case `self.shape == X.shape[1:]`, i.e., the first (batch) dimension is dropped. This property is used by any methods which use the `@flatten_batch` decorator. """ if self._shape is None and self.X is not None: self._shape = self.X.shape[1:] return self._shape @shape.setter def shape(self, shape): self._shape = shape @property def ndim(self, default=1): """ The number of axes in the feature space. Equal to `len(self.shape)`. Most commonly equal to 1. If training data have been specified, then `self.ndim == X.ndim - 1`. This property is used by any methods which use the `@flatten_batch` decorator. """ return default if self.shape is None else len(self.shape)
[docs] def initialize(self, init_seed=None, sample_input=None): """ (Re)initializes the model weights. If `self.reinitialize` is True, this should be called at the start of every :meth:`.fit`, and this should be the default behaviour of :meth:`.fit`. """ pass
[docs] def save(self, path): """ Saves the model. May well be overloaded by subclasses, if they contain non-picklable components (or pickling would be inefficient). For any subclass, the :meth:`.save` and :meth:`.load` methods should be compatible with each other. """ import pickle with open(path, "wb") as f: pickle.dump(self, f)
[docs] @staticmethod def load(path): """ Loads a model. This particular implementation only works if `.save(path)` hasn't been overloaded. """ import pickle with open(path, "rb") as f: return pickle.load(f)
class Regressor(Model):
    """
    This class can accept as its first argument (or `model`), any of the deep
    learning models we currently support. So, Pytorch, Keras or DeepChem.
    `Regressor`'s constructor will build a specialized subclass depending on
    the type of `model`. The resulting wrapped model will compute
    uncertainties and covariances in the way prescribed by `uncertainty`.

    Args:
        model: A Pytorch, Keras or DeepChem model, to be wrapped.
        uncertainty (str): can be `'dropout'` or `'laplace'`. This determines
            how the model will compute uncertainties and covariances.
        **kwargs: You can pass in arguments to the destined subclass. So, for
            example, if `model` is a DeepChem model, then `**kwargs` may carry
            any of the arguments accepted by `alien.models.DeepChemRegressor`.
    """
[docs] @abstractmethod def predict(self, X, return_std_dev=False): """ Applies the model to input(s) X (with the last self.ndim axes corresponding to each sample), and returns prediction(s). :param return_std_dev: if True, returns a tuple `(prediction, std_dev)` """
def __new__(cls, model=None, X=None, y=None, **kwargs): if cls == Regressor: if test_if_pytorch(model): from .pytorch import PytorchRegressor return PytorchRegressor.__new__( PytorchRegressor, model=model, X=X, y=y, **kwargs ) elif test_if_keras(model): from .keras import KerasRegressor return KerasRegressor.__new__( KerasRegressor, model=model, X=X, y=y, **kwargs ) elif test_if_deepchem(model): from .deepchem import DeepChemRegressor return DeepChemRegressor.__new__( DeepChemRegressor, model=model, X=X, y=y, **kwargs ) else: raise TypeError( f"Regressor doesn't support models of type {model.__class__.__qualname__}" ) else: return super().__new__(cls)
class CovarianceRegressor(Regressor):
    def __init__(self, *args, uncertainty=None, use_covariance_for_ensemble=False, **kwargs):
        self.use_covariance_for_ensemble = use_covariance_for_ensemble
        super().__init__(*args, **kwargs)
        # When an uncertainty flavour is requested, rebind covariance/std_dev
        # to the matching specialization if one exists (e.g. covariance_laplace).
        if uncertainty is not None:
            self.covariance = getattr(self, 'covariance_' + uncertainty, self.covariance)
            self.std_dev = getattr(self, 'std_dev_' + uncertainty, self.std_dev)
[docs] def covariance(self, X): """ Returns the covariance of the epistemic uncertainty between all rows of X. This is where memory bugs often appear, because of the large matrices involved. """ raise NotImplementedError
# @flatten_batch
[docs] def predict_ensemble(self, X, multiple=1.0): """ Returns a correlated ensemble of predictions for samples X. Ensembles are correlated only over the last batch dimension, corresponding to axis (-1 - self.ndim) of X. Earlier dimensions have no guarantee of correlation. :param multiple: standard deviation will be multiplied by this """ return self.predict_samples(X, n=self.ensemble_size, multiple=multiple)
[docs] @get_defaults_from_self def predict_samples(self, X, n=1, multiple=1.0, use_covariance_for_ensemble=None): if not use_covariance_for_ensemble: raise RuntimeError("Using covariance computation to produce ensembles, which is unusual, so we're warning you here. Set `use_covariance_for_ensemble=True` to skip this error.") mean, cov = self.predict(X), self.covariance(X) return ensemble_from_covariance(mean, multiple * cov, n, self.rng)
# May want to override this:
[docs] @flatten_batch def std_dev(self, X, **kwargs): """Returns the (epistemic) standard deviation of the model on input `X`.""" return np.sqrt(self.covariance(X, **kwargs).diagonal())
class EnsembleRegressor(CovarianceRegressor):
    """
    Inherit from EnsembleRegressor if you wish to compute ensembles directly.
    This class provides covariance and prediction for free, given these
    ensembles of predictions.

    Subclasses must implement one of :meth:`predict_ensemble` or
    :meth:`predict_samples`.
    """

    def __init__(self, *args, ensemble_size=40, uncertainty='ensemble', **kwargs):
        super().__init__(*args, ensemble_size=ensemble_size, uncertainty=uncertainty, **kwargs)
[docs] @flatten_batch def predict(self, X, return_std_dev=False): preds_e = self.predict_ensemble(X) preds = preds_e.mean(1) if return_std_dev: return preds, np.std(preds_e, axis=-1) else: return preds
[docs] @abstract_group('ensemble') def predict_ensemble(self, X, **kwargs): """ Returns an ensemble of predictions. :param multiple: standard deviation should be this much larger """ return self.predict_samples(X, n=self.ensemble_size, **kwargs)
[docs] @flatten_batch @abstract_group('ensemble') def predict_samples(self, X, n=1, **kwargs): #multiple=1.0): # Here, we assume `predict_ensemble` has been implemented. preds = [] if n < self.ensemble_size and kwargs.get('multiple', 1) == 1 and hasattr(self, 'models'): indices = self.rng.choice(self.ensemble_size, n, replace=False, shuffle=False) preds = [self.models[i].predict(X, **kwargs) for i in indices] else: for j, k in ranges(0, n, self.ensemble_size): indices = self.rng.choice(self.ensemble_size, k-j, replace=False, shuffle=False) preds.append(self.predict_ensemble(X, **kwargs)[:,indices]) return join(preds)
[docs] @flatten_batch(degree=2) def covariance_ensemble(self, X: ArrayLike): """Compute covariance from the ensemble of predictions""" return covariance_from_ensemble(self.predict_ensemble(X))
[docs] @flatten_batch def std_dev_ensemble(self, X): """Returns the (epistemic) standard deviation of the model on input `X`.""" return std_dev_from_ensemble(self.predict_ensemble(X))
class WrappedModel(Model):
    """Thin wrapper delegating fitting and prediction to a wrapped `model`."""

    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model

    def fit_model(self, X=None, y=None, **kwargs):
        self.model.fit(X, y)

    def predict(self, X, **kwargs):
        return self.model.predict(X, **kwargs)


# -------- Some mixin classes for embeddings -------- #


class EmbeddableModel(Model):
    """
    A mixin class for models which can provide a vector embedding for its
    inputs. There are a number of ways to define this embedding:

    0. A subclass defines the :meth:`embedding` method explicitly, or
    1. The model has a 'last layer' before the output, in which inputs are
       embedded, or
    2. The model's inputs are already vectors, so the embedding is the
       identity map

    Args:
        embedding (str): Specifies how this model should find the embedding.
            If it can't find an embedding method according to this guidance,
            then it raises an error.

            - `'explicit'` or `0` - Only option 0 is allowed
            - `'last_layer'` or `1` - Only option 1 is allowed
            - `'input'` or `2` - Only option 2
            - `'any'` or `[0,1,2]` - Whatever is available, preference in the
              order given
            - `'good'` or `[0,1]` - Whichever of options 0 or 1 is available,
              preference for 0. This is the default.
            - a sequence of integers - for other orderings
    """

    def __init__(self, *args, embedding='good', **kwargs):
        super().__init__(*args, **kwargs)
        if hasattr(self, 'embedding'):
            # Saving self.embedding so it doesn't get overwritten
            self._embedding = self.embedding
        # invokes a property setter:
        self.embedding_method = embedding

    @property
    def embedding_method(self):
        return self._embedding_method

    @embedding_method.setter
    def embedding_method(self, method):
        if not isinstance(method, (list, tuple)):
            method = [method]
        self._embedding_method = []
        for m in method:
            # Translate named strategies into option lists; unknown entries
            # (integers) pass through unchanged.
            self._embedding_method.extend({
                'explicit': [0],
                'last_layer': [1],
                'input': [2],
                'any': [0, 1, 2],
                'good': [0, 1],
            }.get(m, [m]))
        self.find_method()

    # Candidate attribute names for each embedding option, in preference order.
    method_names = {
        0: ['_embedding', 'embed', 'embeddings'],
        1: ['last_layer_embedding', 'last_layer_embed', 'embed_last_layer', 'last_layer'],
        2: ['input_embedding'],
    }

    def find_method(self):
        """Bind `self.embedding` to the first method permitted by `embedding_method`."""
        for m in self._embedding_method:
            for a in self.method_names[m]:
                if hasattr(self, a):
                    self.embedding = getattr(self, a)
                    return
        raise NotImplementedError(f"Could not find an embedding for model of type {type(self)}")

    def input_embedding(self, X):
        """Identity embedding; valid only for array-like (vector) inputs."""
        try:
            X.shape
        except AttributeError:
            raise TypeError(f"`input_embedding` needs the input to be array-like, but you passed a {type(X)}")
        return X


class LastLayerEmbeddableModel(EmbeddableModel):
    """Mixin for models whose embedding is the last layer's activations."""

    def __init__(self, *args, embedding='last_layer', **kwargs):
        super().__init__(*args, embedding=embedding, **kwargs)

    @abstractmethod
    def last_layer_embedding(self, X):
        """Returns the activations of the last layer before the output."""


def test_if_pytorch(model):
    """Duck-type check, then a real `isinstance`, to see if `model` is a Pytorch module."""
    pt_attrs = [
        "_parameters",
        "_buffers",
        "_forward_hooks",
        "_modules",
    ]
    if all(hasattr(model, attr) for attr in pt_attrs):
        try:
            from torch.nn import Module

            # Bug fix: previously `assert isinstance(...)` — skipped under -O,
            # and raised (instead of returning False) on a duck-type near-miss.
            return isinstance(model, Module)
        except ImportError:
            pass
    return False


def test_if_deepchem(model):
    """Duck-type check, then a real `isinstance`, to see if `model` is a DeepChem model."""
    dc_attrs = [
        "_loss_fn",
        "output_types",
        "model_class",
        "_prediction_outputs",
    ]
    if all(hasattr(model, attr) for attr in dc_attrs):
        try:
            from deepchem.models import Model as DCModel

            # Bug fix: ImportError was previously uncaught here, so a duck-type
            # match crashed when deepchem wasn't installed; also replaces the
            # assert-based control flow (skipped under -O).
            return isinstance(model, DCModel)
        except ImportError:
            pass
    return False


def test_if_keras(model):
    """Test if model is a keras model."""
    kr_attrs = [
        "_supports_masking",
        "_name",
        "_callable_losses",
        "_jit_compile",
        "_input_dtype",
        "_graph_initialized",
    ]
    # Fuzzy match (>= 4 of 6 attrs) tolerates differences between Keras versions.
    if sum(hasattr(model, attr) for attr in kr_attrs) >= 4:
        try:
            from tensorflow.keras import Model

            return isinstance(model, Model)
        except ImportError:
            pass
    return False