Skip to content

WIP: fluentd metrics #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apis/fluentd/v1alpha1/fluentd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ type FluentdSpec struct {
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
// SchedulerName represents the desired scheduler for fluentd pods.
SchedulerName string `json:"schedulerName,omitempty"`
// EnablePrometheusMetrics will enable Prometheus metrics from fluentd.
EnablePrometheusMetrics bool `json:"enablePrometheusMetrics,omitempty"`
// MetricsPort is the port that the Prometheus metrics listener will be exposed on if
// metrics are enabled, default is 2021
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=65535
MetricsPort *int32 `json:"metricsPort,omitempty"`
// MetricsBind is the host for metrics to listen on, default is "0.0.0.0"
MetricsBind *string `json:"metricsBind,omitempty"`
}

// FluentDService the service of the FluentD
Expand Down
48 changes: 44 additions & 4 deletions apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/output"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
fluentdRouter "github.com/fluent/fluent-operator/v2/pkg/fluentd/router"

"github.com/fluent/fluent-operator/v2/pkg/constants"
)

// +kubebuilder:object:generate=false
Expand All @@ -30,6 +32,7 @@ type Renderer interface {
type PluginResources struct {
InputPlugins []params.PluginStore
MainRouterPlugins params.PluginStore
MonitoringFilters []params.PluginStore
LabelPluginResources []params.PluginStore
}

Expand All @@ -45,13 +48,39 @@ type CfgResources struct {
}

// NewGlobalPluginResources represents a combined global fluentd resources
func NewGlobalPluginResources(globalId string) *PluginResources {
func NewGlobalPluginResources(globalId string, enablePrometheusMonitoring bool, metricsPort *int32, metricsBind *string) *PluginResources {
globalMainRouter := fluentdRouter.NewGlobalRouter(globalId)
return &PluginResources{
pluginResources := &PluginResources{
InputPlugins: make([]params.PluginStore, 0),
MainRouterPlugins: *globalMainRouter,
MonitoringFilters: make([]params.PluginStore, 0),
LabelPluginResources: make([]params.PluginStore, 0),
}
if enablePrometheusMonitoring {
incomingMonitoringFilter := fluentdRouter.NewIncomingMonitoringFilter()
outgoingMonitoringMatch := fluentdRouter.NewOutgoingMonitoringMatch()

var determinedMetricsPort int32
if metricsPort == nil {
determinedMetricsPort = constants.DefaultMetricsPort
} else {
determinedMetricsPort = *metricsPort
}
var determinedMetricsBind string
if metricsBind == nil {
determinedMetricsBind = constants.DefaultBind
} else {
determinedMetricsBind = *metricsBind
}

pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *incomingMonitoringFilter, *outgoingMonitoringMatch)

outputSources := fluentdRouter.NewMetricsExposeSources(determinedMetricsPort, determinedMetricsBind)
for _, s := range outputSources {
pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *s)
}
}
return pluginResources
}

func NewCfgResources() *CfgResources {
Expand Down Expand Up @@ -237,7 +266,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
}

cfgLabelPlugin := params.NewPluginStore("label")
cfgLabelPlugin.InsertPairs("tag", cfgRouteLabel)
cfgLabelPlugin.Tag = cfgRouteLabel

// insert filter plugins of this fluentd config
for _, filter := range r.FilterPlugins {
Expand All @@ -255,7 +284,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
return nil
}

func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, error) {
func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool, enablePrometheusMetrics bool) (string, error) {
if len(pgr.InputPlugins) == 0 && len(pgr.LabelPluginResources) == 0 {
return "", fmt.Errorf("no plugins detect")
}
Expand All @@ -271,6 +300,17 @@ func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, e
buf.WriteString(pluginStore.String())
}

// sort monitoring plugins
if enablePrometheusMetrics {
monitoringPlugins := ByHashcode(pgr.MonitoringFilters)
for _, pluginStore := range monitoringPlugins {
if enableMultiWorkers {
pluginStore.SetIgnorePath()
}
buf.WriteString(pluginStore.String())
}
}

// sort main routers
childRouters := ByRouteLabelsPointers(pgr.MainRouterPlugins.Childs)
pgr.MainRouterPlugins.Childs = childRouters
Expand Down
2 changes: 1 addition & 1 deletion apis/fluentd/v1alpha1/plugins/common/buffer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (b *Buffer) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
ps.InsertPairs("timekey_wait", fmt.Sprint(*b.TimeKeyWait))
}

ps.InsertPairs("tag", b.Tag)
ps.Tag = b.Tag

if b.ChunkLimitSize != nil {
ps.InsertPairs("chunk_limit_size", *b.ChunkLimitSize)
Expand Down
11 changes: 6 additions & 5 deletions apis/fluentd/v1alpha1/plugins/filter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (f *Filter) Name() string {
}

func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(f.Name())
var ps *params.PluginStore
if f.Tag != nil {
ps = params.NewPluginStoreWithTag(f.Name(), fmt.Sprint(*f.Tag))
} else {
ps = params.NewPluginStore(f.Name())
}

if f.Id != nil {
ps.InsertPairs("@id", fmt.Sprint(*f.Id))
Expand All @@ -64,10 +69,6 @@ func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertPairs("@log_level", fmt.Sprint(*f.LogLevel))
}

if f.Tag != nil {
ps.InsertPairs("tag", fmt.Sprint(*f.Tag))
}

if f.Grep != nil {
ps.InsertType(string(params.GrepFilterType))
return f.grepPlugin(ps, loader), nil
Expand Down
4 changes: 2 additions & 2 deletions apis/fluentd/v1alpha1/plugins/input/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (i *Input) tailPlugin(parent *params.PluginStore, loader plugins.SecretLoad
parent.InsertChilds(childs...)

if tailModel.Tag != "" {
parent.InsertPairs("tag", fmt.Sprint(tailModel.Tag))
parent.Tag = fmt.Sprint(tailModel.Tag)
}

if tailModel.Path != "" {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (i *Input) forwardPlugin(parent *params.PluginStore, loader plugins.SecretL
}

if forwardModel.Tag != nil {
parent.InsertPairs("tag", fmt.Sprint(*forwardModel.Tag))
parent.Tag = fmt.Sprint(*forwardModel.Tag)
}

if forwardModel.AddTagPrefix != nil {
Expand Down
13 changes: 8 additions & 5 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ func (o *Output) Name() string {
}

func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(o.Name())
var ps *params.PluginStore

if o.Tag != nil {
ps = params.NewPluginStoreWithTag(o.Name(), fmt.Sprint(*o.Tag))
} else {
ps = params.NewPluginStore(o.Name())
}

childs := make([]*params.PluginStore, 0)

ps.InsertPairs("@id", fmt.Sprint(*o.Id))
Expand All @@ -85,10 +92,6 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertPairs("@label", fmt.Sprint(*o.Label))
}

if o.Tag != nil {
ps.InsertPairs("tag", fmt.Sprint(*o.Tag))
}

if o.BufferSection.Buffer != nil {
child, _ := o.BufferSection.Buffer.Params(loader)
childs = append(childs, child)
Expand Down
32 changes: 18 additions & 14 deletions apis/fluentd/v1alpha1/plugins/params/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
type PluginStore struct {
// The plugin name
Name string
// Plugin routing tag
Tag string
// The key-value pairs
Store map[string]string
// The child plugins mounted here
Expand All @@ -30,6 +32,15 @@ func NewPluginStore(name string) *PluginStore {
}
}

func NewPluginStoreWithTag(name string, tag string) *PluginStore {
return &PluginStore{
Name: name,
Tag: tag,
Store: make(map[string]string),
Childs: make([]*PluginStore, 0),
}
}

func (ps *PluginStore) InsertPairs(key, value string) {
ps.Store[key] = value
}
Expand Down Expand Up @@ -60,10 +71,10 @@ func (ps *PluginStore) InsertChilds(childs ...*PluginStore) {

// The total hash string for this plugin store
func (ps *PluginStore) Hash() string {
c := NewPluginStore(ps.Name)
c := NewPluginStoreWithTag(ps.Name, ps.Tag)

for k, v := range ps.Store {
if k == "@id" || k == "tag" {
if k == "@id" {
continue
}
c.Store[k] = v
Expand All @@ -75,7 +86,7 @@ func (ps *PluginStore) Hash() string {

// Returns tag value
func (ps *PluginStore) GetTag() string {
return ps.Store["tag"]
return ps.Tag
}

// Returns the @label value string of this plugin store
Expand Down Expand Up @@ -131,18 +142,15 @@ func (ps *PluginStore) processHead(buf *bytes.Buffer) {
var head string
switch PluginName(ps.Name) {
case BufferPlugin:
tag, ok := ps.Store[BufferTag]
if ok {
head = ps.headFmtSprintf(tag)
}
head = ps.headFmtSprintf(ps.Tag)
case MatchPlugin:
head = ps.headFmtSprintf(ps.Store[MatchTag])
head = ps.headFmtSprintf(ps.Tag)
case FilterPlugin:
head = ps.headFmtSprintf(ps.Store[FilterTag])
head = ps.headFmtSprintf(ps.Tag)
case TransportPlugin:
head = ps.headFmtSprintf(ps.Store[ProtocolName])
case LabelPlugin:
head = ps.headFmtSprintf(ps.Store[LabelTag])
head = ps.headFmtSprintf(ps.Tag)
default:
head = fmt.Sprintf("%s<%s>\n", ps.PrefixWhitespaces, ps.Name)
}
Expand All @@ -155,10 +163,6 @@ func (ps *PluginStore) processBody(buf *bytes.Buffer) {

keys := make([]string, 0, len(ps.Store))
for k := range ps.Store {
// Don't add tag unless it is an input plugin
if k == "tag" && ps.Name != "source" {
continue
}
if ps.Name == string(BufferPlugin) && ps.IgnorePath {
continue
}
Expand Down
Loading