Improves exception handling and logging
1) try to catch all possible exceptions during message acknowledging and dispatching to send ack/nack when we can to do it 2) improve logging in case of exceptions during message acknowledging and dispatching Depends-On: I2d230d49e5aff6ab4d84ab8c3d2834f85e3405eb Change-Id: I41a768c5624fa2212257ce20bf9a67d09de0c4ab
This commit is contained in:
parent
c743b332c6
commit
4df633db74
@ -150,7 +150,8 @@ class NotificationServer(NotificationServerBase):
|
|||||||
try:
|
try:
|
||||||
res = self.dispatcher.dispatch(message)
|
res = self.dispatcher.dispatch(message)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
LOG.exception(_LE('Exception during message handling.'))
|
||||||
|
res = notify_dispatcher.NotificationResult.REQUEUE
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if (res == notify_dispatcher.NotificationResult.REQUEUE and
|
if (res == notify_dispatcher.NotificationResult.REQUEUE and
|
||||||
@ -159,7 +160,7 @@ class NotificationServer(NotificationServerBase):
|
|||||||
else:
|
else:
|
||||||
message.acknowledge()
|
message.acknowledge()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
LOG.exception(_LE("Fail to ack/requeue message."))
|
||||||
|
|
||||||
|
|
||||||
class BatchNotificationServer(NotificationServerBase):
|
class BatchNotificationServer(NotificationServerBase):
|
||||||
@ -169,7 +170,7 @@ class BatchNotificationServer(NotificationServerBase):
|
|||||||
not_processed_messages = self.dispatcher.dispatch(incoming)
|
not_processed_messages = self.dispatcher.dispatch(incoming)
|
||||||
except Exception:
|
except Exception:
|
||||||
not_processed_messages = set(incoming)
|
not_processed_messages = set(incoming)
|
||||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
LOG.exception(_LE('Exception during messages handling.'))
|
||||||
for m in incoming:
|
for m in incoming:
|
||||||
try:
|
try:
|
||||||
if m in not_processed_messages and self._allow_requeue:
|
if m in not_processed_messages and self._allow_requeue:
|
||||||
@ -177,7 +178,7 @@ class BatchNotificationServer(NotificationServerBase):
|
|||||||
else:
|
else:
|
||||||
m.acknowledge()
|
m.acknowledge()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
LOG.exception(_LE("Fail to ack/requeue message."))
|
||||||
|
|
||||||
|
|
||||||
def get_notification_listener(transport, targets, endpoints,
|
def get_notification_listener(transport, targets, endpoints,
|
||||||
|
@ -122,31 +122,38 @@ class RPCServer(msg_server.MessageHandlingServer):
|
|||||||
|
|
||||||
def _process_incoming(self, incoming):
|
def _process_incoming(self, incoming):
|
||||||
message = incoming[0]
|
message = incoming[0]
|
||||||
message.acknowledge()
|
try:
|
||||||
|
message.acknowledge()
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_LE("Can not acknowledge message. Skip processing"))
|
||||||
|
return
|
||||||
|
|
||||||
|
failure = None
|
||||||
try:
|
try:
|
||||||
res = self.dispatcher.dispatch(message)
|
res = self.dispatcher.dispatch(message)
|
||||||
except rpc_dispatcher.ExpectedException as e:
|
except rpc_dispatcher.ExpectedException as e:
|
||||||
LOG.debug(u'Expected exception during message handling (%s)',
|
LOG.debug(u'Expected exception during message handling (%s)',
|
||||||
e.exc_info[1])
|
e.exc_info[1])
|
||||||
message.reply(failure=e.exc_info)
|
failure = e.exc_info
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# current sys.exc_info() content can be overriden
|
# current sys.exc_info() content can be overriden
|
||||||
# by another exception raise by a log handler during
|
# by another exception raise by a log handler during
|
||||||
# LOG.exception(). So keep a copy and delete it later.
|
# LOG.exception(). So keep a copy and delete it later.
|
||||||
exc_info = sys.exc_info()
|
failure = sys.exc_info()
|
||||||
try:
|
LOG.exception(_LE('Exception during handling message'))
|
||||||
LOG.exception(_LE('Exception during message handling: %s'), e)
|
|
||||||
message.reply(failure=exc_info)
|
try:
|
||||||
finally:
|
if failure is None:
|
||||||
|
message.reply(res)
|
||||||
|
else:
|
||||||
|
message.reply(failure=failure)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_LE("Can not send reply for message"))
|
||||||
|
finally:
|
||||||
# NOTE(dhellmann): Remove circular object reference
|
# NOTE(dhellmann): Remove circular object reference
|
||||||
# between the current stack frame and the traceback in
|
# between the current stack frame and the traceback in
|
||||||
# exc_info.
|
# exc_info.
|
||||||
del exc_info
|
del failure
|
||||||
else:
|
|
||||||
try:
|
|
||||||
message.reply(res)
|
|
||||||
except Exception:
|
|
||||||
LOG.Exception("Can not send reply for message %s", message)
|
|
||||||
|
|
||||||
|
|
||||||
def get_rpc_server(transport, target, endpoints,
|
def get_rpc_server(transport, target, endpoints,
|
||||||
|
Loading…
Reference in New Issue
Block a user