From 68402375740ccc31ca6b4977938765ac66b966f6 Mon Sep 17 00:00:00 2001 From: Pradeep Kilambi Date: Tue, 1 Jul 2014 10:42:13 -0700 Subject: [PATCH] Use resource discovery for Network Services This change migrates the network services pollsters to leverage resource discovery through pipeline yaml. This will also be more HA friendly to distribute work across multiple central agents. Change-Id: I4d454e98974438438c166051451b76ce9fbbc2a4 --- ceilometer/network/services/base.py | 51 ++++++++ ceilometer/network/services/discovery.py | 78 ++++++++++++ ceilometer/network/services/lbaas.py | 70 +++-------- ceilometer/neutron_client.py | 1 + ceilometer/pipeline.py | 2 - .../tests/network/services/test_lbaas.py | 115 ++++++++++++++++-- etc/ceilometer/pipeline.yaml | 32 +++++ setup.cfg | 5 + 8 files changed, 287 insertions(+), 67 deletions(-) create mode 100644 ceilometer/network/services/base.py create mode 100644 ceilometer/network/services/discovery.py diff --git a/ceilometer/network/services/base.py b/ceilometer/network/services/base.py new file mode 100644 index 000000000..64f330131 --- /dev/null +++ b/ceilometer/network/services/base.py @@ -0,0 +1,51 @@ +# +# Copyright 2014 Cisco Systems,Inc. +# +# Author: Pradeep Kilambi +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ceilometer import neutron_client +from ceilometer.openstack.common import log +from ceilometer import plugin + +LOG = log.getLogger(__name__) + + +# status map for converting metric status to volume int +STATUS = { + 'inactive': 0, + 'active': 1, + 'pending_create': 2, +} + + +class BaseServicesPollster(plugin.PollsterBase): + + FIELDS = [] + nc = neutron_client.Client() + + def _iter_cache(self, cache, meter_name, method): + if meter_name not in cache: + cache[meter_name] = list(method()) + return iter(cache[meter_name]) + + def extract_metadata(self, metric): + return dict((k, metric[k]) for k in self.FIELDS) + + @staticmethod + def get_status_id(value): + status = value.lower() + if status not in STATUS: + return -1 + return STATUS[status] diff --git a/ceilometer/network/services/discovery.py b/ceilometer/network/services/discovery.py new file mode 100644 index 000000000..34719d49f --- /dev/null +++ b/ceilometer/network/services/discovery.py @@ -0,0 +1,78 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2014 Cisco Systems, Inc +# +# Author:Pradeep Kilambi +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ceilometer import neutron_client +from ceilometer import plugin + + +class _BaseServicesDiscovery(plugin.DiscoveryBase): + + def __init__(self): + super(_BaseServicesDiscovery, self).__init__() + self.neutron_cli = neutron_client.Client() + + +class LBPoolsDiscovery(_BaseServicesDiscovery): + + def __init__(self): + super(LBPoolsDiscovery, self).__init__() + + def discover(self, param=None): + """Discover resources to monitor.""" + + pools = self.neutron_cli.pool_get_all() + return [i for i in pools + if i.get('status') != 'error'] + + +class LBVipsDiscovery(_BaseServicesDiscovery): + + def __init__(self): + super(LBVipsDiscovery, self).__init__() + + def discover(self, param=None): + """Discover resources to monitor.""" + + vips = self.neutron_cli.vip_get_all() + return [i for i in vips + if i.get('status', None) != 'error'] + + +class LBMembersDiscovery(_BaseServicesDiscovery): + + def __init__(self): + super(LBMembersDiscovery, self).__init__() + + def discover(self, param=None): + """Discover resources to monitor.""" + + members = self.neutron_cli.member_get_all() + return [i for i in members + if i.get('status', None) != 'error'] + + +class LBHealthMonitorsDiscovery(_BaseServicesDiscovery): + + def __init__(self): + super(LBHealthMonitorsDiscovery, self).__init__() + + def discover(self, param=None): + """Discover resources to monitor.""" + + probes = self.neutron_cli.health_monitor_get_all() + return probes diff --git a/ceilometer/network/services/lbaas.py b/ceilometer/network/services/lbaas.py index e49f297af..c92555d6b 100644 --- a/ceilometer/network/services/lbaas.py +++ b/ceilometer/network/services/lbaas.py @@ -19,11 +19,10 @@ import abc import collections import six -from ceilometer import neutron_client +from ceilometer.network.services import base from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils -from ceilometer import plugin from ceilometer import sample LOG = log.getLogger(__name__) @@ -33,37 +32,10 @@ LBStatsData = collections.namedtuple( ['active_connections', 'total_connections', 'bytes_in', 'bytes_out'] ) -# status map for converting metric status to volume int -STATUS = { - 'inactive': 0, - 'active': 1, - 'pending_create': 2, -} - -class _BasePollster(plugin.PollsterBase): - - FIELDS = [] - nc = neutron_client.Client() - - def _iter_cache(self, cache, meter_name, method): - if meter_name not in cache: - cache[meter_name] = list(method()) - return iter(cache[meter_name]) - - def extract_metadata(self, metric): - return dict((k, metric[k]) for k in self.FIELDS) - - @staticmethod - def get_status_id(value): - status = value.lower() - if status not in STATUS: - return -1 - return STATUS[status] - - -class LBPoolPollster(_BasePollster): +class LBPoolPollster(base.BaseServicesPollster): """Pollster to capture Load Balancer pool status samples.""" + FIELDS = ['admin_state_up', 'description', 'lb_method', @@ -76,11 +48,8 @@ class LBPoolPollster(_BasePollster): 'vip_id' ] - def _get_lb_pools(self): - return self.nc.pool_get_all() - def get_samples(self, manager, cache, resources=None): - for pool in self._iter_cache(cache, 'pool', self._get_lb_pools): + for pool in resources: LOG.debug("Load Balancer Pool : %s" % pool) status = self.get_status_id(pool['status']) if status == -1: @@ -102,8 +71,9 @@ class LBPoolPollster(_BasePollster): ) -class LBVipPollster(_BasePollster): +class LBVipPollster(base.BaseServicesPollster): """Pollster to capture Load Balancer Vip status samples.""" + FIELDS = ['admin_state_up', 'address', 'connection_limit', @@ -119,11 +89,8 @@ class LBVipPollster(_BasePollster): 'session_persistence', ] - def _get_lb_vips(self): - return self.nc.vip_get_all() - def get_samples(self, manager, cache, resources=None): - for vip in self._iter_cache(cache, 'vip', self._get_lb_vips): + for vip in resources: LOG.debug("Load Balancer Vip : %s" % vip) status = self.get_status_id(vip['status']) if status == -1: @@ -145,8 +112,9 @@ class LBVipPollster(_BasePollster): ) -class LBMemberPollster(_BasePollster): +class LBMemberPollster(base.BaseServicesPollster): """Pollster to capture Load Balancer Member status samples.""" + FIELDS = ['admin_state_up', 'address', 'pool_id', @@ -156,11 +124,8 @@ class LBMemberPollster(_BasePollster): 'weight', ] - def _get_lb_members(self): - return self.nc.member_get_all() - def get_samples(self, manager, cache, resources=None): - for member in self._iter_cache(cache, 'member', self._get_lb_members): + for member in resources: LOG.debug("Load Balancer Member : %s" % member) status = self.get_status_id(member['status']) if status == -1: @@ -180,8 +145,9 @@ class LBMemberPollster(_BasePollster): ) -class LBHealthMonitorPollster(_BasePollster): +class LBHealthMonitorPollster(base.BaseServicesPollster): """Pollster to capture Load Balancer Health probes status samples.""" + FIELDS = ['admin_state_up', 'delay', 'max_retries', @@ -190,12 +156,8 @@ class LBHealthMonitorPollster(_BasePollster): 'type' ] - def _get_lb_health_probes(self): - return self.nc.health_monitor_get_all() - def get_samples(self, manager, cache, resources=None): - for probe in self._iter_cache(cache, 'monitor', - self._get_lb_health_probes): + for probe in resources: LOG.debug("Load Balancer Health probe : %s" % probe) yield sample.Sample( name='network.services.lb.health_monitor', @@ -211,11 +173,11 @@ class LBHealthMonitorPollster(_BasePollster): @six.add_metaclass(abc.ABCMeta) -class _LBStatsPollster(_BasePollster): +class _LBStatsPollster(base.BaseServicesPollster): """Base Statistics pollster. - It is capturing the statistics info and yielding samples for connections - and bandwidth. + It is capturing the statistics info and yielding samples for connections + and bandwidth. """ def _get_lb_pools(self): diff --git a/ceilometer/neutron_client.py b/ceilometer/neutron_client.py index a58447b09..36323ae4d 100644 --- a/ceilometer/neutron_client.py +++ b/ceilometer/neutron_client.py @@ -79,6 +79,7 @@ class Client(object): @logged def pool_get_all(self): + LOG.debug("NEUTRON POOL GET") resp = self.client.list_pools() return resp.get('pools') diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index bb4ffb00d..651a44907 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -98,7 +98,6 @@ class Source(object): except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) - if self.interval <= 0: raise PipelineException("Interval value should > 0", cfg) @@ -109,7 +108,6 @@ class Source(object): self.discovery = cfg.get('discovery') or [] if not isinstance(self.discovery, list): raise PipelineException("Discovery should be a list", cfg) - self._check_meters() def __str__(self): diff --git a/ceilometer/tests/network/services/test_lbaas.py b/ceilometer/tests/network/services/test_lbaas.py index 15ef1002b..340c6f23d 100644 --- a/ceilometer/tests/network/services/test_lbaas.py +++ b/ceilometer/tests/network/services/test_lbaas.py @@ -18,6 +18,7 @@ import mock from ceilometer.central import manager +from ceilometer.network.services import discovery from ceilometer.network.services import lbaas from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import mockpatch @@ -106,26 +107,56 @@ class TestLBPoolPollster(_BaseTestLBPollster): 'subnet_id': 'bbe3d818-bdcb-4e4b-b47f-5650dc8a9d7a', 'tenant_id': 'a4eb9f4938bb418bbc4f8eb31802fefa', 'health_monitors_status': []}, + {'status': 'error', + 'lb_method': 'ROUND_ROBIN', + 'protocol': 'HTTP', + 'description': '', + 'health_monitors': [], + 'members': [], + 'provider': 'haproxy', + 'status_description': None, + 'id': 'fe7rad36-437d-4c84-aee1-186027d3bdcd', + 'vip_id': 'cd6a6fee-e2fa-4e6c-b3c2-bfbe395752c1', + 'name': 'mylb_error', + 'admin_state_up': True, + 'subnet_id': 'bbe3d818-bdcb-4e4b-b47f-5650dc8a9d7a', + 'tenant_id': 'a4eb9f4938bb418bbc4f8eb31802fefa', + 'health_monitors_status': []}, ] def test_pool_get_samples(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_pools())) self.assertEqual(3, len(samples)) for field in self.pollster.FIELDS: self.assertEqual(self.fake_get_pools()[0][field], samples[0].resource_metadata[field]) def test_pool_volume(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_pools())) self.assertEqual(1, samples[0].volume) self.assertEqual(0, samples[1].volume) self.assertEqual(2, samples[2].volume) def test_get_pool_meter_names(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_pools())) self.assertEqual(set(['network.services.lb.pool']), set([s.name for s in samples])) + def test_pool_discovery(self): + discovered_pools = discovery.LBPoolsDiscovery().discover() + self.assertEqual(4, len(discovered_pools)) + for pool in self.fake_get_pools(): + if pool['status'] == 'error': + self.assertTrue(pool not in discovered_pools) + else: + self.assertTrue(pool in discovered_pools) + class TestLBVipPollster(_BaseTestLBPollster): @@ -199,26 +230,56 @@ class TestLBVipPollster(_BaseTestLBPollster): 'port_id': '3df3c4de-b32e-4ca1-a7f4-84323ba5f291', 'id': 'fg6a6fee-e2fa-4e6c-b3c2-bfbe395752c1', 'name': 'myvip03'}, + {'status': 'error', + 'status_description': None, + 'protocol': 'HTTP', + 'description': '', + 'admin_state_up': True, + 'subnet_id': 'bbe3d818-bdcb-4e4b-b47f-5650dc8a9d7a', + 'tenant_id': 'a4eb9f4938bb418bbc4f8eb31802fefa', + 'connection_limit': -1, + 'pool_id': 'ce73ad36-437d-4c84-aee1-186027d3da9a', + 'session_persistence': None, + 'address': '10.0.0.8', + 'protocol_port': 80, + 'port_id': '3df3c4de-b32e-4ca1-a7f4-84323ba5f291', + 'id': 'fg6a6fee-e2fa-4e6c-b3c2-bfbe395752c1', + 'name': 'myvip_error'}, ] def test_vip_get_samples(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_vips())) self.assertEqual(3, len(samples)) for field in self.pollster.FIELDS: self.assertEqual(self.fake_get_vips()[0][field], samples[0].resource_metadata[field]) def test_pool_volume(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_vips())) self.assertEqual(1, samples[0].volume) self.assertEqual(0, samples[1].volume) self.assertEqual(2, samples[2].volume) def test_get_vip_meter_names(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + resources=self.fake_get_vips())) self.assertEqual(set(['network.services.lb.vip']), set([s.name for s in samples])) + def test_vip_discovery(self): + discovered_vips = discovery.LBVipsDiscovery().discover() + self.assertEqual(4, len(discovered_vips)) + for pool in self.fake_get_vips(): + if pool['status'] == 'error': + self.assertTrue(pool not in discovered_vips) + else: + self.assertTrue(pool in discovered_vips) + class TestLBMemberPollster(_BaseTestLBPollster): @@ -268,26 +329,50 @@ class TestLBMemberPollster(_BaseTestLBPollster): 'address': '10.0.0.6', 'status_description': None, 'id': '45630b61eb-07bc-4372-9fbf-36459dd0f96b'}, + {'status': 'error', + 'protocol_port': 80, + 'weight': 1, + 'admin_state_up': True, + 'tenant_id': 'a4eb9f4938bb418bbc4f8eb31802fefa', + 'pool_id': 'ce73ad36-437d-4c84-aee1-186027d3da9a', + 'address': '10.0.0.6', + 'status_description': None, + 'id': '45630b61eb-07bc-4372-9fbf-36459dd0f96b'}, ] def test_get_samples_not_empty(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + self.fake_get_members())) self.assertEqual(3, len(samples)) for field in self.pollster.FIELDS: self.assertEqual(self.fake_get_members()[0][field], samples[0].resource_metadata[field]) def test_pool_volume(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + self.fake_get_members())) self.assertEqual(1, samples[0].volume) self.assertEqual(0, samples[1].volume) self.assertEqual(2, samples[2].volume) def test_get_meter_names(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + self.fake_get_members())) self.assertEqual(set(['network.services.lb.member']), set([s.name for s in samples])) + def test_members_discovery(self): + discovered_members = discovery.LBMembersDiscovery().discover() + self.assertEqual(4, len(discovered_members)) + for pool in self.fake_get_members(): + if pool['status'] == 'error': + self.assertTrue(pool not in discovered_members) + else: + self.assertTrue(pool in discovered_members) + class TestLBHealthProbePollster(_BaseTestLBPollster): @@ -312,17 +397,25 @@ class TestLBHealthProbePollster(_BaseTestLBPollster): }] def test_get_samples_not_empty(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + self.fake_get_health_monitor())) self.assertEqual(1, len(samples)) for field in self.pollster.FIELDS: self.assertEqual(self.fake_get_health_monitor()[0][field], samples[0].resource_metadata[field]) def test_get_meter_names(self): - samples = list(self.pollster.get_samples(self.manager, {})) + samples = list(self.pollster.get_samples( + self.manager, {}, + self.fake_get_health_monitor())) self.assertEqual(set(['network.services.lb.health_monitor']), set([s.name for s in samples])) + def test_probes_discovery(self): + discovered_probes = discovery.LBHealthMonitorsDiscovery().discover() + self.assertEqual(discovered_probes, self.fake_get_health_monitor()) + class TestLBStatsPollster(_BaseTestLBPollster): diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index dcd9feab6..e02f05582 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -30,6 +30,38 @@ sources: - "network.outgoing.packets" sinks: - network_sink + - name: lb_pool_source + interval: 600 + meters: + - "network.services.lb.pool" + discovery: + - "lb_pools" + sinks: + - meter_sink + - name: lb_health_monitor_source + interval: 600 + meters: + - "network.services.lb.health_monitor" + discovery: + - "lb_health_probes" + sinks: + - meter_sink + - name: lb_vip_source + interval: 600 + meters: + - "network.services.lb.vip" + discovery: + - "lb_vips" + sinks: + - meter_sink + - name: lb_member_source + interval: 600 + meters: + - "network.services.lb.member" + discovery: + - "lb_members" + sinks: + - meter_sink sinks: - name: meter_sink transformers: diff --git a/setup.cfg b/setup.cfg index 67c64126e..fe1d9fd11 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,6 +69,11 @@ ceilometer.notification = ceilometer.discover = local_instances = ceilometer.compute.discovery:InstanceDiscovery + lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery + lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery + lb_members = ceilometer.network.services.discovery:LBMembersDiscovery + lb_health_probes = ceilometer.network.services.discovery:LBHealthMonitorsDiscovery + ceilometer.poll.compute = disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster