diff --git a/_examples/flush-with-context/main.go b/_examples/flush-with-context/main.go new file mode 100644 index 000000000..3f6d3c728 --- /dev/null +++ b/_examples/flush-with-context/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/getsentry/sentry-go" +) + +func main() { + _ = sentry.Init(sentry.ClientOptions{ + Dsn: "https://hello@example.com/1337", + Debug: true, + }) + + sentry.CaptureMessage("Event #1") + sentry.CaptureMessage("Event #2") + sentry.CaptureMessage("Event #3") + + go func() { + sentry.CaptureMessage("Event #4") + sentry.CaptureMessage("Event #5") + }() + + fmt.Println("=> Flushing transport buffer") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + + if sentry.FlushWithContext(ctx) { + fmt.Println("=> All queued events delivered!") + } else { + fmt.Println("=> Flush timeout reached") + } +} diff --git a/client.go b/client.go index 5b6fc1b1b..ea29096b6 100644 --- a/client.go +++ b/client.go @@ -516,6 +516,25 @@ func (client *Client) Flush(timeout time.Duration) bool { return client.Transport.Flush(timeout) } +// FlushWithContext waits until the underlying Transport sends any buffered events +// to the Sentry server, blocking for at most the duration specified by the context. +// It returns false if the context is canceled before the events are sent. In such a case, +// some events may not be delivered. +// +// FlushWithContext should be called before terminating the program to ensure no +// events are unintentionally dropped. +// +// Avoid calling FlushWithContext indiscriminately after each call to CaptureEvent, +// CaptureException, or CaptureMessage. To send events synchronously over the network, +// configure the SDK to use HTTPSyncTransport during initialization with Init. + +func (client *Client) FlushWithContext(ctx context.Context) bool { + if client.batchLogger != nil { + client.batchLogger.Flush() + } + return client.Transport.FlushWithContext(ctx) +} + // Close clean up underlying Transport resources. // // Close should be called after Flush and before terminating the program diff --git a/hub.go b/hub.go index 1b36c2f86..8aea27377 100644 --- a/hub.go +++ b/hub.go @@ -365,6 +365,28 @@ func (hub *Hub) Flush(timeout time.Duration) bool { return client.Flush(timeout) } +// FlushWithContext waits until the underlying Transport sends any buffered events +// to the Sentry server, blocking for at most the duration specified by the context. +// It returns false if the context is canceled before the events are sent. In such a case, +// some events may not be delivered. +// +// FlushWithContext should be called before terminating the program to ensure no +// events are unintentionally dropped. +// +// Avoid calling FlushWithContext indiscriminately after each call to CaptureEvent, +// CaptureException, or CaptureMessage. To send events synchronously over the network, +// configure the SDK to use HTTPSyncTransport during initialization with Init. + +func (hub *Hub) FlushWithContext(ctx context.Context) bool { + client := hub.Client() + + if client == nil { + return false + } + + return client.FlushWithContext(ctx) +} + // GetTraceparent returns the current Sentry traceparent string, to be used as a HTTP header value // or HTML meta tag value. // This function is context aware, as in it either returns the traceparent based diff --git a/hub_test.go b/hub_test.go index 7aade5593..184062179 100644 --- a/hub_test.go +++ b/hub_test.go @@ -7,6 +7,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -493,3 +494,62 @@ func TestConcurrentHubClone(t *testing.T) { t.Errorf("Events mismatch (-want +got):\n%s", diff) } } + +func TestHub_Flush(t *testing.T) { + hub, client, _ := setupHubTest() + transport := &MockTransport{} + client.Transport = transport + + wantEvent := Event{Message: "something"} + hub.CaptureEvent(&wantEvent) + hub.Flush(20 * time.Millisecond) + + gotEvents := transport.Events() + if len(gotEvents) != 1 { + t.Fatalf("expected 1 event, got %d", len(gotEvents)) + } + if gotEvents[0].Message != wantEvent.Message { + t.Fatalf("expected message to be %v, got %v", wantEvent.Message, gotEvents[0].Message) + } +} + +func TestHub_Flush_NoClient(t *testing.T) { + hub := NewHub(nil, nil) + flushed := hub.Flush(20 * time.Millisecond) + + if flushed != false { + t.Fatalf("expected flush to be false, got %v", flushed) + } +} + +func TestHub_FlushWithCtx_NoClient(t *testing.T) { + hub := NewHub(nil, nil) + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + flushed := hub.FlushWithContext(cancelCtx) + + if flushed != false { + t.Fatalf("expected flush to be false, got %v", flushed) + } +} + +func TestHub_FlushWithContext(t *testing.T) { + hub, client, _ := setupHubTest() + transport := &MockTransport{} + client.Transport = transport + + wantEvent := Event{Message: "something"} + hub.CaptureEvent(&wantEvent) + + cancelCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + hub.FlushWithContext(cancelCtx) + defer cancel() + + gotEvents := transport.Events() + if len(gotEvents) != 1 { + t.Fatalf("expected 1 event, got %d", len(gotEvents)) + } + if gotEvents[0].Message != wantEvent.Message { + t.Fatalf("expected message to be %v, got %v", wantEvent.Message, gotEvents[0].Message) + } +} diff --git a/log_test.go b/log_test.go index 58852a026..f21154c4f 100644 --- a/log_test.go +++ b/log_test.go @@ -455,6 +455,21 @@ func Test_batchLogger_Flush(t *testing.T) { } } +func Test_batchLogger_FlushWithContext(t *testing.T) { + ctx, mockTransport := setupMockTransport() + l := NewLogger(context.Background()) + l.Info(ctx, "context done log") + + cancelCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + FlushWithContext(cancelCtx) + defer cancel() + + events := mockTransport.Events() + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } +} + func Test_sentryLogger_BeforeSendLog(t *testing.T) { ctx := context.Background() mockTransport := &MockTransport{} diff --git a/logrus/README.md b/logrus/README.md index cbb16a573..1797a3920 100644 --- a/logrus/README.md +++ b/logrus/README.md @@ -102,5 +102,5 @@ This ensures that logs from specific contexts or threads use the appropriate Sen ## Notes -- Always call Flush to ensure all events are sent to Sentry before program termination +- Always call `Flush` or `FlushWithContext` to ensure all events are sent to Sentry before program termination diff --git a/logrus/logrusentry.go b/logrus/logrusentry.go index 1ac9da443..18371105b 100644 --- a/logrus/logrusentry.go +++ b/logrus/logrusentry.go @@ -2,6 +2,7 @@ package sentrylogrus import ( + "context" "errors" "net/http" "time" @@ -222,3 +223,11 @@ func (h *Hook) entryToEvent(l *logrus.Entry) *sentry.Event { func (h *Hook) Flush(timeout time.Duration) bool { return h.hubProvider().Client().Flush(timeout) } + +// FlushWithContext waits for the underlying Sentry transport to send any buffered +// events, blocking until the context's deadline is reached or the context is canceled. +// It returns false if the context is canceled or its deadline expires before the events +// are sent, meaning some events may not have been sent. +func (h *Hook) FlushWithContext(ctx context.Context) bool { + return h.hubProvider().Client().FlushWithContext(ctx) +} diff --git a/mocks.go b/mocks.go index e6345abab..92bd84d9c 100644 --- a/mocks.go +++ b/mocks.go @@ -1,6 +1,7 @@ package sentry import ( + "context" "sync" "time" ) @@ -39,6 +40,7 @@ func (t *MockTransport) SendEvent(event *Event) { func (t *MockTransport) Flush(_ time.Duration) bool { return true } +func (t *MockTransport) FlushWithContext(_ context.Context) bool { return true } func (t *MockTransport) Events() []*Event { t.mu.Lock() defer t.mu.Unlock() diff --git a/sentry.go b/sentry.go index 0ce41543e..3041c547f 100644 --- a/sentry.go +++ b/sentry.go @@ -125,6 +125,23 @@ func Flush(timeout time.Duration) bool { return hub.Flush(timeout) } +// FlushWithContext waits until the underlying Transport sends any buffered events +// to the Sentry server, blocking for at most the duration specified by the context. +// It returns false if the context is canceled before the events are sent. In such a case, +// some events may not be delivered. +// +// FlushWithContext should be called before terminating the program to ensure no +// events are unintentionally dropped. +// +// Avoid calling FlushWithContext indiscriminately after each call to CaptureEvent, +// CaptureException, or CaptureMessage. To send events synchronously over the network, +// configure the SDK to use HTTPSyncTransport during initialization with Init. + +func FlushWithContext(ctx context.Context) bool { + hub := CurrentHub() + return hub.FlushWithContext(ctx) +} + // LastEventID returns an ID of last captured event. func LastEventID() EventID { hub := CurrentHub() diff --git a/slog/README.MD b/slog/README.MD index 494fda8ad..a6a44c078 100644 --- a/slog/README.MD +++ b/slog/README.MD @@ -90,4 +90,4 @@ handler := slogSentry.Option{ ## Notes -- Always call Flush to ensure all events are sent to Sentry before program termination +- Always call `Flush` or `FlushWithContext` to ensure all events are sent to Sentry before program termination diff --git a/transport.go b/transport.go index 28f7713a4..e2ec87abf 100644 --- a/transport.go +++ b/transport.go @@ -35,6 +35,7 @@ const maxDrainResponseBytes = 16 << 10 // Transport is used by the Client to deliver events to remote server. type Transport interface { Flush(timeout time.Duration) bool + FlushWithContext(ctx context.Context) bool Configure(options ClientOptions) SendEvent(event *Event) Close() @@ -439,8 +440,19 @@ func (t *HTTPTransport) SendEventWithContext(ctx context.Context, event *Event) // have the SDK send events over the network synchronously, configure it to use // the HTTPSyncTransport in the call to Init. func (t *HTTPTransport) Flush(timeout time.Duration) bool { - toolate := time.After(timeout) + timeoutCh := make(chan struct{}) + time.AfterFunc(timeout, func() { + close(timeoutCh) + }) + return t.flushInternal(timeoutCh) +} + +// FlushWithContext works like Flush, but it accepts a context.Context instead of a timeout. +func (t *HTTPTransport) FlushWithContext(ctx context.Context) bool { + return t.flushInternal(ctx.Done()) +} +func (t *HTTPTransport) flushInternal(timeout <-chan struct{}) bool { // Wait until processing the current batch has started or the timeout. // // We must wait until the worker has seen the current batch, because it is @@ -448,6 +460,7 @@ func (t *HTTPTransport) Flush(timeout time.Duration) bool { // possible execution flow in which b.done is never closed, and the only way // out of Flush would be waiting for the timeout, which is undesired. var b batch + for { select { case b = <-t.buffer: @@ -457,7 +470,7 @@ func (t *HTTPTransport) Flush(timeout time.Duration) bool { default: t.buffer <- b } - case <-toolate: + case <-timeout: goto fail } } @@ -478,12 +491,12 @@ started: case <-b.done: DebugLogger.Println("Buffer flushed successfully.") return true - case <-toolate: + case <-timeout: goto fail } fail: - DebugLogger.Println("Buffer flushing reached the timeout.") + DebugLogger.Println("Buffer flushing was canceled or timed out.") return false } @@ -697,6 +710,11 @@ func (t *HTTPSyncTransport) Flush(_ time.Duration) bool { return true } +// FlushWithContext is a no-op for HTTPSyncTransport. It always returns true immediately. +func (t *HTTPSyncTransport) FlushWithContext(_ context.Context) bool { + return true +} + func (t *HTTPSyncTransport) disabled(c ratelimit.Category) bool { t.mu.Lock() defer t.mu.Unlock() @@ -729,4 +747,8 @@ func (noopTransport) Flush(time.Duration) bool { return true } +func (noopTransport) FlushWithContext(context.Context) bool { + return true +} + func (noopTransport) Close() {} diff --git a/transport_test.go b/transport_test.go index 5204dc657..a1e937c26 100644 --- a/transport_test.go +++ b/transport_test.go @@ -491,6 +491,98 @@ func TestHTTPTransport(t *testing.T) { }) } +func TestHTTPTransport_FlushWithContext(t *testing.T) { + server := newTestHTTPServer(t) + defer server.Close() + + transport := NewHTTPTransport() + transport.Configure(ClientOptions{ + Dsn: fmt.Sprintf("https://test@%s/1", server.Listener.Addr()), + HTTPClient: server.Client(), + }) + + // Helpers + + transportSendTestEvent := func(t *testing.T) (id string) { + t.Helper() + + e := NewEvent() + id = uuid() + e.EventID = EventID(id) + + transport.SendEvent(e) + + t.Logf("[CLIENT] {%.4s} event sent", e.EventID) + return id + } + + transportMustFlushWithCtx := func(t *testing.T, id string) { + t.Helper() + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + ok := transport.FlushWithContext(cancelCtx) + if !ok { + t.Fatalf("[CLIENT] {%.4s} Flush() timed out", id) + } + } + + serverEventCountMustBe := func(t *testing.T, n uint64) { + t.Helper() + + count := server.EventCount() + if count != n { + t.Fatalf("[SERVER] event count = %d, want %d", count, n) + } + } + + testSendSingleEvent := func(t *testing.T) { + // Sending a single event should increase the server event count by + // exactly one. + + initialCount := server.EventCount() + id := transportSendTestEvent(t) + + // Server is blocked waiting for us, right now count must not have + // changed yet. + serverEventCountMustBe(t, initialCount) + + // After unblocking the server, Flush must guarantee that the server + // event count increased by one. + server.Unblock() + transportMustFlushWithCtx(t, id) + serverEventCountMustBe(t, initialCount+1) + } + + // Actual tests + t.Run("SendSingleEvent", testSendSingleEvent) + + t.Run("FlushMultipleTimes", func(t *testing.T) { + // Flushing multiple times should not increase the server event count. + + initialCount := server.EventCount() + for i := 0; i < 10; i++ { + transportMustFlushWithCtx(t, fmt.Sprintf("loop%d", i)) + } + serverEventCountMustBe(t, initialCount) + }) + + t.Run("ConcurrentSendAndFlush", func(t *testing.T) { + // It should be safe to send events and flush concurrently. + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + testSendSingleEvent(t) + }() + go func() { + defer wg.Done() + transportMustFlushWithCtx(t, "from goroutine") + }() + wg.Wait() + }) +} + // httptraceRoundTripper implements http.RoundTripper by wrapping // http.DefaultTransport and keeps track of whether TCP connections have been // reused for every request. @@ -707,3 +799,23 @@ func TestHTTPSyncTransportClose(_ *testing.T) { tr := noopTransport{} tr.Close() } + +func TestHTTPSyncTransport_Flush(_ *testing.T) { + // Flush does not do anything for HTTPSyncTransport, added for coverage. + transport := HTTPSyncTransport{} + transport.Flush(testutils.FlushTimeout()) + + tr := noopTransport{} + tr.Flush(testutils.FlushTimeout()) +} + +func TestHTTPSyncTransport_FlushWithContext(_ *testing.T) { + // FlushWithContext does not do anything for HTTPSyncTransport, added for coverage. + transport := HTTPSyncTransport{} + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + transport.FlushWithContext(cancelCtx) + + tr := noopTransport{} + tr.FlushWithContext(cancelCtx) +} diff --git a/zerolog/README.md b/zerolog/README.md index 18e4b0d08..e27fcbfc0 100644 --- a/zerolog/README.md +++ b/zerolog/README.md @@ -85,4 +85,4 @@ The `sentryzerolog.Options` struct allows you to configure the following: ## Notes -- Always call Flush to ensure all events are sent to Sentry before program termination \ No newline at end of file +- Always call `Flush` or `FlushWithContext` to ensure all events are sent to Sentry before program termination \ No newline at end of file