From e7e74f7dc311fa2695514ab32584f849aecc6a1d Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 19 Aug 2013 10:48:18 +0200 Subject: [PATCH] Make RPCPublisher flush method threadsafe This change allow concurrent access to the local queue of the RPCPublisher Fixes bug #1211736 Change-Id: I13371329d40e43f42c357a0893e4023f343a5efa --- ceilometer/publisher/rpc.py | 64 ++++++++++++++++----------- tests/publisher/test_rpc_publisher.py | 27 +++++++++++ 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index fcaeb4e93..441dbef87 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -178,6 +178,29 @@ class RPCPublisher(publisher.PublisherBase): self.flush() def flush(self): + #note(sileht): + # IO of the rpc stuff in handled by eventlet, + # this is why the self.local_queue, is emptied before processing the + # queue and the remaining messages in the queue are added to + # self.local_queue after in case of a other call have already added + # something in the self.local_queue + queue = self.local_queue + self.local_queue = [] + self.local_queue = self._process_queue(queue, self.policy) + \ + self.local_queue + if self.policy == 'queue': + self._check_queue_length() + + def _check_queue_length(self): + queue_length = len(self.local_queue) + if queue_length > self.max_queue_length > 0: + count = queue_length - self.max_queue_length + self.local_queue = self.local_queue[count:] + LOG.warn("Publisher max local_queue length is exceeded, " + "dropping %d oldest counters", count) + + @staticmethod + def _process_queue(queue, policy): #note(sileht): # the behavior of rpc.cast call depends of rabbit_max_retries # if rabbit_max_retries <= 0: @@ -192,33 +215,22 @@ class RPCPublisher(publisher.PublisherBase): # the default policy just respect the rabbitmq configuration # nothing special is done if rabbit_max_retries <= 0 # and exception is reraised if rabbit_max_retries > 0 - while self.local_queue: - context, topic, msg = self.local_queue[0] + while queue: + context, topic, msg = queue[0] try: rpc.cast(context, topic, msg) except (SystemExit, rpc.common.RPCException): - if self.policy == 'queue': - LOG.warn("Failed to publish counters, queue them") - queue_length = len(self.local_queue) - if queue_length > self.max_queue_length > 0: - count = queue_length - self.max_queue_length - self.local_queue = self.local_queue[count:] - LOG.warn("Publisher max queue length is exceeded, " - "dropping %d oldest counters", - count) - break - - elif self.policy == 'drop': - counters = sum([len(m['args']['data']) - for _, _, m in self.local_queue]) - LOG.warn( - "Failed to publish %d counters, dropping them", - counters) - self.local_queue = [] - break - else: - # default, occur only if rabbit_max_retries > 0 - self.local_queue = [] - raise + counters = sum([len(m['args']['data']) for _, _, m in queue]) + if policy == 'queue': + LOG.warn("Failed to publish %s counters, queue them", + counters) + return queue + elif policy == 'drop': + LOG.warn("Failed to publish %d counters, dropping them", + counters) + return [] + # default, occur only if rabbit_max_retries > 0 + raise else: - self.local_queue.pop(0) + queue.pop(0) + return [] diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py index a072d8f8b..10f37e50e 100644 --- a/tests/publisher/test_rpc_publisher.py +++ b/tests/publisher/test_rpc_publisher.py @@ -19,6 +19,7 @@ """Tests for ceilometer/publish.py """ +import eventlet import datetime from oslo.config import cfg @@ -265,6 +266,32 @@ class TestPublish(base.TestCase): self.assertIn( cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics) + def test_published_concurrency(self): + """This test the concurrent access to the local queue + of the rpc publisher + """ + + def faux_cast_go(context, topic, msg): + self.published.append((topic, msg)) + + def faux_cast_wait(context, topic, msg): + self.stubs.Set(oslo_rpc, 'cast', faux_cast_go) + eventlet.sleep(1) + self.published.append((topic, msg)) + + self.stubs.Set(oslo_rpc, 'cast', faux_cast_wait) + + publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://')) + job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data) + job2 = eventlet.spawn(publisher.publish_samples, None, self.test_data) + + job1.wait() + job2.wait() + + self.assertEqual(publisher.policy, 'default') + self.assertEqual(len(self.published), 2) + self.assertEqual(len(publisher.local_queue), 0) + def test_published_with_no_policy(self): self.rpc_unreachable = True publisher = rpc.RPCPublisher(