// Copyright 2016 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package log holds the code that is specific to Trillian logs core operation,
// particularly the code for sequencing.
package log

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/google/trillian/extension"
	"github.com/google/trillian/monitoring"
	"github.com/google/trillian/storage"
	"github.com/google/trillian/util/clock"
	"github.com/google/trillian/util/election"
	"golang.org/x/sync/semaphore"
	"k8s.io/klog/v2"
)

var (
	// DefaultTimeout is the default timeout on a single log operation run.
	DefaultTimeout = 60 * time.Second

	once              sync.Once
	knownLogs         monitoring.Gauge
	resignations      monitoring.Counter
	isMaster          monitoring.Gauge
	signingRuns       monitoring.Counter
	failedSigningRuns monitoring.Counter
	entriesAdded      monitoring.Counter
	batchesAdded      monitoring.Counter
)

func createMetrics(mf monitoring.MetricFactory) {
	if mf == nil {
		mf = monitoring.InertMetricFactory{}
	}
	knownLogs = mf.NewGauge("known_logs", "Set to 1 for known logs (whether this instance is master or not)", logIDLabel)
	resignations = mf.NewCounter("master_resignations", "Number of mastership resignations", logIDLabel)
	isMaster = mf.NewGauge("is_master", "Whether this instance is master (0/1)", logIDLabel)
	signingRuns = mf.NewCounter("signing_runs", "Number of times a signing run has succeeded", logIDLabel)
	failedSigningRuns = mf.NewCounter("failed_signing_runs", "Number of times a signing run has failed", logIDLabel)
	// entriesAdded is the total number of entries that have been added to the
	// log during the lifetime of a signer. This allows an operator to determine
	// that the queue is empty for a particular log; if signing runs are succeeding
	// but nothing is being processed then this counter will stop increasing.
	entriesAdded = mf.NewCounter("entries_added", "Number of entries added to the log", logIDLabel)
	// batchesAdded is the number of times a signing run caused entries to be
	// integrated into the log. The value batchesAdded / signingRuns is an
	// indication of how often the signer runs but does no work. The value of
	// entriesAdded / batchesAdded is the average batch size. These can be used
	// for tuning sequencing or evaluating performance.
	batchesAdded = mf.NewCounter("batches_added", "Number of times a non-zero number of entries was added", logIDLabel)
}
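
// As a worked example of the ratios described above (all numbers
// hypothetical): if, over some period, signingRuns = 100, batchesAdded = 25
// and entriesAdded = 5000, then the signer found work on 25/100 = 25% of its
// runs, and the average batch size was 5000/25 = 200 entries.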

// Operation defines a task that operates on a log. Examples are scheduling,
// signing, consistency checking or cleanup.
type Operation interface {
	// ExecutePass performs a single pass of processing on a single log. It
	// returns a count of items processed (for logging) and an error.
	ExecutePass(ctx context.Context, logID int64, info *OperationInfo) (int, error)
}
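
// The following is a minimal sketch of an Operation implementation, for
// illustration only: countingOperation is a hypothetical type, not part of
// this package.
//
//	type countingOperation struct{}
//
//	func (countingOperation) ExecutePass(ctx context.Context, logID int64, info *OperationInfo) (int, error) {
//		// A real Operation would do work for logID here, e.g. sequence up to
//		// info.BatchSize queued leaves, and report how many it processed.
//		return 0, nil
//	}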

// OperationInfo bundles up information needed for running a set of Operations.
type OperationInfo struct {
	// Registry provides access to Trillian storage.
	Registry extension.Registry

	// The following parameters are passed to individual Operations.

	// BatchSize is the batch size to be passed to tasks run by this manager.
	BatchSize int
	// TimeSource should be used by the Operation to allow mocking for tests.
	TimeSource clock.TimeSource

	// The following parameters govern the overall scheduling of Operations
	// by an OperationManager.

	// ElectionConfig holds election-related configuration. Copied for each log.
	ElectionConfig election.RunnerConfig
	// RunInterval is the time between starting batches of processing. If a
	// batch takes longer than this interval to complete, the next batch
	// will start immediately. (See the worked example after this struct.)
	RunInterval time.Duration
	// NumWorkers is the number of worker goroutines to run in parallel.
	NumWorkers int
	// Timeout sets an optional timeout on each operation run.
	// If unset, it defaults to DefaultTimeout.
	Timeout time.Duration
}
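
// As a worked example of the RunInterval behavior above (numbers
// hypothetical): with RunInterval = 10s, a pass that completes in 3s is
// followed by a 7s wait before the next pass, while a pass that takes 12s is
// followed by the next pass immediately.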

// OperationManager controls scheduling activities for logs.
type OperationManager struct {
	info OperationInfo

	// logOperation is the task that gets run for active logs.
	logOperation Operation

	// runnerWG groups all goroutines with election Runners.
	runnerWG sync.WaitGroup
	// runnerCancels contains the cancel function for each logID's election Runner.
	runnerCancels map[string]context.CancelFunc
	// pendingResignations delivers resignation requests from election Runners.
	pendingResignations chan election.Resignation

	tracker *election.MasterTracker

	// logNames is a cache of logID => name. Names are assumed not to change
	// during runtime.
	logNames map[int64]string
	// lastHeld is a recent list of active logs that this instance is master for.
	lastHeld []int64
	// idsMutex guards the logNames and lastHeld fields.
	idsMutex sync.Mutex
}

// NewOperationManager creates a new OperationManager instance.
func NewOperationManager(info OperationInfo, logOperation Operation) *OperationManager {
	once.Do(func() {
		createMetrics(info.Registry.MetricFactory)
	})
	if info.Timeout == 0 {
		info.Timeout = DefaultTimeout
	}
	tracker := election.NewMasterTracker(nil, func(id string, v bool) {
		val := 0.0
		if v {
			val = 1.0
		}
		isMaster.Set(val, id)
	})
	return &OperationManager{
		info:                info,
		logOperation:        logOperation,
		runnerCancels:       make(map[string]context.CancelFunc),
		pendingResignations: make(chan election.Resignation, 100),
		tracker:             tracker,
		logNames:            make(map[int64]string),
	}
}
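
// A minimal sketch of wiring up and running a manager, for illustration only:
// runLogOperation and its arguments are hypothetical, and a real deployment
// obtains its extension.Registry from a storage provider.
//
//	func runLogOperation(ctx context.Context, registry extension.Registry, op Operation) {
//		info := OperationInfo{
//			Registry:    registry,
//			BatchSize:   1000,
//			NumWorkers:  4,
//			RunInterval: 10 * time.Second,
//			TimeSource:  clock.System,
//		}
//		om := NewOperationManager(info, op)
//		om.OperationLoop(ctx) // Runs until ctx is canceled.
//	}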

// logName maps a logID to a human-readable name, caching results along the
// way. The human-readable name may be non-unique, so it should only be used
// for diagnostics.
func (o *OperationManager) logName(ctx context.Context, logID int64) string {
	o.idsMutex.Lock()
	defer o.idsMutex.Unlock()
	if name, ok := o.logNames[logID]; ok {
		return name
	}

	tree, err := storage.GetTree(ctx, o.info.Registry.AdminStorage, logID)
	if err != nil {
		klog.Errorf("%v: failed to get log info: %v", logID, err)
		return "<err>"
	}
	name := tree.DisplayName
	if name == "" {
		name = fmt.Sprintf("<log-%d>", logID)
	}
	o.logNames[logID] = name
	return o.logNames[logID]
}

// heldInfo returns a human-readable summary of the logs that this instance
// holds mastership for.
func (o *OperationManager) heldInfo(ctx context.Context, logIDs []int64) string {
	names := make([]string, 0, len(logIDs))
	for _, logID := range logIDs {
		names = append(names, o.logName(ctx, logID))
	}
	sort.Strings(names)

	result := "master for:"
	for _, name := range names {
		result += " " + name
	}
	return result
}

// masterFor returns the list of log IDs among allIDs that this instance is
// master for. Note that the instance may hold mastership for logs that are
// not listed in allIDs, but such logs are skipped.
func (o *OperationManager) masterFor(ctx context.Context, allIDs []int64) ([]int64, error) {
	if o.info.Registry.ElectionFactory == nil {
		return allIDs, nil
	}
	allStringIDs := make([]string, 0, len(allIDs))
	for _, id := range allIDs {
		s := strconv.FormatInt(id, 10)
		allStringIDs = append(allStringIDs, s)
	}

	// Synchronize the set of log IDs with those we are tracking mastership for.
	for _, logID := range allStringIDs {
		knownLogs.Set(1, logID)
		if o.runnerCancels[logID] == nil {
			o.tracker.Set(logID, false) // Initialise tracking for this ID.
			o.runnerCancels[logID] = o.runElectionWithRestarts(ctx, logID)
		}
	}

	held := o.tracker.Held()
	heldIDs := make([]int64, 0, len(allIDs))
	sort.Strings(allStringIDs)
	for _, s := range held {
		// Skip the log if it is not present in allIDs.
		if i := sort.SearchStrings(allStringIDs, s); i >= len(allStringIDs) || allStringIDs[i] != s {
			continue
		}
		id, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("failed to parse logID %v as int64: %v", s, err)
		}
		heldIDs = append(heldIDs, id)
	}
	return heldIDs, nil
}

// runElectionWithRestarts runs the election/resignation loop for the given
// log indefinitely, until the returned CancelFunc is invoked. Any failure
// during the loop leads to a restart of the loop after a few seconds' delay.
//
// TODO(pavelkalinnikov): Restart the whole log operation rather than just the
// election, and have a metric for restarts.
func (o *OperationManager) runElectionWithRestarts(ctx context.Context, logID string) context.CancelFunc {
	klog.Infof("create master election goroutine for %v", logID)
	cctx, cancel := context.WithCancel(ctx)
	run := func(ctx context.Context) {
		e, err := o.info.Registry.ElectionFactory.NewElection(ctx, logID)
		if err != nil {
			klog.Errorf("failed to create election for %v: %v", logID, err)
			return
		}
		// Warning: NewRunner can attempt to modify the config. Make a separate
		// copy of the config for each log, to avoid data races.
		config := o.info.ElectionConfig
		// TODO(pavelkalinnikov): Passing the cancel function is not needed here.
		r := election.NewRunner(logID, &config, o.tracker, cancel, e)
		r.Run(ctx, o.pendingResignations)
	}

	o.runnerWG.Add(1)
	go func(ctx context.Context) {
		defer o.runnerWG.Done()
		// Continue only while the context is active.
		for ctx.Err() == nil {
			run(ctx)
			// Sleep before restarting, so as not to spam the log with errors.
			// TODO(pavelkalinnikov): Make the interval configurable.
			const pause = 5 * time.Second
			if err := clock.SleepSource(ctx, pause, o.info.TimeSource); err != nil {
				break // The context has been canceled during the sleep.
			}
		}
	}(cctx)
	return cancel
}

// updateHeldIDs updates the process status with the number/list of logs that
// the instance holds mastership for.
func (o *OperationManager) updateHeldIDs(ctx context.Context, logIDs, activeIDs []int64) {
	heldInfo := o.heldInfo(ctx, logIDs)
	msg := fmt.Sprintf("Acting as master for %d / %d active logs: %s", len(logIDs), len(activeIDs), heldInfo)
	o.idsMutex.Lock()
	defer o.idsMutex.Unlock()
	if !reflect.DeepEqual(logIDs, o.lastHeld) {
		o.lastHeld = make([]int64, len(logIDs))
		copy(o.lastHeld, logIDs)
		klog.Info(msg)
		if o.info.Registry.SetProcessStatus != nil {
			o.info.Registry.SetProcessStatus(heldInfo)
		}
	} else {
		klog.V(1).Info(msg)
	}
}

// getLogsAndExecutePass runs one pass of the log operation over all the
// active logs that this instance is currently master for.
func (o *OperationManager) getLogsAndExecutePass(ctx context.Context) error {
	runCtx, cancel := context.WithTimeout(ctx, o.info.Timeout)
	defer cancel()

	activeIDs, err := o.info.Registry.GetActiveLogIDs(ctx)
	if err != nil {
		return fmt.Errorf("failed to list active log IDs: %v", err)
	}
	// Find the logs we are master for, skipping those logs that are not active,
	// e.g. deleted or FROZEN ones.
	// TODO(pavelkalinnikov): Resign mastership for the inactive logs.
	logIDs, err := o.masterFor(ctx, activeIDs)
	if err != nil {
		return fmt.Errorf("failed to determine log IDs we're master for: %v", err)
	}
	o.updateHeldIDs(ctx, logIDs, activeIDs)

	executePassForAll(runCtx, &o.info, o.logOperation, logIDs)
	return nil
}

// OperationSingle performs a single pass of the manager.
//
// TODO(pavelkalinnikov): Deprecate this because it doesn't clean up any
// state, and is used only for testing.
func (o *OperationManager) OperationSingle(ctx context.Context) {
	if err := o.getLogsAndExecutePass(ctx); err != nil {
		klog.Errorf("failed to perform operation: %v", err)
	}
}

// OperationLoop starts the manager working. It continues until told to exit.
// TODO(Martin2112): No mechanism for error reporting etc., this is OK for v1
// but needs work.
func (o *OperationManager) OperationLoop(ctx context.Context) {
	klog.Infof("Log operation manager starting")

	// Outer loop, runs until terminated.
	for {
		if err := o.operateOnce(ctx); err != nil {
			klog.Infof("Log operation manager shutting down")
			break
		}
	}

	// Terminate all the election Runners.
	for logID, cancel := range o.runnerCancels {
		if cancel != nil {
			klog.V(1).Infof("cancel election runner for %s", logID)
			cancel()
		}
	}
	// Drain any remaining resignations which might have triggered.
	close(o.pendingResignations)
	for r := range o.pendingResignations {
		resignations.Inc(r.ID)
		r.Execute(ctx)
	}

	klog.Infof("wait for termination of election runners...")
	o.runnerWG.Wait()
	klog.Infof("wait for termination of election runners...done")
}

// operateOnce runs a single round of operation for each of the active logs
// that this instance is master for. It returns an error only if the context
// is canceled, i.e. the operation is being shut down.
func (o *OperationManager) operateOnce(ctx context.Context) error {
	// TODO(alcutter): want a child context with deadline here?
	start := o.info.TimeSource.Now()
	if err := o.getLogsAndExecutePass(ctx); err != nil {
		// Suppress the error if ctx is done (ctx.Err() != nil) as we're exiting.
		if ctx.Err() == nil {
			klog.Errorf("failed to execute operation on logs: %v", err)
		}
	}
	klog.V(1).Infof("Log operation manager pass complete")

	// Process any pending resignations while there's no activity.
	doneResigning := false
	for !doneResigning {
		select {
		case r := <-o.pendingResignations:
			resignations.Inc(r.ID)
			r.Execute(ctx)
		default:
			doneResigning = true
		}
	}

	// See if it's time to quit.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Wait for the configured time before going for another pass.
	duration := o.info.TimeSource.Now().Sub(start)
	wait := o.info.RunInterval - duration
	if wait > 0 {
		klog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
		if err := clock.SleepContext(ctx, wait); err != nil {
			return err
		}
	} else {
		klog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
	}
	return nil
}

// executePassForAll runs ExecutePass of the given operation for each of the
// passed-in logs, allowing up to a configurable number of parallel operations.
func executePassForAll(ctx context.Context, info *OperationInfo, op Operation, logIDs []int64) {
	startBatch := info.TimeSource.Now()

	numWorkers := info.NumWorkers
	if numWorkers <= 0 {
		klog.Warning("Running executor with NumWorkers <= 0, assuming 1")
		numWorkers = 1
	}
	klog.V(1).Infof("Running executor with %d worker(s)", numWorkers)

	sem := semaphore.NewWeighted(int64(numWorkers))
	var wg sync.WaitGroup
	for _, logID := range logIDs {
		if err := sem.Acquire(ctx, 1); err != nil {
			break // Terminate because the context is canceled.
		}
		wg.Add(1)
		go func(logID int64) {
			defer wg.Done()
			defer sem.Release(1)
			if err := executePass(ctx, info, op, logID); err != nil {
				klog.Errorf("ExecutePass(%v) failed: %v", logID, err)
			}
		}(logID)
	}

	// Wait for the workers to consume all of the logIDs.
	wg.Wait()
	d := clock.SecondsSince(info.TimeSource, startBatch)
	klog.V(1).Infof("Group run completed in %.2f seconds", d)
}

// executePass runs ExecutePass of the given operation for the passed-in log.
func executePass(ctx context.Context, info *OperationInfo, op Operation, logID int64) error {
	label := strconv.FormatInt(logID, 10)
	start := info.TimeSource.Now()
	count, err := op.ExecutePass(ctx, logID, info)
	if err != nil {
		failedSigningRuns.Inc(label)
		return err
	}

	// This indicates signing activity is proceeding on the logID.
	signingRuns.Inc(label)
	if count > 0 {
		d := clock.SecondsSince(info.TimeSource, start)
		klog.Infof("%v: processed %d items in %.2f seconds (%.2f qps)", logID, count, d, float64(count)/d)
		entriesAdded.Add(float64(count), label)
		batchesAdded.Inc(label)
	} else {
		klog.V(1).Infof("%v: no items to process", logID)
	}
	return nil
}