
Commit 4db426f

add dag for creating minimal jsonld
1 parent 4d61289 commit 4db426f

File tree

1 file changed: +300 -0 lines changed


dags/synapse_minimal_jsonld_dag.py

Lines changed: 300 additions & 0 deletions
@@ -0,0 +1,300 @@
from airflow.decorators import dag, task
from datetime import datetime
from synapseclient.models import query
from synapseclient import Entity, Synapse
from airflow.models.param import Param
from orca.services.synapse import SynapseHook
import pandas as pd
import json
import os
from io import BytesIO
from typing import Any, Dict
from types import MethodType
from airflow.models import Variable
from logging import NOTSET, Logger, getLogger
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import (
    DEPLOYMENT_ENVIRONMENT,
    SERVICE_NAME,
    SERVICE_VERSION,
    Resource,
)
from opentelemetry.sdk.trace import Tracer, TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from synapseclient.core.retry import with_retry

dag_params = {
    "synapse_conn_id": Param("SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN", type="string"),
    "push_results_to_s3": Param(True, type="boolean"),
    "aws_conn_id": Param("AWS_SYNAPSE_CROISSANT_METADATA_S3_CONN", type="string"),
}

dag_config = {
    "schedule_interval": "0 0 * * 1",
    "start_date": datetime(2025, 2, 1),
    "catchup": False,
    "default_args": {
        "retries": 1,
    },
    "params": dag_params,
}
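
# A possible ad hoc trigger with the S3 upload disabled, so each JSON-LD document
# is only written to the task logs. This is a sketch that assumes the standard
# Airflow CLI is available; it is not referenced elsewhere in this DAG:
#
#   airflow dags trigger save_minimal_jsonld_to_s3 --conf '{"push_results_to_s3": false}'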

SYNAPSE_DATA_CATALOG = "syn61609402"
# AWS-related constants
REGION_NAME = "us-east-1"
BUCKET_NAME = "synapse-croissant-metadata-minimal"
# OpenTelemetry-related constants.
# Used to set `deployment.environment` in the telemetry data. Since tracing and
# logging are set up outside of the DAG, the deployment environment is set here.
MY_SERVICE_NAME = "airflow-synapse-dataset-to-minimal-croissant"
MY_DEPLOYMENT_ENVIRONMENT = "prod"
MY_SERVICE_VERSION = "1.0.0"

def set_up_tracing() -> Tracer:
    """
    Set up the OpenTelemetry tracing library to export telemetry data via a
    SimpleSpanProcessor.

    The following environment variables are used to configure the service:

    - SERVICE_NAME: The name of the service.
    - DEPLOYMENT_ENVIRONMENT: The environment in which the service is running.
    - MY_SERVICE_VERSION: The version of the service.

    These attributes are used by the OTLP exporter to tag the telemetry data. They
    should be set to something that uniquely identifies the code that is producing
    this data. Within the telemetry backend these attributes will be used to filter
    and group the data.
    """
    service_name = os.environ.get("SERVICE_NAME", MY_SERVICE_NAME)
    deployment_environment = os.environ.get(
        "DEPLOYMENT_ENVIRONMENT", MY_DEPLOYMENT_ENVIRONMENT
    )
    service_version = os.environ.get("MY_SERVICE_VERSION", MY_SERVICE_VERSION)

    trace.set_tracer_provider(
        TracerProvider(
            resource=Resource(
                attributes={
                    SERVICE_NAME: service_name,
                    SERVICE_VERSION: service_version,
                    DEPLOYMENT_ENVIRONMENT: deployment_environment,
                }
            )
        )
    )

    exporter = OTLPSpanExporter(
        endpoint="https://ingest.us.signoz.cloud:443/v1/traces",
        headers={"signoz-ingestion-key": Variable.get("SIGNOZ_INGESTION_KEY")},
    )
    trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(exporter))
    return trace.get_tracer(__name__)


def set_up_logging() -> Logger:
    """
    Set up the OpenTelemetry logging library to export telemetry data via a
    BatchLogRecordProcessor.

    The following environment variables are used to configure the service:

    - SERVICE_NAME: The name of the service.
    - DEPLOYMENT_ENVIRONMENT: The environment in which the service is running.
    - MY_SERVICE_VERSION: The version of the service.

    These attributes are used by the OTLP exporter to tag the telemetry data. They
    should be set to something that uniquely identifies the code that is producing
    this data. Within the telemetry backend these attributes will be used to filter
    and group the data.
    """
    service_name = os.environ.get("SERVICE_NAME", MY_SERVICE_NAME)
    deployment_environment = os.environ.get(
        "DEPLOYMENT_ENVIRONMENT", MY_DEPLOYMENT_ENVIRONMENT
    )
    service_version = os.environ.get("MY_SERVICE_VERSION", MY_SERVICE_VERSION)

    resource = Resource.create(
        {
            SERVICE_NAME: service_name,
            SERVICE_VERSION: service_version,
            DEPLOYMENT_ENVIRONMENT: deployment_environment,
        }
    )

    logger_provider = LoggerProvider(resource=resource)
    set_logger_provider(logger_provider=logger_provider)

    exporter = OTLPLogExporter(
        endpoint="https://ingest.us.signoz.cloud:443/v1/logs",
        headers={"signoz-ingestion-key": Variable.get("SIGNOZ_INGESTION_KEY")},
    )
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))

    handler = LoggingHandler(level=NOTSET, logger_provider=logger_provider)
    logger = getLogger()
    logger.addHandler(handler)
    return logger


otel_tracer = set_up_tracing()
otel_logger = set_up_logging()


def _rest_call_replacement(
    self,
    method,
    uri,
    data,
    endpoint,
    headers,
    retryPolicy,
    requests_session,
    **kwargs,
):
    """
    Drop-in replacement for the Synapse client's `_rest_call` method that adds an
    explicit 70-second timeout to each HTTP request. See the original `_rest_call`
    method in the Synapse client for more details.
    """
    self.logger.debug(f"Sending {method} request to {uri}")
    uri, headers = self._build_uri_and_headers(
        uri, endpoint=endpoint, headers=headers
    )

    retryPolicy = self._build_retry_policy(retryPolicy)
    requests_session = requests_session or self._requests_session

    auth = kwargs.pop("auth", self.credentials)
    requests_method_fn = getattr(requests_session, method)
    response = with_retry(
        lambda: requests_method_fn(
            uri,
            data=data,
            headers=headers,
            auth=auth,
            timeout=70,
            **kwargs,
        ),
        verbose=self.debug,
        **retryPolicy,
    )
    self._handle_synapse_http_error(response)
    return response


def execute_push_to_s3(
    dataset: Entity,
    dataset_id: str,
    s3_key: str,
    croissant_file: Dict[str, Any],
    push_to_s3: bool,
    **context,
) -> None:
    """
    Handle the push to S3 of the croissant file. This is done by using the S3Hook to
    upload the file to S3. The S3 bucket is stored in the `org-sagebase-dpe-prod` AWS
    account.

    Arguments:
        dataset: The dataset to push to S3.
        dataset_id: The ID of the dataset.
        s3_key: The S3 key to use to push the file to S3.
        croissant_file: The croissant file to push to S3.
        push_to_s3: A boolean to indicate if the results should be pushed to S3.
            When set to `False`, the results will be printed to the logs.
        context: The context of the DAG run.

    Returns:
        None
    """
    try:
        if not push_to_s3:
            otel_logger.info(
                f"Croissant file for [dataset: {dataset.name}, id: {dataset_id}]:\n{json.dumps(croissant_file)}"
            )
            return

        otel_logger.info(
            f"Uploading croissant file for [dataset: {dataset.name}, id: {dataset_id}]"
        )

        croissant_metadata_bytes = json.dumps(croissant_file).encode("utf-8")
        metadata_file = BytesIO(croissant_metadata_bytes)
        s3_hook = S3Hook(
            aws_conn_id=context["params"]["aws_conn_id"],
            region_name=REGION_NAME,
            extra_args={"ContentType": "application/ld+json"},
        )

        otel_logger.info(f"Uploading croissant file to S3: {s3_key}")
        s3_hook.load_file_obj(
            file_obj=metadata_file,
            key=s3_key,
            bucket_name=BUCKET_NAME,
            replace=True,
        )
    except Exception as ex:
        otel_logger.exception("Failed to push croissant file to S3.")
        # Flush telemetry before re-raising so the failure is not lost.
        otel_tracer.span_processor.force_flush()
        otel_logger.handlers[0].flush()
        raise ex


def create_syn_client() -> Synapse:
    """
    Create an anonymous Synapse client that can be used to query Synapse. The
    client's `_rest_call` method is replaced with `_rest_call_replacement` so that
    every HTTP request gets an explicit timeout.

    Returns:
        The Synapse client.
    """
    syn_client: Synapse = Synapse(skip_checks=True)
    syn_client._rest_call = MethodType(_rest_call_replacement, syn_client)
    assert syn_client.credentials is None, "Synapse client is not logged out"
    return syn_client


@dag(**dag_config)
def save_minimal_jsonld_to_s3() -> None:
    """Query the Synapse data catalog and push a minimal Croissant JSON-LD file for each dataset to S3."""

    @task
    def create_and_push_to_s3(**context) -> None:
        """
        Query the Synapse data catalog for the datasets this process runs over,
        build a minimal Croissant JSON-LD document for each dataset, and push each
        document to S3 (or only log it when `push_results_to_s3` is `False`).

        Arguments:
            context: The context of the DAG run.

        Returns:
            None
        """
        syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
        syn_client: Synapse = syn_hook.client

        table = query(f"select * from {SYNAPSE_DATA_CATALOG}", synapse_client=syn_client)

        push_to_s3 = context["params"]["push_results_to_s3"]

        for index, row in table.iterrows():
            if pd.isnull(row["id"]):
                # Skip rows without a Synapse ID.
                continue

            dataset_id = row["id"]
            dataset_name = row["name"]
            dataset = syn_client.get(dataset_id, downloadFile=False)

            link = f"https://www.synapse.org/#!Synapse:{row['id']}"

            minimal_croissant_file = {
                "@context": "https://schema.org/",
                "@type": "Dataset",
                "name": dataset_name,
                "description": "" if pd.isnull(row["description"]) else row["description"],
                "url": link,
                "identifier": dataset_id,
                "creator": {
                    "@type": "Organization",
                    "name": "Sage Bionetworks" if pd.isnull(row["contributors"]) else row["contributors"],
                },
                "license": "" if pd.isnull(row["license"]) else row["license"],
            }

            s3_key = f"{dataset_name}_{dataset_id}.minimal_croissant.jsonld"
            execute_push_to_s3(
                dataset=dataset,
                dataset_id=dataset_id,
                s3_key=s3_key,
                croissant_file=minimal_croissant_file,
                push_to_s3=push_to_s3,
                **context,
            )

    create_and_push_to_s3()


save_minimal_jsonld_to_s3()
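
A quick local smoke test of this DAG (a sketch under assumptions that are not part of the commit: Airflow >= 2.5 for `dag.test()`, the file importable as `dags.synapse_minimal_jsonld_dag`, and the `SIGNOZ_INGESTION_KEY` Variable plus the Synapse and AWS connections configured locally) could look like:

    # Build the DAG object and run it once in-process with the S3 upload disabled,
    # so each minimal JSON-LD document is only written to the task logs.
    from dags.synapse_minimal_jsonld_dag import save_minimal_jsonld_to_s3

    dag = save_minimal_jsonld_to_s3()
    dag.test(run_conf={"push_results_to_s3": False})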

0 commit comments
