Skip to content

Commit

Permalink
feat: add support for persistent recursive watches
Browse files Browse the repository at this point in the history
ZooKeeper 3.6.0 added support for persistent, and persistent
recursive watches.  This adds the corresponding support to the
Kazoo client class.
  • Loading branch information
jeblair committed Mar 21, 2023
1 parent 33c348b commit 741f246
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 8 deletions.
99 changes: 99 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kazoo.protocol.connection import ConnectionHandler
from kazoo.protocol.paths import _prefix_root, normpath
from kazoo.protocol.serialization import (
AddWatch,
Auth,
CheckVersion,
CloseInstance,
Expand All @@ -38,6 +39,7 @@
SetACL,
GetData,
Reconfig,
RemoveWatches,
SetData,
Sync,
Transaction,
Expand Down Expand Up @@ -248,6 +250,7 @@ def __init__(
self.state_listeners = set()
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._reset()
self.read_only = read_only

Expand Down Expand Up @@ -416,8 +419,12 @@ def _reset_watchers(self):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

for persistent_watchers in self._persistent_watchers.values():
watchers.extend(persistent_watchers)

self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)

ev = WatchedEvent(EventType.NONE, self._state, None)
for watch in watchers:
Expand Down Expand Up @@ -1644,8 +1651,100 @@ def reconfig_async(self, joining, leaving, new_members, from_config):

return async_result

def add_watch(self, path, watch, mode):
"""Add a watch.
This method adds persistent watches. Unlike the data and
child watches which may be set by calls to
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
:meth:`KazooClient.get_children`, persistent watches are not
removed after being triggered.
To remove a persistent watch, use
:meth:`KazooClient.remove_all_watches` with an argument of
:attr:`~kazoo.states.WatcherType.ANY`.
The `mode` argument determines whether or not the watch is
recursive. To set a persistent watch, use
:class:`~kazoo.states.AddWatchMode.PERSISTENT`. To set a
persistent recursive watch, use
:class:`~kazoo.states.AddWatchMode.PERSISTENT_RECURSIVE`.
:param path: Path of node to watch.
:param watch: Watch callback to set for future changes
to this path.
:param mode: The mode to use.
:type mode: int
:raises:
:exc:`~kazoo.exceptions.MarshallingError` if mode is
unknown.
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.add_watch_async(path, watch, mode).get()

def add_watch_async(self, path, watch, mode):
"""Asynchronously add a watch. Takes the same arguments as
:meth:`add_watch`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
if not isinstance(mode, int):
raise TypeError("Invalid type for 'mode' (int expected)")

async_result = self.handler.async_result()
self._call(
AddWatch(_prefix_root(self.chroot, path), watch, mode),
async_result,
)
return async_result

def remove_all_watches(self, path, watcher_type):
"""Remove watches from a path.
This removes all watches of a specified type (data, child,
any) from a given path.
The `watcher_type` argument specifies which type to use. It
may be one of:
* :attr:`~kazoo.states.WatcherType.DATA`
* :attr:`~kazoo.states.WatcherType.CHILD`
* :attr:`~kazoo.states.WatcherType.ANY`
To remove persistent watches, specify a watcher type of
:attr:`~kazoo.states.WatcherType.ANY`.
:param path: Path of watch to remove.
:param watcher_type: The type of watch to remove.
:type watcher_type: int
"""

return self.remove_all_watches_async(path, watcher_type).get()

def remove_all_watches_async(self, path, watcher_type):
"""Asynchronously remove watches. Takes the same arguments as
:meth:`remove_all_watches`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(watcher_type, int):
raise TypeError("Invalid type for 'watcher_type' (int expected)")

async_result = self.handler.async_result()
self._call(
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
async_result,
)
return async_result


class TransactionRequest(object):

"""A Zookeeper Transaction Request
A Transaction provides a builder object that can be used to
Expand Down
5 changes: 5 additions & 0 deletions kazoo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ class NotReadOnlyCallError(ZookeeperError):
a read-only server"""


@_zookeeper_exception(-121)
class NoWatcherError(ZookeeperError):
"""No watcher was found at the supplied path"""


class ConnectionClosedError(SessionExpiredError):
"""Connection is closed"""

Expand Down
42 changes: 35 additions & 7 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
AddWatch,
Auth,
Close,
Connect,
Expand All @@ -28,6 +29,7 @@
GetChildren2,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
SASL,
Transaction,
Expand Down Expand Up @@ -363,6 +365,18 @@ def _write(self, msg, timeout):
raise ConnectionDropped("socket connection broken")
sent += bytes_sent

def _find_persistent_watchers(self, path):
parts = path.split("/")
watchers = []
for count in range(len(parts)):
candidate = "/".join(parts[: count + 1])
if not candidate:
continue
watchers.extend(
self.client._persistent_watchers.get(candidate, [])
)
return watchers

def _read_watch_event(self, buffer, offset):
client = self.client
watch, offset = Watch.deserialize(buffer, offset)
Expand All @@ -374,9 +388,11 @@ def _read_watch_event(self, buffer, offset):

if watch.type in (CREATED_EVENT, CHANGED_EVENT):
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(self._find_persistent_watchers(path))
elif watch.type == DELETED_EVENT:
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._child_watchers.pop(path, []))
watchers.extend(self._find_persistent_watchers(path))
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
Expand Down Expand Up @@ -448,13 +464,25 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
# Determine if watchers should be registered or unregistered
if not client._stopped.is_set():
watcher = getattr(request, "watcher", None)
if watcher:
if isinstance(request, AddWatch):
client._persistent_watchers[request.path].add(watcher)
elif isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, RemoveWatches):
if request.watcher_type == 1:
client._child_watchers.pop(request.path, None)
if request.watcher_type == 2:
client._data_watchers.pop(request.path, None)
if request.watcher_type == 3:
client._child_watchers.pop(request.path, None)
client._data_watchers.pop(request.path, None)
client._persistent_watchers.pop(request.path, None)

if isinstance(request, Close):
self.logger.log(BLATHER, "Read close response")
Expand Down
28 changes: 28 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
return data, stat


class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
type = 18

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.watcher_type))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand All @@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
return challenge, offset


class AddWatch(namedtuple("AddWatch", "path watcher mode")):
type = 106

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.mode))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
41 changes: 41 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,44 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class AddWatchMode(object):
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
.. attribute:: PERSISTENT
The watch is not removed when trigged.
.. attribute:: PERSISTENT_RECURSIVE
The watch is not removed when trigged, and applies to all
paths underneath the supplied path as well.
"""

PERSISTENT = 0
PERSISTENT_RECURSIVE = 1


class WatcherType(object):
"""Watcher types for use with
:meth:`~kazoo.client.KazooClient.remove_all_watches`
.. attribute:: CHILDREN
Child watches.
.. attribute:: DATA
Data watches.
.. attribute:: ANY
Any type of watch (child, data, persistent, or persistent
recursive).
"""

CHILDREN = 1
DATA = 2
ANY = 3
Loading

0 comments on commit 741f246

Please sign in to comment.