1
1
package config
2
2
3
3
import (
4
+ "encoding/json"
4
5
"errors"
5
6
"fmt"
7
+ . "github.com/formancehq/go-libs/v2/collectionutils"
6
8
pulumi_ledger "github.com/formancehq/ledger/deployments/pulumi/pkg"
7
9
"github.com/formancehq/ledger/deployments/pulumi/pkg/api"
10
+ "github.com/formancehq/ledger/deployments/pulumi/pkg/connectors"
8
11
"github.com/formancehq/ledger/deployments/pulumi/pkg/storage"
9
12
"github.com/formancehq/ledger/deployments/pulumi/pkg/utils"
10
13
"github.com/formancehq/ledger/deployments/pulumi/pkg/worker"
11
14
"github.com/pulumi/pulumi-aws/sdk/v6/go/aws/rds"
12
15
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
13
16
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
14
17
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
18
+ "reflect"
15
19
"time"
16
20
)
17
21
@@ -248,25 +252,66 @@ func (d API) toInput() api.Args {
248
252
}
249
253
}
250
254
251
- type Worker struct {
252
- Connectors map [string ]map [string ]any `json:"connectors" yaml:"connectors"`
255
+ type Connector struct {
256
+ Driver string `json:"driver" yaml:"driver"`
257
+ Config any `json:"config" yaml:"config"`
253
258
}
254
259
255
- func (w Worker ) toInput () worker.Args {
256
- connectors := make (map [string ]pulumi.Map )
257
- for name , rawConfig := range w .Connectors {
258
- convertedConfig := pulumi.Map {}
259
- for k , v := range rawConfig {
260
- convertedConfig [k ] = pulumi .Any (v )
260
+ func (c Connector ) toInput () connectors.ConnectorArgs {
261
+ return connectors.ConnectorArgs {
262
+ Driver : c .Driver ,
263
+ Config : c .Config ,
264
+ }
265
+ }
266
+
267
+ type Connectors map [string ]Connector
268
+
269
+ func (c * Connectors ) UnmarshalJSON (data []byte ) error {
270
+ asMap := make (map [string ]json.RawMessage , 0 )
271
+ if err := json .Unmarshal (data , & asMap ); err != nil {
272
+ return fmt .Errorf ("error unmarshalling connectors into an array: %w" , err )
273
+ }
274
+
275
+ * c = make (map [string ]Connector )
276
+ for id , elem := range asMap {
277
+ type def struct {
278
+ Driver string `json:"driver" yaml:"driver"`
279
+ }
280
+ d := def {}
281
+ if err := json .Unmarshal (elem , & d ); err != nil {
282
+ return fmt .Errorf ("error unmarshalling connector definition %s: %w" , id , err )
283
+ }
284
+
285
+ cfg , err := connectors .GetConnectorConfig (d .Driver )
286
+ if err != nil {
287
+ return err
288
+ }
289
+
290
+ if err := json .Unmarshal (elem , cfg ); err != nil {
291
+ return fmt .Errorf ("error unmarshalling connector config %s: %w" , id , err )
292
+ }
293
+
294
+ (* c )[id ] = Connector {
295
+ Driver : d .Driver ,
296
+ Config : reflect .ValueOf (cfg ).Elem ().Interface (),
261
297
}
262
- connectors [name ] = convertedConfig
263
298
}
264
299
265
- return worker.Args {
266
- Connectors : connectors ,
300
+ return nil
301
+ }
302
+
303
+ func (c * Connectors ) toInput () connectors.Args {
304
+ return connectors.Args {
305
+ Connectors : ConvertMap (* c , Connector .toInput ),
267
306
}
268
307
}
269
308
309
+ type Worker struct {}
310
+
311
+ func (w Worker ) toInput () worker.Args {
312
+ return worker.Args {}
313
+ }
314
+
270
315
type Monitoring struct {
271
316
// ResourceAttributes is the resource attributes for OpenTelemetry
272
317
ResourceAttributes map [string ]string `json:"resource-attributes" yaml:"resource-attributes"`
@@ -431,6 +476,9 @@ type Config struct {
431
476
// Worker is the worker configuration for the ledger
432
477
Worker * Worker `json:"worker" yaml:"worker"`
433
478
479
+ // Connectors is the connectors configuration for the ledger
480
+ Connectors Connectors `json:"connectors" yaml:"connectors"`
481
+
434
482
// Ingress is the ingress configuration for the ledger
435
483
Ingress * Ingress `json:"ingress" yaml:"ingress"`
436
484
@@ -446,10 +494,11 @@ func (cfg Config) ToInput() pulumi_ledger.ComponentArgs {
446
494
CommonArgs : cfg .Common .toInput (),
447
495
Database : cfg .Storage .toInput (),
448
496
API : cfg .API .toInput (),
449
- Worker : cfg .Worker .toInput (),
497
+ Worker : cfg .Worker .toInput (),
450
498
Timeout : pulumix .Val (cfg .Timeout ),
451
499
Ingress : cfg .Ingress .toInput (),
452
500
InstallDevBox : pulumix .Val (cfg .InstallDevBox ),
501
+ Connectors : cfg .Connectors .toInput (),
453
502
}
454
503
}
455
504
@@ -494,6 +543,13 @@ func Load(ctx *pulumi.Context) (*Config, error) {
494
543
}
495
544
}
496
545
546
+ connectors := Connectors {}
547
+ if err := config .GetObject (ctx , "connectors" , & connectors ); err != nil {
548
+ if ! errors .Is (err , config .ErrMissingVar ) {
549
+ return nil , err
550
+ }
551
+ }
552
+
497
553
otel := & Monitoring {}
498
554
if err := config .GetObject (ctx , "monitoring" , otel ); err != nil {
499
555
if ! errors .Is (err , config .ErrMissingVar ) {
@@ -512,7 +568,8 @@ func Load(ctx *pulumi.Context) (*Config, error) {
512
568
InstallDevBox : config .GetBool (ctx , "install-dev-box" ),
513
569
Storage : storage ,
514
570
API : api ,
515
- Worker : worker ,
571
+ Worker : worker ,
516
572
Ingress : ingress ,
573
+ Connectors : connectors ,
517
574
}, nil
518
575
}
0 commit comments