|
7 | 7 | "encoding/hex" |
8 | 8 | "encoding/json" |
9 | 9 | "fmt" |
| 10 | + "google.golang.org/grpc/metadata" |
10 | 11 | "io" |
11 | 12 | "math" |
12 | 13 | "net/http" |
@@ -646,6 +647,13 @@ func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_Rende |
646 | 647 | return |
647 | 648 | } |
648 | 649 |
|
| 650 | +const gRPCRenderMetricsCountHeaderKey = "metrics-count" |
| 651 | + |
| 652 | +func (listener *CarbonserverListener) sendRenderMetadataHeader(stream grpcv2.CarbonV2_RenderServer, filesCount int) error { |
| 653 | + header := metadata.Pairs(gRPCRenderMetricsCountHeaderKey, strconv.Itoa(filesCount)) |
| 654 | + return stream.SendHeader(header) |
| 655 | +} |
| 656 | + |
649 | 657 | // Render implements Render rpc of CarbonV2 gRPC service |
650 | 658 | func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) { |
651 | 659 | t0 := time.Now() |
@@ -726,6 +734,10 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str |
726 | 734 | metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs) |
727 | 735 | tle.MetricGlobMapLength = len(metricGlobMap) |
728 | 736 | filesCount := countFilesInExpandedGlobs(expandedGlobs) |
| 737 | + err = listener.sendRenderMetadataHeader(stream, filesCount) |
| 738 | + if err != nil { |
| 739 | + return nil, err |
| 740 | + } |
729 | 741 | prepareChan := make(chan response, getStreamingChannelSize(filesCount)) |
730 | 742 | go func() { |
731 | 743 | prepareT0 := time.Now() |
@@ -768,6 +780,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str |
768 | 780 | case res != nil: |
769 | 781 | atomic.AddUint64(&listener.metrics.QueryCacheHit, 1) |
770 | 782 | cachedResponses := res.([]response) |
| 783 | + err = listener.sendRenderMetadataHeader(stream, len(cachedResponses)) |
771 | 784 | responseChanToStream = make(chan response, getStreamingChannelSize(len(cachedResponses))) |
772 | 785 | go func() { |
773 | 786 | for _, r := range cachedResponses { |
|
0 commit comments