From 384d613f5e50d7441fadcf289987f398ef6a8a1e Mon Sep 17 00:00:00 2001 From: Nataliia Uvarova Date: Thu, 5 Jun 2014 18:13:34 +0200 Subject: [PATCH] Small fixes in storage drivers for Python 3 Fixes in this commit include use of list comprehession instead of map, since on Python 3 map returns iterator, not list; add proper iterator interface for HookedCursor and some other minor changes. Partially-implements: blueprint python-py3k Change-Id: I211477cbd018b0dd2ffe0d280ff24dd4c5021324 --- marconi/queues/storage/mongodb/messages.py | 2 +- marconi/queues/storage/mongodb/utils.py | 9 +++++++-- marconi/queues/storage/sqlalchemy/messages.py | 2 +- marconi/queues/storage/sqlalchemy/utils.py | 4 +++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index 5ce815143..eef331a7b 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -573,7 +573,7 @@ class MessageController(storage.Message): {'$set': {'tx': None}}, upsert=False, multi=True) - return map(str, ids) + return [str(id_) for id_ in ids] except pymongo.errors.DuplicateKeyError as ex: # TODO(kgriffs): Record stats of how often retries happen, diff --git a/marconi/queues/storage/mongodb/utils.py b/marconi/queues/storage/mongodb/utils.py index cf926ce9b..96d208ae1 100644 --- a/marconi/queues/storage/mongodb/utils.py +++ b/marconi/queues/storage/mongodb/utils.py @@ -237,7 +237,7 @@ def get_partition(num_partitions, queue, project=None): # NOTE(kgriffs): For small numbers of partitions, crc32 will # provide a uniform distribution. This was verified experimentally # with up to 100 partitions. - return binascii.crc32(name) % num_partitions + return binascii.crc32(name.encode('utf-8')) % num_partitions def raises_conn_error(func): @@ -279,6 +279,7 @@ def retries_on_autoreconnect(func): max_attemps = self.driver.mongodb_conf.max_reconnect_attempts sleep_sec = self.driver.mongodb_conf.reconnect_sleep + last_ex = None for attempt in range(max_attemps): try: return func(self, *args, **kwargs) @@ -288,12 +289,13 @@ def retries_on_autoreconnect(func): LOG.warn(_(u'Caught AutoReconnect, retrying the ' 'call to {0}').format(func)) + last_ex = ex time.sleep(sleep_sec * (2 ** attempt)) else: LOG.error(_(u'Caught AutoReconnect, maximum attempts ' 'to {0} exceeded.').format(func)) - raise ex + raise last_ex return wrapper @@ -317,3 +319,6 @@ class HookedCursor(object): def next(self): item = next(self.cursor) return self.denormalizer(item) + + def __next__(self): + return self.next() diff --git a/marconi/queues/storage/sqlalchemy/messages.py b/marconi/queues/storage/sqlalchemy/messages.py index b5e5fb905..50fb00dd2 100644 --- a/marconi/queues/storage/sqlalchemy/messages.py +++ b/marconi/queues/storage/sqlalchemy/messages.py @@ -238,7 +238,7 @@ class MessageController(storage.Message): statement = statement.order_by(tables.Messages.c.id.desc()) result = trans.execute(statement).fetchall() - return map(utils.msgid_encode, [i[0] for i in reversed(result)]) + return [utils.msgid_encode(i[0]) for i in reversed(result)] def delete(self, queue, message_id, project, claim=None): if project is None: diff --git a/marconi/queues/storage/sqlalchemy/utils.py b/marconi/queues/storage/sqlalchemy/utils.py index 047476692..44f120071 100644 --- a/marconi/queues/storage/sqlalchemy/utils.py +++ b/marconi/queues/storage/sqlalchemy/utils.py @@ -88,7 +88,9 @@ def msgid_decode(id): def marker_encode(id): - return oct(id ^ 0x3c96a355)[1:] + # NOTE(AAzza): cannot use oct(id) here, because on Python 3 it returns + # string with prefix '0o', whereas on Python 2 prefix is just '0' + return '{0:o}'.format(id ^ 0x3c96a355) def marker_decode(id):