Peti Koch opened SPR-16768 and commented
We are a local group of developers and we are studying the "reactive" inter-process communication using WebFlux (within Spring Boot 2.0.1).
One of our examples (https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0, question 5) looks like this:
WebClient.create("http://localhost:8080")
.get()
.uri("/question5/slowflightprice")
.retrieve()
.bodyToFlux(Integer.class)
.timeout(Duration.ofSeconds(1))
.subscribe(
price -> LOGGER.info("Got price: {}", price),
throwable -> LOGGER.warn("{}: {}", throwable.getClass().getSimpleName(), throwable.getMessage()),
() -> System.exit(0)
);
"server":
@GetMapping(value = "/question5/slowflightprice", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Integer> flightprice() {
return Flux.interval(Duration.ofSeconds(10)).take(1)
.doOnSubscribe(subscription -> LOGGER.info("price is requested"))
.flatMap(tick -> Flux.just(250))
.doOnCancel(() -> LOGGER.info("cancelled"))
.doOnNext(price -> LOGGER.info("published price: {}", price));
}
(full code: https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0/tree/master/src/main/java/question5)
We expected that the cancellation due to a timeout is propagated "somehow" to the server, but it doesn't. The "server" continues its work although the client has unsubscribed.
We think this is a typical use case and it is important that cancellations (unsubscribing) are propagated between processes.
PS: In a second example using a stream of elements, the cancellation works fine: https://github.com/ReactiveMeetupLucerne/SpringWebFluxWithSpringBoot2.0/tree/master/src/main/java/question4
The unsubscription is done there using take(1000)
instead of timeout
Affects: 5.0.5
Comment From: deepak-auto
Hello, is there an update on this bug?
Comment From: rstoyanchev
In response to the original description, WebFlux client and server are HTTP based and there is no explicit cancellation as in https://rsocket.io for example. When the HTTP connection is closed, and depending on how it is closed, and also depending on the server, this may be detected and that should result in a cancellation signal, especially if the server keeps writing.
This issue was opened a while ago and a lot has changed. We've fixed mulitple issues with detecting closed connection, so this should work currently with Reactor Netty. If necessary please open a separate issue with a sample to demonstrate.