Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit f08b23f

Browse files
authored
Merge pull request #704 from raintank/whisper_importer_cassandra_conn
Improvements on whisper importer
2 parents 3b731e3 + 992ad72 commit f08b23f

File tree

2 files changed

+105
-35
lines changed

2 files changed

+105
-35
lines changed

cmd/mt-whisper-importer-reader/main.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package main
22

33
import (
4+
"crypto/tls"
5+
"encoding/base64"
46
"errors"
57
"flag"
68
"fmt"
79
"io"
10+
"io/ioutil"
811
"net/http"
912
"os"
1013
"path/filepath"
@@ -63,6 +66,11 @@ var (
6366
1,
6467
"Organization ID the data belongs to ",
6568
)
69+
insecureSSL = flag.Bool(
70+
"insecure-ssl",
71+
false,
72+
"Disables ssl certificate verification",
73+
)
6674
whisperDirectory = flag.String(
6775
"whisper-directory",
6876
"/opt/graphite/storage/whisper",
@@ -73,6 +81,11 @@ var (
7381
"*",
7482
"Comma separated list of positive integers or '*' for all archives",
7583
)
84+
httpAuth = flag.String(
85+
"http-auth",
86+
"",
87+
"The credentials used to authenticate in the format \"user:password\"",
88+
)
7689
chunkSpans []uint32
7790
readArchives map[int]struct{}
7891
printLock sync.Mutex
@@ -139,7 +152,10 @@ func log(msg string) {
139152
}
140153

141154
func processFromChan(files chan string, wg *sync.WaitGroup) {
142-
client := &http.Client{}
155+
tr := &http.Transport{
156+
TLSClientConfig: &tls.Config{InsecureSkipVerify: *insecureSSL},
157+
}
158+
client := &http.Client{Transport: tr}
143159

144160
for file := range files {
145161
fd, err := os.Open(file)
@@ -173,11 +189,20 @@ func processFromChan(files chan string, wg *sync.WaitGroup) {
173189
req.Header.Set("Content-Type", "application/json")
174190
req.Header.Set("Content-Encoding", "gzip")
175191

176-
_, err = client.Do(req)
192+
if len(*httpAuth) > 0 {
193+
req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*httpAuth)))
194+
}
195+
196+
resp, err := client.Do(req)
177197
if err != nil {
178198
throwError(fmt.Sprintf("Error sending request to http endpoint %q: %q", *httpEndpoint, err))
179199
continue
180200
}
201+
if resp.StatusCode != 200 {
202+
throwError(fmt.Sprintf("Error when submitting data: %s", resp.Status))
203+
}
204+
io.Copy(ioutil.Discard, resp.Body)
205+
resp.Body.Close()
181206
}
182207
wg.Done()
183208
}
@@ -363,7 +388,7 @@ func getFileListIntoChan(fileChan chan string) {
363388
filepath.Walk(
364389
*whisperDirectory,
365390
func(path string, info os.FileInfo, err error) error {
366-
if path[len(path)-4:] == ".wsp" {
391+
if len(path) >= 4 && path[len(path)-4:] == ".wsp" {
367392
fileChan <- path
368393
}
369394
return nil

cmd/mt-whisper-importer-writer/main.go

+77-32
Original file line numberDiff line numberDiff line change
@@ -25,87 +25,128 @@ import (
2525
)
2626

2727
var (
28-
exitOnError = flag.Bool(
28+
globalFlags = flag.NewFlagSet("global config flags", flag.ExitOnError)
29+
30+
exitOnError = globalFlags.Bool(
2931
"exit-on-error",
3032
true,
3133
"Exit with a message when there's an error",
3234
)
33-
verbose = flag.Bool(
35+
verbose = globalFlags.Bool(
3436
"verbose",
3537
false,
3638
"Write logs to terminal",
3739
)
38-
fakeAvgAggregates = flag.Bool(
40+
fakeAvgAggregates = globalFlags.Bool(
3941
"fake-avg-aggregates",
4042
true,
4143
"Generate sum/cnt series out of avg series to accommodate metrictank",
4244
)
43-
httpEndpoint = flag.String(
45+
httpEndpoint = globalFlags.String(
4446
"http-endpoint",
4547
"127.0.0.1:8080",
4648
"The http endpoint to listen on",
4749
)
48-
cassandraAddrs = flag.String(
49-
"cassandra-addrs",
50-
"localhost",
51-
"cassandra host (may be given multiple times as comma-separated list)",
52-
)
53-
cassandraKeyspace = flag.String(
54-
"cassandra-keyspace",
55-
"metrictank",
56-
"cassandra keyspace to use for storing the metric data table",
57-
)
58-
ttlsStr = flag.String(
50+
ttlsStr = globalFlags.String(
5951
"ttls",
6052
"35d",
6153
"list of ttl strings used by MT separated by ','",
6254
)
63-
windowFactor = flag.Int(
55+
windowFactor = globalFlags.Int(
6456
"window-factor",
6557
20,
6658
"the window factor be used when creating the metric table schema",
6759
)
68-
partitionScheme = flag.String(
60+
partitionScheme = globalFlags.String(
6961
"partition-scheme",
7062
"bySeries",
7163
"method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)",
7264
)
73-
uriPath = flag.String(
65+
uriPath = globalFlags.String(
7466
"uri-path",
7567
"/chunks",
7668
"the URI on which we expect chunks to get posted",
7769
)
78-
numPartitions = flag.Int(
70+
numPartitions = globalFlags.Int(
7971
"num-partitions",
8072
1,
8173
"Number of Partitions",
8274
)
75+
76+
cassandraAddrs = globalFlags.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)")
77+
cassandraKeyspace = globalFlags.String("cassandra-keyspace", "raintank", "cassandra keyspace to use for storing the metric data table")
78+
cassandraConsistency = globalFlags.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
79+
cassandraHostSelectionPolicy = globalFlags.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
80+
cassandraTimeout = globalFlags.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
81+
cassandraReadConcurrency = globalFlags.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.")
82+
cassandraReadQueueSize = globalFlags.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much")
83+
cassandraRetries = globalFlags.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
84+
cqlProtocolVersion = globalFlags.Int("cql-protocol-version", 4, "cql protocol version to use")
85+
86+
cassandraSSL = globalFlags.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
87+
cassandraCaPath = globalFlags.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
88+
cassandraHostVerification = globalFlags.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL")
89+
90+
cassandraAuth = globalFlags.Bool("cassandra-auth", false, "enable cassandra authentication")
91+
cassandraUsername = globalFlags.String("cassandra-username", "cassandra", "username for authentication")
92+
cassandraPassword = globalFlags.String("cassandra-password", "cassandra", "password for authentication")
93+
8394
GitHash = "(none)"
8495
printLock sync.Mutex
8596
)
8697

8798
type Server struct {
88-
Cluster *gocql.ClusterConfig
8999
Session *gocql.Session
90100
TTLTables mdata.TTLTables
91101
Partitioner partitioner.Partitioner
92102
Index idx.MetricIndex
93103
}
94104

95105
func main() {
96-
cassandra.ConfigSetup()
97-
flag.Parse()
106+
cassFlags := cassandra.ConfigSetup()
107+
108+
flag.Usage = func() {
109+
fmt.Println("mt-whisper-importer-writer")
110+
fmt.Println()
111+
fmt.Println("Opens an endpoint to send data to, which then gets stored in the MT internal DB(s)")
112+
fmt.Println()
113+
fmt.Printf("Usage:\n\n")
114+
fmt.Printf(" mt-whisper-importer-writer [global config flags] <idxtype> [idx config flags] \n\n")
115+
fmt.Printf("global config flags:\n\n")
116+
globalFlags.PrintDefaults()
117+
fmt.Println()
118+
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
119+
fmt.Printf("cass config flags:\n\n")
120+
cassFlags.PrintDefaults()
121+
fmt.Println()
122+
fmt.Println("EXAMPLES:")
123+
fmt.Println("mt-whisper-importer-writer -cassandra-addrs=192.168.0.1 -cassandra-keyspace=mydata -exit-on-error=true -fake-avg-aggregates=true -http-endpoint=0.0.0.0:8080 -num-partitions=8 -partition-scheme=bySeries -ttls=8d,2y -uri-path=/chunks -verbose=true -window-factor=20 cass -hosts=192.168.0.1:9042 -keyspace=mydata")
124+
}
98125

99-
cassCluster := gocql.NewCluster(strings.Split(*cassandraAddrs, ",")...)
100-
cassCluster.Consistency = gocql.ParseConsistency("one")
101-
cassCluster.Timeout = time.Second
102-
cassCluster.NumConns = 2
103-
cassCluster.ProtoVersion = 4
104-
cassCluster.Keyspace = *cassandraKeyspace
126+
if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
127+
flag.Usage()
128+
os.Exit(0)
129+
}
130+
131+
var cassI int
132+
for i, v := range os.Args {
133+
if v == "cass" {
134+
cassI = i
135+
}
136+
}
137+
if cassI == 0 {
138+
fmt.Println("only indextype 'cass' supported")
139+
flag.Usage()
140+
os.Exit(1)
141+
}
142+
143+
globalFlags.Parse(os.Args[1:cassI])
144+
cassFlags.Parse(os.Args[cassI+1 : len(os.Args)])
145+
cassandra.Enabled = true
105146

106-
session, err := cassCluster.CreateSession()
147+
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil)
107148
if err != nil {
108-
panic(fmt.Sprintf("Failed to create cassandra session: %q", err))
149+
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
109150
}
110151

111152
splits := strings.Split(*ttlsStr, ",")
@@ -121,8 +162,7 @@ func main() {
121162
}
122163

123164
server := &Server{
124-
Cluster: cassCluster,
125-
Session: session,
165+
Session: store.Session,
126166
TTLTables: ttlTables,
127167
Partitioner: p,
128168
Index: cassandra.New(),
@@ -131,6 +171,7 @@ func main() {
131171
server.Index.Init()
132172

133173
http.HandleFunc(*uriPath, server.chunksHandler)
174+
http.HandleFunc("/healthz", server.healthzHandler)
134175

135176
log(fmt.Sprintf("Listening on %q", *httpEndpoint))
136177
err = http.ListenAndServe(*httpEndpoint, nil)
@@ -158,6 +199,10 @@ func log(msg string) {
158199
}
159200
}
160201

202+
func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
203+
w.Write([]byte("ok"))
204+
}
205+
161206
func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
162207
metric := &archive.Metric{}
163208
err := metric.UnmarshalCompressed(req.Body)

0 commit comments

Comments
 (0)