Skip to content

Commit d60e73e

Browse files
committed
[improve] Support using the UTF-8 charset for HTTP requests
1 parent 3e6e0ab commit d60e73e

11 files changed

Lines changed: 147 additions & 8 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,8 @@ public interface ConfigurationOptions {
6060

6161
String FLIGHT_SQL_PORT = "source.flight-sql-port";
6262
Integer FLIGHT_SQL_PORT_DEFAULT = -1;
63+
64+
String SINK_HTTP_UTF8_CHARSET = "sink.http-utf8-charset";
65+
Boolean SINK_HTTP_UTF8_CHARSET_DEFAULT = false;
66+
6367
}

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class DorisExecutionOptions implements Serializable {
4949
private final int bufferCount;
5050
private final String labelPrefix;
5151
private final boolean useCache;
52+
private final boolean httpUtf8Charset;
5253

5354
/** Properties for the StreamLoad. */
5455
private final Properties streamLoadProp;
@@ -74,6 +75,7 @@ public DorisExecutionOptions(
7475
int bufferCount,
7576
String labelPrefix,
7677
boolean useCache,
78+
boolean httpUtf8Charset,
7779
Properties streamLoadProp,
7880
Boolean enableDelete,
7981
Boolean enable2PC,
@@ -93,6 +95,7 @@ public DorisExecutionOptions(
9395
this.bufferCount = bufferCount;
9496
this.labelPrefix = labelPrefix;
9597
this.useCache = useCache;
98+
this.httpUtf8Charset = httpUtf8Charset;
9699
this.streamLoadProp = streamLoadProp;
97100
this.enableDelete = enableDelete;
98101
this.enable2PC = enable2PC;
@@ -162,6 +165,10 @@ public boolean isUseCache() {
162165
return useCache;
163166
}
164167

168+
public boolean isHttpUtf8Charset() {
169+
return httpUtf8Charset;
170+
}
171+
165172
public Properties getStreamLoadProp() {
166173
return streamLoadProp;
167174
}
@@ -228,6 +235,7 @@ public boolean equals(Object o) {
228235
&& bufferSize == that.bufferSize
229236
&& bufferCount == that.bufferCount
230237
&& useCache == that.useCache
238+
&& httpUtf8Charset == that.httpUtf8Charset
231239
&& force2PC == that.force2PC
232240
&& flushQueueSize == that.flushQueueSize
233241
&& bufferFlushMaxRows == that.bufferFlushMaxRows
@@ -252,6 +260,7 @@ public int hashCode() {
252260
bufferCount,
253261
labelPrefix,
254262
useCache,
263+
httpUtf8Charset,
255264
streamLoadProp,
256265
enableDelete,
257266
enable2PC,
@@ -274,6 +283,7 @@ public static class Builder {
274283
private int bufferCount = DEFAULT_BUFFER_COUNT;
275284
private String labelPrefix = "";
276285
private boolean useCache = false;
286+
private boolean httpUtf8Charset = false;
277287
private Properties streamLoadProp = new Properties();
278288
private boolean enableDelete = true;
279289
private boolean enable2PC = true;
@@ -361,6 +371,17 @@ public Builder setUseCache(boolean useCache) {
361371
return this;
362372
}
363373

374+
/**
375+
* Sets whether the HTTP client used for stream load uses UTF-8 as its default charset.
376+
*
377+
* @param httpUtf8Charset true to use UTF-8 as the HTTP client charset, false to keep the default
378+
* @return this DorisExecutionOptions.builder.
379+
*/
380+
public Builder setHttpUtf8Charset(boolean httpUtf8Charset) {
381+
this.httpUtf8Charset = httpUtf8Charset;
382+
return this;
383+
}
384+
364385
/**
365386
* Sets the properties for stream load.
366387
*
@@ -529,6 +550,7 @@ public DorisExecutionOptions build() {
529550
bufferCount,
530551
labelPrefix,
531552
useCache,
553+
httpUtf8Charset,
532554
streamLoadProp,
533555
enableDelete,
534556
enable2PC,

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.flink.cfg.DorisReadOptions;
2121
import org.apache.http.client.config.RequestConfig;
22+
import org.apache.http.config.ConnectionConfig;
2223
import org.apache.http.impl.NoConnectionReuseStrategy;
2324
import org.apache.http.impl.client.CloseableHttpClient;
2425
import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -27,29 +28,45 @@
2728
import org.apache.http.protocol.HttpRequestExecutor;
2829
import org.apache.http.protocol.RequestContent;
2930

31+
import java.nio.charset.CodingErrorAction;
32+
import java.nio.charset.StandardCharsets;
33+
3034
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
35+
import static org.apache.doris.flink.cfg.ConfigurationOptions.SINK_HTTP_UTF8_CHARSET_DEFAULT;
3136

3237
/** Utility for building HTTP clients. */
3338
public class HttpUtil {
3439
private final int connectTimeout;
3540
private final int waitForContinueTimeout;
41+
private final boolean httpUtf8Charset;
3642
private HttpClientBuilder httpClientBuilder;
3743

3844
public HttpUtil() {
3945
this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
4046
this.waitForContinueTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
47+
this.httpUtf8Charset = SINK_HTTP_UTF8_CHARSET_DEFAULT;
4148
settingStreamHttpClientBuilder();
4249
}
4350

44-
public HttpUtil(DorisReadOptions readOptions) {
51+
public HttpUtil(DorisReadOptions readOptions, boolean httpUtf8Charset) {
4552
this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
4653
this.waitForContinueTimeout = readOptions.getRequestConnectTimeoutMs();
54+
this.httpUtf8Charset = httpUtf8Charset;
4755
settingStreamHttpClientBuilder();
4856
}
4957

5058
private void settingStreamHttpClientBuilder() {
59+
ConnectionConfig connectionConfig = ConnectionConfig.DEFAULT;
60+
if (httpUtf8Charset) {
61+
connectionConfig = ConnectionConfig.custom()
62+
.setCharset(StandardCharsets.UTF_8)
63+
.setMalformedInputAction(CodingErrorAction.REPLACE)
64+
.setUnmappableInputAction(CodingErrorAction.REPLACE)
65+
.build();
66+
}
5167
this.httpClientBuilder =
5268
HttpClients.custom()
69+
.setDefaultConnectionConfig(connectionConfig)
5370
// The default timeout is 3s; a 307 error may be reported when the FE is busy
5471
.setRequestExecutor(new HttpRequestExecutor(waitForContinueTimeout))
5572
.setRedirectStrategy(

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public DorisBatchStreamLoad(
172172
this.started = new AtomicBoolean(true);
173173
this.loadExecutorService.execute(loadAsyncExecutor);
174174
this.subTaskId = subTaskId;
175-
this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
175+
this.httpClientBuilder = new HttpUtil(dorisReadOptions, executionOptions.isHttpUtf8Charset()).getHttpClientBuilderForBatch();
176176
}
177177

178178
/**

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public DorisCommitter(
7070
dorisOptions,
7171
dorisReadOptions,
7272
executionOptions,
73-
new HttpUtil(dorisReadOptions).getHttpClient());
73+
new HttpUtil(dorisReadOptions, executionOptions.isHttpUtf8Charset()).getHttpClient());
7474
}
7575

7676
public DorisCommitter(

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public DorisStreamLoad getStreamLoader(String tableKey) {
310310
dorisOptions,
311311
executionOptions,
312312
labelGenerator,
313-
new HttpUtil(dorisReadOptions).getHttpClient()));
313+
new HttpUtil(dorisReadOptions, executionOptions.isHttpUtf8Charset()).getHttpClient()));
314314
}
315315

316316
/** Http throws an exception actively, there is no need to check regularly. */

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
4141
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
4242

43-
/** Options for the Doris connector. */
43+
/**
44+
* Options for the Doris connector.
45+
*/
4446
@PublicEvolving
4547
public class DorisConfigOptions {
4648

@@ -340,4 +342,11 @@ public static Properties getStreamLoadProp(Map<String, String> tableOptions) {
340342
}
341343
return streamLoadProp;
342344
}
345+
346+
public static final ConfigOption<Boolean> SINK_HTTP_UTF8_CHARSET =
347+
ConfigOptions.key("sink.http-utf8-charset")
348+
.booleanType()
349+
.defaultValue(false)
350+
.withDescription("Set sink http client default charset to utf8 for support unicode characters in header, the default value is false");
351+
343352
}

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
8787
import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
8888
import static org.apache.doris.flink.table.DorisConfigOptions.USE_FLIGHT_SQL;
89+
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_HTTP_UTF8_CHARSET;
8990

9091
/**
9192
* The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
@@ -165,6 +166,7 @@ public Set<ConfigOption<?>> optionalOptions() {
165166

166167
options.add(USE_FLIGHT_SQL);
167168
options.add(FLIGHT_SQL_PORT);
169+
options.add(SINK_HTTP_UTF8_CHARSET);
168170
return options;
169171
}
170172

@@ -263,6 +265,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
263265
(int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
264266
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
265267
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
268+
builder.setHttpUtf8Charset(readableConfig.get(SINK_HTTP_UTF8_CHARSET));
266269
return builder.build();
267270
}
268271

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,9 @@ public DorisSink<String> buildDorisSink(String tableIdentifier) {
326326
sinkConfig
327327
.getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
328328
.ifPresent(executionBuilder::setIgnoreCommitError);
329+
sinkConfig
330+
.getOptional(DorisConfigOptions.SINK_HTTP_UTF8_CHARSET)
331+
.ifPresent(executionBuilder::setHttpUtf8Charset);
329332

330333
DorisExecutionOptions executionOptions = executionBuilder.build();
331334
builder.setDorisReadOptions(DorisReadOptions.builder().build())

flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public void testEquals() {
6262
.setUseCache(true)
6363
.setFlushQueueSize(2)
6464
.setIgnoreUpdateBefore(true)
65+
.setHttpUtf8Charset(true)
6566
.build();
6667

6768
DorisExecutionOptions.Builder builder =
@@ -81,7 +82,8 @@ public void testEquals() {
8182
.setBatchMode(false)
8283
.setUseCache(true)
8384
.setFlushQueueSize(2)
84-
.setIgnoreUpdateBefore(true);
85+
.setIgnoreUpdateBefore(true)
86+
.setHttpUtf8Charset(true);
8587

8688
Assert.assertNotEquals(exceptOptions, null);
8789
Assert.assertEquals(exceptOptions, exceptOptions);
@@ -147,9 +149,9 @@ public void testEquals() {
147149
Assert.assertNotEquals(exceptOptions, builder.build());
148150
builder.setFlushQueueSize(2);
149151

150-
builder.setIgnoreUpdateBefore(false);
152+
builder.setHttpUtf8Charset(false);
151153
Assert.assertNotEquals(exceptOptions, builder.build());
152-
builder.setIgnoreUpdateBefore(true);
154+
builder.setHttpUtf8Charset(true);
153155
}
154156

155157
@Test(expected = IllegalArgumentException.class)

0 commit comments

Comments
 (0)