Skip to content

Commit f4b2334

Browse files
Add support for external queues that come from a different
region or a different account GitOrigin-RevId: 66f01777c52a8747b544e0537bb9920bcdca77d9
1 parent 57a263a commit f4b2334

19 files changed

+354
-154
lines changed

Diff for: misk-aws2-sqs/README.md

-7
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,7 @@ The module will not be considered beta/GA state until the below items are comple
7979
8080
Outstanding work that needs to be done:
8181
* detailed test
82-
* tracing
83-
* external queues
8482
* detailed documentation
8583
86-
Things that are supported in the old documentation but are questionable:
87-
* aws queue attribute importer
88-
* delayed backoff - old implementation does not take into account the original visibility timeout,
89-
only count of retries
90-
9184
Outstanding things to document:
9285
* how batch size plays out with channel size and visibility timeout

Diff for: misk-aws2-sqs/api/misk-aws2-sqs.api

+31-20
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ public final class misk/aws2/sqs/jobqueue/DefaultDeadLetterQueueProvider : misk/
1313
public fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
1414
}
1515

16-
public final class misk/aws2/sqs/jobqueue/QueueResolver {
17-
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;)V
18-
public final fun getQueueUrl (Lmisk/jobqueue/QueueName;)Ljava/lang/String;
16+
public final class misk/aws2/sqs/jobqueue/SqsClientFactory {
17+
public fun <init> (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;)V
18+
public fun <init> (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;Lkotlin/jvm/functions/Function1;)V
19+
public synthetic fun <init> (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
20+
public final fun get (Ljava/lang/String;)Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
1921
}
2022

2123
public final class misk/aws2/sqs/jobqueue/SqsJob : misk/jobqueue/v2/Job {
@@ -40,7 +42,7 @@ public final class misk/aws2/sqs/jobqueue/SqsJob$Companion {
4042

4143
public final class misk/aws2/sqs/jobqueue/SqsJobConsumer : com/google/common/util/concurrent/AbstractService, misk/jobqueue/v2/JobConsumer {
4244
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJobConsumer$Companion;
43-
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;Lcom/squareup/moshi/Moshi;Lmisk/aws2/sqs/jobqueue/DeadLetterQueueProvider;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Ljava/time/Clock;Lio/opentracing/Tracer;)V
45+
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsClientFactory;Lmisk/aws2/sqs/jobqueue/SqsQueueResolver;Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;Lcom/squareup/moshi/Moshi;Lmisk/aws2/sqs/jobqueue/DeadLetterQueueProvider;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Ljava/time/Clock;Lio/opentracing/Tracer;)V
4446
public fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;)V
4547
public final fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;)V
4648
public fun unsubscribe (Lmisk/jobqueue/QueueName;)V
@@ -51,28 +53,28 @@ public final class misk/aws2/sqs/jobqueue/SqsJobConsumer$Companion {
5153
}
5254

5355
public final class misk/aws2/sqs/jobqueue/SqsJobEnqueuer : misk/jobqueue/v2/JobEnqueuer {
54-
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lwisp/token/TokenGenerator;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Lio/opentracing/Tracer;)V
56+
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsClientFactory;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;Lmisk/aws2/sqs/jobqueue/SqsQueueResolver;Lwisp/token/TokenGenerator;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Lio/opentracing/Tracer;Ljava/time/Clock;)V
5557
public fun enqueue (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5658
public fun enqueueAsync (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture;
5759
public fun enqueueBlocking (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)V
5860
}
5961

6062
public final class misk/aws2/sqs/jobqueue/SqsJobHandlerModule : misk/inject/KAbstractModule {
6163
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion;
62-
public synthetic fun <init> (Lmisk/aws2/sqs/jobqueue/config/SqsConfig;Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
64+
public synthetic fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
6365
}
6466

6567
public final class misk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion {
66-
public final fun create (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;)Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule;
67-
public static synthetic fun create$default (Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion;Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;ILjava/lang/Object;)Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule;
68+
public final fun create (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;)Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule;
6869
}
6970

7071
public class misk/aws2/sqs/jobqueue/SqsJobQueueModule : misk/inject/KAbstractModule {
71-
public fun <init> ()V
72-
public fun <init> (Lkotlin/jvm/functions/Function1;)V
73-
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
72+
public fun <init> (Lmisk/aws2/sqs/jobqueue/config/SqsConfig;)V
73+
public fun <init> (Lmisk/aws2/sqs/jobqueue/config/SqsConfig;Lkotlin/jvm/functions/Function1;)V
74+
public synthetic fun <init> (Lmisk/aws2/sqs/jobqueue/config/SqsConfig;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
7475
protected fun configure ()V
75-
public final fun sqsAsyncClient (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;Lmisk/cloud/aws/AwsRegion;)Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
76+
public final fun sqsClientClientFactory (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;)Lmisk/aws2/sqs/jobqueue/SqsClientFactory;
77+
public final fun sqsConfig (Lmisk/cloud/aws/AwsRegion;)Lmisk/aws2/sqs/jobqueue/config/SqsConfig;
7678
}
7779

7880
public final class misk/aws2/sqs/jobqueue/SqsMetrics {
@@ -83,6 +85,7 @@ public final class misk/aws2/sqs/jobqueue/SqsMetrics {
8385
public final fun getJobsAcknowledged ()Lio/prometheus/client/Counter;
8486
public final fun getJobsDeadLettered ()Lio/prometheus/client/Counter;
8587
public final fun getJobsEnqueued ()Lio/prometheus/client/Counter;
88+
public final fun getJobsFailedToAcknowledge ()Lio/prometheus/client/Counter;
8689
public final fun getJobsReceived ()Lio/prometheus/client/Counter;
8790
public final fun getQueueFirstProcessingLag ()Lio/prometheus/client/Histogram;
8891
public final fun getQueueProcessingLag ()Lio/prometheus/client/Histogram;
@@ -92,13 +95,19 @@ public final class misk/aws2/sqs/jobqueue/SqsMetrics {
9295
public final fun getVisibilityTime ()Lio/prometheus/client/Histogram;
9396
}
9497

98+
public final class misk/aws2/sqs/jobqueue/SqsQueueResolver {
99+
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsClientFactory;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;)V
100+
public final fun getQueueUrl (Lmisk/jobqueue/QueueName;)Ljava/lang/String;
101+
}
102+
95103
public final class misk/aws2/sqs/jobqueue/StaticDeadLetterQueueProvider : misk/aws2/sqs/jobqueue/DeadLetterQueueProvider {
96104
public fun <init> (Ljava/lang/String;)V
97105
public fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
98106
}
99107

100108
public final class misk/aws2/sqs/jobqueue/Subscriber {
101-
public fun <init> (Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;Lio/opentracing/Tracer;Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;)V
109+
public static final field Companion Lmisk/aws2/sqs/jobqueue/Subscriber$Companion;
110+
public fun <init> (Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/SqsQueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;Lio/opentracing/Tracer;Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;)V
102111
public final fun getChannel ()Lkotlinx/coroutines/channels/Channel;
103112
public final fun getClient ()Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
104113
public final fun getClock ()Ljava/time/Clock;
@@ -107,14 +116,18 @@ public final class misk/aws2/sqs/jobqueue/Subscriber {
107116
public final fun getMoshi ()Lcom/squareup/moshi/Moshi;
108117
public final fun getQueueConfig ()Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
109118
public final fun getQueueName ()Lmisk/jobqueue/QueueName;
110-
public final fun getQueueResolver ()Lmisk/aws2/sqs/jobqueue/QueueResolver;
111119
public final fun getSqsMetrics ()Lmisk/aws2/sqs/jobqueue/SqsMetrics;
120+
public final fun getSqsQueueResolver ()Lmisk/aws2/sqs/jobqueue/SqsQueueResolver;
112121
public final fun getTracer ()Lio/opentracing/Tracer;
113122
public final fun getVisibilityTimeoutCalculator ()Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;
114123
public final fun poll (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
115124
public final fun run (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
116125
}
117126

127+
public final class misk/aws2/sqs/jobqueue/Subscriber$Companion {
128+
public final fun getLogger ()Lmu/KLogger;
129+
}
130+
118131
public final class misk/aws2/sqs/jobqueue/SubscriptionService : com/google/common/util/concurrent/AbstractIdleService {
119132
public static final field Companion Lmisk/aws2/sqs/jobqueue/SubscriptionService$Companion;
120133
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsJobConsumer;Ljava/util/Map;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;)V
@@ -146,6 +159,7 @@ public final class misk/aws2/sqs/jobqueue/config/SqsConfig : wisp/config/Config
146159
public fun equals (Ljava/lang/Object;)Z
147160
public final fun getAll_queues ()Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
148161
public final fun getPer_queue_overrides ()Ljava/util/Map;
162+
public final fun getQueueConfig (Lmisk/jobqueue/QueueName;)Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
149163
public fun hashCode ()I
150164
public fun toString ()Ljava/lang/String;
151165
}
@@ -161,10 +175,8 @@ public final class misk/aws2/sqs/jobqueue/config/SqsQueueConfig {
161175
public fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;)V
162176
public fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;)V
163177
public fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;)V
164-
public fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V
165-
public synthetic fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
178+
public synthetic fun <init> (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
166179
public final fun component1 ()I
167-
public final fun component10 ()Ljava/lang/String;
168180
public final fun component2 ()I
169181
public final fun component3 ()I
170182
public final fun component4 ()I
@@ -173,16 +185,15 @@ public final class misk/aws2/sqs/jobqueue/config/SqsQueueConfig {
173185
public final fun component7 ()Ljava/lang/Integer;
174186
public final fun component8 ()Ljava/lang/String;
175187
public final fun component9 ()Ljava/lang/String;
176-
public final fun copy (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
177-
public static synthetic fun copy$default (Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
188+
public final fun copy (IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;)Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
189+
public static synthetic fun copy$default (Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;IIIIZLjava/lang/Integer;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;
178190
public fun equals (Ljava/lang/Object;)Z
179191
public final fun getAccount_id ()Ljava/lang/String;
180192
public final fun getChannel_capacity ()I
181193
public final fun getConcurrency ()I
182194
public final fun getInstall_retry_queue ()Z
183195
public final fun getMax_number_of_messages ()I
184196
public final fun getParallelism ()I
185-
public final fun getQueue_name ()Ljava/lang/String;
186197
public final fun getRegion ()Ljava/lang/String;
187198
public final fun getVisibility_timeout ()Ljava/lang/Integer;
188199
public final fun getWait_timeout ()Ljava/lang/Integer;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package misk.aws2.sqs.jobqueue
2+
3+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
4+
import software.amazon.awssdk.regions.Region
5+
import software.amazon.awssdk.services.sqs.SqsAsyncClient
6+
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder
7+
import java.util.concurrent.ConcurrentHashMap
8+
9+
class SqsClientFactory @JvmOverloads constructor(
10+
private val credentialsProvider: AwsCredentialsProvider,
11+
private val configureClient: SqsAsyncClientBuilder.() -> Unit = {}
12+
) {
13+
private val clients = ConcurrentHashMap<String, SqsAsyncClient>()
14+
15+
fun get(region: String): SqsAsyncClient {
16+
return clients.computeIfAbsent(region) {
17+
val builder = SqsAsyncClient.builder()
18+
.credentialsProvider(credentialsProvider)
19+
.region(Region.of(region))
20+
21+
builder.configureClient()
22+
builder.build()
23+
}
24+
}
25+
}

Diff for: misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/SqsJobConsumer.kt

+20-23
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ import kotlinx.coroutines.SupervisorJob
1111
import kotlinx.coroutines.cancel
1212
import kotlinx.coroutines.channels.Channel
1313
import kotlinx.coroutines.launch
14-
import kotlinx.coroutines.runBlocking
1514
import misk.aws2.sqs.jobqueue.config.SqsQueueConfig
1615
import misk.jobqueue.QueueName
1716
import misk.jobqueue.v2.JobConsumer
1817
import misk.jobqueue.v2.JobHandler
19-
import software.amazon.awssdk.services.sqs.SqsAsyncClient
2018
import wisp.logging.getLogger
2119
import java.time.Clock
2220
import java.util.concurrent.ConcurrentHashMap
@@ -44,8 +42,8 @@ import java.util.concurrent.ConcurrentHashMap
4442
*/
4543
@Singleton
4644
class SqsJobConsumer @Inject constructor(
47-
private val client: SqsAsyncClient,
48-
private val queueResolver: QueueResolver,
45+
private val sqsClientFactory: SqsClientFactory,
46+
private val sqsQueueResolver: SqsQueueResolver,
4947
private val visibilityTimeoutCalculator: VisibilityTimeoutCalculator,
5048
private val moshi: Moshi,
5149
private val dlqProvider: DeadLetterQueueProvider,
@@ -62,28 +60,27 @@ class SqsJobConsumer @Inject constructor(
6260
}
6361

6462
fun subscribe(queueName: QueueName, handler: JobHandler, queueConfig: SqsQueueConfig) {
65-
val subscriber = runBlocking {
66-
// We won't resolve dead letter queue yet to skip it for local development and testing
67-
val deadLetterQueueName = dlqProvider.deadLetterQueueFor(queueName)
63+
// We won't resolve dead letter queue yet to skip it for local development and testing
64+
val deadLetterQueueName = dlqProvider.deadLetterQueueFor(queueName)
6865

69-
Subscriber(
70-
queueName = queueName,
71-
queueConfig = queueConfig,
72-
deadLetterQueueName = deadLetterQueueName,
73-
handler = handler,
74-
channel = Channel(queueConfig.channel_capacity),
75-
client = client,
76-
queueResolver = queueResolver,
77-
sqsMetrics = sqsMetrics,
78-
moshi = moshi,
79-
clock = clock,
80-
tracer = tracer,
81-
visibilityTimeoutCalculator = visibilityTimeoutCalculator,
82-
)
83-
}
66+
val subscriber = Subscriber(
67+
queueName = queueName,
68+
queueConfig = queueConfig,
69+
deadLetterQueueName = deadLetterQueueName,
70+
handler = handler,
71+
channel = Channel(queueConfig.channel_capacity),
72+
client = sqsClientFactory.get(queueConfig.region!!),
73+
sqsQueueResolver = sqsQueueResolver,
74+
sqsMetrics = sqsMetrics,
75+
moshi = moshi,
76+
clock = clock,
77+
tracer = tracer,
78+
visibilityTimeoutCalculator = visibilityTimeoutCalculator,
79+
)
8480

8581
scope.launch { subscriber.poll() }
86-
handlingScopes[queueName] = CoroutineScope(Dispatchers.IO.limitedParallelism(queueConfig.parallelism) + SupervisorJob())
82+
handlingScopes[queueName] =
83+
CoroutineScope(Dispatchers.IO.limitedParallelism(queueConfig.parallelism) + SupervisorJob())
8784
repeat(queueConfig.concurrency) {
8885
handlingScopes[queueName]?.launch { subscriber.run() }
8986
}

Diff for: misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/SqsJobEnquer.kt renamed to misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/SqsJobEnqueuer.kt

+11-6
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import io.opentracing.Scope
88
import io.opentracing.Span
99
import io.opentracing.Tracer
1010
import io.opentracing.tag.Tags
11-
import kotlinx.coroutines.future.await
11+
import misk.aws2.sqs.jobqueue.config.SqsConfig
1212
import misk.jobqueue.QueueName
1313
import misk.jobqueue.sqs.parentQueue
1414
import misk.jobqueue.v2.JobEnqueuer
@@ -17,17 +17,20 @@ import misk.tokens.TokenGenerator
1717
import software.amazon.awssdk.services.sqs.SqsAsyncClient
1818
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
1919
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
20+
import java.time.Clock
2021
import java.time.Duration
2122
import java.util.concurrent.CompletableFuture
2223

2324
@Singleton
2425
class SqsJobEnqueuer @Inject constructor(
25-
private val client: SqsAsyncClient,
26-
private val queueResolver: QueueResolver,
26+
private val sqsClientFactory: SqsClientFactory,
27+
private val sqsConfig: SqsConfig,
28+
private val sqsQueueResolver: SqsQueueResolver,
2729
private val tokenGenerator: TokenGenerator,
2830
private val sqsMetrics: SqsMetrics,
2931
private val moshi: Moshi,
3032
private val tracer: Tracer,
33+
private val clock: Clock,
3134
) : JobEnqueuer {
3235
/**
3336
* Enqueue the job and return a CompletableFuture.
@@ -42,7 +45,7 @@ class SqsJobEnqueuer @Inject constructor(
4245
attributes: Map<String, String>,
4346
): CompletableFuture<Boolean> {
4447
return tracer.withSpanAsync("enqueue-job-${queueName.value}") { span, scope ->
45-
val queueUrl = queueResolver.getQueueUrl(queueName)
48+
val queueUrl = sqsQueueResolver.getQueueUrl(queueName)
4649
val resolvedIdempotencyKey = idempotencyKey ?: tokenGenerator.generate()
4750

4851
val attrs = attributes.map {
@@ -58,12 +61,14 @@ class SqsJobEnqueuer @Inject constructor(
5861
.messageAttributes(attrs)
5962
.build()
6063

61-
val timer = sqsMetrics.sqsSendTime.labels(queueName.value).startTimer()
64+
val startTime = clock.millis()
6265
try {
66+
val region = sqsConfig.getQueueConfig(queueName).region!!
67+
val client = sqsClientFactory.get(region)
6368
val response = client.sendMessage(request)
6469
sqsMetrics.jobsEnqueued.labels(queueName.value).inc()
6570
response.whenComplete { _, _ ->
66-
timer.observeDuration()
71+
sqsMetrics.sqsSendTime.labels(queueName.value).observe((clock.millis() - startTime).toDouble())
6772
span.finish()
6873
scope.close()
6974
}.thenCompose { CompletableFuture.supplyAsync { true } }

0 commit comments

Comments
 (0)