Pass db_state to container-update and async_pending

When the proxy passes the container-update headers to the object server
now include the db_state, which it already had in hand. This will be
written to async_pending and allow the object-updater to know more about
a container rather then just relying on container_path attribute.

This patch also cleans up the PUT, POST and DELETE _get_update_target
paths refactoring the call into _backend_requests, only used by these
methods, so it only happens once.

Change-Id: Ie665e5c656c7fb27b45ee7427fe4b07ad466e3e2
This commit is contained in:
Matthew Oliver 2023-03-08 12:28:14 +11:00 committed by Clay Gerrard
parent 9c38d756e0
commit e8affa7db5
8 changed files with 330 additions and 99 deletions

View File

@ -272,7 +272,8 @@ class ObjectController(BaseStorageServer):
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice, policy,
logger_thread_locals=None, container_path=None):
logger_thread_locals=None, container_path=None,
db_state=None):
"""
Sends or saves an async update.
@ -294,6 +295,8 @@ class ObjectController(BaseStorageServer):
to which the update should be sent. If given this path will be used
instead of constructing a path from the ``account`` and
``container`` params.
:param db_state: The current database state of the container as
supplied to us by the proxy.
"""
if logger_thread_locals:
self.logger.thread_locals = logger_thread_locals
@ -337,7 +340,7 @@ class ObjectController(BaseStorageServer):
'%(ip)s:%(port)s/%(dev)s (saving for async update later)',
{'ip': ip, 'port': port, 'dev': contdevice})
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
'obj': obj, 'headers': headers_out, 'db_state': db_state}
if redirect_data:
self.logger.debug(
'Update to %(path)s redirected to %(redirect)s',
@ -371,6 +374,7 @@ class ObjectController(BaseStorageServer):
contdevices = [d.strip() for d in
headers_in.get('X-Container-Device', '').split(',')]
contpartition = headers_in.get('X-Container-Partition', '')
contdbstate = headers_in.get('X-Container-Root-Db-State')
if len(conthosts) != len(contdevices):
# This shouldn't happen unless there's a bug in the proxy,
@ -421,7 +425,7 @@ class ObjectController(BaseStorageServer):
conthost, contpartition, contdevice, headers_out,
objdevice, policy,
logger_thread_locals=self.logger.thread_locals,
container_path=contpath)
container_path=contpath, db_state=contdbstate)
update_greenthreads.append(gt)
# Wait a little bit to see if the container updates are successful.
# If we immediately return after firing off the greenthread above, then

View File

@ -391,9 +391,10 @@ class BaseObjectController(Controller):
if update_shard_ns:
partition, nodes = self.app.container_ring.get_nodes(
update_shard_ns.account, update_shard_ns.container)
return partition, nodes, update_shard_ns.name
return partition, nodes, update_shard_ns.name, db_state
return container_info['partition'], container_info['nodes'], None
return (container_info['partition'], container_info['nodes'], None,
db_state)
@public
@cors_validation
@ -402,8 +403,6 @@ class BaseObjectController(Controller):
"""HTTP POST request handler."""
container_info = self.container_info(
self.account_name, self.container_name, req)
container_partition, container_nodes, container_path = \
self._get_update_target(req, container_info)
req.acl = container_info['write_acl']
if is_open_expired(self.app, req):
req.headers['X-Backend-Open-Expired'] = 'true'
@ -411,7 +410,7 @@ class BaseObjectController(Controller):
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not container_nodes:
if not is_success(container_info.get('status')):
return HTTPNotFound(request=req)
error_response = check_metadata(req, 'object')
if error_response:
@ -434,17 +433,17 @@ class BaseObjectController(Controller):
self.account_name, self.container_name, self.object_name)
headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes,
container_path=container_path)
req, len(nodes), container_info, delete_at_container,
delete_at_part, delete_at_nodes)
return self._post_object(req, obj_ring, partition, headers)
def _backend_requests(self, req, n_outgoing,
container_partition, containers,
delete_at_container=None, delete_at_partition=None,
delete_at_nodes=None, container_path=None):
container_info, delete_at_container=None,
delete_at_partition=None, delete_at_nodes=None):
policy_index = req.headers['X-Backend-Storage-Policy-Index']
policy = POLICIES.get_by_index(policy_index)
container_partition, containers, container_path, db_state = \
self._get_update_target(req, container_info)
headers = [self.generate_request_headers(req, additional=req.headers)
for _junk in range(n_outgoing)]
@ -457,6 +456,7 @@ class BaseObjectController(Controller):
headers[index]['X-Container-Device'] = csv_append(
headers[index].get('X-Container-Device'),
container_node['device'])
headers[index]['X-Container-Root-Db-State'] = db_state
if container_path:
headers[index]['X-Backend-Quoted-Container-Path'] = quote(
container_path)
@ -848,8 +848,6 @@ class BaseObjectController(Controller):
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
container_partition, container_nodes, container_path = \
self._get_update_target(req, container_info)
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
@ -867,7 +865,7 @@ class BaseObjectController(Controller):
if aresp:
return aresp
if not container_nodes:
if not is_success(container_info.get('status')):
return HTTPNotFound(request=req)
# update content type in case it is missing
@ -895,9 +893,8 @@ class BaseObjectController(Controller):
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes,
container_path=container_path)
req, len(nodes), container_info,
delete_at_container, delete_at_part, delete_at_nodes)
# send object to storage nodes
resp = self._store_object(
@ -920,15 +917,13 @@ class BaseObjectController(Controller):
next_part_power = getattr(obj_ring, 'next_part_power', None)
if next_part_power:
req.headers['X-Backend-Next-Part-Power'] = next_part_power
container_partition, container_nodes, container_path = \
self._get_update_target(req, container_info)
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not container_nodes:
if not is_success(container_info.get('status')):
return HTTPNotFound(request=req)
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
@ -951,9 +946,7 @@ class BaseObjectController(Controller):
obj_ring, partition, req, policy=policy,
local_handoffs_first=True)
headers = self._backend_requests(
req, node_count, container_partition, container_nodes,
container_path=container_path)
headers = self._backend_requests(req, node_count, container_info)
return self._delete_object(req, obj_ring, partition, headers,
node_count=node_count,
node_iterator=node_iterator)

View File

@ -555,16 +555,18 @@ class ProbeTest(unittest.TestCase):
for policy in ENABLED_POLICIES:
for dev in policy.object_ring.devs:
all_obj_nodes[dev['device']] = dev
return all_obj_nodes.values()
return list(all_obj_nodes.values())
def gather_async_pendings(self, onodes):
def gather_async_pendings(self, onodes=None):
"""
Returns a list of paths to async pending files found on given nodes.
:param onodes: a list of nodes.
:param onodes: a list of nodes. If None, check all object nodes.
:return: a list of file paths.
"""
async_pendings = []
if onodes is None:
onodes = self.get_all_object_nodes()
for onode in onodes:
device_dir = self.device_dir(onode)
for ap_pol_dir in os.listdir(device_dir):

View File

@ -170,12 +170,11 @@ class TestObjectExpirer(ReplProbeTest):
# Make sure there's no async_pendings anywhere. Probe tests only run
# on single-node installs anyway, so this set should be small enough
# that an exhaustive check doesn't take too long.
all_obj_nodes = self.get_all_object_nodes()
pendings_before = self.gather_async_pendings(all_obj_nodes)
pendings_before = self.gather_async_pendings()
# expire the objects
Manager(['object-expirer']).once()
pendings_after = self.gather_async_pendings(all_obj_nodes)
pendings_after = self.gather_async_pendings()
self.assertEqual(pendings_after, pendings_before)
def test_expirer_object_should_not_be_expired(self):

View File

@ -14,6 +14,7 @@
# limitations under the License.
import json
import os
import pickle
import shutil
import subprocess
import unittest
@ -701,8 +702,8 @@ class TestContainerShardingNonUTF8(BaseAutoContainerSharding):
class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
DELIM = '\n'
def _make_object_names(self, number):
return ['obj\n%04d%%Ff' % x for x in range(number)]
def _make_object_names(self, number, start=0):
return ['obj\n%04d%%Ff' % x for x in range(start, start + number)]
def _setup_container_name(self):
self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4()
@ -1446,6 +1447,37 @@ class TestContainerSharding(BaseAutoContainerSharding):
additional_args='--partitions=%s' % self.brain.part)
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
def assertInAsyncFile(self, async_path, expected):
with open(async_path, 'rb') as fd:
async_data = pickle.load(fd)
errors = []
for k, v in expected.items():
if k not in async_data:
errors.append("Key '%s' does not exist" % k)
continue
if async_data[k] != v:
errors.append(
"Exp value %s != %s" % (str(v), str(async_data[k])))
continue
if errors:
self.fail('\n'.join(errors))
def assertNotInAsyncFile(self, async_path, not_expect_keys):
with open(async_path, 'rb') as fd:
async_data = pickle.load(fd)
errors = []
for k in not_expect_keys:
if k in async_data:
errors.append(
"Key '%s' exists with value '%s'" % (k, async_data[k]))
continue
if errors:
self.fail('\n'.join(errors))
def test_async_pendings(self):
obj_names = self._make_object_names(self.max_shard_size * 2)
@ -1459,6 +1491,13 @@ class TestContainerSharding(BaseAutoContainerSharding):
self.put_objects(obj_names[i::5])
self.brain.servers.start(number=n)
# Check the async pendings, they are unsharded so that's the db_state
async_files = self.gather_async_pendings()
self.assertTrue(async_files)
for af in async_files:
self.assertInAsyncFile(af, {'db_state': 'unsharded'})
self.assertNotInAsyncFile(af, ['container_path'])
# But there are also 1/5 updates *no one* gets
self.brain.servers.stop()
self.put_objects(obj_names[4::5])
@ -1542,6 +1581,9 @@ class TestContainerSharding(BaseAutoContainerSharding):
# Run updaters to clear the async pendings
Manager(['object-updater']).once()
async_files = self.gather_async_pendings()
self.assertFalse(async_files)
# Our "big" dbs didn't take updates
for db_file in found['normal_dbs']:
broker = ContainerBroker(db_file)
@ -1606,6 +1648,66 @@ class TestContainerSharding(BaseAutoContainerSharding):
for x in listing],
obj_names)
# Create a few more objects in async pending. Check them, they should
# now have the correct db_state as sharded
more_obj_names = self._make_object_names(10, self.max_shard_size * 2)
# No one should get these updates
self.brain.servers.stop()
self.put_objects(more_obj_names)
self.brain.servers.start()
async_files = self.gather_async_pendings()
self.assertTrue(async_files)
for af in async_files:
# They should have a sharded db_state
self.assertInAsyncFile(af, {'db_state': 'sharded'})
# But because the container-servers were down, they wont have
# container-path (because it couldn't get a shard range back)
self.assertNotInAsyncFile(af, ['container_path'])
# they don't exist yet
headers, listing = client.get_container(self.url, self.token,
self.container_name)
self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
for x in listing],
obj_names)
# Now clear them out and they should now exist where we expect.
Manager(['object-updater']).once()
headers, listing = client.get_container(self.url, self.token,
self.container_name)
self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
for x in listing],
obj_names + more_obj_names)
# And they're cleared up
async_files = self.gather_async_pendings()
self.assertFalse(async_files)
# If we take 1/2 the nodes offline when we add some more objects,
# we should get async pendings with container-path because there
# was a container-server to respond.
even_more_obj_names = self._make_object_names(
10, self.max_shard_size * 2 + 10)
self.brain.stop_primary_half()
self.put_objects(even_more_obj_names)
self.brain.start_primary_half()
async_files = self.gather_async_pendings()
self.assertTrue(async_files)
for af in async_files:
# They should have a sharded db_state AND container_path
self.assertInAsyncFile(af, {'db_state': 'sharded',
'container_path': mock.ANY})
Manager(['object-updater']).once()
# And they're cleared up
async_files = self.gather_async_pendings()
self.assertFalse(async_files)
def test_shrinking(self):
int_client = self.make_internal_client()
@ -1783,8 +1885,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
self.put_objects([beta])
self.brain.servers.start()
async_pendings = self.gather_async_pendings(
self.get_all_object_nodes())
async_pendings = self.gather_async_pendings()
num_container_replicas = len(self.brain.nodes)
num_obj_replicas = self.policy.object_ring.replica_count
expected_num_updates = num_container_updates(
@ -2449,8 +2550,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
self.put_objects(['alpha'])
self.assert_container_listing(['alpha'])
self.assert_container_object_count(0)
self.assertLengthEqual(
self.gather_async_pendings(self.get_all_object_nodes()), 1)
self.assertLengthEqual(self.gather_async_pendings(), 1)
self.brain.servers.start(number=shard_nodes[2])
# run sharder on root to discover first shrink candidate
@ -2497,8 +2597,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
self.put_objects(['beta'])
self.assert_container_listing(['beta'])
self.assert_container_object_count(1)
self.assertLengthEqual(
self.gather_async_pendings(self.get_all_object_nodes()), 2)
self.assertLengthEqual(self.gather_async_pendings(), 2)
self.brain.servers.start(number=shard_nodes[2])
# run sharder on root to discover second shrink candidate - root is not
@ -3097,11 +3196,11 @@ class TestShardedAPI(BaseTestContainerSharding):
class TestContainerShardingMoreUTF8(TestContainerSharding):
def _make_object_names(self, number):
def _make_object_names(self, number, start=0):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
for x in range(start, start + number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:

View File

@ -978,7 +978,8 @@ class TestObjectController(BaseTestCase):
'X-Backend-Storage-Policy-Index': int(policy),
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice'}
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded'}
if policy.policy_type == EC_POLICY:
put_headers.update({
'X-Object-Sysmeta-Ec-Frag-Index': '2',
@ -1016,7 +1017,8 @@ class TestObjectController(BaseTestCase):
self.assertDictEqual(
pickle.load(open(async_pending_file_put, 'rb')),
{'headers': expected_put_headers,
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
'db_state': 'unsharded'})
# POST with newer metadata returns success and container update
# is expected
@ -1028,7 +1030,8 @@ class TestObjectController(BaseTestCase):
'X-Backend-Storage-Policy-Index': int(policy),
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice'}
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded'}
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers=post_headers)
@ -1045,7 +1048,8 @@ class TestObjectController(BaseTestCase):
self.assertDictEqual(
pickle.load(open(async_pending_file_put, 'rb')),
{'headers': expected_put_headers,
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
'db_state': 'unsharded'})
# check distinct async pending file for POST
async_pending_file_post = os.path.join(
@ -1071,7 +1075,8 @@ class TestObjectController(BaseTestCase):
self.assertDictEqual(
pickle.load(open(async_pending_file_post, 'rb')),
{'headers': expected_post_headers,
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
'db_state': 'unsharded'})
# verify that only the POST (most recent) async update gets sent by the
# object updater, and that both update files are deleted
@ -1124,13 +1129,15 @@ class TestObjectController(BaseTestCase):
'X-Backend-Storage-Policy-Index': int(policy),
'X-Container-Host': 'chost:3200',
'X-Container-Partition': '99',
'X-Container-Device': 'cdevice'}
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded'}
if container_path:
# the proxy may include either header
hdr = ('X-Backend-Container-Path' if old_style
else 'X-Backend-Quoted-Container-Path')
put_headers[hdr] = container_path
put_headers['X-Container-Root-Db-State'] = 'sharded'
expected_update_path = '/cdevice/99/%s/o' % container_path
else:
expected_update_path = '/cdevice/99/a/c/o'
@ -1180,7 +1187,8 @@ class TestObjectController(BaseTestCase):
self.assertEqual(
{'headers': expected_put_headers,
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
'container_path': '.sharded_a/c_shard_1'},
'container_path': '.sharded_a/c_shard_1',
'db_state': 'sharded' if container_path else 'unsharded'},
pickle.load(open(async_pending_file_put, 'rb')))
# when updater is run its first request will be to the redirect
@ -5940,7 +5948,7 @@ class TestObjectController(BaseTestCase):
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set',
'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
policy)
policy, db_state='unsharded')
finally:
object_server.http_connect = orig_http_connect
utils.HASH_PATH_PREFIX = _prefix
@ -5953,7 +5961,8 @@ class TestObjectController(BaseTestCase):
{'headers': {'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': int(policy)},
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
'db_state': 'unsharded'})
def test_async_update_saves_on_non_2xx(self):
policy = random.choice(list(POLICIES))
@ -5984,7 +5993,7 @@ class TestObjectController(BaseTestCase):
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status),
'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
policy)
policy, db_state='unsharded')
async_dir = diskfile.get_async_dir(policy)
self.assertEqual(
pickle.load(open(os.path.join(
@ -5997,7 +6006,7 @@ class TestObjectController(BaseTestCase):
'X-Backend-Storage-Policy-Index':
int(policy)},
'account': 'a', 'container': 'c', 'obj': 'o',
'op': 'PUT'})
'op': 'PUT', 'db_state': 'unsharded'})
finally:
object_server.http_connect = orig_http_connect
utils.HASH_PATH_PREFIX = _prefix
@ -6224,12 +6233,14 @@ class TestObjectController(BaseTestCase):
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded',
'Content-Type': 'text/plain',
'X-Object-Sysmeta-Ec-Frag-Index': 0,
'X-Backend-Storage-Policy-Index': int(policy),
}
if container_path is not None:
headers['X-Backend-Container-Path'] = container_path
headers['X-Container-Root-Db-State'] = 'sharded'
req = Request.blank('/sda1/0/a/c/o', method='PUT',
headers=headers, body='')
@ -6271,9 +6282,12 @@ class TestObjectController(BaseTestCase):
'obj': 'o',
'account': 'a',
'container': 'c',
'op': 'PUT'}
'op': 'PUT',
'db_state': 'unsharded'}
if expected_container_path:
expected_data['container_path'] = expected_container_path
if container_path is not None:
expected_data['db_state'] = 'sharded'
self.assertEqual(expected_data, data)
do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
@ -6314,12 +6328,14 @@ class TestObjectController(BaseTestCase):
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded',
'Content-Type': 'text/plain',
'X-Object-Sysmeta-Ec-Frag-Index': 0,
'X-Backend-Storage-Policy-Index': int(policy),
}
if container_path is not None:
headers['X-Backend-Quoted-Container-Path'] = container_path
headers['X-Container-Root-Db-State'] = 'sharded'
req = Request.blank('/sda1/0/a/c/o', method='PUT',
headers=headers, body='')
@ -6361,9 +6377,12 @@ class TestObjectController(BaseTestCase):
'obj': 'o',
'account': 'a',
'container': 'c',
'op': 'PUT'}
'op': 'PUT',
'db_state': 'unsharded'}
if expected_container_path:
expected_data['container_path'] = expected_container_path
if container_path is not None:
expected_data['db_state'] = 'sharded'
self.assertEqual(expected_data, data)
do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
@ -6388,6 +6407,7 @@ class TestObjectController(BaseTestCase):
'X-Container-Host': 'chost:cport',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice',
'X-Container-Root-Db-State': 'unsharded',
'Content-Type': 'text/plain',
'X-Object-Sysmeta-Ec-Frag-Index': 0,
'X-Backend-Storage-Policy-Index': int(policy)}, body='')
@ -6426,7 +6446,8 @@ class TestObjectController(BaseTestCase):
'obj': 'o',
'account': 'a',
'container': 'c',
'op': 'PUT'})
'op': 'PUT',
'db_state': 'unsharded'})
def test_container_update_as_greenthread(self):
greenthreads = []
@ -6450,7 +6471,8 @@ class TestObjectController(BaseTestCase):
'X-Backend-Storage-Policy-Index': 0,
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5',
'X-Container-Device': 'sdb1'})
'X-Container-Device': 'sdb1',
'X-Container-Root-Db-State': 'unsharded'})
with mock.patch.object(object_server, 'spawn', local_fake_spawn), \
mock.patch.object(self.object_controller, 'async_update',
local_fake_async_update):
@ -6477,7 +6499,8 @@ class TestObjectController(BaseTestCase):
expected = [('PUT', 'a', 'c', 'o', '1.2.3.4:5', '20', 'sdb1',
headers_out, 'sda1', POLICIES[0]),
{'logger_thread_locals': (None, None),
'container_path': None}]
'container_path': None,
'db_state': 'unsharded'}]
self.assertEqual(called_async_update_args, [expected])
def test_container_update_as_greenthread_with_timeout(self):
@ -7944,6 +7967,7 @@ class TestObjectController(BaseTestCase):
'account': '.expiring_objects',
'container': delete_at_container,
'obj': '%s-a/c/o' % put_delete_at,
'db_state': None,
'headers': {
'X-Backend-Storage-Policy-Index': '0',
# only POST-1 has to clear the orig PUT delete-at

View File

@ -163,14 +163,22 @@ def make_footers_callback(body=None):
class BaseObjectControllerMixin(object):
container_info = {
'status': 200,
'write_acl': None,
'read_acl': None,
'storage_policy': None,
'sync_key': None,
'versions': None,
}
def fake_container_info(self, extra_info=None):
container_info = {
'status': 200,
'read_acl': None,
'write_acl': None,
'sync_key': None,
'versions': None,
'storage_policy': '0',
'partition': 50,
'nodes': [],
'sharding_state': 'unsharded',
}
if extra_info:
container_info.update(extra_info)
return container_info
# this needs to be set on the test case
controller_cls = None
@ -191,7 +199,7 @@ class BaseObjectControllerMixin(object):
# you can over-ride the container_info just by setting it on the app
# (see PatchedObjControllerApp for details)
self.app.container_info = dict(self.container_info)
self.app.container_info = dict(self.fake_container_info())
# default policy and ring references
self.policy = POLICIES.default
@ -878,7 +886,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
conf, account_ring=FakeRing(),
container_ring=FakeRing(), logger=None)
# Use the same container info as the app used in other tests
self.app.container_info = dict(self.container_info)
self.app.container_info = dict(self.fake_container_info())
self.obj_ring = self.app.get_object_ring(int(self.policy))
for method, num_reqs in (
@ -920,7 +928,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
conf, account_ring=FakeRing(),
container_ring=FakeRing(), logger=None)
# Use the same container info as the app used in other tests
self.app.container_info = dict(self.container_info)
self.app.container_info = dict(self.fake_container_info())
self.obj_ring = self.app.get_object_ring(int(self.policy))
for method, num_reqs in (
@ -1073,8 +1081,10 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
'port': '60%s' % str(i).zfill(2),
'device': 'sdb'} for i in range(num_containers)]
container_info = self.fake_container_info(
{'nodes': containers})
backend_headers = controller._backend_requests(
req, self.replicas(policy), 1, containers)
req, self.replicas(policy), container_info)
# how many of the backend headers have a container update
n_container_updates = len(
@ -1116,8 +1126,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
'device': 'sdb'} for i in range(num_del_at_nodes)]
container_info = self.fake_container_info(
{'nodes': containers})
backend_headers = controller._backend_requests(
req, self.replicas(policy), 1, containers,
req, self.replicas(policy), container_info,
delete_at_container='dac', delete_at_partition=2,
delete_at_nodes=del_at_nodes)
@ -1180,8 +1193,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
'device': 'sdb'} for i in range(num_containers)]
container_info = self.fake_container_info(
{'nodes': containers})
backend_headers = controller._backend_requests(
req, self.replicas(policy), 1, containers,
req, self.replicas(policy), container_info,
delete_at_container='dac', delete_at_partition=2,
delete_at_nodes=del_at_nodes)
@ -2435,7 +2451,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
self.app = PatchedObjControllerApp(
conf, account_ring=FakeRing(),
container_ring=FakeRing(), logger=None)
self.app.container_info = dict(self.container_info)
self.app.container_info = dict(self.fake_container_info())
self.obj_ring = self.app.get_object_ring(int(self.policy))
post_headers = []
@ -2459,7 +2475,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
self.app = PatchedObjControllerApp(
conf, account_ring=FakeRing(),
container_ring=FakeRing(), logger=None)
self.app.container_info = dict(self.container_info)
self.app.container_info = dict(self.fake_container_info())
self.obj_ring = self.app.get_object_ring(int(self.policy))
post_headers = []

View File

@ -72,10 +72,13 @@ from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
APIVersionError, ChunkReadError
from swift.common import utils, constraints, registry
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, iter_multipart_mime_documents, \
public, mkdirs, NullLogger, md5, node_to_string, NamespaceBoundList
ShardRange, parse_content_type, parse_mime_headers, \
iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
node_to_string, NamespaceBoundList
from swift.common.wsgi import loadapp, ConfigString
from swift.common.http_protocol import SwiftHttpProtocol
from swift.container.backend import NOTFOUND, UNSHARDED, SHARDING, SHARDED, \
COLLAPSED
from swift.proxy.controllers import base as proxy_base
from swift.proxy.controllers.base import get_cache_key, cors_validation, \
get_account_info, get_container_info
@ -2345,6 +2348,7 @@ class BaseTestObjectController(object):
A root of TestObjController that implements helper methods for child
TestObjControllers.
"""
def setUp(self):
# clear proxy logger result for each test
_test_servers[0].logger._clear()
@ -2567,6 +2571,7 @@ class TestReplicatedObjectController(
"""
Test suite for replication policy
"""
def setUp(self):
skip_if_no_xattrs()
_test_servers[0].error_limiter.stats.clear() # clear out errors
@ -7326,7 +7331,8 @@ class TestReplicatedObjectController(
**kwargs):
header_list = kwargs.pop('header_list', ['X-Container-Device',
'X-Container-Host',
'X-Container-Partition'])
'X-Container-Partition',
'X-Container-Root-Db-State'])
seen_headers = []
def capture_headers(ipaddr, port, device, partition, method,
@ -7344,11 +7350,78 @@ class TestReplicatedObjectController(
resp = controller_call(req)
self.assertEqual(2, resp.status_int // 100) # sanity check
# don't care about the account/container HEADs, so chuck
# the first two requests
return sorted(seen_headers[2:],
if kwargs.get('no_heads', False):
results = seen_headers
else:
# don't care about the account/container HEADs, so chuck
# the first two requests
results = seen_headers[2:]
return sorted(results,
key=lambda d: d.get(header_list[0]) or 'z')
def test_x_container_headers_db_states(self):
# let's force the db_states by inserting a crafted container
# info into info cache
crafted_container_info = {
'status': 200, 'read_acl': None, 'write_acl': None,
'sync_to': None, 'sync_key': None, 'object_count': 0, 'bytes': 0,
'versions': None, 'storage_policy': 0,
'cors': {
'allow_origin': None, 'expose_headers': None, 'max_age': None},
'meta': {}, 'sysmeta': {},
'created_at': '1', 'put_timestamp': None,
'delete_timestamp': None,
'status_changed_at': None
}
shardrange = ShardRange('.sharded_a/c_something', 0, 'm', 'z')
# We should always get X-Container-Root-Db-State with the current
# db_state and when db_state is either sharding or sharded we should
# also get an X-Backend-Quoted-Container-Path with a shard name.
for db_state, expect_cont_path in (
(NOTFOUND, False), (UNSHARDED, False), (SHARDING, True),
(SHARDED, True), (COLLAPSED, False)):
crafted_container_info['sharding_state'] = db_state
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.infocache': {}},
headers={'Content-Length': '5'}, body='12345')
req.environ['swift.infocache']['container/a/c'] = \
crafted_container_info
exp_seen_header_list = [
'X-Container-Device', 'X-Container-Host',
'X-Container-Partition', 'X-Container-Root-Db-State']
expected_headers = [
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda',
'X-Container-Root-Db-State': db_state},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': db_state},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc',
'X-Container-Root-Db-State': db_state}]
if expect_cont_path:
exp_seen_header_list.append('X-Backend-Quoted-Container-Path')
for headers in expected_headers:
headers['X-Backend-Quoted-Container-Path'] = \
shardrange.name
with mock.patch('swift.proxy.controllers.obj.BaseObjectController.'
'_get_update_shard', return_value=shardrange):
controller = ReplicatedObjectController(
self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
201, 201, 201, # PUT PUT PUT
header_list=exp_seen_header_list, no_heads=True)
self.assertEqual(seen_headers, expected_headers)
def test_PUT_x_container_headers_with_equal_replicas(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
@ -7361,13 +7434,16 @@ class TestReplicatedObjectController(
seen_headers, [
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda'},
'X-Container-Device': 'sda',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'}])
'X-Container-Device': 'sdc',
'X-Container-Root-Db-State': 'unsharded'}])
def test_PUT_x_container_headers_with_fewer_container_replicas(self):
self.app.container_ring.set_replicas(2)
@ -7384,13 +7460,16 @@ class TestReplicatedObjectController(
seen_headers, [
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda'},
'X-Container-Device': 'sda',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None}])
'X-Container-Device': None,
'X-Container-Root-Db-State': None}])
def test_PUT_x_container_headers_with_many_object_replicas(self):
POLICIES[0].object_ring.set_replicas(11)
@ -7409,16 +7488,20 @@ class TestReplicatedObjectController(
for h in seen_headers)),
{(('X-Container-Device', 'sda'),
('X-Container-Host', '10.0.0.0:1000'),
('X-Container-Partition', '0')): 3,
('X-Container-Partition', '0'),
('X-Container-Root-Db-State', 'unsharded')): 3,
(('X-Container-Device', 'sdb'),
('X-Container-Host', '10.0.0.1:1001'),
('X-Container-Partition', '0')): 2,
('X-Container-Partition', '0'),
('X-Container-Root-Db-State', 'unsharded')): 2,
(('X-Container-Device', 'sdc'),
('X-Container-Host', '10.0.0.2:1002'),
('X-Container-Partition', '0')): 2,
('X-Container-Partition', '0'),
('X-Container-Root-Db-State', 'unsharded')): 2,
(('X-Container-Device', None),
('X-Container-Host', None),
('X-Container-Partition', None)): 4})
('X-Container-Partition', None),
('X-Container-Root-Db-State', None)): 4})
def test_PUT_x_container_headers_with_more_container_replicas(self):
self.app.container_ring.set_replicas(4)
@ -7435,13 +7518,16 @@ class TestReplicatedObjectController(
seen_headers, [
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
'X-Container-Partition': '0',
'X-Container-Device': 'sda,sdd'},
'X-Container-Device': 'sda,sdd',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'}])
'X-Container-Device': 'sdc',
'X-Container-Root-Db-State': 'unsharded'}])
def test_POST_x_container_headers_with_more_container_replicas(self):
self.app.container_ring.set_replicas(4)
@ -7459,13 +7545,16 @@ class TestReplicatedObjectController(
seen_headers, [
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
'X-Container-Partition': '0',
'X-Container-Device': 'sda,sdd'},
'X-Container-Device': 'sda,sdd',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'}])
'X-Container-Device': 'sdc',
'X-Container-Root-Db-State': 'unsharded'}])
def test_DELETE_x_container_headers_with_more_container_replicas(self):
self.app.container_ring.set_replicas(4)
@ -7479,16 +7568,20 @@ class TestReplicatedObjectController(
controller.DELETE, req,
200, 200, 200, 200, 200) # HEAD HEAD DELETE DELETE DELETE
self.maxDiff = None
self.assertEqual(seen_headers, [
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
'X-Container-Partition': '0',
'X-Container-Device': 'sda,sdd'},
'X-Container-Device': 'sda,sdd',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
'X-Container-Device': 'sdb',
'X-Container-Root-Db-State': 'unsharded'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'}
'X-Container-Device': 'sdc',
'X-Container-Root-Db-State': 'unsharded'}
])
@mock.patch('time.time', new=lambda: STATIC_TIME)
@ -11684,6 +11777,7 @@ class TestAccountControllerFakeGetResponse(unittest.TestCase):
Test all the faked-out GET responses for accounts that don't exist. They
have to match the responses for empty accounts that really exist.
"""
def setUp(self):
conf = {'account_autocreate': 'yes'}
self.app = listing_formats.ListingFilter(