Source code for coco_pipe.report.core

"""
Core Reporting Classes
======================

Defines the generic reporting primitives and dim-reduction report adapters used
to assemble single-file HTML reports.
"""

import base64
import gzip
import html
import io
import json
import re
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

from .config import ReportConfig
from .engine import render_template
from .provenance import get_environment_info
from .quality import (
    CheckResult,
    check_constant_columns,
    check_flatline,
    check_missingness,
    check_outliers_zscore,
)


[docs] def _get_reducer_summary(reducer: Any) -> Dict[str, Any]: """Collect the strict summary payload from a reduction-like object.""" if not hasattr(reducer, "get_summary"): raise TypeError( "Reduction objects passed to Report.add_reduction() must implement " "get_summary()." ) summary = reducer.get_summary() if not isinstance(summary, dict): raise TypeError("Reducer get_summary() must return a dictionary.") metrics = summary.get("metrics") if not isinstance(metrics, dict): metrics = {} metric_records = summary.get("metric_records") if not isinstance(metric_records, list): metric_records = [] quality_metadata = summary.get("quality_metadata") if not isinstance(quality_metadata, dict): quality_metadata = {} diagnostics = summary.get("diagnostics") if not isinstance(diagnostics, dict): diagnostics = {} interpretation = summary.get("interpretation") if not isinstance(interpretation, dict): interpretation = {} interpretation_records = summary.get("interpretation_records") if not isinstance(interpretation_records, list): interpretation_records = [] return { "method": summary.get("method") or type(reducer).__name__, "metrics": metrics, "metric_records": metric_records, "quality_metadata": quality_metadata, "diagnostics": diagnostics, "interpretation": interpretation, "interpretation_records": interpretation_records, "capabilities": summary.get("capabilities") or {}, }
def _metrics_summary_table(metrics: Any) -> pd.DataFrame:
    """
    Build a method x metric table of mean values.

    Parameters
    ----------
    metrics : Any
        Metric observations in any form accepted by
        ``coco_pipe.viz.utils.prepare_metrics_frame``.

    Returns
    -------
    pd.DataFrame
        Pivot table indexed by ``method`` with one column per ``metric`` and
        the mean of ``value`` in each cell, or an empty frame when the
        prepared metrics frame is empty.
    """
    # Imported lazily, as in the rest of this module's viz integrations.
    from coco_pipe.viz.utils import prepare_metrics_frame

    tidy = prepare_metrics_frame(metrics)
    if not tidy.empty:
        return tidy.pivot_table(
            index="method", columns="metric", values="value", aggfunc="mean"
        )
    return pd.DataFrame()
[docs] def _trajectory_times( diagnostics: Dict[str, Any], times: Optional[np.ndarray] ) -> Optional[np.ndarray]: """Return the explicit trajectory time axis when it aligns with diagnostics.""" if times is not None: time_values = np.asarray(times).reshape(-1) if time_values.size > 0: return time_values diagnostic_times = diagnostics.get("trajectory_times_") if diagnostic_times is None: return None time_values = np.asarray(diagnostic_times).reshape(-1) return time_values if time_values.size > 0 else None
class Element(ABC):
    """
    Abstract base class for all report elements.

    Subclasses must implement :meth:`render`; :meth:`collect_payload` is an
    optional hook that does nothing by default.
    """

    @abstractmethod
    def render(self) -> str:
        """Render the element to HTML."""

    def collect_payload(self, registry: Dict[str, Any]) -> None:
        """
        Collect data to be stored in the global payload.

        The base implementation leaves ``registry`` untouched.

        Parameters
        ----------
        registry : Dict[str, Any]
            Global dictionary accumulating data. Keyed by UUID.
        """
        return None
class HtmlElement(Element):
    """
    Wrapper for raw HTML content.

    The markup is stored as given and emitted verbatim by :meth:`render`;
    no escaping or validation is applied.

    Parameters
    ----------
    html : str
        The raw HTML string to include.

    Examples
    --------
    >>> elem = HtmlElement("<div>My Custom HTML</div>")
    >>> rep.add_element(elem)
    """

    def __init__(self, html: str):
        self.html = html

    def render(self) -> str:
        """Return the wrapped markup unchanged."""
        return self.html
class ImageElement(Element):
    """
    Embeds an image or matplotlib figure as Base64.

    Parameters
    ----------
    src : str, bytes, Path, or matplotlib.figure.Figure
        The image source. Any object with a ``savefig`` method is treated as
        a figure; ``bytes`` are embedded as-is; ``str``/``Path`` values are
        read from disk.
    caption : str, optional
        Caption text for the figure. Rendered as escaped text.
    width : str, optional
        CSS width (e.g., '100%', '600px'). Default '100%'.

    Examples
    --------
    >>> fig, ax = plt.subplots()
    >>> ax.plot([1, 2, 3])
    >>> elem = ImageElement(fig, caption="My Plot")
    """

    def __init__(self, src: Any, caption: Optional[str] = None, width: str = "100%"):
        self.src = src
        self.caption = caption
        self.width = width

    def _encode_image(self) -> str:
        """
        Convert the image source to a base64 string.

        Returns
        -------
        str
            Base64-encoded image bytes.

        Raises
        ------
        ValueError
            If the source is a path that is not an existing file, or the
            source type is not supported.
        """
        import pathlib

        source = self.src

        # Duck-typed matplotlib figure: rasterise to PNG in memory.
        if hasattr(source, "savefig"):
            buf = io.BytesIO()
            source.savefig(buf, format="png", bbox_inches="tight", dpi=150)
            return base64.b64encode(buf.getvalue()).decode("utf-8")

        if isinstance(source, bytes):
            return base64.b64encode(source).decode("utf-8")

        if isinstance(source, (str, pathlib.Path)):
            path = pathlib.Path(source)
            if path.is_file():
                return base64.b64encode(path.read_bytes()).decode("utf-8")
            # A missing file is a different problem than an unsupported type;
            # report it as such (still ValueError for existing callers).
            raise ValueError(f"Image file not found: {path}")

        raise ValueError(f"Unsupported image source type: {type(source)}")

    def render(self) -> str:
        """
        Render the image as a ``<figure>`` with an inline data URI.

        Returns
        -------
        str
            HTML markup for the figure and optional caption.
        """
        b64_str = self._encode_image()
        width = html.escape(str(self.width), quote=True)

        caption_html = ""
        if self.caption:
            caption_html = (
                '<figcaption class="text-center text-sm text-gray-500 mt-2">'
                f"{html.escape(str(self.caption))}</figcaption>"
            )

        # NOTE: the data URI always declares image/png, including for bytes
        # and file sources whose actual format is not inspected here.
        return f"""
        <figure class="my-6">
            <img src="data:image/png;base64,{b64_str}" style="width: {width};"
                 class="rounded shadow-sm mx-auto border border-gray-100">
            {caption_html}
        </figure>
        """
class PlotlyElement(Element):
    """
    Embeds a Plotly figure using lazy loading and global data usage.

    When :meth:`collect_payload` has been called, the figure data lives in
    the report-wide registry and :meth:`render` only emits a placeholder
    referencing it by id. Otherwise the figure JSON is inlined into the
    placeholder's ``data-figure`` attribute.

    Parameters
    ----------
    figure : plotly.graph_objects.Figure
        The figure to render.
    height : str, optional
        Height of the plot container. Default "500px".

    Examples
    --------
    >>> fig = go.Figure(data=go.Scatter(x=[1, 2], y=[3, 4]))
    >>> elem = PlotlyElement(fig)
    """

    # CSS classes of the placeholder shown until the plot is loaded.
    _PLACEHOLDER_CLASSES = (
        "lazy-plot w-full rounded shadow-sm border border-gray-100 bg-gray-50 "
        "flex items-center justify-center text-gray-400 animate-pulse"
    )

    def __init__(self, figure: Any, height: str = "500px"):
        self.figure = figure
        self.height = height
        # Registry key of this figure; assigned by collect_payload().
        self.registry_id: Optional[str] = None

    def _figure_payload(self) -> Any:
        """Serialise the figure to plain JSON data without binary arrays."""
        return self._force_standard_json(json.loads(self.figure.to_json()))

    def collect_payload(self, registry: Dict[str, Any]) -> None:
        """
        Extract figure data and store it in the registry.

        Parameters
        ----------
        registry : Dict[str, Any]
            Global payload dictionary; the figure is stored under a UUID key
            that is generated on first use and then reused.
        """
        if self.registry_id is None:
            self.registry_id = str(uuid.uuid4())
        registry[self.registry_id] = self._figure_payload()

    def _force_standard_json(self, obj: Any) -> Any:
        """
        Recursively convert Plotly binary-encoded arrays to standard lists.

        A dict holding ``dtype`` and ``bdata`` (and at most one more key) is
        decoded from base64 into a (nested) list. When a ``shape`` entry is
        present the decoded values are reshaped accordingly so that
        multi-dimensional arrays keep their structure. Objects that cannot
        be decoded are returned unchanged after emitting a warning.

        Parameters
        ----------
        obj : Any
            JSON-compatible structure produced from the figure.

        Returns
        -------
        Any
            The same structure with binary array specs replaced by lists.
        """
        if isinstance(obj, dict):
            if "dtype" in obj and "bdata" in obj and len(obj) <= 3:
                try:
                    decoded = base64.b64decode(obj["bdata"])
                    arr = np.frombuffer(decoded, dtype=obj["dtype"])
                    shape = obj.get("shape")
                    if shape is not None:
                        # Accept both "2, 3"-style strings and sequences.
                        dims = [int(d) for d in re.findall(r"\d+", str(shape))]
                        if dims:
                            arr = arr.reshape(dims)
                    return arr.tolist()
                except (TypeError, ValueError) as e:
                    import warnings

                    warnings.warn(
                        f"[Report] Warning: Failed to decode binary data: {e}",
                        UserWarning,
                    )
                    return obj
            return {k: self._force_standard_json(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._force_standard_json(x) for x in obj]
        return obj

    def render(self) -> str:
        """
        Render the lazy-loading placeholder for this figure.

        Returns
        -------
        str
            Placeholder markup referencing the registry entry, or the inline
            variant when no payload has been collected yet.
        """
        if self.registry_id is None:
            return self._render_inline()

        height = html.escape(str(self.height), quote=True)
        return f"""
        <div class="my-6">
            <div class="{self._PLACEHOLDER_CLASSES}"
                 style="height: {height};"
                 data-id="{self.registry_id}">
                 <span class="sr-only">Loading Plot...</span>
            </div>
        </div>
        """

    def _render_inline(self) -> str:
        """
        Render the placeholder with the figure JSON embedded in an attribute.

        The JSON goes through the figure's own serialiser (so array data is
        serialisable) and is fully HTML-escaped for use inside a
        double-quoted attribute.
        """
        safe_json = html.escape(json.dumps(self._figure_payload()), quote=True)
        height = html.escape(str(self.height), quote=True)
        return f"""
        <div class="my-6">
            <div class="{self._PLACEHOLDER_CLASSES}"
                 style="height: {height};"
                 data-figure="{safe_json}">
                 <span class="sr-only">Loading Plot...</span>
            </div>
        </div>
        """
class TableElement(Element):
    """
    Renders a Pandas DataFrame or Dict as a styled HTML table.

    Column names, cell values and the title are HTML-escaped on output.

    Parameters
    ----------
    data : DataFrame, Dict, or List[Dict]
        Data to display.
    title : str, optional
        Title describing the table. When set, a CSV export button is
        rendered next to it.

    Examples
    --------
    >>> df = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
    >>> elem = TableElement(df, title="Metrics")
    """

    def __init__(self, data: Any, title: Optional[str] = None):
        self.data = data
        self.title = title
        # Short random DOM id used by the CSV export button.
        self.table_id = f"table-{uuid.uuid4().hex[:8]}"

    @staticmethod
    def _to_frame(data: Any) -> pd.DataFrame:
        """
        Normalize supported table-like inputs to a DataFrame.

        A dict whose values are all scalars (numbers, strings or ``None``)
        becomes a single-row frame; everything else is handed to the
        ``pd.DataFrame`` constructor. DataFrames are returned as-is.
        """
        if isinstance(data, pd.DataFrame):
            return data
        if isinstance(data, dict) and all(
            v is None or isinstance(v, (int, float, str, np.number))
            for v in data.values()
        ):
            return pd.DataFrame([data])
        return pd.DataFrame(data)

    def render(self) -> str:
        """
        Render the table, with an optional title bar, as HTML.

        Returns
        -------
        str
            Markup of the wrapper ``<div>`` containing the table.
        """
        df = self._to_frame(self.data)

        parts = ['<div class="overflow-x-auto my-4 group relative">']

        if self.title:
            title_text = html.escape(str(self.title))
            # JSON gives a valid JS string literal; escaping it makes the
            # literal safe inside the double-quoted onclick attribute.
            title_js = html.escape(json.dumps(str(self.title)), quote=True)
            parts.append(
                f"""
            <div class="flex justify-between items-center mb-2">
                <h4 class="text-sm font-semibold text-gray-700 dark:text-gray-300 uppercase tracking-wide">
                    {title_text}
                </h4>
                <button onclick="exportTableToCSV('{self.table_id}', {title_js})"
                        class="text-xs px-2 py-1 bg-gray-100 hover:bg-gray-200 dark:bg-gray-800 dark:hover:bg-gray-700 rounded text-gray-500 transition opacity-0 group-hover:opacity-100">
                    ⬇ CSV
                </button>
            </div>
            """
            )

        parts.append(
            f'<table id="{self.table_id}" class="min-w-full divide-y divide-gray-200 '
            'dark:divide-gray-700 border dark:border-gray-700 text-sm">'
        )

        # Header
        parts.append('<thead class="bg-gray-50 dark:bg-gray-800"><tr>')
        parts.extend(
            '<th class="px-4 py-3 text-left text-xs font-medium text-gray-500 '
            'dark:text-gray-400 uppercase tracking-wider">'
            f"{html.escape(str(col))}</th>"
            for col in df.columns
        )
        parts.append("</tr></thead>")

        # Body
        parts.append(
            '<tbody class="bg-white dark:bg-gray-900 divide-y divide-gray-200 '
            'dark:divide-gray-700">'
        )
        parts.extend(self._render_row(row, idx) for idx, row in df.iterrows())
        parts.append("</tbody></table></div>")
        return "".join(parts)

    def _render_row(self, row, idx) -> str:
        """
        Render a single row. Can be overridden.

        Parameters
        ----------
        row : pd.Series
            Row values as yielded by ``DataFrame.iterrows()``.
        idx : Any
            Index label of the row (unused by the default implementation).
        """
        cells = "".join(
            '<td class="px-4 py-3 whitespace-nowrap text-gray-700 '
            f'dark:text-gray-300">{html.escape(str(val))}</td>'
            for val in row
        )
        return f"<tr>{cells}</tr>"
class InteractiveTableElement(Element):
    """
    Render a payload-backed interactive data table.

    The rows are stored in the report-wide payload registry; the rendered
    markup is a placeholder carrying the registry id and a JSON config.

    Parameters
    ----------
    data : Any
        Table-like input accepted by ``TableElement._to_frame``.
    title : str, optional
        Heading shown above the table (rendered as escaped text).
    selector_columns : List[str], optional
        Column names passed through to the front end as ``selector_columns``.
    default_sort : Dict[str, str], optional
        Sort specification passed through to the front end as
        ``default_sort``.
    page_size : int
        Page size passed through to the front end. Default 50.
    """

    def __init__(
        self,
        data: Any,
        title: Optional[str] = None,
        selector_columns: Optional[List[str]] = None,
        default_sort: Optional[Dict[str, str]] = None,
        page_size: int = 50,
    ):
        self.data = data
        self.title = title
        # Copies, so later changes to the caller's objects do not leak in.
        self.selector_columns = list(selector_columns or [])
        self.default_sort = dict(default_sort) if default_sort else None
        self.page_size = int(page_size)
        self.registry_id: Optional[str] = None

    def collect_payload(self, registry: Dict[str, Any]) -> None:
        """
        Store the table's columns and rows in the registry.

        Parameters
        ----------
        registry : Dict[str, Any]
            Global payload dictionary; the table is stored under a UUID key
            that is generated on first use and then reused.
        """
        if self.registry_id is None:
            self.registry_id = str(uuid.uuid4())
        df = TableElement._to_frame(self.data)
        registry[self.registry_id] = {
            "columns": [str(column) for column in df.columns],
            # Round-trip through pandas' JSON writer to get plain values.
            "rows": json.loads(df.to_json(orient="records", date_format="iso")),
        }

    def render(self) -> str:
        """
        Render the placeholder markup for the interactive table.

        Returns
        -------
        str
            Markup with ``data-id`` and ``data-config`` attributes.
        """
        if self.registry_id is None:
            self.registry_id = str(uuid.uuid4())

        config = {
            "title": self.title,
            "selector_columns": self.selector_columns,
            "default_sort": self.default_sort,
            "page_size": self.page_size,
        }
        config_json = html.escape(json.dumps(config), quote=True)

        title_html = ""
        if self.title:
            title_html = f"""
            <div class="flex justify-between items-center mb-3">
                <h4 class="text-sm font-semibold text-gray-700 dark:text-gray-300 uppercase tracking-wide">
                    {html.escape(str(self.title))}
                </h4>
            </div>
            """

        return f"""
        <div class="my-4">
            {title_html}
            <div class="interactive-table" data-id="{self.registry_id}" data-config="{config_json}">
                <div class="rounded border border-gray-200 dark:border-gray-700 bg-white dark:bg-gray-900 p-4 text-sm text-gray-500 dark:text-gray-400">
                    Loading interactive table...
                </div>
            </div>
        </div>
        """
class MetricsTableElement(TableElement):
    """
    Comparison table that highlights best values.

    Best values are pre-computed at construction time and only when ``data``
    is a DataFrame.

    Parameters
    ----------
    data : DataFrame
        Comparison data (rows=methods, cols=metrics).
    title : str, optional
        Table title. Default "Comparison Metrics".
    highlight_cols : List[str], optional
        Columns to highlight best values in. Defaults to all numeric columns.
    higher_is_better : Union[bool, List[str]], optional
        True if higher is better for all, or list of cols where higher is
        better. Default True.
    """

    def __init__(
        self,
        data: Any,
        title: str = "Comparison Metrics",
        highlight_cols: Optional[List[str]] = None,
        higher_is_better: Union[bool, List[str]] = True,
    ):
        super().__init__(data, title)
        self.highlight_cols = highlight_cols
        self.higher_is_better = higher_is_better

        # Column name -> best value in that column.
        self.best_vals: Dict[Any, Any] = {}
        if not isinstance(self.data, pd.DataFrame):
            return

        cols = (
            self.highlight_cols
            if self.highlight_cols
            else self.data.select_dtypes(include=[np.number]).columns
        )
        for col in cols:
            if col not in self.data.columns:
                continue
            if isinstance(self.higher_is_better, list):
                is_higher = col in self.higher_is_better
            else:
                is_higher = self.higher_is_better
            column = self.data[col]
            self.best_vals[col] = column.max() if is_higher else column.min()

    def _is_best(self, col: Any, val: Any) -> bool:
        """Return True when ``val`` matches the best value recorded for ``col``."""
        if col not in self.best_vals:
            return False
        try:
            return bool(np.isclose(val, self.best_vals[col]))
        except TypeError:
            # Non-numeric cell (e.g. a string): nothing to highlight.
            return False

    def _render_row(self, row, idx) -> str:
        """
        Render one row, emphasising cells that hold a column's best value.

        Floating point values are shown with four decimals; all cell text is
        HTML-escaped.
        """
        cells = []
        for col, val in row.items():
            if self._is_best(col, val):
                style = (
                    "font-bold text-green-600 dark:text-green-400 bg-green-50 "
                    "dark:bg-green-900/20"
                )
            else:
                style = "text-gray-700 dark:text-gray-300"

            if isinstance(val, (float, np.floating)):
                display_val = f"{val:.4f}"
            else:
                display_val = html.escape(str(val))

            cells.append(
                f'<td class="px-4 py-3 whitespace-nowrap {style}">{display_val}</td>'
            )
        return "<tr>" + "".join(cells) + "</tr>"
class ContainerElement(Element):
    """
    Base class for elements that contain other elements.

    Children are rendered in insertion order and payload collection is
    delegated to each of them.
    """

    def __init__(self):
        self.children: List[Element] = []

    def add_element(self, element: Union[Element, str]) -> "ContainerElement":
        """
        Add a child element.

        Parameters
        ----------
        element : Element or str
            The element to add. Strings are wrapped in an HtmlElement and
            therefore treated as raw HTML.

        Returns
        -------
        self
            Fluent interface.
        """
        if isinstance(element, str):
            element = HtmlElement(element)
        self.children.append(element)
        return self

    def add_markdown(self, text: str) -> "ContainerElement":
        """
        Add a markdown block.

        Note: Requires 'markdown' package. If not present, falls back to
        showing the escaped raw text in a monospace block.

        Parameters
        ----------
        text : str
            Markdown source.

        Returns
        -------
        self
            Fluent interface.
        """
        try:
            import markdown

            rendered = markdown.markdown(text, extensions=["extra"])
            # Wrap in prose class for consistent styling
            block = (
                '<div class="prose prose-sm max-w-none text-gray-700 '
                f'dark:text-gray-200 dark:prose-invert">{rendered}</div>'
            )
        except ImportError:
            # Escape &, < and > so the raw text cannot be read as markup.
            safe_text = html.escape(text, quote=False)
            block = (
                '<div class="whitespace-pre-wrap font-mono text-sm bg-gray-50 p-4 '
                f'rounded">{safe_text}</div>'
            )
        self.add_element(HtmlElement(block))
        return self

    def render_children(self) -> str:
        """Render all child elements, joined by newlines."""
        return "\n".join(child.render() for child in self.children)

    def collect_payload(self, registry: Dict[str, Any]) -> None:
        """Recursively collect payload from children."""
        for child in self.children:
            child.collect_payload(registry)

    def render(self) -> str:
        """Render the container as the concatenation of its children."""
        return self.render_children()
class Section(ContainerElement):
    """
    A logical section of the report.

    Parameters
    ----------
    title : str
        The section title.
    icon : str, optional
        SVG icon or emoji to display next to the title.
    tags : List[str], optional
        Tags for filtering.
    status : str, optional
        Status string ("OK", "WARN", "FAIL"). Default "OK".
    code : str, optional
        Source code snippet to reproduce this section.

    Examples
    --------
    >>> sec = Section("Results", icon="📈", status="OK")
    >>> sec.add_element(plotly_element)
    >>> rep.add_section(sec)
    """

    def __init__(
        self,
        title: str,
        icon: Optional[str] = None,
        tags: Optional[List[str]] = None,
        status: str = "OK",
        code: Optional[str] = None,
    ):
        super().__init__()
        self.title = title
        self.icon = icon
        self.tags = tags if tags else []
        self.status = status
        self.code = code
        self.findings: List[Dict] = []  # List of serialized CheckResults

        # Anchor id: lower-cased title with runs of other characters
        # collapsed to "-". Titles without any ASCII letters or digits would
        # produce an empty id, so fall back to a random one.
        slug = re.sub(r"[^a-z0-9]+", "-", self.title.lower()).strip("-")
        self.id = slug or f"section-{uuid.uuid4().hex[:8]}"

    def add_finding(self, result: CheckResult) -> None:
        """
        Add a quality finding and automatically update status.

        The status only ever escalates: "FAIL" findings set it to "FAIL",
        "WARN" findings set it to "WARN" unless it is already "FAIL".

        Parameters
        ----------
        result : CheckResult
            Finding to record; a shallow copy of its attribute dictionary is
            stored so it can be serialised.
        """
        self.findings.append(dict(vars(result)))

        if result.status == "FAIL":
            self.status = "FAIL"
        elif result.status == "WARN" and self.status != "FAIL":
            self.status = "WARN"

    def render(self) -> str:
        """Render the section through the ``section.html`` template."""
        return render_template(
            "section.html",
            title=self.title,
            icon=self.icon,
            content=self.render_children(),
            id=self.id,
            tags=json.dumps(self.tags),
            status=self.status,
            code=self.code,
            findings=self.findings,  # Pass list of dicts for Jinja iteration
        )
class Report(ContainerElement):
    """
    The main report container.

    Parameters
    ----------
    title : str
        The report title.
    config : Union[Dict, ReportConfig], optional
        Configuration dictionary or ReportConfig object used for the run.
        A dictionary is never modified; a ``"title"`` entry in it overrides
        the ``title`` argument.
    """

    def __init__(
        self,
        title: str = "CoCo Analysis Report",
        config: Optional[Union[Dict, ReportConfig]] = None,
    ):
        super().__init__()
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")

        if config is None:
            config = {}

        if isinstance(config, dict):
            # Work on a copy so the caller's dictionary is left untouched.
            params = dict(config)
            if "title" in params:
                title = params["title"]
            else:
                params["title"] = title
            try:
                self.config = ReportConfig(**params)
            except Exception:
                # If direct validation fails, assume it's a bag of parameters
                self.config = ReportConfig(title=title, run_params=params)
        else:
            self.config = config

        # Ensure title sync
        self.title = self.config.title

        # Auto-capture environment provenance if not provided
        if self.config.provenance is None:
            from .config import ProvenanceConfig

            self.config.provenance = ProvenanceConfig(**get_environment_info())

        self.metadata = self.config.provenance.model_dump()

    def add_section(self, section: Section) -> "Report":
        """Syntactic sugar for adding a Section."""
        return self.add_element(section)

    def add_figure(self, fig: Any, caption: Optional[str] = None) -> "Report":
        """
        Add a figure (Matplotlib) or Image.

        Parameters
        ----------
        fig : Any
            Any source accepted by ImageElement.
        caption : str, optional
            Caption shown below the image.
        """
        self.add_element(ImageElement(fig, caption=caption))
        return self

    @staticmethod
    def _run_container_checks(sec: Section, container: Any) -> None:
        """Record missingness and constant-column findings for ``container.X``."""
        import warnings

        if container.X is None:
            return
        try:
            res_missing = check_missingness(container.X)
            if res_missing.is_issue:
                sec.add_finding(res_missing)
            for res in check_constant_columns(container.X):
                sec.add_finding(res)
        except Exception as e:
            warnings.warn(f"Quality checks failed for container: {e}", UserWarning)

    @staticmethod
    def _add_distribution_plot(sec: Section, container: Any) -> None:
        """Add a class-count bar chart (if ``y`` is set) or a value histogram."""
        try:
            import matplotlib.pyplot as plt

            fig, ax = plt.subplots(figsize=(6, 3))
            try:
                if container.y is not None:
                    y_series = pd.Series(container.y)
                    y_series.value_counts().plot(kind="bar", ax=ax, color="skyblue")
                    ax.set_title("Class Distribution")
                    ax.set_xlabel("Class")
                    ax.set_ylabel("Count")
                    caption = "Target label distribution."
                else:
                    data_flat = container.X.flatten()
                    if len(data_flat) > 5000:
                        # Fixed seed: the same data yields the same report.
                        rng = np.random.default_rng(0)
                        data_flat = rng.choice(data_flat, 5000, replace=False)
                    ax.hist(data_flat, bins=30, color="gray", alpha=0.7)
                    ax.set_title("Data Value Distribution (Sampled)")
                    caption = "Histogram of data values."

                fig.tight_layout()
                sec.add_element(ImageElement(fig, caption=caption, width="80%"))
            finally:
                # Always release the figure from pyplot, even on errors.
                plt.close(fig)
        except Exception as e:
            msg = html.escape(f"Could not generate plot: {e}")
            sec.add_element(
                HtmlElement(f"<div class='text-red-500 text-xs'>{msg}</div>")
            )

    def add_container(
        self,
        container: Any,
        name: str = "Data Overview",
        show_coords: bool = True,
        show_dist: bool = True,
    ) -> "Report":
        """
        Add a summary section for a DataContainer.

        Automatically runs quality checks (Missingness, Constants) on
        ``container.X`` when it is set, independently of ``show_dist``. Any
        failure while building the section is reported as a ``UserWarning``
        and the section is skipped.

        Parameters
        ----------
        container : DataContainer
            The data container to summarize. The attributes ``dims``,
            ``shape``, ``coords``, ``X`` and ``y`` are read.
        name : str
            Title for the section.
        show_coords : bool
            If True, shows the table of coordinates.
        show_dist : bool
            If True, shows the data/class distribution plot.

        Returns
        -------
        Report
            The report instance for fluent chaining.
        """
        import warnings

        try:
            sec = Section(title=name, icon="💾")

            # Dimensions
            dims_data = [
                {"Dimension": d, "Size": s}
                for d, s in zip(container.dims, container.shape)
            ]
            sec.add_element(TableElement(dims_data, title="Dimensions"))

            # Coordinates Info
            if show_coords and container.coords:
                coords_data = [
                    {"Name": k, "Type": str(np.array(v).dtype), "Count": len(v)}
                    for k, v in container.coords.items()
                ]
                sec.add_element(TableElement(coords_data, title="Coordinates"))

            # Quality checks
            self._run_container_checks(sec, container)

            # Distribution Plot
            if show_dist:
                self._add_distribution_plot(sec, container)

            self.add_section(sec)
        except Exception as e:
            warnings.warn(f"Failed to add container info to report: {e}", UserWarning)

        return self

    def add_reduction(
        self,
        reducer: Any,
        name: Optional[str] = None,
        *,
        X_emb: Optional[np.ndarray] = None,
        labels: Optional[np.ndarray] = None,
        metadata: Optional[Dict[str, Any]] = None,
        times: Optional[np.ndarray] = None,
    ) -> "Report":
        """
        Add one scored and optionally interpreted reduction result to the report.

        Parameters
        ----------
        reducer : Any
            Reduction object implementing ``get_summary()``.
        name : str, optional
            Section title. Defaults to the reduction method name.
        X_emb : np.ndarray, optional
            Explicit embedding to visualize. When omitted, the section renders
            scalar summaries, diagnostics, and interpretation outputs only.
        labels : np.ndarray, optional
            Optional labels aligned with ``X_emb`` for embedding or trajectory
            plots.
        metadata : dict, optional
            Optional column-oriented metadata aligned with the sample axis of a
            2D embedding.
        times : np.ndarray, optional
            Optional explicit time axis aligned with the time dimension of a 3D
            trajectory tensor.

        Returns
        -------
        Report
            The report instance for fluent chaining.

        Raises
        ------
        ValueError
            If the supplied embedding or aligned plotting metadata are invalid.
        TypeError
            If ``reducer`` does not implement the strict summary contract.

        See Also
        --------
        coco_pipe.dim_reduction.core.DimReduction.get_summary
        coco_pipe.viz.plotly_utils.plot_embedding_interactive
        coco_pipe.viz.plotly_utils.plot_interpretation_interactive
        """

        def _numeric_items(mapping: Dict[str, Any]) -> Dict[str, Any]:
            # Keep real numbers only; bool is an int subclass, so drop it.
            return {
                key: value
                for key, value in mapping.items()
                if isinstance(value, (int, float, np.number))
                and not isinstance(value, bool)
            }

        summary = _get_reducer_summary(reducer)
        method_name = summary["method"]
        title = name or method_name
        diagnostics = summary["diagnostics"]
        # Shared by the trajectory plot and the per-metric series below.
        time_values = _trajectory_times(diagnostics, times)

        sec = Section(title=title, icon="📉")

        # Embedding / trajectory plot
        if X_emb is not None:
            emb = np.asarray(X_emb)
            if emb.ndim == 2:
                from coco_pipe.viz.plotly_utils import plot_embedding_interactive

                fig = plot_embedding_interactive(
                    embedding=emb,
                    labels=labels,
                    metadata=metadata,
                    title=f"{title} Embedding",
                    dimensions=min(emb.shape[1], 3),
                )
                sec.add_element(PlotlyElement(fig))
            elif emb.ndim == 3:
                from coco_pipe.viz.plotly_utils import plot_trajectory_interactive

                fig = plot_trajectory_interactive(
                    emb,
                    times=time_values,
                    labels=labels,
                    title=f"{title} Trajectory",
                    dimensions=min(emb.shape[-1], 3),
                )
                sec.add_element(PlotlyElement(fig))
            else:
                msg = "`X_emb` must be a 2D embedding or 3D trajectory tensor."
                raise ValueError(msg)

        # Scalar metrics; quality metadata wins on duplicate keys.
        scalar_table = {
            **_numeric_items(summary["metrics"]),
            **_numeric_items(summary["quality_metadata"]),
        }
        if scalar_table:
            sec.add_element(TableElement(scalar_table, title="Quality Metrics"))

        metric_records = summary["metric_records"]
        if metric_records:
            from coco_pipe.viz.plotly_utils import plot_metric_details

            sec.add_element(
                PlotlyElement(
                    plot_metric_details(metric_records, title="Metric Details"),
                    height="380px",
                )
            )

        # Diagnostics
        loss_history = diagnostics.get("loss_history_")
        if loss_history is not None:
            from coco_pipe.viz.plotly_utils import plot_loss_history_interactive

            sec.add_element(
                PlotlyElement(
                    plot_loss_history_interactive(loss_history),
                    height="350px",
                )
            )

        explained_variance = diagnostics.get("explained_variance_ratio_")
        if explained_variance is not None:
            from coco_pipe.viz.plotly_utils import plot_scree_interactive

            sec.add_element(
                PlotlyElement(
                    plot_scree_interactive(explained_variance),
                    height="350px",
                )
            )

        coranking = diagnostics.get("coranking_matrix_")
        if coranking is not None:
            import plotly.graph_objects as go

            fig_coranking = go.Figure(
                data=[
                    go.Heatmap(
                        z=np.asarray(coranking),
                        colorscale="Viridis",
                        colorbar=dict(title="Count"),
                    )
                ]
            )
            fig_coranking.update_layout(
                title="Co-Ranking Matrix",
                xaxis_title="Embedded Rank",
                yaxis_title="Original Rank",
                margin=dict(l=40, r=40, b=40, t=40),
                template="plotly_white",
            )
            sec.add_element(PlotlyElement(fig_coranking, height="420px"))

        from coco_pipe.viz.plotly_utils import (
            plot_interpretation_interactive,
            plot_trajectory_metric_series_interactive,
        )

        # Trajectory metric series
        trajectory_series = (
            "trajectory_speed_",
            "trajectory_acceleration_",
            "trajectory_curvature_",
            "trajectory_turning_angle_",
            "trajectory_dispersion_",
            "trajectory_path_length_",
            "trajectory_displacement_",
        )
        for metric_key in trajectory_series:
            values = diagnostics.get(metric_key)
            if values is None:
                continue
            sec.add_element(
                PlotlyElement(
                    plot_trajectory_metric_series_interactive(
                        values,
                        times=time_values,
                        labels=labels,
                        title=metric_key.rstrip("_").replace("_", " ").title(),
                        ylabel="Value",
                    ),
                    height="360px",
                )
            )

        separation = diagnostics.get("trajectory_separation_")
        if separation is not None:
            sec.add_element(
                PlotlyElement(
                    plot_trajectory_metric_series_interactive(
                        separation,
                        times=time_values,
                        title="Trajectory Separation",
                        ylabel="Separation",
                    ),
                    height="380px",
                )
            )

        # Interpretation
        interpretation = summary["interpretation"]
        if interpretation:
            interpretation_payload = {
                "analysis": interpretation,
                "records": summary["interpretation_records"],
            }
            record_analyses = {
                record.get("analysis")
                for record in summary["interpretation_records"]
                if isinstance(record, dict) and record.get("analysis")
            }
            # One plot per analysis named in the records or the mapping.
            analysis_names = sorted(record_analyses | set(interpretation.keys()))
            for analysis_name in analysis_names:
                sec.add_element(
                    PlotlyElement(
                        plot_interpretation_interactive(
                            interpretation_payload,
                            analysis=analysis_name,
                            title=(
                                f"{analysis_name.replace('_', ' ').title()} "
                                "Interpretation"
                            ),
                            method=method_name,
                        ),
                        height="420px",
                    )
                )

        self.add_section(sec)
        return self

    def add_raw_preview(self, data: Any, name: str = "Raw Data Inspector") -> "Report":
        """
        Add an interactive scroller for raw data.

        Automatically checks for flatlines and outliers on (at most) the
        first 10000 values. A failure of these checks is reported as a
        ``UserWarning`` and does not prevent the preview from being added.

        Parameters
        ----------
        data : DataContainer or np.ndarray
            The data to visualize. Objects with an ``X`` attribute are
            unwrapped to that attribute.
        name : str
            Section title.

        Returns
        -------
        Report
            The report instance for fluent chaining.
        """
        import warnings

        sec = Section(title=name, icon="🔍")

        # Extract array
        X = data.X if hasattr(data, "X") else data
        names = None

        try:
            sample_X = X if X.size < 10000 else X.flat[:10000]

            res_flat = check_flatline(sample_X)
            if res_flat.is_issue:
                sec.add_finding(res_flat)

            res_outlier = check_outliers_zscore(sample_X)
            # Same rule as the flatline check; results without an
            # ``is_issue`` attribute are kept whenever they are truthy.
            if res_outlier and getattr(res_outlier, "is_issue", True):
                sec.add_finding(res_outlier)
        except Exception as e:
            warnings.warn(f"Quality checks skipped for raw preview: {e}", UserWarning)

        # Ensure 2D
        if hasattr(X, "ndim") and X.ndim == 1:
            X = X.reshape(-1, 1)
        if hasattr(X, "ndim") and X.ndim > 2:
            # Merge the first two axes; remaining axes are flattened.
            X = X.reshape(X.shape[0] * X.shape[1], -1)

        from coco_pipe.viz.plotly_utils import plot_raw_preview

        fig = plot_raw_preview(X, names=names, title=name)
        sec.add_element(PlotlyElement(fig, height="450px"))

        self.add_section(sec)
        return self

    def add_comparison(
        self, metrics_df: Any, name: str = "Method Comparison"
    ) -> "Report":
        """
        Add a comparison section for multiple reduction methods.

        Parameters
        ----------
        metrics_df : DataFrame or MethodSelector-like
            Wide/tidy metric data or an object exposing ``to_frame()``.
        name : str
            Section title.

        Returns
        -------
        Report
            The report instance for fluent chaining.

        Raises
        ------
        ValueError
            If no comparison metrics are available after normalization.

        See Also
        --------
        coco_pipe.viz.plotly_utils.plot_metric_details
        coco_pipe.dim_reduction.evaluation.core.MethodSelector
        """
        from coco_pipe.viz.plotly_utils import (
            plot_metric_details,
            plot_radar_comparison,
        )
        from coco_pipe.viz.utils import infer_metric_plot_type, prepare_metrics_frame

        sec = Section(title=name, icon="📊")

        long_df = prepare_metrics_frame(metrics_df)
        summary_table = _metrics_summary_table(long_df)
        if summary_table.empty:
            raise ValueError("No comparison metrics available to add to the report.")

        # 1. Metrics Table (Best values highlighted)
        sec.add_element(MetricsTableElement(summary_table, title="Quality Metrics"))

        # 2. Primary visual summaries
        fig_heatmap = plot_metric_details(
            long_df, title="Metric Heatmap", plot_type="heatmap"
        )
        sec.add_element(PlotlyElement(fig_heatmap, height="400px"))

        primary_plot_type = infer_metric_plot_type(long_df)
        fig_primary = plot_metric_details(
            long_df, title="Metric Details", plot_type=primary_plot_type
        )
        sec.add_element(PlotlyElement(fig_primary, height="400px"))

        # 3. Radar chart: single scope, >= 3 metrics and >= 2 methods only.
        if (
            long_df["scope_value"].astype(str).nunique() == 1
            and summary_table.shape[1] >= 3
            and summary_table.shape[0] >= 2
        ):
            fig_radar = plot_radar_comparison(summary_table, normalize=True)
            sec.add_element(PlotlyElement(fig_radar, height="400px"))

        self.add_section(sec)
        return self

    def render(self) -> str:
        """
        Render the full HTML report.

        Collates payloads, compresses data, and passes to template.

        Returns
        -------
        str
            The complete HTML document.
        """
        # 1. Collect Payload (Global Data Store)
        data_registry: Dict[str, Any] = {}
        self.collect_payload(data_registry)

        # 2. Compress Payload (JSON -> Gzip -> Base64)
        payload_json = json.dumps(data_registry).encode("utf-8")
        payload_b64 = base64.b64encode(gzip.compress(payload_json)).decode("utf-8")

        # 3. Get content from children (Sections)
        # Note: Children now render with data-id references since
        # collect_payload was called.
        content_html = super().render()

        # Build TOC Structure from Sections
        toc = [
            {
                "id": child.id,
                "title": child.title,
                "icon": child.icon,
                "status": child.status,
            }
            for child in self.children
            if isinstance(child, Section)
        ]

        # Wrap in base template
        return render_template(
            "base.html",
            title=self.title,
            content=content_html,
            timestamp=self.timestamp,
            toc=toc,
            metadata=self.metadata,
            config=self.config.model_dump_json(indent=2),
            payload=payload_b64,
        )

    def save(self, filename: str) -> None:
        """
        Render and save the report to a file.

        Parameters
        ----------
        filename : str
            Path to save the HTML file. Written as UTF-8.
        """
        full_html = self.render()
        with open(filename, "w", encoding="utf-8") as f:
            f.write(full_html)