Add replay graph core skeleton#150
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a set of deterministic graph utility functions for replay validation, including edge normalization, adjacency mapping, reachability analysis, and sequence ordering checks. The feedback identifies several logic and performance improvements: handling duplicate nodes in sequence ordering by using the first occurrence, removing redundant normalization calls when calculating node sets, optimizing reachability queries via optional pre-computed adjacency maps, and simplifying the return logic in the reachability helper.
| required_before: Iterable[Edge], | ||
| ) -> tuple[Edge, ...]: | ||
| """Return lexicographically sorted order violations.""" | ||
| positions = {node: index for index, node in enumerate(sequence)} |
There was a problem hiding this comment.
The current implementation overwrites the index for duplicate nodes in the sequence, effectively using the last occurrence. In replay sequences where events or states might repeat, this can lead to incorrect violation reports. For example, in a sequence ['A', 'B', 'A'], a requirement that A precedes B would be flagged as a violation because the last A (index 2) is after B (index 1). Using the first occurrence is generally more appropriate for causal ordering checks.
| positions = {node: index for index, node in enumerate(sequence)} | |
| positions: dict[str, int] = {} | |
| for index, node in enumerate(sequence): | |
| if node not in positions: | |
| positions[node] = index |
| original_nodes = set(nodes_from_edges(original)) | ||
| replay_nodes = set(nodes_from_edges(replay)) | ||
|
|
||
| missing_nodes = tuple(sorted(original_nodes - replay_nodes)) | ||
| added_nodes = tuple(sorted(replay_nodes - original_nodes)) |
There was a problem hiding this comment.
Calling nodes_from_edges here is redundant because it internally calls normalize_edges, which has already been performed on lines 29-30. Since original and replay are already normalized tuples of edges, you can extract the nodes more efficiently using a set comprehension.
| original_nodes = set(nodes_from_edges(original)) | |
| replay_nodes = set(nodes_from_edges(replay)) | |
| missing_nodes = tuple(sorted(original_nodes - replay_nodes)) | |
| added_nodes = tuple(sorted(replay_nodes - original_nodes)) | |
| original_nodes = {n for edge in original for n in edge} | |
| replay_nodes = {n for edge in replay for n in edge} | |
| missing_nodes = tuple(sorted(original_nodes - replay_nodes)) | |
| added_nodes = tuple(sorted(replay_nodes - original_nodes)) |
| def reachable_nodes(edges: Iterable[Edge], start: str) -> tuple[str, ...]: | ||
| """Return sorted reachable nodes from start. | ||
|
|
||
| The start node is excluded unless it is reachable through a cycle. | ||
| """ | ||
| adjacency = adjacency_map(edges) |
There was a problem hiding this comment.
Reconstructing the adjacency map on every call to reachable_nodes (and has_path) is inefficient, especially for multiple queries on the same graph. Consider allowing an optional pre-computed adjacency map to be passed in to avoid the
| def reachable_nodes(edges: Iterable[Edge], start: str) -> tuple[str, ...]: | |
| """Return sorted reachable nodes from start. | |
| The start node is excluded unless it is reachable through a cycle. | |
| """ | |
| adjacency = adjacency_map(edges) | |
| def reachable_nodes( | |
| edges: Iterable[Edge], | |
| start: str, | |
| adjacency: dict[str, tuple[str, ...]] | None = None, | |
| ) -> tuple[str, ...]: | |
| """Return sorted reachable nodes from start. | |
| The start node is excluded unless it is reachable through a cycle. | |
| """ | |
| if adjacency is None: | |
| adjacency = adjacency_map(edges) |
|
|
||
| if start in seen: | ||
| return tuple(sorted(seen)) | ||
| return tuple(sorted(node for node in seen if node != start)) |
There was a problem hiding this comment.
The conditional check node != start is redundant. If start is not in seen (no cycle), then the condition is always true for all elements in seen. If start is in seen (cycle), it is already handled by the if start in seen block on line 32. Simplifying this to return tuple(sorted(seen)) improves clarity.
| return tuple(sorted(node for node in seen if node != start)) | |
| return tuple(sorted(seen)) |
Motivation
Description
src/comptext_v7/graph/implementing topology, ordering, reachability, and deterministic edge-diff evidence with a small public export surface.src/comptext_v7/graph/__init__.py,src/comptext_v7/graph/topology.py,src/comptext_v7/graph/ordering.py,src/comptext_v7/graph/reachability.py,src/comptext_v7/graph/evidence.py,tests/test_replay_graph_core.py.normalize_edges,nodes_from_edges,adjacency_map(topology);find_order_violations(ordering);reachable_nodes,has_path(reachability);ReplayGraphDiffdataclass andcompare_edges(evidence); all outputs are immutable/tuple-based and deterministic.Testing
pytest tests/test_replay_graph_core.py -qand the module tests passed (10 passed).pytest tests/test_failure_taxonomy.py -q(4 passed) andpytest tests/test_fixture_manifest.py -q(8 passed).npm run checkwhich completed and ran the test suite; full test run reported247 passedacross the repository.Codex Task