diff --git a/pkg/pfpstatus/pfpstatus.go b/pkg/pfpstatus/pfpstatus.go
index db6723ea..8c3c6ca1 100644
--- a/pkg/pfpstatus/pfpstatus.go
+++ b/pkg/pfpstatus/pfpstatus.go
@@ -41,13 +41,16 @@ const (
 const (
 	defaultMaxNodes          = 5000
 	defaultMaxSamplesPerNode = 10
+	defaultMaxSizePerNode    = 0 // no constraints
 	defaultDumpPeriod        = 10 * time.Second
 )
 
 type StorageParams struct {
-	Enabled   bool
-	Directory string
-	Period    time.Duration
+	Enabled        bool
+	Directory      string
+	Period         time.Duration
+	CoalesceLast   bool // forwarded to record.WithPFPCoalescing by Setup
+	MaxSizePerNode int  // forwarded to record.WithMaxSizePerNode by Setup; 0 means no constraint
 }
 
 type Params struct {
@@ -63,9 +66,11 @@ type environ struct {
 func DefaultParams() Params {
 	return Params{
 		Storage: StorageParams{
-			Enabled:   false,
-			Directory: DefaultDumpDirectory,
-			Period:    10 * time.Second,
+			Enabled:        false,
+			Directory:      DefaultDumpDirectory,
+			Period:         10 * time.Second,
+			CoalesceLast:   false,
+			MaxSizePerNode: defaultMaxSizePerNode,
 		},
 	}
 }
@@ -95,7 +100,12 @@ func Setup(logh logr.Logger, params Params) {
 
 	logh.Info("Setup in progress", "params", fmt.Sprintf("%+#v", params))
 
-	rec, err := record.NewRecorder(record.WithMaxNodes(defaultMaxNodes), record.WithNodeCapacity(defaultMaxSamplesPerNode))
+	rec, err := record.NewRecorder(
+		record.WithMaxNodes(defaultMaxNodes),
+		record.WithNodeCapacity(defaultMaxSamplesPerNode),
+		record.WithMaxSizePerNode(params.Storage.MaxSizePerNode),
+		record.WithPFPCoalescing(params.Storage.CoalesceLast),
+	)
 	if err != nil {
 		logh.Error(err, "cannot create a status recorder")
 		return
diff --git a/pkg/pfpstatus/record/options.go b/pkg/pfpstatus/record/options.go
new file mode 100644
index 00000000..2a76713d
--- /dev/null
+++ b/pkg/pfpstatus/record/options.go
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2025 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package record
+
+import "time"
+
+// Option is a function that sets one field of a Recorder.
+type Option func(*Recorder)
+
+// WithNodeCapacity sets the capacity the Recorder hands to every NodeRecorder it creates.
+func WithNodeCapacity(nodeCapacity int) Option {
+	return func(rec *Recorder) {
+		rec.nodeCapacity = nodeCapacity
+	}
+}
+
+// WithNodeTimestamper sets the timestamper the Recorder hands to every NodeRecorder it creates.
+func WithNodeTimestamper(tsr func() time.Time) Option {
+	return func(rec *Recorder) {
+		rec.timestamper = tsr
+	}
+}
+
+// WithMaxNodes sets the maximum number of nodes allowed in the Recorder.
+func WithMaxNodes(maxNodes int) Option {
+	return func(rec *Recorder) {
+		rec.maxNodes = maxNodes
+	}
+}
+
+// WithPFPCoalescing sets whether a status matching the fingerprints of the last one recorded for its node is skipped.
+func WithPFPCoalescing(val bool) Option {
+	return func(rr *Recorder) {
+		rr.coalesceLast = val
+	}
+}
+
+// WithMaxSizePerNode sets the size budget handed to every NodeRecorder; 0 means no constraint.
+func WithMaxSizePerNode(maxSize int) Option {
+	return func(rr *Recorder) {
+		rr.maxSize = maxSize
+	}
+}
+
+// NodeOption is a function that sets one field of a NodeRecorder.
+type NodeOption func(*NodeRecorder)
+
+// WithCapacity sets the NodeRecorder capacity, which is the value reported by Cap.
+func WithCapacity(capacity int) NodeOption {
+	return func(nr *NodeRecorder) {
+		nr.capacity = capacity
+	}
+}
+
+// WithTimestamper sets the function Push calls to obtain the RecordTime of a status.
+func WithTimestamper(tsr func() time.Time) NodeOption {
+	return func(nr *NodeRecorder) {
+		nr.timestamper = tsr
+	}
+}
+
+// WithCoalescing sets whether Push skips a status whose fingerprints match the last recorded one.
+func WithCoalescing(val bool) NodeOption {
+	return func(nr *NodeRecorder) {
+		nr.coalesceLast = val
+	}
+}
+
+// WithMaxSize sets the size budget of the NodeRecorder; 0 means no constraint.
+func WithMaxSize(maxSize int) NodeOption {
+	return func(nr *NodeRecorder) {
+		nr.maxSize = maxSize
+	}
+}
diff --git a/pkg/pfpstatus/record/recorder.go b/pkg/pfpstatus/record/recorder.go
index afd7ebfc..e2ec71ce 100644
--- a/pkg/pfpstatus/record/recorder.go
+++ b/pkg/pfpstatus/record/recorder.go
@@ -18,6 +18,8 @@ package record
 
 import (
 	"errors"
+	"fmt"
+	"io"
 	"time"
 
 	"github.com/k8stopologyawareschedwg/podfingerprint"
@@ -40,6 +42,13 @@ type RecordedStatus struct {
 	podfingerprint.Status
 	// RecordTime is a timestamp of when the RecordedStatus was added to the record
 	RecordTime time.Time `json:"recordTime"`
+	statusSize int // Size() as computed by Push; unexported, so encoding/json already skips it
+}
+
+// Size returns the byte length of the "%+v" rendering of Pods and RecordTime, joined by a newline.
+func (rs RecordedStatus) Size() int {
+	n, _ := fmt.Fprintf(io.Discard, "%+v\n%+v", rs.Pods, rs.RecordTime) // writes to io.Discard always succeed
+	return n
 }
 
 func (rs RecordedStatus) Equal(x RecordedStatus) bool {
@@ -52,24 +61,13 @@ type Timestamper func() time.Time
 // NodeRecorder stores all the recorded statuses for a given node name.
 // Statuses belonging to different nodes won't be accepted.
 type NodeRecorder struct {
-	timestamper func() time.Time
-	nodeName    string // shortcut
-	capacity    int
-	statuses    []RecordedStatus
-}
-
-type NodeOption func(*NodeRecorder)
-
-func WithCapacity(capacity int) NodeOption {
-	return func(nr *NodeRecorder) {
-		nr.capacity = capacity
-	}
-}
-
-func WithTimestamper(tsr func() time.Time) NodeOption {
-	return func(nr *NodeRecorder) {
-		nr.timestamper = tsr
-	}
+	timestamper  func() time.Time
+	nodeName     string // shortcut
+	capacity     int
+	maxSize      int // size budget for the stored statuses; 0 means no constraint
+	size         int // running total of the sizes of the stored statuses
+	statuses     []RecordedStatus
+	coalesceLast bool
 }
 
 // NewNodeRecorder creates a new recorder for the given node with the given capacity.
@@ -101,14 +99,27 @@ func (nr *NodeRecorder) dropOldest() {
 	if nr.Len() < 1 {
 		return
 	}
+	nr.size -= nr.statuses[0].statusSize // subtract exactly what Push added for this entry
 	nr.statuses = nr.statuses[1:]
 }
 
-func (nr *NodeRecorder) makeRoom() {
-	if nr.Len() < nr.Cap() {
+func (nr *NodeRecorder) makeRoom(size int) {
+	if nr.capacity > 1 && nr.Len() == nr.Cap() {
+		nr.dropOldest()
+	}
+	if nr.maxSize == 0 {
 		return
 	}
-	nr.dropOldest()
+	// a status at or above the whole budget evicts everything stored so far
+	if size >= nr.maxSize {
+		nr.size = 0
+		nr.statuses = []RecordedStatus{}
+		return
+	}
+	// otherwise evict the oldest entries until the new status fits; stop once nothing is left to drop
+	for nr.Len() > 0 && nr.size+size > nr.maxSize {
+		nr.dropOldest()
+	}
 }
 
 // Push adds a new Status to the record, evicting the oldest Status if necessary.
@@ -123,16 +134,28 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
 		return ErrMismatchingNode
 	}
 	ts := nr.timestamper()
+	// when coalescing, skip a status whose fingerprints both match the most recent entry
+	if nr.IsCoalescing() && nr.Len() != 0 {
+		lastItem := nr.statuses[nr.Len()-1]
+		if lastItem.FingerprintComputed == st.FingerprintComputed && lastItem.FingerprintExpected == st.FingerprintExpected { // pod list should not change
+			return nil
+		}
+	}
+
 	item := RecordedStatus{
-		Status:     st.Clone(),
+		Status:     st, // NOTE(review): stored without Clone, so Pods shares the caller's backing array - confirm intended
 		RecordTime: ts,
 	}
+	item.statusSize = item.Size()
 	if nr.capacity == 1 { // handle common special case, avoid any resize
 		nr.statuses[0] = item
+		nr.size = item.statusSize
+		// the single slot is always overwritten, so maxSize is not enforced here
 		return nil
 	}
-	nr.makeRoom()
+	nr.makeRoom(item.statusSize)
 	nr.statuses = append(nr.statuses, item)
+	nr.size += item.statusSize
 	return nil
 }
 
@@ -146,38 +169,30 @@ func (nr *NodeRecorder) Cap() int {
 	return nr.capacity
 }
 
-// Content() returns a shallow copy of all the recorded statuses.
+// Content returns the recorded statuses. The slice is not copied: it shares storage with the recorder.
 func (nr *NodeRecorder) Content() []RecordedStatus {
 	return nr.statuses
 }
 
+// IsCoalescing returns true if Push skips a status whose fingerprints match the last recorded one.
+func (nr *NodeRecorder) IsCoalescing() bool {
+	return nr.coalesceLast
+}
+
+// MaxSize returns the size budget configured for nr; 0 means no constraint.
+func (nr *NodeRecorder) MaxSize() int {
+	return nr.maxSize
+}
+
 // Recorder stores all the recorded statuses, dividing them by node name.
 // There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node.
 type Recorder struct {
 	nodes        map[string]*NodeRecorder
 	nodeCapacity int
 	maxNodes     int
+	maxSize      int // value reported by MaxSize; set through WithMaxSizePerNode
 	timestamper  Timestamper
-}
-
-type Option func(*Recorder)
-
-func WithNodeCapacity(nodeCapacity int) Option {
-	return func(rec *Recorder) {
-		rec.nodeCapacity = nodeCapacity
-	}
-}
-
-func WithNodeTimestamper(tsr func() time.Time) Option {
-	return func(rec *Recorder) {
-		rec.timestamper = tsr
-	}
-}
-
-func WithMaxNodes(maxNodes int) Option {
-	return func(rec *Recorder) {
-		rec.maxNodes = maxNodes
-	}
+	coalesceLast bool // value reported by IsCoalescing; set through WithPFPCoalescing
 }
 
 // NewRecorder creates a new recorder up to the given node count, each with the given capacity.
@@ -205,7 +220,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) {
 	return &rec, nil
 }
 
-// Cap returns the maximum nodes allowed in this Recorder
+// MaxNodes returns the maximum nodes allowed in this Recorder
 func (rr *Recorder) MaxNodes() int {
 	return rr.maxNodes
 }
@@ -238,6 +253,16 @@ func (rr *Recorder) Len() int {
 	return tot
 }
 
+// IsCoalescing returns the coalescing setting the recorder was configured with.
+func (rr *Recorder) IsCoalescing() bool {
+	return rr.coalesceLast
+}
+
+// MaxSize returns the per-node size budget the recorder was configured with.
+func (rr *Recorder) MaxSize() int {
+	return rr.maxSize
+}
+
 // Push adds a new Status to the record for its node, evicting the oldest Status
 // belonging to the same node if necessary.
 // Per-node records are created lazily as needed, up to the configured maximum.
@@ -257,7 +282,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
 	}
 
 	if !ok {
-		nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity))
+		nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast))
 		if err != nil {
 			return err
 		}
@@ -266,7 +291,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
 	return nr.Push(st)
 }
 
-// Content() returns a shallow copy of all the recorded statuses, by node name.
+// Content returns a shallow copy of all the recorded statuses, by node name.
 func (rr *Recorder) Content() map[string][]RecordedStatus {
 	ret := make(map[string][]RecordedStatus, len(rr.nodes))
 	for nodeName, nr := range rr.nodes {
diff --git a/pkg/pfpstatus/record/recorder_test.go b/pkg/pfpstatus/record/recorder_test.go
index 9c60b02d..cedf809c 100644
--- a/pkg/pfpstatus/record/recorder_test.go
+++ b/pkg/pfpstatus/record/recorder_test.go
@@ -19,7 +19,9 @@ package record
 import (
 	"errors"
 	"fmt"
+	"io"
 	"testing"
+	"time"
 
 	"github.com/k8stopologyawareschedwg/podfingerprint"
 )
@@ -392,3 +394,150 @@ func TestRecorderMultiPushMultipleNodes(t *testing.T) {
 		t.Fatalf("unexpected status count for %q", "node-0")
 	}
 }
+
+func TestRecorderWithCoalescing(t *testing.T) {
+	rr, err := NewRecorder(WithMaxNodes(1), WithNodeCapacity(10), WithPFPCoalescing(true))
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if !rr.IsCoalescing() {
+		t.Fatalf("IsCoalescing should be true")
+	}
+
+	st := podfingerprint.Status{
+		FingerprintExpected: "pfp-exp-st1",
+		FingerprintComputed: "pfp-comp-st1",
+		NodeName:            "node-1",
+	}
+
+	if err := rr.Push(st); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	rs1 := rr.Content()["node-1"][0]
+	if err := rr.Push(st); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	rs2 := rr.Content()["node-1"][0]
+	if !rs2.RecordTime.Equal(rs1.RecordTime) {
+		t.Fatalf("RecordTime should not change, %v vs %v", rs2.RecordTime, rs1.RecordTime)
+	}
+	if err := rr.Push(st); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	rs3 := rr.Content()["node-1"][0]
+	if !rs3.RecordTime.Equal(rs1.RecordTime) {
+		t.Fatalf("RecordTime should not change, %v vs %v", rs3.RecordTime, rs1.RecordTime)
+	}
+
+	if rr.CountRecords("node-1") != 1 {
+		t.Fatalf("unexpected status count with WithCoalescing option set to true")
+	}
+
+	if !rr.nodes["node-1"].IsCoalescing() {
+		t.Fatalf("unexpected status with WithCoalescing option set to true")
+	}
+
+	st.FingerprintExpected = "pfp-exp-st2"
+	if err := rr.Push(st); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if rr.CountRecords("node-1") != 2 {
+		t.Fatalf("unexpected status count")
+	}
+
+	st.FingerprintComputed = "pfp-comp-st2"
+	if err := rr.Push(st); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if rr.CountRecords("node-1") != 3 {
+		t.Fatalf("unexpected status count")
+	}
+}
+
+func TestRecorderWithMaxSize(t *testing.T) {
+	rr, err := NewRecorder(WithMaxNodes(8), WithNodeCapacity(5), WithMaxSizePerNode(1500))
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if rr.MaxSize() != 1500 {
+		t.Fatalf("unexpected max size: %d", rr.MaxSize())
+	}
+}
+
+func TestNodeRecorderWithMaxSize(t *testing.T) {
+	rt := time.Now() // to manage accurate size checks
+	rtf := func() time.Time {
+		return rt
+	}
+
+	st := podfingerprint.Status{
+		FingerprintExpected: "pfp-exp-st1",
+		FingerprintComputed: "pfp-comp-st1",
+		NodeName:            "node-0",
+	}
+	rs := RecordedStatus{
+		Status:     st,
+		RecordTime: rt,
+	}
+	stSize := rs.Size()
+	maxSize := 2*stSize + 1000
+
+	nr, err := NewNodeRecorder("node-0", WithCapacity(5), WithTimestamper(rtf), WithMaxSize(maxSize))
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if nr.MaxSize() != maxSize {
+		t.Fatalf("unexpected max size: %d vs %d", nr.MaxSize(), maxSize)
+	}
+
+	for i := 0; i < 2; i++ {
+		if err := nr.Push(st); err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+	}
+
+	stSize += rs.Size()
+	if nr.Len() != 2 {
+		t.Fatalf("unexpected length: %d", nr.Len())
+	}
+	if nr.size != stSize {
+		t.Fatalf("unexpected size: %d vs %d", nr.size, stSize)
+	}
+
+	var pods []podfingerprint.NamespacedName
+	for i := 0; getSize(pods) < maxSize; i++ {
+		pods = append(pods, podfingerprint.NamespacedName{
+			Namespace: fmt.Sprintf("namespace-name %d", i),
+			Name:      fmt.Sprintf("pod-name %d", i),
+		})
+	}
+
+	rs2 := RecordedStatus{
+		Status: podfingerprint.Status{
+			FingerprintExpected: "pfp-exp-st2",
+			FingerprintComputed: "pfp-comp-st2",
+			Pods:                pods,
+			NodeName:            st.NodeName,
+		},
+		RecordTime: rt,
+	}
+
+	if err := nr.Push(rs2.Status); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if nr.Len() != 1 {
+		t.Fatalf("unexpected length: expected 1 found %d", nr.Len())
+	}
+
+	if nr.size != rs2.Size() {
+		t.Fatalf("unexpected size: expected %d found %d", rs2.Size(), nr.size)
+	}
+}
+
+// getSize returns the byte length of the "%+v" rendering of pods followed by a newline.
+func getSize(pods []podfingerprint.NamespacedName) int {
+	n, _ := fmt.Fprintf(io.Discard, "%+v\n", pods) // writes to io.Discard always succeed
+	return n
+}