Skip to content

Commit 9d2c211

Browse files
Merge branch 'master' into vmalert-proxy
2 parents 9c4af50 + bdf9033 commit 9d2c211

File tree

149 files changed

+9906
-4807
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

149 files changed

+9906
-4807
lines changed

app/vlinsert/elasticsearch/elasticsearch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@ func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp
187187
// Continue parsing next lines.
188188
return true, nil
189189
}
190+
190191
p := logstorage.GetJSONParser()
192+
defer logstorage.PutJSONParser(p)
193+
191194
if err := p.ParseLogMessage(line); err != nil {
192195
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
193196
}
@@ -201,7 +204,6 @@ func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp
201204
}
202205
logstorage.RenameField(p.Fields, msgFields, "_msg")
203206
lmp.AddRow(ts, p.Fields, nil)
204-
logstorage.PutJSONParser(p)
205207

206208
return true, nil
207209
}

app/vlinsert/syslog/syslog.go

Lines changed: 203 additions & 106 deletions
Large diffs are not rendered by default.

app/vlselect/internalselect/internalselect.go

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
100100

101101
bb := bufs.Get(workerID)
102102

103+
// Write the marker of a regular data block.
104+
bb.B = append(bb.B, 0)
105+
106+
// Marshal the data block.
103107
bb.B = db.Marshal(bb.B)
104108

105109
if len(bb.B) < 1024*1024 {
@@ -117,7 +121,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
117121
}
118122
}
119123

120-
if err := vlstorage.RunQuery(ctx, cp.TenantIDs, cp.Query, writeBlock); err != nil {
124+
qctx := cp.NewQueryContext(ctx)
125+
defer cp.UpdatePerQueryStatsMetrics()
126+
127+
if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
121128
return err
122129
}
123130
if errGlobal != nil {
@@ -131,7 +138,13 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
131138
}
132139
}
133140

134-
return nil
141+
// Send the query stats block.
142+
bb := bufs.Get(0)
143+
// Write the marker of query stats block.
144+
bb.B = append(bb.B, 1)
145+
// Marshal the block itself
146+
bb.B = marshalQueryStatsBlock(bb.B, qctx)
147+
return sendBuf(bb)
135148
}
136149

137150
func processFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -140,12 +153,15 @@ func processFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
140153
return err
141154
}
142155

143-
fieldNames, err := vlstorage.GetFieldNames(ctx, cp.TenantIDs, cp.Query)
156+
qctx := cp.NewQueryContext(ctx)
157+
defer cp.UpdatePerQueryStatsMetrics()
158+
159+
fieldNames, err := vlstorage.GetFieldNames(qctx)
144160
if err != nil {
145161
return fmt.Errorf("cannot obtain field names: %w", err)
146162
}
147163

148-
return writeValuesWithHits(w, fieldNames, cp.DisableCompression)
164+
return writeValuesWithHits(w, qctx, fieldNames, cp.DisableCompression)
149165
}
150166

151167
func processFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -161,12 +177,15 @@ func processFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
161177
return err
162178
}
163179

164-
fieldValues, err := vlstorage.GetFieldValues(ctx, cp.TenantIDs, cp.Query, fieldName, uint64(limit))
180+
qctx := cp.NewQueryContext(ctx)
181+
defer cp.UpdatePerQueryStatsMetrics()
182+
183+
fieldValues, err := vlstorage.GetFieldValues(qctx, fieldName, uint64(limit))
165184
if err != nil {
166185
return fmt.Errorf("cannot obtain field values: %w", err)
167186
}
168187

169-
return writeValuesWithHits(w, fieldValues, cp.DisableCompression)
188+
return writeValuesWithHits(w, qctx, fieldValues, cp.DisableCompression)
170189
}
171190

172191
func processStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -175,12 +194,15 @@ func processStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter,
175194
return err
176195
}
177196

178-
fieldNames, err := vlstorage.GetStreamFieldNames(ctx, cp.TenantIDs, cp.Query)
197+
qctx := cp.NewQueryContext(ctx)
198+
defer cp.UpdatePerQueryStatsMetrics()
199+
200+
fieldNames, err := vlstorage.GetStreamFieldNames(qctx)
179201
if err != nil {
180202
return fmt.Errorf("cannot obtain stream field names: %w", err)
181203
}
182204

183-
return writeValuesWithHits(w, fieldNames, cp.DisableCompression)
205+
return writeValuesWithHits(w, qctx, fieldNames, cp.DisableCompression)
184206
}
185207

186208
func processStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -196,12 +218,15 @@ func processStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
196218
return err
197219
}
198220

199-
fieldValues, err := vlstorage.GetStreamFieldValues(ctx, cp.TenantIDs, cp.Query, fieldName, uint64(limit))
221+
qctx := cp.NewQueryContext(ctx)
222+
defer cp.UpdatePerQueryStatsMetrics()
223+
224+
fieldValues, err := vlstorage.GetStreamFieldValues(qctx, fieldName, uint64(limit))
200225
if err != nil {
201226
return fmt.Errorf("cannot obtain stream field values: %w", err)
202227
}
203228

204-
return writeValuesWithHits(w, fieldValues, cp.DisableCompression)
229+
return writeValuesWithHits(w, qctx, fieldValues, cp.DisableCompression)
205230
}
206231

207232
func processStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -215,12 +240,15 @@ func processStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
215240
return err
216241
}
217242

218-
streams, err := vlstorage.GetStreams(ctx, cp.TenantIDs, cp.Query, uint64(limit))
243+
qctx := cp.NewQueryContext(ctx)
244+
defer cp.UpdatePerQueryStatsMetrics()
245+
246+
streams, err := vlstorage.GetStreams(qctx, uint64(limit))
219247
if err != nil {
220248
return fmt.Errorf("cannot obtain streams: %w", err)
221249
}
222250

223-
return writeValuesWithHits(w, streams, cp.DisableCompression)
251+
return writeValuesWithHits(w, qctx, streams, cp.DisableCompression)
224252
}
225253

226254
func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -234,19 +262,33 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
234262
return err
235263
}
236264

237-
streamIDs, err := vlstorage.GetStreamIDs(ctx, cp.TenantIDs, cp.Query, uint64(limit))
265+
qctx := cp.NewQueryContext(ctx)
266+
defer cp.UpdatePerQueryStatsMetrics()
267+
268+
streamIDs, err := vlstorage.GetStreamIDs(qctx, uint64(limit))
238269
if err != nil {
239270
return fmt.Errorf("cannot obtain streams: %w", err)
240271
}
241272

242-
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
273+
return writeValuesWithHits(w, qctx, streamIDs, cp.DisableCompression)
243274
}
244275

245276
type commonParams struct {
246277
TenantIDs []logstorage.TenantID
247278
Query *logstorage.Query
248279

249280
DisableCompression bool
281+
282+
// qs contains execution statistics for the Query.
283+
qs logstorage.QueryStats
284+
}
285+
286+
func (cp *commonParams) NewQueryContext(ctx context.Context) *logstorage.QueryContext {
287+
return logstorage.NewQueryContext(ctx, &cp.qs, cp.TenantIDs, cp.Query)
288+
}
289+
290+
func (cp *commonParams) UpdatePerQueryStatsMetrics() {
291+
vlstorage.UpdatePerQueryStatsMetrics(&cp.qs)
250292
}
251293

252294
func getCommonParams(r *http.Request, expectedProtocolVersion string) (*commonParams, error) {
@@ -287,12 +329,18 @@ func getCommonParams(r *http.Request, expectedProtocolVersion string) (*commonPa
287329
return cp, nil
288330
}
289331

290-
func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits, disableCompression bool) error {
332+
func writeValuesWithHits(w http.ResponseWriter, qctx *logstorage.QueryContext, vhs []logstorage.ValueWithHits, disableCompression bool) error {
291333
var b []byte
334+
335+
// Marshal vhs at first
336+
b = encoding.MarshalUint64(b, uint64(len(vhs)))
292337
for i := range vhs {
293338
b = vhs[i].Marshal(b)
294339
}
295340

341+
// Marshal query stats block after that
342+
b = marshalQueryStatsBlock(b, qctx)
343+
296344
if !disableCompression {
297345
b = zstd.CompressLevel(nil, b, 1)
298346
}
@@ -306,6 +354,13 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
306354
return nil
307355
}
308356

357+
func marshalQueryStatsBlock(dst []byte, qctx *logstorage.QueryContext) []byte {
358+
queryDurationNsecs := qctx.QueryDurationNsecs()
359+
db := qctx.QueryStats.CreateDataBlock(queryDurationNsecs)
360+
dst = db.Marshal(dst)
361+
return dst
362+
}
363+
309364
func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
310365
s := r.FormValue(argName)
311366
n, err := strconv.ParseInt(s, 10, 64)

0 commit comments

Comments
 (0)