Skip to content

Commit c26f546

Browse files
authored
Merge pull request #33 from risingglory/fix-30day-query-chunking
Fix 30-day query failures with smart chunking and sampling
2 parents ac7dd20 + feb1e6b commit c26f546

File tree

4 files changed

+134
-37
lines changed

4 files changed

+134
-37
lines changed

backend/internal/handlers/handlers.go

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/gin-gonic/gin"
99
"github.com/rajsinghtech/tsflow/backend/internal/services"
10+
tailscale "tailscale.com/client/tailscale/v2"
1011
)
1112

1213
type Handlers struct {
@@ -107,8 +108,12 @@ func (h *Handlers) GetNetworkLogs(c *gin.Context) {
107108
}
108109

109110
duration := et.Sub(st)
110-
if duration > 24*time.Hour {
111-
chunks, err := h.tailscaleService.GetNetworkLogsChunkedParallel(start, end, 6*time.Hour, 3)
111+
// Use chunking for queries longer than 7 days to prevent response size issues
112+
if duration > 7*24*time.Hour {
113+
// Use smaller chunks and fewer parallel requests for 30+ day queries
114+
chunkSize := 24 * time.Hour // 1-day chunks to prevent timeouts
115+
maxParallel := 2 // Reduce parallel requests to prevent memory issues
116+
chunks, err := h.tailscaleService.GetNetworkLogsChunkedParallel(start, end, chunkSize, maxParallel)
112117
if err != nil {
113118
c.JSON(http.StatusInternalServerError, gin.H{
114119
"error": "Failed to fetch network logs",
@@ -119,25 +124,66 @@ func (h *Handlers) GetNetworkLogs(c *gin.Context) {
119124
}
120125

121126
var allLogs []interface{}
127+
maxLogs := 10000 // Limit total logs to prevent memory issues
128+
122129
for _, chunk := range chunks {
123130
if logsArray, ok := chunk.([]interface{}); ok {
131+
if len(allLogs)+len(logsArray) > maxLogs {
132+
// Truncate if we're approaching the limit
133+
remaining := maxLogs - len(allLogs)
134+
if remaining > 0 {
135+
allLogs = append(allLogs, logsArray[:remaining]...)
136+
}
137+
break
138+
}
124139
allLogs = append(allLogs, logsArray...)
125140
} else if logsMap, ok := chunk.(map[string]interface{}); ok {
126141
if logs, exists := logsMap["logs"]; exists {
127142
if logsArray, ok := logs.([]interface{}); ok {
143+
if len(allLogs)+len(logsArray) > maxLogs {
144+
// Truncate if we're approaching the limit
145+
remaining := maxLogs - len(allLogs)
146+
if remaining > 0 {
147+
allLogs = append(allLogs, logsArray[:remaining]...)
148+
}
149+
break
150+
}
128151
allLogs = append(allLogs, logsArray...)
152+
} else if logsArray, ok := logs.([]tailscale.NetworkFlowLog); ok {
153+
// Convert []NetworkFlowLog to []interface{}
154+
for _, log := range logsArray {
155+
allLogs = append(allLogs, log)
156+
}
129157
}
130158
}
131159
}
132160
}
133-
161+
162+
// If we have too many logs, sample them to prevent response size issues
163+
finalLogs := allLogs
164+
if len(allLogs) > 50000 {
165+
// Sample every Nth log to get approximately 50,000 logs
166+
sampleRate := len(allLogs) / 50000
167+
if sampleRate < 1 {
168+
sampleRate = 1
169+
}
170+
171+
sampledLogs := make([]interface{}, 0, 50000)
172+
for i := 0; i < len(allLogs); i += sampleRate {
173+
sampledLogs = append(sampledLogs, allLogs[i])
174+
}
175+
finalLogs = sampledLogs
176+
}
177+
134178
c.JSON(http.StatusOK, gin.H{
135-
"logs": allLogs,
179+
"logs": finalLogs,
136180
"metadata": gin.H{
137181
"chunked": true,
138182
"chunks": len(chunks),
139183
"duration": duration.String(),
140184
"totalLogs": len(allLogs),
185+
"sampled": len(finalLogs) < len(allLogs),
186+
"sampleRate": len(allLogs) / len(finalLogs),
141187
},
142188
})
143189
return
@@ -152,10 +198,18 @@ func (h *Handlers) GetNetworkLogs(c *gin.Context) {
152198
return
153199
}
154200

155-
log.Printf("SUCCESS GetNetworkLogs: returned logs for %s to %s", start, end)
156201
c.JSON(http.StatusOK, logs)
157202
}
158203

204+
// Helper function to get map keys
205+
func getMapKeys(m map[string]interface{}) []string {
206+
keys := make([]string, 0, len(m))
207+
for k := range m {
208+
keys = append(keys, k)
209+
}
210+
return keys
211+
}
212+
159213
func (h *Handlers) GetNetworkMap(c *gin.Context) {
160214
networkMap, err := h.tailscaleService.GetNetworkMap()
161215
if err != nil {

backend/internal/services/tailscale.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func NewTailscaleService(cfg *config.Config) *TailscaleService {
8787
} else if cfg.TailscaleAPIKey != "" {
8888
ts.apiKey = cfg.TailscaleAPIKey
8989
ts.client = &http.Client{
90-
Timeout: 5 * time.Minute,
90+
Timeout: 30 * time.Minute, // Much longer timeout for large requests
9191
}
9292
ts.tsClient = &tailscale.Client{
9393
APIKey: cfg.TailscaleAPIKey,
@@ -96,7 +96,7 @@ func NewTailscaleService(cfg *config.Config) *TailscaleService {
9696
ts.useOAuth = false
9797
} else {
9898
ts.client = &http.Client{
99-
Timeout: 5 * time.Minute,
99+
Timeout: 30 * time.Minute, // Much longer timeout for large requests
100100
}
101101
}
102102

@@ -243,19 +243,27 @@ func (ts *TailscaleService) GetDevices() (*DevicesResponse, error) {
243243
}
244244

245245
func (ts *TailscaleService) GetNetworkLogs(start, end string) (interface{}, error) {
246+
// Parse time range to determine if we need chunking
247+
startTime, err := time.Parse(time.RFC3339, start)
248+
if err != nil {
249+
return nil, fmt.Errorf("invalid start time: %w", err)
250+
}
251+
252+
endTime, err := time.Parse(time.RFC3339, end)
253+
if err != nil {
254+
return nil, fmt.Errorf("invalid end time: %w", err)
255+
}
256+
257+
258+
// For smaller ranges, use the original approach
246259
if ts.tsClient != nil {
247-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
248-
defer cancel()
249-
250-
startTime, err := time.Parse(time.RFC3339, start)
251-
if err != nil {
252-
return nil, fmt.Errorf("invalid start time: %w", err)
253-
}
254-
255-
endTime, err := time.Parse(time.RFC3339, end)
256-
if err != nil {
257-
return nil, fmt.Errorf("invalid end time: %w", err)
260+
// Use much longer timeout for larger time ranges
261+
timeoutDuration := 10 * time.Minute
262+
if endTime.Sub(startTime) > 7*24*time.Hour {
263+
timeoutDuration = 30 * time.Minute // Much longer timeout for 30+ day queries
258264
}
265+
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
266+
defer cancel()
259267

260268
var logs []tailscale.NetworkFlowLog
261269

@@ -271,6 +279,7 @@ func (ts *TailscaleService) GetNetworkLogs(start, end string) (interface{}, erro
271279
return nil, fmt.Errorf("failed to fetch network logs from tailscale client: %w", err)
272280
}
273281

282+
274283
return map[string]interface{}{
275284
"logs": logs,
276285
}, nil
@@ -283,7 +292,12 @@ func (ts *TailscaleService) GetNetworkLogs(start, end string) (interface{}, erro
283292
endpoint += fmt.Sprintf("?start=%s&end=%s", url.QueryEscape(start), url.QueryEscape(end))
284293
}
285294

286-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
295+
// Use much longer timeout for larger time ranges
296+
timeoutDuration := 10 * time.Minute
297+
if endTime.Sub(startTime) > 7*24*time.Hour {
298+
timeoutDuration = 30 * time.Minute // Much longer timeout for 30+ day queries
299+
}
300+
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
287301
defer cancel()
288302

289303
body, err := ts.makeRequest(ctx, endpoint)
@@ -296,7 +310,17 @@ func (ts *TailscaleService) GetNetworkLogs(start, end string) (interface{}, erro
296310
return nil, fmt.Errorf("failed to unmarshal network logs response: %w", err)
297311
}
298312

299-
return response, nil
313+
// Ensure consistent response format
314+
if responseMap, ok := response.(map[string]interface{}); ok {
315+
if logs, exists := responseMap["logs"]; exists {
316+
return map[string]interface{}{
317+
"logs": logs,
318+
}, nil
319+
}
320+
}
321+
return map[string]interface{}{
322+
"logs": response,
323+
}, nil
300324
}
301325

302326
// GetNetworkLogsChunked retrieves network logs in chunks for large time ranges
@@ -660,3 +684,12 @@ func (ts *TailscaleService) GetStaticRecords() (map[string]StaticRecordInfo, err
660684

661685
return response.Records, nil
662686
}
687+
688+
// Helper function to get map keys for debugging
689+
func getMapKeys(m map[string]interface{}) []string {
690+
keys := make([]string, 0, len(m))
691+
for k := range m {
692+
keys = append(keys, k)
693+
}
694+
return keys
695+
}

frontend/src/components/LogViewer.tsx

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,35 +112,41 @@ const processLogsWorker = (networkLogs: NetworkFlowLog[], devices: Device[]) =>
112112
const log = logs[i]
113113
const processTraffic = (traffic: TrafficFlow[], type: 'virtual' | 'subnet' | 'physical') => {
114114
traffic.forEach((flow) => {
115-
const srcIP = extractIP(flow.src)
116-
const dstIP = extractIP(flow.dst)
117-
const timestamp = new Date(log.logged)
115+
// Handle both capitalized and lowercase field names
116+
const srcIP = extractIP((flow as any).Src || (flow as any).src)
117+
const dstIP = extractIP((flow as any).Dst || (flow as any).dst)
118+
const timestamp = new Date((log as any).Logged || (log as any).logged)
118119

119120
entries.push({
120-
id: `${log.nodeId}-${entryId++}`,
121-
timestamp: log.logged,
121+
id: `${(log as any).NodeID || (log as any).nodeId}-${entryId++}`,
122+
timestamp: (log as any).Logged || (log as any).logged,
122123
timestampMs: timestamp.getTime(),
123-
nodeId: log.nodeId,
124+
nodeId: (log as any).NodeID || (log as any).nodeId,
124125
srcDevice: getDeviceName(srcIP, devices),
125126
dstDevice: getDeviceName(dstIP, devices),
126127
srcIP,
127128
dstIP,
128-
srcPort: extractPort(flow.src) || undefined,
129-
dstPort: extractPort(flow.dst) || undefined,
130-
protocol: getProtocolName(flow.proto),
129+
srcPort: extractPort((flow as any).Src || (flow as any).src) || undefined,
130+
dstPort: extractPort((flow as any).Dst || (flow as any).dst) || undefined,
131+
protocol: getProtocolName((flow as any).Proto || (flow as any).proto),
131132
trafficType: type,
132-
txBytes: flow.txBytes || 0,
133-
rxBytes: flow.rxBytes || 0,
134-
txPackets: flow.txPackets || 0,
135-
rxPackets: flow.rxPackets || 0,
133+
txBytes: (flow as any).TxBytes || (flow as any).txBytes || 0,
134+
rxBytes: (flow as any).RxBytes || (flow as any).rxBytes || 0,
135+
txPackets: (flow as any).TxPkts || (flow as any).txPackets || 0,
136+
rxPackets: (flow as any).RxPkts || (flow as any).rxPackets || 0,
136137
tags: []
137138
})
138139
})
139140
}
140141

141-
if (log.virtualTraffic) processTraffic(log.virtualTraffic, 'virtual')
142-
if (log.subnetTraffic) processTraffic(log.subnetTraffic, 'subnet')
143-
if (log.physicalTraffic) processTraffic(log.physicalTraffic, 'physical')
142+
// Handle both capitalized and lowercase field names
143+
const virtualTraffic = (log as any).VirtualTraffic || (log as any).virtualTraffic
144+
const subnetTraffic = (log as any).SubnetTraffic || (log as any).subnetTraffic
145+
const physicalTraffic = (log as any).PhysicalTraffic || (log as any).physicalTraffic
146+
147+
if (virtualTraffic) processTraffic(virtualTraffic, 'virtual')
148+
if (subnetTraffic) processTraffic(subnetTraffic, 'subnet')
149+
if (physicalTraffic) processTraffic(physicalTraffic, 'physical')
144150
}
145151

146152
if (endIndex < logs.length) {

frontend/src/pages/NetworkView.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,11 @@ const NetworkView: React.FC = () => {
354354
}
355355

356356
// Validate that we have actual log entries
357-
return logsArray.filter(log => log && typeof log === 'object' && 'logged' in log)
357+
const validLogs = logsArray.filter(log => {
358+
return log && typeof log === 'object' && ('logged' in log || 'Logged' in log)
359+
})
360+
361+
return validLogs
358362
}, [networkLogsData])
359363

360364
// Set default date range to show most recent data (last 5 minutes)

0 commit comments

Comments
 (0)