Commit aa14f19

feat: load method for partition replace using aggregation queries (#15)
* fix: ignoring asset compilation even if window is for a month with REPLACE load method
* feat: replace_all load method for aggregate queries

Signed-off-by: Kush <[email protected]>
1 parent cb26e8f commit aa14f19

17 files changed: +368 -113 lines changed
.goreleaser.yml

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ dockers:
     extra_files:
       - ./entrypoint.sh
     build_flag_templates:
-      - "--build-arg=OPTIMUS_RELEASE_URL=https://github.com/odpf/optimus/releases/download/v0.0.1/optimus_0.0.1_linux_x86_64.tar.gz"
+      - "--build-arg=OPTIMUS_RELEASE_URL=https://github.com/odpf/optimus/releases/download/v0.0.3/optimus_0.0.3_linux_x86_64.tar.gz"

 brews:
   - name: optimus-plugins-odpf

task/bq2bq/README.md

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+# BigQuery SQL transformation
+
+Optimus plugin that supports a variety of load methods to execute SQL transformations.
+Features
+- Automatic dependency resolution
+- Append to a partition
+- Replace table partition
+- Merge statements
+- BQ Scripts
+
+#### Load Method
+
+The way data is loaded into the destination table depends on the partition configuration of the destination table.
+
+| Load Method   | No Partition | Partitioned Table |
+| ------------- | ------------ | ----------------- |
+| APPEND        | Append new records to destination table | Append new records to destination table per partition based on localised start_time |
+| MERGE         | Load the data using a DML Merge statement; all of the load logic lies in the DML merge statement | Load the data using a DML Merge statement; all of the load logic lies in the DML merge statement |
+| REPLACE       | Truncate/Clean the table before inserting new records | Clean records in destination partition before inserting new records to new partition |
+| REPLACE_MERGE | Doesn't work for non-partitioned tables or for tables partitioned by ingestion time | Same as REPLACE but uses a Merge query to emulate replace |
+| REPLACE_ALL   | Truncate/Clean the table before inserting new records; use this instead of REPLACE for aggregation | Clean records in destination partition before inserting new records to new partition |
+
+Note: if the `REPLACE` load method is used and the window size is greater than the partition delta,
+it is currently assumed that the table is partitioned by `DAY`.
+This will split the query into multiple queries, executed for each partition one by one.
+
+Note: `REPLACE_MERGE` is experimental and might not work properly for deeply
+nested structs; it is advised to test it before using it in production.
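
Reviewer sketch for the `REPLACE` note above: when the window is larger than one `DAY` partition, the query is run once per partition. The snippet below is a minimal illustration of that kind of window splitting, assuming half-open `[dstart, dend)` windows. `split_daily_windows` is a hypothetical helper written for this note and is not a function from the plugin.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def split_daily_windows(dstart: datetime, dend: datetime) -> List[Tuple[datetime, datetime]]:
    """Split [dstart, dend) into consecutive one-day windows (hypothetical helper)."""
    if dend <= dstart:
        raise ValueError("dend must be after dstart")
    windows = []
    cursor = dstart
    while cursor < dend:
        # Clamp the last window so it never runs past dend.
        upper = min(cursor + timedelta(days=1), dend)
        windows.append((cursor, upper))
        cursor = upper
    return windows


windows = split_daily_windows(datetime(2021, 9, 1), datetime(2021, 9, 4))
for start, end in windows:
    print(start.date(), "->", end.date())
```

Each resulting pair would correspond to one query execution against one destination partition. How the plugin actually derives and localises these windows is not shown in this commit.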

task/bq2bq/executor/bumblebee/config.py

Lines changed: 3 additions & 1 deletion
@@ -41,13 +41,15 @@ class LoadMethod(Enum):

     REPLACE_MERGE = "REPLACE_MERGE"

+    REPLACE_ALL = "REPLACE_ALL"
+
     MERGE = "MERGE"

     @property
     def write_disposition(self):
         if self == LoadMethod.APPEND:
             return WriteDisposition.WRITE_APPEND
-        elif self == LoadMethod.REPLACE or self == LoadMethod.REPLACE_MERGE:
+        elif self == LoadMethod.REPLACE or self == LoadMethod.REPLACE_MERGE or self == LoadMethod.REPLACE_ALL:
             return WriteDisposition.WRITE_TRUNCATE
         else:
             raise Exception("write disposition is only for APPEND and REPLACE load method")
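
Reviewer sketch: the chained `or` comparison in `write_disposition` could also be expressed as a set membership test, which may be easier to extend the next time a load method is added. This is a self-contained illustration only. The string constants are stand-ins for the BigQuery client's `WriteDisposition` values (an assumption made so the snippet runs without the client library), and `_TRUNCATING` is a hypothetical name.

```python
from enum import Enum

# Stand-ins for the BigQuery write dispositions (assumed string values).
WRITE_APPEND = "WRITE_APPEND"
WRITE_TRUNCATE = "WRITE_TRUNCATE"


class LoadMethod(Enum):
    APPEND = "APPEND"
    REPLACE = "REPLACE"
    REPLACE_MERGE = "REPLACE_MERGE"
    REPLACE_ALL = "REPLACE_ALL"
    MERGE = "MERGE"

    @property
    def write_disposition(self) -> str:
        if self is LoadMethod.APPEND:
            return WRITE_APPEND
        if self in _TRUNCATING:
            return WRITE_TRUNCATE
        raise ValueError(f"{self.name} has no write disposition")


# Defined at module level so the Enum machinery does not treat it as a member.
_TRUNCATING = frozenset(
    {LoadMethod.REPLACE, LoadMethod.REPLACE_MERGE, LoadMethod.REPLACE_ALL}
)

print(LoadMethod.REPLACE_ALL.write_disposition)
```

As in the diff, `MERGE` has no write disposition and raises.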

task/bq2bq/executor/bumblebee/transformation.py

Lines changed: 22 additions & 3 deletions
@@ -134,6 +134,27 @@ def transform(self):
                 )

             transformation.transform()
+        elif self.task_config.load_method is LoadMethod.REPLACE_ALL:
+            # query bq and check if table is partitioned
+            bq_destination_table = self.bigquery_service.get_table(self.task_config.destination_table)
+            if bq_destination_table.time_partitioning is None and bq_destination_table.range_partitioning is None:
+                task_queries = self.sql_query.split(OPTIMUS_QUERY_BREAK_MARKER)
+                transformation = TableTransformation(self.bigquery_service,
+                                                     self.task_config,
+                                                     task_queries[0],
+                                                     self.dstart,
+                                                     self.dend,
+                                                     self.dry_run,
+                                                     localised_execution_time)
+            else:
+                # queries where source data/partition map with start date as destination partition
+                transformation = SinglePartitionTransformation(self.bigquery_service,
+                                                               self.task_config,
+                                                               self.sql_query,
+                                                               self.dstart, self.dend,
+                                                               self.dry_run,
+                                                               localised_execution_time,)
+            transformation.transform()
         else:
             raise Exception("unsupported load method {}".format(self.task_config.load_method))

@@ -222,14 +243,12 @@ def __init__(self, bigquery_service: BigqueryService,
                  dstart: datetime,
                  dend: datetime,
                  dry_run: bool,
-                 execution_time: datetime,
-                 partition_strategy: timedelta):
+                 execution_time: datetime):
         self.bigquery_service = bigquery_service
         self.task_config = task_config
         self.task_query = task_query

         self.dry_run = dry_run
-        self.partition_strategy = partition_strategy
         self.window = CustomWindow(dstart, dend)
         self.execution_time = execution_time
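
Reviewer sketch of the decision made by the new `REPLACE_ALL` branch in `transform`: an unpartitioned destination is replaced as a whole table, while a time- or range-partitioned destination is handled as a single partition. `TableInfo` and `replace_all_strategy` are hypothetical stand-ins for this note. In the diff itself the metadata comes from `self.bigquery_service.get_table(...)`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TableInfo:
    """Hypothetical stand-in for the table metadata returned by BigQuery."""
    time_partitioning: Optional[object] = None
    range_partitioning: Optional[object] = None


def replace_all_strategy(table: TableInfo) -> str:
    """Return which kind of transformation the REPLACE_ALL branch would pick."""
    is_partitioned = (
        table.time_partitioning is not None
        or table.range_partitioning is not None
    )
    # Partitioned destinations map to SinglePartitionTransformation in the diff,
    # unpartitioned ones to TableTransformation.
    return "single_partition" if is_partitioned else "whole_table"


print(replace_all_strategy(TableInfo()))
print(replace_all_strategy(TableInfo(time_partitioning={"type": "DAY"})))
```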

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-VERSION = "0.0.1"
+VERSION = "0.0.2"

task/bq2bq/executor/example.py

Lines changed: 16 additions & 4 deletions
@@ -178,10 +178,10 @@ def partition_append():
     )


-def merge_replace():
+def replace_merge():
     bq2bq(
-        "./samples/tasks/merge_replace/auto/properties.cfg",
-        "./samples/tasks/merge_replace/auto/query.sql",
+        "samples/tasks/replace_merge/auto/properties.cfg",
+        "samples/tasks/replace_merge/auto/query.sql",
         None,
         datetime(2020, 12, 5, 1),
         datetime(2020, 12, 5, 1),
@@ -190,4 +190,16 @@ def merge_replace():
     )


-merge_replace()
+def replace_all():
+    bq2bq(
+        "samples/tasks/replace_all/basic/properties.cfg",
+        "samples/tasks/replace_all/basic/query.sql",
+        None,
+        datetime(2021, 9, 1, 1),
+        datetime(2021, 9, 30, 1),
+        execution_date,
+        False
+    )
+
+
+replace_all()

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+[DESTINATION]
+PROJECT="g-project"
+DATASET="playground"
+TABLE="sample_replace_monthly"
+
+[TRANSFORMATION]
+WINDOW_SIZE = 720h
+WINDOW_OFFSET = 0
+WINDOW_TRUNCATE_UPTO = M
+
+[LOAD]
+LOAD_METHOD="REPLACE_ALL"
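
Reviewer sketch of reading the sample properties above with Python's standard `configparser`. How the plugin itself parses this file is not shown in this commit, so this is an assumption for illustration only. `get_unquoted` is a hypothetical helper, needed because `configparser` keeps the surrounding double quotes as part of the value.

```python
import configparser

PROPERTIES = '''
[DESTINATION]
PROJECT="g-project"
DATASET="playground"
TABLE="sample_replace_monthly"

[TRANSFORMATION]
WINDOW_SIZE = 720h
WINDOW_OFFSET = 0
WINDOW_TRUNCATE_UPTO = M

[LOAD]
LOAD_METHOD="REPLACE_ALL"
'''

parser = configparser.ConfigParser()
parser.read_string(PROPERTIES)


def get_unquoted(section: str, key: str) -> str:
    # configparser keeps surrounding double quotes, so strip them explicitly.
    return parser.get(section, key).strip('"')


destination = ".".join(
    get_unquoted("DESTINATION", key) for key in ("PROJECT", "DATASET", "TABLE")
)
load_method = get_unquoted("LOAD", "LOAD_METHOD")
# 720h is read here as a plain hour count (30 days).
window_hours = int(get_unquoted("TRANSFORMATION", "WINDOW_SIZE").rstrip("h"))

print(destination, load_method, window_hours)
```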
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+select
+    `hakai`,
+    `rasengan`,
+    `over`,
+    TIMESTAMP ('2021-09-01T01:02:03') as `event_timestamp`
+from
+    `g-project.playground.sample_select`
+WHERE
+    DATE(`load_timestamp`) >= DATE('2021-09-01')
+    AND DATE(`load_timestamp`) < DATE('2021-09-30')

task/bq2bq/executor/samples/tasks/merge_replace/auto/properties.cfg renamed to task/bq2bq/executor/samples/tasks/replace_merge/auto/properties.cfg

Lines changed: 1 addition & 1 deletion
@@ -9,4 +9,4 @@ WINDOW_OFFSET = 0
 WINDOW_TRUNCATE_UPTO = d

 [LOAD]
-LOAD_METHOD="REPLACE"
+LOAD_METHOD="REPLACE_MERGE"

task/bq2bq/executor/samples/tasks/merge_replace/auto/query.sql renamed to task/bq2bq/executor/samples/tasks/replace_merge/auto/query.sql

File renamed without changes.
