diff --git a/swift/obj/server.py b/swift/obj/server.py
index df170ae194..cb0008f9a0 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -272,7 +272,8 @@ class ObjectController(BaseStorageServer):
 
     def async_update(self, op, account, container, obj, host, partition,
                      contdevice, headers_out, objdevice, policy,
-                     logger_thread_locals=None, container_path=None):
+                     logger_thread_locals=None, container_path=None,
+                     db_state=None):
         """
         Sends or saves an async update.
 
@@ -294,6 +295,8 @@ class ObjectController(BaseStorageServer):
             to which the update should be sent. If given this path will be
             used instead of constructing a path from the ``account`` and
             ``container`` params.
+        :param db_state: The current database state of the container as
+            supplied to us by the proxy.
         """
         if logger_thread_locals:
             self.logger.thread_locals = logger_thread_locals
@@ -337,7 +340,7 @@ class ObjectController(BaseStorageServer):
                 '%(ip)s:%(port)s/%(dev)s (saving for async update later)',
                 {'ip': ip, 'port': port, 'dev': contdevice})
         data = {'op': op, 'account': account, 'container': container,
-                'obj': obj, 'headers': headers_out}
+                'obj': obj, 'headers': headers_out, 'db_state': db_state}
         if redirect_data:
             self.logger.debug(
                 'Update to %(path)s redirected to %(redirect)s',
@@ -371,6 +374,7 @@ class ObjectController(BaseStorageServer):
         contdevices = [d.strip() for d in
                        headers_in.get('X-Container-Device', '').split(',')]
         contpartition = headers_in.get('X-Container-Partition', '')
+        contdbstate = headers_in.get('X-Container-Root-Db-State')
 
         if len(conthosts) != len(contdevices):
             # This shouldn't happen unless there's a bug in the proxy,
@@ -421,7 +425,7 @@ class ObjectController(BaseStorageServer):
                 conthost, contpartition, contdevice, headers_out,
                 objdevice, policy,
                 logger_thread_locals=self.logger.thread_locals,
-                container_path=contpath)
+                container_path=contpath, db_state=contdbstate)
             update_greenthreads.append(gt)
         # Wait a little bit to see if the container updates are successful.
         # If we immediately return after firing off the greenthread above, then
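The object-server side of this change is additive: async_update() grows an
optional db_state keyword, the request handlers read it from the new
X-Container-Root-Db-State header, and the value is pickled into every async
pending alongside the existing op/account/container/obj/headers keys (it is
None when the proxy sent no such header). A minimal sketch, assuming only the
payload layout shown in the hunk above, of inspecting one of these files (the
path below is hypothetical):

    import pickle

    def describe_async_pending(path):
        # Async pendings are plain pickle files; the payload now carries
        # an optional 'db_state' entry next to the usual update fields.
        with open(path, 'rb') as fd:
            data = pickle.load(fd)
        print('%(op)s %(account)s/%(container)s/%(obj)s' % data)
        print('root db state: %s' % data.get('db_state'))
        print('shard path:    %s' % data.get('container_path'))

    # Hypothetical file name; real ones are named by object hash and
    # timestamp under a device's async pending directory:
    # describe_async_pending('/srv/node/sda1/async_pending/a83/hash-ts')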
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 94e6399521..91da58c3fe 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -391,9 +391,10 @@ class BaseObjectController(Controller):
         if update_shard_ns:
             partition, nodes = self.app.container_ring.get_nodes(
                 update_shard_ns.account, update_shard_ns.container)
-            return partition, nodes, update_shard_ns.name
+            return partition, nodes, update_shard_ns.name, db_state
 
-        return container_info['partition'], container_info['nodes'], None
+        return (container_info['partition'], container_info['nodes'], None,
+                db_state)
 
     @public
     @cors_validation
@@ -402,8 +403,6 @@ class BaseObjectController(Controller):
         """HTTP POST request handler."""
         container_info = self.container_info(
             self.account_name, self.container_name, req)
-        container_partition, container_nodes, container_path = \
-            self._get_update_target(req, container_info)
         req.acl = container_info['write_acl']
         if is_open_expired(self.app, req):
             req.headers['X-Backend-Open-Expired'] = 'true'
@@ -411,7 +410,7 @@ class BaseObjectController(Controller):
             aresp = req.environ['swift.authorize'](req)
             if aresp:
                 return aresp
-        if not container_nodes:
+        if not is_success(container_info.get('status')):
             return HTTPNotFound(request=req)
         error_response = check_metadata(req, 'object')
         if error_response:
@@ -434,17 +433,17 @@ class BaseObjectController(Controller):
             self.account_name, self.container_name, self.object_name)
 
         headers = self._backend_requests(
-            req, len(nodes), container_partition, container_nodes,
-            delete_at_container, delete_at_part, delete_at_nodes,
-            container_path=container_path)
+            req, len(nodes), container_info, delete_at_container,
+            delete_at_part, delete_at_nodes)
         return self._post_object(req, obj_ring, partition, headers)
 
     def _backend_requests(self, req, n_outgoing,
-                          container_partition, containers,
-                          delete_at_container=None, delete_at_partition=None,
-                          delete_at_nodes=None, container_path=None):
+                          container_info, delete_at_container=None,
+                          delete_at_partition=None, delete_at_nodes=None):
         policy_index = req.headers['X-Backend-Storage-Policy-Index']
         policy = POLICIES.get_by_index(policy_index)
+        container_partition, containers, container_path, db_state = \
+            self._get_update_target(req, container_info)
         headers = [self.generate_request_headers(req, additional=req.headers)
                    for _junk in range(n_outgoing)]
 
@@ -457,6 +456,7 @@ class BaseObjectController(Controller):
             headers[index]['X-Container-Device'] = csv_append(
                 headers[index].get('X-Container-Device'),
                 container_node['device'])
+            headers[index]['X-Container-Root-Db-State'] = db_state
             if container_path:
                 headers[index]['X-Backend-Quoted-Container-Path'] = quote(
                     container_path)
@@ -848,8 +848,6 @@ class BaseObjectController(Controller):
         policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
                                        container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
-        container_partition, container_nodes, container_path = \
-            self._get_update_target(req, container_info)
         partition, nodes = obj_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
 
@@ -867,7 +865,7 @@ class BaseObjectController(Controller):
             if aresp:
                 return aresp
 
-        if not container_nodes:
+        if not is_success(container_info.get('status')):
             return HTTPNotFound(request=req)
 
         # update content type in case it is missing
@@ -895,9 +893,8 @@ class BaseObjectController(Controller):
 
         # add special headers to be handled by storage nodes
         outgoing_headers = self._backend_requests(
-            req, len(nodes), container_partition, container_nodes,
-            delete_at_container, delete_at_part, delete_at_nodes,
-            container_path=container_path)
+            req, len(nodes), container_info,
+            delete_at_container, delete_at_part, delete_at_nodes)
 
         # send object to storage nodes
         resp = self._store_object(
@@ -920,15 +917,13 @@ class BaseObjectController(Controller):
         next_part_power = getattr(obj_ring, 'next_part_power', None)
         if next_part_power:
             req.headers['X-Backend-Next-Part-Power'] = next_part_power
-        container_partition, container_nodes, container_path = \
-            self._get_update_target(req, container_info)
         req.acl = container_info['write_acl']
         req.environ['swift_sync_key'] = container_info['sync_key']
         if 'swift.authorize' in req.environ:
             aresp = req.environ['swift.authorize'](req)
             if aresp:
                 return aresp
-        if not container_nodes:
+        if not is_success(container_info.get('status')):
             return HTTPNotFound(request=req)
         partition, nodes = obj_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
@@ -951,9 +946,7 @@ class BaseObjectController(Controller):
             obj_ring, partition, req, policy=policy,
             local_handoffs_first=True)
 
-        headers = self._backend_requests(
-            req, node_count, container_partition, container_nodes,
-            container_path=container_path)
+        headers = self._backend_requests(req, node_count, container_info)
         return self._delete_object(req, obj_ring, partition, headers,
                                    node_count=node_count,
                                    node_iterator=node_iterator)
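On the proxy side, _get_update_target() now returns a four-tuple ending in
the cached db_state, and _backend_requests() resolves the update target
itself instead of taking pre-resolved container nodes, stamping
X-Container-Root-Db-State onto each backend request that carries
container-update headers. A trimmed-down sketch of that stamping logic,
assuming plain dicts, an assumed fixed partition, and none of the delete-at
handling:

    def build_update_headers(n_outgoing, container_nodes, db_state,
                             container_path=None):
        # Container update targets are spread round-robin across the
        # n_outgoing object-server requests, as in _backend_requests.
        headers = [{} for _ in range(n_outgoing)]

        def csv_append(existing, item):
            # local stand-in for swift.common.utils.csv_append
            return '%s,%s' % (existing, item) if existing else item

        for i, node in enumerate(container_nodes):
            h = headers[i % n_outgoing]
            h['X-Container-Host'] = csv_append(
                h.get('X-Container-Host'), '%(ip)s:%(port)s' % node)
            h['X-Container-Partition'] = '99'  # assumed for the sketch
            h['X-Container-Device'] = csv_append(
                h.get('X-Container-Device'), node['device'])
            # New in this change: the root DB state the proxy saw.
            h['X-Container-Root-Db-State'] = db_state
            if container_path:
                # Updates redirected to a shard also carry its path.
                h['X-Backend-Quoted-Container-Path'] = container_path
        return headers

    nodes = [{'ip': '10.0.0.1', 'port': 6201, 'device': 'sdb'},
             {'ip': '10.0.0.2', 'port': 6201, 'device': 'sdc'}]
    print(build_update_headers(3, nodes, 'unsharded'))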
diff --git a/test/probe/common.py b/test/probe/common.py
index 54212bf8a1..e980f99f46 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -555,16 +555,18 @@ class ProbeTest(unittest.TestCase):
         for policy in ENABLED_POLICIES:
             for dev in policy.object_ring.devs:
                 all_obj_nodes[dev['device']] = dev
-        return all_obj_nodes.values()
+        return list(all_obj_nodes.values())
 
-    def gather_async_pendings(self, onodes):
+    def gather_async_pendings(self, onodes=None):
         """
         Returns a list of paths to async pending files found on given nodes.
 
-        :param onodes: a list of nodes.
+        :param onodes: a list of nodes. If None, check all object nodes.
         :return: a list of file paths.
         """
         async_pendings = []
+        if onodes is None:
+            onodes = self.get_all_object_nodes()
         for onode in onodes:
             device_dir = self.device_dir(onode)
             for ap_pol_dir in os.listdir(device_dir):
diff --git a/test/probe/test_object_expirer.py b/test/probe/test_object_expirer.py
index d0cbebf098..c0d4e734c9 100644
--- a/test/probe/test_object_expirer.py
+++ b/test/probe/test_object_expirer.py
@@ -170,12 +170,11 @@ class TestObjectExpirer(ReplProbeTest):
         # Make sure there's no async_pendings anywhere. Probe tests only run
         # on single-node installs anyway, so this set should be small enough
         # that an exhaustive check doesn't take too long.
-        all_obj_nodes = self.get_all_object_nodes()
-        pendings_before = self.gather_async_pendings(all_obj_nodes)
+        pendings_before = self.gather_async_pendings()
 
         # expire the objects
         Manager(['object-expirer']).once()
-        pendings_after = self.gather_async_pendings(all_obj_nodes)
+        pendings_after = self.gather_async_pendings()
         self.assertEqual(pendings_after, pendings_before)
 
     def test_expirer_object_should_not_be_expired(self):
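With onodes defaulting to None, probe tests no longer need to enumerate
object nodes before scanning for pendings. A usage sketch on a hypothetical
ReplProbeTest subclass (probe tests assume a running all-in-one cluster):

    from test.probe.common import ReplProbeTest


    class TestAsyncPendingScan(ReplProbeTest):  # hypothetical test
        def test_scan_defaults_to_all_nodes(self):
            # the explicit form callers previously had to spell out...
            explicit = self.gather_async_pendings(
                self.get_all_object_nodes())
            # ...and the new no-argument form scan the same devices
            self.assertEqual(explicit, self.gather_async_pendings())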
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index c8aa662955..efef292081 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 import json
 import os
+import pickle
 import shutil
 import subprocess
 import unittest
@@ -701,8 +702,8 @@ class TestContainerShardingNonUTF8(BaseAutoContainerSharding):
 class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
     DELIM = '\n'
 
-    def _make_object_names(self, number):
-        return ['obj\n%04d%%Ff' % x for x in range(number)]
+    def _make_object_names(self, number, start=0):
+        return ['obj\n%04d%%Ff' % x for x in range(start, start + number)]
 
     def _setup_container_name(self):
         self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4()
@@ -1446,6 +1447,37 @@ class TestContainerSharding(BaseAutoContainerSharding):
             additional_args='--partitions=%s' % self.brain.part)
         self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
 
+    def assertInAsyncFile(self, async_path, expected):
+        with open(async_path, 'rb') as fd:
+            async_data = pickle.load(fd)
+
+        errors = []
+        for k, v in expected.items():
+            if k not in async_data:
+                errors.append("Key '%s' does not exist" % k)
+                continue
+            if async_data[k] != v:
+                errors.append(
+                    "Exp value %s != %s" % (str(v), str(async_data[k])))
+                continue
+
+        if errors:
+            self.fail('\n'.join(errors))
+
+    def assertNotInAsyncFile(self, async_path, not_expect_keys):
+        with open(async_path, 'rb') as fd:
+            async_data = pickle.load(fd)
+
+        errors = []
+        for k in not_expect_keys:
+            if k in async_data:
+                errors.append(
+                    "Key '%s' exists with value '%s'" % (k, async_data[k]))
+                continue
+
+        if errors:
+            self.fail('\n'.join(errors))
+
     def test_async_pendings(self):
         obj_names = self._make_object_names(self.max_shard_size * 2)
 
@@ -1459,6 +1491,13 @@ class TestContainerSharding(BaseAutoContainerSharding):
             self.put_objects(obj_names[i::5])
             self.brain.servers.start(number=n)
 
+        # Check the async pendings, they are unsharded so that's the db_state
+        async_files = self.gather_async_pendings()
+        self.assertTrue(async_files)
+        for af in async_files:
+            self.assertInAsyncFile(af, {'db_state': 'unsharded'})
+            self.assertNotInAsyncFile(af, ['container_path'])
+
         # But there are also 1/5 updates *no one* gets
         self.brain.servers.stop()
         self.put_objects(obj_names[4::5])
@@ -1542,6 +1581,9 @@ class TestContainerSharding(BaseAutoContainerSharding):
         # Run updaters to clear the async pendings
         Manager(['object-updater']).once()
 
+        async_files = self.gather_async_pendings()
+        self.assertFalse(async_files)
+
         # Our "big" dbs didn't take updates
         for db_file in found['normal_dbs']:
             broker = ContainerBroker(db_file)
@@ -1606,6 +1648,66 @@ class TestContainerSharding(BaseAutoContainerSharding):
                           for x in listing],
                          obj_names)
 
+        # Create a few more objects in async pending. Check them, they should
+        # now have the correct db_state as sharded
+        more_obj_names = self._make_object_names(10, self.max_shard_size * 2)
+
+        # No one should get these updates
+        self.brain.servers.stop()
+        self.put_objects(more_obj_names)
+        self.brain.servers.start()
+
+        async_files = self.gather_async_pendings()
+        self.assertTrue(async_files)
+        for af in async_files:
+            # They should have a sharded db_state
+            self.assertInAsyncFile(af, {'db_state': 'sharded'})
+            # But because the container-servers were down, they wont have
+            # container-path (because it couldn't get a shard range back)
+            self.assertNotInAsyncFile(af, ['container_path'])
+
+        # they don't exist yet
+        headers, listing = client.get_container(self.url, self.token,
+                                                self.container_name)
+        self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
+                          for x in listing],
+                         obj_names)
+
+        # Now clear them out and they should now exist where we expect.
+        Manager(['object-updater']).once()
+        headers, listing = client.get_container(self.url, self.token,
+                                                self.container_name)
+        self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
+                          for x in listing],
+                         obj_names + more_obj_names)
+
+        # And they're cleared up
+        async_files = self.gather_async_pendings()
+        self.assertFalse(async_files)
+
+        # If we take 1/2 the nodes offline when we add some more objects,
+        # we should get async pendings with container-path because there
+        # was a container-server to respond.
+        even_more_obj_names = self._make_object_names(
+            10, self.max_shard_size * 2 + 10)
+
+        self.brain.stop_primary_half()
+        self.put_objects(even_more_obj_names)
+        self.brain.start_primary_half()
+
+        async_files = self.gather_async_pendings()
+        self.assertTrue(async_files)
+        for af in async_files:
+            # They should have a sharded db_state AND container_path
+            self.assertInAsyncFile(af, {'db_state': 'sharded',
+                                        'container_path': mock.ANY})
+
+        Manager(['object-updater']).once()
+
+        # And they're cleared up
+        async_files = self.gather_async_pendings()
+        self.assertFalse(async_files)
+
     def test_shrinking(self):
         int_client = self.make_internal_client()
 
@@ -1783,8 +1885,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
         self.put_objects([beta])
         self.brain.servers.start()
 
-        async_pendings = self.gather_async_pendings(
-            self.get_all_object_nodes())
+        async_pendings = self.gather_async_pendings()
         num_container_replicas = len(self.brain.nodes)
         num_obj_replicas = self.policy.object_ring.replica_count
         expected_num_updates = num_container_updates(
@@ -2449,8 +2550,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
         self.put_objects(['alpha'])
         self.assert_container_listing(['alpha'])
         self.assert_container_object_count(0)
-        self.assertLengthEqual(
-            self.gather_async_pendings(self.get_all_object_nodes()), 1)
+        self.assertLengthEqual(self.gather_async_pendings(), 1)
         self.brain.servers.start(number=shard_nodes[2])
 
         # run sharder on root to discover first shrink candidate
@@ -2497,8 +2597,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
         self.put_objects(['beta'])
         self.assert_container_listing(['beta'])
         self.assert_container_object_count(1)
-        self.assertLengthEqual(
-            self.gather_async_pendings(self.get_all_object_nodes()), 2)
+        self.assertLengthEqual(self.gather_async_pendings(), 2)
        self.brain.servers.start(number=shard_nodes[2])
 
         # run sharder on root to discover second shrink candidate - root is not
@@ -3097,11 +3196,11 @@ class TestShardedAPI(BaseTestContainerSharding):
 
 
 class TestContainerShardingMoreUTF8(TestContainerSharding):
-    def _make_object_names(self, number):
+    def _make_object_names(self, number, start=0):
         # override default with names that include non-ascii chars
         name_length = self.cluster_info['swift']['max_object_name_length']
         obj_names = []
-        for x in range(number):
+        for x in range(start, start + number):
             name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
             name = name.encode('utf8').ljust(name_length, b'o')
             if not six.PY2:
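The two new assertion helpers load a pending's pickled payload once and
collect every mismatch before failing, so a single run reports all bad keys
at once. A condensed, standalone rendering of what assertInAsyncFile checks:

    import pickle

    def async_file_matches(async_path, expected):
        # True when every expected key exists in the pickled payload
        # with the expected value (assertNotInAsyncFile is the inverse:
        # it fails if any listed key is present at all).
        with open(async_path, 'rb') as fd:
            async_data = pickle.load(fd)
        return all(k in async_data and async_data[k] == v
                   for k, v in expected.items())

    # e.g. async_file_matches(path, {'op': 'PUT', 'db_state': 'sharded'})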
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index d07486292d..d20a594681 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -978,7 +978,8 @@ class TestObjectController(BaseTestCase):
             'X-Backend-Storage-Policy-Index': int(policy),
             'X-Container-Host': 'chost:cport',
             'X-Container-Partition': 'cpartition',
-            'X-Container-Device': 'cdevice'}
+            'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded'}
         if policy.policy_type == EC_POLICY:
             put_headers.update({
                 'X-Object-Sysmeta-Ec-Frag-Index': '2',
@@ -1016,7 +1017,8 @@ class TestObjectController(BaseTestCase):
         self.assertDictEqual(
             pickle.load(open(async_pending_file_put, 'rb')),
             {'headers': expected_put_headers,
-             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
+             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
+             'db_state': 'unsharded'})
 
         # POST with newer metadata returns success and container update
         # is expected
@@ -1028,7 +1030,8 @@ class TestObjectController(BaseTestCase):
             'X-Backend-Storage-Policy-Index': int(policy),
             'X-Container-Host': 'chost:cport',
             'X-Container-Partition': 'cpartition',
-            'X-Container-Device': 'cdevice'}
+            'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded'}
         req = Request.blank('/sda1/p/a/c/o',
                             environ={'REQUEST_METHOD': 'POST'},
                             headers=post_headers)
@@ -1045,7 +1048,8 @@ class TestObjectController(BaseTestCase):
         self.assertDictEqual(
             pickle.load(open(async_pending_file_put, 'rb')),
             {'headers': expected_put_headers,
-             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
+             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
+             'db_state': 'unsharded'})
 
         # check distinct async pending file for POST
         async_pending_file_post = os.path.join(
@@ -1071,7 +1075,8 @@ class TestObjectController(BaseTestCase):
         self.assertDictEqual(
             pickle.load(open(async_pending_file_post, 'rb')),
             {'headers': expected_post_headers,
-             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
+             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
+             'db_state': 'unsharded'})
 
         # verify that only the POST (most recent) async update gets sent by the
         # object updater, and that both update files are deleted
@@ -1124,13 +1129,15 @@ class TestObjectController(BaseTestCase):
             'X-Backend-Storage-Policy-Index': int(policy),
             'X-Container-Host': 'chost:3200',
             'X-Container-Partition': '99',
-            'X-Container-Device': 'cdevice'}
+            'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded'}
         if container_path:
             # the proxy may include either header
             hdr = ('X-Backend-Container-Path' if old_style
                    else 'X-Backend-Quoted-Container-Path')
             put_headers[hdr] = container_path
+            put_headers['X-Container-Root-Db-State'] = 'sharded'
             expected_update_path = '/cdevice/99/%s/o' % container_path
         else:
             expected_update_path = '/cdevice/99/a/c/o'
 
@@ -1180,7 +1187,8 @@ class TestObjectController(BaseTestCase):
         self.assertEqual(
             {'headers': expected_put_headers,
              'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
-             'container_path': '.sharded_a/c_shard_1'},
+             'container_path': '.sharded_a/c_shard_1',
+             'db_state': 'sharded' if container_path else 'unsharded'},
            pickle.load(open(async_pending_file_put, 'rb')))
 
         # when updater is run its first request will be to the redirect
@@ -5940,7 +5948,7 @@ class TestObjectController(BaseTestCase):
                 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
                 {'x-timestamp': '1', 'x-out': 'set',
                  'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
-                policy)
+                policy, db_state='unsharded')
         finally:
             object_server.http_connect = orig_http_connect
             utils.HASH_PATH_PREFIX = _prefix
@@ -5953,7 +5961,8 @@ class TestObjectController(BaseTestCase):
             {'headers': {'x-timestamp': '1', 'x-out': 'set',
                          'user-agent': 'object-server %s' % os.getpid(),
                          'X-Backend-Storage-Policy-Index': int(policy)},
-             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
+             'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT',
+             'db_state': 'unsharded'})
 
     def test_async_update_saves_on_non_2xx(self):
         policy = random.choice(list(POLICIES))
@@ -5984,7 +5993,7 @@ class TestObjectController(BaseTestCase):
                     'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
                     {'x-timestamp': '1', 'x-out': str(status),
                      'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
-                    policy)
+                    policy, db_state='unsharded')
                 async_dir = diskfile.get_async_dir(policy)
                 self.assertEqual(
                     pickle.load(open(os.path.join(
@@ -5997,7 +6006,7 @@ class TestObjectController(BaseTestCase):
                          'X-Backend-Storage-Policy-Index': int(policy)},
                      'account': 'a', 'container': 'c',
                      'obj': 'o',
-                     'op': 'PUT'})
+                     'op': 'PUT', 'db_state': 'unsharded'})
             finally:
                 object_server.http_connect = orig_http_connect
                 utils.HASH_PATH_PREFIX = _prefix
@@ -6224,12 +6233,14 @@ class TestObjectController(BaseTestCase):
             'X-Container-Host': 'chost:cport',
             'X-Container-Partition': 'cpartition',
             'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded',
             'Content-Type': 'text/plain',
             'X-Object-Sysmeta-Ec-Frag-Index': 0,
             'X-Backend-Storage-Policy-Index': int(policy),
         }
         if container_path is not None:
             headers['X-Backend-Container-Path'] = container_path
+            headers['X-Container-Root-Db-State'] = 'sharded'
 
         req = Request.blank('/sda1/0/a/c/o', method='PUT',
                             headers=headers, body='')
@@ -6271,9 +6282,12 @@ class TestObjectController(BaseTestCase):
                 'obj': 'o',
                 'account': 'a',
                 'container': 'c',
-                'op': 'PUT'}
+                'op': 'PUT',
+                'db_state': 'unsharded'}
             if expected_container_path:
                 expected_data['container_path'] = expected_container_path
+            if container_path is not None:
+                expected_data['db_state'] = 'sharded'
             self.assertEqual(expected_data, data)
 
         do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
@@ -6314,12 +6328,14 @@ class TestObjectController(BaseTestCase):
             'X-Container-Host': 'chost:cport',
             'X-Container-Partition': 'cpartition',
             'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded',
             'Content-Type': 'text/plain',
             'X-Object-Sysmeta-Ec-Frag-Index': 0,
             'X-Backend-Storage-Policy-Index': int(policy),
         }
         if container_path is not None:
             headers['X-Backend-Quoted-Container-Path'] = container_path
+            headers['X-Container-Root-Db-State'] = 'sharded'
 
         req = Request.blank('/sda1/0/a/c/o', method='PUT',
                             headers=headers, body='')
@@ -6361,9 +6377,12 @@ class TestObjectController(BaseTestCase):
                 'obj': 'o',
                 'account': 'a',
                 'container': 'c',
-                'op': 'PUT'}
+                'op': 'PUT',
+                'db_state': 'unsharded'}
             if expected_container_path:
                 expected_data['container_path'] = expected_container_path
+            if container_path is not None:
+                expected_data['db_state'] = 'sharded'
             self.assertEqual(expected_data, data)
 
         do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
@@ -6388,6 +6407,7 @@ class TestObjectController(BaseTestCase):
             'X-Container-Host': 'chost:cport',
             'X-Container-Partition': 'cpartition',
             'X-Container-Device': 'cdevice',
+            'X-Container-Root-Db-State': 'unsharded',
             'Content-Type': 'text/plain',
             'X-Object-Sysmeta-Ec-Frag-Index': 0,
             'X-Backend-Storage-Policy-Index': int(policy)}, body='')
@@ -6426,7 +6446,8 @@ class TestObjectController(BaseTestCase):
                 'obj': 'o',
                 'account': 'a',
                 'container': 'c',
-                'op': 'PUT'})
+                'op': 'PUT',
+                'db_state': 'unsharded'})
 
     def test_container_update_as_greenthread(self):
         greenthreads = []
@@ -6450,7 +6471,8 @@ class TestObjectController(BaseTestCase):
             'X-Backend-Storage-Policy-Index': 0,
             'X-Container-Partition': '20',
             'X-Container-Host': '1.2.3.4:5',
-            'X-Container-Device': 'sdb1'})
+            'X-Container-Device': 'sdb1',
+            'X-Container-Root-Db-State': 'unsharded'})
         with mock.patch.object(object_server, 'spawn', local_fake_spawn), \
                 mock.patch.object(self.object_controller, 'async_update',
                                   local_fake_async_update):
@@ -6477,7 +6499,8 @@ class TestObjectController(BaseTestCase):
         expected = [('PUT', 'a', 'c', 'o', '1.2.3.4:5', '20', 'sdb1',
                      headers_out, 'sda1', POLICIES[0]),
                     {'logger_thread_locals': (None, None),
-                     'container_path': None}]
+                     'container_path': None,
+                     'db_state': 'unsharded'}]
         self.assertEqual(called_async_update_args, [expected])
 
     def test_container_update_as_greenthread_with_timeout(self):
@@ -7944,6 +7967,7 @@ class TestObjectController(BaseTestCase):
             'account': '.expiring_objects',
             'container': delete_at_container,
             'obj': '%s-a/c/o' % put_delete_at,
+            'db_state': None,
             'headers': {
                 'X-Backend-Storage-Policy-Index': '0',
                 # only POST-1 has to clear the orig PUT delete-at
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index bee295e113..b8f108bed2 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -163,14 +163,22 @@ def make_footers_callback(body=None):
 
 
 class BaseObjectControllerMixin(object):
-    container_info = {
-        'status': 200,
-        'write_acl': None,
-        'read_acl': None,
-        'storage_policy': None,
-        'sync_key': None,
-        'versions': None,
-    }
+    def fake_container_info(self, extra_info=None):
+        container_info = {
+            'status': 200,
+            'read_acl': None,
+            'write_acl': None,
+            'sync_key': None,
+            'versions': None,
+            'storage_policy': '0',
+            'partition': 50,
+            'nodes': [],
+            'sharding_state': 'unsharded',
+        }
+
+        if extra_info:
+            container_info.update(extra_info)
+        return container_info
 
     # this needs to be set on the test case
     controller_cls = None
@@ -191,7 +199,7 @@ class BaseObjectControllerMixin(object):
 
         # you can over-ride the container_info just by setting it on the app
         # (see PatchedObjControllerApp for details)
-        self.app.container_info = dict(self.container_info)
+        self.app.container_info = dict(self.fake_container_info())
 
         # default policy and ring references
         self.policy = POLICIES.default
@@ -878,7 +886,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
             conf, account_ring=FakeRing(), container_ring=FakeRing(),
             logger=None)
         # Use the same container info as the app used in other tests
-        self.app.container_info = dict(self.container_info)
+        self.app.container_info = dict(self.fake_container_info())
         self.obj_ring = self.app.get_object_ring(int(self.policy))
 
         for method, num_reqs in (
@@ -920,7 +928,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
             conf, account_ring=FakeRing(), container_ring=FakeRing(),
             logger=None)
         # Use the same container info as the app used in other tests
-        self.app.container_info = dict(self.container_info)
+        self.app.container_info = dict(self.fake_container_info())
         self.obj_ring = self.app.get_object_ring(int(self.policy))
 
         for method, num_reqs in (
@@ -1073,8 +1081,10 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
                        'port': '60%s' % str(i).zfill(2),
                        'device': 'sdb'} for i in range(num_containers)]
 
+        container_info = self.fake_container_info(
+            {'nodes': containers})
         backend_headers = controller._backend_requests(
-            req, self.replicas(policy), 1, containers)
+            req, self.replicas(policy), container_info)
 
         # how many of the backend headers have a container update
         n_container_updates = len(
@@ -1116,8 +1126,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
                        {'ip': '1.0.0.%s' % i,
                         'port': '60%s' % str(i).zfill(2),
                         'device': 'sdb'} for i in range(num_del_at_nodes)]
 
+        container_info = self.fake_container_info(
+            {'nodes': containers})
+
         backend_headers = controller._backend_requests(
-            req, self.replicas(policy), 1, containers,
+            req, self.replicas(policy), container_info,
             delete_at_container='dac', delete_at_partition=2,
             delete_at_nodes=del_at_nodes)
@@ -1180,8 +1193,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
                        {'ip': '1.0.0.%s' % i,
                         'port': '60%s' % str(i).zfill(2),
                         'device': 'sdb'} for i in range(num_containers)]
 
+        container_info = self.fake_container_info(
+            {'nodes': containers})
+
         backend_headers = controller._backend_requests(
-            req, self.replicas(policy), 1, containers,
+            req, self.replicas(policy), container_info,
             delete_at_container='dac', delete_at_partition=2,
             delete_at_nodes=del_at_nodes)
@@ -2435,7 +2451,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
         self.app = PatchedObjControllerApp(
             conf, account_ring=FakeRing(), container_ring=FakeRing(),
             logger=None)
-        self.app.container_info = dict(self.container_info)
+        self.app.container_info = dict(self.fake_container_info())
         self.obj_ring = self.app.get_object_ring(int(self.policy))
 
         post_headers = []
@@ -2459,7 +2475,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
         self.app = PatchedObjControllerApp(
             conf, account_ring=FakeRing(), container_ring=FakeRing(),
             logger=None)
-        self.app.container_info = dict(self.container_info)
+        self.app.container_info = dict(self.fake_container_info())
         self.obj_ring = self.app.get_object_ring(int(self.policy))
 
         post_headers = []
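Turning the shared class-level container_info dict into the
fake_container_info() factory lets each test start from the same defaults
and override only what the scenario needs, without mutating state that other
tests see. A standalone rendering of the pattern:

    def fake_container_info(extra_info=None):
        # defaults mirror the new test helper; each call returns a
        # fresh dict, so per-test overrides cannot leak between tests
        container_info = {
            'status': 200, 'read_acl': None, 'write_acl': None,
            'sync_key': None, 'versions': None, 'storage_policy': '0',
            'partition': 50, 'nodes': [], 'sharding_state': 'unsharded',
        }
        if extra_info:
            container_info.update(extra_info)
        return container_info

    info = fake_container_info({'sharding_state': 'sharded'})
    assert info['status'] == 200                 # default preserved
    assert info['sharding_state'] == 'sharded'   # override applied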
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 46cc3f4be3..220ca4888b 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -72,10 +72,13 @@ from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
     APIVersionError, ChunkReadError
 from swift.common import utils, constraints, registry
 from swift.common.utils import hash_path, storage_directory, \
-    parse_content_type, parse_mime_headers, iter_multipart_mime_documents, \
-    public, mkdirs, NullLogger, md5, node_to_string, NamespaceBoundList
+    ShardRange, parse_content_type, parse_mime_headers, \
+    iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
+    node_to_string, NamespaceBoundList
 from swift.common.wsgi import loadapp, ConfigString
 from swift.common.http_protocol import SwiftHttpProtocol
+from swift.container.backend import NOTFOUND, UNSHARDED, SHARDING, SHARDED, \
+    COLLAPSED
 from swift.proxy.controllers import base as proxy_base
 from swift.proxy.controllers.base import get_cache_key, cors_validation, \
     get_account_info, get_container_info
@@ -2345,6 +2348,7 @@ class BaseTestObjectController(object):
     """
     A root of TestObjController that implements helper methods for child
     TestObjControllers.
     """
+
     def setUp(self):
         # clear proxy logger result for each test
         _test_servers[0].logger._clear()
@@ -2567,6 +2571,7 @@ class TestReplicatedObjectController(
     """
     Test suite for replication policy
     """
+
     def setUp(self):
         skip_if_no_xattrs()
         _test_servers[0].error_limiter.stats.clear()  # clear out errors
@@ -7326,7 +7331,8 @@ class TestReplicatedObjectController(
                               **kwargs):
         header_list = kwargs.pop('header_list', ['X-Container-Device',
                                                  'X-Container-Host',
-                                                 'X-Container-Partition'])
+                                                 'X-Container-Partition',
+                                                 'X-Container-Root-Db-State'])
         seen_headers = []
 
         def capture_headers(ipaddr, port, device, partition, method,
@@ -7344,11 +7350,78 @@ class TestReplicatedObjectController(
         resp = controller_call(req)
         self.assertEqual(2, resp.status_int // 100)  # sanity check
 
-        # don't care about the account/container HEADs, so chuck
-        # the first two requests
-        return sorted(seen_headers[2:],
+        if kwargs.get('no_heads', False):
+            results = seen_headers
+        else:
+            # don't care about the account/container HEADs, so chuck
+            # the first two requests
+            results = seen_headers[2:]
+        return sorted(results,
                       key=lambda d: d.get(header_list[0]) or 'z')
 
+    def test_x_container_headers_db_states(self):
+        # let's force the db_states by inserting a crafted container
+        # info into info cache
+        crafted_container_info = {
+            'status': 200, 'read_acl': None, 'write_acl': None,
+            'sync_to': None, 'sync_key': None, 'object_count': 0, 'bytes': 0,
+            'versions': None, 'storage_policy': 0,
+            'cors': {
+                'allow_origin': None, 'expose_headers': None, 'max_age': None},
+            'meta': {}, 'sysmeta': {},
+            'created_at': '1', 'put_timestamp': None,
+            'delete_timestamp': None,
+            'status_changed_at': None
+        }
+        shardrange = ShardRange('.sharded_a/c_something', 0, 'm', 'z')
+
+        # We should always get X-Container-Root-Db-State with the current
+        # db_state and when db_state is either sharding or sharded we should
+        # also get an X-Backend-Quoted-Container-Path with a shard name.
+        for db_state, expect_cont_path in (
+                (NOTFOUND, False), (UNSHARDED, False), (SHARDING, True),
+                (SHARDED, True), (COLLAPSED, False)):
+            crafted_container_info['sharding_state'] = db_state
+            req = Request.blank(
+                '/v1/a/c/o',
+                environ={'REQUEST_METHOD': 'PUT', 'swift.infocache': {}},
+                headers={'Content-Length': '5'}, body='12345')
+            req.environ['swift.infocache']['container/a/c'] = \
+                crafted_container_info
+
+            exp_seen_header_list = [
+                'X-Container-Device', 'X-Container-Host',
+                'X-Container-Partition', 'X-Container-Root-Db-State']
+            expected_headers = [
+                {'X-Container-Host': '10.0.0.0:1000',
+                 'X-Container-Partition': '0',
+                 'X-Container-Device': 'sda',
+                 'X-Container-Root-Db-State': db_state},
+                {'X-Container-Host': '10.0.0.1:1001',
+                 'X-Container-Partition': '0',
+                 'X-Container-Device': 'sdb',
+                 'X-Container-Root-Db-State': db_state},
+                {'X-Container-Host': '10.0.0.2:1002',
+                 'X-Container-Partition': '0',
+                 'X-Container-Device': 'sdc',
+                 'X-Container-Root-Db-State': db_state}]
+            if expect_cont_path:
+                exp_seen_header_list.append('X-Backend-Quoted-Container-Path')
+                for headers in expected_headers:
+                    headers['X-Backend-Quoted-Container-Path'] = \
+                        shardrange.name
+
+            with mock.patch('swift.proxy.controllers.obj.BaseObjectController.'
+                            '_get_update_shard', return_value=shardrange):
+                controller = ReplicatedObjectController(
+                    self.app, 'a', 'c', 'o')
+                seen_headers = self._gather_x_container_headers(
+                    controller.PUT, req,
+                    201, 201, 201,   # PUT PUT PUT
+                    header_list=exp_seen_header_list, no_heads=True)
+
+            self.assertEqual(seen_headers, expected_headers)
+
     def test_PUT_x_container_headers_with_equal_replicas(self):
         req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'Content-Length': '5'}, body='12345')
@@ -7361,13 +7434,16 @@ class TestReplicatedObjectController(
             seen_headers, [
                 {'X-Container-Host': '10.0.0.0:1000',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sda'},
+                 'X-Container-Device': 'sda',
+                 'X-Container-Root-Db-State': 'unsharded'},
                 {'X-Container-Host': '10.0.0.1:1001',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdb'},
+                 'X-Container-Device': 'sdb',
+                 'X-Container-Root-Db-State': 'unsharded'},
                 {'X-Container-Host': '10.0.0.2:1002',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdc'}])
+                 'X-Container-Device': 'sdc',
+                 'X-Container-Root-Db-State': 'unsharded'}])
 
     def test_PUT_x_container_headers_with_fewer_container_replicas(self):
         self.app.container_ring.set_replicas(2)
@@ -7384,13 +7460,16 @@ class TestReplicatedObjectController(
             seen_headers, [
                 {'X-Container-Host': '10.0.0.0:1000',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sda'},
+                 'X-Container-Device': 'sda',
+                 'X-Container-Root-Db-State': 'unsharded'},
                 {'X-Container-Host': '10.0.0.1:1001',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdb'},
+                 'X-Container-Device': 'sdb',
+                 'X-Container-Root-Db-State': 'unsharded'},
                 {'X-Container-Host': None,
                  'X-Container-Partition': None,
-                 'X-Container-Device': None}])
+                 'X-Container-Device': None,
+                 'X-Container-Root-Db-State': None}])
 
     def test_PUT_x_container_headers_with_many_object_replicas(self):
         POLICIES[0].object_ring.set_replicas(11)
@@ -7409,16 +7488,20 @@ class TestReplicatedObjectController(
                  for h in seen_headers)),
            {(('X-Container-Device', 'sda'),
              ('X-Container-Host', '10.0.0.0:1000'),
-             ('X-Container-Partition', '0')): 3,
+             ('X-Container-Partition', '0'),
+             ('X-Container-Root-Db-State', 'unsharded')): 3,
             (('X-Container-Device', 'sdb'),
              ('X-Container-Host', '10.0.0.1:1001'),
-             ('X-Container-Partition', '0')): 2,
+             ('X-Container-Partition', '0'),
+             ('X-Container-Root-Db-State', 'unsharded')): 2,
            (('X-Container-Device', 'sdc'),
              ('X-Container-Host', '10.0.0.2:1002'),
-             ('X-Container-Partition', '0')): 2,
+             ('X-Container-Partition', '0'),
+             ('X-Container-Root-Db-State', 'unsharded')): 2,
            (('X-Container-Device', None),
              ('X-Container-Host', None),
-             ('X-Container-Partition', None)): 4})
+             ('X-Container-Partition', None),
+             ('X-Container-Root-Db-State', None)): 4})
 
     def test_PUT_x_container_headers_with_more_container_replicas(self):
         self.app.container_ring.set_replicas(4)
@@ -7435,13 +7518,16 @@ class TestReplicatedObjectController(
             seen_headers, [
                 {'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sda,sdd'},
+                 'X-Container-Device': 'sda,sdd',
+                 'X-Container-Root-Db-State': 'unsharded'},
                {'X-Container-Host': '10.0.0.1:1001',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdb'},
+                 'X-Container-Device': 'sdb',
+                 'X-Container-Root-Db-State': 'unsharded'},
                {'X-Container-Host': '10.0.0.2:1002',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdc'}])
+                 'X-Container-Device': 'sdc',
+                 'X-Container-Root-Db-State': 'unsharded'}])
 
     def test_POST_x_container_headers_with_more_container_replicas(self):
         self.app.container_ring.set_replicas(4)
@@ -7459,13 +7545,16 @@ class TestReplicatedObjectController(
             seen_headers, [
                 {'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sda,sdd'},
+                 'X-Container-Device': 'sda,sdd',
+                 'X-Container-Root-Db-State': 'unsharded'},
                {'X-Container-Host': '10.0.0.1:1001',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdb'},
+                 'X-Container-Device': 'sdb',
+                 'X-Container-Root-Db-State': 'unsharded'},
                {'X-Container-Host': '10.0.0.2:1002',
                  'X-Container-Partition': '0',
-                 'X-Container-Device': 'sdc'}])
+                 'X-Container-Device': 'sdc',
+                 'X-Container-Root-Db-State': 'unsharded'}])
 
     def test_DELETE_x_container_headers_with_more_container_replicas(self):
         self.app.container_ring.set_replicas(4)
@@ -7479,16 +7568,20 @@ class TestReplicatedObjectController(
             controller.DELETE, req,
             200, 200, 200, 200, 200)   # HEAD HEAD DELETE DELETE DELETE
 
+        self.maxDiff = None
         self.assertEqual(seen_headers, [
             {'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
              'X-Container-Partition': '0',
-             'X-Container-Device': 'sda,sdd'},
+             'X-Container-Device': 'sda,sdd',
+             'X-Container-Root-Db-State': 'unsharded'},
            {'X-Container-Host': '10.0.0.1:1001',
              'X-Container-Partition': '0',
-             'X-Container-Device': 'sdb'},
+             'X-Container-Device': 'sdb',
+             'X-Container-Root-Db-State': 'unsharded'},
            {'X-Container-Host': '10.0.0.2:1002',
              'X-Container-Partition': '0',
-             'X-Container-Device': 'sdc'}
+             'X-Container-Device': 'sdc',
+             'X-Container-Root-Db-State': 'unsharded'}
         ])
 
     @mock.patch('time.time', new=lambda: STATIC_TIME)
@@ -11684,6 +11777,7 @@ class TestAccountControllerFakeGetResponse(unittest.TestCase):
     Test all the faked-out GET responses for accounts that don't exist. They
     have to match the responses for empty accounts that really exist.
     """
+
    def setUp(self):
         conf = {'account_autocreate': 'yes'}
         self.app = listing_formats.ListingFilter(
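For reference, the five states imported from swift.container.backend in the
test above are plain strings, which is what the proxy forwards in
X-Container-Root-Db-State; per the test expectations, only the sharding and
sharded states also produce an X-Backend-Quoted-Container-Path. A quick
sketch:

    from swift.container.backend import NOTFOUND, UNSHARDED, SHARDING, \
        SHARDED, COLLAPSED

    for state in (NOTFOUND, UNSHARDED, SHARDING, SHARDED, COLLAPSED):
        # each constant is the literal header value, e.g. 'sharded'
        print(state)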