Skip to content

Commit 8819b96

Browse files
kik-kik and BenWu authored
fix(DENG-10248): update shredder to support usage reporting derived (#8589)
* feat: update shredder config to ensure usage_reporting derived datasets get shredded
* feat: add usage_reporting to shredder test_config.py
* feat: use correct source tracking for usage reporting and make sure client_id field does not exist in the usage reporting schema prior to usage_reporting being added as deletion source
* Add unit tests

---------

Co-authored-by: bwu <[email protected]>
1 parent 52ac2b9 commit 8819b96

File tree

2 files changed

+96
-10
lines changed

2 files changed

+96
-10
lines changed

bigquery_etl/shredder/config.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def fields(self) -> tuple[str, ...]:
9494
CLIENT_ID = "client_id"
9595
GLEAN_CLIENT_ID = "client_info.client_id"
9696
GLEAN_USAGE_PROFILE_ID = "metrics.uuid.usage_profile_id"
97+
USAGE_PROFILE_ID = "usage_profile_id"
9798
IMPRESSION_ID = "impression_id"
9899
USER_ID = "user_id"
99100
POCKET_ID = "pocket_id"
@@ -214,6 +215,7 @@ def fields(self) -> tuple[str, ...]:
214215

215216

216217
client_id_target = partial(DeleteTarget, field=CLIENT_ID)
218+
usage_profile_id_target = partial(DeleteTarget, field=USAGE_PROFILE_ID)
217219
glean_target = partial(DeleteTarget, field=GLEAN_CLIENT_ID)
218220
impression_id_target = partial(DeleteTarget, field=IMPRESSION_ID)
219221
fxa_user_id_target = partial(DeleteTarget, field=FXA_USER_ID)
@@ -906,7 +908,7 @@ def stable_tables_by_schema(schema_id):
906908
metric_type_field.name == "uuid"
907909
and any(
908910
[
909-
metric_field.name == "usage_profile_id"
911+
metric_field.name == USAGE_PROFILE_ID
910912
for metric_field in metric_type_field.fields
911913
]
912914
)
@@ -918,6 +920,12 @@ def stable_tables_by_schema(schema_id):
918920
qualified_table_id(table), GLEAN_USAGE_PROFILE_ID, project
919921
)
920922
usage_reporting_sources[table.dataset_id] += (source,)
923+
usage_reporting_sources[
924+
table.dataset_id.replace("stable", "derived")
925+
] += (source,)
926+
usage_reporting_sources[
927+
channel_to_app_name[table.dataset_id.replace("_stable", "")]
928+
] += (source,)
921929

922930
return {
923931
**{
@@ -979,6 +987,20 @@ def stable_tables_by_schema(schema_id):
979987
if table.table_id == "usage_reporting_v1"
980988
and table.dataset_id in usage_reporting_sources
981989
},
990+
**{
991+
# usage_reporting derived tables that contain usage_profile_id
992+
DeleteTarget(
993+
table=qualified_table_id(table),
994+
# field must be repeated for each deletion source
995+
field=(USAGE_PROFILE_ID,)
996+
* len(usage_reporting_sources[table.dataset_id]),
997+
): usage_reporting_sources[table.dataset_id]
998+
for table in glean_derived_tables
999+
if any(field.name == USAGE_PROFILE_ID for field in table.schema)
1000+
and all(field.name != CLIENT_ID for field in table.schema)
1001+
and not table.table_id.startswith(derived_source_prefix)
1002+
and qualified_table_id(table) not in skipped_tables
1003+
},
9821004
}
9831005

9841006

tests/shredder/test_config.py

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,23 @@ def list_datasets(self, project):
7676
]
7777

7878
def list_tables(self, dataset_ref):
79-
labels = {}
8079
if dataset_ref.dataset_id.endswith("stable"):
81-
table_ids = ["metrics_v1", "deletion_request_v1", "migration_v1"]
82-
labels["schema_id"] = "glean_ping_1"
80+
table_ids = [
81+
("metrics_v1", {"schema_id": "glean_ping_1"}),
82+
("deletion_request_v1", {"schema_id": "glean_ping_1"}),
83+
("migration_v1", {"schema_id": "glean_ping_1"}),
84+
("usage_reporting_v1", {"schema_id": "glean-min_ping_1"}),
85+
("usage_deletion_request_v1", {"schema_id": "glean-min_ping_1"}),
86+
]
8387
elif dataset_ref.dataset_id in {
8488
"org_mozilla_focus_derived",
8589
"org_mozilla_focus_beta_derived",
8690
}:
8791
table_ids = [
8892
"additional_deletion_requests_v1", # should be ignored
8993
"clients_daily_v1",
90-
"dau_v1", # aggregated, no client_id
94+
"dau_v1", # aggregated, no client_id,
95+
"usage_reporting_clients_daily_v1", # should use usage_deletion_request_v1
9196
]
9297
elif dataset_ref.dataset_id.endswith("derived"):
9398
table_ids = ["clients_daily_v1"]
@@ -97,9 +102,10 @@ def list_tables(self, dataset_ref):
97102
bigquery.table.TableListItem(
98103
{
99104
"tableReference": bigquery.TableReference(
100-
dataset_ref, table_id
105+
dataset_ref,
106+
table_id if isinstance(table_id, str) else table_id[0],
101107
).to_api_repr(),
102-
"labels": labels,
108+
"labels": {} if isinstance(table_id, str) else table_id[1],
103109
}
104110
)
105111
for table_id in table_ids
@@ -108,13 +114,29 @@ def list_tables(self, dataset_ref):
108114
def get_table(self, table_ref):
109115
table = bigquery.Table(table_ref)
110116
table._properties[table._PROPERTY_TO_API_FIELD["type"]] = "TABLE"
111-
if table.dataset_id.endswith("stable"):
117+
if table.table_id in {
118+
"usage_reporting_v1",
119+
"usage_deletion_request_v1",
120+
}:
121+
table.schema = [
122+
bigquery.SchemaField(
123+
"metrics",
124+
"RECORD",
125+
fields=[
126+
bigquery.SchemaField(
127+
"uuid",
128+
"RECORD",
129+
fields=[bigquery.SchemaField("usage_profile_id", "STRING")],
130+
),
131+
],
132+
)
133+
]
134+
elif table.dataset_id.endswith("stable"):
112135
table.schema = [
113136
bigquery.SchemaField(
114137
"client_info",
115138
"RECORD",
116-
"NULLABLE",
117-
[bigquery.SchemaField("client_id", "STRING")],
139+
fields=[bigquery.SchemaField("client_id", "STRING")],
118140
)
119141
]
120142
elif table.table_id in {
@@ -125,6 +147,10 @@ def get_table(self, table_ref):
125147
"focus_android",
126148
}:
127149
table.schema = [bigquery.SchemaField("client_id", "STRING")]
150+
elif table.table_id in {
151+
"usage_reporting_clients_daily_v1",
152+
}:
153+
table.schema = [bigquery.SchemaField("usage_profile_id", "STRING")]
128154
else:
129155
table.schema = [bigquery.SchemaField("document_id", "STRING")]
130156
return table
@@ -292,6 +318,44 @@ def test_glean_targets(mock_requests):
292318
),
293319
]
294320
},
321+
**{ # usage_reporting_v1
322+
DeleteTarget(
323+
table=f"{app_id}_stable.usage_reporting_v1",
324+
field=("metrics.uuid.usage_profile_id",),
325+
project="moz-fx-data-shared-prod",
326+
): {
327+
DeleteSource(
328+
table=f"{app_id}_stable.usage_deletion_request_v1",
329+
field="metrics.uuid.usage_profile_id",
330+
project="moz-fx-data-shared-prod",
331+
conditions=(),
332+
),
333+
}
334+
for app_id in [
335+
"org_mozilla_focus",
336+
"org_mozilla_focus_beta",
337+
"org_mozilla_firefox",
338+
"org_mozilla_firefox_beta",
339+
]
340+
},
341+
**{ # usage_reporting_clients_daily_v1
342+
DeleteTarget(
343+
table=f"{app_id}_derived.usage_reporting_clients_daily_v1",
344+
field=("usage_profile_id",),
345+
project="moz-fx-data-shared-prod",
346+
): {
347+
DeleteSource(
348+
table=f"{app_id}_stable.usage_deletion_request_v1",
349+
field="metrics.uuid.usage_profile_id",
350+
project="moz-fx-data-shared-prod",
351+
conditions=(),
352+
),
353+
}
354+
for app_id in [
355+
"org_mozilla_focus",
356+
"org_mozilla_focus_beta",
357+
]
358+
},
295359
}
296360

297361

0 commit comments

Comments
 (0)