Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -12,20 +11,18 @@
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConnectClusterMetrics extends BaseMetrics {
private Long connectClusterId;
protected Long connectClusterId;

public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId ){
super(clusterPhyId);
this.connectClusterId = connectClusterId;
}

public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) {
ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId);
brokerMetrics.putMetric(metric, value);
return brokerMetrics;
public ConnectClusterMetrics(Long connectClusterId, String metricName, Float metricValue) {
this(null, connectClusterId);
this.putMetric(metricName, metricValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -11,25 +9,19 @@
* @date 2022/11/2
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ConnectWorkerMetrics extends BaseMetrics {

private Long connectClusterId;

public class ConnectWorkerMetrics extends ConnectClusterMetrics {
private String workerId;

public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) {
ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics();
connectWorkerMetrics.setConnectClusterId(connectClusterId);
connectWorkerMetrics.setWorkerId(workerId);
connectWorkerMetrics.putMetric(metric, value);
return connectWorkerMetrics;
public ConnectWorkerMetrics(Long connectClusterId, String workerId, String metricName, Float metricValue) {
super(null, connectClusterId);
this.workerId = workerId;
this.putMetric(metricName, metricValue);
}

@Override
public String unique() {
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
return "KCW@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -12,24 +11,21 @@
@Data
@NoArgsConstructor
@ToString
public class ConnectorMetrics extends BaseMetrics {
private Long connectClusterId;
public class ConnectorMetrics extends ConnectClusterMetrics {
protected String connectorName;

private String connectorName;

private String connectorNameAndClusterId;
protected String connectorNameAndClusterId;

public ConnectorMetrics(Long connectClusterId, String connectorName) {
super(null);
super(null, connectClusterId);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}

public static ConnectorMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) {
ConnectorMetrics metrics = new ConnectorMetrics(connectClusterId, connectorName);
metrics.putMetric(metricName, value);
return metrics;
public ConnectorMetrics(Long connectClusterId, String connectorName, String metricName, Float metricValue) {
this(connectClusterId, connectorName);
this.putMetric(metricName, metricValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -12,11 +11,7 @@
@Data
@NoArgsConstructor
@ToString
public class ConnectorTaskMetrics extends BaseMetrics {
private Long connectClusterId;

private String connectorName;

public class ConnectorTaskMetrics extends ConnectorMetrics {
private Integer taskId;

public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId) {
Expand All @@ -25,14 +20,13 @@ public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer
this.taskId = taskId;
}

public static ConnectorTaskMetrics initWithMetric(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float value) {
ConnectorTaskMetrics metrics = new ConnectorTaskMetrics(connectClusterId, connectorName, taskId);
metrics.putMetric(metricName,value);
return metrics;
public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float metricValue) {
this(connectClusterId, connectorName, taskId);
this.putMetric(metricName, metricValue);
}

@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + connectorName + "@" + taskId;
return "KCORT@" + connectClusterId + "@" + connectorName + "@" + taskId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.xiaojukeji.know.streaming.km.common.enums.connect;

import org.apache.kafka.connect.runtime.AbstractStatus;

/**
* connector运行状态
* @see AbstractStatus
*/
public enum ConnectStatusEnum {
UNASSIGNED(0, "UNASSIGNED"),

RUNNING(1,"RUNNING"),

PAUSED(2,"PAUSED"),

FAILED(3, "FAILED"),

DESTROYED(4, "DESTROYED"),

UNKNOWN(-1, "UNKNOWN")

;

ConnectStatusEnum(int status, String value) {
this.status = status;
this.value = value;
}

private final int status;

private final String value;

public static ConnectStatusEnum getByValue(String value) {
for (ConnectStatusEnum statusEnum: ConnectStatusEnum.values()) {
if (statusEnum.value.equals(value)) {
return statusEnum;
}
}

return ConnectStatusEnum.UNKNOWN;
}

public int getStatus() {
return status;
}

public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectClusterMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster.ConnectClusterMetricESDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
Expand All @@ -43,7 +43,7 @@
* @author didi
*/
@Service
public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService implements ConnectClusterMetricService {
public class ConnectClusterMetricServiceImpl extends BaseConnectMetricService implements ConnectClusterMetricService {
protected static final ILog LOGGER = LogFactory.getLog(ConnectClusterMetricServiceImpl.class);

public static final String CONNECT_CLUSTER_METHOD_GET_WORKER_METRIC_AVG = "getWorkerMetricAvg";
Expand Down Expand Up @@ -86,8 +86,7 @@ public Result<ConnectClusterMetrics> collectConnectClusterMetricsFromKafkaWithCa
String connectClusterMetricKey = CollectedMetricsLocalCache.genConnectClusterMetricCacheKey(connectClusterPhyId, metric);
Float keyValue = CollectedMetricsLocalCache.getConnectClusterMetrics(connectClusterMetricKey);
if (keyValue != null) {
ConnectClusterMetrics connectClusterMetrics = ConnectClusterMetrics.initWithMetric(connectClusterPhyId,metric,keyValue);
return Result.buildSuc(connectClusterMetrics);
return Result.buildSuc(new ConnectClusterMetrics(connectClusterPhyId, metric, keyValue));
}

Result<ConnectClusterMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, metric);
Expand Down Expand Up @@ -209,8 +208,7 @@ private Result<ConnectWorkerMetrics> getConnectWorkerMetricByJMX(Long connectClu
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
ConnectWorkerMetrics connectWorkerMetrics = ConnectWorkerMetrics.initWithMetric(connectClusterId, workerId, metric, Float.valueOf(value));
return Result.buildSuc(connectWorkerMetrics);
return Result.buildSuc(new ConnectWorkerMetrics(connectClusterId, workerId, metric, Float.valueOf(value)));
} catch (Exception e) {
LOGGER.error("method=getConnectWorkerMetricsByJMX||connectClusterId={}||workerId={}||metrics={}||jmx={}||msg={}",
connectClusterId, workerId, metric, jmxInfo.getJmxObjectName(), e.getClass().getName());
Expand All @@ -231,8 +229,8 @@ private List<Long> listTopNConnectClusterIdList(Long clusterPhyId, Integer topN)
.collect(Collectors.toList());
}

protected List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
private List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
if (map == null || map.isEmpty()) {
// 如果为空,则直接返回
Expand Down
Loading