From 720502793c1518398ac3260579ded8c57c4c4ec2 Mon Sep 17 00:00:00 2001 From: Leon Date: Tue, 10 Mar 2026 07:58:17 +0800 Subject: [PATCH] add OFFSET-FETCH sql format to support DB2i advance query --- .../io/tapdata/common/CommonDbConnector.java | 39 ++++++++ .../io/tapdata/common/CommonSqlMaker.java | 27 +++++ .../io/tapdata/common/CommonSqlMakerTest.java | 99 +++++++++++++++++++ 3 files changed, 165 insertions(+) diff --git a/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java b/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java index 56d83a227..135bef92b 100644 --- a/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java +++ b/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java @@ -920,6 +920,34 @@ protected void queryByAdvanceFilterWithOffsetV2(TapConnectorContext connectorCon }); } + //for SQL Server type (with OFFSET-FETCH) + protected void queryByAdvanceFilterWithOffsetFetch(TapConnectorContext connectorContext, TapAdvanceFilter filter, TapTable table, Consumer consumer) throws Throwable { + String sql = commonSqlMaker.buildSelectClause(table, filter, false) + getSchemaAndTable(table.getId()) + commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + jdbcContext.query(sql, resultSet -> { + FilterResults filterResults = new FilterResults(); + try { + while (resultSet.next()) { + List allColumn = DbKit.getColumnsFromResultSet(resultSet); + DataMap dataMap = DbKit.getRowFromResultSet(resultSet, allColumn); + processDataMap(dataMap, table); + filterResults.add(dataMap); + if (filterResults.getResults().size() == BATCH_ADVANCE_READ_LIMIT) { + consumer.accept(filterResults); + filterResults = new FilterResults(); + } + } + } catch (SQLException e) { + exceptionCollector.collectTerminateByServer(e); + exceptionCollector.collectReadPrivileges("batchReadWithoutOffset", Collections.emptyList(), e); + exceptionCollector.revealException(e); + throw e; + } + if (EmptyKit.isNotEmpty(filterResults.getResults())) { + consumer.accept(filterResults); + } + }); + } + protected void beginTransaction(TapConnectorContext connectorContext) throws Throwable { isTransaction = true; } @@ -1010,6 +1038,17 @@ protected long countByAdvanceFilterV2(TapConnectorContext connectorContext, TapT return count.get(); } + protected long countByAdvanceFilterWithOffsetFetch(TapConnectorContext connectorContext, TapTable tapTable, TapAdvanceFilter tapAdvanceFilter) throws SQLException { + AtomicLong count = new AtomicLong(0); + String sql = "SELECT COUNT(1) FROM " + getSchemaAndTable(tapTable.getId()) + commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(tapAdvanceFilter); + jdbcContext.query(sql, resultSet -> { + if (resultSet.next()) { + count.set(resultSet.getLong(1)); + } + }); + return count.get(); + } + protected List getAfterUniqueAutoIncrementFields(TapTable tapTable, List indexList) { return new ArrayList<>(); } diff --git a/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonSqlMaker.java b/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonSqlMaker.java index 61294e38a..91c1c4192 100644 --- a/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonSqlMaker.java +++ b/connectors-common/sql-core/src/main/java/io/tapdata/common/CommonSqlMaker.java @@ -317,6 +317,33 @@ public void buildRowNumberClause(StringBuilder builder, TapAdvanceFilter filter) } } + /** + * build subSql after where for advance query using OFFSET-FETCH syntax + * + * @param filter condition of advance query + * @return where substring + */ + public String buildSqlByAdvanceFilterWithOffsetFetch(TapAdvanceFilter filter) { + StringBuilder builder = new StringBuilder(); + buildWhereClause(builder, filter); + buildOrderClause(builder, filter); + buildOffsetFetchClause(builder, filter); + return builder.toString(); + } + + public void buildOffsetFetchClause(StringBuilder builder, TapAdvanceFilter filter) { + if (EmptyKit.isNotNull(filter.getSkip())) { + builder.append(" OFFSET ").append(filter.getSkip()).append(" ROWS "); + } else if (EmptyKit.isNotNull(filter.getLimit())) { + // FETCH requires OFFSET, so add OFFSET 0 if only LIMIT is specified + builder.append(" OFFSET 0 ROWS "); + } + + if (EmptyKit.isNotNull(filter.getLimit())) { + builder.append(" FETCH FIRST ").append(filter.getLimit()).append(" ROWS ONLY "); + } + } + /** * set value for each column in sql * e.g. diff --git a/connectors-common/sql-core/src/test/java/io/tapdata/common/CommonSqlMakerTest.java b/connectors-common/sql-core/src/test/java/io/tapdata/common/CommonSqlMakerTest.java index 8a6e15ad4..b65018c0c 100644 --- a/connectors-common/sql-core/src/test/java/io/tapdata/common/CommonSqlMakerTest.java +++ b/connectors-common/sql-core/src/test/java/io/tapdata/common/CommonSqlMakerTest.java @@ -1,10 +1,12 @@ package io.tapdata.common; import io.tapdata.entity.utils.DataMap; +import io.tapdata.pdk.apis.entity.SortOn; import io.tapdata.pdk.apis.entity.TapAdvanceFilter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -76,5 +78,102 @@ void buildCommandWhereSqlParamsWithWhereTest(){ Assertions.assertEquals(whereSql,actualData); } + @Test + void buildSqlByAdvanceFilterWithOffsetFetchTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test with both skip and limit + filter.setSkip(10); + filter.setLimit(20); + filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING))); + + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = " ORDER BY \"id\" ASC OFFSET 10 ROWS FETCH FIRST 20 ROWS ONLY "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildSqlByAdvanceFilterWithOffsetFetchOnlyLimitTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test with only limit (should add OFFSET 0) + filter.setLimit(15); + filter.setSortOnList(Arrays.asList(new SortOn("name", SortOn.DESCENDING))); + + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = " ORDER BY \"name\" DESC OFFSET 0 ROWS FETCH FIRST 15 ROWS ONLY "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildSqlByAdvanceFilterWithOffsetFetchOnlySkipTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test with only skip + filter.setSkip(5); + filter.setSortOnList(Arrays.asList(new SortOn("created_at", SortOn.ASCENDING))); + + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = " ORDER BY \"created_at\" ASC OFFSET 5 ROWS "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildSqlByAdvanceFilterWithOffsetFetchNoOrderByTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test without ORDER BY but with pagination (should not add ORDER BY) + filter.setSkip(10); + filter.setLimit(20); + + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = " OFFSET 10 ROWS FETCH FIRST 20 ROWS ONLY "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildOffsetFetchClauseTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + StringBuilder builder = new StringBuilder(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + filter.setSkip(100); + filter.setLimit(50); + filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING))); + + commonSqlMaker.buildOffsetFetchClause(builder, filter); + String result = builder.toString(); + String expected = " OFFSET 100 ROWS FETCH FIRST 50 ROWS ONLY "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildSqlByAdvanceFilterWithOffsetFetchNoPaginationTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test without any pagination parameters + filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING))); + + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = " ORDER BY \"id\" ASC "; + Assertions.assertEquals(expected, result); + } + + @Test + void buildSqlByAdvanceFilterWithOffsetFetchEmptyTest() { + CommonSqlMaker commonSqlMaker = new CommonSqlMaker(); + TapAdvanceFilter filter = new TapAdvanceFilter(); + + // Test with completely empty filter + String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter); + String expected = ""; + Assertions.assertEquals(expected, result); + } + }