Enqueue misplaced objects during container replication

After a container database is replicated, a _post_replicate_hook will enqueue
misplaced objects for the container-reconciler into the .misplaced_objects
containers.  Items to be reconciled are "batch loaded" into the reconciler
queue and the end of a container replication cycle by levering container
replication itself.

DocImpact
Implements: blueprint storage-policies
Change-Id: I3627efcdea75403586dffee46537a60add08bfda
This commit is contained in:
Clay Gerrard 2014-04-29 00:21:08 -07:00
parent 6cc10d17de
commit a14d2c857c
6 changed files with 769 additions and 13 deletions

View File

@ -156,6 +156,7 @@ class Replicator(Daemon):
self.cpool = GreenPool(size=concurrency)
swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
self._local_device_ids = set()
self.per_diff = int(conf.get('per_diff', 1000))
self.max_diffs = int(conf.get('max_diffs') or 100)
self.interval = int(conf.get('interval') or
@ -406,6 +407,14 @@ class Replicator(Daemon):
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
def _post_replicate_hook(self, broker, info, responses):
"""
:param broker: the container that just replicated
:param info: pre-replication full info dict
:param responses: a list of bools indicating success from nodes
"""
pass
def _replicate_object(self, partition, object_file, node_id):
"""
Replicate the db, choosing method based on whether or not it
@ -490,13 +499,19 @@ class Replicator(Daemon):
self.stats['success' if success else 'failure'] += 1
self.logger.increment('successes' if success else 'failures')
responses.append(success)
try:
self._post_replicate_hook(broker, info, responses)
except (Exception, Timeout):
self.logger.exception('UNHANDLED EXCEPTION: in post replicate '
'hook for %s', broker.db_file)
if not shouldbehere and all(responses):
# If the db shouldn't be on this node and has been successfully
# synced to all of its peers, it can be removed.
self.delete_db(object_file)
self.delete_db(broker)
self.logger.timing_since('timing', start_time)
def delete_db(self, object_file):
def delete_db(self, broker):
object_file = broker.db_file
hash_dir = os.path.dirname(object_file)
suf_dir = os.path.dirname(hash_dir)
with lock_parent_directory(object_file):
@ -534,6 +549,7 @@ class Replicator(Daemon):
if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?'))
return
self._local_device_ids = set()
for node in self.ring.devs:
if (node and node['replication_ip'] in ips and
node['replication_port'] == self.port):
@ -547,6 +563,7 @@ class Replicator(Daemon):
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
self._local_device_ids.add(node['id'])
dirs.append((datadir, node['id']))
self.logger.info(_('Beginning replication run'))
for part, object_file, node_id in roundrobin_datadirs(dirs):

View File

@ -69,6 +69,10 @@ class DiskFileDeviceUnavailable(DiskFileError):
pass
class DeviceUnavailable(SwiftException):
pass
class PathNotDir(OSError):
pass

View File

@ -13,14 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import itertools
import time
from collections import defaultdict
from eventlet import Timeout
from swift.container.backend import ContainerBroker, DATADIR
from swift.container.reconciler import incorrect_policy_index
from swift.container.reconciler import (
MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
get_reconciler_container_name, get_row_to_q_entry_translater)
from swift.common import db_replicator
from swift.common.utils import json, normalize_timestamp
from swift.common.http import is_success
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DeviceUnavailable
from swift.common.http import is_success
from swift.common.db import DatabaseAlreadyExists
from swift.common.utils import (json, normalize_timestamp, hash_path,
storage_directory, quorum_size)
class ContainerReplicator(db_replicator.Replicator):
@ -60,6 +69,169 @@ class ContainerReplicator(db_replicator.Replicator):
node, response, info, broker, http)
return rv
def find_local_handoff_for_part(self, part):
"""
Look through devices in the ring for the first handoff devie that was
identified during job creation as available on this node.
:returns: a node entry from the ring
"""
nodes = self.ring.get_part_nodes(part)
more_nodes = self.ring.get_more_nodes(part)
for node in itertools.chain(nodes, more_nodes):
if node['id'] in self._local_device_ids:
return node
return None
def get_reconciler_broker(self, timestamp):
"""
Get a local instance of the reconciler container broker that is
appropriate to enqueue the given timestamp.
:param timestamp: the timestamp of the row to be enqueued
:returns: a local reconciler broker
"""
container = get_reconciler_container_name(timestamp)
if self.reconciler_containers and \
container in self.reconciler_containers:
return self.reconciler_containers[container][1]
account = MISPLACED_OBJECTS_ACCOUNT
part = self.ring.get_part(account, container)
node = self.find_local_handoff_for_part(part)
if not node:
raise DeviceUnavailable(
'No mounted devices found suitable to Handoff reconciler '
'container %s in partition %s' % (container, part))
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, node['device'], db_dir, hsh + '.db')
broker = ContainerBroker(db_path, account=account, container=container)
if not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp, 0)
except DatabaseAlreadyExists:
pass
if self.reconciler_containers is not None:
self.reconciler_containers[container] = part, broker, node['id']
return broker
def feed_reconciler(self, container, item_list):
"""
Add queue entries for rows in item_list to the local reconciler
container database.
:param container: the name of the reconciler container
:param item_list: the list of rows to enqueue
:returns: True if successfully enqueued
"""
try:
reconciler = self.get_reconciler_broker(container)
except DeviceUnavailable as e:
self.logger.warning('DeviceUnavailable: %s', e)
return False
self.logger.debug('Adding %d objects to the reconciler at %s',
len(item_list), reconciler.db_file)
try:
reconciler.merge_items(item_list)
except (Exception, Timeout):
self.logger.exception('UNHANDLED EXCEPTION: trying to merge '
'%d items to reconciler container %s',
len(item_list), reconciler.db_file)
return False
return True
def dump_to_reconciler(self, broker, point):
"""
Look for object rows for objects updates in the wrong storage policy
in broker with a ``ROWID`` greater than the rowid given as point.
:param broker: the container broker with misplaced objects
:param point: the last verified ``reconciler_sync_point``
:returns: the last successfull enqueued rowid
"""
max_sync = broker.get_max_row()
misplaced = broker.get_misplaced_since(point, self.per_diff)
if not misplaced:
return max_sync
translater = get_row_to_q_entry_translater(broker)
errors = False
low_sync = point
while misplaced:
batches = defaultdict(list)
for item in misplaced:
container = get_reconciler_container_name(item['created_at'])
batches[container].append(translater(item))
for container, item_list in batches.items():
success = self.feed_reconciler(container, item_list)
if not success:
errors = True
point = misplaced[-1]['ROWID']
if not errors:
low_sync = point
misplaced = broker.get_misplaced_since(point, self.per_diff)
return low_sync
def _post_replicate_hook(self, broker, info, responses):
if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
return
if not broker.has_multiple_policies():
broker.update_reconciler_sync(info['max_row'])
return
point = broker.get_reconciler_sync()
max_sync = self.dump_to_reconciler(broker, point)
success = responses.count(True) >= quorum_size(len(responses))
if max_sync > point and success:
# to be safe, only slide up the sync point with a quorum on
# replication
broker.update_reconciler_sync(max_sync)
def delete_db(self, broker):
"""
Ensure that reconciler databases are only cleaned up at the end of the
replication run.
"""
if (self.reconciler_cleanups is not None and
broker.account == MISPLACED_OBJECTS_ACCOUNT):
# this container shouldn't be here, make sure it's cleaned up
self.reconciler_cleanups[broker.container] = broker
return
return super(ContainerReplicator, self).delete_db(broker)
def replicate_reconcilers(self):
"""
Ensure any items merged to reconciler containers during replication
are pushed out to correct nodes and any reconciler containers that do
not belong on this node are removed.
"""
self.logger.info('Replicating %d reconciler containers',
len(self.reconciler_containers))
for part, reconciler, node_id in self.reconciler_containers.values():
self.cpool.spawn_n(
self._replicate_object, part, reconciler.db_file, node_id)
self.cpool.waitall()
# wipe out the cache do disable bypass in delete_db
cleanups = self.reconciler_cleanups
self.reconciler_cleanups = self.reconciler_containers = None
self.logger.info('Cleaning up %d reconciler containers',
len(cleanups))
for reconciler in cleanups.values():
self.cpool.spawn_n(self.delete_db, reconciler)
self.cpool.waitall()
self.logger.info('Finished reconciler replication')
def run_once(self, *args, **kwargs):
self.reconciler_containers = {}
self.reconciler_cleanups = {}
rv = super(ContainerReplicator, self).run_once(*args, **kwargs)
if any([self.reconciler_containers, self.reconciler_cleanups]):
self.replicate_reconcilers()
return rv
class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):

View File

@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from hashlib import md5
import sys
import itertools
import time
import unittest
import uuid
from optparse import OptionParser
@ -23,8 +25,9 @@ import random
from nose import SkipTest
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES
from swift.common import utils, ring, direct_client
from swift.common.internal_client import InternalClient
from swift.common import utils, direct_client, ring
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.http import HTTP_NOT_FOUND
from test.probe.common import reset_environment, get_to_final_state
@ -200,23 +203,378 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata['x-storage-policy-index'] for
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) > 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
# find our object
orig_policy_index = None
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
except direct_client.ClientException as err:
continue
orig_policy_index = policy_index
break
if orig_policy_index is not None:
break
else:
self.fail('Unable to find /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
get_to_final_state()
Manager(['container-reconciler']).once()
# validate containers
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata['x-storage-policy-index'] for
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) == 1,
'primary nodes disagree about policy index %r' %
head_responses)
expected_policy_index = found_policy_indexes.pop()
self.assertNotEqual(orig_policy_index, expected_policy_index)
# validate object placement
orig_policy_ring = POLICIES.get_object_ring(orig_policy_index,
'/etc/swift')
for node in orig_policy_ring.devs:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={
POLICY_INDEX: orig_policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
raise
else:
self.fail('Found /%s/%s/%s in %s' % (
self.account, self.container_name, self.object_name,
orig_policy_index))
# use proxy to access object (bad container info might be cached...)
timeout = time.time() + TIMEOUT
while time.time() < timeout:
try:
metadata = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
time.sleep(1)
else:
break
else:
self.fail('could not HEAD /%s/%s/%s/ from policy %s '
'after %s seconds.' % (
self.account, self.container_name, self.object_name,
expected_policy_index, TIMEOUT))
def test_reconcile_delete(self):
# generic split brain
self.brain.stop_primary_half()
self.brain.put_container()
self.brain.put_object()
self.brain.start_primary_half()
self.brain.stop_handoff_half()
self.brain.put_container()
self.brain.delete_object()
self.brain.start_handoff_half()
# make sure we have some manner of split brain
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) > 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
# find our object
orig_policy_index = ts_policy_index = None
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
except direct_client.ClientException as err:
if 'x-backend-timestamp' in err.http_headers:
ts_policy_index = policy_index
break
else:
orig_policy_index = policy_index
break
if not orig_policy_index:
self.fail('Unable to find /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
if not ts_policy_index:
self.fail('Unable to find tombstone /%s/%s/%s in %r' % (
self.account, self.container_name, self.object_name,
found_policy_indexes))
get_to_final_state()
Manager(['container-reconciler']).once()
# validate containers
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
new_found_policy_indexes = set(metadata[POLICY_INDEX] for node,
metadata in head_responses)
self.assert_(len(new_found_policy_indexes) == 1,
'primary nodes disagree about policy index %r' %
dict((node['port'], metadata[POLICY_INDEX])
for node, metadata in head_responses))
expected_policy_index = new_found_policy_indexes.pop()
self.assertEqual(orig_policy_index, expected_policy_index)
# validate object fully deleted
for policy_index in found_policy_indexes:
object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, self.object_name)
for node in nodes:
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
else:
self.fail('Found /%s/%s/%s in %s on %s' % (
self.account, self.container_name, self.object_name,
orig_policy_index, node))
def test_reconcile_manifest(self):
manifest_data = []
def write_part(i):
body = 'VERIFY%0.2d' % i + '\x00' * 1048576
part_name = 'manifest_part_%0.2d' % i
manifest_entry = {
"path": "/%s/%s" % (self.container_name, part_name),
"etag": md5(body).hexdigest(),
"size_bytes": len(body),
}
client.put_object(self.url, self.token, self.container_name,
part_name, contents=body)
manifest_data.append(manifest_entry)
# get an old container stashed
self.brain.stop_primary_half()
policy = random.choice(list(POLICIES))
self.brain.put_container(policy.idx)
self.brain.start_primary_half()
# write some parts
for i in range(10):
write_part(i)
self.brain.stop_handoff_half()
wrong_policy = random.choice([p for p in POLICIES if p is not policy])
self.brain.put_container(wrong_policy.idx)
# write some more parts
for i in range(10, 20):
write_part(i)
# write manifest
try:
client.put_object(self.url, self.token, self.container_name,
self.object_name,
contents=utils.json.dumps(manifest_data),
query_string='multipart-manifest=put')
except ClientException as err:
# so as it works out, you can't really upload a multi-part
# manifest for objects that are currently misplaced - you have to
# wait until they're all available - which is about the same as
# some other failure that causes data to be unavailable to the
# proxy at the time of upload
self.assertEqual(err.http_status, 400)
# but what the heck, we'll sneak one in just to see what happens...
direct_manifest_name = self.object_name + '-direct-test'
object_ring = POLICIES.get_object_ring(wrong_policy.idx, '/etc/swift')
part, nodes = object_ring.get_nodes(
self.account, self.container_name, direct_manifest_name)
container_part = self.container_ring.get_part(self.account,
self.container_name)
def translate_direct(data):
return {
'hash': data['etag'],
'bytes': data['size_bytes'],
'name': data['path'],
}
direct_manifest_data = map(translate_direct, manifest_data)
headers = {
'x-container-host': ','.join('%s:%s' % (n['ip'], n['port']) for n
in self.container_ring.devs),
'x-container-device': ','.join(n['device'] for n in
self.container_ring.devs),
'x-container-partition': container_part,
POLICY_INDEX: wrong_policy.idx,
'X-Static-Large-Object': 'True',
}
for node in nodes:
direct_client.direct_put_object(
node, part, self.account, self.container_name,
direct_manifest_name,
contents=utils.json.dumps(direct_manifest_data),
headers=headers)
break # one should do it...
self.brain.start_handoff_half()
get_to_final_state()
Manager(['container-reconciler']).once()
# clear proxy cache
client.post_container(self.url, self.token, self.container_name, {})
# let's see how that direct upload worked out...
metadata, body = client.get_object(
self.url, self.token, self.container_name, direct_manifest_name,
query_string='multipart-manifest=get')
self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
for i, entry in enumerate(utils.json.loads(body)):
for key in ('hash', 'bytes', 'name'):
self.assertEquals(entry[key], direct_manifest_data[i][key])
metadata, body = client.get_object(
self.url, self.token, self.container_name, direct_manifest_name)
self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
self.assertEqual(int(metadata['content-length']),
sum(part['size_bytes'] for part in manifest_data))
self.assertEqual(body, ''.join('VERIFY%0.2d' % i + '\x00' * 1048576
for i in range(20)))
# and regular upload should work now too
client.put_object(self.url, self.token, self.container_name,
self.object_name,
contents=utils.json.dumps(manifest_data),
query_string='multipart-manifest=put')
metadata = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
self.assertEqual(int(metadata['content-length']),
sum(part['size_bytes'] for part in manifest_data))
def test_reconciler_move_object_twice(self):
# select some policies
old_policy = random.choice(list(POLICIES))
new_policy = random.choice([p for p in POLICIES if p != old_policy])
# setup a split brain
self.brain.stop_handoff_half()
# get old_policy on two primaries
self.brain.put_container(policy_index=int(old_policy))
self.brain.start_handoff_half()
self.brain.stop_primary_half()
# force a recreate on handoffs
self.brain.put_container(policy_index=int(old_policy))
self.brain.delete_container()
self.brain.put_container(policy_index=int(new_policy))
self.brain.put_object() # populate memcache with new_policy
self.brain.start_primary_half()
# at this point two primaries have old policy
container_part, container_nodes = self.container_ring.get_nodes(
self.account, self.container_name)
head_responses = []
for node in container_nodes:
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
old_container_node_ids = [
node['id'] for node, metadata in head_responses
if int(old_policy) ==
int(metadata['X-Backend-Storage-Policy-Index'])]
self.assertEqual(2, len(old_container_node_ids))
# hopefully memcache still has the new policy cached
self.brain.put_object()
# double-check object correctly written to new policy
conf_files = []
for server in Manager(['container-reconciler']).servers:
conf_files.extend(server.conf_files())
conf_file = conf_files[0]
client = InternalClient(conf_file, 'probe-test', 3)
client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# shutdown the containers that know about the new policy
self.brain.stop_handoff_half()
# and get rows enqueued from old nodes
for server_type in ('container-replicator', 'container-updater'):
server = Manager([server_type])
tuple(server.once(number=n + 1) for n in old_container_node_ids)
# verify entry in the queue for the "misplaced" new_policy
for container in client.iter_containers('.misplaced_objects'):
for obj in client.iter_objects('.misplaced_objects',
container['name']):
expected = '%d:/%s/%s/%s' % (new_policy, self.account,
self.container_name,
self.object_name)
self.assertEqual(obj['name'], expected)
Manager(['container-reconciler']).once()
# verify object in old_policy
client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# verify object is *not* in new_policy
client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
get_to_final_state()
# verify entry in the queue
client = InternalClient(conf_file, 'probe-test', 3)
for container in client.iter_containers('.misplaced_objects'):
for obj in client.iter_objects('.misplaced_objects',
container['name']):
expected = '%d:/%s/%s/%s' % (old_policy, self.account,
self.container_name,
self.object_name)
self.assertEqual(obj['name'], expected)
Manager(['container-reconciler']).once()
# and now it flops back
client.get_object_metadata(
self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
def main():
options, commands = parser.parse_args()

View File

@ -274,8 +274,8 @@ class TestDBReplicator(unittest.TestCase):
self._patchers.append(patcher)
return patched_thing
def stub_delete_db(self, object_file):
self.delete_db_calls.append(object_file)
def stub_delete_db(self, broker):
self.delete_db_calls.append('/path/to/file')
def test_repl_connection(self):
node = {'replication_ip': '127.0.0.1', 'replication_port': 80,
@ -657,7 +657,8 @@ class TestDBReplicator(unittest.TestCase):
self.assertTrue(os.path.exists(temp_file2.name))
self.assertEqual(0, replicator.stats['remove'])
replicator.delete_db(temp_file.name)
temp_file.db_file = temp_file.name
replicator.delete_db(temp_file)
self.assertTrue(os.path.exists(temp_dir))
self.assertTrue(os.path.exists(temp_suf_dir))
@ -669,7 +670,8 @@ class TestDBReplicator(unittest.TestCase):
replicator.logger.log_dict['increment'])
self.assertEqual(1, replicator.stats['remove'])
replicator.delete_db(temp_file2.name)
temp_file2.db_file = temp_file2.name
replicator.delete_db(temp_file2)
self.assertTrue(os.path.exists(temp_dir))
self.assertFalse(os.path.exists(temp_suf_dir))
@ -1343,6 +1345,19 @@ class TestReplicatorSync(unittest.TestCase):
daemon.run_once()
return daemon
def test_local_ids(self):
if self.datadir is None:
# base test case
return
for drive in ('sda', 'sdb', 'sdd'):
os.makedirs(os.path.join(self.root, drive, self.datadir))
for node in self._ring.devs:
daemon = self._run_once(node)
if node['device'] == 'sdc':
self.assertEqual(daemon._local_device_ids, set())
else:
self.assertEqual(daemon._local_device_ids,
set([node['id']]))
if __name__ == '__main__':
unittest.main()

View File

@ -24,6 +24,8 @@ import sqlite3
from swift.common import db_replicator
from swift.container import replicator, backend, server
from swift.container.reconciler import (
MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
from swift.common.utils import normalize_timestamp
from swift.common.storage_policy import POLICIES
@ -664,6 +666,194 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
remote_broker.update_put_timestamp(remote_recreate_timestamp)
remote_broker.update_status_changed_at(remote_recreate_timestamp)
def test_sync_to_remote_with_misplaced(self):
ts = itertools.count(int(time.time()))
# create "local" broker
policy = random.choice(list(POLICIES))
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(normalize_timestamp(ts.next()),
policy.idx)
# create "remote" broker
remote_policy = random.choice([p for p in POLICIES if p is not
policy])
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(normalize_timestamp(ts.next()),
remote_policy.idx)
# add misplaced row to remote_broker
remote_broker.put_object(
'/a/c/o', normalize_timestamp(ts.next()), 0, 'content-type',
'etag', storage_policy_index=remote_broker.storage_policy_index)
# since this row matches policy index or remote, it shows up in count
self.assertEqual(remote_broker.get_info()['object_count'], 1)
self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))
#replicate
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
# since our local broker has no rows to push it logs as no_change
self.assertEqual(1, daemon.stats['no_change'])
self.assertEqual(0, broker.get_info()['object_count'])
# remote broker updates it's policy index; this makes the remote
# broker's object count change
info = remote_broker.get_info()
expectations = {
'object_count': 0,
'storage_policy_index': policy.idx,
}
for key, value in expectations.items():
self.assertEqual(info[key], value)
# but it also knows those objects are misplaced now
misplaced = remote_broker.get_misplaced_since(-1, 100)
self.assertEqual(len(misplaced), 1)
# we also pushed out to node 3 with rsync
self.assertEqual(1, daemon.stats['rsync'])
third_broker = self._get_broker('a', 'c', node_index=2)
info = third_broker.get_info()
for key, value in expectations.items():
self.assertEqual(info[key], value)
def test_misplaced_rows_replicate_and_enqueue(self):
ts = itertools.count(int(time.time()))
policy = random.choice(list(POLICIES))
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(normalize_timestamp(ts.next()),
policy.idx)
remote_policy = random.choice([p for p in POLICIES if p is not
policy])
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(normalize_timestamp(ts.next()),
remote_policy.idx)
# add a misplaced row to *local* broker
obj_put_timestamp = normalize_timestamp(ts.next())
broker.put_object(
'o', obj_put_timestamp, 0, 'content-type',
'etag', storage_policy_index=remote_policy.idx)
misplaced = broker.get_misplaced_since(-1, 1)
self.assertEqual(len(misplaced), 1)
# since this row is misplaced it doesn't show up in count
self.assertEqual(broker.get_info()['object_count'], 0)
# replicate
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
# push to remote, and third node was missing (also maybe reconciler)
self.assert_(2 < daemon.stats['rsync'] <= 3)
# grab the rsynced instance of remote_broker
remote_broker = self._get_broker('a', 'c', node_index=1)
# remote has misplaced rows too now
misplaced = remote_broker.get_misplaced_since(-1, 1)
self.assertEqual(len(misplaced), 1)
# and the correct policy_index and object_count
info = remote_broker.get_info()
expectations = {
'object_count': 0,
'storage_policy_index': policy.idx,
}
for key, value in expectations.items():
self.assertEqual(info[key], value)
# and we should have also enqeued these rows in the reconciler
reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
# but it may not be on the same node as us anymore though...
reconciler = self._get_broker(reconciler.account,
reconciler.container, node_index=0)
self.assertEqual(reconciler.get_info()['object_count'], 1)
objects = reconciler.list_objects_iter(
1, '', None, None, None, None, storage_policy_index=0)
self.assertEqual(len(objects), 1)
expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
'application/x-put', obj_put_timestamp)
self.assertEqual(objects[0], expected)
# having safely enqueued to the reconciler we can advance
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 1)
def test_multiple_out_sync_reconciler_enqueue_normalize(self):
ts = itertools.count(int(time.time()))
policy = random.choice(list(POLICIES))
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(normalize_timestamp(ts.next()), policy.idx)
remote_policy = random.choice([p for p in POLICIES if p is not
policy])
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(normalize_timestamp(ts.next()),
remote_policy.idx)
# add some rows to brokers
for db in (broker, remote_broker):
for p in (policy, remote_policy):
db.put_object('o-%s' % p.name, normalize_timestamp(ts.next()),
0, 'content-type', 'etag',
storage_policy_index=p.idx)
db._commit_puts()
expected_policy_stats = {
policy.idx: {'object_count': 1, 'bytes_used': 0},
remote_policy.idx: {'object_count': 1, 'bytes_used': 0},
}
for db in (broker, remote_broker):
policy_stats = db.get_policy_stats()
self.assertEqual(policy_stats, expected_policy_stats)
# each db has 2 rows, 4 total
all_items = set()
for db in (broker, remote_broker):
items = db.get_items_since(-1, 4)
all_items.update(
(item['name'], item['created_at']) for item in items)
self.assertEqual(4, len(all_items))
# replicate both ways
part, node = self._get_broker_part_node(broker)
self._run_once(node)
part, node = self._get_broker_part_node(remote_broker)
self._run_once(node)
# only the latest timestamps should survive
most_recent_items = {}
for name, timestamp in all_items:
most_recent_items[name] = max(
timestamp, most_recent_items.get(name, -1))
self.assertEqual(2, len(most_recent_items))
for db in (broker, remote_broker):
items = db.get_items_since(-1, 4)
self.assertEqual(len(items), len(most_recent_items))
for item in items:
self.assertEqual(most_recent_items[item['name']],
item['created_at'])
# and the reconciler also collapses updates
reconciler_containers = set()
for item in all_items:
_name, timestamp = item
reconciler_containers.add(
get_reconciler_container_name(timestamp))
reconciler_items = set()
for reconciler_container in reconciler_containers:
for node_index in range(3):
reconciler = self._get_broker(MISPLACED_OBJECTS_ACCOUNT,
reconciler_container,
node_index=node_index)
items = reconciler.get_items_since(-1, 4)
reconciler_items.update(
(item['name'], item['created_at']) for item in items)
# they can't *both* be in the wrong policy ;)
self.assertEqual(1, len(reconciler_items))
for reconciler_name, timestamp in reconciler_items:
_policy_index, path = reconciler_name.split(':', 1)
a, c, name = path.lstrip('/').split('/')
self.assertEqual(most_recent_items[name], timestamp)
if __name__ == '__main__':
unittest.main()