
[Bug]: ignoreUnknownValues not working when using CreateDisposition.CREATE_IF_NEEDED  #27892

Open
@sarfarazahammed

Description


What happened?

We are running a Dataflow pipeline that transfers data from Kafka to BigQuery. The pipeline uses a predefined BigQuery schema, which is also used to create new tables. When the Kafka events are updated with a new schema, our intention is to keep writing to the existing tables while ignoring the newly introduced fields.

In our initial implementation using STREAMING_INSERTS, we handled this scenario by setting ignoreUnknownValues to true, which worked as expected. However, after switching to STORAGE_WRITE_API to get exactly-once inserts, ignoreUnknownValues no longer has any effect, resulting in potential data loss.
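For context, the STREAMING_INSERTS configuration where ignoreUnknownValues behaved correctly looked roughly like the sketch below (table spec and variable names are placeholders, not our actual values):

```java
// Sketch of the earlier STREAMING_INSERTS setup (placeholder names).
// With this method, ignoreUnknownValues() drops incoming fields that
// are not in the destination table's schema instead of failing the write.
rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")  // placeholder table spec
        .withSchema(schema)                    // predefined TableSchema
        .withCreateDisposition(
            BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .ignoreUnknownValues());
```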

The error we are encountering is as follows:

com.google.cloud.bigquery.storage.v1.ConnectionWorker doneCallback

Connection finished with error com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema, extra fields: 'testfield' Entity: projects/{projectId}/{dataset}/tables/{tableName}/streams/{streamId} for stream projects/{projectId}/datasets/{staging_events_visitor}/tables/{tableId}/streams/{streamId} with write id: {writeId}

Steps to reproduce:

  1. Create a TableSchema.
  2. Use .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withSchema(schema).withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) when writing to BQ.
  3. Run the Beam pipeline to write some data with the current schema.
  4. Add a new field to the TableSchema.
  5. Run a pipeline that consumes an event containing the new field and writes it to the existing table.
  6. Observe the error.
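The steps above can be sketched as the following write configuration (placeholder names; schema is the TableSchema from step 1, later extended in step 4):

```java
// Sketch of the STORAGE_WRITE_API setup that reproduces the error
// (placeholder names). ignoreUnknownValues() has no effect here once
// the incoming rows carry a field the existing BigQuery table lacks.
rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")  // placeholder table spec
        .withSchema(schema)                    // TableSchema with the new field
        .withCreateDisposition(
            BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        // required for unbounded sources with STORAGE_WRITE_API:
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(1)
        .ignoreUnknownValues());
```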

The table below provides more detail; assume ignoreUnknownValues is set in all cases.

| withSchema set? | field exists in TableSchema | field exists in BQ | CreateDisposition | Remarks (Storage Write API) | Remarks (Streaming Inserts) |
| --- | --- | --- | --- | --- | --- |
| yes | no | no | ANY* | No issue | No issue |
| yes | no | yes | ANY* | value of the field is set as null in BQ | No issue (value of the field is set) |
| yes | yes | yes | ANY* | No issue | No issue |
| yes | yes | no | ANY* | INVALID_ARGUMENT: Input schema has more fields than BigQuery schema | No issue |
| no | — | no | CREATE_NEVER | No issue | No issue |
| no | — | yes | CREATE_NEVER | No issue (value of the field is set in BQ) | No issue (value of the field is set in BQ) |

*ANY: same behavior with either disposition (CREATE_NEVER / CREATE_IF_NEEDED).
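Until ignoreUnknownValues is honored by the Storage Write API path, one possible client-side workaround is to prune unknown fields from each row before handing it to the sink, so the payload never has more fields than the table schema. A minimal, hypothetical sketch (plain Java maps standing in for TableRow; PruneUnknownFields is an illustrative helper, not a Beam API):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class PruneUnknownFields {
    // Keep only the keys that appear in the table schema's field set,
    // mimicking what ignoreUnknownValues does for STREAMING_INSERTS.
    public static Map<String, Object> prune(Map<String, Object> row,
                                            Set<String> schemaFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (schemaFields.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Set<String> schema = new HashSet<>(Arrays.asList("id", "visitor"));
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("visitor", "a");
        row.put("testfield", "extra"); // new Kafka field, absent from BQ table
        System.out.println(prune(row, schema).keySet()); // prints [id, visitor]
    }
}
```

In a pipeline this would run inside a DoFn just before the BigQueryIO write, keyed off the same field names as the configured TableSchema.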

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
