Skip to content

Commit f5efa37

Browse files
author
MJarmo
committed
more fixes
Signed-off-by: MJarmo <michal.jarmolkiewicz@sap.com>
1 parent 74a756b commit f5efa37

File tree

9 files changed

+139
-174
lines changed

9 files changed

+139
-174
lines changed

.checkapi.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ ignored_paths:
3434
- processor/tailsamplingprocessor
3535
- receiver/hostmetricsreceiver # issue with the parser not identifying Config as a config struct
3636
- receiver/pulsarreceiver # 38930
37+
- receiver/auditlogreceiver/test-standalone # test module
3738
- exporter/pulsarexporter # 38929
39+
- test-auditlog # test module
3840
- exporter/elasticsearchexporter/integrationtest
3941
- testbed/mockdatasenders/mockdatadogagentexporter
4042
- extension/sumologicextension # 40655

receiver/auditlogreceiver/circuitbreaker.go

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,78 +4,78 @@
44
package auditlogreceiver
55

66
import (
7-
"fmt"
7+
"errors"
88
"sync"
99
"time"
1010

1111
"go.uber.org/zap"
1212
)
1313

14-
type CircuitBreakerState int
14+
type circuitBreakerState int
1515

1616
const (
17-
CircuitClosed CircuitBreakerState = iota
18-
CircuitOpen
19-
CircuitHalfOpen
17+
circuitClosed circuitBreakerState = iota
18+
circuitOpen
19+
circuitHalfOpen
2020
)
2121

22-
type CircuitBreaker struct {
23-
state CircuitBreakerState
22+
type circuitBreaker struct {
23+
state circuitBreakerState
2424
consecutiveFails int
2525
lastFailureTime time.Time
2626
mutex sync.RWMutex
2727
logger *zap.Logger
2828
config CircuitBreakerConfig
2929
}
3030

31-
func NewCircuitBreaker(config CircuitBreakerConfig, logger *zap.Logger) *CircuitBreaker {
32-
return &CircuitBreaker{
33-
state: CircuitClosed,
31+
func newCircuitBreaker(config CircuitBreakerConfig, logger *zap.Logger) *circuitBreaker {
32+
return &circuitBreaker{
33+
state: circuitClosed,
3434
config: config,
3535
logger: logger,
3636
}
3737
}
3838

39-
func (cb *CircuitBreaker) IsOpen() bool {
39+
func (cb *circuitBreaker) IsOpen() bool {
4040
cb.mutex.RLock()
4141
defer cb.mutex.RUnlock()
4242

43-
if cb.state == CircuitOpen {
43+
if cb.state == circuitOpen {
4444
openDuration := cb.config.CircuitOpenDuration
4545
if openDuration == 0 {
4646
openDuration = time.Minute
4747
}
4848
if time.Since(cb.lastFailureTime) >= openDuration {
49-
cb.state = CircuitHalfOpen
49+
cb.state = circuitHalfOpen
5050
cb.logger.Info("Circuit breaker transitioning to half-open state")
5151
}
52-
return cb.state == CircuitOpen
52+
return cb.state == circuitOpen
5353
}
5454
return false
5555
}
5656

57-
func (cb *CircuitBreaker) IsHalfOpen() bool {
57+
func (cb *circuitBreaker) IsHalfOpen() bool {
5858
cb.mutex.RLock()
5959
defer cb.mutex.RUnlock()
60-
return cb.state == CircuitHalfOpen
60+
return cb.state == circuitHalfOpen
6161
}
6262

63-
func (cb *CircuitBreaker) RecordSuccess() {
63+
func (cb *circuitBreaker) RecordSuccess() {
6464
cb.mutex.Lock()
6565
defer cb.mutex.Unlock()
6666

6767
cb.consecutiveFails = 0
6868
previousState := cb.state
69-
cb.state = CircuitClosed
69+
cb.state = circuitClosed
7070

71-
if previousState == CircuitHalfOpen {
71+
if previousState == circuitHalfOpen {
7272
cb.logger.Info("Circuit breaker transitioned from half-open to closed due to successful operation")
7373
} else {
7474
cb.logger.Info("Circuit breaker closed due to successful operation")
7575
}
7676
}
7777

78-
func (cb *CircuitBreaker) RecordFailure() {
78+
func (cb *circuitBreaker) RecordFailure() {
7979
cb.mutex.Lock()
8080
defer cb.mutex.Unlock()
8181

@@ -87,69 +87,60 @@ func (cb *CircuitBreaker) RecordFailure() {
8787
threshold = 5
8888
}
8989

90-
if cb.state == CircuitHalfOpen {
91-
cb.state = CircuitOpen
90+
if cb.state == circuitHalfOpen {
91+
cb.state = circuitOpen
9292
cb.logger.Warn("Circuit breaker reopened due to failure in half-open state",
9393
zap.Int("failures", cb.consecutiveFails))
9494
} else if cb.consecutiveFails >= threshold {
95-
cb.state = CircuitOpen
95+
cb.state = circuitOpen
9696
cb.logger.Warn("Circuit breaker opened due to consecutive failures",
9797
zap.Int("failures", cb.consecutiveFails))
9898
}
9999
}
100100

101-
// for debugging/monitoring
102-
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
101+
func (cb *circuitBreaker) getState() circuitBreakerState {
103102
cb.mutex.RLock()
104103
defer cb.mutex.RUnlock()
105104
return cb.state
106105
}
107106

108-
// for debugging/monitoring
109-
func (cb *CircuitBreaker) GetConsecutiveFailures() int {
110-
cb.mutex.RLock()
111-
defer cb.mutex.RUnlock()
112-
return cb.consecutiveFails
113-
}
114-
115-
func (cb *CircuitBreaker) ShouldAttemptProcessing() bool {
107+
func (cb *circuitBreaker) shouldAttemptProcessing() bool {
116108
cb.mutex.RLock()
117109
defer cb.mutex.RUnlock()
118110

119-
return cb.state == CircuitClosed || cb.state == CircuitHalfOpen
111+
return cb.state == circuitClosed || cb.state == circuitHalfOpen
120112
}
121113

122114
// CheckAndUpdateState checks if the circuit breaker should transition states
123-
func (cb *CircuitBreaker) CheckAndUpdateState() {
115+
func (cb *circuitBreaker) checkAndUpdateState() {
124116
cb.mutex.Lock()
125117
defer cb.mutex.Unlock()
126118

127-
if cb.state == CircuitOpen {
119+
if cb.state == circuitOpen {
128120
openDuration := cb.config.CircuitOpenDuration
129121
if openDuration == 0 {
130122
openDuration = time.Minute
131123
}
132124
if time.Since(cb.lastFailureTime) >= openDuration {
133-
cb.state = CircuitHalfOpen
125+
cb.state = circuitHalfOpen
134126
cb.logger.Info("Circuit breaker transitioning to half-open state")
135127
}
136128
}
137129
}
138130

139-
// CheckCircuitBreakerState checks and updates circuit breaker state, returns true if processing should continue
140-
func (cb *CircuitBreaker) CheckCircuitBreakerState(entryID string) (bool, error) {
141-
// Check and update circuit breaker state
142-
cb.CheckAndUpdateState()
131+
// checkCircuitBreakerState checks and updates circuit breaker state, returns true if processing should continue
132+
func (cb *circuitBreaker) checkCircuitBreakerState(entryID string) (bool, error) {
133+
cb.checkAndUpdateState()
143134

144-
if !cb.ShouldAttemptProcessing() {
145-
return false, fmt.Errorf("circuit breaker is open, skipping processing")
135+
if !cb.shouldAttemptProcessing() {
136+
return false, errors.New("circuit breaker is open, skipping processing")
146137
}
147138

148-
// Log the circuit breaker state for debugging
149-
state := cb.GetState()
150-
if state == CircuitHalfOpen {
139+
state := cb.getState()
140+
switch state {
141+
case circuitHalfOpen:
151142
cb.logger.Debug("Processing log in half-open state to test connectivity", zap.String("id", entryID))
152-
} else if state == CircuitClosed {
143+
case circuitClosed:
153144
cb.logger.Debug("Processing log in closed state (normal operation)", zap.String("id", entryID))
154145
}
155146

receiver/auditlogreceiver/examples/otlp_request_example.go renamed to receiver/auditlogreceiver/examples/request_example/otlp_request_example.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//go:build ignore
5+
46
package main
57

68
import (

receiver/auditlogreceiver/examples/test_otlp_client.go renamed to receiver/auditlogreceiver/examples/test_client/test_otlp_client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//go:build ignore
5+
46
package main
57

68
import (

receiver/auditlogreceiver/factory.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ package auditlogreceiver
66
import (
77
"context"
88

9-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/auditlogreceiver/internal/metadata"
109
"go.opentelemetry.io/collector/component"
1110
"go.opentelemetry.io/collector/config/confighttp"
1211
"go.opentelemetry.io/collector/consumer"
1312
"go.opentelemetry.io/collector/receiver"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/auditlogreceiver/internal/metadata"
1415
)
1516

1617
func NewFactory() receiver.Factory {
@@ -30,10 +31,10 @@ func createDefaultConfig() component.Config {
3031
}
3132
}
3233

33-
//type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error)
34+
// type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error)
3435

3536
func createLogsReceiver(
36-
ctx context.Context,
37+
_ context.Context,
3738
set receiver.Settings,
3839
cfg component.Config,
3940
consumer consumer.Logs,

receiver/auditlogreceiver/otlp_test.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ type mockConsumer struct {
2323
logs []plog.Logs
2424
}
2525

26-
func (m *mockConsumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
26+
func (m *mockConsumer) ConsumeLogs(_ context.Context, logs plog.Logs) error {
2727
m.logs = append(m.logs, logs)
2828
return nil
2929
}
3030

31-
func (m *mockConsumer) Capabilities() consumer.Capabilities {
31+
func (*mockConsumer) Capabilities() consumer.Capabilities {
3232
return consumer.Capabilities{}
3333
}
3434

@@ -64,33 +64,26 @@ func TestHandleOTLPProtobuf(t *testing.T) {
6464
t.Fatalf("Failed to marshal OTLP request: %v", err)
6565
}
6666

67-
// Create HTTP request
68-
req := httptest.NewRequest("POST", "/v1/logs", bytes.NewReader(requestData))
67+
req := httptest.NewRequest(http.MethodPost, "/v1/logs", bytes.NewReader(requestData))
6968
req.Header.Set("Content-Type", "application/x-protobuf")
7069

71-
// Create response recorder
7270
w := httptest.NewRecorder()
7371

74-
// Handle the request
7572
r.handleAuditLogs(w, req)
7673

77-
// Check response
7874
if w.Code != http.StatusOK {
7975
t.Errorf("Expected status 200, got %d", w.Code)
8076
}
8177

82-
// Check content type
8378
contentType := w.Header().Get("Content-Type")
8479
if contentType != "application/x-protobuf" {
8580
t.Errorf("Expected content type 'application/x-protobuf', got '%s'", contentType)
8681
}
8782

88-
// Check that logs were consumed
8983
if len(mockConsumer.logs) == 0 {
9084
t.Error("Expected logs to be consumed")
9185
}
9286

93-
// Verify the response is valid OTLP protobuf
9487
responseData := w.Body.Bytes()
9588
otlpResp := plogotlp.NewExportResponse()
9689
if err := otlpResp.UnmarshalProto(responseData); err != nil {
@@ -122,28 +115,22 @@ func TestHandleOTLPProtobufEmptyLogs(t *testing.T) {
122115
t.Fatalf("Failed to marshal OTLP request: %v", err)
123116
}
124117

125-
// Create HTTP request
126-
req := httptest.NewRequest("POST", "/v1/logs", bytes.NewReader(requestData))
118+
req := httptest.NewRequest(http.MethodPost, "/v1/logs", bytes.NewReader(requestData))
127119
req.Header.Set("Content-Type", "application/x-protobuf")
128120

129-
// Create response recorder
130121
w := httptest.NewRecorder()
131122

132-
// Handle the request
133123
r.handleAuditLogs(w, req)
134124

135-
// Check response
136125
if w.Code != http.StatusOK {
137126
t.Errorf("Expected status 200, got %d", w.Code)
138127
}
139128

140-
// Check content type
141129
contentType := w.Header().Get("Content-Type")
142130
if contentType != "application/x-protobuf" {
143131
t.Errorf("Expected content type 'application/x-protobuf', got '%s'", contentType)
144132
}
145133

146-
// Verify the response is valid OTLP protobuf
147134
responseData := w.Body.Bytes()
148135
otlpResp := plogotlp.NewExportResponse()
149136
if err := otlpResp.UnmarshalProto(responseData); err != nil {
@@ -152,23 +139,19 @@ func TestHandleOTLPProtobufEmptyLogs(t *testing.T) {
152139
}
153140

154141
func TestHandleOTLPProtobufInvalidData(t *testing.T) {
155-
// Create a mock consumer
156142
mockConsumer := &mockConsumer{}
157143

158-
// Create receiver config
159144
cfg := &Config{
160145
ServerConfig: confighttp.NewDefaultServerConfig(),
161146
}
162147

163-
// Create receiver
164148
settings := receivertest.NewNopSettings(metadata.Type)
165149
r, err := NewReceiver(cfg, settings, mockConsumer)
166150
if err != nil {
167151
t.Fatalf("Failed to create receiver: %v", err)
168152
}
169153

170-
// Create HTTP request with invalid protobuf data
171-
req := httptest.NewRequest("POST", "/v1/logs", bytes.NewReader([]byte("invalid protobuf data")))
154+
req := httptest.NewRequest(http.MethodPost, "/v1/logs", bytes.NewReader([]byte("invalid protobuf data")))
172155
req.Header.Set("Content-Type", "application/x-protobuf")
173156

174157
// Create response recorder

0 commit comments

Comments
 (0)