Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough
Replaces manifest assembly with Helm rendering, updates the antithesis image list and .gitignore, bumps pointer imports to v3, adds cross-ledger Kafka-backed transaction verification (EventChecker), introduces a generic event payload extractor, and improves error handling and nolint placements.
Sequence Diagram(s)
sequenceDiagram
participant Main as eventually_correct\nmain.go
participant Client as Formance Client
participant EventChecker as EventChecker
participant Kafka as Kafka Consumer
participant Buffer as Event Buffer
Main->>Client: List ledgers (PageSize)
Client-->>Main: Return ledgers
Main->>EventChecker: CheckTransactions(ctx, client, ledgerNames)
EventChecker->>Kafka: Initialize consumer & subscribe to topic
loop Initialize per-ledger
EventChecker->>Client: List transactions (cursor)
Client-->>EventChecker: Return transaction page
EventChecker->>Buffer: Buffer transactions
end
loop Consume COMMITTED_TRANSACTIONS events
Kafka-->>EventChecker: Event message
EventChecker->>EventChecker: ExtractEventPayload & validate
EventChecker->>Buffer: findInBuffer() — try match
alt Match found
EventChecker->>EventChecker: Record matched transaction
else No match
EventChecker->>Client: fetchNext() for ledger cursor
Client-->>EventChecker: Next transaction page
EventChecker->>Buffer: Buffer and retry match
end
EventChecker->>EventChecker: Advance cursor as needed
end
EventChecker->>EventChecker: Assert all transactions matched
EventChecker-->>Main: Report verification results
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 6
🧹 Nitpick comments (3)
workload/bin/cmds/eventually_correct/events.go (2)
25-53: Hardcoded Kafka bootstrap server address.
The Kafka bootstrap server `formance-kafka-bootstrap.kafka.svc.cluster.local:9092` (line 29) and topic `stack0.ledger` (line 41) are hardcoded. If the deployment topology changes, this code silently connects to the wrong (or non-existent) broker. Consider extracting these as configuration values (environment variables or parameters).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events.go` around lines 25 - 53, The checkEvents function currently hardcodes the Kafka bootstrap.servers value and the topic name when creating the consumer and calling consumer.SubscribeTopics; change checkEvents to read the bootstrap server address and topic(s) from configuration (e.g., environment variables or passed parameters) instead of the literal "formance-kafka-bootstrap.kafka.svc.cluster.local:9092" and "stack0.ledger", validate they are present, and use those variables in the kafka.NewConsumer config map and consumer.SubscribeTopics call (also ensure the group.id formation using random.GetRandom() remains unchanged).
105-131: Value-type `EventChecker` stored in map — mutations require write-back.
`EventChecker` is a struct (value type) stored in the `checkers` map. Any mutation to `checker` after `checker := checkers[eventTx.Ledger]` (line 153) requires writing it back with `checkers[eventTx.Ledger] = checker` (line 174). This pattern is correctly implemented in the polling loop, but the same care is needed in `findInBuffer` and `fetchNext` — these modify the receiver's fields (`data`, `eventBuffer`, `nextCursor`) through a pointer receiver, so they work correctly since `checker` is addressable.
Actually, `fetchNext` and `findInBuffer` take a pointer receiver — good, the modifications propagate to the local `checker` variable. The write-back on line 174 then persists them to the map. This is correct but fragile; consider using a `*EventChecker` in the map to avoid this footgun.
Use pointer values in the map:

```diff
- checkers = map[string]EventChecker{}
+ checkers = map[string]*EventChecker{}
  ...
- checkers[ledger] = EventChecker{
+ checkers[ledger] = &EventChecker{
      data:        res.V2TransactionsCursorResponse.Cursor.Data,
      nextCursor:  res.V2TransactionsCursorResponse.Cursor.Next,
      eventBuffer: []internal.CommittedTransactions{},
  }
```

This eliminates the need for the write-back on line 174 and prevents subtle bugs if a future code path forgets it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events.go` around lines 105 - 131, checkers currently stores EventChecker by value which requires manual write-back after mutations; change the map type from map[string]EventChecker to map[string]*EventChecker, instantiate entries with &EventChecker{...} when populating in checkForAllLedgers, and update all usages (including where you do checker := checkers[eventTx.Ledger]) to work with pointers so findInBuffer and fetchNext (pointer receivers) mutate the stored object directly and you can remove the manual write-back (checkers[eventTx.Ledger] = checker).

config/Justfile (1)
3-5: `helm dependency build` is redundant after `helm dependency update`.
`helm dependency update` already rebuilds the `charts/` directory from the lock file it just fetched. Running `build` immediately after is a no-op in the normal case.
Additionally, `$LEDGER_PREVIOUS_TAG` is not quoted; if the variable is unset or empty, Helm will silently receive an empty string. Consider adding a guard or quoting it.
Suggested simplification:

```diff
  rm -f -- '{{output}}'
  helm dependency update
- helm dependency build
- helm template . --include-crds --set "ledger.previous_tag=$LEDGER_PREVIOUS_TAG" >> '{{output}}'
+ helm template . --include-crds --set "ledger.previous_tag=${LEDGER_PREVIOUS_TAG:?LEDGER_PREVIOUS_TAG must be set}" >> '{{output}}'
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/Justfile` around lines 3 - 5, Remove the redundant helm dependency build invocation after helm dependency update and ensure the LEDGER_PREVIOUS_TAG is quoted or guarded when passed to helm template: delete the `helm dependency build` line, and change the helm template invocation that sets ledger.previous_tag (the current line using $LEDGER_PREVIOUS_TAG) to either wrap the variable in quotes or add a guard to avoid passing an empty value (e.g., check presence of LEDGER_PREVIOUS_TAG before calling helm template or quote the variable in the --set expression) so Helm does not receive an unintended empty string.
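The `${VAR:?message}` guard suggested for the Justfile can be sanity-checked in isolation. This sketch uses a hypothetical `check_tag` helper, not the actual recipe:

```shell
# Hypothetical helper, not part of the Justfile: demonstrates that the
# ${VAR:?message} expansion aborts when VAR is unset or empty, so helm
# never silently receives an empty ledger.previous_tag.
check_tag() {
  # subshell so the :? failure doesn't kill the calling shell
  ( tag="${LEDGER_PREVIOUS_TAG:?LEDGER_PREVIOUS_TAG must be set}" &&
    echo "would run: helm template . --set ledger.previous_tag=$tag" )
}

LEDGER_PREVIOUS_TAG="" check_tag 2>/dev/null || echo "guard rejected empty tag"
LEDGER_PREVIOUS_TAG="v2.1.0" check_tag
```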
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 86-103: In findInBuffer you are indexing bufTx.Transactions[0]
without verifying the slice is non-empty—add a guard (e.g., check
len(bufTx.Transactions) > 0) before using Transactions[0] and skip or remove any
buffer entries with empty Transactions; apply the same defensive checks inside
transactionEventMatches where Transactions[0] is accessed (or modify the
ingestion code that appends into eventBuffer to filter out events with empty
Transactions), ensuring all places that read Transactions[0] first validate
len(... )>0 and handle/skip malformed events.
- Around line 235-251: transactionEventMatches currently dereferences
actual.InsertedAt and indexes event.Transactions[0] without guards; add checks
at the top of transactionEventMatches to return false if actual.InsertedAt is
nil or if len(event.Transactions) == 0, and also ensure the referenced
event.Transactions[0].Postings slice length is validated before indexing
(compare lengths with actual.Postings and bail false if mismatch); keep the rest
of the comparison logic the same but only perform dereference of
actual.InsertedAt and per-posting indexing after these guards.
- Around line 153-174: The current block reads checker.data[0] and
eventTx.Transactions[0] without bounds checks causing panics; before using those
indices, ensure the checker exists and both slices are non-empty: check
len(eventTx.Transactions) > 0 and fetch/ensure checker.data has items (call
checker.fetchNext(ctx, client, eventTx.Ledger, details) if checker.data is empty
and handle any error) then proceed to compare IDs; if eventTx.Transactions is
empty, skip or log and append to checker.eventBuffer as appropriate; keep the
existing loop that calls checker.fetchNext and checker.findInBuffer, and make
sure to store the updated checker back into checkers[eventTx.Ledger].
In `@workload/bin/init/main.go`:
- Around line 67-70: The IsAlreadyExists(err) branch prints "Stack already
exists" without a newline; update the fmt.Printf call inside the
apierrors.IsAlreadyExists(err) block (where fmt.Printf("Stack already exists")
is used) to include a trailing "\n" or replace it with fmt.Println("Stack
already exists") so the message ends with a newline before returning.
In `@workload/internal/events.go`:
- Around line 28-47: TransactionEvent.TxID is a *big.Int and can be nil, but
call sites in events.go call TxID.Cmp(...) without nil checks; either change the
struct to use a non-pointer big.Int (TransactionEvent.TxID big.Int) and update
JSON un/marshalling to produce a zero value when absent, or add nil guards at
each call site in events.go (e.g., if te.TxID == nil { return error/handle }
else use te.TxID.Cmp(...)) to avoid nil-pointer panics and provide a clear
error/assertion message; reference TransactionEvent.TxID and the places in
events.go where TxID.Cmp is called.
In `@workload/internal/state.go`:
- Around line 33-34: The standalone "//nolint:errcheck" above the defer does not
suppress errcheck for the deferred call; update the defer so the linter
directive is on the same line as the call (i.e., change the standalone comment
to an inline comment on the defer statement) so that defer etcdClient.Close()
//nolint:errcheck correctly suppresses errcheck for etcdClient.Close().
---
Nitpick comments:
In `@config/Justfile`:
- Around line 3-5: Remove the redundant helm dependency build invocation after
helm dependency update and ensure the LEDGER_PREVIOUS_TAG is quoted or guarded
when passed to helm template: delete the `helm dependency build` line, and
change the helm template invocation that sets ledger.previous_tag (the current
line using $LEDGER_PREVIOUS_TAG) to either wrap the variable in quotes or add a
guard to avoid passing an empty value (e.g., check presence of
LEDGER_PREVIOUS_TAG before calling helm template or quote the variable in the
--set expression) so Helm does not receive an unintended empty string.
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 25-53: The checkEvents function currently hardcodes the Kafka
bootstrap.servers value and the topic name when creating the consumer and
calling consumer.SubscribeTopics; change checkEvents to read the bootstrap
server address and topic(s) from configuration (e.g., environment variables or
passed parameters) instead of the literal
"formance-kafka-bootstrap.kafka.svc.cluster.local:9092" and "stack0.ledger",
validate they are present, and use those variables in the kafka.NewConsumer
config map and consumer.SubscribeTopics call (also ensure the group.id formation
using random.GetRandom() remains unchanged).
- Around line 105-131: checkers currently stores EventChecker by value which
requires manual write-back after mutations; change the map type from
map[string]EventChecker to map[string]*EventChecker, instantiate entries with
&EventChecker{...} when populating in checkForAllLedgers, and update all usages
(including where you do checker := checkers[eventTx.Ledger]) to work with
pointers so findInBuffer and fetchNext (pointer receivers) mutate the stored
object directly and you can remove the manual write-back
(checkers[eventTx.Ledger] = checker).
Actionable comments posted: 2
🧹 Nitpick comments (2)
workload/bin/cmds/parallel_driver_transaction_metadata/main.go (1)
74-74: Consider using a fresh context for `mutex.Unlock` to guard against context cancellation.
`defer mutex.Unlock(ctx)` re-uses the caller-supplied context. If `ctx` is ever cancelled before the deferred call fires, the etcd Unlock RPC will be skipped and the distributed lock stays held until the session lease expires. Currently harmless (the caller always passes `context.Background()`), but the function signature accepts any context.
♻️ Proposed fix — decouple Unlock from the caller's context:

```diff
- defer mutex.Unlock(ctx) //nolint:errcheck
+ defer mutex.Unlock(context.Background()) //nolint:errcheck
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/parallel_driver_transaction_metadata/main.go` at line 74, The deferred call defer mutex.Unlock(ctx) reuses the caller's ctx and can fail to run if that context is canceled; replace it with a deferred anonymous function that creates a fresh context (e.g., context.WithTimeout(context.Background(), <short timeout>)), defers the cancel, calls mutex.Unlock with that fresh context, and handles/logs the error instead of relying on the caller's ctx; target the mutex.Unlock call and the surrounding function that accepts ctx to implement this change.

workload/bin/cmds/eventually_correct/events.go (1)
29-34: Optional: make the Kafka bootstrap address configurable.
`formance-kafka-bootstrap.kafka.svc.cluster.local:9092` is hardcoded. A mismatch between the deployed cluster name and this string causes a silent Kafka connection failure at test time.
♻️ Suggested refactor:

```diff
- consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
-     "bootstrap.servers": "formance-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ bootstrapServers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
+ if bootstrapServers == "" {
+     bootstrapServers = "formance-kafka-bootstrap.kafka.svc.cluster.local:9092"
+ }
+ consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+     "bootstrap.servers": bootstrapServers,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events.go` around lines 29 - 34, The bootstrap.servers value is hardcoded in the kafka.NewConsumer ConfigMap (the key "bootstrap.servers" in the consumer creation block) which can cause silent failures; change it to read a configurable value (environment variable or CLI flag) with a sensible default (e.g., "formance-kafka-bootstrap.kafka.svc.cluster.local:9092") and use that variable when building the ConfigMap for kafka.NewConsumer, keeping the rest of the options ("group.id", "auto.offset.reset", "enable.auto.commit") unchanged so deployments can override the Kafka address without modifying events.go.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 208-213: The end-of-stream check currently only compares partition
0 (using assignment[0].Partition and pos[0].Offset) which causes early exit for
multi-partition topics; update the loop around the events label to iterate all
assigned partitions (use assignment slice and pos slice) and for each call
consumer.GetWatermarkOffsets(partition) and compare pos[i].Offset against that
partition's high; only break out of events when every assigned partition's
offset >= its high. Ensure you reference and update the logic where
consumer.GetWatermarkOffsets, assignment, pos, and the events loop label are
used so all partitions are considered.
- Around line 247-263: The function transactionEventMatches currently compares
pointer addresses causing a false negative; update transactionEventMatches to
(1) nil-check actual.InsertedAt and event.Transactions[0].InsertedAt and compare
the time values (dereference and use Equal/Compare) instead of pointer
addresses, (2) add nil guards for each posting.Amount in the loop before calling
Amount.Cmp (handle cases where one is nil and the other isn't), and (3) keep the
existing postings length and field equality checks (Asset, Source, Destination)
but perform Amount comparisons only after the nil-safe checks; target the
transactionEventMatches function and fields InsertedAt and Posting.Amount to
implement these changes.
---
Duplicate comments:
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 165-186: This is a duplicate/obsolete bounds-check concern: both
checker.data[0] and eventTx.Transactions[0] are already guarded
(len(checker.data)>0 and ExtractEventPayload validator ensures
eventTx.Transactions non-empty), so resolve the review by removing the duplicate
comment or marking it resolved; no code changes needed in the block that
references checker.data[0], eventTx.Transactions[0], the fetchNext/findInBuffer
loop, or the ExtractEventPayload validator.
- Around line 86-103: The access to Transactions[0].TxID in findInBuffer is safe
because eventBuffer entries are validated by ExtractEventPayload, so add a brief
inline comment in findInBuffer (or immediately above it) stating that
ExtractEventPayload enforces len(tx.Transactions) == 1 and non-nil TxID,
referencing eventBuffer, Transactions[0] and TxID to justify the direct indexing
and silence the prior concern; no runtime guard is needed.
In `@workload/bin/init/main.go`:
- Around line 67-70: The current code correctly uses apierrors.IsAlreadyExists
to make resource creation idempotent; keep the guard as-is and retain the
fmt.Printf("Stack already exists\n") call (with the trailing newline) to log and
return quietly, and ensure all other errors continue to propagate to panic as
the init fail-fast policy (no code changes required to apierrors.IsAlreadyExists
or the fmt.Printf call).
In `@workload/internal/events.go`:
- Line 48: The reviewer notes that the prior nil-TxID concern is already handled
by the validator used in ExtractEventPayload within the consumer, which rejects
events with TxID == nil; update the code/review by removing the redundant
nil-checks or the duplicate review comment and mark the issue as resolved,
referencing TxID and ExtractEventPayload so reviewers know the validation lives
in the consumer path and no change to downstream TxID.Cmp(...) call sites is
necessary.
---
Nitpick comments:
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 29-34: The bootstrap.servers value is hardcoded in the
kafka.NewConsumer ConfigMap (the key "bootstrap.servers" in the consumer
creation block) which can cause silent failures; change it to read a
configurable value (environment variable or CLI flag) with a sensible default
(e.g., "formance-kafka-bootstrap.kafka.svc.cluster.local:9092") and use that
variable when building the ConfigMap for kafka.NewConsumer, keeping the rest of
the options ("group.id", "auto.offset.reset", "enable.auto.commit") unchanged so
deployments can override the Kafka address without modifying events.go.
In `@workload/bin/cmds/parallel_driver_transaction_metadata/main.go`:
- Line 74: The deferred call defer mutex.Unlock(ctx) reuses the caller's ctx and
can fail to run if that context is canceled; replace it with a deferred
anonymous function that creates a fresh context (e.g.,
context.WithTimeout(context.Background(), <short timeout>)), defers the cancel,
calls mutex.Unlock with that fresh context, and handles/logs the error instead
of relying on the caller's ctx; target the mutex.Unlock call and the surrounding
function that accepts ctx to implement this change.
3fbd4a7 to efcd6e0 (compare)
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@workload/bin/cmds/eventually_correct/events.go`:
- Around line 247-260: transactionEventMatches has nil-dereference risks: guard
against nil pointers on the SDK-side `actual` before dereferencing. In the
function transactionEventMatches, first check that actual.InsertedAt != nil and
event.Transactions[0].InsertedAt != nil (or treat missing actual as non-match)
before using *actual.InsertedAt; likewise for each posting ensure posting.Amount
!= nil and event.Transactions[0].Postings[idx].Amount != nil before calling
posting.Amount.Cmp(...); if any required field on `actual` is nil return false
to avoid panics and keep callers' semantics.
- Around line 208-214: The watermark check only inspects partition 0 (using
assignment[0].Partition and pos[0].Offset) which can exit early for
multi-partition topics; update the logic in the loop that uses
GetWatermarkOffsets, assignment and pos so you iterate over all assigned
partitions (e.g., for each element in assignment), call
consumer.GetWatermarkOffsets for each partition, compare the corresponding pos
entry (match by Partition) against that partition's high watermark, and only
break from the events loop when every partition's pos has reached or exceeded
its high watermark (otherwise continue waiting).
Actionable comments posted: 2
🧹 Nitpick comments (1)
workload/bin/cmds/eventually_correct/events/transactions.go (1)
44-44: Consider extracting the hardcoded topic name `"stack0.ledger"` to a constant.
The topic string appears at lines 44 and 210. A shared constant would prevent silent drift if one is updated without the other.
Also applies to: 210-210
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events/transactions.go` at line 44, Extract the hardcoded topic string "stack0.ledger" into a single package-level constant (e.g., LedgerTopic or TopicStack0Ledger) and replace all occurrences (the call to consumer.SubscribeTopics([]string{"stack0.ledger"}, nil) and the other use around the later reference) with that constant; add the constant near the top of transactions.go so both consumer.SubscribeTopics and the other usage reference the same identifier to prevent drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workload/bin/cmds/eventually_correct/events/transactions.go`:
- Line 197: Fix the typo in the inline comment that reads "try to get assigment
if not fetched yet" by changing "assigment" to "assignment" so it reads "try to
get assignment if not fetched yet"; update this comment in
workload/bin/cmds/eventually_correct/events/transactions.go (the comment above
the assignment-fetch logic) to correct the spelling.
- Around line 167-188: The code currently treats a zero-value EventChecker for
unknown eventTx.Ledger as a present checker and will buffer events until it
falsely triggers the ordering assert; fix this by explicitly testing map
membership (e.g., checker, ok := checkers[eventTx.Ledger]) and handling the
missing-case: if !ok, initialize and store a new EventChecker with eventBuffer
containing the event (or otherwise enqueue it in a dedicated pending map) and
continue, otherwise proceed with the existing logic that uses checker.data,
fetchNext, findInBuffer and finally writes back checkers[eventTx.Ledger].
---
Duplicate comments:
In `@workload/bin/cmds/eventually_correct/events/transactions.go`:
- Around line 196-216: The end-of-stream check only inspects partition 0, so
when assignment has multiple partitions the loop can exit early; update the
logic that uses assignment[0].Partition, pos[0].Offset, consumer.Position and
consumer.GetWatermarkOffsets to iterate over all partitions in assignment (or
map positions by partition), fetch each partition's watermark via
GetWatermarkOffsets(topic, partition) and compare the corresponding pos offset
for that partition against its high watermark, and only break out of the events
loop when every partition's position has reached or exceeded its high watermark.
---
Nitpick comments:
In `@workload/bin/cmds/eventually_correct/events/transactions.go`:
- Line 44: Extract the hardcoded topic string "stack0.ledger" into a single
package-level constant (e.g., LedgerTopic or TopicStack0Ledger) and replace all
occurrences (the call to consumer.SubscribeTopics([]string{"stack0.ledger"},
nil) and the other use around the later reference) with that constant; add the
constant near the top of transactions.go so both consumer.SubscribeTopics and
the other usage reference the same identifier to prevent drift.
🧹 Nitpick comments (4)
Justfile (1)
11-11: `docker.io/natsio/nats-box:0.19.2` is confirmed valid, but a newer patch is available.
Docker Hub shows the most recent `nats-box` tag is `0.19.3-nonroot`. Version `0.19.2` is confirmed to exist (a layer digest page is present on Docker Hub), so there's no breakage, but consider pinning to `0.19.3` (or the equivalent non-root variant) if it fits your base image requirements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Justfile` at line 11, The antithesis.images list currently pins docker.io/natsio/nats-box:0.19.2; update this to the newer patch tag (e.g., docker.io/natsio/nats-box:0.19.3 or docker.io/natsio/nats-box:0.19.3-nonroot) in the "antithesis.images" value to pick the latest patch, and verify the nonroot variant fits your base-image/permission requirements before committing the change.

workload/bin/cmds/eventually_correct/events/transactions.go (2)
186-191: Buffer threshold of 16 is a magic number — consider extracting a named constant.
The `> 16` threshold silently controls how much reordering is tolerated before triggering `assert.Unreachable`. A named constant (e.g., `maxEventBufferSize`) would make the tolerance explicit, easier to tune, and self-documenting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events/transactions.go` around lines 186 - 191, Replace the magic literal 16 with a named constant (e.g., maxEventBufferSize) to document and centralize the buffer threshold: define const maxEventBufferSize = 16 near the top of the file or next to the checker type, then change the condition in the transactions handling code from "len(checker.eventBuffer) > 16" to "len(checker.eventBuffer) > maxEventBufferSize" (leaving the append of checker.eventBuffer and the assert.Unreachable call unchanged) so the tolerance is explicit and easy to tune.
253-269: `transactionEventMatches` doesn't compare the transaction ID itself.
The function checks `InsertedAt`, posting count, amounts, and posting fields — but never verifies that the IDs match. The caller at line 95 already matched on ID to locate the buffer entry, so this is currently safe, but the direct-match path at line 173 also calls this function after an ID equality check (line 172). If the call sites ever change, the omission could silently pass mismatched transactions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/events/transactions.go` around lines 253 - 269, transactionEventMatches currently omits verifying the transaction ID, so add a direct equality check between the transaction ID on the shared.V2Transaction parameter and the ID of the first transaction in events.CommittedTransactions (i.e., compare the ID field on actual with event.Transactions[0].ID or the equivalent transaction ID field used in your types) and return false if they differ before proceeding to the existing InsertedAt and postings checks; update the function transactionEventMatches to include this ID comparison as the first guard to ensure mismatched transactions cannot pass.

workload/bin/cmds/eventually_correct/main.go (1)
23-25: No pagination follow-up — only the first page of ledgers is used.
`ListLedgers` is called with `PageSize: 100` but the response cursor's `Next` is never followed. If the environment ever has more than 100 ledgers, the surplus will be silently excluded from both the per-ledger checks and `CheckTransactions`, leading to incomplete verification without any warning.
Probably fine for the current test setup, but worth a brief comment or a log/assertion to surface the assumption.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workload/bin/cmds/eventually_correct/main.go` around lines 23 - 25, The ListLedgers call only fetches the first page (client.Ledger.V2.ListLedgers with operations.V2ListLedgersRequest PageSize:100) so any ledgers beyond the first page are ignored; update the code that populates the ledgers variable to follow the response pagination cursor (check the response's Next/Cursor field and loop calling ListLedgers with that cursor until no more pages), aggregating results into the ledgers slice, or if you intentionally expect <=100 ledgers add an explicit log/assert near the ListLedgers call that validates the total count or documents the assumption.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@workload/bin/cmds/eventually_correct/events/transactions.go`:
- Around line 210-220: The code currently assumes a single partition by calling
consumer.GetWatermarkOffsets and indexing pos[0] for assignment[0]; instead,
iterate over all assigned partitions in assignment, call
consumer.GetWatermarkOffsets("stack0.ledger", partition) for each partition and
compare each partition's position (from consumer.Position(assignment) results)
against its corresponding high watermark, and only break out of the events loop
when every assigned partition's offset has reached or exceeded its high
watermark; update references to pos[0].Offset and assignment[0].Partition to use
the looped partition values so no partition is ignored.
---
Nitpick comments:
In `@Justfile`:
- Line 11: The antithesis.images list currently pins
docker.io/natsio/nats-box:0.19.2; update this to the newer patch tag (e.g.,
docker.io/natsio/nats-box:0.19.3 or docker.io/natsio/nats-box:0.19.3-nonroot) in
the "antithesis.images" value to pick the latest patch, and verify the nonroot
variant fits your base-image/permission requirements before committing the
change.
In `@workload/bin/cmds/eventually_correct/events/transactions.go`:
- Around line 186-191: Replace the magic literal 16 with a named constant (e.g.,
maxEventBufferSize) to document and centralize the buffer threshold: define
const maxEventBufferSize = 16 near the top of the file or next to the checker
type, then change the condition in the transactions handling code from
"len(checker.eventBuffer) > 16" to "len(checker.eventBuffer) >
maxEventBufferSize" (leaving the append of checker.eventBuffer and the
assert.Unreachable call unchanged) so the tolerance is explicit and easy to
tune.
- Around line 253-269: transactionEventMatches currently omits verifying the
transaction ID, so add a direct equality check between the transaction ID on the
shared.V2Transaction parameter and the ID of the first transaction in
events.CommittedTransactions (i.e., compare the ID field on actual with
event.Transactions[0].ID or the equivalent transaction ID field used in your
types) and return false if they differ before proceeding to the existing
InsertedAt and postings checks; update the function transactionEventMatches to
include this ID comparison as the first guard to ensure mismatched transactions
cannot pass.
In `@workload/bin/cmds/eventually_correct/main.go`:
- Around line 23-25: The ListLedgers call only fetches the first page
(client.Ledger.V2.ListLedgers with operations.V2ListLedgersRequest PageSize:100)
so any ledgers beyond the first page are ignored; update the code that populates
the ledgers variable to follow the response pagination cursor (check the
response's Next/Cursor field and loop calling ListLedgers with that cursor until
no more pages), aggregating results into the ledgers slice, or if you
intentionally expect <=100 ledgers add an explicit log/assert near the
ListLedgers call that validates the total count or documents the assumption.
5d7e49a to 1afa15c (compare)
1ab520e to efca139 (compare)
`eventually_correct` to ensure at-least-once & at-most-once delivery of CommittedTransaction events