from . import multivariate as mvt
# from DepthEucl import DepthEucl
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from typing import Literal
# [docs]  (Sphinx cross-reference artifact left over from HTML extraction)
class DepthFunc():
    """
    Functional data depth.

    Computes D(x, data): the centrality (similarity, belongness) of a
    functional sample ``x`` with respect to a reference functional dataset,
    by averaging multivariate depths over the discretization grid.

    Notes
    -----
    Possible depth notions are : `mahalanobis`, `halfspace`, `zonoid`,
    `projection`, `aprojection`, `cexpchullstar`, `cexpchull`, `geometrical`.
    For each discretization point i = 1, ..., L:
    - extract the data slice ``data[:, i, :]`` (shape: N_data x D),
    - extract the query vector ``x[i, :]`` (shape: D),
    - compute the multivariate depth of the query vector in that slice,
    - average the L per-point results.
    """
    def __init__(self):
        # Reference dataset; attached later by load_dataset().
        self.data = None
# [docs]  (Sphinx cross-reference artifact left over from HTML extraction)
def load_dataset(self, data:pd.DataFrame=None, y:np.ndarray=None,
timestamp_col:str|int='timestamp',value_cols:str|list|int='value', case_id:str|int="case_id",
interpolate_grid:bool=True, N_grid:int=10,
interpolation_type:str="linear"):
"""
Load the dataset X for reference calculations. Depth is computed with respect to this dataset.
Parameters
----------
data : dataframe or array_like.
Dataset that will be used as base for depth computation
y : Ignored, default=None
Not used, present for API consistency by convention.
timestamp_col : str|int, default = timestamp
Column used for discretization of the dataset.
timestamp_col can be a string indicating the name of the column or an integer for the position of the column.
If the dataset is an array, timestamp_col must be an integer or it will not be considered and a new timestamp_col is created.
case_id : str | int, default= "case_id"
Column used to separate different functions.
For pandas dataframe, it must be either the column name or its position.
For numpy array, it must be eather an integer for the position or it wil be considered as the dimension separation.
value_cols: str|list, default = 'value'
Columns used for the multivariate depth computation.
If value_cols is a string, columns with such word in the name are used.
If value_cols is a list, it is considered the list of columns to be used.
If data is an array, value_cols must be an integer or a list of integers, if not all columns that are not timestamp or case_id will be considered.
interpolate_grid : bool, default = True
Interpolates the timestamp grid using an equaly spaced array from the minimum to the maximum timestamp value.
N_grid : int, default = 10
Determines the number of grid points in the interpolation process.
interpolation_type : str, default='linear'
Interpolation method to use. Supported options are those provided by
`scipy.interpolate.interp1d`, including:
- `'linear'` : linear interpolation (default)
- `'nearest'` : nearest-neighbor interpolation
- `'cubic'` : cubic spline interpolation
- `'quadratic'` : quadratic spline interpolation
- `'previous'`, `'next'` : stepwise interpolation
Returns
---------
DepthFunc model
Notes
-----
For each discretization point i = 1, ..., L:
- Extract the data slice `data[:, i, :]` (shape: N_data x D)
- Extract the query vector `query_point[i, :]` (shape: D)
- Compute the multivariate depth of the query vector relative to the data slice
- Average the results over all L time points
"""
if type(data) == type(None):
raise Exception("You must load a dataset")
assert type(data)==np.ndarray or type(data)==pd.DataFrame, 'Data must be a 3D numpy array of a pandas DataFrame'
if type(data)==np.ndarray:
self.TSnp=timestamp_col
self.CInp=case_id
data=self._3Dnp_tp_pd(data,self.TSnp, self.CInp)
timestamp_col="timestamp"
value_cols="value"
case_id="case_id"
if type(timestamp_col)==str:
if timestamp_col in data.columns:
self.timestamp_col=timestamp_col
else:
raise NameError(f"timestamp_col is not a column in the data.")
elif type(timestamp_col)==int:
if timestamp_col>=len(data.columns):
raise IndexError(f"timestamp_col is greater than the number of columns.")
else:
self.timestamp_col = data.columns[timestamp_col]
print(f"timestamp_col is set to {self.timestamp_col}")
if type(value_cols) == str:
self.value_cols = data.filter(like = value_cols).columns
if len(self.value_cols)==0: raise ValueError("value_cols is an empty list, value_cols should be a suffix for different columns")
elif type(value_cols) == list:
self.value_cols = value_cols
print(f"value_cols is set to {self.value_cols}")
if type(case_id)==str:
if case_id in data.columns:
self.case_id=case_id
else:
raise ValueError("case_id is not a column of the data")
elif type(case_id)==int:
if case_id>=len(data.columns):
raise IndexError(f"case_id is greater than the number of columns.")
else:
self.case_id = data.columns[case_id]
print(f"case_id is set to {self.case_id}")
self.t_min = data[self.timestamp_col].min()
self.t_max = data[self.timestamp_col].max()
data=data.sort_values([self.case_id,self.timestamp_col])
data=self._MinMax(data)
if interpolate_grid==True:
self.N_grid = N_grid
if np.issubdtype(data[self.timestamp_col].dtype, np.datetime64):
new_domain = np.linspace(0, (self.t_max - self.t_min).total_seconds(), self.N_grid)
else:
new_domain = np.linspace(0, self.t_max - self.t_min, self.N_grid)
self.new_domain = new_domain
# for i in np.unique(data.case_id)
elif interpolate_grid==False:
if np.issubdtype(data[self.timestamp_col].dtype, np.datetime64):
self.new_domain=np.sort(np.unique((data[self.timestamp_col]-data[self.timestamp_col].min()).dt.total_seconds()))
else:
self.new_domain=np.sort(np.unique((data[self.timestamp_col]-data[self.timestamp_col].min())))
self.interpolation_type=interpolation_type
self.data_array = self._syncronise_over_time(data,True)
self.data = data
return self
def _3Dnp_tp_pd(self,data,timestamp_col, case_id,):
assert len(data.shape)==3, "data must be a 3D numpy array or pandas dataframe"
if type(timestamp_col)==str and timestamp_col!="timestamp":
print(f"timestamp_col is set to {timestamp_col}, strings are only accepted with pandas dataframe.")
print("timestamp_col is set to a equaly spaced grid")
TS=np.arange(0,data.shape[-1])
timestamp_col=-1
elif type(timestamp_col)==int:
if timestamp_col>data.shape[-1]:
raise IndexError("timestamp_col is set to a value greater than the dataset")
else:
print("timestamp_col must be an integer, it will not be considered for further computations")
timestamp_col=-1
if type(case_id)==str and case_id!="case_id":
print(f"case_id is set to {case_id}, strings are only accepted with pandas dataframe.")
print("case_id is set to the first dimension of the query.")
case_id=-1
elif type(case_id)==int:
if case_id>data.shape[-1]:
raise IndexError("case_id is set to a value greater than the dataset")
else:
print("case_id must be an integer, it will not be considered for further computations")
case_id=-1
nCols=data.shape[-1]
if timestamp_col==-1:
timestamp_col=nCols
nCols+=1
if case_id==-1:
case_id=nCols
nCols+=1
valCols=[f"value_{i}" for i in range(nCols-2)]
df=pd.DataFrame(columns=["timestamp", "case_id", *valCols],dtype=np.float32)
for i in range(data.shape[0]):
# dfi=np.zeros((data.shape[1],nCols))
dfi=pd.DataFrame(data=np.zeros((data.shape[1],nCols)),columns=["timestamp", "case_id", *valCols],dtype=np.float32)
c=0
for j in range(nCols):
if j==timestamp_col:
if j>=data.shape[-1]:
dfi["timestamp"]=np.arange(data.shape[1],dtype=int)
else:
dfi["timestamp"]=data[i,:,j]
elif j==case_id:
if j>=data.shape[-1]:
dfi["case_id"]=np.ones(data.shape[1],dtype=int)*i
else:
dfi["case_id"]=data[i,:,j]
else:
# print(f"value_{c}")
dfi[f"value_{c}"]=data[i,:,j]
c+=1
df=pd.concat([df,dfi],ignore_index=True).reset_index(drop=True)
# print(dfi,timestamp_col)
return df
def _MinMax(self,data):
    """
    Ensure every case has a (non-NaN) value at both borders of the global
    time domain [self.t_min, self.t_max].

    For each case:
    - if a row already exists at the border but a value column is NaN, the
      value is filled from the case's first (resp. last) valid observation;
    - if the case has no row at the border, a new border row is prepended
      (resp. appended) carrying the case's first (resp. last) valid values.

    Parameters
    ----------
    data : pd.DataFrame
        Long-format dataset containing self.case_id, self.timestamp_col and
        self.value_cols columns.

    Returns
    -------
    pd.DataFrame
        The completed dataset, re-sorted by (case, timestamp) with a fresh index.

    Notes
    -----
    NOTE(review): assumes each case has at least one non-NaN value per value
    column — `first_valid_index()`/`last_valid_index()` return None otherwise
    and the `.loc` lookup would fail. TODO confirm against callers.
    """
    for caso in np.unique(data[self.case_id]):
        # --- left border (t_min) ---
        if data[(data[self.case_id]==caso)][self.timestamp_col].min()==self.t_min:
            # A row exists at t_min: fill NaN values from the first valid one.
            for col in self.value_cols:
                if data[(data[self.case_id]==caso)&(data[self.timestamp_col]==self.t_min)][col].isna().values[0]:
                    data.loc[(data[self.case_id]==caso)&(data[self.timestamp_col]==self.t_min),col
                    ]=data[(data[self.case_id]==caso)].loc[data[(data[self.case_id]==caso)][col].first_valid_index()][col]
        else:
            # No row at t_min: build one from the first valid observations.
            firstCol=pd.DataFrame(columns=data.columns, )
            for col in data.columns:
                if col==self.case_id:
                    firstCol.loc[0,col]=caso
                elif col==self.timestamp_col:
                    firstCol.loc[0,col]=self.t_min
                elif col in self.value_cols:
                    firstCol.loc[0,col]=data[(data[self.case_id]==caso)].loc[data[(data[self.case_id]==caso)][[col]].first_valid_index()][col]
                else:
                    # Any extra column is copied from the same first valid row.
                    firstCol.loc[0,col]=data[(data[self.case_id]==caso)].loc[data[(data[self.case_id]==caso)][[col]].first_valid_index()][col]
            # Keep the original dtypes so concat does not upcast columns.
            firstCol = firstCol.astype(data.dtypes.to_dict())
            data=pd.concat([firstCol,data], ignore_index=False).reset_index(drop=True)
        # --- right border (t_max) ---
        if data[(data[self.case_id]==caso)][self.timestamp_col].max()==self.t_max:
            # A row exists at t_max: fill NaN values from the last valid one.
            for col in self.value_cols:
                if data[(data[self.case_id]==caso)&(data[self.timestamp_col]==self.t_max)][col].isna().values[0]:
                    data.loc[(data[self.case_id]==caso)&(data[self.timestamp_col]==self.t_max),col
                    ]=data[(data[self.case_id]==caso)].loc[data[(data[self.case_id]==caso)][col].last_valid_index()][col]
        else:
            # No row at t_max: build one from the last valid observations.
            lastCol=pd.DataFrame(columns=data.columns, )
            for col in data.columns:
                if col==self.case_id:
                    lastCol.loc[0,col]=caso
                elif col==self.timestamp_col:
                    lastCol.loc[0,col]=self.t_max
                elif col in self.value_cols:
                    lastCol.loc[0,col]=data.loc[data[(data[self.case_id]==caso)][col].last_valid_index(),col]
                else :
                    # Any extra column is copied from the same last valid row.
                    lastCol.loc[0,col]=data.loc[data[(data[self.case_id]==caso)][col].last_valid_index(),col]
            lastCol = lastCol.astype(data.dtypes.to_dict())
            data=pd.concat([data,lastCol], ignore_index=False).reset_index(drop=True)
    return data.sort_values([self.case_id,self.timestamp_col]).reset_index(drop=True)
def _syncronise_over_time(self,df, interpolation=True, ):
M = []
if np.issubdtype(df[self.timestamp_col].dtype, np.datetime64):
for case, group in df.groupby(self.case_id):
group = group.loc[~group[self.timestamp_col].duplicated(keep='first')]
if interpolation:
interp_values = self._interpolate(group[self.value_cols],
(group[self.timestamp_col]-self.t_min).dt.total_seconds().to_numpy(), self.new_domain,
self.interpolation_type,axis = 0)
else:
interp_values = np.asarray(group[self.value_cols])
M.append(interp_values)
else:
for case, group in df.groupby(self.case_id):
group = group.loc[~group[self.timestamp_col].duplicated(keep='first')]
if interpolation:
interp_values = self._interpolate(group[self.value_cols], (group[self.timestamp_col]-self.t_min).to_numpy(), self.new_domain, axis = 0)
else:
interp_values = np.asarray(group[self.value_cols])
M.append(interp_values)
M = np.stack(M, axis = 0)
return M
def _interpolate(self,value_matrix, original_domain, new_domain, method='linear', axis=0):
"""
Interpolate functional data (1D or ND) onto a new domain.
It supports interpolation along any specified axis and handles missing (`NaN`) values by skipping them
during interpolation.
Parameters
----------
value_matrix : array-like of shape (n_points, ...) or (n_points,)
Array containing the functional data to be interpolated.
- For 1D input: a single function defined on `original_domain`.
- For ND input: multi-dimensional observations, where
interpolation is performed along the specified `axis`.
original_domain : array-like of shape (n_points,)
The domain (e.g., time or spatial coordinates) corresponding to the data
in `value_matrix`. Must be strictly increasing and match the first dimension
of `value_matrix` along the interpolation axis.
new_domain : array-like of shape (n_new_points,)
The new domain points where the interpolation will be evaluated.
axis : int, default=0
Axis along which to perform the interpolation. By default, interpolation
is performed along the first axis (rows).
Returns
-------
interp_value_matrix : np.ndarray
Interpolated data array evaluated at the `new_domain`.
- If input was 1D, returns a 1D NumPy array of shape `(len(new_domain),)`.
- If input was ND, returns an array of shape similar to `value_matrix`,
except that the length along the interpolation axis becomes `len(new_domain)`.
"""
value_matrix = np.asarray(value_matrix)
original_domain = np.asarray(original_domain)
new_domain = np.asarray(new_domain)
# 1D case
if value_matrix.ndim == 1:
non_nan_idx = ~np.isnan(value_matrix)
# TODO check if more thqn 2 values are not null
f = interp1d(original_domain[non_nan_idx], value_matrix[non_nan_idx], kind=method, bounds_error=False, fill_value="extrapolate")
return f(new_domain)
# Move interpolation axis to 0
if axis != 0:
value_matrix = np.moveaxis(value_matrix, axis, 0)
original_shape = value_matrix.shape
flat_value_matrix = value_matrix.reshape(original_shape[0], -1)
interp_flat_value_matrix = np.empty((len(new_domain), flat_value_matrix.shape[1]), dtype=value_matrix.dtype)
# Interpolate each flattened column
for j in range(flat_value_matrix.shape[1]):
non_nan_idx = ~np.isnan(flat_value_matrix[:, j])
# TODO check if more thqn 2 values are not null
f = interp1d(original_domain[non_nan_idx], flat_value_matrix[:, j][non_nan_idx], kind=method, bounds_error=False, fill_value="extrapolate")
interp_flat_value_matrix[:, j] = f(new_domain)
# Reshape back to original dimensions
interp_value_matrix = interp_flat_value_matrix.reshape((len(new_domain),) + original_shape[1:])
# Move axis back to original position
if axis != 0:
interp_value_matrix = np.moveaxis(interp_value_matrix, 0, axis)
return interp_value_matrix
def _int_depth(self, query_point, notion='halfspace', solver='neldermead', NRandom=100, option=1, **kwargs):
    """
    Compute the integrated functional depth (IFD) of `query_point` with
    respect to the reference sample stored in ``self.data_array`` (the 3D
    array of shape (N_data, L, D) built by ``load_dataset``).

    Parameters
    ----------
    query_point : np.ndarray
        2D array of shape (L, D): the function whose integrated depth is
        computed, with the same structure as one element of ``self.data_array``.
    notion : str, default='halfspace'
        Multivariate depth notion applied at each time slice
        (e.g. 'halfspace', 'projection', 'aprojection', ...).
    solver : str, default='neldermead'
        Numerical method used to approximate the multivariate depth.
    NRandom : int, default=100
        Number of random projections (or random directions).
    option : int, default=1
        1 -> return only the averaged depth;
        2 -> also collect, for every discretization point, the direction
        returned by the approximation routine (array of shape (L, D)).
    **kwargs
        Solver hyper-parameters; validated by ``_check_hyperparDepth``.

    Returns
    -------
    float or (float, np.ndarray)
        The integrated functional depth: the average of the per-time-point
        multivariate depths; with ``option=2`` also the per-point directions.

    Notes
    -----
    For each discretization point i = 1, ..., L:
    - Extract the data slice `data[:, i, :]` (shape: N_data x D)
    - Extract the query vector `query_point[i, :]` (shape: D)
    - Compute the multivariate depth of the query vector relative to the data slice
    - Average the results over all L time points
    """
    total_depth_sum = 0
    # d (the per-point dimension) is unpacked for clarity but unused below.
    l_points, d = query_point.shape
    # Resolve solver hyper-parameters, falling back to defaults for any not supplied.
    n_refinements,sphcap_shrink,alpha_Dirichlet,cooling_factor,\
    cap_size,start,space,line_solver,bound_gc=self._check_hyperparDepth(**kwargs)
    if option==2: directions=np.zeros(query_point.shape)
    for i in range(l_points):
        # data_component_slice: N_data x D matrix (all functions at time i)
        data_component_slice = self.data_array[:, i, :]
        # query_component: D-dimensional vector (query function at time i)
        query_component = query_point[i, :]
        # Compute depth at time i
        if option==1:
            time_component_depth = mvt.depth_approximation(
                query_component, data_component_slice,
                notion, solver, NRandom,
                option=option,
                n_refinements=n_refinements,sphcap_shrink=sphcap_shrink,
                alpha_Dirichlet=alpha_Dirichlet,
                cooling_factor=cooling_factor,cap_size=cap_size,
                start=start,space=space,line_solver=line_solver,bound_gc=bound_gc
            )
        if option==2:
            # With option=2 the approximation also returns the best direction.
            time_component_depth, directions[i] = mvt.depth_approximation(
                query_component, data_component_slice,
                notion, solver, NRandom,
                option=2,
                n_refinements=n_refinements,sphcap_shrink=sphcap_shrink,
                alpha_Dirichlet=alpha_Dirichlet,
                cooling_factor=cooling_factor,cap_size=cap_size,
                start=start,space=space,line_solver=line_solver,bound_gc=bound_gc
            )
        # NaN depths (e.g. failed approximations) are counted as zero.
        total_depth_sum += np.nan_to_num(time_component_depth,nan=0)
    # Average depth over all L time points
    functional_depth_val = total_depth_sum / l_points
    if option==1:return functional_depth_val
    else:return functional_depth_val,directions
# [docs]  (Sphinx cross-reference artifact left over from HTML extraction)
def projection_based_func_depth(self, query,
            notion='halfspace', solver='neldermead', NRandom=100,
            output_option:Literal["lowest_depth","final_depth_dir"]="lowest_depth", **kwargs):
    """
    Compute projection-based functional depth for query functional data with
    respect to the reference dataset loaded by ``load_dataset``.

    Each function (trajectory) is represented by a sequence of multivariate
    values over time; the query is resampled onto the reference grid and its
    integrated depth is computed per case.

    Parameters
    ----------
    query : pd.DataFrame or np.ndarray
        Query dataset whose depth will be computed relative to the reference
        set. A DataFrame must have the same column structure as the
        reference; an array is converted with the same column spec that was
        used in ``load_dataset``.
    notion : {"mahalanobis", "halfspace", "zonoid", "projection", "aprojection", "cexpchullstar", "cexpchull", "geometrical"}, default='halfspace'
        Depth notion to compute.
    solver : {'simplegrid', 'refinedgrid', 'simplerandom', 'refinedrandom', 'coordinatedescent', 'randomsimplices', 'neldermead', 'simulatedannealing'}, default='neldermead'
        Solver used to approximate the depth.
    NRandom : int, default=100
        Number of random projections or optimization restarts.
    output_option : {"lowest_depth", "final_depth_dir"}, default="lowest_depth"
        "lowest_depth" returns only the depth values; "final_depth_dir" also
        returns, per case, the directions corresponding to the lowest depth.
        Forced to "lowest_depth" when the value space is one-dimensional.
    **kwargs
        Solver hyper-parameters (n_refinements, sphcap_shrink,
        alpha_Dirichlet, cooling_factor, cap_size, start, space,
        line_solver, bound_gc); see ``_check_hyperparDepth``.

    Returns
    -------
    np.ndarray or (np.ndarray, np.ndarray)
        Depth values of shape (n_query,); with "final_depth_dir" also the
        corresponding directions of shape (n_query, L, D).

    Notes
    -----
    - Query timestamps outside the reference domain [t_min, t_max] are dropped.
    - Duplicate timestamps within each case are dropped during resampling.
    """
    self._check_depth(notion)
    # Queries given as 3D arrays are converted with the same column spec
    # that was used for the reference set.
    if isinstance(query, np.ndarray):
        query = self._3Dnp_tp_pd(query, self.TSnp, self.CInp)
    # Clip the query to the time domain of the reference set.
    if query[self.timestamp_col].max() > self.t_max:
        print(f"Values with {self.timestamp_col} greater the base set domain are excluded")
        query.drop(query[query[self.timestamp_col] > self.t_max].index, inplace=True)
    if query[self.timestamp_col].min() < self.t_min:
        # Fix: was `min() > self.t_min`, which could never detect timestamps
        # below the reference domain, leaving the drop below unreachable.
        print(f"Values with {self.timestamp_col} smaller the base set domain are excluded")
        query.drop(query[query[self.timestamp_col] < self.t_min].index, inplace=True)
    queryMM = self._MinMax(query)
    query_array = self._syncronise_over_time(queryMM)
    depth_array = np.empty((query_array.shape[0],), dtype=float)
    # Directions are meaningless in a one-dimensional value space.
    if len(self.value_cols) == 1 and output_option == "final_depth_dir":
        print("Space dimention is 1, output_option is set to 'lowest_depth'")
        output_option = "lowest_depth"
    if output_option == "lowest_depth":
        option = 1
        direction_array = None
    elif output_option == "final_depth_dir":
        option = 2
        direction_array = np.empty(query_array.shape, dtype=float)
    else:
        option = 1
        direction_array = None  # fix: previously left undefined on this path
        print("Invalid output_option, output_option set to 'lowest_depth'")
    for i in range(query_array.shape[0]):
        if option == 1:
            # NOTE(review): the trailing `[0]` assumes _int_depth's option-1
            # result is indexable (e.g. a length-1 array) — confirm against
            # multivariate.depth_approximation's return type.
            depth_array[i] = self._int_depth(query_array[i, :, :], notion=notion, solver=solver, option=option,
                                             NRandom=NRandom, **kwargs)[0]
        else:
            depth_array[i], direction_array[i] = self._int_depth(query_array[i, :, :], notion=notion, solver=solver, option=option,
                                                                 NRandom=NRandom, **kwargs)
    if option == 1:
        return depth_array
    return depth_array, direction_array
def _check_hyperparDepth(self,**kwargs):
n_refinements = 10
sphcap_shrink = 0.5
alpha_Dirichlet = 1.25
cooling_factor = 0.95
cap_size = 1
start = "mean"
space = "sphere"
line_solver = "goldensection"
bound_gc = True
for key, value in kwargs.items():
if key=="n_refinements":
n_refinements=value
elif key=="sphcap_shrink":
sphcap_shrink=value
elif key=="alpha_Dirichlet":
alpha_Dirichlet=value
elif key=="cooling_factor":
cooling_factor=value
elif key=="cap_size":
cap_size=value
elif key=="start":
start=value
elif key=="space":
space=value
elif key=="line_solver":
line_solver=value
elif key=="bound_gc":
bound_gc=value
else:
print(f"{key} is not a parameter for depth computation")
return n_refinements,sphcap_shrink,alpha_Dirichlet,cooling_factor,cap_size,start,space,line_solver,bound_gc
def _check_depth(self, depth):
all_depths = ["mahalanobis", "halfspace", "zonoid", "projection", "aprojection", "cexpchullstar", "cexpchull", "geometrical"]
if (depth not in all_depths):
raise ValueError("Depths approximation is available only for depths in %s, got %s."%(all_depths, depth))