salesforce: introduce inputs and align sink output#4355
Conversation
6c73578 to
6885768
Compare
| func (e *salesforceCDCInputExecutor) saveState(ctx context.Context) error { | ||
| e.mu.Lock() | ||
| state := persistState{ | ||
| SnapshotComplete: e.snapshotComplete, | ||
| RestCursor: e.restCursor, | ||
| Topics: make(TopicReplays, len(e.topicReplays)), | ||
| } | ||
| maps.Copy(state.Topics, e.topicReplays) | ||
| prev := e.lastPersisted | ||
| e.mu.Unlock() | ||
|
|
||
| b, err := json.Marshal(state) | ||
| if err != nil { | ||
| return fmt.Errorf("marshal checkpoint state: %w", err) | ||
| } | ||
| if bytes.Equal(prev, b) { | ||
| return nil | ||
| } | ||
|
|
||
| var setErr error | ||
| if err := e.mgr.AccessCache(ctx, e.conf.Checkpoint.Cache, func(cache service.Cache) { | ||
| setErr = cache.Set(ctx, e.conf.Checkpoint.CacheKey, b, nil) | ||
| }); err != nil { | ||
| return fmt.Errorf("access cache %q: %w", e.conf.Checkpoint.Cache, err) | ||
| } | ||
| if setErr != nil { | ||
| return fmt.Errorf("set cache key %q: %w", e.conf.Checkpoint.CacheKey, setErr) | ||
| } | ||
|
|
||
| e.mu.Lock() | ||
| e.lastPersisted = b | ||
| e.mu.Unlock() | ||
| return nil | ||
| } |
There was a problem hiding this comment.
saveState releases e.mu between snapshotting in-memory state and writing to the cache. When multiple per-topic ack callbacks run concurrently (which they will — every topic goroutine calls this independently), two concurrent calls can:
- G1 reads state S1, releases lock, starts
cache.Set(b1). - G2 reads state S2 (newer, contains an additional topic's replay), releases lock, starts
cache.Set(b2). - The cache writes can complete in either order. If
b1lands last, the cache holds older state than memory. - After the writes, G1 sets
lastPersisted = b1and G2 setslastPersisted = b2. End state:lastPersisted = b2(newer) but cache =b1(older). - On the next
saveStatecall where the in-memory state still equals S2,bytes.Equal(prev, b)returns true and the write is skipped — so the cache permanently lags memory until the in-memory state changes again.
The result is regressed replay-IDs in the persisted checkpoint: on restart a topic loses progress and replays already-acked events. To make this safe, hold e.mu for the duration of the cache write, or serialize saveState calls through a single writer goroutine.
There was a problem hiding this comment.
Introduces state type and variable
6885768 to
ef0f91d
Compare
| Use a different Salesforce input instead if: | ||
|
|
||
| - You need continuous change events — use xref:components:inputs/salesforce_cdc.adoc[` + "`salesforce_cdc`" + `]. | ||
| - You need Platform Events or channel subscriptions — use xref:components:inputs/salesforce_platform_events.adoc[` + "`salesforce_platform_events`" + `]. |
There was a problem hiding this comment.
This xref points to salesforce_platform_events.adoc, but no salesforce_platform_events component is added in this PR (only salesforce, salesforce_cdc, and salesforce_graphql are registered). The reference is propagated into the generated docs/modules/components/pages/inputs/salesforce.adoc and renders as a broken AsciiDoc link. Either drop the bullet or point readers at salesforce_cdc (which already handles /event/... Platform Event topics).
|
|
||
| - You only need single-object SELECTs — use xref:components:inputs/salesforce.adoc[` + "`salesforce`" + `] (simpler, no GraphQL schema knowledge needed). | ||
| - You need continuous change events — use xref:components:inputs/salesforce_cdc.adoc[` + "`salesforce_cdc`" + `]. | ||
| - You need Platform Events or channels — use xref:components:inputs/salesforce_platform_events.adoc[` + "`salesforce_platform_events`" + `]. |
There was a problem hiding this comment.
Same as in input_salesforce.go: this xref points at salesforce_platform_events.adoc for a component that does not exist in this PR. The reference flows into docs/modules/components/pages/inputs/salesforce_graphql.adoc and renders as a broken AsciiDoc link. Replace the pointer with salesforce_cdc (which subscribes to /event/... Platform Event topics) or remove the bullet.
| ./app/setup.sh > /dev/null | ||
| ``` | ||
|
|
||
| When `SALESFORCE_CLIENT_SECRET` is set, `setup.sh` probes the `client_credentials` OAuth flow end-to-end. `OAuth: OK` on stderr means the integration tests (`internal/impl/salesforce/processor_salesforce_integration_test.go`) can run against this org. Any other response is printed verbatim and the script exits non-zero. |
There was a problem hiding this comment.
This sentence references internal/impl/salesforce/processor_salesforce_integration_test.go, but the processor (and all its tests) is removed in this PR. The integration tests now live in input_salesforce_integration_test.go, input_salesforce_cdc_integration_test.go, and input_salesforce_graphql_integration_test.go. Update the file reference so the README doesn't point at a deleted file.
|
Commits Review
|
ef0f91d to
9ff8765
Compare
| salesforce ,input ,Salesforce ,0.0.0 ,enterprise ,n ,n ,n | ||
| salesforce_cdc ,input ,Salesforce ,0.0.0 ,enterprise ,n ,n ,n | ||
| salesforce_graphql ,input ,Salesforce ,0.0.0 ,enterprise ,n ,y ,y |
There was a problem hiding this comment.
The cloud / cloud_with_gpu columns are inconsistent across the new salesforce inputs: salesforce and salesforce_cdc are n,n but salesforce_graphql is y,y. salesforce_cdc reasonably differs because it depends on a checkpoint_cache resource, but salesforce and salesforce_graphql are both stateless one-shot extracts that share the same auth/HTTP plumbing — they should match. Either both should be cloud-eligible or neither should. Worth verifying which is intended; the schema filter uses these flags to gate availability in the cloud distribution.
|
Commits
Review One inline issue: the connect/internal/plugins/info.csv Lines 244 to 247 in 9ff8765 |
| ristretto ,cache ,Ristretto ,0.0.0 ,community ,n ,y ,y | ||
| salesforce ,processor ,Salesforce ,4.0.0 ,enterprise ,n ,y ,y | ||
| salesforce ,input ,Salesforce ,0.0.0 ,enterprise ,n ,n ,n | ||
| salesforce_cdc ,input ,Salesforce ,0.0.0 ,enterprise ,n ,n ,n |
There was a problem hiding this comment.
please enable all 3 for cloud
Adds setup.sh (and Metadata API descriptors) under app/ that deploys a Connected App via sf CLI and prints SALESFORCE_ORG_URL and SALESFORCE_CLIENT_ID. SALESFORCE_CLIENT_SECRET is not exposed by the Salesforce API and must be copied from the UI. Updates the README with the automated flow and splits manual steps into their own section.
GetNextBatchParallel can return a non-empty batch together with done=true when the last SObject's final page arrives in the same call. The snapshot completion path discarded that batch and returned nil, silently dropping records — most visible with rest_objects scoped to a single SObject small enough to finish in one parallel-fetch cycle.
…tp.Client Adds GraphQLWithVariables and GraphQLQueryPageWithVariables so callers can forward GraphQL variables alongside the query. The original GraphQL and GraphQLQueryPage methods now delegate to the new variants, preserving backward compatibility.
… Client Introduces Events(), WaitReady(ctx), and a subscribeSettleDelay-gated ready channel so callers can wait for the Pub/Sub subscription to be fully routed server-side before publishing events. Without the settle delay, events emitted immediately after Send returns are silently dropped on ReplayPreset_LATEST because Salesforce's server-side subscription routing lags the gRPC-level stream acknowledgement.
…e subscriptions Splits subscription stream management out of salesforcegrpc.Client into a standalone Subscription type. A single Client can now own multiple Subscriptions sharing one connection, auth, and schema cache, instead of being limited to one subscribe stream per Client. Also moves Avro decoding helpers to avro.go and shared types to types.go.
Introduces salesforce input family: - salesforce: extracts the SOQL builder into soql.go with tests and pulls SalesforceInputConfig parsing into a reusable struct. - salesforce_graphql: extracts the config struct and introduces Bloblang-evaluated variables for the GraphQL query. - salesforce_cdc: supports multiple topics on one Pub/Sub gRPC client (one Subscription per topic), accepts CDC sObject shorthand, full CDC paths, the /data/ChangeEvents firehose, and /event/* Platform Events. Introduces a snapshot-then-stream phase backed by REST and a durable per-topic replay-ID checkpoint (TopicReplays). Adds integration tests and a RPCN_Test__e Platform Event definition under app/objects for the SFDX scratch org used by the test suite.
Replaces the four hand-rolled auth fields and the flat HTTP transport fields with the shared helpers from config.go: - authFieldSpecs() / NewAuthConfigFromParsed for org_url, client_id, client_secret, api_version. - httpFieldSpec() / newHTTPConfigFromParsed for the nested http namespace; the inline httpclient.Config literal is gone. Breaking config changes (output not yet released): - restapi_version → api_version - request_timeout (flat, 30s) removed; configure under http.timeout (default 5s from httpclient). - max_retries (flat, 10) removed; configure under http.retries (default 3 from httpclient).
|
Commits Review LGTM |
Summary
salesforce(SOQL one-shot),salesforce_cdc(multi-topic Pub/Sub: CDC, firehose, Platform Events, with REST snapshot phase and per-topic replay-ID checkpoints), andsalesforce_graphql(with Bloblang-evaluated variables).Subscriptiontype fromsalesforcegrpc.Clientso one Client can own multiple subscriptions sharing a single connection, auth, and schema cache.salesforce_sinkoutput to use the sharedauthFieldSpecs()/httpFieldSpec()helpers fromconfig.go. Breaking config rename:restapi_version→api_version; flatrequest_timeoutandmax_retriesmove under the nestedhttpnamespace.salesforcehttphelpers (SObjectPath, variables-aware GraphQL methods) and aRPCN_Test__ePlatform Event definition for the SFDX scratch org used by integration tests.