Skip to content

Fix critical bug in delete_tree #835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions tiled/_tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..catalog.adapter import WouldDeleteData
from ..catalog.explain import record_explanations
from ..client import Context, from_context
from ..client.register import register
from ..client.xarray import write_xarray_dataset
from ..queries import Eq, Key
from ..server.app import build_app, build_app_from_config
Expand Down Expand Up @@ -324,6 +325,111 @@ def test_write_xarray_dataset(client):
dsc.read()


@pytest.mark.asyncio
async def test_delete_subtree(tmpdir):
# Do not use client fixture here.
# The Context must be opened inside the test or we run into
# event loop crossing issues with the Postgres test.
tree = in_memory(readable_storage=[str(tmpdir)])
with Context.from_app(build_app(tree)) as context:
client = from_context(context)

for i in range(1, 5):
with open(tmpdir / f"test_{i}.csv", "w") as file:
file.write(
"""a, b, c
1, 2, 3
4, 5, 6
"""
)
# a has children b1 and b2, which each contain arrays
a = client.create_container("a")
b1 = a.create_container("b1")
await register(b1, tmpdir / "test_1.csv")
await register(b1, tmpdir / "test_2.csv")
b2 = a.create_container("b2")
await register(b2, tmpdir / "test_3.csv")
await register(b2, tmpdir / "test_4.csv")

assert list(client) == ["a"]
assert list(client["a"]) == ["b1", "b2"]
assert list(client["a"]["b1"]) == ["test_1", "test_2"]
assert list(client["a"]["b2"]) == ["test_3", "test_4"]

nodes_before_delete = (await tree.context.execute("SELECT * from nodes")).all()
assert len(nodes_before_delete) == 7
data_sources_before_delete = (
await tree.context.execute("SELECT * from data_sources")
).all()
assert len(data_sources_before_delete) == 4
assets_before_delete = (
await tree.context.execute("SELECT * from assets")
).all()
assert len(assets_before_delete) == 4

# Delete all children of b1, but not b1 itself.
client["a"]["b1"].delete_tree()

assert list(client) == ["a"]
assert list(client["a"]) == ["b1", "b2"]
assert list(client["a"]["b1"]) == [] # children deleted
assert list(client["a"]["b2"]) == ["test_3", "test_4"] # not affected
nodes_after_delete = (await tree.context.execute("SELECT * from nodes")).all()
assert len(nodes_after_delete) == 5
data_sources_after_delete = (
await tree.context.execute("SELECT * from data_sources")
).all()
assert len(data_sources_after_delete) == 2
assets_after_delete = (await tree.context.execute("SELECT * from assets")).all()
assert len(assets_after_delete) == 2


@pytest.mark.asyncio
async def test_delete_asset_registered_twice(tmpdir):
# Do not use client fixture here.
# The Context must be opened inside the test or we run into
# event loop crossing issues with the Postgres test.
tree = in_memory(readable_storage=[str(tmpdir)])
with Context.from_app(build_app(tree)) as context:
client = from_context(context)

for i in range(1, 4):
with open(tmpdir / f"test_{i}.csv", "w") as file:
file.write(
"""a, b, c
1, 2, 3
4, 5, 6
"""
)
# a has children b1 and b2, which each contain arrays
a = client.create_container("a")
b1 = a.create_container("b1")
await register(b1, tmpdir / "test_1.csv")
await register(b1, tmpdir / "test_2.csv")
b2 = a.create_container("b2")
await register(b2, tmpdir / "test_1.csv")
await register(b2, tmpdir / "test_3.csv")

data_sources_before_delete = (
await tree.context.execute("SELECT * from data_sources")
).all()
assert len(data_sources_before_delete) == 4
assets_after_delete = (await tree.context.execute("SELECT * from assets")).all()
assert len(assets_after_delete) == 3 # shared by two data sources

b2.delete_tree()

data_sources_after_delete = (
await tree.context.execute("SELECT * from data_sources")
).all()
assert len(data_sources_after_delete) == 2
assets_after_delete = (await tree.context.execute("SELECT * from assets")).all()
assert len(assets_after_delete) == 2

client["a"]["b1"]["test_1"][:]
b2.delete_tree()


@pytest.mark.asyncio
async def test_delete_tree(tmpdir):
# Do not use client fixture here.
Expand Down
64 changes: 35 additions & 29 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,29 +888,34 @@ async def delete(self):

async def delete_tree(self, external_only=True):
"""
Delete a Node and of the Nodes beneath it in the tree.
Delete the Nodes beneath a Node in the tree.

That is, delete all Nodes that have this Node as an ancestor, any number
of "generators" up.

Any DataSources belonging to those Nodes and any Assets associated (only) with
those DataSources will also be deleted.
"""
conditions = []
segments = self.ancestors + [self.key]
for generation in range(len(segments)):
conditions.append(orm.Node.ancestors[generation] == segments[0])
condition = orm.Node.ancestors == segments
async with self.context.session() as db:
if external_only:
count_int_asset_statement = select(
func.count(orm.Asset.data_uri)
).filter(
orm.Asset.data_sources.any(
orm.DataSource.management != Management.external
count_int_asset_statement = (
select(func.count(orm.Asset.data_uri))
.select_from(orm.Asset)
.join(
orm.DataSourceAssetAssociation,
orm.DataSourceAssetAssociation.asset_id == orm.Asset.id,
)
.join(
orm.DataSource,
orm.DataSource.id
== orm.DataSourceAssetAssociation.data_source_id,
)
.join(orm.Node, orm.Node.id == orm.DataSource.node_id)
.filter(orm.DataSource.management != Management.external)
.filter(condition)
)
for condition in conditions:
count_int_asset_statement.filter(condition)
count_int_assets = (
await db.execute(count_int_asset_statement)
).scalar()
Expand All @@ -921,26 +926,27 @@ async def delete_tree(self, external_only=True):
"If you want to delete them, pass external_only=False."
)
else:
sel_int_asset_statement = select(
orm.Asset.data_uri, orm.Asset.is_directory
).filter(
orm.Asset.data_sources.any(
orm.DataSource.management != Management.external
)
)
for condition in conditions:
sel_int_asset_statement.filter(condition)
int_assets = (await db.execute(sel_int_asset_statement)).all()
for data_uri, is_directory in int_assets:
delete_asset(data_uri, is_directory)
raise NotImplementedError
# TODO Deal with Assets belonging to multiple DataSources.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time working through this, and it's quite tricky. Since this has no use cases for us yet, I propose to defer this (and open an issue) without letting it holdup the fix for the common case.

del_asset_statement = delete(orm.Asset)
for condition in conditions:
del_asset_statement.filter(condition)
asset_ids_to_delete = (
select(orm.Asset.id)
.select_from(orm.Asset) # Explicitly define the starting point
.join(
orm.DataSourceAssetAssociation,
orm.DataSourceAssetAssociation.asset_id == orm.Asset.id,
)
.join(
orm.DataSource,
orm.DataSource.id == orm.DataSourceAssetAssociation.data_source_id,
)
.join(orm.Node, orm.Node.id == orm.DataSource.node_id)
.filter(condition)
)
del_asset_statement = delete(orm.Asset).where(
orm.Asset.id.in_(asset_ids_to_delete)
)
await db.execute(del_asset_statement)
del_node_statement = delete(orm.Node)
for condition in conditions:
del_node_statement.filter(condition)
del_node_statement = delete(orm.Node).filter(condition)
result = await db.execute(del_node_statement)
await db.commit()
return result.rowcount
Expand Down
Loading