-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathhealthcheckextension.go
More file actions
108 lines (90 loc) · 2.79 KB
/
healthcheckextension.go
File metadata and controls
108 lines (90 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
import (
"context"
"errors"
"fmt"
"net/http"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension/internal/healthcheck"
)
type healthCheckExtension struct {
config Config
logger *zap.Logger
state *healthcheck.HealthCheck
server *http.Server
stopCh chan struct{}
settings component.TelemetrySettings
}
var _ extensioncapabilities.PipelineWatcher = (*healthCheckExtension)(nil)
func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error {
hc.logger.Info("Starting health_check extension", zap.Any("config", hc.config))
ln, err := hc.config.ToListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind to address %s: %w", hc.config.NetAddr.Endpoint, err)
}
hc.server, err = hc.config.ToServer(ctx, host.GetExtensions(), hc.settings, nil)
if err != nil {
return err
}
// Mount HC handler
mux := http.NewServeMux()
mux.Handle(hc.config.Path, hc.baseHandler())
hc.server.Handler = mux
hc.stopCh = make(chan struct{})
go func() {
defer close(hc.stopCh)
// The listener ownership goes to the server.
if err = hc.server.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
return nil
}
// base handler function
func (hc *healthCheckExtension) baseHandler() http.Handler {
if hc.config.ResponseBody != nil {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if hc.state.Get() == healthcheck.Ready {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(hc.config.ResponseBody.Healthy))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(hc.config.ResponseBody.Unhealthy))
}
})
}
return hc.state.Handler()
}
func (hc *healthCheckExtension) Shutdown(context.Context) error {
if hc.server == nil {
return nil
}
err := hc.server.Close()
if hc.stopCh != nil {
<-hc.stopCh
}
return err
}
func (hc *healthCheckExtension) Ready() error {
hc.state.Set(healthcheck.Ready)
return nil
}
func (hc *healthCheckExtension) NotReady() error {
hc.state.Set(healthcheck.Unavailable)
return nil
}
func newServer(config Config, settings component.TelemetrySettings) *healthCheckExtension {
hc := &healthCheckExtension{
config: config,
logger: settings.Logger,
state: healthcheck.New(),
settings: settings,
}
hc.state.SetLogger(settings.Logger)
return hc
}