Right now RSocketRequester.Builder
uses the connect
method as a terminal operation which builds a Mono<RSocketRequester
as of now this method wraps doConnect
with extra Mono.defer
which seems to be a legacy solution to defer of some heavy internals allocation.
As of now, this method seems to be redundant and somewhat stops to use one of the key features of vanilla RSocket
like reconnect
which allows the usage of the same Mono<RSocket>
instance in order to access the same cached connection.
Unfortunately, because of the Mono.defer(() -> doCancel()
the underlying mono is not directly propagated to the user, hence all subsequent subscriptions create new connections instead of the usage of the cached (assumed the reconnect
feature is enabled)
Expected
rsocketRequesterBuilder
.rsocketConnector(rsocketConnector ->
rsocketConnector
.lease(() -> Leases.create().receiver(leaseReceiver))
.reconnect(
Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
)
)
.connectWebSocket(URI.create(adjustmentProperties.getBaseUrl()));
to behave approximately identical to
RSocketConnector
.create()
.lease(() -> Leases.create().receiver(leaseReceiver))
.dataMimeType(dataMimeType.toString())
.metadataMimeType(metadataMimeType.toString())
.reconnect(
Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
)
.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
.map(rsocket -> RSocketRequester.wrap(
rsocket,
dataMimeType,
metadataMimeType,
rSocketStrategies
));
Actual
connect method wraps
RSocketConnector
.create()
.lease(() -> Leases.create().receiver(leaseReceiver))
.dataMimeType(dataMimeType.toString())
.metadataMimeType(metadataMimeType.toString())
.reconnect(
Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
)
.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
into
Mono.defer(() -> RSocketConnector
.create()
.lease(() -> Leases.create().receiver(leaseReceiver))
.dataMimeType(dataMimeType.toString())
.metadataMimeType(metadataMimeType.toString())
.reconnect(
Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
)
.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket"))))
so reconnect
does not work as expected