Merge "Implements queue, message and claim controllers for Redis"
This commit is contained in:
commit
89aa8d0734
@ -142,6 +142,26 @@
|
|||||||
# Storage driver to use. (string value)
|
# Storage driver to use. (string value)
|
||||||
#storage=sqlite
|
#storage=sqlite
|
||||||
|
|
||||||
|
[drivers:storage:redis]
|
||||||
|
|
||||||
|
#
|
||||||
|
# Options defined in zaqar.queues.storage.redis
|
||||||
|
#
|
||||||
|
|
||||||
|
# Redis Server URI. Socket file based connectors are supported.
|
||||||
|
# Example for socket file connector: redis:/tmp/redis.sock'
|
||||||
|
#uri=<None>
|
||||||
|
|
||||||
|
# Maximum number of times to retry an operation that failed
|
||||||
|
# due to a primary node failover. (integer value)
|
||||||
|
#max_reconnect_attempts=10
|
||||||
|
|
||||||
|
# Base sleep interval between attempts to reconnect after a
|
||||||
|
# primary node failover. The actual sleep time increases
|
||||||
|
# exponentially (power of 2) each time the operation is
|
||||||
|
# retried. (floating point value)
|
||||||
|
#reconnect_sleep=0.02
|
||||||
|
|
||||||
|
|
||||||
[drivers:storage:mongodb]
|
[drivers:storage:mongodb]
|
||||||
|
|
||||||
|
@ -41,12 +41,14 @@ zaqar.queues.data.storage =
|
|||||||
sqlite = zaqar.queues.storage.sqlalchemy.driver:DataDriver
|
sqlite = zaqar.queues.storage.sqlalchemy.driver:DataDriver
|
||||||
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:DataDriver
|
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:DataDriver
|
||||||
mongodb = zaqar.queues.storage.mongodb.driver:DataDriver
|
mongodb = zaqar.queues.storage.mongodb.driver:DataDriver
|
||||||
|
redis = zaqar.queues.storage.redis.driver:DataDriver
|
||||||
faulty = zaqar.tests.faulty_storage:DataDriver
|
faulty = zaqar.tests.faulty_storage:DataDriver
|
||||||
|
|
||||||
zaqar.queues.control.storage =
|
zaqar.queues.control.storage =
|
||||||
sqlite = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
|
sqlite = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
|
||||||
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
|
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
|
||||||
mongodb = zaqar.queues.storage.mongodb.driver:ControlDriver
|
mongodb = zaqar.queues.storage.mongodb.driver:ControlDriver
|
||||||
|
redis = zaqar.queues.storage.redis.driver:ControlDriver
|
||||||
faulty = zaqar.tests.faulty_storage:ControlDriver
|
faulty = zaqar.tests.faulty_storage:ControlDriver
|
||||||
|
|
||||||
zaqar.queues.transport =
|
zaqar.queues.transport =
|
||||||
@ -60,6 +62,7 @@ oslo.config.opts =
|
|||||||
zaqar.queues.storage.pipeline = zaqar.queues.storage.pipeline:_config_options
|
zaqar.queues.storage.pipeline = zaqar.queues.storage.pipeline:_config_options
|
||||||
zaqar.queues.storage.pooling = zaqar.queues.storage.pooling:_config_options
|
zaqar.queues.storage.pooling = zaqar.queues.storage.pooling:_config_options
|
||||||
zaqar.queues.storage.mongodb = zaqar.queues.storage.mongodb.options:_config_options
|
zaqar.queues.storage.mongodb = zaqar.queues.storage.mongodb.options:_config_options
|
||||||
|
zaqar.queues.storage.redis = zaqar.queues.storage.redis.options:_config_option
|
||||||
zaqar.queues.storage.sqlalchemy = zaqar.queues.storage.sqlalchemy.options:_config_options
|
zaqar.queues.storage.sqlalchemy = zaqar.queues.storage.sqlalchemy.options:_config_options
|
||||||
zaqar.queues.transport.wsgi = zaqar.queues.transport.wsgi.driver:_config_options
|
zaqar.queues.transport.wsgi = zaqar.queues.transport.wsgi.driver:_config_options
|
||||||
zaqar.queues.transport.base = zaqar.queues.transport.base:_config_options
|
zaqar.queues.transport.base = zaqar.queues.transport.base:_config_options
|
||||||
|
15
tests/etc/wsgi_redis.conf
Normal file
15
tests/etc/wsgi_redis.conf
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
[DEFAULT]
|
||||||
|
debug = False
|
||||||
|
verbose = False
|
||||||
|
|
||||||
|
[drivers]
|
||||||
|
transport = wsgi
|
||||||
|
storage = redis
|
||||||
|
|
||||||
|
[drivers:transport:wsgi]
|
||||||
|
port = 8888
|
||||||
|
|
||||||
|
[drivers:storage:redis]
|
||||||
|
uri = redis://127.0.0.1:6379
|
||||||
|
max_reconnect_attempts = 3
|
||||||
|
reconnect_sleep = 1
|
11
tests/etc/wsgi_redis_pooled.conf
Normal file
11
tests/etc/wsgi_redis_pooled.conf
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
[DEFAULT]
|
||||||
|
pooling = True
|
||||||
|
|
||||||
|
[drivers]
|
||||||
|
transport = wsgi
|
||||||
|
storage = redis
|
||||||
|
|
||||||
|
[drivers:storage:redis]
|
||||||
|
uri = redis://127.0.0.1:6379
|
||||||
|
max_reconnect_attempts = 3
|
||||||
|
reconnect_sleep = 1
|
@ -29,11 +29,15 @@ class TestUtils(testing.TestBase):
|
|||||||
def test_can_connect_suceeds_if_good_uri_sqlite(self):
|
def test_can_connect_suceeds_if_good_uri_sqlite(self):
|
||||||
self.assertTrue(utils.can_connect('sqlite://:memory:'))
|
self.assertTrue(utils.can_connect('sqlite://:memory:'))
|
||||||
|
|
||||||
@ddt.data(
|
def test_can_connect_fails_if_bad_uri_missing_schema(self):
|
||||||
'mongodb://localhost:27018', # wrong port
|
self.assertFalse(utils.can_connect('localhost:27017'))
|
||||||
'localhost:27017', # missing scheme
|
|
||||||
'redis://localhost:6379' # not supported with default install
|
|
||||||
)
|
|
||||||
@testing.requires_mongodb
|
@testing.requires_mongodb
|
||||||
def test_can_connect_fails_if_bad_uri(self, uri):
|
def test_can_connect_fails_if_bad_uri_mongodb(self):
|
||||||
self.assertFalse(utils.can_connect(uri))
|
self.assertFalse(utils.can_connect('mongodb://localhost:8080'))
|
||||||
|
self.assertFalse(utils.can_connect('mongodb://example.com:27017'))
|
||||||
|
|
||||||
|
@testing.requires_redis
|
||||||
|
def test_can_connect_fails_if_bad_uri_redis(self):
|
||||||
|
self.assertFalse(utils.can_connect('redis://localhost:8080'))
|
||||||
|
self.assertFalse(utils.can_connect('redis://example.com:6379'))
|
||||||
|
277
tests/unit/queues/storage/test_impl_redis.py
Normal file
277
tests/unit/queues/storage/test_impl_redis.py
Normal file
@ -0,0 +1,277 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import redis
|
||||||
|
|
||||||
|
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||||
|
from zaqar.openstack.common import timeutils
|
||||||
|
from zaqar.queues import storage
|
||||||
|
from zaqar.queues.storage.redis import controllers
|
||||||
|
from zaqar.queues.storage.redis import driver
|
||||||
|
from zaqar.queues.storage.redis import messages
|
||||||
|
from zaqar.queues.storage.redis import options
|
||||||
|
from zaqar.queues.storage.redis import utils
|
||||||
|
from zaqar import tests as testing
|
||||||
|
from zaqar.tests.queues.storage import base
|
||||||
|
|
||||||
|
|
||||||
|
def _create_sample_message(now=None, claimed=False, body=None):
|
||||||
|
if now is None:
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
if claimed:
|
||||||
|
claim_id = uuid.uuid4()
|
||||||
|
claim_expires = now + 300
|
||||||
|
else:
|
||||||
|
claim_id = None
|
||||||
|
claim_expires = now
|
||||||
|
|
||||||
|
if body is None:
|
||||||
|
body = {}
|
||||||
|
|
||||||
|
return messages.Message(
|
||||||
|
ttl=60,
|
||||||
|
created=now,
|
||||||
|
client_uuid=uuid.uuid4(),
|
||||||
|
claim_id=claim_id,
|
||||||
|
claim_expires=claim_expires,
|
||||||
|
body=body
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class RedisUtilsTest(testing.TestBase):
|
||||||
|
|
||||||
|
config_file = 'wsgi_redis.conf'
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RedisUtilsTest, self).setUp()
|
||||||
|
|
||||||
|
self.conf.register_opts(options.REDIS_OPTIONS,
|
||||||
|
group=options.REDIS_GROUP)
|
||||||
|
|
||||||
|
self.redis_conf = self.conf[options.REDIS_GROUP]
|
||||||
|
|
||||||
|
MockDriver = collections.namedtuple('MockDriver', 'redis_conf')
|
||||||
|
|
||||||
|
self.driver = MockDriver(self.redis_conf)
|
||||||
|
|
||||||
|
def test_scope_queue_name(self):
|
||||||
|
self.assertEqual(utils.scope_queue_name('my-q'), '.my-q')
|
||||||
|
self.assertEqual(utils.scope_queue_name('my-q', None), '.my-q')
|
||||||
|
self.assertEqual(utils.scope_queue_name('my-q', '123'), '123.my-q')
|
||||||
|
self.assertEqual(utils.scope_queue_name('my-q_1', '123'), '123.my-q_1')
|
||||||
|
|
||||||
|
self.assertEqual(utils.scope_queue_name(), '.')
|
||||||
|
self.assertEqual(utils.scope_queue_name(None, '123'), '123.')
|
||||||
|
|
||||||
|
def test_scope_messages_set(self):
|
||||||
|
self.assertEqual(utils.scope_message_ids_set('my-q'), '.my-q.')
|
||||||
|
self.assertEqual(utils.scope_message_ids_set('my-q', 'p'), 'p.my-q.')
|
||||||
|
self.assertEqual(utils.scope_message_ids_set('my-q', 'p', 's'),
|
||||||
|
'p.my-q.s')
|
||||||
|
|
||||||
|
self.assertEqual(utils.scope_message_ids_set(None), '..')
|
||||||
|
self.assertEqual(utils.scope_message_ids_set(None, '123'), '123..')
|
||||||
|
self.assertEqual(utils.scope_message_ids_set(None, None, 's'), '..s')
|
||||||
|
|
||||||
|
def test_normalize_none_str(self):
|
||||||
|
self.assertEqual(utils.normalize_none_str('my-q'), 'my-q')
|
||||||
|
self.assertEqual(utils.normalize_none_str(None), '')
|
||||||
|
|
||||||
|
def test_msg_claimed_filter(self):
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
unclaimed_msg = _create_sample_message()
|
||||||
|
self.assertFalse(utils.msg_claimed_filter(unclaimed_msg, now))
|
||||||
|
|
||||||
|
claimed_msg = _create_sample_message(claimed=True)
|
||||||
|
self.assertTrue(utils.msg_claimed_filter(claimed_msg, now))
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Has a claim ID, but the claim is expired
|
||||||
|
claimed_msg.claim_expires = now - 60
|
||||||
|
self.assertFalse(utils.msg_claimed_filter(claimed_msg, now))
|
||||||
|
|
||||||
|
def test_descope_queue_name(self):
|
||||||
|
self.assertEqual(utils.descope_queue_name('p.q'), 'q')
|
||||||
|
self.assertEqual(utils.descope_queue_name('.q'), 'q')
|
||||||
|
self.assertEqual(utils.descope_queue_name('.'), '')
|
||||||
|
|
||||||
|
def test_msg_echo_filter(self):
|
||||||
|
msg = _create_sample_message()
|
||||||
|
self.assertTrue(utils.msg_echo_filter(msg, msg.client_uuid))
|
||||||
|
|
||||||
|
alt_uuid = utils.generate_uuid()
|
||||||
|
self.assertFalse(utils.msg_echo_filter(msg, alt_uuid))
|
||||||
|
|
||||||
|
def test_basic_message(self):
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
body = {
|
||||||
|
'msg': 'Hello Earthlings!',
|
||||||
|
'unicode': u'ab\u00e7',
|
||||||
|
'bytes': b'ab\xc3\xa7',
|
||||||
|
b'ab\xc3\xa7': 'one, two, three',
|
||||||
|
u'ab\u00e7': 'one, two, three',
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = _create_sample_message(now=now, body=body)
|
||||||
|
basic_msg = msg.to_basic(now + 5)
|
||||||
|
|
||||||
|
self.assertEqual(basic_msg['id'], msg.id)
|
||||||
|
self.assertEqual(basic_msg['age'], 5)
|
||||||
|
self.assertEqual(basic_msg['body'], body)
|
||||||
|
self.assertEqual(basic_msg['ttl'], msg.ttl)
|
||||||
|
|
||||||
|
def test_retries_on_connection_error(self):
|
||||||
|
num_calls = [0]
|
||||||
|
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def _raises_connection_error(self):
|
||||||
|
num_calls[0] += 1
|
||||||
|
raise redis.exceptions.ConnectionError
|
||||||
|
|
||||||
|
self.assertRaises(redis.exceptions.ConnectionError,
|
||||||
|
_raises_connection_error, self)
|
||||||
|
self.assertEqual(num_calls, [self.redis_conf.max_reconnect_attempts])
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_redis
|
||||||
|
class RedisDriverTest(testing.TestBase):
|
||||||
|
|
||||||
|
config_file = 'wsgi_redis.conf'
|
||||||
|
|
||||||
|
def test_db_instance(self):
|
||||||
|
cache = oslo_cache.get_cache()
|
||||||
|
redis_driver = driver.DataDriver(self.conf, cache)
|
||||||
|
|
||||||
|
self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_redis
|
||||||
|
class RedisQueuesTest(base.QueueControllerTest):
|
||||||
|
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_redis.conf'
|
||||||
|
controller_class = controllers.QueueController
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RedisQueuesTest, self).setUp()
|
||||||
|
self.connection = self.driver.connection
|
||||||
|
self.msg_controller = self.driver.message_controller
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(RedisQueuesTest, self).tearDown()
|
||||||
|
self.connection.flushdb()
|
||||||
|
|
||||||
|
def test_inc_counter(self):
|
||||||
|
queue_name = 'inc-counter'
|
||||||
|
self.controller.create(queue_name)
|
||||||
|
self.controller._inc_counter(queue_name, None, 10)
|
||||||
|
|
||||||
|
scoped_q_name = utils.scope_queue_name(queue_name)
|
||||||
|
count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0]
|
||||||
|
self.assertEqual(count, 10)
|
||||||
|
|
||||||
|
def test_inc_claimed(self):
|
||||||
|
self.addCleanup(self.controller.delete, 'test-queue',
|
||||||
|
project=self.project)
|
||||||
|
|
||||||
|
queue_name = 'inc-claimed'
|
||||||
|
|
||||||
|
self.controller.create(queue_name)
|
||||||
|
self.controller._inc_claimed(queue_name, None, 10)
|
||||||
|
|
||||||
|
scoped_q_name = utils.scope_queue_name(queue_name)
|
||||||
|
claimed = self.controller._get_queue_info(scoped_q_name,
|
||||||
|
b'cl', int)[0]
|
||||||
|
self.assertEqual(claimed, 10)
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_redis
|
||||||
|
class RedisMessagesTest(base.MessageControllerTest):
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_redis.conf'
|
||||||
|
controller_class = controllers.MessageController
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RedisMessagesTest, self).setUp()
|
||||||
|
self.connection = self.driver.connection
|
||||||
|
self.q_controller = self.driver.queue_controller
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(RedisMessagesTest, self).tearDown()
|
||||||
|
self.connection.flushdb()
|
||||||
|
|
||||||
|
def test_get_count(self):
|
||||||
|
queue_name = 'get-count'
|
||||||
|
self.q_controller.create(queue_name)
|
||||||
|
|
||||||
|
msgs = [{
|
||||||
|
'ttl': 300,
|
||||||
|
'body': 'di mo fy'
|
||||||
|
} for i in range(0, 10)]
|
||||||
|
|
||||||
|
client_id = uuid.uuid4()
|
||||||
|
# Creating 10 messages
|
||||||
|
self.controller.post(queue_name, msgs, client_id)
|
||||||
|
|
||||||
|
messages_set_id = utils.scope_message_ids_set(queue_name, None,
|
||||||
|
'messages')
|
||||||
|
|
||||||
|
num_msg = self.controller._get_count(messages_set_id)
|
||||||
|
self.assertEqual(num_msg, 10)
|
||||||
|
|
||||||
|
def test_empty_queue_exception(self):
|
||||||
|
queue_name = 'empty-queue-test'
|
||||||
|
self.q_controller.create(queue_name)
|
||||||
|
|
||||||
|
self.assertRaises(storage.errors.QueueIsEmpty,
|
||||||
|
self.controller.first, queue_name)
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_redis
|
||||||
|
class RedisClaimsTest(base.ClaimControllerTest):
|
||||||
|
driver_class = driver.DataDriver
|
||||||
|
config_file = 'wsgi_redis.conf'
|
||||||
|
controller_class = controllers.ClaimController
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RedisClaimsTest, self).setUp()
|
||||||
|
self.connection = self.driver.connection
|
||||||
|
self.q_controller = self.driver.queue_controller
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(RedisClaimsTest, self).tearDown()
|
||||||
|
self.connection.flushdb()
|
||||||
|
|
||||||
|
def test_claim_doesnt_exist(self):
|
||||||
|
queue_name = 'no-such-claim'
|
||||||
|
epoch = '000000000000000000000000'
|
||||||
|
self.q_controller.create(queue_name)
|
||||||
|
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||||
|
self.controller.get, queue_name,
|
||||||
|
epoch, project=None)
|
||||||
|
|
||||||
|
claim_id, messages = self.controller.create(queue_name, {'ttl': 2,
|
||||||
|
'grace': 0},
|
||||||
|
project=None)
|
||||||
|
|
||||||
|
# Lets let it expire
|
||||||
|
time.sleep(2)
|
||||||
|
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||||
|
self.controller.update, queue_name,
|
||||||
|
claim_id, {}, project=None)
|
@ -42,26 +42,34 @@ class Conflict(ExceptionBase):
|
|||||||
class MessageConflict(Conflict):
|
class MessageConflict(Conflict):
|
||||||
|
|
||||||
msg_format = (u'Message could not be enqueued due to a conflict '
|
msg_format = (u'Message could not be enqueued due to a conflict '
|
||||||
u'with another message that is already in '
|
u'with one or more other messages that are already in '
|
||||||
u'queue {queue} for project {project}')
|
u'queue {queue} for project {project}')
|
||||||
|
|
||||||
def __init__(self, queue, project, message_ids):
|
def __init__(self, queue, project):
|
||||||
"""Initializes the error with contextual information.
|
"""Initializes the error with contextual information.
|
||||||
|
|
||||||
:param queue: name of the queue to which the message was posted
|
:param queue: name of the queue to which the message was posted
|
||||||
|
|
||||||
:param project: name of the project to which the queue belongs
|
:param project: name of the project to which the queue belongs
|
||||||
:param message_ids: list of IDs for messages successfully
|
|
||||||
posted. Note that these must be in the same order as the
|
|
||||||
list of messages originally submitted to be enqueued.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super(MessageConflict, self).__init__(queue=queue, project=project)
|
super(MessageConflict, self).__init__(queue=queue, project=project)
|
||||||
self._succeeded_ids = message_ids
|
|
||||||
|
|
||||||
@property
|
|
||||||
def succeeded_ids(self):
|
class ClaimConflict(Conflict):
|
||||||
return self._succeeded_ids
|
|
||||||
|
msg_format = (u'Messages could not be claimed due to a conflict '
|
||||||
|
u'with another parallel claim that is already in '
|
||||||
|
u'queue {queue} for project {project}')
|
||||||
|
|
||||||
|
def __init__(self, queue, project):
|
||||||
|
"""Initializes the error with contextual information.
|
||||||
|
|
||||||
|
:param queue: name of the queue to which the message was posted
|
||||||
|
:param project: name of the project to which the queue belongs
|
||||||
|
"""
|
||||||
|
|
||||||
|
super(ClaimConflict, self).__init__(queue=queue, project=project)
|
||||||
|
|
||||||
|
|
||||||
class QueueDoesNotExist(DoesNotExist):
|
class QueueDoesNotExist(DoesNotExist):
|
||||||
|
@ -668,9 +668,7 @@ class MessageController(storage.Message):
|
|||||||
queue=queue_name,
|
queue=queue_name,
|
||||||
project=project))
|
project=project))
|
||||||
|
|
||||||
succeeded_ids = []
|
raise errors.MessageConflict(queue_name, project)
|
||||||
raise errors.MessageConflict(queue_name, project,
|
|
||||||
succeeded_ids)
|
|
||||||
|
|
||||||
@utils.raises_conn_error
|
@utils.raises_conn_error
|
||||||
@utils.retries_on_autoreconnect
|
@utils.retries_on_autoreconnect
|
||||||
|
0
zaqar/queues/storage/redis/__init__.py
Normal file
0
zaqar/queues/storage/redis/__init__.py
Normal file
368
zaqar/queues/storage/redis/claims.py
Normal file
368
zaqar/queues/storage/redis/claims.py
Normal file
@ -0,0 +1,368 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
import msgpack
|
||||||
|
import redis
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar.openstack.common import log as logging
|
||||||
|
from zaqar.openstack.common import timeutils
|
||||||
|
from zaqar.queues import storage
|
||||||
|
from zaqar.queues.storage import errors
|
||||||
|
from zaqar.queues.storage.redis import messages
|
||||||
|
from zaqar.queues.storage.redis import utils
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
QUEUE_CLAIMS_SUFFIX = 'claims'
|
||||||
|
CLAIM_MESSAGES_SUFFIX = 'messages'
|
||||||
|
|
||||||
|
RETRY_CLAIM_TIMEOUT = 10
|
||||||
|
|
||||||
|
|
||||||
|
class ClaimController(storage.Claim):
|
||||||
|
"""Implements claim resource operations using Redis.
|
||||||
|
|
||||||
|
Redis Data Structures:
|
||||||
|
----------------------
|
||||||
|
Claims list (Redis set) contains claim ids
|
||||||
|
|
||||||
|
Key: <project-id_q-name>
|
||||||
|
|
||||||
|
Name Field
|
||||||
|
-------------------------
|
||||||
|
claim_ids m
|
||||||
|
|
||||||
|
Claimed Messages (Redis set) contains the list of
|
||||||
|
message ids stored per claim
|
||||||
|
|
||||||
|
Key: <claim_id>_messages
|
||||||
|
|
||||||
|
Claim info(Redis Hash):
|
||||||
|
|
||||||
|
Key: <claim_id>
|
||||||
|
|
||||||
|
Name Field
|
||||||
|
-------------------------
|
||||||
|
ttl -> t
|
||||||
|
id -> id
|
||||||
|
expires -> e
|
||||||
|
"""
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(ClaimController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
self._packer = msgpack.Packer(encoding='utf-8',
|
||||||
|
use_bin_type=True).pack
|
||||||
|
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
|
||||||
|
|
||||||
|
def _get_claim_info(self, claim_id, fields, transform=int):
|
||||||
|
"""Get one or more fields from the claim Info."""
|
||||||
|
|
||||||
|
values = self._client.hmget(claim_id, fields)
|
||||||
|
return [transform(v) for v in values] if transform else values
|
||||||
|
|
||||||
|
def _exists(self, queue, claim_id, project):
|
||||||
|
client = self._client
|
||||||
|
claims_set_key = utils.scope_claims_set(queue, project,
|
||||||
|
QUEUE_CLAIMS_SUFFIX)
|
||||||
|
|
||||||
|
# Return False if no such claim exists
|
||||||
|
# TODO(prashanthr_): Discuss the feasibility of a bloom filter.
|
||||||
|
if not client.sismember(claims_set_key, claim_id):
|
||||||
|
return False
|
||||||
|
|
||||||
|
expires = self._get_claim_info(claim_id, b'e')[0]
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
if now > expires:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _get_claimed_message_keys(self, claim_id):
|
||||||
|
return self._client.lrange(claim_id, 0, -1)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _message_ctrl(self):
|
||||||
|
return self.driver.message_controller
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _queue_ctrl(self):
|
||||||
|
return self.driver.queue_controller
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def get(self, queue, claim_id, project=None):
|
||||||
|
if not self._exists(queue, claim_id, project):
|
||||||
|
raise errors.ClaimDoesNotExist(queue, project, claim_id)
|
||||||
|
|
||||||
|
claim_msgs_key = utils.scope_claim_messages(claim_id,
|
||||||
|
CLAIM_MESSAGES_SUFFIX)
|
||||||
|
|
||||||
|
# basic_messages
|
||||||
|
msg_keys = self._get_claimed_message_keys(claim_msgs_key)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for key in msg_keys:
|
||||||
|
pipe.hgetall(key)
|
||||||
|
|
||||||
|
raw_messages = pipe.execute()
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
basic_messages = [messages.Message.from_redis(msg).to_basic(now)
|
||||||
|
for msg in raw_messages if msg]
|
||||||
|
|
||||||
|
# claim_meta
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])
|
||||||
|
update_time = expires - ttl
|
||||||
|
age = now - update_time
|
||||||
|
|
||||||
|
claim_meta = {
|
||||||
|
'age': age,
|
||||||
|
'ttl': ttl,
|
||||||
|
'id': claim_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
return claim_meta, basic_messages
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def create(self, queue, metadata, project=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
||||||
|
|
||||||
|
ttl = int(metadata.get('ttl', 60))
|
||||||
|
grace = int(metadata.get('grace', 60))
|
||||||
|
msg_ttl = ttl + grace
|
||||||
|
|
||||||
|
claim_id = utils.generate_uuid()
|
||||||
|
claim_key = utils.scope_claim_messages(claim_id,
|
||||||
|
CLAIM_MESSAGES_SUFFIX)
|
||||||
|
|
||||||
|
claims_set_key = utils.scope_claims_set(queue, project,
|
||||||
|
QUEUE_CLAIMS_SUFFIX)
|
||||||
|
|
||||||
|
counter_key = self._queue_ctrl._claim_counter_key(queue, project)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
|
||||||
|
start_ts = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Retry the operation if another transaction
|
||||||
|
# completes before this one, in which case it will have
|
||||||
|
# claimed the same messages the current thread is trying
|
||||||
|
# to claim, and therefoe we must try for another batch.
|
||||||
|
#
|
||||||
|
# This loop will eventually time out if we can't manage to
|
||||||
|
# claim any messages due to other threads continually beating
|
||||||
|
# us to the punch.
|
||||||
|
|
||||||
|
# TODO(kgriffs): Would it be beneficial (or harmful) to
|
||||||
|
# introducce a backoff sleep in between retries?
|
||||||
|
while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT:
|
||||||
|
|
||||||
|
# NOTE(kgriffs): The algorithm for claiming messages:
|
||||||
|
#
|
||||||
|
# 1. Get a batch of messages that are currently active.
|
||||||
|
# 2. For each active message in the batch, extend its
|
||||||
|
# lifetime IFF it would otherwise expire before the
|
||||||
|
# claim itself does.
|
||||||
|
# 3. Associate the claim with each message
|
||||||
|
# 4. Create a claim record with details such as TTL
|
||||||
|
# and expiration time.
|
||||||
|
# 5. Add the claim's ID to a set to facilitate fast
|
||||||
|
# existence checks.
|
||||||
|
|
||||||
|
results = self._message_ctrl._active(queue, project=project,
|
||||||
|
limit=limit)
|
||||||
|
|
||||||
|
cursor = next(results)
|
||||||
|
msg_list = list(cursor)
|
||||||
|
|
||||||
|
# NOTE(kgriffs): If there are no active messages to
|
||||||
|
# claim, simply return an empty list.
|
||||||
|
if not msg_list:
|
||||||
|
return (None, iter([]))
|
||||||
|
|
||||||
|
basic_messages = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
# TODO(kgriffs): Is it faster/better to do this all
|
||||||
|
# in a Lua script instead of using an app-layer
|
||||||
|
# transaction?
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Abort the entire transaction if
|
||||||
|
# another request beats us to the punch. We detect
|
||||||
|
# this by putting a watch on the key that will have
|
||||||
|
# one of its fields updated as the final step of
|
||||||
|
# the transaction.
|
||||||
|
pipe.watch(counter_key)
|
||||||
|
pipe.multi()
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
claim_expires = now + ttl
|
||||||
|
msg_expires = claim_expires + grace
|
||||||
|
|
||||||
|
# Associate the claim with each message
|
||||||
|
for msg in msg_list:
|
||||||
|
msg.claim_id = claim_id
|
||||||
|
msg.claim_expires = claim_expires
|
||||||
|
|
||||||
|
if _msg_would_expire(msg, msg_expires):
|
||||||
|
msg.ttl = msg_ttl
|
||||||
|
msg.expires = msg_expires
|
||||||
|
|
||||||
|
pipe.rpush(claim_key, msg.id)
|
||||||
|
|
||||||
|
# TODO(kgriffs): Rather than writing back the
|
||||||
|
# entire message, only set the fields that
|
||||||
|
# have changed.
|
||||||
|
msg.to_redis(pipe)
|
||||||
|
|
||||||
|
basic_messages.append(msg.to_basic(now))
|
||||||
|
|
||||||
|
# Create the claim
|
||||||
|
claim_info = {
|
||||||
|
'id': claim_id,
|
||||||
|
't': ttl,
|
||||||
|
'e': claim_expires
|
||||||
|
}
|
||||||
|
|
||||||
|
pipe.hmset(claim_id, claim_info)
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Add the claim ID to a set so that
|
||||||
|
# existence checks can be performed quickly.
|
||||||
|
pipe.sadd(claims_set_key, claim_id)
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Update a counter that facilitates
|
||||||
|
# the queue stats calculation.
|
||||||
|
self._queue_ctrl._inc_claimed(queue, project,
|
||||||
|
len(msg_list),
|
||||||
|
pipe=pipe)
|
||||||
|
|
||||||
|
pipe.execute()
|
||||||
|
return claim_id, basic_messages
|
||||||
|
|
||||||
|
except redis.exceptions.WatchError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
raise errors.ClaimConflict(queue, project)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def update(self, queue, claim_id, metadata, project=None):
|
||||||
|
if not self._exists(queue, claim_id, project):
|
||||||
|
raise errors.ClaimDoesNotExist(claim_id, queue, project)
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
claim_ttl = int(metadata.get('ttl', 60))
|
||||||
|
claim_expires = now + claim_ttl
|
||||||
|
|
||||||
|
grace = int(metadata.get('grace', 60))
|
||||||
|
msg_ttl = claim_ttl + grace
|
||||||
|
msg_expires = claim_expires + grace
|
||||||
|
|
||||||
|
claim_messages = utils.scope_claim_messages(claim_id,
|
||||||
|
CLAIM_MESSAGES_SUFFIX)
|
||||||
|
|
||||||
|
msg_keys = self._get_claimed_message_keys(claim_messages)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for key in msg_keys:
|
||||||
|
pipe.hgetall(key)
|
||||||
|
|
||||||
|
claimed_msgs = pipe.execute()
|
||||||
|
|
||||||
|
claim_info = {
|
||||||
|
't': claim_ttl,
|
||||||
|
'e': claim_expires,
|
||||||
|
}
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for msg in claimed_msgs:
|
||||||
|
if msg:
|
||||||
|
msg = messages.Message.from_redis(msg)
|
||||||
|
msg.claim_id = claim_id
|
||||||
|
msg.claim_expires = claim_expires
|
||||||
|
|
||||||
|
if _msg_would_expire(msg, msg_expires):
|
||||||
|
msg.ttl = msg_ttl
|
||||||
|
msg.expires = msg_expires
|
||||||
|
|
||||||
|
# TODO(kgriffs): Rather than writing back the
|
||||||
|
# entire message, only set the fields that
|
||||||
|
# have changed.
|
||||||
|
msg.to_redis(pipe)
|
||||||
|
|
||||||
|
# Update the claim id and claim expiration info
|
||||||
|
# for all the messages.
|
||||||
|
pipe.hmset(claim_id, claim_info)
|
||||||
|
|
||||||
|
pipe.execute()
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def delete(self, queue, claim_id, project=None):
|
||||||
|
# NOTE(prashanthr_): Return silently when the claim
|
||||||
|
# does not exist
|
||||||
|
if not self._exists(queue, claim_id, project):
|
||||||
|
return
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
claim_messages_key = utils.scope_claim_messages(claim_id,
|
||||||
|
CLAIM_MESSAGES_SUFFIX)
|
||||||
|
|
||||||
|
msg_keys = self._get_claimed_message_keys(claim_messages_key)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for msg_key in msg_keys:
|
||||||
|
pipe.hgetall(msg_key)
|
||||||
|
|
||||||
|
claimed_msgs = pipe.execute()
|
||||||
|
|
||||||
|
# Update the claim id and claim expiration info
|
||||||
|
# for all the messages.
|
||||||
|
claims_set_key = utils.scope_claims_set(queue, project,
|
||||||
|
QUEUE_CLAIMS_SUFFIX)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
pipe.srem(claims_set_key, claim_id)
|
||||||
|
pipe.delete(claim_id)
|
||||||
|
pipe.delete(claim_messages_key)
|
||||||
|
|
||||||
|
for msg in claimed_msgs:
|
||||||
|
if msg:
|
||||||
|
msg = messages.Message.from_redis(msg)
|
||||||
|
msg.claim_id = None
|
||||||
|
msg.claim_expires = now
|
||||||
|
|
||||||
|
# TODO(kgriffs): Rather than writing back the
|
||||||
|
# entire message, only set the fields that
|
||||||
|
# have changed.
|
||||||
|
msg.to_redis(pipe)
|
||||||
|
|
||||||
|
self._queue_ctrl._inc_claimed(queue, project,
|
||||||
|
-1 * len(claimed_msgs),
|
||||||
|
pipe=pipe)
|
||||||
|
|
||||||
|
pipe.execute()
|
||||||
|
|
||||||
|
|
||||||
|
def _msg_would_expire(message, now):
|
||||||
|
return message.expires < now
|
22
zaqar/queues/storage/redis/controllers.py
Normal file
22
zaqar/queues/storage/redis/controllers.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from zaqar.queues.storage.redis import claims
|
||||||
|
from zaqar.queues.storage.redis import messages
|
||||||
|
from zaqar.queues.storage.redis import queues
|
||||||
|
|
||||||
|
|
||||||
|
QueueController = queues.QueueController
|
||||||
|
MessageController = messages.MessageController
|
||||||
|
ClaimController = claims.ClaimController
|
107
zaqar/queues/storage/redis/driver.py
Normal file
107
zaqar/queues/storage/redis/driver.py
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import redis
|
||||||
|
from six.moves import urllib
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar.openstack.common import log as logging
|
||||||
|
from zaqar.queues import storage
|
||||||
|
from zaqar.queues.storage.redis import controllers
|
||||||
|
from zaqar.queues.storage.redis import options
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_redis_client(conf):
|
||||||
|
# TODO(prashanthr_): Add SSL support
|
||||||
|
parsed_url = urllib.parse.urlparse(conf.uri)
|
||||||
|
|
||||||
|
if parsed_url.hostname:
|
||||||
|
return redis.StrictRedis(host=parsed_url.hostname,
|
||||||
|
port=parsed_url.port)
|
||||||
|
else:
|
||||||
|
return redis.StrictRedis(unix_socket_path=parsed_url.path)
|
||||||
|
|
||||||
|
|
||||||
|
class DataDriver(storage.DataDriverBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, cache):
|
||||||
|
super(DataDriver, self).__init__(conf, cache)
|
||||||
|
|
||||||
|
opts = options.REDIS_OPTIONS
|
||||||
|
|
||||||
|
if 'dynamic' in conf:
|
||||||
|
names = conf[options.REDIS_GROUP].keys()
|
||||||
|
opts = filter(lambda x: x.name not in names, opts)
|
||||||
|
|
||||||
|
self.conf.register_opts(opts,
|
||||||
|
group=options.REDIS_GROUP)
|
||||||
|
self.redis_conf = self.conf[options.REDIS_GROUP]
|
||||||
|
|
||||||
|
def is_alive(self):
|
||||||
|
try:
|
||||||
|
return self.connection.ping()
|
||||||
|
except redis.exceptions.ConnectionError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _health(self):
|
||||||
|
KPI = {}
|
||||||
|
KPI['storage_reachable'] = self.is_alive()
|
||||||
|
KPI['operation_status'] = self._get_operation_status()
|
||||||
|
|
||||||
|
# TODO(kgriffs): Add metrics re message volume
|
||||||
|
return KPI
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def connection(self):
|
||||||
|
"""Redis client connection instance."""
|
||||||
|
return _get_redis_client(self.redis_conf)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def queue_controller(self):
|
||||||
|
return controllers.QueueController(self)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def message_controller(self):
|
||||||
|
return controllers.MessageController(self)
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def claim_controller(self):
|
||||||
|
return controllers.ClaimController(self)
|
||||||
|
|
||||||
|
|
||||||
|
class ControlDriver(storage.ControlDriverBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, cache):
|
||||||
|
super(ControlDriver, self).__init__(conf, cache)
|
||||||
|
|
||||||
|
self.conf.register_opts(options.REDIS_OPTIONS,
|
||||||
|
group=options.REDIS_GROUP)
|
||||||
|
|
||||||
|
self.redis_conf = self.conf[options.REDIS_GROUP]
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def connection(self):
|
||||||
|
"""Redis client connection instance."""
|
||||||
|
return _get_redis_client(self.redis_conf)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pools_controller(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def catalogue_controller(self):
|
||||||
|
return None
|
483
zaqar/queues/storage/redis/messages.py
Normal file
483
zaqar/queues/storage/redis/messages.py
Normal file
@ -0,0 +1,483 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import redis
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar.openstack.common import strutils
|
||||||
|
from zaqar.openstack.common import timeutils
|
||||||
|
from zaqar.queues import storage
|
||||||
|
from zaqar.queues.storage import errors
|
||||||
|
from zaqar.queues.storage.redis import models
|
||||||
|
from zaqar.queues.storage.redis import utils
|
||||||
|
|
||||||
|
Message = models.Message
|
||||||
|
|
||||||
|
|
||||||
|
MESSAGE_IDS_SUFFIX = 'messages'
|
||||||
|
# The rank counter is an atomic index to rank messages
|
||||||
|
# in a FIFO manner.
|
||||||
|
MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
|
||||||
|
|
||||||
|
# NOTE(kgriffs): This value, in seconds, should be at least less than the
|
||||||
|
# minimum allowed TTL for messages (60 seconds).
|
||||||
|
RETRY_POST_TIMEOUT = 10
|
||||||
|
|
||||||
|
|
||||||
|
class MessageController(storage.Message):
|
||||||
|
"""Implements message resource operations using Redis.
|
||||||
|
|
||||||
|
Messages are scoped by project + queue.
|
||||||
|
|
||||||
|
Redis Data Structures:
|
||||||
|
----------------------
|
||||||
|
1. Message id's list (Redis sorted set)
|
||||||
|
|
||||||
|
Each queue in the system has a set of message ids currently
|
||||||
|
in the queue. The list is sorted based on a ranking which is
|
||||||
|
incremented atomically using the counter(MESSAGE_RANK_COUNTER_SUFFIX)
|
||||||
|
also stored in the database for every queue.
|
||||||
|
|
||||||
|
Key: <project-id.q-name>
|
||||||
|
|
||||||
|
2. Messages(Redis Hash):
|
||||||
|
|
||||||
|
Scoped by the UUID of the message, the redis datastructure
|
||||||
|
has the following information.
|
||||||
|
|
||||||
|
|
||||||
|
Name Field
|
||||||
|
-----------------------------
|
||||||
|
id -> id
|
||||||
|
ttl -> t
|
||||||
|
expires -> e
|
||||||
|
body -> b
|
||||||
|
claim -> c
|
||||||
|
claim expiry time -> c.e
|
||||||
|
client uuid -> u
|
||||||
|
created time -> cr
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(MessageController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _queue_ctrl(self):
|
||||||
|
return self.driver.queue_controller
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _claim_ctrl(self):
|
||||||
|
return self.driver.claim_controller
|
||||||
|
|
||||||
|
def _active(self, queue_name, marker=None, echo=False,
|
||||||
|
client_uuid=None, project=None,
|
||||||
|
limit=None):
|
||||||
|
return self._list(queue_name, project=project, marker=marker,
|
||||||
|
echo=echo, client_uuid=client_uuid,
|
||||||
|
include_claimed=False,
|
||||||
|
limit=limit, to_basic=False)
|
||||||
|
|
||||||
|
def _get_count(self, msgset_key):
|
||||||
|
"""Get num messages in a Queue.
|
||||||
|
|
||||||
|
Return the number of messages in a queue scoped by
|
||||||
|
queue and project.
|
||||||
|
"""
|
||||||
|
return self._client.zcard(msgset_key)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def _delete_queue_messages(self, queue, project, pipe):
|
||||||
|
"""Method to remove all the messages belonging to a queue.
|
||||||
|
|
||||||
|
Will be referenced from the QueueController.
|
||||||
|
The pipe to execute deletion will be passed from the QueueController
|
||||||
|
executing the operation.
|
||||||
|
"""
|
||||||
|
client = self._client
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
message_ids = client.zrange(msgset_key, 0, -1)
|
||||||
|
|
||||||
|
pipe.delete(msgset_key)
|
||||||
|
for msg_id in message_ids:
|
||||||
|
pipe.delete(msg_id)
|
||||||
|
|
||||||
|
# TODO(prashanthr_): Looking for better ways to solve the issue.
|
||||||
|
def _find_first_unclaimed(self, queue, project, limit):
|
||||||
|
"""Find the first unclaimed message in the queue."""
|
||||||
|
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
marker = 0
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): This will not be an infinite loop.
|
||||||
|
while True:
|
||||||
|
msg_keys = self._client.zrange(msgset_key, marker,
|
||||||
|
marker + limit)
|
||||||
|
if msg_keys:
|
||||||
|
messages = [Message.from_redis(self._client.hgetall(msg_key))
|
||||||
|
for msg_key in msg_keys]
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
if not utils.msg_claimed_filter(msg, now):
|
||||||
|
return msg.id
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _exists(self, key):
|
||||||
|
"""Check if message exists in the Queue.
|
||||||
|
|
||||||
|
Helper function which checks if a particular message_id
|
||||||
|
exists in the sorted set of the queues message ids.
|
||||||
|
"""
|
||||||
|
return self._client.exists(key)
|
||||||
|
|
||||||
|
def _get_first_message_id(self, queue, project, sort):
|
||||||
|
"""Fetch head/tail of the Queue.
|
||||||
|
|
||||||
|
Helper function to get the first message in the queue
|
||||||
|
sort > 0 get from the left else from the right.
|
||||||
|
"""
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
|
||||||
|
sorter = self._client.zrange if sort == 1 else self._client.zrevrange
|
||||||
|
message_ids = sorter(msgset_key, 0, 0)
|
||||||
|
return message_ids[0] if message_ids else None
|
||||||
|
|
||||||
|
def _get(self, message_id):
|
||||||
|
msg = self._client.hgetall(message_id)
|
||||||
|
return Message.from_redis(msg) if msg else None
|
||||||
|
|
||||||
|
def _get_claim(self, message_id):
|
||||||
|
claim = self._client.hmget(message_id, 'c', 'c.e')
|
||||||
|
|
||||||
|
if claim == [None, None]:
|
||||||
|
# NOTE(kgriffs): message_id was not found
|
||||||
|
return None
|
||||||
|
|
||||||
|
return {
|
||||||
|
# NOTE(kgriffs): A "None" claim is serialized as an empty str
|
||||||
|
'id': strutils.safe_decode(claim[0]) or None,
|
||||||
|
'expires': int(claim[1]),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _list(self, queue, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
|
echo=False, client_uuid=None,
|
||||||
|
include_claimed=False, to_basic=True):
|
||||||
|
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue,
|
||||||
|
project)
|
||||||
|
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue,
|
||||||
|
project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
client = self._client
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
# NOTE(prashanthr_): Iterate through the queue to find the first
|
||||||
|
# unclaimed message.
|
||||||
|
if not marker and not include_claimed:
|
||||||
|
marker = self._find_first_unclaimed(queue, project, limit)
|
||||||
|
start = client.zrank(msgset_key, marker) or 0
|
||||||
|
else:
|
||||||
|
rank = client.zrank(msgset_key, marker)
|
||||||
|
start = rank + 1 if rank else 0
|
||||||
|
|
||||||
|
message_ids = client.zrange(msgset_key, start,
|
||||||
|
start + (limit - 1))
|
||||||
|
|
||||||
|
for msg_id in message_ids:
|
||||||
|
pipe.hgetall(msg_id)
|
||||||
|
|
||||||
|
messages = pipe.execute()
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): Build a list of filters for checking
|
||||||
|
# the following:
|
||||||
|
#
|
||||||
|
# 1. Message is expired
|
||||||
|
# 2. Message is claimed
|
||||||
|
# 3. Message should not be echoed
|
||||||
|
#
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
filters = [functools.partial(utils.msg_expired_filter, now=now)]
|
||||||
|
|
||||||
|
if not include_claimed:
|
||||||
|
filters.append(functools.partial(utils.msg_claimed_filter,
|
||||||
|
now=now))
|
||||||
|
|
||||||
|
if not echo:
|
||||||
|
filters.append(functools.partial(utils.msg_echo_filter,
|
||||||
|
client_uuid=client_uuid))
|
||||||
|
|
||||||
|
marker = {}
|
||||||
|
|
||||||
|
yield _filter_messages(messages, pipe, filters, to_basic, marker)
|
||||||
|
yield marker['next']
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def list(self, queue, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
|
echo=False, client_uuid=None,
|
||||||
|
include_claimed=False):
|
||||||
|
|
||||||
|
return self._list(queue, project, marker, limit, echo,
|
||||||
|
client_uuid, include_claimed)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def first(self, queue, project=None, sort=1):
|
||||||
|
if sort not in (1, -1):
|
||||||
|
raise ValueError(u'sort must be either 1 (ascending) '
|
||||||
|
u'or -1 (descending)')
|
||||||
|
|
||||||
|
message_id = self._get_first_message_id(queue, project, sort)
|
||||||
|
if not message_id:
|
||||||
|
raise errors.QueueIsEmpty(queue, project)
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
return self._get(message_id).to_basic(now, include_created=True)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def get(self, queue, message_id, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
message = self._get(message_id)
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
if message and not utils.msg_expired_filter(message, now):
|
||||||
|
return message.to_basic(now)
|
||||||
|
else:
|
||||||
|
raise errors.MessageDoesNotExist(message_id, queue, project)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def bulk_get(self, queue, message_ids, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): Pipelining is used here purely
|
||||||
|
# for performance.
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for mid in message_ids:
|
||||||
|
pipe.hgetall(mid)
|
||||||
|
|
||||||
|
messages = pipe.execute()
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Skip messages that may have been deleted
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
return (Message.from_redis(msg).to_basic(now)
|
||||||
|
for msg in messages if msg)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def post(self, queue, messages, client_uuid, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
|
||||||
|
counter_key = utils.scope_queue_index(queue, project,
|
||||||
|
MESSAGE_RANK_COUNTER_SUFFIX)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
|
||||||
|
start_ts = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Retry the operation if another transaction
|
||||||
|
# completes before this one, in which case it may have
|
||||||
|
# posted messages with the same rank counter the current
|
||||||
|
# thread is trying to use, which would cause messages
|
||||||
|
# to get out of order and introduce the risk of a client
|
||||||
|
# missing a message while reading from the queue.
|
||||||
|
#
|
||||||
|
# This loop will eventually time out if we can't manage to
|
||||||
|
# post any messages due to other threads continually beating
|
||||||
|
# us to the punch.
|
||||||
|
|
||||||
|
# TODO(kgriffs): Would it be beneficial (or harmful) to
|
||||||
|
# introducce a backoff sleep in between retries?
|
||||||
|
while (timeutils.utcnow_ts() - start_ts) < RETRY_POST_TIMEOUT:
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
prepared_messages = [
|
||||||
|
Message(
|
||||||
|
ttl=msg['ttl'],
|
||||||
|
created=now,
|
||||||
|
client_uuid=client_uuid,
|
||||||
|
claim_id=None,
|
||||||
|
claim_expires=now,
|
||||||
|
body=msg.get('body', {}),
|
||||||
|
)
|
||||||
|
|
||||||
|
for msg in messages
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
pipe.watch(counter_key)
|
||||||
|
|
||||||
|
rank_counter = pipe.get(counter_key)
|
||||||
|
rank_counter = int(rank_counter) if rank_counter else 0
|
||||||
|
|
||||||
|
pipe.multi()
|
||||||
|
|
||||||
|
keys = []
|
||||||
|
for i, msg in enumerate(prepared_messages):
|
||||||
|
msg.to_redis(pipe)
|
||||||
|
pipe.zadd(msgset_key, rank_counter + i, msg.id)
|
||||||
|
keys.append(msg.id)
|
||||||
|
|
||||||
|
pipe.incrby(counter_key, len(keys))
|
||||||
|
self._queue_ctrl._inc_counter(queue, project,
|
||||||
|
len(prepared_messages),
|
||||||
|
pipe=pipe)
|
||||||
|
|
||||||
|
pipe.execute()
|
||||||
|
return keys
|
||||||
|
|
||||||
|
except redis.exceptions.WatchError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
raise errors.MessageConflict(queue, project)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def delete(self, queue, message_id, project=None, claim=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue, project)
|
||||||
|
|
||||||
|
# TODO(kgriffs): Create decorator for validating claim and message
|
||||||
|
# IDs, since those are not checked at the transport layer. This
|
||||||
|
# decorator should be applied to all relevant methods.
|
||||||
|
if claim is not None:
|
||||||
|
try:
|
||||||
|
uuid.UUID(claim)
|
||||||
|
except ValueError:
|
||||||
|
raise errors.ClaimDoesNotExist(queue, project, claim)
|
||||||
|
|
||||||
|
msg_claim = self._get_claim(message_id)
|
||||||
|
|
||||||
|
# NOTE(kgriffs): The message does not exist, so
|
||||||
|
# it is essentially "already deleted".
|
||||||
|
if msg_claim is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
is_claimed = msg_claim['id'] and (now < msg_claim['expires'])
|
||||||
|
|
||||||
|
if claim is None:
|
||||||
|
if is_claimed:
|
||||||
|
raise errors.MessageIsClaimed(message_id)
|
||||||
|
|
||||||
|
elif not is_claimed:
|
||||||
|
raise errors.MessageNotClaimed(message_id)
|
||||||
|
|
||||||
|
elif msg_claim['id'] != claim:
|
||||||
|
if not self._claim_ctrl._exists(queue, claim, project):
|
||||||
|
raise errors.ClaimDoesNotExist(queue, project, claim)
|
||||||
|
|
||||||
|
raise errors.MessageNotClaimedBy(message_id, claim)
|
||||||
|
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
results = pipe.delete(message_id).zrem(msgset_key,
|
||||||
|
message_id).execute()
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): results[0] is 1 when the delete is
|
||||||
|
# successful. Hence we use that case to identify successful
|
||||||
|
# deletes.
|
||||||
|
if results[0] == 1:
|
||||||
|
self._queue_ctrl._inc_counter(queue, project, -1)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def bulk_delete(self, queue, message_ids, project=None):
|
||||||
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
|
raise errors.QueueDoesNotExist(queue,
|
||||||
|
project)
|
||||||
|
|
||||||
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for message_id in message_ids:
|
||||||
|
pipe.delete(message_id).zrem(msgset_key, message_id)
|
||||||
|
|
||||||
|
results = pipe.execute()
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): None is returned for the cases where
|
||||||
|
# the message might not exist or has been deleted/expired.
|
||||||
|
# Hence we calculate the number of deletes as the
|
||||||
|
# total number of message ids - number of failed deletes.
|
||||||
|
amount = -1 * (len(results) - results.count(0)) / 2
|
||||||
|
self._queue_ctrl._inc_counter(queue, project, int(amount))
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def pop(self, queue, limit, project=None):
|
||||||
|
# Pop is implemented as a chain of the following operations:
|
||||||
|
# 1. Create a claim.
|
||||||
|
# 2. Delete the messages claimed.
|
||||||
|
# 3. Delete the claim.
|
||||||
|
|
||||||
|
claim_id, messages = self._claim_ctrl.create(
|
||||||
|
queue, dict(ttl=1, grace=0), project, limit=limit)
|
||||||
|
|
||||||
|
message_ids = [message['id'] for message in messages]
|
||||||
|
self.bulk_delete(queue, message_ids, project)
|
||||||
|
# NOTE(prashanthr_): Creating a claim controller reference
|
||||||
|
# causes a recursive reference. Hence, using the reference
|
||||||
|
# from the driver.
|
||||||
|
self._claim_ctrl.delete(queue, claim_id, project)
|
||||||
|
return messages
|
||||||
|
|
||||||
|
|
||||||
|
def _filter_messages(messages, pipe, filters, to_basic, marker):
|
||||||
|
"""Create a filtering iterator over a list of messages.
|
||||||
|
|
||||||
|
The function accepts a list of filters to be filtered
|
||||||
|
before the the message can be included as a part of the reply.
|
||||||
|
"""
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
# NOTE(kgriffs): Message may have been deleted, so
|
||||||
|
# check each value to ensure we got a message back
|
||||||
|
if msg:
|
||||||
|
msg = Message.from_redis(msg)
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Check to see if any of the filters
|
||||||
|
# indiciate that this message should be skipped.
|
||||||
|
for should_skip in filters:
|
||||||
|
if should_skip(msg):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
marker['next'] = msg.id
|
||||||
|
|
||||||
|
if to_basic:
|
||||||
|
yield msg.to_basic(now)
|
||||||
|
else:
|
||||||
|
yield msg
|
124
zaqar/queues/storage/redis/models.py
Normal file
124
zaqar/queues/storage/redis/models.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import msgpack
|
||||||
|
|
||||||
|
from zaqar.openstack.common import strutils
|
||||||
|
from zaqar.openstack.common import timeutils
|
||||||
|
|
||||||
|
|
||||||
|
_pack = msgpack.Packer(encoding='utf-8', use_bin_type=True).pack
|
||||||
|
_unpack = functools.partial(msgpack.unpackb, encoding='utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
# TODO(kgriffs): Make similar classes for claims and queues
|
||||||
|
class Message(object):
|
||||||
|
"""Message is used to organize,store and retrieve messages from redis.
|
||||||
|
|
||||||
|
Message class helps organize,store and retrieve messages in a version
|
||||||
|
compatible manner.
|
||||||
|
|
||||||
|
:param id: Message ID in the form of a hexadecimal UUID. If not
|
||||||
|
given, one will be automatically generated.
|
||||||
|
:param ttl: Message TTL in seconds
|
||||||
|
:param created: Message creation time as a UNIX timestamp
|
||||||
|
:param client_uuid: UUID of the client that posted the message
|
||||||
|
:param claim_id: If claimed, the UUID of the claim. Set to None
|
||||||
|
for messages that have never been claimed.
|
||||||
|
:param claim_expires: Claim expiration as a UNIX timestamp
|
||||||
|
:param body: Message payload. Must be serializable to mspack.
|
||||||
|
"""
|
||||||
|
message_data = {}
|
||||||
|
|
||||||
|
__slots__ = (
|
||||||
|
'id',
|
||||||
|
'ttl',
|
||||||
|
'created',
|
||||||
|
'expires',
|
||||||
|
'client_uuid',
|
||||||
|
'claim_id',
|
||||||
|
'claim_expires',
|
||||||
|
'body',
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.id = kwargs.get('id', str(uuid.uuid4()))
|
||||||
|
self.ttl = kwargs['ttl']
|
||||||
|
self.created = kwargs['created']
|
||||||
|
self.expires = kwargs.get('expires', self.created + self.ttl)
|
||||||
|
|
||||||
|
self.client_uuid = str(kwargs['client_uuid'])
|
||||||
|
|
||||||
|
self.claim_id = kwargs.get('claim_id')
|
||||||
|
self.claim_expires = kwargs['claim_expires']
|
||||||
|
|
||||||
|
self.body = kwargs['body']
|
||||||
|
|
||||||
|
@property
|
||||||
|
def created_iso(self):
|
||||||
|
return timeutils.iso8601_from_timestamp(self.created)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_redis(doc):
|
||||||
|
claim_id = doc[b'c']
|
||||||
|
if claim_id:
|
||||||
|
claim_id = strutils.safe_decode(claim_id)
|
||||||
|
else:
|
||||||
|
claim_id = None
|
||||||
|
|
||||||
|
# NOTE(kgriffs): Under Py3K, redis-py converts all strings
|
||||||
|
# into binary. Woohoo!
|
||||||
|
return Message(
|
||||||
|
id=strutils.safe_decode(doc[b'id']),
|
||||||
|
ttl=int(doc[b't']),
|
||||||
|
created=int(doc[b'cr']),
|
||||||
|
expires=int(doc[b'e']),
|
||||||
|
|
||||||
|
client_uuid=strutils.safe_decode(doc[b'u']),
|
||||||
|
|
||||||
|
claim_id=claim_id,
|
||||||
|
claim_expires=int(doc[b'c.e']),
|
||||||
|
|
||||||
|
body=_unpack(doc[b'b']),
|
||||||
|
)
|
||||||
|
|
||||||
|
def to_redis(self, pipe):
|
||||||
|
doc = {
|
||||||
|
'id': self.id,
|
||||||
|
't': self.ttl,
|
||||||
|
'cr': self.created,
|
||||||
|
'e': self.expires,
|
||||||
|
'u': self.client_uuid,
|
||||||
|
'c': self.claim_id or '',
|
||||||
|
'c.e': self.claim_expires,
|
||||||
|
'b': _pack(self.body),
|
||||||
|
}
|
||||||
|
|
||||||
|
pipe.hmset(self.id, doc)
|
||||||
|
|
||||||
|
def to_basic(self, now, include_created=False):
|
||||||
|
basic_msg = {
|
||||||
|
'id': self.id,
|
||||||
|
'age': now - self.created,
|
||||||
|
'ttl': self.ttl,
|
||||||
|
'body': self.body
|
||||||
|
}
|
||||||
|
|
||||||
|
if include_created:
|
||||||
|
basic_msg['created'] = self.created_iso
|
||||||
|
|
||||||
|
return basic_msg
|
40
zaqar/queues/storage/redis/options.py
Normal file
40
zaqar/queues/storage/redis/options.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""Redis storage driver configuration options."""
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
|
||||||
|
REDIS_OPTIONS = (
|
||||||
|
cfg.StrOpt('uri', default="redis://127.0.0.1:6379",
|
||||||
|
help=('Redis Server URI. Can also use a '
|
||||||
|
'socket file based connector. '
|
||||||
|
'Ex: redis:/tmp/redis.sock')),
|
||||||
|
|
||||||
|
cfg.IntOpt('max_reconnect_attempts', default=10,
|
||||||
|
help=('Maximum number of times to retry an operation that '
|
||||||
|
'failed due to a redis node failover.')),
|
||||||
|
|
||||||
|
cfg.FloatOpt('reconnect_sleep', default=1,
|
||||||
|
help=('Base sleep interval between attempts to reconnect '
|
||||||
|
'after a redis node failover. '))
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
REDIS_GROUP = 'drivers:storage:redis'
|
||||||
|
|
||||||
|
|
||||||
|
def _config_options():
|
||||||
|
return [(REDIS_GROUP, REDIS_OPTIONS)]
|
253
zaqar/queues/storage/redis/queues.py
Normal file
253
zaqar/queues/storage/redis/queues.py
Normal file
@ -0,0 +1,253 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
import msgpack
|
||||||
|
import redis
|
||||||
|
|
||||||
|
from zaqar.common import decorators
|
||||||
|
from zaqar.openstack.common import log as logging
|
||||||
|
from zaqar.openstack.common import timeutils
|
||||||
|
from zaqar.queues import storage
|
||||||
|
from zaqar.queues.storage import errors
|
||||||
|
from zaqar.queues.storage.redis import messages
|
||||||
|
from zaqar.queues.storage.redis import utils
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
QUEUES_SET_STORE_NAME = 'queues_set'
|
||||||
|
MESSAGE_IDS_SUFFIX = 'messages'
|
||||||
|
|
||||||
|
|
||||||
|
class QueueController(storage.Queue):
|
||||||
|
"""Implements queue resource operations using Redis.
|
||||||
|
|
||||||
|
Queues are scoped by project, which is prefixed to the
|
||||||
|
queue name.
|
||||||
|
|
||||||
|
Queues (Redis sorted set):
|
||||||
|
|
||||||
|
Key: queues_set
|
||||||
|
|
||||||
|
Id Value
|
||||||
|
---------------------------------
|
||||||
|
name -> <project-id_q-name>
|
||||||
|
|
||||||
|
|
||||||
|
The set helps faster existence checks, while the list helps
|
||||||
|
paginated retrieval of queues.
|
||||||
|
|
||||||
|
Queue Information (Redis hash):
|
||||||
|
|
||||||
|
Key: <project-id_q-name>
|
||||||
|
|
||||||
|
Name Field
|
||||||
|
-------------------------------
|
||||||
|
count -> c
|
||||||
|
num_msgs_claimed -> cl
|
||||||
|
metadata -> m
|
||||||
|
creation timestamp -> t
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(QueueController, self).__init__(*args, **kwargs)
|
||||||
|
self._client = self.driver.connection
|
||||||
|
self._packer = msgpack.Packer(encoding='utf-8',
|
||||||
|
use_bin_type=True).pack
|
||||||
|
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
|
||||||
|
|
||||||
|
@decorators.lazy_property(write=False)
|
||||||
|
def _message_ctrl(self):
|
||||||
|
return self.driver.message_controller
|
||||||
|
|
||||||
|
def _claim_counter_key(self, name, project):
|
||||||
|
return utils.scope_queue_name(name, project)
|
||||||
|
|
||||||
|
def _inc_counter(self, name, project, amount=1, pipe=None):
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
|
||||||
|
client = pipe if pipe is not None else self._client
|
||||||
|
client.hincrby(queue_key, 'c', amount)
|
||||||
|
|
||||||
|
def _inc_claimed(self, name, project, amount=1, pipe=None):
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
|
||||||
|
client = pipe if pipe is not None else self._client
|
||||||
|
client.hincrby(queue_key, 'cl', amount)
|
||||||
|
|
||||||
|
# TODO(kgriffs): Reimplement in Lua; this is way too expensive!
|
||||||
|
def _get_expired_message_count(self, name, project):
|
||||||
|
"""Calculate the number of expired messages in the queue.
|
||||||
|
|
||||||
|
Used to compute the stats on the queue.
|
||||||
|
Method has O(n) complexity as we iterate the entire list of
|
||||||
|
messages.
|
||||||
|
"""
|
||||||
|
|
||||||
|
messages_set_key = utils.scope_message_ids_set(name, project,
|
||||||
|
MESSAGE_IDS_SUFFIX)
|
||||||
|
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
for msg_key in self._client.zrange(messages_set_key, 0, -1):
|
||||||
|
pipe.hgetall(msg_key)
|
||||||
|
|
||||||
|
raw_messages = pipe.execute()
|
||||||
|
|
||||||
|
expired = 0
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
for msg in raw_messages:
|
||||||
|
if msg:
|
||||||
|
msg = messages.Message.from_redis(msg)
|
||||||
|
if utils.msg_expired_filter(msg, now):
|
||||||
|
expired += 1
|
||||||
|
|
||||||
|
return expired
|
||||||
|
|
||||||
|
def _get_queue_info(self, queue_key, fields, transform=str):
|
||||||
|
"""Get one or more fields from Queue Info."""
|
||||||
|
|
||||||
|
values = self._client.hmget(queue_key, fields)
|
||||||
|
return [transform(v) for v in values] if transform else values
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def list(self, project=None, marker=None,
|
||||||
|
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||||
|
client = self._client
|
||||||
|
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||||
|
marker = utils.scope_queue_name(marker, project)
|
||||||
|
rank = client.zrank(qset_key, marker)
|
||||||
|
start = rank + 1 if rank else 0
|
||||||
|
|
||||||
|
cursor = (q for q in client.zrange(qset_key, start,
|
||||||
|
start + limit - 1))
|
||||||
|
marker_next = {}
|
||||||
|
|
||||||
|
def denormalizer(info, name):
|
||||||
|
queue = {'name': utils.descope_queue_name(name)}
|
||||||
|
marker_next['next'] = queue['name']
|
||||||
|
if detailed:
|
||||||
|
queue['metadata'] = info[1]
|
||||||
|
|
||||||
|
return queue
|
||||||
|
|
||||||
|
yield utils.QueueListCursor(self._client, cursor, denormalizer)
|
||||||
|
yield marker_next and marker_next['next']
|
||||||
|
|
||||||
|
def get(self, name, project=None):
|
||||||
|
"""Obtain the metadata from the queue."""
|
||||||
|
return self.get_metadata(name, project)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def create(self, name, metadata=None, project=None):
|
||||||
|
# TODO(prashanthr_): Implement as a lua script.
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||||
|
|
||||||
|
# Check if the queue already exists.
|
||||||
|
if self.exists(name, project):
|
||||||
|
return False
|
||||||
|
|
||||||
|
queue = {
|
||||||
|
'c': 0,
|
||||||
|
'cl': 0,
|
||||||
|
'm': self._packer(metadata or {}),
|
||||||
|
't': timeutils.utcnow_ts()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Pipeline ensures atomic inserts.
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
|
||||||
|
|
||||||
|
try:
|
||||||
|
pipe.execute()
|
||||||
|
except redis.exceptions.ResponseError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def exists(self, name, project=None):
|
||||||
|
# TODO(prashanthr_): Cache this lookup
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||||
|
|
||||||
|
return self._client.zrank(qset_key, queue_key) is not None
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def set_metadata(self, name, metadata, project=None):
|
||||||
|
if not self.exists(name, project):
|
||||||
|
raise errors.QueueDoesNotExist(name, project)
|
||||||
|
|
||||||
|
key = utils.scope_queue_name(name, project)
|
||||||
|
fields = {'m': self._packer(metadata)}
|
||||||
|
|
||||||
|
self._client.hmset(key, fields)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def get_metadata(self, name, project=None):
|
||||||
|
if not self.exists(name, project):
|
||||||
|
raise errors.QueueDoesNotExist(name, project)
|
||||||
|
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
metadata = self._get_queue_info(queue_key, b'm', None)[0]
|
||||||
|
|
||||||
|
return self._unpacker(metadata)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def delete(self, name, project=None):
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
|
||||||
|
with self._client.pipeline() as pipe:
|
||||||
|
pipe.zrem(qset_key, queue_key)
|
||||||
|
pipe.delete(queue_key)
|
||||||
|
self._message_ctrl._delete_queue_messages(name, project, pipe)
|
||||||
|
|
||||||
|
pipe.execute()
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
@utils.retries_on_connection_error
|
||||||
|
def stats(self, name, project=None):
|
||||||
|
if not self.exists(name, project=project):
|
||||||
|
raise errors.QueueDoesNotExist(name, project)
|
||||||
|
|
||||||
|
queue_key = utils.scope_queue_name(name, project)
|
||||||
|
|
||||||
|
claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int)
|
||||||
|
expired = self._get_expired_message_count(name, project)
|
||||||
|
|
||||||
|
message_stats = {
|
||||||
|
'claimed': claimed,
|
||||||
|
'free': total - claimed - expired,
|
||||||
|
'total': total
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
newest = self._message_ctrl.first(name, project, -1)
|
||||||
|
oldest = self._message_ctrl.first(name, project, 1)
|
||||||
|
except errors.QueueIsEmpty:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
message_stats['newest'] = newest
|
||||||
|
message_stats['oldest'] = oldest
|
||||||
|
|
||||||
|
return {'messages': message_stats}
|
193
zaqar/queues/storage/redis/utils.py
Normal file
193
zaqar/queues/storage/redis/utils.py
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
# Copyright (c) 2014 Prashanth Raghu.
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import redis
|
||||||
|
import six
|
||||||
|
|
||||||
|
from zaqar.i18n import _
|
||||||
|
from zaqar.openstack.common import log as logging
|
||||||
|
from zaqar.openstack.common import strutils
|
||||||
|
from zaqar.queues.storage import errors
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def descope_queue_name(scoped_name):
|
||||||
|
"""Descope Queue name with '.'.
|
||||||
|
|
||||||
|
Returns the queue name from the scoped name
|
||||||
|
which is of the form project-id.queue-name
|
||||||
|
"""
|
||||||
|
|
||||||
|
return scoped_name.split('.')[1]
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_none_str(string_or_none):
|
||||||
|
"""Returns '' IFF given value is None, passthrough otherwise.
|
||||||
|
|
||||||
|
This function normalizes None to the empty string to facilitate
|
||||||
|
string concatenation when a variable could be None.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
|
||||||
|
return '' if string_or_none is None else string_or_none
|
||||||
|
|
||||||
|
|
||||||
|
def generate_uuid():
|
||||||
|
return str(uuid.uuid4())
|
||||||
|
|
||||||
|
|
||||||
|
def scope_queue_name(queue=None, project=None):
|
||||||
|
"""Returns a scoped name for a queue based on project and queue.
|
||||||
|
|
||||||
|
If only the project name is specified, a scope signifying "all queues"
|
||||||
|
for that project is returned. If neither queue nor project are
|
||||||
|
specified, a scope for "all global queues" is returned, which
|
||||||
|
is to be interpreted as excluding queues scoped by project.
|
||||||
|
|
||||||
|
:returns: '{project}.{queue}' if project and queue are given,
|
||||||
|
'{project}.' if ONLY project is given, '.{queue}' if ONLY
|
||||||
|
queue is given, and '.' if neither are given.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
|
||||||
|
return normalize_none_str(project) + '.' + normalize_none_str(queue)
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): Aliase the scope_queue_name function
|
||||||
|
# to be used in the pools and claims controller as similar
|
||||||
|
# functionality is required to scope redis id's.
|
||||||
|
scope_pool_catalogue = scope_claim_messages = scope_queue_name
|
||||||
|
|
||||||
|
|
||||||
|
def scope_message_ids_set(queue=None, project=None, message_suffix=''):
|
||||||
|
"""Scope messages set with '.'
|
||||||
|
|
||||||
|
Returns a scoped name for the list of messages in the form
|
||||||
|
project-id_queue-name_suffix
|
||||||
|
"""
|
||||||
|
|
||||||
|
return (normalize_none_str(project) + '.' +
|
||||||
|
normalize_none_str(queue) + '.' +
|
||||||
|
message_suffix)
|
||||||
|
|
||||||
|
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
|
||||||
|
# to be used in the pools and claims controller as similar
|
||||||
|
# functionality is required to scope redis id's.
|
||||||
|
scope_queue_catalogue = scope_claims_set = scope_message_ids_set
|
||||||
|
scope_queue_index = scope_message_ids_set
|
||||||
|
|
||||||
|
|
||||||
|
def raises_conn_error(func):
|
||||||
|
"""Handles the Redis ConnectionFailure error.
|
||||||
|
|
||||||
|
This decorator catches Redis's ConnectionError
|
||||||
|
and raises Marconi's ConnectionError instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Note(prashanthr_) : Try to reuse this utility. Violates DRY
|
||||||
|
# Can pass exception type into the decorator and create a
|
||||||
|
# storage level utility.
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except redis.exceptions.ConnectionError as ex:
|
||||||
|
LOG.exception(ex)
|
||||||
|
raise errors.ConnectionError()
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def retries_on_connection_error(func):
|
||||||
|
"""Causes the wrapped function to be re-called on ConnectionError.
|
||||||
|
|
||||||
|
This decorator catches Redis ConnectionError and retries
|
||||||
|
the function call.
|
||||||
|
|
||||||
|
.. Note::
|
||||||
|
Assumes that the decorated function has defined self.driver.redis_cinf
|
||||||
|
so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
|
||||||
|
into account.
|
||||||
|
|
||||||
|
.. Warning:: The decorated function must be idempotent.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
|
||||||
|
# Can pass config parameters into the decorator and create a
|
||||||
|
# storage level utility.
|
||||||
|
|
||||||
|
max_attemps = self.driver.redis_conf.max_reconnect_attempts
|
||||||
|
sleep_sec = self.driver.redis_conf.reconnect_sleep
|
||||||
|
|
||||||
|
for attempt in range(max_attemps):
|
||||||
|
try:
|
||||||
|
return func(self, *args, **kwargs)
|
||||||
|
except redis.exceptions.ConnectionError:
|
||||||
|
ex = sys.exc_info()[1]
|
||||||
|
LOG.warn(_(u'Caught ConnectionError, retrying the '
|
||||||
|
'call to {0}').format(func))
|
||||||
|
|
||||||
|
time.sleep(sleep_sec * (2 ** attempt))
|
||||||
|
else:
|
||||||
|
LOG.error(_(u'Caught ConnectionError, maximum attempts '
|
||||||
|
'to {0} exceeded.').format(func))
|
||||||
|
raise ex
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def msg_claimed_filter(message, now):
|
||||||
|
"""Return True IFF the message is currently claimed."""
|
||||||
|
|
||||||
|
return message.claim_id and (now < message.claim_expires)
|
||||||
|
|
||||||
|
|
||||||
|
def msg_echo_filter(message, client_uuid):
|
||||||
|
"""Return True IFF the specified client posted the message."""
|
||||||
|
|
||||||
|
return message.client_uuid == six.text_type(client_uuid)
|
||||||
|
|
||||||
|
|
||||||
|
def msg_expired_filter(message, now):
|
||||||
|
"""Return True IFF the message has expired."""
|
||||||
|
|
||||||
|
return message.expires <= now
|
||||||
|
|
||||||
|
|
||||||
|
class QueueListCursor(object):
|
||||||
|
|
||||||
|
def __init__(self, client, queues, denormalizer):
|
||||||
|
self.queue_iter = queues
|
||||||
|
self.denormalizer = denormalizer
|
||||||
|
self.client = client
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
@raises_conn_error
|
||||||
|
def next(self):
|
||||||
|
curr = next(self.queue_iter)
|
||||||
|
queue = self.client.hmget(curr, ['c', 'm'])
|
||||||
|
return self.denormalizer(queue, strutils.safe_decode(curr))
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
return self.next()
|
@ -120,7 +120,7 @@ def keyify(key, iterable):
|
|||||||
|
|
||||||
|
|
||||||
def can_connect(uri):
|
def can_connect(uri):
|
||||||
"""Given a URI, verifies whether its possible to connect to it.
|
"""Given a URI, verifies whether it's possible to connect to it.
|
||||||
|
|
||||||
:param uri: connection string to a storage endpoint
|
:param uri: connection string to a storage endpoint
|
||||||
:type uri: six.text_type
|
:type uri: six.text_type
|
||||||
|
@ -169,11 +169,6 @@ class CollectionResource(object):
|
|||||||
|
|
||||||
except storage_errors.MessageConflict as ex:
|
except storage_errors.MessageConflict as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
message_ids = ex.succeeded_ids
|
|
||||||
|
|
||||||
if not message_ids:
|
|
||||||
# TODO(kgriffs): Include error code that is different
|
|
||||||
# from the code used in the generic case, below.
|
|
||||||
description = _(u'No messages could be enqueued.')
|
description = _(u'No messages could be enqueued.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
|
@ -190,11 +190,6 @@ class CollectionResource(object):
|
|||||||
|
|
||||||
except storage_errors.MessageConflict as ex:
|
except storage_errors.MessageConflict as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
message_ids = ex.succeeded_ids
|
|
||||||
|
|
||||||
if not message_ids:
|
|
||||||
# TODO(kgriffs): Include error code that is different
|
|
||||||
# from the code used in the generic case, below.
|
|
||||||
description = _(u'No messages could be enqueued.')
|
description = _(u'No messages could be enqueued.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
|
@ -25,4 +25,5 @@ RUN_SLOW_TESTS = not SKIP_SLOW_TESTS
|
|||||||
expect = helpers.expect
|
expect = helpers.expect
|
||||||
is_slow = helpers.is_slow
|
is_slow = helpers.is_slow
|
||||||
requires_mongodb = helpers.requires_mongodb
|
requires_mongodb = helpers.requires_mongodb
|
||||||
|
requires_redis = helpers.requires_redis
|
||||||
TestBase = base.TestBase
|
TestBase = base.TestBase
|
||||||
|
@ -24,6 +24,7 @@ import testtools
|
|||||||
|
|
||||||
SKIP_SLOW_TESTS = os.environ.get('ZAQAR_TEST_SLOW') is None
|
SKIP_SLOW_TESTS = os.environ.get('ZAQAR_TEST_SLOW') is None
|
||||||
SKIP_MONGODB_TESTS = os.environ.get('ZAQAR_TEST_MONGODB') is None
|
SKIP_MONGODB_TESTS = os.environ.get('ZAQAR_TEST_MONGODB') is None
|
||||||
|
SKIP_REDIS_TESTS = os.environ.get('ZAQAR_TEST_REDIS') is None
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
@ -205,6 +206,22 @@ def requires_mongodb(test_case):
|
|||||||
return testtools.skipIf(SKIP_MONGODB_TESTS, reason)(test_case)
|
return testtools.skipIf(SKIP_MONGODB_TESTS, reason)(test_case)
|
||||||
|
|
||||||
|
|
||||||
|
def requires_redis(test_case):
|
||||||
|
"""Decorator to flag a test case as being dependent on Redis.
|
||||||
|
|
||||||
|
Redis-specific tests will be skipped unless the MARCONI_TEST_REDIS
|
||||||
|
environment variable is set. If the variable is set, the tests will
|
||||||
|
assume that redis is running and listening on localhost.
|
||||||
|
"""
|
||||||
|
|
||||||
|
reason = ('Skipping tests that require Redis. Ensure '
|
||||||
|
'Redis is running on localhost and then set '
|
||||||
|
'ZAQAR_TEST_REDIS in order to enable tests '
|
||||||
|
'that are specific to this storage backend. ')
|
||||||
|
|
||||||
|
return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
|
||||||
|
|
||||||
|
|
||||||
def is_slow(condition=lambda self: True):
|
def is_slow(condition=lambda self: True):
|
||||||
"""Decorator to flag slow tests.
|
"""Decorator to flag slow tests.
|
||||||
|
|
||||||
|
@ -226,6 +226,119 @@ class QueueControllerTest(ControllerBaseTest):
|
|||||||
self.assertNotIn('newest', message_stats)
|
self.assertNotIn('newest', message_stats)
|
||||||
self.assertNotIn('oldest', message_stats)
|
self.assertNotIn('oldest', message_stats)
|
||||||
|
|
||||||
|
def test_queue_count_on_bulk_delete(self):
|
||||||
|
self.addCleanup(self.controller.delete, 'test-queue',
|
||||||
|
project=self.project)
|
||||||
|
queue_name = 'test-queue'
|
||||||
|
client_uuid = uuid.uuid4()
|
||||||
|
|
||||||
|
created = self.controller.create(queue_name, project=self.project)
|
||||||
|
self.assertTrue(created)
|
||||||
|
|
||||||
|
# Create 10 messages.
|
||||||
|
msg_keys = _insert_fixtures(self.message_controller, queue_name,
|
||||||
|
project=self.project,
|
||||||
|
client_uuid=client_uuid, num=10)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 10)
|
||||||
|
|
||||||
|
# Delete 5 messages
|
||||||
|
self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
|
||||||
|
self.project)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 5)
|
||||||
|
|
||||||
|
def test_queue_count_on_bulk_delete_with_invalid_id(self):
|
||||||
|
self.addCleanup(self.controller.delete, 'test-queue',
|
||||||
|
project=self.project)
|
||||||
|
queue_name = 'test-queue'
|
||||||
|
client_uuid = uuid.uuid4()
|
||||||
|
|
||||||
|
created = self.controller.create(queue_name, project=self.project)
|
||||||
|
self.assertTrue(created)
|
||||||
|
|
||||||
|
# Create 10 messages.
|
||||||
|
msg_keys = _insert_fixtures(self.message_controller, queue_name,
|
||||||
|
project=self.project,
|
||||||
|
client_uuid=client_uuid, num=10)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 10)
|
||||||
|
|
||||||
|
# Delete 5 messages
|
||||||
|
self.message_controller.bulk_delete(queue_name,
|
||||||
|
msg_keys[0:5] + ['invalid'],
|
||||||
|
self.project)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 5)
|
||||||
|
|
||||||
|
def test_queue_count_on_delete(self):
|
||||||
|
self.addCleanup(self.controller.delete, 'test-queue',
|
||||||
|
project=self.project)
|
||||||
|
queue_name = 'test-queue'
|
||||||
|
client_uuid = uuid.uuid4()
|
||||||
|
|
||||||
|
created = self.controller.create(queue_name, project=self.project)
|
||||||
|
self.assertTrue(created)
|
||||||
|
|
||||||
|
# Create 10 messages.
|
||||||
|
msg_keys = _insert_fixtures(self.message_controller, queue_name,
|
||||||
|
project=self.project,
|
||||||
|
client_uuid=client_uuid, num=10)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 10)
|
||||||
|
|
||||||
|
# Delete 1 message
|
||||||
|
self.message_controller.delete(queue_name, msg_keys[0],
|
||||||
|
self.project)
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 9)
|
||||||
|
|
||||||
|
def test_queue_count_on_claim_delete(self):
|
||||||
|
self.addCleanup(self.controller.delete, 'test-queue',
|
||||||
|
project=self.project)
|
||||||
|
queue_name = 'test-queue'
|
||||||
|
client_uuid = uuid.uuid4()
|
||||||
|
|
||||||
|
created = self.controller.create(queue_name, project=self.project)
|
||||||
|
self.assertTrue(created)
|
||||||
|
|
||||||
|
# Create 15 messages.
|
||||||
|
_insert_fixtures(self.message_controller, queue_name,
|
||||||
|
project=self.project,
|
||||||
|
client_uuid=client_uuid, num=15)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['total'], 15)
|
||||||
|
|
||||||
|
metadata = {'ttl': 120, 'grace': 60}
|
||||||
|
# Claim 10 messages
|
||||||
|
claim_id, _ = self.claim_controller.create(queue_name, metadata,
|
||||||
|
self.project)
|
||||||
|
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
self.assertEqual(stats['claimed'], 10)
|
||||||
|
|
||||||
|
# Delete the claim
|
||||||
|
self.claim_controller.delete(queue_name, claim_id,
|
||||||
|
self.project)
|
||||||
|
stats = self.controller.stats(queue_name,
|
||||||
|
self.project)['messages']
|
||||||
|
|
||||||
|
self.assertEqual(stats['claimed'], 0)
|
||||||
|
|
||||||
|
|
||||||
class MessageControllerTest(ControllerBaseTest):
|
class MessageControllerTest(ControllerBaseTest):
|
||||||
"""Message Controller base tests.
|
"""Message Controller base tests.
|
||||||
@ -1174,5 +1287,5 @@ def _insert_fixtures(controller, queue_name, project=None,
|
|||||||
'event': 'Event number {0}'.format(n)
|
'event': 'Event number {0}'.format(n)
|
||||||
}}
|
}}
|
||||||
|
|
||||||
controller.post(queue_name, messages(),
|
return controller.post(queue_name, messages(),
|
||||||
project=project, client_uuid=client_uuid)
|
project=project, client_uuid=client_uuid)
|
||||||
|
Loading…
Reference in New Issue
Block a user