Open
Description
What happened?
kafka: 3.9.1
tiered-storage-for-apache-kafka: 1.0.0
When a consumer fetch is retried after a read error, ChunkCache.getChunk throws a nested ExecutionException.
I think this happens because a CompletableFuture that completed exceptionally on the first attempt to fetch a chunk is still added to the cache, so the next request for the same chunk gets the failed CompletableFuture straight from the cache instead of fetching the chunk again.
[2025-10-12 10:30:17,140] ERROR Error occurred while reading the remote data for dspretok_01-0 (kafka.log.remote.RemoteLogReader)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Read timed out
        at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchLogSegment(RemoteStorageManager.java:571)
        at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchLogSegment(RemoteStorageManager.java:532)
        at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.lambda$fetchLogSegment$3(ClassLoaderAwareRemoteStorageManager.java:78)
        at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.withClassLoader(ClassLoaderAwareRemoteStorageManager.java:65)
        at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.fetchLogSegment(ClassLoaderAwareRemoteStorageManager.java:78)
        at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1617)
        at kafka.log.remote.RemoteLogReader.lambda$call$0(RemoteLogReader.java:66)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:66)
        at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:36)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Read timed out
        at io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache.getChunk(ChunkCache.java:125)
        at io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration.getChunkContent(FetchChunkEnumeration.java:142)
        at io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration.nextElement(FetchChunkEnumeration.java:105)
        at io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration.nextElement(FetchChunkEnumeration.java:37)
        at java.base/java.io.SequenceInputStream.peekNextStream(SequenceInputStream.java:101)
        at java.base/java.io.SequenceInputStream.<init>(SequenceInputStream.java:67)
        at io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration$LazySequenceInputStream.<init>(FetchChunkEnumeration.java:167)
        at io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration.toInputStream(FetchChunkEnumeration.java:151)
        at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchLogSegment(RemoteStorageManager.java:560)
        ... 13 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Read timed out
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
        at io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache.getChunk(ChunkCache.java:109)
        ... 21 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Read timed out
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache.lambda$getChunk$0(ChunkCache.java:100)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Read timed out
        ... 10 more
Caused by: java.net.SocketTimeoutException: Read timed out
        at java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:288)
        at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
        at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
        at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
        at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:136)
        at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:195)
        at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
        at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
        at com.baidubce.util.LengthCheckInputStream.read(LengthCheckInputStream.java:89)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
        at java.base/java.io.InputStream.readNBytes(InputStream.java:409)
        at java.base/java.io.InputStream.readAllBytes(InputStream.java:346)
        at io.aiven.kafka.tieredstorage.fetch.cache.DirectMemoryChunkCache.cacheChunk(DirectMemoryChunkCache.java:59)
        at io.aiven.kafka.tieredstorage.fetch.cache.DirectMemoryChunkCache.cacheChunk(DirectMemoryChunkCache.java:17)
        at io.aiven.kafka.tieredstorage.fetch.cache.ChunkCache.lambda$getChunk$0(ChunkCache.java:91)
        ... 7 more
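To illustrate the suspected mechanism, here is a minimal sketch using only the JDK. It is not the actual ChunkCache code: the class `FailedFutureCacheDemo`, its `lookup` and `failureDepth` methods, and the use of a plain `ConcurrentHashMap` are assumptions made for illustration. It shows that when a failed future stays in a map and each later lookup chains onto it with `get()`, the loader is never called again and every retry adds one more `ExecutionException` layer, similar to the nesting in the trace above.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a chunk cache; NOT the real ChunkCache implementation.
public class FailedFutureCacheDemo {
    final ConcurrentHashMap<Integer, CompletableFuture<byte[]>> cache = new ConcurrentHashMap<>();
    final AtomicInteger loads = new AtomicInteger();

    CompletableFuture<byte[]> lookup(int chunkId) {
        return cache.compute(chunkId, (key, existing) -> {
            if (existing == null) {
                // First request: simulate a remote read that always times out.
                loads.incrementAndGet();
                return CompletableFuture.<byte[]>supplyAsync(() -> {
                    throw new CompletionException(new SocketTimeoutException("Read timed out"));
                });
            }
            // Later requests: chain onto whatever is cached, even if it failed.
            return CompletableFuture.<byte[]>supplyAsync(() -> {
                try {
                    return existing.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new CompletionException(e);
                }
            });
        });
    }

    // Counts how many ExecutionException layers wrap the root cause (0 on success).
    int failureDepth(int chunkId) {
        try {
            lookup(chunkId).get();
            return 0;
        } catch (ExecutionException e) {
            int depth = 0;
            for (Throwable t = e; t instanceof ExecutionException; t = t.getCause()) {
                depth++;
            }
            return depth;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FailedFutureCacheDemo demo = new FailedFutureCacheDemo();
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + ": nesting depth " + demo.failureDepth(7));
        }
        System.out.println("loader invocations: " + demo.loads.get());
    }
}
```

In this sketch the nesting depth grows by one per retry while the loader runs only once, which is the behavior I would expect if the failed future is being reused.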
What did you expect to happen?
If a CompletableFuture completes exceptionally, it should not be added to (or kept in) the cache, so that a retry fetches the chunk again.
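One possible way to get this behavior, sketched with the JDK only, is to remove an entry as soon as its future completes exceptionally. The class `EvictOnFailureCache` and its methods are hypothetical names for illustration; the real cache in the plugin may need a different approach.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: a future-valued cache that does not keep failed entries.
public class EvictOnFailureCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();

    public CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        CompletableFuture<V> future = cache.computeIfAbsent(key, loader);
        // Registered outside computeIfAbsent so that an already-failed future
        // does not modify the map from inside its own mapping function.
        future.whenComplete((value, error) -> {
            if (error != null) {
                // Remove only if the map still holds this exact future.
                cache.remove(key, future);
            }
        });
        return future;
    }

    public int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        EvictOnFailureCache<Integer, String> cache = new EvictOnFailureCache<>();
        AtomicInteger calls = new AtomicInteger();
        // The first load fails with a timeout, later loads succeed.
        Function<Integer, CompletableFuture<String>> loader = key ->
            calls.incrementAndGet() == 1
                ? CompletableFuture.failedFuture(new SocketTimeoutException("Read timed out"))
                : CompletableFuture.completedFuture("chunk-" + key);

        System.out.println("first attempt failed: " + cache.get(0, loader).isCompletedExceptionally());
        System.out.println("entries after failure: " + cache.size());
        System.out.println("retry result: " + cache.get(0, loader).join());
        System.out.println("loader invocations: " + calls.get());
    }
}
```

With this approach a retry after a failure calls the loader again instead of receiving the cached failure, and successful results stay cached as before.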
What else do we need to know?