Skip to content

Commit 413465b

Browse files
committed
feat: add clickhouse on pulumi
1 parent 3b0b0f9 commit 413465b

File tree

11 files changed

+494
-73
lines changed

11 files changed

+494
-73
lines changed

deployments/pulumi/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"github.com/formancehq/ledger/deployments/pulumi/pkg"
55
"github.com/formancehq/ledger/deployments/pulumi/pkg/config"
66
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
7+
8+
_ "github.com/formancehq/ledger/deployments/pulumi/pkg/connectors/clickhouse"
79
)
810

911
func main() {

deployments/pulumi/pkg/component.go

+24-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package ledger
33
import (
44
"fmt"
55
"github.com/formancehq/ledger/deployments/pulumi/pkg/api"
6+
"github.com/formancehq/ledger/deployments/pulumi/pkg/connectors"
67
"github.com/formancehq/ledger/deployments/pulumi/pkg/devbox"
8+
"github.com/formancehq/ledger/deployments/pulumi/pkg/initialize"
79
"github.com/formancehq/ledger/deployments/pulumi/pkg/storage"
810
"github.com/formancehq/ledger/deployments/pulumi/pkg/utils"
911
"github.com/formancehq/ledger/deployments/pulumi/pkg/worker"
@@ -20,6 +22,7 @@ type ComponentArgs struct {
2022
InstallDevBox pulumix.Input[bool]
2123
Database storage.DatabaseArgs
2224
API api.Args
25+
Connectors connectors.Args
2326
Worker worker.Args
2427
Ingress *api.IngressArgs
2528
}
@@ -34,11 +37,13 @@ func (args *ComponentArgs) SetDefaults() {
3437
type Component struct {
3538
pulumi.ResourceState
3639

37-
API *api.Component
38-
Worker *worker.Component
39-
Storage *storage.Component
40-
Namespace *corev1.Namespace
41-
Devbox *devbox.Component
40+
API *api.Component
41+
Worker *worker.Component
42+
Storage *storage.Component
43+
Namespace *corev1.Namespace
44+
Devbox *devbox.Component
45+
Connectors *connectors.Component
46+
Initialize *initialize.Component
4247
}
4348

4449
func NewComponent(ctx *pulumi.Context, name string, args ComponentArgs, opts ...pulumi.ResourceOption) (*Component, error) {
@@ -96,6 +101,14 @@ func NewComponent(ctx *pulumi.Context, name string, args ComponentArgs, opts ...
96101
return nil, err
97102
}
98103

104+
cmp.Connectors, err = connectors.NewComponent(ctx, "connectors", connectors.ComponentArgs{
105+
CommonArgs: args.CommonArgs,
106+
Args: args.Connectors,
107+
})
108+
if err != nil {
109+
return nil, err
110+
}
111+
99112
cmp.Worker, err = worker.NewComponent(ctx, "worker", worker.ComponentArgs{
100113
CommonArgs: args.CommonArgs,
101114
Args: args.Worker,
@@ -106,6 +119,12 @@ func NewComponent(ctx *pulumi.Context, name string, args ComponentArgs, opts ...
106119
return nil, err
107120
}
108121

122+
cmp.Initialize, err = initialize.NewComponent(ctx, "initialize", initialize.ComponentArgs{
123+
CommonArgs: args.CommonArgs,
124+
Connectors: cmp.Connectors,
125+
API: cmp.API,
126+
}, options...)
127+
109128
installDevBox, err := internals.UnsafeAwaitOutput(ctx.Context(), args.InstallDevBox.ToOutput(ctx.Context()))
110129
if err != nil {
111130
return nil, err

deployments/pulumi/pkg/config/config.go

+70-13
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package config
22

33
import (
4+
"encoding/json"
45
"errors"
56
"fmt"
7+
. "github.com/formancehq/go-libs/v2/collectionutils"
68
pulumi_ledger "github.com/formancehq/ledger/deployments/pulumi/pkg"
79
"github.com/formancehq/ledger/deployments/pulumi/pkg/api"
10+
"github.com/formancehq/ledger/deployments/pulumi/pkg/connectors"
811
"github.com/formancehq/ledger/deployments/pulumi/pkg/storage"
912
"github.com/formancehq/ledger/deployments/pulumi/pkg/utils"
1013
"github.com/formancehq/ledger/deployments/pulumi/pkg/worker"
1114
"github.com/pulumi/pulumi-aws/sdk/v6/go/aws/rds"
1215
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
1316
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
1417
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
18+
"reflect"
1519
"time"
1620
)
1721

@@ -248,25 +252,66 @@ func (d API) toInput() api.Args {
248252
}
249253
}
250254

251-
type Worker struct {
252-
Connectors map[string]map[string]any `json:"connectors" yaml:"connectors"`
255+
type Connector struct {
256+
Driver string `json:"driver" yaml:"driver"`
257+
Config any `json:"config" yaml:"config"`
253258
}
254259

255-
func (w Worker) toInput() worker.Args {
256-
connectors := make(map[string]pulumi.Map)
257-
for name, rawConfig := range w.Connectors {
258-
convertedConfig := pulumi.Map{}
259-
for k, v := range rawConfig {
260-
convertedConfig[k] = pulumi.Any(v)
260+
func (c Connector) toInput() connectors.ConnectorArgs {
261+
return connectors.ConnectorArgs{
262+
Driver: c.Driver,
263+
Config: c.Config,
264+
}
265+
}
266+
267+
type Connectors map[string]Connector
268+
269+
func (c *Connectors) UnmarshalJSON(data []byte) error {
270+
asMap := make(map[string]json.RawMessage, 0)
271+
if err := json.Unmarshal(data, &asMap); err != nil {
272+
return fmt.Errorf("error unmarshalling connectors into an array: %w", err)
273+
}
274+
275+
*c = make(map[string]Connector)
276+
for id, elem := range asMap {
277+
type def struct {
278+
Driver string `json:"driver" yaml:"driver"`
279+
}
280+
d := def{}
281+
if err := json.Unmarshal(elem, &d); err != nil {
282+
return fmt.Errorf("error unmarshalling connector definition %s: %w", id, err)
283+
}
284+
285+
cfg, err := connectors.GetConnectorConfig(d.Driver)
286+
if err != nil {
287+
return err
288+
}
289+
290+
if err := json.Unmarshal(elem, cfg); err != nil {
291+
return fmt.Errorf("error unmarshalling connector config %s: %w", id, err)
292+
}
293+
294+
(*c)[id] = Connector{
295+
Driver: d.Driver,
296+
Config: reflect.ValueOf(cfg).Elem().Interface(),
261297
}
262-
connectors[name] = convertedConfig
263298
}
264299

265-
return worker.Args{
266-
Connectors: connectors,
300+
return nil
301+
}
302+
303+
func (c *Connectors) toInput() connectors.Args {
304+
return connectors.Args{
305+
Connectors: ConvertMap(*c, Connector.toInput),
267306
}
268307
}
269308

309+
type Worker struct{}
310+
311+
func (w Worker) toInput() worker.Args {
312+
return worker.Args{}
313+
}
314+
270315
type Monitoring struct {
271316
// ResourceAttributes is the resource attributes for OpenTelemetry
272317
ResourceAttributes map[string]string `json:"resource-attributes" yaml:"resource-attributes"`
@@ -431,6 +476,9 @@ type Config struct {
431476
// Worker is the worker configuration for the ledger
432477
Worker *Worker `json:"worker" yaml:"worker"`
433478

479+
// Connectors is the connectors configuration for the ledger
480+
Connectors Connectors `json:"connectors" yaml:"connectors"`
481+
434482
// Ingress is the ingress configuration for the ledger
435483
Ingress *Ingress `json:"ingress" yaml:"ingress"`
436484

@@ -446,10 +494,11 @@ func (cfg Config) ToInput() pulumi_ledger.ComponentArgs {
446494
CommonArgs: cfg.Common.toInput(),
447495
Database: cfg.Storage.toInput(),
448496
API: cfg.API.toInput(),
449-
Worker: cfg.Worker.toInput(),
497+
Worker: cfg.Worker.toInput(),
450498
Timeout: pulumix.Val(cfg.Timeout),
451499
Ingress: cfg.Ingress.toInput(),
452500
InstallDevBox: pulumix.Val(cfg.InstallDevBox),
501+
Connectors: cfg.Connectors.toInput(),
453502
}
454503
}
455504

@@ -494,6 +543,13 @@ func Load(ctx *pulumi.Context) (*Config, error) {
494543
}
495544
}
496545

546+
connectors := Connectors{}
547+
if err := config.GetObject(ctx, "connectors", &connectors); err != nil {
548+
if !errors.Is(err, config.ErrMissingVar) {
549+
return nil, err
550+
}
551+
}
552+
497553
otel := &Monitoring{}
498554
if err := config.GetObject(ctx, "monitoring", otel); err != nil {
499555
if !errors.Is(err, config.ErrMissingVar) {
@@ -512,7 +568,8 @@ func Load(ctx *pulumi.Context) (*Config, error) {
512568
InstallDevBox: config.GetBool(ctx, "install-dev-box"),
513569
Storage: storage,
514570
API: api,
515-
Worker: worker,
571+
Worker: worker,
516572
Ingress: ingress,
573+
Connectors: connectors,
517574
}, nil
518575
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package clickhouse
2+
3+
import (
4+
"fmt"
5+
"github.com/formancehq/ledger/deployments/pulumi/pkg/connectors"
6+
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
7+
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
8+
)
9+
10+
type externalComponent struct {
11+
pulumi.ResourceState
12+
13+
DSN pulumix.Output[string]
14+
}
15+
16+
func (e *externalComponent) GetConfig() pulumi.AnyOutput {
17+
return pulumi.Any(map[string]any{
18+
"dsn": e.DSN,
19+
})
20+
}
21+
22+
type externalComponentArgs struct {
23+
DSN pulumi.String
24+
}
25+
26+
func newExternalComponent(ctx *pulumi.Context, name string, args externalComponentArgs, opts ...pulumi.ResourceOption) (*externalComponent, error) {
27+
cmp := &externalComponent{}
28+
err := ctx.RegisterComponentResource("Formance:Ledger:Clickhouse:External", name, cmp, opts...)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
cmp.DSN = args.DSN.ToOutput(ctx.Context())
34+
35+
if err := ctx.RegisterResourceOutputs(cmp, pulumi.Map{}); err != nil {
36+
return nil, fmt.Errorf("registering outputs: %w", err)
37+
}
38+
39+
return cmp, nil
40+
}
41+
42+
var _ connectors.ConnectorComponent = (*externalComponent)(nil)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package clickhouse
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/formancehq/ledger/deployments/pulumi/pkg/connectors"
7+
"github.com/formancehq/ledger/deployments/pulumi/pkg/utils"
8+
corev1 "github.com/pulumi/pulumi-kubernetes/sdk/v4/go/kubernetes/core/v1"
9+
helm "github.com/pulumi/pulumi-kubernetes/sdk/v4/go/kubernetes/helm/v4"
10+
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
11+
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/internals"
12+
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
13+
)
14+
15+
type internalComponent struct {
16+
pulumi.ResourceState
17+
18+
DSN pulumix.Output[string]
19+
Chart *helm.Chart
20+
Service *corev1.Service
21+
}
22+
23+
func (e *internalComponent) GetConfig() pulumi.AnyOutput {
24+
return pulumi.Any(map[string]any{
25+
"dsn": pulumi.Sprintf(
26+
"clickhouse://%s.%s.svc.cluster.local:%d",
27+
e.Service.Metadata.Name().Elem(),
28+
e.Service.Metadata.Namespace().Elem(),
29+
9000,
30+
),
31+
})
32+
}
33+
34+
type internalComponentArgs struct {
35+
utils.CommonArgs
36+
}
37+
38+
func newInternalComponent(ctx *pulumi.Context, name string, args internalComponentArgs, opts ...pulumi.ResourceOption) (*internalComponent, error) {
39+
cmp := &internalComponent{}
40+
err := ctx.RegisterComponentResource("Formance:Ledger:Clickhouse:Internal", name, cmp, opts...)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
cmp.Chart, err = helm.NewChart(ctx, "clickhouse", &helm.ChartArgs{
46+
Chart: pulumi.String("oci://registry-1.docker.io/bitnamicharts/clickhouse"),
47+
Version: pulumi.String("8.0.5"),
48+
Name: pulumi.String("clickhouse"),
49+
Namespace: args.Namespace.ToOutput(ctx.Context()).Untyped().(pulumi.StringOutput),
50+
Values: pulumi.Map{
51+
"replicaCount": pulumi.Int(1),
52+
"shards": pulumi.Int(1),
53+
"zookeeper": pulumi.Map{
54+
"enabled": pulumi.Bool(false),
55+
},
56+
},
57+
}, pulumi.Parent(cmp))
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
ret, err := internals.UnsafeAwaitOutput(ctx.Context(), pulumix.ApplyErr(cmp.Chart.Resources, func(resources []any) (*corev1.Service, error) {
63+
for _, resource := range resources {
64+
service, ok := resource.(*corev1.Service)
65+
if !ok {
66+
continue
67+
}
68+
ret, err := internals.UnsafeAwaitOutput(ctx.Context(), pulumix.Apply2(
69+
service.Spec.Type().Elem(),
70+
service.Spec.ClusterIP().Elem(),
71+
func(serviceType, clusterIP string) *corev1.Service {
72+
// select not headless service
73+
if serviceType != "ClusterIP" || clusterIP == "None" {
74+
return nil
75+
}
76+
return service
77+
},
78+
))
79+
if err != nil {
80+
return nil, err
81+
}
82+
if ret.Value != nil {
83+
return ret.Value.(*corev1.Service), nil
84+
}
85+
}
86+
return nil, errors.New("not found")
87+
}))
88+
if err != nil {
89+
return nil, err
90+
}
91+
cmp.Service = ret.Value.(*corev1.Service)
92+
93+
if err := ctx.RegisterResourceOutputs(cmp, pulumi.Map{}); err != nil {
94+
return nil, fmt.Errorf("registering outputs: %w", err)
95+
}
96+
97+
return cmp, nil
98+
}
99+
100+
var _ connectors.ConnectorComponent = (*internalComponent)(nil)

0 commit comments

Comments
 (0)