-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathinit-collectors.js
More file actions
69 lines (64 loc) · 2.12 KB
/
init-collectors.js
File metadata and controls
69 lines (64 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { makeFetcher } from "./effects/fetchers.js"
import { makeMonitorStream } from "./effects/monitor-stream.js"
import { makeNdjsonWriter } from "./effects/ndjson-writer.js"
import { startCollector } from "./epics/collector-rx.js"
// Name of the monitor epic. Also used as the NDJSON file prefix for monitor
// output and as the monitor's key in `stoppers`.
const MONITOR = "monitor"
// Module-level registry of async stop functions keyed by epic name
// (written by startMonitor, read by stopMonitor).
const stoppers = {}
/**
 * Start the monitor stream and register a stop function for it.
 *
 * Looks up the epic named MONITOR in `cfg.epics`, creates an NDJSON writer
 * for `cfg.server.data_dir`, subscribes to the monitor stream and stores an
 * async stopper in the module-level `stoppers` registry. Calling it again
 * replaces the previously registered stopper.
 *
 * @param {object} cfg - Configuration; reads `cfg.epics` and `cfg.server.data_dir`.
 * @returns {void}
 * @throws {Error} If `cfg.epics` has no epic named MONITOR.
 */
const startMonitor = (cfg) => {
  // Resolve the epic before anything else is created, so a misconfigured
  // cfg fails with a clear message instead of a late TypeError.
  const monitorEpic = cfg.epics.find(e => e.name === MONITOR)
  if (!monitorEpic) {
    throw new Error(`no "${MONITOR}" epic found in cfg.epics`)
  }
  const tag = `[${monitorEpic.name}]`

  const nd = makeNdjsonWriter({
    dataDir: cfg.server.data_dir,
    filePrefix: MONITOR
  })

  const sink = {
    appendRows: async rows => {
      await nd.appendRows(rows)
      console.info(`${tag} wrote ${rows.length} logs to ${cfg.server.data_dir}/`)
    },
    // Invoke close on the writer itself so it keeps its receiver; a bare
    // `nd.close` reference would run with `this` bound to `sink`.
    close: () => nd.close()
  }

  const stream$ = makeMonitorStream(async logs => {
    await sink.appendRows(logs)
  }, monitorEpic)

  const subscription = stream$.subscribe({
    next: logs => console.info(`${tag} monitor cycle complete (${logs.length} logs)`),
    error: err => console.error(`${tag} monitor error:`, err),
    complete: () => console.info(`${tag} monitor completed`),
  })

  // Stopper: detach from the stream first, then close the writer.
  stoppers[monitorEpic.name] = async () => {
    console.info(`${tag} stopping monitor...`)
    subscription.unsubscribe()
    await sink.close()
  }
}
/**
 * Stop the monitor registered under MONITOR in the module-level `stoppers`.
 *
 * Resolves without doing anything when no stopper is registered (the monitor
 * was never started, or was already stopped), so it is safe to call from
 * shutdown paths unconditionally.
 *
 * @returns {Promise<void>} Settles once the registered stopper has finished.
 */
const stopMonitor = async () => {
  const stop = stoppers[MONITOR]
  if (typeof stop !== "function") return
  // Unregister before awaiting so a repeated or overlapping call does not
  // run the same stopper twice.
  delete stoppers[MONITOR]
  await stop()
}
/**
 * Run an initial fetch for every configured collector epic and start polling.
 *
 * Every entry in `cfg.epics` except the one named MONITOR, and whose `type`
 * has a matching entry on the fetcher built from `client`, gets:
 *   1. an NDJSON writer (prefix `file_prefix`, falling back to `name`),
 *   2. one immediate fetch whose rows are written out,
 *   3. a collector started via `startCollector`.
 * Epics are set up concurrently.
 *
 * If the initial fetch or write of any epic fails, that epic's writer is
 * closed and the returned promise rejects with the original error. Collectors
 * already started for other epics are NOT stopped here.
 *
 * @param {*} client - Passed through to `makeFetcher`.
 * @param {object} cfg - Reads `cfg.epics`, `cfg.server.data_dir`,
 *   `cfg.collector.batch_ms` and `cfg.collector.batch_max`.
 * @returns {Promise<object>} Object mapping epic name to whatever
 *   `startCollector` returned for it.
 */
const setupCollectors = async (client, cfg) => {
  const fetcher = makeFetcher(client)
  // Deliberately not called `stoppers`: that name is the module-level
  // registry, and this per-call result object is separate from it.
  const collectorStoppers = {}

  const setupOne = async epic => {
    const fn = fetcher[epic.type]
    const nd = makeNdjsonWriter({
      dataDir: cfg.server.data_dir,
      filePrefix: epic.file_prefix || epic.name
    })
    const sink = {
      appendRows: async rows => nd.appendRows(rows),
      // Forward to the writer so close keeps its receiver; a bare `nd.close`
      // reference would run with `this` bound to `sink`.
      close: (...args) => nd.close(...args)
    }

    try {
      // Prime the output with one fetch before polling starts.
      const rows = await fn()
      await sink.appendRows(rows)
    } catch (err) {
      // Do not leave the writer open when the collector never starts.
      try {
        await sink.close()
      } catch (closeErr) {
        console.error(`[${epic.name}] failed to close writer after setup error:`, closeErr)
      }
      throw err
    }

    collectorStoppers[epic.name] = startCollector({
      name: epic.name,
      pollMs: epic.poll_ms,
      fetch: fn,
      writer: sink,
      batchMs: cfg.collector.batch_ms,
      batchMax: cfg.collector.batch_max
    })
  }

  const collectorEpics = cfg.epics.filter(f => f.name !== MONITOR && fetcher[f.type])
  await Promise.all(collectorEpics.map(setupOne))
  return collectorStoppers
}
export { setupCollectors, startMonitor, stopMonitor }