@@ -19,8 +19,10 @@ import (
1919 "encoding/binary"
2020 "fmt"
2121 "hash/fnv"
22+ "math/rand"
2223 "strings"
2324 "sync"
25+ "time"
2426
2527 "github.com/vmihailenco/msgpack/v5"
2628 "k8s.io/client-go/util/workqueue"
@@ -33,6 +35,14 @@ import (
const (
	defaultEventSourceDeviceTier = "GPU"
	defaultPodSelector           = "llm-d.ai/inferenceServing=true"

	// Parent-lookup retry tuning. A child event can arrive before the event
	// that stores its parent block (e.g., in P/D disaggregation), so a missing
	// parent is retried a few times before giving up.

	// maxParentLookupRetries bounds how many times a missing parent block is
	// looked up again before the lookup falls back to EmptyBlockHash.
	maxParentLookupRetries = 3
	// initialRetryDelay is the base delay of the exponential backoff applied
	// between parent lookup retries.
	initialRetryDelay = 10 * time.Millisecond
	// maxRetryDelay is the upper bound on the backoff delay between parent
	// lookup retries.
	maxRetryDelay = 100 * time.Millisecond
)
3747
3848// Config holds the configuration for the event processing pool.
@@ -95,6 +105,9 @@ type Message struct {
95105 PodIdentifier string
96106 // ModelName is the name of the model that is associated with this event.
97107 ModelName string
108+ // Retries tracks the number of times this message has been retried due to missing parent blocks.
109+ // Used for exponential backoff when parent blocks are not yet available.
110+ Retries int
98111}
99112
100113// Pool is a sharded worker pool that processes events from ZMQ subscribers.
@@ -172,6 +185,88 @@ func (p *Pool) AddTask(task *Message) {
172185 p .queues [queueIndex ].Add (task )
173186}
174187
188+ // calculateBackoff computes the exponential backoff delay with jitter for retrying
189+ // parent block lookups. The delay doubles with each retry, capped at maxRetryDelay.
190+ func calculateBackoff (retries int ) time.Duration {
191+ if retries <= 0 {
192+ return initialRetryDelay
193+ }
194+
195+ // Exponential backoff: initialRetryDelay * 2^(retries-1)
196+ // retries=1: 10ms, retries=2: 20ms, retries=3: 40ms, retries=4: 80ms
197+ // Cap exponent at 10 to prevent overflow (2^10 = 1024x is already huge)
198+ exponent := min (retries - 1 , 10 )
199+ delay := initialRetryDelay * (1 << exponent )
200+
201+ // Cap at maxRetryDelay
202+ if delay > maxRetryDelay {
203+ delay = maxRetryDelay
204+ }
205+
206+ // Add jitter (±20%) to avoid thundering herd
207+ jitterRange := int64 (delay / 5 )
208+ if jitterRange > 0 {
209+ //nolint:gosec // crypto-grade randomness not needed for backoff jitter
210+ jitter := time .Duration (rand .Int63n (jitterRange ))
211+ delay += jitter
212+ }
213+
214+ return delay
215+ }
216+
217+ // resolveParentKey attempts to resolve a parent block's request key.
218+ // Returns (requestKey, shouldRetry, shouldSkip).
219+ // - shouldRetry: true if event should be added to retry batch.
220+ // - shouldSkip: true if event should not be processed in this iteration.
221+ func (p * Pool ) resolveParentKey (
222+ ctx context.Context ,
223+ msg * Message ,
224+ parentBlockHash any ,
225+ podIdentifier string ,
226+ ) (kvblock.BlockHash , bool , bool ) {
227+ debugLogger := log .FromContext (ctx ).V (logging .DEBUG )
228+
229+ hash , err := getHashAsUint64 (parentBlockHash )
230+ if err != nil {
231+ debugLogger .Error (err , "Failed to convert parent block hash for BlockStored event" ,
232+ "rawHash" , parentBlockHash )
233+ return kvblock .EmptyBlockHash , false , true // skip this event
234+ }
235+
236+ parentEngineKey := kvblock .BlockHash (hash )
237+ key , err := p .index .GetRequestKey (ctx , parentEngineKey )
238+
239+ if err == nil {
240+ // Parent found successfully
241+ if msg .Retries > 0 {
242+ debugLogger .Info ("Parent block found after retry" ,
243+ "parentEngineKey" , parentEngineKey ,
244+ "retriesNeeded" , msg .Retries ,
245+ "podIdentifier" , podIdentifier )
246+ }
247+ return key , false , false
248+ }
249+
250+ // Parent block not found
251+ if msg .Retries < maxParentLookupRetries {
252+ // Add to retry batch
253+ debugLogger .V (1 ).Info ("Parent block not found, marking for retry" ,
254+ "parentEngineKey" , parentEngineKey ,
255+ "retries" , msg .Retries ,
256+ "podIdentifier" , podIdentifier )
257+
258+ return kvblock .EmptyBlockHash , true , true // add to retry batch and skip processing
259+ }
260+
261+ // Max retries exceeded - fall back to EmptyBlockHash
262+ debugLogger .Info ("Parent block not found after max retries, using EmptyBlockHash" ,
263+ "parentEngineKey" , parentEngineKey ,
264+ "retries" , msg .Retries ,
265+ "podIdentifier" , podIdentifier )
266+
267+ return kvblock .EmptyBlockHash , false , false
268+ }
269+
175270// worker is the main processing loop for a single worker goroutine.
176271// It processes messages from its dedicated queue using the workqueue pattern.
177272// TODO: profile and benchmark cases like backpressure, slow processing (profile), etc.
@@ -225,17 +320,18 @@ func (p *Pool) processEvent(ctx context.Context, msg *Message) {
225320 events = append (events , event )
226321 }
227322
228- podIdentifier := msg .PodIdentifier
229- modelName := msg .ModelName
230- p .digestEvents (ctx , podIdentifier , modelName , events )
323+ p .digestEvents (ctx , msg , events )
231324}
232325
233- func (p * Pool ) digestEvents (ctx context.Context , podIdentifier , modelName string ,
234- events [] event ,
235- ) {
326+ func (p * Pool ) digestEvents (ctx context.Context , msg * Message , events [] event ) {
327+ podIdentifier := msg . PodIdentifier
328+ modelName := msg . ModelName
236329 debugLogger := log .FromContext (ctx ).V (logging .DEBUG )
237330 debugLogger .V (logging .TRACE ).Info ("Digesting events" , "count" , len (events ))
238331
332+ // Collect events with missing parents for batch retry
333+ var eventsToRetry []event
334+
239335 // Process each event in the batch
240336 for _ , event := range events {
241337 switch ev := event .(type ) {
@@ -271,22 +367,14 @@ func (p *Pool) digestEvents(ctx context.Context, podIdentifier, modelName string
271367
272368 var parentRequestKey kvblock.BlockHash
273369 if ev .ParentBlockHash != nil {
274- hash , err := getHashAsUint64 (ev .ParentBlockHash )
275- if err != nil {
276- debugLogger .Error (err , "Failed to convert parent block hash for BlockStored event" ,
277- "rawHash" , ev .ParentBlockHash )
370+ var shouldRetry , shouldSkip bool
371+ parentRequestKey , shouldRetry , shouldSkip = p .resolveParentKey (ctx , msg , ev .ParentBlockHash , podIdentifier )
372+ if shouldSkip {
373+ if shouldRetry {
374+ eventsToRetry = append (eventsToRetry , ev )
375+ }
278376 continue
279377 }
280-
281- parentEngineKey := kvblock .BlockHash (hash )
282-
283- key , err := p .index .GetRequestKey (ctx , parentEngineKey )
284- if err != nil {
285- debugLogger .Error (err , "Failed to get request key for parent block" ,
286- "parentEngineKey" , parentEngineKey , "effectiveModelName" , effectiveModelName )
287- continue
288- }
289- parentRequestKey = key
290378 }
291379
292380 requestKeys := p .tokenProcessor .TokensToKVBlockKeys (parentRequestKey , ev .TokenIds , effectiveModelName )
@@ -331,6 +419,56 @@ func (p *Pool) digestEvents(ctx context.Context, podIdentifier, modelName string
331419 debugLogger .Info ("Unknown event" , "podIdentifier" , podIdentifier , "event" , ev )
332420 }
333421 }
422+
423+ // After processing all events, handle events that need retry
424+ if len (eventsToRetry ) > 0 {
425+ retries := msg .Retries + 1
426+ delay := calculateBackoff (retries )
427+
428+ debugLogger .Info ("Batching events for retry" ,
429+ "eventCount" , len (eventsToRetry ),
430+ "retries" , retries ,
431+ "delay" , delay ,
432+ "podIdentifier" , podIdentifier )
433+
434+ // Create new EventBatch with only events needing retry
435+ retryBatch := EventBatch {
436+ Events : make ([]msgpack.RawMessage , 0 , len (eventsToRetry )),
437+ }
438+
439+ // Marshal each event for retry
440+ for _ , event := range eventsToRetry {
441+ eventBytes , err := msgpack .Marshal (event )
442+ if err != nil {
443+ debugLogger .Error (err , "Failed to marshal event for retry" )
444+ continue
445+ }
446+ retryBatch .Events = append (retryBatch .Events , eventBytes )
447+ }
448+
449+ // Marshal the batch
450+ batchPayload , err := msgpack .Marshal (retryBatch )
451+ if err != nil {
452+ debugLogger .Error (err , "Failed to marshal retry batch" )
453+ return
454+ }
455+
456+ // Create new message with events to retry
457+ retryMsg := & Message {
458+ Topic : msg .Topic ,
459+ Payload : batchPayload ,
460+ Seq : msg .Seq ,
461+ PodIdentifier : msg .PodIdentifier ,
462+ ModelName : msg .ModelName ,
463+ Retries : retries ,
464+ }
465+
466+ // Schedule retry with backoff
467+ go func (task * Message , retryDelay time.Duration ) {
468+ time .Sleep (retryDelay )
469+ p .AddTask (task )
470+ }(retryMsg , delay )
471+ }
334472}
335473
336474// getHashAsUint64 converts a block hash from an `any` type to a uint64.
0 commit comments