swift/test/probe/test_container_merge_policy_index.py
Ade Lee 5320ecbaf2 replace md5 with swift utils version
md5 is not an approved algorithm in FIPS mode, and trying to
instantiate a hashlib.md5() will fail when the system is running in
FIPS mode.

md5 is allowed when in a non-security context.  There is a plan to
add a keyword parameter (usedforsecurity) to hashlib.md5() to annotate
whether or not the instance is being used in a security context.

In the case where it is not, the instantiation of md5 will be allowed.
See https://bugs.python.org/issue9216 for more details.

Some downstream python versions already support this parameter.  To
support these versions, a new encapsulation of md5() is added to
swift/common/utils.py.  This encapsulation is identical to the one being
added to oslo.utils, but is recreated here to avoid adding a dependency.

This patch is to replace the instances of hashlib.md5() with this new
encapsulation, adding an annotation indicating whether the usage is
a security context or not.

While this patch seems large, it is really just the same change over and
again.  Reviewers need to pay particular attention as to whether the
keyword parameter (usedforsecurity) is set correctly.   Right now, all
of them appear to be not used in a security context.

Now that all the instances have been converted, we can update the bandit
run to look for these instances and ensure that new invocations do not
creep in.

With this latest patch, the functional and unit tests all pass
on a FIPS enabled system.

Co-Authored-By: Pete Zaitcev
Change-Id: Ibb4917da4c083e1e094156d748708b87387f2d87
2020-12-15 09:52:55 -05:00

583 lines
26 KiB
Python

#!/usr/bin/python -u
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import uuid
import random
import unittest
from swift.common.manager import Manager
from swift.common.internal_client import InternalClient
from swift.common import utils, direct_client
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from swift.common.utils import md5
from swift.container.reconciler import MISPLACED_OBJECTS_ACCOUNT
from test.probe.brain import BrainSplitter, InternalBrainSplitter
from swift.common.request_helpers import get_reserved_name
from test.probe.common import (ReplProbeTest, ENABLED_POLICIES,
POLICIES_BY_TYPE, REPL_POLICY)
from swiftclient import ClientException
TIMEOUT = 60
class TestContainerMergePolicyIndex(ReplProbeTest):
@unittest.skipIf(len(ENABLED_POLICIES) < 2, "Need more than one policy")
def setUp(self):
super(TestContainerMergePolicyIndex, self).setUp()
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name, 'container')
def _get_object_patiently(self, policy_index):
# use proxy to access object (bad container info might be cached...)
timeout = time.time() + TIMEOUT
while time.time() < timeout:
try:
return self.brain.get_object()
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
time.sleep(1)
else:
self.fail('could not GET /%s/%s/%s/ from policy %s '
'after %s seconds.' % (
self.account, self.container_name, self.object_name,
int(policy_index), TIMEOUT))
def test_merge_storage_policy_index(self):
# generic split brain
self.brain.stop_primary_half()
self.brain.put_container()
self.brain.start_primary_half()
self.brain.stop_handoff_half()
self.brain.put_container()
self.brain.put_object(headers={'x-object-meta-test': 'custom-meta'},
contents=b'VERIFY')
self.brain.start_handoff_half()
# make sure we have some manner of split brain
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = {
metadata['X-Backend-Storage-Policy-Index']
for node, metadata in head_responses}
self.assertGreater(
len(found_policy_indexes), 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
# find our object
orig_policy_index = None
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException:
continue
orig_policy_index = policy_index
break
if orig_policy_index is not None:
break
else:
self.fail('Unable to find /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
self.get_to_final_state()
Manager(['container-reconciler']).once()
# validate containers
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = {
metadata['X-Backend-Storage-Policy-Index']
for node, metadata in head_responses}
self.assertEqual(len(found_policy_indexes), 1,
'primary nodes disagree about policy index %r' %
head_responses)
expected_policy_index = found_policy_indexes.pop()
self.assertNotEqual(orig_policy_index, expected_policy_index)
# validate object placement
orig_policy_ring = POLICIES.get_object_ring(orig_policy_index,
'/etc/swift')
for node in orig_policy_ring.devs:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={
'X-Backend-Storage-Policy-Index': orig_policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
raise
else:
self.fail('Found /%s/%s/%s in %s' % (
self.account, self.container_name, self.object_name,
orig_policy_index))
# verify that the object data read by external client is correct
headers, data = self._get_object_patiently(expected_policy_index)
self.assertEqual(b'VERIFY', data)
self.assertEqual('custom-meta', headers['x-object-meta-test'])
def test_reconcile_delete(self):
# generic split brain
self.brain.stop_primary_half()
self.brain.put_container()
self.brain.put_object()
self.brain.start_primary_half()
self.brain.stop_handoff_half()
self.brain.put_container()
self.brain.delete_object()
self.brain.start_handoff_half()
# make sure we have some manner of split brain
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = {
metadata['X-Backend-Storage-Policy-Index']
for node, metadata in head_responses}
self.assertGreater(
len(found_policy_indexes), 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
# find our object
orig_policy_index = ts_policy_index = None
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException as err:
if 'x-backend-timestamp' in err.http_headers:
ts_policy_index = policy_index
break
else:
orig_policy_index = policy_index
break
if not orig_policy_index:
self.fail('Unable to find /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
if not ts_policy_index:
self.fail('Unable to find tombstone /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
self.get_to_final_state()
Manager(['container-reconciler']).once()
# validate containers
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
node_to_policy = {
node['port']: metadata['X-Backend-Storage-Policy-Index']
for node, metadata in head_responses}
policies = set(node_to_policy.values())
self.assertEqual(len(policies), 1,
'primary nodes disagree about policy index %r' %
node_to_policy)
expected_policy_index = policies.pop()
self.assertEqual(orig_policy_index, expected_policy_index)
# validate object fully deleted
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
else:
self.fail('Found /%s/%s/%s in %s on %s' % (
self.account, self.container_name, self.object_name,
orig_policy_index, node))
def get_object_name(self, name):
"""
hook for sublcass to translate object names
"""
return name
def test_reconcile_manifest(self):
if 'slo' not in self.cluster_info:
raise unittest.SkipTest(
"SLO not enabled in proxy; can't test manifest reconciliation")
# this test is not only testing a split brain scenario on
# multiple policies with mis-placed objects - it even writes out
# a static large object directly to the storage nodes while the
# objects are unavailably mis-placed from *behind* the proxy and
# doesn't know how to do that for EC_POLICY (clayg: why did you
# guys let me write a test that does this!?) - so we force
# wrong_policy (where the manifest gets written) to be one of
# any of your configured REPL_POLICY (we know you have one
# because this is a ReplProbeTest)
wrong_policy = random.choice(POLICIES_BY_TYPE[REPL_POLICY])
policy = random.choice([p for p in ENABLED_POLICIES
if p is not wrong_policy])
manifest_data = []
def write_part(i):
body = b'VERIFY%0.2d' % i + b'\x00' * 1048576
part_name = self.get_object_name('manifest_part_%0.2d' % i)
manifest_entry = {
"path": "/%s/%s" % (self.container_name, part_name),
"etag": md5(body, usedforsecurity=False).hexdigest(),
"size_bytes": len(body),
}
self.brain.client.put_object(self.container_name, part_name, {},
body)
manifest_data.append(manifest_entry)
# get an old container stashed
self.brain.stop_primary_half()
self.brain.put_container(int(policy))
self.brain.start_primary_half()
# write some parts
for i in range(10):
write_part(i)
self.brain.stop_handoff_half()
self.brain.put_container(int(wrong_policy))
# write some more parts
for i in range(10, 20):
write_part(i)
# write manifest
with self.assertRaises(ClientException) as catcher:
self.brain.client.put_object(
self.container_name, self.object_name,
{}, utils.json.dumps(manifest_data),
query_string='multipart-manifest=put')
# so as it works out, you can't really upload a multi-part
# manifest for objects that are currently misplaced - you have to
# wait until they're all available - which is about the same as
# some other failure that causes data to be unavailable to the
# proxy at the time of upload
self.assertEqual(catcher.exception.http_status, 400)
# but what the heck, we'll sneak one in just to see what happens...
direct_manifest_name = self.object_name + '-direct-test'
object_ring = POLICIES.get_object_ring(wrong_policy.idx, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, direct_manifest_name)
container_part = self.container_ring.get_part(self.account,
self.container_name)
def translate_direct(data):
return {
'hash': data['etag'],
'bytes': data['size_bytes'],
'name': data['path'],
}
direct_manifest_data = [translate_direct(item)
for item in manifest_data]
headers = {
'x-container-host': ','.join('%s:%s' % (n['ip'], n['port']) for n
in self.container_ring.devs),
'x-container-device': ','.join(n['device'] for n in
self.container_ring.devs),
'x-container-partition': container_part,
'X-Backend-Storage-Policy-Index': wrong_policy.idx,
'X-Static-Large-Object': 'True',
}
body = utils.json.dumps(direct_manifest_data).encode('ascii')
for node in nodes:
direct_client.direct_put_object(
node, part, self.account, self.container_name,
direct_manifest_name,
contents=body,
headers=headers)
break # one should do it...
self.brain.start_handoff_half()
self.get_to_final_state()
Manager(['container-reconciler']).once()
# clear proxy cache
self.brain.client.post_container(self.container_name, {})
# let's see how that direct upload worked out...
metadata, body = self.brain.client.get_object(
self.container_name, direct_manifest_name,
query_string='multipart-manifest=get')
self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
for i, entry in enumerate(utils.json.loads(body)):
for key in ('hash', 'bytes', 'name'):
self.assertEqual(entry[key], direct_manifest_data[i][key])
metadata, body = self.brain.client.get_object(
self.container_name, direct_manifest_name)
self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
self.assertEqual(int(metadata['content-length']),
sum(part['size_bytes'] for part in manifest_data))
self.assertEqual(body, b''.join(b'VERIFY%0.2d' % i + b'\x00' * 1048576
for i in range(20)))
# and regular upload should work now too
self.brain.client.put_object(
self.container_name, self.object_name, {},
utils.json.dumps(manifest_data).encode('ascii'),
query_string='multipart-manifest=put')
metadata = self.brain.client.head_object(self.container_name,
self.object_name)
self.assertEqual(int(metadata['content-length']),
sum(part['size_bytes'] for part in manifest_data))
def test_reconcile_symlink(self):
if 'symlink' not in self.cluster_info:
raise unittest.SkipTest(
"Symlink not enabled in proxy; can't test "
"symlink reconciliation")
wrong_policy = random.choice(ENABLED_POLICIES)
policy = random.choice([p for p in ENABLED_POLICIES
if p is not wrong_policy])
# get an old container stashed
self.brain.stop_primary_half()
self.brain.put_container(int(policy))
self.brain.start_primary_half()
# write some target data
target_name = self.get_object_name('target')
self.brain.client.put_object(self.container_name, target_name, {},
b'this is the target data')
# write the symlink
self.brain.stop_handoff_half()
self.brain.put_container(int(wrong_policy))
symlink_name = self.get_object_name('symlink')
self.brain.client.put_object(
self.container_name, symlink_name, {
'X-Symlink-Target': '%s/%s' % (
self.container_name, target_name),
'Content-Type': 'application/symlink',
}, b'')
# at this point we have a broken symlink (the container_info has the
# proxy looking for the target in the wrong policy)
with self.assertRaises(ClientException) as ctx:
self.brain.client.get_object(self.container_name, symlink_name)
self.assertEqual(ctx.exception.http_status, 404)
# of course the symlink itself is fine
metadata, body = self.brain.client.get_object(
self.container_name, symlink_name, query_string='symlink=get')
self.assertEqual(metadata['x-symlink-target'],
utils.quote('%s/%s' % (
self.container_name, target_name)))
self.assertEqual(metadata['content-type'], 'application/symlink')
self.assertEqual(body, b'')
# ... although in the wrong policy
object_ring = POLICIES.get_object_ring(int(wrong_policy), '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, symlink_name)
for node in nodes:
metadata = direct_client.direct_head_object(
node, part, self.account, self.container_name, symlink_name,
headers={'X-Backend-Storage-Policy-Index': int(wrong_policy)})
self.assertEqual(metadata['X-Object-Sysmeta-Symlink-Target'],
utils.quote('%s/%s' % (
self.container_name, target_name)))
# let the reconciler run
self.brain.start_handoff_half()
self.get_to_final_state()
Manager(['container-reconciler']).once()
# clear proxy cache
self.brain.client.post_container(self.container_name, {})
# now the symlink works
metadata, body = self.brain.client.get_object(
self.container_name, symlink_name)
self.assertEqual(body, b'this is the target data')
# and it's in the correct policy
object_ring = POLICIES.get_object_ring(int(policy), '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, symlink_name)
for node in nodes:
metadata = direct_client.direct_head_object(
node, part, self.account, self.container_name, symlink_name,
headers={'X-Backend-Storage-Policy-Index': int(policy)})
self.assertEqual(metadata['X-Object-Sysmeta-Symlink-Target'],
utils.quote('%s/%s' % (
self.container_name, target_name)))
def test_reconciler_move_object_twice(self):
# select some policies
old_policy = random.choice(ENABLED_POLICIES)
new_policy = random.choice([p for p in ENABLED_POLICIES
if p != old_policy])
# setup a split brain
self.brain.stop_handoff_half()
# get old_policy on two primaries
self.brain.put_container(policy_index=int(old_policy))
self.brain.start_handoff_half()
self.brain.stop_primary_half()
# force a recreate on handoffs
self.brain.put_container(policy_index=int(old_policy))
self.brain.delete_container()
self.brain.put_container(policy_index=int(new_policy))
self.brain.put_object() # populate memcache with new_policy
self.brain.start_primary_half()
# at this point two primaries have old policy
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = [
(node, direct_client.direct_head_container(
node, container_part, self.account, self.container_name))
for node in container_nodes]
old_container_nodes = [
node for node, metadata in head_responses
if int(old_policy) ==
int(metadata['X-Backend-Storage-Policy-Index'])]
self.assertEqual(2, len(old_container_nodes))
# hopefully memcache still has the new policy cached
self.brain.put_object(headers={'x-object-meta-test': 'custom-meta'},
contents=b'VERIFY')
# double-check object correctly written to new policy
conf_files = []
for server in Manager(['container-reconciler']).servers:
conf_files.extend(server.conf_files())
conf_file = conf_files[0]
int_client = InternalClient(conf_file, 'probe-test', 3)
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# shutdown the containers that know about the new policy
self.brain.stop_handoff_half()
# and get rows enqueued from old nodes
for server_type in ('container-replicator', 'container-updater'):
server = Manager([server_type])
for node in old_container_nodes:
server.once(number=self.config_number(node))
# verify entry in the queue for the "misplaced" new_policy
for container in int_client.iter_containers(MISPLACED_OBJECTS_ACCOUNT):
for obj in int_client.iter_objects(MISPLACED_OBJECTS_ACCOUNT,
container['name']):
expected = '%d:/%s/%s/%s' % (new_policy, self.account,
self.container_name,
self.object_name)
self.assertEqual(obj['name'], expected)
Manager(['container-reconciler']).once()
# verify object in old_policy
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# verify object is *not* in new_policy
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
self.get_to_final_state()
# verify entry in the queue
for container in int_client.iter_containers(MISPLACED_OBJECTS_ACCOUNT):
for obj in int_client.iter_objects(MISPLACED_OBJECTS_ACCOUNT,
container['name']):
expected = '%d:/%s/%s/%s' % (old_policy, self.account,
self.container_name,
self.object_name)
self.assertEqual(obj['name'], expected)
Manager(['container-reconciler']).once()
# and now it flops back
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
int_client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# make sure the queue is settled
self.get_to_final_state()
for container in int_client.iter_containers(MISPLACED_OBJECTS_ACCOUNT):
for obj in int_client.iter_objects(MISPLACED_OBJECTS_ACCOUNT,
container['name']):
self.fail('Found unexpected object %r in the queue' % obj)
# verify that the object data read by external client is correct
headers, data = self._get_object_patiently(int(new_policy))
self.assertEqual(b'VERIFY', data)
self.assertEqual('custom-meta', headers['x-object-meta-test'])
class TestReservedNamespaceMergePolicyIndex(TestContainerMergePolicyIndex):
@unittest.skipIf(len(ENABLED_POLICIES) < 2, "Need more than one policy")
def setUp(self):
super(TestReservedNamespaceMergePolicyIndex, self).setUp()
self.container_name = get_reserved_name('container', str(uuid.uuid4()))
self.object_name = get_reserved_name('object', str(uuid.uuid4()))
self.brain = InternalBrainSplitter('/etc/swift/internal-client.conf',
self.container_name,
self.object_name, 'container')
def get_object_name(self, name):
return get_reserved_name(name)
def test_reconcile_manifest(self):
raise unittest.SkipTest(
'SLO does not allow parts in the reserved namespace')
if __name__ == "__main__":
unittest.main()