Multiple dispatcher enablement

Ceilometer does not allow multiple dispatchers to be configured.
With this implementation, a deployment can be configured to have
multiple dispatchers to direct the meters to database and other
outlet.

blueprint multi-dispatcher-enablement

Change-Id: I68c731f65d198d4fa1220f75752f242e74355dfe
This commit is contained in:
Tong Li 2013-07-17 09:59:31 -04:00
parent 962c6a9533
commit 70226205fd
12 changed files with 493 additions and 119 deletions

View File

@ -0,0 +1,31 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 IBM
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Base(object):
__metaclass__ = abc.ABCMeta
def __init__(self, conf):
self.conf = conf
@abc.abstractmethod
def record_metering_data(self, context, data):
"""Recording metering data interface."""

View File

@ -0,0 +1,72 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer import storage
from ceilometer.collector import dispatcher
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import rpc as publisher_rpc
LOG = log.getLogger(__name__)
class DatabaseDispatcher(dispatcher.Base):
'''Dispatcher class for recording metering data into database.
The dispatcher class which records each meter into a database configured
in ceilometer configuration file.
To enable this dispatcher, the following section needs to be present in
ceilometer.conf file
dispatchers = database
'''
def __init__(self, conf):
super(DatabaseDispatcher, self).__init__(conf)
self.storage_conn = storage.get_connection(conf)
def record_metering_data(self, context, data):
# We may have receive only one counter on the wire
if not isinstance(data, list):
data = [data]
for meter in data:
LOG.debug('metering data %s for %s @ %s: %s',
meter['counter_name'],
meter['resource_id'],
meter.get('timestamp', 'NO TIMESTAMP'),
meter['counter_volume'])
if publisher_rpc.verify_signature(
meter,
self.conf.publisher_rpc.metering_secret):
try:
# Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting
# that value to something they can store.
if meter.get('timestamp'):
ts = timeutils.parse_isotime(meter['timestamp'])
meter['timestamp'] = timeutils.normalize_time(ts)
self.storage_conn.record_metering_data(meter)
except Exception as err:
LOG.error('Failed to record metering data: %s', err)
LOG.exception(err)
else:
LOG.warning(
'message signature invalid, discarding message: %r',
meter)

View File

@ -0,0 +1,81 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import logging.handlers
from oslo.config import cfg
from ceilometer.collector import dispatcher
file_dispatcher_opts = [
cfg.StrOpt('file_path',
default=None,
help='Name and the location of the file to record '
'meters.'),
cfg.IntOpt('max_bytes',
default=0,
help='The max size of the file'),
cfg.IntOpt('backup_count',
default=0,
help='The max number of the files to keep'),
]
cfg.CONF.register_opts(file_dispatcher_opts, group="dispatcher_file")
class FileDispatcher(dispatcher.Base):
'''Dispatcher class for recording metering data to a file.
The dispatcher class which logs each meter into a file configured in
ceilometer configuration file. An example configuration may look like the
following:
[dispatcher_file]
file_path = /tmp/meters
To enable this dispatcher, the following section needs to be present in
ceilometer.conf file
[collector]
dispatchers = file
'''
def __init__(self, conf):
super(FileDispatcher, self).__init__(conf)
self.log = None
# if the directory and path are configured, then log to the file
if self.conf.dispatcher_file.file_path:
dispatcher_logger = logging.Logger('dispather.file')
dispatcher_logger.setLevel(logging.INFO)
# create rotating file handler which logs meters
rfh = logging.handlers.RotatingFileHandler(
self.conf.dispatcher_file.file_path,
maxBytes=self.conf.dispatcher_file.max_bytes,
backupCount=self.conf.dispatcher_file.backup_count,
encoding='utf8')
rfh.setLevel(logging.INFO)
# Only wanted the meters to be saved in the file, not the
# project root logger.
dispatcher_logger.propagate = False
dispatcher_logger.addHandler(rfh)
self.log = dispatcher_logger
def record_metering_data(self, context, data):
if self.log:
self.log.info(data)

View File

@ -20,8 +20,8 @@ import msgpack
from oslo.config import cfg
import socket
from stevedore import extension
from stevedore import named
from ceilometer.publisher import rpc as publisher_rpc
from ceilometer.service import prepare_service
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
@ -30,7 +30,6 @@ from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import storage
@ -51,6 +50,9 @@ OPTS = [
cfg.BoolOpt('store_events',
default=False,
help='Save event details'),
cfg.MultiStrOpt('dispatcher',
default=['database'],
help='dispatcher to process metering data'),
]
cfg.CONF.register_opts(OPTS, group="collector")
@ -108,10 +110,10 @@ def udp_collector():
class CollectorService(rpc_service.Service):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def __init__(self, host, topic, manager=None):
super(CollectorService, self).__init__(host, topic, manager)
self.storage_conn = storage.get_connection(cfg.CONF)
def start(self):
super(CollectorService, self).start()
@ -140,6 +142,18 @@ class CollectorService(rpc_service.Service):
self.COLLECTOR_NAMESPACE)
self.notification_manager.map(self._setup_subscription)
# Load all configured dispatchers
self.dispatchers = []
for dispatcher in named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.collector.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF]):
if dispatcher.obj:
self.dispatchers.append(dispatcher.obj)
LOG.info('dispatchers loaded %s' % str(self.dispatchers))
# Set ourselves up as a separate worker for the metering data,
# since the default for service is to use create_consumer().
self.conn.create_worker(
@ -168,6 +182,10 @@ class CollectorService(rpc_service.Service):
LOG.exception('Could not join consumer pool %s/%s' %
(topic, exchange_topic.exchange))
def record_metering_data(self, context, data):
for dispatcher in self.dispatchers:
dispatcher.record_metering_data(context, data)
def process_notification(self, notification):
"""Make a notification processed by an handler."""
LOG.debug('notification %r', notification.get('event_type'))
@ -236,39 +254,6 @@ class CollectorService(rpc_service.Service):
# FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification)))
def record_metering_data(self, context, data):
"""This method is triggered when metering data is
cast from an agent.
"""
# We may have receive only one counter on the wire
if not isinstance(data, list):
data = [data]
for meter in data:
LOG.debug('metering data %s for %s @ %s: %s',
meter['counter_name'],
meter['resource_id'],
meter.get('timestamp', 'NO TIMESTAMP'),
meter['counter_volume'])
if publisher_rpc.verify_signature(
meter,
cfg.CONF.publisher_rpc.metering_secret):
try:
# Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting
# that value to something they can store.
if meter.get('timestamp'):
ts = timeutils.parse_isotime(meter['timestamp'])
meter['timestamp'] = timeutils.normalize_time(ts)
self.storage_conn.record_metering_data(meter)
except Exception as err:
LOG.error('Failed to record metering data: %s', err)
LOG.exception(err)
else:
LOG.warning(
'message signature invalid, discarding message: %r',
meter)
def collector():
prepare_service()

View File

@ -247,6 +247,7 @@ rpc_conn_pool_size 30 Size of RPC c
rpc_response_timeout 60 Seconds to wait for a response from call or multicall
rpc_cast_timeout 30 Seconds to wait before a cast expires (TTL).
Only supported by impl_zmq.
dispatchers database The list of dispatchers to process metering data.
=========================== ==================================== ==============================================================
A sample configuration file can be found in `ceilometer.conf.sample`_.

View File

@ -375,3 +375,46 @@ Configuring keystone to work with API
default port value for ceilometer API is 8777. If the port value
has been customized, adjust accordingly.
Use multiple dispatchers
========================
.. index::
double: installing; multiple dispatchers
.. note::
The Ceilometer collector allows multiple dispatchers to be configured so that
metering data can be easily sent to multiple internal and external systems.
Ceilometer by default only saves metering data in a database, to allow
Ceilometer to send metering data to other systems in addition to the
database, multiple dispatchers can be developed and enabled by modifying
Ceilometer configuration file.
Ceilometer ships two dispatchers currently. One is called database
dispatcher, and the other is called file dispatcher. As the names imply,
database dispatcher basically sends metering data to a database driver,
eventually metering data will be saved in database. File dispatcher sends
metering data into a file. The location, name, size of the file can be
configured in ceilometer configuration file. These two dispatchers are
shipped in the Ceilometer egg and defined in the entry_points as follows:
[ceilometer.dispatcher]
file = ceilometer.collector.dispatcher.file:FileDispatcher
database = ceilometer.collector.dispatcher.database:DatabaseDispatcher
To use both dispatchers on a Ceilometer collector service, add the following
line in file ceilometer.conf
[collector]
dispatcher=database
dispatcher=file
If there is no dispatcher present, database dispatcher is used as the
default. If in some cases such as traffic tests, no dispatcher is needed,
one can configure the line like the following:
dispatcher=
With above configuration, no dispatcher is used by the Ceilometer collector
service, all metering data received by Ceilometer collector will be dropped.

View File

@ -597,6 +597,23 @@
#os_endpoint_type=publicURL
[dispatcher_file]
#
# Options defined in ceilometer.collector.dispatcher.file
#
# Name and the location of the file to record meters. (string
# value)
#file_path=<None>
# The max size of the file (integer value)
#max_bytes=0
# The max number of the files to keep (integer value)
#backup_count=0
[collector]
#
@ -617,6 +634,9 @@
# Save event details (boolean value)
#store_events=false
# dispatcher to process metering data (multi valued)
#dispatcher=database
[matchmaker_ring]
@ -644,4 +664,4 @@
#password=<None>
# Total option count: 123
# Total option count: 127

View File

@ -119,6 +119,10 @@ console_scripts =
ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm
ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier
ceilometer.dispatcher =
database = ceilometer.collector.dispatcher.database:DatabaseDispatcher
file = ceilometer.collector.dispatcher.file:FileDispatcher
[build_sphinx]
all_files = 1
build-dir = doc/build

View File

View File

@ -0,0 +1,114 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/collector/dispatcher/database.py
"""
from oslo.config import cfg
from datetime import datetime
from ceilometer.collector.dispatcher import database
from ceilometer.publisher import rpc
from ceilometer.tests import base as tests_base
from ceilometer.storage import base
class TestDispatcherDB(tests_base.TestCase):
def setUp(self):
super(TestDispatcherDB, self).setUp()
self.dispatcher = database.DatabaseDispatcher(cfg.CONF)
self.ctx = None
def test_valid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection)
self.dispatcher.storage_conn.record_metering_data(msg)
self.mox.ReplayAll()
self.dispatcher.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
def test_invalid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = 'invalid-signature'
class ErrorConnection:
called = False
def record_metering_data(self, data):
self.called = True
self.dispatcher.storage_conn = ErrorConnection()
self.dispatcher.record_metering_data(self.ctx, msg)
assert not self.dispatcher.storage_conn.called, \
'Should not have called the storage connection'
def test_timestamp_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'timestamp': '2012-07-02T13:53:40Z',
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
expected = {}
expected.update(msg)
expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40)
self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection)
self.dispatcher.storage_conn.record_metering_data(expected)
self.mox.ReplayAll()
self.dispatcher.record_metering_data(self.ctx, msg)
def test_timestamp_tzinfo_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'timestamp': '2012-09-30T15:31:50.262-08:00',
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
expected = {}
expected.update(msg)
expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000)
self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection)
self.dispatcher.storage_conn.record_metering_data(expected)
self.mox.ReplayAll()
self.dispatcher.record_metering_data(self.ctx, msg)

View File

@ -0,0 +1,105 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/collector/dispatcher/file.py
"""
import os
import tempfile
import logging.handlers
from oslo.config import cfg
from ceilometer.collector.dispatcher import file
from ceilometer.publisher import rpc
from ceilometer.tests import base as tests_base
class TestDispatcherFile(tests_base.TestCase):
def setUp(self):
super(TestDispatcherFile, self).setUp()
def test_file_dispatcher_with_all_config(self):
# Create a temporaryFile to get a file name
tf = tempfile.NamedTemporaryFile('r')
filename = tf.name
tf.close()
cfg.CONF.dispatcher_file.file_path = filename
cfg.CONF.dispatcher_file.max_bytes = 50
cfg.CONF.dispatcher_file.backup_count = 5
dispatcher = file.FileDispatcher(cfg.CONF)
# The number of the handlers should be 1
self.assertEqual(1, len(dispatcher.log.handlers))
# The handler should be RotatingFileHandler
handler = dispatcher.log.handlers[0]
self.assertTrue(isinstance(handler,
logging.handlers.RotatingFileHandler))
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
# The record_metering_data method should exist and not produce errors.
dispatcher.record_metering_data(None, msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))
def test_file_dispatcher_with_path_only(self):
# Create a temporaryFile to get a file name
tf = tempfile.NamedTemporaryFile('r')
filename = tf.name
tf.close()
cfg.CONF.dispatcher_file.file_path = filename
cfg.CONF.dispatcher_file.max_bytes = None
cfg.CONF.dispatcher_file.backup_count = None
dispatcher = file.FileDispatcher(cfg.CONF)
# The number of the handlers should be 1
self.assertEqual(1, len(dispatcher.log.handlers))
# The handler should be RotatingFileHandler
handler = dispatcher.log.handlers[0]
self.assertTrue(isinstance(handler,
logging.FileHandler))
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
# The record_metering_data method should exist and not produce errors.
dispatcher.record_metering_data(None, msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))
def test_file_dispatcher_with_no_path(self):
cfg.CONF.dispatcher_file.file_path = None
dispatcher = file.FileDispatcher(cfg.CONF)
# The log should be None
self.assertIsNone(dispatcher.log)

View File

@ -30,7 +30,6 @@ from stevedore.tests import manager as test_manager
from ceilometer import counter
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import rpc
from ceilometer.collector import service
from ceilometer.storage import base
from ceilometer.tests import base as tests_base
@ -184,87 +183,6 @@ class TestCollectorService(TestCollector):
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
def test_valid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(msg)
self.mox.ReplayAll()
self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
def test_invalid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
}
msg['message_signature'] = 'invalid-signature'
class ErrorConnection:
called = False
def record_metering_data(self, data):
self.called = True
self.srv.storage_conn = ErrorConnection()
self.srv.record_metering_data(self.ctx, msg)
assert not self.srv.storage_conn.called, \
'Should not have called the storage connection'
def test_timestamp_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'timestamp': '2012-07-02T13:53:40Z',
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
expected = {}
expected.update(msg)
expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected)
self.mox.ReplayAll()
self.srv.record_metering_data(self.ctx, msg)
def test_timestamp_tzinfo_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'timestamp': '2012-09-30T15:31:50.262-08:00',
}
msg['message_signature'] = rpc.compute_signature(
msg,
cfg.CONF.publisher_rpc.metering_secret,
)
expected = {}
expected.update(msg)
expected['timestamp'] = datetime.datetime(2012, 9, 30,
23, 31, 50, 262000)
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected)
self.mox.ReplayAll()
self.srv.record_metering_data(self.ctx, msg)
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never