Add collectd_gnocchi support

- Create a new gnocchi dir for the collectd-gnocchi plugin.
- Get gnocchi endpoint from keystone
- Add new _base_url format
- Create metrics if they don't already exist
- Add instructions to doc/source/devstackGSG.rst

Change-Id: Id7ce8130cb22f33147b7f031cd65564375db10d6
This commit is contained in:
Emma Foley 2016-09-22 19:02:29 +00:00
parent 4f1d999665
commit cfcdbfdfd1
8 changed files with 856 additions and 0 deletions

View File

View File

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Gnocchi collectd plugin"""
import logging
try:
# pylint: disable=import-error
import collectd
# pylint: enable=import-error
except ImportError:
collectd = None # when running unit tests collectd is not avaliable
import collectd_ceilometer
from collectd_ceilometer.common.logger import CollectdLogHandler
from collectd_ceilometer.common.meters import MeterStorage
from collectd_ceilometer.common.settings import Config
from collectd_ceilometer.gnocchi.writer import Writer
LOGGER = logging.getLogger(__name__)
ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__)
def register_plugin(collectd):
"Bind plugin hooks to collectd and viceversa"
config = Config.instance()
# Setup loggging
log_handler = CollectdLogHandler(collectd=collectd)
log_handler.cfg = config
ROOT_LOGGER.addHandler(log_handler)
ROOT_LOGGER.setLevel(logging.NOTSET)
# Creates collectd plugin instance
instance = Plugin(collectd=collectd, config=config)
# Register plugin callbacks
collectd.register_init(instance.init)
collectd.register_config(instance.config)
collectd.register_write(instance.write)
collectd.register_shutdown(instance.shutdown)
class Plugin(object):
"""Gnocchi plugin with collectd callbacks"""
# NOTE: this is multithreaded class
def __init__(self, collectd, config):
self._config = config
self._meters = MeterStorage(collectd=collectd)
self._writer = Writer(self._meters, config=config)
def config(self, cfg):
"""Configuration callback
@param cfg configuration node provided by collectd
"""
# pylint: disable=no-self-use
self._config.read(cfg)
def init(self):
"""Initialization callback"""
collectd.info('Initializing the collectd OpenStack python plugin')
self._meters = MeterStorage(collectd=collectd)
# self._writer = Writer(self._meters)
def write(self, vl, data=None):
"""Collectd write callback"""
self._writer.write(vl, data)
def shutdown(self):
"""Shutdown callback"""
LOGGER.info("SHUTDOWN")
self._writer.flush()
if collectd:
register_plugin(collectd=collectd)

View File

@ -0,0 +1,242 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Gnocchi collectd plugin implementation"""
from __future__ import division
from __future__ import unicode_literals
import collectd_ceilometer
from collectd_ceilometer.common.keystone_light import ClientV3
from collectd_ceilometer.common.keystone_light import KeystoneException
from collectd_ceilometer.common.settings import Config
import json
import logging
import requests
from requests.exceptions import RequestException
import six
import threading
LOGGER = logging.getLogger(__name__)
ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__)
# HTTP status codes
HTTP_CREATED = 201
HTTP_UNAUTHORIZED = 401
HTTP_NOT_FOUND = 404
class Sender(object):
"""Sends the JSON serialized data to Gnocchi"""
def __init__(self):
"""Create the Sender instance
The cofinguration must be initialized before the object is created.
"""
self._url_base = None
self._keystone = None
self._auth_token = None
self._auth_lock = threading.Lock()
self._failed_auth = False
self._meter_ids = {}
def _authenticate(self):
"""Authenticate and renew the authentication token"""
# if auth_token is available, just return it
if self._auth_token is not None:
return self._auth_token
# aquire the authentication lock
with self._auth_lock:
# re-check the auth_token as another thread could set it
if self._auth_token is not None:
return self._auth_token
LOGGER.debug('Authenticating request')
# pylint: disable=broad-except
try:
# create a keystone client if it doesn't exist
if self._keystone is None:
cfg = Config.instance()
self._keystone = ClientV3(
auth_url=cfg.OS_AUTH_URL,
username=cfg.OS_USERNAME,
password=cfg.OS_PASSWORD,
tenant_name=cfg.OS_TENANT_NAME
)
# store the authentication token
self._auth_token = self._keystone.auth_token
# get the uri of service endpoint
endpoint = self._get_endpoint("gnocchi")
self._url_base = "{}/v1/metric/%s/measures".format(endpoint)
LOGGER.info('Authenticating request - success')
self._failed_auth = False
except KeystoneException as exc:
log_level = logging.DEBUG
if not self._failed_auth:
log_level = logging.ERROR
LOGGER.error(
'Suspending error logs until successful auth'
)
LOGGER.log(log_level, 'Authentication error: %s',
six.text_type(exc),
exc_info=0)
if exc.response:
LOGGER.debug('Response: %s', exc.response)
self._auth_token = None
self._failed_auth = True
return self._auth_token
def send(self, metername, payload, unit):
"""Send the payload to Gnocchi"""
# get the auth_token
auth_token = self._authenticate()
LOGGER.info('Auth_token: %s',
auth_token,
)
# if auth_token is not set, there is nothing to do
if auth_token is None:
LOGGER.debug('Unable to send data. Not authenticated')
return
if self._url_base is None:
LOGGER.debug(
'Unable to send data. Missing endpoint from ident server')
return
# create request URL
metric_id = self._get_metric_id(metername, unit)
url = self._url_base % (metric_id)
# send the POST request
result = self._perform_request(url, payload, auth_token)
if result is None:
return
LOGGER.info('Result: %s %s',
six.text_type(result.status_code),
result.text)
# if the request failed due to an auth error
if result.status_code == HTTP_UNAUTHORIZED:
# reset the auth token in order to force the subsequent
# _authenticate() call to renew it
# Here, it can happen that the token is reset right after
# another thread has finished the authentication and thus
# the authentication may be performed twice
self._auth_token = None
# renew the authentication token
auth_token = self._authenticate()
if auth_token is not None:
# and try to repost
result = self._perform_request(url, payload, auth_token)
if result.status_code == HTTP_NOT_FOUND:
LOGGER.debug("Received 404 error when submitting %s sample, \
creating a new metric",
metername)
# create metric (endpoint, metername)
metric_id = self._get_metric_id(metername, unit)
LOGGER.info('metername: %s, meter_id: %s', metername, metric_id)
# Set a new url for the request
url = self._url_base % (metric_id)
# TODO(emma-l-foley): Add error checking
# Submit the sample
result = self._perform_request(url, payload, auth_token)
if result.status_code == HTTP_CREATED:
LOGGER.debug('Result: %s', HTTP_CREATED)
else:
LOGGER.info('Result: %s %s',
result.status_code,
result.text)
def _get_endpoint(self, service):
# get the uri of service endpoint
endpoint = self._keystone.get_service_endpoint(
service,
Config.instance().CEILOMETER_URL_TYPE)
return endpoint
def _get_metric_id(self, metername, unit):
try:
return self._meter_ids[metername]
except KeyError as ke:
LOGGER.warn(ke)
LOGGER.warn('No known ID for %s', metername)
endpoint = self._get_endpoint("gnocchi")
self._meter_ids[metername] = \
self._create_metric(metername, endpoint, unit)
return self._meter_ids[metername]
def _create_metric(self, metername, endpoint, unit):
url = "{}/v1/metric/".format(endpoint)
payload = json.dumps({"archive_policy_name": "high",
"name": metername,
"unit": unit,
})
result = self._perform_request(url, payload, self._auth_token)
metric_id = json.loads(result.text)['id']
LOGGER.debug("metric_id=%s", metric_id)
return metric_id
@classmethod
def _perform_request(cls, url, payload, auth_token):
"""Perform the POST request"""
LOGGER.debug('Performing request to %s', url)
# request headers
headers = {'X-Auth-Token': auth_token,
'Content-type': 'application/json'}
# perform request and return its result
response = None
try:
LOGGER.debug(
"Performing request to: %s with data=%s and headers=%s",
url, payload, headers)
response = requests.post(
url, data=payload, headers=headers,
timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.))
LOGGER.info('Response: %s: %s',
response.status_code, response.text
)
except RequestException as exc:
LOGGER.error('gnocchi request error: %s', six.text_type(exc))
finally:
LOGGER.debug('Returning response from _perform_request(): %s',
response.status_code)
return response

View File

@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Gnocchi collectd plugin implementation"""
from __future__ import unicode_literals
from collectd_ceilometer.gnocchi.sender import Sender
from collections import defaultdict
from collections import namedtuple
import datetime
import json
import logging
import six
import threading
LOGGER = logging.getLogger(__name__)
class Sample(namedtuple('Sample', ['value', 'timestamp', 'meta',
'unit', 'metername'])):
"""Sample data"""
def to_payload(self):
"""Return a payload dictionary"""
return {
'value': self.value,
'timestamp': self.timestamp,
}
class SampleContainer(object):
"""Sample storage"""
def __init__(self):
self._lock = threading.Lock()
self._data = defaultdict(list)
def add(self, key, samples, limit):
"""Store list of samples under the key
Store the list of samples under the given key. If numer of stored
samples is greater than the given limit, all the samples are returned
and the stored samples are dropped. Otherwise None is returned.
@param key key of the samples
@param samples list of samples
@param limit sample list limit
"""
with self._lock:
current = self._data[key]
current += samples
if len(current) >= limit:
self._data[key] = []
return current
return None
def reset(self):
"""Reset stored samples
Returns all samples and removes them from the container.
"""
with self._lock:
retval = self._data
self._data = defaultdict(list)
return retval
class Writer(object):
"""Data collector"""
def __init__(self, meters, config):
self._meters = meters
self._samples = SampleContainer()
self._sender = Sender()
self._config = config
def write(self, vl, data):
"""Collect data from collectd
example of vl: collectd.Values(type='vmpage_action',
type_instance='interleave_hit',plugin='numa',plugin_instance='node0',
host='localhost',time=1443092594.625033,interval=10.0,values=[21383])
"""
# take the plugin (specialized or default) for parsing the data
plugin = self._meters.get(vl.plugin)
# prepare all data related to the sample
metername = plugin.meter_name(vl)
unit = plugin.unit(vl)
timestamp = datetime.datetime.utcfromtimestamp(vl.time).isoformat()
LOGGER.debug(
'Writing: plugin="%s", metername="%s"', vl.plugin, metername)
# store sample for every value
data = [
Sample(
value=value, timestamp=timestamp, meta=vl.meta,
unit=unit, metername=metername
)
for value in vl.values
]
# add data to cache and get the samples to send
to_send = self._samples.add(metername, data,
self._config.BATCH_SIZE)
if to_send:
self._send_data(metername, to_send, unit)
def flush(self):
"""Flush all pending samples"""
# get all stored samples
to_send = self._samples.reset()
# send all cached samples
for key, samples in six.iteritems(to_send):
if samples:
self._send_data(key, samples)
def _send_data(self, metername, to_send, unit=None):
"""Send data to gnocchi"""
LOGGER.debug('Sending %d samples of %s',
len(to_send), metername)
# gnocchi samples
payload = json.dumps([sample.to_payload() for sample in to_send])
self._sender.send(metername, payload, unit)

View File

@ -0,0 +1,365 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2015 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Plugin tests"""
import logging
import mock
import requests
import unittest
from collectd_ceilometer.common.keystone_light import KeystoneException
from collectd_ceilometer.gnocchi import plugin
from collectd_ceilometer.gnocchi import sender
from collectd_ceilometer.tests import match
Logger = logging.getLoggerClass()
def mock_collectd(**kwargs):
"Returns collecd module with collecd logging hooks."
return mock.patch(
__name__ + '.' + MockedCollectd.__name__, specs=True,
get_dataset=mock.MagicMock(side_effect=Exception), **kwargs)
class MockedCollectd(object):
"Mocked collectd module specifications."
def debug(self, record):
"Hook for debug messages"
def info(self, record):
"Hook for info messages"
def warning(self, record):
"Hook for warning messages"
def error(self, record):
"Hook for error messages"
def register_init(self, hook):
"Register an hook for init."
def register_config(self, hook):
"Register an hook for config."
def register_write(self, hook):
"Register an hook for write."
def register_shutdown(self, hook):
"Register an hook for shutdown."
def get_dataset(self, s):
"Gets a dataset."
def mock_config(BATCH_SIZE=1, **kwargs):
"Returns collecd module with collecd logging hooks."
return mock.patch(
__name__ + '.' + MockedConfig.__name__, specs=True,
BATCH_SIZE=BATCH_SIZE, **kwargs)
class MockedConfig(object):
"Mocked config class."
BATCH_SIZE = 1
def mock_value(
host='localhost', plugin='cpu', plugin_instance='0',
_type='freq', type_instance=None, time=123456789, values=(1234,),
**kwargs):
"""Create a mock value"""
return mock.patch(
__name__ + '.' + MockedValue.__name__, specs=True,
host=host, plugin=plugin, plugin_instance=plugin_instance, type=_type,
type_instance=type_instance, time=time, values=list(values), meta=None,
**kwargs)
class MockedValue(object):
"""Value used for testing"""
host = 'localhost'
plugin = None
plugin_instance = None
type = None
type_instance = None
time = 123456789
values = []
meta = None
class TestPlugin(unittest.TestCase):
"""Test the collectd plugin"""
@mock.patch.object(plugin, 'Plugin', autospec=True)
@mock.patch.object(plugin, 'Config', autospec=True)
@mock.patch.object(plugin, 'CollectdLogHandler', autospec=True)
@mock.patch.object(plugin, 'ROOT_LOGGER', autospec=True)
@mock_collectd()
def test_callbacks(
self, collectd, ROOT_LOGGER, CollectdLogHandler, Config, Plugin):
"""Verify that the callbacks are registered properly"""
# When plugin function is called
plugin.register_plugin(collectd=collectd)
# Logger handler is set up
ROOT_LOGGER.addHandler.assert_called_once_with(
CollectdLogHandler.return_value)
ROOT_LOGGER.setLevel.assert_called_once_with(logging.NOTSET)
# It create a plugin
Plugin.assert_called_once_with(
collectd=collectd, config=Config.instance.return_value)
# callbacks are registered to collectd
instance = Plugin.return_value
collectd.register_config.assert_called_once_with(instance.config)
collectd.register_write.assert_called_once_with(instance.write)
collectd.register_shutdown.assert_called_once_with(instance.shutdown)
@mock.patch.object(sender.Sender, '_get_metric_id', autospec=True)
@mock.patch.object(requests, 'post', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock_collectd()
@mock_config(BATCH_SIZE=2)
@mock_value()
def test_write(self, data, config, collectd, ClientV3, post, get_metric_id):
"""Test collectd data writing"""
auth_client = ClientV3.return_value
auth_client.get_service_endpoint.return_value = \
'https://test-gnocchi.tld'
post.return_value.status_code = sender.HTTP_CREATED
post.return_value.text = 'Created'
get_metric_id.return_value = 'my-metric-id'
# init instance
instance = plugin.Plugin(collectd=collectd, config=config)
# write the first value
instance.write(data)
collectd.error.assert_not_called()
# no value has been sent to ceilometer
post.assert_not_called()
# send the second value
instance.write(data)
collectd.error.assert_not_called()
# authentication client has been created
ClientV3.assert_called_once()
# and values has been sent
post.assert_called_once_with(
'https://test-gnocchi.tld' +
'/v1/metric/my-metric-id/measures',
data=match.json([{
"value": 1234,
"timestamp": "1973-11-29T21:33:09",
}, {
"value": 1234,
"timestamp": "1973-11-29T21:33:09",
}]),
headers={'Content-type': 'application/json',
'X-Auth-Token': auth_client.auth_token},
timeout=1.0)
# reset post method
post.reset_mock()
# write another values
instance.write(data)
collectd.error.assert_not_called()
# nothing has been sent
post.assert_not_called()
# call shutdown
instance.shutdown()
# no errors
collectd.error.assert_not_called()
# previously written value has been sent
post.assert_called_once_with(
'https://test-gnocchi.tld' +
'/v1/metric/my-metric-id/measures',
data=match.json([{
"value": 1234,
"timestamp": "1973-11-29T21:33:09",
}]),
headers={
'Content-type': 'application/json',
'X-Auth-Token': auth_client.auth_token},
timeout=1.0)
@mock.patch.object(requests, 'post', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock.patch.object(sender, 'LOGGER', autospec=True)
@mock_collectd()
@mock_config()
@mock_value()
def test_write_auth_failed(
self, data, config, collectd, LOGGER, ClientV3, post):
"""Test authentication failure"""
# tell the auth client to rise an exception
ClientV3.side_effect = KeystoneException(
"Missing name 'xxx' in received services",
"exception",
"services list")
# init instance
instance = plugin.Plugin(collectd=collectd, config=config)
# write the value
instance.write(data)
LOGGER.error.assert_called_once_with(
"Suspending error logs until successful auth")
LOGGER.log.assert_called_once_with(
logging.ERROR, "Authentication error: %s",
"Missing name 'xxx' in received services\nReason: exception",
exc_info=0)
# no requests method has been called
post.assert_not_called()
@mock.patch.object(sender.Sender, '_perform_request', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock_collectd()
@mock_config()
@mock_value()
def test_request_error(
self, data, config, collectd, ClientV3, perf_req):
"""Test error raised by underlying requests module"""
# tell POST request to raise an exception
perf_req.side_effect = requests.RequestException('Test POST exception')
# ieit instance
instance = plugin.Plugin(collectd=collectd, config=config)
# write the value
self.assertRaises(requests.RequestException, instance.write, data)
@mock.patch.object(sender.Sender, '_get_metric_id', autospec=True)
@mock.patch.object(requests, 'post', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock_collectd()
@mock_config()
@mock_value()
def test_reauthentication(self, data, config, collectd,
ClientV3, post, get_metric_id):
"""Test re-authentication"""
# init instance
instance = plugin.Plugin(collectd=collectd, config=config)
# the sender used by the instance
get_metric_id.return_value = 'my-metric-id'
# response returned on success
response_ok = requests.Response()
response_ok.status_code = requests.codes["OK"]
# response returned on failure
response_unauthorized = requests.Response()
response_unauthorized.status_code = requests.codes["UNAUTHORIZED"]
post.return_value = response_ok
client = ClientV3.return_value
client.auth_token = 'Test auth token'
# write the value
instance.write(data)
# verify the auth token
post.assert_called_once_with(
mock.ANY, data=mock.ANY,
headers={u'Content-type': mock.ANY,
u'X-Auth-Token': 'Test auth token'},
timeout=1.0)
# POST response is unauthorized -> new token needs to be acquired
post.side_effect = [response_unauthorized, response_ok]
# set a new auth token
client.auth_token = 'New test auth token'
instance.write(data)
# verify the auth token:
call_list = post.call_args_list
# POST called three times
self.assertEqual(len(call_list), 3)
# the second call contains the old token
token = call_list[1][1]['headers']['X-Auth-Token']
self.assertEqual(token, 'Test auth token')
# the third call contains the new token
token = call_list[2][1]['headers']['X-Auth-Token']
self.assertEqual(token, 'New test auth token')
@mock.patch.object(requests, 'post', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock.patch.object(plugin, 'Writer', autospec=True)
@mock.patch.object(plugin, 'LOGGER', autospec=True)
@mock_collectd()
@mock_config()
@mock_value()
def test_exception_value_error(self, data, config, collectd,
LOGGER, Writer, ClientV3, post):
"""Test exception raised during write and shutdown"""
writer = Writer.return_value
writer.write.side_effect = ValueError('Test write error')
# init instance
instance = plugin.Plugin(collectd=collectd, config=config)
self.assertRaises(ValueError, instance.write, data)
@mock.patch.object(requests, 'post', spec=callable)
@mock.patch.object(sender, 'ClientV3', autospec=True)
@mock.patch.object(plugin, 'Writer', autospec=True)
@mock.patch.object(plugin, 'LOGGER', autospec=True)
@mock_collectd()
@mock_config()
@mock_value()
def test_exception_runtime_error(self, data, config, collectd,
LOGGER, Writer, ClientV3, post):
"""Test exception raised during write and shutdown"""
writer = Writer.return_value
writer.flush.side_effect = RuntimeError('Test shutdown error')
# init instance
instance = plugin.Plugin(collectd=collectd, config=config)
self.assertRaises(RuntimeError, instance.shutdown)

View File

@ -77,3 +77,21 @@ To enable this feature execute the following instructions:
and seperate each meter and its unit with a comma, as shown below.
| COLLECTD_CUSTOM_UNITS="<meter> <unit>,<meter> <unit>"
Gnocchi
=======
To deploy with gnocchi using devstack, add the following to you local.conf:
enable_plugin collectd-ceilometer-plugin http://github.com/openstack/collectd-ceilometer-plugin
COLLECTD_INSTALL=True
COLLECTD_CONF_DIR=/etc/collectd/collectd.conf.d/
# GNOCCHI
enable_plugin gnocchi https://github.com/openstack/gnocchi master
enable_service gnocchi-api,gnocchi-metricd,gnocchi-statsd
GNOCCHI_USE_KEYSTONE=True
Once deployment is complete, edit collectd-ceilometer-plugin.conf to point at
the collectd_ceiloemter.gnocchi.plugin module.

View File

@ -24,6 +24,7 @@ test_suite=collectd_ceilometer.tests
[files]
packages =
collectd_ceilometer
collectd_gnocchi
[build_sphinx]
source-dir = doc/source