Skip to content

Commit 79eb9ab

Browse files
committed
Add end test for Doris/OceanBase back.
1 parent b0670ae commit 79eb9ab

6 files changed

Lines changed: 326 additions & 5 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.doris.flink.sink.batch;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.api.connector.sink2.InitContextAdapter;
23+
import org.apache.flink.api.connector.sink2.Sink;
24+
import org.apache.flink.api.connector.sink2.SinkWriter;
25+
import org.apache.flink.api.connector.sink2.WriterInitContext;
26+
import org.apache.flink.util.Preconditions;
27+
28+
import org.apache.doris.flink.cfg.DorisExecutionOptions;
29+
import org.apache.doris.flink.cfg.DorisOptions;
30+
import org.apache.doris.flink.cfg.DorisReadOptions;
31+
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
32+
33+
import java.io.IOException;
34+
35+
/**
36+
* Copy from <a
37+
* href="https://github.com/apache/doris-flink-connector/blob/25.1.0/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java">DorisBatchSink</a>
38+
* to add {@link #createWriter(WriterInitContext)}} method.
39+
*/
40+
@Deprecated
41+
@PublicEvolving
42+
public class DorisBatchSink<IN> implements Sink<IN> {
43+
private final DorisOptions dorisOptions;
44+
private final DorisReadOptions dorisReadOptions;
45+
private final DorisExecutionOptions dorisExecutionOptions;
46+
private final DorisRecordSerializer<IN> serializer;
47+
48+
public DorisBatchSink(
49+
DorisOptions dorisOptions,
50+
DorisReadOptions dorisReadOptions,
51+
DorisExecutionOptions dorisExecutionOptions,
52+
DorisRecordSerializer<IN> serializer) {
53+
this.dorisOptions = dorisOptions;
54+
this.dorisReadOptions = dorisReadOptions;
55+
this.dorisExecutionOptions = dorisExecutionOptions;
56+
this.serializer = serializer;
57+
}
58+
59+
@Override
60+
public SinkWriter<IN> createWriter(InitContext initContext) throws IOException {
61+
DorisBatchWriter<IN> dorisBatchWriter =
62+
new DorisBatchWriter<IN>(
63+
initContext,
64+
serializer,
65+
dorisOptions,
66+
dorisReadOptions,
67+
dorisExecutionOptions);
68+
return dorisBatchWriter;
69+
}
70+
71+
@Override
72+
public SinkWriter<IN> createWriter(WriterInitContext context) throws IOException {
73+
DorisBatchWriter<IN> dorisBatchWriter =
74+
new DorisBatchWriter<IN>(
75+
new InitContextAdapter(context),
76+
serializer,
77+
dorisOptions,
78+
dorisReadOptions,
79+
dorisExecutionOptions);
80+
return dorisBatchWriter;
81+
}
82+
83+
public static <IN> DorisBatchSink.Builder<IN> builder() {
84+
return new DorisBatchSink.Builder<>();
85+
}
86+
87+
/**
88+
* build for DorisBatchSink.
89+
*
90+
* @param <IN> record type.
91+
*/
92+
public static class Builder<IN> {
93+
private DorisOptions dorisOptions;
94+
private DorisReadOptions dorisReadOptions;
95+
private DorisExecutionOptions dorisExecutionOptions;
96+
private DorisRecordSerializer<IN> serializer;
97+
98+
public DorisBatchSink.Builder<IN> setDorisOptions(DorisOptions dorisOptions) {
99+
this.dorisOptions = dorisOptions;
100+
return this;
101+
}
102+
103+
public DorisBatchSink.Builder<IN> setDorisReadOptions(DorisReadOptions dorisReadOptions) {
104+
this.dorisReadOptions = dorisReadOptions;
105+
return this;
106+
}
107+
108+
public DorisBatchSink.Builder<IN> setDorisExecutionOptions(
109+
DorisExecutionOptions dorisExecutionOptions) {
110+
this.dorisExecutionOptions = dorisExecutionOptions;
111+
return this;
112+
}
113+
114+
public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
115+
this.serializer = serializer;
116+
return this;
117+
}
118+
119+
public DorisBatchSink<IN> build() {
120+
Preconditions.checkNotNull(dorisOptions);
121+
Preconditions.checkNotNull(dorisExecutionOptions);
122+
Preconditions.checkNotNull(serializer);
123+
if (dorisReadOptions == null) {
124+
dorisReadOptions = DorisReadOptions.builder().build();
125+
}
126+
return new DorisBatchSink<>(
127+
dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer);
128+
}
129+
}
130+
131+
@VisibleForTesting
132+
public DorisReadOptions getDorisReadOptions() {
133+
return dorisReadOptions;
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.oceanbase.connector.flink.sink;
19+
20+
import org.apache.flink.api.common.typeutils.TypeSerializer;
21+
import org.apache.flink.api.connector.sink2.InitContextAdapter;
22+
import org.apache.flink.api.connector.sink2.Sink;
23+
import org.apache.flink.api.connector.sink2.SinkWriter;
24+
import org.apache.flink.api.connector.sink2.WriterInitContext;
25+
26+
import com.oceanbase.connector.flink.ConnectorOptions;
27+
import com.oceanbase.connector.flink.table.DataChangeRecord;
28+
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
29+
30+
import java.io.IOException;
31+
32+
/**
33+
* Copy from <a
34+
* href="https://github.com/oceanbase/flink-connector-oceanbase/blob/v1.2/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java">OceanBaseSink</a>
35+
* to add {@link #createWriter(WriterInitContext)} method.
36+
*/
37+
public class OceanBaseSink<T> implements Sink<T> {
38+
39+
private static final long serialVersionUID = 1L;
40+
41+
private final ConnectorOptions options;
42+
private final TypeSerializer<T> typeSerializer;
43+
private final RecordSerializationSchema<T> recordSerializer;
44+
private final DataChangeRecord.KeyExtractor keyExtractor;
45+
private final RecordFlusher recordFlusher;
46+
private final OceanBaseWriterEvent.Listener writerEventListener;
47+
48+
public OceanBaseSink(
49+
ConnectorOptions options,
50+
TypeSerializer<T> typeSerializer,
51+
RecordSerializationSchema<T> recordSerializer,
52+
DataChangeRecord.KeyExtractor keyExtractor,
53+
RecordFlusher recordFlusher) {
54+
this(options, typeSerializer, recordSerializer, keyExtractor, recordFlusher, null);
55+
}
56+
57+
public OceanBaseSink(
58+
ConnectorOptions options,
59+
TypeSerializer<T> typeSerializer,
60+
RecordSerializationSchema<T> recordSerializer,
61+
DataChangeRecord.KeyExtractor keyExtractor,
62+
RecordFlusher recordFlusher,
63+
OceanBaseWriterEvent.Listener writerEventListener) {
64+
this.options = options;
65+
this.typeSerializer = typeSerializer;
66+
this.recordSerializer = recordSerializer;
67+
this.keyExtractor = keyExtractor;
68+
this.recordFlusher = recordFlusher;
69+
this.writerEventListener = writerEventListener;
70+
}
71+
72+
@Override
73+
public SinkWriter<T> createWriter(InitContext context) {
74+
return new OceanBaseWriter<>(
75+
options,
76+
context,
77+
typeSerializer,
78+
recordSerializer,
79+
keyExtractor,
80+
recordFlusher,
81+
writerEventListener);
82+
}
83+
84+
@Override
85+
public SinkWriter<T> createWriter(WriterInitContext writerInitContext) throws IOException {
86+
return new OceanBaseWriter<>(
87+
options,
88+
new InitContextAdapter(writerInitContext),
89+
typeSerializer,
90+
recordSerializer,
91+
keyExtractor,
92+
recordFlusher,
93+
writerEventListener);
94+
}
95+
}

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.jupiter.api.BeforeAll;
2828
import org.junit.jupiter.api.BeforeEach;
2929
import org.junit.jupiter.api.Test;
30-
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
3332
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
@@ -50,7 +49,6 @@
5049
import java.util.stream.Stream;
5150

5251
/** End-to-end tests for mysql cdc to Doris pipeline job. */
53-
@EnabledIfSystemProperty(named = "specifiedFlinkVersion", matches = "^1.*")
5452
class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
5553
private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);
5654

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OceanBaseE2eITCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.junit.jupiter.api.AfterEach;
3030
import org.junit.jupiter.api.BeforeEach;
3131
import org.junit.jupiter.api.Test;
32-
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,7 +51,6 @@
5251
import static org.assertj.core.api.Assertions.assertThat;
5352

5453
/** OceanBase flink cdc pipeline connector sink integrate test. */
55-
@EnabledIfSystemProperty(named = "specifiedFlinkVersion", matches = "^1.*")
5654
class OceanBaseE2eITCase extends PipelineTestEnvironment {
5755
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class);
5856

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.api.connector.sink2;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.api.common.JobInfo;
22+
import org.apache.flink.api.common.TaskInfo;
23+
import org.apache.flink.api.common.operators.MailboxExecutor;
24+
import org.apache.flink.api.common.operators.ProcessingTimeService;
25+
import org.apache.flink.api.common.serialization.SerializationSchema;
26+
import org.apache.flink.api.common.typeutils.TypeSerializer;
27+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
28+
import org.apache.flink.util.UserCodeClassLoader;
29+
30+
import java.util.OptionalLong;
31+
32+
/**
33+
* Compatibility adapter for Flink 2.2. This class is part of the multi-version compatibility layer
34+
* that allows Flink CDC to work across different Flink versions.
35+
*/
36+
@Internal
37+
public class InitContextAdapter implements Sink.InitContext {
38+
private final WriterInitContext context;
39+
40+
public InitContextAdapter(WriterInitContext context) {
41+
this.context = context;
42+
}
43+
44+
public UserCodeClassLoader getUserCodeClassLoader() {
45+
return this.context.getUserCodeClassLoader();
46+
}
47+
48+
public MailboxExecutor getMailboxExecutor() {
49+
return this.context.getMailboxExecutor();
50+
}
51+
52+
public ProcessingTimeService getProcessingTimeService() {
53+
return this.context.getProcessingTimeService();
54+
}
55+
56+
public int getSubtaskId() {
57+
return this.context.getTaskInfo().getIndexOfThisSubtask();
58+
}
59+
60+
public int getNumberOfParallelSubtasks() {
61+
return this.context.getTaskInfo().getNumberOfParallelSubtasks();
62+
}
63+
64+
public SinkWriterMetricGroup metricGroup() {
65+
return this.context.metricGroup();
66+
}
67+
68+
public OptionalLong getRestoredCheckpointId() {
69+
return this.context.getRestoredCheckpointId();
70+
}
71+
72+
@Override
73+
public JobInfo getJobInfo() {
74+
return this.context.getJobInfo();
75+
}
76+
77+
@Override
78+
public TaskInfo getTaskInfo() {
79+
return this.context.getTaskInfo();
80+
}
81+
82+
public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
83+
return this.context.asSerializationSchemaInitializationContext();
84+
}
85+
86+
@Override
87+
public boolean isObjectReuseEnabled() {
88+
return this.context.isObjectReuseEnabled();
89+
}
90+
91+
@Override
92+
public <IN> TypeSerializer<IN> createInputSerializer() {
93+
return this.context.createInputSerializer();
94+
}
95+
}

flink-cdc-flink2-compat/src/main/java/org/apache/flink/api/connector/sink2/InitContextAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* that allows Flink CDC to work across different Flink versions.
3535
*/
3636
@Internal
37-
class InitContextAdapter implements Sink.InitContext {
37+
public class InitContextAdapter implements Sink.InitContext {
3838
private final WriterInitContext context;
3939

4040
public InitContextAdapter(WriterInitContext context) {

0 commit comments

Comments
 (0)