Skip to content

Commit 4e4c9b9

Browse files
authored
Merge pull request #152 from v3io/development
Development --> Master (0.0.12)
2 parents 2474f95 + b784a53 commit 4e4c9b9

File tree

1,504 files changed

+166795
-865
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,504 files changed

+166795
-865
lines changed

Diff for: Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ integration: get
2727

2828
.PHONY: bench
2929
bench: get
30-
go test -run=XXX -bench='^BenchmarkIngest$$' -benchtime 10s -timeout 1m ./test/benchmark/...
30+
go test -run=XXX -bench='^BenchmarkIngest$$' -benchtime 10s -timeout 5m ./test/benchmark/...
3131

3232
.PHONY: build
3333
build: get

Diff for: README.md

+23-19
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,17 @@ A user can run the CLI to add (append) or query the DB, to use the CLI, build th
8181
it has built-in help, see the following add/query examples:
8282

8383
```
84-
# create a DB with some aggregates (at 30 min interval)
85-
tsdbctl create -p <path> -r count,sum,max -i 30
84+
# create a DB with expected ingestion rate of one sample per second and some aggregates (at 30 min interval)
85+
tsdbctl create -t <table> --ingestion-rate 1/s -a count,sum,max -i 30m
8686
8787
# display DB info with metric names (types)
88-
tsdbctl info -n
88+
tsdbctl info -t <table> -n
8989
9090
# append a sample (73.2) to the specified metric type (cpu) + labels at the current time
91-
tsdbctl add cpu os=win,node=xyz123 -d 73.2
91+
tsdbctl add -t <table> cpu os=win,node=xyz123 -d 73.2
9292
9393
# display all the CPU metrics for win servers from the last hours, in CSV format
94-
tsdbctl query cpu -f "os=='win'" -l 1h -o csv
94+
tsdbctl query -t <table> cpu -f "os=='win'" -l 1h -o csv
9595
9696
```
9797

@@ -106,17 +106,21 @@ such as partitioning strategy, retention, aggregators, etc. this can be done via
106106

107107
```go
108108
// Load v3io connection/path details (see YAML below)
109-
v3iocfg, _ := cfg, err = config.LoadConfig("v3io.yaml")
109+
v3iocfg, err := config.GetOrLoadFromFile("v3io.yaml")
110+
if err != nil {
111+
// TODO: handle error
112+
}
110113

111114
// Specify the default DB configuration (can be modified per partition)
112-
dbcfg := config.DBPartConfig{
113-
DaysPerObj: 1,
114-
HrInChunk: 1,
115-
DefaultRollups: "count,avg,sum,stddev",
116-
RollupMin: 30,
115+
sampleRate = "1/s"
116+
aggregatorGranularity = "1h"
117+
aggregatesList = "scount,avg,min,max"
118+
schema, err := schema.NewSchema(v3iocfg, sampleRate, aggregatorGranularity, aggregatesList)
119+
if err != nil {
120+
// TODO: handle error
117121
}
118-
119-
return tsdb.CreateTSDB(v3iocfg, &dbcfg)
122+
123+
return tsdb.CreateTSDB(v3iocfg, schema)
120124
```
121125

122126
> If you plan on using pre-aggregation to speed aggregate queries you should specify the `Rollups` (function list) and
@@ -127,14 +131,14 @@ In order to use the TSDB we need to create an adapter, the `NewV3ioAdapter` func
127131
parameters: the configuration structure, v3io data container object and logger object. The last 2 are optional, in case
128132
you already have container and logger (when using nuclio data bindings).
129133

130-
Configuration is specified in a YAML or JSON format, and can be read from a file using `config.LoadConfig(path string)`
131-
or can be loaded from a local buffer using `config.LoadFromData(data []byte)`. You can see details on the configuration
134+
Configuration is specified in a YAML or JSON format, and can be read from a file using `config.GetOrLoadFromFile(path string)`
135+
or can be loaded from a local buffer using `config.GetOrLoadFromData(data []byte)`. You can see details on the configuration
132136
options in [config](internal/pkg/config/config.go), a minimal configuration looks like:
133137

134138
```yaml
135139
v3ioUrl: "v3io address:port"
136140
container: "tsdb"
137-
path: "metrics"
141+
path: performance
138142
username: "<username>"
139143
password: "<password>"
140144
```
@@ -143,15 +147,15 @@ example of creating an adapter:
143147
144148
```go
145149
// create configuration object from file
146-
cfg, err := config.LoadConfig("v3io.yaml")
150+
cfg, err := config.GetOrLoadFromFile("v3io.yaml")
147151
if err != nil {
148-
panic(err)
152+
// TODO: handle error
149153
}
150154

151155
// create and start a new TSDB adapter
152156
adapter, err := tsdb.NewV3ioAdapter(cfg, nil, nil)
153157
if err != nil {
154-
panic(err)
158+
// TODO: handle error
155159
}
156160
```
157161

Diff for: cmd/tsdbctl/tsdbctl.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@ import (
77

88
func main() {
99
if err := Run(); err != nil {
10-
1110
os.Exit(1)
1211
}
13-
1412
os.Exit(0)
1513
}
1614

1715
func Run() error {
18-
return tsdbctl.NewRootCommandeer().Execute()
16+
rootCmd := tsdbctl.NewRootCommandeer()
17+
defer tearDown(rootCmd)
18+
return rootCmd.Execute()
19+
}
20+
21+
func tearDown(cmd *tsdbctl.RootCommandeer) {
22+
if cmd.Reporter != nil { // could be nil if has failed on initialisation
23+
cmd.Reporter.Stop()
24+
}
1925
}

Diff for: examples/nuclio/ingest/ingest_example.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func InitContext(context *nuclio.Context) error {
6363

6464
if adapter == nil {
6565
// create adapter once for all contexts
66-
cfg, _ := config.LoadFromData([]byte(tsdbConfig))
66+
cfg, _ := config.GetOrLoadFromData([]byte(tsdbConfig))
6767
data := context.DataBinding["db0"].(*v3io.Container)
6868
adapter, err = tsdb.NewV3ioAdapter(cfg, data, context.Logger)
6969
if err != nil {

Diff for: examples/nuclio/ingest/ingest_example_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ import (
1212

1313
func TestIngestIntegration(t *testing.T) {
1414
v3ioConfig, err := tsdbtest.LoadV3ioConfig()
15+
if err != nil {
16+
t.Fatalf("unable to load configuration. Error: %v", err)
17+
}
1518
defer tsdbtest.SetUp(t, v3ioConfig)()
16-
tsdbConfig = fmt.Sprintf(`path: "%v"`, v3ioConfig.Path)
19+
tsdbConfig = fmt.Sprintf(`path: "%v"`, v3ioConfig.TablePath)
1720

1821
url := os.Getenv("V3IO_SERVICE_URL")
1922
if url == "" {
20-
url = v3ioConfig.V3ioUrl
23+
url = v3ioConfig.WebApiEndpoint
2124
}
2225

2326
data := nutest.DataBind{

Diff for: examples/nuclio/query/query_example.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func Handler(context *nuclio.Context, event nuclio.Event) (interface{}, error) {
8080

8181
// InitContext runs only once when the function runtime starts
8282
func InitContext(context *nuclio.Context) error {
83-
cfg, _ := config.LoadFromData([]byte(tsdbConfig))
83+
cfg, _ := config.GetOrLoadFromData([]byte(tsdbConfig))
8484
data := context.DataBinding["db0"].(*v3io.Container)
8585
adapter, err := tsdb.NewV3ioAdapter(cfg, data, context.Logger)
8686
if err != nil {

Diff for: examples/nuclio/query/query_example_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ import (
1212

1313
func TestQueryIntegration(t *testing.T) {
1414
v3ioConfig, err := tsdbtest.LoadV3ioConfig()
15+
if err != nil {
16+
t.Fatalf("unable to load configuration. Error: %v", err)
17+
}
1518
defer tsdbtest.SetUp(t, v3ioConfig)()
16-
tsdbConfig = fmt.Sprintf(`path: "%v"`, v3ioConfig.Path)
19+
tsdbConfig = fmt.Sprintf(`path: "%v"`, v3ioConfig.TablePath)
1720

1821
url := os.Getenv("V3IO_SERVICE_URL")
1922
if url == "" {
20-
url = v3ioConfig.V3ioUrl
23+
url = v3ioConfig.WebApiEndpoint
2124
}
2225

2326
data := nutest.DataBind{

Diff for: examples/v3io.yaml.template

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# File: v3io.yaml.template
2+
# Description: Template of a V3IO TSDB Configuration File
3+
4+
# TODO: In your configuration file, delete the configuration keys that you
5+
# don't need and replace the "<...>" placeholders.
6+
7+
# Endpoint of an Iguazio Continuous Data Platform web-gateway (web-API) service,
8+
# consisting of an IP address or resolvable host domain name, and a port number
9+
# (currently, always port 8081)
10+
# Example: "192.168.1.100:8081"
11+
webApiEndpoint: "<IP address/host name>:8081"
12+
13+
# Name of an Iguazio Continuous Data Platform container for storing the TSDB table
14+
# Example: "bigdata"
15+
container: "<container name>"
16+
17+
# Logging verbosity level
18+
# Valid values: "debug" | "info" | "warn" | "error"
19+
# Default value: "debug"
20+
verbose: "warn"
21+
22+
# Authentication credentials for the web-API service
23+
# Username
24+
username: "<username>"
25+
# Password
26+
password: "<password>"

Diff for: internal/pkg/performance/metrics.go

+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package performance
2+
3+
import (
4+
"fmt"
5+
"github.com/pkg/errors"
6+
"github.com/rcrowley/go-metrics"
7+
"github.com/v3io/v3io-tsdb/pkg/config"
8+
"io"
9+
"log"
10+
"os"
11+
"os/signal"
12+
"sync"
13+
"syscall"
14+
"time"
15+
)
16+
17+
var instance *MetricReporter
18+
var once sync.Once
19+
20+
const (
21+
STDOUT = "stdout"
22+
STDERR = "stderr"
23+
)
24+
25+
type MetricReporter struct {
26+
lock sync.Mutex
27+
running bool
28+
registry metrics.Registry
29+
logWriter io.Writer
30+
reportPeriodically bool
31+
reportIntervalSeconds int
32+
reportOnShutdown bool
33+
}
34+
35+
func DefaultReporterInstance() (reporter *MetricReporter, err error) {
36+
cfg, err := config.GetOrDefaultConfig()
37+
38+
if err != nil {
39+
// DO NOT return the error to prevent failures of unit tests
40+
fmt.Fprintf(os.Stderr, "unable to load configuration. Reason: %v\n"+
41+
"Will use default reporter configuration instead.", err)
42+
reporter = ReporterInstance(STDOUT, true, 60, true)
43+
} else {
44+
reporter = ReporterInstanceFromConfig(cfg)
45+
}
46+
47+
return reporter, nil
48+
}
49+
50+
func ReporterInstance(writeTo string, reportPeriodically bool, reportIntervalSeconds int, reportOnShutdown bool) *MetricReporter {
51+
once.Do(func() {
52+
var writer io.Writer
53+
switch writeTo {
54+
case STDOUT:
55+
writer = os.Stdout
56+
case STDERR:
57+
writer = os.Stderr
58+
default:
59+
writer = os.Stdout
60+
}
61+
62+
instance = newMetricReporter(writer, reportPeriodically, reportIntervalSeconds, reportOnShutdown)
63+
})
64+
return instance
65+
}
66+
67+
func ReporterInstanceFromConfig(config *config.V3ioConfig) *MetricReporter {
68+
return ReporterInstance(
69+
config.MetricsReporter.Output,
70+
config.MetricsReporter.ReportPeriodically,
71+
config.MetricsReporter.RepotInterval,
72+
config.MetricsReporter.ReportOnShutdown)
73+
}
74+
75+
func (mr *MetricReporter) Start() error {
76+
mr.lock.Lock()
77+
defer mr.lock.Unlock()
78+
79+
if !mr.running {
80+
mr.running = true
81+
} else {
82+
return errors.Errorf("metric reporter is already running.")
83+
}
84+
85+
return nil
86+
}
87+
88+
func (mr *MetricReporter) Stop() error {
89+
mr.lock.Lock()
90+
defer mr.lock.Unlock()
91+
92+
if mr.running {
93+
mr.running = false
94+
if mr.reportOnShutdown {
95+
time.Sleep(300 * time.Millisecond) // postpone performance report on shutdown to avoid mixing with other log messages
96+
metrics.WriteOnce(mr.registry, mr.logWriter)
97+
}
98+
mr.registry.UnregisterAll()
99+
} else {
100+
return errors.Errorf("can't stop metric reporter since it's not running.")
101+
}
102+
103+
return nil
104+
}
105+
106+
func (mr *MetricReporter) GetTimer(name string) (metrics.Timer, error) {
107+
if mr.running {
108+
return metrics.GetOrRegisterTimer(name, mr.registry), nil
109+
} else {
110+
return nil, errors.Errorf("failed to create timer '%s'. Reason: metric reporter in not running", name)
111+
}
112+
}
113+
114+
func (mr *MetricReporter) GetCounter(name string) (metrics.Counter, error) {
115+
if mr.running {
116+
return metrics.GetOrRegisterCounter(name, mr.registry), nil
117+
} else {
118+
return nil, errors.Errorf("failed to create counter '%s'. Reason: metric reporter in not running", name)
119+
}
120+
}
121+
122+
func (mr *MetricReporter) GetMeter(name string) (metrics.Meter, error) {
123+
if mr.running {
124+
return metrics.GetOrRegisterMeter(name, mr.registry), nil
125+
} else {
126+
return nil, errors.Errorf("failed to create meter '%s'. Reason: metric reporter in not running", name)
127+
}
128+
}
129+
130+
func (mr *MetricReporter) GetHistogram(name string, reservoirSize int) (metrics.Histogram, error) {
131+
if mr.running {
132+
sample := metrics.NewUniformSample(reservoirSize)
133+
return metrics.GetOrRegisterHistogram(name, mr.registry, sample), nil
134+
} else {
135+
return nil, errors.Errorf("failed to create histogram '%s'. Reason: metric reporter in not running", name)
136+
}
137+
}
138+
139+
// Listen to the SIGINT and SIGTERM
140+
// SIGINT will listen to CTRL-C.
141+
// SIGTERM will be caught if kill command executed.
142+
func (mr *MetricReporter) registerShutdownHook() {
143+
var gracefulStop = make(chan os.Signal)
144+
// Register for specific signals
145+
signal.Notify(gracefulStop, syscall.SIGINT, syscall.SIGTERM)
146+
147+
go func() {
148+
sig := <-gracefulStop
149+
mr.logWriter.Write([]byte(fmt.Sprintf("\n**************************\ncaught sig: %+v\n**************************\n", sig)))
150+
metrics.WriteOnce(mr.registry, mr.logWriter)
151+
}()
152+
}
153+
154+
func newMetricReporter(outputWriter io.Writer, reportPeriodically bool, reportIntervalSeconds int, reportOnShutdown bool) *MetricReporter {
155+
var writer io.Writer
156+
157+
if outputWriter != nil {
158+
writer = outputWriter
159+
} else {
160+
writer = os.Stderr
161+
}
162+
163+
reporter := MetricReporter{
164+
registry: metrics.NewPrefixedRegistry("v3io-tsdb -> "),
165+
logWriter: writer,
166+
running: true,
167+
reportPeriodically: reportPeriodically,
168+
reportIntervalSeconds: reportIntervalSeconds,
169+
reportOnShutdown: reportOnShutdown,
170+
}
171+
172+
if reportPeriodically && reportIntervalSeconds > 0 {
173+
// Log periodically
174+
go metrics.Log(reporter.registry,
175+
time.Duration(reportIntervalSeconds)*time.Second,
176+
log.New(reporter.logWriter, "metrics: ", log.Lmicroseconds))
177+
}
178+
179+
if reportOnShutdown {
180+
reporter.registerShutdownHook()
181+
}
182+
183+
return &reporter
184+
}

0 commit comments

Comments
 (0)