-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.go
More file actions
102 lines (83 loc) · 3.01 KB
/
main.go
File metadata and controls
102 lines (83 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/fabricekabongo/loggerhead/admin"
"github.com/fabricekabongo/loggerhead/clustering"
"github.com/fabricekabongo/loggerhead/config"
"github.com/fabricekabongo/loggerhead/query"
"github.com/fabricekabongo/loggerhead/server"
"github.com/fabricekabongo/loggerhead/world"
)
func main() {
ctx := context.Background()
start := time.Now()
cfg := config.GetConfig()
worldMap := world.NewWorld()
readEngine := query.NewReadQueryEngine(worldMap)
writeEngine := query.NewWriteQueryEngine(worldMap)
// subscriberEngine := query.NewSubscriberQueryEngine(worldMap)
cluster, err := clustering.NewCluster(writeEngine, cfg)
if err != nil {
log.Println(err)
if errors.Is(err, clustering.ErrFailedToCreateCluster) {
log.Fatal("Failed to create cluster: ", err)
}
}
defer func(cluster *clustering.Cluster) {
err := cluster.Close(0)
if err != nil {
log.Println("Failed to leave cluster: ", err)
}
}(cluster)
ClusterCtx, concel := context.WithCancel(ctx)
clusterEngine := clustering.NewEngineDecorator(ClusterCtx, cluster, writeEngine)
opsServer := admin.NewOpsServer(cluster, cfg)
go opsServer.Start()
writer := server.NewListener(cfg.WritePort, cfg.MaxConnections, cfg.MaxEOFWait, clusterEngine) // This is the writer listener (for writes and broadcasts)
reader := server.NewListener(cfg.ReadPort, cfg.MaxConnections, cfg.MaxEOFWait, readEngine) // This is the reader listener (for reads).
// subscriber := server.NewListener(cfg, subscriberEngine)
svr := server.NewServer([]*server.Listener{writer, reader})
defer svr.Stop()
printWelcomeMessage(cfg, cluster)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func(cancel context.CancelFunc) {
s := <-sigc
log.Println("Received signal: ", s)
svr.Stop()
close(sigc)
cancel()
err := cluster.Close(0)
if err != nil {
return
}
os.Exit(0)
}(concel)
end := time.Now()
fmt.Println("Startup time: ", end.Sub(start))
svr.Start()
}
func printWelcomeMessage(cfg config.Config, cluster *clustering.Cluster) {
fmt.Println("===========================================================")
fmt.Println("Starting the Database Server")
fmt.Println("===========================================================")
fmt.Println("Read Port: ", cfg.ReadPort)
fmt.Println("Write Port: ", cfg.WritePort)
fmt.Println("Cluster Port: ", cfg.ClusterPort)
fmt.Println("Admin & Prometheus Port:", cfg.HttpPort)
fmt.Println("Max Connections: ", cfg.MaxConnections)
fmt.Println("Max EOF Wait: ", cfg.MaxEOFWait)
fmt.Println("Cluster DNS: ", cfg.ClusterDNS)
fmt.Println("Seed Node: ", cfg.SeedNode)
fmt.Println("My IP: ", cluster.MemberList().LocalNode().Addr.String())
fmt.Println("Node Name: ", cluster.MemberList().LocalNode().Name)
fmt.Println("Node State: ", clustering.StateToString(cluster.MemberList().LocalNode().State))
fmt.Println("===========================================================")
}