Skip to content

Commit 1379903

Browse files
Merge branch 'master' into daryna/source-mixpanel/set-up-concurrency
2 parents b6ef781 + c803a45 commit 1379903

File tree

135 files changed

+23951
-4371
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+23951
-4371
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.message
6+
7+
import io.airbyte.cdk.load.state.ReservationManager
8+
import kotlinx.coroutines.channels.Channel
9+
import kotlinx.coroutines.flow.Flow
10+
import kotlinx.coroutines.runBlocking
11+
import org.jetbrains.annotations.VisibleForTesting
12+
13+
class ResourceReservingPartitionedQueue<T>(
14+
val reservationManager: ReservationManager,
15+
val ratioOfTotalMemoryToReserve: Double,
16+
val numConsumers: Int,
17+
val numProducers: Int,
18+
val expectedResourceUsagePerUnit: Long
19+
) : PartitionedQueue<T> {
20+
21+
private val requestedResourceAmount =
22+
(ratioOfTotalMemoryToReserve * reservationManager.totalCapacityBytes).toLong()
23+
private val reservation = runBlocking {
24+
reservationManager.reserveOrThrow(requestedResourceAmount, this)
25+
}
26+
private val minNumUnits: Int = numProducers + numConsumers * 2
27+
private val maxMessageSize = reservation.bytesReserved / minNumUnits
28+
29+
val clampedMessageSize = expectedResourceUsagePerUnit.coerceAtMost(maxMessageSize)
30+
private val maxNumUnits = (reservation.bytesReserved / clampedMessageSize).toInt()
31+
32+
private val totalQueueCapacity: Int = (maxNumUnits - (numProducers + numConsumers))
33+
34+
// Our earlier calculations should ensure this is always at least 1, but
35+
// we'll clamp it to be safe.
36+
@VisibleForTesting
37+
val queuePartitionCapacity: Int = (totalQueueCapacity / numConsumers).coerceAtLeast(1)
38+
39+
private val underlying =
40+
StrictPartitionedQueue<T>(
41+
(0 until numConsumers)
42+
.map { ChannelMessageQueue<T>(Channel(queuePartitionCapacity)) }
43+
.toTypedArray()
44+
)
45+
46+
override val partitions: Int = numConsumers
47+
48+
override fun consume(partition: Int): Flow<T> = underlying.consume(partition)
49+
50+
override suspend fun close() {
51+
underlying.close()
52+
reservation.release()
53+
}
54+
55+
override suspend fun broadcast(value: T) = underlying.broadcast(value)
56+
57+
override suspend fun publish(value: T, partition: Int) = underlying.publish(value, partition)
58+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.pipeline
6+
7+
import io.airbyte.cdk.load.command.Dedupe
8+
import io.airbyte.cdk.load.message.DestinationRecordRaw
9+
import kotlin.random.Random
10+
11+
class ByPrimaryKeyInputPartitioner : InputPartitioner {
12+
private val random = Random(System.currentTimeMillis())
13+
14+
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
15+
if (numParts == 1) {
16+
return 0
17+
}
18+
19+
if (record.stream.importType !is Dedupe) {
20+
return random.nextInt(numParts)
21+
}
22+
23+
val primaryKey = (record.stream.importType).primaryKey
24+
val jsonData = record.asRawJson()
25+
26+
val primaryKeyValues =
27+
primaryKey.map { keys ->
28+
keys.map { key -> if (jsonData.has(key)) jsonData.get(key) else null }
29+
}
30+
return Math.floorMod(primaryKeyValues.hashCode(), numParts)
31+
}
32+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/OutputPartitioner.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ import io.airbyte.cdk.load.message.WithStream
1313
*/
1414
interface OutputPartitioner<K1 : WithStream, T, K2 : WithStream, U> {
1515
fun getOutputKey(inputKey: K1, output: U): K2
16-
fun getPart(outputKey: K2, numParts: Int): Int
16+
fun getPart(outputKey: K2, inputPart: Int, numParts: Int): Int
1717
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.pipeline
6+
7+
import io.airbyte.cdk.load.message.DestinationRecordRaw
8+
import kotlin.random.Random
9+
10+
class RandomInputPartitioner : InputPartitioner {
11+
private val prng = Random(System.currentTimeMillis())
12+
13+
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
14+
return prng.nextInt(numParts)
15+
}
16+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/spec/DestinationSpecificationInternal.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
package io.airbyte.cdk.load.spec
66

7+
import com.fasterxml.jackson.databind.node.ObjectNode
78
import io.airbyte.cdk.spec.IdentitySpecificationExtender
89
import io.airbyte.cdk.spec.SpecificationExtender
10+
import io.airbyte.cdk.util.Jsons
911
import io.airbyte.protocol.models.v0.ConnectorSpecification
1012
import io.airbyte.protocol.models.v0.DestinationSyncMode
1113
import io.micronaut.context.annotation.Replaces
@@ -18,13 +20,47 @@ import jakarta.inject.Singleton
1820
class DestinationSpecificationExtender(private val spec: DestinationSpecificationExtension) :
1921
SpecificationExtender {
2022
override fun invoke(specification: ConnectorSpecification): ConnectorSpecification {
23+
if (spec.groups.isNotEmpty()) {
24+
val schema = specification.connectionSpecification as ObjectNode
25+
schema.set<ObjectNode>(
26+
"groups",
27+
Jsons.arrayNode().apply {
28+
spec.groups.forEach { group ->
29+
add(
30+
Jsons.objectNode().apply {
31+
put("id", group.id)
32+
put("title", group.title)
33+
}
34+
)
35+
}
36+
}
37+
)
38+
}
39+
2140
return specification
2241
.withSupportedDestinationSyncModes(spec.supportedSyncModes)
2342
.withSupportsIncremental(spec.supportsIncremental)
2443
}
2544
}
2645

2746
interface DestinationSpecificationExtension {
47+
/**
48+
* A connector's spec can specify "groups", which the UI will use to put related spec options in
49+
* the same place. To do this, you should:
50+
* * add a [Group] to the [groups] list (e.g. `Group(id = "foo", title = "Foo")`
51+
* * inject `{"group": "foo"}` to the generated JSONSchema for the relevant spec options
52+
* (`@JsonSchemaInject(json = """{"group": "foo"}""") val theOption: String`
53+
* * note that this should be the id of the group, not the title.
54+
*/
55+
data class Group(
56+
/** A computer-friendly ID for the group */
57+
val id: String,
58+
/** A human-readable name for the group */
59+
val title: String
60+
)
61+
2862
val supportedSyncModes: List<DestinationSyncMode>
2963
val supportsIncremental: Boolean
64+
val groups: List<Group>
65+
get() = emptyList()
3066
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
263263
outputQueue?.let {
264264
val outputKey = outputPartitioner!!.getOutputKey(inputKey, output)
265265
val message = PipelineMessage(checkpointCounts.toMap(), outputKey, output)
266-
val outputPart = outputPartitioner.getPart(outputKey, it.partitions)
266+
val outputPart = outputPartitioner.getPart(outputKey, part, it.partitions)
267267
it.publish(message, outputPart)
268268
}
269269

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.message
6+
7+
import io.airbyte.cdk.load.state.ReservationManager
8+
import io.airbyte.cdk.load.state.Reserved
9+
import io.mockk.coEvery
10+
import io.mockk.coVerify
11+
import io.mockk.mockk
12+
import kotlinx.coroutines.test.runTest
13+
import org.junit.jupiter.api.Assertions
14+
import org.junit.jupiter.api.Test
15+
16+
class ResourceReservingPartitionedQueueTest {
17+
18+
@Test
19+
fun `part queue respects memory available`() = runTest {
20+
val reservationManager = mockk<ReservationManager>(relaxed = true)
21+
coEvery { reservationManager.totalCapacityBytes } returns 1000
22+
val reservation = mockk<Reserved<ResourceReservingPartitionedQueue<Unit>>>(relaxed = true)
23+
coEvery { reservation.bytesReserved } returns 500
24+
coEvery {
25+
reservationManager.reserveOrThrow<ResourceReservingPartitionedQueue<Unit>>(any(), any())
26+
} returns reservation
27+
ResourceReservingPartitionedQueue<Unit>(
28+
reservationManager,
29+
0.5,
30+
1, // not relevant
31+
1, // not relevant
32+
1, // not relevant
33+
)
34+
coVerify {
35+
reservationManager.reserveOrThrow<ResourceReservingPartitionedQueue<Unit>>(500, any())
36+
}
37+
}
38+
39+
@Test
40+
fun `part queue clamps part size if too many workers`() {
41+
val reservationManager = ReservationManager(1000)
42+
val queue =
43+
ResourceReservingPartitionedQueue<Unit>(
44+
reservationManager,
45+
0.8,
46+
3,
47+
5,
48+
100,
49+
)
50+
val clampedSize = queue.clampedMessageSize
51+
Assertions.assertEquals(800 / 11, clampedSize)
52+
}
53+
54+
@Test
55+
fun `part queue does not clamp part size if not too many workers`() {
56+
val reservationManager = ReservationManager(1000)
57+
val queue =
58+
ResourceReservingPartitionedQueue<Unit>(
59+
reservationManager,
60+
0.8,
61+
1,
62+
5,
63+
100,
64+
)
65+
val clampedSize = queue.clampedMessageSize
66+
Assertions.assertEquals(100, clampedSize)
67+
}
68+
69+
@Test
70+
fun `queue capacity is derived from clamped size and available memory`() {
71+
val reservationManager = ReservationManager(1000)
72+
val queue =
73+
ResourceReservingPartitionedQueue<Unit>(
74+
reservationManager,
75+
0.75,
76+
1,
77+
3,
78+
100,
79+
)
80+
Assertions.assertEquals(100L, queue.clampedMessageSize)
81+
Assertions.assertEquals(3, queue.queuePartitionCapacity)
82+
}
83+
}

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,11 @@ class LoadPipelineStepTaskUTest {
429429
return TestKey(output, inputKey.stream)
430430
}
431431

432-
override fun getPart(outputKey: TestKey, numParts: Int): Int {
432+
override fun getPart(
433+
outputKey: TestKey,
434+
inputPart: Int,
435+
numParts: Int
436+
): Int {
433437
if (outputKey.output) return 1
434438
return 0
435439
}

airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadCompletedUploadPartitioner.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class BulkLoadCompletedUploadPartitioner<T : RemoteObject<*>> :
2626
return StreamKey(inputKey.stream)
2727
}
2828

29-
override fun getPart(outputKey: StreamKey, numParts: Int): Int {
29+
override fun getPart(outputKey: StreamKey, inputPart: Int, numParts: Int): Int {
3030
return Math.floorMod(outputKey.stream.hashCode(), numParts)
3131
}
3232
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.pipeline.db
6+
7+
import io.airbyte.cdk.load.pipeline.ByPrimaryKeyInputPartitioner
8+
import io.airbyte.cdk.load.pipeline.ByStreamInputPartitioner
9+
import io.airbyte.cdk.load.pipeline.InputPartitioner
10+
import io.airbyte.cdk.load.pipeline.RandomInputPartitioner
11+
import io.airbyte.cdk.load.write.db.InsertLoader
12+
import io.micronaut.context.annotation.Factory
13+
import io.micronaut.context.annotation.Requires
14+
import jakarta.inject.Singleton
15+
16+
@Factory
17+
class InsertLoaderInputPartitionerFactory {
18+
@Singleton
19+
@Requires(bean = InsertLoader::class)
20+
fun insertLoaderInputPartitioner(insertLoader: InsertLoader<*>): InputPartitioner {
21+
return when (insertLoader.partitioningStrategy) {
22+
InsertLoader.PartitioningStrategy.ByStream -> ByStreamInputPartitioner()
23+
InsertLoader.PartitioningStrategy.ByPrimaryKey -> ByPrimaryKeyInputPartitioner()
24+
InsertLoader.PartitioningStrategy.Random -> RandomInputPartitioner()
25+
}
26+
}
27+
}

0 commit comments

Comments
 (0)