Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions libs/core/garf/core/fetchers/fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ def __init__(
query_specification_builder: query_editor.QuerySpecification = (
query_editor.QuerySpecification
),
data: Sequence[dict[str, Any]] | None = None,
data_location: str | os.PathLike[str] | None = None,
csv_location: str | os.PathLike[str] | None = None,
json_location: str | os.PathLike[str] | None = None,
**kwargs: str,
) -> None:
if not api_client and not (
data_location := json_location or csv_location or data_location
):
raise report_fetcher.ApiReportFetcherError(
'Missing fake data for the fetcher.'
)
if not api_client:
api_client = api_clients.FakeApiClient.from_file(data_location)
if data:
api_client = api_clients.FakeApiClient(results=list(data))
elif data_location := json_location or csv_location or data_location:
api_client = api_clients.FakeApiClient.from_file(data_location)
else:
raise report_fetcher.ApiReportFetcherError(
'Missing fake data for the fetcher.'
)
super().__init__(api_client, parser, query_specification_builder, **kwargs)

@classmethod
Expand Down
5 changes: 4 additions & 1 deletion libs/executors/garf/executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import argparse
import logging
import pathlib
import sys

import garf.executors
Expand Down Expand Up @@ -82,6 +83,7 @@ def main():
)
reader_client = reader.create_reader(args.input)
if workflow_file := args.workflow:
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
execution_workflow = workflow.Workflow.from_file(workflow_file)
for i, step in enumerate(execution_workflow.steps, 1):
with tracer.start_as_current_span(f'{i}-{step.fetcher}'):
Expand All @@ -99,7 +101,8 @@ def main():
)
for query in queries:
if isinstance(query, garf.executors.workflow.QueryPath):
batch[query.path] = reader_client.read(query.path)
query_path = wf_parent / pathlib.Path(query.path)
batch[query.path] = reader_client.read(query_path)
else:
batch[query.query.title] = query.query.text
query_executor.execute_batch(
Expand Down
5 changes: 3 additions & 2 deletions libs/executors/garf/executors/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import os
import pathlib
from typing import Any

import pydantic
import smart_open
Expand All @@ -42,8 +43,8 @@ class ExecutionContext(pydantic.BaseModel):
query_parameters: query_editor.GarfQueryParameters | None = pydantic.Field(
default_factory=dict
)
fetcher_parameters: dict[str, str | bool | int | list[str | int]] | None = (
pydantic.Field(default_factory=dict)
fetcher_parameters: dict[str, Any] | None = pydantic.Field(
default_factory=dict
)
writer: str | list[str] | None = None
writer_parameters: dict[str, str] | None = pydantic.Field(
Expand Down
4 changes: 2 additions & 2 deletions libs/executors/tests/end-to-end/query.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
resource,
dimension.name AS name,
metric.clicks AS clics
dimensions.name AS name,
metrics.clicks AS clicks
FROM resource
19 changes: 5 additions & 14 deletions libs/executors/tests/end-to-end/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,9 @@ def test_fake_source_from_config(self, tmp_path):
assert result.returncode == 0
assert json.loads(result.stdout) == self.expected_output

def test_fake_source_from_workflow(self, tmp_path):
def test_fake_source_from_workflow(self):
workflow_path = _SCRIPT_PATH / 'test_workflow.yaml'
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow_data = yaml.safe_load(f)
original_data_location = workflow_data['steps'][0]['fetcher_parameters'][
'data_location'
]
workflow_data['steps'][0]['fetcher_parameters']['data_location'] = str(
_SCRIPT_PATH / original_data_location
)
tmp_workflow = tmp_path / 'workflow.yaml'
with open(tmp_workflow, 'w', encoding='utf-8') as f:
yaml.dump(workflow_data, f, encoding='utf-8')
command = f'garf -w {str(tmp_workflow)} --loglevel ERROR'
command = f'garf -w {str(workflow_path)} --loglevel ERROR'
result = subprocess.run(
command,
shell=True,
Expand All @@ -147,4 +136,6 @@ def test_fake_source_from_workflow(self, tmp_path):
)

assert result.returncode == 0
assert json.loads(result.stdout) == self.expected_output
for output in result.stdout.split('\n'):
if output:
assert json.loads(output) == self.expected_output
31 changes: 30 additions & 1 deletion libs/executors/tests/end-to-end/test_workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ steps:
- alias: test
fetcher: fake
queries:
- path: query.sql
- query:
title: test_query
text: |
Expand All @@ -14,4 +15,32 @@ steps:
writer_parameters:
format: json
fetcher_parameters:
data_location: test.json
data:
- resource: Campaign A
dimensions:
name: Ad Group 1
id: 101
metrics:
clicks: 1500
cost: 250.75
- resource: Campaign B
dimensions:
name: Ad Group 2
id: 102
metrics:
clicks: 2300
cost: 410.20
- resource: Campaign C
dimensions:
name: Ad Group 3
id: 103
metrics:
clicks: 800
cost: 120.50
- resource: Campaign A
dimensions:
name: Ad Group 4
id: 104
metrics:
clicks: 3200
cost: 600.00