Skip to content

Commit 1a19e3c

Browse files
committed
[flink] Make PostponeBucketSink no state and no intended failure
1 parent 0e02a39 commit 1a19e3c

File tree

11 files changed

+212
-88
lines changed

11 files changed

+212
-88
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,6 @@ public DataStreamSink<?> sinkFrom(
177177

178178
protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
179179
return new RestoreAndFailCommittableStateManager<>(
180-
WrappedManifestCommittableSerializer::new);
180+
WrappedManifestCommittableSerializer::new, true);
181181
}
182182
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
*/
4949
public abstract class AppendTableSink<T> extends FlinkWriteSink<T> {
5050

51+
private static final long serialVersionUID = 1L;
52+
5153
protected final FileStoreTable table;
5254
protected final LogSinkFunction logSinkFunction;
5355

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
293293
private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
294294
DataStream<InternalRow> partitioned =
295295
partition(input, new PostponeBucketChannelComputer(table.schema()), parallelism);
296-
FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null);
296+
PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
297297
return sink.sinkFrom(partitioned);
298298
}
299299

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818

1919
package org.apache.paimon.flink.sink;
2020

21+
import org.apache.paimon.data.InternalRow;
2122
import org.apache.paimon.manifest.ManifestCommittable;
2223
import org.apache.paimon.manifest.ManifestCommittableSerializer;
2324
import org.apache.paimon.options.Options;
2425
import org.apache.paimon.table.FileStoreTable;
2526

27+
import org.apache.flink.runtime.state.StateInitializationContext;
28+
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
29+
import org.apache.flink.streaming.api.operators.StreamOperator;
30+
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
31+
2632
import javax.annotation.Nullable;
2733

2834
import java.util.Map;
@@ -63,4 +69,48 @@ protected CommittableStateManager<ManifestCommittable> createCommittableStateMan
6369
ManifestCommittableSerializer::new,
6470
options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
6571
}
72+
73+
protected static OneInputStreamOperatorFactory<InternalRow, Committable>
74+
createNoStateRowWriteOperatorFactory(
75+
FileStoreTable table,
76+
LogSinkFunction logSinkFunction,
77+
StoreSinkWrite.Provider writeProvider,
78+
String commitUser) {
79+
return new RowDataStoreWriteOperator.Factory(
80+
table, logSinkFunction, writeProvider, commitUser) {
81+
@Override
82+
@SuppressWarnings("unchecked, rawtypes")
83+
public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
84+
return new RowDataStoreWriteOperator(
85+
parameters, table, logSinkFunction, writeProvider, commitUser) {
86+
87+
@Override
88+
protected StoreSinkWriteState createState(
89+
int subtaskId,
90+
StateInitializationContext context,
91+
StoreSinkWriteState.StateValueFilter stateFilter) {
92+
// No conflicts will occur in append only unaware bucket writer, so no state
93+
// is needed.
94+
return new NoopStoreSinkWriteState(subtaskId, stateFilter);
95+
}
96+
97+
@Override
98+
protected String getCommitUser(StateInitializationContext context)
99+
throws Exception {
100+
// No conflicts will occur in append only unaware bucket writer, so
101+
// commitUser does not matter.
102+
return commitUser;
103+
}
104+
};
105+
}
106+
};
107+
}
108+
109+
protected static CommittableStateManager<ManifestCommittable>
110+
createRestoreOnlyCommittableStateManager(FileStoreTable table) {
111+
Options options = table.coreOptions().toConfiguration();
112+
return new RestoreCommittableStateManager<>(
113+
ManifestCommittableSerializer::new,
114+
options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
115+
}
66116
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.flink.sink;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

import java.util.Map;

/**
 * {@link FlinkSink} for writing records into a postpone bucket Paimon table.
 *
 * <p>Writers for postpone bucket tables do not conflict with each other, so this sink uses
 * stateless write operators and a restore-only committable state manager (no intentional
 * restart failure after recovering committed checkpoints).
 */
public class PostponeBucketSink extends FlinkWriteSink<InternalRow> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a sink for the given postpone bucket table.
     *
     * @param table the Paimon table to write into
     * @param overwritePartition partition to overwrite, or {@code null} for normal appends
     */
    public PostponeBucketSink(
            FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
        super(table, overwritePartition);
    }

    @Override
    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
            StoreSinkWrite.Provider writeProvider, String commitUser) {
        // No write state is needed and there is no log sink function for postpone bucket tables.
        return createNoStateRowWriteOperatorFactory(table, null, writeProvider, commitUser);
    }

    @Override
    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
        // Restore-only manager: re-commits restored committables without throwing the
        // intentional restart exception used by RestoreAndFailCommittableStateManager.
        return createRestoreOnlyCommittableStateManager(table);
    }
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,9 @@
1919
package org.apache.paimon.flink.sink;
2020

2121
import org.apache.paimon.data.serializer.VersionedSerializer;
22-
import org.apache.paimon.flink.VersionedSerializerWrapper;
2322
import org.apache.paimon.manifest.ManifestCommittable;
2423
import org.apache.paimon.utils.SerializableSupplier;
2524

26-
import org.apache.flink.api.common.state.ListState;
27-
import org.apache.flink.api.common.state.ListStateDescriptor;
28-
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
29-
import org.apache.flink.runtime.state.StateInitializationContext;
30-
import org.apache.flink.runtime.state.StateSnapshotContext;
31-
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
32-
33-
import java.util.ArrayList;
3425
import java.util.List;
3526

3627
/**
@@ -44,64 +35,27 @@
4435
* store writers.
4536
*/
4637
public class RestoreAndFailCommittableStateManager<GlobalCommitT>
47-
implements CommittableStateManager<GlobalCommitT> {
38+
extends RestoreCommittableStateManager<GlobalCommitT> {
4839

4940
private static final long serialVersionUID = 1L;
5041

51-
/** The committable's serializer. */
52-
private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer;
53-
54-
private final boolean partitionMarkDoneRecoverFromState;
55-
56-
/** GlobalCommitT state of this job. Used to filter out previous successful commits. */
57-
private ListState<GlobalCommitT> streamingCommitterState;
58-
59-
public RestoreAndFailCommittableStateManager(
60-
SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer) {
61-
this(committableSerializer, true);
62-
}
63-
6442
public RestoreAndFailCommittableStateManager(
6543
SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer,
6644
boolean partitionMarkDoneRecoverFromState) {
67-
this.committableSerializer = committableSerializer;
68-
this.partitionMarkDoneRecoverFromState = partitionMarkDoneRecoverFromState;
45+
super(committableSerializer, partitionMarkDoneRecoverFromState);
6946
}
7047

7148
@Override
72-
public void initializeState(
73-
StateInitializationContext context, Committer<?, GlobalCommitT> committer)
49+
protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
7450
throws Exception {
75-
streamingCommitterState =
76-
new SimpleVersionedListState<>(
77-
context.getOperatorStateStore()
78-
.getListState(
79-
new ListStateDescriptor<>(
80-
"streaming_committer_raw_states",
81-
BytePrimitiveArraySerializer.INSTANCE)),
82-
new VersionedSerializerWrapper<>(committableSerializer.get()));
83-
List<GlobalCommitT> restored = new ArrayList<>();
84-
streamingCommitterState.get().forEach(restored::add);
85-
streamingCommitterState.clear();
86-
recover(restored, committer);
87-
}
88-
89-
private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
90-
throws Exception {
91-
int numCommitted =
92-
committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState);
51+
int numCommitted = super.recover(committables, committer);
9352
if (numCommitted > 0) {
9453
throw new RuntimeException(
9554
"This exception is intentionally thrown "
9655
+ "after committing the restored checkpoints. "
9756
+ "By restarting the job we hope that "
9857
+ "writers can start writing based on these new commits.");
9958
}
100-
}
101-
102-
@Override
103-
public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
104-
throws Exception {
105-
streamingCommitterState.update(committables);
59+
return numCommitted;
10660
}
10761
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.flink.sink;

import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;

import java.util.ArrayList;
import java.util.List;

/**
 * A {@link CommittableStateManager} which stores uncommitted {@link ManifestCommittable}s in state.
 *
 * <p>When the job restarts, these {@link ManifestCommittable}s will be restored and committed.
 */
public class RestoreCommittableStateManager<GlobalCommitT>
        implements CommittableStateManager<GlobalCommitT> {

    private static final long serialVersionUID = 1L;

    /** The committable's serializer. */
    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer;

    // Flag forwarded to Committer#filterAndCommit; presumably controls whether partition
    // "mark done" actions are recovered from state — confirm against Committer's contract.
    private final boolean partitionMarkDoneRecoverFromState;

    /** GlobalCommitT state of this job. Used to filter out previous successful commits. */
    private ListState<GlobalCommitT> streamingCommitterState;

    /**
     * @param committableSerializer serializable supplier of the committable serializer; a supplier
     *     is used so the manager itself stays serializable
     * @param partitionMarkDoneRecoverFromState passed through to {@link #recover}'s commit call
     */
    public RestoreCommittableStateManager(
            SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer,
            boolean partitionMarkDoneRecoverFromState) {
        this.committableSerializer = committableSerializer;
        this.partitionMarkDoneRecoverFromState = partitionMarkDoneRecoverFromState;
    }

    @Override
    public void initializeState(
            StateInitializationContext context, Committer<?, GlobalCommitT> committer)
            throws Exception {
        // Raw byte-array operator state, decoded through the versioned serializer wrapper.
        streamingCommitterState =
                new SimpleVersionedListState<>(
                        context.getOperatorStateStore()
                                .getListState(
                                        new ListStateDescriptor<>(
                                                "streaming_committer_raw_states",
                                                BytePrimitiveArraySerializer.INSTANCE)),
                        new VersionedSerializerWrapper<>(committableSerializer.get()));
        List<GlobalCommitT> restored = new ArrayList<>();
        streamingCommitterState.get().forEach(restored::add);
        // Clear the state before re-committing: the restored committables are handed to the
        // committer below, so they must not be replayed again on a subsequent restore.
        streamingCommitterState.clear();
        recover(restored, committer);
    }

    /**
     * Filters out already-committed committables and commits the rest.
     *
     * <p>Subclasses may override this to react to the number of newly committed committables
     * (e.g. {@code RestoreAndFailCommittableStateManager} intentionally fails the job when
     * anything was committed here).
     *
     * @return the number of committables actually committed during recovery
     */
    protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
            throws Exception {
        return committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState);
    }

    @Override
    public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
            throws Exception {
        // Replace (not append) the stored committables with this checkpoint's uncommitted set.
        streamingCommitterState.update(committables);
    }
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@
1919
package org.apache.paimon.flink.sink;
2020

2121
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.manifest.ManifestCommittable;
2223
import org.apache.paimon.table.FileStoreTable;
2324

24-
import org.apache.flink.runtime.state.StateInitializationContext;
2525
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
26-
import org.apache.flink.streaming.api.operators.StreamOperator;
27-
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
2826

2927
import java.util.Map;
3028

3129
/** An {@link AppendTableSink} which handles {@link InternalRow}. */
3230
public class RowAppendTableSink extends AppendTableSink<InternalRow> {
3331

32+
private static final long serialVersionUID = 1L;
33+
3434
public RowAppendTableSink(
3535
FileStoreTable table,
3636
Map<String, String> overwritePartitions,
@@ -42,34 +42,12 @@ public RowAppendTableSink(
4242
@Override
4343
protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
4444
StoreSinkWrite.Provider writeProvider, String commitUser) {
45-
return new RowDataStoreWriteOperator.Factory(
46-
table, logSinkFunction, writeProvider, commitUser) {
47-
@Override
48-
@SuppressWarnings("unchecked, rawtypes")
49-
public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
50-
return new RowDataStoreWriteOperator(
51-
parameters, table, logSinkFunction, writeProvider, commitUser) {
52-
53-
@Override
54-
protected StoreSinkWriteState createState(
55-
int subtaskId,
56-
StateInitializationContext context,
57-
StoreSinkWriteState.StateValueFilter stateFilter)
58-
throws Exception {
59-
// No conflicts will occur in append only unaware bucket writer, so no state
60-
// is needed.
61-
return new NoopStoreSinkWriteState(subtaskId, stateFilter);
62-
}
45+
return createNoStateRowWriteOperatorFactory(
46+
table, logSinkFunction, writeProvider, commitUser);
47+
}
6348

64-
@Override
65-
protected String getCommitUser(StateInitializationContext context)
66-
throws Exception {
67-
// No conflicts will occur in append only unaware bucket writer, so
68-
// commitUser does not matter.
69-
return commitUser;
70-
}
71-
};
72-
}
73-
};
49+
@Override
50+
protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
51+
return createRestoreOnlyCommittableStateManager(table);
7452
}
7553
}

0 commit comments

Comments
 (0)