Merge "Add second level cache to node requests"
This commit is contained in:
commit
103e64ce24
@ -703,6 +703,7 @@ class ZooKeeper(object):
|
||||
self._node_cache = None
|
||||
self._request_cache = None
|
||||
self._cached_nodes = {}
|
||||
self._cached_node_requests = {}
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
@ -899,6 +900,8 @@ class ZooKeeper(object):
|
||||
self._node_cache.start()
|
||||
|
||||
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
||||
self._request_cache.listen_fault(self.cacheFaultListener)
|
||||
self._request_cache.listen(self.requestCacheListener)
|
||||
self._request_cache.start()
|
||||
|
||||
def disconnect(self):
|
||||
@ -1569,22 +1572,18 @@ class ZooKeeper(object):
|
||||
|
||||
:returns: The request data, or None if the request was not found.
|
||||
'''
|
||||
path = self._requestPath(request)
|
||||
data = None
|
||||
stat = None
|
||||
if cached:
|
||||
cached_data = self._request_cache.get_data(path)
|
||||
if cached_data:
|
||||
data = cached_data.data
|
||||
stat = cached_data.stat
|
||||
d = self._cached_node_requests.get(request)
|
||||
if d:
|
||||
return d
|
||||
|
||||
# If data is empty we either didn't use the cache or the cache didn't
|
||||
# If we got here we either didn't use the cache or the cache didn't
|
||||
# have the request (yet). Note that even if we use caching we need to
|
||||
# do a real query if the cached data is empty because the request data
|
||||
# might not be in the cache yet when it's listed by the get_children
|
||||
# call.
|
||||
if not data:
|
||||
try:
|
||||
path = self._requestPath(request)
|
||||
data, stat = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
return None
|
||||
@ -2107,3 +2106,49 @@ class ZooKeeper(object):
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
def requestCacheListener(self, event):
|
||||
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.REQUEST_ROOT:
|
||||
return
|
||||
|
||||
# Ignore lock nodes
|
||||
if '/lock' in path:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
request_id = path.rsplit('/', 1)[1]
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Perform an in-place update of the cached request if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_request = self._cached_node_requests.get(request_id)
|
||||
if old_request:
|
||||
if event.event_data.stat.version <= old_request.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_request.lock:
|
||||
# Don't update a locked node request
|
||||
return
|
||||
old_request.updateFromDict(d)
|
||||
old_request.stat = event.event_data.stat
|
||||
else:
|
||||
request = NodeRequest.fromDict(d, request_id)
|
||||
request.stat = event.event_data.stat
|
||||
self._cached_node_requests[request_id] = request
|
||||
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_node_requests[request_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
Loading…
x
Reference in New Issue
Block a user