diff --git a/marconi/common/transport/wsgi/health.py b/marconi/common/transport/wsgi/health.py index 567cdfc8f..3043e32c7 100644 --- a/marconi/common/transport/wsgi/health.py +++ b/marconi/common/transport/wsgi/health.py @@ -19,8 +19,14 @@ import falcon class Resource(object): + __slots__ = ('driver',) + + def __init__(self, driver): + self.driver = driver + def on_get(self, req, resp, **kwargs): - resp.status = falcon.HTTP_204 + resp.status = (falcon.HTTP_204 if self.driver.is_alive() + else falcon.HTTP_503) def on_head(self, req, resp, **kwargs): resp.status = falcon.HTTP_204 diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index a1cdda7f7..af1623481 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -55,6 +55,11 @@ class DataDriverBase(object): self.conf.register_opts(_LIMITS_OPTIONS, group=_LIMITS_GROUP) self.limits_conf = self.conf[_LIMITS_GROUP] + @abc.abstractmethod + def is_alive(self): + """Check whether the storage is ready.""" + raise NotImplementedError + @abc.abstractproperty def queue_controller(self): """Returns the driver's queue controller.""" diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 38f85ca14..78f09c2c8 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -56,6 +56,14 @@ class DataDriver(storage.DataDriverBase): group=options.MONGODB_GROUP) self.mongodb_conf = self.conf[options.MONGODB_GROUP] + def is_alive(self): + try: + # NOTE(zyuan): Requires admin access to mongodb + return 'ok' in self.connection.admin.command('ping') + + except pymongo.errors.PyMongoError: + return False + @decorators.lazy_property(write=False) def queues_database(self): """Database dedicated to the "queues" collection. diff --git a/marconi/queues/storage/pipeline.py b/marconi/queues/storage/pipeline.py index e907cf65c..0a51deb27 100644 --- a/marconi/queues/storage/pipeline.py +++ b/marconi/queues/storage/pipeline.py @@ -93,6 +93,9 @@ class DataDriver(base.DataDriverBase): super(DataDriver, self).__init__(conf) self._storage = storage + def is_alive(self): + return self._storage.is_alive() + @decorators.lazy_property(write=False) def queue_controller(self): stages = _get_storage_pipeline('queue', self.conf) diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 258d23304..818f9a998 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -46,6 +46,11 @@ class DataDriver(storage.DataDriverBase): super(DataDriver, self).__init__(conf) self._shard_catalog = Catalog(conf, control) + def is_alive(self): + return all(self._shard_catalog.get_driver(shard['name']).is_alive() + for shard in + self._shard_catalog._shards_ctrl.list(limit=0)) + @decorators.lazy_property(write=False) def queue_controller(self): return QueueController(self._shard_catalog) diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index 27d0018c8..843d741e6 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -185,6 +185,9 @@ class DataDriver(storage.DataDriverBase): self.__conn.rollback() raise + def is_alive(self): + return True + @decorators.lazy_property(write=False) def queue_controller(self): return controllers.QueueController(self) diff --git a/marconi/queues/transport/wsgi/admin/driver.py b/marconi/queues/transport/wsgi/admin/driver.py index 775196945..5669698a4 100644 --- a/marconi/queues/transport/wsgi/admin/driver.py +++ b/marconi/queues/transport/wsgi/admin/driver.py @@ -30,5 +30,5 @@ class Driver(public_driver.Driver): ('/shards/{shard}', shards.Resource(shards_controller)), ('/health', - health.Resource()) + health.Resource(self._storage)) ] diff --git a/marconi/queues/transport/wsgi/public/driver.py b/marconi/queues/transport/wsgi/public/driver.py index 91cc1169f..157f2dcdb 100644 --- a/marconi/queues/transport/wsgi/public/driver.py +++ b/marconi/queues/transport/wsgi/public/driver.py @@ -68,5 +68,5 @@ class Driver(driver.DriverBase): # Health ('/health', - health.Resource()) + health.Resource(self._storage)) ] diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index 3b265969b..9e0db921e 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -24,6 +24,9 @@ class DataDriver(storage.DataDriverBase): def default_options(self): return {} + def is_alive(self): + raise NotImplementedError() + @property def queue_controller(self): return QueueController(self) diff --git a/tests/unit/queues/storage/test_shard_queues.py b/tests/unit/queues/storage/test_shard_queues.py index 78e8c44e4..546fe1d60 100644 --- a/tests/unit/queues/storage/test_shard_queues.py +++ b/tests/unit/queues/storage/test_shard_queues.py @@ -37,7 +37,8 @@ class TestShardQueues(base.TestBase): control = utils.load_storage_driver(conf, control_mode=True) self.shards_ctrl = control.shards_controller - self.controller = sharding.DataDriver(conf, control).queue_controller + self.driver = sharding.DataDriver(conf, control) + self.controller = self.driver.queue_controller # fake two shards for _ in xrange(2): @@ -47,6 +48,10 @@ class TestShardQueues(base.TestBase): self.shards_ctrl.drop_all() super(TestShardQueues, self).tearDown() + def test_health(self): + health = self.driver.is_alive() + self.assertTrue(health) + def test_listing(self): project = "I.G"