Skip to content

Commit ab7ceeb

Browse files
committed
feat: add log export
1 parent 75200ea commit ab7ceeb

File tree

276 files changed

+14491
-1796
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

276 files changed

+14491
-1796
lines changed

cmd/docs_events.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cmd
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/formancehq/ledger/internal/bus"
6+
"github.com/formancehq/ledger/pkg/events"
77
"github.com/invopop/jsonschema"
88
"github.com/spf13/cobra"
99
"os"
@@ -30,10 +30,10 @@ func NewDocEventsCommand() *cobra.Command {
3030
}
3131

3232
for _, o := range []any{
33-
bus.CommittedTransactions{},
34-
bus.DeletedMetadata{},
35-
bus.SavedMetadata{},
36-
bus.RevertedTransaction{},
33+
events.CommittedTransactions{},
34+
events.DeletedMetadata{},
35+
events.SavedMetadata{},
36+
events.RevertedTransaction{},
3737
} {
3838
schema := jsonschema.Reflect(o)
3939
data, err := json.MarshalIndent(schema, "", " ")

cmd/serve.go

+25-17
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package cmd
33
import (
44
"github.com/formancehq/go-libs/v2/logging"
55
"github.com/formancehq/ledger/internal/api/common"
6+
"github.com/formancehq/ledger/internal/replication/drivers"
7+
"github.com/formancehq/ledger/internal/replication/drivers/all"
68
systemstore "github.com/formancehq/ledger/internal/storage/system"
7-
"github.com/formancehq/ledger/internal/worker"
89
"net/http"
910
"net/http/pprof"
1011
"time"
@@ -56,7 +57,10 @@ func NewServeCommand() *cobra.Command {
5657
Use: "serve",
5758
SilenceUsage: true,
5859
RunE: func(cmd *cobra.Command, _ []string) error {
59-
serveConfiguration := discoverServeConfiguration(cmd)
60+
serveConfiguration, err := discoverServeConfiguration(cmd)
61+
if err != nil {
62+
return err
63+
}
6064

6165
connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
6266
if err != nil {
@@ -101,6 +105,8 @@ func NewServeCommand() *cobra.Command {
101105
storage.NewFXModule(storage.ModuleConfig{
102106
AutoUpgrade: serveConfiguration.autoUpgrade,
103107
}),
108+
drivers.NewFXModule(),
109+
fx.Invoke(all.Register),
104110
systemcontroller.NewFXModule(systemcontroller.ModuleConfiguration{
105111
NumscriptInterpreter: numscriptInterpreter,
106112
NumscriptInterpreterFlags: numscriptInterpreterFlags,
@@ -129,15 +135,15 @@ func NewServeCommand() *cobra.Command {
129135
}),
130136
fx.Decorate(func(
131137
params struct {
132-
fx.In
138+
fx.In
133139

134-
Handler chi.Router
135-
HealthController *health.HealthController
136-
Logger logging.Logger
140+
Handler chi.Router
141+
HealthController *health.HealthController
142+
Logger logging.Logger
137143

138-
MeterProvider *metric.MeterProvider `optional:"true"`
139-
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
140-
},
144+
MeterProvider *metric.MeterProvider `optional:"true"`
145+
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
146+
},
141147
) chi.Router {
142148
return assembleFinalRouter(
143149
service.IsDebug(cmd),
@@ -155,10 +161,7 @@ func NewServeCommand() *cobra.Command {
155161

156162
workerEnabled, _ := cmd.Flags().GetBool(WorkerEnabledFlag)
157163
if workerEnabled {
158-
options = append(options, worker.NewFXModule(worker.ModuleConfig{
159-
Schedule: serveConfiguration.workerConfiguration.hashLogsBlockCRONSpec,
160-
MaxBlockSize: serveConfiguration.workerConfiguration.hashLogsBlockMaxSize,
161-
}))
164+
options = append(options, newWorkerModule(serveConfiguration.workerConfiguration))
162165
}
163166

164167
return service.New(cmd.OutOrStdout(), options...).Run(cmd)
@@ -199,15 +202,20 @@ type serveConfiguration struct {
199202
workerConfiguration workerConfiguration
200203
}
201204

202-
func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
203-
ret := serveConfiguration{}
205+
func discoverServeConfiguration(cmd *cobra.Command) (*serveConfiguration, error) {
206+
ret := &serveConfiguration{}
204207
ret.ballastSize, _ = cmd.Flags().GetUint(BallastSizeInBytesFlag)
205208
ret.numscriptCacheMaxCount, _ = cmd.Flags().GetUint(NumscriptCacheMaxCountFlag)
206209
ret.autoUpgrade, _ = cmd.Flags().GetBool(AutoUpgradeFlag)
207210
ret.bind, _ = cmd.Flags().GetString(BindFlag)
208-
ret.workerConfiguration = discoverWorkerConfiguration(cmd)
209211

210-
return ret
212+
workerConfiguration, err := discoverWorkerConfiguration(cmd)
213+
if err != nil {
214+
return nil, err
215+
}
216+
ret.workerConfiguration = *workerConfiguration
217+
218+
return ret, nil
211219
}
212220

213221
func assembleFinalRouter(

cmd/worker.go

+46-15
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,57 @@ import (
66
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
77
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
88
"github.com/formancehq/go-libs/v2/service"
9+
replication "github.com/formancehq/ledger/internal/replication"
10+
"github.com/formancehq/ledger/internal/replication/drivers"
11+
"github.com/formancehq/ledger/internal/replication/drivers/all"
912
"github.com/formancehq/ledger/internal/storage"
1013
"github.com/formancehq/ledger/internal/worker"
14+
"github.com/robfig/cron/v3"
1115
"github.com/spf13/cobra"
1216
"go.uber.org/fx"
17+
"time"
1318
)
1419

1520
const (
21+
WorkerPipelinesSyncPeriodFlag = "worker-pipelines-sync-period"
22+
WorkerPipelinesPullIntervalFlag = "worker-pipelines-pull-interval"
23+
WorkerPipelinesPushRetryPeriodFlag = "worker-pipelines-push-retry-period"
24+
1625
WorkerAsyncBlockHasherMaxBlockSizeFlag = "worker-async-block-hasher-max-block-size"
1726
WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"
1827
)
1928

2029
type workerConfiguration struct {
21-
hashLogsBlockMaxSize int
22-
hashLogsBlockCRONSpec string
30+
AsyncBlockRunnerConfig storage.AsyncBlockRunnerConfig
31+
ReplicationConfig replication.ModuleConfig
2332
}
2433

25-
func discoverWorkerConfiguration(cmd *cobra.Command) workerConfiguration {
26-
ret := workerConfiguration{}
27-
ret.hashLogsBlockCRONSpec, _ = cmd.Flags().GetString(WorkerAsyncBlockHasherScheduleFlag)
28-
ret.hashLogsBlockMaxSize, _ = cmd.Flags().GetInt(WorkerAsyncBlockHasherMaxBlockSizeFlag)
34+
func discoverWorkerConfiguration(cmd *cobra.Command) (*workerConfiguration, error) {
35+
ret := &workerConfiguration{}
36+
hashLogsBlockCRONSpec, _ := cmd.Flags().GetString(WorkerAsyncBlockHasherScheduleFlag)
37+
if hashLogsBlockCRONSpec == "" {
38+
hashLogsBlockCRONSpec = "0 * * * * *"
39+
}
40+
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
41+
42+
var err error
43+
ret.AsyncBlockRunnerConfig.Schedule, err = parser.Parse(hashLogsBlockCRONSpec)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
ret.AsyncBlockRunnerConfig.MaxBlockSize, _ = cmd.Flags().GetInt(WorkerAsyncBlockHasherMaxBlockSizeFlag)
49+
ret.ReplicationConfig.SyncPeriod, _ = cmd.Flags().GetDuration(WorkerPipelinesSyncPeriodFlag)
50+
ret.ReplicationConfig.PullInterval, _ = cmd.Flags().GetDuration(WorkerPipelinesPullIntervalFlag)
51+
ret.ReplicationConfig.PushRetryPeriod, _ = cmd.Flags().GetDuration(WorkerPipelinesPushRetryPeriodFlag)
2952

30-
return ret
53+
return ret, nil
3154
}
3255

3356
func addWorkerFlags(cmd *cobra.Command) {
57+
cmd.Flags().Duration(WorkerPipelinesSyncPeriodFlag, 5*time.Second, "Pipelines sync period")
58+
cmd.Flags().Duration(WorkerPipelinesPullIntervalFlag, replication.DefaultPullInterval, "Pipelines pull interval")
59+
cmd.Flags().Duration(WorkerPipelinesPushRetryPeriodFlag, replication.DefaultPushRetryPeriod, "Pipelines push retry period")
3460
cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
3561
cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
3662
}
@@ -45,7 +71,10 @@ func NewWorkerCommand() *cobra.Command {
4571
return err
4672
}
4773

48-
workerConfiguration := discoverWorkerConfiguration(cmd)
74+
workerConfiguration, err := discoverWorkerConfiguration(cmd)
75+
if err != nil {
76+
return err
77+
}
4978

5079
return service.New(cmd.OutOrStdout(),
5180
fx.NopLogger,
@@ -54,10 +83,9 @@ func NewWorkerCommand() *cobra.Command {
5483
otlpmetrics.FXModuleFromFlags(cmd),
5584
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
5685
storage.NewFXModule(storage.ModuleConfig{}),
57-
worker.NewFXModule(worker.ModuleConfig{
58-
MaxBlockSize: workerConfiguration.hashLogsBlockMaxSize,
59-
Schedule: workerConfiguration.hashLogsBlockCRONSpec,
60-
}),
86+
drivers.NewFXModule(),
87+
fx.Invoke(all.Register),
88+
newWorkerModule(*workerConfiguration),
6189
).Run(cmd)
6290
},
6391
}
@@ -71,6 +99,9 @@ func NewWorkerCommand() *cobra.Command {
7199
return cmd
72100
}
73101

74-
func newWorkerModule() fx.Option {
75-
return fx.Options()
76-
}
102+
func newWorkerModule(configuration workerConfiguration) fx.Option {
103+
return worker.NewFXModule(worker.ModuleConfig{
104+
AsyncBlockRunnerConfig: configuration.AsyncBlockRunnerConfig,
105+
ReplicationConfig: configuration.ReplicationConfig,
106+
})
107+
}

0 commit comments

Comments
 (0)