Affects: 5.2.7.RELEASE

I’m seeing a memory leak in a real application, which I tried to reproduce with this sample: https://github.com/EtienneMiret/reactor-netty-oom.

In this sample, given a Flux of integers, I call the method below on each of them and then aggregate the results:

public Mono<Long> get (int i) {
  return webClient.get ()
      .uri ("/{index}", i)
      .exchange ()
      .delayElement (Duration.ofMillis (200))
      .flatMap (response -> response.bodyToMono (ByteArrayResource.class))
      .map (ByteArrayResource::contentLength);
}

When the server behaves properly, everything is fine, but when it aborts the TCP connection, this code starts leaking memory. My guess is that the network error triggers an exception that cancels the Flux, which prevents the response -> response.bodyToMono (ByteArrayResource.class) lambda from being called. The .delayElement () operator is there to make it more likely that a ClientResponse is created but never consumed.
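The suspicion that one failing request takes its siblings down with it matches how a fail-fast merge behaves. The single-threaded toy below contrasts fail-fast merging (how flatMap reacts to an inner error) with delayed errors (what flatMapDelayError does). It is a plain-Java illustration under simplifying assumptions, not Reactor's implementation; MergeModel, run and Status are hypothetical names.

```java
import java.util.Arrays;

/**
 * Toy, single-threaded model of how a merging operator reacts when one of several
 * in-flight requests fails. Not Reactor code; all names here are hypothetical.
 */
final class MergeModel {
    enum Status { OK, FAILED, CANCELLED }

    /**
     * @param completionOrder indices of the in-flight requests, in the order they would terminate
     * @param failing         failing[i] is true if request i ends with an error (e.g. connection reset)
     * @param delayError      false models flatMap (fail fast), true models flatMapDelayError
     * @return the fate of each request, indexed by request number
     */
    static Status[] run(int[] completionOrder, boolean[] failing, boolean delayError) {
        Status[] result = new Status[failing.length];
        boolean errorPropagated = false;
        for (int i : completionOrder) {
            if (errorPropagated) {
                // Fail fast: the error already went downstream and cancelled every sibling still in flight.
                result[i] = Status.CANCELLED;
            } else if (failing[i]) {
                result[i] = Status.FAILED;
                // With delayError the error is held back until all siblings have terminated.
                errorPropagated = !delayError;
            } else {
                result[i] = Status.OK;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] order = {0, 1, 2, 3};
        boolean[] failing = {false, true, false, false};
        System.out.println(Arrays.toString(run(order, failing, false))); // [OK, FAILED, CANCELLED, CANCELLED]
        System.out.println(Arrays.toString(run(order, failing, true)));  // [OK, FAILED, OK, OK]
    }
}
```

In the fail-fast case, any request whose response had already arrived but whose body was not yet read ends up CANCELLED, which is exactly the situation the delayElement() call makes more likely.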

If I understand ClientResponse’s Javadoc and #21801 correctly, my code consumes the response properly, and error cases (such as an I/O error or pipeline cancellation) should be handled by Spring. Is this indeed the case?

After the first connection reset from the server, I get the stack trace below:

reactor.core.publisher.Operators Operator called default onErrorDropped
java.lang.NullPointerException: null
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:124) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Body from GET http://localhost:7890/52 [DefaultClientResponse]
Stack trace:
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:124) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:330) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:353) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:605) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.50.Final.jar:4.1.50.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.50.Final.jar:4.1.50.Final]
        at java.lang.Thread.run(Thread.java:830) [?:?]

Later (presumably when the GC runs), I get a number of these:

io.netty.util.ResourceLeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

See the full error.log.
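The LEAK message above comes from Netty's reference counting: a pooled ByteBuf starts with a reference count of 1 and is only freed when release() brings the count back to 0, and the leak detector reports buffers that get garbage-collected without that happening. The toy class below mimics the refCnt()/retain()/release() contract in plain Java so the failure mode is easy to see. It is a sketch, not Netty's implementation; ToyBuf and LeakDemo are hypothetical names, and the LIVE counter is an assumption standing in for pooled memory.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy reference-counted buffer following the same contract as Netty's ReferenceCounted.
 * Not Netty code: it only tracks how many buffers are still holding "pooled" memory.
 */
final class ToyBuf {
    static final AtomicInteger LIVE = new AtomicInteger(); // allocated but not yet fully released

    private int refCnt = 1; // a fresh buffer starts with one reference, owned by whoever reads it

    ToyBuf() { LIVE.incrementAndGet(); }

    synchronized int refCnt() { return refCnt; }

    /** Adds a reference, e.g. when a buffer is handed to a second consumer. */
    synchronized ToyBuf retain() {
        if (refCnt == 0) throw new IllegalStateException("retain() after release");
        refCnt++;
        return this;
    }

    /** Drops a reference; returns true when the last one is gone and the memory would be freed. */
    synchronized boolean release() {
        if (refCnt == 0) throw new IllegalStateException("release() called too many times");
        if (--refCnt == 0) {
            LIVE.decrementAndGet();
            return true;
        }
        return false;
    }
}

final class LeakDemo {
    public static void main(String[] args) {
        ToyBuf consumed = new ToyBuf();
        ToyBuf dropped = new ToyBuf(); // e.g. a response body that nobody ever subscribed to
        consumed.release();            // the reader released what it read
        // 'dropped' is never released; Netty's detector reports such buffers once they are GC'd.
        System.out.println("live buffers: " + ToyBuf.LIVE.get()); // live buffers: 1
    }
}
```

Nothing here frees memory by itself when an object becomes unreachable: whoever ends up owning a buffer must call release(), which is why an unread response body is a leak rather than just garbage.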

I’m still not sure where the issue is (my code? Spring? Reactor Netty?). Sorry if this turns out not to be a Spring bug.

Comment From: rstoyanchev

Thanks for the reproducer!

The NPE is a Reactor Core issue, for which I've created https://github.com/reactor/reactor-core/issues/2186. I'm still investigating and also need to check the impact of that fix.

Comment From: rstoyanchev

@EtienneMiret I think we've found the issue for this sample. Using a local fix for https://github.com/reactor/reactor-netty/issues/1161 along with https://github.com/reactor/reactor-core/issues/2186, I see no more leak reports with the sample.

Once there is a snapshot available with a fix for the Reactor Netty issue, perhaps you can give it a try? I will keep this issue open until then.

Comment From: EtienneMiret

@rstoyanchev Thank you so much. I’ll have a look at those fixes.

Comment From: rstoyanchev

This is going to be fixed in Spring Framework after all.

What happens is that when batches of 50 requests are sent concurrently with flatMap, some of them fail. A failure closes that request's connection, but it also cancels the other requests in the same flatMap (using flatMapDelayError would let the others continue). When that cancellation happens after the ClientHttpResponse is ready but before the response Flux has been subscribed to, the cancel is effectively lost and the response body is never released. The delayElement(200) after exchange() makes that window much easier to hit.
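The race described here can be modeled in plain Java. The sketch below is a toy, not Spring's actual fix: ResponseHandle and PooledBody are hypothetical names, and "release" stands in for draining or releasing the pooled response body. It shows the one rule that matters: once the response is ready, a cancel that arrives before anyone subscribes to the body has to release it itself, because nobody else ever will.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a pooled response body that must be released exactly once. */
final class PooledBody {
    private final AtomicBoolean released = new AtomicBoolean();

    void release() { released.set(true); }

    boolean isReleased() { return released.get(); }
}

/**
 * Toy model of the window between "response is ready" and "body is subscribed to".
 * Not Spring's code: it only shows why a cancel arriving in that window must release the body.
 */
final class ResponseHandle {
    private enum State { PENDING, READY, CONSUMED, CANCELLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
    private final boolean releaseOnCancel; // false models the lost cancel, true models the fix
    private volatile PooledBody body;

    ResponseHandle(boolean releaseOnCancel) { this.releaseOnCancel = releaseOnCancel; }

    /** The connection delivered the response; its body now holds pooled memory. */
    void onResponseReady(PooledBody b) {
        body = b;
        if (!state.compareAndSet(State.PENDING, State.READY)) {
            b.release(); // cancelled before the response arrived: nobody will ever read it
        }
    }

    /** The application finally subscribes to the body (e.g. via bodyToMono) and reads it. */
    boolean consume() {
        if (state.compareAndSet(State.READY, State.CONSUMED)) {
            body.release(); // reading the body drains and releases it
            return true;
        }
        return false;
    }

    /** Downstream cancellation, e.g. a sibling request failed inside the same flatMap. */
    void cancel() {
        while (true) {
            State s = state.get();
            if (s == State.CONSUMED || s == State.CANCELLED) {
                return; // nothing left to clean up
            }
            if (state.compareAndSet(s, State.CANCELLED)) {
                if (s == State.READY && releaseOnCancel) {
                    body.release(); // the fix: no subscriber will come, so release here
                }
                // With releaseOnCancel == false and s == READY, the body is never released: the leak.
                return;
            }
        }
    }
}
```

Calling onResponseReady(...) and then cancel() on a handle built with false leaves the body unreleased, which corresponds to the LEAK reports; the same sequence with true releases it, while the normal ready-then-consume path behaves identically in both modes.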

Comment From: rstoyanchev

@EtienneMiret when you have a chance, please give the 5.2.8 snapshots a try. You'll also need the reactor-core 3.3.7 snapshots for the NPE fix.

Comment From: EtienneMiret

Hi @rstoyanchev. Thanks for the fix. I can confirm that my sample runs smoothly with reactor-core:3.3.7.BUILD-SNAPSHOT and spring-webflux:5.2.8.BUILD-SNAPSHOT.

My real app still has OOM errors, but since it no longer logs LEAK: ByteBuf.release()..., they must be caused by some other issue.

Comment From: rstoyanchev

Thanks, let us know if you find anything further on our side.