Skip to content

Commit de38965

Browse files
committed
Bugfix: In flight flow calculation was stuck sometimes
If flows crashed before sending any status then the server would not update them. This resulted in repeatedly sending flow status request without making progress.
1 parent cdc4edb commit de38965

File tree

10 files changed

+229
-33
lines changed

10 files changed

+229
-33
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ config/ab0x.go
2626

2727
__debug*
2828
debug.test*
29-
enriched.json*
29+
artifacts/testdata/server/hunts/H.*

actions/proto/vql.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ message ClientInfo {
228228
map<string, int64> in_flight_flows = 28;
229229

230230

231-
// A list of indexed metadata fields. There are not all metadata
231+
// A list of indexed metadata fields. These are not all metadata
232232
// fields, only the ones that are important enough to be indexed.
233233
map<string, string> metadata = 29;
234234
}

flows/client_flow_runner.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -513,31 +513,38 @@ func (self *ClientFlowRunner) handleUnknwonFlow(
513513
ctx context.Context, client_id, flow_id string,
514514
msg *crypto_proto.FlowStats) error {
515515

516-
flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)
517-
db, err := datastore.GetDB(self.config_obj)
516+
if len(msg.QueryStatus) == 0 {
517+
return nil
518+
}
519+
520+
launcher_service, err := services.GetLauncher(self.config_obj)
518521
if err != nil {
519522
return err
520523
}
521524

522-
// Just a blind write will eventually hit the disk.
523-
stats := &flows_proto.ArtifactCollectorContext{}
524-
err = db.GetSubject(self.config_obj, flow_path_manager.Stats(), stats)
525+
// If we dont know anything about the flow, ignore it.
526+
collection_context, err := launcher_service.Storage().LoadCollectionContext(
527+
ctx, self.config_obj, client_id, flow_id)
525528
if err != nil {
526529
return nil
527530
}
528531

529532
// Mark all the stats as terminated if they are still running.
530-
for _, s := range stats.QueryStats {
531-
if s.Status == crypto_proto.VeloStatus_PROGRESS {
532-
s.Status = crypto_proto.VeloStatus_GENERIC_ERROR
533-
s.ErrorMessage = msg.QueryStatus[0].ErrorMessage
533+
if len(collection_context.QueryStats) == 0 {
534+
collection_context.QueryStats = append(
535+
collection_context.QueryStats, msg.QueryStatus...)
536+
} else {
537+
for _, s := range collection_context.QueryStats {
538+
if s.Status == crypto_proto.VeloStatus_PROGRESS {
539+
s.Status = msg.QueryStatus[0].Status
540+
s.ErrorMessage = msg.QueryStatus[0].ErrorMessage
541+
}
534542
}
535543
}
536544

537-
launcher.UpdateFlowStats(stats)
538-
539-
return db.SetSubjectWithCompletion(self.config_obj,
540-
flow_path_manager.Stats(), stats, nil)
545+
// Update the flow.
546+
return launcher_service.Storage().WriteFlow(ctx, self.config_obj,
547+
collection_context, utils.BackgroundWriter)
541548
}
542549

543550
func (self *ClientFlowRunner) FlowStats(

flows/client_flow_runner_test.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package flows_test
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"regexp"
87
"strings"
98
"sync"
@@ -1025,42 +1024,62 @@ func (self *ServerTestSuite) TestCancellation() {
10251024

10261025
// Test an unknown flow. What happens when the server receives a
10271026
// message to an unknown flow.
1027+
1028+
// This message is received in response to the in flight keep
1029+
// alive. If the client crashes, the server will send a status request
1030+
// for the in flight flow. But the client does not know about this
1031+
// flow id. The server will terminate the flow.
10281032
func (self *ServerTestSuite) TestUnknownFlow() {
10291033
t := self.T()
10301034

1031-
db, err := datastore.GetDB(self.ConfigObj)
1032-
require.NoError(t, err)
1035+
closer := utils.SetFlowIdForTests("F.SomeFlow")
1036+
defer closer()
1037+
1038+
// Create a new collection flow
1039+
flow_id, err := self.createArtifactCollection()
1040+
require.NoError(self.T(), err)
1041+
assert.Equal(self.T(), "F.SomeFlow", flow_id)
1042+
1043+
launcher, err := services.GetLauncher(self.ConfigObj)
1044+
assert.NoError(t, err)
1045+
1046+
collection_context, err := launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
1047+
services.GetFlowOptions{}, self.client_id, flow_id)
1048+
assert.NoError(t, err)
1049+
1050+
assert.Equal(self.T(), collection_context.Context.State,
1051+
flows_proto.ArtifactCollectorContext_RUNNING)
10331052

10341053
runner := flows.NewFlowRunner(self.Ctx, self.ConfigObj)
10351054
defer runner.Close(self.Ctx)
10361055

10371056
// Send a message to a random non-existant flow from client.
1038-
flow_id := "F.NONEXISTENT"
10391057
runner.ProcessSingleMessage(
10401058
self.Ctx,
10411059
&crypto_proto.VeloMessage{
10421060
Source: self.client_id,
10431061
SessionId: flow_id,
10441062
FlowStats: &crypto_proto.FlowStats{
10451063
QueryStatus: []*crypto_proto.VeloStatus{
1046-
{Status: crypto_proto.VeloStatus_OK, QueryId: 1},
1064+
{
1065+
Status: crypto_proto.VeloStatus_UNKNOWN_FLOW,
1066+
ErrorMessage: "Unknown flow",
1067+
},
10471068
},
10481069
},
10491070
})
10501071

1051-
// We used to send cancellation message to the client, but this
1052-
// too expensive for the server to keep track of. Now we just
1053-
// write data in the flow as if it exists anyway.
1072+
collection_context, err = launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
1073+
services.GetFlowOptions{}, self.client_id, flow_id)
1074+
assert.NoError(t, err)
10541075

1055-
// The flow does not exist - make sure it still does not.
1056-
collection_context := &flows_proto.ArtifactCollectorContext{}
1057-
path_manager := paths.NewFlowPathManager(self.client_id, flow_id)
1058-
err = db.GetSubject(self.ConfigObj, path_manager.Path(), collection_context)
1059-
require.Error(t, err, os.ErrNotExist)
1076+
assert.Equal(self.T(), collection_context.Context.State,
1077+
flows_proto.ArtifactCollectorContext_ERROR)
10601078

1061-
// The flow stats are written as normal.
1062-
err = db.GetSubject(self.ConfigObj, path_manager.Stats(), collection_context)
1063-
assert.NoError(t, err)
1079+
assert.Equal(self.T(), collection_context.Context.Status,
1080+
"Unknown flow")
1081+
1082+
assert.Equal(self.T(), len(collection_context.Context.QueryStats), 1)
10641083
}
10651084

10661085
// Test an unknown flow. What happens when the server receives a

services/client_info/client_info_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ type: INTERNAL
4848
`, `
4949
name: Server.Audit.Logs
5050
type: INTERNAL
51+
`, `
52+
name: Client.Test
53+
type: CLIENT
54+
sources:
55+
- query: SELECT * FROM info()
5156
`})
5257

5358
// Create a client in the datastore so we can test initializing
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
"InFlightFlows": {
3+
"client_id": "C.1234",
4+
"hostname": "Hostname",
5+
"has_tasks": true,
6+
"in_flight_flows": {
7+
"F.0": 10,
8+
"F.1": 10,
9+
"F.2": 10,
10+
"F.3": 10
11+
}
12+
},
13+
"StatusChecks": [
14+
{
15+
"session_id": "F.Status",
16+
"flow_stats_request": {
17+
"flow_id": [
18+
"F.0",
19+
"F.1",
20+
"F.2",
21+
"F.3"
22+
]
23+
}
24+
}
25+
],
26+
"AfterCompletion": {
27+
"client_id": "C.1234",
28+
"hostname": "Hostname",
29+
"has_tasks": true
30+
},
31+
"SecondSetOfTasks": {
32+
"client_id": "C.1234",
33+
"hostname": "Hostname",
34+
"has_tasks": true,
35+
"in_flight_flows": {
36+
"F.4": 100,
37+
"F.5": 100,
38+
"F.6": 100,
39+
"F.7": 100
40+
}
41+
}
42+
}

services/client_info/tasks_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@ import (
66
"sort"
77
"time"
88

9+
"github.com/Velocidex/ordereddict"
910
"google.golang.org/protobuf/proto"
1011
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
12+
"www.velocidex.com/golang/velociraptor/flows"
13+
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
14+
"www.velocidex.com/golang/velociraptor/json"
1115
"www.velocidex.com/golang/velociraptor/services"
1216
"www.velocidex.com/golang/velociraptor/services/client_info"
1317
"www.velocidex.com/golang/velociraptor/utils"
18+
"www.velocidex.com/golang/velociraptor/vql/acl_managers"
1419
"www.velocidex.com/golang/velociraptor/vtesting"
1520
"www.velocidex.com/golang/velociraptor/vtesting/assert"
21+
"www.velocidex.com/golang/velociraptor/vtesting/goldie"
1622
)
1723

1824
func (self *ClientInfoTestSuite) TestQueueMessages() {
@@ -85,3 +91,111 @@ func (self *ClientInfoTestSuite) TestFastQueueMessages() {
8591
assert.True(self.T(), proto.Equal(tasks[i], written[i]))
8692
}
8793
}
94+
95+
func (self *ClientInfoTestSuite) TestInFlightMessages() {
96+
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 0)))
97+
defer closer()
98+
99+
launcher, err := services.GetLauncher(self.ConfigObj)
100+
assert.NoError(self.T(), err)
101+
102+
var flow_ids []string
103+
acl_manager := acl_managers.NullACLManager{}
104+
manager, _ := services.GetRepositoryManager(self.ConfigObj)
105+
repository, _ := manager.GetGlobalRepository(self.ConfigObj)
106+
107+
for i := 0; i < 10; i++ {
108+
closer := utils.SetFlowIdForTests(fmt.Sprintf("F.%d", i))
109+
110+
flow_id, err := launcher.ScheduleArtifactCollection(self.Ctx,
111+
self.ConfigObj, acl_manager,
112+
repository, &flows_proto.ArtifactCollectorArgs{
113+
Creator: "admin",
114+
ClientId: self.client_id,
115+
Artifacts: []string{"Client.Test"},
116+
}, utils.SyncCompleter)
117+
assert.NoError(self.T(), err)
118+
119+
flow_ids = append(flow_ids, flow_id)
120+
121+
closer()
122+
}
123+
124+
client_info_manager, err := services.GetClientInfoManager(self.ConfigObj)
125+
assert.NoError(self.T(), err)
126+
127+
tasks, err := client_info_manager.GetClientTasks(self.Ctx, self.client_id)
128+
assert.NoError(self.T(), err)
129+
130+
// 4 tasks are queued
131+
assert.Equal(self.T(), len(tasks), 4)
132+
133+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
134+
assert.NoError(self.T(), err)
135+
136+
// Tasks are still in flight, so we can not get any new tasks yet.
137+
assert.Equal(self.T(), len(tasks), 0)
138+
139+
client_info, err := client_info_manager.Get(self.Ctx, self.client_id)
140+
assert.NoError(self.T(), err)
141+
142+
// Should contain only 4 flow ids in the in_flight_flows set.
143+
golden := ordereddict.NewDict().
144+
Set("InFlightFlows", client_info)
145+
146+
// Pass some time
147+
closer = utils.MockTime(utils.NewMockClock(time.Unix(100, 0)))
148+
defer closer()
149+
150+
// Tasks are still in flight, so we do not send any flows, instead
151+
// we send a task status request to see how those other tasks are
152+
// going.
153+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
154+
assert.NoError(self.T(), err)
155+
156+
sort.Strings(tasks[0].FlowStatsRequest.FlowId)
157+
158+
assert.Equal(self.T(), len(tasks), 1)
159+
160+
// Should contains a status check request for all inflight flows.
161+
golden.Set("StatusChecks", tasks)
162+
163+
// Now complete the flows
164+
runner := flows.NewFlowRunner(self.Ctx, self.ConfigObj)
165+
for flow_id := range client_info.InFlightFlows {
166+
runner.ProcessSingleMessage(self.Ctx,
167+
&crypto_proto.VeloMessage{
168+
Source: self.client_id,
169+
SessionId: flow_id,
170+
FlowStats: &crypto_proto.FlowStats{
171+
FlowComplete: true,
172+
QueryStatus: []*crypto_proto.VeloStatus{{
173+
Status: crypto_proto.VeloStatus_OK,
174+
}},
175+
}})
176+
}
177+
runner.Close(self.Ctx)
178+
179+
client_info, err = client_info_manager.Get(self.Ctx, self.client_id)
180+
assert.NoError(self.T(), err)
181+
182+
// Completing the flows removes the flows from the in flight set.
183+
assert.Equal(self.T(), len(client_info.InFlightFlows), 0)
184+
185+
// Should contain no in flight flows but still contain the
186+
// has_tasks flag.
187+
golden.Set("AfterCompletion", client_info)
188+
189+
// Now read some more tasks
190+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
191+
assert.NoError(self.T(), err)
192+
193+
// Should conatin
194+
assert.Equal(self.T(), len(tasks), 4)
195+
196+
client_info, err = client_info_manager.Get(self.Ctx, self.client_id)
197+
assert.NoError(self.T(), err)
198+
golden.Set("SecondSetOfTasks", client_info)
199+
200+
goldie.Assert(self.T(), "TestInFlightMessages", json.MustMarshalIndent(golden))
201+
}

services/hunt_dispatcher/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ func (self *HuntStorageManagerImpl) loadHuntsFromDatastore(
561561
self.dirty = true
562562

563563
// The old hunt record is newer than the one on disk, ignore it.
564-
} else if old_hunt_record.Version >= hunt_obj.Version {
564+
} else if old_hunt_record.Version > hunt_obj.Version {
565565
continue
566566
}
567567

services/launcher/flows.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,15 @@ func UpdateFlowStats(collection_context *flows_proto.ArtifactCollectorContext) {
276276
collection_context.StartTime = s.FirstActive
277277
}
278278

279+
// If the Query stats represents an unknown flow, we mark the
280+
// flow as errored.
281+
if s.Status == crypto_proto.VeloStatus_UNKNOWN_FLOW {
282+
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
283+
collection_context.Status = s.ErrorMessage
284+
collection_context.Backtrace = s.Backtrace
285+
break
286+
}
287+
279288
// Get the first errored query and mark the entire collection_context with it.
280289
if collection_context.State == flows_proto.ArtifactCollectorContext_RUNNING &&
281290
s.Status == crypto_proto.VeloStatus_GENERIC_ERROR {

vql/functions/alerts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (self *AlertFunction) Call(ctx context.Context,
4646

4747
alert_name, pres := args.GetString("name")
4848
if !pres {
49-
scope.Log("alert: Alert name must be specified!")
49+
scope.Log("ERROR:alert: Alert name must be specified!")
5050
return &vfilter.Null{}
5151
}
5252

0 commit comments

Comments
 (0)