Skip to content

Commit c3e803b

Browse files
committed
Add lastLogForTick flag to events
Consumers (Kafka, gRPC) need to know when they've received the last event for a given tick so they can finalize tick-level processing. - Add bool last_log_for_tick field to Event proto message - Add LastLogForTick to Kafka EventMessage struct - Mark last valid event in each tick batch after processing loop - Add processor, kafka, and storage tests for the new flag
1 parent 4132d06 commit c3e803b

8 files changed

Lines changed: 264 additions & 20 deletions

File tree

.github/workflows/push-docker-bob-events-bridge-dev.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Deploy bob-events-bridge prod images to GHCR
33
on:
44
push:
55
branches:
6-
- 17-bridge-address-bob-wslogs-removed-endpoint
6+
- 25-bob-events-bridge-add-last-log-flag
77
paths:
88
- 'bob-events-bridge/**'
99

@@ -44,4 +44,4 @@ jobs:
4444
context: ./bob-events-bridge
4545
file: ./bob-events-bridge/Dockerfile
4646
push: true
47-
tags: ghcr.io/qubic/bob-events-bridge:dev
47+
tags: ghcr.io/qubic/bob-events-bridge:25-bob-events-bridge-add-last-log-flag

bob-events-bridge/api/events-bridge/v1/messages.pb.go

Lines changed: 24 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bob-events-bridge/api/events-bridge/v1/messages.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ message Event {
1717
google.protobuf.Struct body = 7;
1818
uint32 index_in_tick = 8;
1919
string log_digest = 9;
20+
bool last_log_for_tick = 10;
2021
}
2122

2223
// TickInterval represents a range of ticks

bob-events-bridge/internal/kafka/message.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type EventMessage struct {
1818
Timestamp uint64 `json:"timestamp"`
1919
TransactionHash string `json:"transactionHash"`
2020
Body map[string]any `json:"body"`
21+
LastLogForTick bool `json:"lastLogForTick"`
2122
}
2223

2324
// TransformEventBody converts a typed bob event body into the Kafka body format
@@ -100,7 +101,7 @@ func TransformEventBody(eventType uint32, body interface{}) (map[string]any, err
100101
}
101102

102103
// BuildEventMessage assembles a full Kafka EventMessage from bob log payload components.
103-
func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInTick uint32) (*EventMessage, error) {
104+
func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInTick uint32, lastLogForTick bool) (*EventMessage, error) {
104105
body, err := TransformEventBody(payload.Type, parsedBody)
105106
if err != nil {
106107
return nil, fmt.Errorf("failed to transform event body: %w", err)
@@ -117,5 +118,6 @@ func BuildEventMessage(payload *bob.LogPayload, parsedBody interface{}, indexInT
117118
Timestamp: payload.Timestamp,
118119
TransactionHash: payload.TxHash,
119120
Body: body,
121+
LastLogForTick: lastLogForTick,
120122
}, nil
121123
}

bob-events-bridge/internal/kafka/message_test.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func TestBuildEventMessage(t *testing.T) {
243243
Amount: 1000,
244244
}
245245

246-
msg, err := BuildEventMessage(payload, body, 3)
246+
msg, err := BuildEventMessage(payload, body, 3, false)
247247
require.NoError(t, err)
248248

249249
assert.Equal(t, uint64(3), msg.Index)
@@ -258,6 +258,7 @@ func TestBuildEventMessage(t *testing.T) {
258258
assert.Equal(t, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", msg.Body["source"])
259259
assert.Equal(t, "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", msg.Body["destination"])
260260
assert.Equal(t, int64(1000), msg.Body["amount"])
261+
assert.False(t, msg.LastLogForTick)
261262
}
262263

263264
func TestBuildEventMessage_NilBody(t *testing.T) {
@@ -272,7 +273,25 @@ func TestBuildEventMessage_NilBody(t *testing.T) {
272273
TxHash: "TXHASH",
273274
}
274275

275-
msg, err := BuildEventMessage(payload, nil, 0)
276+
msg, err := BuildEventMessage(payload, nil, 0, false)
276277
require.NoError(t, err)
277278
assert.Nil(t, msg.Body)
279+
assert.False(t, msg.LastLogForTick)
280+
}
281+
282+
func TestBuildEventMessage_LastLogForTick(t *testing.T) {
283+
payload := &bob.LogPayload{
284+
OK: true,
285+
Epoch: 145,
286+
Tick: 22000001,
287+
Type: 0,
288+
LogID: 42,
289+
LogDigest: "abc123",
290+
Timestamp: uint64(1718461800),
291+
TxHash: "TXHASH",
292+
}
293+
294+
msg, err := BuildEventMessage(payload, nil, 0, true)
295+
require.NoError(t, err)
296+
assert.True(t, msg.LastLogForTick)
278297
}

bob-events-bridge/internal/processor/processor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
352352
// Build kafka message (if publisher configured)
353353
var kafkaMsg *kafka.EventMessage
354354
if p.publisher != nil {
355-
kafkaMsg, err = kafka.BuildEventMessage(&payload, parsed, p.tickEventIndex)
355+
kafkaMsg, err = kafka.BuildEventMessage(&payload, parsed, p.tickEventIndex, false)
356356
if err != nil {
357357
return fmt.Errorf("failed to build kafka message: %w", err)
358358
}
@@ -403,6 +403,14 @@ func (p *Processor) handleTickStreamResult(ctx context.Context, result *bob.Tick
403403
zap.Int("batchSize", len(p.pendingBatch.protoEvents)))
404404
}
405405

406+
// Mark last event in tick
407+
if p.pendingBatch != nil && len(p.pendingBatch.protoEvents) > 0 {
408+
p.pendingBatch.protoEvents[len(p.pendingBatch.protoEvents)-1].LastLogForTick = true
409+
if len(p.pendingBatch.kafkaMsgs) > 0 {
410+
p.pendingBatch.kafkaMsgs[len(p.pendingBatch.kafkaMsgs)-1].LastLogForTick = true
411+
}
412+
}
413+
406414
return nil
407415
}
408416

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package processor
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"testing"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/qubic/bob-events-bridge/internal/bob"
10+
"github.com/qubic/bob-events-bridge/internal/config"
11+
"github.com/qubic/bob-events-bridge/internal/kafka"
12+
"github.com/qubic/bob-events-bridge/internal/metrics"
13+
"github.com/qubic/bob-events-bridge/internal/storage"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap"
17+
)
18+
19+
// mockPublisher implements kafka.Publisher for testing
20+
type mockPublisher struct {
21+
published []*kafka.EventMessage
22+
}
23+
24+
func (m *mockPublisher) PublishEvent(_ context.Context, msg *kafka.EventMessage) error {
25+
m.published = append(m.published, msg)
26+
return nil
27+
}
28+
29+
func (m *mockPublisher) PublishEvents(_ context.Context, msgs []*kafka.EventMessage) error {
30+
m.published = append(m.published, msgs...)
31+
return nil
32+
}
33+
34+
func (m *mockPublisher) Close() error { return nil }
35+
36+
// newTestProcessor creates a processor with real storage and mock publisher for testing
37+
func newTestProcessor(t *testing.T, publisher kafka.Publisher) *Processor {
38+
t.Helper()
39+
tempDir := t.TempDir()
40+
logger := zap.NewNop()
41+
mgr, err := storage.NewManager(tempDir, logger)
42+
require.NoError(t, err)
43+
t.Cleanup(func() { _ = mgr.Close() })
44+
45+
reg := prometheus.NewRegistry()
46+
m := metrics.NewBridgeMetrics(reg, "test")
47+
48+
cfg := &config.Config{}
49+
return NewProcessor(cfg, mgr, logger, publisher, m)
50+
}
51+
52+
func makeQuTransferBody(from, to string, amount int64) json.RawMessage {
53+
b, _ := json.Marshal(map[string]any{"from": from, "to": to, "amount": amount})
54+
return b
55+
}
56+
57+
func TestHandleTickStreamResult_LastLogForTick_MultipleEvents(t *testing.T) {
58+
pub := &mockPublisher{}
59+
p := newTestProcessor(t, pub)
60+
p.currentEpoch = 1
61+
62+
result := &bob.TickStreamResult{
63+
Epoch: 1,
64+
Tick: 100,
65+
Logs: []bob.LogPayload{
66+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 1, LogDigest: "d1", BodySize: 10, Timestamp: 1000, TxHash: "tx1",
67+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", 100)},
68+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 2, LogDigest: "d2", BodySize: 10, Timestamp: 1001, TxHash: "tx2",
69+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", 200)},
70+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 3, LogDigest: "d3", BodySize: 10, Timestamp: 1002, TxHash: "tx3",
71+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD", 300)},
72+
},
73+
}
74+
75+
err := p.handleTickStreamResult(context.Background(), result)
76+
require.NoError(t, err)
77+
require.NotNil(t, p.pendingBatch)
78+
79+
// Only last proto event should have LastLogForTick = true
80+
require.Len(t, p.pendingBatch.protoEvents, 3)
81+
assert.False(t, p.pendingBatch.protoEvents[0].LastLogForTick)
82+
assert.False(t, p.pendingBatch.protoEvents[1].LastLogForTick)
83+
assert.True(t, p.pendingBatch.protoEvents[2].LastLogForTick)
84+
85+
// Only last kafka msg should have LastLogForTick = true
86+
require.Len(t, p.pendingBatch.kafkaMsgs, 3)
87+
assert.False(t, p.pendingBatch.kafkaMsgs[0].LastLogForTick)
88+
assert.False(t, p.pendingBatch.kafkaMsgs[1].LastLogForTick)
89+
assert.True(t, p.pendingBatch.kafkaMsgs[2].LastLogForTick)
90+
}
91+
92+
func TestHandleTickStreamResult_LastLogForTick_SingleEvent(t *testing.T) {
93+
pub := &mockPublisher{}
94+
p := newTestProcessor(t, pub)
95+
p.currentEpoch = 1
96+
97+
result := &bob.TickStreamResult{
98+
Epoch: 1,
99+
Tick: 100,
100+
Logs: []bob.LogPayload{
101+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 1, LogDigest: "d1", BodySize: 10, Timestamp: 1000, TxHash: "tx1",
102+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", 100)},
103+
},
104+
}
105+
106+
err := p.handleTickStreamResult(context.Background(), result)
107+
require.NoError(t, err)
108+
require.NotNil(t, p.pendingBatch)
109+
110+
require.Len(t, p.pendingBatch.protoEvents, 1)
111+
assert.True(t, p.pendingBatch.protoEvents[0].LastLogForTick)
112+
113+
require.Len(t, p.pendingBatch.kafkaMsgs, 1)
114+
assert.True(t, p.pendingBatch.kafkaMsgs[0].LastLogForTick)
115+
}
116+
117+
func TestHandleTickStreamResult_LastLogForTick_MixedOKAndNonOK(t *testing.T) {
118+
pub := &mockPublisher{}
119+
p := newTestProcessor(t, pub)
120+
p.currentEpoch = 1
121+
122+
result := &bob.TickStreamResult{
123+
Epoch: 1,
124+
Tick: 100,
125+
Logs: []bob.LogPayload{
126+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 1, LogDigest: "d1", BodySize: 10, Timestamp: 1000, TxHash: "tx1",
127+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", 100)},
128+
{OK: false, Epoch: 1, Tick: 100, Type: 0, LogID: 2}, // non-OK, skipped
129+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 3, LogDigest: "d3", BodySize: 10, Timestamp: 1002, TxHash: "tx3",
130+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", 200)},
131+
{OK: false, Epoch: 1, Tick: 100, Type: 0, LogID: 4}, // non-OK, skipped — last in tick but not added
132+
},
133+
}
134+
135+
err := p.handleTickStreamResult(context.Background(), result)
136+
require.NoError(t, err)
137+
require.NotNil(t, p.pendingBatch)
138+
139+
// Only 2 valid events should be in batch
140+
require.Len(t, p.pendingBatch.protoEvents, 2)
141+
assert.False(t, p.pendingBatch.protoEvents[0].LastLogForTick)
142+
assert.True(t, p.pendingBatch.protoEvents[1].LastLogForTick, "Last valid event should have LastLogForTick=true")
143+
144+
require.Len(t, p.pendingBatch.kafkaMsgs, 2)
145+
assert.False(t, p.pendingBatch.kafkaMsgs[0].LastLogForTick)
146+
assert.True(t, p.pendingBatch.kafkaMsgs[1].LastLogForTick)
147+
}
148+
149+
func TestHandleTickStreamResult_LastLogForTick_NoPublisher(t *testing.T) {
150+
// Test without Kafka publisher (nil)
151+
p := newTestProcessor(t, nil)
152+
p.currentEpoch = 1
153+
154+
result := &bob.TickStreamResult{
155+
Epoch: 1,
156+
Tick: 100,
157+
Logs: []bob.LogPayload{
158+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 1, LogDigest: "d1", BodySize: 10, Timestamp: 1000, TxHash: "tx1",
159+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", 100)},
160+
{OK: true, Epoch: 1, Tick: 100, Type: 0, LogID: 2, LogDigest: "d2", BodySize: 10, Timestamp: 1001, TxHash: "tx2",
161+
Body: makeQuTransferBody("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", 200)},
162+
},
163+
}
164+
165+
err := p.handleTickStreamResult(context.Background(), result)
166+
require.NoError(t, err)
167+
require.NotNil(t, p.pendingBatch)
168+
169+
// Proto events should still have flag set
170+
require.Len(t, p.pendingBatch.protoEvents, 2)
171+
assert.False(t, p.pendingBatch.protoEvents[0].LastLogForTick)
172+
assert.True(t, p.pendingBatch.protoEvents[1].LastLogForTick)
173+
174+
// No kafka messages when publisher is nil
175+
assert.Empty(t, p.pendingBatch.kafkaMsgs)
176+
}

bob-events-bridge/internal/storage/manager_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,34 @@ func TestCountEventsForTick(t *testing.T) {
393393
require.Equal(t, uint32(0), count)
394394
}
395395

396+
func TestLastLogForTickRoundTrip(t *testing.T) {
397+
tempDir := t.TempDir()
398+
manager := createTestManager(t, tempDir)
399+
defer manager.Close() //nolint:errcheck
400+
401+
// Store event with LastLogForTick = true
402+
event := createTestEvent(1, 100, 1)
403+
event.LastLogForTick = true
404+
err := manager.StoreEvent(event)
405+
require.NoError(t, err)
406+
407+
// Retrieve and verify
408+
_, events, err := manager.GetEventsForTick(100)
409+
require.NoError(t, err)
410+
require.Len(t, events, 1)
411+
require.True(t, events[0].LastLogForTick, "Expected LastLogForTick to round-trip as true")
412+
413+
// Store event with LastLogForTick = false (default)
414+
event2 := createTestEvent(1, 101, 2)
415+
err = manager.StoreEvent(event2)
416+
require.NoError(t, err)
417+
418+
_, events2, err := manager.GetEventsForTick(101)
419+
require.NoError(t, err)
420+
require.Len(t, events2, 1)
421+
require.False(t, events2[0].LastLogForTick, "Expected LastLogForTick to round-trip as false")
422+
}
423+
396424
func TestBasePath(t *testing.T) {
397425
// Create temp directory
398426
tempDir := t.TempDir()

0 commit comments

Comments
 (0)