-
Notifications
You must be signed in to change notification settings - Fork 649
Description
Who is this feature for?
Engineers needing high-throughput S3 streaming and better resource utilization under load.
What problem are they facing today?
The current AbstractObjectStorage implementation uses java.util.concurrent.Semaphore for IO rate limiting (inflightReadLimiter, inflightWriteLimiter).
The acquire() calls block the calling thread until a permit is available. In an async architecture like AutoMQ, blocking the event loop or worker threads causes thread starvation and limits concurrency. The code currently tracks this debt with a // TODO: async acquire? comment.
Why is solving this impactful?
Migrating to non-blocking acquisition will prevent thread starvation, improve the responsiveness of the S3 stream layer, and allow for higher IO concurrency without blocking worker threads.
Proposed solution
Refactor AbstractObjectStorage to use the existing com.automq.stream.utils.AsyncSemaphore utility.
- Change
inflightReadLimiterandinflightWriteLimiterfields toAsyncSemaphore. - Update methods (
createMultipartUpload,uploadPart,mergedRangeRead, etc.) to use the non-blockingacquire(long permits, Supplier<CompletableFuture> task, Executor executor)pattern. - Remove the now-redundant blocking helper methods
acquireReadPermitandacquireWritePermit.
Additional notes
Target file: s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java