Make RPCPublisher flush method threadsafe
This change allows concurrent access to the local queue of the RPCPublisher.

Fixes bug #1211736

Change-Id: I13371329d40e43f42c357a0893e4023f343a5efa
parent 98d2c8c3f4
commit e7e74f7dc3
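Before the diff, a minimal sketch of the pattern the new flush() relies on. QueueingPublisher and its send callable are invented here for illustration and are not part of ceilometer; only the detach-process-merge handling of local_queue mirrors the change below.

class QueueingPublisher(object):
    """Illustrative only -- not the real RPCPublisher."""

    def __init__(self, send):
        self.send = send        # callable that may raise on delivery failure
        self.local_queue = []

    def publish(self, msg):
        self.local_queue.append(msg)
        self.flush()

    def flush(self):
        # Detach the shared list before doing any (potentially yielding) I/O,
        # then put whatever could not be sent back in front of messages that
        # concurrent callers appended to self.local_queue in the meantime.
        queue = self.local_queue
        self.local_queue = []
        self.local_queue = self._process_queue(queue) + self.local_queue

    def _process_queue(self, queue):
        while queue:
            try:
                self.send(queue[0])
            except Exception:
                return queue    # roughly the 'queue' policy: keep unsent msgs
            queue.pop(0)
        return []

With a send callable that simply records the message, publish() leaves local_queue empty; with a send that raises, the message stays queued for the next flush().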
@@ -178,6 +178,29 @@ class RPCPublisher(publisher.PublisherBase):
         self.flush()
 
     def flush(self):
+        #note(sileht):
+        # IO of the rpc stuff in handled by eventlet,
+        # this is why the self.local_queue, is emptied before processing the
+        # queue and the remaining messages in the queue are added to
+        # self.local_queue after in case of a other call have already added
+        # something in the self.local_queue
+        queue = self.local_queue
+        self.local_queue = []
+        self.local_queue = self._process_queue(queue, self.policy) + \
+            self.local_queue
+        if self.policy == 'queue':
+            self._check_queue_length()
+
+    def _check_queue_length(self):
+        queue_length = len(self.local_queue)
+        if queue_length > self.max_queue_length > 0:
+            count = queue_length - self.max_queue_length
+            self.local_queue = self.local_queue[count:]
+            LOG.warn("Publisher max local_queue length is exceeded, "
+                     "dropping %d oldest counters", count)
+
+    @staticmethod
+    def _process_queue(queue, policy):
         #note(sileht):
         # the behavior of rpc.cast call depends of rabbit_max_retries
         # if rabbit_max_retries <= 0:
@@ -192,33 +215,22 @@ class RPCPublisher(publisher.PublisherBase):
         # the default policy just respect the rabbitmq configuration
         # nothing special is done if rabbit_max_retries <= 0
         # and exception is reraised if rabbit_max_retries > 0
-        while self.local_queue:
-            context, topic, msg = self.local_queue[0]
+        while queue:
+            context, topic, msg = queue[0]
             try:
                 rpc.cast(context, topic, msg)
             except (SystemExit, rpc.common.RPCException):
-                if self.policy == 'queue':
-                    LOG.warn("Failed to publish counters, queue them")
-                    queue_length = len(self.local_queue)
-                    if queue_length > self.max_queue_length > 0:
-                        count = queue_length - self.max_queue_length
-                        self.local_queue = self.local_queue[count:]
-                        LOG.warn("Publisher max queue length is exceeded, "
-                                 "dropping %d oldest counters",
-                                 count)
-                    break
-                elif self.policy == 'drop':
-                    counters = sum([len(m['args']['data'])
-                                    for _, _, m in self.local_queue])
-                    LOG.warn(
-                        "Failed to publish %d counters, dropping them",
-                        counters)
-                    self.local_queue = []
-                    break
-                else:
-                    # default, occur only if rabbit_max_retries > 0
-                    self.local_queue = []
-                    raise
+                counters = sum([len(m['args']['data']) for _, _, m in queue])
+                if policy == 'queue':
+                    LOG.warn("Failed to publish %s counters, queue them",
+                             counters)
+                    return queue
+                elif policy == 'drop':
+                    LOG.warn("Failed to publish %d counters, dropping them",
+                             counters)
+                    return []
+                # default, occur only if rabbit_max_retries > 0
+                raise
             else:
-                self.local_queue.pop(0)
+                queue.pop(0)
+        return []
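The 'queue' / 'drop' / default handling that the new _process_queue returns to flush() can be exercised in isolation. The names process_queue, PublishError and failing_cast below are stand-ins made up for this sketch; rpc.cast and rpc.common.RPCException play those roles in the real code.

class PublishError(Exception):
    pass


def process_queue(queue, policy, cast):
    # Stand-in for RPCPublisher._process_queue: returns what is left unsent.
    while queue:
        try:
            cast(queue[0])
        except PublishError:
            if policy == 'queue':
                return queue     # caller re-queues the unsent messages
            elif policy == 'drop':
                return []        # unsent messages are discarded
            raise                # default policy: propagate the failure
        else:
            queue.pop(0)
    return []


def failing_cast(msg):
    raise PublishError(msg)


print(process_queue(['a', 'b'], 'queue', failing_cast))   # ['a', 'b']
print(process_queue(['a', 'b'], 'drop', failing_cast))    # []
# process_queue(['a', 'b'], 'default', failing_cast) would re-raise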
@@ -19,6 +19,7 @@
 """Tests for ceilometer/publish.py
 """
 
+import eventlet
 import datetime
 from oslo.config import cfg
 
@@ -265,6 +266,32 @@ class TestPublish(base.TestCase):
         self.assertIn(
             cfg.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
 
+    def test_published_concurrency(self):
+        """This test the concurrent access to the local queue
+        of the rpc publisher
+        """
+
+        def faux_cast_go(context, topic, msg):
+            self.published.append((topic, msg))
+
+        def faux_cast_wait(context, topic, msg):
+            self.stubs.Set(oslo_rpc, 'cast', faux_cast_go)
+            eventlet.sleep(1)
+            self.published.append((topic, msg))
+
+        self.stubs.Set(oslo_rpc, 'cast', faux_cast_wait)
+
+        publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
+        job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
+        job2 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
+
+        job1.wait()
+        job2.wait()
+
+        self.assertEqual(publisher.policy, 'default')
+        self.assertEqual(len(self.published), 2)
+        self.assertEqual(len(publisher.local_queue), 0)
+
     def test_published_with_no_policy(self):
         self.rpc_unreachable = True
         publisher = rpc.RPCPublisher(
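The new test leans on eventlet's cooperative scheduling: faux_cast_wait calls eventlet.sleep(), which yields so the second spawned publish runs against the same publisher while the first flush() is still in progress. A standalone sketch of that interleaving (worker is made up for this example and is not part of the test suite):

import eventlet


def worker(name):
    print('%s enters cast' % name)
    eventlet.sleep(0.1)   # cooperative yield, like faux_cast_wait in the test
    print('%s finishes cast' % name)


job1 = eventlet.spawn(worker, 'job1')
job2 = eventlet.spawn(worker, 'job2')
job1.wait()
job2.wait()
# Typical output shows both workers entering cast before either finishes,
# which is exactly the overlap the thread-safe flush() has to tolerate.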