Always complete cache init on reconnect

There are some race conditions in the current treecache code that
could cause a cache to initialize empty or run without workers.

The following sequence can occur:

* ZK connection -> suspended
* Start cache init #1
* Cache init #1 starts workers
* ZK connection -> connected
* Set stop workers flag
* Workers stop
* Short spawn of cache init (call this cache init #2)
* Cache init #2 fails to obtain lock since init #1 is running
* Cache init #1 finishes and releases lock

This results in a cache with no worker threads running.

The problem is that we assume that if we reconnect to ZK multiple
times while a cache init is already running, we don't need to run
another one.  However, whether that is safe depends on the exact
timing.

To fix this, we will always run the cache init each time we spawn
it, rather than assuming that an already running cache init is
sufficient.  If we do have a flapping connection, we may re-init
the cache more than necessary, but at least we should end up with
a working cache at the end.
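
As an illustration only, the sequence above can be reproduced with a toy
model.  This is a sketch, not the real NodepoolTreeCache: the class
SketchCache, its attributes, the always_init switch and the run_scenario
helper are all hypothetical names, and the "workers" are reduced to a
single boolean.  The two Event objects exist only to force the
problematic interleaving deterministically.

```python
import threading


class SketchCache:
    """Toy stand-in for a tree cache (hypothetical, for illustration)."""

    def __init__(self, always_init):
        self.always_init = always_init      # False = old logic, True = new
        self._init_lock = threading.Lock()
        self.workers_running = False
        self.inits_completed = 0
        # Hooks used only to hold init #1 open while init #2 is spawned.
        self.init_started = threading.Event()
        self.may_finish = threading.Event()

    def on_connected(self):
        if not self.always_init:
            # Old logic: the session listener stops the workers itself.
            self.workers_running = False

    def start(self):
        if self.always_init:
            # New logic: wait for the lock, then stop and restart workers.
            with self._init_lock:
                self.workers_running = False
                self._init()
        else:
            # Old logic: skip if another init already holds the lock.
            if not self._init_lock.acquire(blocking=False):
                return
            try:
                self._init()
            finally:
                self._init_lock.release()

    def _init(self):
        self.workers_running = True         # "start workers"
        self.init_started.set()
        self.may_finish.wait(timeout=5)     # rest of the init work
        self.inits_completed += 1


def run_scenario(always_init):
    cache = SketchCache(always_init)
    init1 = threading.Thread(target=cache.start)
    init1.start()
    cache.init_started.wait(timeout=5)      # init #1 holds the lock
    cache.on_connected()                    # ZK connection -> connected
    init2 = threading.Thread(target=cache.start)
    init2.start()
    if not always_init:
        init2.join()                        # old logic: init #2 is skipped
    cache.may_finish.set()                  # let init #1 finish
    init1.join()
    init2.join()
    return cache.workers_running, cache.inits_completed


if __name__ == "__main__":
    print("old logic:", run_scenario(always_init=False))
    print("new logic:", run_scenario(always_init=True))
```

With the old logic the second init is skipped and the model ends with no
workers; with the new logic the second init waits for the lock, so the
model ends with workers running after two complete inits.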

Change-Id: I3ba56a5cddc516656bcce016ed048f4805ef3751
James E. Blair 2023-08-03 10:17:24 -07:00
parent 07c83f555d
commit 9c6fd55af2

@@ -763,23 +763,22 @@ class NodepoolTreeCache(abc.ABC):
             self._event_queue.put(None)
             self._playback_queue.put(None)
         elif state == KazooState.CONNECTED and not self._stopped:
-            self._ready.clear()
-            self._stop_workers = True
-            self._event_queue.put(None)
-            self._playback_queue.put(None)
             self.zk.kazoo_client.handler.short_spawn(self._start)

     def _cacheListener(self, event):
         self._event_queue.put(event)

     def _start(self):
-        locked = self._init_lock.acquire(blocking=False)
-        if locked:
+        with self._init_lock:
             self.log.debug("Initialize cache at %s", self.root)

-            # If we have an event worker (this is a re-init), then way
-            # for it to finish stopping (the session listener should
-            # have told it to stop).
+            self._ready.clear()
+            self._stop_workers = True
+            self._event_queue.put(None)
+            self._playback_queue.put(None)
+
+            # If we have an event worker (this is a re-init), then wait
+            # for it to finish stopping.
             if self._event_worker:
                 self._event_worker.join()
             # Replace the queue since any events from the previous
@@ -814,11 +813,6 @@ class NodepoolTreeCache(abc.ABC):
             except Exception:
                 self.log.exception("Error initializing cache at %s", self.root)
                 self.zk.kazoo_client.handler.short_spawn(self._start)
-            finally:
-                self._init_lock.release()
-        else:
-            self.log.debug("Skipping locked cache initialization at %s",
-                           self.root)

     def stop(self):
         self._stopped = True