Skip to content

Commit dc9d5af

Browse files
committed
fix: data race
1 parent ccc2887 commit dc9d5af

File tree

14 files changed

+116
-356
lines changed

14 files changed

+116
-356
lines changed

deployments/pulumi/main.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package main
33
import (
44
"errors"
55
"fmt"
6+
"github.com/formancehq/go-libs/v2/pointer"
67
"github.com/formancehq/ledger/deployments/pulumi/pkg"
78
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
89
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
10+
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
911
)
1012

1113
func main() {
@@ -43,9 +45,9 @@ func deploy(ctx *pulumi.Context) error {
4345
Postgres: pulumi_ledger.PostgresArgs{
4446
URI: pulumi.String(postgresURI),
4547
},
46-
Debug: pulumi.Bool(conf.GetBool("debug")),
48+
Debug: pulumi.Bool(conf.GetBool("debug")),
4749
API: pulumi_ledger.APIArgs{
48-
ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")),
50+
ReplicaCount: pulumix.Val(pointer.For(conf.GetInt("replicaCount"))),
4951
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
5052
},
5153
})

internal/README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,7 @@ func (m Moves) ComputePostCommitEffectiveVolumes() PostCommitVolumes
903903

904904

905905
<a name="Pipeline"></a>
906-
## type [Pipeline](<https://github.com/formancehq/ledger/blob/main/internal/pipeline.go#L27-L36>)
906+
## type [Pipeline](<https://github.com/formancehq/ledger/blob/main/internal/pipeline.go#L27-L37>)
907907

908908

909909

@@ -914,14 +914,15 @@ type Pipeline struct {
914914
PipelineConfiguration
915915
CreatedAt time.Time `json:"createdAt" bun:"created_at"`
916916
ID string `json:"id" bun:"id,pk"`
917+
Version int `json:"-" bun:"version"`
917918
Enabled bool `json:"enabled" bun:"enabled"`
918-
LastLogID *int `json:"lastLogID,omitempty" bun:"last_log_id"`
919+
LastLogID int `json:"lastLogID,omitempty" bun:"last_log_id"`
919920
Error string `json:"error,omitempty" bun:"error"`
920921
}
921922
```
922923

923924
<a name="NewPipeline"></a>
924-
### func [NewPipeline](<https://github.com/formancehq/ledger/blob/main/internal/pipeline.go#L38>)
925+
### func [NewPipeline](<https://github.com/formancehq/ledger/blob/main/internal/pipeline.go#L39>)
925926

926927
```go
927928
func NewPipeline(pipelineConfiguration PipelineConfiguration) Pipeline

internal/pipeline.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ type Pipeline struct {
3030
PipelineConfiguration
3131
CreatedAt time.Time `json:"createdAt" bun:"created_at"`
3232
ID string `json:"id" bun:"id,pk"`
33+
Version int `json:"-" bun:"version"`
3334
Enabled bool `json:"enabled" bun:"enabled"`
34-
LastLogID *int `json:"lastLogID,omitempty" bun:"last_log_id"`
35+
LastLogID int `json:"lastLogID,omitempty" bun:"last_log_id"`
3536
Error string `json:"error,omitempty" bun:"error"`
3637
}
3738

@@ -41,5 +42,6 @@ func NewPipeline(pipelineConfiguration PipelineConfiguration) Pipeline {
4142
PipelineConfiguration: pipelineConfiguration,
4243
Enabled: true,
4344
CreatedAt: time.Now(),
45+
LastLogID: -1,
4446
}
4547
}

internal/replication/runner/pipeline.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,11 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan int) {
6262
close(ch)
6363
return
6464
case <-time.After(nextInterval):
65-
id := -1
66-
if p.pipeline.LastLogID != nil {
67-
id = *p.pipeline.LastLogID
68-
}
69-
7065
logs, err := p.store.ListLogs(ctx, pagination.ColumnPaginatedQuery[any]{
7166
PageSize: 100,
7267
Column: "id",
7368
Options: pagination.ResourceQuery[any]{
74-
Builder: query.Gt("id", id),
69+
Builder: query.Gt("id", p.pipeline.LastLogID),
7570
},
7671
Order: pointer.For(bunpaginate.Order(bunpaginate.OrderAsc)),
7772
})
@@ -119,12 +114,12 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan int) {
119114

120115
wg.Wait()
121116

122-
p.pipeline.LastLogID = logs.Data[len(logs.Data)-1].ID
117+
p.pipeline.LastLogID = *logs.Data[len(logs.Data)-1].ID
123118

124119
select {
125120
case <-ctx.Done():
126121
return
127-
case ingestedLogs <- *p.pipeline.LastLogID:
122+
case ingestedLogs <- p.pipeline.LastLogID:
128123
}
129124

130125
if !logs.HasMore {
@@ -176,4 +171,4 @@ func NewPipelineHandler(
176171
WithField("module", pipeline.Ledger).
177172
WithField("connector", pipeline.ConnectorID),
178173
}
179-
}
174+
}

internal/replication/runner/runner.go

+63-15
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,20 @@ func (runner *Runner) stopConnector(ctx context.Context, connector drivers.Drive
6060
}
6161
}
6262

63-
func (runner *Runner) StartPipeline(ctx context.Context, pipeline ledger.Pipeline) (*PipelineHandler, error) {
63+
func (runner *Runner) StartPipeline(ctx context.Context, pipelineID string) (*PipelineHandler, error) {
6464
runner.mu.Lock()
6565
defer runner.mu.Unlock()
6666

67+
pipeline, err := runner.storage.GetPipeline(ctx, pipelineID)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
return runner.startPipeline(ctx, *pipeline)
73+
}
74+
75+
func (runner *Runner) startPipeline(ctx context.Context, pipeline ledger.Pipeline) (*PipelineHandler, error) {
76+
runner.logger.Infof("initializing pipeline")
6777
_, ok := runner.pipelines[pipeline.ID]
6878
if ok {
6979
return nil, ledger.NewErrAlreadyStarted(pipeline.ID)
@@ -72,15 +82,15 @@ func (runner *Runner) StartPipeline(ctx context.Context, pipeline ledger.Pipelin
7282
ctx = logging.ContextWithLogger(
7383
ctx,
7484
runner.logger.WithFields(map[string]any{
75-
"module": pipeline.Ledger,
85+
"ledger": pipeline.Ledger,
7686
"connector": pipeline.ConnectorID,
7787
}),
7888
)
7989

8090
// Detach the context as once the process of pipeline initialisation is started, we must not stop it
8191
ctx = context.WithoutCancel(ctx)
8292

83-
runner.logger.Infof("initializing pipeline")
93+
runner.logger.Infof("initializing connector")
8494
if err := runner.initConnector(pipeline.ConnectorID); err != nil {
8595
return nil, err
8696
}
@@ -103,6 +113,7 @@ func (runner *Runner) StartPipeline(ctx context.Context, pipeline ledger.Pipelin
103113
// ignore the cancel function, as it will be called by the pipeline at its end
104114
subscription := make(chan int)
105115

116+
runner.logger.Infof("starting handler")
106117
go func() {
107118
for lastLogID := range subscription {
108119
if err := runner.storage.StorePipelineState(ctx, pipeline.ID, lastLogID); err != nil {
@@ -162,26 +173,34 @@ func (runner *Runner) stopConnectorIfNeeded(ctx context.Context, handler *Pipeli
162173
}
163174

164175
func (runner *Runner) synchronizePipelines(ctx context.Context) error {
176+
runner.mu.Lock()
177+
defer runner.mu.Unlock()
178+
165179
runner.logger.Debug("restore pipelines from store")
180+
defer func() {
181+
runner.logger.Debug("restoration terminated")
182+
}()
166183
pipelines, err := runner.storage.ListEnabledPipelines(ctx)
167184
if err != nil {
168185
return fmt.Errorf("reading pipelines from store: %w", err)
169186
}
170187

171188
for _, pipeline := range pipelines {
172-
if handler := runner.GetPipeline(pipeline.ID); handler != nil {
173-
if pipeline.LastLogID != nil ||
174-
handler.pipeline.LastLogID == nil ||
175-
*handler.pipeline.LastLogID == 0 {
189+
runner.logger.Debugf("restoring pipeline %s", pipeline.ID)
190+
if handler := runner.pipelines[pipeline.ID]; handler != nil {
191+
if pipeline.Version == handler.pipeline.Version {
192+
runner.logger.Debugf("pipeline %s up to date, skipping", pipeline.ID)
176193
continue
177194
}
178195

196+
runner.logger.Debugf("pipeline %s outdated, stopping it", pipeline.ID)
179197
if err := runner.stopPipeline(ctx, pipeline.ID); err != nil {
180198
runner.logger.Errorf("error stopping pipeline: %s", err)
181199
continue
182200
}
183201
}
184-
if _, err := runner.StartPipeline(ctx, pipeline); err != nil {
202+
runner.logger.Debugf("starting pipeline %s", pipeline.ID)
203+
if _, err := runner.startPipeline(ctx, pipeline); err != nil {
185204
return err
186205
}
187206
}
@@ -203,6 +222,41 @@ l:
203222
return nil
204223
}
205224

225+
func (runner *Runner) Refresh(ctx context.Context, pipelineID string) error {
226+
runner.mu.Lock()
227+
defer runner.mu.Unlock()
228+
229+
pipeline, err := runner.storage.GetPipeline(ctx, pipelineID)
230+
if err != nil {
231+
return err
232+
}
233+
234+
if pipeline.Enabled {
235+
handler, ok := runner.pipelines[pipelineID]
236+
if !ok {
237+
if _, err := runner.startPipeline(ctx, *pipeline); err != nil {
238+
return err
239+
}
240+
return nil
241+
} else if pipeline.Version != handler.pipeline.Version {
242+
if err := runner.stopPipeline(ctx, pipelineID); err != nil {
243+
return err
244+
}
245+
if _, err := runner.startPipeline(ctx, *pipeline); err != nil {
246+
return err
247+
}
248+
}
249+
250+
} else {
251+
_, ok := runner.pipelines[pipelineID]
252+
if ok {
253+
return runner.stopPipeline(ctx, pipelineID)
254+
}
255+
}
256+
257+
return nil
258+
}
259+
206260
func (runner *Runner) Run(ctx context.Context) {
207261
if err := runner.synchronizePipelines(ctx); err != nil {
208262
runner.logger.Errorf("starting pipelines: %s", err)
@@ -228,13 +282,7 @@ func (runner *Runner) GetPipeline(id string) *PipelineHandler {
228282
runner.mu.Lock()
229283
defer runner.mu.Unlock()
230284

231-
for p, pipeline := range runner.pipelines {
232-
if id == p {
233-
return pipeline
234-
}
235-
}
236-
237-
return nil
285+
return runner.pipelines[id]
238286
}
239287

240288
func (runner *Runner) Stop(ctx context.Context) error {

internal/replication/runner/runner_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func TestRunner(t *testing.T) {
9292
AnyTimes().
9393
Return([]ledger.Pipeline{pipeline}, nil)
9494

95+
storage.EXPECT().
96+
GetPipeline(gomock.Any(), pipeline.ID).
97+
Return(&pipeline, nil)
98+
9599
storage.EXPECT().
96100
OpenLedger(gomock.Any(), pipelineConfiguration.Ledger).
97101
Return(logFetcher, &ledger.Ledger{}, nil)
@@ -101,7 +105,7 @@ func TestRunner(t *testing.T) {
101105
Return(nil)
102106

103107
runner := startRunner(t, ctx, storage, connectorFactory)
104-
_, err := runner.StartPipeline(ctx, pipeline)
108+
_, err := runner.StartPipeline(ctx, pipeline.ID)
105109
require.NoError(t, err)
106110

107111
connector.EXPECT().
@@ -110,7 +114,7 @@ func TestRunner(t *testing.T) {
110114

111115
require.Eventually(t, func() bool {
112116
return runner.GetConnector("connector") != nil
113-
}, time.Second, 10*time.Millisecond)
117+
}, 5*time.Second, 10*time.Millisecond)
114118

115119
select {
116120
case <-runner.GetConnector("connector").Ready():

internal/replication/runner/store.go

+5
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@ type Storage interface {
2727
OpenLedger(context.Context, string) (LogFetcher, *ledger.Ledger, error)
2828
StorePipelineState(ctx context.Context, id string, lastLogID int) error
2929
ListEnabledPipelines(ctx context.Context) ([]ledger.Pipeline, error)
30+
GetPipeline(ctx context.Context, id string) (*ledger.Pipeline, error)
3031
}
3132

3233
type storageAdapter struct {
3334
storageDriver *driver.Driver
3435
systemStore *systemstore.DefaultStore
3536
}
3637

38+
func (s *storageAdapter) GetPipeline(ctx context.Context, id string) (*ledger.Pipeline, error) {
39+
return s.systemStore.GetPipeline(ctx, id)
40+
}
41+
3742
func (s *storageAdapter) OpenLedger(ctx context.Context, name string) (LogFetcher, *ledger.Ledger, error) {
3843
store, l, err := s.storageDriver.OpenLedger(ctx, name)
3944

internal/replication/runner/store_generated_test.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/system/migrations.go

+1
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func GetMigrator(db *bun.DB, options ...migrations.Option) *migrations.Migrator
248248
enabled bool,
249249
last_log_id bigint,
250250
error varchar,
251+
version int,
251252
252253
primary key(id)
253254
);

internal/storage/system/store.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,16 @@ func (store *DefaultStore) CreatePipeline(ctx context.Context, pipeline ledger.P
210210
}
211211

212212
func (store *DefaultStore) UpdatePipeline(ctx context.Context, id string, o map[string]any) error {
213-
_, err := store.db.NewUpdate().
214-
Table("_system.pipelines").
215-
Model(&o).
216-
Where("id = ?", id).
217-
Exec(ctx)
213+
updateQuery := store.db.NewUpdate().
214+
Table("_system.pipelines")
215+
for k, v := range o {
216+
updateQuery = updateQuery.Set(k + " = ?", v)
217+
}
218+
updateQuery = updateQuery.
219+
Set("version = version + 1").
220+
Where("id = ?", id)
221+
222+
_, err := updateQuery.Exec(ctx)
218223
return postgres.ResolveError(err)
219224
}
220225

internal/storage/system/store_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ func TestUpdatePipeline(t *testing.T) {
270270
require.False(t, pipelineFromDB.Enabled)
271271

272272
pipelineFromDB.Enabled = true
273+
pipelineFromDB.Version -= 1
273274
require.Equal(t, alivePipeline, *pipelineFromDB)
274275
}
275276

0 commit comments

Comments
 (0)