Skip to content

Commit a21ca45

Browse files
0xlukclaude
andcommitted
Simplify lastLogForTick: set from bob data, remove dedup
Set lastLogForTick inline based on position in bob's Logs array (last OK log gets the flag) instead of marking post-loop on batch entries. Remove deduplication — bob is the source of truth and events are overwritten in storage on resend. Revert BuildEventMessage to 3-arg signature since the flag is now set by the processor after building. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c3e803b commit a21ca45

4 files changed

Lines changed: 68 additions & 85 deletions

File tree

bob-events-bridge/internal/e2e/e2e_test.go

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ func TestE2E_EpochTransition(t *testing.T) {
254254
require.Contains(t, epochs, uint32(146))
255255
}
256256

257-
// TestE2E_Deduplication tests that duplicate events are not stored twice
258-
func TestE2E_Deduplication(t *testing.T) {
257+
// TestE2E_EventOverwrite tests that resent events are overwritten in storage (no dedup)
258+
func TestE2E_EventOverwrite(t *testing.T) {
259259
mockBob := NewMockBobServer(145, 22000000)
260260
defer mockBob.Close()
261261

@@ -301,7 +301,7 @@ func TestE2E_Deduplication(t *testing.T) {
301301
return exists
302302
}, "first event should be stored")
303303

304-
// Send duplicate
304+
// Resend the same event (bob resends on reconnect — no dedup, overwrites in storage)
305305
err = mockBob.SendTickStreamResult(bob.TickStreamResult{
306306
Epoch: 145,
307307
Tick: 22000001,
@@ -312,13 +312,13 @@ func TestE2E_Deduplication(t *testing.T) {
312312
require.NoError(t, err)
313313
mockBob.SendCatchUpComplete()
314314

315-
// Give some time for the duplicate to be processed
315+
// Give some time for the resent event to be processed
316316
time.Sleep(100 * time.Millisecond)
317317

318-
// Verify only one event is stored
318+
// Verify still one event stored (overwritten with same data)
319319
_, events, err := storageMgr.GetEventsForTick(22000001)
320320
require.NoError(t, err)
321-
require.Len(t, events, 1, "Duplicate event should not be stored")
321+
require.Len(t, events, 1, "Resent event should overwrite, not duplicate")
322322
}
323323

324324
// TestE2E_StatePersistence tests that state (lastTick/lastLogID/currentEpoch) is persisted
@@ -789,7 +789,8 @@ func TestE2E_IndexInTickResetsAcrossTicks(t *testing.T) {
789789
require.True(t, indicesB[2], "Expected IndexInTick 2 for tick B")
790790
}
791791

792-
// TestE2E_CrashRecoveryIndexInTick tests that IndexInTick is correctly recovered after crash
792+
// TestE2E_CrashRecoveryIndexInTick tests IndexInTick behavior after crash recovery.
793+
// Without dedup, resent events are re-processed with indexes continuing from the recovered count.
793794
func TestE2E_CrashRecoveryIndexInTick(t *testing.T) {
794795
tempDir := t.TempDir()
795796
tick := uint32(22000050)
@@ -839,7 +840,9 @@ func TestE2E_CrashRecoveryIndexInTick(t *testing.T) {
839840
_ = storageMgr.Close()
840841
}()
841842

842-
// Phase 2: Restart, send 2 duplicates + 1 new event, verify new event gets index 2
843+
// Phase 2: Restart, resend 2 events + 1 new event.
844+
// Without dedup, all 3 are processed. Index resumes from recovered count (2).
845+
// Resent events get indexes 2, 3 (overwritten in storage), new event gets index 4.
843846
mockBob2 := NewMockBobServer(145, 22000000)
844847
defer mockBob2.Close()
845848

@@ -860,7 +863,7 @@ func TestE2E_CrashRecoveryIndexInTick(t *testing.T) {
860863
_, err = mockBob2.WaitForSubscription(5 * time.Second)
861864
require.NoError(t, err)
862865

863-
// Resend the 2 duplicate events + 1 new event in one tickStream message
866+
// Resend the 2 events + 1 new event in one tickStream message
864867
var logs []bob.LogPayload
865868
for i := uint64(1); i <= 2; i++ {
866869
logs = append(logs, CreateLogPayload(145, tick, i, 0, map[string]any{
@@ -890,16 +893,17 @@ func TestE2E_CrashRecoveryIndexInTick(t *testing.T) {
890893
return len(events) >= 3
891894
}, "3 events should be stored after recovery")
892895

893-
// Verify the new event has IndexInTick=2
896+
// Verify all 3 events are stored
894897
service := grpc.NewEventsBridgeService(storageMgr2, zap.NewNop())
895898
resp, err := service.GetEventsForTick(ctx, &eventsbridge.GetEventsForTickRequest{Tick: tick})
896899
require.NoError(t, err)
897900
require.Len(t, resp.Events, 3)
898901

899-
// Find the event with logID=3 (the new one) and check its index
902+
// Without dedup, resent events continue from recovered index (2).
903+
// New event (logID=3) gets IndexInTick=4.
900904
for _, event := range resp.Events {
901905
if event.LogId == 3 {
902-
require.Equal(t, uint32(2), event.IndexInTick, "New event after crash recovery should have IndexInTick=2")
906+
require.Equal(t, uint32(4), event.IndexInTick, "New event after crash recovery should have IndexInTick=4 (resumed from recovered count)")
903907
}
904908
}
905909
}
@@ -1474,9 +1478,9 @@ func TestE2E_KafkaPublishFailure(t *testing.T) {
14741478
assert.Equal(t, uint64(1), msgs[0].LogID)
14751479
}
14761480

1477-
// TestE2E_IndexResetAfterDeduplication tests that IndexInTick resets to 0 for a new tick
1478-
// even when all events from the previous tick were deduplicated (pendingBatch is nil).
1479-
func TestE2E_IndexResetAfterDeduplication(t *testing.T) {
1481+
// TestE2E_IndexResetAfterResend tests that IndexInTick resets to 0 for a new tick
1482+
// even when events from the previous tick were resent (overwritten, no dedup).
1483+
func TestE2E_IndexResetAfterResend(t *testing.T) {
14801484
tempDir := t.TempDir()
14811485
tickOld := uint32(22000050) // Previous tick
14821486
tickNew := uint32(22000051) // New tick
@@ -1534,8 +1538,8 @@ func TestE2E_IndexResetAfterDeduplication(t *testing.T) {
15341538
_ = storageMgr.Close()
15351539
}()
15361540

1537-
// Phase 2: Restart, resend tickOld events (will be deduplicated), then send tickNew events.
1538-
// The tickNew events should start with index 0, not index 3.
1541+
// Phase 2: Restart, resend tickOld events (now re-processed and published), then send tickNew events.
1542+
// The tickNew events should start with index 0.
15391543
mockBob2 := NewMockBobServer(145, 22000000)
15401544
defer mockBob2.Close()
15411545

@@ -1557,7 +1561,7 @@ func TestE2E_IndexResetAfterDeduplication(t *testing.T) {
15571561
_, err = mockBob2.WaitForSubscription(5 * time.Second)
15581562
require.NoError(t, err)
15591563

1560-
// Resend the 3 events for tickOld (these will be deduplicated - already in storage)
1564+
// Resend the 3 events for tickOld (no dedup — re-processed and overwritten in storage)
15611565
var logsOld []bob.LogPayload
15621566
for i := uint64(1); i <= 3; i++ {
15631567
logsOld = append(logsOld, CreateLogPayloadWithTimestamp(145, tickOld, i, 0, map[string]any{
@@ -1575,7 +1579,7 @@ func TestE2E_IndexResetAfterDeduplication(t *testing.T) {
15751579
})
15761580
require.NoError(t, err)
15771581

1578-
// Now send 2 events for tickNew (these should get index 0, 1 - NOT 3, 4)
1582+
// Now send 2 events for tickNew (these should get index 0, 1)
15791583
var logsNew []bob.LogPayload
15801584
for i := uint64(4); i <= 5; i++ {
15811585
logsNew = append(logsNew, CreateLogPayloadWithTimestamp(145, tickNew, i, 0, map[string]any{
@@ -1600,14 +1604,20 @@ func TestE2E_IndexResetAfterDeduplication(t *testing.T) {
16001604
return len(events) >= 2
16011605
}, "2 events should be stored for tickNew")
16021606

1603-
// Verify that tickNew events have indexes 0 and 1 in Kafka (NOT 3 and 4)
1607+
// Resent tickOld events (3) + tickNew events (2) = 5 total Kafka messages
16041608
msgs := mockKafka2.Messages()
1605-
require.Len(t, msgs, 2, "Only 2 new events should be published to Kafka (tickOld events were deduplicated)")
1609+
require.Len(t, msgs, 5, "3 resent + 2 new events should be published to Kafka")
16061610

1607-
// The events should have index 0 and 1 for tickNew
1608-
for i, msg := range msgs {
1609-
require.Equal(t, uint64(i), msg.Index, "tickNew event %d should have index %d, not %d", i, i, msg.Index)
1610-
require.Equal(t, tickNew, msg.TickNumber, "Event should be for tickNew")
1611+
// tickNew events should have index 0 and 1 (tick boundary resets the index)
1612+
tickNewMsgs := make([]*kafka.EventMessage, 0)
1613+
for _, msg := range msgs {
1614+
if msg.TickNumber == tickNew {
1615+
tickNewMsgs = append(tickNewMsgs, msg)
1616+
}
1617+
}
1618+
require.Len(t, tickNewMsgs, 2)
1619+
for i, msg := range tickNewMsgs {
1620+
require.Equal(t, uint64(i), msg.Index, "tickNew event %d should have index %d", i, i)
16111621
}
16121622

16131623
// Also verify storage IndexInTick values
@@ -1704,8 +1714,8 @@ func TestE2E_NotificationsBeforeSubscribeResponse(t *testing.T) {
17041714
require.Equal(t, uint64(2), resp2.Events[0].LogId)
17051715
}
17061716

1707-
// TestE2E_KafkaDeduplication tests that deduplicated events are NOT published to Kafka
1708-
func TestE2E_KafkaDeduplication(t *testing.T) {
1717+
// TestE2E_KafkaResend tests that resent events ARE published to Kafka (no dedup)
1718+
func TestE2E_KafkaResend(t *testing.T) {
17091719
tempDir := t.TempDir()
17101720
tick := uint32(22000001)
17111721

@@ -1754,7 +1764,7 @@ func TestE2E_KafkaDeduplication(t *testing.T) {
17541764
_ = storageMgr.Close()
17551765
}()
17561766

1757-
// Phase 2: Restart and resend the same event — should be deduplicated
1767+
// Phase 2: Restart and resend the same event — now re-published to Kafka
17581768
mockBob2 := NewMockBobServer(145, 22000000)
17591769
defer mockBob2.Close()
17601770

@@ -1776,7 +1786,7 @@ func TestE2E_KafkaDeduplication(t *testing.T) {
17761786
_, err = mockBob2.WaitForSubscription(5 * time.Second)
17771787
require.NoError(t, err)
17781788

1779-
// Resend the same event (bob resends on reconnect)
1789+
// Resend the same event (bob resends on reconnect — no dedup)
17801790
payload := CreateLogPayloadWithTimestamp(145, tick, 1, 0, map[string]any{
17811791
"from": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
17821792
"to": "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
@@ -1813,8 +1823,9 @@ func TestE2E_KafkaDeduplication(t *testing.T) {
18131823
return exists
18141824
}, "new event should be stored")
18151825

1816-
// Verify only the new event was published to Kafka (duplicate was skipped)
1826+
// Both resent and new events should be published to Kafka (no dedup)
18171827
msgs := mockKafka2.Messages()
1818-
require.Len(t, msgs, 1, "Only non-duplicate event should be published to Kafka")
1819-
assert.Equal(t, uint64(2), msgs[0].LogID, "Only the new event should be published")
1828+
require.Len(t, msgs, 2, "Both resent and new events should be published to Kafka")
1829+
assert.Equal(t, uint64(1), msgs[0].LogID, "Resent event should be published")
1830+
assert.Equal(t, uint64(2), msgs[1].LogID, "New event should be published")
18201831
}

bob-events-bridge/internal/kafka/message.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TransformEventBody(eventType uint32, body interface{}) (map[string]any, err
101101
}
102102

103103
// BuildEventMessage assembles a full Kafka EventMessage from bob log payload components.
104-
func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInTick uint32, lastLogForTick bool) (*EventMessage, error) {
104+
func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInTick uint32) (*EventMessage, error) {
105105
body, err := TransformEventBody(payload.Type, parsedBody)
106106
if err != nil {
107107
return nil, fmt.Errorf("failed to transform event body: %w", err)
@@ -118,6 +118,5 @@ func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInT
118118
Timestamp: payload.Timestamp,
119119
TransactionHash: payload.TxHash,
120120
Body: body,
121-
LastLogForTick: lastLogForTick,
122121
}, nil
123122
}

bob-events-bridge/internal/kafka/message_test.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func TestBuildEventMessage(t *testing.T) {
243243
Amount: 1000,
244244
}
245245

246-
msg, err := BuildEventMessage(payload, body, 3, false)
246+
msg, err := BuildEventMessage(payload, body, 3)
247247
require.NoError(t, err)
248248

249249
assert.Equal(t, uint64(3), msg.Index)
@@ -258,7 +258,6 @@ func TestBuildEventMessage(t *testing.T) {
258258
assert.Equal(t, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", msg.Body["source"])
259259
assert.Equal(t, "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", msg.Body["destination"])
260260
assert.Equal(t, int64(1000), msg.Body["amount"])
261-
assert.False(t, msg.LastLogForTick)
262261
}
263262

264263
func TestBuildEventMessage_NilBody(t *testing.T) {
@@ -273,25 +272,8 @@ func TestBuildEventMessage_NilBody(t *testing.T) {
273272
TxHash: "TXHASH",
274273
}
275274

276-
msg, err := BuildEventMessage(payload, nil, 0, false)
275+
msg, err := BuildEventMessage(payload, nil, 0)
277276
require.NoError(t, err)
278277
assert.Nil(t, msg.Body)
279-
assert.False(t, msg.LastLogForTick)
280278
}
281279

282-
func TestBuildEventMessage_LastLogForTick(t *testing.T) {
283-
payload := &bob.LogPayload{
284-
OK: true,
285-
Epoch: 145,
286-
Tick: 22000001,
287-
Type: 0,
288-
LogID: 42,
289-
LogDigest: "abc123",
290-
Timestamp: uint64(1718461800),
291-
TxHash: "TXHASH",
292-
}
293-
294-
msg, err := BuildEventMessage(payload, nil, 0, true)
295-
require.NoError(t, err)
296-
assert.True(t, msg.LastLogForTick)
297-
}

bob-events-bridge/internal/processor/processor.go

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,16 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
306306
p.tickForIndex = result.Tick
307307
}
308308

309+
// Find last OK log index for setting lastLogForTick flag
310+
lastOKIdx := -1
311+
for i, log := range result.Logs {
312+
if log.OK {
313+
lastOKIdx = i
314+
}
315+
}
316+
309317
// Process each log in the tick
310-
for _, payload := range result.Logs {
318+
for i, payload := range result.Logs {
311319
if !payload.OK {
312320
p.logger.Debug("Skipping non-OK log message")
313321
p.metrics.IncProcessorEventsSkippedNonOK()
@@ -316,18 +324,7 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
316324

317325
p.metrics.IncProcessorEventsReceived(payload.Type)
318326

319-
// Deduplication check: skip if we already have this event
320-
exists, err := p.storage.HasEvent(p.currentEpoch, payload.Tick, payload.LogID)
321-
if err != nil {
322-
p.logger.Warn("Failed to check for duplicate event", zap.Error(err))
323-
} else if exists {
324-
p.logger.Debug("Skipping duplicate event",
325-
zap.Uint64("logID", payload.LogID),
326-
zap.Uint32("tick", payload.Tick),
327-
zap.Uint32("type", payload.Type))
328-
p.metrics.IncProcessorEventsDeduplicated(payload.Type)
329-
continue
330-
}
327+
isLastLog := i == lastOKIdx
331328

332329
// Validate and parse body into typed struct
333330
parsed, err := bob.ParseEventBody(payload.Type, payload.Body)
@@ -352,23 +349,25 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
352349
// Build kafka message (if publisher configured)
353350
var kafkaMsg *kafka.EventMessage
354351
if p.publisher != nil {
355-
kafkaMsg, err = kafka.BuildEventMessage(&payload, parsed, p.tickEventIndex, false)
352+
kafkaMsg, err = kafka.BuildEventMessage(&payload, parsed, p.tickEventIndex)
356353
if err != nil {
357354
return fmt.Errorf("failed to build kafka message: %w", err)
358355
}
356+
kafkaMsg.LastLogForTick = isLastLog
359357
}
360358

361359
// Create event proto
362360
event := &eventsbridge.Event{
363-
LogId: payload.LogID,
364-
Tick: payload.Tick,
365-
Epoch: uint32(payload.Epoch),
366-
EventType: payload.Type,
367-
TxHash: payload.TxHash,
368-
Timestamp: payload.Timestamp,
369-
Body: bodyStruct,
370-
IndexInTick: p.tickEventIndex,
371-
LogDigest: payload.LogDigest,
361+
LogId: payload.LogID,
362+
Tick: payload.Tick,
363+
Epoch: uint32(payload.Epoch),
364+
EventType: payload.Type,
365+
TxHash: payload.TxHash,
366+
Timestamp: payload.Timestamp,
367+
Body: bodyStruct,
368+
IndexInTick: p.tickEventIndex,
369+
LogDigest: payload.LogDigest,
370+
LastLogForTick: isLastLog,
372371
}
373372

374373
// Initialize batch if nil
@@ -403,14 +402,6 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
403402
zap.Int("batchSize", len(p.pendingBatch.protoEvents)))
404403
}
405404

406-
// Mark last event in tick
407-
if p.pendingBatch != nil && len(p.pendingBatch.protoEvents) > 0 {
408-
p.pendingBatch.protoEvents[len(p.pendingBatch.protoEvents)-1].LastLogForTick = true
409-
if len(p.pendingBatch.kafkaMsgs) > 0 {
410-
p.pendingBatch.kafkaMsgs[len(p.pendingBatch.kafkaMsgs)-1].LastLogForTick = true
411-
}
412-
}
413-
414405
return nil
415406
}
416407

0 commit comments

Comments
 (0)