Source code for bfade.abstract

from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Any

import numpy as np
import matplotlib.pyplot as plt
import scipy
from scipy.stats import norm, multivariate_normal
from scipy.optimize import minimize_scalar, minimize
from numdifftools import Hessian

from bfade.util import grid_factory, logger_factory
from bfade.util import identity, printer, dummy_translator
from bfade.statistics import Distribution, uniform

_log = logger_factory(name=__name__, level="DEBUG")

MINIMZER_LO_BOUND = 0
MINIMZER_UP_BOUND = 1e20

[docs] class AbstractCurve(ABC): """Abstract curve to instantiate curves to perform MAP over. Contains: - representation - inspection - computation of its distance to a given dataset. """ def __init__(self, metrics: callable = identity, **pars: Dict[str, Any]) -> None: """ Initialise curve. Parameters ---------- metrics : callable identity, logarithm, for instance. Default is identity. This determines\ whether minimum distance points are sought over the lin-lin or log-log space. pars: Dict[str, Any] name : Dict[str, str] name of the instance. remainder of arguments : Dict[str, float] parameters of the curve Returns ------- None """ try: self.name = pars.pop("name") except KeyError: self.name = "Untitled" try: self.metrics = pars.pop("metrics") except KeyError: self.metrics = identity [setattr(self, p, pars[p]) for p in pars] self.pars = [k for k in pars.keys()] self.metrics = metrics self.config()
[docs] def config(self, save: bool = False, folder: str = "./", fmt: str = "png", dpi: int = 300) -> None: """ Configure settings for saving plots. Parameters ---------- save : bool, optional Flag indicating whether to save plots. The default is False. folder : str, optional Folder path where plots will be saved. The default is "./". fmt : str, optional Format for saving plots. The default is "png". dpi : int, optional Dots per inch for saving plots. The default is 300. Returns ------- None """ self.save = save self.folder = folder self.fmt = fmt self.dpi = dpi
[docs] @abstractmethod def equation(self) -> np.ndarray: """ Abstract representation of a mathematical equation. """ ...
[docs] def squared_distance(self, t: float, X: np.ndarray) -> float: """ Calculate the squared distance between two points over the feature plane. This is just an auxiliary function, which ought not to be used directly, rather it must be used in conjunction with signed_distance_to_dataset. .. math:: d^2 = (\mathcal{M}(x1) - \mathcal{M}(t))^2 + (\mathcal{M}(x2) - \mathcal{M}(f(t)))^2 where :math:`\mathcal{M}` is the metrics whereby the optimal distance will be computed. Parameters ---------- t : float Dummy parameter. Abscissa along the equation. X : np.ndarray An array representing a point belonging to the feature space [X[0], X[1]]. Returns ------- float The squared distance between the metric values of points [t, equation(t)] and X. """ return (self.metrics(X[0]) - self.metrics(t))**2 +\ (self.metrics(X[1]) - self.metrics(self.equation(t)))**2
[docs] def signed_distance_to_dataset(self, D) -> Tuple[np.ndarray]: """ Minimises squared_distance to compute the minimum squared distance of each point of the dataset to the target curve. .. math:: \min_{\\theta} d^2 where :math:`\\theta` gather the parameters of the target curve. D : Dataset Dataset instance containing attributes X and y as features and output, respectively. """ x_opt = [] y_opt = [] l_dis = [] dd = [] signa = [] for x in D.X: res = minimize_scalar(self.squared_distance, args=(x), method="bounded", bounds=(MINIMZER_LO_BOUND, MINIMZER_UP_BOUND), ) if res.success: x_opt.append(res.x) l_dis.append(res.fun) # else: # raise Exception("Error while minimising.") x_opt = np.array(x_opt) y_opt = self.equation(x_opt) for x, xo, yo in zip(D.X, x_opt, y_opt): d = np.array([x[0]-xo, x[1]-yo]).T dd.append(np.inner(d, d)**0.5) if x[1] > yo: signa.append(1) else: signa.append(-1) l_dis = np.array(l_dis) dd = np.array(dd) signa = np.array(signa) return dd*signa, x_opt, y_opt
[docs] @printer def inspect(self, x: np.ndarray, scale: str = "linear", **data: Dict[str, Any]) -> None: """ Plot the equation of the curve and optionally the provided dataset. Parameters ---------- x : np.ndarray Array of x-values for plotting the equation curve. scale : str, optional The scale of the plot. The default is "linear". data : Dict[str, Any] Additional data for scatter points. Expected keys: "X", "y". Returns ------- None """ _log.warning(f"{self.__class__.__name__}.{self.inspect.__name__}") fig, ax = plt.subplots(dpi=300) plt.plot(x, self.equation(x), "k") try: plt.scatter(data["X"][:,0], data["X"][:,1], c=data["y"], s=10) except: pass plt.title(f"{self.__class__.__name__} -- {self.pars} = {[getattr(self, p) for p in self.pars]}") plt.xscale(scale) plt.yscale(scale) return fig, self.name + "_curve"
[docs] @printer def inspect_signed_distance(self, x: np.ndarray, x_opt: np.ndarray, y_opt: np.ndarray, dis: np.ndarray, X: np.ndarray = None, y: np.ndarray = None, scale: str = "linear") -> None: """ Visualize the signed distance of data points to a minimum-distance (optimal) point along the curve. Parameters ---------- x : np.ndarray Input values for the optimal point. x_opt : np.ndarray x-coordinate of the optimal point. y_opt : np.ndarray y-coordinate of the optimal point. dis : np.ndarray Signed distance values for each data point. X : np.ndarray, optional Input features of the synthetic dataset. y : np.ndarray, optional Target values of the synthetic dataset. scale : str, optional Scale for both x and y axes. Options are "linear" (default) or "log". Returns ------- None Displays a plot visualizing the signed distance of data points to the optimal point. """ fig, ax = plt.subplots(dpi=300) if X is not None: plt.scatter(X[:,0], X[:,1], c=y) plt.scatter(x_opt, y_opt) for x, xo, yo, d in zip(X, x_opt, y_opt, dis): if d > 0: ax.plot([x[0], xo], [x[1], yo], '-b') else: ax.plot([x[0], xo], [x[1], yo], '-.r') ax.axis("equal") plt.xscale(scale) plt.yscale(scale) return fig, self.name + "_sig_dist"
[docs] def get_curve(self) -> Tuple: """ Get curve parameters and its equation Returns ------- Tuple """ return self.pars, self.equation
def __repr__(self): attributes_str = ',\n '.join(f'{key} = {value}' for key, value in vars(self).items()) return f"{self.__class__.__name__}({attributes_str})"
[docs] class AbstractBayes(ABC): """Bayesian framework to perform Maximum a Posterior Estimation and predictions. Contains: - core method to perform map - abstract predictor - methods to instantiate priors, log-likelihood, and log-posterior - Variational (Laplace) approximation of the posterior - computation of the predictive posterior. """ def __init__(self, *pars: List[str], **args: Dict[str, Any]) -> None: """ Initialize the instance. Parameters ---------- pars : List[str] List of the names of the trainable parameters. args: Dict[str, Any] - theta_hat : np.ndarray expected value of the parameter (if available). - ihess : np.ndarray Inverse Hessian matrix of the log-posterior (if available). - name : str Name of the instance. - other parameters : float Deterministic parameters, if any. Returns ------- None """ try: self.name = args.pop("name") except: self.name = "Untitled" self.pars = pars [setattr(self, "prior_" + p, Distribution(uniform, unif_value=1)) for p in self.pars] _log.debug(f"{self.__class__.__name__}.{self.__init__.__name__} -- {self}") try: self.theta_hat = args.pop("theta_hat") self.ihess = args.pop("ihess") self.laplace_posterior() _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__}"+\ f" -- Optimal values known -- theta_hat = {self.theta_hat}" +\ f"ihess = {self.ihess}") except: self.theta_hat = None self.ihess = None _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- Optimal values unknown. Must run MAP.") try: self.deterministic = args _log.info(f"{self.__class__.__name__}.{self.__init__.__name__} -- Deterministic parameter(s) {self.deterministic}") except KeyError: self.deterministic = None _log.info(f"{self.__class__.__name__}.{self.__init__.__name__} -- No deterministic parameter(s)")
[docs] def load_prior(self, par: str, dist: callable, **args: Dict) -> None: """ Load a prior distribution for a specified parameter. Parameters ---------- par : str The name of the parameter. dist : callable The distribution function or class. args : Dict[str] Additional arguments to be passed to the distribution function. Returns ------- None """ _log.info(f"{self.__class__.__name__}.{self.load_prior.__name__} for {par}") setattr(self, "prior_" + par, Distribution(dist, **args))
[docs] def load_log_likelihood(self, log_loss_fn: callable, **args: Dict[str, Any]) -> None: """ Load a likelihood loss function. Parameters ---------- log_loss_fn : callable Log-likelihood function. args : Dict[str, Any] Arguments of the log-likelihood function. Returns ------- None """ _log.info(f"{self.__class__.__name__}.{self.load_log_likelihood.__name__} -- {log_loss_fn}") self.log_likelihood_args = args self.log_likelihood_loss = log_loss_fn
[docs] @abstractmethod def predictor(self, D, *P: Dict[str, float]) -> None: """ Abstract method for making predictions using a model. Parameters ---------- D : Dataset Training dataset. P : Dict[str, float] Value of the parameters of the target curve. Returns ------- np.ndarray The result of the prediction. """ ...
[docs] def predict(self, D) -> np.ndarray: """ Wraps predictor to predict via the trained model. Parameters ---------- D : Dataset Data for prediction. Returns ------- np.ndarray Predictions. Raises ------ TypeError If the optimal value is not available. Must run MAP. """ try: return self.predictor(D, *self.theta_hat) except (AttributeError, TypeError): raise TypeError("Optimal value not available. Must run MAP.")
[docs] def log_prior(self, *P: Dict[str, Any]) -> float: """ Calculate the log-prior probability hypothesising initially independent distributions. .. math:: \log P[\\theta] = \sum \log P[\\theta_i] Parameters ---------- P : Dict[str, Any] Distribution and related arguments to be prescribed over the parameter. Returns ------- float The log-prior probability. """ return np.array([getattr(self, "prior_" + p).logpdf(P[(self.pars.index(p))]) for p in self.pars]).sum()
[docs] def log_likelihood(self, D, *P: Dict[str, Any]) -> float: """ Calculate the log-likelihood. .. math:: \log P[D | \\theta] Parameters ---------- D : Dataset Input dataset. P : Dict[str, float] Value of trainable parameters for the target curve. Returns ------- float The log-posterior probability. """ return -self.log_likelihood_loss(D.y, self.predictor(D, *P), **self.log_likelihood_args)
[docs] def log_posterior(self, D, *P: Dict[str, Any]) -> float: """ Calculate the log-posterior. .. math:: \log P[\\theta] = \log P[D | \\theta] + \log P[\\theta] Parameters ---------- D : Dataset Input dataset. P : Dict[str, Any] Trainable parameters. Returns ------- float The log-posterior probability. """ return self.log_prior(*P) + self.log_likelihood(D, *P)
[docs] def MAP(self, D, x0=[1,1], solver: Dict[str, Any] = None) -> None: """ Find the Maximum A Posteriori (MAP) estimate for the parameters. .. math:: \min_{\\theta} -\log P[\\theta | D] If MAP succeeds, the optimal parameters are stored in theta_hat. Whilst the Hessian Matrix is stored in ihess. Parameters ---------- D : Dataset Training dataset. x0 : list, optional Initial guess for the parameters, by default [1, 1]. solver : str, optional The optimization solver to use, by default "Nelder-Mead". Raises ------ Exception Raised if MAP optimization does not succeed. Returns ------- None """ def callback(X): """ Callback function for optimization. Parameters ---------- X : array-like Current parameter values. Returns ------- None """ current_min = -self.log_posterior(D, *X) _log.info(f"Iter: {self.n_eval:d} -- Params: {X} -- Min {current_min:.3f}") self.n_eval += 1 try: method = solver.pop("method") solver = solver _log.info(f"{self.__class__.__name__}.{self.MAP.__name__} -- User defined solver {method}, {solver}") except (KeyError, AttributeError): method = "Nelder-Mead" solver = {'disp': True, 'maxiter': 1e10} _log.info(f"{self.__class__.__name__}.{self.MAP.__name__} -- Default solver {method}, {solver}") if self.theta_hat is not None and self.ihess is not None: _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- Optimal value known. Skipping MAP.") else: _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- Run MAP.") self.n_eval = 0 result = minimize(lambda t: -self.log_posterior(D, *t), x0=x0, method=method, callback=callback, options=solver) if result.success: _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- MAP succeeded.") self.theta_hat = result.x hessfun = Hessian(lambda t: -self.log_posterior(D, *t)) _log.info(f"{self.__class__.__name__}.{self.MAP.__name__} -- Compute inverse Hessian Matrix.") self.ihess = np.linalg.inv(hessfun(self.theta_hat)) self.laplace_posterior() _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- theta_hat {self.theta_hat}") _log.warning(f"{self.__class__.__name__}.{self.MAP.__name__} -- ihess {self.ihess}") else: raise Exception("MAP did not succeede.")
[docs] def laplace_posterior(self) -> None: """ Load Laplace approximation. .. math:: P[\\theta | D] \sim \mathcal{N}(\hat{\\theta}, \mathbf{H}^{-1}) and its marginal distributions, where :math:`\hat{\\theta}` is the optimal value from MAP,\ and :math:`\mathbf{H}^{-1}` is the inverse Hessian matrix of :math:`-\log P[\\theta | D]` Returns ------- None. """ _log.debug(f"{self.__class__.__name__}.{self.laplace_posterior.__name__} -- Load distributions.") self.joint = multivariate_normal(mean = self.theta_hat, cov=self.ihess) for idx in range(self.theta_hat.shape[0]): setattr(self, "marginal_" + self.pars[idx], norm(loc=self.theta_hat[idx], scale=self.ihess[idx][idx]**0.5))
[docs] def predictive_posterior(self, posterior_samples: int, D, post_op: callable = None, random_state: int = 0) -> np.ndarray: """ Evaluate the predictive posterior using the specified number of samples. Parameters ---------- posterior_samples : int The number of posterior samples to generate. D : Dataset The dataset supplied for predicting the corresponding output. post_op : Callable[..., Any], optional Posterior operation function. Default is None. random_state: int Random state for numpy to sample the posterior. The default is 0. Returns ------- np.ndarray Predictive posterior samples processed via post_op, if provided. """ np.random.seed(0) _log.debug(f"{self.__class__.__name__}.{self.predictive_posterior.__name__}") self.posterior_samples = posterior_samples predictions = [] for k in range(0,self.posterior_samples): predictions.append(self.predictor(D, *self.joint.rvs(1))) predictions = np.array(predictions) if post_op is not None: _log.debug(f"{self.__class__.__name__}.{self.predictive_posterior.__name__} -- Return {post_op.__name__}") return post_op(predictions, axis=0) else: _log.debug(f"{self.__class__.__name__}.{self.predictive_posterior.__name__} -- Return prediction stack") return predictions
def __repr__(self) -> str: # attributes_str = ',\n '.join(f'{key} = {vars(self)[key]}' for key in vars(self).keys()) return f"{self.__class__.__name__}({vars(self)})"
[docs] class AbstractMAPViewer(ABC): """ Abstract viewer for inspecting MAP elements and Laplace's Variational approximation of the posterior. """ def __init__(self, p1: str, b1: list, n1: int, p2: str, b2: list, n2: int, spacing: float = "lin", **kwargs: Dict[str, float]) -> None: """ Initialize the AbstractMAPViewer. Parameters ---------- p1 : str Name of the first parameter. b1 : list Bounds for the first parameter. n1 : int Number of grid points for the first parameter. p2 : str Name of the second parameter. b2 : list Bounds for the second parameter. n2 : int Number of grid points for the second parameter. spacing : float Spacing between grid points, linear of logarithmic. kwargs: Dict[str, Any] - name: str name of the instance. Returns ------- None """ try: self.name = kwargs.pop("name") except KeyError: self.name = "Untitled" self.pars = (p1, p2) self.p1 = p1 self.p2 = p2 self.n1 = n1 self.n2 = n2 self.b1 = b1 self.b2 = b2 self.spacing = spacing setattr(self, "bounds_" + p1, b1) setattr(self, "bounds_" + p2, b2) _log.debug(f"{self.__class__.__name__}.{self.__init__.__name__} -- {self}") X1, X2 = grid_factory(getattr(self, "bounds_" + p1), getattr(self, "bounds_" + p2), self.n1, self.n2, spacing) setattr(self, p1, X1) setattr(self, p2, X2)
[docs] @abstractmethod def contour(self): """ Display the contour of the Bayes elements log-prior, -likelihood, and -posterior. """ ...
[docs] def config(self, save: bool = False, folder: str = "./", fmt: str = "png", dpi: int = 300) -> None: """ Configure settings for saving plots. Parameters ---------- save : bool, optional Flag indicating whether to save plots. The default is False. folder : str, optional Folder path where plots will be saved. The default is "./". fmt : str, optional Format for saving plots. The default is "png". dpi : int, optional Dots per inch for saving plots. The default is 300. Returns ------- None """ _log.debug(f"{self.__class__.__name__}.{self.config.__name__}") self.save = save self.folder = folder self.fmt = fmt self.dpi = dpi
[docs] def config_contour(self, levels: int = 21, clevels: int = 11, cmap: str = "viridis", xlim = None, ylim = None, translator: Dict = dummy_translator) -> None: """ Configure contour plot settings. Parameters ---------- levels : int, optional The number of contour levels for the main plot. The default is 21. clevels : int, optional The number of contour levels for the colorbar. The default is 11. cmap : str, optional The colormap to use for the plot. The default is "viridis". translator: Dict or callable Mapper for labels. Returns ------- None """ _log.debug(f"{self.__class__.__name__}.{self.config_contour.__name__}") self.levels = levels self.clevels = clevels self.cmap = cmap self.xlabel = translator[self.p1] self.ylabel = translator[self.p2] if xlim is not None and xlim is not None: self.b1 = xlim self.b2 = ylim
def __repr__(self): attributes_str = ',\n '.join(f'{key} = {value}' for key, value in vars(self).items()) return f"{self.__class__.__name__}({attributes_str})"