Skip to content

Commit 4290b01

Browse files
committed
add socket timeout config
1 parent 280bd66 commit 4290b01

8 files changed

Lines changed: 56 additions & 11 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,6 @@ public interface ConfigurationOptions {
5858

5959
String FLIGHT_SQL_PORT = "source.flight-sql-port";
6060
Integer FLIGHT_SQL_PORT_DEFAULT = -1;
61+
62+
Integer DEFAULT_SINK_SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
6163
}

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Objects;
2626
import java.util.Properties;
2727

28+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS;
2829
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
2930
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
3031
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
@@ -66,6 +67,7 @@ public class DorisExecutionOptions implements Serializable {
6667
private final boolean ignoreUpdateBefore;
6768
private final WriteMode writeMode;
6869
private final boolean ignoreCommitError;
70+
private final int sinkSocketTimeoutMs;
6971

7072
public DorisExecutionOptions(
7173
int checkInterval,
@@ -85,7 +87,8 @@ public DorisExecutionOptions(
8587
boolean ignoreUpdateBefore,
8688
boolean force2PC,
8789
WriteMode writeMode,
88-
boolean ignoreCommitError) {
90+
boolean ignoreCommitError,
91+
int sinkSocketTimeoutMs) {
8992
Preconditions.checkArgument(maxRetries >= 0);
9093
this.checkInterval = checkInterval;
9194
this.maxRetries = maxRetries;
@@ -107,6 +110,8 @@ public DorisExecutionOptions(
107110
this.ignoreUpdateBefore = ignoreUpdateBefore;
108111
this.writeMode = writeMode;
109112
this.ignoreCommitError = ignoreCommitError;
113+
114+
this.sinkSocketTimeoutMs = sinkSocketTimeoutMs;
110115
}
111116

112117
public static Builder builder() {
@@ -214,6 +219,10 @@ public boolean ignoreCommitError() {
214219
return ignoreCommitError;
215220
}
216221

222+
public int getSinkSocketTimeoutMs() {
223+
return sinkSocketTimeoutMs;
224+
}
225+
217226
@Override
218227
public boolean equals(Object o) {
219228
if (this == o) {
@@ -240,7 +249,8 @@ public boolean equals(Object o) {
240249
&& Objects.equals(streamLoadProp, that.streamLoadProp)
241250
&& Objects.equals(enableDelete, that.enableDelete)
242251
&& Objects.equals(enable2PC, that.enable2PC)
243-
&& writeMode == that.writeMode;
252+
&& writeMode == that.writeMode
253+
&& sinkSocketTimeoutMs == that.sinkSocketTimeoutMs;
244254
}
245255

246256
@Override
@@ -263,7 +273,8 @@ public int hashCode() {
263273
enableBatchMode,
264274
ignoreUpdateBefore,
265275
writeMode,
266-
ignoreCommitError);
276+
ignoreCommitError,
277+
sinkSocketTimeoutMs);
267278
}
268279

269280
/** Builder of {@link DorisExecutionOptions}. */
@@ -292,6 +303,8 @@ public static class Builder {
292303
private WriteMode writeMode = WriteMode.STREAM_LOAD;
293304
private boolean ignoreCommitError = false;
294305

306+
private int sinkSocketTimeoutMs = DEFAULT_SINK_SOCKET_TIMEOUT_MS;
307+
295308
/**
296309
* Sets the checkInterval to check exception with the interval while loading, The default is
297310
* 0, disabling the checker thread.
@@ -497,6 +510,17 @@ public Builder setIgnoreCommitError(boolean ignoreCommitError) {
497510
return this;
498511
}
499512

513+
/**
514+
* Set http socket timeout, only effective in batch mode.
515+
*
516+
* @param sinkSocketTimeoutMs
517+
* @return this DorisExecutionOptions.builder.
518+
*/
519+
public Builder setSinkSocketTimeoutMs(int sinkSocketTimeoutMs) {
520+
this.sinkSocketTimeoutMs = sinkSocketTimeoutMs;
521+
return this;
522+
}
523+
500524
/**
501525
* Build the {@link DorisExecutionOptions}.
502526
*
@@ -540,7 +564,8 @@ public DorisExecutionOptions build() {
540564
ignoreUpdateBefore,
541565
force2PC,
542566
writeMode,
543-
ignoreCommitError);
567+
ignoreCommitError,
568+
sinkSocketTimeoutMs);
544569
}
545570
}
546571
}

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.doris.flink.sink;
1919

20+
import org.apache.doris.flink.cfg.DorisExecutionOptions;
2021
import org.apache.doris.flink.cfg.DorisReadOptions;
2122
import org.apache.http.client.config.RequestConfig;
23+
import org.apache.http.config.SocketConfig;
2224
import org.apache.http.impl.NoConnectionReuseStrategy;
2325
import org.apache.http.impl.client.CloseableHttpClient;
2426
import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -83,7 +85,7 @@ public CloseableHttpClient getHttpClient() {
8385
*
8486
* @return
8587
*/
86-
public HttpClientBuilder getHttpClientBuilderForBatch() {
88+
public HttpClientBuilder getHttpClientBuilderForBatch(DorisExecutionOptions executionOptions) {
8789
return HttpClients.custom()
8890
.setRedirectStrategy(
8991
new DefaultRedirectStrategy() {
@@ -96,10 +98,10 @@ protected boolean isRedirectable(String method) {
9698
RequestConfig.custom()
9799
.setConnectTimeout(connectTimeout)
98100
.setConnectionRequestTimeout(connectTimeout)
99-
// todo: Need to be extracted to DorisExecutionOption
100-
// default checkpoint timeout is 10min
101-
.setSocketTimeout(9 * 60 * 1000)
102-
.build());
101+
// default socket timeout 9min, checkpoint timeout default 10min
102+
.setSocketTimeout(executionOptions.getSinkSocketTimeoutMs())
103+
.build())
104+
.setDefaultSocketConfig(SocketConfig.custom().setSoKeepAlive(true).build());
103105
}
104106

105107
public HttpClientBuilder getHttpClientBuilderForCopyBatch() {

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,13 @@ public DorisBatchStreamLoad(
167167
0L,
168168
TimeUnit.MILLISECONDS,
169169
new LinkedBlockingQueue<>(1),
170-
new DefaultThreadFactory("streamload-executor"),
170+
new DefaultThreadFactory("streamload-executor-" + subTaskId),
171171
new ThreadPoolExecutor.AbortPolicy());
172172
this.started = new AtomicBoolean(true);
173173
this.loadExecutorService.execute(loadAsyncExecutor);
174174
this.subTaskId = subTaskId;
175-
this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
175+
this.httpClientBuilder =
176+
new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch(executionOptions);
176177
}
177178

178179
/**

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Properties;
3131

32+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS;
3233
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
3334
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
3435
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -294,6 +295,13 @@ public class DorisConfigOptions {
294295
"the flush interval mills, over this time, asynchronous threads will flush data. The "
295296
+ "default value is 10s.");
296297

298+
public static final ConfigOption<Duration> SINK_SOCKET_TIMEOUT =
299+
ConfigOptions.key("sink.socket.timeout")
300+
.durationType()
301+
.defaultValue(Duration.ofMillis(DEFAULT_SINK_SOCKET_TIMEOUT_MS))
302+
.withDescription(
303+
"the socket timeout for stream load, the default value is 9min, only effective in batch mode.");
304+
297305
public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE =
298306
ConfigOptions.key("sink.ignore.update-before")
299307
.booleanType()

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
7979
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
8080
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
81+
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_SOCKET_TIMEOUT;
8182
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
8283
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE;
8384
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
@@ -162,6 +163,8 @@ public Set<ConfigOption<?>> optionalOptions() {
162163

163164
options.add(USE_FLIGHT_SQL);
164165
options.add(FLIGHT_SQL_PORT);
166+
167+
options.add(SINK_SOCKET_TIMEOUT);
165168
return options;
166169
}
167170

@@ -258,6 +261,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
258261
builder.setBufferFlushMaxBytes(
259262
(int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
260263
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
264+
builder.setSinkSocketTimeoutMs((int) readableConfig.get(SINK_SOCKET_TIMEOUT).toMillis());
261265
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
262266
return builder.build();
263267
}

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ public void testTableBatch() throws Exception {
317317
+ " 'sink.ignore.update-before' = 'false',"
318318
+ " 'sink.enable.batch-mode' = '%s',"
319319
+ " 'sink.enable-delete' = 'true',"
320+
+ " 'sink.socket.timeout' = '5m',"
320321
+ " 'sink.flush.queue-size' = '2',"
321322
+ " 'sink.buffer-flush.max-rows' = '10000',"
322323
+ " 'sink.buffer-flush.max-bytes' = '10MB',"

flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public void testDorisSinkProperties() {
143143
properties.put("sink.ignore.update-before", "true");
144144
properties.put("sink.ignore.commit-error", "false");
145145
properties.put("sink.parallelism", "1");
146+
properties.put("sink.socket.timeout", "9m");
146147

147148
DynamicTableSink actual = createTableSink(SCHEMA, properties);
148149
DorisOptions options =
@@ -175,6 +176,7 @@ public void testDorisSinkProperties() {
175176
.setFlushQueueSize(2)
176177
.setUseCache(true)
177178
.setIgnoreCommitError(false)
179+
.setSinkSocketTimeoutMs(9 * 60 * 1000)
178180
.build();
179181

180182
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

0 commit comments

Comments
 (0)