Skip to content

Commit adff013

Browse files
feat(executors): add support for preparing Google Cloud workflow file
1 parent c8ca410 commit adff013

File tree

10 files changed

+162
-11
lines changed

10 files changed

+162
-11
lines changed

libs/executors/garf/executors/entrypoints/cli.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
import sys
2626

2727
import garf.executors
28-
from garf.executors import config, exceptions, setup, workflow, workflow_runner
28+
from garf.executors import config, exceptions, setup
2929
from garf.executors.entrypoints import utils
3030
from garf.executors.entrypoints.tracer import initialize_tracer
3131
from garf.executors.telemetry import tracer
32+
from garf.executors.workflows import workflow, workflow_runner
3233
from garf.io import reader
3334
from opentelemetry import trace
3435

libs/executors/garf/executors/entrypoints/server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import pydantic
2222
import typer
2323
import uvicorn
24-
from garf.executors import exceptions, setup, workflow_runner
24+
from garf.executors import exceptions, setup
2525
from garf.executors.entrypoints import utils
2626
from garf.executors.entrypoints.tracer import initialize_tracer
27+
from garf.executors.workflows import workflow_runner
2728
from garf.io import reader
2829
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
2930
from pydantic_settings import BaseSettings, SettingsConfigDict

libs/executors/garf/executors/workflows/__init__.py

Whitespace-only changes.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Cloud Workflows step template: iterates over `pairs` (which must be assigned
# before this step runs) and executes every query of every pair.
run:
  for:
    value: pair
    in: ${pairs}
    steps:
      - log_source:
          call: sys.log
          args:
            data: ${pair.alias}
      - execute_queries:
          # Queries of a single pair are executed in parallel branches.
          parallel:
            for:
              value: query
              in: ${pair.queries}
              steps:
                - log_query:
                    call: sys.log
                    args:
                      # Log the query being executed rather than the whole
                      # pair, which would repeat fetcher parameters per query.
                      data: ${query}
                - execute_single_query:
                    try:
                      call: http.post
                      args:
                        url: ${sys.get_env("GARF_ENDPOINT") + "/api/execute"}
                        auth:
                          type: OIDC
                        body:
                          source: ${pair.fetcher}
                          # query_path: ${query.path}
                          title: ${query.query.title}
                          query: ${query.query.text}
                          context:
                            fetcher_parameters: ${pair.fetcher_parameters}
                            writer: ${pair.writer}
                            writer_parameters: ${pair.writer_parameters}
                            query_parameters:
                              macro: ${pair.query_parameters.macro}
                              template: ${pair.query_parameters.template}
                      result: task_resp
                    # A failed request is recorded in task_resp instead of
                    # aborting the workflow, so log_result always has a value.
                    except:
                      as: e
                      assign:
                        - task_resp:
                            status: "failed"
                            error: ${e.message}
                - log_result:
                    call: sys.log
                    args:
                      data: ${task_resp}
File renamed without changes.

libs/executors/garf/executors/workflow_runner.py renamed to libs/executors/garf/executors/workflows/workflow_runner.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,18 @@
2020
import re
2121
from typing import Final
2222

23-
from garf.executors import exceptions, setup, workflow
23+
import yaml
24+
from garf.executors import exceptions, setup
2425
from garf.executors.telemetry import tracer
26+
from garf.executors.workflows import workflow
2527
from garf.io import reader
2628

2729
logger = logging.getLogger(__name__)
2830

2931
# Leading URI schemes recognised by the remote-files pattern. The pattern is
# anchored at the start of the string, so only a leading scheme matches
# ('http' also covers 'https'). 'azure' was previously misspelled as 'aruze',
# which kept azure:// locations from ever matching.
_REMOTE_FILES_PATTERN: Final[str] = (
    '^(http|gs|s3|azure|hdfs|webhdfs|ssh|scp|sftp)'
)
34+
_SCRIPT_PATH = pathlib.Path(__file__).parent
3235

3336

3437
class WorkflowRunner:
@@ -112,3 +115,27 @@ def run(
112115
)
113116
execution_results.append(step_name)
114117
return execution_results
118+
119+
def compile(self, path: str | pathlib.Path) -> str:
120+
"""Saves workflow with expanded anchors."""
121+
return self.workflow.save(path)
122+
123+
def deploy(self, path: str | pathlib.Path) -> str:
    """Prepares workflow for deployment to Google Cloud Workflows.

    Embeds the workflow steps into the bundled `gcp_workflow.yaml` template
    and writes the resulting definition to `path`. The file is only
    generated here; nothing is uploaded.

    Args:
        path: Destination of the generated Cloud Workflows YAML file.

    Returns:
        Message confirming where the workflow was saved.
    """
    # `mode='json'` reduces every value to JSON-compatible primitives so the
    # generated YAML never needs Python-specific tags.
    dumped = self.workflow.model_dump(mode='json', exclude_none=True)
    pairs = dumped.get('steps')
    template_path = _SCRIPT_PATH / 'gcp_workflow.yaml'
    with open(template_path, 'r', encoding='utf-8') as f:
        run_step = yaml.safe_load(f)
    # The first step assigns the `pairs` variable the template iterates over.
    init_step = {
        'init': {
            'assign': [{'pairs': pairs}],
        },
    }
    cloud_workflow = {
        'main': {
            'params': [],
            'steps': [init_step, run_step],
        },
    }
    with open(path, 'w', encoding='utf-8') as f:
        # safe_dump rejects non-standard objects instead of writing
        # `!!python/...` tags into the deployable file.
        yaml.safe_dump(cloud_workflow, f, sort_keys=False)
    return f'Workflow is saved to {path}'

libs/executors/tests/end-to-end/test_workflow.yaml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
writer_section: &writer_defaults
2+
writer: console
3+
writer_parameters:
4+
format: json
5+
common_context: &empty_context
6+
context:
7+
query_parameters:
8+
macro: {}
9+
template: {}
10+
111
steps:
212
- alias: test
313
fetcher: fake
@@ -11,9 +21,7 @@ steps:
1121
dimensions.name AS name,
1222
metrics.clicks AS clicks
1323
FROM resource
14-
writer: console
15-
writer_parameters:
16-
format: json
24+
<<: [*empty_context, *writer_defaults]
1725
fetcher_parameters:
1826
data:
1927
- resource: Campaign A

libs/executors/tests/unit/test_workflow.py renamed to libs/executors/tests/unit/workflows/test_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import yaml
16-
from garf.executors.workflow import Workflow
16+
from garf.executors.workflows.workflow import Workflow
1717

1818

1919
class TestWorkflow:
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
writer_section: &writer_defaults
2+
writer: console
3+
writer_parameters:
4+
format: json
5+
common_context: &empty_context
6+
context:
7+
query_parameters:
8+
macro: {}
9+
template: {}
10+
11+
steps:
12+
- alias: test
13+
fetcher: fake
14+
queries:
15+
- query:
16+
title: test_query
17+
text: |
18+
SELECT
19+
resource,
20+
dimensions.name AS name,
21+
metrics.clicks AS clicks
22+
FROM resource
23+
<<: [*empty_context, *writer_defaults]
24+
fetcher_parameters:
25+
data:
26+
- resource: Campaign A
27+
dimensions:
28+
name: Ad Group 1
29+
id: 101
30+
metrics:
31+
clicks: 1500
32+
cost: 250.75
33+
- resource: Campaign B
34+
dimensions:
35+
name: Ad Group 2
36+
id: 102
37+
metrics:
38+
clicks: 2300
39+
cost: 410.20
40+
- resource: Campaign C
41+
dimensions:
42+
name: Ad Group 3
43+
id: 103
44+
metrics:
45+
clicks: 800
46+
cost: 120.50
47+
- resource: Campaign A
48+
dimensions:
49+
name: Ad Group 4
50+
id: 104
51+
metrics:
52+
clicks: 3200
53+
cost: 600.00

libs/executors/tests/unit/test_workflow_runner.py renamed to libs/executors/tests/unit/workflows/test_workflow_runner.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,27 @@
1313
# limitations under the License.
1414
import pathlib
1515

16-
from garf.executors import workflow_runner
16+
from garf.executors.workflows import workflow_runner
1717

1818
_SCRIPT_PATH = pathlib.Path(__file__).parent
1919

20+
_TEST_WORKFLOW_PATH = _SCRIPT_PATH / 'test_workflow.yaml'
21+
2022

2123
class TestWorkflowRunner:
    """Tests running, compiling and deploying a workflow loaded from file."""

    def test_run_returns_executed_step_names(self):
        runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
        results = runner.run()
        assert results == ['1-fake-test']

    def test_compile_saves_file(self, tmp_path):
        tmp_workflow_path = tmp_path / 'workflow.yaml'
        runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
        result = runner.compile(tmp_workflow_path)
        assert result == f'Workflow is saved to {tmp_workflow_path}'
        # The message alone does not prove anything was written.
        assert tmp_workflow_path.is_file()

    def test_deploy_saves_file(self, tmp_path):
        tmp_workflow_path = tmp_path / 'workflow.yaml'
        runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
        result = runner.deploy(tmp_workflow_path)
        assert result == f'Workflow is saved to {tmp_workflow_path}'
        assert tmp_workflow_path.is_file()
        # The generated definition is a mapping whose first key is `main`.
        content = tmp_workflow_path.read_text(encoding='utf-8')
        assert content.startswith('main:')

0 commit comments

Comments
 (0)