Merge "Make RPCPublisher flush method threadsafe"
This commit is contained in:
commit
5bd759fd69
@ -178,6 +178,29 @@ class RPCPublisher(publisher.PublisherBase):
|
||||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
#note(sileht):
|
||||
# IO of the rpc stuff in handled by eventlet,
|
||||
# this is why the self.local_queue, is emptied before processing the
|
||||
# queue and the remaining messages in the queue are added to
|
||||
# self.local_queue after in case of a other call have already added
|
||||
# something in the self.local_queue
|
||||
queue = self.local_queue
|
||||
self.local_queue = []
|
||||
self.local_queue = self._process_queue(queue, self.policy) + \
|
||||
self.local_queue
|
||||
if self.policy == 'queue':
|
||||
self._check_queue_length()
|
||||
|
||||
def _check_queue_length(self):
|
||||
queue_length = len(self.local_queue)
|
||||
if queue_length > self.max_queue_length > 0:
|
||||
count = queue_length - self.max_queue_length
|
||||
self.local_queue = self.local_queue[count:]
|
||||
LOG.warn("Publisher max local_queue length is exceeded, "
|
||||
"dropping %d oldest counters", count)
|
||||
|
||||
@staticmethod
|
||||
def _process_queue(queue, policy):
|
||||
#note(sileht):
|
||||
# the behavior of rpc.cast call depends of rabbit_max_retries
|
||||
# if rabbit_max_retries <= 0:
|
||||
@ -192,33 +215,22 @@ class RPCPublisher(publisher.PublisherBase):
|
||||
# the default policy just respect the rabbitmq configuration
|
||||
# nothing special is done if rabbit_max_retries <= 0
|
||||
# and exception is reraised if rabbit_max_retries > 0
|
||||
while self.local_queue:
|
||||
context, topic, msg = self.local_queue[0]
|
||||
while queue:
|
||||
context, topic, msg = queue[0]
|
||||
try:
|
||||
rpc.cast(context, topic, msg)
|
||||
except (SystemExit, rpc.common.RPCException):
|
||||
if self.policy == 'queue':
|
||||
LOG.warn("Failed to publish counters, queue them")
|
||||
queue_length = len(self.local_queue)
|
||||
if queue_length > self.max_queue_length > 0:
|
||||
count = queue_length - self.max_queue_length
|
||||
self.local_queue = self.local_queue[count:]
|
||||
LOG.warn("Publisher max queue length is exceeded, "
|
||||
"dropping %d oldest counters",
|
||||
count)
|
||||
break
|
||||
|
||||
elif self.policy == 'drop':
|
||||
counters = sum([len(m['args']['data'])
|
||||
for _, _, m in self.local_queue])
|
||||
LOG.warn(
|
||||
"Failed to publish %d counters, dropping them",
|
||||
counters)
|
||||
self.local_queue = []
|
||||
break
|
||||
else:
|
||||
# default, occur only if rabbit_max_retries > 0
|
||||
self.local_queue = []
|
||||
raise
|
||||
counters = sum([len(m['args']['data']) for _, _, m in queue])
|
||||
if policy == 'queue':
|
||||
LOG.warn("Failed to publish %s counters, queue them",
|
||||
counters)
|
||||
return queue
|
||||
elif policy == 'drop':
|
||||
LOG.warn("Failed to publish %d counters, dropping them",
|
||||
counters)
|
||||
return []
|
||||
# default, occur only if rabbit_max_retries > 0
|
||||
raise
|
||||
else:
|
||||
self.local_queue.pop(0)
|
||||
queue.pop(0)
|
||||
return []
|
||||
|
@ -19,6 +19,7 @@
|
||||
"""Tests for ceilometer/publish.py
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
import datetime
|
||||
from oslo.config import cfg
|
||||
|
||||
@ -265,6 +266,32 @@ class TestPublish(base.TestCase):
|
||||
self.assertIn(
|
||||
cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
|
||||
|
||||
def test_published_concurrency(self):
|
||||
"""This test the concurrent access to the local queue
|
||||
of the rpc publisher
|
||||
"""
|
||||
|
||||
def faux_cast_go(context, topic, msg):
|
||||
self.published.append((topic, msg))
|
||||
|
||||
def faux_cast_wait(context, topic, msg):
|
||||
self.stubs.Set(oslo_rpc, 'cast', faux_cast_go)
|
||||
eventlet.sleep(1)
|
||||
self.published.append((topic, msg))
|
||||
|
||||
self.stubs.Set(oslo_rpc, 'cast', faux_cast_wait)
|
||||
|
||||
publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
|
||||
job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
|
||||
job2 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
|
||||
|
||||
job1.wait()
|
||||
job2.wait()
|
||||
|
||||
self.assertEqual(publisher.policy, 'default')
|
||||
self.assertEqual(len(self.published), 2)
|
||||
self.assertEqual(len(publisher.local_queue), 0)
|
||||
|
||||
def test_published_with_no_policy(self):
|
||||
self.rpc_unreachable = True
|
||||
publisher = rpc.RPCPublisher(
|
||||
|
Loading…
Reference in New Issue
Block a user