Skip to content

Commit 6165c47

Browse files
committed
adding command name detection and remaining tests
Signed-off-by: TJ Zhang <[email protected]>
1 parent 61eeda1 commit 6165c47

File tree

4 files changed

+179
-44
lines changed

4 files changed

+179
-44
lines changed

ffi/src/lib.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
940940
/// * `route_bytes` is an optional array of bytes that will be parsed into a Protobuf `Routes` object. The array must be allocated by the caller and subsequently freed by the caller after this function returns.
941941
/// * `route_bytes_len` is the number of bytes in `route_bytes`. It must also not be greater than the max value of a signed pointer-sized integer.
942942
/// * `route_bytes_len` must be 0 if `route_bytes` is null.
943+
/// * `span_ptr` is a pointer to a span created by [`create_otel_span`]. The span must be valid until the command is finished.
943944
/// * This function should only be called should with a `client_adapter_ptr` created by [`create_client`], before [`close_client`] was called with the pointer.
944945
#[unsafe(no_mangle)]
945946
pub unsafe extern "C" fn command(
@@ -974,7 +975,7 @@ pub unsafe extern "C" fn command(
974975
cmd.arg(command_arg);
975976
}
976977
if span_ptr != 0 {
977-
cmd.set_span(get_unsafe_span_from_ptr(Some(span_ptr)));
978+
cmd.set_span(unsafe { get_unsafe_span_from_ptr(Some(span_ptr)) });
978979
}
979980

980981
let route = if !route_bytes.is_null() {
@@ -1376,24 +1377,25 @@ pub unsafe extern "C" fn invoke_script(
13761377
/// Creates an OpenTelemetry span with the given name and returns a pointer to the span as u64.
13771378
///
13781379
/// #Safety
1379-
/// TODO
1380+
/// * `request_type` must be a valid request type.
13801381
#[unsafe(no_mangle)]
1381-
pub unsafe extern "C" fn create_otel_span(name: *const c_char) -> u64 {
1382-
if name.is_null() {
1383-
return 0;
1384-
}
1385-
let c_str = unsafe { CStr::from_ptr(name) };
1386-
let name_str = match c_str.to_str() {
1387-
Ok(s) => s,
1388-
Err(_) => return 0,
1389-
};
1390-
let span = GlideOpenTelemetry::new_span(name_str);
1382+
pub unsafe extern "C" fn create_otel_span(request_type: RequestType) -> u64 {
1383+
let cmd = request_type
1384+
.get_command()
1385+
.expect("Couldn't fetch command type");
1386+
let cmd_bytes = cmd.command().unwrap();
1387+
let command_name = std::str::from_utf8(cmd_bytes.as_slice()).unwrap();
1388+
1389+
let span = GlideOpenTelemetry::new_span(command_name);
13911390
let arc = Arc::new(span);
13921391
let ptr = Arc::into_raw(arc);
13931392
ptr as u64
13941393
}
13951394

13961395
/// Drops an OpenTelemetry span given its pointer as u64.
1396+
///
1397+
/// # Safety
1398+
/// * `span_ptr` must be a valid pointer to a span created by [`create_otel_span`].
13971399
#[unsafe(no_mangle)]
13981400
pub unsafe extern "C" fn drop_otel_span(span_ptr: u64) {
13991401
if span_ptr == 0 {
@@ -1570,7 +1572,7 @@ fn ensure_tokio_runtime() -> &'static Runtime {
15701572
///
15711573
/// * `Some(GlideSpan)` - A cloned GlideSpan if the pointer is valid
15721574
/// * `None` - If the pointer is None
1573-
fn get_unsafe_span_from_ptr(command_span: Option<u64>) -> Option<GlideSpan> {
1575+
unsafe fn get_unsafe_span_from_ptr(command_span: Option<u64>) -> Option<GlideSpan> {
15741576
command_span.map(|command_span| unsafe {
15751577
Arc::increment_strong_count(command_span as *const GlideSpan);
15761578
(*Arc::from_raw(command_span as *const GlideSpan)).clone()

go/base_client.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,6 @@ func (client *baseClient) executeCommandWithRoute(
287287
args []string,
288288
route config.Route,
289289
) (*C.struct_CommandResponse, error) {
290-
//fmt.Println("Starting executeCommandWithRoute===============================")
291-
//fmt.Println("otelInstance.ShouldSample()===============================", otelInstance.ShouldSample())
292290
// Check if context is already done
293291
select {
294292
case <-ctx.Done():
@@ -299,14 +297,9 @@ func (client *baseClient) executeCommandWithRoute(
299297
// Create span if OpenTelemetry is enabled and sampling is configured
300298
var spanPtr uint64
301299
if otelInstance != nil && otelInstance.ShouldSample() {
302-
// Use the first argument as the command name, or "Batch" if it's a batch command
303-
spanName := "Batch"
304-
if len(args) > 0 {
305-
spanName = args[0]
306-
}
307-
//fmt.Println("ready to create span===============================")
308-
spanPtr = otelInstance.CreateSpan(spanName)
309-
//fmt.Println("span created===============================", spanPtr)
300+
// Pass the request type to determine the descriptive name of the command
301+
// to use as the span name
302+
spanPtr = otelInstance.CreateSpan(requestType)
310303
}
311304
var cArgsPtr *C.uintptr_t = nil
312305
var argLengthsPtr *C.ulong = nil
@@ -321,15 +314,13 @@ func (client *baseClient) executeCommandWithRoute(
321314
routeProto, err := routeToProtobuf(route)
322315
if err != nil {
323316
if spanPtr != 0 {
324-
//fmt.Println("Dropping span at point 0===============================")
325317
otelInstance.DropSpan(spanPtr)
326318
}
327319
return nil, &errors.RequestError{Msg: "ExecuteCommand failed due to invalid route"}
328320
}
329321
msg, err := proto.Marshal(routeProto)
330322
if err != nil {
331323
if spanPtr != 0 {
332-
//fmt.Println("Dropping span at point 1===============================")
333324
otelInstance.DropSpan(spanPtr)
334325
}
335326
return nil, err
@@ -350,7 +341,6 @@ func (client *baseClient) executeCommandWithRoute(
350341
if client.coreClient == nil {
351342
client.mu.Unlock()
352343
if spanPtr != 0 {
353-
//fmt.Println("Dropping span at point 2===============================")
354344
otelInstance.DropSpan(spanPtr)
355345
}
356346
return nil, &errors.ClosingError{Msg: "ExecuteCommand failed. The client is closed."}
@@ -373,7 +363,6 @@ func (client *baseClient) executeCommandWithRoute(
373363
select {
374364
case <-ctx.Done():
375365
if spanPtr != 0 {
376-
//fmt.Println("Dropping span at point 3===============================")
377366
otelInstance.DropSpan(spanPtr)
378367
}
379368
client.mu.Lock()
@@ -391,7 +380,6 @@ func (client *baseClient) executeCommandWithRoute(
391380
delete(client.pending, resultChannelPtr)
392381
}
393382
if spanPtr != 0 {
394-
//fmt.Println("Dropping span at point 4===============================")
395383
otelInstance.DropSpan(spanPtr)
396384
}
397385
client.mu.Unlock()

go/integTest/opentelemetry_test.go

Lines changed: 158 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
13
package integTest
24

35
import (
@@ -18,22 +20,21 @@ import (
1820
)
1921

2022
const (
21-
//otelSpanFile = "/tmp/spans.json"
2223
otelSpanFlushIntervalMs = 100
2324
otelSpanTimeoutMs = 50000
2425
validEndpointTraces = "/tmp/spans.json"
2526
validFileEndpointTraces = "file://" + validEndpointTraces
2627
validEndpointMetrics = "https://valid-endpoint/v1/metrics"
2728
)
2829

29-
type OpenTelemetryIntegrationSuite struct {
30+
type OpenTelemetryTestSuite struct {
3031
GlideTestSuite
3132
}
3233

3334
// OpenTelemetry standalone tests
3435

3536
// SetupSuite runs once before all tests
36-
func (suite *OpenTelemetryIntegrationSuite) SetupSuite() {
37+
func (suite *OpenTelemetryTestSuite) SetupSuite() {
3738
// One-time setup for all tests
3839
suite.GlideTestSuite.SetupSuite()
3940
WrongOpenTelemetryConfig(suite)
@@ -53,12 +54,12 @@ func (suite *OpenTelemetryIntegrationSuite) SetupSuite() {
5354
}
5455

5556
// TearDownSuite runs once after all tests
56-
func (suite *OpenTelemetryIntegrationSuite) TearDownSuite() {
57+
func (suite *OpenTelemetryTestSuite) TearDownSuite() {
5758
// One-time cleanup for all tests
5859
suite.GlideTestSuite.TearDownSuite()
5960
}
6061

61-
func (suite *OpenTelemetryIntegrationSuite) TearDownTest() {
62+
func (suite *OpenTelemetryTestSuite) TearDownTest() {
6263
time.Sleep(100 * time.Millisecond)
6364
// Remove the span file if it exists
6465
if _, err := os.Stat(validEndpointTraces); err == nil {
@@ -70,7 +71,7 @@ func (suite *OpenTelemetryIntegrationSuite) TearDownTest() {
7071
suite.GlideTestSuite.TearDownTest()
7172
}
7273

73-
func WrongOpenTelemetryConfig(suite *OpenTelemetryIntegrationSuite) {
74+
func WrongOpenTelemetryConfig(suite *OpenTelemetryTestSuite) {
7475
// Test wrong traces endpoint
7576
cfg := api.OpenTelemetryConfig{
7677
Traces: &api.OpenTelemetryTracesConfig{
@@ -185,7 +186,7 @@ func readAndParseSpanFile(path string) (SpanFileData, error) {
185186
}, nil
186187
}
187188

188-
func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_AutomaticSpanLifecycle() {
189+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_AutomaticSpanLifecycle() {
189190
suite.runWithSpecificClients(ClientTypeFlag(StandaloneFlag), func(client api.BaseClient) {
190191
// Force garbage collection
191192
runtime.GC()
@@ -213,7 +214,7 @@ func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_AutomaticSpanLifec
213214
})
214215
}
215216

216-
func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_GlobalConfigNotReinitialize() {
217+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_GlobalConfigNotReinitialize() {
217218
suite.runWithSpecificClients(ClientTypeFlag(StandaloneFlag), func(client api.BaseClient) {
218219
// Try to initialize OpenTelemetry with wrong endpoint
219220
wrongConfig := api.OpenTelemetryConfig{
@@ -239,7 +240,7 @@ func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_GlobalConfigNotRei
239240
})
240241
}
241242

242-
func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_ConcurrentCommandsSpanLifecycle() {
243+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_ConcurrentCommandsSpanLifecycle() {
243244
suite.runWithSpecificClients(ClientTypeFlag(StandaloneFlag), func(client api.BaseClient) {
244245
// Force garbage collection
245246
runtime.GC()
@@ -298,6 +299,152 @@ func (suite *OpenTelemetryIntegrationSuite) TestOpenTelemetry_ConcurrentCommands
298299
})
299300
}
300301

301-
func TestOpenTelemetryIntegrationSuite(t *testing.T) {
302-
suite.Run(t, new(OpenTelemetryIntegrationSuite))
302+
// cluster tests
303+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_ClusterClientMemoryLeak() {
304+
suite.runWithSpecificClients(ClientTypeFlag(ClusterFlag), func(client api.BaseClient) {
305+
// Force garbage collection
306+
runtime.GC()
307+
308+
// Get initial memory stats
309+
var startMem runtime.MemStats
310+
runtime.ReadMemStats(&startMem)
311+
312+
// Execute multiple commands sequentially
313+
for i := 0; i < 100; i++ {
314+
key := fmt.Sprintf("test_key_%d", i)
315+
_, err := client.Set(context.Background(), key, fmt.Sprintf("value_%d", i))
316+
require.NoError(suite.T(), err)
317+
_, err = client.Get(context.Background(), key)
318+
require.NoError(suite.T(), err)
319+
}
320+
321+
// Force garbage collection again
322+
runtime.GC()
323+
324+
// Get final memory stats
325+
var endMem runtime.MemStats
326+
runtime.ReadMemStats(&endMem)
327+
328+
// Allow small fluctuations (10% increase)
329+
maxAllowedMemory := float64(startMem.HeapAlloc) * 1.1
330+
assert.Less(suite.T(), float64(endMem.HeapAlloc), maxAllowedMemory,
331+
"Memory usage should not increase significantly")
332+
})
333+
}
334+
335+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_ClusterClientSamplingPercentage() {
336+
suite.runWithSpecificClients(ClientTypeFlag(ClusterFlag), func(client api.BaseClient) {
337+
// Set sampling percentage to 0
338+
err := api.GetInstance().SetSamplePercentage(0)
339+
require.NoError(suite.T(), err)
340+
assert.Equal(suite.T(), int32(0), api.GetInstance().GetSamplePercentage())
341+
342+
// Wait for any existing spans to be flushed
343+
time.Sleep(500 * time.Millisecond)
344+
345+
// Remove any existing span file
346+
if _, err := os.Stat(validEndpointTraces); err == nil {
347+
err = os.Remove(validEndpointTraces)
348+
require.NoError(suite.T(), err)
349+
}
350+
351+
// Execute commands with 0% sampling
352+
for i := 0; i < 100; i++ {
353+
_, err := client.Set(context.Background(), "GlideClusterClient_test_percentage_requests_config", "value")
354+
require.NoError(suite.T(), err)
355+
}
356+
357+
// Wait for spans to be flushed
358+
time.Sleep(500 * time.Millisecond)
359+
360+
// Verify no spans were exported
361+
_, err = os.Stat(validEndpointTraces)
362+
assert.True(suite.T(), os.IsNotExist(err), "Span file should not exist with 0% sampling")
363+
364+
// Set sampling percentage to 100
365+
err = api.GetInstance().SetSamplePercentage(100)
366+
require.NoError(suite.T(), err)
367+
368+
// Execute commands with 100% sampling
369+
for i := 0; i < 10; i++ {
370+
key := fmt.Sprintf("GlideClusterClient_test_percentage_requests_config_%d", i)
371+
_, err := client.Get(context.Background(), key)
372+
require.NoError(suite.T(), err)
373+
}
374+
375+
// Wait for spans to be flushed
376+
time.Sleep(5 * time.Second)
377+
378+
// Read and verify spans
379+
spans, err := readAndParseSpanFile(validEndpointTraces)
380+
require.NoError(suite.T(), err)
381+
382+
// Count Get spans
383+
getSpanCount := 0
384+
for _, name := range spans.SpanNames {
385+
if name == "GET" {
386+
getSpanCount++
387+
}
388+
}
389+
390+
// Verify we have exactly 10 Get spans
391+
assert.Equal(suite.T(), 10, getSpanCount, "Should have exactly 10 Get spans")
392+
})
393+
}
394+
395+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_ClusterClientGlobalConfigNotReinitialize() {
396+
suite.runWithSpecificClients(ClientTypeFlag(ClusterFlag), func(client api.BaseClient) {
397+
// Try to initialize OpenTelemetry with wrong endpoint
398+
wrongConfig := api.OpenTelemetryConfig{
399+
Traces: &api.OpenTelemetryTracesConfig{
400+
Endpoint: "wrong.endpoint",
401+
SamplePercentage: 1,
402+
},
403+
}
404+
405+
// The init should not throw error because it can only be initialized once per process
406+
err := api.GetInstance().Init(wrongConfig)
407+
assert.NoError(suite.T(), err, "OpenTelemetry should not throw error on reinitialization")
408+
409+
// Execute a command to verify spans are still being exported
410+
_, err = client.Set(context.Background(), "GlideClusterClient_test_otel_global_config", "value")
411+
require.NoError(suite.T(), err)
412+
413+
// Wait for spans to be flushed
414+
time.Sleep(500 * time.Millisecond)
415+
416+
// Read spans to verify they're still being exported to the correct endpoint
417+
spans, err := readAndParseSpanFile(validEndpointTraces)
418+
require.NoError(suite.T(), err)
419+
assert.Contains(suite.T(), spans.SpanNames, "SET", "Should find SET span in exported spans")
420+
})
421+
}
422+
423+
func (suite *OpenTelemetryTestSuite) TestOpenTelemetry_ClusterClientMultipleClients() {
424+
suite.runWithSpecificClients(ClientTypeFlag(ClusterFlag), func(client1 api.BaseClient) {
425+
// Create a second client with the same configuration
426+
client2 := suite.clusterClient(suite.defaultClusterClientConfig())
427+
defer client2.Close()
428+
429+
// Execute commands with both clients
430+
_, err := client1.Set(context.Background(), "test_key", "value")
431+
require.NoError(suite.T(), err)
432+
_, err = client2.Get(context.Background(), "test_key")
433+
require.NoError(suite.T(), err)
434+
435+
// Wait for spans to be flushed
436+
time.Sleep(5 * time.Second)
437+
438+
// Read and verify spans
439+
spans, err := readAndParseSpanFile(validEndpointTraces)
440+
require.NoError(suite.T(), err)
441+
442+
// Verify both SET and GET spans exist
443+
assert.Contains(suite.T(), spans.SpanNames, "SET", "Should find SET span in exported spans")
444+
assert.Contains(suite.T(), spans.SpanNames, "GET", "Should find GET span in exported spans")
445+
})
446+
}
447+
448+
func TestOpenTelemetryTestSuite(t *testing.T) {
449+
suite.Run(t, new(OpenTelemetryTestSuite))
303450
}

go/opentelemetry.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,11 @@ func (o *OpenTelemetry) SetSamplePercentage(percentage int32) error {
151151
}
152152

153153
// CreateSpan creates a new OpenTelemetry span with the given name and returns a pointer to the span.
154-
func (o *OpenTelemetry) CreateSpan(name string) uint64 {
155-
if !o.IsInitialized() || name == "" {
154+
func (o *OpenTelemetry) CreateSpan(requestType C.RequestType) uint64 {
155+
if !o.IsInitialized() {
156156
return 0
157157
}
158-
cName := C.CString(name)
159-
defer C.free(unsafe.Pointer(cName))
160-
return uint64(C.create_otel_span(cName))
158+
return uint64(C.create_otel_span(uint32(requestType)))
161159
}
162160

163161
// DropSpan drops an OpenTelemetry span given its pointer.

0 commit comments

Comments
 (0)