@@ -3,9 +3,10 @@ package cmd
3
3
import (
4
4
"github.com/formancehq/go-libs/v2/logging"
5
5
"github.com/formancehq/ledger/internal/api/common"
6
- replicationcontroller "github.com/formancehq/ledger/internal/replication/controller"
7
- "github.com/formancehq/ledger/internal/replication/leadership"
8
6
"github.com/formancehq/ledger/internal/leadership"
7
+ "github.com/formancehq/ledger/internal/replication/drivers"
8
+ "github.com/formancehq/ledger/internal/replication/drivers/all"
9
+ "github.com/formancehq/ledger/internal/replication/runner"
9
10
systemstore "github.com/formancehq/ledger/internal/storage/system"
10
11
"net/http"
11
12
"net/http/pprof"
@@ -39,17 +40,20 @@ import (
39
40
)
40
41
41
42
const (
42
- BindFlag = "bind"
43
- BallastSizeInBytesFlag = "ballast-size"
44
- NumscriptCacheMaxCountFlag = "numscript-cache-max-count"
45
- AutoUpgradeFlag = "auto-upgrade"
46
- ExperimentalFeaturesFlag = "experimental-features"
47
- BulkMaxSizeFlag = "bulk-max-size"
48
- BulkParallelFlag = "bulk-parallel"
49
- NumscriptInterpreterFlag = "experimental-numscript-interpreter"
43
+ BindFlag = "bind"
44
+ BallastSizeInBytesFlag = "ballast-size"
45
+ NumscriptCacheMaxCountFlag = "numscript-cache-max-count"
46
+ AutoUpgradeFlag = "auto-upgrade"
47
+ ExperimentalFeaturesFlag = "experimental-features"
48
+ BulkMaxSizeFlag = "bulk-max-size"
49
+ BulkParallelFlag = "bulk-parallel"
50
+ NumscriptInterpreterFlag = "experimental-numscript-interpreter"
50
51
NumscriptInterpreterFlagsToPass = "numscript-interpreter-flags"
51
- DefaultPageSizeFlag = "default-page-size"
52
- MaxPageSizeFlag = "max-page-size"
52
+ DefaultPageSizeFlag = "default-page-size"
53
+ MaxPageSizeFlag = "max-page-size"
54
+ PipelinesSyncPeriodFlag = "pipelines-sync-period"
55
+ PipelinesPullIntervalFlag = "pipelines-pull-interval"
56
+ PipelinesPushRetryPeriodFlag = "pipelines-push-retry-period"
53
57
)
54
58
55
59
func NewServeCommand () * cobra.Command {
@@ -100,6 +104,13 @@ func NewServeCommand() *cobra.Command {
100
104
auth .FXModuleFromFlags (cmd ),
101
105
bunconnect .Module (* connectionOptions , service .IsDebug (cmd )),
102
106
storage .NewFXModule (serveConfiguration .autoUpgrade ),
107
+ runner .NewFXModule (runner.ModuleConfig {
108
+ SyncPeriod : serveConfiguration .pipelinesSyncPeriod ,
109
+ PullInterval : serveConfiguration .pipelinesPullInterval ,
110
+ PushRetryPeriod : serveConfiguration .pipelinesPushRetryPeriod ,
111
+ }),
112
+ drivers .NewFXModule (),
113
+ fx .Invoke (all .Register ),
103
114
systemcontroller .NewFXModule (systemcontroller.ModuleConfiguration {
104
115
NumscriptInterpreter : numscriptInterpreter ,
105
116
NumscriptInterpreterFlags : numscriptInterpreterFlags ,
@@ -114,7 +125,9 @@ func NewServeCommand() *cobra.Command {
114
125
}),
115
126
bus .NewFxModule (),
116
127
ballast .Module (serveConfiguration .ballastSize ),
117
- leadership .NewFXModule (),
128
+ leadership .NewFXModule (leadership.ModuleConfig {
129
+ LeadershipRetryPeriod : 5 * time .Second ,
130
+ }),
118
131
api .Module (api.Config {
119
132
Version : Version ,
120
133
Debug : service .IsDebug (cmd ),
@@ -148,8 +161,6 @@ func NewServeCommand() *cobra.Command {
148
161
params .Handler ,
149
162
)
150
163
}),
151
- replicationcontroller .NewFXModule (),
152
- leadership .NewFXModule (),
153
164
fx .Invoke (func (lc fx.Lifecycle , h chi.Router ) {
154
165
lc .Append (httpserver .NewHook (h , httpserver .WithAddress (serveConfiguration .bind )))
155
166
}),
@@ -169,6 +180,9 @@ func NewServeCommand() *cobra.Command {
169
180
cmd .Flags ().String (NumscriptInterpreterFlagsToPass , "" , "Feature flags to pass to the experimental numscript interpreter" )
170
181
cmd .Flags ().Uint64 (MaxPageSizeFlag , 100 , "Max page size" )
171
182
cmd .Flags ().Uint64 (DefaultPageSizeFlag , 15 , "Default page size" )
183
+ cmd .Flags ().Duration (PipelinesSyncPeriodFlag , 5 * time .Second , "Pipelines sync period" )
184
+ cmd .Flags ().Duration (PipelinesPullIntervalFlag , runner .DefaultPullInterval , "Pipelines pull interval" )
185
+ cmd .Flags ().Duration (PipelinesPushRetryPeriodFlag , runner .DefaultPushRetryPeriod , "Pipelines push retry period" )
172
186
173
187
service .AddFlags (cmd .Flags ())
174
188
bunconnect .AddFlags (cmd .Flags ())
@@ -184,10 +198,13 @@ func NewServeCommand() *cobra.Command {
184
198
}
185
199
186
200
type serveConfiguration struct {
187
- ballastSize uint
188
- numscriptCacheMaxCount uint
189
- autoUpgrade bool
190
- bind string
201
+ ballastSize uint
202
+ numscriptCacheMaxCount uint
203
+ autoUpgrade bool
204
+ bind string
205
+ pipelinesSyncPeriod time.Duration
206
+ pipelinesPullInterval time.Duration
207
+ pipelinesPushRetryPeriod time.Duration
191
208
}
192
209
193
210
func discoverServeConfiguration (cmd * cobra.Command ) serveConfiguration {
@@ -196,6 +213,9 @@ func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
196
213
ret .numscriptCacheMaxCount , _ = cmd .Flags ().GetUint (NumscriptCacheMaxCountFlag )
197
214
ret .autoUpgrade , _ = cmd .Flags ().GetBool (AutoUpgradeFlag )
198
215
ret .bind , _ = cmd .Flags ().GetString (BindFlag )
216
+ ret .pipelinesSyncPeriod , _ = cmd .Flags ().GetDuration (PipelinesSyncPeriodFlag )
217
+ ret .pipelinesPullInterval , _ = cmd .Flags ().GetDuration (PipelinesPullIntervalFlag )
218
+ ret .pipelinesPushRetryPeriod , _ = cmd .Flags ().GetDuration (PipelinesPushRetryPeriodFlag )
199
219
200
220
return ret
201
221
}
0 commit comments