Central agent work-load partitioning

Provides a mechanism to allow the central agent to be horizontally
scaled out, such that each agent polls a disjoint subset of resources.

This is achieved through the use of the `tooz` library for distributed
coordination.

If a service wants to use work-load partitioning, it must first
create a PartitionCoordinator object and call its `heartbeat` method
periodically.

To distribute a set of resources over multiple agents, use the
PartitionCoordinator's `extract_my_subset` method, which filters an
iterable and returns only the resources assigned to the current agent.

The `PartitionCoordinator` uses `tooz` to determine which agents are
in the same group, and from the group membership which resources belong
to the current agent.
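
A minimal usage sketch of the new API (illustrative only: the group id and
resource names are made up; agent.py below wires these same calls into the
service's timers and discovery loop):

    from ceilometer import coordination

    partition_coordinator = coordination.PartitionCoordinator()
    partition_coordinator.start()

    # Group ids combine the agent namespace with the discovery's group_id,
    # e.g. 'central-global' for the default global group.
    partition_coordinator.join_group('central-global')

    # Call periodically (the agent registers a timer firing every
    # [coordination]/heartbeat seconds) so other members see we are alive.
    partition_coordinator.heartbeat()

    # Keep only the resources that hash into this agent's bucket.
    discovered = ['resource-a', 'resource-b', 'resource-c']
    mine = partition_coordinator.extract_my_subset('central-global',
                                                   discovered)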

DocImpact
Change-Id: I7adef87b03129f4f8b38109bf547c7403cc6adad
Implements: blueprint central-agent-partitioning
Nejc Saje 2014-08-12 10:58:41 -04:00
parent 8362688d64
commit 9a2f8618de
10 changed files with 565 additions and 3 deletions

ceilometer/agent.py

@@ -4,6 +4,7 @@
#
# Authors: Julien Danjou <julien@danjou.info>
# Eoghan Glynn <eglynn@redhat.com>
# Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -20,10 +21,12 @@
import collections
import itertools
from oslo.config import cfg
import six
from six.moves.urllib import parse as urlparse
from stevedore import extension
from ceilometer import coordination
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
@@ -32,6 +35,9 @@ from ceilometer import pipeline
LOG = log.getLogger(__name__)
cfg.CONF.import_opt('heartbeat', 'ceilometer.coordination',
group='coordination')
class Resources(object):
def __init__(self, agent_manager):
@@ -100,13 +106,16 @@ class PollingTask(object):
class AgentManager(os_service.Service):
-def __init__(self, namespace, default_discovery=None):
+def __init__(self, namespace, default_discovery=None, group_prefix=None):
super(AgentManager, self).__init__()
default_discovery = default_discovery or []
self.default_discovery = default_discovery
self.pollster_manager = self._extensions('poll', namespace)
self.discovery_manager = self._extensions('discover')
self.context = context.RequestContext('admin', 'admin', is_admin=True)
self.partition_coordinator = coordination.PartitionCoordinator()
self.group_prefix = ('%s-%s' % (namespace, group_prefix)
if group_prefix else namespace)
@staticmethod
def _extensions(category, agent_ns=None):
@@ -117,6 +126,12 @@ class AgentManager(os_service.Service):
invoke_on_load=True,
)
def join_partitioning_groups(self):
groups = set([self._construct_group_id(d.obj.group_id)
for d in self.discovery_manager])
for group in groups:
self.partition_coordinator.join_group(group)
def create_polling_task(self):
"""Create an initially empty polling task."""
return PollingTask(self)
@@ -135,13 +150,27 @@ class AgentManager(os_service.Service):
return polling_tasks
def _construct_group_id(self, discovery_group_id):
return ('%s-%s' % (self.group_prefix,
discovery_group_id)
if discovery_group_id else None)
def start(self):
self.pipeline_manager = pipeline.setup_pipeline()
self.partition_coordinator.start()
self.join_partitioning_groups()
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
for interval, task in six.iteritems(self.setup_polling_tasks()):
self.tg.add_timer(interval,
self.interval_task,
initial_delay=interval if delay_start else None,
task=task)
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
@staticmethod
def interval_task(task):
@@ -166,7 +195,10 @@ class AgentManager(os_service.Service):
if discoverer:
try:
discovered = discoverer.discover(param)
-resources.extend(discovered)
+partitioned = self.partition_coordinator.extract_my_subset(
+    self._construct_group_id(discoverer.group_id),
+    discovered)
+resources.extend(partitioned)
except Exception as err:
LOG.exception(_('Unable to discover resources: %s') % err)
else:

ceilometer/central/manager.py

@@ -21,6 +21,16 @@ from oslo.config import cfg
from ceilometer import agent
from ceilometer.openstack.common import log
OPTS = [
cfg.StrOpt('partitioning_group_prefix',
default=None,
help='Work-load partitioning group prefix. Use only if you '
'want to run multiple central agents with different '
'config files. For each sub-group of the central agent '
'pool with the same partitioning_group_prefix a disjoint '
'subset of pollsters should be loaded.'),
]
cfg.CONF.register_opts(OPTS, group='central')
cfg.CONF.import_group('service_credentials', 'ceilometer.service')
LOG = log.getLogger(__name__)
@@ -29,7 +39,8 @@ LOG = log.getLogger(__name__)
class AgentManager(agent.AgentManager):
def __init__(self):
-super(AgentManager, self).__init__('central')
+super(AgentManager, self).__init__(
+    'central', group_prefix=cfg.CONF.central.partitioning_group_prefix)
def interval_task(self, task):
try:

ceilometer/compute/discovery.py

@@ -20,6 +20,14 @@ from oslo.config import cfg
from ceilometer import nova_client
from ceilometer import plugin
OPTS = [
cfg.BoolOpt('workload_partitioning',
default=False,
help='Enable work-load partitioning, allowing multiple '
'compute agents to be run simultaneously.')
]
cfg.CONF.register_opts(OPTS, group='compute')
class InstanceDiscovery(plugin.DiscoveryBase):
def __init__(self):
@@ -31,3 +39,10 @@ class InstanceDiscovery(plugin.DiscoveryBase):
instances = self.nova_cli.instance_get_all_by_host(cfg.CONF.host)
return [i for i in instances
if getattr(i, 'OS-EXT-STS:vm_state', None) != 'error']
@property
def group_id(self):
if cfg.CONF.compute.workload_partitioning:
return cfg.CONF.host
else:
return None
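
For reference, the relevant configuration might look like the following
ceilometer.conf snippet (the option names come from this change; the
memcached URL is just one example of a tooz-supported backend):

    [coordination]
    backend_url = memcached://127.0.0.1:11211
    heartbeat = 1.0

    [central]
    # Optional; only needed when running several central agent pools with
    # different config files. Groups become '<namespace>-<prefix>-<group>'.
    partitioning_group_prefix = pool-a

    [compute]
    workload_partitioning = True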

ceilometer/coordination.py (new file, 144 lines)

@@ -0,0 +1,144 @@
#
# Copyright 2014 Red Hat, Inc.
#
# Author: Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo.config import cfg
import tooz.coordination
from ceilometer.openstack.common.gettextutils import _LE, _LI
from ceilometer.openstack.common import log
from ceilometer import utils
LOG = log.getLogger(__name__)
OPTS = [
cfg.StrOpt('backend_url',
default=None,
help='The backend URL to use for distributed coordination. If '
'left empty, per-deployment central agent and per-host '
'compute agent won\'t do workload '
'partitioning and will only function correctly if a '
'single instance of that service is running.'),
cfg.FloatOpt('heartbeat',
default=1.0,
help='Number of seconds between heartbeats for distributed '
'coordination (float)')
]
cfg.CONF.register_opts(OPTS, group='coordination')
class PartitionCoordinator(object):
"""Workload partitioning coordinator.
This class uses the `tooz` library to manage group membership.
To ensure that the other agents know this agent is still alive,
the `heartbeat` method should be called periodically.
Coordination errors and reconnects are handled under the hood, so the
service using the partition coordinator need not care whether the
coordination backend is down; `extract_my_subset` will simply return an
empty iterable in this case.
"""
def __init__(self, my_id=None):
self._coordinator = None
self._groups = set()
self._my_id = my_id or str(uuid.uuid4())
self._started = False
def start(self):
backend_url = cfg.CONF.coordination.backend_url
if backend_url:
try:
self._coordinator = tooz.coordination.get_coordinator(
backend_url, self._my_id)
self._coordinator.start()
self._started = True
LOG.info(_LI('Coordination backend started successfully.'))
except tooz.coordination.ToozError:
self._started = False
LOG.exception(_LE('Error connecting to coordination backend.'))
def is_active(self):
return self._coordinator is not None
def heartbeat(self):
if self._coordinator:
if not self._started:
# re-connect
self.start()
try:
self._coordinator.heartbeat()
except tooz.coordination.ToozError:
LOG.exception(_LE('Error sending a heartbeat to coordination '
'backend.'))
def join_group(self, group_id):
if not self._coordinator or not self._started or not group_id:
return
while True:
try:
join_req = self._coordinator.join_group(group_id)
join_req.get()
LOG.info(_LI('Joined partitioning group %s'), group_id)
break
except tooz.coordination.MemberAlreadyExist:
return
except tooz.coordination.GroupNotCreated:
create_grp_req = self._coordinator.create_group(group_id)
try:
create_grp_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
self._groups.add(group_id)
def _get_members(self, group_id):
if not self._coordinator:
return [self._my_id]
while True:
get_members_req = self._coordinator.get_members(group_id)
try:
return get_members_req.get()
except tooz.coordination.GroupNotCreated:
self.join_group(group_id)
def extract_my_subset(self, group_id, iterable):
"""Filters an iterable, returning only objects assigned to this agent.
We have a list of objects and get a list of active group members from
`tooz`. We then hash all the objects into buckets and return only
the ones that hashed into *our* bucket.
"""
if not group_id:
return iterable
if group_id not in self._groups:
self.join_group(group_id)
try:
members = self._get_members(group_id)
LOG.debug('Members of group: %s', members)
hr = utils.HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', filtered)
return filtered
except tooz.coordination.ToozError:
LOG.exception(_LE('Error getting group membership info from '
'coordination backend.'))
return []
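
A small behavioural sketch (hypothetical ids, assuming no
[coordination]/backend_url is configured): without a backend the
coordinator degrades to single-agent behaviour rather than failing, since
this agent is the only known group member and the hash ring assigns every
resource to it.

    from ceilometer import coordination

    pc = coordination.PartitionCoordinator(my_id='agent-1')
    pc.start()                    # no backend_url, so nothing to connect to
    pc.join_group('some-group')   # a no-op without a coordinator

    pc.extract_my_subset('some-group', ['res-1', 'res-2'])
    # -> ['res-1', 'res-2']; contrast with a *failing* backend, where an
    # empty list is returned instead (see the except clause above).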

ceilometer/plugin.py

@@ -152,3 +152,21 @@ class DiscoveryBase(object):
:param param: an optional parameter to guide the discovery
"""
@property
def group_id(self):
"""Return group id of this discovery.
All running discoveries with the same group_id should return the same
set of resources at a given point in time. By default, a discovery is
put into a global group, meaning that all discoveries of its type
running anywhere in the cloud return the same set of resources.
This property can be overridden to provide correct grouping of
localized discoveries. For example, compute discovery is localized
to a host, which is reflected in its group_id.
A None value signifies that this discovery does not want to be part
of workload partitioning at all.
"""
return 'global'
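
As a sketch of the override described in this docstring (a hypothetical
plugin; InstanceDiscovery above does the same for the compute agent, and
the discover signature is assumed to take an optional param):

    from oslo.config import cfg

    from ceilometer import plugin

    class HostLocalDiscovery(plugin.DiscoveryBase):
        """Hypothetical discovery localized to a single host."""

        def discover(self, param=None):
            return []  # would return this host's resources

        @property
        def group_id(self):
            # Only agents on the same host see the same resource set,
            # so they form a per-host partitioning group.
            return cfg.CONF.host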

ceilometer/tests/agentbase.py

@@ -7,6 +7,7 @@
# Authors: Yunhong Jiang <yunhong.jiang@intel.com>
# Julien Danjou <julien@danjou.info>
# Eoghan Glynn <eglynn@redhat.com>
# Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -158,6 +159,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
params = []
resources = []
@property
def group_id(self):
return 'another_group'
class DiscoveryException(TestDiscoveryException):
params = []
@@ -222,6 +227,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
super(BaseAgentManagerTestCase, self).setUp()
self.mgr = self.create_manager()
self.mgr.pollster_manager = self.create_pollster_manager()
self.mgr.partition_coordinator = mock.MagicMock()
fake_subset = lambda _, x: x
p_coord = self.mgr.partition_coordinator
p_coord.extract_my_subset.side_effect = fake_subset
self.mgr.tg = mock.MagicMock()
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 60,
@@ -261,6 +271,29 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown()
@mock.patch('ceilometer.pipeline.setup_pipeline')
def test_start(self, setup_pipeline):
self.mgr.join_partitioning_groups = mock.MagicMock()
self.mgr.setup_polling_tasks = mock.MagicMock()
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.start()
setup_pipeline.assert_called_once_with()
self.mgr.partition_coordinator.start.assert_called_once_with()
self.mgr.join_partitioning_groups.assert_called_once_with()
self.mgr.setup_polling_tasks.assert_called_once_with()
timer_call = mock.call(1.0, self.mgr.partition_coordinator.heartbeat)
self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list)
def test_join_partitioning_groups(self):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
expected = [mock.call(self.mgr._construct_group_id(g))
for g in ['another_group', 'global']]
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
for c in expected:
self.assertIn(c, p_coord.join_group.call_args_list)
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
@@ -544,3 +577,22 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
def test_discovery_partitioning(self):
self.mgr.discovery_manager = self.create_discovery_manager()
p_coord = self.mgr.partition_coordinator
self.pipeline_cfg[0]['discovery'] = ['testdiscovery',
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
if hasattr(d.obj, 'resources')]
self.assertEqual(len(expected),
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)

ceilometer/tests/test_coordination.py (new file)

@@ -0,0 +1,224 @@
#
# Copyright 2014 Red Hat, Inc.
#
# Author: Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
from oslo.config import fixture as fixture_config
import tooz.coordination
from ceilometer import coordination
from ceilometer.tests import base
from ceilometer import utils
class MockToozCoordinator(object):
def __init__(self, member_id, shared_storage):
self._member_id = member_id
self._groups = shared_storage
def start(self):
pass
def heartbeat(self):
pass
def create_group(self, group_id):
if group_id in self._groups:
return MockAsyncError(
tooz.coordination.GroupAlreadyExist(group_id))
self._groups[group_id] = {}
return MockAsyncResult(None)
def join_group(self, group_id, capabilities=b''):
if group_id not in self._groups:
return MockAsyncError(
tooz.coordination.GroupNotCreated(group_id))
if self._member_id in self._groups[group_id]:
return MockAsyncError(
tooz.coordination.MemberAlreadyExist(group_id,
self._member_id))
self._groups[group_id][self._member_id] = {
"capabilities": capabilities,
}
return MockAsyncResult(None)
def get_members(self, group_id):
if group_id not in self._groups:
return MockAsyncError(
tooz.coordination.GroupNotCreated(group_id))
return MockAsyncResult(self._groups[group_id])
class MockToozCoordExceptionRaiser(MockToozCoordinator):
def start(self):
raise tooz.coordination.ToozError('error')
def heartbeat(self):
raise tooz.coordination.ToozError('error')
def join_group(self, group_id, capabilities=b''):
raise tooz.coordination.ToozError('error')
def get_members(self, group_id):
raise tooz.coordination.ToozError('error')
class MockAsyncResult(tooz.coordination.CoordAsyncResult):
def __init__(self, result):
self.result = result
def get(self, timeout=0):
return self.result
@staticmethod
def done():
return True
class MockAsyncError(tooz.coordination.CoordAsyncResult):
def __init__(self, error):
self.error = error
def get(self, timeout=0):
raise self.error
@staticmethod
def done():
return True
class MockLoggingHandler(logging.Handler):
"""Mock logging handler to check for expected logs."""
def __init__(self, *args, **kwargs):
self.reset()
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.messages[record.levelname.lower()].append(record.getMessage())
def reset(self):
self.messages = {'debug': [],
'info': [],
'warning': [],
'error': [],
'critical': []}
class TestPartitioning(base.BaseTestCase):
def setUp(self):
super(TestPartitioning, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.str_handler = MockLoggingHandler()
coordination.LOG.logger.addHandler(self.str_handler)
self.shared_storage = {}
def _get_new_started_coordinator(self, shared_storage, agent_id=None,
coordinator_cls=None):
coordinator_cls = coordinator_cls or MockToozCoordinator
self.CONF.set_override('backend_url', 'xxx://yyy',
group='coordination')
with mock.patch('tooz.coordination.get_coordinator',
lambda _, member_id:
coordinator_cls(member_id, shared_storage)):
pc = coordination.PartitionCoordinator(agent_id)
pc.start()
return pc
def _usage_simulation(self, *agents_kwargs):
partition_coordinators = []
for kwargs in agents_kwargs:
partition_coordinator = self._get_new_started_coordinator(
self.shared_storage, kwargs['agent_id'], kwargs.get(
'coordinator_cls'))
partition_coordinator.join_group(kwargs['group_id'])
partition_coordinators.append(partition_coordinator)
for i, kwargs in enumerate(agents_kwargs):
all_resources = kwargs.get('all_resources', [])
expected_resources = kwargs.get('expected_resources', [])
actual_resources = partition_coordinators[i].extract_my_subset(
kwargs['group_id'], all_resources)
self.assertEqual(expected_resources, actual_resources)
def test_single_group(self):
agents = [dict(agent_id='agent1', group_id='group'),
dict(agent_id='agent2', group_id='group')]
self._usage_simulation(*agents)
self.assertEqual(sorted(self.shared_storage.keys()), ['group'])
self.assertEqual(sorted(self.shared_storage['group'].keys()),
['agent1', 'agent2'])
def test_multiple_groups(self):
agents = [dict(agent_id='agent1', group_id='group1'),
dict(agent_id='agent2', group_id='group2')]
self._usage_simulation(*agents)
self.assertEqual(sorted(self.shared_storage.keys()), ['group1',
'group2'])
def test_partitioning(self):
all_resources = ['resource_%s' % i for i in range(1000)]
agents = ['agent_%s' % i for i in range(10)]
expected_resources = [list() for _ in range(len(agents))]
hr = utils.HashRing(agents)
for r in all_resources:
key = agents.index(hr.get_node(r))
expected_resources[key].append(r)
agents_kwargs = []
for i, agent in enumerate(agents):
agents_kwargs.append(dict(agent_id=agent,
group_id='group',
all_resources=all_resources,
expected_resources=expected_resources[i]))
self._usage_simulation(*agents_kwargs)
def test_coordination_backend_offline(self):
agents = [dict(agent_id='agent1',
group_id='group',
all_resources=['res1', 'res2'],
expected_resources=[],
coordinator_cls=MockToozCoordExceptionRaiser)]
self._usage_simulation(*agents)
expected_errors = ['Error getting group membership info from '
'coordination backend.',
'Error connecting to coordination backend.']
for e in expected_errors:
self.assertIn(e, self.str_handler.messages['error'])
def test_reconnect(self):
coord = self._get_new_started_coordinator({}, 'a',
MockToozCoordExceptionRaiser)
with mock.patch('tooz.coordination.get_coordinator',
return_value=MockToozCoordExceptionRaiser('a', {})):
coord.heartbeat()
expected_errors = ['Error connecting to coordination backend.',
'Error sending a heartbeat to coordination '
'backend.']
for e in expected_errors:
self.assertIn(e, self.str_handler.messages['error'])
self.str_handler.messages['error'] = []
with mock.patch('tooz.coordination.get_coordinator',
return_value=MockToozCoordinator('a', {})):
coord.heartbeat()
for e in expected_errors:
self.assertNotIn(e, self.str_handler.messages['error'])

ceilometer/tests/test_utils.py

@@ -147,3 +147,35 @@ class TestUtils(base.BaseTestCase):
('nested2[0].c', 'A'),
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
nodes = [str(x) for x in range(num_nodes)]
hr = utils.HashRing(nodes)
buckets = [0] * num_nodes
assignments = [-1] * num_keys
for k in range(num_keys):
n = int(hr.get_node(str(k)))
self.assertTrue(0 <= n <= num_nodes)
buckets[n] += 1
assignments[k] = n
# at least something in each bucket
self.assertTrue(all((c > 0 for c in buckets)))
# approximately even distribution
diff = max(buckets) - min(buckets)
self.assertTrue(diff < 0.3 * (num_keys / num_nodes))
# consistency
num_nodes += 1
nodes.append(str(num_nodes + 1))
hr = utils.HashRing(nodes)
for k in range(num_keys):
n = int(hr.get_node(str(k)))
assignments[k] -= n
reassigned = len([c for c in assignments if c != 0])
self.assertTrue(reassigned < num_keys / num_nodes)

ceilometer/utils.py

@@ -18,11 +18,14 @@
"""Utilities and helper functions."""
import bisect
import calendar
import copy
import datetime
import decimal
import hashlib
import multiprocessing
import struct
from oslo.utils import timeutils
from oslo.utils import units
@@ -180,3 +183,33 @@ def uniq(dupes, attrs):
deduped.append(d)
keys.append(key(d))
return deduped
class HashRing(object):
def __init__(self, nodes, replicas=100):
self._ring = dict()
self._sorted_keys = []
for node in nodes:
for r in six.moves.range(replicas):
hashed_key = self._hash('%s-%s' % (node, r))
self._ring[hashed_key] = node
self._sorted_keys.append(hashed_key)
self._sorted_keys.sort()
@staticmethod
def _hash(key):
return struct.unpack_from('>I',
hashlib.md5(str(key).encode()).digest())[0]
def _get_position_on_ring(self, key):
hashed_key = self._hash(key)
position = bisect.bisect(self._sorted_keys, hashed_key)
return position if position < len(self._sorted_keys) else 0
def get_node(self, key):
if not self._ring:
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]
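
A quick sketch of the ring's behaviour (hypothetical node names; exact key
placement depends on the md5-based hash above):

    ring = HashRing(['agent-1', 'agent-2', 'agent-3'])

    # The same key maps to the same node as long as membership is stable.
    assert ring.get_node('instance-42') == ring.get_node('instance-42')

    # With 100 replicas per node, adding a member remaps only roughly
    # 1/len(nodes) of the keys, so most resources keep their old agent;
    # this is the consistency property test_hash_ring exercises.
    bigger_ring = HashRing(['agent-1', 'agent-2', 'agent-3', 'agent-4'])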

requirements.txt

@@ -35,5 +35,6 @@ six>=1.7.0
SQLAlchemy>=0.8.4,<=0.8.99,>=0.9.7,<=0.9.99
sqlalchemy-migrate>=0.9.1
stevedore>=0.14
tooz>=0.3
WebOb>=1.2.3
WSME>=0.6