Skip to content

Commit

Permalink
Merge pull request #7 from vdice/fix/monitor-job-logs-truncation
Browse files Browse the repository at this point in the history
fix(monitor): use circular buffer to truncate job logs, if needed
  • Loading branch information
vdice authored Jul 1, 2021
2 parents bb23a3d + d5361d5 commit e654a7f
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.15
replace k8s.io/client-go => k8s.io/client-go v0.18.2

require (
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2
github.com/brigadecore/brigade/sdk/v2 v2.0.0-alpha.5.0.20210614205223-c89ad0ef0260
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/google/go-github/v33 v33.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs=
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/brigadecore/brigade/sdk/v2 v2.0.0-alpha.5.0.20210614205223-c89ad0ef0260 h1:BuuzEs84uw06myuo4G2j+fLiXMjwS/pXFDNp9i6e1Ds=
github.com/brigadecore/brigade/sdk/v2 v2.0.0-alpha.5.0.20210614205223-c89ad0ef0260/go.mod h1:rB3y/pIheORX5AHbxaSAw5Xr/U6bUAUtSLkgJcbOHIY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
37 changes: 18 additions & 19 deletions monitor/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/armon/circbuf"

"github.com/brigadecore/brigade/sdk/v2/core"
"github.com/brigadecore/brigade/sdk/v2/meta"
"github.com/google/go-github/v33/github"
Expand Down Expand Up @@ -401,7 +402,6 @@ func (m *monitor) getJobLogs(
if !job.Status.Phase.IsTerminal() {
return "", nil
}
var jobLogsBuilder strings.Builder
logCh, errCh, err := m.logsClient.Stream(
ctx,
eventID,
Expand All @@ -413,23 +413,27 @@ func (m *monitor) getJobLogs(
if err != nil {
return "", err
}
// Arbitrarily limiting to 1000 log lines because we don't want to blow
// out the heap if a Job has produced an enormous amount of logs
const maxLines = 1000
var i int
// GitHub places a restriction on the text field for a Check Run at 65535
// characters, so we use this as a maximum and truncate if needed.
const maxBytes = 65535
// This circular buffer can be written to infinitely but maintains a fixed
// size, preserving the most recent bytes written - this is what we want
// as we wish to present the last logs written.
// Ignoring the error as it is only returned if the provided size is <= 0
jobLogsBuffer, _ := circbuf.NewBuffer(maxBytes)
logLoop:
for i = 0; i < maxLines; i++ {
for {
var logEntry core.LogEntry
var ok bool
select {
case logEntry, ok = <-logCh:
if !ok { // The channel was closed. We got everything.
break logLoop
}
if _, err = jobLogsBuilder.WriteString(logEntry.Message); err != nil {
if _, err = jobLogsBuffer.Write([]byte(logEntry.Message)); err != nil {
return "", err
}
if _, err = jobLogsBuilder.WriteString("\n"); err != nil {
if _, err = jobLogsBuffer.Write([]byte("\n")); err != nil {
return "", err
}
case err, ok = <-errCh:
Expand All @@ -440,15 +444,10 @@ logLoop:
return "", ctx.Err()
}
}
if i > maxLines {
if _, err = jobLogsBuilder.WriteString(
fmt.Sprintf(
"--- !!! THESE LOGS HAVE BEEN TRUNCATED AFTER %d LINES !!! ---\n",
maxLines,
),
); err != nil {
return "", err
}
if jobLogsBuffer.TotalWritten() > maxBytes {
logBytes := jobLogsBuffer.Bytes()
copy(logBytes[0:], "(Previous text omitted)\n")
return string(logBytes), nil
}
return jobLogsBuilder.String(), nil
return jobLogsBuffer.String(), nil
}
53 changes: 45 additions & 8 deletions monitor/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -735,7 +734,7 @@ func TestGetJobLogs(t *testing.T) {
},
},
{
name: "success streaming logs",
name: "success streaming logs, with truncation",
monitor: &monitor{
logsClient: &coreTesting.MockLogsClient{
StreamFn: func(
Expand All @@ -747,14 +746,16 @@ func TestGetJobLogs(t *testing.T) {
logEntryCh := make(chan core.LogEntry)
errCh := make(chan error)
go func() {
// Send 2,000 lines
for i := 0; i < 2000; i++ {
// Send 32768 one-char lines for 65536 bytes total
// (one-char msg + one-char newline)
for i := 0; i < 32768; i++ {
select {
case logEntryCh <- core.LogEntry{Message: strconv.Itoa(i)}:
case logEntryCh <- core.LogEntry{Message: "l"}:
case <-ctx.Done():
return
}
}
close(logEntryCh)
}()
return logEntryCh, errCh, nil
},
Expand All @@ -767,9 +768,45 @@ func TestGetJobLogs(t *testing.T) {
},
assertions: func(logs string, err error) {
require.NoError(t, err)
// Make sure we got back only the first 1,000 lines
lines := strings.Split(logs, "\n")
assert.Len(t, lines, 1001) // 1,001 accounts for the trailing newline
assert.Contains(t, logs, "(Previous text omitted)\n")
assert.Equal(t, len(logs), 65535)
},
},
{
name: "success streaming logs, no truncation",
monitor: &monitor{
logsClient: &coreTesting.MockLogsClient{
StreamFn: func(
ctx context.Context,
_ string,
_ *core.LogsSelector,
_ *core.LogStreamOptions,
) (<-chan core.LogEntry, <-chan error, error) {
logEntryCh := make(chan core.LogEntry)
errCh := make(chan error)
go func() {
for i := 0; i < 32767; i++ {
select {
case logEntryCh <- core.LogEntry{Message: "l"}:
case <-ctx.Done():
return
}
}
close(logEntryCh)
}()
return logEntryCh, errCh, nil
},
},
},
job: core.Job{
Status: &core.JobStatus{
Phase: core.JobPhaseSucceeded,
},
},
assertions: func(logs string, err error) {
require.NoError(t, err)
assert.NotContains(t, logs, "(Previous text omitted)\n")
assert.Equal(t, len(logs), 65534)
},
},
}
Expand Down

0 comments on commit e654a7f

Please sign in to comment.