Fix redis CI job

Change-Id: Ia59ad6f5ce6311eda92cf033a72df9c94e1ffb18
This commit is contained in:
xywang 2018-12-11 11:40:20 +08:00
parent 7c2d6a970d
commit 3641f28cc6
9 changed files with 38 additions and 24 deletions

View File

@ -48,7 +48,7 @@ python-subunit==1.0.0
python-swiftclient==3.2.0 python-swiftclient==3.2.0
pytz==2013.6 pytz==2013.6
PyYAML==3.12 PyYAML==3.12
redis==2.10.0 redis==3.0.0
reno==2.5.0 reno==2.5.0
requests==2.14.2 requests==2.14.2
requestsexceptions==1.2.0 requestsexceptions==1.2.0

View File

@ -8,7 +8,7 @@ hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0
mock>=2.0.0 # BSD mock>=2.0.0 # BSD
# Backends # Backends
redis>=2.10.0 # MIT redis>=3.0.0 # MIT
pymongo>=3.6.0 # Apache-2.0 pymongo>=3.6.0 # Apache-2.0
python-swiftclient>=3.2.0 # Apache-2.0 python-swiftclient>=3.2.0 # Apache-2.0
websocket-client>=0.44.0 # LGPLv2+ websocket-client>=0.44.0 # LGPLv2+

View File

@ -92,7 +92,7 @@ class CatalogueController(base.CatalogueBase):
} }
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zadd(catalogue_project_key, 1, queue_key) pipe.zadd(catalogue_project_key, {queue_key: 1})
pipe.hmset(catalogue_queue_key, catalogue) pipe.hmset(catalogue_queue_key, catalogue)
try: try:

View File

@ -315,7 +315,7 @@ class ClaimController(storage.Claim, scripting.Mixin):
claims_set_key = utils.scope_claims_set(queue, project, claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX) QUEUE_CLAIMS_SUFFIX)
pipe.zadd(claims_set_key, claim_expires, claim_id) pipe.zadd(claims_set_key, {claim_id: claim_expires})
pipe.execute() pipe.execute()
if ('_max_claim_count' in queue_meta and if ('_max_claim_count' in queue_meta and
@ -430,7 +430,7 @@ class ClaimController(storage.Claim, scripting.Mixin):
claims_set_key = utils.scope_claims_set(queue, project, claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX) QUEUE_CLAIMS_SUFFIX)
pipe.zadd(claims_set_key, claim_expires, claim_id) pipe.zadd(claims_set_key, {claim_id: claim_expires})
pipe.execute() pipe.execute()

View File

@ -78,7 +78,10 @@ class FlavorsController(base.FlavorsBase):
client = self._client client = self._client
subset_key = utils.flavor_project_subset_key(project) subset_key = utils.flavor_project_subset_key(project)
marker_key = utils.flavor_name_hash_key(marker) marker_key = utils.flavor_name_hash_key(marker)
if marker_key:
rank = client.zrank(subset_key, marker_key) rank = client.zrank(subset_key, marker_key)
else:
rank = None
start = rank + 1 if rank is not None else 0 start = rank + 1 if rank is not None else 0
cursor = (f for f in client.zrange(subset_key, start, cursor = (f for f in client.zrange(subset_key, start,
@ -119,8 +122,8 @@ class FlavorsController(base.FlavorsBase):
} }
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zadd(set_key, 1, hash_key) pipe.zadd(set_key, {hash_key: 1})
pipe.zadd(subset_key, 1, hash_key) pipe.zadd(subset_key, {hash_key: 1})
pipe.hmset(hash_key, flavors) pipe.hmset(hash_key, flavors)
pipe.execute() pipe.execute()
else: else:

View File

@ -135,7 +135,7 @@ class MessageController(storage.Message, scripting.Mixin):
return self._client.zcard(utils.msgset_key(queue, project)) return self._client.zcard(utils.msgset_key(queue, project))
def _create_msgset(self, queue, project, pipe): def _create_msgset(self, queue, project, pipe):
pipe.zadd(MSGSET_INDEX_KEY, 1, utils.msgset_key(queue, project)) pipe.zadd(MSGSET_INDEX_KEY, {utils.msgset_key(queue, project): 1})
def _delete_msgset(self, queue, project, pipe): def _delete_msgset(self, queue, project, pipe):
pipe.zrem(MSGSET_INDEX_KEY, utils.msgset_key(queue, project)) pipe.zrem(MSGSET_INDEX_KEY, utils.msgset_key(queue, project))
@ -243,7 +243,10 @@ class MessageController(storage.Message, scripting.Mixin):
# of the queue; otherwise we would just filter them all # of the queue; otherwise we would just filter them all
# out and likely end up with an empty list to return. # out and likely end up with an empty list to return.
marker = self._find_first_unclaimed(queue, project, limit) marker = self._find_first_unclaimed(queue, project, limit)
if marker:
start = client.zrank(msgset_key, marker) or 0 start = client.zrank(msgset_key, marker) or 0
else:
start = 0
else: else:
rank = client.zrank(msgset_key, marker) rank = client.zrank(msgset_key, marker)
start = rank + 1 if rank else 0 start = rank + 1 if rank else 0

View File

@ -97,7 +97,10 @@ class PoolsController(base.PoolsBase):
client = self._client client = self._client
set_key = utils.pools_set_key() set_key = utils.pools_set_key()
marker_key = utils.pools_name_hash_key(marker) marker_key = utils.pools_name_hash_key(marker)
if marker_key:
rank = client.zrank(set_key, marker_key) rank = client.zrank(set_key, marker_key)
else:
rank = None
start = rank + 1 if rank is not None else 0 start = rank + 1 if rank is not None else 0
cursor = (f for f in client.zrange(set_key, start, cursor = (f for f in client.zrange(set_key, start,
@ -163,9 +166,9 @@ class PoolsController(base.PoolsBase):
} }
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zadd(set_key, 1, pool_key) pipe.zadd(set_key, {pool_key: 1})
if flavor is not None: if flavor is not None:
pipe.zadd(subset_key, 1, pool_key) pipe.zadd(subset_key, {pool_key: 1})
pipe.hmset(pool_key, pool) pipe.hmset(pool_key, pool)
pipe.execute() pipe.execute()
@ -200,7 +203,7 @@ class PoolsController(base.PoolsBase):
if flavor_old != flavor_new: if flavor_old != flavor_new:
if flavor_new is not None: if flavor_new is not None:
new_subset_key = utils.pools_subset_key(flavor_new) new_subset_key = utils.pools_subset_key(flavor_new)
pipe.zadd(new_subset_key, 1, pool_key) pipe.zadd(new_subset_key, {pool_key: 1})
# (gengchc2) remove pool from flavor_old.pools subset # (gengchc2) remove pool from flavor_old.pools subset
if flavor_old is not None: if flavor_old is not None:
old_subset_key = utils.pools_subset_key(flavor_old) old_subset_key = utils.pools_subset_key(flavor_old)

View File

@ -89,7 +89,10 @@ class QueueController(storage.Queue):
client = self._client client = self._client
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
marker = utils.scope_queue_name(marker, project) marker = utils.scope_queue_name(marker, project)
if marker:
rank = client.zrank(qset_key, marker) rank = client.zrank(qset_key, marker)
else:
rank = None
start = rank + 1 if rank else 0 start = rank + 1 if rank else 0
cursor = (q for q in client.zrange(qset_key, start, cursor = (q for q in client.zrange(qset_key, start,
@ -133,7 +136,7 @@ class QueueController(storage.Queue):
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue) pipe.zadd(qset_key, {queue_key: 1}).hmset(queue_key, queue)
try: try:
pipe.execute() pipe.execute()

View File

@ -59,7 +59,10 @@ class SubscriptionController(base.Subscription):
subset_key = utils.scope_subscription_ids_set(queue, subset_key = utils.scope_subscription_ids_set(queue,
project, project,
SUBSCRIPTION_IDS_SUFFIX) SUBSCRIPTION_IDS_SUFFIX)
if marker:
rank = client.zrank(subset_key, marker) rank = client.zrank(subset_key, marker)
else:
rank = None
start = rank + 1 if rank is not None else 0 start = rank + 1 if rank is not None else 0
cursor = (q for q in client.zrange(subset_key, start, cursor = (q for q in client.zrange(subset_key, start,
@ -71,9 +74,9 @@ class SubscriptionController(base.Subscription):
ttl = int(record[2]) ttl = int(record[2])
expires = int(record[3]) expires = int(record[3])
created = expires - ttl created = expires - ttl
is_confirmed = True is_confirmed = 1
if len(record) == 6: if len(record) == 6:
is_confirmed = record[5] == str(True) is_confirmed = record[5]
ret = { ret = {
'id': sid, 'id': sid,
'source': record[0], 'source': record[0],
@ -114,7 +117,7 @@ class SubscriptionController(base.Subscription):
source = queue source = queue
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
expires = now + ttl expires = now + ttl
confirmed = False confirmed = 0
subscription = {'id': subscription_id, subscription = {'id': subscription_id,
's': source, 's': source,
@ -131,9 +134,8 @@ class SubscriptionController(base.Subscription):
if not self._is_duplicated_subscriber(subscriber, if not self._is_duplicated_subscriber(subscriber,
queue, queue,
project): project):
pipe.zadd(subset_key, 1, pipe.zadd(subset_key, {subscription_id: 1}).hmset(
subscription_id).hmset(subscription_id, subscription_id, subscription)
subscription)
pipe.expire(subscription_id, ttl) pipe.expire(subscription_id, ttl)
pipe.execute() pipe.execute()
else: else:
@ -262,7 +264,7 @@ class SubscriptionController(base.Subscription):
# Let's get our subscription by ID. If it does not exist, # Let's get our subscription by ID. If it does not exist,
# SubscriptionDoesNotExist error will be raised internally. # SubscriptionDoesNotExist error will be raised internally.
self.get(queue, subscription_id, project=project) self.get(queue, subscription_id, project=project)
confirmed = 1 if confirmed else 0
fields = {'c': confirmed} fields = {'c': confirmed}
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.hmset(subscription_id, fields) pipe.hmset(subscription_id, fields)