Skip to content

Commit 7344ede

Browse files
committed
syz-cluster/email-reporter: poll an LKML archive for replies
Add the functionality to poll the specified LKML git archive for the new user replies under the bot's messages. Check out the repository and pull it once in the specified time period. Keep track of the latest processed commit hash. Track the original report ID via In-Reply-To references.
1 parent 09706c8 commit 7344ede

File tree

8 files changed

+291
-29
lines changed

8 files changed

+291
-29
lines changed

syz-cluster/email-reporter/deployment.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ spec:
2222
volumeMounts:
2323
- name: config-volume
2424
mountPath: /config
25+
- name: reporter-lore-disk
26+
mountPath: /lore-repo
2527
resources:
2628
requests:
2729
cpu: 2
@@ -33,3 +35,6 @@ spec:
3335
- name: config-volume
3436
configMap:
3537
name: global-config
38+
- name: reporter-lore-disk
39+
persistentVolumeClaim:
40+
claimName: reporter-lore-disk-claim

syz-cluster/email-reporter/handler_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
func TestModerationReportFlow(t *testing.T) {
1919
env, ctx := app.TestEnvironment(t)
2020
testSeries := controller.DummySeries()
21-
handler, emailServer := setupHandlerTest(t, env, ctx, testSeries)
21+
handler, _, emailServer := setupHandlerTest(t, env, ctx, testSeries)
2222

2323
report, err := handler.PollAndReport(ctx)
2424
assert.NoError(t, err)
@@ -61,7 +61,7 @@ func TestModerationReportFlow(t *testing.T) {
6161
func TestInvalidReply(t *testing.T) {
6262
env, ctx := app.TestEnvironment(t)
6363
testSeries := controller.DummySeries()
64-
handler, emailServer := setupHandlerTest(t, env, ctx, testSeries)
64+
handler, _, emailServer := setupHandlerTest(t, env, ctx, testSeries)
6565

6666
report, err := handler.PollAndReport(ctx)
6767
assert.NoError(t, err)
@@ -117,7 +117,7 @@ Unknown command
117117
}
118118

119119
func setupHandlerTest(t *testing.T, env *app.AppEnvironment, ctx context.Context,
120-
series *api.Series) (*Handler, *fakeSender) {
120+
series *api.Series) (*Handler, *api.ReporterClient, *fakeSender) {
121121
client := controller.TestServer(t, env)
122122
controller.FakeSeriesWithFindings(t, ctx, env, client, series)
123123

@@ -126,13 +126,14 @@ func setupHandlerTest(t *testing.T, env *app.AppEnvironment, ctx context.Context
126126
assert.NoError(t, err)
127127

128128
emailServer := makeFakeSender()
129+
reporterClient := reporter.TestServer(t, env)
129130
handler := &Handler{
130131
reporter: api.LKMLReporter,
131-
apiClient: reporter.TestServer(t, env),
132+
apiClient: reporterClient,
132133
emailConfig: testEmailConfig,
133134
sender: emailServer.send,
134135
}
135-
return handler, emailServer
136+
return handler, reporterClient, emailServer
136137
}
137138

138139
type fakeSender struct {

syz-cluster/email-reporter/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33

44
resources:
55
- deployment.yaml
6+
- lore-disk-pvc.yaml
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright 2025 syzkaller project authors. All rights reserved.
2+
# Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3+
4+
apiVersion: v1
5+
kind: PersistentVolumeClaim
6+
metadata:
7+
name: reporter-lore-disk-claim
8+
spec:
9+
accessModes:
10+
- ReadWriteOnce
11+
resources:
12+
requests:
13+
storage: 16Gi
14+
storageClassName: standard

syz-cluster/email-reporter/main.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,19 @@ import (
1313
"github.com/google/syzkaller/pkg/email"
1414
"github.com/google/syzkaller/syz-cluster/pkg/api"
1515
"github.com/google/syzkaller/syz-cluster/pkg/app"
16+
"golang.org/x/sync/errgroup"
1617
)
1718

1819
// TODO: add extra sanity checks that would prevent flooding the mailing lists:
1920
// - this pod may crash and be restarted by K8S: this complicates accounting,
2021
// - the send operation might return an error, yet an email would be actually sent: back off on errors?
2122

22-
// How often to check whether there are new emails to be sent.
23-
const pollPeriod = 30 * time.Second
23+
const (
24+
// How often to check whether there are new emails to be sent.
25+
senderPollPeriod = 30 * time.Second
26+
// How often to check whether there are new incoming emails.
27+
fetcherPollPeriod = 2 * time.Minute
28+
)
2429

2530
func main() {
2631
ctx := context.Background()
@@ -42,20 +47,34 @@ func main() {
4247
emailConfig: cfg.EmailReporting,
4348
sender: sender.Send,
4449
}
45-
emailStream := NewLoreEmailStreamer()
46-
ch := make(chan *email.Email, 16)
47-
go func() {
48-
for newEmail := range ch {
50+
msgCh := make(chan *email.Email, 16)
51+
eg, loopCtx := errgroup.WithContext(ctx)
52+
if cfg.EmailReporting.LoreArchiveURL != "" {
53+
fetcher := NewLKMLEmailStream("/lore-repo",
54+
cfg.EmailReporting.LoreArchiveURL, reporterClient, msgCh)
55+
eg.Go(func() error { return fetcher.Loop(loopCtx, fetcherPollPeriod) })
56+
}
57+
eg.Go(func() error {
58+
for {
59+
var newEmail *email.Email
60+
select {
61+
case newEmail = <-msgCh:
62+
case <-loopCtx.Done():
63+
return nil
64+
}
4965
log.Printf("received email %q", newEmail.MessageID)
50-
err := handler.IncomingEmail(ctx, newEmail)
66+
err := handler.IncomingEmail(loopCtx, newEmail)
5167
if err != nil {
5268
// Note that we just print an error and go on instead of retrying.
5369
// Some retrying may be reasonable, but it also comes with a risk of flooding
5470
// the mailing lists.
5571
app.Errorf("email %q: failed to process: %v", newEmail.MessageID, err)
5672
}
5773
}
58-
}()
59-
go handler.PollReportsLoop(ctx, pollPeriod)
60-
go emailStream.Loop(ctx, ch)
74+
})
75+
eg.Go(func() error {
76+
handler.PollReportsLoop(loopCtx, senderPollPeriod)
77+
return nil
78+
})
79+
eg.Wait()
6180
}

syz-cluster/email-reporter/stream.go

Lines changed: 109 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,123 @@ package main
55

66
import (
77
"context"
8+
"fmt"
9+
"log"
10+
"time"
811

912
"github.com/google/syzkaller/pkg/email"
13+
"github.com/google/syzkaller/pkg/email/lore"
14+
"github.com/google/syzkaller/pkg/vcs"
15+
"github.com/google/syzkaller/syz-cluster/pkg/api"
16+
"github.com/google/syzkaller/syz-cluster/pkg/app"
1017
)
1118

12-
// TODO: there's a lot of common code with series-tracker.
13-
// TODO: alternatively, we could parse the whole archive and track each email via In-Reply-To to the original email.
19+
type LKMLEmailStream struct {
20+
reporterName string
21+
repoURL string
22+
repoFolder string
23+
client *api.ReporterClient
24+
newMessages chan *email.Email
25+
lastCommitDate time.Time
26+
lastCommit string
27+
}
1428

15-
type LoreEmailStreamer struct {
29+
func NewLKMLEmailStream(repoFolder, repoURL string, client *api.ReporterClient,
30+
writeTo chan *email.Email) *LKMLEmailStream {
31+
return &LKMLEmailStream{
32+
reporterName: api.LKMLReporter,
33+
repoURL: repoURL,
34+
repoFolder: repoFolder,
35+
client: client,
36+
newMessages: writeTo,
37+
}
1638
}
1739

18-
func NewLoreEmailStreamer() *LoreEmailStreamer {
19-
return &LoreEmailStreamer{}
40+
const (
41+
// Don't consider older replies.
42+
relevantPeriod = 7 * 24 * time.Hour
43+
)
44+
45+
func (s *LKMLEmailStream) Loop(ctx context.Context, pollPeriod time.Duration) error {
46+
last, err := s.client.LastReply(ctx, s.reporterName)
47+
if err != nil {
48+
return fmt.Errorf("failed to query the last reply: %w", err)
49+
}
50+
// We assume that the archive mostly consists of relevant emails, so after the restart
51+
// we just start with the last saved message's date.
52+
s.lastCommitDate = time.Now().Add(-relevantPeriod)
53+
if last != nil && last.Time.After(s.lastCommitDate) {
54+
s.lastCommitDate = last.Time
55+
}
56+
for {
57+
err := s.fetchMessages(ctx)
58+
if err != nil {
59+
// Occasional errors are fine.
60+
log.Printf("failed to poll the lore archive messages: %v", err)
61+
}
62+
select {
63+
case <-ctx.Done():
64+
return nil
65+
case <-time.After(pollPeriod):
66+
}
67+
}
2068
}
2169

22-
func (s *LoreEmailStreamer) Loop(ctx context.Context, writeTo chan *email.Email) {
23-
<-ctx.Done()
70+
func (s *LKMLEmailStream) fetchMessages(ctx context.Context) error {
71+
gitRepo := vcs.NewLKMLRepo(s.repoFolder)
72+
_, err := gitRepo.Poll(s.repoURL, "master")
73+
if err != nil {
74+
return err
75+
}
76+
var messages []lore.EmailReader
77+
if s.lastCommit != "" {
78+
// If it's not the first iteration, it's better to rely on the last commit hash.
79+
messages, err = lore.ReadArchive(gitRepo, s.lastCommit, time.Time{})
80+
} else {
81+
messages, err = lore.ReadArchive(gitRepo, "", s.lastCommitDate)
82+
}
83+
if err != nil {
84+
return err
85+
}
86+
// From oldest to newest.
87+
for i := len(messages) - 1; i >= 0; i-- {
88+
msg := messages[i]
89+
parsed, err := msg.Parse(nil, nil)
90+
if err != nil || parsed == nil {
91+
log.Printf("failed to parse the email from hash %q: %v", msg.Hash, err)
92+
continue
93+
}
94+
if msg.CommitDate.After(s.lastCommitDate) {
95+
s.lastCommitDate = msg.CommitDate
96+
}
97+
s.lastCommit = msg.Hash
2498

25-
// !! We assume that the archive mostly consists of relevant emails.
26-
// 1. Query the last recorded discussion via API.
27-
// 2. Poll the lore archive and query the emails starting from the date returned in (1).
28-
// 3. Parse the email using email.Parse().
29-
// 4. Report the new email via API, figure out which report was involved, save report ID to msg's BugIDs.
30-
// 5. Push to the channel only if the message has not been seen before.
31-
// Also, we probablty don't want to react to old messages (e.g. > 1 day from now).
99+
// We cannot fully trust the date specified in the message itself, so let's sanitize it
100+
// using the commit date. It will at least help us prevent weird client.lastReply() responses.
101+
messageDate := parsed.Date
102+
if messageDate.After(msg.CommitDate) {
103+
messageDate = msg.CommitDate
104+
}
105+
resp, err := s.client.RecordReply(ctx, &api.RecordReplyReq{
106+
MessageID: parsed.MessageID,
107+
InReplyTo: parsed.InReplyTo,
108+
Reporter: s.reporterName,
109+
Time: messageDate,
110+
})
111+
if err != nil || resp == nil {
112+
// TODO: retry?
113+
app.Errorf("failed to report email %q: %v", parsed.MessageID, err)
114+
continue
115+
} else if resp.ReportID != "" {
116+
if !resp.New {
117+
continue
118+
}
119+
parsed.BugIDs = []string{resp.ReportID}
120+
}
121+
select {
122+
case s.newMessages <- parsed:
123+
case <-ctx.Done():
124+
}
125+
}
126+
return nil
32127
}

0 commit comments

Comments
 (0)