
Commit 8390827

Merge branch 'main' into node_chan_writes_without_blocking
2 parents ed9be6e + 4e4eaa6 commit 8390827

File tree

6 files changed: +1515 -53 lines changed


Diff for: node/pkg/telemetry/loki.go

+165 -11
@@ -7,6 +7,7 @@ package telemetry
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"time"
 
@@ -28,6 +29,14 @@ import (
 	"github.com/prometheus/common/model"
 )
 
+const (
+	// bufferSize specifies how many log messages can be queued up locally before we start dropping them.
+	bufferSize = 1000
+
+	// clientTimeout is how long we are willing to wait for Loki on shutdown. Note that this is an UPPER LIMIT. In the sunny day scenario, we won't need to wait.
+	clientTimeout = 250 * time.Millisecond
+)
+
 // ExternalLoggerLoki implements ExternalLogger for the Grafana Loki cloud logging.
 type ExternalLoggerLoki struct {
 	// c is the promtail client.
@@ -38,6 +47,17 @@ type ExternalLoggerLoki struct {
 
 	// localLogger is the zap localLogger used to log errors generated by the loki adapter. It does not use telemetry.
 	localLogger *zap.Logger
+
+	// bufferedChan is used to buffer log messages so that the app does not block on Loki. The Loki internal channel is unbuffered, so we
+	// write things to this channel. If this write would block, we peg a metric and drop the log message (although it still gets logged locally).
+	// There is then a worker routine picking messages off of this local channel and writing them to the Loki channel in a blocking manner.
+	bufferedChan chan api.Entry
+
+	// cancelWorker is used to cancel the worker routine on shutdown.
+	cancelWorker context.CancelFunc
+
+	// workerExitedC is used by the worker to signal that it has exited.
+	workerExitedC chan struct{}
 }
 
 var (
@@ -54,7 +74,7 @@ var (
 	})
 )
 
-func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, level zapcore.Level) {
+func (logger *ExternalLoggerLoki) log(ts time.Time, message json.RawMessage, level zapcore.Level) {
 	lokiLabels := logger.labels[level]
 
 	bytes, err := message.MarshalJSON()
@@ -64,29 +84,29 @@ func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, l
 	}
 	entry := api.Entry{
 		Entry: logproto.Entry{
-			Timestamp: time,
+			Timestamp: ts,
 			Line:      string(bytes),
 		},
 
 		Labels: lokiLabels,
 	}
 
 	select {
-	case logger.c.Chan() <- entry:
+	case logger.bufferedChan <- entry:
 		lokiMessagesSent.Inc()
 	default:
 		lokiMessagesDropped.Inc()
 	}
 
-	// A fatal error exits, which can cause us to lose messages. Flush everything.
+	// A fatal error exits, which can cause us to lose messages. Shut down the worker so it will flush the logs.
 	if level == zapcore.FatalLevel {
-		logger.c.StopNow()
+		logger.stopWorkerWithTimeout()
 	}
 }
 
-func (logger *ExternalLoggerLoki) close() error {
-	logger.c.Stop()
-	return nil
+func (logger *ExternalLoggerLoki) close() {
+	// Shut down the worker and wait for it to exit. It has a timeout so we won't wait forever.
+	logger.stopWorkerWithTimeout()
 }
 
 // NewLokiCloudLogger creates a new Telemetry logger using Grafana Loki Cloud Logging.
@@ -142,15 +162,149 @@ func NewLokiCloudLogger(ctx context.Context, logger *zap.Logger, url string, pro
 		lokiLabels[level] = levLabels
 	}
 
+	// Create a buffered channel so the application does not block in the logger.
+	bufferedChan := make(chan api.Entry, bufferSize)
+
+	// Create a local context with a cancel function so we can signal our worker to shut down when the time comes.
+	// Cancelling the worker also closes the Loki client.
+	workerContext, cancelWorker := context.WithCancel(ctx)
+
+	// Create a channel used by the worker to signal that it has exited.
+	workerExitedC := make(chan struct{}, 1)
+
+	// Kick off the worker to read from the local buffered channel and write to the Loki unbuffered channel.
+	go logWriter(workerContext, localLogger, bufferedChan, workerExitedC, c)
+
 	return &Telemetry{
 		encoder: &guardianTelemetryEncoder{
 			Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
 			logger: &ExternalLoggerLoki{
-				c:           c,
-				labels:      lokiLabels,
-				localLogger: localLogger,
+				c:             c,
+				labels:        lokiLabels,
+				localLogger:   localLogger,
+				bufferedChan:  bufferedChan,
+				cancelWorker:  cancelWorker,
+				workerExitedC: workerExitedC,
 			},
 			skipPrivateLogs: skipPrivateLogs,
 		},
 	}, nil
 }
+
+// logWriter is the go routine that takes log messages off the buffered channel and posts them to the Loki client. It can block
+// on the Loki client until our context is canceled, meaning we are shutting down. On shutdown, it tries to flush buffered messages
+// and shut down the Loki client, using a timeout for both actions.
+func logWriter(ctx context.Context, logger *zap.Logger, localC chan api.Entry, workerExitedC chan struct{}, c client.Client) {
+	// pendingEntry is used to save the last log message if the write to Loki is interrupted by the context being canceled. We will attempt to flush it.
+	var pendingEntry *api.Entry
+
+	for {
+		select {
+		case entry, ok := <-localC:
+			if !ok {
+				logger.Error("Loki log writer is exiting because the buffered channel has been closed")
+				cleanUpWorker(logger, workerExitedC, c)
+				return
+			}
+
+			// Write to Loki in a blocking manner unless we are signaled to shut down.
+			select {
+			case c.Chan() <- entry:
+				pendingEntry = nil
+			case <-ctx.Done():
+				// Time to shut down. We probably failed to write this message, so save it so we can try to flush it.
+				pendingEntry = &entry
+			}
+		case <-ctx.Done():
+			logger.Info("Loki log writer shutting down")
+
+			// Flush as much as we can in our allowed time.
+			if numRemaining, err := flushLogsWithTimeout(localC, c, pendingEntry); err != nil {
+				logger.Error("worker failed to flush logs", zap.Error(err), zap.Int("numEventsRemaining", numRemaining))
+			}
+
+			cleanUpWorker(logger, workerExitedC, c)
+			return
+		}
+	}
+}
+
+// flushLogsWithTimeout is used to flush any buffered log messages on shutdown.
+// It uses a timeout so that we only delay guardian shutdown for so long.
+func flushLogsWithTimeout(localC chan api.Entry, c client.Client, pendingEntry *api.Entry) (int, error) {
+	// Create a timeout context. Base it on the background one since ours has been canceled.
+	// We are using a timeout context rather than `time.After` here because that is the maximum
+	// we want to wait, rather than a per-event timeout.
+	timeout, cancel := context.WithTimeout(context.Background(), clientTimeout)
+	defer cancel()
+
+	if pendingEntry != nil {
+		select {
+		case c.Chan() <- *pendingEntry:
+		case <-timeout.Done():
+			// If we time out, we didn't write the pending one, so count that as remaining.
+			return 1 + len(localC), errors.New("timeout writing pending entry")
+		}
+	}
+
+	for len(localC) > 0 {
+		select {
+		case entry := <-localC:
+			c.Chan() <- entry
+		case <-timeout.Done():
+			// If we time out, we didn't write the current one, so count that as remaining.
+			return 1 + len(localC), errors.New("timeout flushing buffered entry")
+		}
+	}
+
+	return 0, nil
+}
+
+// cleanUpWorker is called when the worker is shutting down. It closes the Loki client connection and signals that the worker has exited.
+func cleanUpWorker(logger *zap.Logger, workerExitedC chan struct{}, c client.Client) {
+	// Stop the client without blocking indefinitely.
+	if err := stopClientWithTimeout(c); err != nil {
+		logger.Error("worker failed to stop Loki client", zap.Error(err))
+	}
+
+	// Signal that we are done.
+	select {
+	case workerExitedC <- struct{}{}:
+		logger.Info("Loki log writer exiting")
+	default:
+		logger.Error("Loki log writer failed to write the exited flag, exiting anyway")
+	}
+}
+
+// stopClientWithTimeout calls the Loki client shutdown function using a timeout so that we only delay guardian shutdown for so long.
+func stopClientWithTimeout(c client.Client) error {
+	// Call the stop function in a go routine so we can use a timeout.
+	stopExitedC := make(chan struct{}, 1)
+	go func(c client.Client) {
+		c.StopNow()
+		stopExitedC <- struct{}{}
+	}(c)
+
+	// Wait for the go routine to exit or the timer to expire. Using `time.After` since this is a one-shot and we don't have the context.
+	select {
+	case <-stopExitedC:
+		return nil
+	case <-time.After(clientTimeout):
+		return errors.New("timeout")
+	}
+}
+
+// stopWorkerWithTimeout stops the log writer and waits for it to exit. It only waits a finite length of time.
+func (logger *ExternalLoggerLoki) stopWorkerWithTimeout() {
+	// Shut down the worker.
+	logger.cancelWorker()
+
+	// Wait for the worker to signal that it has exited. Use a timeout so we don't wait forever.
+	// It could take up to twice the client timeout for the worker to exit, so wait a little longer than that.
+	// Using `time.After` since this is a one-shot and we don't have the context.
+	select {
+	case <-logger.workerExitedC:
+	case <-time.After(3 * clientTimeout):
+		logger.localLogger.Error("log writer failed to exit, giving up")
+	}
+}
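The core pattern in log() above is the non-blocking send: a select with a default case succeeds immediately when the buffered channel has space and otherwise falls through, so a slow or stalled Loki endpoint can never stall the calling goroutine; at worst the drop counter is pegged and the message is lost from telemetry (it is still logged locally). A minimal runnable sketch of that producer side, with an illustrative entry type standing in for api.Entry:

```go
package main

import (
	"fmt"
	"time"
)

// entry is an illustrative stand-in for promtail's api.Entry.
type entry struct {
	ts   time.Time
	line string
}

func main() {
	// A small buffer, playing the role of bufferSize in the diff above.
	bufferedChan := make(chan entry, 4)

	// Producer side, mirroring the select in log(): the send succeeds only if
	// buffer space is available; otherwise it falls through to default and the
	// message is dropped instead of blocking the caller.
	for i := 0; i < 6; i++ {
		e := entry{ts: time.Now(), line: fmt.Sprintf("message %d", i)}
		select {
		case bufferedChan <- e:
			fmt.Println("queued:", e.line) // lokiMessagesSent.Inc() in the real code
		default:
			fmt.Println("dropped:", e.line) // lokiMessagesDropped.Inc() in the real code
		}
	}
}
```

With a buffer of four and no consumer running, the last two sends hit the default case, which is exactly the drop-rather-than-block behavior the metrics count.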

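On shutdown, logWriter hands whatever is still queued to flushLogsWithTimeout, which shares one clientTimeout budget across the whole flush; that is why it builds a context.WithTimeout rather than arming time.After per entry. Here is a self-contained sketch of that drain pattern, under stated simplifications: plain strings stand in for api.Entry, and unlike the code above, the inner send is also guarded by the deadline:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// drainWithDeadline writes everything still queued in localC to sink, but
// never past the deadline. It mirrors flushLogsWithTimeout in the diff above,
// with a plain chan string standing in for the Loki client's channel.
func drainWithDeadline(localC chan string, sink chan string, limit time.Duration) (int, error) {
	// One budget for the whole flush, not per entry: a single timeout context
	// keeps the total wait bounded even if every send blocks.
	timeout, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()

	// len() is safe as a loop condition here only because the producer has
	// already stopped; this is a best-effort flush during shutdown.
	for len(localC) > 0 {
		select {
		case entry := <-localC:
			select {
			case sink <- entry:
			case <-timeout.Done():
				return 1 + len(localC), errors.New("timeout flushing buffered entry")
			}
		case <-timeout.Done():
			return 1 + len(localC), errors.New("timeout flushing buffered entry")
		}
	}
	return 0, nil
}

func main() {
	localC := make(chan string, 8)
	for i := 0; i < 3; i++ {
		localC <- fmt.Sprintf("entry %d", i)
	}

	// A consumer with spare capacity, so the flush succeeds well within budget.
	sink := make(chan string, 8)
	remaining, err := drainWithDeadline(localC, sink, 250*time.Millisecond)
	fmt.Println(remaining, err) // 0 <nil>
}
```

The same bounded-wait idea appears twice more in the diff: stopClientWithTimeout races c.StopNow() in a goroutine against time.After(clientTimeout), and stopWorkerWithTimeout waits up to 3 * clientTimeout for the worker's exit signal.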
Diff for: node/pkg/telemetry/telemetry.go

+3 -3
@@ -20,7 +20,7 @@ type Telemetry struct {
 
 type ExternalLogger interface {
 	log(time time.Time, message json.RawMessage, level zapcore.Level)
-	close() error
+	close()
 }
 
 // guardianTelemetryEncoder is a wrapper around zapcore.jsonEncoder that logs to cloud based logging
@@ -86,6 +86,6 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger {
 	}))
 }
 
-func (s *Telemetry) Close() error {
-	return s.encoder.logger.close()
+func (s *Telemetry) Close() {
+	s.encoder.logger.close()
 }

Diff for: node/pkg/telemetry/telemetry_test.go

+1 -2
@@ -35,8 +35,7 @@ func (logger *externalLoggerMock) log(time time.Time, message json.RawMessage, l
 	}
 
 }
-func (logger *externalLoggerMock) close() error {
-	return nil
+func (logger *externalLoggerMock) close() {
 }
 
 func TestTelemetryWithPrivate(t *testing.T) {

Diff for: node/pkg/watchers/solana/client.go

+2 -3
@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"encoding/base64"
-	"encoding/hex"
 	"encoding/json"
 
 	"github.com/certusone/wormhole/node/pkg/common"
@@ -409,13 +408,13 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 				if err != nil {
 					logger.Error("failed to process observation request",
 						zap.Uint32("chainID", m.ChainId),
-						zap.String("txID", hex.EncodeToString(m.TxHash)),
+						zap.String("identifier", base58.Encode(m.TxHash)),
 						zap.Error(err),
 					)
 				} else {
 					logger.Info("reobserved transactions",
 						zap.Uint32("chainID", m.ChainId),
-						zap.String("txID", hex.EncodeToString(m.TxHash)),
+						zap.String("identifier", base58.Encode(m.TxHash)),
 						zap.Uint32("numObservations", numObservations),
 					)
 				}
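The watcher change swaps hex for base58 when logging the requested hash, and renames the field from txID to identifier. Solana tooling (explorers, RPC endpoints) renders signatures and account keys in base58, so the logged value can be pasted straight into an explorer, which the hex form could not. A small sketch of the difference; the diff does not show which base58 package client.go imports, so github.com/mr-tron/base58 is assumed here:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"github.com/mr-tron/base58"
)

func main() {
	// A made-up 64-byte value standing in for m.TxHash (a Solana transaction
	// signature, for example, is 64 raw bytes).
	txHash := make([]byte, 64)
	for i := range txHash {
		txHash[i] = byte(i)
	}

	// The old log field: hex, which Solana explorers do not accept as an identifier.
	fmt.Println("hex:   ", hex.EncodeToString(txHash))

	// The new log field: base58, the encoding Solana tooling uses, so the logged
	// identifier matches what explorers and RPC endpoints display.
	fmt.Println("base58:", base58.Encode(txHash))
}
```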
