Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/crossplane/crossplane-runtime/v2/pkg/errors"
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/apimachinery/pkg/runtime"
"kcl-lang.io/krm-kcl/pkg/api"
Expand Down Expand Up @@ -42,10 +44,20 @@ type Function struct {

log logging.Logger
dependencies string
recycler *recycler
cache *renderCache
}

// RunFunction runs the Function.
func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest) (*fnv1.RunFunctionResponse, error) {
// Reject new work while the process is draining for a memory recycle so
// Crossplane retries this reconcile elsewhere instead of having it cut off
// mid-render. begin() also bounds the recycle drain to in-flight calls.
if !f.recycler.begin() {
return nil, status.Error(codes.Unavailable, "function-kcl is recycling to release native memory; retry")
}
defer f.recycler.end()

log := f.log.WithValues("tag", req.GetMeta().GetTag())
log.Debug("Running Function")

Expand Down Expand Up @@ -189,7 +201,6 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest)
response.Fatal(rsp, err)
return rsp, nil
}
inputBytes, outputBytes := bytes.NewBuffer([]byte{}), bytes.NewBuffer([]byte{})
// Convert the function-kcl KCLInput to the KRM-KCL spec and run function pipelines.
// Input Example: https://github.com/kcl-lang/krm-kcl/blob/main/examples/mutation/set-annotations/suite/good.yaml
in.APIVersion = v1alpha1.KCLRunAPIVersion
Expand All @@ -200,16 +211,28 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1.RunFunctionRequest)
response.Fatal(rsp, errors.Wrap(err, "cannot marshal input to yaml"))
return rsp, nil
}
inputBytes.Write(kclRunBytes)
// Run pipeline to get the result mutated or validated by the KCL source.
pipeline := kio.NewPipeline(inputBytes, outputBytes, false)

if err := pipeline.Execute(); err != nil {
response.Fatal(rsp, errors.Wrap(err, "failed to run kcl function pipelines"))
return rsp, nil
// The KCL render is deterministic over kclRunBytes (source + dependencies +
// all params + config). Reuse the previous output for byte-identical inputs
// to skip recompiling the module — this avoids both the CPU cost and a native
// memory-leak increment on no-op re-syncs. See rendercache.go.
var outputData []byte
if cached, ok := f.cache.lookup(kclRunBytes); ok {
outputData = cached
hits, misses := f.cache.stats()
log.Debug("render cache hit", "hits", hits, "misses", misses)
} else {
inputBytes, outputBytes := bytes.NewBuffer(kclRunBytes), bytes.NewBuffer([]byte{})
// Run pipeline to get the result mutated or validated by the KCL source.
pipeline := kio.NewPipeline(inputBytes, outputBytes, false)
if err := pipeline.Execute(); err != nil {
response.Fatal(rsp, errors.Wrap(err, "failed to run kcl function pipelines"))
return rsp, nil
}
outputData = outputBytes.Bytes()
f.cache.store(kclRunBytes, outputData)
}
log.Debug(fmt.Sprintf("Pipeline output: %v", outputBytes.String()))
data, err := pkgresource.DataResourcesFromYaml(outputBytes.Bytes())
log.Debug(fmt.Sprintf("Pipeline output: %v", string(outputData)))
data, err := pkgresource.DataResourcesFromYaml(outputData)
if err != nil {
response.Fatal(rsp, errors.Wrapf(err, "cannot parse data resources from the pipeline output in %T", rsp))
return rsp, nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-logr/logr v1.4.3
github.com/google/go-cmp v0.7.0
github.com/pkg/errors v0.9.1
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.35.4
Expand Down Expand Up @@ -244,7 +245,6 @@ require (
google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect
google.golang.org/grpc v1.78.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
13 changes: 12 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,18 @@ func (c *CLI) Run() error {
if err != nil {
return err
}
return function.Serve(&Function{dependencies: dependencies, log: log},
// Watchdog that recycles the process before the KCL native memory leak can
// OOMKill it mid-reconcile. Configured via FUNCTION_KCL_MAX_* env vars;
// no-op when no trigger is enabled.
rec := newRecycler(log, recycleConfigFromEnv())
rec.run()
// Optional render cache: memoise KCL output for byte-identical reconciles to
// skip recompilation (CPU + leak). Enabled via FUNCTION_KCL_RENDER_CACHE_SIZE.
cache := newRenderCacheFromEnv()
if cache.enabled() {
log.Info("render cache enabled", "maxEntries", cache.max, "ttl", cache.ttl.String())
}
return function.Serve(&Function{dependencies: dependencies, log: log, recycler: rec, cache: cache},
function.Listen(c.Network, c.Address),
function.MTLSCertificates(c.TLSCertsDir),
function.Insecure(c.Insecure),
Expand Down
Loading
Loading