diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 0627460cf5a5..52cc339639ae 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -3338,12 +3338,12 @@ abstract class BasicFunctionalityIntegrationTest( """ { "id": 1, - "string": "fo\u0000o", + "string": "fo,\u0000o", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T11:34:56-01:00", - "timestamp_without_timezone": "2023-01-23T12:34:56", + "timestamp_without_timezone": "2023-01-23T12:34:56,5", "time_with_timezone": "11:34:56-01:00", "time_without_timezone": "12:34:56", "date": "2023-01-23" diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index 3b26a943efc1..3ed23537b9e9 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -26,19 +26,34 @@ def hikariCpVersion = "7.0.2" def junitVersion = "5.13.4" def junitPlatformVersion = "1.13.4" def postgresqlVersion = "42.7.2" +def testContainersVersion = "1.20.5" dependencies { implementation("org.postgresql:postgresql:$postgresqlVersion") implementation("com.zaxxer:HikariCP:$hikariCpVersion") + implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core")) + implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-typing-deduping")) + implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations")) testImplementation("io.mockk:mockk:1.14.5") testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion") testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion") + testRuntimeOnly("org.junit.platform:junit-platform-engine:$junitPlatformVersion") testRuntimeOnly("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion") + // TestFixtures dependencies - needed for AbstractPostgresTypingDedupingTest + testFixturesApi(testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations"))) + testFixturesApi(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-typing-deduping")) + testFixturesApi(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies")) + testFixturesApi("org.testcontainers:postgresql:$testContainersVersion") + testFixturesImplementation("org.postgresql:postgresql:$postgresqlVersion") + testFixturesImplementation("com.zaxxer:HikariCP:$hikariCpVersion") + integrationTestImplementation("com.zaxxer:HikariCP:$hikariCpVersion") integrationTestImplementation("org.postgresql:postgresql:$postgresqlVersion") + integrationTestImplementation("org.testcontainers:postgresql:$testContainersVersion") integrationTestImplementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies")) + integrationTestImplementation(testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations"))) } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresDestination.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresDestination.kt index 4df3385379f5..4efb275a2c5d 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresDestination.kt +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresDestination.kt @@ -1,10 +1,234 @@ /* - * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.destination.postgres -import io.airbyte.cdk.AirbyteDestinationRunner +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.db.factory.DataSourceFactory +import io.airbyte.cdk.db.factory.DatabaseDriver +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addThrowableForDeinterpolation +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination +import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator +import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils +import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions +import io.airbyte.commons.json.Jsons.jsonNode +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.airbyte.integrations.destination.postgres.typing_deduping.* +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.time.Duration +import java.util.* +import org.postgresql.util.PSQLException +import org.slf4j.Logger +import org.slf4j.LoggerFactory -fun main(args: Array) { - AirbyteDestinationRunner.run(*args) +class PostgresDestination : + AbstractJdbcDestination(DRIVER_CLASS, PostgresSQLNameTransformer()), + Destination { + override fun modifyDataSourceBuilder( + builder: DataSourceFactory.DataSourceBuilder + ): DataSourceFactory.DataSourceBuilder { + // Anything in the pg_temp schema is only visible to the connection that created it. + // So this creates an airbyte_safe_cast function that only exists for the duration of + // a single connection. + // This avoids issues with creating the same function concurrently (e.g. if multiple syncs + // run + // at the same time). + // Function definition copied from https://dba.stackexchange.com/a/203986 + + // Adding 60 seconds to connection timeout, for ssl connections, default 10 seconds is not + // enough + + return builder + .withConnectionTimeout(Duration.ofSeconds(60)) + .withConnectionInitSql( + """ + CREATE OR REPLACE FUNCTION pg_temp.airbyte_safe_cast(_in text, INOUT _out ANYELEMENT) + LANGUAGE plpgsql AS + ${'$'}func${'$'} + BEGIN + EXECUTE format('SELECT %L::%s', ${'$'}1, pg_typeof(_out)) + INTO _out; + EXCEPTION WHEN others THEN + -- do nothing: _out already carries default + END + ${'$'}func${'$'}; + + """.trimIndent() + ) + } + + public override fun getDefaultConnectionProperties(config: JsonNode): Map { + val additionalParameters: MutableMap = HashMap() + if ( + !config.has(PostgresSslConnectionUtils.PARAM_SSL) || + config + .get( + PostgresSslConnectionUtils.PARAM_SSL, + ) + .asBoolean() + ) { + if (config.has(PostgresSslConnectionUtils.PARAM_SSL_MODE)) { + if ( + PostgresSslConnectionUtils.DISABLE == + config + .get(PostgresSslConnectionUtils.PARAM_SSL_MODE) + .get(PostgresSslConnectionUtils.PARAM_MODE) + .asText() + ) { + additionalParameters["sslmode"] = PostgresSslConnectionUtils.DISABLE + } else { + additionalParameters.putAll( + obtainConnectionOptions( + config.get( + PostgresSslConnectionUtils.PARAM_SSL_MODE, + ), + ), + ) + } + } else { + additionalParameters[JdbcUtils.SSL_KEY] = "true" + additionalParameters["sslmode"] = "require" + } + } + return additionalParameters + } + + override fun toJdbcConfig(config: JsonNode): JsonNode { + val schema = + Optional.ofNullable(config[JdbcUtils.SCHEMA_KEY]) + .map { obj: JsonNode -> obj.asText() } + .orElse("public") + + var encodedDatabase = config[JdbcUtils.DATABASE_KEY].asText() + if (encodedDatabase != null) { + encodedDatabase = URLEncoder.encode(encodedDatabase, StandardCharsets.UTF_8) + } + val jdbcUrl = + String.format( + "jdbc:postgresql://%s:%s/%s?", + config[JdbcUtils.HOST_KEY].asText(), + config[JdbcUtils.PORT_KEY].asText(), + encodedDatabase + ) + + val configBuilder = + ImmutableMap.builder() + .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) + .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl) + .put(JdbcUtils.SCHEMA_KEY, schema) + + if (config.has(JdbcUtils.PASSWORD_KEY)) { + configBuilder.put(JdbcUtils.PASSWORD_KEY, config[JdbcUtils.PASSWORD_KEY].asText()) + } + + if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { + configBuilder.put( + JdbcUtils.JDBC_URL_PARAMS_KEY, + config[JdbcUtils.JDBC_URL_PARAMS_KEY].asText() + ) + } + + return jsonNode(configBuilder.build()) + } + + override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { + return PostgresSqlGenerator( + PostgresSQLNameTransformer(), + hasDropCascadeMode(config), + hasUnconstrainedNumber(config), + ) + } + override fun getSqlOperations(config: JsonNode): PostgresSqlOperations { + return PostgresSqlOperations(hasDropCascadeMode(config)) + } + + override fun getGenerationHandler(): PostgresGenerationHandler { + return PostgresGenerationHandler() + } + + private fun hasDropCascadeMode(config: JsonNode): Boolean { + val dropCascadeNode = config[DROP_CASCADE_OPTION] + return dropCascadeNode != null && dropCascadeNode.asBoolean() + } + + private fun hasUnconstrainedNumber(config: JsonNode): Boolean { + val unconstrainedNumberNode = config[UNCONSTRAINED_NUMBER_OPTION] + return unconstrainedNumberNode != null && unconstrainedNumberNode.asBoolean() + } + + override fun getDestinationHandler( + config: JsonNode, + databaseName: String, + database: JdbcDatabase, + rawTableSchema: String + ): JdbcDestinationHandler { + return PostgresDestinationHandler( + databaseName, + database, + rawTableSchema, + getGenerationHandler(), + ) + } + + protected override fun getMigrations( + database: JdbcDatabase, + databaseName: String, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler + ): List> { + return java.util.List.of>( + PostgresRawTableAirbyteMetaMigration(database, databaseName), + PostgresGenerationIdMigration(database, databaseName), + ) + } + + override fun getDataTransformer( + parsedCatalog: ParsedCatalog?, + defaultNamespace: String? + ): StreamAwareDataTransformer { + return PostgresDataTransformer() + } + + override val isV2Destination: Boolean + get() = true + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(PostgresDestination::class.java) + + val DRIVER_CLASS: String = DatabaseDriver.POSTGRESQL.driverClassName + + const val DROP_CASCADE_OPTION = "drop_cascade" + const val UNCONSTRAINED_NUMBER_OPTION = "unconstrained_number" + + @JvmStatic + fun sshWrappedDestination(): Destination { + return SshWrappedDestination( + PostgresDestination(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY + ) + } + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + addThrowableForDeinterpolation(PSQLException::class.java) + val destination = sshWrappedDestination() + LOGGER.info("starting destination: {}", PostgresDestination::class.java) + IntegrationRunner(destination).run(args) + LOGGER.info("completed destination: {}", PostgresDestination::class.java) + } + } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresGenerationHandler.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresGenerationHandler.kt new file mode 100644 index 000000000000..74a6b6c0bf7e --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresGenerationHandler.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler + +class PostgresGenerationHandler : JdbcGenerationHandler { + override fun getGenerationIdInTable( + database: JdbcDatabase, + namespace: String, + name: String + ): Long? { + val selectTableResultSet = + database + .unsafeQuery( + """SELECT 1 + | FROM pg_catalog.pg_namespace n + | JOIN pg_catalog.pg_class c + | ON c.relnamespace=n.oid + | JOIN pg_catalog.pg_attribute a + | ON a.attrelid = c.oid + | WHERE n.nspname=? + | AND c.relkind='r' + | AND c.relname=? + | AND a.attname=? + | LIMIT 1 + """.trimMargin(), + namespace, + name, + "_airbyte_generation_id" + ) + .use { it.toList() } + if (selectTableResultSet.isEmpty()) { + return null + } else { + val selectGenIdResultSet = + database + .unsafeQuery("SELECT _airbyte_generation_id FROM $namespace.$name LIMIT 1;") + .use { it.toList() } + if (selectGenIdResultSet.isEmpty()) { + return null + } else { + val genIdInTable = + selectGenIdResultSet.first().get("_airbyte_generation_id")?.asLong() + LOGGER.info { "found generationId in table $namespace.$name: $genIdInTable" } + return genIdInTable ?: -1L + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.kt new file mode 100644 index 000000000000..04d5de573ff7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSQLNameTransformer.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres + +import io.airbyte.cdk.integrations.destination.StandardNameTransformer +import java.util.* +import kotlin.math.min + +class PostgresSQLNameTransformer : StandardNameTransformer() { + // I _think_ overriding these two methods is sufficient to apply the truncation logic everywhere + // but this interface + our superclass are weirdly complicated, so plausibly something is + // missing + override fun getIdentifier(name: String): String { + return truncate(super.getIdentifier(name)) + } + + override fun convertStreamName(input: String): String { + return truncate(super.convertStreamName(input)) + } + + override fun applyDefaultCase(input: String): String { + return input.lowercase(Locale.getDefault()) + } + + // see https://github.com/airbytehq/airbyte/issues/35333 + // We cannot delete these method until connectors don't need old v1 raw table references for + // migration + @Deprecated("") // Overriding a deprecated method is, itself, a warning + @Suppress("deprecation") + override fun getRawTableName(streamName: String): String { + return convertStreamName("_airbyte_raw_" + streamName.lowercase(Locale.getDefault())) + } + + /** + * Postgres silently truncates identifiers to 63 characters. Utility method to do that + * truncation explicitly, so that we can detect e.g. name collisions. + */ + private fun truncate(str: String): String { + return str.substring(0, min(str.length.toDouble(), 63.0).toInt()) + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.kt new file mode 100644 index 000000000000..068b548520e5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.kt @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.isDestinationV2 +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations +import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation +import io.github.oshai.kotlinlogging.KotlinLogging +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.io.IOException +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.sql.Connection +import java.sql.SQLException +import org.apache.commons.lang3.StringUtils +import org.postgresql.copy.CopyManager +import org.postgresql.core.BaseConnection + +val LOGGER = KotlinLogging.logger {} + +class PostgresSqlOperations(useDropCascade: Boolean) : JdbcSqlOperations() { + private val dropTableQualifier: String = if (useDropCascade) "CASCADE" else "" + override fun postCreateTableQueries(schemaName: String?, tableName: String?): List { + return if (isDestinationV2) { + java.util.List.of( // the raw_id index _could_ be unique (since raw_id is a UUID) + // but there's no reason to do that (because it's a UUID :P ) + // and it would just slow down inserts. + // also, intentionally don't specify the type of index (btree, hash, etc). Just use + // the default. + "CREATE INDEX IF NOT EXISTS " + + tableName + + "_raw_id" + + " ON " + + schemaName + + "." + + tableName + + "(_airbyte_raw_id)", + "CREATE INDEX IF NOT EXISTS " + + tableName + + "_extracted_at" + + " ON " + + schemaName + + "." + + tableName + + "(_airbyte_extracted_at)", + "CREATE INDEX IF NOT EXISTS " + + tableName + + "_loaded_at" + + " ON " + + schemaName + + "." + + tableName + + "(_airbyte_loaded_at, _airbyte_extracted_at)" + ) + } else { + emptyList() + } + } + + @Throws(Exception::class) + override fun insertRecordsInternalV2( + database: JdbcDatabase, + records: List, + schemaName: String?, + tableName: String?, + syncId: Long, + generationId: Long + ) { + insertRecordsInternal( + database, + records, + schemaName, + tableName, + syncId, + generationId, + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + JavaBaseConstants.COLUMN_NAME_AB_META, + JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID + ) + } + + @Throws(SQLException::class) + private fun insertRecordsInternal( + database: JdbcDatabase, + records: List, + schemaName: String?, + tmpTableName: String?, + syncId: Long, + generationId: Long, + vararg columnNames: String + ) { + if (records.isEmpty()) { + return + } + LOGGER.info { "preparing records to insert. generationId=$generationId, syncId=$syncId" } + // Explicitly passing column order to avoid order mismatches between CREATE TABLE and COPY + // statement + val orderedColumnNames = StringUtils.join(columnNames, ", ") + database.execute { connection: Connection -> + var tmpFile: File? = null + try { + tmpFile = Files.createTempFile("$tmpTableName-", ".tmp").toFile() + writeBatchToFile(tmpFile, records, syncId, generationId) + + val copyManager = CopyManager(connection.unwrap(BaseConnection::class.java)) + val sql = + String.format( + "COPY %s.%s (%s) FROM stdin DELIMITER ',' CSV", + schemaName, + tmpTableName, + orderedColumnNames + ) + LOGGER.info { "executing COPY command: $sql" } + val bufferedReader = BufferedReader(FileReader(tmpFile, StandardCharsets.UTF_8)) + copyManager.copyIn(sql, bufferedReader) + } catch (e: Exception) { + throw RuntimeException(e) + } finally { + try { + if (tmpFile != null) { + Files.delete(tmpFile.toPath()) + } + } catch (e: IOException) { + throw RuntimeException(e) + } + } + } + LOGGER.info { "COPY command completed sucessfully" } + } + + override fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String) { + val tmpName = rawName + AbstractStreamOperation.TMP_TABLE_SUFFIX + database.executeWithinTransaction( + listOf( + "DROP TABLE $rawNamespace.$rawName $dropTableQualifier", + "ALTER TABLE $rawNamespace.$tmpName RENAME TO $rawName" + ) + ) + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/cdk/WriteOperationV2.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/cdk/WriteOperationV2.kt new file mode 100644 index 000000000000..49463564a761 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/cdk/WriteOperationV2.kt @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.cdk + +import io.airbyte.cdk.Operation +import io.airbyte.cdk.load.dataflow.DestinationLifecycle +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Primary +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Primary +@Singleton +@Requires(property = Operation.PROPERTY, value = "write") +class WriteOperationV2( + private val d: DestinationLifecycle, +) : Operation { + private val log = KotlinLogging.logger {} + + override fun execute() { + log.info { "Running new pipe..." } + d.run() + log.info { "New pipe complete :tada:" } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/config/PostgresDirectLoadDatabaseInitialStatusGatherer.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/config/PostgresDirectLoadDatabaseInitialStatusGatherer.kt new file mode 100644 index 000000000000..9a3009942610 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/config/PostgresDirectLoadDatabaseInitialStatusGatherer.kt @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.config + +import io.airbyte.cdk.load.client.AirbyteClient +import io.airbyte.cdk.load.orchestration.db.BaseDirectLoadInitialStatusGatherer +import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator +import jakarta.inject.Singleton + +@Singleton +class PostgresDirectLoadDatabaseInitialStatusGatherer( + airbyteClient: AirbyteClient, + tempTableNameGenerator: TempTableNameGenerator, +) : + BaseDirectLoadInitialStatusGatherer( + airbyteClient, + tempTableNameGenerator, + ) diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/db/PostgresNameGenerators.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/db/PostgresNameGenerators.kt new file mode 100644 index 000000000000..3f1ad8a9e0db --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/db/PostgresNameGenerators.kt @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.db + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.Transformations.Companion.toAlphanumericAndUnderscore +import io.airbyte.cdk.load.orchestration.db.ColumnNameGenerator +import io.airbyte.cdk.load.orchestration.db.FinalTableNameGenerator +import io.airbyte.cdk.load.orchestration.db.TableName +import io.airbyte.cdk.load.orchestration.db.legacy_typing_deduping.TypingDedupingUtil +import io.airbyte.integrations.destination.postgres.spec.PostgresConfiguration +import jakarta.inject.Singleton +import java.util.Locale +import java.util.UUID + +@Singleton +class PostgresFinalTableNameGenerator(private val config: PostgresConfiguration) : + FinalTableNameGenerator { + override fun getTableName(streamDescriptor: DestinationStream.Descriptor) = + TableName( + namespace = + (config.internalTableSchema ?: (streamDescriptor.namespace ?: config.schema)) + .toPostgresCompatibleName(), + name = + if (config.internalTableSchema.isNullOrBlank()) { + streamDescriptor.name.toPostgresCompatibleName() + } else { + TypingDedupingUtil.concatenateRawTableName( + streamDescriptor.namespace ?: config.schema, + streamDescriptor.name + ) + .toPostgresCompatibleName() + }, + ) +} + +@Singleton +class PostgresColumnNameGenerator : ColumnNameGenerator { + override fun getColumnName(column: String): ColumnNameGenerator.ColumnName { + return ColumnNameGenerator.ColumnName( + column.toPostgresCompatibleName(), + column.lowercase(Locale.getDefault()).toPostgresCompatibleName(), + ) + } +} + +/** + * Transforms a string to be compatible with PostgreSQL table and column names. + * + * PostgreSQL identifier rules: + * - Maximum length is 63 bytes + * - Can contain letters, digits, and underscores + * - Must start with a letter or underscore (not a digit) + * - Case-insensitive by default (unless quoted) + * + * @return The transformed string suitable for PostgreSQL identifiers. + */ +fun String.toPostgresCompatibleName(): String { + // 1. Replace any character that is not a letter, + // a digit (0-9), or an underscore (_) with a single underscore. + var transformed = toAlphanumericAndUnderscore(this) + + // 2. Ensure the identifier does not start with a digit. + // If it starts with a digit, prepend an underscore. + if (transformed.isNotEmpty() && transformed[0].isDigit()) { + transformed = "_$transformed" + } + + // 3. Do not allow empty strings. + if (transformed.isEmpty()) { + return "default_name_${UUID.randomUUID()}" // A fallback name if the input results in an + // empty string + } + + // 4. Truncate to 63 characters if needed (PostgreSQL identifier limit) + if (transformed.length > 63) { + // Keep first part and add hash to avoid collisions + val hash = transformed.hashCode().toString().takeLast(8) + transformed = transformed.take(54) + "_" + hash + } + + return transformed +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/spec/PostgresConfiguration.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/spec/PostgresConfiguration.kt index 1c5b7f7bb06b..7a28700a23ad 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/spec/PostgresConfiguration.kt +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/spec/PostgresConfiguration.kt @@ -30,29 +30,36 @@ class PostgresConfigurationFactory : DestinationConfigurationFactory { override fun makeWithoutExceptionHandling( pojo: PostgresSpecification + ): PostgresConfiguration { + return makeWithOverrides(spec = pojo) + } + + fun makeWithOverrides( + spec: PostgresSpecification, + overrides: Map = emptyMap() ): PostgresConfiguration { return PostgresConfiguration( - host = pojo.host, - port = pojo.port, - database = pojo.database, - schema = pojo.schema, - username = pojo.username, - password = pojo.password, - sslMode = pojo.sslMode, - jdbcUrlParams = pojo.jdbcUrlParams, - cdcDeletionMode = pojo.cdcDeletionMode ?: CdcDeletionMode.HARD_DELETE, - legacyRawTablesOnly = pojo.legacyRawTablesOnly ?: false, + host = overrides.getOrDefault("host", spec.host), + port = overrides.getOrDefault("port", spec.port.toString()).toInt(), + database = overrides.getOrDefault("database", spec.database), + schema = overrides.getOrDefault("schema", spec.schema), + username = overrides.getOrDefault("username", spec.username), + password = overrides.getOrDefault("password", spec.password), + sslMode = spec.sslMode, + jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams), + cdcDeletionMode = spec.cdcDeletionMode ?: CdcDeletionMode.HARD_DELETE, + legacyRawTablesOnly = spec.legacyRawTablesOnly ?: false, internalTableSchema = - if (pojo.legacyRawTablesOnly == true) { - if (pojo.internalTableSchema.isNullOrBlank()) { + if (spec.legacyRawTablesOnly == true) { + if (spec.internalTableSchema.isNullOrBlank()) { DbConstants.DEFAULT_RAW_TABLE_NAMESPACE } else { - pojo.internalTableSchema + spec.internalTableSchema } } else { null }, - tunnelMethod = pojo.getTunnelMethodValue() + tunnelMethod = spec.getTunnelMethodValue() ) } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDataTransformer.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDataTransformer.kt new file mode 100644 index 000000000000..9313b041f7ba --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDataTransformer.kt @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer +import io.airbyte.commons.json.Jsons.jsonNode +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange +import io.airbyte.protocol.models.v0.StreamDescriptor +import java.util.function.Function +import java.util.function.Predicate + +class PostgresDataTransformer : StreamAwareDataTransformer { + /* + * This class is copied in its entirety from DataAdapter class to unify logic into one single + * transformer invocation before serializing to string in AsyncStreamConsumer. + */ + val filterValueNode: Predicate = Predicate { jsonNode: JsonNode -> + jsonNode.isTextual && jsonNode.textValue().contains("\u0000") + } + val valueNodeAdapter: Function = Function { jsonNode: JsonNode -> + val textValue = jsonNode.textValue().replace("\\u0000".toRegex(), "") + jsonNode(textValue) + } + + override fun transform( + streamDescriptor: StreamDescriptor?, + data: JsonNode?, + meta: AirbyteRecordMessageMeta? + ): Pair { + val metaChanges: MutableList = ArrayList() + if (meta != null && meta.changes != null) { + metaChanges.addAll(meta.changes) + } + // Does inplace changes in the actual JsonNode reference. + adapt(data) + return Pair(data, AirbyteRecordMessageMeta().withChanges(metaChanges)) + } + + fun adapt(messageData: JsonNode?) { + if (messageData != null) { + adaptAllValueNodes(messageData) + } + } + + private fun adaptAllValueNodes(rootNode: JsonNode) { + adaptValueNodes(null, rootNode, null) + } + + /** + * The method inspects json node. In case, it's a value node we check the node by CheckFunction + * and apply ValueNodeAdapter. Filtered nodes will be updated by adapted version. If element is + * an array or an object, this we run the method recursively for them. + * + * @param fieldName Name of a json node + * @param node Json node + * @param parentNode Parent json node + */ + private fun adaptValueNodes(fieldName: String?, node: JsonNode, parentNode: JsonNode?) { + if (node.isValueNode && filterValueNode.test(node)) { + if (fieldName != null) { + val adaptedNode = valueNodeAdapter.apply(node) + (parentNode as ObjectNode?)!!.set(fieldName, adaptedNode) + } else throw RuntimeException("Unexpected value node without fieldName. Node: $node") + } else if (node.isArray) { + node.elements().forEachRemaining { arrayNode: JsonNode -> + adaptValueNodes(null, arrayNode, node) + } + } else { + node.fields().forEachRemaining { stringJsonNodeEntry: Map.Entry -> + adaptValueNodes(stringJsonNodeEntry.key, stringJsonNodeEntry.value, node) + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.kt new file mode 100644 index 000000000000..f7693a7cf717 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.commons.exceptions.ConfigErrorException +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType +import io.airbyte.integrations.base.destination.typing_deduping.Array +import io.airbyte.integrations.base.destination.typing_deduping.Sql +import io.airbyte.integrations.base.destination.typing_deduping.Struct +import io.airbyte.integrations.base.destination.typing_deduping.Union +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf +import io.airbyte.integrations.destination.postgres.PostgresGenerationHandler +import org.jooq.SQLDialect + +class PostgresDestinationHandler( + databaseName: String?, + jdbcDatabase: JdbcDatabase, + rawTableSchema: String, + generationHandler: PostgresGenerationHandler, +) : + JdbcDestinationHandler( + databaseName, + jdbcDatabase, + rawTableSchema, + SQLDialect.POSTGRES, + generationHandler = generationHandler + ) { + override fun toJdbcTypeName(airbyteType: AirbyteType): String { + // This is mostly identical to the postgres implementation, but swaps jsonb to super + if (airbyteType is AirbyteProtocolType) { + return toJdbcTypeName(airbyteType) + } + return when (airbyteType.typeName) { + Struct.TYPE, + UnsupportedOneOf.TYPE, + Array.TYPE -> "jsonb" + Union.TYPE -> toJdbcTypeName((airbyteType as Union).chooseType()) + else -> throw IllegalArgumentException("Unsupported AirbyteType: $airbyteType") + } + } + + override fun toDestinationState(json: JsonNode): PostgresState { + return PostgresState( + json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(), + json.hasNonNull("isAirbyteMetaPresentInRaw") && + json["isAirbyteMetaPresentInRaw"].asBoolean(), + json.hasNonNull("isAirbyteGenerationIdPresent") && + json["isAirbyteGenerationIdPresent"].asBoolean() + ) + } + + override fun createNamespaces(schemas: Set) { + TODO("Not yet implemented") + } + + private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String { + return when (airbyteProtocolType) { + AirbyteProtocolType.STRING -> "varchar" + AirbyteProtocolType.NUMBER -> "numeric" + AirbyteProtocolType.INTEGER -> "int8" + AirbyteProtocolType.BOOLEAN -> "bool" + AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE -> "timestamptz" + AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp" + AirbyteProtocolType.TIME_WITH_TIMEZONE -> "timetz" + AirbyteProtocolType.TIME_WITHOUT_TIMEZONE -> "time" + AirbyteProtocolType.DATE -> "date" + AirbyteProtocolType.UNKNOWN -> "jsonb" + } + } + + override fun execute(sql: Sql) { + try { + super.execute(sql) + } catch (e: Exception) { + // executing the + // DROP TABLE command. + if ( + e.message!!.contains("ERROR: cannot drop table") && + e.message!!.contains("because other objects depend on it") + ) { + throw ConfigErrorException( + "Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter", + e + ) + } + throw e + } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresGenerationIdMigrator.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresGenerationIdMigrator.kt new file mode 100644 index 000000000000..9353bc2ddc5e --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresGenerationIdMigrator.kt @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.Sql +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.github.oshai.kotlinlogging.KotlinLogging +import org.jooq.conf.ParamType +import org.jooq.impl.DSL +import org.jooq.impl.SQLDataType + +private val logger = KotlinLogging.logger {} + +class PostgresGenerationIdMigration( + private val database: JdbcDatabase, + private val databaseName: String +) : Migration { + // TODO: This class is almost similar to RedshiftAirbyteMetaMigration except the JSONB type. + // try to unify later. + override fun migrateIfNecessary( + destinationHandler: DestinationHandler, + stream: StreamConfig, + state: DestinationInitialStatus + ): Migration.MigrationResult { + var needsStateRefresh = false + if (state.initialRawTableStatus.rawTableExists) { + // The table should exist because we checked for it above, so safe to get it. + val existingRawTable = + JdbcDestinationHandler.findExistingTable( + database, + databaseName, + stream.id.rawNamespace, + stream.id.rawName + ) + .get() + + if (existingRawTable.columns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] != null) { + // The raw table already has the _airbyte_meta column. No migration necessary. + logger.info( + "Skipping migration for ${stream.id.rawNamespace}.${stream.id.rawName}'s raw table because the generation_id column is already present" + ) + } else { + logger.info( + "Executing migration for ${stream.id.rawNamespace}.${stream.id.rawName}'s raw table for real" + ) + + needsStateRefresh = true + destinationHandler.execute( + Sql.of( + DSL.alterTable(DSL.name(stream.id.rawNamespace, stream.id.rawName)) + .addColumn( + DSL.name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID), + SQLDataType.BIGINT.nullable(true) + ) + .getSQL(ParamType.INLINED) + ) + ) + } + } + + val maybeExistingFinalTable = + JdbcDestinationHandler.findExistingTable( + database, + databaseName, + stream.id.finalNamespace, + stream.id.finalName + ) + if (maybeExistingFinalTable.isEmpty) { + logger.info( + "Stopping migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the final table doesn't exist" + ) + return Migration.MigrationResult( + state.destinationState.copy(isAirbyteGenerationIdPresent = true), + needsStateRefresh + ) + } + val existingFinalTable = maybeExistingFinalTable.get() + if (existingFinalTable.columns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] != null) { + // The raw table already has the _airbyte_meta column. No migration necessary. Update + // the state. + logger.info( + "Skipping migration for ${stream.id.finalNamespace}.${stream.id.finalName} because the generation_id column is already present" + ) + } else { + logger.info( + "Executing migration for ${stream.id.finalNamespace}.${stream.id.finalName} for real" + ) + + needsStateRefresh = true + destinationHandler.execute( + Sql.of( + DSL.alterTable(DSL.name(stream.id.finalNamespace, stream.id.finalName)) + .addColumn( + DSL.name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID), + SQLDataType.BIGINT.nullable(true) + ) + .getSQL(ParamType.INLINED) + ) + ) + } + + // We will not do a soft reset since it could be time-consuming, instead we leave the old + // data i.e. `errors` instead of `changes` as is since this column is controlled by us. + return Migration.MigrationResult( + state.destinationState.copy( + needsSoftReset = false, + isAirbyteGenerationIdPresent = true + ), + needsStateRefresh + ) + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawTableAirbyteMetaMigration.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawTableAirbyteMetaMigration.kt new file mode 100644 index 000000000000..60af19a9b001 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawTableAirbyteMetaMigration.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.Sql +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.* +import org.jooq.conf.ParamType +import org.jooq.impl.DSL +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class PostgresRawTableAirbyteMetaMigration( + private val database: JdbcDatabase, + private val databaseName: String +) : Migration { + private val logger: Logger = LoggerFactory.getLogger(this.javaClass) + + // TODO: This class is almost similar to RedshiftAirbyteMetaMigration except the JSONB type. + // try to unify later. + override fun migrateIfNecessary( + destinationHandler: DestinationHandler, + stream: StreamConfig, + state: DestinationInitialStatus + ): Migration.MigrationResult { + if (!state.initialRawTableStatus.rawTableExists) { + // The raw table doesn't exist. No migration necessary. Update the state. + logger.info( + "Skipping RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist" + ) + return Migration.MigrationResult( + state.destinationState.copy(isAirbyteMetaPresentInRaw = true), + false + ) + } + + // The table should exist because we checked for it above, so safe to get it. + val existingRawTable = + JdbcDestinationHandler.findExistingTable( + database, + databaseName, + stream.id.rawNamespace, + stream.id.rawName + ) + .get() + + if (existingRawTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META] != null) { + // The raw table already has the _airbyte_meta column. No migration necessary. Update + // the state. + return Migration.MigrationResult( + state.destinationState.copy(isAirbyteMetaPresentInRaw = true), + false + ) + } + + logger.info( + "Executing RawTableAirbyteMetaMigration for ${stream.id.rawNamespace}.${stream.id.rawName} for real" + ) + + destinationHandler.execute( + Sql.of( + DSL.alterTable(DSL.name(stream.id.rawNamespace, stream.id.rawName)) + .addColumn( + DSL.name(JavaBaseConstants.COLUMN_NAME_AB_META), + PostgresSqlGenerator.JSONB_TYPE + ) + .getSQL(ParamType.INLINED) + ) + ) + + // Update the state. We didn't modify the table in a relevant way, so don't invalidate the + // InitialState. + // We will not do a soft reset since it could be time-consuming, instead we leave the old + // data i.e. `errors` instead of `changes` as is since this column is controlled by us. + return Migration.MigrationResult( + state.destinationState.copy(needsSoftReset = false, isAirbyteMetaPresentInRaw = true), + false + ) + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.kt new file mode 100644 index 000000000000..84599bdc4995 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.kt @@ -0,0 +1,384 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator +import io.airbyte.integrations.base.destination.typing_deduping.* +import io.airbyte.integrations.base.destination.typing_deduping.Array +import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.concat +import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.of +import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange +import java.util.* +import java.util.function.Function +import java.util.stream.Collectors +import java.util.stream.Stream +import org.jooq.Condition +import org.jooq.DataType +import org.jooq.Field +import org.jooq.Name +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import org.jooq.impl.DefaultDataType +import org.jooq.impl.SQLDataType + +class PostgresSqlGenerator( + namingTransformer: NamingConventionTransformer, + cascadeDrop: Boolean, + private val unconstrainedNumber: Boolean, +) : JdbcSqlGenerator(namingTransformer, cascadeDrop) { + override fun buildStreamId( + namespace: String, + name: String, + rawNamespaceOverride: String + ): StreamId { + // There is a mismatch between convention used in create table query in SqlOperations vs + // this. + // For postgres specifically, when a create table is issued without a quoted identifier, it + // will be + // converted to lowercase. + // To keep it consistent when querying raw table in T+D query, convert it to lowercase. + // TODO: This logic should be unified across Raw and final table operations in a single + // class + // operating on a StreamId. + val streamName = + namingTransformer + .getIdentifier(concatenateRawTableName(namespace, name)) + .lowercase(Locale.getDefault()) + return StreamId( + namingTransformer.getNamespace(namespace), + namingTransformer.convertStreamName(name), + namingTransformer.getNamespace(rawNamespaceOverride).lowercase(Locale.getDefault()), + streamName, + namespace, + name + ) + } + + override val structType: DataType<*> + get() = JSONB_TYPE + + override val arrayType: DataType<*> + get() = JSONB_TYPE + + override val widestType: DataType<*> + get() = JSONB_TYPE + + override val dialect: SQLDialect + get() = SQLDialect.POSTGRES + + override fun toDialectType(airbyteProtocolType: AirbyteProtocolType): DataType<*> { + if (airbyteProtocolType == AirbyteProtocolType.STRING) { + // https://www.postgresql.org/docs/current/datatype-character.html + // If specified, the length n must be greater than zero and cannot exceed 10,485,760 (10 + // MB). + // If you desire to store long strings with no specific upper limit, + // use text or character varying without a length specifier, + // rather than making up an arbitrary length limit. + return SQLDataType.VARCHAR + } + + if (airbyteProtocolType == AirbyteProtocolType.NUMBER && unconstrainedNumber) { + return SQLDataType.DECIMAL + } + + return super.toDialectType(airbyteProtocolType) + } + + override fun createTable(stream: StreamConfig, suffix: String, force: Boolean): Sql { + val statements: MutableList = ArrayList() + val finalTableName = DSL.name(stream.id.finalNamespace, stream.id.finalName + suffix) + + statements.add(super.createTable(stream, suffix, force)) + + if (stream.postImportAction == ImportType.DEDUPE) { + // An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function + val pkNames = + stream.primaryKey.stream().map { pk: ColumnId -> DSL.quotedName(pk.name) }.toList() + statements.add( + of( + dslContext + .createIndex() + .on( + finalTableName, + Stream.of>( + pkNames + .stream(), // if cursor is present, then a stream containing + // its name + // but if no cursor, then empty stream + stream.cursor.stream().map { cursor: ColumnId -> + DSL.quotedName(cursor.name) + }, + Stream.of( + DSL.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) + ) + ) + .flatMap(Function.identity>()) + .toList() + ) + .sql + ) + ) + } + statements.add( + of( + dslContext + .createIndex() + .on(finalTableName, DSL.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) + .getSQL() + ) + ) + + statements.add( + of( + dslContext + .createIndex() + .on(finalTableName, DSL.name(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID)) + .getSQL() + ) + ) + + return concat(statements) + } + + override fun extractRawDataFields( + columns: LinkedHashMap, + useExpensiveSaferCasting: Boolean + ): MutableList> { + return columns.entries + .stream() + .map { column: Map.Entry -> + castedField(extractColumnAsJson(column.key), column.value, useExpensiveSaferCasting) + .`as`(column.key.name) + } + .collect(Collectors.toList()) + } + + override fun castedField( + field: Field<*>, + type: AirbyteType, + useExpensiveSaferCasting: Boolean + ): Field<*> { + if (type is Struct) { + // If this field is a struct, verify that the raw data is an object. + return DSL.cast( + DSL.case_() + .`when`( + field.isNull().or(jsonTypeof(field).ne("object")), + DSL.`val`(null as Any?) + ) + .else_(field), + JSONB_TYPE + ) + } else if (type is Array) { + // Do the same for arrays. + return DSL.cast( + DSL.case_() + .`when`( + field.isNull().or(jsonTypeof(field).ne("array")), + DSL.`val`(null as Any?) + ) + .else_(field), + JSONB_TYPE + ) + } else if (type === AirbyteProtocolType.UNKNOWN) { + return DSL.cast(field, JSONB_TYPE) + } else if (type === AirbyteProtocolType.STRING) { + // we need to render the jsonb to a normal string. For strings, this is the difference + // between + // "\"foo\"" and "foo". + // postgres provides the #>> operator, which takes a json path and returns that + // extraction as a + // string. + // '{}' is an empty json path (it's an empty array literal), so it just stringifies the + // json value. + return DSL.field("{0} #>> '{}'", String::class.java, field) + } else { + val dialectType = toDialectType(type) + // jsonb can't directly cast to most types, so convert to text first. + // also convert jsonb null to proper sql null. + val extractAsText = + DSL.case_() + .`when`( + field.isNull().or(jsonTypeof(field).eq("null")), + DSL.`val`(null as String?) + ) + .else_(DSL.cast(field, SQLDataType.VARCHAR)) + var cleanedText = extractAsText + if (type == AirbyteProtocolType.NUMBER) { + cleanedText = DSL.trim(extractAsText, "\"") + } + return if (useExpensiveSaferCasting) { + DSL.function( + DSL.name("pg_temp", "airbyte_safe_cast"), + dialectType, + cleanedText, + DSL.cast(DSL.`val`(null as Any?), dialectType) + ) + } else { + DSL.cast(cleanedText, dialectType) + } + } + } + + protected override fun castedField( + field: Field<*>, + type: AirbyteProtocolType, + useExpensiveSaferCasting: Boolean + ): Field<*> { + return DSL.cast(field, toDialectType(type)) + } + + private fun jsonBuildObject(vararg arguments: Field<*>): Field<*> { + return DSL.function("JSONB_BUILD_OBJECT", JSONB_TYPE, *arguments) + } + + override fun buildAirbyteMetaColumn(columns: LinkedHashMap): Field<*> { + val dataFieldErrors = + columns.entries + .stream() + .map { column: Map.Entry -> + toCastingErrorCaseStmt(column.key, column.value) + } + .toList() + val rawTableChangesArray: Field<*> = + DSL.field( + "ARRAY(SELECT jsonb_array_elements_text({0}#>'{changes}'))::jsonb[]", + DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_META)) + ) + + // Jooq is inferring and casting as int[] for empty fields array call. So explicitly casting + // it to + // jsonb[] on empty array + val finalTableChangesArray: Field<*> = + if (dataFieldErrors.isEmpty()) DSL.field("ARRAY[]::jsonb[]") + else + DSL.function( + "ARRAY_REMOVE", + JSONB_TYPE, + DSL.array(dataFieldErrors).cast(JSONB_TYPE.arrayDataType), + DSL.`val`(null as String?) + ) + val syncId: Field<*> = + DSL.field( + "{0}#>'{${JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY}}'", + DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_META)) + ) + return jsonBuildObject( + DSL.`val`(AB_META_COLUMN_CHANGES_KEY), + DSL.field("ARRAY_CAT({0}, {1})", finalTableChangesArray, rawTableChangesArray), + DSL.`val`(JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY), + syncId + ) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_META) + } + + private fun nulledChangeObject(fieldName: String): Field<*> { + return jsonBuildObject( + DSL.`val`(AB_META_CHANGES_FIELD_KEY), + DSL.`val`(fieldName), + DSL.`val`(AB_META_CHANGES_CHANGE_KEY), + DSL.`val`(AirbyteRecordMessageMetaChange.Change.NULLED), + DSL.`val`(AB_META_CHANGES_REASON_KEY), + DSL.`val`(AirbyteRecordMessageMetaChange.Reason.DESTINATION_TYPECAST_ERROR) + ) + } + + private fun toCastingErrorCaseStmt(column: ColumnId, type: AirbyteType): Field { + val extract = extractColumnAsJson(column) + // If this field is a struct, verify that the raw data is an object or null. + // Do the same for arrays. + return when (type) { + is Struct -> + DSL.field( + CASE_STATEMENT_SQL_TEMPLATE, + extract.isNotNull().and(jsonTypeof(extract).notIn("object", "null")), + nulledChangeObject(column.originalName), + DSL.cast(DSL.`val`(null as Any?), JSONB_TYPE) + ) + is Array -> + DSL.field( + CASE_STATEMENT_SQL_TEMPLATE, + extract.isNotNull().and(jsonTypeof(extract).notIn("array", "null")), + nulledChangeObject(column.originalName), + DSL.cast(DSL.`val`(null as Any?), JSONB_TYPE) + ) + AirbyteProtocolType.STRING, + AirbyteProtocolType.UNKNOWN -> DSL.cast(DSL.`val`(null as Any?), JSONB_TYPE) + else -> + DSL.field( + CASE_STATEMENT_SQL_TEMPLATE, + extract + .isNotNull() + .and(jsonTypeof(extract).ne("null")) + .and(castedField(extract, type, true).isNull()), + nulledChangeObject(column.originalName), + DSL.cast(DSL.`val`(null as Any?), JSONB_TYPE) + ) + } + } + + override fun cdcDeletedAtNotNullCondition(): Condition { + return DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)) + .isNotNull() + .and(jsonTypeof(extractColumnAsJson(cdcDeletedAtColumn)).ne("null")) + } + + override fun getRowNumber(primaryKeys: List, cursor: Optional): Field { + // literally identical to redshift's getRowNumber implementation, changes here probably + // should + // be reflected there + val primaryKeyFields = + if (primaryKeys != null) + primaryKeys + .stream() + .map { columnId: ColumnId -> DSL.field(DSL.quotedName(columnId.name)) } + .collect(Collectors.toList()) + else ArrayList() + val orderedFields: MutableList> = ArrayList() + // We can still use Jooq's field to get the quoted name with raw sql templating. + // jooq's .desc returns SortField instead of Field and NULLS LAST doesn't work with it + cursor.ifPresent { columnId: ColumnId -> + orderedFields.add( + DSL.field("{0} desc NULLS LAST", DSL.field(DSL.quotedName(columnId.name))) + ) + } + orderedFields.add( + DSL.field("{0} desc", DSL.quotedName(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) + ) + return DSL.rowNumber() + .over() + .partitionBy(primaryKeyFields) + .orderBy(orderedFields) + .`as`(ROW_NUMBER_COLUMN_NAME) + } + + /** Extract a raw field, leaving it as jsonb */ + private fun extractColumnAsJson(column: ColumnId): Field { + return DSL.field( + "{0} -> {1}", + DSL.name(JavaBaseConstants.COLUMN_NAME_DATA), + DSL.`val`(column.originalName) + ) + } + + private fun jsonTypeof(field: Field<*>): Field { + return DSL.function("JSONB_TYPEOF", SQLDataType.VARCHAR, field) + } + + companion object { + @JvmField + val JSONB_TYPE: DataType = + DefaultDataType(SQLDialect.POSTGRES, Any::class.java, "jsonb") + + const val CASE_STATEMENT_SQL_TEMPLATE: String = "CASE WHEN {0} THEN {1} ELSE {2} END " + + private const val AB_META_COLUMN_CHANGES_KEY = "changes" + private const val AB_META_CHANGES_FIELD_KEY = "field" + private const val AB_META_CHANGES_CHANGE_KEY = "change" + private const val AB_META_CHANGES_REASON_KEY = "reason" + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresState.kt b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresState.kt new file mode 100644 index 000000000000..09ec1ff3991e --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresState.kt @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping + +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState + +data class PostgresState( + val needsSoftReset: Boolean, + val isAirbyteMetaPresentInRaw: Boolean, + val isAirbyteGenerationIdPresent: Boolean +) : MinimumDestinationState { + override fun needsSoftReset(): Boolean { + return needsSoftReset + } + + @Suppress("UNCHECKED_CAST") + override fun withSoftReset(needsSoftReset: Boolean): T { + return copy(needsSoftReset = needsSoftReset) as T + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/PostgresContainerHelper.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/PostgresContainerHelper.kt new file mode 100644 index 000000000000..c63b35fa3184 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/PostgresContainerHelper.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres + +import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.cdk.load.test.util.DefaultNamespaceResult +import io.github.oshai.kotlinlogging.KotlinLogging +import org.testcontainers.containers.PostgreSQLContainer + +private val logger = KotlinLogging.logger {} + +/** + * Helper class for launching/stopping PostgreSQL test containers, as well as updating destination + * configuration to match test container configuration. + */ +object PostgresContainerHelper { + + private val testContainer: PostgreSQLContainer = + PostgreSQLContainer("postgres:16-alpine").apply { + withDatabaseName("postgres") + withUsername("postgres") + withPassword("postgres") + } + + fun start() { + synchronized(lock = testContainer) { + if (!testContainer.isRunning) { + testContainer.start() + } + } + } + + /** This method cleanly stops the test container if it is running. */ + fun stop() { + synchronized(lock = testContainer) { + if (testContainer.isRunning) { + testContainer.stop() + } + } + } + + fun getHost(): String = testContainer.host + + fun getPassword(): String = testContainer.password + + fun getPort(): Int = testContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT) + + fun getUsername(): String = testContainer.username + + fun getDatabaseName(): String = testContainer.databaseName + + fun getIpAddress(): String { + // Ensure that the container is started first + start() + return testContainer.containerInfo.networkSettings.networks.entries.first().value.ipAddress!! + } +} + +/** + * Configuration updater that replaces placeholder values with actual test container connection + * details. + */ +class PostgresConfigUpdater : ConfigurationUpdater { + override fun update(config: String): String { + var updatedConfig = config + + // If not running the connector in docker, we must use the mapped port to connect to the + // database. Otherwise, get the container's IP address for the host + updatedConfig = + if (System.getenv("AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER") != "docker") { + updatedConfig + .replace("replace_me_host", PostgresContainerHelper.getHost()) + .replace("replace_me_port", PostgresContainerHelper.getPort().toString()) + } else { + updatedConfig + .replace("replace_me_host", PostgresContainerHelper.getIpAddress()) + .replace("replace_me_port", PostgreSQLContainer.POSTGRESQL_PORT.toString()) + } + + updatedConfig = + updatedConfig + .replace("replace_me_database", PostgresContainerHelper.getDatabaseName()) + .replace("replace_me_username", PostgresContainerHelper.getUsername()) + .replace("replace_me_password", PostgresContainerHelper.getPassword()) + + logger.debug { "Using updated PostgreSQL configuration: $updatedConfig" } + return updatedConfig + } + + override fun setDefaultNamespace( + config: String, + defaultNamespace: String + ): DefaultNamespaceResult = + DefaultNamespaceResult( + config.replace("postgres_default_schema_placeholder", defaultNamespace), + defaultNamespace + ) +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.kt new file mode 100644 index 000000000000..6a6f87d1d197 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideDisableTypingDedupingTest.kt @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.node.ObjectNode +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Test + +class PostgresRawOverrideDisableTypingDedupingTest : PostgresTypingDedupingTest() { + override fun getBaseConfig(): ObjectNode { + return super.getBaseConfig() + .put("raw_data_schema", "overridden_raw_dataset") + .put("disable_type_dedupe", true) + } + + override val rawSchema: String + get() = "overridden_raw_dataset" + + override fun disableFinalTableComparison(): Boolean { + return true + } + + @Disabled @Test override fun identicalNameSimultaneousSync() {} + + @Disabled @Test override fun testVarcharLimitOver64K() {} +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideTypingDedupingTest.kt new file mode 100644 index 000000000000..00c8ac545412 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresRawOverrideTypingDedupingTest.kt @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.node.ObjectNode + +class PostgresRawOverrideTypingDedupingTest : PostgresTypingDedupingTest() { + override fun getBaseConfig(): ObjectNode { + return super.getBaseConfig().put("raw_data_schema", "overridden_raw_dataset") + } + + override val rawSchema: String + get() = "overridden_raw_dataset" +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.kt new file mode 100644 index 000000000000..6661c4051181 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.kt @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator +import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest +import io.airbyte.commons.exceptions.ConfigErrorException +import io.airbyte.commons.json.Jsons +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeTypeAndDedupe +import io.airbyte.integrations.destination.postgres.* +import java.util.List +import java.util.Optional +import org.jooq.DataType +import org.jooq.Field +import org.jooq.SQLDialect +import org.jooq.conf.ParamType +import org.jooq.impl.DSL +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock + +class PostgresSqlGeneratorIntegrationTest : JdbcSqlGeneratorIntegrationTest() { + override val supportsSafeCast: Boolean + get() = true + + override val database: JdbcDatabase + get() = Companion.database!! + + override val structType: DataType<*> + get() = PostgresSqlGenerator.JSONB_TYPE + + override val sqlGenerator: JdbcSqlGenerator + get() = + PostgresSqlGenerator( + PostgresSQLNameTransformer(), + cascadeDrop = false, + unconstrainedNumber = false + ) + + override val destinationHandler: DestinationHandler + get() = PostgresDestinationHandler(databaseName, Companion.database!!, namespace, mock()) + + override val sqlDialect: SQLDialect + get() = SQLDialect.POSTGRES + + override fun toJsonValue(valueAsString: String?): Field<*> { + return DSL.cast(DSL.`val`(valueAsString), PostgresSqlGenerator.JSONB_TYPE) + } + + @Test + @Throws(Exception::class) + override fun testCreateTableIncremental() { + val sql = generator.createTable(incrementalDedupStream, "", false) + destinationHandler.execute(sql) + + val initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)) + Assertions.assertEquals(1, initialStatuses.size) + val initialStatus = initialStatuses.first() + Assertions.assertTrue(initialStatus.isFinalTablePresent) + Assertions.assertFalse(initialStatus.isSchemaMismatch) + } + + /** Verify that we correctly DROP...CASCADE the final table when cascadeDrop is enabled. */ + @Test + @Throws(Exception::class) + fun testCascadeDrop() { + // Explicitly create a sqlgenerator with cascadeDrop=true + val generator = + PostgresSqlGenerator( + PostgresSQLNameTransformer(), + cascadeDrop = true, + unconstrainedNumber = false + ) + // Create a table, then create a view referencing it + destinationHandler.execute(generator.createTable(incrementalAppendStream, "", false)) + Companion.database!!.execute( + DSL.createView( + DSL.quotedName(incrementalAppendStream.id.finalNamespace, "example_view") + ) + .`as`( + DSL.select() + .from( + DSL.quotedName( + incrementalAppendStream.id.finalNamespace, + incrementalAppendStream.id.finalName + ) + ) + ) + .getSQL(ParamType.INLINED) + ) + // Create a "soft reset" table + destinationHandler.execute( + generator.createTable(incrementalDedupStream, "_soft_reset", false) + ) + + // Overwriting the first table with the second table should succeed. + Assertions.assertDoesNotThrow { + destinationHandler.execute( + generator.overwriteFinalTable(incrementalDedupStream.id, "_soft_reset") + ) + } + } + + /** + * Verify that when cascadeDrop is disabled, an error caused by dropping a table with + * dependencies results in a configuration error with the correct message. + */ + @Test + @Throws(Exception::class) + fun testCascadeDropDisabled() { + // Create a sql generator with cascadeDrop=false (this simulates what the framework passes + // from the + // config). + val generator = + PostgresSqlGenerator( + PostgresSQLNameTransformer(), + cascadeDrop = false, + unconstrainedNumber = false + ) + + // Create a table in the test namespace with a default name. + destinationHandler.execute(generator.createTable(incrementalAppendStream, "", false)) + + // Create a view in the test namespace that selects from the test table. + // (Ie, emulate a client action that creates some dependency on the table.) + Companion.database!!.execute( + DSL.createView( + DSL.quotedName(incrementalAppendStream.id.finalNamespace, "example_view") + ) + .`as`( + DSL.select() + .from( + DSL.quotedName( + incrementalAppendStream.id.finalNamespace, + incrementalAppendStream.id.finalName + ) + ) + ) + .getSQL(ParamType.INLINED) + ) + + // Simulate a staging table with an arbitrary suffix. + destinationHandler.execute( + generator.createTable(incrementalDedupStream, "_soft_reset", false) + ) + + // `overwriteFinalTable` drops the "original" table (without the suffix) and swaps in the + // suffixed table. The first step should fail because of the view dependency. + // (The generator does not support dropping tables directly.) + val t: Throwable = + Assertions.assertThrowsExactly(ConfigErrorException::class.java) { + destinationHandler.execute( + generator.overwriteFinalTable(incrementalDedupStream.id, "_soft_reset") + ) + } + Assertions.assertTrue( + t.message == + "Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter" + ) + } + + @Nested + inner class UnconstrainedNumber { + @Test + fun testUnconstrainedNumber() { + val generator = + PostgresSqlGenerator( + PostgresSQLNameTransformer(), + cascadeDrop = false, + unconstrainedNumber = true, + ) + + createRawTable(streamId) + destinationHandler.execute(generator.createTable(incrementalDedupStream, "", false)) + insertRawTableRecords( + streamId, + listOf( + Jsons.deserializeExact( + """ + { + "_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 6, + "id2": 100, + "updated_at": "2023-01-01T01:00:00Z", + "number": 10325.876543219876543 + } + } + """.trimIndent() + ) + ) + ) + + executeTypeAndDedupe( + generator, + destinationHandler, + incrementalDedupStream, + Optional.empty(), + "" + ) + + DIFFER.diffFinalTableRecords( + listOf( + Jsons.deserializeExact( + """ + { + "_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", + "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", + "_airbyte_meta": {"changes":[],"sync_id":null}, + "id1": 6, + "id2": 100, + "updated_at": "2023-01-01T01:00:00.000000Z", + "number": 10325.876543219876543 + } + """.trimIndent() + ) + ), + dumpFinalTableRecords(streamId, ""), + ) + } + @Test + fun testUnconstrainedStringifiedNumber() { + val generator = + PostgresSqlGenerator( + PostgresSQLNameTransformer(), + cascadeDrop = false, + unconstrainedNumber = true, + ) + + createRawTable(streamId) + destinationHandler.execute(generator.createTable(incrementalDedupStream, "", false)) + insertRawTableRecords( + streamId, + listOf( + Jsons.deserializeExact( + """ + { + "_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 6, + "id2": 100, + "updated_at": "2023-01-01T01:00:00Z", + "number": "10325.876543219876543" + } + } + """.trimIndent() + ) + ) + ) + + executeTypeAndDedupe( + generator, + destinationHandler, + incrementalDedupStream, + Optional.empty(), + "" + ) + + DIFFER.diffFinalTableRecords( + listOf( + Jsons.deserializeExact( + """ + { + "_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", + "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", + "_airbyte_meta": {"changes":[],"sync_id":null}, + "id1": 6, + "id2": 100, + "updated_at": "2023-01-01T01:00:00.000000Z", + "number": 10325.876543219876543 + } + """.trimIndent() + ) + ), + dumpFinalTableRecords(streamId, ""), + ) + } + } + + companion object { + private var testContainer: PostgresTestDatabase? = null + private var databaseName: String? = null + private var database: JdbcDatabase? = null + + @JvmStatic + @BeforeAll + fun setupPostgres(): Unit { + testContainer = PostgresTestDatabase.`in`(PostgresTestDatabase.BaseImage.POSTGRES_13) + val config: JsonNode = + testContainer!! + .configBuilder() + .with("schema", "public") + .withDatabase() + .withHostAndPort() + .withCredentials() + .withoutSsl() + .build() + + databaseName = config.get(JdbcUtils.DATABASE_KEY).asText() + val postgresDestination = PostgresDestination() + val dataSource = postgresDestination.getDataSource(config) + database = DefaultJdbcDatabase(dataSource, PostgresSourceOperations()) + } + + @JvmStatic + @AfterAll + fun teardownPostgres(): Unit { + testContainer!!.close() + } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.kt new file mode 100644 index 000000000000..3e06d4181e69 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.postgres.typing_deduping + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +// import io.airbyte.integrations.destination.postgres.PostgresDestination +import io.airbyte.integrations.destination.postgres.PostgresTestDatabase +import javax.sql.DataSource +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll + +open class PostgresTypingDedupingTest : AbstractPostgresTypingDedupingTest() { + override fun getBaseConfig(): ObjectNode { + return testContainer!! + .configBuilder() + .with("schema", "public") + // .with(PostgresDestination.DROP_CASCADE_OPTION, true) + .withDatabase() + .withResolvedHostAndPort() + .withCredentials() + .withoutSsl() + .build() + } + + override fun getDataSource(config: JsonNode?): DataSource? { + // Intentionally ignore the config and rebuild it. + // The config param has the resolved (i.e. in-docker) host/port. + // We need the unresolved host/port since the test wrapper code is running from the docker + // host + // rather than in a container. + return PostgresDestination() + .getDataSource( + testContainer!! + .configBuilder() + .with("schema", "public") + .withDatabase() + .withHostAndPort() + .withCredentials() + .withoutSsl() + .build() + ) + } + + override val imageName: String + get() = "airbyte/destination-postgres:dev" + + companion object { + protected var testContainer: PostgresTestDatabase? = null + + @JvmStatic + @BeforeAll + fun setupPostgres() { + testContainer = PostgresTestDatabase.`in`(PostgresTestDatabase.BaseImage.POSTGRES_13) + } + + @JvmStatic + @AfterAll + fun teardownPostgres() { + testContainer!!.close() + } + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/write/PostgresAcceptanceTest.kt b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/write/PostgresAcceptanceTest.kt index b82b67c42ffe..640789250819 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/write/PostgresAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/kotlin/io/airbyte/integrations/destination/postgres/write/PostgresAcceptanceTest.kt @@ -4,41 +4,146 @@ package io.airbyte.integrations.destination.postgres.write +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ArrayNode import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord +import io.airbyte.cdk.load.util.Jsons import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.DedupBehavior import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior import io.airbyte.cdk.load.write.StronglyTyped import io.airbyte.cdk.load.write.UnionBehavior import io.airbyte.cdk.load.write.UnknownTypesBehavior +import io.airbyte.integrations.destination.postgres.PostgresConfigUpdater +import io.airbyte.integrations.destination.postgres.PostgresContainerHelper +import io.airbyte.integrations.destination.postgres.config.PostgresBeanFactory +import io.airbyte.integrations.destination.postgres.db.PostgresFinalTableNameGenerator +import io.airbyte.integrations.destination.postgres.spec.PostgresConfiguration +import io.airbyte.integrations.destination.postgres.spec.PostgresConfigurationFactory +import io.airbyte.integrations.destination.postgres.spec.PostgresSpecification +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange +import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled -object PostgresDataDumper : DestinationDataDumper { +class PostgresDataDumper( + private val configProvider: (ConfigurationSpecification) -> PostgresConfiguration +) : DestinationDataDumper { override fun dumpRecords( spec: ConfigurationSpecification, stream: DestinationStream - ): List = emptyList() + ): List { + val config = configProvider(spec) + val tableNameGenerator = PostgresFinalTableNameGenerator(config) + val dataSource = PostgresBeanFactory().postgresDataSource( + postgresConfiguration = config, + resolvedHost = config.host, + resolvedPort = config.port + ) + + val output = mutableListOf() + + dataSource.use { ds -> + ds.connection.use { connection -> + val statement = connection.createStatement() + + // Use the FinalTableNameGenerator to get the correct table name + val tableName = tableNameGenerator.getTableName(stream.mappedDescriptor) + val quotedTableName = "\"${tableName.namespace}\".\"${tableName.name}\"" + + // First check if the table exists + val tableExistsQuery = """ + SELECT COUNT(*) AS table_count + FROM information_schema.tables + WHERE table_schema = '${tableName.namespace}' + AND table_name = '${tableName.name}' + """.trimIndent() + + val existsResultSet = statement.executeQuery(tableExistsQuery) + existsResultSet.next() + val tableExists = existsResultSet.getInt("table_count") > 0 + existsResultSet.close() + + if (!tableExists) { + // Table doesn't exist, return empty list + return output + } + + val resultSet = statement.executeQuery("SELECT * FROM $quotedTableName") + + while (resultSet.next()) { + val dataMap = linkedMapOf() + for (i in 1..resultSet.metaData.columnCount) { + val columnName = resultSet.metaData.getColumnName(i) + if (!Meta.COLUMN_NAMES.contains(columnName)) { + val value = resultSet.getObject(i) + dataMap[columnName] = value?.let { + AirbyteValue.from(convertValue(it)) + } ?: NullValue + } + } + + val outputRecord = OutputRecord( + rawId = resultSet.getString(Meta.COLUMN_NAME_AB_RAW_ID), + extractedAt = resultSet.getTimestamp(Meta.COLUMN_NAME_AB_EXTRACTED_AT).toInstant().toEpochMilli(), + loadedAt = null, + generationId = resultSet.getLong(Meta.COLUMN_NAME_AB_GENERATION_ID), + data = ObjectValue(dataMap), + airbyteMeta = stringToMeta(resultSet.getString(Meta.COLUMN_NAME_AB_META)) + ) + output.add(outputRecord) + } + resultSet.close() + } + } + + return output + } override fun dumpFile( spec: ConfigurationSpecification, stream: DestinationStream - ): Map = emptyMap() + ): Map { + throw UnsupportedOperationException("Postgres does not support file transfer.") + } + + private fun convertValue(value: Any): Any = + when (value) { + is java.sql.Date -> value.toLocalDate() + is java.sql.Time -> value.toLocalTime() + is java.sql.Timestamp -> value.toLocalDateTime() + else -> value + } } object PostgresDataCleaner : DestinationCleaner { - override fun cleanup() {} + override fun cleanup() { + // TODO: Implement cleanup logic to drop test schemas/tables + // Similar to ClickhouseDataCleaner or MSSQLDataCleaner + } } -class PostgresSpecification : ConfigurationSpecification() - class PostgresAcceptanceTest : BasicFunctionalityIntegrationTest( - configContents = "{}", + configContents = """{ + "host": "localhost", + "port": 5432, + "database": "postgres", + "schema": "public", + "username": "postgres", + "password": "postgres" + }""", configSpecClass = PostgresSpecification::class.java, - dataDumper = PostgresDataDumper, + dataDumper = PostgresDataDumper { spec -> + val configOverrides = buildConfigOverridesForTestContainer() + PostgresConfigurationFactory().makeWithOverrides(spec as PostgresSpecification, configOverrides) + }, destinationCleaner = PostgresDataCleaner, isStreamSchemaRetroactive = true, dedupBehavior = DedupBehavior(DedupBehavior.CdcDeletionMode.HARD_DELETE), @@ -58,4 +163,45 @@ class PostgresAcceptanceTest : BasicFunctionalityIntegrationTest( ), unknownTypesBehavior = UnknownTypesBehavior.PASS_THROUGH, nullEqualsUnset = true, -) + configUpdater = PostgresConfigUpdater(), +) { + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + PostgresContainerHelper.start() + } + + /** Builds a map of overrides for the test container environment. */ + private fun buildConfigOverridesForTestContainer(): MutableMap { + return mutableMapOf( + "host" to PostgresContainerHelper.getHost(), + "port" to PostgresContainerHelper.getPort().toString(), + "database" to PostgresContainerHelper.getDatabaseName(), + "username" to PostgresContainerHelper.getUsername(), + "password" to PostgresContainerHelper.getPassword() + ) + } + } +} + +fun stringToMeta(metaAsString: String?): OutputRecord.Meta? { + if (metaAsString.isNullOrEmpty()) { + return null + } + val metaJson = Jsons.readTree(metaAsString) + + val changes = (metaJson["changes"] as ArrayNode).map { change -> + val changeNode = change as JsonNode + Meta.Change( + field = changeNode["field"].textValue(), + change = AirbyteRecordMessageMetaChange.Change.fromValue(changeNode["change"].textValue()), + reason = AirbyteRecordMessageMetaChange.Reason.fromValue(changeNode["reason"].textValue()) + ) + } + + return OutputRecord.Meta( + changes = changes, + syncId = metaJson["sync_id"].longValue() + ) +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/AbstractPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/AbstractPostgresDestinationAcceptanceTest.java new file mode 100644 index 000000000000..10a15dcf1d6f --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/AbstractPostgresDestinationAcceptanceTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.destination.StandardNameTransformer; +import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; +import io.airbyte.commons.json.Jsons; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractPostgresDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { + + public static final String DEFAULT_DEV_IMAGE = "airbyte/destination-postgres:dev"; + + private final StandardNameTransformer namingResolver = new StandardNameTransformer(); + + @Override + protected String getImageName() { + return DEFAULT_DEV_IMAGE; + } + + @Override + protected JsonNode getFailCheckConfig() throws Exception { + final JsonNode clone = Jsons.clone(getConfig()); + ((ObjectNode) clone).put("password", "wrong password"); + return clone; + } + + @Override + protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) + throws Exception { + final String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + // namingResolver.getRawTableName is deprecated + @SuppressWarnings("deprecation") + protected List retrieveRecords(final TestDestinationEnv env, + final String streamName, + final String namespace, + final JsonNode streamSchema) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + protected List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { + // TODO: Change emitted_at with DV2 + return getTestDb().query(ctx -> { + ctx.execute("set time zone 'UTC';"); + return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(this::getJsonFromRecord) + .collect(Collectors.toList()); + }); + } + + protected abstract PostgresTestDatabase getTestDb(); + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new PostgresTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + + @Override + protected boolean supportsInDestinationNormalization() { + return true; + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java new file mode 100644 index 000000000000..f794ce20fc30 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres; + +import io.airbyte.cdk.testutils.ContainerFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +/** + * TODO: This class is a copy from source-postgres:testFixtures. Eventually merge into a common + * fixtures module. + */ +public class PostgresContainerFactory extends ContainerFactory> { + + @Override + protected PostgreSQLContainer createNewContainer(DockerImageName imageName) { + return new PostgreSQLContainer<>(imageName.asCompatibleSubstituteFor("postgres")); + } + + /** + * Apply the postgresql.conf file that we've packaged as a resource. + */ + public static void withConf(PostgreSQLContainer container) { + container + .withCopyFileToContainer( + MountableFile.forClasspathResource("postgresql.conf"), + "/etc/postgresql/postgresql.conf") + .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); + } + + /** + * Create a new network and bind it to the container. + */ + public static void withNetwork(PostgreSQLContainer container) { + container.withNetwork(Network.newNetwork()); + } + + /** + * Generate SSL certificates and tell postgres to enable SSL and use them. + */ + public static void withCert(PostgreSQLContainer container) { + container.start(); + String[] commands = { + "psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"", + "psql -U test -c \"GRANT CONNECT ON DATABASE \"test\" TO postgres;\"", + "psql -U test -c \"ALTER USER postgres WITH SUPERUSER;\"", + "openssl ecparam -name prime256v1 -genkey -noout -out ca.key", + "openssl req -new -x509 -sha256 -key ca.key -out ca.crt -subj \"/CN=127.0.0.1\"", + "openssl ecparam -name prime256v1 -genkey -noout -out server.key", + "openssl req -new -sha256 -key server.key -out server.csr -subj \"/CN=localhost\"", + "openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256", + "cp server.key /etc/ssl/private/", + "cp server.crt /etc/ssl/private/", + "cp ca.crt /etc/ssl/private/", + "chmod og-rwx /etc/ssl/private/server.* /etc/ssl/private/ca.*", + "chown postgres:postgres /etc/ssl/private/server.crt /etc/ssl/private/server.key /etc/ssl/private/ca.crt", + "echo \"ssl = on\" >> /var/lib/postgresql/data/postgresql.conf", + "echo \"ssl_cert_file = '/etc/ssl/private/server.crt'\" >> /var/lib/postgresql/data/postgresql.conf", + "echo \"ssl_key_file = '/etc/ssl/private/server.key'\" >> /var/lib/postgresql/data/postgresql.conf", + "echo \"ssl_ca_file = '/etc/ssl/private/ca.crt'\" >> /var/lib/postgresql/data/postgresql.conf", + "mkdir root/.postgresql", + "echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf", + "openssl ecparam -name prime256v1 -genkey -noout -out client.key", + "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\"", + "openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -sha256", + "cp client.crt ~/.postgresql/postgresql.crt", + "cp client.key ~/.postgresql/postgresql.key", + "chmod 0600 ~/.postgresql/postgresql.crt ~/.postgresql/postgresql.key", + "cp ca.crt root/.postgresql/ca.crt", + "chown postgres:postgres ~/.postgresql/ca.crt", + "psql -U test -c \"SELECT pg_reload_conf();\"", + }; + for (String cmd : commands) { + try { + container.execInContainer("su", "-c", cmd); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Tell postgres to enable SSL. + */ + public static void withSSL(PostgreSQLContainer container) { + container.withCommand("postgres " + + "-c ssl=on " + + "-c ssl_cert_file=/var/lib/postgresql/server.crt " + + "-c ssl_key_file=/var/lib/postgresql/server.key"); + } + + /** + * Configure postgres with client_encoding=sql_ascii. + */ + public static void withASCII(PostgreSQLContainer container) { + container.withCommand("postgres -c client_encoding=sql_ascii"); + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDataComparator.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDataComparator.java new file mode 100644 index 000000000000..1775a17139b5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDataComparator.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres; + +import io.airbyte.cdk.integrations.destination.StandardNameTransformer; +import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +public class PostgresTestDataComparator extends AdvancedTestDataComparator { + + private final StandardNameTransformer namingResolver = new StandardNameTransformer(); + + private static final String POSTGRES_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + private static final String POSTGRES_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + @Override + protected List resolveIdentifier(final String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + + @Override + protected boolean compareDateTimeValues(String expectedValue, String actualValue) { + var destinationDate = parseLocalDate(actualValue); + var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT)); + return expectedDate.equals(destinationDate); + } + + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + return ZonedDateTime.of(LocalDateTime.parse(destinationValue, + DateTimeFormatter.ofPattern(POSTGRES_DATETIME_WITH_TZ_FORMAT)), ZoneOffset.UTC); + } + + private LocalDate parseLocalDate(String dateTimeValue) { + if (dateTimeValue != null) { + return LocalDate.parse(dateTimeValue, + DateTimeFormatter.ofPattern(getFormat(dateTimeValue))); + } else { + return null; + } + } + + private String getFormat(String dateTimeValue) { + if (dateTimeValue.matches(".+Z")) { + return POSTGRES_DATETIME_FORMAT; + } else if (dateTimeValue.contains("T")) { + // Postgres stores array of objects as a jsobb type, i.e. array of string for all cases + return AIRBYTE_DATETIME_FORMAT; + } else { + // Postgres stores datetime as datetime type after normalization + return AIRBYTE_DATETIME_PARSED_FORMAT; + } + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java new file mode 100644 index 000000000000..afc0ea8d59c6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.testutils.ContainerFactory.NamedContainerModifier; +import io.airbyte.cdk.testutils.TestDatabase; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.jooq.SQLDialect; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * TODO: This class is a copy from source-postgres:testFixtures. Eventually merge into a common + * fixtures module. + */ +public class PostgresTestDatabase extends + TestDatabase, PostgresTestDatabase, PostgresTestDatabase.PostgresConfigBuilder> { + + public enum BaseImage { + + POSTGRES_16("postgres:16-bullseye"), + POSTGRES_12("postgres:12-bullseye"), + POSTGRES_13("postgres:13-alpine"), + POSTGRES_9("postgres:9-alpine"), + POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev"); + + private final String reference; + + private BaseImage(String reference) { + this.reference = reference; + }; + + } + + public enum ContainerModifier implements NamedContainerModifier> { + + ASCII(PostgresContainerFactory::withASCII), + CONF(PostgresContainerFactory::withConf), + NETWORK(PostgresContainerFactory::withNetwork), + SSL(PostgresContainerFactory::withSSL), + CERT(PostgresContainerFactory::withCert), + ; + + private Consumer> modifer; + + private ContainerModifier(final Consumer> modifer) { + this.modifer = modifer; + } + + @Override + public Consumer> modifier() { + return modifer; + } + + } + + static public PostgresTestDatabase in(BaseImage baseImage, ContainerModifier... modifiers) { + final var container = new PostgresContainerFactory().shared(baseImage.reference, modifiers); + return new PostgresTestDatabase(container).initialized(); + } + + public PostgresTestDatabase(PostgreSQLContainer container) { + super(container); + } + + @Override + protected Stream> inContainerBootstrapCmd() { + return Stream.of(psqlCmd(Stream.of( + String.format("CREATE DATABASE %s", getDatabaseName()), + String.format("CREATE USER %s PASSWORD '%s'", getUserName(), getPassword()), + String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", getDatabaseName(), getUserName()), + String.format("ALTER USER %s WITH SUPERUSER", getUserName())))); + } + + /** + * Close resources held by this instance. This deliberately avoids dropping the database, which is + * really expensive in Postgres. This is because a DROP DATABASE in Postgres triggers a CHECKPOINT. + * Call {@link #dropDatabaseAndUser} to explicitly drop the database and the user. + */ + @Override + protected Stream inContainerUndoBootstrapCmd() { + return Stream.empty(); + } + + /** + * Drop the database owned by this instance. + */ + public void dropDatabaseAndUser() { + execInContainer(psqlCmd(Stream.of( + String.format("DROP DATABASE %s", getDatabaseName()), + String.format("DROP OWNED BY %s", getUserName()), + String.format("DROP USER %s", getUserName())))); + } + + public Stream psqlCmd(Stream sql) { + return Stream.concat( + Stream.of("psql", + "-d", getContainer().getDatabaseName(), + "-U", getContainer().getUsername(), + "-v", "ON_ERROR_STOP=1", + "-a"), + sql.flatMap(stmt -> Stream.of("-c", stmt))); + } + + @Override + public DatabaseDriver getDatabaseDriver() { + return DatabaseDriver.POSTGRESQL; + } + + @Override + public SQLDialect getSqlDialect() { + return SQLDialect.POSTGRES; + } + + private Certificates cachedCerts; + + public synchronized Certificates getCertificates() { + if (cachedCerts == null) { + final String caCert, clientKey, clientCert; + try { + caCert = getContainer().execInContainer("su", "-c", "cat ca.crt").getStdout().trim(); + clientKey = getContainer().execInContainer("su", "-c", "cat client.key").getStdout().trim(); + clientCert = getContainer().execInContainer("su", "-c", "cat client.crt").getStdout().trim(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + cachedCerts = new Certificates(caCert, clientCert, clientKey); + } + return cachedCerts; + } + + public record Certificates(String caCertificate, String clientCertificate, String clientKey) {} + + @Override + public PostgresConfigBuilder configBuilder() { + return new PostgresConfigBuilder(this); + } + + public String getReplicationSlotName() { + return withNamespace("debezium_slot"); + } + + public String getPublicationName() { + return withNamespace("publication"); + } + + public PostgresTestDatabase withReplicationSlot() { + return this + .with("SELECT pg_create_logical_replication_slot('%s', 'pgoutput');", getReplicationSlotName()) + .onClose("SELECT pg_drop_replication_slot('%s');", getReplicationSlotName()); + } + + public PostgresTestDatabase withPublicationForAllTables() { + return this + .with("CREATE PUBLICATION %s FOR ALL TABLES;", getPublicationName()) + .onClose("DROP PUBLICATION %s CASCADE;", getPublicationName()); + } + + static public class PostgresConfigBuilder extends ConfigBuilder { + + protected PostgresConfigBuilder(PostgresTestDatabase testdb) { + super(testdb); + } + + public PostgresConfigBuilder withSchemas(String... schemas) { + return with(JdbcUtils.SCHEMAS_KEY, List.of(schemas)); + } + + public PostgresConfigBuilder withStandardReplication() { + return with("replication_method", ImmutableMap.builder().put("method", "Standard").build()); + } + + public PostgresConfigBuilder withCdcReplication() { + return withCdcReplication("While reading Data"); + } + + public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) { + return this + .with("is_test", true) + .with("replication_method", Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .put("replication_slot", getTestDatabase().getReplicationSlotName()) + .put("publication", getTestDatabase().getPublicationName()) + .put("initial_waiting_seconds", ConfigBuilder.DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds()) + .put("lsn_commit_behaviour", LsnCommitBehaviour) + .build())); + } + + public PostgresConfigBuilder withXminReplication() { + return this.with("replication_method", Jsons.jsonNode(ImmutableMap.builder().put("method", "Xmin").build())); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java new file mode 100644 index 000000000000..2003241a57dd --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java @@ -0,0 +1,432 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.text.Names; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public abstract class AbstractPostgresTypingDedupingTest extends JdbcTypingDedupingTest { + + private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; + + private static final Random RANDOM = new Random(); + + private String generateBigString() { + // Generate exactly 2 chars over the limit + final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2; + return RANDOM + .ints('a', 'z' + 1) + .limit(length) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + + @Override + protected SqlGenerator getSqlGenerator() { + return new PostgresSqlGenerator(new PostgresSQLNameTransformer(), false, false); + } + + @Override + protected JdbcCompatibleSourceOperations getSourceOperations() { + return new PostgresSourceOperations(); + } + + @Disabled + @Test + @Override + public void resumeAfterCancelledTruncate() throws Exception { + super.resumeAfterCancelledTruncate(); + } + + @Test + public void testMixedCasedSchema() throws Exception { + setStreamName("MixedCaseSchema" + getStreamName()); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(0L) + .withSyncId(42L) + .withGenerationId(43L))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + + runSync(catalog, messages1); + + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + } + + @Test + public void testMixedCaseRawTableV1V2Migration() throws Exception { + setStreamName("Mixed Case Table" + getStreamName()); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withGenerationId(43L) + .withMinimumGenerationId(0L) + .withSyncId(13L))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + + runSync(catalog, messages1, "airbyte/destination-postgres:0.6.3", Function.identity(), null); + // Special case to retrieve raw records pre DV2 using the same logic as actual code. + final String rawTableName = "_airbyte_raw_" + Names.toAlphanumericAndUnderscore(getStreamName()).toLowerCase(); + final List rawActualRecords = getDatabase().queryJsons( + DSL.selectFrom(DSL.name(getStreamNamespace(), rawTableName)).getSQL()); + // Just verify the size of raw pre DV2, postgres was lower casing the MixedCaseSchema so above + // retrieval should give 5 records from sync1 + assertEquals(5, rawActualRecords.size()); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); + runSync(catalog, messages2); + final List expectedRawRecords2 = readRecords("dat/sync2_mixedcase_expectedrecords_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_mixedcase_expectedrecords_fullrefresh_append_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + } + + @Test + public void testRawTableMetaMigration_append() throws Exception { + final ConfiguredAirbyteCatalog catalog1 = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); + + // First sync without _airbyte_meta + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog1, messages1, "airbyte/destination-postgres:2.0.4", Function.identity(), null); + // Second sync + final ConfiguredAirbyteCatalog catalog2 = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(0L) + .withSyncId(13L) + .withGenerationId(42L))); + final List messages2 = readMessages("dat/sync2_messages_after_meta.jsonl"); + runSync(catalog2, messages2); + + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_mixed_meta_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_mixed_meta_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + } + + @Test + public void testRawTableMetaMigration_incrementalDedupe() throws Exception { + final ConfiguredAirbyteCatalog catalog1 = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("updated_at")) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); + + // First sync without _airbyte_meta + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog1, messages1, "airbyte/destination-postgres:2.0.4", Function.identity(), null); + // Second sync + final ConfiguredAirbyteCatalog catalog2 = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("updated_at")) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(0L) + .withSyncId(13L) + .withGenerationId(42L))); + final List messages2 = readMessages("dat/sync2_messages_after_meta.jsonl"); + runSync(catalog2, messages2); + + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_mixed_meta_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_meta_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + } + + @Override + protected List dumpRawTableRecords(String streamNamespace, String streamName) throws Exception { + return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase()); + } + + @Test + public void testVarcharLimitOver64K() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(0L) + .withSyncId(13L) + .withGenerationId(42L))); + + final AirbyteMessage message = new AirbyteMessage(); + final String largeString = generateBigString(); + final Map data = ImmutableMap.of( + "id1", 1, + "id2", 200, + "updated_at", "2021-01-01T00:00:00Z", + "name", largeString); + message.setType(Type.RECORD); + message.setRecord(new AirbyteRecordMessage() + .withNamespace(getStreamNamespace()) + .withStream(getStreamName()) + .withData(Jsons.jsonNode(data)) + .withEmittedAt(1000L)); + final List messages1 = new ArrayList<>(); + messages1.add(message); + runSync(catalog, messages1); + + // Only assert on the large varchar string landing in final table. + // Rest of the fields' correctness is tested by other means in other tests. + final List actualFinalRecords = dumpFinalTableRecords(getStreamNamespace(), getStreamName()); + assertEquals(1, actualFinalRecords.size()); + assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); + + } + + @Test + void testDropCascade() throws Exception { + ConfiguredAirbyteCatalog catalog1 = + new ConfiguredAirbyteCatalog() + .withStreams( + List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withCursorField(List.of("updated_at")) + .withPrimaryKey(java.util.List.of(List.of("id1"), List.of("id2"))) + .withStream( + new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(43L) + .withSyncId(42L) + .withGenerationId(43L))); + + // First sync + List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog1, messages1); + var expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); + var expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + + String rawTableName = getRawSchema() + "." + + getNameTransformer().convertStreamName( + StreamId.concatenateRawTableName( + getStreamNamespace(), + Names.toAlphanumericAndUnderscore(getStreamName()))); + String finalTableName = getStreamNamespace() + "." + Names.toAlphanumericAndUnderscore(getStreamName()); + getDatabase().execute("CREATE VIEW " + getStreamNamespace() + ".v1 AS SELECT * FROM " + rawTableName); + if (!disableFinalTableComparison()) { + getDatabase().execute("CREATE VIEW " + getStreamNamespace() + ".v2 AS SELECT * FROM " + finalTableName); + } // Second sync + for (var message : messages1) { + message.getRecord().setEmittedAt(2000L); + } + var catalog2 = + new ConfiguredAirbyteCatalog() + .withStreams( + List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withCursorField(List.of("updated_at")) + .withPrimaryKey(java.util.List.of(List.of("id1"), List.of("id2"))) + .withStream( + new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())) + .withMinimumGenerationId(44L) + .withSyncId(42L) + .withGenerationId(44L))); + runSync(catalog2, messages1); + + for (var record : expectedRawRecords1) { + ((ObjectNode) record).put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, "1970-01-01T00:00:02.000000Z"); + ((ObjectNode) record).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, 44); + } + for (var record : expectedFinalRecords1) { + ((ObjectNode) record).put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, "1970-01-01T00:00:02.000000Z"); + ((ObjectNode) record).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, 44); + } + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + + } + + @Test + void testAirbyteMetaAndGenerationIdMigration() throws Exception { + ConfiguredAirbyteCatalog catalog = + new ConfiguredAirbyteCatalog() + .withStreams( + List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncId(42L) + .withGenerationId(43L) + .withMinimumGenerationId(0L) + .withStream( + new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); + + // First sync + List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog, messages1, "airbyte/destination-postgres:2.0.15", Function.identity(), null); + List actualRawRecords1 = dumpRawTableRecords(getStreamNamespace(), getStreamName()); + Set loadedAtValues1 = + actualRawRecords1.stream() + .map((JsonNode record) -> record.get(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)) + .collect(Collectors.toSet()); + assertEquals( + 1, + loadedAtValues1.size(), + "Expected only one value for _airbyte_loaded_at after the 1st sync!"); + + // Second sync + List messages2 = readMessages("dat/sync2_messages.jsonl"); + runSync(catalog, messages2); + + // The first 5 records in these files were written by the old version, and have + // several differences with the new records: + // In raw tables: _airbyte_generation_id at all. _airbyte_meta only contains the changes field + // In final tables: no generation ID, and airbyte_meta still uses the old `{errors: [...]}` + // structure + // So modify the expected records to reflect those differences. + List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); + for (int i = 0; i < 5; i++) { + ObjectNode record = (ObjectNode) expectedRawRecords2.get(i); + String originalChanges = record.get(JavaBaseConstants.COLUMN_NAME_AB_META).get("changes").toString(); + record.set(JavaBaseConstants.COLUMN_NAME_AB_META, + Jsons.deserialize( + "{\"changes\":" + originalChanges + "}")); + record.remove(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID); + } + List expectedFinalRecords2 = + readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); + for (int i = 0; i < 5; i++) { + ObjectNode record = (ObjectNode) expectedFinalRecords2.get(i); + String originalChanges = record.get(JavaBaseConstants.COLUMN_NAME_AB_META).get("changes").toString(); + record.set(JavaBaseConstants.COLUMN_NAME_AB_META, + Jsons.deserialize( + "{\"changes\":" + originalChanges + "}")); + record.remove(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID); + } + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + + // Verify that we didn't trigger a soft reset. + // There should be two unique loaded_at values in the raw table. + // (only do this if T+D is enabled to begin with; otherwise loaded_at will just be null) + if (!disableFinalTableComparison()) { + List actualRawRecords2 = dumpRawTableRecords(getStreamNamespace(), getStreamName()); + Set loadedAtValues2 = + actualRawRecords2.stream() + .map((JsonNode record) -> record.get(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)) + .collect(Collectors.toSet()); + assertEquals( + 2, + loadedAtValues2.size(), + "Expected two different values for loaded_at. If there is only 1 value, then we incorrectly triggered a soft reset. If there are more than 2, then something weird happened?"); + assertTrue( + loadedAtValues2.containsAll(loadedAtValues1), + "expected the loaded_at value from the 1st sync. If it's not there, then we incorrectly triggered a soft reset."); + + } + } + + @Test + void testAirbyteMetaAndGenerationIdMigrationForOverwrite() throws Exception { + ConfiguredAirbyteCatalog catalog = + new ConfiguredAirbyteCatalog() + .withStreams( + List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withSyncId(42L) + .withGenerationId(43L) + .withMinimumGenerationId(43L) + .withStream( + new AirbyteStream() + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); + + // First sync + List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog, messages1, "airbyte/destination-postgres:2.0.15", Function.identity(), null); + + // Second sync + List messages2 = readMessages("dat/sync2_messages.jsonl"); + runSync(catalog, messages2); + + List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl"); + List expectedFinalRecords2 = + readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java new file mode 100644 index 000000000000..997aa66bf2a2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSourceOperations.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; +import io.airbyte.commons.json.Jsons; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * See + * {@link io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations}. + * Copied here to avoid weird dependencies. + */ +public class PostgresSourceOperations extends JdbcSourceOperations { + + @Override + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + final String columnName = resultSet.getMetaData().getColumnName(colIndex); + final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); + + switch (columnTypeName) { + // JSONB has no equivalent in JDBCType + case "jsonb" -> json.set(columnName, Jsons.deserializeExact(resultSet.getString(colIndex))); + // For some reason, the driver maps these to their timezoneless equivalents (TIME and TIMESTAMP) + case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); + case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + default -> super.copyToJsonField(resultSet, colIndex, json); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties index 499790fc2c20..4ab967cdc434 100644 --- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties +++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties @@ -1,3 +1,3 @@ testExecutionConcurrency=-1 -cdkVersion=0.1.44 +cdkVersion=local JunitMethodExecutionTimeout=10 m diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/write/load/SnowflakeInsertBuffer.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/write/load/SnowflakeInsertBuffer.kt index dab8165d0e27..b93ddc475c47 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/write/load/SnowflakeInsertBuffer.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/write/load/SnowflakeInsertBuffer.kt @@ -58,6 +58,8 @@ class SnowflakeInsertBuffer( try { csvPrinter?.flush() logger.info { "Beginning insert into ${tableName.toPrettyString(quote = QUOTE)}" } + logger.error { "_________________________" } + logger.error { File(filePath.pathString).readText() } // Next, put the CSV file into the staging table snowflakeClient.putInStage(tableName, filePath.pathString) // Finally, copy the data from the staging table to the final table @@ -86,6 +88,8 @@ class SnowflakeInsertBuffer( private fun writeToCsvFile(record: Map) { csvPrinter?.let { + logger.error { "--------------- ${snowflakeRecordFormatter.format(record)}" } + logger.error { "--------------- $record" } it.printRecord(snowflakeRecordFormatter.format(record)) recordCount++ if ((recordCount % flushLimit) == 0) {