From a8215f72f960aeb22f3e3e30abf79b74617def4f Mon Sep 17 00:00:00 2001 From: yangzhenyu Date: Tue, 16 Jan 2018 11:15:34 +0800 Subject: [PATCH] Support md5 of message body DocImpact ApiImpact Tempest plugin Depends-on: Icb82042afb1759f129f09e55c2961f1802ae83b4 Implement blueprint support-md5-of-body Change-Id: I671737f423248ddc79bde74e492fc6d4c172bcd0 --- zaqar/api/v1_1/response.py | 10 ++++++-- zaqar/api/v2/response.py | 11 +++++++-- zaqar/common/configs.py | 3 +++ zaqar/storage/mongodb/messages.py | 39 +++++++++++++++++++------------ zaqar/storage/redis/messages.py | 6 ++++- zaqar/storage/redis/models.py | 26 +++++++++++++++------ zaqar/storage/swift/messages.py | 15 ++++++++---- zaqar/storage/utils.py | 29 +++++++++++++++++++++++ zaqar/tests/unit/storage/base.py | 35 +++++++++++++++++++++++++++ zaqar/transport/wsgi/utils.py | 7 ++++-- 10 files changed, 148 insertions(+), 33 deletions(-) diff --git a/zaqar/api/v1_1/response.py b/zaqar/api/v1_1/response.py index b57209bcf..90c45fb7d 100644 --- a/zaqar/api/v1_1/response.py +++ b/zaqar/api/v1_1/response.py @@ -49,7 +49,10 @@ class ResponseSchema(api.Api): "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, @@ -328,7 +331,10 @@ class ResponseSchema(api.Api): "age": age, "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, diff --git a/zaqar/api/v2/response.py b/zaqar/api/v2/response.py index 704d88c52..e3a1bfeb0 100644 --- a/zaqar/api/v2/response.py +++ b/zaqar/api/v2/response.py @@ -49,7 +49,11 @@ class ResponseSchema(api.Api): "body": { "type": "object" - } + }, + + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, @@ -328,7 +332,10 @@ class ResponseSchema(api.Api): "age": age, "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, diff --git a/zaqar/common/configs.py b/zaqar/common/configs.py index 5fcbdf82b..916e9a33e 100644 --- a/zaqar/common/configs.py +++ b/zaqar/common/configs.py @@ -35,6 +35,9 @@ _GENERAL_OPTIONS = ( item_type=cfg.types.List(item_type=cfg.types.String( choices=('1', '1.1'))), help='List of deprecated API versions to enable.'), + cfg.BoolOpt('enable_checksum', default=False, + help='Enable a checksum for message body. The default value ' + 'is False.'), ) _DRIVER_OPTIONS = ( diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index 7db74e08e..aac5e87f0 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -35,6 +35,7 @@ from zaqar.i18n import _ from zaqar import storage from zaqar.storage import errors from zaqar.storage.mongodb import utils +from zaqar.storage import utils as s_utils LOG = logging.getLogger(__name__) @@ -137,6 +138,7 @@ class MessageController(storage.Message): client uuid -> u transaction -> tx delay -> d + checksum -> cs """ def __init__(self, *args, **kwargs): @@ -646,8 +648,9 @@ class MessageController(storage.Message): project, amount=msgs_n) - msgs_n - prepared_messages = [ - { + prepared_messages = [] + for index, message in enumerate(messages): + msg = { PROJ_QUEUE: utils.scope_queue_name(queue_name, project), 't': message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']), @@ -656,11 +659,12 @@ class MessageController(storage.Message): 'd': now + message.get('delay', 0), 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, - 'tx': None, - } + 'tx': None + } + if self.driver.conf.enable_checksum: + msg['cs'] = s_utils.get_checksum(message.get('body', None)) - for index, message in enumerate(messages) - ] + prepared_messages.append(msg) res = collection.insert_many(prepared_messages, bypass_document_validation=True) @@ -825,8 +829,9 @@ class FIFOMessageController(MessageController): # Unique transaction ID to facilitate atomic batch inserts transaction = objectid.ObjectId() - prepared_messages = [ - { + prepared_messages = [] + for index, message in enumerate(messages): + msg = { PROJ_QUEUE: utils.scope_queue_name(queue_name, project), 't': message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']), @@ -835,11 +840,12 @@ class FIFOMessageController(MessageController): 'd': now + message.get('delay', 0), 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, - 'tx': transaction, - } + 'tx': None + } + if self.driver.conf.enable_checksum: + msg['cs'] = s_utils.get_checksum(message.get('body', None)) - for index, message in enumerate(messages) - ] + prepared_messages.append(msg) # NOTE(kgriffs): Don't take the time to do a 2-phase insert # if there is no way for it to partially succeed. @@ -1002,15 +1008,18 @@ def _is_claimed(msg, now): def _basic_message(msg, now): oid = msg['_id'] age = now - utils.oid_ts(oid) - - return { + res = { 'id': str(oid), 'age': int(age), 'ttl': msg['t'], 'claim_count': msg['c'].get('c', 0), 'body': msg['b'], 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None - } + } + if msg.get('cs'): + res['checksum'] = msg.get('cs') + + return res class MessageQueueHandler(object): diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 762c8da37..bf683d312 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -25,6 +25,7 @@ from zaqar.storage import errors from zaqar.storage.redis import models from zaqar.storage.redis import scripting from zaqar.storage.redis import utils +from zaqar.storage import utils as s_utils Message = models.Message MessageEnvelope = models.MessageEnvelope @@ -98,6 +99,8 @@ class MessageController(storage.Message, scripting.Mixin): +---------------------+---------+ | delay expiry time | d | +---------------------+---------+ + | body checksum | cs | + +---------------------+---------+ 4. Messages rank counter (Redis Hash): @@ -416,7 +419,6 @@ class MessageController(storage.Message, scripting.Mixin): message_ids = [] now = timeutils.utcnow_ts() - with self._client.pipeline() as pipe: for msg in messages: prepared_msg = Message( @@ -428,6 +430,8 @@ class MessageController(storage.Message, scripting.Mixin): claim_count=0, delay_expires=now + msg.get('delay', 0), body=msg.get('body', {}), + checksum=s_utils.get_checksum(msg.get('body', None)) if + self.driver.conf.enable_checksum else None ) prepared_msg.to_redis(pipe) diff --git a/zaqar/storage/redis/models.py b/zaqar/storage/redis/models.py index 6af3f5876..b81008e39 100644 --- a/zaqar/storage/redis/models.py +++ b/zaqar/storage/redis/models.py @@ -23,7 +23,7 @@ from oslo_utils import encodeutils from oslo_utils import uuidutils MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e', - b'c.c', b'd') + b'c.c', b'd', b'cs') SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c') @@ -51,6 +51,7 @@ class MessageEnvelope(object): 'claim_expires', 'claim_count', 'delay_expires', + 'checksum', ] def __init__(self, **kwargs): @@ -67,6 +68,7 @@ class MessageEnvelope(object): self.claim_expires = kwargs['claim_expires'] self.claim_count = kwargs.get('claim_count', 0) self.delay_expires = kwargs.get('delay_expires', 0) + self.checksum = kwargs.get('checksum') @staticmethod def from_hmap(hmap): @@ -238,7 +240,8 @@ class Message(MessageEnvelope): created_iso = datetime.datetime.utcfromtimestamp( self.created).strftime('%Y-%m-%dT%H:%M:%SZ') basic_msg['created'] = created_iso - + if self.checksum: + basic_msg['checksum'] = self.checksum return basic_msg @@ -266,7 +269,7 @@ def _hmap_to_msgenv_kwargs(hmap): # NOTE(kgriffs): Under Py3K, redis-py converts all strings # into binary. Woohoo! - return { + res = { 'id': encodeutils.safe_decode(hmap[b'id']), 'ttl': int(hmap[b't']), 'created': int(hmap[b'cr']), @@ -277,12 +280,18 @@ def _hmap_to_msgenv_kwargs(hmap): 'claim_id': claim_id, 'claim_expires': int(hmap[b'c.e']), 'claim_count': int(hmap[b'c.c']), - 'delay_expires': int(hmap.get(b'd', 0)), + 'delay_expires': int(hmap.get(b'd', 0)) } + checksum = hmap.get(b'cs') + if checksum: + res['checksum'] = encodeutils.safe_decode(hmap[b'cs']) + + return res + def _msgenv_to_hmap(msg): - return { + res = { 'id': msg.id, 't': msg.ttl, 'cr': msg.created, @@ -291,8 +300,11 @@ def _msgenv_to_hmap(msg): 'c': msg.claim_id or '', 'c.e': msg.claim_expires, 'c.c': msg.claim_count, - 'd': msg.delay_expires, - } + 'd': msg.delay_expires + } + if msg.checksum: + res['cs'] = msg.checksum + return res def _hmap_kv_to_subenv(keys, values): diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index f2b25a431..69b9f136a 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -24,6 +24,7 @@ from zaqar.common import decorators from zaqar import storage from zaqar.storage import errors from zaqar.storage.swift import utils +from zaqar.storage import utils as s_utils class MessageController(storage.Message): @@ -53,6 +54,8 @@ class MessageController(storage.Message): +--------------+-----------------------------------------+ | Expires | Object Delete-After header | +--------------------------------------------------------+ + | Checksum | Object content 'body' checksum | + +--------------------------------------------------------+ """ def __init__(self, *args, **kwargs): @@ -223,10 +226,14 @@ class MessageController(storage.Message): def _create_msg(self, queue, msg, client_uuid, project): slug = str(uuid.uuid1()) now = timeutils.utcnow_ts() - contents = jsonutils.dumps( - {'body': msg.get('body', {}), 'claim_id': None, - 'ttl': msg['ttl'], 'claim_count': 0, - 'delay_expires': now + msg.get('delay', 0)}) + message = {'body': msg.get('body', {}), 'claim_id': None, + 'ttl': msg['ttl'], 'claim_count': 0, + 'delay_expires': now + msg.get('delay', 0)} + + if self.driver.conf.enable_checksum: + message['checksum'] = s_utils.get_checksum(msg.get('body', None)) + + contents = jsonutils.dumps(message) utils._put_or_create_container( self._client, utils._message_container(queue, project), diff --git a/zaqar/storage/utils.py b/zaqar/storage/utils.py index 4a9a4a651..fbe0b5589 100644 --- a/zaqar/storage/utils.py +++ b/zaqar/storage/utils.py @@ -13,6 +13,8 @@ # the License. import copy +import hashlib +import json from oslo_config import cfg from oslo_log import log @@ -210,3 +212,30 @@ def can_connect(uri, conf=None): except Exception as exc: LOG.debug('Can\'t connect to: %s \n%s', (uri, exc)) return False + + +def get_checksum(body, algorithm='MD5'): + """According to the algorithm to get the message body checksum. + + :param body: The message body. + :type body: six.text_type + :param algorithm: The algorithm type, default is MD5. + :type algorithm: six.text_type + :returns: The message body checksum. + :rtype: six.text_type + """ + + checksum = '%s:' % algorithm + + if body is None: + return '' + else: + checksum_body = json.dumps(body).encode('utf-8') + # TODO(yangzhenyu): We may support other algorithms in future + # versions, including SHA1, SHA256, SHA512, and so on. + if algorithm == 'MD5': + md5 = hashlib.md5() + md5.update(checksum_body) + checksum += md5.hexdigest() + + return checksum diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index b6bafa27c..6bae8e45c 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -18,6 +18,8 @@ import os import collections import datetime +import hashlib +import json import math import random import time @@ -460,6 +462,39 @@ class MessageControllerTest(ControllerBaseTest): with testing.expect(errors.DoesNotExist): self.controller.get(queue_name, message_id, project=self.project) + def test_message_body_checksum(self): + self.conf.enable_checksum = True + queue_name = self.queue_name + message = { + 'ttl': 60, + 'body': { + 'event': 'BackupStarted', + 'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce' + } + } + + # Test Message Creation + created = list(self.controller.post(queue_name, [message], + project=self.project, + client_uuid=uuid.uuid4())) + self.assertEqual(1, len(created)) + message_id = created[0] + + # Test Message Get + message_out = self.controller.get(queue_name, message_id, + project=self.project) + self.assertEqual({'id', 'body', 'ttl', 'age', 'claim_count', + 'claim_id', 'checksum'}, set(message_out)) + + algorithm, checksum = message_out['checksum'].split(':') + expected_checksum = '' + if algorithm == 'MD5': + md5 = hashlib.md5() + md5.update(json.dumps(message['body']).encode('utf-8')) + expected_checksum = md5.hexdigest() + + self.assertEqual(expected_checksum, checksum) + def test_get_multi(self): client_uuid = uuid.uuid4() diff --git a/zaqar/transport/wsgi/utils.py b/zaqar/transport/wsgi/utils.py index 481bd2487..4e35ea551 100644 --- a/zaqar/transport/wsgi/utils.py +++ b/zaqar/transport/wsgi/utils.py @@ -235,10 +235,13 @@ def format_message_v1(message, base_path, claim_id=None): def format_message_v1_1(message, base_path, claim_id=None): url = message_url(message, base_path, claim_id) - return { + res = { 'id': message['id'], 'href': url, 'ttl': message['ttl'], 'age': message['age'], - 'body': message['body'], + 'body': message['body'] } + if message.get('checksum'): + res['checksum'] = message.get('checksum') + return res