Skip to content

Commit 3bcc1e0

Browse files
authored
Merge pull request #498 from tapdata/develop
Develop
2 parents 0ce0450 + 1a537ad commit 3bcc1e0

File tree

120 files changed

+1394
-389
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

120 files changed

+1394
-389
lines changed

connectors-common/connector-core/src/main/java/io/tapdata/kit/DbKit.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,11 @@ public static boolean ignoreCreateIndex(TapIndex exists, TapIndex created) {
157157
}
158158

159159
public static String buildIndexName(String table) {
160-
return "TAPIDX_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
160+
return "IDX_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
161+
}
162+
163+
public static String buildForeignKeyName(String table) {
164+
return "FK_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
161165
}
162166

163167
public static <T> List<List<T>> splitToPieces(List<T> data, int eachPieceSize) {

connectors-common/connector-core/src/main/java/io/tapdata/kit/StringKit.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Collection;
77
import java.util.Iterator;
88
import java.util.List;
9+
import java.util.regex.Matcher;
910
import java.util.regex.Pattern;
1011
import java.util.stream.Collectors;
1112

@@ -190,7 +191,7 @@ public static String removeHeadTail(String str, String remove, Boolean upperCase
190191
if (EmptyKit.isBlank(str) || EmptyKit.isBlank(remove)) {
191192
return str;
192193
}
193-
if (str.startsWith(remove) && str.endsWith(remove) && str.length() > 2 * remove.length()) {
194+
if (str.startsWith(remove) && str.endsWith(remove) && str.length() >= 2 * remove.length()) {
194195
return str.substring(remove.length(), str.length() - remove.length());
195196
}
196197
if (EmptyKit.isNull(upperCase)) {
@@ -342,4 +343,27 @@ public static String trimTailBlank(Object str) {
342343
if (null == str) return null;
343344
return ("_" + str).trim().substring(1);
344345
}
346+
347+
public static String escape(String name, String escapes) {
348+
String res = name;
349+
for (int i = 0; i < escapes.length(); i++) {
350+
char escape = escapes.charAt(i);
351+
res = escape(res, escape);
352+
}
353+
return res;
354+
}
355+
356+
public static String escape(String name, char escape) {
357+
return name.replace(escape + "", "" + escape + escape);
358+
}
359+
360+
private static final Pattern REGEX_SPECIAL_CHARS = Pattern.compile("[\\\\^$|*+?.,()\\[\\]{}]");
361+
362+
public static String escapeRegex(String input) {
363+
if (input == null || input.isEmpty()) {
364+
return input;
365+
}
366+
Matcher matcher = REGEX_SPECIAL_CHARS.matcher(input);
367+
return matcher.replaceAll("\\\\$0");
368+
}
345369
}

connectors-common/debezium-bucket/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ private void tableUnlock() throws SQLException {
583583
}
584584

585585
private String quote(String dbOrTableName) {
586-
return "`" + dbOrTableName + "`";
586+
return "`" + dbOrTableName.replace("`", "``") + "`";
587587
}
588588

589589
private String quote(TableId id) {

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo
6363
this.connectorConfig = connectorConfig;
6464
this.connection = connection;
6565

66-
this.tableId = PostgresSchema.parse(message.getTable());
66+
this.tableId = message.getTableId();
6767
this.unchangedToastColumnMarkerMissing = !connectorConfig.plugin().hasUnchangedToastColumnMarker();
6868
this.nullToastedValuesMissingFromOld = !connectorConfig.plugin().sendsNullToastedValuesInOld();
6969
Objects.requireNonNull(tableId);

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private void refreshToastableColumnsMap(PostgresConnection connection, TableId t
213213
tableIdToToastableColumns.put(tableId, Collections.unmodifiableList(toastableColumns));
214214
}
215215

216-
protected static TableId parse(String table) {
216+
public static TableId parse(String table) {
217217
TableId tableId = TableId.parse(table, false);
218218
if (tableId == null) {
219219
return null;

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java

Lines changed: 29 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,8 @@
55
*/
66
package io.debezium.connector.postgresql;
77

8-
import java.sql.SQLException;
9-
import java.util.Map;
10-
import java.util.Objects;
11-
import java.util.concurrent.Executors;
12-
import java.util.concurrent.atomic.AtomicReference;
13-
14-
import org.apache.kafka.connect.errors.ConnectException;
15-
import org.postgresql.core.BaseConnection;
16-
import org.slf4j.Logger;
17-
import org.slf4j.LoggerFactory;
18-
19-
import io.debezium.connector.postgresql.connection.Lsn;
20-
import io.debezium.connector.postgresql.connection.PostgresConnection;
21-
import io.debezium.connector.postgresql.connection.ReplicationConnection;
8+
import io.debezium.connector.postgresql.connection.*;
229
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
23-
import io.debezium.connector.postgresql.connection.ReplicationStream;
24-
import io.debezium.connector.postgresql.connection.WalPositionLocator;
2510
import io.debezium.connector.postgresql.spi.Snapshotter;
2611
import io.debezium.heartbeat.Heartbeat;
2712
import io.debezium.pipeline.ErrorHandler;
@@ -30,9 +15,18 @@
3015
import io.debezium.relational.TableId;
3116
import io.debezium.util.Clock;
3217
import io.debezium.util.DelayStrategy;
18+
import org.apache.kafka.connect.errors.ConnectException;
19+
import org.postgresql.core.BaseConnection;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.sql.SQLException;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.atomic.AtomicReference;
3328

3429
/**
35-
*
3630
* @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec
3731
*/
3832
public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource {
@@ -104,8 +98,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
10498
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
10599
walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn);
106100
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
107-
}
108-
else {
101+
} else {
109102
LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
110103
walPosition = new WalPositionLocator();
111104
replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition));
@@ -133,8 +126,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
133126
if (!isInPreSnapshotCatchUpStreaming()) {
134127
connection.commit();
135128
}
136-
}
137-
catch (Exception e) {
129+
} catch (Exception e) {
138130
LOGGER.info("Commit failed while preparing for reconnect", e);
139131
}
140132
walPosition.enableFiltering();
@@ -145,11 +137,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
145137
stream.startKeepAlive(Executors.newSingleThreadExecutor());
146138
}
147139
processMessages(context, stream);
148-
}
149-
catch (Throwable e) {
140+
} catch (Throwable e) {
150141
errorHandler.setProducerThrowable(e);
151-
}
152-
finally {
142+
} finally {
153143
if (replicationConnection != null) {
154144
LOGGER.debug("stopping streaming...");
155145
// stop the keep alive thread, this also shuts down the
@@ -166,8 +156,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
166156
connection.commit();
167157
}
168158
replicationConnection.close();
169-
}
170-
catch (Exception e) {
159+
} catch (Exception e) {
171160
LOGGER.debug("Exception while closing the connection", e);
172161
}
173162
replicationStream.set(null);
@@ -205,8 +194,7 @@ private void processMessages(ChangeEventSourceContext context, final Replication
205194
taskContext.getSlotXmin(connection));
206195
if (message.getOperation() == Operation.BEGIN) {
207196
dispatcher.dispatchTransactionStartedEvent(Long.toString(message.getTransactionId()), offsetContext);
208-
}
209-
else if (message.getOperation() == Operation.COMMIT) {
197+
} else if (message.getOperation() == Operation.COMMIT) {
210198
commitMessage(lsn);
211199
dispatcher.dispatchTransactionCommittedEvent(offsetContext);
212200
}
@@ -216,7 +204,7 @@ else if (message.getOperation() == Operation.COMMIT) {
216204
else {
217205
TableId tableId = null;
218206
if (message.getOperation() != Operation.NOOP) {
219-
tableId = PostgresSchema.parse(message.getTable());
207+
tableId = message.getTableId();
220208
Objects.requireNonNull(tableId);
221209
}
222210

@@ -239,8 +227,7 @@ else if (message.getOperation() == Operation.COMMIT) {
239227

240228
if (receivedMessage) {
241229
noMessageIterations = 0;
242-
}
243-
else {
230+
} else {
244231
if (offsetContext.hasCompletelyProcessedPosition()) {
245232
dispatcher.dispatchHeartbeatEvent(offsetContext);
246233
}
@@ -275,8 +262,7 @@ private void searchWalPosition(ChangeEventSourceContext context, final Replicati
275262

276263
if (receivedMessage) {
277264
noMessageIterations = 0;
278-
}
279-
else {
265+
} else {
280266
noMessageIterations++;
281267
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
282268
noMessageIterations = 0;
@@ -307,23 +293,21 @@ private void commitMessage(final Lsn lsn) throws SQLException, InterruptedExcept
307293
* The purpose of this method is to detect this situation and log a warning
308294
* every {@link #GROWING_WAL_WARNING_LOG_INTERVAL} filtered events.
309295
*
310-
* @param dispatched
311-
* Whether an event was sent to the broker or not
296+
* @param dispatched Whether an event was sent to the broker or not
312297
*/
313298
private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) {
314299
if (dispatched) {
315300
numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
316-
}
317-
else {
301+
} else {
318302
numberOfEventsSinceLastEventSentOrWalGrowingWarning++;
319303
}
320304

321305
if (numberOfEventsSinceLastEventSentOrWalGrowingWarning > GROWING_WAL_WARNING_LOG_INTERVAL && !dispatcher.heartbeatsEnabled()) {
322306
LOGGER.warn("Received {} events which were all filtered out, so no offset could be committed. "
323-
+ "This prevents the replication slot from acknowledging the processed WAL offsets, "
324-
+ "causing a growing backlog of non-removeable WAL segments on the database server. "
325-
+ "Consider to either adjust your filter configuration or enable heartbeat events "
326-
+ "(via the {} option) to avoid this situation.",
307+
+ "This prevents the replication slot from acknowledging the processed WAL offsets, "
308+
+ "causing a growing backlog of non-removeable WAL segments on the database server. "
309+
+ "Consider to either adjust your filter configuration or enable heartbeat events "
310+
+ "(via the {} option) to avoid this situation.",
327311
numberOfEventsSinceLastEventSentOrWalGrowingWarning, Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME);
328312

329313
numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
@@ -344,12 +328,10 @@ public void commitOffset(Map<String, ?> offset) {
344328
}
345329
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
346330
replicationStream.flushLsn(lsn);
347-
}
348-
else {
331+
} else {
349332
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
350333
}
351-
}
352-
catch (SQLException e) {
334+
} catch (SQLException e) {
353335
throw new ConnectException(e);
354336
}
355337
}
@@ -358,7 +340,7 @@ public void commitOffset(Map<String, ?> offset) {
358340
* Returns whether the current streaming phase is running a catch up streaming
359341
* phase that runs before a snapshot. This is useful for transaction
360342
* management.
361-
*
343+
* <p>
362344
* During pre-snapshot catch up streaming, we open the snapshot transaction
363345
* early and hold the transaction open throughout the pre snapshot catch up
364346
* streaming phase so that we know where to stop streaming and can start the

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.time.OffsetTime;
1313
import java.util.List;
1414

15+
import io.debezium.connector.postgresql.PostgresSchema;
16+
import io.debezium.relational.TableId;
1517
import org.postgresql.geometric.PGbox;
1618
import org.postgresql.geometric.PGcircle;
1719
import org.postgresql.geometric.PGline;
@@ -156,6 +158,10 @@ public interface ColumnValue<T> {
156158
*/
157159
public String getTable();
158160

161+
default TableId getTableId() {
162+
return PostgresSchema.parse(getTable());
163+
}
164+
159165
/**
160166
* @return Set of original values of table columns, null for INSERT
161167
*/

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package io.debezium.connector.postgresql.connection.pgoutput;
77

8+
import java.io.ByteArrayOutputStream;
89
import java.nio.ByteBuffer;
910
import java.nio.charset.Charset;
1011
import java.sql.DatabaseMetaData;
@@ -250,8 +251,8 @@ private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor
250251
*/
251252
private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) throws SQLException {
252253
int relationId = buffer.getInt();
253-
String schemaName = readString(buffer);
254-
String tableName = readString(buffer);
254+
String schemaName = readStringV2(buffer);
255+
String tableName = readStringV2(buffer);
255256
int replicaIdentityId = buffer.get();
256257
short columnCount = buffer.getShort();
257258

@@ -276,7 +277,7 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry)
276277
Set<String> columnNames = new HashSet<>();
277278
for (short i = 0; i < columnCount; ++i) {
278279
byte flags = buffer.get();
279-
String columnName = Strings.unquoteIdentifierPart(readString(buffer));
280+
String columnName = Strings.unquoteIdentifierPart(readStringV2(buffer));
280281
int columnType = buffer.getInt();
281282
int attypmod = buffer.getInt();
282283

@@ -379,7 +380,7 @@ private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
379380
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
380381
processor.process(new PgOutputReplicationMessage(
381382
Operation.INSERT,
382-
table.id().toDoubleQuotedString(),
383+
table.id(),
383384
commitTimestamp,
384385
transactionId,
385386
null,
@@ -426,7 +427,7 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
426427
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
427428
processor.process(new PgOutputReplicationMessage(
428429
Operation.UPDATE,
429-
table.id().toDoubleQuotedString(),
430+
table.id(),
430431
commitTimestamp,
431432
transactionId,
432433
oldColumns,
@@ -459,7 +460,7 @@ private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
459460
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
460461
processor.process(new PgOutputReplicationMessage(
461462
Operation.DELETE,
462-
table.id().toDoubleQuotedString(),
463+
table.id(),
463464
commitTimestamp,
464465
transactionId,
465466
columns,
@@ -512,7 +513,7 @@ private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, Replic
512513
boolean lastTableInTruncate = (i + 1) == noOfResolvedTables;
513514
processor.process(new PgOutputTruncateReplicationMessage(
514515
Operation.TRUNCATE,
515-
table.id().toDoubleQuotedString(),
516+
table.id(),
516517
commitTimestamp,
517518
transactionId,
518519
lastTableInTruncate));
@@ -596,6 +597,15 @@ private static String readString(ByteBuffer buffer) {
596597
return sb.toString();
597598
}
598599

600+
private static String readStringV2(ByteBuffer buffer) {
601+
ByteArrayOutputStream buf = new ByteArrayOutputStream();
602+
byte b = 0;
603+
while ((b = buffer.get()) != 0) {
604+
buf.write(b);
605+
}
606+
return buf.toString();
607+
}
608+
599609
/**
600610
* Reads the replication stream where the column stream specifies a length followed by the value.
601611
*

0 commit comments

Comments (0)