Context
Consider this method
@Scheduled(fixedDelay = 1000)
void job()
{
// some synchronous work
}
The job() method can be counted upon to never be called until the previous run of job() is finished.
Now consider this method
@Scheduled(fixedDelay = 1000)
void job()
{
someAsyncMethodReturningAMono()
.subscribe();
}
Here, job() would be called 1000ms after the previous run of job() returns, not 1000ms after the Mono returned by someAsyncMethodReturningAMono() terminates. If the asynchronous work takes even slightly longer than 1000ms, a run may overlap with the previous one. This can be mitigated by calling .block() instead of .subscribe() on the Mono returned by someAsyncMethodReturningAMono(), effectively making job() synchronous again, but it would be better to have a non-blocking answer to this.
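The overlap can be reproduced without Spring or Reactor. The sketch below is only a stand-in for the situation described above: ScheduledExecutorService.scheduleWithFixedDelay plays the role of @Scheduled(fixedDelay = ...), and a CompletableFuture started on a worker pool plays the role of the subscribed Mono. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FireAndForgetOverlapDemo {

    /**
     * Schedules a fire-and-forget job with a fixed delay and returns the highest
     * number of asynchronous runs that were in flight at the same time.
     */
    static int maxConcurrentRuns(long delayMillis, long workMillis, long observeMillis) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newCachedThreadPool();
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                // Like job() calling subscribe(): start the async work and return at once,
                // so the scheduler starts counting the delay immediately.
                CompletableFuture.runAsync(() -> {
                    maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    sleep(workMillis);
                    inFlight.decrementAndGet();
                }, workers);
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            sleep(observeMillis);
        } finally {
            scheduler.shutdownNow();
            workers.shutdownNow();
        }
        return maxInFlight.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The work (300 ms) outlasts the delay (50 ms), so several runs overlap.
        System.out.println("max concurrent runs: " + maxConcurrentRuns(50, 300, 1000));
    }
}
```

Because the scheduled task returns before the asynchronous work ends, the delay only separates the starts of the runs, not their completions.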
Proposal
Let @Scheduled's fixedDelay property, when the annotated method returns a Mono or a Flux, start the delay count from the moment the returned Mono or Flux terminates instead of from the moment the method returns it, so that one can write this:
@Scheduled(fixedDelay = 1000)
Mono job()
{
return someAsyncMethodReturningAMono();
}
This code would resonate with WebFlux users nicely.
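To make the requested semantics concrete, here is a minimal JDK-only sketch, not a proposed Spring implementation. It assumes a CompletionStage as a stand-in for the returned Mono, and the helper name schedule, the class name and the demo method are all hypothetical. The next run is armed only once the previous stage has completed.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CompletionAwareFixedDelay {

    /**
     * Runs the job, waits for the returned stage to complete (normally or
     * exceptionally), then waits fixedDelay before running the job again.
     * The returned handle stops further runs when invoked.
     */
    static Runnable schedule(ScheduledExecutorService scheduler,
                             Supplier<? extends CompletionStage<?>> job,
                             Duration fixedDelay) {
        AtomicBoolean cancelled = new AtomicBoolean();
        scheduler.execute(() -> runOnce(scheduler, job, fixedDelay, cancelled));
        return () -> cancelled.set(true);
    }

    private static void runOnce(ScheduledExecutorService scheduler,
                                Supplier<? extends CompletionStage<?>> job,
                                Duration fixedDelay,
                                AtomicBoolean cancelled) {
        if (cancelled.get()) {
            return;
        }
        job.get().whenComplete((value, error) -> {
            if (cancelled.get()) {
                return;
            }
            try {
                // The delay starts counting here, after the async work has terminated.
                scheduler.schedule(() -> runOnce(scheduler, job, fixedDelay, cancelled),
                        fixedDelay.toMillis(), TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ignored) {
                // The scheduler was shut down; stop rescheduling.
            }
        });
    }

    /** Demo: returns {max runs in flight, runs started} observed over observeMillis. */
    static int[] demo(long delayMillis, long workMillis, long observeMillis) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger started = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newCachedThreadPool();
        try {
            Runnable cancel = schedule(scheduler, () -> CompletableFuture.runAsync(() -> {
                started.incrementAndGet();
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                sleep(workMillis);
                inFlight.decrementAndGet();
            }, workers), Duration.ofMillis(delayMillis));
            sleep(observeMillis);
            cancel.run();
        } finally {
            scheduler.shutdownNow();
            workers.shutdownNow();
        }
        return new int[] {maxInFlight.get(), started.get()};
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

No scheduler thread is blocked while the work is in flight; the rescheduling happens in the completion callback.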
Comment From: s50600822
It does not make sense to me for someone to write an async method and then try to make it run in a sync manner. Why couple that into the scheduling?
Comment From: DavidDuwaer
Someone has a choice to return a Mono/Flux from the method. Returning it would denote they want Spring to consider its asynchronous completion. The user can opt not to return the mono.
Furthermore I'm only proposing an API, not an implementation. The implementation could either be truly async or cause a thread to block; that is independent of my proposal.
Your argument applies equally to WebFlux asynchronous REST controller methods. Do you also think those do not make sense?
Comment From: simonbasle
Currently investigating this. There are several hurdles to overcome, which I will list below. For reference, here is the code that would emulate fixedDelay and fixedRate in the reactive world (based on Reactor operators).
This assumes that the annotated method returns a Publisher<T> (e.g. a Mono<Void>) and that, unlike in the current arrangement, that particular kind of return value is used by the annotation processor.
fixedDelay
Assuming we resolved a Duration fixedDelay and a Duration initialDelay (which is ZERO if not configured):
Publisher<?> p; // conceptually the publisher returned by the annotated method
Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Mono.delay(this.fixedDelay).then(iterationMono).repeat();
if (!this.initialDelay.isZero()) {
scheduledFlux = Flux.concat(
Mono.delay(this.initialDelay).then(iterationMono),
scheduledFlux
);
}
scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex));
fixedRate
Assuming the same but with a resolved Duration fixedRate:
Publisher<?> p;
Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Flux.interval(this.initialDelay, this.fixedRate).flatMap(it -> iterationMono);
scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex));
Scope of change
I would consider the following out of scope for this change:
* supporting cron
* modifying the ScheduledTaskRegistrar API
I would also challenge the scope of the change with the following questions:
* should return types other than Publisher be considered?
* should return types other than subclasses of Publisher<T> be considered? (see ReactiveAdapterRegistry Caveat below)
* should we consider <Void>-only publishers, i.e. empty ones, or any Publisher<T>? If the latter, the onNext events will be ignored and this caveat should be documented
Caveats
Optional dependencies
- needs Reactor at runtime to implement scheduling
- needs Reactive Streams at runtime
ReactiveAdapterRegistry
Ideally the support would cover not only straight Publisher-returning methods or Mono/Flux-returning methods but also other reactive return types, as long as there is a ReactiveAdapter for them and they support deferred mode (i.e. the return value can be subscribed to repeatedly).
- CompletableFuture is therefore out of scope
- RxJava Maybe, Completable and Single could typically be converted. Note these don't implement Publisher.
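The deferred-mode requirement can be illustrated with the JDK alone. In this small sketch (hypothetical class and method names), a CompletableFuture created once runs its work a single time no matter how often its result is read, whereas a Supplier that builds a new future on demand re-runs the work for every iteration. The latter is the repeatable behaviour a scheduled task needs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DeferredVersusEagerDemo {

    /** Returns {executions behind the shared future, executions behind the deferred supplier}. */
    static int[] countExecutions(int iterations) {
        AtomicInteger eagerRuns = new AtomicInteger();
        AtomicInteger deferredRuns = new AtomicInteger();

        // Eager: the work starts when the future is created, and it runs only once.
        CompletableFuture<Integer> eager = CompletableFuture.supplyAsync(eagerRuns::incrementAndGet);

        // Deferred: nothing runs until get() is called, and each call runs the work again.
        Supplier<CompletableFuture<Integer>> deferred =
                () -> CompletableFuture.supplyAsync(deferredRuns::incrementAndGet);

        for (int i = 0; i < iterations; i++) {
            eager.join();          // reads the same cached result every time
            deferred.get().join(); // triggers a fresh execution
        }
        return new int[] {eagerRuns.get(), deferredRuns.get()};
    }

    public static void main(String[] args) {
        int[] counts = countExecutions(3);
        System.out.println("eager=" + counts[0] + ", deferred=" + counts[1]);
    }
}
```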
Unfortunately, the ReactiveAdapterRegistry doesn't seem to be typically available when the @Scheduled annotation gets processed. It is therefore quite complicated to distinguish methods for which a reactive approach is needed from classic methods, unless they return a type plainly assignable to Publisher.
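The "plainly assignable" check needs no adapter registry, only reflection on the declared return type. The sketch below assumes java.util.concurrent.Flow.Publisher as a JDK stand-in for org.reactivestreams.Publisher so that it runs without extra dependencies; the class, the helper methods and the sample bean are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class ReactiveReturnTypeCheck {

    /** True if the method's declared return type is Flow.Publisher or a subtype of it. */
    static boolean returnsPublisher(Method method) {
        return Flow.Publisher.class.isAssignableFrom(method.getReturnType());
    }

    /** Convenience lookup by name for no-arg methods; wraps the checked reflection exception. */
    static boolean returnsPublisher(Class<?> type, String methodName) {
        try {
            return returnsPublisher(type.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Hypothetical bean mixing classic, publisher-returning and future-returning jobs. */
    static final class SampleJobs {
        void classicJob() { }

        SubmissionPublisher<String> reactiveJob() {
            return new SubmissionPublisher<>();
        }

        // Not a Publisher: detecting this kind of type would need an adapter lookup.
        CompletableFuture<Void> futureJob() {
            return CompletableFuture.completedFuture(null);
        }
    }
}
```

Types that are reactive but not Publisher subtypes fall through this check, which is where an adapter lookup would be required.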
A mitigation could be to add an explicit boolean reactiveSupport opt-in parameter to @Scheduled, but that feels a bit on the nose.
This also plays into the next caveat.
Late scheduling, tracking and cancelling
In the current arrangement, a ScheduledTaskRegistrar is used to both track the repeated tasks and also lazily trigger the initial iteration if the infrastructure isn't ready (no TaskScheduler yet).
The issue is that the registrar API only covers tasks that are TriggerTask or IntervalTask, which it wraps in a ScheduledTask, with a Future attached for cancellation during the destroy phase.
This is not really applicable to reactive types. Using a Future would be wasteful, and it would be preferable to track the subscription's Disposable for cancellation.
Yet, the late scheduling logic could be useful. A mitigation would be to create a TriggerTask with an immediate Trigger which only triggers once. That task would create the Flux<Void> and subscribe to it. It would also track the Disposable independently, to be stored alongside the ScheduledTask in the bean processor for cancellation.
Note that adding this level of indirection might lead to the ReactiveAdapterRegistry being available, which solves the previous caveat at the expense of added complexity and internal boilerplate.
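The separate tracking of cancellation handles could look roughly like the following. This is a JDK-only sketch under stated assumptions, not the bean processor's actual code: a plain Runnable stands in for a Reactor Disposable (it would wrap Disposable::dispose), and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class ReactiveTaskTracker {

    // Cancellation handles per bean; each Runnable stands in for Disposable::dispose.
    private final Map<Object, List<Runnable>> handlesByBean = new ConcurrentHashMap<>();

    /** Remembers a cancellation handle for a subscription created on behalf of the bean. */
    void track(Object bean, Runnable cancelHandle) {
        handlesByBean.computeIfAbsent(bean, key -> new CopyOnWriteArrayList<>()).add(cancelHandle);
    }

    /** Cancels and forgets every subscription of the bean; returns how many were cancelled. */
    int cancelAll(Object bean) {
        List<Runnable> handles = handlesByBean.remove(bean);
        if (handles == null) {
            return 0;
        }
        handles.forEach(Runnable::run);
        return handles.size();
    }
}
```

Calling cancelAll during the destroy phase would mirror what the Future-based cancellation does for classic tasks.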
Comment From: DavidDuwaer
Thanks for looking into it Simon!
Addressing your 3 points on scope
> I would also challenge the scope of the change with the following questions:
> - Should return types other than Publisher be considered? Should return types other than subclasses of Publisher be considered? (see ReactiveAdapterRegistry Caveat below)

Because it would enable releasing this feature earlier, because not supporting types other than Publisher is easy to communicate in the docs, and because people who would expect (i.e. without finding out about it via the docs) @Scheduled to "await" asynchronous methods are likely WebFlux users, I would say the way to go, at least right now, is to only support Publisher and Mono.

> - should we consider <Void>-only publishers, i.e. empty ones, or any Publisher? If the latter, the onNext events will be ignored and this caveat should be documented

To minimize documentation dependence ("why does it not work!") / maximize plug-n-play experience, and to allow better multi-purposing of single methods to minimise code (no need to use a wrapper method that converts to Publisher<Void>), I would allow all return types. The fact that users should be familiar with the distinction between the emission of a Publisher's (one and only) value and its completion, and the danger that they might confuse the two, would not be something specific to this feature but is general to Mono and Flux already.
Comment From: simonbasle
Superseded-by gh-29924 (let's continue the discussion in the PR which already covers a lot of ground, on the road to 6.1.0-M1)