2759d5d51c
This patch adds a new object versioning mode. This new mode provides a new set of APIs for users to interact with older versions of an object. It also changes the naming scheme of older versions and adds a version-id to each object. This new mode is not backwards compatible or interchangeable with the other two modes (i.e., stack and history), especially due to the changes in the namimg scheme of older versions. This new mode will also serve as a foundation for adding S3 versioning compatibility in the s3api middleware. Note that this does not (yet) support using a versioned container as a source in container-sync. Container sync should be enhanced to sync previous versions of objects. Change-Id: Ic7d39ba425ca324eeb4543a2ce8d03428e2225a1 Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Tim Burke <tim.burke@gmail.com> Co-Authored-By: Thiago da Silva <thiagodasilva@gmail.com>
2412 lines
114 KiB
Python
2412 lines
114 KiB
Python
# Copyright (c) 2017 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import uuid
|
|
|
|
from nose import SkipTest
|
|
import six
|
|
from six.moves.urllib.parse import quote
|
|
|
|
from swift.common import direct_client, utils
|
|
from swift.common.manager import Manager
|
|
from swift.common.memcached import MemcacheRing
|
|
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
|
|
quorum_size, config_true_value, Timestamp
|
|
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
|
|
from swift.container.sharder import CleavingContext
|
|
from swiftclient import client, get_auth, ClientException
|
|
|
|
from swift.proxy.controllers.base import get_cache_key
|
|
from swift.proxy.controllers.obj import num_container_updates
|
|
from test import annotate_failure
|
|
from test.probe.brain import BrainSplitter
|
|
from test.probe.common import ReplProbeTest, get_server_number, \
|
|
wait_for_server_to_hangup
|
|
|
|
|
|
MIN_SHARD_CONTAINER_THRESHOLD = 4
|
|
MAX_SHARD_CONTAINER_THRESHOLD = 100
|
|
|
|
|
|
class ShardCollector(object):
|
|
"""
|
|
Returns map of node to tuples of (headers, shard ranges) returned from node
|
|
"""
|
|
def __init__(self):
|
|
self.ranges = {}
|
|
|
|
def __call__(self, cnode, cpart, account, container):
|
|
self.ranges[cnode['id']] = direct_client.direct_get_container(
|
|
cnode, cpart, account, container,
|
|
headers={'X-Backend-Record-Type': 'shard'})
|
|
|
|
|
|
class BaseTestContainerSharding(ReplProbeTest):
|
|
DELIM = '-'
|
|
|
|
def _maybe_skip_test(self):
|
|
try:
|
|
cont_configs = [utils.readconf(p, 'container-sharder')
|
|
for p in self.configs['container-server'].values()]
|
|
except ValueError:
|
|
raise SkipTest('No [container-sharder] section found in '
|
|
'container-server configs')
|
|
|
|
skip_reasons = []
|
|
auto_shard = all(config_true_value(c.get('auto_shard', False))
|
|
for c in cont_configs)
|
|
if not auto_shard:
|
|
skip_reasons.append(
|
|
'auto_shard must be true in all container_sharder configs')
|
|
|
|
self.max_shard_size = max(
|
|
int(c.get('shard_container_threshold', '1000000'))
|
|
for c in cont_configs)
|
|
|
|
if not (MIN_SHARD_CONTAINER_THRESHOLD <= self.max_shard_size
|
|
<= MAX_SHARD_CONTAINER_THRESHOLD):
|
|
skip_reasons.append(
|
|
'shard_container_threshold %d must be between %d and %d' %
|
|
(self.max_shard_size, MIN_SHARD_CONTAINER_THRESHOLD,
|
|
MAX_SHARD_CONTAINER_THRESHOLD))
|
|
|
|
def skip_check(reason_list, option, required):
|
|
values = {int(c.get(option, required)) for c in cont_configs}
|
|
if values != {required}:
|
|
reason_list.append('%s must be %s' % (option, required))
|
|
|
|
skip_check(skip_reasons, 'shard_scanner_batch_size', 10)
|
|
skip_check(skip_reasons, 'shard_batch_size', 2)
|
|
|
|
if skip_reasons:
|
|
raise SkipTest(', '.join(skip_reasons))
|
|
|
|
def _load_rings_and_configs(self):
|
|
super(BaseTestContainerSharding, self)._load_rings_and_configs()
|
|
# perform checks for skipping test before starting services
|
|
self._maybe_skip_test()
|
|
|
|
def _make_object_names(self, number, start=0):
|
|
return ['obj%s%04d' % (self.DELIM, x)
|
|
for x in range(start, start + number)]
|
|
|
|
def _setup_container_name(self):
|
|
# Container where we're PUTting objects
|
|
self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4())
|
|
|
|
def setUp(self):
|
|
client.logger.setLevel(client.logging.WARNING)
|
|
client.requests.logging.getLogger().setLevel(
|
|
client.requests.logging.WARNING)
|
|
super(BaseTestContainerSharding, self).setUp()
|
|
_, self.admin_token = get_auth(
|
|
'http://127.0.0.1:8080/auth/v1.0', 'admin:admin', 'admin')
|
|
self._setup_container_name()
|
|
self.init_brain(self.container_name)
|
|
self.sharders = Manager(['container-sharder'])
|
|
self.internal_client = self.make_internal_client()
|
|
self.memcache = MemcacheRing(['127.0.0.1:11211'])
|
|
|
|
def init_brain(self, container_name):
|
|
self.container_to_shard = container_name
|
|
self.brain = BrainSplitter(
|
|
self.url, self.token, self.container_to_shard,
|
|
None, 'container')
|
|
self.brain.put_container(policy_index=int(self.policy))
|
|
|
|
def stop_container_servers(self, node_numbers=None):
|
|
if node_numbers:
|
|
ipports = []
|
|
server2ipport = {v: k for k, v in self.ipport2server.items()}
|
|
for number in self.brain.node_numbers[node_numbers]:
|
|
self.brain.servers.stop(number=number)
|
|
server = 'container%d' % number
|
|
ipports.append(server2ipport[server])
|
|
else:
|
|
ipports = [k for k, v in self.ipport2server.items()
|
|
if v.startswith('container')]
|
|
self.brain.servers.stop()
|
|
for ipport in ipports:
|
|
wait_for_server_to_hangup(ipport)
|
|
|
|
def put_objects(self, obj_names, contents=None):
|
|
results = []
|
|
for obj in obj_names:
|
|
rdict = {}
|
|
client.put_object(self.url, token=self.token,
|
|
container=self.container_name, name=obj,
|
|
contents=contents, response_dict=rdict)
|
|
results.append((obj, rdict['headers'].get('x-object-version-id')))
|
|
return results
|
|
|
|
def delete_objects(self, obj_names_and_versions):
|
|
for obj in obj_names_and_versions:
|
|
if isinstance(obj, tuple):
|
|
obj, version = obj
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, obj,
|
|
query_string='version-id=%s' % version)
|
|
else:
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, obj)
|
|
|
|
def get_container_shard_ranges(self, account=None, container=None):
|
|
account = account if account else self.account
|
|
container = container if container else self.container_to_shard
|
|
path = self.internal_client.make_path(account, container)
|
|
resp = self.internal_client.make_request(
|
|
'GET', path + '?format=json', {'X-Backend-Record-Type': 'shard'},
|
|
[200])
|
|
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
|
|
|
|
def direct_get_container_shard_ranges(self, account=None, container=None,
|
|
expect_failure=False):
|
|
collector = ShardCollector()
|
|
self.direct_container_op(
|
|
collector, account, container, expect_failure)
|
|
return collector.ranges
|
|
|
|
def get_storage_dir(self, part, node, account=None, container=None):
|
|
account = account or self.brain.account
|
|
container = container or self.container_name
|
|
server_type, config_number = get_server_number(
|
|
(node['ip'], node['port']), self.ipport2server)
|
|
assert server_type == 'container'
|
|
repl_server = '%s-replicator' % server_type
|
|
conf = utils.readconf(self.configs[repl_server][config_number],
|
|
section_name=repl_server)
|
|
datadir = os.path.join(conf['devices'], node['device'], 'containers')
|
|
container_hash = utils.hash_path(account, container)
|
|
return (utils.storage_directory(datadir, part, container_hash),
|
|
container_hash)
|
|
|
|
def get_broker(self, part, node, account=None, container=None):
|
|
container_dir, container_hash = self.get_storage_dir(
|
|
part, node, account=account, container=container)
|
|
db_file = os.path.join(container_dir, container_hash + '.db')
|
|
self.assertTrue(get_db_files(db_file)) # sanity check
|
|
return ContainerBroker(db_file)
|
|
|
|
def categorize_container_dir_content(self, account=None, container=None):
|
|
account = account or self.brain.account
|
|
container = container or self.container_name
|
|
part, nodes = self.brain.ring.get_nodes(account, container)
|
|
storage_dirs = [
|
|
self.get_storage_dir(part, node, account=account,
|
|
container=container)[0]
|
|
for node in nodes]
|
|
result = {
|
|
'shard_dbs': [],
|
|
'normal_dbs': [],
|
|
'pendings': [],
|
|
'locks': [],
|
|
'other': [],
|
|
}
|
|
for storage_dir in storage_dirs:
|
|
for f in os.listdir(storage_dir):
|
|
path = os.path.join(storage_dir, f)
|
|
if path.endswith('.db'):
|
|
hash_, epoch, ext = parse_db_filename(path)
|
|
if epoch:
|
|
result['shard_dbs'].append(path)
|
|
else:
|
|
result['normal_dbs'].append(path)
|
|
elif path.endswith('.db.pending'):
|
|
result['pendings'].append(path)
|
|
elif path.endswith('/.lock'):
|
|
result['locks'].append(path)
|
|
else:
|
|
result['other'].append(path)
|
|
if result['other']:
|
|
self.fail('Found unexpected files in storage directory:\n %s' %
|
|
'\n '.join(result['other']))
|
|
return result
|
|
|
|
def assertLengthEqual(self, obj, length):
|
|
obj_len = len(obj)
|
|
self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
|
|
obj, obj_len, length))
|
|
|
|
def assert_dict_contains(self, expected_items, actual_dict):
|
|
ignored = set(expected_items) ^ set(actual_dict)
|
|
filtered_actual = {k: actual_dict[k]
|
|
for k in actual_dict if k not in ignored}
|
|
self.assertEqual(expected_items, filtered_actual)
|
|
|
|
def assert_shard_ranges_contiguous(self, expected_number, shard_ranges,
|
|
first_lower='', last_upper=''):
|
|
if shard_ranges and isinstance(shard_ranges[0], ShardRange):
|
|
actual_shard_ranges = sorted(shard_ranges)
|
|
else:
|
|
actual_shard_ranges = sorted(ShardRange.from_dict(d)
|
|
for d in shard_ranges)
|
|
self.assertLengthEqual(actual_shard_ranges, expected_number)
|
|
if expected_number:
|
|
with annotate_failure('Ranges %s.' % actual_shard_ranges):
|
|
self.assertEqual(first_lower, actual_shard_ranges[0].lower_str)
|
|
for x, y in zip(actual_shard_ranges, actual_shard_ranges[1:]):
|
|
self.assertEqual(x.upper, y.lower)
|
|
self.assertEqual(last_upper, actual_shard_ranges[-1].upper_str)
|
|
|
|
def assert_shard_range_equal(self, expected, actual, excludes=None):
|
|
excludes = excludes or []
|
|
expected_dict = dict(expected)
|
|
actual_dict = dict(actual)
|
|
for k in excludes:
|
|
expected_dict.pop(k, None)
|
|
actual_dict.pop(k, None)
|
|
self.assertEqual(expected_dict, actual_dict)
|
|
|
|
def assert_shard_range_lists_equal(self, expected, actual, excludes=None):
|
|
self.assertEqual(len(expected), len(actual))
|
|
for expected, actual in zip(expected, actual):
|
|
self.assert_shard_range_equal(expected, actual, excludes=excludes)
|
|
|
|
def assert_shard_range_state(self, expected_state, shard_ranges):
|
|
if shard_ranges and not isinstance(shard_ranges[0], ShardRange):
|
|
shard_ranges = [ShardRange.from_dict(data)
|
|
for data in shard_ranges]
|
|
self.assertEqual([expected_state] * len(shard_ranges),
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
def assert_total_object_count(self, expected_object_count, shard_ranges):
|
|
actual = sum(sr['object_count'] for sr in shard_ranges)
|
|
self.assertEqual(expected_object_count, actual)
|
|
|
|
def assert_container_listing(self, expected_listing):
|
|
headers, actual_listing = client.get_container(
|
|
self.url, self.token, self.container_name)
|
|
self.assertIn('x-container-object-count', headers)
|
|
expected_obj_count = len(expected_listing)
|
|
self.assertEqual(expected_listing, [
|
|
x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in actual_listing])
|
|
self.assertEqual(str(expected_obj_count),
|
|
headers['x-container-object-count'])
|
|
return headers, actual_listing
|
|
|
|
def assert_container_object_count(self, expected_obj_count):
|
|
headers = client.head_container(
|
|
self.url, self.token, self.container_name)
|
|
self.assertIn('x-container-object-count', headers)
|
|
self.assertEqual(str(expected_obj_count),
|
|
headers['x-container-object-count'])
|
|
|
|
def assert_container_post_ok(self, meta_value):
|
|
key = 'X-Container-Meta-Assert-Post-Works'
|
|
headers = {key: meta_value}
|
|
client.post_container(
|
|
self.url, self.token, self.container_name, headers=headers)
|
|
resp_headers = client.head_container(
|
|
self.url, self.token, self.container_name)
|
|
self.assertEqual(meta_value, resp_headers.get(key.lower()))
|
|
|
|
def assert_container_post_fails(self, meta_value):
|
|
key = 'X-Container-Meta-Assert-Post-Works'
|
|
headers = {key: meta_value}
|
|
with self.assertRaises(ClientException) as cm:
|
|
client.post_container(
|
|
self.url, self.token, self.container_name, headers=headers)
|
|
self.assertEqual(404, cm.exception.http_status)
|
|
|
|
def assert_container_delete_fails(self):
|
|
with self.assertRaises(ClientException) as cm:
|
|
client.delete_container(self.url, self.token, self.container_name)
|
|
self.assertEqual(409, cm.exception.http_status)
|
|
|
|
def assert_container_not_found(self):
|
|
with self.assertRaises(ClientException) as cm:
|
|
client.get_container(self.url, self.token, self.container_name)
|
|
self.assertEqual(404, cm.exception.http_status)
|
|
# check for headers leaking out while deleted
|
|
resp_headers = cm.exception.http_response_headers
|
|
self.assertNotIn('X-Container-Object-Count', resp_headers)
|
|
self.assertNotIn('X-Container-Bytes-Used', resp_headers)
|
|
self.assertNotIn('X-Timestamp', resp_headers)
|
|
self.assertNotIn('X-PUT-Timestamp', resp_headers)
|
|
|
|
def assert_container_has_shard_sysmeta(self):
|
|
node_headers = self.direct_head_container()
|
|
for node_id, headers in node_headers.items():
|
|
with annotate_failure('%s in %s' % (node_id, node_headers.keys())):
|
|
for k, v in headers.items():
|
|
if k.lower().startswith('x-container-sysmeta-shard'):
|
|
break
|
|
else:
|
|
self.fail('No shard sysmeta found in %s' % headers)
|
|
|
|
def assert_container_state(self, node, expected_state, num_shard_ranges):
|
|
headers, shard_ranges = direct_client.direct_get_container(
|
|
node, self.brain.part, self.account, self.container_to_shard,
|
|
headers={'X-Backend-Record-Type': 'shard'})
|
|
self.assertEqual(num_shard_ranges, len(shard_ranges))
|
|
self.assertIn('X-Backend-Sharding-State', headers)
|
|
self.assertEqual(
|
|
expected_state, headers['X-Backend-Sharding-State'])
|
|
return [ShardRange.from_dict(sr) for sr in shard_ranges]
|
|
|
|
def get_part_and_node_numbers(self, shard_range):
|
|
"""Return the partition and node numbers for a shard range."""
|
|
part, nodes = self.brain.ring.get_nodes(
|
|
shard_range.account, shard_range.container)
|
|
return part, [n['id'] + 1 for n in nodes]
|
|
|
|
def run_sharders(self, shard_ranges):
|
|
"""Run the sharder on partitions for given shard ranges."""
|
|
if not isinstance(shard_ranges, (list, tuple, set)):
|
|
shard_ranges = (shard_ranges,)
|
|
partitions = ','.join(str(self.get_part_and_node_numbers(sr)[0])
|
|
for sr in shard_ranges)
|
|
self.sharders.once(additional_args='--partitions=%s' % partitions)
|
|
|
|
def run_sharder_sequentially(self, shard_range=None):
|
|
"""Run sharder node by node on partition for given shard range."""
|
|
if shard_range:
|
|
part, node_numbers = self.get_part_and_node_numbers(shard_range)
|
|
else:
|
|
part, node_numbers = self.brain.part, self.brain.node_numbers
|
|
for node_number in node_numbers:
|
|
self.sharders.once(number=node_number,
|
|
additional_args='--partitions=%s' % part)
|
|
|
|
|
|
class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
|
def test_sharding_listing(self):
|
|
# verify parameterised listing of a container during sharding
|
|
all_obj_names = self._make_object_names(4 * self.max_shard_size)
|
|
obj_names = all_obj_names[::2]
|
|
self.put_objects(obj_names)
|
|
# choose some names approx in middle of each expected shard range
|
|
markers = [
|
|
obj_names[i] for i in range(self.max_shard_size // 4,
|
|
2 * self.max_shard_size,
|
|
self.max_shard_size // 2)]
|
|
|
|
def check_listing(objects, **params):
|
|
qs = '&'.join('%s=%s' % (k, quote(str(v)))
|
|
for k, v in params.items())
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name, query_string=qs)
|
|
listing = [x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing]
|
|
if params.get('reverse'):
|
|
marker = params.get('marker', ShardRange.MAX)
|
|
end_marker = params.get('end_marker', ShardRange.MIN)
|
|
expected = [o for o in objects if end_marker < o < marker]
|
|
expected.reverse()
|
|
else:
|
|
marker = params.get('marker', ShardRange.MIN)
|
|
end_marker = params.get('end_marker', ShardRange.MAX)
|
|
expected = [o for o in objects if marker < o < end_marker]
|
|
if 'limit' in params:
|
|
expected = expected[:params['limit']]
|
|
self.assertEqual(expected, listing)
|
|
|
|
def check_listing_fails(exp_status, **params):
|
|
qs = '&'.join(['%s=%s' % param for param in params.items()])
|
|
with self.assertRaises(ClientException) as cm:
|
|
client.get_container(
|
|
self.url, self.token, self.container_name, query_string=qs)
|
|
self.assertEqual(exp_status, cm.exception.http_status)
|
|
return cm.exception
|
|
|
|
def do_listing_checks(objects):
|
|
check_listing(objects)
|
|
check_listing(objects, marker=markers[0], end_marker=markers[1])
|
|
check_listing(objects, marker=markers[0], end_marker=markers[2])
|
|
check_listing(objects, marker=markers[1], end_marker=markers[3])
|
|
check_listing(objects, marker=markers[1], end_marker=markers[3],
|
|
limit=self.max_shard_size // 4)
|
|
check_listing(objects, marker=markers[1], end_marker=markers[3],
|
|
limit=self.max_shard_size // 4)
|
|
check_listing(objects, marker=markers[1], end_marker=markers[2],
|
|
limit=self.max_shard_size // 2)
|
|
check_listing(objects, marker=markers[1], end_marker=markers[1])
|
|
check_listing(objects, reverse=True)
|
|
check_listing(objects, reverse=True, end_marker=markers[1])
|
|
check_listing(objects, reverse=True, marker=markers[3],
|
|
end_marker=markers[1],
|
|
limit=self.max_shard_size // 4)
|
|
check_listing(objects, reverse=True, marker=markers[3],
|
|
end_marker=markers[1], limit=0)
|
|
check_listing([], marker=markers[0], end_marker=markers[0])
|
|
check_listing([], marker=markers[0], end_marker=markers[1],
|
|
reverse=True)
|
|
check_listing(objects, prefix='obj')
|
|
check_listing([], prefix='zzz')
|
|
# delimiter
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name,
|
|
query_string='delimiter=' + quote(self.DELIM))
|
|
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name,
|
|
query_string='delimiter=j' + quote(self.DELIM))
|
|
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
|
|
|
|
limit = self.cluster_info['swift']['container_listing_limit']
|
|
exc = check_listing_fails(412, limit=limit + 1)
|
|
self.assertIn(b'Maximum limit', exc.http_response_content)
|
|
exc = check_listing_fails(400, delimiter='%ff')
|
|
self.assertIn(b'not valid UTF-8', exc.http_response_content)
|
|
|
|
# sanity checks
|
|
do_listing_checks(obj_names)
|
|
|
|
# Shard the container
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
# First run the 'leader' in charge of scanning, which finds all shard
|
|
# ranges and cleaves first two
|
|
self.sharders.once(number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
# Then run sharder on other nodes which will also cleave first two
|
|
# shard ranges
|
|
for n in self.brain.node_numbers[1:]:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# sanity check shard range states
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharding', 4)
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 4)
|
|
self.assert_shard_range_state(ShardRange.CLEAVED, shard_ranges[:2])
|
|
self.assert_shard_range_state(ShardRange.CREATED, shard_ranges[2:])
|
|
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta() # confirm no sysmeta deleted
|
|
self.assert_container_post_ok('sharding')
|
|
do_listing_checks(obj_names)
|
|
|
|
# put some new objects spread through entire namespace
|
|
new_obj_names = all_obj_names[1::4]
|
|
self.put_objects(new_obj_names)
|
|
|
|
# new objects that fell into the first two cleaved shard ranges are
|
|
# reported in listing, new objects in the yet-to-be-cleaved shard
|
|
# ranges are not yet included in listing
|
|
exp_obj_names = [o for o in obj_names + new_obj_names
|
|
if o <= shard_ranges[1].upper]
|
|
exp_obj_names += [o for o in obj_names
|
|
if o > shard_ranges[1].upper]
|
|
exp_obj_names.sort()
|
|
do_listing_checks(exp_obj_names)
|
|
|
|
# run all the sharders again and the last two shard ranges get cleaved
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 4)
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
|
|
|
|
exp_obj_names = obj_names + new_obj_names
|
|
exp_obj_names.sort()
|
|
do_listing_checks(exp_obj_names)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
|
|
# delete original objects
|
|
self.delete_objects(obj_names)
|
|
do_listing_checks(new_obj_names)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
|
|
|
|
class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
|
|
DELIM = '\n'
|
|
|
|
def _make_object_names(self, number):
|
|
return ['obj\n%04d%%Ff' % x for x in range(number)]
|
|
|
|
def _setup_container_name(self):
|
|
self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4()
|
|
|
|
|
|
class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
|
|
def _make_object_names(self, number, start=0):
|
|
# override default with names that include non-ascii chars
|
|
name_length = self.cluster_info['swift']['max_object_name_length']
|
|
obj_names = []
|
|
for x in range(start, start + number):
|
|
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x)
|
|
name = name.encode('utf8').ljust(name_length, b'o')
|
|
if not six.PY2:
|
|
name = name.decode('utf8')
|
|
obj_names.append(name)
|
|
return obj_names
|
|
|
|
def _setup_container_name(self):
|
|
# override default with max length name that includes non-ascii chars
|
|
super(TestContainerShardingUTF8, self)._setup_container_name()
|
|
name_length = self.cluster_info['swift']['max_container_name_length']
|
|
cont_name = \
|
|
self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
|
|
self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
|
|
if not six.PY2:
|
|
self.container_name = self.container_name.decode('utf8')
|
|
|
|
|
|
class TestContainerShardingObjectVersioning(BaseTestContainerSharding):
|
|
def _maybe_skip_test(self):
|
|
super(TestContainerShardingObjectVersioning, self)._maybe_skip_test()
|
|
try:
|
|
vw_config = utils.readconf(self.configs['proxy-server'],
|
|
'filter:versioned_writes')
|
|
except ValueError:
|
|
raise SkipTest('No [filter:versioned_writes] section found in '
|
|
'proxy-server configs')
|
|
allow_object_versioning = config_true_value(
|
|
vw_config.get('allow_object_versioning', False))
|
|
if not allow_object_versioning:
|
|
raise SkipTest('allow_object_versioning must be true '
|
|
'in all versioned_writes configs')
|
|
|
|
def init_brain(self, container_name):
|
|
client.put_container(self.url, self.token, container_name, headers={
|
|
'X-Storage-Policy': self.policy.name,
|
|
'X-Versions-Enabled': 'true',
|
|
})
|
|
self.container_to_shard = '\x00versions\x00' + container_name
|
|
self.brain = BrainSplitter(
|
|
self.url, self.token, self.container_to_shard,
|
|
None, 'container')
|
|
|
|
def test_sharding_listing(self):
|
|
# verify parameterised listing of a container during sharding
|
|
all_obj_names = self._make_object_names(3) * self.max_shard_size
|
|
all_obj_names.extend(self._make_object_names(self.max_shard_size,
|
|
start=3))
|
|
obj_names = all_obj_names[::2]
|
|
obj_names_and_versions = self.put_objects(obj_names)
|
|
|
|
def sort_key(obj_and_ver):
|
|
obj, ver = obj_and_ver
|
|
return obj, ~Timestamp(ver)
|
|
|
|
obj_names_and_versions.sort(key=sort_key)
|
|
# choose some names approx in middle of each expected shard range
|
|
markers = [
|
|
obj_names_and_versions[i]
|
|
for i in range(self.max_shard_size // 4,
|
|
2 * self.max_shard_size,
|
|
self.max_shard_size // 2)]
|
|
|
|
def check_listing(objects, **params):
|
|
params['versions'] = ''
|
|
qs = '&'.join('%s=%s' % param for param in params.items())
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name, query_string=qs)
|
|
listing = [(x['name'].encode('utf-8') if six.PY2 else x['name'],
|
|
x['version_id'])
|
|
for x in listing]
|
|
if params.get('reverse'):
|
|
marker = (
|
|
params.get('marker', ShardRange.MAX),
|
|
~Timestamp(params['version_marker'])
|
|
if 'version_marker' in params else ~Timestamp('0'),
|
|
)
|
|
end_marker = (
|
|
params.get('end_marker', ShardRange.MIN),
|
|
Timestamp('0'),
|
|
)
|
|
expected = [o for o in objects
|
|
if end_marker < sort_key(o) < marker]
|
|
expected.reverse()
|
|
else:
|
|
marker = (
|
|
params.get('marker', ShardRange.MIN),
|
|
~Timestamp(params['version_marker'])
|
|
if 'version_marker' in params else Timestamp('0'),
|
|
)
|
|
end_marker = (
|
|
params.get('end_marker', ShardRange.MAX),
|
|
~Timestamp('0'),
|
|
)
|
|
expected = [o for o in objects
|
|
if marker < sort_key(o) < end_marker]
|
|
if 'limit' in params:
|
|
expected = expected[:params['limit']]
|
|
self.assertEqual(expected, listing)
|
|
|
|
def check_listing_fails(exp_status, **params):
|
|
params['versions'] = ''
|
|
qs = '&'.join('%s=%s' % param for param in params.items())
|
|
with self.assertRaises(ClientException) as cm:
|
|
client.get_container(
|
|
self.url, self.token, self.container_name, query_string=qs)
|
|
self.assertEqual(exp_status, cm.exception.http_status)
|
|
return cm.exception
|
|
|
|
def do_listing_checks(objects):
|
|
check_listing(objects)
|
|
check_listing(objects,
|
|
marker=markers[0][0], version_marker=markers[0][1])
|
|
check_listing(objects,
|
|
marker=markers[0][0], version_marker=markers[0][1],
|
|
limit=self.max_shard_size // 10)
|
|
check_listing(objects,
|
|
marker=markers[0][0], version_marker=markers[0][1],
|
|
limit=self.max_shard_size // 4)
|
|
check_listing(objects,
|
|
marker=markers[0][0], version_marker=markers[0][1],
|
|
limit=self.max_shard_size // 2)
|
|
check_listing(objects,
|
|
marker=markers[1][0], version_marker=markers[1][1])
|
|
check_listing(objects,
|
|
marker=markers[1][0], version_marker=markers[1][1],
|
|
limit=self.max_shard_size // 10)
|
|
check_listing(objects,
|
|
marker=markers[2][0], version_marker=markers[2][1],
|
|
limit=self.max_shard_size // 4)
|
|
check_listing(objects,
|
|
marker=markers[2][0], version_marker=markers[2][1],
|
|
limit=self.max_shard_size // 2)
|
|
check_listing(objects, reverse=True)
|
|
check_listing(objects, reverse=True,
|
|
marker=markers[1][0], version_marker=markers[1][1])
|
|
|
|
check_listing(objects, prefix='obj')
|
|
check_listing([], prefix='zzz')
|
|
# delimiter
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name,
|
|
query_string='delimiter=-')
|
|
self.assertEqual([{'subdir': 'obj-'}], listing)
|
|
headers, listing = client.get_container(
|
|
self.url, self.token, self.container_name,
|
|
query_string='delimiter=j-')
|
|
self.assertEqual([{'subdir': 'obj-'}], listing)
|
|
|
|
limit = self.cluster_info['swift']['container_listing_limit']
|
|
exc = check_listing_fails(412, limit=limit + 1)
|
|
self.assertIn(b'Maximum limit', exc.http_response_content)
|
|
exc = check_listing_fails(400, delimiter='%ff')
|
|
self.assertIn(b'not valid UTF-8', exc.http_response_content)
|
|
|
|
# sanity checks
|
|
do_listing_checks(obj_names_and_versions)
|
|
|
|
# Shard the container. Use an internal_client so we get an implicit
|
|
# X-Backend-Allow-Reserved-Names header
|
|
self.internal_client.set_container_metadata(
|
|
self.account, self.container_to_shard, {
|
|
'X-Container-Sysmeta-Sharding': 'True',
|
|
})
|
|
# First run the 'leader' in charge of scanning, which finds all shard
|
|
# ranges and cleaves first two
|
|
self.sharders.once(number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
# Then run sharder on other nodes which will also cleave first two
|
|
# shard ranges
|
|
for n in self.brain.node_numbers[1:]:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# sanity check shard range states
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharding', 4)
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 4)
|
|
self.assert_shard_range_state(ShardRange.CLEAVED, shard_ranges[:2])
|
|
self.assert_shard_range_state(ShardRange.CREATED, shard_ranges[2:])
|
|
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta() # confirm no sysmeta deleted
|
|
self.assert_container_post_ok('sharding')
|
|
do_listing_checks(obj_names_and_versions)
|
|
|
|
# put some new objects spread through entire namespace
|
|
new_obj_names = all_obj_names[1::4]
|
|
new_obj_names_and_versions = self.put_objects(new_obj_names)
|
|
|
|
# new objects that fell into the first two cleaved shard ranges are
|
|
# reported in listing, new objects in the yet-to-be-cleaved shard
|
|
# ranges are not yet included in listing
|
|
exp_obj_names_and_versions = [
|
|
o for o in obj_names_and_versions + new_obj_names_and_versions
|
|
if '\x00' + o[0] <= shard_ranges[1].upper]
|
|
exp_obj_names_and_versions += [
|
|
o for o in obj_names_and_versions
|
|
if '\x00' + o[0] > shard_ranges[1].upper]
|
|
exp_obj_names_and_versions.sort(key=sort_key)
|
|
do_listing_checks(exp_obj_names_and_versions)
|
|
|
|
# run all the sharders again and the last two shard ranges get cleaved
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 4)
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
|
|
|
|
exp_obj_names_and_versions = \
|
|
obj_names_and_versions + new_obj_names_and_versions
|
|
exp_obj_names_and_versions.sort(key=sort_key)
|
|
do_listing_checks(exp_obj_names_and_versions)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
|
|
# delete original objects
|
|
self.delete_objects(obj_names_and_versions)
|
|
new_obj_names_and_versions.sort(key=sort_key)
|
|
do_listing_checks(new_obj_names_and_versions)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
|
|
|
|
class TestContainerSharding(BaseTestContainerSharding):
|
|
def _test_sharded_listing(self, run_replicators=False):
|
|
obj_names = self._make_object_names(self.max_shard_size)
|
|
self.put_objects(obj_names)
|
|
|
|
# Verify that we start out with normal DBs, no shards
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['normal_dbs'], 3)
|
|
self.assertLengthEqual(found['shard_dbs'], 0)
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual('unsharded', broker.get_db_state())
|
|
self.assertLengthEqual(broker.get_shard_ranges(), 0)
|
|
|
|
headers, pre_sharding_listing = client.get_container(
|
|
self.url, self.token, self.container_name)
|
|
self.assertEqual(obj_names, [
|
|
x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in pre_sharding_listing]) # sanity
|
|
|
|
# Shard it
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
pre_sharding_headers = client.head_container(
|
|
self.url, self.admin_token, self.container_name)
|
|
self.assertEqual('True',
|
|
pre_sharding_headers.get('x-container-sharding'))
|
|
|
|
# Only run the one in charge of scanning
|
|
self.sharders.once(number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# Verify that we have one sharded db -- though the other normal DBs
|
|
# received the shard ranges that got defined
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 1)
|
|
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
|
# sanity check - the shard db is on replica 0
|
|
self.assertEqual(found['shard_dbs'][0], broker.db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual('sharded', broker.get_db_state())
|
|
orig_root_shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
|
self.assertLengthEqual(orig_root_shard_ranges, 2)
|
|
self.assert_total_object_count(len(obj_names), orig_root_shard_ranges)
|
|
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
|
|
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
|
|
[sr['state'] for sr in orig_root_shard_ranges])
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual([], contexts) # length check
|
|
self.direct_delete_container(expect_failure=True)
|
|
|
|
self.assertLengthEqual(found['normal_dbs'], 2)
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual('unsharded', broker.get_db_state())
|
|
shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
|
self.assertEqual([ShardRange.CREATED, ShardRange.CREATED],
|
|
[sr['state'] for sr in shard_ranges])
|
|
# the sharded db had shard range meta_timestamps and state updated
|
|
# during cleaving, so we do not expect those to be equal on other
|
|
# nodes
|
|
self.assert_shard_range_lists_equal(
|
|
orig_root_shard_ranges, shard_ranges,
|
|
excludes=['meta_timestamp', 'state', 'state_timestamp'])
|
|
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual([], contexts) # length check
|
|
|
|
if run_replicators:
|
|
Manager(['container-replicator']).once()
|
|
# replication doesn't change the db file names
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 1)
|
|
self.assertLengthEqual(found['normal_dbs'], 2)
|
|
|
|
# Now that everyone has shard ranges, run *everyone*
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# Verify that we only have shard dbs now
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 3)
|
|
self.assertLengthEqual(found['normal_dbs'], 0)
|
|
# Shards stayed the same
|
|
for db_file in found['shard_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual('sharded', broker.get_db_state())
|
|
# Well, except for meta_timestamps, since the shards each reported
|
|
self.assert_shard_range_lists_equal(
|
|
orig_root_shard_ranges, broker.get_shard_ranges(),
|
|
excludes=['meta_timestamp', 'state_timestamp'])
|
|
for orig, updated in zip(orig_root_shard_ranges,
|
|
broker.get_shard_ranges()):
|
|
self.assertGreaterEqual(updated.state_timestamp,
|
|
orig['state_timestamp'])
|
|
self.assertGreaterEqual(updated.meta_timestamp,
|
|
orig['meta_timestamp'])
|
|
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual([], contexts) # length check
|
|
|
|
# Check that entire listing is available
|
|
headers, actual_listing = self.assert_container_listing(obj_names)
|
|
# ... and check some other container properties
|
|
self.assertEqual(headers['last-modified'],
|
|
pre_sharding_headers['last-modified'])
|
|
|
|
# It even works in reverse!
|
|
headers, listing = client.get_container(self.url, self.token,
|
|
self.container_name,
|
|
query_string='reverse=on')
|
|
self.assertEqual(pre_sharding_listing[::-1], listing)
|
|
|
|
# Now put some new objects into first shard, taking its count to
|
|
# 3 shard ranges' worth
|
|
more_obj_names = [
|
|
'beta%03d' % x for x in range(self.max_shard_size)]
|
|
self.put_objects(more_obj_names)
|
|
|
|
# The listing includes new objects...
|
|
headers, listing = self.assert_container_listing(
|
|
more_obj_names + obj_names)
|
|
self.assertEqual(pre_sharding_listing, listing[len(more_obj_names):])
|
|
|
|
# ...but root object count is out of date until the sharders run and
|
|
# update the root
|
|
self.assert_container_object_count(len(obj_names))
|
|
|
|
# run sharders on the shard to get root updated
|
|
shard_1 = ShardRange.from_dict(orig_root_shard_ranges[0])
|
|
self.run_sharders(shard_1)
|
|
self.assert_container_object_count(len(more_obj_names + obj_names))
|
|
|
|
# we've added objects enough that we need to shard the first shard
|
|
# *again* into three new sub-shards, but nothing happens until the root
|
|
# leader identifies shard candidate...
|
|
root_shard_ranges = self.direct_get_container_shard_ranges()
|
|
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
|
self.assertLengthEqual(root_shards, 2)
|
|
with annotate_failure('node %s. ' % node):
|
|
self.assertEqual(
|
|
[ShardRange.ACTIVE] * 2,
|
|
[sr['state'] for sr in root_shards])
|
|
# orig shards 0, 1 should be contiguous
|
|
self.assert_shard_ranges_contiguous(2, root_shards)
|
|
|
|
# Now run the root leader to identify shard candidate...while one of
|
|
# the shard container servers is down
|
|
shard_1_part, shard_1_nodes = self.get_part_and_node_numbers(shard_1)
|
|
self.brain.servers.stop(number=shard_1_nodes[2])
|
|
self.sharders.once(number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# ... so third replica of first shard state is not moved to sharding
|
|
found_for_shard = self.categorize_container_dir_content(
|
|
shard_1.account, shard_1.container)
|
|
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
|
self.assertEqual(
|
|
[ShardRange.SHARDING, ShardRange.SHARDING, ShardRange.ACTIVE],
|
|
[ContainerBroker(db_file).get_own_shard_range().state
|
|
for db_file in found_for_shard['normal_dbs']])
|
|
|
|
# ...then run first cycle of first shard sharders in order, leader
|
|
# first, to get to predictable state where all nodes have cleaved 2 out
|
|
# of 3 ranges...starting with first two nodes
|
|
for node_number in shard_1_nodes[:2]:
|
|
self.sharders.once(
|
|
number=node_number,
|
|
additional_args='--partitions=%s' % shard_1_part)
|
|
|
|
# ... first two replicas start sharding to sub-shards
|
|
found_for_shard = self.categorize_container_dir_content(
|
|
shard_1.account, shard_1.container)
|
|
self.assertLengthEqual(found_for_shard['shard_dbs'], 2)
|
|
for db_file in found_for_shard['shard_dbs'][:2]:
|
|
broker = ContainerBroker(db_file)
|
|
with annotate_failure('shard db file %s. ' % db_file):
|
|
self.assertIs(False, broker.is_root_container())
|
|
self.assertEqual('sharding', broker.get_db_state())
|
|
self.assertEqual(
|
|
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
|
shard_shards = broker.get_shard_ranges()
|
|
self.assertEqual(
|
|
[ShardRange.CLEAVED, ShardRange.CLEAVED,
|
|
ShardRange.CREATED],
|
|
[sr.state for sr in shard_shards])
|
|
self.assert_shard_ranges_contiguous(
|
|
3, shard_shards,
|
|
first_lower=orig_root_shard_ranges[0]['lower'],
|
|
last_upper=orig_root_shard_ranges[0]['upper'])
|
|
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual(len(contexts), 1)
|
|
context, _lm = contexts[0]
|
|
self.assertIs(context.cleaving_done, False)
|
|
self.assertIs(context.misplaced_done, True)
|
|
self.assertEqual(context.ranges_done, 2)
|
|
self.assertEqual(context.ranges_todo, 1)
|
|
self.assertEqual(context.max_row,
|
|
self.max_shard_size * 3 // 2)
|
|
|
|
# but third replica still has no idea it should be sharding
|
|
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
|
self.assertEqual(
|
|
ShardRange.ACTIVE,
|
|
ContainerBroker(
|
|
found_for_shard['normal_dbs'][2]).get_own_shard_range().state)
|
|
|
|
# ...but once sharder runs on third replica it will learn its state;
|
|
# note that any root replica on the stopped container server also won't
|
|
# know about the shards being in sharding state, so leave that server
|
|
# stopped for now so that shard fetches its state from an up-to-date
|
|
# root replica
|
|
self.sharders.once(
|
|
number=shard_1_nodes[2],
|
|
additional_args='--partitions=%s' % shard_1_part)
|
|
|
|
# third replica is sharding but has no sub-shard ranges yet...
|
|
found_for_shard = self.categorize_container_dir_content(
|
|
shard_1.account, shard_1.container)
|
|
self.assertLengthEqual(found_for_shard['shard_dbs'], 2)
|
|
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
|
broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
|
self.assertEqual('unsharded', broker.get_db_state())
|
|
self.assertEqual(
|
|
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
|
self.assertFalse(broker.get_shard_ranges())
|
|
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual([], contexts) # length check
|
|
|
|
# ...until sub-shard ranges are replicated from another shard replica;
|
|
# there may also be a sub-shard replica missing so run replicators on
|
|
# all nodes to fix that if necessary
|
|
self.brain.servers.start(number=shard_1_nodes[2])
|
|
self.replicators.once()
|
|
|
|
# Now that the replicators have all run, third replica sees cleaving
|
|
# contexts for the first two
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual(len(contexts), 2)
|
|
|
|
# now run sharder again on third replica
|
|
self.sharders.once(
|
|
number=shard_1_nodes[2],
|
|
additional_args='--partitions=%s' % shard_1_part)
|
|
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
|
self.assertEqual('sharding', sharding_broker.get_db_state())
|
|
|
|
broker_id = broker.get_info()['id']
|
|
# Old, unsharded DB doesn't have the context...
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual(len(contexts), 2)
|
|
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
|
|
# ...but the sharding one does
|
|
contexts = list(CleavingContext.load_all(sharding_broker))
|
|
self.assertEqual(len(contexts), 3)
|
|
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
|
|
|
|
# check original first shard range state and sub-shards - all replicas
|
|
# should now be in consistent state
|
|
found_for_shard = self.categorize_container_dir_content(
|
|
shard_1.account, shard_1.container)
|
|
self.assertLengthEqual(found_for_shard['shard_dbs'], 3)
|
|
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
|
for db_file in found_for_shard['shard_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
with annotate_failure('shard db file %s. ' % db_file):
|
|
self.assertIs(False, broker.is_root_container())
|
|
self.assertEqual('sharding', broker.get_db_state())
|
|
self.assertEqual(
|
|
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
|
shard_shards = broker.get_shard_ranges()
|
|
self.assertEqual(
|
|
[ShardRange.CLEAVED, ShardRange.CLEAVED,
|
|
ShardRange.CREATED],
|
|
[sr.state for sr in shard_shards])
|
|
self.assert_shard_ranges_contiguous(
|
|
3, shard_shards,
|
|
first_lower=orig_root_shard_ranges[0]['lower'],
|
|
last_upper=orig_root_shard_ranges[0]['upper'])
|
|
|
|
# check third sub-shard is in created state
|
|
sub_shard = shard_shards[2]
|
|
found_for_sub_shard = self.categorize_container_dir_content(
|
|
sub_shard.account, sub_shard.container)
|
|
self.assertFalse(found_for_sub_shard['shard_dbs'])
|
|
self.assertLengthEqual(found_for_sub_shard['normal_dbs'], 3)
|
|
for db_file in found_for_sub_shard['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
with annotate_failure('sub shard db file %s. ' % db_file):
|
|
self.assertIs(False, broker.is_root_container())
|
|
self.assertEqual('unsharded', broker.get_db_state())
|
|
self.assertEqual(
|
|
ShardRange.CREATED, broker.get_own_shard_range().state)
|
|
self.assertFalse(broker.get_shard_ranges())
|
|
|
|
# check root shard ranges
|
|
root_shard_ranges = self.direct_get_container_shard_ranges()
|
|
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
|
self.assertLengthEqual(root_shards, 5)
|
|
with annotate_failure('node %s. ' % node):
|
|
# shard ranges are sorted by upper, state, lower, so expect:
|
|
# sub-shards, orig shard 0, orig shard 1
|
|
self.assertEqual(
|
|
[ShardRange.CLEAVED, ShardRange.CLEAVED,
|
|
ShardRange.CREATED, ShardRange.SHARDING,
|
|
ShardRange.ACTIVE],
|
|
[sr['state'] for sr in root_shards])
|
|
# sub-shards 0, 1, 2, orig shard 1 should be contiguous
|
|
self.assert_shard_ranges_contiguous(
|
|
4, root_shards[:3] + root_shards[4:])
|
|
# orig shards 0, 1 should be contiguous
|
|
self.assert_shard_ranges_contiguous(2, root_shards[3:])
|
|
|
|
self.assert_container_listing(more_obj_names + obj_names)
|
|
self.assert_container_object_count(len(more_obj_names + obj_names))
|
|
|
|
# Before writing, kill the cache
|
|
self.memcache.delete(get_cache_key(
|
|
self.account, self.container_name, shard='updating'))
|
|
# add another object that lands in the first of the new sub-shards
|
|
self.put_objects(['alpha'])
|
|
|
|
# check that alpha object is in the first new shard
|
|
shard_listings = self.direct_get_container(shard_shards[0].account,
|
|
shard_shards[0].container)
|
|
for node, (hdrs, listing) in shard_listings.items():
|
|
with annotate_failure(node):
|
|
self.assertIn('alpha', [o['name'] for o in listing])
|
|
self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
|
|
# Run sharders again so things settle.
|
|
self.run_sharders(shard_1)
|
|
# Also run replicators to settle cleaving contexts
|
|
self.replicators.once()
|
|
|
|
# check original first shard range shards
|
|
for db_file in found_for_shard['shard_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
with annotate_failure('shard db file %s. ' % db_file):
|
|
self.assertIs(False, broker.is_root_container())
|
|
self.assertEqual('sharded', broker.get_db_state())
|
|
self.assertEqual(
|
|
[ShardRange.ACTIVE] * 3,
|
|
[sr.state for sr in broker.get_shard_ranges()])
|
|
|
|
# Make sure our cleaving contexts got cleaned up
|
|
contexts = list(CleavingContext.load_all(broker))
|
|
self.assertEqual([], contexts)
|
|
|
|
# check root shard ranges
|
|
root_shard_ranges = self.direct_get_container_shard_ranges()
|
|
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
|
# old first shard range should have been deleted
|
|
self.assertLengthEqual(root_shards, 4)
|
|
with annotate_failure('node %s. ' % node):
|
|
self.assertEqual(
|
|
[ShardRange.ACTIVE] * 4,
|
|
[sr['state'] for sr in root_shards])
|
|
self.assert_shard_ranges_contiguous(4, root_shards)
|
|
|
|
headers, final_listing = self.assert_container_listing(
|
|
['alpha'] + more_obj_names + obj_names)
|
|
|
|
# check root
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 3)
|
|
self.assertLengthEqual(found['normal_dbs'], 0)
|
|
new_shard_ranges = None
|
|
for db_file in found['shard_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual('sharded', broker.get_db_state())
|
|
if new_shard_ranges is None:
|
|
new_shard_ranges = broker.get_shard_ranges(
|
|
include_deleted=True)
|
|
self.assertLengthEqual(new_shard_ranges, 5)
|
|
# Second half is still there, and unchanged
|
|
self.assertIn(
|
|
dict(orig_root_shard_ranges[1], meta_timestamp=None,
|
|
state_timestamp=None),
|
|
[dict(sr, meta_timestamp=None, state_timestamp=None)
|
|
for sr in new_shard_ranges])
|
|
# But the first half split in three, then deleted
|
|
by_name = {sr.name: sr for sr in new_shard_ranges}
|
|
self.assertIn(orig_root_shard_ranges[0]['name'], by_name)
|
|
old_shard_range = by_name.pop(
|
|
orig_root_shard_ranges[0]['name'])
|
|
self.assertTrue(old_shard_range.deleted)
|
|
self.assert_shard_ranges_contiguous(4, list(by_name.values()))
|
|
else:
|
|
# Everyone's on the same page. Well, except for
|
|
# meta_timestamps, since the shards each reported
|
|
other_shard_ranges = broker.get_shard_ranges(
|
|
include_deleted=True)
|
|
self.assert_shard_range_lists_equal(
|
|
new_shard_ranges, other_shard_ranges,
|
|
excludes=['meta_timestamp', 'state_timestamp'])
|
|
for orig, updated in zip(orig_root_shard_ranges,
|
|
other_shard_ranges):
|
|
self.assertGreaterEqual(updated.meta_timestamp,
|
|
orig['meta_timestamp'])
|
|
|
|
self.assert_container_delete_fails()
|
|
|
|
for obj in final_listing:
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, obj['name'])
|
|
|
|
# the objects won't be listed anymore
|
|
self.assert_container_listing([])
|
|
# but root container stats will not yet be aware of the deletions
|
|
self.assert_container_delete_fails()
|
|
|
|
# One server was down while the shard sharded its first two sub-shards,
|
|
# so there may be undeleted handoff db(s) for sub-shard(s) that were
|
|
# not fully replicated; run replicators now to clean up so they no
|
|
# longer report bogus stats to root.
|
|
self.replicators.once()
|
|
|
|
# Run sharder so that shard containers update the root. Do not run
|
|
# sharder on root container because that triggers shrinks which can
|
|
# cause root object count to temporarily be non-zero and prevent the
|
|
# final delete.
|
|
self.run_sharders(self.get_container_shard_ranges())
|
|
# then root is empty and can be deleted
|
|
self.assert_container_listing([])
|
|
self.assert_container_object_count(0)
|
|
client.delete_container(self.url, self.token, self.container_name)
|
|
|
|
def test_sharded_listing_no_replicators(self):
|
|
self._test_sharded_listing()
|
|
|
|
def test_sharded_listing_with_replicators(self):
|
|
self._test_sharded_listing(run_replicators=True)
|
|
|
|
def test_async_pendings(self):
|
|
obj_names = self._make_object_names(self.max_shard_size * 2)
|
|
|
|
# There are some updates *everyone* gets
|
|
self.put_objects(obj_names[::5])
|
|
# But roll some outages so each container only get ~2/5 more object
|
|
# records i.e. total of 3/5 updates per container; and async pendings
|
|
# pile up
|
|
for i, n in enumerate(self.brain.node_numbers, start=1):
|
|
self.brain.servers.stop(number=n)
|
|
self.put_objects(obj_names[i::5])
|
|
self.brain.servers.start(number=n)
|
|
|
|
# But there are also 1/5 updates *no one* gets
|
|
self.brain.servers.stop()
|
|
self.put_objects(obj_names[4::5])
|
|
self.brain.servers.start()
|
|
|
|
# Shard it
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
headers = client.head_container(self.url, self.admin_token,
|
|
self.container_name)
|
|
self.assertEqual('True', headers.get('x-container-sharding'))
|
|
|
|
# sanity check
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 0)
|
|
self.assertLengthEqual(found['normal_dbs'], 3)
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual(len(obj_names) * 3 // 5,
|
|
broker.get_info()['object_count'])
|
|
|
|
# Only run the 'leader' in charge of scanning.
|
|
# Each container has ~2 * max * 3/5 objects
|
|
# which are distributed from obj000 to obj<2 * max - 1>,
|
|
# so expect 3 shard ranges to be found: the first two will be complete
|
|
# shards with max/2 objects and lower/upper bounds spaced by approx:
|
|
# (2 * max - 1)/(2 * max * 3/5) * (max/2) =~ 5/6 * max
|
|
#
|
|
# Note that during this shard cycle the leader replicates to other
|
|
# nodes so they will end up with ~2 * max * 4/5 objects.
|
|
self.sharders.once(number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# Verify that we have one shard db -- though the other normal DBs
|
|
# received the shard ranges that got defined
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 1)
|
|
node_index_zero_db = found['shard_dbs'][0]
|
|
broker = ContainerBroker(node_index_zero_db)
|
|
self.assertIs(True, broker.is_root_container())
|
|
self.assertEqual(SHARDING, broker.get_db_state())
|
|
expected_shard_ranges = broker.get_shard_ranges()
|
|
self.assertLengthEqual(expected_shard_ranges, 3)
|
|
self.assertEqual(
|
|
[ShardRange.CLEAVED, ShardRange.CLEAVED, ShardRange.CREATED],
|
|
[sr.state for sr in expected_shard_ranges])
|
|
|
|
# Still have all three big DBs -- we've only cleaved 2 of the 3 shard
|
|
# ranges that got defined
|
|
self.assertLengthEqual(found['normal_dbs'], 3)
|
|
db_states = []
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertIs(True, broker.is_root_container())
|
|
db_states.append(broker.get_db_state())
|
|
# the sharded db had shard range meta_timestamps updated during
|
|
# cleaving, so we do not expect those to be equal on other nodes
|
|
self.assert_shard_range_lists_equal(
|
|
expected_shard_ranges, broker.get_shard_ranges(),
|
|
excludes=['meta_timestamp', 'state_timestamp', 'state'])
|
|
self.assertEqual(len(obj_names) * 3 // 5,
|
|
broker.get_info()['object_count'])
|
|
self.assertEqual([SHARDING, UNSHARDED, UNSHARDED], sorted(db_states))
|
|
|
|
# Run the other sharders so we're all in (roughly) the same state
|
|
for n in self.brain.node_numbers[1:]:
|
|
self.sharders.once(
|
|
number=n,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 3)
|
|
self.assertLengthEqual(found['normal_dbs'], 3)
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertEqual(SHARDING, broker.get_db_state())
|
|
# no new rows
|
|
self.assertEqual(len(obj_names) * 3 // 5,
|
|
broker.get_info()['object_count'])
|
|
|
|
# Run updaters to clear the async pendings
|
|
Manager(['object-updater']).once()
|
|
|
|
# Our "big" dbs didn't take updates
|
|
for db_file in found['normal_dbs']:
|
|
broker = ContainerBroker(db_file)
|
|
self.assertEqual(len(obj_names) * 3 // 5,
|
|
broker.get_info()['object_count'])
|
|
|
|
# confirm that the async pending updates got redirected to the shards
|
|
for sr in expected_shard_ranges:
|
|
shard_listings = self.direct_get_container(sr.account,
|
|
sr.container)
|
|
for node, (hdrs, listing) in shard_listings.items():
|
|
shard_listing_names = [
|
|
o['name'].encode('utf-8') if six.PY2 else o['name']
|
|
for o in listing]
|
|
for obj in obj_names[4::5]:
|
|
if obj in sr:
|
|
self.assertIn(obj, shard_listing_names)
|
|
else:
|
|
self.assertNotIn(obj, shard_listing_names)
|
|
|
|
# The entire listing is not yet available - we have two cleaved shard
|
|
# ranges, complete with async updates, but for the remainder of the
|
|
# namespace only what landed in the original container
|
|
headers, listing = client.get_container(self.url, self.token,
|
|
self.container_name)
|
|
start_listing = [
|
|
o for o in obj_names if o <= expected_shard_ranges[1].upper]
|
|
self.assertEqual(
|
|
[x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing[:len(start_listing)]],
|
|
start_listing)
|
|
# we can't assert much about the remaining listing, other than that
|
|
# there should be something
|
|
self.assertTrue(
|
|
[x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing[len(start_listing):]])
|
|
self.assertIn('x-container-object-count', headers)
|
|
self.assertEqual(str(len(listing)),
|
|
headers['x-container-object-count'])
|
|
headers, listing = client.get_container(self.url, self.token,
|
|
self.container_name,
|
|
query_string='reverse=on')
|
|
self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing[-len(start_listing):]],
|
|
list(reversed(start_listing)))
|
|
self.assertIn('x-container-object-count', headers)
|
|
self.assertEqual(str(len(listing)),
|
|
headers['x-container-object-count'])
|
|
self.assertTrue(
|
|
[x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing[:-len(start_listing)]])
|
|
|
|
# Run the sharders again to get everything to settle
|
|
self.sharders.once()
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 3)
|
|
self.assertLengthEqual(found['normal_dbs'], 0)
|
|
# now all shards have been cleaved we should get the complete listing
|
|
headers, listing = client.get_container(self.url, self.token,
|
|
self.container_name)
|
|
self.assertEqual([x['name'].encode('utf-8') if six.PY2 else x['name']
|
|
for x in listing],
|
|
obj_names)
|
|
|
|
def test_shrinking(self):
|
|
int_client = self.make_internal_client()
|
|
|
|
def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards):
|
|
hdrs, range_data = node_data
|
|
self.assert_dict_contains(exp_hdrs, hdrs)
|
|
self.assert_shard_ranges_contiguous(exp_shards, range_data)
|
|
self.assert_total_object_count(exp_obj_count, range_data)
|
|
|
|
def check_shard_nodes_data(node_data, expected_state='unsharded',
|
|
expected_shards=0, exp_obj_count=0):
|
|
# checks that shard range is consistent on all nodes
|
|
root_path = '%s/%s' % (self.account, self.container_name)
|
|
exp_shard_hdrs = {
|
|
'X-Container-Sysmeta-Shard-Quoted-Root': quote(root_path),
|
|
'X-Backend-Sharding-State': expected_state}
|
|
object_counts = []
|
|
bytes_used = []
|
|
for node_id, node_data in node_data.items():
|
|
with annotate_failure('Node id %s.' % node_id):
|
|
check_node_data(
|
|
node_data, exp_shard_hdrs, exp_obj_count,
|
|
expected_shards)
|
|
hdrs = node_data[0]
|
|
object_counts.append(int(hdrs['X-Container-Object-Count']))
|
|
bytes_used.append(int(hdrs['X-Container-Bytes-Used']))
|
|
if len(set(object_counts)) != 1:
|
|
self.fail('Inconsistent object counts: %s' % object_counts)
|
|
if len(set(bytes_used)) != 1:
|
|
self.fail('Inconsistent bytes used: %s' % bytes_used)
|
|
return object_counts[0], bytes_used[0]
|
|
|
|
repeat = [0]
|
|
|
|
def do_shard_then_shrink():
|
|
repeat[0] += 1
|
|
obj_names = ['obj-%s-%03d' % (repeat[0], x)
|
|
for x in range(self.max_shard_size)]
|
|
self.put_objects(obj_names)
|
|
# these two object names will fall at start of first shard range...
|
|
alpha = 'alpha-%s' % repeat[0]
|
|
beta = 'beta-%s' % repeat[0]
|
|
|
|
# Enable sharding
|
|
client.post_container(
|
|
self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
|
|
# sanity check
|
|
self.assert_container_listing(obj_names)
|
|
|
|
# Only run the one in charge of scanning
|
|
self.sharders.once(
|
|
number=self.brain.node_numbers[0],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# check root container
|
|
root_nodes_data = self.direct_get_container_shard_ranges()
|
|
self.assertEqual(3, len(root_nodes_data))
|
|
|
|
# nodes on which sharder has not run are still in unsharded state
|
|
# but have had shard ranges replicated to them
|
|
exp_obj_count = len(obj_names)
|
|
exp_hdrs = {'X-Backend-Sharding-State': 'unsharded',
|
|
'X-Container-Object-Count': str(exp_obj_count)}
|
|
node_id = self.brain.node_numbers[1] - 1
|
|
check_node_data(
|
|
root_nodes_data[node_id], exp_hdrs, exp_obj_count, 2)
|
|
node_id = self.brain.node_numbers[2] - 1
|
|
check_node_data(
|
|
root_nodes_data[node_id], exp_hdrs, exp_obj_count, 2)
|
|
|
|
# only one that ran sharder is in sharded state
|
|
exp_hdrs['X-Backend-Sharding-State'] = 'sharded'
|
|
node_id = self.brain.node_numbers[0] - 1
|
|
check_node_data(
|
|
root_nodes_data[node_id], exp_hdrs, exp_obj_count, 2)
|
|
|
|
orig_range_data = root_nodes_data[node_id][1]
|
|
orig_shard_ranges = [ShardRange.from_dict(r)
|
|
for r in orig_range_data]
|
|
|
|
# check first shard
|
|
shard_nodes_data = self.direct_get_container_shard_ranges(
|
|
orig_shard_ranges[0].account, orig_shard_ranges[0].container)
|
|
obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data)
|
|
total_shard_object_count = obj_count
|
|
|
|
# check second shard
|
|
shard_nodes_data = self.direct_get_container_shard_ranges(
|
|
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
|
obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data)
|
|
total_shard_object_count += obj_count
|
|
self.assertEqual(exp_obj_count, total_shard_object_count)
|
|
|
|
# Now that everyone has shard ranges, run *everyone*
|
|
self.sharders.once(
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# all root container nodes should now be in sharded state
|
|
root_nodes_data = self.direct_get_container_shard_ranges()
|
|
self.assertEqual(3, len(root_nodes_data))
|
|
for node_id, node_data in root_nodes_data.items():
|
|
with annotate_failure('Node id %s.' % node_id):
|
|
check_node_data(node_data, exp_hdrs, exp_obj_count, 2)
|
|
|
|
# run updaters to update .sharded account; shard containers have
|
|
# not updated account since having objects replicated to them
|
|
self.updaters.once()
|
|
shard_cont_count, shard_obj_count = int_client.get_account_info(
|
|
orig_shard_ranges[0].account, [204])
|
|
self.assertEqual(2 * repeat[0], shard_cont_count)
|
|
# the shards account should always have zero object count to avoid
|
|
# double accounting
|
|
self.assertEqual(0, shard_obj_count)
|
|
|
|
# checking the listing also refreshes proxy container info cache so
|
|
# that the proxy becomes aware that container is sharded and will
|
|
# now look up the shard target for subsequent updates
|
|
self.assert_container_listing(obj_names)
|
|
|
|
# Before writing, kill the cache
|
|
self.memcache.delete(get_cache_key(
|
|
self.account, self.container_name, shard='updating'))
|
|
|
|
# delete objects from first shard range
|
|
first_shard_objects = [obj_name for obj_name in obj_names
|
|
if obj_name <= orig_shard_ranges[0].upper]
|
|
for obj in first_shard_objects:
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, obj)
|
|
with self.assertRaises(ClientException):
|
|
client.get_object(
|
|
self.url, self.token, self.container_name, obj)
|
|
|
|
second_shard_objects = [obj_name for obj_name in obj_names
|
|
if obj_name > orig_shard_ranges[1].lower]
|
|
self.assert_container_listing(second_shard_objects)
|
|
|
|
self.put_objects([alpha])
|
|
second_shard_objects = [obj_name for obj_name in obj_names
|
|
if obj_name > orig_shard_ranges[1].lower]
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
|
|
# while container servers are down, but proxy has container info in
|
|
# cache from recent listing, put another object; this update will
|
|
# lurk in async pending until the updaters run again; because all
|
|
# the root container servers are down and therefore cannot respond
|
|
# to a GET for a redirect target, the object update will default to
|
|
# being targeted at the root container
|
|
self.stop_container_servers()
|
|
|
|
# Before writing, kill the cache
|
|
self.memcache.delete(get_cache_key(
|
|
self.account, self.container_name, shard='updating'))
|
|
|
|
self.put_objects([beta])
|
|
self.brain.servers.start()
|
|
async_pendings = self.gather_async_pendings(
|
|
self.get_all_object_nodes())
|
|
num_container_replicas = len(self.brain.nodes)
|
|
num_obj_replicas = self.policy.object_ring.replica_count
|
|
expected_num_updates = num_container_updates(
|
|
num_container_replicas, quorum_size(num_container_replicas),
|
|
num_obj_replicas, self.policy.quorum)
|
|
expected_num_pendings = min(expected_num_updates, num_obj_replicas)
|
|
# sanity check
|
|
with annotate_failure('policy %s. ' % self.policy):
|
|
self.assertLengthEqual(async_pendings, expected_num_pendings)
|
|
|
|
# root object count is not updated...
|
|
self.assert_container_object_count(len(obj_names))
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
root_nodes_data = self.direct_get_container_shard_ranges()
|
|
self.assertEqual(3, len(root_nodes_data))
|
|
for node_id, node_data in root_nodes_data.items():
|
|
with annotate_failure('Node id %s.' % node_id):
|
|
check_node_data(node_data, exp_hdrs, exp_obj_count, 2)
|
|
range_data = node_data[1]
|
|
self.assert_shard_range_lists_equal(
|
|
orig_range_data, range_data,
|
|
excludes=['meta_timestamp', 'state_timestamp'])
|
|
|
|
# ...until the sharders run and update root
|
|
self.run_sharders(orig_shard_ranges[0])
|
|
exp_obj_count = len(second_shard_objects) + 1
|
|
self.assert_container_object_count(exp_obj_count)
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
|
|
# root sharder finds donor, acceptor pair and pushes changes
|
|
self.sharders.once(
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
# run sharder on donor to shrink and replicate to acceptor
|
|
self.run_sharders(orig_shard_ranges[0])
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
# run sharder on acceptor to update root with stats
|
|
self.run_sharders(orig_shard_ranges[1])
|
|
self.assert_container_listing([alpha] + second_shard_objects)
|
|
self.assert_container_object_count(len(second_shard_objects) + 1)
|
|
|
|
# check root container
|
|
root_nodes_data = self.direct_get_container_shard_ranges()
|
|
self.assertEqual(3, len(root_nodes_data))
|
|
exp_hdrs['X-Container-Object-Count'] = str(exp_obj_count)
|
|
for node_id, node_data in root_nodes_data.items():
|
|
with annotate_failure('Node id %s.' % node_id):
|
|
# NB now only *one* shard range in root
|
|
check_node_data(node_data, exp_hdrs, exp_obj_count, 1)
|
|
|
|
# the acceptor shard is intact..
|
|
shard_nodes_data = self.direct_get_container_shard_ranges(
|
|
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
|
obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data)
|
|
# all objects should now be in this shard
|
|
self.assertEqual(exp_obj_count, obj_count)
|
|
|
|
# the donor shard is also still intact
|
|
donor = orig_shard_ranges[0]
|
|
shard_nodes_data = self.direct_get_container_shard_ranges(
|
|
donor.account, donor.container)
|
|
# the donor's shard range will have the acceptor's projected stats
|
|
obj_count, bytes_used = check_shard_nodes_data(
|
|
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
|
exp_obj_count=len(second_shard_objects) + 1)
|
|
# but the donor is empty and so reports zero stats
|
|
self.assertEqual(0, obj_count)
|
|
self.assertEqual(0, bytes_used)
|
|
# check the donor own shard range state
|
|
part, nodes = self.brain.ring.get_nodes(
|
|
donor.account, donor.container)
|
|
for node in nodes:
|
|
with annotate_failure(node):
|
|
broker = self.get_broker(
|
|
part, node, donor.account, donor.container)
|
|
own_sr = broker.get_own_shard_range()
|
|
self.assertEqual(ShardRange.SHARDED, own_sr.state)
|
|
self.assertTrue(own_sr.deleted)
|
|
|
|
# delete all the second shard's object apart from 'alpha'
|
|
for obj in second_shard_objects:
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, obj)
|
|
|
|
self.assert_container_listing([alpha])
|
|
|
|
# runs sharders so second range shrinks away, requires up to 3
|
|
# cycles
|
|
self.sharders.once() # shard updates root stats
|
|
self.assert_container_listing([alpha])
|
|
self.sharders.once() # root finds shrinkable shard
|
|
self.assert_container_listing([alpha])
|
|
self.sharders.once() # shards shrink themselves
|
|
self.assert_container_listing([alpha])
|
|
|
|
# the second shard range has sharded and is empty
|
|
shard_nodes_data = self.direct_get_container_shard_ranges(
|
|
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
|
check_shard_nodes_data(
|
|
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
|
exp_obj_count=1)
|
|
|
|
# check root container
|
|
root_nodes_data = self.direct_get_container_shard_ranges()
|
|
self.assertEqual(3, len(root_nodes_data))
|
|
exp_hdrs = {'X-Backend-Sharding-State': 'collapsed',
|
|
# just the alpha object
|
|
'X-Container-Object-Count': '1'}
|
|
for node_id, node_data in root_nodes_data.items():
|
|
with annotate_failure('Node id %s.' % node_id):
|
|
# NB now no shard ranges in root
|
|
check_node_data(node_data, exp_hdrs, 0, 0)
|
|
|
|
# delete the alpha object
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, alpha)
|
|
# should now be able to delete the *apparently* empty container
|
|
client.delete_container(self.url, self.token, self.container_name)
|
|
self.assert_container_not_found()
|
|
self.direct_head_container(expect_failure=True)
|
|
|
|
# and the container stays deleted even after sharders run and shard
|
|
# send updates
|
|
self.sharders.once()
|
|
self.assert_container_not_found()
|
|
self.direct_head_container(expect_failure=True)
|
|
|
|
# now run updaters to deal with the async pending for the beta
|
|
# object
|
|
self.updaters.once()
|
|
# and the container is revived!
|
|
self.assert_container_listing([beta])
|
|
|
|
# finally, clear out the container
|
|
client.delete_object(
|
|
self.url, self.token, self.container_name, beta)
|
|
|
|
do_shard_then_shrink()
|
|
# repeat from starting point of a collapsed and previously deleted
|
|
# container
|
|
do_shard_then_shrink()
|
|
|
|
def _setup_replication_scenario(self, num_shards, extra_objs=('alpha',)):
|
|
# Get cluster to state where 2 replicas are sharding or sharded but 3rd
|
|
# replica is unsharded and has an object that the first 2 are missing.
|
|
|
|
# put objects while all servers are up
|
|
obj_names = self._make_object_names(
|
|
num_shards * self.max_shard_size // 2)
|
|
self.put_objects(obj_names)
|
|
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
node_numbers = self.brain.node_numbers
|
|
|
|
# run replicators first time to get sync points set
|
|
self.replicators.once()
|
|
|
|
# stop the leader node and one other server
|
|
self.stop_container_servers(slice(0, 2))
|
|
|
|
# ...then put one more object in first shard range namespace
|
|
self.put_objects(extra_objs)
|
|
|
|
# start leader and first other server, stop third server
|
|
for number in node_numbers[:2]:
|
|
self.brain.servers.start(number=number)
|
|
self.brain.servers.stop(number=node_numbers[2])
|
|
self.assert_container_listing(obj_names) # sanity check
|
|
|
|
# shard the container - first two shard ranges are cleaved
|
|
for number in node_numbers[:2]:
|
|
self.sharders.once(
|
|
number=number,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
self.assert_container_listing(obj_names) # sanity check
|
|
return obj_names
|
|
|
|
def test_replication_to_sharding_container(self):
|
|
# verify that replication from an unsharded replica to a sharding
|
|
# replica does not replicate rows but does replicate shard ranges
|
|
obj_names = self._setup_replication_scenario(3)
|
|
for node in self.brain.nodes[:2]:
|
|
self.assert_container_state(node, 'sharding', 3)
|
|
|
|
# bring third server back up, run replicator
|
|
node_numbers = self.brain.node_numbers
|
|
self.brain.servers.start(number=node_numbers[2])
|
|
# sanity check...
|
|
self.assert_container_state(self.brain.nodes[2], 'unsharded', 0)
|
|
self.replicators.once(number=node_numbers[2])
|
|
# check db files unchanged
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 2)
|
|
self.assertLengthEqual(found['normal_dbs'], 3)
|
|
|
|
# the 'alpha' object is NOT replicated to the two sharded nodes
|
|
for node in self.brain.nodes[:2]:
|
|
broker = self.get_broker(self.brain.part, node)
|
|
with annotate_failure(
|
|
'Node id %s in %s' % (node['id'], self.brain.nodes[:2])):
|
|
self.assertFalse(broker.get_objects())
|
|
self.assert_container_state(node, 'sharding', 3)
|
|
self.brain.servers.stop(number=node_numbers[2])
|
|
self.assert_container_listing(obj_names)
|
|
|
|
# all nodes now have shard ranges
|
|
self.brain.servers.start(number=node_numbers[2])
|
|
node_data = self.direct_get_container_shard_ranges()
|
|
for node, (hdrs, shard_ranges) in node_data.items():
|
|
with annotate_failure(node):
|
|
self.assert_shard_ranges_contiguous(3, shard_ranges)
|
|
|
|
# complete cleaving third shard range on first two nodes
|
|
self.brain.servers.stop(number=node_numbers[2])
|
|
for number in node_numbers[:2]:
|
|
self.sharders.once(
|
|
number=number,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
# ...and now they are in sharded state
|
|
self.assert_container_state(self.brain.nodes[0], 'sharded', 3)
|
|
self.assert_container_state(self.brain.nodes[1], 'sharded', 3)
|
|
# ...still no 'alpha' object in listing
|
|
self.assert_container_listing(obj_names)
|
|
|
|
# run the sharder on the third server, alpha object is included in
|
|
# shards that it cleaves
|
|
self.brain.servers.start(number=node_numbers[2])
|
|
self.assert_container_state(self.brain.nodes[2], 'unsharded', 3)
|
|
self.sharders.once(number=node_numbers[2],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_state(self.brain.nodes[2], 'sharding', 3)
|
|
self.sharders.once(number=node_numbers[2],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_state(self.brain.nodes[2], 'sharded', 3)
|
|
self.assert_container_listing(['alpha'] + obj_names)
|
|
|
|
def test_replication_to_sharded_container(self):
|
|
# verify that replication from an unsharded replica to a sharded
|
|
# replica does not replicate rows but does replicate shard ranges
|
|
obj_names = self._setup_replication_scenario(2)
|
|
for node in self.brain.nodes[:2]:
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
|
|
# sanity check
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 2)
|
|
self.assertLengthEqual(found['normal_dbs'], 1)
|
|
for node in self.brain.nodes[:2]:
|
|
broker = self.get_broker(self.brain.part, node)
|
|
info = broker.get_info()
|
|
with annotate_failure(
|
|
'Node id %s in %s' % (node['id'], self.brain.nodes[:2])):
|
|
self.assertEqual(len(obj_names), info['object_count'])
|
|
self.assertFalse(broker.get_objects())
|
|
|
|
# bring third server back up, run replicator
|
|
node_numbers = self.brain.node_numbers
|
|
self.brain.servers.start(number=node_numbers[2])
|
|
# sanity check...
|
|
self.assert_container_state(self.brain.nodes[2], 'unsharded', 0)
|
|
self.replicators.once(number=node_numbers[2])
|
|
# check db files unchanged
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['shard_dbs'], 2)
|
|
self.assertLengthEqual(found['normal_dbs'], 1)
|
|
|
|
# the 'alpha' object is NOT replicated to the two sharded nodes
|
|
for node in self.brain.nodes[:2]:
|
|
broker = self.get_broker(self.brain.part, node)
|
|
with annotate_failure(
|
|
'Node id %s in %s' % (node['id'], self.brain.nodes[:2])):
|
|
self.assertFalse(broker.get_objects())
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
self.brain.servers.stop(number=node_numbers[2])
|
|
self.assert_container_listing(obj_names)
|
|
|
|
# all nodes now have shard ranges
|
|
self.brain.servers.start(number=node_numbers[2])
|
|
node_data = self.direct_get_container_shard_ranges()
|
|
for node, (hdrs, shard_ranges) in node_data.items():
|
|
with annotate_failure(node):
|
|
self.assert_shard_ranges_contiguous(2, shard_ranges)
|
|
|
|
# run the sharder on the third server, alpha object is included in
|
|
# shards that it cleaves
|
|
self.assert_container_state(self.brain.nodes[2], 'unsharded', 2)
|
|
self.sharders.once(number=node_numbers[2],
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
|
self.assert_container_listing(['alpha'] + obj_names)
|
|
|
|
def test_sharding_requires_sufficient_replication(self):
|
|
# verify that cleaving only progresses if each cleaved shard range is
|
|
# sufficiently replicated
|
|
|
|
# put enough objects for 4 shard ranges
|
|
obj_names = self._make_object_names(2 * self.max_shard_size)
|
|
self.put_objects(obj_names)
|
|
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
node_numbers = self.brain.node_numbers
|
|
leader_node = self.brain.nodes[0]
|
|
leader_num = node_numbers[0]
|
|
|
|
# run replicators first time to get sync points set
|
|
self.replicators.once()
|
|
|
|
# start sharding on the leader node
|
|
self.sharders.once(number=leader_num,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
shard_ranges = self.assert_container_state(leader_node, 'sharding', 4)
|
|
self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
# stop *all* container servers for third shard range
|
|
sr_part, sr_node_nums = self.get_part_and_node_numbers(shard_ranges[2])
|
|
for node_num in sr_node_nums:
|
|
self.brain.servers.stop(number=node_num)
|
|
|
|
# attempt to continue sharding on the leader node
|
|
self.sharders.once(number=leader_num,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# no cleaving progress was made
|
|
for node_num in sr_node_nums:
|
|
self.brain.servers.start(number=node_num)
|
|
shard_ranges = self.assert_container_state(leader_node, 'sharding', 4)
|
|
self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
# stop two of the servers for third shard range, not including any
|
|
# server that happens to be the leader node
|
|
stopped = []
|
|
for node_num in sr_node_nums:
|
|
if node_num != leader_num:
|
|
self.brain.servers.stop(number=node_num)
|
|
stopped.append(node_num)
|
|
if len(stopped) >= 2:
|
|
break
|
|
self.assertLengthEqual(stopped, 2) # sanity check
|
|
|
|
# attempt to continue sharding on the leader node
|
|
self.sharders.once(number=leader_num,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# no cleaving progress was made
|
|
for node_num in stopped:
|
|
self.brain.servers.start(number=node_num)
|
|
shard_ranges = self.assert_container_state(leader_node, 'sharding', 4)
|
|
self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
# stop just one of the servers for third shard range
|
|
stopped = []
|
|
for node_num in sr_node_nums:
|
|
if node_num != leader_num:
|
|
self.brain.servers.stop(number=node_num)
|
|
stopped.append(node_num)
|
|
break
|
|
self.assertLengthEqual(stopped, 1) # sanity check
|
|
|
|
# attempt to continue sharding the container
|
|
self.sharders.once(number=leader_num,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
|
|
# this time cleaving completed
|
|
self.brain.servers.start(number=stopped[0])
|
|
shard_ranges = self.assert_container_state(leader_node, 'sharded', 4)
|
|
self.assertEqual([ShardRange.ACTIVE] * 4,
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
def test_sharded_delete(self):
|
|
all_obj_names = self._make_object_names(self.max_shard_size)
|
|
self.put_objects(all_obj_names)
|
|
# Shard the container
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
for n in self.brain.node_numbers:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
# sanity checks
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
self.assert_container_listing(all_obj_names)
|
|
|
|
# delete all objects - updates redirected to shards
|
|
self.delete_objects(all_obj_names)
|
|
self.assert_container_listing([])
|
|
self.assert_container_post_ok('has objects')
|
|
# root not yet updated with shard stats
|
|
self.assert_container_object_count(len(all_obj_names))
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
|
|
# run sharder on shard containers to update root stats
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 2)
|
|
self.run_sharders(shard_ranges)
|
|
self.assert_container_listing([])
|
|
self.assert_container_post_ok('empty')
|
|
self.assert_container_object_count(0)
|
|
|
|
# put a new object - update redirected to shard
|
|
self.put_objects(['alpha'])
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(0)
|
|
|
|
# before root learns about new object in shard, delete the container
|
|
client.delete_container(self.url, self.token, self.container_name)
|
|
self.assert_container_post_fails('deleted')
|
|
self.assert_container_not_found()
|
|
|
|
# run the sharders to update root with shard stats
|
|
self.run_sharders(shard_ranges)
|
|
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(1)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_post_ok('revived')
|
|
|
|
def test_object_update_redirection(self):
|
|
all_obj_names = self._make_object_names(self.max_shard_size)
|
|
self.put_objects(all_obj_names)
|
|
# Shard the container
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
for n in self.brain.node_numbers:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
# sanity checks
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
self.assert_container_listing(all_obj_names)
|
|
|
|
# delete all objects - updates redirected to shards
|
|
self.delete_objects(all_obj_names)
|
|
self.assert_container_listing([])
|
|
self.assert_container_post_ok('has objects')
|
|
|
|
# run sharder on shard containers to update root stats
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 2)
|
|
self.run_sharders(shard_ranges)
|
|
self.assert_container_object_count(0)
|
|
|
|
# First, test a misplaced object moving from one shard to another.
|
|
# with one shard server down, put a new 'alpha' object...
|
|
shard_part, shard_nodes = self.get_part_and_node_numbers(
|
|
shard_ranges[0])
|
|
self.brain.servers.stop(number=shard_nodes[2])
|
|
self.put_objects(['alpha'])
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(0)
|
|
self.assertLengthEqual(
|
|
self.gather_async_pendings(self.get_all_object_nodes()), 1)
|
|
self.brain.servers.start(number=shard_nodes[2])
|
|
|
|
# run sharder on root to discover first shrink candidate
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
# then run sharder on the shard node without the alpha object
|
|
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
|
number=shard_nodes[2])
|
|
# root sees first shard has shrunk, only second shard range used for
|
|
# listing so alpha object not in listing
|
|
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
|
|
self.assert_container_listing([])
|
|
self.assert_container_object_count(0)
|
|
|
|
# run the updaters: the async pending update will be redirected from
|
|
# shrunk shard to second shard
|
|
self.updaters.once()
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(0) # root not yet updated
|
|
|
|
# then run sharder on other shard nodes to complete shrinking
|
|
for number in shard_nodes[:2]:
|
|
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
|
number=number)
|
|
# and get root updated
|
|
self.run_sharders(shard_ranges[1])
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(1)
|
|
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
|
|
|
|
# Now we have just one active shard, test a misplaced object moving
|
|
# from that shard to the root.
|
|
# with one shard server down, delete 'alpha' and put a 'beta' object...
|
|
shard_part, shard_nodes = self.get_part_and_node_numbers(
|
|
shard_ranges[1])
|
|
self.brain.servers.stop(number=shard_nodes[2])
|
|
# Before writing, kill the cache
|
|
self.memcache.delete(get_cache_key(
|
|
self.account, self.container_name, shard='updating'))
|
|
self.delete_objects(['alpha'])
|
|
self.put_objects(['beta'])
|
|
self.assert_container_listing(['beta'])
|
|
self.assert_container_object_count(1)
|
|
self.assertLengthEqual(
|
|
self.gather_async_pendings(self.get_all_object_nodes()), 2)
|
|
self.brain.servers.start(number=shard_nodes[2])
|
|
|
|
# run sharder on root to discover second shrink candidate - root is not
|
|
# yet aware of the beta object
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
# then run sharder on the shard node without the beta object, to shrink
|
|
# it to root - note this moves stale copy of alpha to the root db
|
|
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
|
number=shard_nodes[2])
|
|
# now there are no active shards
|
|
self.assertFalse(self.get_container_shard_ranges())
|
|
|
|
# with other two shard servers down, listing won't find beta object
|
|
for number in shard_nodes[:2]:
|
|
self.brain.servers.stop(number=number)
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(1)
|
|
|
|
# run the updaters: the async pending update will be redirected from
|
|
# shrunk shard to the root
|
|
self.updaters.once()
|
|
self.assert_container_listing(['beta'])
|
|
self.assert_container_object_count(1)
|
|
|
|
def test_misplaced_object_movement(self):
|
|
def merge_object(shard_range, name, deleted=0):
|
|
# it's hard to get a test to put a misplaced object into a shard,
|
|
# so this hack is used force an object record directly into a shard
|
|
# container db. Note: the actual object won't exist, we're just
|
|
# using this to test object records in container dbs.
|
|
shard_part, shard_nodes = self.brain.ring.get_nodes(
|
|
shard_range.account, shard_range.container)
|
|
shard_broker = self.get_broker(
|
|
shard_part, shard_nodes[0], shard_range.account,
|
|
shard_range.container)
|
|
shard_broker.merge_items(
|
|
[{'name': name, 'created_at': Timestamp.now().internal,
|
|
'size': 0, 'content_type': 'text/plain',
|
|
'etag': hashlib.md5().hexdigest(), 'deleted': deleted,
|
|
'storage_policy_index': shard_broker.storage_policy_index}])
|
|
return shard_nodes[0]
|
|
|
|
all_obj_names = self._make_object_names(self.max_shard_size)
|
|
self.put_objects(all_obj_names)
|
|
# Shard the container
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
for n in self.brain.node_numbers:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
# sanity checks
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
self.assert_container_listing(all_obj_names)
|
|
|
|
# delete all objects in first shard range - updates redirected to shard
|
|
shard_ranges = self.get_container_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 2)
|
|
shard_0_objects = [name for name in all_obj_names
|
|
if name in shard_ranges[0]]
|
|
shard_1_objects = [name for name in all_obj_names
|
|
if name in shard_ranges[1]]
|
|
self.delete_objects(shard_0_objects)
|
|
self.assert_container_listing(shard_1_objects)
|
|
self.assert_container_post_ok('has objects')
|
|
|
|
# run sharder on first shard container to update root stats
|
|
self.run_sharders(shard_ranges[0])
|
|
self.assert_container_object_count(len(shard_1_objects))
|
|
|
|
# First, test a misplaced object moving from one shard to another.
|
|
# run sharder on root to discover first shrink candidate
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
# then run sharder on first shard range to shrink it
|
|
self.run_sharders(shard_ranges[0])
|
|
# force a misplaced object into the shrunken shard range to simulate
|
|
# a client put that was in flight when it started to shrink
|
|
misplaced_node = merge_object(shard_ranges[0], 'alpha', deleted=0)
|
|
# root sees first shard has shrunk, only second shard range used for
|
|
# listing so alpha object not in listing
|
|
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
|
|
self.assert_container_listing(shard_1_objects)
|
|
self.assert_container_object_count(len(shard_1_objects))
|
|
# until sharder runs on that node to move the misplaced object to the
|
|
# second shard range
|
|
shard_part, shard_nodes_numbers = self.get_part_and_node_numbers(
|
|
shard_ranges[0])
|
|
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
|
number=misplaced_node['id'] + 1)
|
|
self.assert_container_listing(['alpha'] + shard_1_objects)
|
|
# root not yet updated
|
|
self.assert_container_object_count(len(shard_1_objects))
|
|
|
|
# run sharder to get root updated
|
|
self.run_sharders(shard_ranges[1])
|
|
self.assert_container_listing(['alpha'] + shard_1_objects)
|
|
self.assert_container_object_count(len(shard_1_objects) + 1)
|
|
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
|
|
|
|
# Now we have just one active shard, test a misplaced object moving
|
|
# from that shard to the root.
|
|
# delete most objects from second shard range and run sharder on root
|
|
# to discover second shrink candidate
|
|
self.delete_objects(shard_1_objects)
|
|
self.run_sharders(shard_ranges[1])
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
# then run sharder on the shard node to shrink it to root - note this
|
|
# moves alpha to the root db
|
|
self.run_sharders(shard_ranges[1])
|
|
# now there are no active shards
|
|
self.assertFalse(self.get_container_shard_ranges())
|
|
|
|
# force some misplaced object updates into second shrunk shard range
|
|
merge_object(shard_ranges[1], 'alpha', deleted=1)
|
|
misplaced_node = merge_object(shard_ranges[1], 'beta', deleted=0)
|
|
# root is not yet aware of them
|
|
self.assert_container_listing(['alpha'])
|
|
self.assert_container_object_count(1)
|
|
# until sharder runs on that node to move the misplaced object
|
|
shard_part, shard_nodes_numbers = self.get_part_and_node_numbers(
|
|
shard_ranges[1])
|
|
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
|
number=misplaced_node['id'] + 1)
|
|
self.assert_container_listing(['beta'])
|
|
self.assert_container_object_count(1)
|
|
self.assert_container_delete_fails()
|
|
|
|
def test_replication_to_sharded_container_from_unsharded_old_primary(self):
|
|
primary_ids = [n['id'] for n in self.brain.nodes]
|
|
handoff_node = next(n for n in self.brain.ring.devs
|
|
if n['id'] not in primary_ids)
|
|
|
|
# start with two sharded replicas and one unsharded with extra object
|
|
obj_names = self._setup_replication_scenario(2)
|
|
for node in self.brain.nodes[:2]:
|
|
self.assert_container_state(node, 'sharded', 2)
|
|
|
|
# Fake a ring change - copy unsharded db which has no shard ranges to a
|
|
# handoff to create illusion of a new unpopulated primary node
|
|
node_numbers = self.brain.node_numbers
|
|
new_primary_node = self.brain.nodes[2]
|
|
new_primary_node_number = node_numbers[2]
|
|
new_primary_dir, container_hash = self.get_storage_dir(
|
|
self.brain.part, new_primary_node)
|
|
old_primary_dir, container_hash = self.get_storage_dir(
|
|
self.brain.part, handoff_node)
|
|
utils.mkdirs(os.path.dirname(old_primary_dir))
|
|
shutil.move(new_primary_dir, old_primary_dir)
|
|
|
|
# make the cluster more or less "healthy" again
|
|
self.brain.servers.start(number=new_primary_node_number)
|
|
|
|
# get a db on every node...
|
|
client.put_container(self.url, self.token, self.container_name)
|
|
self.assertTrue(os.path.exists(os.path.join(
|
|
new_primary_dir, container_hash + '.db')))
|
|
found = self.categorize_container_dir_content()
|
|
self.assertLengthEqual(found['normal_dbs'], 1) # "new" primary
|
|
self.assertLengthEqual(found['shard_dbs'], 2) # existing primaries
|
|
|
|
# catastrophic failure! drive dies and is replaced on unchanged primary
|
|
failed_node = self.brain.nodes[0]
|
|
failed_dir, _container_hash = self.get_storage_dir(
|
|
self.brain.part, failed_node)
|
|
shutil.rmtree(failed_dir)
|
|
|
|
# replicate the "old primary" to everybody except the "new primary"
|
|
self.brain.servers.stop(number=new_primary_node_number)
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
|
|
# We're willing to rsync the retiring db to the failed primary.
|
|
# This may or may not have shard ranges, depending on the order in
|
|
# which we hit the primaries, but it definitely *doesn't* have an
|
|
# epoch in its name yet. All objects are replicated.
|
|
self.assertTrue(os.path.exists(os.path.join(
|
|
failed_dir, container_hash + '.db')))
|
|
self.assertLengthEqual(os.listdir(failed_dir), 1)
|
|
broker = self.get_broker(self.brain.part, failed_node)
|
|
self.assertLengthEqual(broker.get_objects(), len(obj_names) + 1)
|
|
|
|
# The other out-of-date primary is within usync range but objects are
|
|
# not replicated to it because the handoff db learns about shard ranges
|
|
broker = self.get_broker(self.brain.part, self.brain.nodes[1])
|
|
self.assertLengthEqual(broker.get_objects(), 0)
|
|
|
|
# Handoff db still exists and now has shard ranges!
|
|
self.assertTrue(os.path.exists(os.path.join(
|
|
old_primary_dir, container_hash + '.db')))
|
|
broker = self.get_broker(self.brain.part, handoff_node)
|
|
shard_ranges = broker.get_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 2)
|
|
self.assert_container_state(handoff_node, 'unsharded', 2)
|
|
|
|
# Replicate again, this time *including* "new primary"
|
|
self.brain.servers.start(number=new_primary_node_number)
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
|
|
# Ordinarily, we would have rsync_then_merge'd to "new primary"
|
|
# but instead we wait
|
|
broker = self.get_broker(self.brain.part, new_primary_node)
|
|
self.assertLengthEqual(broker.get_objects(), 0)
|
|
shard_ranges = broker.get_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 2)
|
|
|
|
# so the next time the sharder comes along, it can push rows out
|
|
# and delete the big db
|
|
self.sharders.once(number=handoff_node['id'] + 1,
|
|
additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_state(handoff_node, 'sharded', 2)
|
|
self.assertFalse(os.path.exists(os.path.join(
|
|
old_primary_dir, container_hash + '.db')))
|
|
# the sharded db hangs around until replication confirms durability
|
|
# first attempt is not sufficiently successful
|
|
self.brain.servers.stop(number=node_numbers[0])
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
self.assertTrue(os.path.exists(old_primary_dir))
|
|
self.assert_container_state(handoff_node, 'sharded', 2)
|
|
# second attempt is successful and handoff db is deleted
|
|
self.brain.servers.start(number=node_numbers[0])
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
self.assertFalse(os.path.exists(old_primary_dir))
|
|
|
|
# run all the sharders, get us into a consistent state
|
|
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
|
self.assert_container_listing(['alpha'] + obj_names)
|
|
|
|
def test_replication_to_empty_new_primary_from_sharding_old_primary(self):
|
|
primary_ids = [n['id'] for n in self.brain.nodes]
|
|
handoff_node = next(n for n in self.brain.ring.devs
|
|
if n['id'] not in primary_ids)
|
|
num_shards = 3
|
|
obj_names = self._make_object_names(
|
|
num_shards * self.max_shard_size // 2)
|
|
self.put_objects(obj_names)
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
|
|
# run replicators first time to get sync points set
|
|
self.replicators.once()
|
|
# start sharding on only the leader node
|
|
leader_node = self.brain.nodes[0]
|
|
leader_node_number = self.brain.node_numbers[0]
|
|
self.sharders.once(number=leader_node_number)
|
|
self.assert_container_state(leader_node, 'sharding', 3)
|
|
for node in self.brain.nodes[1:]:
|
|
self.assert_container_state(node, 'unsharded', 3)
|
|
|
|
# Fake a ring change - copy leader node db to a handoff to create
|
|
# illusion of a new unpopulated primary leader node
|
|
new_primary_dir, container_hash = self.get_storage_dir(
|
|
self.brain.part, leader_node)
|
|
old_primary_dir, container_hash = self.get_storage_dir(
|
|
self.brain.part, handoff_node)
|
|
utils.mkdirs(os.path.dirname(old_primary_dir))
|
|
shutil.move(new_primary_dir, old_primary_dir)
|
|
self.assert_container_state(handoff_node, 'sharding', 3)
|
|
|
|
# run replicator on handoff node to create a fresh db on new primary
|
|
self.assertFalse(os.path.exists(new_primary_dir))
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
self.assertTrue(os.path.exists(new_primary_dir))
|
|
self.assert_container_state(leader_node, 'sharded', 3)
|
|
broker = self.get_broker(self.brain.part, leader_node)
|
|
shard_ranges = broker.get_shard_ranges()
|
|
self.assertLengthEqual(shard_ranges, 3)
|
|
self.assertEqual(
|
|
[ShardRange.CLEAVED, ShardRange.CLEAVED, ShardRange.CREATED],
|
|
[sr.state for sr in shard_ranges])
|
|
|
|
# db still exists on handoff
|
|
self.assertTrue(os.path.exists(old_primary_dir))
|
|
self.assert_container_state(handoff_node, 'sharding', 3)
|
|
# continue sharding it...
|
|
self.sharders.once(number=handoff_node['id'] + 1)
|
|
self.assert_container_state(leader_node, 'sharded', 3)
|
|
# now handoff is fully sharded the replicator will delete it
|
|
self.replicators.once(number=handoff_node['id'] + 1)
|
|
self.assertFalse(os.path.exists(old_primary_dir))
|
|
|
|
# all primaries now have active shard ranges but only one is in sharded
|
|
# state
|
|
self.assert_container_state(leader_node, 'sharded', 3)
|
|
for node in self.brain.nodes[1:]:
|
|
self.assert_container_state(node, 'unsharded', 3)
|
|
node_data = self.direct_get_container_shard_ranges()
|
|
for node_id, (hdrs, shard_ranges) in node_data.items():
|
|
with annotate_failure(
|
|
'node id %s from %s' % (node_id, node_data.keys)):
|
|
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
|
|
|
|
# check handoff cleaved all objects before it was deleted - stop all
|
|
# but leader node so that listing is fetched from shards
|
|
for number in self.brain.node_numbers[1:3]:
|
|
self.brain.servers.stop(number=number)
|
|
|
|
self.assert_container_listing(obj_names)
|
|
|
|
for number in self.brain.node_numbers[1:3]:
|
|
self.brain.servers.start(number=number)
|
|
|
|
self.sharders.once()
|
|
self.assert_container_state(leader_node, 'sharded', 3)
|
|
for node in self.brain.nodes[1:]:
|
|
self.assert_container_state(node, 'sharding', 3)
|
|
self.sharders.once()
|
|
for node in self.brain.nodes:
|
|
self.assert_container_state(node, 'sharded', 3)
|
|
|
|
self.assert_container_listing(obj_names)
|
|
|
|
def test_sharded_account_updates(self):
|
|
# verify that .shards account updates have zero object count and bytes
|
|
# to avoid double accounting
|
|
all_obj_names = self._make_object_names(self.max_shard_size)
|
|
self.put_objects(all_obj_names, contents='xyz')
|
|
# Shard the container into 2 shards
|
|
client.post_container(self.url, self.admin_token, self.container_name,
|
|
headers={'X-Container-Sharding': 'on'})
|
|
for n in self.brain.node_numbers:
|
|
self.sharders.once(
|
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
|
# sanity checks
|
|
for node in self.brain.nodes:
|
|
shard_ranges = self.assert_container_state(node, 'sharded', 2)
|
|
self.assert_container_delete_fails()
|
|
self.assert_container_has_shard_sysmeta()
|
|
self.assert_container_post_ok('sharded')
|
|
self.assert_container_listing(all_obj_names)
|
|
# run the updaters to get account stats updated
|
|
self.updaters.once()
|
|
# check user account stats
|
|
metadata = self.internal_client.get_account_metadata(self.account)
|
|
self.assertEqual(1, int(metadata.get('x-account-container-count')))
|
|
self.assertEqual(self.max_shard_size,
|
|
int(metadata.get('x-account-object-count')))
|
|
self.assertEqual(3 * self.max_shard_size,
|
|
int(metadata.get('x-account-bytes-used')))
|
|
# check hidden .shards account stats
|
|
metadata = self.internal_client.get_account_metadata(
|
|
shard_ranges[0].account)
|
|
self.assertEqual(2, int(metadata.get('x-account-container-count')))
|
|
self.assertEqual(0, int(metadata.get('x-account-object-count')))
|
|
self.assertEqual(0, int(metadata.get('x-account-bytes-used')))
|
|
|
|
|
|
class TestContainerShardingMoreUTF8(TestContainerSharding):
|
|
def _make_object_names(self, number):
|
|
# override default with names that include non-ascii chars
|
|
name_length = self.cluster_info['swift']['max_object_name_length']
|
|
obj_names = []
|
|
for x in range(number):
|
|
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
|
|
name = name.encode('utf8').ljust(name_length, b'o')
|
|
if not six.PY2:
|
|
name = name.decode('utf8')
|
|
obj_names.append(name)
|
|
return obj_names
|
|
|
|
def _setup_container_name(self):
|
|
# override default with max length name that includes non-ascii chars
|
|
super(TestContainerShardingMoreUTF8, self)._setup_container_name()
|
|
name_length = self.cluster_info['swift']['max_container_name_length']
|
|
cont_name = \
|
|
self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
|
|
self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
|
|
if not six.PY2:
|
|
self.container_name = self.container_name.decode('utf8')
|