
Sink connector sometimes re-inserts topic messages on very busy topics #182

@brianliefinaccel

Description


Hi, when using this connector in production with upsertEnabled, we found that it sometimes "re-inserts" a previously read Kafka topic message, leaving an incorrect record state in BigQuery after the merge.

[Screenshot: intermediate-table rows for id 912007723, showing status 4 at i = 22371 and a re-inserted status 1 at i = 38686]

This is a table with millions of DB events processed daily, replicated from MySQL to Kafka and then loaded from Kafka into BigQuery by the sink.

In the evidence above, the record with id 912007723 should have a final status of 4 (at i = 22371), but because the sink somehow re-inserts the first state (at i = 38686) about a minute later, the record ends up with the wrong status of 1.
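A minimal sketch of the failure mode, under the assumption that the upsert merge keeps, per key, the row written last into the intermediate table (i.e. the highest batch index `i`, as the `i` values in the screenshot suggest). A re-delivered stale message inserted with a larger `i` then wins over the correct newer state:

```python
# Hypothetical model of an upsert merge that keeps, per key, the row
# with the highest batch index i in the intermediate table.
def merge_latest(rows):
    latest = {}
    for row in rows:  # rows in the order they were inserted
        key = row["id"]
        if key not in latest or row["i"] > latest[key]["i"]:
            latest[key] = row
    return latest

# Values from the report: the correct final state (status 4, i = 22371),
# then a re-inserted stale copy of the first state (status 1, i = 38686).
rows = [
    {"id": 912007723, "status": 4, "i": 22371},
    {"id": 912007723, "status": 1, "i": 38686},  # stale re-insert, larger i
]
print(merge_latest(rows)[912007723]["status"])  # → 1, the wrong final status
```

This is only a model of the observed symptom, not the connector's actual merge SQL; it shows why a re-insert alone is enough to corrupt the final state when "latest inserted" wins.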

Here's the configuration we use in production:

    Allow Big Query Required Field Relaxation:             true
    Allow New Big Query Fields:                            true
    Allow Schema Unionization:                             true
    Auto Create Tables:                                    true
    Big Query Partition Decorator:                         false
    Big Query Retry:                                       5
    Big Query Retry Wait:                                  5000
    Clustering Partition Field Names:                      order_id,merchant_id,user_id,bonus_id
    consumer.override.max.poll.records:                    20000
    Default Dataset:                                       l2alpha
    Delete Enabled:                                        true
    errors.deadletterqueue.context.headers.enable:         true
    errors.deadletterqueue.topic.name:                     ffi-production.dlq
    errors.retry.delay.max.ms:                             5000
    errors.retry.timeout:                                  600000
    errors.tolerance:                                      none
    fetch.wait.max.ms:                                     30000
    Kafka Data Field Name:                                 kafkaData
    Kafka Key Field Name:                                  kafkaKeyNum
    key.converter:                                         io.confluent.connect.json.JsonSchemaConverter
    key.converter.basic.auth.credentials.source:           USER_INFO
    key.converter.schema.registry.basic.auth.user.info:    <omitted>
    key.converter.schema.registry.url:                     <omitted>
    key.converter.schemas.enable:                          true
    Key Source:                                            APPLICATION_DEFAULT
    max.poll.records:                                      20000
    Merge Interval Ms:                                     900000
    Merge Records Threshold:                               -1
    Project:                                               <omitted>
    Sanitize Field Names:                                  true
    Sanitize Topics:                                       true
    schema.compatibility:                                  NONE
    Schema Retriever:                                      com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
    snapshot.locking.mode:                                 none
    Timestamp Partition Field Name:                        created_on
    topic2TableMap:                                        <omitted>
    Topic Keys:                                            id
    Topics:                                                <omitted>
    Upsert Enabled:                                        true
    value.converter:                                       io.confluent.connect.json.JsonSchemaConverter
    value.converter.basic.auth.credentials.source:         USER_INFO
    value.converter.schema.registry.basic.auth.user.info:  <omitted>
    value.converter.schema.registry.url:                   <omitted>
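For context on timing (assuming Merge Records Threshold = -1 disables the record-count trigger, so merges fire purely on the timer), the merge cadence implied by the config above:

```python
# Merge cadence implied by the config above (assumption: with
# Merge Records Threshold = -1, merges fire only on the timer).
merge_interval_ms = 900_000          # Merge Interval Ms
print(merge_interval_ms / 60_000)    # → 15.0 minutes between merges
# The stale re-insert landed ~1 minute after the correct state,
# so both rows likely fell inside the same 15-minute merge window.
```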

What could cause the sink to re-insert previously-read messages into the temporary table?
