
Commit f625172

add export proto to parquet example (#110)
* add export proto to parquet example
* address comments
* lower python to 3.8
* set up own poetry
* address comments
* modify dependencies
* update mypy to 0.981
* remove redundant comma
1 parent 4303a9b commit f625172

10 files changed: +1143 −555 lines

README.md (+1 −1)

```diff
@@ -52,6 +52,7 @@
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
+* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up a schedule workflow to process exported files on an hourly basis.
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
 * [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
@@ -68,7 +69,6 @@
 * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
 * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
 
-
 ## Test
 
 Running the tests requires `poe` to be installed.
```

cloud_export_to_parquet/README.md (new file, +23 lines)

# Cloud Export to Parquet sample

This sample workflow converts exported workflow history files from proto to Parquet format. The workflow runs on an hourly schedule.

Please make sure your Python version is 3.9 or above. For this sample, run:

```bash
poetry install --with cloud_export_to_parquet
```

Before you start, please modify the workflow input in `create_schedule.py` with your S3 bucket and namespace. Also make sure you have the right AWS permissions set up in your environment to allow this workflow to read from and write to your S3 bucket.

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

```bash
poetry run python run_worker.py
```

This will start the worker. Then, in another terminal, run the following to create the schedule:

```bash
poetry run python create_schedule.py
```

The workflow should convert the exported files in your input S3 bucket to Parquet files in your specified output location.
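
The only values you need to touch are the fields of `ProtoToParquetWorkflowInput` in `create_schedule.py`. A minimal sketch of what that edit looks like; the bucket names and namespace below are placeholders, not real resources:

```python
# Placeholder values for illustration only; substitute your own buckets and namespace.
from cloud_export_to_parquet.workflows import ProtoToParquetWorkflowInput

wf_input = ProtoToParquetWorkflowInput(
    num_delay_hour=2,                      # only read files at least 2 hours old
    export_s3_bucket="my-export-bucket",   # bucket the cloud export writes proto files to
    namespace="my-namespace.example",      # namespace, used as the export path prefix
    output_s3_bucket="my-parquet-bucket",  # bucket to receive the Parquet output
)
```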

cloud_export_to_parquet/__init__.py

Whitespace-only changes.
cloud_export_to_parquet/create_schedule.py (new file, +64 lines)

```python
import asyncio
import traceback
from datetime import datetime, timedelta

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleSpec,
    WorkflowFailureError,
)

from cloud_export_to_parquet.workflows import (
    ProtoToParquet,
    ProtoToParquetWorkflowInput,
)


async def main() -> None:
    """Create an hourly schedule that runs the proto-to-parquet workflow."""
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")
    # TODO: update s3_bucket and namespace to the actual use case
    wf_input = ProtoToParquetWorkflowInput(
        num_delay_hour=2,
        export_s3_bucket="test-input-bucket",
        namespace="test.namespace",
        output_s3_bucket="test-output-bucket",
    )

    # Alternative: start the workflow once instead of on a schedule
    # try:
    #     await client.start_workflow(
    #         ProtoToParquet.run,
    #         wf_input,
    #         id=f"proto-to-parquet-{datetime.now()}",
    #         task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
    #     )
    # except WorkflowFailureError:
    #     print("Got exception: ", traceback.format_exc())

    # Create the schedule
    try:
        await client.create_schedule(
            "hourly-proto-to-parquet-wf-schedule",
            Schedule(
                action=ScheduleActionStartWorkflow(
                    ProtoToParquet.run,
                    wf_input,
                    id=f"proto-to-parquet-{datetime.now()}",
                    task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
                ),
                spec=ScheduleSpec(
                    intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
                ),
            ),
        )
    except WorkflowFailureError:
        print("Got exception: ", traceback.format_exc())


if __name__ == "__main__":
    asyncio.run(main())
```
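
Note: once created, the schedule can also be managed from Python through a schedule handle. A minimal sketch, assuming the schedule ID above and a local server; `manage_schedule` is just an illustrative helper name, not part of this commit:

```python
import asyncio

from temporalio.client import Client


async def manage_schedule() -> None:
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle("hourly-proto-to-parquet-wf-schedule")

    # Kick off one run immediately instead of waiting for the next hour.
    await handle.trigger()

    # Pause the schedule, or delete it entirely when done experimenting.
    await handle.pause()
    # await handle.delete()


if __name__ == "__main__":
    asyncio.run(manage_schedule())
```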
cloud_export_to_parquet/data_trans_activities.py (new file, +123 lines)

```python
import json
import uuid
from dataclasses import dataclass
from typing import List

import boto3
import pandas as pd
import temporalio.api.export.v1 as export
from google.protobuf.json_format import MessageToJson
from temporalio import activity


@dataclass
class GetObjectKeysActivityInput:
    bucket: str
    path: str


@dataclass
class DataTransAndLandActivityInput:
    export_s3_bucket: str
    object_key: str
    output_s3_bucket: str
    write_path: str


@activity.defn
def get_object_keys(activity_input: GetObjectKeysActivityInput) -> List[str]:
    """List object keys under the given bucket and path."""
    object_keys = []
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(
        Bucket=activity_input.bucket, Prefix=activity_input.path
    )
    for obj in response.get("Contents", []):
        object_keys.append(obj["Key"])
    if len(object_keys) == 0:
        raise FileNotFoundError(
            f"No files found in {activity_input.bucket}/{activity_input.path}"
        )

    return object_keys


@activity.defn
def data_trans_and_land(activity_input: DataTransAndLandActivityInput) -> str:
    """Convert a proto export file to Parquet and save it to S3."""
    key = activity_input.object_key
    data = get_data_from_object_key(activity_input.export_s3_bucket, key)
    activity.logger.info("Convert proto to parquet for file: %s", key)
    parquet_data = convert_proto_to_parquet_flatten(data)
    activity.logger.info("Finish transformation for file: %s", key)
    return save_to_sink(
        parquet_data, activity_input.output_s3_bucket, activity_input.write_path
    )


def get_data_from_object_key(
    bucket_name: str, object_key: str
) -> export.WorkflowExecutions:
    """Fetch an object by key and parse it as a WorkflowExecutions proto."""
    v = export.WorkflowExecutions()

    s3 = boto3.client("s3")
    try:
        data = s3.get_object(Bucket=bucket_name, Key=object_key)["Body"].read()
    except Exception as e:
        activity.logger.error(f"Error reading object: {e}")
        raise e
    v.ParseFromString(data)
    return v


def convert_proto_to_parquet_flatten(wfs: export.WorkflowExecutions) -> pd.DataFrame:
    """Convert the proto data to a flattened DataFrame ready for Parquet."""
    dfs = []
    for wf in wfs.items:
        start_attributes = wf.history.events[
            0
        ].workflow_execution_started_event_attributes
        histories = wf.history
        json_str = MessageToJson(histories)
        row = {
            "WorkflowID": start_attributes.workflow_id,
            "RunID": start_attributes.original_execution_run_id,
            "Histories": json.loads(json_str),
        }
        dfs.append(pd.DataFrame([row]))
    df = pd.concat(dfs, ignore_index=True)
    rows_flatten = []
    for _, row in df.iterrows():
        wf_histories_raw = row["Histories"]["events"]
        workflow_id = row["WorkflowID"]
        run_id = row["RunID"]
        for history_event in wf_histories_raw:
            row_flatten = pd.json_normalize(history_event, sep="_")
            skip_name = ["payloads", "."]
            columns_to_drop = [
                col for col in row_flatten.columns for skip in skip_name if skip in col
            ]
            row_flatten.drop(columns_to_drop, axis=1, inplace=True)
            row_flatten.insert(0, "WorkflowId", workflow_id)
            row_flatten.insert(1, "RunId", run_id)
            rows_flatten.append(row_flatten)
    df_flatten = pd.concat(rows_flatten, ignore_index=True)
    return df_flatten


def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
    """Write the DataFrame to the S3 bucket as a Snappy-compressed Parquet file."""
    write_bytes = data.to_parquet(None, compression="snappy", index=False)
    uuid_name = uuid.uuid1()
    file_name = f"{uuid_name}.parquet"
    activity.logger.info("Writing to S3 bucket: %s", file_name)

    s3 = boto3.client("s3")
    try:
        key = f"{write_path}/{file_name}"
        s3.put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
        return key
    except Exception as e:
        activity.logger.error(f"Error saving to sink: {e}")
        raise e
```
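
Note: because these activities are plain synchronous functions, they can be unit tested without a Temporal server by stubbing the boto3 client. A minimal sketch using `unittest.mock`; the test name and fake keys are illustrative, not part of this commit:

```python
from unittest import mock

from cloud_export_to_parquet.data_trans_activities import (
    GetObjectKeysActivityInput,
    get_object_keys,
)


def test_get_object_keys_returns_matching_keys() -> None:
    fake_s3 = mock.Mock()
    fake_s3.list_objects_v2.return_value = {
        "Contents": [{"Key": "export/file1.proto"}, {"Key": "export/file2.proto"}]
    }
    # Patch boto3.client as referenced inside the activities module.
    with mock.patch(
        "cloud_export_to_parquet.data_trans_activities.boto3.client",
        return_value=fake_s3,
    ):
        keys = get_object_keys(GetObjectKeysActivityInput(bucket="b", path="export/"))
    assert keys == ["export/file1.proto", "export/file2.proto"]
```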

cloud_export_to_parquet/run_worker.py (new file, +38 lines)

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)

from cloud_export_to_parquet.data_trans_activities import (
    data_trans_and_land,
    get_object_keys,
)
from cloud_export_to_parquet.workflows import ProtoToParquet


async def main() -> None:
    """Main worker function."""
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Run the worker
    worker: Worker = Worker(
        client,
        task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
        workflows=[ProtoToParquet],
        activities=[get_object_keys, data_trans_and_land],
        workflow_runner=SandboxedWorkflowRunner(
            restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3")
        ),
        # Synchronous activities need an executor to run on
        activity_executor=ThreadPoolExecutor(100),
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

cloud_export_to_parquet/workflows.py (new file, +74 lines)

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

with workflow.unsafe.imports_passed_through():
    from cloud_export_to_parquet.data_trans_activities import (
        DataTransAndLandActivityInput,
        data_trans_and_land,
        get_object_keys,
        GetObjectKeysActivityInput,
    )
    from dataclasses import dataclass


@dataclass
class ProtoToParquetWorkflowInput:
    num_delay_hour: int
    export_s3_bucket: str
    namespace: str
    output_s3_bucket: str


@workflow.defn
class ProtoToParquet:
    """Proto to parquet workflow."""

    @workflow.run
    async def run(self, workflow_input: ProtoToParquetWorkflowInput) -> str:
        """Run proto to parquet workflow."""
        retry_policy = RetryPolicy(
            maximum_attempts=10, maximum_interval=timedelta(seconds=5)
        )

        # Read from the export S3 bucket, giving at least a 2 hour delay to ensure the file has been uploaded
        read_time = workflow.now() - timedelta(hours=workflow_input.num_delay_hour)
        common_path = f"{workflow_input.namespace}/{read_time.year}/{read_time.month:02}/{read_time.day:02}/{read_time.hour:02}/00"
        path = f"temporal-workflow-history/export/{common_path}"
        get_object_keys_input = GetObjectKeysActivityInput(
            workflow_input.export_s3_bucket, path
        )

        # Read input file keys
        object_keys_output = await workflow.execute_activity(
            get_object_keys,
            get_object_keys_input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy,
        )

        write_path = f"temporal-workflow-history/parquet/{common_path}"

        try:
            # Could create a list of coroutine objects to process files in parallel
            for key in object_keys_output:
                data_trans_and_land_input = DataTransAndLandActivityInput(
                    workflow_input.export_s3_bucket,
                    key,
                    workflow_input.output_s3_bucket,
                    write_path,
                )
                # Convert proto to parquet and save to S3
                await workflow.execute_activity(
                    data_trans_and_land,
                    data_trans_and_land_input,
                    start_to_close_timeout=timedelta(minutes=15),
                    retry_policy=retry_policy,
                )
        except ActivityError as output_err:
            workflow.logger.error(f"Data transformation failed: {output_err}")
            raise output_err

        return write_path
```
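
Note: the workflow itself can be exercised end to end without S3 by registering stub activities in the SDK's time-skipping test environment. A minimal sketch, assuming an async test runner such as `pytest-asyncio`; the stub activities and test name are illustrative, not part of this commit:

```python
import uuid
from typing import List

from temporalio import activity
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from cloud_export_to_parquet.data_trans_activities import (
    DataTransAndLandActivityInput,
    GetObjectKeysActivityInput,
)
from cloud_export_to_parquet.workflows import ProtoToParquet, ProtoToParquetWorkflowInput


# Stub activities registered under the real activity names, so no S3 access is needed.
@activity.defn(name="get_object_keys")
async def get_object_keys_stub(_: GetObjectKeysActivityInput) -> List[str]:
    return ["fake-key.proto"]


@activity.defn(name="data_trans_and_land")
async def data_trans_and_land_stub(_: DataTransAndLandActivityInput) -> str:
    return "fake/write/path"


async def test_proto_to_parquet_workflow() -> None:
    async with await WorkflowEnvironment.start_time_skipping() as env:
        task_queue = f"tq-{uuid.uuid4()}"
        async with Worker(
            env.client,
            task_queue=task_queue,
            workflows=[ProtoToParquet],
            activities=[get_object_keys_stub, data_trans_and_land_stub],
        ):
            result = await env.client.execute_workflow(
                ProtoToParquet.run,
                ProtoToParquetWorkflowInput(
                    num_delay_hour=2,
                    export_s3_bucket="in-bucket",
                    namespace="test.namespace",
                    output_s3_bucket="out-bucket",
                ),
                id=f"wf-{uuid.uuid4()}",
                task_queue=task_queue,
            )
            # The workflow returns the Parquet write path it computed.
            assert result.startswith("temporal-workflow-history/parquet/")
```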
