Migrate source mssql from old CDK to new CDK #63731
Note: Detected that there are differences in the Gradle dependencies.
Progressive rollout also needs:
releases:
  rolloutConfiguration:
    enableProgressiveRollout: true
) {
    recordData.set<JsonNode>(
        CommonMetaField.CDC_UPDATED_AT.id,
        CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp),
This should match the change in MsSqlServerDebeziumOperations:
CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder as JsonEncoder<Any>
That's when we decorate NativeRecordPayload, not ObjectNode, right?
ohhh 🤦 that's right. ignore
| 4.2.3 | 2025-07-01 | [62052](https://github.com/airbytehq/airbyte/pull/62052) | Revert change to CDK interface signature. |
| 4.2.2 | 2025-06-25 | [61729](https://github.com/airbytehq/airbyte/pull/61729) | Support the use of logical primary keys for CDC. |
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. Improve cutoff date handling |
| 4.3.0 | 2025-09-26 | [63731](https://github.com/airbytehq/airbyte/pull/63731) | Migrate source mssql from old CDK to new CDK |
🚫 [vale] reported by reviewdog 🐶
[Vale.Spelling] Did you really mean 'mssql'?
docs/integrations/sources/mssql.md
| 4.2.2 | 2025-06-25 | [61729](https://github.com/airbytehq/airbyte/pull/61729) | Support the use of logical primary keys for CDC. |
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. Improve cutoff date handling |
| 4.3.0 | 2025-09-26 | [63731](https://github.com/airbytehq/airbyte/pull/63731) | Migrate source mssql from old CDK to new CDK |
| 4.2.5 | 2025-08-13 | [64905](https://github.com/airbytehq/airbyte/pull/64905) | bumping up java cdk version for mssql |
🚫 [vale] reported by reviewdog 🐶
[Vale.Spelling] Did you really mean 'cdk'?
docs/integrations/sources/mssql.md
| 4.2.2 | 2025-06-25 | [61729](https://github.com/airbytehq/airbyte/pull/61729) | Support the use of logical primary keys for CDC. |
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. Improve cutoff date handling |
| 4.3.0 | 2025-09-26 | [63731](https://github.com/airbytehq/airbyte/pull/63731) | Migrate source mssql from old CDK to new CDK |
| 4.2.5 | 2025-08-13 | [64905](https://github.com/airbytehq/airbyte/pull/64905) | bumping up java cdk version for mssql |
🚫 [vale] reported by reviewdog 🐶
[Vale.Spelling] Did you really mean 'mssql'?
docs/integrations/sources/mssql.md
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. Improve cutoff date handling |
| 4.3.0 | 2025-09-26 | [63731](https://github.com/airbytehq/airbyte/pull/63731) | Migrate source mssql from old CDK to new CDK |
| 4.2.5 | 2025-08-13 | [64905](https://github.com/airbytehq/airbyte/pull/64905) | bumping up java cdk version for mssql |
| 4.2.4 | 2025-07-03 | [62491](https://github.com/airbytehq/airbyte/pull/62491) | Improve Debezium performance by configuring the poll interval parameter. |
🚫 [vale] reported by reviewdog 🐶
[Vale.Spelling] Did you really mean 'Debezium'?
Some questions and small requests. This is a huge piece of work! Thanks for handling it.
    @JsonProperty("pk_val") val pkVal: String? = null,
    @JsonProperty("pk_name") val pkName: String? = null,
    @JsonProperty("version") val version: Int? = null,
    @JsonProperty("state_type") val stateType: String? = null,
    @JsonProperty("incremental_state") val incrementalState: JsonNode? = null,
    @JsonProperty("stream_name") val streamName: String? = null,
    @JsonProperty("cursor_field") val cursorField: List<String>? = null,
    @JsonProperty("stream_namespace") val streamNamespace: String? = null,
Shouldn't the default naming strategy for Jackson convert these names to snake_case?
I looked this up and it's configurable in the ObjectMapper. We don't do that in Jsons, however. I imagine it's because we want to avoid doing any kind of renaming of fields or namespaces in the connection state. Given that, this way seems ideal.
    primaryKey: List<Field>,
    primaryKeyCheckpoint: List<JsonNode>,
): OpaqueStateValue {
    val primaryKeyField = primaryKey.first()
    return when (primaryKeyCheckpoint.first().isNull) {
        true -> Jsons.nullNode()
        false ->
            Jsons.valueToTree(
                MsSqlServerCdcInitialSnapshotStateValue(
                    pkName = primaryKeyField.id,
                    pkVal = primaryKeyCheckpoint.first().asText(),
Why are we only using the first PK field? Imagine the PK fields are year, month, day. This way, we can't checkpoint until we finish reading a whole year's data, right? Is the snapshot state value object defined for backwards compatibility with old states so we can't add the other fields/values?
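To make the concern concrete, here is a minimal sketch of what a checkpoint over every primary-key column could look like. It is an illustration only: `CompositeCheckpoint`, `resumeAfterFullKey`, and `resumeAfterFirstColumn` are hypothetical names, the keys are simplified to integers, and nothing here reflects how the connector actually resumes a snapshot.

```kotlin
// Hypothetical illustration only: none of these names exist in the connector.
// A checkpoint that records every primary-key column, compared lexicographically.
data class CompositeCheckpoint(val values: List<Int>) : Comparable<CompositeCheckpoint> {
    override fun compareTo(other: CompositeCheckpoint): Int {
        for ((mine, theirs) in values.zip(other.values)) {
            val c = mine.compareTo(theirs)
            if (c != 0) return c
        }
        // Equal on the shared prefix: the shorter key sorts first.
        return values.size.compareTo(other.values.size)
    }
}

// Rows strictly after the full checkpoint: nothing is skipped or re-read.
fun resumeAfterFullKey(rows: List<List<Int>>, checkpoint: List<Int>): List<List<Int>> =
    rows.filter { CompositeCheckpoint(it) > CompositeCheckpoint(checkpoint) }

// Rows whose first key column is strictly greater than the saved first column.
fun resumeAfterFirstColumn(rows: List<List<Int>>, firstColumn: Int): List<List<Int>> =
    rows.filter { it.first() > firstColumn }

fun main() {
    // Keys are (year, month, day), as in the example from the comment.
    val rows = listOf(
        listOf(2024, 1, 1),
        listOf(2024, 1, 2),
        listOf(2024, 2, 1),
        listOf(2025, 1, 1),
    )
    println(resumeAfterFullKey(rows, listOf(2024, 1, 2)))
    println(resumeAfterFirstColumn(rows, 2024))
}
```

With only the first column saved, resuming mid-year would drop the 2024-02-01 row, so a checkpoint would only be safe once the whole year has been read. Whether the state format can carry the extra columns is the backwards-compatibility question raised above.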
private const val MSSQL_DATE_TYPE = "DATE"
private const val MSSQL_DATETIME_TYPE = "DATETIME"
private const val MSSQL_DATETIME2_TYPE = "DATETIME2"
private const val MSSQL_SMALLDATETIME_TYPE = "SMALLDATETIME"
private const val MSSQL_DATETIMEOFFSET_TYPE = "DATETIMEOFFSET"
private const val MSSQL_TIME_TYPE = "TIME"
private const val MSSQL_SMALLMONEY_TYPE = "SMALLMONEY"
private const val MSSQL_MONEY_TYPE = "MONEY"
private const val MSSQL_BINARY_TYPE = "BINARY"
private const val MSSQL_VARBINARY_TYPE = "VARBINARY"
private const val MSSQL_IMAGE_TYPE = "IMAGE"
private const val MSSQL_GEOMETRY_TYPE = "GEOMETRY"
private const val MSSQL_GEOGRAPHY_TYPE = "GEOGRAPHY"
private const val MSSQL_UNIQUEIDENTIFIER_TYPE = "UNIQUEIDENTIFIER"
private const val MSSQL_XML_TYPE = "XML"
private const val MSSQL_HIERARCHYID_TYPE = "HIERARCHYID"
private const val MSSQL_SQL_VARIANT_TYPE = "SQL_VARIANT"
as far as I can tell these are each used once - maybe we can just inline them?
if (value == null) return null

return try {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
this format is also present in MsSqlServerCursorCutoffTimeProvider. Maybe we can centralize the definition?
if (value == null) return null

return try {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX")
This one looks slightly different than the one in MsSqlServerCursorCutoffTimeProvider. Should it be?
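One way to centralize the two patterns quoted in the hunks above is a single holder object that both call sites reference. This is only a sketch: the object name `MsSqlServerDateTimeFormats` and its property names are assumptions, not something defined in the PR. `DateTimeFormatter` instances are immutable and thread-safe, so sharing them is safe.

```kotlin
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hypothetical holder; the object and property names are assumptions, not part of the PR.
object MsSqlServerDateTimeFormats {
    // Microsecond precision, no offset (the pattern from the first hunk).
    val DATETIME: DateTimeFormatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")

    // Same precision plus an ISO offset such as +02:00 or Z (the pattern from the second hunk).
    val DATETIME_OFFSET: DateTimeFormatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX")
}

fun main() {
    val local = LocalDateTime.of(2025, 9, 26, 12, 30, 45, 123_456_000)
    println(MsSqlServerDateTimeFormats.DATETIME.format(local))
    println(MsSqlServerDateTimeFormats.DATETIME_OFFSET.format(local.atOffset(ZoneOffset.ofHours(2))))
}
```

Defining each pattern once also makes any difference from the one in MsSqlServerCursorCutoffTimeProvider visible in a single place, which helps answer whether the difference is intentional.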
is And -> conj.flatMap { it.bindings() }
is Or -> disj.flatMap { it.bindings() }
is WhereClauseLeafNode -> {
    val type = column.type as LosslessJdbcFieldType<*, *>
This is related to some of the type issues that Rodi and I have been looking at recently. It would be nice if we had compile-time enforcement of which field types were eligible for use as a cursor and which weren't. Probably nothing we can do about it at the moment.
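For reference, compile-time enforcement of this kind could in principle be expressed with a marker interface and a generic bound, so the cast in the hunk above would become unnecessary. The sketch below is purely hypothetical: `FieldType`, `CursorEligible`, `Column`, and `cursorPredicate` are stand-in names and do not correspond to the CDK's actual types.

```kotlin
// All names below are hypothetical stand-ins, not the CDK's real types.
interface FieldType { val typeName: String }

// Marker interface: only types that implement it may be used as a cursor.
interface CursorEligible

object IntFieldType : FieldType, CursorEligible { override val typeName = "INT" }
object XmlFieldType : FieldType { override val typeName = "XML" }

data class Column<T : FieldType>(val id: String, val type: T)

// The double bound on T rejects non-cursor types at compile time, with no runtime cast.
fun <T> cursorPredicate(column: Column<T>, lowerBound: String): String
    where T : FieldType, T : CursorEligible =
    "${column.id} > $lowerBound"

fun main() {
    println(cursorPredicate(Column("id", IntFieldType), "42"))
    // cursorPredicate(Column("doc", XmlFieldType), "0") // does not compile: XmlFieldType is not CursorEligible
}
```

This only works when the column's type is known statically. Columns discovered at runtime would still need a checked conversion somewhere, which may be why nothing can be done about it right now.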
// For MSSQL, we would need to deserialize the state to get the LSN
// This is a placeholder implementation - actual implementation would extract LSN from state
Looks like this is what we're doing. Old comment?
        }
    }
} catch (e: Exception) {
    // Log error but don't fail the sync
looks like we're not logging anything ATM
        }
    }
} catch (e: Exception) {
    // Log error but don't fail the sync - keep the empty string value
same - no logging currently
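A minimal sketch of what both catch blocks could do so that the code matches its comments: log the exception, then fall back. The helper name `bestEffort` is hypothetical, and `java.util.logging` is used here only to keep the example self-contained; the connector's own logger would be the natural choice in the real code.

```kotlin
import java.util.logging.Level
import java.util.logging.Logger

// Stand-in logger; the connector's own logging facade would replace this.
val log: Logger = Logger.getLogger("MsSqlServerExample")

// Hypothetical helper: run a best-effort step, log any failure, and return a fallback
// so the sync itself does not fail.
fun <T> bestEffort(description: String, fallback: T, block: () -> T): T =
    try {
        block()
    } catch (e: Exception) {
        log.log(Level.WARNING, "Failed to $description; continuing with fallback value", e)
        fallback
    }

fun main() {
    // The failing block is swallowed, logged, and replaced by the empty-string fallback.
    val value: String = bestEffort("read optional value", "") { error("boom") }
    println(value.isEmpty())
}
```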
- pattern: (?i).*timeout expired.*
  input-example: >-
    com.microsoft.sqlserver.jdbc.SQLServerException: The query has timed out.
I imagine this was copied from the old connector, but it looks like the input example doesn't contain the pattern.
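The mismatch is quick to confirm with the pattern and input example exactly as quoted above. The widened pattern at the end is only a hypothetical suggestion, not something taken from the PR or the old connector.

```kotlin
// Pattern and input example copied verbatim from the hunk above.
val timeoutPattern = Regex("(?i).*timeout expired.*")
val inputExample = "com.microsoft.sqlserver.jdbc.SQLServerException: The query has timed out."

// Hypothetical alternative that covers both phrasings; an assumption, not part of the PR.
val widenedPattern = Regex("(?i).*(timeout expired|query has timed out).*")

fun main() {
    // Neither a full match nor a substring search finds the current pattern in the example.
    println(timeoutPattern.matches(inputExample))
    println(timeoutPattern.containsMatchIn(inputExample))
    // The widened pattern matches the example.
    println(widenedPattern.matches(inputExample))
}
```

The other option is to keep the pattern and replace the input example with a message that actually contains "timeout expired".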