From e3444bb6b5adfe240bbb4fa3a14fbb11311c7139 Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Fri, 7 Apr 2017 22:52:20 +1200 Subject: [PATCH] Fix swift messages posting when pooling 1. Currently, swift driver is always getting the swift queue controller which is not correct when pooling is enabled. 2. When using pooling, all the queues will be saved in management database, it works for database backend. But it doesn' work for swift, because for database backend, the queue name is just a field of the message record. For swift, we're using container as the "queue" to save messages. So before saving messages, we need to make sure the container of the queue has been created. Closes-Bug: 1680678 Change-Id: I5195911d22d5935aa3de105fe735ac159276a025 --- zaqar/storage/swift/controllers.py | 2 - zaqar/storage/swift/driver.py | 4 - zaqar/storage/swift/messages.py | 21 +++- zaqar/storage/swift/queues.py | 124 -------------------- zaqar/tests/unit/storage/test_impl_swift.py | 9 -- 5 files changed, 16 insertions(+), 144 deletions(-) delete mode 100644 zaqar/storage/swift/queues.py diff --git a/zaqar/storage/swift/controllers.py b/zaqar/storage/swift/controllers.py index 1fb3cb490..9d6e80b85 100644 --- a/zaqar/storage/swift/controllers.py +++ b/zaqar/storage/swift/controllers.py @@ -13,11 +13,9 @@ from zaqar.storage.swift import claims from zaqar.storage.swift import messages -from zaqar.storage.swift import queues from zaqar.storage.swift import subscriptions MessageController = messages.MessageController -QueueController = queues.QueueController ClaimController = claims.ClaimController SubscriptionController = subscriptions.SubscriptionController diff --git a/zaqar/storage/swift/driver.py b/zaqar/storage/swift/driver.py index 953a2fdcc..b0cd6077f 100644 --- a/zaqar/storage/swift/driver.py +++ b/zaqar/storage/swift/driver.py @@ -48,10 +48,6 @@ class DataDriver(storage.DataDriverBase): def is_alive(self): return True - @decorators.lazy_property(write=False) - def queue_controller(self): - return controllers.QueueController(self) - @decorators.lazy_property(write=False) def message_controller(self): return controllers.MessageController(self) diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index f1ddb4666..cd18550a4 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -205,9 +205,8 @@ class MessageController(storage.Message): pass def post(self, queue, messages, client_uuid, project=None): - if not self._queue_ctrl.exists(queue, project): - raise errors.QueueDoesNotExist(queue, project) - + # TODO(flwang): It would be nice if we can create a middleware in Swift + # to accept a json list so that Zaqar can create objects in bulk. return [self._create_msg(queue, m, client_uuid, project) for m in messages] @@ -225,9 +224,21 @@ class MessageController(storage.Message): 'x-object-meta-clientid': str(client_uuid), 'x-delete-after': msg['ttl']}) except swiftclient.ClientException as exc: + # NOTE(flwang): To avoid check the queue existence each time when + # posting messages, let's catch the 404 and create the 'queue' + # on demand. if exc.http_status == 404: - raise errors.QueueDoesNotExist(queue, project) - raise + self._client.put_container(utils._message_container(queue, + project)) + self._client.put_object( + utils._message_container(queue, project), + slug, + contents=contents, + content_type='application/json', + headers={ + 'x-object-meta-clientid': str(client_uuid), + 'x-delete-after': msg['ttl']}) + return slug def delete(self, queue, message_id, project=None, claim=None): diff --git a/zaqar/storage/swift/queues.py b/zaqar/storage/swift/queues.py deleted file mode 100644 index e8bbb2c95..000000000 --- a/zaqar/storage/swift/queues.py +++ /dev/null @@ -1,124 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -import functools - -from oslo_serialization import jsonutils -import swiftclient - -from zaqar import storage -from zaqar.storage import errors -from zaqar.storage.swift import utils - - -class QueueController(storage.Queue): - """Implements queue resource operations with swift backend. - - Queues are scoped by project. - - queue -> Swift mapping: - +----------------+---------------------------------------+ - | Attribute | Storage location | - +----------------+---------------------------------------+ - | Queue Name | Object name | - +----------------+---------------------------------------+ - | Project name | Container name prefix | - +----------------+---------------------------------------+ - | Created time | Object Creation Time | - +----------------+---------------------------------------+ - | Queue metadata | Object content | - +----------------+---------------------------------------+ - """ - - def __init__(self, *args, **kwargs): - super(QueueController, self).__init__(*args, **kwargs) - self._client = self.driver.connection - - def _list(self, project=None, marker=None, - limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): - container = utils._queue_container(project) - try: - _, objects = self._client.get_container(container, - limit=limit, - marker=marker) - except swiftclient.ClientException as exc: - if exc.http_status == 404: - self._client.put_container(container) - objects = [] - else: - raise - marker_next = {} - yield utils.QueueListCursor( - objects, detailed, marker_next, - functools.partial(self._client.get_object, container)) - yield marker_next and marker_next['next'] - - def _get(self, name, project=None): - try: - return self.get_metadata(name, project) - except errors.QueueDoesNotExist: - return {} - - def get_metadata(self, name, project=None): - container = utils._queue_container(project) - try: - _, metadata = self._client.get_object(container, name) - except swiftclient.ClientException as exc: - if exc.http_status == 404: - raise errors.QueueDoesNotExist(name, project) - else: - raise - return jsonutils.loads(metadata) or {} - - def set_metadata(self, name, metadata, project=None): - self._create(name, metadata, project) - - def _create(self, name, metadata=None, project=None): - try: - utils._put_or_create_container( - self._client, utils._queue_container(project), name, - content_type='application/json', - contents=jsonutils.dumps(metadata), - headers={'if-none-match': '*'}) - except swiftclient.ClientException as exc: - if exc.http_status == 412: - if metadata: - # Enforce metadata setting regardless - utils._put_or_create_container( - self._client, utils._queue_container(project), name, - content_type='application/json', - contents=jsonutils.dumps(metadata)) - return False - raise - else: - return True - - def _delete(self, name, project=None): - try: - self._client.delete_object(utils._queue_container(project), name) - except swiftclient.ClientException as exc: - if exc.http_status != 404: - raise - - def _stats(self, name, project=None): - pass - - def _exists(self, name, project=None): - try: - return self._client.head_object(utils._queue_container(project), - name) - except swiftclient.ClientException as exc: - if exc.http_status != 404: - raise - return False - else: - return True diff --git a/zaqar/tests/unit/storage/test_impl_swift.py b/zaqar/tests/unit/storage/test_impl_swift.py index 07f0f0f78..963d46487 100644 --- a/zaqar/tests/unit/storage/test_impl_swift.py +++ b/zaqar/tests/unit/storage/test_impl_swift.py @@ -18,15 +18,6 @@ from zaqar import tests as testing from zaqar.tests.unit.storage import base -@testing.requires_swift -class SwiftQueuesTest(base.QueueControllerTest): - - driver_class = driver.DataDriver - config_file = 'wsgi_swift.conf' - controller_class = controllers.QueueController - control_driver_class = mongodb.ControlDriver - - @testing.requires_swift class SwiftMessagesTest(base.MessageControllerTest): driver_class = driver.DataDriver