@@ -3,7 +3,7 @@
 from dataclasses import dataclass
 from typing import List
 
-import aioboto3
+import boto3
 import pandas as pd
 import temporalio.api.export.v1 as export
 from google.protobuf.json_format import MessageToJson
@@ -25,55 +25,50 @@ class DataTransAndLandActivityInput:
 
 
 @activity.defn
-async def get_object_keys(activity_input: GetObjectKeysActivityInput) -> List[str]:
+def get_object_keys(activity_input: GetObjectKeysActivityInput) -> List[str]:
     """Function that list objects by key."""
-    session = aioboto3.Session()
     object_keys = []
-    async with session.client("s3") as s3:
-        response = await s3.list_objects_v2(
-            Bucket=activity_input.bucket, Prefix=activity_input.path
+    s3 = boto3.client("s3")
+    response = s3.list_objects_v2(
+        Bucket=activity_input.bucket, Prefix=activity_input.path
+    )
+    for obj in response.get("Contents", []):
+        object_keys.append(obj["Key"])
+    if len(object_keys) == 0:
+        raise FileNotFoundError(
+            f"No files found in {activity_input.bucket}/{activity_input.path}"
         )
 
-        for obj in response.get("Contents", []):
-            object_keys.append(obj["Key"])
-    if len(object_keys) == 0:
-        raise FileNotFoundError(
-            f"No files found in {activity_input.bucket}/{activity_input.path}"
-        )
-
     return object_keys
 
 
 @activity.defn
-async def data_trans_and_land(activity_input: DataTransAndLandActivityInput) -> str:
+def data_trans_and_land(activity_input: DataTransAndLandActivityInput) -> str:
     """Function that convert proto to parquet and save to S3."""
     key = activity_input.object_key
-    data = await get_data_from_object_key(activity_input.export_s3_bucket, key)
+    data = get_data_from_object_key(activity_input.export_s3_bucket, key)
     activity.logger.info("Convert proto to parquet for file: %s", key)
     parquet_data = convert_proto_to_parquet_flatten(data)
     activity.logger.info("Finish transformation for file: %s", key)
-    return await save_to_sink(
+    return save_to_sink(
         parquet_data, activity_input.output_s3_bucket, activity_input.write_path
     )
 
 
-async def get_data_from_object_key(
+def get_data_from_object_key(
     bucket_name: str, object_key: str
 ) -> export.WorkflowExecutions:
     """Function that get object by key."""
     v = export.WorkflowExecutions()
 
-    session = aioboto3.Session()
-    async with session.client("s3") as s3:
-        try:
-            get_object = await s3.get_object(Bucket=bucket_name, Key=object_key)
-            data = await get_object["Body"].read()
-        except Exception as e:
-            activity.logger.error(f"Error reading object: {e}")
-            raise e
-
-        v.ParseFromString(data)
-        return v
+    s3 = boto3.client("s3")
+    try:
+        data = s3.get_object(Bucket=bucket_name, Key=object_key)["Body"].read()
+    except Exception as e:
+        activity.logger.error(f"Error reading object: {e}")
+        raise e
+    v.ParseFromString(data)
+    return v
 
 
 def convert_proto_to_parquet_flatten(wfs: export.WorkflowExecutions) -> pd.DataFrame:
@@ -111,19 +106,18 @@ def convert_proto_to_parquet_flatten(wfs: export.WorkflowExecutions) -> pd.DataFrame:
     return df_flatten
 
 
-async def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
+def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
     """Function that save object to s3 bucket."""
     write_bytes = data.to_parquet(None, compression="snappy", index=False)
     uuid_name = uuid.uuid1()
     file_name = f"{uuid_name}.parquet"
     activity.logger.info("Writing to S3 bucket: %s", file_name)
 
-    session = aioboto3.Session()
-    async with session.client("s3") as s3:
-        try:
-            key = f"{write_path}/{file_name}"
-            await s3.put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
-            return key
-        except Exception as e:
-            activity.logger.error(f"Error saving to sink: {e}")
-            raise e
+    s3 = boto3.client("s3")
+    try:
+        key = f"{write_path}/{file_name}"
+        s3.put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
+        return key
+    except Exception as e:
+        activity.logger.error(f"Error saving to sink: {e}")
+        raise e
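One practical consequence of this change: the activities are now plain def functions making blocking boto3 calls, so the Temporal worker must run them on a thread pool instead of the asyncio event loop. Below is a minimal worker sketch showing that wiring; the module path, task queue name, and server address are assumptions, since the diff does not show the worker or workflow code.

import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

# Assumed import path; the diff does not show the module layout.
from data_trans_activities import data_trans_and_land, get_object_keys


async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed server address
    worker = Worker(
        client,
        task_queue="data-transformation",  # assumed task queue name
        activities=[get_object_keys, data_trans_and_land],
        # Non-async activities must be paired with an activity_executor;
        # the thread pool keeps the blocking boto3 calls off the event loop.
        activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

The SDK enforces this pairing: constructing a Worker that registers non-async activities without an activity_executor fails at startup.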