diff --git a/airbyte-cdk/bulk/core/load/build.gradle b/airbyte-cdk/bulk/core/load/build.gradle index b656f8cf90f0..77b1b6142983 100644 --- a/airbyte-cdk/bulk/core/load/build.gradle +++ b/airbyte-cdk/bulk/core/load/build.gradle @@ -21,8 +21,10 @@ dependencies { implementation("com.google.guava:guava:33.3.0-jre") testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')) + testFixturesApi project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db') testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") + testFixturesImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") testImplementation("io.mockk:mockk:1.13.12") implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20" testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7" diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/CoreTableOperationsSuite.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/CoreTableOperationsSuite.kt new file mode 100644 index 000000000000..5703bb460c5d --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/CoreTableOperationsSuite.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.component + +import io.airbyte.cdk.load.CoreTableOperationsClient +import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.NamespaceMapper +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping +import io.airbyte.cdk.load.orchestration.db.TableName +import java.util.UUID +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.assertDoesNotThrow + +interface CoreTableOperationsSuite { + val client: CoreTableOperationsClient + + fun `connect to database`() = runTest { + assertDoesNotThrow { client.ping() } + } + + fun `create and drop namespaces`() = runTest { + val testNamespace = "namespace-test-${UUID.randomUUID()}" + + require(!client.namespaceExists(testNamespace)) { "test namespace: $testNamespace already exists. Please validate it's deleted before running again." } + + client.createNamespace(testNamespace) + + assert(client.namespaceExists(testNamespace)) + + client.dropNamespace(testNamespace) + + assert(!client.namespaceExists(testNamespace)) + } + + fun `create and drop tables`() = runTest { + val uniquePostFix = UUID.randomUUID() + val testTable = TableName( + "table-test-namespace-$uniquePostFix", + "table-test-table-$uniquePostFix", + ) + + require(!client.tableExists(testTable)) { "test table: ${testTable.namespace}.${testTable.name} already exists. Please validate it's deleted before running again." } + + client.createTable( + stream = DestinationStream( + unmappedNamespace = testTable.namespace, + unmappedName = testTable.name, + importType = Append, + generationId = 1, + minimumGenerationId = 0, + syncId = 1, + includeFiles = false, + schema = ObjectType(linkedMapOf()), + namespaceMapper = NamespaceMapper(), + ), + tableName = testTable, + columnNameMapping = ColumnNameMapping(mapOf()), + replace = false, + ) + + assert(client.tableExists(testTable)) + + client.dropTable(testTable) + + assert(!client.tableExists(testTable)) + } + + fun `count table rows`() {} + + fun `overwrite tables`() {} + + fun `copy tables`() {} + + fun `upsert tables`() {} + + fun `get generation id`() {} +} diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/CoreTableOperationsClient.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/CoreTableOperationsClient.kt new file mode 100644 index 000000000000..911e61626a3d --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/CoreTableOperationsClient.kt @@ -0,0 +1,21 @@ +package io.airbyte.cdk.load + +import io.airbyte.cdk.load.client.AirbyteClient +import io.airbyte.cdk.load.orchestration.db.TableName +import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations +import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations + +interface CoreTableOperationsClient : + AirbyteClient, + DirectLoadTableSqlOperations, + DirectLoadTableNativeOperations { + + suspend fun ping() = Unit + + suspend fun namespaceExists(namespace: String) = false + + suspend fun dropNamespace(namespace: String) = Unit + + suspend fun tableExists(table: TableName) = false + +} diff --git a/airbyte-integrations/connectors/destination-clickhouse/build.gradle b/airbyte-integrations/connectors/destination-clickhouse/build.gradle index 5437a67a20e9..41fbe61139d2 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/build.gradle +++ b/airbyte-integrations/connectors/destination-clickhouse/build.gradle @@ -33,6 +33,7 @@ application { dependencies { implementation 'com.clickhouse:client-v2:0.9.0' + implementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db') testImplementation("io.mockk:mockk:1.14.2") testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2") @@ -42,4 +43,5 @@ dependencies { // https://mvnrepository.com/artifact/org.testcontainers/clickhouse integrationTestImplementation 'org.testcontainers:clickhouse:1.21.1' integrationTestImplementation 'com.clickhouse:client-v2:0.9.0' + integrationTestImplementation testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')) } diff --git a/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml b/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml index ddb7094552d4..04bf028ab5b9 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml +++ b/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml @@ -39,5 +39,8 @@ data: connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests + testSecrets: + - name: test_instance + fileName: test-instance.json supportsRefreshes: true metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/kotlin/io/airbyte/integrations/destination/clickhouse/client/ClickhouseAirbyteClient.kt b/airbyte-integrations/connectors/destination-clickhouse/src/main/kotlin/io/airbyte/integrations/destination/clickhouse/client/ClickhouseAirbyteClient.kt index 89116ab7bab8..575f03fb891f 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/kotlin/io/airbyte/integrations/destination/clickhouse/client/ClickhouseAirbyteClient.kt +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/kotlin/io/airbyte/integrations/destination/clickhouse/client/ClickhouseAirbyteClient.kt @@ -13,15 +13,13 @@ import com.clickhouse.data.ClickHouseColumn import com.clickhouse.data.ClickHouseDataType import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.load.client.AirbyteClient +import io.airbyte.cdk.load.CoreTableOperationsClient import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAMES import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping import io.airbyte.cdk.load.orchestration.db.TableName import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator -import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations -import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DECIMAL_WITH_PRECISION_AND_SCALE import io.airbyte.integrations.destination.clickhouse.config.ClickhouseFinalTableNameGenerator @@ -46,7 +44,7 @@ class ClickhouseAirbyteClient( private val nameGenerator: ClickhouseFinalTableNameGenerator, private val tempTableNameGenerator: TempTableNameGenerator, private val clickhouseConfiguration: ClickhouseConfiguration, -) : AirbyteClient, DirectLoadTableSqlOperations, DirectLoadTableNativeOperations { +) : CoreTableOperationsClient { override suspend fun createNamespace(namespace: String) { val statement = sqlGenerator.createNamespace(namespace) @@ -349,4 +347,8 @@ class ClickhouseAirbyteClient( this.name } } + + override suspend fun ping() { + execute("SELECT 1") + } } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/ClickhouseTableOperationsTest.kt b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/ClickhouseTableOperationsTest.kt new file mode 100644 index 000000000000..14a831624b5b --- /dev/null +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/ClickhouseTableOperationsTest.kt @@ -0,0 +1,47 @@ +package io.airbyte.integrations.destination.clickhouse.component + +import io.airbyte.cdk.load.component.CoreTableOperationsSuite +import io.airbyte.integrations.destination.clickhouse.client.ClickhouseAirbyteClient +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Test + +@MicronautTest +class ClickhouseTableOperationsTest: CoreTableOperationsSuite { + @Inject + override lateinit var client: ClickhouseAirbyteClient + + @Test + override fun `connect to database`() { + super.`connect to database`() + } + + @Test + override fun `create and drop tables`() { + super.`create and drop tables`() + } + + override fun `create and drop namespaces`() { + super.`create and drop namespaces`() + } + + override fun `count table rows`() { + super.`count table rows`() + } + + override fun `overwrite tables`() { + super.`overwrite tables`() + } + + override fun `copy tables`() { + super.`copy tables`() + } + + override fun `upsert tables`() { + super.`upsert tables`() + } + + override fun `get generation id`() { + super.`get generation id`() + } +} diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/config/TestConfigBeanOverrides.kt b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/config/TestConfigBeanOverrides.kt new file mode 100644 index 000000000000..3be8b0bba0e1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/kotlin/io/airbyte/integrations/destination/clickhouse/component/config/TestConfigBeanOverrides.kt @@ -0,0 +1,24 @@ +package io.airbyte.integrations.destination.clickhouse.component.config + +import io.airbyte.cdk.util.Jsons +import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration +import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfigurationFactory +import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseSpecificationOss +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Primary +import jakarta.inject.Singleton +import java.nio.file.Files +import kotlin.io.path.Path + +@Factory +class TestConfigBeanOverrides { + @Singleton + @Primary + fun config(): ClickhouseConfiguration { + val configPath = Path("secrets/test-instance.json") + val configStr = Files.readString(configPath) + val spec = Jsons.readValue(configStr, ClickhouseSpecificationOss::class.java) + + return ClickhouseConfigurationFactory().makeWithoutExceptionHandling(spec) + } +}