Skip to content

Commit 065b173

Browse files
committed
feat: add log export
1 parent bf5b817 commit 065b173

File tree

305 files changed

+15809
-2461
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

305 files changed

+15809
-2461
lines changed

Justfile

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ tidy:
1717
@cd {{justfile_directory()}}/deployments/pulumi && go mod tidy
1818

1919
generate:
20+
@rm $(find ./internal -name '*_generated_test.go') || true
2021
@go generate ./...
22+
g: generate
2123

2224
export-docs-events:
2325
@go run . docs events --write-dir docs/events
@@ -35,7 +37,7 @@ openapi:
3537
@yq eval-all '. as $item ireduce ({}; . * $item)' openapi/v1.yaml openapi/v2.yaml openapi/overlay.yaml > openapi.yaml
3638
@npx -y widdershins {{justfile_directory()}}/openapi/v2.yaml -o {{justfile_directory()}}/docs/api/README.md --search false --language_tabs 'http:HTTP' --summary --omitHeader
3739

38-
generate-client:
40+
generate-client: openapi
3941
@speakeasy generate sdk -s openapi.yaml -o ./pkg/client -l go
4042

4143
release-local:

cmd/buckets.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"github.com/spf13/cobra"
55
)
66

7-
func NewBucket() *cobra.Command {
7+
func NewBucketsCommand() *cobra.Command {
88
ret := &cobra.Command{
99
Use: "buckets",
1010
Aliases: []string{"storage"},

cmd/docs_events.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cmd
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/formancehq/ledger/internal/bus"
6+
"github.com/formancehq/ledger/pkg/events"
77
"github.com/invopop/jsonschema"
88
"github.com/spf13/cobra"
99
"os"
@@ -30,10 +30,10 @@ func NewDocEventsCommand() *cobra.Command {
3030
}
3131

3232
for _, o := range []any{
33-
bus.CommittedTransactions{},
34-
bus.DeletedMetadata{},
35-
bus.SavedMetadata{},
36-
bus.RevertedTransaction{},
33+
events.CommittedTransactions{},
34+
events.DeletedMetadata{},
35+
events.SavedMetadata{},
36+
events.RevertedTransaction{},
3737
} {
3838
schema := jsonschema.Reflect(o)
3939
data, err := json.MarshalIndent(schema, "", " ")

cmd/root.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@ func NewRootCommand() *cobra.Command {
3131
Version: Version,
3232
}
3333

34-
serve := NewServeCommand()
35-
version := NewVersion()
36-
37-
buckets := NewBucket()
34+
root.AddCommand(NewServeCommand())
35+
root.AddCommand(NewBucketsCommand())
36+
root.AddCommand(NewVersionCommand())
37+
root.AddCommand(NewWorkerCommand())
38+
root.AddCommand(NewDocsCommand())
3839

39-
root.AddCommand(serve)
40-
root.AddCommand(buckets)
41-
root.AddCommand(version)
4240
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error {
4341
logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false)
4442
cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger))
@@ -54,7 +52,6 @@ func NewRootCommand() *cobra.Command {
5452

5553
return driver.UpgradeAllBuckets(cmd.Context())
5654
}))
57-
root.AddCommand(NewDocsCommand())
5855

5956
return root
6057
}

cmd/serve.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package cmd
22

33
import (
4-
"net/http"
5-
"net/http/pprof"
6-
"time"
7-
84
"github.com/formancehq/go-libs/v2/logging"
95
"github.com/formancehq/ledger/internal/api/common"
6+
"github.com/formancehq/ledger/internal/replication/drivers"
7+
"github.com/formancehq/ledger/internal/replication/drivers/all"
8+
"github.com/formancehq/ledger/internal/replication/runner"
109
systemstore "github.com/formancehq/ledger/internal/storage/system"
10+
"net/http"
11+
"net/http/pprof"
12+
"time"
1113

1214
apilib "github.com/formancehq/go-libs/v2/api"
1315
"github.com/formancehq/go-libs/v2/health"
@@ -48,6 +50,7 @@ const (
4850
NumscriptInterpreterFlagsToPass = "numscript-interpreter-flags"
4951
DefaultPageSizeFlag = "default-page-size"
5052
MaxPageSizeFlag = "max-page-size"
53+
WorkerEnabledFlag = "worker"
5154
)
5255

5356
func NewServeCommand() *cobra.Command {
@@ -98,6 +101,8 @@ func NewServeCommand() *cobra.Command {
98101
auth.FXModuleFromFlags(cmd),
99102
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
100103
storage.NewFXModule(serveConfiguration.autoUpgrade),
104+
drivers.NewFXModule(),
105+
fx.Invoke(all.Register),
101106
systemcontroller.NewFXModule(systemcontroller.ModuleConfiguration{
102107
NumscriptInterpreter: numscriptInterpreter,
103108
NumscriptInterpreterFlags: numscriptInterpreterFlags,
@@ -150,6 +155,15 @@ func NewServeCommand() *cobra.Command {
150155
}),
151156
}
152157

158+
workerEnabled, _ := cmd.Flags().GetBool(WorkerEnabledFlag)
159+
if workerEnabled {
160+
options = append(options, runner.NewFXModule(runner.ModuleConfig{
161+
SyncPeriod: serveConfiguration.Worker.pipelinesSyncPeriod,
162+
PullInterval: serveConfiguration.Worker.pipelinesPullInterval,
163+
PushRetryPeriod: serveConfiguration.Worker.pipelinesPushRetryPeriod,
164+
}))
165+
}
166+
153167
return service.New(cmd.OutOrStdout(), options...).Run(cmd)
154168
},
155169
}
@@ -164,7 +178,9 @@ func NewServeCommand() *cobra.Command {
164178
cmd.Flags().String(NumscriptInterpreterFlagsToPass, "", "Feature flags to pass to the experimental numscript interpreter")
165179
cmd.Flags().Uint64(MaxPageSizeFlag, 100, "Max page size")
166180
cmd.Flags().Uint64(DefaultPageSizeFlag, 15, "Default page size")
181+
cmd.Flags().Bool(WorkerEnabledFlag, false, "Enable worker")
167182

183+
addWorkerFlags(cmd)
168184
service.AddFlags(cmd.Flags())
169185
bunconnect.AddFlags(cmd.Flags())
170186
otlpmetrics.AddFlags(cmd.Flags())
@@ -183,6 +199,7 @@ type serveConfiguration struct {
183199
numscriptCacheMaxCount uint
184200
autoUpgrade bool
185201
bind string
202+
Worker workerConfiguration
186203
}
187204

188205
func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
@@ -192,6 +209,8 @@ func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
192209
ret.autoUpgrade, _ = cmd.Flags().GetBool(AutoUpgradeFlag)
193210
ret.bind, _ = cmd.Flags().GetString(BindFlag)
194211

212+
ret.Worker = discoverWorkerConfiguration(cmd)
213+
195214
return ret
196215
}
197216

cmd/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func PrintVersion(_ *cobra.Command, _ []string) {
1212
fmt.Printf("Commit: %s \n", Commit)
1313
}
1414

15-
func NewVersion() *cobra.Command {
15+
func NewVersionCommand() *cobra.Command {
1616
return &cobra.Command{
1717
Use: "version",
1818
Short: "Get version",

cmd/worker.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package cmd
2+
3+
import (
4+
"github.com/formancehq/go-libs/v2/bun/bunconnect"
5+
"github.com/formancehq/go-libs/v2/otlp"
6+
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
7+
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
8+
"github.com/formancehq/go-libs/v2/service"
9+
"github.com/formancehq/ledger/internal/replication/drivers"
10+
"github.com/formancehq/ledger/internal/replication/drivers/all"
11+
"github.com/formancehq/ledger/internal/replication/runner"
12+
"github.com/formancehq/ledger/internal/storage"
13+
"github.com/spf13/cobra"
14+
"go.uber.org/fx"
15+
"time"
16+
)
17+
18+
const (
19+
PipelinesSyncPeriodFlag = "pipelines-sync-period"
20+
PipelinesPullIntervalFlag = "pipelines-pull-interval"
21+
PipelinesPushRetryPeriodFlag = "pipelines-push-retry-period"
22+
)
23+
24+
func addWorkerFlags(cmd *cobra.Command) {
25+
cmd.Flags().Duration(PipelinesSyncPeriodFlag, 5*time.Second, "Pipelines sync period")
26+
cmd.Flags().Duration(PipelinesPullIntervalFlag, runner.DefaultPullInterval, "Pipelines pull interval")
27+
cmd.Flags().Duration(PipelinesPushRetryPeriodFlag, runner.DefaultPushRetryPeriod, "Pipelines push retry period")
28+
}
29+
30+
type workerConfiguration struct {
31+
pipelinesSyncPeriod time.Duration
32+
pipelinesPullInterval time.Duration
33+
pipelinesPushRetryPeriod time.Duration
34+
}
35+
36+
func discoverWorkerConfiguration(cmd *cobra.Command) workerConfiguration {
37+
ret := workerConfiguration{}
38+
ret.pipelinesSyncPeriod, _ = cmd.Flags().GetDuration(PipelinesSyncPeriodFlag)
39+
ret.pipelinesPullInterval, _ = cmd.Flags().GetDuration(PipelinesPullIntervalFlag)
40+
ret.pipelinesPushRetryPeriod, _ = cmd.Flags().GetDuration(PipelinesPushRetryPeriodFlag)
41+
42+
return ret
43+
}
44+
45+
func NewWorkerCommand() *cobra.Command {
46+
cmd := &cobra.Command{
47+
Use: "worker",
48+
SilenceUsage: true,
49+
RunE: func(cmd *cobra.Command, _ []string) error {
50+
configuration := discoverWorkerConfiguration(cmd)
51+
52+
connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
53+
if err != nil {
54+
return err
55+
}
56+
57+
return service.New(cmd.OutOrStdout(),
58+
fx.NopLogger,
59+
otlp.FXModuleFromFlags(cmd),
60+
otlptraces.FXModuleFromFlags(cmd),
61+
otlpmetrics.FXModuleFromFlags(cmd),
62+
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
63+
storage.NewFXModule(false),
64+
drivers.NewFXModule(),
65+
fx.Invoke(all.Register),
66+
runner.NewFXModule(runner.ModuleConfig{
67+
SyncPeriod: configuration.pipelinesSyncPeriod,
68+
PullInterval: configuration.pipelinesPullInterval,
69+
PushRetryPeriod: configuration.pipelinesPushRetryPeriod,
70+
}),
71+
).Run(cmd)
72+
},
73+
}
74+
75+
addWorkerFlags(cmd)
76+
service.AddFlags(cmd.Flags())
77+
bunconnect.AddFlags(cmd.Flags())
78+
otlpmetrics.AddFlags(cmd.Flags())
79+
otlptraces.AddFlags(cmd.Flags())
80+
81+
return cmd
82+
}

deployments/pulumi/main.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"errors"
55
"fmt"
6+
"github.com/formancehq/go-libs/v2/pointer"
67
"github.com/formancehq/ledger/deployments/pulumi/pkg"
78
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
89
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
@@ -44,10 +45,11 @@ func deploy(ctx *pulumi.Context) error {
4445
Postgres: pulumi_ledger.PostgresArgs{
4546
URI: pulumi.String(postgresURI),
4647
},
47-
Debug: pulumi.Bool(conf.GetBool("debug")),
48-
ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")),
49-
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
50-
Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))),
48+
Debug: pulumi.Bool(conf.GetBool("debug")),
49+
API: pulumi_ledger.APIArgs{
50+
ReplicaCount: pulumix.Val(pointer.For(conf.GetInt("replicaCount"))),
51+
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
52+
},
5153
})
5254

5355
return err

0 commit comments

Comments
 (0)