Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (app *App) Start() (err error) {
receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Tcp.DropLongerThan),
receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand All @@ -274,6 +275,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Udp.DropLongerThan),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand All @@ -292,6 +294,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Pickle.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Pickle.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Pickle.DropLongerThan),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand All @@ -309,6 +312,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Grpc.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Grpc.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Grpc.DropLongerThan),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand All @@ -326,6 +330,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Prometheus.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Prometheus.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Prometheus.DropLongerThan),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand All @@ -344,6 +349,7 @@ func (app *App) Start() (err error) {
receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan),
receiver.ConcatChar(conf.TelegrafHttpJson.Concat),
receiver.ValidationRegex(conf.Common.ValidationRegex),
)

if err != nil {
Expand Down
18 changes: 13 additions & 5 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"regexp"
"strings"
"time"

Expand All @@ -20,11 +21,12 @@ const (
)

// commonConfig groups the settings whose TOML keys are given in the field
// tags below.
// NOTE(review): this is a rendered diff, so the first five fields are listed
// twice (before and after re-alignment); only one copy of each, plus
// ValidationRegex, belongs in the resulting file.
type commonConfig struct {
MetricPrefix string `toml:"metric-prefix"`
MetricInterval *config.Duration `toml:"metric-interval"`
MetricEndpoint string `toml:"metric-endpoint"`
MaxCPU int `toml:"max-cpu"`
Enabled bool `toml:"enabled"`
MetricPrefix string `toml:"metric-prefix"`
MetricInterval *config.Duration `toml:"metric-interval"`
MetricEndpoint string `toml:"metric-endpoint"`
MaxCPU int `toml:"max-cpu"`
Enabled bool `toml:"enabled"`
// ValidationRegex is a regular expression source string; empty means no
// validation. ReadConfig checks cfg.Common.ValidationRegex with
// regexp.Compile and the same value is passed to each receiver through
// receiver.ValidationRegex (presumably cfg.Common is of this type — confirm).
ValidationRegex string `toml:"validation-regex"`
}

type clickhouseConfig struct {
Expand Down Expand Up @@ -279,6 +281,12 @@ func ReadConfig(filename string, exactConfig bool) (*Config, error) {
}
}

if cfg.Common.ValidationRegex != "" {
if _, err := regexp.Compile(cfg.Common.ValidationRegex); err != nil {
return nil, fmt.Errorf("invalid regex in validation-regex option: %s", err.Error())
}
}

if cfg.Logging == nil {
cfg.Logging = make([]zapwriter.Config, 0)
}
Expand Down
31 changes: 22 additions & 9 deletions receiver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"fmt"
"net/http"
"regexp"
"sort"
"sync"
"sync/atomic"
Expand All @@ -18,15 +19,16 @@ const droppedListSize = 1000
type Base struct {
stop.Struct
stat struct {
samplesReceived uint64 // atomic
messagesReceived uint64 // atomic
metricsReceived uint64 // atomic
errors uint64 // atomic
active int64 // atomic
incompleteReceived uint64 // atomic
futureDropped uint64 // atomic
pastDropped uint64 // atomic
tooLongDropped uint64 // atomic
samplesReceived uint64 // atomic
messagesReceived uint64 // atomic
metricsReceived uint64 // atomic
errors uint64 // atomic
active int64 // atomic
incompleteReceived uint64 // atomic
futureDropped uint64 // atomic
pastDropped uint64 // atomic
tooLongDropped uint64 // atomic
validationRegexDropped uint64 // atomic
}
droppedList [droppedListSize]string
droppedListNext int
Expand All @@ -36,6 +38,7 @@ type Base struct {
dropPastSeconds uint32
dropTooLongLimit uint16
readTimeoutSeconds uint32
validationRegex *regexp.Regexp
writeChan chan *RowBinary.WriteBuffer
logger *zap.Logger
Tags tags.TagConfig
Expand Down Expand Up @@ -85,6 +88,14 @@ func (base *Base) isDropMetricNameTooLong(name string) bool {
return false
}

// isMatchedByValidationRegex reports whether name matches the receiver's
// validation regex. Every match also increments the validationRegexDropped
// counter. When no regex is set the function always returns false and the
// counter is left untouched.
func (base *Base) isMatchedByValidationRegex(name []byte) bool {
	re := base.validationRegex
	if re == nil {
		// Validation is disabled.
		return false
	}
	if !re.Match(name) {
		return false
	}
	atomic.AddUint64(&base.stat.validationRegexDropped, 1)
	return true
}

func (base *Base) DroppedHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")

Expand Down Expand Up @@ -143,6 +154,8 @@ func (base *Base) SendStat(send func(metric string, value float64), fields ...st
sendUint64Counter(send, f, &base.stat.pastDropped)
case "tooLongDropped":
sendUint64Counter(send, f, &base.stat.tooLongDropped)
case "validationRegexDropped":
sendUint64Counter(send, f, &base.stat.validationRegexDropped)
case "errors":
sendUint64Counter(send, f, &base.stat.errors)
case "active":
Expand Down
2 changes: 1 addition & 1 deletion receiver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (g *GRPC) Addr() net.Addr {
}

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the SendStat call appears twice
// (before and after the change); only the call that lists
// "validationRegexDropped" belongs in the resulting file.
func (g *GRPC) Stat(send func(metric string, value float64)) {
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "validationRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
2 changes: 1 addition & 1 deletion receiver/pickle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (rcv *Pickle) Addr() net.Addr {

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the last argument line appears
// twice (before and after the change); only the line ending with
// "validationRegexDropped" belongs in the resulting file.
func (rcv *Pickle) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "messagesReceived", "errors", "active", "futureDropped", "pastDropped",
"tooLongDropped")
"tooLongDropped", "validationRegexDropped")
}

func (rcv *Pickle) HandleConnection(conn net.Conn) {
Expand Down
4 changes: 4 additions & 0 deletions receiver/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (base *Base) PlainParseLine(p []byte, now uint32, buf *tags.GraphiteBuf) ([
i3--
}

if base.isMatchedByValidationRegex(p[:i1]) {
return nil, 0, 0, errors.New("metric name matched by validation regex: '" + unsafeString(p) + "'")
}

value, err := strconv.ParseFloat(unsafeString(p[i1+1:i2]), 64)
if err != nil || math.IsNaN(value) {
return nil, 0, 0, errors.New("bad message: '" + unsafeString(p) + "'")
Expand Down
127 changes: 127 additions & 0 deletions receiver/plain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"context"
"fmt"
"regexp"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -177,6 +178,8 @@ func TestPlainParseLine(t *testing.T) {
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},
{"cpu.loadavg~ 21.4 1422642189\n", "cpu.loadavg~", 21.4, 1422642189},
{"cpu.loadavg~;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg~?env=test&host=host1", 21.4, 1422642189},
}

base := &Base{}
Expand All @@ -202,4 +205,128 @@ func TestPlainParseLine(t *testing.T) {
}
}
}

tableWithValidation := [](struct {
b string
name string
value float64
timestamp uint32
}){
{b: "42"},
{b: ""},
{b: "\n"},
{b: "metric..name 42 \n"},
{b: "metric..name 42"},
{b: "metric.name 42 a1422642189\n"},
{b: "metric.name 42a 1422642189\n"},
{b: "metric.name NaN 1422642189\n"},
{b: "metric.name 42 NaN\n"},
{"metric.name -42.76 1422642189\n", "metric.name", -42.76, 1422642189},
{"metric.name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric..name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric...name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric.name 42.15 1422642189\r\n", "metric.name", 42.15, 1422642189},
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},

// Additional test cases for validation
// Test invalid characters in metric names
{b: "metric@name 42.15 1422642189\n"},
{b: "metric#name 42.15 1422642189\n"},
{b: "metric$name 42.15 1422642189\n"},
{b: "metric%name 42.15 1422642189\n"},
{b: "metric&name 42.15 1422642189\n"},
{b: "metric*name 42.15 1422642189\n"},
{b: "metric!name 42.15 1422642189\n"},
{b: "metric name 42.15 1422642189\n"}, // space in metric name
{b: "metric\tname 42.15 1422642189\n"}, // tab in metric name
{b: "metric[name] 42.15 1422642189\n"},
{b: "metric{name} 42.15 1422642189\n"},
{b: "metric(name) 42.15 1422642189\n"},
{b: "metric/name 42.15 1422642189\n"},
{b: "metric\\name 42.15 1422642189\n"},
{b: "metric|name 42.15 1422642189\n"},
{b: "metric?name 42.15 1422642189\n"},
{b: "metric<name> 42.15 1422642189\n"},
{b: "metric'name' 42.15 1422642189\n"},
{b: "metric\"name\" 42.15 1422642189\n"},

// Test valid characters that should pass
{"metric-name 42.15 1422642189\n", "metric-name", 42.15, 1422642189},
{"metric_name 42.15 1422642189\n", "metric_name", 42.15, 1422642189},
{"metric:name 42.15 1422642189\n", "metric:name", 42.15, 1422642189},
{"metric.sub.name 42.15 1422642189\n", "metric.sub.name", 42.15, 1422642189},
{"metric-123_test:data 42.15 1422642189\n", "metric-123_test:data", 42.15, 1422642189},

// Test invalid characters in tags
{b: "metric.name;tag@=value 42.15 1422642189\n"},
{b: "metric.name;tag=val@ue 42.15 1422642189\n"},
{b: "metric.name;t ag=value 42.15 1422642189\n"},
{b: "metric.name;tag=val ue 42.15 1422642189\n"},
{b: "metric.name;tag#key=value 42.15 1422642189\n"},
{b: "metric.name;tag=value! 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val*ue 42.15 1422642189\n"},
{b: "metric.name;tag=value;k ey=value 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val\tue 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val\nue 42.15 1422642189\n"},

// Test valid tags that should pass
{"metric.name;env=prod 42.15 1422642189\n", "metric.name?env=prod", 42.15, 1422642189},
{"metric.name;env=prod;region=us-east-1 42.15 1422642189\n", "metric.name?env=prod&region=us-east-1", 42.15, 1422642189},
{"metric.name;tag-name=tag-value 42.15 1422642189\n", "metric.name?tag-name=tag-value", 42.15, 1422642189},
{"metric.name;tag_name=tag_value 42.15 1422642189\n", "metric.name?tag_name=tag_value", 42.15, 1422642189},
{"metric.name;tag:name=tag:value 42.15 1422642189\n", "metric.name?tag%3Aname=tag%3Avalue", 42.15, 1422642189},
{"metric.name;tag.name=tag.value 42.15 1422642189\n", "metric.name?tag.name=tag.value", 42.15, 1422642189},

// Test edge cases with multiple invalid characters
{b: "metric@#$%name 42.15 1422642189\n"},
{b: "metric.name;tag@#=value$% 42.15 1422642189\n"},
{b: "met!ric.na@me;ta#g=val$ue 42.15 1422642189\n"},

// Test unicode characters (should fail validation)
{b: "metric.名前 42.15 1422642189\n"},
{b: "metric.name;tag=値 42.15 1422642189\n"},
{b: "metric.name;标签=value 42.15 1422642189\n"},
{b: "метрика.name 42.15 1422642189\n"},

// Test empty tag keys/values
{b: "metric.name;=value 42.15 1422642189\n"},
{b: "metric.name;= 42.15 1422642189\n"},

// Test metrics with numbers
{"metric123 42.15 1422642189\n", "metric123", 42.15, 1422642189},
{"123metric 42.15 1422642189\n", "123metric", 42.15, 1422642189},
{"123 42.15 1422642189\n", "123", 42.15, 1422642189},

// Test metrics with only valid special characters
{"metric-_.:name 42.15 1422642189\n", "metric-_.:name", 42.15, 1422642189},
{"metric.name;tag-_.:key=tag-_.:value 42.15 1422642189\n", "metric.name?tag-_.%3Akey=tag-_.%3Avalue", 42.15, 1422642189},

// Additional tests for colon encoding
{"host:port:metric 42.15 1422642189\n", "host:port:metric", 42.15, 1422642189},
{"metric.name;service:port=web:8080 42.15 1422642189\n", "metric.name?service%3Aport=web%3A8080", 42.15, 1422642189},
{"app:service:metric;env=prod:primary 42.15 1422642189\n", "app:service:metric?env=prod%3Aprimary", 42.15, 1422642189},
}

baseWithValidation := &Base{validationRegex: regexp.MustCompile(`[^a-zA-Z0-9.;\-_:=]{1}`)}
for _, p := range tableWithValidation {
name, value, timestamp, err := baseWithValidation.PlainParseLine([]byte(p.b), now, &tagBuf)
if p.name == "" {
// expected error
if err == nil {
t.Fatal("error expected")
}
} else {
if string(name) != p.name {
t.Fatalf("%#v != %#v", string(name), p.name)
}
if value != p.value {
t.Fatalf("%#v != %#v", value, p.value)
}
if timestamp != p.timestamp {
t.Fatalf("%d != %d", timestamp, p.timestamp)
}
}
}
}
2 changes: 1 addition & 1 deletion receiver/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (rcv *PrometheusRemoteWrite) Addr() net.Addr {
}

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the SendStat call appears twice
// (before and after the change); only the call that lists
// "validationRegexDropped" belongs in the resulting file.
func (rcv *PrometheusRemoteWrite) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "validationRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
13 changes: 13 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"net/http"
"net/url"
"regexp"
"strings"

"github.com/lomik/carbon-clickhouse/helper/RowBinary"
Expand Down Expand Up @@ -90,6 +91,18 @@ func ConcatChar(concat string) Option {
}
}

// ValidationRegex creates option for New constructor.
//
// regex is the source of a regular expression that is compiled and stored on
// the receiver's Base as validationRegex. An empty string leaves validation
// disabled. The option is a no-op for values that are not *Base.
//
// An invalid pattern is reported through the option's error return instead of
// panicking, so whoever applies the option can fail gracefully.
func ValidationRegex(regex string) Option {
	return func(r interface{}) error {
		t, ok := r.(*Base)
		if !ok || regex == "" {
			return nil
		}
		re, err := regexp.Compile(regex)
		if err != nil {
			// regexp's error already names the offending pattern.
			return err
		}
		t.validationRegex = re
		return nil
	}
}

// New creates udp, tcp, pickle receiver
func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
u, err := url.Parse(dsn)
Expand Down
2 changes: 1 addition & 1 deletion receiver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rcv *TCP) Addr() net.Addr {
}

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the SendStat call appears twice
// (before and after the change); only the call that lists
// "validationRegexDropped" belongs in the resulting file.
func (rcv *TCP) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped", "validationRegexDropped")
}

func (rcv *TCP) HandleConnection(conn net.Conn) {
Expand Down
2 changes: 1 addition & 1 deletion receiver/telegraf_http_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (rcv *TelegrafHttpJson) Addr() net.Addr {
}

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the SendStat call appears twice
// (before and after the change); only the call that lists
// "validationRegexDropped" belongs in the resulting file.
func (rcv *TelegrafHttpJson) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "validationRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
2 changes: 1 addition & 1 deletion receiver/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rcv *UDP) Addr() net.Addr {

// Stat reports this receiver's counters: it passes send and the counter names
// to SendStat.
// NOTE(review): this is a rendered diff, so the last argument line appears
// twice (before and after the change); only the line ending with
// "validationRegexDropped" belongs in the resulting file.
func (rcv *UDP) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "errors", "incompleteReceived", "futureDropped", "pastDropped",
"tooLongDropped")
"tooLongDropped", "validationRegexDropped")
}

func (rcv *UDP) receiveWorker(ctx context.Context) {
Expand Down
Loading