Skip to content

Commit c8c1d1d

Browse files
[executors] fix: init super classes for Bq and SQl executors
* Provide query title when executor fails to run the query * Provide INFO logging messages for bq executor
1 parent 3b620b2 commit c8c1d1d

File tree

3 files changed

+50
-32
lines changed

3 files changed

+50
-32
lines changed

libs/executors/garf_executors/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,4 @@ def setup_executor(
5757
'ApiExecutionContext',
5858
]
5959

60-
__version__ = '0.2.0'
60+
__version__ = '0.2.1'

libs/executors/garf_executors/bq_executor.py

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from garf_core import query_editor, report
3232
from google.cloud import exceptions as google_cloud_exceptions
33+
from opentelemetry import trace
3334

3435
from garf_executors import exceptions, execution_context, executor
3536
from garf_executors.telemetry import tracer
@@ -68,6 +69,7 @@ def __init__(
6869
)
6970
self.project_id = project_id
7071
self.location = location
72+
super().__init__()
7173

7274
@property
7375
def client(self) -> bigquery.Client:
@@ -93,41 +95,47 @@ def execute(
9395
Returns:
9496
Report with data if query returns some data otherwise empty Report.
9597
"""
98+
span = trace.get_current_span()
99+
logger.info('Executing script: %s', title)
96100
query_text = self.replace_params_template(query, context.query_parameters)
97101
self.create_datasets(context.query_parameters.macro)
98102
job = self.client.query(query_text)
99103
try:
100104
result = job.result()
105+
except google_cloud_exceptions.GoogleCloudError as e:
106+
raise BigQueryExecutorError(
107+
f'Failed to execute query {title}: Reason: {e}'
108+
) from e
101109
logger.debug('%s launched successfully', title)
102-
if result.total_rows:
103-
results = report.GarfReport.from_pandas(result.to_dataframe())
110+
if result.total_rows:
111+
results = report.GarfReport.from_pandas(result.to_dataframe())
112+
else:
113+
results = report.GarfReport()
114+
if context.writer and results:
115+
writer_clients = context.writer_clients
116+
if not writer_clients:
117+
logger.warning('No writers configured, skipping write operation')
104118
else:
105-
results = report.GarfReport()
106-
if context.writer and results:
107-
writer_clients = context.writer_clients
108-
if not writer_clients:
109-
logger.warning('No writers configured, skipping write operation')
110-
else:
111-
writing_results = []
112-
for writer_client in writer_clients:
113-
logger.debug(
114-
'Start writing data for query %s via %s writer',
115-
title,
116-
type(writer_client),
117-
)
118-
writing_result = writer_client.write(results, title)
119-
logger.debug(
120-
'Finish writing data for query %s via %s writer',
121-
title,
122-
type(writer_client),
123-
)
124-
writing_results.append(writing_result)
125-
logger.info('%s executed successfully', title)
126-
# Return the last writer's result for backward compatibility
127-
return writing_results[-1] if writing_results else None
128-
return results
129-
except google_cloud_exceptions.GoogleCloudError as e:
130-
raise BigQueryExecutorError(e) from e
119+
writing_results = []
120+
for writer_client in writer_clients:
121+
logger.debug(
122+
'Start writing data for query %s via %s writer',
123+
title,
124+
type(writer_client),
125+
)
126+
writing_result = writer_client.write(results, title)
127+
logger.debug(
128+
'Finish writing data for query %s via %s writer',
129+
title,
130+
type(writer_client),
131+
)
132+
writing_results.append(writing_result)
133+
# Return the last writer's result for backward compatibility
134+
logger.info('%s executed successfully', title)
135+
return writing_results[-1] if writing_results else None
136+
logger.info('%s executed successfully', title)
137+
span.set_attribute('execute.num_results', len(results))
138+
return results
131139

132140
@tracer.start_as_current_span('bq.create_datasets')
133141
def create_datasets(self, macros: dict | None) -> None:

libs/executors/garf_executors/sql_executor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(self, engine: sqlalchemy.engine.base.Engine) -> None:
5757
engine: Initialized Engine object to operated on a given database.
5858
"""
5959
self.engine = engine
60+
super().__init__()
6061

6162
@classmethod
6263
def from_connection_string(
@@ -89,12 +90,17 @@ def execute(
8990
Report with data if query returns some data otherwise empty Report.
9091
"""
9192
span = trace.get_current_span()
92-
logging.info('Executing script: %s', title)
93+
logger.info('Executing script: %s', title)
9394
query_text = self.replace_params_template(query, context.query_parameters)
9495
with self.engine.begin() as conn:
9596
if re.findall(r'(create|update) ', query_text.lower()):
96-
conn.connection.executescript(query_text)
97-
results = report.GarfReport()
97+
try:
98+
conn.connection.executescript(query_text)
99+
results = report.GarfReport()
100+
except Exception as e:
101+
raise SqlAlchemyQueryExecutorError(
102+
f'Failed to execute query {title}: Reason: {e}'
103+
) from e
98104
else:
99105
temp_table_name = f'temp_{uuid.uuid4().hex}'
100106
query_text = f'CREATE TABLE {temp_table_name} AS {query_text}'
@@ -103,6 +109,10 @@ def execute(
103109
results = report.GarfReport.from_pandas(
104110
pd.read_sql(f'SELECT * FROM {temp_table_name}', conn)
105111
)
112+
except Exception as e:
113+
raise SqlAlchemyQueryExecutorError(
114+
f'Failed to execute query {title}: Reason: {e}'
115+
) from e
106116
finally:
107117
conn.connection.execute(f'DROP TABLE {temp_table_name}')
108118
if context.writer and results:

0 commit comments

Comments
 (0)