Skip to content

Commit 417b1b8

Browse files
committed
[Feature][Connector-V2] Refactoring to have per-connector version management of Debezium
Signed-off-by: vsantonastaso <vsantonastaso.dev@gmail.com>
1 parent 06d2d28 commit 417b1b8

37 files changed

Lines changed: 2696 additions & 69 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
import java.util.Collection;
21+
22+
/**
23+
* SPI for connector-specific Debezium implementations. Loaded via {@link java.util.ServiceLoader}.
24+
*/
25+
public interface DebeziumAdapter {
26+
27+
DebeziumEventDispatcher createEventDispatcher(DebeziumEventDispatcherConfig config);
28+
29+
DebeziumSchemaHistory createSchemaHistory(
30+
String instanceName, Collection<TableChangeInfo> tableChanges);
31+
32+
DebeziumTopicNaming createTopicNaming(String logicalName, String heartbeatPrefix);
33+
34+
String getDebeziumVersion();
35+
36+
boolean supports(String connectorType);
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.Map;
24+
import java.util.ServiceLoader;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
27+
/** Factory for loading connector-specific Debezium adapters via ServiceLoader. */
28+
public class DebeziumAdapterFactory {
29+
30+
private static final Logger LOG = LoggerFactory.getLogger(DebeziumAdapterFactory.class);
31+
32+
private static final Map<String, DebeziumAdapter> ADAPTERS = new ConcurrentHashMap<>();
33+
34+
public static DebeziumAdapter getAdapter(String connectorType) {
35+
return ADAPTERS.computeIfAbsent(
36+
connectorType,
37+
type -> {
38+
LOG.info("Loading DebeziumAdapter for connector type: {}", type);
39+
ServiceLoader<DebeziumAdapter> loader =
40+
ServiceLoader.load(DebeziumAdapter.class);
41+
42+
for (DebeziumAdapter adapter : loader) {
43+
if (adapter.supports(type)) {
44+
LOG.info(
45+
"Found DebeziumAdapter for {}: {} (Debezium version: {})",
46+
type,
47+
adapter.getClass().getName(),
48+
adapter.getDebeziumVersion());
49+
return adapter;
50+
}
51+
}
52+
53+
throw new IllegalStateException(
54+
"No DebeziumAdapter found for connector type: "
55+
+ type
56+
+ ". Ensure META-INF/services configuration is present.");
57+
});
58+
}
59+
60+
public static void clearCache() {
61+
ADAPTERS.clear();
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
21+
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
22+
23+
import java.util.Map;
24+
25+
/** @param <P> Partition type */
26+
public interface DebeziumEventDispatcher<P> {
27+
28+
void dispatchWatermarkEvent(
29+
Map<String, ?> sourcePartition,
30+
String splitId,
31+
WatermarkKind watermarkKind,
32+
Offset offset)
33+
throws InterruptedException;
34+
35+
Object getQueue();
36+
37+
String getPrimaryTopic();
38+
39+
void close();
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
/** Configuration for creating event dispatchers. */
21+
public class DebeziumEventDispatcherConfig {
22+
23+
private final Object connectorConfig;
24+
private final DebeziumTopicNaming<?> topicNaming;
25+
private final Object databaseSchema;
26+
private final Object queue;
27+
private final Object dataCollectionFilter;
28+
private final Object changeEventCreator;
29+
private final Object metadataProvider;
30+
private final Object heartbeatFactory;
31+
private final Object schemaNameAdjuster;
32+
33+
private DebeziumEventDispatcherConfig(Builder builder) {
34+
this.connectorConfig = builder.connectorConfig;
35+
this.topicNaming = builder.topicNaming;
36+
this.databaseSchema = builder.databaseSchema;
37+
this.queue = builder.queue;
38+
this.dataCollectionFilter = builder.dataCollectionFilter;
39+
this.changeEventCreator = builder.changeEventCreator;
40+
this.metadataProvider = builder.metadataProvider;
41+
this.heartbeatFactory = builder.heartbeatFactory;
42+
this.schemaNameAdjuster = builder.schemaNameAdjuster;
43+
}
44+
45+
public Object getConnectorConfig() {
46+
return connectorConfig;
47+
}
48+
49+
public DebeziumTopicNaming<?> getTopicNaming() {
50+
return topicNaming;
51+
}
52+
53+
public Object getDatabaseSchema() {
54+
return databaseSchema;
55+
}
56+
57+
public Object getQueue() {
58+
return queue;
59+
}
60+
61+
public Object getDataCollectionFilter() {
62+
return dataCollectionFilter;
63+
}
64+
65+
public Object getChangeEventCreator() {
66+
return changeEventCreator;
67+
}
68+
69+
public Object getMetadataProvider() {
70+
return metadataProvider;
71+
}
72+
73+
public Object getHeartbeatFactory() {
74+
return heartbeatFactory;
75+
}
76+
77+
public Object getSchemaNameAdjuster() {
78+
return schemaNameAdjuster;
79+
}
80+
81+
public static Builder builder() {
82+
return new Builder();
83+
}
84+
85+
/** Builder for DebeziumEventDispatcherConfig */
86+
public static class Builder {
87+
private Object connectorConfig;
88+
private DebeziumTopicNaming<?> topicNaming;
89+
private Object databaseSchema;
90+
private Object queue;
91+
private Object dataCollectionFilter;
92+
private Object changeEventCreator;
93+
private Object metadataProvider;
94+
private Object heartbeatFactory;
95+
private Object schemaNameAdjuster;
96+
97+
public Builder connectorConfig(Object connectorConfig) {
98+
this.connectorConfig = connectorConfig;
99+
return this;
100+
}
101+
102+
public Builder topicNaming(DebeziumTopicNaming<?> topicNaming) {
103+
this.topicNaming = topicNaming;
104+
return this;
105+
}
106+
107+
public Builder databaseSchema(Object databaseSchema) {
108+
this.databaseSchema = databaseSchema;
109+
return this;
110+
}
111+
112+
public Builder queue(Object queue) {
113+
this.queue = queue;
114+
return this;
115+
}
116+
117+
public Builder dataCollectionFilter(Object dataCollectionFilter) {
118+
this.dataCollectionFilter = dataCollectionFilter;
119+
return this;
120+
}
121+
122+
public Builder changeEventCreator(Object changeEventCreator) {
123+
this.changeEventCreator = changeEventCreator;
124+
return this;
125+
}
126+
127+
public Builder metadataProvider(Object metadataProvider) {
128+
this.metadataProvider = metadataProvider;
129+
return this;
130+
}
131+
132+
public Builder heartbeatFactory(Object heartbeatFactory) {
133+
this.heartbeatFactory = heartbeatFactory;
134+
return this;
135+
}
136+
137+
public Builder schemaNameAdjuster(Object schemaNameAdjuster) {
138+
this.schemaNameAdjuster = schemaNameAdjuster;
139+
return this;
140+
}
141+
142+
public DebeziumEventDispatcherConfig build() {
143+
return new DebeziumEventDispatcherConfig(this);
144+
}
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
import java.util.Collection;
21+
import java.util.Map;
22+
23+
public interface DebeziumSchemaHistory {
24+
25+
void registerHistory(String instanceName, Collection<TableChangeInfo> changes);
26+
27+
Collection<TableChangeInfo> removeHistory(String instanceName);
28+
29+
void configure(Map<String, ?> config);
30+
31+
void start();
32+
33+
void stop();
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.debezium;
19+
20+
/** @param <T> Table identifier type */
21+
public interface DebeziumTopicNaming<T> {
22+
23+
String getPrimaryTopic();
24+
25+
String getHeartbeatTopic();
26+
27+
String dataChangeTopicName(T tableId);
28+
}

0 commit comments

Comments
 (0)