Skip to content

Commit dc78a11

Browse files
committed
[FLINK-38839][runtime] Support semicolon delimiter for table-options
1 parent 097aa9a commit dc78a11

4 files changed

Lines changed: 79 additions & 6 deletions

File tree

docs/content.zh/docs/core-concept/transform.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ transform:
311311
table-options: comment=web order
312312
description: auto creating table options example
313313
```
314-
小技巧:table-options 的格式是 `key1=value1,key2=value2`。
314+
小技巧:table-options 的格式是 `key1=value1,key2=value2`;如果 value 中包含逗号,使用分号分隔:`key1=value1;key2=value2`
315315

316316
## Classification mapping
317317
多个转换规则可以定义为分类映射。
@@ -466,4 +466,4 @@ pipeline:
466466
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
467467
| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
468468
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
469-
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
469+
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |

docs/content/docs/core-concept/transform.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ transform:
315315
description: auto creating table options example
316316
```
317317
Tips: The format of table-options is `key1=value1,key2=value2`.
318+
If option values contain commas, use semicolon delimiter like `key1=value1;key2=value2`.
318319

319320
## Classification mapping
320321
Multiple transform rules can be defined to classify input data rows and apply different processing.
@@ -471,4 +472,4 @@ The following built-in models are provided:
471472
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
472473
| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
473474
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
474-
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
475+
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,17 @@ public SchemaMetadataTransform(
5858
partitionKeys = Arrays.asList(partitionKeyArr);
5959
}
6060
if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
61-
for (String tableOption : tableOptionString.split(",")) {
62-
String[] kv = tableOption.split("=");
61+
// Use semicolon as delimiter to support multi-value options like
62+
// sequence.field=jjsj,gxsj.
63+
// Keep comma delimiter for backward compatibility.
64+
String delimiter = tableOptionString.contains(";") ? ";" : ",";
65+
for (String tableOption : tableOptionString.split(delimiter)) {
66+
String[] kv = tableOption.split("=", 2);
6367
if (kv.length != 2) {
6468
throw new IllegalArgumentException(
6569
"table option format error: "
6670
+ tableOptionString
67-
+ ", it should be like `key1=value1,key2=value2`.");
71+
+ ", it should be like `key1=value1,key2=value2` or `key1=value1;key2=value2`.");
6872
}
6973
options.put(kv[0].trim(), kv[1].trim());
7074
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.operators.transform;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
/** Testcases for {@link SchemaMetadataTransform}. */
25+
class SchemaMetadataTransformTest {
26+
27+
@Test
28+
void testTableOptionsWithCommaDelimiter() {
29+
SchemaMetadataTransform transform =
30+
new SchemaMetadataTransform(null, null, "key1=value1,key2=value2");
31+
assertThat(transform.getOptions())
32+
.containsEntry("key1", "value1")
33+
.containsEntry("key2", "value2");
34+
}
35+
36+
@Test
37+
void testTableOptionsWithSemicolonDelimiter() {
38+
SchemaMetadataTransform transform =
39+
new SchemaMetadataTransform(null, null, "key1=value1;key2=value2");
40+
assertThat(transform.getOptions())
41+
.containsEntry("key1", "value1")
42+
.containsEntry("key2", "value2");
43+
}
44+
45+
@Test
46+
void testTableOptionsWithCommaInValue() {
47+
SchemaMetadataTransform transform =
48+
new SchemaMetadataTransform(
49+
null,
50+
null,
51+
"sequence.field=gxsj,jjsj;"
52+
+ "file-index.range-bitmap.columns=jjsj;"
53+
+ "file-index.bloom-filter.columns=jjdbh");
54+
assertThat(transform.getOptions())
55+
.containsEntry("sequence.field", "gxsj,jjsj")
56+
.containsEntry("file-index.range-bitmap.columns", "jjsj")
57+
.containsEntry("file-index.bloom-filter.columns", "jjdbh");
58+
}
59+
60+
@Test
61+
void testTableOptionsSplitByFirstEqualSign() {
62+
SchemaMetadataTransform transform =
63+
new SchemaMetadataTransform(null, null, "key1=value=1;key2=value2");
64+
assertThat(transform.getOptions())
65+
.containsEntry("key1", "value=1")
66+
.containsEntry("key2", "value2");
67+
}
68+
}

0 commit comments

Comments
 (0)