Non-deterministic order with ORDER BY OFFSET LIMIT queries #28370

@freakyzoidberg

With a connector that guarantees limit/TopN pushdown (such as PostgreSQL), queries that combine ORDER BY, OFFSET, and LIMIT do not always return ordered results.

The issue seems to be that when the TopN is pushed through to the connector, there is no TopN left in the plan, as expected.
StreamPropertyDerivations.visitRowNumber is currently just a passthrough, but its input is now a gathering exchange (LocalExchange[SINGLE]) that does not report ordered=true. As a result, AddLocalExchanges adds a ROUND_ROBIN repartition above RowNumber, which breaks the effective ordering.
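
To illustrate why the repartition matters, here is a toy Python model of the plan shape described above. It is only a sketch, not Trino code: the `round_robin` helper, the `name_NN` values, and the choice of two partitions are assumptions made for the example. The row numbers are assigned correctly over the sorted input, but once the rows are dealt out to parallel streams, nothing downstream restores the order.

```python
from itertools import chain


def round_robin(rows, partitions):
    """Deal rows to `partitions` buffers in turn (toy stand-in for a ROUND_ROBIN exchange)."""
    buffers = [[] for _ in range(partitions)]
    for i, row in enumerate(rows):
        buffers[i % partitions].append(row)
    return buffers


# Hypothetical data: the connector returns 15 rows, already sorted (limit=15 pushed down).
sorted_names = [f"name_{i:02d}" for i in range(1, 16)]

# RowNumber over the single, ordered input stream.
numbered = [(name, n) for n, name in enumerate(sorted_names, start=1)]

# ROUND_ROBIN repartition into 2 streams; each stream applies the filter on its own.
buffers = round_robin(numbered, partitions=2)
filtered = [[name for name, n in buf if n > 10] for buf in buffers]

# One possible arrival order at the output: all of stream 0, then all of stream 1.
output = list(chain.from_iterable(filtered))
print(output)
```

The output contains the right five rows, but in whatever order the parallel streams happen to be drained. In this model that order is fixed by the concatenation; in a real engine it depends on scheduling, which would explain why the results are only sometimes out of order.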

An AI-generated reproducer and fix is at freakyzoidberg@8c666da

Thanks

SELECT ...
FROM ...
ORDER BY 1
OFFSET 10
LIMIT 5

Fragment 0 [SINGLE]
    Output layout: [name]
    Output partitioning: SINGLE []
    Output[columnNames = [name]]
    │   Layout: [name:varchar(25)]
    │   Estimates: {rows: 8 (413B), cpu: 0, memory: 0B, network: 0B}
    └─ FilterProject[filterPredicate = (row_number > bigint '10')]
       │   Layout: [name:varchar(25)]
       │   Estimates: {rows: 8 (413B), cpu: 960, memory: 0B, network: 0B}/{rows: 8 (413B), cpu: 413, memory: 0B, network: 0B}
       └─ LocalExchange[partitioning = ROUND_ROBIN]
          │   Layout: [name:varchar(25), row_number:bigint]
          │   Estimates: {rows: 15 (960B), cpu: 960, memory: 0B, network: 0B}
          └─ RowNumber[]
             │   Layout: [name:varchar(25), row_number:bigint]
             │   Estimates: {rows: 15 (960B), cpu: 135, memory: 0B, network: 0B}
             │   row_number := row_number()
             └─ LocalExchange[partitioning = SINGLE]
                │   Layout: [name:varchar(25)]
                │   Estimates: {rows: 15 (825B), cpu: 0, memory: 0B, network: 0B}
                └─ RemoteSource[sourceFragmentIds = [1]]
                       Layout: [name:varchar(25)]

Fragment 1 [SOURCE]
    Output layout: [name]
    Output partitioning: SINGLE []
    TableScan[table = postgresql:tpch.nation tpch.tpch.nation sortOrder=[name:varchar(25):varchar ASC NULLS LAST, nationkey:bigint:int8 ASC NULLS LAST] limit=15 columns=[name:varchar(25):varchar]]
        Layout: [name:varchar(25)]
        Estimates: {rows: 15 (825B), cpu: 825, memory: 0B, network: 0B}
        name := name:varchar(25):varchar
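
For readers less familiar with this plan shape, the sketch below shows how OFFSET 10 LIMIT 5 appears to map onto it: the scan fetches offset + limit = 15 sorted rows, RowNumber numbers them, and the filter keeps row_number > 10. This is read off the plan above, not taken from Trino's source. The function names are hypothetical, and the 25-row input is an assumption about the table size.

```python
def offset_limit_plan(offset, limit):
    """Return (pushed_limit, row_number_threshold) for ORDER BY ... OFFSET ... LIMIT."""
    return offset + limit, offset


def apply_offset_limit(sorted_rows, offset, limit):
    pushed_limit, threshold = offset_limit_plan(offset, limit)
    fetched = sorted_rows[:pushed_limit]    # TableScan ... limit=15
    numbered = enumerate(fetched, start=1)  # RowNumber
    # FilterProject: row_number > 10. Correct only if `fetched` is still in sorted order.
    return [row for n, row in numbered if n > threshold]


rows = list(range(1, 26))  # hypothetical 25 sorted rows
print(offset_limit_plan(10, 5))
print(apply_offset_limit(rows, 10, 5))
```

The filter selects the correct rows because row numbers are assigned before the repartition. What is lost is only the order in which those rows reach the output.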
