Conversation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughA new ChainBridge plugin implementation that enables account and balance retrieval from ChainBridge sources. The plugin includes client API communication, configuration validation, state-based account pagination, and comprehensive test coverage across accounts and balances workflows. Changes
Sequence Diagram(s)sequenceDiagram
participant Registry
participant PluginFactory as Plugin Factory
participant Config as Config Validator
participant HTTPClient as HTTP Client
participant ChainBridgeAPI as ChainBridge API
Registry->>PluginFactory: New(name, logger, rawConfig)
PluginFactory->>Config: unmarshalAndValidateConfig(rawConfig)
Config->>Config: Validate APIKey, Endpoint
Config-->>PluginFactory: Config
PluginFactory->>HTTPClient: New(connectorName, apiKey, endpoint)
HTTPClient->>HTTPClient: Configure Bearer Auth Transport
HTTPClient-->>PluginFactory: Client
PluginFactory-->>Registry: Plugin
sequenceDiagram
participant App as Application
participant Plugin
participant Client as HTTP Client
participant ChainBridgeAPI as ChainBridge API
participant StateParser as State Parser
App->>Plugin: FetchNextAccounts(ctx, req{State})
Plugin->>StateParser: Parse req.State (if provided)
StateParser-->>Plugin: accountsState{LastCreatedAt}
Plugin->>Client: GetMonitors(ctx)
Client->>ChainBridgeAPI: GET /monitors
ChainBridgeAPI-->>Client: []*Monitor
Client-->>Plugin: []*Monitor
Plugin->>Plugin: Filter by LastCreatedAt
Plugin->>Plugin: Construct PSPAccount entries
Plugin->>StateParser: Serialize new accountsState
StateParser-->>Plugin: serialized state
Plugin-->>App: FetchNextAccountsResponse{accounts, newState, HasMore=false}
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@internal/connectors/plugins/public/chainbridge/accounts.go`:
- Around line 28-57: The current deduplication assumes monitors are sorted by
CreatedAt: the loop uses oldState.LastCreatedAt to skip entries and then sets
newState.LastCreatedAt from accounts[len(accounts)-1].CreatedAt, which breaks if
monitors isn't ordered. Fix by not relying on order: while iterating monitors
(variable monitors) still filter using oldState.LastCreatedAt, but also track
the maximum CreatedAt seen (e.g., maxCreated := oldState.LastCreatedAt) and
update it for every processed monitor; after the loop set newState.LastCreatedAt
= maxCreated (in the accountsState struct), ensuring correct watermarking
regardless of input order.
In `@internal/connectors/plugins/public/chainbridge/client/client.go`:
- Around line 24-27: The RoundTrip implementation mutates the incoming
*http.Request (req) which violates http.RoundTripper; clone the request before
changing headers: create a new request via req.Clone(req.Context()) or by
shallow-copying req and copying the Header map, set the "Authorization" header
on the cloned request, and then call t.underlying.RoundTrip with the cloned
request (keep apiTransport.RoundTrip, t.apiKey, and t.underlying.RoundTrip as
the referenced symbols).
In `@internal/connectors/plugins/public/chainbridge/plugin_test.go`:
- Around line 44-53: The test is calling Install on the uninitialized plg from
BeforeEach instead of the configured instance returned by New; replace the
discarded result of New("chainbridge", logger, config) with the plugin variable
used in the test (assign the returned plugin to plg or a local variable) and use
that configured instance when calling Install(ctx, req), ensuring you check the
New(...) error before proceeding so the test verifies installation on the
properly configured plugin (referencing New, plg, and Install).
In `@internal/connectors/plugins/public/chainbridge/workflow.go`:
- Around line 5-19: The workflow currently returns two top-level tasks
(workflow(), TASK_FETCH_ACCOUNTS "fetch_monitors" and TASK_FETCH_BALANCES
"fetch_balances") running in parallel; move "fetch_balances" to be a child of
"fetch_monitors" by adding the TASK_FETCH_BALANCES task to the NextTasks slice
of the TASK_FETCH_ACCOUNTS entry (preserve Periodically flags and empty
NextTasks on the leaf) so balances are fetched only after monitors/accounts are
created and avoid orphaned MonitorID references.
| accounts := make([]models.PSPAccount, 0, len(monitors)) | ||
| for _, m := range monitors { | ||
| if !oldState.LastCreatedAt.IsZero() && !m.CreatedAt.After(oldState.LastCreatedAt) { | ||
| continue | ||
| } | ||
|
|
||
| raw, err := json.Marshal(m) | ||
| if err != nil { | ||
| return models.FetchNextAccountsResponse{}, err | ||
| } | ||
|
|
||
| accounts = append(accounts, models.PSPAccount{ | ||
| Reference: m.ID, | ||
| CreatedAt: m.CreatedAt, | ||
| Name: &m.Address, | ||
| Metadata: map[string]string{ | ||
| "chain": m.Chain, | ||
| "address": m.Address, | ||
| "status": m.Status, | ||
| }, | ||
| Raw: raw, | ||
| }) | ||
| } | ||
|
|
||
| newState := accountsState{ | ||
| LastCreatedAt: oldState.LastCreatedAt, | ||
| } | ||
| if len(accounts) > 0 { | ||
| newState.LastCreatedAt = accounts[len(accounts)-1].CreatedAt | ||
| } |
There was a problem hiding this comment.
State-based deduplication assumes monitors are sorted by CreatedAt ascending.
The filtering on Line 30 skips monitors with CreatedAt <= oldState.LastCreatedAt, and Line 56 sets the new watermark from the last element. If the ChainBridge API returns monitors in a different order (e.g., descending, or by ID), accounts will be silently skipped or re-ingested.
Either:
- Sort
monitorsbyCreatedAtbefore iterating, or - Iterate all monitors and track the max
CreatedAtseparately.
Option 2 is more robust:
Proposed fix: track max CreatedAt independently of order
accounts := make([]models.PSPAccount, 0, len(monitors))
+ maxCreatedAt := oldState.LastCreatedAt
for _, m := range monitors {
if !oldState.LastCreatedAt.IsZero() && !m.CreatedAt.After(oldState.LastCreatedAt) {
continue
}
raw, err := json.Marshal(m)
if err != nil {
return models.FetchNextAccountsResponse{}, err
}
accounts = append(accounts, models.PSPAccount{
Reference: m.ID,
CreatedAt: m.CreatedAt,
Name: &m.Address,
Metadata: map[string]string{
"chain": m.Chain,
"address": m.Address,
"status": m.Status,
},
Raw: raw,
})
+ if m.CreatedAt.After(maxCreatedAt) {
+ maxCreatedAt = m.CreatedAt
+ }
}
newState := accountsState{
- LastCreatedAt: oldState.LastCreatedAt,
- }
- if len(accounts) > 0 {
- newState.LastCreatedAt = accounts[len(accounts)-1].CreatedAt
+ LastCreatedAt: maxCreatedAt,
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| accounts := make([]models.PSPAccount, 0, len(monitors)) | |
| for _, m := range monitors { | |
| if !oldState.LastCreatedAt.IsZero() && !m.CreatedAt.After(oldState.LastCreatedAt) { | |
| continue | |
| } | |
| raw, err := json.Marshal(m) | |
| if err != nil { | |
| return models.FetchNextAccountsResponse{}, err | |
| } | |
| accounts = append(accounts, models.PSPAccount{ | |
| Reference: m.ID, | |
| CreatedAt: m.CreatedAt, | |
| Name: &m.Address, | |
| Metadata: map[string]string{ | |
| "chain": m.Chain, | |
| "address": m.Address, | |
| "status": m.Status, | |
| }, | |
| Raw: raw, | |
| }) | |
| } | |
| newState := accountsState{ | |
| LastCreatedAt: oldState.LastCreatedAt, | |
| } | |
| if len(accounts) > 0 { | |
| newState.LastCreatedAt = accounts[len(accounts)-1].CreatedAt | |
| } | |
| accounts := make([]models.PSPAccount, 0, len(monitors)) | |
| maxCreatedAt := oldState.LastCreatedAt | |
| for _, m := range monitors { | |
| if !oldState.LastCreatedAt.IsZero() && !m.CreatedAt.After(oldState.LastCreatedAt) { | |
| continue | |
| } | |
| raw, err := json.Marshal(m) | |
| if err != nil { | |
| return models.FetchNextAccountsResponse{}, err | |
| } | |
| accounts = append(accounts, models.PSPAccount{ | |
| Reference: m.ID, | |
| CreatedAt: m.CreatedAt, | |
| Name: &m.Address, | |
| Metadata: map[string]string{ | |
| "chain": m.Chain, | |
| "address": m.Address, | |
| "status": m.Status, | |
| }, | |
| Raw: raw, | |
| }) | |
| if m.CreatedAt.After(maxCreatedAt) { | |
| maxCreatedAt = m.CreatedAt | |
| } | |
| } | |
| newState := accountsState{ | |
| LastCreatedAt: maxCreatedAt, | |
| } |
🤖 Prompt for AI Agents
In `@internal/connectors/plugins/public/chainbridge/accounts.go` around lines 28 -
57, The current deduplication assumes monitors are sorted by CreatedAt: the loop
uses oldState.LastCreatedAt to skip entries and then sets newState.LastCreatedAt
from accounts[len(accounts)-1].CreatedAt, which breaks if monitors isn't
ordered. Fix by not relying on order: while iterating monitors (variable
monitors) still filter using oldState.LastCreatedAt, but also track the maximum
CreatedAt seen (e.g., maxCreated := oldState.LastCreatedAt) and update it for
every processed monitor; after the loop set newState.LastCreatedAt = maxCreated
(in the accountsState struct), ensuring correct watermarking regardless of input
order.
| func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { | ||
| req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.apiKey)) | ||
| return t.underlying.RoundTrip(req) | ||
| } |
There was a problem hiding this comment.
RoundTrip mutates the incoming request, violating the http.RoundTripper contract.
Per the http.RoundTripper documentation, RoundTrip should not modify the original request. Mutating headers on the original *http.Request can cause data races if the request is reused or read concurrently (e.g., by retry logic or redirect handling).
Proposed fix: clone the request before modifying headers
func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
- req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.apiKey))
- return t.underlying.RoundTrip(req)
+ r := req.Clone(req.Context())
+ r.Header.Set("Authorization", fmt.Sprintf("Bearer %s", t.apiKey))
+ return t.underlying.RoundTrip(r)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { | |
| req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.apiKey)) | |
| return t.underlying.RoundTrip(req) | |
| } | |
| func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { | |
| r := req.Clone(req.Context()) | |
| r.Header.Set("Authorization", fmt.Sprintf("Bearer %s", t.apiKey)) | |
| return t.underlying.RoundTrip(r) | |
| } |
🤖 Prompt for AI Agents
In `@internal/connectors/plugins/public/chainbridge/client/client.go` around lines
24 - 27, The RoundTrip implementation mutates the incoming *http.Request (req)
which violates http.RoundTripper; clone the request before changing headers:
create a new request via req.Clone(req.Context()) or by shallow-copying req and
copying the Header map, set the "Authorization" header on the cloned request,
and then call t.underlying.RoundTrip with the cloned request (keep
apiTransport.RoundTrip, t.apiKey, and t.underlying.RoundTrip as the referenced
symbols).
| It("should return valid install response", func(ctx SpecContext) { | ||
| config := json.RawMessage(`{"apiKey": "test", "endpoint": "https://example.com"}`) | ||
| _, err := New("chainbridge", logger, config) | ||
| Expect(err).To(BeNil()) | ||
| req := models.InstallRequest{} | ||
| res, err := plg.Install(ctx, req) | ||
| Expect(err).To(BeNil()) | ||
| Expect(len(res.Workflow) > 0).To(BeTrue()) | ||
| Expect(res.Workflow).To(Equal(workflow())) | ||
| }) |
There was a problem hiding this comment.
Install test uses the wrong plugin instance.
New("chainbridge", logger, config) creates a fully configured plugin, but the result is discarded. plg.Install(ctx, req) is then called on the bare plg from BeforeEach (no client, no config). While this may work if Install only returns the workflow, the test doesn't actually verify that a properly configured plugin installs correctly.
Suggested fix
It("should return valid install response", func(ctx SpecContext) {
config := json.RawMessage(`{"apiKey": "test", "endpoint": "https://example.com"}`)
- _, err := New("chainbridge", logger, config)
+ p, err := New("chainbridge", logger, config)
Expect(err).To(BeNil())
req := models.InstallRequest{}
- res, err := plg.Install(ctx, req)
+ res, err := p.Install(ctx, req)
Expect(err).To(BeNil())
Expect(len(res.Workflow) > 0).To(BeTrue())
Expect(res.Workflow).To(Equal(workflow()))
})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| It("should return valid install response", func(ctx SpecContext) { | |
| config := json.RawMessage(`{"apiKey": "test", "endpoint": "https://example.com"}`) | |
| _, err := New("chainbridge", logger, config) | |
| Expect(err).To(BeNil()) | |
| req := models.InstallRequest{} | |
| res, err := plg.Install(ctx, req) | |
| Expect(err).To(BeNil()) | |
| Expect(len(res.Workflow) > 0).To(BeTrue()) | |
| Expect(res.Workflow).To(Equal(workflow())) | |
| }) | |
| It("should return valid install response", func(ctx SpecContext) { | |
| config := json.RawMessage(`{"apiKey": "test", "endpoint": "https://example.com"}`) | |
| p, err := New("chainbridge", logger, config) | |
| Expect(err).To(BeNil()) | |
| req := models.InstallRequest{} | |
| res, err := p.Install(ctx, req) | |
| Expect(err).To(BeNil()) | |
| Expect(len(res.Workflow) > 0).To(BeTrue()) | |
| Expect(res.Workflow).To(Equal(workflow())) | |
| }) |
🤖 Prompt for AI Agents
In `@internal/connectors/plugins/public/chainbridge/plugin_test.go` around lines
44 - 53, The test is calling Install on the uninitialized plg from BeforeEach
instead of the configured instance returned by New; replace the discarded result
of New("chainbridge", logger, config) with the plugin variable used in the test
(assign the returned plugin to plg or a local variable) and use that configured
instance when calling Install(ctx, req), ensuring you check the New(...) error
before proceeding so the test verifies installation on the properly configured
plugin (referencing New, plg, and Install).
| func workflow() models.ConnectorTasksTree { | ||
| return []models.ConnectorTaskTree{ | ||
| { | ||
| TaskType: models.TASK_FETCH_ACCOUNTS, | ||
| Name: "fetch_monitors", | ||
| Periodically: true, | ||
| NextTasks: []models.ConnectorTaskTree{}, | ||
| }, | ||
| { | ||
| TaskType: models.TASK_FETCH_BALANCES, | ||
| Name: "fetch_balances", | ||
| Periodically: true, | ||
| NextTasks: []models.ConnectorTaskTree{}, | ||
| }, | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
rg -n "TASK_FETCH_BALANCES" --type=go -B5 -A5 -g '*/workflow.go' | head -100Repository: formancehq/payments
Length of output: 211
🏁 Script executed:
# Also check the chainbridge connector's balance and account handling logic
fd -e go chainbridge | grep -E "(balance|account)" | head -20Repository: formancehq/payments
Length of output: 45
🏁 Script executed:
# Look at the chainbridge workflow file more completely
fd workflow.go -path "*/chainbridge/*"Repository: formancehq/payments
Length of output: 231
🏁 Script executed:
# Find all workflow.go files in the connectors directory
find . -path "*/connectors/plugins/public/*/workflow.go" -type fRepository: formancehq/payments
Length of output: 1151
🏁 Script executed:
# Search for TASK_FETCH_BALANCES in go files without type restriction
rg "TASK_FETCH_BALANCES" -B5 -A5 internal/connectors/plugins/public/Repository: formancehq/payments
Length of output: 14128
🏁 Script executed:
# Look at the chainbridge workflow file directly
cat internal/connectors/plugins/public/chainbridge/workflow.goRepository: formancehq/payments
Length of output: 543
🏁 Script executed:
cat internal/connectors/plugins/public/chainbridge/balances.go | head -80Repository: formancehq/payments
Length of output: 1072
🏁 Script executed:
cat internal/connectors/plugins/public/chainbridge/accounts.go | head -80Repository: formancehq/payments
Length of output: 1621
Nest fetch_balances under fetch_monitors to ensure account creation before balance fetching.
Balances reference monitors via MonitorID as AccountReference (line 28 in balances.go). With parallel task execution, balances could arrive before their corresponding accounts are created, causing orphaned balance references. All other connectors (qonto, wise, modulr, stripe, increase, generic, fireblocks, etc.) nest balance fetching under account fetching for this reason.
Suggested change
func workflow() models.ConnectorTasksTree {
return []models.ConnectorTaskTree{
{
TaskType: models.TASK_FETCH_ACCOUNTS,
Name: "fetch_monitors",
Periodically: true,
- NextTasks: []models.ConnectorTaskTree{},
- },
- {
- TaskType: models.TASK_FETCH_BALANCES,
- Name: "fetch_balances",
- Periodically: true,
- NextTasks: []models.ConnectorTaskTree{},
+ NextTasks: []models.ConnectorTaskTree{
+ {
+ TaskType: models.TASK_FETCH_BALANCES,
+ Name: "fetch_balances",
+ Periodically: true,
+ NextTasks: []models.ConnectorTaskTree{},
+ },
+ },
},
}
}🤖 Prompt for AI Agents
In `@internal/connectors/plugins/public/chainbridge/workflow.go` around lines 5 -
19, The workflow currently returns two top-level tasks (workflow(),
TASK_FETCH_ACCOUNTS "fetch_monitors" and TASK_FETCH_BALANCES "fetch_balances")
running in parallel; move "fetch_balances" to be a child of "fetch_monitors" by
adding the TASK_FETCH_BALANCES task to the NextTasks slice of the
TASK_FETCH_ACCOUNTS entry (preserve Periodically flags and empty NextTasks on
the leaf) so balances are fetched only after monitors/accounts are created and
avoid orphaned MonitorID references.
Chain Bridge now emits amounts as JSON numbers. Update the client to parse directly into *big.Int, removing the ParseAmount string helper. Also adds a README for the chainbridge connector. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Test plan
🤖 Generated with Claude Code