Skip to content

Commit 3edf6f6

Browse files
committed
[flink] Refactor WriterRefresher to better abstraction
1 parent ace9a59 commit 3edf6f6

File tree

6 files changed

+98
-157
lines changed

6 files changed

+98
-157
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2683,18 +2683,6 @@ public String externalSpecificFS() {
26832683
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
26842684
}
26852685

2686-
public Map<String, String> configGroups(Set<String> groups) {
2687-
Map<String, String> configs = new HashMap<>();
2688-
// external-paths config group
2689-
String externalPaths = "external-paths";
2690-
if (groups.contains(externalPaths)) {
2691-
configs.put(DATA_FILE_EXTERNAL_PATHS.key(), dataFileExternalPaths());
2692-
configs.put(DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), externalPathStrategy().toString());
2693-
configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), externalSpecificFS());
2694-
}
2695-
return configs;
2696-
}
2697-
26982686
/** Returns the value configured for {@code COMPACTION_FORCE_REWRITE_ALL_FILES}. */
public Boolean forceRewriteAllFiles() {
26992687
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
27002688
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import org.apache.paimon.annotation.VisibleForTesting;
2323
import org.apache.paimon.append.AppendCompactTask;
2424
import org.apache.paimon.data.BinaryRow;
25+
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
2627
import org.apache.paimon.flink.sink.Committable;
2728
import org.apache.paimon.flink.sink.WriterRefresher;
2829
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
29-
import org.apache.paimon.operation.FileStoreWrite;
30+
import org.apache.paimon.operation.FileStoreWrite.State;
3031
import org.apache.paimon.operation.metrics.CompactionMetrics;
3132
import org.apache.paimon.operation.metrics.MetricUtils;
3233
import org.apache.paimon.table.FileStoreTable;
@@ -56,17 +57,15 @@ public class AppendTableCompactor {
5657
private static final Logger LOG = LoggerFactory.getLogger(AppendTableCompactor.class);
5758

5859
private FileStoreTable table;
59-
private final String commitUser;
60-
61-
private transient BaseAppendFileStoreWrite write;
60+
private BaseAppendFileStoreWrite write;
6261

63-
protected final transient Queue<Future<CommitMessage>> result;
64-
65-
private final transient Supplier<ExecutorService> compactExecutorsupplier;
66-
@Nullable private final transient CompactionMetrics compactionMetrics;
67-
@Nullable private final transient CompactionMetrics.Reporter metricsReporter;
62+
private final String commitUser;
63+
protected final Queue<Future<CommitMessage>> result;
64+
private final Supplier<ExecutorService> compactExecutorsupplier;
65+
@Nullable private final CompactionMetrics compactionMetrics;
66+
@Nullable private final CompactionMetrics.Reporter metricsReporter;
6867

69-
@Nullable protected final transient WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
68+
@Nullable protected final WriterRefresher writeRefresher;
7069

7170
public AppendTableCompactor(
7271
FileStoreTable table,
@@ -92,12 +91,7 @@ public AppendTableCompactor(
9291
? null
9392
// partition and bucket fields are no use.
9493
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
95-
if (isStreaming) {
96-
this.writeRefresher =
97-
new WriterRefresher<>(table, write, (newTable, writer) -> replace(newTable));
98-
} else {
99-
this.writeRefresher = null;
100-
}
94+
this.writeRefresher = WriterRefresher.create(isStreaming, table, this::replace);
10195
}
10296

10397
public void processElement(AppendCompactTask task) throws Exception {
@@ -218,18 +212,17 @@ public Iterable<Future<CommitMessage>> result() {
218212
}
219213

220214
/**
 * Swaps the current write for one created from {@code newTable}.
 *
 * <p>The state of the current write is captured with {@code checkpoint()}, the current write is
 * closed, a new write is created from {@code newTable.store()} for the same {@code commitUser},
 * and the captured state is restored into it. The added (+) lines additionally remember
 * {@code newTable} in the {@code table} field.
 *
 * @param newTable the table whose store is used to create the replacement write
 * @throws Exception declared by the method; propagated from the calls it makes
 */
private void replace(FileStoreTable newTable) throws Exception {
221-
222-
List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
223-
write.close();
224-
write = (BaseAppendFileStoreWrite) newTable.store().newWrite(commitUser);
225-
write.restore((List) states);
215+
this.table = newTable;
216+
List<State<InternalRow>> states = write.checkpoint();
217+
this.write.close();
218+
this.write = (BaseAppendFileStoreWrite) newTable.store().newWrite(commitUser);
219+
this.write.restore(states);
226220
}
227221

228222
public void tryRefreshWrite() {
229223
if (commitUser == null) {
230224
return;
231225
}
232-
233226
if (writeRefresher != null) {
234227
writeRefresher.tryRefresh();
235228
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
7070
private transient DataFileMetaSerializer dataFileMetaSerializer;
7171
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
7272

73-
protected transient @Nullable WriterRefresher<StoreSinkWrite> writeRefresher;
73+
protected transient @Nullable WriterRefresher writeRefresher;
7474

7575
public StoreCompactOperator(
7676
StreamOperatorParameters<Committable> parameters,
@@ -119,18 +119,7 @@ public void initializeState(StateInitializationContext context) throws Exception
119119
getContainingTask().getEnvironment().getIOManager(),
120120
memoryPool,
121121
getMetricGroup());
122-
123-
if (write.streamingMode()) {
124-
writeRefresher =
125-
new WriterRefresher<>(
126-
table,
127-
write,
128-
(newTable, writer) -> {
129-
writer.replace(newTable);
130-
});
131-
} else {
132-
writeRefresher = null;
133-
}
122+
this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
134123
}
135124

136125
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
5858
protected transient StoreSinkWriteState state;
5959
protected transient StoreSinkWrite write;
6060

61-
protected transient @Nullable WriterRefresher<StoreSinkWrite> writeRefresher;
61+
protected transient @Nullable WriterRefresher writeRefresher;
6262

6363
public TableWriteOperator(
6464
StreamOperatorParameters<Committable> parameters,
@@ -99,18 +99,7 @@ public void initializeState(StateInitializationContext context) throws Exception
9999
if (writeRestore != null) {
100100
write.setWriteRestore(writeRestore);
101101
}
102-
103-
if (write.streamingMode()) {
104-
writeRefresher =
105-
new WriterRefresher<>(
106-
table,
107-
write,
108-
(newTable, writer) -> {
109-
writer.replace(newTable);
110-
});
111-
} else {
112-
writeRefresher = null;
113-
}
102+
this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
114103
}
115104

116105
public void setWriteRestore(@Nullable WriteRestore writeRestore) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,57 +23,73 @@
2323
import org.apache.paimon.options.Options;
2424
import org.apache.paimon.schema.TableSchema;
2525
import org.apache.paimon.table.FileStoreTable;
26-
import org.apache.paimon.utils.StringUtils;
2726

2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

3130
import javax.annotation.Nullable;
3231

3332
import java.util.Arrays;
33+
import java.util.HashMap;
3434
import java.util.Map;
3535
import java.util.Objects;
3636
import java.util.Optional;
3737
import java.util.Set;
3838
import java.util.stream.Collectors;
3939

40+
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
41+
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS;
42+
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
43+
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
44+
4045
/** Refreshes the writer when the watched config groups change in a newer table schema. */
41-
public class WriterRefresher<T> {
46+
public class WriterRefresher {
4247

4348
private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
4449

45-
@Nullable private final Set<String> configGroups;
46-
private final Refresher<T> refresher;
4750
private FileStoreTable table;
48-
private T write;
51+
private final Refresher refresher;
52+
private final Set<String> configGroups;
4953

50-
public WriterRefresher(FileStoreTable table, T write, Refresher<T> refresher) {
54+
/**
 * Stores the table, the refresh callback and the config groups to compare. Private: the static
 * {@code create} factory in this class is the caller that constructs instances.
 */
private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> configGroups) {
5155
this.table = table;
52-
this.write = write;
5356
this.refresher = refresher;
57+
this.configGroups = configGroups;
58+
}
59+
60+
@Nullable
61+
public static WriterRefresher create(
62+
boolean isStreaming, FileStoreTable table, Refresher refresher) {
63+
if (!isStreaming) {
64+
return null;
65+
}
66+
5467
String refreshDetectors =
5568
Options.fromMap(table.options())
5669
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS);
57-
if (StringUtils.isNullOrWhitespaceOnly(refreshDetectors)) {
58-
configGroups = null;
59-
} else {
60-
configGroups = Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
70+
Set<String> configGroups =
71+
isNullOrWhitespaceOnly(refreshDetectors)
72+
? null
73+
: Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
74+
if (configGroups == null || configGroups.isEmpty()) {
75+
return null;
6176
}
77+
return new WriterRefresher(table, refresher, configGroups);
6278
}
6379

6480
public void tryRefresh() {
65-
if (configGroups == null || configGroups.isEmpty()) {
81+
Optional<TableSchema> latestSchema = table.schemaManager().latest();
82+
if (!latestSchema.isPresent()) {
6683
return;
6784
}
6885

69-
Optional<TableSchema> latestSchema = table.schemaManager().latest();
70-
if (latestSchema.isPresent() && latestSchema.get().id() > table.schema().id()) {
86+
TableSchema latest = latestSchema.get();
87+
if (latest.id() > table.schema().id()) {
7188
try {
7289
Map<String, String> currentOptions =
73-
CoreOptions.fromMap(table.schema().options()).configGroups(configGroups);
90+
configGroups(configGroups, table.coreOptions());
7491
Map<String, String> newOptions =
75-
CoreOptions.fromMap(latestSchema.get().options())
76-
.configGroups(configGroups);
92+
configGroups(configGroups, CoreOptions.fromMap(latest.options()));
7793

7894
if (!Objects.equals(newOptions, currentOptions)) {
7995
if (LOG.isDebugEnabled()) {
@@ -86,20 +102,30 @@ public void tryRefresh() {
86102
newOptions);
87103
}
88104
table = table.copy(newOptions);
89-
refresher.refresh(table, write);
105+
refresher.refresh(table);
90106
}
91107
} catch (Exception e) {
92108
throw new RuntimeException("update write failed.", e);
93109
}
94110
}
95111
}
96112

97-
/**
98-
* Refresher for refresh write when configs changed.
99-
*
100-
* @param <T> the type of writer.
101-
*/
102-
public interface Refresher<T> {
103-
void refresh(FileStoreTable table, T writer) throws Exception;
113+
/**
 * Callback that {@link WriterRefresher#tryRefresh()} invokes with the updated table once it has
 * detected that the watched configs differ.
 */
114+
public interface Refresher {
115+
/** Applies {@code table}, the copy carrying the new option values, to the writer. */
void refresh(FileStoreTable table) throws Exception;
116+
}
117+
118+
/**
 * Collects the option values that belong to the requested config groups.
 *
 * <p>Only the {@code "external-paths"} group is handled here; any other group name adds
 * nothing to the result.
 *
 * @param groups names of the config groups to extract
 * @param options the options to read the values from
 * @return a new map from option key to its value in {@code options}; empty when {@code groups}
 *     does not contain a handled group
 */
public static Map<String, String> configGroups(Set<String> groups, CoreOptions options) {
119+
Map<String, String> configs = new HashMap<>();
120+
// external-paths config group
121+
String externalPaths = "external-paths";
122+
if (groups.contains(externalPaths)) {
123+
configs.put(DATA_FILE_EXTERNAL_PATHS.key(), options.dataFileExternalPaths());
124+
configs.put(
125+
DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
126+
options.externalPathStrategy().toString());
127+
configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), options.externalSpecificFS());
128+
}
129+
return configs;
104130
}
105131
}

0 commit comments

Comments
 (0)