First message lost between pub/sub sockets #4766

Open
jmspereira opened this issue Jan 7, 2025 · 0 comments

jmspereira commented Jan 7, 2025

Hey everyone,
Sorry for opening another issue on this topic, but my question differs slightly from the existing ones I found.

I have a middleware that uses ZeroMQ as its communication layer, and in some specific cases I need to receive every message. To achieve this, I use the monitor socket and only publish the first message once N connections have been established (N is known in advance for the cases where I need this). However, the first message is still sometimes lost. I isolated the core logic in the following minimal example, which does not reproduce the issue when run locally, only when run across a distributed set of machines:

import zmq
import time
import multiprocessing

from zmq import Flag, PollEvent, Event
from zmq.utils.monitor import recv_monitor_message


def pub_process():
    # Create a context and a PUB socket
    context = zmq.Context()
    pub_socket = context.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5555")

    # Create monitor socket
    monitor_socket = pub_socket.get_monitor_socket()
    print("Waiting for subscriber to connect...")

    # Wait for the subscriber to connect
    while True:
        result = monitor_socket.poll(10)

        if result != PollEvent.POLLIN:
            continue

        event = recv_monitor_message(monitor_socket, Flag.DONTWAIT)

        if event["event"] == Event.ACCEPTED:
            print("Subscriber connected!")
            monitor_socket.close()
            break

    # Now send messages after the subscriber is connected
    for i in range(5):
        message = f"Message {i}"
        print(f"Publishing: {message}")
        pub_socket.send_string(message)
        time.sleep(1)

    # Send a stop message to the subscriber, then close the socket
    pub_socket.send_string("STOP")
    pub_socket.close()
    context.term()
    print("Publisher process finished")


def sub_process():
    # Create a context and a SUB socket
    context = zmq.Context()
    sub_socket = context.socket(zmq.SUB)
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
    sub_socket.connect("tcp://localhost:5555")

    # Poller setup to check for messages
    poller = zmq.Poller()
    poller.register(sub_socket, zmq.POLLIN)
    print("Waiting for messages...")

    # Wait for messages
    while True:
        socks = dict(poller.poll())
        if sub_socket in socks:
            message = sub_socket.recv_string()

            if message == "STOP":
                break

            print(f"Received: {message}")

    # Close the socket and terminate the context
    sub_socket.close()
    context.term()
    print("Subscriber process finished")


def main():
    pub_proc = multiprocessing.Process(target=pub_process, daemon=True)
    sub_proc = multiprocessing.Process(target=sub_process, daemon=True)

    pub_proc.start()
    sub_proc.start()

    pub_proc.join()
    sub_proc.join()


if __name__ == "__main__":
    main()

Issue #4746 suggests using monitor sockets, which I am doing.
Issue #2267 suggests that the problem exists when the PUB connects to the SUB, which I am not doing.
According to the guide (https://zguide.zeromq.org/docs/chapter1/#Getting-the-Message-Out), if I understand it correctly, this can happen when the PUB is already sending messages at the time the SUB connects. That is not the case here, because the PUB only publishes once the connections are established.

So, does the monitor socket guarantee that the SUB is going to receive the message, or do I need to implement logic with another socket to guarantee that the PUB/SUB connection is fully established?
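For context on the second option, here is a minimal sketch of one possible handshake, not a confirmed fix. It assumes the publisher is switched from PUB to XPUB with `XPUB_VERBOSE` enabled, so that every subscribe frame (first byte `0x01`) is delivered to the application, and it assumes each subscriber issues exactly one `SUBSCRIBE`. The helper names `wait_for_subscriptions` and `publish_after_subscribers` are hypothetical. The idea is that the `ACCEPTED` monitor event only reports the TCP accept, while the publisher filters messages by subscription, so a message sent before the subscription arrives can still be dropped.

```python
import time


def wait_for_subscriptions(xpub_socket, expected, timeout_s=10.0):
    """Wait until `expected` subscribe frames arrive on an XPUB-like socket.

    `xpub_socket` only needs `poll(timeout_ms)` and `recv()`. A subscribe
    frame starts with byte 0x01, an unsubscribe frame with 0x00.
    Returns True when enough subscribe frames were seen, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    seen = 0
    while seen < expected:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            return False
        if not xpub_socket.poll(remaining_ms):
            continue
        frame = xpub_socket.recv()
        if frame[:1] == b"\x01":
            seen += 1
    return True


def publish_after_subscribers(endpoint, expected, messages):
    """Hypothetical usage: publish only after `expected` subscriptions."""
    import zmq  # requires pyzmq; imported here so the helper above stays standalone

    context = zmq.Context()
    xpub = context.socket(zmq.XPUB)
    # Without XPUB_VERBOSE, duplicate subscriptions to the same topic are
    # not passed up, so N subscribers to "" would yield a single frame.
    xpub.setsockopt(zmq.XPUB_VERBOSE, 1)
    xpub.bind(endpoint)
    try:
        if not wait_for_subscriptions(xpub, expected):
            raise TimeoutError("not all subscribers subscribed in time")
        for message in messages:
            xpub.send_string(message)
    finally:
        xpub.close()
        context.term()
```

In the example above this would replace the monitor loop, e.g. `publish_after_subscribers("tcp://*:5555", 1, ["Message 0", "STOP"])`. Counting frames says nothing about which peer sent them, so a subscriber that reconnects and resubscribes would be counted twice.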

Best,
Jorge
