Merge "Swift storage"
This commit is contained in:
commit
cdb3c044da
@ -133,6 +133,15 @@ function configure_zaqar {
|
|||||||
iniset $ZAQAR_CONF 'drivers:message_store:redis' uri redis://localhost:6379
|
iniset $ZAQAR_CONF 'drivers:message_store:redis' uri redis://localhost:6379
|
||||||
iniset $ZAQAR_CONF 'drivers:message_store:redis' database zaqar
|
iniset $ZAQAR_CONF 'drivers:message_store:redis' database zaqar
|
||||||
configure_redis
|
configure_redis
|
||||||
|
elif [ "$ZAQAR_BACKEND" = 'swift' ] ; then
|
||||||
|
recreate_database zaqar
|
||||||
|
iniset $ZAQAR_CONF drivers management_store sqlalchemy
|
||||||
|
iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' uri `database_connection_url zaqar`
|
||||||
|
iniset $ZAQAR_CONF 'drivers:management_store:sqlalchemy' database zaqar_mgmt
|
||||||
|
|
||||||
|
iniset $ZAQAR_CONF drivers message_store swift
|
||||||
|
iniset $ZAQAR_CONF 'drivers:message_store:swift' auth_url $KEYSTONE_AUTH_URI_V3
|
||||||
|
iniset $ZAQAR_CONF 'drivers:message_store:swift' uri swift://zaqar:$SERVICE_PASSWORD@/service
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if is_service_enabled qpid || [ -n "$RABBIT_HOST" ] && [ -n "$RABBIT_PASSWORD" ]; then
|
if is_service_enabled qpid || [ -n "$RABBIT_HOST" ] && [ -n "$RABBIT_PASSWORD" ]; then
|
||||||
@ -265,6 +274,9 @@ function create_zaqar_accounts {
|
|||||||
"$ZAQAR_SERVICE_PROTOCOL://$ZAQAR_SERVICE_HOST:$ZAQAR_WEBSOCKET_PORT"
|
"$ZAQAR_SERVICE_PROTOCOL://$ZAQAR_SERVICE_HOST:$ZAQAR_WEBSOCKET_PORT"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [ "$ZAQAR_BACKEND" = 'swift' ] ; then
|
||||||
|
get_or_add_user_project_role ResellerAdmin zaqar service
|
||||||
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
if is_service_enabled zaqar-websocket || is_service_enabled zaqar-wsgi; then
|
if is_service_enabled zaqar-websocket || is_service_enabled zaqar-wsgi; then
|
||||||
|
@ -8,6 +8,7 @@ namespace = zaqar.storage.pooling
|
|||||||
namespace = zaqar.storage.mongodb
|
namespace = zaqar.storage.mongodb
|
||||||
namespace = zaqar.storage.redis
|
namespace = zaqar.storage.redis
|
||||||
namespace = zaqar.storage.sqlalchemy
|
namespace = zaqar.storage.sqlalchemy
|
||||||
|
namespace = zaqar.storage.swift
|
||||||
namespace = zaqar.transport.wsgi
|
namespace = zaqar.transport.wsgi
|
||||||
namespace = zaqar.transport.base
|
namespace = zaqar.transport.base
|
||||||
namespace = zaqar.transport.validation
|
namespace = zaqar.transport.validation
|
||||||
|
@ -38,6 +38,7 @@ zaqar.data.storage =
|
|||||||
mongodb = zaqar.storage.mongodb.driver:DataDriver
|
mongodb = zaqar.storage.mongodb.driver:DataDriver
|
||||||
mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver
|
mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver
|
||||||
redis = zaqar.storage.redis.driver:DataDriver
|
redis = zaqar.storage.redis.driver:DataDriver
|
||||||
|
swift = zaqar.storage.swift.driver:DataDriver
|
||||||
faulty = zaqar.tests.faulty_storage:DataDriver
|
faulty = zaqar.tests.faulty_storage:DataDriver
|
||||||
|
|
||||||
zaqar.control.storage =
|
zaqar.control.storage =
|
||||||
@ -58,6 +59,7 @@ oslo.config.opts =
|
|||||||
zaqar.storage.mongodb = zaqar.storage.mongodb.options:_config_options
|
zaqar.storage.mongodb = zaqar.storage.mongodb.options:_config_options
|
||||||
zaqar.storage.redis = zaqar.storage.redis.options:_config_options
|
zaqar.storage.redis = zaqar.storage.redis.options:_config_options
|
||||||
zaqar.storage.sqlalchemy = zaqar.storage.sqlalchemy.options:_config_options
|
zaqar.storage.sqlalchemy = zaqar.storage.sqlalchemy.options:_config_options
|
||||||
|
zaqar.storage.swift = zaqar.storage.swift.options:_config_options
|
||||||
zaqar.transport.wsgi = zaqar.transport.wsgi.driver:_config_options
|
zaqar.transport.wsgi = zaqar.transport.wsgi.driver:_config_options
|
||||||
zaqar.transport.websocket = zaqar.transport.websocket.driver:_config_options
|
zaqar.transport.websocket = zaqar.transport.websocket.driver:_config_options
|
||||||
zaqar.transport.base = zaqar.transport.base:_config_options
|
zaqar.transport.base = zaqar.transport.base:_config_options
|
||||||
@ -72,6 +74,9 @@ zaqar.storage.mongodb.driver.queue.stages =
|
|||||||
zaqar.storage.redis.driver.queue.stages =
|
zaqar.storage.redis.driver.queue.stages =
|
||||||
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
|
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
|
||||||
|
|
||||||
|
zaqar.storage.swift.driver.queue.stages =
|
||||||
|
message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler
|
||||||
|
|
||||||
zaqar.notification.tasks =
|
zaqar.notification.tasks =
|
||||||
http = zaqar.notification.tasks.webhook:WebhookTask
|
http = zaqar.notification.tasks.webhook:WebhookTask
|
||||||
https = zaqar.notification.tasks.webhook:WebhookTask
|
https = zaqar.notification.tasks.webhook:WebhookTask
|
||||||
|
@ -10,6 +10,7 @@ mock>=2.0 # BSD
|
|||||||
# Backends
|
# Backends
|
||||||
redis>=2.10.0 # MIT
|
redis>=2.10.0 # MIT
|
||||||
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
||||||
|
python-swiftclient>=3.2.0 # Apache-2.0
|
||||||
websocket-client>=0.32.0 # LGPLv2+
|
websocket-client>=0.32.0 # LGPLv2+
|
||||||
|
|
||||||
# Unit testing
|
# Unit testing
|
||||||
|
0
zaqar/storage/swift/__init__.py
Normal file
0
zaqar/storage/swift/__init__.py
Normal file
188
zaqar/storage/swift/claims.py
Normal file
188
zaqar/storage/swift/claims.py
Normal file
@ -0,0 +1,188 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import math
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar import storage
|
||||||
|
from zaqar.storage import errors
|
||||||
|
from zaqar.storage.swift import utils
|
||||||
|
|
||||||
|
|
||||||
|
class ClaimController(storage.Claim):
|
||||||
|
"""Implements claims resource operations with swift backend
|
||||||
|
|
||||||
|
Claims are scoped by project + queue.
|
||||||
|
"""
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(ClaimController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _message_ctrl(self):
|
||||||
|
return self.driver.message_controller
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _queue_ctrl(self):
|
||||||
|
return self.driver.queue_controller
|
||||||
|
|
||||||
|
def _exists(self, queue, claim_id, project=None):
|
||||||
|
try:
|
||||||
|
return self._client.head_object(
|
||||||
|
utils._claim_container(queue, project),
|
||||||
|
claim_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.ClaimDoesNotExist(claim_id, queue, project)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _get(self, queue, claim_id, project=None):
|
||||||
|
try:
|
||||||
|
container = utils._claim_container(queue, project)
|
||||||
|
headers, claim = self._client.get_object(container, claim_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
return
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
return {
|
||||||
|
'id': claim_id,
|
||||||
|
'age': now - float(headers['x-timestamp']),
|
||||||
|
'ttl': int(headers['x-delete-at']) - math.floor(now),
|
||||||
|
}
|
||||||
|
|
||||||
|
def get(self, queue, claim_id, project=None):
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
self._exists(queue, claim_id, project)
|
||||||
|
|
||||||
|
container = utils._claim_container(queue, project)
|
||||||
|
|
||||||
|
headers, claim_obj = self._client.get_object(container, claim_id)
|
||||||
|
|
||||||
|
def g():
|
||||||
|
for msg_id in jsonutils.loads(claim_obj):
|
||||||
|
try:
|
||||||
|
headers, msg = self._message_ctrl._find_message(
|
||||||
|
queue, msg_id, project)
|
||||||
|
except errors.MessageDoesNotExist:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
yield utils._message_to_json(msg_id, msg, headers, now)
|
||||||
|
|
||||||
|
claim_meta = {
|
||||||
|
'id': claim_id,
|
||||||
|
'age': now - float(headers['x-timestamp']),
|
||||||
|
'ttl': int(headers['x-delete-at']) - math.floor(now),
|
||||||
|
}
|
||||||
|
|
||||||
|
return claim_meta, g()
|
||||||
|
|
||||||
|
def create(self, queue, metadata, project=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
||||||
|
ttl = metadata['ttl']
|
||||||
|
grace = metadata['grace']
|
||||||
|
msg_ts = ttl + grace
|
||||||
|
claim_id = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
messages, marker = self._message_ctrl._list(
|
||||||
|
queue, project, limit=limit, include_claimed=False)
|
||||||
|
|
||||||
|
claimed = []
|
||||||
|
for msg in messages:
|
||||||
|
md5 = hashlib.md5()
|
||||||
|
md5.update(
|
||||||
|
jsonutils.dumps(
|
||||||
|
{'body': msg['body'], 'claim_id': None,
|
||||||
|
'ttl': msg['ttl']}))
|
||||||
|
md5 = md5.hexdigest()
|
||||||
|
msg_ttl = max(msg['ttl'], msg_ts)
|
||||||
|
content = jsonutils.dumps(
|
||||||
|
{'body': msg['body'], 'claim_id': claim_id, 'ttl': msg_ttl})
|
||||||
|
try:
|
||||||
|
self._client.put_object(
|
||||||
|
utils._message_container(queue, project),
|
||||||
|
msg['id'],
|
||||||
|
content,
|
||||||
|
headers={'x-object-meta-clientid': msg['client_uuid'],
|
||||||
|
'if-match': md5,
|
||||||
|
'x-object-meta-claimid': claim_id,
|
||||||
|
'x-delete-after': msg_ttl})
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 412:
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
msg['claim_id'] = claim_id
|
||||||
|
msg['ttl'] = msg_ttl
|
||||||
|
claimed.append(msg)
|
||||||
|
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client,
|
||||||
|
utils._claim_container(queue, project),
|
||||||
|
claim_id,
|
||||||
|
jsonutils.dumps([msg['id'] for msg in claimed]),
|
||||||
|
headers={'x-delete-after': ttl}
|
||||||
|
)
|
||||||
|
|
||||||
|
return claim_id, claimed
|
||||||
|
|
||||||
|
def update(self, queue, claim_id, metadata, project=None):
|
||||||
|
container = utils._claim_container(queue, project)
|
||||||
|
try:
|
||||||
|
headers, obj = self._client.get_object(container, claim_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.ClaimDoesNotExist(claim_id, queue, project)
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._client.put_object(container, claim_id, obj,
|
||||||
|
headers={'x-delete-after': metadata['ttl']})
|
||||||
|
|
||||||
|
def delete(self, queue, claim_id, project=None):
|
||||||
|
try:
|
||||||
|
header, obj = self._client.get_object(
|
||||||
|
utils._claim_container(queue, project),
|
||||||
|
claim_id)
|
||||||
|
for msg_id in jsonutils.loads(obj):
|
||||||
|
try:
|
||||||
|
headers, msg = self._message_ctrl._find_message(
|
||||||
|
queue, msg_id, project)
|
||||||
|
except errors.MessageDoesNotExist:
|
||||||
|
continue
|
||||||
|
md5 = hashlib.md5()
|
||||||
|
md5.update(msg)
|
||||||
|
md5 = md5.hexdigest()
|
||||||
|
msg = jsonutils.loads(msg)
|
||||||
|
content = jsonutils.dumps(
|
||||||
|
{'body': msg['body'], 'claim_id': None, 'ttl': msg['ttl']})
|
||||||
|
client_id = headers['x-object-meta-clientid']
|
||||||
|
self._client.put_object(
|
||||||
|
utils._message_container(queue, project),
|
||||||
|
msg_id,
|
||||||
|
content,
|
||||||
|
headers={'x-object-meta-clientid': client_id,
|
||||||
|
'if-match': md5,
|
||||||
|
'x-delete-at': headers['x-delete-at']})
|
||||||
|
|
||||||
|
self._client.delete_object(
|
||||||
|
utils._claim_container(queue, project),
|
||||||
|
claim_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
23
zaqar/storage/swift/controllers.py
Normal file
23
zaqar/storage/swift/controllers.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from zaqar.storage.swift import claims
|
||||||
|
from zaqar.storage.swift import messages
|
||||||
|
from zaqar.storage.swift import queues
|
||||||
|
from zaqar.storage.swift import subscriptions
|
||||||
|
|
||||||
|
|
||||||
|
MessageController = messages.MessageController
|
||||||
|
QueueController = queues.QueueController
|
||||||
|
ClaimController = claims.ClaimController
|
||||||
|
SubscriptionController = subscriptions.SubscriptionController
|
78
zaqar/storage/swift/driver.py
Normal file
78
zaqar/storage/swift/driver.py
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from six.moves import urllib
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar import storage
|
||||||
|
from zaqar.storage.swift import controllers
|
||||||
|
from zaqar.storage.swift import options
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DataDriver(storage.DataDriverBase):
|
||||||
|
|
||||||
|
_DRIVER_OPTIONS = options._config_options()
|
||||||
|
|
||||||
|
def __init__(self, conf, cache, control_driver):
|
||||||
|
super(DataDriver, self).__init__(conf, cache, control_driver)
|
||||||
|
self.swift_conf = self.conf[options.MESSAGE_SWIFT_GROUP]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def capabilities(self):
|
||||||
|
return (
|
||||||
|
storage.Capabilities.AOD,
|
||||||
|
storage.Capabilities.DURABILITY,
|
||||||
|
)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def connection(self):
|
||||||
|
return _get_swift_client(self)
|
||||||
|
|
||||||
|
def is_alive(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def queue_controller(self):
|
||||||
|
return controllers.QueueController(self)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def message_controller(self):
|
||||||
|
return controllers.MessageController(self)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def subscription_controller(self):
|
||||||
|
return controllers.SubscriptionController(self)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def claim_controller(self):
|
||||||
|
return controllers.ClaimController(self)
|
||||||
|
|
||||||
|
def _health(self):
|
||||||
|
raise NotImplementedError("No health checks")
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _get_swift_client(driver):
|
||||||
|
conf = driver.swift_conf
|
||||||
|
parsed_url = urllib.parse.urlparse(conf.uri)
|
||||||
|
return swiftclient.Connection(conf.auth_url, parsed_url.username,
|
||||||
|
parsed_url.password,
|
||||||
|
insecure=conf.insecure, auth_version="3",
|
||||||
|
tenant_name=parsed_url.path[1:])
|
352
zaqar/storage/swift/messages.py
Normal file
352
zaqar/storage/swift/messages.py
Normal file
@ -0,0 +1,352 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar import storage
|
||||||
|
from zaqar.storage import errors
|
||||||
|
from zaqar.storage.swift import utils
|
||||||
|
|
||||||
|
|
||||||
|
class MessageController(storage.Message):
|
||||||
|
"""Implements message resource operations with swift backend
|
||||||
|
|
||||||
|
Messages are scoped by project + queue.
|
||||||
|
|
||||||
|
message -> Swift mapping:
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Attribute | Storage location |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Msg UUID | Object name |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Queue Name | Container name prefix |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Project name | Container name prefix |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Created time | Object Creation Time |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Msg Body | Object content 'body' |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Client ID | Object header 'ClientID' |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Claim ID | Object content 'claim_id' |
|
||||||
|
+--------------+-----------------------------------------+
|
||||||
|
| Expires | Object Delete-After header |
|
||||||
|
+--------------------------------------------------------+
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(MessageController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _queue_ctrl(self):
|
||||||
|
return self.driver.queue_controller
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _claim_ctrl(self):
|
||||||
|
return self.driver.claim_controller
|
||||||
|
|
||||||
|
def _delete_queue_messages(self, queue, project, pipe):
|
||||||
|
"""Method to remove all the messages belonging to a queue.
|
||||||
|
|
||||||
|
Will be referenced from the QueueController.
|
||||||
|
The pipe to execute deletion will be passed from the QueueController
|
||||||
|
executing the operation.
|
||||||
|
"""
|
||||||
|
container = utils._message_container(queue, project)
|
||||||
|
remaining = True
|
||||||
|
key = ''
|
||||||
|
while remaining:
|
||||||
|
headers, objects = self._client.get_container(container,
|
||||||
|
limit=1000,
|
||||||
|
marker=key)
|
||||||
|
if not objects:
|
||||||
|
return
|
||||||
|
remaining = len(objects) == 1000
|
||||||
|
key = objects[-1]['name']
|
||||||
|
for o in objects:
|
||||||
|
try:
|
||||||
|
self._client.delete_object(container, o['name'])
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _list(self, queue, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
|
echo=False, client_uuid=None,
|
||||||
|
include_claimed=False, sort=1):
|
||||||
|
"""List messages in the queue, oldest first(ish)
|
||||||
|
|
||||||
|
Time ordering and message inclusion in lists are soft, there is no
|
||||||
|
global order and times are based on the UTC time of the zaqar-api
|
||||||
|
server that the message was created from.
|
||||||
|
|
||||||
|
Here be consistency dragons.
|
||||||
|
"""
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
client = self._client
|
||||||
|
container = utils._message_container(queue, project)
|
||||||
|
query_string = None
|
||||||
|
if sort == -1:
|
||||||
|
query_string = 'reverse=on'
|
||||||
|
|
||||||
|
try:
|
||||||
|
_, objects = client.get_container(
|
||||||
|
container,
|
||||||
|
marker=marker,
|
||||||
|
# list 2x the objects because some listing items may have
|
||||||
|
# expired
|
||||||
|
limit=limit * 2,
|
||||||
|
query_string=query_string)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def is_claimed(msg, headers):
|
||||||
|
if include_claimed:
|
||||||
|
return False
|
||||||
|
return msg['claim_id'] is not None
|
||||||
|
|
||||||
|
def is_echo(msg, headers):
|
||||||
|
if echo:
|
||||||
|
return False
|
||||||
|
return headers['x-object-meta-clientid'] == str(client_uuid)
|
||||||
|
|
||||||
|
filters = [
|
||||||
|
is_echo,
|
||||||
|
is_claimed,
|
||||||
|
]
|
||||||
|
marker = {}
|
||||||
|
get_object = functools.partial(client.get_object, container)
|
||||||
|
list_objects = functools.partial(client.get_container, container,
|
||||||
|
limit=limit * 2,
|
||||||
|
query_string=query_string)
|
||||||
|
yield utils._filter_messages(objects, filters, marker, get_object,
|
||||||
|
list_objects, limit=limit)
|
||||||
|
yield marker and marker['next']
|
||||||
|
|
||||||
|
def list(self, queue, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
|
echo=False, client_uuid=None,
|
||||||
|
include_claimed=False):
|
||||||
|
return self._list(queue, project, marker, limit, echo,
|
||||||
|
client_uuid, include_claimed)
|
||||||
|
|
||||||
|
def first(self, queue, project=None, sort=1):
|
||||||
|
if sort not in (1, -1):
|
||||||
|
raise ValueError(u'sort must be either 1 (ascending) '
|
||||||
|
u'or -1 (descending)')
|
||||||
|
cursor = self._list(queue, project, limit=1, sort=sort)
|
||||||
|
try:
|
||||||
|
message = next(next(cursor))
|
||||||
|
except StopIteration:
|
||||||
|
raise errors.QueueIsEmpty(queue, project)
|
||||||
|
return message
|
||||||
|
|
||||||
|
def get(self, queue, message_id, project=None):
|
||||||
|
return self._get(queue, message_id, project)
|
||||||
|
|
||||||
|
def _get(self, queue, message_id, project=None, check_queue=True):
|
||||||
|
if check_queue and not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
|
||||||
|
headers, msg = self._find_message(queue, message_id, project)
|
||||||
|
return utils._message_to_json(message_id, msg, headers, now)
|
||||||
|
|
||||||
|
def _find_message(self, queue, message_id, project):
|
||||||
|
try:
|
||||||
|
return self._client.get_object(
|
||||||
|
utils._message_container(queue, project), message_id)
|
||||||
|
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.MessageDoesNotExist(message_id, queue, project)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def bulk_delete(self, queue, message_ids, project=None):
|
||||||
|
for id in message_ids:
|
||||||
|
try:
|
||||||
|
self._delete(queue, id, project)
|
||||||
|
except errors.MessageDoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def bulk_get(self, queue, message_ids, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise StopIteration()
|
||||||
|
|
||||||
|
for id in message_ids:
|
||||||
|
try:
|
||||||
|
yield self._get(queue, id, project, check_queue=False)
|
||||||
|
except errors.MessageDoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def post(self, queue, messages, client_uuid, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
return [self._create_msg(queue, m, client_uuid, project)
|
||||||
|
for m in messages]
|
||||||
|
|
||||||
|
def _create_msg(self, queue, msg, client_uuid, project):
|
||||||
|
slug = str(uuid.uuid1())
|
||||||
|
contents = jsonutils.dumps(
|
||||||
|
{'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl']})
|
||||||
|
try:
|
||||||
|
self._client.put_object(
|
||||||
|
utils._message_container(queue, project),
|
||||||
|
slug,
|
||||||
|
contents=contents,
|
||||||
|
headers={
|
||||||
|
'x-object-meta-clientid': str(client_uuid),
|
||||||
|
'x-delete-after': msg['ttl']})
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
raise
|
||||||
|
return slug
|
||||||
|
|
||||||
|
def delete(self, queue, message_id, project=None, claim=None):
|
||||||
|
try:
|
||||||
|
msg = self._get(queue, message_id, project)
|
||||||
|
except (errors.QueueDoesNotExist, errors.MessageDoesNotExist):
|
||||||
|
return
|
||||||
|
if claim is None:
|
||||||
|
if msg['claim_id']:
|
||||||
|
claim_obj = self._claim_ctrl._get(queue, msg['claim_id'],
|
||||||
|
project)
|
||||||
|
if claim_obj is not None and claim_obj['ttl'] > 0:
|
||||||
|
raise errors.MessageIsClaimed(message_id)
|
||||||
|
else:
|
||||||
|
# Check if the claim does exist
|
||||||
|
self._claim_ctrl._exists(queue, claim, project)
|
||||||
|
if not msg['claim_id']:
|
||||||
|
raise errors.MessageNotClaimed(message_id)
|
||||||
|
elif msg['claim_id'] != claim:
|
||||||
|
raise errors.MessageNotClaimedBy(message_id, claim)
|
||||||
|
|
||||||
|
self._delete(queue, message_id, project)
|
||||||
|
|
||||||
|
def _delete(self, queue, message_id, project=None):
|
||||||
|
try:
|
||||||
|
self._client.delete_object(
|
||||||
|
utils._message_container(queue, project), message_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def pop(self, queue, limit, project=None):
|
||||||
|
# Pop is implemented as a chain of the following operations:
|
||||||
|
# 1. Create a claim.
|
||||||
|
# 2. Delete the messages claimed.
|
||||||
|
# 3. Delete the claim.
|
||||||
|
|
||||||
|
claim_id, messages = self._claim_ctrl.create(
|
||||||
|
queue, dict(ttl=1, grace=0), project, limit=limit)
|
||||||
|
|
||||||
|
message_ids = [message['id'] for message in messages]
|
||||||
|
self.bulk_delete(queue, message_ids, project)
|
||||||
|
return messages
|
||||||
|
|
||||||
|
|
||||||
|
class MessageQueueHandler(object):
|
||||||
|
def __init__(self, driver, control_driver):
|
||||||
|
self.driver = driver
|
||||||
|
self._client = self.driver.connection
|
||||||
|
self._queue_ctrl = self.driver.queue_controller
|
||||||
|
self._message_ctrl = self.driver.message_controller
|
||||||
|
self._claim_ctrl = self.driver.claim_controller
|
||||||
|
|
||||||
|
def create(self, name, metadata=None, project=None):
|
||||||
|
self._client.put_container(utils._message_container(name, project))
|
||||||
|
|
||||||
|
def delete(self, name, project=None):
|
||||||
|
for container in [utils._message_container(name, project),
|
||||||
|
utils._claim_container(name, project)]:
|
||||||
|
try:
|
||||||
|
headers, objects = self._client.get_container(container)
|
||||||
|
for obj in objects:
|
||||||
|
try:
|
||||||
|
self._client.delete_object(container, obj['name'])
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
self._client.delete_container(container)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def stats(self, name, project=None):
|
||||||
|
if not self._queue_ctrl.exists(name, project=project):
|
||||||
|
raise errors.QueueDoesNotExist(name, project)
|
||||||
|
|
||||||
|
total = 0
|
||||||
|
claimed = 0
|
||||||
|
container = utils._message_container(name, project)
|
||||||
|
_, objects = self._client.get_container(container)
|
||||||
|
newest = None
|
||||||
|
oldest = None
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
for obj in objects:
|
||||||
|
try:
|
||||||
|
headers = self._client.head_object(container, obj['name'])
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
created = float(headers['x-timestamp'])
|
||||||
|
newest = {
|
||||||
|
'id': obj['name'],
|
||||||
|
'age': now - created,
|
||||||
|
'created': timeutils.iso8601_from_timestamp(created)}
|
||||||
|
if oldest is None:
|
||||||
|
oldest = newest
|
||||||
|
total += 1
|
||||||
|
if headers.get('x-object-meta-claimid'):
|
||||||
|
claimed += 1
|
||||||
|
|
||||||
|
msg_stats = {
|
||||||
|
'claimed': claimed,
|
||||||
|
'free': total - claimed,
|
||||||
|
'total': total,
|
||||||
|
}
|
||||||
|
if newest is not None:
|
||||||
|
msg_stats['newest'] = newest
|
||||||
|
msg_stats['oldest'] = oldest
|
||||||
|
|
||||||
|
return {'messages': msg_stats}
|
||||||
|
|
||||||
|
def exists(self, queue, project=None):
|
||||||
|
try:
|
||||||
|
self._client.head_container(utils._message_container(queue,
|
||||||
|
project))
|
||||||
|
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
return False
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
return True
|
31
zaqar/storage/swift/options.py
Normal file
31
zaqar/storage/swift/options.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""Swift storage driver configuration options."""
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
MESSAGE_SWIFT_OPTIONS = (
|
||||||
|
cfg.StrOpt('auth_url', default="http://127.0.0.1:5000/v3/",
|
||||||
|
help="URI of Keystone endpoint to discover Swift"),
|
||||||
|
cfg.StrOpt('uri',
|
||||||
|
default="swift://demo:nomoresecrete@/demo",
|
||||||
|
help="Custom URI describing the swift connection."),
|
||||||
|
cfg.StrOpt('insecure', default=False, help="Don't check SSL certificate"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
MESSAGE_SWIFT_GROUP = 'drivers:message_store:swift'
|
||||||
|
|
||||||
|
|
||||||
|
def _config_options():
|
||||||
|
return [(MESSAGE_SWIFT_GROUP, MESSAGE_SWIFT_OPTIONS), ]
|
124
zaqar/storage/swift/queues.py
Normal file
124
zaqar/storage/swift/queues.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations under
|
||||||
|
# the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
from zaqar import storage
|
||||||
|
from zaqar.storage import errors
|
||||||
|
from zaqar.storage.swift import utils
|
||||||
|
|
||||||
|
|
||||||
|
class QueueController(storage.Queue):
|
||||||
|
"""Implements queue resource operations with swift backend.
|
||||||
|
|
||||||
|
Queues are scoped by project.
|
||||||
|
|
||||||
|
queue -> Swift mapping:
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Attribute | Storage location |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Queue Name | Object name |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Project name | Container name prefix |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Created time | Object Creation Time |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Queue metadata | Object content |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(QueueController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
def _list(self, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||||
|
container = utils._queue_container(project)
|
||||||
|
try:
|
||||||
|
_, objects = self._client.get_container(container,
|
||||||
|
limit=limit,
|
||||||
|
marker=marker)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
self._client.put_container(container)
|
||||||
|
objects = []
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
marker_next = {}
|
||||||
|
yield utils.QueueListCursor(
|
||||||
|
objects, detailed, marker_next,
|
||||||
|
functools.partial(self._client.get_object, container))
|
||||||
|
yield marker_next and marker_next['next']
|
||||||
|
|
||||||
|
def _get(self, name, project=None):
|
||||||
|
try:
|
||||||
|
return self.get_metadata(name, project)
|
||||||
|
except errors.QueueDoesNotExist:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def get_metadata(self, name, project=None):
|
||||||
|
container = utils._queue_container(project)
|
||||||
|
try:
|
||||||
|
_, metadata = self._client.get_object(container, name)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.QueueDoesNotExist(name, project)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
return jsonutils.loads(metadata) or {}
|
||||||
|
|
||||||
|
def set_metadata(self, name, metadata, project=None):
|
||||||
|
self._create(name, metadata, project)
|
||||||
|
|
||||||
|
def _create(self, name, metadata=None, project=None):
|
||||||
|
try:
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client, utils._queue_container(project), name,
|
||||||
|
content_type='application/json',
|
||||||
|
contents=jsonutils.dumps(metadata),
|
||||||
|
headers={'if-none-match': '*'})
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 412:
|
||||||
|
if metadata:
|
||||||
|
# Enforce metadata setting regardless
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client, utils._queue_container(project), name,
|
||||||
|
content_type='application/json',
|
||||||
|
contents=jsonutils.dumps(metadata))
|
||||||
|
return False
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _delete(self, name, project=None):
|
||||||
|
try:
|
||||||
|
self._client.delete_object(utils._queue_container(project), name)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _stats(self, name, project=None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _exists(self, name, project=None):
|
||||||
|
try:
|
||||||
|
return self._client.head_object(utils._queue_container(project),
|
||||||
|
name)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
159
zaqar/storage/swift/subscriptions.py
Normal file
159
zaqar/storage/swift/subscriptions.py
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations under
|
||||||
|
# the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
import swiftclient
|
||||||
|
import urllib
|
||||||
|
|
||||||
|
from zaqar import storage
|
||||||
|
from zaqar.storage import errors
|
||||||
|
from zaqar.storage.swift import utils
|
||||||
|
|
||||||
|
|
||||||
|
class SubscriptionController(storage.Subscription):
|
||||||
|
"""Implements subscription resource operations with swift backend.
|
||||||
|
|
||||||
|
Subscriptions are scoped by queue and project.
|
||||||
|
|
||||||
|
subscription -> Swift mapping:
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Attribute | Storage location |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Sub UUID | Object name |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Queue Name | Container name prefix |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Project name | Container name prefix |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Created time | Object Creation Time |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
| Sub options | Object content |
|
||||||
|
+----------------+---------------------------------------+
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(SubscriptionController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
def list(self, queue, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
try:
|
||||||
|
_, objects = self._client.get_container(container,
|
||||||
|
limit=limit,
|
||||||
|
marker=marker)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
objects = []
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
marker_next = {}
|
||||||
|
yield utils.SubscriptionListCursor(
|
||||||
|
objects, marker_next,
|
||||||
|
functools.partial(self._client.get_object, container))
|
||||||
|
yield marker_next and marker_next['next']
|
||||||
|
|
||||||
|
def get(self, queue, subscription_id, project=None):
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
try:
|
||||||
|
headers, data = self._client.get_object(container, subscription_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||||
|
raise
|
||||||
|
return utils._subscription_to_json(data, headers)
|
||||||
|
|
||||||
|
def create(self, queue, subscriber, ttl, options, project=None):
|
||||||
|
sub_container = utils._subscriber_container(queue, project)
|
||||||
|
slug = uuidutils.generate_uuid()
|
||||||
|
try:
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client,
|
||||||
|
sub_container,
|
||||||
|
urllib.quote_plus(subscriber),
|
||||||
|
contents=slug,
|
||||||
|
headers={'x-delete-after': ttl, 'if-none-match': '*'})
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 412:
|
||||||
|
return
|
||||||
|
raise
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
data = {'id': slug,
|
||||||
|
'source': queue,
|
||||||
|
'subscriber': subscriber,
|
||||||
|
'options': options,
|
||||||
|
'ttl': ttl,
|
||||||
|
'confirmed': False}
|
||||||
|
utils._put_or_create_container(
|
||||||
|
self._client, container, slug, contents=jsonutils.dumps(data),
|
||||||
|
headers={'x-delete-after': ttl})
|
||||||
|
return slug
|
||||||
|
|
||||||
|
def update(self, queue, subscription_id, project=None, **kwargs):
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
data = self.get(queue, subscription_id, project)
|
||||||
|
data.pop('age')
|
||||||
|
ttl = data['ttl']
|
||||||
|
if 'subscriber' in kwargs:
|
||||||
|
sub_container = utils._subscriber_container(queue, project)
|
||||||
|
try:
|
||||||
|
self._client.put_object(
|
||||||
|
sub_container,
|
||||||
|
urllib.quote_plus(kwargs['subscriber']),
|
||||||
|
contents=subscription_id,
|
||||||
|
headers={'x-delete-after': ttl, 'if-none-match': '*'})
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 412:
|
||||||
|
raise errors.SubscriptionAlreadyExists()
|
||||||
|
raise
|
||||||
|
self._client.delete_object(sub_container,
|
||||||
|
urllib.quote_plus(data['subscriber']))
|
||||||
|
data.update(kwargs)
|
||||||
|
self._client.put_object(container,
|
||||||
|
subscription_id,
|
||||||
|
contents=jsonutils.dumps(data),
|
||||||
|
headers={'x-delete-after': ttl})
|
||||||
|
|
||||||
|
def exists(self, queue, subscription_id, project=None):
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
return self._client.head_object(container, subscription_id)
|
||||||
|
|
||||||
|
def delete(self, queue, subscription_id, project=None):
|
||||||
|
try:
|
||||||
|
data = self.get(queue, subscription_id, project)
|
||||||
|
except errors.SubscriptionDoesNotExist:
|
||||||
|
return
|
||||||
|
sub_container = utils._subscriber_container(queue, project)
|
||||||
|
try:
|
||||||
|
self._client.delete_object(sub_container,
|
||||||
|
urllib.quote_plus(data['subscriber']))
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
container = utils._subscription_container(queue, project)
|
||||||
|
try:
|
||||||
|
self._client.delete_object(container, subscription_id)
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status != 404:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def get_with_subscriber(self, queue, subscriber, project=None):
|
||||||
|
sub_container = utils._subscriber_container(queue, project)
|
||||||
|
headers, obj = self._client.get_object(sub_container,
|
||||||
|
urllib.quote_plus(subscriber))
|
||||||
|
return self.get(queue, obj, project)
|
||||||
|
|
||||||
|
def confirm(self, queue, subscription_id, project=None, confirmed=True):
|
||||||
|
self.update(queue, subscription_id, project, confirmed=True)
|
166
zaqar/storage/swift/utils.py
Normal file
166
zaqar/storage/swift/utils.py
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
|
||||||
|
def _message_container(queue, project=None):
|
||||||
|
return "zaqar_message:%s:%s" % (queue, project)
|
||||||
|
|
||||||
|
|
||||||
|
def _claim_container(queue=None, project=None):
|
||||||
|
return "zaqar_claim:%s:%s" % (queue, project)
|
||||||
|
|
||||||
|
|
||||||
|
def _queue_container(project=None):
|
||||||
|
return "zaqar_queue:%s" % (project,)
|
||||||
|
|
||||||
|
|
||||||
|
def _subscription_container(queue, project=None):
|
||||||
|
return "zaqar_subscription:%s:%s" % (queue, project)
|
||||||
|
|
||||||
|
|
||||||
|
def _subscriber_container(queue, project=None):
|
||||||
|
return "zaqar_subscriber:%s:%s" % (queue, project)
|
||||||
|
|
||||||
|
|
||||||
|
def _put_or_create_container(client, *args, **kwargs):
|
||||||
|
"""PUT a swift object to a container that may not exist
|
||||||
|
|
||||||
|
Takes the exact arguments of swiftclient.put_object but will
|
||||||
|
autocreate a container that doesn't exist
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client.put_object(*args, **kwargs)
|
||||||
|
except swiftclient.ClientException as e:
|
||||||
|
if e.http_status == 404:
|
||||||
|
client.put_container(args[0])
|
||||||
|
client.put_object(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def _message_to_json(message_id, msg, headers, now):
|
||||||
|
msg = jsonutils.loads(msg)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'id': message_id,
|
||||||
|
'age': now - float(headers['x-timestamp']),
|
||||||
|
'ttl': msg['ttl'],
|
||||||
|
'body': msg['body'],
|
||||||
|
'claim_id': msg['claim_id']
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _subscription_to_json(sub, headers):
|
||||||
|
sub = jsonutils.loads(sub)
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
return {'id': sub['id'],
|
||||||
|
'age': now - float(headers['x-timestamp']),
|
||||||
|
'source': sub['source'],
|
||||||
|
'subscriber': sub['subscriber'],
|
||||||
|
'ttl': sub['ttl'],
|
||||||
|
'options': sub['options'],
|
||||||
|
'confirmed': sub['confirmed']}
|
||||||
|
|
||||||
|
|
||||||
|
def _filter_messages(messages, filters, marker, get_object, list_objects,
|
||||||
|
limit):
|
||||||
|
"""Create a filtering iterator over a list of messages.
|
||||||
|
|
||||||
|
The function accepts a list of filters to be filtered
|
||||||
|
before the the message can be included as a part of the reply.
|
||||||
|
"""
|
||||||
|
now = timeutils.utcnow_ts(True)
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
if msg is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
marker['next'] = msg['name']
|
||||||
|
try:
|
||||||
|
headers, obj = get_object(msg['name'])
|
||||||
|
except swiftclient.ClientException as exc:
|
||||||
|
if exc.http_status == 404:
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
obj = jsonutils.loads(obj)
|
||||||
|
for should_skip in filters:
|
||||||
|
if should_skip(obj, headers):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
limit -= 1
|
||||||
|
yield {
|
||||||
|
'id': marker['next'],
|
||||||
|
'ttl': obj['ttl'],
|
||||||
|
'client_uuid': headers['x-object-meta-clientid'],
|
||||||
|
'body': obj['body'],
|
||||||
|
'age': now - float(headers['x-timestamp']),
|
||||||
|
'claim_id': obj['claim_id'],
|
||||||
|
}
|
||||||
|
if limit <= 0:
|
||||||
|
break
|
||||||
|
if limit > 0 and marker:
|
||||||
|
# We haven't reached the limit, let's try to get some more messages
|
||||||
|
_, objects = list_objects(marker=marker['next'])
|
||||||
|
if not objects:
|
||||||
|
return
|
||||||
|
for msg in _filter_messages(objects, filters, marker, get_object,
|
||||||
|
list_objects, limit):
|
||||||
|
yield msg
|
||||||
|
|
||||||
|
|
||||||
|
class QueueListCursor(object):
|
||||||
|
|
||||||
|
def __init__(self, objects, detailed, marker_next, get_object):
|
||||||
|
self.objects = iter(objects)
|
||||||
|
self.detailed = detailed
|
||||||
|
self.marker_next = marker_next
|
||||||
|
self.get_object = get_object
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def next(self):
|
||||||
|
curr = next(self.objects)
|
||||||
|
self.marker_next['next'] = curr['name']
|
||||||
|
queue = {'name': curr['name']}
|
||||||
|
if self.detailed:
|
||||||
|
_, metadata = self.get_object(curr['name'])
|
||||||
|
queue['metadata'] = metadata
|
||||||
|
return queue
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
return self.next()
|
||||||
|
|
||||||
|
|
||||||
|
class SubscriptionListCursor(object):
|
||||||
|
|
||||||
|
def __init__(self, objects, marker_next, get_object):
|
||||||
|
self.objects = iter(objects)
|
||||||
|
self.marker_next = marker_next
|
||||||
|
self.get_object = get_object
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def next(self):
|
||||||
|
curr = next(self.objects)
|
||||||
|
self.marker_next['next'] = curr['name']
|
||||||
|
headers, sub = self.get_object(curr['name'])
|
||||||
|
return _subscription_to_json(sub, headers)
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
return self.next()
|
@ -26,4 +26,5 @@ expect = helpers.expect
|
|||||||
is_slow = helpers.is_slow
|
is_slow = helpers.is_slow
|
||||||
requires_mongodb = helpers.requires_mongodb
|
requires_mongodb = helpers.requires_mongodb
|
||||||
requires_redis = helpers.requires_redis
|
requires_redis = helpers.requires_redis
|
||||||
|
requires_swift = helpers.requires_swift
|
||||||
TestBase = base.TestBase
|
TestBase = base.TestBase
|
||||||
|
11
zaqar/tests/etc/wsgi_swift.conf
Normal file
11
zaqar/tests/etc/wsgi_swift.conf
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
[DEFAULT]
|
||||||
|
debug = False
|
||||||
|
verbose = False
|
||||||
|
enable_deprecated_api_versions = 1,1.1
|
||||||
|
|
||||||
|
[drivers]
|
||||||
|
transport = wsgi
|
||||||
|
message_store = swift
|
||||||
|
|
||||||
|
[drivers:transport:wsgi]
|
||||||
|
port = 8888
|
@ -33,6 +33,7 @@ def _test_variable_set(variable):
|
|||||||
SKIP_SLOW_TESTS = _test_variable_set('ZAQAR_TEST_SLOW')
|
SKIP_SLOW_TESTS = _test_variable_set('ZAQAR_TEST_SLOW')
|
||||||
SKIP_MONGODB_TESTS = _test_variable_set('ZAQAR_TEST_MONGODB')
|
SKIP_MONGODB_TESTS = _test_variable_set('ZAQAR_TEST_MONGODB')
|
||||||
SKIP_REDIS_TESTS = _test_variable_set('ZAQAR_TEST_REDIS')
|
SKIP_REDIS_TESTS = _test_variable_set('ZAQAR_TEST_REDIS')
|
||||||
|
SKIP_SWIFT_TESTS = _test_variable_set('ZAQAR_TEST_SWIFT')
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
@ -232,6 +233,21 @@ def requires_redis(test_case):
|
|||||||
return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
|
return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
|
||||||
|
|
||||||
|
|
||||||
|
def requires_swift(test_case):
|
||||||
|
"""Decorator to flag a test case as being dependent on Swift.
|
||||||
|
|
||||||
|
Redis-specific tests will be skipped unless the ZAQAR_TEST_SWIFT
|
||||||
|
environment variable is set. If the variable is set, the tests will
|
||||||
|
assume that Swift is accessible and configured properly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
reason = ('Skipping tests that require Swift. Ensure Swift is running '
|
||||||
|
'and then set ZAQAR_TEST_SWIFT in order to enable tests '
|
||||||
|
'that are specific to this storage backend. ')
|
||||||
|
|
||||||
|
return testtools.skipIf(SKIP_SWIFT_TESTS, reason)(test_case)
|
||||||
|
|
||||||
|
|
||||||
def is_slow(condition=lambda self: True):
|
def is_slow(condition=lambda self: True):
|
||||||
"""Decorator to flag slow tests.
|
"""Decorator to flag slow tests.
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
import collections
|
import collections
|
||||||
import datetime
|
import datetime
|
||||||
|
import math
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
@ -568,14 +569,14 @@ class MessageControllerTest(ControllerBaseTest):
|
|||||||
# perhaps the claim expired before it got around to
|
# perhaps the claim expired before it got around to
|
||||||
# trying to delete the message, which means another
|
# trying to delete the message, which means another
|
||||||
# worker could be processing this message now.
|
# worker could be processing this message now.
|
||||||
with testing.expect(errors.NotPermitted):
|
with testing.expect(errors.NotPermitted, errors.ClaimDoesNotExist):
|
||||||
self.controller.delete(self.queue_name, msg2['id'],
|
self.controller.delete(self.queue_name, msg2['id'],
|
||||||
project=self.project,
|
project=self.project,
|
||||||
claim=cid)
|
claim=cid)
|
||||||
|
|
||||||
@testing.is_slow(condition=lambda self: self.gc_interval > 1)
|
@testing.is_slow(condition=lambda self: self.gc_interval > 1)
|
||||||
def test_expired_messages(self):
|
def test_expired_messages(self):
|
||||||
messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}]
|
messages = [{'body': 3.14, 'ttl': 1}, {'body': 0.618, 'ttl': 600}]
|
||||||
client_uuid = uuid.uuid4()
|
client_uuid = uuid.uuid4()
|
||||||
|
|
||||||
[msgid_expired, msgid] = self.controller.post(self.queue_name,
|
[msgid_expired, msgid] = self.controller.post(self.queue_name,
|
||||||
@ -599,11 +600,6 @@ class MessageControllerTest(ControllerBaseTest):
|
|||||||
else:
|
else:
|
||||||
self.fail("Didn't remove the queue")
|
self.fail("Didn't remove the queue")
|
||||||
|
|
||||||
stats = self.queue_controller.stats(self.queue_name,
|
|
||||||
project=self.project)
|
|
||||||
|
|
||||||
self.assertEqual(1, stats['messages']['free'])
|
|
||||||
|
|
||||||
# Make sure expired messages not return when listing
|
# Make sure expired messages not return when listing
|
||||||
interaction = self.controller.list(self.queue_name,
|
interaction = self.controller.list(self.queue_name,
|
||||||
project=self.project)
|
project=self.project)
|
||||||
@ -612,6 +608,10 @@ class MessageControllerTest(ControllerBaseTest):
|
|||||||
self.assertEqual(1, len(messages))
|
self.assertEqual(1, len(messages))
|
||||||
self.assertEqual(msgid, messages[0]['id'])
|
self.assertEqual(msgid, messages[0]['id'])
|
||||||
|
|
||||||
|
stats = self.queue_controller.stats(self.queue_name,
|
||||||
|
project=self.project)
|
||||||
|
self.assertEqual(1, stats['messages']['free'])
|
||||||
|
|
||||||
# Make sure expired messages not return when popping
|
# Make sure expired messages not return when popping
|
||||||
messages = self.controller.pop(self.queue_name,
|
messages = self.controller.pop(self.queue_name,
|
||||||
limit=10,
|
limit=10,
|
||||||
@ -950,10 +950,11 @@ class ClaimControllerTest(ControllerBaseTest):
|
|||||||
self.assertEqual(120, message['ttl'])
|
self.assertEqual(120, message['ttl'])
|
||||||
|
|
||||||
def test_expired_claim(self):
|
def test_expired_claim(self):
|
||||||
meta = {'ttl': 0, 'grace': 60}
|
meta = {'ttl': 1, 'grace': 60}
|
||||||
|
|
||||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
claim_id, messages = self.controller.create(self.queue_name, meta,
|
||||||
project=self.project)
|
project=self.project)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
with testing.expect(errors.DoesNotExist):
|
with testing.expect(errors.DoesNotExist):
|
||||||
self.controller.get(self.queue_name, claim_id,
|
self.controller.get(self.queue_name, claim_id,
|
||||||
@ -1063,7 +1064,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
|||||||
'source' in s and 'subscriber' in s,
|
'source' in s and 'subscriber' in s,
|
||||||
subscriptions)))
|
subscriptions)))
|
||||||
self.assertEqual(10, len(subscriptions))
|
self.assertEqual(10, len(subscriptions))
|
||||||
self.assertLessEqual(added_age, subscriptions[2]['age'])
|
self.assertLessEqual(added_age, math.ceil(subscriptions[2]['age']))
|
||||||
|
|
||||||
interaction = (self.subscription_controller.list(self.source,
|
interaction = (self.subscription_controller.list(self.source,
|
||||||
project=self.project,
|
project=self.project,
|
||||||
@ -1127,7 +1128,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
|||||||
self.assertEqual(self.subscriber, subscription['subscriber'])
|
self.assertEqual(self.subscriber, subscription['subscriber'])
|
||||||
self.assertEqual(self.ttl, subscription['ttl'])
|
self.assertEqual(self.ttl, subscription['ttl'])
|
||||||
self.assertEqual(self.options, subscription['options'])
|
self.assertEqual(self.options, subscription['options'])
|
||||||
self.assertLessEqual(added_age, subscription['age'])
|
self.assertLessEqual(added_age, math.ceil(subscription['age']))
|
||||||
|
|
||||||
exist = self.subscription_controller.exists(self.queue_name,
|
exist = self.subscription_controller.exists(self.queue_name,
|
||||||
s_id,
|
s_id,
|
||||||
@ -1218,8 +1219,8 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
|||||||
except Exception:
|
except Exception:
|
||||||
self.fail("Subscription controller should not raise an exception "
|
self.fail("Subscription controller should not raise an exception "
|
||||||
"in case of non-existing queue.")
|
"in case of non-existing queue.")
|
||||||
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
|
self.addCleanup(self.subscription_controller.delete, 'fake_queue_name',
|
||||||
self.project)
|
s_id, self.project)
|
||||||
|
|
||||||
@ddt.data(True, False)
|
@ddt.data(True, False)
|
||||||
def test_update_raises_if_try_to_update_to_existing_subscription(
|
def test_update_raises_if_try_to_update_to_existing_subscription(
|
||||||
|
52
zaqar/tests/unit/storage/test_impl_swift.py
Normal file
52
zaqar/tests/unit/storage/test_impl_swift.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from zaqar.storage import mongodb
|
||||||
|
from zaqar.storage.swift import controllers
|
||||||
|
from zaqar.storage.swift import driver
|
||||||
|
from zaqar import tests as testing
|
||||||
|
from zaqar.tests.unit.storage import base
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_swift
|
||||||
|
class SwiftQueuesTest(base.QueueControllerTest):
|
||||||
|
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_swift.conf'
|
||||||
|
controller_class = controllers.QueueController
|
||||||
|
control_driver_class = mongodb.ControlDriver
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_swift
|
||||||
|
class SwiftMessagesTest(base.MessageControllerTest):
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_swift.conf'
|
||||||
|
controller_class = controllers.MessageController
|
||||||
|
control_driver_class = mongodb.ControlDriver
|
||||||
|
gc_interval = 1
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_swift
|
||||||
|
class SwiftClaimsTest(base.ClaimControllerTest):
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_swift.conf'
|
||||||
|
controller_class = controllers.ClaimController
|
||||||
|
control_driver_class = mongodb.ControlDriver
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_swift
|
||||||
|
class SwiftSubscriptionsTest(base.SubscriptionControllerTest):
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_swift.conf'
|
||||||
|
controller_class = controllers.SubscriptionController
|
||||||
|
control_driver_class = mongodb.ControlDriver
|
Loading…
Reference in New Issue
Block a user