"""
Evaluation Core
===============
Pure evaluation orchestration for dimensionality-reduction workflows.
This module contains the two public evaluation interfaces used by the
dim-reduction stack:
- ``evaluate_embedding(...)`` evaluates an explicit embedding and returns
scalar metrics, scalar metadata, diagnostics, and tidy metric records.
- ``MethodSelector`` compares and ranks multiple already-scored
``DimReduction`` objects without refitting or recomputing embeddings.
The module is intentionally evaluation-only. It does not fit reducers,
transform data, reconstruct 3D trajectory tensors from flat embeddings, or
provide plotting methods. Reduction execution belongs to
``coco_pipe.dim_reduction.core.DimReduction`` and plotting belongs to
``coco_pipe.viz.dim_reduction``.
Author: Hamza Abdelhedi (hamza.abdelhedi@umontreal.ca)
"""
from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
if TYPE_CHECKING:
from ..core import DimReduction
from ...decoding.configs import CVConfig
from ...decoding.utils import cross_validate_score
from .geometry import (
trajectory_acceleration,
trajectory_curvature,
trajectory_dispersion,
trajectory_displacement,
trajectory_path_length,
trajectory_separation,
trajectory_speed,
trajectory_tortuosity,
trajectory_turning_angle,
)
from .metrics import (
compute_coranking_matrix,
compute_mrre,
continuity,
lcmc,
shepard_diagram_data,
trustworthiness,
)
# Names exported as the public API of this module.
__all__ = ["evaluate_embedding", "MethodSelector"]

# Column layout of the tidy metric table. ``MethodSelector.to_frame`` uses it
# to build an empty frame when no metric records are cached.
METRIC_COLUMNS = ("method", "metric", "value", "scope", "scope_value")

# Name of the supervised 2D metric computed by ``evaluate_embedding`` with a
# cross-validated logistic regression; it is only computed when explicitly
# requested.
SEPARATION_LOGREG_BALANCED_ACCURACY = "separation_logreg_balanced_accuracy"

# Neighborhood-size dependent metrics derived from the co-ranking matrix.
# These are the metrics that can be swept over ``k_values``.
SWEEP_METRICS = (
    "trustworthiness",
    "continuity",
    "lcmc",
    "mrre_intrusion",
    "mrre_extrusion",
    "mrre_total",
)

# All unsupervised metric selectors known to ``evaluate_embedding``: the sweep
# metrics, the Shepard correlation, and the trajectory metric families.
# ``evaluate_embedding`` treats everything that is not a sweep metric or
# ``shepard_correlation`` as a trajectory metric.
DEFAULT_SCORE_METRICS = (
    *SWEEP_METRICS,
    "shepard_correlation",
    "trajectory_speed",
    "trajectory_acceleration",
    "trajectory_curvature",
    "trajectory_turning_angle",
    "trajectory_dispersion",
    "trajectory_path_length",
    "trajectory_displacement",
    "trajectory_tortuosity",
    "trajectory_separation",
)

# Sort direction per rankable metric, consumed by ``MethodSelector.rank_methods``:
# "desc" sorts larger values first, "asc" sorts smaller values first. Metrics
# missing from this mapping are rejected as selection or tie-breaker metrics.
RANKING_DIRECTIONS = {
    "trustworthiness": "desc",
    "continuity": "desc",
    "lcmc": "desc",
    "shepard_correlation": "desc",
    SEPARATION_LOGREG_BALANCED_ACCURACY: "desc",
    "mrre_intrusion": "asc",
    "mrre_extrusion": "asc",
    "mrre_total": "asc",
}
def _summarize_trajectory_metric(
prefix: str,
values: np.ndarray,
*,
summary_type: str,
use_last_axis: bool = False,
) -> Dict[str, float]:
"""Return scalar summaries for one trajectory metric payload."""
arr = np.asarray(values, dtype=float)
summary_: Dict[str, float] = {}
if summary_type == "peak":
summary_[f"{prefix}_mean"] = float(np.nanmean(arr))
summary_[f"{prefix}_peak"] = float(np.nanmax(arr))
elif summary_type == "final":
final_values = arr[..., -1] if use_last_axis else arr
summary_[f"{prefix}_final"] = float(np.nanmean(final_values))
else:
raise ValueError(f"Unsupported trajectory summary type '{summary_type}'.")
return summary_
def _evaluate_trajectory_metrics(
method_name: str,
X_emb: np.ndarray,
metric_selection: Optional[set],
labels: Optional[np.ndarray] = None,
times: Optional[np.ndarray] = None,
separation_method: str = "centroid",
) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any], List[Dict[str, Any]]]:
"""
Compute trajectory summaries and diagnostics for native 3D embeddings.
Parameters
----------
method_name : str
Display name attached to the tidy metric records.
X_emb : np.ndarray
Embedded trajectories with shape ``(n_trajectories, n_times, n_dims)``.
metric_selection : set of str or None
Requested trajectory metric families. ``None`` computes all supported
trajectory metrics.
labels : np.ndarray, optional
One label per trajectory. Labels are currently only used by
``trajectory_separation``.
times : np.ndarray, optional
One time value per trajectory step. When provided and aligned, it is
used for separation AUC integration and stored as a diagnostic.
separation_method : str, default="centroid"
Separation definition passed to ``trajectory_separation``.
Returns
-------
metrics : dict
Scalar summary metrics for the requested trajectory families.
metadata : dict
Scalar metadata describing the trajectory tensor.
diagnostics : dict
Array-like or structured trajectory diagnostics.
records : list of dict
Tidy long-form metric records.
Notes
-----
This evaluator only operates on embeddings that are already shaped as
``(n_trajectories, n_times, n_dims)``. Trajectory reconstruction from flat
2D embeddings is intentionally out of scope and must happen upstream.
The evaluator-level ``trajectory_dispersion`` metric always uses the global,
unlabeled dispersion definition by calling
``trajectory_dispersion(traj, labels=None)``. Even when trajectory labels are
available, those labels are currently only used by
``trajectory_separation``, using the caller-provided ``separation_method``.
This keeps the non-separation trajectory metric loop uniform, but it means
evaluator-level dispersion summarizes overall spread rather than per-label
spread.
"""
traj = np.asarray(X_emb)
if traj.ndim != 3:
return {}, {}, {}, []
if times is not None:
candidate = np.asarray(times).reshape(-1)
if len(candidate) == traj.shape[1]:
times = candidate
if labels is not None:
candidate = np.asarray(labels).reshape(-1)
if len(candidate) == traj.shape[0]:
labels = candidate
metrics_payload: Dict[str, Any] = {}
metadata_payload: Dict[str, Any] = {
"trajectory_count": int(traj.shape[0]),
"trajectory_length": int(traj.shape[1]),
}
diagnostics_payload: Dict[str, Any] = {
"trajectory_times_": times,
}
records: List[Dict[str, Any]] = []
metrics = (
("trajectory_speed", trajectory_speed, "peak", False, 2),
("trajectory_acceleration", trajectory_acceleration, "peak", False, 3),
("trajectory_curvature", trajectory_curvature, "peak", False, 2),
("trajectory_turning_angle", trajectory_turning_angle, "peak", False, 3),
("trajectory_dispersion", trajectory_dispersion, "peak", False, 1),
(
"trajectory_path_length",
lambda values: trajectory_path_length(values, cumulative=True),
"final",
True,
2,
),
("trajectory_displacement", trajectory_displacement, "final", True, 1),
("trajectory_tortuosity", trajectory_tortuosity, "final", False, 2),
)
for (
metric_prefix,
metric_func,
summary_type,
use_last_axis,
min_timepoints,
) in metrics:
if metric_selection is not None and metric_prefix not in metric_selection:
continue
if traj.shape[1] < min_timepoints:
continue
values = metric_func(traj)
diagnostics_payload[f"{metric_prefix}_"] = values
summary = _summarize_trajectory_metric(
metric_prefix,
values,
summary_type=summary_type,
use_last_axis=use_last_axis,
)
metrics_payload.update(summary)
for metric_name, value in summary.items():
is_num = isinstance(value, (int, float, np.number))
if is_num and not isinstance(value, bool):
records.append(
{
"method": method_name,
"metric": metric_name,
"value": float(value),
"scope": "global",
"scope_value": "global",
}
)
if (
(metric_selection is None or "trajectory_separation" in metric_selection)
and labels is not None
and len(np.unique(labels)) > 1
):
separation = trajectory_separation(
traj,
labels,
method=separation_method,
)
diagnostics_payload["trajectory_separation_"] = separation
for pair, values in separation.items():
pair_suffix = f"{pair[0]}::{pair[1]}"
values_arr = np.asarray(values)
integrate = getattr(np, "trapezoid", getattr(np, "trapz", None))
if values_arr.size == 0:
auc_value = float("nan")
peak_value = float("nan")
elif times is None:
auc_value = float(integrate(values_arr))
peak_value = float(np.nanmax(values_arr))
else:
time_arr = np.asarray(times)
auc_value = (
float(integrate(values_arr, x=time_arr))
if len(time_arr) == len(values_arr)
else float(integrate(values_arr))
)
peak_value = float(np.nanmax(values_arr))
pair_metrics = {
f"trajectory_separation_auc::{pair_suffix}": auc_value,
f"trajectory_separation_peak::{pair_suffix}": peak_value,
}
metrics_payload.update(pair_metrics)
for metric_name, value in pair_metrics.items():
is_num = isinstance(value, (int, float, np.number))
if is_num and not isinstance(value, bool):
records.append(
{
"method": method_name,
"metric": metric_name,
"value": float(value),
"scope": "global",
"scope_value": "global",
"pair": f"{pair[0]} vs {pair[1]}",
}
)
return metrics_payload, metadata_payload, diagnostics_payload, records
def _evaluate_standard_metrics(
    method_name: str,
    X_eval: np.ndarray,
    X_emb_eval: np.ndarray,
    metric_selection: Optional[set],
    n_neighbors: int,
    k_values: Optional[Sequence[int]],
    random_state: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any], List[Dict[str, Any]]]:
    """
    Score a flat embedding with co-ranking and Shepard-based metrics.

    Parameters
    ----------
    method_name : str
        Name written into the ``method`` field of every tidy record.
    X_eval : np.ndarray
        Original data, ``(n_samples, n_features)``.
    X_emb_eval : np.ndarray
        Embedded data, ``(n_samples, n_components)``.
    metric_selection : set of str or None
        Standard metrics to compute; ``None`` selects every sweep metric plus
        ``shepard_correlation``.
    n_neighbors : int
        Neighborhood size evaluated when ``k_values`` is ``None``.
    k_values : sequence of int, optional
        Neighborhood sizes to sweep. When given, per-``k`` values are emitted
        as records scoped by ``k`` only and are not written to the scalar
        metrics.
    random_state : int, optional
        Forwarded to ``shepard_diagram_data``.

    Returns
    -------
    metrics : dict
        Scalar metrics (single-``k`` values and the Shepard correlation).
    diagnostics : dict
        ``coranking_matrix_`` and/or ``shepard_distances_`` when computed.
    records : list of dict
        Tidy long-form metric records.

    Notes
    -----
    Neighborhood sizes outside ``0 < k < n_samples - 1`` are skipped silently.
    When trustworthiness or continuity is requested, sizes for which
    ``2 * n_samples - 3 * k - 1`` is not positive are skipped as well.
    """
    scalars: Dict[str, Any] = {}
    diagnostics: Dict[str, Any] = {}
    rows: List[Dict[str, Any]] = []

    sweep_names = set(SWEEP_METRICS)
    wanted_sweep = (
        sweep_names
        if metric_selection is None
        else sweep_names.intersection(metric_selection)
    )
    wants_shepard = (
        metric_selection is None or "shepard_correlation" in metric_selection
    )
    if not (wanted_sweep or wants_shepard):
        return scalars, diagnostics, []

    n_samples = X_eval.shape[0]

    if wanted_sweep:
        Q = compute_coranking_matrix(X_eval, X_emb_eval)
        diagnostics["coranking_matrix_"] = Q

        sweeping = k_values is not None
        candidates = list(k_values) if sweeping else [n_neighbors]
        guard_normalizer = not wanted_sweep.isdisjoint(
            {"trustworthiness", "continuity"}
        )
        wants_mrre = not wanted_sweep.isdisjoint(
            {"mrre_intrusion", "mrre_extrusion", "mrre_total"}
        )

        def _usable(k: int) -> bool:
            if k <= 0 or k >= (n_samples - 1):
                return False
            if guard_normalizer and (2 * n_samples - 3 * k - 1) <= 0:
                return False
            return True

        # Filter every candidate before computing anything.
        usable_k = [k for k in candidates if _usable(k)]

        coranking_scorers = (
            ("trustworthiness", trustworthiness),
            ("continuity", continuity),
            ("lcmc", lcmc),
        )
        for k in usable_k:
            per_k: Dict[str, float] = {
                name: scorer(Q, k)
                for name, scorer in coranking_scorers
                if name in wanted_sweep
            }
            if wants_mrre:
                intrusion, extrusion = compute_mrre(Q, k)
                if "mrre_intrusion" in wanted_sweep:
                    per_k["mrre_intrusion"] = intrusion
                if "mrre_extrusion" in wanted_sweep:
                    per_k["mrre_extrusion"] = extrusion
                if "mrre_total" in wanted_sweep:
                    per_k["mrre_total"] = intrusion + extrusion

            # Scalar metrics only exist for the single-k (non-sweep) case.
            if not sweeping:
                scalars.update(per_k)

            for name, val in per_k.items():
                if isinstance(val, bool) or not isinstance(
                    val, (int, float, np.number)
                ):
                    continue
                rows.append(
                    {
                        "method": method_name,
                        "metric": name,
                        "value": float(val),
                        "scope": "k" if sweeping else "global",
                        "scope_value": k if sweeping else "global",
                    }
                )

    if wants_shepard:
        d_orig, d_emb = shepard_diagram_data(
            X_eval,
            X_emb_eval,
            sample_size=1000,
            random_state=random_state,
        )
        # A correlation needs at least two sampled distance pairs.
        if len(d_orig) > 1:
            correlation = float(np.corrcoef(d_orig, d_emb)[0, 1])
        else:
            correlation = np.nan
        scalars["shepard_correlation"] = correlation
        diagnostics["shepard_distances_"] = {
            "original": d_orig,
            "embedded": d_emb,
        }
        rows.append(
            {
                "method": method_name,
                "metric": "shepard_correlation",
                "value": float(correlation),
                "scope": "global",
                "scope_value": "global",
            }
        )

    return scalars, diagnostics, rows
# Public evaluation entry point (exported via ``__all__``).
def evaluate_embedding(
    X_emb: np.ndarray,
    X: Optional[np.ndarray] = None,
    method_name: str = "embedding",
    metrics: Optional[Sequence[str]] = None,
    labels: Optional[np.ndarray] = None,
    groups: Optional[np.ndarray] = None,
    times: Optional[np.ndarray] = None,
    quality_metadata: Optional[Dict[str, Any]] = None,
    diagnostics: Optional[Dict[str, Any]] = None,
    random_state: Optional[int] = None,
    n_neighbors: int = 5,
    k_values: Optional[Sequence[int]] = None,
    separation_method: str = "centroid",
) -> Dict[str, Any]:
    """
    Score an embedding that has already been computed.

    The rank of ``X_emb`` selects the evaluation mode: a 2D array is scored
    with the standard co-ranking / Shepard metrics (plus the supervised
    separation score when explicitly requested), while a 3D array is scored
    with the trajectory metrics.

    Parameters
    ----------
    X_emb : np.ndarray
        Embedding to evaluate, either ``(n_samples, n_components)`` or
        ``(n_trajectories, n_times, n_dims)``.
    X : np.ndarray, optional
        Original data, ``(n_samples, n_features)``. Mandatory whenever at
        least one standard 2D metric is going to be computed.
    method_name : str, default="embedding"
        Name stored in the ``method`` field of the tidy records.
    metrics : sequence of str, optional
        Metric selectors. With ``None``, a 2D embedding gets every standard
        metric (the supervised separation score is not included) and a 3D
        embedding gets every trajectory metric. Selectors that do not apply
        to the embedding's rank are ignored.
    labels : np.ndarray, optional
        Labels aligned with the embedding. Consumed by
        ``trajectory_separation`` (3D) and by
        ``separation_logreg_balanced_accuracy`` (2D, on request).
    groups : np.ndarray, optional
        Grouping variable aligned with ``X_emb``; needed together with
        ``labels`` for ``separation_logreg_balanced_accuracy``.
    times : np.ndarray, optional
        Trajectory time coordinates, forwarded to the trajectory evaluator.
    quality_metadata : dict, optional
        Scalar metadata copied into the returned ``metadata``.
    diagnostics : dict, optional
        Precomputed diagnostics copied into the returned ``diagnostics``.
    random_state : int, optional
        Seed forwarded to the Shepard distance sampling.
    n_neighbors : int, default=5
        Neighborhood size when no ``k_values`` sweep is requested.
    k_values : sequence of int, optional
        Neighborhood sizes for a sweep over the standard metrics.
    separation_method : str, default="centroid"
        Forwarded to the trajectory evaluator for ``trajectory_separation``.

    Returns
    -------
    dict
        Payload with the keys ``embedding`` (the evaluated array),
        ``metrics`` (scalar summaries), ``metadata`` (scalar descriptive
        metadata), ``diagnostics`` (array-like or structured diagnostics),
        ``records`` (tidy metric records as ``list[dict]``) and
        ``artifacts`` (a shallow copy of ``diagnostics``).

    Raises
    ------
    TypeError
        If ``quality_metadata`` or ``diagnostics`` is given but is not a
        dictionary.
    ValueError
        If ``X_emb`` is neither 2D nor 3D; if standard metrics are requested
        for a 2D embedding without ``X`` or with an ``X`` that is not 2D or
        has a different sample count; or if
        ``separation_logreg_balanced_accuracy`` is requested without both
        ``labels`` and ``groups``.

    Notes
    -----
    Nothing is fitted or transformed here and no reducer object is inspected;
    preparing ``X_emb`` and the optional labels, groups and times is the
    caller's job. The supervised separation score is cross-validated with a
    fixed 5-split stratified group k-fold configuration.

    See Also
    --------
    coco_pipe.dim_reduction.core.DimReduction.score
        Manager-level wrapper that prepares inputs and stores the returned
        evaluation payload on a fitted ``DimReduction`` object.
    MethodSelector
        Post-hoc comparison and ranking across multiple scored reductions.

    Examples
    --------
    Evaluate a standard 2D embedding:

    >>> import numpy as np
    >>> X = np.random.RandomState(0).randn(20, 5)
    >>> X_emb = X[:, :2]
    >>> result = evaluate_embedding(X_emb, X=X, method_name="demo")
    >>> "metrics" in result and "records" in result
    True

    Evaluate a native trajectory embedding:

    >>> traj = np.random.RandomState(0).randn(4, 10, 2)
    >>> labels = np.array(["A", "A", "B", "B"])
    >>> result = evaluate_embedding(
    ...     traj,
    ...     method_name="traj",
    ...     metrics=["trajectory_speed", "trajectory_separation"],
    ...     labels=labels,
    ... )
    >>> "trajectory_speed_mean" in result["metrics"]
    True
    """
    X_emb = np.asarray(X_emb)
    X = None if X is None else np.asarray(X)
    requested = set(metrics) if metrics is not None else None

    standard_names = {*SWEEP_METRICS, "shepard_correlation"}
    supervised_names = {SEPARATION_LOGREG_BALANCED_ACCURACY}
    trajectory_names = set(DEFAULT_SCORE_METRICS).difference(standard_names)

    # Validate the pass-through payloads, then work on copies so the caller's
    # dictionaries are never mutated.
    if quality_metadata is not None and not isinstance(quality_metadata, dict):
        raise TypeError("Evaluation quality metadata must be a dictionary.")
    if diagnostics is not None and not isinstance(diagnostics, dict):
        raise TypeError("Evaluation diagnostics must be a dictionary.")
    out_metadata: Dict[str, Any] = (
        {} if quality_metadata is None else dict(quality_metadata)
    )
    out_diagnostics: Dict[str, Any] = {} if diagnostics is None else dict(diagnostics)
    out_metrics: Dict[str, Any] = {}
    rows: List[Dict[str, Any]] = []

    if X_emb.ndim not in (2, 3):
        raise ValueError("`X_emb` must be either 2D or 3D for evaluation.")

    if X_emb.ndim == 2:
        # Supervised metrics are opt-in: they are never part of the default set.
        standard_selection = (
            standard_names if requested is None else requested & standard_names
        )
        supervised_selection = (
            set() if requested is None else requested & supervised_names
        )

        if standard_selection:
            if X is None:
                raise ValueError(
                    "Original data `X` is required to evaluate standard metrics "
                    "for 2D embeddings."
                )
            if X.ndim != 2 or X.shape[0] != X_emb.shape[0]:
                raise ValueError(
                    "Standard evaluation requires 2D `X` and `X_emb` with matching "
                    "sample counts."
                )
            std_metrics, std_diagnostics, std_rows = _evaluate_standard_metrics(
                method_name=method_name,
                X_eval=X,
                X_emb_eval=X_emb,
                metric_selection=standard_selection,
                n_neighbors=n_neighbors,
                k_values=k_values,
                random_state=random_state,
            )
            out_metrics.update(std_metrics)
            out_diagnostics.update(std_diagnostics)
            rows.extend(std_rows)

        if SEPARATION_LOGREG_BALANCED_ACCURACY in supervised_selection:
            if labels is None or groups is None:
                raise ValueError(
                    f"`labels` and `groups` are required for "
                    f"'{SEPARATION_LOGREG_BALANCED_ACCURACY}'."
                )
            cv_config = CVConfig(
                strategy="stratified_group_kfold",
                n_splits=5,
                shuffle=True,
                random_state=42,
            )
            classifier = LogisticRegression(max_iter=1000, class_weight="balanced")
            separation_score = cross_validate_score(
                classifier,
                X_emb,
                labels,
                groups=groups,
                cv_config=cv_config,
                metric="balanced_accuracy",
                use_scaler=True,
            )
            out_metrics[SEPARATION_LOGREG_BALANCED_ACCURACY] = separation_score
            rows.append(
                {
                    "method": method_name,
                    "metric": SEPARATION_LOGREG_BALANCED_ACCURACY,
                    "value": separation_score,
                    "scope": "global",
                    "scope_value": "global",
                }
            )
    else:
        trajectory_selection = (
            trajectory_names if requested is None else requested & trajectory_names
        )
        traj_metrics, traj_metadata, traj_diagnostics, traj_rows = (
            _evaluate_trajectory_metrics(
                method_name=method_name,
                X_emb=X_emb,
                metric_selection=trajectory_selection,
                labels=labels,
                times=times,
                separation_method=separation_method,
            )
        )
        out_metrics.update(traj_metrics)
        out_metadata.update(traj_metadata)
        out_diagnostics.update(traj_diagnostics)
        rows.extend(traj_rows)

    return {
        "embedding": X_emb,
        "metrics": out_metrics,
        "metadata": out_metadata,
        "diagnostics": out_diagnostics,
        "records": list(rows),
        "artifacts": out_diagnostics.copy(),
    }
# Public post-hoc comparison class (exported via ``__all__``).
class MethodSelector:
    """
    Compare and rank already-scored dimensionality reduction methods.

    ``MethodSelector`` is intentionally post-hoc. It does not fit reducers or
    compute embeddings. Each reducer must already be a scored ``DimReduction``
    instance with cached ``metric_records_``.

    Parameters
    ----------
    reducers : dict or list of DimReduction
        Scored ``DimReduction`` objects to compare. Lists are converted to a
        method-keyed mapping using ``reducer.method``.

    Attributes
    ----------
    reducers : dict of str to DimReduction
        Compared reductions keyed by method name.
    metric_records_ : list of dict
        Cached long-form metric records populated by ``collect()``.

    See Also
    --------
    evaluate_embedding
        Pure evaluator used upstream by ``DimReduction.score``.
    coco_pipe.dim_reduction.core.DimReduction.score
        Scores a fitted reduction and populates the records consumed here.

    Examples
    --------
    >>> import numpy as np
    >>> from coco_pipe.dim_reduction import DimReduction
    >>> X = np.random.RandomState(0).randn(30, 4)
    >>> reducers = [
    ...     DimReduction("PCA", n_components=2),
    ...     DimReduction("Isomap", n_components=2, n_neighbors=5),
    ... ]
    >>> for reducer in reducers:
    ...     embedding = reducer.fit_transform(X)
    ...     reducer.score(embedding, X=X, k_values=[5])
    >>> selector = MethodSelector(reducers).collect()
    >>> frame = selector.to_frame()
    >>> not frame.empty
    True
    """

    def __init__(
        self, reducers: Union[Dict[str, "DimReduction"], List["DimReduction"]]
    ):
        """
        Create a post-hoc comparison layer over scored reductions.

        Parameters
        ----------
        reducers : dict or list of DimReduction
            Already-scored reductions to compare. When a list is provided,
            reducers are keyed by ``reducer.method``.

        Raises
        ------
        TypeError
            If any provided object is not a ``DimReduction`` instance.

        Notes
        -----
        When a list contains several reducers that share the same ``method``,
        later entries replace earlier ones. Pass a dictionary with explicit
        names to compare multiple configurations of one method.
        """
        # Imported at call time rather than module load; presumably this
        # avoids a circular import with ``..core`` (verify against the package).
        from ..core import DimReduction

        if isinstance(reducers, list):
            validated: Dict[str, DimReduction] = {}
            for reducer in reducers:
                if not isinstance(reducer, DimReduction):
                    raise TypeError(
                        "MethodSelector only accepts scored DimReduction objects. "
                        f"Got {type(reducer).__name__}."
                    )
                validated[reducer.method] = reducer
            self.reducers = validated
        else:
            self.reducers = dict(reducers)
            for name, reducer in self.reducers.items():
                if not isinstance(reducer, DimReduction):
                    raise TypeError(
                        "MethodSelector only accepts scored DimReduction objects. "
                        f"Reducer '{name}' has type {type(reducer).__name__}."
                    )
        self.metric_records_ = []

    @classmethod
    def from_records(cls, records: List[Dict[str, Any]]) -> "MethodSelector":
        """
        Create a selector directly from long-form metric records.

        Each record is shallow-copied, so later changes to the input list or
        its dictionaries do not affect the selector. The selector holds no
        reducers.
        """
        selector = cls({})
        selector.metric_records_ = [dict(record) for record in records]
        return selector

    @classmethod
    def from_frame(cls, frame: pd.DataFrame) -> "MethodSelector":
        """Create a selector directly from a metric-record DataFrame."""
        return cls.from_records(frame.to_dict(orient="records"))

    def collect(self) -> "MethodSelector":
        """
        Collect cached metric records from already-scored reducers.

        Returns
        -------
        MethodSelector
            The selector populated with comparison-ready metric records.

        Raises
        ------
        ValueError
            If a reducer has not been scored yet, i.e. its
            ``metric_records_`` attribute is missing or empty.

        See Also
        --------
        coco_pipe.dim_reduction.core.DimReduction.score
            Populates the ``metric_records_`` consumed by this method.
        to_frame
            Materialize the collected long-form records as a DataFrame.

        Notes
        -----
        ``collect()`` does not fit reducers or recompute evaluation metrics.
        It only gathers cached metric observations from reducers that were
        already scored explicitly. Every gathered record is copied and its
        ``method`` field is overwritten with the reducer's key in
        ``self.reducers``. Previously cached records are cleared first, so a
        failed call leaves the selector empty.

        Examples
        --------
        >>> import numpy as np
        >>> from coco_pipe.dim_reduction import DimReduction
        >>> X = np.random.RandomState(0).randn(20, 4)
        >>> reducer = DimReduction("PCA", n_components=2)
        >>> embedding = reducer.fit_transform(X)
        >>> reducer.score(embedding, X=X, k_values=[5])
        >>> selector = MethodSelector([reducer]).collect()
        >>> len(selector.metric_records_) > 0
        True
        """
        self.metric_records_ = []
        records: List[Dict[str, Any]] = []
        for name, reducer in self.reducers.items():
            # A reducer that was never scored may not define the attribute at
            # all; treat that like an empty cache so the documented ValueError
            # is raised instead of an AttributeError.
            cached = getattr(reducer, "metric_records_", None)
            if not cached:
                raise ValueError(
                    f"Reducer '{name}' has no metric records. Call score() first."
                )
            for record in cached:
                updated = dict(record)
                updated["method"] = name
                records.append(updated)
        self.metric_records_ = records
        return self

    def to_frame(self) -> pd.DataFrame:
        """
        Return the cached long-form metric table.

        Returns
        -------
        pandas.DataFrame
            Tidy metric table with columns ``method``, ``metric``, ``value``,
            ``scope``, and ``scope_value``. When no records are cached, an
            empty frame with exactly those columns is returned; otherwise the
            columns are whatever keys the cached records carry.

        Notes
        -----
        This method only materializes a DataFrame at the public export
        boundary. Internally, ``MethodSelector`` stores metric records as plain
        Python dictionaries.

        See Also
        --------
        collect
            Gather cached metric records from scored reducers.
        rank_methods
            Rank reducers from the collected metric table.

        Examples
        --------
        >>> import numpy as np
        >>> from coco_pipe.dim_reduction import DimReduction
        >>> X = np.random.RandomState(0).randn(20, 4)
        >>> reducer = DimReduction("PCA", n_components=2)
        >>> embedding = reducer.fit_transform(X)
        >>> reducer.score(embedding, X=X, k_values=[5])
        >>> frame = MethodSelector([reducer]).collect().to_frame()
        >>> set(["method", "metric", "value"]).issubset(frame.columns)
        True
        """
        if not self.metric_records_:
            return pd.DataFrame(columns=METRIC_COLUMNS)
        return pd.DataFrame.from_records(self.metric_records_)

    def rank_methods(
        self,
        selection_metric: str,
        *,
        selection_k: Optional[int] = None,
        tie_breakers: Optional[Sequence[str]] = None,
    ) -> pd.DataFrame:
        """
        Rank methods using one primary metric and optional tie-breakers.

        Parameters
        ----------
        selection_metric : str
            Metric to optimize.
        selection_k : int, optional
            Neighborhood size to compare for k-scoped metrics.
        tie_breakers : sequence of str, optional
            Additional metrics used in order when primary values tie.

        Returns
        -------
        pandas.DataFrame
            Ranked comparison table with a 1-based ``rank`` column, the
            ``method`` column, and one column per compared metric. The first
            row is the best-scoring method under the requested ranking policy.

        Raises
        ------
        ValueError
            If the requested metrics are unsupported, unavailable in the cached
            records, or missing the requested ``selection_k`` observations.

        Notes
        -----
        Ranking is based on mean metric values per method. For k-scoped metrics,
        ``selection_k`` restricts comparison to a single neighborhood size when
        requested. Methods without a value for a metric sort last, and
        remaining ties are broken alphabetically by method name.

        See Also
        --------
        collect
            Gather cached metric observations before ranking.
        to_frame
            Inspect the underlying long-form metric observations directly.
        coco_pipe.dim_reduction.core.DimReduction.score
            Produces the metric records that feed into ranking.

        Examples
        --------
        >>> import numpy as np
        >>> from coco_pipe.dim_reduction import DimReduction
        >>> X = np.random.RandomState(0).randn(20, 4)
        >>> reducers = [DimReduction("PCA", n_components=2)]
        >>> reducer = reducers[0]
        >>> embedding = reducer.fit_transform(X)
        >>> reducer.score(embedding, X=X, k_values=[5])
        >>> ranked = MethodSelector(reducers).collect().rank_methods(
        ...     "trustworthiness",
        ...     selection_k=5,
        ... )
        >>> ranked.iloc[0]["method"] == reducer.method
        True
        """
        if selection_metric not in RANKING_DIRECTIONS:
            raise ValueError(
                f"Unsupported selection metric '{selection_metric}'. "
                f"Supported metrics: {sorted(RANKING_DIRECTIONS)}"
            )
        tie_metrics = list(tie_breakers) if tie_breakers is not None else []
        for tie_metric in tie_metrics:
            if tie_metric not in RANKING_DIRECTIONS:
                raise ValueError(
                    f"Unsupported tie-breaker metric '{tie_metric}'. "
                    f"Supported metrics: {sorted(RANKING_DIRECTIONS)}"
                )

        records = self.to_frame()
        if records.empty:
            raise ValueError(
                "No evaluation metrics available. "
                "Score reducers and call collect() first."
            )

        # One row per method; metric columns are filled by index alignment, so
        # a method lacking a metric ends up with NaN in that column.
        summary = pd.DataFrame(index=sorted(records["method"].unique()))
        comparison_metrics = [selection_metric, *tie_metrics]
        for metric in comparison_metrics:
            metric_df = records[records["metric"] == metric].copy()
            if metric_df.empty:
                raise ValueError(
                    f"Metric '{metric}' is not available in the current results."
                )
            # Only narrow to ``selection_k`` when the metric actually has
            # k-scoped observations; global-only metrics are used as they are.
            if selection_k is not None and (metric_df["scope"] == "k").any():
                k_numeric = pd.to_numeric(metric_df["scope_value"], errors="coerce")
                metric_df = metric_df[k_numeric == float(selection_k)]
                if metric_df.empty:
                    raise ValueError(
                        f"Metric '{metric}' has no observations at k={selection_k}."
                    )
            summary[metric] = metric_df.groupby("method", dropna=False)["value"].mean()

        summary = summary.reset_index().rename(columns={"index": "method"})

        sort_by = []
        ascending = []
        for metric in comparison_metrics:
            sort_by.append(metric)
            ascending.append(RANKING_DIRECTIONS[metric] == "asc")
        # Final deterministic tie-breaker: method name, alphabetical.
        sort_by.append("method")
        ascending.append(True)

        ranked = summary.sort_values(
            sort_by, ascending=ascending, na_position="last"
        ).reset_index(drop=True)
        ranked.insert(0, "rank", np.arange(1, len(ranked) + 1))
        return ranked