Skip to content

Commit fff127f

Browse files
committed
WIP: fluentd metrics
1 parent 5b16383 commit fff127f

File tree

18 files changed

+298
-101
lines changed

18 files changed

+298
-101
lines changed

apis/fluentd/v1alpha1/fluentd_types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ type FluentdSpec struct {
9999
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
100100
// SchedulerName represents the desired scheduler for fluentd pods.
101101
SchedulerName string `json:"schedulerName,omitempty"`
102+
// EnablePrometheusMetrics will enable Prometheus metrics from fluentd.
103+
EnablePrometheusMetrics bool `json:"enablePrometheusMetrics,omitempty"`
104+
// MetricsPort is the port that the Prometheus metrics listener will be exposed on if
105+
// metrics are enabled, default is 2021
106+
// +kubebuilder:validation:Minimum:=1
107+
// +kubebuilder:validation:Maximum:=65535
108+
MetricsPort *int32 `json:"metricsPort,omitempty"`
109+
// MetricsBind is the host for metrics to listen on, default is "0.0.0.0"
110+
MetricsBind *string `json:"metricsBind,omitempty"`
102111
}
103112

104113
// FluentDService the service of the FluentD

apis/fluentd/v1alpha1/helper.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/output"
1313
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
1414
fluentdRouter "github.com/fluent/fluent-operator/v2/pkg/fluentd/router"
15+
16+
"github.com/fluent/fluent-operator/v2/pkg/constants"
1517
)
1618

1719
// +kubebuilder:object:generate=false
@@ -30,6 +32,7 @@ type Renderer interface {
3032
type PluginResources struct {
3133
InputPlugins []params.PluginStore
3234
MainRouterPlugins params.PluginStore
35+
MonitoringFilters []params.PluginStore
3336
LabelPluginResources []params.PluginStore
3437
}
3538

@@ -45,13 +48,39 @@ type CfgResources struct {
4548
}
4649

4750
// NewGlobalPluginResources represents a combined global fluentd resources
48-
func NewGlobalPluginResources(globalId string) *PluginResources {
51+
func NewGlobalPluginResources(globalId string, enablePrometheusMonitoring bool, metricsPort *int32, metricsBind *string) *PluginResources {
4952
globalMainRouter := fluentdRouter.NewGlobalRouter(globalId)
50-
return &PluginResources{
53+
pluginResources := &PluginResources{
5154
InputPlugins: make([]params.PluginStore, 0),
5255
MainRouterPlugins: *globalMainRouter,
56+
MonitoringFilters: make([]params.PluginStore, 0),
5357
LabelPluginResources: make([]params.PluginStore, 0),
5458
}
59+
if enablePrometheusMonitoring {
60+
incomingMonitoringFilter := fluentdRouter.NewIncomingMonitoringFilter()
61+
outgoingMonitoringMatch := fluentdRouter.NewOutgoingMonitoringMatch()
62+
63+
var determinedMetricsPort int32
64+
if metricsPort == nil {
65+
determinedMetricsPort = constants.DefaultMetricsPort
66+
} else {
67+
determinedMetricsPort = *metricsPort
68+
}
69+
var determinedMetricsBind string
70+
if metricsBind == nil {
71+
determinedMetricsBind = constants.DefaultBind
72+
} else {
73+
determinedMetricsBind = *metricsBind
74+
}
75+
76+
pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *incomingMonitoringFilter, *outgoingMonitoringMatch)
77+
78+
outputSources := fluentdRouter.NewMetricsExposeSources(determinedMetricsPort, determinedMetricsBind)
79+
for _, s := range outputSources {
80+
pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *s)
81+
}
82+
}
83+
return pluginResources
5584
}
5685

5786
func NewCfgResources() *CfgResources {
@@ -237,7 +266,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
237266
}
238267

239268
cfgLabelPlugin := params.NewPluginStore("label")
240-
cfgLabelPlugin.InsertPairs("tag", cfgRouteLabel)
269+
cfgLabelPlugin.Tag = cfgRouteLabel
241270

242271
// insert filter plugins of this fluentd config
243272
for _, filter := range r.FilterPlugins {
@@ -255,7 +284,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
255284
return nil
256285
}
257286

258-
func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, error) {
287+
func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool, enablePrometheusMetrics bool) (string, error) {
259288
if len(pgr.InputPlugins) == 0 && len(pgr.LabelPluginResources) == 0 {
260289
return "", fmt.Errorf("no plugins detect")
261290
}
@@ -271,6 +300,17 @@ func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, e
271300
buf.WriteString(pluginStore.String())
272301
}
273302

303+
// sort monitoring plugins
304+
if enablePrometheusMetrics {
305+
monitoringPlugins := ByHashcode(pgr.MonitoringFilters)
306+
for _, pluginStore := range monitoringPlugins {
307+
if enableMultiWorkers {
308+
pluginStore.SetIgnorePath()
309+
}
310+
buf.WriteString(pluginStore.String())
311+
}
312+
}
313+
274314
// sort main routers
275315
childRouters := ByRouteLabelsPointers(pgr.MainRouterPlugins.Childs)
276316
pgr.MainRouterPlugins.Childs = childRouters

apis/fluentd/v1alpha1/plugins/common/buffer_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (b *Buffer) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
199199
ps.InsertPairs("timekey_wait", fmt.Sprint(*b.TimeKeyWait))
200200
}
201201

202-
ps.InsertPairs("tag", b.Tag)
202+
ps.Tag = b.Tag
203203

204204
if b.ChunkLimitSize != nil {
205205
ps.InsertPairs("chunk_limit_size", *b.ChunkLimitSize)

apis/fluentd/v1alpha1/plugins/filter/types.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ func (f *Filter) Name() string {
5454
}
5555

5656
func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
57-
ps := params.NewPluginStore(f.Name())
57+
var ps *params.PluginStore
58+
if f.Tag != nil {
59+
ps = params.NewPluginStoreWithTag(f.Name(), fmt.Sprint(*f.Tag))
60+
} else {
61+
ps = params.NewPluginStore(f.Name())
62+
}
5863

5964
if f.Id != nil {
6065
ps.InsertPairs("@id", fmt.Sprint(*f.Id))
@@ -64,10 +69,6 @@ func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error
6469
ps.InsertPairs("@log_level", fmt.Sprint(*f.LogLevel))
6570
}
6671

67-
if f.Tag != nil {
68-
ps.InsertPairs("tag", fmt.Sprint(*f.Tag))
69-
}
70-
7172
if f.Grep != nil {
7273
ps.InsertType(string(params.GrepFilterType))
7374
return f.grepPlugin(ps, loader), nil

apis/fluentd/v1alpha1/plugins/input/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (i *Input) tailPlugin(parent *params.PluginStore, loader plugins.SecretLoad
9898
parent.InsertChilds(childs...)
9999

100100
if tailModel.Tag != "" {
101-
parent.InsertPairs("tag", fmt.Sprint(tailModel.Tag))
101+
parent.Tag = fmt.Sprint(tailModel.Tag)
102102
}
103103

104104
if tailModel.Path != "" {
@@ -232,7 +232,7 @@ func (i *Input) forwardPlugin(parent *params.PluginStore, loader plugins.SecretL
232232
}
233233

234234
if forwardModel.Tag != nil {
235-
parent.InsertPairs("tag", fmt.Sprint(*forwardModel.Tag))
235+
parent.Tag = fmt.Sprint(*forwardModel.Tag)
236236
}
237237

238238
if forwardModel.AddTagPrefix != nil {

apis/fluentd/v1alpha1/plugins/output/types.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,14 @@ func (o *Output) Name() string {
7272
}
7373

7474
func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
75-
ps := params.NewPluginStore(o.Name())
75+
var ps *params.PluginStore
76+
77+
if o.Tag != nil {
78+
ps = params.NewPluginStoreWithTag(o.Name(), fmt.Sprint(*o.Tag))
79+
} else {
80+
ps = params.NewPluginStore(o.Name())
81+
}
82+
7683
childs := make([]*params.PluginStore, 0)
7784

7885
ps.InsertPairs("@id", fmt.Sprint(*o.Id))
@@ -85,10 +92,6 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
8592
ps.InsertPairs("@label", fmt.Sprint(*o.Label))
8693
}
8794

88-
if o.Tag != nil {
89-
ps.InsertPairs("tag", fmt.Sprint(*o.Tag))
90-
}
91-
9295
if o.BufferSection.Buffer != nil {
9396
child, _ := o.BufferSection.Buffer.Params(loader)
9497
childs = append(childs, child)

apis/fluentd/v1alpha1/plugins/params/model.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
type PluginStore struct {
1212
// The plugin name
1313
Name string
14+
// Plugin routing tag
15+
Tag string
1416
// The key-value pairs
1517
Store map[string]string
1618
// The child plugins mounted here
@@ -30,6 +32,15 @@ func NewPluginStore(name string) *PluginStore {
3032
}
3133
}
3234

35+
func NewPluginStoreWithTag(name string, tag string) *PluginStore {
36+
return &PluginStore{
37+
Name: name,
38+
Tag: tag,
39+
Store: make(map[string]string),
40+
Childs: make([]*PluginStore, 0),
41+
}
42+
}
43+
3344
func (ps *PluginStore) InsertPairs(key, value string) {
3445
ps.Store[key] = value
3546
}
@@ -60,10 +71,10 @@ func (ps *PluginStore) InsertChilds(childs ...*PluginStore) {
6071

6172
// The total hash string for this plugin store
6273
func (ps *PluginStore) Hash() string {
63-
c := NewPluginStore(ps.Name)
74+
c := NewPluginStoreWithTag(ps.Name, ps.Tag)
6475

6576
for k, v := range ps.Store {
66-
if k == "@id" || k == "tag" {
77+
if k == "@id" {
6778
continue
6879
}
6980
c.Store[k] = v
@@ -75,7 +86,7 @@ func (ps *PluginStore) Hash() string {
7586

7687
// Returns tag value
7788
func (ps *PluginStore) GetTag() string {
78-
return ps.Store["tag"]
89+
return ps.Tag
7990
}
8091

8192
// Returns the @label value string of this plugin store
@@ -131,18 +142,18 @@ func (ps *PluginStore) processHead(buf *bytes.Buffer) {
131142
var head string
132143
switch PluginName(ps.Name) {
133144
case BufferPlugin:
134-
tag, ok := ps.Store[BufferTag]
135-
if ok {
145+
tag := ps.Tag
146+
if tag != "" {
136147
head = ps.headFmtSprintf(tag)
137148
}
138149
case MatchPlugin:
139-
head = ps.headFmtSprintf(ps.Store[MatchTag])
150+
head = ps.headFmtSprintf(ps.Tag)
140151
case FilterPlugin:
141-
head = ps.headFmtSprintf(ps.Store[FilterTag])
152+
head = ps.headFmtSprintf(ps.Tag)
142153
case TransportPlugin:
143154
head = ps.headFmtSprintf(ps.Store[ProtocolName])
144155
case LabelPlugin:
145-
head = ps.headFmtSprintf(ps.Store[LabelTag])
156+
head = ps.headFmtSprintf(ps.Tag)
146157
default:
147158
head = fmt.Sprintf("%s<%s>\n", ps.PrefixWhitespaces, ps.Name)
148159
}
@@ -155,10 +166,6 @@ func (ps *PluginStore) processBody(buf *bytes.Buffer) {
155166

156167
keys := make([]string, 0, len(ps.Store))
157168
for k := range ps.Store {
158-
// Don't add tag unless it is an input plugin
159-
if k == "tag" && ps.Name != "source" {
160-
continue
161-
}
162169
if ps.Name == string(BufferPlugin) && ps.IgnorePath {
163170
continue
164171
}

0 commit comments

Comments
 (0)