Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion bigquery_etl/shredder/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def fields(self) -> tuple[str, ...]:
CLIENT_ID = "client_id"
GLEAN_CLIENT_ID = "client_info.client_id"
GLEAN_USAGE_PROFILE_ID = "metrics.uuid.usage_profile_id"
USAGE_PROFILE_ID = "usage_profile_id"
IMPRESSION_ID = "impression_id"
USER_ID = "user_id"
POCKET_ID = "pocket_id"
Expand Down Expand Up @@ -214,6 +215,7 @@ def fields(self) -> tuple[str, ...]:


client_id_target = partial(DeleteTarget, field=CLIENT_ID)
usage_profile_id_target = partial(DeleteTarget, field=USAGE_PROFILE_ID)
glean_target = partial(DeleteTarget, field=GLEAN_CLIENT_ID)
impression_id_target = partial(DeleteTarget, field=IMPRESSION_ID)
fxa_user_id_target = partial(DeleteTarget, field=FXA_USER_ID)
Expand Down Expand Up @@ -901,7 +903,7 @@ def stable_tables_by_schema(schema_id):
metric_type_field.name == "uuid"
and any(
[
metric_field.name == "usage_profile_id"
metric_field.name == USAGE_PROFILE_ID
for metric_field in metric_type_field.fields
]
)
Expand All @@ -913,6 +915,12 @@ def stable_tables_by_schema(schema_id):
qualified_table_id(table), GLEAN_USAGE_PROFILE_ID, project
)
usage_reporting_sources[table.dataset_id] += (source,)
usage_reporting_sources[
table.dataset_id.replace("stable", "derived")
] += (source,)
usage_reporting_sources[
channel_to_app_name[table.dataset_id.replace("_stable", "")]
] += (source,)

return {
**{
Expand Down Expand Up @@ -974,6 +982,20 @@ def stable_tables_by_schema(schema_id):
if table.table_id == "usage_reporting_v1"
and table.dataset_id in usage_reporting_sources
},
**{
# usage_reporting derived tables that contain usage_profile_id
DeleteTarget(
table=qualified_table_id(table),
# field must be repeated for each deletion source
field=(USAGE_PROFILE_ID,)
* len(usage_reporting_sources[table.dataset_id]),
): usage_reporting_sources[table.dataset_id]
for table in glean_derived_tables
if any(field.name == USAGE_PROFILE_ID for field in table.schema)
and all(field.name != CLIENT_ID for field in table.schema)
and not table.table_id.startswith(derived_source_prefix)
and qualified_table_id(table) not in skipped_tables
},
}


Expand Down
82 changes: 73 additions & 9 deletions tests/shredder/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,23 @@ def list_datasets(self, project):
]

def list_tables(self, dataset_ref):
labels = {}
if dataset_ref.dataset_id.endswith("stable"):
table_ids = ["metrics_v1", "deletion_request_v1", "migration_v1"]
labels["schema_id"] = "glean_ping_1"
table_ids = [
("metrics_v1", {"schema_id": "glean_ping_1"}),
("deletion_request_v1", {"schema_id": "glean_ping_1"}),
("migration_v1", {"schema_id": "glean_ping_1"}),
("usage_reporting_v1", {"schema_id": "glean-min_ping_1"}),
("usage_deletion_request_v1", {"schema_id": "glean-min_ping_1"}),
]
elif dataset_ref.dataset_id in {
"org_mozilla_focus_derived",
"org_mozilla_focus_beta_derived",
}:
table_ids = [
"additional_deletion_requests_v1", # should be ignored
"clients_daily_v1",
"dau_v1", # aggregated, no client_id
"dau_v1", # aggregated, no client_id,
"usage_reporting_clients_daily_v1", # should use usage_deletion_request_v1
]
elif dataset_ref.dataset_id.endswith("derived"):
table_ids = ["clients_daily_v1"]
Expand All @@ -97,9 +102,10 @@ def list_tables(self, dataset_ref):
bigquery.table.TableListItem(
{
"tableReference": bigquery.TableReference(
dataset_ref, table_id
dataset_ref,
table_id if isinstance(table_id, str) else table_id[0],
).to_api_repr(),
"labels": labels,
"labels": {} if isinstance(table_id, str) else table_id[1],
}
)
for table_id in table_ids
Expand All @@ -108,13 +114,29 @@ def list_tables(self, dataset_ref):
def get_table(self, table_ref):
table = bigquery.Table(table_ref)
table._properties[table._PROPERTY_TO_API_FIELD["type"]] = "TABLE"
if table.dataset_id.endswith("stable"):
if table.table_id in {
"usage_reporting_v1",
"usage_deletion_request_v1",
}:
table.schema = [
bigquery.SchemaField(
"metrics",
"RECORD",
fields=[
bigquery.SchemaField(
"uuid",
"RECORD",
fields=[bigquery.SchemaField("usage_profile_id", "STRING")],
),
],
)
]
elif table.dataset_id.endswith("stable"):
table.schema = [
bigquery.SchemaField(
"client_info",
"RECORD",
"NULLABLE",
[bigquery.SchemaField("client_id", "STRING")],
fields=[bigquery.SchemaField("client_id", "STRING")],
)
]
elif table.table_id in {
Expand All @@ -125,6 +147,10 @@ def get_table(self, table_ref):
"focus_android",
}:
table.schema = [bigquery.SchemaField("client_id", "STRING")]
elif table.table_id in {
"usage_reporting_clients_daily_v1",
}:
table.schema = [bigquery.SchemaField("usage_profile_id", "STRING")]
else:
table.schema = [bigquery.SchemaField("document_id", "STRING")]
return table
Expand Down Expand Up @@ -292,6 +318,44 @@ def test_glean_targets(mock_requests):
),
]
},
**{ # usage_reporting_v1
DeleteTarget(
table=f"{app_id}_stable.usage_reporting_v1",
field=("metrics.uuid.usage_profile_id",),
project="moz-fx-data-shared-prod",
): {
DeleteSource(
table=f"{app_id}_stable.usage_deletion_request_v1",
field="metrics.uuid.usage_profile_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for app_id in [
"org_mozilla_focus",
"org_mozilla_focus_beta",
"org_mozilla_firefox",
"org_mozilla_firefox_beta",
]
},
**{ # usage_reporting_clients_daily_v1
DeleteTarget(
table=f"{app_id}_derived.usage_reporting_clients_daily_v1",
field=("usage_profile_id",),
project="moz-fx-data-shared-prod",
): {
DeleteSource(
table=f"{app_id}_stable.usage_deletion_request_v1",
field="metrics.uuid.usage_profile_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for app_id in [
"org_mozilla_focus",
"org_mozilla_focus_beta",
]
},
}


Expand Down