Merge "Limit object-expirer queue updates on object DELETE, PUT, POST"
This commit is contained in:
commit
eaf056154e
@ -413,6 +413,11 @@ class ObjectController(BaseStorageServer):
|
||||
headers_out['x-content-type'] = 'text/plain'
|
||||
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
else:
|
||||
if not config_true_value(
|
||||
request.headers.get(
|
||||
'X-Backend-Clean-Expiring-Object-Queue', 't')):
|
||||
return
|
||||
|
||||
# DELETEs of old expiration data have no way of knowing what the
|
||||
# old X-Delete-At-Container was at the time of the initial setting
|
||||
# of the data, so a best guess is made here.
|
||||
|
@ -336,6 +336,30 @@ class BaseObjectController(Controller):
|
||||
next(container_iter))
|
||||
existing_updates += 1
|
||||
|
||||
# Keep the number of expirer-queue deletes to a reasonable number.
|
||||
#
|
||||
# In the best case, at least one object server writes out an
|
||||
# async_pending for an expirer-queue update. In the worst case, no
|
||||
# object server does so, and an expirer-queue row remains that
|
||||
# refers to an already-deleted object. In this case, upon attempting
|
||||
# to delete the object, the object expirer will notice that the
|
||||
# object does not exist and then remove the row from the expirer
|
||||
# queue.
|
||||
#
|
||||
# In other words: expirer-queue updates on object DELETE are nice to
|
||||
# have, but not strictly necessary for correct operation.
|
||||
#
|
||||
# Also, each queue update results in an async_pending record, which
|
||||
# causes the object updater to talk to all container servers. If we
|
||||
# have N async_pendings and Rc container replicas, we cause N * Rc
|
||||
# requests from object updaters to container servers (possibly more,
|
||||
# depending on retries). Thus, it is helpful to keep this number
|
||||
# small.
|
||||
n_desired_queue_updates = 2
|
||||
for i in range(len(headers)):
|
||||
headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = (
|
||||
't' if i < n_desired_queue_updates else 'f')
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
|
@ -5916,6 +5916,46 @@ class TestObjectController(unittest.TestCase):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_DELETE_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
delete_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(delete_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-If-Delete-At': str(delete_at_timestamp)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_DELETE_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
delete_at_timestamp = int(test_time + 100)
|
||||
@ -5981,7 +6021,7 @@ class TestObjectController(unittest.TestCase):
|
||||
utils.Timestamp(test_timestamp).internal + '.data')
|
||||
self.assertTrue(os.path.isfile(objfile))
|
||||
|
||||
# move time past expirery
|
||||
# move time past expiry
|
||||
with mock.patch('swift.obj.diskfile.time') as mock_time:
|
||||
mock_time.time.return_value = test_time + 100
|
||||
req = Request.blank(
|
||||
@ -6161,6 +6201,109 @@ class TestObjectController(unittest.TestCase):
|
||||
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
given_args[5], 'sda1', POLICIES[0]])
|
||||
|
||||
def test_PUT_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Overwrite with a non-expiring object
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'Content-Length': '9',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'new stuff'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_PUT_can_skip_deleting_expirer_queue_but_still_inserts(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp_1 = int(test_time + 10000)
|
||||
delete_at_timestamp_2 = int(test_time + 20000)
|
||||
delete_at_container_1 = str(
|
||||
delete_at_timestamp_1 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
delete_at_container_2 = str(
|
||||
delete_at_timestamp_2 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp_1),
|
||||
'X-Delete-At-Container': delete_at_container_1,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Overwrite with an expiring object
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-Delete-At': str(delete_at_timestamp_2),
|
||||
'X-Delete-At-Container': delete_at_container_2,
|
||||
'Content-Length': '9',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'new stuff'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
async_pendings = []
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
for dirpath, _, filenames in os.walk(async_pending_dir):
|
||||
for filename in filenames:
|
||||
async_pendings.append(os.path.join(dirpath, filename))
|
||||
|
||||
self.assertEqual(len(async_pendings), 1)
|
||||
|
||||
async_pending_ops = []
|
||||
for pending_file in async_pendings:
|
||||
with open(pending_file) as fh:
|
||||
async_pending = pickle.load(fh)
|
||||
async_pending_ops.append(async_pending['op'])
|
||||
self.assertEqual(async_pending_ops, ['PUT'])
|
||||
|
||||
def test_PUT_delete_at_in_past(self):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
@ -6173,6 +6316,48 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
def test_POST_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# POST to remove X-Delete-At
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-Delete-At': ''})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_POST_delete_at_in_past(self):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
|
@ -452,6 +452,33 @@ class BaseObjectControllerMixin(object):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_DELETE_limits_expirer_queue_updates(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||
codes = [204] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 204) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_DELETE_write_affinity_before_replication(self):
|
||||
policy_conf = self.app.get_policy_options(self.policy)
|
||||
policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2
|
||||
@ -482,6 +509,69 @@ class BaseObjectControllerMixin(object):
|
||||
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_PUT_limits_expirer_queue_deletes(self):
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT', body='',
|
||||
headers={'Content-Type': 'application/octet-stream'})
|
||||
codes = [201] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
expect_headers = {
|
||||
'X-Obj-Metadata-Footer': 'yes',
|
||||
'X-Obj-Multiphase-Commit': 'yes'
|
||||
}
|
||||
with set_http_connect(*codes, give_connect=capture_headers,
|
||||
expect_headers=expect_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_POST_limits_expirer_queue_deletes(self):
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='POST', body='',
|
||||
headers={'Content-Type': 'application/octet-stream'})
|
||||
codes = [201] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_POST_non_int_delete_after(self):
|
||||
t = str(int(time.time() + 100)) + '.1'
|
||||
req = swob.Request.blank('/v1/a/c/o', method='POST',
|
||||
|
Loading…
x
Reference in New Issue
Block a user