@@ -21,6 +21,10 @@ import (
21
21
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
22
22
"go.opentelemetry.io/otel"
23
23
"go.opentelemetry.io/otel/attribute"
24
+ otelprom "go.opentelemetry.io/otel/exporters/prometheus"
25
+ "go.opentelemetry.io/otel/sdk/metric"
26
+ otelres "go.opentelemetry.io/otel/sdk/resource"
27
+ semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
24
28
"go.opentelemetry.io/otel/trace"
25
29
"google.golang.org/api/option"
26
30
"google.golang.org/grpc"
@@ -98,6 +102,7 @@ type spannerDatastore struct {
98
102
99
103
tableSizesStatsTable string
100
104
filterMaximumIDCount uint16
105
+ meterProvider * metric.MeterProvider
101
106
}
102
107
103
108
// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -122,9 +127,23 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
122
127
log .Info ().Str ("spanner-emulator-host" , os .Getenv ("SPANNER_EMULATOR_HOST" )).Msg ("running against spanner emulator" )
123
128
}
124
129
130
+ var meterProvider * metric.MeterProvider
125
131
if config .datastoreMetricsOption == DatastoreMetricsOptionOpenTelemetry {
126
132
log .Info ().Msg ("enabling OpenTelemetry metrics for Spanner datastore" )
127
133
spanner .EnableOpenTelemetryMetrics ()
134
+
135
+ res , err := otelres .Merge (otelres .Default (),
136
+ otelres .NewWithAttributes (semconv .SchemaURL ,
137
+ semconv .ServiceName ("spicedb" ),
138
+ ))
139
+ if err != nil {
140
+ return nil , fmt .Errorf ("failed to create otel metrics resource: %w" , err )
141
+ }
142
+
143
+ meterProvider , err = getMeterProviderWithPromExporter (res )
144
+ if err != nil {
145
+ return nil , fmt .Errorf ("failed to enable Spanner prometheus metrics: %w" , err )
146
+ }
128
147
}
129
148
130
149
if config .datastoreMetricsOption == DatastoreMetricsOptionLegacyPrometheus {
@@ -133,23 +152,21 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
133
152
if err != nil {
134
153
return nil , fmt .Errorf ("failed to enable spanner session metrics: %w" , err )
135
154
}
155
+
136
156
err = spanner .EnableGfeLatencyAndHeaderMissingCountViews () // nolint: staticcheck
137
157
if err != nil {
138
158
return nil , fmt .Errorf ("failed to enable spanner GFE metrics: %w" , err )
139
159
}
140
- }
141
160
142
- // Register Spanner client gRPC metrics (include round-trip latency, received/sent bytes...)
143
- if err := view .Register (ocgrpc .DefaultClientViews ... ); err != nil {
144
- return nil , fmt .Errorf ("failed to enable gRPC metrics for Spanner client: %w" , err )
145
- }
161
+ // Register Spanner client gRPC metrics (include round-trip latency, received/sent bytes...)
162
+ if err := view .Register (ocgrpc .DefaultClientViews ... ); err != nil {
163
+ return nil , fmt .Errorf ("failed to enable gRPC metrics for Spanner client: %w" , err )
164
+ }
146
165
147
- _ , err = ocprom .NewExporter (ocprom.Options {
148
- Namespace : "spicedb" ,
149
- Registerer : prometheus .DefaultRegisterer ,
150
- })
151
- if err != nil {
152
- return nil , fmt .Errorf ("failed to enable spanner GFE latency stats: %w" , err )
166
+ _ , err = ocprom .NewExporter (ocprom.Options {
167
+ Namespace : "spicedb" ,
168
+ Registerer : prometheus .DefaultRegisterer ,
169
+ })
153
170
}
154
171
155
172
cfg := spanner .DefaultSessionPoolConfig
@@ -175,8 +192,9 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
175
192
context .Background (),
176
193
database ,
177
194
spanner.ClientConfig {
178
- SessionPoolConfig : cfg ,
179
- DisableNativeMetrics : config .datastoreMetricsOption != DatastoreMetricsOptionNative ,
195
+ SessionPoolConfig : cfg ,
196
+ DisableNativeMetrics : config .datastoreMetricsOption != DatastoreMetricsOptionNative ,
197
+ OpenTelemetryMeterProvider : meterProvider ,
180
198
},
181
199
spannerOpts ... ,
182
200
)
@@ -243,6 +261,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
243
261
tableSizesStatsTable : tableSizesStatsTable ,
244
262
filterMaximumIDCount : config .filterMaximumIDCount ,
245
263
schema : * schema ,
264
+ meterProvider : meterProvider ,
246
265
}
247
266
// Optimized revision and revision checking use a stale read for the
248
267
// current timestamp.
@@ -253,6 +272,20 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
253
272
return ds , nil
254
273
}
255
274
275
+ func getMeterProviderWithPromExporter (res * otelres.Resource ) (* metric.MeterProvider , error ) {
276
+ exporter , err := otelprom .New ()
277
+ if err != nil {
278
+ return nil , err
279
+ }
280
+
281
+ meterProvider := metric .NewMeterProvider (
282
+ metric .WithResource (res ),
283
+ metric .WithReader (exporter ),
284
+ )
285
+
286
+ return meterProvider , nil
287
+ }
288
+
256
289
type traceableRTX struct {
257
290
delegate readTX
258
291
}
@@ -410,7 +443,7 @@ func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) {
410
443
411
444
func (sd * spannerDatastore ) Close () error {
412
445
sd .client .Close ()
413
- return nil
446
+ return sd . meterProvider . ForceFlush ( context . TODO ())
414
447
}
415
448
416
449
func statementFromSQL (sql string , args []any ) spanner.Statement {
0 commit comments