Skip to content

Commit 00cee69

Browse files
Hisoka-XMrart
authored and committed
[FLINK-37485][starrocks] Add support for TIME type (apache#4253)
1 parent 0f58a64 commit 00cee69

7 files changed

Lines changed: 488 additions & 11 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ pipeline:
128128
<td>sink.connect.timeout-ms</td>
129129
<td>optional</td>
130130
<td style="word-wrap: break-word;">30000</td>
131-
<td>String</td>
131+
<td>Integer</td>
132132
<td>与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。</td>
133133
</tr>
134134
<tr>
135135
<td>sink.wait-for-continue.timeout-ms</td>
136136
<td>optional</td>
137137
<td style="word-wrap: break-word;">30000</td>
138-
<td>String</td>
139-
<td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。</td>
138+
<td>Integer</td>
139+
<td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 600000]。</td>
140140
</tr>
141141
<tr>
142142
<td>sink.buffer-flush.max-bytes</td>
@@ -174,6 +174,13 @@ pipeline:
174174
<td>Boolean</td>
175175
<td>at-least-once 下是否使用 transaction stream load。</td>
176176
</tr>
177+
<tr>
178+
<td>sink.metric.histogram-window-size</td>
179+
<td>optional</td>
180+
<td style="word-wrap: break-word;">100</td>
181+
<td>Integer</td>
182+
<td>直方图指标的窗口大小。</td>
183+
</tr>
177184
<tr>
178185
<td>sink.properties.*</td>
179186
<td>optional</td>
@@ -297,6 +304,11 @@ pipeline:
297304
<td>DATE</td>
298305
<td></td>
299306
</tr>
307+
<tr>
308+
<td>TIME</td>
309+
<td>VARCHAR</td>
310+
<td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME(p) 值以字符串形式存储:当 p = 0 时格式为 "HH:mm:ss",当 p > 0 时格式为 "HH:mm:ss.&lt;p 位小数&gt;"(例如 p = 3 时为 "HH:mm:ss.SSS")。</td>
311+
</tr>
300312
<tr>
301313
<td>TIMESTAMP</td>
302314
<td>DATETIME</td>

docs/content/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,14 @@ pipeline:
128128
<td>sink.connect.timeout-ms</td>
129129
<td>optional</td>
130130
<td style="word-wrap: break-word;">30000</td>
131-
<td>String</td>
131+
<td>Integer</td>
132132
<td>The timeout for establishing HTTP connection. Valid values: 100 to 60000.</td>
133133
</tr>
134134
<tr>
135135
<td>sink.wait-for-continue.timeout-ms</td>
136136
<td>optional</td>
137137
<td style="word-wrap: break-word;">30000</td>
138-
<td>String</td>
138+
<td>Integer</td>
139139
<td>Timeout in milliseconds to wait for the 100-continue response from the FE HTTP server.
140140
Valid values: 3000 to 600000.</td>
141141
</tr>
@@ -177,6 +177,13 @@ pipeline:
177177
<td>Boolean</td>
178178
<td>Whether to use transaction stream load for at-least-once when it's available.</td>
179179
</tr>
180+
<tr>
181+
<td>sink.metric.histogram-window-size</td>
182+
<td>optional</td>
183+
<td style="word-wrap: break-word;">100</td>
184+
<td>Integer</td>
185+
<td>Window size of histogram metrics.</td>
186+
</tr>
180187
<tr>
181188
<td>sink.properties.*</td>
182189
<td>optional</td>
@@ -306,6 +313,11 @@ pipeline:
306313
<td>DATE</td>
307314
<td></td>
308315
</tr>
316+
<tr>
317+
<td>TIME</td>
318+
<td>VARCHAR</td>
319+
<td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.&lt;p digits&gt;" when p &gt; 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
320+
</tr>
309321
<tr>
310322
<td>TIMESTAMP</td>
311323
<td>DATETIME</td>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.apache.flink.types.Row;
4646
import org.apache.flink.util.CloseableIterator;
4747

48+
import com.alibaba.fluss.client.Connection;
49+
import com.alibaba.fluss.client.ConnectionFactory;
4850
import com.alibaba.fluss.config.ConfigOptions;
4951
import com.alibaba.fluss.config.MemorySize;
5052
import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -117,7 +119,8 @@ public class FlussPipelineITCase {
117119
protected TableEnvironment tBatchEnv;
118120

119121
@BeforeEach
120-
void before() {
122+
void before() throws Exception {
123+
waitForFlussClusterReady();
121124
// open a catalog so that we can get table from the catalog
122125
String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
123126

@@ -137,6 +140,27 @@ void before() {
137140
tBatchEnv.useDatabase(DEFAULT_DB);
138141
}
139142

143+
private void waitForFlussClusterReady() throws Exception {
144+
int maxRetries = 30;
145+
int retryIntervalMs = 1000;
146+
Exception lastException = null;
147+
148+
for (int i = 0; i < maxRetries; i++) {
149+
try (Connection connection =
150+
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
151+
// Connection successful, cluster is ready
152+
return;
153+
} catch (Exception e) {
154+
lastException = e;
155+
Thread.sleep(retryIntervalMs);
156+
}
157+
}
158+
159+
throw new IllegalStateException(
160+
"Failed to connect to Fluss cluster after " + maxRetries + " attempts",
161+
lastException);
162+
}
163+
140164
@AfterEach
141165
void after() {
142166
tBatchEnv.useDatabase(BUILTIN_DATABASE);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.cdc.common.types.IntType;
3434
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
3535
import org.apache.flink.cdc.common.types.SmallIntType;
36+
import org.apache.flink.cdc.common.types.TimeType;
3637
import org.apache.flink.cdc.common.types.TimestampType;
3738
import org.apache.flink.cdc.common.types.TinyIntType;
3839
import org.apache.flink.cdc.common.types.VarCharType;
@@ -43,6 +44,8 @@
4344
import java.time.ZoneId;
4445
import java.time.ZonedDateTime;
4546
import java.time.format.DateTimeFormatter;
47+
import java.time.format.DateTimeFormatterBuilder;
48+
import java.time.temporal.ChronoField;
4649
import java.util.ArrayList;
4750
import java.util.List;
4851

@@ -132,6 +135,35 @@ public static void toStarRocksDataType(
132135
private static final DateTimeFormatter DATETIME_FORMATTER =
133136
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
134137

138+
/** Format TIME type data. */
139+
private static final DateTimeFormatter TIME_FORMATTER =
140+
new DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();
141+
142+
private static final DateTimeFormatter[] TIME_FORMATTERS = new DateTimeFormatter[10];
143+
144+
private static DateTimeFormatter timeFormatter(int precision) {
145+
if (precision <= 0) {
146+
return TIME_FORMATTER;
147+
}
148+
if (precision < TIME_FORMATTERS.length) {
149+
DateTimeFormatter formatter = TIME_FORMATTERS[precision];
150+
if (formatter == null) {
151+
formatter =
152+
new DateTimeFormatterBuilder()
153+
.appendPattern("HH:mm:ss")
154+
.appendFraction(
155+
ChronoField.NANO_OF_SECOND, precision, precision, true)
156+
.toFormatter();
157+
TIME_FORMATTERS[precision] = formatter;
158+
}
159+
return formatter;
160+
}
161+
return new DateTimeFormatterBuilder()
162+
.appendPattern("HH:mm:ss")
163+
.appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true)
164+
.toFormatter();
165+
}
166+
135167
/**
136168
* Creates an accessor for getting elements in an internal RecordData structure at the given
137169
* position.
@@ -183,6 +215,13 @@ record ->
183215
fieldGetter =
184216
record -> record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
185217
break;
218+
case TIME_WITHOUT_TIME_ZONE:
219+
fieldGetter =
220+
record ->
221+
record.getTime(fieldPos)
222+
.toLocalTime()
223+
.format(timeFormatter(getPrecision(fieldType)));
224+
break;
186225
case TIMESTAMP_WITHOUT_TIME_ZONE:
187226
fieldGetter =
188227
record ->
@@ -374,6 +413,21 @@ public StarRocksColumn.Builder visit(DateType dateType) {
374413
return builder;
375414
}
376415

416+
@Override
417+
public StarRocksColumn.Builder visit(TimeType timeType) {
418+
// StarRocks does not support TIME type, so map it to VARCHAR.
419+
// Format: HH:mm:ss for precision 0, HH:mm:ss.<p digits> for precision > 0
420+
// Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
421+
// For precision 0: "HH:mm:ss" = 8 characters
422+
// For precision > 0: "HH:mm:ss." + precision digits
423+
builder.setDataType(VARCHAR);
424+
builder.setNullable(timeType.isNullable());
425+
int precision = timeType.getPrecision();
426+
int length = precision > 0 ? 8 + 1 + precision : 8;
427+
builder.setColumnSize(length);
428+
return builder;
429+
}
430+
377431
@Override
378432
public StarRocksColumn.Builder visit(TimestampType timestampType) {
379433
builder.setDataType(DATETIME);
@@ -404,7 +458,8 @@ public static String convertInvalidTimestampDefaultValue(
404458
|| dataType instanceof org.apache.flink.cdc.common.types.TimestampType
405459
|| dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {
406460

407-
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
461+
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)
462+
|| defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
408463
return DEFAULT_DATETIME;
409464
}
410465
}

0 commit comments

Comments
 (0)