Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions api/grpc/thanos.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/stats"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -32,6 +33,9 @@ import (

"github.com/thanos-io/thanos-parquet-gateway/db"
"github.com/thanos-io/thanos-parquet-gateway/internal/limits"
"github.com/thanos-io/thanos-parquet-gateway/internal/matchers"
"github.com/thanos-io/thanos-parquet-gateway/internal/tracing"
"github.com/thanos-io/thanos-parquet-gateway/internal/util"
"github.com/thanos-io/thanos-parquet-gateway/internal/warnings"
"github.com/thanos-io/thanos-parquet-gateway/schema"
)
Expand Down Expand Up @@ -239,6 +243,11 @@ func (qs *QueryServer) Query(req *querypb.QueryRequest, srv querypb.Query_QueryS
ctx, cancel := context.WithTimeout(srv.Context(), timeout)
defer cancel()

span := tracing.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("query.expr", req.Query),
)

if err := qs.concurrentQuerySemaphore.Reserve(ctx); err != nil {
return status.Error(codes.Aborted, fmt.Sprintf("semaphore blocked: %s", err))
}
Expand Down Expand Up @@ -268,6 +277,7 @@ func (qs *QueryServer) Query(req *querypb.QueryRequest, srv querypb.Query_QueryS
return err
}
}
util.InjectResultMetrics(span, res.Value)
switch results := res.Value.(type) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we use computeResultMetrics here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd add a second pass over the data, instead of doing the counting as data is processed.

If that's not considered a concern I can probably update this to extract computeResultMetrics to a shared package and use it in the gRPC handler too.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll push a commit to merge these.

case promql.Vector:
for _, result := range results {
Expand Down Expand Up @@ -304,6 +314,11 @@ func (qs *QueryServer) QueryRange(req *querypb.QueryRangeRequest, srv querypb.Qu
ctx, cancel := context.WithTimeout(srv.Context(), timeout)
defer cancel()

span := tracing.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("query.expr", req.Query),
)

if err := qs.concurrentQuerySemaphore.Reserve(ctx); err != nil {
return status.Error(codes.Aborted, fmt.Sprintf("semaphore blocked: %s", err))
}
Expand Down Expand Up @@ -332,6 +347,7 @@ func (qs *QueryServer) QueryRange(req *querypb.QueryRangeRequest, srv querypb.Qu
return err
}
}
util.InjectResultMetrics(span, res.Value)
switch results := res.Value.(type) {
case promql.Matrix:
for _, result := range results {
Expand Down Expand Up @@ -363,7 +379,6 @@ func (qs *QueryServer) QueryRange(req *querypb.QueryRangeRequest, srv querypb.Qu
return err
}
}

if stats := qry.Stats(); stats != nil {
if err := srv.Send(querypb.NewQueryRangeStatsResponse(toQueryStats(stats))); err != nil {
return err
Expand All @@ -374,13 +389,17 @@ func (qs *QueryServer) QueryRange(req *querypb.QueryRangeRequest, srv querypb.Qu
}

func (qs *QueryServer) Series(request *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (rerr error) {
span := tracing.SpanFromContext(srv.Context())

qryable := qs.queryable(request.WithoutReplicaLabels...)

ms, err := storepb.MatchersToPromMatchers(request.Matchers...)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

span.SetAttributes(attribute.StringSlice("series.matchers", matchers.ToStringSlice(ms)))

hints := &storage.SelectHints{
Start: request.MinTime,
End: request.MaxTime,
Expand All @@ -402,20 +421,20 @@ func (qs *QueryServer) Series(request *storepb.SeriesRequest, srv storepb.Store_
css := cq.Select(srv.Context(), true, hints, ms...)

var (
i = int64(0)
it chunks.Iterator
seriesCount = int64(0)
sampleCount = int64(0)
it chunks.Iterator
)
for css.Next() {
i++

series := css.At()

if request.Limit > 0 && i > request.Limit {
if request.Limit > 0 && seriesCount >= request.Limit {
if err := srv.Send(storepb.NewWarnSeriesResponse(warnings.ErrorTruncatedResponse)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
break
}
seriesCount++

storeSeries := storepb.Series{Labels: zLabelsFromMetric(series.Labels())}

Expand All @@ -433,6 +452,7 @@ func (qs *QueryServer) Series(request *storepb.SeriesRequest, srv storepb.Store_
Data: chk.Chunk.Bytes(),
},
})
sampleCount += int64(chk.Chunk.NumSamples())
}
if err := it.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
Expand All @@ -455,6 +475,11 @@ func (qs *QueryServer) Series(request *storepb.SeriesRequest, srv storepb.Store_
}
}

span.SetAttributes(
attribute.Int64("result.series", seriesCount),
attribute.Int64("result.samples", sampleCount),
)

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions api/http/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func (qapi *queryAPI) queryable() storage.Queryable {
)
}


func (qapi *queryAPI) query(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
span := tracing.SpanFromContext(ctx)
Expand Down Expand Up @@ -476,6 +477,8 @@ func (qapi *queryAPI) query(w http.ResponseWriter, r *http.Request) {
}
return
}
// Add result metrics to span
util.InjectResultMetrics(span, res.Value)
writeQueryResponse(w, res, qapi.l)
}

Expand Down Expand Up @@ -561,6 +564,8 @@ func (qapi *queryAPI) queryRange(w http.ResponseWriter, r *http.Request) {
}
return
}
// Add result metrics to span
util.InjectResultMetrics(span, res.Value)
writeQueryResponse(w, res, qapi.l)
}

Expand Down Expand Up @@ -649,6 +654,10 @@ func (qapi *queryAPI) series(w http.ResponseWriter, r *http.Request) {
return
}

span.SetAttributes(
attribute.Int("result.series", len(series)),
)

writeSeriesResponse(w, series, annos, qapi.l)
}

Expand Down
133 changes: 83 additions & 50 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ import (
"regexp"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger" //nolint:staticcheck
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace/noop"

"github.com/alecthomas/units"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -30,6 +22,7 @@ import (
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"

cftracing "github.com/thanos-io/thanos-parquet-gateway/internal/tracing"
"github.com/thanos-io/thanos-parquet-gateway/locate"
)

Expand Down Expand Up @@ -102,6 +95,11 @@ func (s slogAdapter) Log(args ...any) error {
}

type tracingOpts struct {
// Config file options (Thanos-compatible)
configFile string
config string

// Legacy flag-based options
exporterType string

// jaeger opts
Expand All @@ -111,56 +109,91 @@ type tracingOpts struct {
samplingType string
}

func setupTracing(ctx context.Context, opts tracingOpts) error {
var (
exporter trace.SpanExporter
err error
)
switch opts.exporterType {
case "JAEGER":
exporter, err = jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(opts.jaegerEndpoint)))
if err != nil {
return err
func setupTracing(ctx context.Context, logger *slog.Logger, opts tracingOpts) error {
// First, check if config file is provided (Thanos-compatible format)
if opts.configFile != "" || opts.config != "" {
var confContentYaml []byte
var err error

if opts.configFile != "" {
confContentYaml, err = os.ReadFile(opts.configFile)
if err != nil {
return fmt.Errorf("unable to read tracing config file: %w", err)
}
} else {
confContentYaml = []byte(opts.config)
}
case "STDOUT":
exporter, err = stdouttrace.New()
if err != nil {
return err

confContentYaml = ExpandEnvParens(confContentYaml)
return cftracing.SetupTracingFromConfig(ctx, logger, confContentYaml)
}

// Convert legacy flag-based configuration to TracingConfig
tracingConfig, err := convertLegacyTracingFlags(logger, opts)
if err != nil {
return err
}

// Use the parsed config loader to avoid serialization/deserialization
return cftracing.SetupTracingFromParsedConfig(ctx, logger, tracingConfig)
}

// convertLegacyTracingFlags converts legacy flag-based tracing configuration
// to the new TracingConfig format.
func convertLegacyTracingFlags(logger *slog.Logger, opts tracingOpts) (*cftracing.TracingConfig, error) {
// Handle no tracing
if opts.exporterType == "" {
return &cftracing.TracingConfig{}, nil
}

// Handle STDOUT - not supported by thanos packages
if opts.exporterType == "STDOUT" {
logger.Warn("STDOUT tracing is not supported by file-based configuration. Tracing will be disabled. Please use OTLP or JAEGER providers instead.")
return &cftracing.TracingConfig{}, nil
}

// Handle JAEGER
if opts.exporterType == "JAEGER" {
config := map[string]any{
"service_name": "parquet-gateway",
}
case "":
otel.SetTracerProvider(noop.NewTracerProvider())
return nil
default:
return fmt.Errorf("invalid exporter type %s", opts.exporterType)

// Add endpoint if provided
if opts.jaegerEndpoint != "" {
config["endpoint"] = opts.jaegerEndpoint
}

// Convert sampling type
samplerType, samplerParam := convertSamplingToJaeger(opts.samplingType, opts.samplingParam)
if samplerType != "" {
config["sampler_type"] = samplerType
}
if samplerParam != 0 {
config["sampler_param"] = samplerParam
}

return &cftracing.TracingConfig{
Type: cftracing.ProviderJaeger,
Config: config,
}, nil
}
var sampler trace.Sampler
switch opts.samplingType {

return nil, fmt.Errorf("invalid exporter type %s", opts.exporterType)
}

// convertSamplingToJaeger converts legacy sampling configuration to Jaeger-compatible format.
func convertSamplingToJaeger(samplingType string, samplingParam float64) (string, float64) {
switch samplingType {
case "PROBABILISTIC":
sampler = trace.TraceIDRatioBased(opts.samplingParam)
return "probabilistic", samplingParam
case "ALWAYS":
sampler = trace.AlwaysSample()
return "const", 1.0
case "NEVER":
sampler = trace.NeverSample()
return "const", 0.0
default:
return fmt.Errorf("invalid sampling type %s", opts.samplingType)
}
r, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName("parquet-gateway"),
semconv.ServiceVersion("v0.0.0"),
),
)
if err != nil {
return err
// Default to const sampler with always sample
return "const", 1.0
}

tracerProvider := trace.NewTracerProvider(
trace.WithSampler(trace.ParentBased(sampler)),
trace.WithBatcher(exporter),
trace.WithResource(r),
)
otel.SetTracerProvider(tracerProvider)
return nil
}

type apiOpts struct {
Expand Down
13 changes: 8 additions & 5 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@ func (opts *bucketOpts) registerServeFlags(cmd *kingpin.CmdClause) {
}

func (opts *tracingOpts) registerServeFlags(cmd *kingpin.CmdClause) {
cmd.Flag("tracing.exporter.type", "type of tracing exporter").Default("").EnumVar(&opts.exporterType, "JAEGER", "STDOUT", "")
cmd.Flag("tracing.jaeger.endpoint", "endpoint to send traces, eg. https://example.com:4318/v1/traces").StringVar(&opts.jaegerEndpoint)
cmd.Flag("tracing.sampling.param", "sample of traces to send").Default("0.1").Float64Var(&opts.samplingParam)
cmd.Flag("tracing.sampling.type", "type of sampling").Default("PROBABILISTIC").EnumVar(&opts.samplingType, "PROBABILISTIC", "ALWAYS", "NEVER")
cmd.Flag("tracing.config-file", "Path to YAML file with tracing configuration. See format details: https://thanos.io/tip/thanos/tracing.md/").StringVar(&opts.configFile)
cmd.Flag("tracing.config", "Alternative to 'tracing.config-file'. YAML content for tracing configuration.").StringVar(&opts.config)
// Command line tracing config flags are deprecated in favour of the tracing config file
cmd.Flag("tracing.exporter.type", "type of tracing exporter: [\"\", \"JAEGER\", \"STDOUT\"]. Default: \"\"\nDEPRECATED: prefer --tracing.config or --tracing.config-file").Default("").Hidden().EnumVar(&opts.exporterType, "JAEGER", "STDOUT", "")
cmd.Flag("tracing.jaeger.endpoint", "endpoint to send traces, eg. https://example.com:4318/v1/traces\nDEPRECATED: prefer --tracing.config or --tracing.config-file").Hidden().StringVar(&opts.jaegerEndpoint)
cmd.Flag("tracing.sampling.param", "sample of traces to send\nDEPRECATED: prefer --tracing.config or --tracing.config-file").Default("0.1").Hidden().Float64Var(&opts.samplingParam)
cmd.Flag("tracing.sampling.type", "type of sampling\nDEPRECATED: prefer --tracing.config or --tracing.config-file").Default("PROBABILISTIC").Hidden().EnumVar(&opts.samplingType, "PROBABILISTIC", "ALWAYS", "NEVER")
}

func (opts *discoveryOpts) registerServeFlags(cmd *kingpin.CmdClause) {
Expand Down Expand Up @@ -140,7 +143,7 @@ func registerServeApp(app *kingpin.Application) (*kingpin.CmdClause, func(contex

setupInterrupt(ctx, &g, log)

if err := setupTracing(ctx, opts.tracing); err != nil {
if err := setupTracing(ctx, log, opts.tracing); err != nil {
return fmt.Errorf("unable to setup tracing: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions db/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos-parquet-gateway/internal/limits"
matcherspkg "github.com/thanos-io/thanos-parquet-gateway/internal/matchers"
"github.com/thanos-io/thanos-parquet-gateway/internal/tracing"
"github.com/thanos-io/thanos-parquet-gateway/internal/util"
"github.com/thanos-io/thanos-parquet-gateway/internal/warnings"
Expand Down Expand Up @@ -208,7 +209,7 @@ func (q BlockQuerier) selectFn(ctx context.Context, _ bool, hints *storage.Selec
defer span.End()

span.SetAttributes(attribute.Bool("sorted", true))
span.SetAttributes(attribute.StringSlice("matchers", matchersToStringSlice(matchers)))
span.SetAttributes(attribute.StringSlice("matchers", matcherspkg.ToStringSlice(matchers)))
span.SetAttributes(attribute.Int("block.shards", len(q.shards)))
span.SetAttributes(attribute.String("block.mint", time.UnixMilli(q.mint).String()))
span.SetAttributes(attribute.String("block.maxt", time.UnixMilli(q.maxt).String()))
Expand Down Expand Up @@ -239,7 +240,7 @@ func (q *BlockChunkQuerier) selectChunksFn(ctx context.Context, _ bool, hints *s
defer span.End()

span.SetAttributes(attribute.Bool("sorted", true))
span.SetAttributes(attribute.StringSlice("matchers", matchersToStringSlice(matchers)))
span.SetAttributes(attribute.StringSlice("matchers", matcherspkg.ToStringSlice(matchers)))
span.SetAttributes(attribute.Int("block.shards", len(q.shards)))

sss := make([]storage.ChunkSeriesSet, 0, len(q.shards))
Expand Down
Loading
Loading