# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import shutil
import itertools
import unittest
import mock
import random
import sqlite3

from swift.common import db_replicator
from swift.container import replicator, backend, server
from swift.container.reconciler import (
    MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
from swift.common.utils import Timestamp
from swift.common.storage_policy import POLICIES

from test.unit.common import test_db_replicator
from test.unit import patch_policies, make_timestamp_iter
from contextlib import contextmanager


@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
    """Container flavour of the generic DB replicator sync test case.

    The base class is parameterised through the class attributes below;
    the tests here drive ``ContainerReplicator`` against
    ``ContainerBroker`` databases obtained from ``self._get_broker``.
    """

    backend = backend.ContainerBroker
    datadir = server.DATADIR
    replicator_daemon = replicator.ContainerReplicator
    replicator_rpc = replicator.ContainerReplicatorRpc

    def test_report_up_to_date(self):
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(Timestamp(1).internal, int(POLICIES.default))
        info = broker.get_info()
        broker.reported(info['put_timestamp'],
                        info['delete_timestamp'],
                        info['object_count'],
                        info['bytes_used'])
        full_info = broker.get_replication_info()
        expected_info = {'put_timestamp': Timestamp(1).internal,
                         'delete_timestamp': '0',
                         'count': 0,
                         'bytes_used': 0,
                         'reported_put_timestamp': Timestamp(1).internal,
                         'reported_delete_timestamp': '0',
                         'reported_object_count': 0,
                         'reported_bytes_used': 0}
        for key, value in expected_info.items():
            msg = 'expected value for %r, %r != %r' % (
                key, full_info[key], value)
            self.assertEqual(full_info[key], value, msg)
        repl = replicator.ContainerReplicator({})
        self.assertTrue(repl.report_up_to_date(full_info))
        # each stat is first moved out of sync with its reported_*
        # counterpart, then the reported value is brought back in line
        full_info['delete_timestamp'] = Timestamp(2).internal
        self.assertFalse(repl.report_up_to_date(full_info))
        full_info['reported_delete_timestamp'] = Timestamp(2).internal
        self.assertTrue(repl.report_up_to_date(full_info))
        full_info['count'] = 1
        self.assertFalse(repl.report_up_to_date(full_info))
        full_info['reported_object_count'] = 1
        self.assertTrue(repl.report_up_to_date(full_info))
        full_info['bytes_used'] = 1
        self.assertFalse(repl.report_up_to_date(full_info))
        full_info['reported_bytes_used'] = 1
        self.assertTrue(repl.report_up_to_date(full_info))
        full_info['put_timestamp'] = Timestamp(3).internal
        self.assertFalse(repl.report_up_to_date(full_info))
        full_info['reported_put_timestamp'] = Timestamp(3).internal
        self.assertTrue(repl.report_up_to_date(full_info))

    def test_sync_remote_in_sync(self):
        # setup a local container
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = time.time()
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # "replicate" to same database
        daemon = replicator.ContainerReplicator({})
        # replicate
        part, node = self._get_broker_part_node(broker)
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        # nothing to do
        self.assertTrue(success)
        self.assertEqual(1, daemon.stats['no_change'])

    def test_sync_remote_with_timings(self):
        ts_iter = make_timestamp_iter()
        # setup a local container
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = next(ts_iter)
        broker.initialize(put_timestamp.internal, POLICIES.default.idx)
        broker.update_metadata(
            {'x-container-meta-test': ('foo', put_timestamp.internal)})
        # setup remote container
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(next(ts_iter).internal,
                                 POLICIES.default.idx)
        timestamp = next(ts_iter)
        for db in (broker, remote_broker):
            db.put_object(
                '/a/c/o', timestamp.internal, 0, 'content-type', 'etag',
                storage_policy_index=db.storage_policy_index)
        # replicate
        daemon = replicator.ContainerReplicator({})
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        # a negative threshold means every timed step exceeds it
        with mock.patch.object(db_replicator, 'DEBUG_TIMINGS_THRESHOLD', -1):
            success = daemon._repl_to_node(node, broker, part, info)
        # nothing to do
        self.assertTrue(success)
        self.assertEqual(1, daemon.stats['no_change'])
        expected_timings = ('info', 'update_metadata', 'merge_timestamps',
                            'get_sync', 'merge_syncs')
        debug_lines = self.rpc.logger.logger.get_lines_for_level('debug')
        self.assertEqual(len(expected_timings), len(debug_lines),
                         'Expected %s debug lines but only got %s: %s' %
                         (len(expected_timings), len(debug_lines),
                          debug_lines))
        for metric in expected_timings:
            expected = 'replicator-rpc-sync time for %s:' % metric
            self.assertTrue(any(expected in line for line in debug_lines),
                            'debug timing %r was not in %r' % (
                                expected, debug_lines))

    def test_sync_remote_missing(self):
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = time.time()
        broker.initialize(put_timestamp, POLICIES.default.idx)

        # "replicate"
        part, node = self._get_broker_part_node(broker)
        daemon = self._run_once(node)

        # complete rsync to all other nodes
        self.assertEqual(2, daemon.stats['rsync'])
        for i in range(1, 3):
            remote_broker = self._get_broker('a', 'c', node_index=i)
            self.assertTrue(os.path.exists(remote_broker.db_file))
            remote_info = remote_broker.get_info()
            local_info = self._get_broker(
                'a', 'c', node_index=0).get_info()
            for k, v in local_info.items():
                if k == 'id':
                    continue
                self.assertEqual(remote_info[k], v,
                                 "mismatch remote %s %r != %r" % (
                                     k, remote_info[k], v))

    def test_rsync_failure(self):
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = time.time()
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # "replicate" to different device
        daemon = replicator.ContainerReplicator({})

        def _rsync_file(*args, **kwargs):
            return False
        daemon._rsync_file = _rsync_file

        # replicate
        part, local_node = self._get_broker_part_node(broker)
        node = random.choice([n for n in self._ring.devs
                              if n['id'] != local_node['id']])
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        self.assertFalse(success)

    def test_sync_remote_missing_most_rows(self):
        put_timestamp = time.time()
        # create "local" broker
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # create "remote" broker
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
        # add a row to "local" db
        broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag',
                          storage_policy_index=broker.storage_policy_index)
        # replicate
        daemon = replicator.ContainerReplicator({'per_diff': 1})

        def _rsync_file(db_file, remote_file, **kwargs):
            # stand in for rsync with a local copy under the test root
            _remote_server, remote_path = remote_file.split('/', 1)
            dest_path = os.path.join(self.root, remote_path)
            shutil.copy(db_file, dest_path)
            return True
        daemon._rsync_file = _rsync_file
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        self.assertTrue(success)
        # row merge
        self.assertEqual(1, daemon.stats['remote_merge'])
        local_info = self._get_broker(
            'a', 'c', node_index=0).get_info()
        remote_info = self._get_broker(
            'a', 'c', node_index=1).get_info()
        for k, v in local_info.items():
            if k == 'id':
                continue
            self.assertEqual(remote_info[k], v,
                             "mismatch remote %s %r != %r" % (
                                 k, remote_info[k], v))

    def test_sync_remote_missing_one_rows(self):
        put_timestamp = time.time()
        # create "local" broker
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # create "remote" broker
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
        # add some rows to both db
        for i in range(10):
            put_timestamp = time.time()
            for db in (broker, remote_broker):
                path = '/a/c/o_%s' % i
                db.put_object(path, put_timestamp, 0, 'content-type', 'etag',
                              storage_policy_index=db.storage_policy_index)
        # now a row to the "local" broker only
        broker.put_object('/a/c/o_missing', time.time(), 0,
                          'content-type', 'etag',
                          storage_policy_index=broker.storage_policy_index)
        # replicate
        daemon = replicator.ContainerReplicator({})
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        self.assertTrue(success)
        # row merge
        self.assertEqual(1, daemon.stats['diff'])
        local_info = self._get_broker(
            'a', 'c', node_index=0).get_info()
        remote_info = self._get_broker(
            'a', 'c', node_index=1).get_info()
        for k, v in local_info.items():
            if k == 'id':
                continue
            self.assertEqual(remote_info[k], v,
                             "mismatch remote %s %r != %r" % (
                                 k, remote_info[k], v))

    def test_sync_remote_can_not_keep_up(self):
        put_timestamp = time.time()
        # create "local" broker
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # create "remote" broker
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
        # add some rows to both db's
        for i in range(10):
            put_timestamp = time.time()
            for db in (broker, remote_broker):
                obj_name = 'o_%s' % i
                db.put_object(obj_name, put_timestamp, 0,
                              'content-type', 'etag',
                              storage_policy_index=db.storage_policy_index)
        # setup REPLICATE callback to simulate adding rows during merge_items
        missing_counter = itertools.count()

        def put_more_objects(op, *args):
            if op != 'merge_items':
                return
            path = '/a/c/o_missing_%s' % next(missing_counter)
            # use the policy index of the broker being written to rather
            # than whatever the setup loop variable was last bound to
            broker.put_object(path, time.time(), 0, 'content-type', 'etag',
                              storage_policy_index=broker.storage_policy_index)
        test_db_replicator.FakeReplConnection = \
            test_db_replicator.attach_fake_replication_rpc(
                self.rpc, replicate_hook=put_more_objects)
        db_replicator.ReplConnection = test_db_replicator.FakeReplConnection
        # and add one extra to local db to trigger merge_items
        put_more_objects('merge_items')
        # limit number of times we'll call merge_items
        daemon = replicator.ContainerReplicator({'max_diffs': 10})
        # replicate
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        self.assertFalse(success)
        # back off on the PUTs during replication...
        FakeReplConnection = test_db_replicator.attach_fake_replication_rpc(
            self.rpc, replicate_hook=None)
        db_replicator.ReplConnection = FakeReplConnection
        # retry replication
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        self.assertTrue(success)
        # row merge
        self.assertEqual(2, daemon.stats['diff'])
        self.assertEqual(1, daemon.stats['diff_capped'])
        local_info = self._get_broker(
            'a', 'c', node_index=0).get_info()
        remote_info = self._get_broker(
            'a', 'c', node_index=1).get_info()
        for k, v in local_info.items():
            if k == 'id':
                continue
            self.assertEqual(remote_info[k], v,
                             "mismatch remote %s %r != %r" % (
                                 k, remote_info[k], v))

    def test_diff_capped_sync(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        put_timestamp = next(ts)
        # start off with a local db that is way behind
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(put_timestamp, POLICIES.default.idx)
        for i in range(50):
            broker.put_object(
                'o%s' % i, next(ts), 0, 'content-type-old', 'etag',
                storage_policy_index=broker.storage_policy_index)
        # remote primary db has all the new bits...
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
        for i in range(100):
            remote_broker.put_object(
                'o%s' % i, next(ts), 0, 'content-type-new', 'etag',
                storage_policy_index=remote_broker.storage_policy_index)
        # except there's *one* tiny thing in our local broker that's newer
        broker.put_object(
            'o101', next(ts), 0, 'content-type-new', 'etag',
            storage_policy_index=broker.storage_policy_index)

        # setup daemon with smaller per_diff and max_diffs
        part, node = self._get_broker_part_node(broker)
        daemon = self._get_daemon(node, conf_updates={'per_diff': 10,
                                                      'max_diffs': 3})
        self.assertEqual(daemon.per_diff, 10)
        self.assertEqual(daemon.max_diffs, 3)
        # run once and verify diff capped
        self._run_once(node, daemon=daemon)
        self.assertEqual(1, daemon.stats['diff'])
        self.assertEqual(1, daemon.stats['diff_capped'])
        # run again and verify fully synced
        self._run_once(node, daemon=daemon)
        self.assertEqual(1, daemon.stats['diff'])
        self.assertEqual(0, daemon.stats['diff_capped'])
        # now that we're synced the new item should be in remote db
        remote_names = set()
        for item in remote_broker.list_objects_iter(500, '', '', '', ''):
            # _ts: avoid rebinding the ``ts`` timestamp generator above
            name, _ts, size, content_type, etag = item
            remote_names.add(name)
            self.assertEqual(content_type, 'content-type-new')
        self.assertIn('o101', remote_names)
        self.assertEqual(len(remote_names), 101)
        self.assertEqual(remote_broker.get_info()['object_count'], 101)

    def test_sync_status_change(self):
        # setup a local container
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = time.time()
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # setup remote container
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
        # delete local container
        broker.delete_db(time.time())
        # replicate
        daemon = replicator.ContainerReplicator({})
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, broker, part, info)
        # nothing to do
        self.assertTrue(success)
        self.assertEqual(1, daemon.stats['no_change'])
        # status in sync
        self.assertTrue(remote_broker.is_deleted())
        info = broker.get_info()
        remote_info = remote_broker.get_info()
        self.assertTrue(Timestamp(remote_info['status_changed_at']) >
                        Timestamp(remote_info['put_timestamp']),
                        'remote status_changed_at (%s) is not '
                        'greater than put_timestamp (%s)' % (
                            remote_info['status_changed_at'],
                            remote_info['put_timestamp']))
        self.assertTrue(Timestamp(remote_info['status_changed_at']) >
                        Timestamp(info['status_changed_at']),
                        'remote status_changed_at (%s) is not '
                        'greater than local status_changed_at (%s)' % (
                            remote_info['status_changed_at'],
                            info['status_changed_at']))

    @contextmanager
    def _wrap_merge_timestamps(self, broker, calls):
        """Record calls to ``broker.merge_timestamps`` for the context.

        While the context is active the broker's ``merge_timestamps`` is
        replaced by a wrapper that appends the first positional argument
        of every call to ``calls`` and then delegates to the original.
        The original attribute is restored on exit, even on error.

        :param broker: object whose ``merge_timestamps`` is wrapped
        :param calls: list that receives the first argument of each call
        """
        def fake_merge_timestamps(*args, **kwargs):
            calls.append(args[0])
            return orig_merge_timestamps(*args, **kwargs)

        orig_merge_timestamps = broker.merge_timestamps
        broker.merge_timestamps = fake_merge_timestamps
        try:
            yield True
        finally:
            broker.merge_timestamps = orig_merge_timestamps

    def test_sync_merge_timestamps(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        # setup a local container
        broker = self._get_broker('a', 'c', node_index=0)
        put_timestamp = next(ts)
        broker.initialize(put_timestamp, POLICIES.default.idx)
        # setup remote container
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_put_timestamp = next(ts)
        remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx)
        # replicate, expect call to merge_timestamps on remote and local
        daemon = replicator.ContainerReplicator({})
        part, node = self._get_broker_part_node(remote_broker)
        info = broker.get_replication_info()
        local_calls = []
        remote_calls = []
        # NOTE(review): both context managers wrap the *local* broker
        # instance, so remote_calls records the same invocations as
        # local_calls - confirm whether that is intended.
        with self._wrap_merge_timestamps(broker, local_calls):
            with self._wrap_merge_timestamps(broker, remote_calls):
                success = daemon._repl_to_node(node, broker, part, info)
        self.assertTrue(success)
        self.assertEqual(1, len(remote_calls))
        self.assertEqual(1, len(local_calls))
        self.assertEqual(remote_put_timestamp,
                         broker.get_info()['put_timestamp'])
        self.assertEqual(remote_put_timestamp,
                         remote_broker.get_info()['put_timestamp'])

        # replicate again, no changes so expect no calls to merge_timestamps
        info = broker.get_replication_info()
        local_calls = []
        remote_calls = []
        with self._wrap_merge_timestamps(broker, local_calls):
            with self._wrap_merge_timestamps(broker, remote_calls):
                success = daemon._repl_to_node(node, broker, part, info)
        self.assertTrue(success)
        self.assertEqual(0, len(remote_calls))
        self.assertEqual(0, len(local_calls))
        self.assertEqual(remote_put_timestamp,
                         broker.get_info()['put_timestamp'])
        self.assertEqual(remote_put_timestamp,
                         remote_broker.get_info()['put_timestamp'])

    def test_sync_bogus_db_quarantines(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        policy = random.choice(list(POLICIES))

        # create "local" broker
        local_broker = self._get_broker('a', 'c', node_index=0)
        local_broker.initialize(next(ts), policy.idx)

        # create "remote" broker
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(next(ts), policy.idx)

        db_path = local_broker.db_file
        self.assertTrue(os.path.exists(db_path))  # sanity check
        old_inode = os.stat(db_path).st_ino

        _orig_get_info = backend.ContainerBroker.get_info

        def fail_like_bad_db(broker):
            if broker.db_file == local_broker.db_file:
                raise sqlite3.OperationalError("no such table: container_info")
            else:
                return _orig_get_info(broker)

        part, node = self._get_broker_part_node(remote_broker)
        with mock.patch('swift.container.backend.ContainerBroker.get_info',
                        fail_like_bad_db):
            # Have the remote node replicate to local; local should see its
            # corrupt DB, quarantine it, and act like the DB wasn't ever there
            # in the first place.
            daemon = self._run_once(node)

        self.assertTrue(os.path.exists(db_path))
        # Make sure we didn't just keep the old DB, but quarantined it and
        # made a fresh copy.
        new_inode = os.stat(db_path).st_ino
        self.assertNotEqual(old_inode, new_inode)
        self.assertEqual(daemon.stats['failure'], 0)

    def _replication_scenarios(self, *scenarios, **kwargs):
        """Generate broker pairs and verify policy sync for each scenario.

        For every scenario name this yields the tuple
        ``(ts, policy, remote_policy, broker, remote_broker)`` so the
        calling test can initialize/delete/recreate the two brokers. When
        the generator is resumed it puts one object row into the brokers
        selected by the scenario, runs replication once from the local
        broker's node, asserts there were no failures and that both
        databases ended up with the expected storage policy index, and
        finally re-runs the base class ``tearDown``/``setUp`` so the next
        scenario starts clean.

        :param scenarios: scenario names to run; defaults to ``no_row``,
                          ``local_row``, ``remote_row`` and ``both_rows``
        :param remote_wins: keyword only; when true the remote policy
                            index is the expected outcome, otherwise the
                            local one (default ``False``)
        """
        remote_wins = kwargs.get('remote_wins', False)
        # these tests are duplicated because of the differences in replication
        # when row counts cause full rsync vs. merge
        scenarios = scenarios or (
            'no_row', 'local_row', 'remote_row', 'both_rows')
        for scenario_name in scenarios:
            ts = itertools.count(int(time.time()))
            policy = random.choice(list(POLICIES))
            remote_policy = random.choice(
                [p for p in POLICIES if p is not policy])
            broker = self._get_broker('a', 'c', node_index=0)
            remote_broker = self._get_broker('a', 'c', node_index=1)
            yield ts, policy, remote_policy, broker, remote_broker
            # variations on different replication scenarios
            variations = {
                'no_row': (),
                'local_row': (broker,),
                'remote_row': (remote_broker,),
                'both_rows': (broker, remote_broker),
            }
            dbs = variations[scenario_name]
            obj_ts = next(ts)
            for db in dbs:
                db.put_object('/a/c/o', obj_ts, 0, 'content-type', 'etag',
                              storage_policy_index=db.storage_policy_index)
            # replicate
            part, node = self._get_broker_part_node(broker)
            daemon = self._run_once(node)
            self.assertEqual(0, daemon.stats['failure'])

            # in sync
            local_info = self._get_broker(
                'a', 'c', node_index=0).get_info()
            remote_info = self._get_broker(
                'a', 'c', node_index=1).get_info()
            if remote_wins:
                expected = remote_policy.idx
                err = 'local policy did not change to match remote ' \
                    'for replication row scenario %s' % scenario_name
            else:
                expected = policy.idx
                err = 'local policy changed to match remote ' \
                    'for replication row scenario %s' % scenario_name
            self.assertEqual(local_info['storage_policy_index'], expected, err)
            self.assertEqual(remote_info['storage_policy_index'],
                             local_info['storage_policy_index'])
            test_db_replicator.TestReplicatorSync.tearDown(self)
            test_db_replicator.TestReplicatorSync.setUp(self)

    def test_sync_local_create_policy_over_newer_remote_create(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)

    def test_sync_local_create_policy_over_newer_remote_delete(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # delete "remote" broker
            remote_broker.delete_db(next(ts))

    def test_sync_local_create_policy_over_older_remote_delete(self):
        # remote_row & both_rows cases are covered by
        # "test_sync_remote_half_delete_policy_over_newer_local_create"
        for setup in self._replication_scenarios(
                'no_row', 'local_row'):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # delete older "remote" broker
            remote_broker.delete_db(next(ts))
            # create "local" broker
            broker.initialize(next(ts), policy.idx)

    def test_sync_local_half_delete_policy_over_newer_remote_create(self):
        # no_row & remote_row cases are covered by
        # "test_sync_remote_create_policy_over_older_local_delete"
        for setup in self._replication_scenarios('local_row', 'both_rows'):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "local" broker
            broker.initialize(next(ts), policy.idx)
            # half delete older "local" broker
            broker.delete_db(next(ts))
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)

    def test_sync_local_recreate_policy_over_newer_remote_create(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # older recreate "local" broker
            broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            broker.update_put_timestamp(recreate_timestamp)
            broker.update_status_changed_at(recreate_timestamp)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)

    def test_sync_local_recreate_policy_over_older_remote_create(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # recreate "local" broker
            broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            broker.update_put_timestamp(recreate_timestamp)
            broker.update_status_changed_at(recreate_timestamp)

    def test_sync_local_recreate_policy_over_newer_remote_delete(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # recreate "local" broker
            broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            broker.update_put_timestamp(recreate_timestamp)
            broker.update_status_changed_at(recreate_timestamp)
            # older delete "remote" broker
            remote_broker.delete_db(next(ts))

    def test_sync_local_recreate_policy_over_older_remote_delete(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # older delete "remote" broker
            remote_broker.delete_db(next(ts))
            # recreate "local" broker
            broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            broker.update_put_timestamp(recreate_timestamp)
            broker.update_status_changed_at(recreate_timestamp)

    def test_sync_local_recreate_policy_over_older_remote_recreate(self):
        for setup in self._replication_scenarios():
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # older recreate "remote" broker
            remote_broker.delete_db(next(ts))
            remote_recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(remote_recreate_timestamp)
            remote_broker.update_status_changed_at(remote_recreate_timestamp)
            # recreate "local" broker
            broker.delete_db(next(ts))
            local_recreate_timestamp = next(ts)
            broker.update_put_timestamp(local_recreate_timestamp)
            broker.update_status_changed_at(local_recreate_timestamp)

    def test_sync_remote_create_policy_over_newer_local_create(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # create "local" broker
            broker.initialize(next(ts), policy.idx)

    def test_sync_remote_create_policy_over_newer_local_delete(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # delete "local" broker
            broker.delete_db(next(ts))

    def test_sync_remote_create_policy_over_older_local_delete(self):
        # local_row & both_rows cases are covered by
        # "test_sync_local_half_delete_policy_over_newer_remote_create"
        for setup in self._replication_scenarios(
                'no_row', 'remote_row', remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "local" broker
            broker.initialize(next(ts), policy.idx)
            # delete older "local" broker
            broker.delete_db(next(ts))
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)

    def test_sync_remote_half_delete_policy_over_newer_local_create(self):
        # no_row & both_rows cases are covered by
        # "test_sync_local_create_policy_over_older_remote_delete"
        for setup in self._replication_scenarios('remote_row', 'both_rows',
                                                 remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # half delete older "remote" broker
            remote_broker.delete_db(next(ts))
            # create "local" broker
            broker.initialize(next(ts), policy.idx)

    def test_sync_remote_recreate_policy_over_newer_local_create(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # older recreate "remote" broker
            remote_broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(recreate_timestamp)
            remote_broker.update_status_changed_at(recreate_timestamp)
            # create "local" broker
            broker.initialize(next(ts), policy.idx)

    def test_sync_remote_recreate_policy_over_older_local_create(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # recreate "remote" broker
            remote_broker.delete_db(next(ts))
            recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(recreate_timestamp)
            remote_broker.update_status_changed_at(recreate_timestamp)

    def test_sync_remote_recreate_policy_over_newer_local_delete(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # recreate "remote" broker
            remote_broker.delete_db(next(ts))
            remote_recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(remote_recreate_timestamp)
            remote_broker.update_status_changed_at(remote_recreate_timestamp)
            # older delete "local" broker
            broker.delete_db(next(ts))

    def test_sync_remote_recreate_policy_over_older_local_delete(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # older delete "local" broker
            broker.delete_db(next(ts))
            # recreate "remote" broker
            remote_broker.delete_db(next(ts))
            remote_recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(remote_recreate_timestamp)
            remote_broker.update_status_changed_at(remote_recreate_timestamp)

    def test_sync_remote_recreate_policy_over_older_local_recreate(self):
        for setup in self._replication_scenarios(remote_wins=True):
            ts, policy, remote_policy, broker, remote_broker = setup
            # create older "local" broker
            broker.initialize(next(ts), policy.idx)
            # create "remote" broker
            remote_broker.initialize(next(ts), remote_policy.idx)
            # older recreate "local" broker
            broker.delete_db(next(ts))
            local_recreate_timestamp = next(ts)
            broker.update_put_timestamp(local_recreate_timestamp)
            broker.update_status_changed_at(local_recreate_timestamp)
            # recreate "remote" broker
            remote_broker.delete_db(next(ts))
            remote_recreate_timestamp = next(ts)
            remote_broker.update_put_timestamp(remote_recreate_timestamp)
            remote_broker.update_status_changed_at(remote_recreate_timestamp)

    def test_sync_to_remote_with_misplaced(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        # create "local" broker
        policy = random.choice(list(POLICIES))
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(next(ts), policy.idx)

        # create "remote" broker
        remote_policy = random.choice([p for p in POLICIES if p is not
                                       policy])
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(next(ts), remote_policy.idx)
        # add misplaced row to remote_broker
        remote_broker.put_object(
            '/a/c/o', next(ts), 0, 'content-type',
            'etag', storage_policy_index=remote_broker.storage_policy_index)
        # since this row matches policy index of remote, it shows up in count
        self.assertEqual(remote_broker.get_info()['object_count'], 1)
        self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))

        # replicate
        part, node = self._get_broker_part_node(broker)
        daemon = self._run_once(node)
        # since our local broker has no rows to push it logs as no_change
        self.assertEqual(1, daemon.stats['no_change'])
        self.assertEqual(0, broker.get_info()['object_count'])

        # remote broker updates its policy index; this makes the remote
        # broker's object count change
        info = remote_broker.get_info()
        expectations = {
            'object_count': 0,
            'storage_policy_index': policy.idx,
        }
        for key, value in expectations.items():
            self.assertEqual(info[key], value)
        # but it also knows those objects are misplaced now
        misplaced = remote_broker.get_misplaced_since(-1, 100)
        self.assertEqual(len(misplaced), 1)

        # we also pushed out to node 3 with rsync
        self.assertEqual(1, daemon.stats['rsync'])
        third_broker = self._get_broker('a', 'c', node_index=2)
        info = third_broker.get_info()
        for key, value in expectations.items():
            self.assertEqual(info[key], value)

    def test_misplaced_rows_replicate_and_enqueue(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        policy = random.choice(list(POLICIES))
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(next(ts), policy.idx)
        remote_policy = random.choice([p for p in POLICIES if p is not
                                       policy])
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(next(ts), remote_policy.idx)

        # add a misplaced row to *local* broker
        obj_put_timestamp = next(ts)
        broker.put_object(
            'o', obj_put_timestamp, 0, 'content-type',
            'etag', storage_policy_index=remote_policy.idx)
        misplaced = broker.get_misplaced_since(-1, 1)
        self.assertEqual(len(misplaced), 1)
        # since this row is misplaced it doesn't show up in count
        self.assertEqual(broker.get_info()['object_count'], 0)

        # replicate
        part, node = self._get_broker_part_node(broker)
        daemon = self._run_once(node)
        # push to remote, and third node was missing (also maybe reconciler)
        # NOTE(review): ``2 < x <= 3`` only admits exactly 3 - confirm
        # whether a count of 2 was meant to be acceptable as well.
        self.assertTrue(2 < daemon.stats['rsync'] <= 3)

        # grab the rsynced instance of remote_broker
        remote_broker = self._get_broker('a', 'c', node_index=1)

        # remote has misplaced rows too now
        misplaced = remote_broker.get_misplaced_since(-1, 1)
        self.assertEqual(len(misplaced), 1)

        # and the correct policy_index and object_count
        info = remote_broker.get_info()
        expectations = {
            'object_count': 0,
            'storage_policy_index': policy.idx,
        }
        for key, value in expectations.items():
            self.assertEqual(info[key], value)

        # and we should have also enqueued these rows in the reconciler
        reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
        # but it may not be on the same node as us anymore though...
        reconciler = self._get_broker(reconciler.account,
                                      reconciler.container, node_index=0)
        self.assertEqual(reconciler.get_info()['object_count'], 1)
        objects = reconciler.list_objects_iter(
            1, '', None, None, None, None, storage_policy_index=0)
        self.assertEqual(len(objects), 1)
        expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
                    'application/x-put', obj_put_timestamp)
        self.assertEqual(objects[0], expected)

        # having safely enqueued to the reconciler we can advance
        # our sync pointer
        self.assertEqual(broker.get_reconciler_sync(), 1)

    def test_multiple_out_sync_reconciler_enqueue_normalize(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        policy = random.choice(list(POLICIES))
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(next(ts), policy.idx)
        remote_policy = random.choice([p for p in POLICIES if p is not
                                       policy])
        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_broker.initialize(next(ts), remote_policy.idx)

        # add some rows to brokers
        for db in (broker, remote_broker):
            for p in (policy, remote_policy):
                db.put_object('o-%s' % p.name, next(ts), 0, 'content-type',
                              'etag', storage_policy_index=p.idx)
            db._commit_puts()

        expected_policy_stats = {
            policy.idx: {'object_count': 1, 'bytes_used': 0},
            remote_policy.idx: {'object_count': 1, 'bytes_used': 0},
        }
        for db in (broker, remote_broker):
            policy_stats = db.get_policy_stats()
            self.assertEqual(policy_stats, expected_policy_stats)

        # each db has 2 rows, 4 total
        all_items = set()
        for db in (broker, remote_broker):
            items = db.get_items_since(-1, 4)
            all_items.update(
                (item['name'], item['created_at']) for item in items)
        self.assertEqual(4, len(all_items))

        # replicate both ways
        part, node = self._get_broker_part_node(broker)
        self._run_once(node)
        part, node = self._get_broker_part_node(remote_broker)
        self._run_once(node)

        # only the latest timestamps should survive
        most_recent_items = {}
        for name, timestamp in all_items:
            # timestamps are strings; default to '' so the comparison
            # stays str-vs-str (str-vs-int raises TypeError on Python 3)
            most_recent_items[name] = max(
                timestamp, most_recent_items.get(name, ''))
        self.assertEqual(2, len(most_recent_items))

        for db in (broker, remote_broker):
            items = db.get_items_since(-1, 4)
            self.assertEqual(len(items), len(most_recent_items))
            for item in items:
                self.assertEqual(most_recent_items[item['name']],
                                 item['created_at'])

        # and the reconciler also collapses updates
        reconciler_containers = set()
        for item in all_items:
            _name, timestamp = item
            reconciler_containers.add(
                get_reconciler_container_name(timestamp))

        reconciler_items = set()
        for reconciler_container in reconciler_containers:
            for node_index in range(3):
                reconciler = self._get_broker(MISPLACED_OBJECTS_ACCOUNT,
                                              reconciler_container,
                                              node_index=node_index)
                items = reconciler.get_items_since(-1, 4)
                reconciler_items.update(
                    (item['name'], item['created_at']) for item in items)
        # they can't *both* be in the wrong policy ;)
        self.assertEqual(1, len(reconciler_items))
        for reconciler_name, timestamp in reconciler_items:
            _policy_index, path = reconciler_name.split(':', 1)
            a, c, name = path.lstrip('/').split('/')
            self.assertEqual(most_recent_items[name], timestamp)

    @contextmanager
    def _wrap_update_reconciler_sync(self, broker, calls):
        """Record calls to ``broker.update_reconciler_sync`` for the context.

        While the context is active the broker's
        ``update_reconciler_sync`` is replaced by a wrapper that appends
        the positional-argument tuple of every call to ``calls`` and then
        delegates to the original. The original attribute is restored on
        exit, even on error.

        :param broker: object whose ``update_reconciler_sync`` is wrapped
        :param calls: list that receives the ``args`` tuple of each call
        """
        def wrapper_function(*args, **kwargs):
            calls.append(args)
            return orig_function(*args, **kwargs)

        orig_function = broker.update_reconciler_sync
        broker.update_reconciler_sync = wrapper_function
        try:
            yield True
        finally:
            broker.update_reconciler_sync = orig_function

    def test_post_replicate_hook(self):
        ts = (Timestamp(t).internal for t in
              itertools.count(int(time.time())))
        broker = self._get_broker('a', 'c', node_index=0)
        broker.initialize(next(ts), 0)
        broker.put_object('foo', next(ts), 0, 'text/plain', 'xyz', deleted=0,
                          storage_policy_index=0)
        info = broker.get_replication_info()
        self.assertEqual(1, info['max_row'])
        self.assertEqual(-1, broker.get_reconciler_sync())
        daemon = replicator.ContainerReplicator({})
        calls = []
        with self._wrap_update_reconciler_sync(broker, calls):
            daemon._post_replicate_hook(broker, info, [])
        self.assertEqual(1, len(calls))
        # repeated call to _post_replicate_hook with no change to info
        # should not call update_reconciler_sync
        calls = []
        with self._wrap_update_reconciler_sync(broker, calls):
            daemon._post_replicate_hook(broker, info, [])
        self.assertEqual(0, len(calls))


if __name__ == '__main__':
    unittest.main()