Migrate the rest of the central agent pollsters to use discoveries

Currently, not all the central agent pollsters use discoveries.
In order to enable horizontal scaling, all pollsters must use
discoveries so the resources can be partitioned across the pool
of running central agents.

For pollsters that don't poll specific resources but only gather
general information from a service, we treat the service endpoints
themselves as resources. In the case of Glance, for example, there
is only one resource available to all of its pollsters: the Glance
endpoint. If multiple agents are running, only one of them is
assigned that endpoint, and only that agent polls the Glance API.
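
As a rough, self-contained sketch (hypothetical names, not code from
this change): an endpoint discovery yields the service endpoints as
resources, and hash-based partitioning assigns each one to exactly one
agent. With a single Glance endpoint and two agents, only one agent
ends up polling it:

import hashlib

def discover_endpoints(service_type):
    # Stand-in for a Keystone catalog lookup.
    catalog = {'image': ['http://glance.example.org:9292']}
    return catalog.get(service_type, [])

def assigned_to_me(resource, my_index, pool_size):
    # Hash every resource into one of pool_size buckets; only the agent
    # that owns the bucket polls that resource.
    bucket = int(hashlib.sha1(resource.encode()).hexdigest(), 16) % pool_size
    return bucket == my_index

endpoints = discover_endpoints('image')
for agent in range(2):
    mine = [e for e in endpoints if assigned_to_me(e, agent, 2)]
    print('agent %d polls %s' % (agent, mine))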

DocImpact
Co-Authored-By: Dina Belova <dbelova@mirantis.com>
Closes-Bug: #1364352
Change-Id: I8f3b228db9aacf3a7cc4b719c50013cc30d5aa79
Nejc Saje 2014-08-27 17:14:15 -04:00
parent fcc6b0de4e
commit b6e1c7e5dc
23 changed files with 507 additions and 320 deletions

View File

@ -86,15 +86,22 @@ class PollingTask(object):
agent_resources = self.manager.discover()
with self.publish_context as publisher:
cache = {}
discovery_cache = {}
for pollster in self.pollsters:
key = pollster.name
LOG.info(_("Polling pollster %s"), key)
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
source_resources = list(self.resources[key].resources)
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=source_resources or agent_resources,
resources=(source_resources or
pollster_resources or
agent_resources)
))
publisher(samples)
except Exception as err:
@ -187,9 +194,12 @@ class AgentManager(os_service.Service):
return d.obj
return None
def discover(self, discovery=None):
def discover(self, discovery=None, discovery_cache=None):
resources = []
for url in (discovery or self.default_discovery):
if discovery_cache is not None and url in discovery_cache:
resources.extend(discovery_cache[url])
continue
name, param = self._parse_discoverer(url)
discoverer = self._discoverer(name)
if discoverer:
@ -199,6 +209,8 @@ class AgentManager(os_service.Service):
self._construct_group_id(discoverer.group_id),
discovered)
resources.extend(partitioned)
if discovery_cache is not None:
discovery_cache[url] = partitioned
except Exception as err:
LOG.exception(_('Unable to discover resources: %s') % err)
else:
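
As an aside, a minimal model of the discovery_cache behaviour added above
(the helper names here are hypothetical, not the real manager API): within
one polling cycle each discovery URL is resolved only once, and every
pollster that shares it reuses the partitioned result.

def cached_discover(urls, discovery_cache, run_discovery):
    # run_discovery stands in for the real discoverer call plus partitioning.
    resources = []
    for url in urls:
        if url in discovery_cache:
            resources.extend(discovery_cache[url])
            continue
        discovered = run_discovery(url)
        discovery_cache[url] = discovered
        resources.extend(discovered)
    return resources

calls = []

def fake_discovery(url):
    calls.append(url)
    return ['resource-for-' + url]

cache = {}
cached_discover(['endpoint:image'], cache, fake_discovery)  # first pollster
cached_discover(['endpoint:image'], cache, fake_discovery)  # second pollster
assert calls == ['endpoint:image']  # the discovery was only run once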

View File

@ -0,0 +1,54 @@
#
# Copyright 2014 Red Hat, Inc
#
# Author: Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient.v2_0 import client as ksclient
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _LW
from ceilometer.openstack.common import log
from ceilometer import plugin
LOG = log.getLogger(__name__)
cfg.CONF.import_group('service_credentials', 'ceilometer.service')
class EndpointDiscovery(plugin.DiscoveryBase):
def __init__(self):
super(EndpointDiscovery, self).__init__()
self.keystone = ksclient.Client(
username=cfg.CONF.service_credentials.os_username,
password=cfg.CONF.service_credentials.os_password,
tenant_id=cfg.CONF.service_credentials.os_tenant_id,
tenant_name=cfg.CONF.service_credentials.os_tenant_name,
cacert=cfg.CONF.service_credentials.os_cacert,
auth_url=cfg.CONF.service_credentials.os_auth_url,
region_name=cfg.CONF.service_credentials.os_region_name,
insecure=cfg.CONF.service_credentials.insecure)
def discover(self, param=None):
if not param:
return []
endpoints = self.keystone.service_catalog.get_urls(
service_type=param,
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type,
region_name=cfg.CONF.service_credentials.os_region_name)
if not endpoints:
LOG.warning(_LW('No endpoints found for service %s'), param)
return []
else:
return endpoints
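
A hedged usage sketch for the new EndpointDiscovery plugin (assuming
ceilometer is importable; the Keystone client is replaced with a mock and
the endpoint URL is made up):

import mock

discovery = EndpointDiscovery.__new__(EndpointDiscovery)  # skip real Keystone auth
discovery.keystone = mock.Mock()
discovery.keystone.service_catalog.get_urls.return_value = [
    'http://glance.example.org:9292']

print(discovery.discover(param='image'))  # ['http://glance.example.org:9292']
print(discovery.discover())               # no service type given, returns []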

View File

@ -32,6 +32,11 @@ class ComputePollster(plugin.PollsterBase):
It supports the polling API on the compute node.
"""
@property
def default_discovery(self):
# get resources from agent-default discovery
return None
@abc.abstractmethod
def get_samples(self, manager, cache, resources):
"""Return a sequence of Counter instances from polling the resources.

View File

@ -17,7 +17,6 @@
import datetime
from keystoneclient import exceptions
from oslo.config import cfg
import requests
import six
@ -55,25 +54,27 @@ class KwapiClient(object):
class _Base(plugin.CentralPollster):
"""Base class for the Kwapi pollster, derived from CentralPollster."""
@property
def default_discovery(self):
return 'endpoint:energy'
@staticmethod
def get_kwapi_client(ksclient):
def get_kwapi_client(ksclient, endpoint):
"""Returns a KwapiClient configured with the proper url and token."""
endpoint = ksclient.service_catalog.url_for(
service_type='energy',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
return KwapiClient(endpoint, ksclient.auth_token)
CACHE_KEY_PROBE = 'kwapi.probes'
def _iter_probes(self, ksclient, cache):
def _iter_probes(self, ksclient, cache, endpoint):
"""Iterate over all probes."""
if self.CACHE_KEY_PROBE not in cache:
cache[self.CACHE_KEY_PROBE] = self._get_probes(ksclient)
return iter(cache[self.CACHE_KEY_PROBE])
key = '%s-%s' % (endpoint, self.CACHE_KEY_PROBE)
if key not in cache:
cache[key] = self._get_probes(ksclient, endpoint)
return iter(cache[key])
def _get_probes(self, ksclient):
def _get_probes(self, ksclient, endpoint):
try:
client = self.get_kwapi_client(ksclient)
client = self.get_kwapi_client(ksclient, endpoint)
except exceptions.EndpointNotFound:
LOG.debug(_("Kwapi endpoint not found"))
return []
@ -82,39 +83,39 @@ class _Base(plugin.CentralPollster):
class EnergyPollster(_Base):
"""Measures energy consumption."""
@plugin.check_keystone('energy')
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample(
name='energy',
type=sample.TYPE_CUMULATIVE,
unit='kWh',
volume=probe['kwh'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
for endpoint in resources:
for probe in self._iter_probes(manager.keystone, cache, endpoint):
yield sample.Sample(
name='energy',
type=sample.TYPE_CUMULATIVE,
unit='kWh',
volume=probe['kwh'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
class PowerPollster(_Base):
"""Measures power consumption."""
@plugin.check_keystone('energy')
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample(
name='power',
type=sample.TYPE_GAUGE,
unit='W',
volume=probe['w'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
for endpoint in resources:
for probe in self._iter_probes(manager.keystone, cache, endpoint):
yield sample.Sample(
name='power',
type=sample.TYPE_GAUGE,
unit='W',
volume=probe['w'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)

View File

@ -43,7 +43,11 @@ class HardwarePollster(plugin.CentralPollster):
super(HardwarePollster, self).__init__()
self.inspectors = {}
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'tripleo_overcloud_nodes'
def get_samples(self, manager, cache, resources):
"""Return an iterable of Sample instances from polling the resources.
:param manager: The service manager invoking the plugin

View File

@ -44,12 +44,12 @@ cfg.CONF.register_opts(OPTS)
class _Base(plugin.CentralPollster):
@staticmethod
def get_glance_client(ksclient):
endpoint = ksclient.service_catalog.url_for(
service_type='image',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
@property
def default_discovery(self):
return 'endpoint:image'
@staticmethod
def get_glance_client(ksclient, endpoint):
# hard-code v1 glance API version selection while v2 API matures
service_credentials = cfg.CONF.service_credentials
return glanceclient.Client('1', endpoint,
@ -57,8 +57,8 @@ class _Base(plugin.CentralPollster):
cacert=service_credentials.os_cacert,
insecure=service_credentials.insecure)
def _get_images(self, ksclient):
client = self.get_glance_client(ksclient)
def _get_images(self, ksclient, endpoint):
client = self.get_glance_client(ksclient, endpoint)
page_size = cfg.CONF.glance_page_size
kwargs = {}
if page_size > 0:
@ -88,11 +88,12 @@ class _Base(plugin.CentralPollster):
imageIdSet -= set([image.id])
yield image
def _iter_images(self, ksclient, cache):
def _iter_images(self, ksclient, cache, endpoint):
"""Iterate over all images."""
if 'images' not in cache:
cache['images'] = list(self._get_images(ksclient))
return iter(cache['images'])
key = '%s-images' % endpoint
if key not in cache:
cache[key] = list(self._get_images(ksclient, endpoint))
return iter(cache[key])
@staticmethod
def extract_image_metadata(image):
@ -117,34 +118,34 @@ class _Base(plugin.CentralPollster):
class ImagePollster(_Base):
@plugin.check_keystone('image')
def get_samples(self, manager, cache, resources=None):
for image in self._iter_images(manager.keystone, cache):
yield sample.Sample(
name='image',
type=sample.TYPE_GAUGE,
unit='image',
volume=1,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for image in self._iter_images(manager.keystone, cache, endpoint):
yield sample.Sample(
name='image',
type=sample.TYPE_GAUGE,
unit='image',
volume=1,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
class ImageSizePollster(_Base):
@plugin.check_keystone('image')
def get_samples(self, manager, cache, resources=None):
for image in self._iter_images(manager.keystone, cache):
yield sample.Sample(
name='image.size',
type=sample.TYPE_GAUGE,
unit='B',
volume=image.size,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for image in self._iter_images(manager.keystone, cache, endpoint):
yield sample.Sample(
name='image.size',
type=sample.TYPE_GAUGE,
unit='B',
volume=image.size,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)

View File

@ -30,33 +30,41 @@ LOG = log.getLogger(__name__)
class FloatingIPPollster(plugin.CentralPollster):
def _get_floating_ips(self):
nv = nova_client.Client()
def _get_floating_ips(self, ksclient, endpoint):
nv = nova_client.Client(
auth_token=ksclient.auth_token, bypass_url=endpoint)
return nv.floating_ip_get_all()
def _iter_floating_ips(self, cache):
if 'floating_ips' not in cache:
cache['floating_ips'] = list(self._get_floating_ips())
return iter(cache['floating_ips'])
def _iter_floating_ips(self, ksclient, cache, endpoint):
key = '%s-floating_ips' % endpoint
if key not in cache:
cache[key] = list(self._get_floating_ips(ksclient, endpoint))
return iter(cache[key])
@plugin.check_keystone('network')
def get_samples(self, manager, cache, resources=None):
for ip in self._iter_floating_ips(cache):
LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
# FIXME (flwang) Now Nova API /os-floating-ips can't provide those
# attributes were used by Ceilometer, such as project id, host.
# In this fix, those attributes usage will be removed temporarily.
# And they will be back after fix the Nova bug 1174802.
yield sample.Sample(
name='ip.floating',
type=sample.TYPE_GAUGE,
unit='ip',
volume=1,
user_id=None,
project_id=None,
resource_id=ip.id,
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={
'address': ip.ip,
'pool': ip.pool
})
@property
def default_discovery(self):
return 'endpoint:compute'
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for ip in self._iter_floating_ips(manager.keystone, cache,
endpoint):
LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
# FIXME (flwang) Now Nova API /os-floating-ips can't provide
# those attributes were used by Ceilometer, such as project
# id, host. In this fix, those attributes usage will be
# removed temporarily. And they will be back after fix the
# Nova bug 1174802.
yield sample.Sample(
name='ip.floating',
type=sample.TYPE_GAUGE,
unit='ip',
volume=1,
user_id=None,
project_id=None,
resource_id=ip.id,
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={
'address': ip.ip,
'pool': ip.pool
})

View File

@ -34,7 +34,11 @@ class FirewallPollster(base.BaseServicesPollster):
'firewall_policy_id',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'fw_services'
def get_samples(self, manager, cache, resources):
resources = resources or []
for fw in resources:
@ -71,7 +75,11 @@ class FirewallPolicyPollster(base.BaseServicesPollster):
'audited',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'fw_policy'
def get_samples(self, manager, cache, resources):
resources = resources or []
for fw in resources:

View File

@ -21,7 +21,6 @@ import collections
from oslo.utils import timeutils
import six
from ceilometer.central import plugin
from ceilometer.network.services import base
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
@ -50,7 +49,11 @@ class LBPoolPollster(base.BaseServicesPollster):
'vip_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_pools'
def get_samples(self, manager, cache, resources):
resources = resources or []
for pool in resources:
@ -94,7 +97,11 @@ class LBVipPollster(base.BaseServicesPollster):
'session_persistence',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_vips'
def get_samples(self, manager, cache, resources):
resources = resources or []
for vip in resources:
@ -132,7 +139,11 @@ class LBMemberPollster(base.BaseServicesPollster):
'weight',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_members'
def get_samples(self, manager, cache, resources):
resources = resources or []
for member in resources:
@ -167,7 +178,11 @@ class LBHealthMonitorPollster(base.BaseServicesPollster):
'type'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_health_probes'
def get_samples(self, manager, cache, resources):
for probe in resources:
LOG.debug("Load Balancer Health probe : %s" % probe)
yield sample.Sample(
@ -226,13 +241,16 @@ class _LBStatsPollster(base.BaseServicesPollster):
)
return i_cache[pool_id]
@property
def default_discovery(self):
return 'lb_pools'
@abc.abstractmethod
def _get_sample(pool, c_data):
"""Return one Sample."""
@plugin.check_keystone('network', 'nc')
def get_samples(self, manager, cache, resources=None):
for pool in self._get_lb_pools():
def get_samples(self, manager, cache, resources):
for pool in resources:
try:
c_data = self._populate_stats_cache(pool['id'], cache)
yield self._get_sample(pool, c_data)

View File

@ -35,7 +35,11 @@ class VPNServicesPollster(base.BaseServicesPollster):
'router_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'vpn_services'
def get_samples(self, manager, cache, resources):
resources = resources or []
for vpn in resources:
@ -80,7 +84,11 @@ class IPSecConnectionsPollster(base.BaseServicesPollster):
'tenant_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'ipsec_connections'
def get_samples(self, manager, cache, resources):
resources = resources or []
for conn in resources:

View File

@ -30,6 +30,13 @@ class _Base(plugin.CentralPollster):
NAMESPACE = 'network.statistics.drivers'
drivers = {}
@property
def default_discovery(self):
# this signifies that the pollster gets its resources from
# elsewhere, in this case they're manually listed in the
# pipeline configuration
return None
@abc.abstractproperty
def meter_name(self):
"""Return a Meter Name."""
@ -63,7 +70,7 @@ class _Base(plugin.CentralPollster):
scheme).driver()
return _Base.drivers[scheme]
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
resources = resources or []
for resource in resources:
parse_url, params = self._parse_my_resource(resource)

View File

@ -48,7 +48,7 @@ def logged(func):
class Client(object):
"""A client which gets information via python-novaclient."""
def __init__(self):
def __init__(self, bypass_url=None, auth_token=None):
"""Initialize a nova client object."""
conf = cfg.CONF.service_credentials
tenant = conf.os_tenant_id or conf.os_tenant_name
@ -57,8 +57,10 @@ class Client(object):
api_key=conf.os_password,
project_id=tenant,
auth_url=conf.os_auth_url,
auth_token=auth_token,
region_name=conf.os_region_name,
endpoint_type=conf.os_endpoint_type,
bypass_url=bypass_url,
cacert=conf.os_cacert,
insecure=conf.insecure,
http_log_debug=cfg.CONF.nova_http_log_debug,

View File

@ -19,14 +19,12 @@
from __future__ import absolute_import
from keystoneclient import exceptions
from oslo.config import cfg
from oslo.utils import timeutils
import six.moves.urllib.parse as urlparse
from swiftclient import client as swift
from ceilometer.central import plugin
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer import sample
@ -48,28 +46,27 @@ class _Base(plugin.CentralPollster):
CACHE_KEY_TENANT = 'tenants'
METHOD = 'head'
@property
def default_discovery(self):
return 'endpoint:object-store'
@property
def CACHE_KEY_METHOD(self):
return 'swift.%s_account' % self.METHOD
def _iter_accounts(self, ksclient, cache):
if self.CACHE_KEY_TENANT not in cache:
cache[self.CACHE_KEY_TENANT] = ksclient.tenants.list()
if self.CACHE_KEY_METHOD not in cache:
cache[self.CACHE_KEY_METHOD] = list(self._get_account_info(
ksclient, cache))
return iter(cache[self.CACHE_KEY_METHOD])
def _iter_accounts(self, ksclient, cache, endpoint):
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
key_method = '%s-%s' % (endpoint, self.CACHE_KEY_METHOD)
if key_tenant not in cache:
cache[key_tenant] = ksclient.tenants.list()
if key_method not in cache:
cache[key_method] = list(self._get_account_info(
ksclient, cache, endpoint))
return iter(cache[key_method])
def _get_account_info(self, ksclient, cache):
try:
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
raise StopIteration()
for t in cache[self.CACHE_KEY_TENANT]:
def _get_account_info(self, ksclient, cache, endpoint):
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
for t in cache[key_tenant]:
api_method = '%s_account' % self.METHOD
yield (t.id, getattr(swift, api_method)
(self._neaten_url(endpoint, t.id),
@ -84,56 +81,59 @@ class _Base(plugin.CentralPollster):
class ObjectsPollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-object-count']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-object-count']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsSizePollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects.size',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-bytes-used']),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects.size',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-bytes-used']),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsContainersPollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects.containers',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-container-count']),
unit='container',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects.containers',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-container-count']),
unit='container',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ContainersObjectsPollster(_Base):
@ -141,22 +141,23 @@ class ContainersObjectsPollster(_Base):
METHOD = 'get'
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects',
type=sample.TYPE_GAUGE,
volume=int(container['count']),
unit='object',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for project, account in self._iter_accounts(manager.keystone,
cache, endpoint):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects',
type=sample.TYPE_GAUGE,
volume=int(container['count']),
unit='object',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ContainersSizePollster(_Base):
@ -164,19 +165,20 @@ class ContainersSizePollster(_Base):
METHOD = 'get'
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects.size',
type=sample.TYPE_GAUGE,
volume=int(container['bytes']),
unit='B',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for project, account in self._iter_accounts(manager.keystone,
cache, endpoint):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects.size',
type=sample.TYPE_GAUGE,
volume=int(container['bytes']),
unit='B',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)

View File

@ -128,8 +128,23 @@ class NotificationBase(PluginBase):
class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API."""
@abc.abstractproperty
def default_discovery(self):
"""Default discovery to use for this pollster.
There are three ways a pollster can get a list of resources to poll,
listed here in ascending order of precedence:
1. from the per-agent discovery,
2. from the per-pollster discovery (defined here)
3. from the per-pipeline configured discovery and/or per-pipeline
configured static resources.
If a pollster should only get resources from #1 or #3, this property
should be set to None.
"""
@abc.abstractmethod
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Return a sequence of Counter instances from polling the resources.
:param manager: The service manager class invoking the plugin.
@ -137,9 +152,10 @@ class PollsterBase(PluginBase):
between themselves when recomputing it would be
expensive (e.g., asking another service for a
list of objects).
:param resources: A list of the endpoints the pollster will get data
:param resources: A list of resources the pollster will get data
from. It's up to the specific pollster to decide
how to use it.
how to use it. It is usually supplied by a discovery,
see ``default_discovery`` for more information.
"""

View File

@ -71,8 +71,13 @@ default_test_data = TestSample(
class TestPollster(plugin.PollsterBase):
test_data = default_test_data
discovery = None
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
@ -82,7 +87,7 @@ class TestPollster(plugin.PollsterBase):
class TestPollsterException(TestPollster):
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
@ -257,9 +262,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def tearDown(self):
self.Pollster.samples = []
self.Pollster.discovery = []
self.PollsterAnother.samples = []
self.PollsterAnother.discovery = []
self.PollsterException.samples = []
self.PollsterException.discovery = []
self.PollsterExceptionAnother.samples = []
self.PollsterExceptionAnother.discovery = []
self.Pollster.resources = []
self.PollsterAnother.resources = []
self.PollsterException.resources = []
@ -442,6 +451,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_agent_discovery_overridden_by_per_pollster_discovery(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.mgr.discovery_manager = self.create_discovery_manager()
self.Pollster.discovery = 'testdiscovery'
self.mgr.default_discovery = ['testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg[0]['resources'] = []
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
def test_per_agent_discovery_overridden_by_per_pipeline_discovery(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.mgr.discovery_manager = self.create_discovery_manager()
@ -458,6 +484,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(set(self.DiscoveryAnother.resources),
set(self.Pollster.resources))
def _do_test_per_pollster_discovery(self, discovered_resources,
static_resources):
self.Pollster.discovery = 'testdiscovery'
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
# just so we can test that static + pre_pipeline amalgamated
# override per_pollster
self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg[0]['resources'] = static_resources
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
set(self.Pollster.resources))
else:
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
def test_per_pollster_discovery(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pollster_discovery_overridden_by_per_pipeline_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_pollster_discovery_caching(self):
# ensure single discovery associated with multiple pollsters
# only called once per polling cycle
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = discovered_resources
self.pipeline_cfg[0]['counters'].append('testanother')
self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
def _do_test_per_pipeline_discovery(self,
discovered_resources,
static_resources):

View File

@ -38,7 +38,7 @@ class TestManager(base.BaseTestCase):
class TestPollsterKeystone(agentbase.TestPollster):
@plugin.check_keystone
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
func = super(TestPollsterKeystone, self).get_samples
return func(manager=manager,
cache=cache,

View File

@ -47,9 +47,12 @@ PROBE_DICT = {
}
}
ENDPOINT = 'end://point'
class TestManager(manager.AgentManager):
@mock.patch('keystoneclient.v2_0.client', mock.MagicMock())
def __init__(self):
super(TestManager, self).__init__()
self.keystone = mock.Mock()
@ -64,14 +67,15 @@ class TestKwapi(base.BaseTestCase):
self.manager = TestManager()
@staticmethod
def fake_get_kwapi_client(ksclient):
def fake_get_kwapi_client(ksclient, endpoint):
raise exceptions.EndpointNotFound("fake keystone exception")
def test_endpoint_not_exist(self):
with mockpatch.PatchObject(kwapi._Base, 'get_kwapi_client',
side_effect=self.fake_get_kwapi_client):
pollster = kwapi.EnergyPollster()
samples = list(pollster.get_samples(self.manager, {}))
samples = list(pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(0, len(samples))
@ -87,7 +91,7 @@ class TestEnergyPollster(base.BaseTestCase):
kwapi._Base, '_iter_probes', side_effect=self.fake_iter_probes))
@staticmethod
def fake_iter_probes(ksclient, cache):
def fake_iter_probes(ksclient, cache, endpoint):
probes = PROBE_DICT['probes']
for key, value in six.iteritems(probes):
probe_dict = value
@ -99,6 +103,7 @@ class TestEnergyPollster(base.BaseTestCase):
samples = list(kwapi.EnergyPollster().get_samples(
self.manager,
cache,
[ENDPOINT]
))
self.assertEqual(3, len(samples))
samples_by_name = dict((s.resource_id, s) for s in samples)
@ -126,13 +131,15 @@ class TestEnergyPollsterCache(base.BaseTestCase):
probe = {'id': 'A'}
probe.update(PROBE_DICT['probes']['A'])
cache = {
kwapi.EnergyPollster.CACHE_KEY_PROBE: [probe],
'%s-%s' % (ENDPOINT, kwapi.EnergyPollster.CACHE_KEY_PROBE):
[probe],
}
self.manager.keystone = mock.Mock()
pollster = kwapi.EnergyPollster()
with mock.patch.object(pollster, '_get_probes') as do_not_call:
do_not_call.side_effect = AssertionError('should not be called')
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
[ENDPOINT]))
self.assertEqual(1, len(samples))
@ -147,7 +154,7 @@ class TestPowerPollster(base.BaseTestCase):
kwapi._Base, '_iter_probes', side_effect=self.fake_iter_probes))
@staticmethod
def fake_iter_probes(ksclient, cache):
def fake_iter_probes(ksclient, cache, endpoint):
probes = PROBE_DICT['probes']
for key, value in six.iteritems(probes):
probe_dict = value
@ -159,6 +166,7 @@ class TestPowerPollster(base.BaseTestCase):
samples = list(kwapi.PowerPollster().get_samples(
self.manager,
cache,
[ENDPOINT]
))
self.assertEqual(3, len(samples))
samples_by_name = dict((s.resource_id, s) for s in samples)
@ -183,11 +191,12 @@ class TestPowerPollsterCache(base.BaseTestCase):
probe = {'id': 'A'}
probe.update(PROBE_DICT['probes']['A'])
cache = {
kwapi.PowerPollster.CACHE_KEY_PROBE: [probe],
'%s-%s' % (ENDPOINT, kwapi.PowerPollster.CACHE_KEY_PROBE): [probe],
}
self.manager.keystone = mock.Mock()
pollster = kwapi.PowerPollster()
with mock.patch.object(pollster, '_get_probes') as do_not_call:
do_not_call.side_effect = AssertionError('should not be called')
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
[ENDPOINT]))
self.assertEqual(1, len(samples))

View File

@ -104,6 +104,8 @@ IMAGE_LIST = [
u'size': 2048}),
]
ENDPOINT = 'end://point'
class _BaseObject(object):
pass
@ -125,7 +127,7 @@ class TestManager(manager.AgentManager):
class TestImagePollsterPageSize(base.BaseTestCase):
def fake_get_glance_client(self, ksclient):
def fake_get_glance_client(self, ksclient, endpoint):
glanceclient = FakeGlanceClient()
glanceclient.images.list = mock.MagicMock(return_value=IMAGE_LIST)
return glanceclient
@ -143,7 +145,7 @@ class TestImagePollsterPageSize(base.BaseTestCase):
def _do_test_iter_images(self, page_size=0):
self.CONF.set_override("glance_page_size", page_size)
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, {}))
_iter_images(self.manager.keystone, {}, ENDPOINT))
kwargs = {}
if page_size > 0:
kwargs['page_size'] = page_size
@ -163,7 +165,7 @@ class TestImagePollsterPageSize(base.BaseTestCase):
class TestImagePollster(base.BaseTestCase):
def fake_get_glance_client(self, ksclient):
def fake_get_glance_client(self, ksclient, endpoint):
glanceclient = _BaseObject()
setattr(glanceclient, "images", _BaseObject())
setattr(glanceclient.images,
@ -183,26 +185,29 @@ class TestImagePollster(base.BaseTestCase):
# Tests whether the iter_images method returns a unique image
# list when there is nothing in the cache
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, {}))
_iter_images(self.manager.keystone, {}, ENDPOINT))
self.assertEqual(len(set(image.id for image in images)), len(images))
def test_iter_images_cached(self):
# Tests whether the iter_images method returns the values from
# the cache
cache = {'images': []}
cache = {'%s-images' % ENDPOINT: []}
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, cache))
_iter_images(self.manager.keystone, cache,
ENDPOINT))
self.assertEqual([], images)
def test_image(self):
samples = list(glance.ImagePollster().get_samples(self.manager, {}))
samples = list(glance.ImagePollster().get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(3, len(samples))
for sample in samples:
self.assertEqual(1, sample.volume)
def test_image_size(self):
samples = list(glance.ImageSizePollster().get_samples(self.manager,
{}))
{},
[ENDPOINT]))
self.assertEqual(3, len(samples))
for image in IMAGE_LIST:
self.assertTrue(
@ -210,10 +215,12 @@ class TestImagePollster(base.BaseTestCase):
samples)))
def test_image_get_sample_names(self):
samples = list(glance.ImagePollster().get_samples(self.manager, {}))
samples = list(glance.ImagePollster().get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(set(['image']), set([s.name for s in samples]))
def test_image_size_get_sample_names(self):
samples = list(glance.ImageSizePollster().get_samples(self.manager,
{}))
{},
[ENDPOINT]))
self.assertEqual(set(['image.size']), set([s.name for s in samples]))

View File

@ -469,7 +469,8 @@ class TestLBStatsPollster(_BaseTestLBPollster):
pollster = factory()
cache = {}
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
self.fake_get_pools()))
self.assertEqual(1, len(samples))
self.assertIsNotNone(samples)
self.assertIn('lbstats', cache)

View File

@ -70,7 +70,7 @@ class TestFloatingIPPollster(base.BaseTestCase):
# assert False, 'Should have seen an error'
def test_get_samples_not_empty(self):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {}, ['e']))
self.assertEqual(3, len(samples))
# It's necessary to verify all the attributes extracted by Nova
# API /os-floating-ips to make sure they're available and correct.
@ -87,10 +87,10 @@ class TestFloatingIPPollster(base.BaseTestCase):
self.assertEqual("public", samples[2].resource_metadata["pool"])
def test_get_meter_names(self):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {}, ['e']))
self.assertEqual(set(['ip.floating']), set([s.name for s in samples]))
def test_get_samples_cached(self):
cache = {'floating_ips': self.fake_get_ips()[:2]}
samples = list(self.pollster.get_samples(self.manager, cache))
cache = {'e-floating_ips': self.fake_get_ips()[:2]}
samples = list(self.pollster.get_samples(self.manager, cache, ['e']))
self.assertEqual(2, len(samples))

View File

@ -53,6 +53,8 @@ GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
'x-account-container-count': 0,
}, [])), ]
ENDPOINT = 'end://point'
class TestManager(manager.AgentManager):
@ -83,7 +85,7 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
def fake_ks_service_catalog_url_for(*args, **kwargs):
raise exceptions.EndpointNotFound("Fake keystone exception")
def fake_iter_accounts(self, ksclient, cache):
def fake_iter_accounts(self, ksclient, cache, endpoint):
for i in self.ACCOUNTS:
yield i
@ -102,10 +104,13 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
cache = {}
with mockpatch.PatchObject(self.factory, '_get_account_info',
return_value=[]):
data = list(self.pollster._iter_accounts(mock.Mock(), cache))
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ENDPOINT))
self.assertTrue(self.pollster.CACHE_KEY_TENANT in cache)
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
in cache)
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_METHOD)
in cache)
self.assertEqual([], data)
def test_iter_accounts_tenants_cached(self):
@ -119,15 +124,17 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
api_method = '%s_account' % self.pollster.METHOD
with mockpatch.PatchObject(swift_client, api_method, new=ksclient):
key = '%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
with mockpatch.PatchObject(self.factory, '_neaten_url'):
Tenant = collections.namedtuple('Tenant', 'id')
cache = {
self.pollster.CACHE_KEY_TENANT: [
key: [
Tenant(self.ACCOUNTS[0][0])
],
}
data = list(self.pollster._iter_accounts(mock.Mock(), cache))
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ENDPOINT))
self.assertTrue(key in cache)
self.assertEqual(self.ACCOUNTS[0][0], data[0][0])
def test_neaten_url(self):
@ -150,14 +157,16 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
def test_metering(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(2, len(samples))
def test_get_meter_names(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(set([samples[0].name]),
set([s.name for s in samples]))
@ -166,6 +175,7 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
with mockpatch.PatchObject(
self.manager.keystone.service_catalog, 'url_for',
side_effect=self.fake_ks_service_catalog_url_for):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(0, len(samples))

View File

@ -30,70 +30,6 @@ sources:
- "network.outgoing.packets"
sinks:
- network_sink
- name: lb_pool_source
interval: 600
meters:
- "network.services.lb.pool"
discovery:
- "lb_pools"
sinks:
- meter_sink
- name: lb_health_monitor_source
interval: 600
meters:
- "network.services.lb.health_monitor"
discovery:
- "lb_health_probes"
sinks:
- meter_sink
- name: lb_vip_source
interval: 600
meters:
- "network.services.lb.vip"
discovery:
- "lb_vips"
sinks:
- meter_sink
- name: lb_member_source
interval: 600
meters:
- "network.services.lb.member"
discovery:
- "lb_members"
sinks:
- meter_sink
- name: vpn_services_source
interval: 600
meters:
- "network.services.vpn"
discovery:
- "vpn_services"
sinks:
- "meter_sink"
- name: vpn_conns_source
interval: 600
meters:
- "network.services.vpn.connections"
discovery:
- "ipsec_connections"
sinks:
- "meter_sink"
- name: firewall_source
interval: 600
meters:
- "network.services.firewall"
discovery:
- "fw_services"
sinks:
- "meter_sink"
- name: fw_policy_source
interval: 600
meters:
- "network.services.firewall.policy"
discovery:
- "fw_policy"
sinks:
- "meter_sink"
sinks:
- name: meter_sink
transformers:

View File

@ -80,6 +80,7 @@ ceilometer.notification =
ceilometer.discover =
local_instances = ceilometer.compute.discovery:InstanceDiscovery
endpoint = ceilometer.central.discovery:EndpointDiscovery
lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery
lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery
lb_members = ceilometer.network.services.discovery:LBMembersDiscovery