-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathhealth_check.go
140 lines (119 loc) · 3.58 KB
/
health_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package couchdb
import (
"context"
"fmt"
"net/http"
"time"
kivik "github.com/go-kivik/kivik/v4"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
)
// HealthCheck checks the state of the CouchDB connection. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
// Name is passed to Client.CreateDB as the database name when the database
// is created on demand.
Name string
// Client is used to create the database when Config.DatabaseAutoCreate is set
// and writing the health check document reports "not found".
Client *kivik.Client
// DB is the database the health check document is read from and written to.
DB *kivik.DB
// Config supplies HealthCheckResultTTL, HealthCheckKey and DatabaseAutoCreate.
Config *Config
// state stores the outcome of the most recent check; it is read through
// LastChecked/GetState and updated through SetErrorState/SetHealthy.
state servicehealthcheck.ConnectionState
}
var (
// healthCheckTimeFormat is the layout used to format and parse the timestamp
// stored in the health check document.
healthCheckTimeFormat = time.RFC3339
// healthCheckConcurrentSpan is the tolerance around the check time within
// which a stored timestamp is accepted as evidence of a successful write.
healthCheckConcurrentSpan = 10 * time.Second
)
// HealthCheck checks if the CouchDB connection is healthy. If the last result
// is not older than Config.HealthCheckResultTTL it is returned unchanged.
// Otherwise the health check document stored under Config.HealthCheckKey is
// written and read back until a recent timestamp is observed, an error occurs
// or ctx is canceled. The outcome is stored in the internal state and returned.
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
	if time.Since(h.state.LastChecked()) <= h.Config.HealthCheckResultTTL {
		// the last health check is not outdated and can be reused
		return h.state.GetState()
	}

	checkTime := time.Now()

	// doc is shared across attempts so that a revision read from the database
	// is sent along with the next write.
	var doc Doc

	for {
		// stop retrying once the context was canceled
		select {
		case <-ctx.Done():
			h.state.SetErrorState(fmt.Errorf("failed: %v", ctx.Err()))
			return h.state.GetState()
		default:
		}

		found, err := h.fetchHealthDoc(ctx, &doc)
		if err != nil {
			h.state.SetErrorState(err)
			return h.state.GetState()
		}

		// a recent timestamp proves that writing and reading works
		if found && wasConcurrentHealthCheck(checkTime, doc.Time) {
			h.state.SetHealthy()
			return h.state.GetState()
		}

		// document is missing or stale: write a fresh one and check again
		if err := h.storeHealthDoc(ctx, &doc); err != nil {
			h.state.SetErrorState(err)
			return h.state.GetState()
		}
	}
}

// fetchHealthDoc reads the health check document into doc. It reports
// found=false without an error when the document does not exist (HTTP 404) or
// has an empty revision; doc is left untouched in that case. The fetched row
// is closed before the function returns, so retries do not pile up open rows.
func (h *HealthCheck) fetchHealthDoc(ctx context.Context, doc *Doc) (found bool, err error) {
	row := h.DB.Get(ctx, h.Config.HealthCheckKey)
	if err := row.Err(); err != nil {
		if kivik.HTTPStatus(err) == http.StatusNotFound {
			return false, nil
		}

		return false, fmt.Errorf("failed to get: %#v", err)
	}
	defer row.Close()

	rev, err := row.Rev()
	if err != nil {
		// a row without a readable revision is an error, not a missing document
		return false, fmt.Errorf("failed to get document revision: %v", err)
	}

	if rev == "" {
		return false, nil
	}

	if err := row.ScanDoc(doc); err != nil {
		return false, fmt.Errorf("failed to get: %v", err)
	}

	return true, nil
}

// storeHealthDoc stamps doc with the health check key and the current time and
// writes it. If the write reports "not found" and Config.DatabaseAutoCreate is
// set, the database is created and the write is retried. A conflict is not
// treated as an error: the caller re-reads the document, which also picks up
// the current revision.
func (h *HealthCheck) storeHealthDoc(ctx context.Context, doc *Doc) error {
	for {
		doc.ID = h.Config.HealthCheckKey
		doc.Time = time.Now().Format(healthCheckTimeFormat)

		_, err := h.DB.Put(ctx, h.Config.HealthCheckKey, *doc)

		switch {
		case err == nil:
			return nil
		case h.Config.DatabaseAutoCreate && kivik.HTTPStatus(err) == http.StatusNotFound:
			// database not yet created, try to create it and write again
			if err := h.Client.CreateDB(ctx, h.Name); err != nil {
				return fmt.Errorf("failed to create database: %v", err)
			}
		case kivik.HTTPStatus(err) == http.StatusConflict:
			return nil
		default:
			return fmt.Errorf("failed to put object: %v", err)
		}
	}
}
// Doc is the document that HealthCheck writes to and reads from the database
// under Config.HealthCheckKey.
type Doc struct {
// ID is serialized as "_id"; HealthCheck sets it to Config.HealthCheckKey.
ID string `json:"_id"`
// Rev is serialized as "_rev" and omitted when empty.
Rev string `json:"_rev,omitempty"`
// Time is serialized as "at" and holds the write time formatted with
// healthCheckTimeFormat.
Time string `json:"at"`
}
// wasConcurrentHealthCheck reports whether observedValue, parsed with
// healthCheckTimeFormat, lies strictly within healthCheckConcurrentSpan
// before or after checkTime. A value that cannot be parsed yields false.
//
// Concurrent requests to the store may overwrite the timestamp, so the value
// read back need not be the one this instance wrote. That is acceptable: all
// instances are assumed to be equal, and evidence of a recent successful
// write by any of them is sufficient. See #244
func wasConcurrentHealthCheck(checkTime time.Time, observedValue string) bool {
	observed, parseErr := time.Parse(healthCheckTimeFormat, observedValue)
	if parseErr != nil {
		return false
	}

	// the timestamp from the document must be newer than the lower bound ...
	lowerBound := checkTime.Add(-healthCheckConcurrentSpan)
	if !observed.After(lowerBound) {
		return false
	}

	// ... and older than the upper bound to be considered healthy
	upperBound := checkTime.Add(healthCheckConcurrentSpan)

	return observed.Before(upperBound)
}