Description
Describe the bug
Non-heap (direct) memory creeps up over time and eventually causes an OutOfDirectMemoryError when writing data to an ADLS Gen2 data lake.
Exception or Stack Trace
Component Runtime Exception: IllegalStateException: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 62914567, max: 64880640) >> Cause: failed to allocate 4194304 byte(s) of direct memory (used: 62914567, max: 64880640)]
java.lang.IllegalStateException: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 62914567, max: 64880640)
    at io.netty.channel.AbstractCoalescingBufferQueue.releaseAndCompleteAll(AbstractCoalescingBufferQueue.java:371) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractCoalescingBufferQueue.releaseAndFailAll(AbstractCoalescingBufferQueue.java:218) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.releaseAndFailAll(SslHandler.java:2007) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1986) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1951) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:804) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:325) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:434) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.channel.MonoSendMany$SendManyInner.onNext(MonoSendMany.java:223) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:201) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onSubscribe(FluxHandle.java:87) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onSubscribe(FluxConcatArray.java:187) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8642) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8642) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:69) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.4.33.jar:3.4.33]
    at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:445) ~[reactor-netty-http-1.0.38.jar:1.0.38]
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:707) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:193) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:454) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at reactor.netty.tcp.SslProvider$SslReadHandler.userEventTriggered(SslProvider.java:856) ~[reactor-netty-core-1.0.38.jar:1.0.38]
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:400) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:376) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:368) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.setHandshakeSuccess(SslHandler.java:1936) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:998) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1507) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1345) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385) ~[netty-handler-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425) ~[netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413) ~[netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_422]
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:141) ~[reactor-core-3.4.33.jar:3.4.33]
        at reactor.core.publisher.Mono.block(Mono.java:1766) ~[reactor-core-3.4.33.jar:3.4.33]
        at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:149) ~[azure-storage-common-12.24.0.jar:12.24.0]
        at com.azure.storage.file.datalake.DataLakeFileClient.uploadWithResponse(DataLakeFileClient.java:432) ~[azure-storage-file-datalake-12.18.0.jar:12.18.0]
To Reproduce
To reproduce quickly, set the heap to a low value such as -Xmx35m, just enough to write files of up to 4 MB without hitting an OOM immediately.
Keep writing files at intervals to see the memory grow.
Code Snippet
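```java
import com.azure.core.util.BinaryData;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.azure.storage.file.datalake.options.DataLakePathCreateOptions;
import io.netty.util.internal.PlatformDependent;

// The class name is arbitrary; account name, key, endpoint, SAS token and file system name are redacted.
public class DirectMemoryRepro {
    public static void main(String[] args) throws InterruptedException {
        StorageSharedKeyCredential storageSharedKeyCredential = new StorageSharedKeyCredential("", "");
        DataLakeServiceClient dataLakeServiceClient = new DataLakeServiceClientBuilder()
            .endpoint("")
            .sasToken("")
            .credential(storageSharedKeyCredential)
            .buildClient();
        dataLakeServiceClient.getFileSystemClient("").createIfNotExists();

        while (true) {
            // Direct figures are used/max bytes as tracked by Netty; heap figures are free/total bytes.
            System.out.println("Direct Memory Start: " + PlatformDependent.usedDirectMemory() + "/" + PlatformDependent.maxDirectMemory());
            System.out.println("Heap Memory Start: " + Runtime.getRuntime().freeMemory() + "/" + Runtime.getRuntime().totalMemory());
            // Upload a batch of ten 4 MB files, overwriting the same ten paths each round.
            for (int i = 0; i < 10; i++) {
                write(dataLakeServiceClient, i);
            }
            System.out.println("Direct Memory End: " + PlatformDependent.usedDirectMemory() + "/" + PlatformDependent.maxDirectMemory());
            System.out.println("Heap Memory End: " + Runtime.getRuntime().freeMemory() + "/" + Runtime.getRuntime().totalMemory());
            Thread.sleep(30000);
        }
    }

    public static void write(DataLakeServiceClient dataLakeServiceClient, int i) {
        byte[] object = new byte[1024 * 1024 * 4]; // 4 MB payload of zero bytes
        // Created but not passed to upload().
        DataLakePathCreateOptions dataLakePathCreateOptions = new DataLakePathCreateOptions();
        dataLakeServiceClient.getFileSystemClient("")
            .getFileClient("root/testingfile" + i)
            .upload(BinaryData.fromBytes(new String(object).getBytes()), true);
    }
}
```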
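To make the growth between rounds easier to read than raw used/max pairs, the counter can be sampled once per round and printed as a delta. The sketch below is only an illustration: `MemoryGrowthTracker` is a hypothetical helper that is not part of the repro or of any library, and the stand-in counter in `main` would be replaced by `PlatformDependent::usedDirectMemory` in the reproduction loop above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical helper: samples a memory counter once per round and reports the change. */
final class MemoryGrowthTracker {
    private final LongSupplier counter;
    private final List<Long> samples = new ArrayList<>();

    MemoryGrowthTracker(LongSupplier counter) {
        this.counter = counter;
    }

    /** Takes one sample and returns the change since the previous sample (0 for the first). */
    long sample() {
        long current = counter.getAsLong();
        long delta = samples.isEmpty() ? 0 : current - samples.get(samples.size() - 1);
        samples.add(current);
        return delta;
    }

    /** Returns the change between the first and the most recent sample. */
    long totalGrowth() {
        if (samples.size() < 2) {
            return 0;
        }
        return samples.get(samples.size() - 1) - samples.get(0);
    }

    public static void main(String[] args) {
        // Stand-in values for demonstration only; in the repro the supplier
        // would be PlatformDependent::usedDirectMemory (assumption).
        long[] fake = {100, 150, 150, 220};
        int[] next = {0};
        MemoryGrowthTracker tracker = new MemoryGrowthTracker(() -> fake[next[0]++]);
        for (int round = 0; round < fake.length; round++) {
            System.out.println("round " + round + " delta=" + tracker.sample());
        }
        System.out.println("total growth=" + tracker.totalGrowth());
    }
}
```

A total that keeps rising over many rounds, rather than levelling off once the buffer pools are warm, is the pattern this issue describes.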
Expected behavior
Total memory use, both direct and heap, should not keep increasing.
The Netty leak detector should report no leaks.
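For anyone re-running the repro with leak detection turned up, a minimal sketch of one way to do it from code follows. It assumes the `io.netty.leakDetection.level` system property is set before any Netty class is loaded (passing `-Dio.netty.leakDetection.level=paranoid` on the command line is the alternative); `LeakDetectionConfig` is a hypothetical name, not part of the repro.

```java
/** Hypothetical helper: raises Netty's leak detection level via its system property. */
final class LeakDetectionConfig {
    // Netty reads this property when its leak detector is first initialized,
    // so it has to be set before the storage client is built.
    static final String LEVEL_PROPERTY = "io.netty.leakDetection.level";

    /** Sets the level to "paranoid" unless one was already supplied, and returns the effective value. */
    static String enableParanoidIfUnset() {
        if (System.getProperty(LEVEL_PROPERTY) == null) {
            System.setProperty(LEVEL_PROPERTY, "paranoid");
        }
        return System.getProperty(LEVEL_PROPERTY);
    }

    public static void main(String[] args) {
        System.out.println(LEVEL_PROPERTY + "=" + enableParanoidIfUnset());
    }
}
```

Calling `enableParanoidIfUnset()` as the first statement of the repro's `main` would keep any level passed with `-D` intact while defaulting to the most thorough setting.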
Setup (please complete the following information):
- OS: Ubuntu 22.04
- IDE: IntelliJ
- Library/Libraries: azure-storage-file-datalake: 12.18.0
- Java version: 8
- Frameworks: Maven