This is a follow-up to #22384.

The following exception is observed:

2019-03-14 08:09:37.211 ERROR 13233 --- [r-http-kqueue-7] reactor.core.publisher.Operators         : Operator called default onErrorDropped

io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1441)
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1373)
    at io.netty.buffer.PooledHeapByteBuf.nioBuffer(PooledHeapByteBuf.java:298)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1224)
    at io.netty.buffer.WrappedByteBuf.nioBuffer(WrappedByteBuf.java:919)
    at io.netty.buffer.AdvancedLeakAwareByteBuf.nioBuffer(AdvancedLeakAwareByteBuf.java:713)
    at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:266)
    at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:207)
    at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)
    at org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)

The cause is the following: https://github.com/spring-projects/spring-framework/blob/master/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java#L67

DataBufferUtils.join(InputStream) declares doOnDiscard. When such an event happens, the buffer is released and recycled. When StringDecoder then tries to do the actual decoding, the exception above is observed.

https://github.com/spring-projects/spring-framework/blob/master/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java#L207
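
To make the sequence concrete, here is a minimal sketch of the same use-after-release pattern using only the JDK. RefCountedBuffer and UseAfterReleaseDemo are hypothetical stand-ins, not the Netty or Spring classes; the explicit release() call plays the role of the discard hook, and the exception messages only mimic the ones in the traces.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a pooled, reference-counted buffer.
// This is NOT the Netty or Spring API; it only mimics the refCnt behavior.
final class RefCountedBuffer {

    private final byte[] bytes;
    private int refCnt = 1;

    RefCountedBuffer(String content) {
        this.bytes = content.getBytes(StandardCharsets.UTF_8);
    }

    // Drops one reference; returns true if this call dropped the last one.
    synchronized boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0, decrement: 1");
        }
        refCnt--;
        return refCnt == 0;
    }

    // Reading is only legal while at least one reference is held.
    synchronized String decode() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

final class UseAfterReleaseDemo {

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer("hello");

        // Stands in for the discard hook releasing the buffer.
        buffer.release();

        try {
            // Stands in for the decoder reading the already released buffer.
            buffer.decode();
        }
        catch (IllegalStateException ex) {
            System.out.println("decode failed: " + ex.getMessage());
        }
    }
}
```

In the real case the release and the decode happen in different operators, which is why the failure only shows up when the discard happens to come first.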

Comment From: rstoyanchev

Aside from wrapping the error in something with a more helpful message, I just don't see what else we can do. StringDecoder has to return something after all, and an error signal is the only thing that makes sense to propagate downstream. Such errors have to be expected, e.g. via onErrorResume, if a cancellation signal could come independently while reading. What do you think @violetagg, anything I'm missing?
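
For illustration only, wrapping the error with a more helpful message could look roughly like the sketch below. It uses plain JDK types; BufferReleasedException and DescriptiveDecoder are hypothetical names invented for this example and do not exist in Spring or Netty, and the Supplier stands in for the actual decoding step.

```java
import java.util.function.Supplier;

// Hypothetical exception type for illustration; not part of Spring or Netty.
final class BufferReleasedException extends IllegalStateException {

    BufferReleasedException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class DescriptiveDecoder {

    // Runs the decoding step and rewraps a low-level failure with more context.
    static String decode(Supplier<String> decodingStep) {
        try {
            return decodingStep.get();
        }
        catch (IllegalStateException ex) {
            throw new BufferReleasedException(
                    "Buffer was released before decoding, possibly due to cancellation", ex);
        }
    }

    public static void main(String[] args) {
        try {
            // Simulates a decoder that hits an already released buffer.
            decode(() -> {
                throw new IllegalStateException("refCnt: 0");
            });
        }
        catch (BufferReleasedException ex) {
            System.out.println(ex.getMessage() + " (cause: " + ex.getCause().getMessage() + ")");
        }
    }
}
```

The original exception is kept as the cause, so the low-level refCnt detail is still available to anyone who needs it.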

Comment From: spring-projects-issues

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

Comment From: spring-projects-issues

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

Comment From: rstoyanchev

Experimenting with something like this:

class LeakTests extends AbstractDataBufferAllocatingTests {

    @ParameterizedDataBufferAllocatingTest
    void byteCountsAndPositions(String displayName, DataBufferFactory bufferFactory) {
        super.bufferFactory = bufferFactory;

        try {
            Flux.range(1, 10)
                    .doOnNext(i -> {
                        if (i == 8) {
                            throw new IllegalStateException("boo");
                        }
                    })
                    .map(i -> bufferFactory.allocateBuffer())
                    .collectList()
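                    .doOnDiscard(PooledDataBuffer.class, dataBuffer -> {
                        DataBufferUtils.release(dataBuffer);
                        // Deliberate second release: for Netty buffers the refCnt is already 0,
                        // so this call throws IllegalReferenceCountException.
                        dataBuffer.release();
                    })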
                    .block();
        }
        catch (Exception ex) {
            // Ignore
        }
    }
}

I see that doOnDiscard does continue to iterate over the remaining buffers even after an IllegalReferenceCountException. However, with reactor.core logging at WARN level, you see:

14:44:54.836 [Test worker] WARN  r.c.p.Operators - Error while discarding element from a Collection, continuing with next element
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.buffer.WrappedByteBuf.release(WrappedByteBuf.java:1037) ~[netty-buffer-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.buffer.SimpleLeakAwareByteBuf.release(SimpleLeakAwareByteBuf.java:102) ~[netty-buffer-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.buffer.AdvancedLeakAwareByteBuf.release(AdvancedLeakAwareByteBuf.java:941) ~[netty-buffer-4.1.50.Final.jar:4.1.50.Final]
    at org.springframework.core.io.buffer.NettyDataBuffer.release(NettyDataBuffer.java:320) ~[main/:?]
    at org.springframework.core.io.buffer.LeakTests.lambda$byteCountsAndPositions$2(LeakTests.java:45) ~[test/:?]

So at the very least we can avoid the unnecessary stack trace and print a shorter message.

Comment From: rstoyanchev

On closer look this is a bigger issue than just logging. Aside from causing failures in code that shouldn't fail, in places like this it can also cause other cleanup to not be performed.
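
A rough sketch of the kind of defensive release that keeps the remaining cleanup going, written against the JDK only. CountedResource and SafeRelease are hypothetical stand-ins, not the actual Spring change. Catching IllegalStateException is an assumption of this sketch; it relies on Netty's IllegalReferenceCountException being a subclass of IllegalStateException, so the catch does not need to reference the Netty type directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reference-counted buffer; not the Netty or Spring API.
final class CountedResource {

    private int refCnt = 1;

    synchronized void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0, decrement: 1");
        }
        refCnt--;
    }

    synchronized boolean isReleased() {
        return refCnt == 0;
    }
}

final class SafeRelease {

    // Releases the resource; returns false instead of throwing if it was already released.
    static boolean release(CountedResource resource) {
        try {
            resource.release();
            return true;
        }
        catch (IllegalStateException ex) {
            // Already released elsewhere; nothing left to do for this one.
            return false;
        }
    }

    // Releases every resource and returns how many this call actually released.
    static int releaseAll(List<CountedResource> resources) {
        int released = 0;
        for (CountedResource resource : resources) {
            if (release(resource)) {
                released++;
            }
        }
        return released;
    }

    public static void main(String[] args) {
        List<CountedResource> buffers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            buffers.add(new CountedResource());
        }
        buffers.get(0).release(); // simulate a buffer released by someone else

        System.out.println("released by cleanup: " + releaseAll(buffers)); // prints 2
    }
}
```

With the failure contained in the release helper, one stale buffer no longer prevents the others from being cleaned up.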

Comment From: eleftherias

@rstoyanchev Can 2d8b2fed8bbb9d41e1773992f7175814606386cd be backported to 5.2.x? We are seeing java.lang.ClassNotFoundException: io.netty.util.IllegalReferenceCountException when using 5.2.8.BUILD-SNAPSHOT and I suspect the backport will fix it.