diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py
index 2b26676a4..603caad30 100644
--- a/ceilometer/publisher/rpc.py
+++ b/ceilometer/publisher/rpc.py
@@ -58,6 +58,9 @@ def register_opts(config):
 
 register_opts(cfg.CONF)
 
+cfg.CONF.import_opt('rabbit_max_retries',
+                    'ceilometer.openstack.common.rpc.impl_kombu')
+
 
 def compute_signature(message, secret):
     """Return the signature for a message dictionary.
@@ -108,10 +111,32 @@ class RPCPublisher(publisher.PublisherBase):
 
     def __init__(self, parsed_url):
         options = urlparse.parse_qs(parsed_url.query)
+        # each option value is a list of url params values;
+        # when an option is provided more than once, only the
+        # latest value is used
         self.per_meter_topic = bool(int(
             options.get('per_meter_topic', [0])[-1]))
+
         self.target = options.get('target', ['record_metering_data'])[0]
 
+        self.policy = options.get('policy', ['default'])[-1]
+        self.max_queue_length = int(options.get(
+            'max_queue_length', [1024])[-1])
+
+        self.local_queue = []
+
+        if self.policy in ['queue', 'drop']:
+            LOG.info('Publishing policy set to %s, '
+                     'override rabbit_max_retries to 1' % self.policy)
+            cfg.CONF.set_override("rabbit_max_retries", 1)
+
+        elif self.policy == 'default':
+            LOG.info('Publishing policy set to %s' % self.policy)
+        else:
+            LOG.warn('Publishing policy is unknown (%s) force to default'
+                     % self.policy)
+            self.policy = 'default'
+
     def publish_counters(self, context, counters, source):
         """Publish counters on RPC.
 
@@ -137,7 +162,7 @@ class RPCPublisher(publisher.PublisherBase):
         }
         LOG.audit('Publishing %d counters on %s',
                   len(msg['args']['data']), topic)
-        rpc.cast(context, topic, msg)
+        self.local_queue.append((context, topic, msg))
 
         if self.per_meter_topic:
             for meter_name, meter_list in itertools.groupby(
@@ -151,4 +176,53 @@ class RPCPublisher(publisher.PublisherBase):
                 topic_name = topic + '.' + meter_name
                 LOG.audit('Publishing %d counters on %s',
                           len(msg['args']['data']), topic_name)
-                rpc.cast(context, topic_name, msg)
+                self.local_queue.append((context, topic_name, msg))
+
+        self.flush()
+
+    def flush(self):
+        """Send queued messages; apply the publishing policy on failure."""
+        #note(sileht):
+        # the behavior of rpc.cast call depends on rabbit_max_retries
+        # if rabbit_max_retries <= 0:
+        #   it returns only if the msg has been sent on the amqp queue
+        # if rabbit_max_retries > 0:
+        #   it raises an exception if rabbitmq is unreachable
+        #
+        # Ugly, but actually oslo.rpc does a sys.exit(1) instead of
+        # raising an RPCException, so we catch both until a correct
+        # behavior is implemented in oslo
+        #
+        # the default policy just respects the rabbitmq configuration:
+        # nothing special is done if rabbit_max_retries <= 0
+        # and the exception is reraised if rabbit_max_retries > 0
+        while self.local_queue:
+            context, topic, msg = self.local_queue[0]
+            try:
+                rpc.cast(context, topic, msg)
+            except (SystemExit, rpc.common.RPCException):
+                if self.policy == 'queue':
+                    LOG.warn("Failed to publish counters, queue them")
+                    queue_length = len(self.local_queue)
+                    if queue_length > self.max_queue_length > 0:
+                        count = queue_length - self.max_queue_length
+                        self.local_queue = self.local_queue[count:]
+                        LOG.warn("Publisher max queue length is exceeded, "
+                                 "dropping %d oldest counters",
+                                 count)
+                    break
+
+                elif self.policy == 'drop':
+                    counters = sum([len(m['args']['data'])
+                                    for _, _, m in self.local_queue])
+                    LOG.warn(
+                        "Failed to publish %d counters, dropping them",
+                        counters)
+                    self.local_queue = []
+                    break
+                else:
+                    # default, occur only if rabbit_max_retries > 0
+                    self.local_queue = []
+                    raise
+            else:
+                self.local_queue.pop(0)
diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py
index 799df67a1..e79f4d9cd 100644
--- a/tests/publisher/test_rpc_publisher.py
+++ b/tests/publisher/test_rpc_publisher.py
@@ -204,11 +204,19 @@ class TestPublish(base.TestCase):
     ]
 
     def faux_cast(self, context, topic, msg):
-        self.published.append((topic, msg))
+        if self.rpc_unreachable:
+            #note(sileht): Ugly, but when rabbitmq is unreachable
+            # and rabbit_max_retries is not 0
+            # oslo.rpc does a sys.exit(1), so we do the same
+            # thing here until this is fixed in oslo
+            raise SystemExit(1)
+        else:
+            self.published.append((topic, msg))
 
     def setUp(self):
         super(TestPublish, self).setUp()
         self.published = []
+        self.rpc_unreachable = False
         self.stubs.Set(oslo_rpc, 'cast', self.faux_cast)
 
     def test_published(self):
@@ -261,3 +269,118 @@ class TestPublish(base.TestCase):
             cfg.CONF.publisher_rpc.metering_topic + '.' + 'test2', topics)
         self.assertIn(
             cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
+
+    def test_published_with_no_policy(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://'))
+        self.assertRaises(
+            SystemExit,
+            publisher.publish_counters,
+            None, self.test_data, 'test')
+        self.assertEqual(publisher.policy, 'default')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 0)
+
+    def test_published_with_policy_default(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=default'))
+        self.assertRaises(
+            SystemExit,
+            publisher.publish_counters,
+            None, self.test_data, 'test')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 0)
+
+    def test_published_with_policy_incorrect(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=notexist'))
+        self.assertRaises(
+            SystemExit,
+            publisher.publish_counters,
+            None, self.test_data, 'test')
+        self.assertEqual(publisher.policy, 'default')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 0)
+
+    def test_published_with_policy_drop_and_rpc_down(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=drop'))
+        publisher.publish_counters(None,
+                                   self.test_data,
+                                   'test')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 0)
+
+    def test_published_with_policy_queue_and_rpc_down(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=queue'))
+        publisher.publish_counters(None,
+                                   self.test_data,
+                                   'test')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 1)
+
+    def test_published_with_policy_queue_and_rpc_down_up(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=queue'))
+        publisher.publish_counters(None,
+                                   self.test_data,
+                                   'test')
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 1)
+
+        self.rpc_unreachable = False
+        publisher.publish_counters(None,
+                                   self.test_data,
+                                   'test')
+
+        self.assertEqual(len(self.published), 2)
+        self.assertEqual(len(publisher.local_queue), 0)
+
+    def test_published_with_policy_sized_queue_and_rpc_down(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
+        for i in range(0, 5):
+            publisher.publish_counters(None,
+                                       self.test_data,
+                                       'test-%d' % i)
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 3)
+        self.assertEqual(
+            publisher.local_queue[0][2]['args']['data'][0]['source'],
+            'test-2'
+        )
+        self.assertEqual(
+            publisher.local_queue[1][2]['args']['data'][0]['source'],
+            'test-3'
+        )
+        self.assertEqual(
+            publisher.local_queue[2][2]['args']['data'][0]['source'],
+            'test-4'
+        )
+
+    def test_published_with_policy_default_sized_queue_and_rpc_down(self):
+        self.rpc_unreachable = True
+        publisher = rpc.RPCPublisher(
+            network_utils.urlsplit('rpc://?policy=queue'))
+        for i in range(0, 2000):
+            publisher.publish_counters(None,
+                                       self.test_data,
+                                       'test-%d' % i)
+        self.assertEqual(len(self.published), 0)
+        self.assertEqual(len(publisher.local_queue), 1024)
+        self.assertEqual(
+            publisher.local_queue[0][2]['args']['data'][0]['source'],
+            'test-976'
+        )
+        self.assertEqual(
+            publisher.local_queue[1023][2]['args']['data'][0]['source'],
+            'test-1999'
+        )