Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ dependencies {
implementation("com.google.guava:guava:33.3.0-jre")

testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))
testFixturesApi project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db')

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testFixturesImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testImplementation("io.mockk:mockk:1.13.12")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.component

import io.airbyte.cdk.load.CoreTableOperationsClient
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import java.util.UUID
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.assertDoesNotThrow

interface CoreTableOperationsSuite {
val client: CoreTableOperationsClient

fun `connect to database`() = runTest {
assertDoesNotThrow { client.ping() }
}

fun `create and drop namespaces`() = runTest {
val testNamespace = "namespace-test-${UUID.randomUUID()}"

require(!client.namespaceExists(testNamespace)) { "test namespace: $testNamespace already exists. Please validate it's deleted before running again." }

client.createNamespace(testNamespace)

assert(client.namespaceExists(testNamespace))

client.dropNamespace(testNamespace)

assert(!client.namespaceExists(testNamespace))
}

fun `create and drop tables`() = runTest {
val uniquePostFix = UUID.randomUUID()
val testTable = TableName(
"table-test-namespace-$uniquePostFix",
"table-test-table-$uniquePostFix",
)

require(!client.tableExists(testTable)) { "test table: ${testTable.namespace}.${testTable.name} already exists. Please validate it's deleted before running again." }

client.createTable(
stream = DestinationStream(
unmappedNamespace = testTable.namespace,
unmappedName = testTable.name,
importType = Append,
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
includeFiles = false,
schema = ObjectType(linkedMapOf()),
namespaceMapper = NamespaceMapper(),
),
tableName = testTable,
columnNameMapping = ColumnNameMapping(mapOf()),
replace = false,
)

assert(client.tableExists(testTable))

client.dropTable(testTable)

assert(!client.tableExists(testTable))
}

fun `count table rows`() {}

fun `overwrite tables`() {}

fun `copy tables`() {}

fun `upsert tables`() {}

fun `get generation id`() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.airbyte.cdk.load

import io.airbyte.cdk.load.client.AirbyteClient
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations

interface CoreTableOperationsClient :
AirbyteClient,
DirectLoadTableSqlOperations,
DirectLoadTableNativeOperations {

suspend fun ping() = Unit

suspend fun namespaceExists(namespace: String) = false

suspend fun dropNamespace(namespace: String) = Unit

suspend fun tableExists(table: TableName) = false

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ application {

dependencies {
implementation 'com.clickhouse:client-v2:0.9.0'
implementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-db')

testImplementation("io.mockk:mockk:1.14.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
Expand All @@ -42,4 +43,5 @@ dependencies {
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestImplementation 'org.testcontainers:clickhouse:1.21.1'
integrationTestImplementation 'com.clickhouse:client-v2:0.9.0'
integrationTestImplementation testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-load'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ data:
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: test_instance
fileName: test-instance.json
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgao I've got a file named test-instance.json in the secrets dir with creds etc in there and I want this to be injected for the tests.

supportsRefreshes: true
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import com.clickhouse.data.ClickHouseColumn
import com.clickhouse.data.ClickHouseDataType
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.load.client.AirbyteClient
import io.airbyte.cdk.load.CoreTableOperationsClient
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAMES
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DECIMAL_WITH_PRECISION_AND_SCALE
import io.airbyte.integrations.destination.clickhouse.config.ClickhouseFinalTableNameGenerator
Expand All @@ -46,7 +44,7 @@ class ClickhouseAirbyteClient(
private val nameGenerator: ClickhouseFinalTableNameGenerator,
private val tempTableNameGenerator: TempTableNameGenerator,
private val clickhouseConfiguration: ClickhouseConfiguration,
) : AirbyteClient, DirectLoadTableSqlOperations, DirectLoadTableNativeOperations {
) : CoreTableOperationsClient {

override suspend fun createNamespace(namespace: String) {
val statement = sqlGenerator.createNamespace(namespace)
Expand Down Expand Up @@ -349,4 +347,8 @@ class ClickhouseAirbyteClient(
this.name
}
}

override suspend fun ping() {
execute("SELECT 1")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.airbyte.integrations.destination.clickhouse.component

import io.airbyte.cdk.load.component.CoreTableOperationsSuite
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseAirbyteClient
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import org.junit.jupiter.api.Test

@MicronautTest
class ClickhouseTableOperationsTest: CoreTableOperationsSuite {
@Inject
override lateinit var client: ClickhouseAirbyteClient

@Test
override fun `connect to database`() {
super.`connect to database`()
}

override fun `create and drop namespaces`() {
super.`create and drop namespaces`()
}

override fun `create and drop tables`() {
super.`create and drop tables`()
}

override fun `count table rows`() {
super.`count table rows`()
}

override fun `overwrite tables`() {
super.`overwrite tables`()
}

override fun `copy tables`() {
super.`copy tables`()
}

override fun `upsert tables`() {
super.`upsert tables`()
}

override fun `get generation id`() {
super.`get generation id`()
}
}
Loading