Fix decryption for broken objects

Try to get decryption object key from stored metadata (key_id path from
X-Object-Sysmeta-Crypto-Body-Meta) because sometime object.path is wrong
during encryption process.
This patch doesn't solve the underlying issue, but is needed to decrypt
already wrongly stored objects.

Change-Id: I1a6bcdebdb46ef03c342428aeed73ae76db29922
Co-Author: Thomas Goirand <thomas@goirand.fr>
Partial-Bug: #1813725
This commit is contained in:
Simeon Gourlin 2019-01-29 09:13:16 +01:00 committed by Tim Burke
parent bc6ad038cd
commit 9b3ca9423e
2 changed files with 104 additions and 6 deletions

View File

@ -18,7 +18,8 @@ import hmac
from swift.common.exceptions import UnknownSecretIdError from swift.common.exceptions import UnknownSecretIdError
from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.swob import Request, HTTPException, wsgi_to_bytes from swift.common.swob import Request, HTTPException, wsgi_to_bytes
from swift.common.utils import readconf, strict_b64decode, get_logger from swift.common.utils import readconf, strict_b64decode, get_logger, \
split_path
from swift.common.wsgi import WSGIContext from swift.common.wsgi import WSGIContext
@ -77,29 +78,54 @@ class KeyMasterContext(WSGIContext):
version = key_id['v'] version = key_id['v']
if version not in ('1', '2'): if version not in ('1', '2'):
raise ValueError('Unknown key_id version: %s' % version) raise ValueError('Unknown key_id version: %s' % version)
if version == '1' and not key_id['path'].startswith(
'/' + self.account + '/'):
# Well shoot. This was the bug that made us notice we needed
# a v2! Hope the current account/container was the original!
key_acct, key_cont, key_obj = (
self.account, self.container, key_id['path'])
else:
key_acct, key_cont, key_obj = split_path(
key_id['path'], 1, 3, True)
check_path = (
self.account, self.container or key_cont, self.obj or key_obj)
if (key_acct, key_cont, key_obj) != check_path:
self.keymaster.logger.info(
"Path stored in meta (%r) does not match path from "
"request (%r)! Using path from meta.",
key_id['path'],
'/' + '/'.join(x for x in [
self.account, self.container, self.obj] if x))
else: else:
secret_id = self.keymaster.active_secret_id secret_id = self.keymaster.active_secret_id
# v1 had a bug where we would claim the path was just the object # v1 had a bug where we would claim the path was just the object
# name if the object started with a slash. Bump versions to # name if the object started with a slash. Bump versions to
# establish that we can trust the path. # establish that we can trust the path.
version = '2' version = '2'
key_acct, key_cont, key_obj = (
self.account, self.container, self.obj)
if (secret_id, version) in self._keys: if (secret_id, version) in self._keys:
return self._keys[(secret_id, version)] return self._keys[(secret_id, version)]
keys = {} keys = {}
account_path = '/' + self.account account_path = '/' + key_acct
try: try:
# self.account/container/obj reflect the level of the *request*,
# which may be different from the level of the key_id-path. Only
# fetch the keys that the request needs.
if self.container: if self.container:
path = account_path + '/' + self.container path = account_path + '/' + key_cont
keys['container'] = self.keymaster.create_key( keys['container'] = self.keymaster.create_key(
path, secret_id=secret_id) path, secret_id=secret_id)
if self.obj: if self.obj:
if self.obj.startswith('/') and version == '1': if key_obj.startswith('/') and version == '1':
path = self.obj path = key_obj
else: else:
path = path + '/' + self.obj path = path + '/' + key_obj
keys['object'] = self.keymaster.create_key( keys['object'] = self.keymaster.create_key(
path, secret_id=secret_id) path, secret_id=secret_id)

View File

@ -482,6 +482,78 @@ class TestKeymaster(unittest.TestCase):
self.assertEqual(expected_keys, keys) self.assertEqual(expected_keys, keys)
self.assertEqual([('/a/c', None), ('/a/c//o', None)], calls) self.assertEqual([('/a/c', None), ('/a/c//o', None)], calls)
def test_v1_keys_with_weird_paths(self):
secrets = {None: os.urandom(32),
'22': os.urandom(33)}
conf = {}
for secret_id, secret in secrets.items():
opt = ('encryption_root_secret%s' %
(('_%s' % secret_id) if secret_id else ''))
conf[opt] = base64.b64encode(secret)
conf['active_root_secret_id'] = '22'
self.app = keymaster.KeyMaster(self.swift, conf)
orig_create_key = self.app.create_key
calls = []
def mock_create_key(path, secret_id=None):
calls.append((path, secret_id))
return orig_create_key(path, secret_id)
# request path doesn't match stored path -- this could happen if you
# misconfigured your proxy to have copy right of encryption
context = keymaster.KeyMasterContext(self.app, 'a', 'not-c', 'not-o')
for version in ('1', '2'):
with mock.patch.object(self.app, 'create_key', mock_create_key):
keys = context.fetch_crypto_keys(key_id={
'v': version, 'path': '/a/c/o'})
expected_keys = {
'container': hmac.new(secrets[None], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[None], b'/a/c/o',
digestmod=hashlib.sha256).digest(),
'id': {'path': '/a/c/o', 'v': version},
'all_ids': [
{'path': '/a/c/o', 'v': version},
{'path': '/a/c/o', 'secret_id': '22', 'v': version}]}
self.assertEqual(expected_keys, keys)
self.assertEqual([('/a/c', None), ('/a/c/o', None)], calls)
del calls[:]
context = keymaster.KeyMasterContext(
self.app, 'not-a', 'not-c', '/not-o')
with mock.patch.object(self.app, 'create_key', mock_create_key):
keys = context.fetch_crypto_keys(key_id={
'v': '1', 'path': '/o'})
expected_keys = {
'container': hmac.new(secrets[None], b'/not-a/not-c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[None], b'/o',
digestmod=hashlib.sha256).digest(),
'id': {'path': '/o', 'v': '1'},
'all_ids': [
{'path': '/o', 'v': '1'},
{'path': '/o', 'secret_id': '22', 'v': '1'}]}
self.assertEqual(expected_keys, keys)
self.assertEqual([('/not-a/not-c', None), ('/o', None)], calls)
del calls[:]
context = keymaster.KeyMasterContext(
self.app, 'not-a', 'not-c', '/not-o')
with mock.patch.object(self.app, 'create_key', mock_create_key):
keys = context.fetch_crypto_keys(key_id={
'v': '2', 'path': '/a/c//o'})
expected_keys = {
'container': hmac.new(secrets[None], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[None], b'/a/c//o',
digestmod=hashlib.sha256).digest(),
'id': {'path': '/a/c//o', 'v': '2'},
'all_ids': [
{'path': '/a/c//o', 'v': '2'},
{'path': '/a/c//o', 'secret_id': '22', 'v': '2'}]}
self.assertEqual(expected_keys, keys)
self.assertEqual([('/a/c', None), ('/a/c//o', None)], calls)
@mock.patch('swift.common.middleware.crypto.keymaster.readconf') @mock.patch('swift.common.middleware.crypto.keymaster.readconf')
def test_keymaster_config_path(self, mock_readconf): def test_keymaster_config_path(self, mock_readconf):
for secret in (os.urandom(32), os.urandom(33), os.urandom(50)): for secret in (os.urandom(32), os.urandom(33), os.urandom(50)):