Skip to content

Commit 995c122

Browse files
committed
feat: add log export
1 parent f940427 commit 995c122

File tree

269 files changed

+14280
-1482
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

269 files changed

+14280
-1482
lines changed

cmd/docs_events.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cmd
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/formancehq/ledger/internal/bus"
6+
"github.com/formancehq/ledger/pkg/events"
77
"github.com/invopop/jsonschema"
88
"github.com/spf13/cobra"
99
"os"
@@ -30,10 +30,10 @@ func NewDocEventsCommand() *cobra.Command {
3030
}
3131

3232
for _, o := range []any{
33-
bus.CommittedTransactions{},
34-
bus.DeletedMetadata{},
35-
bus.SavedMetadata{},
36-
bus.RevertedTransaction{},
33+
events.CommittedTransactions{},
34+
events.DeletedMetadata{},
35+
events.SavedMetadata{},
36+
events.RevertedTransaction{},
3737
} {
3838
schema := jsonschema.Reflect(o)
3939
data, err := json.MarshalIndent(schema, "", " ")

cmd/serve.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package cmd
33
import (
44
"github.com/formancehq/go-libs/v2/logging"
55
"github.com/formancehq/ledger/internal/api/common"
6+
"github.com/formancehq/ledger/internal/replication/drivers"
7+
"github.com/formancehq/ledger/internal/replication/drivers/all"
8+
"github.com/formancehq/ledger/internal/replication/runner"
69
systemstore "github.com/formancehq/ledger/internal/storage/system"
710
"net/http"
811
"net/http/pprof"
@@ -98,6 +101,8 @@ func NewServeCommand() *cobra.Command {
98101
auth.FXModuleFromFlags(cmd),
99102
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
100103
storage.NewFXModule(serveConfiguration.autoUpgrade),
104+
drivers.NewFXModule(),
105+
fx.Invoke(all.Register),
101106
systemcontroller.NewFXModule(systemcontroller.ModuleConfiguration{
102107
NumscriptInterpreter: numscriptInterpreter,
103108
NumscriptInterpreterFlags: numscriptInterpreterFlags,
@@ -152,7 +157,11 @@ func NewServeCommand() *cobra.Command {
152157

153158
workerEnabled, _ := cmd.Flags().GetBool(WorkerEnabledFlag)
154159
if workerEnabled {
155-
options = append(options, newWorkerModule())
160+
options = append(options, runner.NewFXModule(runner.ModuleConfig{
161+
SyncPeriod: serveConfiguration.Worker.pipelinesSyncPeriod,
162+
PullInterval: serveConfiguration.Worker.pipelinesPullInterval,
163+
PushRetryPeriod: serveConfiguration.Worker.pipelinesPushRetryPeriod,
164+
}))
156165
}
157166

158167
return service.New(cmd.OutOrStdout(), options...).Run(cmd)
@@ -190,6 +199,7 @@ type serveConfiguration struct {
190199
numscriptCacheMaxCount uint
191200
autoUpgrade bool
192201
bind string
202+
Worker workerConfiguration
193203
}
194204

195205
func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
@@ -199,6 +209,8 @@ func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
199209
ret.autoUpgrade, _ = cmd.Flags().GetBool(AutoUpgradeFlag)
200210
ret.bind, _ = cmd.Flags().GetString(BindFlag)
201211

212+
ret.Worker = discoverWorkerConfiguration(cmd)
213+
202214
return ret
203215
}
204216

cmd/worker.go

+41-3
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,49 @@ import (
66
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
77
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
88
"github.com/formancehq/go-libs/v2/service"
9+
"github.com/formancehq/ledger/internal/replication/drivers"
10+
"github.com/formancehq/ledger/internal/replication/drivers/all"
11+
"github.com/formancehq/ledger/internal/replication/runner"
912
"github.com/formancehq/ledger/internal/storage"
1013
"github.com/spf13/cobra"
1114
"go.uber.org/fx"
15+
"time"
16+
)
17+
18+
const (
19+
PipelinesSyncPeriodFlag = "pipelines-sync-period"
20+
PipelinesPullIntervalFlag = "pipelines-pull-interval"
21+
PipelinesPushRetryPeriodFlag = "pipelines-push-retry-period"
1222
)
1323

1424
func addWorkerFlags(cmd *cobra.Command) {
25+
cmd.Flags().Duration(PipelinesSyncPeriodFlag, 5*time.Second, "Pipelines sync period")
26+
cmd.Flags().Duration(PipelinesPullIntervalFlag, runner.DefaultPullInterval, "Pipelines pull interval")
27+
cmd.Flags().Duration(PipelinesPushRetryPeriodFlag, runner.DefaultPushRetryPeriod, "Pipelines push retry period")
28+
}
29+
30+
type workerConfiguration struct {
31+
pipelinesSyncPeriod time.Duration
32+
pipelinesPullInterval time.Duration
33+
pipelinesPushRetryPeriod time.Duration
34+
}
35+
36+
func discoverWorkerConfiguration(cmd *cobra.Command) workerConfiguration {
37+
ret := workerConfiguration{}
38+
ret.pipelinesSyncPeriod, _ = cmd.Flags().GetDuration(PipelinesSyncPeriodFlag)
39+
ret.pipelinesPullInterval, _ = cmd.Flags().GetDuration(PipelinesPullIntervalFlag)
40+
ret.pipelinesPushRetryPeriod, _ = cmd.Flags().GetDuration(PipelinesPushRetryPeriodFlag)
41+
42+
return ret
1543
}
1644

1745
func NewWorkerCommand() *cobra.Command {
1846
cmd := &cobra.Command{
1947
Use: "worker",
2048
SilenceUsage: true,
2149
RunE: func(cmd *cobra.Command, _ []string) error {
50+
configuration := discoverWorkerConfiguration(cmd)
51+
2252
connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
2353
if err != nil {
2454
return err
@@ -31,7 +61,7 @@ func NewWorkerCommand() *cobra.Command {
3161
otlpmetrics.FXModuleFromFlags(cmd),
3262
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
3363
storage.NewFXModule(false),
34-
newWorkerModule(),
64+
newWorkerModule(configuration),
3565
).Run(cmd)
3666
},
3767
}
@@ -45,6 +75,14 @@ func NewWorkerCommand() *cobra.Command {
4575
return cmd
4676
}
4777

48-
func newWorkerModule() fx.Option {
49-
return fx.Options()
78+
func newWorkerModule(configuration workerConfiguration) fx.Option {
79+
return fx.Options(
80+
drivers.NewFXModule(),
81+
fx.Invoke(all.Register),
82+
runner.NewFXModule(runner.ModuleConfig{
83+
SyncPeriod: configuration.pipelinesSyncPeriod,
84+
PullInterval: configuration.pipelinesPullInterval,
85+
PushRetryPeriod: configuration.pipelinesPushRetryPeriod,
86+
}),
87+
)
5088
}

0 commit comments

Comments
 (0)