# -*- encoding: utf-8 -*- # # Copyright © 2012 eNovance # # Author: Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from oslo.config import cfg from stevedore import dispatch from ceilometer.collector import meter as meter_api from ceilometer import extension_manager from ceilometer import pipeline from ceilometer import service from ceilometer import storage from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher # Import rpc_notifier to register `notification_topics` flag so that # plugins can use it # FIXME(dhellmann): Use option importing feature of oslo.config instead. import ceilometer.openstack.common.notifier.rpc_notifier OPTS = [ cfg.ListOpt('disabled_notification_listeners', default=[], help='list of listener plugins to disable', ), ] cfg.CONF.register_opts(OPTS) LOG = log.getLogger(__name__) class CollectorService(service.PeriodicService): COLLECTOR_NAMESPACE = 'ceilometer.collector' def start(self): super(CollectorService, self).start() storage.register_opts(cfg.CONF) self.storage_engine = storage.get_engine(cfg.CONF) self.storage_conn = self.storage_engine.get_connection(cfg.CONF) def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' LOG.debug('initialize_service_hooks') publisher_manager = dispatch.NameDispatchExtensionManager( namespace=pipeline.PUBLISHER_NAMESPACE, check_func=lambda x: True, invoke_on_load=True, ) self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) LOG.debug('loading notification handlers from %s', self.COLLECTOR_NAMESPACE) self.notification_manager = \ extension_manager.ActivatedExtensionManager( namespace=self.COLLECTOR_NAMESPACE, disabled_names=cfg.CONF.disabled_notification_listeners, ) if not list(self.notification_manager): LOG.warning('Failed to load any notification handlers for %s', self.COLLECTOR_NAMESPACE) self.notification_manager.map(self._setup_subscription) # Set ourselves up as a separate worker for the metering data, # since the default for service is to use create_consumer(). self.conn.create_worker( cfg.CONF.metering_topic, rpc_dispatcher.RpcDispatcher([self]), 'ceilometer.collector.' + cfg.CONF.metering_topic, ) def _setup_subscription(self, ext, *args, **kwds): handler = ext.obj LOG.debug('Event types from %s: %s', ext.name, ', '.join(handler.get_event_types())) for exchange_topic in handler.get_exchange_topics(cfg.CONF): for topic in exchange_topic.topics: try: self.conn.join_consumer_pool( callback=self.process_notification, pool_name='ceilometer.notifications', topic=topic, exchange_name=exchange_topic.exchange, ) except Exception: LOG.exception('Could not join consumer pool %s/%s' % (topic, exchange_topic.exchange)) def process_notification(self, notification): """Make a notification processed by an handler.""" LOG.debug('notification %r', notification.get('event_type')) self.notification_manager.map(self._process_notification_for_ext, notification=notification, ) def _process_notification_for_ext(self, ext, notification): handler = ext.obj if notification['event_type'] in handler.get_event_types(): ctxt = context.get_admin_context() with self.pipeline_manager.publisher(ctxt, cfg.CONF.counter_source) as p: # FIXME(dhellmann): Spawn green thread? p(list(handler.process_notification(notification))) def record_metering_data(self, context, data): """This method is triggered when metering data is cast from an agent. """ # We may have receive only one counter on the wire if not isinstance(data, list): data = [data] for meter in data: LOG.info('metering data %s for %s @ %s: %s', meter['counter_name'], meter['resource_id'], meter.get('timestamp', 'NO TIMESTAMP'), meter['counter_volume']) if meter_api.verify_signature(meter, cfg.CONF.metering_secret): try: # Convert the timestamp to a datetime instance. # Storage engines are responsible for converting # that value to something they can store. if meter.get('timestamp'): ts = timeutils.parse_isotime(meter['timestamp']) meter['timestamp'] = timeutils.normalize_time(ts) self.storage_conn.record_metering_data(meter) except Exception as err: LOG.error('Failed to record metering data: %s', err) LOG.exception(err) else: LOG.warning( 'message signature invalid, discarding message: %r', meter) def periodic_tasks(self, context): pass