22
22
import io .opentelemetry .api .OpenTelemetry ;
23
23
import io .opentelemetry .api .metrics .ObservableLongCounter ;
24
24
import io .prometheus .client .Gauge ;
25
+ import java .util .ArrayDeque ;
25
26
import java .util .Optional ;
26
27
import java .util .Queue ;
27
28
import java .util .concurrent .ScheduledExecutorService ;
31
32
import org .apache .pulsar .opentelemetry .Constants ;
32
33
import org .apache .pulsar .opentelemetry .OpenTelemetryAttributes .InflightReadLimiterUtilization ;
33
34
import org .apache .pulsar .opentelemetry .annotations .PulsarDeprecatedMetric ;
34
- import org .jctools .queues .SpscArrayQueue ;
35
35
36
36
@ Slf4j
37
37
public class InflightReadsLimiter implements AutoCloseable {
@@ -51,6 +51,7 @@ public class InflightReadsLimiter implements AutoCloseable {
51
51
public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME =
52
52
"pulsar.broker.managed_ledger.inflight.read.usage" ;
53
53
private final ObservableLongCounter inflightReadsUsageCounter ;
54
+ private final int maxReadsInFlightAcquireQueueSize ;
54
55
55
56
@ PulsarDeprecatedMetric (newMetricName = INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME )
56
57
@ Deprecated
@@ -82,9 +83,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui
82
83
this .remainingBytes = maxReadsInFlightSize ;
83
84
this .acquireTimeoutMillis = acquireTimeoutMillis ;
84
85
this .timeOutExecutor = timeOutExecutor ;
86
+ this .maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize ;
85
87
if (maxReadsInFlightSize > 0 ) {
86
88
enabled = true ;
87
- this .queuedHandles = new SpscArrayQueue <>(maxReadsInFlightAcquireQueueSize );
89
+ this .queuedHandles = new ArrayDeque <>();
88
90
} else {
89
91
enabled = false ;
90
92
this .queuedHandles = null ;
@@ -174,13 +176,14 @@ private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Han
174
176
updateMetrics ();
175
177
return Optional .of (new Handle (maxReadsInFlightSize , handle .creationTime , true ));
176
178
} else {
177
- if (queuedHandles .offer (new QueuedHandle (handle , callback ))) {
178
- scheduleTimeOutCheck (acquireTimeoutMillis );
179
- return Optional .empty ();
180
- } else {
179
+ if (queuedHandles .size () >= maxReadsInFlightAcquireQueueSize ) {
181
180
log .warn ("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}" ,
182
181
permits , handle .creationTime , remainingBytes );
183
182
return Optional .of (new Handle (0 , handle .creationTime , false ));
183
+ } else {
184
+ queuedHandles .offer (new QueuedHandle (handle , callback ));
185
+ scheduleTimeOutCheck (acquireTimeoutMillis );
186
+ return Optional .empty ();
184
187
}
185
188
}
186
189
}
0 commit comments