Skip to content

Commit 0ee45a7

Browse files
Add the loop to stop the clustering engine
1 parent 02e4f5d commit 0ee45a7

2 files changed

Lines changed: 22 additions & 13 deletions

File tree

clustering/engine.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package clustering
22

3-
import "github.com/fabricekabongo/loggerhead/query"
3+
import (
4+
"context"
5+
6+
"github.com/fabricekabongo/loggerhead/query"
7+
)
48

59
type EngineDecorator struct {
610
cluster *Cluster
711
engine *query.Engine
812
commandChan chan string
13+
ctx context.Context
914
}
1015

1116
func (e EngineDecorator) ExecuteQuery(query string) string {
@@ -15,11 +20,12 @@ func (e EngineDecorator) ExecuteQuery(query string) string {
1520
return e.engine.ExecuteQuery(query)
1621
}
1722

18-
func NewEngineDecorator(cluster *Cluster, engine *query.Engine) query.EngineInterface {
23+
func NewEngineDecorator(ctx context.Context, cluster *Cluster, engine *query.Engine) query.EngineInterface {
1924
eng := &EngineDecorator{
2025
cluster: cluster,
2126
engine: engine,
2227
commandChan: make(chan string),
28+
ctx: ctx,
2329
}
2430

2531
go eng.commandLoop()
@@ -28,12 +34,12 @@ func NewEngineDecorator(cluster *Cluster, engine *query.Engine) query.EngineInte
2834
}
2935

3036
func (e EngineDecorator) commandLoop() {
31-
3237
for {
3338
select {
39+
case <-e.ctx.Done():
40+
return
3441
case command := <-e.commandChan:
35-
e.cluster.Broadcasts().QueueBroadcast(NewLocationBroadcast(command)) // At least broadcast the query to the cluster in case we go down before executing it
36-
42+
e.cluster.Broadcasts().QueueBroadcast(NewLocationBroadcast(command))
3743
}
3844
}
3945
}

main.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"log"
@@ -18,6 +19,8 @@ import (
1819
)
1920

2021
func main() {
22+
23+
ctx := context.Background()
2124
start := time.Now()
2225
cfg := config.GetConfig()
2326

@@ -43,7 +46,8 @@ func main() {
4346
}
4447
}(cluster)
4548

46-
clusterEngine := clustering.NewEngineDecorator(cluster, writeEngine)
49+
ClusterCtx, concel := context.WithCancel(ctx)
50+
clusterEngine := clustering.NewEngineDecorator(ClusterCtx, cluster, writeEngine)
4751

4852
opsServer := admin.NewOpsServer(cluster, cfg)
4953
go opsServer.Start()
@@ -58,22 +62,21 @@ func main() {
5862

5963
printWelcomeMessage(cfg, cluster)
6064
sigc := make(chan os.Signal, 1)
61-
signal.Notify(sigc,
62-
syscall.SIGHUP,
63-
syscall.SIGINT,
64-
syscall.SIGTERM,
65-
syscall.SIGQUIT)
66-
go func() {
65+
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
66+
67+
go func(cancel context.CancelFunc) {
6768
s := <-sigc
6869
log.Println("Received signal: ", s)
6970
svr.Stop()
7071
close(sigc)
72+
cancel()
73+
7174
err := cluster.Close(0)
7275
if err != nil {
7376
return
7477
}
7578
os.Exit(0)
76-
}()
79+
}(concel)
7780

7881
end := time.Now()
7982
fmt.Println("Startup time: ", end.Sub(start))

0 commit comments

Comments
 (0)