Merge "Don't make async_pendings during object expiration"
This commit is contained in:
commit
840e2c67db
@ -308,7 +308,8 @@ class ObjectExpirer(Daemon):
|
|||||||
perform the actual delete.
|
perform the actual delete.
|
||||||
"""
|
"""
|
||||||
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
|
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
|
||||||
self.swift.make_request('DELETE', path,
|
self.swift.make_request(
|
||||||
{'X-If-Delete-At': str(timestamp),
|
'DELETE', path,
|
||||||
'X-Timestamp': str(timestamp)},
|
{'X-If-Delete-At': str(timestamp), 'X-Timestamp': str(timestamp),
|
||||||
(2,))
|
'X-Backend-Clean-Expiring-Object-Queue': 'no'},
|
||||||
|
(2,))
|
||||||
|
@ -372,8 +372,8 @@ class BaseObjectController(Controller):
|
|||||||
# small.
|
# small.
|
||||||
n_desired_queue_updates = 2
|
n_desired_queue_updates = 2
|
||||||
for i in range(len(headers)):
|
for i in range(len(headers)):
|
||||||
headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = (
|
headers[i].setdefault('X-Backend-Clean-Expiring-Object-Queue',
|
||||||
't' if i < n_desired_queue_updates else 'f')
|
't' if i < n_desired_queue_updates else 'f')
|
||||||
|
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
|
@ -12,6 +12,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import os
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
@ -103,7 +105,7 @@ class TestObjectExpirer(ReplProbeTest):
|
|||||||
|
|
||||||
# clear proxy cache
|
# clear proxy cache
|
||||||
client.post_container(self.url, self.token, self.container_name, {})
|
client.post_container(self.url, self.token, self.container_name, {})
|
||||||
# run the expirier again after replication
|
# run the expirer again after replication
|
||||||
self.expirer.once()
|
self.expirer.once()
|
||||||
|
|
||||||
# object is not in the listing
|
# object is not in the listing
|
||||||
@ -126,6 +128,80 @@ class TestObjectExpirer(ReplProbeTest):
|
|||||||
self.assertGreater(Timestamp(metadata['x-backend-timestamp']),
|
self.assertGreater(Timestamp(metadata['x-backend-timestamp']),
|
||||||
create_timestamp)
|
create_timestamp)
|
||||||
|
|
||||||
|
def test_expirer_doesnt_make_async_pendings(self):
|
||||||
|
# The object expirer cleans up its own queue. The inner loop
|
||||||
|
# basically looks like this:
|
||||||
|
#
|
||||||
|
# for obj in stuff_to_delete:
|
||||||
|
# delete_the_object(obj)
|
||||||
|
# remove_the_queue_entry(obj)
|
||||||
|
#
|
||||||
|
# By default, upon receipt of a DELETE request for an expiring
|
||||||
|
# object, the object servers will create async_pending records to
|
||||||
|
# clean the expirer queue. Since the expirer cleans its own queue,
|
||||||
|
# this is unnecessary. The expirer can make requests in such a way
|
||||||
|
# tha the object server does not write out any async pendings; this
|
||||||
|
# test asserts that this is the case.
|
||||||
|
|
||||||
|
def gather_async_pendings(onodes):
|
||||||
|
async_pendings = []
|
||||||
|
for onode in onodes:
|
||||||
|
device_dir = self.device_dir('', onode)
|
||||||
|
for ap_pol_dir in os.listdir(device_dir):
|
||||||
|
if not ap_pol_dir.startswith('async_pending'):
|
||||||
|
# skip 'objects', 'containers', etc.
|
||||||
|
continue
|
||||||
|
async_pending_dir = os.path.join(device_dir, ap_pol_dir)
|
||||||
|
try:
|
||||||
|
ap_dirs = os.listdir(async_pending_dir)
|
||||||
|
except OSError as err:
|
||||||
|
if err.errno == errno.ENOENT:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
for ap_dir in ap_dirs:
|
||||||
|
ap_dir_fullpath = os.path.join(
|
||||||
|
async_pending_dir, ap_dir)
|
||||||
|
async_pendings.extend([
|
||||||
|
os.path.join(ap_dir_fullpath, ent)
|
||||||
|
for ent in os.listdir(ap_dir_fullpath)])
|
||||||
|
return async_pendings
|
||||||
|
|
||||||
|
# Make an expiring object in each policy
|
||||||
|
for policy in ENABLED_POLICIES:
|
||||||
|
container_name = "expirer-test-%d" % policy.idx
|
||||||
|
container_headers = {'X-Storage-Policy': policy.name}
|
||||||
|
client.put_container(self.url, self.token, container_name,
|
||||||
|
headers=container_headers)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
delete_at = int(now + 2.0)
|
||||||
|
client.put_object(
|
||||||
|
self.url, self.token, container_name, "some-object",
|
||||||
|
headers={'X-Delete-At': str(delete_at),
|
||||||
|
'X-Timestamp': Timestamp(now).normal},
|
||||||
|
contents='dontcare')
|
||||||
|
|
||||||
|
time.sleep(2.0)
|
||||||
|
# make sure auto-created expirer-queue containers get in the account
|
||||||
|
# listing so the expirer can find them
|
||||||
|
Manager(['container-updater']).once()
|
||||||
|
|
||||||
|
# Make sure there's no async_pendings anywhere. Probe tests only run
|
||||||
|
# on single-node installs anyway, so this set should be small enough
|
||||||
|
# that an exhaustive check doesn't take too long.
|
||||||
|
all_obj_nodes = {}
|
||||||
|
for policy in ENABLED_POLICIES:
|
||||||
|
for dev in policy.object_ring.devs:
|
||||||
|
all_obj_nodes[dev['device']] = dev
|
||||||
|
pendings_before = gather_async_pendings(all_obj_nodes.values())
|
||||||
|
|
||||||
|
# expire the objects
|
||||||
|
Manager(['object-expirer']).once()
|
||||||
|
pendings_after = gather_async_pendings(all_obj_nodes.values())
|
||||||
|
self.assertEqual(pendings_after, pendings_before)
|
||||||
|
|
||||||
def test_expirer_object_should_not_be_expired(self):
|
def test_expirer_object_should_not_be_expired(self):
|
||||||
|
|
||||||
# Current object-expirer checks the correctness via x-if-delete-at
|
# Current object-expirer checks the correctness via x-if-delete-at
|
||||||
|
@ -796,6 +796,18 @@ class TestObjectExpirer(TestCase):
|
|||||||
self.assertEqual(x.swift.make_request.call_args[0][1],
|
self.assertEqual(x.swift.make_request.call_args[0][1],
|
||||||
'/v1/' + urllib.parse.quote(name))
|
'/v1/' + urllib.parse.quote(name))
|
||||||
|
|
||||||
|
def test_delete_actual_object_queue_cleaning(self):
|
||||||
|
name = 'something'
|
||||||
|
timestamp = '1515544858.80602'
|
||||||
|
x = expirer.ObjectExpirer({})
|
||||||
|
x.swift.make_request = mock.MagicMock()
|
||||||
|
x.delete_actual_object(name, timestamp)
|
||||||
|
self.assertEqual(x.swift.make_request.call_count, 1)
|
||||||
|
header = 'X-Backend-Clean-Expiring-Object-Queue'
|
||||||
|
self.assertEqual(
|
||||||
|
x.swift.make_request.call_args[0][2].get(header),
|
||||||
|
'no')
|
||||||
|
|
||||||
def test_pop_queue(self):
|
def test_pop_queue(self):
|
||||||
class InternalClient(object):
|
class InternalClient(object):
|
||||||
container_ring = FakeRing()
|
container_ring = FakeRing()
|
||||||
|
@ -479,6 +479,43 @@ class BaseObjectControllerMixin(object):
|
|||||||
None: 0,
|
None: 0,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def test_expirer_DELETE_suppresses_expirer_queue_updates(self):
|
||||||
|
req = swift.common.swob.Request.blank(
|
||||||
|
'/v1/a/c/o', method='DELETE', headers={
|
||||||
|
'X-Backend-Clean-Expiring-Object-Queue': 'no'})
|
||||||
|
codes = [204] * self.replicas()
|
||||||
|
captured_headers = []
|
||||||
|
|
||||||
|
def capture_headers(ip, port, device, part, method, path,
|
||||||
|
headers=None, **kwargs):
|
||||||
|
captured_headers.append(headers)
|
||||||
|
|
||||||
|
with set_http_connect(*codes, give_connect=capture_headers):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204) # sanity check
|
||||||
|
|
||||||
|
counts = {True: 0, False: 0, None: 0}
|
||||||
|
for headers in captured_headers:
|
||||||
|
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||||
|
norm_v = None if v is None else utils.config_true_value(v)
|
||||||
|
counts[norm_v] += 1
|
||||||
|
|
||||||
|
o_replicas = self.replicas()
|
||||||
|
self.assertEqual(counts, {
|
||||||
|
True: 0,
|
||||||
|
False: o_replicas,
|
||||||
|
None: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
# Make sure we're not sending any expirer-queue update headers here.
|
||||||
|
# Since we're not updating the expirer queue, these headers would be
|
||||||
|
# superfluous.
|
||||||
|
for headers in captured_headers:
|
||||||
|
self.assertNotIn('X-Delete-At-Container', headers)
|
||||||
|
self.assertNotIn('X-Delete-At-Partition', headers)
|
||||||
|
self.assertNotIn('X-Delete-At-Host', headers)
|
||||||
|
self.assertNotIn('X-Delete-At-Device', headers)
|
||||||
|
|
||||||
def test_DELETE_write_affinity_before_replication(self):
|
def test_DELETE_write_affinity_before_replication(self):
|
||||||
policy_conf = self.app.get_policy_options(self.policy)
|
policy_conf = self.app.get_policy_options(self.policy)
|
||||||
policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2
|
policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2
|
||||||
|
Loading…
x
Reference in New Issue
Block a user