Skip to content

Commit

Permalink
feat(core): add support for persistent recursive watches
Browse files Browse the repository at this point in the history
ZooKeeper 3.6.0 added support for persistent, and persistent
recursive watches.  This adds the corresponding support to the
Kazoo client class.
  • Loading branch information
jeblair authored and StephenSorriaux committed Mar 16, 2024
1 parent 6540c93 commit d2a911e
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 8 deletions.
117 changes: 117 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kazoo.protocol.connection import ConnectionHandler
from kazoo.protocol.paths import _prefix_root, normpath
from kazoo.protocol.serialization import (
AddWatch,
Auth,
CheckVersion,
CloseInstance,
Expand All @@ -38,6 +39,7 @@
SetACL,
GetData,
Reconfig,
RemoveWatches,
SetData,
Sync,
Transaction,
Expand All @@ -48,6 +50,8 @@
KazooState,
KeeperState,
WatchedEvent,
AddWatchMode,
WatcherType,
)
from kazoo.retry import KazooRetry
from kazoo.security import ACL, OPEN_ACL_UNSAFE
Expand Down Expand Up @@ -248,6 +252,8 @@ def __init__(
self.state_listeners = set()
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)
self._reset()
self.read_only = read_only

Expand Down Expand Up @@ -416,8 +422,16 @@ def _reset_watchers(self):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

for persistent_watchers in self._persistent_watchers.values():
watchers.extend(persistent_watchers)

for pr_watchers in self._persistent_recursive_watchers.values():
watchers.extend(pr_watchers)

self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)

ev = WatchedEvent(EventType.NONE, self._state, None)
for watch in watchers:
Expand Down Expand Up @@ -1644,8 +1658,111 @@ def reconfig_async(self, joining, leaving, new_members, from_config):

return async_result

def add_watch(self, path, watch, mode):
"""Add a watch.
This method adds persistent watches. Unlike the data and
child watches which may be set by calls to
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
:meth:`KazooClient.get_children`, persistent watches are not
removed after being triggered.
To remove a persistent watch, use
:meth:`KazooClient.remove_all_watches` with an argument of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
The `mode` argument determines whether or not the watch is
recursive. To set a persistent watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT`. To set a
persistent recursive watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE`.
:param path: Path of node to watch.
:param watch: Watch callback to set for future changes
to this path.
:param mode: The mode to use.
:type mode: int
:raises:
:exc:`~kazoo.exceptions.MarshallingError` if mode is
unknown.
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.add_watch_async(path, watch, mode).get()

def add_watch_async(self, path, watch, mode):
"""Asynchronously add a watch. Takes the same arguments as
:meth:`add_watch`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
if not isinstance(mode, int):
raise TypeError("Invalid type for 'mode' (int expected)")
if mode not in (
AddWatchMode.PERSISTENT,
AddWatchMode.PERSISTENT_RECURSIVE,
):
raise ValueError("Invalid value for 'mode'")

async_result = self.handler.async_result()
self._call(
AddWatch(_prefix_root(self.chroot, path), watch, mode),
async_result,
)
return async_result

def remove_all_watches(self, path, watcher_type):
"""Remove watches from a path.
This removes all watches of a specified type (data, child,
any) from a given path.
The `watcher_type` argument specifies which type to use. It
may be one of:
* :attr:`~kazoo.protocol.states.WatcherType.DATA`
* :attr:`~kazoo.protocol.states.WatcherType.CHILDREN`
* :attr:`~kazoo.protocol.states.WatcherType.ANY`
To remove persistent watches, specify a watcher type of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
:param path: Path of watch to remove.
:param watcher_type: The type of watch to remove.
:type watcher_type: int
"""

return self.remove_all_watches_async(path, watcher_type).get()

def remove_all_watches_async(self, path, watcher_type):
"""Asynchronously remove watches. Takes the same arguments as
:meth:`remove_all_watches`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(watcher_type, int):
raise TypeError("Invalid type for 'watcher_type' (int expected)")
if watcher_type not in (
WatcherType.ANY,
WatcherType.CHILDREN,
WatcherType.DATA,
):
raise ValueError("Invalid value for 'watcher_type'")

async_result = self.handler.async_result()
self._call(
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
async_result,
)
return async_result


class TransactionRequest(object):

"""A Zookeeper Transaction Request
A Transaction provides a builder object that can be used to
Expand Down
5 changes: 5 additions & 0 deletions kazoo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ class SessionClosedRequireSaslError(ZookeeperError):
"""


@_zookeeper_exception(-121)
class NoWatcherError(ZookeeperError):
"""No watcher was found at the supplied path"""


@_zookeeper_exception(-125)
class QuotaExceededError(ZookeeperError):
"""Exceeded the quota that was set on the path."""
Expand Down
56 changes: 49 additions & 7 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
AddWatch,
Auth,
Close,
Connect,
Expand All @@ -28,17 +29,20 @@
GetChildren2,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
SASL,
Transaction,
Watch,
int_struct,
)
from kazoo.protocol.states import (
AddWatchMode,
Callback,
KeeperState,
WatchedEvent,
EVENT_TYPE_MAP,
WatcherType,
)
from kazoo.retry import (
ForceRetryError,
Expand Down Expand Up @@ -363,6 +367,18 @@ def _write(self, msg, timeout):
raise ConnectionDropped("socket connection broken")
sent += bytes_sent

def _find_persistent_recursive_watchers(self, path):
parts = path.split("/")
watchers = []
for count in range(len(parts)):
candidate = "/".join(parts[: count + 1])
if not candidate:
continue
watchers.extend(
self.client._persistent_recursive_watchers.get(candidate, [])
)
return watchers

def _read_watch_event(self, buffer, offset):
client = self.client
watch, offset = Watch.deserialize(buffer, offset)
Expand All @@ -374,9 +390,13 @@ def _read_watch_event(self, buffer, offset):

if watch.type in (CREATED_EVENT, CHANGED_EVENT):
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == DELETED_EVENT:
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._child_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
Expand Down Expand Up @@ -448,13 +468,35 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
# Determine if watchers should be registered or unregistered
if not client._stopped.is_set():
watcher = getattr(request, "watcher", None)
if watcher:
if isinstance(request, AddWatch):
if request.mode == AddWatchMode.PERSISTENT:
client._persistent_watchers[request.path].add(
watcher
)
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
client._persistent_recursive_watchers[
request.path
].add(watcher)
elif isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, RemoveWatches):
if request.watcher_type == WatcherType.CHILDREN:
client._child_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.DATA:
client._data_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.ANY:
client._child_watchers.pop(request.path, None)
client._data_watchers.pop(request.path, None)
client._persistent_watchers.pop(request.path, None)
client._persistent_recursive_watchers.pop(
request.path, None
)

if isinstance(request, Close):
self.logger.log(BLATHER, "Read close response")
Expand Down
28 changes: 28 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
return data, stat


class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
type = 18

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.watcher_type))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand All @@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
return challenge, offset


class AddWatch(namedtuple("AddWatch", "path watcher mode")):
type = 106

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.mode))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
41 changes: 41 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,44 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class AddWatchMode(object):
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
.. attribute:: PERSISTENT
The watch is not removed when trigged.
.. attribute:: PERSISTENT_RECURSIVE
The watch is not removed when trigged, and applies to all
paths underneath the supplied path as well.
"""

PERSISTENT = 0
PERSISTENT_RECURSIVE = 1


class WatcherType(object):
"""Watcher types for use with
:meth:`~kazoo.client.KazooClient.remove_all_watches`
.. attribute:: CHILDREN
Child watches.
.. attribute:: DATA
Data watches.
.. attribute:: ANY
Any type of watch (child, data, persistent, or persistent
recursive).
"""

CHILDREN = 1
DATA = 2
ANY = 3
Loading

0 comments on commit d2a911e

Please sign in to comment.