Skip to content

Commit d8ea9a0

Browse files
fix: load healthcheck v2 once when edot is running as subprocess (#9848)
* ci: extend unit-test to validate number of healthcheck extensions in edot subprocess mode * fix: instantiate and use healthcheck extension id once
1 parent 7242558 commit d8ea9a0

File tree

4 files changed

+87
-37
lines changed

4 files changed

+87
-37
lines changed

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"os/exec"
1414
"time"
1515

16+
"github.com/gofrs/uuid/v5"
17+
"go.opentelemetry.io/collector/component"
1618
"gopkg.in/yaml.v3"
1719

1820
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -34,22 +36,34 @@ const (
3436
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
3537
)
3638

37-
func newSubprocessExecution(logLevel logp.Level, collectorPath string) *subprocessExecution {
39+
func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subprocessExecution, error) {
40+
nsUUID, err := uuid.NewV4()
41+
if err != nil {
42+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
43+
}
44+
componentType, err := component.NewType(healthCheckExtensionName)
45+
if err != nil {
46+
return nil, fmt.Errorf("cannot create component type: %w", err)
47+
}
48+
healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String()
49+
3850
return &subprocessExecution{
3951
collectorPath: collectorPath,
4052
collectorArgs: []string{
4153
"otel",
4254
fmt.Sprintf("--%s", OtelSetSupervisedFlagName),
4355
fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()),
4456
},
45-
logLevel: logLevel,
46-
}
57+
logLevel: logLevel,
58+
healthCheckExtensionID: healthCheckExtensionID,
59+
}, nil
4760
}
4861

4962
type subprocessExecution struct {
50-
collectorPath string
51-
collectorArgs []string
52-
logLevel logp.Level
63+
collectorPath string
64+
collectorArgs []string
65+
logLevel logp.Level
66+
healthCheckExtensionID string
5367
}
5468

5569
// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
@@ -75,7 +89,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
7589
return nil, fmt.Errorf("could not find port for http health check: %w", err)
7690
}
7791

78-
if err := injectHeathCheckV2Extension(cfg, httpHealthCheckPort); err != nil {
92+
if err := injectHeathCheckV2Extension(cfg, r.healthCheckExtensionID, httpHealthCheckPort); err != nil {
7993
return nil, fmt.Errorf("failed to inject health check extension: %w", err)
8094
}
8195

internal/pkg/otel/manager/healthcheck.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"net/http"
1414
"time"
1515

16-
"github.com/gofrs/uuid/v5"
17-
"go.opentelemetry.io/collector/component"
1816
"go.opentelemetry.io/collector/confmap"
1917

2018
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -197,14 +195,8 @@ func aggregateStatus(sts componentstatus.Status, err error) *status.AggregateSta
197195
}
198196

199197
// injectHeathCheckV2Extension injects the healthcheckv2 extension into the provided configuration.
200-
func injectHeathCheckV2Extension(conf *confmap.Conf, httpHealthCheckPort int) error {
201-
nsUUID, err := uuid.NewV4()
202-
if err != nil {
203-
return fmt.Errorf("cannot generate UUID V4: %w", err)
204-
}
205-
componentType := component.MustNewType(healthCheckExtensionName)
206-
healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String()
207-
err = conf.Merge(confmap.NewFromStringMap(map[string]interface{}{
198+
func injectHeathCheckV2Extension(conf *confmap.Conf, healthCheckExtensionID string, httpHealthCheckPort int) error {
199+
err := conf.Merge(confmap.NewFromStringMap(map[string]interface{}{
208200
"extensions": map[string]interface{}{
209201
healthCheckExtensionID: map[string]interface{}{
210202
"use_v2": true,

internal/pkg/otel/manager/manager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ func NewOTelManager(
123123
return nil, fmt.Errorf("failed to get the path to the collector executable: %w", err)
124124
}
125125
recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute)
126-
exec = newSubprocessExecution(logLevel, executable)
126+
exec, err = newSubprocessExecution(logLevel, executable)
127+
if err != nil {
128+
return nil, fmt.Errorf("failed to create subprocess execution: %w", err)
129+
}
127130
case EmbeddedExecutionMode:
128131
recoveryTimer = newRestarterNoop()
129132
exec = newExecutionEmbedded()

internal/pkg/otel/manager/manager_test.go

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"errors"
1212
"os"
1313
"path/filepath"
14+
"strings"
1415
"sync"
1516
"testing"
1617
"time"
@@ -264,6 +265,22 @@ func (t *EventTime[T]) Time() time.Time {
264265
return t.time
265266
}
266267

268+
func countHealthCheckExtensionStatuses(status *status.AggregateStatus) uint {
269+
extensions, ok := status.ComponentStatusMap["extensions"]
270+
if !ok {
271+
return 0
272+
}
273+
274+
count := uint(0)
275+
for key := range extensions.ComponentStatusMap {
276+
if strings.HasPrefix(key, "extension:healthcheckv2/") {
277+
count++
278+
}
279+
}
280+
281+
return count
282+
}
283+
267284
func TestOTelManager_Run(t *testing.T) {
268285
wd, erWd := os.Getwd()
269286
require.NoError(t, erWd, "cannot get working directory")
@@ -273,14 +290,16 @@ func TestOTelManager_Run(t *testing.T) {
273290

274291
for _, tc := range []struct {
275292
name string
276-
exec *testExecution
293+
exec func() (collectorExecution, error)
277294
restarter collectorRecoveryTimer
278295
skipListeningErrors bool
279296
testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution)
280297
}{
281298
{
282-
name: "embedded collector config updates",
283-
exec: &testExecution{exec: newExecutionEmbedded()},
299+
name: "embedded collector config updates",
300+
exec: func() (collectorExecution, error) {
301+
return newExecutionEmbedded(), nil
302+
},
284303
restarter: newRestarterNoop(),
285304
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
286305
// ensure that it got healthy
@@ -302,8 +321,10 @@ func TestOTelManager_Run(t *testing.T) {
302321
},
303322
},
304323
{
305-
name: "subprocess collector config updates",
306-
exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)},
324+
name: "subprocess collector config updates",
325+
exec: func() (collectorExecution, error) {
326+
return newSubprocessExecution(logp.DebugLevel, testBinary)
327+
},
307328
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
308329
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
309330
// ensure that it got healthy
@@ -326,8 +347,10 @@ func TestOTelManager_Run(t *testing.T) {
326347
},
327348
},
328349
{
329-
name: "embedded collector stopped gracefully outside manager",
330-
exec: &testExecution{exec: newExecutionEmbedded()},
350+
name: "embedded collector stopped gracefully outside manager",
351+
exec: func() (collectorExecution, error) {
352+
return newExecutionEmbedded(), nil
353+
},
331354
restarter: newRestarterNoop(),
332355
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
333356
// ensure that it got healthy
@@ -350,8 +373,10 @@ func TestOTelManager_Run(t *testing.T) {
350373
},
351374
},
352375
{
353-
name: "subprocess collector stopped gracefully outside manager",
354-
exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)},
376+
name: "subprocess collector stopped gracefully outside manager",
377+
exec: func() (collectorExecution, error) {
378+
return newSubprocessExecution(logp.DebugLevel, testBinary)
379+
},
355380
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
356381
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
357382
// ensure that it got healthy
@@ -365,6 +390,7 @@ func TestOTelManager_Run(t *testing.T) {
365390
require.NotNil(t, exec.handle, "exec handle should not be nil")
366391
exec.handle.Stop(t.Context())
367392
e.EnsureHealthy(t, updateTime)
393+
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
368394

369395
// no configuration should stop the runner
370396
updateTime = time.Now()
@@ -375,15 +401,18 @@ func TestOTelManager_Run(t *testing.T) {
375401
},
376402
},
377403
{
378-
name: "subprocess collector killed outside manager",
379-
exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)},
404+
name: "subprocess collector killed outside manager",
405+
exec: func() (collectorExecution, error) {
406+
return newSubprocessExecution(logp.DebugLevel, testBinary)
407+
},
380408
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
381409
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
382410
// ensure that it got healthy
383411
cfg := confmap.NewFromStringMap(testConfig)
384412
updateTime := time.Now()
385413
m.Update(cfg, nil)
386414
e.EnsureHealthy(t, updateTime)
415+
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
387416

388417
var oldPHandle *procHandle
389418
// repeatedly kill the collector
@@ -401,6 +430,7 @@ func TestOTelManager_Run(t *testing.T) {
401430
// the collector should restart and report healthy
402431
updateTime = time.Now()
403432
e.EnsureHealthy(t, updateTime)
433+
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
404434
}
405435

406436
seenRecoveredTimes := m.recoveryRetries.Load()
@@ -414,8 +444,10 @@ func TestOTelManager_Run(t *testing.T) {
414444
},
415445
},
416446
{
417-
name: "subprocess collector panics",
418-
exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)},
447+
name: "subprocess collector panics",
448+
exec: func() (collectorExecution, error) {
449+
return newSubprocessExecution(logp.DebugLevel, testBinary)
450+
},
419451
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
420452
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
421453
err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", (3 * time.Second).String())
@@ -448,8 +480,10 @@ func TestOTelManager_Run(t *testing.T) {
448480
},
449481
},
450482
{
451-
name: "embedded collector invalid config",
452-
exec: &testExecution{exec: newExecutionEmbedded()},
483+
name: "embedded collector invalid config",
484+
exec: func() (collectorExecution, error) {
485+
return newExecutionEmbedded(), nil
486+
},
453487
restarter: newRestarterNoop(),
454488
skipListeningErrors: true,
455489
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
@@ -487,8 +521,10 @@ func TestOTelManager_Run(t *testing.T) {
487521
},
488522
},
489523
{
490-
name: "subprocess collector invalid config",
491-
exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)},
524+
name: "subprocess collector invalid config",
525+
exec: func() (collectorExecution, error) {
526+
return newSubprocessExecution(logp.DebugLevel, testBinary)
527+
},
492528
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
493529
skipListeningErrors: true,
494530
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) {
@@ -531,6 +567,11 @@ func TestOTelManager_Run(t *testing.T) {
531567
defer cancel()
532568
l, _ := loggertest.New("otel")
533569
base, obs := loggertest.New("otel")
570+
571+
executionMode, err := tc.exec()
572+
require.NoError(t, err, "failed to create execution mode")
573+
testExecutionMode := &testExecution{exec: executionMode}
574+
534575
m := &OTelManager{
535576
logger: l,
536577
baseLogger: base,
@@ -540,7 +581,7 @@ func TestOTelManager_Run(t *testing.T) {
540581
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
541582
doneChan: make(chan struct{}),
542583
recoveryTimer: tc.restarter,
543-
execution: tc.exec,
584+
execution: testExecutionMode,
544585
}
545586

546587
eListener := &EventListener{}
@@ -573,7 +614,7 @@ func TestOTelManager_Run(t *testing.T) {
573614
runErr = m.Run(ctx)
574615
}()
575616

576-
tc.testFn(t, m, eListener, tc.exec)
617+
tc.testFn(t, m, eListener, testExecutionMode)
577618

578619
cancel()
579620
runWg.Wait()

0 commit comments

Comments
 (0)