Use oslo.msg retry API in rpc publisher
This patch removes the ceilometer hack around rabbitmq configuration to use the oslo.messaging API to retry rpc connection. Closes bug #1244698 Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com> Change-Id: I162f2ce7ba7f6644d995dfeef8b0ef5e2727c226
This commit is contained in:
parent
94ebf00429
commit
b629e14111
@ -104,12 +104,13 @@ def get_rpc_server(transport, topic, endpoint):
|
|||||||
serializer=serializer)
|
serializer=serializer)
|
||||||
|
|
||||||
|
|
||||||
def get_rpc_client(transport, **kwargs):
|
def get_rpc_client(transport, retry=None, **kwargs):
|
||||||
"""Return a configured oslo.messaging RPCClient."""
|
"""Return a configured oslo.messaging RPCClient."""
|
||||||
target = oslo.messaging.Target(**kwargs)
|
target = oslo.messaging.Target(**kwargs)
|
||||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||||
return oslo.messaging.RPCClient(transport, target,
|
return oslo.messaging.RPCClient(transport, target,
|
||||||
serializer=serializer)
|
serializer=serializer,
|
||||||
|
retry=retry)
|
||||||
|
|
||||||
|
|
||||||
def get_notification_listener(transport, targets, endpoints,
|
def get_notification_listener(transport, targets, endpoints,
|
||||||
|
@ -23,7 +23,6 @@ import operator
|
|||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import oslo.messaging
|
import oslo.messaging
|
||||||
import oslo.messaging._drivers.common
|
|
||||||
import six
|
import six
|
||||||
import six.moves.urllib.parse as urlparse
|
import six.moves.urllib.parse as urlparse
|
||||||
|
|
||||||
@ -64,30 +63,6 @@ cfg.CONF.register_opts(METER_PUBLISH_NOTIFIER_OPTS,
|
|||||||
cfg.CONF.import_opt('host', 'ceilometer.service')
|
cfg.CONF.import_opt('host', 'ceilometer.service')
|
||||||
|
|
||||||
|
|
||||||
def oslo_messaging_is_rabbit():
|
|
||||||
kombu = ['ceilometer.openstack.common.rpc.impl_kombu',
|
|
||||||
'oslo.messaging._drivers.impl_rabbit:RabbitDriver'
|
|
||||||
'rabbit']
|
|
||||||
return cfg.CONF.rpc_backend in kombu or (
|
|
||||||
cfg.CONF.transport_url and
|
|
||||||
cfg.CONF.transport_url.startswith('rabbit://'))
|
|
||||||
|
|
||||||
|
|
||||||
def override_backend_retry_config(value):
|
|
||||||
"""Override the retry config option native to the configured rpc backend.
|
|
||||||
|
|
||||||
It is done if such a native config option exists.
|
|
||||||
:param value: the value to override
|
|
||||||
"""
|
|
||||||
# TODO(sileht): ultimately we should add to olso a more generic concept
|
|
||||||
# of retry config (i.e. not specific to an individual AMQP provider)
|
|
||||||
# see: https://bugs.launchpad.net/ceilometer/+bug/1244698
|
|
||||||
# and: https://bugs.launchpad.net/oslo.messaging/+bug/1282639
|
|
||||||
if oslo_messaging_is_rabbit():
|
|
||||||
if 'rabbit_max_retries' in cfg.CONF:
|
|
||||||
cfg.CONF.set_override('rabbit_max_retries', value)
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class MessagingPublisher(publisher.PublisherBase):
|
class MessagingPublisher(publisher.PublisherBase):
|
||||||
|
|
||||||
@ -105,17 +80,15 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
|
|
||||||
self.local_queue = []
|
self.local_queue = []
|
||||||
|
|
||||||
if self.policy in ['queue', 'drop']:
|
if self.policy in ['default', 'queue', 'drop']:
|
||||||
LOG.info(_('Publishing policy set to %s, '
|
|
||||||
'override backend retry config to 1') % self.policy)
|
|
||||||
override_backend_retry_config(1)
|
|
||||||
elif self.policy == 'default':
|
|
||||||
LOG.info(_('Publishing policy set to %s') % self.policy)
|
LOG.info(_('Publishing policy set to %s') % self.policy)
|
||||||
else:
|
else:
|
||||||
LOG.warn(_('Publishing policy is unknown (%s) force to default')
|
LOG.warn(_('Publishing policy is unknown (%s) force to default')
|
||||||
% self.policy)
|
% self.policy)
|
||||||
self.policy = 'default'
|
self.policy = 'default'
|
||||||
|
|
||||||
|
self.retry = 1 if self.policy in ['queue', 'drop'] else None
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, context, samples):
|
||||||
"""Publish samples on RPC.
|
"""Publish samples on RPC.
|
||||||
|
|
||||||
@ -169,21 +142,11 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
"dropping %d oldest samples") % count)
|
"dropping %d oldest samples") % count)
|
||||||
|
|
||||||
def _process_queue(self, queue, policy):
|
def _process_queue(self, queue, policy):
|
||||||
# NOTE(sileht):
|
|
||||||
# the behavior of rpc.cast call depends of rabbit_max_retries
|
|
||||||
# if rabbit_max_retries <= 0:
|
|
||||||
# it returns only if the msg has been sent on the amqp queue
|
|
||||||
# if rabbit_max_retries > 0:
|
|
||||||
# it raises an exception if rabbitmq is unreachable
|
|
||||||
#
|
|
||||||
# the default policy just respect the rabbitmq configuration
|
|
||||||
# nothing special is done if rabbit_max_retries <= 0
|
|
||||||
# and exception is reraised if rabbit_max_retries > 0
|
|
||||||
while queue:
|
while queue:
|
||||||
context, topic, meters = queue[0]
|
context, topic, meters = queue[0]
|
||||||
try:
|
try:
|
||||||
self._send(context, topic, meters)
|
self._send(context, topic, meters)
|
||||||
except oslo.messaging._drivers.common.RPCException:
|
except oslo.messaging.MessageDeliveryFailure:
|
||||||
samples = sum([len(m) for __, __, m in queue])
|
samples = sum([len(m) for __, __, m in queue])
|
||||||
if policy == 'queue':
|
if policy == 'queue':
|
||||||
LOG.warn(_("Failed to publish %d samples, queue them"),
|
LOG.warn(_("Failed to publish %d samples, queue them"),
|
||||||
@ -213,7 +176,7 @@ class RPCPublisher(MessagingPublisher):
|
|||||||
|
|
||||||
self.rpc_client = messaging.get_rpc_client(
|
self.rpc_client = messaging.get_rpc_client(
|
||||||
messaging.get_transport(),
|
messaging.get_transport(),
|
||||||
version='1.0'
|
retry=self.retry, version='1.0'
|
||||||
)
|
)
|
||||||
|
|
||||||
def _send(self, context, topic, meters):
|
def _send(self, context, topic, meters):
|
||||||
@ -228,7 +191,8 @@ class NotifierPublisher(MessagingPublisher):
|
|||||||
messaging.get_transport(),
|
messaging.get_transport(),
|
||||||
driver=cfg.CONF.publisher_notifier.metering_driver,
|
driver=cfg.CONF.publisher_notifier.metering_driver,
|
||||||
publisher_id='metering.publisher.%s' % cfg.CONF.host,
|
publisher_id='metering.publisher.%s' % cfg.CONF.host,
|
||||||
topic=cfg.CONF.publisher_notifier.metering_topic
|
topic=cfg.CONF.publisher_notifier.metering_topic,
|
||||||
|
retry=self.retry
|
||||||
)
|
)
|
||||||
|
|
||||||
def _send(self, context, event_type, meters):
|
def _send(self, context, event_type, meters):
|
||||||
|
@ -23,7 +23,6 @@ import eventlet
|
|||||||
import mock
|
import mock
|
||||||
from oslo.config import fixture as fixture_config
|
from oslo.config import fixture as fixture_config
|
||||||
import oslo.messaging
|
import oslo.messaging
|
||||||
import oslo.messaging._drivers.common
|
|
||||||
from oslo.utils import netutils
|
from oslo.utils import netutils
|
||||||
import testscenarios.testcase
|
import testscenarios.testcase
|
||||||
|
|
||||||
@ -209,12 +208,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
def test_published_with_no_policy(self, mylog):
|
def test_published_with_no_policy(self, mylog):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://' % self.protocol))
|
netutils.urlsplit('%s://' % self.protocol))
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
oslo.messaging._drivers.common.RPCException,
|
oslo.messaging.MessageDeliveryFailure,
|
||||||
publisher.publish_samples,
|
publisher.publish_samples,
|
||||||
mock.MagicMock(), self.test_data)
|
mock.MagicMock(), self.test_data)
|
||||||
self.assertTrue(mylog.info.called)
|
self.assertTrue(mylog.info.called)
|
||||||
@ -228,11 +226,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
def test_published_with_policy_block(self, mylog):
|
def test_published_with_policy_block(self, mylog):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=default' % self.protocol))
|
netutils.urlsplit('%s://?policy=default' % self.protocol))
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
oslo.messaging._drivers.common.RPCException,
|
oslo.messaging.MessageDeliveryFailure,
|
||||||
publisher.publish_samples,
|
publisher.publish_samples,
|
||||||
mock.MagicMock(), self.test_data)
|
mock.MagicMock(), self.test_data)
|
||||||
self.assertTrue(mylog.info.called)
|
self.assertTrue(mylog.info.called)
|
||||||
@ -245,11 +243,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
def test_published_with_policy_incorrect(self, mylog):
|
def test_published_with_policy_incorrect(self, mylog):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
|
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
oslo.messaging._drivers.common.RPCException,
|
oslo.messaging.MessageDeliveryFailure,
|
||||||
publisher.publish_samples,
|
publisher.publish_samples,
|
||||||
mock.MagicMock(), self.test_data)
|
mock.MagicMock(), self.test_data)
|
||||||
self.assertTrue(mylog.warn.called)
|
self.assertTrue(mylog.warn.called)
|
||||||
@ -262,7 +260,7 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
def test_published_with_policy_drop_and_rpc_down(self):
|
def test_published_with_policy_drop_and_rpc_down(self):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=drop' % self.protocol))
|
netutils.urlsplit('%s://?policy=drop' % self.protocol))
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
publisher.publish_samples(mock.MagicMock(),
|
publisher.publish_samples(mock.MagicMock(),
|
||||||
@ -275,7 +273,7 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
def test_published_with_policy_queue_and_rpc_down(self):
|
def test_published_with_policy_queue_and_rpc_down(self):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
|
|
||||||
@ -291,7 +289,7 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
||||||
|
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
publisher.publish_samples(mock.MagicMock(),
|
publisher.publish_samples(mock.MagicMock(),
|
||||||
@ -315,7 +313,7 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
publisher = self.publisher_cls(netutils.urlsplit(
|
publisher = self.publisher_cls(netutils.urlsplit(
|
||||||
'%s://?policy=queue&max_queue_length=3' % self.protocol))
|
'%s://?policy=queue&max_queue_length=3' % self.protocol))
|
||||||
|
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
for i in range(0, 5):
|
for i in range(0, 5):
|
||||||
@ -342,7 +340,7 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
|
|||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
netutils.urlsplit('%s://?policy=queue' % self.protocol))
|
||||||
|
|
||||||
side_effect = oslo.messaging._drivers.common.RPCException()
|
side_effect = oslo.messaging.MessageDeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
for i in range(0, 2000):
|
for i in range(0, 2000):
|
||||||
|
Loading…
Reference in New Issue
Block a user