
Changes from the same partition/stream appear on different threads #152

@Stepuk


Hi Team,

Consider the following set-up.

Table:

create table t (k bigint, v text, primary key (k)) with cdc = {'enabled':true};

insert into t (k, v) values (1, 'a');
insert into t (k, v) values (1, 'b');
insert into t (k, v) values (1, 'c');

select * from t_scylla_cdc_log;

 cdc$stream_id                      | cdc$time                             | cdc$batch_seq_no | cdc$deleted_v | cdc$end_of_batch | cdc$operation | cdc$ttl | k | v
------------------------------------+--------------------------------------+------------------+---------------+------------------+---------------+---------+---+---
 0x57400000000000001e5234f818002901 | a10dd362-ce7d-11f0-ab70-a72ae550bec7 |                0 |          null |             True |             2 |    null | 1 | a
 0x57400000000000001e5234f818002901 | a2315e76-ce7d-11f0-e37a-559a3eebe08d |                0 |          null |             True |             2 |    null | 1 | b
 0x57400000000000001e5234f818002901 | a3601a80-ce7d-11f0-3228-d09619a9c9c8 |                0 |          null |             True |             2 |    null | 1 | c

(all changes are within the same partition/stream)

Consumer:

CDCConsumer c = CDCConsumer.builder()
    .addTable(new TableName("test", "t"))
    .withQueryTimeWindowSizeMs(3600000) // 1 hour
    .withConsumer(Consumer.forTaskAndRawChangeConsumer((task, change) -> { // Just log task & change
        LoggerFactory.getLogger(ScyllaAutoConfiguration.class).info(task.toString());
        LoggerFactory.getLogger(ScyllaAutoConfiguration.class).info(change.toString());
        return CompletableFuture.completedFuture(null);
    }))
    .build();
c.start();

Output:

2025-12-01 09:35:03.100 [pool-7-thread-4] - INFO  Consumer - [] - [] - Task(TaskId(GenerationId(Timestamp(13/03/2024, 17:27:09.425)), VNodeId(656), TableName(test, t)), [StreamId(573D6CBA26684AB0D7A0DF54B0002901), StreamId(57400000000000001E5234F818002901)], TaskState(6a131170-ce77-11f0-8080-808080808080 (Timestamp(01/12/2025, 08:34:32.455)), cbd7796f-ce7f-11f0-7f7f-7f7f7f7f7f7f (Timestamp(01/12/2025, 09:34:32.455)), Optional.empty)) 
2025-12-01 09:35:03.109 [pool-7-thread-4] - INFO  Consumer - [] - [] - Driver3RawChange(cdc$stream_id = 57400000000000001E5234F818002901, cdc$time = a10dd362-ce7d-11f0-ab70-a72ae550bec7, cdc$batch_seq_no = 0, cdc$deleted_v = null, cdc$end_of_batch = true, cdc$operation = 2, cdc$ttl = null, k = 1, v = a) 

2025-12-01 09:35:03.111 [pool-7-thread-3] - INFO  Consumer - [] - [] - Task(TaskId(GenerationId(Timestamp(13/03/2024, 17:27:09.425)), VNodeId(656), TableName(test, t)), [StreamId(573D6CBA26684AB0D7A0DF54B0002901), StreamId(57400000000000001E5234F818002901)], TaskState(6a131170-ce77-11f0-8080-808080808080 (Timestamp(01/12/2025, 08:34:32.455)), cbd7796f-ce7f-11f0-7f7f-7f7f7f7f7f7f (Timestamp(01/12/2025, 09:34:32.455)), Optional[ChangeId(StreamId(57400000000000001E5234F818002901), ChangeTime(a10dd362-ce7d-11f0-ab70-a72ae550bec7))])) 
2025-12-01 09:35:03.111 [pool-7-thread-3] - INFO  Consumer - [] - [] - Driver3RawChange(cdc$stream_id = 57400000000000001E5234F818002901, cdc$time = a2315e76-ce7d-11f0-e37a-559a3eebe08d, cdc$batch_seq_no = 0, cdc$deleted_v = null, cdc$end_of_batch = true, cdc$operation = 2, cdc$ttl = null, k = 1, v = b) 

2025-12-01 09:35:03.112 [pool-7-thread-1] - INFO  Consumer - [] - [] - Task(TaskId(GenerationId(Timestamp(13/03/2024, 17:27:09.425)), VNodeId(656), TableName(test, t)), [StreamId(573D6CBA26684AB0D7A0DF54B0002901), StreamId(57400000000000001E5234F818002901)], TaskState(6a131170-ce77-11f0-8080-808080808080 (Timestamp(01/12/2025, 08:34:32.455)), cbd7796f-ce7f-11f0-7f7f-7f7f7f7f7f7f (Timestamp(01/12/2025, 09:34:32.455)), Optional[ChangeId(StreamId(57400000000000001E5234F818002901), ChangeTime(a2315e76-ce7d-11f0-e37a-559a3eebe08d))])) 
2025-12-01 09:35:03.112 [pool-7-thread-1] - INFO  Consumer - [] - [] - Driver3RawChange(cdc$stream_id = 57400000000000001E5234F818002901, cdc$time = a3601a80-ce7d-11f0-3228-d09619a9c9c8, cdc$batch_seq_no = 0, cdc$deleted_v = null, cdc$end_of_batch = true, cdc$operation = 2, cdc$ttl = null, k = 1, v = c)

According to https://www.scylladb.com/2021/02/09/consuming-cdc-with-java-and-go/ and https://github.com/scylladb/scylla-cdc-java/tree/master/scylla-cdc-printer:

The CDC consumer is started multi-threaded, with a configurable number of threads. Each thread will read a distinct subset of the CDC log (partitioned based on vnodes). Those multiple threads will cumulatively read the entire CDC log. All changes related to the same row (more generally, the same partition key) will appear on the same thread.
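To make the expectation in that description concrete, here is a minimal model of it. This is NOT scylla-cdc-java code: the class `VnodeAffinitySketch`, the method `threadsPerVnode`, and the assignment rule `floorMod(vnode, workers)` are hypothetical. It assumes one single-threaded executor per worker and a fixed vnode-to-worker mapping, which is what "all changes for the same partition key appear on the same thread" would imply.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of the documented behavior, NOT the library's implementation.
public class VnodeAffinitySketch {

    // Returns, for each vnode, the names of the threads that handled its changes.
    public static Map<Integer, List<String>> threadsPerVnode(int[] vnodeOfChange, int workers)
            throws Exception {
        // One single-threaded executor per worker, so each worker is exactly one thread.
        List<ExecutorService> pool = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            pool.add(Executors.newSingleThreadExecutor());
        }
        List<Future<String>> handledBy = new ArrayList<>();
        try {
            for (int vnode : vnodeOfChange) {
                // Hypothetical assignment rule: a given vnode always maps to the same worker.
                ExecutorService worker = pool.get(Math.floorMod(vnode, workers));
                handledBy.add(worker.submit(() -> Thread.currentThread().getName()));
            }
            // Collect the thread name that processed each change, grouped by vnode.
            Map<Integer, List<String>> seen = new LinkedHashMap<>();
            for (int i = 0; i < vnodeOfChange.length; i++) {
                seen.computeIfAbsent(vnodeOfChange[i], k -> new ArrayList<>())
                    .add(handledBy.get(i).get());
            }
            return seen;
        } finally {
            pool.forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) throws Exception {
        // Three changes for VNodeId(656), as in the logs above, plus one for another vnode.
        Map<Integer, List<String>> seen = threadsPerVnode(new int[] {656, 656, 17, 656}, 4);
        seen.forEach((vnode, threads) -> System.out.println("VNode " + vnode + " -> " + threads));
    }
}
```

Under this model every change for vnode 656 reports the same thread name, which is the pattern the description leads one to expect and which the log output above does not show.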

However, in my case the three changes are consumed by different threads, as the log entries above show (pool-7-thread-4, pool-7-thread-3, pool-7-thread-1). Is this expected? Has the threading or change-ordering behavior of the library changed over time, or am I doing something wrong in this example?
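The thread names alone may not show whether ordering is lost. In the log, the last field of each successive `TaskState` goes from `Optional.empty` to `Optional[ChangeId(..., a10dd362-...)]` to `Optional[ChangeId(..., a2315e76-...)]`, which appears to record the previously consumed change and is consistent with the changes being handled one at a time. The sketch below is a hypothetical model (NOT scylla-cdc-java code; `SequentialOnPoolSketch` and `consumeInOrder` are made-up names) of how strictly sequential processing can still hop between threads when each step is scheduled on a shared pool.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model, NOT the library's implementation.
public class SequentialOnPoolSketch {

    // Consumes the changes strictly one after another on a shared pool and
    // records "<change> on <thread name>" for each step.
    public static List<String> consumeInOrder(List<String> changes, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (String change : changes) {
                // Each step is submitted to the pool only after the previous step
                // has completed, so order is kept even if the thread changes.
                chain = chain.thenRunAsync(
                    () -> consumed.add(change + " on " + Thread.currentThread().getName()), pool);
            }
            chain.join();
        } finally {
            pool.shutdown();
        }
        return consumed;
    }

    public static void main(String[] args) {
        // The three values inserted for k = 1 in the example table.
        consumeInOrder(List.of("a", "b", "c"), 4).forEach(System.out::println);
    }
}
```

In this model the output is always in the order a, b, c, while the thread name printed next to each value may differ from step to step. Whether the library actually schedules per-task work this way is an assumption, not something the logs confirm.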

Thanks!
