Fix connection handling in recursive tree cache
There were two errors related to connection handling in the new treecache implementation: 1) Upon disconnection, the cache receives a "NONE" event with the client state ("CONNECTING") and no path. This is a signal to the watcher that the connection state has changed. Since we process connection changes via the session listener, we should ignore this event. We were previously attempting to process it and assumed that the path attribute would be present. This change ignores NONE events with no path. 2) The treecache is responsible for re-establishing watches if the connection is LOST or SUSPENDED. Previously, it would only re-establish watches and refresh the cache if the connection was LOST. This change updates it to re-establish/refresh in both cases. The "_started" variable is no longer necessary to distinguish between the two. However, it may be useful for future consumers to know whether or not the cache is synced, so it is converted to "_ready" in case we want to expose that in the future. Change-Id: Id1f9c34628d1cc04881621a83d1d1f78e9e0f366
This commit is contained in:
parent
3e8dce8873
commit
c4c7052f10
@ -1219,6 +1219,14 @@ class TestTreeCache(tests.DBTestCase):
|
|||||||
'/test/foo': {},
|
'/test/foo': {},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# Simulate a change happening while the state was suspendede
|
||||||
|
cache._cached_paths.add('/test/bar')
|
||||||
|
cache._sessionListener(KazooState.SUSPENDED)
|
||||||
|
cache._sessionListener(KazooState.CONNECTED)
|
||||||
|
self.waitForCache(cache, {
|
||||||
|
'/test/foo': {},
|
||||||
|
})
|
||||||
|
|
||||||
def test_tree_cache_root(self):
|
def test_tree_cache_root(self):
|
||||||
client = self.zk.kazoo_client
|
client = self.zk.kazoo_client
|
||||||
data = b'{}'
|
data = b'{}'
|
||||||
|
@ -741,7 +741,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
self.root = root
|
self.root = root
|
||||||
self._cached_objects = {}
|
self._cached_objects = {}
|
||||||
self._cached_paths = set()
|
self._cached_paths = set()
|
||||||
self._started = False
|
self._ready = False
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
self._queue = queue.Queue()
|
self._queue = queue.Queue()
|
||||||
self._background_thread = threading.Thread(
|
self._background_thread = threading.Thread(
|
||||||
@ -763,9 +763,9 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
|
|
||||||
def _sessionListener(self, state):
|
def _sessionListener(self, state):
|
||||||
if state == KazooState.LOST:
|
if state == KazooState.LOST:
|
||||||
self._started = False
|
self._ready = False
|
||||||
elif (state == KazooState.CONNECTED and
|
elif state == KazooState.CONNECTED and not self._stopped:
|
||||||
not self._started and not self._stopped):
|
self._ready = False
|
||||||
self.zk.kazoo_client.handler.short_spawn(self._start)
|
self.zk.kazoo_client.handler.short_spawn(self._start)
|
||||||
|
|
||||||
def _cacheListener(self, event):
|
def _cacheListener(self, event):
|
||||||
@ -776,7 +776,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
self.zk.kazoo_client.add_watch(self.root, self._cacheListener,
|
self.zk.kazoo_client.add_watch(self.root, self._cacheListener,
|
||||||
AddWatchMode.PERSISTENT_RECURSIVE)
|
AddWatchMode.PERSISTENT_RECURSIVE)
|
||||||
self._walkTree()
|
self._walkTree()
|
||||||
self._started = True
|
self._ready = True
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stopped = True
|
self._stopped = True
|
||||||
@ -827,6 +827,9 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
# as a delete which may be followed by a normal delete event.
|
# as a delete which may be followed by a normal delete event.
|
||||||
# That case, and any other variations should be anticipated.
|
# That case, and any other variations should be anticipated.
|
||||||
if event.type == EventType.NONE:
|
if event.type == EventType.NONE:
|
||||||
|
if event.path is None:
|
||||||
|
# We're probably being told of a connection change; ignore.
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
data, stat = self.zk.kazoo_client.get(event.path)
|
data, stat = self.zk.kazoo_client.get(event.path)
|
||||||
exists = True
|
exists = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user