diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 23fdb20ff7..9e1282c0ba 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -21,17 +21,17 @@ import shutil import uuid import errno import re +from contextlib import contextmanager from swift import gettext_ as _ from eventlet import GreenPool, sleep, Timeout from eventlet.green import subprocess -import simplejson import swift.common.db from swift.common.direct_client import quote from swift.common.utils import get_logger, whataremyips, storage_directory, \ renamer, mkdirs, lock_parent_directory, config_true_value, \ - unlink_older_than, dump_recon_cache, rsync_ip, ismount + unlink_older_than, dump_recon_cache, rsync_ip, ismount, json from swift.common import ring from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE from swift.common.bufferedhttp import BufferedHTTPConnection @@ -129,7 +129,7 @@ class ReplConnection(BufferedHTTPConnection): :returns: bufferedhttp response object """ try: - body = simplejson.dumps(args) + body = json.dumps(args) self.request('REPLICATE', self.path, body, {'Content-Type': 'application/json'}) response = self.getresponse() @@ -348,6 +348,14 @@ class Replicator(Daemon): os.path.basename(db_file).split('.', 1)[0], self.logger) + def _gather_sync_args(self, info): + """ + Convert local replication_info to sync args tuple. + """ + sync_args_order = ('max_row', 'hash', 'id', 'created_at', + 'put_timestamp', 'delete_timestamp', 'metadata') + return tuple(info[key] for key in sync_args_order) + def _repl_to_node(self, node, broker, partition, info): """ Replicate a database to a node. @@ -367,21 +375,22 @@ class Replicator(Daemon): self.logger.error( _('ERROR Unable to connect to remote server: %s'), node) return False + sync_args = self._gather_sync_args(info) with Timeout(self.node_timeout): - response = http.replicate( - 'sync', info['max_row'], info['hash'], info['id'], - info['created_at'], info['put_timestamp'], - info['delete_timestamp'], info['metadata']) + response = http.replicate('sync', *sync_args) if not response: return False - elif response.status == HTTP_NOT_FOUND: # completely missing, rsync + return self._handle_sync_response(node, response, info, broker, http) + + def _handle_sync_response(self, node, response, info, broker, http): + if response.status == HTTP_NOT_FOUND: # completely missing, rsync self.stats['rsync'] += 1 self.logger.increment('rsyncs') return self._rsync_db(broker, node, http, info['id']) elif response.status == HTTP_INSUFFICIENT_STORAGE: raise DriveNotMounted() elif response.status >= 200 and response.status < 300: - rinfo = simplejson.loads(response.data) + rinfo = json.loads(response.data) local_sync = broker.get_sync(rinfo['id'], incoming=False) if self._in_sync(rinfo, info, broker, local_sync): return True @@ -416,17 +425,16 @@ class Replicator(Daemon): broker.reclaim(now - self.reclaim_age, now - (self.reclaim_age * 2)) info = broker.get_replication_info() - full_info = broker.get_info() bpart = self.ring.get_part( - full_info['account'], full_info.get('container')) + info['account'], info.get('container')) if bpart != int(partition): partition = bpart # Important to set this false here since the later check only # checks if it's on the proper device, not partition. 
shouldbehere = False - name = '/' + quote(full_info['account']) - if 'container' in full_info: - name += '/' + quote(full_info['container']) + name = '/' + quote(info['account']) + if 'container' in info: + name += '/' + quote(info['container']) self.logger.error( 'Found %s for %s when it should be on partition %s; will ' 'replicate out and remove.' % (object_file, name, bpart)) @@ -454,7 +462,7 @@ class Replicator(Daemon): if delete_timestamp < (now - self.reclaim_age) and \ delete_timestamp > put_timestamp and \ info['count'] in (None, '', 0, '0'): - if self.report_up_to_date(full_info): + if self.report_up_to_date(info): self.delete_db(object_file) self.logger.timing_since('timing', start_time) return @@ -597,55 +605,90 @@ class ReplicatorRpc(object): return HTTPNotFound() return getattr(self, op)(self.broker_class(db_file), args) - def sync(self, broker, args): + @contextmanager + def debug_timing(self, name): + timemark = time.time() + yield + timespan = time.time() - timemark + if timespan > DEBUG_TIMINGS_THRESHOLD: + self.logger.debug( + 'replicator-rpc-sync time for %s: %.02fs' % ( + name, timespan)) + + def _parse_sync_args(self, args): + """ + Convert remote sync args to remote_info dictionary. + """ (remote_sync, hash_, id_, created_at, put_timestamp, - delete_timestamp, metadata) = args - timemark = time.time() - try: - info = broker.get_replication_info() - except (Exception, Timeout) as e: - if 'no such table' in str(e): - self.logger.error(_("Quarantining DB %s"), broker) - quarantine_db(broker.db_file, broker.db_type) - return HTTPNotFound() - raise - timespan = time.time() - timemark - if timespan > DEBUG_TIMINGS_THRESHOLD: - self.logger.debug('replicator-rpc-sync time for info: %.02fs' % - timespan) + delete_timestamp, metadata) = args[:7] + remote_metadata = {} if metadata: - timemark = time.time() - broker.update_metadata(simplejson.loads(metadata)) - timespan = time.time() - timemark - if timespan > DEBUG_TIMINGS_THRESHOLD: - self.logger.debug('replicator-rpc-sync time for ' - 'update_metadata: %.02fs' % timespan) - if info['put_timestamp'] != put_timestamp or \ - info['created_at'] != created_at or \ - info['delete_timestamp'] != delete_timestamp: - timemark = time.time() - broker.merge_timestamps( - created_at, put_timestamp, delete_timestamp) - timespan = time.time() - timemark - if timespan > DEBUG_TIMINGS_THRESHOLD: - self.logger.debug('replicator-rpc-sync time for ' - 'merge_timestamps: %.02fs' % timespan) - timemark = time.time() - info['point'] = broker.get_sync(id_) - timespan = time.time() - timemark - if timespan > DEBUG_TIMINGS_THRESHOLD: - self.logger.debug('replicator-rpc-sync time for get_sync: ' - '%.02fs' % timespan) - if hash_ == info['hash'] and info['point'] < remote_sync: - timemark = time.time() - broker.merge_syncs([{'remote_id': id_, - 'sync_point': remote_sync}]) - info['point'] = remote_sync - timespan = time.time() - timemark - if timespan > DEBUG_TIMINGS_THRESHOLD: - self.logger.debug('replicator-rpc-sync time for ' - 'merge_syncs: %.02fs' % timespan) - return Response(simplejson.dumps(info)) + try: + remote_metadata = json.loads(metadata) + except ValueError: + self.logger.error("Unable to decode remote metadata %r", + metadata) + remote_info = { + 'point': remote_sync, + 'hash': hash_, + 'id': id_, + 'created_at': created_at, + 'put_timestamp': put_timestamp, + 'delete_timestamp': delete_timestamp, + 'metadata': remote_metadata, + } + return remote_info + + def sync(self, broker, args): + remote_info = self._parse_sync_args(args) + return 
self._handle_sync_request(broker, remote_info) + + def _get_synced_replication_info(self, broker, remote_info): + """ + Apply any changes to the broker based on remote_info and return the + current replication info. + + :param broker: the database broker + :param remote_info: the remote replication info + + :returns: local broker replication info + """ + return broker.get_replication_info() + + def _handle_sync_request(self, broker, remote_info): + """ + Update metadata, timestamps, sync points. + """ + with self.debug_timing('info'): + try: + info = self._get_synced_replication_info(broker, remote_info) + except (Exception, Timeout) as e: + if 'no such table' in str(e): + self.logger.error(_("Quarantining DB %s"), broker) + quarantine_db(broker.db_file, broker.db_type) + return HTTPNotFound() + raise + if remote_info['metadata']: + with self.debug_timing('update_metadata'): + broker.update_metadata(remote_info['metadata']) + sync_timestamps = ('created_at', 'put_timestamp', 'delete_timestamp') + if any(info[ts] != remote_info[ts] for ts in sync_timestamps): + with self.debug_timing('merge_timestamps'): + broker.merge_timestamps(*(remote_info[ts] for ts in + sync_timestamps)) + with self.debug_timing('get_sync'): + info['point'] = broker.get_sync(remote_info['id']) + if remote_info['hash'] == info['hash'] and \ + info['point'] < remote_info['point']: + with self.debug_timing('merge_syncs'): + translate = { + 'remote_id': 'id', + 'sync_point': 'point', + } + data = dict((k, remote_info[v]) for k, v in translate.items()) + broker.merge_syncs([data]) + info['point'] = remote_info['point'] + return Response(json.dumps(info)) def merge_syncs(self, broker, args): broker.merge_syncs(args[0]) diff --git a/swift/container/replicator.py b/swift/container/replicator.py index f69edba01f..b0a2e911a3 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -13,8 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time + from swift.container.backend import ContainerBroker, DATADIR +from swift.container.reconciler import incorrect_policy_index from swift.common import db_replicator +from swift.common.utils import json, normalize_timestamp +from swift.common.http import is_success +from swift.common.storage_policy import POLICIES class ContainerReplicator(db_replicator.Replicator): @@ -29,3 +35,58 @@ class ContainerReplicator(db_replicator.Replicator): if full_info['reported_' + key] != full_info[key]: return False return True + + def _gather_sync_args(self, replication_info): + parent = super(ContainerReplicator, self) + sync_args = parent._gather_sync_args(replication_info) + if len(POLICIES) > 1: + sync_args += tuple(replication_info[k] for k in + ('status_changed_at', 'count', + 'storage_policy_index')) + return sync_args + + def _handle_sync_response(self, node, response, info, broker, http): + parent = super(ContainerReplicator, self) + if is_success(response.status): + remote_info = json.loads(response.data) + if incorrect_policy_index(info, remote_info): + status_changed_at = normalize_timestamp(time.time()) + broker.set_storage_policy_index( + remote_info['storage_policy_index'], + timestamp=status_changed_at) + broker.merge_timestamps(*(remote_info[key] for key in ( + 'created_at', 'put_timestamp', 'delete_timestamp'))) + rv = parent._handle_sync_response( + node, response, info, broker, http) + return rv + + +class ContainerReplicatorRpc(db_replicator.ReplicatorRpc): + + def _parse_sync_args(self, args): + parent = super(ContainerReplicatorRpc, self) + remote_info = parent._parse_sync_args(args) + if len(args) > 9: + remote_info['status_changed_at'] = args[7] + remote_info['count'] = args[8] + remote_info['storage_policy_index'] = args[9] + return remote_info + + def _get_synced_replication_info(self, broker, remote_info): + """ + Sync the remote_info storage_policy_index if needed and return the + newly synced replication info. 
+ + :param broker: the database broker + :param remote_info: the remote replication info + + :returns: local broker replication info + """ + info = broker.get_replication_info() + if incorrect_policy_index(info, remote_info): + status_changed_at = normalize_timestamp(time.time()) + broker.set_storage_policy_index( + remote_info['storage_policy_index'], + timestamp=status_changed_at) + info = broker.get_replication_info() + return info diff --git a/swift/container/server.py b/swift/container/server.py index 2432f2c0c6..15269f8616 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -24,6 +24,7 @@ from eventlet import Timeout import swift.common.db from swift.container.backend import ContainerBroker, DATADIR +from swift.container.replicator import ContainerReplicatorRpc from swift.common.db import DatabaseAlreadyExists from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.request_helpers import get_param, get_listing_content_type, \ @@ -36,7 +37,6 @@ from swift.common.constraints import check_mount, check_float, check_utf8 from swift.common import constraints from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout -from swift.common.db_replicator import ReplicatorRpc from swift.common.http import HTTP_NOT_FOUND, is_success from swift.common.storage_policy import POLICIES, POLICY_INDEX from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ @@ -102,7 +102,7 @@ class ContainerController(object): h.strip() for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',') if h.strip()] - self.replicator_rpc = ReplicatorRpc( + self.replicator_rpc = ContainerReplicatorRpc( self.root, DATADIR, ContainerBroker, self.mount_check, logger=self.logger) self.auto_create_account_prefix = \ diff --git a/test/probe/test_container_merge_policy_index.py b/test/probe/test_container_merge_policy_index.py index 69af958d18..da6b12315e 100644 --- a/test/probe/test_container_merge_policy_index.py +++ b/test/probe/test_container_merge_policy_index.py @@ -20,11 +20,13 @@ from optparse import OptionParser from urlparse import urlparse import random +from nose import SkipTest + from swift.common.manager import Manager from swift.common.storage_policy import POLICIES -from swift.common import utils, ring +from swift.common import utils, ring, direct_client from swift.common.http import HTTP_NOT_FOUND -from test.probe.common import reset_environment +from test.probe.common import reset_environment, get_to_final_state from swiftclient import client, get_auth, ClientException @@ -172,7 +174,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase): def setUp(self): if len(POLICIES) < 2: - raise unittest.SkipTest() + raise SkipTest() (self.pids, self.port2server, self.account_ring, self.container_ring, self.object_ring, self.url, self.token, self.account, self.configs) = reset_environment() @@ -181,6 +183,40 @@ class TestContainerMergePolicyIndex(unittest.TestCase): self.brain = BrainSplitter(self.url, self.token, self.container_name, self.object_name) + def test_merge_storage_policy_index(self): + # generic split brain + self.brain.stop_primary_half() + self.brain.put_container() + self.brain.start_primary_half() + self.brain.stop_handoff_half() + self.brain.put_container() + self.brain.put_object() + self.brain.start_handoff_half() + # make sure we have some manner of split brain + container_part, container_nodes = self.container_ring.get_nodes( + self.account, self.container_name) + head_responses = [] 
+ for node in container_nodes: + metadata = direct_client.direct_head_container( + node, container_part, self.account, self.container_name) + head_responses.append((node, metadata)) + found_policy_indexes = set(metadata['x-storage-policy-index'] for + node, metadata in head_responses) + self.assert_(len(found_policy_indexes) > 1, + 'primary nodes did not disagree about policy index %r' % + head_responses) + get_to_final_state() + head_responses = [] + for node in container_nodes: + metadata = direct_client.direct_head_container( + node, container_part, self.account, self.container_name) + head_responses.append((node, metadata)) + found_policy_indexes = set(metadata['x-storage-policy-index'] for + node, metadata in head_responses) + self.assert_(len(found_policy_indexes) == 1, + 'primary nodes disagree about policy index %r' % + head_responses) + def main(): options, commands = parser.parse_args() diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 406aa6622d..181c44d496 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -21,14 +21,15 @@ import errno import math import time from mock import patch, call -from shutil import rmtree +from shutil import rmtree, copy from tempfile import mkdtemp, NamedTemporaryFile import mock import simplejson from swift.container.backend import DATADIR from swift.common import db_replicator -from swift.common.utils import normalize_timestamp +from swift.common.utils import (normalize_timestamp, hash_path, + storage_directory) from swift.common.exceptions import DriveNotMounted from swift.common.swob import HTTPException @@ -181,6 +182,7 @@ class FakeBroker(object): get_repl_missing_table = False stub_replication_info = None db_type = 'container' + db_contains_type = 'object' info = {'account': TEST_ACCOUNT_NAME, 'container': TEST_CONTAINER_NAME} def __init__(self, *args, **kwargs): @@ -215,17 +217,21 @@ class FakeBroker(object): def get_replication_info(self): if self.get_repl_missing_table: raise Exception('no such table') + info = dict(self.info) + info.update({ + 'hash': 12345, + 'delete_timestamp': 0, + 'put_timestamp': 1, + 'created_at': 1, + 'count': 0, + }) if self.stub_replication_info: - return self.stub_replication_info - return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0, - 'hash': 12345, 'created_at': 1} + info.update(self.stub_replication_info) + return info def reclaim(self, item_timestamp, sync_timestamp): pass - def get_info(self): - return self.info - def newid(self, remote_d): pass @@ -240,6 +246,7 @@ class FakeBroker(object): class FakeAccountBroker(FakeBroker): db_type = 'account' + db_contains_type = 'container' info = {'account': TEST_ACCOUNT_NAME} @@ -578,7 +585,7 @@ class TestDBReplicator(unittest.TestCase): try: replicator.delete_db = self.stub_delete_db replicator.brokerclass.stub_replication_info = { - 'delete_timestamp': 2, 'put_timestamp': 1, 'count': 0} + 'delete_timestamp': 2, 'put_timestamp': 1} replicator._replicate_object('0', '/path/to/file', 'node_id') finally: replicator.brokerclass.stub_replication_info = None @@ -601,10 +608,10 @@ class TestDBReplicator(unittest.TestCase): node_id = replicator.ring.get_part_nodes(part)[0]['id'] replicator._replicate_object(str(part), '/path/to/file', node_id) self.assertEqual(['/path/to/file'], self.delete_db_calls) - self.assertEqual( - replicator.logger.log_dict['error'], - [(('Found /path/to/file for /a%20c%20t when it should be on ' - 'partition 0; will replicate out and 
remove.',), {})]) + error_msgs = replicator.logger.get_lines_for_level('error') + expected = 'Found /path/to/file for /a%20c%20t when it should be ' \ + 'on partition 0; will replicate out and remove.' + self.assertEqual(error_msgs, [expected]) def test_replicate_container_out_of_place(self): replicator = TestReplicator({}, logger=unit.FakeLogger()) @@ -885,10 +892,14 @@ class TestDBReplicator(unittest.TestCase): def test_replicator_sync_with_broker_replication_missing_table(self): rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) + rpc.logger = unit.debug_logger() broker = FakeBroker() broker.get_repl_missing_table = True + called = [] + def mock_quarantine_db(object_file, server_type): + called.append(True) self.assertEquals(broker.db_file, object_file) self.assertEquals(broker.db_type, server_type) @@ -901,6 +912,11 @@ class TestDBReplicator(unittest.TestCase): self.assertEquals('404 Not Found', response.status) self.assertEquals(404, response.status_int) + self.assertEqual(called, [True]) + errors = rpc.logger.get_lines_for_level('error') + self.assertEqual(errors, + ["Unable to decode remote metadata 'metadata'", + "Quarantining DB %s" % broker]) def test_replicator_sync(self): rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) @@ -1223,5 +1239,110 @@ class TestReplToNode(unittest.TestCase): self.fake_node, FakeBroker(), '0', self.fake_info), False) +class FakeHTTPResponse(object): + + def __init__(self, resp): + self.resp = resp + + @property + def status(self): + return self.resp.status_int + + @property + def data(self): + return self.resp.body + + +def attach_fake_replication_rpc(rpc, replicate_hook=None): + class FakeReplConnection(object): + + def __init__(self, node, partition, hash_, logger): + self.logger = logger + self.node = node + self.partition = partition + self.path = '/%s/%s/%s' % (node['device'], partition, hash_) + self.host = node['replication_ip'] + + def replicate(self, op, *sync_args): + print 'REPLICATE: %s, %s, %r' % (self.path, op, sync_args) + replicate_args = self.path.lstrip('/').split('/') + args = [op] + list(sync_args) + swob_response = rpc.dispatch(replicate_args, args) + resp = FakeHTTPResponse(swob_response) + if replicate_hook: + replicate_hook(op, *sync_args) + return resp + + return FakeReplConnection + + +class TestReplicatorSync(unittest.TestCase): + + backend = None # override in subclass + datadir = None + replicator_daemon = db_replicator.Replicator + replicator_rpc = db_replicator.ReplicatorRpc + + def setUp(self): + self.root = mkdtemp() + self.rpc = self.replicator_rpc( + self.root, self.datadir, self.backend, False, + logger=unit.debug_logger()) + FakeReplConnection = attach_fake_replication_rpc(self.rpc) + self._orig_ReplConnection = db_replicator.ReplConnection + db_replicator.ReplConnection = FakeReplConnection + self._orig_Ring = db_replicator.ring.Ring + self._ring = unit.FakeRing() + db_replicator.ring.Ring = lambda *args, **kwargs: self._get_ring() + self.logger = unit.debug_logger() + + def tearDown(self): + db_replicator.ReplConnection = self._orig_ReplConnection + db_replicator.ring.Ring = self._orig_Ring + rmtree(self.root) + + def _get_ring(self): + return self._ring + + def _get_broker(self, account, container=None, node_index=0): + hash_ = hash_path(account, container) + part, nodes = self._ring.get_nodes(account, container) + drive = nodes[node_index]['device'] + db_path = os.path.join(self.root, drive, + storage_directory(self.datadir, part, hash_), + hash_ + '.db') + return self.backend(db_path, 
account=account, container=container) + + def _get_broker_part_node(self, broker): + part, nodes = self._ring.get_nodes(broker.account, broker.container) + storage_dir = broker.db_file[len(self.root):].lstrip(os.path.sep) + broker_device = storage_dir.split(os.path.sep, 1)[0] + for node in nodes: + if node['device'] == broker_device: + return part, node + + def _run_once(self, node, conf_updates=None, daemon=None): + conf = { + 'devices': self.root, + 'recon_cache_path': self.root, + 'mount_check': 'false', + 'bind_port': node['replication_port'], + } + if conf_updates: + conf.update(conf_updates) + daemon = daemon or self.replicator_daemon(conf, logger=self.logger) + + def _rsync_file(db_file, remote_file, **kwargs): + remote_server, remote_path = remote_file.split('/', 1) + dest_path = os.path.join(self.root, remote_path) + copy(db_file, dest_path) + return True + daemon._rsync_file = _rsync_file + with mock.patch('swift.common.db_replicator.whataremyips', + new=lambda: [node['replication_ip']]): + daemon.run_once() + return daemon + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 8fa92770fb..cd88518c9a 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -13,9 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import time +import shutil +import itertools import unittest -from swift.container import replicator +import mock +import random +import sqlite3 + +from swift.common import db_replicator +from swift.container import replicator, backend, server from swift.common.utils import normalize_timestamp +from swift.common.storage_policy import POLICIES + +from test.unit.common import test_db_replicator +from test.unit import patch_policies class TestReplicator(unittest.TestCase): @@ -56,5 +69,601 @@ class TestReplicator(unittest.TestCase): self.assertTrue(repl.report_up_to_date(info)) +@patch_policies +class TestReplicatorSync(test_db_replicator.TestReplicatorSync): + + backend = backend.ContainerBroker + datadir = server.DATADIR + replicator_daemon = replicator.ContainerReplicator + replicator_rpc = replicator.ContainerReplicatorRpc + + def test_sync_remote_in_sync(self): + # setup a local container + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = time.time() + broker.initialize(put_timestamp, POLICIES.default.idx) + # "replicate" to same database + node = {'device': 'sdb', 'replication_ip': '127.0.0.1'} + daemon = replicator.ContainerReplicator({}) + # replicate + part, node = self._get_broker_part_node(broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + # nothing to do + self.assertTrue(success) + self.assertEqual(1, daemon.stats['no_change']) + + def test_sync_remote_with_timings(self): + # setup a local container + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = time.time() + broker.initialize(put_timestamp, POLICIES.default.idx) + broker.update_metadata( + {'x-container-meta-test': ('foo', put_timestamp)}) + # setup remote container + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(time.time(), POLICIES.default.idx) + timestamp = time.time() + for db in (broker, remote_broker): + db.put_object('/a/c/o', timestamp, 0, 'content-type', 'etag', + storage_policy_index=db.storage_policy_index) + # replicate + daemon = 
replicator.ContainerReplicator({}) + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + with mock.patch.object(db_replicator, 'DEBUG_TIMINGS_THRESHOLD', 0): + success = daemon._repl_to_node(node, broker, part, info) + # nothing to do + self.assertTrue(success) + self.assertEqual(1, daemon.stats['no_change']) + expected_timings = ('info', 'update_metadata', 'merge_timestamps', + 'get_sync', 'merge_syncs') + debug_lines = self.rpc.logger.logger.get_lines_for_level('debug') + self.assertEqual(len(expected_timings), len(debug_lines)) + for metric in expected_timings: + expected = 'replicator-rpc-sync time for %s:' % metric + self.assert_(any(expected in line for line in debug_lines), + 'debug timing %r was not in %r' % ( + expected, debug_lines)) + + def test_sync_remote_missing(self): + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = time.time() + broker.initialize(put_timestamp, POLICIES.default.idx) + + # "replicate" + part, node = self._get_broker_part_node(broker) + daemon = self._run_once(node) + + # complete rsync to all other nodes + self.assertEqual(2, daemon.stats['rsync']) + for i in range(1, 3): + remote_broker = self._get_broker('a', 'c', node_index=i) + self.assertTrue(os.path.exists(remote_broker.db_file)) + remote_info = remote_broker.get_info() + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + for k, v in local_info.items(): + if k == 'id': + continue + self.assertEqual(remote_info[k], v, + "mismatch remote %s %r != %r" % ( + k, remote_info[k], v)) + + def test_rsync_failure(self): + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = time.time() + broker.initialize(put_timestamp, POLICIES.default.idx) + # "replicate" to different device + daemon = replicator.ContainerReplicator({}) + + def _rsync_file(*args, **kwargs): + return False + daemon._rsync_file = _rsync_file + + # replicate + part, local_node = self._get_broker_part_node(broker) + node = random.choice([n for n in self._ring.devs if n != local_node]) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + self.assertFalse(success) + + def test_sync_remote_missing_most_rows(self): + put_timestamp = time.time() + # create "local" broker + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_timestamp, POLICIES.default.idx) + # create "remote" broker + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_timestamp, POLICIES.default.idx) + # add a row to "local" db + broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag', + storage_policy_index=broker.storage_policy_index) + #replicate + node = {'device': 'sdc', 'replication_ip': '127.0.0.1'} + daemon = replicator.ContainerReplicator({}) + + def _rsync_file(db_file, remote_file, **kwargs): + remote_server, remote_path = remote_file.split('/', 1) + dest_path = os.path.join(self.root, remote_path) + shutil.copy(db_file, dest_path) + return True + daemon._rsync_file = _rsync_file + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + # row merge + self.assertEqual(1, daemon.stats['remote_merge']) + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + remote_info = self._get_broker( + 'a', 'c', node_index=1).get_info() + for k, v in local_info.items(): + if k == 'id': + continue + self.assertEqual(remote_info[k], v, + 
"mismatch remote %s %r != %r" % ( + k, remote_info[k], v)) + + def test_sync_remote_missing_one_rows(self): + put_timestamp = time.time() + # create "local" broker + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_timestamp, POLICIES.default.idx) + # create "remote" broker + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_timestamp, POLICIES.default.idx) + # add some rows to both db + for i in range(10): + put_timestamp = time.time() + for db in (broker, remote_broker): + path = '/a/c/o_%s' % i + db.put_object(path, put_timestamp, 0, 'content-type', 'etag', + storage_policy_index=db.storage_policy_index) + # now a row to the "local" broker only + broker.put_object('/a/c/o_missing', time.time(), 0, + 'content-type', 'etag', + storage_policy_index=broker.storage_policy_index) + # replicate + daemon = replicator.ContainerReplicator({}) + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + # row merge + self.assertEqual(1, daemon.stats['diff']) + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + remote_info = self._get_broker( + 'a', 'c', node_index=1).get_info() + for k, v in local_info.items(): + if k == 'id': + continue + self.assertEqual(remote_info[k], v, + "mismatch remote %s %r != %r" % ( + k, remote_info[k], v)) + + def test_sync_remote_can_not_keep_up(self): + put_timestamp = time.time() + # create "local" broker + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(put_timestamp, POLICIES.default.idx) + # create "remote" broker + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_timestamp, POLICIES.default.idx) + # add some rows to both db's + for i in range(10): + put_timestamp = time.time() + for db in (broker, remote_broker): + obj_name = 'o_%s' % i + db.put_object(obj_name, put_timestamp, 0, + 'content-type', 'etag', + storage_policy_index=db.storage_policy_index) + # setup REPLICATE callback to simulate adding rows during merge_items + missing_counter = itertools.count() + + def put_more_objects(op, *args): + if op != 'merge_items': + return + path = '/a/c/o_missing_%s' % missing_counter.next() + broker.put_object(path, time.time(), 0, 'content-type', 'etag', + storage_policy_index=db.storage_policy_index) + test_db_replicator.FakeReplConnection = \ + test_db_replicator.attach_fake_replication_rpc( + self.rpc, replicate_hook=put_more_objects) + db_replicator.ReplConnection = test_db_replicator.FakeReplConnection + # and add one extra to local db to trigger merge_items + put_more_objects('merge_items') + # limit number of times we'll call merge_items + daemon = replicator.ContainerReplicator({'max_diffs': 10}) + # replicate + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + self.assertFalse(success) + # back off on the PUTs during replication... 
+ FakeReplConnection = test_db_replicator.attach_fake_replication_rpc( + self.rpc, replicate_hook=None) + db_replicator.ReplConnection = FakeReplConnection + # retry replication + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + # row merge + self.assertEqual(2, daemon.stats['diff']) + self.assertEqual(1, daemon.stats['diff_capped']) + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + remote_info = self._get_broker( + 'a', 'c', node_index=1).get_info() + for k, v in local_info.items(): + if k == 'id': + continue + self.assertEqual(remote_info[k], v, + "mismatch remote %s %r != %r" % ( + k, remote_info[k], v)) + + def test_sync_status_change(self): + # setup a local container + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = time.time() + broker.initialize(put_timestamp, POLICIES.default.idx) + # setup remote container + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(put_timestamp, POLICIES.default.idx) + # delete local container + broker.delete_db(time.time()) + # replicate + daemon = replicator.ContainerReplicator({}) + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + success = daemon._repl_to_node(node, broker, part, info) + # nothing to do + self.assertTrue(success) + self.assertEqual(1, daemon.stats['no_change']) + # status in sync + self.assertTrue(remote_broker.is_deleted()) + info = broker.get_info() + remote_info = remote_broker.get_info() + self.assert_(float(remote_info['status_changed_at']) > + float(remote_info['put_timestamp']), + 'remote status_changed_at (%s) is not ' + 'greater than put_timestamp (%s)' % ( + remote_info['status_changed_at'], + remote_info['put_timestamp'])) + self.assert_(float(remote_info['status_changed_at']) > + float(info['status_changed_at']), + 'remote status_changed_at (%s) is not ' + 'greater than local status_changed_at (%s)' % ( + remote_info['status_changed_at'], + info['status_changed_at'])) + + def test_sync_bogus_db_quarantines(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + policy = random.choice(list(POLICIES)) + + # create "local" broker + local_broker = self._get_broker('a', 'c', node_index=0) + local_broker.initialize(ts.next(), policy.idx) + + # create "remote" broker + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_broker.initialize(ts.next(), policy.idx) + + db_path = local_broker.db_file + self.assertTrue(os.path.exists(db_path)) # sanity check + old_inode = os.stat(db_path).st_ino + + _orig_get_info = backend.ContainerBroker.get_info + + def fail_like_bad_db(broker): + if broker.db_file == local_broker.db_file: + raise sqlite3.OperationalError("no such table: container_info") + else: + return _orig_get_info(broker) + + part, node = self._get_broker_part_node(remote_broker) + with mock.patch('swift.container.backend.ContainerBroker.get_info', + fail_like_bad_db): + # Have the remote node replicate to local; local should see its + # corrupt DB, quarantine it, and act like the DB wasn't ever there + # in the first place. + daemon = self._run_once(node) + + self.assertTrue(os.path.exists(db_path)) + # Make sure we didn't just keep the old DB, but quarantined it and + # made a fresh copy. 
+ new_inode = os.stat(db_path).st_ino + self.assertNotEqual(old_inode, new_inode) + self.assertEqual(daemon.stats['failure'], 0) + + def _replication_scenarios(self, *scenarios, **kwargs): + remote_wins = kwargs.get('remote_wins', False) + # these tests are duplicated because of the differences in replication + # when row counts cause full rsync vs. merge + scenarios = scenarios or ( + 'no_row', 'local_row', 'remote_row', 'both_rows') + for scenario_name in scenarios: + ts = itertools.count(int(time.time())) + policy = random.choice(list(POLICIES)) + remote_policy = random.choice( + [p for p in POLICIES if p is not policy]) + broker = self._get_broker('a', 'c', node_index=0) + remote_broker = self._get_broker('a', 'c', node_index=1) + yield ts, policy, remote_policy, broker, remote_broker + # variations on different replication scenarios + variations = { + 'no_row': (), + 'local_row': (broker,), + 'remote_row': (remote_broker,), + 'both_rows': (broker, remote_broker), + } + dbs = variations[scenario_name] + obj_ts = ts.next() + for db in dbs: + db.put_object('/a/c/o', obj_ts, 0, 'content-type', 'etag', + storage_policy_index=db.storage_policy_index) + # replicate + part, node = self._get_broker_part_node(broker) + daemon = self._run_once(node) + self.assertEqual(0, daemon.stats['failure']) + + # in sync + local_info = self._get_broker( + 'a', 'c', node_index=0).get_info() + remote_info = self._get_broker( + 'a', 'c', node_index=1).get_info() + if remote_wins: + expected = remote_policy.idx + err = 'local policy did not change to match remote ' \ + 'for replication row scenario %s' % scenario_name + else: + expected = policy.idx + err = 'local policy changed to match remote ' \ + 'for replication row scenario %s' % scenario_name + self.assertEqual(local_info['storage_policy_index'], expected, err) + self.assertEqual(remote_info['storage_policy_index'], + local_info['storage_policy_index']) + test_db_replicator.TestReplicatorSync.tearDown(self) + test_db_replicator.TestReplicatorSync.setUp(self) + + def test_sync_local_create_policy_over_newer_remote_create(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + + def test_sync_local_create_policy_over_newer_remote_delete(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # delete "remote" broker + remote_broker.delete_db(ts.next()) + + def test_sync_local_create_policy_over_older_remote_delete(self): + # remote_row & both_rows cases are covered by + # "test_sync_remote_half_delete_policy_over_newer_local_create" + for setup in self._replication_scenarios( + 'no_row', 'local_row'): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # delete older "remote" broker + remote_broker.delete_db(ts.next()) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + + def test_sync_local_half_delete_policy_over_newer_remote_create(self): + # no_row & remote_row cases are covered by + # "test_sync_remote_create_policy_over_older_local_delete" + for setup in self._replication_scenarios('local_row', 'both_rows'): + ts, 
policy, remote_policy, broker, remote_broker = setup + # create older "local" broker + broker.initialize(ts.next(), policy.idx) + # half delete older "local" broker + broker.delete_db(ts.next()) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + + def test_sync_local_recreate_policy_over_newer_remote_create(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # older recreate "local" broker + broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + broker.update_put_timestamp(recreate_timestamp) + broker.update_status_changed_at(recreate_timestamp) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + + def test_sync_local_recreate_policy_over_older_remote_create(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # recreate "local" broker + broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + broker.update_put_timestamp(recreate_timestamp) + broker.update_status_changed_at(recreate_timestamp) + + def test_sync_local_recreate_policy_over_newer_remote_delete(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # recreate "local" broker + broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + broker.update_put_timestamp(recreate_timestamp) + broker.update_status_changed_at(recreate_timestamp) + # older delete "remote" broker + remote_broker.delete_db(ts.next()) + + def test_sync_local_recreate_policy_over_older_remote_delete(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # older delete "remote" broker + remote_broker.delete_db(ts.next()) + # recreate "local" broker + broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + broker.update_put_timestamp(recreate_timestamp) + broker.update_status_changed_at(recreate_timestamp) + + def test_sync_local_recreate_policy_over_older_remote_recreate(self): + for setup in self._replication_scenarios(): + ts, policy, remote_policy, broker, remote_broker = setup + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # older recreate "remote" broker + remote_broker.delete_db(ts.next()) + remote_recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(remote_recreate_timestamp) + remote_broker.update_status_changed_at(remote_recreate_timestamp) + # recreate "local" broker + broker.delete_db(ts.next()) + local_recreate_timestamp = ts.next() + broker.update_put_timestamp(local_recreate_timestamp) + broker.update_status_changed_at(local_recreate_timestamp) + + def test_sync_remote_create_policy_over_newer_local_create(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "remote" broker + 
remote_broker.initialize(ts.next(), remote_policy.idx) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + + def test_sync_remote_create_policy_over_newer_local_delete(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # delete "local" broker + broker.delete_db(ts.next()) + + def test_sync_remote_create_policy_over_older_local_delete(self): + # local_row & both_rows cases are covered by + # "test_sync_local_half_delete_policy_over_newer_remote_create" + for setup in self._replication_scenarios( + 'no_row', 'remote_row', remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "local" broker + broker.initialize(ts.next(), policy.idx) + # delete older "local" broker + broker.delete_db(ts.next()) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + + def test_sync_remote_half_delete_policy_over_newer_local_create(self): + # no_row & both_rows cases are covered by + # "test_sync_local_create_policy_over_older_remote_delete" + for setup in self._replication_scenarios('remote_row', 'both_rows', + remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # half delete older "remote" broker + remote_broker.delete_db(ts.next()) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + + def test_sync_remote_recreate_policy_over_newer_local_create(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # older recreate "remote" broker + remote_broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(recreate_timestamp) + remote_broker.update_status_changed_at(recreate_timestamp) + # create "local" broker + broker.initialize(ts.next(), policy.idx) + + def test_sync_remote_recreate_policy_over_older_local_create(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # recreate "remote" broker + remote_broker.delete_db(ts.next()) + recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(recreate_timestamp) + remote_broker.update_status_changed_at(recreate_timestamp) + + def test_sync_remote_recreate_policy_over_newer_local_delete(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # recreate "remote" broker + remote_broker.delete_db(ts.next()) + remote_recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(remote_recreate_timestamp) + remote_broker.update_status_changed_at(remote_recreate_timestamp) + # older delete "local" broker + broker.delete_db(ts.next()) + + def test_sync_remote_recreate_policy_over_older_local_delete(self): + for setup in 
self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # older delete "local" broker + broker.delete_db(ts.next()) + # recreate "remote" broker + remote_broker.delete_db(ts.next()) + remote_recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(remote_recreate_timestamp) + remote_broker.update_status_changed_at(remote_recreate_timestamp) + + def test_sync_remote_recreate_policy_over_older_local_recreate(self): + for setup in self._replication_scenarios(remote_wins=True): + ts, policy, remote_policy, broker, remote_broker = setup + # create older "local" broker + broker.initialize(ts.next(), policy.idx) + # create "remote" broker + remote_broker.initialize(ts.next(), remote_policy.idx) + # older recreate "local" broker + broker.delete_db(ts.next()) + local_recreate_timestamp = ts.next() + broker.update_put_timestamp(local_recreate_timestamp) + broker.update_status_changed_at(local_recreate_timestamp) + # recreate "remote" broker + remote_broker.delete_db(ts.next()) + remote_recreate_timestamp = ts.next() + remote_broker.update_put_timestamp(remote_recreate_timestamp) + remote_broker.update_status_changed_at(remote_recreate_timestamp) + + if __name__ == '__main__': unittest.main()
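
Note: the simplejson-to-json swap in the first hunk does not change the REPLICATE wire format. As a minimal sketch (stdlib json only, with invented field values), this is the round trip the patch relies on: ReplConnection.replicate() JSON-encodes the op plus sync args as the request body, and the RPC side splits the decoded list back into an op name and an args list before calling sync() -- the same split the test harness performs with args = [op] + list(sync_args). The server-side decode step is paraphrased here, not quoted from the patch.

    # Sketch only; values are invented, and the op/args split mirrors
    # the diff's FakeReplConnection rather than the server verbatim.
    import json

    sync_args = (42, 'abc123', 'db-uuid', '1400000000.00000',
                 '1400000000.00000', '0', '{}')
    body = json.dumps(('sync',) + sync_args)   # what replicate() sends

    post_args = json.loads(body)               # what the server decodes
    op, args = post_args[0], post_args[1:]
    assert op == 'sync'
    assert args[0] == 42                       # sender's max_row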
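Note: _gather_sync_args and _parse_sync_args form a positional contract: both ends must agree on field order, and ContainerReplicator/ContainerReplicatorRpc extend it by appending three policy-related fields, with the len(args) > 9 guard keeping the RPC side compatible with older senders that only ship the base seven. A self-contained sketch of that contract, using toy dicts rather than Swift's brokers:

    # Toy model of the positional sync-args contract; the field names
    # come from the diff, everything else is illustrative.
    BASE_KEYS = ('max_row', 'hash', 'id', 'created_at',
                 'put_timestamp', 'delete_timestamp', 'metadata')
    EXTRA_KEYS = ('status_changed_at', 'count', 'storage_policy_index')

    def gather_sync_args(info, with_policy=False):
        keys = BASE_KEYS + (EXTRA_KEYS if with_policy else ())
        return tuple(info[key] for key in keys)

    def parse_sync_args(args):
        # the sender's max_row becomes the receiver's sync point
        remote_info = dict(zip(('point',) + BASE_KEYS[1:], args[:7]))
        if len(args) > 9:  # container replicators append policy fields
            remote_info.update(zip(EXTRA_KEYS, args[7:10]))
        return remote_info

    info = {'max_row': 42, 'hash': 'abc123', 'id': 'db-uuid',
            'created_at': '1', 'put_timestamp': '1',
            'delete_timestamp': '0', 'metadata': '{}',
            'status_changed_at': '1', 'count': 0,
            'storage_policy_index': 1}
    remote_info = parse_sync_args(gather_sync_args(info, with_policy=True))
    assert remote_info['point'] == 42
    assert remote_info['storage_policy_index'] == 1
    # a base (account-style) sender still parses cleanly
    assert 'count' not in parse_sync_args(gather_sync_args(info))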
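Note: the debug_timing context manager exists purely to collapse the repeated time.time()/threshold/logger.debug boilerplate that the old sync() carried around every step. A standalone version of the same pattern, with an assumed threshold value and a stdlib logger standing in for Swift's:

    # Standalone sketch of the debug_timing pattern; the threshold and
    # logger are stand-ins, the message format matches the diff.
    import time
    import logging
    from contextlib import contextmanager

    DEBUG_TIMINGS_THRESHOLD = 0.1  # assumed; Swift defines its own value
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('replicator-demo')

    @contextmanager
    def debug_timing(name):
        timemark = time.time()
        yield
        timespan = time.time() - timemark
        if timespan > DEBUG_TIMINGS_THRESHOLD:
            logger.debug('replicator-rpc-sync time for %s: %.02fs' % (
                name, timespan))

    with debug_timing('get_sync'):
        time.sleep(0.2)  # stands in for broker.get_sync(remote_info['id'])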