Skip to content

Commit 84c1440

Browse files
authored
[executors] feat: add support for multiple writers for one query
1 parent 0835c80 commit 84c1440

File tree

6 files changed

+170
-48
lines changed

6 files changed

+170
-48
lines changed

libs/executors/garf_executors/api_executor.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
class ApiExecutionContext(execution_context.ExecutionContext):
3636
"""Common context for executing one or more queries."""
3737

38-
writer: str = 'console'
38+
writer: str | list[str] = 'console'
3939

4040

4141
class ApiQueryExecutor(executor.Executor):
@@ -94,20 +94,27 @@ def execute(
9494
args=context.query_parameters,
9595
**context.fetcher_parameters,
9696
)
97-
writer_client = context.writer_client
98-
logger.debug(
99-
'Start writing data for query %s via %s writer',
100-
title,
101-
type(writer_client),
102-
)
103-
result = writer_client.write(results, title)
104-
logger.debug(
105-
'Finish writing data for query %s via %s writer',
106-
title,
107-
type(writer_client),
108-
)
97+
writer_clients = context.writer_clients
98+
if not writer_clients:
99+
logger.warning('No writers configured, skipping write operation')
100+
return None
101+
writing_results = []
102+
for writer_client in writer_clients:
103+
logger.debug(
104+
'Start writing data for query %s via %s writer',
105+
title,
106+
type(writer_client),
107+
)
108+
result = writer_client.write(results, title)
109+
logger.debug(
110+
'Finish writing data for query %s via %s writer',
111+
title,
112+
type(writer_client),
113+
)
114+
writing_results.append(result)
109115
logger.info('%s executed successfully', title)
110-
return result
116+
# Return the last writer's result for backward compatibility
117+
return writing_results[-1] if writing_results else None
111118
except Exception as e:
112119
logger.error('%s generated an exception: %s', title, str(e))
113120
raise exceptions.GarfExecutorError(

libs/executors/garf_executors/bq_executor.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,20 +104,27 @@ def execute(
104104
else:
105105
results = report.GarfReport()
106106
if context.writer and results:
107-
writer_client = context.writer_client
108-
logger.debug(
109-
'Start writing data for query %s via %s writer',
110-
title,
111-
type(writer_client),
112-
)
113-
writing_result = writer_client.write(results, title)
114-
logger.debug(
115-
'Finish writing data for query %s via %s writer',
116-
title,
117-
type(writer_client),
118-
)
119-
logger.info('%s executed successfully', title)
120-
return writing_result
107+
writer_clients = context.writer_clients
108+
if not writer_clients:
109+
logger.warning('No writers configured, skipping write operation')
110+
else:
111+
writing_results = []
112+
for writer_client in writer_clients:
113+
logger.debug(
114+
'Start writing data for query %s via %s writer',
115+
title,
116+
type(writer_client),
117+
)
118+
writing_result = writer_client.write(results, title)
119+
logger.debug(
120+
'Finish writing data for query %s via %s writer',
121+
title,
122+
type(writer_client),
123+
)
124+
writing_results.append(writing_result)
125+
logger.info('%s executed successfully', title)
126+
# Return the last writer's result for backward compatibility
127+
return writing_results[-1] if writing_results else None
121128
return results
122129
except google_cloud_exceptions.GoogleCloudError as e:
123130
raise BigQueryExecutorError(e) from e

libs/executors/garf_executors/execution_context.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ExecutionContext(pydantic.BaseModel):
3535
Attributes:
3636
query_parameters: Parameters to dynamically change query text.
3737
fetcher_parameters: Parameters to specify fetching setup.
38-
writer: Type of writer to use.
38+
writer: Type of writer to use. Can be a single writer string or list of writers.
3939
writer_parameters: Optional parameters to setup writer.
4040
"""
4141

@@ -45,7 +45,7 @@ class ExecutionContext(pydantic.BaseModel):
4545
fetcher_parameters: dict[str, str | bool | int | list[str | int]] | None = (
4646
pydantic.Field(default_factory=dict)
4747
)
48-
writer: str | None = None
48+
writer: str | list[str] | None = None
4949
writer_parameters: dict[str, str] | None = pydantic.Field(
5050
default_factory=dict
5151
)
@@ -75,9 +75,40 @@ def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str:
7575

7676
@property
7777
def writer_client(self) -> abs_writer.AbsWriter:
78-
writer_client = writer.create_writer(self.writer, **self.writer_parameters)
79-
if self.writer == 'bq':
78+
"""Returns single writer client."""
79+
if isinstance(self.writer, list) and len(self.writer) > 0:
80+
writer_type = self.writer[0]
81+
else:
82+
writer_type = self.writer
83+
84+
writer_params = self.writer_parameters or {}
85+
86+
if not writer_type:
87+
raise ValueError('No writer specified')
88+
89+
writer_client = writer.create_writer(writer_type, **writer_params)
90+
if writer_type == 'bq':
8091
_ = writer_client.create_or_get_dataset()
81-
if self.writer == 'sheet':
92+
if writer_type == 'sheet':
8293
writer_client.init_client()
8394
return writer_client
95+
96+
@property
97+
def writer_clients(self) -> list[abs_writer.AbsWriter]:
98+
"""Returns list of writer clients."""
99+
if not self.writer:
100+
return []
101+
102+
# Convert single writer to list for uniform processing
103+
writers_to_use = self.writer if isinstance(self.writer, list) else [self.writer]
104+
writer_params = self.writer_parameters or {}
105+
106+
clients = []
107+
for writer_type in writers_to_use:
108+
writer_client = writer.create_writer(writer_type, **writer_params)
109+
if writer_type == 'bq':
110+
_ = writer_client.create_or_get_dataset()
111+
if writer_type == 'sheet':
112+
writer_client.init_client()
113+
clients.append(writer_client)
114+
return clients

libs/executors/garf_executors/sql_executor.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -106,19 +106,26 @@ def execute(
106106
finally:
107107
conn.connection.execute(f'DROP TABLE {temp_table_name}')
108108
if context.writer and results:
109-
writer_client = context.writer_client
110-
logger.debug(
111-
'Start writing data for query %s via %s writer',
112-
title,
113-
type(writer_client),
114-
)
115-
writing_result = writer_client.write(results, title)
116-
logger.debug(
117-
'Finish writing data for query %s via %s writer',
118-
title,
119-
type(writer_client),
120-
)
121-
logger.info('%s executed successfully', title)
122-
return writing_result
109+
writer_clients = context.writer_clients
110+
if not writer_clients:
111+
logger.warning('No writers configured, skipping write operation')
112+
else:
113+
writing_results = []
114+
for writer_client in writer_clients:
115+
logger.debug(
116+
'Start writing data for query %s via %s writer',
117+
title,
118+
type(writer_client),
119+
)
120+
writing_result = writer_client.write(results, title)
121+
logger.debug(
122+
'Finish writing data for query %s via %s writer',
123+
title,
124+
type(writer_client),
125+
)
126+
writing_results.append(writing_result)
127+
logger.info('%s executed successfully', title)
128+
# Return the last writer's result for backward compatibility
129+
return writing_results[-1] if writing_results else None
123130
span.set_attribute('execute.num_results', len(results))
124131
return results

libs/executors/tests/unit/test_api_executor.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,25 @@ def test_from_fetcher_alias_returns_initialized_executor(self, tmp_path):
8585
},
8686
)
8787
assert isinstance(executor.fetcher, fake_fetcher.FakeApiReportFetcher)
88+
89+
def test_execute_with_multiple_writers_saves_to_both(self, executor, tmp_path, capsys):
90+
"""Test that multiple writers (console and json) both execute."""
91+
context = api_executor.ApiExecutionContext(
92+
writer=['console', 'json'],
93+
writer_parameters={'destination_folder': str(tmp_path)},
94+
)
95+
executor.execute(
96+
query=_TEST_QUERY,
97+
title='test',
98+
context=context,
99+
)
100+
# Verify JSON file was created
101+
json_file = pathlib.Path(context.writer_clients[1].destination_folder) / 'test.json'
102+
assert json_file.exists()
103+
with pathlib.Path.open(json_file, 'r', encoding='utf-8') as f:
104+
result = json.load(f)
105+
assert result == _TEST_DATA
106+
107+
# Verify console output was generated
108+
output = capsys.readouterr().out
109+
assert 'showing results' in output and 'test' in output

libs/executors/tests/unit/test_execution_context.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import yaml
16+
import pytest
1617
from garf_core import query_editor
1718
from garf_executors.execution_context import ExecutionContext
1819

@@ -80,4 +81,51 @@ def test_save_returns_correct_data(self, tmp_path):
8081
context.save(tmp_config)
8182
with open(tmp_config, 'r', encoding='utf-8') as f:
8283
config_data = yaml.safe_load(f)
83-
assert config_data == data
84+
# Check that the data is saved correctly without extra fields
85+
assert config_data['writer'] == data['writer']
86+
assert config_data['writer_parameters'] == data['writer_parameters']
87+
88+
def test_multiple_writers_creates_multiple_clients(self, tmp_path):
89+
context = ExecutionContext(
90+
writer=['console', 'json'],
91+
writer_parameters={'destination_folder': str(tmp_path)},
92+
)
93+
writer_clients = context.writer_clients
94+
assert len(writer_clients) == 2
95+
assert writer_clients[0].__class__.__name__ == 'ConsoleWriter'
96+
assert writer_clients[1].__class__.__name__ == 'JsonWriter'
97+
98+
def test_multiple_writers_without_parameters_creates_empty_dicts(self):
99+
context = ExecutionContext(
100+
writer=['console', 'json'],
101+
)
102+
writer_clients = context.writer_clients
103+
assert len(writer_clients) == 2
104+
105+
def test_backward_compatibility_single_writer_still_works(self, tmp_path):
106+
context = ExecutionContext(
107+
writer='json',
108+
writer_parameters={'destination_folder': str(tmp_path)},
109+
)
110+
# Should work with writer_client property
111+
writer_client = context.writer_client
112+
assert writer_client.__class__.__name__ == 'JsonWriter'
113+
# Should also work with writer_clients property
114+
writer_clients = context.writer_clients
115+
assert len(writer_clients) == 1
116+
assert writer_clients[0].__class__.__name__ == 'JsonWriter'
117+
118+
119+
def test_from_file_with_multiple_writers(self, tmp_path):
120+
tmp_config = tmp_path / 'config.yaml'
121+
data = {
122+
'writer': ['console', 'json'],
123+
'writer_parameters': {
124+
'destination_folder': '/tmp',
125+
},
126+
}
127+
with open(tmp_config, 'w', encoding='utf-8') as f:
128+
yaml.dump(data, f, encoding='utf-8')
129+
context = ExecutionContext.from_file(tmp_config)
130+
assert context.writer == ['console', 'json']
131+
assert len(context.writer_clients) == 2

0 commit comments

Comments
 (0)