Skip to content

Commit

Permalink
Change caught error string (#127)
Browse files Browse the repository at this point in the history
* add retries
  • Loading branch information
grantleehoffman authored Sep 27, 2021
1 parent 6b137dd commit 06c0ec0
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cli/cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var logsCmd = &cobra.Command{
ctx := context.Background()
if streamLogs {
// This is a _very_ simple approach to streaming.
cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName, loggingStartByte))
cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName))
} else {
resp, err := apiCl.GetLogs(ctx, workflowName)
if err != nil {
Expand Down
40 changes: 24 additions & 16 deletions cli/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,24 @@ func (c *Client) GetLogs(ctx context.Context, workflowName string) (responses.Ge
}

// StreamLogs streams the logs of a workflow starting after loggedBytes.
func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string, skippedLogBytes int64) error {
func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string) error {
var loggingCursorByte int64
// When we receive a stream error we continue and retry processing up to 5 times keeping track of the byte we were logging.
for i := 0; i < 5; i++ {
err := c.streamLogsToWriterAtCursor(ctx, w, workflowName, &loggingCursorByte)
if err != nil {
if strings.Contains(err.Error(), "stream error: stream ID 1; INTERNAL_ERROR") {
time.Sleep(time.Second * 10)
continue
}
return err
}
break
}
return nil
}

func (c *Client) streamLogsToWriterAtCursor(ctx context.Context, w io.Writer, workflowName string, loggingCursorByte *int64) error {
url := fmt.Sprintf("%s/workflows/%s/logstream", c.endpoint, workflowName)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
Expand All @@ -96,26 +113,17 @@ func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName strin
return fmt.Errorf("received unexpected status code: %d", resp.StatusCode)
}

// discard reader bytes already logged
if _, err := io.CopyN(ioutil.Discard, resp.Body, skippedLogBytes); err != nil {
// discard reader bytes till cursor byte number
if _, err := io.CopyN(ioutil.Discard, resp.Body, *loggingCursorByte); err != nil {
return err
}
skippedLogBytes, err = io.Copy(w, resp.Body)
writtenBytes, err := io.Copy(w, resp.Body)
if err != nil {
// retry call if we receive the stream error
if strings.Contains(err.Error(), "INTERNAL_ERROR") {
// temporary debug message
_, err := fmt.Fprintf(w, "internal error found while copying: '%v'\n", err.Error())
if err != nil {
return err
}
time.Sleep(time.Second * 10)
// Restart log streaming
return c.StreamLogs(ctx, w, workflowName, skippedLogBytes)
}
// retry call if we receive the stream error, increase the logging cursor byte by the amount we logged.
*loggingCursorByte += writtenBytes

return fmt.Errorf("error reading response body. status code: %d, error: %w", resp.StatusCode, err)
}

return nil
}

Expand Down
13 changes: 3 additions & 10 deletions cli/internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestGetLogs(t *testing.T) {
} else {
assert.Nil(t, err)
assert.Equal(t, output, tt.want)

}
})
}
Expand All @@ -134,8 +135,7 @@ func TestStreamLogs(t *testing.T) {
name string
apiRespBody []byte
apiRespStatusCode int
endpoint string // Used to create new request error.
loggedBytes int64
endpoint string // Used to create new request error.
mockHTTPClient *mockHTTPClient // Only used when needed.
writeBadContentLength bool // Used to create response body error.
want []byte
Expand All @@ -147,13 +147,6 @@ func TestStreamLogs(t *testing.T) {
apiRespStatusCode: http.StatusOK,
want: readFile(t, "stream_logs_good.txt"),
},
{
name: "Log after 8 bytes",
apiRespBody: readFile(t, "stream_logs_good.txt"),
apiRespStatusCode: http.StatusOK,
loggedBytes: 8,
want: readFile(t, "stream_logs_good.txt")[8:],
},
{
name: "error non-200 response",
apiRespBody: []byte("boom"),
Expand Down Expand Up @@ -215,7 +208,7 @@ func TestStreamLogs(t *testing.T) {
}

var b bytes.Buffer
err := client.StreamLogs(context.Background(), &b, "workflow1", tt.loggedBytes)
err := client.StreamLogs(context.Background(), &b, "workflow1")

if tt.wantErr != nil {
assert.EqualError(t, err, tt.wantErr.Error())
Expand Down

0 comments on commit 06c0ec0

Please sign in to comment.