Skip to content

Commit 7622adc

Browse files
Migrates zipkin-aws to v2 SDK; add v2 sender/instrumentation (#237)
Ports collectors (SQS, Kinesis), module/, and aws-junit from AWS SDK v1 to v2 in-place, keeping package names. Adds new instrumentation-awssdk-sqs and sender-awssdk-kinesis modules. The module jar exclusively uses v2 SDK with zero com.amazonaws dependencies. Upgrades KCL to 3.4.1 and fixes all CVEs via jackson-bom overrides. Signed-off-by: Adrian Cole <adrian@tetrate.io>
1 parent e73aee8 commit 7622adc

File tree

41 files changed

+1203
-337
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1203
-337
lines changed

.dockerignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
!brave/instrumentation-aws-java-sdk-core/src/main/**
1515
!brave/instrumentation-aws-java-sdk-v2-core/src/main/**
1616
!brave/instrumentation-aws-java-sdk-sqs/src/main/**
17+
!brave/instrumentation-awssdk-sqs/src/main/**
1718
!brave/propagation-aws/src/main/**
1819
!module/src/main/**
1920
!reporter/reporter-xray-udp/src/main/**
2021
!reporter/sender-awssdk-sqs/src/main/**
22+
!reporter/sender-awssdk-kinesis/src/main/**
2123
!reporter/sender-kinesis/src/main/**
2224
!reporter/sender-sqs/src/main/**
2325
!storage/xray-udp/src/main/**

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Instrumentation | Description
2626
[AWS SDK](brave/instrumentation-aws-java-sdk-core) | Traces [AmazonWebServiceClient](https://github.com/aws/aws-sdk-java)
2727
[AWS SDK V2](brave/instrumentation-aws-java-sdk-v2-core) | Traces [SdkClient](https://github.com/aws/aws-sdk-java-v2)
2828
[AWS SQS Messaging](brave/instrumentation-aws-java-sdk-sqs) | Traces [AmazonSQS](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQS.html)
29+
[AWS SQS V2 Messaging](brave/instrumentation-awssdk-sqs) | Traces [SqsClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/SqsClient.html)
2930

3031
We also have a [library to read Amazon's trace header](brave/propagation-aws).
3132

@@ -49,6 +50,7 @@ Sender | Description
4950
[SQS](reporter/sender-sqs) | Sends tracing data to Zipkin using [SQS](https://aws.amazon.com/sqs/), a message queue service.
5051
[SQS v2](reporter/sender-awssdk-sqs) | Sends tracing data to Zipkin using [SQS](https://aws.amazon.com/sqs/), a message queue service.
5152
[Kinesis](reporter/sender-kinesis) | Sends tracing data to Zipkin using [Kinesis](https://aws.amazon.com/kinesis/), an alternative similar to Kafka.
53+
[Kinesis v2](reporter/sender-awssdk-kinesis) | Sends tracing data to Zipkin using [Kinesis](https://aws.amazon.com/kinesis/) with the [V2 AWS SDK](https://github.com/aws/aws-sdk-java-v2).
5254

5355
## Collectors
5456
The component in a zipkin server that receives trace data is called a

aws-junit/pom.xml

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<parent>
1313
<groupId>io.zipkin.aws</groupId>
1414
<artifactId>zipkin-aws-parent</artifactId>
15-
<version>1.4.1-SNAPSHOT</version>
15+
<version>2.0.0-SNAPSHOT</version>
1616
</parent>
1717

1818
<artifactId>zipkin-aws-junit</artifactId>
@@ -56,21 +56,14 @@
5656
</dependency>
5757

5858
<dependency>
59-
<groupId>com.amazonaws</groupId>
60-
<artifactId>aws-java-sdk-sqs</artifactId>
61-
<version>${aws-java-sdk.version}</version>
62-
<exclusions>
63-
<exclusion>
64-
<groupId>com.fasterxml.jackson.core</groupId>
65-
<artifactId>jackson-core</artifactId>
66-
</exclusion>
67-
</exclusions>
59+
<groupId>software.amazon.awssdk</groupId>
60+
<artifactId>sqs</artifactId>
61+
<version>${sdk-core.version}</version>
6862
</dependency>
69-
7063
<dependency>
71-
<groupId>com.fasterxml.jackson.core</groupId>
72-
<artifactId>jackson-core</artifactId>
73-
<version>${jackson.version}</version>
64+
<groupId>software.amazon.awssdk</groupId>
65+
<artifactId>url-connection-client</artifactId>
66+
<version>${sdk-core.version}</version>
7467
</dependency>
7568

7669
<dependency>

aws-junit/src/main/java/zipkin2/junit/aws/AmazonSQSExtension.java

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,8 @@
44
*/
55
package zipkin2.junit.aws;
66

7-
import com.amazonaws.auth.AWSStaticCredentialsProvider;
8-
import com.amazonaws.auth.BasicAWSCredentials;
9-
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
10-
import com.amazonaws.services.sqs.AmazonSQS;
11-
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
12-
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
13-
import com.amazonaws.services.sqs.model.Message;
14-
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
15-
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
16-
import com.amazonaws.services.sqs.model.SendMessageRequest;
17-
import com.amazonaws.util.Base64;
7+
import java.net.URI;
8+
import java.util.Base64;
189
import java.util.List;
1910
import java.util.stream.Collectors;
2011
import java.util.stream.Stream;
@@ -24,6 +15,18 @@
2415
import org.junit.jupiter.api.extension.AfterEachCallback;
2516
import org.junit.jupiter.api.extension.BeforeEachCallback;
2617
import org.junit.jupiter.api.extension.ExtensionContext;
18+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
19+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
20+
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
21+
import software.amazon.awssdk.regions.Region;
22+
import software.amazon.awssdk.services.sqs.SqsClient;
23+
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
24+
import software.amazon.awssdk.services.sqs.model.Message;
25+
import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest;
26+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
27+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
28+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
29+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
2730
import zipkin2.Span;
2831
import zipkin2.codec.SpanBytesDecoder;
2932

@@ -33,7 +36,7 @@
3336
public class AmazonSQSExtension implements BeforeEachCallback, AfterEachCallback {
3437
SQSRestServer server;
3538
int serverPort;
36-
AmazonSQS client;
39+
SqsClient client;
3740
String queueUrl;
3841

3942
public AmazonSQSExtension() {
@@ -47,22 +50,24 @@ public AmazonSQSExtension() {
4750
}
4851

4952
if (client == null) {
50-
client = AmazonSQSClientBuilder.standard()
51-
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")))
52-
.withEndpointConfiguration(
53-
new EndpointConfiguration("http://localhost:%d".formatted(serverPort), null))
53+
client = SqsClient.builder()
54+
.httpClient(UrlConnectionHttpClient.create())
55+
.credentialsProvider(
56+
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
57+
.endpointOverride(URI.create("http://localhost:%d".formatted(serverPort)))
58+
.region(Region.US_EAST_1)
5459
.build();
55-
queueUrl = client.createQueue("zipkin").getQueueUrl();
60+
queueUrl = client.createQueue(b -> b.queueName("zipkin")).queueUrl();
5661
}
57-
62+
5863
if (client != null && queueUrl != null) {
59-
client.purgeQueue(new PurgeQueueRequest(queueUrl));
64+
client.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build());
6065
}
6166
}
6267

6368
@Override public void afterEach(ExtensionContext extensionContext) {
6469
if (client != null) {
65-
client.shutdown();
70+
client.close();
6671
client = null;
6772
}
6873

@@ -77,18 +82,19 @@ public String queueUrl() {
7782
}
7883

7984
public int queueCount() {
80-
String count = client.getQueueAttributes(queueUrl, singletonList("ApproximateNumberOfMessages"))
81-
.getAttributes()
82-
.get("ApproximateNumberOfMessages");
85+
String count = client.getQueueAttributes(b -> b.queueUrl(queueUrl)
86+
.attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES))
87+
.attributes()
88+
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES);
8389

8490
return Integer.parseInt(count);
8591
}
8692

8793
public int notVisibleCount() {
88-
String count =
89-
client.getQueueAttributes(queueUrl, singletonList("ApproximateNumberOfMessagesNotVisible"))
90-
.getAttributes()
91-
.get("ApproximateNumberOfMessagesNotVisible");
94+
String count = client.getQueueAttributes(b -> b.queueUrl(queueUrl)
95+
.attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE))
96+
.attributes()
97+
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE);
9298

9399
return Integer.parseInt(count);
94100
}
@@ -101,19 +107,24 @@ public List<Span> getSpans(boolean delete) {
101107

102108
Stream<Span> spans = Stream.empty();
103109

104-
ReceiveMessageResult result = client.receiveMessage(queueUrl);
110+
ReceiveMessageResponse result = client.receiveMessage(
111+
ReceiveMessageRequest.builder().queueUrl(queueUrl).build());
105112

106-
while (result != null && !result.getMessages().isEmpty()) {
113+
while (result != null && !result.messages().isEmpty()) {
107114

108115
spans = Stream.concat(spans,
109-
result.getMessages().stream().flatMap(AmazonSQSExtension::decodeSpans));
116+
result.messages().stream().flatMap(AmazonSQSExtension::decodeSpans));
110117

111-
result = client.receiveMessage(queueUrl);
118+
result = client.receiveMessage(
119+
ReceiveMessageRequest.builder().queueUrl(queueUrl).build());
112120

113121
if (delete) {
114-
List<DeleteMessageRequest> deletes = result.getMessages()
122+
List<DeleteMessageRequest> deletes = result.messages()
115123
.stream()
116-
.map(m -> new DeleteMessageRequest(queueUrl, m.getReceiptHandle()))
124+
.map(m -> DeleteMessageRequest.builder()
125+
.queueUrl(queueUrl)
126+
.receiptHandle(m.receiptHandle())
127+
.build())
117128
.toList();
118129
deletes.forEach(d -> client.deleteMessage(d));
119130
}
@@ -123,12 +134,15 @@ public List<Span> getSpans(boolean delete) {
123134
}
124135

125136
public void send(String body) {
126-
client.sendMessage(new SendMessageRequest(queueUrl, body));
137+
client.sendMessage(SendMessageRequest.builder()
138+
.queueUrl(queueUrl)
139+
.messageBody(body)
140+
.build());
127141
}
128142

129143
static Stream<? extends Span> decodeSpans(Message m) {
130144
byte[] bytes =
131-
m.getBody().charAt(0) == '[' ? m.getBody().getBytes(UTF_8) : Base64.decode(m.getBody());
145+
m.body().charAt(0) == '[' ? m.body().getBytes(UTF_8) : Base64.getDecoder().decode(m.body());
132146
if (bytes[0] == '[') {
133147
return SpanBytesDecoder.JSON_V2.decodeList(bytes).stream();
134148
}

brave/instrumentation-aws-java-sdk-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<parent>
1212
<artifactId>zipkin-aws-parent</artifactId>
1313
<groupId>io.zipkin.aws</groupId>
14-
<version>1.4.1-SNAPSHOT</version>
14+
<version>2.0.0-SNAPSHOT</version>
1515
<relativePath>../../pom.xml</relativePath>
1616
</parent>
1717

brave/instrumentation-aws-java-sdk-sqs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<parent>
1212
<artifactId>zipkin-aws-parent</artifactId>
1313
<groupId>io.zipkin.aws</groupId>
14-
<version>1.4.1-SNAPSHOT</version>
14+
<version>2.0.0-SNAPSHOT</version>
1515
<relativePath>../../pom.xml</relativePath>
1616
</parent>
1717

brave/instrumentation-aws-java-sdk-v2-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<parent>
1212
<artifactId>zipkin-aws-parent</artifactId>
1313
<groupId>io.zipkin.aws</groupId>
14-
<version>1.4.1-SNAPSHOT</version>
14+
<version>2.0.0-SNAPSHOT</version>
1515
<relativePath>../../pom.xml</relativePath>
1616
</parent>
1717

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# AWS SQS V2 Messaging Instrumentation
2+
3+
This module contains instrumentation for AWS SDK V2 `SqsClient` `SendMessage*` request types.
4+
5+
The `SqsMessageTracing` type provides an `ExecutionInterceptor` instance that can be added to your
6+
clients at build time. It adds tracing headers to both `SendMessageRequest` and
7+
`SendMessageBatchRequest` types. In the case of `SendMessageBatchRequest` each message in the batch
8+
gets its own `Span` and headers added.
9+
10+
## Usage
11+
12+
You will want to create your SQS client and add the execution interceptor:
13+
14+
```java
15+
Tracing tracing = ...;
16+
SqsMessageTracing sqsMessageTracing = SqsMessageTracing.create(tracing);
17+
18+
ClientOverrideConfiguration configuration = ClientOverrideConfiguration.builder()
19+
.addExecutionInterceptor(sqsMessageTracing.executionInterceptor())
20+
.build();
21+
22+
SqsClient client = SqsClient.builder()
23+
.overrideConfiguration(configuration)
24+
.build();
25+
```
26+
27+
Now use your SQS client as you normally would and spans will be attached to outgoing messages.
28+
29+
## Notes
30+
31+
* This is the V2 SDK equivalent of [instrumentation-aws-java-sdk-sqs](../instrumentation-aws-java-sdk-sqs).
32+
* This can be combined with [instrumentation-aws-java-sdk-v2-core](../instrumentation-aws-java-sdk-v2-core) on the same client to get both HTTP-level CLIENT spans and message-level PRODUCER spans.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright The OpenZipkin Authors
5+
SPDX-License-Identifier: Apache-2.0
6+
7+
-->
8+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
9+
<modelVersion>4.0.0</modelVersion>
10+
11+
<parent>
12+
<artifactId>zipkin-aws-parent</artifactId>
13+
<groupId>io.zipkin.aws</groupId>
14+
<version>2.0.0-SNAPSHOT</version>
15+
<relativePath>../../pom.xml</relativePath>
16+
</parent>
17+
18+
<artifactId>brave-instrumentation-awssdk-sqs</artifactId>
19+
<name>Brave Instrumentation: AWS SDK V2 SQS</name>
20+
21+
<properties>
22+
<main.basedir>${project.basedir}/../..</main.basedir>
23+
</properties>
24+
25+
<dependencies>
26+
<dependency>
27+
<groupId>io.zipkin.brave</groupId>
28+
<artifactId>brave</artifactId>
29+
<version>${brave.version}</version>
30+
</dependency>
31+
<dependency>
32+
<groupId>software.amazon.awssdk</groupId>
33+
<artifactId>sqs</artifactId>
34+
<version>${sdk-core.version}</version>
35+
<scope>provided</scope>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>io.zipkin.brave</groupId>
40+
<artifactId>brave-context-log4j2</artifactId>
41+
<version>${brave.version}</version>
42+
<scope>test</scope>
43+
</dependency>
44+
<dependency>
45+
<groupId>io.zipkin.brave</groupId>
46+
<artifactId>brave-tests</artifactId>
47+
<version>${brave.version}</version>
48+
<scope>test</scope>
49+
</dependency>
50+
</dependencies>
51+
</project>

0 commit comments

Comments
 (0)