from typing import List, Dict, Any
import numpy as np
import scipy
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split as tts
from bfade.abstract import AbstractCurve
from bfade.util import grid_factory, logger_factory, YieldException, printer
# Module-level logger used by the classes in this file; created through the
# project's logger_factory with the "DEBUG" level.
_log = logger_factory(name=__name__, level="DEBUG")
# [docs] -- apparently a Sphinx "view source" link marker left over from HTML extraction; not code.
class Dataset:
    """
    General dataset class for managing datasets.

    A dataset holds a two-column feature matrix ``X`` and a target vector
    ``y``. They are either read from a table through a user-supplied reader
    or passed directly as arrays. The class can plot the data and split it
    into training and testing datasets.
    """

    def __init__(self, **kwargs: Any) -> None:
        """
        Initialize the instance.

        Parameters
        ----------
        **kwargs : Any
            - name : str
                Name of the instance. The default is "Untitled".
            - path : str
                Location handed to ``reader``. Ignored (with a warning) when
                ``reader`` is not given as well.
            - reader : callable
                Called as ``reader(path, **remaining_kwargs)``. The result is
                stored in ``self.data`` and is expected to behave like a
                pandas DataFrame with columns "x1", "x2" and "y".
            - X : np.ndarray
                Input features. Overrides the features read from ``path``.
            - y : np.ndarray
                Output feature. Overrides the target read from ``path``.
            - test : np.ndarray
                Binary vector used by ``partition(method="user")``:
                0 marks a training datum, 1 marks a testing datum.
            - remainder of the arguments
                Stored verbatim as attributes of the instance.

        Note
        ----
        The initialisation can be done passing a dataset containing X, and y
        as keys and related items.

        Returns
        -------
        None
        """
        self.X = None
        self.y = None
        self.name = kwargs.pop("name", "Untitled")

        # Load tabular data only when both "path" and "reader" are supplied.
        # Looking the keys up explicitly (rather than wrapping the reader call
        # in ``except KeyError``) lets a KeyError raised *inside* the reader
        # surface instead of being mistaken for a missing argument.
        self.data = None
        if "path" in kwargs:
            path = kwargs.pop("path")
            if "reader" in kwargs:
                reader = kwargs.pop("reader")
                self.data = reader(path, **kwargs)
            else:
                _log.warning(f"{self.__class__.__name__}.{self.__init__.__name__} "
                             "-- 'path' given without 'reader': no data loaded")

        if self.data is not None:
            try:
                self.X = self.data[["x1", "x2"]].to_numpy()
                self.y = self.data["y"].to_numpy()
                _log.debug(f"{self.__class__.__name__}.{self.__init__.__name__} -- Data ready")
            except (TypeError, KeyError):
                # The table lacks the expected columns: keep whatever could
                # be extracted and rely on explicitly passed X / y, if any.
                pass

        # Explicitly passed arrays take precedence over the table contents.
        if "X" in kwargs:
            self.X = kwargs.pop("X")
            _log.debug(f"{self.__class__.__name__}.{self.__init__.__name__} -- Load X data")

        if "y" in kwargs:
            self.y = kwargs.pop("y")
            _log.debug(f"{self.__class__.__name__}.{self.__init__.__name__} -- Load y data")

        self.test = kwargs.pop("test", None)

        # Whatever is left is attached to the instance as-is.
        for key, value in kwargs.items():
            setattr(self, key, value)

        # Runs last, so save/folder/fmt/dpi always start from their defaults.
        self.config()

    def config(self, save: bool = False, folder: str = "./", fmt: str = "png", dpi: int = 300) -> None:
        """
        Configure settings for saving plots.

        Parameters
        ----------
        save : bool, optional
            Flag indicating whether to save plots. The default is False.
        folder : str, optional
            Folder path where plots will be saved. The default is "./".
        fmt : str, optional
            Format for saving plots. The default is "png".
        dpi : int, optional
            Dots per inch for saving plots. The default is 300.

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.config.__name__}")
        self.save = save
        self.folder = folder
        self.fmt = fmt
        self.dpi = dpi

    @printer
    def inspect(self, xlim=[1,1000], ylim=[1,1000], scale="linear", **kwargs: Any):
        """
        Visualize the data and optionally a curve.

        Parameters
        ----------
        xlim : list, optional
            Limits for the x-axis. Default is [1, 1000].
        ylim : list, optional
            Limits for the y-axis. Default is [1, 1000].
        scale : str, optional
            Scale for both x and y axes. Options are "linear" (default) or "log".
        **kwargs : Any
            - curve : AbstractCurve
                Curve to inspect. Drawn only when ``x`` is given as well.
            - x : np.ndarray
                Abscissa for the curve.

        Returns
        -------
        tuple
            The matplotlib figure and the string ``<name>_data``.
        """
        _log.debug(f"{self.__class__.__name__}.{self.inspect.__name__}")
        fig, ax = plt.subplots(dpi=300)
        ax.scatter(self.X[:,0], self.X[:,1], c=self.y, s=10)

        curve = kwargs.pop("curve", None)
        x = kwargs.pop("x", None)
        if curve is not None and x is not None:
            # Drawing the curve stays best-effort (a failure must not prevent
            # the scatter plot from being returned), but it is reported
            # instead of being silently discarded.
            try:
                ax.plot(x, curve.equation(x))
            except Exception as exc:
                _log.warning(f"{self.__class__.__name__}.{self.inspect.__name__} "
                             f"-- curve not drawn: {exc!r}")
        # NOTE(review): the original contained the no-op expression
        # `self.name + "_curve"` here, which suggests a different figure name
        # was intended when a curve is drawn. The returned name is left
        # unchanged; confirm the intended naming before altering it.

        ax.set_xlim(xlim)
        ax.set_ylim(ylim)
        ax.set_xscale(scale)
        ax.set_yscale(scale)
        return fig, self.name + "_data"

    def partition(self, method: str = "random", test_size: float = 0.2, random_state: int = 0):
        """
        Partition the dataset into training and testing sets.

        Parameters
        ----------
        method : str, optional
            Method for partitioning. Options are "random" (default) or "user".
            "random" delegates to scikit-learn's ``train_test_split``.
            "user" sends data flagged 0 to the training set and data flagged 1
            to the testing set; the flag is the "test" column of the loaded
            table or, when only arrays are held, the ``test`` vector.
        test_size : float, optional
            The proportion of the dataset to include in the test split. Default is 0.2.
            Used by the "random" method only.
        random_state : int, optional
            Random seed for reproducibility. Default is 0.
            Used by the "random" method only.

        Returns
        -------
        Tuple[Dataset, Dataset]
            Training and testing datasets, named ``<name>_train`` and
            ``<name>_test``.

        Raises
        ------
        AttributeError
            If no data is available in the dataset, or if the "user" method
            is requested on array data without a ``test`` vector.
        ValueError
            If split method is incorrectly provided.
        """
        _log.info(f"{self.__class__.__name__}.{self.partition.__name__}")
        _log.warning(f"Train/test split. Method: {method}")

        if method not in ("random", "user"):
            # ValueError is still caught by callers using `except Exception`.
            raise ValueError("Split method incorrectly provided.")

        has_table = self.data is not None
        has_arrays = self.X is not None and self.y is not None
        if not (has_table or has_arrays):
            raise AttributeError("No data in dataset.")

        train_name = self.name + "_train"
        test_name = self.name + "_test"

        if method == "random":
            if has_table:
                data_tr, data_ts = tts(self.data,
                                       test_size=test_size,
                                       random_state=random_state)
                return (Dataset(name=train_name, **self.populate(data_tr)),
                        Dataset(name=test_name, **self.populate(data_ts)))

            X_tr, X_ts, y_tr, y_ts = tts(self.X, self.y,
                                         test_size=test_size,
                                         random_state=random_state)
            return (Dataset(X=X_tr, y=y_tr, name=train_name),
                    Dataset(X=X_ts, y=y_ts, name=test_name))

        # method == "user"
        if has_table:
            return (Dataset(name=train_name, **self.populate(self.data.query("test == 0"))),
                    Dataset(name=test_name, **self.populate(self.data.query("test == 1"))))

        if self.test is None:
            # Without the flag vector the comparison below degenerates to a
            # scalar, which yields empty splits or an obscure NumPy error.
            raise AttributeError("No test vector in dataset.")

        flags = np.asarray(self.test)
        train_idx = np.where(flags == 0)
        test_idx = np.where(flags == 1)
        return (Dataset(X=self.X[train_idx], y=self.y[train_idx], name=train_name),
                Dataset(X=self.X[test_idx], y=self.y[test_idx], name=test_name))

    def populate(self, data, X_labels: List[str] = ["x1", "x2"], y_label: str = "y") -> Dict[str, np.ndarray]:
        """
        Populate data into features and target labels.

        Parameters
        ----------
        data : pd.DataFrame
            Input data containing features and target labels.
        X_labels : list of str
            Feature column labels. The default is ["x1", "x2"].
        y_label : str
            Target column label. The default is "y".

        Returns
        -------
        dict
            Dictionary with keys "X" and "y" holding the selected columns
            converted to NumPy arrays.
        """
        _log.debug(f"{self.__class__.__name__}.{self.populate.__name__}")
        return {"X": data[X_labels].to_numpy(), "y": data[y_label].to_numpy()}
# [docs] -- apparently a Sphinx "view source" link marker left over from HTML extraction; not code.
class SyntheticDataset(Dataset):
    """
    Dataset whose points are generated around a given curve.

    The methods below suggest this workflow: build the points with
    ``make_grid`` or ``make_tube``, optionally drop the ones lying on the
    curve with ``clear_points`` (before labelling), label them with
    ``make_classes``, then optionally perturb them with ``add_noise`` and
    discard non-positive points with ``crop_points`` (both after labelling).
    """

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the instance; all keyword arguments go to ``Dataset``."""
        super().__init__(**kwargs)

    def make_grid(self, x1_bounds: List[float], x2_bounds: List[float],
                  n1: int, n2: int, spacing: str ="lin") -> None:
        """
        Generate a grid of input points for the synthetic dataset.

        Parameters
        ----------
        x1_bounds : List[float]
            Bounds for the first feature (x1).
        x2_bounds : List[float]
            Bounds for the second feature (x2).
        n1 : int
            Number of points along the first dimension (x1).
        n2 : int
            Number of points along the second dimension (x2).
        spacing : str, optional
            Forwarded to ``grid_factory``. Presumably "lin" for linear or
            "log" for logarithmic spacing -- confirm against ``grid_factory``.
            Default is "lin".

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.make_grid.__name__}")
        # grid_factory's coordinate arrays are stacked and transposed so that
        # each row of X is one point.
        self.X = np.vstack(grid_factory(x1_bounds, x2_bounds, n1, n2, spacing)).T

    def make_tube(self, curve, x_bounds: List[float], n: int = 50, up: float = 0.1,
                  down: float = -0.1, step: int = 4, spacing: str = "lin") -> None:
        """
        Generate a "tube" of points surrounding the given curve.

        The curve is sampled at ``n`` abscissae and replicated ``step`` times,
        each replica being translated by a different amount between ``up``
        and ``down``. This method should be used in place of make_grid.

        Parameters
        ----------
        curve : AbstractCurve
            Curve to surround; ``curve.equation`` is evaluated on the whole
            array of abscissae at once.
        x_bounds : List[float]
            Edges of the interval along the x-axis.
        n : int, optional
            Number of points along the x-axis. The default is 50.
        up : float, optional
            Maximum upward translation of the curve. The default is 0.1.
            With linear spacing the translation is additive. Otherwise it is
            an exponent: the curve is multiplied by factors running from
            ``10**up`` to ``10**down``.
        down : float, optional
            Minimum downward translation of the curve. The default is -0.1.
            Interpreted like ``up``. Must be smaller than ``up``.
        step : int, optional
            Number of translated curves. The default is 4.
        spacing : str, optional
            "lin" for linearly spaced abscissae and additive translations;
            any other value selects logarithmic spacing and multiplicative
            translations. The default is "lin".

        Note
        ----
        No replica is filtered out here. If one of the translations is the
        identity (offset 0 or factor 1) the corresponding points lie on the
        curve itself; ``clear_points`` can remove them.

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.make_tube.__name__}")
        assert down < up

        linear = spacing == "lin"
        if linear:
            steps = np.linspace(up, down, step)
            x1 = np.linspace(x_bounds[0], x_bounds[1], n)
        else:
            steps = np.logspace(up, down, step)
            x1 = np.logspace(np.log10(x_bounds[0]), np.log10(x_bounds[1]), n)

        x2 = curve.equation(x1)

        # One row per translated replica of the curve.
        X2 = np.array([x2 + s if linear else x2 * s for s in steps])
        # Repeat the abscissae once per replica so they pair up with the
        # row-major flattening of X2.
        X1 = np.tile(x1, X2.shape[0])
        self.X = np.vstack([X1, X2.flatten()]).T

    def make_classes(self, curve):
        """
        Assign class labels to the synthetic dataset based on the underlying curve.

        A point is labelled 0 when the curve, evaluated at the point's first
        coordinate, is strictly greater than its second coordinate, and 1
        otherwise.

        Parameters
        ----------
        curve : AbstractCurve
            The curve used to separate the dataset and make classes accordingly.

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.make_classes.__name__}")
        self.y = np.array([0 if curve.equation(d[0]) > d[1] else 1 for d in self.X])

    def clear_points(self, curve, tol: float = 1e-2):
        """
        Remove data points from the synthetic dataset based on the deviation from the underlying curve.

        Parameters
        ----------
        curve : AbstractCurve
            The curve the deviation is measured from.
        tol : float, optional
            Tolerance level for determining the deviation. Only points whose
            absolute deviation from the curve is strictly greater than `tol`
            are kept. The default is 1e-2.

        Raises
        ------
        YieldException
            If classes have already been made (``y`` is not None).

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.clear_points.__name__} -- tol = {tol}")
        if self.y is not None:
            raise YieldException("Points must cleared before making classes.")

        points = np.asarray(self.X)
        keep = np.array([bool(abs(curve.equation(d[0]) - d[1]) > tol) for d in points],
                        dtype=bool)
        # Boolean-mask indexing (instead of rebuilding the array from a list)
        # keeps X two-dimensional even when every point is removed.
        self.X = points[keep]

    def add_noise(self, x1_std: float, x2_std: float, random_state: int = 0) -> None:
        """
        Add Gaussian noise to the data points in the synthetic dataset.

        The noise is zero-mean and is added to ``X`` in place. NumPy's global
        random generator is re-seeded with ``random_state``.

        Parameters
        ----------
        x1_std : float
            Standard deviation of the Gaussian noise to be added to the first feature (x1).
        x2_std : float
            Standard deviation of the Gaussian noise to be added to the second feature (x2).
        random_state : int
            Random state. The default is 0.

        Raises
        ------
        YieldException
            If classes have not been made yet (``y`` is None).

        Returns
        -------
        None
        """
        # `import scipy` alone does not guarantee that the `stats` submodule
        # is loaded on every SciPy version; import it explicitly.
        import scipy.stats

        _log.debug(f"{self.__class__.__name__}.{self.add_noise.__name__}")
        np.random.seed(random_state)
        if self.y is None:
            raise YieldException("Noise must be added after making classes.")

        n_points = self.X.shape[0]
        self.X[:,0] += scipy.stats.norm(loc = 0, scale = x1_std).rvs(size=n_points)
        self.X[:,1] += scipy.stats.norm(loc = 0, scale = x2_std).rvs(size=n_points)

    def crop_points(self):
        """
        Keep only the points whose two coordinates are strictly positive.

        ``X`` and ``y`` are filtered together so they stay aligned.

        Raises
        ------
        YieldException
            If classes have not been made yet (``y`` is None).

        Returns
        -------
        None
        """
        _log.debug(f"{self.__class__.__name__}.{self.crop_points.__name__}")
        if self.y is None:
            raise YieldException("Points must be cropped after making classes.")

        points = np.asarray(self.X)
        labels = np.asarray(self.y)
        if points.size == 0:
            # Nothing to crop; leave the (empty) arrays untouched.
            return

        keep = (points[:, 0] > 0) & (points[:, 1] > 0)
        # Mask indexing keeps X two-dimensional even when nothing survives.
        self.X = points[keep]
        self.y = labels[keep]