diff --git a/hertzbeat-manager/src/main/resources/application.yml b/hertzbeat-manager/src/main/resources/application.yml
index 7060aca0958..669f240ae62 100644
--- a/hertzbeat-manager/src/main/resources/application.yml
+++ b/hertzbeat-manager/src/main/resources/application.yml
@@ -229,6 +229,11 @@ warehouse:
password: root
expire-time: '30d'
replication: 1
+ alibabacloud-es:
+ enabled: false
+ url: http://127.0.01:9200
+ username: elastic
+ password: elastic
# store real-time metrics data, enable only one below
real-time:
memory:
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index 95a32a50128..f1d1c85d3db 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -156,5 +156,12 @@
snappy-java
${snappy-java.version}
+
+
+ io.searchbox
+ jest
+ ${io.searchbox.version}
+
+
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index b646b5f9ecb..54e52843477 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -31,6 +31,9 @@ public interface WarehouseConstants {
* History database name.
*/
interface HistoryName {
+
+ String ALIBABACLOUD_ES = "alibabacloud-es";
+
String GREPTIME = "greptime";
String INFLUXDB = "influxdb";
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/AlibabaCloudEsPromqlQueryExecutor.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/AlibabaCloudEsPromqlQueryExecutor.java
new file mode 100644
index 00000000000..ce0ae7e9cbf
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/AlibabaCloudEsPromqlQueryExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.db;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsProperties;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * query executor for alibaba cloud elasticsearch
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.alibabacloud-es", name = "enabled", havingValue = "true")
+public class AlibabaCloudEsPromqlQueryExecutor extends PromqlQueryExecutor {
+
+ private static final String QUERY_PATH = "/_time_stream/prom/%s/query";
+ private static final String QUERY_RANGE_PATH = "/_time_stream/prom/%s/query_range";
+
+ private final String datasource;
+
+ AlibabaCloudEsPromqlQueryExecutor(AlibabaCloudEsProperties properties, RestTemplate restTemplate) {
+ super(restTemplate, new HttpPromqlProperties(properties.url(), properties.username(), properties.password(),
+ String.format(QUERY_PATH, properties.database()), String.format(QUERY_RANGE_PATH, properties.database())));
+ this.datasource = properties.database();
+ }
+
+ @Override
+ public String getDatasource() {
+ return datasource;
+ }
+}
\ No newline at end of file
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
index 9e70596fb96..c6ae7eb3556 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
@@ -79,8 +79,13 @@ public abstract class PromqlQueryExecutor implements QueryExecutor {
protected record HttpPromqlProperties(
String url,
String username,
- String password
+ String password,
+ String queryPath,
+ String queryRangePath
) {
+ public HttpPromqlProperties(String url, String username, String password) {
+ this(url, username, password, QUERY_PATH, QUERY_RANGE_PATH);
+ }
}
@Override
@@ -98,7 +103,7 @@ public List> execute(String queryString) {
}
HttpEntity httpEntity = new HttpEntity<>(headers);
- UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromUriString(httpPromqlProperties.url + QUERY_PATH);
+ UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromUriString(httpPromqlProperties.url + httpPromqlProperties.queryPath);
uriComponentsBuilder.queryParam(HTTP_QUERY_PARAM, queryString);
URI uri = uriComponentsBuilder.build().toUri();
ResponseEntity responseEntity = restTemplate.exchange(uri,
@@ -150,14 +155,14 @@ public DatasourceQueryData query(DatasourceQuery datasourceQuery) {
HttpEntity httpEntity = new HttpEntity<>(headers);
URI uri;
if (datasourceQuery.getTimeType().equals(RANGE)) {
- uri = UriComponentsBuilder.fromUriString(httpPromqlProperties.url() + QUERY_RANGE_PATH)
+ uri = UriComponentsBuilder.fromUriString(httpPromqlProperties.url() + httpPromqlProperties.queryRangePath)
.queryParam(HTTP_QUERY_PARAM, datasourceQuery.getExpr())
.queryParam(HTTP_START_PARAM, datasourceQuery.getStart())
.queryParam(HTTP_END_PARAM, datasourceQuery.getEnd())
.queryParam(HTTP_STEP_PARAM, datasourceQuery.getStep())
.build().toUri();
} else if (datasourceQuery.getTimeType().equals(INSTANT)) {
- uri = UriComponentsBuilder.fromUriString(httpPromqlProperties.url() + QUERY_PATH)
+ uri = UriComponentsBuilder.fromUriString(httpPromqlProperties.url() + httpPromqlProperties.queryPath)
.queryParam(HTTP_QUERY_PARAM, datasourceQuery.getExpr())
.build().toUri();
} else {
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsConstants.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsConstants.java
new file mode 100644
index 00000000000..c143732f919
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+/**
+ * Constants for AlibabaCloud Elasticsearch data storage
+ */
+public final class AlibabaCloudEsConstants {
+
+ private AlibabaCloudEsConstants() {
+
+ }
+
+ // Elasticsearch specific constants
+ public static final String TIME_STREAM = "_time_stream";
+ public static final String INDEX_TYPE = "_doc";
+ public static final String QUERY_RANGE_PATH = TIME_STREAM + "/prom/%s/query_range";
+
+ // Label keys
+ public static final String LABEL_KEY_HOST = "host";
+ public static final String LABEL_KEY_INSTANCE = "instance";
+ public static final String LABEL_KEY_JOB = "job";
+ public static final String LABEL_KEY_NAME = "__name__";
+
+ // Separators
+ public static final String SPLIT = "_";
+
+ // Error messages
+ public static final String ES_INIT_ERROR_MSG = """
+
+ \t---------------Alibaba Cloud Elasticsearch Init Failed---------------
+ \t--------------Please Config Alibaba Cloud Elasticsearch--------------
+ \t----------Can Not Use Metric History Now----------
+ """;
+}
\ No newline at end of file
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorage.java
new file mode 100644
index 00000000000..a87c5baf2f1
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorage.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+import com.google.common.collect.Maps;
+import io.searchbox.action.BulkableAction;
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.BulkResult;
+import io.searchbox.core.Delete;
+import io.searchbox.core.DocumentResult;
+import io.searchbox.core.Index;
+import io.searchbox.core.Update;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.CommonUtil;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.apache.hertzbeat.warehouse.store.history.tsdb.vm.PromQlQueryContent;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.support.BasicAuthenticationInterceptor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.ES_INIT_ERROR_MSG;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.INDEX_TYPE;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.LABEL_KEY_HOST;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.LABEL_KEY_INSTANCE;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.LABEL_KEY_JOB;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.LABEL_KEY_NAME;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.QUERY_RANGE_PATH;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.SPLIT;
+import static org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud.AlibabaCloudEsConstants.TIME_STREAM;
+
+/**
+ * AlibabaCloud elasticsearch data storage.
+ * Elasticsearch requires the commercial version, and it supports kernel versions 1.7.0 and above or 1.8.0 and above.
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.alibabacloud-es", name = "enabled", havingValue = "true")
+public class AlibabaCloudEsDataStorage extends AbstractHistoryDataStorage {
+
+ private final AlibabaCloudEsProperties properties;
+
+ private final RestTemplate restTemplate;
+
+ private final JestClient jestClient;
+
+ public AlibabaCloudEsDataStorage(AlibabaCloudEsProperties properties, RestTemplate restTemplate) {
+ if (properties == null) {
+ log.error("init error, please config Warehouse alibabaCloud es props in application.yml");
+ throw new IllegalArgumentException("please config Warehouse alibabaCloud es props");
+ }
+ this.properties = properties;
+ this.restTemplate = initBasicAuthRestTemplate(restTemplate);
+ this.jestClient = initJestClient();
+ this.serverAvailable = initAlibabaCloudEsDataStorage();
+ }
+
+
+ /**
+ * Used to integrate the Prometheus API.
+ *
+ * @param restTemplate Global restTemplate
+ * @return RestTemplate
+ */
+ private RestTemplate initBasicAuthRestTemplate(RestTemplate restTemplate) {
+ RestTemplate dedicated = new RestTemplate(restTemplate.getRequestFactory());
+ if (!CollectionUtils.isEmpty(restTemplate.getInterceptors())) {
+ dedicated.setInterceptors(new ArrayList<>(restTemplate.getInterceptors()));
+ }
+ if (StringUtils.hasText(properties.username())
+ && StringUtils.hasText(properties.password())) {
+ dedicated.getInterceptors().add(
+ new BasicAuthenticationInterceptor(properties.username(), properties.password())
+ );
+ }
+ return dedicated;
+ }
+
+ /**
+ * Used for batch writing
+ *
+ * @return JestClient
+ */
+ private JestClient initJestClient() {
+ JestClientFactory factory = new JestClientFactory();
+ HttpClientConfig.Builder httpClientConfigBuiler = new HttpClientConfig.Builder(properties.url());
+ // add basic auth
+ if (StringUtils.hasText(properties.username()) && StringUtils.hasText(properties.password())) {
+ CredentialsProvider provider = new BasicCredentialsProvider();
+ UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(properties.username(), properties.password());
+ provider.setCredentials(AuthScope.ANY, credentials);
+ httpClientConfigBuiler.credentialsProvider(provider);
+ }
+ JestPoolConfig poolConfig = properties.pool();
+ factory.setHttpClientConfig(
+ httpClientConfigBuiler
+ .maxConnectionIdleTime(poolConfig.maxConnectionIdleTime(), TimeUnit.MILLISECONDS)
+ .multiThreaded(true)
+ .connTimeout(poolConfig.connTimeout())
+ .readTimeout(poolConfig.readTimeout())
+ .maxTotalConnection(poolConfig.currentCount())
+ .defaultMaxTotalConnectionPerRoute(poolConfig.currentCount())
+ .build());
+ return factory.getObject();
+ }
+
+ /**
+ * Used to check whether the index has been created properly.
+ * Because index configuration is highly likely to be customized.
+ *
+ * @return Is it normal
+ */
+ private boolean initAlibabaCloudEsDataStorage() {
+ try {
+ HttpHeaders headers = new HttpHeaders();
+ HttpEntity httpEntity = new HttpEntity<>(headers);
+ ResponseEntity responseEntity = restTemplate.exchange(
+ properties.url() + "/" + TIME_STREAM + "/" + properties.database(), HttpMethod.GET, httpEntity, String.class);
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
+ log.info("Check alibaba cloud es metrics server status success.");
+ return true;
+ }
+ log.error("Check alibaba cloud es metrics server status failed: {}.", responseEntity.getBody());
+ } catch (Exception e) {
+ log.error("Check alibaba cloud es metrics server status error: {}.", e.getMessage());
+ }
+ return false;
+ }
+
+ @Override
+ public Map> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
+ if (!serverAvailable) {
+ log.error(ES_INIT_ERROR_MSG);
+ return Collections.emptyMap();
+ }
+ Map> instanceValuesMap = new HashMap<>(8);
+ try {
+
+ Instant now = Instant.now();
+ long start;
+ try {
+ if (NumberUtils.isParsable(history)) {
+ start = NumberUtils.toLong(history);
+ start = (ZonedDateTime.now().toEpochSecond() - start);
+ } else {
+ TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
+ assert temporalAmount != null;
+ Instant dateTime = now.minus(temporalAmount);
+ start = dateTime.getEpochSecond();
+ }
+ } catch (Exception e) {
+ log.error("history time error: {}. use default: 6h", e.getMessage());
+ start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+ }
+
+ long end = now.getEpochSecond();
+ String step = "60s";
+ if (end - start < Duration.ofDays(7).getSeconds() && end - start > Duration.ofDays(1).getSeconds()) {
+ step = "1h";
+ } else if (end - start >= Duration.ofDays(7).getSeconds()) {
+ step = "4h";
+ }
+
+ String metricsName = metrics + SPLIT + metric;
+ if (CommonConstants.PROMETHEUS.equals(app)) {
+ metricsName = metrics;
+ }
+
+ String timeSeriesSelector = LABEL_KEY_INSTANCE + "=\"" + monitorId + "\"";
+ HttpHeaders headers = new HttpHeaders();
+ HttpEntity httpEntity = new HttpEntity<>(headers);
+
+ URI uri = UriComponentsBuilder.fromUriString(properties.url() + "/" + String.format(QUERY_RANGE_PATH, properties.database()))
+ .queryParam(URLEncoder.encode("query", StandardCharsets.UTF_8), URLEncoder.encode(metricsName + "{" + timeSeriesSelector + "}", StandardCharsets.UTF_8))
+ .queryParam("start", start)
+ .queryParam("end", end)
+ .queryParam("step", step)
+ .build(true).toUri();
+
+ ResponseEntity responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
+ log.debug("query metrics data from alibaba cloud es success. {}", uri);
+ if (responseEntity.getBody().getData() != null && responseEntity.getBody().getData().getResult() != null) {
+ List contents = responseEntity.getBody().getData().getResult();
+ for (PromQlQueryContent.ContentData.Content content : contents) {
+ Map labels = content.getMetric();
+ labels.remove(LABEL_KEY_NAME);
+ labels.remove(LABEL_KEY_JOB);
+ labels.remove(LABEL_KEY_INSTANCE);
+ String labelStr = JsonUtil.toJson(labels);
+ if (content.getValues() != null && !content.getValues().isEmpty()) {
+ List valueList = instanceValuesMap.computeIfAbsent(labelStr, k -> new LinkedList<>());
+ for (Object[] valueArr : content.getValues()) {
+ long timestamp = ((Number) valueArr[0]).longValue();
+ String value = new BigDecimal(String.valueOf(valueArr[1])).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+ valueList.add(new Value(value, timestamp * 1000));
+ }
+ }
+ }
+ }
+ } else {
+ log.error("query metrics data from alibaba cloud es failed. {}", responseEntity);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ return instanceValuesMap;
+ }
+
+ @Override
+ public Map> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
+ return getHistoryMetricData(monitorId, app, metrics, metric, label, history);
+ }
+
+ @Override
+ public void saveData(CollectRep.MetricsData metricsData) {
+ if (!validateAndInitializeServer(metricsData)) {
+ return;
+ }
+
+ Map defaultLabels = createDefaultLabels(metricsData);
+ List entities = processMetricsData(metricsData, defaultLabels);
+
+ if (entities.isEmpty()) {
+ return;
+ }
+
+ bulkWriteToElasticsearch(entities);
+ }
+
+ private boolean validateAndInitializeServer(CollectRep.MetricsData metricsData) {
+ if (!isServerAvailable()) {
+ serverAvailable = initAlibabaCloudEsDataStorage();
+ }
+ if (!isServerAvailable() || null == metricsData || metricsData.getCode() != CollectRep.Code.SUCCESS) {
+ return false;
+ }
+ if (metricsData.rowCount() == 0) {
+ log.info("[warehouse alibabaCloud es metrics data {} is null, ignore.", metricsData.getId());
+ return false;
+ }
+ return true;
+ }
+
+ private Map createDefaultLabels(CollectRep.MetricsData metricsData) {
+ Map defaultLabels = Maps.newHashMapWithExpectedSize(4);
+ boolean isPrometheusAuto = metricsData.getApp().startsWith(CommonConstants.PROMETHEUS_APP_PREFIX);
+ if (isPrometheusAuto) {
+ defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp().substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
+ } else {
+ defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
+ }
+ defaultLabels.put(LABEL_KEY_INSTANCE, String.valueOf(metricsData.getId()));
+ return defaultLabels;
+ }
+
+ private List processMetricsData(CollectRep.MetricsData metricsData, Map defaultLabels) {
+ List entities = new ArrayList<>();
+ boolean isPrometheusAuto = metricsData.getApp().startsWith(CommonConstants.PROMETHEUS_APP_PREFIX);
+
+ try {
+ Map fieldsValue = Maps.newHashMapWithExpectedSize(8);
+ Map labels = Maps.newHashMapWithExpectedSize(8);
+
+ RowWrapper rowWrapper = metricsData.readRow();
+
+ while (rowWrapper.hasNextRow()) {
+ rowWrapper = rowWrapper.nextRow();
+ labels.clear();
+ fieldsValue.clear();
+
+ extractFieldsAndLabels(rowWrapper, fieldsValue, labels);
+ List indexedEntities = createTimeStreamEntities(fieldsValue, labels, defaultLabels, metricsData, isPrometheusAuto);
+ entities.addAll(indexedEntities);
+ }
+ } catch (Exception e) {
+ log.error("[warehouse alibaba es] Error processing metrics data: {}", e.getMessage(), e);
+ }
+ return entities;
+ }
+
+ private void extractFieldsAndLabels(RowWrapper rowWrapper, Map fieldsValue, Map labels) {
+ rowWrapper.cellStream().forEach(cell -> {
+ String value = cell.getValue();
+ boolean isLabel = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
+ byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
+
+ if (type == CommonConstants.TYPE_NUMBER && !isLabel) {
+ // number metrics data
+ if (!CommonConstants.NULL_VALUE.equals(value)) {
+ fieldsValue.put(cell.getField().getName(), CommonUtil.parseStrDouble(value));
+ }
+ }
+ // label
+ if (isLabel && !CommonConstants.NULL_VALUE.equals(value)) {
+ labels.put(cell.getField().getName(), value);
+ }
+ });
+ }
+
+ private List createTimeStreamEntities(Map fieldsValue,
+ Map labels,
+ Map defaultLabels,
+ CollectRep.MetricsData metricsData,
+ boolean isPrometheusAuto) {
+ List entities = new ArrayList<>(fieldsValue.size());
+
+ // Pre-calculate common values to avoid repeated computation
+ final String instanceHost = metricsData.getInstanceHost();
+ final String metricsPrefix = metricsData.getMetrics();
+ final long timestamp = metricsData.getTime();
+ final Map customizedLabels = metricsData.getLabels();
+ final boolean hasCustomizedLabels = !ObjectUtils.isEmpty(customizedLabels);
+
+ for (Map.Entry entry : fieldsValue.entrySet()) {
+ if (entry.getKey() != null && entry.getValue() != null) {
+ try {
+ Map entityLabels = Maps.newHashMapWithExpectedSize(8);
+ entityLabels.putAll(labels);
+ entityLabels.putAll(defaultLabels);
+ entityLabels.put(LABEL_KEY_HOST, instanceHost);
+
+ if (hasCustomizedLabels) {
+ entityLabels.putAll(customizedLabels);
+ }
+
+ String metricsName = isPrometheusAuto ? metricsPrefix : metricsPrefix + SPLIT + entry.getKey();
+
+ TimeStreamIndexedEntity indexedEntity = TimeStreamIndexedEntity.builder()
+ .labels(entityLabels)
+ .metrics(Map.of(metricsName, entry.getValue()))
+ .timestamp(timestamp)
+ .operator(TimeStreamIndexedEntity.Operator.INSERT)
+ .build();
+ entities.add(indexedEntity);
+ } catch (Exception e) {
+ log.error("combine metrics data error: {}.", e.getMessage(), e);
+ }
+ }
+ }
+
+ return entities;
+ }
+
+ private void bulkWriteToElasticsearch(List entities) {
+ try {
+ List> actions = new ArrayList<>(entities.size());
+ for (TimeStreamIndexedEntity entity : entities) {
+ if (entity != null) {
+ BulkableAction action = buildAction(entity);
+ actions.add(action);
+ }
+ }
+
+ if (actions.isEmpty()) {
+ return;
+ }
+
+ Bulk bulk = new Bulk.Builder()
+ .defaultIndex(properties.database())
+ .defaultType(INDEX_TYPE)
+ .addAction(actions)
+ .build();
+ BulkResult bulkResult = jestClient.execute(bulk);
+
+ if (!bulkResult.isSucceeded()) {
+ log.error("[warehouse alibaba es] write failed, res: {}", bulkResult.getJsonString());
+ }
+ } catch (IOException e) {
+ log.error("[warehouse alibaba es] IO error writing to Elasticsearch: {}", e.getMessage(), e);
+ } catch (Exception e) {
+ log.error("[warehouse alibaba es] Error writing to Elasticsearch: {}", e.getMessage(), e);
+ }
+ }
+
+ private BulkableAction buildAction(TimeStreamIndexedEntity indexedEntity) {
+ Map actionParams = indexedEntity.getActionParams();
+ TimeStreamIndexedEntity.Operator operator = indexedEntity.getOperator();
+ indexedEntity.clear();
+ if (TimeStreamIndexedEntity.Operator.DELETE.equals(operator)) {
+ Delete.Builder builder = new Delete.Builder(indexedEntity.getId());
+ if (MapUtils.isNotEmpty(actionParams)) {
+ actionParams.forEach(builder::setParameter);
+ }
+ return builder.build();
+ } else if (TimeStreamIndexedEntity.Operator.UPDATE.equals(operator)) {
+ Update.Builder builder = new Update.Builder(JsonUtil.toJson(indexedEntity)).id(indexedEntity.getId());
+ if (MapUtils.isNotEmpty(actionParams)) {
+ actionParams.forEach(builder::setParameter);
+ }
+ return builder.build();
+ } else {
+ Index.Builder builder = new Index.Builder(JsonUtil.toJson(indexedEntity));
+ if (MapUtils.isNotEmpty(actionParams)) {
+ actionParams.forEach(builder::setParameter);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (null != jestClient) {
+ jestClient.close();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsProperties.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsProperties.java
new file mode 100644
index 00000000000..9df033c335a
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsProperties.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+import org.apache.hertzbeat.common.constants.ConfigConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.warehouse.constants.WarehouseConstants;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.bind.DefaultValue;
+
+/**
+ * AlibabaCloud elasticsearch configuration information
+ */
+@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.WAREHOUSE
+ + SignConstants.DOT
+ + WarehouseConstants.STORE
+ + SignConstants.DOT
+ + WarehouseConstants.HistoryName.ALIBABACLOUD_ES)
+public record AlibabaCloudEsProperties(@DefaultValue("false") boolean enabled,
+ @DefaultValue("http://127.0.0.1:9200") String url,
+ @DefaultValue("hertzbeat") String database,
+ String username,
+ String password,
+ JestPoolConfig pool) {
+ public JestPoolConfig pool() {
+ return pool != null ? pool : new JestPoolConfig();
+ }
+}
\ No newline at end of file
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/JestPoolConfig.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/JestPoolConfig.java
new file mode 100644
index 00000000000..3633c7fa020
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/JestPoolConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+/**
+ * AlibabaCloud elasticsearch jest client pool configuration information
+ */
+public record JestPoolConfig(int connTimeout, int readTimeout, long maxConnectionIdleTime, int currentCount) {
+ public JestPoolConfig() {
+ this(3000, 5000, 20000, 50);
+ }
+}
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/TimeStreamIndexedEntity.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/TimeStreamIndexedEntity.java
new file mode 100644
index 00000000000..3334cefeafa
--- /dev/null
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/TimeStreamIndexedEntity.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/**
+ * Data entitie
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TimeStreamIndexedEntity {
+
+ /**
+ * Document ID
+ */
+ private String id;
+
+ /**
+ * Tag list
+ * Example: {"env":"test"}
+ */
+ private Map labels;
+
+ /**
+ * Metric data collection, where metrics can only be of type long or double.
+ */
+ private Map metrics;
+
+ /**
+ * Current time
+ */
+ @SerializedName("@timestamp")
+ @JsonProperty("@timestamp")
+ private long timestamp;
+
+ /**
+ * Operation Type - Reserved
+ */
+ private Operator operator;
+
+ /**
+ * Parameters
+ */
+ private Map actionParams;
+
+
+ public void clear() {
+ this.operator = null;
+ this.actionParams = null;
+ }
+
+ enum Operator {
+ INSERT, UPDATE, DELETE
+ }
+
+}
\ No newline at end of file
diff --git a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorageTest.java b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorageTest.java
new file mode 100644
index 00000000000..64cb529128c
--- /dev/null
+++ b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/alibabacloud/AlibabaCloudEsDataStorageTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.alibabacloud;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import io.searchbox.client.JestClient;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.BulkResult;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.ArrowCell;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.warehouse.store.history.tsdb.vm.PromQlQueryContent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test case for {@link AlibabaCloudEsDataStorage}
+ */
+@ExtendWith(MockitoExtension.class)
+class AlibabaCloudEsDataStorageTest {
+
+ @Mock
+ private AlibabaCloudEsProperties properties;
+
+ private AlibabaCloudEsDataStorage dataStorage;
+
+ @Test
+ void testConstructorWithNullProperties() {
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> new AlibabaCloudEsDataStorage(null, null)
+ );
+ assertEquals("please config Warehouse alibabaCloud es props", exception.getMessage());
+ }
+
+ @Test
+ void testConstructorInitServerAvailable() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ assertDoesNotThrow(() -> {
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ assertNotNull(dataStorage);
+ assertFalse(dataStorage.isServerAvailable());
+ });
+ }
+
+ @Test
+ void testIsServerAvailableWhenServerUp() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ // Use reflection to simulate server being available
+ // This avoids the complexity of mocking the HTTP calls which get wrapped in a new RestTemplate instance
+ setServerAvailable(dataStorage, true);
+ assertTrue(dataStorage.isServerAvailable());
+ }
+
+ @Test
+ void testIsServerAvailableWhenServerDown() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ assertFalse(dataStorage.isServerAvailable());
+ }
+
+ @Test
+ void testSaveDataWithNullMetrics() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ setServerAvailable(dataStorage, true);
+ assertDoesNotThrow(() -> dataStorage.saveData(null));
+
+ CollectRep.MetricsData metricsData = mock(CollectRep.MetricsData.class);
+ when(metricsData.getCode()).thenReturn(CollectRep.Code.FAIL);
+ assertDoesNotThrow(() -> dataStorage.saveData(metricsData));
+ }
+
+ @Test
+ void testSaveDataWithEmptyValues() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ setServerAvailable(dataStorage, true);
+
+ CollectRep.MetricsData metricsData = mock(CollectRep.MetricsData.class);
+ when(metricsData.getCode()).thenReturn(CollectRep.Code.SUCCESS);
+
+ assertDoesNotThrow(() -> dataStorage.saveData(metricsData));
+ }
+
+ @Test
+ void testSaveDataWithValidData() throws Exception {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ setServerAvailable(dataStorage, true);
+
+ // Mock JestClient
+ JestClient mockJestClient = mock(JestClient.class);
+ BulkResult mockBulkResult = mock(BulkResult.class);
+ when(mockBulkResult.isSucceeded()).thenReturn(true);
+ when(mockJestClient.execute(any(Bulk.class))).thenReturn(mockBulkResult);
+
+ // Use reflection to set the mocked JestClient
+ Field jestClientField = dataStorage.getClass().getDeclaredField("jestClient");
+ jestClientField.setAccessible(true);
+ jestClientField.set(dataStorage, mockJestClient);
+
+ CollectRep.MetricsData metricsData = createMockMetricsData();
+
+ // Execute saveData
+ assertDoesNotThrow(() -> dataStorage.saveData(metricsData));
+
+ // Verify that jestClient.execute was called with a Bulk object
+ verify(mockJestClient, times(1)).execute(any(Bulk.class));
+
+ // Capture the Bulk argument to verify its properties
+ ArgumentCaptor bulkCaptor = ArgumentCaptor.forClass(Bulk.class);
+ verify(mockJestClient).execute(bulkCaptor.capture());
+
+ Bulk capturedBulk = bulkCaptor.getValue();
+ assertNotNull(capturedBulk, "Bulk object should not be null");
+
+ String bulkData = capturedBulk.getData(new Gson());
+ assertNotNull(bulkData, "Bulk data should not be null");
+
+ String[] lines = bulkData.trim().split("\n");
+ assertTrue(lines.length >= 2, "Bulk data should contain at least 2 lines (index + document)");
+
+ // Verify first line contains index operation
+ String indexLine = lines[0];
+ assertTrue(indexLine.contains("{\"index\":{}}"), "First line should contain index operation");
+
+ // Verify second line contains document data
+ String documentLine = lines[1];
+ assertTrue(documentLine.contains("\"labels\":"), "Document should contain labels field");
+ assertTrue(documentLine.contains("\"metrics\":"), "Document should contain metrics field");
+ assertTrue(documentLine.contains("\"@timestamp\":"), "Document should contain @timestamp field");
+
+ TimeStreamIndexedEntity indexedEntity = JsonUtil.fromJson(documentLine, TimeStreamIndexedEntity.class);
+ assertEquals(indexedEntity.getLabels().get("instance"), "0");
+ assertEquals(indexedEntity.getLabels().get("job"), "app");
+ assertEquals(indexedEntity.getMetrics().get("cpu_instance"), 68.7);
+ assertEquals(indexedEntity.getTimestamp(), 1755794346092L);
+
+ }
+
+ @Test
+ void testGetHistoryMetricData() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+ setServerAvailable(dataStorage, true);
+
+ // Mock RestTemplate
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ ResponseEntity responseEntity = mock(ResponseEntity.class);
+ PromQlQueryContent promQlQueryContent = new PromQlQueryContent();
+ PromQlQueryContent.ContentData contentData = new PromQlQueryContent.ContentData();
+ contentData.setResultType("matrix");
+ PromQlQueryContent.ContentData.Content content = new PromQlQueryContent.ContentData.Content();
+
+ Map metricMap = new HashMap<>();
+ metricMap.put("host", "127.0.0.1");
+ metricMap.put("instance", "545707837213952");
+ metricMap.put("job", "springboot3");
+ metricMap.put("space", "CodeCache");
+ metricMap.put("__name__", "memory_used_mem_used");
+ content.setMetric(metricMap);
+
+ Object[] objects = {1755709577L, "12.5006"};
+ List objects1 = new ArrayList<>();
+ objects1.add(objects);
+ content.setValues(objects1);
+
+ contentData.setResult(Lists.newArrayList(content));
+ promQlQueryContent.setData(contentData);
+ when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+ when(responseEntity.getBody()).thenReturn(promQlQueryContent);
+
+ ArgumentCaptor uriCaptor = ArgumentCaptor.forClass(URI.class);
+
+ doReturn(responseEntity).when(restTemplate).exchange(
+ uriCaptor.capture(),
+ eq(HttpMethod.GET),
+ any(HttpEntity.class),
+ eq(PromQlQueryContent.class)
+ );
+
+ setRestTemplate(dataStorage, restTemplate);
+
+ // Test parameters for URI validation
+ Long monitorId = 545707837213952L;
+ String app = "springboot3";
+ String metrics = "memory_used";
+ String metric = "mem_used";
+ String label = null;
+ String history = "6h";
+
+ Map> result = dataStorage.getHistoryMetricData(
+ monitorId, app, metrics, metric, label, history
+ );
+ assertNotNull(result);
+ List values = result.get("{\"host\":\"127.0.0.1\",\"space\":\"CodeCache\"}");
+ assertNotNull(values);
+ assertEquals(values.get(0).getTime(), 1755709577000L);
+ assertEquals(values.get(0).getOrigin(), "12.5006");
+
+ URI capturedUri = uriCaptor.getValue();
+ assertNotNull(capturedUri, "Captured URI should not be null");
+ // Verify URI components
+ String uriString = URLDecoder.decode(capturedUri.toString(), StandardCharsets.UTF_8);
+ // Verify base URL and path
+ assertTrue(uriString.startsWith("http://localhost:9200/_time_stream/prom/hertzbeat/query_range"), "URI should start with base URL");
+ // Verify queryParams
+ Map queryParams = parseQueryParams(capturedUri.getQuery());
+ assertNotNull(queryParams.get("query"));
+ assertNotNull(queryParams.get("start"));
+ assertNotNull(queryParams.get("end"));
+ assertNotNull(queryParams.get("step"));
+ assertEquals("memory_used_mem_used{instance=\"545707837213952\"}", queryParams.get("query"));
+ }
+
+ @Test
+ void testDestroy() {
+ when(properties.url()).thenReturn("http://localhost:9200");
+ when(properties.database()).thenReturn("hertzbeat");
+ when(properties.username()).thenReturn("elastic");
+ when(properties.password()).thenReturn("password");
+ when(properties.pool()).thenReturn(new JestPoolConfig(3000, 5000, 20000, 50));
+
+ dataStorage = new AlibabaCloudEsDataStorage(properties, new RestTemplate());
+
+ assertDoesNotThrow(() -> dataStorage.destroy());
+ }
+
+ /**
+ * Create a mock MetricsData for testing
+ */
+ private CollectRep.MetricsData createMockMetricsData() {
+ CollectRep.MetricsData mockMetricsData = Mockito.mock(CollectRep.MetricsData.class);
+ when(mockMetricsData.getId()).thenReturn(0L);
+ when(mockMetricsData.getMetrics()).thenReturn("cpu");
+ when(mockMetricsData.getCode()).thenReturn(CollectRep.Code.SUCCESS);
+ when(mockMetricsData.getApp()).thenReturn("app");
+ when(mockMetricsData.getTime()).thenReturn(1755794346092L);
+ when(mockMetricsData.rowCount()).thenReturn(1L);
+
+ ArrowType instanceArrowType = new ArrowType.Utf8();
+ FieldType instanceFieldType = new FieldType(true, instanceArrowType, null, null);
+
+ org.apache.arrow.vector.types.pojo.Field instanceRealField = new org.apache.arrow.vector.types.pojo.Field("instance", instanceFieldType, null);
+ ArrowCell instanceCell = Mockito.mock(ArrowCell.class);
+ when(instanceCell.getField()).thenReturn(instanceRealField);
+ when(instanceCell.getValue()).thenReturn("server-test-01");
+ when(instanceCell.getMetadataAsBoolean(MetricDataConstants.LABEL)).thenReturn(true);
+ when(instanceCell.getMetadataAsByte(MetricDataConstants.TYPE)).thenReturn(CommonConstants.TYPE_STRING);
+
+ ArrowCell usageCell = Mockito.mock(ArrowCell.class);
+ when(usageCell.getField()).thenReturn(instanceRealField);
+ when(usageCell.getValue()).thenReturn("68.7");
+ when(usageCell.getMetadataAsBoolean(MetricDataConstants.LABEL)).thenReturn(false);
+ when(usageCell.getMetadataAsByte(MetricDataConstants.TYPE)).thenReturn(CommonConstants.TYPE_NUMBER);
+ List mockCells = List.of(instanceCell, usageCell);
+
+ RowWrapper mockRowWrapper = Mockito.mock(RowWrapper.class);
+ when(mockRowWrapper.hasNextRow()).thenReturn(true).thenReturn(false);
+ when(mockRowWrapper.nextRow()).thenReturn(mockRowWrapper);
+ when(mockRowWrapper.cellStream()).thenAnswer(invocation -> mockCells.stream());
+ when(mockMetricsData.readRow()).thenReturn(mockRowWrapper);
+ return mockMetricsData;
+ }
+
+ private void setServerAvailable(AlibabaCloudEsDataStorage dataStorage, boolean available) {
+ try {
+ Field serverAvailableField = dataStorage.getClass().getSuperclass().getDeclaredField("serverAvailable");
+ serverAvailableField.setAccessible(true);
+ serverAvailableField.set(dataStorage, available);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to set serverAvailable field", e);
+ }
+ }
+
+ private void setRestTemplate(AlibabaCloudEsDataStorage dataStorage, RestTemplate restTemplate) {
+ try {
+ Field restTemplateField = dataStorage.getClass().getDeclaredField("restTemplate");
+ restTemplateField.setAccessible(true);
+ restTemplateField.set(dataStorage, restTemplate);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to set restTemplate field", e);
+ }
+ }
+
+ /**
+ * Parse query parameters from URI query string
+ */
+ private Map parseQueryParams(String query) {
+ Map params = new HashMap<>();
+ if (query != null && !query.isEmpty()) {
+ String[] pairs = query.split("&");
+ for (String pair : pairs) {
+ String[] keyValue = pair.split("=", 2);
+ if (keyValue.length == 2) {
+ try {
+ String key = java.net.URLDecoder.decode(keyValue[0], "UTF-8");
+ String value = java.net.URLDecoder.decode(keyValue[1], "UTF-8");
+ params.put(key, value);
+ } catch (Exception e) {
+ // Skip malformed parameters
+ }
+ }
+ }
+ }
+ return params;
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 30cdf008282..a5b7765951d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
0.11.0
18.1.0
1.1.10.7
+ 6.3.1
2.13.1
2.15.0
2.14.0-alpha