Skip to content

Commit b82cf32

Browse files
committed
Add raw flow logs endpoint and filtering capabilities to API; enhance flow data processing and visualization
1 parent 289abc5 commit b82cf32

File tree

5 files changed

+1184
-263
lines changed

5 files changed

+1184
-263
lines changed

client/tailscale.go

Lines changed: 221 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package client
22

33
import (
4+
"crypto/md5"
45
"encoding/json"
56
"fmt"
67
"io"
78
"net/http"
89
"net/url"
10+
"sort"
911
"strings"
1012
"time"
1113
"tsflow/models"
@@ -92,11 +94,12 @@ func (c *TailscaleClient) GetDevices() (*models.DevicesResponse, error) {
9294
return &devicesResponse, nil
9395
}
9496

95-
// ProcessFlowData processes raw network logs into structured flow data with aggregation
97+
// ProcessFlowData processes raw network logs into both raw flow entries and aggregated flow data
9698
func (c *TailscaleClient) ProcessFlowData(logs *models.NetworkLogResponse, devices *models.DevicesResponse) *models.NetworkMap {
9799
deviceMap := make(map[string]*models.Device)
98100
ipToDevice := make(map[string]*models.Device)
99101

102+
// Build device maps
100103
for i := range devices.Devices {
101104
device := &devices.Devices[i]
102105
deviceMap[device.ID] = device
@@ -107,9 +110,11 @@ func (c *TailscaleClient) ProcessFlowData(logs *models.NetworkLogResponse, devic
107110
}
108111
}
109112

113+
var rawFlows []models.RawFlowEntry
110114
flowAggregator := make(map[string]*models.FlowData)
111115
var timeStart, timeEnd time.Time
112116

117+
// Process each log entry to create raw flows
113118
for _, log := range logs.Logs {
114119
if timeStart.IsZero() || log.Start.Before(timeStart) {
115120
timeStart = log.Start
@@ -118,41 +123,224 @@ func (c *TailscaleClient) ProcessFlowData(logs *models.NetworkLogResponse, devic
118123
timeEnd = log.End
119124
}
120125

126+
// Process virtual traffic
121127
for _, flow := range log.VirtualTraffic {
128+
rawFlow := c.createRawFlowEntry(flow, "virtual", log, ipToDevice)
129+
rawFlows = append(rawFlows, rawFlow)
122130
c.aggregateFlow(flow, "virtual", log, ipToDevice, flowAggregator)
123131
}
124132

133+
// Process physical traffic
125134
for _, flow := range log.PhysicalTraffic {
135+
rawFlow := c.createRawFlowEntry(flow, "physical", log, ipToDevice)
136+
rawFlows = append(rawFlows, rawFlow)
126137
c.aggregateFlow(flow, "physical", log, ipToDevice, flowAggregator)
127138
}
128139

140+
// Process subnet traffic
129141
for _, flow := range log.SubnetTraffic {
142+
rawFlow := c.createRawFlowEntry(flow, "subnet", log, ipToDevice)
143+
rawFlows = append(rawFlows, rawFlow)
130144
c.aggregateFlow(flow, "subnet", log, ipToDevice, flowAggregator)
131145
}
132146
}
133147

148+
// Convert aggregated flows to slice and sort by bytes
134149
var flows []models.FlowData
135150
for _, flow := range flowAggregator {
136151
flows = append(flows, *flow)
137152
}
138-
for i := 0; i < len(flows)-1; i++ {
139-
for j := i + 1; j < len(flows); j++ {
140-
if flows[i].TotalBytes < flows[j].TotalBytes {
141-
flows[i], flows[j] = flows[j], flows[i]
142-
}
143-
}
144-
}
153+
154+
// Sort aggregated flows by total bytes (descending)
155+
sort.Slice(flows, func(i, j int) bool {
156+
return flows[i].TotalBytes > flows[j].TotalBytes
157+
})
158+
159+
// Sort raw flows by timestamp (most recent first)
160+
sort.Slice(rawFlows, func(i, j int) bool {
161+
return rawFlows[i].Timestamp.After(rawFlows[j].Timestamp)
162+
})
145163

146164
return &models.NetworkMap{
147-
Devices: devices.Devices,
148-
Flows: flows,
165+
Devices: devices.Devices,
166+
Flows: flows,
167+
RawFlows: rawFlows,
149168
TimeRange: models.TimeWindow{
150169
Start: timeStart,
151170
End: timeEnd,
152171
},
153172
}
154173
}
155174

175+
// createRawFlowEntry creates a raw flow entry from a traffic flow
176+
func (c *TailscaleClient) createRawFlowEntry(flow models.TrafficFlow, flowType string, log models.NetworkLog, ipToDevice map[string]*models.Device) models.RawFlowEntry {
177+
srcIP, srcPort := parseAddress(flow.Src)
178+
dstIP, dstPort := parseAddress(flow.Dst)
179+
180+
srcIP = normalizeIP(srcIP)
181+
dstIP = normalizeIP(dstIP)
182+
183+
// Generate unique ID for this flow entry
184+
flowID := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%s-%s-%s-%s-%d-%d",
185+
log.NodeID, srcIP, dstIP, srcPort, dstPort, log.Start.Unix(), flow.Proto))))
186+
187+
// Determine direction
188+
direction := "bidirectional"
189+
if flow.TxBytes > 0 && flow.RxBytes == 0 {
190+
direction = "outbound"
191+
} else if flow.RxBytes > 0 && flow.TxBytes == 0 {
192+
direction = "inbound"
193+
}
194+
195+
return models.RawFlowEntry{
196+
ID: flowID,
197+
NodeID: log.NodeID,
198+
Timestamp: log.Logged,
199+
StartTime: log.Start,
200+
EndTime: log.End,
201+
SourceDevice: ipToDevice[srcIP],
202+
DestinationDevice: ipToDevice[dstIP],
203+
SourceIP: srcIP,
204+
DestinationIP: dstIP,
205+
SourcePort: srcPort,
206+
DestinationPort: dstPort,
207+
Protocol: getProtocolName(flow.Proto),
208+
ProtocolNumber: flow.Proto,
209+
TxBytes: flow.TxBytes,
210+
RxBytes: flow.RxBytes,
211+
TxPackets: flow.TxPkts,
212+
RxPackets: flow.RxPkts,
213+
TotalBytes: flow.TxBytes + flow.RxBytes,
214+
TotalPackets: flow.TxPkts + flow.RxPkts,
215+
FlowType: flowType,
216+
Direction: direction,
217+
}
218+
}
219+
220+
// FilterRawFlows applies filters to raw flow entries
221+
func (c *TailscaleClient) FilterRawFlows(rawFlows []models.RawFlowEntry, filters models.FlowFilters) []models.RawFlowEntry {
222+
var filtered []models.RawFlowEntry
223+
224+
for _, flow := range rawFlows {
225+
// Port filtering
226+
if len(filters.Ports) > 0 {
227+
portMatch := false
228+
for _, port := range filters.Ports {
229+
if flow.SourcePort == port || flow.DestinationPort == port {
230+
portMatch = true
231+
break
232+
}
233+
}
234+
if !portMatch {
235+
continue
236+
}
237+
}
238+
239+
// Protocol filtering
240+
if len(filters.Protocols) > 0 {
241+
protocolMatch := false
242+
for _, protocol := range filters.Protocols {
243+
if strings.EqualFold(flow.Protocol, protocol) {
244+
protocolMatch = true
245+
break
246+
}
247+
}
248+
if !protocolMatch {
249+
continue
250+
}
251+
}
252+
253+
// Flow type filtering
254+
if len(filters.FlowTypes) > 0 {
255+
flowTypeMatch := false
256+
for _, flowType := range filters.FlowTypes {
257+
if flow.FlowType == flowType {
258+
flowTypeMatch = true
259+
break
260+
}
261+
}
262+
if !flowTypeMatch {
263+
continue
264+
}
265+
}
266+
267+
// Device ID filtering
268+
if len(filters.DeviceIDs) > 0 {
269+
deviceMatch := false
270+
for _, deviceID := range filters.DeviceIDs {
271+
if (flow.SourceDevice != nil && flow.SourceDevice.ID == deviceID) ||
272+
(flow.DestinationDevice != nil && flow.DestinationDevice.ID == deviceID) {
273+
deviceMatch = true
274+
break
275+
}
276+
}
277+
if !deviceMatch {
278+
continue
279+
}
280+
}
281+
282+
// Bytes filtering
283+
if filters.MinBytes > 0 && flow.TotalBytes < filters.MinBytes {
284+
continue
285+
}
286+
if filters.MaxBytes > 0 && flow.TotalBytes > filters.MaxBytes {
287+
continue
288+
}
289+
290+
filtered = append(filtered, flow)
291+
}
292+
293+
// Sort the filtered results
294+
c.sortRawFlows(filtered, filters.SortBy, filters.SortOrder)
295+
296+
// Apply limit
297+
if filters.Limit > 0 && len(filtered) > filters.Limit {
298+
filtered = filtered[:filters.Limit]
299+
}
300+
301+
return filtered
302+
}
303+
304+
// sortRawFlows sorts raw flows based on the specified criteria
305+
func (c *TailscaleClient) sortRawFlows(flows []models.RawFlowEntry, sortBy, sortOrder string) {
306+
if sortBy == "" {
307+
sortBy = "timestamp"
308+
}
309+
if sortOrder == "" {
310+
sortOrder = "desc"
311+
}
312+
313+
ascending := strings.ToLower(sortOrder) == "asc"
314+
315+
sort.Slice(flows, func(i, j int) bool {
316+
var result bool
317+
switch strings.ToLower(sortBy) {
318+
case "timestamp":
319+
result = flows[i].Timestamp.Before(flows[j].Timestamp)
320+
case "bytes":
321+
result = flows[i].TotalBytes < flows[j].TotalBytes
322+
case "packets":
323+
result = flows[i].TotalPackets < flows[j].TotalPackets
324+
case "port":
325+
// Sort by destination port, then source port
326+
if flows[i].DestinationPort != flows[j].DestinationPort {
327+
result = flows[i].DestinationPort < flows[j].DestinationPort
328+
} else {
329+
result = flows[i].SourcePort < flows[j].SourcePort
330+
}
331+
case "protocol":
332+
result = flows[i].Protocol < flows[j].Protocol
333+
default:
334+
result = flows[i].Timestamp.Before(flows[j].Timestamp)
335+
}
336+
337+
if ascending {
338+
return result
339+
}
340+
return !result
341+
})
342+
}
343+
156344
func (c *TailscaleClient) aggregateFlow(flow models.TrafficFlow, flowType string, log models.NetworkLog, ipToDevice map[string]*models.Device, aggregator map[string]*models.FlowData) {
157345
srcIP, _ := parseAddress(flow.Src)
158346
dstIP, dstPort := parseAddress(flow.Dst)
@@ -198,27 +386,46 @@ func (c *TailscaleClient) aggregateFlow(flow models.TrafficFlow, flowType string
198386

199387
func parseAddress(addr string) (ip, port string) {
200388
if strings.HasPrefix(addr, "[") {
389+
// IPv6 address with port: [::1]:8080
201390
if idx := strings.LastIndex(addr, "]:"); idx != -1 {
202391
ip = addr[1:idx]
203392
port = addr[idx+2:]
204393
return
205394
}
395+
// IPv6 address without port: [::1]
206396
ip = strings.Trim(addr, "[]")
207397
return
208398
}
209399

400+
// IPv4 address with port: 192.168.1.1:8080
210401
if idx := strings.LastIndex(addr, ":"); idx != -1 {
211-
ip = addr[:idx]
212-
port = addr[idx+1:]
213-
return
402+
// Check if this is actually a port (numeric)
403+
potentialPort := addr[idx+1:]
404+
if potentialPort != "" && isNumeric(potentialPort) {
405+
ip = addr[:idx]
406+
port = potentialPort
407+
return
408+
}
214409
}
215410

411+
// No port found
216412
ip = addr
217413
return
218414
}
219415

416+
func isNumeric(s string) bool {
417+
for _, r := range s {
418+
if r < '0' || r > '9' {
419+
return false
420+
}
421+
}
422+
return len(s) > 0
423+
}
424+
220425
func getProtocolName(proto int) string {
221426
switch proto {
427+
case 0:
428+
return "proto-0"
222429
case 1:
223430
return "ICMP"
224431
case 6:
@@ -228,7 +435,7 @@ func getProtocolName(proto int) string {
228435
case 255:
229436
return "RAW"
230437
default:
231-
return fmt.Sprintf("Proto-%d", proto)
438+
return fmt.Sprintf("proto-%d", proto)
232439
}
233440
}
234441

0 commit comments

Comments
 (0)