6
6
"sync"
7
7
"syscall"
8
8
9
+ "oss.nandlabs.io/golly/collections"
9
10
"oss.nandlabs.io/golly/errutils"
10
11
)
11
12
@@ -117,6 +118,31 @@ type SimpleComponentManager struct {
117
118
componentIds []string
118
119
cMutex * sync.RWMutex
119
120
waitChan chan struct {}
121
+ dependencies map [string ]collections.List [string ]
122
+ }
123
+
124
+ // AddDependency will add a dependency between the two components.
125
+ func (scm * SimpleComponentManager ) AddDependency (id , dependsOn string ) (err error ) {
126
+ scm .cMutex .Lock ()
127
+ defer scm .cMutex .Unlock ()
128
+ if _ , exists := scm .components [id ]; ! exists {
129
+ return ErrCompNotFound
130
+ }
131
+ if _ , exists := scm .components [dependsOn ]; ! exists {
132
+ return ErrCompNotFound
133
+ }
134
+
135
+ //detect cyclic dependencies
136
+ if v , ok := scm .dependencies [dependsOn ]; ok && v .Contains (id ) {
137
+ return ErrCyclicDependency
138
+ }
139
+
140
+ if _ , exists := scm .dependencies [id ]; ! exists {
141
+ scm .dependencies [id ] = collections .NewArrayList [string ]()
142
+ }
143
+ scm .dependencies [id ].Add (dependsOn )
144
+ logger .InfoF ("Added dependency %s depends on %s:" , id , dependsOn )
145
+ return
120
146
}
121
147
122
148
// GetState will return the current state of the LifeCycle for the component with the given id.
@@ -171,21 +197,54 @@ func (scm *SimpleComponentManager) Start(id string) (err error) {
171
197
scm .cMutex .Lock ()
172
198
defer scm .cMutex .Unlock ()
173
199
component , exists := scm .components [id ]
174
- if exists {
175
- if component .State () != Running {
176
- var err error = nil
177
- go func (c Component , scm * SimpleComponentManager ) {
178
- err = component .Start ()
179
- if err != nil {
180
- logger .ErrorF ("Error starting component: %v" , err )
181
- }
182
- }(component , scm )
183
- return err
200
+ if ! exists {
201
+ return ErrCompNotFound
202
+ }
203
+ if component .State () == Running {
204
+ return
205
+ }
206
+ // Start the dependencies first
207
+ if v , ok := scm .dependencies [id ]; ok {
208
+ logger .DebugF ("Component %s has dependencies. Starting dependencies" , id )
209
+ dependecyWait := sync.WaitGroup {}
210
+ var multiError * errutils.MultiError = errutils .NewMultiErr (nil )
211
+ for ite := v .Iterator (); ite .HasNext (); {
212
+ dependentComp := scm .components [ite .Next ()]
213
+ if dependentComp .State () != Running {
214
+ dependecyWait .Add (1 )
215
+ go func (c Component , scm * SimpleComponentManager ) {
216
+ logger .DebugF ("Starting dependent component %s" , dependentComp .Id ())
217
+ err = dependentComp .Start ()
218
+ if err != nil {
219
+ multiError .Add (err )
220
+ logger .ErrorF ("Error starting component: %v" , err )
221
+ } else {
222
+ logger .DebugF ("Started dependent component %s" , dependentComp .Id ())
223
+ }
224
+ dependecyWait .Done ()
225
+ }(dependentComp , scm )
226
+ } else {
227
+ logger .DebugF ("Dependent component %s already running" , dependentComp .Id ())
228
+ }
229
+ }
230
+ dependecyWait .Wait ()
231
+
232
+ if multiError .HasErrors () {
233
+ return multiError
184
234
} else {
185
- return ErrCompAlreadyStarted
235
+ logger . Info ( "All dependencies started" )
186
236
}
237
+
238
+ }
239
+ logger .DebugF ("Starting component %s" , id )
240
+ err = component .Start ()
241
+ if err != nil {
242
+ logger .ErrorF ("Error starting component: %v" , err )
243
+ } else {
244
+ logger .DebugF ("Started component %s" , id )
187
245
}
188
- return ErrCompNotFound
246
+
247
+ return
189
248
}
190
249
191
250
// StartAll will start all the Components. Returns the number of components started
@@ -211,32 +270,79 @@ func (scm *SimpleComponentManager) StartAndWait() {
211
270
212
271
}
213
272
273
+ // Stop will stop the LifeCycle for the component with the given id. It returns if the component was stopped.
274
+ func (scm * SimpleComponentManager ) Stop (id string ) (err error ) {
275
+
276
+ component , exists := scm .components [id ]
277
+ if ! exists {
278
+ return ErrCompNotFound
279
+ }
280
+ if component .State () == Stopped {
281
+ return
282
+ }
283
+ // check if the component has dependencies
284
+ if v , ok := scm .dependencies [id ]; ok {
285
+ logger .DebugF ("Component %s has dependencies" , id )
286
+ dependecyWait := sync.WaitGroup {}
287
+ var multiError * errutils.MultiError = errutils .NewMultiErr (nil )
288
+ for ite := v .Iterator (); ite .HasNext (); {
289
+ dependentComp := scm .components [ite .Next ()]
290
+ logger .DebugF ("Checking dependent component %s" , dependentComp .Id ())
291
+ if dependentComp .State () != Stopped {
292
+ dependecyWait .Add (1 )
293
+ go func (c Component , scm * SimpleComponentManager ) {
294
+ logger .InfoF ("Stopping dependent component %s" , c .Id ())
295
+ err = dependentComp .Stop ()
296
+ if err != nil {
297
+ multiError .Add (err )
298
+ logger .ErrorF ("Error stopping component: %v" , err )
299
+ } else {
300
+ logger .DebugF ("Stopped dependent component %s" , c .Id ())
301
+ }
302
+ dependecyWait .Done ()
303
+
304
+ }(dependentComp , scm )
305
+ } else {
306
+ logger .InfoF ("Dependent component %s already stopped" , dependentComp .Id ())
307
+ }
308
+ }
309
+ dependecyWait .Wait ()
310
+ if multiError .HasErrors () {
311
+ return multiError
312
+ } else {
313
+ logger .DebugF ("All dependencies stopped proceeding to stop component %s" , id )
314
+ }
315
+ }
316
+ scm .cMutex .Lock ()
317
+ defer scm .cMutex .Unlock ()
318
+ if component .State () == Running {
319
+ logger .Debug ("Stopping component " , id )
320
+ err := component .Stop ()
321
+
322
+ if err != nil {
323
+ logger .ErrorF ("Error stopping component: %v" , err )
324
+ } else {
325
+ logger .InfoF ("Stopped component %s" , id )
326
+ }
327
+
328
+ }
329
+ return
330
+ }
331
+
214
332
// StopAll will stop all the Components.
215
333
func (scm * SimpleComponentManager ) StopAll () error {
216
334
logger .InfoF ("Stopping all components" )
217
335
err := errutils .NewMultiErr (nil )
218
- scm .cMutex .Lock ()
219
- defer scm .cMutex .Unlock ()
220
- wg := & sync.WaitGroup {}
221
336
for i := len (scm .componentIds ) - 1 ; i >= 0 ; i -- {
222
- component := scm.components [scm.componentIds [i ]]
223
- if component .State () == Running {
224
- wg .Add (1 )
225
- go func (c Component , wg * sync.WaitGroup ) {
226
- e := component .Stop ()
227
- if e != nil {
228
- logger .ErrorF ("Error stopping component: %v" , err )
229
- err .Add (e )
230
- }
231
-
232
- wg .Done ()
233
- }(component , wg )
337
+ e := scm .Stop (scm .componentIds [i ])
338
+ if e != nil {
339
+ logger .ErrorF ("Error stopping component: %v" , err )
340
+ err .Add (e )
234
341
}
235
342
}
236
- wg . Wait ( )
343
+ logger . Info ( "All components stopped" )
237
344
select {
238
345
case <- scm .waitChan :
239
- logger .Info ("All components stopped" )
240
346
default :
241
347
close (scm .waitChan )
242
348
}
@@ -247,28 +353,6 @@ func (scm *SimpleComponentManager) StopAll() error {
247
353
}
248
354
}
249
355
250
- // Stop will stop the LifeCycle for the component with the given id. It returns if the component was stopped.
251
- func (scm * SimpleComponentManager ) Stop (id string ) error {
252
- scm .cMutex .Lock ()
253
- defer scm .cMutex .Unlock ()
254
- component , exists := scm .components [id ]
255
- if exists {
256
- if component .State () == Running {
257
- err := component .Stop ()
258
- if err != nil {
259
- logger .ErrorF ("Error stopping component: %v" , err )
260
- }
261
- return err
262
- } else if component .State () == Stopped {
263
- return ErrCompAlreadyStopped
264
- } else {
265
- return ErrInvalidComponentState
266
- }
267
-
268
- }
269
- return ErrCompNotFound
270
- }
271
-
272
356
// Unregister will unregister a Component.
273
357
func (scm * SimpleComponentManager ) Unregister (id string ) {
274
358
scm .cMutex .Lock ()
@@ -304,9 +388,10 @@ func (scm *SimpleComponentManager) Wait() {
304
388
// NewSimpleComponentManager will return a new SimpleComponentManager.
305
389
func NewSimpleComponentManager () ComponentManager {
306
390
manager := & SimpleComponentManager {
307
- components : make (map [string ]Component ),
308
- cMutex : & sync.RWMutex {},
309
- waitChan : make (chan struct {}),
391
+ components : make (map [string ]Component ),
392
+ cMutex : & sync.RWMutex {},
393
+ waitChan : make (chan struct {}),
394
+ dependencies : make (map [string ]collections.List [string ]),
310
395
}
311
396
sigs := make (chan os.Signal , 1 )
312
397
0 commit comments