diff --git a/bin/swift-container-reconciler b/bin/swift-container-reconciler new file mode 100755 index 0000000000..29f35d8a82 --- /dev/null +++ b/bin/swift-container-reconciler @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from swift.container.reconciler import ContainerReconciler +from swift.common.utils import parse_options +from swift.common.daemon import run_daemon + +if __name__ == '__main__': + conf_file, options = parse_options(once=True) + run_daemon(ContainerReconciler, conf_file, **options) diff --git a/bin/swift-reconciler-enqueue b/bin/swift-reconciler-enqueue new file mode 100755 index 0000000000..662eb393fd --- /dev/null +++ b/bin/swift-reconciler-enqueue @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +from optparse import OptionParser + +import eventlet.debug +eventlet.debug.hub_exceptions(True) + +from swift.common.ring import Ring +from swift.common.utils import split_path +from swift.common.storage_policy import POLICIES + +from swift.container.reconciler import add_to_reconciler_queue +""" +This tool is primarly for debugging and development but can be used an example +of how an operator could enqueue objects manually if a problem is discovered - +might be particularlly useful if you need to hack a fix into the reconciler +and re-run it. +""" + +USAGE = """ +%prog [options] + +This script enqueues an object to be evaluated by the reconciler. + +Arguments: +policy_index: the policy the object is currently stored in. + /a/c/o: the full path of the object - utf-8 + timestamp: the timestamp of the datafile/tombstone. + +""".strip() + +parser = OptionParser(USAGE) +parser.add_option('-X', '--op', default='PUT', choices=('PUT', 'DELETE'), + help='the method of the misplaced operation') +parser.add_option('-f', '--force', action='store_true', + help='force an object to be re-enqueued') + + +def main(): + options, args = parser.parse_args() + try: + policy_index, path, timestamp = args + except ValueError: + sys.exit(parser.print_help()) + container_ring = Ring('/etc/swift/container.ring.gz') + policy = POLICIES.get_by_index(policy_index) + if not policy: + return 'ERROR: invalid storage policy index: %s' % policy + try: + account, container, obj = split_path(path, 3, 3, True) + except ValueError as e: + return 'ERROR: %s' % e + container_name = add_to_reconciler_queue( + container_ring, account, container, obj, + policy.idx, timestamp, options.op, force=options.force) + if not container_name: + return 'ERROR: unable to enqueue!' + print container_name + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/etc/container-reconciler.conf-sample b/etc/container-reconciler.conf-sample new file mode 100644 index 0000000000..0a3de0220d --- /dev/null +++ b/etc/container-reconciler.conf-sample @@ -0,0 +1,52 @@ +[DEFAULT] +# swift_dir = /etc/swift +# user = swift +# You can specify default log routing here if you want: +# log_name = swift +# log_facility = LOG_LOCAL0 +# log_level = INFO +# log_address = /dev/log +# +# comma separated list of functions to call to setup custom log handlers. +# functions get passed: conf, name, log_to_console, log_route, fmt, logger, +# adapted_logger +# log_custom_handlers = +# +# If set, log_udp_host will override log_address +# log_udp_host = +# log_udp_port = 514 +# +# You can enable StatsD logging here: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1.0 +# log_statsd_sample_rate_factor = 1.0 +# log_statsd_metric_prefix = + +[container-reconciler] +# The reconciler will re-attempt reconciliation if the source object is not +# available up to reclaim_age seconds before it gives up and deletes the entry +# in the queue. +# reclaim_age = 604800 +# The cycle time of the daemon +# interval = 300 +# Server errors from requests will be retried by default +# request_tries = 3 + +[pipeline:main] +pipeline = catch_errors proxy-logging cache proxy-server + +[app:proxy-server] +use = egg:swift#proxy +# See proxy-server.conf-sample for options + +[filter:cache] +use = egg:swift#memcache +# See proxy-server.conf-sample for options + +[filter:proxy-logging] +use = egg:swift#proxy_logging + +[filter:catch_errors] +use = egg:swift#catch_errors +# See proxy-server.conf-sample for options diff --git a/setup.cfg b/setup.cfg index 6239e68ed5..0c35d53f04 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,6 +39,8 @@ scripts = bin/swift-container-server bin/swift-container-sync bin/swift-container-updater + bin/swift-container-reconciler + bin/swift-reconciler-enqueue bin/swift-dispersion-populate bin/swift-dispersion-report bin/swift-drive-audit diff --git a/swift/common/manager.py b/swift/common/manager.py index 103b5c2979..583912b3c2 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -30,7 +30,8 @@ RUN_DIR = '/var/run/swift' # auth-server has been removed from ALL_SERVERS, start it explicitly ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor', - 'container-replicator', 'container-server', 'container-sync', + 'container-replicator', 'container-reconciler', + 'container-server', 'container-sync', 'container-updater', 'object-auditor', 'object-server', 'object-expirer', 'object-replicator', 'object-updater', 'proxy-server', 'account-replicator', 'account-reaper'] @@ -41,7 +42,7 @@ GRACEFUL_SHUTDOWN_SERVERS = MAIN_SERVERS + ['auth-server'] START_ONCE_SERVERS = REST_SERVERS # These are servers that match a type (account-*, container-*, object-*) but # don't use that type-server.conf file and instead use their own. -STANDALONE_SERVERS = ['object-expirer'] +STANDALONE_SERVERS = ['object-expirer', 'container-reconciler'] KILL_WAIT = 15 # seconds to wait for servers to die (by default) WARNING_WAIT = 3 # seconds to wait after message that may just be a warning diff --git a/swift/common/utils.py b/swift/common/utils.py index 02e9cfe572..1a5b592269 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -49,6 +49,7 @@ import glob from urlparse import urlparse as stdlib_urlparse, ParseResult import itertools import stat +import datetime import eventlet import eventlet.semaphore @@ -576,6 +577,18 @@ def normalize_timestamp(timestamp): return "%016.05f" % (float(timestamp)) +def last_modified_date_to_timestamp(last_modified_date_str): + """ + Convert a last modified date (liked you'd get from a container listing, + e.g. 2014-02-28T23:22:36.698390) to a float. + """ + return float( + datetime.datetime.strptime( + last_modified_date_str, '%Y-%m-%dT%H:%M:%S.%f' + ).strftime('%s.%f') + ) + + def normalize_delete_at_timestamp(timestamp): """ Format a timestamp (string or numeric) into a standardized diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py new file mode 100644 index 0000000000..64aa7ba373 --- /dev/null +++ b/swift/container/reconciler.py @@ -0,0 +1,749 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from collections import defaultdict +import socket +import itertools +import logging + +from eventlet import GreenPile, GreenPool, Timeout + +from swift.common import constraints +from swift.common.daemon import Daemon +from swift.common.direct_client import ( + direct_head_container, direct_delete_container_object, + direct_put_container_object, ClientException) +from swift.common.internal_client import InternalClient, UnexpectedResponse +from swift.common.storage_policy import POLICY_INDEX +from swift.common.utils import get_logger, split_path, quorum_size, \ + FileLikeIter, normalize_timestamp, last_modified_date_to_timestamp + + +MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects' +MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour + + +def cmp_policy_info(info, remote_info): + """ + You have to squint to see it, but the general strategy is just: + + if either has been recreated: + return the newest (of the recreated) + else + return the oldest + + I tried cleaning it up for awhile, but settled on just writing a bunch of + tests instead. Once you get an intuitive sense for the nuance here you + can try and see there's a better way to spell the boolean logic but it all + ends up looking sorta hairy. + + :returns: -1 if info is correct, 1 if remote_info is better + """ + def is_deleted(info): + return (info['delete_timestamp'] > info['put_timestamp'] and + info.get('count', info.get('object_count', 0)) == 0) + + deleted = is_deleted(info) + remote_deleted = is_deleted(remote_info) + if any([deleted, remote_deleted]): + if not deleted: + return -1 + elif not remote_deleted: + return 1 + return cmp(remote_info['status_changed_at'], + info['status_changed_at']) + + def has_been_recreated(info): + return (info['put_timestamp'] > info['delete_timestamp'] > + normalize_timestamp(0)) + + remote_recreated = has_been_recreated(remote_info) + recreated = has_been_recreated(info) + if any([remote_recreated, recreated]): + if not recreated: + return 1 + elif not remote_recreated: + return -1 + return cmp(remote_info['status_changed_at'], + info['status_changed_at']) + return cmp(info['status_changed_at'], remote_info['status_changed_at']) + + +def incorrect_policy_index(info, remote_info): + """ + Compare remote_info to info and decide if the remote storage policy index + should be used instead of ours. + """ + if 'storage_policy_index' not in remote_info: + return False + if remote_info['storage_policy_index'] == \ + info['storage_policy_index']: + return False + + return info['storage_policy_index'] != sorted( + [info, remote_info], cmp=cmp_policy_info)[0]['storage_policy_index'] + + +def translate_container_headers_to_info(headers): + default_timestamp = normalize_timestamp(0) + return { + 'storage_policy_index': int(headers[POLICY_INDEX]), + 'put_timestamp': headers.get('x-backend-put-timestamp', + default_timestamp), + 'delete_timestamp': headers.get('x-backend-delete-timestamp', + default_timestamp), + 'status_changed_at': headers.get('x-backend-status-changed-at', + default_timestamp), + } + + +def best_policy_index(headers): + container_info = map(translate_container_headers_to_info, headers) + container_info.sort(cmp=cmp_policy_info) + return container_info[0]['storage_policy_index'] + + +def get_reconciler_container_name(obj_timestamp): + return str(int(float(obj_timestamp)) // + MISPLACED_OBJECTS_CONTAINER_DIVISOR * + MISPLACED_OBJECTS_CONTAINER_DIVISOR) + + +def get_reconciler_obj_name(policy_index, account, container, obj): + return "%(policy_index)d:/%(acc)s/%(con)s/%(obj)s" % { + 'policy_index': policy_index, 'acc': account, + 'con': container, 'obj': obj} + + +def get_reconciler_content_type(op): + try: + return { + 'put': 'application/x-put', + 'delete': 'application/x-delete', + }[op.lower()] + except KeyError: + raise ValueError('invalid operation type %r' % op) + + +def get_row_to_q_entry_translater(broker): + account = broker.account + container = broker.container + op_type = { + 0: get_reconciler_content_type('put'), + 1: get_reconciler_content_type('delete'), + } + + def translater(obj_info): + name = get_reconciler_obj_name(obj_info['storage_policy_index'], + account, container, + obj_info['name']) + return { + 'name': name, + 'deleted': 0, + 'created_at': obj_info['created_at'], + 'etag': obj_info['created_at'], + 'content_type': op_type[obj_info['deleted']], + 'size': 0, + } + return translater + + +def add_to_reconciler_queue(container_ring, account, container, obj, + obj_policy_index, obj_timestamp, op, + force=False, conn_timeout=5, response_timeout=15): + """ + Add an object to the container reconciler's queue. This will cause the + container reconciler to move it from its current storage policy index to + the correct storage policy index. + + :param container_ring: container ring + :param account: the misplaced object's account + :param container: the misplaced object's container + :param obj: the misplaced object + :param obj_policy_index: the policy index where the misplaced object + currently is + :param obj_timestamp: the misplaced object's X-Timestamp. We need this to + ensure that the reconciler doesn't overwrite a newer + object with an older one. + :param op: the method of the operation (DELETE or PUT) + :param force: over-write queue entries newer than obj_timestamp + :param conn_timeout: max time to wait for connection to container server + :param response_timeout: max time to wait for response from container + server + + :returns: .misplaced_object container name, False on failure. "Success" + means a quorum of containers got the update. + """ + container_name = get_reconciler_container_name(obj_timestamp) + object_name = get_reconciler_obj_name(obj_policy_index, account, + container, obj) + if force: + # this allows an operator to re-enqueue an object that has + # already been popped from the queue to be reprocessed, but + # could potentially prevent out of order updates from making it + # into the queue + x_timestamp = normalize_timestamp(time.time()) + else: + x_timestamp = obj_timestamp + q_op_type = get_reconciler_content_type(op) + headers = { + 'X-Size': 0, + 'X-Etag': obj_timestamp, + 'X-Timestamp': x_timestamp, + 'X-Content-Type': q_op_type, + } + + def _check_success(*args, **kwargs): + try: + direct_put_container_object(*args, **kwargs) + return 1 + except (ClientException, Timeout, socket.error): + return 0 + + pile = GreenPile() + part, nodes = container_ring.get_nodes(MISPLACED_OBJECTS_ACCOUNT, + container_name) + for node in nodes: + pile.spawn(_check_success, node, part, MISPLACED_OBJECTS_ACCOUNT, + container_name, object_name, headers=headers, + conn_timeout=conn_timeout, + response_timeout=response_timeout) + + successes = sum(pile) + if successes >= quorum_size(len(nodes)): + return container_name + else: + return False + + +def slightly_later_timestamp(ts, offset=1): + # I'm guessing to avoid rounding errors Swift uses a 10-microsecond + # resolution instead of Python's 1-microsecond resolution. + offset *= 0.00001 + return normalize_timestamp(float(ts) + offset) + + +def parse_raw_obj(obj_info): + """ + Translate a reconciler container listing entry to a dictionary + containing the parts of the misplaced object queue entry. + + :param obj_info: an entry in an a container listing with the + required keys: name, content_type, and hash + + :returns: a queue entry dict with the keys: q_policy_index, account, + container, obj, q_op, q_ts, q_record, and path + """ + raw_obj_name = obj_info['name'].encode('utf-8') + + policy_index, obj_name = raw_obj_name.split(':', 1) + q_policy_index = int(policy_index) + account, container, obj = split_path(obj_name, 3, 3, rest_with_last=True) + try: + q_op = { + 'application/x-put': 'PUT', + 'application/x-delete': 'DELETE', + }[obj_info['content_type']] + except KeyError: + raise ValueError('invalid operation type %r' % + obj_info.get('content_type', None)) + return { + 'q_policy_index': q_policy_index, + 'account': account, + 'container': container, + 'obj': obj, + 'q_op': q_op, + 'q_ts': float(obj_info['hash']), + 'q_record': last_modified_date_to_timestamp( + obj_info['last_modified']), + 'path': '/%s/%s/%s' % (account, container, obj) + } + + +def direct_get_container_policy_index(container_ring, account_name, + container_name): + """ + Talk directly to the primary container servers to figure out the storage + policy index for a given container. + + :param container_ring: ring in which to look up the container locations + :param account_name: name of the container's account + :param container_name: name of the container + :returns: storage policy index, or None if it couldn't get a quorum + """ + def _eat_client_exception(*args): + try: + return direct_head_container(*args) + except ClientException as err: + if err.http_status == 404: + return err.http_headers + except (Timeout, socket.error): + pass + + pile = GreenPile() + part, nodes = container_ring.get_nodes(account_name, container_name) + for node in nodes: + pile.spawn(_eat_client_exception, node, part, account_name, + container_name) + + headers = [x for x in pile if x is not None] + if len(headers) < quorum_size(len(nodes)): + return + return best_policy_index(headers) + + +def direct_delete_container_entry(container_ring, account_name, container_name, + object_name, headers=None): + """ + Talk directly to the primary container servers to delete a particular + object listing. Does not talk to object servers; use this only when a + container entry does not actually have a corresponding object. + """ + pool = GreenPool() + part, nodes = container_ring.get_nodes(account_name, container_name) + for node in nodes: + pool.spawn_n(direct_delete_container_object, node, part, account_name, + container_name, object_name, headers=headers) + + # This either worked or it didn't; if it didn't, we'll retry on the next + # reconciler loop when we see the queue entry again. + pool.waitall() + + +class ContainerReconciler(Daemon): + """ + Move objects that are in the wrong storage policy. + """ + + def __init__(self, conf): + self.conf = conf + self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) + self.interval = int(conf.get('interval', 30)) + conf_path = conf.get('__file__') or \ + '/etc/swift/container-reconciler.conf' + self.logger = get_logger(conf, log_route='container-reconciler') + request_tries = int(conf.get('request_tries') or 3) + self.swift = InternalClient(conf_path, + 'Swift Container Reconciler', + request_tries) + self.stats = defaultdict(int) + self.last_stat_time = time.time() + + def stats_log(self, metric, msg, *args, **kwargs): + """ + Update stats tracking for metric and emit log message. + """ + level = kwargs.pop('level', logging.DEBUG) + log_message = '%s: ' % metric + msg + self.logger.log(level, log_message, *args, **kwargs) + self.stats[metric] += 1 + + def log_stats(self, force=False): + """ + Dump stats to logger, noop when stats have been already been + logged in the last minute. + """ + now = time.time() + should_log = force or (now - self.last_stat_time > 60) + if should_log: + self.last_stat_time = now + self.logger.info('Reconciler Stats: %r', dict(**self.stats)) + + def pop_queue(self, container, obj, q_ts, q_record): + """ + Issue a delete object request to the container for the misplaced + object queue entry. + + :param container: the misplaced objects container + :param q_ts: the timestamp of the misplaced object + :param q_record: the timestamp of the queue entry + + N.B. q_ts will normally be the same time as q_record except when + an object was manually re-enqued. + """ + q_path = '/%s/%s/%s' % (MISPLACED_OBJECTS_ACCOUNT, container, obj) + x_timestamp = slightly_later_timestamp(q_record) + self.stats_log('pop_queue', 'remove %r (%f) from the queue (%s)', + q_path, q_ts, x_timestamp) + headers = {'X-Timestamp': x_timestamp} + direct_delete_container_entry( + self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT, + container, obj, headers=headers) + + def throw_tombstones(self, account, container, obj, timestamp, + policy_index, path): + """ + Issue a delete object request to the given storage_policy. + + :param account: the account name + :param container: the container name + :param account: the object name + :param timestamp: the timestamp of the object to delete + :param policy_index: the policy index to direct the request + :param path: the path to be used for logging + """ + x_timestamp = slightly_later_timestamp(timestamp) + self.stats_log('cleanup_attempt', '%r (%f) from policy_index ' + '%s (%s) will be deleted', + path, timestamp, policy_index, x_timestamp) + headers = { + 'X-Timestamp': x_timestamp, + 'X-Backend-Storage-Policy-Index': policy_index, + } + success = False + try: + self.swift.delete_object(account, container, obj, + acceptable_statuses=(2, 4), + headers=headers) + except UnexpectedResponse as err: + self.stats_log('cleanup_failed', '%r (%f) was not cleaned up ' + 'in storage_policy %s (%s)', path, timestamp, + policy_index, err) + else: + success = True + self.stats_log('cleanup_success', '%r (%f) was successfully ' + 'removed from policy_index %s', path, timestamp, + policy_index) + return success + + def _reconcile_object(self, account, container, obj, q_policy_index, q_ts, + q_op, path, **kwargs): + """ + Perform object reconciliation. + + :param account: the account name of the misplaced object + :param container: the container name of the misplaced object + :param obj: the object name + :param q_policy_index: the policy index of the source indicated by the + queue entry. + :param q_ts: a float, the timestamp of the misplaced object + :param q_op: the operation of the misplaced request + :param path: the full path of the misplaced object for logging + + :returns: True to indicate the request is fully processed + successfully, otherwise False. + """ + container_policy_index = direct_get_container_policy_index( + self.swift.container_ring, account, container) + if container_policy_index is None: + self.stats_log('unavailable_container', '%r (%f) unable to ' + 'determine the destination policy_index', + path, q_ts) + return False + if container_policy_index == q_policy_index: + self.stats_log('noop_object', '%r (%f) container policy_index ' + '%s matches queue policy index %s', path, q_ts, + container_policy_index, q_policy_index) + return True + + # check if object exists in the destination already + self.logger.debug('checking for %r (%f) in destination ' + 'policy_index %s', path, q_ts, + container_policy_index) + headers = { + 'X-Backend-Storage-Policy-Index': container_policy_index} + dest_obj = self.swift.get_object_metadata(account, container, obj, + headers=headers, + acceptable_statuses=(2, 4)) + dest_ts = float( + dest_obj.get('x-timestamp', + dest_obj.get('x-backend-timestamp', + '0.0') + ) + ) + if dest_ts >= q_ts: + self.stats_log('found_object', '%r (%f) in policy_index %s ' + 'is newer than queue (%f)', path, dest_ts, + container_policy_index, q_ts) + return self.throw_tombstones(account, container, obj, q_ts, + q_policy_index, path) + + # object is misplaced + self.stats_log('misplaced_object', '%r (%f) in policy_index %s ' + 'should be in policy_index %s', path, q_ts, + q_policy_index, container_policy_index) + + # fetch object from the source location + self.logger.debug('fetching %r (%f) from storage policy %s', path, + q_ts, q_policy_index) + headers = { + 'X-Backend-Storage-Policy-Index': q_policy_index} + try: + source_obj_status, source_obj_info, source_obj_iter = \ + self.swift.get_object(account, container, obj, + headers=headers, + acceptable_statuses=(2, 4)) + except UnexpectedResponse as err: + source_obj_status = err.resp.status_int + source_obj_info = {} + source_obj_iter = None + + source_ts = float(source_obj_info.get("X-Timestamp", 0)) + if source_obj_status == 404 and q_op == 'DELETE': + return self.ensure_tombstone_in_right_location( + q_policy_index, account, container, obj, q_ts, path, + container_policy_index, source_ts) + else: + return self.ensure_object_in_right_location( + q_policy_index, account, container, obj, q_ts, path, + container_policy_index, source_ts, source_obj_status, + source_obj_info, source_obj_iter) + + def ensure_object_in_right_location(self, q_policy_index, account, + container, obj, q_ts, path, + container_policy_index, source_ts, + source_obj_status, source_obj_info, + source_obj_iter, **kwargs): + """ + Validate source object will satisfy the misplaced object queue entry + and move to destination. + + :param q_policy_index: the policy_index for the source object + :param account: the account name of the misplaced object + :param container: the container name of the misplaced object + :param obj: the name of the misplaced object + :param q_ts: a float, the timestamp of the misplaced object + :param path: the full path of the misplaced object for logging + :param container_policy_index: the policy_index of the destination + :param source_ts: a float, the timestamp of the source object + :param source_obj_status: the HTTP status source object request + :param source_obj_info: the HTTP headers of the source object request + :param source_obj_iter: the body iter of the source object request + """ + if source_obj_status // 100 != 2 or source_ts < q_ts: + if q_ts < time.time() - self.reclaim_age: + # it's old and there are no tombstones or anything; give up + self.stats_log('lost_source', '%r (%f) was not available in ' + 'policy_index %s and has expired', path, q_ts, + q_policy_index, level=logging.CRITICAL) + return True + # the source object is unavailable or older than the queue + # entry; a version that will satisfy the queue entry hopefully + # exists somewhere in the cluster, so wait and try again + self.stats_log('unavailable_source', '%r (%f) in ' + 'policy_index %s responded %s (%f)', path, + q_ts, q_policy_index, source_obj_status, + source_ts, level=logging.WARNING) + return False + + # optimistically move any source with a timestamp >= q_ts + ts = max(float(source_ts), q_ts) + # move the object + put_timestamp = slightly_later_timestamp(ts, offset=2) + self.stats_log('copy_attempt', '%r (%f) in policy_index %s will be ' + 'moved to policy_index %s (%s)', path, source_ts, + q_policy_index, container_policy_index, put_timestamp) + headers = source_obj_info.copy() + headers['X-Backend-Storage-Policy-Index'] = container_policy_index + headers['X-Timestamp'] = put_timestamp + + try: + self.swift.upload_object( + FileLikeIter(source_obj_iter), account, container, obj, + headers=headers) + except UnexpectedResponse as err: + self.stats_log('copy_failed', 'upload %r (%f) from ' + 'policy_index %s to policy_index %s ' + 'returned %s', path, source_ts, q_policy_index, + container_policy_index, err, level=logging.WARNING) + return False + except: # noqa + self.stats_log('unhandled_error', 'unable to upload %r (%f) ' + 'from policy_index %s to policy_index %s ', path, + source_ts, q_policy_index, container_policy_index, + level=logging.ERROR, exc_info=True) + return False + + self.stats_log('copy_success', '%r (%f) moved from policy_index %s ' + 'to policy_index %s (%s)', path, source_ts, + q_policy_index, container_policy_index, put_timestamp) + + return self.throw_tombstones(account, container, obj, q_ts, + q_policy_index, path) + + def ensure_tombstone_in_right_location(self, q_policy_index, account, + container, obj, q_ts, path, + container_policy_index, source_ts, + **kwargs): + """ + Issue a DELETE request against the destination to match the + misplaced DELETE against the source. + """ + delete_timestamp = slightly_later_timestamp(q_ts, offset=2) + self.stats_log('delete_attempt', '%r (%f) in policy_index %s ' + 'will be deleted from policy_index %s (%s)', path, + source_ts, q_policy_index, container_policy_index, + delete_timestamp) + headers = { + 'X-Backend-Storage-Policy-Index': container_policy_index, + 'X-Timestamp': delete_timestamp, + } + try: + self.swift.delete_object(account, container, obj, + headers=headers) + except UnexpectedResponse as err: + self.stats_log('delete_failed', 'delete %r (%f) from ' + 'policy_index %s (%s) returned %s', path, + source_ts, container_policy_index, + delete_timestamp, err, level=logging.WARNING) + return False + except: # noqa + self.stats_log('unhandled_error', 'unable to delete %r (%f) ' + 'from policy_index %s (%s)', path, source_ts, + container_policy_index, delete_timestamp, + level=logging.ERROR, exc_info=True) + return False + + self.stats_log('delete_success', '%r (%f) deleted from ' + 'policy_index %s (%s)', path, source_ts, + container_policy_index, delete_timestamp, + level=logging.INFO) + + return self.throw_tombstones(account, container, obj, q_ts, + q_policy_index, path) + + def reconcile_object(self, info): + """ + Process a possibly misplaced object write request. Determine correct + destination storage policy by checking with primary containers. Check + source and destination, copying or deleting into destination and + cleaning up the source as needed. + + This method wraps _reconcile_object for exception handling. + + :param info: a queue entry dict + + :returns: True to indicate the request is fully processed + successfully, otherwise False. + """ + self.logger.debug('checking placement for %r (%f) ' + 'in policy_index %s', info['path'], + info['q_ts'], info['q_policy_index']) + success = False + try: + success = self._reconcile_object(**info) + except: # noqa + self.logger.exception('Unhandled Exception trying to ' + 'reconcile %r (%s) in policy_index %s', + info['path'], info['q_ts'], + info['q_policy_index']) + if success: + metric = 'success' + msg = 'was handled successfully' + else: + metric = 'retry' + msg = 'must be retried' + msg = '%(path)r (%(q_ts)f) in policy_index %(q_policy_index)s ' + msg + self.stats_log(metric, msg, info, level=logging.INFO) + self.log_stats() + return success + + def _iter_containers(self): + """ + Generate a list of containers to process. + """ + # hit most recent container first instead of waiting on the updaters + current_container = get_reconciler_container_name(time.time()) + yield current_container + container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT) + self.logger.debug('looking for containers in %s', + MISPLACED_OBJECTS_ACCOUNT) + while True: + one_page = None + try: + one_page = list(itertools.islice( + container_gen, constraints.CONTAINER_LISTING_LIMIT)) + except UnexpectedResponse as err: + self.logger.error('Error listing containers in ' + 'account %s (%s)', + MISPLACED_OBJECTS_ACCOUNT, err) + + if not one_page: + # don't generally expect more than one page + break + # reversed order since we expect older containers to be empty + for c in reversed(one_page): + # encoding here is defensive + container = c['name'].encode('utf8') + if container == current_container: + continue # we've already hit this one this pass + yield container + + def _iter_objects(self, container): + """ + Generate a list of objects to process. + + :param container: the name of the container to process + + If the given container is empty and older than reclaim_age this + processor will attempt to reap it. + """ + self.logger.debug('looking for objects in %s', container) + found_obj = False + try: + for raw_obj in self.swift.iter_objects( + MISPLACED_OBJECTS_ACCOUNT, container): + found_obj = True + yield raw_obj + except UnexpectedResponse as err: + self.logger.error('Error listing objects in container %s (%s)', + container, err) + if float(container) < time.time() - self.reclaim_age and \ + not found_obj: + # Try to delete old empty containers so the queue doesn't + # grow without bound. It's ok if there's a conflict. + self.swift.delete_container( + MISPLACED_OBJECTS_ACCOUNT, container, + acceptable_statuses=(2, 404, 409, 412)) + + def reconcile(self): + """ + Main entry point for processing misplaced objects. + + Iterate over all queue entries and delegate to reconcile_object. + """ + self.logger.debug('pulling items from the queue') + for container in self._iter_containers(): + for raw_obj in self._iter_objects(container): + try: + obj_info = parse_raw_obj(raw_obj) + except Exception: + self.stats_log('invalid_record', + 'invalid queue record: %r', raw_obj, + level=logging.ERROR, exc_info=True) + continue + finished = self.reconcile_object(obj_info) + if finished: + self.pop_queue(container, raw_obj['name'], + obj_info['q_ts'], + obj_info['q_record']) + self.log_stats() + self.logger.debug('finished container %s', container) + + def run_once(self, *args, **kwargs): + """ + Process every entry in the queue. + """ + try: + self.reconcile() + except: + self.logger.exception('Unhandled Exception trying to reconcile') + self.log_stats(force=True) + + def run_forever(self, *args, **kwargs): + while True: + self.run_once(*args, **kwargs) + self.stats = defaultdict(int) + self.logger.info('sleeping between intervals (%ss)', self.interval) + time.sleep(self.interval) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index d44ee7346c..ba155b8a41 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -626,7 +626,10 @@ def fake_http_connect(*code_iter, **kwargs): self._next_sleep = None def getresponse(self): - if kwargs.get('raise_exc'): + exc = kwargs.get('raise_exc') + if exc: + if isinstance(exc, Exception): + raise exc raise Exception('test') if kwargs.get('raise_timeout_exc'): raise Timeout() diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index a9f1d993c6..68a4bfee3d 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -20,6 +20,8 @@ from hashlib import md5 from swift.common import swob from swift.common.utils import split_path +from test.unit import FakeLogger, FakeRing + class FakeSwift(object): """ @@ -33,6 +35,21 @@ class FakeSwift(object): self.uploaded = {} # mapping of (method, path) --> (response class, headers, body) self._responses = {} + self.logger = FakeLogger('fake-swift') + self.account_ring = FakeRing() + self.container_ring = FakeRing() + self.get_object_ring = lambda policy_index: FakeRing() + + def _get_response(self, method, path): + resp = self._responses[(method, path)] + if isinstance(resp, list): + try: + resp = resp.pop(0) + except IndexError: + raise IndexError("Didn't find any more %r " + "in allowed responses" % ( + (method, path),)) + return resp def __call__(self, env, start_response): method = env['REQUEST_METHOD'] @@ -47,29 +64,30 @@ class FakeSwift(object): if resp: return resp(env, start_response) - headers = swob.Request(env).headers - self._calls.append((method, path, headers)) + req_headers = swob.Request(env).headers self.swift_sources.append(env.get('swift.source')) try: - resp_class, raw_headers, body = self._responses[(method, path)] + resp_class, raw_headers, body = self._get_response(method, path) headers = swob.HeaderKeyDict(raw_headers) except KeyError: if (env.get('QUERY_STRING') and (method, env['PATH_INFO']) in self._responses): - resp_class, raw_headers, body = self._responses[ - (method, env['PATH_INFO'])] + resp_class, raw_headers, body = self._get_response( + method, env['PATH_INFO']) headers = swob.HeaderKeyDict(raw_headers) elif method == 'HEAD' and ('GET', path) in self._responses: - resp_class, raw_headers, _ = self._responses[('GET', path)] + resp_class, raw_headers, body = self._get_response('GET', path) body = None headers = swob.HeaderKeyDict(raw_headers) elif method == 'GET' and obj and path in self.uploaded: resp_class = swob.HTTPOk headers, body = self.uploaded[path] else: - print "Didn't find %r in allowed responses" % ((method, path),) - raise + raise KeyError("Didn't find %r in allowed responses" % ( + (method, path),)) + + self._calls.append((method, path, req_headers)) # simulate object PUT if method == 'PUT' and obj: @@ -93,6 +111,10 @@ class FakeSwift(object): def calls(self): return [(method, path) for method, path, headers in self._calls] + @property + def headers(self): + return [headers for method, path, headers in self._calls] + @property def calls_with_headers(self): return self._calls @@ -103,3 +125,6 @@ class FakeSwift(object): def register(self, method, path, response_class, headers, body=''): self._responses[(method, path)] = (response_class, headers, body) + + def register_responses(self, method, path, responses): + self._responses[(method, path)] = list(responses) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 79a20c5fdd..712d5653e8 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -264,6 +264,16 @@ class TestUtils(unittest.TestCase): self.assertRaises(ValueError, utils.normalize_timestamp, '') self.assertRaises(ValueError, utils.normalize_timestamp, 'abc') + def test_last_modified_date_to_timestamp(self): + expectations = { + '1970-01-01T00:00:00.000000': 0.0, + '2014-02-28T23:22:36.698390': 1393629756.698390, + '2011-03-19T04:03:00.604554': 1300507380.604554, + } + for last_modified, ts in expectations.items(): + real = utils.last_modified_date_to_timestamp(last_modified) + self.assertEqual(real, ts, "failed for %s" % last_modified) + def test_backwards(self): # Test swift.common.utils.backward diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py new file mode 100644 index 0000000000..f49cdfdb37 --- /dev/null +++ b/test/unit/container/test_reconciler.py @@ -0,0 +1,1697 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import contextlib +import mock +import operator +import time +import unittest +import urllib +import socket +import os +import errno +import itertools +import random + +from collections import defaultdict +from datetime import datetime +from swift.container import reconciler +from swift.container.server import gen_resp_headers +from swift.common.direct_client import ClientException +from swift.common import swob +from swift.common.utils import split_path, normalize_timestamp + +from test.unit import debug_logger, FakeRing, fake_http_connect +from test.unit.common.middleware.helpers import FakeSwift + + +def timestamp_to_last_modified(timestamp): + return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%dT%H:%M:%S.%f') + + +def container_resp_headers(**kwargs): + return swob.HeaderKeyDict(gen_resp_headers(kwargs)) + + +class FakeStoragePolicySwift(object): + + def __init__(self): + self.storage_policy = defaultdict(FakeSwift) + self._mock_oldest_spi_map = {} + + def __getattribute__(self, name): + try: + return object.__getattribute__(self, name) + except AttributeError: + return getattr(self.storage_policy[None], name) + + def __call__(self, env, start_response): + method = env['REQUEST_METHOD'] + path = env['PATH_INFO'] + _, acc, cont, obj = split_path(env['PATH_INFO'], 0, 4, + rest_with_last=True) + if not obj: + policy_index = None + else: + policy_index = self._mock_oldest_spi_map.get(cont, 0) + # allow backend policy override + if 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX' in env: + policy_index = int(env['HTTP_X_BACKEND_STORAGE_POLICY_INDEX']) + + try: + return self.storage_policy[policy_index].__call__( + env, start_response) + except KeyError: + pass + + if method == 'PUT': + resp_class = swob.HTTPCreated + else: + resp_class = swob.HTTPNotFound + self.storage_policy[policy_index].register( + method, path, resp_class, {}, '') + + return self.storage_policy[policy_index].__call__( + env, start_response) + + +class FakeInternalClient(reconciler.InternalClient): + def __init__(self, listings): + self.app = FakeStoragePolicySwift() + self.user_agent = 'fake-internal-client' + self.request_tries = 1 + self.parse(listings) + + def parse(self, listings): + self.accounts = defaultdict(lambda: defaultdict(list)) + for item, timestamp in listings.items(): + # XXX this interface is stupid + if isinstance(timestamp, tuple): + timestamp, content_type = timestamp + else: + timestamp, content_type = timestamp, 'application/x-put' + storage_policy_index, path = item + account, container_name, obj_name = split_path( + path.encode('utf-8'), 0, 3, rest_with_last=True) + self.accounts[account][container_name].append( + (obj_name, storage_policy_index, timestamp, content_type)) + for account_name, containers in self.accounts.items(): + for con in containers: + self.accounts[account_name][con].sort(key=lambda t: t[0]) + for account, containers in self.accounts.items(): + account_listing_data = [] + account_path = '/v1/%s' % account + for container, objects in containers.items(): + container_path = account_path + '/' + container + container_listing_data = [] + for entry in objects: + (obj_name, storage_policy_index, + timestamp, content_type) = entry + if storage_policy_index is None and not obj_name: + # empty container + continue + obj_path = container_path + '/' + obj_name + headers = {'X-Timestamp': normalize_timestamp(timestamp)} + # register object response + self.app.storage_policy[storage_policy_index].register( + 'GET', obj_path, swob.HTTPOk, headers) + self.app.storage_policy[storage_policy_index].register( + 'DELETE', obj_path, swob.HTTPNoContent, {}) + # container listing entry + last_modified = timestamp_to_last_modified( + float(timestamp)) + obj_data = { + 'bytes': 0, + # listing data is unicode + 'name': obj_name.decode('utf-8'), + 'last_modified': last_modified, + 'hash': timestamp, + 'content_type': content_type, + } + container_listing_data.append(obj_data) + container_listing_data.sort(key=operator.itemgetter('name')) + # register container listing response + container_headers = {} + container_qry_string = '?format=json&marker=&end_marker=' + self.app.register('GET', container_path + container_qry_string, + swob.HTTPOk, container_headers, + json.dumps(container_listing_data)) + if container_listing_data: + obj_name = container_listing_data[-1]['name'] + # client should quote and encode marker + end_qry_string = '?format=json&marker=%s&end_marker=' % ( + urllib.quote(obj_name.encode('utf-8'))) + self.app.register('GET', container_path + end_qry_string, + swob.HTTPOk, container_headers, + json.dumps([])) + self.app.register('DELETE', container_path, + swob.HTTPConflict, {}, '') + # simple account listing entry + container_data = {'name': container} + account_listing_data.append(container_data) + # register account response + account_listing_data.sort(key=operator.itemgetter('name')) + account_headers = {} + account_qry_string = '?format=json&marker=&end_marker=' + self.app.register('GET', account_path + account_qry_string, + swob.HTTPOk, account_headers, + json.dumps(account_listing_data)) + end_qry_string = '?format=json&marker=%s&end_marker=' % ( + urllib.quote(account_listing_data[-1]['name'])) + self.app.register('GET', account_path + end_qry_string, + swob.HTTPOk, account_headers, + json.dumps([])) + + +class TestReconcilerUtils(unittest.TestCase): + + def setUp(self): + self.fake_ring = FakeRing() + + def test_parse_raw_obj(self): + got = reconciler.parse_raw_obj({ + 'name': "2:/AUTH_bob/con/obj", + 'hash': normalize_timestamp(2017551.49350), + 'last_modified': timestamp_to_last_modified(2017551.49352), + 'content_type': 'application/x-delete', + }) + self.assertEqual(got['q_policy_index'], 2) + self.assertEqual(got['account'], 'AUTH_bob') + self.assertEqual(got['container'], 'con') + self.assertEqual(got['obj'], 'obj') + self.assertEqual(got['q_ts'], 2017551.49350) + self.assertEqual(got['q_record'], 2017551.49352) + self.assertEqual(got['q_op'], 'DELETE') + + got = reconciler.parse_raw_obj({ + 'name': "1:/AUTH_bob/con/obj", + 'hash': normalize_timestamp(1234.20190), + 'last_modified': timestamp_to_last_modified(1234.20192), + 'content_type': 'application/x-put', + }) + self.assertEqual(got['q_policy_index'], 1) + self.assertEqual(got['account'], 'AUTH_bob') + self.assertEqual(got['container'], 'con') + self.assertEqual(got['obj'], 'obj') + self.assertEqual(got['q_ts'], 0000001234.20190) + self.assertEqual(got['q_record'], 0000001234.20192) + self.assertEqual(got['q_op'], 'PUT') + + # negative test + obj_info = { + 'name': "1:/AUTH_bob/con/obj", + 'hash': normalize_timestamp(1234.20190), + 'last_modified': timestamp_to_last_modified(1234.20192), + } + self.assertRaises(ValueError, reconciler.parse_raw_obj, obj_info) + obj_info['content_type'] = 'foo' + self.assertRaises(ValueError, reconciler.parse_raw_obj, obj_info) + obj_info['content_type'] = 'appliation/x-post' + self.assertRaises(ValueError, reconciler.parse_raw_obj, obj_info) + self.assertRaises(ValueError, reconciler.parse_raw_obj, + {'name': 'bogus'}) + self.assertRaises(ValueError, reconciler.parse_raw_obj, + {'name': '-1:/AUTH_test/container'}) + self.assertRaises(ValueError, reconciler.parse_raw_obj, + {'name': 'asdf:/AUTH_test/c/obj'}) + self.assertRaises(KeyError, reconciler.parse_raw_obj, + {'name': '0:/AUTH_test/c/obj', + 'content_type': 'application/x-put'}) + + def test_get_container_policy_index(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=0, + ), + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=1, + ), + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=0, + ), + ] + for permutation in itertools.permutations((0, 1, 2)): + resp_headers = [stub_resp_headers[i] for i in permutation] + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + test_values = [(info['x-storage-policy-index'], + info['x-backend-status-changed-at']) for + info in resp_headers] + self.assertEqual(oldest_spi, 0, + "oldest policy index wrong " + "for permutation %r" % test_values) + + def test_get_container_policy_index_with_error(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + container_resp_headers( + status_change_at=ts.next(), + storage_policy_index=2, + ), + container_resp_headers( + status_changed_at=ts.next(), + storage_policy_index=1, + ), + # old timestamp, but 500 should be ignored... + ClientException( + 'Container Server blew up', + http_status=500, http_reason='Server Error', + http_headers=container_resp_headers( + status_changed_at=normalize_timestamp(0), + storage_policy_index=0, + ), + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 2) + + def test_get_container_policy_index_with_socket_error(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=1, + ), + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=0, + ), + socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 1) + + def test_get_container_policy_index_with_too_many_errors(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=0, + ), + socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)), + ClientException( + 'Container Server blew up', + http_status=500, http_reason='Server Error', + http_headers=container_resp_headers( + status_changed_at=normalize_timestamp(ts.next()), + storage_policy_index=1, + ), + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, None) + + def test_get_container_policy_index_for_deleted(self): + mock_path = 'swift.container.reconciler.direct_head_container' + headers = container_resp_headers( + status_changed_at=normalize_timestamp(time.time()), + storage_policy_index=1, + ) + stub_resp_headers = [ + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=headers, + ), + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=headers, + ), + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=headers, + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 1) + + def test_get_container_policy_index_for_recently_deleted(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=container_resp_headers( + put_timestamp=ts.next(), + delete_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=0, + ), + ), + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=container_resp_headers( + put_timestamp=ts.next(), + delete_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=1, + ), + ), + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=container_resp_headers( + put_timestamp=ts.next(), + delete_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=2, + ), + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 2) + + def test_get_container_policy_index_for_recently_recreated(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + # old put, no recreate + container_resp_headers( + delete_timestamp=0, + put_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=0, + ), + # recently deleted + ClientException( + 'Container Not Found', + http_status=404, http_reason='Not Found', + http_headers=container_resp_headers( + put_timestamp=ts.next(), + delete_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=1, + ), + ), + # recently recreated + container_resp_headers( + delete_timestamp=ts.next(), + put_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=2, + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 2) + + def test_get_container_policy_index_for_recently_split_brain(self): + ts = itertools.count(int(time.time())) + mock_path = 'swift.container.reconciler.direct_head_container' + stub_resp_headers = [ + # oldest put + container_resp_headers( + delete_timestamp=0, + put_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=0, + ), + # old recreate + container_resp_headers( + delete_timestamp=ts.next(), + put_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=1, + ), + # recently put + container_resp_headers( + delete_timestamp=0, + put_timestamp=ts.next(), + status_changed_at=ts.next(), + storage_policy_index=2, + ), + ] + random.shuffle(stub_resp_headers) + with mock.patch(mock_path) as direct_head: + direct_head.side_effect = stub_resp_headers + oldest_spi = reconciler.direct_get_container_policy_index( + self.fake_ring, 'a', 'con') + self.assertEqual(oldest_spi, 1) + + def test_direct_delete_container_entry(self): + mock_path = 'swift.common.direct_client.http_connect' + connect_args = [] + + def test_connect(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + connect_args.append({ + 'ipaddr': ipaddr, 'port': port, 'device': device, + 'partition': partition, 'method': method, 'path': path, + 'headers': headers, 'query_string': query_string}) + + x_timestamp = normalize_timestamp(time.time()) + headers = {'x-timestamp': x_timestamp} + fake_hc = fake_http_connect(200, 200, 200, give_connect=test_connect) + with mock.patch(mock_path, fake_hc): + reconciler.direct_delete_container_entry( + self.fake_ring, 'a', 'c', 'o', headers=headers) + + self.assertEqual(len(connect_args), 3) + for args in connect_args: + self.assertEqual(args['method'], 'DELETE') + self.assertEqual(args['path'], '/a/c/o') + self.assertEqual(args['headers'].get('x-timestamp'), + headers['x-timestamp']) + + def test_direct_delete_container_entry_with_errors(self): + # setup mock direct_delete + mock_path = \ + 'swift.container.reconciler.direct_delete_container_object' + stub_resp = [ + None, + socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)), + ClientException( + 'Container Server blew up', + '10.0.0.12', 6001, 'sdj', 404, 'Not Found' + ), + ] + mock_direct_delete = mock.MagicMock() + mock_direct_delete.side_effect = stub_resp + + with contextlib.nested( + mock.patch(mock_path, mock_direct_delete), + mock.patch('eventlet.greenpool.DEBUG', False), + ): + rv = reconciler.direct_delete_container_entry( + self.fake_ring, 'a', 'c', 'o') + self.assertEqual(rv, None) + self.assertEqual(len(mock_direct_delete.mock_calls), 3) + + def test_add_to_reconciler_queue(self): + mock_path = 'swift.common.direct_client.http_connect' + connect_args = [] + + def test_connect(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + connect_args.append({ + 'ipaddr': ipaddr, 'port': port, 'device': device, + 'partition': partition, 'method': method, 'path': path, + 'headers': headers, 'query_string': query_string}) + + fake_hc = fake_http_connect(200, 200, 200, give_connect=test_connect) + with mock.patch(mock_path, fake_hc): + ret = reconciler.add_to_reconciler_queue( + self.fake_ring, 'a', 'c', 'o', 17, 5948918.63946, 'DELETE') + + self.assertTrue(ret) + self.assertEqual(ret, str(int(5948918.63946 // 3600 * 3600))) + self.assertEqual(len(connect_args), 3) + connect_args.sort(key=lambda a: (a['ipaddr'], a['port'])) + + required_headers = ('x-content-type', 'x-etag') + + for args in connect_args: + self.assertEqual(args['headers']['X-Timestamp'], '5948918.63946') + self.assertEqual(args['path'], + '/.misplaced_objects/5947200/17:/a/c/o') + self.assertEqual(args['headers']['X-Content-Type'], + 'application/x-delete') + for header in required_headers: + self.assert_(header in args['headers'], + '%r was missing request headers %r' % ( + header, args['headers'])) + + def test_add_to_reconciler_queue_force(self): + mock_path = 'swift.common.direct_client.http_connect' + connect_args = [] + + def test_connect(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + connect_args.append({ + 'ipaddr': ipaddr, 'port': port, 'device': device, + 'partition': partition, 'method': method, 'path': path, + 'headers': headers, 'query_string': query_string}) + + fake_hc = fake_http_connect(200, 200, 200, give_connect=test_connect) + now = float(normalize_timestamp(time.time())) + with contextlib.nested( + mock.patch(mock_path, fake_hc), + mock.patch('swift.container.reconciler.time.time', + lambda: now), + ): + ret = reconciler.add_to_reconciler_queue( + self.fake_ring, 'a', 'c', 'o', 17, 5948918.63946, 'PUT', + force=True) + + self.assertTrue(ret) + self.assertEqual(ret, str(int(5948918.63946 // 3600 * 3600))) + self.assertEqual(len(connect_args), 3) + connect_args.sort(key=lambda a: (a['ipaddr'], a['port'])) + + required_headers = ('x-size', 'x-content-type') + + for args in connect_args: + self.assertEqual(args['headers']['X-Timestamp'], + normalize_timestamp(now)) + self.assertEqual(args['headers']['X-Etag'], '5948918.63946') + self.assertEqual(args['path'], + '/.misplaced_objects/5947200/17:/a/c/o') + for header in required_headers: + self.assert_(header in args['headers'], + '%r was missing request headers %r' % ( + header, args['headers'])) + + def test_add_to_reconciler_queue_fails(self): + mock_path = 'swift.common.direct_client.http_connect' + + fake_connects = [fake_http_connect(200), + fake_http_connect(200, raise_timeout_exc=True), + fake_http_connect(507)] + + def fake_hc(*a, **kw): + return fake_connects.pop()(*a, **kw) + + with mock.patch(mock_path, fake_hc): + ret = reconciler.add_to_reconciler_queue( + self.fake_ring, 'a', 'c', 'o', 17, 5948918.63946, 'PUT') + self.assertFalse(ret) + + def test_add_to_reconciler_queue_socket_error(self): + mock_path = 'swift.common.direct_client.http_connect' + + exc = socket.error(errno.ECONNREFUSED, + os.strerror(errno.ECONNREFUSED)) + fake_connects = [fake_http_connect(200), + fake_http_connect(200, raise_timeout_exc=True), + fake_http_connect(500, raise_exc=exc)] + + def fake_hc(*a, **kw): + return fake_connects.pop()(*a, **kw) + + with mock.patch(mock_path, fake_hc): + ret = reconciler.add_to_reconciler_queue( + self.fake_ring, 'a', 'c', 'o', 17, 5948918.63946, 'DELETE') + self.assertFalse(ret) + + +def listing_qs(marker): + return "?format=json&marker=%s&end_marker=" % \ + urllib.quote(marker.encode('utf-8')) + + +class TestReconciler(unittest.TestCase): + + maxDiff = None + + def setUp(self): + self.logger = debug_logger() + conf = {} + with mock.patch('swift.container.reconciler.InternalClient'): + self.reconciler = reconciler.ContainerReconciler(conf) + self.reconciler.logger = self.logger + self.start_interval = int(time.time() // 3600 * 3600) + self.current_container_path = '/v1/.misplaced_objects/%d' % ( + self.start_interval) + listing_qs('') + + def _mock_listing(self, objects): + self.reconciler.swift = FakeInternalClient(objects) + self.fake_swift = self.reconciler.swift.app + + def _mock_oldest_spi(self, container_oldest_spi_map): + self.fake_swift._mock_oldest_spi_map = container_oldest_spi_map + + def _run_once(self): + """ + Helper method to run the reconciler once with appropriate direct-client + mocks in place. + + Returns the list of direct-deleted container entries in the format + [(acc1, con1, obj1), ...] + """ + + def mock_oldest_spi(ring, account, container_name): + return self.fake_swift._mock_oldest_spi_map.get(container_name, 0) + + items = { + 'direct_get_container_policy_index': mock_oldest_spi, + 'direct_delete_container_entry': mock.DEFAULT, + } + + mock_time_iter = itertools.count(self.start_interval) + with mock.patch.multiple(reconciler, **items) as mocks: + self.mock_delete_container_entry = \ + mocks['direct_delete_container_entry'] + with mock.patch('time.time', mock_time_iter.next): + self.reconciler.run_once() + + return [c[1][1:4] for c in + mocks['direct_delete_container_entry'].mock_calls] + + def test_invalid_queue_name(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/bogus"): 3618.84187, + }) + deleted_container_entries = self._run_once() + # we try to find something useful + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('bogus'))]) + # but only get the bogus record + self.assertEqual(self.reconciler.stats['invalid_record'], 1) + # and just leave it on the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertFalse(deleted_container_entries) + + def test_invalid_queue_name_marches_onward(self): + # there's something useful there on the queue + self._mock_listing({ + (None, "/.misplaced_objects/3600/00000bogus"): 3600.0000, + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o1"): 3618.84187, + }) + self._mock_oldest_spi({'c': 1}) # already in the right spot! + deleted_container_entries = self._run_once() + # we get all the queue entries we can + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # and one is garbage + self.assertEqual(self.reconciler.stats['invalid_record'], 1) + # but the other is workable + self.assertEqual(self.reconciler.stats['noop_object'], 1) + # so pop the queue for that one + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_queue_name_with_policy_index_delimiter_in_name(self): + q_path = '.misplaced_objects/3600' + obj_path = "AUTH_bob/c:sneaky/o1:sneaky" + # there's something useful there on the queue + self._mock_listing({ + (None, "/%s/1:/%s" % (q_path, obj_path)): 3618.84187, + (1, '/%s' % obj_path): 3618.84187, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + # we find the misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/%s' % obj_path))]) + # move it + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/%s' % obj_path), + ('DELETE', '/v1/%s' % obj_path)]) + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/%s' % obj_path), + ('PUT', '/v1/%s' % obj_path)]) + # clean up the source + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # we DELETE the object from the wrong place with source_ts + offset 1 + # timestamp to make sure the change takes effect + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(3618.84188)) + # and pop the queue for that one + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, [( + '.misplaced_objects', '3600', '1:/%s' % obj_path)]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_unable_to_direct_get_oldest_storage_policy(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + }) + # the reconciler gets "None" if we can't quorum the container + self._mock_oldest_spi({'c': None}) + deleted_container_entries = self._run_once() + # we look for misplaced objects + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # but can't really say where to go looking + self.assertEqual(self.reconciler.stats['unavailable_container'], 1) + # we don't clean up anything + self.assertEqual(self.reconciler.stats['cleanup_object'], 0) + # and we definately should not pop_queue + self.assertFalse(deleted_container_entries) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_object_move(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o1"): 3618.84187, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # moves it + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1'), + ('DELETE', '/v1/AUTH_bob/c/o1')]) + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), + ('PUT', '/v1/AUTH_bob/c/o1')]) + put_headers = self.fake_swift.storage_policy[0].headers[1] + # we PUT the object in the right place with q_ts + offset 2 + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp('3618.84189')) + # cleans up the old + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # we DELETE the object from the wrong place with source_ts + offset 1 + # timestamp to make sure the change takes effect + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp('3618.84188')) + # and when we're done, we pop the entry from the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_the_other_direction(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/0:/AUTH_bob/c/o1"): 3618.84187, + (0, "/AUTH_bob/c/o1"): 3618.84187, + }) + self._mock_oldest_spi({'c': 1}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('0:/AUTH_bob/c/o1'))]) + # moves it + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('GET', '/v1/AUTH_bob/c/o1'), # 2 + ('DELETE', '/v1/AUTH_bob/c/o1')]) # 4 + delete_headers = self.fake_swift.storage_policy[0].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + put_headers = self.fake_swift.storage_policy[1].headers[1] + # we PUT the object in the right place with q_ts + offset 2 + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp('3618.84189')) + # cleans up the old + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # we DELETE the object from the wrong place with source_ts + offset 1 + # timestamp to make sure the change takes effect + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp('3618.84188')) + # and when we're done, we pop the entry from the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '0:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_with_unicode_and_spaces(self): + # the "name" in listings and the unicode string passed to all + # functions where we call them with (account, container, obj) + obj_name = u"AUTH_bob/c \u062a/o1 \u062a" + # anytime we talk about a call made to swift for a path + obj_path = obj_name.encode('utf-8') + # this mock expects unquoted unicode because it handles container + # listings as well as paths + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/%s" % obj_name): 3618.84187, + (1, "/%s" % obj_name): 3618.84187, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + # listing_qs encodes and quotes - so give it name + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/%s' % obj_name))]) + # moves it + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + # these calls are to the real path + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/%s' % obj_path), # 2 + ('DELETE', '/v1/%s' % obj_path)]) # 4 + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/%s' % obj_path), # 1 + ('PUT', '/v1/%s' % obj_path)]) # 3 + put_headers = self.fake_swift.storage_policy[0].headers[1] + # we PUT the object in the right place with q_ts + offset 2 + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp('3618.84189')) + # cleans up the old + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # we DELETE the object from the wrong place with source_ts + offset 1 + # timestamp to make sure the change takes effect + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp('3618.84188')) + self.assertEqual( + delete_headers.get('X-Backend-Storage-Policy-Index'), '1') + # and when we're done, we pop the entry from the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + # this mock recieved the name, it's encoded down in buffered_http + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/%s' % obj_name)]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_delete(self): + q_ts = float(normalize_timestamp(time.time())) + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): ( + normalize_timestamp(q_ts), 'application/x-delete'), + # object exists in "correct" storage policy - slightly older + (0, "/AUTH_bob/c/o1"): normalize_timestamp(q_ts - 1), + }) + self._mock_oldest_spi({'c': 0}) + # the tombstone exists in the enqueued storage policy + self.fake_swift.storage_policy[1].register( + 'GET', '/v1/AUTH_bob/c/o1', swob.HTTPNotFound, + {'X-Backend-Timestamp': normalize_timestamp(q_ts)}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # delete it + self.assertEqual(self.reconciler.stats['delete_attempt'], 1) + self.assertEqual(self.reconciler.stats['delete_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1'), + ('DELETE', '/v1/AUTH_bob/c/o1')]) + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), + ('DELETE', '/v1/AUTH_bob/c/o1')]) + reconcile_headers = self.fake_swift.storage_policy[0].headers[1] + # we DELETE the object in the right place with q_ts + offset 2 + self.assertEqual(reconcile_headers.get('X-Timestamp'), + normalize_timestamp(q_ts + 0.00002)) + # cleans up the old + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # we DELETE the object from the wrong place with source_ts + offset 1 + # timestamp to make sure the change takes effect + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(q_ts + 0.00001)) + # and when we're done, we pop the entry from the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_enqueued_for_the_correct_dest_noop(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o1"): 3618.84187, + }) + self._mock_oldest_spi({'c': 1}) # already in the right spot! + deleted_container_entries = self._run_once() + + # nothing to see here + self.assertEqual(self.reconciler.stats['noop_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # so we just pop the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_src_object_newer_than_queue_entry(self): + # setup the cluster + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3600.123456, + (1, '/AUTH_bob/c/o1'): 3600.234567, # slightly newer + }) + self._mock_oldest_spi({'c': 0}) # destination + # turn the crank + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # proceed with the move + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1'), # 2 + ('DELETE', '/v1/AUTH_bob/c/o1')]) # 4 + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + # .. with source timestamp + offset 2 + put_headers = self.fake_swift.storage_policy[0].headers[1] + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp(3600.234587)) + # src object is cleaned up + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # ... with q_ts + offset 1 + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(3600.123466)) + # and queue is popped + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_src_object_older_than_queue_entry(self): + # should be some sort of retry case + q_ts = float(normalize_timestamp(time.time())) + container = str(int(q_ts // 3600 * 3600)) + q_path = '.misplaced_objects/%s' % container + self._mock_listing({ + (None, "/%s/1:/AUTH_bob/c/o1" % q_path): q_ts, + (1, '/AUTH_bob/c/o1'): q_ts - 0.00001, # slightly older + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', '/v1/%s' % q_path + listing_qs('')), + ('GET', '/v1/%s' % q_path + + listing_qs('1:/AUTH_bob/c/o1')), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # but no object copy is attempted + self.assertEqual(self.reconciler.stats['unavailable_source'], 1) + self.assertEqual(self.reconciler.stats['copy_attempt'], 0) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) + # src object is un-modified + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + # queue is un-changed, we'll have to retry + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_src_object_unavailable_with_slightly_newer_tombstone(self): + # should be some sort of retry case + q_ts = float(normalize_timestamp(time.time())) + container = str(int(q_ts // 3600 * 3600)) + q_path = '.misplaced_objects/%s' % container + self._mock_listing({ + (None, "/%s/1:/AUTH_bob/c/o1" % q_path): q_ts, + }) + self._mock_oldest_spi({'c': 0}) + self.fake_swift.storage_policy[1].register( + 'GET', '/v1/AUTH_bob/c/o1', swob.HTTPNotFound, + {'X-Backend-Timestamp': normalize_timestamp(q_ts + 0.00002)}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', '/v1/%s' % q_path + listing_qs('')), + ('GET', '/v1/%s' % q_path + + listing_qs('1:/AUTH_bob/c/o1')), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # but no object copy is attempted + self.assertEqual(self.reconciler.stats['unavailable_source'], 1) + self.assertEqual(self.reconciler.stats['copy_attempt'], 0) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) + # src object is un-modified + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + # queue is un-changed, we'll have to retry + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_src_object_unavailable_server_error(self): + # should be some sort of retry case + q_ts = float(normalize_timestamp(time.time())) + container = str(int(q_ts // 3600 * 3600)) + q_path = '.misplaced_objects/%s' % container + self._mock_listing({ + (None, "/%s/1:/AUTH_bob/c/o1" % q_path): q_ts, + }) + self._mock_oldest_spi({'c': 0}) + self.fake_swift.storage_policy[1].register( + 'GET', '/v1/AUTH_bob/c/o1', swob.HTTPServiceUnavailable, {}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', '/v1/%s' % q_path + listing_qs('')), + ('GET', '/v1/%s' % q_path + + listing_qs('1:/AUTH_bob/c/o1')), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # but no object copy is attempted + self.assertEqual(self.reconciler.stats['unavailable_source'], 1) + self.assertEqual(self.reconciler.stats['copy_attempt'], 0) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) + # src object is un-modified + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + # queue is un-changed, we'll have to retry + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_object_move_fails_cleanup(self): + # setup the cluster + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3600.123456, + (1, '/AUTH_bob/c/o1'): 3600.234567, # slightly newer + }) + self._mock_oldest_spi({'c': 0}) # destination + + # make the DELETE blow up + self.fake_swift.storage_policy[1].register( + 'DELETE', '/v1/AUTH_bob/c/o1', swob.HTTPServiceUnavailable, {}) + # turn the crank + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # proceed with the move + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1'), # 2 + ('DELETE', '/v1/AUTH_bob/c/o1')]) # 4 + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + # .. with source timestamp + offset 2 + put_headers = self.fake_swift.storage_policy[0].headers[1] + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp(3600.234587)) + # we try to cleanup + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + # ... with q_ts + offset 1 + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(3600.123466)) + # but cleanup fails! + self.assertEqual(self.reconciler.stats['cleanup_failed'], 1) + # so the queue is not popped + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + # and we'll have to retry + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_object_move_src_object_is_forever_gone(self): + # oh boy, hate to be here - this is an oldy + q_ts = self.start_interval - self.reconciler.reclaim_age - 1 + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): q_ts, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # but it's gone :\ + self.assertEqual(self.reconciler.stats['lost_source'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) + # gah, look, even if it was out there somewhere - we've been at this + # two weeks and haven't found it. We can't just keep looking forever, + # so... we're done + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + # dunno if this is helpful, but FWIW we don't throw tombstones? + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + self.assertEqual(self.reconciler.stats['success'], 1) # lol + + def test_object_move_dest_already_moved(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3679.2019, + (1, "/AUTH_bob/c/o1"): 3679.2019, + (0, "/AUTH_bob/c/o1"): 3679.2019, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # we look for misplaced objects + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # but we found it already in the right place! + self.assertEqual(self.reconciler.stats['found_object'], 1) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # so no attempt to read the source is made, but we do cleanup + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('DELETE', '/v1/AUTH_bob/c/o1')]) + delete_headers = self.fake_swift.storage_policy[1].headers[0] + # rather we just clean up the dark matter + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(3679.20191)) + # and wipe our hands of it + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_dest_object_newer_than_queue_entry(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3679.2019, + (1, "/AUTH_bob/c/o1"): 3679.2019, + (0, "/AUTH_bob/c/o1"): 3679.2019 + 0.00001, # slightly newer + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # we look for misplaced objects... + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # but we found it already in the right place! + self.assertEqual(self.reconciler.stats['found_object'], 1) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + # so not attempt to read is made, but we do cleanup + self.assertEqual(self.reconciler.stats['copy_attempt'], 0) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('DELETE', '/v1/AUTH_bob/c/o1')]) + delete_headers = self.fake_swift.storage_policy[1].headers[0] + # rather we just clean up the dark matter + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp('3679.20191')) + # and since we cleaned up the old object, so this counts as done + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_dest_object_older_than_queue_entry(self): + self._mock_listing({ + (None, "/.misplaced_objects/36000/1:/AUTH_bob/c/o1"): 36123.38393, + (1, "/AUTH_bob/c/o1"): 36123.38393, + (0, "/AUTH_bob/c/o1"): 36123.38393 - 0.00001, # slightly older + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # we found a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('36000')), + ('GET', '/v1/.misplaced_objects/36000' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/36000' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # and since our version is *newer*, we overwrite + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual(self.reconciler.stats['copy_success'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1'), # 2 + ('DELETE', '/v1/AUTH_bob/c/o1')]) # 4 + delete_headers = self.fake_swift.storage_policy[1].headers[1] + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + # ... with a q_ts + offset 2 + put_headers = self.fake_swift.storage_policy[0].headers[1] + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp(36123.38395)) + # then clean the dark matter + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1) + self.assertEqual(self.reconciler.stats['cleanup_success'], 1) + # ... with a q_ts + offset 1 + self.assertEqual(delete_headers.get('X-Timestamp'), + normalize_timestamp(36123.38394)) + + # and pop the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 1) + self.assertEqual(deleted_container_entries, + [('.misplaced_objects', '36000', '1:/AUTH_bob/c/o1')]) + self.assertEqual(self.reconciler.stats['success'], 1) + + def test_object_move_put_fails(self): + # setup the cluster + self._mock_listing({ + (None, "/.misplaced_objects/36000/1:/AUTH_bob/c/o1"): 36123.383925, + (1, "/AUTH_bob/c/o1"): 36123.383925, + }) + self._mock_oldest_spi({'c': 0}) + + # make the put to dest fail! + self.fake_swift.storage_policy[0].register( + 'PUT', '/v1/AUTH_bob/c/o1', swob.HTTPServiceUnavailable, {}) + + # turn the crank + deleted_container_entries = self._run_once() + + # we find a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('36000')), + ('GET', '/v1/.misplaced_objects/36000' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/36000' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # and try to move it, but it fails + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) # 2 + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + put_headers = self.fake_swift.storage_policy[0].headers[1] + # ...with q_ts + offset 2 (20-microseconds) + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp(36123.383945)) + # but it failed + self.assertEqual(self.reconciler.stats['copy_success'], 0) + self.assertEqual(self.reconciler.stats['copy_failed'], 1) + # ... so we don't clean up the source + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + # and we don't pop the queue + self.assertEqual(deleted_container_entries, []) + self.assertEqual(self.reconciler.stats['unhandled_errors'], 0) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_object_move_put_blows_up_crazy_town(self): + # setup the cluster + self._mock_listing({ + (None, "/.misplaced_objects/36000/1:/AUTH_bob/c/o1"): 36123.383925, + (1, "/AUTH_bob/c/o1"): 36123.383925, + }) + self._mock_oldest_spi({'c': 0}) + + # make the put to dest blow up crazy town + def blow_up(*args, **kwargs): + raise Exception('kaboom!') + + self.fake_swift.storage_policy[0].register( + 'PUT', '/v1/AUTH_bob/c/o1', blow_up, {}) + + # turn the crank + deleted_container_entries = self._run_once() + + # we find a misplaced object + self.assertEqual(self.reconciler.stats['misplaced_object'], 1) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('36000')), + ('GET', '/v1/.misplaced_objects/36000' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/36000' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # and attempt to move it + self.assertEqual(self.reconciler.stats['copy_attempt'], 1) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_bob/c/o1')]) # 2 + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1'), # 1 + ('PUT', '/v1/AUTH_bob/c/o1')]) # 3 + put_headers = self.fake_swift.storage_policy[0].headers[1] + # ...with q_ts + offset 2 (20-microseconds) + self.assertEqual(put_headers.get('X-Timestamp'), + normalize_timestamp(36123.383945)) + # but it blows up hard + self.assertEqual(self.reconciler.stats['unhandled_error'], 1) + # so we don't cleanup + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + # and we don't pop the queue + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + self.assertEqual(self.reconciler.stats['retry'], 1) + + def test_object_move_no_such_object_no_tombstone_recent(self): + q_ts = float(normalize_timestamp(time.time())) + container = str(int(q_ts // 3600 * 3600)) + q_path = '.misplaced_objects/%s' % container + + self._mock_listing({ + (None, "/%s/1:/AUTH_jeb/c/o1" % q_path): q_ts + }) + self._mock_oldest_spi({'c': 0}) + + deleted_container_entries = self._run_once() + + self.assertEqual( + self.fake_swift.calls, + [('GET', '/v1/.misplaced_objects/%s' % container + listing_qs('')), + ('GET', '/v1/.misplaced_objects/%s' % container + + listing_qs('1:/AUTH_jeb/c/o1')), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_jeb/c/o1')], + ) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_jeb/c/o1')], + ) + # the queue entry is recent enough that there could easily be + # tombstones on offline nodes or something, so we'll just leave it + # here and try again later + self.assertEqual(deleted_container_entries, []) + + def test_object_move_no_such_object_no_tombstone_ancient(self): + queue_ts = float(normalize_timestamp(time.time())) - \ + self.reconciler.reclaim_age * 1.1 + container = str(int(queue_ts // 3600 * 3600)) + + self._mock_listing({ + ( + None, "/.misplaced_objects/%s/1:/AUTH_jeb/c/o1" % container + ): queue_ts + }) + self._mock_oldest_spi({'c': 0}) + + deleted_container_entries = self._run_once() + + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container)), + ('GET', '/v1/.misplaced_objects/%s' % container + listing_qs('')), + ('GET', '/v1/.misplaced_objects/%s' % container + + listing_qs('1:/AUTH_jeb/c/o1'))]) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_jeb/c/o1')], + ) + self.assertEqual( + self.fake_swift.storage_policy[1].calls, + [('GET', '/v1/AUTH_jeb/c/o1')], + ) + + # the queue entry is old enough that the tombstones, if any, have + # probably been reaped, so we'll just give up + self.assertEqual( + deleted_container_entries, + [('.misplaced_objects', container, '1:/AUTH_jeb/c/o1')]) + + def test_delete_old_empty_queue_containers(self): + ts = float(normalize_timestamp(time.time())) - \ + self.reconciler.reclaim_age * 1.1 + container = str(int(ts // 3600 * 3600)) + older_ts = ts - 3600 + older_container = str(int(older_ts // 3600 * 3600)) + self._mock_listing({ + (None, "/.misplaced_objects/%s/" % container): 0, + (None, "/.misplaced_objects/%s/something" % older_container): 0, + }) + deleted_container_entries = self._run_once() + self.assertEqual(deleted_container_entries, []) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(container)), + ('GET', '/v1/.misplaced_objects/%s' % container + listing_qs('')), + ('DELETE', '/v1/.misplaced_objects/%s' % container), + ('GET', '/v1/.misplaced_objects/%s' % older_container + + listing_qs('')), + ('GET', '/v1/.misplaced_objects/%s' % older_container + + listing_qs('something'))]) + self.assertEqual(self.reconciler.stats['invalid_record'], 1) + + def test_iter_over_old_containers_in_reverse(self): + step = reconciler.MISPLACED_OBJECTS_CONTAINER_DIVISOR + now = self.start_interval + containers = [] + for i in range(10): + container_ts = int(now - step * i) + container_name = str(container_ts // 3600 * 3600) + containers.append(container_name) + # add some old containers too + now -= self.reconciler.reclaim_age + old_containers = [] + for i in range(10): + container_ts = int(now - step * i) + container_name = str(container_ts // 3600 * 3600) + old_containers.append(container_name) + containers.sort() + old_containers.sort() + all_containers = old_containers + containers + self._mock_listing(dict(( + (None, "/.misplaced_objects/%s/" % container), 0 + ) for container in all_containers)) + deleted_container_entries = self._run_once() + self.assertEqual(deleted_container_entries, []) + last_container = all_containers[-1] + account_listing_calls = [ + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs(last_container)), + ] + new_container_calls = [ + ('GET', '/v1/.misplaced_objects/%s' % container + + listing_qs('')) for container in reversed(containers) + ][1:] # current_container get's skipped the second time around... + old_container_listings = [ + ('GET', '/v1/.misplaced_objects/%s' % container + + listing_qs('')) for container in reversed(old_containers) + ] + old_container_deletes = [ + ('DELETE', '/v1/.misplaced_objects/%s' % container) + for container in reversed(old_containers) + ] + old_container_calls = list(itertools.chain(*zip( + old_container_listings, old_container_deletes))) + self.assertEqual(self.fake_swift.calls, + [('GET', self.current_container_path)] + + account_listing_calls + new_container_calls + + old_container_calls) + + def test_error_in_iter_containers(self): + self._mock_listing({}) + + # make the listing return an error + self.fake_swift.storage_policy[None].register( + 'GET', '/v1/.misplaced_objects' + listing_qs(''), + swob.HTTPServiceUnavailable, {}) + + self._run_once() + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs(''))]) + self.assertEqual(self.reconciler.stats, {}) + errors = self.reconciler.logger.get_lines_for_level('error') + self.assertEqual(errors, [ + 'Error listing containers in account ' + '.misplaced_objects (Unexpected response: ' + '503 Service Unavailable)']) + + def test_unhandled_exception_in_reconcile(self): + self._mock_listing({}) + + # make the listing blow up + def blow_up(*args, **kwargs): + raise Exception('kaboom!') + + self.fake_swift.storage_policy[None].register( + 'GET', '/v1/.misplaced_objects' + listing_qs(''), + blow_up, {}) + self._run_once() + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs(''))]) + self.assertEqual(self.reconciler.stats, {}) + errors = self.reconciler.logger.get_lines_for_level('error') + self.assertEqual(errors, + ['Unhandled Exception trying to reconcile: ']) + + +if __name__ == '__main__': + unittest.main()