Skip to content

Commit 517bf98

Browse files
enarha authored and tekton-robot committed
Add support for merging blob objects parts
Some log storages (e.g. S3) store the logs as immutable objects, preventing appends to existing objects. This is not suitable for long-running logs and requires keeping the log in memory for an hour or more, consuming memory and risking losing the log if the pod is restarted. The alternative is to write the log to the storage more often, using unique object names that follow a pattern which allows collecting and merging the parts of a specific log. LOGGING_PLUGIN_MULTIPART_REGEX allows specifying a regex to match the parts of the same log. Check docs/logging-support.md for more details.
1 parent ac68abe commit 517bf98

File tree

6 files changed

+159
-23
lines changed

6 files changed

+159
-23
lines changed

Diff for: config/base/env/config

+2-1
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@ LOGGING_PLUGIN_CA_CERT=
4949
LOGGING_PLUGIN_QUERY_LIMIT=1700
5050
LOGGING_PLUGIN_TLS_VERIFICATION_DISABLE=
5151
LOGGING_PLUGIN_FORWARDER_DELAY_DURATION=10
52-
LOGGING_PLUGIN_QUERY_PARAMS='direction=forward'
52+
LOGGING_PLUGIN_QUERY_PARAMS='direction=forward'
53+
LOGGING_PLUGIN_MULTIPART_REGEX=

Diff for: docs/logging-support.md

+1
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,4 @@ These are the common configuration options for all third party logging APIs.
5151
- `LOGGING_PLUGIN_FORWARDER_DELAY_DURATION`: This is the max duration in minutes taken by third party logging system to forward and store the logs after completion of taskrun and pipelinerun. This is used to search between start time of runs and completion plus buffer duration.
5252
- `LOGGING_PLUGIN_QUERY_LIMIT`: Sets the query limit for Third Party Logging API if logging backend has a limit on number of log lines returned.
5353
- `LOGGING_PLUGIN_QUERY_PARAMS`: Sets the query params for Third Party Logging API, these can be direction/sort order.Specify them in this format: "foo=bar&direction=backward"
54+
- `LOGGING_PLUGIN_MULTIPART_REGEX`: Sets a regex for matching parts of the same log. Some log backends (e.g. S3) store objects immutably; once stored, you can't append. For long-running TaskRun steps, it's not effective to keep such a log in memory until the step completes. Instead, one can store the log in multiple parts with a name suffix (e.g. `-1743932245`, seconds since the Epoch) and set a regex to match the parts of the same log (e.g. `-\d{10}$`). (optional)

Diff for: pkg/api/server/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type Config struct {
6666
LOGGING_PLUGIN_TLS_VERIFICATION_DISABLE bool `mapstructure:"LOGGING_PLUGIN_TLS_VERIFICATION_DISABLE"`
6767
LOGGING_PLUGIN_FORWARDER_DELAY_DURATION int64 `mapstructure:"LOGGING_PLUGIN_FORWARDER_DELAY_DURATION"`
6868
LOGGING_PLUGIN_QUERY_PARAMS string `mapstructure:"LOGGING_PLUGIN_QUERY_PARAMS"`
69+
LOGGING_PLUGIN_MULTIPART_REGEX string `mapstructure:"LOGGING_PLUGIN_MULTIPART_REGEX"`
6970
}
7071

7172
func Get() *Config {

Diff for: pkg/api/server/v1alpha2/plugin/export_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package plugin
2+
3+
// exports for tests
4+
5+
var MergeLogParts = mergeLogParts

Diff for: pkg/api/server/v1alpha2/plugin/plugin_logs.go

+65-22
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ import (
1111
"net/url"
1212
"path"
1313
"path/filepath"
14+
"regexp"
15+
"slices"
1416
"sort"
1517
"strconv"
1618
"strings"
19+
"time"
1720

1821
"go.uber.org/zap"
1922
"google.golang.org/grpc/codes"
@@ -318,7 +321,7 @@ func getBlobLogs(s *LogServer, writer io.Writer, parent string, rec *db.Record)
318321
}
319322
u.RawQuery = queryParams.Encode()
320323

321-
logPath := map[string]string{}
324+
logPath := []string{}
322325

323326
ctx := context.Background()
324327
s.logger.Debugf("blob bucket: %s", u.String())
@@ -348,7 +351,7 @@ func getBlobLogs(s *LogServer, writer io.Writer, parent string, rec *db.Record)
348351
s.logger.Error(err)
349352
return err
350353
}
351-
logPath[""] = filepath.Join(s.config.LOGS_PATH, log.Status.Path)
354+
logPath = append(logPath, filepath.Join(s.config.LOGS_PATH, log.Status.Path))
352355
}
353356
} else {
354357
s.logger.Errorf("record type is invalid %s", rec.Type)
@@ -360,6 +363,8 @@ func getBlobLogs(s *LogServer, writer io.Writer, parent string, rec *db.Record)
360363
Prefix: strings.TrimPrefix(s.config.LOGS_PATH+fmt.Sprintf(defaultBlobPathParams, parent, rec.ResultName, rec.Name), "/"),
361364
})
362365
s.logger.Debugf("prefix: %s", strings.TrimPrefix(s.config.LOGS_PATH+fmt.Sprintf(defaultBlobPathParams, parent, rec.ResultName, rec.Name), "/"))
366+
// bucket.List returns the objects sorted alphabetically by key (name), we need that sorted by last modified time
367+
toSort := []*blob.ListObject{}
363368
for {
364369
obj, err := iter.Next(ctx)
365370
if err == io.EOF {
@@ -370,8 +375,16 @@ func getBlobLogs(s *LogServer, writer io.Writer, parent string, rec *db.Record)
370375
s.logger.Error(err)
371376
return err
372377
}
373-
logPath[obj.Key] = obj.Key
378+
toSort = append(toSort, obj)
374379
}
380+
// S3 objects ModTime is rounded to the second (not milliseconds), so objects stored in the same second are still ordered alphabetically
381+
slices.SortFunc(toSort, func(a, b *blob.ListObject) int {
382+
return time.Time.Compare(a.ModTime, b.ModTime)
383+
})
384+
for _, obj := range toSort {
385+
logPath = append(logPath, obj.Key)
386+
}
387+
375388
s.logger.Debugf("logPath: %v", logPath)
376389
case v1alpha3.LogRecordType, v1alpha3.LogRecordTypeV2:
377390
log := &v1alpha3.Log{}
@@ -381,40 +394,70 @@ func getBlobLogs(s *LogServer, writer io.Writer, parent string, rec *db.Record)
381394
s.logger.Error(err)
382395
return err
383396
}
384-
logPath[""] = filepath.Join(s.config.LOGS_PATH, log.Status.Path)
397+
logPath = append(logPath, filepath.Join(s.config.LOGS_PATH, log.Status.Path))
385398
default:
386399
s.logger.Errorf("record type is invalid, record ID: %v, Name: %v, result Name: %v, result ID: %v", rec.ID, rec.Name, rec.ResultName, rec.ResultID)
387400
return fmt.Errorf("record type is invalid %s", rec.Type)
388401
}
389402

390-
for k, v := range logPath {
391-
err := func() error {
392-
s.logger.Debugf("logPath key: %s value: %s", k, v)
393-
_, file := filepath.Split(k)
394-
fmt.Fprint(writer, strings.TrimRight(file, ".log")+" :-\n")
395-
rc, err := bucket.NewReader(ctx, v, nil)
396-
if err != nil {
397-
s.logger.Errorf("error creating bucket reader: %s for log path: %s", err, logPath)
398-
return err
399-
}
400-
defer rc.Close()
403+
regex := s.config.LOGGING_PLUGIN_MULTIPART_REGEX
404+
re, err := regexp.Compile(regex)
405+
if err != nil {
406+
s.logger.Errorf("failed to compile regexp: %s", err)
407+
return err
408+
}
409+
mergedLogParts := mergeLogParts(logPath, re)
410+
411+
for _, parts := range mergedLogParts {
412+
baseName := re.ReplaceAllString(parts[0], "")
413+
s.logger.Debugf("mergedLogParts key: %s value: %v", baseName, parts)
414+
_, file := filepath.Split(baseName)
415+
fmt.Fprint(writer, strings.TrimRight(file, ".log")+" :-\n")
416+
for _, part := range parts {
417+
err := func() error {
418+
rc, err := bucket.NewReader(ctx, part, nil)
419+
if err != nil {
420+
s.logger.Errorf("error creating bucket reader: %s for log part: %s", err, part)
421+
return err
422+
}
423+
defer rc.Close()
401424

402-
_, err = io.Copy(writer, rc)
425+
_, err = rc.WriteTo(writer)
426+
if err != nil {
427+
s.logger.Errorf("error writing the logs: %s", err)
428+
}
429+
return nil
430+
}()
403431
if err != nil {
404-
s.logger.Errorf("error copying the logs: %s", err)
432+
s.logger.Error(err)
405433
return err
406434
}
407435
fmt.Fprint(writer, "\n")
408-
return nil
409-
}()
410-
if err != nil {
411-
s.logger.Error(err)
412-
return err
413436
}
414437
}
438+
415439
return nil
416440
}
417441

442+
// mergeLogParts organizes in groups objects part of the same log
443+
func mergeLogParts(logPath []string, re *regexp.Regexp) [][]string {
444+
merged := [][]string{}
445+
// use extra mapping [log_base_name:index_of_slice_of_parts] to preserve the order of elements
446+
baseNameIndexes := map[string]int{}
447+
index := 0
448+
for _, log := range logPath {
449+
baseName := re.ReplaceAllString(log, "")
450+
if existingIndex, ok := baseNameIndexes[baseName]; ok {
451+
merged[existingIndex] = append(merged[existingIndex], log)
452+
} else {
453+
baseNameIndexes[baseName] = index
454+
merged = append(merged, []string{log})
455+
index++
456+
}
457+
}
458+
return merged
459+
}
460+
418461
func (s *LogServer) setLogPlugin() bool {
419462
switch strings.ToLower(s.config.LOGS_TYPE) {
420463
case string(v1alpha3.LokiLogType):

Diff for: pkg/api/server/v1alpha2/plugin/plugin_logs_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http/httptest"
99
"os"
1010
"path/filepath"
11+
"regexp"
1112
"testing"
1213
"time"
1314

@@ -16,6 +17,7 @@ import (
1617
"github.com/tektoncd/results/pkg/api/server/test"
1718
server "github.com/tektoncd/results/pkg/api/server/v1alpha2"
1819
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
20+
"github.com/tektoncd/results/pkg/api/server/v1alpha2/plugin"
1921
"github.com/tektoncd/results/pkg/api/server/v1alpha2/record"
2022

2123
pipelinev1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
@@ -102,6 +104,7 @@ func TestLogPluginServer_GetLog(t *testing.T) {
102104
LOGGING_PLUGIN_CONTAINER_KEY: "kubernetes.container_name",
103105
LOGGING_PLUGIN_QUERY_LIMIT: 1500,
104106
LOGGING_PLUGIN_QUERY_PARAMS: "direction=forward",
107+
LOGGING_PLUGIN_MULTIPART_REGEX: "",
105108
}, logger.Get("info"), test.NewDB(t))
106109
if err != nil {
107110
t.Fatalf("failed to create server: %v", err)
@@ -166,3 +169,85 @@ func TestLogPluginServer_GetLog(t *testing.T) {
166169
}
167170

168171
}
172+
173+
func TestMergeLogParts(t *testing.T) {
174+
tests := []struct {
175+
name string
176+
regex string
177+
logParts []string
178+
expectedMerged [][]string
179+
}{
180+
{
181+
name: "Test with matching regexp",
182+
regex: `-\d{10}.log$`,
183+
logParts: []string{
184+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log",
185+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log",
186+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log",
187+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log",
188+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log",
189+
},
190+
expectedMerged: [][]string{
191+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log"},
192+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log"},
193+
{
194+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log",
195+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log",
196+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log",
197+
},
198+
},
199+
},
200+
{
201+
name: "Test with empty regexp",
202+
regex: ``,
203+
logParts: []string{
204+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log",
205+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log",
206+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log",
207+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log",
208+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log",
209+
},
210+
expectedMerged: [][]string{
211+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log"},
212+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log"},
213+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log"},
214+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log"},
215+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log"},
216+
},
217+
},
218+
{
219+
name: "Test with not matching regexp",
220+
regex: `not-matching-regex`,
221+
logParts: []string{
222+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log",
223+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log",
224+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log",
225+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log",
226+
"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log",
227+
},
228+
expectedMerged: [][]string{
229+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/prepare-1743090392.log"},
230+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/place-scripts-1743090392.log"},
231+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090392.log"},
232+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090554.log"},
233+
{"/logs/default/0c8ca3dc-92ea-40df-aa0d-dff9f5361ae8/0f66649d-b8fa-4bb6-a42f-169d96c70298/container-step-foo-1743090738.log"},
234+
},
235+
},
236+
}
237+
238+
for _, tt := range tests {
239+
t.Run(tt.name, func(t *testing.T) {
240+
re := regexp.MustCompile(tt.regex)
241+
result := plugin.MergeLogParts(tt.logParts, re)
242+
243+
for i, parts := range result {
244+
for j, expectedPart := range parts {
245+
if expectedPart != tt.expectedMerged[i][j] {
246+
t.Errorf("Expected merged log part %d to be %q, got %q", i, tt.expectedMerged[i][j], expectedPart)
247+
}
248+
}
249+
}
250+
})
251+
}
252+
253+
}

0 commit comments

Comments
 (0)