Fix ComponentResource for flink deployment custom interpreter#7537
Fix ComponentResource for flink deployment custom interpreter#7537dahuo98 wants to merge 2 commits into
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical bug in the Flink resource interpreter where memory requirements were being incorrectly parsed due to a conflict between Flink's and Kubernetes' resource notation. By introducing a dedicated parsing function, the interpreter now correctly converts Flink's binary-based memory units into bytes, ensuring accurate resource binding for Flink deployments. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Updates FlinkDeployment resource interpretation to correctly handle Flink-style memory units (where m means mebibytes) by introducing a dedicated parser and updating expected test outputs accordingly.
Changes:
- Added
kube.parseFlinkMemoryLua function to parse Flink memory strings into bytes. - Switched FlinkDeployment customization logic from
getResourceQuantitytoparseFlinkMemoryfor memory fields. - Updated FlinkDeployment interpreter testdata to expect byte values instead of
Xmstrings.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment/testdata/interpretreplica-test.yaml | Updates expected interpreted memory values to bytes; one expected field appears removed. |
| pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment/testdata/interpretcomponent-test.yaml | Updates expected component replica memory values to bytes. |
| pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment/customizations.yaml | Uses kube.parseFlinkMemory for Flink memory parsing and sets requires memory in bytes. |
| pkg/resourceinterpreter/customized/declarative/luavm/kube.go | Adds the parseFlinkMemory Lua function and Flink unit multipliers. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "memory": "2048m" | ||
| namespace: "test-namespace" | ||
| "memory": "2147483648" | ||
|
|
| s := strings.TrimSpace(ls.CheckString(1)) | ||
| if len(s) == 0 { | ||
| ls.Push(lua.LNumber(0)) | ||
| return 1 | ||
| } | ||
|
|
||
| // Extract leading digits. | ||
| i := 0 | ||
| for i < len(s) && s[i] >= '0' && s[i] <= '9' { | ||
| i++ | ||
| } | ||
| if i == 0 { | ||
| ls.Push(lua.LNumber(0)) | ||
| return 1 | ||
| } |
| return 0 | ||
| } | ||
|
|
||
| ls.Push(lua.LNumber(value * mult)) |
| end | ||
| if jm_memory ~= nil then | ||
| jm_requires.resourceRequest.memory = kube.getResourceQuantity(jm_memory) | ||
| jm_requires.resourceRequest.memory = kube.parseFlinkMemory(jm_memory) |
| end | ||
| if tm_memory ~= nil then | ||
| tm_requires.resourceRequest.memory = kube.getResourceQuantity(tm_memory) | ||
| tm_requires.resourceRequest.memory = kube.parseFlinkMemory(tm_memory) |
There was a problem hiding this comment.
Code Review
This pull request introduces a specialized parseFlinkMemory function to the Lua VM to correctly handle Flink's binary memory suffixes, where 'm' represents mebibytes rather than the Kubernetes default of milli-units. The Flink resource customization logic and associated test data have been updated to utilize this new parsing logic. Feedback suggests expanding the supported memory units to include petabytes, supporting decimal values in memory strings by transitioning to float parsing, and correcting an accidental deletion of the namespace field in the test expectations.
| var flinkMemoryMultipliers = map[string]int64{ | ||
| "": 1, | ||
| "b": 1, | ||
| "bytes": 1, | ||
| "k": 1024, | ||
| "kb": 1024, | ||
| "kibibytes": 1024, | ||
| "m": 1048576, | ||
| "mb": 1048576, | ||
| "mebibytes": 1048576, | ||
| "g": 1073741824, | ||
| "gb": 1073741824, | ||
| "gibibytes": 1073741824, | ||
| "t": 1099511627776, | ||
| "tb": 1099511627776, | ||
| "tebibytes": 1099511627776, | ||
| } |
There was a problem hiding this comment.
Flink also supports Petabyte units (p, pb, pebibytes). Consider adding them to the flinkMemoryMultipliers map for completeness, as Flink's MemorySize implementation supports these units.
| var flinkMemoryMultipliers = map[string]int64{ | |
| "": 1, | |
| "b": 1, | |
| "bytes": 1, | |
| "k": 1024, | |
| "kb": 1024, | |
| "kibibytes": 1024, | |
| "m": 1048576, | |
| "mb": 1048576, | |
| "mebibytes": 1048576, | |
| "g": 1073741824, | |
| "gb": 1073741824, | |
| "gibibytes": 1073741824, | |
| "t": 1099511627776, | |
| "tb": 1099511627776, | |
| "tebibytes": 1099511627776, | |
| } | |
| var flinkMemoryMultipliers = map[string]int64{ | |
| "": 1, | |
| "b": 1, | |
| "bytes": 1, | |
| "k": 1024, | |
| "kb": 1024, | |
| "kibibytes": 1024, | |
| "m": 1048576, | |
| "mb": 1048576, | |
| "mebibytes": 1048576, | |
| "g": 1073741824, | |
| "gb": 1073741824, | |
| "gibibytes": 1073741824, | |
| "t": 1099511627776, | |
| "tb": 1099511627776, | |
| "tebibytes": 1099511627776, | |
| "p": 1125899906842624, | |
| "pb": 1125899906842624, | |
| "pebibytes": 1125899906842624, | |
| } |
| func parseFlinkMemory(ls *lua.LState) int { | ||
| if ls.GetTop() != 1 { | ||
| ls.RaiseError("parseFlinkMemory only accepts one argument") | ||
| return 0 | ||
| } | ||
|
|
||
| v := ls.Get(1) | ||
| if v.Type() == lua.LTNil { | ||
| ls.Push(lua.LNumber(0)) | ||
| return 1 | ||
| } | ||
|
|
||
| s := strings.TrimSpace(ls.CheckString(1)) | ||
| if len(s) == 0 { | ||
| ls.Push(lua.LNumber(0)) | ||
| return 1 | ||
| } | ||
|
|
||
| // Extract leading digits. | ||
| i := 0 | ||
| for i < len(s) && s[i] >= '0' && s[i] <= '9' { | ||
| i++ | ||
| } | ||
| if i == 0 { | ||
| ls.Push(lua.LNumber(0)) | ||
| return 1 | ||
| } | ||
|
|
||
| value, err := strconv.ParseInt(s[:i], 10, 64) | ||
| if err != nil { | ||
| ls.RaiseError("parseFlinkMemory: invalid number %q: %v", s[:i], err) | ||
| return 0 | ||
| } | ||
|
|
||
| suffix := strings.ToLower(strings.TrimSpace(s[i:])) | ||
| mult, ok := flinkMemoryMultipliers[suffix] | ||
| if !ok { | ||
| ls.RaiseError("parseFlinkMemory: unrecognized suffix %q", suffix) | ||
| return 0 | ||
| } | ||
|
|
||
| ls.Push(lua.LNumber(value * mult)) | ||
| return 1 | ||
| } |
There was a problem hiding this comment.
The current implementation for extracting the numeric part of the memory string does not support decimal points (e.g., 1.5g), which are valid in Flink memory configurations. Additionally, using strconv.ParseFloat would be more robust and align better with Flink's own parsing logic. A doc comment has been added to this unexported function to improve maintainability as it contains key logic.
// parseFlinkMemory parses Flink memory strings into numeric values.
func parseFlinkMemory(ls *lua.LState) int {
if ls.GetTop() != 1 {
ls.RaiseError("parseFlinkMemory only accepts one argument")
return 0
}
v := ls.Get(1)
if v.Type() == lua.LTNil {
ls.Push(lua.LNumber(0))
return 1
}
s := strings.TrimSpace(ls.CheckString(1))
if len(s) == 0 {
ls.Push(lua.LNumber(0))
return 1
}
// Extract leading digits and decimal point.
i := 0
for i < len(s) && ((s[i] >= '0' && s[i] <= '9') || s[i] == '.') {
i++
}
if i == 0 {
ls.Push(lua.LNumber(0))
return 1
}
value, err := strconv.ParseFloat(s[:i], 64)
if err != nil {
ls.RaiseError("parseFlinkMemory: invalid number %q: %v", s[:i], err)
return 0
}
suffix := strings.ToLower(strings.TrimSpace(s[i:]))
mult, ok := flinkMemoryMultipliers[suffix]
if !ok {
ls.RaiseError("parseFlinkMemory: unrecognized suffix %q", suffix)
return 0
}
ls.Push(lua.LNumber(value * float64(mult)))
return 1
}References
- Add doc comments to unexported functions that contain key logic to improve maintainability.
| "cpu": "1" | ||
| "memory": "2048m" | ||
| namespace: "test-namespace" | ||
| "memory": "2147483648" |
There was a problem hiding this comment.
What type of PR is this?
/kind bug
What this PR does / why we need it:
For resource notion, Apache Flink uses "m" for mebibyte, however, Kuberentes uses "m" for millibyte. The current resource interpreter uses Kubernetes resource parser for parsing flink component resource requirements. This incorrectly interprets flink JM/TM memory (e.g. 1024 MiB to 1.024 Bytes in resourcebinding)
This PR adds a function to interpret flink resource notion in flink custom interpreter.
Which issue(s) this PR fixes:
Special notes for your reviewer:
Does this PR introduce a user-facing change?: