Skip to content

Commit

Permalink
Merge pull request #1822 from CounterpartyXCP/zmq
Browse files Browse the repository at this point in the history
ZMQ Server
  • Loading branch information
ouziel-slama authored May 29, 2024
2 parents 49826b1 + 9b6d43c commit c481d14
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 13 deletions.
23 changes: 22 additions & 1 deletion apiary.apib
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[//]: # (Generated by genapidoc.py on 2024-05-22 18:18:42.799794. Do not edit manually.)
[//]: # (Generated by genapidoc.py on 2024-05-27 11:58:19.844640. Do not edit manually.)
FORMAT: 1A
HOST: https://api.counterparty.io:4000

Expand Down Expand Up @@ -85,6 +85,27 @@ Or to know the events triggered by a given transaction:

`/v2/transactions/<tx_hash>/events`

### ZMQ Publisher

You can enable a ZMQ server by starting `counteparty-server` with the `--enable-zmq-publisher` flag.
All events are published, each in a specific topic. You can subscribe to the events that interest you. For example in Python:

```
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.RCVHWM, 0)
socket.setsockopt_string(zmq.SUBSCRIBE, "CREDIT")
socket.setsockopt_string(zmq.SUBSCRIBE, "DEBIT")
```

You can use an empty string to subscribe to all events.

By default events are published on port `4001`, you can customize this port with the flag `--zmq-publisher-port`.

You can see a complete, working example in Python here: https://github.com/CounterpartyXCP/counterparty-core/blob/master/counterparty-core/tools/zmqclient.py.

### Events Reference

Here is a list of events classified by theme and for each an example response:


Expand Down
11 changes: 11 additions & 0 deletions counterparty-core/counterpartycore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,17 @@ def float_range_checker(arg):
"help": "port on which bitcoind will publish ZMQ notificiations for `rawblock` topic",
},
],
[
("--enable-zmq-publisher",),
{"action": "store_true", "default": False, "help": "Enable ZMQ events publisher"},
],
[
("--zmq-publisher-port",),
{
"type": int,
"help": "port on which Counterparty server will publish ZMQ notificiations for every event",
},
],
]


Expand Down
11 changes: 1 addition & 10 deletions counterparty-core/counterpartycore/lib/api/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
function_needs_db,
get_backend_height,
init_api_access_log,
inject_dispensers,
inject_issuance,
inject_normalized_quantities,
inject_details,
remove_rowids,
to_json,
)
Expand Down Expand Up @@ -199,13 +197,6 @@ def execute_api_function(db, route, function_args):
return result


def inject_details(db, result):
result = inject_dispensers(db, result)
result = inject_issuance(db, result)
result = inject_normalized_quantities(result)
return result


def get_transaction_name(rule):
if rule == "/v2/":
return "APIRoot"
Expand Down
7 changes: 7 additions & 0 deletions counterparty-core/counterpartycore/lib/api/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,13 @@ def inject_dispensers(db, result):
return result


def inject_details(db, result):
result = inject_dispensers(db, result)
result = inject_issuance(db, result)
result = inject_normalized_quantities(result)
return result


def redirect_to_rpc_v1():
"""
Redirect to the RPC API v1.
Expand Down
4 changes: 4 additions & 0 deletions counterparty-core/counterpartycore/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@
DEFAULT_ZMQ_RAWBLOCK_PORT_TESTNET = 19333
DEFAULT_ZMQ_RAWBLOCK_PORT = 9333

DEFAULT_ZMQ_PUBLISHER_PORT_REGTEST = 24001
DEFAULT_ZMQ_PUBLISHER_PORT_TESTNET = 14001
DEFAULT_ZMQ_PUBLISHER_PORT = 4001

UNSPENDABLE_REGTEST = "mvCounterpartyXXXXXXXXXXXXXXW24Hef"
UNSPENDABLE_TESTNET = "mvCounterpartyXXXXXXXXXXXXXXW24Hef"
UNSPENDABLE_MAINNET = "1CounterpartyXXXXXXXXXXXXXXXUWLpVr"
Expand Down
2 changes: 1 addition & 1 deletion counterparty-core/counterpartycore/lib/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def add_to_journal(db, block_index, command, category, event, bindings):

BLOCK_JOURNAL.append(f"{command}{category}{bindings_string}")

log.log_event(block_index, event, items)
log.log_event(db, block_index, message_index, event, items)


def replay_event(db, event, action, table, bindings, id_name=None):
Expand Down
34 changes: 33 additions & 1 deletion counterparty-core/counterpartycore/lib/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from datetime import datetime
from logging.handlers import RotatingFileHandler

import zmq
from colorlog import ColoredFormatter
from dateutil.tz import tzlocal
from termcolor import cprint

from counterpartycore.lib import config, util
from counterpartycore.lib.api.util import inject_details, to_json

logging.TRACE = logging.DEBUG - 5
logging.addLevelName(logging.TRACE, "TRACE")
Expand Down Expand Up @@ -133,15 +135,45 @@ def isodt(epoch_time):
}


def log_event(block_index, event_name, bindings):
def log_event(db, block_index, event_index, event_name, bindings):
if config.JSON_LOG:
logger.info({"event": event_name, "bindings": bindings})
elif event_name in EVENTS:
block_name = "mempool" if util.PARSING_MEMPOOL else block_index
log_message = f"[{block_name}] {EVENTS[event_name]}"
logger.info(log_message, bindings)
# Publish event to ZMQ
if config.ENABLE_ZMQ_PUBLISHER:
zmq_publisher = ZmqPublisher()
zmq_event = {
"event": event_name,
"params": bindings,
"mempool": util.PARSING_MEMPOOL,
"description": EVENTS[event_name] % bindings,
}
if not util.PARSING_MEMPOOL:
zmq_event["block_index"] = block_index
zmq_event["event_index"] = event_index
zmq_publisher.publish_event(db, zmq_event)


def shutdown():
logger.info("Shutting down logging...")
logging.shutdown()


class ZmqPublisher(metaclass=util.SingletonMeta):
def __init__(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind("tcp://*:%s" % config.ZMQ_PUBLISHER_PORT)

def publish_event(self, db, event):
logger.debug("Publishing event: %s", event["event"])
event = inject_details(db, event)
self.socket.send_multipart([event["event"].encode("utf-8"), to_json(event).encode("utf-8")])

def close(self):
if self.socket:
self.socket.close(linger=0)
self.context.term()
14 changes: 14 additions & 0 deletions counterparty-core/counterpartycore/lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,17 @@ def get_value_by_block_index(change_name, block_index=None):
max_block_index = key

return PROTOCOL_CHANGES[change_name][index_name][max_block_index]["value"]


class SingletonMeta(type):
_instances = {}

def __call__(cls, *args, **kwargs):
"""
Possible changes to the value of the `__init__` argument do not affect
the returned instance.
"""
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
34 changes: 34 additions & 0 deletions counterparty-core/counterpartycore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def initialise_config(
no_telemetry=False,
zmq_sequence_port=None,
zmq_rawblock_port=None,
enable_zmq_publisher=False,
zmq_publisher_port=None,
):
# log config alreasdy initialized
logger.debug("VERBOSE: %s", config.VERBOSE)
Expand Down Expand Up @@ -468,6 +470,36 @@ def initialise_config(
"Please specific a valid port number rpc-port configuration parameter"
)

# ZMQ Publisher
config.ENABLE_ZMQ_PUBLISHER = enable_zmq_publisher

if zmq_publisher_port:
config.ZMQ_PUBLISHER_PORT = zmq_publisher_port
else:
if config.TESTNET:
if config.TESTCOIN:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT_TESTNET + 1
else:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT_TESTNET
elif config.REGTEST:
if config.TESTCOIN:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT_REGTEST + 1
else:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT_REGTEST
else:
if config.TESTCOIN:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT + 1
else:
config.ZMQ_PUBLISHER_PORT = config.DEFAULT_ZMQ_PUBLISHER_PORT
try:
config.ZMQ_PUBLISHER_PORT = int(config.ZMQ_PUBLISHER_PORT)
if not (int(config.ZMQ_PUBLISHER_PORT) > 1 and int(config.ZMQ_PUBLISHER_PORT) < 65535):
raise ConfigurationError("invalid ZMQ publisher port number")
except: # noqa: E722
raise ConfigurationError( # noqa: B904
"Please specific a valid port number rpc-port configuration parameter"
)

# Server API user
if api_user:
config.API_USER = api_user
Expand Down Expand Up @@ -621,6 +653,8 @@ def initialise_log_and_config(args):
"no_telemetry": args.no_telemetry,
"zmq_sequence_port": args.zmq_sequence_port,
"zmq_rawblock_port": args.zmq_rawblock_port,
"enable_zmq_publisher": args.enable_zmq_publisher,
"zmq_publisher_port": args.zmq_publisher_port,
}

initialise_log_config(
Expand Down
2 changes: 2 additions & 0 deletions counterparty-core/counterpartycore/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ def api_server_v2(request, cp_server):
"no_telemetry": True,
"zmq_rawblock_port": None,
"zmq_sequence_port": None,
"enable_zmq_publisher": False,
"zmq_publisher_port": None,
}
server_config = (
default_config
Expand Down
21 changes: 21 additions & 0 deletions counterparty-core/tools/apidoc/blueprint-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ Or to know the events triggered by a given transaction:

`/v2/transactions/<tx_hash>/events`

### ZMQ Publisher

You can enable a ZMQ server by starting `counteparty-server` with the `--enable-zmq-publisher` flag.
All events are published, each in a specific topic. You can subscribe to the events that interest you. For example in Python:

```
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.RCVHWM, 0)
socket.setsockopt_string(zmq.SUBSCRIBE, "CREDIT")
socket.setsockopt_string(zmq.SUBSCRIBE, "DEBIT")
```

You can use an empty string to subscribe to all events.

By default events are published on port `4001`, you can customize this port with the flag `--zmq-publisher-port`.

You can see a complete, working example in Python here: https://github.com/CounterpartyXCP/counterparty-core/blob/master/counterparty-core/tools/zmqclient.py.

### Events Reference

Here is a list of events classified by theme and for each an example response:

<EVENTS_DOC>
Expand Down
57 changes: 57 additions & 0 deletions counterparty-core/tools/zmqclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import json
import logging
import signal
import time
import traceback

import zmq
import zmq.asyncio

logger = logging.getLogger(__name__)

ZMQ_PORT = 4001


class CounterpartyWatcher:
def __init__(self):
print("Initializing Counterparty watcher...")
self.loop = asyncio.get_event_loop()
self.connect_to_zmq()

def connect_to_zmq(self):
self.zmq_context = zmq.asyncio.Context()
self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB)
self.zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
# "" => Subscribe to all events
self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.zmq_sub_socket.connect(f"tcp://localhost:{ZMQ_PORT}")

async def handle(self):
try:
event_name, event = await self.zmq_sub_socket.recv_multipart(flags=zmq.NOBLOCK)
event = json.loads(event.decode("utf-8"))
print(event)
except zmq.ZMQError:
time.sleep(1)
except Exception as e:
logger.error(traceback.format_exc())
self.stop()
raise e
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

def start(self):
print("Starting Counterparty watcher...")
self.loop.add_signal_handler(signal.SIGINT, self.stop)
self.loop.create_task(self.handle())
self.loop.run_forever()

def stop(self):
print("Stopping Counterparty watcher...")
self.loop.stop()
self.zmq_context.destroy()


watcher = CounterpartyWatcher()
watcher.start()
1 change: 1 addition & 0 deletions release-notes/release-notes-v10.2.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* All queries that return lists from the database now accept the `cursor`/`offset` and `limit` arguments (see the Pagination paragraph from the API Documentation).
* Document the list of events with an example for each of them.
* The `asset`, `assets`, `give_asset`, and `get_asset` parameters are no longer case-sensitive.
* Publish events on ZMQ Pub/Sub channel (see Documentation)

## Command-Line Interface

Expand Down

0 comments on commit c481d14

Please sign in to comment.