Skip to content

Commit df1c226

Browse files
committed
salesforce: rework salesforce_sink output to use shared config helpers
Replaces the four hand-rolled auth fields and the flat HTTP transport fields with the shared helpers from config.go: - authFieldSpecs() / NewAuthConfigFromParsed for org_url, client_id, client_secret, api_version. - httpFieldSpec() / newHTTPConfigFromParsed for the nested http namespace; the inline httpclient.Config literal is gone. Breaking config changes (output not yet released): - restapi_version → api_version - request_timeout (flat, 30s) removed; configure under http.timeout (default 5s from httpclient). - max_retries (flat, 10) removed; configure under http.retries (default 3 from httpclient).
1 parent e86a3d5 commit df1c226

1 file changed

Lines changed: 16 additions & 61 deletions

File tree

internal/impl/salesforce/output_salesforce.go

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@ const (
4747
)
4848

4949
const (
50-
sfsFieldOrgURL = "org_url"
51-
sfsFieldClientID = "client_id"
52-
sfsFieldClientSecret = "client_secret"
53-
sfsFieldRESTAPIVersion = "restapi_version"
54-
sfsFieldRequestTimeout = "request_timeout"
55-
sfsFieldMaxRetries = "max_retries"
5650
sfsFieldBulkBatchSize = "bulk_batch_size"
5751
sfsFieldMaxConcurrentBulkJobs = "max_concurrent_bulk_jobs"
5852
sfsFieldBulkPollInterval = "bulk_poll_interval"
@@ -150,7 +144,7 @@ func newSalesforceSinkConfigSpec() *service.ConfigSpec {
150144
Default(false),
151145
).Description("Per-topic Salesforce write configuration. Each entry maps a topic to an SObject and write settings.")
152146

153-
return service.NewConfigSpec().
147+
spec := service.NewConfigSpec().
154148
Summary("Writes messages to Salesforce, routing each topic to its own SObject configuration.").
155149
Description(`Consumes batches of messages and writes them to Salesforce.
156150
@@ -198,23 +192,9 @@ output:
198192
operation: upsert
199193
external_id_field: External_Id__c
200194
mode: bulk
201-
` + "```").
202-
Field(service.NewStringField(sfsFieldOrgURL).
203-
Description("Salesforce instance base URL (e.g., https://your-domain.salesforce.com)")).
204-
Field(service.NewStringField(sfsFieldClientID).
205-
Description("Client ID for the Salesforce Connected App")).
206-
Field(service.NewStringField(sfsFieldClientSecret).
207-
Description("Client Secret for the Salesforce Connected App").
208-
Secret()).
209-
Field(service.NewStringField(sfsFieldRESTAPIVersion).
210-
Description("Salesforce REST API version to use (e.g., v65.0)").
211-
Default("v65.0")).
212-
Field(service.NewDurationField(sfsFieldRequestTimeout).
213-
Description("HTTP request timeout").
214-
Default("30s")).
215-
Field(service.NewIntField(sfsFieldMaxRetries).
216-
Description("Maximum number of retries on 429 Too Many Requests").
217-
Default(10)).
195+
` + "```")
196+
197+
spec = spec.Fields(authFieldSpecs()...).
218198
Field(service.NewIntField(sfsFieldBulkBatchSize).
219199
Description("Number of records per bulk job. Also controls the output batch size.").
220200
Default(defaultBulkBatchSize)).
@@ -230,43 +210,26 @@ output:
230210
Field(service.NewIntField(sfsFieldMaxInFlight).
231211
Description("Maximum number of batches to send concurrently. Increasing this improves realtime write throughput.").
232212
Default(1)).
233-
Field(topicMappingSpec)
213+
Field(topicMappingSpec).
214+
Field(httpFieldSpec())
215+
216+
return spec
234217
}
235218

236219
func newSalesforceSinkOutput(conf *service.ParsedConfig, mgr *service.Resources) (*salesforceSinkOutput, error) {
237220
if err := license.CheckRunningEnterprise(mgr); err != nil {
238221
return nil, err
239222
}
240223

241-
orgURL, err := conf.FieldString(sfsFieldOrgURL)
224+
auth, err := NewAuthConfigFromParsed(conf)
242225
if err != nil {
243226
return nil, err
244227
}
245-
if _, err := url.ParseRequestURI(orgURL); err != nil {
228+
if _, err := url.ParseRequestURI(auth.OrgURL); err != nil {
246229
return nil, errors.New("org_url is not a valid URL")
247230
}
248231

249-
clientID, err := conf.FieldString(sfsFieldClientID)
250-
if err != nil {
251-
return nil, err
252-
}
253-
254-
clientSecret, err := conf.FieldString(sfsFieldClientSecret)
255-
if err != nil {
256-
return nil, err
257-
}
258-
259-
apiVersion, err := conf.FieldString(sfsFieldRESTAPIVersion)
260-
if err != nil {
261-
return nil, err
262-
}
263-
264-
timeout, err := conf.FieldDuration(sfsFieldRequestTimeout)
265-
if err != nil {
266-
return nil, err
267-
}
268-
269-
maxRetries, err := conf.FieldInt(sfsFieldMaxRetries)
232+
httpConf, err := newHTTPConfigFromParsed(auth.OrgURL, conf)
270233
if err != nil {
271234
return nil, err
272235
}
@@ -335,24 +298,16 @@ func newSalesforceSinkOutput(conf *service.ParsedConfig, mgr *service.Resources)
335298
}
336299
}
337300

338-
httpClient, err := httpclient.NewClient(httpclient.Config{
339-
BaseURL: orgURL,
340-
Timeout: timeout,
341-
BackoffMaxRetries: maxRetries,
342-
BackoffInitialInterval: 500 * time.Millisecond,
343-
BackoffMaxInterval: 30 * time.Second,
344-
Transport: httpclient.DefaultTransportConfig(),
345-
MetricPrefix: "salesforce_http",
346-
}, mgr)
301+
httpClient, err := httpclient.NewClient(httpConf, mgr)
347302
if err != nil {
348303
return nil, err
349304
}
350305

351306
sfClient, err := salesforcehttp.NewClient(salesforcehttp.ClientConfig{
352-
OrgURL: orgURL,
353-
ClientID: clientID,
354-
ClientSecret: clientSecret,
355-
APIVersion: apiVersion,
307+
OrgURL: auth.OrgURL,
308+
ClientID: auth.ClientID,
309+
ClientSecret: auth.ClientSecret,
310+
APIVersion: auth.APIVersion,
356311
QueryBatchSize: 2000,
357312
HTTPClient: httpClient,
358313
Logger: mgr.Logger(),

0 commit comments

Comments
 (0)