Skip to content

Commit 27b3bec

Browse files
committed
feat: add logs exporter
1 parent 9db4450 commit 27b3bec

File tree

309 files changed

+16864
-1777
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

309 files changed

+16864
-1777
lines changed

.github/workflows/main.yml

-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ jobs:
7474
GoReleaser:
7575
runs-on: "formance-runner"
7676
if: contains(github.event.pull_request.labels.*.name, 'build-images') || github.ref == 'refs/heads/main' || github.event_name == 'merge_group'
77-
needs:
78-
- Dirty
7977
steps:
8078
- uses: earthly/actions-setup@v1
8179
with:

Earthfile

+1-31
Original file line numberDiff line numberDiff line change
@@ -49,34 +49,4 @@ deploy:
4949
RUN kubectl patch Versions.formance.com default -p "{\"spec\":{\"ledger\": \"${tag}\"}}" --type=merge
5050

5151
deploy-staging:
52-
BUILD --pass-args core+deploy-staging
53-
54-
export-database-schema:
55-
FROM +sources
56-
RUN go install github.com/roerohan/wait-for-it@latest
57-
WITH DOCKER --load=postgres:15-alpine=+postgres --pull schemaspy/schemaspy:6.2.4
58-
RUN bash -c '
59-
echo "Creating PG server...";
60-
postgresContainerID=$(docker run -d --rm -e POSTGRES_USER=root -e POSTGRES_PASSWORD=root -e POSTGRES_DB=formance --net=host postgres:15-alpine);
61-
wait-for-it -w 127.0.0.1:5432;
62-
63-
echo "Creating bucket...";
64-
go run main.go buckets upgrade _default --postgres-uri "postgres://root:[email protected]:5432/formance?sslmode=disable";
65-
66-
echo "Exporting schemas...";
67-
docker run --rm -u root \
68-
-v ./docs/database:/output \
69-
--net=host \
70-
schemaspy/schemaspy:6.2.4 -u root -db formance -t pgsql11 -host 127.0.0.1 -port 5432 -p root -schemas _system,_default;
71-
72-
docker kill "$postgresContainerID";
73-
'
74-
END
75-
SAVE ARTIFACT docs/database/_system/diagrams AS LOCAL docs/database/_system/diagrams
76-
SAVE ARTIFACT docs/database/_default/diagrams AS LOCAL docs/database/_default/diagrams
77-
78-
openapi:
79-
FROM core+base-image
80-
WORKDIR /src
81-
COPY openapi.yaml openapi.yaml
82-
SAVE ARTIFACT ./openapi.yaml
52+
BUILD --pass-args core+deploy-staging

cmd/config.go

+34-2
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,50 @@ package cmd
22

33
import (
44
"fmt"
5+
"github.com/mitchellh/mapstructure"
6+
"github.com/robfig/cron/v3"
57
"github.com/spf13/cobra"
68
"github.com/spf13/viper"
9+
"reflect"
710
)
811

9-
func LoadConfig[V any](cmd *cobra.Command) (*V, error){
12+
type commonConfig struct {
13+
NumscriptInterpreter bool `mapstructure:"experimental-numscript-interpreter"`
14+
NumscriptInterpreterFlags []string `mapstructure:"experimental-numscript-interpreter-flags"`
15+
ExperimentalFeaturesEnabled bool `mapstructure:"experimental-features"`
16+
ExperimentalConnectors bool `mapstructure:"experimental-connectors"`
17+
}
18+
19+
func decodeCronSchedule(sourceType, destType reflect.Type, value any) (any, error) {
20+
if sourceType.Kind() != reflect.String {
21+
return value, nil
22+
}
23+
if destType != reflect.TypeOf((*cron.Schedule)(nil)).Elem() {
24+
return value, nil
25+
}
26+
27+
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
28+
schedule, err := parser.Parse(value.(string))
29+
if err != nil {
30+
return nil, fmt.Errorf("parsing cron schedule: %w", err)
31+
}
32+
33+
return schedule, nil
34+
}
35+
36+
func LoadConfig[V any](cmd *cobra.Command) (*V, error) {
1037
v := viper.New()
1138
if err := v.BindPFlags(cmd.Flags()); err != nil {
1239
return nil, fmt.Errorf("binding flags: %w", err)
1340
}
1441

1542
var cfg V
16-
if err := v.Unmarshal(&cfg); err != nil {
43+
if err := v.Unmarshal(&cfg,
44+
viper.DecodeHook(mapstructure.ComposeDecodeHookFunc(
45+
decodeCronSchedule,
46+
mapstructure.StringToTimeDurationHookFunc(),
47+
)),
48+
); err != nil {
1749
return nil, fmt.Errorf("unmarshalling config: %w", err)
1850
}
1951

cmd/docs_events.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cmd
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/formancehq/ledger/internal/bus"
6+
"github.com/formancehq/ledger/pkg/events"
77
"github.com/invopop/jsonschema"
88
"github.com/spf13/cobra"
99
"os"
@@ -30,10 +30,10 @@ func NewDocEventsCommand() *cobra.Command {
3030
}
3131

3232
for _, o := range []any{
33-
bus.CommittedTransactions{},
34-
bus.DeletedMetadata{},
35-
bus.SavedMetadata{},
36-
bus.RevertedTransaction{},
33+
events.CommittedTransactions{},
34+
events.DeletedMetadata{},
35+
events.SavedMetadata{},
36+
events.RevertedTransaction{},
3737
} {
3838
schema := jsonschema.Reflect(o)
3939
data, err := json.MarshalIndent(schema, "", " ")

cmd/root.go

+10
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import (
1414

1515
const (
1616
ServiceName = "ledger"
17+
18+
NumscriptInterpreterFlag = "experimental-numscript-interpreter"
19+
NumscriptInterpreterFlagsToPass = "experimental-numscript-interpreter-flags"
20+
ExperimentalFeaturesFlag = "experimental-features"
21+
ExperimentalConnectors = "experimental-connectors"
1722
)
1823

1924
var (
@@ -30,6 +35,11 @@ func NewRootCommand() *cobra.Command {
3035
Version: Version,
3136
}
3237

38+
root.PersistentFlags().Bool(ExperimentalFeaturesFlag, false, "Enable features configurability")
39+
root.PersistentFlags().Bool(NumscriptInterpreterFlag, false, "Enable experimental numscript rewrite")
40+
root.PersistentFlags().String(NumscriptInterpreterFlagsToPass, "", "Feature flags to pass to the experimental numscript interpreter")
41+
root.PersistentFlags().Bool(ExperimentalConnectors, false, "Enable connectors support")
42+
3343
root.AddCommand(NewServeCommand())
3444
root.AddCommand(NewBucketsCommand())
3545
root.AddCommand(NewVersionCommand())

cmd/serve.go

+16-20
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import (
44
"fmt"
55
"github.com/formancehq/go-libs/v2/logging"
66
"github.com/formancehq/ledger/internal/api/common"
7+
"github.com/formancehq/ledger/internal/replication/drivers"
8+
"github.com/formancehq/ledger/internal/replication/drivers/all"
79
systemstore "github.com/formancehq/ledger/internal/storage/system"
8-
"github.com/formancehq/ledger/internal/worker"
910
"net/http"
1011
"net/http/pprof"
1112
"time"
@@ -37,7 +38,8 @@ import (
3738
"go.uber.org/fx"
3839
)
3940

40-
type ServeConfig struct {
41+
type ServeCommandConfig struct {
42+
commonConfig `mapstructure:",squash"`
4143
WorkerConfiguration `mapstructure:",squash"`
4244

4345
Bind string `mapstructure:"bind"`
@@ -49,9 +51,6 @@ type ServeConfig struct {
4951
DefaultPageSize uint64 `mapstructure:"default-page-size"`
5052
MaxPageSize uint64 `mapstructure:"max-page-size"`
5153
WorkerEnabled bool `mapstructure:"worker"`
52-
NumscriptInterpreter bool `mapstructure:"experimental-numscript-interpreter"`
53-
NumscriptInterpreterFlags []string `mapstructure:"experimental-numscript-interpreter-flags"`
54-
ExperimentalFeaturesEnabled bool `mapstructure:"experimental-features"`
5554
}
5655

5756
const (
@@ -65,9 +64,6 @@ const (
6564
DefaultPageSizeFlag = "default-page-size"
6665
MaxPageSizeFlag = "max-page-size"
6766
WorkerEnabledFlag = "worker"
68-
NumscriptInterpreterFlag = "experimental-numscript-interpreter"
69-
NumscriptInterpreterFlagsToPass = "experimental-numscript-interpreter-flags"
70-
ExperimentalFeaturesFlag = "experimental-features"
7167
)
7268

7369
func NewServeCommand() *cobra.Command {
@@ -76,7 +72,7 @@ func NewServeCommand() *cobra.Command {
7672
SilenceUsage: true,
7773
RunE: func(cmd *cobra.Command, _ []string) error {
7874

79-
cfg, err := LoadConfig[ServeConfig](cmd)
75+
cfg, err := LoadConfig[ServeCommandConfig](cmd)
8076
if err != nil {
8177
return fmt.Errorf("loading config: %w", err)
8278
}
@@ -97,6 +93,8 @@ func NewServeCommand() *cobra.Command {
9793
storage.NewFXModule(storage.ModuleConfig{
9894
AutoUpgrade: cfg.AutoUpgrade,
9995
}),
96+
drivers.NewFXModule(),
97+
fx.Invoke(all.Register),
10098
systemcontroller.NewFXModule(systemcontroller.ModuleConfiguration{
10199
NumscriptInterpreter: cfg.NumscriptInterpreter,
102100
NumscriptInterpreterFlags: cfg.NumscriptInterpreterFlags,
@@ -122,18 +120,19 @@ func NewServeCommand() *cobra.Command {
122120
MaxPageSize: cfg.MaxPageSize,
123121
DefaultPageSize: cfg.DefaultPageSize,
124122
},
123+
Connectors: cfg.ExperimentalConnectors,
125124
}),
126125
fx.Decorate(func(
127126
params struct {
128-
fx.In
127+
fx.In
129128

130-
Handler chi.Router
131-
HealthController *health.HealthController
132-
Logger logging.Logger
129+
Handler chi.Router
130+
HealthController *health.HealthController
131+
Logger logging.Logger
133132

134-
MeterProvider *metric.MeterProvider `optional:"true"`
135-
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
136-
},
133+
MeterProvider *metric.MeterProvider `optional:"true"`
134+
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
135+
},
137136
) chi.Router {
138137
return assembleFinalRouter(
139138
service.IsDebug(cmd),
@@ -150,10 +149,7 @@ func NewServeCommand() *cobra.Command {
150149
}
151150

152151
if cfg.WorkerEnabled {
153-
options = append(options, worker.NewFXModule(worker.ModuleConfig{
154-
Schedule: cfg.WorkerConfiguration.HashLogsBlockCRONSpec,
155-
MaxBlockSize: cfg.WorkerConfiguration.HashLogsBlockMaxSize,
156-
}))
152+
options = append(options, newWorkerModule(cfg.WorkerConfiguration))
157153
}
158154

159155
return service.New(cmd.OutOrStdout(), options...).Run(cmd)

cmd/worker.go

+41-7
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,46 @@ import (
77
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
88
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
99
"github.com/formancehq/go-libs/v2/service"
10+
"github.com/formancehq/ledger/internal/replication"
11+
"github.com/formancehq/ledger/internal/replication/drivers"
12+
"github.com/formancehq/ledger/internal/replication/drivers/all"
1013
"github.com/formancehq/ledger/internal/storage"
1114
"github.com/formancehq/ledger/internal/worker"
15+
"github.com/robfig/cron/v3"
1216
"github.com/spf13/cobra"
1317
"go.uber.org/fx"
18+
"time"
1419
)
1520

1621
const (
22+
WorkerPipelinesSyncPeriodFlag = "worker-pipelines-sync-period"
23+
WorkerPipelinesPullIntervalFlag = "worker-pipelines-pull-interval"
24+
WorkerPipelinesPushRetryPeriodFlag = "worker-pipelines-push-retry-period"
25+
1726
WorkerAsyncBlockHasherMaxBlockSizeFlag = "worker-async-block-hasher-max-block-size"
1827
WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"
1928
)
2029

2130
type WorkerConfiguration struct {
22-
HashLogsBlockMaxSize int `mapstructure:"worker-async-block-hasher-max-block-size"`
23-
HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule"`
31+
HashLogsBlockMaxSize int `mapstructure:"worker-async-block-hasher-max-block-size"`
32+
HashLogsBlockCRONSpec cron.Schedule `mapstructure:"worker-async-block-hasher-schedule"`
33+
34+
SyncPeriod time.Duration `mapstructure:"worker-pipelines-sync-period"`
35+
PushRetryPeriod time.Duration `mapstructure:"worker-pipelines-push-retry-period"`
36+
PullInterval time.Duration `mapstructure:"worker-pipelines-pull-interval"`
37+
}
38+
39+
type WorkerCommandConfiguration struct {
40+
WorkerConfiguration `mapstructure:",squash"`
41+
commonConfig `mapstructure:",squash"`
2442
}
2543

2644
func addWorkerFlags(cmd *cobra.Command) {
2745
cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
2846
cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
47+
cmd.Flags().Duration(WorkerPipelinesSyncPeriodFlag, 5*time.Second, "Pipelines sync period")
48+
cmd.Flags().Duration(WorkerPipelinesPullIntervalFlag, 5*time.Second, "Pipelines pull interval")
49+
cmd.Flags().Duration(WorkerPipelinesPushRetryPeriodFlag, 10*time.Second, "Pipelines push retry period")
2950
}
3051

3152
func NewWorkerCommand() *cobra.Command {
@@ -38,7 +59,7 @@ func NewWorkerCommand() *cobra.Command {
3859
return err
3960
}
4061

41-
cfg, err := LoadConfig[WorkerConfiguration](cmd)
62+
cfg, err := LoadConfig[WorkerCommandConfiguration](cmd)
4263
if err != nil {
4364
return fmt.Errorf("loading config: %w", err)
4465
}
@@ -50,10 +71,9 @@ func NewWorkerCommand() *cobra.Command {
5071
otlpmetrics.FXModuleFromFlags(cmd),
5172
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
5273
storage.NewFXModule(storage.ModuleConfig{}),
53-
worker.NewFXModule(worker.ModuleConfig{
54-
MaxBlockSize: cfg.HashLogsBlockMaxSize,
55-
Schedule: cfg.HashLogsBlockCRONSpec,
56-
}),
74+
drivers.NewFXModule(),
75+
fx.Invoke(all.Register),
76+
newWorkerModule(cfg.WorkerConfiguration),
5777
).Run(cmd)
5878
},
5979
}
@@ -66,3 +86,17 @@ func NewWorkerCommand() *cobra.Command {
6686

6787
return cmd
6888
}
89+
90+
func newWorkerModule(configuration WorkerConfiguration) fx.Option {
91+
return worker.NewFXModule(worker.ModuleConfig{
92+
AsyncBlockRunnerConfig: storage.AsyncBlockRunnerConfig{
93+
MaxBlockSize: configuration.HashLogsBlockMaxSize,
94+
Schedule: configuration.HashLogsBlockCRONSpec,
95+
},
96+
ReplicationConfig: runner.ModuleConfig{
97+
SyncPeriod: configuration.SyncPeriod,
98+
PushRetryPeriod: configuration.PushRetryPeriod,
99+
PullInterval: configuration.PullInterval,
100+
},
101+
})
102+
}

deployments/pulumi/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
Pulumi.*.yaml
2+
examples/

0 commit comments

Comments
 (0)