  • DestinationCache is now synchronized on multiple 'destination' locks (previously a single shared lock)
  • DestinationCache keeps destinations without any subscriptions (previously such destinations were recomputed over and over)
  • SessionSubscriptionRegistry is now a 'sessionId -> subscriptionId -> (destination,selector)' map for faster lookups (previously 'sessionId -> destination -> set of (subscriptionId,selector)')
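
To illustrate the last point, here is a minimal sketch of a 'sessionId -> subscriptionId -> (destination,selector)' layout. The class and method names (SessionRegistrySketch, Subscription, add, remove) are hypothetical and chosen for illustration only; this is not the code from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; names are illustrative, not Spring's actual classes.
final class SessionRegistrySketch {

    // One subscription: the destination (or pattern) plus an optional selector expression.
    static final class Subscription {
        final String destination;
        final String selector; // may be null when no selector was given

        Subscription(String destination, String selector) {
            this.destination = destination;
            this.selector = selector;
        }
    }

    // sessionId -> subscriptionId -> subscription
    private final Map<String, Map<String, Subscription>> sessions = new ConcurrentHashMap<>();

    void add(String sessionId, String subscriptionId, String destination, String selector) {
        sessions.computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>())
                .put(subscriptionId, new Subscription(destination, selector));
    }

    // Removal by (sessionId, subscriptionId) is two hash lookups, with no scan over destinations.
    Subscription remove(String sessionId, String subscriptionId) {
        Map<String, Subscription> subscriptions = sessions.get(sessionId);
        return (subscriptions != null ? subscriptions.remove(subscriptionId) : null);
    }
}
```

With the previous 'sessionId -> destination -> set' layout, the same removal would presumably have to search the per-destination sets for the subscription id.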

closes gh-24395

Comment From: 7erg

Hey! It's a very good implementation (especially its performance), but I have run into a deadlock :)

private final class DestinationCache {
...
        private final Queue<String> cacheEvictionPolicy = new LinkedList<>();

        public LinkedMultiValueMap<String, String> getSubscriptions(String destination) {
            LinkedMultiValueMap<String, String> subscriptions = this.destinationCache.get(destination);
            if (subscriptions == null) {
                subscriptions = this.destinationCache.computeIfAbsent(destination, dest -> { // catalina-exec-1 locked 0x0000000532af8070
                    LinkedMultiValueMap<String, String> sessionSubscriptions = calculateSubscriptions(destination);
                    synchronized (this.cacheEvictionPolicy) { //catalina-exec-1 waiting for lock 0x0000000506bb19b0
                        this.cacheEvictionPolicy.add(destination);
                    }
                    return sessionSubscriptions;
                });
                ensureCacheLimit();
            }
            return subscriptions;
        }

        private void ensureCacheLimit() {
            synchronized (this.cacheEvictionPolicy) { //catalina-exec-2 locked 0x0000000506bb19b0
                int limit = cacheLimit;
                while (this.cacheEvictionPolicy.size() > limit) {
                    this.destinationCache.remove(this.cacheEvictionPolicy.poll()); // catalina-exec-2 waiting to lock 0x0000000532af8070 in destinationCache.remove(...)
                }
            }
        }
...
}

I think it could be fixed by removing the first synchronized (this.cacheEvictionPolicy) block and using private final Queue<String> cacheEvictionPolicy = new ConcurrentLinkedQueue<>();

Comment From: trim09

Hey! It's very good implementation (especially its performance), but I have run into a deadlock:)

Ah, my apologies. I found the same thing this morning. ConcurrentLinkedQueue is a great lock-free implementation, but have a look at its size() method: it traverses the whole linked list to count the elements :-/ LinkedBlockingQueue has two locks, one for add operations and one for take operations, plus a fast size() operation. I think this will fix it.
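
For readers following along, here is one possible shape of a deadlock-free variant. It is a sketch under assumptions, not the change that was actually pushed to this PR: the class name EvictingCacheSketch, the loader function, and the String value type are all hypothetical. The idea is that the computeIfAbsent lambda never touches the queue, and eviction never holds a queue lock while calling into the map, so the two locks from the thread dump above are never nested.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical sketch, not the code from this PR: a bounded cache with FIFO eviction.
final class EvictingCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // LinkedBlockingQueue keeps an internal element count, so size() is cheap.
    private final Queue<String> evictionOrder = new LinkedBlockingQueue<>();
    private final int limit;
    private final Function<String, String> loader;

    EvictingCacheSketch(int limit, Function<String, String> loader) {
        this.limit = limit;
        this.loader = loader;
    }

    String get(String key) {
        String value = cache.get(key);
        if (value == null) {
            boolean[] computed = {false};
            // The lambda runs under the map's internal bin lock, so it only computes the value.
            value = cache.computeIfAbsent(key, k -> {
                computed[0] = true;
                return loader.apply(k);
            });
            if (computed[0] && value != null) {
                // Queue bookkeeping happens after the bin lock has been released.
                evictionOrder.add(key);
                evictOverflow();
            }
        }
        return value;
    }

    private void evictOverflow() {
        while (evictionOrder.size() > limit) {
            String eldest = evictionOrder.poll(); // the queue lock is released before cache.remove
            if (eldest == null) {
                break; // another thread already drained the overflow
            }
            cache.remove(eldest);
        }
    }

    int size() {
        return cache.size();
    }
}
```

The trade-off in this sketch is that the map and the queue can be briefly out of step under contention, which is acceptable for a cache but would not be for a structure that needs exact accounting.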

Comment From: trim09

Thank you all for the review. I have fixed all findings mentioned above. Please have a look.

Comment From: trim09

I dedicated my laptop (i5-8250U @ 1.6 GHz, RAM @ 2.4 GHz) to perf tests for two days. Because it took ~6 hours to run one set of JMH tests, I would like to share the results with you. I ran the following:

  1. results-newImpl-4-threads.txt - proposed implementation with @Threads(4)
  2. results-newImpl-single-thread.txt - proposed implementation
  3. results-oldfixed-single-thread.txt - old implementation with a cacheLimit(0) fix described below
  4. results-oldFixed-4-threads.txt - old implementation with @Threads(4) and the cacheLimit(0) fix

I had to fix setCacheLimit(0) on the old implementation by swapping these two lines:

this.updateCache.put(destination); // check cache limit and remove eldest if over size limit
this.accessCache.put(destination); // add item into the cache

They were in the wrong order and could not keep the cache empty, which I need for the perf test.

Spring DefaultSubscriptionRegistry: Reduced thread contention

results-oldFixed-4-threads.txt results-oldfixed-single-thread.txt

results-newImpl-single-thread.txt results-newImpl-4-threads.txt

Comment From: rstoyanchev

One thing that would still be useful for the benchmark is a setup category that has both finding and registering/unregistering. Within that category, patterns vs no patterns, where patterns should probably exceed the cache size, since they allow variation in destinations and a greater number of destinations, e.g. with path variables "/some/destination/{id}".

Comment From: trim09

....where patterns should probably exceed the cache size, since they allow variation in destinations and a greater number of destinations, e.g. with path variables "/some/destination/{id}".

@rstoyanchev I think I see your point, but I am not sure I fully understand. I think there are no "pattern destinations", only "pattern subscriptions". Patterns can be passed in only on subscription registration (the client registers on e.g. "/some/destination/*" via addSubscriptionInternal()). There is no way to use patterns from the server side. The server cannot send to "/some/destination/*"; it needs to pick one of the static destinations, e.g. "/some/destination/id12345".

Any (static/pattern) subscription registration updates only destinations that are currently cached. In other words, subscription registration does not add any new cache entries and cannot trigger a cache overflow.

A cache overflow can happen when the server sends (findSubscriptionsInternal() method) to more destinations than the cache size limit allows. This case is already covered by a benchmark, please see the cacheSizeLimt=0 column in the table above.
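
The distinction can be sketched roughly as follows. This is a simplified, single-threaded illustration and not the registry's real code: DestinationCacheSketch, register, find and the trailing-"*" matcher are hypothetical stand-ins (the real registry uses a proper path matcher and also tracks sessions and selectors).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded sketch: only find() creates cache entries.
final class DestinationCacheSketch {

    // subscriptionId -> destination or pattern
    private final Map<String, String> subscriptions = new HashMap<>();
    // destination -> ids of matching subscriptions
    private final Map<String, Set<String>> cache = new HashMap<>();

    // Stand-in for a real path matcher: a trailing "*" matches any suffix.
    static boolean matches(String pattern, String destination) {
        return pattern.endsWith("*")
                ? destination.startsWith(pattern.substring(0, pattern.length() - 1))
                : pattern.equals(destination);
    }

    void register(String subscriptionId, String pattern) {
        subscriptions.put(subscriptionId, pattern);
        // Only destinations that are already cached get updated; no entry is created here.
        cache.forEach((destination, ids) -> {
            if (matches(pattern, destination)) {
                ids.add(subscriptionId);
            }
        });
    }

    Set<String> find(String destination) {
        // A send to a destination that is not cached yet is what grows the cache.
        return cache.computeIfAbsent(destination, d -> {
            Set<String> ids = new HashSet<>();
            subscriptions.forEach((id, pattern) -> {
                if (matches(pattern, d)) {
                    ids.add(id);
                }
            });
            return ids;
        });
    }

    int cachedDestinations() {
        return cache.size();
    }
}
```

In this sketch, registering any number of subscriptions leaves cachedDestinations() unchanged; only calls to find() with new destinations increase it.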

I am happy to implement more benchmarks. Could you please describe the benchmark setup you proposed in more detail so I can get on the same page?

Comment From: rstoyanchev

Sorry, I meant to say where destinations (not patterns) exceed the cache size. I was merely pointing out the current setups demonstrate well individual aspects but it would be useful to have a combined setup (both find and subscribe/unsubscribe) as it happens at runtime. My second thought was that when patterns are used, it's easy to imagine the number of destinations exceeding the cache size and that would also be a good match for such a combined setup. Does that make sense?

By the way I'm already reviewing and polishing locally so please don't submit any further changes. I was just thinking out loud but it can be done separately later.

Comment From: trim09

@rstoyanchev Ok, thanks a lot. I will leave it as is for you without further changes to save you from ugly merges. I left the removal of the this.subscriptionRegistry.addSubscription return value in my local repo; please do it as part of your polishing.

I am sorry for closing and reopening this issue. I unintentionally deleted (and then resurrected) a remote branch in my github repo and it most likely closed this PR. Please force push your changes if needed.

combined setup (both find and subscribe/unsubscribe) as it happens at runtime

I really like your idea of a more realistic performance test. We can do a setup where e.g.:

  • the server is publishing messages on 10000 different destinations
  • the cache size is 1024
  • 1000? client sessions
  • each session has x% static destination subscriptions and (100-x)% pattern subscriptions

Honestly, I am not sure I am able to set up such a test, because I think usages of websocket can differ so much that I cannot cover them all in a test.

E.g. our case on a single server in production is:

  • 2000 sessions
  • we use static subscriptions only (no patterns)
  • ~300 destinations in total. Some have no subscribers. Some (4 destinations) are subscribed to by all sessions.
  • on average ~7 subscriptions per session: the 4 common ones and then e.g. 3 others
  • 4000 delivered msg/s

I can imagine that someone is not using static destinations but patterns only, or is subscribed to many more destinations. But you are right, we can at least try to deliver some kind of setup and do a benchmark. Maybe I will get back to it once you are done with polishing/merging.

destinations (not patterns) exceed the cache size.

I think this kind of benchmark is already there in the form of cache size = 0. In that case no find operation finds a cached entry, so each one performs a recalculation, a store and an eviction.

Comment From: rstoyanchev

@trim09 your changes are now in master. Thanks again for the very detailed contribution!

I did a little polishing, completely functionally neutral. That aside, I also reviewed it with @jhoeller and we spotted one issue. Since the cacheSize is updated before the queue, it can happen that cacheSize > queue.size() when competing threads have updated the size but not the queue yet. Then another thread checking cacheSize can poll more than is available in the queue, getting null and leading to an NPE from ConcurrentHashMap. Switching the order ensures the queue always has at least as many elements as cacheSize, and eventually the two will become consistent as threads catch up. Accordingly, we also now use remove instead of poll, since the absence of an item would be a bug.
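
The ordering argument can be sketched in isolation like this. It is an illustration only, with hypothetical names (CountedQueueSketch, add, evictIfOver), and it does not reproduce the merged code: elements are published to the queue before the counter is incremented, and the counter is decremented before an element is taken, so the counter never exceeds the number of elements in the queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "update the queue first, then the size counter".
final class CountedQueueSketch {

    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    void add(String key) {
        queue.add(key);          // 1. publish the element first
        size.incrementAndGet();  // 2. only then let it count toward the limit
    }

    // Returns the evicted key, or null if the counter is not over the limit.
    String evictIfOver(int limit) {
        int current = size.get();
        while (current > limit) {
            // Claim one element by decrementing the counter before touching the queue.
            if (size.compareAndSet(current, current - 1)) {
                // remove() throws NoSuchElementException on an empty queue,
                // which would signal a broken invariant instead of hiding it as null.
                return queue.remove();
            }
            current = size.get();
        }
        return null;
    }
}
```

Because every successful decrement corresponds to an element that was added before the matching increment, remove() should always find something to take in this sketch.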

One more point that I wanted to raise with you before making changes. In the previous implementations, removal iterated over the cache looking for a match by subscriptionId as a way of finding matching destinations rather than pattern matching destinations. Is there any reason we can't do the same still for subscriptions with a pattern, i.e. this part?

I am sorry for closing and reopening this issue.

No worries.

For the more realistic test, yes, we can work on that next, and it can have a couple of sub-categories with static destinations like your scenario and also with patterns. Here is one example with stock quotes by ticker, which would then be broadcast.

Comment From: trim09

In the previous implementations, removal iterated over the cache looking for a match by subscriptionId as a way of finding matching destinations rather than pattern matching destinations. Is there any reason we can't do the same still for subscriptions with a pattern, i.e. this part?

@rstoyanchev No, there is no reason to do it the new way. The older way looks better and is maybe even faster. Please feel free to revert it to something like this:

String subscriptionId = subscription.getId();
this.destinationCache.forEach((destination, sessionIdToSubscriptionIds) -> {
    List<String> subscriptionIds = sessionIdToSubscriptionIds.get(sessionId);
    if (subscriptionIds != null && subscriptionIds.contains(subscriptionId)) {
        removeInternal(destination, sessionId, subscriptionId);
    }
});

cacheSize is updated before the queue

Great catch! I agree, it was in the wrong order. 👍

use remove instead of poll since the absence of an item would be a bug.

👍

I did a little polishing

It looks great! Thanks.

your changes are now in master. Thanks again for the very detailed contribution!

Cool! I really enjoyed my first contribution here and I am considering a next performance improvement.

Comment From: trim09

I closed this review as it has already been merged into master.

Comment From: rstoyanchev

I've updated the removal logic so we should be all good here.

I really enjoyed my first contribution here and I am considering a next performance improvement.

Thanks again, much appreciated!