From 48da3c1ed783a2b69cc74b02e8fd45e9d36cf80a Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Tue, 9 Jan 2018 13:27:48 -0800 Subject: [PATCH] Limit object-expirer queue updates on object DELETE, PUT, POST Currently, on deletion of an expiring object, each object server writes an async_pending to update the expirer queue and remove the row for that object. Each async_pending is processed by the object updater and results in all container replicas being updated. This is also true for PUT and POST requests for existing expiring objects. If you have Rc container replicas and Ro object replicas (or EC pieces), then the number of expirer-queue requests made is Rc * Ro [1]. For a 3-replica cluster, that number is 9, which is not terrible. For a cluster with 3 container replicas and a 15+4 EC scheme, that number is 57, which is terrible. This commit makes it so at most two object servers will write out the async_pending files needed to update the queue, dropping the request count to 2 * Rc [2]. The object server now looks for a header "X-Backend-Clean-Expiring-Object-Queue" carrying a boolean value ("t" or "f"; treated as true when the header is absent) and writes or does not write expirer-queue async_pendings as appropriate. The proxy sends that header to every object server, with a true value for at most 2 of them and a false value for the rest. The queue update is not necessary for the proper functioning of the object expirer; if the queue update fails, then the object expirer will try to delete the object, receive 404s or 412s, and remove the queue entry. Removal on object PUT/POST/DELETE is helpful but not required. 
[1] assuming no retries needed by the object updater [2] or Rc, if a cluster has only one object replica Change-Id: I4d64f4d1d107c437fd3c23e19160157fdafbcd42 --- swift/obj/server.py | 5 + swift/proxy/controllers/obj.py | 24 +++ test/unit/obj/test_server.py | 187 +++++++++++++++++++++++- test/unit/proxy/controllers/test_obj.py | 90 ++++++++++++ 4 files changed, 305 insertions(+), 1 deletion(-) diff --git a/swift/obj/server.py b/swift/obj/server.py index 563ccb9865..e3d71ca21a 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -413,6 +413,11 @@ class ObjectController(BaseStorageServer): headers_out['x-content-type'] = 'text/plain' headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e' else: + if not config_true_value( + request.headers.get( + 'X-Backend-Clean-Expiring-Object-Queue', 't')): + return + # DELETEs of old expiration data have no way of knowing what the # old X-Delete-At-Container was at the time of the initial setting # of the data, so a best guess is made here. diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index bac0ed1dbe..c922d474fd 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -336,6 +336,30 @@ class BaseObjectController(Controller): next(container_iter)) existing_updates += 1 + # Keep the number of expirer-queue deletes to a reasonable number. + # + # In the best case, at least one object server writes out an + # async_pending for an expirer-queue update. In the worst case, no + # object server does so, and an expirer-queue row remains that + # refers to an already-deleted object. In this case, upon attempting + # to delete the object, the object expirer will notice that the + # object does not exist and then remove the row from the expirer + # queue. + # + # In other words: expirer-queue updates on object DELETE are nice to + # have, but not strictly necessary for correct operation. 
+ # + # Also, each queue update results in an async_pending record, which + # causes the object updater to talk to all container servers. If we + # have N async_pendings and Rc container replicas, we cause N * Rc + # requests from object updaters to container servers (possibly more, + # depending on retries). Thus, it is helpful to keep this number + # small. + n_desired_queue_updates = 2 + for i in range(len(headers)): + headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = ( + 't' if i < n_desired_queue_updates else 'f') + for i, node in enumerate(delete_at_nodes or []): i = i % len(headers) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index a692a27b72..c04e3d9667 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -5965,6 +5965,46 @@ class TestObjectController(unittest.TestCase): finally: object_server.time.time = orig_time + def test_DELETE_can_skip_updating_expirer_queue(self): + policy = POLICIES.get_by_index(0) + test_time = time() + put_time = test_time + delete_time = test_time + 1 + delete_at_timestamp = int(test_time + 10000) + delete_at_container = str( + delete_at_timestamp / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(put_time), + 'X-Delete-At': str(delete_at_timestamp), + 'X-Delete-At-Container': delete_at_container, + 'Content-Length': '4', + 'Content-Type': 'application/octet-stream'}) + req.body = 'TEST' + + # Mock out async_update so we don't get any async_pending files. 
+ with mock.patch.object(self.object_controller, 'async_update'): + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'}, + headers={'X-Timestamp': normalize_timestamp(delete_time), + 'X-Backend-Clean-Expiring-Object-Queue': 'false', + 'X-If-Delete-At': str(delete_at_timestamp)}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 204) + + async_pending_dir = os.path.join( + self.testdir, 'sda1', diskfile.get_async_dir(policy)) + # empty dir or absent dir, either is fine + try: + self.assertEqual([], os.listdir(async_pending_dir)) + except OSError as err: + self.assertEqual(err.errno, errno.ENOENT) + def test_DELETE_but_expired(self): test_time = time() + 10000 delete_at_timestamp = int(test_time + 100) @@ -6030,7 +6070,7 @@ class TestObjectController(unittest.TestCase): utils.Timestamp(test_timestamp).internal + '.data') self.assertTrue(os.path.isfile(objfile)) - # move time past expirery + # move time past expiry with mock.patch('swift.obj.diskfile.time') as mock_time: mock_time.time.return_value = test_time + 100 req = Request.blank( @@ -6210,6 +6250,109 @@ class TestObjectController(unittest.TestCase): 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', given_args[5], 'sda1', POLICIES[0]]) + def test_PUT_can_skip_updating_expirer_queue(self): + policy = POLICIES.get_by_index(0) + test_time = time() + put_time = test_time + overwrite_time = test_time + 1 + delete_at_timestamp = int(test_time + 10000) + delete_at_container = str( + delete_at_timestamp / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(put_time), + 'X-Delete-At': str(delete_at_timestamp), + 'X-Delete-At-Container': delete_at_container, + 'Content-Length': 
'4', + 'Content-Type': 'application/octet-stream'}) + req.body = 'TEST' + + # Mock out async_update so we don't get any async_pending files. + with mock.patch.object(self.object_controller, 'async_update'): + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # Overwrite with a non-expiring object + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(overwrite_time), + 'X-Backend-Clean-Expiring-Object-Queue': 'false', + 'Content-Length': '9', + 'Content-Type': 'application/octet-stream'}) + req.body = 'new stuff' + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + async_pending_dir = os.path.join( + self.testdir, 'sda1', diskfile.get_async_dir(policy)) + # empty dir or absent dir, either is fine + try: + self.assertEqual([], os.listdir(async_pending_dir)) + except OSError as err: + self.assertEqual(err.errno, errno.ENOENT) + + def test_PUT_can_skip_deleting_expirer_queue_but_still_inserts(self): + policy = POLICIES.get_by_index(0) + test_time = time() + put_time = test_time + overwrite_time = test_time + 1 + delete_at_timestamp_1 = int(test_time + 10000) + delete_at_timestamp_2 = int(test_time + 20000) + delete_at_container_1 = str( + delete_at_timestamp_1 / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + delete_at_container_2 = str( + delete_at_timestamp_2 / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(put_time), + 'X-Delete-At': str(delete_at_timestamp_1), + 'X-Delete-At-Container': delete_at_container_1, + 'Content-Length': '4', + 'Content-Type': 'application/octet-stream'}) + req.body = 'TEST' + + # Mock out async_update so we don't 
get any async_pending files. + with mock.patch.object(self.object_controller, 'async_update'): + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # Overwrite with an expiring object + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(overwrite_time), + 'X-Backend-Clean-Expiring-Object-Queue': 'false', + 'X-Delete-At': str(delete_at_timestamp_2), + 'X-Delete-At-Container': delete_at_container_2, + 'Content-Length': '9', + 'Content-Type': 'application/octet-stream'}) + req.body = 'new stuff' + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + async_pendings = [] + async_pending_dir = os.path.join( + self.testdir, 'sda1', diskfile.get_async_dir(policy)) + for dirpath, _, filenames in os.walk(async_pending_dir): + for filename in filenames: + async_pendings.append(os.path.join(dirpath, filename)) + + self.assertEqual(len(async_pendings), 1) + + async_pending_ops = [] + for pending_file in async_pendings: + with open(pending_file) as fh: + async_pending = pickle.load(fh) + async_pending_ops.append(async_pending['op']) + self.assertEqual(async_pending_ops, ['PUT']) + def test_PUT_delete_at_in_past(self): req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -6222,6 +6365,48 @@ class TestObjectController(unittest.TestCase): self.assertEqual(resp.status_int, 400) self.assertTrue('X-Delete-At in past' in resp.body) + def test_POST_can_skip_updating_expirer_queue(self): + policy = POLICIES.get_by_index(0) + test_time = time() + put_time = test_time + overwrite_time = test_time + 1 + delete_at_timestamp = int(test_time + 10000) + delete_at_container = str( + delete_at_timestamp / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + 
headers={'X-Timestamp': normalize_timestamp(put_time), + 'X-Delete-At': str(delete_at_timestamp), + 'X-Delete-At-Container': delete_at_container, + 'Content-Length': '4', + 'Content-Type': 'application/octet-stream'}) + req.body = 'TEST' + + # Mock out async_update so we don't get any async_pending files. + with mock.patch.object(self.object_controller, 'async_update'): + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # POST to remove X-Delete-At + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(overwrite_time), + 'X-Backend-Clean-Expiring-Object-Queue': 'false', + 'X-Delete-At': ''}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 202) + + async_pending_dir = os.path.join( + self.testdir, 'sda1', diskfile.get_async_dir(policy)) + # empty dir or absent dir, either is fine + try: + self.assertEqual([], os.listdir(async_pending_dir)) + except OSError as err: + self.assertEqual(err.errno, errno.ENOENT) + def test_POST_delete_at_in_past(self): req = Request.blank( '/sda1/p/a/c/o', diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index c3a1d5a6c9..5e428d08ac 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -452,6 +452,33 @@ class BaseObjectControllerMixin(object): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 204) + def test_DELETE_limits_expirer_queue_updates(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + codes = [204] * self.replicas() + captured_headers = [] + + def capture_headers(ip, port, device, part, method, path, + headers=None, **kwargs): + captured_headers.append(headers) + + with set_http_connect(*codes, give_connect=capture_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 204) # sanity check + + counts = {True: 0, 
False: 0, None: 0} + for headers in captured_headers: + v = headers.get('X-Backend-Clean-Expiring-Object-Queue') + norm_v = None if v is None else utils.config_true_value(v) + counts[norm_v] += 1 + + max_queue_updates = 2 + o_replicas = self.replicas() + self.assertEqual(counts, { + True: min(max_queue_updates, o_replicas), + False: max(o_replicas - max_queue_updates, 0), + None: 0, + }) + def test_DELETE_write_affinity_before_replication(self): policy_conf = self.app.get_policy_options(self.policy) policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2 @@ -482,6 +509,69 @@ class BaseObjectControllerMixin(object): self.assertEqual(resp.status_int, 204) + def test_PUT_limits_expirer_queue_deletes(self): + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'application/octet-stream'}) + codes = [201] * self.replicas() + captured_headers = [] + + def capture_headers(ip, port, device, part, method, path, + headers=None, **kwargs): + captured_headers.append(headers) + + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, give_connect=capture_headers, + expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) # sanity check + + counts = {True: 0, False: 0, None: 0} + for headers in captured_headers: + v = headers.get('X-Backend-Clean-Expiring-Object-Queue') + norm_v = None if v is None else utils.config_true_value(v) + counts[norm_v] += 1 + + max_queue_updates = 2 + o_replicas = self.replicas() + self.assertEqual(counts, { + True: min(max_queue_updates, o_replicas), + False: max(o_replicas - max_queue_updates, 0), + None: 0, + }) + + def test_POST_limits_expirer_queue_deletes(self): + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='POST', body='', + headers={'Content-Type': 'application/octet-stream'}) + codes = [201] * self.replicas() + captured_headers = [] + + def 
capture_headers(ip, port, device, part, method, path, + headers=None, **kwargs): + captured_headers.append(headers) + + with set_http_connect(*codes, give_connect=capture_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) # sanity check + + counts = {True: 0, False: 0, None: 0} + for headers in captured_headers: + v = headers.get('X-Backend-Clean-Expiring-Object-Queue') + norm_v = None if v is None else utils.config_true_value(v) + counts[norm_v] += 1 + + max_queue_updates = 2 + o_replicas = self.replicas() + self.assertEqual(counts, { + True: min(max_queue_updates, o_replicas), + False: max(o_replicas - max_queue_updates, 0), + None: 0, + }) + def test_POST_non_int_delete_after(self): t = str(int(time.time() + 100)) + '.1' req = swob.Request.blank('/v1/a/c/o', method='POST',