Skip to content

Commit f38fc8a

Browse files
committed
fix OpenTelemetry integration test
The commit introduces fixes in Open Telemetry integration test. Added conditional variable to SpansCollector. It waits until all spans end.
1 parent 9574ef7 commit f38fc8a

File tree

1 file changed

+79
-49
lines changed

1 file changed

+79
-49
lines changed

driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
3838
import java.util.ArrayList;
3939
import java.util.Collection;
40-
import java.util.Collections;
4140
import java.util.List;
41+
import java.util.concurrent.locks.Condition;
42+
import java.util.concurrent.locks.Lock;
43+
import java.util.concurrent.locks.ReentrantLock;
4244
import java.util.function.BiConsumer;
4345
import java.util.stream.Collectors;
4446
import org.testng.annotations.Test;
@@ -47,14 +49,22 @@
4749
public class OpenTelemetryTest extends CCMTestsSupport {
4850
/** Collects and saves spans. */
4951
private static final class SpansCollector implements SpanProcessor {
50-
final Collection<ReadableSpan> startedSpans =
51-
Collections.synchronizedList(new ArrayList<ReadableSpan>());
52-
final Collection<ReadableSpan> spans =
53-
Collections.synchronizedList(new ArrayList<ReadableSpan>());
52+
final Lock lock = new ReentrantLock();
53+
final Condition allEnded = lock.newCondition();
54+
55+
final Collection<ReadableSpan> startedSpans = new ArrayList<>();
56+
final Collection<ReadableSpan> spans = new ArrayList<>();
57+
58+
int activeSpans = 0;
5459

5560
@Override
5661
public void onStart(Context parentContext, ReadWriteSpan span) {
62+
lock.lock();
63+
5764
startedSpans.add(span);
65+
++activeSpans;
66+
67+
lock.unlock();
5868
}
5969

6070
@Override
@@ -64,7 +74,14 @@ public boolean isStartRequired() {
6474

6575
@Override
6676
public void onEnd(ReadableSpan span) {
77+
lock.lock();
78+
6779
spans.add(span);
80+
--activeSpans;
81+
82+
if (activeSpans == 0) allEnded.signal();
83+
84+
lock.unlock();
6885
}
6986

7087
@Override
@@ -73,8 +90,18 @@ public boolean isEndRequired() {
7390
}
7491

7592
public Collection<ReadableSpan> getSpans() {
76-
for (ReadableSpan span : startedSpans) {
77-
assertTrue(span.hasEnded());
93+
lock.lock();
94+
95+
try {
96+
while (activeSpans > 0) allEnded.await();
97+
98+
for (ReadableSpan span : startedSpans) {
99+
assertTrue(span.hasEnded());
100+
}
101+
} catch (InterruptedException e) {
102+
assert false;
103+
} finally {
104+
lock.unlock();
78105
}
79106

80107
return spans;
@@ -102,7 +129,7 @@ private Collection<ReadableSpan> collectSpans(BiConsumer<Tracer, TracingInfoFact
102129
.setResource(Resource.getDefault().merge(serviceNameResource))
103130
.build();
104131
final OpenTelemetrySdk openTelemetry =
105-
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal();
132+
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
106133

107134
final Tracer tracer = openTelemetry.getTracerProvider().get("this");
108135
final OpenTelemetryTracingInfoFactory tracingInfoFactory =
@@ -111,6 +138,7 @@ private Collection<ReadableSpan> collectSpans(BiConsumer<Tracer, TracingInfoFact
111138
session = cluster().connect();
112139

113140
session.execute("USE " + keyspace);
141+
session.execute("DROP TABLE IF EXISTS t");
114142
session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)");
115143
collector.getSpans().clear();
116144

@@ -125,47 +153,49 @@ private Collection<ReadableSpan> collectSpans(BiConsumer<Tracer, TracingInfoFact
125153
/** Basic test for creating spans. */
126154
@Test(groups = "short")
127155
public void simpleTracingTest() {
128-
final Collection<ReadableSpan> spans =
129-
collectSpans(
130-
(tracer, tracingInfoFactory) -> {
131-
Span userSpan = tracer.spanBuilder("user span").startSpan();
132-
Scope scope = userSpan.makeCurrent();
133-
134-
session.execute("INSERT INTO t(k, v) VALUES (4, 2)");
135-
session.execute("INSERT INTO t(k, v) VALUES (2, 1)");
136-
137-
scope.close();
138-
userSpan.end();
139-
});
140-
141-
// Retrieve span created directly by tracer.
142-
final List<ReadableSpan> userSpans =
143-
spans.stream()
144-
.filter(span -> !span.getParentSpanContext().isValid())
145-
.collect(Collectors.toList());
146-
assertEquals(userSpans.size(), 1);
147-
final ReadableSpan userSpan = userSpans.get(0);
148-
149-
for (ReadableSpan span : spans) {
150-
assertTrue(span.getSpanContext().isValid());
151-
assertTrue(
152-
span.getSpanContext().equals(userSpan.getSpanContext())
153-
|| span.getParentSpanContext().isValid());
154-
}
156+
for (int i = 0; i < 1500; i++) {
157+
final Collection<ReadableSpan> spans =
158+
collectSpans(
159+
(tracer, tracingInfoFactory) -> {
160+
Span userSpan = tracer.spanBuilder("user span").startSpan();
161+
Scope scope = userSpan.makeCurrent();
162+
163+
session.execute("INSERT INTO t(k, v) VALUES (4, 2)");
164+
session.execute("INSERT INTO t(k, v) VALUES (2, 1)");
165+
166+
scope.close();
167+
userSpan.end();
168+
});
169+
170+
// Retrieve span created directly by tracer.
171+
final List<ReadableSpan> userSpans =
172+
spans.stream()
173+
.filter(span -> !span.getParentSpanContext().isValid())
174+
.collect(Collectors.toList());
175+
assertEquals(userSpans.size(), 1);
176+
final ReadableSpan userSpan = userSpans.get(0);
177+
178+
for (ReadableSpan span : spans) {
179+
assertTrue(span.getSpanContext().isValid());
180+
assertTrue(
181+
span.getSpanContext().equals(userSpan.getSpanContext())
182+
|| span.getParentSpanContext().isValid());
183+
}
155184

156-
// Retrieve spans representing requests.
157-
final Collection<ReadableSpan> rootSpans =
158-
spans.stream()
159-
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
160-
.collect(Collectors.toList());
161-
assertEquals(rootSpans.size(), 2);
162-
163-
rootSpans.stream()
164-
.map(ReadableSpan::toSpanData)
165-
.forEach(
166-
spanData -> {
167-
assertEquals(spanData.getName(), "request");
168-
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK);
169-
});
185+
// Retrieve spans representing requests.
186+
final Collection<ReadableSpan> rootSpans =
187+
spans.stream()
188+
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
189+
.collect(Collectors.toList());
190+
assertEquals(rootSpans.size(), 2);
191+
192+
rootSpans.stream()
193+
.map(ReadableSpan::toSpanData)
194+
.forEach(
195+
spanData -> {
196+
assertEquals(spanData.getName(), "request");
197+
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK);
198+
});
199+
}
170200
}
171201
}

0 commit comments

Comments
 (0)