Source code for floweaver.compiler.spec

"""Spec schema for compiled Sankey definitions.

This module defines the JSON-serializable spec format that represents a fully
compiled Sankey diagram definition, with selections/partitions expanded and
converted to explicit filters. The spec can be executed against flow data to
produce SankeyData results.

"""

import attr
from typing import Dict, List, Optional, Tuple, Union

from ..sankey_definition import BundleID
from .tree import Node, tree_from_dict, tree_to_dict


@attr.s(frozen=True)
class MeasureSpec:
    """Immutable record naming a flow-data column and how to aggregate it.

    Nothing in this class validates ``aggregation``; the value is stored and
    serialized exactly as given.
    """

    column: str = attr.ib()
    aggregation: str = attr.ib()  # 'sum' or 'mean'

    def to_json(self) -> dict:
        """Return a plain dict with the ``column`` and ``aggregation`` keys."""
        return dict(column=self.column, aggregation=self.aggregation)

    @classmethod
    def from_json(cls, data: dict) -> "MeasureSpec":
        """Build a MeasureSpec from a dict shaped like ``to_json`` output.

        Both keys are required; a missing key raises ``KeyError``.
        """
        column = data["column"]
        aggregation = data["aggregation"]
        return cls(column=column, aggregation=aggregation)
@attr.s(frozen=True)
class NodeSpec:
    """Immutable description of one node of the Sankey diagram."""

    title: str = attr.ib()
    type: str = attr.ib()  # 'process' or 'group'
    group: Optional[str] = attr.ib()
    style: str = attr.ib()
    direction: str = attr.ib()  # 'R' or 'L'
    hidden: bool = attr.ib(default=False)

    def to_json(self) -> dict:
        """Return every field of the node as a JSON-serializable dict."""
        field_names = ("title", "type", "group", "style", "direction", "hidden")
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_json(cls, data: dict) -> "NodeSpec":
        """Build a NodeSpec from a dict shaped like ``to_json`` output.

        ``group`` and ``hidden`` may be absent from ``data``; they then
        default to ``None`` and ``False``. All other keys are required and
        raise ``KeyError`` when missing.
        """
        fields = {
            "title": data["title"],
            "type": data["type"],
            "group": data.get("group"),
            "style": data["style"],
            "direction": data["direction"],
            "hidden": data.get("hidden", False),
        }
        return cls(**fields)
@attr.s(frozen=True)
class GroupSpec:
    """Immutable record for a process group: its id, title and node ids."""

    id: str = attr.ib()
    title: str = attr.ib()
    nodes: List[str] = attr.ib()

    def to_json(self) -> dict:
        """Return the group as a JSON-serializable dict.

        The ``nodes`` list is passed through as-is (not copied).
        """
        payload = {}
        payload["id"] = self.id
        payload["title"] = self.title
        payload["nodes"] = self.nodes
        return payload

    @classmethod
    def from_json(cls, data: dict) -> "GroupSpec":
        """Build a GroupSpec from a dict shaped like ``to_json`` output.

        All three keys are required; a missing key raises ``KeyError``.
        """
        group_id = data["id"]
        title = data["title"]
        nodes = data["nodes"]
        return cls(id=group_id, title=title, nodes=nodes)
@attr.s(frozen=True)
class BundleSpec:
    """Immutable record of a bundle, kept as provenance for edges."""

    id: BundleID = attr.ib()
    source: str = attr.ib()  # ProcessGroup ID or 'Elsewhere'
    target: str = attr.ib()  # ProcessGroup ID or 'Elsewhere'

    def to_json(self) -> dict:
        """Return the bundle's id, source and target as a plain dict."""
        return dict(id=self.id, source=self.source, target=self.target)

    @classmethod
    def from_json(cls, data: dict) -> "BundleSpec":
        """Build a BundleSpec from a dict shaped like ``to_json`` output.

        All three keys are required; a missing key raises ``KeyError``.
        """
        fields = {
            "id": data["id"],
            "source": data["source"],
            "target": data["target"],
        }
        return cls(**fields)
@attr.s(frozen=True)
class EdgeSpec:
    """Immutable description of one edge of the Sankey diagram."""

    source: Optional[str] = attr.ib()  # None = from elsewhere
    target: Optional[str] = attr.ib()  # None = to elsewhere
    type: str = attr.ib()  # flow type (from flow_partition)
    time: str = attr.ib()  # time key (from time_partition)
    # bundle IDs this edge represents (for titles/provenance)
    bundle_ids: List[BundleID] = attr.ib()

    def to_json(self) -> dict:
        """Return every field of the edge as a JSON-serializable dict.

        The ``bundle_ids`` list is passed through as-is (not copied).
        """
        field_names = ("source", "target", "type", "time", "bundle_ids")
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_json(cls, data: dict) -> "EdgeSpec":
        """Build an EdgeSpec from a dict shaped like ``to_json`` output.

        ``source`` and ``target`` may be absent and then become ``None``;
        ``type``, ``time`` and ``bundle_ids`` are required and raise
        ``KeyError`` when missing.
        """
        source = data.get("source")
        target = data.get("target")
        flow_type = data["type"]
        time_key = data["time"]
        bundle_ids = data["bundle_ids"]
        return cls(
            source=source,
            target=target,
            type=flow_type,
            time=time_key,
            bundle_ids=bundle_ids,
        )
@attr.s(frozen=True)
class CategoricalColorSpec:
    """Immutable categorical color scale: a value-to-color lookup table."""

    attribute: str = attr.ib()  # 'type', 'source', 'target', or measure name
    lookup: Dict[str, str] = attr.ib()  # value -> hex color
    default: str = attr.ib()  # fallback color for unknown values
    type: str = attr.ib(default="categorical")

    def to_json(self) -> dict:
        """Return the scale as a JSON-serializable dict.

        The ``attribute`` field is written under the shorter key ``"attr"``.
        """
        payload = {"type": self.type}
        payload["attr"] = self.attribute
        payload["lookup"] = self.lookup
        payload["default"] = self.default
        return payload

    @classmethod
    def from_json(cls, data: dict) -> "CategoricalColorSpec":
        """Build a CategoricalColorSpec from a dict shaped like ``to_json`` output.

        ``type`` defaults to ``"categorical"`` when absent; ``attr``,
        ``lookup`` and ``default`` are required and raise ``KeyError`` when
        missing.
        """
        scale_type = data.get("type", "categorical")
        attribute = data["attr"]
        lookup = data["lookup"]
        default = data["default"]
        return cls(
            type=scale_type,
            attribute=attribute,
            lookup=lookup,
            default=default,
        )
@attr.s(frozen=True)
class QuantitativeColorSpec:
    """Immutable quantitative color scale: a palette applied over a domain."""

    attribute: str = attr.ib()  # measure name
    palette: List[str] = attr.ib()  # array of hex colors for interpolation
    domain: Tuple[float, float] = attr.ib()  # [min, max] for normalization
    type: str = attr.ib(default="quantitative")
    # optional measure to normalize by
    intensity: Optional[str] = attr.ib(default=None)

    def to_json(self) -> dict:
        """Return the scale as a JSON-serializable dict.

        The ``attribute`` field is written under the key ``"attr"`` and the
        ``domain`` tuple is emitted as a list.
        """
        domain_as_list = list(self.domain)
        return dict(
            type=self.type,
            attr=self.attribute,
            palette=self.palette,
            domain=domain_as_list,
            intensity=self.intensity,
        )

    @classmethod
    def from_json(cls, data: dict) -> "QuantitativeColorSpec":
        """Build a QuantitativeColorSpec from a dict shaped like ``to_json`` output.

        ``type`` defaults to ``"quantitative"`` and ``intensity`` to ``None``
        when absent; ``attr``, ``palette`` and ``domain`` are required and
        raise ``KeyError`` when missing. ``domain`` is converted to a tuple.
        """
        fields = {
            "type": data.get("type", "quantitative"),
            "attribute": data["attr"],
            "palette": data["palette"],
            "domain": tuple(data["domain"]),
            "intensity": data.get("intensity"),
        }
        return cls(**fields)
# Either kind of color-scale spec (categorical or quantitative).
ColorSpec = Union[
    CategoricalColorSpec,
    QuantitativeColorSpec,
]
@attr.s(frozen=True)
class DisplaySpec:
    """Immutable display settings: link width measure and link color scale."""

    link_width: str = attr.ib()  # measure name
    link_color: ColorSpec = attr.ib()

    def to_json(self) -> dict:
        """Return the settings as a dict, serializing the color spec too."""
        color_json = self.link_color.to_json()
        return dict(link_width=self.link_width, link_color=color_json)

    @classmethod
    def from_json(cls, data: dict) -> "DisplaySpec":
        """Build a DisplaySpec from a dict shaped like ``to_json`` output.

        The nested ``link_color`` dict is parsed as a quantitative scale when
        its ``"type"`` is ``"quantitative"``; any other (or missing) type is
        parsed as a categorical scale.
        """
        color_data = data["link_color"]
        is_quantitative = color_data.get("type") == "quantitative"
        color_cls = QuantitativeColorSpec if is_quantitative else CategoricalColorSpec
        # Parse the color spec before reading "link_width" so errors surface
        # in the same order as the fields are consumed.
        link_color = color_cls.from_json(color_data)
        return cls(link_width=data["link_width"], link_color=link_color)
@attr.s(frozen=True)
class WeaverSpec:
    """Complete specification for a compiled Sankey diagram.

    This is the top-level spec that can be serialized to JSON and executed
    against flow data to produce a SankeyData object.
    """

    version: str = attr.ib()
    nodes: Dict[str, NodeSpec] = attr.ib()
    groups: List[GroupSpec] = attr.ib()  # ProcessGroup provenance for nodes
    bundles: List[BundleSpec] = attr.ib()  # Bundle provenance for edges
    ordering: List[List[List[str]]] = attr.ib()
    # includes elsewhere edges (source=None or target=None)
    edges: List[EdgeSpec] = attr.ib()
    measures: List[MeasureSpec] = attr.ib()
    display: DisplaySpec = attr.ib()
    # decision tree for flow routing; to_json tolerates None here
    routing_tree: Node[tuple[int, ...]] = attr.ib()

    def to_json(self) -> dict:
        """Convert to a JSON-serializable dict.

        Nested specs are serialized through their own ``to_json`` methods.
        The ``"routing_tree"`` key is only written when ``routing_tree`` is
        not ``None``.
        """
        result = {
            "version": self.version,
            "nodes": {k: v.to_json() for k, v in self.nodes.items()},
            "groups": [g.to_json() for g in self.groups],
            "bundles": [b.to_json() for b in self.bundles],
            "ordering": self.ordering,
            "edges": [e.to_json() for e in self.edges],
            "measures": [m.to_json() for m in self.measures],
            "display": self.display.to_json(),
        }
        if self.routing_tree is not None:
            result["routing_tree"] = tree_to_dict(self.routing_tree)
        return result

    @classmethod
    def from_json(cls, data: dict) -> "WeaverSpec":
        """Create a WeaverSpec from a dict shaped like ``to_json`` output.

        ``"routing_tree"`` is optional, mirroring ``to_json`` which omits the
        key when the tree is ``None``: a missing or null entry yields
        ``routing_tree=None`` instead of raising ``KeyError``. All other keys
        are required and raise ``KeyError`` when missing.
        """
        # to_json may legitimately leave this key out, so do not index it.
        routing_tree_data = data.get("routing_tree")
        return cls(
            version=data["version"],
            nodes={k: NodeSpec.from_json(v) for k, v in data["nodes"].items()},
            groups=[GroupSpec.from_json(g) for g in data["groups"]],
            bundles=[BundleSpec.from_json(b) for b in data["bundles"]],
            ordering=data["ordering"],
            edges=[EdgeSpec.from_json(e) for e in data["edges"]],
            measures=[MeasureSpec.from_json(m) for m in data["measures"]],
            display=DisplaySpec.from_json(data["display"]),
            routing_tree=(
                tree_from_dict(routing_tree_data)
                if routing_tree_data is not None
                else None
            ),
        )