Currently, RSocketRequester.Builder uses the connect methods as terminal operations that build a Mono<RSocketRequester>. Each of these methods wraps doConnect in an extra Mono.defer, which appears to be a legacy way of deferring the allocation of some heavy internals.

This wrapping now seems redundant, and it prevents the use of a key feature of vanilla RSocket, reconnect, which lets the same Mono<RSocket> instance be reused to access the same cached connection.

Unfortunately, because of the Mono.defer(() -> doConnect(...)) wrapper, the underlying Mono is not propagated directly to the user. As a result, every subsequent subscription creates a new connection instead of reusing the cached one (assuming the reconnect feature is enabled).
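
To make the effect easier to see, here is a minimal plain-Java analogy. It does not use Reactor or RSocket: Supplier stands in for a Mono, and the names cached, defer and connectionsOpened are hypothetical helpers invented for this sketch. It only assumes that a reconnect-enabled connect returns a source that caches its result, and that a deferred wrapper re-runs its factory on every subscription.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferVersusCached {

    // Stand-in for a caching source: resolves the underlying source once,
    // then hands the same value to every later caller.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean resolved;

            @Override
            public synchronized T get() {
                if (!resolved) {
                    value = source.get();
                    resolved = true;
                }
                return value;
            }
        };
    }

    // Stand-in for a deferred wrapper: the factory runs again on every call,
    // so each "subscription" gets a brand new inner source.
    static <T> Supplier<T> defer(Supplier<Supplier<T>> factory) {
        return () -> factory.get().get();
    }

    // Returns how many "connections" are opened after the given number of subscriptions.
    static int connectionsOpened(boolean wrapInDefer, int subscriptions) {
        AtomicInteger opened = new AtomicInteger();
        // Each call to connect builds a new caching source, as a fresh connect call would.
        Supplier<Supplier<Integer>> connect = () -> cached(opened::incrementAndGet);
        Supplier<Integer> requester = wrapInDefer ? defer(connect) : connect.get();
        for (int i = 0; i < subscriptions; i++) {
            requester.get();
        }
        return opened.get();
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + connectionsOpened(false, 3));
        System.out.println("deferred: " + connectionsOpened(true, 3));
    }
}
```

In this sketch the direct variant opens one connection for three subscriptions, while the deferred variant opens three, because the cache lives inside the object that the deferred factory recreates each time.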

Expected

rsocketRequesterBuilder
                .rsocketConnector(rsocketConnector ->
                    rsocketConnector
                            .lease(() -> Leases.create().receiver(leaseReceiver))
                            .reconnect(
                                Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                                     .maxBackoff(Duration.ofSeconds(5))
                            )
                )
                .connectWebSocket(URI.create(adjustmentProperties.getBaseUrl()));

to behave approximately identically to

RSocketConnector
                .create()
                .lease(() -> Leases.create().receiver(leaseReceiver))
                .dataMimeType(dataMimeType.toString())
                .metadataMimeType(metadataMimeType.toString())
                .reconnect(
                        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                             .maxBackoff(Duration.ofSeconds(5))
                )
                .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
                .map(rsocket -> RSocketRequester.wrap(
                    rsocket,
                    dataMimeType,
                    metadataMimeType,
                    rSocketStrategies
                ));

Actual

The connect method wraps

RSocketConnector
                .create()
                .lease(() -> Leases.create().receiver(leaseReceiver))
                .dataMimeType(dataMimeType.toString())
                .metadataMimeType(metadataMimeType.toString())
                .reconnect(
                        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                             .maxBackoff(Duration.ofSeconds(5))
                )
                .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))

into

Mono.defer(() -> RSocketConnector
                .create()
                .lease(() -> Leases.create().receiver(leaseReceiver))
                .dataMimeType(dataMimeType.toString())
                .metadataMimeType(metadataMimeType.toString())
                .reconnect(
                        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                             .maxBackoff(Duration.ofSeconds(5))
                )
                .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket"))))

so reconnect does not work as expected: each subscription re-runs the deferred supplier and creates a new connection.