Skip to content

Commit 5e5135f

Browse files
committed
fix stream events
Signed-off-by: sameeh.jubran <[email protected]>
1 parent b6394d2 commit 5e5135f

File tree

4 files changed

+738
-62
lines changed

4 files changed

+738
-62
lines changed

internal/ebpf/stream_events.go

Lines changed: 224 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,67 +10,257 @@ import (
1010

1111
type StreamEventsArgs struct {
1212
Source struct {
13-
MapID int `json:"map_id"`
14-
Type string `json:"type"`
13+
ProgramID int `json:"program_id,omitempty"`
14+
LinkID int `json:"link_id,omitempty"`
15+
MapID int `json:"map_id,omitempty"`
16+
Type string `json:"type,omitempty"`
1517
} `json:"source"`
16-
SessionID string `json:"session_id"`
17-
DurationMs int `json:"duration_ms"`
18-
MaxEvents int `json:"max_events"`
19-
Filters map[string]interface{} `json:"filters"`
20-
Format string `json:"format"`
18+
Duration int `json:"duration,omitempty"` // seconds
19+
DurationMs int `json:"duration_ms,omitempty"` // milliseconds
20+
MaxEvents int `json:"max_events,omitempty"`
21+
Filters map[string]interface{} `json:"filters,omitempty"`
22+
Format string `json:"format,omitempty"`
2123
}
2224

2325
type StreamEventsResult struct {
24-
Events []map[string]interface{}
25-
Stats map[string]interface{}
26-
Complete bool
27-
SessionID string
26+
Events []map[string]interface{} `json:"events"`
27+
Stats map[string]interface{} `json:"stats"`
28+
Complete bool `json:"complete"`
29+
SessionID string `json:"session_id"`
2830
}
2931

3032
func ParseStreamEventsArgs(input map[string]interface{}) (*StreamEventsArgs, error) {
33+
if input == nil {
34+
return nil, errors.New("input cannot be nil")
35+
}
36+
3137
var args StreamEventsArgs
3238
if err := types.StrictUnmarshal(input, &args); err != nil {
33-
return nil, err
39+
return nil, fmt.Errorf("failed to parse stream events args: %w", err)
3440
}
3541
return &args, nil
3642
}
3743

3844
func StreamEvents(args *StreamEventsArgs, emit func(any)) error {
39-
if args.Source.Type != "ringbuf" && args.Source.Type != "perfbuf" {
40-
return errors.New("unsupported map type")
45+
// Validate inputs
46+
if args == nil {
47+
return errors.New("args cannot be nil")
48+
}
49+
if emit == nil {
50+
return errors.New("emit function cannot be nil")
51+
}
52+
53+
// Set default type if not provided
54+
sourceType := args.Source.Type
55+
if sourceType == "" {
56+
sourceType = "ringbuf" // default
57+
}
58+
59+
// Validate type
60+
if sourceType != "ringbuf" && sourceType != "perfbuf" {
61+
return fmt.Errorf("unsupported map type: %s", sourceType)
4162
}
4263

43-
// Simulate event streaming by emitting dummy data
64+
// Determine the source
65+
var mapID int
66+
var sourceDesc string
67+
68+
if args.Source.MapID != 0 {
69+
mapID = args.Source.MapID
70+
sourceDesc = fmt.Sprintf("map %d", mapID)
71+
} else if args.Source.ProgramID != 0 {
72+
// For now, use the program ID as map ID
73+
// (in real implementation, you'd look up the actual map)
74+
mapID = args.Source.ProgramID
75+
sourceDesc = fmt.Sprintf("program %d", args.Source.ProgramID)
76+
} else if args.Source.LinkID != 0 {
77+
// For now, use the link ID as map ID
78+
// (in real implementation, you'd look up the actual map)
79+
mapID = args.Source.LinkID
80+
sourceDesc = fmt.Sprintf("link %d", args.Source.LinkID)
81+
} else {
82+
return errors.New("must specify program_id, link_id, or map_id in source")
83+
}
84+
85+
// Set duration (prefer duration_ms, fall back to duration * 1000)
86+
durationMs := args.DurationMs
87+
if durationMs == 0 && args.Duration > 0 {
88+
durationMs = args.Duration * 1000
89+
}
90+
if durationMs == 0 {
91+
durationMs = 5000 // default 5 seconds
92+
}
93+
94+
// Set max events
95+
maxEvents := args.MaxEvents
96+
if maxEvents == 0 {
97+
maxEvents = 100 // default
98+
}
99+
100+
// Set format
101+
format := args.Format
102+
if format == "" {
103+
format = "json"
104+
}
105+
106+
// Validate format
107+
if format != "json" && format != "raw" && format != "base64" {
108+
return fmt.Errorf("unsupported format: %s", format)
109+
}
110+
111+
// Initialize session
112+
sessionID := fmt.Sprintf("stream-session-%d", time.Now().Unix())
113+
114+
// Start streaming simulation
44115
start := time.Now()
45-
for i := 0; i < args.MaxEvents; i++ {
46-
event := map[string]interface{}{
47-
"timestamp_ns": time.Now().UnixNano(),
48-
"cpu": i % 4,
49-
"pid": 1000 + i,
50-
"comm": fmt.Sprintf("task-%d", i),
51-
"data": map[string]interface{}{
52-
"msg": fmt.Sprintf("event %d", i),
53-
},
116+
eventsGenerated := 0
117+
118+
// Emit initial status
119+
emit(map[string]interface{}{
120+
"type": "status",
121+
"message": fmt.Sprintf("Starting event stream from %s for %dms", sourceDesc, durationMs),
122+
"session_id": sessionID,
123+
"format": format,
124+
})
125+
126+
// Generate events over the duration
127+
eventInterval := time.Duration(durationMs) * time.Millisecond / time.Duration(maxEvents)
128+
if eventInterval < time.Millisecond {
129+
eventInterval = time.Millisecond
130+
}
131+
132+
ticker := time.NewTicker(eventInterval)
133+
defer ticker.Stop()
134+
135+
timeout := time.After(time.Duration(durationMs) * time.Millisecond)
136+
137+
eventLoop:
138+
for eventsGenerated < maxEvents {
139+
select {
140+
case <-timeout:
141+
break eventLoop
142+
case <-ticker.C:
143+
// Generate a mock event
144+
event := generateMockEvent(eventsGenerated, mapID, format)
145+
146+
// Apply filters if any
147+
if shouldIncludeEvent(event, args.Filters) {
148+
emit(map[string]interface{}{
149+
"type": "event",
150+
"event": event,
151+
})
152+
eventsGenerated++
153+
}
54154
}
55-
emit(event)
56-
time.Sleep(time.Millisecond * 10) // simulate delay
57155
}
58156

59-
duration := time.Since(start).Milliseconds()
157+
// Calculate final stats
158+
actualDuration := time.Since(start).Milliseconds()
60159
stats := map[string]interface{}{
61-
"events_received": args.MaxEvents,
62-
"events_dropped": 0,
63-
"duration_ms": duration,
160+
"events_received": eventsGenerated,
161+
"events_dropped": 0,
162+
"duration_ms": actualDuration,
163+
"events_per_second": float64(eventsGenerated) / (float64(actualDuration) / 1000.0),
164+
"source": sourceDesc,
165+
"format": format,
64166
}
65167

66-
emit(map[string]interface{}{
168+
// Emit final result
169+
result := map[string]interface{}{
67170
"success": true,
68171
"tool_version": "1.0.0",
69-
"session_id": args.SessionID,
70-
"events": []interface{}{},
172+
"session_id": sessionID,
71173
"stats": stats,
72174
"complete": true,
73-
})
175+
"message": fmt.Sprintf("Stream completed: %d events in %dms", eventsGenerated, actualDuration),
176+
}
74177

178+
emit(result)
75179
return nil
76180
}
181+
182+
func generateMockEvent(index, mapID int, format string) map[string]interface{} {
183+
timestamp := time.Now().UnixNano()
184+
185+
baseEvent := map[string]interface{}{
186+
"timestamp_ns": timestamp,
187+
"cpu": index % 4,
188+
"pid": 1000 + index,
189+
"tid": 1000 + index,
190+
"comm": fmt.Sprintf("task-%d", index),
191+
"map_id": mapID,
192+
"index": index,
193+
}
194+
195+
switch format {
196+
case "json":
197+
baseEvent["data"] = map[string]interface{}{
198+
"syscall": "execve",
199+
"args": []string{"/bin/ls", "-la"},
200+
"exit_code": 0,
201+
"message": fmt.Sprintf("kprobe event %d from map %d", index, mapID),
202+
}
203+
case "raw":
204+
baseEvent["raw_data"] = fmt.Sprintf("raw_event_%d_map_%d_ts_%d", index, mapID, timestamp)
205+
case "base64":
206+
// Simulate base64 encoded data
207+
data := fmt.Sprintf("event_%d_map_%d", index, mapID)
208+
baseEvent["data_base64"] = data // In real implementation, this would be base64 encoded
209+
}
210+
211+
return baseEvent
212+
}
213+
214+
func shouldIncludeEvent(event map[string]interface{}, filters map[string]interface{}) bool {
215+
if filters == nil {
216+
return true
217+
}
218+
219+
// Apply event type filter
220+
if eventTypes, ok := filters["event_types"].([]interface{}); ok {
221+
if len(eventTypes) > 0 {
222+
// For mock events, we'll assume they're all "kprobe" type
223+
hasKprobe := false
224+
for _, et := range eventTypes {
225+
if et == "kprobe" {
226+
hasKprobe = true
227+
break
228+
}
229+
}
230+
if !hasKprobe {
231+
return false
232+
}
233+
}
234+
}
235+
236+
// Apply PID filter
237+
if targetPids, ok := filters["target_pids"].([]interface{}); ok {
238+
if len(targetPids) > 0 {
239+
eventPid, hasPid := event["pid"].(int)
240+
if hasPid {
241+
pidMatch := false
242+
for _, pid := range targetPids {
243+
if pidInt, ok := pid.(int); ok && pidInt == eventPid {
244+
pidMatch = true
245+
break
246+
}
247+
}
248+
if !pidMatch {
249+
return false
250+
}
251+
}
252+
}
253+
}
254+
255+
// Apply timestamp filter
256+
if minTimestamp, ok := filters["min_timestamp"].(float64); ok {
257+
if eventTs, hasTs := event["timestamp_ns"].(int64); hasTs {
258+
if float64(eventTs) < minTimestamp {
259+
return false
260+
}
261+
}
262+
}
263+
264+
// Apply max events filter (handled in main loop)
265+
return true
266+
}

0 commit comments

Comments
 (0)