Cache store and threads #13181
Replies: 4 comments
-
Unfortunately, without the rest of the code or a TRACE log I cannot say for sure. My guess is that the downstream Publisher is being subscribed to in a way that requests fewer than Integer.MAX_VALUE entries, probably from the preload call at https://github.com/infinispan/infinispan/blob/main/core/src/main/java/org/infinispan/persistence/manager/PreloadManager.java#L98. This means that if the initial subscribe requests fewer objects than that, your upstream Publisher will have to wait until another request happens. Having more entries to preload might make the issue easier to reproduce. Can you confirm what version you are using and whether this is embedded or server mode?

If you always have to be on the same thread for the subscribe call, you will not be able to use BlockingManager#blockingPublisher, as it is entirely possible that a subscribe is done on a different thread each time. Instead you will need to pin the same blocking thread yourself on each invocation (we do not provide a way to do that out of the box). The only other option is to guarantee that all elements are returned from a single subscribe call, but that requires storing all entries in memory at one time, which is most likely very bad for your case.
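To make the bounded-request behavior concrete, here is a minimal sketch in plain RxJava (not Infinispan APIs); the `rebatchRequests(128)` call just stands in for whatever bounded requesting the preload path actually performs:

```java
import io.reactivex.rxjava3.core.Flowable;

public class BoundedRequestDemo {
    public static void main(String[] args) {
        Flowable.range(1, 1_000)
            // Log every request(n) the upstream sees. An unbounded consumer would
            // produce a single request(Long.MAX_VALUE) and the source never waits.
            .doOnRequest(n -> System.out.println("upstream saw request(" + n + ")"))
            // Force bounded batches, standing in for a downstream (like preload)
            // that requests fewer than Integer.MAX_VALUE entries: the upstream
            // emits a batch, then idles until the next request arrives.
            .rebatchRequests(128)
            .blockingSubscribe(entry -> { /* consume the entry */ });
    }
}
```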
-
Hi, first, I am using version 14.0.30-SNAPSHOT and I would say it is embedded. BTW, it is very much based on the RocksDB implementation that ships with Infinispan. I was able to get it again (I even have a production system that hits it regularly) and I captured this info:

But since everything except our code is RxJava code, it's very hard for me to know what launched the executing thread. Does that help figure out where it comes from? I'd like to know if it can be done at this level or if we need to provide some wrapping/delegating to manage this. Thanks
-
Looking at your stack trace I was able to confirm what is going on, and it is related to what I said before. We use RxJava's subscribeOn operator to offload subscribe and request operations to a blocking thread. This is done here: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOn.java#L90 Note how it offloads to a separate Scheduler/Executor.

This works great for requests. However, the cancel call, which the take issues once it hits the maximum request size, is not offloaded to the Scheduler/Executor; it is invoked directly at https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOn.java#L141 It looks like we can work around this by adding a call to https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#unsubscribeOn-io.reactivex.rxjava3.core.Scheduler-

However, this is still just a bandage over the real issue. If you have a connection that must always be invoked on the same thread, you really need to use your own Flowable and pass a Scheduler that only ever runs the subscribe and cancel operations on that single thread. While we reuse the same blocking thread whenever possible, there is no guarantee the same one is used each time, as the BlockingManager works from a pool of blocking threads.
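A minimal sketch of that last suggestion, assuming a hypothetical `scanEntries` source Flowable: back a Scheduler with a single-thread executor and apply it with both `subscribeOn` and `unsubscribeOn`, so that subscribe, request, and cancel all land on the one pinned thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class PinnedStorePublisher {
    // One dedicated thread owns every interaction with the underlying store.
    private final ExecutorService storeThread = Executors.newSingleThreadExecutor();
    private final Scheduler pinned = Schedulers.from(storeThread);

    <E> Flowable<E> pin(Flowable<E> scanEntries) {
        return scanEntries
            .subscribeOn(pinned)    // subscribe and request(n) run on the pinned thread
            .unsubscribeOn(pinned); // cancel runs on the pinned thread as well
    }
}
```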
-
I created #13619 to handle the onCancel for blockingPublisher. |
-
I have implemented a NonBlockingStore for a key-value pair clone of BDB. We've now been using it in production for the last 5-6 months and have lately noticed that sometimes, when our app is starting, an exception occurs and the service restarts (so it's a bit harder to notice). I finally added some tracing logs, and after a week the issue finally occurred.
What I found is that the following code would trigger a critical error at `cursor.close()`. Here is the log where it finally happened:

The first 2 lines show a normal close cursor followed by a close transaction. All the normal ones are on thread `blocking-thread--p3-t2` while the offending one is on thread `non-blocking-thread--p2-t12`.
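For illustration only, here is a hedged sketch of the general shape of such a cursor-backed publisher; the `Txn`/`Cursor` types are hypothetical stand-ins for the BDB-clone API, not the actual store code. With `Flowable.generate`, the dispose callback, and therefore `cursor.close()`, runs on whichever thread cancels the subscription:

```java
import io.reactivex.rxjava3.core.Flowable;

public class CursorPublisher {
    // Hypothetical store API: the thread that opens a cursor must also close it.
    interface Cursor { byte[] next(); void close(); }
    interface Txn { Cursor openCursor(); }

    static Flowable<byte[]> entries(Txn txn) {
        return Flowable.generate(
            txn::openCursor,            // state supplier: runs on the subscribing thread
            (cursor, emitter) -> {      // emits one entry per downstream request
                byte[] entry = cursor.next();
                if (entry == null) emitter.onComplete();
                else emitter.onNext(entry);
            },
            Cursor::close);             // dispose: on cancel this runs on the
                                        // cancelling thread unless unsubscribeOn is used
    }
}
```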
This error has never happened at any time other than startup. My question is: should I expect to receive calls from threads that don't own the transaction? Here we can work in a thread-safe way, but the thread that opens a transaction owns it and must be used for all related operations on that transaction. Is that guarantee expected or not (i.e. is it a bug), or is there a known way to configure our store to avoid this?
Thanks
Alain