@@ -60,10 +60,20 @@ func (runner *Runner) stopConnector(ctx context.Context, connector drivers.Drive
60
60
}
61
61
}
62
62
63
- func (runner * Runner ) StartPipeline (ctx context.Context , pipeline ledger. Pipeline ) (* PipelineHandler , error ) {
63
+ func (runner * Runner ) StartPipeline (ctx context.Context , pipelineID string ) (* PipelineHandler , error ) {
64
64
runner .mu .Lock ()
65
65
defer runner .mu .Unlock ()
66
66
67
+ pipeline , err := runner .storage .GetPipeline (ctx , pipelineID )
68
+ if err != nil {
69
+ return nil , err
70
+ }
71
+
72
+ return runner .startPipeline (ctx , * pipeline )
73
+ }
74
+
75
+ func (runner * Runner ) startPipeline (ctx context.Context , pipeline ledger.Pipeline ) (* PipelineHandler , error ) {
76
+ runner .logger .Infof ("initializing pipeline" )
67
77
_ , ok := runner .pipelines [pipeline .ID ]
68
78
if ok {
69
79
return nil , ledger .NewErrAlreadyStarted (pipeline .ID )
@@ -72,15 +82,15 @@ func (runner *Runner) StartPipeline(ctx context.Context, pipeline ledger.Pipelin
72
82
ctx = logging .ContextWithLogger (
73
83
ctx ,
74
84
runner .logger .WithFields (map [string ]any {
75
- "module " : pipeline .Ledger ,
85
+ "ledger " : pipeline .Ledger ,
76
86
"connector" : pipeline .ConnectorID ,
77
87
}),
78
88
)
79
89
80
90
// Detach the context as once the process of pipeline initialisation is started, we must not stop it
81
91
ctx = context .WithoutCancel (ctx )
82
92
83
- runner .logger .Infof ("initializing pipeline " )
93
+ runner .logger .Infof ("initializing connector " )
84
94
if err := runner .initConnector (pipeline .ConnectorID ); err != nil {
85
95
return nil , err
86
96
}
@@ -103,6 +113,7 @@ func (runner *Runner) StartPipeline(ctx context.Context, pipeline ledger.Pipelin
103
113
// ignore the cancel function, as it will be called by the pipeline at its end
104
114
subscription := make (chan int )
105
115
116
+ runner .logger .Infof ("starting handler" )
106
117
go func () {
107
118
for lastLogID := range subscription {
108
119
if err := runner .storage .StorePipelineState (ctx , pipeline .ID , lastLogID ); err != nil {
@@ -162,26 +173,34 @@ func (runner *Runner) stopConnectorIfNeeded(ctx context.Context, handler *Pipeli
162
173
}
163
174
164
175
func (runner * Runner ) synchronizePipelines (ctx context.Context ) error {
176
+ runner .mu .Lock ()
177
+ defer runner .mu .Unlock ()
178
+
165
179
runner .logger .Debug ("restore pipelines from store" )
180
+ defer func () {
181
+ runner .logger .Debug ("restoration terminated" )
182
+ }()
166
183
pipelines , err := runner .storage .ListEnabledPipelines (ctx )
167
184
if err != nil {
168
185
return fmt .Errorf ("reading pipelines from store: %w" , err )
169
186
}
170
187
171
188
for _ , pipeline := range pipelines {
172
- if handler := runner .GetPipeline ( pipeline .ID ); handler != nil {
173
- if pipeline .LastLogID != nil ||
174
- handler . pipeline .LastLogID == nil ||
175
- * handler . pipeline . LastLogID == 0 {
189
+ runner .logger . Debugf ( "restoring pipeline %s" , pipeline .ID )
190
+ if handler := runner . pipelines [ pipeline .ID ]; handler != nil {
191
+ if pipeline .Version == handler . pipeline . Version {
192
+ runner . logger . Debugf ( "pipeline %s up to date, skipping" , pipeline . ID )
176
193
continue
177
194
}
178
195
196
+ runner .logger .Debugf ("pipeline %s outdated, stopping it" , pipeline .ID )
179
197
if err := runner .stopPipeline (ctx , pipeline .ID ); err != nil {
180
198
runner .logger .Errorf ("error stopping pipeline: %s" , err )
181
199
continue
182
200
}
183
201
}
184
- if _ , err := runner .StartPipeline (ctx , pipeline ); err != nil {
202
+ runner .logger .Debugf ("starting pipeline %s" , pipeline .ID )
203
+ if _ , err := runner .startPipeline (ctx , pipeline ); err != nil {
185
204
return err
186
205
}
187
206
}
203
222
return nil
204
223
}
205
224
225
+ func (runner * Runner ) Refresh (ctx context.Context , pipelineID string ) error {
226
+ runner .mu .Lock ()
227
+ defer runner .mu .Unlock ()
228
+
229
+ pipeline , err := runner .storage .GetPipeline (ctx , pipelineID )
230
+ if err != nil {
231
+ return err
232
+ }
233
+
234
+ if pipeline .Enabled {
235
+ handler , ok := runner .pipelines [pipelineID ]
236
+ if ! ok {
237
+ if _ , err := runner .startPipeline (ctx , * pipeline ); err != nil {
238
+ return err
239
+ }
240
+ return nil
241
+ } else if pipeline .Version != handler .pipeline .Version {
242
+ if err := runner .stopPipeline (ctx , pipelineID ); err != nil {
243
+ return err
244
+ }
245
+ if _ , err := runner .startPipeline (ctx , * pipeline ); err != nil {
246
+ return err
247
+ }
248
+ }
249
+
250
+ } else {
251
+ _ , ok := runner .pipelines [pipelineID ]
252
+ if ok {
253
+ return runner .stopPipeline (ctx , pipelineID )
254
+ }
255
+ }
256
+
257
+ return nil
258
+ }
259
+
206
260
func (runner * Runner ) Run (ctx context.Context ) {
207
261
if err := runner .synchronizePipelines (ctx ); err != nil {
208
262
runner .logger .Errorf ("starting pipelines: %s" , err )
@@ -228,13 +282,7 @@ func (runner *Runner) GetPipeline(id string) *PipelineHandler {
228
282
runner .mu .Lock ()
229
283
defer runner .mu .Unlock ()
230
284
231
- for p , pipeline := range runner .pipelines {
232
- if id == p {
233
- return pipeline
234
- }
235
- }
236
-
237
- return nil
285
+ return runner .pipelines [id ]
238
286
}
239
287
240
288
func (runner * Runner ) Stop (ctx context.Context ) error {
0 commit comments