@@ -336,18 +336,23 @@ data class ArrayEncoder<T>(
     override fun encode(decoded: List<T>): JsonNode =
         Jsons.arrayNode().apply {
             for (e in decoded) {
-                add(elementEncoder.encode(e))
+                // Note: in generics, T can be nullable!
+                if (e == null) add(NullCodec.encode(e)) else add(elementEncoder.encode(e))
             }
         }
 }

 data class ArrayDecoder<T>(
     val elementDecoder: JsonDecoder<T>,
-) : JsonDecoder<List<T>> {
-    override fun decode(encoded: JsonNode): List<T> {
+) : JsonDecoder<List<T?>> {
+    override fun decode(encoded: JsonNode): List<T?> {
         if (!encoded.isArray) {
             throw IllegalArgumentException("invalid array value $encoded")
         }
-        return encoded.elements().asSequence().map { elementDecoder.decode(it) }.toList()
+        return encoded
+            .elements()
+            .asSequence()
+            .map { if (it == null) null else elementDecoder.decode(it) }
+            .toList()
     }
 }
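
The hunk above makes null array elements explicit: ArrayEncoder now routes a null element through NullCodec instead of elementEncoder (whose T may be instantiated to a nullable type by generic callers), and ArrayDecoder advertises List<T?> so downstream code must handle null elements. A minimal usage sketch, assuming an IntCodec that implements both JsonEncoder<Int> and JsonDecoder<Int> like the CDK's other codecs; the nullable-element cast is illustrative, not part of this change:

    // Hedged sketch; IntCodec and the cast are assumptions, not part of this PR.
    @Suppress("UNCHECKED_CAST")
    val encoder = ArrayEncoder(IntCodec as JsonEncoder<Int?>)
    val node = encoder.encode(listOf(1, null, 3)) // null bypasses IntCodec: [1,null,3]

    // The decoded element type is now nullable even when no element is null:
    val decoded: List<Int?> = ArrayDecoder(IntCodec).decode(Jsons.arrayNode().add(1).add(2))
    check(decoded == listOf(1, 2))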

@@ -19,7 +19,8 @@ sealed interface DataOrMetaField {
 
 @Deprecated(
     message = "Use `DataOrMetaField` directly instead.",
-    replaceWith = ReplaceWith("DataOrMetaField"))
+    replaceWith = ReplaceWith("DataOrMetaField")
+)
 typealias FieldOrMetaField = DataOrMetaField
 /**
  * Root of our own type hierarchy for Airbyte record fields.
@@ -43,11 +44,12 @@ interface LosslessFieldType : FieldType {
     val jsonDecoder: JsonDecoder<*>
 }
 
-interface DataField: DataOrMetaField
+interface DataField : DataOrMetaField
 
 @Deprecated(
     message = "Use `EmittedField` directly instead.",
-    replaceWith = ReplaceWith("EmittedField"))
+    replaceWith = ReplaceWith("EmittedField")
+)
 typealias Field = EmittedField
 /**
  * Internal equivalent of [io.airbyte.protocol.models.Field] for values which come from the source
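
Both deprecations carry a ReplaceWith argument, so an IDE can rewrite call sites mechanically. A hedged sketch of a call site before and after migration (describe and its parameter are hypothetical; only the type names come from this diff):

    fun describe(field: DataOrMetaField): String = field.toString() // preferred name
    @Suppress("DEPRECATION")
    fun describeLegacy(field: FieldOrMetaField): String = describe(field) // deprecated alias, same type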

@@ -82,7 +82,7 @@ fun <T> JsonEncoder<T>.toProtobufEncoder(): ProtoEncoder<*> {
         is LocalTimeCodec, -> localTimeProtoEncoder
         is LocalDateTimeCodec, -> localDateTimeProtoEncoder
         is OffsetTimeCodec, -> offsetTimeProtoEncoder
-        is ArrayEncoder<*>, -> anyProtoEncoder
+        is ArrayEncoder<*>, -> arrayProtoEncoder
         else -> anyProtoEncoder
     }
 }
@@ -160,7 +162,9 @@ val floatProtoEncoder =
 
 val nullProtoEncoder = generateProtoEncoder<Any?> { builder, _ -> builder.setIsNull(true) }
 val anyProtoEncoder = textProtoEncoder
-// typealias AnyProtoEncoder = TextProtoEncoder
+
+// For now arrays are encoded in protobuf as json strings
+val arrayProtoEncoder = textProtoEncoder
 
 fun NativeRecordPayload.toProtobuf(
     recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
@@ -177,7 +179,12 @@ fun NativeRecordPayload.toProtobuf(
         entry.value.fieldValue?.let {
             (entry.value.jsonEncoder.toProtobufEncoder() as ProtoEncoder<Any>).encode(
                 valueBuilder.clear(),
-                entry.value.fieldValue!!
+                when (entry.value.jsonEncoder) {
+                    // For arrays we use the value of its json string.
+                    is ArrayEncoder<*> -> entry.value.encode().toString()
+                    else -> entry.value.fieldValue!!
+                }
+
             )
         }
             ?: nullProtoEncoder.encode(valueBuilder.clear(), null)
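
Taken together, these three hunks route array fields over protobuf as JSON text: toProtobufEncoder() now maps ArrayEncoder<*> to arrayProtoEncoder (an alias of textProtoEncoder), and toProtobuf substitutes the field's JSON-encoded string for the raw value. A hedged sketch of what the array path amounts to (IntCodec is an assumed JsonCodec<Int>; valueBuilder stands for the value builder used in toProtobuf above):

    val jsonArray = ArrayEncoder(IntCodec).encode(listOf(1, 2, 3)) // JsonNode [1,2,3]
    val wireValue = jsonArray.toString() // "[1,2,3]", carried as text
    // equivalent to: arrayProtoEncoder.encode(valueBuilder.clear(), wireValue)

Encoding arrays as JSON strings avoids modeling every element type in the protobuf schema, at the cost of a parse on the consuming side, as the "For now" comment suggests.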

@@ -2,8 +2,8 @@
 package io.airbyte.cdk.read
 
 import io.airbyte.cdk.StreamIdentifier
-import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.discover.DataOrMetaField
+import io.airbyte.cdk.discover.Field
 
 /**
  * [Feed] identifies part of the data consumed during a READ operation.

@@ -14,8 +14,8 @@ import io.airbyte.cdk.command.StreamInputState
 import io.airbyte.cdk.data.AirbyteSchemaType
 import io.airbyte.cdk.data.ArrayAirbyteSchemaType
 import io.airbyte.cdk.data.LeafAirbyteSchemaType
-import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.discover.DataOrMetaField
+import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.discover.MetaField
 import io.airbyte.cdk.discover.MetaFieldDecorator
 import io.airbyte.cdk.discover.MetadataQuerier

@@ -130,7 +130,7 @@ class JdbcMetadataQuerier(
                 }
             }
             val clause = if (constants.includePseudoColumns) " and pseudo-column(s)" else ""
-            log.info { "Discovered ${results.size} column(s)${clause}."}
+            log.info { "Discovered ${results.size} column(s)${clause}." }
         } catch (e: Exception) {
             throw RuntimeException("Column name discovery query failed: ${e.message}", e)
         }

@@ -5,7 +5,6 @@
 package io.airbyte.cdk.read
 
 import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.command.OpaqueStateValue
 import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.output.sockets.toJson
@@ -180,7 +179,8 @@ class DefaultJdbcSplittableSnapshotPartition(
     override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
         DefaultJdbcStreamStateValue.snapshotCheckpoint(
             primaryKey = checkpointColumns,
-            primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
+            primaryKeyCheckpoint =
+                checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
         )
 }
 
@@ -247,7 +247,8 @@ class DefaultJdbcSplittableSnapshotWithCursorPartition(
     override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
         DefaultJdbcStreamStateValue.snapshotWithCursorCheckpoint(
             primaryKey = checkpointColumns,
-            primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
+            primaryKeyCheckpoint =
+                checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
             cursor,
             cursorUpperBound,
         )

@@ -9,8 +9,8 @@ import io.airbyte.cdk.ConfigErrorException
 import io.airbyte.cdk.StreamIdentifier
 import io.airbyte.cdk.command.JdbcSourceConfiguration
 import io.airbyte.cdk.command.OpaqueStateValue
-import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.discover.DataOrMetaField
+import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.output.CatalogValidationFailureHandler
 import io.airbyte.cdk.output.InvalidCursor
 import io.airbyte.cdk.output.InvalidPrimaryKey

@@ -4,7 +4,6 @@
 
 package io.airbyte.cdk.read
 
-import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.command.OpaqueStateValue
 
 /**
@@ -46,7 +45,6 @@ interface JdbcSplittablePartition<S : JdbcStreamState<*>> : JdbcPartition<S> {
 
     /** State value to emit when the partition is read up to (and including) [lastRecord]. */
     fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue
-
 }
 
 /** A [JdbcPartition] which allows cursor-based incremental reads. */

@@ -1,16 +1,13 @@
 /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
 package io.airbyte.cdk.read
 
-import com.fasterxml.jackson.databind.node.ObjectNode
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.TransientErrorException
 import io.airbyte.cdk.command.OpaqueStateValue
-import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.output.DataChannelMedium.*
 import io.airbyte.cdk.output.OutputMessageRouter
 import io.airbyte.cdk.output.sockets.NativeRecordPayload
 import io.airbyte.cdk.output.sockets.toJson
-import io.airbyte.cdk.util.Jsons
 import io.airbyte.protocol.models.v0.AirbyteStateMessage
 import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import java.time.Duration
@@ -200,7 +197,7 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
             .use { result: SelectQuerier.Result ->
                 for (row in result) {
                     out(row)
-                // lastRecord.set(row.data.toJson(Jsons.objectNode()))
+                    // lastRecord.set(row.data.toJson(Jsons.objectNode()))
                     lastRecord.set(row)
                     // Check activity periodically to handle timeout.
                     if (numRecords.incrementAndGet() % fetchSize == 0L) {

@@ -151,8 +151,9 @@ class JdbcSequentialPartitionsCreator<
         }
         if (streamState.fetchSize == null) {
            if (sharedState.withSampling) {
-                val rowByteSizeSample: Sample<Long> =
-                    collectSample { sharedState.rowByteSizeEstimator().apply(it.data.toJson()) }
+                val rowByteSizeSample: Sample<Long> = collectSample {
+                    sharedState.rowByteSizeEstimator().apply(it.data.toJson())
+                }
                 val expectedTableByteSize: Long =
                     rowByteSizeSample.sampledValues.sum() * rowByteSizeSample.valueWeight
                 log.info { "Table memory size estimated at ${expectedTableByteSize shr 20} MiB." }
@@ -212,12 +213,14 @@ class JdbcConcurrentPartitionsCreator<
         return listOf(JdbcNonResumablePartitionReader(partition))
     }
     // Sample the table for partition split boundaries and for record byte sizes.
-    val sample: Sample<Pair<OpaqueStateValue?, Long>> = collectSample { record: SelectQuerier.ResultRow ->
-        val boundary: OpaqueStateValue? =
-            (partition as? JdbcSplittablePartition<*>)?.incompleteState(record)
-        val rowByteSize: Long = sharedState.rowByteSizeEstimator().apply(record.data.toJson())
-        boundary to rowByteSize
-    }
+    val sample: Sample<Pair<OpaqueStateValue?, Long>> =
+        collectSample { record: SelectQuerier.ResultRow ->
+            val boundary: OpaqueStateValue? =
+                (partition as? JdbcSplittablePartition<*>)?.incompleteState(record)
+            val rowByteSize: Long =
+                sharedState.rowByteSizeEstimator().apply(record.data.toJson())
+            boundary to rowByteSize
+        }
     if (sample.kind == Sample.Kind.EMPTY) {
         log.info { "Sampling query found that the table was empty." }
         return listOf(CheckpointOnlyPartitionReader())

@@ -142,11 +142,11 @@ class JdbcSelectQuerier(
             val jdbcFieldType: JdbcFieldType<*> = column.type as JdbcFieldType<*>
             try {
                 if (column is NonEmittedField) {
-                resultRow.nonEmittedData[column.id] =
-                    FieldValueEncoder(
-                        jdbcFieldType.jdbcGetter.get(rs!!, colIdx),
-                        jdbcFieldType.jsonEncoder as JsonEncoder<in Any?>,
-                    )
+                    resultRow.nonEmittedData[column.id] =
+                        FieldValueEncoder(
+                            jdbcFieldType.jdbcGetter.get(rs!!, colIdx),
+                            jdbcFieldType.jsonEncoder as JsonEncoder<in Any?>,
+                        )
                 } else {
                     @Suppress("UNCHECKED_CAST")
                     resultRow.data[column.id] =

@@ -26,10 +26,17 @@ class IntegrationTestOperations(
         return streams
     }
 
+    @Deprecated("Use the correctly named 'read' function")
     fun sync(
         catalog: ConfiguredAirbyteCatalog,
         state: List<AirbyteStateMessage> = listOf(),
         vararg featureFlags: FeatureFlag
+    ): BufferingOutputConsumer = read(catalog, state, *featureFlags)
+
+    fun read(
+        catalog: ConfiguredAirbyteCatalog,
+        state: List<AirbyteStateMessage> = listOf(),
+        vararg featureFlags: FeatureFlag
     ): BufferingOutputConsumer {
         return CliRunner.source("read", configSpec, catalog, state, *featureFlags).run()
     }
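
The old sync name survives as a thin deprecated wrapper, so existing tests keep compiling while new code calls read. A hedged usage sketch (runRead and runLegacy are hypothetical helpers; the method names and signatures come from this diff):

    fun runRead(ops: IntegrationTestOperations, catalog: ConfiguredAirbyteCatalog): BufferingOutputConsumer =
        ops.read(catalog) // preferred

    @Suppress("DEPRECATION")
    fun runLegacy(ops: IntegrationTestOperations, catalog: ConfiguredAirbyteCatalog): BufferingOutputConsumer =
        ops.sync(catalog) // delegates to read(), warns at compile time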

@@ -9,7 +9,6 @@ import io.airbyte.cdk.command.SourceConfiguration
 import io.airbyte.cdk.command.SourceConfigurationFactory
 import io.airbyte.cdk.discover.DiscoveredStream
 import io.airbyte.cdk.discover.EmittedField
-import io.airbyte.cdk.discover.Field
 import io.airbyte.cdk.discover.JdbcAirbyteStreamFactory
 import io.airbyte.cdk.jdbc.BigIntegerFieldType
 import io.airbyte.cdk.jdbc.LocalDateTimeFieldType