RisingWave Returns Duplicate Records When Reading Iceberg Tables with Delete Files #23411

@jw-itq

Description

Describe the bug

RisingWave returns duplicate records when querying Iceberg tables that contain Position Delete or Equality Delete files, while other engines (Spark, StarRocks) correctly filter the deleted records and return deduplicated results.

The same query returns different results across engines:

  • ✅ Spark (via Amoro): 1 record (correct)
  • ✅ StarRocks: 1 record (correct)
  • ❌ RisingWave: 2 records (duplicate primary key)

This happens specifically when:

  1. The Iceberg table has primary keys with upsert mode enabled
  2. Data has been updated (creating equality delete files)
  3. Table optimization has converted equality deletes to position deletes
  4. RisingWave queries the optimized table

Root Cause: RisingWave's code in src/connector/src/source/iceberg/mod.rs (lines 403-460) extracts delete files from FileScanTask.deletes and uses zip_eq_fast to re-pair them with data files. This breaks the association that Iceberg's plan_files() has already established between each data file and its corresponding delete files, so delete files are applied to the wrong data files or not applied at all.
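
To see why positional re-pairing breaks the association, consider a toy example in which the first data file carries two delete files and the second carries none (plain strings standing in for the real FileScanTask types):

fn main() {
    // Association established by plan_files(): file-a carries two
    // delete files, file-b carries none.
    let tasks = vec![
        ("file-a.parquet", vec!["del-1.parquet", "del-2.parquet"]),
        ("file-b.parquet", vec![]),
    ];

    // The buggy path: drain every delete file into one flat list.
    let mut data_files = Vec::new();
    let mut delete_files = Vec::new();
    for (data, deletes) in tasks {
        delete_files.extend(deletes);
        data_files.push(data);
    }

    // Re-pairing by position now assigns del-2 to file-b, which never
    // had a delete file at all.
    for (data, del) in data_files.iter().zip(delete_files.iter()) {
        println!("{data} <- {del}");
        // file-a.parquet <- del-1.parquet  (correct only by accident)
        // file-b.parquet <- del-2.parquet  (wrong: belongs to file-a)
    }
}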

Error message/log

No error message. The query executes successfully but returns incorrect results (duplicate records).

Query execution plan shows delete filtering is attempted:

BatchHashJoin { type: LeftAnti, predicate: _iceberg_file_path = _iceberg_file_path AND _iceberg_file_pos = _iceberg_file_pos }
├─BatchHashJoin { type: LeftAnti, predicate: id = id AND (_iceberg_sequence_number < _iceberg_sequence_number) }
│ ├─BatchIcebergScan { iceberg_scan_type: DataScan }
│ └─BatchIcebergScan { iceberg_scan_type: EqualityDeleteScan }
└─BatchIcebergScan { iceberg_scan_type: PositionDeleteScan }

But the filtering does not work correctly because the delete files are associated with the wrong data files.

To Reproduce

1. Set up an Iceberg table with a primary key:

-- Using Spark or Amoro
CREATE TABLE test_table (
    id INT,
    value STRING
) USING iceberg
TBLPROPERTIES (
    'write.upsert.enabled' = 'true',
    'primary.keys' = 'id'
);

2. Insert and update data:

INSERT INTO test_table VALUES (1, 'value1');
UPDATE test_table SET value = 'value2' WHERE id = 1;

3. Run table optimization (converts equality deletes to position deletes):

-- Using Amoro optimizer or Spark
CALL catalog.system.optimize('test_table');

4. Query in RisingWave:

-- Create source in RisingWave
CREATE SOURCE test_source (
    id INT,
    value VARCHAR
) WITH (
    connector = 'iceberg',
    catalog.type = 'rest',
    catalog.uri = 'http://your-catalog:8181/catalog',
    database.name = 'your_db',
    table.name = 'test_table',
    warehouse.path = 'your_warehouse',
    s3.region = 'your-region',
    s3.endpoint = 'your-endpoint',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret'
) FORMAT NONE ENCODE NONE;

SELECT * FROM test_source WHERE id = 1;

Expected: 1 record with value='value2'
Actual: 2 records (both value1 and value2)

Expected behavior

RisingWave should return the same deduplicated results as Spark and StarRocks:

  • Apply position delete files to filter deleted rows
  • Apply equality delete files to filter outdated records
  • Return only the latest version of each primary key

For the test case, the query should return only 1 record, with the updated value.
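
Schematically, the merge-on-read semantics the query plan above is trying to implement look like the following (a sketch with hypothetical Row fields, not RisingWave's executor code):

use std::collections::HashSet;

// Hypothetical row shape carrying Iceberg's hidden metadata columns.
struct Row {
    id: i32,
    value: String,
    file_path: String, // _iceberg_file_path
    pos: i64,          // _iceberg_file_pos
    seq: i64,          // _iceberg_sequence_number
}

fn apply_deletes(
    rows: Vec<Row>,
    pos_deletes: &HashSet<(String, i64)>, // (file_path, position) pairs
    eq_deletes: &[(i32, i64)],            // (id, sequence number of the delete)
) -> Vec<Row> {
    rows.into_iter()
        // Position delete: drop the exact row the delete file points at.
        .filter(|r| !pos_deletes.contains(&(r.file_path.clone(), r.pos)))
        // Equality delete: drop rows whose key matches a delete written
        // with a strictly greater sequence number.
        .filter(|r| !eq_deletes.iter().any(|&(id, seq)| id == r.id && r.seq < seq))
        .collect()
}

The two filters correspond to the two LeftAnti joins in the plan shown earlier; the bug is not in these semantics but in which delete files get attached to which data files.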

How did you deploy RisingWave?

Docker / Standalone deployment

The version of RisingWave

v2.6.0

Additional context

Iceberg File Metadata Verification (via Spark):

-- The current snapshot contains one data file and one position delete file
SELECT file_path, content, record_count
FROM table.files;

Results:

file_path                      | content        | record_count
-------------------------------|----------------|-------------
...e9a7b974...00001.parquet    | 0 (DATA)       | 109617
...e9a7b974...00001-delete-... | 1 (POS_DELETE) | 1

The position delete file marks 1 row in the data file as deleted, but RisingWave still returns it.

Code Analysis:

The bug is in src/connector/src/source/iceberg/mod.rs where delete files are extracted from their original FileScanTask and redistributed:

// Lines 403-460 (simplified excerpt)
for mut task in file_scan_stream {
    for delete_file in task.deletes.drain(..) {  // ❌ strips deletes off their task
        position_delete_files.push(delete_file);
    }
    data_files.push(task);  // task.deletes is now empty
}

// Then re-pairs by position using zip: WRONG ASSOCIATION!
let splits = data_files
    .into_iter()
    .zip_eq_fast(position_delete_files);

Iceberg's plan_files() already returns each FileScanTask with its correct delete file associations. Breaking them apart and re-pairing them causes position/equality deletes to be applied to the wrong data files.

Suggested Fix: Keep each FileScanTask intact with its original deletes, and split them as complete units for parallel processing.
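
A minimal sketch of that direction, using a simplified stand-in for iceberg-rust's FileScanTask (the field names here are illustrative, not the actual types):

struct ScanTask {
    data_file: String,
    deletes: Vec<String>, // delete files plan_files() paired with this data file
}

// Distribute whole tasks across the splits round-robin. Each task keeps
// its own delete files, so the association established by plan_files()
// is never broken.
fn split_tasks(tasks: Vec<ScanTask>, parallelism: usize) -> Vec<Vec<ScanTask>> {
    assert!(parallelism > 0);
    let mut splits: Vec<Vec<ScanTask>> = (0..parallelism).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        splits[i % parallelism].push(task); // task travels with its deletes intact
    }
    splits
}

Each parallel reader then applies only the delete files attached to its own tasks, rather than a globally re-zipped list.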

Related Files:

  • src/connector/src/source/iceberg/mod.rs (main issue)
  • src/batch/executors/src/executor/iceberg_scan.rs
  • src/frontend/src/optimizer/rule/source_to_iceberg_scan_rule.rs

Impact: All Iceberg tables with delete files will return incorrect/duplicate data in RisingWave.
