From a00f360f8fad0199df8f9ec1313885354e15f1f6 Mon Sep 17 00:00:00 2001 From: Emma Foley Date: Mon, 14 Dec 2015 15:21:56 +0000 Subject: [PATCH] Implement plugin Add functionality to the plugin: - Convert from collectd data sources to Ceilometer format - Add unit mappings for Ceilometer - Define resource IDs - Add unit tests Change-Id: Ica1f49ea3c9bbc4bc857044dea7da39869b33bba --- MANIFEST.in | 4 + .../__init__.py | 2 +- collectd_ceilometer/keystone_light.py | 183 +++++++++ collectd_ceilometer/logger.py | 48 +++ .../meters/__init__.py | 11 +- collectd_ceilometer/meters/base.py | 44 +++ collectd_ceilometer/meters/libvirt.py | 48 +++ collectd_ceilometer/meters/storage.py | 43 +++ collectd_ceilometer/plugin.py | 87 +++++ collectd_ceilometer/sender.py | 175 +++++++++ collectd_ceilometer/settings.py | 173 +++++++++ collectd_ceilometer/singleton.py | 40 ++ .../tests/__init__.py | 0 collectd_ceilometer/tests/base.py | 179 +++++++++ .../tests/test_collectd_ceilometer.py | 27 ++ .../tests/test_collectd_ceilometer_plugin.py | 6 +- collectd_ceilometer/tests/test_config.py | 176 +++++++++ .../tests/test_keystone_light.py | 222 +++++++++++ collectd_ceilometer/tests/test_plugin.py | 364 ++++++++++++++++++ collectd_ceilometer/units.py | 203 ++++++++++ collectd_ceilometer/writer.py | 147 +++++++ doc/source/installation.rst | 12 + doc/source/usage.rst | 11 + setup.cfg | 12 +- test-requirements.txt | 1 + tox.ini | 2 +- 26 files changed, 2201 insertions(+), 19 deletions(-) rename {collectd_ceilometer_plugin => collectd_ceilometer}/__init__.py (93%) create mode 100644 collectd_ceilometer/keystone_light.py create mode 100644 collectd_ceilometer/logger.py rename collectd_ceilometer_plugin/tests/base.py => collectd_ceilometer/meters/__init__.py (72%) create mode 100644 collectd_ceilometer/meters/base.py create mode 100644 collectd_ceilometer/meters/libvirt.py create mode 100644 collectd_ceilometer/meters/storage.py create mode 100644 collectd_ceilometer/plugin.py create mode 100644 collectd_ceilometer/sender.py create mode 100644 collectd_ceilometer/settings.py create mode 100644 collectd_ceilometer/singleton.py rename {collectd_ceilometer_plugin => collectd_ceilometer}/tests/__init__.py (100%) create mode 100644 collectd_ceilometer/tests/base.py create mode 100644 collectd_ceilometer/tests/test_collectd_ceilometer.py rename {collectd_ceilometer_plugin => collectd_ceilometer}/tests/test_collectd_ceilometer_plugin.py (84%) create mode 100644 collectd_ceilometer/tests/test_config.py create mode 100644 collectd_ceilometer/tests/test_keystone_light.py create mode 100644 collectd_ceilometer/tests/test_plugin.py create mode 100644 collectd_ceilometer/units.py create mode 100644 collectd_ceilometer/writer.py create mode 100644 doc/source/installation.rst create mode 100644 doc/source/usage.rst diff --git a/MANIFEST.in b/MANIFEST.in index c978a52..52ac2f0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,9 @@ include AUTHORS +include README include ChangeLog +include requirements.txt test-requirements.txt +include tox.ini +include example.conf exclude .gitignore exclude .gitreview diff --git a/collectd_ceilometer_plugin/__init__.py b/collectd_ceilometer/__init__.py similarity index 93% rename from collectd_ceilometer_plugin/__init__.py rename to collectd_ceilometer/__init__.py index 07cc44f..5120983 100644 --- a/collectd_ceilometer_plugin/__init__.py +++ b/collectd_ceilometer/__init__.py @@ -11,7 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - +"""Collectd Ceilometer plugin implementation""" import pbr.version diff --git a/collectd_ceilometer/keystone_light.py b/collectd_ceilometer/keystone_light.py new file mode 100644 index 0000000..e9c5183 --- /dev/null +++ b/collectd_ceilometer/keystone_light.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" Lightweight (keystone) client for the OpenStack Identity API """ + +import requests + + +class KeystoneException(Exception): + def __init__(self, message, exc=None, response=None): + if exc: + message += "\nReason: %s" % exc + super(KeystoneException, self).__init__(message) + + self.response = response + self.exception = exc + + +class InvalidResponse(KeystoneException): + def __init__(self, exc, response): + super(InvalidResponse, self).__init__( + "Invalid response from ident", exc, response) + + +class MissingServices(KeystoneException): + def __init__(self, message, exc, response): + super(MissingServices, self).__init__( + "MissingServices: " + message, exc, response) + + +class ClientV2(object): + """Light weight client for the OpenStack Identity API V2. + + :param string username: Username for authentication. (optional) + :param string password: Password for authentication. + :param string tenant_name: Tenant name. (optional) + :param string auth_url: Keystone service endpoint for authorization. + + """ + + def __init__(self, auth_url, username, password, tenant_name): + """Initialize a new client""" + + self.auth_url = auth_url + self.username = username + self.password = password + self.tenant_name = tenant_name + self._auth_token = None + self._services = None + + @property + def auth_token(self): + """Return token string usable for X-Auth-Token """ + # actualize token + self.refresh() + return self._auth_token + + @property + def services(self): + """Return list of services retrieved from identity server """ + return self._services + + def _get_auth_data(self, headers=None): + """Prepare auth data for request """ + + auth = {'password': self.password} + + if self.username: + auth['username'] = self.username + + return {'passwordCredentials': auth} + + def _request_identity_data(self): + """Will send (POST) and retrieve data from identity server """ + + headers = {'Accept': 'application/json'} + url = self.auth_url.rstrip('/') + '/tokens' + params = {'auth': self._get_auth_data(headers)} + + if self.tenant_name: + params['auth']['tenantName'] = self.tenant_name + + resp = requests.post(url, json=params, headers=headers) + try: + resp.raise_for_status() + resp_data = resp.json()['access'] + except (KeyError, ValueError, requests.exceptions.HTTPError) as e: + raise InvalidResponse(e, resp.json()) + + return resp_data + + def refresh(self): + """Refresh token and services list (getting it from identity server) """ + resp_data = self._request_identity_data() + + try: + self._services = resp_data['serviceCatalog'] + token = resp_data['token'] + + self._auth_token = token['id'] + except (TypeError, KeyError, ValueError) as e: + raise InvalidResponse(e, resp_data) + + return resp_data + + def get_service_endpoint(self, name, urlkey="internalURL", region=None): + """Return url endpoint of service + + possible values of urlkey = 'adminURL' | 'publicURL' | 'internalURL' + provide region if more endpoints are available + """ + endpoints = None + + try: + for service in self._services: + if service['name'] == name: + endpoints = service['endpoints'] + break + + if not endpoints: + raise MissingServices("Missing name '%s' in received services" + % name, + None, self._services) + + # preselect default + endpoint = endpoints[0] + + if region: + for ep in endpoints: + if ep['region'] == region: + endpoint = ep + break + + return endpoint[urlkey].rstrip('/') + except (KeyError, ValueError) as e: + raise MissingServices("Missing data in received services", + e, self._services) + +""" +Example of response (part only) +{ + "token": { + "issued_at": "2015-09-04T08:59:09.991646", + "expires": "2015-09-04T09:59:09Z", + "id": "c5bbb1c9a27e470fb482de2a718e08c2", + "tenant": { + "enabled": true, + "description": null, + "name": "service", + "id": "fdeec62f6c794c8dbfda448a83de9ce2" + }, + "audit_ids": [ + "Pig7hVfGQjSuUnt1Hc5mCg" + ] + }, + "serviceCatalog": [ + { + "endpoints_links": [], + "endpoints": [ + { + "adminURL": "http://10.237.214.74:8777/", + "region": "RegionOne", + "publicURL": "http://10.237.214.74:8777/", + "internalURL": "http://10.237.214.74:8777/", + "id": "ac95b1a24a854ec7a4b63b08ed4cbd83" + } + ], + "type": "metering", + "name": "ceilometer" + }, + ], +} +""" diff --git a/collectd_ceilometer/logger.py b/collectd_ceilometer/logger.py new file mode 100644 index 0000000..4356c2f --- /dev/null +++ b/collectd_ceilometer/logger.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin implementation""" + +from __future__ import unicode_literals + +# pylint: disable=import-error +import collectd +# pylint: enable=import-error +from collectd_ceilometer.settings import Config +import logging + + +class CollectdLogHandler(logging.Handler): + """A handler class for collectd plugin""" + + priority_map = { + logging.DEBUG: collectd.debug, + logging.INFO: collectd.info, + logging.WARNING: collectd.warning, + logging.ERROR: collectd.error, + logging.CRITICAL: collectd.error + } + cfg = Config.instance() + + def emit(self, record): + try: + msg = self.format(record) + + logger = self.priority_map.get(record.levelno, collectd.error) + + if self.cfg.VERBOSE and logging.DEBUG == record.levelno: + logger = collectd.info + logger(msg) + + except Exception as e: + collectd.info("Exception in logger %s" % e) diff --git a/collectd_ceilometer_plugin/tests/base.py b/collectd_ceilometer/meters/__init__.py similarity index 72% rename from collectd_ceilometer_plugin/tests/base.py rename to collectd_ceilometer/meters/__init__.py index 1c30cdb..d49220f 100644 --- a/collectd_ceilometer_plugin/tests/base.py +++ b/collectd_ceilometer/meters/__init__.py @@ -1,8 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2010-2011 OpenStack Foundation -# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. -# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -14,10 +11,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +"""Collectd meters""" -from oslotest import base +from __future__ import unicode_literals - -class TestCase(base.BaseTestCase): - - """Test case base class for all unit tests.""" +from collectd_ceilometer.meters.storage import MeterStorage # noqa diff --git a/collectd_ceilometer/meters/base.py b/collectd_ceilometer/meters/base.py new file mode 100644 index 0000000..253d124 --- /dev/null +++ b/collectd_ceilometer/meters/base.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Default collectd meter""" + +from __future__ import unicode_literals + +from collectd_ceilometer.settings import Config + + +class Meter(object): + """Default collectd meter""" + + def meter_name(self, vl): + """Return meter name""" + # pylint: disable=no-self-use + resources = [vl.plugin, vl.type] + return '.'.join([i for i in resources if i]) + + def hostname(self, vl): + """Get host name""" + # pylint: disable=no-self-use + return vl.host + + def resource_id(self, vl): + """Get resource ID""" + + resources = [self.hostname(vl), vl.plugin_instance, vl.type_instance] + return '-'.join([i for i in resources if i]) + + def unit(self, vl): + """Get meter unit""" + # pylint: disable=no-self-use + return Config.instance().unit(vl.plugin, vl.type) diff --git a/collectd_ceilometer/meters/libvirt.py b/collectd_ceilometer/meters/libvirt.py new file mode 100644 index 0000000..9977854 --- /dev/null +++ b/collectd_ceilometer/meters/libvirt.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Collectd meter for libvirt plugin""" + +from __future__ import unicode_literals + +from collectd_ceilometer.meters.base import Meter +from collectd_ceilometer.settings import Config +import libvirt +import threading + + +class LibvirtMeter(Meter): + """Specialization for libvirt plugin""" + + def __init__(self): + self._cache_lock = threading.Lock() + self._conn = None + self._vms = {} + + def hostname(self, vl): + """Get hostname based on the input""" + + hostname = self._vms.get(vl.host) + if not hostname: + with self._cache_lock: + # check again with lock because another thread could + # store the hostname meanwhile + hostname = self._vms.get(vl.host) + if not hostname: + if self._conn is None: + self._conn = libvirt.openReadOnly( + Config.instance().LIBVIRT_CONN_URI) + + hostname = self._conn.lookupByName(vl.host).UUIDString() + self._vms[vl.host] = hostname + return hostname diff --git a/collectd_ceilometer/meters/storage.py b/collectd_ceilometer/meters/storage.py new file mode 100644 index 0000000..adba79b --- /dev/null +++ b/collectd_ceilometer/meters/storage.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Meter storage""" + +from __future__ import unicode_literals + +import six + +from collectd_ceilometer.meters.base import Meter +from collectd_ceilometer.meters.libvirt import LibvirtMeter + + +class MeterStorage(object): + """Meter storage""" + + # all plugins + _classes = { + 'libvirt': LibvirtMeter, + } + + def __init__(self): + self._meters = {} + self._default = Meter() + + # fill dict with specialized meters classes + self._meters = {key: meter_class() + for key, meter_class in six.iteritems(self._classes)} + + def get(self, plugin): + """Get meter for the collectd plugin""" + # return specialized meter class for collectd plugin or default Meter + return self._meters.get(plugin, self._default) diff --git a/collectd_ceilometer/plugin.py b/collectd_ceilometer/plugin.py new file mode 100644 index 0000000..e77b08a --- /dev/null +++ b/collectd_ceilometer/plugin.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin""" + +from __future__ import unicode_literals + +# pylint: disable=import-error +import collectd +# pylint: enable=import-error + +from collectd_ceilometer.logger import CollectdLogHandler +from collectd_ceilometer.meters import MeterStorage +from collectd_ceilometer.settings import Config +from collectd_ceilometer.writer import Writer +import logging + +logging.getLogger().addHandler(CollectdLogHandler()) +logging.getLogger().setLevel(logging.NOTSET) +LOGGER = logging.getLogger(__name__) + + +class Plugin(object): + """Ceilometer plugin with collectd callbacks""" + # NOTE: this is multithreaded class + + def __init__(self): + self._meters = None + self._writer = None + logging.getLogger("requests").setLevel(logging.WARNING) + + def config(self, cfg): + """Configuration callback + + @param cfg configuration node provided by collectd + """ + # pylint: disable=no-self-use + Config.instance().read(cfg) + + def init(self): + """Initialization callback""" + + collectd.info('Initializing the collectd OpenStack python plugin') + self._meters = MeterStorage() + self._writer = Writer(self._meters) + + def write(self, vl, data=None): + """Collectd write callback""" + # pylint: disable=broad-except + # pass arguments to the writer + try: + self._writer.write(vl, data) + except Exception as exc: + if collectd is not None: + collectd.error('Exception during write: %s' % exc) + + def shutdown(self): + """Shutdown callback""" + # pylint: disable=broad-except + collectd.info("SHUTDOWN") + try: + self._writer.flush() + except Exception as exc: + if collectd is not None: + collectd.error('Exception during shutdown: %s' % exc) + + +# The collectd plugin instance +# pylint: disable=invalid-name +instance = Plugin() +# pylint: enable=invalid-name + +# Register plugin callbacks +collectd.register_init(instance.init) +collectd.register_config(instance.config) +collectd.register_write(instance.write) +collectd.register_shutdown(instance.shutdown) diff --git a/collectd_ceilometer/sender.py b/collectd_ceilometer/sender.py new file mode 100644 index 0000000..366872b --- /dev/null +++ b/collectd_ceilometer/sender.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin implementation""" + +from __future__ import division +from __future__ import unicode_literals + +from collectd_ceilometer.keystone_light import ClientV2 as keystoneClientV2 +from collectd_ceilometer.keystone_light import KeystoneException +from collectd_ceilometer.settings import Config +import logging +import requests +from requests.exceptions import RequestException +import six +import threading + + +LOGGER = logging.getLogger(__name__) + +# HTTP status codes +HTTP_CREATED = 201 +HTTP_UNAUTHORIZED = 401 + + +class Sender(object): + """Sends the JSON serialized data to Ceilometer""" + + def __init__(self): + """Create the Sender instance + + The cofinguration must be initialized before the object is created. + """ + self._url_base = None + self._keystone = None + self._auth_token = None + self._auth_lock = threading.Lock() + self._failed_auth = False + + def _authenticate(self): + """Authenticate and renew the authentication token""" + + # if auth_token is available, just return it + if self._auth_token is not None: + return self._auth_token + + # aquire the authentication lock + with self._auth_lock: + # re-check the auth_token as another thread could set it + if self._auth_token is not None: + return self._auth_token + + LOGGER.debug('Authenticating request') + # pylint: disable=broad-except + try: + # create a keystone client if it doesn't exist + if self._keystone is None: + cfg = Config.instance() + self._keystone = keystoneClientV2( + auth_url=cfg.OS_AUTH_URL, + username=cfg.OS_USERNAME, + password=cfg.OS_PASSWORD, + tenant_name=cfg.OS_TENANT_NAME + ) + # store the authentication token + self._auth_token = self._keystone.auth_token + + # get the uri of service endpoint + endpoint = self._keystone.get_service_endpoint( + "ceilometer", + Config.instance().CEILOMETER_URL_TYPE) + + self._url_base = "{}/v2/meters/%s".format(endpoint) + LOGGER.info('Authenticating request - success') + self._failed_auth = False + + except KeystoneException as exc: + log_level = logging.DEBUG + + if not self._failed_auth: + log_level = logging.ERROR + LOGGER.error( + 'Suspending error logs until successful auth' + ) + + LOGGER.log(log_level, 'Authentication error: %s', + six.text_type(exc), + exc_info=0) + + if exc.response: + LOGGER.debug('Response: %s', exc.response) + + self._auth_token = None + self._failed_auth = True + + return self._auth_token + + def send(self, metername, payload): + """Send the payload to Ceilometer""" + + # get the auth_token + auth_token = self._authenticate() + + # if auth_token is not set, there is nothing to do + if auth_token is None: + LOGGER.debug('Unable to send data. Not authenticated') + return + + if self._url_base is None: + LOGGER.debug( + 'Unable to send data. Missing endpoint from ident server') + return + + # create request URL + url = self._url_base % metername + + # send the POST request + result = self._perform_request(url, payload, auth_token) + if not result: + return + + # if the request failed due to an auth error + if result.status_code == HTTP_UNAUTHORIZED: + # reset the auth token in order to force the subsequent + # _authenticate() call to renew it + # Here, it can happen that the token is reset right after + # another thread has finished the authentication and thus + # the authentication may be performed twice + self._auth_token = None + + LOGGER.debug('Result: %s %s', + six.text_type(result.status_code), + result.text) + + # renew the authentication token + auth_token = self._authenticate() + + if auth_token is not None: + # and try to repost + result = self._perform_request(url, payload, auth_token) + + if result.status_code == HTTP_CREATED: + LOGGER.debug('Result: %s', HTTP_CREATED) + else: + LOGGER.info('Result: %s %s', + result.status_code, + result.text) + + @classmethod + def _perform_request(cls, url, payload, auth_token): + """Perform the POST request""" + + LOGGER.debug('Performing request to %s', url) + + # request headers + headers = {'X-Auth-Token': auth_token, + 'Content-type': 'application/json'} + # perform request and return its result + try: + return requests.post( + url, data=payload, headers=headers, + timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.)) + except RequestException as exc: + LOGGER.error('Ceilometer request error: %s', six.text_type(exc)) + return None diff --git a/collectd_ceilometer/settings.py b/collectd_ceilometer/settings.py new file mode 100644 index 0000000..51bc0ae --- /dev/null +++ b/collectd_ceilometer/settings.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin configuration""" + +from __future__ import unicode_literals + +from collectd_ceilometer.singleton import Singleton +from collectd_ceilometer.units import UNITS +from collections import namedtuple +import logging +import six + +LOGGER = logging.getLogger(__name__) + + +class BadConfigError(Exception): + """Configuration exception""" + pass + + +class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])): + """Configuration parameter definition""" + + def value(self, data): + """Convert a string to the parameter type""" + + try: + return self.data_type(data) + except (ValueError, TypeError) as exc: + LOGGER.info('Config value exception: %s', six.text_type(exc)) + raise BadConfigError( + 'Invalid value "%s" for configuration parameter "%s"' % ( + data, self.key)) + + +@Singleton +class Config(object): + """Plugin confguration""" + + _configuration = [ + CfgParam('BATCH_SIZE', 1, int), + CfgParam('OS_AUTH_URL', None, six.text_type), + CfgParam('CEILOMETER_URL_TYPE', 'internalURL', six.text_type), + CfgParam('CEILOMETER_TIMEOUT', 1000, int), + CfgParam('OS_USERNAME', None, six.text_type), + CfgParam('OS_PASSWORD', None, six.text_type), + CfgParam('OS_TENANT_NAME', None, six.text_type), + CfgParam('VERBOSE', False, bool), + + CfgParam('LIBVIRT_CONN_URI', 'qemu:///system', six.text_type), + ] + + _config_dict = {cfg.key: cfg for cfg in _configuration} + _config_keys = _config_dict.keys() + + def __init__(self): + """Set the default values""" + + # init all parameters to default values + for cfg in self._configuration: + setattr(self, cfg.key, cfg.default) + + # dictionary for user-defined units + self._user_units = {} + self._units = UNITS.copy() + + def read(self, cfg): + """Read the collectd configuration + + @param cfg configuration provided by collectd + """ + LOGGER.info('Reading the OS plugin configuration') + assert 'MODULE' == cfg.key.upper() + self._read_node(cfg) + + # verify the configuration + error = False + for key in self._config_keys: + if getattr(self, key, None) is None: + LOGGER.error('Configuration parameter %s not set.', key) + error = True + if error: + LOGGER.error( + 'Collectd plugin for Ceilometer will not work properly') + else: + LOGGER.info('Configuration OK') + + def unit(self, plugin, pltype): + """Get unit for plugin and type""" + + if pltype: + unit = self._units.get('%s.%s' % (plugin, pltype)) + if unit: + return unit + return self._units.get(plugin, "None") + + def _read_node(self, node): + """Read a configuration node + + @param node collectd configuration node + """ + + key = node.key.upper() + + # if the node is 'UNITS' call the units function + if key == 'UNITS': + self._read_units(node.children) + return + + # if we have a root node read only all its children + # as we don't expect any value here + if node.children: + for child in node.children: + self._read_node(child) + return + + # if the configuration key is known + if key in self._config_keys: + try: + # read, normalize and check the value + val = self._config_dict[key].value(node.values[0]) + + # and store it as a attribute of current instance + setattr(self, key, val) + + except (IndexError, TypeError): + # the variable node.value is an empty list + # or it is not a list at all + LOGGER.error('No configuration value found for "%s"', key) + return + except BadConfigError as exc: + LOGGER.error(six.text_type(exc)) + return + + # do not show the password in the logs + if key == 'OS_PASSWORD': + val = '*****' + LOGGER.info( + 'Got configuration parameter: %s -> "%s"', key, val) + else: + LOGGER.error('Unknown configuration parameter "%s"', key) + + def _read_units(self, nodes): + """Read user-defined units + + @param node collectd configuration nodes + """ + for node in nodes: + if node.key.upper() == 'UNIT': + if len(node.values) == 2: + key, val = node.values + self._user_units[key] = val + LOGGER.info( + 'Got user defined unit "%s" for "%s"', val, key) + else: + LOGGER.error( + 'Invalid unit configuration: unit %s' % ' '.join( + ['"%s"' % i for i in node.values])) + else: + LOGGER.error( + 'Invalid unit configuration: %s', node.key.upper()) + self._units.update(self._user_units) diff --git a/collectd_ceilometer/singleton.py b/collectd_ceilometer/singleton.py new file mode 100644 index 0000000..0148c36 --- /dev/null +++ b/collectd_ceilometer/singleton.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Generic singleton implementation""" + +from __future__ import unicode_literals + +import threading + + +class Singleton(object): + """Generic singleton""" + + def __init__(self, decorated): + self._decorated = decorated + self._lock = threading.Lock() + + def __call__(self, *args, **kwargs): + raise TypeError('Singleton must be accessed through instance() method') + + def instance(self): + """Return singleton instance""" + # pylint: disable=attribute-defined-outside-init + try: + return self._instance + except AttributeError: + with self._lock: + if not hasattr(self, '_instance'): + self._instance = self._decorated() + return self._instance diff --git a/collectd_ceilometer_plugin/tests/__init__.py b/collectd_ceilometer/tests/__init__.py similarity index 100% rename from collectd_ceilometer_plugin/tests/__init__.py rename to collectd_ceilometer/tests/__init__.py diff --git a/collectd_ceilometer/tests/base.py b/collectd_ceilometer/tests/base.py new file mode 100644 index 0000000..d5dddec --- /dev/null +++ b/collectd_ceilometer/tests/base.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- + +# Copyright 2010-2011 OpenStack Foundation +# Copyright (c) 2015 Intel Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Unittest tools""" + +from __future__ import unicode_literals + +from collectd_ceilometer.keystone_light import KeystoneException +from collections import OrderedDict +import logging +from mock import Mock +from mock import patch +import six +import unittest + + +class Value(object): + """Value used for testing""" + + def __init__(self): + self.host = 'localhost' + self.plugin = None + self.plugin_instance = None + self.type = None + self.type_instance = None + self.time = 123456789 + self.values = [] + self.meta = None + + def add_value(self, value): + """Add value""" + self.values.append(value) + + +class TestConfig(object): + """Test configuration""" + + default_values = OrderedDict([ + ('BATCH_SIZE', 1,), + ('OS_AUTH_URL', 'https://test-auth.url.tld/test',), + ('CEILOMETER_URL_TYPE', 'internalURL',), + ('CEILOMETER_TIMEOUT', 1000,), + ('OS_USERNAME', 'tester',), + ('OS_PASSWORD', 'testpasswd',), + ('OS_TENANT_NAME', 'service',), + ]) + + def __init__(self): + self._values = self.default_values.copy() + self._units = {} + + def update_value(self, key, value): + """Update the configuration value + + @param key configuration key + @param value configuration value + """ + self._values.update({key: value}) + + def add_unit(self, name, unit): + """Add user defined unit + + @param name name of the plugin + @param unit unit name + """ + self._units.update({name: unit}) + + @property + def node(self): + """Return the master node of current configuration + + Return the configuration node in format readable by config singleton. + """ + nodes = [self._Node(key=key, values=[val]) + for key, val in six.iteritems(self._values)] + units = [self._Node(key='UNIT', values=[key, val]) + for key, val in six.iteritems(self._units)] + if units: + nodes.append(self._Node(key='UNITS', children=units)) + return self._Node(key='MODULE', children=nodes) + + class _Node(object): + """Test configuration node""" + + def __init__(self, children=None, key=None, values=None): + """Create the node + + @param children list of children nodes + @param key configuration key + @param value configuration value + """ + + if children is None: + children = [] + if values is None: + values = [] + + self.children = children + self.key = key + self.values = values + + +class TestCase(unittest.TestCase): + + def __init__(self, *args, **kwargs): + """Declare additional class attributes""" + super(TestCase, self).__init__(*args, **kwargs) + self._patchset = None + self._mocked = {} + + def get_mock(self, module): + """Get module mock""" + return self._mocked.get(module) + + def setUp(self): + """Mock collectd module""" + + super(TestCase, self).setUp() + + modules = ['collectd', 'libvirt', 'requests', + 'collectd_ceilometer.keystone_light'] + + self._mocked = {module: Mock() for module in modules} + + # requests + requests = self.get_mock('requests') + requests.exceptions.RequestException = Exception + self._mocked.update({'requests.exceptions': requests.exceptions}) + + keystone = self.get_mock('collectd_ceilometer.keystone_light') + keystone.KeystoneException = KeystoneException + self._mocked.update( + {'collectd_ceilometer.keystone_light.KeystoneException': + keystone.KeystoneException}) + + self._patchset = patch.dict('sys.modules', self._mocked) + self._patchset.start() + + self.config = TestConfig() + + logging.getLogger().handlers = [] + + def tearDown(self): + """Clean up""" + self._patchset.stop() + + def assertNoError(self): + """Assert no error has been logged""" + collectd = self.get_mock('collectd') + self.assertFalse(collectd.error.called, [collectd.error.call_args_list]) + + def assertError(self, msg): + """Assert an error has been logged""" + + collectd = self.get_mock('collectd') + self.assertTrue(collectd.error.called, + 'Error "%s" expected but not logged' % msg) + self.assertIn(((msg,),), collectd.error.call_args_list) + + def assertErrors(self, errors): + """Assert the list of logged errors""" + + collectd = self.get_mock('collectd') + self.assertTrue(collectd.error.called, 'Errors expected but not logged') + expected = [((i,),) for i in errors] + self.assertEqual(expected, collectd.error.call_args_list) diff --git a/collectd_ceilometer/tests/test_collectd_ceilometer.py b/collectd_ceilometer/tests/test_collectd_ceilometer.py new file mode 100644 index 0000000..4325433 --- /dev/null +++ b/collectd_ceilometer/tests/test_collectd_ceilometer.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +test_collectd_ceilometer +------------------------ + +Tests for `collectd_ceilometer` module. +""" + +from collectd_ceilometer.tests import base + + +class TestCollectdCeilometer(base.TestCase): + def test_something(self): + pass diff --git a/collectd_ceilometer_plugin/tests/test_collectd_ceilometer_plugin.py b/collectd_ceilometer/tests/test_collectd_ceilometer_plugin.py similarity index 84% rename from collectd_ceilometer_plugin/tests/test_collectd_ceilometer_plugin.py rename to collectd_ceilometer/tests/test_collectd_ceilometer_plugin.py index 2b02594..96a2fb5 100644 --- a/collectd_ceilometer_plugin/tests/test_collectd_ceilometer_plugin.py +++ b/collectd_ceilometer/tests/test_collectd_ceilometer_plugin.py @@ -13,13 +13,13 @@ # under the License. """ -test_collectd_ceilometer_plugin +test_collectd_ceilometer ---------------------------------- -Tests for `collectd_ceilometer_plugin` module. +Tests for `collectd_ceilometer` module. """ -from collectd_ceilometer_plugin.tests import base +from collectd_ceilometer.tests import base class TestCollectdCeilometerPlugin(base.TestCase): diff --git a/collectd_ceilometer/tests/test_config.py b/collectd_ceilometer/tests/test_config.py new file mode 100644 index 0000000..6ad0eb2 --- /dev/null +++ b/collectd_ceilometer/tests/test_config.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- + +# Copyright 2010-2011 OpenStack Foundation +# Copyright (c) 2015 Intel Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Configuration tests""" + +from __future__ import unicode_literals + +from collectd_ceilometer.tests.base import TestCase +import mock +import six + + +class TestConfig(TestCase): + """Test configuration reader""" + + def setUp(self): + """Initialization""" + + super(TestConfig, self).setUp() + + # TODO (ema-l-foley) Import at top and mock here + from collectd_ceilometer.settings import Config + self.config_class = Config + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_default_configuration(self, mock_log): + """Test valid configuration""" + cfg = self.config_class.instance() + + # read default configuration + cfg.read(self.config.node) + + # compare the configuration values with the default values + for key in self.config.default_values.keys(): + self.assertEqual(getattr(cfg, key), + self.config.default_values[key]) + + # test configuration change + self.assertEqual(cfg.BATCH_SIZE, 1) + self.config.update_value('BATCH_SIZE', 10) + cfg.read(self.config.node) + self.assertEqual(cfg.BATCH_SIZE, 10) + mock_log.error.assert_not_called() + + def test_singleton(self): + """Test config singleton class + + Verify that the TypeError exception is raised when the instance + of the Config class is created by user. + """ + # pylint: disable=invalid-name,unused-variable + + Config = self.config_class + + with self.assertRaises(TypeError) as exc: + # must rise a TypeError as the singleton class cannot + # be created by the user and can be accessed only + # by its instance() method + new_cfg = Config() # flake8: noqa + + self.assertEqual( + six.text_type(exc.exception), + 'Singleton must be accessed through instance() method') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_invalid_value(self, mock_log): + """Test invalid value + + Test string instead of int + """ + cfg = self.config_class.instance() + self.config.update_value('BATCH_SIZE', 'xyz') + cfg.read(self.config.node) + self.assertEqual(cfg.BATCH_SIZE, 1) + mock_log.error.assert_called_with( + 'Invalid value "xyz" for configuration parameter "BATCH_SIZE"') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_unknown_parameter(self, mock_log): + """Test unknown parameter + + Test configuration parameter which is not known (expected)""" + + cfg = self.config_class.instance() + self.config.update_value('UNKNOWN', 'xyz') + cfg.read(self.config.node) + self.assertFalse(hasattr(cfg, 'UNKNOWN')) + mock_log.error.assert_called_with('Unknown configuration parameter "%s"', 'UNKNOWN') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_missing_value(self, mock_log): + """Test configuration node vithout value""" + + cfg = self.config_class.instance() + + # remove values from some node + node = self.config.node + first = node.children[1] + self.assertEqual(first.key, 'OS_AUTH_URL') + first.values = [] + + cfg.read(node) + + mock_log.error.assert_any_call( + 'No configuration value found for "%s"', "OS_AUTH_URL") + mock_log.error.assert_any_call( + 'Configuration parameter %s not set.', "OS_AUTH_URL") + mock_log.error.assert_any_call( + 'Collectd plugin for Ceilometer will not work properly') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_user_units(self, mock_log): + """Test configuration with user defined units""" + self.config.add_unit('age', 'years') + self.config.add_unit('star.distance', 'LY') + self.config.add_unit('star.temperature', 'K') + + cfg = self.config_class.instance() + cfg.read(self.config.node) + mock_log.error.assert_not_called() + + self.assertEqual(cfg.unit('age', None), 'years') + self.assertEqual(cfg.unit('star', 'distance'), 'LY') + self.assertEqual(cfg.unit('star', 'temperature'), 'K') + self.assertEqual(cfg.unit('monty', None), 'None') + self.assertEqual(cfg.unit('monty', 'python'), 'None') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_user_units_invalid(self, mock_log): + """Test invalid user defined units + + The unit node contains three values (two are expected) + """ + + self.config.add_unit('age', 'years') + + node = self.config.node + unit = node.children[-1].children[0] + unit.values = [1, 2, 3] + + cfg = self.config_class.instance() + cfg.read(node) + + mock_log.error.assert_called_with( + 'Invalid unit configuration: unit "1" "2" "3"') + self.assertEqual(cfg.unit('age', None), 'None') + + @mock.patch('collectd_ceilometer.settings.LOGGER') + def test_user_units_invalid_invalid_node(self, mock_log): + """Test invalid node with units configuration""" + + self.config.add_unit('age', 'years') + + node = self.config.node + unit = node.children[-1].children[0] + unit.key = 'NOT_UNITS' + + cfg = self.config_class.instance() + cfg.read(node) + + mock_log.error.assert_called_with( + 'Invalid unit configuration: %s',"NOT_UNITS") + self.assertEqual(cfg.unit('age', None), 'None') diff --git a/collectd_ceilometer/tests/test_keystone_light.py b/collectd_ceilometer/tests/test_keystone_light.py new file mode 100644 index 0000000..99b9983 --- /dev/null +++ b/collectd_ceilometer/tests/test_keystone_light.py @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- + +# Copyright 2010-2011 OpenStack Foundation +# Copyright (c) 2015 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Plugin tests""" + +from __future__ import unicode_literals + +from collectd_ceilometer import keystone_light +from collectd_ceilometer.keystone_light import ClientV2 +from collectd_ceilometer.keystone_light import MissingServices +import mock +import unittest + + +class KeystoneLightTest(unittest.TestCase): + """Test the keystone light client""" + + def setUp(self): + super(KeystoneLightTest, self).setUp() + + self.test_authtoken = "c5bbb1c9a27e470fb482de2a718e08c2" + self.test_public_endpoint = "http://public_endpoint" + self.test_internal_endpoint = "http://iternal_endpoint" + self.test_region = "RegionOne" + + response = {'access': { + "token": { + "issued_at": "2015-09-04T08:59:09.991646", + "expires": "2015-09-04T09:59:09Z", + "id": self.test_authtoken, + "tenant": { + "enabled": True, + "description": None, + "name": "service", + "id": "fdeec62f6c794c8dbfda448a83de9ce2" + }, + "audit_ids": [ + "Pig7hVfGQjSuUnt1Hc5mCg" + ] + }, + "serviceCatalog": [{ + "endpoints_links": [], + "endpoints": [{ + "adminURL": "http://10.237.214.74:8777/", + "region": self.test_region, + "publicURL": self.test_public_endpoint + '/', + "internalURL": self.test_internal_endpoint, + "id": "ac95b1a24a854ec7a4b63b08ed4cbd83" + }], + "type": "metering", + "name": "ceilometer" + }, ], + }} + + self.mock_response = mock.Mock() + self.mock_response.json.return_value = response + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_refresh(self, mock_post): + """Test refresh""" + + mock_post.return_value = self.mock_response + + client = ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + client.refresh() + + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(client.auth_token, self.test_authtoken) + + expected_args = { + 'headers': {'Accept': 'application/json'}, + 'json': { + 'auth': { + 'tenantName': u'test_tenant', + 'passwordCredentials': { + 'username': u'test_username', + 'password': u'test_password' + } + } + } + } + + self.assertEqual(mock_post.call_args[0], (u'test_auth_url/tokens',)) + self.assertEqual(mock_post.call_args[1], expected_args) + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_getservice_endpoint(self, mock_post): + """Test getservice endpoint""" + + mock_post.return_value = self.mock_response + + client = ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + client.refresh() + + endpoint = client.get_service_endpoint('ceilometer') + self.assertEqual(endpoint, self.test_internal_endpoint) + + endpoint = client.get_service_endpoint('ceilometer', 'publicURL') + self.assertEqual(endpoint, self.test_public_endpoint) + + endpoint = client.get_service_endpoint('ceilometer', 'publicURL', + self.test_region) + self.assertEqual(endpoint, self.test_public_endpoint) + + with self.assertRaises(MissingServices): + client.get_service_endpoint('badname') + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_getservice_endpoint_error(self, mock_post): + """Test getservice endpoint error""" + + response = {'access': { + "token": { + "id": "authtoken", + }, + "serviceCatalog": [{ + "endpoints_links": [], + "endpoints": [], + "type": "metering", + "missingname": "ceilometer" + }, + ], + }} + + self.mock_response = mock.Mock() + self.mock_response.json.return_value = response + + mock_post.return_value = self.mock_response + + client = ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + client.refresh() + + with self.assertRaises(MissingServices): + client.get_service_endpoint('ceilometer') + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_invalidresponse_missing_access(self, mock_post): + """Test invalid response: missing access""" + + response = {'badresponse': None} + + mock_response = mock.Mock() + mock_response.json.return_value = response + mock_post.return_value = mock_response + + client = keystone_light.ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + + with self.assertRaises(keystone_light.InvalidResponse): + client.refresh() + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_invalidresponse_missing_servicecatalog(self, mock_post): + """Test invalid response: missing servicecatalog""" + + response = {'access': { + 'token': None + } + } + + mock_response = mock.Mock() + mock_response.json.return_value = response + mock_post.return_value = mock_response + + client = keystone_light.ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + + with self.assertRaises(keystone_light.InvalidResponse): + client.refresh() + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_invalidresponse_missing_token(self, mock_post): + """Test invalid response: missing token""" + + response = {'access': { + "serviceCatalog": [] + }} + + mock_response = mock.Mock() + mock_response.json.return_value = response + mock_post.return_value = mock_response + + client = keystone_light.ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + + with self.assertRaises(keystone_light.InvalidResponse): + client.refresh() + + @mock.patch('collectd_ceilometer.keystone_light.requests.post') + def test_invalidresponse_missing_id(self, mock_post): + """Test invalid response: missing id""" + + response = {'access': { + "serviceCatalog": [], + "token": None + }, } + + mock_response = mock.Mock() + mock_response.json.return_value = response + mock_post.return_value = mock_response + + client = keystone_light.ClientV2("test_auth_url", "test_username", + "test_password", "test_tenant") + + with self.assertRaises(keystone_light.InvalidResponse): + client.refresh() diff --git a/collectd_ceilometer/tests/test_plugin.py b/collectd_ceilometer/tests/test_plugin.py new file mode 100644 index 0000000..5ef1de0 --- /dev/null +++ b/collectd_ceilometer/tests/test_plugin.py @@ -0,0 +1,364 @@ +# -*- coding: utf-8 -*- + +# Copyright 2010-2011 OpenStack Foundation +# Copyright (c) 2015 Intel Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Plugin tests""" + +from __future__ import unicode_literals + +from collectd_ceilometer.tests.base import TestCase +from collectd_ceilometer.tests.base import Value +from collections import namedtuple +import json +import mock + + +class PluginTest(TestCase): + """Test the collectd plugin""" + + def setUp(self): + super(PluginTest, self).setUp() + client_class \ + = self.get_mock('collectd_ceilometer.keystone_light').ClientV2 + client_class.return_value\ + .get_service_endpoint.return_value = "https://test-ceilometer.tld" + + # TODO(emma-l-foley): Import at top and mock here + from collectd_ceilometer.plugin import instance + from collectd_ceilometer.plugin import Plugin + self.default_instance = instance + self.plugin_instance = Plugin() + self.maxDiff = None + + def test_callbacks(self): + """Verify that the callbacks are registered properly""" + + collectd = self.get_mock('collectd') + + self.assertTrue(collectd.register_init.called) + self.assertTrue(collectd.register_config.called) + self.assertTrue(collectd.register_write.called) + self.assertTrue(collectd.register_shutdown.called) + + def test_write(self): + """Test collectd data writing""" + from collectd_ceilometer.sender import HTTP_CREATED + + requests = self.get_mock('requests') + requests.post.return_value.status_code = HTTP_CREATED + requests.post.return_value.text = 'Created' + + client_class \ + = self.get_mock('collectd_ceilometer.keystone_light').ClientV2 + auth_token = client_class.return_value.auth_token + + # create a value + data = self._create_value() + + # set batch size to 2 and init instance + self.config.update_value('BATCH_SIZE', 2) + self._init_instance() + + # no authentication has been performed so far + self.assertFalse(client_class.called) + + # write first value + self._write_value(data) + + # no value has been sent to ceilometer + self.assertFalse(requests.post.called) + + # send the second value + self._write_value(data) + + # authentication client has been created + self.assertTrue(client_class.called) + self.assertEqual(client_class.call_count, 1) + # and values has been sent + self.assertTrue(requests.post.called) + self.assertEqual(requests.post.call_count, 1) + + expected_args = ('https://test-ceilometer.tld/v2/meters/cpu.freq',) + expected_kwargs = { + 'data': [{ + "source": "collectd", + "counter_name": "cpu.freq", + "counter_unit": "jiffies", + "counter_volume": 1234, + "timestamp": "Thu Nov 29 21:33:09 1973", + "resource_id": "localhost-0", + "resource_metadata": None, + "counter_type": "gauge" + }, { + "source": "collectd", + "counter_name": "cpu.freq", + "counter_unit": "jiffies", + "counter_volume": 1234, + "timestamp": "Thu Nov 29 21:33:09 1973", + "resource_id": "localhost-0", + "resource_metadata": None, + "counter_type": "gauge"}], + 'headers': { + 'Content-type': u'application/json', + 'X-Auth-Token': auth_token}, + 'timeout': 1.0} + + # we cannot compare JSON directly because the original data + # dictionary is unordered + called_kwargs = requests.post.call_args[1] + called_kwargs['data'] = json.loads(called_kwargs['data']) + + # verify data sent to ceilometer + self.assertEqual(requests.post.call_args[0], expected_args) + self.assertEqual(called_kwargs, expected_kwargs) + + # reset post method + requests.post.reset_mock() + + # write another values + self._write_value(data) + # nothing has been sent + self.assertFalse(requests.post.called) + + # call shutdown + self.plugin_instance.shutdown() + self.assertNoError() + # previously written value has been sent + self.assertTrue(requests.post.called) + # no more authentication required + self.assertEqual(client_class.call_count, 1) + + expected_kwargs = { + 'data': [{ + "source": "collectd", + "counter_name": "cpu.freq", + "counter_unit": "jiffies", + "counter_volume": 1234, + "timestamp": "Thu Nov 29 21:33:09 1973", + "resource_id": "localhost-0", + "resource_metadata": None, + "counter_type": "gauge"}], + 'headers': { + 'Content-type': u'application/json', + 'X-Auth-Token': auth_token}, + 'timeout': 1.0} + + # we cannot compare JSON directly because the original data + # dictionary is unordered + called_kwargs = requests.post.call_args[1] + called_kwargs['data'] = json.loads(called_kwargs['data']) + + # verify data sent to ceilometer + self.assertEqual(requests.post.call_args[0], expected_args) + self.assertEqual(called_kwargs, expected_kwargs) + + def test_write_auth_failed(self): + """Test authentication failure""" + + # tell the auth client to rise an exception + client_class \ + = self.get_mock('collectd_ceilometer.keystone_light').ClientV2 + client_class.side_effect = Exception('Test Client() exception') + + # init instance + self._init_instance() + + # write the value + errors = [ + 'Exception during write: Test Client() exception'] + self._write_value(self._create_value(), errors) + + # no requests method has been called + self.assertFalse(self.get_mock('requests').post.called, + "requests method has been called") + + def test_write_auth_failed2(self): + """Test authentication failure2""" + + # tell the auth client to rise an exception + keystone \ + = self.get_mock('collectd_ceilometer.keystone_light') + + client_class = keystone.ClientV2 + client_class.side_effect = keystone.KeystoneException( + "Missing name 'xxx' in received services", + "exception", + "services list") + + # init instance + self._init_instance() + + # write the value + errors = [ + "Suspending error logs until successful auth", + "Authentication error: Missing name 'xxx' in received services" + "\nReason: exception"] + self._write_value(self._create_value(), errors) + + # no requests method has been called + self.assertFalse(self.get_mock('requests').post.called, + "requests method has been called") + + def test_request_error(self): + """Test error raised by underlying requests module""" + + # we have to import the RequestException here as it has been mocked + from requests.exceptions import RequestException + + # tell POST request to raise an exception + requests = self.get_mock('requests') + requests.post.side_effect = RequestException('Test POST exception') + + # init instance + self._init_instance() + + # write the value + self._write_value( + self._create_value(), + ['Ceilometer request error: Test POST exception']) + + def test_reauthentication(self): + """Test re-authentication""" + from collectd_ceilometer.sender import HTTP_UNAUTHORIZED + + requests = self.get_mock('requests') + client_class \ + = self.get_mock('collectd_ceilometer.keystone_light').ClientV2 + client_class.return_value.auth_token = 'Test auth token' + + # init instance + self._init_instance() + + # write the first value + self._write_value(self._create_value()) + + # verify the auth token + call_list = requests.post.call_args_list + self.assertEqual(len(call_list), 1) + # 0 = first call > 1 = call kwargs > headers argument > auth token + token = call_list[0][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'Test auth token') + + # subsequent call of POST method will fail due to the authentication + requests.post.return_value.status_code = HTTP_UNAUTHORIZED + requests.post.return_value.text = 'Unauthorized' + # set a new auth token + client_class.return_value.auth_token = 'New test auth token' + + self._write_value(self._create_value()) + + # verify the auth token + call_list = requests.post.call_args_list + + # POST called three times + self.assertEqual(len(call_list), 3) + # the second call contains the old token + token = call_list[1][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'Test auth token') + # the third call contains the new token + token = call_list[2][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'New test auth token') + + def test_authentication_in_multiple_threads(self): + """Test authentication in muliple threads + + This test simulates the authentication performed from different thread + after the authentication lock has been acquired. The sender is not + authenticated, the lock is acquired, the authentication token exists + (i.e. it has been set by different thread) and it is used. + """ + # pylint: disable=protected-access + + # init plugin instance + self._init_instance() + + # the sender used by the instance + sender = self.plugin_instance._writer._sender + + # create a dummy lock + class DummyLock(namedtuple('LockBase', ['sender', 'token', 'urlbase'])): + """Lock simulation, which sets the auth token when locked""" + + def __enter__(self, *args, **kwargs): + self.sender._auth_token = self.token + self.sender._url_base = self.urlbase + + def __exit__(self, *args, **kwargs): + pass + + # replace the sender's lock by the dummy lock + sender._auth_lock = DummyLock(sender, 'TOKEN', 'URLBASE/%s') + + # write the value + self._write_value(self._create_value()) + + # verify the results + requests = self.get_mock('requests') + client_class \ + = self.get_mock('collectd_ceilometer.keystone_light').ClientV2 + + # client has not been called at all + self.assertFalse(client_class.called) + + # verify the auth token + call_list = requests.post.call_args_list + self.assertEqual(len(call_list), 1) + # 0 = first call > 1 = call kwargs > headers argument > auth token + token = call_list[0][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'TOKEN') + + def test_exceptions(self): + """Test exception raised during write and shutdown""" + + self._init_instance() + + writer = mock.Mock() + writer.flush.side_effect = Exception('Test shutdown error') + writer.write.side_effect = Exception('Test write error') + + # pylint: disable=protected-access + self.plugin_instance._writer = writer + # pylint: enable=protected-access + + self.plugin_instance.write(self._create_value()) + self.plugin_instance.shutdown() + + self.assertErrors([ + 'Exception during write: Test write error', + 'Exception during shutdown: Test shutdown error']) + + @staticmethod + def _create_value(): + """Create a value""" + retval = Value() + retval.plugin = 'cpu' + retval.plugin_instance = '0' + retval.type = 'freq' + retval.add_value(1234) + return retval + + def _init_instance(self): + """Init current plugin instance""" + self.plugin_instance.config(self.config.node) + self.plugin_instance.init() + + def _write_value(self, value, errors=None): + """Write a value and verify result""" + self.plugin_instance.write(value) + if errors is None: + self.assertNoError() + else: + self.assertErrors(errors) diff --git a/collectd_ceilometer/units.py b/collectd_ceilometer/units.py new file mode 100644 index 0000000..629469e --- /dev/null +++ b/collectd_ceilometer/units.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin units definition""" + +from __future__ import unicode_literals + + +# Unit mappings in alphabetical order +UNITS = { + 'apache.apache_idle_workers': 'Workers', + 'apache.apache_bytes': 'B/s', + 'apache.apache_requests': 'Req/s', + 'apache.apache_scoreboard': 'Slots', + 'apache.apache_connections': 'Connections', + + 'apcups.timeleft': 's', + 'apcups.temperature': '°C', + 'apcups.percent': 'Load', + 'apcups.charge': 'Ah', + 'apcups.frequency': 'Hz', + 'apcups.voltage': 'V', + + # 'ascent': 'None', + + 'battery.current': 'A', + 'battery.charge': 'Ah', + 'battery.voltage': 'V', + + # 'bind': 'None', + + 'conntrack': 'Entries', + 'contextswitch': 'Sw./s', + 'cpu': 'jiffies', + 'cpufreq': 'MHz', + + 'dbi': 'Count', + 'dbi.mysql_databasesize': 'B', + 'dbi.pg_db_size': 'MB', + + 'df': 'B', + + 'disk.disk_merged': 'Ops/s', + 'disk.disk_octets': 'B/s', + 'disk.disk_ops': 'Ops/s', + 'disk.disk_time': 's', + + 'dns.dns_opcode': 'Queries/s', + 'dns.dns_qtype': 'Queries/s', + 'dns.dns_octets': 'b/s', + 'dns.dns_rcode': 'Queries/s', + + 'entropy': 'b', + + 'filecount.files': 'Files', + 'filecount.bytes': 'B', + + 'hddtemp': '°C', + + 'interface.if_octets': 'B/s', + 'interface.if_errors': 'Errors/s', + 'interface.if_packets': 'Packets/s', + + 'ipmi.fanspeed': 'RPM', + 'ipmi.temperature': '°C', + 'ipmi.voltage': 'V', + + 'iptables.ipt_bytes': 'B', + 'iptables.ipt_packets': 'Packets', + + 'irq': 'Irq/s', + + 'libvirt.if_octets': 'B/s', + 'libvirt.virt_cpu_total': 'ms', + 'libvirt.disk_octets': 'B/s', + 'libvirt.virt_vcpu': 'ms', + 'libvirt.if_dropped': 'Packets/s', + 'libvirt.if_errors': 'Errors/s', + 'libvirt.if_packets': 'Packets/s', + 'libvirt.disk_ops': 'Ops/s', + + 'load': '', + 'lvm': 'B', + # 'madwifi': 'None', + # 'mbmon': 'None', + 'md': 'Disks', + + 'memcached.memcached_command': 'Commands', + 'memcached.memcached_items': 'Items', + 'memcached.df': 'B', + 'memcached.memcached_ops': 'Commands', + 'memcached.ps_count': 'Threads', + 'memcached.percent': '%', + 'memcached.memcached_connections': 'Connections', + 'memcached.memcached_octets': 'B', + 'memcached.ps_cputime': 'Jiffies', + + 'memory': 'B', + + 'mysql.mysql_commands': 'Commands/s', + 'mysql.mysql_qcache': 'Queries', + 'mysql.mysql_locks': 'locks', + 'mysql.cache_result': 'Queries/s', + 'mysql.total_threads': 'Threads', + 'mysql.mysql_handler': 'Invocations', + 'mysql.threads': 'Threads', + 'mysql.mysql_octets': 'B/s', + 'mysql.mysql_log_position': 'Position', + 'mysql.cache_size': 'Queries', + 'mysql.time_offset': 's', + 'mysql.mysql_threads': 'Threads', + + 'netlink.if_rx_errors': 'Errors/s', + 'netlink.if_octets': 'B/s', + 'netlink.if_multicast': 'Packets/s', + 'netlink.if_dropped': 'Packets/s', + 'netlink.if_errors': 'Errors/s', + 'netlink.if_packets': 'Packets/s', + 'netlink.if_tx_errors': 'Errors/s', + 'netlink.if_collisions': 'Collisions/s', + + 'nfs': 'Calls', + + 'nginx.connections': 'Connections', + 'nginx.nginx_requests': 'Requests/s', + 'nginx.nginx_connections': 'Connections/s', + + 'ntpd': 's', + 'ntpd.frequency_offset': 'ppm', + + 'numa': 'Actions', + + 'nut.timeleft': 's', + 'nut.temperature': '°C', + 'nut.power': 'VA', + 'nut.percent': '%', + 'nut.frequency': 'Hz', + 'nut.voltage': 'V', + + 'openvpn.if_octets': 'B/s', + 'openvpn.users': 'Users', + + # 'pinba': 'None', + 'ping': 'ms', + + 'postgresql.pg_blks': 'Blocks', + 'postgresql.pg_xact': 'Transactions', + 'postgresql.pg_n_tup_g': 'Rows', + 'postgresql.pg_numbackends': 'Backends', + 'postgresql.pg_n_tup_c': 'Rows', + 'postgresql.pg_db_size': 'B', + 'postgresql.pg_scan': 'Scans/Rows', + + 'processes': 'Processes', + 'processes.fork_rate': 'forks/s', + 'processes.ps_cputime': 'Jiffies', + 'processes.ps_disk_octets': 'B/s', + 'processes.ps_disk_ops': 'Ops/s', + 'processes.ps_pagefaults': 'Pagefaults', + 'processes.ps_rss': 'B', + 'processes.ps_vm': 'B', + 'processes.ps_stacksize': 'B', + 'processes.ps_code': 'B', + 'processes.ps_data': 'B', + + 'swap': 'B', + 'swap.swap_io': 'Pages', + + 'tcpconns': 'Connections', + 'thermal': '°C', + 'uptime': 's', + 'users': 'Users', + + 'varnish.total_sessions': 'Sessions', + 'varnish.cache': 'Hits', + 'varnish.cache_result': 'Hits', + 'varnish.connections': 'Hits', + 'varnish.total_threads': 'Thread', + 'varnish.http_requests': 'Operations', + 'varnish.total_bytes': 'B', + 'varnish.threads': 'Thread', + 'varnish.total_requests': 'Requests', + 'varnish.total_operations': 'Operations', + + 'vmem.vmpage_action': 'Actions', + 'vmem.vmpage_faults': 'Faults/s', + 'vmem.vmpage_io': 'Pages/s', + 'vmem.vmpage_number': 'Pages', + + 'wireless.signal_quality': '', + 'wireless.signal_power': 'dBm', + 'wireless.signal_noise': 'dBm', +} diff --git a/collectd_ceilometer/writer.py b/collectd_ceilometer/writer.py new file mode 100644 index 0000000..b2ab2e6 --- /dev/null +++ b/collectd_ceilometer/writer.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Ceilometer collectd plugin implementation""" + +from __future__ import unicode_literals + +from collectd_ceilometer.sender import Sender +from collectd_ceilometer.settings import Config +from collections import defaultdict +from collections import namedtuple +import json +import logging +import six +import threading +import time + +LOGGER = logging.getLogger(__name__) + + +class Sample(namedtuple('Sample', ['value', 'timestamp', 'meta', + 'resource_id', 'unit', 'metername'])): + """Sample data""" + + def to_payload(self): + """Return a payload dictionary""" + return { + 'counter_name': self.metername, + 'counter_type': 'gauge', + 'counter_unit': self.unit, + 'counter_volume': self.value, + 'timestamp': self.timestamp, + 'resource_metadata': self.meta, + 'source': 'collectd', + 'resource_id': self.resource_id, + } + + +class SampleContainer(object): + """Sample storage""" + + def __init__(self): + self._lock = threading.Lock() + self._data = defaultdict(list) + + def add(self, key, samples, limit): + """Store list of samples under the key + + Store the list of samples under the given key. If numer of stored + samples is greater than the given limit, all the samples are returned + and the stored samples are dropped. Otherwise None is returned. + + @param key key of the samples + @param samples list of samples + @param limit sample list limit + """ + with self._lock: + current = self._data[key] + current += samples + if len(current) >= limit: + self._data[key] = [] + return current + return None + + def reset(self): + """Reset stored samples + + Returns all samples and removes them from the container. + """ + with self._lock: + retval = self._data + self._data = defaultdict(list) + return retval + + +class Writer(object): + """Data collector""" + + def __init__(self, meters): + self._meters = meters + self._samples = SampleContainer() + self._sender = Sender() + + def write(self, vl, data): + """Collect data from collectd + + example of vl: collectd.Values(type='vmpage_action', + type_instance='interleave_hit',plugin='numa',plugin_instance='node0', + host='localhost',time=1443092594.625033,interval=10.0,values=[21383]) + """ + + # take the plugin (specialized or default) for parsing the data + plugin = self._meters.get(vl.plugin) + # prepare all data related to the sample + resource_id = plugin.resource_id(vl) + metername = plugin.meter_name(vl) + unit = plugin.unit(vl) + timestamp = time.asctime(time.gmtime(vl.time)) + + LOGGER.debug( + 'Writing: plugin="%s", metername="%s", unit="%s"', + vl.plugin, metername, unit) + + # store sample for every value + data = [ + Sample( + value=value, timestamp=timestamp, meta=vl.meta, + resource_id=resource_id, unit=unit, metername=metername) + for value in vl.values + ] + + # add data to cache and get the samples to send + to_send = self._samples.add(metername, data, + Config.instance().BATCH_SIZE) + if to_send: + self._send_data(metername, to_send) + + def flush(self): + """Flush all pending samples""" + + # get all stored samples + to_send = self._samples.reset() + + # send all cached samples + for key, samples in six.iteritems(to_send): + if samples: + self._send_data(key, samples) + + def _send_data(self, metername, to_send): + """Send data to ceilometer""" + + LOGGER.debug('Sending %d samples of %s', + len(to_send), metername) + + # ceilometer samples + payload = json.dumps([sample.to_payload() for sample in to_send]) + self._sender.send(metername, payload) diff --git a/doc/source/installation.rst b/doc/source/installation.rst new file mode 100644 index 0000000..4da2a3c --- /dev/null +++ b/doc/source/installation.rst @@ -0,0 +1,12 @@ +============ +Installation +============ + +At the command line:: + + $ pip install collectd_ceilmeter + +Or, if you have virtualenvwrapper installed:: + + $ mkvirtualenv collectd_ceilometer + $ pip install collectd_ceilometer diff --git a/doc/source/usage.rst b/doc/source/usage.rst new file mode 100644 index 0000000..29cf52a --- /dev/null +++ b/doc/source/usage.rst @@ -0,0 +1,11 @@ +======== +Usage +======== + +To use collectd-ceilometer-plugin in collectd:: + + Python plugin must be enabled in collectd + + collectd must be started + (ex: systemctl start collectd) + diff --git a/setup.cfg b/setup.cfg index ce1d807..a579726 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,11 +20,11 @@ classifier = Programming Language :: Python :: 3.4 [test] -test_suite=collectd_ceilometer_plugin.tests +test_suite=collectd_ceilometer.tests [files] packages = - collectd_ceilometer_plugin + collectd_ceilometer [build_sphinx] source-dir = doc/source @@ -35,15 +35,15 @@ all_files = 1 upload-dir = doc/build/html [compile_catalog] -directory = collectd-ceilometer-plugin/locale +directory = collectd_ceilometer/locale domain = collectd-ceilometer-plugin [update_catalog] domain = collectd-ceilometer-plugin -output_dir = collectd-ceilometer-plugin/locale -input_file = collectd-ceilometer-plugin/locale/collectd-ceilometer-plugin.pot +output_dir = collectd_ceilometer/locale +input_file = collectd_ceilometer/locale/collectd-ceilometer-plugin.pot [extract_messages] keywords = _ gettext ngettext l_ lazy_gettext mapping_file = babel.cfg -output_file = collectd-ceilometer-plugin/locale/collectd-ceilometer-plugin.pot +output_file = collectd_ceilometer/locale/collectd-ceilometer-plugin.pot diff --git a/test-requirements.txt b/test-requirements.txt index 5f73582..1e4413d 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,3 +16,4 @@ python-subunit>=0.0.18 sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 testscenarios>=0.4 testtools>=1.4.0 + diff --git a/tox.ini b/tox.ini index 9c19ffc..2b768c6 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,7 @@ setenv = VIRTUAL_ENV={envdir} [testenv:pep8] -commands = flake8 +commands = flake8 collectd_ceilometer [testenv:venv] commands = {posargs}