PubSub: do not depend on messages arriving in order to get control #49

Open
nmattia opened this issue Feb 25, 2016 · 21 comments

@nmattia

nmattia commented Feb 25, 2016

Hedis' PubSub interface is really neat, making sure you can never exit without having unsubscribed from all the channels. Unfortunately (unless I missed something) you can never gain control unless a message arrives. One such situation is the following: you subscribe to a channel, exchange with redis, and at some point want to unsubscribe; unfortunately you have to wait until the next message arrives before your code is executed. This is clear from the type signature:

pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback function.
    -> Core.Redis ()

You could prevent your callback from returning, but it's not particularly elegant, and you'd miss all the further incoming messages (really not an option).
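
To make the limitation concrete, here is a minimal sketch of the callback-based interface in use. It assumes a Redis server on localhost with default settings; the channel name `foo` and the `"stop"` payload are made up for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Database.Redis

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  runRedis conn $
    pubSub (subscribe ["foo"]) $ \msg -> do
      -- This callback is the only place where subscriptions can change,
      -- and it only runs when a message actually arrives.
      print (msgMessage msg)
      if msgMessage msg == "stop"       -- hypothetical stop marker
        then return (unsubscribe ["foo"]) -- pubSub returns once nothing is subscribed
        else return mempty                -- keep the current subscriptions
```

If nobody ever publishes on `foo` again, the callback never runs and `pubSub` never returns, which is exactly the problem described here.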

The interface is actually really nice, but I think it could somehow be made more powerful. It's annoying to have to use "bare metal" commands from Core in order to gain more flexibility. I don't have a solution for this, but it's probably worth thinking about. pubSub could look more like runRedis, except that it would lock you in if you are still subscribed to any channel, and would prevent you from running any non-pubsub command.

That is, there could be a PubSub monad (just a newtype wrapper around Redis that only allows subscription/unsubscription) and pubSub would have type

pubSub 
    :: [Channel] -- ^ initial subscriptions
    -> PubSub a -- ^ action to run
    -> Either (PubSub a) (Core.Redis a)

so that in case you are not fully unsubscribed when returning (that is, if the result of the last command did not return a null number of subscriptions), it would return a Left (PubSub a) which you'll have to handle explicitly. This should also help with #28 since all requests can be handled explicitly before pubSub returns.

This is not as nice as the monoid interface, and probably not the best alternative, but I think it would work. Please let me know if I missed something very simple, and please share your thoughts in general. :)

@qrilka
Contributor

qrilka commented Mar 16, 2016

I think here we have 2 (almost opposite) situations mixed together:

  1. Unsubscribing from pubsub without getting a new message
  2. Not reacting to messages after we have unsubscribed in the 1st callback (the problem described in "PubSub can still receive messages after unsubscribe", #28)

I think the 2nd one (and not missing incoming messages) could be resolved by just using an IORef to store the fact that we have unsubscribed, along with those incoming messages which were received after the "unsubscribe" was to be sent.
Regarding the 1st problem, I think at least 2 threads are required, plus something to send the unsubscribe request to the thread with PubSub in it. And this one doesn't look to be doable without getting into the library source code.
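
The IORef idea for the 2nd problem could look roughly like this. It is only a sketch: `GuardState`, `newGuard`, `markUnsubscribed` and `guardCallback` are hypothetical helper names, not hedis functions, and the message type is left polymorphic so the snippet needs nothing beyond base.

```haskell
import Control.Monad (when)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef', modifyIORef')

-- | State shared between the callback and whoever decides to unsubscribe.
data GuardState msg = GuardState
  { unsubscribed :: Bool   -- ^ True once we decided to unsubscribe
  , lateMessages :: [msg]  -- ^ messages seen after that point, newest first
  }

newGuard :: IO (IORef (GuardState msg))
newGuard = newIORef (GuardState False [])

-- | Record that an unsubscribe has been (or is about to be) sent.
markUnsubscribed :: IORef (GuardState msg) -> IO ()
markUnsubscribed ref = modifyIORef' ref (\s -> s { unsubscribed = True })

-- | Wrap a handler: run it before the unsubscribe, only record messages after.
guardCallback :: IORef (GuardState msg) -> (msg -> IO ()) -> msg -> IO ()
guardCallback ref handler msg = do
  run <- atomicModifyIORef' ref $ \s ->
    if unsubscribed s
      then (s { lateMessages = msg : lateMessages s }, False)
      else (s, True)
  when run (handler msg)

main :: IO ()
main = do
  ref <- newGuard
  let callback = guardCallback ref putStrLn
  callback "handled normally"
  markUnsubscribed ref
  callback "arrived late"          -- recorded, not handled
  late <- lateMessages <$> readIORef ref
  print late
```

This only filters what the handler sees; it does nothing for the 1st problem, since the flag is still only consulted when a message arrives.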

@k-bx
Collaborator

k-bx commented Mar 18, 2016

@Nicowcow after thinking about it for a while I realized that I don't understand your request. The current API seems to cover redis semantics in full.

I'm not sure I understood your monad proposal. What would it do that can't be done with current interface? Also, would it just hang at the end if not all subscriptions were unsubscribed from?

@nmattia
Author

nmattia commented Mar 18, 2016

@k-bx I fear more and more that I have been missing something about the PubSub API. My use case is the following:

A publishes on two channels, foo and bar. To start with, B is only subscribed to foo. B receives all the foo messages from A, and everything is fine. After a while, A stops publishing on foo, and publishes only on bar. B would now like to subscribe to bar. How can B do that? What I understood from the current API is that B cannot subscribe to bar (at least not in the same thread) nor unsubscribe from foo (without a thread kill) until a message arrives on foo. The reason is that the pub sub API is purely callback-based, i.e. you can never take any action unless you have received a message. Am I missing something?

@nmattia
Author

nmattia commented Mar 18, 2016

@qrilka do you mean the Hedis source code, or Redis'? I agree that we can't do much without getting into Hedis' source code, as the core modules are not exported. Is it a matter of the core API changing too often?

@informatikr
Owner

@Nicowcow you are correct that Hedis does not allow you to re-take control from pubSub unless a message arrives. This is arguably a bug, and at least a weakness of the API.

The question would be how do we get control? What does the API look like? A simple timeout that unsubscribes all channels should be relatively easy to implement. But I think it would get really messy if we had different timeouts for different channels/patterns. #7 is related to this.

@k-bx
Collaborator

k-bx commented Mar 18, 2016

@Nicowcow from your scenario it's still not clear how you determine when the foo messages have ended, and who is responsible for that. If A knows when there will be no more foo messages, it could send a "no more foo messages" message, and B would unsubscribe from foo in that case. Would this work? The other scenario I see is that you declare that there won't be any more foo messages if you haven't seen any for some period of time. So, what are the semantics exactly? How does one decide that there won't be any more foo messages?

@Yuras
Contributor

Yuras commented Mar 18, 2016

> How does one decide that there won't be any more foo messages?

It could be some external event. If you want a particular example, imagine that we have a cluster managed by e.g. zookeeper. Each node publishes messages to a separate channel. Let's say that initially there is only one node, sending to the "node1" channel, so we open a connection to redis and subscribe to the channel:

$ telnet localhost 6379
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
SUBSCRIBE node1
*3
$9
subscribe
$5
node1
:1

(Note that the redis answer contains the channel I just subscribed to. After receiving the answer I can know for sure that the subscription was successful. This is not possible with hedis: there is no way to know that the answer arrived, see #28, which is not just a documentation bug in my opinion.)

Now another node comes online (channel "node2") and announces itself via zookeeper. Let's subscribe to the channel:

SUBSCRIBE node2
*3
$9
subscribe
$5
node2
:2

Now the first node goes down (e.g. crashes, or is just removed from the cluster), so zookeeper notifies us about that and we want to unsubscribe from its channel:

UNSUBSCRIBE node1
*3
$11
unsubscribe
$5
node1
:1

(Again, once I have the answer, I know that unsubscribing was successful and that no more messages from that channel will come. This is not possible with hedis, see #28.)

Unfortunately the same subscribe/unsubscribe sequence is impossible with hedis because the API is too restrictive. As mentioned in #60, redis pub/sub is asynchronous, but the issue is that the hedis API is not! I can't subscribe at an arbitrary point in time!

Yes, there are a number of possible workarounds. E.g. I can use a separate connection per channel, or I can send a wake-up message to myself to get control. But they are ugly.
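
For reference, the "wake-up message" workaround could be sketched as below. This is only an illustration, assuming a local Redis server; `controlChannel`, `requestChange` and the 5 second delay standing in for the zookeeper event are all hypothetical. Pending subscription changes are parked in an IORef, and a message published on a private control channel makes the callback run so it can return them.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (unless, void)
import Data.ByteString (ByteString)
import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Database.Redis

-- Hypothetical private channel used only to wake up our own callback.
controlChannel :: ByteString
controlChannel = "control:wakeup"

-- Queue a subscription change, then poke the subscriber so it gets applied.
requestChange :: Connection -> IORef PubSub -> PubSub -> IO ()
requestChange conn pending change = do
  atomicModifyIORef' pending (\p -> (p <> change, ()))
  -- runRedis takes another connection from the pool for the PUBLISH
  void $ runRedis conn (publish controlChannel "wake")

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  pending <- newIORef mempty
  -- Stand-in for an external event arriving after 5 seconds.
  _ <- forkIO $ do
    threadDelay 5000000
    requestChange conn pending (subscribe ["node2"] <> unsubscribe ["node1"])
  -- Never returns here, because the control channel stays subscribed.
  runRedis conn $
    pubSub (subscribe [controlChannel, "node1"]) $ \msg -> do
      unless (msgChannel msg == controlChannel) $
        print (msgChannel msg, msgMessage msg)
      -- Drain whatever changes were requested since the last message.
      atomicModifyIORef' pending (\p -> (mempty, p))
```

It works, but it costs an extra channel and a round trip through redis just to hand control back to the client, and it still gives no acknowledgement that a change was applied (see #28).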

Possible API change could be to expose an IO-based pub/sub handle for sending subscribe and unsubscribe commands:

pubsub connection $ \h -> do
  forkIO $ forever $ do
    msg <- readFrom h
    ...
  subscribe h "channel1"      -- waits for answer from redis
  unsubscribe h "channel2"  -- the same

@informatikr
Owner

First of all: Is there a Redis client in another language that has a really nice pubSub API that we might want to copy?

@Yuras SUBSCRIBE is a "special" redis command: It changes the "mode" of the connection and subsequently only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT are allowed on the connection. After all channels are unsubscribed, the connection returns to "normal mode". So while your proposal would work in principle, it would now be possible for the user to put "forbidden" (error-producing) commands between SUBSCRIBE/UNSUBSCRIBE.

Regarding the Nodes+Zookeeper scenario, wouldn't it be simpler to use PSUBSCRIBE "node*"?

I think that changing subscriptions while "in pubSub mode" is somewhat of a Redis anti-pattern. There are so many pitfalls and intricacies that it might be better to disallow it completely.

Again, I would be interested to see a shining example from another language that gets pubSub right.

@nmattia
Author

nmattia commented Mar 18, 2016

Thanks @Yuras for the redis comparison. There are indeed workarounds, though nothing feels quite right. On top of that, if I'm correct, all those workarounds make you lose the safety provided by the PubSub monoid; that is, you could end up with a bad count of requests, and never exit even though you have actually unsubscribed from everything.

@nmattia
Author

nmattia commented Mar 18, 2016

@informatikr thanks for the input, and for taking the time to think about it. I had never considered changing subscriptions when in pubsub mode as an antipattern. Then, not allowing a change of subscription would be more of a feature than a bug, though we are still prevented from publishing until a new message arrives.

@k-bx
Collaborator

k-bx commented Mar 18, 2016

@Nicowcow thanks for the problem explanation. I see now that indeed, we would need to make a two-way socket connection somehow in order to achieve "full power" which you'd like to use in this scenario.

One thing to try out would be to add a new async exception, which would pause msg-awaiting and would send subscribe/unsubscribe messages. I'm not sure it would work correctly, but it's worth a try.

@informatikr
Owner

@Nicowcow "Anti-pattern" might be too strong a word; it is just my impression, and not even backed by using it in practice. After all, it is possible and allowed in Redis. But a combination of

  • SUBSCRIBEing to a single channel or PSUBSCRIBEing to a single pattern and
  • using multiple connections for multiple subscriptions when patterns are not enough

also gets the job done and is, at least in my opinion, clearer.

Of course, the issue remains that we can not gain control over subscriptions until a message arrives (without ugly hacks).

@Yuras
Contributor

Yuras commented Mar 18, 2016

@informatikr

> Is there a Redis client in another language that has a really nice pubSub API that we might want to copy?

Python? https://pypi.python.org/pypi/redis
It has a special pubsub handler:

import redis

# connect to redis
r = redis.StrictRedis("localhost")

# create pubsub handler
h = r.pubsub()

# subscribe
h.subscribe("channel1")

# read message
msg = h.get_message()

# publish, note that it is done with r, not via h
r.publish("channel1", "hello")

# unsubscribe
h.unsubscribe("channel1")

I never used python, but from documentation I see that it supports all the necessary functionality.

> SUBSCRIBE is a "special" redis command

In my example h should allow only (p)(un)subscribe and readFrom.

> Regarding the Nodes+Zookeeper scenario, wouldn't it be simpler to use PSUBSCRIBE "node*"?

It is just another workaround. There might not be any naming scheme. Or we just don't want to subscribe to all nodes. Or whatever.

@Yuras
Contributor

Yuras commented Mar 18, 2016

Also, python redis library supports reading subscribe/unsubscribe confirmation:

>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-second-channel', 'data': 1L}

@k-bx
Collaborator

k-bx commented Mar 18, 2016

@Yuras it's not clear to me how python code does what we want. Wouldn't h.get_message() be stuck until a message arrives, just like Haskell's implementation? (I think yes.) What @Nicowcow wants is to be able, while waiting for a new message, to also send an unsubscribe command based on an external event.

@Yuras
Contributor

Yuras commented Mar 18, 2016

@k-bx

> it's not clear to me how python code does what we want.

You should probably read the docs I linked to:

> There are three different strategies for reading messages.
>
> The examples above have been using pubsub.get_message(). Behind the scenes, get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket. If there’s data available to be read, get_message() will read it, format the message and return it or pass it to a message handler. If there’s no data to be read, get_message() will immediately return None. This makes it trivial to integrate into an existing event loop inside your application.
>
> Older versions of redis-py only read messages with pubsub.listen(). listen() is a generator that blocks until a message is available. If your application doesn’t need to do anything else but receive and act on messages received from redis, listen() is an easy way to get up an running.
>
> The third option runs an event loop in a separate thread. pubsub.run_in_thread() creates a new thread and starts the event loop. The thread object is returned to the caller of run_in_thread(). The caller can use the thread.stop() method to shut down the event loop and thread. Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread, essentially creating a tiny non-blocking event loop for you. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with the value in each iteration of the loop.

In the example I provided I simply fork a worker thread to read messages:

pubsub connection $ \h -> do
  forkIO $ forever $ do
    msg <- readFrom h
    ...
  subscribe h "channel1"      -- waits for answer from redis
  unsubscribe h "channel2"  -- the same

> What @Nicowcow wants is to be able, while waiting for a new message, to also send an unsubscribe command based on an external event.

Yes, I understand that. And that is exactly what I want too.

@qrilka
Contributor

qrilka commented Mar 18, 2016

@Yuras reading the python client source code doesn't show that it could resolve #28 somehow; what it does is just send commands (see https://github.com/andymccurdy/redis-py/blob/master/redis/connection.py#L537) without doing any response reads.
The only difference would be that you would need to manually check subscription/unsubscription acknowledgements, but since hedis does not provide you with any guarantee, that "raw" interface could probably be even better than fiddling with something like IORefs.

@Yuras
Contributor

Yuras commented Mar 19, 2016

@qrilka It is not necessary to copy the python API exactly. As I mentioned a number of times, (p)(un)subscribe should block waiting for the acknowledgement. Unlike in python, we have lightweight threads, so if someone wants to unsubscribe asynchronously, they can fork a thread. Also, the IORef workaround doesn't work with subscribing.

@nmattia
Author

nmattia commented Mar 19, 2016

Indeed what I want is to be able to unsubscribe based on an external event, but I realized above that publishing based on an external event is probably what even more people would like. Hedis is pretty conservative in what modules it exposes, but maybe creating a simple API that wraps the raw pub sub redis calls would be enough to start with, and give people more freedom (it'd still be in the Redis monad, so the users would have to make sure they don't break anything).

From there you'd just have a few internal functions like subscribe :: [Channel] -> Redis (), publish :: Channel -> String -> Redis Int, etc. The current PubSub module could make use of that too, but exposing the new module would allow people to trade some safety for better control. @informatikr what do you think of such an intermediate solution? Is there a reason why Hedis doesn't otherwise expose its inner modules?

@k0001

k0001 commented Mar 21, 2016

A while ago I made some changes to it to address this concern. I haven't tested it extensively though: k0001@4b64450?w=1

wuzzeb added a commit to wuzzeb/hedis that referenced this issue Jul 30, 2016
To implement issue informatikr#49, add a new multithreaded Pub/Sub message processing feature.
The main benefit over the existing Pub/Sub code (which is left unchanged) is:

- you can make subscription changes at any time
- you can safely recover from networking errors such as the redis server dying
- you can detect when Redis has actually processed a subscription request and
  handlers will now start receiving messages.
@wuzzeb
Contributor

wuzzeb commented Jul 30, 2016

I implemented this feature in commit #77. I went through several designs. I initially tried an approach like @nmattia suggested with just a few internal functions, but in order to work properly with multiple threads you essentially need raw access to the underlying socket, which should not be exposed as part of the public API. So the design I settled on is one thread for reading, one thread for writing, and a controller that allows you to interact with the reading/writing threads.

I also wanted to properly handle network disconnects and resubscribing to channels after a disconnect. It made sense to make that directly part of the threads since the underlying implementation is tracking channels anyway.
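
For readers landing here later, a sketch of how such a controller-based design can be used. It assumes the `newPubSubController`, `pubSubForever`, `addChannelsAndWait` and `removeChannelsAndWait` functions as exported by `Database.Redis` in later hedis releases; the signatures may differ from the commit referenced here, so check the version you use. The channel names and the delay standing in for an external event are made up.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (void)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Database.Redis

-- Per-channel handler: prints the payload prefixed with a label.
handler :: ByteString -> ByteString -> IO ()
handler label payload = BC.putStrLn (label <> ": " <> payload)

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  ctrl <- newPubSubController [("node1", handler "node1")] []
  -- Subscription changes can be requested from any thread, at any time.
  _ <- forkIO $ do
    threadDelay 5000000  -- stand-in for an external event
    -- The *AndWait variants block until redis has acknowledged the change.
    void $ addChannelsAndWait ctrl [("node2", handler "node2")] []
    removeChannelsAndWait ctrl ["node1"] []
  -- Runs the reading/writing threads; only returns by throwing an exception.
  pubSubForever conn ctrl (putStrLn "initial subscriptions acknowledged")
```

In a real program `pubSubForever` would typically be wrapped in a retry loop, so that a network error leads to a reconnect and resubscription rather than a crash.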
