Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libs/executors/garf/executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import sys

import garf.executors
from garf.executors import config, exceptions, setup, workflow, workflow_runner
from garf.executors import config, exceptions, setup
from garf.executors.entrypoints import utils
from garf.executors.entrypoints.tracer import initialize_tracer
from garf.executors.telemetry import tracer
from garf.executors.workflows import workflow, workflow_runner
from garf.io import reader
from opentelemetry import trace

Expand Down
3 changes: 2 additions & 1 deletion libs/executors/garf/executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import pydantic
import typer
import uvicorn
from garf.executors import exceptions, setup, workflow_runner
from garf.executors import exceptions, setup
from garf.executors.entrypoints import utils
from garf.executors.entrypoints.tracer import initialize_tracer
from garf.executors.workflows import workflow_runner
from garf.io import reader
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from pydantic_settings import BaseSettings, SettingsConfigDict
Expand Down
Empty file.
49 changes: 49 additions & 0 deletions libs/executors/garf/executors/workflows/gcp_workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
run:
for:
value: pair
in: ${pairs}
steps:
- log_source:
call: sys.log
args:
data: ${pair.alias}
- execute_queries:
parallel:
for:
value: query
in: ${pair.queries}
steps:
- log_query:
call: sys.log
args:
data: ${pair}
- execute_single_query:
try:
call: http.post
args:
url: ${sys.get_env("GARF_ENDPOINT") + "/api/execute"}
auth:
type: OIDC
body:
source: ${pair.fetcher}
# query_path: ${query.path}
title: ${query.query.title}
query: ${query.query.text}
context:
fetcher_parameters: ${pair.fetcher_parameters}
writer: ${pair.writer}
writer_parameters: ${pair.writer_parameters}
query_parameters:
macro: ${pair.query_parameters.macro}
template: ${pair.query_parameters.template}
result: task_resp
except:
as: e
assign:
- task_resp:
status: "failed"
error: ${e.message}
- log_result:
call: sys.log
args:
data: ${task_resp}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import re
from typing import Final

from garf.executors import exceptions, setup, workflow
import yaml
from garf.executors import exceptions, setup
from garf.executors.telemetry import tracer
from garf.executors.workflows import workflow
from garf.io import reader

logger = logging.getLogger(__name__)

_REMOTE_FILES_PATTERN: Final[str] = (
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)'
)
_SCRIPT_PATH = pathlib.Path(__file__).parent


class WorkflowRunner:
Expand Down Expand Up @@ -112,3 +115,27 @@ def run(
)
execution_results.append(step_name)
return execution_results

def compile(self, path: str | pathlib.Path) -> str:
"""Saves workflow with expanded anchors."""
return self.workflow.save(path)

def deploy(self, path: str | pathlib.Path) -> str:
"""Prepares workflow for deployment to Google Cloud Workflows."""
wf = self.workflow.model_dump(exclude_none=True).get('steps')
with open(_SCRIPT_PATH / 'gcp_workflow.yaml', 'r', encoding='utf-8') as f:
cloud_workflow_run_template = yaml.safe_load(f)
init = {
'init': {
'assign': [{'pairs': wf}],
},
}
cloud_workflow = {
'main': {
'params': [],
'steps': [init, cloud_workflow_run_template],
},
}
with open(path, 'w', encoding='utf-8') as f:
yaml.dump(cloud_workflow, f, sort_keys=False)
return f'Workflow is saved to {path}'
14 changes: 11 additions & 3 deletions libs/executors/tests/end-to-end/test_workflow.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
writer_section: &writer_defaults
writer: console
writer_parameters:
format: json
common_context: &empty_context
context:
query_parameters:
macro: {}
template: {}

steps:
- alias: test
fetcher: fake
Expand All @@ -11,9 +21,7 @@ steps:
dimensions.name AS name,
metrics.clicks AS clicks
FROM resource
writer: console
writer_parameters:
format: json
<<: [*empty_context, *writer_defaults]
fetcher_parameters:
data:
- resource: Campaign A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import yaml
from garf.executors.workflow import Workflow
from garf.executors.workflows.workflow import Workflow


class TestWorkflow:
Expand Down
53 changes: 53 additions & 0 deletions libs/executors/tests/unit/workflows/test_workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
writer_section: &writer_defaults
writer: console
writer_parameters:
format: json
common_context: &empty_context
context:
query_parameters:
macro: {}
template: {}

steps:
- alias: test
fetcher: fake
queries:
- query:
title: test_query
text: |
SELECT
resource,
dimensions.name AS name,
metrics.clicks AS clicks
FROM resource
<<: [*empty_context, *writer_defaults]
fetcher_parameters:
data:
- resource: Campaign A
dimensions:
name: Ad Group 1
id: 101
metrics:
clicks: 1500
cost: 250.75
- resource: Campaign B
dimensions:
name: Ad Group 2
id: 102
metrics:
clicks: 2300
cost: 410.20
- resource: Campaign C
dimensions:
name: Ad Group 3
id: 103
metrics:
clicks: 800
cost: 120.50
- resource: Campaign A
dimensions:
name: Ad Group 4
id: 104
metrics:
clicks: 3200
cost: 600.00
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,27 @@
# limitations under the License.
import pathlib

from garf.executors import workflow_runner
from garf.executors.workflows import workflow_runner

_SCRIPT_PATH = pathlib.Path(__file__).parent

_TEST_WORKFLOW_PATH = _SCRIPT_PATH / 'test_workflow.yaml'


class TestWorkflowRunner:
def test_run_returns_executed_step_names(self):
runner = workflow_runner.WorkflowRunner.from_file(
_SCRIPT_PATH / '../end-to-end/test_workflow.yaml'
)
runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
results = runner.run()
assert results == ['1-fake-test']

def test_compile_saves_file(self, tmp_path):
tmp_workflow_path = tmp_path / 'workflow.yaml'
runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
result = runner.compile(tmp_workflow_path)
assert result == f'Workflow is saved to {tmp_workflow_path}'

def test_deploy_saves_file(self, tmp_path):
tmp_workflow_path = tmp_path / 'workflow.yaml'
runner = workflow_runner.WorkflowRunner.from_file(_TEST_WORKFLOW_PATH)
result = runner.deploy(tmp_workflow_path)
assert result == f'Workflow is saved to {tmp_workflow_path}'