sharding: Better-handle newlines in container names

Previously, if you were on Python 2.7.10+ [0], such a newline would cause the
sharder to fail, complaining about invalid header values when trying to create
the shard containers. On older versions of Python, it would most likely cause a
parsing error in the container-server that was trying to handle the PUT.

Now, quote container paths in all the places where we pass them around. This includes:

  * The X-Container-Sysmeta-Shard-(Quoted-)Root sent when creating the (empty)
    remote shards
  * The X-Container-Sysmeta-Shard-(Quoted-)Root included when initializing the
    local handoff for cleaving
  * The X-Backend-(Quoted-)Container-Path the proxy sends to the object-server
    for container updates
  * The Location header the container-server sends to the object-updater

Note that a new header was required in requests so that servers would
know whether the value should be unquoted or not. We can get away with
reusing Location in responses by having clients opt-in to quoting with
a new X-Backend-Accept-Quoted-Location header.

During a rolling upgrade,

  * old object-servers servicing requests from new proxy-servers will
    not know about the container path override and so will try to update
    the root container,
  * in general, object updates are more likely to land in the root
    container; the sharder will deal with them as misplaced objects, and
  * shard containers created by new code on servers running old code
    will think they are root containers until the server is running new
    code, too; during this time they'll fail the sharder audit and report
    stats to their account, but both of these should get cleared up upon
    upgrade.

Drive-by: fix a "conainer_name" typo that prevented us from testing that
we can shard a container with unicode in its name. Also, add more UTF8
probe tests.

[0] See https://bugs.python.org/issue22928

Change-Id: Ie08f36e31a448a547468dd85911c3a3bc30e89f1
Closes-Bug: 1856894
This commit is contained in:
Tim Burke 2019-12-18 15:14:00 -08:00
parent 7862ec7b8a
commit 3f88907012
17 changed files with 1059 additions and 53 deletions

View File

@ -59,19 +59,23 @@ def utf8encode(*args):
for s in args]
def native_str_keys(metadata):
def native_str_keys_and_values(metadata):
if six.PY2:
uni_keys = [k for k in metadata if isinstance(k, six.text_type)]
for k in uni_keys:
sv = metadata[k]
del metadata[k]
metadata[k.encode('utf-8')] = sv
metadata[k.encode('utf-8')] = [
x.encode('utf-8') if isinstance(x, six.text_type) else x
for x in sv]
else:
bin_keys = [k for k in metadata if isinstance(k, six.binary_type)]
for k in bin_keys:
sv = metadata[k]
del metadata[k]
metadata[k.decode('utf-8')] = sv
metadata[k.decode('utf-8')] = [
x.decode('utf-8') if isinstance(x, six.binary_type) else x
for x in sv]
ZERO_LIKE_VALUES = {None, '', 0, '0'}
@ -878,7 +882,7 @@ class DatabaseBroker(object):
metadata = self.get_raw_metadata()
if metadata:
metadata = json.loads(metadata)
native_str_keys(metadata)
native_str_keys_and_values(metadata)
else:
metadata = {}
return metadata
@ -940,7 +944,7 @@ class DatabaseBroker(object):
self.db_type)
md = row[0]
md = json.loads(md) if md else {}
native_str_keys(md)
native_str_keys_and_values(md)
except sqlite3.OperationalError as err:
if 'no such column: metadata' not in str(err):
raise

View File

@ -75,7 +75,7 @@ from six.moves import cPickle as pickle
from six.moves.configparser import (ConfigParser, NoSectionError,
NoOptionError, RawConfigParser)
from six.moves import range, http_client
from six.moves.urllib.parse import quote as _quote
from six.moves.urllib.parse import quote as _quote, unquote
from six.moves.urllib.parse import urlparse
from swift import gettext_ as _
@ -5699,6 +5699,9 @@ def get_redirect_data(response):
if 'Location' not in headers:
return None
location = urlparse(headers['Location']).path
if config_true_value(headers.get('X-Backend-Location-Is-Quoted',
'false')):
location = unquote(location)
account, container, _junk = split_path(location, 2, 3, True)
timestamp_val = headers.get('X-Backend-Redirect-Timestamp')
try:

View File

@ -22,6 +22,7 @@ from uuid import uuid4
import six
from six.moves import range
from six.moves.urllib.parse import unquote
import sqlite3
from eventlet import tpool
@ -2040,7 +2041,14 @@ class ContainerBroker(DatabaseBroker):
``container`` attributes respectively.
"""
path = self.get_sharding_sysmeta('Root')
path = self.get_sharding_sysmeta('Quoted-Root')
hdr = 'X-Container-Sysmeta-Shard-Quoted-Root'
if path:
path = unquote(path)
else:
path = self.get_sharding_sysmeta('Root')
hdr = 'X-Container-Sysmeta-Shard-Root'
if not path:
# Ensure account/container get populated
self._populate_instance_cache()
@ -2052,8 +2060,8 @@ class ContainerBroker(DatabaseBroker):
self._root_account, self._root_container = split_path(
'/' + path, 2, 2)
except ValueError:
raise ValueError("Expected X-Container-Sysmeta-Shard-Root to be "
"of the form 'account/container', got %r" % path)
raise ValueError("Expected %s to be of the form "
"'account/container', got %r" % (hdr, path))
@property
def root_account(self):

View File

@ -23,6 +23,7 @@ from swift import gettext_ as _
from eventlet import Timeout
import six
from six.moves.urllib.parse import quote
import swift.common.db
from swift.container.sync_store import ContainerSyncStore
@ -312,6 +313,11 @@ class ContainerController(BaseStorageServer):
"""
if not config_true_value(
req.headers.get('x-backend-accept-redirect', False)):
# We want to avoid fetching shard ranges for the (more
# time-sensitive) object-server update, so allow some misplaced
# objects to land between when we've started sharding and when the
# proxy learns about it. Note that this path is also used by old,
# pre-sharding updaters during a rolling upgrade.
return None
shard_ranges = broker.get_shard_ranges(
@ -324,7 +330,15 @@ class ContainerController(BaseStorageServer):
# in preference to the parent, which is the desired result.
containing_range = shard_ranges[0]
location = "/%s/%s" % (containing_range.name, obj_name)
headers = {'Location': location,
if location != quote(location) and not config_true_value(
req.headers.get('x-backend-accept-quoted-location', False)):
# Sender expects the destination to be unquoted, but it isn't safe
# to send unquoted. Eat the update for now and let the sharder
# move it later. Should only come up during rolling upgrades.
return None
headers = {'Location': quote(location),
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp':
containing_range.timestamp.internal}

View File

@ -656,7 +656,14 @@ class ContainerSharder(ContainerReplicator):
# Get the valid info into the broker.container, etc
shard_broker.get_info()
shard_broker.merge_shard_ranges(shard_range)
shard_broker.set_sharding_sysmeta('Root', root_path)
shard_broker.set_sharding_sysmeta('Quoted-Root', quote(root_path))
# NB: we *used* to do
# shard_broker.set_sharding_sysmeta('Root', root_path)
# but that isn't safe for container names with nulls or newlines (or
# possibly some other characters). We consciously *don't* make any
# attempt to set the old meta; during an upgrade, some shards may think
# they are in fact roots, but it cleans up well enough once everyone's
# upgraded.
shard_broker.update_metadata({
'X-Container-Sysmeta-Sharding':
('True', Timestamp.now().internal)})
@ -1129,8 +1136,16 @@ class ContainerSharder(ContainerReplicator):
shard_range.update_state(ShardRange.CREATED)
headers = {
'X-Backend-Storage-Policy-Index': broker.storage_policy_index,
'X-Container-Sysmeta-Shard-Root': broker.root_path,
'X-Container-Sysmeta-Shard-Quoted-Root': quote(
broker.root_path),
'X-Container-Sysmeta-Sharding': True}
# NB: we *used* to send along
# 'X-Container-Sysmeta-Shard-Root': broker.root_path
# but that isn't safe for container names with nulls or newlines
# (or possibly some other characters). We consciously *don't* make
# any attempt to set the old meta; during an upgrade, some shards
# may think they are in fact roots, but it cleans up well enough
# once everyone's upgraded.
success = self._send_shard_ranges(
shard_range.account, shard_range.container,
[shard_range], headers=headers)

View File

@ -17,6 +17,7 @@
import six
import six.moves.cPickle as pickle
from six.moves.urllib.parse import unquote
import json
import os
import multiprocessing
@ -366,7 +367,6 @@ class ObjectController(BaseStorageServer):
contdevices = [d.strip() for d in
headers_in.get('X-Container-Device', '').split(',')]
contpartition = headers_in.get('X-Container-Partition', '')
contpath = headers_in.get('X-Backend-Container-Path')
if len(conthosts) != len(contdevices):
# This shouldn't happen unless there's a bug in the proxy,
@ -379,6 +379,12 @@ class ObjectController(BaseStorageServer):
'devices': headers_in.get('X-Container-Device', '')})
return
contpath = headers_in.get('X-Backend-Quoted-Container-Path')
if contpath:
contpath = unquote(contpath)
else:
contpath = headers_in.get('X-Backend-Container-Path')
if contpath:
try:
# TODO: this is very late in request handling to be validating

View File

@ -358,6 +358,7 @@ class ObjectUpdater(Daemon):
headers_out.setdefault('X-Backend-Storage-Policy-Index',
str(int(policy)))
headers_out.setdefault('X-Backend-Accept-Redirect', 'true')
headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true')
container_path = update.get('container_path')
if container_path:
acct, cont = split_path('/' + container_path, minsegs=2)

View File

@ -343,7 +343,15 @@ class BaseObjectController(Controller):
headers[index].get('X-Container-Device'),
container['device'])
if container_path:
headers[index]['X-Backend-Container-Path'] = container_path
headers[index]['X-Backend-Quoted-Container-Path'] = quote(
container_path)
# NB: we used to send
# 'X-Backend-Container-Path': container_path
# but that isn't safe for container names with nulls or
# newlines (or possibly some other characters). We consciously
# *don't* make any attempt to set the old meta; during an
# upgrade, old object-servers will talk to the root which
# will eat the update and move it as a misplaced object.
def set_delete_at_headers(index, delete_at_node):
headers[index]['X-Delete-At-Container'] = delete_at_container

View File

@ -20,6 +20,7 @@ import uuid
from nose import SkipTest
import six
from six.moves.urllib.parse import quote
from swift.common import direct_client, utils
from swift.common.manager import Manager
@ -57,6 +58,7 @@ class ShardCollector(object):
class BaseTestContainerSharding(ReplProbeTest):
DELIM = '-'
def _maybe_skip_test(self):
try:
@ -101,10 +103,10 @@ class BaseTestContainerSharding(ReplProbeTest):
self._maybe_skip_test()
def _make_object_names(self, number):
return ['obj-%04d' % x for x in range(number)]
return ['obj%s%04d' % (self.DELIM, x) for x in range(number)]
def _setup_container_name(self):
self.container_name = 'container-%s' % uuid.uuid4()
self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4())
def setUp(self):
client.logger.setLevel(client.logging.WARNING)
@ -415,7 +417,8 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.max_shard_size // 2)]
def check_listing(objects, **params):
qs = '&'.join(['%s=%s' % param for param in params.items()])
qs = '&'.join('%s=%s' % (k, quote(str(v)))
for k, v in params.items())
headers, listing = client.get_container(
self.url, self.token, self.container_name, query_string=qs)
listing = [x['name'].encode('utf-8') if six.PY2 else x['name']
@ -468,12 +471,12 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
# delimiter
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=-')
self.assertEqual([{'subdir': 'obj-'}], listing)
query_string='delimiter=' + quote(self.DELIM))
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=j-')
self.assertEqual([{'subdir': 'obj-'}], listing)
query_string='delimiter=j' + quote(self.DELIM))
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
limit = self.cluster_info['swift']['container_listing_limit']
exc = check_listing_fails(412, limit=limit + 1)
@ -546,13 +549,23 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.assert_container_post_ok('sharded')
class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
DELIM = '\n'
def _make_object_names(self, number):
return ['obj\n%04d%%Ff' % x for x in range(number)]
def _setup_container_name(self):
self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4()
class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
def _make_object_names(self, number):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:
name = name.decode('utf8')
@ -563,10 +576,11 @@ class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
# override default with max length name that includes non-ascii chars
super(TestContainerShardingUTF8, self)._setup_container_name()
name_length = self.cluster_info['swift']['max_container_name_length']
cont_name = self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb'
self.conainer_name = cont_name.ljust(name_length, 'x')
if six.PY2:
self.conainer_name = self.container_name.encode('utf8')
cont_name = \
self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
if not six.PY2:
self.container_name = self.container_name.decode('utf8')
class TestContainerSharding(BaseTestContainerSharding):
@ -1114,7 +1128,9 @@ class TestContainerSharding(BaseTestContainerSharding):
shard_listings = self.direct_get_container(sr.account,
sr.container)
for node, (hdrs, listing) in shard_listings.items():
shard_listing_names = [o['name'] for o in listing]
shard_listing_names = [
o['name'].encode('utf-8') if six.PY2 else o['name']
for o in listing]
for obj in obj_names[4::5]:
if obj in sr:
self.assertIn(obj, shard_listing_names)
@ -1178,8 +1194,9 @@ class TestContainerSharding(BaseTestContainerSharding):
expected_shards=0, exp_obj_count=0):
# checks that shard range is consistent on all nodes
root_path = '%s/%s' % (self.account, self.container_name)
exp_shard_hdrs = {'X-Container-Sysmeta-Shard-Root': root_path,
'X-Backend-Sharding-State': expected_state}
exp_shard_hdrs = {
'X-Container-Sysmeta-Shard-Quoted-Root': quote(root_path),
'X-Backend-Sharding-State': expected_state}
object_counts = []
bytes_used = []
for node_id, node_data in node_data.items():
@ -2178,3 +2195,27 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual(2, int(metadata.get('x-account-container-count')))
self.assertEqual(0, int(metadata.get('x-account-object-count')))
self.assertEqual(0, int(metadata.get('x-account-bytes-used')))
class TestContainerShardingMoreUTF8(TestContainerSharding):
def _make_object_names(self, number):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:
name = name.decode('utf8')
obj_names.append(name)
return obj_names
def _setup_container_name(self):
# override default with max length name that includes non-ascii chars
super(TestContainerShardingMoreUTF8, self)._setup_container_name()
name_length = self.cluster_info['swift']['max_container_name_length']
cont_name = \
self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
if not six.PY2:
self.container_name = self.container_name.decode('utf8')

View File

@ -526,13 +526,14 @@ class TestExampleBroker(unittest.TestCase):
# This is not obvious. The actual JSON in the database is the same:
# '{"test\\u062a": ["value\\u062a", "0000000001.00000"]}'
# The only difference is what reading it produces on py2 and py3.
# We use native strings for metadata keys (see native_str_keys()),
# so keys are different.
# We use native strings for metadata (see native_str_keys_and_values),
# so types are different.
if six.PY2:
key = u'test\u062a'.encode('utf-8')
value = u'value\u062a'.encode('utf-8')
else:
key = u'test\u062a'
value = u'value\u062a'
value = u'value\u062a'
metadata = {
key: [value, Timestamp(1).internal]
}

View File

@ -339,7 +339,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertTrue(broker.empty())
@with_tempdir
def test_empty_shard_container(self, tempdir):
def test_empty_old_style_shard_container(self, tempdir):
# Test ContainerBroker.empty for a shard container where shard range
# usage should not be considered
db_path = os.path.join(
@ -418,6 +418,86 @@ class TestContainerBroker(unittest.TestCase):
broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.empty())
@with_tempdir
def test_empty_shard_container(self, tempdir):
# Test ContainerBroker.empty for a shard container where shard range
# usage should not be considered
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='.shards_a', container='cc')
broker.initialize(next(self.ts).internal, 0)
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
self.assertFalse(broker.is_root_container())
def check_object_counted(broker_to_test, broker_with_object):
obj = {'name': 'o', 'created_at': next(self.ts).internal,
'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
'deleted': 0}
broker_with_object.merge_items([dict(obj)])
self.assertFalse(broker_to_test.empty())
# and delete it
obj.update({'created_at': next(self.ts).internal, 'deleted': 1})
broker_with_object.merge_items([dict(obj)])
self.assertTrue(broker_to_test.empty())
self.assertTrue(broker.empty())
check_object_counted(broker, broker)
# own shard range is not considered for object count
own_sr = broker.get_own_shard_range()
self.assertEqual(0, own_sr.object_count)
broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.empty())
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
EMPTY_ETAG)
own_sr = broker.get_own_shard_range()
self.assertEqual(1, own_sr.object_count)
broker.merge_shard_ranges([own_sr])
self.assertFalse(broker.empty())
broker.delete_object('o', next(self.ts).internal)
self.assertTrue(broker.empty())
def check_shard_ranges_not_counted():
sr = ShardRange('.shards_a/shard_c', next(self.ts), object_count=0)
sr.update_meta(13, 99, meta_timestamp=next(self.ts))
for state in ShardRange.STATES:
sr.update_state(state, state_timestamp=next(self.ts))
broker.merge_shard_ranges([sr])
self.assertTrue(broker.empty())
# empty other shard ranges do not influence result
sr.update_meta(0, 0, meta_timestamp=next(self.ts))
for state in ShardRange.STATES:
sr.update_state(state, state_timestamp=next(self.ts))
broker.merge_shard_ranges([sr])
self.assertTrue(broker.empty())
check_shard_ranges_not_counted()
# move to sharding state
broker.enable_sharding(next(self.ts))
self.assertTrue(broker.set_sharding_state())
# check object in retiring db is considered
check_object_counted(broker, broker.get_brokers()[0])
self.assertTrue(broker.empty())
# as well as misplaced objects in fresh db
check_object_counted(broker, broker)
check_shard_ranges_not_counted()
# move to sharded state
self.assertTrue(broker.set_sharded_state())
self.assertTrue(broker.empty())
check_object_counted(broker, broker)
check_shard_ranges_not_counted()
# own shard range still has no influence
own_sr = broker.get_own_shard_range()
own_sr.update_meta(3, 4, meta_timestamp=next(self.ts))
broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.empty())
def test_reclaim(self):
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
@ -3361,7 +3441,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual('myaccount/mycontainer', broker.path)
@with_tempdir
def test_root_account_container_path(self, tempdir):
def test_old_style_root_account_container_path(self, tempdir):
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(
db_path, account='root_a', container='root_c')
@ -3442,6 +3522,88 @@ class TestContainerBroker(unittest.TestCase):
check_validation('/root_a/root_c/blah')
check_validation('/')
@with_tempdir
def test_root_account_container_path(self, tempdir):
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(
db_path, account='root_a', container='root_c')
broker.initialize(next(self.ts).internal, 1)
# make sure we can cope with uninitialized account and container
broker.account = broker.container = None
self.assertEqual('root_a', broker.root_account)
self.assertEqual('root_c', broker.root_container)
self.assertEqual('root_a/root_c', broker.root_path)
self.assertTrue(broker.is_root_container())
self.assertEqual('root_a', broker.account) # sanity check
self.assertEqual('root_c', broker.container) # sanity check
# we don't expect root containers to have this sysmeta set but if it is
# the broker should still behave like a root container
metadata = {
'X-Container-Sysmeta-Shard-Quoted-Root':
('root_a/root_c', next(self.ts).internal)}
broker = ContainerBroker(
db_path, account='root_a', container='root_c')
broker.update_metadata(metadata)
broker.account = broker.container = None
self.assertEqual('root_a', broker.root_account)
self.assertEqual('root_c', broker.root_container)
self.assertEqual('root_a/root_c', broker.root_path)
self.assertTrue(broker.is_root_container())
# if root is marked deleted, it still considers itself to be a root
broker.delete_db(next(self.ts).internal)
self.assertEqual('root_a', broker.root_account)
self.assertEqual('root_c', broker.root_container)
self.assertEqual('root_a/root_c', broker.root_path)
self.assertTrue(broker.is_root_container())
# check the values are not just being cached
broker = ContainerBroker(db_path)
self.assertEqual('root_a', broker.root_account)
self.assertEqual('root_c', broker.root_container)
self.assertEqual('root_a/root_c', broker.root_path)
self.assertTrue(broker.is_root_container())
# check a shard container
db_path = os.path.join(tempdir, 'shard_container.db')
broker = ContainerBroker(
db_path, account='.shards_root_a', container='c_shard')
broker.initialize(next(self.ts).internal, 1)
# now the metadata is significant...
metadata = {
'X-Container-Sysmeta-Shard-Quoted-Root':
('root_a/root_c', next(self.ts).internal)}
broker.update_metadata(metadata)
broker.account = broker.container = None
broker._root_account = broker._root_container = None
self.assertEqual('root_a', broker.root_account)
self.assertEqual('root_c', broker.root_container)
self.assertEqual('root_a/root_c', broker.root_path)
self.assertFalse(broker.is_root_container())
# check validation
def check_validation(root_value):
metadata = {
'X-Container-Sysmeta-Shard-Quoted-Root':
(root_value, next(self.ts).internal)}
broker.update_metadata(metadata)
broker.account = broker.container = None
broker._root_account = broker._root_container = None
with self.assertRaises(ValueError) as cm:
broker.root_account
self.assertIn('Expected X-Container-Sysmeta-Shard-Quoted-Root',
str(cm.exception))
with self.assertRaises(ValueError):
broker.root_container
check_validation('root_a')
check_validation('/root_a')
check_validation('/root_a/root_c')
check_validation('/root_a/root_c/blah')
check_validation('/')
def test_resolve_shard_range_states(self):
self.assertIsNone(ContainerBroker.resolve_shard_range_states(None))
self.assertIsNone(ContainerBroker.resolve_shard_range_states([]))
@ -4422,7 +4584,8 @@ class TestContainerBroker(unittest.TestCase):
do_test(orig_state, ts, test_state, ts_newer, test_state,
ts_newer)
def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir):
def _check_object_stats_when_old_style_sharded(
self, a, c, root_a, root_c, tempdir):
# common setup and assertions for root and shard containers
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
@ -4449,6 +4612,51 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(14, broker.get_info()['bytes_used'])
return broker
@with_tempdir
def test_object_stats_old_style_root_container(self, tempdir):
broker = self._check_object_stats_when_old_style_sharded(
'a', 'c', 'a', 'c', tempdir)
self.assertTrue(broker.is_root_container()) # sanity
self.assertTrue(broker.set_sharded_state())
self.assertEqual(120, broker.get_info()['object_count'])
self.assertEqual(1999, broker.get_info()['bytes_used'])
@with_tempdir
def test_object_stats_old_style_shard_container(self, tempdir):
broker = self._check_object_stats_when_old_style_sharded(
'.shard_a', 'c-blah', 'a', 'c', tempdir)
self.assertFalse(broker.is_root_container()) # sanity
self.assertTrue(broker.set_sharded_state())
self.assertEqual(0, broker.get_info()['object_count'])
self.assertEqual(0, broker.get_info()['bytes_used'])
def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir):
# common setup and assertions for root and shard containers
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(
db_path, account=a, container=c)
broker.initialize(next(self.ts).internal, 0)
broker.set_sharding_sysmeta('Quoted-Root', '%s/%s' % (root_a, root_c))
broker.merge_items([{'name': 'obj', 'size': 14, 'etag': 'blah',
'content_type': 'text/plain', 'deleted': 0,
'created_at': Timestamp.now().internal}])
self.assertEqual(1, broker.get_info()['object_count'])
self.assertEqual(14, broker.get_info()['bytes_used'])
broker.enable_sharding(next(self.ts))
self.assertTrue(broker.set_sharding_state())
sr_1 = ShardRange(
'%s/%s1' % (root_a, root_c), Timestamp.now(), lower='', upper='m',
object_count=99, bytes_used=999, state=ShardRange.ACTIVE)
sr_2 = ShardRange(
'%s/%s2' % (root_a, root_c), Timestamp.now(), lower='m', upper='',
object_count=21, bytes_used=1000, state=ShardRange.ACTIVE)
broker.merge_shard_ranges([sr_1, sr_2])
self.assertEqual(1, broker.get_info()['object_count'])
self.assertEqual(14, broker.get_info()['bytes_used'])
return broker
@with_tempdir
def test_object_stats_root_container(self, tempdir):
broker = self._check_object_stats_when_sharded(

View File

@ -924,7 +924,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_misplaced_rows_replicate_and_enqueue_from_shard(self):
def test_misplaced_rows_replicate_and_enqueue_from_old_style_shard(self):
# force all timestamps to fall in same hour
ts = (Timestamp(t) for t in
itertools.count(int(time.time()) // 3600 * 3600))
@ -1009,6 +1009,91 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_misplaced_rows_replicate_and_enqueue_from_shard(self):
# force all timestamps to fall in same hour
ts = (Timestamp(t) for t in
itertools.count(int(time.time()) // 3600 * 3600))
policy = random.choice(list(POLICIES))
broker = self._get_broker('.shards_a', 'some-other-c', node_index=0)
broker.initialize(next(ts).internal, policy.idx)
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
remote_policy = random.choice([p for p in POLICIES if p is not
policy])
remote_broker = self._get_broker(
'.shards_a', 'some-other-c', node_index=1)
remote_broker.initialize(next(ts).internal, remote_policy.idx)
# add a misplaced row to *local* broker
obj_put_timestamp = next(ts).internal
broker.put_object(
'o', obj_put_timestamp, 0, 'content-type',
'etag', storage_policy_index=remote_policy.idx)
misplaced = broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 1)
# since this row is misplaced it doesn't show up in count
self.assertEqual(broker.get_info()['object_count'], 0)
# add another misplaced row to *local* broker with composite timestamp
ts_data = next(ts)
ts_ctype = next(ts)
ts_meta = next(ts)
broker.put_object(
'o2', ts_data.internal, 0, 'content-type',
'etag', storage_policy_index=remote_policy.idx,
ctype_timestamp=ts_ctype.internal, meta_timestamp=ts_meta.internal)
misplaced = broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 2)
# since this row is misplaced it doesn't show up in count
self.assertEqual(broker.get_info()['object_count'], 0)
# replicate
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
# push to remote, and third node was missing (also maybe reconciler)
self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])
# grab the rsynced instance of remote_broker
remote_broker = self._get_broker(
'.shards_a', 'some-other-c', node_index=1)
# remote has misplaced rows too now
misplaced = remote_broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 2)
# and the correct policy_index and object_count
info = remote_broker.get_info()
expectations = {
'object_count': 0,
'storage_policy_index': policy.idx,
}
for key, value in expectations.items():
self.assertEqual(info[key], value)
# and we should have also enqueued these rows in a single reconciler,
# since we forced the object timestamps to be in the same hour.
reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
# but it may not be on the same node as us anymore though...
reconciler = self._get_broker(reconciler.account,
reconciler.container, node_index=0)
self.assertEqual(reconciler.get_info()['object_count'], 2)
objects = reconciler.list_objects_iter(
10, '', None, None, None, None, storage_policy_index=0)
self.assertEqual(len(objects), 2)
# NB: reconciler work is for the *root* container!
expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
'application/x-put', obj_put_timestamp)
self.assertEqual(objects[0], expected)
# the second object's listing has ts_meta as its last modified time
# but its full composite timestamp is in the hash field.
expected = ('%s:/a/c/o2' % remote_policy.idx, ts_meta.internal, 0,
'application/x-put',
encode_timestamps(ts_data, ts_ctype, ts_meta))
self.assertEqual(objects[1], expected)
# having safely enqueued to the reconciler we can advance
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_multiple_out_sync_reconciler_enqueue_normalize(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))

View File

@ -78,10 +78,25 @@ class BaseTestSharder(unittest.TestCase):
broker.initialize()
return broker
def _make_old_style_sharding_broker(self, account='a', container='c',
shard_bounds=(('', 'middle'),
('middle', ''))):
broker = self._make_broker(account=account, container=container)
broker.set_sharding_sysmeta('Root', 'a/c')
old_db_id = broker.get_info()['id']
broker.enable_sharding(next(self.ts_iter))
shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.CLEAVED)
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.set_sharding_state())
broker = ContainerBroker(broker.db_file, account='a', container='c')
self.assertNotEqual(old_db_id, broker.get_info()['id']) # sanity check
return broker
def _make_sharding_broker(self, account='a', container='c',
shard_bounds=(('', 'middle'), ('middle', ''))):
broker = self._make_broker(account=account, container=container)
broker.set_sharding_sysmeta('Root', 'a/c')
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
old_db_id = broker.get_info()['id']
broker.enable_sharding(next(self.ts_iter))
shard_ranges = self._make_shard_ranges(
@ -2279,7 +2294,7 @@ class TestSharder(BaseTestSharder):
'.shards_', 'shard_c', (('l', 'mid'), ('mid', 'u')))
self.assertEqual(1, broker.get_own_shard_range().deleted)
def test_identify_sharding_candidate(self):
def test_identify_sharding_old_style_candidate(self):
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
for broker in brokers:
broker.set_sharding_sysmeta('Root', 'a/c')
@ -2333,6 +2348,60 @@ class TestSharder(BaseTestSharder):
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
def test_identify_sharding_candidate(self):
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
for broker in brokers:
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
node = {'index': 2}
# containers are all empty
with self._mock_sharder() as sharder:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
expected_stats = {}
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
objects = [
['obj%3d' % i, next(self.ts_iter).internal, i, 'text/plain',
'etag%s' % i, 0] for i in range(160)]
# one container has 100 objects, which is below the sharding threshold
for obj in objects[:100]:
brokers[0].put_object(*obj)
conf = {'recon_cache_path': self.tempdir}
with self._mock_sharder(conf=conf) as sharder:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
self.assertFalse(sharder.sharding_candidates)
expected_recon = {
'found': 0,
'top': []}
sharder._report_stats()
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
# reduce the sharding threshold and the container is reported
conf = {'shard_container_threshold': 100,
'recon_cache_path': self.tempdir}
with self._mock_sharder(conf=conf) as sharder:
with mock_timestamp_now() as now:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
stats_0 = {'path': brokers[0].db_file,
'node_index': 2,
'account': 'a',
'container': 'c000',
'root': 'a/c',
'object_count': 100,
'meta_timestamp': now.internal,
'file_size': os.stat(brokers[0].db_file).st_size}
self.assertEqual([stats_0], sharder.sharding_candidates)
expected_recon = {
'found': 1,
'top': [stats_0]}
sharder._report_stats()
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
# repeat with handoff node and db_file error
with self._mock_sharder(conf=conf) as sharder:
with mock.patch('os.stat', side_effect=OSError('test error')):
@ -3489,7 +3558,7 @@ class TestSharder(BaseTestSharder):
self._check_objects([expected], expected_shard_dbs[0])
self._check_objects([], broker.db_file)
def _setup_find_ranges(self, account, cont, lower, upper):
def _setup_old_style_find_ranges(self, account, cont, lower, upper):
broker = self._make_broker(account=account, container=cont)
own_sr = ShardRange('%s/%s' % (account, cont), Timestamp.now(),
lower, upper)
@ -3503,6 +3572,106 @@ class TestSharder(BaseTestSharder):
broker.put_object(*obj)
return broker, objects
def _check_old_style_find_shard_ranges_none_found(self, broker, objects):
    """Verify that no shard ranges are found while the broker's object
    count does not exceed the sharder's split size, both with the
    default threshold and with a threshold of twice the object count.
    """
    expected_scan_stats = {'attempted': 1, 'success': 0, 'failure': 1,
                           'found': 0, 'min_time': mock.ANY,
                           'max_time': mock.ANY}

    def assert_no_ranges(sharder, found):
        self.assertEqual(0, found)
        self.assertFalse(broker.get_shard_ranges())
        stats = self._assert_stats(expected_scan_stats, sharder, 'scanned')
        self.assertGreaterEqual(stats['max_time'], stats['min_time'])

    # default config: object count is strictly below the split size
    with self._mock_sharder() as sharder:
        found = sharder._find_shard_ranges(broker)
        self.assertGreater(sharder.split_size, len(objects))
        assert_no_ranges(sharder, found)

    # lowered threshold: object count equals the split size exactly
    with self._mock_sharder(
            conf={'shard_container_threshold': 200}) as sharder:
        found = sharder._find_shard_ranges(broker)
        self.assertEqual(sharder.split_size, len(objects))
        assert_no_ranges(sharder, found)
def test_old_style_find_shard_ranges_none_found_root(self):
    # root container spanning the entire namespace
    self._check_old_style_find_shard_ranges_none_found(
        *self._setup_old_style_find_ranges('a', 'c', '', ''))
def test_old_style_find_shard_ranges_none_found_shard(self):
    # shard container with a bounded namespace
    self._check_old_style_find_shard_ranges_none_found(
        *self._setup_old_style_find_ranges(
            '.shards_a', 'c', 'lower', 'upper'))
def _check_old_style_find_shard_ranges_finds_two(
        self, account, cont, lower, upper):
    """With threshold 199 (split size 99) over 100 objects, a first scan
    yields two shard ranges; a second scan finds no more because the
    namespace is already fully covered.
    """
    def check_ranges():
        # two ranges expected: the first holds 99 objects up to the
        # 99th object's name, the second holds the remaining 1
        self.assertEqual(2, len(broker.get_shard_ranges()))
        expected_ranges = [
            ShardRange(
                ShardRange.make_path('.int_shards_a', 'c', cont, now, 0),
                now, lower, objects[98][0], 99),
            ShardRange(
                ShardRange.make_path('.int_shards_a', 'c', cont, now, 1),
                now, objects[98][0], upper, 1),
        ]
        self._assert_shard_ranges_equal(expected_ranges,
                                        broker.get_shard_ranges())

    # first invocation finds both ranges
    broker, objects = self._setup_old_style_find_ranges(
        account, cont, lower, upper)
    with self._mock_sharder(conf={'shard_container_threshold': 199,
                                  'auto_create_account_prefix': '.int_'}
                            ) as sharder:
        # pin 'now' so the expected shard range paths/timestamps match
        with mock_timestamp_now() as now:
            num_found = sharder._find_shard_ranges(broker)
    self.assertEqual(99, sharder.split_size)
    self.assertEqual(2, num_found)
    check_ranges()
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
                      'found': 2, 'min_time': mock.ANY,
                      'max_time': mock.ANY}
    stats = self._assert_stats(expected_stats, sharder, 'scanned')
    self.assertGreaterEqual(stats['max_time'], stats['min_time'])

    # second invocation finds none
    with self._mock_sharder(conf={'shard_container_threshold': 199,
                                  'auto_create_account_prefix': '.int_'}
                            ) as sharder:
        num_found = sharder._find_shard_ranges(broker)
    self.assertEqual(0, num_found)
    # the previously found ranges are still in place, unchanged
    self.assertEqual(2, len(broker.get_shard_ranges()))
    check_ranges()
    expected_stats = {'attempted': 0, 'success': 0, 'failure': 0,
                      'found': 0, 'min_time': mock.ANY,
                      'max_time': mock.ANY}
    stats = self._assert_stats(expected_stats, sharder, 'scanned')
    self.assertGreaterEqual(stats['max_time'], stats['min_time'])
def test_old_style_find_shard_ranges_finds_two_root(self):
    # root container: full namespace
    self._check_old_style_find_shard_ranges_finds_two(
        account='a', cont='c', lower='', upper='')
def test_old_style_find_shard_ranges_finds_two_shard(self):
    # shard container: bounded namespace
    self._check_old_style_find_shard_ranges_finds_two(
        account='.shards_a', cont='c_', lower='l', upper='u')
def _setup_find_ranges(self, account, cont, lower, upper):
    """Create a broker marked as a shard of a/c via the new 'Quoted-Root'
    sysmeta, seed it with 100 object rows and return (broker, objects).
    """
    test_broker = self._make_broker(account=account, container=cont)
    own_range = ShardRange(
        '%s/%s' % (account, cont), Timestamp.now(), lower, upper)
    test_broker.merge_shard_ranges([own_range])
    test_broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    # note: depending on lower/upper some of these rows lie outside the
    # broker's own namespace, i.e. they are misplaced objects
    seeded = []
    for i in range(100):
        row = ['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
               'etag%s' % i, 0]
        test_broker.put_object(*row)
        seeded.append(row)
    return test_broker, seeded
def _check_find_shard_ranges_none_found(self, broker, objects):
with self._mock_sharder() as sharder:
num_found = sharder._find_shard_ranges(broker)
@ -4144,7 +4313,7 @@ class TestSharder(BaseTestSharder):
self._assert_stats(expected_stats, sharder, 'audit_root')
mocked.assert_not_called()
def test_audit_shard_container(self):
def test_audit_old_style_shard_container(self):
broker = self._make_broker(account='.shards_a', container='shard_c')
broker.set_sharding_sysmeta('Root', 'a/c')
# include overlaps to verify correct match for updating own shard range
@ -4282,6 +4451,144 @@ class TestSharder(BaseTestSharder):
assert_ok()
self.assertTrue(broker.is_deleted())
def test_audit_shard_container(self):
    # Audit behaviour for a shard broker that records its root via the
    # new 'Quoted-Root' sysmeta (rather than the legacy 'Root' variant).
    broker = self._make_broker(account='.shards_a', container='shard_c')
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    # include overlaps to verify correct match for updating own shard range
    shard_bounds = (
        ('a', 'j'), ('k', 't'), ('k', 's'), ('l', 's'), ('s', 'z'))
    shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE)
    shard_ranges[1].name = broker.path
    expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}

    def call_audit_container(exc=None):
        # Run _audit_container with the root container's shard-range GET
        # stubbed to return shard_ranges (or raise exc); returns the
        # sharder and the mocked internal client for further assertions.
        with self._mock_sharder() as sharder:
            sharder.logger = debug_logger()
            with mock.patch.object(sharder, '_audit_root_container') \
                    as mocked, mock.patch.object(
                    sharder, 'int_client') as mock_swift:
                mock_response = mock.MagicMock()
                mock_response.headers = {'x-backend-record-type':
                                         'shard'}
                mock_response.body = json.dumps(
                    [dict(sr) for sr in shard_ranges])
                mock_swift.make_request.return_value = mock_response
                mock_swift.make_request.side_effect = exc
                mock_swift.make_path = (lambda a, c:
                                        '/v1/%s/%s' % (a, c))
                sharder.reclaim_age = 0
                sharder._audit_container(broker)
        # the root-container audit path must never be taken for a shard
        mocked.assert_not_called()
        return sharder, mock_swift

    # bad account name
    broker.account = 'bad_account'
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
    self.assertIn('account not in shards namespace', lines[0])
    self.assertNotIn('root has no matching shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self.assertIn('Audit failed for shard %s' % broker.db_file, lines[1])
    self.assertIn('missing own shard range', lines[1])
    self.assertFalse(lines[2:])
    self.assertFalse(broker.is_deleted())

    # missing own shard range
    broker.get_info()
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
    self.assertIn('missing own shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self.assertFalse(lines[1:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())

    # create own shard range, no match in root
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
    own_shard_range = broker.get_own_shard_range()  # get the default
    own_shard_range.lower = 'j'
    own_shard_range.upper = 'k'
    broker.merge_shard_ranges([own_shard_range])
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
    self.assertNotIn('account not in shards namespace', lines[0])
    self.assertNotIn('missing own shard range', lines[0])
    self.assertIn('root has no matching shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertFalse(lines[1:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())
    # the root was queried for just the shard's own namespace (j, k)
    expected_headers = {'X-Backend-Record-Type': 'shard',
                        'X-Newest': 'true',
                        'X-Backend-Include-Deleted': 'True',
                        'X-Backend-Override-Deleted': 'true'}
    params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
    mock_swift.make_request.assert_called_once_with(
        'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
        params=params)

    # create own shard range, failed response from root
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
    own_shard_range = broker.get_own_shard_range()  # get the default
    own_shard_range.lower = 'j'
    own_shard_range.upper = 'k'
    broker.merge_shard_ranges([own_shard_range])
    sharder, mock_swift = call_audit_container(
        exc=internal_client.UnexpectedResponse('bad', 'resp'))
    lines = sharder.logger.get_lines_for_level('warning')
    self.assertIn('Failed to get shard ranges', lines[0])
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
    self.assertNotIn('account not in shards namespace', lines[1])
    self.assertNotIn('missing own shard range', lines[1])
    self.assertNotIn('root has no matching shard range', lines[1])
    self.assertIn('unable to get shard ranges from root', lines[1])
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertFalse(lines[2:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())
    mock_swift.make_request.assert_called_once_with(
        'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
        params=params)

    def assert_ok():
        # audit passes with no warnings/errors and queried the root for
        # the namespace (k, t) of the matched own shard range
        sharder, mock_swift = call_audit_container()
        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
        self.assertFalse(sharder.logger.get_lines_for_level('error'))
        self._assert_stats(expected_stats, sharder, 'audit_shard')
        params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
        mock_swift.make_request.assert_called_once_with(
            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
            params=params)

    # make own shard range match one in root, but different state
    shard_ranges[1].timestamp = Timestamp.now()
    broker.merge_shard_ranges([shard_ranges[1]])
    now = Timestamp.now()
    shard_ranges[1].update_state(ShardRange.SHARDING, state_timestamp=now)
    assert_ok()
    self.assertFalse(broker.is_deleted())
    # own shard range state is updated from root version
    own_shard_range = broker.get_own_shard_range()
    self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
    self.assertEqual(now, own_shard_range.state_timestamp)

    own_shard_range.update_state(ShardRange.SHARDED,
                                 state_timestamp=Timestamp.now())
    broker.merge_shard_ranges([own_shard_range])
    assert_ok()

    own_shard_range.deleted = 1
    own_shard_range.timestamp = Timestamp.now()
    broker.merge_shard_ranges([own_shard_range])
    assert_ok()
    # a deleted own shard range makes the (empty) broker deletable
    self.assertTrue(broker.is_deleted())
def test_find_and_enable_sharding_candidates(self):
broker = self._make_broker()
broker.enable_sharding(next(self.ts_iter))
@ -4760,6 +5067,116 @@ class TestCleavingContext(BaseTestSharder):
else:
self.fail("Deleted context 'Context-%s' not found")
def test_store_old_style(self):
    """Storing a CleavingContext persists it as sysmeta on the broker."""
    sharding_broker = self._make_old_style_sharding_broker()
    ref = sharding_broker.get_brokers()[0].get_info()['id']
    store_time = Timestamp.now()
    context = CleavingContext(ref, 'curs', 12, 11, 2, True, True, 2, 4)
    with mock_timestamp_now(store_time):
        context.store(sharding_broker)
    sysmeta_key = 'X-Container-Sysmeta-Shard-Context-%s' % ref
    stored_value, stored_ts = sharding_broker.metadata[sysmeta_key]
    self.assertEqual(
        {'ref': ref,
         'cursor': 'curs',
         'max_row': 12,
         'cleave_to_row': 11,
         'last_cleave_to_row': 2,
         'cleaving_done': True,
         'misplaced_done': True,
         'ranges_done': 2,
         'ranges_todo': 4},
        json.loads(stored_value))
    # last modified is the metadata timestamp
    self.assertEqual(stored_ts, store_time.internal)
def test_store_add_row_load_old_style(self):
    """Adding a row to the older db changes only max_row in the context."""
    broker = self._make_old_style_sharding_broker()
    retiring = broker.get_brokers()[0]
    ref = retiring.get_info()['id']

    def add_object_row():
        retiring.merge_items([retiring._record_to_dict(
            ('obj', next(self.ts_iter).internal, 0, 'text/plain',
             'etag', 1))])

    add_object_row()
    self.assertEqual(1, retiring.get_max_row())  # sanity check
    CleavingContext(ref, 'curs', 1, 1, 0, True, True).store(broker)

    # a second row bumps max_row...
    add_object_row()
    loaded = CleavingContext.load(broker)
    # ...which is reflected on load, while all other fields are unchanged
    self.assertEqual(ref, loaded.ref)
    self.assertEqual('curs', loaded.cursor)
    self.assertEqual(2, loaded.max_row)
    self.assertEqual(1, loaded.cleave_to_row)
    self.assertEqual(0, loaded.last_cleave_to_row)
    self.assertTrue(loaded.misplaced_done)
    self.assertTrue(loaded.cleaving_done)
def test_store_reclaim_load_old_style(self):
    # reclaiming rows from older db does not change context
    broker = self._make_old_style_sharding_broker()
    old_broker = broker.get_brokers()[0]
    old_db_id = old_broker.get_info()['id']
    old_broker.merge_items([old_broker._record_to_dict(
        ('obj', next(self.ts_iter).internal, 0, 'text/plain', 'etag', 1))])
    old_max_row = old_broker.get_max_row()
    self.assertEqual(1, old_max_row)  # sanity check
    ctx = CleavingContext(old_db_id, 'curs', 1, 1, 0, True, True)
    ctx.store(broker)

    # reclaim removes the object row from the retiring db...
    self.assertEqual(
        1, len(old_broker.get_objects()))
    now = next(self.ts_iter).internal
    broker.get_brokers()[0].reclaim(now, now)
    self.assertFalse(old_broker.get_objects())

    # ...but the stored context loads back unchanged, including max_row
    new_ctx = CleavingContext.load(broker)
    self.assertEqual(old_db_id, new_ctx.ref)
    self.assertEqual('curs', new_ctx.cursor)
    self.assertEqual(1, new_ctx.max_row)
    self.assertEqual(1, new_ctx.cleave_to_row)
    self.assertEqual(0, new_ctx.last_cleave_to_row)
    self.assertTrue(new_ctx.misplaced_done)
    self.assertTrue(new_ctx.cleaving_done)
def test_store_modify_db_id_load_old_style(self):
    """Changing the db id changes the context ref, so a previously
    stored context no longer matches and loading yields a fresh,
    default context keyed on the new id.
    """
    broker = self._make_old_style_sharding_broker()
    old_broker = broker.get_brokers()[0]
    old_db_id = old_broker.get_info()['id']
    ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True)
    ctx.store(broker)
    old_broker.newid('fake_remote_id')
    new_db_id = old_broker.get_info()['id']
    self.assertNotEqual(old_db_id, new_db_id)
    new_ctx = CleavingContext.load(broker)
    # a fresh context: ref tracks the new id, everything else defaulted
    self.assertEqual(new_db_id, new_ctx.ref)
    self.assertEqual('', new_ctx.cursor)
    # note max_row is dynamically updated during load
    self.assertEqual(-1, new_ctx.max_row)
    # use assertIsNone rather than assertEqual(None, ...): same check,
    # clearer failure message, and the form recommended by unittest
    self.assertIsNone(new_ctx.cleave_to_row)
    self.assertIsNone(new_ctx.last_cleave_to_row)
    self.assertFalse(new_ctx.misplaced_done)
    self.assertFalse(new_ctx.cleaving_done)
def test_load_modify_store_load_old_style(self):
    """A loaded context can be modified, stored and re-loaded intact."""
    broker = self._make_old_style_sharding_broker()
    ref = broker.get_brokers()[0].get_info()['id']
    first = CleavingContext.load(broker)
    self.assertEqual(ref, first.ref)
    self.assertEqual('', first.cursor)  # sanity check
    first.cursor = 'curs'
    first.misplaced_done = True
    first.store(broker)
    reloaded = CleavingContext.load(broker)
    self.assertEqual(ref, reloaded.ref)
    self.assertEqual('curs', reloaded.cursor)
    self.assertTrue(reloaded.misplaced_done)
def test_store(self):
broker = self._make_sharding_broker()
old_db_id = broker.get_brokers()[0].get_info()['id']

View File

@ -350,7 +350,7 @@ class TestContainerUpdater(unittest.TestCase):
self.assertEqual(info['reported_object_count'], 1)
self.assertEqual(info['reported_bytes_used'], 3)
def test_shard_container(self):
def test_old_style_shard_container(self):
cu = self._get_container_updater()
cu.run_once()
containers_dir = os.path.join(self.sda1, DATADIR)
@ -439,5 +439,94 @@ class TestContainerUpdater(unittest.TestCase):
self.assertEqual(info['reported_object_count'], 0)
self.assertEqual(info['reported_bytes_used'], 0)
def test_shard_container(self):
    # A container marked as a shard via the new 'Quoted-Root' sysmeta is
    # not a root container; the updater must not report its object stats
    # to the account (and must zero out any previously reported stats).
    cu = self._get_container_updater()
    cu.run_once()
    containers_dir = os.path.join(self.sda1, DATADIR)
    os.mkdir(containers_dir)
    cu.run_once()
    self.assertTrue(os.path.exists(containers_dir))
    subdir = os.path.join(containers_dir, 'subdir')
    os.mkdir(subdir)
    cb = ContainerBroker(os.path.join(subdir, 'hash.db'),
                         account='.shards_a', container='c')
    cb.initialize(normalize_timestamp(1), 0)
    cb.set_sharding_sysmeta('Quoted-Root', 'a/c')
    self.assertFalse(cb.is_root_container())
    cu.run_once()
    info = cb.get_info()
    # nothing to report yet
    self.assertEqual(info['object_count'], 0)
    self.assertEqual(info['bytes_used'], 0)
    self.assertEqual(info['reported_put_timestamp'], '0')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    self.assertEqual(info['reported_object_count'], 0)
    self.assertEqual(info['reported_bytes_used'], 0)

    cb.put_object('o', normalize_timestamp(2), 3, 'text/plain',
                  '68b329da9893e34099c7d8ad5cb9c940')
    # Fake us having already reported *bad* stats under swift 2.18.0
    cb.reported('0', '0', 1, 3)

    # Should fail with a bunch of connection-refused
    cu.run_once()
    info = cb.get_info()
    self.assertEqual(info['object_count'], 1)
    self.assertEqual(info['bytes_used'], 3)
    self.assertEqual(info['reported_put_timestamp'], '0')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    self.assertEqual(info['reported_object_count'], 1)
    self.assertEqual(info['reported_bytes_used'], 3)

    def accept(sock, addr, return_code):
        # fake account-server: respond with return_code and verify the
        # request line and stats headers of the incoming account update
        try:
            with Timeout(3):
                inc = sock.makefile('rb')
                out = sock.makefile('wb')
                out.write(b'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
                          return_code)
                out.flush()
                self.assertEqual(inc.readline(),
                                 b'PUT /sda1/2/.shards_a/c HTTP/1.1\r\n')
                headers = {}
                line = inc.readline()
                while line and line != b'\r\n':
                    headers[line.split(b':')[0].lower()] = \
                        line.split(b':')[1].strip()
                    line = inc.readline()
                self.assertIn(b'x-put-timestamp', headers)
                self.assertIn(b'x-delete-timestamp', headers)
                self.assertIn(b'x-object-count', headers)
                self.assertIn(b'x-bytes-used', headers)
        except BaseException as err:
            import traceback
            traceback.print_exc()
            # return (rather than raise) so the greenthread result can be
            # re-raised in the main test thread below
            return err
        return None

    bindsock = listen_zero()

    def spawn_accepts():
        # expect one account update per ring replica (two here)
        events = []
        for _junk in range(2):
            sock, addr = bindsock.accept()
            events.append(spawn(accept, sock, addr, 201))
        return events

    spawned = spawn(spawn_accepts)
    # point every account ring device at the fake account-server
    for dev in cu.get_account_ring().devs:
        if dev is not None:
            dev['port'] = bindsock.getsockname()[1]
    cu.run_once()
    for event in spawned.wait():
        err = event.wait()
        if err:
            raise err
    info = cb.get_info()
    self.assertEqual(info['object_count'], 1)
    self.assertEqual(info['bytes_used'], 3)
    # put_timestamp is now reported, but the shard's object stats are
    # not: the previously-reported bad stats have been zeroed out
    self.assertEqual(info['reported_put_timestamp'], '0000000001.00000')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    self.assertEqual(info['reported_object_count'], 0)
    self.assertEqual(info['reported_bytes_used'], 0)
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

View File

@ -1066,6 +1066,7 @@ class TestObjectController(unittest.TestCase):
# User-Agent is updated.
expected_post_headers['User-Agent'] = 'object-updater %s' % os.getpid()
expected_post_headers['X-Backend-Accept-Redirect'] = 'true'
expected_post_headers['X-Backend-Accept-Quoted-Location'] = 'true'
self.assertDictEqual(expected_post_headers, actual_headers)
self.assertFalse(
os.listdir(os.path.join(
@ -1078,7 +1079,8 @@ class TestObjectController(unittest.TestCase):
self._test_PUT_then_POST_async_pendings(
POLICIES[1], update_etag='override_etag')
def _check_PUT_redirected_async_pending(self, container_path=None):
def _check_PUT_redirected_async_pending(self, container_path=None,
old_style=False):
# When container update is redirected verify that the redirect location
# is persisted in the async pending file.
policy = POLICIES[0]
@ -1097,8 +1099,10 @@ class TestObjectController(unittest.TestCase):
'X-Container-Device': 'cdevice'}
if container_path:
# the proxy may include this header
put_headers['X-Backend-Container-Path'] = container_path
# the proxy may include either header
hdr = ('X-Backend-Container-Path' if old_style
else 'X-Backend-Quoted-Container-Path')
put_headers[hdr] = container_path
expected_update_path = '/cdevice/99/%s/o' % container_path
else:
expected_update_path = '/cdevice/99/a/c/o'
@ -1176,6 +1180,10 @@ class TestObjectController(unittest.TestCase):
def test_PUT_redirected_async_pending_with_container_path(self):
    # container path supplied via the new quoted header (the default)
    self._check_PUT_redirected_async_pending(
        container_path='.another/c')
def test_PUT_redirected_async_pending_with_old_style_container_path(self):
    # container path supplied via the legacy X-Backend-Container-Path
    self._check_PUT_redirected_async_pending(
        old_style=True, container_path='.another/c')
def test_POST_quarantine_zbyte(self):
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@ -5379,7 +5387,7 @@ class TestObjectController(unittest.TestCase):
'X-Backend-Container-Update-Override-Content-Type': 'ignored',
'X-Backend-Container-Update-Override-Foo': 'ignored'})
def test_PUT_container_update_to_shard(self):
def test_PUT_container_update_to_old_style_shard(self):
# verify that alternate container update path is respected when
# included in request headers
def do_test(container_path, expected_path, expected_container_path):
@ -5469,6 +5477,96 @@ class TestObjectController(unittest.TestCase):
do_test('too/many/parts', 'a/c', None)
do_test('/leading/slash', 'a/c', None)
def test_PUT_container_update_to_shard(self):
    # verify that alternate container update path is respected when
    # included in request headers (new X-Backend-Quoted-Container-Path)
    def do_test(container_path, expected_path, expected_container_path):
        policy = random.choice(list(POLICIES))
        container_updates = []

        def capture_updates(
                ip, port, method, path, headers, *args, **kwargs):
            container_updates.append((ip, port, method, path, headers))

        pickle_async_update_args = []

        def fake_pickle_async_update(*args):
            pickle_async_update_args.append(args)

        diskfile_mgr = self.object_controller._diskfile_router[policy]
        diskfile_mgr.pickle_async_update = fake_pickle_async_update
        ts_put = next(self.ts)
        headers = {
            'X-Timestamp': ts_put.internal,
            'X-Trans-Id': '123',
            'X-Container-Host': 'chost:cport',
            'X-Container-Partition': 'cpartition',
            'X-Container-Device': 'cdevice',
            'Content-Type': 'text/plain',
            'X-Object-Sysmeta-Ec-Frag-Index': 0,
            'X-Backend-Storage-Policy-Index': int(policy),
        }
        if container_path is not None:
            # the proxy may include this header
            headers['X-Backend-Quoted-Container-Path'] = container_path
        req = Request.blank('/sda1/0/a/c/o', method='PUT',
                            headers=headers, body='')
        with mocked_http_conn(
                500, give_connect=capture_updates) as fake_conn:
            with fake_spawn():
                resp = req.get_response(self.object_controller)
        with self.assertRaises(StopIteration):
            next(fake_conn.code_iter)
        self.assertEqual(resp.status_int, 201)
        self.assertEqual(len(container_updates), 1)
        # verify expected path used in update request
        ip, port, method, path, headers = container_updates[0]
        self.assertEqual(ip, 'chost')
        self.assertEqual(port, 'cport')
        self.assertEqual(method, 'PUT')
        self.assertEqual(path, '/cdevice/cpartition/%s/o' % expected_path)

        # verify that the pickled update *always* has root container
        self.assertEqual(1, len(pickle_async_update_args))
        # NB: unpack into async_policy, not policy -- previously the
        # unpacked name shadowed the request's policy, which made the
        # policy assertion below compare a value with itself
        (objdevice, account, container, obj, data, timestamp,
         async_policy) = pickle_async_update_args[0]
        self.assertEqual(objdevice, 'sda1')
        self.assertEqual(account, 'a')  # NB user account
        self.assertEqual(container, 'c')  # NB root container
        self.assertEqual(obj, 'o')
        self.assertEqual(timestamp, ts_put.internal)
        self.assertEqual(async_policy, policy)
        expected_data = {
            'headers': HeaderKeyDict({
                'X-Size': '0',
                'User-Agent': 'object-server %s' % os.getpid(),
                'X-Content-Type': 'text/plain',
                'X-Timestamp': ts_put.internal,
                'X-Trans-Id': '123',
                'Referer': 'PUT http://localhost/sda1/0/a/c/o',
                'X-Backend-Storage-Policy-Index': int(policy),
                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}),
            'obj': 'o',
            'account': 'a',
            'container': 'c',
            'op': 'PUT'}
        if expected_container_path:
            expected_data['container_path'] = expected_container_path
        self.assertEqual(expected_data, data)

    do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
    do_test('', 'a/c', None)
    do_test(None, 'a/c', None)
    # TODO: should these cases trigger a 400 response rather than
    # defaulting to root path?
    do_test('garbage', 'a/c', None)
    do_test('/', 'a/c', None)
    do_test('/no-acct', 'a/c', None)
    do_test('no-cont/', 'a/c', None)
    do_test('too/many/parts', 'a/c', None)
    do_test('/leading/slash', 'a/c', None)
def test_container_update_async(self):
policy = random.choice(list(POLICIES))
req = Request.blank(

View File

@ -711,19 +711,24 @@ class TestObjectUpdater(unittest.TestCase):
'X-Backend-Storage-Policy-Index': str(int(policies[0])),
'User-Agent': 'object-updater %s' % os.getpid(),
'X-Backend-Accept-Redirect': 'true',
'X-Backend-Accept-Quoted-Location': 'true',
}
# always expect X-Backend-Accept-Redirect to be true
# always expect X-Backend-Accept-Redirect and
# X-Backend-Accept-Quoted-Location to be true
do_test(headers_out, expected, container_path='.shards_a/shard_c')
do_test(headers_out, expected)
# ...unless X-Backend-Accept-Redirect is already set
# ...unless they're already set
expected['X-Backend-Accept-Redirect'] = 'false'
expected['X-Backend-Accept-Quoted-Location'] = 'false'
headers_out_2 = dict(headers_out)
headers_out_2['X-Backend-Accept-Redirect'] = 'false'
headers_out_2['X-Backend-Accept-Quoted-Location'] = 'false'
do_test(headers_out_2, expected)
# updater should add policy header if missing
expected['X-Backend-Accept-Redirect'] = 'true'
expected['X-Backend-Accept-Quoted-Location'] = 'true'
headers_out['X-Backend-Storage-Policy-Index'] = None
do_test(headers_out, expected)
@ -747,7 +752,8 @@ class TestObjectUpdater(unittest.TestCase):
'X-Timestamp': timestamp.internal,
'X-Backend-Storage-Policy-Index': str(int(policy)),
'User-Agent': 'object-updater %s' % os.getpid(),
'X-Backend-Accept-Redirect': 'true'}
'X-Backend-Accept-Redirect': 'true',
'X-Backend-Accept-Quoted-Location': 'true'}
for request in requests:
self.assertEqual('PUT', request['method'])
self.assertDictEqual(expected_headers, request['headers'])
@ -954,9 +960,11 @@ class TestObjectUpdater(unittest.TestCase):
# 1st round of redirects, newest redirect should be chosen
(301, {'Location': '/.shards_a/c_shard_old/o',
'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}),
(301, {'Location': '/.shards_a/c_shard_new/o',
(301, {'Location': '/.shards_a/c%5Fshard%5Fnew/o',
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp': ts_redirect_2.internal}),
(301, {'Location': '/.shards_a/c_shard_old/o',
(301, {'Location': '/.shards_a/c%5Fshard%5Fold/o',
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}),
# 2nd round of redirects
(301, {'Location': '/.shards_a/c_shard_newer/o',

View File

@ -3832,7 +3832,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_range.name
'X-Backend-Quoted-Container-Path': shard_range.name
},
}
check_request(request, **expectations)
@ -3943,7 +3943,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)
@ -4045,7 +4045,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)