diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 8398d08e..ecf7777a 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -15,6 +15,7 @@ import abc import json import logging +import threading import time import uuid @@ -394,6 +395,9 @@ class NodeRequest(BaseModel): def __init__(self, id=None): super(NodeRequest, self).__init__(id) self.lock = None + # Local thread lock that is acquired when we are manipulating + # the ZK lock. + self._thread_lock = threading.Lock() self.declined_by = [] self.node_types = [] self.nodes = [] @@ -510,6 +514,9 @@ def __init__(self, id=None): super(Node, self).__init__(id) # Local lock object; not serialized self.lock = None + # Local thread lock that is acquired when we are manipulating + # the ZK lock. + self._thread_lock = threading.Lock() # Cached list of lock contenders; not serialized (and possibly # not up to date; use for status listings only). self.lock_contenders = set() @@ -2138,22 +2145,23 @@ def lockNodeRequest(self, request, blocking=True, timeout=None): log = get_annotated_logger(self.log, event_id=request.event_id, node_request_id=request.id) path = self._requestLockPath(request.id) - try: - lock = Lock(self.kazoo_client, path) - have_lock = lock.acquire(blocking, timeout) - except kze.LockTimeout: - raise npe.TimeoutException( - "Timeout trying to acquire lock %s" % path) - except kze.NoNodeError: - have_lock = False - log.error("Request not found for locking: %s", request) + with request._thread_lock: + try: + lock = Lock(self.kazoo_client, path) + have_lock = lock.acquire(blocking, timeout) + except kze.LockTimeout: + raise npe.TimeoutException( + "Timeout trying to acquire lock %s" % path) + except kze.NoNodeError: + have_lock = False + log.error("Request not found for locking: %s", request) - # If we aren't blocking, it's possible we didn't get the lock - # because someone else has it. - if not have_lock: - raise npe.ZKLockException("Did not get lock on %s" % path) + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + raise npe.ZKLockException("Did not get lock on %s" % path) - request.lock = lock + request.lock = lock # Do an in-place update of the node request so we have the latest data self.updateNodeRequest(request) @@ -2171,8 +2179,9 @@ def unlockNodeRequest(self, request): if request.lock is None: raise npe.ZKLockException( "Request %s does not hold a lock" % request) - request.lock.release() - request.lock = None + with request._thread_lock: + request.lock.release() + request.lock = None def lockNode(self, node, blocking=True, timeout=None, ephemeral=True, identifier=None): @@ -2199,22 +2208,23 @@ def lockNode(self, node, blocking=True, timeout=None, and could not get the lock, or a lock is already held. ''' path = self._nodeLockPath(node.id) - try: - lock = Lock(self.kazoo_client, path, identifier) - have_lock = lock.acquire(blocking, timeout, ephemeral) - except kze.LockTimeout: - raise npe.TimeoutException( - "Timeout trying to acquire lock %s" % path) - except kze.NoNodeError: - have_lock = False - self.log.error("Node not found for locking: %s", node) + with node._thread_lock: + try: + lock = Lock(self.kazoo_client, path, identifier) + have_lock = lock.acquire(blocking, timeout, ephemeral) + except kze.LockTimeout: + raise npe.TimeoutException( + "Timeout trying to acquire lock %s" % path) + except kze.NoNodeError: + have_lock = False + self.log.error("Node not found for locking: %s", node) - # If we aren't blocking, it's possible we didn't get the lock - # because someone else has it. - if not have_lock: - raise npe.ZKLockException("Did not get lock on %s" % path) + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + raise npe.ZKLockException("Did not get lock on %s" % path) - node.lock = lock + node.lock = lock # Do an in-place update of the node so we have the latest data. self.updateNode(node) @@ -2231,8 +2241,9 @@ def unlockNode(self, node): ''' if node.lock is None: raise npe.ZKLockException("Node %s does not hold a lock" % node) - node.lock.release() - node.lock = None + with node._thread_lock: + node.lock.release() + node.lock = None def forceUnlockNode(self, node): '''