Skip to content

Commit 111475b

Browse files
committed
Retry fetch grant token
1 parent 06c148d commit 111475b

File tree

5 files changed

+46
-25
lines changed

5 files changed

+46
-25
lines changed

internal/auth/handshake.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"crypto/rand"
55
"encoding/hex"
66
"fmt"
7-
"task-runner-launcher/internal/logs"
87
"net/url"
98
"strings"
9+
"task-runner-launcher/internal/logs"
1010

1111
"github.com/gorilla/websocket"
1212
)
@@ -201,6 +201,7 @@ func Handshake(cfg HandshakeConfig) error {
201201
case err := <-errReceived:
202202
return err
203203
case <-handshakeComplete:
204+
logs.Info("Runner's task offer was accepted")
204205
return nil
205206
}
206207
}

internal/auth/token_exchange.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"net/http"
8+
launcherHttp "task-runner-launcher/internal/http"
89
)
910

1011
type grantTokenResponse struct {
@@ -13,10 +14,7 @@ type grantTokenResponse struct {
1314
} `json:"data"`
1415
}
1516

16-
// FetchGrantToken exchanges the launcher's auth token for a less privileged
17-
// grant token returned by the n8n main instance. The launcher will later pass
18-
// this grant token to the task runner.
19-
func FetchGrantToken(n8nURI, authToken string) (string, error) {
17+
func fetchGrantToken(n8nURI, authToken string) (string, error) {
2018
url := fmt.Sprintf("http://%s/runners/auth", n8nURI)
2119

2220
payload := map[string]string{"token": authToken}
@@ -44,8 +42,29 @@ func FetchGrantToken(n8nURI, authToken string) (string, error) {
4442

4543
var tokenResp grantTokenResponse
4644
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
47-
return "", err
45+
return "", fmt.Errorf("failed to decode grant token response: %w", err)
4846
}
4947

5048
return tokenResp.Data.Token, nil
5149
}
50+
51+
// FetchGrantTokenWithRetry exchanges the launcher's auth token for a less
52+
// privileged grant token returned by the n8n main instance. This exchange is
53+
// retried in case the n8n main instance is temporarily unavailable.
54+
func FetchGrantTokenWithRetry(n8nURI, authToken string) (string, error) {
55+
grantTokenFetch := func() (string, error) {
56+
token, err := fetchGrantToken(n8nURI, authToken)
57+
if err != nil {
58+
return "", fmt.Errorf("failed to fetch grant token: %w", err)
59+
}
60+
return token, nil
61+
}
62+
63+
token, err := launcherHttp.LimitedRetry("grant-token-fetch", grantTokenFetch)
64+
65+
if err != nil {
66+
return "", fmt.Errorf("exhausted retries to fetch grant token: %w", err)
67+
}
68+
69+
return token, nil
70+
}

internal/commands/launch.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ func (l *LaunchCommand) Execute() error {
9292

9393
// 4. wait for n8n instance to be ready
9494

95-
if err := http.WaitForN8n(n8nMainServerURI); err != nil {
95+
if err := http.WaitForN8nReadiness(n8nMainServerURI); err != nil {
9696
return fmt.Errorf("exhausted retries while waiting for n8n instance to be ready: %w", err)
9797
}
9898

9999
for {
100100
// 5. fetch grant token for launcher
101101

102-
launcherGrantToken, err := auth.FetchGrantToken(n8nRunnersServerURI, token)
102+
launcherGrantToken, err := auth.FetchGrantTokenWithRetry(n8nRunnersServerURI, token)
103103
if err != nil {
104104
return fmt.Errorf("failed to fetch grant token for launcher: %w", err)
105105
}
@@ -120,7 +120,7 @@ func (l *LaunchCommand) Execute() error {
120120

121121
// 7. fetch grant token for runner
122122

123-
runnerGrantToken, err := auth.FetchGrantToken(n8nRunnersServerURI, token)
123+
runnerGrantToken, err := auth.FetchGrantTokenWithRetry(n8nRunnersServerURI, token)
124124
if err != nil {
125125
return fmt.Errorf("failed to fetch grant token for runner: %w", err)
126126
}

internal/http/retry.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ type retryConfig struct {
2525
WaitTimeBetweenRetries time.Duration
2626
}
2727

28-
func retry(operationName string, operationFn func() error, cfg retryConfig) error {
28+
func retry[T any](operationName string, operationFn func() (T, error), cfg retryConfig) (T, error) {
2929
var lastErr error
30+
var zero T
3031
startTime := time.Now()
3132
attempt := 1
3233

3334
for {
3435
if cfg.MaxRetryTime > 0 && time.Since(startTime) > cfg.MaxRetryTime {
35-
return fmt.Errorf(
36+
return zero, fmt.Errorf(
3637
"gave up retrying operation `%s` on reaching max retry time %v, last error: %w",
3738
operationName,
3839
cfg.MaxRetryTime,
@@ -41,17 +42,17 @@ func retry(operationName string, operationFn func() error, cfg retryConfig) erro
4142
}
4243

4344
if cfg.MaxAttempts > 0 && attempt > cfg.MaxAttempts {
44-
return fmt.Errorf(
45+
return zero, fmt.Errorf(
4546
"gave up retrying operation `%s` on reaching max retry attempts %d, last error: %w",
4647
operationName,
4748
cfg.MaxAttempts,
4849
lastErr,
4950
)
5051
}
5152

52-
err := operationFn()
53+
str, err := operationFn()
5354
if err == nil {
54-
return nil
55+
return str, nil
5556
}
5657

5758
lastErr = err
@@ -63,7 +64,7 @@ func retry(operationName string, operationFn func() error, cfg retryConfig) erro
6364
}
6465

6566
// UnlimitedRetry retries an operation forever.
66-
func UnlimitedRetry(operationName string, operation func() error) error {
67+
func UnlimitedRetry[T any](operationName string, operation func() (T, error)) (T, error) {
6768
return retry(operationName, operation, retryConfig{
6869
MaxRetryTime: 0,
6970
MaxAttempts: 0,
@@ -72,7 +73,7 @@ func UnlimitedRetry(operationName string, operation func() error) error {
7273
}
7374

7475
// LimitedRetry retries an operation until max retry time or until max attempts.
75-
func LimitedRetry(operationName string, operation func() error) error {
76+
func LimitedRetry[T any](operationName string, operation func() (T, error)) (T, error) {
7677
return retry(operationName, operation, retryConfig{
7778
MaxRetryTime: defaultMaxRetryTime,
7879
MaxAttempts: defaultMaxRetries,

internal/http/wait_for_n8n.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,27 @@ func sendReadinessRequest(n8nMainServerURI string) (*http.Response, error) {
2828
return client.Do(req)
2929
}
3030

31-
// WaitForN8n retries indefinitely until the n8n main instance is ready, i.e.
32-
// until its database is connected and migrated. In case of long-running
33-
// migrations, n8n instance readiness may take a long time.
34-
func WaitForN8n(n8nMainServerURI string) error {
31+
// WaitForN8nReadiness checks forever until the n8n main instance is ready, i.e.
32+
// until its DB is connected and migrated. In case of long-running migrations,
33+
// readiness may take a long time. Returns nil when ready.
34+
func WaitForN8nReadiness(n8nMainServerURI string) error {
3535
logs.Info("Waiting for n8n to be ready...")
3636

37-
readinessCheck := func() error {
37+
readinessCheck := func() (string, error) {
3838
resp, err := sendReadinessRequest(n8nMainServerURI)
3939
if err != nil {
40-
return fmt.Errorf("failed to send readiness check request to n8n main server: %w", err)
40+
return "", fmt.Errorf("n8n readiness check failed with error: %w", err)
4141
}
4242
defer resp.Body.Close()
4343

4444
if resp.StatusCode != http.StatusOK {
45-
return fmt.Errorf("readiness check failed with status code: %d", resp.StatusCode)
45+
return "", fmt.Errorf("readiness check failed with status code: %d", resp.StatusCode)
4646
}
4747

48-
return nil
48+
return "", nil
4949
}
5050

51-
if err := UnlimitedRetry("readiness-check", readinessCheck); err != nil {
51+
if _, err := UnlimitedRetry("readiness-check", readinessCheck); err != nil {
5252
return fmt.Errorf("encountered error while waiting for n8n to be ready: %w", err)
5353
}
5454

0 commit comments

Comments
 (0)