// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon
package eventcache

import (
"errors"
"time"
"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/ktime"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/server"
)

const (
	// Event information was completed without cache retries
	NO_EV_CACHE = iota
	// Cache retries were triggered in order to complete event information
	FROM_EV_CACHE
)

var (
	cache *Cache
)

type CacheObj struct {
	internal  *process.ProcessInternal
	event     notify.Event
	timestamp uint64
	startTime uint64
	color     int
	msg       notify.Message
}

type Cache struct {
	objsChan chan CacheObj
	done     chan bool
	cache    []CacheObj
	notifier server.Notifier
	dur      time.Duration
}

var (
	ErrFailedToGetPodInfo       = errors.New("failed to get pod info from event cache")
	ErrFailedToGetProcessInfo   = errors.New("failed to get process info from event cache")
	ErrFailedToGetParentInfo    = errors.New("failed to get parent info from event cache")
	ErrFailedToGetAncestorsInfo = errors.New("failed to get ancestors info from event cache")
)

// Generic internal lookup happens when events are received out of order and
// this event was handled before its exec event, so the process info could not
// be populated yet.
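//
// A minimal sketch (not part of this package) of how an event message's retry
// hook might call this function; MsgExample and its PID/TID fields are
// hypothetical:
//
//	func (msg *MsgExample) RetryInternal(ev notify.Event, timestamp uint64) (*process.ProcessInternal, error) {
//		return HandleGenericInternal(ev, msg.PID, &msg.TID, timestamp)
//	}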
func HandleGenericInternal(ev notify.Event, pid uint32, tid *uint32, timestamp uint64) (*process.ProcessInternal, error) {
internal, parent := process.GetParentProcessInternal(pid, timestamp)
var err error
eventType := notify.EventType(ev)
if option.Config.EnableAncestors[option.AncestorsEventTypeMap[eventType]] && internal.NeededAncestors() {
		// There is no need to recollect all ancestors starting from the immediate
		// parent if some of them were already gathered in previous attempts; simply
		// resume the collection from the last known ancestor.
tetragonAncestors := ev.GetAncestors()
var nextExecId string
if len(tetragonAncestors) == 0 {
nextExecId = internal.UnsafeGetProcess().ParentExecId
} else {
nextExecId = tetragonAncestors[len(tetragonAncestors)-1].ExecId
}
if ancestors, perr := process.GetAncestorProcessesInternal(nextExecId); perr == nil {
for _, ancestor := range ancestors {
tetragonAncestors = append(tetragonAncestors, ancestor.UnsafeGetProcess())
}
ev.SetAncestors(tetragonAncestors)
} else {
CacheRetries(AncestorsInfo).Inc()
err = ErrFailedToGetAncestorsInfo
}
}
if parent != nil {
ev.SetParent(parent.UnsafeGetProcess())
} else {
CacheRetries(ParentInfo).Inc()
err = ErrFailedToGetParentInfo
}
if internal != nil {
		// When we report the per-thread fields, take a copy of the thread leader
		// from the cache, then update the corresponding per-thread fields.
		//
		// The cost of this is relatively high because it requires a deep copy of
		// all the thread leader's fields from the cache so they can be modified
		// safely without corrupting gRPC streams.
proc := internal.GetProcessCopy()
process.UpdateEventProcessTid(proc, tid)
ev.SetProcess(proc)
} else {
CacheRetries(ProcessInfo).Inc()
err = ErrFailedToGetProcessInfo
}
if err == nil {
return internal, err
}
return nil, err
}

// Generic event handler without any extra msg-specific details or debugging,
// so we only need to wait for the internal link to the process context to
// resolve PodInfo. This happens when the msg populated the internal state but
// the event itself is not fully populated yet.
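//
// A minimal sketch (not part of this package) of how a message's retry hook
// might call this function; MsgExample and its TID field are hypothetical:
//
//	func (msg *MsgExample) Retry(internal *process.ProcessInternal, ev notify.Event) error {
//		return HandleGenericEvent(internal, ev, &msg.TID)
//	}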
func HandleGenericEvent(internal *process.ProcessInternal, ev notify.Event, tid *uint32) error {
p := internal.UnsafeGetProcess()
if option.Config.EnableK8s && p.Pod == nil {
CacheRetries(PodInfo).Inc()
return ErrFailedToGetPodInfo
}
	// When we report the per-thread fields, take a copy of the thread leader
	// from the cache, then update the corresponding per-thread fields.
	//
	// The cost of this is relatively high because it requires a deep copy of
	// all the thread leader's fields from the cache so they can be modified
	// safely without corrupting gRPC streams.
proc := internal.GetProcessCopy()
process.UpdateEventProcessTid(proc, tid)
ev.SetProcess(proc)
return nil
}

func (ec *Cache) handleEvents() {
tmp := ec.cache[:0]
for _, event := range ec.cache {
var err error
		// If the process wasn't found before the Add(), likely because
		// the execve event was processed after this event, let's look it up
		// now because it should be available. Otherwise we have a valid
		// process, so let's copy it across.
if event.internal == nil {
event.internal, err = event.msg.RetryInternal(event.event, event.startTime)
}
if err == nil {
err = event.msg.Retry(event.internal, event.event)
}
if err != nil {
event.color++
if event.color < option.Config.EventCacheNumRetries {
tmp = append(tmp, event)
continue
}
eventType := notify.EventType(event.event).String()
if errors.Is(err, ErrFailedToGetParentInfo) {
failedFetches.WithLabelValues(eventType, ParentInfo.String()).Inc()
} else if errors.Is(err, ErrFailedToGetProcessInfo) {
failedFetches.WithLabelValues(eventType, ProcessInfo.String()).Inc()
} else if errors.Is(err, ErrFailedToGetAncestorsInfo) {
failedFetches.WithLabelValues(eventType, AncestorsInfo.String()).Inc()
} else if errors.Is(err, ErrFailedToGetPodInfo) {
failedFetches.WithLabelValues(eventType, PodInfo.String()).Inc()
}
}
if event.msg.Notify() {
processedEvent := &tetragon.GetEventsResponse{
Event: event.event.Encapsulate(),
Time: ktime.ToProto(event.timestamp),
}
ec.notifier.NotifyListener(event.msg, processedEvent)
}
}
ec.cache = tmp
}

func (ec *Cache) loop() {
ticker := time.NewTicker(ec.dur)
defer ticker.Stop()
for {
select {
case <-ticker.C:
			/* Every 'option.Config.EventCacheRetryDelay' seconds walk the slice of
			 * events still waiting on process or pod info. If an event has not been
			 * completed after 'option.Config.EventCacheNumRetries' iterations, send
			 * it anyway.
			 */
ec.handleEvents()
case event := <-ec.objsChan:
cacheInserts.Inc()
ec.cache = append(ec.cache, event)
case <-ec.done:
return
}
}
}

// We handle two race conditions here: one where the event races with a
// Tetragon execve event, and the other -- much more common -- where we race
// with the K8s watcher.
//
// case 1 (execve race):
//
// It's possible to receive this Tetragon event before the process event cache
// has been populated with a Tetragon execve event. In this case we need to
// cache the event until the process cache is populated.
//
// case 2 (k8s watcher race):
//
// It's possible to receive an event before the k8s watcher receives the
// podInfo event and populates the local cache. If we expect podInfo,
// indicated by a non-empty Docker ID, we cache the event until the podInfo
// arrives.
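//
// For illustration only (field values are made up), with EnableK8s set, a
// process carrying a container ID but no pod association yet still needs
// caching, while a process without a container does not:
//
//	ec.Needed(&tetragon.Process{Binary: "/usr/bin/curl", Docker: "abc123", Pod: nil}) // true: wait for pod info
//	ec.Needed(&tetragon.Process{Binary: "/usr/bin/curl", Docker: "", Pod: nil})       // false: no container, nothing to wait for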
func (ec *Cache) Needed(proc *tetragon.Process) bool {
if proc == nil {
return true
}
if option.Config.EnableK8s {
if proc.Docker != "" && proc.Pod == nil {
return true
}
}
if proc.Binary == "" {
return true
}
return false
}

// NeededAncestors reports whether ancestor collection for the event is still
// incomplete: the parent requires ancestors and the chain collected so far has
// not yet reached PID 2 or below.
func (ec *Cache) NeededAncestors(parent *process.ProcessInternal, ancestors []*process.ProcessInternal) bool {
if parent.NeededAncestors() {
if len(ancestors) == 0 {
return true
}
if ancestors[len(ancestors)-1].UnsafeGetProcess().Pid.Value > 2 {
return true
}
}
return false
}

// Add queues an event, together with whatever process state has already been
// resolved for it, onto the cache for later retries.
func (ec *Cache) Add(internal *process.ProcessInternal,
e notify.Event,
t uint64,
s uint64,
msg notify.Message) {
ec.objsChan <- CacheObj{internal: internal, event: e, timestamp: t, startTime: s, msg: msg}
}

// NewWithTimer creates the global event cache with a custom retry interval,
// stopping the previously running cache loop if there is one.
func NewWithTimer(n server.Notifier, dur time.Duration) *Cache {
if cache != nil {
cache.done <- true
}
logger.GetLogger().WithField("retries", option.Config.EventCacheNumRetries).WithField("delay", dur).Info("Creating new EventCache")
cache = &Cache{
objsChan: make(chan CacheObj),
done: make(chan bool),
cache: make([]CacheObj, 0),
notifier: n,
dur: dur,
}
go cache.loop()
return cache
}

// New creates the global event cache using the configured EventCacheRetryDelay.
func New(n server.Notifier) *Cache {
return NewWithTimer(n, time.Second*time.Duration(option.Config.EventCacheRetryDelay))
}

// Get returns the global event cache, or nil if none has been created yet.
func Get() *Cache {
return cache
}

func (ec *Cache) len() int {
return len(ec.cache)
}
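
// A minimal usage sketch from a caller's perspective (illustrative only;
// notifier, proc, ev, msg, timestamp and startTime are hypothetical names
// assumed to exist in the caller):
//
//	ec := eventcache.New(notifier)
//	if ec.Needed(proc) { // proc is the event's *tetragon.Process, possibly incomplete
//		ec.Add(nil, ev, timestamp, startTime, msg)
//	}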