Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,34 @@ protected void queryByAdvanceFilterWithOffsetV2(TapConnectorContext connectorCon
});
}

//for SQL Server type (with OFFSET-FETCH)
protected void queryByAdvanceFilterWithOffsetFetch(TapConnectorContext connectorContext, TapAdvanceFilter filter, TapTable table, Consumer<FilterResults> consumer) throws Throwable {
String sql = commonSqlMaker.buildSelectClause(table, filter, false) + getSchemaAndTable(table.getId()) + commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
jdbcContext.query(sql, resultSet -> {
FilterResults filterResults = new FilterResults();
try {
while (resultSet.next()) {
List<String> allColumn = DbKit.getColumnsFromResultSet(resultSet);
DataMap dataMap = DbKit.getRowFromResultSet(resultSet, allColumn);
processDataMap(dataMap, table);
filterResults.add(dataMap);
if (filterResults.getResults().size() == BATCH_ADVANCE_READ_LIMIT) {
consumer.accept(filterResults);
filterResults = new FilterResults();
}
}
} catch (SQLException e) {
exceptionCollector.collectTerminateByServer(e);
exceptionCollector.collectReadPrivileges("batchReadWithoutOffset", Collections.emptyList(), e);
exceptionCollector.revealException(e);
throw e;
}
if (EmptyKit.isNotEmpty(filterResults.getResults())) {
consumer.accept(filterResults);
}
});
}

protected void beginTransaction(TapConnectorContext connectorContext) throws Throwable {
isTransaction = true;
}
Expand Down Expand Up @@ -1010,6 +1038,17 @@ protected long countByAdvanceFilterV2(TapConnectorContext connectorContext, TapT
return count.get();
}

protected long countByAdvanceFilterWithOffsetFetch(TapConnectorContext connectorContext, TapTable tapTable, TapAdvanceFilter tapAdvanceFilter) throws SQLException {
AtomicLong count = new AtomicLong(0);
String sql = "SELECT COUNT(1) FROM " + getSchemaAndTable(tapTable.getId()) + commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(tapAdvanceFilter);
jdbcContext.query(sql, resultSet -> {
if (resultSet.next()) {
count.set(resultSet.getLong(1));
}
});
return count.get();
}

protected List<String> getAfterUniqueAutoIncrementFields(TapTable tapTable, List<TapIndex> indexList) {
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,33 @@ public void buildRowNumberClause(StringBuilder builder, TapAdvanceFilter filter)
}
}

/**
* build subSql after where for advance query using OFFSET-FETCH syntax
*
* @param filter condition of advance query
* @return where substring
*/
public String buildSqlByAdvanceFilterWithOffsetFetch(TapAdvanceFilter filter) {
StringBuilder builder = new StringBuilder();
buildWhereClause(builder, filter);
buildOrderClause(builder, filter);
buildOffsetFetchClause(builder, filter);
return builder.toString();
}

public void buildOffsetFetchClause(StringBuilder builder, TapAdvanceFilter filter) {
if (EmptyKit.isNotNull(filter.getSkip())) {
builder.append(" OFFSET ").append(filter.getSkip()).append(" ROWS ");
} else if (EmptyKit.isNotNull(filter.getLimit())) {
// FETCH requires OFFSET, so add OFFSET 0 if only LIMIT is specified
builder.append(" OFFSET 0 ROWS ");
}

if (EmptyKit.isNotNull(filter.getLimit())) {
builder.append(" FETCH FIRST ").append(filter.getLimit()).append(" ROWS ONLY ");
}
}

/**
* set value for each column in sql
* e.g.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.tapdata.common;

import io.tapdata.entity.utils.DataMap;
import io.tapdata.pdk.apis.entity.SortOn;
import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -76,5 +78,102 @@ void buildCommandWhereSqlParamsWithWhereTest(){
Assertions.assertEquals(whereSql,actualData);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test with both skip and limit
filter.setSkip(10);
filter.setLimit(20);
filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING)));

String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = " ORDER BY \"id\" ASC OFFSET 10 ROWS FETCH FIRST 20 ROWS ONLY ";
Assertions.assertEquals(expected, result);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchOnlyLimitTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test with only limit (should add OFFSET 0)
filter.setLimit(15);
filter.setSortOnList(Arrays.asList(new SortOn("name", SortOn.DESCENDING)));

String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = " ORDER BY \"name\" DESC OFFSET 0 ROWS FETCH FIRST 15 ROWS ONLY ";
Assertions.assertEquals(expected, result);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchOnlySkipTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test with only skip
filter.setSkip(5);
filter.setSortOnList(Arrays.asList(new SortOn("created_at", SortOn.ASCENDING)));

String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = " ORDER BY \"created_at\" ASC OFFSET 5 ROWS ";
Assertions.assertEquals(expected, result);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchNoOrderByTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test without ORDER BY but with pagination (should not add ORDER BY)
filter.setSkip(10);
filter.setLimit(20);

String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = " OFFSET 10 ROWS FETCH FIRST 20 ROWS ONLY ";
Assertions.assertEquals(expected, result);
}

@Test
void buildOffsetFetchClauseTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
StringBuilder builder = new StringBuilder();
TapAdvanceFilter filter = new TapAdvanceFilter();

filter.setSkip(100);
filter.setLimit(50);
filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING)));

commonSqlMaker.buildOffsetFetchClause(builder, filter);
String result = builder.toString();
String expected = " OFFSET 100 ROWS FETCH FIRST 50 ROWS ONLY ";
Assertions.assertEquals(expected, result);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchNoPaginationTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test without any pagination parameters
filter.setSortOnList(Arrays.asList(new SortOn("id", SortOn.ASCENDING)));

String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = " ORDER BY \"id\" ASC ";
Assertions.assertEquals(expected, result);
}

@Test
void buildSqlByAdvanceFilterWithOffsetFetchEmptyTest() {
CommonSqlMaker commonSqlMaker = new CommonSqlMaker();
TapAdvanceFilter filter = new TapAdvanceFilter();

// Test with completely empty filter
String result = commonSqlMaker.buildSqlByAdvanceFilterWithOffsetFetch(filter);
String expected = "";
Assertions.assertEquals(expected, result);
}


}
Loading