|
20 | 20 | from google.api_core.exceptions import BadRequest |
21 | 21 | from google.cloud import bigquery |
22 | 22 |
|
23 | | -from bigquery_etl.cli.utils import table_matches_patterns |
| 23 | +from bigquery_etl.cli.utils import ( |
| 24 | + get_glean_app_id_to_app_name_mapping, |
| 25 | + parallelism_option, |
| 26 | + project_id_option, |
| 27 | + table_matches_patterns, |
| 28 | +) |
24 | 29 | from bigquery_etl.config import ConfigLoader |
25 | 30 | from bigquery_etl.util.bigquery_id import sql_table_id |
26 | 31 | from bigquery_etl.util.client_queue import ClientQueue |
27 | 32 | from bigquery_etl.util.common import TempDatasetReference |
28 | 33 |
|
29 | | -from .cli.utils import parallelism_option, project_id_option |
30 | | - |
31 | 34 | QUERY_TEMPLATE = """ |
32 | 35 | WITH |
33 | 36 | -- Distinct document_ids and their minimum submission_timestamp today |
@@ -97,33 +100,43 @@ def _has_field_path(schema: List[bigquery.SchemaField], path: List[str]) -> bool |
97 | 100 | def _select_geo(live_table: str, client: bigquery.Client) -> str: |
98 | 101 | """Build a SELECT REPLACE clause that NULLs metadata.geo.* if applicable.""" |
99 | 102 | _, dataset_id, table_id = live_table.split(".") |
| 103 | + channel_to_app_name = get_glean_app_id_to_app_name_mapping() |
| 104 | + app_id = re.sub("_live$", "", dataset_id) |
| 105 | + |
| 106 | + excluded_apps = set(ConfigLoader.get("geo_deprecation", "skip_apps", fallback=[])) |
| 107 | + app_name = channel_to_app_name.get(app_id) |
| 108 | + if app_name in excluded_apps: |
| 109 | + return "" |
100 | 110 |
|
101 | 111 | excluded_tables = set( |
102 | 112 | ConfigLoader.get("geo_deprecation", "skip_tables", fallback=[]) |
103 | 113 | ) |
104 | 114 | if re.sub(r"_v\d+$", "", table_id) in excluded_tables: |
105 | 115 | return "" |
106 | 116 |
|
107 | | - app_id = dataset_id.removesuffix("_live") |
108 | | - included_apps = set( |
109 | | - ConfigLoader.get("geo_deprecation", "include_app_ids", fallback=[]) |
110 | | - ) |
111 | | - if app_id not in included_apps: |
112 | | - return "" |
113 | | - |
114 | 117 | table = client.get_table(live_table) |
115 | 118 |
|
| 119 | + # Only deprecate the geo fields for Glean apps. Legacy tables will be deprecated after the Glean migration.
| 120 | + if app_id not in channel_to_app_name.keys(): |
| 121 | + return "" |
| 122 | + |
| 123 | + # only glean tables have this label |
116 | 124 | include_client_id = table.labels.get("include_client_id") == "true" |
117 | 125 | if not include_client_id: |
118 | 126 | return "" |
119 | 127 |
|
120 | | - # Check schema to ensure geo fields exists |
| 128 | + # Check schema to ensure required fields exist
121 | 129 | schema = table.schema |
122 | | - required_fields = ("city", "subdivision1", "subdivision2") |
123 | | - has_required_fields = all( |
124 | | - _has_field_path(schema, ["metadata", "geo", field]) for field in required_fields |
| 130 | + has_client_id_field = _has_field_path(schema, ["client_info", "client_id"]) |
| 131 | + if not has_client_id_field: |
| 132 | + return "" |
| 133 | + |
| 134 | + required_geo_fields = ("city", "subdivision1", "subdivision2") |
| 135 | + has_required_geo_fields = all( |
| 136 | + _has_field_path(schema, ["metadata", "geo", field]) |
| 137 | + for field in required_geo_fields |
125 | 138 | ) |
126 | | - if not has_required_fields: |
| 139 | + if not has_required_geo_fields: |
127 | 140 | return "" |
128 | 141 |
|
129 | 142 | return """ |
|
0 commit comments