Update RPC code from OSLO

Fixes bug 1172922

Change-Id: Ieb5f58fe3d2c879bc71f4241288e48e35ab54366
This commit is contained in:
Gary Kotton 2013-05-01 13:47:59 +00:00
parent 5f9721756b
commit 8a381447d0
3 changed files with 14 additions and 13 deletions

View File

@ -276,7 +276,7 @@ def _safe_log(log_func, msg, msg_data):
for elem in arg[:-1]: for elem in arg[:-1]:
d = d[elem] d = d[elem]
d[arg[-1]] = '<SANITIZED>' d[arg[-1]] = '<SANITIZED>'
except KeyError, e: except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg, {'item': arg,
'err': e}) 'err': e})
@ -419,7 +419,7 @@ class ClientException(Exception):
def catch_client_exception(exceptions, func, *args, **kwargs): def catch_client_exception(exceptions, func, *args, **kwargs):
try: try:
return func(*args, **kwargs) return func(*args, **kwargs)
except Exception, e: except Exception as e:
if type(e) in exceptions: if type(e) in exceptions:
raise ClientException() raise ClientException()
else: else:

View File

@ -176,7 +176,7 @@ class ConsumerBase(object):
"""Cancel the consuming from the queue, if it has started""" """Cancel the consuming from the queue, if it has started"""
try: try:
self.queue.cancel(self.tag) self.queue.cancel(self.tag)
except KeyError, e: except KeyError as e:
# NOTE(comstud): Kludge to get around a amqplib bug # NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag: if str(e) != "u'%s'" % self.tag:
raise raise
@ -520,7 +520,7 @@ class Connection(object):
return return
except (IOError, self.connection_errors) as e: except (IOError, self.connection_errors) as e:
pass pass
except Exception, e: except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport # to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for # connection_errors in the case of a timeout waiting for
@ -561,10 +561,10 @@ class Connection(object):
while True: while True:
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e: except (self.connection_errors, socket.timeout, IOError) as e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
except Exception, e: except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport # to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for # connection_errors in the case of a timeout waiting for

View File

@ -331,22 +331,23 @@ class Connection(object):
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues""" """Handles reconnecting and re-establishing sessions and queues"""
attempt = 0
delay = 1
while True:
# Close the session if necessary
if self.connection.opened(): if self.connection.opened():
try: try:
self.connection.close() self.connection.close()
except qpid_exceptions.ConnectionError: except qpid_exceptions.ConnectionError:
pass pass
attempt = 0
delay = 1
while True:
broker = self.brokers[attempt % len(self.brokers)] broker = self.brokers[attempt % len(self.brokers)]
attempt += 1 attempt += 1
try: try:
self.connection_create(broker) self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid_exceptions.ConnectionError, e: except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. " msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict "Sleeping %(delay)s seconds") % msg_dict