Skip to content

Commit b3dba8b

Browse files
[feat] Support Apache Doris as time-series storage for both metrics and logs (#4031)
1 parent 8174b68 commit b3dba8b

10 files changed

Lines changed: 2932 additions & 0 deletions

File tree

hertzbeat-startup/src/main/resources/application.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,38 @@ warehouse:
225225
password: root
226226
expire-time: '30d'
227227
replication: 1
228+
doris:
229+
enabled: false
230+
url: jdbc:mysql://127.0.0.1:9030
231+
username: root
232+
password:
233+
table-config:
234+
enable-partition: false
235+
partition-time-unit: DAY
236+
partition-retention-days: 30
237+
partition-future-days: 3
238+
buckets: 8
239+
replication-num: 1
240+
pool-config:
241+
minimum-idle: 5
242+
maximum-pool-size: 20
243+
connection-timeout: 30000
244+
write-config:
245+
# Write mode: jdbc (default, suitable for small/medium scale) or stream (high throughput)
246+
write-mode: jdbc
247+
# JDBC mode: batch size and flush interval
248+
batch-size: 1000
249+
flush-interval: 5
250+
# Stream Load mode configuration (only used when write-mode: stream)
251+
stream-load-config:
252+
# Doris FE HTTP port for Stream Load API
253+
http-port: :8030
254+
# Stream load timeout in seconds
255+
timeout: 60
256+
# Max batch size in bytes for stream load (10MB default)
257+
max-bytes-per-batch: 10485760
258+
# Redirect policy in complex networks: direct/public/private, empty means Doris default
259+
redirect-policy: ""
228260
# store real-time metrics data, enable only one below
229261
real-time:
230262
memory:

hertzbeat-warehouse/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,19 @@
119119
</exclusion>
120120
</exclusions>
121121
</dependency>
122+
<!-- doris -->
123+
<dependency>
124+
<groupId>com.mysql</groupId>
125+
<artifactId>mysql-connector-j</artifactId>
126+
</dependency>
127+
<dependency>
128+
<groupId>com.zaxxer</groupId>
129+
<artifactId>HikariCP</artifactId>
130+
</dependency>
131+
<dependency>
132+
<groupId>org.apache.httpcomponents</groupId>
133+
<artifactId>httpclient</artifactId>
134+
</dependency>
122135
<!-- kafka -->
123136
<dependency>
124137
<groupId>org.apache.kafka</groupId>

hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ interface HistoryName {
4848
String QUEST_DB = "questdb";
4949

5050
String DUCKDB = "duckdb";
51+
52+
String DORIS = "doris";
5153
}
5254

5355
/**

hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java

Lines changed: 1244 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Builder;
22+
import lombok.Data;
23+
import lombok.NoArgsConstructor;
24+
25+
import java.sql.Timestamp;
26+
27+
/**
28+
* Internal class to represent a metric row for Doris storage.
29+
* This class is used by both DorisDataStorage and DorisStreamLoadWriter.
30+
*/
31+
@Data
32+
@Builder
33+
@AllArgsConstructor
34+
@NoArgsConstructor
35+
public class DorisMetricRow {
36+
public String instance;
37+
public String app;
38+
public String metrics;
39+
public String metric;
40+
public byte metricType;
41+
public Integer int32Value;
42+
public Double doubleValue;
43+
public String strValue;
44+
public Timestamp recordTime;
45+
public String labels;
46+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
19+
20+
import org.apache.hertzbeat.common.constants.ConfigConstants;
21+
import org.apache.hertzbeat.common.constants.SignConstants;
22+
import org.apache.hertzbeat.warehouse.constants.WarehouseConstants;
23+
import org.springframework.boot.context.properties.ConfigurationProperties;
24+
import org.springframework.boot.context.properties.bind.DefaultValue;
25+
26+
/**
27+
* Apache Doris configuration properties
28+
*/
29+
@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.WAREHOUSE
30+
+ SignConstants.DOT
31+
+ WarehouseConstants.STORE
32+
+ SignConstants.DOT
33+
+ WarehouseConstants.HistoryName.DORIS)
34+
public record DorisProperties(
35+
@DefaultValue("false") boolean enabled,
36+
@DefaultValue("jdbc:mysql://127.0.0.1:9030/hertzbeat") String url,
37+
String username,
38+
String password,
39+
TableConfig tableConfig,
40+
PoolConfig poolConfig,
41+
WriteConfig writeConfig) {
42+
/**
43+
* Table structure configuration
44+
*/
45+
public record TableConfig(
46+
// Whether to enable dynamic partitioning (default enabled)
47+
@DefaultValue("false") boolean enablePartition,
48+
// Partition time unit: DAY, HOUR, MONTH
49+
@DefaultValue("DAY") String partitionTimeUnit,
50+
// Dynamic partition retention days
51+
@DefaultValue("7") int partitionRetentionDays,
52+
// Dynamic partition creation range (future partitions to create)
53+
@DefaultValue("3") int partitionFutureDays,
54+
// Number of buckets
55+
@DefaultValue("8") int buckets,
56+
// Number of replicas (recommended 3 for production)
57+
@DefaultValue("1") int replicationNum,
58+
// Maximum length of string columns
59+
@DefaultValue("4096") int strColumnMaxLength) {
60+
public TableConfig {
61+
if (partitionRetentionDays <= 0) {
62+
partitionRetentionDays = 7;
63+
}
64+
if (partitionFutureDays <= 0) {
65+
partitionFutureDays = 3;
66+
}
67+
if (buckets <= 0) {
68+
buckets = 8;
69+
}
70+
if (replicationNum <= 0) {
71+
replicationNum = 1;
72+
}
73+
if (strColumnMaxLength <= 0 || strColumnMaxLength > 65533) {
74+
strColumnMaxLength = 4096;
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Connection pool configuration (based on HikariCP)
81+
*/
82+
public record PoolConfig(
83+
// Minimum idle connections
84+
@DefaultValue("5") int minimumIdle,
85+
// Maximum pool size
86+
@DefaultValue("20") int maximumPoolSize,
87+
// Connection timeout in milliseconds
88+
@DefaultValue("30000") int connectionTimeout,
89+
// Maximum connection lifetime in milliseconds (0 means no limit)
90+
@DefaultValue("0") long maxLifetime,
91+
// Idle connection timeout in milliseconds (0 means never recycle)
92+
@DefaultValue("600000") long idleTimeout) {
93+
public PoolConfig {
94+
if (minimumIdle <= 0) {
95+
minimumIdle = 5;
96+
}
97+
if (maximumPoolSize <= 0) {
98+
maximumPoolSize = 20;
99+
}
100+
if (minimumIdle > maximumPoolSize) {
101+
minimumIdle = maximumPoolSize;
102+
}
103+
if (connectionTimeout <= 0) {
104+
connectionTimeout = 30000;
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Write configuration
111+
*/
112+
public record WriteConfig(
113+
// Write mode: jdbc (batch insert) or stream (HTTP stream load)
114+
@DefaultValue("jdbc") String writeMode,
115+
// Batch write size (for jdbc mode)
116+
@DefaultValue("1000") int batchSize,
117+
// Batch write flush interval in seconds (for jdbc mode)
118+
@DefaultValue("5") int flushInterval,
119+
// Fallback to JDBC when stream load fails (may introduce duplicate data in ambiguous cases)
120+
@DefaultValue("false") boolean fallbackToJdbcOnFailure,
121+
// Stream load configuration (for stream mode)
122+
StreamLoadConfig streamLoadConfig) {
123+
public WriteConfig {
124+
String normalizedWriteMode = writeMode == null ? "" : writeMode.trim().toLowerCase();
125+
if (!"jdbc".equals(normalizedWriteMode) && !"stream".equals(normalizedWriteMode)) {
126+
writeMode = "jdbc";
127+
} else {
128+
writeMode = normalizedWriteMode;
129+
}
130+
if (batchSize <= 0) {
131+
batchSize = 1000;
132+
}
133+
if (flushInterval <= 0) {
134+
flushInterval = 5;
135+
}
136+
if (streamLoadConfig == null) {
137+
streamLoadConfig = StreamLoadConfig.createDefault();
138+
}
139+
}
140+
}
141+
142+
/**
143+
* Stream Load configuration for HTTP-based streaming writes
144+
*/
145+
public record StreamLoadConfig(
146+
// Doris FE HTTP port for Stream Load API
147+
@DefaultValue(":8030") String httpPort,
148+
// Stream load timeout in seconds
149+
@DefaultValue("60") int timeout,
150+
// Max batch size in bytes for stream load
151+
@DefaultValue("10485760") int maxBytesPerBatch,
152+
// Maximum allowed filter ratio in [0,1]
153+
@DefaultValue("0.1") double maxFilterRatio,
154+
// Enable strict mode
155+
@DefaultValue("false") boolean strictMode,
156+
// Import timezone, empty means Doris default
157+
@DefaultValue("") String timezone,
158+
// Redirect policy: direct/public/private
159+
@DefaultValue("") String redirectPolicy,
160+
// Group commit mode: async_mode/sync_mode/off_mode
161+
@DefaultValue("") String groupCommit,
162+
// Send batch parallelism, 0 means Doris default
163+
@DefaultValue("0") int sendBatchParallelism,
164+
// Retry times for one label when stream load is retryable
165+
@DefaultValue("2") int retryTimes) {
166+
public StreamLoadConfig {
167+
if (httpPort == null || httpPort.isBlank()) {
168+
httpPort = ":8030";
169+
} else {
170+
httpPort = httpPort.trim();
171+
}
172+
if (timeout <= 0) {
173+
timeout = 60;
174+
}
175+
if (maxBytesPerBatch <= 0) {
176+
maxBytesPerBatch = 10485760; // 10MB
177+
}
178+
if (maxFilterRatio < 0 || maxFilterRatio > 1) {
179+
maxFilterRatio = 0.1;
180+
}
181+
if (sendBatchParallelism < 0) {
182+
sendBatchParallelism = 0;
183+
}
184+
if (retryTimes < 0) {
185+
retryTimes = 2;
186+
}
187+
if (!isValidRedirectPolicy(redirectPolicy)) {
188+
redirectPolicy = "";
189+
}
190+
if (!isValidGroupCommit(groupCommit)) {
191+
groupCommit = "";
192+
}
193+
timezone = timezone == null ? "" : timezone.trim();
194+
}
195+
196+
/**
197+
* Factory method to create default StreamLoadConfig
198+
*/
199+
public static StreamLoadConfig createDefault() {
200+
return new StreamLoadConfig(":8030", 60, 10485760,
201+
0.1, false, "", "", "", 0, 2);
202+
}
203+
204+
private static boolean isValidRedirectPolicy(String value) {
205+
if (value == null || value.isBlank()) {
206+
return true;
207+
}
208+
return "direct".equalsIgnoreCase(value)
209+
|| "public".equalsIgnoreCase(value)
210+
|| "private".equalsIgnoreCase(value);
211+
}
212+
213+
private static boolean isValidGroupCommit(String value) {
214+
if (value == null || value.isBlank()) {
215+
return true;
216+
}
217+
return "async_mode".equalsIgnoreCase(value)
218+
|| "sync_mode".equalsIgnoreCase(value)
219+
|| "off_mode".equalsIgnoreCase(value);
220+
}
221+
}
222+
223+
// Provide default values for nested configs if null
224+
public DorisProperties {
225+
if (tableConfig == null) {
226+
tableConfig = new TableConfig(false, "DAY", 7, 3, 8, 1, 4096);
227+
}
228+
if (poolConfig == null) {
229+
poolConfig = new PoolConfig(5, 20, 30000, 0, 600000);
230+
}
231+
if (writeConfig == null) {
232+
writeConfig = new WriteConfig("jdbc", 1000, 5, false, StreamLoadConfig.createDefault());
233+
}
234+
}
235+
}

0 commit comments

Comments
 (0)