Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.50.9
dockerImageTag: 3.50.10
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
TimeHandler,
TimestampHandler
)

/**
* Handles zero-dates (e.g., '0000-00-00 00:00:00') which MySQL allows in NON-NULLABLE columns
* but are invalid dates that JDBC drivers return as NULL. We are now converting those NULL
* values to epoch (1970-01-01...) to match default value behavior and satisfy Debezium's
* non-nullable schema constraints.
*/
data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
Expand All @@ -48,7 +53,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is your input here a null?
In that case in the old connector we fixed this by setting another property

props.setProperty("value.converter.replace.null.with.default", "false")

This was a bug in debezium which goes into kafka.

But setting this property you should be able to return a NULL and then let debezium place the default value.
See in the old CDK: DebeziumPropertiesManager

val epoch = LocalDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand Down Expand Up @@ -80,7 +92,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = LocalDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand Down Expand Up @@ -112,7 +131,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = LocalDate.ofEpochDay(0)
Converted(epoch.format(LocalDateCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDate) {
Converted(it.format(LocalDateCodec.formatter))
Expand All @@ -131,7 +157,11 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
}
)
}

/**
* TIME supports '00:00:00' and it is considered valid, see
* https://dev.mysql.com/doc/refman/8.0/en/time.html. If we get a null value from the server, it
* means the TIME is invalid/corrupt and in that case we should return null.
*/
data object TimeHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
Expand Down Expand Up @@ -164,14 +194,22 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
}

data object TimestampHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIMESTAMP", ignoreCase = true)

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = OffsetDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is ZonedDateTime) {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Expand Down
Loading
Loading