import pandas as pd
import networkx as nx
from .partition import Partition
def leaves_below(tree, node):
    """Return the set of leaf nodes reachable from ``node`` in ``tree``.

    A leaf is a node with ``tree.out_degree(...) == 0``. The candidates are
    the successors discovered by ``nx.dfs_successors(tree, node)``, so
    ``node`` itself is never included, even when it has no successors.

    ``tree`` is presumably a networkx directed graph (it must support
    ``out_degree``); confirm against callers.
    """
    # One flat set comprehension instead of ``sum(lists, [])``: the latter
    # re-copies the growing list on every addition (quadratic).
    return {
        child
        for children in nx.dfs_successors(tree, node).values()
        for child in children
        if tree.out_degree(child) == 0
    }
class Resolver:
    """Name resolver handed to ``DataFrame.eval`` for selection strings.

    Looking up a name returns a column of ``df``:

    * with an empty/falsy ``column``, the name is used as the column label;
    * with a ``column`` prefix, ``"id"`` maps to ``column`` itself and any
      other name ``k`` maps to ``"<column>.<k>"``.
    """

    def __init__(self, df, column):
        self.df = df
        self.column = column

    def __iter__(self):
        # XXX hack: expose a single dummy key so the resolver looks non-empty
        # (non-zero len) without __getitem__ being called with integer indices.
        yield "keys"

    def __getitem__(self, k):
        prefix = self.column
        if prefix:
            label = prefix if k == "id" else "{}.{}".format(prefix, k)
        else:
            label = k
        return self.df[label]
def eval_selection(df, column, sel):
    """Return a boolean row mask of ``df`` for the selection ``sel``.

    * list/tuple: rows whose ``column`` value is one of the items.
    * str: the expression is evaluated with ``DataFrame.eval``, with names
      resolved by ``Resolver(df, column)`` and no local/global variables.

    Raises ``TypeError`` for any other selection type.
    """
    if isinstance(sel, str):
        return df.eval(
            sel,
            local_dict={},
            global_dict={},
            resolvers=(Resolver(df, column),),
        )
    if isinstance(sel, (list, tuple)):
        return df[column].isin(sel)
    raise TypeError("Unknown selection type: %s" % type(sel))
[docs]
class Dataset:
    """A flow table together with optional dimension tables.

    ``_flows`` holds the flows passed in, re-indexed with a fresh default
    index. ``_table`` is the same flows with each supplied dimension table
    joined in, its columns prefixed: ``source.``/``target.`` for processes
    (joined on the ``source``/``target`` columns), ``material.`` (on
    ``material``) and ``time.`` (on ``time``).
    """

    def __init__(self, flows, dim_process=None, dim_material=None, dim_time=None):
        # Dimension tables are joined on their index, so it must be unique.
        for label, dim in (
            ("dim_process", dim_process),
            ("dim_material", dim_material),
            ("dim_time", dim_time),
        ):
            if dim is not None and not dim.index.is_unique:
                raise ValueError(label + " index not unique")

        # Fixed bug: make sure flows index is unique
        flows = flows.reset_index(drop=True)
        self._flows = flows
        self._dim_process = dim_process
        self._dim_material = dim_material
        self._dim_time = dim_time

        table = flows
        if dim_process is not None:
            table = table.join(dim_process.add_prefix("source."), on="source")
            table = table.join(dim_process.add_prefix("target."), on="target")
        if dim_material is not None:
            table = table.join(dim_material.add_prefix("material."), on="material")
        if dim_time is not None:
            table = table.join(dim_time.add_prefix("time."), on="time")
        self._table = table

    def __repr__(self):
        pieces = ["flows=%r" % (self._flows,)]
        for label, dim in (
            ("dim_process", self._dim_process),
            ("dim_material", self._dim_material),
            ("dim_time", self._dim_time),
        ):
            if dim is not None:
                pieces.append("%s=%r" % (label, dim))
        return "Dataset(" + ", ".join(pieces) + ")"

    def partition(self, dimension, processes=None):
        """Partition of all values of `dimension` within `processes`"""
        rows = self._table
        # A falsy ``processes`` (None or empty) means "no process filter".
        if processes:
            touching = rows.source.isin(processes) | rows.target.isin(processes)
            rows = rows.loc[touching]
        return Partition.Simple(dimension, rows[dimension].unique())

    def apply_view(self, process_groups, bundles, flow_selection=None):
        """Assign this dataset's flows to ``bundles``; see ``_apply_view``."""
        return _apply_view(self, process_groups, bundles, flow_selection)

    def save(self, filename):
        """Write the flows and any dimension tables to an HDF5 store."""
        with pd.HDFStore(filename) as store:
            store["flows"] = self._flows
            for key, dim in (
                ("dim_process", self._dim_process),
                ("dim_material", self._dim_material),
                ("dim_time", self._dim_time),
            ):
                if dim is not None:
                    store[key] = dim

    @classmethod
    def from_hdf(cls, filename):
        """Load a dataset previously written by ``save``."""
        with pd.HDFStore(filename) as store:

            def optional(key):
                # Dimension tables are only stored when they were present.
                return store[key] if key in store else None

            return cls(
                store["flows"],
                optional("dim_process"),
                optional("dim_material"),
                optional("dim_time"),
            )

    @classmethod
    def from_csv(
        cls,
        flows_filename,
        dim_process_filename=None,
        dim_material_filename=None,
        dim_time_filename=None,
    ):
        """Build a dataset from CSV files.

        Dimension files are indexed by their ``id`` column; a ``None``
        filename means that dimension is absent.
        """

        def read_dimension(path):
            return None if path is None else pd.read_csv(path).set_index("id")

        flows = pd.read_csv(flows_filename)
        return cls(
            flows,
            read_dimension(dim_process_filename),
            read_dimension(dim_material_filename),
            read_dimension(dim_time_filename),
        )
def find_flows(flows, source_query, target_query, flow_query=None, ignore_edges=None):
    """Filter flows according to source_query, target_query, and flow_query.

    Parameters
    ----------
    flows : DataFrame
        Flow table with ``source`` and ``target`` columns, plus any columns
        the queries refer to.
    source_query, target_query : list, tuple, str or None
        Selections passed to ``eval_selection`` for the ``source`` and
        ``target`` columns. At most one may be None. A None side selects
        flows whose end on that side does *not* match the other query
        ("Elsewhere").
    flow_query : list, tuple, str or None
        Optional selection applied to the whole table first.
    ignore_edges : collection of index labels, optional
        Flow index labels to exclude. Only consulted when one query is None.
        Any value accepted by ``Index.isin`` works (set, list, Index, array).

    Returns
    -------
    (f, internal_source, internal_target)
        ``f``: flows matching both sides. ``internal_source``: flows whose
        source and target both match ``source_query`` (None if
        ``source_query`` is None). ``internal_target``: likewise for
        ``target_query``.

    Raises
    ------
    ValueError
        If both ``source_query`` and ``target_query`` are None.
    """
    if flow_query is not None:
        flows = flows[eval_selection(flows, "", flow_query)]

    if source_query is None and target_query is None:
        raise ValueError("source_query and target_query cannot both be None")

    def not_ignored():
        # Compare with None instead of relying on truthiness: ``x or []``
        # raises "truth value is ambiguous" for a pandas Index or numpy array.
        return ~flows.index.isin([] if ignore_edges is None else ignore_edges)

    if source_query is None:
        # From Elsewhere: target matches, source lies outside the target query.
        qt = eval_selection(flows, "target", target_query)
        qs = ~eval_selection(flows, "source", target_query) & not_ignored()
    elif target_query is None:
        # To Elsewhere: source matches, target lies outside the source query.
        qs = eval_selection(flows, "source", source_query)
        qt = ~eval_selection(flows, "target", source_query) & not_ignored()
    else:
        qs = eval_selection(flows, "source", source_query)
        qt = eval_selection(flows, "target", target_query)

    f = flows[qs & qt]

    if source_query is None:
        internal_source = None
    else:
        internal_source = flows[qs & eval_selection(flows, "target", source_query)]

    if target_query is None:
        internal_target = None
    else:
        internal_target = flows[qt & eval_selection(flows, "source", target_query)]

    return f, internal_source, internal_target
def _apply_view(dataset, process_groups, bundles, flow_selection):
# What we want to warn about is flows between process_groups in the view_graph; they
# are "used", since they appear in Elsewhere bundles, but the connection
# isn't visible.
used_edges = set()
used_internal = set()
used_process_groups = set()
bundle_flows = {}
table = dataset._table
if flow_selection:
table = table[eval_selection(table, "", flow_selection)]
for k, bundle in bundles.items():
if bundle.from_elsewhere or bundle.to_elsewhere:
continue # do these afterwards
source = process_groups[bundle.source]
target = process_groups[bundle.target]
flows, internal_source, internal_target = find_flows(
table, source.selection, target.selection, bundle.flow_selection
)
assert len(used_edges.intersection(flows.index.values)) == 0, "duplicate bundle"
bundle_flows[k] = flows
used_edges.update(flows.index.values)
used_process_groups.update(flows.source)
used_process_groups.update(flows.target)
# Also marked internal edges as "used"
used_internal.update(internal_source.index.values)
used_internal.update(internal_target.index.values)
for k, bundle in bundles.items():
if bundle.from_elsewhere and bundle.to_elsewhere:
raise ValueError("Cannot have flow from Elsewhere to Elsewhere")
elif bundle.from_elsewhere:
target = process_groups[bundle.target]
flows, _, _ = find_flows(
table, None, target.selection, bundle.flow_selection, used_edges
)
used_process_groups.add(bundle.target)
elif bundle.to_elsewhere:
source = process_groups[bundle.source]
flows, _, _ = find_flows(
table, source.selection, None, bundle.flow_selection, used_edges
)
used_process_groups.add(bundle.source)
else:
continue
bundle_flows[k] = flows
# XXX shouldn't this check processes in selections, not process groups?
# Check set of process_groups
relevant_flows = dataset._flows[
dataset._flows.source.isin(used_process_groups)
& dataset._flows.target.isin(used_process_groups)
]
unused_flows = relevant_flows[
~relevant_flows.index.isin(used_edges)
& ~relevant_flows.index.isin(used_internal)
]
return bundle_flows, unused_flows