Skip to content

Commit 5e6f8c1

Browse files
committed
[Feature][Connector-V2] Add tail file source
1 parent e9ab5d7 commit 5e6f8c1

File tree

34 files changed

+2388
-31
lines changed

34 files changed

+2388
-31
lines changed

.github/workflows/labeler/label-scope-conf.yml

+7-1
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,10 @@ aerospike:
310310
- all:
311311
- changed-files:
312312
- any-glob-to-any-file: seatunnel-connectors-v2/connector-aerospike/**
313-
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(aerospike)/**'
313+
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(aerospike)/**'
314+
315+
tailfile:
316+
- all:
317+
- changed-files:
318+
- any-glob-to-any-file: seatunnel-connectors-v2/connector-tailfile/**
319+
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(tailfile)/**'

config/plugin_config

+1
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,4 @@ connector-sls
9393
connector-qdrant
9494
connector-typesense
9595
connector-cdc-opengauss
96+
connector-tailfile

plugin-mapping.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ seatunnel.source.Typesense = connector-typesense
142142
seatunnel.sink.Typesense = connector-typesense
143143
seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
144144
seatunnel.sink.Aerospike = connector-aerospike
145-
145+
seatunnel.source.tailfile = connector-tailfile
146146
seatunnel.transform.Sql = seatunnel-transforms-v2
147147
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
148148
seatunnel.transform.Filter = seatunnel-transforms-v2

seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java

+5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>
4141
/** The method is executed by the engine only once. */
4242
void run() throws Exception;
4343

44+
default boolean runStep() throws Exception {
45+
run();
46+
return false;
47+
}
48+
4449
/**
4550
* Called to close the enumerator, in case it holds on to any resources, like threads or network
4651
* connections.

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
2828
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
2929
import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
30+
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumSchemaNameAdjuster;
3031
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
3132

3233
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -71,7 +72,7 @@ public JdbcSourceFetchTaskContext(
7172
this.sourceConfig = sourceConfig;
7273
this.dataSourceDialect = dataSourceDialect;
7374
this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
74-
this.schemaNameAdjuster = SchemaNameAdjuster.create();
75+
this.schemaNameAdjuster = DebeziumSchemaNameAdjuster.create();
7576
this.jsonConverter = new JsonConverter();
7677
jsonConverter.configure(Collections.singletonMap("schemas.enable", true), false);
7778
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/ConnectTableChangeSerializer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
public class ConnectTableChangeSerializer
6464
implements TableChanges.TableChangesSerializer<List<Struct>>, Serializable {
6565
private static final String ENUM_VALUES_KEY = "enumValues";
66-
private static final SchemaNameAdjuster SCHEMA_NAME_ADJUSTER = SchemaNameAdjuster.create();
66+
private static final SchemaNameAdjuster SCHEMA_NAME_ADJUSTER =
67+
DebeziumSchemaNameAdjuster.create();
6768

6869
private static final Schema COLUMN_SCHEMA =
6970
SchemaBuilder.struct()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium;
19+
20+
import org.apache.kafka.connect.errors.ConnectException;
21+
22+
import io.debezium.util.SchemaNameAdjuster;
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
@Slf4j
26+
public class DebeziumSchemaNameAdjuster {
27+
private static final SchemaNameAdjuster.ReplacementOccurred HANDLER =
28+
(original, replacement, conflictsWith) -> {
29+
if (conflictsWith != null) {
30+
String msg =
31+
"The Kafka Connect schema name '"
32+
+ original
33+
+ "' is not a valid Avro schema name and its replacement '"
34+
+ replacement
35+
+ "' conflicts with another different schema '"
36+
+ conflictsWith
37+
+ "'";
38+
log.error(msg);
39+
throw new ConnectException(msg);
40+
} else {
41+
log.warn(
42+
"The Kafka Connect schema name '{}' is not a valid Avro schema name, so replacing with '{}'",
43+
original,
44+
replacement);
45+
}
46+
};
47+
48+
private static final SchemaNameAdjuster.ReplacementFunction REPLACE_CHAR_HANDLER =
49+
invalid -> {
50+
// Support for Chinese characters
51+
if (Character.isIdeographic(invalid)) {
52+
return String.valueOf(invalid);
53+
}
54+
return "_";
55+
};
56+
57+
public static SchemaNameAdjuster create() {
58+
return (original) ->
59+
SchemaNameAdjuster.validFullname(
60+
original, REPLACE_CHAR_HANDLER, HANDLER.firstTimeOnly());
61+
}
62+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ public LogMinerStreamingChangeEventSource(
122122
this.schema = schema;
123123
this.connectorConfig = connectorConfig;
124124
this.strategy = connectorConfig.getLogMiningStrategy();
125-
this.isContinuousMining = connectorConfig.isContinuousMining();
125+
this.isContinuousMining =
126+
jdbcConnection.getOracleVersion().getMajor() >= 19
127+
? false
128+
: connectorConfig.isContinuousMining();
126129
this.errorHandler = errorHandler;
127130
this.streamingMetrics = streamingMetrics;
128131
this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig);

seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package io.debezium.connector.postgresql;
1919

20-
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
21-
2220
import org.apache.kafka.connect.errors.ConnectException;
2321

2422
import io.debezium.connector.postgresql.connection.PostgresConnection;
@@ -32,8 +30,6 @@
3230
import java.sql.SQLException;
3331
import java.time.Duration;
3432

35-
import static org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode.CREATE_REPLICATION_CONNECTION_FAILED;
36-
3733
/**
3834
* A factory for creating various Debezium objects
3935
*
@@ -118,6 +114,6 @@ public static ReplicationConnection createReplicationConnection(
118114
}
119115
}
120116
}
121-
throw new SeaTunnelRuntimeException(CREATE_REPLICATION_CONNECTION_FAILED, "" + taskContext);
117+
throw new RuntimeException("Failed to create replication connection" + taskContext);
122118
}
123119
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,10 @@ public PostgresOffsetContext load(Map<String, ?> offset) {
253253
final Long txId = readOptionalLong(offset, SourceInfo.TXID_KEY);
254254

255255
final Instant useconds =
256-
Conversions.toInstantFromMicros(
257-
(Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY));
256+
offset.get(SourceInfo.TIMESTAMP_USEC_KEY) != null
257+
? Conversions.toInstantFromMicros(
258+
(Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY))
259+
: Clock.system().currentTimeAsInstant();
258260
final boolean snapshot =
259261
(boolean)
260262
((Map<String, Object>) offset)

0 commit comments

Comments
 (0)