Skip to content

Commit 707675d

Browse files
committed
[10882] Add server map module based on redis-timeseries
1 parent 9ccc3fd commit 707675d

File tree

68 files changed

+3393
-54
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+3393
-54
lines changed

collector/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@
222222
<artifactId>jakarta.annotation-api</artifactId>
223223
</dependency>
224224

225+
226+
<dependency>
227+
<groupId>com.navercorp.pinpoint</groupId>
228+
<artifactId>pinpoint-redis-timeseries</artifactId>
229+
<version>3.1.0-SNAPSHOT</version>
230+
</dependency>
225231
</dependencies>
226232

227233
<build>

collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.navercorp.pinpoint.collector;
22

33

4+
import com.navercorp.pinpoint.collector.applicationmap.ApplicationMapModule;
45
import com.navercorp.pinpoint.collector.config.ClusterModule;
56
import com.navercorp.pinpoint.collector.config.CollectorCommonConfiguration;
67
import com.navercorp.pinpoint.collector.config.CollectorConfiguration;
@@ -38,6 +39,7 @@
3839
RealtimeCollectorModule.class,
3940

4041
CollectorPinpointIdCacheConfiguration.class,
42+
ApplicationMapModule.class,
4143
})
4244
@ComponentScan(basePackages = {
4345
"com.navercorp.pinpoint.collector.handler",
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2024 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.collector.applicationmap;
17+
18+
import org.springframework.context.annotation.ComponentScan;
19+
import org.springframework.context.annotation.Configuration;
20+
21+
/**
22+
* @author intr3p1d
23+
*/
24+
@Configuration(proxyBeanMethods = false)
25+
@ComponentScan(basePackages = {
26+
"com.navercorp.pinpoint.collector.applicationmap",
27+
"com.navercorp.pinpoint.collector.applicationmap.dao",
28+
"com.navercorp.pinpoint.collector.applicationmap.redis",
29+
"com.navercorp.pinpoint.collector.applicationmap.service",
30+
})
31+
public class ApplicationMapModule {
32+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2024 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.collector.applicationmap.dao;
17+
18+
import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
19+
import com.navercorp.pinpoint.common.trace.ServiceType;
20+
21+
/**
22+
* @author intr3p1d
23+
*/
24+
public interface InboundDao extends CachedStatisticsDao {
25+
// src -> dest
26+
// inbound (rowKey dest <- columnName src)
27+
// outbound (rowKey src -> columnName dest)
28+
void update(
29+
long requestTime,
30+
int srcServiceId, String srcApplicationName, ServiceType srcApplicationType,
31+
int destServiceId, String destApplicationName, ServiceType destApplicationType,
32+
String srcHost, int elapsed, boolean isError
33+
);
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2024 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.collector.applicationmap.dao;
17+
18+
import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
19+
import com.navercorp.pinpoint.common.trace.ServiceType;
20+
21+
/**
22+
* @author intr3p1d
23+
*/
24+
public interface OutboundDao extends CachedStatisticsDao {
25+
// src -> dest
26+
// inbound (rowKey dest <- columnName src)
27+
// outbound (rowKey src -> columnName dest)
28+
void update(
29+
long requestTime,
30+
int srcServiceId, String srcApplicationName, ServiceType srcApplicationType,
31+
int destServiceId, String destApplicationName, ServiceType destApplicationType,
32+
String srcHost, int elapsed, boolean isError
33+
);
34+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2024 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.collector.applicationmap.dao;
17+
18+
19+
import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
20+
import com.navercorp.pinpoint.common.trace.ServiceType;
21+
22+
/**
23+
* @author intr3p1d
24+
*/
25+
public interface SelfDao extends CachedStatisticsDao {
26+
void received(long requestTime, int serviceId, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
27+
void updatePing(long requestTime, int serviceId, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
28+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2024 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.collector.applicationmap.redis;
17+
18+
import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao;
19+
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.ApplicationMapTable;
20+
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey;
21+
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue;
22+
import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
23+
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
24+
import com.navercorp.pinpoint.collector.applicationmap.redis.statistics.RedisBulkWriter;
25+
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
26+
import com.navercorp.pinpoint.common.trace.HistogramSchema;
27+
import com.navercorp.pinpoint.common.trace.ServiceType;
28+
import org.apache.logging.log4j.LogManager;
29+
import org.apache.logging.log4j.Logger;
30+
import org.springframework.beans.factory.annotation.Qualifier;
31+
import org.springframework.stereotype.Repository;
32+
33+
import java.util.Objects;
34+
35+
/**
36+
* @author intr3p1d
37+
*/
38+
@Repository
39+
public class RedisInboundDao implements InboundDao {
40+
41+
private final Logger logger = LogManager.getLogger(this.getClass());
42+
43+
private final IgnoreStatFilter ignoreStatFilter;
44+
private final RedisBulkWriter bulkWriter;
45+
private final MapLinkConfiguration mapLinkConfiguration;
46+
47+
public RedisInboundDao(
48+
MapLinkConfiguration mapLinkConfiguration,
49+
IgnoreStatFilter ignoreStatFilter,
50+
@Qualifier("inboundBulkWriter") RedisBulkWriter bulkWriter
51+
) {
52+
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
53+
this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
54+
this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter");
55+
}
56+
57+
58+
@Override
59+
public void update(
60+
long requestTime,
61+
int srcServiceId, String srcApplicationName, ServiceType srcApplicationType,
62+
int destServiceId, String destApplicationName, ServiceType destApplicationType,
63+
String srcHost, int elapsed, boolean isError
64+
) {
65+
Objects.requireNonNull(srcApplicationName, "srcApplicationName");
66+
Objects.requireNonNull(destApplicationName, "destApplicationName");
67+
68+
if (logger.isDebugEnabled()) {
69+
logger.debug("[Inbound] {} {}({}) <- {} {}({})[{}]",
70+
destServiceId, destApplicationName, destApplicationType,
71+
srcServiceId, srcApplicationName, srcApplicationType, srcHost
72+
);
73+
}
74+
75+
if (ignoreStatFilter.filter(srcApplicationType, srcHost)) {
76+
logger.debug("[Ignore-Inbound] {} {}({}) <- {} {}({})[{}]",
77+
destServiceId, destApplicationName, destApplicationType,
78+
srcServiceId, srcApplicationName, srcApplicationType, srcHost
79+
);
80+
return;
81+
}
82+
83+
final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcApplicationType, elapsed, isError);
84+
HistogramSchema histogramSchema = srcApplicationType.getHistogramSchema();
85+
86+
// for inbound, main is destination
87+
// and sub is source
88+
final TimeSeriesKey applicationTypeKey = new TimeSeriesKey(
89+
ApplicationMapTable.Inbound, "tenantId",
90+
destServiceId, destApplicationName, destApplicationType.getCode(),
91+
srcServiceId, srcApplicationName, srcApplicationType.getCode(),
92+
srcSlotNumber
93+
);
94+
TimeSeriesValue addOne = new TimeSeriesValue(requestTime);
95+
this.bulkWriter.increment(applicationTypeKey, addOne);
96+
97+
if (mapLinkConfiguration.isEnableAvg()) {
98+
final TimeSeriesKey sumStatKey = new TimeSeriesKey(
99+
ApplicationMapTable.Inbound, "tenantId",
100+
destServiceId, destApplicationName, destApplicationType.getCode(),
101+
srcServiceId, srcApplicationName, srcApplicationType.getCode(),
102+
histogramSchema.getSumStatSlot().getSlotTime()
103+
);
104+
final TimeSeriesValue sumValue = new TimeSeriesValue(requestTime);
105+
this.bulkWriter.increment(sumStatKey, sumValue, elapsed);
106+
}
107+
if (mapLinkConfiguration.isEnableMax()) {
108+
final TimeSeriesKey maxStatKey = new TimeSeriesKey(
109+
ApplicationMapTable.Inbound, "tenantId",
110+
destServiceId, destApplicationName, destApplicationType.getCode(),
111+
srcServiceId, srcApplicationName, srcApplicationType.getCode(),
112+
histogramSchema.getMaxStatSlot().getSlotTime()
113+
);
114+
final TimeSeriesValue maxValue = new TimeSeriesValue(requestTime);
115+
this.bulkWriter.updateMax(maxStatKey, maxValue, elapsed);
116+
}
117+
118+
}
119+
120+
@Override
121+
public void flushLink() {
122+
this.bulkWriter.flushLink();
123+
}
124+
125+
@Override
126+
public void flushAvgMax() {
127+
this.bulkWriter.flushAvgMax();
128+
}
129+
130+
}

0 commit comments

Comments
 (0)