Skip to content

Commit f8d8ada

Browse files
committed
[improve] Supports http request use utf8 charset
1 parent f5492fc commit f8d8ada

6 files changed

Lines changed: 15 additions & 79 deletions

File tree

spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,12 +326,8 @@ private void handleStreamLoadProperties(HttpPut httpPut) throws OptionRequiredEx
326326
currentLabel = generateStreamLoadLabel();
327327
httpPut.setHeader("label", currentLabel);
328328
}
329-
if (config.getValue(DorisOptions.DORIS_SINK_ADD_COLUMNS_HEADER)) {
330-
String writeFields = getWriteFields();
331-
httpPut.setHeader("columns", writeFields);
332-
} else {
333-
logger.info("skip setting columns header for stream load");
334-
}
329+
String writeFields = getWriteFields();
330+
httpPut.setHeader("columns", writeFields);
335331
if (config.contains(DorisOptions.DORIS_MAX_FILTER_RATIO)) {
336332
httpPut.setHeader("max_filter_ratio", config.getValue(DorisOptions.DORIS_MAX_FILTER_RATIO));
337333
}

spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,6 @@ private void checkOptions(Map<String, String> options) throws OptionRequiredExce
122122
throw new IllegalArgumentException("option [%s] and [%s] cannot be true at this same time");
123123
}
124124
}
125-
if (options.containsKey(DorisOptions.DORIS_SINK_ADD_COLUMNS_HEADER.getName())
126-
&& !Boolean.parseBoolean(options.get(DorisOptions.DORIS_SINK_ADD_COLUMNS_HEADER.getName()))) {
127-
Map<String, String> sinkProperties = getSinkProperties();
128-
if (!sinkProperties.containsKey("format") || !"json".equalsIgnoreCase(sinkProperties.get("format"))) {
129-
throw new IllegalArgumentException(String.format("option [%s] is only supported when sink property 'format' is 'json'",
130-
DorisOptions.DORIS_SINK_ADD_COLUMNS_HEADER.getName()));
131-
}
132-
}
133125
}
134126

135127
public boolean contains(ConfigOption<?> option) {

spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public class DorisOptions {
144144

145145
public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 1024).withDescription("");
146146

147-
public static final ConfigOption<Boolean> DORIS_SINK_ADD_COLUMNS_HEADER = ConfigOptions.name("doris.sink.add-columns-header").booleanType().defaultValue(true).withDescription("");
147+
public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET = ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription("");
148148

149149

150150
}

spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,31 @@ package org.apache.doris.spark.util
2020
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
2121
import org.apache.http.HttpHeaders
2222
import org.apache.http.client.methods.HttpRequestBase
23+
import org.apache.http.config.ConnectionConfig
2324
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
2425
import org.apache.http.impl.client.{CloseableHttpClient, DefaultRedirectStrategy, HttpClients}
2526
import org.apache.http.protocol.HttpRequestExecutor
2627
import org.apache.http.ssl.SSLContexts
2728

2829
import java.io.{File, FileInputStream}
29-
import java.nio.charset.StandardCharsets
30+
import java.nio.charset.{CodingErrorAction, StandardCharsets}
3031
import java.security.KeyStore
3132
import java.util.Base64
3233
import scala.util.{Failure, Success, Try}
3334

3435
object HttpUtils {
3536

3637
def getHttpClient(config: DorisConfig): CloseableHttpClient = {
38+
39+
var connectionConfig = ConnectionConfig.DEFAULT;
40+
if (config.getValue(DorisOptions.DORIS_SINK_HTTP_UTF8_CHARSET)) {
41+
connectionConfig = ConnectionConfig.custom()
42+
.setCharset(StandardCharsets.UTF_8)
43+
.setMalformedInputAction(CodingErrorAction.REPLACE)
44+
.setUnmappableInputAction(CodingErrorAction.REPLACE).build()
45+
}
3746
val builder = HttpClients.custom()
47+
.setDefaultConnectionConfig(connectionConfig)
3848
.setRequestExecutor(new HttpRequestExecutor(60000))
3949
.setRedirectStrategy(new DefaultRedirectStrategy {
4050
override def isRedirectable(method: String): Boolean = true

spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/config/DorisConfigTest.scala

Lines changed: 0 additions & 61 deletions
This file was deleted.

spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
440440
| "fenodes"="${getFenodes}",
441441
| "user"="${getDorisUsername}",
442442
| "password"="${getDorisPassword}",
443-
| "doris.sink.add-columns-header"="false",
444-
| "doris.sink.properties.format"="json"
443+
| "doris.sink.http-utf8-charset"="true"
445444
|)
446445
|""".stripMargin)
447446
session.sql(

0 commit comments

Comments
 (0)