Implement BigQuery Partition By statements using sqlglot #2722


Draft
wants to merge 12 commits into base: devel

Conversation

@hsm207 (Collaborator) commented Jun 5, 2025

Description

Implement BigQuery Partition By statements using sqlglot.

Related Issues

Additional Context


netlify bot commented Jun 5, 2025

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 8a67683
🔍 Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68431a4db4dadd00083e33aa

@hsm207 hsm207 changed the title Implenent BigQuery Partition By statements using sqlglot Implement BigQuery Partition By statements using sqlglot Jun 6, 2025
pipeline.extract(partitioned_table)
pipeline.normalize()
@hsm207 (Collaborator, Author) commented Jun 6, 2025


@rudolfix Calling normalize throws the following error:

dlt.common.storages.exceptions.InStorageSchemaModified: Schema bigquery_test_partition_with_partition_spec in /workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec/schemas was externally modified. This is not allowed as that would prevent correct version tracking. Use import/export capabilities of dlt to provide external changes.

Here is the full stack trace:

/workspaces/dlt/tests/load/bigquery/test_bigquery_table_builder.py::test_create_table_with_custom_range_bucket_partition_using_partition_spec failed: self = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
args = (), kwargs = {}, name = 'bigquery_test_partition_with_partition_spec'

    @wraps(f)
    def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
        for name in self._schema_storage.live_schemas:
            # refresh live schemas in storage or import schema path
            self._schema_storage.commit_live_schema(name)
        try:
>           rv = f(self, *args, **kwargs)

args       = ()
f          = <function Pipeline.normalize at 0xffff8c13ae80>
kwargs     = {}
name       = 'bigquery_test_partition_with_partition_spec'
self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>

dlt/pipeline/pipeline.py:176: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dlt/pipeline/pipeline.py:271: in _wrap
    return f(self, *args, **kwargs)
        args       = ()
        f          = <function Pipeline.normalize at 0xffff8c13ade0>
        kwargs     = {}
        merge_func = None
        sections   = ('normalize',)
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
dlt/pipeline/pipeline.py:509: in normalize
    self._get_destination_capabilities()
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
        workers    = 1
dlt/pipeline/pipeline.py:1338: in _get_destination_capabilities
    naming = self.default_schema.naming
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
dlt/pipeline/pipeline.py:925: in default_schema
    return self.schemas[self.default_schema_name]
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
dlt/common/storages/live_schema_storage.py:22: in __getitem__
    schema = self.load_schema(name)
        name       = 'bigquery_test_partition_with_partition_spec'
        schema     = <dlt.Schema(name='bigquery_test_partition_with_partition_spec', version=1, tables=['_dlt_version', '_dlt_loads', 'partitioned_table', '_dlt_pipeline_state'], version_hash='N7gNHerzeTe1xPWVZCc6CMOZuTHFjgrD6dZMEeaCBrc=')>
        self       = <dlt.common.storages.live_schema_storage.LiveSchemaStorage object at 0xffff890f7820>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dlt.common.storages.live_schema_storage.LiveSchemaStorage object at 0xffff890f7820>
name = 'bigquery_test_partition_with_partition_spec'

    def load_schema(self, name: str) -> Schema:
        # loads a schema from a store holding many schemas
        storage_schema: DictStrAny = None
        try:
            storage_schema = self._load_schema_json(name)
            # prevent external modifications of schemas kept in storage
            if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True):
>               raise InStorageSchemaModified(name, self.config.schema_volume_path)
E               dlt.common.storages.exceptions.InStorageSchemaModified: Schema bigquery_test_partition_with_partition_spec in /workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec/schemas was externally modified. This is not allowed as that would prevent correct version tracking. Use import/export capabilities of dlt to provide external changes.

name       = 'bigquery_test_partition_with_partition_spec'
self       = <dlt.common.storages.live_schema_storage.LiveSchemaStorage object at 0xffff890f7820>
storage_schema = {'engine_version': 11, 'name': 'bigquery_test_partition_with_partition_spec', 'normalizers': {'json': {'module': 'dlt....rs.json.relational'}, 'names': 'snake_case'}, 'previous_hashes': ['mfxkJfziU9lGeLIoX2qxauVud2IBkheCqvlFJaq7x8M='], ...}

dlt/common/storages/schema_storage.py:47: InStorageSchemaModified

During handling of the above exception, another exception occurred:

    def test_create_table_with_custom_range_bucket_partition_using_partition_spec() -> None:
        @dlt.resource
        def partitioned_table():
            yield {
                "user_id": 10000,
                "name": "user 1",
                "created_at": "2021-01-01T00:00:00Z",
                "category": "category 1",
                "score": 100.0,
            }
    
        partition_spec = BigQueryRangeBucketPartition(
            column_name="user_id", start=0, end=1000000, interval=10000
        )
    
        bigquery_adapter(partitioned_table, partition=partition_spec)
    
        pipeline = dlt.pipeline(
            "bigquery_test_partition_with_partition_spec",
            destination="bigquery",
            dev_mode=True,
        )
    
        pipeline.extract(partitioned_table)
>       pipeline.normalize()

partition_spec = BigQueryRangeBucketPartition(column_name='user_id', start=0, end=1000000, interval=10000)
partitioned_table = <@dlt.resource(name='partitioned_table', write_disposition='append')>
pipeline   = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>

tests/load/bigquery/test_bigquery_table_builder.py:317: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dlt/pipeline/pipeline.py:222: in _wrap
    step_info = f(self, *args, **kwargs)
        args       = ()
        f          = <function Pipeline.normalize at 0xffff8c13ad40>
        is_new_trace = True
        kwargs     = {}
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
        send_state = False
        step_info  = InStorageSchemaModified('Schema bigquery_test_partition_with_partition_spec in /workspaces/dlt/_storage/.dlt/pipelines...ed as that would prevent correct version tracking. Use import/export capabilities of dlt to provide external changes.')
        trace      = PipelineTrace(transaction_id='941b9f50827e1951f052746aeaf0789d', pipeline_name='bigquery_test_partition_with_partition...vider_name='Environment Variables', config_type_name='GcpServiceAccountCredentialsWithoutDefaults')], engine_version=1)
        trace_step = PipelineStepTrace(span_id='bdee8089f52b69ea2a04c812e10c78f0', step='normalize', started_at=DateTime(2025, 6, 6, 16, 21, 10, 246277, tzinfo=Timezone('UTC')), finished_at=None, step_info=None, step_exception=None, exception_traces=None)
dlt/pipeline/pipeline.py:182: in _wrap
    schema = self._schema_storage.load_schema(name)
        args       = ()
        f          = <function Pipeline.normalize at 0xffff8c13ae80>
        kwargs     = {}
        name       = 'bigquery_test_partition_with_partition_spec'
        self       = <dlt.pipeline(pipeline_name='bigquery_test_partition_with_partition_spec', destination='bigquery', dataset_name='bigqu...ge/.dlt/pipelines', working_dir='/workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec')>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dlt.common.storages.live_schema_storage.LiveSchemaStorage object at 0xffff890f7820>
name = 'bigquery_test_partition_with_partition_spec'

    def load_schema(self, name: str) -> Schema:
        # loads a schema from a store holding many schemas
        storage_schema: DictStrAny = None
        try:
            storage_schema = self._load_schema_json(name)
            # prevent external modifications of schemas kept in storage
            if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True):
>               raise InStorageSchemaModified(name, self.config.schema_volume_path)
E               dlt.common.storages.exceptions.InStorageSchemaModified: Schema bigquery_test_partition_with_partition_spec in /workspaces/dlt/_storage/.dlt/pipelines/bigquery_test_partition_with_partition_spec/schemas was externally modified. This is not allowed as that would prevent correct version tracking. Use import/export capabilities of dlt to provide external changes.

name       = 'bigquery_test_partition_with_partition_spec'
self       = <dlt.common.storages.live_schema_storage.LiveSchemaStorage object at 0xffff890f7820>
storage_schema = {'engine_version': 11, 'name': 'bigquery_test_partition_with_partition_spec', 'normalizers': {'json': {'module': 'dlt....rs.json.relational'}, 'names': 'snake_case'}, 'previous_hashes': ['mfxkJfziU9lGeLIoX2qxauVud2IBkheCqvlFJaq7x8M='], ...}

dlt/common/storages/schema_storage.py:47: InStorageSchemaModified

Successfully merging this pull request may close these issues.

Use sqlglot to create partition by expressions in BigQuery