Skip to content

Commit 6d36bc1

Browse files
Poll ES stats and log same
1 parent ca85f91 commit 6d36bc1

File tree

3 files changed

+122
-1
lines changed

3 files changed

+122
-1
lines changed

graphql-api/src/app.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import logger from './logger'
1010

1111
import { loadWhitelist } from './whitelist'
1212

13+
import startEsStatsPolling from './esPoll'
14+
15+
const STATS_POLL_INTERVAL = 5000
16+
1317
process.on('uncaughtException', (error) => {
1418
logger.error(error)
1519
process.exit(1)
@@ -83,4 +87,6 @@ const context = { esClient }
8387

8488
app.use('/api/', graphQLApi({ context }))
8589

90+
startEsStatsPolling(STATS_POLL_INTERVAL)
91+
8692
app.listen(config.PORT)

graphql-api/src/elasticsearch.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ if (config.ELASTICSEARCH_USERNAME || config.ELASTICSEARCH_PASSWORD) {
2525
}
2626
}
2727

28-
const elastic = new elasticsearch.Client(elasticsearchConfig)
28+
export const createUnlimitedElasticClient = () => new elasticsearch.Client(elasticsearchConfig)
29+
const elastic = createUnlimitedElasticClient()
2930

3031
const esLimiter = new Bottleneck({
3132
maxConcurrent: config.MAX_CONCURRENT_ELASTICSEARCH_REQUESTS,

graphql-api/src/esPoll.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import logger from './logger'
2+
3+
const GC_POOL_NAMES = ['old', 'survivor', 'young']
4+
const GC_METRICS = ['max_in_bytes', 'peak_max_in_bytes', 'peak_used_in_bytes', 'used_in_bytes']
5+
const LOAD_AVERAGE_METRICS = ['1m', '5m', '15m']
6+
7+
import { createUnlimitedElasticClient } from './elasticsearch'
8+
const isFulfilled = <T>(promise: PromiseSettledResult<T>): promise is PromiseFulfilledResult<T> =>
9+
promise.status === 'fulfilled'
10+
11+
const isRejected = (promise: PromiseSettledResult<unknown>): promise is PromiseRejectedResult =>
12+
!isFulfilled(promise)
13+
14+
const hasNonzeroTotalMetrics = (indexData: any) => {
15+
const searchMetrics = indexData?.['total']?.['search'] || {}
16+
const getMetrics = indexData?.['total']?.['get'] || {}
17+
return (
18+
Object.keys(searchMetrics).some((key) => key.endsWith('_total') && searchMetrics[key] > 0) ||
19+
Object.keys(getMetrics).some((key) => key.endsWith('_total') && getMetrics[key] > 0)
20+
)
21+
}
22+
23+
const filterIndexStats = (indexStatsPromise: PromiseSettledResult<any>) => {
24+
if (isRejected(indexStatsPromise)) {
25+
return {}
26+
}
27+
// Internal indices start with a dot, we don't want those as a rule
28+
const indexData = indexStatsPromise.value?.indices || {}
29+
const indexNames = Object.keys(indexData)
30+
const nonInternalIndexNames = indexNames.filter((indexName) => indexName[0] !== '.')
31+
32+
// Also ignore indices that aren't getting queried (we should clean
33+
// these up)
34+
const indicesWithMetrics = nonInternalIndexNames.filter((indexName) =>
35+
hasNonzeroTotalMetrics(indexData?.[indexName])
36+
)
37+
38+
const allResult = indexStatsPromise.value?.['_all']?.['total']
39+
let result: any = { _all: allResult }
40+
indicesWithMetrics.forEach((indexName) => (result[indexName] = indexData[indexName]['total']))
41+
return result
42+
}
43+
44+
const filterNodeStats = (nodeStatsPromise: PromiseSettledResult<any>) => {
45+
if (isRejected(nodeStatsPromise)) {
46+
return {}
47+
}
48+
const allNodesData = nodeStatsPromise.value?.['nodes'] || null
49+
if (!allNodesData) {
50+
return {}
51+
}
52+
const nodeNames = Object.keys(allNodesData)
53+
const memKeys = Object.keys(allNodesData[nodeNames[0]]?.jvm?.mem)
54+
const cpuKeys = Object.keys(allNodesData[nodeNames[0]]?.os?.cpu)
55+
const deepMemData = nodeNames.map((nodeName) => allNodesData[nodeName]?.jvm?.mem)
56+
const deepCpuData = nodeNames.map((nodeName) => allNodesData[nodeName]?.os?.cpu)
57+
58+
const wideMemData = Object.fromEntries(
59+
memKeys.map((memKey) => [
60+
memKey,
61+
deepMemData.map((deepMemDataForNode) => deepMemDataForNode[memKey]),
62+
])
63+
)
64+
65+
const wideCpuData = Object.fromEntries(
66+
cpuKeys.map((cpuKey) => [
67+
cpuKey,
68+
deepCpuData.map((deepCpuDataForNode) => deepCpuDataForNode[cpuKey]),
69+
])
70+
)
71+
72+
let { pools, ...memData }: any = {
73+
...wideMemData,
74+
}
75+
76+
GC_POOL_NAMES.forEach((poolName) => {
77+
GC_METRICS.forEach((metric) => {
78+
memData[`${poolName}_${metric}`] = pools.map(
79+
(poolsForNode: any) => poolsForNode[poolName][metric]
80+
)
81+
})
82+
})
83+
84+
let { load_average, ...cpuData }: any = { ...wideCpuData }
85+
LOAD_AVERAGE_METRICS.forEach((metric) => {
86+
cpuData[`load_average_${metric}`] = load_average.map(
87+
(loadAverageForNode: any) => loadAverageForNode[metric]
88+
)
89+
})
90+
91+
return { nodeNames, jvm: { mem: memData }, os: { cpu: cpuData } }
92+
}
93+
94+
const startEsStatsPolling = (pollInterval: number) => {
95+
const statsEsClient = createUnlimitedElasticClient()
96+
const scheduleEsStatsPoll = async () => {
97+
const nodeStatsPromise = statsEsClient.nodes.stats({}).then((stats) => stats.body)
98+
const indexStatsPromise = statsEsClient.indices
99+
.stats({ metric: ['get', 'search'] })
100+
.then((stats) => stats.body)
101+
102+
Promise.allSettled([indexStatsPromise, nodeStatsPromise]).then(([indexStats, nodeStats]) => {
103+
logger.info({ esMetrics: { type: 'index', indexStats: filterIndexStats(indexStats) } })
104+
logger.info({
105+
esMetrics: { type: 'node', nodeStats: filterNodeStats(nodeStats) },
106+
})
107+
setTimeout(scheduleEsStatsPoll, pollInterval)
108+
})
109+
}
110+
111+
scheduleEsStatsPoll()
112+
}
113+
114+
export default startEsStatsPolling

0 commit comments

Comments
 (0)