publisher.rpc: queueing policies
This change allows configuring the behavior of the rpc publisher when rabbitmq is down. 3 policies are available: - default: wait until it comes back if rabbit_max_retries <= 0, raise an exception if rabbit_max_retries > 0 - drop: don't publish the samples - queue: create a local queue of 'max_queue_length' samples. Configuration of the policy is done via the publisher url, for example: rpc://?policy=queue&max_queue_length=100 Change-Id: I41c9be9e1c760db1155711325434a1877e6dd7b2 Fixes: bug#1192918 bug#1189488
This commit is contained in:
parent
77e59ebb00
commit
3f044dd1ac
@ -58,6 +58,9 @@ def register_opts(config):
|
|||||||
|
|
||||||
register_opts(cfg.CONF)

# Make sure 'rabbit_max_retries' is registered on cfg.CONF before the
# publisher overrides it; the option is looked up in the impl_kombu module.
_KOMBU_DRIVER = 'ceilometer.openstack.common.rpc.impl_kombu'
cfg.CONF.import_opt('rabbit_max_retries', _KOMBU_DRIVER)
|
||||||
|
|
||||||
|
|
||||||
def compute_signature(message, secret):
|
def compute_signature(message, secret):
|
||||||
"""Return the signature for a message dictionary.
|
"""Return the signature for a message dictionary.
|
||||||
@ -108,10 +111,32 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
|
|
||||||
def __init__(self, parsed_url):
    """Configure the publisher from the query string of its URL.

    Recognised query parameters:

    * ``per_meter_topic``: integer flag, defaults to 0 (last value wins
      when repeated).
    * ``target``: defaults to ``record_metering_data`` (first value wins
      when repeated).
    * ``policy``: ``default``, ``queue`` or ``drop``, defaults to
      ``default`` (last value wins). Any other value is logged as
      unknown and replaced by ``default``.
    * ``max_queue_length``: integer, defaults to 1024 (last value wins).

    :param parsed_url: split publisher URL; only its ``query`` attribute
                       is read here.
    :raises ValueError: if ``per_meter_topic`` or ``max_queue_length``
                        cannot be converted with ``int()``.
    """
    options = urlparse.parse_qs(parsed_url.query)
    # parse_qs maps each option to the list of values found in the URL;
    # only the latest one is used when an option is given more than once.
    self.per_meter_topic = bool(int(
        options.get('per_meter_topic', [0])[-1]))

    self.target = options.get('target', ['record_metering_data'])[0]

    # Fall back to a policy name that is actually recognised below, so a
    # URL without ?policy= does not trigger the "unknown policy" warning.
    self.policy = options.get('policy', ['default'])[-1]
    self.max_queue_length = int(options.get(
        'max_queue_length', [1024])[-1])

    # Messages waiting to be sent, oldest first.
    self.local_queue = []

    if self.policy in ('queue', 'drop'):
        # Adjacent literals instead of a backslash continuation: the
        # latter embeds the source indentation in the logged message.
        LOG.info('Publishing policy set to %s, '
                 'override rabbit_max_retries to 1', self.policy)
        # NOTE(review): this overrides the option on the global cfg.CONF
        # object, not only for this publisher instance.
        cfg.CONF.set_override("rabbit_max_retries", 1)

    elif self.policy == 'default':
        LOG.info('Publishing policy set to %s', self.policy)
    else:
        LOG.warn('Publishing policy is unknown (%s) force to default',
                 self.policy)
        self.policy = 'default'
|
||||||
|
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters, source):
|
||||||
"""Publish counters on RPC.
|
"""Publish counters on RPC.
|
||||||
|
|
||||||
@ -137,7 +162,7 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
}
|
}
|
||||||
LOG.audit('Publishing %d counters on %s',
|
LOG.audit('Publishing %d counters on %s',
|
||||||
len(msg['args']['data']), topic)
|
len(msg['args']['data']), topic)
|
||||||
rpc.cast(context, topic, msg)
|
self.local_queue.append((context, topic, msg))
|
||||||
|
|
||||||
if self.per_meter_topic:
|
if self.per_meter_topic:
|
||||||
for meter_name, meter_list in itertools.groupby(
|
for meter_name, meter_list in itertools.groupby(
|
||||||
@ -151,4 +176,52 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
topic_name = topic + '.' + meter_name
|
topic_name = topic + '.' + meter_name
|
||||||
LOG.audit('Publishing %d counters on %s',
|
LOG.audit('Publishing %d counters on %s',
|
||||||
len(msg['args']['data']), topic_name)
|
len(msg['args']['data']), topic_name)
|
||||||
rpc.cast(context, topic_name, msg)
|
self.local_queue.append((context, topic_name, msg))
|
||||||
|
|
||||||
|
self.flush()
|
||||||
|
|
||||||
|
def flush(self):
    """Send the messages held in ``self.local_queue``, oldest first.

    A message is removed from the queue only after ``rpc.cast`` returned
    for it. When the cast fails, the reaction depends on ``self.policy``:

    * ``queue``: keep the unsent messages, trimming the oldest ones if
      the queue is longer than ``max_queue_length`` (a value <= 0
      disables trimming), and stop.
    * ``drop``: empty the queue and stop.
    * anything else (``default``): empty the queue and re-raise.

    note(sileht): the behavior of rpc.cast depends on rabbit_max_retries.
    If rabbit_max_retries <= 0 it returns only once the message has been
    sent on the amqp queue; if rabbit_max_retries > 0 it raises an
    exception when rabbitmq is unreachable. Ugly, but oslo.rpc currently
    does a sys.exit(1) instead of raising an RPCException, so both are
    caught until oslo implements the correct behavior.
    """
    while self.local_queue:
        context, topic, msg = self.local_queue[0]
        try:
            rpc.cast(context, topic, msg)
        except (SystemExit, rpc.common.RPCException):
            if self.policy == 'queue':
                LOG.warn("Failed to publish counters, queue them")
                pending = len(self.local_queue)
                if 0 < self.max_queue_length < pending:
                    overflow = pending - self.max_queue_length
                    # Keep only the newest max_queue_length entries.
                    self.local_queue = self.local_queue[overflow:]
                    LOG.warn("Publisher max queue length is exceeded, "
                             "dropping %d oldest counters",
                             overflow)
                return

            if self.policy == 'drop':
                lost = sum(len(queued['args']['data'])
                           for _, _, queued in self.local_queue)
                LOG.warn(
                    "Failed to publish %d counters, dropping them",
                    lost)
                self.local_queue = []
                return

            # default policy: only reached if rabbit_max_retries > 0
            self.local_queue = []
            raise

        # The cast succeeded: the head of the queue has been delivered.
        self.local_queue.pop(0)
|
||||||
|
@ -204,11 +204,19 @@ class TestPublish(base.TestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
def faux_cast(self, context, topic, msg):
    """Stand-in for ``rpc.cast`` that records casts or simulates an outage.

    While ``self.rpc_unreachable`` is false the ``(topic, msg)`` pair is
    appended to ``self.published``; otherwise ``SystemExit(1)`` is raised.
    """
    if not self.rpc_unreachable:
        self.published.append((topic, msg))
        return

    # note(sileht): Ugly, but when rabbitmq is unreachable and
    # rabbitmq_max_retries is not 0, oslo.rpc does a sys.exit(1), so we
    # do the same thing here until this is fixed in oslo.
    raise SystemExit(1)
|
||||||
|
|
||||||
def setUp(self):
    """Reset the recorded state and route ``oslo_rpc.cast`` to the fake."""
    super(TestPublish, self).setUp()
    # The broker is reachable unless a test says otherwise.
    self.rpc_unreachable = False
    # (topic, msg) pairs recorded by faux_cast.
    self.published = []
    self.stubs.Set(oslo_rpc, 'cast', self.faux_cast)
|
||||||
|
|
||||||
def test_published(self):
|
def test_published(self):
|
||||||
@ -261,3 +269,118 @@ class TestPublish(base.TestCase):
|
|||||||
cfg.CONF.publisher_rpc.metering_topic + '.' + 'test2', topics)
|
cfg.CONF.publisher_rpc.metering_topic + '.' + 'test2', topics)
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
|
cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
|
||||||
|
|
||||||
|
def test_published_with_no_policy(self):
    """No policy in the URL: the cast failure propagates to the caller."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://')
    publisher = rpc.RPCPublisher(url)

    self.assertRaises(SystemExit, publisher.publish_counters,
                      None, self.test_data, 'test')

    self.assertEqual(publisher.policy, 'default')
    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
|
def test_published_with_policy_block(self):
    """policy=default: the cast failure propagates, nothing is queued."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=default')
    publisher = rpc.RPCPublisher(url)

    self.assertRaises(SystemExit, publisher.publish_counters,
                      None, self.test_data, 'test')

    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
|
def test_published_with_policy_incorrect(self):
    """An unknown policy name is replaced by 'default'."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=notexist')
    publisher = rpc.RPCPublisher(url)

    self.assertRaises(SystemExit, publisher.publish_counters,
                      None, self.test_data, 'test')

    self.assertEqual(publisher.policy, 'default')
    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
|
def test_published_with_policy_drop_and_rpc_down(self):
    """policy=drop: a failed publish neither raises nor queues."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=drop')
    publisher = rpc.RPCPublisher(url)

    publisher.publish_counters(None, self.test_data, 'test')

    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
|
def test_published_with_policy_queue_and_rpc_down(self):
    """policy=queue: a failed publish leaves one message in the queue."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=queue')
    publisher = rpc.RPCPublisher(url)

    publisher.publish_counters(None, self.test_data, 'test')

    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 1)
|
||||||
|
|
||||||
|
def test_published_with_policy_queue_and_rpc_down_up(self):
    """policy=queue: the queued message is sent once the cast works."""
    url = network_utils.urlsplit('rpc://?policy=queue')
    publisher = rpc.RPCPublisher(url)

    # First publish while the broker is down: the message is kept.
    self.rpc_unreachable = True
    publisher.publish_counters(None, self.test_data, 'test')
    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 1)

    # Second publish with the broker back: both messages go out.
    self.rpc_unreachable = False
    publisher.publish_counters(None, self.test_data, 'test')

    self.assertEqual(len(self.published), 2)
    self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
|
def test_published_with_policy_sized_queue_and_rpc_down(self):
    """max_queue_length=3: only the three newest messages are kept."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3')
    publisher = rpc.RPCPublisher(url)

    for i in range(0, 5):
        publisher.publish_counters(None, self.test_data, 'test-%d' % i)

    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 3)
    # Queue entries are (context, topic, msg) tuples; check each msg.
    kept_sources = ('test-2', 'test-3', 'test-4')
    for position, expected_source in enumerate(kept_sources):
        queued_msg = publisher.local_queue[position][2]
        self.assertEqual(
            queued_msg['args']['data'][0]['source'],
            expected_source
        )
|
||||||
|
|
||||||
|
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
    """Without max_queue_length the queue is capped at 1024 messages."""
    self.rpc_unreachable = True
    url = network_utils.urlsplit('rpc://?policy=queue')
    publisher = rpc.RPCPublisher(url)

    for i in range(0, 2000):
        publisher.publish_counters(None, self.test_data, 'test-%d' % i)

    self.assertEqual(len(self.published), 0)
    self.assertEqual(len(publisher.local_queue), 1024)
    # Oldest and newest surviving entries: 2000 - 1024 = 976 .. 1999.
    oldest_msg = publisher.local_queue[0][2]
    self.assertEqual(
        oldest_msg['args']['data'][0]['source'],
        'test-976'
    )
    newest_msg = publisher.local_queue[1023][2]
    self.assertEqual(
        newest_msg['args']['data'][0]['source'],
        'test-1999'
    )
|
||||||
|
Loading…
x
Reference in New Issue
Block a user