Skip to content

Commit 596b5bb

Browse files
committed
Added server side gRPC handler
Signed-off-by: Alok Kumar Singh <[email protected]>
1 parent c753064 commit 596b5bb

File tree

6 files changed

+70
-72
lines changed

6 files changed

+70
-72
lines changed

cmd/remote-storage/app/server.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
2323
"github.com/jaegertracing/jaeger/storage"
2424
"github.com/jaegertracing/jaeger/storage/dependencystore"
25+
"github.com/jaegertracing/jaeger/storage/samplingstore"
2526
"github.com/jaegertracing/jaeger/storage/spanstore"
2627
)
2728

@@ -36,8 +37,8 @@ type Server struct {
3637
}
3738

3839
// NewServer creates and initializes Server.
39-
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
40-
handler, err := createGRPCHandler(storageFactory, telset.Logger)
40+
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings, samplingStoreFactory storage.SamplingStoreFactory) (*Server, error) {
41+
handler, err := createGRPCHandler(storageFactory, samplingStoreFactory, telset.Logger)
4142
if err != nil {
4243
return nil, err
4344
}
@@ -54,7 +55,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
5455
}, nil
5556
}
5657

57-
func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
58+
func createGRPCHandler(f storage.BaseFactory, samplingStoreFactory storage.SamplingStoreFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
5859
reader, err := f.CreateSpanReader()
5960
if err != nil {
6061
return nil, err
@@ -67,12 +68,17 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH
6768
if err != nil {
6869
return nil, err
6970
}
71+
samplingStore, err := samplingStoreFactory.CreateSamplingStore(1)
72+
if err != nil {
73+
return nil, err
74+
}
7075

7176
impl := &shared.GRPCHandlerStorageImpl{
7277
SpanReader: func() spanstore.Reader { return reader },
7378
SpanWriter: func() spanstore.Writer { return writer },
7479
DependencyReader: func() dependencystore.Reader { return depReader },
7580
StreamingSpanWriter: func() spanstore.Writer { return nil },
81+
SamplingStore: func() samplingstore.Store { return samplingStore },
7682
}
7783

7884
// borrow code from Query service for archive storage

cmd/remote-storage/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func main() {
7777
tm := tenancy.NewManager(&opts.Tenancy)
7878
telset := baseTelset // copy
7979
telset.Metrics = metricsFactory
80-
server, err := app.NewServer(opts, storageFactory, tm, telset)
80+
server, err := app.NewServer(opts, storageFactory, tm, telset, nil)
8181
if err != nil {
8282
logger.Fatal("Failed to create server", zap.Error(err))
8383
}

plugin/storage/factory_config.go

-3
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
6868
depStorageType = spanWriterTypes[0]
6969
}
7070
samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar)
71-
if samplingStorageType == "" {
72-
samplingStorageType = cassandraStorageType
73-
}
7471
// TODO support explicit configuration for readers
7572
return FactoryConfig{
7673
SpanWriterTypes: spanWriterTypes,

plugin/storage/grpc/shared/grpc_client.go

+8-64
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"errors"
99
"fmt"
1010
"io"
11-
"strconv"
1211
"time"
1312

1413
"google.golang.org/grpc"
@@ -278,26 +277,12 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace,
278277

279278
func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error {
280279
ctx := context.Background()
281-
storageV1Throughput := []*storage_v1.Throughput{}
282-
for _, throughput := range throughputs {
283-
probsAsArray := []float64{}
284-
for prob := range throughput.Probabilities {
285-
probInFloat, err := strconv.ParseFloat(prob, 64)
286-
if err != nil {
287-
return err
288-
}
289-
probsAsArray = append(probsAsArray, probInFloat)
290-
}
291-
292-
storageV1Throughput = append(storageV1Throughput, &storage_v1.Throughput{
293-
Service: throughput.Service,
294-
Operation: throughput.Operation,
295-
Count: throughput.Count,
296-
Probabilities: probsAsArray,
297-
})
280+
storageV1Throughput, err := samplingStoreThroughpusToStorageV1Throughputs(throughputs)
281+
if err != nil {
282+
return err
298283
}
299284

300-
_, err := c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{
285+
_, err = c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{
301286
Throughput: storageV1Throughput,
302287
})
303288
if err != nil {
@@ -309,27 +294,14 @@ func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughp
309294

310295
func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
311296
ctx := context.Background()
312-
stringFloatMapToV1StringFloatMap := func(in map[string]float64) *storage_v1.StringFloatMap {
313-
return &storage_v1.StringFloatMap{
314-
StringFloatMap: in,
315-
}
316-
}
317-
318-
convertToV1Map := func(in map[string]map[string]float64) map[string]*storage_v1.StringFloatMap {
319-
res := make(map[string]*storage_v1.StringFloatMap)
320-
for k, v := range in {
321-
res[k] = stringFloatMapToV1StringFloatMap(v)
322-
}
323-
return res
324-
}
325297

326298
_, err := c.samplingStoreClient.InsertProbabilitiesAndQPS(ctx, &storage_v1.InsertProbabilitiesAndQPSRequest{
327299
Hostname: hostname,
328300
Probabilities: &storage_v1.ServiceOperationProbabilities{
329-
ServiceOperationProbabilities: convertToV1Map(probabilities),
301+
ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities),
330302
},
331303
Qps: &storage_v1.ServiceOperationQPS{
332-
ServiceOperationQPS: convertToV1Map(qps),
304+
ServiceOperationQPS: sSFloatMapToStorageV1SSFloatMap(qps),
333305
},
334306
})
335307
if err != nil {
@@ -349,23 +321,7 @@ func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel.
349321
return nil, fmt.Errorf("plugin error: %w", err)
350322
}
351323

352-
resThroughput := []*samplingStoreModel.Throughput{}
353-
354-
for _, throughput := range resp.Throughput {
355-
probsAsSet := make(map[string]struct{})
356-
for _, prob := range throughput.Probabilities {
357-
probsAsSet[strconv.FormatFloat(prob, 'E', -1, 64)] = struct{}{}
358-
}
359-
360-
resThroughput = append(resThroughput, &samplingStoreModel.Throughput{
361-
Service: throughput.Service,
362-
Operation: throughput.Operation,
363-
Count: throughput.Count,
364-
Probabilities: probsAsSet,
365-
})
366-
}
367-
368-
return resThroughput, nil
324+
return storageV1ThroughputsToSamplingStoreThroughputs(resp.Throughput), nil
369325
}
370326

371327
func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) {
@@ -375,17 +331,5 @@ func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperati
375331
return nil, fmt.Errorf("plugin error: %w", err)
376332
}
377333

378-
v1StringFloatMapToStringFloatMap := func(in *storage_v1.StringFloatMap) map[string]float64 {
379-
return in.StringFloatMap
380-
}
381-
382-
convertToMap := func(in map[string]*storage_v1.StringFloatMap) map[string]map[string]float64 {
383-
res := make(map[string]map[string]float64)
384-
for k, v := range in {
385-
res[k] = v1StringFloatMapToStringFloatMap(v)
386-
}
387-
return res
388-
}
389-
390-
return convertToMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil
334+
return storageV1SSFloatMapToSSFloatMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil
391335
}

plugin/storage/grpc/shared/grpc_handler.go

+48
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
2020
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
2121
"github.com/jaegertracing/jaeger/storage/dependencystore"
22+
"github.com/jaegertracing/jaeger/storage/samplingstore"
2223
"github.com/jaegertracing/jaeger/storage/spanstore"
2324
)
2425

@@ -42,6 +43,8 @@ type GRPCHandlerStorageImpl struct {
4243
ArchiveSpanWriter func() spanstore.Writer
4344

4445
StreamingSpanWriter func() spanstore.Writer
46+
47+
SamplingStore func() samplingstore.Store
4548
}
4649

4750
// NewGRPCHandler creates a handler given individual storage implementations.
@@ -83,6 +86,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error {
8386
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
8487
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
8588
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)
89+
storage_v1.RegisterSamplingStorePluginServer(ss, s)
8690

8791
hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
8892
hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
@@ -91,6 +95,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error {
9195
hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING)
9296
hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
9397
hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
98+
hs.SetServingStatus("jaeger.storage.v1.SamplingStorePlugin", grpc_health_v1.HealthCheckResponse_SERVING)
9499
grpc_health_v1.RegisterHealthServer(ss, hs)
95100

96101
return nil
@@ -303,3 +308,46 @@ func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteS
303308
}
304309
return &storage_v1.WriteSpanResponse{}, nil
305310
}
311+
312+
func (s *GRPCHandler) InsertThroughput(_ context.Context, r *storage_v1.InsertThroughputRequest) (*storage_v1.InsertThroughputResponse, error) {
313+
err := s.impl.SamplingStore().InsertThroughput(storageV1ThroughputsToSamplingStoreThroughputs(r.Throughput))
314+
if err != nil {
315+
return nil, err
316+
}
317+
return &storage_v1.InsertThroughputResponse{}, nil
318+
}
319+
320+
func (s *GRPCHandler) InsertProbabilitiesAndQPS(_ context.Context, r *storage_v1.InsertProbabilitiesAndQPSRequest) (*storage_v1.InsertProbabilitiesAndQPSResponse, error) {
321+
err := s.impl.SamplingStore().InsertProbabilitiesAndQPS(r.Hostname, storageV1SSFloatMapToSSFloatMap(r.Probabilities.ServiceOperationProbabilities), storageV1SSFloatMapToSSFloatMap(r.Qps.ServiceOperationQPS))
322+
if err != nil {
323+
return nil, err
324+
}
325+
return &storage_v1.InsertProbabilitiesAndQPSResponse{}, nil
326+
}
327+
328+
func (s *GRPCHandler) GetThroughput(_ context.Context, r *storage_v1.GetThroughputRequest) (*storage_v1.GetThroughputResponse, error) {
329+
throughput, err := s.impl.SamplingStore().GetThroughput(r.StartTime, r.EndTime)
330+
if err != nil {
331+
return nil, err
332+
}
333+
storageV1Throughput, err := samplingStoreThroughpusToStorageV1Throughputs(throughput)
334+
if err != nil {
335+
return nil, err
336+
}
337+
338+
return &storage_v1.GetThroughputResponse{
339+
Throughput: storageV1Throughput,
340+
}, nil
341+
}
342+
343+
func (s *GRPCHandler) GetLatestProbabilities(context.Context, *storage_v1.GetLatestProbabilitiesRequest) (*storage_v1.GetLatestProbabilitiesResponse, error) {
344+
probabilities, err := s.impl.SamplingStore().GetLatestProbabilities()
345+
if err != nil {
346+
return nil, err
347+
}
348+
return &storage_v1.GetLatestProbabilitiesResponse{
349+
ServiceOperationProbabilities: &storage_v1.ServiceOperationProbabilities{
350+
ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities),
351+
},
352+
}, nil
353+
}

plugin/storage/integration/remote_memory_storage.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
4343
storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
4444
require.NoError(t, err)
4545

46+
samplingStoreFactory, err := storageFactory.CreateSamplingStoreFactory()
47+
require.NoError(t, err)
48+
4649
v, _ := config.Viperize(storageFactory.AddFlags)
4750
storageFactory.InitFromViper(v, logger)
4851
require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger))
@@ -51,7 +54,7 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
5154
telset := telemetry.NoopSettings()
5255
telset.Logger = logger
5356
telset.ReportStatus = telemetry.HCAdapter(healthcheck.New())
54-
server, err := app.NewServer(opts, storageFactory, tm, telset)
57+
server, err := app.NewServer(opts, storageFactory, tm, telset, samplingStoreFactory)
5558
require.NoError(t, err)
5659
require.NoError(t, server.Start())
5760

0 commit comments

Comments
 (0)