Skip to content

Commit 404d1b2

Browse files
authored
Merge pull request #469 from WangZzzhe/dev/add-rodan-metrics
feat(metric): add rodan metric provisioner
2 parents 33726be + 6218679 commit 404d1b2

File tree

18 files changed

+1709
-11
lines changed

18 files changed

+1709
-11
lines changed

cmd/katalyst-agent/app/options/metaserver/metaserver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ const defaultCustomNodeResourceCacheTTL = 15 * time.Second
5656

5757
const defaultCustomNodeConfigCacheTTL = 15 * time.Second
5858

59+
const defaultRodanServerPort = 9102
60+
5961
// MetaServerOptions holds all the configurations for metaserver.
6062
// we will not try to separate this structure into several individual
6163
// structures since it will not be used directly by other components; instead,
@@ -79,6 +81,7 @@ type MetaServerOptions struct {
7981
// configurations for metric-fetcher
8082
MetricInsurancePeriod time.Duration
8183
MetricProvisions []string
84+
RodanServerPort int
8285

8386
// configurations for pod-cache
8487
KubeletPodCacheSyncPeriod time.Duration
@@ -109,6 +112,7 @@ func NewMetaServerOptions() *MetaServerOptions {
109112

110113
MetricInsurancePeriod: defaultMetricInsurancePeriod,
111114
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},
115+
RodanServerPort: defaultRodanServerPort,
112116

113117
KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod,
114118
KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate,
@@ -150,6 +154,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
150154
"The meta server return metric data and MetricDataExpired if the update time of metric data is earlier than this period.")
151155
fs.StringSliceVar(&o.MetricProvisions, "metric-provisioners", o.MetricProvisions,
152156
"The provisioners that should be enabled by default")
157+
fs.IntVar(&o.RodanServerPort, "rodan-server-port", o.RodanServerPort,
158+
"The rodan metric provisioner server port")
153159

154160
fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod,
155161
"The period of meta server to sync pod from kubelet 10255 port")
@@ -183,6 +189,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error
183189

184190
c.MetricInsurancePeriod = o.MetricInsurancePeriod
185191
c.MetricProvisions = o.MetricProvisions
192+
c.RodanServerPort = o.RodanServerPort
186193

187194
c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod
188195
c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate)

pkg/config/agent/metaserver/agent.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@ const (
2626
MetricProvisionerMalachite = "malachite"
2727
MetricProvisionerCgroup = "cgroup"
2828
MetricProvisionerKubelet = "kubelet"
29+
MetricProvisionerRodan = "rodan"
2930
)
3031

3132
type MetricConfiguration struct {
3233
MetricInsurancePeriod time.Duration
3334
MetricProvisions []string
35+
*RodanMetricConfiguration
36+
}
37+
38+
type RodanMetricConfiguration struct {
39+
RodanServerPort int
3440
}
3541

3642
type PodConfiguration struct {
@@ -64,10 +70,12 @@ type AgentConfiguration struct {
6470

6571
func NewAgentConfiguration() *AgentConfiguration {
6672
return &AgentConfiguration{
67-
MetricConfiguration: &MetricConfiguration{},
68-
PodConfiguration: &PodConfiguration{},
69-
NodeConfiguration: &NodeConfiguration{},
70-
CNRConfiguration: &CNRConfiguration{},
71-
CNCConfiguration: &CNCConfiguration{},
73+
MetricConfiguration: &MetricConfiguration{
74+
RodanMetricConfiguration: &RodanMetricConfiguration{},
75+
},
76+
PodConfiguration: &PodConfiguration{},
77+
NodeConfiguration: &NodeConfiguration{},
78+
CNRConfiguration: &CNRConfiguration{},
79+
CNCConfiguration: &CNCConfiguration{},
7280
}
7381
}

pkg/metaserver/agent/metric/metric_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func NewMetricsFetcher(baseConf *global.BaseConfiguration, metricConf *metaserve
233233
var enabledProvisioners []types.MetricsProvisioner
234234
for _, name := range metricConf.MetricProvisions {
235235
if f, ok := registeredProvisioners[name]; ok {
236-
enabledProvisioners = append(enabledProvisioners, f(baseConf, emitter, podFetcher, metricStore))
236+
enabledProvisioners = append(enabledProvisioners, f(baseConf, metricConf, emitter, podFetcher, metricStore))
237237
}
238238
}
239239

pkg/metaserver/agent/metric/metric_provisoner.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/cgroup"
2525
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/kubelet"
2626
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite"
27+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/rodan"
2728
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
2829
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
2930
"github.com/kubewharf/katalyst-core/pkg/metrics"
@@ -34,9 +35,10 @@ func init() {
3435
RegisterProvisioners(metaserver.MetricProvisionerMalachite, malachite.NewMalachiteMetricsProvisioner)
3536
RegisterProvisioners(metaserver.MetricProvisionerKubelet, kubelet.NewKubeletSummaryProvisioner)
3637
RegisterProvisioners(metaserver.MetricProvisionerCgroup, cgroup.NewCGroupMetricsProvisioner)
38+
RegisterProvisioners(metaserver.MetricProvisionerRodan, rodan.NewRodanMetricsProvisioner)
3739
}
3840

39-
type ProvisionerInitFunc func(baseConf *global.BaseConfiguration,
41+
type ProvisionerInitFunc func(baseConf *global.BaseConfiguration, metricConf *metaserver.MetricConfiguration,
4042
emitter metrics.MetricEmitter, fetcher pod.PodFetcher, metricStore *utilmetric.MetricStore) types.MetricsProvisioner
4143

4244
// provisioners stores the initializing function for each-provisioner

pkg/metaserver/agent/metric/provisioner/cgroup/provisioner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ import (
2020
"context"
2121

2222
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
23+
"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
2324
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
2425
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
2526
"github.com/kubewharf/katalyst-core/pkg/metrics"
2627
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
2728
)
2829

2930
// NewCGroupMetricsProvisioner returns the default implementation of CGroup.
30-
func NewCGroupMetricsProvisioner(baseConf *global.BaseConfiguration,
31+
func NewCGroupMetricsProvisioner(baseConf *global.BaseConfiguration, _ *metaserver.MetricConfiguration,
3132
emitter metrics.MetricEmitter, _ pod.PodFetcher, metricStore *utilmetric.MetricStore) types.MetricsProvisioner {
3233
return &CGroupMetricsProvisioner{
3334
metricStore: metricStore,

pkg/metaserver/agent/metric/provisioner/kubelet/provisioner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
2525

2626
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
27+
"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
2728
"github.com/kubewharf/katalyst-core/pkg/consts"
2829
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/kubelet/client"
2930
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
@@ -36,7 +37,7 @@ const (
3637
metricsNamKubeletSummaryUnHealthy = "kubelet_summary_unhealthy"
3738
)
3839

39-
func NewKubeletSummaryProvisioner(baseConf *global.BaseConfiguration,
40+
func NewKubeletSummaryProvisioner(baseConf *global.BaseConfiguration, _ *metaserver.MetricConfiguration,
4041
emitter metrics.MetricEmitter, _ pod.PodFetcher, metricStore *utilmetric.MetricStore) types.MetricsProvisioner {
4142
return &KubeletSummaryProvisioner{
4243
metricStore: metricStore,

pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/klog/v2"
2828

2929
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
30+
"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
3031
"github.com/kubewharf/katalyst-core/pkg/consts"
3132
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/client"
3233
malachitetypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
@@ -53,7 +54,7 @@ const (
5354
)
5455

5556
// NewMalachiteMetricsProvisioner returns the default implementation of MetricsFetcher.
56-
func NewMalachiteMetricsProvisioner(baseConf *global.BaseConfiguration,
57+
func NewMalachiteMetricsProvisioner(baseConf *global.BaseConfiguration, _ *metaserver.MetricConfiguration,
5758
emitter metrics.MetricEmitter, fetcher pod.PodFetcher, metricStore *utilmetric.MetricStore) types.MetricsProvisioner {
5859
return &MalachiteMetricsProvisioner{
5960
malachiteClient: client.NewMalachiteClient(fetcher),

pkg/metaserver/agent/metric/provisioner/malachite/provisioner_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"testing"
2121

2222
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
23+
"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
2324
malachitetypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
2425
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
2526
"github.com/kubewharf/katalyst-core/pkg/metrics"
@@ -32,7 +33,7 @@ func Test_noneExistMetricsProvisioner(t *testing.T) {
3233
store := utilmetric.NewMetricStore()
3334

3435
var err error
35-
implement := NewMalachiteMetricsProvisioner(&global.BaseConfiguration{}, metrics.DummyMetrics{}, &pod.PodFetcherStub{}, store)
36+
implement := NewMalachiteMetricsProvisioner(&global.BaseConfiguration{}, &metaserver.MetricConfiguration{}, metrics.DummyMetrics{}, &pod.PodFetcherStub{}, store)
3637

3738
fakeSystemCompute := &malachitetypes.SystemComputeData{
3839
CPU: []malachitetypes.CPU{
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package client
18+
19+
import (
20+
"fmt"
21+
"io"
22+
"net/http"
23+
24+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/rodan/types"
25+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
26+
)
27+
28+
type RodanClient struct {
29+
urls map[string]string
30+
31+
fetcher pod.PodFetcher
32+
33+
metricFunc MetricFunc
34+
}
35+
36+
func NewRodanClient(fetcher pod.PodFetcher, metricFunc MetricFunc, port int) *RodanClient {
37+
urls := make(map[string]string)
38+
for path := range types.MetricsMap {
39+
urls[path] = fmt.Sprintf("http://localhost:%d%s", port, path)
40+
}
41+
42+
if metricFunc == nil {
43+
metricFunc = getMetrics
44+
}
45+
46+
return &RodanClient{
47+
fetcher: fetcher,
48+
urls: urls,
49+
metricFunc: metricFunc,
50+
}
51+
}
52+
53+
type MetricFunc func(url string, params map[string]string) ([]byte, error)
54+
55+
func getMetrics(url string, params map[string]string) ([]byte, error) {
56+
if len(params) != 0 {
57+
firstParam := true
58+
59+
for k, v := range params {
60+
if firstParam {
61+
url += "?"
62+
firstParam = false
63+
} else {
64+
url += "&"
65+
}
66+
url = fmt.Sprintf("%s%s=%s", url, k, v)
67+
}
68+
}
69+
70+
rsp, err := http.Get(url)
71+
if err != nil {
72+
return nil, fmt.Errorf("failed to get metrics, url: %v, err: %v", url, err)
73+
}
74+
defer func() { _ = rsp.Body.Close() }()
75+
76+
if rsp.StatusCode != 200 {
77+
return nil, fmt.Errorf("invalid http response status code %d, status: %s, url: %s", rsp.StatusCode, rsp.Status, url)
78+
}
79+
80+
return io.ReadAll(rsp.Body)
81+
}

0 commit comments

Comments
 (0)