Commit cd67eec

[flink] Flink sink supports detecting new buckets for an existing table or partition
1 parent 1f72acd commit cd67eec

3 files changed: +172 −2 lines
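In short, the change lets a running Flink append sink notice that an existing table (or partition) has gained buckets, instead of only ever writing to the buckets that existed when the job started. The sketch below shows the operational flow being exercised here; the helper class and method names are invented for illustration, while the Admin.alterTable call and the bucket.num table change are taken verbatim from the test added in this commit.

import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TablePath;

import java.util.Collections;

/** Illustrative helper (not part of this commit): scale an existing log table to a new bucket count. */
public final class BucketScaleExample {

    /**
     * Raises bucket.num for the given table. With this commit, Flink sink writers already
     * running against the table pick up the new buckets on their next metadata refresh,
     * rather than writing only to the buckets that existed when the job started.
     */
    public static void scaleBuckets(Admin admin, TablePath tablePath, int newBucketCount)
            throws Exception {
        admin.alterTable(
                        tablePath,
                        Collections.singletonList(
                                TableChange.set("bucket.num", String.valueOf(newBucketCount))),
                        false)
                .get();
    }

    private BucketScaleExample() {}
}

Once bucket.num has been raised, each running writer picks the new buckets up on the first write after its 60-second refresh interval has elapsed (see the FlinkSinkWriter change below).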

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/AppendSinkWriter.java

Lines changed: 19 additions & 0 deletions
@@ -72,4 +72,23 @@ public void flush(boolean endOfInput) throws IOException {
     TableWriter getTableWriter() {
         return appendWriter;
     }
+
+    @Override
+    protected void updateTable() {
+        appendWriter.flush();
+
+        try {
+            table.close();
+        } catch (Exception e) {
+            LOG.warn("Exception occurs while closing Fluss table before update table.", e);
+        }
+
+        table = connection.getTable(tablePath);
+        appendWriter = table.newAppend().createWriter();
+
+        LOG.info(
+                "Update table {}, current bucket {}.",
+                tablePath,
+                table.getTableInfo().getNumBuckets());
+    }
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java

Lines changed: 14 additions & 2 deletions
@@ -56,14 +56,16 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class);
 
-    private final TablePath tablePath;
+    private static final int REFRESH_INTERVAL_MS = 60_000;
+
+    protected final TablePath tablePath;
     private final Configuration flussConfig;
     protected final RowType tableRowType;
     protected final @Nullable int[] targetColumnIndexes;
     private final MailboxExecutor mailboxExecutor;
     private final FlussSerializationSchema<InputT> serializationSchema;
 
-    private transient Connection connection;
+    protected transient Connection connection;
     protected transient Table table;
     protected transient FlinkMetricRegistry flinkMetricRegistry;
 
@@ -73,6 +75,8 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
     private transient Counter numRecordsOutErrorsCounter;
    private volatile Throwable asyncWriterException;
 
+    private volatile long lastRefreshTime;
+
     public FlinkSinkWriter(
             TablePath tablePath,
             Configuration flussConfig,
@@ -114,6 +118,7 @@ public void initialize(SinkWriterMetricGroup metricGroup) {
                 table.getTableInfo().getSchema(),
                 tableRowType);
         sanityCheck(table.getTableInfo());
+        lastRefreshTime = System.currentTimeMillis();
 
         try {
             this.serializationSchema.open(
@@ -161,6 +166,11 @@ public void write(InputT inputValue, Context context) throws IOException, InterruptedException {
         } catch (Exception e) {
             throw new IOException(e.getMessage(), e);
         }
+
+        if (System.currentTimeMillis() - lastRefreshTime > REFRESH_INTERVAL_MS) {
+            updateTable();
+            lastRefreshTime = System.currentTimeMillis();
+        }
     }
 
     @Override
@@ -242,4 +252,6 @@ protected void checkAsyncException() throws IOException {
 
     @VisibleForTesting
     abstract TableWriter getTableWriter();
+
+    protected void updateTable() {}
 }
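The base class change above is the whole mechanism: a private refresh interval, a volatile timestamp, a cheap elapsed-time check appended to write(), and a protected no-op updateTable() hook that subclasses such as AppendSinkWriter override. The following standalone sketch restates the same pattern outside of Fluss; the class, method, and field names here are illustrative only, not Fluss classes.

/** Simplified restatement of the periodic-refresh pattern used by FlinkSinkWriter. */
public abstract class PeriodicallyRefreshingWriter<T> {

    private static final long REFRESH_INTERVAL_MS = 60_000;

    // volatile so a refresh triggered on the write path is visible to other readers
    private volatile long lastRefreshTime = System.currentTimeMillis();

    public void write(T record) {
        doWrite(record);

        // Cheap wall-clock check on the hot path: at most one refresh per interval,
        // and only while records keep arriving (an idle writer never refreshes).
        if (System.currentTimeMillis() - lastRefreshTime > REFRESH_INTERVAL_MS) {
            refresh();
            lastRefreshTime = System.currentTimeMillis();
        }
    }

    /** Actual write, e.g. handing the record to an append writer. */
    protected abstract void doWrite(T record);

    /** Refresh hook for subclasses; a no-op by default, mirroring FlinkSinkWriter#updateTable. */
    protected void refresh() {}
}

Because the check lives in write(), a busy writer refreshes at most once per interval and an idle writer never refreshes at all.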

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriterTest.java

Lines changed: 139 additions & 0 deletions
@@ -20,13 +20,16 @@
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
@@ -51,7 +54,12 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.BiConsumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -265,6 +273,137 @@ private FlinkSinkWriter<RowData> createSinkWriter(
                 serializationSchema);
     }
 
+    @Test
+    void testTableInfoAutoUpdate() throws Exception {
+        String testDb = "test-auto-update-db";
+        TablePath testTablePath = TablePath.of(testDb, "test-auto-update-table");
+
+        // Create database
+        admin.createDatabase(testDb, DatabaseDescriptor.EMPTY, true).get();
+
+        // Create log table with 3 buckets (no primary key)
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.INT())
+                                        .column("name", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(3)
+                        .build();
+        createTable(testTablePath, tableDescriptor);
+
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        MockWriterInitContext mockWriterInitContext =
+                new MockWriterInitContext(new InterceptingOperatorMetricGroup());
+
+        // Create AppendSinkWriter
+        RowType tableRowType =
+                RowType.of(
+                        new LogicalType[] {new IntType(), new CharType(10)},
+                        new String[] {"id", "name"});
+        RowDataSerializationSchema serializationSchema =
+                new RowDataSerializationSchema(true, false);
+        AppendSinkWriter<RowData> writer =
+                new AppendSinkWriter<>(
+                        testTablePath,
+                        clientConfig,
+                        tableRowType,
+                        mockWriterInitContext.getMailboxExecutor(),
+                        serializationSchema);
+
+        try {
+            writer.initialize(mockWriterInitContext.metricGroup());
+
+            // Step 1: Write data with 3 buckets, verify success
+            for (int i = 0; i < 10; i++) {
+                writer.write(
+                        GenericRowData.of(i, StringData.fromString("name" + i)),
+                        new MockSinkWriterContext());
+            }
+            writer.flush(false);
+
+            // Verify data is written to 3 buckets
+            Map<Integer, Integer> bucketCounts = countRecordsPerBucket(testTablePath, 3);
+            assertThat(bucketCounts.size()).isEqualTo(3);
+            int totalRecords = bucketCounts.values().stream().mapToInt(Integer::intValue).sum();
+            assertThat(totalRecords).isEqualTo(10);
+
+            // Step 2: Alter table bucket number to 4
+            admin.alterTable(
+                            testTablePath,
+                            Collections.singletonList(TableChange.set("bucket.num", "4")),
+                            false)
+                    .get();
+
+            // Wait for schema sync
+            FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(testTablePath, 2);
+
+            // Step 3: Force update table by setting lastRefreshTime to trigger refresh
+            Field lastRefreshTimeField = FlinkSinkWriter.class.getDeclaredField("lastRefreshTime");
+            lastRefreshTimeField.setAccessible(true);
+            lastRefreshTimeField.set(
+                    writer, System.currentTimeMillis() - 61000); // Set to 61 seconds ago
+
+            // Step 4: Write more data, should use 4 buckets now
+            for (int i = 10; i < 20; i++) {
+                writer.write(
+                        GenericRowData.of(i, StringData.fromString("name" + i)),
+                        new MockSinkWriterContext());
+            }
+            writer.flush(false);
+
+            // Step 5: Verify data is written to 4 buckets
+            Map<Integer, Integer> newBucketCounts = countRecordsPerBucket(testTablePath, 4);
+            assertThat(newBucketCounts.size()).isEqualTo(4);
+            int newTotalRecords =
+                    newBucketCounts.values().stream().mapToInt(Integer::intValue).sum();
+            assertThat(newTotalRecords).isEqualTo(20); // Total records from both writes
+
+            // Verify that we have records in all 4 buckets
+            Set<Integer> bucketsWithData = newBucketCounts.keySet();
+            assertThat(bucketsWithData).hasSize(4);
+            for (int bucket = 0; bucket < 4; bucket++) {
+                assertThat(bucketsWithData).contains(bucket);
+            }
+        } finally {
+            writer.close();
+        }
+    }
+
+    private Map<Integer, Integer> countRecordsPerBucket(TablePath tablePath, int expectedBuckets)
+            throws Exception {
+        Map<Integer, Integer> bucketCounts = new HashMap<>();
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection connection = ConnectionFactory.createConnection(clientConfig);
+                Table table = connection.getTable(tablePath);
+                LogScanner logScanner = table.newScan().createLogScanner()) {
+            // Subscribe to all buckets from beginning
+            for (int bucket = 0; bucket < expectedBuckets; bucket++) {
+                logScanner.subscribeFromBeginning(bucket);
+            }
+
+            // Collect all records and count by bucket
+            int totalScanned = 0;
+            int maxRecords = 50; // Limit to avoid infinite loop
+            while (totalScanned < maxRecords) {
+                org.apache.fluss.client.table.scanner.log.ScanRecords scanRecords =
                        logScanner.poll(Duration.ofSeconds(1));
+                if (scanRecords.isEmpty()) {
+                    break;
+                }
+                for (TableBucket tableBucket : scanRecords.buckets()) {
+                    int bucketId = tableBucket.getBucket();
+                    int recordCount = scanRecords.records(tableBucket).size();
+                    bucketCounts.put(
+                            bucketId, bucketCounts.getOrDefault(bucketId, 0) + recordCount);
+                    totalScanned += recordCount;
+                }
+            }
+        }
+        return bucketCounts;
+    }
+
     static class MockSinkWriterContext implements SinkWriter.Context {
         @Override
         public long currentWatermark() {