Skip to content

Commit f5f8248

Browse files
committed
feat: add worker to pulumi
1 parent 3abb083 commit f5f8248

19 files changed

+821
-737
lines changed

cmd/worker.go

+3-10
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func NewWorkerCommand() *cobra.Command {
5454
return err
5555
}
5656

57-
options := []fx.Option{
57+
return service.New(cmd.OutOrStdout(),
5858
fx.NopLogger,
5959
otlp.FXModuleFromFlags(cmd),
6060
otlptraces.FXModuleFromFlags(cmd),
@@ -68,14 +68,7 @@ func NewWorkerCommand() *cobra.Command {
6868
PullInterval: configuration.pipelinesPullInterval,
6969
PushRetryPeriod: configuration.pipelinesPushRetryPeriod,
7070
}),
71-
}
72-
73-
workerEnabled, _ := cmd.Flags().GetBool(WorkerEnabledFlag)
74-
if workerEnabled {
75-
options = append(options, )
76-
}
77-
78-
return service.New(cmd.OutOrStdout(), options...).Run(cmd)
71+
).Run(cmd)
7972
},
8073
}
8174

@@ -86,4 +79,4 @@ func NewWorkerCommand() *cobra.Command {
8679
otlptraces.AddFlags(cmd.Flags())
8780

8881
return cmd
89-
}
82+
}

deployments/pulumi/main.go

-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"github.com/formancehq/ledger/deployments/pulumi/pkg"
77
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
88
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
9-
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
109
)
1110

1211
func main() {
@@ -47,7 +46,6 @@ func deploy(ctx *pulumi.Context) error {
4746
Debug: pulumi.Bool(conf.GetBool("debug")),
4847
ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")),
4948
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
50-
Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))),
5149
})
5250

5351
return err

deployments/pulumi/pkg/args.go

+318
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
package pulumi_ledger
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/formancehq/go-libs/v2/collectionutils"
7+
corev1 "github.com/pulumi/pulumi-kubernetes/sdk/v4/go/kubernetes/core/v1"
8+
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
9+
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
10+
"slices"
11+
"time"
12+
)
13+
14+
type PostgresArgs struct {
15+
URI pulumix.Input[string]
16+
AWSEnableIAM pulumix.Input[bool]
17+
MaxIdleConns pulumix.Input[*int]
18+
MaxOpenConns pulumix.Input[*int]
19+
ConnMaxIdleTime pulumix.Input[*time.Duration]
20+
}
21+
22+
func (args PostgresArgs) getEnvVars(ctx context.Context) corev1.EnvVarArray {
23+
envVars := corev1.EnvVarArray{
24+
corev1.EnvVarArgs{
25+
Name: pulumi.String("POSTGRES_URI"),
26+
Value: args.URI.ToOutput(ctx).Untyped().(pulumi.StringOutput),
27+
},
28+
}
29+
30+
if args.AWSEnableIAM != nil {
31+
envVars = append(envVars, corev1.EnvVarArgs{
32+
Name: pulumi.String("POSTGRES_AWS_ENABLE_IAM"),
33+
Value: boolToString(args.AWSEnableIAM).Untyped().(pulumi.StringOutput),
34+
})
35+
}
36+
37+
if args.ConnMaxIdleTime != nil {
38+
envVars = append(envVars, corev1.EnvVarArgs{
39+
Name: pulumi.String("POSTGRES_CONN_MAX_IDLE_TIME"),
40+
Value: pulumix.Apply(args.ConnMaxIdleTime, func(connMaxIdleTime *time.Duration) string {
41+
if connMaxIdleTime == nil {
42+
return ""
43+
}
44+
return connMaxIdleTime.String()
45+
}).Untyped().(pulumi.StringOutput),
46+
})
47+
}
48+
49+
if args.MaxOpenConns != nil {
50+
envVars = append(envVars, corev1.EnvVarArgs{
51+
Name: pulumi.String("POSTGRES_MAX_OPEN_CONNS"),
52+
Value: pulumix.Apply(args.MaxOpenConns, func(maxOpenConns *int) string {
53+
if maxOpenConns == nil {
54+
return ""
55+
}
56+
return fmt.Sprint(*maxOpenConns)
57+
}).Untyped().(pulumi.StringOutput),
58+
})
59+
}
60+
61+
if args.MaxIdleConns != nil {
62+
envVars = append(envVars, corev1.EnvVarArgs{
63+
Name: pulumi.String("POSTGRES_MAX_IDLE_CONNS"),
64+
Value: pulumix.Apply(args.MaxIdleConns, func(maxIdleConns *int) string {
65+
if maxIdleConns == nil {
66+
return ""
67+
}
68+
return fmt.Sprint(*maxIdleConns)
69+
}).Untyped().(pulumi.StringOutput),
70+
})
71+
}
72+
73+
return envVars
74+
}
75+
76+
type OtelTracesArgs struct {
77+
OtelTracesBatch pulumix.Input[bool]
78+
OtelTracesExporterFlag pulumix.Input[string]
79+
OtelTracesExporterJaegerEndpoint pulumix.Input[string]
80+
OtelTracesExporterJaegerUser pulumix.Input[string]
81+
OtelTracesExporterJaegerPassword pulumix.Input[string]
82+
OtelTracesExporterOTLPMode pulumix.Input[string]
83+
OtelTracesExporterOTLPEndpoint pulumix.Input[string]
84+
OtelTracesExporterOTLPInsecure pulumix.Input[bool]
85+
}
86+
87+
type OtelMetricsArgs struct {
88+
OtelMetricsExporterPushInterval pulumix.Input[*time.Duration]
89+
OtelMetricsRuntime pulumix.Input[bool]
90+
OtelMetricsRuntimeMinimumReadMemStatsInterval pulumix.Input[*time.Duration]
91+
OtelMetricsExporter pulumix.Input[string]
92+
OtelMetricsKeepInMemory pulumix.Input[bool]
93+
OtelMetricsExporterOTLPMode pulumix.Input[string]
94+
OtelMetricsExporterOTLPEndpoint pulumix.Input[string]
95+
OtelMetricsExporterOTLPInsecure pulumix.Input[bool]
96+
}
97+
98+
type IngressArgs struct {
99+
Host pulumix.Input[string]
100+
Secret pulumix.Input[*string]
101+
}
102+
103+
type OtelArgs struct {
104+
ResourceAttributes pulumix.Input[map[string]string]
105+
ServiceName pulumix.Input[string]
106+
107+
Traces *OtelTracesArgs
108+
Metrics *OtelMetricsArgs
109+
}
110+
111+
type ComponentArgs struct {
112+
Postgres PostgresArgs
113+
Otel *OtelArgs
114+
Ingress *IngressArgs
115+
Namespace pulumix.Input[string]
116+
Timeout pulumix.Input[int]
117+
Tag pulumix.Input[string]
118+
ImagePullPolicy pulumix.Input[string]
119+
Debug pulumix.Input[bool]
120+
ReplicaCount pulumix.Input[int]
121+
GracePeriod pulumix.Input[string]
122+
BallastSizeInBytes pulumix.Input[int]
123+
NumscriptCacheMaxCount pulumix.Input[int]
124+
BulkMaxSize pulumix.Input[int]
125+
BulkParallel pulumix.Input[int]
126+
TerminationGracePeriodSeconds pulumix.Input[*int]
127+
128+
ExperimentalFeatures pulumix.Input[bool]
129+
ExperimentalNumscriptInterpreter pulumix.Input[bool]
130+
}
131+
132+
func (args *ComponentArgs) setDefaults() {
133+
if args.Namespace == nil {
134+
args.Namespace = pulumi.String("")
135+
}
136+
if args.Tag == nil {
137+
args.Tag = pulumi.String("latest")
138+
} else {
139+
args.Tag = pulumix.Apply(args.Tag, func(tag string) string {
140+
if tag == "" {
141+
return "latest"
142+
}
143+
return tag
144+
})
145+
}
146+
if args.ImagePullPolicy == nil {
147+
args.ImagePullPolicy = pulumi.String("")
148+
}
149+
if args.Debug == nil {
150+
args.Debug = pulumi.Bool(false)
151+
}
152+
if args.GracePeriod == nil {
153+
args.GracePeriod = pulumi.String("0s")
154+
}
155+
if args.ExperimentalFeatures == nil {
156+
args.ExperimentalFeatures = pulumi.Bool(false)
157+
}
158+
if args.ExperimentalNumscriptInterpreter == nil {
159+
args.ExperimentalNumscriptInterpreter = pulumi.Bool(false)
160+
}
161+
if args.NumscriptCacheMaxCount == nil {
162+
args.NumscriptCacheMaxCount = pulumi.Int(0)
163+
}
164+
if args.BulkParallel == nil {
165+
args.BulkParallel = pulumi.Int(0)
166+
}
167+
if args.BallastSizeInBytes == nil {
168+
args.BallastSizeInBytes = pulumi.Int(0)
169+
}
170+
if args.BulkMaxSize == nil {
171+
args.BulkMaxSize = pulumi.Int(0)
172+
}
173+
if args.TerminationGracePeriodSeconds == nil {
174+
args.TerminationGracePeriodSeconds = pulumix.Val((*int)(nil))
175+
}
176+
}
177+
178+
func (args ComponentArgs) getOpenTelemetryEnvVars(ctx context.Context) corev1.EnvVarArray {
179+
envVars := corev1.EnvVarArray{}
180+
if otel := args.Otel; otel != nil {
181+
if otel.ServiceName != nil {
182+
envVars = append(envVars, corev1.EnvVarArgs{
183+
Name: pulumi.String("OTEL_SERVICE_NAME"),
184+
Value: otel.ServiceName.ToOutput(ctx).Untyped().(pulumi.StringOutput),
185+
})
186+
}
187+
if otel.ResourceAttributes != nil {
188+
envVars = append(envVars, corev1.EnvVarArgs{
189+
Name: pulumi.String("OTEL_RESOURCE_ATTRIBUTES"),
190+
Value: pulumix.Apply(otel.ResourceAttributes, func(rawResourceAttributes map[string]string) string {
191+
ret := ""
192+
keys := collectionutils.Keys(rawResourceAttributes)
193+
slices.Sort(keys)
194+
for _, key := range keys {
195+
ret += key + "=" + rawResourceAttributes[key] + ","
196+
}
197+
if len(ret) > 0 {
198+
ret = ret[:len(ret)-1]
199+
}
200+
return ret
201+
}).Untyped().(pulumi.StringOutput),
202+
})
203+
}
204+
if traces := args.Otel.Traces; traces != nil {
205+
if traces.OtelTracesBatch != nil {
206+
envVars = append(envVars, corev1.EnvVarArgs{
207+
Name: pulumi.String("OTEL_TRACES_BATCH"),
208+
Value: boolToString(traces.OtelTracesBatch).Untyped().(pulumi.StringOutput),
209+
})
210+
}
211+
if traces.OtelTracesExporterFlag != nil {
212+
envVars = append(envVars, corev1.EnvVarArgs{
213+
Name: pulumi.String("OTEL_TRACES_EXPORTER"),
214+
Value: traces.OtelTracesExporterFlag.ToOutput(ctx).Untyped().(pulumi.StringOutput),
215+
})
216+
}
217+
if traces.OtelTracesExporterJaegerEndpoint != nil {
218+
envVars = append(envVars, corev1.EnvVarArgs{
219+
Name: pulumi.String("OTEL_TRACES_EXPORTER_JAEGER_ENDPOINT"),
220+
Value: traces.OtelTracesExporterJaegerEndpoint.ToOutput(ctx).Untyped().(pulumi.StringOutput),
221+
})
222+
}
223+
if traces.OtelTracesExporterJaegerUser != nil {
224+
envVars = append(envVars, corev1.EnvVarArgs{
225+
Name: pulumi.String("OTEL_TRACES_EXPORTER_JAEGER_USER"),
226+
Value: traces.OtelTracesExporterJaegerUser.ToOutput(ctx).Untyped().(pulumi.StringOutput),
227+
})
228+
}
229+
if traces.OtelTracesExporterJaegerPassword != nil {
230+
envVars = append(envVars, corev1.EnvVarArgs{
231+
Name: pulumi.String("OTEL_TRACES_EXPORTER_JAEGER_PASSWORD"),
232+
Value: traces.OtelTracesExporterJaegerPassword.ToOutput(ctx).Untyped().(pulumi.StringOutput),
233+
})
234+
}
235+
if traces.OtelTracesExporterOTLPMode != nil {
236+
envVars = append(envVars, corev1.EnvVarArgs{
237+
Name: pulumi.String("OTEL_TRACES_EXPORTER_OTLP_MODE"),
238+
Value: traces.OtelTracesExporterOTLPMode.ToOutput(ctx).Untyped().(pulumi.StringOutput),
239+
})
240+
}
241+
if traces.OtelTracesExporterOTLPEndpoint != nil {
242+
envVars = append(envVars, corev1.EnvVarArgs{
243+
Name: pulumi.String("OTEL_TRACES_EXPORTER_OTLP_ENDPOINT"),
244+
Value: traces.OtelTracesExporterOTLPEndpoint.ToOutput(ctx).Untyped().(pulumi.StringOutput),
245+
})
246+
}
247+
if traces.OtelTracesExporterOTLPInsecure != nil {
248+
envVars = append(envVars, corev1.EnvVarArgs{
249+
Name: pulumi.String("OTEL_TRACES_EXPORTER_OTLP_INSECURE"),
250+
Value: boolToString(traces.OtelTracesExporterOTLPInsecure).Untyped().(pulumi.StringOutput),
251+
})
252+
}
253+
}
254+
255+
if metrics := args.Otel.Metrics; metrics != nil {
256+
if metrics.OtelMetricsExporterPushInterval != nil {
257+
envVars = append(envVars, corev1.EnvVarArgs{
258+
Name: pulumi.String("OTEL_METRICS_EXPORTER_PUSH_INTERVAL"),
259+
Value: pulumix.Apply(metrics.OtelMetricsExporterPushInterval, func(pushInterval *time.Duration) string {
260+
if pushInterval == nil {
261+
return ""
262+
}
263+
return pushInterval.String()
264+
}).Untyped().(pulumi.StringOutput),
265+
})
266+
}
267+
if metrics.OtelMetricsRuntime != nil {
268+
envVars = append(envVars, corev1.EnvVarArgs{
269+
Name: pulumi.String("OTEL_METRICS_RUNTIME"),
270+
Value: boolToString(metrics.OtelMetricsRuntime).Untyped().(pulumi.StringOutput),
271+
})
272+
}
273+
if metrics.OtelMetricsRuntimeMinimumReadMemStatsInterval != nil {
274+
envVars = append(envVars, corev1.EnvVarArgs{
275+
Name: pulumi.String("OTEL_METRICS_RUNTIME_MINIMUM_READ_MEM_STATS_INTERVAL"),
276+
Value: pulumix.Apply(metrics.OtelMetricsRuntimeMinimumReadMemStatsInterval, func(interval *time.Duration) string {
277+
if interval == nil {
278+
return ""
279+
}
280+
return interval.String()
281+
}).Untyped().(pulumi.StringOutput),
282+
})
283+
}
284+
if metrics.OtelMetricsExporter != nil {
285+
envVars = append(envVars, corev1.EnvVarArgs{
286+
Name: pulumi.String("OTEL_METRICS_EXPORTER"),
287+
Value: metrics.OtelMetricsExporter.ToOutput(ctx).Untyped().(pulumi.StringOutput),
288+
})
289+
}
290+
if metrics.OtelMetricsKeepInMemory != nil {
291+
envVars = append(envVars, corev1.EnvVarArgs{
292+
Name: pulumi.String("OTEL_METRICS_KEEP_IN_MEMORY"),
293+
Value: boolToString(metrics.OtelMetricsKeepInMemory).Untyped().(pulumi.StringOutput),
294+
})
295+
}
296+
if metrics.OtelMetricsExporterOTLPMode != nil {
297+
envVars = append(envVars, corev1.EnvVarArgs{
298+
Name: pulumi.String("OTEL_METRICS_EXPORTER_OTLP_MODE"),
299+
Value: metrics.OtelMetricsExporterOTLPMode.ToOutput(ctx).Untyped().(pulumi.StringOutput),
300+
})
301+
}
302+
if metrics.OtelMetricsExporterOTLPEndpoint != nil {
303+
envVars = append(envVars, corev1.EnvVarArgs{
304+
Name: pulumi.String("OTEL_METRICS_EXPORTER_OTLP_ENDPOINT"),
305+
Value: metrics.OtelMetricsExporterOTLPEndpoint.ToOutput(ctx).Untyped().(pulumi.StringOutput),
306+
})
307+
}
308+
if metrics.OtelMetricsExporterOTLPInsecure != nil {
309+
envVars = append(envVars, corev1.EnvVarArgs{
310+
Name: pulumi.String("OTEL_METRICS_EXPORTER_OTLP_INSECURE"),
311+
Value: boolToString(metrics.OtelMetricsExporterOTLPInsecure).Untyped().(pulumi.StringOutput),
312+
})
313+
}
314+
}
315+
}
316+
317+
return envVars
318+
}

0 commit comments

Comments
 (0)