Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3338,12 +3338,12 @@ abstract class BasicFunctionalityIntegrationTest(
"""
{
"id": 1,
"string": "fo\u0000o",
"string": "fo,\u0000o",
"number": 42.1,
"integer": 42,
"boolean": true,
"timestamp_with_timezone": "2023-01-23T11:34:56-01:00",
"timestamp_without_timezone": "2023-01-23T12:34:56",
"timestamp_without_timezone": "2023-01-23T12:34:56,5",
"time_with_timezone": "11:34:56-01:00",
"time_without_timezone": "12:34:56",
"date": "2023-01-23"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,34 @@ def hikariCpVersion = "7.0.2"
def junitVersion = "5.13.4"
def junitPlatformVersion = "1.13.4"
def postgresqlVersion = "42.7.2"
def testContainersVersion = "1.20.5"

dependencies {
implementation("org.postgresql:postgresql:$postgresqlVersion")
implementation("com.zaxxer:HikariCP:$hikariCpVersion")
implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core"))
implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-typing-deduping"))
implementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations"))

testImplementation("io.mockk:mockk:1.14.5")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")

testRuntimeOnly("org.junit.platform:junit-platform-engine:$junitPlatformVersion")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:$junitPlatformVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")

// TestFixtures dependencies - needed for AbstractPostgresTypingDedupingTest
testFixturesApi(testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations")))
testFixturesApi(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-typing-deduping"))
testFixturesApi(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies"))
testFixturesApi("org.testcontainers:postgresql:$testContainersVersion")
testFixturesImplementation("org.postgresql:postgresql:$postgresqlVersion")
testFixturesImplementation("com.zaxxer:HikariCP:$hikariCpVersion")

integrationTestImplementation("com.zaxxer:HikariCP:$hikariCpVersion")
integrationTestImplementation("org.postgresql:postgresql:$postgresqlVersion")
integrationTestImplementation("org.testcontainers:postgresql:$testContainersVersion")
integrationTestImplementation(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies"))
integrationTestImplementation(testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations")))
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,234 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.postgres

import io.airbyte.cdk.AirbyteDestinationRunner
import com.fasterxml.jackson.databind.JsonNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.db.factory.DataSourceFactory
import io.airbyte.cdk.db.factory.DatabaseDriver
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addThrowableForDeinterpolation
import io.airbyte.cdk.integrations.base.Destination
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils
import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions
import io.airbyte.commons.json.Jsons.jsonNode
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.destination.postgres.typing_deduping.*
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.*
import org.postgresql.util.PSQLException
import org.slf4j.Logger
import org.slf4j.LoggerFactory

fun main(args: Array<String>) {
AirbyteDestinationRunner.run(*args)
class PostgresDestination :
AbstractJdbcDestination<PostgresState>(DRIVER_CLASS, PostgresSQLNameTransformer()),
Destination {
override fun modifyDataSourceBuilder(
builder: DataSourceFactory.DataSourceBuilder
): DataSourceFactory.DataSourceBuilder {
// Anything in the pg_temp schema is only visible to the connection that created it.
// So this creates an airbyte_safe_cast function that only exists for the duration of
// a single connection.
// This avoids issues with creating the same function concurrently (e.g. if multiple syncs
// run
// at the same time).
// Function definition copied from https://dba.stackexchange.com/a/203986

// Adding 60 seconds to connection timeout, for ssl connections, default 10 seconds is not
// enough

return builder
.withConnectionTimeout(Duration.ofSeconds(60))
.withConnectionInitSql(
"""
CREATE OR REPLACE FUNCTION pg_temp.airbyte_safe_cast(_in text, INOUT _out ANYELEMENT)
LANGUAGE plpgsql AS
${'$'}func${'$'}
BEGIN
EXECUTE format('SELECT %L::%s', ${'$'}1, pg_typeof(_out))
INTO _out;
EXCEPTION WHEN others THEN
-- do nothing: _out already carries default
END
${'$'}func${'$'};

""".trimIndent()
)
}

public override fun getDefaultConnectionProperties(config: JsonNode): Map<String, String> {
val additionalParameters: MutableMap<String, String> = HashMap()
if (
!config.has(PostgresSslConnectionUtils.PARAM_SSL) ||
config
.get(
PostgresSslConnectionUtils.PARAM_SSL,
)
.asBoolean()
) {
if (config.has(PostgresSslConnectionUtils.PARAM_SSL_MODE)) {
if (
PostgresSslConnectionUtils.DISABLE ==
config
.get(PostgresSslConnectionUtils.PARAM_SSL_MODE)
.get(PostgresSslConnectionUtils.PARAM_MODE)
.asText()
) {
additionalParameters["sslmode"] = PostgresSslConnectionUtils.DISABLE
} else {
additionalParameters.putAll(
obtainConnectionOptions(
config.get(
PostgresSslConnectionUtils.PARAM_SSL_MODE,
),
),
)
}
} else {
additionalParameters[JdbcUtils.SSL_KEY] = "true"
additionalParameters["sslmode"] = "require"
}
}
return additionalParameters
}

override fun toJdbcConfig(config: JsonNode): JsonNode {
val schema =
Optional.ofNullable(config[JdbcUtils.SCHEMA_KEY])
.map { obj: JsonNode -> obj.asText() }
.orElse("public")

var encodedDatabase = config[JdbcUtils.DATABASE_KEY].asText()
if (encodedDatabase != null) {
encodedDatabase = URLEncoder.encode(encodedDatabase, StandardCharsets.UTF_8)
}
val jdbcUrl =
String.format(
"jdbc:postgresql://%s:%s/%s?",
config[JdbcUtils.HOST_KEY].asText(),
config[JdbcUtils.PORT_KEY].asText(),
encodedDatabase
)

val configBuilder =
ImmutableMap.builder<Any, Any>()
.put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText())
.put(JdbcUtils.JDBC_URL_KEY, jdbcUrl)
.put(JdbcUtils.SCHEMA_KEY, schema)

if (config.has(JdbcUtils.PASSWORD_KEY)) {
configBuilder.put(JdbcUtils.PASSWORD_KEY, config[JdbcUtils.PASSWORD_KEY].asText())
}

if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
configBuilder.put(
JdbcUtils.JDBC_URL_PARAMS_KEY,
config[JdbcUtils.JDBC_URL_PARAMS_KEY].asText()
)
}

return jsonNode(configBuilder.build())
}

override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator {
return PostgresSqlGenerator(
PostgresSQLNameTransformer(),
hasDropCascadeMode(config),
hasUnconstrainedNumber(config),
)
}
override fun getSqlOperations(config: JsonNode): PostgresSqlOperations {
return PostgresSqlOperations(hasDropCascadeMode(config))
}

override fun getGenerationHandler(): PostgresGenerationHandler {
return PostgresGenerationHandler()
}

private fun hasDropCascadeMode(config: JsonNode): Boolean {
val dropCascadeNode = config[DROP_CASCADE_OPTION]
return dropCascadeNode != null && dropCascadeNode.asBoolean()
}

private fun hasUnconstrainedNumber(config: JsonNode): Boolean {
val unconstrainedNumberNode = config[UNCONSTRAINED_NUMBER_OPTION]
return unconstrainedNumberNode != null && unconstrainedNumberNode.asBoolean()
}

override fun getDestinationHandler(
config: JsonNode,
databaseName: String,
database: JdbcDatabase,
rawTableSchema: String
): JdbcDestinationHandler<PostgresState> {
return PostgresDestinationHandler(
databaseName,
database,
rawTableSchema,
getGenerationHandler(),
)
}

protected override fun getMigrations(
database: JdbcDatabase,
databaseName: String,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler<PostgresState>
): List<Migration<PostgresState>> {
return java.util.List.of<Migration<PostgresState>>(
PostgresRawTableAirbyteMetaMigration(database, databaseName),
PostgresGenerationIdMigration(database, databaseName),
)
}

override fun getDataTransformer(
parsedCatalog: ParsedCatalog?,
defaultNamespace: String?
): StreamAwareDataTransformer {
return PostgresDataTransformer()
}

override val isV2Destination: Boolean
get() = true

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(PostgresDestination::class.java)

val DRIVER_CLASS: String = DatabaseDriver.POSTGRESQL.driverClassName

const val DROP_CASCADE_OPTION = "drop_cascade"
const val UNCONSTRAINED_NUMBER_OPTION = "unconstrained_number"

@JvmStatic
fun sshWrappedDestination(): Destination {
return SshWrappedDestination(
PostgresDestination(),
JdbcUtils.HOST_LIST_KEY,
JdbcUtils.PORT_LIST_KEY
)
}

@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
addThrowableForDeinterpolation(PSQLException::class.java)
val destination = sshWrappedDestination()
LOGGER.info("starting destination: {}", PostgresDestination::class.java)
IntegrationRunner(destination).run(args)
LOGGER.info("completed destination: {}", PostgresDestination::class.java)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres

import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler

class PostgresGenerationHandler : JdbcGenerationHandler {
override fun getGenerationIdInTable(
database: JdbcDatabase,
namespace: String,
name: String
): Long? {
val selectTableResultSet =
database
.unsafeQuery(
"""SELECT 1
| FROM pg_catalog.pg_namespace n
| JOIN pg_catalog.pg_class c
| ON c.relnamespace=n.oid
| JOIN pg_catalog.pg_attribute a
| ON a.attrelid = c.oid
| WHERE n.nspname=?
| AND c.relkind='r'
| AND c.relname=?
| AND a.attname=?
| LIMIT 1
""".trimMargin(),
namespace,
name,
"_airbyte_generation_id"
)
.use { it.toList() }
if (selectTableResultSet.isEmpty()) {
return null
} else {
val selectGenIdResultSet =
database
.unsafeQuery("SELECT _airbyte_generation_id FROM $namespace.$name LIMIT 1;")
.use { it.toList() }
if (selectGenIdResultSet.isEmpty()) {
return null
} else {
val genIdInTable =
selectGenIdResultSet.first().get("_airbyte_generation_id")?.asLong()
LOGGER.info { "found generationId in table $namespace.$name: $genIdInTable" }
return genIdInTable ?: -1L
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.postgres

import io.airbyte.cdk.integrations.destination.StandardNameTransformer
import java.util.*
import kotlin.math.min

class PostgresSQLNameTransformer : StandardNameTransformer() {
// I _think_ overriding these two methods is sufficient to apply the truncation logic everywhere
// but this interface + our superclass are weirdly complicated, so plausibly something is
// missing
override fun getIdentifier(name: String): String {
return truncate(super.getIdentifier(name))
}

override fun convertStreamName(input: String): String {
return truncate(super.convertStreamName(input))
}

override fun applyDefaultCase(input: String): String {
return input.lowercase(Locale.getDefault())
}

// see https://github.com/airbytehq/airbyte/issues/35333
// We cannot delete these method until connectors don't need old v1 raw table references for
// migration
@Deprecated("") // Overriding a deprecated method is, itself, a warning
@Suppress("deprecation")
override fun getRawTableName(streamName: String): String {
return convertStreamName("_airbyte_raw_" + streamName.lowercase(Locale.getDefault()))
}

/**
* Postgres silently truncates identifiers to 63 characters. Utility method to do that
* truncation explicitly, so that we can detect e.g. name collisions.
*/
private fun truncate(str: String): String {
return str.substring(0, min(str.length.toDouble(), 63.0).toInt())
}
}
Loading
Loading