Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/durableemitter/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,16 @@ func (d *DurableEmitter) deliveryCallback(id int64, eventPb *chipingress.CloudEv
if d.metrics != nil {
d.metrics.publishBatchEvErr.Add(cbCtx, 1)
}
// Batch path failed. If a fallback client is configured, retry the
// single event directly; otherwise leave in DB for retransmit.
// Permanent failures (e.g. a missing schema) will never succeed on
// retry. Drop the event from persistence instead of falling back or
// retransmitting it forever.
if reason, ok := nonRetryablePublishError(sendErr); ok {
d.dropNonRetryable(id, eventPb, reason)
return
}
// Batch path failed with a retryable error. If a fallback client is
// configured, retry the single event directly; otherwise leave in DB
// for retransmit.
d.tryFallback(id, eventPb)
return
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/durableemitter/durable_emitter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ type durableEmitterMetrics struct {
publishBatchErr metric.Int64Counter
publishBatchEvOK metric.Int64Counter
publishBatchEvErr metric.Int64Counter
deliverComplete metric.Int64Counter
// nonRetryableDropped counts events removed from persistence after a
// permanent (non-retryable) publish failure such as a missing schema.
nonRetryableDropped metric.Int64Counter
deliverComplete metric.Int64Counter
expiredPurged metric.Int64Counter
storeOps metric.Int64Counter
storeOpDuration metric.Float64Histogram
Expand Down Expand Up @@ -162,6 +165,13 @@ func newDurableEmitterMetrics(meter metric.Meter) (*durableEmitterMetrics, error
); err != nil {
return nil, err
}
if m.nonRetryableDropped, err = meter.Int64Counter(
"durable_emitter.non_retryable_dropped",
metric.WithUnit("{event}"),
metric.WithDescription("Events removed from store after a non-retryable publish failure (e.g. missing schema); not retransmitted"),
); err != nil {
return nil, err
}
if m.deliverComplete, err = meter.Int64Counter(
"durable_emitter.delivery.completed",
metric.WithUnit("{event}"),
Expand Down
94 changes: 94 additions & 0 deletions pkg/durableemitter/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,100 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) {
assert.True(t, found, "expected durable_emitter.emit.success in exported metrics")
}

// readCounterSum performs one collection on reader and looks for the first
// metric called name. When that metric carries Int64 sum data, the values of
// all of its data points are added together and returned with true. If no
// metric has that name, or the first match is not an Int64 sum, the result is
// (0, false). A failed collection aborts the test via require.
func readCounterSum(t *testing.T, reader *sdkmetric.ManualReader, name string) (int64, bool) {
	t.Helper()

	var collected metricdata.ResourceMetrics
	collectErr := reader.Collect(context.Background(), &collected)
	require.NoError(t, collectErr)

	for _, scope := range collected.ScopeMetrics {
		for _, candidate := range scope.Metrics {
			if candidate.Name == name {
				// Only the first metric with a matching name is inspected.
				counter, isInt64Sum := candidate.Data.(metricdata.Sum[int64])
				if !isInt64Sum {
					return 0, false
				}
				total := int64(0)
				for i := range counter.DataPoints {
					total += counter.DataPoints[i].Value
				}
				return total, true
			}
		}
	}
	return 0, false
}

// TestDurableEmitter_NonRetryableSchemaMissingDrops covers the permanent
// failure path: the batch emitter is configured to fail every publish with a
// PUBLISH_ERROR_CODE_SCHEMA_MISSING error, and the test asserts that the
// emitted event is deleted from the store, is handed to the batch emitter
// exactly once despite a fast retransmit loop, leaves the pending depth at
// zero, and is counted once by durable_emitter.non_retryable_dropped.
func TestDurableEmitter_NonRetryableSchemaMissingDrops(t *testing.T) {
	meter, reader := newTestMeter(t)

	memStore := NewMemDurableEventStore()
	batch := newTestBatchEmitter()
	schemaMissing := errors.New("PUBLISH_ERROR_CODE_SCHEMA_MISSING: schema cre-workflows.v2.WorkflowExecutionProfile:-1 not found")
	batch.setPublishErr(schemaMissing)

	// Retransmit far more often than the test waits, so an event that was
	// wrongly kept in the store would be re-queued many times before the
	// assertions run.
	emitterCfg := DefaultConfig()
	emitterCfg.RetransmitInterval = 20 * time.Millisecond
	emitterCfg.RetransmitAfter = 5 * time.Millisecond
	emitterCfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: time.Hour}

	emitter, newErr := NewDurableEmitter(memStore, batch, nil, true, emitterCfg, logger.Test(t), meter)
	require.NoError(t, newErr)
	servicetest.Run(t, emitter)

	emitErr := emitter.Emit(t.Context(), []byte("schema-missing"), testEmitAttrs()...)
	require.NoError(t, emitErr)

	// Wait for the delivery callback to remove the row from persistence.
	storeIsEmpty := func() bool { return store0(memStore.Len()) }
	require.Eventually(t, storeIsEmpty, 2*time.Second, 10*time.Millisecond,
		"non-retryable event must be removed from persistence")

	// Let several retransmit ticks elapse; without the drop logic they would
	// re-queue the event and bump the call count.
	const retransmitWindow = 200 * time.Millisecond
	time.Sleep(retransmitWindow)

	assert.Equal(t, int64(1), batch.callCount.Load(),
		"non-retryable event must be queued exactly once and never retransmitted")
	assert.Equal(t, int64(0), emitter.PendingDepth(), "pending depth must return to zero after drop")
	assert.Equal(t, 0, memStore.Len(), "store must remain empty (no retransmit re-inserts)")

	droppedTotal, counterFound := readCounterSum(t, reader, "durable_emitter.non_retryable_dropped")
	require.True(t, counterFound, "expected durable_emitter.non_retryable_dropped in exported metrics")
	assert.Equal(t, int64(1), droppedTotal, "exactly one event should be counted as non-retryable dropped")
}

// store0 reports whether a store length reading is zero.
func store0(n int) bool { return n == 0 }

// TestDurableEmitter_RetryableErrorIsRetransmitted is the counterpart to the
// non-retryable drop test. The batch emitter fails with an error that carries
// no non-retryable marker, and the test waits until the event has been handed
// to the batch emitter at least three times while still sitting in the store.
func TestDurableEmitter_RetryableErrorIsRetransmitted(t *testing.T) {
	memStore := NewMemDurableEventStore()
	batch := newTestBatchEmitter()
	transientErr := errors.New("rpc error: code = Unavailable desc = connection error")
	batch.setPublishErr(transientErr)

	// Fast retransmit cadence so repeated attempts show up well inside the
	// polling deadline below.
	emitterCfg := DefaultConfig()
	emitterCfg.RetransmitInterval = 20 * time.Millisecond
	emitterCfg.RetransmitAfter = 5 * time.Millisecond

	emitter, newErr := NewDurableEmitter(memStore, batch, nil, true, emitterCfg, logger.Test(t), nil)
	require.NoError(t, newErr)
	servicetest.Run(t, emitter)

	emitErr := emitter.Emit(t.Context(), []byte("transient"), testEmitAttrs()...)
	require.NoError(t, emitErr)

	// The row must stay persisted while publish attempts keep accumulating.
	retriedAndStillStored := func() bool {
		if batch.callCount.Load() < 3 {
			return false
		}
		return memStore.Len() == 1
	}
	require.Eventually(t, retriedAndStillStored, 2*time.Second, 10*time.Millisecond,
		"retryable event must remain persisted and be retransmitted")
}

// mockChipServer implements ChipIngressServer with controllable behaviour.
type mockChipServer struct {
pb.UnimplementedChipIngressServer
Expand Down
64 changes: 64 additions & 0 deletions pkg/durableemitter/non_retryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package durableemitter

import (
"strings"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

// nonRetryablePublishErrorMarkers holds the message fragments that mark a
// chip-ingress publish failure as permanent for the event that caused it.
// Classification deliberately works on the error text instead of a typed
// error code, which keeps this package independent of the chip-ingress error
// API and its version: only the error value observed locally is examined.
//
// New permanently-failing conditions are supported by appending their marker
// to this slice; no other code needs to change.
var nonRetryablePublishErrorMarkers = []string{
	// Schema for the event is absent from the chip-ingress schema registry.
	// Resending the identical event cannot succeed until the schema is
	// registered on the server, so retransmission is pointless.
	"PUBLISH_ERROR_CODE_SCHEMA_MISSING",
}

// nonRetryablePublishError classifies err as a permanent publish failure when
// its message contains any entry of nonRetryablePublishErrorMarkers. Markers
// are tried in slice order and the first one found is returned together with
// true, so callers can use it for logging or metrics. A nil error, or an error
// whose message contains no marker, yields ("", false). Matching is a plain,
// case-sensitive substring search.
func nonRetryablePublishError(err error) (string, bool) {
	if err != nil {
		text := err.Error()
		for i := range nonRetryablePublishErrorMarkers {
			if candidate := nonRetryablePublishErrorMarkers[i]; strings.Contains(text, candidate) {
				return candidate, true
			}
		}
	}
	return "", false
}

// dropNonRetryable removes an event from persistence after a permanent publish
// failure and logs a warning. The event is intentionally not retransmitted.
// pendingCount is decremented so queue-depth metrics stay accurate.
func (d *DurableEmitter) dropNonRetryable(id int64, eventPb *chipingress.CloudEventPb, reason string) {
d.eng.Warnw("DurableEmitter: dropping event with non-retryable publish error; will not retransmit",
"id", id,
"eventID", eventPb.GetId(),
"source", eventPb.GetSource(),
"type", eventPb.GetType(),
"reason", reason,
)

ctx, cancel := d.stopCh.CtxWithTimeout(d.cfg.PublishTimeout)
defer cancel()

if err := d.store.Delete(ctx, id); err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done in a follow-up, but I think that, as is, this will be inefficient, since we are going to collect many non-retryable events at a time.

d.eng.Errorw("DurableEmitter: failed to delete non-retryable event from store", "id", id, "error", err)
return
}

d.decPending(1)
if d.metrics != nil {
d.metrics.nonRetryableDropped.Add(ctx, 1)
}
}
Loading