forked from valkey-io/valkey-admin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinit-collectors.js
More file actions
80 lines (72 loc) · 2.4 KB
/
init-collectors.js
File metadata and controls
80 lines (72 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { makeFetcher } from "./effects/fetchers.js"
import { makeMonitorStream } from "./effects/monitor-stream.js"
import { makeNdjsonWriter } from "./effects/ndjson-writer.js"
import { startCollector } from "./epics/collector-rx.js"
import { loadConfig } from "./config.js"
const cfg = loadConfig()
const setupCollectors = async client => {
const fetcher = makeFetcher(client)
const stoppers = {}
// here we start data collection epics per each config with corresponding stat fetchers
for (const f of cfg.epics) {
let fn, sink
const nd = makeNdjsonWriter({
dataDir: cfg.server.data_dir,
filePrefix: f.file_prefix || f.name
})
if(f.type === "monitor"){
sink = {
appendRows: async rows => {
await nd.appendRows(rows)
console.info(`[${f.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${f.file_prefix || f.name}`)
},
close: nd.close
}
const stream$ = makeMonitorStream(async logs => {
await sink.appendRows(logs)
}, f)
const subscription = stream$.subscribe({
next: logs => console.info(`[${f.name}] monitor cycle complete (${logs.length} logs)`),
error: err => console.error(`[${f.name}] monitor error:`, err),
complete: () => console.info(`[${f.name}] monitor completed`),
})
stoppers[f.name] = async () => {
console.info(`[${f.name}] stopping monitor...`)
subscription.unsubscribe()
await sink.close()
}
}
else {
fn = fetcher[f.type]
if (!fn) {
console.warn(`unknown epic type ${f.type} for ${f.name}, skipping`)
continue
}
// write NDJSON files; if we need to ingest into memory for some reason — do it here
sink = {
appendRows: async rows => {
await nd.appendRows(rows)
},
close: nd.close
}
// collect the first values immediately
try {
const rows = await fn()
await sink.appendRows(rows)
} catch (e) {
console.error(`[${f.name}] error`, e?.message || e)
}
// then start a corresponding epic to poll on `pollMs` interval
stoppers[f.name] = startCollector({
name: f.name,
pollMs: f.poll_ms,
fetch: fn,
writer: sink,
batchMs: cfg.collector.batch_ms,
batchMax: cfg.collector.batch_max
})
}
}
return stoppers
}
export { setupCollectors }