Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,21 @@
</dependencyManagement>

<dependencies>
<!-- Debezium dependencies -->
<!-- Debezium dependencies with 'provided' scope - connectors supply the actual version -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

import java.util.Collection;

/**
* SPI for connector-specific Debezium implementations. Loaded via {@link java.util.ServiceLoader}.
*/
public interface DebeziumAdapter {

DebeziumEventDispatcher createEventDispatcher(DebeziumEventDispatcherConfig config);

DebeziumSchemaHistory createSchemaHistory(
String instanceName, Collection<TableChangeInfo> tableChanges);

DebeziumTopicNaming createTopicNaming(String logicalName, String heartbeatPrefix);

String getDebeziumVersion();

boolean supports(String connectorType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

/** Factory for loading connector-specific Debezium adapters via ServiceLoader. */
public class DebeziumAdapterFactory {

private static final Logger LOG = LoggerFactory.getLogger(DebeziumAdapterFactory.class);

private static final Map<String, DebeziumAdapter> ADAPTERS = new ConcurrentHashMap<>();

public static DebeziumAdapter getAdapter(String connectorType, ClassLoader classLoader) {
return ADAPTERS.computeIfAbsent(
connectorType,
type -> {
LOG.info("Loading DebeziumAdapter for connector type: {}", type);
ServiceLoader<DebeziumAdapter> loader =
ServiceLoader.load(DebeziumAdapter.class, classLoader);

for (DebeziumAdapter adapter : loader) {
if (adapter.supports(type)) {
LOG.info(
"Found DebeziumAdapter for {}: {} (Debezium version: {})",
type,
adapter.getClass().getName(),
adapter.getDebeziumVersion());
return adapter;
}
}

throw new IllegalStateException(
"No DebeziumAdapter found for connector type: "
+ type
+ ". Ensure META-INF/services configuration is present.");
});
}

public static void clearCache() {
ADAPTERS.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;

import java.util.Map;

/** @param <P> Partition type */
public interface DebeziumEventDispatcher<P> {

void dispatchWatermarkEvent(
Map<String, ?> sourcePartition,
String splitId,
WatermarkKind watermarkKind,
Offset offset)
throws InterruptedException;

Object getQueue();

String getPrimaryTopic();

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

/** Configuration for creating event dispatchers. */
public class DebeziumEventDispatcherConfig {

private final Object connectorConfig;
private final DebeziumTopicNaming<?> topicNaming;
private final Object databaseSchema;
private final Object queue;
private final Object dataCollectionFilter;
private final Object changeEventCreator;
private final Object metadataProvider;
private final Object heartbeatFactory;
private final Object schemaNameAdjuster;

private DebeziumEventDispatcherConfig(Builder builder) {
this.connectorConfig = builder.connectorConfig;
this.topicNaming = builder.topicNaming;
this.databaseSchema = builder.databaseSchema;
this.queue = builder.queue;
this.dataCollectionFilter = builder.dataCollectionFilter;
this.changeEventCreator = builder.changeEventCreator;
this.metadataProvider = builder.metadataProvider;
this.heartbeatFactory = builder.heartbeatFactory;
this.schemaNameAdjuster = builder.schemaNameAdjuster;
}

public Object getConnectorConfig() {
return connectorConfig;
}

public DebeziumTopicNaming<?> getTopicNaming() {
return topicNaming;
}

public Object getDatabaseSchema() {
return databaseSchema;
}

public Object getQueue() {
return queue;
}

public Object getDataCollectionFilter() {
return dataCollectionFilter;
}

public Object getChangeEventCreator() {
return changeEventCreator;
}

public Object getMetadataProvider() {
return metadataProvider;
}

public Object getHeartbeatFactory() {
return heartbeatFactory;
}

public Object getSchemaNameAdjuster() {
return schemaNameAdjuster;
}

public static Builder builder() {
return new Builder();
}

/** Builder for DebeziumEventDispatcherConfig */
public static class Builder {
private Object connectorConfig;
private DebeziumTopicNaming<?> topicNaming;
private Object databaseSchema;
private Object queue;
private Object dataCollectionFilter;
private Object changeEventCreator;
private Object metadataProvider;
private Object heartbeatFactory;
private Object schemaNameAdjuster;

public Builder connectorConfig(Object connectorConfig) {
this.connectorConfig = connectorConfig;
return this;
}

public Builder topicNaming(DebeziumTopicNaming<?> topicNaming) {
this.topicNaming = topicNaming;
return this;
}

public Builder databaseSchema(Object databaseSchema) {
this.databaseSchema = databaseSchema;
return this;
}

public Builder queue(Object queue) {
this.queue = queue;
return this;
}

public Builder dataCollectionFilter(Object dataCollectionFilter) {
this.dataCollectionFilter = dataCollectionFilter;
return this;
}

public Builder changeEventCreator(Object changeEventCreator) {
this.changeEventCreator = changeEventCreator;
return this;
}

public Builder metadataProvider(Object metadataProvider) {
this.metadataProvider = metadataProvider;
return this;
}

public Builder heartbeatFactory(Object heartbeatFactory) {
this.heartbeatFactory = heartbeatFactory;
return this;
}

public Builder schemaNameAdjuster(Object schemaNameAdjuster) {
this.schemaNameAdjuster = schemaNameAdjuster;
return this;
}

public DebeziumEventDispatcherConfig build() {
return new DebeziumEventDispatcherConfig(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

import java.util.Collection;
import java.util.Map;

public interface DebeziumSchemaHistory {

void registerHistory(String instanceName, Collection<TableChangeInfo> changes);

Collection<TableChangeInfo> removeHistory(String instanceName);

void configure(Map<String, ?> config);

void start();

void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.debezium;

/** @param <T> Table identifier type */
public interface DebeziumTopicNaming<T> {

String getPrimaryTopic();

String getHeartbeatTopic();

String dataChangeTopicName(T tableId);
}
Loading
Loading