Implement POP in v1.1 API
This patch implements the pop functionality for v1.1 of the Marconi API. The POP functionality is implemented as an optional parameter in the delete messages using id params endpoint. This includes the following validations, 1. pop & id params cannot be used together in the request. 2. 0 < pop value <= max_messages_per_claim_or_pop docImpact Change-Id: I63b22e57ed5ab398b81cde2d0284767e9704ec11 Implements: blueprint api-v1.1-pop-operation
This commit is contained in:
parent
ddabc98a99
commit
3a59f52d55
@ -168,7 +168,7 @@ class ResponseSchema(api.Api):
|
||||
"type": "array",
|
||||
"items": message,
|
||||
"minItems": 1,
|
||||
"maxItems": self.limits.max_messages_per_claim
|
||||
"maxItems": self.limits.max_messages_per_claim_or_pop
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -212,7 +212,7 @@ class ResponseSchema(api.Api):
|
||||
"type": "array",
|
||||
"items": message,
|
||||
"minItems": 0,
|
||||
"maxItems": self.limits.max_messages_per_claim
|
||||
"maxItems": self.limits.max_messages_per_claim_or_pop
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -330,6 +330,17 @@ class Message(ControllerBase):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def pop(self, queue, limit, project=None):
|
||||
"""Base method for popping messages.
|
||||
|
||||
:param queue: Name of the queue to pop
|
||||
message from.
|
||||
:param limit: Number of messages to pop.
|
||||
:param project: Project id
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Claim(ControllerBase):
|
||||
|
@ -22,6 +22,7 @@ Field Mappings:
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import itertools
|
||||
import time
|
||||
|
||||
from bson import objectid
|
||||
@ -406,7 +407,7 @@ class MessageController(storage.Message):
|
||||
yield iter([])
|
||||
|
||||
messages = self._list(queue_name, project=project, marker=marker,
|
||||
client_uuid=client_uuid, echo=echo,
|
||||
client_uuid=client_uuid, echo=echo,
|
||||
include_claimed=include_claimed, limit=limit)
|
||||
|
||||
marker_id = {}
|
||||
@ -735,6 +736,33 @@ class MessageController(storage.Message):
|
||||
collection = self._collection(queue_name, project)
|
||||
collection.remove(query, w=0)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def pop(self, queue_name, limit, project=None):
|
||||
query = {
|
||||
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
|
||||
}
|
||||
|
||||
# Only include messages that are not part of
|
||||
# any claim, or are part of an expired claim.
|
||||
now = timeutils.utcnow_ts()
|
||||
query['c.e'] = {'$lte': now}
|
||||
|
||||
collection = self._collection(queue_name, project)
|
||||
fields = {'_id': 1, 't': 1, 'b': 1}
|
||||
|
||||
messages = (collection.find_and_modify(query,
|
||||
fields=fields,
|
||||
remove=True)
|
||||
for _ in range(limit))
|
||||
|
||||
messages = itertools.ifilter(None, messages)
|
||||
|
||||
final_messages = [_basic_message(message, now)
|
||||
for message in messages]
|
||||
|
||||
return final_messages
|
||||
|
||||
|
||||
def _basic_message(msg, now):
|
||||
oid = msg['_id']
|
||||
|
@ -254,7 +254,7 @@ class MessageController(RoutingController):
|
||||
message_id=message_id, claim=claim)
|
||||
return None
|
||||
|
||||
def bulk_delete(self, queue, message_ids, project=None):
|
||||
def bulk_delete(self, queue, message_ids=None, project=None):
|
||||
target = self._lookup(queue, project)
|
||||
if target:
|
||||
control = target.message_controller
|
||||
@ -262,6 +262,13 @@ class MessageController(RoutingController):
|
||||
message_ids=message_ids)
|
||||
return None
|
||||
|
||||
def pop(self, queue, limit, project=None):
|
||||
target = self._lookup(queue, project)
|
||||
if target:
|
||||
control = target.message_controller
|
||||
return control.pop(queue, project=project, limit=limit)
|
||||
return None
|
||||
|
||||
def bulk_get(self, queue, message_ids, project=None):
|
||||
target = self._lookup(queue, project)
|
||||
if target:
|
||||
|
@ -156,7 +156,6 @@ class MessageController(storage.Message):
|
||||
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
with self.driver.trans() as trans:
|
||||
sel = sa.sql.select([tables.Messages.c.id,
|
||||
tables.Messages.c.body,
|
||||
@ -294,3 +293,49 @@ class MessageController(storage.Message):
|
||||
tables.Messages.c.qid == qid]
|
||||
|
||||
trans.execute(statement.where(sa.and_(*and_stmt)))
|
||||
|
||||
def pop(self, queue_name, limit, project=None):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
with self.driver.trans() as trans:
|
||||
sel = sa.sql.select([tables.Messages.c.id,
|
||||
tables.Messages.c.body,
|
||||
tables.Messages.c.ttl,
|
||||
tables.Messages.c.created])
|
||||
|
||||
j = sa.join(tables.Messages, tables.Queues,
|
||||
tables.Messages.c.qid == tables.Queues.c.id)
|
||||
|
||||
sel = sel.select_from(j)
|
||||
and_clause = [tables.Queues.c.name == queue_name,
|
||||
tables.Queues.c.project == project]
|
||||
|
||||
and_clause.append(tables.Messages.c.cid == (None))
|
||||
|
||||
sel = sel.where(sa.and_(*and_clause))
|
||||
sel = sel.limit(limit)
|
||||
|
||||
records = trans.execute(sel)
|
||||
now = timeutils.utcnow_ts()
|
||||
messages = []
|
||||
message_ids = []
|
||||
for id, body, ttl, created in records:
|
||||
messages.append({
|
||||
'id': utils.msgid_encode(id),
|
||||
'ttl': ttl,
|
||||
'age': now - calendar.timegm(created.timetuple()),
|
||||
'body': utils.json_decode(body),
|
||||
})
|
||||
message_ids.append(id)
|
||||
|
||||
statement = tables.Messages.delete()
|
||||
|
||||
qid = utils.get_qid(self.driver, queue_name, project)
|
||||
|
||||
and_stmt = [tables.Messages.c.id.in_(message_ids),
|
||||
tables.Messages.c.qid == qid]
|
||||
|
||||
trans.execute(statement.where(sa.and_(*and_stmt)))
|
||||
|
||||
return messages
|
||||
|
@ -23,7 +23,6 @@ MIN_MESSAGE_TTL = 60
|
||||
MIN_CLAIM_TTL = 60
|
||||
MIN_CLAIM_GRACE = 60
|
||||
|
||||
|
||||
_TRANSPORT_LIMITS_OPTIONS = (
|
||||
cfg.IntOpt('max_queues_per_page', default=20,
|
||||
deprecated_name='queue_paging_uplimit',
|
||||
@ -32,7 +31,10 @@ _TRANSPORT_LIMITS_OPTIONS = (
|
||||
deprecated_name='message_paging_uplimit',
|
||||
deprecated_group='limits:transport'),
|
||||
|
||||
cfg.IntOpt('max_messages_per_claim', default=20),
|
||||
cfg.IntOpt('max_messages_per_claim_or_pop', default=20,
|
||||
deprecated_name='max_messages_per_claim',
|
||||
help='The maximum number of messages that can be claimed (OR) '
|
||||
'popped in a single request'),
|
||||
|
||||
cfg.IntOpt('max_queue_metadata', default=64 * 1024,
|
||||
deprecated_name='metadata_size_uplimit',
|
||||
@ -184,6 +186,43 @@ class Validator(object):
|
||||
raise ValidationFailed(
|
||||
msg, self._limits_conf.max_messages_per_page)
|
||||
|
||||
def message_deletion(self, ids=None, pop=None):
|
||||
"""Restrictions involving deletion of messages.
|
||||
|
||||
:param ids: message ids passed in by the delete request
|
||||
:param pop: count of messages to be POPped
|
||||
:raises: ValidationFailed if,
|
||||
pop AND id params are present together
|
||||
neither pop or id params are present
|
||||
message count to be popped > maximum allowed
|
||||
"""
|
||||
|
||||
if pop is not None and ids is not None:
|
||||
msg = _(u'pop and id params cannot be present together in the '
|
||||
'delete request.')
|
||||
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
if pop is None and ids is None:
|
||||
msg = _(u'The request should have either "ids" or "pop" '
|
||||
'parameter in the request, to be able to delete.')
|
||||
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
pop_uplimit = self._limits_conf.max_messages_per_claim_or_pop
|
||||
if pop is not None and not (0 < pop <= pop_uplimit):
|
||||
msg = _(u'Pop value must be at least 1 and may not '
|
||||
'be greater than {0}.')
|
||||
|
||||
raise ValidationFailed(msg, pop_uplimit)
|
||||
|
||||
delete_uplimit = self._limits_conf.max_messages_per_page
|
||||
if ids is not None and not (0 < len(ids) <= delete_uplimit):
|
||||
msg = _(u'ids parameter should have at least 1 and not '
|
||||
'greater than {0} values.')
|
||||
|
||||
raise ValidationFailed(msg, delete_uplimit)
|
||||
|
||||
def claim_creation(self, metadata, limit=None):
|
||||
"""Restrictions on the claim parameters upon creation.
|
||||
|
||||
@ -195,13 +234,13 @@ class Validator(object):
|
||||
|
||||
self.claim_updating(metadata)
|
||||
|
||||
uplimit = self._limits_conf.max_messages_per_claim
|
||||
uplimit = self._limits_conf.max_messages_per_claim_or_pop
|
||||
if limit is not None and not (0 < limit <= uplimit):
|
||||
msg = _(u'Limit must be at least 1 and may not '
|
||||
'be greater than {0}.')
|
||||
|
||||
raise ValidationFailed(
|
||||
msg, self._limits_conf.max_messages_per_claim)
|
||||
msg, self._limits_conf.max_messages_per_claim_or_pop)
|
||||
|
||||
grace = metadata['grace']
|
||||
|
||||
|
@ -230,27 +230,65 @@ class CollectionResource(object):
|
||||
# status defaults to 200
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name):
|
||||
# NOTE(zyuan): Attempt to delete the whole message collection
|
||||
# (without an "ids" parameter) is not allowed
|
||||
ids = req.get_param_as_list('ids', required=True)
|
||||
LOG.debug(u'Messages collection DELETE - queue: %(queue)s, '
|
||||
u'project: %(project)s',
|
||||
{'queue': queue_name, 'project': project_id})
|
||||
|
||||
ids = req.get_param_as_list('ids')
|
||||
pop_limit = req.get_param_as_int('pop')
|
||||
try:
|
||||
self._validate.message_listing(limit=len(ids))
|
||||
self.message_controller.bulk_delete(
|
||||
queue_name,
|
||||
message_ids=ids,
|
||||
project=project_id)
|
||||
self._validate.message_deletion(ids, pop_limit)
|
||||
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
if ids:
|
||||
resp.status = self._delete_messages_by_id(queue_name, ids,
|
||||
project_id)
|
||||
|
||||
elif pop_limit:
|
||||
resp.status, resp.body = self._pop_messages(queue_name,
|
||||
project_id,
|
||||
pop_limit)
|
||||
|
||||
def _delete_messages_by_id(self, queue_name, ids, project_id):
|
||||
try:
|
||||
self.message_controller.bulk_delete(
|
||||
queue_name,
|
||||
message_ids=ids,
|
||||
project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Messages could not be deleted.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
return falcon.HTTP_204
|
||||
|
||||
def _pop_messages(self, queue_name, project_id, pop_limit):
|
||||
try:
|
||||
LOG.debug(u'POP messages - queue: %(queue)s, '
|
||||
u'project: %(project)s',
|
||||
{'queue': queue_name, 'project': project_id})
|
||||
|
||||
messages = self.message_controller.pop(
|
||||
queue_name,
|
||||
project=project_id,
|
||||
limit=pop_limit)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Messages could not be popped.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
# Prepare response
|
||||
if not messages:
|
||||
messages = []
|
||||
body = {'messages': messages}
|
||||
body = utils.to_json(body)
|
||||
|
||||
return falcon.HTTP_200, body
|
||||
|
||||
|
||||
class ItemResource(object):
|
||||
@ -290,6 +328,7 @@ class ItemResource(object):
|
||||
# status defaults to 200
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name, message_id):
|
||||
|
||||
LOG.debug(u'Messages item DELETE - message: %(message)s, '
|
||||
u'queue: %(queue)s, project: %(project)s',
|
||||
{'message': message_id,
|
||||
|
@ -100,6 +100,9 @@ class MessageController(storage.Message):
|
||||
def post(self, queue, messages, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def pop(self, queue, pop_limit, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def delete(self, queue, message_id, project=None, claim=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
@ -121,7 +121,7 @@ class FunctionalTestBase(testing.TestBase):
|
||||
:param result_json: json response returned for Queue Stats.
|
||||
:param claimed: expected number of claimed messages.
|
||||
"""
|
||||
total = self.limits.max_messages_per_claim
|
||||
total = self.limits.max_messages_per_claim_or_pop
|
||||
free = total - claimed
|
||||
|
||||
self.assertEqual(result_json['messages']['claimed'], claimed)
|
||||
|
@ -133,7 +133,7 @@ class V1_1Base(TestBase):
|
||||
def _empty_message_list(self, body):
|
||||
self.assertEqual(jsonutils.loads(body[0])['messages'], [])
|
||||
|
||||
def simulate_request(self, path, **kwargs):
|
||||
def simulate_request(self, path, project_id=None, **kwargs):
|
||||
"""Simulate a request.
|
||||
|
||||
Simulates a WSGI request to the API for testing.
|
||||
@ -143,6 +143,11 @@ class V1_1Base(TestBase):
|
||||
|
||||
:returns: standard WSGI iterable response
|
||||
"""
|
||||
if project_id is not None:
|
||||
headers = dict(kwargs['headers']) if 'headers' in kwargs else {}
|
||||
headers['X-Project-ID'] = project_id
|
||||
kwargs['headers'] = headers
|
||||
|
||||
return self.app(ftest.create_environ(path=path, **kwargs),
|
||||
self.srmock)
|
||||
|
||||
|
@ -40,8 +40,8 @@ port = 8888
|
||||
# Maximum number of messages per page when listing messages.
|
||||
;max_messages_per_page = 20
|
||||
|
||||
# Maximum number of messages that can be claimed at a time.
|
||||
;max_messages_per_claim = 20
|
||||
# Maximum number of messages that can be claimed or popped at a time.
|
||||
;max_messages_per_claim_or_pop = 20
|
||||
|
||||
# Expiration limits; the minimal values are all 60 (seconds)
|
||||
;max_message_ttl = 1209600
|
||||
|
@ -43,6 +43,7 @@ class TestClaims(base.V1FunctionalTestBase):
|
||||
|
||||
# Post Messages
|
||||
url = self.queue_url + '/messages'
|
||||
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
|
||||
@ -52,7 +53,8 @@ class TestClaims(base.V1FunctionalTestBase):
|
||||
@ddt.data({}, dict(limit=2))
|
||||
def test_claim_messages(self, params):
|
||||
"""Claim messages."""
|
||||
message_count = params.get('limit', self.limits.max_messages_per_claim)
|
||||
message_count = params.get('limit',
|
||||
self.limits.max_messages_per_claim_or_pop)
|
||||
|
||||
doc = {"ttl": 300, "grace": 100}
|
||||
|
||||
@ -90,7 +92,7 @@ class TestClaims(base.V1FunctionalTestBase):
|
||||
|
||||
Marconi allows a maximum of 20 messages per claim by default.
|
||||
"""
|
||||
params = {"limit": self.limits.max_messages_per_claim + 1}
|
||||
params = {"limit": self.limits.max_messages_per_claim_or_pop + 1}
|
||||
doc = {"ttl": 300, "grace": 100}
|
||||
|
||||
result = self.client.post(params=params, data=doc)
|
||||
|
@ -380,7 +380,7 @@ class TestQueueMisc(base.V1FunctionalTestBase):
|
||||
|
||||
# Post Messages to the test queue
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=self.limits.max_messages_per_claim)
|
||||
messagecount=self.limits.max_messages_per_claim_or_pop)
|
||||
|
||||
message_url = self.queue_url + '/messages'
|
||||
result = self.client.post(message_url, data=doc)
|
||||
|
@ -55,7 +55,8 @@ class TestClaims(base.V1_1FunctionalTestBase):
|
||||
@ddt.data({}, {'limit': 2})
|
||||
def test_claim_messages(self, params):
|
||||
"""Claim messages."""
|
||||
message_count = params.get('limit', self.limits.max_messages_per_claim)
|
||||
message_count = params.get('limit',
|
||||
self.limits.max_messages_per_claim_or_pop)
|
||||
|
||||
doc = {"ttl": 300, "grace": 100}
|
||||
|
||||
@ -90,7 +91,7 @@ class TestClaims(base.V1_1FunctionalTestBase):
|
||||
|
||||
Marconi allows a maximum of 20 messages per claim by default.
|
||||
"""
|
||||
params = {"limit": self.limits.max_messages_per_claim + 1}
|
||||
params = {"limit": self.limits.max_messages_per_claim_or_pop + 1}
|
||||
doc = {"ttl": 300, "grace": 100}
|
||||
|
||||
result = self.client.post(params=params, data=doc)
|
||||
|
@ -43,74 +43,13 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
self.client.headers = self.headers
|
||||
|
||||
self.client.put(self.queue_url) # Create the queue
|
||||
self.message_url = self.queue_url+'/messages'
|
||||
self.message_url = self.queue_url + '/messages'
|
||||
self.client.set_base_url(self.message_url)
|
||||
|
||||
def tearDown(self):
|
||||
self.client.delete(self.queue_url) # Remove the queue
|
||||
super(TestMessages, self).tearDown()
|
||||
|
||||
# TODO(abettadapur) Not sure if return format is right
|
||||
def test_message_single_pop(self):
|
||||
"""Pop a message."""
|
||||
# Setup
|
||||
self.skipTest("Not supported")
|
||||
doc = helpers.create_message_body(messagecount=1)
|
||||
result = self.client.post(data=doc)
|
||||
href = result.json()['links'][0]['href']
|
||||
index = href.find('/')
|
||||
id = href[index+1:]
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
# Pop a message, compare the HREFs. They should be the same
|
||||
url = self.message_url+'?pop=1'
|
||||
result = self.client.delete(url)
|
||||
claimid = result.json()['messages'][0]['id']
|
||||
self.assertEqual(claimid, id)
|
||||
# Make sure there are no messages left in the queue
|
||||
result = self.client.get(self.message_url)
|
||||
self.assertEqual(result.status_code, 204)
|
||||
|
||||
# TODO(abettadapur) Not sure if return format is right
|
||||
def test_message_bulk_pop(self):
|
||||
"""Pop multiple messages."""
|
||||
# Setup
|
||||
self.skipTest("Not supported")
|
||||
doc = helpers.create_message_body(messagecount=10)
|
||||
result = self.client.post(data=doc)
|
||||
links = result.json()["links"]
|
||||
|
||||
# Gather inserted ids
|
||||
ids = []
|
||||
for item in links:
|
||||
href = item['href']
|
||||
index = href.find('/')
|
||||
ids.append(href[index:])
|
||||
|
||||
# Pop the 10 messages
|
||||
url = self.message_url+'?pop=10'
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
claims = result.json()
|
||||
|
||||
# compare the HREFS
|
||||
match = True
|
||||
for i in range(0, 10):
|
||||
if ids[i] != claims['messages'][i]['id']:
|
||||
match = False
|
||||
break
|
||||
|
||||
self.assertEqual(match, True)
|
||||
# Make sure there are no messages left in the queue
|
||||
result = self.client.get(self.message_url)
|
||||
self.assertEqual(result.status_code, 204)
|
||||
|
||||
def test_message_pop_too_high(self):
|
||||
self.skipTest("Not supported")
|
||||
url = self.message_url+'?pop=21'
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def _post_large_bulk_insert(self, offset):
|
||||
"""Insert just under than max allowed messages."""
|
||||
|
||||
@ -301,6 +240,117 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
test_message_partial_delete.tags = ['negative']
|
||||
|
||||
@ddt.data(5, 1)
|
||||
def test_messages_pop(self, limit=5):
|
||||
"""Pop messages from a queue."""
|
||||
doc = helpers.create_message_body(messagecount=limit)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
# Pop messages
|
||||
url = self.message_url + '?pop=' + str(limit)
|
||||
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
params = {'echo': True}
|
||||
|
||||
result = self.client.get(self.message_url, params=params)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
messages = result.json()['messages']
|
||||
self.assertEqual(messages, [])
|
||||
|
||||
test_messages_pop.tags = ['smoke', 'positive']
|
||||
|
||||
@ddt.data(10000000, 0, -1)
|
||||
def test_messages_pop_invalid(self, limit):
|
||||
"""Pop messages from a queue."""
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
# Pop messages
|
||||
url = self.message_url + '?pop=' + str(limit)
|
||||
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
params = {'echo': True}
|
||||
result = self.client.get(self.message_url, params=params)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
messages = result.json()['messages']
|
||||
self.assertNotEqual(messages, [])
|
||||
|
||||
test_messages_pop_invalid.tags = ['smoke', 'negative']
|
||||
|
||||
def test_messages_delete_pop_and_id(self):
|
||||
"""Delete messages with pop & id params in the request."""
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=1)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
location = result.headers['Location']
|
||||
|
||||
# Pop messages
|
||||
url = self.cfg.marconi.url + location + '&pop=1'
|
||||
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
params = {'echo': True}
|
||||
|
||||
result = self.client.get(self.message_url, params=params)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
messages = result.json()['messages']
|
||||
self.assertNotEqual(messages, [])
|
||||
|
||||
test_messages_delete_pop_and_id.tags = ['smoke', 'negative']
|
||||
|
||||
def test_messages_pop_empty_queue(self):
|
||||
"""Pop messages from an empty queue."""
|
||||
url = self.message_url + '?pop=2'
|
||||
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
messages = result.json()['messages']
|
||||
self.assertEqual(messages, [])
|
||||
|
||||
test_messages_pop_empty_queue.tags = ['smoke', 'positive']
|
||||
|
||||
def test_messages_pop_one(self):
|
||||
"""Pop single messages from a queue."""
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
# Pop Single Message
|
||||
url = self.message_url + '?pop=1'
|
||||
|
||||
result = self.client.delete(url)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
# Get messages from the queue & verify message count
|
||||
params = {'echo': True, 'limit': self.limits.max_messages_per_page}
|
||||
|
||||
result = self.client.get(self.message_url, params=params)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
expected_msg_count = self.limits.max_messages_per_page - 1
|
||||
actual_msg_count = len(result.json()['messages'])
|
||||
self.assertEqual(expected_msg_count, actual_msg_count)
|
||||
|
||||
test_messages_pop_one.tags = ['smoke', 'positive']
|
||||
|
||||
def test_message_partial_get(self):
|
||||
"""Get Messages will be partially successful."""
|
||||
doc = helpers.create_message_body(messagecount=3)
|
||||
|
@ -308,7 +308,7 @@ class TestQueueMisc(base.V1_1FunctionalTestBase):
|
||||
|
||||
# Post Messages to the test queue
|
||||
doc = helpers.create_message_body(
|
||||
messagecount=self.limits.max_messages_per_claim)
|
||||
messagecount=self.limits.max_messages_per_claim_or_pop)
|
||||
|
||||
message_url = self.queue_url + '/messages'
|
||||
result = self.client.post(message_url, data=doc)
|
||||
|
@ -334,6 +334,38 @@ class MongodbMessageTests(base.MessageControllerTest):
|
||||
|
||||
self.assertEqual(actual_ids, expected_ids)
|
||||
|
||||
def test_pop_messages(self):
|
||||
queue_name = 'pop-queue-test'
|
||||
self.queue_controller.create(queue_name, self.project)
|
||||
messages = [
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'd378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'e378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
]
|
||||
self.controller.post(queue_name, messages, uuid.uuid1(), self.project)
|
||||
message_popped = self.controller.pop(queue_name,
|
||||
limit=1,
|
||||
project=self.project)
|
||||
self.assertEqual(len(message_popped), 1)
|
||||
|
||||
def test_empty_queue_exception(self):
|
||||
self.assertRaises(storage.errors.QueueIsEmpty,
|
||||
self.controller.first,
|
||||
|
@ -73,6 +73,42 @@ class SqlalchemyMessageTests(base.MessageControllerTest):
|
||||
self.controller.first,
|
||||
queue_name, None, sort=1)
|
||||
|
||||
def test_pop_message(self):
|
||||
queue_name = 'pop-message-test'
|
||||
self.queue_controller.create(queue_name, self.project)
|
||||
messages = [
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'd378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
{
|
||||
'ttl': 60,
|
||||
'body': {
|
||||
'event': 'BackupStarted',
|
||||
'backupId': 'e378813c-3f0b-11e2-ad92-7823d2b0f3ce',
|
||||
},
|
||||
},
|
||||
]
|
||||
uuid = '33a7ce80-0892-11e4-9d5d-28cfe91478b9'
|
||||
self.controller.post(queue_name, messages, uuid,
|
||||
self.project)
|
||||
|
||||
# Test Message Pop
|
||||
message_popped = self.controller.pop(queue_name,
|
||||
limit=1,
|
||||
project=self.project)
|
||||
self.assertEqual(len(message_popped), 1)
|
||||
|
||||
|
||||
class SqlalchemyClaimTests(base.ClaimControllerTest):
|
||||
driver_class = sqlalchemy.DataDriver
|
||||
|
@ -11,9 +11,12 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
import uuid
|
||||
|
||||
import ddt
|
||||
import falcon
|
||||
|
||||
from marconi.openstack.common import jsonutils
|
||||
from marconi.tests.queues.transport.wsgi import base
|
||||
from marconi.tests.queues.transport.wsgi import v1_1
|
||||
|
||||
@ -127,3 +130,99 @@ class TestHealth(base.V1_1Base):
|
||||
response = self.simulate_head('/v1.1/health')
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
self.assertEqual(response, [])
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestMessages(base.V1_1Base):
|
||||
|
||||
config_file = 'wsgi_sqlalchemy.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(TestMessages, self).setUp()
|
||||
|
||||
self.queue_path = '/v1.1/queues/test-queue'
|
||||
self.messages_path = self.queue_path + '/messages'
|
||||
|
||||
self.project_id = 'e8ba1038'
|
||||
self.headers = {'Client-ID': str(uuid.uuid4())}
|
||||
self.simulate_put(self.queue_path, self.project_id)
|
||||
|
||||
def tearDown(self):
|
||||
self.simulate_delete(self.queue_path, self.project_id)
|
||||
|
||||
super(TestMessages, self).tearDown()
|
||||
|
||||
def _post_messages(self, target, repeat=1):
|
||||
doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * repeat)
|
||||
return self.simulate_post(target, self.project_id, body=doc,
|
||||
headers=self.headers)
|
||||
|
||||
def _get_msg_id(self, headers):
|
||||
return self._get_msg_ids(headers)[0]
|
||||
|
||||
def _get_msg_ids(self, headers):
|
||||
return headers['Location'].rsplit('=', 1)[-1].split(',')
|
||||
|
||||
@ddt.data(1, 2, 10)
|
||||
def test_pop(self, message_count):
|
||||
|
||||
self._post_messages(self.messages_path, repeat=message_count)
|
||||
msg_id = self._get_msg_id(self.srmock.headers_dict)
|
||||
target = self.messages_path + '/' + msg_id
|
||||
|
||||
self.simulate_get(target, self.project_id)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
query_string = 'pop=' + str(message_count)
|
||||
result = self.simulate_delete(self.messages_path, self.project_id,
|
||||
query_string=query_string)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
|
||||
self.assertEqual(len(result_doc['messages']), message_count)
|
||||
|
||||
self.simulate_get(target, self.project_id)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
@ddt.data('', 'pop=1000000', 'pop=10&ids=1', 'pop=-1')
|
||||
def test_pop_invalid(self, query_string):
|
||||
|
||||
self.simulate_delete(self.messages_path, self.project_id,
|
||||
query_string=query_string)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_pop_empty_queue(self):
|
||||
|
||||
query_string = 'pop=1'
|
||||
result = self.simulate_delete(self.messages_path, self.project_id,
|
||||
query_string=query_string)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(result_doc['messages'], [])
|
||||
|
||||
def test_pop_single_message(self):
|
||||
|
||||
self._post_messages(self.messages_path, repeat=5)
|
||||
msg_id = self._get_msg_id(self.srmock.headers_dict)
|
||||
target = self.messages_path + '/' + msg_id
|
||||
|
||||
self.simulate_get(target, self.project_id)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
# Pop Single message from the queue
|
||||
query_string = 'pop=1'
|
||||
result = self.simulate_delete(self.messages_path, self.project_id,
|
||||
query_string=query_string)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
# Get messages from the queue & verify message count
|
||||
query_string = 'echo=True'
|
||||
result = self.simulate_get(self.messages_path, self.project_id,
|
||||
query_string=query_string,
|
||||
headers=self.headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
actual_msg_count = len(result_doc['messages'])
|
||||
expected_msg_count = 4
|
||||
self.assertEqual(actual_msg_count, expected_msg_count)
|
||||
|
Loading…
x
Reference in New Issue
Block a user