Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (c *Client) EventStream() *EventStream {

// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
//
// Events stop being emitted once the Events.Err field is non-nil.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {
r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
Expand Down Expand Up @@ -220,6 +222,11 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
return
case eventsCh <- &events:
}

// There are no recoverable Decode errors, so return on error.
if events.Err != nil {
return
}
}
}()

Expand Down
37 changes: 37 additions & 0 deletions api/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"context"
"encoding/json"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -118,6 +119,42 @@ func TestEvent_Stream(t *testing.T) {
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream event"))
}

// Stop the server to ensure EOF is returned
s.Stop()

for {
select {
case event, ok := <-streamCh:
if !ok {
must.Unreachable(t, must.Sprintf("chan closed before EOF received"))
}

// Sadly decode doesn't return io.EOF
if event.Err != nil && strings.HasSuffix(event.Err.Error(), "EOF") {
// Succcess! Make sure chan gets closed
select {
case _, ok := <-streamCh:
if ok {
must.Unreachable(t, must.Sprintf("expected chan to close after EOF"))
}

// Success!
return
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream to close"))
}
}

if event.Err != nil {
must.Unreachable(t, must.Sprintf("unexpected %v (%T)", event.Err, event.Err))
}
must.Len(t, 1, event.Events)
must.Eq(t, "Evaluation", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream EOF"))
}
}
}

func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions api/internal/testutil/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type TestServer struct {
cmd *exec.Cmd
Config *TestServerConfig
t testing.TB
exited bool

HTTPAddr string
SerfAddr string
Expand Down Expand Up @@ -240,6 +241,12 @@ func NewTestServer(t testing.TB, cb ServerConfigCallback) *TestServer {
// Stop stops the test Nomad server, and removes the Nomad data
// directory once we are done.
func (s *TestServer) Stop() {
if s.exited {
// Allow calling multiple times to allow for tests that use defer s.Stop()
// as well as stop the server during the test to assert behavior.
return
}

s.t.Cleanup(func() {
_ = os.RemoveAll(s.Config.DataDir)
})
Expand All @@ -258,6 +265,7 @@ func (s *TestServer) Stop() {

select {
case <-done:
s.exited = true
return
case <-time.After(5 * time.Second):
s.t.Logf("timed out waiting for process to gracefully terminate")
Expand All @@ -268,6 +276,7 @@ func (s *TestServer) Stop() {

select {
case <-done:
s.exited = true
case <-time.After(5 * time.Second):
s.t.Logf("timed out waiting for process to be killed")
}
Expand Down
Loading