Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func createFeatureFlagsMD(clientSideMetricsEnabled, disableRetryInfo, enableDire
RetryInfo: !disableRetryInfo,
TrafficDirectorEnabled: enableDirectAccess,
DirectAccessRequested: enableDirectAccess,
PeerInfo: true,
}

val := ""
Expand Down
82 changes: 79 additions & 3 deletions bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ const (
metricLabelKeyClientName = "client_name"
metricLabelKeyClientUID = "client_uid"

metricTransportType = "transport_type"
metricTransportRegion = "transport_region"
metricTransportSubZone = "transport_subzone"
metricTransportZone = "transport_zone"

// Metric names
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameAttemptLatencies2 = "attempt_latencies2"

metricNameServerLatencies = "server_latencies"
metricNameAppBlockingLatencies = "application_latencies"
metricNameClientBlockingLatencies = "throttling_latencies"
Expand All @@ -78,7 +85,7 @@ const (
// Metric units
metricUnitMS = "ms"
metricUnitCount = "1"
maxAttrsLen = 12 // Monitored resource labels + Metric labels
maxAttrsLen = 16 // Monitored resource labels + Metric labels
)

type contextKey string
Expand Down Expand Up @@ -123,6 +130,17 @@ var (
},
recordedPerAttempt: true,
},
metricNameAttemptLatencies2: {
additionalAttrs: []string{
metricLabelKeyStatus,
metricLabelKeyStreamingOperation,
metricTransportType,
metricTransportRegion,
metricTransportSubZone,
metricTransportZone,
},
recordedPerAttempt: true,
},
metricNameServerLatencies: {
additionalAttrs: []string{
metricLabelKeyStatus,
Expand Down Expand Up @@ -207,6 +225,7 @@ type builtinMetricsTracerFactory struct {
operationLatencies metric.Float64Histogram
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
attemptLatencies2 metric.Float64Histogram
firstRespLatencies metric.Float64Histogram
appBlockingLatencies metric.Float64Histogram
clientBlockingLatencies metric.Float64Histogram
Expand Down Expand Up @@ -345,6 +364,19 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
metric.WithUnit(metricUnitMS),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)

if err != nil {
return err
}

// Create attempt_latencies2
tf.attemptLatencies2, err = meter.Float64Histogram(
metricNameAttemptLatencies2,
metric.WithDescription("Client observed latency per RPC attempt."),
metric.WithUnit(metricUnitMS),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)

if err != nil {
return err
}
Expand Down Expand Up @@ -436,6 +468,7 @@ type builtinMetricsTracer struct {
instrumentOperationLatencies metric.Float64Histogram
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentAttemptLatencies2 metric.Float64Histogram
instrumentFirstRespLatencies metric.Float64Histogram
instrumentAppBlockingLatencies metric.Float64Histogram
instrumentClientBlockingLatencies metric.Float64Histogram
Expand Down Expand Up @@ -499,6 +532,11 @@ type attemptTracer struct {
clusterID string
zoneID string

transportRegion string
transportZone string
transportSubZone string
transportType string

// gRPC status code
status string

Expand All @@ -524,6 +562,22 @@ func (a *attemptTracer) setZoneID(zoneID string) {
a.zoneID = zoneID
}

// setTransportZone stores the given transport zone on this attempt tracer,
// overwriting any previously stored value.
func (a *attemptTracer) setTransportZone(transportZone string) {
a.transportZone = transportZone
}

// setTransportSubZone stores the given transport subzone on this attempt
// tracer, overwriting any previously stored value.
func (a *attemptTracer) setTransportSubZone(transportSubZone string) {
a.transportSubZone = transportSubZone
}

// setTransportRegion stores the given transport region on this attempt
// tracer, overwriting any previously stored value.
func (a *attemptTracer) setTransportRegion(transportRegion string) {
a.transportRegion = transportRegion
}

// setTransportType stores the given transport type on this attempt tracer,
// overwriting any previously stored value.
func (a *attemptTracer) setTransportType(transportType string) {
a.transportType = transportType
}

// setStatus stores the attempt's status string (the gRPC status code, per the
// field's declaration) on this attempt tracer.
func (a *attemptTracer) setStatus(status string) {
a.status = status
}
Expand Down Expand Up @@ -561,6 +615,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
instrumentOperationLatencies: tf.operationLatencies,
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentAttemptLatencies2: tf.attemptLatencies2,
instrumentFirstRespLatencies: tf.firstRespLatencies,
instrumentAppBlockingLatencies: tf.appBlockingLatencies,
instrumentClientBlockingLatencies: tf.clientBlockingLatencies,
Expand Down Expand Up @@ -617,6 +672,14 @@ func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) (attribute.
attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyStatus, status))
case metricLabelKeyStreamingOperation:
attrKeyValues = append(attrKeyValues, attribute.Bool(metricLabelKeyStreamingOperation, mt.isStreaming))
case metricTransportRegion:
attrKeyValues = append(attrKeyValues, attribute.String(metricTransportRegion, mt.currOp.currAttempt.transportRegion))
case metricTransportSubZone:
attrKeyValues = append(attrKeyValues, attribute.String(metricTransportSubZone, mt.currOp.currAttempt.transportSubZone))
case metricTransportZone:
attrKeyValues = append(attrKeyValues, attribute.String(metricTransportZone, mt.currOp.currAttempt.transportZone))
case metricTransportType:
attrKeyValues = append(attrKeyValues, attribute.String(metricTransportType, mt.currOp.currAttempt.transportType))
default:
return attribute.Set{}, fmt.Errorf("unknown additional attribute: %v", attrKey)
}
Expand Down Expand Up @@ -655,6 +718,15 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT
// Get location attributes from metadata and set it in tracer
// Ignore get location error since the metric can still be recorded with rest of the attributes
clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)

peerInfo, _ := extractPeerInfo(attemptHeaderMD, attempTrailerMD)
if peerInfo != nil {
mt.currOp.currAttempt.setTransportType(peerInfo.TransportType.String())
mt.currOp.currAttempt.setTransportRegion(peerInfo.GetApplicationFrontendZone())
mt.currOp.currAttempt.setTransportZone(peerInfo.GetApplicationFrontendZone())
Comment on lines +725 to +726
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Both transportRegion and transportZone are being set to the same value from peerInfo.GetApplicationFrontendZone(). This is incorrect. The region should be extracted from the zone string. For example, if the zone is us-central1-b, the region should be us-central1.

You can extract the region by taking the substring before the last hyphen. Please also remember to add import "strings" to the file.

mt.currOp.currAttempt.setTransportZone(peerInfo.GetApplicationFrontendZone())
		zone := peerInfo.GetApplicationFrontendZone()
		region := zone
		if i := strings.LastIndex(zone, "-"); i != -1 {
			region = zone[:i]
		}
		mt.currOp.currAttempt.setTransportRegion(region)

mt.currOp.currAttempt.setTransportSubZone(peerInfo.GetApplicationFrontendSubzone())
}

mt.currOp.currAttempt.setClusterID(clusterID)
mt.currOp.currAttempt.setZoneID(zoneID)

Expand All @@ -670,6 +742,10 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributeSet(attemptLatAttrs))

// Record attempt_latencies2
attemptLat2Attrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies2)
mt.instrumentAttemptLatencies2.Record(mt.ctx, elapsedTime, metric.WithAttributeSet(attemptLat2Attrs))

// Record client_blocking_latencies
var clientBlockingLatencyMs float64
if mt.currOp.currAttempt.blockingLatencyTracker != nil {
Expand Down
10 changes: 7 additions & 3 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {

wantMetricNamesStdout := []string{
metricNameAttemptLatencies, metricNameAttemptLatencies,
metricNameAttemptLatencies2, metricNameAttemptLatencies2,
metricNameFirstRespLatencies,
metricNameConnErrCount, metricNameConnErrCount,
metricNameOperationLatencies,
Expand Down Expand Up @@ -372,6 +373,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil &&
gotClient.metricsTracerFactory.serverLatencies != nil &&
gotClient.metricsTracerFactory.attemptLatencies != nil &&
gotClient.metricsTracerFactory.attemptLatencies2 != nil &&
gotClient.metricsTracerFactory.appBlockingLatencies != nil &&
gotClient.metricsTracerFactory.firstRespLatencies != nil &&
gotClient.metricsTracerFactory.retryCount != nil &&
Expand Down Expand Up @@ -420,9 +422,11 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {

// Calculate elapsed time
elapsedTime := time.Since(testStartTime)
if elapsedTime < 3*defaultSamplePeriod {
// Ensure at least 2 datapoints are recorded
time.Sleep(3*defaultSamplePeriod - elapsedTime)
if test.wantBuiltinEnabled {
if elapsedTime < 3*defaultSamplePeriod {
// Ensure at least 2 datapoints are recorded
time.Sleep(3*defaultSamplePeriod - elapsedTime)
}
}

// Get new CreateServiceTimeSeriesRequests
Expand Down
33 changes: 33 additions & 0 deletions bigtable/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package bigtable

import (
"encoding/base64"
"errors"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -31,6 +33,7 @@ const (
defaultCluster = "<unspecified>"
defaultZone = "global"
defaultTable = "<unspecified>"
peerInfoMDKey = "bigtable-peer-info"
)

// get GFE latency in ms from response metadata
Expand Down Expand Up @@ -97,6 +100,36 @@ func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, strin
return responseParams.GetClusterId(), responseParams.GetZoneId(), nil
}

// extractPeerInfo looks up the peerInfoMDKey entry in the response metadata
// and decodes it into a PeerInfo message.
//
// The header metadata is consulted first; the trailer metadata is used only
// when the header yields no values for the key. The first value found is
// expected to be an unpadded, URL-safe base64 encoding of a serialized
// PeerInfo protobuf.
//
// It returns (nil, nil) when the key is absent or its first value is empty,
// and a wrapped error when base64 decoding or protobuf parsing fails.
func extractPeerInfo(headerMD metadata.MD, trailerMD metadata.MD) (*btpb.PeerInfo, error) {
	var values []string

	// Header takes precedence; fall through to the trailer only if the
	// header produced nothing.
	for _, md := range []metadata.MD{headerMD, trailerMD} {
		if md == nil {
			continue
		}
		if values = md.Get(peerInfoMDKey); len(values) > 0 {
			break
		}
	}

	// Nothing to process: the key is missing or carries an empty value.
	if len(values) == 0 || values[0] == "" {
		return nil, nil
	}

	raw, err := base64.RawURLEncoding.DecodeString(values[0])
	if err != nil {
		return nil, fmt.Errorf("failed to decode %s from header: %w", peerInfoMDKey, err)
	}

	info := new(btpb.PeerInfo)
	if err := proto.Unmarshal(raw, info); err != nil {
		return nil, fmt.Errorf("failed to parse %s protobuf: %w", peerInfoMDKey, err)
	}
	return info, nil
}

// convertToMs expresses d in milliseconds as a float64, so sub-millisecond
// parts of the duration are kept as a fraction rather than truncated.
func convertToMs(d time.Duration) float64 {
	const nanosPerMilli = 1000000
	nanos := float64(d.Nanoseconds())
	return nanos / nanosPerMilli
}
Loading
Loading