Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions output/webhook/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

package webhook

import "github.com/blinklabs-io/adder/plugin"
import (
"time"

// import "github.com/blinklabs-io/adder/event"
"github.com/blinklabs-io/adder/plugin"
)

type WebhookOptionFunc func(*WebhookOutput)

Expand Down Expand Up @@ -49,3 +51,18 @@ func WithFormat(format string) WebhookOptionFunc {
o.format = format
}
}

// WithRetryConfig specifies the retry configuration for webhook delivery
func WithRetryConfig(maxRetries int, initialBackoff, maxBackoff time.Duration) WebhookOptionFunc {
return func(o *WebhookOutput) {
if maxRetries >= 0 {
o.maxRetries = maxRetries
}
if initialBackoff > 0 {
o.initialBackoff = initialBackoff
}
if maxBackoff > 0 {
o.maxBackoff = maxBackoff
}
}
}
114 changes: 96 additions & 18 deletions output/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,40 @@ const (
mainnetNetworkMagic uint32 = 764824073
previewNetworkMagic uint32 = 2
preprodNetworkMagic uint32 = 1

// Default retry configuration
defaultMaxRetries = 3
defaultInitialBackoff = 1 * time.Second
defaultMaxBackoff = 30 * time.Second
defaultBackoffFactor = 2.0
)

type WebhookOutput struct {
errorChan chan error
eventChan chan event.Event
doneChan chan struct{}
wg sync.WaitGroup
logger plugin.Logger
format string
url string
username string
password string
skipVerify bool
errorChan chan error
eventChan chan event.Event
doneChan chan struct{}
wg sync.WaitGroup
logger plugin.Logger
format string
url string
username string
password string
skipVerify bool
maxRetries int
initialBackoff time.Duration
maxBackoff time.Duration
backoffFactor float64
}

func New(options ...WebhookOptionFunc) *WebhookOutput {
w := &WebhookOutput{
format: "adder",
url: "http://localhost:3000",
skipVerify: false,
format: "adder",
url: "http://localhost:3000",
skipVerify: false,
maxRetries: defaultMaxRetries,
initialBackoff: defaultInitialBackoff,
maxBackoff: defaultMaxBackoff,
backoffFactor: defaultBackoffFactor,
}
for _, option := range options {
option(w)
Expand Down Expand Up @@ -112,11 +126,8 @@ func (w *WebhookOutput) Start() error {
logger.Error("unknown event type: " + evt.Type)
return
}
// TODO: error handle (#334)
err := w.SendWebhook(&evt)
if err != nil {
logger.Error(fmt.Sprintf("ERROR: %s", err))
}
// Send webhook with retry logic and exponential backoff
w.sendWebhookWithRetry(&evt)
}
}
}()
Expand Down Expand Up @@ -315,6 +326,73 @@ func (w *WebhookOutput) SendWebhook(e *event.Event) error {
return nil
}

// sendWebhookWithRetry wraps SendWebhook with retry logic and exponential backoff
func (w *WebhookOutput) sendWebhookWithRetry(e *event.Event) {
logger := logging.GetLogger()
var lastErr error
backoff := w.initialBackoff

for attempt := 0; attempt <= w.maxRetries; attempt++ {
if attempt > 0 {
logger.Warn(
fmt.Sprintf(
"webhook delivery failed, retrying (attempt %d/%d) after %v",
attempt,
w.maxRetries,
backoff,
),
"url", w.url,
"event_type", e.Type,
"error", lastErr,
)
time.Sleep(backoff)

// Calculate next backoff with exponential increase
backoff = time.Duration(float64(backoff) * w.backoffFactor)
if backoff > w.maxBackoff {
backoff = w.maxBackoff
}
}

err := w.SendWebhook(e)
if err == nil {
if attempt > 0 {
logger.Info(
fmt.Sprintf("webhook delivery succeeded after %d retries", attempt),
"url", w.url,
"event_type", e.Type,
)
}
return
}
lastErr = err
}

// All retries exhausted
logger.Error(
fmt.Sprintf(
"webhook delivery failed after %d retries, giving up",
w.maxRetries,
),
"url", w.url,
"event_type", e.Type,
"error", lastErr,
)

// Send error to error channel for monitoring (non-blocking)
select {
case w.errorChan <- fmt.Errorf(
"webhook delivery to %s failed after %d retries: %w",
w.url,
w.maxRetries,
lastErr,
):
default:
// Error channel is full or closed, just log
logger.Warn("could not send error to error channel (full or closed)")
}
}

// Stop the webhook output
func (w *WebhookOutput) Stop() error {
if w.doneChan != nil {
Expand Down