diff --git a/devstack/plugin.sh b/devstack/plugin.sh index d1b0ec272..28688ea68 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -133,6 +133,15 @@ function configure_zaqar { iniset $ZAQAR_CONF 'drivers:message_store:redis' uri redis://localhost:6379 iniset $ZAQAR_CONF 'drivers:message_store:redis' database zaqar configure_redis + elif [ "$ZAQAR_BACKEND" = 'swift' ] ; then + recreate_database zaqar + iniset $ZAQAR_CONF drivers management_store sqlalchemy + iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' uri `database_connection_url zaqar` + iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' database zaqar_mgmt + + iniset $ZAQAR_CONF drivers message_store swift + iniset $ZAQAR_CONF 'drivers:message_store:swift' auth_url $KEYSTONE_AUTH_URI_V3 + iniset $ZAQAR_CONF 'drivers:message_store:swift' uri swift://zaqar:$SERVICE_PASSWORD@/service fi if is_service_enabled qpid || [ -n "$RABBIT_HOST" ] && [ -n "$RABBIT_PASSWORD" ]; then @@ -265,6 +274,9 @@ function create_zaqar_accounts { "$ZAQAR_SERVICE_PROTOCOL://$ZAQAR_SERVICE_HOST:$ZAQAR_WEBSOCKET_PORT" fi + if [ "$ZAQAR_BACKEND" = 'swift' ] ; then + get_or_add_user_project_role ResellerAdmin zaqar service + fi } if is_service_enabled zaqar-websocket || is_service_enabled zaqar-wsgi; then diff --git a/etc/oslo-config-generator/zaqar.conf b/etc/oslo-config-generator/zaqar.conf index 94d81fe72..64561e1c8 100644 --- a/etc/oslo-config-generator/zaqar.conf +++ b/etc/oslo-config-generator/zaqar.conf @@ -8,6 +8,7 @@ namespace = zaqar.storage.pooling namespace = zaqar.storage.mongodb namespace = zaqar.storage.redis namespace = zaqar.storage.sqlalchemy +namespace = zaqar.storage.swift namespace = zaqar.transport.wsgi namespace = zaqar.transport.base namespace = zaqar.transport.validation diff --git a/setup.cfg b/setup.cfg index 24acec1b6..01d96db76 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,6 +38,7 @@ zaqar.data.storage = mongodb = zaqar.storage.mongodb.driver:DataDriver mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver redis = zaqar.storage.redis.driver:DataDriver + swift = zaqar.storage.swift.driver:DataDriver faulty = zaqar.tests.faulty_storage:DataDriver zaqar.control.storage = @@ -58,6 +59,7 @@ oslo.config.opts = zaqar.storage.mongodb = zaqar.storage.mongodb.options:_config_options zaqar.storage.redis = zaqar.storage.redis.options:_config_options zaqar.storage.sqlalchemy = zaqar.storage.sqlalchemy.options:_config_options + zaqar.storage.swift = zaqar.storage.swift.options:_config_options zaqar.transport.wsgi = zaqar.transport.wsgi.driver:_config_options zaqar.transport.websocket = zaqar.transport.websocket.driver:_config_options zaqar.transport.base = zaqar.transport.base:_config_options @@ -72,6 +74,9 @@ zaqar.storage.mongodb.driver.queue.stages = zaqar.storage.redis.driver.queue.stages = message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler +zaqar.storage.swift.driver.queue.stages = + message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler + zaqar.notification.tasks = http = zaqar.notification.tasks.webhook:WebhookTask https = zaqar.notification.tasks.webhook:WebhookTask diff --git a/test-requirements.txt b/test-requirements.txt index 8ca7e5fcd..1ce69dd2b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -10,6 +10,7 @@ mock>=2.0 # BSD # Backends redis>=2.10.0 # MIT pymongo!=3.1,>=3.0.2 # Apache-2.0 +python-swiftclient>=3.2.0 # Apache-2.0 websocket-client>=0.32.0 # LGPLv2+ # Unit testing diff --git a/zaqar/storage/swift/__init__.py b/zaqar/storage/swift/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/storage/swift/claims.py b/zaqar/storage/swift/claims.py new file mode 100644 index 000000000..ae7dc7605 --- /dev/null +++ b/zaqar/storage/swift/claims.py @@ -0,0 +1,188 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import math + +from oslo_serialization import jsonutils +from oslo_utils import timeutils +from oslo_utils import uuidutils +import swiftclient + +from zaqar.common import decorators +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.swift import utils + + +class ClaimController(storage.Claim): + """Implements claims resource operations with swift backend + + Claims are scoped by project + queue. + """ + def __init__(self, *args, **kwargs): + super(ClaimController, self).__init__(*args, **kwargs) + self._client = self.driver.connection + + @decorators.lazy_property(write=False) + def _message_ctrl(self): + return self.driver.message_controller + + @decorators.lazy_property(write=False) + def _queue_ctrl(self): + return self.driver.queue_controller + + def _exists(self, queue, claim_id, project=None): + try: + return self._client.head_object( + utils._claim_container(queue, project), + claim_id) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + raise + + def _get(self, queue, claim_id, project=None): + try: + container = utils._claim_container(queue, project) + headers, claim = self._client.get_object(container, claim_id) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + return + now = timeutils.utcnow_ts(True) + return { + 'id': claim_id, + 'age': now - float(headers['x-timestamp']), + 'ttl': int(headers['x-delete-at']) - math.floor(now), + } + + def get(self, queue, claim_id, project=None): + now = timeutils.utcnow_ts(True) + self._exists(queue, claim_id, project) + + container = utils._claim_container(queue, project) + + headers, claim_obj = self._client.get_object(container, claim_id) + + def g(): + for msg_id in jsonutils.loads(claim_obj): + try: + headers, msg = self._message_ctrl._find_message( + queue, msg_id, project) + except errors.MessageDoesNotExist: + continue + else: + yield utils._message_to_json(msg_id, msg, headers, now) + + claim_meta = { + 'id': claim_id, + 'age': now - float(headers['x-timestamp']), + 'ttl': int(headers['x-delete-at']) - math.floor(now), + } + + return claim_meta, g() + + def create(self, queue, metadata, project=None, + limit=storage.DEFAULT_MESSAGES_PER_CLAIM): + ttl = metadata['ttl'] + grace = metadata['grace'] + msg_ts = ttl + grace + claim_id = uuidutils.generate_uuid() + + messages, marker = self._message_ctrl._list( + queue, project, limit=limit, include_claimed=False) + + claimed = [] + for msg in messages: + md5 = hashlib.md5() + md5.update( + jsonutils.dumps( + {'body': msg['body'], 'claim_id': None, + 'ttl': msg['ttl']})) + md5 = md5.hexdigest() + msg_ttl = max(msg['ttl'], msg_ts) + content = jsonutils.dumps( + {'body': msg['body'], 'claim_id': claim_id, 'ttl': msg_ttl}) + try: + self._client.put_object( + utils._message_container(queue, project), + msg['id'], + content, + headers={'x-object-meta-clientid': msg['client_uuid'], + 'if-match': md5, + 'x-object-meta-claimid': claim_id, + 'x-delete-after': msg_ttl}) + except swiftclient.ClientException as exc: + if exc.http_status == 412: + continue + raise + else: + msg['claim_id'] = claim_id + msg['ttl'] = msg_ttl + claimed.append(msg) + + utils._put_or_create_container( + self._client, + utils._claim_container(queue, project), + claim_id, + jsonutils.dumps([msg['id'] for msg in claimed]), + headers={'x-delete-after': ttl} + ) + + return claim_id, claimed + + def update(self, queue, claim_id, metadata, project=None): + container = utils._claim_container(queue, project) + try: + headers, obj = self._client.get_object(container, claim_id) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + raise + + self._client.put_object(container, claim_id, obj, + headers={'x-delete-after': metadata['ttl']}) + + def delete(self, queue, claim_id, project=None): + try: + header, obj = self._client.get_object( + utils._claim_container(queue, project), + claim_id) + for msg_id in jsonutils.loads(obj): + try: + headers, msg = self._message_ctrl._find_message( + queue, msg_id, project) + except errors.MessageDoesNotExist: + continue + md5 = hashlib.md5() + md5.update(msg) + md5 = md5.hexdigest() + msg = jsonutils.loads(msg) + content = jsonutils.dumps( + {'body': msg['body'], 'claim_id': None, 'ttl': msg['ttl']}) + client_id = headers['x-object-meta-clientid'] + self._client.put_object( + utils._message_container(queue, project), + msg_id, + content, + headers={'x-object-meta-clientid': client_id, + 'if-match': md5, + 'x-delete-at': headers['x-delete-at']}) + + self._client.delete_object( + utils._claim_container(queue, project), + claim_id) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise diff --git a/zaqar/storage/swift/controllers.py b/zaqar/storage/swift/controllers.py new file mode 100644 index 000000000..1fb3cb490 --- /dev/null +++ b/zaqar/storage/swift/controllers.py @@ -0,0 +1,23 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from zaqar.storage.swift import claims +from zaqar.storage.swift import messages +from zaqar.storage.swift import queues +from zaqar.storage.swift import subscriptions + + +MessageController = messages.MessageController +QueueController = queues.QueueController +ClaimController = claims.ClaimController +SubscriptionController = subscriptions.SubscriptionController diff --git a/zaqar/storage/swift/driver.py b/zaqar/storage/swift/driver.py new file mode 100644 index 000000000..039c483a3 --- /dev/null +++ b/zaqar/storage/swift/driver.py @@ -0,0 +1,78 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from six.moves import urllib + +from oslo_log import log as logging +import swiftclient + +from zaqar.common import decorators +from zaqar import storage +from zaqar.storage.swift import controllers +from zaqar.storage.swift import options + +LOG = logging.getLogger(__name__) + + +class DataDriver(storage.DataDriverBase): + + _DRIVER_OPTIONS = options._config_options() + + def __init__(self, conf, cache, control_driver): + super(DataDriver, self).__init__(conf, cache, control_driver) + self.swift_conf = self.conf[options.MESSAGE_SWIFT_GROUP] + + @property + def capabilities(self): + return ( + storage.Capabilities.AOD, + storage.Capabilities.DURABILITY, + ) + + @decorators.lazy_property(write=False) + def connection(self): + return _get_swift_client(self) + + def is_alive(self): + return True + + @decorators.lazy_property(write=False) + def queue_controller(self): + return controllers.QueueController(self) + + @decorators.lazy_property(write=False) + def message_controller(self): + return controllers.MessageController(self) + + @decorators.lazy_property(write=False) + def subscription_controller(self): + return controllers.SubscriptionController(self) + + @decorators.lazy_property(write=False) + def claim_controller(self): + return controllers.ClaimController(self) + + def _health(self): + raise NotImplementedError("No health checks") + + def close(self): + pass + + +def _get_swift_client(driver): + conf = driver.swift_conf + parsed_url = urllib.parse.urlparse(conf.uri) + return swiftclient.Connection(conf.auth_url, parsed_url.username, + parsed_url.password, + insecure=conf.insecure, auth_version="3", + tenant_name=parsed_url.path[1:]) diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py new file mode 100644 index 000000000..ea473db84 --- /dev/null +++ b/zaqar/storage/swift/messages.py @@ -0,0 +1,352 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import uuid + +from oslo_serialization import jsonutils +from oslo_utils import timeutils +import swiftclient + +from zaqar.common import decorators +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.swift import utils + + +class MessageController(storage.Message): + """Implements message resource operations with swift backend + + Messages are scoped by project + queue. + + message -> Swift mapping: + +--------------+-----------------------------------------+ + | Attribute | Storage location | + +--------------+-----------------------------------------+ + | Msg UUID | Object name | + +--------------+-----------------------------------------+ + | Queue Name | Container name prefix | + +--------------+-----------------------------------------+ + | Project name | Container name prefix | + +--------------+-----------------------------------------+ + | Created time | Object Creation Time | + +--------------+-----------------------------------------+ + | Msg Body | Object content 'body' | + +--------------+-----------------------------------------+ + | Client ID | Object header 'ClientID' | + +--------------+-----------------------------------------+ + | Claim ID | Object content 'claim_id' | + +--------------+-----------------------------------------+ + | Expires | Object Delete-After header | + +--------------------------------------------------------+ + """ + + def __init__(self, *args, **kwargs): + super(MessageController, self).__init__(*args, **kwargs) + self._client = self.driver.connection + + @decorators.lazy_property(write=False) + def _queue_ctrl(self): + return self.driver.queue_controller + + @decorators.lazy_property(write=False) + def _claim_ctrl(self): + return self.driver.claim_controller + + def _delete_queue_messages(self, queue, project, pipe): + """Method to remove all the messages belonging to a queue. + + Will be referenced from the QueueController. + The pipe to execute deletion will be passed from the QueueController + executing the operation. + """ + container = utils._message_container(queue, project) + remaining = True + key = '' + while remaining: + headers, objects = self._client.get_container(container, + limit=1000, + marker=key) + if not objects: + return + remaining = len(objects) == 1000 + key = objects[-1]['name'] + for o in objects: + try: + self._client.delete_object(container, o['name']) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + continue + raise + + def _list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_MESSAGES_PER_PAGE, + echo=False, client_uuid=None, + include_claimed=False, sort=1): + """List messages in the queue, oldest first(ish) + + Time ordering and message inclusion in lists are soft, there is no + global order and times are based on the UTC time of the zaqar-api + server that the message was created from. + + Here be consistency dragons. + """ + if not self._queue_ctrl.exists(queue, project): + raise errors.QueueDoesNotExist(queue, project) + + client = self._client + container = utils._message_container(queue, project) + query_string = None + if sort == -1: + query_string = 'reverse=on' + + try: + _, objects = client.get_container( + container, + marker=marker, + # list 2x the objects because some listing items may have + # expired + limit=limit * 2, + query_string=query_string) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.QueueDoesNotExist(queue, project) + raise + + def is_claimed(msg, headers): + if include_claimed: + return False + return msg['claim_id'] is not None + + def is_echo(msg, headers): + if echo: + return False + return headers['x-object-meta-clientid'] == str(client_uuid) + + filters = [ + is_echo, + is_claimed, + ] + marker = {} + get_object = functools.partial(client.get_object, container) + list_objects = functools.partial(client.get_container, container, + limit=limit * 2, + query_string=query_string) + yield utils._filter_messages(objects, filters, marker, get_object, + list_objects, limit=limit) + yield marker and marker['next'] + + def list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_MESSAGES_PER_PAGE, + echo=False, client_uuid=None, + include_claimed=False): + return self._list(queue, project, marker, limit, echo, + client_uuid, include_claimed) + + def first(self, queue, project=None, sort=1): + if sort not in (1, -1): + raise ValueError(u'sort must be either 1 (ascending) ' + u'or -1 (descending)') + cursor = self._list(queue, project, limit=1, sort=sort) + try: + message = next(next(cursor)) + except StopIteration: + raise errors.QueueIsEmpty(queue, project) + return message + + def get(self, queue, message_id, project=None): + return self._get(queue, message_id, project) + + def _get(self, queue, message_id, project=None, check_queue=True): + if check_queue and not self._queue_ctrl.exists(queue, project): + raise errors.QueueDoesNotExist(queue, project) + + now = timeutils.utcnow_ts(True) + + headers, msg = self._find_message(queue, message_id, project) + return utils._message_to_json(message_id, msg, headers, now) + + def _find_message(self, queue, message_id, project): + try: + return self._client.get_object( + utils._message_container(queue, project), message_id) + + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.MessageDoesNotExist(message_id, queue, project) + else: + raise + + def bulk_delete(self, queue, message_ids, project=None): + for id in message_ids: + try: + self._delete(queue, id, project) + except errors.MessageDoesNotExist: + pass + + def bulk_get(self, queue, message_ids, project=None): + if not self._queue_ctrl.exists(queue, project): + raise StopIteration() + + for id in message_ids: + try: + yield self._get(queue, id, project, check_queue=False) + except errors.MessageDoesNotExist: + pass + + def post(self, queue, messages, client_uuid, project=None): + if not self._queue_ctrl.exists(queue, project): + raise errors.QueueDoesNotExist(queue, project) + + return [self._create_msg(queue, m, client_uuid, project) + for m in messages] + + def _create_msg(self, queue, msg, client_uuid, project): + slug = str(uuid.uuid1()) + contents = jsonutils.dumps( + {'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl']}) + try: + self._client.put_object( + utils._message_container(queue, project), + slug, + contents=contents, + headers={ + 'x-object-meta-clientid': str(client_uuid), + 'x-delete-after': msg['ttl']}) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.QueueDoesNotExist(queue, project) + raise + return slug + + def delete(self, queue, message_id, project=None, claim=None): + try: + msg = self._get(queue, message_id, project) + except (errors.QueueDoesNotExist, errors.MessageDoesNotExist): + return + if claim is None: + if msg['claim_id']: + claim_obj = self._claim_ctrl._get(queue, msg['claim_id'], + project) + if claim_obj is not None and claim_obj['ttl'] > 0: + raise errors.MessageIsClaimed(message_id) + else: + # Check if the claim does exist + self._claim_ctrl._exists(queue, claim, project) + if not msg['claim_id']: + raise errors.MessageNotClaimed(message_id) + elif msg['claim_id'] != claim: + raise errors.MessageNotClaimedBy(message_id, claim) + + self._delete(queue, message_id, project) + + def _delete(self, queue, message_id, project=None): + try: + self._client.delete_object( + utils._message_container(queue, project), message_id) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + + def pop(self, queue, limit, project=None): + # Pop is implemented as a chain of the following operations: + # 1. Create a claim. + # 2. Delete the messages claimed. + # 3. Delete the claim. + + claim_id, messages = self._claim_ctrl.create( + queue, dict(ttl=1, grace=0), project, limit=limit) + + message_ids = [message['id'] for message in messages] + self.bulk_delete(queue, message_ids, project) + return messages + + +class MessageQueueHandler(object): + def __init__(self, driver, control_driver): + self.driver = driver + self._client = self.driver.connection + self._queue_ctrl = self.driver.queue_controller + self._message_ctrl = self.driver.message_controller + self._claim_ctrl = self.driver.claim_controller + + def create(self, name, metadata=None, project=None): + self._client.put_container(utils._message_container(name, project)) + + def delete(self, name, project=None): + for container in [utils._message_container(name, project), + utils._claim_container(name, project)]: + try: + headers, objects = self._client.get_container(container) + for obj in objects: + try: + self._client.delete_object(container, obj['name']) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + self._client.delete_container(container) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + + def stats(self, name, project=None): + if not self._queue_ctrl.exists(name, project=project): + raise errors.QueueDoesNotExist(name, project) + + total = 0 + claimed = 0 + container = utils._message_container(name, project) + _, objects = self._client.get_container(container) + newest = None + oldest = None + now = timeutils.utcnow_ts(True) + for obj in objects: + try: + headers = self._client.head_object(container, obj['name']) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + else: + created = float(headers['x-timestamp']) + newest = { + 'id': obj['name'], + 'age': now - created, + 'created': timeutils.iso8601_from_timestamp(created)} + if oldest is None: + oldest = newest + total += 1 + if headers.get('x-object-meta-claimid'): + claimed += 1 + + msg_stats = { + 'claimed': claimed, + 'free': total - claimed, + 'total': total, + } + if newest is not None: + msg_stats['newest'] = newest + msg_stats['oldest'] = oldest + + return {'messages': msg_stats} + + def exists(self, queue, project=None): + try: + self._client.head_container(utils._message_container(queue, + project)) + + except swiftclient.ClientException as exc: + if exc.http_status == 404: + return False + raise + else: + return True diff --git a/zaqar/storage/swift/options.py b/zaqar/storage/swift/options.py new file mode 100644 index 000000000..e31a3dba8 --- /dev/null +++ b/zaqar/storage/swift/options.py @@ -0,0 +1,31 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Swift storage driver configuration options.""" + +from oslo_config import cfg +MESSAGE_SWIFT_OPTIONS = ( + cfg.StrOpt('auth_url', default="http://127.0.0.1:5000/v3/", + help="URI of Keystone endpoint to discover Swift"), + cfg.StrOpt('uri', + default="swift://demo:nomoresecrete@/demo", + help="Custom URI describing the swift connection."), + cfg.StrOpt('insecure', default=False, help="Don't check SSL certificate"), +) + + +MESSAGE_SWIFT_GROUP = 'drivers:message_store:swift' + + +def _config_options(): + return [(MESSAGE_SWIFT_GROUP, MESSAGE_SWIFT_OPTIONS), ] diff --git a/zaqar/storage/swift/queues.py b/zaqar/storage/swift/queues.py new file mode 100644 index 000000000..e8bbb2c95 --- /dev/null +++ b/zaqar/storage/swift/queues.py @@ -0,0 +1,124 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import functools + +from oslo_serialization import jsonutils +import swiftclient + +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.swift import utils + + +class QueueController(storage.Queue): + """Implements queue resource operations with swift backend. + + Queues are scoped by project. + + queue -> Swift mapping: + +----------------+---------------------------------------+ + | Attribute | Storage location | + +----------------+---------------------------------------+ + | Queue Name | Object name | + +----------------+---------------------------------------+ + | Project name | Container name prefix | + +----------------+---------------------------------------+ + | Created time | Object Creation Time | + +----------------+---------------------------------------+ + | Queue metadata | Object content | + +----------------+---------------------------------------+ + """ + + def __init__(self, *args, **kwargs): + super(QueueController, self).__init__(*args, **kwargs) + self._client = self.driver.connection + + def _list(self, project=None, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + container = utils._queue_container(project) + try: + _, objects = self._client.get_container(container, + limit=limit, + marker=marker) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + self._client.put_container(container) + objects = [] + else: + raise + marker_next = {} + yield utils.QueueListCursor( + objects, detailed, marker_next, + functools.partial(self._client.get_object, container)) + yield marker_next and marker_next['next'] + + def _get(self, name, project=None): + try: + return self.get_metadata(name, project) + except errors.QueueDoesNotExist: + return {} + + def get_metadata(self, name, project=None): + container = utils._queue_container(project) + try: + _, metadata = self._client.get_object(container, name) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.QueueDoesNotExist(name, project) + else: + raise + return jsonutils.loads(metadata) or {} + + def set_metadata(self, name, metadata, project=None): + self._create(name, metadata, project) + + def _create(self, name, metadata=None, project=None): + try: + utils._put_or_create_container( + self._client, utils._queue_container(project), name, + content_type='application/json', + contents=jsonutils.dumps(metadata), + headers={'if-none-match': '*'}) + except swiftclient.ClientException as exc: + if exc.http_status == 412: + if metadata: + # Enforce metadata setting regardless + utils._put_or_create_container( + self._client, utils._queue_container(project), name, + content_type='application/json', + contents=jsonutils.dumps(metadata)) + return False + raise + else: + return True + + def _delete(self, name, project=None): + try: + self._client.delete_object(utils._queue_container(project), name) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + + def _stats(self, name, project=None): + pass + + def _exists(self, name, project=None): + try: + return self._client.head_object(utils._queue_container(project), + name) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + return False + else: + return True diff --git a/zaqar/storage/swift/subscriptions.py b/zaqar/storage/swift/subscriptions.py new file mode 100644 index 000000000..cbc73d238 --- /dev/null +++ b/zaqar/storage/swift/subscriptions.py @@ -0,0 +1,159 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import functools + +from oslo_serialization import jsonutils +from oslo_utils import uuidutils +import swiftclient +import urllib + +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.swift import utils + + +class SubscriptionController(storage.Subscription): + """Implements subscription resource operations with swift backend. + + Subscriptions are scoped by queue and project. + + subscription -> Swift mapping: + +----------------+---------------------------------------+ + | Attribute | Storage location | + +----------------+---------------------------------------+ + | Sub UUID | Object name | + +----------------+---------------------------------------+ + | Queue Name | Container name prefix | + +----------------+---------------------------------------+ + | Project name | Container name prefix | + +----------------+---------------------------------------+ + | Created time | Object Creation Time | + +----------------+---------------------------------------+ + | Sub options | Object content | + +----------------+---------------------------------------+ + """ + + def __init__(self, *args, **kwargs): + super(SubscriptionController, self).__init__(*args, **kwargs) + self._client = self.driver.connection + + def list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE): + container = utils._subscription_container(queue, project) + try: + _, objects = self._client.get_container(container, + limit=limit, + marker=marker) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + objects = [] + else: + raise + marker_next = {} + yield utils.SubscriptionListCursor( + objects, marker_next, + functools.partial(self._client.get_object, container)) + yield marker_next and marker_next['next'] + + def get(self, queue, subscription_id, project=None): + container = utils._subscription_container(queue, project) + try: + headers, data = self._client.get_object(container, subscription_id) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.SubscriptionDoesNotExist(subscription_id) + raise + return utils._subscription_to_json(data, headers) + + def create(self, queue, subscriber, ttl, options, project=None): + sub_container = utils._subscriber_container(queue, project) + slug = uuidutils.generate_uuid() + try: + utils._put_or_create_container( + self._client, + sub_container, + urllib.quote_plus(subscriber), + contents=slug, + headers={'x-delete-after': ttl, 'if-none-match': '*'}) + except swiftclient.ClientException as exc: + if exc.http_status == 412: + return + raise + container = utils._subscription_container(queue, project) + data = {'id': slug, + 'source': queue, + 'subscriber': subscriber, + 'options': options, + 'ttl': ttl, + 'confirmed': False} + utils._put_or_create_container( + self._client, container, slug, contents=jsonutils.dumps(data), + headers={'x-delete-after': ttl}) + return slug + + def update(self, queue, subscription_id, project=None, **kwargs): + container = utils._subscription_container(queue, project) + data = self.get(queue, subscription_id, project) + data.pop('age') + ttl = data['ttl'] + if 'subscriber' in kwargs: + sub_container = utils._subscriber_container(queue, project) + try: + self._client.put_object( + sub_container, + urllib.quote_plus(kwargs['subscriber']), + contents=subscription_id, + headers={'x-delete-after': ttl, 'if-none-match': '*'}) + except swiftclient.ClientException as exc: + if exc.http_status == 412: + raise errors.SubscriptionAlreadyExists() + raise + self._client.delete_object(sub_container, + urllib.quote_plus(data['subscriber'])) + data.update(kwargs) + self._client.put_object(container, + subscription_id, + contents=jsonutils.dumps(data), + headers={'x-delete-after': ttl}) + + def exists(self, queue, subscription_id, project=None): + container = utils._subscription_container(queue, project) + return self._client.head_object(container, subscription_id) + + def delete(self, queue, subscription_id, project=None): + try: + data = self.get(queue, subscription_id, project) + except errors.SubscriptionDoesNotExist: + return + sub_container = utils._subscriber_container(queue, project) + try: + self._client.delete_object(sub_container, + urllib.quote_plus(data['subscriber'])) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + container = utils._subscription_container(queue, project) + try: + self._client.delete_object(container, subscription_id) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + + def get_with_subscriber(self, queue, subscriber, project=None): + sub_container = utils._subscriber_container(queue, project) + headers, obj = self._client.get_object(sub_container, + urllib.quote_plus(subscriber)) + return self.get(queue, obj, project) + + def confirm(self, queue, subscription_id, project=None, confirmed=True): + self.update(queue, subscription_id, project, confirmed=True) diff --git a/zaqar/storage/swift/utils.py b/zaqar/storage/swift/utils.py new file mode 100644 index 000000000..1f93ed6ef --- /dev/null +++ b/zaqar/storage/swift/utils.py @@ -0,0 +1,166 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_serialization import jsonutils +from oslo_utils import timeutils +import swiftclient + + +def _message_container(queue, project=None): + return "zaqar_message:%s:%s" % (queue, project) + + +def _claim_container(queue=None, project=None): + return "zaqar_claim:%s:%s" % (queue, project) + + +def _queue_container(project=None): + return "zaqar_queue:%s" % (project,) + + +def _subscription_container(queue, project=None): + return "zaqar_subscription:%s:%s" % (queue, project) + + +def _subscriber_container(queue, project=None): + return "zaqar_subscriber:%s:%s" % (queue, project) + + +def _put_or_create_container(client, *args, **kwargs): + """PUT a swift object to a container that may not exist + + Takes the exact arguments of swiftclient.put_object but will + autocreate a container that doesn't exist + """ + try: + client.put_object(*args, **kwargs) + except swiftclient.ClientException as e: + if e.http_status == 404: + client.put_container(args[0]) + client.put_object(*args, **kwargs) + else: + raise + + +def _message_to_json(message_id, msg, headers, now): + msg = jsonutils.loads(msg) + + return { + 'id': message_id, + 'age': now - float(headers['x-timestamp']), + 'ttl': msg['ttl'], + 'body': msg['body'], + 'claim_id': msg['claim_id'] + } + + +def _subscription_to_json(sub, headers): + sub = jsonutils.loads(sub) + now = timeutils.utcnow_ts(True) + return {'id': sub['id'], + 'age': now - float(headers['x-timestamp']), + 'source': sub['source'], + 'subscriber': sub['subscriber'], + 'ttl': sub['ttl'], + 'options': sub['options'], + 'confirmed': sub['confirmed']} + + +def _filter_messages(messages, filters, marker, get_object, list_objects, + limit): + """Create a filtering iterator over a list of messages. + + The function accepts a list of filters to be filtered + before the the message can be included as a part of the reply. + """ + now = timeutils.utcnow_ts(True) + + for msg in messages: + if msg is None: + continue + + marker['next'] = msg['name'] + try: + headers, obj = get_object(msg['name']) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + continue + raise + obj = jsonutils.loads(obj) + for should_skip in filters: + if should_skip(obj, headers): + break + else: + limit -= 1 + yield { + 'id': marker['next'], + 'ttl': obj['ttl'], + 'client_uuid': headers['x-object-meta-clientid'], + 'body': obj['body'], + 'age': now - float(headers['x-timestamp']), + 'claim_id': obj['claim_id'], + } + if limit <= 0: + break + if limit > 0 and marker: + # We haven't reached the limit, let's try to get some more messages + _, objects = list_objects(marker=marker['next']) + if not objects: + return + for msg in _filter_messages(objects, filters, marker, get_object, + list_objects, limit): + yield msg + + +class QueueListCursor(object): + + def __init__(self, objects, detailed, marker_next, get_object): + self.objects = iter(objects) + self.detailed = detailed + self.marker_next = marker_next + self.get_object = get_object + + def __iter__(self): + return self + + def next(self): + curr = next(self.objects) + self.marker_next['next'] = curr['name'] + queue = {'name': curr['name']} + if self.detailed: + _, metadata = self.get_object(curr['name']) + queue['metadata'] = metadata + return queue + + def __next__(self): + return self.next() + + +class SubscriptionListCursor(object): + + def __init__(self, objects, marker_next, get_object): + self.objects = iter(objects) + self.marker_next = marker_next + self.get_object = get_object + + def __iter__(self): + return self + + def next(self): + curr = next(self.objects) + self.marker_next['next'] = curr['name'] + headers, sub = self.get_object(curr['name']) + return _subscription_to_json(sub, headers) + + def __next__(self): + return self.next() diff --git a/zaqar/tests/__init__.py b/zaqar/tests/__init__.py index 07ed21229..4f7d71804 100644 --- a/zaqar/tests/__init__.py +++ b/zaqar/tests/__init__.py @@ -26,4 +26,5 @@ expect = helpers.expect is_slow = helpers.is_slow requires_mongodb = helpers.requires_mongodb requires_redis = helpers.requires_redis +requires_swift = helpers.requires_swift TestBase = base.TestBase diff --git a/zaqar/tests/etc/wsgi_swift.conf b/zaqar/tests/etc/wsgi_swift.conf new file mode 100644 index 000000000..173806a75 --- /dev/null +++ b/zaqar/tests/etc/wsgi_swift.conf @@ -0,0 +1,11 @@ +[DEFAULT] +debug = False +verbose = False +enable_deprecated_api_versions = 1,1.1 + +[drivers] +transport = wsgi +message_store = swift + +[drivers:transport:wsgi] +port = 8888 diff --git a/zaqar/tests/helpers.py b/zaqar/tests/helpers.py index 15bb03998..647e409b9 100644 --- a/zaqar/tests/helpers.py +++ b/zaqar/tests/helpers.py @@ -33,6 +33,7 @@ def _test_variable_set(variable): SKIP_SLOW_TESTS = _test_variable_set('ZAQAR_TEST_SLOW') SKIP_MONGODB_TESTS = _test_variable_set('ZAQAR_TEST_MONGODB') SKIP_REDIS_TESTS = _test_variable_set('ZAQAR_TEST_REDIS') +SKIP_SWIFT_TESTS = _test_variable_set('ZAQAR_TEST_SWIFT') @contextlib.contextmanager @@ -232,6 +233,21 @@ def requires_redis(test_case): return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case) +def requires_swift(test_case): + """Decorator to flag a test case as being dependent on Swift. + + Redis-specific tests will be skipped unless the ZAQAR_TEST_SWIFT + environment variable is set. If the variable is set, the tests will + assume that Swift is accessible and configured properly. + """ + + reason = ('Skipping tests that require Swift. Ensure Swift is running ' + 'and then set ZAQAR_TEST_SWIFT in order to enable tests ' + 'that are specific to this storage backend. ') + + return testtools.skipIf(SKIP_SWIFT_TESTS, reason)(test_case) + + def is_slow(condition=lambda self: True): """Decorator to flag slow tests. diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 422535d62..badafc377 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -16,6 +16,7 @@ import collections import datetime +import math import random import time import uuid @@ -568,14 +569,14 @@ class MessageControllerTest(ControllerBaseTest): # perhaps the claim expired before it got around to # trying to delete the message, which means another # worker could be processing this message now. - with testing.expect(errors.NotPermitted): + with testing.expect(errors.NotPermitted, errors.ClaimDoesNotExist): self.controller.delete(self.queue_name, msg2['id'], project=self.project, claim=cid) @testing.is_slow(condition=lambda self: self.gc_interval > 1) def test_expired_messages(self): - messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}] + messages = [{'body': 3.14, 'ttl': 1}, {'body': 0.618, 'ttl': 600}] client_uuid = uuid.uuid4() [msgid_expired, msgid] = self.controller.post(self.queue_name, @@ -599,11 +600,6 @@ class MessageControllerTest(ControllerBaseTest): else: self.fail("Didn't remove the queue") - stats = self.queue_controller.stats(self.queue_name, - project=self.project) - - self.assertEqual(1, stats['messages']['free']) - # Make sure expired messages not return when listing interaction = self.controller.list(self.queue_name, project=self.project) @@ -612,6 +608,10 @@ class MessageControllerTest(ControllerBaseTest): self.assertEqual(1, len(messages)) self.assertEqual(msgid, messages[0]['id']) + stats = self.queue_controller.stats(self.queue_name, + project=self.project) + self.assertEqual(1, stats['messages']['free']) + # Make sure expired messages not return when popping messages = self.controller.pop(self.queue_name, limit=10, @@ -950,10 +950,11 @@ class ClaimControllerTest(ControllerBaseTest): self.assertEqual(120, message['ttl']) def test_expired_claim(self): - meta = {'ttl': 0, 'grace': 60} + meta = {'ttl': 1, 'grace': 60} claim_id, messages = self.controller.create(self.queue_name, meta, project=self.project) + time.sleep(1) with testing.expect(errors.DoesNotExist): self.controller.get(self.queue_name, claim_id, @@ -1063,7 +1064,7 @@ class SubscriptionControllerTest(ControllerBaseTest): 'source' in s and 'subscriber' in s, subscriptions))) self.assertEqual(10, len(subscriptions)) - self.assertLessEqual(added_age, subscriptions[2]['age']) + self.assertLessEqual(added_age, math.ceil(subscriptions[2]['age'])) interaction = (self.subscription_controller.list(self.source, project=self.project, @@ -1127,7 +1128,7 @@ class SubscriptionControllerTest(ControllerBaseTest): self.assertEqual(self.subscriber, subscription['subscriber']) self.assertEqual(self.ttl, subscription['ttl']) self.assertEqual(self.options, subscription['options']) - self.assertLessEqual(added_age, subscription['age']) + self.assertLessEqual(added_age, math.ceil(subscription['age'])) exist = self.subscription_controller.exists(self.queue_name, s_id, @@ -1218,8 +1219,8 @@ class SubscriptionControllerTest(ControllerBaseTest): except Exception: self.fail("Subscription controller should not raise an exception " "in case of non-existing queue.") - self.addCleanup(self.subscription_controller.delete, self.source, s_id, - self.project) + self.addCleanup(self.subscription_controller.delete, 'fake_queue_name', + s_id, self.project) @ddt.data(True, False) def test_update_raises_if_try_to_update_to_existing_subscription( diff --git a/zaqar/tests/unit/storage/test_impl_swift.py b/zaqar/tests/unit/storage/test_impl_swift.py new file mode 100644 index 000000000..07f0f0f78 --- /dev/null +++ b/zaqar/tests/unit/storage/test_impl_swift.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from zaqar.storage import mongodb +from zaqar.storage.swift import controllers +from zaqar.storage.swift import driver +from zaqar import tests as testing +from zaqar.tests.unit.storage import base + + +@testing.requires_swift +class SwiftQueuesTest(base.QueueControllerTest): + + driver_class = driver.DataDriver + config_file = 'wsgi_swift.conf' + controller_class = controllers.QueueController + control_driver_class = mongodb.ControlDriver + + +@testing.requires_swift +class SwiftMessagesTest(base.MessageControllerTest): + driver_class = driver.DataDriver + config_file = 'wsgi_swift.conf' + controller_class = controllers.MessageController + control_driver_class = mongodb.ControlDriver + gc_interval = 1 + + +@testing.requires_swift +class SwiftClaimsTest(base.ClaimControllerTest): + driver_class = driver.DataDriver + config_file = 'wsgi_swift.conf' + controller_class = controllers.ClaimController + control_driver_class = mongodb.ControlDriver + + +@testing.requires_swift +class SwiftSubscriptionsTest(base.SubscriptionControllerTest): + driver_class = driver.DataDriver + config_file = 'wsgi_swift.conf' + controller_class = controllers.SubscriptionController + control_driver_class = mongodb.ControlDriver