diff --git a/fn.go b/fn.go index e60a219..0acc940 100644 --- a/fn.go +++ b/fn.go @@ -11,6 +11,8 @@ import ( "github.com/crossplane/crossplane-runtime/v2/pkg/errors" "github.com/crossplane/crossplane-runtime/v2/pkg/logging" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/structpb" "k8s.io/apimachinery/pkg/runtime" "kcl-lang.io/krm-kcl/pkg/api" @@ -42,10 +44,20 @@ type Function struct { log logging.Logger dependencies string + recycler *recycler + cache *renderCache } // RunFunction runs the Function. func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest) (*fnv1.RunFunctionResponse, error) { + // Reject new work while the process is draining for a memory recycle so + // Crossplane retries this reconcile elsewhere instead of having it cut off + // mid-render. begin() also bounds the recycle drain to in-flight calls. + if !f.recycler.begin() { + return nil, status.Error(codes.Unavailable, "function-kcl is recycling to release native memory; retry") + } + defer f.recycler.end() + log := f.log.WithValues("tag", req.GetMeta().GetTag()) log.Debug("Running Function") @@ -189,7 +201,6 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest) response.Fatal(rsp, err) return rsp, nil } - inputBytes, outputBytes := bytes.NewBuffer([]byte{}), bytes.NewBuffer([]byte{}) // Convert the function-kcl KCLInput to the KRM-KCL spec and run function pipelines. // Input Example: https://github.com/kcl-lang/krm-kcl/blob/main/examples/mutation/set-annotations/suite/good.yaml in.APIVersion = v1alpha1.KCLRunAPIVersion @@ -200,16 +211,28 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest) response.Fatal(rsp, errors.Wrap(err, "cannot marshal input to yaml")) return rsp, nil } - inputBytes.Write(kclRunBytes) - // Run pipeline to get the result mutated or validated by the KCL source. - pipeline := kio.NewPipeline(inputBytes, outputBytes, false) - - if err := pipeline.Execute(); err != nil { - response.Fatal(rsp, errors.Wrap(err, "failed to run kcl function pipelines")) - return rsp, nil + // The KCL render is deterministic over kclRunBytes (source + dependencies + + // all params + config). Reuse the previous output for byte-identical inputs + // to skip recompiling the module — this avoids both the CPU cost and a native + // memory-leak increment on no-op re-syncs. See rendercache.go. + var outputData []byte + if cached, ok := f.cache.lookup(kclRunBytes); ok { + outputData = cached + hits, misses := f.cache.stats() + log.Debug("render cache hit", "hits", hits, "misses", misses) + } else { + inputBytes, outputBytes := bytes.NewBuffer(kclRunBytes), bytes.NewBuffer([]byte{}) + // Run pipeline to get the result mutated or validated by the KCL source. + pipeline := kio.NewPipeline(inputBytes, outputBytes, false) + if err := pipeline.Execute(); err != nil { + response.Fatal(rsp, errors.Wrap(err, "failed to run kcl function pipelines")) + return rsp, nil + } + outputData = outputBytes.Bytes() + f.cache.store(kclRunBytes, outputData) } - log.Debug(fmt.Sprintf("Pipeline output: %v", outputBytes.String())) - data, err := pkgresource.DataResourcesFromYaml(outputBytes.Bytes()) + log.Debug(fmt.Sprintf("Pipeline output: %v", string(outputData))) + data, err := pkgresource.DataResourcesFromYaml(outputData) if err != nil { response.Fatal(rsp, errors.Wrapf(err, "cannot parse data resources from the pipeline output in %T", rsp)) return rsp, nil diff --git a/go.mod b/go.mod index 5f59b68..09f3c95 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-logr/logr v1.4.3 github.com/google/go-cmp v0.7.0 github.com/pkg/errors v0.9.1 + google.golang.org/grpc v1.78.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v2 v2.4.0 k8s.io/apimachinery v0.35.4 @@ -244,7 +245,6 @@ require ( google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect - google.golang.org/grpc v1.78.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/main.go b/main.go index 4edb7eb..26c615e 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,18 @@ func (c *CLI) Run() error { if err != nil { return err } - return function.Serve(&Function{dependencies: dependencies, log: log}, + // Watchdog that recycles the process before the KCL native memory leak can + // OOMKill it mid-reconcile. Configured via FUNCTION_KCL_MAX_* env vars; + // no-op when no trigger is enabled. + rec := newRecycler(log, recycleConfigFromEnv()) + rec.run() + // Optional render cache: memoise KCL output for byte-identical reconciles to + // skip recompilation (CPU + leak). Enabled via FUNCTION_KCL_RENDER_CACHE_SIZE. + cache := newRenderCacheFromEnv() + if cache.enabled() { + log.Info("render cache enabled", "maxEntries", cache.max, "ttl", cache.ttl.String()) + } + return function.Serve(&Function{dependencies: dependencies, log: log, recycler: rec, cache: cache}, function.Listen(c.Network, c.Address), function.MTLSCertificates(c.TLSCertsDir), function.Insecure(c.Insecure), diff --git a/recycle.go b/recycle.go new file mode 100644 index 0000000..602067f --- /dev/null +++ b/recycle.go @@ -0,0 +1,348 @@ +package main + +import ( + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/crossplane/crossplane-runtime/v2/pkg/logging" +) + +// The KCL native runtime accumulates off-heap memory across reconciles (it +// recompiles the whole KCL module on every call and the native service handle +// is a long-lived process-global singleton that is never reset). Left alone, +// function-kcl pods climb to their memory limit and get OOMKilled (exit 137), +// which violently drops in-flight reconciles -> "connection reset by peer" / +// DeadlineExceeded on heavy composites. +// +// The recycler turns those hard OOMKills into clean, scheduled restarts. A +// watchdog samples the process RSS (which includes the native off-heap +// allocations) plus optional reconcile-count / lifetime limits. When a limit is +// crossed it drains in-flight RunFunction calls and exits 0 so Kubernetes +// restarts the pod *between* reconciles instead of *during* one. With multiple +// replicas this is a rolling, self-healing recycle and removes the need for a +// manual rollout restart. +// +// This is a stability backstop; the render cache (see rendercache.go) avoids the +// recompile — and its leak increment — for byte-identical reconciles. The two +// are complementary: the backstop bounds any residual growth regardless of the +// leak's exact internals. + +// recycleConfig is read from the environment so it can be tuned per deployment +// via container env vars, without a rebuild. +type recycleConfig struct { + // maxRSSBytes recycles the process once its resident set size reaches this + // many bytes. 0 disables the RSS trigger. + maxRSSBytes uint64 + // maxRSSRatio is used to derive maxRSSBytes from the detected cgroup memory + // limit when maxRSSBytes is not set explicitly. + maxRSSRatio float64 + // maxReconciles recycles after this many RunFunction calls. 0 disables. + maxReconciles uint64 + // maxLifetime recycles after the process has been up this long. 0 disables. + maxLifetime time.Duration + // checkInterval is how often the watchdog samples the triggers. + checkInterval time.Duration + // drainTimeout bounds how long we wait for in-flight calls to finish before + // exiting anyway. + drainTimeout time.Duration +} + +const ( + envMaxRSSBytes = "FUNCTION_KCL_MAX_RSS_BYTES" + envMaxRSSRatio = "FUNCTION_KCL_MAX_RSS_RATIO" + envMaxReconciles = "FUNCTION_KCL_MAX_RECONCILES" + envMaxLifetime = "FUNCTION_KCL_MAX_LIFETIME" + envCheckInterval = "FUNCTION_KCL_RECYCLE_CHECK_INTERVAL" + envDrainTimeout = "FUNCTION_KCL_RECYCLE_DRAIN_TIMEOUT" + + defaultMaxRSSRatio = 0.85 + defaultCheckInterval = 30 * time.Second + defaultDrainTimeout = 15 * time.Second +) + +// recycleConfigFromEnv builds the config from environment variables, applying +// defaults. If no explicit RSS limit is set, it derives one from the cgroup +// memory limit so the in-cluster default "just works" (recycle at 85% of the +// pod's memory limit, before the kubelet OOMKills at 100%). +func recycleConfigFromEnv() recycleConfig { + cfg := recycleConfig{ + maxRSSRatio: envFloat(envMaxRSSRatio, defaultMaxRSSRatio), + maxRSSBytes: envBytes(envMaxRSSBytes, 0), + maxReconciles: envUint(envMaxReconciles, 0), + maxLifetime: envDuration(envMaxLifetime, 0), + checkInterval: envDuration(envCheckInterval, defaultCheckInterval), + drainTimeout: envDuration(envDrainTimeout, defaultDrainTimeout), + } + if cfg.maxRSSBytes == 0 && cfg.maxRSSRatio > 0 { + if limit, ok := cgroupMemoryLimit(); ok { + cfg.maxRSSBytes = uint64(float64(limit) * cfg.maxRSSRatio) + } + } + return cfg +} + +// enabled reports whether any recycle trigger is active. When false the +// watchdog never fires, so behaviour is identical to upstream (useful for local +// runs and tests with no cgroup limit). +func (c recycleConfig) enabled() bool { + return c.maxRSSBytes > 0 || c.maxReconciles > 0 || c.maxLifetime > 0 +} + +// recycler tracks in-flight RunFunction calls and drives the watchdog. The zero +// value and a nil *recycler are both safe no-ops, so existing tests that build a +// Function without one are unaffected. +type recycler struct { + log logging.Logger + cfg recycleConfig + start time.Time + + // gate is held for reading for the duration of each RunFunction call. + // Draining acquires it for writing, which blocks until all in-flight calls + // release it and prevents new ones from starting. + gate sync.RWMutex + inflight atomic.Int64 + count atomic.Uint64 + draining atomic.Bool + + // Injectable for tests. + readRSS func() (uint64, error) + exit func(int) + now func() time.Time +} + +func newRecycler(log logging.Logger, cfg recycleConfig) *recycler { + return &recycler{ + log: log, + cfg: cfg, + start: time.Now(), + readRSS: processRSS, + exit: os.Exit, + now: time.Now, + } +} + +// begin marks the start of a RunFunction call. It returns false if the process +// is draining, in which case the caller must reject the request so Crossplane +// retries it (against another replica or after the restart). A nil recycler +// always admits the call. +func (r *recycler) begin() bool { + if r == nil { + return true + } + if r.draining.Load() { + return false + } + r.gate.RLock() + r.inflight.Add(1) + r.count.Add(1) + return true +} + +// end marks the completion of a RunFunction call. +func (r *recycler) end() { + if r == nil { + return + } + r.inflight.Add(-1) + r.gate.RUnlock() +} + +// start launches the watchdog goroutine if any trigger is enabled. +func (r *recycler) run() { + if r == nil || !r.cfg.enabled() { + return + } + r.log.Info("memory recycler enabled", + "maxRSSBytes", r.cfg.maxRSSBytes, + "maxReconciles", r.cfg.maxReconciles, + "maxLifetime", r.cfg.maxLifetime.String(), + "checkInterval", r.cfg.checkInterval.String()) + go r.watch() +} + +func (r *recycler) watch() { + ticker := time.NewTicker(r.cfg.checkInterval) + defer ticker.Stop() + for range ticker.C { + if reason := r.shouldRecycle(); reason != "" { + r.recycle(reason) + return + } + } +} + +// shouldRecycle returns a non-empty human-readable reason if any trigger has +// fired, else "". +func (r *recycler) shouldRecycle() string { + if r.cfg.maxReconciles > 0 && r.count.Load() >= r.cfg.maxReconciles { + return "reconcile count " + strconv.FormatUint(r.count.Load(), 10) + + " >= " + strconv.FormatUint(r.cfg.maxReconciles, 10) + } + if r.cfg.maxLifetime > 0 && r.now().Sub(r.start) >= r.cfg.maxLifetime { + return "lifetime " + r.now().Sub(r.start).Round(time.Second).String() + + " >= " + r.cfg.maxLifetime.String() + } + if r.cfg.maxRSSBytes > 0 { + rss, err := r.readRSS() + if err != nil { + r.log.Debug("cannot read process RSS", "error", err) + return "" + } + if rss >= r.cfg.maxRSSBytes { + return "RSS " + strconv.FormatUint(rss, 10) + + " >= " + strconv.FormatUint(r.cfg.maxRSSBytes, 10) + " bytes" + } + } + return "" +} + +// recycle drains in-flight calls (bounded by drainTimeout) and exits cleanly so +// the orchestrator restarts the pod. +func (r *recycler) recycle(reason string) { + if !r.draining.CompareAndSwap(false, true) { + return + } + r.log.Info("recycling function-kcl to release native memory", + "reason", reason, "inflight", r.inflight.Load()) + + drained := make(chan struct{}) + go func() { + r.gate.Lock() // waits for all in-flight RunFunction calls to return + close(drained) + }() + + select { + case <-drained: + r.log.Info("drained in-flight reconciles, exiting for clean restart") + case <-time.After(r.cfg.drainTimeout): + r.log.Info("drain timed out, forcing restart", + "inflight", r.inflight.Load(), "timeout", r.cfg.drainTimeout.String()) + } + r.exit(0) +} + +// processRSS returns the resident set size of the current process in bytes by +// reading /proc/self/statm. RSS includes the KCL native (off-heap) allocations, +// which is exactly what we need to bound. +func processRSS() (uint64, error) { + data, err := os.ReadFile("/proc/self/statm") + if err != nil { + return 0, err + } + fields := strings.Fields(string(data)) + if len(fields) < 2 { + return 0, errInvalidStatm + } + residentPages, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, err + } + return residentPages * uint64(os.Getpagesize()), nil +} + +// cgroupMemoryLimit returns the container memory limit in bytes, trying cgroup +// v2 then v1. ok is false if no finite limit is set. +func cgroupMemoryLimit() (uint64, bool) { + // cgroup v2 + if v, ok := readCgroupLimit("/sys/fs/cgroup/memory.max"); ok { + return v, true + } + // cgroup v1 + if v, ok := readCgroupLimit("/sys/fs/cgroup/memory/memory.limit_in_bytes"); ok { + return v, true + } + return 0, false +} + +func readCgroupLimit(path string) (uint64, bool) { + data, err := os.ReadFile(path) + if err != nil { + return 0, false + } + s := strings.TrimSpace(string(data)) + if s == "" || s == "max" { + return 0, false + } + v, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, false + } + // cgroup v1 reports a sentinel "unlimited" value close to max uint64; treat + // anything implausibly large (>= 1 PiB) as no limit. + if v == 0 || v >= 1<<50 { + return 0, false + } + return v, true +} + +var errInvalidStatm = &recycleError{"unexpected /proc/self/statm format"} + +type recycleError struct{ msg string } + +func (e *recycleError) Error() string { return e.msg } + +// envUint, envFloat, envBytes and envDuration parse optional environment +// variables, falling back to def on absence or parse error. +func envUint(key string, def uint64) uint64 { + v, ok := os.LookupEnv(key) + if !ok { + return def + } + n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 64) + if err != nil { + return def + } + return n +} + +func envFloat(key string, def float64) float64 { + v, ok := os.LookupEnv(key) + if !ok { + return def + } + f, err := strconv.ParseFloat(strings.TrimSpace(v), 64) + if err != nil { + return def + } + return f +} + +func envDuration(key string, def time.Duration) time.Duration { + v, ok := os.LookupEnv(key) + if !ok { + return def + } + d, err := time.ParseDuration(strings.TrimSpace(v)) + if err != nil { + return def + } + return d +} + +// envBytes parses a byte size that may carry a binary unit suffix (Ki, Mi, Gi) +// or be a plain integer count of bytes. +func envBytes(key string, def uint64) uint64 { + v, ok := os.LookupEnv(key) + if !ok { + return def + } + s := strings.TrimSpace(v) + mult := uint64(1) + switch { + case strings.HasSuffix(s, "Gi"): + mult, s = 1<<30, strings.TrimSuffix(s, "Gi") + case strings.HasSuffix(s, "Mi"): + mult, s = 1<<20, strings.TrimSuffix(s, "Mi") + case strings.HasSuffix(s, "Ki"): + mult, s = 1<<10, strings.TrimSuffix(s, "Ki") + } + n, err := strconv.ParseUint(strings.TrimSpace(s), 10, 64) + if err != nil { + return def + } + return n * mult +} diff --git a/recycle_test.go b/recycle_test.go new file mode 100644 index 0000000..6d987cb --- /dev/null +++ b/recycle_test.go @@ -0,0 +1,230 @@ +package main + +import ( + "os" + "sync" + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/v2/pkg/logging" +) + +func newTestRecycler(cfg recycleConfig) *recycler { + r := newRecycler(logging.NewNopLogger(), cfg) + r.exit = func(int) {} // never really exit during tests + return r +} + +func TestNilRecyclerAdmits(t *testing.T) { + var r *recycler + if !r.begin() { + t.Fatal("nil recycler must admit calls") + } + r.end() // must not panic +} + +func TestBeginRejectsWhenDraining(t *testing.T) { + r := newTestRecycler(recycleConfig{}) + if !r.begin() { + t.Fatal("expected admit before draining") + } + r.end() + + r.draining.Store(true) + if r.begin() { + t.Fatal("expected reject while draining") + } +} + +func TestRecycleDrainsInflightThenExits(t *testing.T) { + r := newTestRecycler(recycleConfig{drainTimeout: 2 * time.Second}) + + exited := make(chan int, 1) + r.exit = func(code int) { exited <- code } + + // Simulate an in-flight call holding the gate. + if !r.begin() { + t.Fatal("expected admit") + } + + recycleReturned := make(chan struct{}) + go func() { + r.recycle("test") + close(recycleReturned) + }() + + // recycle must not exit while a call is in flight. + select { + case <-exited: + t.Fatal("exited before in-flight call finished") + case <-time.After(150 * time.Millisecond): + } + + // New calls are rejected once draining started. + if r.begin() { + t.Fatal("expected reject during drain") + } + + // Finish the in-flight call; drain should now complete and exit cleanly. + r.end() + + select { + case code := <-exited: + if code != 0 { + t.Fatalf("expected exit code 0, got %d", code) + } + case <-time.After(2 * time.Second): + t.Fatal("recycle did not exit after drain") + } + <-recycleReturned +} + +func TestRecycleDrainTimeout(t *testing.T) { + r := newTestRecycler(recycleConfig{drainTimeout: 100 * time.Millisecond}) + exited := make(chan int, 1) + r.exit = func(code int) { exited <- code } + + // Hold a call in-flight and never release it; recycle must still exit via + // the drain timeout. + if !r.begin() { + t.Fatal("expected admit") + } + defer r.end() + + go r.recycle("stuck") + + select { + case code := <-exited: + if code != 0 { + t.Fatalf("expected exit code 0, got %d", code) + } + case <-time.After(2 * time.Second): + t.Fatal("recycle did not force-exit on drain timeout") + } +} + +func TestRecycleIsIdempotent(t *testing.T) { + r := newTestRecycler(recycleConfig{drainTimeout: time.Second}) + var exits int + var mu sync.Mutex + r.exit = func(int) { mu.Lock(); exits++; mu.Unlock() } + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { defer wg.Done(); r.recycle("race") }() + } + wg.Wait() + time.Sleep(50 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if exits != 1 { + t.Fatalf("expected exactly one exit, got %d", exits) + } +} + +func TestShouldRecycleTriggers(t *testing.T) { + cases := []struct { + name string + cfg recycleConfig + setup func(*recycler) + expect bool + }{ + { + name: "rss over limit", + cfg: recycleConfig{maxRSSBytes: 1000}, + setup: func(r *recycler) { r.readRSS = func() (uint64, error) { return 2000, nil } }, + expect: true, + }, + { + name: "rss under limit", + cfg: recycleConfig{maxRSSBytes: 1000}, + setup: func(r *recycler) { r.readRSS = func() (uint64, error) { return 500, nil } }, + expect: false, + }, + { + name: "reconcile count reached", + cfg: recycleConfig{maxReconciles: 3}, + setup: func(r *recycler) { r.count.Store(3) }, + expect: true, + }, + { + name: "lifetime reached", + cfg: recycleConfig{maxLifetime: time.Hour}, + setup: func(r *recycler) { + base := r.start + r.now = func() time.Time { return base.Add(2 * time.Hour) } + }, + expect: true, + }, + { + name: "nothing enabled", + cfg: recycleConfig{}, + setup: func(*recycler) {}, + expect: false, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := newTestRecycler(tc.cfg) + tc.setup(r) + if got := r.shouldRecycle() != ""; got != tc.expect { + t.Fatalf("shouldRecycle()=%v, want %v", got, tc.expect) + } + }) + } +} + +func TestEnabled(t *testing.T) { + if (recycleConfig{}).enabled() { + t.Fatal("empty config must be disabled") + } + if !(recycleConfig{maxRSSBytes: 1}).enabled() { + t.Fatal("rss trigger must enable") + } + if !(recycleConfig{maxReconciles: 1}).enabled() { + t.Fatal("reconcile trigger must enable") + } + if !(recycleConfig{maxLifetime: time.Second}).enabled() { + t.Fatal("lifetime trigger must enable") + } +} + +func TestEnvBytes(t *testing.T) { + cases := map[string]uint64{ + "1024": 1024, + "1Ki": 1 << 10, + "6Gi": 6 << 30, + "512Mi": 512 << 20, + "bad": 7, // falls back to def + " 2Gi ": 2 << 30, + } + for in, want := range cases { + t.Setenv("X_TEST_BYTES", in) + if got := envBytes("X_TEST_BYTES", 7); got != want { + t.Errorf("envBytes(%q)=%d, want %d", in, got, want) + } + } +} + +func TestReadCgroupLimitRejectsSentinels(t *testing.T) { + dir := t.TempDir() + write := func(name, content string) string { + p := dir + "/" + name + if err := os.WriteFile(p, []byte(content), 0o600); err != nil { + t.Fatal(err) + } + return p + } + if _, ok := readCgroupLimit(write("max", "max")); ok { + t.Error(`"max" must be treated as no limit`) + } + if _, ok := readCgroupLimit(write("huge", "9223372036854771712")); ok { + t.Error("v1 unlimited sentinel must be treated as no limit") + } + v, ok := readCgroupLimit(write("real", "8589934592")) + if !ok || v != 8<<30 { + t.Errorf("real limit: got %d ok=%v, want %d true", v, ok, uint64(8<<30)) + } +} diff --git a/rendercache.go b/rendercache.go new file mode 100644 index 0000000..2efcc7d --- /dev/null +++ b/rendercache.go @@ -0,0 +1,149 @@ +package main + +import ( + "container/list" + "crypto/sha256" + "sync" + "sync/atomic" + "time" +) + +// A Crossplane composition function is deterministic: the same RunFunctionRequest +// must always yield the same response. function-kcl recompiles and re-executes +// the whole KCL module on every reconcile, which is both the CPU hot spot +// (~4 cores under load) and the driver of the native (off-heap) memory leak. +// +// In steady state most reconciles are no-op re-syncs: Crossplane periodically +// reconciles converged composites with byte-identical inputs. The render cache +// short-circuits those — it memoises the KCL pipeline output keyed on the exact +// bytes fed to the pipeline (source + dependencies + all params + config), so an +// identical reconcile returns the cached output without touching the KCL native +// runtime at all. That cuts the CPU peg and avoids a recompile (hence a leak +// increment) on every cache hit. +// +// It does NOT help when inputs genuinely change every reconcile (e.g. a composite +// actively churning during a rollout) — those still recompile. The graceful +// recycler (recycle.go) is the backstop that bounds any residual growth. +// +// Safety: caching is sound because the function is deterministic over its input, +// and the cache key is the complete serialized input. It is opt-in via +// FUNCTION_KCL_RENDER_CACHE_SIZE (entries; 0 = disabled) with an optional TTL. + +type renderCache struct { + mu sync.Mutex + max int + ttl time.Duration + ll *list.List // front = most recently used + items map[string]*list.Element + + hits atomic.Uint64 + misses atomic.Uint64 + + now func() time.Time // injectable for tests +} + +type renderCacheEntry struct { + key string + value []byte + stored time.Time +} + +const ( + envRenderCacheSize = "FUNCTION_KCL_RENDER_CACHE_SIZE" + envRenderCacheTTL = "FUNCTION_KCL_RENDER_CACHE_TTL" +) + +// newRenderCacheFromEnv returns a cache configured from the environment, or nil +// when disabled (size <= 0). A nil *renderCache is a safe no-op. +func newRenderCacheFromEnv() *renderCache { + return newRenderCache(int(envUint(envRenderCacheSize, 0)), envDuration(envRenderCacheTTL, 0)) +} + +// newRenderCache returns a cache holding up to max entries with an optional TTL, +// or nil when max <= 0 (disabled). +func newRenderCache(max int, ttl time.Duration) *renderCache { + if max <= 0 { + return nil + } + return &renderCache{ + max: max, + ttl: ttl, + ll: list.New(), + items: make(map[string]*list.Element, max), + now: time.Now, + } +} + +func (c *renderCache) enabled() bool { return c != nil } + +// key derives the cache key from the serialized pipeline input. +func (c *renderCache) key(input []byte) string { + sum := sha256.Sum256(input) + return string(sum[:]) +} + +// lookup returns a cached pipeline output for the given input, if present and +// not expired. The returned slice is read-only for callers. Nil/disabled caches +// always miss. +func (c *renderCache) lookup(input []byte) ([]byte, bool) { + if c == nil { + return nil, false + } + k := c.key(input) + c.mu.Lock() + defer c.mu.Unlock() + el, ok := c.items[k] + if !ok { + c.misses.Add(1) + return nil, false + } + ent := el.Value.(*renderCacheEntry) + if c.ttl > 0 && c.now().Sub(ent.stored) >= c.ttl { + c.ll.Remove(el) + delete(c.items, k) + c.misses.Add(1) + return nil, false + } + c.ll.MoveToFront(el) + c.hits.Add(1) + return ent.value, true +} + +// store records a pipeline output for the given input, evicting the least +// recently used entry when over capacity. A defensive copy of output is kept. +func (c *renderCache) store(input, output []byte) { + if c == nil { + return + } + k := c.key(input) + cp := make([]byte, len(output)) + copy(cp, output) + + c.mu.Lock() + defer c.mu.Unlock() + if el, ok := c.items[k]; ok { + ent := el.Value.(*renderCacheEntry) + ent.value = cp + ent.stored = c.now() + c.ll.MoveToFront(el) + return + } + el := c.ll.PushFront(&renderCacheEntry{key: k, value: cp, stored: c.now()}) + c.items[k] = el + for c.ll.Len() > c.max { + oldest := c.ll.Back() + if oldest == nil { + break + } + c.ll.Remove(oldest) + delete(c.items, oldest.Value.(*renderCacheEntry).key) + } +} + +// stats returns hit/miss counters for logging. +func (c *renderCache) stats() (hits, misses uint64) { + if c == nil { + return 0, 0 + } + return c.hits.Load(), c.misses.Load() +} diff --git a/rendercache_test.go b/rendercache_test.go new file mode 100644 index 0000000..5f0c2b9 --- /dev/null +++ b/rendercache_test.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/v2/pkg/logging" + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/testing/protocmp" + + fnv1 "github.com/crossplane/function-sdk-go/proto/v1" + "github.com/crossplane/function-sdk-go/resource" +) + +// realRenderCache builds an enabled cache directly so tests don't juggle env vars. +func realRenderCache(max int, ttl time.Duration) *renderCache { + return newRenderCache(max, ttl) +} + +func TestNilRenderCacheIsNoOp(t *testing.T) { + var c *renderCache + if c.enabled() { + t.Fatal("nil cache must be disabled") + } + if _, ok := c.lookup([]byte("x")); ok { + t.Fatal("nil cache must miss") + } + c.store([]byte("x"), []byte("y")) // must not panic + if h, m := c.stats(); h != 0 || m != 0 { + t.Fatalf("nil cache stats must be zero, got %d/%d", h, m) + } +} + +func TestRenderCacheHitMiss(t *testing.T) { + c := realRenderCache(8, 0) + in := []byte("source+params") + if _, ok := c.lookup(in); ok { + t.Fatal("expected miss on empty cache") + } + c.store(in, []byte("rendered")) + got, ok := c.lookup(in) + if !ok || string(got) != "rendered" { + t.Fatalf("expected hit 'rendered', got %q ok=%v", got, ok) + } + if h, m := c.stats(); h != 1 || m != 1 { + t.Fatalf("expected 1 hit 1 miss, got %d/%d", h, m) + } +} + +func TestRenderCacheStoresCopy(t *testing.T) { + c := realRenderCache(8, 0) + in := []byte("k") + out := []byte("abc") + c.store(in, out) + out[0] = 'X' // mutate caller's buffer after store + got, _ := c.lookup(in) + if string(got) != "abc" { + t.Fatalf("cache must keep a copy, got %q", got) + } +} + +func TestRenderCacheLRUEviction(t *testing.T) { + c := realRenderCache(2, 0) + c.store([]byte("a"), []byte("1")) + c.store([]byte("b"), []byte("2")) + // touch "a" so "b" becomes least-recently-used + if _, ok := c.lookup([]byte("a")); !ok { + t.Fatal("a should be present") + } + c.store([]byte("c"), []byte("3")) // evicts "b" + if _, ok := c.lookup([]byte("b")); ok { + t.Fatal("b should have been evicted") + } + if _, ok := c.lookup([]byte("a")); !ok { + t.Fatal("a should still be present") + } + if _, ok := c.lookup([]byte("c")); !ok { + t.Fatal("c should be present") + } +} + +func TestRenderCacheTTL(t *testing.T) { + c := realRenderCache(8, time.Minute) + now := time.Unix(0, 0) + c.now = func() time.Time { return now } + c.store([]byte("k"), []byte("v")) + now = now.Add(30 * time.Second) + if _, ok := c.lookup([]byte("k")); !ok { + t.Fatal("entry should be live before TTL") + } + now = now.Add(31 * time.Second) // total 61s > 60s + if _, ok := c.lookup([]byte("k")); ok { + t.Fatal("entry should be expired after TTL") + } +} + +func TestRenderCacheUpdateExisting(t *testing.T) { + c := realRenderCache(8, 0) + c.store([]byte("k"), []byte("v1")) + c.store([]byte("k"), []byte("v2")) + got, _ := c.lookup([]byte("k")) + if string(got) != "v2" { + t.Fatalf("expected updated value v2, got %q", got) + } + if c.ll.Len() != 1 { + t.Fatalf("expected single entry after update, got %d", c.ll.Len()) + } +} + +// TestRunFunctionCacheHitIsIdentical proves a cached reconcile produces the +// exact same response as the uncached one, and that the second identical call +// is served from cache. +func TestRunFunctionCacheHitIsIdentical(t *testing.T) { + req := &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "krm.kcl.dev/v1alpha1", + "kind": "KCLInput", + "metadata": {"name": "basic"}, + "spec": { + "target": "Default", + "source": "{\n apiVersion: \"example.org/v1\"\n kind: \"Generated\"\n metadata.annotations = {\"krm.kcl.dev/composition-resource-name\": \"custom-composition-resource-name\"}\n}" + } + }`), + Observed: &fnv1.State{ + Composite: &fnv1.Resource{ + Resource: resource.MustStructJSON(`{"apiVersion":"example.org/v1","kind":"XR"}`), + }, + }, + } + + f := &Function{log: logging.NewNopLogger(), cache: realRenderCache(16, 0)} + + first, err := f.RunFunction(context.Background(), req) + if err != nil { + t.Fatalf("first RunFunction: %v", err) + } + second, err := f.RunFunction(context.Background(), req) + if err != nil { + t.Fatalf("second RunFunction: %v", err) + } + + if diff := cmp.Diff(first, second, protocmp.Transform()); diff != "" { + t.Fatalf("cached response differs from uncached (-first +second):\n%s", diff) + } + if h, m := f.cache.stats(); h != 1 || m != 1 { + t.Fatalf("expected exactly 1 hit and 1 miss, got %d hits %d misses", h, m) + } +}