Skip to content

Commit 8de96b0

Browse files
committed
use new synapseclient methods
1 parent 7168053 commit 8de96b0

File tree

1 file changed

+79
-4
lines changed

1 file changed

+79
-4
lines changed

dags/synapse_minimal_jsonld_dag.py

Lines changed: 79 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,9 +4,12 @@
44
from datetime import datetime
55
from synapseclient.models import query
66
from synapseclient import Entity, Synapse
7+
from synapseclient.models import Table
78
from airflow.models.param import Param
89
from orca.services.synapse import SynapseHook
910
import pandas as pd
11+
from pandas import DataFrame
12+
from urllib.parse import quote_plus
1013
import json
1114
import os
1215
from io import BytesIO
@@ -33,6 +36,7 @@
3336
"synapse_conn_id": Param("SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN", type="string"),
3437
"push_results_to_s3": Param(True, type="boolean"),
3538
"aws_conn_id": Param("AWS_SYNAPSE_CROISSANT_METADATA_S3_CONN", type="string"),
39+
"push_links_to_synapse": Param(True, type="boolean"),
3640
}
3741

3842
dag_config = {
@@ -56,6 +60,7 @@
5660
MY_SERVICE_NAME = "airflow-synapse-dataset-to-minimal-croissant"
5761
MY_DEPLOYMENT_ENVIRONMENT = "prod"
5862
MY_SERVICE_VERSION = "1.0.0"
63+
SYNAPSE_TABLE_FOR_CROISSANT_LINKS = "syn72041138"
5964

6065
def set_up_tracing() -> Tracer:
6166
"""
@@ -245,11 +250,77 @@ def create_syn_client() -> Synapse:
245250
def save_minimal_jsonld_to_s3() -> None:
246251
"""Execute a query on Snowflake and report the results to a Synapse table."""
247252

253+
def execute_push_to_synapse(push_to_synapse: bool, dataset: Entity, dataset_id: str, s3_url: str, **context) -> None:
    """
    Handle the push to Synapse of the croissant file link.

    An authenticated Synapse client first queries the tracking table to
    determine whether an update is needed. If a row for the dataset already
    exists with the expected S3 URL, the update is skipped. If the row does
    not exist, or its S3 URL differs, the row is created or updated with the
    new URL.

    Arguments:
        push_to_synapse: A boolean to indicate if the results should be pushed
            to Synapse. When set to `False`, the link is only written to the
            logs and nothing is stored.
        dataset: The dataset entity whose link is being recorded.
        dataset_id: The Synapse ID of the dataset (used as the table key).
        s3_url: The S3 URL to use for the value of the cell in the table.
        context: The Airflow context of the DAG run; must provide
            `params["synapse_conn_id"]`.

    Returns:
        None

    Raises:
        Exception: Any failure is logged, telemetry is flushed, and the
            original exception is re-raised so the task is marked failed.
    """
    try:
        if not push_to_synapse:
            otel_logger.info(
                f"Croissant file link for [dataset: {dataset.name}, id: {dataset_id}]: {s3_url}")
            return

        otel_logger.info(
            f"Uploading croissant file link to Synapse table {SYNAPSE_TABLE_FOR_CROISSANT_LINKS}"
        )

        # Warning: Using an authenticated Synapse Client during this section of code
        syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
        authenticated_syn_client: Synapse = syn_hook.client
        authenticated_syn_client._rest_call = MethodType(
            _rest_call_replacement, authenticated_syn_client)

        # NOTE(review): dataset_id is interpolated into the query string.
        # IDs originate from a Synapse table, but escaping single quotes
        # guards against a malformed or injected value.
        safe_dataset_id = dataset_id.replace("'", "''")
        existing_row_df = query(
            query=(
                f"SELECT * FROM {SYNAPSE_TABLE_FOR_CROISSANT_LINKS} "
                f"WHERE dataset = '{safe_dataset_id}'"
            ),
            synapse_client=authenticated_syn_client,
        )

        if (not existing_row_df.empty
                and existing_row_df["minimal_croissant_file_s3_object"].values[0] == s3_url):
            otel_logger.info(
                f"Croissant file link already exists in Synapse table {SYNAPSE_TABLE_FOR_CROISSANT_LINKS}. Skipping.")
            return

        if existing_row_df.empty:
            # No row for this dataset yet: insert a fresh one.
            otel_logger.info("Creating new row in Synapse table")
            new_row_df = DataFrame(
                data={
                    "dataset": [dataset_id],
                    "minimal_croissant_file_s3_object": [s3_url],
                }
            )
            Table(id=SYNAPSE_TABLE_FOR_CROISSANT_LINKS).store_rows(values=new_row_df)
        else:
            # Row exists with a stale URL: update the queried row in place so
            # the existing row is overwritten rather than duplicated.
            otel_logger.info("Updating existing row in Synapse table")
            existing_row_df["minimal_croissant_file_s3_object"] = [s3_url]
            Table(id=SYNAPSE_TABLE_FOR_CROISSANT_LINKS).store_rows(values=existing_row_df)

    except Exception:
        otel_logger.exception("Failed to push croissant file link to Synapse.")
        otel_tracer.span_processor.force_flush()
        otel_logger.handlers[0].flush()
        # Bare raise preserves the original traceback for the Airflow log.
        raise
319+
248320
@task
249-
def create_and_push_to_s3(**context) -> None:
321+
def create_and_save_jsonld(**context) -> None:
250322
"""
251-
Query the dataset_collection to get the IDs for the dataset we are going to
252-
be running this process for.
323+
Create and save the minimal croissant JSON-LD files to S3 and links to a synapse table for all datasets in the home page of data catalog.
253324
254325
Arguments:
255326
dataset_collection: The dataset collection to query for datasets.
@@ -263,6 +334,7 @@ def create_and_push_to_s3(**context) -> None:
263334
table = query(f"select * from {SYNAPSE_DATA_CATALOG}", synapse_client=syn_client)
264335

265336
push_to_s3 = context["params"]["push_results_to_s3"]
337+
push_to_synapse = context["params"]["push_links_to_synapse"]
266338

267339

268340
for index, row in table.iterrows():
@@ -292,9 +364,12 @@ def create_and_push_to_s3(**context) -> None:
292364

293365
s3_key = f"{dataset_name}_{dataset_id}.minimal_croissant.jsonld"
294366
execute_push_to_s3(dataset=dataset,dataset_id=row["id"],s3_key=s3_key, croissant_file=minimal_croissant_file, push_to_s3=push_to_s3, **context)
367+
368+
s3_url = f"https://{BUCKET_NAME}.s3.us-east-1.amazonaws.com/{quote_plus(s3_key)}"
369+
execute_push_to_synapse(push_to_synapse=push_to_synapse, dataset=dataset, dataset_id=dataset_id, s3_url=s3_url, **context)
295370

296371

297-
create_and_push_to_s3()
372+
create_and_save_jsonld()
298373

299374

300375
save_minimal_jsonld_to_s3()

0 commit comments

Comments
 (0)