Merge container storage_policy_index

Keep status_changed_at in container databases current with status changes that
occur as a result of container creation, deletion, or re-creation.

Merge container put/delete/created timestamps when handling replicate
responses from remote servers in addition to during the handling of the
REPLICATE request.

When storage policies are configured on a cluster send status_changed_at,
object_count and storage_policy_index as part of container replication sync
args.

Use status_changed_at during replication to determine the oldest active
container and merge storage_policy_index.

DocImpact
Implements: blueprint storage-policies
Change-Id: Ib9a0dd42c271145e641437dc04d0ebea1e11fc47
This commit is contained in:
Clay Gerrard 2014-05-27 17:47:21 -07:00
parent 3dff1249f5
commit 81bc31e6ec
6 changed files with 951 additions and 81 deletions

View File

@ -21,17 +21,17 @@ import shutil
import uuid
import errno
import re
from contextlib import contextmanager
from swift import gettext_ as _
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import subprocess
import simplejson
import swift.common.db
from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_ip, ismount
unlink_older_than, dump_recon_cache, rsync_ip, ismount, json
from swift.common import ring
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
from swift.common.bufferedhttp import BufferedHTTPConnection
@ -129,7 +129,7 @@ class ReplConnection(BufferedHTTPConnection):
:returns: bufferedhttp response object
"""
try:
body = simplejson.dumps(args)
body = json.dumps(args)
self.request('REPLICATE', self.path, body,
{'Content-Type': 'application/json'})
response = self.getresponse()
@ -348,6 +348,14 @@ class Replicator(Daemon):
os.path.basename(db_file).split('.', 1)[0],
self.logger)
def _gather_sync_args(self, info):
"""
Convert local replication_info to sync args tuple.
"""
sync_args_order = ('max_row', 'hash', 'id', 'created_at',
'put_timestamp', 'delete_timestamp', 'metadata')
return tuple(info[key] for key in sync_args_order)
def _repl_to_node(self, node, broker, partition, info):
"""
Replicate a database to a node.
@ -367,21 +375,22 @@ class Replicator(Daemon):
self.logger.error(
_('ERROR Unable to connect to remote server: %s'), node)
return False
sync_args = self._gather_sync_args(info)
with Timeout(self.node_timeout):
response = http.replicate(
'sync', info['max_row'], info['hash'], info['id'],
info['created_at'], info['put_timestamp'],
info['delete_timestamp'], info['metadata'])
response = http.replicate('sync', *sync_args)
if not response:
return False
elif response.status == HTTP_NOT_FOUND: # completely missing, rsync
return self._handle_sync_response(node, response, info, broker, http)
def _handle_sync_response(self, node, response, info, broker, http):
if response.status == HTTP_NOT_FOUND: # completely missing, rsync
self.stats['rsync'] += 1
self.logger.increment('rsyncs')
return self._rsync_db(broker, node, http, info['id'])
elif response.status == HTTP_INSUFFICIENT_STORAGE:
raise DriveNotMounted()
elif response.status >= 200 and response.status < 300:
rinfo = simplejson.loads(response.data)
rinfo = json.loads(response.data)
local_sync = broker.get_sync(rinfo['id'], incoming=False)
if self._in_sync(rinfo, info, broker, local_sync):
return True
@ -416,17 +425,16 @@ class Replicator(Daemon):
broker.reclaim(now - self.reclaim_age,
now - (self.reclaim_age * 2))
info = broker.get_replication_info()
full_info = broker.get_info()
bpart = self.ring.get_part(
full_info['account'], full_info.get('container'))
info['account'], info.get('container'))
if bpart != int(partition):
partition = bpart
# Important to set this false here since the later check only
# checks if it's on the proper device, not partition.
shouldbehere = False
name = '/' + quote(full_info['account'])
if 'container' in full_info:
name += '/' + quote(full_info['container'])
name = '/' + quote(info['account'])
if 'container' in info:
name += '/' + quote(info['container'])
self.logger.error(
'Found %s for %s when it should be on partition %s; will '
'replicate out and remove.' % (object_file, name, bpart))
@ -454,7 +462,7 @@ class Replicator(Daemon):
if delete_timestamp < (now - self.reclaim_age) and \
delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
if self.report_up_to_date(full_info):
if self.report_up_to_date(info):
self.delete_db(object_file)
self.logger.timing_since('timing', start_time)
return
@ -597,55 +605,90 @@ class ReplicatorRpc(object):
return HTTPNotFound()
return getattr(self, op)(self.broker_class(db_file), args)
def sync(self, broker, args):
@contextmanager
def debug_timing(self, name):
timemark = time.time()
yield
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(
'replicator-rpc-sync time for %s: %.02fs' % (
name, timespan))
def _parse_sync_args(self, args):
"""
Convert remote sync args to remote_info dictionary.
"""
(remote_sync, hash_, id_, created_at, put_timestamp,
delete_timestamp, metadata) = args
timemark = time.time()
try:
info = broker.get_replication_info()
except (Exception, Timeout) as e:
if 'no such table' in str(e):
self.logger.error(_("Quarantining DB %s"), broker)
quarantine_db(broker.db_file, broker.db_type)
return HTTPNotFound()
raise
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug('replicator-rpc-sync time for info: %.02fs' %
timespan)
delete_timestamp, metadata) = args[:7]
remote_metadata = {}
if metadata:
timemark = time.time()
broker.update_metadata(simplejson.loads(metadata))
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug('replicator-rpc-sync time for '
'update_metadata: %.02fs' % timespan)
if info['put_timestamp'] != put_timestamp or \
info['created_at'] != created_at or \
info['delete_timestamp'] != delete_timestamp:
timemark = time.time()
broker.merge_timestamps(
created_at, put_timestamp, delete_timestamp)
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug('replicator-rpc-sync time for '
'merge_timestamps: %.02fs' % timespan)
timemark = time.time()
info['point'] = broker.get_sync(id_)
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug('replicator-rpc-sync time for get_sync: '
'%.02fs' % timespan)
if hash_ == info['hash'] and info['point'] < remote_sync:
timemark = time.time()
broker.merge_syncs([{'remote_id': id_,
'sync_point': remote_sync}])
info['point'] = remote_sync
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug('replicator-rpc-sync time for '
'merge_syncs: %.02fs' % timespan)
return Response(simplejson.dumps(info))
try:
remote_metadata = json.loads(metadata)
except ValueError:
self.logger.error("Unable to decode remote metadata %r",
metadata)
remote_info = {
'point': remote_sync,
'hash': hash_,
'id': id_,
'created_at': created_at,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'metadata': remote_metadata,
}
return remote_info
def sync(self, broker, args):
remote_info = self._parse_sync_args(args)
return self._handle_sync_request(broker, remote_info)
def _get_synced_replication_info(self, broker, remote_info):
"""
Apply any changes to the broker based on remote_info and return the
current replication info.
:param broker: the database broker
:param remote_info: the remote replication info
:returns: local broker replication info
"""
return broker.get_replication_info()
def _handle_sync_request(self, broker, remote_info):
"""
Update metadata, timestamps, sync points.
"""
with self.debug_timing('info'):
try:
info = self._get_synced_replication_info(broker, remote_info)
except (Exception, Timeout) as e:
if 'no such table' in str(e):
self.logger.error(_("Quarantining DB %s"), broker)
quarantine_db(broker.db_file, broker.db_type)
return HTTPNotFound()
raise
if remote_info['metadata']:
with self.debug_timing('update_metadata'):
broker.update_metadata(remote_info['metadata'])
sync_timestamps = ('created_at', 'put_timestamp', 'delete_timestamp')
if any(info[ts] != remote_info[ts] for ts in sync_timestamps):
with self.debug_timing('merge_timestamps'):
broker.merge_timestamps(*(remote_info[ts] for ts in
sync_timestamps))
with self.debug_timing('get_sync'):
info['point'] = broker.get_sync(remote_info['id'])
if remote_info['hash'] == info['hash'] and \
info['point'] < remote_info['point']:
with self.debug_timing('merge_syncs'):
translate = {
'remote_id': 'id',
'sync_point': 'point',
}
data = dict((k, remote_info[v]) for k, v in translate.items())
broker.merge_syncs([data])
info['point'] = remote_info['point']
return Response(json.dumps(info))
def merge_syncs(self, broker, args):
broker.merge_syncs(args[0])

View File

@ -13,8 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from swift.container.backend import ContainerBroker, DATADIR
from swift.container.reconciler import incorrect_policy_index
from swift.common import db_replicator
from swift.common.utils import json, normalize_timestamp
from swift.common.http import is_success
from swift.common.storage_policy import POLICIES
class ContainerReplicator(db_replicator.Replicator):
@ -29,3 +35,58 @@ class ContainerReplicator(db_replicator.Replicator):
if full_info['reported_' + key] != full_info[key]:
return False
return True
def _gather_sync_args(self, replication_info):
parent = super(ContainerReplicator, self)
sync_args = parent._gather_sync_args(replication_info)
if len(POLICIES) > 1:
sync_args += tuple(replication_info[k] for k in
('status_changed_at', 'count',
'storage_policy_index'))
return sync_args
def _handle_sync_response(self, node, response, info, broker, http):
parent = super(ContainerReplicator, self)
if is_success(response.status):
remote_info = json.loads(response.data)
if incorrect_policy_index(info, remote_info):
status_changed_at = normalize_timestamp(time.time())
broker.set_storage_policy_index(
remote_info['storage_policy_index'],
timestamp=status_changed_at)
broker.merge_timestamps(*(remote_info[key] for key in (
'created_at', 'put_timestamp', 'delete_timestamp')))
rv = parent._handle_sync_response(
node, response, info, broker, http)
return rv
class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
def _parse_sync_args(self, args):
parent = super(ContainerReplicatorRpc, self)
remote_info = parent._parse_sync_args(args)
if len(args) > 9:
remote_info['status_changed_at'] = args[7]
remote_info['count'] = args[8]
remote_info['storage_policy_index'] = args[9]
return remote_info
def _get_synced_replication_info(self, broker, remote_info):
"""
Sync the remote_info storage_policy_index if needed and return the
newly synced replication info.
:param broker: the database broker
:param remote_info: the remote replication info
:returns: local broker replication info
"""
info = broker.get_replication_info()
if incorrect_policy_index(info, remote_info):
status_changed_at = normalize_timestamp(time.time())
broker.set_storage_policy_index(
remote_info['storage_policy_index'],
timestamp=status_changed_at)
info = broker.get_replication_info()
return info

View File

@ -24,6 +24,7 @@ from eventlet import Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.container.replicator import ContainerReplicatorRpc
from swift.common.db import DatabaseAlreadyExists
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.request_helpers import get_param, get_listing_content_type, \
@ -36,7 +37,6 @@ from swift.common.constraints import check_mount, check_float, check_utf8
from swift.common import constraints
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.db_replicator import ReplicatorRpc
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
@ -102,7 +102,7 @@ class ContainerController(object):
h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.replicator_rpc = ReplicatorRpc(
self.replicator_rpc = ContainerReplicatorRpc(
self.root, DATADIR, ContainerBroker, self.mount_check,
logger=self.logger)
self.auto_create_account_prefix = \

View File

@ -20,11 +20,13 @@ from optparse import OptionParser
from urlparse import urlparse
import random
from nose import SkipTest
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES
from swift.common import utils, ring
from swift.common import utils, ring, direct_client
from swift.common.http import HTTP_NOT_FOUND
from test.probe.common import reset_environment
from test.probe.common import reset_environment, get_to_final_state
from swiftclient import client, get_auth, ClientException
@ -172,7 +174,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
def setUp(self):
if len(POLICIES) < 2:
raise unittest.SkipTest()
raise SkipTest()
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account, self.configs) = reset_environment()
@ -181,6 +183,40 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name)
def test_merge_storage_policy_index(self):
# generic split brain
self.brain.stop_primary_half()
self.brain.put_container()
self.brain.start_primary_half()
self.brain.stop_handoff_half()
self.brain.put_container()
self.brain.put_object()
self.brain.start_handoff_half()
# make sure we have some manner of split brain
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata['x-storage-policy-index'] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) > 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
get_to_final_state()
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata['x-storage-policy-index'] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) == 1,
'primary nodes disagree about policy index %r' %
head_responses)
def main():
options, commands = parser.parse_args()

View File

@ -21,14 +21,15 @@ import errno
import math
import time
from mock import patch, call
from shutil import rmtree
from shutil import rmtree, copy
from tempfile import mkdtemp, NamedTemporaryFile
import mock
import simplejson
from swift.container.backend import DATADIR
from swift.common import db_replicator
from swift.common.utils import normalize_timestamp
from swift.common.utils import (normalize_timestamp, hash_path,
storage_directory)
from swift.common.exceptions import DriveNotMounted
from swift.common.swob import HTTPException
@ -181,6 +182,7 @@ class FakeBroker(object):
get_repl_missing_table = False
stub_replication_info = None
db_type = 'container'
db_contains_type = 'object'
info = {'account': TEST_ACCOUNT_NAME, 'container': TEST_CONTAINER_NAME}
def __init__(self, *args, **kwargs):
@ -215,17 +217,21 @@ class FakeBroker(object):
def get_replication_info(self):
if self.get_repl_missing_table:
raise Exception('no such table')
info = dict(self.info)
info.update({
'hash': 12345,
'delete_timestamp': 0,
'put_timestamp': 1,
'created_at': 1,
'count': 0,
})
if self.stub_replication_info:
return self.stub_replication_info
return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0,
'hash': 12345, 'created_at': 1}
info.update(self.stub_replication_info)
return info
def reclaim(self, item_timestamp, sync_timestamp):
pass
def get_info(self):
return self.info
def newid(self, remote_d):
pass
@ -240,6 +246,7 @@ class FakeBroker(object):
class FakeAccountBroker(FakeBroker):
db_type = 'account'
db_contains_type = 'container'
info = {'account': TEST_ACCOUNT_NAME}
@ -578,7 +585,7 @@ class TestDBReplicator(unittest.TestCase):
try:
replicator.delete_db = self.stub_delete_db
replicator.brokerclass.stub_replication_info = {
'delete_timestamp': 2, 'put_timestamp': 1, 'count': 0}
'delete_timestamp': 2, 'put_timestamp': 1}
replicator._replicate_object('0', '/path/to/file', 'node_id')
finally:
replicator.brokerclass.stub_replication_info = None
@ -601,10 +608,10 @@ class TestDBReplicator(unittest.TestCase):
node_id = replicator.ring.get_part_nodes(part)[0]['id']
replicator._replicate_object(str(part), '/path/to/file', node_id)
self.assertEqual(['/path/to/file'], self.delete_db_calls)
self.assertEqual(
replicator.logger.log_dict['error'],
[(('Found /path/to/file for /a%20c%20t when it should be on '
'partition 0; will replicate out and remove.',), {})])
error_msgs = replicator.logger.get_lines_for_level('error')
expected = 'Found /path/to/file for /a%20c%20t when it should be ' \
'on partition 0; will replicate out and remove.'
self.assertEqual(error_msgs, [expected])
def test_replicate_container_out_of_place(self):
replicator = TestReplicator({}, logger=unit.FakeLogger())
@ -885,10 +892,14 @@ class TestDBReplicator(unittest.TestCase):
def test_replicator_sync_with_broker_replication_missing_table(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
rpc.logger = unit.debug_logger()
broker = FakeBroker()
broker.get_repl_missing_table = True
called = []
def mock_quarantine_db(object_file, server_type):
called.append(True)
self.assertEquals(broker.db_file, object_file)
self.assertEquals(broker.db_type, server_type)
@ -901,6 +912,11 @@ class TestDBReplicator(unittest.TestCase):
self.assertEquals('404 Not Found', response.status)
self.assertEquals(404, response.status_int)
self.assertEqual(called, [True])
errors = rpc.logger.get_lines_for_level('error')
self.assertEqual(errors,
["Unable to decode remote metadata 'metadata'",
"Quarantining DB %s" % broker])
def test_replicator_sync(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
@ -1223,5 +1239,110 @@ class TestReplToNode(unittest.TestCase):
self.fake_node, FakeBroker(), '0', self.fake_info), False)
class FakeHTTPResponse(object):
def __init__(self, resp):
self.resp = resp
@property
def status(self):
return self.resp.status_int
@property
def data(self):
return self.resp.body
def attach_fake_replication_rpc(rpc, replicate_hook=None):
class FakeReplConnection(object):
def __init__(self, node, partition, hash_, logger):
self.logger = logger
self.node = node
self.partition = partition
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
self.host = node['replication_ip']
def replicate(self, op, *sync_args):
print 'REPLICATE: %s, %s, %r' % (self.path, op, sync_args)
replicate_args = self.path.lstrip('/').split('/')
args = [op] + list(sync_args)
swob_response = rpc.dispatch(replicate_args, args)
resp = FakeHTTPResponse(swob_response)
if replicate_hook:
replicate_hook(op, *sync_args)
return resp
return FakeReplConnection
class TestReplicatorSync(unittest.TestCase):
backend = None # override in subclass
datadir = None
replicator_daemon = db_replicator.Replicator
replicator_rpc = db_replicator.ReplicatorRpc
def setUp(self):
self.root = mkdtemp()
self.rpc = self.replicator_rpc(
self.root, self.datadir, self.backend, False,
logger=unit.debug_logger())
FakeReplConnection = attach_fake_replication_rpc(self.rpc)
self._orig_ReplConnection = db_replicator.ReplConnection
db_replicator.ReplConnection = FakeReplConnection
self._orig_Ring = db_replicator.ring.Ring
self._ring = unit.FakeRing()
db_replicator.ring.Ring = lambda *args, **kwargs: self._get_ring()
self.logger = unit.debug_logger()
def tearDown(self):
db_replicator.ReplConnection = self._orig_ReplConnection
db_replicator.ring.Ring = self._orig_Ring
rmtree(self.root)
def _get_ring(self):
return self._ring
def _get_broker(self, account, container=None, node_index=0):
hash_ = hash_path(account, container)
part, nodes = self._ring.get_nodes(account, container)
drive = nodes[node_index]['device']
db_path = os.path.join(self.root, drive,
storage_directory(self.datadir, part, hash_),
hash_ + '.db')
return self.backend(db_path, account=account, container=container)
def _get_broker_part_node(self, broker):
part, nodes = self._ring.get_nodes(broker.account, broker.container)
storage_dir = broker.db_file[len(self.root):].lstrip(os.path.sep)
broker_device = storage_dir.split(os.path.sep, 1)[0]
for node in nodes:
if node['device'] == broker_device:
return part, node
def _run_once(self, node, conf_updates=None, daemon=None):
conf = {
'devices': self.root,
'recon_cache_path': self.root,
'mount_check': 'false',
'bind_port': node['replication_port'],
}
if conf_updates:
conf.update(conf_updates)
daemon = daemon or self.replicator_daemon(conf, logger=self.logger)
def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1)
dest_path = os.path.join(self.root, remote_path)
copy(db_file, dest_path)
return True
daemon._rsync_file = _rsync_file
with mock.patch('swift.common.db_replicator.whataremyips',
new=lambda: [node['replication_ip']]):
daemon.run_once()
return daemon
if __name__ == '__main__':
unittest.main()

View File

@ -13,9 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import shutil
import itertools
import unittest
from swift.container import replicator
import mock
import random
import sqlite3
from swift.common import db_replicator
from swift.container import replicator, backend, server
from swift.common.utils import normalize_timestamp
from swift.common.storage_policy import POLICIES
from test.unit.common import test_db_replicator
from test.unit import patch_policies
class TestReplicator(unittest.TestCase):
@ -56,5 +69,601 @@ class TestReplicator(unittest.TestCase):
self.assertTrue(repl.report_up_to_date(info))
@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
backend = backend.ContainerBroker
datadir = server.DATADIR
replicator_daemon = replicator.ContainerReplicator
replicator_rpc = replicator.ContainerReplicatorRpc
def test_sync_remote_in_sync(self):
# setup a local container
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = time.time()
broker.initialize(put_timestamp, POLICIES.default.idx)
# "replicate" to same database
node = {'device': 'sdb', 'replication_ip': '127.0.0.1'}
daemon = replicator.ContainerReplicator({})
# replicate
part, node = self._get_broker_part_node(broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
# nothing to do
self.assertTrue(success)
self.assertEqual(1, daemon.stats['no_change'])
def test_sync_remote_with_timings(self):
# setup a local container
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = time.time()
broker.initialize(put_timestamp, POLICIES.default.idx)
broker.update_metadata(
{'x-container-meta-test': ('foo', put_timestamp)})
# setup remote container
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(time.time(), POLICIES.default.idx)
timestamp = time.time()
for db in (broker, remote_broker):
db.put_object('/a/c/o', timestamp, 0, 'content-type', 'etag',
storage_policy_index=db.storage_policy_index)
# replicate
daemon = replicator.ContainerReplicator({})
part, node = self._get_broker_part_node(remote_broker)
info = broker.get_replication_info()
with mock.patch.object(db_replicator, 'DEBUG_TIMINGS_THRESHOLD', 0):
success = daemon._repl_to_node(node, broker, part, info)
# nothing to do
self.assertTrue(success)
self.assertEqual(1, daemon.stats['no_change'])
expected_timings = ('info', 'update_metadata', 'merge_timestamps',
'get_sync', 'merge_syncs')
debug_lines = self.rpc.logger.logger.get_lines_for_level('debug')
self.assertEqual(len(expected_timings), len(debug_lines))
for metric in expected_timings:
expected = 'replicator-rpc-sync time for %s:' % metric
self.assert_(any(expected in line for line in debug_lines),
'debug timing %r was not in %r' % (
expected, debug_lines))
def test_sync_remote_missing(self):
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = time.time()
broker.initialize(put_timestamp, POLICIES.default.idx)
# "replicate"
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
# complete rsync to all other nodes
self.assertEqual(2, daemon.stats['rsync'])
for i in range(1, 3):
remote_broker = self._get_broker('a', 'c', node_index=i)
self.assertTrue(os.path.exists(remote_broker.db_file))
remote_info = remote_broker.get_info()
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
def test_rsync_failure(self):
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = time.time()
broker.initialize(put_timestamp, POLICIES.default.idx)
# "replicate" to different device
daemon = replicator.ContainerReplicator({})
def _rsync_file(*args, **kwargs):
return False
daemon._rsync_file = _rsync_file
# replicate
part, local_node = self._get_broker_part_node(broker)
node = random.choice([n for n in self._ring.devs if n != local_node])
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
self.assertFalse(success)
def test_sync_remote_missing_most_rows(self):
put_timestamp = time.time()
# create "local" broker
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(put_timestamp, POLICIES.default.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
# add a row to "local" db
broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag',
storage_policy_index=broker.storage_policy_index)
#replicate
node = {'device': 'sdc', 'replication_ip': '127.0.0.1'}
daemon = replicator.ContainerReplicator({})
def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1)
dest_path = os.path.join(self.root, remote_path)
shutil.copy(db_file, dest_path)
return True
daemon._rsync_file = _rsync_file
part, node = self._get_broker_part_node(remote_broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
self.assertTrue(success)
# row merge
self.assertEqual(1, daemon.stats['remote_merge'])
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
def test_sync_remote_missing_one_rows(self):
put_timestamp = time.time()
# create "local" broker
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(put_timestamp, POLICIES.default.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
# add some rows to both db
for i in range(10):
put_timestamp = time.time()
for db in (broker, remote_broker):
path = '/a/c/o_%s' % i
db.put_object(path, put_timestamp, 0, 'content-type', 'etag',
storage_policy_index=db.storage_policy_index)
# now a row to the "local" broker only
broker.put_object('/a/c/o_missing', time.time(), 0,
'content-type', 'etag',
storage_policy_index=broker.storage_policy_index)
# replicate
daemon = replicator.ContainerReplicator({})
part, node = self._get_broker_part_node(remote_broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
self.assertTrue(success)
# row merge
self.assertEqual(1, daemon.stats['diff'])
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
def test_sync_remote_can_not_keep_up(self):
put_timestamp = time.time()
# create "local" broker
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(put_timestamp, POLICIES.default.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
# add some rows to both db's
for i in range(10):
put_timestamp = time.time()
for db in (broker, remote_broker):
obj_name = 'o_%s' % i
db.put_object(obj_name, put_timestamp, 0,
'content-type', 'etag',
storage_policy_index=db.storage_policy_index)
# setup REPLICATE callback to simulate adding rows during merge_items
missing_counter = itertools.count()
def put_more_objects(op, *args):
if op != 'merge_items':
return
path = '/a/c/o_missing_%s' % missing_counter.next()
broker.put_object(path, time.time(), 0, 'content-type', 'etag',
storage_policy_index=db.storage_policy_index)
test_db_replicator.FakeReplConnection = \
test_db_replicator.attach_fake_replication_rpc(
self.rpc, replicate_hook=put_more_objects)
db_replicator.ReplConnection = test_db_replicator.FakeReplConnection
# and add one extra to local db to trigger merge_items
put_more_objects('merge_items')
# limit number of times we'll call merge_items
daemon = replicator.ContainerReplicator({'max_diffs': 10})
# replicate
part, node = self._get_broker_part_node(remote_broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
self.assertFalse(success)
# back off on the PUTs during replication...
FakeReplConnection = test_db_replicator.attach_fake_replication_rpc(
self.rpc, replicate_hook=None)
db_replicator.ReplConnection = FakeReplConnection
# retry replication
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
self.assertTrue(success)
# row merge
self.assertEqual(2, daemon.stats['diff'])
self.assertEqual(1, daemon.stats['diff_capped'])
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
def test_sync_status_change(self):
# setup a local container
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = time.time()
broker.initialize(put_timestamp, POLICIES.default.idx)
# setup remote container
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
# delete local container
broker.delete_db(time.time())
# replicate
daemon = replicator.ContainerReplicator({})
part, node = self._get_broker_part_node(remote_broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(node, broker, part, info)
# nothing to do
self.assertTrue(success)
self.assertEqual(1, daemon.stats['no_change'])
# status in sync
self.assertTrue(remote_broker.is_deleted())
info = broker.get_info()
remote_info = remote_broker.get_info()
self.assert_(float(remote_info['status_changed_at']) >
float(remote_info['put_timestamp']),
'remote status_changed_at (%s) is not '
'greater than put_timestamp (%s)' % (
remote_info['status_changed_at'],
remote_info['put_timestamp']))
self.assert_(float(remote_info['status_changed_at']) >
float(info['status_changed_at']),
'remote status_changed_at (%s) is not '
'greater than local status_changed_at (%s)' % (
remote_info['status_changed_at'],
info['status_changed_at']))
def test_sync_bogus_db_quarantines(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
policy = random.choice(list(POLICIES))
# create "local" broker
local_broker = self._get_broker('a', 'c', node_index=0)
local_broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(ts.next(), policy.idx)
db_path = local_broker.db_file
self.assertTrue(os.path.exists(db_path)) # sanity check
old_inode = os.stat(db_path).st_ino
_orig_get_info = backend.ContainerBroker.get_info
def fail_like_bad_db(broker):
if broker.db_file == local_broker.db_file:
raise sqlite3.OperationalError("no such table: container_info")
else:
return _orig_get_info(broker)
part, node = self._get_broker_part_node(remote_broker)
with mock.patch('swift.container.backend.ContainerBroker.get_info',
fail_like_bad_db):
# Have the remote node replicate to local; local should see its
# corrupt DB, quarantine it, and act like the DB wasn't ever there
# in the first place.
daemon = self._run_once(node)
self.assertTrue(os.path.exists(db_path))
# Make sure we didn't just keep the old DB, but quarantined it and
# made a fresh copy.
new_inode = os.stat(db_path).st_ino
self.assertNotEqual(old_inode, new_inode)
self.assertEqual(daemon.stats['failure'], 0)
def _replication_scenarios(self, *scenarios, **kwargs):
remote_wins = kwargs.get('remote_wins', False)
# these tests are duplicated because of the differences in replication
# when row counts cause full rsync vs. merge
scenarios = scenarios or (
'no_row', 'local_row', 'remote_row', 'both_rows')
for scenario_name in scenarios:
ts = itertools.count(int(time.time()))
policy = random.choice(list(POLICIES))
remote_policy = random.choice(
[p for p in POLICIES if p is not policy])
broker = self._get_broker('a', 'c', node_index=0)
remote_broker = self._get_broker('a', 'c', node_index=1)
yield ts, policy, remote_policy, broker, remote_broker
# variations on different replication scenarios
variations = {
'no_row': (),
'local_row': (broker,),
'remote_row': (remote_broker,),
'both_rows': (broker, remote_broker),
}
dbs = variations[scenario_name]
obj_ts = ts.next()
for db in dbs:
db.put_object('/a/c/o', obj_ts, 0, 'content-type', 'etag',
storage_policy_index=db.storage_policy_index)
# replicate
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
self.assertEqual(0, daemon.stats['failure'])
# in sync
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
if remote_wins:
expected = remote_policy.idx
err = 'local policy did not change to match remote ' \
'for replication row scenario %s' % scenario_name
else:
expected = policy.idx
err = 'local policy changed to match remote ' \
'for replication row scenario %s' % scenario_name
self.assertEqual(local_info['storage_policy_index'], expected, err)
self.assertEqual(remote_info['storage_policy_index'],
local_info['storage_policy_index'])
test_db_replicator.TestReplicatorSync.tearDown(self)
test_db_replicator.TestReplicatorSync.setUp(self)
def test_sync_local_create_policy_over_newer_remote_create(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
def test_sync_local_create_policy_over_newer_remote_delete(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create older "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# delete "remote" broker
remote_broker.delete_db(ts.next())
def test_sync_local_create_policy_over_older_remote_delete(self):
# remote_row & both_rows cases are covered by
# "test_sync_remote_half_delete_policy_over_newer_local_create"
for setup in self._replication_scenarios(
'no_row', 'local_row'):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# delete older "remote" broker
remote_broker.delete_db(ts.next())
# create "local" broker
broker.initialize(ts.next(), policy.idx)
def test_sync_local_half_delete_policy_over_newer_remote_create(self):
# no_row & remote_row cases are covered by
# "test_sync_remote_create_policy_over_older_local_delete"
for setup in self._replication_scenarios('local_row', 'both_rows'):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "local" broker
broker.initialize(ts.next(), policy.idx)
# half delete older "local" broker
broker.delete_db(ts.next())
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
def test_sync_local_recreate_policy_over_newer_remote_create(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# older recreate "local" broker
broker.delete_db(ts.next())
recreate_timestamp = ts.next()
broker.update_put_timestamp(recreate_timestamp)
broker.update_status_changed_at(recreate_timestamp)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
def test_sync_local_recreate_policy_over_older_remote_create(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create older "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# recreate "local" broker
broker.delete_db(ts.next())
recreate_timestamp = ts.next()
broker.update_put_timestamp(recreate_timestamp)
broker.update_status_changed_at(recreate_timestamp)
def test_sync_local_recreate_policy_over_newer_remote_delete(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# recreate "local" broker
broker.delete_db(ts.next())
recreate_timestamp = ts.next()
broker.update_put_timestamp(recreate_timestamp)
broker.update_status_changed_at(recreate_timestamp)
# older delete "remote" broker
remote_broker.delete_db(ts.next())
def test_sync_local_recreate_policy_over_older_remote_delete(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# older delete "remote" broker
remote_broker.delete_db(ts.next())
# recreate "local" broker
broker.delete_db(ts.next())
recreate_timestamp = ts.next()
broker.update_put_timestamp(recreate_timestamp)
broker.update_status_changed_at(recreate_timestamp)
def test_sync_local_recreate_policy_over_older_remote_recreate(self):
for setup in self._replication_scenarios():
ts, policy, remote_policy, broker, remote_broker = setup
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# older recreate "remote" broker
remote_broker.delete_db(ts.next())
remote_recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(remote_recreate_timestamp)
remote_broker.update_status_changed_at(remote_recreate_timestamp)
# recreate "local" broker
broker.delete_db(ts.next())
local_recreate_timestamp = ts.next()
broker.update_put_timestamp(local_recreate_timestamp)
broker.update_status_changed_at(local_recreate_timestamp)
def test_sync_remote_create_policy_over_newer_local_create(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# create "local" broker
broker.initialize(ts.next(), policy.idx)
def test_sync_remote_create_policy_over_newer_local_delete(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# delete "local" broker
broker.delete_db(ts.next())
def test_sync_remote_create_policy_over_older_local_delete(self):
# local_row & both_rows cases are covered by
# "test_sync_local_half_delete_policy_over_newer_remote_create"
for setup in self._replication_scenarios(
'no_row', 'remote_row', remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "local" broker
broker.initialize(ts.next(), policy.idx)
# delete older "local" broker
broker.delete_db(ts.next())
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
def test_sync_remote_half_delete_policy_over_newer_local_create(self):
# no_row & both_rows cases are covered by
# "test_sync_local_create_policy_over_older_remote_delete"
for setup in self._replication_scenarios('remote_row', 'both_rows',
remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# half delete older "remote" broker
remote_broker.delete_db(ts.next())
# create "local" broker
broker.initialize(ts.next(), policy.idx)
def test_sync_remote_recreate_policy_over_newer_local_create(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# older recreate "remote" broker
remote_broker.delete_db(ts.next())
recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(recreate_timestamp)
remote_broker.update_status_changed_at(recreate_timestamp)
# create "local" broker
broker.initialize(ts.next(), policy.idx)
def test_sync_remote_recreate_policy_over_older_local_create(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# recreate "remote" broker
remote_broker.delete_db(ts.next())
recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(recreate_timestamp)
remote_broker.update_status_changed_at(recreate_timestamp)
def test_sync_remote_recreate_policy_over_newer_local_delete(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# recreate "remote" broker
remote_broker.delete_db(ts.next())
remote_recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(remote_recreate_timestamp)
remote_broker.update_status_changed_at(remote_recreate_timestamp)
# older delete "local" broker
broker.delete_db(ts.next())
def test_sync_remote_recreate_policy_over_older_local_delete(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# older delete "local" broker
broker.delete_db(ts.next())
# recreate "remote" broker
remote_broker.delete_db(ts.next())
remote_recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(remote_recreate_timestamp)
remote_broker.update_status_changed_at(remote_recreate_timestamp)
def test_sync_remote_recreate_policy_over_older_local_recreate(self):
for setup in self._replication_scenarios(remote_wins=True):
ts, policy, remote_policy, broker, remote_broker = setup
# create older "local" broker
broker.initialize(ts.next(), policy.idx)
# create "remote" broker
remote_broker.initialize(ts.next(), remote_policy.idx)
# older recreate "local" broker
broker.delete_db(ts.next())
local_recreate_timestamp = ts.next()
broker.update_put_timestamp(local_recreate_timestamp)
broker.update_status_changed_at(local_recreate_timestamp)
# recreate "remote" broker
remote_broker.delete_db(ts.next())
remote_recreate_timestamp = ts.next()
remote_broker.update_put_timestamp(remote_recreate_timestamp)
remote_broker.update_status_changed_at(remote_recreate_timestamp)
if __name__ == '__main__':
unittest.main()