diff --git a/swift/obj/server.py b/swift/obj/server.py index e3d71ca21a..e04d662f83 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -391,15 +391,10 @@ class ObjectController(BaseStorageServer): if op != 'DELETE': delete_at_container = headers_in.get('X-Delete-At-Container', None) if not delete_at_container: - self.logger.warning( - 'X-Delete-At-Container header must be specified for ' - 'expiring objects background %s to work properly. Making ' - 'best guess as to the container name for now.' % op) - # TODO(gholt): In a future release, change the above warning to - # a raised exception and remove the guess code below. - delete_at_container = get_expirer_container( - delete_at, self.expiring_objects_container_divisor, - account, container, obj) + # If header is missing, no update needed as sufficient other + # object servers should perform the required update. + return + partition = headers_in.get('X-Delete-At-Partition', None) hosts = headers_in.get('X-Delete-At-Host', '') contdevices = headers_in.get('X-Delete-At-Device', '') diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 4337f3551b..96730d77a3 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -325,15 +325,30 @@ class BaseObjectController(Controller): headers[index].get('X-Container-Device'), container['device']) + def set_delete_at_headers(index, delete_at_node): + headers[index]['X-Delete-At-Container'] = delete_at_container + headers[index]['X-Delete-At-Partition'] = delete_at_partition + headers[index]['X-Delete-At-Host'] = csv_append( + headers[index].get('X-Delete-At-Host'), + '%(ip)s:%(port)s' % delete_at_node) + headers[index]['X-Delete-At-Device'] = csv_append( + headers[index].get('X-Delete-At-Device'), + delete_at_node['device']) + n_updates_needed = num_container_updates( len(containers), quorum_size(len(containers)), n_outgoing, policy.quorum) container_iter = itertools.cycle(containers) + dan_iter = itertools.cycle(delete_at_nodes or []) existing_updates = 0 while existing_updates < n_updates_needed: - set_container_update(existing_updates % n_outgoing, - next(container_iter)) + index = existing_updates % n_outgoing + set_container_update(index, next(container_iter)) + if delete_at_nodes: + # We reverse the index in order to distribute the updates + # across all nodes. + set_delete_at_headers(n_outgoing - 1 - index, next(dan_iter)) existing_updates += 1 # Keep the number of expirer-queue deletes to a reasonable number. @@ -360,18 +375,6 @@ class BaseObjectController(Controller): headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = ( 't' if i < n_desired_queue_updates else 'f') - for i, node in enumerate(delete_at_nodes or []): - i = i % len(headers) - - headers[i]['X-Delete-At-Container'] = delete_at_container - headers[i]['X-Delete-At-Partition'] = delete_at_partition - headers[i]['X-Delete-At-Host'] = csv_append( - headers[i].get('X-Delete-At-Host'), - '%(ip)s:%(port)s' % node) - headers[i]['X-Delete-At-Device'] = csv_append( - headers[i].get('X-Delete-At-Device'), - node['device']) - return headers def _get_conn_response(self, putter, path, logger_thread_locals, diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index ee0b4df8c7..312249ffb7 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -5582,11 +5582,7 @@ class TestObjectController(unittest.TestCase): 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o', req, 'sda1', policy) - self.assertEqual( - self.logger.get_lines_for_level('warning'), - ['X-Delete-At-Container header must be specified for expiring ' - 'objects background PUT to work properly. Making best guess as ' - 'to the container name for now.']) + self.assertEqual(given_args, []) def test_delete_at_update_delete(self): policy = random.choice(list(POLICIES)) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 903b92e161..991bdedff8 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -34,7 +34,7 @@ from six.moves import range import swift from swift.common import utils, swob, exceptions from swift.common.exceptions import ChunkWriteTimeout -from swift.common.utils import Timestamp +from swift.common.utils import Timestamp, list_from_csv from swift.proxy import server as proxy_server from swift.proxy.controllers import obj from swift.proxy.controllers.base import \ @@ -787,6 +787,111 @@ class BaseObjectControllerMixin(object): n_expected_updates, self.replicas(policy)) self.assertEqual(n_expected_updates, n_container_updates) + def test_delete_at_backend_requests(self): + t = str(int(time.time() + 100)) + for policy in POLICIES: + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', + headers={'Content-Length': '0', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Delete-At': t}) + controller = self.controller_cls(self.app, 'a', 'c', 'o') + + for num_del_at_nodes in range(1, 16): + containers = [ + {'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2), + 'device': 'sdc'} for i in range(num_del_at_nodes)] + del_at_nodes = [ + {'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2), + 'device': 'sdb'} for i in range(num_del_at_nodes)] + + backend_headers = controller._backend_requests( + req, self.replicas(policy), 1, containers, + delete_at_container='dac', delete_at_partition=2, + delete_at_nodes=del_at_nodes) + + devices = [] + hosts = [] + part = ctr = 0 + for given_headers in backend_headers: + self.assertEqual(given_headers.get('X-Delete-At'), t) + if 'X-Delete-At-Partition' in given_headers: + self.assertEqual( + given_headers.get('X-Delete-At-Partition'), '2') + part += 1 + if 'X-Delete-At-Container' in given_headers: + self.assertEqual( + given_headers.get('X-Delete-At-Container'), 'dac') + ctr += 1 + devices += ( + list_from_csv(given_headers.get('X-Delete-At-Device'))) + hosts += ( + list_from_csv(given_headers.get('X-Delete-At-Host'))) + + # same as in test_container_update_backend_requests + n_can_fail = self.replicas(policy) - self.quorum(policy) + n_expected_updates = ( + n_can_fail + utils.quorum_size(num_del_at_nodes)) + + n_expected_hosts = max( + n_expected_updates, num_del_at_nodes) + + self.assertEqual(len(hosts), n_expected_hosts) + self.assertEqual(len(devices), n_expected_hosts) + + # parts don't get doubled up, maximum is count of obj requests + n_expected_parts = min( + n_expected_hosts, self.replicas(policy)) + self.assertEqual(part, n_expected_parts) + self.assertEqual(ctr, n_expected_parts) + + # check that hosts are correct + self.assertEqual( + set(hosts), + set('%s:%s' % (h['ip'], h['port']) for h in del_at_nodes)) + self.assertEqual(set(devices), set(('sdb',))) + + def test_smooth_distributed_backend_requests(self): + t = str(int(time.time() + 100)) + for policy in POLICIES: + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', + headers={'Content-Length': '0', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Delete-At': t}) + controller = self.controller_cls(self.app, 'a', 'c', 'o') + + for num_containers in range(1, 16): + containers = [ + {'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2), + 'device': 'sdc'} for i in range(num_containers)] + del_at_nodes = [ + {'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2), + 'device': 'sdb'} for i in range(num_containers)] + + backend_headers = controller._backend_requests( + req, self.replicas(policy), 1, containers, + delete_at_container='dac', delete_at_partition=2, + delete_at_nodes=del_at_nodes) + + # caculate no of expected updates, see + # test_container_update_backend_requests for explanation + n_expected_updates = min(max( + self.replicas(policy) - self.quorum(policy) + + utils.quorum_size(num_containers), num_containers), + self.replicas(policy)) + + # the first n_expected_updates servers should have received + # a container update + self.assertTrue( + all([h.get('X-Container-Partition') + for h in backend_headers[:n_expected_updates]])) + # the last n_expected_updates servers should have received + # the x-delete-at* headers + self.assertTrue( + all([h.get('X-Delete-At-Container') + for h in backend_headers[-n_expected_updates:]])) + def _check_write_affinity( self, conf, policy_conf, policy, affinity_regions, affinity_count): conf['policy_config'] = policy_conf diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 2246ece854..f4470a4cd9 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -5486,9 +5486,9 @@ class TestReplicatedObjectController( 'X-Delete-At-Partition': '0', 'X-Delete-At-Device': 'sdb'}, {'X-Delete-At-Host': None, - 'X-Delete-At-Container': None, 'X-Delete-At-Partition': None, - 'X-Delete-At-Device': None} + 'X-Delete-At-Container': None, + 'X-Delete-At-Device': None}, ]) @mock.patch('time.time', new=lambda: STATIC_TIME)