Skip to content

Commit c950a4c

Browse files
byPixelTVCopilot
andcommitted
feat: add time range validation and historical data querying
- Introduced CalculateStepForTimeRange function for real-time updates without maxDataPoints. - Added QueryDataPointsWithStep function to fetch full aggregated series for fixed steps. - Implemented ValidateTimeRange and QueryHistoricalDataPoints for time range validation and querying. - Enhanced websocket handling with metrics for initial data latency, live fanout, and event counts. - Updated websocket tests to cover new subscription and data handling logic. Co-authored-by: Copilot <copilot@github.com>
1 parent e9697a8 commit c950a4c

9 files changed

Lines changed: 1124 additions & 103 deletions

File tree

data/query.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,35 @@ func CalculateOptimalStep(start string, maxDataPoints int) (string, error) {
114114
return CalculateOptimalStepWithMin(start, maxDataPoints, 10)
115115
}
116116

117+
// CalculateStepForTimeRange calculates the aggregation step for a time range without needing maxDataPoints
118+
// Used for quick step determination in real-time updates
119+
func CalculateStepForTimeRange(timeRange string) (string, error) {
120+
rangeInMinutes, err := timeToMinutes(timeRange)
121+
if err != nil {
122+
return "", err
123+
}
124+
rangeInMinutes = math.Abs(rangeInMinutes)
125+
126+
// Use sensible defaults for different ranges
127+
var stepMinutes float64
128+
switch {
129+
case rangeInMinutes <= 60: // <= 1 hour
130+
stepMinutes = 10.0 / 60.0 // 10 seconds
131+
case rangeInMinutes <= 360: // <= 6 hours
132+
stepMinutes = 1.0 // 1 minute
133+
case rangeInMinutes <= 1440: // <= 1 day
134+
stepMinutes = 4.0 // 4 minutes
135+
case rangeInMinutes <= 7200: // <= 5 days
136+
stepMinutes = 20.0 // 20 minutes
137+
case rangeInMinutes <= 10080: // <= 7 days
138+
stepMinutes = 30.0 // 30 minutes
139+
default:
140+
stepMinutes = 60.0 // 1 hour
141+
}
142+
143+
return minutesToTime(stepMinutes), nil
144+
}
145+
117146
// CalculateOptimalStepWithMin calculates the optimal step for a given time range
118147
// Uses adaptive strategy to ensure meaningful data points even with sparse data
119148
func CalculateOptimalStepWithMin(start string, maxDataPoints, minDataPoints int) (string, error) {

data/servers.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,67 @@ func QueryDataPoints(ip string, duration string) ([]ServerDataPoint, string, err
127127
return points, step, nil
128128
}
129129

130+
// QueryDataPointsWithStep returns the full aggregated series for a fixed step without additional downsampling.
131+
// This is used for websocket initial_data payloads where the frontend should receive the exact series
132+
// that matches the chosen chart range.
133+
func QueryDataPointsWithStep(ip string, duration string, step string) ([]ServerDataPoint, string, error) {
134+
queryApi := database.InfluxClient.QueryAPI(os.Getenv("INFLUXDB_ORG"))
135+
136+
query, err := BuildInfluxQuery(duration, step, ip)
137+
if err != nil {
138+
return nil, step, fmt.Errorf("failed to build query: %w", err)
139+
}
140+
141+
result, err := queryApi.Query(context.Background(), query)
142+
if err != nil {
143+
return nil, step, fmt.Errorf("query execution failed: %w", err)
144+
}
145+
defer func() { _ = result.Close() }()
146+
147+
var dataPoints []ServerDataPoint
148+
for result.Next() {
149+
record := result.Record()
150+
if record == nil {
151+
continue
152+
}
153+
154+
playerCount, ok := recordValueToInt(record.Value())
155+
if !ok {
156+
continue
157+
}
158+
159+
point := ServerDataPoint{
160+
Timestamp: record.Time().Unix(),
161+
PlayerCount: playerCount,
162+
Ip: record.ValueByKey("ip").(string),
163+
Name: record.ValueByKey("name").(string),
164+
}
165+
166+
if ip == "" || point.Ip == ip {
167+
dataPoints = append(dataPoints, point)
168+
}
169+
}
170+
171+
if result.Err() != nil {
172+
return nil, step, fmt.Errorf("result error: %w", result.Err())
173+
}
174+
175+
sort.Slice(dataPoints, func(i, j int) bool {
176+
if dataPoints[i].Timestamp != dataPoints[j].Timestamp {
177+
return dataPoints[i].Timestamp < dataPoints[j].Timestamp
178+
}
179+
if dataPoints[i].PlayerCount != dataPoints[j].PlayerCount {
180+
return dataPoints[i].PlayerCount < dataPoints[j].PlayerCount
181+
}
182+
if dataPoints[i].Ip != dataPoints[j].Ip {
183+
return dataPoints[i].Ip < dataPoints[j].Ip
184+
}
185+
return dataPoints[i].Name < dataPoints[j].Name
186+
})
187+
188+
return dataPoints, step, nil
189+
}
190+
130191
func durationLongerThanADay(duration string) bool {
131192
rangeInMinutes, err := timeToMinutes(duration)
132193
if err != nil {

data/time_range.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package data
2+
3+
import (
4+
"MineTracker/database"
5+
"context"
6+
"fmt"
7+
"os"
8+
"sort"
9+
"strings"
10+
)
11+
12+
var allowedTimeRanges = map[string]string{
13+
"1h": "10s",
14+
"6h": "1m",
15+
"12h": "2m",
16+
"24h": "4m",
17+
"7d": "30m",
18+
"30d": "2h",
19+
"1y": "1d",
20+
}
21+
22+
// ValidateTimeRange returns the canonical time range and the exact aggregation step.
23+
func ValidateTimeRange(timeRange string) (string, string, error) {
24+
normalized := strings.TrimSpace(timeRange)
25+
step, ok := allowedTimeRanges[normalized]
26+
if !ok {
27+
return "", "", fmt.Errorf("unsupported time range: %s", timeRange)
28+
}
29+
return normalized, step, nil
30+
}
31+
32+
// QueryHistoricalDataPoints queries the historical series for a validated time range.
33+
// The returned step is the exact aggregation step used in the query.
34+
func QueryHistoricalDataPoints(ip string, timeRange string) ([]ServerDataPoint, string, error) {
35+
canonicalRange, step, err := ValidateTimeRange(timeRange)
36+
if err != nil {
37+
return nil, "", err
38+
}
39+
40+
query, _, resolvedStep, err := BuildInfluxQueryFromParams(QueryParams{
41+
Start: "-" + canonicalRange,
42+
Step: step,
43+
ServerFilter: ip,
44+
MaxDataPoints: 5000,
45+
MinDataPoints: 1,
46+
UseAdaptive: false,
47+
})
48+
if err != nil {
49+
return nil, "", err
50+
}
51+
52+
queryApi := database.InfluxClient.QueryAPI(os.Getenv("INFLUXDB_ORG"))
53+
result, err := queryApi.Query(context.Background(), query)
54+
if err != nil {
55+
return nil, "", fmt.Errorf("query execution failed: %w", err)
56+
}
57+
defer func() { _ = result.Close() }()
58+
59+
points := make([]ServerDataPoint, 0, 512)
60+
for result.Next() {
61+
record := result.Record()
62+
if record == nil {
63+
continue
64+
}
65+
66+
playerCount, ok := recordValueToInt(record.Value())
67+
if !ok {
68+
continue
69+
}
70+
71+
point := ServerDataPoint{
72+
Timestamp: record.Time().Unix(),
73+
PlayerCount: playerCount,
74+
Ip: record.ValueByKey("ip").(string),
75+
Name: record.ValueByKey("name").(string),
76+
}
77+
if ip == "" || point.Ip == ip {
78+
points = append(points, point)
79+
}
80+
}
81+
82+
if result.Err() != nil {
83+
return nil, "", fmt.Errorf("result error: %w", result.Err())
84+
}
85+
86+
sort.Slice(points, func(i, j int) bool {
87+
if points[i].Timestamp != points[j].Timestamp {
88+
return points[i].Timestamp < points[j].Timestamp
89+
}
90+
if points[i].PlayerCount != points[j].PlayerCount {
91+
return points[i].PlayerCount < points[j].PlayerCount
92+
}
93+
if points[i].Ip != points[j].Ip {
94+
return points[i].Ip < points[j].Ip
95+
}
96+
return points[i].Name < points[j].Name
97+
})
98+
99+
return points, resolvedStep, nil
100+
}

data/time_range_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package data
2+
3+
import "testing"
4+
5+
func TestValidateTimeRange(t *testing.T) {
6+
tests := []struct {
7+
name string
8+
timeRange string
9+
wantStep string
10+
wantErr bool
11+
}{
12+
{name: "valid 1h", timeRange: "1h", wantStep: "10s"},
13+
{name: "valid 7d", timeRange: "7d", wantStep: "30m"},
14+
{name: "invalid", timeRange: "168h", wantErr: true},
15+
}
16+
17+
for _, tt := range tests {
18+
t.Run(tt.name, func(t *testing.T) {
19+
_, step, err := ValidateTimeRange(tt.timeRange)
20+
if tt.wantErr {
21+
if err == nil {
22+
t.Fatal("expected error")
23+
}
24+
return
25+
}
26+
if err != nil {
27+
t.Fatalf("unexpected error: %v", err)
28+
}
29+
if step != tt.wantStep {
30+
t.Fatalf("step = %q, want %q", step, tt.wantStep)
31+
}
32+
})
33+
}
34+
}

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"MineTracker/util"
99
"MineTracker/websocket"
1010
"context"
11+
"expvar"
1112
"net/http"
1213
_ "net/http/pprof"
1314
"os"
@@ -87,6 +88,7 @@ func main() {
8788
routes.RegisterGetBulkDatedDataRoute(r)
8889
routes.RegisterGetServers(r)
8990
routes.RegisterGetVersionRoute(r)
91+
r.GET("/debug/vars", gin.WrapH(expvar.Handler()))
9092

9193
r.GET("/ws", func(c *gin.Context) {
9294
websocket.HandleWebSocket(c.Writer, c.Request)

task/pingJob.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -456,15 +456,14 @@ func (j *PingJob) pingServer(server data.PingableServer, pinger serverPinger) {
456456

457457
pc := resp.PlayerCount
458458

459-
websocket.GlobalHub.SendToServer(server.IP, map[string]interface{}{
460-
"type": "data_point_rt",
461-
"data": data.ServerDataPoint{
462-
Timestamp: time.Now().Unix(),
463-
PlayerCount: pc,
464-
Ip: server.IP,
465-
Name: server.Name,
466-
},
467-
})
459+
dataPoint := data.ServerDataPoint{
460+
Timestamp: time.Now().Unix(),
461+
PlayerCount: pc,
462+
Ip: server.IP,
463+
Name: server.Name,
464+
}
465+
466+
websocket.GlobalHub.SendRawDataPoint(server.IP, dataPoint)
468467

469468
serverCacheMu.RLock()
470469
existing, found := serverCacheMap[server.IP]

websocket/metrics.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package websocket
2+
3+
import (
4+
"expvar"
5+
"time"
6+
)
7+
8+
var (
9+
metricInitialDataLatencyMs = expvar.NewInt("ws_initial_data_latency_ms")
10+
metricLiveFanoutMs = expvar.NewInt("ws_live_fanout_ms")
11+
metricSubscriptionCount = expvar.NewInt("ws_subscription_count")
12+
metricDroppedEvents = expvar.NewInt("ws_dropped_events_total")
13+
metricOutOfOrderEvents = expvar.NewInt("ws_out_of_order_events_total")
14+
metricEventsSentTotal = expvar.NewMap("ws_events_sent_total")
15+
)
16+
17+
func recordInitialDataLatency(duration time.Duration) {
18+
metricInitialDataLatencyMs.Set(duration.Milliseconds())
19+
}
20+
21+
func recordLiveFanout(duration time.Duration) {
22+
metricLiveFanoutMs.Set(duration.Milliseconds())
23+
}
24+
25+
func recordSubscriptionAdded() {
26+
metricSubscriptionCount.Add(1)
27+
}
28+
29+
func recordSubscriptionRemoved() {
30+
metricSubscriptionCount.Add(-1)
31+
}
32+
33+
func recordDroppedEvent() {
34+
metricDroppedEvents.Add(1)
35+
}
36+
37+
func recordOutOfOrderEvent() {
38+
metricOutOfOrderEvents.Add(1)
39+
}
40+
41+
func recordEventSent(eventType string) {
42+
metricEventsSentTotal.Add(eventType, 1)
43+
}

0 commit comments

Comments
 (0)