Commit 5847562

[FLINK-39252][sqlserver] Refine timestamp and type handling
- Preserve timestamp startup instants when binding with server time zone
- Prefer SQL Server type names for precise pipeline type mapping
- Use shared regex splitters for table pattern validation
- Expand SQL Server connector docs and type-mapping coverage
1 parent 4367173 commit 5847562

8 files changed

Lines changed: 327 additions & 61 deletions

docs/content.zh/docs/connectors/pipeline-connectors/sqlserver.md

Lines changed: 67 additions & 0 deletions
@@ -35,6 +35,31 @@ SQL Server CDC Pipeline 连接器支持从 SQL Server 数据库读取快照数
 - 配置的 SQL Server 用户可以连接服务器并读取被捕获的表。
 - 当前连接器的 `tables` 配置只支持一个字面量数据库名;schema 和 table 部分可以使用正则表达式。
 
+为数据库和表启用 CDC:
+
+```sql
+USE MyDB;
+GO
+EXEC sys.sp_cdc_enable_db;
+GO
+
+EXEC sys.sp_cdc_enable_table
+    @source_schema = N'dbo',
+    @source_name = N'MyTable',
+    @role_name = NULL,
+    @supports_net_changes = 0;
+GO
+```
+
+检查表是否已启用 CDC:
+
+```sql
+USE MyDB;
+GO
+EXEC sys.sp_cdc_help_change_data_capture;
+GO
+```
+
 ## 示例
 
 从 SQL Server 读取数据同步到 Doris 的 Pipeline 可以定义如下:
@@ -237,6 +262,42 @@ pipeline:
 </table>
 </div>
 
+## 可用 Metadata
+
+配置 `metadata.list` 后,以下 metadata 可以传递到下游。
+
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Key</th>
+      <th class="text-left" style="width: 30%">DataType</th>
+      <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>包含该行的数据库名称。</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>包含该行的 schema 名称。</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>包含该行的表名。</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>该变更在数据库中发生的时间。对于快照记录,该值始终为 0。</td>
+    </tr>
+  </tbody>
+</table>
+
 ## 启动读取位置
 
 配置项 `scan.startup.mode` 指定 SQL Server CDC 消费者的启动模式。有效值包括:
@@ -246,4 +307,10 @@ pipeline:
 - `snapshot`:只读取快照。
 - `timestamp`:从 `scan.startup.timestamp-millis` 指定的时间戳开始读取。
 
+## 限制
+
+### 单数据库
+
+`tables` 中的所有条目必须属于同一个字面量数据库。schema 和 table 名称支持正则表达式,但 database 部分不支持正则表达式。
+
 {{< top >}}

docs/content/docs/connectors/pipeline-connectors/sqlserver.md

Lines changed: 67 additions & 0 deletions
@@ -36,6 +36,31 @@ This document describes how to set up the SQL Server connector.
 - The configured SQL Server user can connect to the server and read the captured tables.
 - The current connector scope supports one literal database name in `tables`; schema and table components may use regular expressions.
 
+Enable CDC on a database and table:
+
+```sql
+USE MyDB;
+GO
+EXEC sys.sp_cdc_enable_db;
+GO
+
+EXEC sys.sp_cdc_enable_table
+    @source_schema = N'dbo',
+    @source_name = N'MyTable',
+    @role_name = NULL,
+    @supports_net_changes = 0;
+GO
+```
+
+Verify that the table is enabled for CDC:
+
+```sql
+USE MyDB;
+GO
+EXEC sys.sp_cdc_help_change_data_capture;
+GO
+```
+
 ## Example
 
 An example of the pipeline for reading data from SQL Server and sink to Doris can be defined as follows:
@@ -238,6 +263,42 @@ pipeline:
 </table>
 </div>
 
+## Available Metadata
+
+The following metadata can be passed downstream when configured in `metadata.list`.
+
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Key</th>
+      <th class="text-left" style="width: 30%">DataType</th>
+      <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the database that contains the row.</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the schema that contains the row.</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the table that contains the row.</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>Time when the change was made in the database. For snapshot records, the value is always 0.</td>
+    </tr>
+  </tbody>
+</table>
+
 ## Startup Reading Position
 
 The config option `scan.startup.mode` specifies the startup mode for SQL Server CDC consumer. The valid values are:
@@ -247,4 +308,10 @@ The config option `scan.startup.mode` specifies the startup mode for SQL Server
 - `snapshot`: Reads the snapshot only.
 - `timestamp`: Starts from the specified timestamp in `scan.startup.timestamp-millis`.
 
+## Limitations
+
+### Single Database
+
+All entries in `tables` must belong to the same literal database. Regular expressions are supported for schema and table names, but not for the database segment.
+
 {{< top >}}
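
The `timestamp` startup mode reads its start point from `scan.startup.timestamp-millis`. The following is a minimal sketch, assuming the option expects milliseconds since the Unix epoch (as its name suggests), of how such a value could be derived from a wall-clock time in a chosen zone. The class and method names are hypothetical and are not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical helper for illustration only; not part of the connector. */
final class StartupTimestampDemo {

    /** Interprets a wall-clock time in the given zone and returns epoch milliseconds. */
    static long toEpochMillis(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime midnight = LocalDateTime.of(2024, 1, 1, 0, 0);

        // The same wall-clock time maps to different instants in different zones.
        System.out.println(toEpochMillis(midnight, ZoneOffset.UTC));        // 1704067200000
        System.out.println(toEpochMillis(midnight, ZoneOffset.ofHours(8))); // 1704038400000
    }
}
```

Because the resulting number identifies an instant rather than a wall-clock time, it stays the same regardless of which time zone the server or the job runs in.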

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactory.java

Lines changed: 3 additions & 2 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.source.DataSource;
+import org.apache.flink.cdc.common.utils.Predicates;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.connectors.base.options.SourceOptions;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
@@ -372,12 +373,12 @@ private String getValidateDatabaseName(String tables) {
             throw new IllegalArgumentException("Parameter tables cannot be null or empty");
         }
 
-        String[] tableNames = tables.split(",");
+        String[] tableNames = Predicates.RegExSplitterByComma.split(tables);
         String dbName = null;
 
         for (String tableName : tableNames) {
             String trimmedTableName = tableName.trim();
-            String[] tableNameParts = trimmedTableName.split("(?<!\\\\)\\.", -1);
+            String[] tableNameParts = Predicates.RegExSplitterByDot.split(trimmedTableName);
 
             checkState(
                     tableNameParts.length == 3,
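
The hunk above replaces a plain `split(",")` with shared splitters, so that a comma or dot escaped with a backslash inside a regular expression is no longer treated as a delimiter. The sketch below illustrates the idea with two locally defined patterns. These patterns are assumptions modeled on the removed `"(?<!\\\\)\\."` expression and are not necessarily what `Predicates.RegExSplitterByComma` and `Predicates.RegExSplitterByDot` contain; the class and its error messages are hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

/** Hypothetical demo class; not part of the connector. */
final class TablePatternSplitDemo {

    // Assumed patterns: split on a comma or dot that is NOT preceded by a backslash.
    static final Pattern UNESCAPED_COMMA = Pattern.compile("(?<!\\\\),");
    static final Pattern UNESCAPED_DOT = Pattern.compile("(?<!\\\\)\\.");

    /** Returns the single literal database name shared by every entry in {@code tables}. */
    static String validateDatabaseName(String tables) {
        if (tables == null || tables.trim().isEmpty()) {
            throw new IllegalArgumentException("Parameter tables cannot be null or empty");
        }
        String dbName = null;
        for (String entry : UNESCAPED_COMMA.split(tables)) {
            String trimmed = entry.trim();
            String[] parts = UNESCAPED_DOT.split(trimmed);
            if (parts.length != 3) {
                throw new IllegalStateException("Expected database.schema.table but got: " + trimmed);
            }
            if (dbName == null) {
                dbName = parts[0];
            } else if (!dbName.equals(parts[0])) {
                throw new IllegalStateException("All entries must use the same database: " + tables);
            }
        }
        return dbName;
    }

    public static void main(String[] args) {
        // The escaped dot in "order_\.*" stays inside the table part.
        System.out.println(validateDatabaseName("MyDB.dbo.orders, MyDB.dbo.order_\\.*"));

        // The escaped comma in "{1\,3}" does not start a new entry.
        System.out.println(
                Arrays.toString(UNESCAPED_COMMA.split("MyDB.dbo.t\\d{1\\,3},MyDB.dbo.users")));
    }
}
```

With a plain `split(",")`, the second input would be cut into three pieces and the first piece would fail the three-part check.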

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java

Lines changed: 2 additions & 0 deletions
@@ -61,6 +61,7 @@ public class SqlServerDataSourceOptions {
                     .withDescription(
                             "Table names of the SQL Server tables to monitor. Regular expressions are supported. "
                                     + "All table patterns must use the same literal database name; database-level regular expressions or cross-database patterns are not supported. "
+                                    + "Multiple entries are separated by a comma (,); escape a comma with a backslash when it is part of a regular expression. "
                                     + "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. "
                                     + "If there is a need to use a dot (.) in a regular expression to match any character, "
                                     + "it is necessary to escape the dot with a backslash."
@@ -202,6 +203,7 @@ public class SqlServerDataSourceOptions {
                     .withDescription(
                             "Table names of the SQL Server tables to Exclude. Regular expressions are supported. "
                                     + "All exclude patterns must use the same literal database name as the tables option; database-level regular expressions or cross-database patterns are not supported. "
+                                    + "Multiple entries are separated by a comma (,); escape a comma with a backslash when it is part of a regular expression. "
                                     + "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. "
                                     + "If there is a need to use a dot (.) in a regular expression to match any character, "
                                     + "it is necessary to escape the dot with a backslash."

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java

Lines changed: 37 additions & 47 deletions
@@ -70,11 +70,37 @@ private static DataType convertFromColumn(Column column) {
         int scale = column.scale().orElse(0);
         String typeName = column.typeName();
 
-        if (MONEY.equalsIgnoreCase(typeName)) {
-            return DataTypes.DECIMAL(MONEY_PRECISION, MONEY_SCALE);
-        }
-        if (SMALL_MONEY.equalsIgnoreCase(typeName)) {
-            return DataTypes.DECIMAL(SMALL_MONEY_PRECISION, MONEY_SCALE);
+        if (typeName != null) {
+            switch (typeName.toLowerCase()) {
+                case UNIQUE_IDENTIFIER:
+                case XML:
+                case SQL_VARIANT:
+                case HIERARCHY_ID:
+                case GEOMETRY:
+                case GEOGRAPHY:
+                    return DataTypes.STRING();
+                case MONEY:
+                    return DataTypes.DECIMAL(MONEY_PRECISION, MONEY_SCALE);
+                case SMALL_MONEY:
+                    return DataTypes.DECIMAL(SMALL_MONEY_PRECISION, MONEY_SCALE);
+                case DATETIME_OFFSET:
+                    return DataTypes.TIMESTAMP_LTZ(column.scale().orElse(7));
+                case DATETIME2:
+                    return DataTypes.TIMESTAMP(column.scale().orElse(7));
+                case DATETIME:
+                    return DataTypes.TIMESTAMP(3);
+                case SMALL_DATETIME:
+                    return DataTypes.TIMESTAMP(0);
+                case IMAGE:
+                case TIMESTAMP:
+                case ROW_VERSION:
+                    return DataTypes.BYTES();
+                case TEXT:
+                case N_TEXT:
+                    return DataTypes.STRING();
+                default:
+                    // Fall through to JDBC type handling.
+            }
         }
 
         switch (column.jdbcType()) {
@@ -104,7 +130,10 @@ private static DataType convertFromColumn(Column column) {
                 return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE);
             case Types.CHAR:
             case Types.NCHAR:
-                return DataTypes.CHAR(precision);
+                if (precision > 0) {
+                    return DataTypes.CHAR(precision);
+                }
+                return DataTypes.STRING();
             case Types.VARCHAR:
             case Types.NVARCHAR:
             case Types.LONGVARCHAR:
@@ -127,51 +156,12 @@ private static DataType convertFromColumn(Column column) {
             case Types.TIME_WITH_TIMEZONE:
                 return DataTypes.TIME(Math.max(scale, 0));
             case Types.TIMESTAMP:
-                return DataTypes.TIMESTAMP(scale > 0 ? scale : 6);
+                return DataTypes.TIMESTAMP(column.scale().orElse(6));
             case Types.TIMESTAMP_WITH_TIMEZONE:
-                return DataTypes.TIMESTAMP_LTZ(scale > 0 ? scale : 6);
+                return DataTypes.TIMESTAMP_LTZ(column.scale().orElse(6));
             case Types.STRUCT:
-                // SQL Server specific types like unique identifier, xml, etc.
-                if (UNIQUE_IDENTIFIER.equalsIgnoreCase(typeName)) {
-                    return DataTypes.STRING();
-                }
                 return DataTypes.STRING();
             default:
-                // For unknown types, try to handle them as STRING
-                String unknownTypeName = typeName;
-                if (unknownTypeName != null) {
-                    // Handle SQL Server specific types
-                    switch (unknownTypeName.toLowerCase()) {
-                        case UNIQUE_IDENTIFIER:
-                        case XML:
-                        case SQL_VARIANT:
-                        case HIERARCHY_ID:
-                        case GEOMETRY:
-                        case GEOGRAPHY:
-                            return DataTypes.STRING();
-                        case MONEY:
-                            return DataTypes.DECIMAL(MONEY_PRECISION, MONEY_SCALE);
-                        case SMALL_MONEY:
-                            return DataTypes.DECIMAL(SMALL_MONEY_PRECISION, MONEY_SCALE);
-                        case DATETIME_OFFSET:
-                            return DataTypes.TIMESTAMP_LTZ(scale > 0 ? scale : 7);
-                        case DATETIME2:
-                            return DataTypes.TIMESTAMP(scale > 0 ? scale : 7);
-                        case DATETIME:
-                            return DataTypes.TIMESTAMP(3);
-                        case SMALL_DATETIME:
-                            return DataTypes.TIMESTAMP(0);
-                        case IMAGE:
-                        case TIMESTAMP:
-                        case ROW_VERSION:
-                            return DataTypes.BYTES();
-                        case TEXT:
-                        case N_TEXT:
-                            return DataTypes.STRING();
-                        default:
-                            // Fall through to exception
-                    }
-                }
                 throw new UnsupportedOperationException(
                         String.format(
                                 "Doesn't support SQL Server type '%s', JDBC type '%d' yet.",
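
The change above moves the type-name switch ahead of the JDBC-type switch, so a SQL Server type name decides the mapping before the more generic JDBC type is consulted. The sketch below shows that ordering in isolation. It is a simplified illustration: it returns type names as strings instead of `DataType` objects, covers only a few cases, uses literal type names that are assumed to match the connector's constants, and hard-codes `TIMESTAMP(6)` where the real code consults the column scale. The class name is hypothetical.

```java
import java.sql.Types;
import java.util.Locale;

/** Hypothetical demo class; not part of the connector. */
final class TypeNameFirstDemo {

    static String toPipelineType(String typeName, int jdbcType, int precision) {
        // Step 1: prefer the SQL Server type name when it is known.
        if (typeName != null) {
            switch (typeName.toLowerCase(Locale.ROOT)) {
                case "uniqueidentifier":
                case "xml":
                    return "STRING";
                case "datetime":
                    return "TIMESTAMP(3)";
                case "smalldatetime":
                    return "TIMESTAMP(0)";
                default:
                    break; // unknown name: fall back to the JDBC type below
            }
        }
        // Step 2: generic JDBC type handling.
        switch (jdbcType) {
            case Types.CHAR:
            case Types.NCHAR:
                // A non-positive length cannot be expressed as CHAR(n).
                return precision > 0 ? "CHAR(" + precision + ")" : "STRING";
            case Types.TIMESTAMP:
                return "TIMESTAMP(6)"; // simplified; the real code consults the column scale
            default:
                throw new UnsupportedOperationException(
                        String.format(
                                "Doesn't support SQL Server type '%s', JDBC type '%d' yet.",
                                typeName, jdbcType));
        }
    }

    public static void main(String[] args) {
        // Both columns report JDBC TIMESTAMP, but the type name selects the precision.
        System.out.println(toPipelineType("smalldatetime", Types.TIMESTAMP, 0));
        System.out.println(toPipelineType("datetime", Types.TIMESTAMP, 0));
        // A CHAR column without a usable length falls back to STRING.
        System.out.println(toPipelineType("char", Types.CHAR, 0));
    }
}
```

Checking the name first matters when several SQL Server types share one JDBC type, because the JDBC type alone cannot distinguish them.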

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessorITCase.java

Lines changed: 3 additions & 3 deletions
@@ -167,14 +167,14 @@ public void testAccessTimeTypesSchema() {
                             DataTypes.TIME(0),
                             DataTypes.TIME(3),
                             DataTypes.TIME(6),
-                            DataTypes.TIMESTAMP(6),
+                            DataTypes.TIMESTAMP(0),
                             DataTypes.TIMESTAMP(3),
                             DataTypes.TIMESTAMP(6),
-                            DataTypes.TIMESTAMP_LTZ(7),
+                            DataTypes.TIMESTAMP_LTZ(0),
                             DataTypes.TIMESTAMP_LTZ(3),
                             DataTypes.TIMESTAMP_LTZ(6),
                             DataTypes.TIMESTAMP(3),
-                            DataTypes.TIMESTAMP(6)
+                            DataTypes.TIMESTAMP(0)
                         },
                         new String[] {
                             "id",
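
The updated expectations are consistent with the switch from `scale > 0 ? scale : default` to `column.scale().orElse(default)` in `SqlServerTypeUtils`: an explicitly declared scale of 0 is now kept instead of being replaced by the default. The sketch below isolates the two expressions. The class and method names are hypothetical, and `Optional<Integer>` is assumed to stand in for the return type of `column.scale()`.

```java
import java.util.Optional;

/** Hypothetical demo class; not part of the connector. */
final class ScaleDefaultDemo {

    /** Old expression: a scale of 0 is indistinguishable from a missing scale. */
    static int oldPrecision(Optional<Integer> declaredScale, int defaultScale) {
        int scale = declaredScale.orElse(0);
        return scale > 0 ? scale : defaultScale;
    }

    /** New expression: the default applies only when no scale is reported. */
    static int newPrecision(Optional<Integer> declaredScale, int defaultScale) {
        return declaredScale.orElse(defaultScale);
    }

    public static void main(String[] args) {
        Optional<Integer> zero = Optional.of(0);      // e.g. a column declared with scale 0
        Optional<Integer> missing = Optional.empty(); // driver reports no scale

        System.out.println(oldPrecision(zero, 7));    // 7
        System.out.println(newPrecision(zero, 7));    // 0
        System.out.println(oldPrecision(missing, 7)); // 7
        System.out.println(newPrecision(missing, 7)); // 7
    }
}
```

Only the explicit-zero case changes; columns that report no scale still receive the default.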
