#
# Copyright 2012-2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import socket

import msgpack
import oslo.messaging
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import units

from ceilometer import dispatcher
from ceilometer.event.storage import models
from ceilometer import messaging
from ceilometer.i18n import _, _LE
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import utils

# Options registered under the [collector] configuration group.
OPTS = [
    cfg.StrOpt('udp_address',
               default='0.0.0.0',
               help='Address to which the UDP socket is bound. Set to '
               'an empty string to disable.'),
    cfg.IntOpt('udp_port',
               default=4952,
               help='Port to which the UDP socket is bound.'),
    cfg.BoolOpt('requeue_sample_on_dispatcher_error',
                default=False,
                help='Requeue the sample on the collector sample queue '
                'when the collector fails to dispatch it. This is only valid '
                'if the sample come from the notifier publisher.'),
    cfg.BoolOpt('requeue_event_on_dispatcher_error',
                default=False,
                help='Requeue the event on the collector event queue '
                'when the collector fails to dispatch it.'),
]

cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
                    group='notification')


LOG = log.getLogger(__name__)


class CollectorService(os_service.Service):
    """Listener for the collector service.

    Receives samples over UDP, RPC and the notification bus (and events
    over the notification bus) and hands them to the loaded dispatchers.
    """

    # Class-level defaults so that stop() does not raise AttributeError when
    # start() never ran or failed before assigning these attributes.
    rpc_server = None
    sample_listener = None
    event_listener = None
    udp_run = False

    def start(self):
        """Load the dispatchers and start every configured listener.

        The UDP receiver is started when ``[collector] udp_address`` is
        non-empty.  The RPC server and the sample notification listener are
        started when a messaging transport is available; the event listener
        is additionally started when ``[notification] store_events`` is set.
        """
        # ensure dispatcher is configured before starting other services
        self.dispatcher_manager = dispatcher.load_dispatcher_manager()
        self.rpc_server = None
        self.sample_listener = None
        self.event_listener = None
        super(CollectorService, self).start()

        if cfg.CONF.collector.udp_address:
            self.tg.add_thread(self.start_udp)

        transport = messaging.get_transport(optional=True)
        if transport:
            # This service object is itself the RPC endpoint; see
            # record_metering_data() below.
            self.rpc_server = messaging.get_rpc_server(
                transport, cfg.CONF.publisher_rpc.metering_topic, self)

            sample_target = oslo.messaging.Target(
                topic=cfg.CONF.publisher_notifier.metering_topic)
            self.sample_listener = messaging.get_notification_listener(
                transport, [sample_target],
                [SampleEndpoint(self.dispatcher_manager)],
                allow_requeue=(cfg.CONF.collector.
                               requeue_sample_on_dispatcher_error))

            if cfg.CONF.notification.store_events:
                event_target = oslo.messaging.Target(
                    topic=cfg.CONF.publisher_notifier.event_topic)
                self.event_listener = messaging.get_notification_listener(
                    transport, [event_target],
                    [EventEndpoint(self.dispatcher_manager)],
                    allow_requeue=(cfg.CONF.collector.
                                   requeue_event_on_dispatcher_error))
                self.event_listener.start()

            self.rpc_server.start()
            self.sample_listener.start()

            if not cfg.CONF.collector.udp_address:
                # Add a dummy thread to have wait() working
                self.tg.add_timer(604800, lambda: None)

    def start_udp(self):
        """Bind the UDP socket and dispatch every datagram received.

        Loops until ``self.udp_run`` is cleared (see stop()).  Each datagram
        is decoded with msgpack and passed to the dispatchers'
        ``record_metering_data``; decoding and dispatching failures are
        logged and the loop carries on with the next datagram.  The socket
        is closed on every exit path, including an exception raised by
        ``bind`` or ``recvfrom``.
        """
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((cfg.CONF.collector.udp_address,
                      cfg.CONF.collector.udp_port))

            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    # NOTE(review): the ``encoding`` keyword is deprecated in
                    # newer msgpack releases and removed in msgpack 1.0
                    # (``raw=False`` replaces it) -- confirm against the
                    # msgpack version this project pins before changing.
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    LOG.warning(_("UDP: Cannot decode data sent by %s"),
                                source)
                else:
                    try:
                        LOG.debug(_("UDP: Storing %s"), sample)
                        self.dispatcher_manager.map_method(
                            'record_metering_data', sample)
                    except Exception:
                        LOG.exception(_("UDP: Unable to store meter"))
        finally:
            # Release the descriptor however the loop ended.
            udp.close()

    def stop(self):
        """Stop the UDP loop, the RPC server and the notification listeners.

        Only the components that were actually created by start() are
        stopped; the parent service is stopped last.
        """
        self.udp_run = False
        if self.rpc_server:
            self.rpc_server.stop()
        if self.sample_listener:
            utils.kill_listeners([self.sample_listener])
        if self.event_listener:
            utils.kill_listeners([self.event_listener])
        super(CollectorService, self).stop()

    def record_metering_data(self, context, data):
        """RPC endpoint for messages we send to ourselves.

        When the notification messages are re-published through the
        RPC publisher, this method receives them for processing.

        :param context: RPC request context (unused here).
        :param data: metering data forwarded to every dispatcher.
        """
        self.dispatcher_manager.map_method('record_metering_data', data=data)


class CollectorEndpoint(object):
    """Base notification endpoint forwarding payloads to the dispatchers.

    Subclasses must define ``method`` (the dispatcher method to call) and
    ``ep_type`` (a label used in log messages).
    """

    def __init__(self, dispatcher_manager, requeue_on_error):
        """Store the dispatcher manager and the requeue policy.

        :param dispatcher_manager: manager whose ``map_method`` is called
                                   for every notification.
        :param requeue_on_error: when true, a dispatcher failure requeues
                                 the notification instead of raising.
        """
        self.dispatcher_manager = dispatcher_manager
        self.requeue_on_error = requeue_on_error

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """RPC endpoint for notification messages.

        When another service sends a notification over the message bus,
        this method receives it.

        :param payload: data handed to the dispatchers' ``self.method``.
        :returns: ``oslo.messaging.NotificationResult.REQUEUE`` when the
                  dispatch failed and requeueing is enabled, otherwise
                  ``None``.
        :raises: whatever the dispatchers raised, when requeueing is
                 disabled.
        """
        try:
            self.dispatcher_manager.map_method(self.method, payload)
        except Exception:
            if self.requeue_on_error:
                LOG.exception(_LE("Dispatcher failed to handle the %s, "
                                  "requeue it."), self.ep_type)
                return oslo.messaging.NotificationResult.REQUEUE
            raise


class SampleEndpoint(CollectorEndpoint):
    """Notification endpoint recording metering samples."""

    method = 'record_metering_data'
    ep_type = 'sample'

    def __init__(self, dispatcher_manager):
        """Use ``requeue_sample_on_dispatcher_error`` as requeue policy."""
        super(SampleEndpoint, self).__init__(
            dispatcher_manager,
            cfg.CONF.collector.requeue_sample_on_dispatcher_error)


class EventEndpoint(CollectorEndpoint):
    """Notification endpoint recording events."""

    method = 'record_events'
    ep_type = 'event'

    def __init__(self, dispatcher_manager):
        """Use ``requeue_event_on_dispatcher_error`` as requeue policy."""
        super(EventEndpoint, self).__init__(
            dispatcher_manager,
            cfg.CONF.collector.requeue_event_on_dispatcher_error)

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Convert the serialized events and dispatch them.

        Every item of ``payload`` is turned into a ``models.Event`` built
        from its ``message_id``, ``event_type``, ``generated`` and
        ``traits`` entries (plus the optional ``raw`` one).  An item that
        cannot be converted is logged and dropped; the remaining events are
        passed to the parent implementation, whose result is returned.
        """
        events = []
        for ev in payload:
            try:
                events.append(
                    models.Event(
                        message_id=ev['message_id'],
                        event_type=ev['event_type'],
                        generated=timeutils.normalize_time(
                            timeutils.parse_isotime(ev['generated'])),
                        traits=[models.Trait(
                                name, dtype,
                                models.Trait.convert_value(dtype, value))
                                for name, dtype, value in ev['traits']],
                        raw=ev.get('raw', {}))
                )
            except Exception:
                LOG.exception(_LE("Error processing event and it will be "
                                  "dropped: %s"), ev)
        return super(EventEndpoint, self).sample(
            ctxt, publisher_id, event_type, events, metadata)