From e12c65a369825ea5469bdb31f5c7151268d7926b Mon Sep 17 00:00:00 2001 From: wanghao Date: Tue, 23 Jun 2020 18:01:45 +0800 Subject: [PATCH] Encrypted Messages in Queue The queue in Zaqar will support to encrypt messages before storing them into storage backends, also could support to decrypt messages when those are claimed by consumer. This feature will enhance the security of messaging service. Implements: blueprint encrypted-messages-in-queue Signed-off-by: wanghao Change-Id: Icecfb9a232cfeefc2f9603934696bb2dcd56bc9c --- api-ref/source/parameters.yaml | 9 + api-ref/source/queues.inc | 6 +- .../source/samples/queue-create-request.json | 1 + .../source/samples/queue-show-response.json | 3 +- lower-constraints.txt | 1 + ...ed-messages-in-queue-d7438d4f185be444.yaml | 9 + requirements.txt | 1 + test-requirements.txt | 1 + zaqar/conf/transport.py | 13 +- .../unit/transport/wsgi/v2_0/test_messages.py | 126 +++++++++- .../wsgi/v2_0/test_queue_lifecycle.py | 12 +- zaqar/transport/encryptor.py | 216 ++++++++++++++++++ zaqar/transport/validation.py | 5 + zaqar/transport/wsgi/driver.py | 2 + zaqar/transport/wsgi/v2_0/__init__.py | 12 +- zaqar/transport/wsgi/v2_0/messages.py | 41 +++- zaqar/transport/wsgi/v2_0/queues.py | 1 + 17 files changed, 439 insertions(+), 20 deletions(-) create mode 100644 releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml create mode 100644 zaqar/transport/encryptor.py diff --git a/api-ref/source/parameters.yaml b/api-ref/source/parameters.yaml index b2c788ea8..c2fd14719 100644 --- a/api-ref/source/parameters.yaml +++ b/api-ref/source/parameters.yaml @@ -239,6 +239,15 @@ _default_message_ttl: one of the ``reserved attributes`` of Zaqar queues. The value will be reverted to the default value after deleting it explicitly. +_enable_encrypt_messages: + type: boolean + in: body + required: False + description: | + The switch of encrypting messages for a queue, which will effect for + any messages posted to the queue. By default, the value is False. It is + one of the ``reserved attributes`` of Zaqar queues. + _flavor: type: string in: body diff --git a/api-ref/source/queues.inc b/api-ref/source/queues.inc index 49cedcac1..fc74de90c 100644 --- a/api-ref/source/queues.inc +++ b/api-ref/source/queues.inc @@ -85,8 +85,8 @@ exceed 64 bytes in length, and it is limited to US-ASCII letters, digits, underscores, and hyphens. When create queue, user can specify metadata for the queue. Currently, Zaqar -supports below metadata: _flavor, _max_claim_count, _dead_letter_queue and -_dead_letter_queue_messages_ttl. +supports below metadata: _flavor, _max_claim_count, _dead_letter_queue, +_dead_letter_queue_messages_ttl and _enable_encrypt_messages. In order to support the delayed queues, now add a metadata ``_default_message_delay``. @@ -119,6 +119,7 @@ Request Parameters - _flavor: _flavor - _max_claim_count: _max_claim_count - _max_messages_post_size: _max_messages_post_size + - _enable_encrypt_messages: _enable_encrypt_messages Request Example --------------- @@ -225,6 +226,7 @@ Response Parameters - _max_claim_count: _max_claim_count_response - _dead_letter_queue: _dead_letter_queue_response - _dead_letter_queue_messages_ttl: _dead_letter_queue_messages_ttl_response + - _enable_encrypt_messages: _enable_encrypt_messages Response Example ---------------- diff --git a/api-ref/source/samples/queue-create-request.json b/api-ref/source/samples/queue-create-request.json index 1f2cd71e9..2b0e0209e 100644 --- a/api-ref/source/samples/queue-create-request.json +++ b/api-ref/source/samples/queue-create-request.json @@ -5,5 +5,6 @@ "_dead_letter_queue": "dead_letter", "_dead_letter_queue_messages_ttl": 3600, "_max_claim_count": 10, + "_enable_encrypt_messages": true, "description": "Queue for international traffic billing." } \ No newline at end of file diff --git a/api-ref/source/samples/queue-show-response.json b/api-ref/source/samples/queue-show-response.json index 2303f2a5a..30045df93 100644 --- a/api-ref/source/samples/queue-show-response.json +++ b/api-ref/source/samples/queue-show-response.json @@ -4,5 +4,6 @@ "description": "Queue used for billing.", "_max_claim_count": 10, "_dead_letter_queue": "dead_letter", - "_dead_letter_queue_messages_ttl": 3600 + "_dead_letter_queue_messages_ttl": 3600, + "_enable_encrypt_messages": true } \ No newline at end of file diff --git a/lower-constraints.txt b/lower-constraints.txt index 33c3ca66c..69740375f 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -2,6 +2,7 @@ alembic==0.8.10 autobahn==0.17.1 Babel==2.3.4 coverage==4.0 +cryptography==2.1 ddt==1.0.1 doc8==0.6.0 dogpile.cache==0.6.2 diff --git a/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml b/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml new file mode 100644 index 000000000..28b9daa94 --- /dev/null +++ b/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml @@ -0,0 +1,9 @@ +--- +features: + - | + To enhance the security of messaging service, the queue in Zaqar + supports to encrypt messages before storing them into storage backends, + also could support to decrypt messages when those are claimed by consumer. + To enable this feature, user just need to take "_enable_encrypt_messages=True" + when creating queue. AES-256 is used as the default of encryption algorithm and + encryption key is configurable in the zaqar.conf. diff --git a/requirements.txt b/requirements.txt index 008da0801..4a7a297ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 alembic>=0.8.10 # MIT Babel!=2.4.0,>=2.3.4 # BSD +cryptography>=2.1 # BSD/Apache-2.0 falcon>=1.1.0 # Apache-2.0 jsonschema>=2.6.0 # MIT iso8601>=0.1.11 # MIT diff --git a/test-requirements.txt b/test-requirements.txt index ab39b5d5d..fba056680 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,6 +16,7 @@ PyMySQL>=0.7.6 # MIT License # Unit testing coverage!=4.4,>=4.0 # Apache-2.0 +cryptography>=2.1 # BSD/Apache-2.0 ddt>=1.0.1 # MIT doc8>=0.6.0 # Apache-2.0 Pygments>=2.2.0 # BSD license diff --git a/zaqar/conf/transport.py b/zaqar/conf/transport.py index fe6de9b67..25f81fd13 100644 --- a/zaqar/conf/transport.py +++ b/zaqar/conf/transport.py @@ -149,6 +149,15 @@ message_delete_with_claim_id = cfg.BoolOpt( 'improve the security of the message avoiding delete messages before' ' they are claimed and handled.') +message_encryption_algorithms = cfg.StrOpt( + 'message_encryption_algorithms', default='AES256', choices=['AES256'], + help='Defines the encryption algorithms of messages, the value could be ' + '"AES256" for now.') + +message_encryption_key = cfg.StrOpt( + 'message_encryption_key', default='AES256', + help='Defines the encryption key of algorithms.') + GROUP_NAME = 'transport' ALL_OPTS = [ @@ -173,7 +182,9 @@ ALL_OPTS = [ client_id_uuid_safe, min_length_client_id, max_length_client_id, - message_delete_with_claim_id + message_delete_with_claim_id, + message_encryption_algorithms, + message_encryption_key ] diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py index 8ae9c3d8e..298ee106f 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py @@ -61,13 +61,19 @@ class TestMessagesMongoDB(base.V2Base): # so that we don't have to concatenate against self.url_prefix # all over the place. self.queue_path = self.url_prefix + '/queues/fizbit' + self.encrypted_queue_path = self.url_prefix + '/queues/secretbit' self.messages_path = self.queue_path + '/messages' + self.encrypted_messages_path = self.encrypted_queue_path + '/messages' doc = '{"_ttl": 60}' self.simulate_put(self.queue_path, body=doc, headers=self.headers) + doc = '{"_ttl": 60, "_enable_encrypt_messages": true}' + self.simulate_put(self.encrypted_queue_path, body=doc, + headers=self.headers) def tearDown(self): self.simulate_delete(self.queue_path, headers=self.headers) + self.simulate_delete(self.encrypted_queue_path, headers=self.headers) if self.conf.pooling: for i in range(4): self.simulate_delete(self.url_prefix + '/pools/' + str(i), @@ -94,10 +100,15 @@ class TestMessagesMongoDB(base.V2Base): body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) - def _test_post(self, sample_messages): + def _test_post(self, sample_messages, is_encrypted=False): sample_doc = jsonutils.dumps({'messages': sample_messages}) + messages_path = None + if is_encrypted: + messages_path = self.encrypted_messages_path + else: + messages_path = self.messages_path - result = self.simulate_post(self.messages_path, + result = self.simulate_post(messages_path, body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_201, self.srmock.status) @@ -125,7 +136,7 @@ class TestMessagesMongoDB(base.V2Base): with mock.patch(timeutils_utcnow) as mock_utcnow: mock_utcnow.return_value = now for msg_id in msg_ids: - message_uri = self.messages_path + '/' + msg_id + message_uri = messages_path + '/' + msg_id headers = self.headers.copy() headers['X-Project-ID'] = '777777' @@ -150,7 +161,7 @@ class TestMessagesMongoDB(base.V2Base): # Test bulk GET query_string = 'ids=' + ','.join(msg_ids) - result = self.simulate_get(self.messages_path, + result = self.simulate_get(messages_path, query_string=query_string, headers=self.headers) @@ -204,6 +215,22 @@ class TestMessagesMongoDB(base.V2Base): self._test_post(sample_messages) + def test_post_single_encrypted(self): + sample_messages = [ + {'body': {'key': 'value'}, 'ttl': 200}, + ] + + self._test_post(sample_messages) + + def test_post_multiple_encrypted(self): + sample_messages = [ + {'body': 239, 'ttl': 100}, + {'body': {'key': 'value'}, 'ttl': 200}, + {'body': [1, 3], 'ttl': 300}, + ] + + self._test_post(sample_messages) + def test_post_optional_ttl(self): sample_messages = { 'messages': [ @@ -304,6 +331,24 @@ class TestMessagesMongoDB(base.V2Base): body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_post_using_queue_max_messages_post_size_with_encrypted(self): + queue_path = self.url_prefix + '/queues/test_queue2' + messages_path = queue_path + '/messages' + doc = ('{"_max_messages_post_size": 1023, ' + '"_enable_encrypt_messages": true}') + self.simulate_put(queue_path, body=doc, headers=self.headers) + self.addCleanup(self.simulate_delete, queue_path, headers=self.headers) + sample_messages = { + 'messages': [ + {'body': {'key': 'a' * 1204}}, + ], + } + + sample_doc = jsonutils.dumps(sample_messages) + self.simulate_post(messages_path, + body=sample_doc, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_get_from_missing_queue(self): body = self.simulate_get(self.url_prefix + '/queues/nonexistent/messages', @@ -384,6 +429,24 @@ class TestMessagesMongoDB(base.V2Base): self.simulate_delete(target, headers=self.headers) self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_delete_with_encrypted(self): + self._post_messages(self.encrypted_messages_path) + msg_id = self._get_msg_id(self.srmock.headers_dict) + target = self.encrypted_messages_path + '/' + msg_id + + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + self.simulate_delete(target, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Safe to delete non-existing ones + self.simulate_delete(target, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete(self): path = self.queue_path + '/messages' self._post_messages(path, repeat=5) @@ -410,6 +473,32 @@ class TestMessagesMongoDB(base.V2Base): self.simulate_delete(target, query_string=params, headers=self.headers) self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete_with_encrpted(self): + path = self.encrypted_queue_path + '/messages' + self._post_messages(path, repeat=5) + [target, params] = self.srmock.headers_dict['location'].split('?') + + # Deleting the whole collection is denied + self.simulate_delete(path, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_get(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Safe to delete non-existing ones + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + # Even after the queue is gone + self.simulate_delete(self.queue_path, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete_with_claim_ids(self): self.conf.set_override('message_delete_with_claim_id', True, 'transport') @@ -490,6 +579,35 @@ class TestMessagesMongoDB(base.V2Base): self.assertEqual(falcon.HTTP_200, self.srmock.status) self._empty_message_list(body) + def test_list_with_encrpyted(self): + path = self.encrypted_queue_path + '/messages' + self._post_messages(path, repeat=10) + + query_string = 'limit=3&echo=true' + body = self.simulate_get(path, + query_string=query_string, + headers=self.headers) + + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + cnt = 0 + while jsonutils.loads(body[0])['messages'] != []: + contents = jsonutils.loads(body[0]) + [target, params] = contents['links'][0]['href'].split('?') + + for msg in contents['messages']: + self.simulate_get(msg['href'], headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + body = self.simulate_get(target, + query_string=params, + headers=self.headers) + cnt += 1 + + self.assertEqual(4, cnt) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + self._empty_message_list(body) + def test_list_with_bad_marker(self): path = self.queue_path + '/messages' self._post_messages(path, repeat=5) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py index 633895645..4648e863c 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py @@ -113,6 +113,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) # Stats empty queue @@ -161,6 +162,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) # Stats empty queue @@ -301,6 +303,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) self.assertEqual(falcon.HTTP_200, self.srmock.status) @@ -358,7 +361,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # remove metadata doc3 = '[{"op":"remove", "path": "/metadata/key1"}]' @@ -383,7 +387,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # replace non-existent metadata doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]' @@ -501,7 +506,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # queue filter result = self.simulate_get(self.queue_path, headers=header, diff --git a/zaqar/transport/encryptor.py b/zaqar/transport/encryptor.py new file mode 100644 index 000000000..0074d0afc --- /dev/null +++ b/zaqar/transport/encryptor.py @@ -0,0 +1,216 @@ +# Copyright (c) 2020 Fiberhome Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Encryption has a dependency on the pycrypto. If pycrypto is not available, +CryptoUnavailableError will be raised. + +""" + +import base64 +import functools +import hashlib +import os +import pickle + +try: + from cryptography.hazmat import backends as crypto_backends + from cryptography.hazmat.primitives import ciphers + from cryptography.hazmat.primitives.ciphers import algorithms + from cryptography.hazmat.primitives.ciphers import modes + from cryptography.hazmat.primitives import padding +except ImportError: + ciphers = None + +from zaqar.conf import transport +from zaqar.i18n import _ + + +class EncryptionFailed(ValueError): + """Encryption failed when encrypting messages.""" + + def __init__(self, msg, *args, **kwargs): + msg = msg.format(*args, **kwargs) + super(EncryptionFailed, self).__init__(msg) + + +class DecryptError(Exception): + """raise when unable to decrypt encrypted data.""" + pass + + +class CryptoUnavailableError(Exception): + """raise when Python Crypto module is not available.""" + pass + + +def assert_crypto_availability(f): + """Ensure cryptography module is available.""" + @functools.wraps(f) + def wrapper(*args, **kwds): + if ciphers is None: + raise CryptoUnavailableError() + return f(*args, **kwds) + return wrapper + + +class EncryptionFactory(object): + + def __init__(self, conf): + self._conf = conf + self._conf.register_opts(transport.ALL_OPTS, + group=transport.GROUP_NAME) + self._limits_conf = self._conf[transport.GROUP_NAME] + self._algorithm = self._limits_conf.message_encryption_algorithms + self._encryption_key = None + if self._limits_conf.message_encryption_key: + hash_function = hashlib.sha256() + key = bytes(self._limits_conf.message_encryption_key, 'utf-8') + hash_function.update(key) + self._encryption_key = hash_function.digest() + + def getEncryptor(self): + if self._algorithm == 'AES256' and self._encryption_key: + return AES256Encryptor(self._encryption_key) + + +class Encryptor(object): + + def __init__(self, encryption_key): + self._encryption_key = encryption_key + + def message_encrypted(self, messages): + """Encrypting a list of messages. + + :param messages: A list of messages + """ + pass + + def message_decrypted(self, messages): + """decrypting a list of messages. + + :param messages: A list of messages + """ + pass + + def get_cipher(self): + pass + + def get_encryption_key(self): + return self._encryption_key + + +class AES256Encryptor(Encryptor): + + def get_cipher(self): + iv = os.urandom(16) + cipher = ciphers.Cipher( + algorithms.AES(self.get_encryption_key()), + modes.CBC(iv), backend=crypto_backends.default_backend()) + # AES algorithm uses block size of 16 bytes = 128 bits, defined in + # algorithms.AES.block_size. Using ``cryptography``, we will + # analogously use hazmat.primitives.padding to pad it to + # the 128-bit block size. + padder = padding.PKCS7(algorithms.AES.block_size).padder() + return iv, cipher, padder + + def _encrypt_string_message(self, message): + """Encrypt the message type of string""" + message = message.encode('utf-8') + iv, cipher, padder = self.get_cipher() + encryptor = cipher.encryptor() + padded_data = padder.update(message) + padder.finalize() + data = iv + encryptor.update(padded_data) + encryptor.finalize() + return base64.b64encode(data) + + def _encrypt_other_types_message(self, message): + """Encrypt the message type of other types""" + iv, cipher, padder = self.get_cipher() + encryptor = cipher.encryptor() + padded_data = padder.update(message) + padder.finalize() + data = iv + encryptor.update(padded_data) + encryptor.finalize() + return base64.b64encode(data) + + def _encrypt_message(self, message): + """Encrypt the message data with the given secret key. + + Padding is n bytes of the value n, where 1 <= n <= blocksize. + """ + if isinstance(message['body'], str): + message['body'] = self._encrypt_string_message(message['body']) + else: + # For other types like dict or list, we need to serialize them + # first. + try: + s_message = pickle.dumps(message['body']) + except pickle.PickleError: + return + message['body'] = self._encrypt_other_types_message(s_message) + + def _decrypt_message(self, message): + try: + encrypted_message = base64.b64decode(message['body']) + except (ValueError, TypeError): + return + iv = encrypted_message[:16] + cipher = ciphers.Cipher( + algorithms.AES(self._encryption_key), + modes.CBC(iv), + backend=crypto_backends.default_backend()) + try: + decryptor = cipher.decryptor() + data = (decryptor.update(encrypted_message[16:]) + + decryptor.finalize()) + except Exception: + raise DecryptError(_('Encrypted data appears to be corrupted.')) + + # Strip the last n padding bytes where n is the last value in + # the plaintext + unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() + data = unpadder.update(data) + unpadder.finalize() + try: + message['body'] = pickle.loads(data) + except pickle.UnpicklingError: + # If the data is a string which didn't be serialized, there will + # raise an exception. We just try to return the string itself. + message['body'] = str(data, encoding="utf-8") + + @assert_crypto_availability + def message_encrypted(self, messages): + """Encrypting a list of messages. + + :param messages: A list of messages + """ + if self.get_encryption_key(): + for msg in messages: + self._encrypt_message(msg) + else: + msg = _(u'Now Zaqar only support AES-256 and need to specify the' + u'key.') + raise EncryptionFailed(msg) + + @assert_crypto_availability + def message_decrypted(self, messages): + """decrypting a list of messages. + + :param messages: A list of messages + """ + if self.get_encryption_key(): + for msg in messages: + self._decrypt_message(msg) + else: + msg = _(u'Now Zaqar only support AES-256 and need to specify the' + u'key.') + raise EncryptionFailed(msg) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index bcb271c2d..c9fca5b84 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -341,6 +341,11 @@ class Validator(object): msg, self._limits_conf.max_message_delay, MIN_DELAY_TTL) + encrypted_queue = queue_metadata.get('_enable_encrypt_messages', False) + if encrypted_queue and not isinstance(encrypted_queue, bool): + msg = _(u'_enable_encrypt_messages must be boolean.') + raise ValidationFailed(msg) + self._validate_retry_policy(queue_metadata) def queue_purging(self, document): diff --git a/zaqar/transport/wsgi/driver.py b/zaqar/transport/wsgi/driver.py index 3e1dfc46b..ad3c9a19f 100644 --- a/zaqar/transport/wsgi/driver.py +++ b/zaqar/transport/wsgi/driver.py @@ -28,6 +28,7 @@ from zaqar.conf import drivers_transport_wsgi from zaqar.i18n import _ from zaqar import transport from zaqar.transport import acl +from zaqar.transport import encryptor from zaqar.transport.middleware import auth from zaqar.transport.middleware import cors from zaqar.transport.middleware import profile @@ -59,6 +60,7 @@ class Driver(transport.DriverBase): group=drivers_transport_wsgi.GROUP_NAME) self._wsgi_conf = self._conf[drivers_transport_wsgi.GROUP_NAME] self._validate = validation.Validator(self._conf) + self._encryptor_factory = encryptor.EncryptionFactory(self._conf) self.app = None self._init_routes() diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 266e27fea..184d29db5 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -82,9 +82,11 @@ def public_endpoints(driver, conf): driver._validate, message_controller, queue_controller, - defaults.message_ttl)), + defaults.message_ttl, + driver._encryptor_factory)), ('/queues/{queue_name}/messages/{message_id}', - messages.ItemResource(message_controller)), + messages.ItemResource(message_controller, queue_controller, + driver._encryptor_factory)), # Claims Endpoints ('/queues/{queue_name}/claims', @@ -140,9 +142,11 @@ def public_endpoints(driver, conf): driver._validate, message_controller, topic_controller, - defaults.message_ttl)), + defaults.message_ttl, + driver._encryptor_factory)), ('/topics/{topic_name}/messages/{message_id}', - messages.ItemResource(message_controller)), + messages.ItemResource(message_controller, queue_controller, + driver._encryptor_factory)), # Topic Subscription Endpoints ('/topics/{topic_name}/subscriptions', subscriptions.CollectionResource(driver._validate, diff --git a/zaqar/transport/wsgi/v2_0/messages.py b/zaqar/transport/wsgi/v2_0/messages.py index 57d5fc466..9a71e619d 100644 --- a/zaqar/transport/wsgi/v2_0/messages.py +++ b/zaqar/transport/wsgi/v2_0/messages.py @@ -37,18 +37,20 @@ class CollectionResource(object): '_queue_controller', '_wsgi_conf', '_validate', - '_default_message_ttl' + '_default_message_ttl', + '_encryptor' ) def __init__(self, wsgi_conf, validate, message_controller, queue_controller, - default_message_ttl): + default_message_ttl, encryptor_factory): self._wsgi_conf = wsgi_conf self._validate = validate self._message_controller = message_controller self._queue_controller = queue_controller self._default_message_ttl = default_message_ttl + self._encryptor = encryptor_factory.getEncryptor() # ---------------------------------------------------------------------- # Helpers @@ -63,10 +65,14 @@ class CollectionResource(object): message_ids=ids, project=project_id) + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) - + except storage_errors.QueueDoesNotExist: + LOG.exception('Queue name "%s" does not exist', queue_name) + queue_meta = None except Exception: description = _(u'Message could not be retrieved.') LOG.exception(description) @@ -77,6 +83,10 @@ class CollectionResource(object): if not messages: return None + # Decrypt messages + if queue_meta and queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted(messages) + messages = [wsgi_utils.format_message_v1_1(m, base_path, m['claim_id']) for m in messages] @@ -123,6 +133,10 @@ class CollectionResource(object): cursor = next(results) messages = list(cursor) + # Decrypt messages + if queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted(messages) + except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) @@ -187,6 +201,7 @@ class CollectionResource(object): queue_max_msg_size = queue_meta.get('_max_messages_post_size') queue_default_ttl = queue_meta.get('_default_message_ttl') queue_delay = queue_meta.get('_default_message_delay') + queue_encrypted = queue_meta.get('_enable_encrypt_messages', False) if queue_default_ttl: message_post_spec = (('ttl', int, queue_default_ttl), @@ -217,6 +232,9 @@ class CollectionResource(object): try: self._validate.message_posting(messages) + if queue_encrypted: + self._encryptor.message_encrypted(messages) + message_ids = self._message_controller.post( queue_name, messages=messages, @@ -343,10 +361,17 @@ class CollectionResource(object): class ItemResource(object): - __slots__ = '_message_controller' + __slots__ = ( + '_message_controller', + '_queue_controller', + '_encryptor' + ) - def __init__(self, message_controller): + def __init__(self, message_controller, queue_controller, + encryptor_factory): self._message_controller = message_controller + self._queue_controller = queue_controller + self._encryptor = encryptor_factory.getEncryptor() @decorators.TransportLog("Messages item") @acl.enforce("messages:get") @@ -357,6 +382,12 @@ class ItemResource(object): message_id, project=project_id) + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) + # Decrypt messages + if queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted([message]) + except storage_errors.DoesNotExist as ex: LOG.debug(ex) raise wsgi_errors.HTTPNotFound(six.text_type(ex)) diff --git a/zaqar/transport/wsgi/v2_0/queues.py b/zaqar/transport/wsgi/v2_0/queues.py index 014bbeb13..61c0b10be 100644 --- a/zaqar/transport/wsgi/v2_0/queues.py +++ b/zaqar/transport/wsgi/v2_0/queues.py @@ -42,6 +42,7 @@ def _get_reserved_metadata(validate): for metadata in ['_dead_letter_queue', '_dead_letter_queue_messages_ttl', '_max_claim_count']: reserved_metadata.update({metadata: None}) + reserved_metadata.update({'_enable_encrypt_messages': False}) return reserved_metadata