Skip to content

Commit 6aca448

Browse files
committed
Port to newest cclib. Use metricstore as library.
1 parent ede0cb1 commit 6aca448

32 files changed

Lines changed: 1204 additions & 4177 deletions

cmd/cc-metric-store/cli.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
2+
// All rights reserved. This file is part of cc-metric-store.
3+
// Use of this source code is governed by a MIT-style
4+
// license that can be found in the LICENSE file.
5+
6+
// Package main provides the entry point for the ClusterCockpit metric store server.
7+
// This file defines all command-line flags and their default values.
8+
package main
9+
10+
import "flag"
11+
12+
var (
13+
flagGops, flagVersion, flagDev, flagLogDateTime bool
14+
flagConfigFile, flagLogLevel string
15+
)
16+
17+
func cliInit() {
18+
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
19+
flag.BoolVar(&flagDev, "dev", false, "Enable development component: Swagger UI")
20+
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
21+
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
22+
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
23+
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug, info, warn (default), err, crit]`")
24+
flag.Parse()
25+
}

cmd/cc-metric-store/main.go

Lines changed: 117 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,25 @@
11
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
2-
// All rights reserved.
2+
// All rights reserved. This file is part of cc-metric-store.
33
// Use of this source code is governed by a MIT-style
44
// license that can be found in the LICENSE file.
5+
56
package main
67

78
import (
89
"context"
9-
"crypto/tls"
10-
"flag"
1110
"fmt"
12-
"log"
13-
"net"
14-
"net/http"
1511
"os"
1612
"os/signal"
17-
"runtime"
18-
"runtime/debug"
1913
"sync"
2014
"syscall"
21-
"time"
2215

23-
"github.com/ClusterCockpit/cc-metric-store/internal/api"
24-
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
16+
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
17+
ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig"
18+
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
19+
"github.com/ClusterCockpit/cc-lib/v2/nats"
20+
"github.com/ClusterCockpit/cc-lib/v2/runtime"
2521
"github.com/ClusterCockpit/cc-metric-store/internal/config"
26-
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
27-
"github.com/ClusterCockpit/cc-metric-store/internal/runtimeEnv"
2822
"github.com/google/gops/agent"
29-
httpSwagger "github.com/swaggo/http-swagger"
3023
)
3124

3225
var (
@@ -35,147 +28,148 @@ var (
3528
version string
3629
)
3730

38-
func main() {
39-
var configFile string
40-
var enableGopsAgent, flagVersion, flagDev bool
41-
flag.StringVar(&configFile, "config", "./config.json", "configuration file")
42-
flag.BoolVar(&enableGopsAgent, "gops", false, "Listen via github.com/google/gops/agent")
43-
flag.BoolVar(&flagDev, "dev", false, "Enable development Swagger UI component")
44-
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
45-
flag.Parse()
31+
func printVersion() {
32+
fmt.Printf("Version:\t%s\n", version)
33+
fmt.Printf("Git hash:\t%s\n", commit)
34+
fmt.Printf("Build time:\t%s\n", date)
35+
}
4636

47-
if flagVersion {
48-
fmt.Printf("Version:\t%s\n", version)
49-
fmt.Printf("Git hash:\t%s\n", commit)
50-
fmt.Printf("Build time:\t%s\n", date)
51-
os.Exit(0)
37+
func initGops() error {
38+
if !flagGops && !config.Keys.Debug.EnableGops {
39+
return nil
5240
}
5341

54-
startupTime := time.Now()
55-
config.Init(configFile)
56-
memorystore.Init(config.Keys.Metrics)
57-
ms := memorystore.GetMemoryStore()
58-
59-
if enableGopsAgent || config.Keys.Debug.EnableGops {
60-
if err := agent.Listen(agent.Options{}); err != nil {
61-
log.Fatal(err)
62-
}
42+
if err := agent.Listen(agent.Options{}); err != nil {
43+
return fmt.Errorf("starting gops agent: %w", err)
6344
}
45+
return nil
46+
}
6447

65-
d, err := time.ParseDuration(config.Keys.Checkpoints.Restore)
66-
if err != nil {
67-
log.Fatal(err)
68-
}
48+
func initConfiguration() error {
49+
ccconf.Init(flagConfigFile)
6950

70-
restoreFrom := startupTime.Add(-d)
71-
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
72-
files, err := ms.FromCheckpointFiles(config.Keys.Checkpoints.RootDir, restoreFrom.Unix())
73-
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
74-
if err != nil {
75-
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
76-
} else {
77-
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
51+
cfg := ccconf.GetPackageConfig("main")
52+
if cfg == nil {
53+
return fmt.Errorf("main configuration must be present")
7854
}
7955

80-
// Try to use less memory by forcing a GC run here and then
81-
// lowering the target percentage. The default of 100 means
82-
// that only once the ratio of new allocations execeds the
83-
// previously active heap, a GC is triggered.
84-
// Forcing a GC here will set the "previously active heap"
85-
// to a minumum.
86-
runtime.GC()
87-
if loadedData > 1000 && os.Getenv("GOGC") == "" {
88-
debug.SetGCPercent(10)
56+
config.Init(cfg)
57+
return nil
58+
}
59+
60+
func initSubsystems() error {
61+
// Initialize nats client
62+
natsConfig := ccconf.GetPackageConfig("nats")
63+
if err := nats.Init(natsConfig); err != nil {
64+
cclog.Warnf("initializing (optional) nats client: %s", err.Error())
8965
}
66+
nats.Connect()
9067

91-
ctx, shutdown := context.WithCancel(context.Background())
68+
return nil
69+
}
9270

71+
func runServer(ctx context.Context) error {
9372
var wg sync.WaitGroup
94-
wg.Add(4)
95-
96-
memorystore.Retention(&wg, ctx)
97-
memorystore.Checkpointing(&wg, ctx)
98-
memorystore.Archiving(&wg, ctx)
99-
avro.DataStaging(&wg, ctx)
100-
101-
r := http.NewServeMux()
102-
api.MountRoutes(r)
103-
104-
if flagDev {
105-
log.Print("Enable Swagger UI!")
106-
r.HandleFunc("GET /swagger/", httpSwagger.Handler(
107-
httpSwagger.URL("http://"+config.Keys.HttpConfig.Address+"/swagger/doc.json")))
108-
}
10973

110-
server := &http.Server{
111-
Handler: r,
112-
Addr: config.Keys.HttpConfig.Address,
113-
WriteTimeout: 30 * time.Second,
114-
ReadTimeout: 30 * time.Second,
74+
// Initialize metric store if configuration is provided
75+
mscfg := ccconf.GetPackageConfig("metric-store")
76+
if mscfg != nil {
77+
metricstore.Init(mscfg, &wg)
78+
} else {
79+
return fmt.Errorf("missing metricstore configuration")
11580
}
11681

117-
// Start http or https server
118-
listener, err := net.Listen("tcp", config.Keys.HttpConfig.Address)
82+
// Initialize HTTP server
83+
srv, err := NewServer(version, commit, date)
11984
if err != nil {
120-
log.Fatalf("starting http listener failed: %v", err)
85+
return fmt.Errorf("creating server: %w", err)
12186
}
12287

123-
if config.Keys.HttpConfig.CertFile != "" && config.Keys.HttpConfig.KeyFile != "" {
124-
cert, err := tls.LoadX509KeyPair(config.Keys.HttpConfig.CertFile, config.Keys.HttpConfig.KeyFile)
125-
if err != nil {
126-
log.Fatalf("loading X509 keypair failed: %v", err)
127-
}
128-
listener = tls.NewListener(listener, &tls.Config{
129-
Certificates: []tls.Certificate{cert},
130-
CipherSuites: []uint16{
131-
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
132-
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
133-
},
134-
MinVersion: tls.VersionTLS12,
135-
PreferServerCipherSuites: true,
136-
})
137-
fmt.Printf("HTTPS server listening at %s...\n", config.Keys.HttpConfig.Address)
138-
} else {
139-
fmt.Printf("HTTP server listening at %s...\n", config.Keys.HttpConfig.Address)
140-
}
88+
// Channel to collect errors from server
89+
errChan := make(chan error, 1)
14190

91+
// Start HTTP server
14292
wg.Add(1)
14393
go func() {
14494
defer wg.Done()
145-
if err = server.Serve(listener); err != nil && err != http.ErrServerClosed {
146-
log.Fatalf("starting server failed: %v", err)
95+
if err := srv.Start(ctx); err != nil {
96+
errChan <- err
14797
}
14898
}()
14999

100+
// Handle shutdown signals
150101
wg.Add(1)
151102
sigs := make(chan os.Signal, 1)
152103
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
153104
go func() {
154105
defer wg.Done()
155-
<-sigs
156-
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
157-
server.Shutdown(context.Background())
158-
shutdown()
159-
memorystore.Shutdown()
106+
select {
107+
case <-sigs:
108+
cclog.Info("Shutdown signal received")
109+
case <-ctx.Done():
110+
}
111+
112+
runtime.SystemdNotify(false, "Shutting down ...")
113+
srv.Shutdown(ctx)
160114
}()
161115

162-
if config.Keys.Nats != nil {
163-
for _, natsConf := range config.Keys.Nats {
164-
// TODO: When multiple nats configs share a URL, do a single connect.
165-
wg.Add(1)
166-
nc := natsConf
167-
go func() {
168-
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
169-
err := api.ReceiveNats(nc, ms, 1, ctx)
170-
if err != nil {
171-
log.Fatal(err)
172-
}
173-
wg.Done()
174-
}()
175-
}
116+
runtime.SystemdNotify(true, "running")
117+
118+
// Wait for completion or errors
119+
go func() {
120+
wg.Wait()
121+
close(errChan)
122+
}()
123+
124+
// Wait for either server startup error or shutdown completion
125+
if err := <-errChan; err != nil {
126+
return err
127+
}
128+
129+
cclog.Print("Graceful shutdown completed!")
130+
return nil
131+
}
132+
133+
func run() error {
134+
cliInit()
135+
136+
if flagVersion {
137+
printVersion()
138+
return nil
139+
}
140+
141+
// Initialize logger
142+
cclog.Init(flagLogLevel, flagLogDateTime)
143+
144+
// Initialize gops agent
145+
if err := initGops(); err != nil {
146+
return err
147+
}
148+
149+
// Initialize subsystems in dependency order:
150+
// 1. Load configuration from config.json
151+
// 2. Initialize subsystems like nats
152+
153+
// Load configuration
154+
if err := initConfiguration(); err != nil {
155+
return err
156+
}
157+
158+
// Initialize subsystems (nats, etc.)
159+
if err := initSubsystems(); err != nil {
160+
return err
176161
}
177162

178-
runtimeEnv.SystemdNotifiy(true, "running")
179-
wg.Wait()
180-
log.Print("Graceful shutdown completed!")
163+
// Run server with context
164+
ctx, cancel := context.WithCancel(context.Background())
165+
defer cancel()
166+
167+
return runServer(ctx)
168+
}
169+
170+
func main() {
171+
if err := run(); err != nil {
172+
cclog.Error(err.Error())
173+
os.Exit(1)
174+
}
181175
}

0 commit comments

Comments
 (0)