Skip to content

Commit dc5e923

Browse files
authored
[FLINK-38160][starrocks] Add support for BINARY and VARBINARY types in StarRocks connector (#4303)
1 parent c003b93 commit dc5e923

7 files changed

Lines changed: 237 additions & 17 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,21 @@ pipeline:
337337
<td>CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks
338338
中为 n * 3。</td>
339339
</tr>
340+
<tr>
341+
<td>BINARY(n)</td>
342+
<td>VARBINARY(min(n,1048576))</td>
343+
<td>长度上限为 1048576。</td>
344+
</tr>
345+
<tr>
346+
<td>VARBINARY(n)</td>
347+
<td>VARBINARY(min(n,1048576))</td>
348+
<td>长度上限为 1048576。</td>
349+
</tr>
350+
<tr>
351+
<td>BYTES</td>
352+
<td>VARBINARY(1048576)</td>
353+
<td>BYTES 映射为最大长度为 1048576 的 VARBINARY。</td>
354+
</tr>
340355
</tbody>
341356
</table>
342357
</div>

docs/content/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,21 @@ pipeline:
348348
<td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
349349
character is equal to three bytes, so the length for StarRocks is n * 3.</td>
350350
</tr>
351+
<tr>
352+
<td>BINARY(n)</td>
353+
<td>VARBINARY(min(n,1048576))</td>
354+
<td>The length is capped to 1048576.</td>
355+
</tr>
356+
<tr>
357+
<td>VARBINARY(n)</td>
358+
<td>VARBINARY(min(n,1048576))</td>
359+
<td>The length is capped to 1048576.</td>
360+
</tr>
361+
<tr>
362+
<td>BYTES</td>
363+
<td>VARBINARY(1048576)</td>
364+
<td>BYTES is mapped to VARBINARY with max length 1048576.</td>
365+
</tr>
351366
</tbody>
352367
</table>
353368
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.cdc.common.schema.Column;
2323
import org.apache.flink.cdc.common.schema.Schema;
2424
import org.apache.flink.cdc.common.types.BigIntType;
25+
import org.apache.flink.cdc.common.types.BinaryType;
2526
import org.apache.flink.cdc.common.types.BooleanType;
2627
import org.apache.flink.cdc.common.types.CharType;
2728
import org.apache.flink.cdc.common.types.DataType;
@@ -36,6 +37,7 @@
3637
import org.apache.flink.cdc.common.types.TimeType;
3738
import org.apache.flink.cdc.common.types.TimestampType;
3839
import org.apache.flink.cdc.common.types.TinyIntType;
40+
import org.apache.flink.cdc.common.types.VarBinaryType;
3941
import org.apache.flink.cdc.common.types.VarCharType;
4042

4143
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -215,6 +217,10 @@ record ->
215217
fieldGetter =
216218
record -> record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
217219
break;
220+
case BINARY:
221+
case VARBINARY:
222+
fieldGetter = record -> record.getBinary(fieldPos);
223+
break;
218224
case TIME_WITHOUT_TIME_ZONE:
219225
fieldGetter =
220226
record ->
@@ -273,6 +279,7 @@ fieldPos, getPrecision(fieldType))
273279
public static final String STRING = "STRING";
274280
public static final String DATE = "DATE";
275281
public static final String DATETIME = "DATETIME";
282+
public static final String VARBINARY = "VARBINARY";
276283
public static final String JSON = "JSON";
277284

278285
/** Max size of char type of StarRocks. */
@@ -281,6 +288,9 @@ fieldPos, getPrecision(fieldType))
281288
/** Max size of varchar type of StarRocks. */
282289
public static final int MAX_VARCHAR_SIZE = 1048576;
283290

291+
/** Max size of varbinary type of StarRocks. */
292+
public static final int MAX_VARBINARY_SIZE = 1048576;
293+
284294
/** Transforms CDC {@link DataType} to StarRocks data type. */
285295
public static class CdcDataTypeTransformer
286296
extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {
@@ -406,6 +416,22 @@ public StarRocksColumn.Builder visit(VarCharType varCharType) {
406416
return builder;
407417
}
408418

419+
@Override
420+
public StarRocksColumn.Builder visit(BinaryType binaryType) {
421+
builder.setDataType(VARBINARY);
422+
builder.setNullable(binaryType.isNullable());
423+
builder.setColumnSize(Math.min(binaryType.getLength(), MAX_VARBINARY_SIZE));
424+
return builder;
425+
}
426+
427+
@Override
428+
public StarRocksColumn.Builder visit(VarBinaryType varBinaryType) {
429+
builder.setDataType(VARBINARY);
430+
builder.setNullable(varBinaryType.isNullable());
431+
builder.setColumnSize(Math.min(varBinaryType.getLength(), MAX_VARBINARY_SIZE));
432+
return builder;
433+
}
434+
409435
@Override
410436
public StarRocksColumn.Builder visit(DateType dateType) {
411437
builder.setDataType(DATE);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.flink.cdc.connectors.starrocks.sink;
1919

20+
import org.apache.flink.cdc.common.types.BinaryType;
2021
import org.apache.flink.cdc.common.types.CharType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
2123
import org.apache.flink.cdc.common.types.DecimalType;
24+
import org.apache.flink.cdc.common.types.VarBinaryType;
2225
import org.apache.flink.cdc.common.types.VarCharType;
2326

2427
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -146,4 +149,38 @@ void testVarCharType() {
146149
.hasValue(StarRocksUtils.MAX_VARCHAR_SIZE);
147150
Assertions.assertThat(largeLengthColumn.isNullable()).isTrue();
148151
}
152+
153+
@Test
154+
void testBinaryType() {
155+
StarRocksColumn.Builder builder =
156+
new StarRocksColumn.Builder().setColumnName("binary_col").setOrdinalPosition(0);
157+
new BinaryType(17).accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
158+
StarRocksColumn column = builder.build();
159+
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
160+
Assertions.assertThat(column.getColumnSize()).hasValue(17);
161+
Assertions.assertThat(column.isNullable()).isTrue();
162+
}
163+
164+
@Test
165+
void testVarBinaryType() {
166+
StarRocksColumn.Builder builder =
167+
new StarRocksColumn.Builder().setColumnName("varbinary_col").setOrdinalPosition(0);
168+
new VarBinaryType(255).accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
169+
StarRocksColumn column = builder.build();
170+
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
171+
Assertions.assertThat(column.getColumnSize()).hasValue(255);
172+
Assertions.assertThat(column.isNullable()).isTrue();
173+
}
174+
175+
@Test
176+
void testBytesType() {
177+
// BYTES is VarBinaryType with MAX_LENGTH, should be capped to MAX_VARBINARY_SIZE
178+
StarRocksColumn.Builder builder =
179+
new StarRocksColumn.Builder().setColumnName("bytes_col").setOrdinalPosition(0);
180+
DataTypes.BYTES().accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
181+
StarRocksColumn column = builder.build();
182+
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
183+
Assertions.assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
184+
Assertions.assertThat(column.isNullable()).isTrue();
185+
}
149186
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.flink.cdc.common.event.TableId;
3939
import org.apache.flink.cdc.common.schema.Column;
4040
import org.apache.flink.cdc.common.schema.Schema;
41+
import org.apache.flink.cdc.common.types.BinaryType;
4142
import org.apache.flink.cdc.common.types.BooleanType;
4243
import org.apache.flink.cdc.common.types.DataType;
4344
import org.apache.flink.cdc.common.types.DateType;
@@ -48,6 +49,7 @@
4849
import org.apache.flink.cdc.common.types.SmallIntType;
4950
import org.apache.flink.cdc.common.types.TimeType;
5051
import org.apache.flink.cdc.common.types.TimestampType;
52+
import org.apache.flink.cdc.common.types.VarBinaryType;
5153
import org.apache.flink.cdc.common.types.VarCharType;
5254
import org.apache.flink.cdc.common.utils.SchemaUtils;
5355
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -77,6 +79,7 @@
7779
import java.time.ZoneId;
7880
import java.time.ZoneOffset;
7981
import java.util.Arrays;
82+
import java.util.Base64;
8083
import java.util.HashMap;
8184
import java.util.Objects;
8285
import java.util.OptionalLong;
@@ -484,6 +487,77 @@ void testTimeTypeWithNullValues() throws Exception {
484487
tableId, "{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
485488
}
486489

490+
@Test
491+
void testBinaryTypeSerialization() throws Exception {
492+
TableId tableId = TableId.parse("test.binary_table");
493+
Schema schema =
494+
Schema.newBuilder()
495+
.physicalColumn("id", new IntType())
496+
.physicalColumn("bin_col", new BinaryType(10))
497+
.physicalColumn("varbin_col", new VarBinaryType(255))
498+
.primaryKey("id")
499+
.build();
500+
501+
CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
502+
Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
503+
504+
BinaryRecordDataGenerator generator =
505+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
506+
507+
byte[] binData = new byte[] {1, 2, 3, 4, 5};
508+
byte[] varBinData = new byte[] {0x0A, 0x0B, 0x0C};
509+
510+
DataChangeEvent insertEvent =
511+
DataChangeEvent.insertEvent(
512+
tableId, generator.generate(new Object[] {1, binData, varBinData}));
513+
514+
StarRocksRowData result = serializer.serialize(insertEvent);
515+
Assertions.assertThat(result).isNotNull();
516+
Assertions.assertThat(result.getDatabase()).isEqualTo(tableId.getSchemaName());
517+
Assertions.assertThat(result.getTable()).isEqualTo(tableId.getTableName());
518+
519+
// Binary data is serialized as Base64-encoded strings in JSON by Jackson
520+
String expectedBin = Base64.getEncoder().encodeToString(binData);
521+
String expectedVarBin = Base64.getEncoder().encodeToString(varBinData);
522+
String expectedJson =
523+
String.format(
524+
"{\"id\":1,\"bin_col\":\"%s\",\"varbin_col\":\"%s\",\"__op\":0}",
525+
expectedBin, expectedVarBin);
526+
527+
SortedMap<String, Object> expectMap =
528+
objectMapper.readValue(
529+
expectedJson, new TypeReference<TreeMap<String, Object>>() {});
530+
SortedMap<String, Object> actualMap =
531+
objectMapper.readValue(
532+
result.getRow(), new TypeReference<TreeMap<String, Object>>() {});
533+
Assertions.assertThat(actualMap).isEqualTo(expectMap);
534+
}
535+
536+
@Test
537+
void testBinaryTypeWithNullValues() throws Exception {
538+
TableId tableId = TableId.parse("test.binary_null_table");
539+
Schema schema =
540+
Schema.newBuilder()
541+
.physicalColumn("id", new IntType())
542+
.physicalColumn("nullable_bin", new VarBinaryType(100))
543+
.primaryKey("id")
544+
.build();
545+
546+
CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
547+
Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
548+
549+
BinaryRecordDataGenerator generator =
550+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
551+
552+
DataChangeEvent insertEvent =
553+
DataChangeEvent.insertEvent(tableId, generator.generate(new Object[] {1, null}));
554+
555+
StarRocksRowData result = serializer.serialize(insertEvent);
556+
Assertions.assertThat(result).isNotNull();
557+
558+
verifySerializeResult(tableId, "{\"id\":1,\"__op\":0}", result);
559+
}
560+
487561
private void verifySerializeResult(
488562
TableId expectTable, String expectRow, StarRocksRowData actualRowData)
489563
throws Exception {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,11 @@ void testStarRocksDataType() throws Exception {
197197
Schema schema =
198198
Schema.newBuilder()
199199
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
200-
// StarRocks sink doesn't support BINARY and BYTES type yet.
201-
// .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
202-
// .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
203-
// Binary"))
204-
// .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
200+
.column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
201+
.column(
202+
new PhysicalColumn(
203+
"varbinary", DataTypes.VARBINARY(17), "Var Binary"))
204+
.column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
205205
.column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
206206
.column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
207207
.column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
@@ -245,6 +245,9 @@ void testStarRocksDataType() throws Exception {
245245
List<String> expected =
246246
Arrays.asList(
247247
"id | int | NO | true | null",
248+
"binary | varbinary | YES | false | null",
249+
"varbinary | varbinary | YES | false | null",
250+
"bytes | varbinary | YES | false | null",
248251
"boolean | boolean | YES | false | null",
249252
"int | int | YES | false | null",
250253
"tinyint | tinyint | YES | false | null",

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.cdc.common.event.TableId;
2727
import org.apache.flink.cdc.common.schema.Schema;
2828
import org.apache.flink.cdc.common.types.BigIntType;
29+
import org.apache.flink.cdc.common.types.BinaryType;
2930
import org.apache.flink.cdc.common.types.BooleanType;
3031
import org.apache.flink.cdc.common.types.DataType;
3132
import org.apache.flink.cdc.common.types.DataTypes;
@@ -37,6 +38,7 @@
3738
import org.apache.flink.cdc.common.types.SmallIntType;
3839
import org.apache.flink.cdc.common.types.TimestampType;
3940
import org.apache.flink.cdc.common.types.TinyIntType;
41+
import org.apache.flink.cdc.common.types.VarBinaryType;
4042
import org.apache.flink.cdc.common.types.VarCharType;
4143
import org.apache.flink.cdc.common.types.ZonedTimestampType;
4244
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -300,13 +302,36 @@ void testToStarRocksDataTypeAllBasicTypes() {
300302
}
301303

302304
@Test
303-
void testToStarRocksDataTypeUnsupported() {
305+
void testToStarRocksDataTypeBinary() {
304306
StarRocksColumn.Builder builder =
305307
new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
306-
assertThatThrownBy(
307-
() -> StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder))
308-
.isInstanceOf(UnsupportedOperationException.class)
309-
.hasMessageContaining("Unsupported CDC data type");
308+
StarRocksUtils.toStarRocksDataType(new BinaryType(17), false, builder);
309+
StarRocksColumn column = builder.build();
310+
assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
311+
assertThat(column.getColumnSize()).hasValue(17);
312+
assertThat(column.isNullable()).isTrue();
313+
}
314+
315+
@Test
316+
void testToStarRocksDataTypeVarBinary() {
317+
StarRocksColumn.Builder builder =
318+
new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
319+
StarRocksUtils.toStarRocksDataType(new VarBinaryType(255), false, builder);
320+
StarRocksColumn column = builder.build();
321+
assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
322+
assertThat(column.getColumnSize()).hasValue(255);
323+
assertThat(column.isNullable()).isTrue();
324+
}
325+
326+
@Test
327+
void testToStarRocksDataTypeBytes() {
328+
StarRocksColumn.Builder builder =
329+
new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
330+
StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder);
331+
StarRocksColumn column = builder.build();
332+
assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
333+
assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
334+
assertThat(column.isNullable()).isTrue();
310335
}
311336

312337
private void assertStarRocksDataType(DataType cdcType, String expectedStarRocksType) {
@@ -418,12 +443,37 @@ void testCreateFieldGetterNullable() {
418443
}
419444

420445
@Test
421-
void testCreateFieldGetterUnsupportedType() {
422-
assertThatThrownBy(
423-
() ->
424-
StarRocksUtils.createFieldGetter(
425-
DataTypes.BYTES(), 0, ZoneId.of("UTC")))
426-
.isInstanceOf(UnsupportedOperationException.class)
427-
.hasMessageContaining("Don't support data type");
446+
void testCreateFieldGetterBinary() {
447+
RecordData.FieldGetter getter =
448+
StarRocksUtils.createFieldGetter(new BinaryType(10), 0, ZoneId.of("UTC"));
449+
450+
BinaryRecordDataGenerator generator =
451+
new BinaryRecordDataGenerator(new DataType[] {new BinaryType(10)});
452+
byte[] expected = new byte[] {1, 2, 3, 4, 5};
453+
BinaryRecordData record = generator.generate(new Object[] {expected});
454+
assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
455+
}
456+
457+
@Test
458+
void testCreateFieldGetterVarBinary() {
459+
RecordData.FieldGetter getter =
460+
StarRocksUtils.createFieldGetter(new VarBinaryType(255), 0, ZoneId.of("UTC"));
461+
462+
BinaryRecordDataGenerator generator =
463+
new BinaryRecordDataGenerator(new DataType[] {new VarBinaryType(255)});
464+
byte[] expected = new byte[] {0x0A, 0x0B, 0x0C};
465+
BinaryRecordData record = generator.generate(new Object[] {expected});
466+
assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
467+
}
468+
469+
@Test
470+
void testCreateFieldGetterBinaryNullable() {
471+
RecordData.FieldGetter getter =
472+
StarRocksUtils.createFieldGetter(DataTypes.BYTES(), 0, ZoneId.of("UTC"));
473+
474+
BinaryRecordDataGenerator generator =
475+
new BinaryRecordDataGenerator(new DataType[] {DataTypes.BYTES()});
476+
BinaryRecordData record = generator.generate(new Object[] {null});
477+
assertThat(getter.getFieldOrNull(record)).isNull();
428478
}
429479
}

0 commit comments

Comments
 (0)