@@ -10,6 +10,7 @@ import (
1010 "time"
1111
1212 "github.com/lomik/carbon-clickhouse/helper/RowBinary"
13+ "go.uber.org/zap"
1314)
1415
1516type Tagged struct {
@@ -20,6 +21,8 @@ type Tagged struct {
2021var _ Uploader = & Tagged {}
2122var _ UploaderWithReset = & Tagged {}
2223
24+ var errBufOverflow = fmt .Errorf ("output buffer overflow" )
25+
2326func NewTagged (base * Base ) * Tagged {
2427 u := & Tagged {}
2528 u .cached = newCached (base )
@@ -49,6 +52,74 @@ func urlParse(rawurl string) (*url.URL, error) {
4952 return m , err
5053}
5154
55+ func (u * Tagged ) parseName (name string , days uint16 ,
56+ // reusable buffers
57+ tag1 []string , wb * RowBinary.WriteBuffer , tagsBuf * RowBinary.WriteBuffer ) error {
58+
59+ m , err := urlParse (name )
60+ if err != nil {
61+ return err
62+ }
63+
64+ version := uint32 (time .Now ().Unix ())
65+
66+ wb .Reset ()
67+ tagsBuf .Reset ()
68+ tag1 = tag1 [:0 ]
69+
70+ t := fmt .Sprintf ("__name__=%s" , m .Path )
71+ tag1 = append (tag1 , t )
72+ tagsBuf .WriteString (t )
73+
74+ // calc size for prevent buffer overflow
75+ sizeTags := RowBinary .SIZE_INT16 /* days */ +
76+ RowBinary .SIZE_INT64 + len (m .Path ) +
77+ RowBinary .SIZE_INT64 + len (name ) +
78+ RowBinary .SIZE_INT64 + // tagsBuf.Len() not known at this step
79+ RowBinary .SIZE_INT16 //version
80+
81+ // don't upload any other tag but __name__
82+ // if either main metric (m.Path) or each metric (*) is ignored
83+ ignoreAllButName := u .ignoredMetrics [m .Path ] || u .ignoredMetrics ["*" ]
84+ tagsWritten := 1
85+ for k , v := range m .Query () {
86+ t := fmt .Sprintf ("%s=%s" , k , v [0 ])
87+
88+ sizeTags += RowBinary .SIZE_INT16 /* days */ +
89+ RowBinary .SIZE_INT64 + len (t ) +
90+ RowBinary .SIZE_INT64 + len (name ) +
91+ RowBinary .SIZE_INT64 + // tagsBuf.Len() not known at this step
92+ RowBinary .SIZE_INT16 //version
93+
94+ if sizeTags >= wb .FreeSize () {
95+ return errBufOverflow
96+ }
97+
98+ tagsBuf .WriteString (t )
99+ tagsWritten ++
100+
101+ if ! ignoreAllButName {
102+ tag1 = append (tag1 , t )
103+ }
104+ }
105+
106+ sizeTags += len (tag1 ) * tagsBuf .Len ()
107+ if sizeTags >= wb .FreeSize () {
108+ return errBufOverflow
109+ }
110+
111+ for i := 0 ; i < len (tag1 ); i ++ {
112+ wb .WriteUint16 (days )
113+ wb .WriteString (tag1 [i ])
114+ wb .WriteString (name )
115+ wb .WriteUVarint (uint64 (tagsWritten ))
116+ wb .Write (tagsBuf .Bytes ())
117+ wb .WriteUint32 (version )
118+ }
119+
120+ return nil
121+ }
122+
52123func (u * Tagged ) parseFile (filename string , out io.Writer ) (uint64 , map [string ]bool , error ) {
53124 var reader * RowBinary.Reader
54125 var err error
@@ -60,8 +131,6 @@ func (u *Tagged) parseFile(filename string, out io.Writer) (uint64, map[string]b
60131 }
61132 defer reader .Close ()
62133
63- version := uint32 (time .Now ().Unix ())
64-
65134 newTagged := make (map [string ]bool )
66135
67136 wb := RowBinary .GetWriteBuffer ()
@@ -83,59 +152,30 @@ LineLoop:
83152 continue
84153 }
85154
86- key := strconv . Itoa ( int ( reader . Days ())) + ":" + unsafeString (name )
155+ nameStr := unsafeString (name )
87156
157+ days := reader .Days ()
158+ key := strconv .Itoa (int (days )) + ":" + nameStr
88159 if u .existsCache .Exists (key ) {
89160 continue LineLoop
90161 }
91162
92163 if newTagged [key ] {
164+ // already processed
93165 continue LineLoop
94166 }
95- n ++
96-
97- m , err := urlParse (unsafeString (name ))
98- if err != nil {
99- continue
100- }
101-
102- newTagged [key ] = true
103-
104- wb .Reset ()
105- tagsBuf .Reset ()
106- tag1 = tag1 [:0 ]
107167
108- t := fmt .Sprintf ("__name__=%s" , m .Path )
109- tag1 = append (tag1 , t )
110- tagsBuf .WriteString (t )
111-
112- // don't upload any other tag but __name__
113- // if either main metric (m.Path) or each metric (*) is ignored
114- ignoreAllButName := u .ignoredMetrics [m .Path ] || u .ignoredMetrics ["*" ]
115- tagsWritten := 1
116- for k , v := range m .Query () {
117- t := fmt .Sprintf ("%s=%s" , k , v [0 ])
118- tagsBuf .WriteString (t )
119- tagsWritten ++
120-
121- if ! ignoreAllButName {
122- tag1 = append (tag1 , t )
123- }
124- }
125-
126- for i := 0 ; i < len (tag1 ); i ++ {
127- wb .WriteUint16 (reader .Days ())
128- wb .WriteString (tag1 [i ])
129- wb .WriteBytes (name )
130- wb .WriteUVarint (uint64 (tagsWritten ))
131- wb .Write (tagsBuf .Bytes ())
132- wb .WriteUint32 (version )
133- }
168+ n ++
134169
135- _ , err = out .Write (wb .Bytes ())
136- if err != nil {
170+ if err = u .parseName (nameStr , days , tag1 , wb , tagsBuf ); err != nil {
171+ u .logger .Warn ("parse" ,
172+ zap .String ("metric" , nameStr ), zap .String ("type" , "tagged" ), zap .String ("name" , filename ), zap .Error (err ),
173+ )
174+ continue LineLoop
175+ } else if _ , err = out .Write (wb .Bytes ()); err != nil {
137176 return n , nil , err
138177 }
178+ newTagged [key ] = true
139179 }
140180
141181 return n , newTagged , nil
0 commit comments