diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 883a801b5..43ce229e8 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -175,19 +175,19 @@ class NotificationService(os_service.Service): self._configure_pipeline_listeners() def _configure_pipeline_listeners(self): - if cfg.CONF.notification.workload_partitioning: - partitioned = self.partition_coordinator.extract_my_subset( - self.group_id, self.pipeline_manager.pipelines) - transport = messaging.get_transport() - for pipe in partitioned: - LOG.debug(_('Pipeline endpoint: %s'), pipe.name) - listener = messaging.get_notification_listener( - transport, - [oslo.messaging.Target( - topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], - [pipeline.PipelineEndpoint(self.ctxt, pipe)]) - listener.start() - self.pipeline_listeners.append(listener) + self.pipeline_listeners = [] + partitioned = self.partition_coordinator.extract_my_subset( + self.group_id, self.pipeline_manager.pipelines) + transport = messaging.get_transport() + for pipe in partitioned: + LOG.debug(_('Pipeline endpoint: %s'), pipe.name) + listener = messaging.get_notification_listener( + transport, + [oslo.messaging.Target( + topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], + [pipeline.PipelineEndpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): self.partition_coordinator.leave_group(self.group_id) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index e15e03381..9cd9c109d 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -262,3 +262,10 @@ class TestRealNotificationHA(BaseRealNotification): fake_coord1.extract_my_subset.side_effect = lambda x, y: y fake_coord.return_value = fake_coord1 self._check_notification_service() + + def test_reset_listeners_on_refresh(self): + self.srv.start() + self.assertEqual(1, len(self.srv.pipeline_listeners)) + self.srv._refresh_agent(None) + self.assertEqual(1, len(self.srv.pipeline_listeners)) + self.srv.stop()