Context

Consider this method

@Scheduled(fixedDelay = 1000)
void job()
{
    // some syncronous work
}

The job() method can be counted upon to never be called until the previous run of job() is finished.

Now consider this method

@Scheduled(fixedDelay = 1000)
void job()
{
    someAsyncMethodReturningAMono()
       .subscribe()
}

Here, job() would be called 1000ms after the previous run of job() returns, not when the the Mono returned by someAsyncMethodReturningAMono() would terminate. If the asynchronous work takes just over a bit longer than 1000ms, it may run at the same time as previous runs. This can be mitigated by using the .block() statement to subscribe to someAsyncMethodReturningAMono(), effectively making job() syncronous again, but it would be better to have a non-blocking answer to this.

Proposal

To let @Scheduled's fixedDelay property, when the annotated method returns a Mono or a Flux, start the delay count from the moment the returned Mono or Flux terminates instead of when the method returns the Mono, so that one can write this:

@Scheduled(fixedDelay = 1000)
Mono job()
{
    return someAsyncMethodReturningAMono();
}

This code would resonate with WebFlux users nicely.

Comment From: s50600822

It does not make sense to me when someone wrote an async method then try to make it run in sync manner. Why couple that into the scheduling.

Comment From: DavidDuwaer

Someone has a choice to return a Mono/Flux from the method. Returning it would denote they want Spring to consider its asynchronous completion. The user can opt not to return the mono.

Furthermore I'm only proposing an API, not an implementation. The implementation could either be truly async or cause a thread to block; that is independent of my proposal.

Your argument applies the same on WebFlux asynchronous REST controller methods. Do you also think those do not make sense?

Comment From: simonbasle

Currently investigating this. There are several hurdles to overcome which I will below. For reference here is the code that would emulate fixedDelay and fixedRate in the reactive world (based on Reactor operators).

This assumes that the annotated method returns a Publisher<T> (e.g. a Mono<Void>) and that unlike in the current arrangement, that particular kind of return value is used by the annotation processor.

fixedDelay

Assuming we resolved a Duration fixedDelay and a Duration initialDelay (which is ZERO if not configured):

Publisher<?> p; // conceptually the publisher returned by the annotated method

Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Mono.delay(this.fixedDelay).then(iterationMono).repeat();

if (!this.initialDelay.isZero()) {
    scheduledFlux = Flux.concat(
            Mono.delay(this.initialDelay).then(iterationMono),
            scheduledFlux
    );
}

scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex)

fixedRate

Assuming the same but with a resolved Duration fixedRate:

Publisher<?> p;
Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Flux.interval(this.initialDelay, this.fixedRate).flatMap(it -> iterationMono);

scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex)

Scope of change

I would consider the following out of scope for this change: * supporting cron * modifying the ScheduledTaskRegistrar API

I would also challenge the scope of the change with the following questions: * should return types other than Publisher be considered? * should return types other than subclasses of Publisher<T> be considered ? (see ReactiveAdapterRegistry Caveat below) * should we consider <Void> only publishers, i.e. empty ones, or any Publisher<T>? if the later, the onNext events will be ignored and this caveat should be documented

Caveats

Optional dependencies

  • needs Reactor at runtime to implement scheduling
  • needs Reactive Streams at runtime

ReactiveAdapterRegistry

Ideally the support would cover not only straight Publisher-returning methods or Mono/Flux-returning methods but also other reactive return types as long as there is a ReactiveAdapter for them and they support deferred mode (ie. the return value can be subscribed to repeatedly).

  • CompletableFuture is therefore out of scope
  • RxJava Maybe, Completable and Single could typically be converted. note these don't implement Publisher.

Unfortunately, the ReactiveAdapterRegistry doesn't seem to be typically available when the @Scheduled annotation gets processed. It is therefore quite complicated to distinguish methods for which a reactive approach is needed vs classic methods, unless they return a type plainly assignable from Publisher.

A mitigation could be to add an explicit boolean reactiveSupport opt-in parameter to @Scheduled, but that feels a bit on the nose.

This also plays into the next caveat.

Late scheduling, tracking and cancelling

In the current arrangement, a ScheduledTaskRegistrar is used to both track the repeated tasks and also lazily trigger the initial iteration if the infrastructure isn't ready (no TaskScheduler yet).

The issue is that the registrar API only covers tasks that are TriggerTask or IntervalTask, which it wraps in a ScheduledTask, with a Future attached for cancellation during the destroy phase. This is not really applicable for reactive types. Using a Future would be wasteful, and it would be preferable to track subscription's Disposable for cancellation.

Yet, the late scheduling logic could be useful. A mitigation would be to create a TriggerTask with an immediate Trigger which only triggers once. That task would create the Flux<Void> and subscribe to it. It would also track the Disposable independently, to be stored alongside the ScheduledTask in the bean processor for cancellation.

Note that adding this level of indirection might lead to the ReactiveAdapterRegistry being available, which solves the previous caveat at the expense of added complexity and internal boilerplate.

Comment From: DavidDuwaer

Thanks for looking into it Simon!

Addressing your 3 points on scope

I would also challenge the scope of the change with the following questions: - Should return types other than Publisher be considered? Should return types other than subclasses of Publisher be considered ? (see ReactiveAdapterRegistry Caveat below) Because it would enable releasing this feature earlier, because not supporting types other than Publisher is easy to communicate in the docs, and because people who would expect (i.e. without finding out about it via the docs) @Scheduler to "await" asynchronous methods are likely WebFlux users, I would say the way to go, at least right now, is to only support Publisher and Mono. - should we consider only publishers, i.e. empty ones, or any Publisher? if the later, the onNext events will be ignored and this caveat should be documented To minimize documentation dependence ("why does it not work!") / maximize plug-n-play experience, and to allow better multi-purposing of single methods to minimise code (no need to use a wrapper method that converts to Publisher<Void>), I would allow all return types. The fact that users should be familiar with the distinction between the emission of a Publisher's (one and only) value and its completion, and the danger that they might confuse the two, would not be something specific to this feature but is general to Mono and Flux already

Comment From: simonbasle

Superseded-by gh-29924 (let's continue the discussion in the PR which already covers a lot of ground, on the road to 6.1.0-M1)