Skip to content

Commit 7bb29e5

Browse files
authored
refactor(bigframes): enable sqlglot as the default compiler with failsafe mechanism (#16602)
Internal issue 417774347
1 parent 6dd1960 commit 7bb29e5

5 files changed

Lines changed: 245 additions & 72 deletions

File tree

packages/bigframes/bigframes/core/compile/__init__.py

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -13,28 +13,30 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from typing import Any
16+
from typing import Literal
1717

18-
from bigframes import options
1918
from bigframes.core.compile.api import test_only_ibis_inferred_schema
2019
from bigframes.core.compile.configs import CompileRequest, CompileResult
2120

2221

23-
def compiler() -> Any:
24-
"""Returns the appropriate compiler module based on session options."""
25-
if options.experiments.sql_compiler == "experimental":
22+
def compile_sql(
23+
request: CompileRequest,
24+
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
25+
) -> CompileResult:
26+
"""Compiles a BigFrameNode according to the request into SQL."""
27+
if compiler_name == "sqlglot":
2628
import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler
2729

28-
return sqlglot_compiler
30+
return sqlglot_compiler.compile_sql(request)
2931
else:
3032
import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler
3133

32-
return ibis_compiler
34+
return ibis_compiler.compile_sql(request)
3335

3436

3537
__all__ = [
3638
"test_only_ibis_inferred_schema",
3739
"CompileRequest",
3840
"CompileResult",
39-
"compiler",
41+
"compile_sql",
4042
]

packages/bigframes/bigframes/core/compile/sqlglot/expressions/datetime_ops.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -364,9 +364,14 @@ def _(expr: TypedExpr) -> sge.Expression:
364364
def _(expr: TypedExpr, op: ops.ToDatetimeOp) -> sge.Expression:
365365
if op.format:
366366
result = expr.expr
367-
if expr.dtype != dtypes.STRING_DTYPE:
367+
if expr.dtype == dtypes.STRING_DTYPE:
368+
return sge.TryCast(this=result, to="DATETIME")
369+
else:
368370
result = sge.Cast(this=result, to="STRING")
369-
return sge.TryCast(this=result, to="DATETIME")
371+
result = sge.func(
372+
"PARSE_TIMESTAMP", sge.convert(op.format), result, sge.convert("UTC")
373+
)
374+
return sge.Cast(this=result, to="DATETIME")
370375

371376
if expr.dtype in (
372377
dtypes.STRING_DTYPE,

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 117 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,15 @@
1717
import concurrent.futures
1818
import math
1919
import threading
20+
import uuid
21+
import warnings
2022
from typing import Literal, Mapping, Optional, Sequence, Tuple
2123

2224
import google.api_core.exceptions
2325
import google.cloud.bigquery.job as bq_job
2426
import google.cloud.bigquery.table as bq_table
2527
import google.cloud.bigquery_storage_v1
28+
import google.cloud.exceptions
2629
from google.cloud import bigquery
2730

2831
import bigframes
@@ -124,9 +127,7 @@ def to_sql(
124127
else array_value.node
125128
)
126129
node = self._substitute_large_local_sources(node)
127-
compiled = compile.compiler().compile_sql(
128-
compile.CompileRequest(node, sort_rows=ordered)
129-
)
130+
compiled = self._compile(node, ordered=ordered)
130131
return compiled.sql
131132

132133
def execute(
@@ -242,46 +243,55 @@ def _export_gbq(
242243
# validate destination table
243244
existing_table = self._maybe_find_existing_table(spec)
244245

245-
compiled = compile.compiler().compile_sql(
246-
compile.CompileRequest(plan, sort_rows=False)
247-
)
248-
sql = compiled.sql
246+
def run_with_compiler(compiler_name, compiler_id=None):
247+
compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
248+
sql = compiled.sql
249249

250-
if (existing_table is not None) and _is_schema_match(
251-
existing_table.schema, array_value.schema
252-
):
253-
# b/409086472: Uses DML for table appends and replacements to avoid
254-
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
255-
# https://cloud.google.com/bigquery/quotas#standard_tables
256-
job_config = bigquery.QueryJobConfig()
250+
if (existing_table is not None) and _is_schema_match(
251+
existing_table.schema, array_value.schema
252+
):
253+
# b/409086472: Uses DML for table appends and replacements to avoid
254+
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
255+
# https://cloud.google.com/bigquery/quotas#standard_tables
256+
job_config = bigquery.QueryJobConfig()
257+
258+
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
259+
if spec.if_exists == "append":
260+
sql = sg_sql.to_sql(
261+
sg_sql.insert(ir.expr.as_select_all(), spec.table)
262+
)
263+
else: # for "replace"
264+
assert spec.if_exists == "replace"
265+
sql = sg_sql.to_sql(
266+
sg_sql.replace(ir.expr.as_select_all(), spec.table)
267+
)
268+
else:
269+
dispositions = {
270+
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
271+
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
272+
"append": bigquery.WriteDisposition.WRITE_APPEND,
273+
}
274+
job_config = bigquery.QueryJobConfig(
275+
write_disposition=dispositions[spec.if_exists],
276+
destination=spec.table,
277+
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
278+
)
257279

258-
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
259-
if spec.if_exists == "append":
260-
sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
261-
else: # for "replace"
262-
assert spec.if_exists == "replace"
263-
sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
264-
else:
265-
dispositions = {
266-
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
267-
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
268-
"append": bigquery.WriteDisposition.WRITE_APPEND,
269-
}
270-
job_config = bigquery.QueryJobConfig(
271-
write_disposition=dispositions[spec.if_exists],
272-
destination=spec.table,
273-
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
280+
# Attach data type usage to the job labels
281+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
282+
job_config.labels["bigframes-compiler"] = (
283+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
284+
)
285+
# TODO(swast): plumb through the api_name of the user-facing api that
286+
# caused this query.
287+
iterator, job = self._run_execute_query(
288+
sql=sql,
289+
job_config=job_config,
290+
session=array_value.session,
274291
)
292+
return iterator, job
275293

276-
# Attach data type usage to the job labels
277-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
278-
# TODO(swast): plumb through the api_name of the user-facing api that
279-
# caused this query.
280-
iterator, job = self._run_execute_query(
281-
sql=sql,
282-
job_config=job_config,
283-
session=array_value.session,
284-
)
294+
iterator, job = self._compile_with_fallback(run_with_compiler)
285295

286296
has_special_dtype_col = any(
287297
t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -410,6 +420,43 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
410420
self.prepare_plan(array_value.node)
411421
)
412422

423+
def _compile(
424+
self,
425+
node: nodes.BigFrameNode,
426+
*,
427+
ordered: bool = False,
428+
peek: Optional[int] = None,
429+
materialize_all_order_keys: bool = False,
430+
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
431+
) -> compile.CompileResult:
432+
return compile.compile_sql(
433+
compile.CompileRequest(
434+
node,
435+
sort_rows=ordered,
436+
peek_count=peek,
437+
materialize_all_order_keys=materialize_all_order_keys,
438+
),
439+
compiler_name=compiler_name,
440+
)
441+
442+
def _compile_with_fallback(self, run_fn):
443+
compiler_option = bigframes.options.experiments.sql_compiler
444+
if compiler_option == "legacy":
445+
return run_fn("ibis")
446+
elif compiler_option == "experimental":
447+
return run_fn("sqlglot")
448+
else: # stable
449+
compiler_id = f"{uuid.uuid1().hex[:12]}"
450+
try:
451+
return run_fn("sqlglot", compiler_id=compiler_id)
452+
except google.cloud.exceptions.BadRequest as e:
453+
msg = bfe.format_message(
454+
f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
455+
f"Falling back to ibis. Details: {e.message}"
456+
)
457+
warnings.warn(msg, category=UserWarning)
458+
return run_fn("ibis", compiler_id=compiler_id)
459+
413460
def prepare_plan(
414461
self,
415462
plan: nodes.BigFrameNode,
@@ -604,34 +651,43 @@ def _execute_plan_gbq(
604651
]
605652
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
606653

607-
compiled = compile.compiler().compile_sql(
608-
compile.CompileRequest(
654+
def run_with_compiler(compiler_name, compiler_id=None):
655+
compiled = self._compile(
609656
plan,
610-
sort_rows=ordered,
611-
peek_count=peek,
657+
ordered=ordered,
658+
peek=peek,
612659
materialize_all_order_keys=(cache_spec is not None),
660+
compiler_name=compiler_name,
613661
)
614-
)
615-
# might have more columns than og schema, for hidden ordering columns
616-
compiled_schema = compiled.sql_schema
662+
# might have more columns than og schema, for hidden ordering columns
663+
compiled_schema = compiled.sql_schema
617664

618-
destination_table: Optional[bigquery.TableReference] = None
665+
destination_table: Optional[bigquery.TableReference] = None
619666

620-
job_config = bigquery.QueryJobConfig()
621-
if create_table:
622-
destination_table = self.storage_manager.create_temp_table(
623-
compiled_schema, cluster_cols
667+
job_config = bigquery.QueryJobConfig()
668+
if create_table:
669+
destination_table = self.storage_manager.create_temp_table(
670+
compiled_schema, cluster_cols
671+
)
672+
job_config.destination = destination_table
673+
674+
# Attach data type usage to the job labels
675+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
676+
job_config.labels["bigframes-compiler"] = (
677+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
624678
)
625-
job_config.destination = destination_table
626-
627-
# Attach data type usage to the job labels
628-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
629-
iterator, query_job = self._run_execute_query(
630-
sql=compiled.sql,
631-
job_config=job_config,
632-
query_with_job=(destination_table is not None),
633-
session=plan.session,
634-
)
679+
iterator, query_job = self._run_execute_query(
680+
sql=compiled.sql,
681+
job_config=job_config,
682+
query_with_job=(destination_table is not None),
683+
session=plan.session,
684+
)
685+
return iterator, query_job, compiled
686+
687+
iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)
688+
689+
# might have more columns than og schema, for hidden ordering columns
690+
compiled_schema = compiled.sql_schema
635691

636692
# we could actually cache even when caching is not explicitly requested, but being conservative for now
637693
result_bq_data = None

packages/bigframes/noxfile.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -467,7 +467,6 @@ def cover(session):
467467
omitted_paths = [
468468
# non-prod, unit tested
469469
"bigframes/core/compile/polars/*",
470-
"bigframes/core/compile/sqlglot/*",
471470
# untested
472471
"bigframes/streaming/*",
473472
# utils

0 commit comments

Comments
 (0)