Peti Koch opened SPR-16768 and commented

We are a local group of developers studying "reactive" inter-process communication using WebFlux (with Spring Boot 2.0.1).

One of our examples (https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0, question 5) looks like this:

WebClient.create("http://localhost:8080")
        .get()
        .uri("/question5/slowflightprice")
        .retrieve()
        .bodyToFlux(Integer.class)
        .timeout(Duration.ofSeconds(1))
        .subscribe(
                price -> LOGGER.info("Got price: {}", price),
                throwable -> LOGGER.warn("{}: {}", throwable.getClass().getSimpleName(), throwable.getMessage()),
                () -> System.exit(0)
        ); 

"server":

@GetMapping(value = "/question5/slowflightprice", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Integer> flightprice() {
    return Flux.interval(Duration.ofSeconds(10)).take(1)
            .doOnSubscribe(subscription -> LOGGER.info("price is requested"))
            .flatMap(tick -> Flux.just(250))
            .doOnCancel(() -> LOGGER.info("cancelled"))
            .doOnNext(price -> LOGGER.info("published price: {}", price));
}

(full code: https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0/tree/master/src/main/java/question5)

We expected the cancellation caused by the timeout to be propagated "somehow" to the server, but it is not. The "server" continues its work although the client has unsubscribed.

We think this is a typical use case and it is important that cancellations (unsubscribing) are propagated between processes.

PS: In a second example, which uses a stream of elements, the cancellation works fine: https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0/tree/master/src/main/java/question4 There, the unsubscription is triggered by take(1000) instead of timeout.


Affects: 5.0.5

Comment From: deepak-auto

Hello, is there an update on this bug?

Comment From: rstoyanchev

In response to the original description: WebFlux client and server are HTTP based, and there is no explicit cancellation signal as there is in, for example, https://rsocket.io. When the HTTP connection is closed, the server may detect this, depending on how the connection is closed and on the server itself, and that should result in a cancellation signal, especially if the server keeps writing.

This issue was opened a while ago and a lot has changed. We've fixed multiple issues with detecting closed connections, so this should currently work with Reactor Netty. If necessary, please open a separate issue with a sample that demonstrates the problem.
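
For readers wondering why "keeps writing" matters: a rough, JDK-only sketch of the underlying TCP behaviour follows. It does not use WebFlux or Reactor Netty, and the class and method names (ClosedConnectionDemo, writesUntilFailure) are made up for illustration. The "client" connects and closes immediately, much like a client that gave up after a timeout. The "server" side only learns that the peer is gone when one of its writes fails. A server that stays silent for 10 seconds, as in the question5 example, has nothing to trip over during that time.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; plain sockets stand in for the HTTP connection.
class ClosedConnectionDemo {

    /**
     * Returns how many writes were accepted before the server side noticed
     * that the peer had gone away, or maxAttempts if it never noticed.
     */
    static int writesUntilFailure(int maxAttempts, long pauseMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // "Client": connect and close right away, like a client that timed out.
            new Socket(server.getInetAddress(), server.getLocalPort()).close();

            try (Socket connection = server.accept()) {
                OutputStream out = connection.getOutputStream();
                byte[] event = "data: 250\n\n".getBytes(StandardCharsets.US_ASCII);
                for (int i = 0; i < maxAttempts; i++) {
                    try {
                        // Early writes may still succeed; the failure surfaces
                        // only once the peer's reset has reached this side.
                        out.write(event);
                    } catch (IOException e) {
                        return i; // this is where a server could signal "cancel"
                    }
                    Thread.sleep(pauseMillis);
                }
                return maxAttempts;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int writes = writesUntilFailure(50, 100);
        System.out.println("Writes accepted before the failure surfaced: " + writes);
    }
}
```

The exact number of writes that succeed before the failure depends on the operating system and timing, so the sketch only reports it rather than assuming a value. The point is the design consequence: detection is tied to write attempts, which would explain why a continuously emitting stream (question4) is cancelled promptly while a single delayed value (question5) is not.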