Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
Expand All @@ -33,30 +37,35 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessageSerializer;

import java.util.Collection;
import java.util.UUID;

/**
* A {@link Sink} for Paimon. Maintain this package until Paimon has its own sinkV2 implementation.
*/
public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, MultiTableCommittable> {
public class PaimonSink<InputT>
implements WithPreCommitTopology<InputT, MultiTableCommittable>,
SupportsWriterState<InputT, PaimonWriterState> {

// provided a default commit user.
public static final String DEFAULT_COMMIT_USER = "admin";

protected final Options catalogOptions;

/** The commit user; it is stored in the writer state and restored into the writer on recovery. */
protected final String commitUser;

private final PaimonRecordSerializer<InputT> serializer;

public PaimonSink(Options catalogOptions, PaimonRecordSerializer<InputT> serializer) {
this.catalogOptions = catalogOptions;
this.serializer = serializer;
commitUser = DEFAULT_COMMIT_USER;
this(catalogOptions, DEFAULT_COMMIT_USER, serializer);
}

public PaimonSink(
Options catalogOptions, String commitUser, PaimonRecordSerializer<InputT> serializer) {
this.catalogOptions = catalogOptions;
this.commitUser = commitUser;
// generate a random commit user to avoid conflict.
this.commitUser = commitUser + UUID.randomUUID();
this.serializer = serializer;
}

Expand All @@ -69,6 +78,22 @@ public PaimonWriter<InputT> createWriter(InitContext context) {
catalogOptions, context.metricGroup(), commitUser, serializer, lastCheckpointId);
}

/**
 * Creates a {@link PaimonWriter} from previously snapshotted writer states.
 *
 * <p>The commit user recorded in the first restored state is handed to the new writer. If the
 * restored collection is empty, the sink's own {@code commitUser} is used instead of failing
 * with a {@link java.util.NoSuchElementException}.
 *
 * @param context the writer init context; supplies the restored checkpoint id (if any) and the
 *     metric group
 * @param paimonWriterStates the restored writer states; must not be {@code null}, may be empty
 * @return a writer configured with the restored (or fallback) commit user
 */
@Override
public StatefulSinkWriter<InputT, PaimonWriterState> restoreWriter(
        WriterInitContext context, Collection<PaimonWriterState> paimonWriterStates) {
    // When no checkpoint id was restored, start one below the initial checkpoint id.
    long lastCheckpointId =
            context.getRestoredCheckpointId()
                    .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
    Preconditions.checkNotNull(paimonWriterStates);
    // NOTE(review): an empty collection is presumably possible when a subtask is assigned no
    // state on restore (e.g. after rescaling) - confirm against the Flink runtime. Falling back
    // to this sink's commit user keeps the writer creatable in that case.
    String storedCommitUser =
            paimonWriterStates.isEmpty()
                    ? commitUser
                    : paimonWriterStates.iterator().next().getCommitUser();
    return new PaimonWriter<>(
            catalogOptions,
            context.metricGroup(),
            storedCommitUser,
            serializer,
            lastCheckpointId);
}

@Override
public Committer<MultiTableCommittable> createCommitter() {
return new PaimonCommitter(catalogOptions, commitUser);
Expand Down Expand Up @@ -100,4 +125,9 @@ public DataStream<CommittableMessage<MultiTableCommittable>> addPreCommitTopolog
new PreCommitOperator(catalogOptions, commitUser))
.setParallelism(committables.getParallelism());
}

/**
 * Returns the serializer used to persist {@link PaimonWriterState}.
 *
 * @return the shared {@link PaimonWriterStateSerializer#INSTANCE}
 */
@Override
public SimpleVersionedSerializer<PaimonWriterState> getWriterStateSerializer() {
    final SimpleVersionedSerializer<PaimonWriterState> stateSerializer =
            PaimonWriterStateSerializer.INSTANCE;
    return stateSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.metrics.MetricGroup;
Expand All @@ -42,6 +43,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -51,7 +53,8 @@

/** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */
public class PaimonWriter<InputT>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, MultiTableCommittable> {
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, MultiTableCommittable>,
StatefulSinkWriter<InputT, PaimonWriterState> {

private static final Logger LOG = LoggerFactory.getLogger(PaimonWriter.class);

Expand All @@ -75,6 +78,8 @@ public class PaimonWriter<InputT>
/** A workaround variable that tracks the checkpointId in {@link StreamOperator#snapshotState}. */
private long lastCheckpointId;

private final PaimonWriterState stateCache;

public PaimonWriter(
Options catalogOptions,
MetricGroup metricGroup,
Expand All @@ -93,6 +98,11 @@ public PaimonWriter(
Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
this.serializer = serializer;
this.lastCheckpointId = lastCheckpointId;
this.stateCache = new PaimonWriterState(commitUser);
LOG.info(
"Created PaimonWriter with commit user {} and identifier {}",
commitUser,
lastCheckpointId);
}

@Override
Expand Down Expand Up @@ -214,4 +224,9 @@ public void close() throws Exception {
compactExecutor.shutdownNow();
}
}

/**
 * Snapshots the writer state for a checkpoint.
 *
 * <p>The same {@code stateCache} instance, created once in the constructor from the commit
 * user, is returned for every checkpoint; {@code checkpointId} is not consulted.
 *
 * @param checkpointId id of the checkpoint being taken (unused)
 * @return a single-element list holding the cached writer state
 */
@Override
public List<PaimonWriterState> snapshotState(long checkpointId) {
    final List<PaimonWriterState> snapshot = Collections.singletonList(stateCache);
    return snapshot;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.paimon.sink.v2;

/** The state of {@link PaimonWriter}. */
public class PaimonWriterState {

public static final int VERSION = 0;

/**
* The commit user of {@link PaimonWriter}.
*
* <p>Note: Introduced from version 0.
*/
private final String commitUser;

private transient byte[] serializedBytesCache;

public PaimonWriterState(String commitUser) {
this.commitUser = commitUser;
}

public String getCommitUser() {
return commitUser;
}

public byte[] getSerializedBytesCache() {
return serializedBytesCache;
}

public void setSerializedBytesCache(byte[] serializedBytesCache) {
this.serializedBytesCache = serializedBytesCache;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/**
 * {@link SimpleVersionedSerializer} that converts a {@link PaimonWriterState} to and from bytes.
 *
 * <p>The serialized form is the commit user written with {@code writeUTF}. Serialized bytes are
 * cached on the state object so repeated serialization of the same instance reuses them.
 */
public class PaimonWriterStateSerializer implements SimpleVersionedSerializer<PaimonWriterState> {

    /** Shared instance of this serializer. */
    public static final PaimonWriterStateSerializer INSTANCE = new PaimonWriterStateSerializer();

    /** Returns {@link PaimonWriterState#VERSION} as the serialization format version. */
    @Override
    public int getVersion() {
        return PaimonWriterState.VERSION;
    }

    /**
     * Serializes the given state, reusing the bytes cached on it when available.
     *
     * @param paimonWriterState the state to serialize
     * @return the cached bytes if present, otherwise freshly serialized bytes, which are also
     *     stored on the state as its cache
     * @throws IOException if writing the commit user fails
     */
    @Override
    public byte[] serialize(PaimonWriterState paimonWriterState) throws IOException {
        final byte[] cachedBytes = paimonWriterState.getSerializedBytesCache();
        if (cachedBytes != null) {
            return cachedBytes;
        }

        final DataOutputSerializer target = new DataOutputSerializer(64);
        target.writeUTF(paimonWriterState.getCommitUser());
        final byte[] freshBytes = target.getCopyOfBuffer();
        // Remember the result so the next call for this state takes the fast path above.
        paimonWriterState.setSerializedBytesCache(freshBytes);
        return freshBytes;
    }

    /**
     * Rebuilds a state from its serialized form.
     *
     * @param version the format version the bytes were written with; only {@code 0} is known
     * @param serialized the serialized state
     * @return a new state carrying the commit user that was read
     * @throws IOException if the version is unknown or reading fails
     */
    @Override
    public PaimonWriterState deserialize(int version, byte[] serialized) throws IOException {
        final DataInputDeserializer source = new DataInputDeserializer(serialized);
        if (version != 0) {
            throw new IOException("Unknown version: " + version);
        }
        return new PaimonWriterState(source.readUTF());
    }
}
Loading