diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 033f1f0a2..9da8b9603 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -168,6 +168,16 @@ class CollectorService(rpc_service.Service): ) def _setup_subscription(self, ext, *args, **kwds): + """Connect to message bus to get notifications + + Configure the RPC connection to listen for messages on the + right exchanges and topics so we receive all of the + notifications. + + Use a connection pool so that multiple collector instances can + run in parallel to share load and without competing with each + other for incoming messages. + """ handler = ext.obj ack_on_error = cfg.CONF.collector.ack_on_event_error LOG.debug('Event types from %s: %s (ack_on_error=%s)', @@ -188,12 +198,22 @@ class CollectorService(rpc_service.Service): (topic, exchange_topic.exchange)) def record_metering_data(self, context, data): + """RPC endpoint for messages we send to ourself + + When the notification messages are re-published through the + RPC publisher, this method receives them for processing. + """ self.dispatcher_manager.map(self._record_metering_data_for_ext, context=context, data=data) def process_notification(self, notification): - """Make a notification processed by an handler.""" + """RPC endpoint for notification messages + + When another service sends a notification over the message + bus, this method receives it. See _setup_subscription(). + + """ LOG.debug('notification %r', notification.get('event_type')) self.notification_manager.map(self._process_notification_for_ext, notification=notification) @@ -250,9 +270,22 @@ class CollectorService(rpc_service.Service): @staticmethod def _record_metering_data_for_ext(ext, context, data): + """Wrapper for calling dispatcher plugin when a sample arrives + + When a message is received by record_metering_data(), it calls + this method with each plugin to allow it to process the data. + + """ ext.obj.record_metering_data(context, data) def _process_notification_for_ext(self, ext, notification): + """Wrapper for calling pipelines when a notification arrives + + When a message is received by process_notification(), it calls + this method with each notification plugin to allow all the + plugins process the notification. + + """ with self.pipeline_manager.publisher(context.get_admin_context()) as p: # FIXME(dhellmann): Spawn green thread? p(list(ext.obj.to_samples(notification)))