Skip to content

Commit 29c8f64

Browse files
[8.19](backport #48013) refactor(x-pack/otel/processor/beatprocessor): remove code duplication (#48016)
* refactor: remove code duplication (#48013) (cherry picked from commit 8458c5a) # Conflicts: # x-pack/otel/processor/beatprocessor/processor.go * fix conflicts --------- Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 6e0b5e0 commit 29c8f64

File tree

1 file changed

+27
-36
lines changed

1 file changed

+27
-36
lines changed

x-pack/otel/processor/beatprocessor/processor.go

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ func newBeatProcessor(set processor.Settings, cfg *Config) (*beatProcessor, erro
3838
return nil, fmt.Errorf("failed to configure logp logger: %w", err)
3939
}
4040

41-
for _, processorConfig := range cfg.Processors {
42-
processor, err := createProcessor(processorConfig, logpLogger)
41+
for _, processorNameAndConfig := range cfg.Processors {
42+
processor, err := createProcessor(processorNameAndConfig, logpLogger)
4343
if err != nil {
4444
return nil, fmt.Errorf("failed to create processor: %w", err)
4545
}
@@ -56,56 +56,47 @@ func newBeatProcessor(set processor.Settings, cfg *Config) (*beatProcessor, erro
5656
// The configuration is expected to be a map with a single key containing the processor name
5757
// and the processor's configuration as the value for that key.
5858
// For example: {"add_host_metadata":{"netinfo":{"enabled":false}}}
59-
func createProcessor(cfg map[string]any, logpLogger *logp.Logger) (beat.Processor, error) {
60-
if len(cfg) == 0 {
59+
func createProcessor(processorNameAndConfig map[string]any, logpLogger *logp.Logger) (beat.Processor, error) {
60+
if len(processorNameAndConfig) == 0 {
6161
return nil, nil
6262
}
63-
if len(cfg) > 1 {
64-
if len(cfg) < 10 {
65-
configKeys := make([]string, 0, len(cfg))
66-
for k := range cfg {
63+
if len(processorNameAndConfig) > 1 {
64+
if len(processorNameAndConfig) < 10 {
65+
configKeys := make([]string, 0, len(processorNameAndConfig))
66+
for k := range processorNameAndConfig {
6767
configKeys = append(configKeys, k)
6868
}
69-
return nil, fmt.Errorf("expected single processor name but got %v: %v", len(cfg), configKeys)
69+
return nil, fmt.Errorf("expected single processor name but got %v: %v", len(processorNameAndConfig), configKeys)
7070
}
71-
return nil, fmt.Errorf("expected single processor name but got %v", len(cfg))
71+
return nil, fmt.Errorf("expected single processor name but got %v", len(processorNameAndConfig))
7272
}
7373

74-
for processorName, processorConfig := range cfg {
74+
for processorName, processorConfig := range processorNameAndConfig {
75+
processorConfig, configError := config.NewConfigFrom(processorConfig)
76+
if configError != nil {
77+
return nil, fmt.Errorf("failed to create config for processor '%s': %w", processorName, configError)
78+
}
79+
80+
var processorInstance beat.Processor
81+
var createProcessorError error
82+
7583
switch processorName {
7684
case "add_host_metadata":
77-
return createAddHostMetadataProcessor(processorConfig, logpLogger)
85+
processorInstance, createProcessorError = add_host_metadata.New(processorConfig, logpLogger)
7886
case "add_kubernetes_metadata":
79-
return createAddKubernetesMetadataProcessor(processorConfig, logpLogger)
87+
processorInstance, createProcessorError = add_kubernetes_metadata.New(processorConfig, logpLogger)
8088
default:
8189
return nil, fmt.Errorf("invalid processor name '%s'", processorName)
8290
}
83-
}
84-
return nil, errors.New("malformed processor config")
85-
}
8691

87-
func createAddHostMetadataProcessor(cfg any, logpLogger *logp.Logger) (beat.Processor, error) {
88-
addHostMetadataConfig, err := config.NewConfigFrom(cfg)
89-
if err != nil {
90-
return nil, fmt.Errorf("failed to create add_host_metadata processor config: %w", err)
91-
}
92-
addHostMetadataProcessor, err := add_host_metadata.New(addHostMetadataConfig, logpLogger)
93-
if err != nil {
94-
return nil, fmt.Errorf("failed to create add_host_metadata processor: %w", err)
95-
}
96-
return addHostMetadataProcessor, nil
97-
}
92+
if createProcessorError != nil {
93+
return nil, fmt.Errorf("failed to create processor '%s': %w", processorName, createProcessorError)
94+
}
9895

99-
func createAddKubernetesMetadataProcessor(cfg any, logpLogger *logp.Logger) (beat.Processor, error) {
100-
addKubernetesMetadataConfig, err := config.NewConfigFrom(cfg)
101-
if err != nil {
102-
return nil, fmt.Errorf("failed to create add_kubernetes_metadata processor config: %w", err)
96+
return processorInstance, nil
10397
}
104-
addKubernetesMetadataProcessor, err := add_kubernetes_metadata.New(addKubernetesMetadataConfig, logpLogger)
105-
if err != nil {
106-
return nil, fmt.Errorf("failed to create add_kubernetes_metadata processor: %w", err)
107-
}
108-
return addKubernetesMetadataProcessor, nil
98+
99+
return nil, errors.New("malformed processor config")
109100
}
110101

111102
func (p *beatProcessor) ConsumeLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {

0 commit comments

Comments
 (0)