Skip to content

Commit ab05573

Browse files
committed
fix(migration): erase corrupt ids from failed edits
1 parent 8b92f00 commit ab05573

File tree

3 files changed

+71
-12
lines changed

3 files changed

+71
-12
lines changed

pychunkedgraph/ingest/upgrade/atomic_layer.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
1212
from pychunkedgraph.graph.utils import serializers
1313

14-
from .utils import get_end_timestamps, get_parent_timestamps
14+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
1515

1616
CHILDREN = {}
1717

@@ -136,16 +136,21 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
136136
nodes = []
137137
nodes_ts = []
138138
earliest_ts = cg.get_earliest_timestamp()
139+
corrupt_nodes = []
140+
corrupt_nodes_ts = []
139141
for k, v in rr.items():
140142
try:
141-
_ = v[Hierarchy.Parent]
142-
nodes.append(k)
143143
CHILDREN[k] = v[Hierarchy.Child][0].value
144144
ts = v[Hierarchy.Child][0].timestamp
145+
_ = v[Hierarchy.Parent]
146+
nodes.append(k)
145147
nodes_ts.append(earliest_ts if ts < earliest_ts else ts)
146148
except KeyError:
147-
# invalid nodes from failed tasks w/o parent column entry
148-
continue
149+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
150+
# retain invalid nodes from edits to fix the hierarchy
151+
if ts > earliest_ts:
152+
corrupt_nodes.append(k)
153+
corrupt_nodes_ts.append(ts)
149154

150155
if len(nodes) > 0:
151156
logging.info(f"processing {len(nodes)} nodes.")
@@ -156,3 +161,7 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
156161
rows = update_nodes(cg, nodes, nodes_ts)
157162
cg.client.write(rows)
158163
logging.info(f"mutations: {len(rows)}, time: {time.time() - start}")
164+
165+
if len(corrupt_nodes) > 0:
166+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
167+
fix_corrupt_nodes(cg, corrupt_nodes, corrupt_nodes_ts, CHILDREN)

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from pychunkedgraph.graph.types import empty_2d
1818
from pychunkedgraph.utils.general import chunked
1919

20-
from .utils import get_end_timestamps, get_parent_timestamps
20+
from .utils import fix_corrupt_nodes, get_end_timestamps, get_parent_timestamps
2121

2222

2323
CHILDREN = {}
@@ -145,17 +145,30 @@ def _update_cross_edges_helper(args):
145145
parents = cg.get_parents(nodes, fail_to_zero=True)
146146

147147
tasks = []
148+
corrupt_nodes = []
149+
corrupt_nodes_ts = []
150+
earliest_ts = cg.get_earliest_timestamp()
148151
for node, parent, node_ts in zip(nodes, parents, nodes_ts):
149152
if parent == 0:
150-
# invalid id caused by failed ingest task / edits
151-
continue
152-
tasks.append((cg, layer, node, node_ts))
153+
# ignore invalid nodes from failed ingest tasks, w/o parent column entry
154+
# retain invalid nodes from edits to fix the hierarchy
155+
if node_ts > earliest_ts:
156+
corrupt_nodes.append(node)
157+
corrupt_nodes_ts.append(node_ts)
158+
else:
159+
tasks.append((cg, layer, node, node_ts))
153160

154161
with ThreadPoolExecutor(max_workers=4) as executor:
155-
futures = [executor.submit(_update_cross_edges_helper_thread, task) for task in tasks]
162+
futures = [
163+
executor.submit(_update_cross_edges_helper_thread, task) for task in tasks
164+
]
156165
for future in tqdm(as_completed(futures), total=len(futures)):
157166
rows.extend(future.result())
167+
158168
cg.client.write(rows)
169+
if len(corrupt_nodes) > 0:
170+
logging.info(f"found {len(corrupt_nodes)} corrupt nodes {corrupt_nodes[:3]}...")
171+
fix_corrupt_nodes(cg, corrupt_nodes, corrupt_nodes_ts, CHILDREN)
159172

160173

161174
def update_chunk(
@@ -164,7 +177,7 @@ def update_chunk(
164177
"""
165178
Iterate over all layer IDs in a chunk and update their cross chunk edges.
166179
"""
167-
debug = nodes is not None
180+
debug = nodes is not None
168181
start = time.time()
169182
x, y, z = chunk_coords
170183
chunk_id = cg.get_chunk_id(layer=layer, x=x, y=y, z=z)

pychunkedgraph/ingest/upgrade/utils.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# pylint: disable=invalid-name, missing-docstring
22

33
from collections import defaultdict
4-
from datetime import datetime, timezone
4+
from datetime import datetime, timedelta
55

66
import numpy as np
77
from pychunkedgraph.graph import ChunkedGraph
88
from pychunkedgraph.graph.attributes import Hierarchy
9+
from pychunkedgraph.graph.utils import serializers
10+
from google.cloud.bigtable.row_filters import TimestampRange
911

1012

1113
def exists_as_parent(cg: ChunkedGraph, parent, nodes) -> bool:
@@ -102,3 +104,38 @@ def get_parent_timestamps(
102104
ts = cell.timestamp
103105
result[k].add(earliest_ts if ts < earliest_ts else ts)
104106
return result
107+
108+
109+
def _fix_corrupt_node(cg: ChunkedGraph, node: int, children: np.ndarray):
    """
    Erase a corrupt node left behind by a failed edit.

    Removes this node from the parent column of its children,
    then removes the node itself, effectively erasing it.

    Args:
        cg: graph whose client is used to read the children's parent cells
            and whose underlying table receives the delete mutations.
        node: ID of the corrupt node to erase.
        children: IDs of the children of `node`.
    """
    table = cg.client._table
    batcher = table.mutations_batcher(flush_count=500)
    children_d = cg.client.read_nodes(node_ids=children, properties=Hierarchy.Parent)
    for child, parent_cells in children_d.items():
        row = table.direct_row(serializers.serialize_uint64(child))
        has_deletes = False
        for cell in parent_cells:
            if cell.value != node:
                # parent entries pointing at other nodes must be kept
                continue
            # 1 microsecond window starting at the cell's own timestamp,
            # intended to target only the version that points to `node`
            start = cell.timestamp
            end = start + timedelta(microseconds=1)
            row.delete_cell(
                column_family_id=Hierarchy.Parent.family_id,
                column=Hierarchy.Parent.key,
                time_range=TimestampRange(start=start, end=end),
            )
            has_deletes = True
        # queue each child row exactly once, and only if it carries a mutation;
        # avoids duplicate entries and entries without any mutation
        if has_deletes:
            batcher.mutate(row)

    # finally drop the corrupt node's own row
    row = table.direct_row(serializers.serialize_uint64(node))
    row.delete()
    batcher.mutate(row)
    batcher.flush()
135+
136+
def fix_corrupt_nodes(cg: ChunkedGraph, nodes: list, nodes_ts: list, children_d: dict):
137+
_children_d = {node: children_d[node] for node in nodes}
138+
end_timestamps = get_end_timestamps(cg, nodes, nodes_ts, _children_d, layer=2)
139+
for node, end_ts in zip(nodes, end_timestamps):
140+
assert end_ts is None, f"{node}: {end_ts}"
141+
_fix_corrupt_node(cg, node, _children_d[node])

0 commit comments

Comments
 (0)