Merge "Mutex access to local lock attributes"
This commit is contained in:
commit
e096928d30
@ -15,6 +15,7 @@ from copy import copy
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
@ -394,6 +395,9 @@ class NodeRequest(BaseModel):
|
||||
def __init__(self, id=None):
|
||||
super(NodeRequest, self).__init__(id)
|
||||
self.lock = None
|
||||
# Local thread lock that is acquired when we are manipulating
|
||||
# the ZK lock.
|
||||
self._thread_lock = threading.Lock()
|
||||
self.declined_by = []
|
||||
self.node_types = []
|
||||
self.nodes = []
|
||||
@ -510,6 +514,9 @@ class Node(BaseModel):
|
||||
super(Node, self).__init__(id)
|
||||
# Local lock object; not serialized
|
||||
self.lock = None
|
||||
# Local thread lock that is acquired when we are manipulating
|
||||
# the ZK lock.
|
||||
self._thread_lock = threading.Lock()
|
||||
# Cached list of lock contenders; not serialized (and possibly
|
||||
# not up to date; use for status listings only).
|
||||
self.lock_contenders = set()
|
||||
@ -2138,22 +2145,23 @@ class ZooKeeper(ZooKeeperBase):
|
||||
log = get_annotated_logger(self.log, event_id=request.event_id,
|
||||
node_request_id=request.id)
|
||||
path = self._requestLockPath(request.id)
|
||||
try:
|
||||
lock = Lock(self.kazoo_client, path)
|
||||
have_lock = lock.acquire(blocking, timeout)
|
||||
except kze.LockTimeout:
|
||||
raise npe.TimeoutException(
|
||||
"Timeout trying to acquire lock %s" % path)
|
||||
except kze.NoNodeError:
|
||||
have_lock = False
|
||||
log.error("Request not found for locking: %s", request)
|
||||
with request._thread_lock:
|
||||
try:
|
||||
lock = Lock(self.kazoo_client, path)
|
||||
have_lock = lock.acquire(blocking, timeout)
|
||||
except kze.LockTimeout:
|
||||
raise npe.TimeoutException(
|
||||
"Timeout trying to acquire lock %s" % path)
|
||||
except kze.NoNodeError:
|
||||
have_lock = False
|
||||
log.error("Request not found for locking: %s", request)
|
||||
|
||||
# If we aren't blocking, it's possible we didn't get the lock
|
||||
# because someone else has it.
|
||||
if not have_lock:
|
||||
raise npe.ZKLockException("Did not get lock on %s" % path)
|
||||
# If we aren't blocking, it's possible we didn't get the lock
|
||||
# because someone else has it.
|
||||
if not have_lock:
|
||||
raise npe.ZKLockException("Did not get lock on %s" % path)
|
||||
|
||||
request.lock = lock
|
||||
request.lock = lock
|
||||
|
||||
# Do an in-place update of the node request so we have the latest data
|
||||
self.updateNodeRequest(request)
|
||||
@ -2171,8 +2179,9 @@ class ZooKeeper(ZooKeeperBase):
|
||||
if request.lock is None:
|
||||
raise npe.ZKLockException(
|
||||
"Request %s does not hold a lock" % request)
|
||||
request.lock.release()
|
||||
request.lock = None
|
||||
with request._thread_lock:
|
||||
request.lock.release()
|
||||
request.lock = None
|
||||
|
||||
def lockNode(self, node, blocking=True, timeout=None,
|
||||
ephemeral=True, identifier=None):
|
||||
@ -2199,22 +2208,23 @@ class ZooKeeper(ZooKeeperBase):
|
||||
and could not get the lock, or a lock is already held.
|
||||
'''
|
||||
path = self._nodeLockPath(node.id)
|
||||
try:
|
||||
lock = Lock(self.kazoo_client, path, identifier)
|
||||
have_lock = lock.acquire(blocking, timeout, ephemeral)
|
||||
except kze.LockTimeout:
|
||||
raise npe.TimeoutException(
|
||||
"Timeout trying to acquire lock %s" % path)
|
||||
except kze.NoNodeError:
|
||||
have_lock = False
|
||||
self.log.error("Node not found for locking: %s", node)
|
||||
with node._thread_lock:
|
||||
try:
|
||||
lock = Lock(self.kazoo_client, path, identifier)
|
||||
have_lock = lock.acquire(blocking, timeout, ephemeral)
|
||||
except kze.LockTimeout:
|
||||
raise npe.TimeoutException(
|
||||
"Timeout trying to acquire lock %s" % path)
|
||||
except kze.NoNodeError:
|
||||
have_lock = False
|
||||
self.log.error("Node not found for locking: %s", node)
|
||||
|
||||
# If we aren't blocking, it's possible we didn't get the lock
|
||||
# because someone else has it.
|
||||
if not have_lock:
|
||||
raise npe.ZKLockException("Did not get lock on %s" % path)
|
||||
# If we aren't blocking, it's possible we didn't get the lock
|
||||
# because someone else has it.
|
||||
if not have_lock:
|
||||
raise npe.ZKLockException("Did not get lock on %s" % path)
|
||||
|
||||
node.lock = lock
|
||||
node.lock = lock
|
||||
|
||||
# Do an in-place update of the node so we have the latest data.
|
||||
self.updateNode(node)
|
||||
@ -2231,8 +2241,9 @@ class ZooKeeper(ZooKeeperBase):
|
||||
'''
|
||||
if node.lock is None:
|
||||
raise npe.ZKLockException("Node %s does not hold a lock" % node)
|
||||
node.lock.release()
|
||||
node.lock = None
|
||||
with node._thread_lock:
|
||||
node.lock.release()
|
||||
node.lock = None
|
||||
|
||||
def forceUnlockNode(self, node):
|
||||
'''
|
||||
|
Loading…
x
Reference in New Issue
Block a user