Merged
Changes from all commits (61 commits)
5d41e97
feat: pg add table owner
jarad0628 Feb 6, 2025
f5d657b
fix: sybase=>pg default number
jarad0628 Feb 8, 2025
66f0e94
fix: sybase=>pg constraint
jarad0628 Feb 10, 2025
d683356
Merge branch 'develop' into TAP-5735-migrate-HA-LIS-foreign-key-const…
jarad0628 Feb 11, 2025
628398b
feat: add autoInc default constraint for SQL Server
jarad0628 Feb 11, 2025
bda907f
Merge branch 'develop' into TAP-5735-migrate-HA-LIS-foreign-key-const…
jarad0628 Feb 11, 2025
06f3a9d
feat: upgrade tapdata-cli to runner-1.12
jarad0628 Feb 12, 2025
8c1b39d
feat: writeRecord ignore constraint for pg
jarad0628 Feb 12, 2025
d4b6b85
fix: escape single quote for comment
jarad0628 Feb 12, 2025
9bd3c69
fix: add TapConstraintException
jarad0628 Feb 13, 2025
3d725e2
fix: postgres support auto increment field
jarad0628 Feb 13, 2025
f155b1b
fix: drop table cascade
jarad0628 Feb 13, 2025
00c8a44
feat: mysql support autoInc
jarad0628 Feb 14, 2025
845feba
feat: format sybase discover schema
jarad0628 Feb 15, 2025
1cb5cb0
fix: sybase remove comment for table and column
jarad0628 Feb 15, 2025
f935da2
feat: mysql support load defaultFunction foreign key
jarad0628 Feb 15, 2025
20abe01
fix: format default function
jarad0628 Feb 15, 2025
a8a18e3
feat: mysql oracle postgres mssql support TapJson and TapXml
jarad0628 Feb 16, 2025
c5f5e8b
feat: mysql as target support default constraint auto increment
jarad0628 Feb 17, 2025
7aa6728
feat: sybase as target support default value and autoInc
jarad0628 Feb 18, 2025
b823f08
feat: change api for sybase which cannot disable foreign key
jarad0628 Feb 19, 2025
aab00b1
feat: change api for sybase which cannot disable foreign key
jarad0628 Feb 19, 2025
d756c7f
fix: cache value and table owner hint and sybase default object
jarad0628 Feb 19, 2025
4be4a80
fix: PG default uuid
jarad0628 Feb 19, 2025
fbd4af5
fix(pg):fix pg queryByAdvanceFilter when value is null failed
ljvv7 Feb 19, 2025
c9a30aa
fix: common build key and value
jarad0628 Feb 20, 2025
84bf374
fix: common build key and value
jarad0628 Feb 20, 2025
a93157a
fix: sybase default value
jarad0628 Feb 20, 2025
9a7fea8
fix: sybase default value
jarad0628 Feb 20, 2025
270b154
fix: sybase default value "0000"
jarad0628 Feb 20, 2025
2268388
fix: SET session_replication_role
jarad0628 Feb 20, 2025
7d68ce7
feat: TAP-5812 support exporting foreign key constraints for target data sources that cannot disable foreign keys
weiliang110100 Feb 21, 2025
7ba04e2
fix: replica write in with no permission
jarad0628 Feb 21, 2025
e20eded
Merge remote-tracking branch 'origin/TAP-5735-migrate-HA-LIS-foreign-…
jarad0628 Feb 21, 2025
4e2337f
feat: Tap-5836 set triggers off for sybase
jarad0628 Feb 25, 2025
3df2472
fix:TAP-5547 mongodb shard key
weiliang110100 Feb 25, 2025
cfdb114
fix: rollback catch statement exception
jarad0628 Feb 27, 2025
fe957c9
fix: postgres=>sybase bug
jarad0628 Feb 28, 2025
9732e58
fix: no pk hash nullable
jarad0628 Feb 28, 2025
48358ba
fix: sybase support remove column and support delete
jarad0628 Feb 28, 2025
62a5cd1
fix: sybase support remove column and support delete
jarad0628 Feb 28, 2025
ed412f8
fix: sybase support remove column and support delete
jarad0628 Feb 28, 2025
a4834e2
Merge pull request #490 from tapdata/TAP-5895
jarad0628 Mar 4, 2025
f4f98ab
fix: greenplum npe with postgres version
jarad0628 Mar 4, 2025
4fee0ac
fix: postgres and mysql with specific letters
jarad0628 Mar 4, 2025
bd30f02
Merge pull request #484 from tapdata/TAP-5547-fix_mongodb_shard_key
jarad0628 Mar 4, 2025
eab969c
Merge branch 'develop' into TAP-5735-migrate-HA-LIS-foreign-key-const…
jarad0628 Mar 4, 2025
0c9da21
fix: mysql default value and auto increment and foreign key
jarad0628 Mar 5, 2025
8e5a196
fix: postgres with lower version has no pg_sequence
jarad0628 Mar 5, 2025
423e5d8
fix(connector):add TapBinaryValue type
ljvv7 Mar 6, 2025
07e33b1
Merge pull request #492 from tapdata/feat-dev-binary
jarad0628 Mar 6, 2025
5e1d03d
feat: add createConstraint for sybase
jarad0628 Mar 7, 2025
e089238
fix: TAP-5859 fix dws replace blank
mnianqi Mar 7, 2025
a4939f5
Merge pull request #493 from tapdata/TAP-5859
jarad0628 Mar 7, 2025
5ae1f54
feat: hash read retry for each
jarad0628 Mar 7, 2025
349c18c
fix: TAP-5941 doris create table failed with default value
mnianqi Mar 12, 2025
c9d8fa2
fix: TAP-5941 doris create table failed with default value
mnianqi Mar 13, 2025
72d0d0c
feat: postgres support sequence
jarad0628 Mar 13, 2025
57d992e
fix: postgres as target with specific letters
jarad0628 Mar 16, 2025
26d8532
Merge pull request #495 from tapdata/TAP-5941
jarad0628 Mar 16, 2025
1a537ad
Merge branch 'main' into develop
jarad0628 Mar 16, 2025
@@ -157,7 +157,11 @@ public static boolean ignoreCreateIndex(TapIndex exists, TapIndex created) {
}

public static String buildIndexName(String table) {
-return "TAPIDX_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
+return "IDX_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
}

+public static String buildForeignKeyName(String table) {
+return "FK_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
+}

public static <T> List<List<T>> splitToPieces(List<T> data, int eachPieceSize) {
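Note: these naming helpers keep generated identifiers short for databases with tight length limits: a fixed prefix, the last ten characters of the table name, and twelve characters from a random UUID. A minimal usage sketch (the table name and printed results are illustrative assumptions):

// Hypothetical usage; the 12-character random suffix differs on every call.
String indexName = buildIndexName("customer_orders"); // e.g. "IDX_mer_orders9f3c1b2a4d5e"
String fkName = buildForeignKeyName("customer_orders"); // e.g. "FK_mer_orders7a0b9c8d1e2f"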
@@ -6,6 +6,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@@ -190,7 +191,7 @@ public static String removeHeadTail(String str, String remove, Boolean upperCase) {
if (EmptyKit.isBlank(str) || EmptyKit.isBlank(remove)) {
return str;
}
-if (str.startsWith(remove) && str.endsWith(remove) && str.length() > 2 * remove.length()) {
+if (str.startsWith(remove) && str.endsWith(remove) && str.length() >= 2 * remove.length()) {
return str.substring(remove.length(), str.length() - remove.length());
}
if (EmptyKit.isNull(upperCase)) {
@@ -342,4 +343,27 @@ public static String trimTailBlank(Object str) {
if (null == str) return null;
return ("_" + str).trim().substring(1);
}

+public static String escape(String name, String escapes) {
+String res = name;
+for (int i = 0; i < escapes.length(); i++) {
+char escape = escapes.charAt(i);
+res = escape(res, escape);
+}
+return res;
+}

+public static String escape(String name, char escape) {
+return name.replace(escape + "", "" + escape + escape);
+}

+private static final Pattern REGEX_SPECIAL_CHARS = Pattern.compile("[\\\\^$|*+?.,()\\[\\]{}]");

+public static String escapeRegex(String input) {
+if (input == null || input.isEmpty()) {
+return input;
+}
+Matcher matcher = REGEX_SPECIAL_CHARS.matcher(input);
+return matcher.replaceAll("\\\\$0");
+}
}
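Note: a short usage sketch for the new helpers (inputs and outputs are illustrative). escape doubles every occurrence of the given quote character, SQL-style, while escapeRegex backslash-escapes regex metacharacters so arbitrary input can be matched literally inside a Pattern:

// escape: double each quote character
escape("O'Brien", '\'');      // -> O''Brien
escape("a`b\"c", "`\"");      // doubles both characters -> a``b""c
// escapeRegex: make input safe for Pattern.compile
escapeRegex("price (USD) *"); // -> price \(USD\) \*
Pattern.compile(escapeRegex("a.b")).matcher("a.b").matches(); // true; the dot is literal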
@@ -583,7 +583,7 @@ private void tableUnlock() throws SQLException {
}

private String quote(String dbOrTableName) {
-return "`" + dbOrTableName + "`";
+return "`" + dbOrTableName.replace("`", "``") + "`";
}

private String quote(TableId id) {
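Note: doubling an embedded backtick is MySQL's standard way of escaping it inside a quoted identifier; without it, a table name containing a backtick would terminate the quote early. A sketch of the before/after behavior (the identifier is an illustrative assumption):

// Hypothetical identifier containing a backtick:
quote("my`table");
// before the fix: `my`table`  (quote ends early; invalid or unsafe identifier)
// after the fix:  `my``table` (one valid quoted identifier)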
@@ -63,7 +63,7 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo
this.connectorConfig = connectorConfig;
this.connection = connection;

-this.tableId = PostgresSchema.parse(message.getTable());
+this.tableId = message.getTableId();
this.unchangedToastColumnMarkerMissing = !connectorConfig.plugin().hasUnchangedToastColumnMarker();
this.nullToastedValuesMissingFromOld = !connectorConfig.plugin().sendsNullToastedValuesInOld();
Objects.requireNonNull(tableId);
@@ -213,7 +213,7 @@ private void refreshToastableColumnsMap(PostgresConnection connection, TableId t
tableIdToToastableColumns.put(tableId, Collections.unmodifiableList(toastableColumns));
}

-protected static TableId parse(String table) {
+public static TableId parse(String table) {
TableId tableId = TableId.parse(table, false);
if (tableId == null) {
return null;
@@ -5,23 +5,8 @@
*/
package io.debezium.connector.postgresql;

-import java.sql.SQLException;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;

-import org.apache.kafka.connect.errors.ConnectException;
-import org.postgresql.core.BaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

-import io.debezium.connector.postgresql.connection.Lsn;
-import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.connector.postgresql.connection.ReplicationConnection;
+import io.debezium.connector.postgresql.connection.*;
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
-import io.debezium.connector.postgresql.connection.ReplicationStream;
-import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ErrorHandler;
@@ -30,9 +15,18 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;

/**
-*
* @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec
*/
public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource {
@@ -104,8 +98,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn);
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
-}
-else {
+} else {
LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
walPosition = new WalPositionLocator();
replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition));
@@ -133,8 +126,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
if (!isInPreSnapshotCatchUpStreaming()) {
connection.commit();
}
-}
-catch (Exception e) {
+} catch (Exception e) {
LOGGER.info("Commit failed while preparing for reconnect", e);
}
walPosition.enableFiltering();
@@ -145,11 +137,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
stream.startKeepAlive(Executors.newSingleThreadExecutor());
}
processMessages(context, stream);
-}
-catch (Throwable e) {
+} catch (Throwable e) {
errorHandler.setProducerThrowable(e);
-}
-finally {
+} finally {
if (replicationConnection != null) {
LOGGER.debug("stopping streaming...");
// stop the keep alive thread, this also shuts down the
@@ -166,8 +156,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
connection.commit();
}
replicationConnection.close();
-}
-catch (Exception e) {
+} catch (Exception e) {
LOGGER.debug("Exception while closing the connection", e);
}
replicationStream.set(null);
@@ -205,8 +194,7 @@ private void processMessages(ChangeEventSourceContext context, final Replication
taskContext.getSlotXmin(connection));
if (message.getOperation() == Operation.BEGIN) {
dispatcher.dispatchTransactionStartedEvent(Long.toString(message.getTransactionId()), offsetContext);
-}
-else if (message.getOperation() == Operation.COMMIT) {
+} else if (message.getOperation() == Operation.COMMIT) {
commitMessage(lsn);
dispatcher.dispatchTransactionCommittedEvent(offsetContext);
}
@@ -216,7 +204,7 @@ else if (message.getOperation() == Operation.COMMIT) {
else {
TableId tableId = null;
if (message.getOperation() != Operation.NOOP) {
-tableId = PostgresSchema.parse(message.getTable());
+tableId = message.getTableId();
Objects.requireNonNull(tableId);
}

@@ -239,8 +227,7 @@

if (receivedMessage) {
noMessageIterations = 0;
-}
-else {
+} else {
if (offsetContext.hasCompletelyProcessedPosition()) {
dispatcher.dispatchHeartbeatEvent(offsetContext);
}
@@ -275,8 +262,7 @@ private void searchWalPosition(ChangeEventSourceContext context, final Replicati

if (receivedMessage) {
noMessageIterations = 0;
-}
-else {
+} else {
noMessageIterations++;
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
noMessageIterations = 0;
@@ -307,23 +293,21 @@ private void commitMessage(final Lsn lsn) throws SQLException, InterruptedExcept
* The purpose of this method is to detect this situation and log a warning
* every {@link #GROWING_WAL_WARNING_LOG_INTERVAL} filtered events.
*
-* @param dispatched
-* Whether an event was sent to the broker or not
+* @param dispatched Whether an event was sent to the broker or not
*/
private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) {
if (dispatched) {
numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
-}
-else {
+} else {
numberOfEventsSinceLastEventSentOrWalGrowingWarning++;
}

if (numberOfEventsSinceLastEventSentOrWalGrowingWarning > GROWING_WAL_WARNING_LOG_INTERVAL && !dispatcher.heartbeatsEnabled()) {
LOGGER.warn("Received {} events which were all filtered out, so no offset could be committed. "
-+ "This prevents the replication slot from acknowledging the processed WAL offsets, "
-+ "causing a growing backlog of non-removeable WAL segments on the database server. "
-+ "Consider to either adjust your filter configuration or enable heartbeat events "
-+ "(via the {} option) to avoid this situation.",
++ "This prevents the replication slot from acknowledging the processed WAL offsets, "
++ "causing a growing backlog of non-removeable WAL segments on the database server. "
++ "Consider to either adjust your filter configuration or enable heartbeat events "
++ "(via the {} option) to avoid this situation.",
numberOfEventsSinceLastEventSentOrWalGrowingWarning, Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME);

numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
@@ -344,12 +328,10 @@ public void commitOffset(Map<String, ?> offset) {
}
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
-}
-else {
+} else {
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
}
-}
-catch (SQLException e) {
+} catch (SQLException e) {
throw new ConnectException(e);
}
}
@@ -358,7 +340,7 @@ public void commitOffset(Map<String, ?> offset) {
* Returns whether the current streaming phase is running a catch up streaming
* phase that runs before a snapshot. This is useful for transaction
* management.
-*
+* <p>
* During pre-snapshot catch up streaming, we open the snapshot transaction
* early and hold the transaction open throughout the pre snapshot catch up
* streaming phase so that we know where to stop streaming and can start the
@@ -12,6 +12,8 @@
import java.time.OffsetTime;
import java.util.List;

+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.relational.TableId;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
@@ -156,6 +158,10 @@ public interface ColumnValue<T> {
*/
public String getTable();

+default TableId getTableId() {
+return PostgresSchema.parse(getTable());
+}

/**
* @return Set of original values of table columns, null for INSERT
*/
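Note: this new default method is why PostgresSchema.parse was widened from protected to public in the earlier hunk; every ReplicationMessage now exposes a parsed TableId, and the pgoutput decoder below goes a step further by passing table.id() directly instead of a double-quoted string. A minimal sketch of the default path (the table name is an illustrative assumption, and it assumes TableId.parse strips the surrounding quotes as Debezium's identifier parser does):

// What getTableId() does for a message whose getTable() returns "\"public\".\"orders\"":
TableId id = PostgresSchema.parse("\"public\".\"orders\"");
// id.schema() -> public, id.table() -> orders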
@@ -5,6 +5,7 @@
*/
package io.debezium.connector.postgresql.connection.pgoutput;

+import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.DatabaseMetaData;
@@ -250,8 +251,8 @@ private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor
*/
private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) throws SQLException {
int relationId = buffer.getInt();
-String schemaName = readString(buffer);
-String tableName = readString(buffer);
+String schemaName = readStringV2(buffer);
+String tableName = readStringV2(buffer);
int replicaIdentityId = buffer.get();
short columnCount = buffer.getShort();

@@ -276,7 +277,7 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry)
Set<String> columnNames = new HashSet<>();
for (short i = 0; i < columnCount; ++i) {
byte flags = buffer.get();
-String columnName = Strings.unquoteIdentifierPart(readString(buffer));
+String columnName = Strings.unquoteIdentifierPart(readStringV2(buffer));
int columnType = buffer.getInt();
int attypmod = buffer.getInt();

@@ -379,7 +380,7 @@ private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.INSERT,
-table.id().toDoubleQuotedString(),
+table.id(),
commitTimestamp,
transactionId,
null,
@@ -426,7 +427,7 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.UPDATE,
-table.id().toDoubleQuotedString(),
+table.id(),
commitTimestamp,
transactionId,
oldColumns,
@@ -459,7 +460,7 @@ private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.DELETE,
-table.id().toDoubleQuotedString(),
+table.id(),
commitTimestamp,
transactionId,
columns,
@@ -512,7 +513,7 @@ private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, Replic
boolean lastTableInTruncate = (i + 1) == noOfResolvedTables;
processor.process(new PgOutputTruncateReplicationMessage(
Operation.TRUNCATE,
-table.id().toDoubleQuotedString(),
+table.id(),
commitTimestamp,
transactionId,
lastTableInTruncate));
@@ -596,6 +597,15 @@ private static String readString(ByteBuffer buffer) {
return sb.toString();
}

+private static String readStringV2(ByteBuffer buffer) {
+ByteArrayOutputStream buf = new ByteArrayOutputStream();
+byte b = 0;
+while ((b = buffer.get()) != 0) {
+buf.write(b);
+}
+return buf.toString();
+}

/**
* Reads the replication stream where the column stream specifies a length followed by the value.
*
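Note: readStringV2 is the "specific letters" fix from the commit list. The old readString built the result character by character from individual bytes, which mangles multi-byte UTF-8 identifiers; buffering the raw bytes and decoding them in one step keeps such characters intact (ByteArrayOutputStream.toString() relies on the platform default charset, so this assumes UTF-8 is the default). A standalone sketch of the difference, with an assumed non-ASCII table name:

import java.nio.charset.StandardCharsets;

byte[] raw = "täble".getBytes(StandardCharsets.UTF_8); // 0x74 0xC3 0xA4 ...

// Old approach, roughly: widen each byte to a char -> "tÃ¤ble" (mojibake)
StringBuilder sb = new StringBuilder();
for (byte b : raw) sb.append((char) (b & 0xFF));

// New approach: collect the bytes, decode once -> "täble"
String decoded = new String(raw, StandardCharsets.UTF_8);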