Merge pull request #83 from ramielrowe/message_acking_2

Only ack message if successfully processed
This commit is contained in:
Andrew Melton 2013-04-26 10:17:25 -07:00
commit 76b5953cde
2 changed files with 26 additions and 3 deletions

View File

@ -96,7 +96,6 @@ class NovaConsumerTestCase(unittest.TestCase):
self.mox.VerifyAll()
def test_create_queue_with_queue_args(self):
self.mox.StubOutClassWithMocks(kombu, 'Queue')
exchange = self.mox.CreateMockAnything()
@ -127,14 +126,38 @@ class NovaConsumerTestCase(unittest.TestCase):
args = (routing_key, body_dict)
views.process_raw_data(deployment, args, json.dumps(args))\
.AndReturn(raw)
message.ack()
self.mox.StubOutWithMock(consumer, '_check_memory',
use_mock_anything=True)
use_mock_anything=True)
consumer._check_memory()
self.mox.ReplayAll()
consumer._process(message)
self.assertEqual(consumer.processed, 1)
self.mox.VerifyAll()
def test_process_no_raw_dont_ack(self):
deployment = self.mox.CreateMockAnything()
raw = self.mox.CreateMockAnything()
message = self.mox.CreateMockAnything()
consumer = worker.NovaConsumer('test', None, deployment, True, {})
routing_key = 'monitor.info'
message.delivery_info = {'routing_key': routing_key}
body_dict = {u'key': u'value'}
message.body = json.dumps(body_dict)
self.mox.StubOutWithMock(views, 'process_raw_data',
use_mock_anything=True)
args = (routing_key, body_dict)
views.process_raw_data(deployment, args, json.dumps(args))\
.AndReturn(None)
self.mox.StubOutWithMock(consumer, '_check_memory',
use_mock_anything=True)
consumer._check_memory()
self.mox.ReplayAll()
consumer._process(message)
self.assertEqual(consumer.processed, 0)
self.mox.VerifyAll()
def test_run(self):
config = {
'name': 'east_coast.prod.global',

View File

@ -89,6 +89,7 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
raw = views.process_raw_data(self.deployment, args, asJson)
if raw:
self.processed += 1
message.ack()
self._check_memory()
@ -125,7 +126,6 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
self._process(message)
except Exception, e:
LOG.exception("Problem %s" % e)
message.ack()
def continue_running():