Add _max_messages_post_size
and _default_message_ttl
for queue
Now Zaqar supports metadata as the attributes of a queue, so that user can fill it with some key-value pair. However, Zaqar itself isn't using it very much(only the '_flavor'). Meanwhile, we have some configurations defined in zaqar.conf against messages or claims which are global defined. For example, the max message size. So the idea is adding more reserved attributes for queue so that the queues of Zaqar are more flexible. This patch adds the `_max_messages_post_size` and `_default_message_ttl` for message posting, tests are added as well. APIImpact DocImpact blueprint more-reserved-queue-attributes Change-Id: I7ba84b6e862ff15684a1b1d919c0fed60f4c58b2
This commit is contained in:
parent
49c90739de
commit
7c6d52ccbf
@ -100,6 +100,7 @@ class Endpoints(object):
|
||||
try:
|
||||
self._validate.queue_identification(queue_name, project_id)
|
||||
self._validate.queue_metadata_length(len(str(metadata)))
|
||||
self._validate.queue_metadata_putting(metadata)
|
||||
created = self._queue_controller.create(queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
@ -371,18 +372,41 @@ class Endpoints(object):
|
||||
return api_utils.error_response(req, ex, headers, error)
|
||||
|
||||
try:
|
||||
# NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
|
||||
# the performance since both of them will call
|
||||
# collection.find_one()
|
||||
queue_meta = None
|
||||
try:
|
||||
queue_meta = self._queue_controller.get_metadata(queue_name,
|
||||
project_id)
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
self._validate.queue_identification(queue_name, project_id)
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
# NOTE(flwang): Queue is created in lazy mode, so no metadata
|
||||
# set.
|
||||
queue_meta = {}
|
||||
|
||||
queue_max_msg_size = queue_meta.get('_max_messages_post_size',
|
||||
None)
|
||||
queue_default_ttl = queue_meta.get('_default_message_ttl', None)
|
||||
|
||||
# TODO(flwang): To avoid any unexpected regression issue, we just
|
||||
# leave the _message_post_spec attribute of class as it's. It
|
||||
# should be removed in Newton release.
|
||||
if queue_default_ttl:
|
||||
_message_post_spec = (('ttl', int, queue_default_ttl),
|
||||
('body', '*', None),)
|
||||
else:
|
||||
_message_post_spec = (('ttl', int, self._defaults.message_ttl),
|
||||
('body', '*', None),)
|
||||
# Place JSON size restriction before parsing
|
||||
self._validate.message_length(len(str(messages)))
|
||||
self._validate.message_length(len(str(messages)),
|
||||
max_msg_post_size=queue_max_msg_size)
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
headers = {'status': 400}
|
||||
return api_utils.error_response(req, ex, headers)
|
||||
|
||||
_message_post_spec = (
|
||||
('ttl', int, self._defaults.message_ttl),
|
||||
('body', '*', None),
|
||||
)
|
||||
|
||||
try:
|
||||
messages = api_utils.sanitize(messages,
|
||||
_message_post_spec,
|
||||
@ -397,10 +421,6 @@ class Endpoints(object):
|
||||
|
||||
self._validate.message_posting(messages)
|
||||
|
||||
if not self._queue_controller.exists(queue_name, project_id):
|
||||
self._validate.queue_identification(queue_name, project_id)
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
|
||||
message_ids = self._message_controller.post(
|
||||
queue_name,
|
||||
messages=messages,
|
||||
|
@ -265,6 +265,45 @@ class TestMessagesMongoDB(base.V2Base):
|
||||
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
def test_post_using_queue_default_message_ttl(self):
|
||||
queue_path = self.url_prefix + '/queues/test_queue1'
|
||||
messages_path = queue_path + '/messages'
|
||||
doc = '{"_default_message_ttl": 999}'
|
||||
self.simulate_put(queue_path, body=doc, headers=self.headers)
|
||||
self.addCleanup(self.simulate_delete, queue_path, headers=self.headers)
|
||||
sample_messages = {
|
||||
'messages': [
|
||||
{'body': {'key': 'value'}},
|
||||
],
|
||||
}
|
||||
|
||||
sample_doc = jsonutils.dumps(sample_messages)
|
||||
result = self.simulate_post(messages_path,
|
||||
body=sample_doc, headers=self.headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
href = result_doc['resources'][0]
|
||||
result = self.simulate_get(href, headers=self.headers)
|
||||
message = jsonutils.loads(result[0])
|
||||
|
||||
self.assertEqual(999, message['ttl'])
|
||||
|
||||
def test_post_using_queue_max_messages_post_size(self):
|
||||
queue_path = self.url_prefix + '/queues/test_queue2'
|
||||
messages_path = queue_path + '/messages'
|
||||
doc = '{"_max_messages_post_size": 1023}'
|
||||
self.simulate_put(queue_path, body=doc, headers=self.headers)
|
||||
self.addCleanup(self.simulate_delete, queue_path, headers=self.headers)
|
||||
sample_messages = {
|
||||
'messages': [
|
||||
{'body': {'key': 'a' * 1204}},
|
||||
],
|
||||
}
|
||||
|
||||
sample_doc = jsonutils.dumps(sample_messages)
|
||||
self.simulate_post(messages_path,
|
||||
body=sample_doc, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_get_from_missing_queue(self):
|
||||
body = self.simulate_get(self.url_prefix +
|
||||
'/queues/nonexistent/messages',
|
||||
@ -633,7 +672,7 @@ class TestMessagesFaultyDriver(base.V2BaseFaulty):
|
||||
self.simulate_post(path,
|
||||
body=body,
|
||||
headers=headers)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
self.assertEqual(falcon.HTTP_500, self.srmock.status)
|
||||
|
||||
self.simulate_get(path,
|
||||
headers=headers)
|
||||
|
@ -98,3 +98,40 @@ class TestValidation(base.V2Base):
|
||||
body='{"timespace": "Shangri-la"}',
|
||||
headers=empty_headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_queue_metadata_putting(self):
|
||||
# Test _default_message_ttl
|
||||
# TTL normal case
|
||||
queue_1 = self.url_prefix + '/queues/queue1'
|
||||
self.simulate_put(queue_1,
|
||||
self.project_id,
|
||||
body='{"_default_message_ttl": 60}')
|
||||
self.addCleanup(self.simulate_delete, queue_1, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
# TTL under min
|
||||
self.simulate_put(queue_1,
|
||||
self.project_id,
|
||||
body='{"_default_message_ttl": 59}')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
# TTL over max
|
||||
self.simulate_put(queue_1,
|
||||
self.project_id,
|
||||
body='{"_default_message_ttl": 1209601}')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
# Test _max_messages_post_size
|
||||
# Size normal case
|
||||
queue_2 = self.url_prefix + '/queues/queue2'
|
||||
self.simulate_put(queue_2,
|
||||
self.project_id,
|
||||
body='{"_max_messages_post_size": 255}')
|
||||
self.addCleanup(self.simulate_delete, queue_2, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
# Size over max
|
||||
self.simulate_put(queue_2,
|
||||
self.project_id,
|
||||
body='{"_max_messages_post_size": 257}')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
@ -153,6 +153,42 @@ class Validator(object):
|
||||
msg = _(u'Queue metadata is too large. Max size: {0}')
|
||||
raise ValidationFailed(msg, self._limits_conf.max_queue_metadata)
|
||||
|
||||
def queue_metadata_putting(self, queue_metadata):
|
||||
"""Checking if the reserved attributes of the queue are valid.
|
||||
|
||||
:param queue_metadata: Queue's metadata.
|
||||
:raises: ValidationFailed if any reserved attribute is invalid.
|
||||
"""
|
||||
if not queue_metadata:
|
||||
return
|
||||
|
||||
queue_default_ttl = queue_metadata.get('_default_message_ttl', None)
|
||||
if queue_default_ttl and not isinstance(queue_default_ttl, int):
|
||||
msg = _(u'_default_message_ttl must be integer.')
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
if queue_default_ttl:
|
||||
if not (MIN_MESSAGE_TTL <= queue_default_ttl <=
|
||||
self._limits_conf.max_message_ttl):
|
||||
msg = _(u'_default_message_ttl can not exceed {0} '
|
||||
'seconds, and must be at least {1} seconds long.')
|
||||
raise ValidationFailed(
|
||||
msg, self._limits_conf.max_message_ttl, MIN_MESSAGE_TTL)
|
||||
|
||||
queue_max_msg_size = queue_metadata.get('_max_messages_post_size',
|
||||
None)
|
||||
if queue_max_msg_size and not isinstance(queue_max_msg_size, int):
|
||||
msg = _(u'_max_messages_post_size must be integer.')
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
if queue_max_msg_size:
|
||||
if not (0 < queue_max_msg_size <=
|
||||
self._limits_conf.max_messages_post_size):
|
||||
raise ValidationFailed(
|
||||
_(u'_max_messages_post_size can not exceed {0}, '
|
||||
' and must be at least greater than 0.'),
|
||||
self._limits_conf.max_messages_post_size)
|
||||
|
||||
def message_posting(self, messages):
|
||||
"""Restrictions on a list of messages.
|
||||
|
||||
@ -167,7 +203,7 @@ class Validator(object):
|
||||
for msg in messages:
|
||||
self.message_content(msg)
|
||||
|
||||
def message_length(self, content_length):
|
||||
def message_length(self, content_length, max_msg_post_size=None):
|
||||
"""Restrictions on message post length.
|
||||
|
||||
:param content_length: Queue request's length.
|
||||
@ -175,6 +211,26 @@ class Validator(object):
|
||||
"""
|
||||
if content_length is None:
|
||||
return
|
||||
|
||||
if max_msg_post_size:
|
||||
try:
|
||||
min_max_size = min(max_msg_post_size,
|
||||
self._limits_conf.max_messages_post_size)
|
||||
if content_length > min_max_size:
|
||||
raise ValidationFailed(
|
||||
_(u'Message collection size is too large. The max '
|
||||
'size for current queue is {0}. It is calculated '
|
||||
'by max size = min(max_messages_post_size_config: '
|
||||
'{1}, max_messages_post_size_queue: {2}).'),
|
||||
min_max_size,
|
||||
self._limits_conf.max_messages_post_size,
|
||||
max_msg_post_size)
|
||||
except TypeError:
|
||||
# NOTE(flwang): If there is a type error when using min(),
|
||||
# it only happens in py3.x, it will be skipped and compare
|
||||
# the message length with the size defined in config file.
|
||||
pass
|
||||
|
||||
if content_length > self._limits_conf.max_messages_post_size:
|
||||
raise ValidationFailed(
|
||||
_(u'Message collection size is too large. Max size {0}'),
|
||||
|
@ -38,6 +38,7 @@ class CollectionResource(object):
|
||||
'_wsgi_conf',
|
||||
'_validate',
|
||||
'_message_post_spec',
|
||||
'_default_message_ttl'
|
||||
)
|
||||
|
||||
def __init__(self, wsgi_conf, validate,
|
||||
@ -48,9 +49,10 @@ class CollectionResource(object):
|
||||
self._validate = validate
|
||||
self._message_controller = message_controller
|
||||
self._queue_controller = queue_controller
|
||||
self._default_message_ttl = default_message_ttl
|
||||
|
||||
self._message_post_spec = (
|
||||
('ttl', int, default_message_ttl),
|
||||
('ttl', int, self._default_message_ttl),
|
||||
('body', '*', None),
|
||||
)
|
||||
|
||||
@ -155,10 +157,37 @@ class CollectionResource(object):
|
||||
@acl.enforce("messages:create")
|
||||
def on_post(self, req, resp, project_id, queue_name):
|
||||
client_uuid = wsgi_helpers.get_client_uuid(req)
|
||||
|
||||
try:
|
||||
# NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
|
||||
# the performance since both of them will call
|
||||
# collection.find_one()
|
||||
queue_meta = None
|
||||
try:
|
||||
queue_meta = self._queue_controller.get_metadata(queue_name,
|
||||
project_id)
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
self._validate.queue_identification(queue_name, project_id)
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
# NOTE(flwang): Queue is created in lazy mode, so no metadata
|
||||
# set.
|
||||
queue_meta = {}
|
||||
|
||||
queue_max_msg_size = queue_meta.get('_max_messages_post_size',
|
||||
None)
|
||||
queue_default_ttl = queue_meta.get('_default_message_ttl', None)
|
||||
|
||||
# TODO(flwang): To avoid any unexpected regression issue, we just
|
||||
# leave the _message_post_spec attribute of class as it's. It
|
||||
# should be removed in Newton release.
|
||||
if queue_default_ttl:
|
||||
message_post_spec = (('ttl', int, queue_default_ttl),
|
||||
('body', '*', None),)
|
||||
else:
|
||||
message_post_spec = (('ttl', int, self._default_message_ttl),
|
||||
('body', '*', None),)
|
||||
# Place JSON size restriction before parsing
|
||||
self._validate.message_length(req.content_length)
|
||||
self._validate.message_length(req.content_length,
|
||||
max_msg_post_size=queue_max_msg_size)
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
@ -171,15 +200,12 @@ class CollectionResource(object):
|
||||
raise wsgi_errors.HTTPBadRequestAPI(description)
|
||||
|
||||
messages = wsgi_utils.sanitize(document['messages'],
|
||||
self._message_post_spec,
|
||||
message_post_spec,
|
||||
doctype=wsgi_utils.JSONArray)
|
||||
|
||||
try:
|
||||
self._validate.message_posting(messages)
|
||||
|
||||
if not self._queue_controller.exists(queue_name, project_id):
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
|
||||
message_ids = self._message_controller.post(
|
||||
queue_name,
|
||||
messages=messages,
|
||||
|
@ -74,6 +74,7 @@ class ItemResource(object):
|
||||
metadata = wsgi_utils.sanitize(document, spec=None)
|
||||
|
||||
try:
|
||||
self._validate.queue_metadata_putting(metadata)
|
||||
created = self._queue_controller.create(queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
@ -81,7 +82,9 @@ class ItemResource(object):
|
||||
except storage_errors.FlavorDoesNotExist as ex:
|
||||
LOG.exception(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Queue could not be created.')
|
||||
|
Loading…
x
Reference in New Issue
Block a user