diff --git a/README.md b/README.md index 7c0ecc52..ca5d4039 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ This project provides both the [core Spring Data Valkey library](spring-data-val * Exception translation to Spring's portable Data Access exception hierarchy for driver exceptions. * `ValkeyTemplate` that provides a high level abstraction for performing various Valkey operations, exception translation and serialization support. * Pubsub support (such as a MessageListenerContainer for message-driven POJOs). Available with Jedis and Lettuce, with Valkey GLIDE support WIP for version 1.0.0. +* OpenTelemetry instrumentation support when using the Valkey GLIDE client for emitting traces and metrics for Valkey operations. * Valkey Sentinel support is currently available in Jedis and Lettuce, while support in Valkey GLIDE is planned for a future release. * Reactive API using Lettuce. * JDK, String, JSON and Spring Object/XML mapping serializers. @@ -29,6 +30,7 @@ This project provides both the [core Spring Data Valkey library](spring-data-val * Support for multiple Valkey drivers ([Valkey GLIDE](https://github.com/valkey-io/valkey-glide), [Lettuce](https://github.com/lettuce-io/lettuce-core), and [Jedis](https://github.com/redis/jedis)). * Connection pooling configuration for all supported clients. * Valkey Cluster auto-configuration and support. +* Property-based OpenTelemetry configuration for Valkey GLIDE, enabling automatic trace and metric export without application code changes. * Valkey Sentinel configuration support (Lettuce and Jedis only). * SSL/TLS connection support with Spring Boot SSL bundles. * Spring Boot Actuator health indicators and metrics for Valkey connections. diff --git a/examples/boot-telemetry/README.md b/examples/boot-telemetry/README.md new file mode 100644 index 00000000..c333999f --- /dev/null +++ b/examples/boot-telemetry/README.md @@ -0,0 +1,55 @@ +# Spring Boot + Valkey-GLIDE + OpenTelemetry + +This example demonstrates using **Spring Boot** with **Spring Data Valkey**, backed by the **Valkey-GLIDE** client, with **OpenTelemetry tracing and metrics enabled via configuration**. + +On startup, the application executes a small number of Valkey `SET` / `GET` commands using `StringValkeyTemplate`. Each command is automatically instrumented by GLIDE and exported via OpenTelemetry. + +--- + +## How it works + +- Spring Boot auto-configures all Valkey beans +- Valkey-GLIDE is selected using `client-type=valkeyglide` +- OpenTelemetry is enabled inside GLIDE using Spring Boot properties +- An OpenTelemetry Collector is started using Docker Compose +- Traces and metrics are exported through the collector + +No explicit OpenTelemetry SDK or client setup code is required. + +--- + +## Running the example + +```bash +../../mvnw clean compile exec:java +```` + +Docker Compose is started automatically and kept running after the application exits. + +--- + +## Key configuration properties + +```properties +spring.data.valkey.client-type=valkeyglide + +spring.data.valkey.valkey-glide.open-telemetry.enabled=true +spring.data.valkey.valkey-glide.open-telemetry.traces-endpoint=http://localhost:4318/v1/traces +spring.data.valkey.valkey-glide.open-telemetry.metrics-endpoint=http://localhost:4318/v1/metrics + +spring.docker.compose.lifecycle-management=start-only +``` + +--- + +## Inspecting OpenTelemetry + +```bash +docker logs -f spring-boot-opentelemetry-otel-collector-1 +``` + +If yoy change the configuration of the docker shutdown the existing containers first and then run the example again: + +```bash +docker compose down --remove-orphans +``` diff --git a/examples/boot-telemetry/compose.yaml b/examples/boot-telemetry/compose.yaml new file mode 100644 index 00000000..ddf4ab3f --- /dev/null +++ b/examples/boot-telemetry/compose.yaml @@ -0,0 +1,10 @@ +services: + + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + command: ["--config=/etc/otelcol/config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otelcol/config.yaml:ro + ports: + - "4318:4318" # OTLP HTTP receiver + - "4317:4317" # OTLP gRPC receiver \ No newline at end of file diff --git a/examples/boot-telemetry/otel-collector-config.yaml b/examples/boot-telemetry/otel-collector-config.yaml new file mode 100644 index 00000000..2e2eef31 --- /dev/null +++ b/examples/boot-telemetry/otel-collector-config.yaml @@ -0,0 +1,36 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 # OTLP gRPC endpoint + http: + endpoint: 0.0.0.0:4318 # OTLP HTTP endpoint (used by Spring Boot) + +processors: + batch: {} # Required for efficient exporting + +exporters: + debug: + verbosity: detailed # Optional: remove in production + + # Uncomment the following exporters to enable AWS X-Ray and CloudWatch exporting + # awsxray: + # region: "us-east-1" # X-Ray traces + + # awsemf: + # region: "us-east-1" + # log_group_name: "/spring/metrics" # Change to your CloudWatch log group + # namespace: "opentelemetry" # CloudWatch metrics namespace + + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] # [awsxray, debug] # Uncomment awsxray to enable X-Ray exporting + + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] # [awsemf, debug] # Uncomment awsemf to enable CloudWatch exporting diff --git a/examples/boot-telemetry/pom.xml b/examples/boot-telemetry/pom.xml new file mode 100644 index 00000000..2c463420 --- /dev/null +++ b/examples/boot-telemetry/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + + io.valkey.springframework + spring-data-valkey-examples + 0.1.0 + + + spring-data-valkey-example-boot-telemetry + Spring Data Valkey - Spring Boot OpenTelemetry Example + + + example.boottelemetry.SpringBootTelemetryExample + + + + + org.springframework.boot + spring-boot-starter + + + io.valkey.springframework.boot + spring-boot-starter-data-valkey + + + org.springframework.boot + spring-boot-docker-compose + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + + org.codehaus.mojo + exec-maven-plugin + + example.boottelemetry.SpringBootTelemetryExample + + + + + diff --git a/examples/boot-telemetry/src/main/java/example/springboot/SpringBootTelemetryExample.java b/examples/boot-telemetry/src/main/java/example/springboot/SpringBootTelemetryExample.java new file mode 100644 index 00000000..6bc73f4d --- /dev/null +++ b/examples/boot-telemetry/src/main/java/example/springboot/SpringBootTelemetryExample.java @@ -0,0 +1,52 @@ +package example.boottelemetry; + +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Minimal Spring Boot example that demonstrates OpenTelemetry integration + * with Valkey-GLIDE via {@link StringValkeyTemplate}. + * + *

This example exists to verify and showcase that Valkey commands executed + * through Spring Data Valkey automatically emit OpenTelemetry signals when + * OpenTelemetry is enabled via application properties.

+ * + *

Telemetry is emitted while the application runs and Valkey commands are + * executed. Traces can be inspected via the configured OpenTelemetry backend, + * for example by viewing the collector logs:

+ * + *
+ * docker logs -f spring-boot-opentelemetry-otel-collector-1
+ * 
+ */ +@SpringBootApplication +public class SpringBootTelemetryExample implements CommandLineRunner { + + @Autowired + private StringValkeyTemplate valkeyTemplate; + + public static void main(String[] args) { + SpringApplication.run(SpringBootTelemetryExample.class, args); + } + + @Override + public void run(String... args) { + + // Increase this number to generate more Valkey commands/traces + int iterations = 10; + for (int i = 0; i < iterations; i++) { + String key = "key" + i; + String value = "value" + i; + + valkeyTemplate.opsForValue().set(key, value); + String readBack = valkeyTemplate.opsForValue().get(key); + + // System.out.println("Iteration " + i + ": " + key + "=" + readBack); + } + + System.out.println("Completed " + iterations + " iterations of Valkey commands."); + } +} diff --git a/examples/boot-telemetry/src/main/resources/application.properties b/examples/boot-telemetry/src/main/resources/application.properties new file mode 100644 index 00000000..3ee03617 --- /dev/null +++ b/examples/boot-telemetry/src/main/resources/application.properties @@ -0,0 +1,16 @@ +spring.data.valkey.host=localhost +spring.data.valkey.port=6379 +spring.data.valkey.client-type=valkeyglide + +# Keep the Docker Compose services running after the example stops +# Provide the option to inspect the services if needed +# Run `docker logs -f spring-boot-opentelemetry-otel-collector-1` to see the OpenTelemetry Collector logs +# If you changed the docker logs run `docker compose down --remove-orphans` to stop and remove the containers +spring.docker.compose.lifecycle-management=start-only + +# Enable OpenTelemetry inside GLIDE +spring.data.valkey.valkey-glide.open-telemetry.enabled=true +spring.data.valkey.valkey-glide.open-telemetry.traces-endpoint=http://localhost:4318/v1/traces +spring.data.valkey.valkey-glide.open-telemetry.metrics-endpoint=http://localhost:4318/v1/metrics +spring.data.valkey.valkey-glide.open-telemetry.sample-percentage=10 +spring.data.valkey.valkey-glide.open-telemetry.flush-interval-ms=1000 diff --git a/examples/boot-telemetry/src/main/resources/logback.xml b/examples/boot-telemetry/src/main/resources/logback.xml new file mode 100644 index 00000000..67933d5f --- /dev/null +++ b/examples/boot-telemetry/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d %5p %40.40c:%4L - %m%n + + + + + + + + + + diff --git a/examples/pom.xml b/examples/pom.xml index af732abd..9d692b07 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,6 +21,7 @@ cache cluster collections + telemetry operations pipeline quickstart diff --git a/examples/telemetry/pom.xml b/examples/telemetry/pom.xml new file mode 100644 index 00000000..fc8f39bf --- /dev/null +++ b/examples/telemetry/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + + io.valkey.springframework + spring-data-valkey-examples + 0.1.0 + + + spring-data-valkey-example-telemetry + Spring Data Valkey - OpenTelemetry Example + + + example.telemetry.OpenTelemetryExample + + + + + + org.codehaus.mojo + exec-maven-plugin + + example.telemetry.OpenTelemetryExample + + + + + diff --git a/examples/telemetry/src/main/java/example/telemetry/OpenTelemetryExample.java b/examples/telemetry/src/main/java/example/telemetry/OpenTelemetryExample.java new file mode 100644 index 00000000..00b444b0 --- /dev/null +++ b/examples/telemetry/src/main/java/example/telemetry/OpenTelemetryExample.java @@ -0,0 +1,83 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.telemetry; + +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideClientConfiguration; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideClientConfiguration.OpenTelemetryForGlide; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory; +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate; + +/** + * Minimal example that demonstrates how to enable and emit OpenTelemetry traces + * from Valkey-GLIDE using {@link StringValkeyTemplate}. + * + *

This example exists to validate and showcase the OpenTelemetry integration + * in Valkey-GLIDE by executing a small number of Valkey commands and exporting + * traces to an OpenTelemetry Collector.

+ * + *

Telemetry is emitted while the application runs and Valkey commands are + * executed. Traces can be inspected via the configured OpenTelemetry backend, + * for example by viewing the collector logs:

+ * + *
+ * docker logs -f spring-boot-opentelemetry-otel-collector-1
+ * 
+ */ +public class OpenTelemetryExample { + + public static void main(String[] args) { + + ValkeyStandaloneConfiguration standaloneConfig = + new ValkeyStandaloneConfiguration(); + + // Change the default tracesEndpoint / metricsEndpoint if needed + OpenTelemetryForGlide openTelemetry = + OpenTelemetryForGlide.defaults(); + + ValkeyGlideClientConfiguration clientConfig = + ValkeyGlideClientConfiguration.builder() + .useOpenTelemetry(openTelemetry) + .build(); + + ValkeyGlideConnectionFactory connectionFactory = + new ValkeyGlideConnectionFactory(standaloneConfig, clientConfig); + + connectionFactory.afterPropertiesSet(); + + try { + StringValkeyTemplate template = + new StringValkeyTemplate(connectionFactory); + + // Increase this number to generate more Valkey commands/traces + int iterations = 10; + for (int i = 0; i < iterations; i++) { + String key = "key" + i; + String value = "value" + i; + + template.opsForValue().set(key, value); + String readBack = template.opsForValue().get(key); + + // System.out.println("Iteration " + i + ": " + key + "=" + readBack); + } + + System.out.println("Completed " + iterations + " iterations of Valkey commands."); + + } finally { + connectionFactory.destroy(); + } + } +} diff --git a/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyGlideConnectionConfiguration.java b/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyGlideConnectionConfiguration.java index cb8cdc38..d7d8714b 100644 --- a/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyGlideConnectionConfiguration.java +++ b/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyGlideConnectionConfiguration.java @@ -120,6 +120,19 @@ private ValkeyGlideClientConfiguration getValkeyGlideClientConfiguration( builder.maxPoolSize(valkeyGlideProperties.getMaxPoolSize()); } + // Apply OpenTelemetry configuration if enabled + ValkeyProperties.ValkeyGlide.OpenTelemetry otelProperties = valkeyGlideProperties.getOpenTelemetry(); + if (otelProperties != null && otelProperties.isEnabled()) { + ValkeyGlideClientConfiguration.OpenTelemetryForGlide otelConfig = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide( + otelProperties.getTracesEndpoint(), + otelProperties.getMetricsEndpoint(), + otelProperties.getSamplePercentage(), + otelProperties.getFlushIntervalMs() + ); + builder.useOpenTelemetry(otelConfig); + } + builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder)); return builder.build(); } diff --git a/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyProperties.java b/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyProperties.java index eac2c014..3e0407a5 100644 --- a/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyProperties.java +++ b/spring-boot-starter-data-valkey/src/main/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyProperties.java @@ -608,6 +608,8 @@ public static class ValkeyGlide { private final Cluster cluster = new Cluster(); + private final OpenTelemetry openTelemetry = new OpenTelemetry(); + public Duration getConnectionTimeout() { return this.connectionTimeout; } @@ -652,6 +654,10 @@ public Cluster getCluster() { return this.cluster; } + public OpenTelemetry getOpenTelemetry() { + return this.openTelemetry; + } + public static class Cluster { private final Refresh refresh = new Refresh(); @@ -692,6 +698,78 @@ public void setAdaptive(boolean adaptive) { } } + /** + * OpenTelemetry configuration for GLIDE client. + */ + public static class OpenTelemetry { + + /** + * Whether to enable OpenTelemetry instrumentation. + */ + private boolean enabled = false; + + /** + * OTLP endpoint for traces. + */ + private String tracesEndpoint; + + /** + * OTLP endpoint for metrics. + */ + private String metricsEndpoint; + + /** + * Sampling percentage for traces (0-100). + */ + private Integer samplePercentage; + + /** + * Flush interval in milliseconds. + */ + private Long flushIntervalMs; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getTracesEndpoint() { + return this.tracesEndpoint; + } + + public void setTracesEndpoint(String tracesEndpoint) { + this.tracesEndpoint = tracesEndpoint; + } + + public String getMetricsEndpoint() { + return this.metricsEndpoint; + } + + public void setMetricsEndpoint(String metricsEndpoint) { + this.metricsEndpoint = metricsEndpoint; + } + + public Integer getSamplePercentage() { + return this.samplePercentage; + } + + public void setSamplePercentage(Integer samplePercentage) { + this.samplePercentage = samplePercentage; + } + + public Long getFlushIntervalMs() { + return this.flushIntervalMs; + } + + public void setFlushIntervalMs(Long flushIntervalMs) { + this.flushIntervalMs = flushIntervalMs; + } + + } + } -} +} \ No newline at end of file diff --git a/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyAutoConfigurationTests.java b/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyAutoConfigurationTests.java index dcc06865..2acd5c74 100644 --- a/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyAutoConfigurationTests.java +++ b/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyAutoConfigurationTests.java @@ -399,6 +399,94 @@ void shouldUseVirtualThreadsIfEnabled() { }); } + @Test + void shouldNotEnableOpenTelemetryByDefault() { + this.contextRunner.run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().isEnabled()).isEqualTo(false); + }); + } + + @Test + void shouldBindOpenTelemetryTracesEndpoint() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.traces-endpoint:http://localhost:4318/v1/traces") + .run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().getTracesEndpoint()) + .isEqualTo("http://localhost:4318/v1/traces"); + }); + } + + @Test + void shouldBindOpenTelemetryMetricsEndpoint() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.metrics-endpoint:http://localhost:4315/v1/metrics") + .run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().getMetricsEndpoint()) + .isEqualTo("http://localhost:4315/v1/metrics"); + }); + } + + @Test + void shouldBindOpenTelemetrySamplePercentage() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.sample-percentage:30") + .run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().getSamplePercentage()).isEqualTo(30); + }); + } + + @Test + void shouldBindOpenTelemetryFlushIntervalMs() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.flush-interval-ms:3000") + .run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().getFlushIntervalMs()).isEqualTo(3000L); + }); + } + + @Test + void shouldFailToStartWhenOpenTelemetrySamplePercentageIsInvalid() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.sample-percentage:abc") + .run((context) -> assertThat(context).hasFailed()); + } + + @Test + void shouldFailToStartWhenOpenTelemetryFlushIntervalMsIsInvalid() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.enabled:true", + "spring.data.valkey.valkey-glide.open-telemetry.flush-interval-ms:abc") + .run((context) -> assertThat(context).hasFailed()); + } + + @Test + void shouldNotEnableOpenTelemetryByDefaultEvenIfOtherPropertiesSet() { + this.contextRunner.withPropertyValues( + "spring.data.valkey.client-type:valkeyglide", + "spring.data.valkey.valkey-glide.open-telemetry.traces-endpoint:http://localhost:4318/v1/traces") + .run((context) -> { + ValkeyProperties properties = context.getBean(ValkeyProperties.class); + assertThat(properties.getValkeyGlide().getOpenTelemetry().isEnabled()).isEqualTo(false); + + }); + } + private ContextConsumer assertClientOptions( Consumer configConsumer) { return (context) -> { diff --git a/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyPropertiesTests.java b/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyPropertiesTests.java index 3e55b984..f2b50b5d 100644 --- a/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyPropertiesTests.java +++ b/spring-boot-starter-data-valkey/src/test/java/io/valkey/springframework/boot/autoconfigure/data/valkey/ValkeyPropertiesTests.java @@ -38,6 +38,8 @@ void valkeyGlideDefaultsAreConsistent() { assertThat(valkeyGlide.getClientAZ()).isNull(); assertThat(valkeyGlide.getCluster()).isNotNull(); assertThat(valkeyGlide.getMaxPoolSize()).isEqualTo(8); + assertThat(valkeyGlide.getOpenTelemetry()).isNotNull(); + assertThat(valkeyGlide.getOpenTelemetry().isEnabled()).isEqualTo(false); } } diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/DefaultValkeyGlideClientConfiguration.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/DefaultValkeyGlideClientConfiguration.java index 5825dcab..d70aef8d 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/DefaultValkeyGlideClientConfiguration.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/DefaultValkeyGlideClientConfiguration.java @@ -37,9 +37,10 @@ public class DefaultValkeyGlideClientConfiguration implements ValkeyGlideClientC private final @Nullable String clientAZ; private final @Nullable BackoffStrategy reconnectStrategy; private final int maxPoolSize; + private final @Nullable OpenTelemetryForGlide openTelemetryForGlide; DefaultValkeyGlideClientConfiguration() { - this(null, false, null, null, null, null, null, 8); + this(null, false, null, null, null, null, null, 8, null); } public DefaultValkeyGlideClientConfiguration( @@ -50,7 +51,8 @@ public DefaultValkeyGlideClientConfiguration( @Nullable Integer inflightRequestsLimit, @Nullable String clientAZ, @Nullable BackoffStrategy reconnectStrategy, - int maxPoolSize) { + int maxPoolSize, + @Nullable OpenTelemetryForGlide openTelemetryForGlide) { this.commandTimeout = commandTimeout; this.useSsl = useSsl; this.connectionTimeout = connectionTimeout; @@ -59,6 +61,7 @@ public DefaultValkeyGlideClientConfiguration( this.clientAZ = clientAZ; this.reconnectStrategy = reconnectStrategy; this.maxPoolSize = maxPoolSize; + this.openTelemetryForGlide = openTelemetryForGlide; } @Nullable @@ -107,6 +110,12 @@ public int getMaxPoolSize() { return maxPoolSize; } + @Nullable + @Override + public OpenTelemetryForGlide getOpenTelemetryForGlide() { + return openTelemetryForGlide; + } + @Override public Optional getClientOptions() { return Optional.empty(); diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClientConfiguration.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClientConfiguration.java index 91a1b98b..24f0ca48 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClientConfiguration.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideClientConfiguration.java @@ -110,6 +110,14 @@ static ValkeyGlideClientConfiguration defaultConfiguration() { */ int getMaxPoolSize(); + /** + * Get OpenTelemetry configuration for Valkey-Glide client. + * + * @return The {@link OpenTelemetryForGlide} configuration. May be {@literal null} if not set. + */ + @Nullable + OpenTelemetryForGlide getOpenTelemetryForGlide(); + /** * Get client options for mode-specific configurations. * Placeholder for future mode-specific extensions. @@ -117,7 +125,34 @@ static ValkeyGlideClientConfiguration defaultConfiguration() { * @return Optional containing client options if configured. */ Optional getClientOptions(); - + /** + * Record representing OpenTelemetry configuration for Valkey-Glide client. + * + * @param tracesEndpoint the OTLP endpoint for traces, or {@code null} if not set. + * @param metricsEndpoint the OTLP endpoint for metrics, or {@code null} if not set. + * @param samplePercentage the sampling percentage for traces, or {@code null} if not set. + * @param flushIntervalMs the flush interval in milliseconds, or {@code null} if not set. + */ + public record OpenTelemetryForGlide( + @Nullable String tracesEndpoint, + @Nullable String metricsEndpoint, + @Nullable Integer samplePercentage, + @Nullable Long flushIntervalMs + ) { + + /** + * Default OpenTelemetry configuration for Valkey-Glide. + */ + public static OpenTelemetryForGlide defaults() { + return new OpenTelemetryForGlide( + "http://localhost:4318/v1/traces", + "http://localhost:4318/v1/metrics", + 1, + 5000L + ); + } + } + /** * Builder for {@link ValkeyGlideClientConfiguration}. */ @@ -131,6 +166,8 @@ class ValkeyGlideClientConfigurationBuilder { private @Nullable String clientAZ; private @Nullable BackoffStrategy reconnectStrategy; private int maxPoolSize = 8; // Default pool size + private @Nullable OpenTelemetryForGlide openTelemetryForGlide; + ValkeyGlideClientConfigurationBuilder() {} @@ -209,7 +246,20 @@ public ValkeyGlideClientConfigurationBuilder reconnectStrategy(BackoffStrategy r this.reconnectStrategy = reconnectStrategy; return this; } - + + /** + * Initialize GLIDE OpenTelemetry with OTLP endpoints. + * + * If at least one endpoint (traces or metrics) is provided, this will initialize + * OpenTelemetry once per JVM. + */ + public ValkeyGlideClientConfigurationBuilder useOpenTelemetry( + OpenTelemetryForGlide openTelemetryForGlide + ) { + this.openTelemetryForGlide = openTelemetryForGlide; + return this; + } + /** * Set the maximum pool size for client pooling. * @@ -235,7 +285,8 @@ public ValkeyGlideClientConfiguration build() { inflightRequestsLimit, clientAZ, reconnectStrategy, - maxPoolSize + maxPoolSize, + openTelemetryForGlide ); } } diff --git a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java index 4198e1b1..4f57a2ae 100644 --- a/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java +++ b/spring-data-valkey/src/main/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideConnectionFactory.java @@ -32,6 +32,7 @@ import io.valkey.springframework.data.valkey.connection.ValkeyPassword; import io.valkey.springframework.data.valkey.connection.ValkeySentinelConfiguration; import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration; +import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideClientConfiguration.OpenTelemetryForGlide; import io.valkey.springframework.data.valkey.connection.ValkeySentinelConnection; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -49,6 +50,10 @@ import glide.api.models.configuration.GlideClusterClientConfiguration; import glide.api.models.configuration.NodeAddress; import glide.api.models.configuration.ReadFrom; +import glide.api.OpenTelemetry; +import glide.api.OpenTelemetry.MetricsConfig; +import glide.api.OpenTelemetry.OpenTelemetryConfig; +import glide.api.OpenTelemetry.TracesConfig; import glide.api.models.configuration.StandaloneSubscriptionConfiguration; import glide.api.models.configuration.ClusterSubscriptionConfiguration; @@ -59,6 +64,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; /** * Connection factory creating Valkey Glide based @@ -96,6 +102,9 @@ public class ValkeyGlideConnectionFactory private boolean autoStartup = true; private boolean earlyStartup = true; private int phase = 0; + private static final AtomicBoolean OTEL_INITIALIZED = new AtomicBoolean(false); + private static @Nullable OpenTelemetryForGlide OTEL_INITIALIZED_CONFIG; + private static final Object OTEL_LOCK = new Object(); /** * Maps native Glide clients ({@link GlideClient} or {@link GlideClusterClient}) to their @@ -315,6 +324,84 @@ public void destroy() { running = false; } + private boolean sameOtelConfig(@Nullable OpenTelemetryForGlide a, @Nullable OpenTelemetryForGlide b) { + return java.util.Objects.equals(a, b); + } + + private void useOpenTelemetry(OpenTelemetryForGlide openTelemetryForGlide) { + Assert.notNull(openTelemetryForGlide, "OpenTelemetryForGlide must not be null"); + + String tracesEndpoint = openTelemetryForGlide.tracesEndpoint(); + String metricsEndpoint = openTelemetryForGlide.metricsEndpoint(); + Integer samplePercentage = openTelemetryForGlide.samplePercentage(); + Long flushIntervalMs = openTelemetryForGlide.flushIntervalMs(); + + boolean hasTraces = tracesEndpoint != null && !tracesEndpoint.isBlank(); + boolean hasMetrics = metricsEndpoint != null && !metricsEndpoint.isBlank(); + + if (!hasTraces && !hasMetrics){ + throw new IllegalArgumentException( + "OpenTelemetryForGlide requires at least one of tracesEndpoint or metricsEndpoint" + ); + } + + if (samplePercentage != null) { + Assert.isTrue( + samplePercentage >= 0 && samplePercentage <= 100, + "samplePercentage must be in range [0..100]" + ); + } + + if (flushIntervalMs != null) { + Assert.isTrue( + flushIntervalMs > 0, + "flushIntervalMs must be > 0" + ); + } + + synchronized (OTEL_LOCK) { + if (OTEL_INITIALIZED.getAndSet(true)) { + if (sameOtelConfig(OTEL_INITIALIZED_CONFIG, openTelemetryForGlide)) { + return; // Already initialized with the same config + } + + throw new IllegalStateException( + "OpenTelemetry is already initialized with a different configuration. " + + "existing=" + OTEL_INITIALIZED_CONFIG + + ", requested=" + openTelemetryForGlide + ); + } + + OpenTelemetryConfig.Builder otelBuilder = OpenTelemetryConfig.builder(); + + if (hasTraces) { + TracesConfig.Builder tracesBuilder = + TracesConfig.builder().endpoint(tracesEndpoint); + + if (samplePercentage != null) { + tracesBuilder.samplePercentage(samplePercentage); + } + + otelBuilder.traces(tracesBuilder.build()); + } + + if (hasMetrics) { + otelBuilder.metrics( + MetricsConfig.builder() + .endpoint(metricsEndpoint) + .build() + ); + } + + if (flushIntervalMs != null) { + otelBuilder.flushIntervalMs(flushIntervalMs); + } + + OTEL_INITIALIZED_CONFIG = openTelemetryForGlide; + OpenTelemetry.init(otelBuilder.build()); + } + } + /** * Creates a GlideClient instance for each connection. */ @@ -408,6 +495,12 @@ private GlideClient createGlideClient() { configBuilder.reconnectStrategy(reconnectStrategy); } + // OpenTelemetry + OpenTelemetryForGlide openTelemetryForGlide = valkeyGlideConfiguration.getOpenTelemetryForGlide(); + if (openTelemetryForGlide != null){ + this.useOpenTelemetry(openTelemetryForGlide); + } + // Pubsub listener DelegatingPubSubListener clientListener = new DelegatingPubSubListener(); @@ -521,6 +614,12 @@ private GlideClusterClient createGlideClusterClient() { if (reconnectStrategy != null) { configBuilder.reconnectStrategy(reconnectStrategy); } + + // OpenTelemetry + OpenTelemetryForGlide openTelemetryForGlide = valkeyGlideConfiguration.getOpenTelemetryForGlide(); + if (openTelemetryForGlide != null){ + this.useOpenTelemetry(openTelemetryForGlide); + } DelegatingPubSubListener clientListener = new DelegatingPubSubListener(); @@ -759,4 +858,4 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) { // Use ValkeyGlideExceptionConverter to translate exceptions return new ValkeyGlideExceptionConverter().convert(ex); } -} +} \ No newline at end of file diff --git a/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideOpenTelemetryConfigurationTests.java b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideOpenTelemetryConfigurationTests.java new file mode 100644 index 00000000..a724ac47 --- /dev/null +++ b/spring-data-valkey/src/test/java/io/valkey/springframework/data/valkey/connection/valkeyglide/ValkeyGlideOpenTelemetryConfigurationTests.java @@ -0,0 +1,280 @@ +package io.valkey.springframework.data.valkey.connection.valkeyglide; + +import static org.assertj.core.api.Assertions.*; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.valkey.springframework.data.valkey.connection.ValkeyClusterConfiguration; + +/** + * Unit tests for OpenTelemetry configuration in Valkey-Glide integration. + * + * Important: Validation + "init once" logic lives in ValkeyGlideConnectionFactory#useOpenTelemetry, + * not in ValkeyGlideClientConfigurationBuilder#useOpenTelemetry (builder only stores config). + */ +class ValkeyGlideOpenTelemetryConfigurationTests { + + private static final String TRACES_ENDPOINT = "http://localhost:4318/v1/traces"; + private static final String METRICS_ENDPOINT = "http://localhost:4318/v1/metrics"; + + @BeforeEach + void resetOtelInitializationState() { + resetFactoryStaticOpenTelemetryState(); + } + + @Test + void defaultsShouldReturnExpectedValues() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide defaults = + ValkeyGlideClientConfiguration.OpenTelemetryForGlide.defaults(); + + assertThat(defaults.tracesEndpoint()).isEqualTo(TRACES_ENDPOINT); + assertThat(defaults.metricsEndpoint()).isEqualTo(METRICS_ENDPOINT); + assertThat(defaults.samplePercentage()).isEqualTo(1); + assertThat(defaults.flushIntervalMs()).isEqualTo(5000L); + } + + @Test + void shouldThrowExceptionWhenBothEndpointsAreNull() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(null, null, 10, 100L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatIllegalArgumentException() + .isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldInitializeWhenTracesOnlyIsProvided() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(TRACES_ENDPOINT, null, 10, 100L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldInitializeWhenMetricsOnlyIsProvided() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(null, METRICS_ENDPOINT, null, null); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldNotFailWhenAlreadyInitializedWithSameConfig() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide( + TRACES_ENDPOINT, METRICS_ENDPOINT, 10, 100L + ); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldThrowWhenAlreadyInitializedWithDifferentConfig() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide first = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(TRACES_ENDPOINT, null, 10, 100L); + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide second = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide( + "http://different-endpoint.com:4318/v1/traces", null, 10, 100L + ); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(first).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, first)); + + assertThatThrownBy(() -> invokeUseOpenTelemetry(factory, second)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("already initialized with a different configuration"); + } + + @Test + void shouldTreatBlankEndpointsAsMissingAndFail() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(" ", "\t", 10, 100L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatIllegalArgumentException() + .isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)) + .withMessageContaining("requires at least one of tracesEndpoint or metricsEndpoint"); + } + + @Test + void shouldInitializeWhenTracesEndpointIsBlankButMetricsIsProvided() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(" ", METRICS_ENDPOINT, null, null); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldInitializeWhenMetricsEndpointIsBlankButTracesIsProvided() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(TRACES_ENDPOINT, " ", 10, 100L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatNoException().isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)); + } + + @Test + void shouldNotInitializeWhenOpenTelemetryConfigIsNull() { + assertThat(isOtelInitialized()).isFalse(); + assertThat(getInitializedConfig()).isNull(); + } + + @Test + void shouldThrowWhenSamplePercentageIsOutOfRange() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(TRACES_ENDPOINT, null, 101, 100L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatIllegalArgumentException() + .isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)) + .withMessageContaining("samplePercentage"); + } + + @Test + void shouldThrowWhenFlushIntervalMsIsNonPositive() { + + ValkeyGlideClientConfiguration.OpenTelemetryForGlide cfg = + new ValkeyGlideClientConfiguration.OpenTelemetryForGlide(TRACES_ENDPOINT, null, 10, 0L); + + ValkeyGlideConnectionFactory factory = new ValkeyGlideConnectionFactory( + new ValkeyClusterConfiguration(), + ValkeyGlideClientConfiguration.builder().useOpenTelemetry(cfg).build() + ); + + assertThatIllegalArgumentException() + .isThrownBy(() -> invokeUseOpenTelemetry(factory, cfg)) + .withMessageContaining("flushIntervalMs"); + } + + // ----------------- helpers ----------------- + + private static boolean isOtelInitialized() { + try { + Field initializedField = ValkeyGlideConnectionFactory.class.getDeclaredField("OTEL_INITIALIZED"); + initializedField.setAccessible(true); + AtomicBoolean initialized = (AtomicBoolean) initializedField.get(null); + return initialized.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static ValkeyGlideClientConfiguration.OpenTelemetryForGlide getInitializedConfig() { + try { + Field configField = ValkeyGlideConnectionFactory.class.getDeclaredField("OTEL_INITIALIZED_CONFIG"); + configField.setAccessible(true); + return (ValkeyGlideClientConfiguration.OpenTelemetryForGlide) configField.get(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Invoke the private useOpenTelemetry method via reflection. + */ + private static void invokeUseOpenTelemetry( + ValkeyGlideConnectionFactory factory, + ValkeyGlideClientConfiguration.OpenTelemetryForGlide openTelemetryForGlide + ) throws Exception { + + Method m = ValkeyGlideConnectionFactory.class + .getDeclaredMethod( + "useOpenTelemetry", + ValkeyGlideClientConfiguration.OpenTelemetryForGlide.class + ); + m.setAccessible(true); + + try { + m.invoke(factory, openTelemetryForGlide); + } catch (InvocationTargetException ite) { + Throwable cause = ite.getCause(); + if (cause instanceof Exception e) { + throw e; + } + if (cause instanceof Error err) { + throw err; + } + throw new RuntimeException(cause); + } + } + + /** + * Reset the static OpenTelemetry initialization state in ValkeyGlideConnectionFactory. + */ + private static void resetFactoryStaticOpenTelemetryState() { + try { + Class factoryClass = ValkeyGlideConnectionFactory.class; + + Field initializedField = factoryClass.getDeclaredField("OTEL_INITIALIZED"); + initializedField.setAccessible(true); + AtomicBoolean initialized = (AtomicBoolean) initializedField.get(null); + initialized.set(false); + + Field configField = factoryClass.getDeclaredField("OTEL_INITIALIZED_CONFIG"); + configField.setAccessible(true); + configField.set(null, null); + + } catch (Exception e) { + throw new RuntimeException("Failed to reset OpenTelemetry static state for tests", e); + } + } +}