DRY out tests until the stone bleeds

Can we go deeper!?

Change-Id: Ibd3b06542aa1bfcbcb71cc98e6bb21a6a67c12f4
This commit is contained in:
Clay Gerrard 2018-01-16 16:30:13 -08:00 committed by Alistair Coles
parent 314b915179
commit d707fc7b6d

View File

@ -5427,9 +5427,9 @@ class TestObjectController(unittest.TestCase):
self.assertTrue('chost,badhost' in msg)
self.assertTrue('cdevice' in msg)
def test_delete_at_update_on_put_and_post(self):
# Test how delete_at_update works when issued a delete for old
# expiration info after a new put/post with no new expiration info.
def test_delete_at_update_cleans_old_entries(self):
# Test how delete_at_update works with a request to overwrite an object
# with delete-at metadata
policy = random.choice(list(POLICIES))
def do_test(method, headers, expected_args):
@ -5451,41 +5451,26 @@ class TestObjectController(unittest.TestCase):
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEqual(expected_args, given_args)
expected_args = [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None, None,
HeaderKeyDict({
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1', policy]
do_test('PUT', {}, expected_args)
do_test('PUT', {'X-Backend-Clean-Expiring-Object-Queue': 'true'},
expected_args)
do_test('PUT', {'X-Backend-Clean-Expiring-Object-Queue': 't'},
expected_args)
expected_args = [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None, None,
HeaderKeyDict({
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': 'POST http://localhost/v1/a/c/o'}),
'sda1', policy]
do_test('POST', {}, expected_args)
do_test('POST', {'X-Backend-Clean-Expiring-Object-Queue': 'true'},
expected_args)
do_test('POST', {'X-Backend-Clean-Expiring-Object-Queue': 't'},
expected_args)
do_test('PUT', {'X-Backend-Clean-Expiring-Object-Queue': 'false'}, [])
do_test('PUT', {'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
do_test('POST', {'X-Backend-Clean-Expiring-Object-Queue': 'false'}, [])
do_test('POST', {'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
for method in ('PUT', 'POST', 'DELETE'):
expected_args = [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None,
None, HeaderKeyDict({
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': '%s http://localhost/v1/a/c/o' % method}),
'sda1', policy]
# async_update should be called by default...
do_test(method, {}, expected_args)
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'true'},
expected_args)
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 't'},
expected_args)
# ...unless header has a false value
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'false'},
[])
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
def test_delete_at_negative(self):
# Test how delete_at_update works when issued a delete for old
@ -5614,45 +5599,6 @@ class TestObjectController(unittest.TestCase):
req, 'sda1', policy)
self.assertEqual(given_args, [])
def test_delete_at_update_delete(self):
policy = random.choice(list(POLICIES))
def do_test(headers, expected_args):
given_args = []
def fake_async_update(*args):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
headers.update({'X-Timestamp': 1,
'X-Trans-Id': '1234',
'X-Backend-Storage-Policy-Index': int(policy)})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers=headers)
self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o',
req, 'sda1', policy)
self.assertEqual(expected_args, given_args)
expected_args = [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None,
None, HeaderKeyDict({
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'DELETE http://localhost/v1/a/c/o'}),
'sda1', policy]
do_test({}, expected_args)
do_test({'X-Backend-Clean-Expiring-Object-Queue': 'true'},
expected_args)
do_test({'X-Backend-Clean-Expiring-Object-Queue': 't'}, expected_args)
do_test({'X-Backend-Clean-Expiring-Object-Queue': 'false'}, [])
do_test({'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
def test_delete_backend_replication(self):
# If X-Backend-Replication: True delete_at_update should completely
# short-circuit.