-
Notifications
You must be signed in to change notification settings - Fork 211
Expand file tree
/
Copy pathhandler.go
More file actions
220 lines (199 loc) · 7.19 KB
/
handler.go
File metadata and controls
220 lines (199 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package ms
import (
"context"
"database/sql"
"github.com/Ehco1996/ehco/pkg/metric_reader"
)
// NodeMetrics is a single host-level sample in the shape handed back to API
// clients. Its fields correspond one-to-one to the columns of the
// node_metrics table read by QueryNodeMetric.
type NodeMetrics struct {
	// Timestamp identifies the sample; AddNodeMetric stores SyncTime.Unix() here.
	Timestamp int64 `json:"timestamp"`
	// CPUUsage is filled from the reader's CpuUsagePercent.
	CPUUsage float64 `json:"cpu_usage"`
	// MemoryUsage is filled from the reader's MemoryUsagePercent.
	MemoryUsage float64 `json:"memory_usage"`
	// DiskUsage is filled from the reader's DiskUsagePercent.
	DiskUsage float64 `json:"disk_usage"`
	// NetworkIn is the receive rate in bytes per second.
	NetworkIn float64 `json:"network_in"`
	// NetworkOut is the transmit rate in bytes per second.
	NetworkOut float64 `json:"network_out"`
}
// QueryNodeMetricsReq describes which node samples QueryNodeMetric should
// return.
type QueryNodeMetricsReq struct {
	// StartTimestamp and EndTimestamp bound the query window; both ends
	// are inclusive (the query uses >= and <=).
	StartTimestamp, EndTimestamp int64
	// Num is passed to the SQL LIMIT clause and so caps the rows returned.
	Num int64
	// Step, when greater than 1, groups samples into Step-second buckets
	// and averages each gauge column within a bucket. This lets the SPA
	// request 7d/30d windows without pulling back hundreds of thousands
	// of raw points. Values of 0 or 1 return raw samples.
	Step int64
}
// QueryNodeMetricsResp is the result of QueryNodeMetric.
type QueryNodeMetricsResp struct {
	// TOTAL is the number of entries in Data (set from len(Data)), not the
	// number of rows that matched before the limit was applied.
	TOTAL int `json:"total"`
	// Data holds the returned samples in query order (newest first).
	Data []NodeMetrics `json:"data"`
}
// RuleMetricsData is one per-rule, per-remote sample in the shape handed back
// to API clients. Its fields correspond one-to-one to the columns of the
// rule_metrics table read by QueryRuleMetric.
type RuleMetricsData struct {
	// Timestamp identifies the sample; AddRuleMetric stores SyncTime.Unix() here.
	Timestamp int64 `json:"timestamp"`
	// Label is the label of the rule the sample belongs to.
	Label string `json:"label"`
	// Remote is the remote the sample was taken for.
	Remote string `json:"remote"`
	// PingLatency is the latency from the rule's ping metric for Remote.
	PingLatency int64 `json:"ping_latency"`

	// TCP counters for Remote.
	TCPConnectionCount      int64 `json:"tcp_connection_count"`
	TCPHandshakeDuration    int64 `json:"tcp_handshake_duration"`
	TCPNetworkTransmitBytes int64 `json:"tcp_network_transmit_bytes"`

	// UDP counters for Remote.
	UDPConnectionCount      int64 `json:"udp_connection_count"`
	UDPHandshakeDuration    int64 `json:"udp_handshake_duration"`
	UDPNetworkTransmitBytes int64 `json:"udp_network_transmit_bytes"`
}
// QueryRuleMetricsReq describes which rule samples QueryRuleMetric should
// return.
type QueryRuleMetricsReq struct {
	// RuleLabel and Remote are optional equality filters; an empty string
	// leaves the corresponding column unfiltered.
	RuleLabel, Remote string
	// StartTimestamp and EndTimestamp bound the query window (both
	// inclusive); Num is passed to the SQL LIMIT clause.
	StartTimestamp, EndTimestamp, Num int64
	// Step, when greater than 1, keeps one sample per (label, remote)
	// inside each Step-second bucket: the last one. Counter-style fields
	// (transmit bytes) therefore stay monotonic, so the SPA's
	// delta-on-consecutive-points trend math keeps working after
	// bucketing.
	Step int64
}
// QueryRuleMetricsResp is the result of QueryRuleMetric.
type QueryRuleMetricsResp struct {
	// TOTAL is the number of entries in Data (set from len(Data)), not the
	// number of rows that matched before the limit was applied.
	TOTAL int `json:"total"`
	// Data holds the returned samples in query order (newest first).
	Data []RuleMetricsData `json:"data"`
}
// AddNodeMetric writes one node-level sample to the node_metrics table using
// INSERT OR REPLACE, then bumps the in-memory node row counter by one.
//
// It returns the error from the insert unchanged; the counter is only
// touched when the insert succeeds.
func (ms *MetricsStore) AddNodeMetric(ctx context.Context, m *metric_reader.NodeMetrics) error {
	// track is called now; the function it returns runs when we return.
	defer track(&ms.stats.AddNode)()

	const insertSQL = `
INSERT OR REPLACE INTO node_metrics (timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out)
VALUES (?, ?, ?, ?, ?, ?)
`
	if _, err := ms.db.ExecContext(ctx, insertSQL,
		m.SyncTime.Unix(),
		m.CpuUsagePercent,
		m.MemoryUsagePercent,
		m.DiskUsagePercent,
		m.NetworkReceiveBytesRate,
		m.NetworkTransmitBytesRate,
	); err != nil {
		return err
	}

	// A REPLACE can overwrite an existing row instead of adding one, so
	// this counter may over-count. It is a best-effort figure that
	// recountRows reconciles on the next Vacuum / Truncate / restart.
	ms.nodeRows.Add(1)
	return nil
}
// AddRuleMetric writes one row per remote found in rm.PingMetrics to the
// rule_metrics table. All rows go through a single prepared INSERT OR REPLACE
// inside one transaction, so either every remote is stored or none is.
//
// On success the in-memory rule row counter is advanced by the number of
// statements executed. Any error (begin, prepare, exec, commit) is returned
// unchanged and leaves the counter untouched.
func (ms *MetricsStore) AddRuleMetric(ctx context.Context, rm *metric_reader.RuleMetrics) error {
	// track is called now; the function it returns runs when we return.
	defer track(&ms.stats.AddRule)()

	const insertSQL = `
INSERT OR REPLACE INTO rule_metrics
(timestamp, label, remote, ping_latency,
tcp_connection_count, tcp_handshake_duration, tcp_network_transmit_bytes,
udp_connection_count, udp_handshake_duration, udp_network_transmit_bytes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`

	tx, err := ms.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Only matters on the early-return paths below; once Commit has
	// succeeded the deferred Rollback has nothing left to undo.
	defer tx.Rollback() //nolint:errcheck

	insert, err := tx.PrepareContext(ctx, insertSQL)
	if err != nil {
		return err
	}
	defer insert.Close() //nolint:errcheck

	var written int64
	for remote, ping := range rm.PingMetrics {
		// The per-remote TCP/UDP maps are indexed with the same key; a
		// remote missing from one of them contributes that map's zero
		// value.
		if _, err = insert.ExecContext(ctx,
			rm.SyncTime.Unix(), rm.Label, remote, ping.Latency,
			rm.TCPConnectionCount[remote], rm.TCPHandShakeDuration[remote], rm.TCPNetworkTransmitBytes[remote],
			rm.UDPConnectionCount[remote], rm.UDPHandShakeDuration[remote], rm.UDPNetworkTransmitBytes[remote],
		); err != nil {
			return err
		}
		written++
	}

	if err = tx.Commit(); err != nil {
		return err
	}

	// As with AddNodeMetric, REPLACE may overwrite rather than add rows,
	// so this is a best-effort count that gets reconciled on
	// Vacuum / Truncate / restart.
	ms.ruleRows.Add(written)
	return nil
}
// QueryNodeMetric returns node samples whose timestamp lies in
// [req.StartTimestamp, req.EndTimestamp], newest first, limited to req.Num
// rows.
//
// When req.Step > 1 the samples are grouped into Step-second buckets: each
// returned entry carries the bucket's floored timestamp and the average of
// every gauge column inside that bucket. Otherwise the raw rows are returned.
//
// resp.TOTAL is the number of entries actually returned. Any query, scan or
// row-iteration error is returned as-is together with a nil response.
func (ms *MetricsStore) QueryNodeMetric(ctx context.Context, req *QueryNodeMetricsReq) (*QueryNodeMetricsResp, error) {
	// track is called now; the function it returns runs when we return.
	defer track(&ms.stats.QueryNode)()

	var (
		rows *sql.Rows
		err  error
	)
	if req.Step > 1 {
		// Floor each timestamp to a step-second bucket and average every
		// gauge field. Cheaper than rolling a separate downsample table
		// for the windows we care about (≤30d).
		rows, err = ms.db.QueryContext(ctx, `
SELECT (timestamp/?)*? AS bucket_ts,
AVG(cpu_usage), AVG(memory_usage), AVG(disk_usage),
AVG(network_in), AVG(network_out)
FROM node_metrics
WHERE timestamp >= ? AND timestamp <= ?
GROUP BY bucket_ts
ORDER BY bucket_ts DESC
LIMIT ?
`, req.Step, req.Step, req.StartTimestamp, req.EndTimestamp, req.Num)
	} else {
		rows, err = ms.db.QueryContext(ctx, `
SELECT timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out
FROM node_metrics
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp DESC
LIMIT ?
`, req.StartTimestamp, req.EndTimestamp, req.Num)
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close() //nolint:errcheck

	var resp QueryNodeMetricsResp
	for rows.Next() {
		var m NodeMetrics
		if err := rows.Scan(&m.Timestamp, &m.CPUUsage, &m.MemoryUsage, &m.DiskUsage, &m.NetworkIn, &m.NetworkOut); err != nil {
			return nil, err
		}
		resp.Data = append(resp.Data, m)
	}
	// rows.Next returns false both at end-of-data and on failure (for
	// example a cancelled context mid-iteration); without this check a
	// truncated result would be reported as success.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	resp.TOTAL = len(resp.Data)
	return &resp, nil
}
// QueryRuleMetric returns rule samples whose timestamp lies in
// [req.StartTimestamp, req.EndTimestamp], newest first, limited to req.Num
// rows. Non-empty req.RuleLabel and req.Remote add equality filters on the
// label and remote columns.
//
// When req.Step > 1 only one row per (Step-second bucket, label, remote) is
// returned: the one with the highest rowid in that group, which is intended
// to be the last sample of the bucket.
//
// resp.TOTAL is the number of entries actually returned. Any query, scan or
// row-iteration error is returned as-is together with a nil response.
func (ms *MetricsStore) QueryRuleMetric(ctx context.Context, req *QueryRuleMetricsReq) (*QueryRuleMetricsResp, error) {
	// track is called now; the function it returns runs when we return.
	defer track(&ms.stats.QueryRule)()

	// Bucketed mode keeps the last sample per (label, remote) inside each
	// step-second window. The bytes columns are monotonic counters, so
	// last-of-bucket preserves the deltas the SPA computes — averaging
	// would smear the curve.
	const cols = `timestamp, label, remote, ping_latency,
tcp_connection_count, tcp_handshake_duration, tcp_network_transmit_bytes,
udp_connection_count, udp_handshake_duration, udp_network_transmit_bytes`

	// Build the WHERE clause and its bind values side by side so the
	// placeholders and arguments cannot drift apart.
	whereSQL := "WHERE timestamp >= ? AND timestamp <= ?"
	whereArgs := []interface{}{req.StartTimestamp, req.EndTimestamp}
	if req.RuleLabel != "" {
		whereSQL += " AND label = ?"
		whereArgs = append(whereArgs, req.RuleLabel)
	}
	if req.Remote != "" {
		whereSQL += " AND remote = ?"
		whereArgs = append(whereArgs, req.Remote)
	}

	var query string
	var args []interface{}
	if req.Step > 1 {
		// The filters apply inside the sub-select; the outer query only
		// orders and limits the chosen rows.
		query = "SELECT " + cols + " FROM rule_metrics WHERE rowid IN (" +
			"SELECT MAX(rowid) FROM rule_metrics " + whereSQL +
			" GROUP BY (timestamp/?), label, remote) ORDER BY timestamp DESC LIMIT ?"
		args = append(append([]interface{}{}, whereArgs...), req.Step, req.Num)
	} else {
		query = "SELECT " + cols + " FROM rule_metrics " + whereSQL +
			" ORDER BY timestamp DESC LIMIT ?"
		args = append(whereArgs, req.Num)
	}

	// Use the context-aware variant so caller cancellation and deadlines
	// are honored, matching QueryNodeMetric.
	rows, err := ms.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close() //nolint:errcheck

	var resp QueryRuleMetricsResp
	for rows.Next() {
		var m RuleMetricsData
		if err := rows.Scan(&m.Timestamp, &m.Label, &m.Remote, &m.PingLatency,
			&m.TCPConnectionCount, &m.TCPHandshakeDuration, &m.TCPNetworkTransmitBytes,
			&m.UDPConnectionCount, &m.UDPHandshakeDuration, &m.UDPNetworkTransmitBytes); err != nil {
			return nil, err
		}
		resp.Data = append(resp.Data, m)
	}
	// rows.Next returns false both at end-of-data and on failure; surface
	// the latter instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	resp.TOTAL = len(resp.Data)
	return &resp, nil
}