Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.50.9
dockerImageTag: 3.50.10
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
TimeHandler,
TimestampHandler
)

/**
* Handles zero-dates (e.g., '0000-00-00 00:00:00') which MySQL allows in NON-NULLABLE columns
* but are invalid dates that JDBC drivers return as NULL. We are now converting those NULL
* values to epoch (1970-01-01...) to match default value behavior and satisfy Debezium's
* non-nullable schema constraints.
*/
data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
Expand All @@ -48,7 +53,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is your input here a null?
In that case in the old connector we fixed this by setting another property

props.setProperty("value.converter.replace.null.with.default", "false")

This was a bug in debezium which goes into kafka.

But setting this property you should be able to return a NULL and then let debezium place the default value.
See in the old CDK: DebeziumPropertiesManager

val epoch = LocalDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand Down Expand Up @@ -80,7 +92,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = LocalDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand Down Expand Up @@ -112,7 +131,14 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = LocalDate.ofEpochDay(0)
Converted(epoch.format(LocalDateCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDate) {
Converted(it.format(LocalDateCodec.formatter))
Expand All @@ -131,7 +157,11 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
}
)
}

/**
* TIME supports '00:00:00' and it is considered valid, see
* https://dev.mysql.com/doc/refman/8.0/en/time.html. If we get a null value from the server, it
* means the TIME is invalid/corrupt and in that case we should return null.
*/
data object TimeHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
Expand Down Expand Up @@ -164,14 +194,22 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
}

data object TimestampHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIMESTAMP", ignoreCase = true)

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null) {
val epoch = OffsetDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC)
Converted(epoch.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
if (it is ZonedDateTime) {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Expand Down
Loading
Loading