Skip to content

Commit ace9a59

Browse files
authored
[flink] Automatically detect changes of external-paths strategy (apache#6024)
1 parent 2c73863 commit ace9a59

25 files changed

+856
-34
lines changed

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,12 @@
326326
<td>MemorySize</td>
327327
<td>Sink writer memory to control heap memory of writer.</td>
328328
</tr>
329+
<tr>
330+
<td><h5>sink.writer-refresh-detectors</h5></td>
331+
<td style="word-wrap: break-word;">(none)</td>
332+
<td>String</td>
333+
<td>The option groups which are expected to be refreshed when streaming writing, multiple option group separated by commas. Now only 'external-paths' is supported.</td>
334+
</tr>
329335
<tr>
330336
<td><h5>source.checkpoint-align.enabled</h5></td>
331337
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2683,6 +2683,18 @@ public String externalSpecificFS() {
26832683
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
26842684
}
26852685

2686+
public Map<String, String> configGroups(Set<String> groups) {
2687+
Map<String, String> configs = new HashMap<>();
2688+
// external-paths config group
2689+
String externalPaths = "external-paths";
2690+
if (groups.contains(externalPaths)) {
2691+
configs.put(DATA_FILE_EXTERNAL_PATHS.key(), dataFileExternalPaths());
2692+
configs.put(DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), externalPathStrategy().toString());
2693+
configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), externalSpecificFS());
2694+
}
2695+
return configs;
2696+
}
2697+
26862698
public Boolean forceRewriteAllFiles() {
26872699
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
26882700
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ protected StoreSinkWriteState createState(
5353
return new NoopStoreSinkWriteState(subtaskId);
5454
}
5555

56+
@Override
57+
protected String getCommitUser(StateInitializationContext context) throws Exception {
58+
// No conflicts will occur in append only unaware bucket writer, so
59+
// commitUser does not matter.
60+
return commitUser == null ? initialCommitUser : commitUser;
61+
}
62+
5663
@Override
5764
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
5865
// only accepts INSERT record

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,15 @@ public class FlinkConnectorOptions {
530530
.defaultValue(true)
531531
.withDescription("Enable pass job level filesystem settings to table file IO.");
532532

533+
public static final ConfigOption<String> SINK_WRITER_REFRESH_DETECTORS =
534+
key("sink.writer-refresh-detectors")
535+
.stringType()
536+
.noDefaultValue()
537+
.withDescription(
538+
"The option groups which are expected to be refreshed when streaming writing, "
539+
+ "multiple option group separated by commas. "
540+
+ "Now only 'external-paths' is supported.");
541+
533542
public static List<ConfigOption<?>> getOptions() {
534543
final Field[] fields = FlinkConnectorOptions.class.getFields();
535544
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.apache.paimon.data.BinaryRow;
2525
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
2626
import org.apache.paimon.flink.sink.Committable;
27+
import org.apache.paimon.flink.sink.WriterRefresher;
2728
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
29+
import org.apache.paimon.operation.FileStoreWrite;
2830
import org.apache.paimon.operation.metrics.CompactionMetrics;
2931
import org.apache.paimon.operation.metrics.MetricUtils;
3032
import org.apache.paimon.table.FileStoreTable;
@@ -53,22 +55,25 @@ public class AppendTableCompactor {
5355

5456
private static final Logger LOG = LoggerFactory.getLogger(AppendTableCompactor.class);
5557

56-
private final FileStoreTable table;
58+
private FileStoreTable table;
5759
private final String commitUser;
5860

59-
private final transient BaseAppendFileStoreWrite write;
61+
private transient BaseAppendFileStoreWrite write;
6062

6163
protected final transient Queue<Future<CommitMessage>> result;
6264

6365
private final transient Supplier<ExecutorService> compactExecutorsupplier;
6466
@Nullable private final transient CompactionMetrics compactionMetrics;
6567
@Nullable private final transient CompactionMetrics.Reporter metricsReporter;
6668

69+
@Nullable protected final transient WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
70+
6771
public AppendTableCompactor(
6872
FileStoreTable table,
6973
String commitUser,
7074
Supplier<ExecutorService> lazyCompactExecutor,
71-
@Nullable MetricGroup metricGroup) {
75+
@Nullable MetricGroup metricGroup,
76+
boolean isStreaming) {
7277
this.table = table;
7378
this.commitUser = commitUser;
7479
CoreOptions coreOptions = table.coreOptions();
@@ -87,6 +92,12 @@ public AppendTableCompactor(
8792
? null
8893
// partition and bucket fields are no use.
8994
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
95+
if (isStreaming) {
96+
this.writeRefresher =
97+
new WriterRefresher<>(table, write, (newTable, writer) -> replace(newTable));
98+
} else {
99+
this.writeRefresher = null;
100+
}
90101
}
91102

92103
public void processElement(AppendCompactTask task) throws Exception {
@@ -205,4 +216,22 @@ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
205216
public Iterable<Future<CommitMessage>> result() {
206217
return result;
207218
}
219+
220+
private void replace(FileStoreTable newTable) throws Exception {
221+
222+
List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
223+
write.close();
224+
write = (BaseAppendFileStoreWrite) newTable.store().newWrite(commitUser);
225+
write.restore((List) states);
226+
}
227+
228+
public void tryRefreshWrite() {
229+
if (commitUser == null) {
230+
return;
231+
}
232+
233+
if (writeRefresher != null) {
234+
writeRefresher.tryRefresh();
235+
}
236+
}
208237
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ public class AppendBypassCompactWorkerOperator
3434
private AppendBypassCompactWorkerOperator(
3535
StreamOperatorParameters<Committable> parameters,
3636
FileStoreTable table,
37-
String commitUser) {
38-
super(parameters, table, commitUser);
37+
String commitUser,
38+
boolean isStreaming) {
39+
super(parameters, table, commitUser, isStreaming);
3940
}
4041

4142
@Override
@@ -57,15 +58,17 @@ public void processElement(StreamRecord<Either<Committable, AppendCompactTask>>
5758
public static class Factory
5859
extends AppendCompactWorkerOperator.Factory<Either<Committable, AppendCompactTask>> {
5960

60-
public Factory(FileStoreTable table, String initialCommitUser) {
61-
super(table, initialCommitUser);
61+
public Factory(FileStoreTable table, String initialCommitUser, boolean isStreaming) {
62+
super(table, initialCommitUser, isStreaming);
6263
}
6364

6465
@Override
6566
@SuppressWarnings("unchecked")
6667
public <T extends StreamOperator<Committable>> T createStreamOperator(
6768
StreamOperatorParameters<Committable> parameters) {
68-
return (T) new AppendBypassCompactWorkerOperator(parameters, table, commitUser);
69+
return (T)
70+
new AppendBypassCompactWorkerOperator(
71+
parameters, table, commitUser, isStreaming);
6972
}
7073

7174
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,17 @@ public abstract class AppendCompactWorkerOperator<IN>
5555

5656
private transient ExecutorService lazyCompactExecutor;
5757

58+
private final boolean isStreaming;
59+
5860
public AppendCompactWorkerOperator(
5961
StreamOperatorParameters<Committable> parameters,
6062
FileStoreTable table,
61-
String commitUser) {
63+
String commitUser,
64+
boolean isStreaming) {
6265
super(parameters, Options.fromMap(table.options()));
6366
this.table = table;
6467
this.commitUser = commitUser;
68+
this.isStreaming = isStreaming;
6569
}
6670

6771
@VisibleForTesting
@@ -73,13 +77,17 @@ Iterable<Future<CommitMessage>> result() {
7377
public void open() throws Exception {
7478
LOG.debug("Opened a append-only table compaction worker.");
7579
this.unawareBucketCompactor =
76-
new AppendTableCompactor(table, commitUser, this::workerExecutor, getMetricGroup());
80+
new AppendTableCompactor(
81+
table, commitUser, this::workerExecutor, getMetricGroup(), isStreaming);
7782
}
7883

7984
@Override
8085
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
8186
throws IOException {
82-
return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
87+
List<Committable> committables =
88+
this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
89+
this.unawareBucketCompactor.tryRefreshWrite();
90+
return committables;
8391
}
8492

8593
private ExecutorService workerExecutor() {
@@ -111,11 +119,13 @@ protected abstract static class Factory<IN>
111119
extends PrepareCommitOperator.Factory<IN, Committable> {
112120
protected final FileStoreTable table;
113121
protected final String commitUser;
122+
protected final boolean isStreaming;
114123

115-
protected Factory(FileStoreTable table, String commitUser) {
124+
protected Factory(FileStoreTable table, String commitUser, boolean isStreaming) {
116125
super(Options.fromMap(table.options()));
117126
this.table = table;
118127
this.commitUser = commitUser;
128+
this.isStreaming = isStreaming;
119129
}
120130
}
121131
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,18 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
6565

6666
private transient Catalog catalog;
6767

68+
private final boolean isStreaming;
69+
6870
private AppendOnlyMultiTableCompactionWorkerOperator(
6971
StreamOperatorParameters<MultiTableCommittable> parameters,
7072
CatalogLoader catalogLoader,
7173
String commitUser,
72-
Options options) {
74+
Options options,
75+
boolean isStreaming) {
7376
super(parameters, options);
7477
this.commitUser = commitUser;
7578
this.catalogLoader = catalogLoader;
79+
this.isStreaming = isStreaming;
7680
}
7781

7882
@Override
@@ -119,7 +123,8 @@ private AppendTableCompactor compactor(Identifier tableId) {
119123
(FileStoreTable) catalog.getTable(tableId).copy(options.toMap()),
120124
commitUser,
121125
this::workerExecutor,
122-
getMetricGroup());
126+
getMetricGroup(),
127+
isStreaming);
123128
} catch (Catalog.TableNotExistException e) {
124129
throw new RuntimeException(e);
125130
}
@@ -188,11 +193,17 @@ public static class Factory
188193

189194
private final String commitUser;
190195
private final CatalogLoader catalogLoader;
196+
private final boolean isStreaming;
191197

192-
public Factory(CatalogLoader catalogLoader, String commitUser, Options options) {
198+
public Factory(
199+
CatalogLoader catalogLoader,
200+
String commitUser,
201+
Options options,
202+
boolean isStreaming) {
193203
super(options);
194204
this.commitUser = commitUser;
195205
this.catalogLoader = catalogLoader;
206+
this.isStreaming = isStreaming;
196207
}
197208

198209
@Override
@@ -201,7 +212,7 @@ public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
201212
StreamOperatorParameters<MultiTableCommittable> parameters) {
202213
return (T)
203214
new AppendOnlyMultiTableCompactionWorkerOperator(
204-
parameters, catalogLoader, commitUser, options);
215+
parameters, catalogLoader, commitUser, options, isStreaming);
205216
}
206217

207218
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ public class AppendOnlySingleTableCompactionWorkerOperator
3737
private AppendOnlySingleTableCompactionWorkerOperator(
3838
StreamOperatorParameters<Committable> parameters,
3939
FileStoreTable table,
40-
String commitUser) {
41-
super(parameters, table, commitUser);
40+
String commitUser,
41+
boolean isStreaming) {
42+
super(parameters, table, commitUser, isStreaming);
4243
}
4344

4445
@Override
@@ -49,8 +50,8 @@ public void processElement(StreamRecord<AppendCompactTask> element) throws Excep
4950
/** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
5051
public static class Factory extends AppendCompactWorkerOperator.Factory<AppendCompactTask> {
5152

52-
public Factory(FileStoreTable table, String initialCommitUser) {
53-
super(table, initialCommitUser);
53+
public Factory(FileStoreTable table, String initialCommitUser, boolean isStreaming) {
54+
super(table, initialCommitUser, isStreaming);
5455
}
5556

5657
@Override
@@ -59,7 +60,7 @@ public <T extends StreamOperator<Committable>> T createStreamOperator(
5960
StreamOperatorParameters<Committable> parameters) {
6061
return (T)
6162
new AppendOnlySingleTableCompactionWorkerOperator(
62-
parameters, table, commitUser);
63+
parameters, table, commitUser, isStreaming);
6364
}
6465

6566
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableCompactSink.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,25 @@
3030
public class AppendTableCompactSink extends FlinkSink<AppendCompactTask> {
3131

3232
private final FileStoreTable table;
33+
private final boolean isStreaming;
3334

34-
public AppendTableCompactSink(FileStoreTable table) {
35+
public AppendTableCompactSink(FileStoreTable table, boolean isStreaming) {
3536
super(table, true);
3637
this.table = table;
38+
this.isStreaming = isStreaming;
3739
}
3840

3941
public static DataStreamSink<?> sink(
4042
FileStoreTable table, DataStream<AppendCompactTask> input) {
41-
return new AppendTableCompactSink(table).sinkFrom(input);
43+
boolean isStreaming = isStreaming(input);
44+
return new AppendTableCompactSink(table, isStreaming).sinkFrom(input);
4245
}
4346

4447
@Override
4548
protected OneInputStreamOperatorFactory<AppendCompactTask, Committable>
4649
createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
47-
return new AppendOnlySingleTableCompactionWorkerOperator.Factory(table, commitUser);
50+
return new AppendOnlySingleTableCompactionWorkerOperator.Factory(
51+
table, commitUser, isStreaming);
4852
}
4953

5054
@Override

0 commit comments

Comments
 (0)