Merge "Fix swift messages posting when pooling"
This commit is contained in:
commit
cb1bde31ce
@ -13,11 +13,9 @@
|
||||
|
||||
from zaqar.storage.swift import claims
|
||||
from zaqar.storage.swift import messages
|
||||
from zaqar.storage.swift import queues
|
||||
from zaqar.storage.swift import subscriptions
|
||||
|
||||
|
||||
MessageController = messages.MessageController
|
||||
QueueController = queues.QueueController
|
||||
ClaimController = claims.ClaimController
|
||||
SubscriptionController = subscriptions.SubscriptionController
|
||||
|
@ -53,10 +53,6 @@ class DataDriver(storage.DataDriverBase):
|
||||
LOG.exception(e)
|
||||
return False
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def queue_controller(self):
|
||||
return controllers.QueueController(self)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def message_controller(self):
|
||||
return controllers.MessageController(self)
|
||||
|
@ -205,9 +205,8 @@ class MessageController(storage.Message):
|
||||
pass
|
||||
|
||||
def post(self, queue, messages, client_uuid, project=None):
|
||||
if not self._queue_ctrl.exists(queue, project):
|
||||
raise errors.QueueDoesNotExist(queue, project)
|
||||
|
||||
# TODO(flwang): It would be nice if we can create a middleware in Swift
|
||||
# to accept a json list so that Zaqar can create objects in bulk.
|
||||
return [self._create_msg(queue, m, client_uuid, project)
|
||||
for m in messages]
|
||||
|
||||
@ -225,9 +224,21 @@ class MessageController(storage.Message):
|
||||
'x-object-meta-clientid': str(client_uuid),
|
||||
'x-delete-after': msg['ttl']})
|
||||
except swiftclient.ClientException as exc:
|
||||
# NOTE(flwang): To avoid check the queue existence each time when
|
||||
# posting messages, let's catch the 404 and create the 'queue'
|
||||
# on demand.
|
||||
if exc.http_status == 404:
|
||||
raise errors.QueueDoesNotExist(queue, project)
|
||||
raise
|
||||
self._client.put_container(utils._message_container(queue,
|
||||
project))
|
||||
self._client.put_object(
|
||||
utils._message_container(queue, project),
|
||||
slug,
|
||||
contents=contents,
|
||||
content_type='application/json',
|
||||
headers={
|
||||
'x-object-meta-clientid': str(client_uuid),
|
||||
'x-delete-after': msg['ttl']})
|
||||
|
||||
return slug
|
||||
|
||||
def delete(self, queue, message_id, project=None, claim=None):
|
||||
|
@ -1,124 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import swiftclient
|
||||
|
||||
from zaqar import storage
|
||||
from zaqar.storage import errors
|
||||
from zaqar.storage.swift import utils
|
||||
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
"""Implements queue resource operations with swift backend.
|
||||
|
||||
Queues are scoped by project.
|
||||
|
||||
queue -> Swift mapping:
|
||||
+----------------+---------------------------------------+
|
||||
| Attribute | Storage location |
|
||||
+----------------+---------------------------------------+
|
||||
| Queue Name | Object name |
|
||||
+----------------+---------------------------------------+
|
||||
| Project name | Container name prefix |
|
||||
+----------------+---------------------------------------+
|
||||
| Created time | Object Creation Time |
|
||||
+----------------+---------------------------------------+
|
||||
| Queue metadata | Object content |
|
||||
+----------------+---------------------------------------+
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(QueueController, self).__init__(*args, **kwargs)
|
||||
self._client = self.driver.connection
|
||||
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
container = utils._queue_container(project)
|
||||
try:
|
||||
_, objects = self._client.get_container(container,
|
||||
limit=limit,
|
||||
marker=marker)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 404:
|
||||
self._client.put_container(container)
|
||||
objects = []
|
||||
else:
|
||||
raise
|
||||
marker_next = {}
|
||||
yield utils.QueueListCursor(
|
||||
objects, detailed, marker_next,
|
||||
functools.partial(self._client.get_object, container))
|
||||
yield marker_next and marker_next['next']
|
||||
|
||||
def _get(self, name, project=None):
|
||||
try:
|
||||
return self.get_metadata(name, project)
|
||||
except errors.QueueDoesNotExist:
|
||||
return {}
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
container = utils._queue_container(project)
|
||||
try:
|
||||
_, metadata = self._client.get_object(container, name)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 404:
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
else:
|
||||
raise
|
||||
return jsonutils.loads(metadata) or {}
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
self._create(name, metadata, project)
|
||||
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
try:
|
||||
utils._put_or_create_container(
|
||||
self._client, utils._queue_container(project), name,
|
||||
content_type='application/json',
|
||||
contents=jsonutils.dumps(metadata),
|
||||
headers={'if-none-match': '*'})
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 412:
|
||||
if metadata:
|
||||
# Enforce metadata setting regardless
|
||||
utils._put_or_create_container(
|
||||
self._client, utils._queue_container(project), name,
|
||||
content_type='application/json',
|
||||
contents=jsonutils.dumps(metadata))
|
||||
return False
|
||||
raise
|
||||
else:
|
||||
return True
|
||||
|
||||
def _delete(self, name, project=None):
|
||||
try:
|
||||
self._client.delete_object(utils._queue_container(project), name)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status != 404:
|
||||
raise
|
||||
|
||||
def _stats(self, name, project=None):
|
||||
pass
|
||||
|
||||
def _exists(self, name, project=None):
|
||||
try:
|
||||
return self._client.head_object(utils._queue_container(project),
|
||||
name)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status != 404:
|
||||
raise
|
||||
return False
|
||||
else:
|
||||
return True
|
@ -18,15 +18,6 @@ from zaqar import tests as testing
|
||||
from zaqar.tests.unit.storage import base
|
||||
|
||||
|
||||
@testing.requires_swift
|
||||
class SwiftQueuesTest(base.QueueControllerTest):
|
||||
|
||||
driver_class = driver.DataDriver
|
||||
config_file = 'wsgi_swift.conf'
|
||||
controller_class = controllers.QueueController
|
||||
control_driver_class = mongodb.ControlDriver
|
||||
|
||||
|
||||
@testing.requires_swift
|
||||
class SwiftMessagesTest(base.MessageControllerTest):
|
||||
driver_class = driver.DataDriver
|
||||
|
Loading…
x
Reference in New Issue
Block a user