Merge "reset listeners on agent refresh"

This commit is contained in:
Jenkins 2015-01-05 16:22:54 +00:00 committed by Gerrit Code Review
commit fbe62343fc
2 changed files with 20 additions and 13 deletions

View File

@ -175,19 +175,19 @@ class NotificationService(os_service.Service):
self._configure_pipeline_listeners() self._configure_pipeline_listeners()
def _configure_pipeline_listeners(self): def _configure_pipeline_listeners(self):
if cfg.CONF.notification.workload_partitioning: self.pipeline_listeners = []
partitioned = self.partition_coordinator.extract_my_subset( partitioned = self.partition_coordinator.extract_my_subset(
self.group_id, self.pipeline_manager.pipelines) self.group_id, self.pipeline_manager.pipelines)
transport = messaging.get_transport() transport = messaging.get_transport()
for pipe in partitioned: for pipe in partitioned:
LOG.debug(_('Pipeline endpoint: %s'), pipe.name) LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
listener = messaging.get_notification_listener( listener = messaging.get_notification_listener(
transport, transport,
[oslo.messaging.Target( [oslo.messaging.Target(
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))],
[pipeline.PipelineEndpoint(self.ctxt, pipe)]) [pipeline.PipelineEndpoint(self.ctxt, pipe)])
listener.start() listener.start()
self.pipeline_listeners.append(listener) self.pipeline_listeners.append(listener)
def stop(self): def stop(self):
self.partition_coordinator.leave_group(self.group_id) self.partition_coordinator.leave_group(self.group_id)

View File

@ -262,3 +262,10 @@ class TestRealNotificationHA(BaseRealNotification):
fake_coord1.extract_my_subset.side_effect = lambda x, y: y fake_coord1.extract_my_subset.side_effect = lambda x, y: y
fake_coord.return_value = fake_coord1 fake_coord.return_value = fake_coord1
self._check_notification_service() self._check_notification_service()
def test_reset_listeners_on_refresh(self):
self.srv.start()
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.srv._refresh_agent(None)
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.srv.stop()