Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ dependencies {
implementation("com.google.guava:guava:33.3.0-jre")

testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))
testFixturesApi project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db')

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testFixturesImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testImplementation("io.mockk:mockk:1.13.12")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.component

import io.airbyte.cdk.load.CoreTableOperationsClient
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import java.util.UUID
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.assertDoesNotThrow

interface CoreTableOperationsSuite {
val client: CoreTableOperationsClient

fun `connect to database`() = runTest {
assertDoesNotThrow { client.ping() }
}

fun `create and drop namespaces`() = runTest {
val testNamespace = "namespace-test-${UUID.randomUUID()}"

require(!client.namespaceExists(testNamespace)) { "test namespace: $testNamespace already exists. Please validate it's deleted before running again." }

client.createNamespace(testNamespace)

assert(client.namespaceExists(testNamespace))

client.dropNamespace(testNamespace)

assert(!client.namespaceExists(testNamespace))
}

fun `create and drop tables`() = runTest {
val uniquePostFix = UUID.randomUUID()
val testTable = TableName(
"table-test-namespace-$uniquePostFix",
"table-test-table-$uniquePostFix",
)

require(!client.tableExists(testTable)) { "test table: ${testTable.namespace}.${testTable.name} already exists. Please validate it's deleted before running again." }

client.createTable(
stream = DestinationStream(
unmappedNamespace = testTable.namespace,
unmappedName = testTable.name,
importType = Append,
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
includeFiles = false,
schema = ObjectType(linkedMapOf()),
namespaceMapper = NamespaceMapper(),
),
tableName = testTable,
columnNameMapping = ColumnNameMapping(mapOf()),
replace = false,
)

assert(client.tableExists(testTable))

client.dropTable(testTable)

assert(!client.tableExists(testTable))
}

fun `count table rows`() {}

fun `overwrite tables`() {}

fun `copy tables`() {}

fun `upsert tables`() {}

fun `get generation id`() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.airbyte.cdk.load

import io.airbyte.cdk.load.client.AirbyteClient
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations

interface CoreTableOperationsClient :
AirbyteClient,
DirectLoadTableSqlOperations,
DirectLoadTableNativeOperations {

suspend fun ping() = Unit

suspend fun namespaceExists(namespace: String) = false

suspend fun dropNamespace(namespace: String) = Unit

suspend fun tableExists(table: TableName) = false

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ application {

dependencies {
implementation 'com.clickhouse:client-v2:0.9.0'
implementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db')

testImplementation("io.mockk:mockk:1.14.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
Expand All @@ -42,4 +43,5 @@ dependencies {
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestImplementation 'org.testcontainers:clickhouse:1.21.1'
integrationTestImplementation 'com.clickhouse:client-v2:0.9.0'
integrationTestImplementation testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-load'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ data:
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: test_instance
fileName: test-instance.json
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgao I've got a file named test-instance.json in the secrets dir with creds etc in there and I want this to be injected for the tests.

supportsRefreshes: true
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import com.clickhouse.data.ClickHouseColumn
import com.clickhouse.data.ClickHouseDataType
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.load.client.AirbyteClient
import io.airbyte.cdk.load.CoreTableOperationsClient
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAMES
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DECIMAL_WITH_PRECISION_AND_SCALE
import io.airbyte.integrations.destination.clickhouse.config.ClickhouseFinalTableNameGenerator
Expand All @@ -46,7 +44,7 @@ class ClickhouseAirbyteClient(
private val nameGenerator: ClickhouseFinalTableNameGenerator,
private val tempTableNameGenerator: TempTableNameGenerator,
private val clickhouseConfiguration: ClickhouseConfiguration,
) : AirbyteClient, DirectLoadTableSqlOperations, DirectLoadTableNativeOperations {
) : CoreTableOperationsClient {

override suspend fun createNamespace(namespace: String) {
val statement = sqlGenerator.createNamespace(namespace)
Expand Down Expand Up @@ -349,4 +347,8 @@ class ClickhouseAirbyteClient(
this.name
}
}

override suspend fun ping() {
execute("SELECT 1")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.airbyte.integrations.destination.clickhouse.component

import io.airbyte.cdk.load.component.CoreTableOperationsSuite
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseAirbyteClient
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import org.junit.jupiter.api.Test

@MicronautTest
class ClickhouseTableOperationsTest: CoreTableOperationsSuite {
@Inject
override lateinit var client: ClickhouseAirbyteClient

@Test
override fun `connect to database`() {
super.`connect to database`()
}

@Test
override fun `create and drop tables`() {
super.`create and drop tables`()
}

override fun `create and drop namespaces`() {
super.`create and drop namespaces`()
}

override fun `count table rows`() {
super.`count table rows`()
}

override fun `overwrite tables`() {
super.`overwrite tables`()
}

override fun `copy tables`() {
super.`copy tables`()
}

override fun `upsert tables`() {
super.`upsert tables`()
}

override fun `get generation id`() {
super.`get generation id`()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.airbyte.integrations.destination.clickhouse.component.config

import io.airbyte.cdk.util.Jsons
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfigurationFactory
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseSpecificationOss
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
import java.nio.file.Files
import kotlin.io.path.Path

@Factory
class TestConfigBeanOverrides {
@Singleton
@Primary
fun config(): ClickhouseConfiguration {
val configPath = Path("secrets/test-instance.json")
val configStr = Files.readString(configPath)
val spec = Jsons.readValue(configStr, ClickhouseSpecificationOss::class.java)

return ClickhouseConfigurationFactory().makeWithoutExceptionHandling(spec)
}
}
Loading