Skip to content

Commit 1ee2afe

Browse files
committed
Add kafka resource metrics
Signed-off-by: Tamal Saha <[email protected]>
1 parent 20ab6d2 commit 1ee2afe

File tree

5 files changed

+186
-5
lines changed

5 files changed

+186
-5
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ require (
4444
kmodules.xyz/go-containerregistry v0.0.11
4545
kmodules.xyz/monitoring-agent-api v0.25.1
4646
kmodules.xyz/resource-metadata v0.17.1
47-
kmodules.xyz/resource-metrics v0.25.1
47+
kmodules.xyz/resource-metrics v0.25.2
4848
kmodules.xyz/sets v0.24.0
4949
kubeops.dev/scanner v0.0.10
5050
sigs.k8s.io/cli-utils v0.34.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2123,8 +2123,8 @@ kmodules.xyz/offshoot-api v0.25.0 h1:Svq9da/+sg5afOjpgo9vx2J/Lu90Mo0aFxkdQmgKnGI
21232123
kmodules.xyz/offshoot-api v0.25.0/go.mod h1:ysEBn7LJuT3+s8ynAQA/OG0BSsJugXa6KGtDLMRjlKo=
21242124
kmodules.xyz/resource-metadata v0.17.1 h1:klZ7a4DLHD3vEMsnuIB/xmFoZBzN4T9XID/XwiGUWtI=
21252125
kmodules.xyz/resource-metadata v0.17.1/go.mod h1:MP+u4U1VMdtn2j52SLMw0pqzBDOa+wxnGoYNLtsJzYM=
2126-
kmodules.xyz/resource-metrics v0.25.1 h1:cnE1ydGZwETyvFVTPoWI9pInLyKLWjD3Ye3fpV6P49I=
2127-
kmodules.xyz/resource-metrics v0.25.1/go.mod h1:H7YLdUQJXUSzf5cNI4IYWU4Wsmrua/jpw7gqDnE3BwM=
2126+
kmodules.xyz/resource-metrics v0.25.2 h1:BwCb6qyunvQBa0u8UUkw+wYG5/T4qtNtAKcHjSsk0JU=
2127+
kmodules.xyz/resource-metrics v0.25.2/go.mod h1:ZK/52NLuwMk+Jt0bmUtGQHtSxPLYYpsFILG7SJhYPg0=
21282128
kmodules.xyz/sets v0.24.0 h1:GbltLEPVnURjcmWyf8eFstgJBpm9o151wsrABkByGrc=
21292129
kmodules.xyz/sets v0.24.0/go.mod h1:V+RRt2RqZfwYpVe9/rAodFiUARLPhz995GArdBhL8vU=
21302130
kubeops.dev/scanner v0.0.10 h1:h+igCycQwN06d3QSyd441OyE/ghRHvznQJ82SUb6yLo=

vendor/kmodules.xyz/resource-metrics/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ OS := $(if $(GOOS),$(GOOS),$(shell go env GOOS))
5454
ARCH := $(if $(GOARCH),$(GOARCH),$(shell go env GOARCH))
5555

5656
GO_VERSION ?= 1.20
57-
BUILD_IMAGE ?= appscode/golang-dev:$(GO_VERSION)
57+
BUILD_IMAGE ?= ghcr.io/appscode/golang-dev:$(GO_VERSION)
5858

5959
OUTBIN = bin/$(BIN)-$(OS)-$(ARCH)
6060
ifeq ($(OS),windows)
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright AppsCode Inc. and Contributors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1alpha2
18+
19+
import (
20+
"fmt"
21+
"reflect"
22+
23+
"kmodules.xyz/resource-metrics/api"
24+
25+
"gomodules.xyz/pointer"
26+
core "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
28+
"k8s.io/apimachinery/pkg/runtime"
29+
"k8s.io/apimachinery/pkg/runtime/schema"
30+
)
31+
32+
func init() {
33+
api.Register(schema.GroupVersionKind{
34+
Group: "kubedb.com",
35+
Version: "v1alpha2",
36+
Kind: "Kafka",
37+
}, Kafka{}.ResourceCalculator())
38+
}
39+
40+
type Kafka struct{}
41+
42+
func (r Kafka) ResourceCalculator() api.ResourceCalculator {
43+
return &api.ResourceCalculatorFuncs{
44+
AppRoles: []api.PodRole{api.PodRoleDefault},
45+
RuntimeRoles: []api.PodRole{api.PodRoleDefault, api.PodRoleExporter},
46+
RoleReplicasFn: r.roleReplicasFn,
47+
ModeFn: r.modeFn,
48+
UsesTLSFn: r.usesTLSFn,
49+
RoleResourceLimitsFn: r.roleResourceFn(api.ResourceLimits),
50+
RoleResourceRequestsFn: r.roleResourceFn(api.ResourceRequests),
51+
}
52+
}
53+
54+
func (r Kafka) roleReplicasFn(obj map[string]interface{}) (api.ReplicaList, error) {
55+
result := api.ReplicaList{}
56+
57+
topology, found, err := unstructured.NestedMap(obj, "spec", "topology")
58+
if err != nil {
59+
return nil, err
60+
}
61+
if found && topology != nil {
62+
// dedicated topology mode
63+
var replicas int64 = 0
64+
for role, roleSpec := range topology {
65+
roleReplicas, found, err := unstructured.NestedInt64(roleSpec.(map[string]interface{}), "replicas")
66+
if err != nil {
67+
return nil, err
68+
}
69+
if found {
70+
result[api.PodRole(role)] = roleReplicas
71+
replicas += roleReplicas
72+
}
73+
}
74+
result[api.PodRoleDefault] = replicas
75+
} else {
76+
// Combined mode
77+
replicas, found, err := unstructured.NestedInt64(obj, "spec", "replicas")
78+
if err != nil {
79+
return nil, fmt.Errorf("failed to read spec.replicas %v: %w", obj, err)
80+
}
81+
if !found {
82+
result[api.PodRoleDefault] = 1
83+
} else {
84+
result[api.PodRoleDefault] = replicas
85+
}
86+
}
87+
return result, nil
88+
}
89+
90+
func (r Kafka) modeFn(obj map[string]interface{}) (string, error) {
91+
topology, found, err := unstructured.NestedFieldNoCopy(obj, "spec", "topology")
92+
if err != nil {
93+
return "", err
94+
}
95+
if found && !reflect.ValueOf(topology).IsNil() {
96+
return "Dedicated", nil
97+
}
98+
return "Combined", nil
99+
}
100+
101+
func (r Kafka) usesTLSFn(obj map[string]interface{}) (bool, error) {
102+
_, found, err := unstructured.NestedFieldNoCopy(obj, "spec", "enableSSL")
103+
return found, err
104+
}
105+
106+
func (r Kafka) roleResourceFn(fn func(rr core.ResourceRequirements) core.ResourceList) func(obj map[string]interface{}) (map[api.PodRole]core.ResourceList, error) {
107+
return func(obj map[string]interface{}) (map[api.PodRole]core.ResourceList, error) {
108+
exporter, err := api.ContainerResources(obj, fn, "spec", "monitor", "prometheus", "exporter")
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
topology, found, err := unstructured.NestedMap(obj, "spec", "topology")
114+
if err != nil {
115+
return nil, err
116+
}
117+
if found && topology != nil {
118+
var replicas int64 = 0
119+
var totalResources core.ResourceList
120+
result := map[api.PodRole]core.ResourceList{}
121+
122+
for role, roleSpec := range topology {
123+
rolePerReplicaResources, roleReplicas, err := KafkaNodeResources(roleSpec.(map[string]interface{}), fn)
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
roleResources := api.MulResourceList(rolePerReplicaResources, roleReplicas)
129+
result[api.PodRole(role)] = roleResources
130+
totalResources = api.AddResourceList(totalResources, roleResources)
131+
}
132+
133+
result[api.PodRoleDefault] = totalResources
134+
result[api.PodRoleExporter] = api.MulResourceList(exporter, replicas)
135+
return result, nil
136+
}
137+
138+
// Kafka Combined
139+
container, replicas, err := api.AppNodeResources(obj, fn, "spec")
140+
if err != nil {
141+
return nil, err
142+
}
143+
144+
return map[api.PodRole]core.ResourceList{
145+
api.PodRoleDefault: api.MulResourceList(container, replicas),
146+
api.PodRoleExporter: api.MulResourceList(exporter, replicas),
147+
}, nil
148+
}
149+
}
150+
151+
type KafkaNode struct {
152+
Replicas *int64 `json:"replicas,omitempty"`
153+
Resources core.ResourceRequirements `json:"resources,omitempty"`
154+
Storage core.PersistentVolumeClaimSpec `json:"storage,omitempty"`
155+
}
156+
157+
func KafkaNodeResources(
158+
obj map[string]interface{},
159+
fn func(rr core.ResourceRequirements) core.ResourceList,
160+
fields ...string,
161+
) (core.ResourceList, int64, error) {
162+
val, found, err := unstructured.NestedFieldNoCopy(obj, fields...)
163+
if !found || err != nil {
164+
return nil, 0, err
165+
}
166+
167+
var node KafkaNode
168+
err = runtime.DefaultUnstructuredConverter.FromUnstructured(val.(map[string]interface{}), &node)
169+
if err != nil {
170+
return nil, 0, fmt.Errorf("failed to parse node %#v: %w", node, err)
171+
}
172+
173+
if node.Replicas == nil {
174+
node.Replicas = pointer.Int64P(1)
175+
}
176+
rr := fn(node.Resources)
177+
sr := fn(node.Storage.Resources)
178+
rr[core.ResourceStorage] = *sr.Storage()
179+
180+
return rr, *node.Replicas, nil
181+
}

vendor/modules.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1806,7 +1806,7 @@ kmodules.xyz/resource-metadata/pkg/layouts
18061806
kmodules.xyz/resource-metadata/pkg/tableconvertor
18071807
kmodules.xyz/resource-metadata/pkg/tableconvertor/lib
18081808
kmodules.xyz/resource-metadata/pkg/tableconvertor/printers
1809-
# kmodules.xyz/resource-metrics v0.25.1
1809+
# kmodules.xyz/resource-metrics v0.25.2
18101810
## explicit; go 1.18
18111811
kmodules.xyz/resource-metrics
18121812
kmodules.xyz/resource-metrics/api

0 commit comments

Comments
 (0)