diff --git a/etc/object-expirer.conf-sample b/etc/object-expirer.conf-sample index 7d9e515ff1..611be36548 100644 --- a/etc/object-expirer.conf-sample +++ b/etc/object-expirer.conf-sample @@ -46,6 +46,10 @@ # process is "zero based", if you want to use 3 processes, you should run # processes with process set to 0, 1, and 2 # process = 0 +# The expirer will re-attempt expiring if the source object is not available +# up to reclaim_age seconds before it gives up and deletes the entry in the +# queue. +# reclaim_age = 604800 [pipeline:main] pipeline = catch_errors cache proxy-server diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index fcbcb24224..fdf711962b 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -24,11 +24,13 @@ from eventlet import sleep, Timeout from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon -from swift.common.internal_client import InternalClient +from swift.common.internal_client import InternalClient, UnexpectedResponse from swift.common.utils import get_logger, dump_recon_cache from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_PRECONDITION_FAILED +from swift.container.reconciler import direct_delete_container_entry + class ObjectExpirer(Daemon): """ @@ -38,18 +40,17 @@ class ObjectExpirer(Daemon): :param conf: The daemon configuration. """ - def __init__(self, conf): + def __init__(self, conf, logger=None, swift=None): self.conf = conf - self.logger = get_logger(conf, log_route='object-expirer') + self.logger = logger or get_logger(conf, log_route='object-expirer') self.interval = int(conf.get('interval') or 300) self.expiring_objects_account = \ (conf.get('auto_create_account_prefix') or '.') + \ (conf.get('expiring_objects_account_name') or 'expiring_objects') conf_path = conf.get('__file__') or '/etc/swift/object-expirer.conf' request_tries = int(conf.get('request_tries') or 3) - self.swift = InternalClient(conf_path, - 'Swift Object Expirer', - request_tries) + self.swift = swift or InternalClient( + conf_path, 'Swift Object Expirer', request_tries) self.report_interval = int(conf.get('report_interval') or 300) self.report_first_time = self.report_last_time = time() self.report_objects = 0 @@ -61,6 +62,7 @@ class ObjectExpirer(Daemon): raise ValueError("concurrency must be set to at least 1") self.processes = int(self.conf.get('processes', 0)) self.process = int(self.conf.get('process', 0)) + self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) def report(self, final=False): """ @@ -200,9 +202,15 @@ class ObjectExpirer(Daemon): def delete_object(self, actual_obj, timestamp, container, obj): start_time = time() try: - self.delete_actual_object(actual_obj, timestamp) - self.swift.delete_object(self.expiring_objects_account, - container, obj) + try: + self.delete_actual_object(actual_obj, timestamp) + except UnexpectedResponse as err: + if err.resp.status_int != HTTP_NOT_FOUND: + raise + if float(timestamp) > time() - self.reclaim_age: + # we'll have to retry the DELETE later + raise + self.pop_queue(container, obj) self.report_objects += 1 self.logger.increment('objects') except (Exception, Timeout) as err: @@ -213,6 +221,15 @@ class ObjectExpirer(Daemon): self.logger.timing_since('timing', start_time) self.report() + def pop_queue(self, container, obj): + """ + Issue a delete object request to the container for the expiring object + queue entry. + """ + direct_delete_container_entry(self.swift.container_ring, + self.expiring_objects_account, + container, obj) + def delete_actual_object(self, actual_obj, timestamp): """ Deletes the end-user object indicated by the actual object name given @@ -227,4 +244,4 @@ class ObjectExpirer(Daemon): path = '/v1/' + urllib.quote(actual_obj.lstrip('/')) self.swift.make_request('DELETE', path, {'X-If-Delete-At': str(timestamp)}, - (2, HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED)) + (2, HTTP_PRECONDITION_FAILED)) diff --git a/swift/obj/server.py b/swift/obj/server.py index 2122d81713..4d771ae7ff 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -617,10 +617,18 @@ class ObjectController(object): request=request, body='Bad X-If-Delete-At header value') else: + # request includes x-if-delete-at; we must not place a tombstone + # if we can not verify the x-if-delete-at time + if not orig_timestamp: + # no object found at all + return HTTPNotFound() if orig_delete_at != req_if_delete_at: return HTTPPreconditionFailed( request=request, body='X-If-Delete-At and X-Delete-At do not match') + else: + # differentiate success from no object at all + response_class = HTTPNoContent if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device, diff --git a/test/probe/test_object_expirer.py b/test/probe/test_object_expirer.py new file mode 100644 index 0000000000..4e336483c1 --- /dev/null +++ b/test/probe/test_object_expirer.py @@ -0,0 +1,131 @@ +#!/usr/bin/python -u +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import unittest +import uuid + +from nose import SkipTest + +from swift.common.internal_client import InternalClient +from swift.common.manager import Manager +from swift.common.storage_policy import POLICIES + +from test.probe.common import reset_environment, get_to_final_state +from test.probe.test_container_merge_policy_index import BrainSplitter + +from swiftclient import client + + +class TestObjectExpirer(unittest.TestCase): + + def setUp(self): + if len(POLICIES) < 2: + raise SkipTest('Need more than one policy') + + self.expirer = Manager(['object-expirer']) + self.expirer.start() + err = self.expirer.stop() + if err: + raise SkipTest('Unable to verify object-expirer service') + + conf_files = [] + for server in self.expirer.servers: + conf_files.extend(server.conf_files()) + conf_file = conf_files[0] + self.client = InternalClient(conf_file, 'probe-test', 3) + + (self.pids, self.port2server, self.account_ring, self.container_ring, + self.object_ring, self.policy, self.url, self.token, + self.account, self.configs) = reset_environment() + self.container_name = 'container-%s' % uuid.uuid4() + self.object_name = 'object-%s' % uuid.uuid4() + self.brain = BrainSplitter(self.url, self.token, self.container_name, + self.object_name) + + def test_expirer_object_split_brain(self): + old_policy = random.choice(list(POLICIES)) + wrong_policy = random.choice([p for p in POLICIES if p != old_policy]) + # create an expiring object and a container with the wrong policy + self.brain.stop_primary_half() + self.brain.put_container(int(old_policy)) + self.brain.put_object(headers={'X-Delete-After': 1}) + # get the object timestamp + metadata = self.client.get_object_metadata( + self.account, self.container_name, self.object_name, + headers={'X-Backend-Storage-Policy-Index': int(old_policy)}) + create_timestamp = metadata['x-timestamp'] + self.brain.start_primary_half() + # get the expiring object updates in their queue, while we have all + # the servers up + Manager(['object-updater']).once() + self.brain.stop_handoff_half() + self.brain.put_container(int(wrong_policy)) + # don't start handoff servers, only wrong policy is available + + # make sure auto-created containers get in the account listing + Manager(['container-updater']).once() + # this guy should no-op since it's unable to expire the object + self.expirer.once() + + self.brain.start_handoff_half() + get_to_final_state() + + # validate object is expired + found_in_policy = None + metadata = self.client.get_object_metadata( + self.account, self.container_name, self.object_name, + acceptable_statuses=(4,), + headers={'X-Backend-Storage-Policy-Index': int(old_policy)}) + self.assert_('x-backend-timestamp' in metadata) + self.assertEqual(metadata['x-backend-timestamp'], + create_timestamp) + + # but it is still in the listing + for obj in self.client.iter_objects(self.account, + self.container_name): + if self.object_name == obj['name']: + break + else: + self.fail('Did not find listing for %s' % self.object_name) + + # clear proxy cache + client.post_container(self.url, self.token, self.container_name, {}) + # run the expirier again after replication + self.expirer.once() + + # object is not in the listing + for obj in self.client.iter_objects(self.account, + self.container_name): + if self.object_name == obj['name']: + self.fail('Found listing for %s' % self.object_name) + + # and validate object is tombstoned + found_in_policy = None + for policy in POLICIES: + metadata = self.client.get_object_metadata( + self.account, self.container_name, self.object_name, + acceptable_statuses=(4,), + headers={'X-Backend-Storage-Policy-Index': int(policy)}) + if 'x-backend-timestamp' in metadata: + if found_in_policy: + self.fail('found object in %s and also %s' % + (found_in_policy, policy)) + found_in_policy = policy + self.assert_('x-backend-timestamp' in metadata) + self.assert_(float(metadata['x-backend-timestamp']) > + float(create_timestamp)) + +if __name__ == "__main__": + unittest.main() diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index fd8100d7f9..faa46fdb2b 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -16,12 +16,14 @@ import urllib from time import time from unittest import main, TestCase -from test.unit import FakeLogger +from test.unit import FakeLogger, FakeRing, mocked_http_conn from copy import deepcopy +from tempfile import mkdtemp +from shutil import rmtree import mock -from swift.common import internal_client +from swift.common import internal_client, utils from swift.obj import expirer @@ -49,7 +51,11 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: None internal_client.sleep = not_sleep + self.rcache = mkdtemp() + self.logger = FakeLogger() + def teardown(self): + rmtree(self.rcache) internal_client.sleep = self.old_sleep internal_client.loadapp = self.loadapp @@ -166,6 +172,9 @@ class TestObjectExpirer(TestCase): def test_delete_object(self): class InternalClient(object): + + container_ring = None + def __init__(self, test, account, container, obj): self.test = test self.account = account @@ -173,12 +182,6 @@ class TestObjectExpirer(TestCase): self.obj = obj self.delete_object_called = False - def delete_object(self, account, container, obj): - self.test.assertEqual(self.account, account) - self.test.assertEqual(self.container, container) - self.test.assertEqual(self.obj, obj) - self.delete_object_called = True - class DeleteActualObject(object): def __init__(self, test, actual_obj, timestamp): self.test = test @@ -196,20 +199,27 @@ class TestObjectExpirer(TestCase): actual_obj = 'actual_obj' timestamp = 'timestamp' - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) x.swift = \ InternalClient(self, x.expiring_objects_account, container, obj) x.delete_actual_object = \ DeleteActualObject(self, actual_obj, timestamp) + delete_object_called = [] + + def pop_queue(c, o): + self.assertEqual(container, c) + self.assertEqual(obj, o) + delete_object_called[:] = [True] + + x.pop_queue = pop_queue + x.delete_object(actual_obj, timestamp, container, obj) - self.assertTrue(x.swift.delete_object_called) + self.assertTrue(delete_object_called) self.assertTrue(x.delete_actual_object.called) def test_report(self): - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) x.report() self.assertEqual(x.logger.log_dict['info'], []) @@ -230,8 +240,7 @@ class TestObjectExpirer(TestCase): x.logger.log_dict['info']) def test_run_once_nothing_to_do(self): - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) x.swift = 'throw error because a string does not have needed methods' x.run_once() self.assertEqual(x.logger.log_dict['exception'], @@ -247,8 +256,7 @@ class TestObjectExpirer(TestCase): def iter_containers(*a, **kw): return [] - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) x.swift = InternalClient() x.run_once() self.assertEqual( @@ -271,8 +279,8 @@ class TestObjectExpirer(TestCase): def iter_objects(*a, **kw): raise Exception('This should not have been called') - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({'recon_cache_path': self.rcache}, + logger=self.logger) x.swift = InternalClient([{'name': str(int(time() + 86400))}]) x.run_once() for exccall in x.logger.log_dict['exception']: @@ -285,9 +293,8 @@ class TestObjectExpirer(TestCase): (('Pass completed in 0s; 0 objects expired',), {})]) # Reverse test to be sure it still would blow up the way expected. - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() - x.swift = InternalClient([{'name': str(int(time() - 86400))}]) + fake_swift = InternalClient([{'name': str(int(time() - 86400))}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) x.run_once() self.assertEqual( x.logger.log_dict['exception'], @@ -315,11 +322,10 @@ class TestObjectExpirer(TestCase): def should_not_be_called(*a, **kw): raise Exception('This should not have been called') - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % int(time() + 86400)}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) x.run_once() for exccall in x.logger.log_dict['exception']: self.assertTrue( @@ -331,12 +337,11 @@ class TestObjectExpirer(TestCase): (('Pass completed in 0s; 0 objects expired',), {})]) # Reverse test to be sure it still would blow up the way expected. - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() ts = int(time() - 86400) - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % ts}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) x.delete_actual_object = should_not_be_called x.run_once() excswhiledeleting = [] @@ -350,6 +355,9 @@ class TestObjectExpirer(TestCase): def test_failed_delete_keeps_entry(self): class InternalClient(object): + + container_ring = None + def __init__(self, containers, objects): self.containers = containers self.objects = objects @@ -363,9 +371,6 @@ class TestObjectExpirer(TestCase): def delete_container(*a, **kw): pass - def delete_object(*a, **kw): - raise Exception('This should not have been called') - def iter_objects(self, *a, **kw): return self.objects @@ -375,14 +380,14 @@ class TestObjectExpirer(TestCase): def should_not_get_called(container, obj): raise Exception('This should not have been called') - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() - x.iter_containers = lambda: [str(int(time() - 86400))] ts = int(time() - 86400) - x.delete_actual_object = deliberately_blow_up - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % ts}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) + x.iter_containers = lambda: [str(int(time() - 86400))] + x.delete_actual_object = deliberately_blow_up + x.pop_queue = should_not_get_called x.run_once() excswhiledeleting = [] for exccall in x.logger.log_dict['exception']: @@ -399,13 +404,14 @@ class TestObjectExpirer(TestCase): (('Pass completed in 0s; 0 objects expired',), {})]) # Reverse test to be sure it still would blow up the way expected. - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() ts = int(time() - 86400) - x.delete_actual_object = lambda o, t: None - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % ts}]) + self.logger._clear() + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) + x.delete_actual_object = lambda o, t: None + x.pop_queue = should_not_get_called x.run_once() excswhiledeleting = [] for exccall in x.logger.log_dict['exception']: @@ -418,6 +424,9 @@ class TestObjectExpirer(TestCase): def test_success_gets_counted(self): class InternalClient(object): + + container_ring = None + def __init__(self, containers, objects): self.containers = containers self.objects = objects @@ -437,13 +446,13 @@ class TestObjectExpirer(TestCase): def iter_objects(self, *a, **kw): return self.objects - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() - x.delete_actual_object = lambda o, t: None - self.assertEqual(x.report_objects, 0) - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % int(time() - 86400)}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) + x.delete_actual_object = lambda o, t: None + x.pop_queue = lambda c, o: None + self.assertEqual(x.report_objects, 0) x.run_once() self.assertEqual(x.report_objects, 1) self.assertEqual( @@ -454,6 +463,9 @@ class TestObjectExpirer(TestCase): def test_delete_actual_object_does_not_get_unicode(self): class InternalClient(object): + + container_ring = None + def __init__(self, containers, objects): self.containers = containers self.objects = objects @@ -479,13 +491,13 @@ class TestObjectExpirer(TestCase): if isinstance(actual_obj, unicode): got_unicode[0] = True - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() - x.delete_actual_object = delete_actual_object_test_for_unicode - self.assertEqual(x.report_objects, 0) - x.swift = InternalClient( + fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], [{'name': u'%d-actual-obj' % int(time() - 86400)}]) + x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) + x.delete_actual_object = delete_actual_object_test_for_unicode + x.pop_queue = lambda c, o: None + self.assertEqual(x.report_objects, 0) x.run_once() self.assertEqual(x.report_objects, 1) self.assertEqual( @@ -497,6 +509,9 @@ class TestObjectExpirer(TestCase): def test_failed_delete_continues_on(self): class InternalClient(object): + + container_ring = None + def __init__(self, containers, objects): self.containers = containers self.objects = objects @@ -519,8 +534,7 @@ class TestObjectExpirer(TestCase): def fail_delete_actual_object(actual_obj, timestamp): raise Exception('failed to delete actual object') - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) cts = int(time() - 86400) ots = int(time() - 86400) @@ -594,8 +608,7 @@ class TestObjectExpirer(TestCase): raise Exception('exception %d' % raises[0]) raise SystemExit('exiting exception %d' % raises[0]) - x = expirer.ObjectExpirer({}) - x.logger = FakeLogger() + x = expirer.ObjectExpirer({}, logger=self.logger) orig_sleep = expirer.sleep try: expirer.sleep = not_sleep @@ -643,7 +656,7 @@ class TestObjectExpirer(TestCase): self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name') - def test_delete_actual_object_handles_404(self): + def test_delete_actual_object_raises_404(self): def fake_app(env, start_response): start_response('404 Not Found', [('Content-Length', '0')]) @@ -652,7 +665,8 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) - x.delete_actual_object('/path/to/object', '1234') + self.assertRaises(internal_client.UnexpectedResponse, + x.delete_actual_object, '/path/to/object', '1234') def test_delete_actual_object_handles_412(self): @@ -696,6 +710,27 @@ class TestObjectExpirer(TestCase): self.assertEqual(x.swift.make_request.call_args[0][1], '/v1/' + urllib.quote(name)) + def test_pop_queue(self): + class InternalClient(object): + container_ring = FakeRing() + x = expirer.ObjectExpirer({}, logger=self.logger, + swift=InternalClient()) + requests = [] + + def capture_requests(ipaddr, port, method, path, *args, **kwargs): + requests.append((method, path)) + with mocked_http_conn( + 200, 200, 200, give_connect=capture_requests) as fake_conn: + x.pop_queue('c', 'o') + self.assertRaises(StopIteration, fake_conn.code_iter.next) + for method, path in requests: + self.assertEqual(method, 'DELETE') + device, part, account, container, obj = utils.split_path( + path, 5, 5, True) + self.assertEqual(account, '.expiring_objects') + self.assertEqual(container, 'c') + self.assertEqual(obj, 'o') + if __name__ == '__main__': main() diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 435f26457a..b9f9345159 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -3266,6 +3266,16 @@ class TestObjectController(unittest.TestCase): # but file still exists self.assert_(os.path.isfile(objfile)) + # make the x-if-delete-at with some wrong bits + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'X-Timestamp': delete_at_timestamp, + 'X-If-Delete-At': int(time() + 1)}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 412) + self.assertTrue(os.path.isfile(objfile)) + # make the x-if-delete-at with all the right bits req = Request.blank( '/sda1/p/a/c/o', @@ -3273,9 +3283,28 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': delete_at_timestamp, 'X-If-Delete-At': delete_at_timestamp}) resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 404) + self.assertEquals(resp.status_int, 204) self.assertFalse(os.path.isfile(objfile)) + # make the x-if-delete-at with all the right bits (again) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'X-Timestamp': delete_at_timestamp, + 'X-If-Delete-At': delete_at_timestamp}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 412) + self.assertFalse(os.path.isfile(objfile)) + + # make the x-if-delete-at for some not found + req = Request.blank( + '/sda1/p/a/c/o-not-found', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'X-Timestamp': delete_at_timestamp, + 'X-If-Delete-At': delete_at_timestamp}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 404) + def test_DELETE_if_delete_at(self): test_time = time() + 10000 req = Request.blank(