Core batch stream load writer shared across Flink versions. + * + *
Version-specific modules should wrap this class (for example via DorisBatchWriterAdapter and
+ * implement the appropriate Flink sink interfaces there.
+ */
+public class DorisBatchWriter This is the core implementation that is shared across different Flink versions.
+ * Version-specific modules should wrap this class and implement the appropriate Flink sink
+ * interfaces there (for example via DorisWriterAdapter).
+ *
+ * @param This class adapts Flink 1.x's {@link Sink.InitContext} to the version-neutral core writer in
+ * the base module while implementing {@link DorisAbstractWriter} for use by the 1.x connector
+ * module.
+ */
+public class DorisWriterAdapter This class adapts Flink 2.x's {@link WriterInitContext} to the version-neutral core writer in
+ * the base module while implementing {@link DorisAbstractWriter} for use by the 2.x connector
+ * module.
+ */
+public class DorisWriterAdapter Because the table source requires a decoding format, we are discovering the format using the
+ * provided {@link FactoryUtil} for convenience.
+ */
+public final class DorisDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER; // used for matching to `connector = '...'`
+ }
+
+ @Override
+ public Set>
+ implements ResultTypeQueryable
> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
+
+ private final DorisDeserializationSchema
> deserializer;
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
+ private transient volatile boolean isRunning;
+ private List
> deserializer) {
+ this.deserializer = deserializer;
+ this.options = streamOptions.getOptions();
+ this.readOptions = streamOptions.getReadOptions();
+ try {
+ this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
+ logger.info("Doris partitions size {}", dorisPartitions.size());
+ } catch (DorisException e) {
+ throw new RuntimeException("Failed fetch doris partitions");
+ }
+ }
+
+ @Override
+ public void open(OpenContext parameters) throws Exception {
+ super.open(parameters);
+ this.isRunning = true;
+ assignTaskPartitions();
+ }
+
+ /** Assign partitions to each task. */
+ private void assignTaskPartitions() {
+ int taskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ int totalTasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+
+ for (int i = 0; i < dorisPartitions.size(); i++) {
+ if (i % totalTasks == taskIndex) {
+ taskDorisPartitions.add(dorisPartitions.get(i));
+ }
+ }
+ logger.info("subtask {} process {} partitions ", taskIndex, taskDorisPartitions.size());
+ }
+
+ @Override
+ public void run(SourceContext
> sourceContext) {
+ for (PartitionDefinition partitions : taskDorisPartitions) {
+ try (DorisValueReader valueReader =
+ new DorisValueReader(partitions, options, readOptions)) {
+ while (isRunning && valueReader.hasNext()) {
+ List> next = valueReader.next();
+ sourceContext.collect(next);
+ }
+ } catch (Exception e) {
+ logger.error("close reader resource failed,", e);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ isRunning = false;
+ }
+
+ @Override
+ public TypeInformation
> getProducedType() {
+ return this.deserializer.getProducedType();
+ }
+}
diff --git a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java
new file mode 100644
index 000000000..fe1e920ff
--- /dev/null
+++ b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.batch.DorisBatchWriterAdapter;
+import org.apache.doris.flink.sink.committer.DorisCommitter;
+import org.apache.doris.flink.sink.copy.CopyCommittableSerializer;
+import org.apache.doris.flink.sink.copy.DorisCopyCommitter;
+import org.apache.doris.flink.sink.copy.DorisCopyWriterAdapter;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
+import org.apache.doris.flink.sink.writer.DorisWriter;
+import org.apache.doris.flink.sink.writer.DorisWriterAdapter;
+import org.apache.doris.flink.sink.writer.DorisWriterState;
+import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
+import org.apache.doris.flink.sink.writer.WriteMode;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Load data into Doris based on 2PC. see {@link DorisWriter} and {@link DorisCommitter}.
+ *
+ * @param