Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: zeromq/libzmq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: b95d94935ed107679fd0ad9efd2f3d47309b6fd3
Choose a base ref
...
head repository: zeromq/libzmq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 90b4f410a07222fa2e9a5f53b454a09d4533e45a
Choose a head ref
  • 10 commits
  • 7 files changed
  • 5 contributors

Commits on Aug 9, 2024

  1. Copy the full SHA
    aa77c03 View commit details
  2. Problem: tests reconnect_stop_after_disconnect failed

    Solution: this test failed because "_disconnected" is not initialized
    in constructor, and the behavior of reconnect will not be as designed
    when "_disconnected" is randomly assigned to true.
    So we specify it as false in initialize list to solve this problem.
    githejie committed Aug 9, 2024
    Copy the full SHA
    ce17349 View commit details

Commits on Aug 11, 2024

  1. Merge pull request #4721 from githejie/reconnect_options

    Fix issue with ZMQ_RECONNECT_STOP_AFTER_DISCONNECT option
    bluca authored Aug 11, 2024
    Copy the full SHA
    6aaafe0 View commit details
  2. Copy the full SHA
    b714341 View commit details

Commits on Aug 16, 2024

  1. Problem: CI build android on ubuntu-latest failed

    Solution: Fixed the NDK version to android-ndk-r25 to
    avoid using unsupported NDK version on ubuntu-latest.
    githejie authored and bluca committed Aug 16, 2024
    Copy the full SHA
    4c6cff6 View commit details

Commits on Aug 21, 2024

  1. Fix CMake-generated libzmq.pc file

    This change mirrors the Autotools-based build system behavior for
    cross-compiling for Windows with static linking.
    hebasto authored and bluca committed Aug 21, 2024
    Copy the full SHA
    5f408ba View commit details

Commits on Aug 23, 2024

  1. Problem: create_ipc_wildcard_address can fail

    check and handle failure
    minrk authored and bluca committed Aug 23, 2024
    Copy the full SHA
    0ed7a08 View commit details

Commits on Aug 26, 2024

  1. Problem: ipc connect can fail on Windows, even after bind (#4734)

    * ipc connect can fail, even after bind
    minrk authored Aug 26, 2024
    Copy the full SHA
    1f4dd54 View commit details

Commits on Sep 6, 2024

  1. Fix format issue of zmq_socket.adoc

    githejie authored and bluca committed Sep 6, 2024
    Copy the full SHA
    64db7d2 View commit details

Commits on Sep 24, 2024

  1. cmake: Refer to prefix variable in generated libzmq.pc

    This change:
    1. Makes the `libzmq.pc` files generated by Autotools and CMake more
    aligned.
    2. Allows the `prefix` variable to be redefined if the package is
    relocated.
    hebasto authored and bluca committed Sep 24, 2024
    Copy the full SHA
    90b4f41 View commit details
Showing with 101 additions and 22 deletions.
  1. +3 −0 .github/workflows/CI.yaml
  2. +9 −3 CMakeLists.txt
  3. +7 −7 doc/zmq_socket.adoc
  4. +4 −0 src/clock.cpp
  5. +33 −11 src/ip.cpp
  6. +2 −1 src/socket_base.cpp
  7. +43 −0 tests/test_reconnect_options.cpp
3 changes: 3 additions & 0 deletions .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ jobs:
POLLER: poll
- os: ubuntu-latest
BUILD_TYPE: android
NDK_VERSION: android-ndk-r25
DRAFT: disabled
- os: ubuntu-latest
BUILD_TYPE: coverage
@@ -148,6 +149,8 @@ jobs:
USE_NSS: ${{ matrix.USE_NSS }}
VMCI: ${{ matrix.VMCI }}
POLLER: ${{ matrix.POLLER }}
NDK_VERSION: ${{ matrix.NDK_VERSION }}
ANDROID_NDK_ROOT: /tmp/${{ matrix.NDK_VERSION }}
steps:
- name: Add msbuild to PATH
uses: microsoft/setup-msbuild@v1.0.2
12 changes: 9 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -567,12 +567,18 @@ if(ZMQ_HAVE_WINDOWS)
# Cannot use check_library_exists because the symbol is always declared as char(*)(void)
set(CMAKE_REQUIRED_LIBRARIES "ws2_32.lib")
check_cxx_symbol_exists(WSAStartup "winsock2.h" HAVE_WS2_32)
if(HAVE_WS2_32)
set(pkg_config_libs_private "${pkg_config_libs_private} -lws2_32")
endif()

set(CMAKE_REQUIRED_LIBRARIES "rpcrt4.lib")
check_cxx_symbol_exists(UuidCreateSequential "rpc.h" HAVE_RPCRT4)

set(CMAKE_REQUIRED_LIBRARIES "iphlpapi.lib")
check_cxx_symbol_exists(GetAdaptersAddresses "winsock2.h;iphlpapi.h" HAVE_IPHLAPI)
if(HAVE_IPHLAPI)
set(pkg_config_libs_private "${pkg_config_libs_private} -liphlpapi")
endif()
check_cxx_symbol_exists(if_nametoindex "iphlpapi.h" HAVE_IF_NAMETOINDEX)

set(CMAKE_REQUIRED_LIBRARIES "")
@@ -1250,9 +1256,9 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/builds/cmake/platform.hpp.in ${CMAKE_
list(APPEND sources ${CMAKE_CURRENT_BINARY_DIR}/platform.hpp)

set(prefix ${CMAKE_INSTALL_PREFIX})
set(exec_prefix ${prefix})
set(libdir ${prefix}/${CMAKE_INSTALL_LIBDIR})
set(includedir ${prefix}/${CMAKE_INSTALL_INCLUDEDIR})
set(exec_prefix "\${prefix}")
set(libdir "\${prefix}/${CMAKE_INSTALL_LIBDIR}")
set(includedir "\${prefix}/${CMAKE_INSTALL_INCLUDEDIR}")
set(VERSION ${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.${ZMQ_VERSION_PATCH})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libzmq.pc.in ${CMAKE_CURRENT_BINARY_DIR}/libzmq.pc @ONLY)
set(zmq-pkgconfig ${CMAKE_CURRENT_BINARY_DIR}/libzmq.pc)
14 changes: 7 additions & 7 deletions doc/zmq_socket.adoc
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ Groups are matched using exact matching (vs prefix matching of PubSub).
NOTE: Radio-dish is still in draft phase.

ZMQ_RADIO
^^^^^^^
^^^^^^^^^
A socket of type 'ZMQ_RADIO' is used by a _publisher_ to distribute data.
Each message belong to a group, a group is specified with xref:zmq_msg_set_group.adoc[zmq_msg_set_group]
Messages are distributed to all members of a group.
@@ -339,15 +339,15 @@ Outgoing routing strategy:: N/A
Action in mute state:: Block

Scatter-gather pattern
~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~
The scatter-gather pattern is the thread-safe version of the pipeline pattern.
The scatter-gather pattern is used for distributing data to _nodes_ arranged in
a pipeline. Data always flows down the pipeline, and each stage of the pipeline
is connected to at least one _node_. When a pipeline stage is connected to
multiple _nodes_ data is round-robined among all connected _nodes_.

ZMQ_SCATTER
^^^^^^^^
^^^^^^^^^^^
A socket of type 'ZMQ_SCATTER' is used by a scatter-gather _node_ to send messages
to downstream scatter-gather _nodes_. Messages are round-robined to all connected
downstream _nodes_. The _zmq_recv()_ function is not implemented for this
@@ -374,7 +374,7 @@ Action in mute state:: Block


ZMQ_GATHER
^^^^^^^^
^^^^^^^^^^
A socket of type 'ZMQ_GATHER' is used by a scatter-gather _node_ to receive messages
from upstream scatter-gather _nodes_. Messages are fair-queued from among all
connected upstream _nodes_. The _zmq_send()_ function is not implemented for
@@ -433,7 +433,7 @@ Action in mute state:: Block


Peer-to-peer pattern
~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~

The peer-to-peer pattern is used to connect a peer to multiple peers.
Peer can both connect and bind and mix both of them with the same socket.
@@ -474,7 +474,7 @@ Incoming routing strategy:: Fair-queued
Action in mute state:: Return EAGAIN

Channel pattern
~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~
The channel pattern is the thread-safe version of the exclusive pair pattern.
The channel pattern is used to connect a peer to precisely one other
peer. This pattern is used for inter-thread communication across the inproc
@@ -483,7 +483,7 @@ transport.
NOTE: Channel is still in draft phase.

ZMQ_CHANNEL
^^^^^^^^
^^^^^^^^^^^
A socket of type 'ZMQ_CHANNEL' can only be connected to a single peer at any one
time. No message routing or filtering is performed on messages sent over a
'ZMQ_CHANNEL' socket.
4 changes: 4 additions & 0 deletions src/clock.cpp
Original file line number Diff line number Diff line change
@@ -223,6 +223,10 @@ uint64_t zmq::clock_t::rdtsc ()
((13 & 15) << 3) | // crm
((0 & 7) << 0)); // op2
return _ReadStatusReg (pmccntr_el0);
#elif (defined(_WIN32) && defined(__GNUC__) && defined(__aarch64__))
uint64_t val;
__asm__ volatile("mrs %0, pmccntr_el0" : "=r"(val));
return val;
#elif (defined __GNUC__ && (defined __i386__ || defined __x86_64__))
uint32_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
44 changes: 33 additions & 11 deletions src/ip.cpp
Original file line number Diff line number Diff line change
@@ -23,6 +23,11 @@
#include "tcp.hpp"
#ifdef ZMQ_HAVE_IPC
#include "ipc_address.hpp"
// Don't try ipc if it fails once
namespace zmq
{
static bool try_ipc_first = true;
}
#endif

#include <direct.h>
@@ -555,17 +560,26 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)

// It appears that a lack of runtime AF_UNIX support
// can fail in more than one way.
// At least: open_socket can fail or later in bind
// At least: open_socket can fail or later in bind or even in connect after bind
bool ipc_fallback_on_tcpip = true;

if (!zmq::try_ipc_first) {
// a past ipc attempt failed, skip straight to try_tcpip in the future;
goto try_tcpip;
}

// Create a listening socket.
const SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (listener == retired_fd) {
// This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
goto try_tcpip;
}

create_ipc_wildcard_address (dirname, filename);
rc = create_ipc_wildcard_address (dirname, filename);
if (rc != 0) {
// This may happen if tmpfile creation fails
goto error_closelistener;
}

// Initialise the address structure.
rc = address.resolve (filename.c_str ());
@@ -581,8 +595,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
goto error_closelistener;
}
// if we got here, ipc should be working,
// so raise any remaining errors
ipc_fallback_on_tcpip = false;
// but there are at least some cases where connect can still fail

// Listen for incoming connections.
rc = listen (listener, 1);
@@ -593,24 +606,28 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)

rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
&lcladdr_len);
wsa_assert (rc != -1);
wsa_assert (rc == 0);

// Create the client socket.
*w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (*w_ == -1) {
if (*w_ == retired_fd) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error_closelistener;
}

// Connect to the remote peer.
rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
lcladdr_len);
if (rc == -1) {
if (rc != 0) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error_closeclient;
}
// if we got here, ipc should be working,
// so raise any remaining errors
ipc_fallback_on_tcpip = false;

*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
wsa_assert (*r_ != retired_fd);

// Close the listener socket, we don't need it anymore.
rc = closesocket (listener);
@@ -632,6 +649,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
saved_errno = errno;
rc = closesocket (*w_);
wsa_assert (rc == 0);
*w_ = retired_fd;
errno = saved_errno;

error_closelistener:
@@ -659,9 +677,13 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)

try_tcpip:
// try to fallback to TCP/IP
// TODO: maybe remember this decision permanently?
#endif

rc = make_fdpair_tcpip (r_, w_);
if (rc == 0 && zmq::try_ipc_first) {
// ipc didn't work but tcp/ip did; skip ipc in the future
zmq::try_ipc_first = false;
}
return rc;
#endif // ZMQ_HAVE_IPC
return make_fdpair_tcpip (r_, w_);
#elif defined ZMQ_HAVE_OPENVMS

3 changes: 2 additions & 1 deletion src/socket_base.cpp
Original file line number Diff line number Diff line change
@@ -228,7 +228,8 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
_monitor_events (0),
_thread_safe (thread_safe_),
_reaper_signaler (NULL),
_monitor_sync ()
_monitor_sync (),
_disconnected (false)
{
options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
43 changes: 43 additions & 0 deletions tests/test_reconnect_options.cpp
Original file line number Diff line number Diff line change
@@ -246,6 +246,46 @@ void reconnect_stop_on_handshake_failed ()
}
#endif

#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_HAVE_IPC)
// test stopping reconnect after disconnect
void reconnect_stop_after_disconnect ()
{
// Setup sub socket
void *sub = test_context_socket (ZMQ_SUB);
// Monitor all events on sub
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL));
// Create socket for collecting monitor events
void *sub_mon = test_context_socket (ZMQ_PAIR);
// Connect so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub"));
// Set option to stop reconnecting after disconnect
int stopReconnectAfterDisconnect = ZMQ_RECONNECT_STOP_AFTER_DISCONNECT;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_RECONNECT_STOP, &stopReconnectAfterDisconnect,
sizeof (stopReconnectAfterDisconnect)));

// Connect to a dummy that cannot be connected
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "ipc://@dummy"));

// Confirm that connect failed and reconnect
expect_monitor_event (sub_mon, ZMQ_EVENT_CLOSED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_RETRIED);

// Disconnect the sub socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sub, "ipc://@dummy"));

// Confirm that connect failed and will not reconnect
expect_monitor_event (sub_mon, ZMQ_EVENT_CLOSED);

// Close sub
test_context_socket_close_zero_linger (sub);

// Close monitor
test_context_socket_close_zero_linger (sub_mon);
}
#endif

void setUp ()
{
setup_test_context ();
@@ -267,6 +307,9 @@ int main (void)
#ifdef ZMQ_BUILD_DRAFT_API
RUN_TEST (reconnect_stop_on_refused);
RUN_TEST (reconnect_stop_on_handshake_failed);
#endif
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_HAVE_IPC)
RUN_TEST (reconnect_stop_after_disconnect);
#endif
return UNITY_END ();
}