Skip to content

Commit

Permalink
Mutex access to local lock attributes
Browse files Browse the repository at this point in the history
When we lock or unlock requests or nodes, we set the .lock attribute
of the request/node to the ZK lock object or None respectively.

If two threads are racing an unlock and lock, we can have the following
sequence:

1: unlock request in ZK
2: lock request in ZK
2: set request.lock to ZK lock object
1: set request.lock object to None

If this happens, thread 2 will not be able to unlock the request
and it will stay locked permanently.

To correct this, set a local mutex around these operations so that
they can not be sequenced as above.

Change-Id: Ic52dc0202bf629462d774406b08b7dee01396dd8
  • Loading branch information
jeblair committed Mar 27, 2023
1 parent faca60b commit f125c6a
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions nodepool/zk/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import abc
import json
import logging
import threading
import time
import uuid

Expand Down Expand Up @@ -394,6 +395,9 @@ class NodeRequest(BaseModel):
def __init__(self, id=None):
super(NodeRequest, self).__init__(id)
self.lock = None
# Local thread lock that is acquired when we are manipulating
# the ZK lock.
self._thread_lock = threading.Lock()
self.declined_by = []
self.node_types = []
self.nodes = []
Expand Down Expand Up @@ -510,6 +514,9 @@ def __init__(self, id=None):
super(Node, self).__init__(id)
# Local lock object; not serialized
self.lock = None
# Local thread lock that is acquired when we are manipulating
# the ZK lock.
self._thread_lock = threading.Lock()
# Cached list of lock contenders; not serialized (and possibly
# not up to date; use for status listings only).
self.lock_contenders = set()
Expand Down Expand Up @@ -2138,22 +2145,23 @@ def lockNodeRequest(self, request, blocking=True, timeout=None):
log = get_annotated_logger(self.log, event_id=request.event_id,
node_request_id=request.id)
path = self._requestLockPath(request.id)
try:
lock = Lock(self.kazoo_client, path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % path)
except kze.NoNodeError:
have_lock = False
log.error("Request not found for locking: %s", request)
with request._thread_lock:
try:
lock = Lock(self.kazoo_client, path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % path)
except kze.NoNodeError:
have_lock = False
log.error("Request not found for locking: %s", request)

# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise npe.ZKLockException("Did not get lock on %s" % path)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise npe.ZKLockException("Did not get lock on %s" % path)

request.lock = lock
request.lock = lock

# Do an in-place update of the node request so we have the latest data
self.updateNodeRequest(request)
Expand All @@ -2171,8 +2179,9 @@ def unlockNodeRequest(self, request):
if request.lock is None:
raise npe.ZKLockException(
"Request %s does not hold a lock" % request)
request.lock.release()
request.lock = None
with request._thread_lock:
request.lock.release()
request.lock = None

def lockNode(self, node, blocking=True, timeout=None,
ephemeral=True, identifier=None):
Expand All @@ -2199,22 +2208,23 @@ def lockNode(self, node, blocking=True, timeout=None,
and could not get the lock, or a lock is already held.
'''
path = self._nodeLockPath(node.id)
try:
lock = Lock(self.kazoo_client, path, identifier)
have_lock = lock.acquire(blocking, timeout, ephemeral)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % path)
except kze.NoNodeError:
have_lock = False
self.log.error("Node not found for locking: %s", node)
with node._thread_lock:
try:
lock = Lock(self.kazoo_client, path, identifier)
have_lock = lock.acquire(blocking, timeout, ephemeral)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % path)
except kze.NoNodeError:
have_lock = False
self.log.error("Node not found for locking: %s", node)

# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise npe.ZKLockException("Did not get lock on %s" % path)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise npe.ZKLockException("Did not get lock on %s" % path)

node.lock = lock
node.lock = lock

# Do an in-place update of the node so we have the latest data.
self.updateNode(node)
Expand All @@ -2231,8 +2241,9 @@ def unlockNode(self, node):
'''
if node.lock is None:
raise npe.ZKLockException("Node %s does not hold a lock" % node)
node.lock.release()
node.lock = None
with node._thread_lock:
node.lock.release()
node.lock = None

def forceUnlockNode(self, node):
'''
Expand Down

0 comments on commit f125c6a

Please sign in to comment.