Skip to content

Commit 8e059f1

Browse files
authored
Adding AgentHealth Middleware to Exporters (#266)
1 parent 2dd9562 commit 8e059f1

File tree

9 files changed

+217
-78
lines changed

9 files changed

+217
-78
lines changed

exporter/awscloudwatchlogsexporter/exporter.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type cwlExporter struct {
3434
collectorID string
3535
svcStructuredLog *cwlogs.Client
3636
pusherFactory cwlogs.MultiStreamPusherFactory
37+
params exp.Settings
3738
}
3839

3940
type awsMetadata struct {
@@ -52,29 +53,16 @@ func newCwLogsPusher(expConfig *Config, params exp.Settings) (*cwlExporter, erro
5253
return nil, errors.New("awscloudwatchlogs exporter config is nil")
5354
}
5455

55-
// create AWS session
56-
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
57-
if err != nil {
58-
return nil, err
59-
}
60-
61-
// create CWLogs client with aws session config
62-
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session)
6356
collectorIdentifier, err := uuid.NewRandom()
6457
if err != nil {
6558
return nil, err
6659
}
6760

68-
logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog)
69-
multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger)
70-
7161
logsExporter := &cwlExporter{
72-
svcStructuredLog: svcStructuredLog,
73-
Config: expConfig,
74-
logger: params.Logger,
75-
retryCount: *awsConfig.MaxRetries,
76-
collectorID: collectorIdentifier.String(),
77-
pusherFactory: multiStreamPusherFactory,
62+
Config: expConfig,
63+
logger: params.Logger,
64+
collectorID: collectorIdentifier.String(),
65+
params: params,
7866
}
7967
return logsExporter, nil
8068
}
@@ -117,10 +105,25 @@ func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
117105
return errs
118106
}
119107

120-
func (e *cwlExporter) start(_ context.Context, host component.Host) error {
108+
func (e *cwlExporter) start(ctx context.Context, host component.Host) error {
109+
// Create AWS session
110+
awsConfig, session, err := awsutil.GetAWSConfigSession(e.logger, &awsutil.Conn{}, &e.Config.AWSSessionSettings)
111+
if err != nil {
112+
return fmt.Errorf("failed to create AWS session: %w", err)
113+
}
114+
115+
// Create CWLogs client with aws session config
116+
e.svcStructuredLog = cwlogs.NewClient(e.logger, awsConfig, e.params.BuildInfo, e.Config.LogGroupName, e.Config.LogRetention, e.Config.Tags, session)
117+
118+
e.retryCount = *awsConfig.MaxRetries
119+
120+
logStreamManager := cwlogs.NewLogStreamManager(*e.svcStructuredLog)
121+
e.pusherFactory = cwlogs.NewMultiStreamPusherFactory(logStreamManager, *e.svcStructuredLog, e.logger)
122+
121123
if e.Config.MiddlewareID != nil {
122124
awsmiddleware.TryConfigure(e.logger, host, *e.Config.MiddlewareID, awsmiddleware.SDKv1(e.svcStructuredLog.Handlers()))
123125
}
126+
124127
return nil
125128
}
126129

exporter/awscloudwatchlogsexporter/exporter_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ func init() {
3232
type mockPusher struct {
3333
mock.Mock
3434
}
35+
type mockHost struct {
36+
component.Host
37+
}
38+
39+
func (m *mockHost) GetExtensions() map[component.ID]component.Component {
40+
return nil
41+
}
3542

3643
func (p *mockPusher) AddLogEntry(_ *cwlogs.Event) error {
3744
args := p.Called(nil)
@@ -509,7 +516,14 @@ func TestNewExporterWithoutRegionErr(t *testing.T) {
509516
factory := NewFactory()
510517
expCfg := factory.CreateDefaultConfig().(*Config)
511518
expCfg.MaxRetries = 0
519+
expCfg.Region = "" // Ensure the region is not set
520+
512521
exp, err := newCwLogsExporter(expCfg, exportertest.NewNopSettings())
513-
assert.Nil(t, exp)
514-
assert.Error(t, err)
522+
assert.NoError(t, err) // The exporter creation should not fail
523+
assert.NotNil(t, exp) // The exporter should be created
524+
525+
// Now try to start the exporter
526+
err = exp.Start(context.Background(), &mockHost{})
527+
assert.Error(t, err) // The start should fail due to missing region
528+
assert.Contains(t, err.Error(), "NoAwsRegion")
515529
}

exporter/awsemfexporter/emf_exporter.go

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type emfExporter struct {
3939
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
4040
svcStructuredLog *cwlogs.Client
4141
config *Config
42+
set exporter.Settings
4243

4344
metricTranslator metricTranslator
4445

@@ -57,42 +58,23 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
5758

5859
config.logger = set.Logger
5960

60-
// create AWS session
61-
awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings)
62-
if err != nil {
63-
return nil, err
64-
}
65-
66-
// create CWLogs client with aws session config
67-
svcStructuredLog := cwlogs.NewClient(set.Logger,
68-
awsConfig,
69-
set.BuildInfo,
70-
config.LogGroupName,
71-
config.LogRetention,
72-
config.Tags,
73-
session,
74-
cwlogs.WithEnabledContainerInsights(config.IsEnhancedContainerInsights()),
75-
cwlogs.WithEnabledAppSignals(config.IsAppSignalsEnabled()),
76-
)
77-
7861
collectorIdentifier, err := uuid.NewRandom()
7962
if err != nil {
8063
return nil, err
8164
}
8265

66+
// Initialize emfExporter without AWS session and structured logs
8367
emfExporter := &emfExporter{
84-
svcStructuredLog: svcStructuredLog,
8568
config: config,
8669
metricTranslator: newMetricTranslator(*config),
87-
retryCnt: *awsConfig.MaxRetries,
70+
retryCnt: config.AWSSessionSettings.MaxRetries,
8871
collectorID: collectorIdentifier.String(),
8972
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
9073
processResourceLabels: func(map[string]string) {},
9174
}
9275

9376
if config.IsAppSignalsEnabled() {
9477
userAgent := appsignals.NewUserAgent()
95-
svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler())
9678
emfExporter.processResourceLabels = userAgent.Process
9779
}
9880

@@ -148,7 +130,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
148130
fmt.Println(*putLogEvent.InputLogEvent.Message)
149131
}
150132
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
151-
emfPusher := emf.getPusher(putLogEvent.StreamKey)
133+
emfPusher, err := emf.getPusher(putLogEvent.StreamKey)
134+
if err != nil {
135+
return fmt.Errorf("failed to get pusher: %w", err)
136+
}
152137
if emfPusher != nil {
153138
returnError := emfPusher.AddLogEntry(putLogEvent)
154139
if returnError != nil {
@@ -177,12 +162,24 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
177162
return nil
178163
}
179164

180-
func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
181-
var ok bool
182-
if _, ok = emf.pusherMap[key]; !ok {
183-
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
165+
func (emf *emfExporter) getPusher(key cwlogs.StreamKey) (cwlogs.Pusher, error) {
166+
emf.pusherMapLock.Lock()
167+
defer emf.pusherMapLock.Unlock()
168+
169+
if emf.svcStructuredLog == nil {
170+
return nil, errors.New("CloudWatch Logs client not initialized")
184171
}
185-
return emf.pusherMap[key]
172+
173+
pusher, exists := emf.pusherMap[key]
174+
if !exists {
175+
if emf.set.Logger != nil {
176+
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger)
177+
} else {
178+
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
179+
}
180+
emf.pusherMap[key] = pusher
181+
}
182+
return pusher, nil
186183
}
187184

188185
func (emf *emfExporter) listPushers() []cwlogs.Pusher {
@@ -197,9 +194,32 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher {
197194
}
198195

199196
func (emf *emfExporter) start(_ context.Context, host component.Host) error {
197+
// Create AWS session here
198+
awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
199+
if err != nil {
200+
return err
201+
}
202+
203+
// create CWLogs client with aws session config
204+
svcStructuredLog := cwlogs.NewClient(emf.config.logger,
205+
awsConfig,
206+
emf.set.BuildInfo,
207+
emf.config.LogGroupName,
208+
emf.config.LogRetention,
209+
emf.config.Tags,
210+
session,
211+
cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
212+
cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
213+
)
214+
215+
// Assign to the struct
216+
emf.svcStructuredLog = svcStructuredLog
217+
218+
// Optionally configure middleware
200219
if emf.config.MiddlewareID != nil {
201-
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(emf.svcStructuredLog.Handlers()))
220+
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(svcStructuredLog.Handlers()))
202221
}
222+
203223
return nil
204224
}
205225

0 commit comments

Comments
 (0)