Swift storage

Implement storing of queues, messages, subscriptions in Swift.

Change-Id: I94e158b35b069ea1fbf8dde17c89ff9b9c758ab4
This commit is contained in:
Ryan S. Brown 2015-05-31 10:32:15 -04:00 committed by Thomas Herve
parent 12091072a3
commit 40083a720c
18 changed files with 1233 additions and 12 deletions

View File

@ -133,6 +133,15 @@ function configure_zaqar {
iniset $ZAQAR_CONF 'drivers:message_store:redis' uri redis://localhost:6379
iniset $ZAQAR_CONF 'drivers:message_store:redis' database zaqar
configure_redis
elif [ "$ZAQAR_BACKEND" = 'swift' ] ; then
recreate_database zaqar
iniset $ZAQAR_CONF drivers management_store sqlalchemy
iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' uri `database_connection_url zaqar`
iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' database zaqar_mgmt
iniset $ZAQAR_CONF drivers message_store swift
iniset $ZAQAR_CONF 'drivers:message_store:swift' auth_url $KEYSTONE_AUTH_URI_V3
iniset $ZAQAR_CONF 'drivers:message_store:swift' uri swift://zaqar:$SERVICE_PASSWORD@/service
fi
if is_service_enabled qpid || [ -n "$RABBIT_HOST" ] && [ -n "$RABBIT_PASSWORD" ]; then
@ -265,6 +274,9 @@ function create_zaqar_accounts {
"$ZAQAR_SERVICE_PROTOCOL://$ZAQAR_SERVICE_HOST:$ZAQAR_WEBSOCKET_PORT"
fi
if [ "$ZAQAR_BACKEND" = 'swift' ] ; then
get_or_add_user_project_role ResellerAdmin zaqar service
fi
}
if is_service_enabled zaqar-websocket || is_service_enabled zaqar-wsgi; then

View File

@ -8,6 +8,7 @@ namespace = zaqar.storage.pooling
namespace = zaqar.storage.mongodb
namespace = zaqar.storage.redis
namespace = zaqar.storage.sqlalchemy
namespace = zaqar.storage.swift
namespace = zaqar.transport.wsgi
namespace = zaqar.transport.base
namespace = zaqar.transport.validation

View File

@ -38,6 +38,7 @@ zaqar.data.storage =
mongodb = zaqar.storage.mongodb.driver:DataDriver
mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver
redis = zaqar.storage.redis.driver:DataDriver
swift = zaqar.storage.swift.driver:DataDriver
faulty = zaqar.tests.faulty_storage:DataDriver
zaqar.control.storage =
@ -58,6 +59,7 @@ oslo.config.opts =
zaqar.storage.mongodb = zaqar.storage.mongodb.options:_config_options
zaqar.storage.redis = zaqar.storage.redis.options:_config_options
zaqar.storage.sqlalchemy = zaqar.storage.sqlalchemy.options:_config_options
zaqar.storage.swift = zaqar.storage.swift.options:_config_options
zaqar.transport.wsgi = zaqar.transport.wsgi.driver:_config_options
zaqar.transport.websocket = zaqar.transport.websocket.driver:_config_options
zaqar.transport.base = zaqar.transport.base:_config_options
@ -72,6 +74,9 @@ zaqar.storage.mongodb.driver.queue.stages =
zaqar.storage.redis.driver.queue.stages =
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
zaqar.storage.swift.driver.queue.stages =
message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler
zaqar.notification.tasks =
http = zaqar.notification.tasks.webhook:WebhookTask
https = zaqar.notification.tasks.webhook:WebhookTask

View File

@ -10,6 +10,7 @@ mock>=2.0 # BSD
# Backends
redis>=2.10.0 # MIT
pymongo!=3.1,>=3.0.2 # Apache-2.0
python-swiftclient>=3.2.0 # Apache-2.0
websocket-client>=0.32.0 # LGPLv2+
# Unit testing

View File

View File

@ -0,0 +1,188 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import math
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import swiftclient
from zaqar.common import decorators
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.swift import utils
class ClaimController(storage.Claim):
"""Implements claims resource operations with swift backend
Claims are scoped by project + queue.
"""
def __init__(self, *args, **kwargs):
super(ClaimController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
def _exists(self, queue, claim_id, project=None):
try:
return self._client.head_object(
utils._claim_container(queue, project),
claim_id)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
raise
def _get(self, queue, claim_id, project=None):
try:
container = utils._claim_container(queue, project)
headers, claim = self._client.get_object(container, claim_id)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
return
now = timeutils.utcnow_ts(True)
return {
'id': claim_id,
'age': now - float(headers['x-timestamp']),
'ttl': int(headers['x-delete-at']) - math.floor(now),
}
def get(self, queue, claim_id, project=None):
now = timeutils.utcnow_ts(True)
self._exists(queue, claim_id, project)
container = utils._claim_container(queue, project)
headers, claim_obj = self._client.get_object(container, claim_id)
def g():
for msg_id in jsonutils.loads(claim_obj):
try:
headers, msg = self._message_ctrl._find_message(
queue, msg_id, project)
except errors.MessageDoesNotExist:
continue
else:
yield utils._message_to_json(msg_id, msg, headers, now)
claim_meta = {
'id': claim_id,
'age': now - float(headers['x-timestamp']),
'ttl': int(headers['x-delete-at']) - math.floor(now),
}
return claim_meta, g()
def create(self, queue, metadata, project=None,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
ttl = metadata['ttl']
grace = metadata['grace']
msg_ts = ttl + grace
claim_id = uuidutils.generate_uuid()
messages, marker = self._message_ctrl._list(
queue, project, limit=limit, include_claimed=False)
claimed = []
for msg in messages:
md5 = hashlib.md5()
md5.update(
jsonutils.dumps(
{'body': msg['body'], 'claim_id': None,
'ttl': msg['ttl']}))
md5 = md5.hexdigest()
msg_ttl = max(msg['ttl'], msg_ts)
content = jsonutils.dumps(
{'body': msg['body'], 'claim_id': claim_id, 'ttl': msg_ttl})
try:
self._client.put_object(
utils._message_container(queue, project),
msg['id'],
content,
headers={'x-object-meta-clientid': msg['client_uuid'],
'if-match': md5,
'x-object-meta-claimid': claim_id,
'x-delete-after': msg_ttl})
except swiftclient.ClientException as exc:
if exc.http_status == 412:
continue
raise
else:
msg['claim_id'] = claim_id
msg['ttl'] = msg_ttl
claimed.append(msg)
utils._put_or_create_container(
self._client,
utils._claim_container(queue, project),
claim_id,
jsonutils.dumps([msg['id'] for msg in claimed]),
headers={'x-delete-after': ttl}
)
return claim_id, claimed
def update(self, queue, claim_id, metadata, project=None):
container = utils._claim_container(queue, project)
try:
headers, obj = self._client.get_object(container, claim_id)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
raise
self._client.put_object(container, claim_id, obj,
headers={'x-delete-after': metadata['ttl']})
def delete(self, queue, claim_id, project=None):
try:
header, obj = self._client.get_object(
utils._claim_container(queue, project),
claim_id)
for msg_id in jsonutils.loads(obj):
try:
headers, msg = self._message_ctrl._find_message(
queue, msg_id, project)
except errors.MessageDoesNotExist:
continue
md5 = hashlib.md5()
md5.update(msg)
md5 = md5.hexdigest()
msg = jsonutils.loads(msg)
content = jsonutils.dumps(
{'body': msg['body'], 'claim_id': None, 'ttl': msg['ttl']})
client_id = headers['x-object-meta-clientid']
self._client.put_object(
utils._message_container(queue, project),
msg_id,
content,
headers={'x-object-meta-clientid': client_id,
'if-match': md5,
'x-delete-at': headers['x-delete-at']})
self._client.delete_object(
utils._claim_container(queue, project),
claim_id)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise

View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.storage.swift import claims
from zaqar.storage.swift import messages
from zaqar.storage.swift import queues
from zaqar.storage.swift import subscriptions
MessageController = messages.MessageController
QueueController = queues.QueueController
ClaimController = claims.ClaimController
SubscriptionController = subscriptions.SubscriptionController

View File

@ -0,0 +1,78 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves import urllib
from oslo_log import log as logging
import swiftclient
from zaqar.common import decorators
from zaqar import storage
from zaqar.storage.swift import controllers
from zaqar.storage.swift import options
LOG = logging.getLogger(__name__)
class DataDriver(storage.DataDriverBase):
_DRIVER_OPTIONS = options._config_options()
def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache, control_driver)
self.swift_conf = self.conf[options.MESSAGE_SWIFT_GROUP]
@property
def capabilities(self):
return (
storage.Capabilities.AOD,
storage.Capabilities.DURABILITY,
)
@decorators.lazy_property(write=False)
def connection(self):
return _get_swift_client(self)
def is_alive(self):
return True
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
return controllers.SubscriptionController(self)
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
def _health(self):
raise NotImplementedError("No health checks")
def close(self):
pass
def _get_swift_client(driver):
conf = driver.swift_conf
parsed_url = urllib.parse.urlparse(conf.uri)
return swiftclient.Connection(conf.auth_url, parsed_url.username,
parsed_url.password,
insecure=conf.insecure, auth_version="3",
tenant_name=parsed_url.path[1:])

View File

@ -0,0 +1,352 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import uuid
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import swiftclient
from zaqar.common import decorators
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.swift import utils
class MessageController(storage.Message):
"""Implements message resource operations with swift backend
Messages are scoped by project + queue.
message -> Swift mapping:
+--------------+-----------------------------------------+
| Attribute | Storage location |
+--------------+-----------------------------------------+
| Msg UUID | Object name |
+--------------+-----------------------------------------+
| Queue Name | Container name prefix |
+--------------+-----------------------------------------+
| Project name | Container name prefix |
+--------------+-----------------------------------------+
| Created time | Object Creation Time |
+--------------+-----------------------------------------+
| Msg Body | Object content 'body' |
+--------------+-----------------------------------------+
| Client ID | Object header 'ClientID' |
+--------------+-----------------------------------------+
| Claim ID | Object content 'claim_id' |
+--------------+-----------------------------------------+
| Expires | Object Delete-After header |
+--------------------------------------------------------+
"""
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
@decorators.lazy_property(write=False)
def _claim_ctrl(self):
return self.driver.claim_controller
def _delete_queue_messages(self, queue, project, pipe):
"""Method to remove all the messages belonging to a queue.
Will be referenced from the QueueController.
The pipe to execute deletion will be passed from the QueueController
executing the operation.
"""
container = utils._message_container(queue, project)
remaining = True
key = ''
while remaining:
headers, objects = self._client.get_container(container,
limit=1000,
marker=key)
if not objects:
return
remaining = len(objects) == 1000
key = objects[-1]['name']
for o in objects:
try:
self._client.delete_object(container, o['name'])
except swiftclient.ClientException as exc:
if exc.http_status == 404:
continue
raise
def _list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None,
include_claimed=False, sort=1):
"""List messages in the queue, oldest first(ish)
Time ordering and message inclusion in lists are soft, there is no
global order and times are based on the UTC time of the zaqar-api
server that the message was created from.
Here be consistency dragons.
"""
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
client = self._client
container = utils._message_container(queue, project)
query_string = None
if sort == -1:
query_string = 'reverse=on'
try:
_, objects = client.get_container(
container,
marker=marker,
# list 2x the objects because some listing items may have
# expired
limit=limit * 2,
query_string=query_string)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.QueueDoesNotExist(queue, project)
raise
def is_claimed(msg, headers):
if include_claimed:
return False
return msg['claim_id'] is not None
def is_echo(msg, headers):
if echo:
return False
return headers['x-object-meta-clientid'] == str(client_uuid)
filters = [
is_echo,
is_claimed,
]
marker = {}
get_object = functools.partial(client.get_object, container)
list_objects = functools.partial(client.get_container, container,
limit=limit * 2,
query_string=query_string)
yield utils._filter_messages(objects, filters, marker, get_object,
list_objects, limit=limit)
yield marker and marker['next']
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None,
include_claimed=False):
return self._list(queue, project, marker, limit, echo,
client_uuid, include_claimed)
def first(self, queue, project=None, sort=1):
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
cursor = self._list(queue, project, limit=1, sort=sort)
try:
message = next(next(cursor))
except StopIteration:
raise errors.QueueIsEmpty(queue, project)
return message
def get(self, queue, message_id, project=None):
return self._get(queue, message_id, project)
def _get(self, queue, message_id, project=None, check_queue=True):
if check_queue and not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
now = timeutils.utcnow_ts(True)
headers, msg = self._find_message(queue, message_id, project)
return utils._message_to_json(message_id, msg, headers, now)
def _find_message(self, queue, message_id, project):
try:
return self._client.get_object(
utils._message_container(queue, project), message_id)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.MessageDoesNotExist(message_id, queue, project)
else:
raise
def bulk_delete(self, queue, message_ids, project=None):
for id in message_ids:
try:
self._delete(queue, id, project)
except errors.MessageDoesNotExist:
pass
def bulk_get(self, queue, message_ids, project=None):
if not self._queue_ctrl.exists(queue, project):
raise StopIteration()
for id in message_ids:
try:
yield self._get(queue, id, project, check_queue=False)
except errors.MessageDoesNotExist:
pass
def post(self, queue, messages, client_uuid, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
return [self._create_msg(queue, m, client_uuid, project)
for m in messages]
def _create_msg(self, queue, msg, client_uuid, project):
slug = str(uuid.uuid1())
contents = jsonutils.dumps(
{'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl']})
try:
self._client.put_object(
utils._message_container(queue, project),
slug,
contents=contents,
headers={
'x-object-meta-clientid': str(client_uuid),
'x-delete-after': msg['ttl']})
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.QueueDoesNotExist(queue, project)
raise
return slug
def delete(self, queue, message_id, project=None, claim=None):
try:
msg = self._get(queue, message_id, project)
except (errors.QueueDoesNotExist, errors.MessageDoesNotExist):
return
if claim is None:
if msg['claim_id']:
claim_obj = self._claim_ctrl._get(queue, msg['claim_id'],
project)
if claim_obj is not None and claim_obj['ttl'] > 0:
raise errors.MessageIsClaimed(message_id)
else:
# Check if the claim does exist
self._claim_ctrl._exists(queue, claim, project)
if not msg['claim_id']:
raise errors.MessageNotClaimed(message_id)
elif msg['claim_id'] != claim:
raise errors.MessageNotClaimedBy(message_id, claim)
self._delete(queue, message_id, project)
def _delete(self, queue, message_id, project=None):
try:
self._client.delete_object(
utils._message_container(queue, project), message_id)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
def pop(self, queue, limit, project=None):
# Pop is implemented as a chain of the following operations:
# 1. Create a claim.
# 2. Delete the messages claimed.
# 3. Delete the claim.
claim_id, messages = self._claim_ctrl.create(
queue, dict(ttl=1, grace=0), project, limit=limit)
message_ids = [message['id'] for message in messages]
self.bulk_delete(queue, message_ids, project)
return messages
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._client = self.driver.connection
self._queue_ctrl = self.driver.queue_controller
self._message_ctrl = self.driver.message_controller
self._claim_ctrl = self.driver.claim_controller
def create(self, name, metadata=None, project=None):
self._client.put_container(utils._message_container(name, project))
def delete(self, name, project=None):
for container in [utils._message_container(name, project),
utils._claim_container(name, project)]:
try:
headers, objects = self._client.get_container(container)
for obj in objects:
try:
self._client.delete_object(container, obj['name'])
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
self._client.delete_container(container)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
def stats(self, name, project=None):
if not self._queue_ctrl.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = 0
claimed = 0
container = utils._message_container(name, project)
_, objects = self._client.get_container(container)
newest = None
oldest = None
now = timeutils.utcnow_ts(True)
for obj in objects:
try:
headers = self._client.head_object(container, obj['name'])
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
else:
created = float(headers['x-timestamp'])
newest = {
'id': obj['name'],
'age': now - created,
'created': timeutils.iso8601_from_timestamp(created)}
if oldest is None:
oldest = newest
total += 1
if headers.get('x-object-meta-claimid'):
claimed += 1
msg_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
}
if newest is not None:
msg_stats['newest'] = newest
msg_stats['oldest'] = oldest
return {'messages': msg_stats}
def exists(self, queue, project=None):
try:
self._client.head_container(utils._message_container(queue,
project))
except swiftclient.ClientException as exc:
if exc.http_status == 404:
return False
raise
else:
return True

View File

@ -0,0 +1,31 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Swift storage driver configuration options."""
from oslo_config import cfg
MESSAGE_SWIFT_OPTIONS = (
cfg.StrOpt('auth_url', default="http://127.0.0.1:5000/v3/",
help="URI of Keystone endpoint to discover Swift"),
cfg.StrOpt('uri',
default="swift://demo:nomoresecrete@/demo",
help="Custom URI describing the swift connection."),
cfg.StrOpt('insecure', default=False, help="Don't check SSL certificate"),
)
MESSAGE_SWIFT_GROUP = 'drivers:message_store:swift'
def _config_options():
return [(MESSAGE_SWIFT_GROUP, MESSAGE_SWIFT_OPTIONS), ]

View File

@ -0,0 +1,124 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import functools
from oslo_serialization import jsonutils
import swiftclient
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.swift import utils
class QueueController(storage.Queue):
"""Implements queue resource operations with swift backend.
Queues are scoped by project.
queue -> Swift mapping:
+----------------+---------------------------------------+
| Attribute | Storage location |
+----------------+---------------------------------------+
| Queue Name | Object name |
+----------------+---------------------------------------+
| Project name | Container name prefix |
+----------------+---------------------------------------+
| Created time | Object Creation Time |
+----------------+---------------------------------------+
| Queue metadata | Object content |
+----------------+---------------------------------------+
"""
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
def _list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
container = utils._queue_container(project)
try:
_, objects = self._client.get_container(container,
limit=limit,
marker=marker)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
self._client.put_container(container)
objects = []
else:
raise
marker_next = {}
yield utils.QueueListCursor(
objects, detailed, marker_next,
functools.partial(self._client.get_object, container))
yield marker_next and marker_next['next']
def _get(self, name, project=None):
try:
return self.get_metadata(name, project)
except errors.QueueDoesNotExist:
return {}
def get_metadata(self, name, project=None):
container = utils._queue_container(project)
try:
_, metadata = self._client.get_object(container, name)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.QueueDoesNotExist(name, project)
else:
raise
return jsonutils.loads(metadata) or {}
def set_metadata(self, name, metadata, project=None):
self._create(name, metadata, project)
def _create(self, name, metadata=None, project=None):
try:
utils._put_or_create_container(
self._client, utils._queue_container(project), name,
content_type='application/json',
contents=jsonutils.dumps(metadata),
headers={'if-none-match': '*'})
except swiftclient.ClientException as exc:
if exc.http_status == 412:
if metadata:
# Enforce metadata setting regardless
utils._put_or_create_container(
self._client, utils._queue_container(project), name,
content_type='application/json',
contents=jsonutils.dumps(metadata))
return False
raise
else:
return True
def _delete(self, name, project=None):
try:
self._client.delete_object(utils._queue_container(project), name)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
def _stats(self, name, project=None):
pass
def _exists(self, name, project=None):
try:
return self._client.head_object(utils._queue_container(project),
name)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
return False
else:
return True

View File

@ -0,0 +1,159 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import functools
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import swiftclient
import urllib
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.swift import utils
class SubscriptionController(storage.Subscription):
"""Implements subscription resource operations with swift backend.
Subscriptions are scoped by queue and project.
subscription -> Swift mapping:
+----------------+---------------------------------------+
| Attribute | Storage location |
+----------------+---------------------------------------+
| Sub UUID | Object name |
+----------------+---------------------------------------+
| Queue Name | Container name prefix |
+----------------+---------------------------------------+
| Project name | Container name prefix |
+----------------+---------------------------------------+
| Created time | Object Creation Time |
+----------------+---------------------------------------+
| Sub options | Object content |
+----------------+---------------------------------------+
"""
def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
container = utils._subscription_container(queue, project)
try:
_, objects = self._client.get_container(container,
limit=limit,
marker=marker)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
objects = []
else:
raise
marker_next = {}
yield utils.SubscriptionListCursor(
objects, marker_next,
functools.partial(self._client.get_object, container))
yield marker_next and marker_next['next']
def get(self, queue, subscription_id, project=None):
container = utils._subscription_container(queue, project)
try:
headers, data = self._client.get_object(container, subscription_id)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.SubscriptionDoesNotExist(subscription_id)
raise
return utils._subscription_to_json(data, headers)
def create(self, queue, subscriber, ttl, options, project=None):
sub_container = utils._subscriber_container(queue, project)
slug = uuidutils.generate_uuid()
try:
utils._put_or_create_container(
self._client,
sub_container,
urllib.quote_plus(subscriber),
contents=slug,
headers={'x-delete-after': ttl, 'if-none-match': '*'})
except swiftclient.ClientException as exc:
if exc.http_status == 412:
return
raise
container = utils._subscription_container(queue, project)
data = {'id': slug,
'source': queue,
'subscriber': subscriber,
'options': options,
'ttl': ttl,
'confirmed': False}
utils._put_or_create_container(
self._client, container, slug, contents=jsonutils.dumps(data),
headers={'x-delete-after': ttl})
return slug
def update(self, queue, subscription_id, project=None, **kwargs):
container = utils._subscription_container(queue, project)
data = self.get(queue, subscription_id, project)
data.pop('age')
ttl = data['ttl']
if 'subscriber' in kwargs:
sub_container = utils._subscriber_container(queue, project)
try:
self._client.put_object(
sub_container,
urllib.quote_plus(kwargs['subscriber']),
contents=subscription_id,
headers={'x-delete-after': ttl, 'if-none-match': '*'})
except swiftclient.ClientException as exc:
if exc.http_status == 412:
raise errors.SubscriptionAlreadyExists()
raise
self._client.delete_object(sub_container,
urllib.quote_plus(data['subscriber']))
data.update(kwargs)
self._client.put_object(container,
subscription_id,
contents=jsonutils.dumps(data),
headers={'x-delete-after': ttl})
def exists(self, queue, subscription_id, project=None):
container = utils._subscription_container(queue, project)
return self._client.head_object(container, subscription_id)
def delete(self, queue, subscription_id, project=None):
try:
data = self.get(queue, subscription_id, project)
except errors.SubscriptionDoesNotExist:
return
sub_container = utils._subscriber_container(queue, project)
try:
self._client.delete_object(sub_container,
urllib.quote_plus(data['subscriber']))
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
container = utils._subscription_container(queue, project)
try:
self._client.delete_object(container, subscription_id)
except swiftclient.ClientException as exc:
if exc.http_status != 404:
raise
def get_with_subscriber(self, queue, subscriber, project=None):
sub_container = utils._subscriber_container(queue, project)
headers, obj = self._client.get_object(sub_container,
urllib.quote_plus(subscriber))
return self.get(queue, obj, project)
def confirm(self, queue, subscription_id, project=None, confirmed=True):
self.update(queue, subscription_id, project, confirmed=True)

View File

@ -0,0 +1,166 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import swiftclient
def _message_container(queue, project=None):
return "zaqar_message:%s:%s" % (queue, project)
def _claim_container(queue=None, project=None):
return "zaqar_claim:%s:%s" % (queue, project)
def _queue_container(project=None):
return "zaqar_queue:%s" % (project,)
def _subscription_container(queue, project=None):
return "zaqar_subscription:%s:%s" % (queue, project)
def _subscriber_container(queue, project=None):
return "zaqar_subscriber:%s:%s" % (queue, project)
def _put_or_create_container(client, *args, **kwargs):
"""PUT a swift object to a container that may not exist
Takes the exact arguments of swiftclient.put_object but will
autocreate a container that doesn't exist
"""
try:
client.put_object(*args, **kwargs)
except swiftclient.ClientException as e:
if e.http_status == 404:
client.put_container(args[0])
client.put_object(*args, **kwargs)
else:
raise
def _message_to_json(message_id, msg, headers, now):
msg = jsonutils.loads(msg)
return {
'id': message_id,
'age': now - float(headers['x-timestamp']),
'ttl': msg['ttl'],
'body': msg['body'],
'claim_id': msg['claim_id']
}
def _subscription_to_json(sub, headers):
sub = jsonutils.loads(sub)
now = timeutils.utcnow_ts(True)
return {'id': sub['id'],
'age': now - float(headers['x-timestamp']),
'source': sub['source'],
'subscriber': sub['subscriber'],
'ttl': sub['ttl'],
'options': sub['options'],
'confirmed': sub['confirmed']}
def _filter_messages(messages, filters, marker, get_object, list_objects,
limit):
"""Create a filtering iterator over a list of messages.
The function accepts a list of filters to be filtered
before the the message can be included as a part of the reply.
"""
now = timeutils.utcnow_ts(True)
for msg in messages:
if msg is None:
continue
marker['next'] = msg['name']
try:
headers, obj = get_object(msg['name'])
except swiftclient.ClientException as exc:
if exc.http_status == 404:
continue
raise
obj = jsonutils.loads(obj)
for should_skip in filters:
if should_skip(obj, headers):
break
else:
limit -= 1
yield {
'id': marker['next'],
'ttl': obj['ttl'],
'client_uuid': headers['x-object-meta-clientid'],
'body': obj['body'],
'age': now - float(headers['x-timestamp']),
'claim_id': obj['claim_id'],
}
if limit <= 0:
break
if limit > 0 and marker:
# We haven't reached the limit, let's try to get some more messages
_, objects = list_objects(marker=marker['next'])
if not objects:
return
for msg in _filter_messages(objects, filters, marker, get_object,
list_objects, limit):
yield msg
class QueueListCursor(object):
def __init__(self, objects, detailed, marker_next, get_object):
self.objects = iter(objects)
self.detailed = detailed
self.marker_next = marker_next
self.get_object = get_object
def __iter__(self):
return self
def next(self):
curr = next(self.objects)
self.marker_next['next'] = curr['name']
queue = {'name': curr['name']}
if self.detailed:
_, metadata = self.get_object(curr['name'])
queue['metadata'] = metadata
return queue
def __next__(self):
return self.next()
class SubscriptionListCursor(object):
def __init__(self, objects, marker_next, get_object):
self.objects = iter(objects)
self.marker_next = marker_next
self.get_object = get_object
def __iter__(self):
return self
def next(self):
curr = next(self.objects)
self.marker_next['next'] = curr['name']
headers, sub = self.get_object(curr['name'])
return _subscription_to_json(sub, headers)
def __next__(self):
return self.next()

View File

@ -26,4 +26,5 @@ expect = helpers.expect
is_slow = helpers.is_slow
requires_mongodb = helpers.requires_mongodb
requires_redis = helpers.requires_redis
requires_swift = helpers.requires_swift
TestBase = base.TestBase

View File

@ -0,0 +1,11 @@
[DEFAULT]
debug = False
verbose = False
enable_deprecated_api_versions = 1,1.1
[drivers]
transport = wsgi
message_store = swift
[drivers:transport:wsgi]
port = 8888

View File

@ -33,6 +33,7 @@ def _test_variable_set(variable):
SKIP_SLOW_TESTS = _test_variable_set('ZAQAR_TEST_SLOW')
SKIP_MONGODB_TESTS = _test_variable_set('ZAQAR_TEST_MONGODB')
SKIP_REDIS_TESTS = _test_variable_set('ZAQAR_TEST_REDIS')
SKIP_SWIFT_TESTS = _test_variable_set('ZAQAR_TEST_SWIFT')
@contextlib.contextmanager
@ -232,6 +233,21 @@ def requires_redis(test_case):
return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
def requires_swift(test_case):
"""Decorator to flag a test case as being dependent on Swift.
Redis-specific tests will be skipped unless the ZAQAR_TEST_SWIFT
environment variable is set. If the variable is set, the tests will
assume that Swift is accessible and configured properly.
"""
reason = ('Skipping tests that require Swift. Ensure Swift is running '
'and then set ZAQAR_TEST_SWIFT in order to enable tests '
'that are specific to this storage backend. ')
return testtools.skipIf(SKIP_SWIFT_TESTS, reason)(test_case)
def is_slow(condition=lambda self: True):
"""Decorator to flag slow tests.

View File

@ -16,6 +16,7 @@
import collections
import datetime
import math
import random
import time
import uuid
@ -568,14 +569,14 @@ class MessageControllerTest(ControllerBaseTest):
# perhaps the claim expired before it got around to
# trying to delete the message, which means another
# worker could be processing this message now.
with testing.expect(errors.NotPermitted):
with testing.expect(errors.NotPermitted, errors.ClaimDoesNotExist):
self.controller.delete(self.queue_name, msg2['id'],
project=self.project,
claim=cid)
@testing.is_slow(condition=lambda self: self.gc_interval > 1)
def test_expired_messages(self):
messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}]
messages = [{'body': 3.14, 'ttl': 1}, {'body': 0.618, 'ttl': 600}]
client_uuid = uuid.uuid4()
[msgid_expired, msgid] = self.controller.post(self.queue_name,
@ -599,11 +600,6 @@ class MessageControllerTest(ControllerBaseTest):
else:
self.fail("Didn't remove the queue")
stats = self.queue_controller.stats(self.queue_name,
project=self.project)
self.assertEqual(1, stats['messages']['free'])
# Make sure expired messages not return when listing
interaction = self.controller.list(self.queue_name,
project=self.project)
@ -612,6 +608,10 @@ class MessageControllerTest(ControllerBaseTest):
self.assertEqual(1, len(messages))
self.assertEqual(msgid, messages[0]['id'])
stats = self.queue_controller.stats(self.queue_name,
project=self.project)
self.assertEqual(1, stats['messages']['free'])
# Make sure expired messages not return when popping
messages = self.controller.pop(self.queue_name,
limit=10,
@ -950,10 +950,11 @@ class ClaimControllerTest(ControllerBaseTest):
self.assertEqual(120, message['ttl'])
def test_expired_claim(self):
meta = {'ttl': 0, 'grace': 60}
meta = {'ttl': 1, 'grace': 60}
claim_id, messages = self.controller.create(self.queue_name, meta,
project=self.project)
time.sleep(1)
with testing.expect(errors.DoesNotExist):
self.controller.get(self.queue_name, claim_id,
@ -1063,7 +1064,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
'source' in s and 'subscriber' in s,
subscriptions)))
self.assertEqual(10, len(subscriptions))
self.assertLessEqual(added_age, subscriptions[2]['age'])
self.assertLessEqual(added_age, math.ceil(subscriptions[2]['age']))
interaction = (self.subscription_controller.list(self.source,
project=self.project,
@ -1127,7 +1128,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
self.assertEqual(self.subscriber, subscription['subscriber'])
self.assertEqual(self.ttl, subscription['ttl'])
self.assertEqual(self.options, subscription['options'])
self.assertLessEqual(added_age, subscription['age'])
self.assertLessEqual(added_age, math.ceil(subscription['age']))
exist = self.subscription_controller.exists(self.queue_name,
s_id,
@ -1218,8 +1219,8 @@ class SubscriptionControllerTest(ControllerBaseTest):
except Exception:
self.fail("Subscription controller should not raise an exception "
"in case of non-existing queue.")
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
self.project)
self.addCleanup(self.subscription_controller.delete, 'fake_queue_name',
s_id, self.project)
@ddt.data(True, False)
def test_update_raises_if_try_to_update_to_existing_subscription(

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.storage import mongodb
from zaqar.storage.swift import controllers
from zaqar.storage.swift import driver
from zaqar import tests as testing
from zaqar.tests.unit.storage import base
@testing.requires_swift
class SwiftQueuesTest(base.QueueControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_swift.conf'
controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
@testing.requires_swift
class SwiftMessagesTest(base.MessageControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_swift.conf'
controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
gc_interval = 1
@testing.requires_swift
class SwiftClaimsTest(base.ClaimControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_swift.conf'
controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
@testing.requires_swift
class SwiftSubscriptionsTest(base.SubscriptionControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_swift.conf'
controller_class = controllers.SubscriptionController
control_driver_class = mongodb.ControlDriver