diff --git a/api/event_stream.go b/api/event_stream.go index f2a1bc79002..49a666709b1 100644 --- a/api/event_stream.go +++ b/api/event_stream.go @@ -167,6 +167,8 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel. +// +// Events stop being emitted once the Events.Err field is non-nil. func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { @@ -220,6 +222,11 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind return case eventsCh <- &events: } + + // There are no recoverable Decode errors, so return on error. + if events.Err != nil { + return + } } }() diff --git a/api/event_stream_test.go b/api/event_stream_test.go index 0e701b8fc52..b4455fa8309 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -6,6 +6,7 @@ package api import ( "context" "encoding/json" + "strings" "testing" "time" @@ -118,6 +119,42 @@ func TestEvent_Stream(t *testing.T) { case <-time.After(5 * time.Second): must.Unreachable(t, must.Sprint("failed waiting for event stream event")) } + + // Stop the server to ensure EOF is returned + s.Stop() + + for { + select { + case event, ok := <-streamCh: + if !ok { + must.Unreachable(t, must.Sprintf("chan closed before EOF received")) + } + + // Sadly decode doesn't return io.EOF + if event.Err != nil && strings.HasSuffix(event.Err.Error(), "EOF") { + // Succcess! Make sure chan gets closed + select { + case _, ok := <-streamCh: + if ok { + must.Unreachable(t, must.Sprintf("expected chan to close after EOF")) + } + + // Success! + return + case <-time.After(5 * time.Second): + must.Unreachable(t, must.Sprint("failed waiting for event stream to close")) + } + } + + if event.Err != nil { + must.Unreachable(t, must.Sprintf("unexpected %v (%T)", event.Err, event.Err)) + } + must.Len(t, 1, event.Events) + must.Eq(t, "Evaluation", string(event.Events[0].Topic)) + case <-time.After(5 * time.Second): + must.Unreachable(t, must.Sprint("failed waiting for event stream EOF")) + } + } } func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) { diff --git a/api/internal/testutil/server.go b/api/internal/testutil/server.go index 59799326df1..fb16ccd022a 100644 --- a/api/internal/testutil/server.go +++ b/api/internal/testutil/server.go @@ -143,6 +143,7 @@ type TestServer struct { cmd *exec.Cmd Config *TestServerConfig t testing.TB + exited bool HTTPAddr string SerfAddr string @@ -240,6 +241,12 @@ func NewTestServer(t testing.TB, cb ServerConfigCallback) *TestServer { // Stop stops the test Nomad server, and removes the Nomad data // directory once we are done. func (s *TestServer) Stop() { + if s.exited { + // Allow calling multiple times to allow for tests that use defer s.Stop() + // as well as stop the server during the test to assert behavior. + return + } + s.t.Cleanup(func() { _ = os.RemoveAll(s.Config.DataDir) }) @@ -258,6 +265,7 @@ func (s *TestServer) Stop() { select { case <-done: + s.exited = true return case <-time.After(5 * time.Second): s.t.Logf("timed out waiting for process to gracefully terminate") @@ -268,6 +276,7 @@ func (s *TestServer) Stop() { select { case <-done: + s.exited = true case <-time.After(5 * time.Second): s.t.Logf("timed out waiting for process to be killed") }