diff --git a/swift/common/db.py b/swift/common/db.py index 76249ed2d5..c6df12aa3a 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -59,19 +59,23 @@ def utf8encode(*args): for s in args] -def native_str_keys(metadata): +def native_str_keys_and_values(metadata): if six.PY2: uni_keys = [k for k in metadata if isinstance(k, six.text_type)] for k in uni_keys: sv = metadata[k] del metadata[k] - metadata[k.encode('utf-8')] = sv + metadata[k.encode('utf-8')] = [ + x.encode('utf-8') if isinstance(x, six.text_type) else x + for x in sv] else: bin_keys = [k for k in metadata if isinstance(k, six.binary_type)] for k in bin_keys: sv = metadata[k] del metadata[k] - metadata[k.decode('utf-8')] = sv + metadata[k.decode('utf-8')] = [ + x.decode('utf-8') if isinstance(x, six.binary_type) else x + for x in sv] ZERO_LIKE_VALUES = {None, '', 0, '0'} @@ -878,7 +882,7 @@ class DatabaseBroker(object): metadata = self.get_raw_metadata() if metadata: metadata = json.loads(metadata) - native_str_keys(metadata) + native_str_keys_and_values(metadata) else: metadata = {} return metadata @@ -940,7 +944,7 @@ class DatabaseBroker(object): self.db_type) md = row[0] md = json.loads(md) if md else {} - native_str_keys(md) + native_str_keys_and_values(md) except sqlite3.OperationalError as err: if 'no such column: metadata' not in str(err): raise diff --git a/swift/common/utils.py b/swift/common/utils.py index 68005f324b..2390f87835 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -75,7 +75,7 @@ from six.moves import cPickle as pickle from six.moves.configparser import (ConfigParser, NoSectionError, NoOptionError, RawConfigParser) from six.moves import range, http_client -from six.moves.urllib.parse import quote as _quote +from six.moves.urllib.parse import quote as _quote, unquote from six.moves.urllib.parse import urlparse from swift import gettext_ as _ @@ -5699,6 +5699,9 @@ def get_redirect_data(response): if 'Location' not in headers: return None location = 
urlparse(headers['Location']).path + if config_true_value(headers.get('X-Backend-Location-Is-Quoted', + 'false')): + location = unquote(location) account, container, _junk = split_path(location, 2, 3, True) timestamp_val = headers.get('X-Backend-Redirect-Timestamp') try: diff --git a/swift/container/backend.py b/swift/container/backend.py index 8dee9d243c..48e298232b 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -22,6 +22,7 @@ from uuid import uuid4 import six from six.moves import range +from six.moves.urllib.parse import unquote import sqlite3 from eventlet import tpool @@ -2040,7 +2041,14 @@ class ContainerBroker(DatabaseBroker): ``container`` attributes respectively. """ - path = self.get_sharding_sysmeta('Root') + path = self.get_sharding_sysmeta('Quoted-Root') + hdr = 'X-Container-Sysmeta-Shard-Quoted-Root' + if path: + path = unquote(path) + else: + path = self.get_sharding_sysmeta('Root') + hdr = 'X-Container-Sysmeta-Shard-Root' + if not path: # Ensure account/container get populated self._populate_instance_cache() @@ -2052,8 +2060,8 @@ class ContainerBroker(DatabaseBroker): self._root_account, self._root_container = split_path( '/' + path, 2, 2) except ValueError: - raise ValueError("Expected X-Container-Sysmeta-Shard-Root to be " - "of the form 'account/container', got %r" % path) + raise ValueError("Expected %s to be of the form " + "'account/container', got %r" % (hdr, path)) @property def root_account(self): diff --git a/swift/container/server.py b/swift/container/server.py index 831237cf88..0702ab92e2 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -23,6 +23,7 @@ from swift import gettext_ as _ from eventlet import Timeout import six +from six.moves.urllib.parse import quote import swift.common.db from swift.container.sync_store import ContainerSyncStore @@ -312,6 +313,11 @@ class ContainerController(BaseStorageServer): """ if not config_true_value( req.headers.get('x-backend-accept-redirect', 
False)): + # We want to avoid fetching shard ranges for the (more + # time-sensitive) object-server update, so allow some misplaced + # objects to land between when we've started sharding and when the + # proxy learns about it. Note that this path is also used by old, + # pre-sharding updaters during a rolling upgrade. return None shard_ranges = broker.get_shard_ranges( @@ -324,7 +330,15 @@ class ContainerController(BaseStorageServer): # in preference to the parent, which is the desired result. containing_range = shard_ranges[0] location = "/%s/%s" % (containing_range.name, obj_name) - headers = {'Location': location, + if location != quote(location) and not config_true_value( + req.headers.get('x-backend-accept-quoted-location', False)): + # Sender expects the destination to be unquoted, but it isn't safe + # to send unquoted. Eat the update for now and let the sharder + # move it later. Should only come up during rolling upgrades. + return None + + headers = {'Location': quote(location), + 'X-Backend-Location-Is-Quoted': 'true', 'X-Backend-Redirect-Timestamp': containing_range.timestamp.internal} diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 98d6588559..fb5d6f2210 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -656,7 +656,14 @@ class ContainerSharder(ContainerReplicator): # Get the valid info into the broker.container, etc shard_broker.get_info() shard_broker.merge_shard_ranges(shard_range) - shard_broker.set_sharding_sysmeta('Root', root_path) + shard_broker.set_sharding_sysmeta('Quoted-Root', quote(root_path)) + # NB: we *used* to do + # shard_broker.set_sharding_sysmeta('Root', root_path) + # but that isn't safe for container names with nulls or newlines (or + # possibly some other characters). We consciously *don't* make any + # attempt to set the old meta; during an upgrade, some shards may think + # they are in fact roots, but it cleans up well enough once everyone's + # upgraded. 
shard_broker.update_metadata({ 'X-Container-Sysmeta-Sharding': ('True', Timestamp.now().internal)}) @@ -1129,8 +1136,16 @@ class ContainerSharder(ContainerReplicator): shard_range.update_state(ShardRange.CREATED) headers = { 'X-Backend-Storage-Policy-Index': broker.storage_policy_index, - 'X-Container-Sysmeta-Shard-Root': broker.root_path, + 'X-Container-Sysmeta-Shard-Quoted-Root': quote( + broker.root_path), 'X-Container-Sysmeta-Sharding': True} + # NB: we *used* to send along + # 'X-Container-Sysmeta-Shard-Root': broker.root_path + # but that isn't safe for container names with nulls or newlines + # (or possibly some other characters). We consciously *don't* make + # any attempt to set the old meta; during an upgrade, some shards + # may think they are in fact roots, but it cleans up well enough + # once everyone's upgraded. success = self._send_shard_ranges( shard_range.account, shard_range.container, [shard_range], headers=headers) diff --git a/swift/obj/server.py b/swift/obj/server.py index d1a1d85e93..0c46247301 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -17,6 +17,7 @@ import six import six.moves.cPickle as pickle +from six.moves.urllib.parse import unquote import json import os import multiprocessing @@ -366,7 +367,6 @@ class ObjectController(BaseStorageServer): contdevices = [d.strip() for d in headers_in.get('X-Container-Device', '').split(',')] contpartition = headers_in.get('X-Container-Partition', '') - contpath = headers_in.get('X-Backend-Container-Path') if len(conthosts) != len(contdevices): # This shouldn't happen unless there's a bug in the proxy, @@ -379,6 +379,12 @@ class ObjectController(BaseStorageServer): 'devices': headers_in.get('X-Container-Device', '')}) return + contpath = headers_in.get('X-Backend-Quoted-Container-Path') + if contpath: + contpath = unquote(contpath) + else: + contpath = headers_in.get('X-Backend-Container-Path') + if contpath: try: # TODO: this is very late in request handling to be validating diff 
--git a/swift/obj/updater.py b/swift/obj/updater.py index 8f414d4eb7..466f294c0f 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -358,6 +358,7 @@ class ObjectUpdater(Daemon): headers_out.setdefault('X-Backend-Storage-Policy-Index', str(int(policy))) headers_out.setdefault('X-Backend-Accept-Redirect', 'true') + headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true') container_path = update.get('container_path') if container_path: acct, cont = split_path('/' + container_path, minsegs=2) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 9b71ed259f..3a5b7f0057 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -343,7 +343,15 @@ class BaseObjectController(Controller): headers[index].get('X-Container-Device'), container['device']) if container_path: - headers[index]['X-Backend-Container-Path'] = container_path + headers[index]['X-Backend-Quoted-Container-Path'] = quote( + container_path) + # NB: we used to send + # 'X-Backend-Container-Path': container_path + # but that isn't safe for container names with nulls or + # newlines (or possibly some other characters). We consciously + # *don't* make any attempt to set the old meta; during an + # upgrade, old object-servers will talk to the root which + # will eat the update and move it as a misplaced object. 
def set_delete_at_headers(index, delete_at_node): headers[index]['X-Delete-At-Container'] = delete_at_container diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 2d5c5417f9..4a0fb386ee 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -20,6 +20,7 @@ import uuid from nose import SkipTest import six +from six.moves.urllib.parse import quote from swift.common import direct_client, utils from swift.common.manager import Manager @@ -57,6 +58,7 @@ class ShardCollector(object): class BaseTestContainerSharding(ReplProbeTest): + DELIM = '-' def _maybe_skip_test(self): try: @@ -101,10 +103,10 @@ class BaseTestContainerSharding(ReplProbeTest): self._maybe_skip_test() def _make_object_names(self, number): - return ['obj-%04d' % x for x in range(number)] + return ['obj%s%04d' % (self.DELIM, x) for x in range(number)] def _setup_container_name(self): - self.container_name = 'container-%s' % uuid.uuid4() + self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4()) def setUp(self): client.logger.setLevel(client.logging.WARNING) @@ -415,7 +417,8 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding): self.max_shard_size // 2)] def check_listing(objects, **params): - qs = '&'.join(['%s=%s' % param for param in params.items()]) + qs = '&'.join('%s=%s' % (k, quote(str(v))) + for k, v in params.items()) headers, listing = client.get_container( self.url, self.token, self.container_name, query_string=qs) listing = [x['name'].encode('utf-8') if six.PY2 else x['name'] @@ -468,12 +471,12 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding): # delimiter headers, listing = client.get_container( self.url, self.token, self.container_name, - query_string='delimiter=-') - self.assertEqual([{'subdir': 'obj-'}], listing) + query_string='delimiter=' + quote(self.DELIM)) + self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing) headers, listing = client.get_container( self.url, self.token, self.container_name, - 
query_string='delimiter=j-') - self.assertEqual([{'subdir': 'obj-'}], listing) + query_string='delimiter=j' + quote(self.DELIM)) + self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing) limit = self.cluster_info['swift']['container_listing_limit'] exc = check_listing_fails(412, limit=limit + 1) @@ -546,13 +549,23 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding): self.assert_container_post_ok('sharded') +class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8): + DELIM = '\n' + + def _make_object_names(self, number): + return ['obj\n%04d%%Ff' % x for x in range(number)] + + def _setup_container_name(self): + self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4() + + class TestContainerShardingUTF8(TestContainerShardingNonUTF8): def _make_object_names(self, number): # override default with names that include non-ascii chars name_length = self.cluster_info['swift']['max_object_name_length'] obj_names = [] for x in range(number): - name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x) + name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x) name = name.encode('utf8').ljust(name_length, b'o') if not six.PY2: name = name.decode('utf8') @@ -563,10 +576,11 @@ class TestContainerShardingUTF8(TestContainerShardingNonUTF8): # override default with max length name that includes non-ascii chars super(TestContainerShardingUTF8, self)._setup_container_name() name_length = self.cluster_info['swift']['max_container_name_length'] - cont_name = self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb' - self.conainer_name = cont_name.ljust(name_length, 'x') - if six.PY2: - self.conainer_name = self.container_name.encode('utf8') + cont_name = \ + self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234' + self.container_name = cont_name.encode('utf8').ljust(name_length, b'x') + if not six.PY2: + self.container_name = self.container_name.decode('utf8') class TestContainerSharding(BaseTestContainerSharding): @@ -1114,7 +1128,9 @@ class 
TestContainerSharding(BaseTestContainerSharding): shard_listings = self.direct_get_container(sr.account, sr.container) for node, (hdrs, listing) in shard_listings.items(): - shard_listing_names = [o['name'] for o in listing] + shard_listing_names = [ + o['name'].encode('utf-8') if six.PY2 else o['name'] + for o in listing] for obj in obj_names[4::5]: if obj in sr: self.assertIn(obj, shard_listing_names) @@ -1178,8 +1194,9 @@ class TestContainerSharding(BaseTestContainerSharding): expected_shards=0, exp_obj_count=0): # checks that shard range is consistent on all nodes root_path = '%s/%s' % (self.account, self.container_name) - exp_shard_hdrs = {'X-Container-Sysmeta-Shard-Root': root_path, - 'X-Backend-Sharding-State': expected_state} + exp_shard_hdrs = { + 'X-Container-Sysmeta-Shard-Quoted-Root': quote(root_path), + 'X-Backend-Sharding-State': expected_state} object_counts = [] bytes_used = [] for node_id, node_data in node_data.items(): @@ -2178,3 +2195,27 @@ class TestContainerSharding(BaseTestContainerSharding): self.assertEqual(2, int(metadata.get('x-account-container-count'))) self.assertEqual(0, int(metadata.get('x-account-object-count'))) self.assertEqual(0, int(metadata.get('x-account-bytes-used'))) + + +class TestContainerShardingMoreUTF8(TestContainerSharding): + def _make_object_names(self, number): + # override default with names that include non-ascii chars + name_length = self.cluster_info['swift']['max_object_name_length'] + obj_names = [] + for x in range(number): + name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x) + name = name.encode('utf8').ljust(name_length, b'o') + if not six.PY2: + name = name.decode('utf8') + obj_names.append(name) + return obj_names + + def _setup_container_name(self): + # override default with max length name that includes non-ascii chars + super(TestContainerShardingMoreUTF8, self)._setup_container_name() + name_length = self.cluster_info['swift']['max_container_name_length'] + cont_name = \ + self.container_name + 
u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234' + self.container_name = cont_name.encode('utf8').ljust(name_length, b'x') + if not six.PY2: + self.container_name = self.container_name.decode('utf8') diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index cfef1b55aa..ab9ed64a72 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -526,13 +526,14 @@ class TestExampleBroker(unittest.TestCase): # This is not obvious. The actual JSON in the database is the same: # '{"test\\u062a": ["value\\u062a", "0000000001.00000"]}' # The only difference is what reading it produces on py2 and py3. - # We use native strings for metadata keys (see native_str_keys()), - # so keys are different. + # We use native strings for metadata (see native_str_keys_and_values), + # so types are different. if six.PY2: key = u'test\u062a'.encode('utf-8') + value = u'value\u062a'.encode('utf-8') else: key = u'test\u062a' - value = u'value\u062a' + value = u'value\u062a' metadata = { key: [value, Timestamp(1).internal] } diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index d53f1cda40..33fd5298e6 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -339,7 +339,7 @@ class TestContainerBroker(unittest.TestCase): self.assertTrue(broker.empty()) @with_tempdir - def test_empty_shard_container(self, tempdir): + def test_empty_old_style_shard_container(self, tempdir): # Test ContainerBroker.empty for a shard container where shard range # usage should not be considered db_path = os.path.join( @@ -418,6 +418,86 @@ class TestContainerBroker(unittest.TestCase): broker.merge_shard_ranges([own_sr]) self.assertTrue(broker.empty()) + @with_tempdir + def test_empty_shard_container(self, tempdir): + # Test ContainerBroker.empty for a shard container where shard range + # usage should not be considered + db_path = os.path.join( + tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db') + broker 
= ContainerBroker(db_path, account='.shards_a', container='cc') + broker.initialize(next(self.ts).internal, 0) + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + self.assertFalse(broker.is_root_container()) + + def check_object_counted(broker_to_test, broker_with_object): + obj = {'name': 'o', 'created_at': next(self.ts).internal, + 'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG, + 'deleted': 0} + broker_with_object.merge_items([dict(obj)]) + self.assertFalse(broker_to_test.empty()) + # and delete it + obj.update({'created_at': next(self.ts).internal, 'deleted': 1}) + broker_with_object.merge_items([dict(obj)]) + self.assertTrue(broker_to_test.empty()) + + self.assertTrue(broker.empty()) + check_object_counted(broker, broker) + + # own shard range is not considered for object count + own_sr = broker.get_own_shard_range() + self.assertEqual(0, own_sr.object_count) + broker.merge_shard_ranges([own_sr]) + self.assertTrue(broker.empty()) + + broker.put_object('o', next(self.ts).internal, 0, 'text/plain', + EMPTY_ETAG) + own_sr = broker.get_own_shard_range() + self.assertEqual(1, own_sr.object_count) + broker.merge_shard_ranges([own_sr]) + self.assertFalse(broker.empty()) + broker.delete_object('o', next(self.ts).internal) + self.assertTrue(broker.empty()) + + def check_shard_ranges_not_counted(): + sr = ShardRange('.shards_a/shard_c', next(self.ts), object_count=0) + sr.update_meta(13, 99, meta_timestamp=next(self.ts)) + for state in ShardRange.STATES: + sr.update_state(state, state_timestamp=next(self.ts)) + broker.merge_shard_ranges([sr]) + self.assertTrue(broker.empty()) + + # empty other shard ranges do not influence result + sr.update_meta(0, 0, meta_timestamp=next(self.ts)) + for state in ShardRange.STATES: + sr.update_state(state, state_timestamp=next(self.ts)) + broker.merge_shard_ranges([sr]) + self.assertTrue(broker.empty()) + + check_shard_ranges_not_counted() + + # move to sharding state + broker.enable_sharding(next(self.ts)) + 
self.assertTrue(broker.set_sharding_state()) + + # check object in retiring db is considered + check_object_counted(broker, broker.get_brokers()[0]) + self.assertTrue(broker.empty()) + # as well as misplaced objects in fresh db + check_object_counted(broker, broker) + check_shard_ranges_not_counted() + + # move to sharded state + self.assertTrue(broker.set_sharded_state()) + self.assertTrue(broker.empty()) + check_object_counted(broker, broker) + check_shard_ranges_not_counted() + + # own shard range still has no influence + own_sr = broker.get_own_shard_range() + own_sr.update_meta(3, 4, meta_timestamp=next(self.ts)) + broker.merge_shard_ranges([own_sr]) + self.assertTrue(broker.empty()) + def test_reclaim(self): broker = ContainerBroker(':memory:', account='test_account', container='test_container') @@ -3361,7 +3441,7 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual('myaccount/mycontainer', broker.path) @with_tempdir - def test_root_account_container_path(self, tempdir): + def test_old_style_root_account_container_path(self, tempdir): db_path = os.path.join(tempdir, 'container.db') broker = ContainerBroker( db_path, account='root_a', container='root_c') @@ -3442,6 +3522,88 @@ class TestContainerBroker(unittest.TestCase): check_validation('/root_a/root_c/blah') check_validation('/') + @with_tempdir + def test_root_account_container_path(self, tempdir): + db_path = os.path.join(tempdir, 'container.db') + broker = ContainerBroker( + db_path, account='root_a', container='root_c') + broker.initialize(next(self.ts).internal, 1) + # make sure we can cope with uninitialized account and container + broker.account = broker.container = None + + self.assertEqual('root_a', broker.root_account) + self.assertEqual('root_c', broker.root_container) + self.assertEqual('root_a/root_c', broker.root_path) + self.assertTrue(broker.is_root_container()) + self.assertEqual('root_a', broker.account) # sanity check + self.assertEqual('root_c', broker.container) # sanity 
check + + # we don't expect root containers to have this sysmeta set but if it is + # the broker should still behave like a root container + metadata = { + 'X-Container-Sysmeta-Shard-Quoted-Root': + ('root_a/root_c', next(self.ts).internal)} + broker = ContainerBroker( + db_path, account='root_a', container='root_c') + broker.update_metadata(metadata) + broker.account = broker.container = None + self.assertEqual('root_a', broker.root_account) + self.assertEqual('root_c', broker.root_container) + self.assertEqual('root_a/root_c', broker.root_path) + self.assertTrue(broker.is_root_container()) + + # if root is marked deleted, it still considers itself to be a root + broker.delete_db(next(self.ts).internal) + self.assertEqual('root_a', broker.root_account) + self.assertEqual('root_c', broker.root_container) + self.assertEqual('root_a/root_c', broker.root_path) + self.assertTrue(broker.is_root_container()) + # check the values are not just being cached + broker = ContainerBroker(db_path) + self.assertEqual('root_a', broker.root_account) + self.assertEqual('root_c', broker.root_container) + self.assertEqual('root_a/root_c', broker.root_path) + self.assertTrue(broker.is_root_container()) + + # check a shard container + db_path = os.path.join(tempdir, 'shard_container.db') + broker = ContainerBroker( + db_path, account='.shards_root_a', container='c_shard') + broker.initialize(next(self.ts).internal, 1) + # now the metadata is significant... 
+ metadata = { + 'X-Container-Sysmeta-Shard-Quoted-Root': + ('root_a/root_c', next(self.ts).internal)} + broker.update_metadata(metadata) + broker.account = broker.container = None + broker._root_account = broker._root_container = None + + self.assertEqual('root_a', broker.root_account) + self.assertEqual('root_c', broker.root_container) + self.assertEqual('root_a/root_c', broker.root_path) + self.assertFalse(broker.is_root_container()) + + # check validation + def check_validation(root_value): + metadata = { + 'X-Container-Sysmeta-Shard-Quoted-Root': + (root_value, next(self.ts).internal)} + broker.update_metadata(metadata) + broker.account = broker.container = None + broker._root_account = broker._root_container = None + with self.assertRaises(ValueError) as cm: + broker.root_account + self.assertIn('Expected X-Container-Sysmeta-Shard-Quoted-Root', + str(cm.exception)) + with self.assertRaises(ValueError): + broker.root_container + + check_validation('root_a') + check_validation('/root_a') + check_validation('/root_a/root_c') + check_validation('/root_a/root_c/blah') + check_validation('/') + def test_resolve_shard_range_states(self): self.assertIsNone(ContainerBroker.resolve_shard_range_states(None)) self.assertIsNone(ContainerBroker.resolve_shard_range_states([])) @@ -4422,7 +4584,8 @@ class TestContainerBroker(unittest.TestCase): do_test(orig_state, ts, test_state, ts_newer, test_state, ts_newer) - def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir): + def _check_object_stats_when_old_style_sharded( + self, a, c, root_a, root_c, tempdir): # common setup and assertions for root and shard containers db_path = os.path.join( tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db') @@ -4449,6 +4612,51 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(14, broker.get_info()['bytes_used']) return broker + @with_tempdir + def test_object_stats_old_style_root_container(self, tempdir): + broker = 
self._check_object_stats_when_old_style_sharded( + 'a', 'c', 'a', 'c', tempdir) + self.assertTrue(broker.is_root_container()) # sanity + self.assertTrue(broker.set_sharded_state()) + self.assertEqual(120, broker.get_info()['object_count']) + self.assertEqual(1999, broker.get_info()['bytes_used']) + + @with_tempdir + def test_object_stats_old_style_shard_container(self, tempdir): + broker = self._check_object_stats_when_old_style_sharded( + '.shard_a', 'c-blah', 'a', 'c', tempdir) + self.assertFalse(broker.is_root_container()) # sanity + self.assertTrue(broker.set_sharded_state()) + self.assertEqual(0, broker.get_info()['object_count']) + self.assertEqual(0, broker.get_info()['bytes_used']) + + def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir): + # common setup and assertions for root and shard containers + db_path = os.path.join( + tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db') + broker = ContainerBroker( + db_path, account=a, container=c) + broker.initialize(next(self.ts).internal, 0) + broker.set_sharding_sysmeta('Quoted-Root', '%s/%s' % (root_a, root_c)) + broker.merge_items([{'name': 'obj', 'size': 14, 'etag': 'blah', + 'content_type': 'text/plain', 'deleted': 0, + 'created_at': Timestamp.now().internal}]) + self.assertEqual(1, broker.get_info()['object_count']) + self.assertEqual(14, broker.get_info()['bytes_used']) + + broker.enable_sharding(next(self.ts)) + self.assertTrue(broker.set_sharding_state()) + sr_1 = ShardRange( + '%s/%s1' % (root_a, root_c), Timestamp.now(), lower='', upper='m', + object_count=99, bytes_used=999, state=ShardRange.ACTIVE) + sr_2 = ShardRange( + '%s/%s2' % (root_a, root_c), Timestamp.now(), lower='m', upper='', + object_count=21, bytes_used=1000, state=ShardRange.ACTIVE) + broker.merge_shard_ranges([sr_1, sr_2]) + self.assertEqual(1, broker.get_info()['object_count']) + self.assertEqual(14, broker.get_info()['bytes_used']) + return broker + @with_tempdir def 
test_object_stats_root_container(self, tempdir): broker = self._check_object_stats_when_sharded( diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 98b43e6fcb..bd81b8fee6 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -924,7 +924,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # our sync pointer self.assertEqual(broker.get_reconciler_sync(), 2) - def test_misplaced_rows_replicate_and_enqueue_from_shard(self): + def test_misplaced_rows_replicate_and_enqueue_from_old_style_shard(self): # force all timestamps to fall in same hour ts = (Timestamp(t) for t in itertools.count(int(time.time()) // 3600 * 3600)) @@ -1009,6 +1009,91 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # our sync pointer self.assertEqual(broker.get_reconciler_sync(), 2) + def test_misplaced_rows_replicate_and_enqueue_from_shard(self): + # force all timestamps to fall in same hour + ts = (Timestamp(t) for t in + itertools.count(int(time.time()) // 3600 * 3600)) + policy = random.choice(list(POLICIES)) + broker = self._get_broker('.shards_a', 'some-other-c', node_index=0) + broker.initialize(next(ts).internal, policy.idx) + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + remote_policy = random.choice([p for p in POLICIES if p is not + policy]) + remote_broker = self._get_broker( + '.shards_a', 'some-other-c', node_index=1) + remote_broker.initialize(next(ts).internal, remote_policy.idx) + + # add a misplaced row to *local* broker + obj_put_timestamp = next(ts).internal + broker.put_object( + 'o', obj_put_timestamp, 0, 'content-type', + 'etag', storage_policy_index=remote_policy.idx) + misplaced = broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 1) + # since this row is misplaced it doesn't show up in count + self.assertEqual(broker.get_info()['object_count'], 0) + + # add another misplaced row to *local* broker with composite 
timestamp + ts_data = next(ts) + ts_ctype = next(ts) + ts_meta = next(ts) + broker.put_object( + 'o2', ts_data.internal, 0, 'content-type', + 'etag', storage_policy_index=remote_policy.idx, + ctype_timestamp=ts_ctype.internal, meta_timestamp=ts_meta.internal) + misplaced = broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 2) + # since this row is misplaced it doesn't show up in count + self.assertEqual(broker.get_info()['object_count'], 0) + + # replicate + part, node = self._get_broker_part_node(broker) + daemon = self._run_once(node) + # push to remote, and third node was missing (also maybe reconciler) + self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync']) + + # grab the rsynced instance of remote_broker + remote_broker = self._get_broker( + '.shards_a', 'some-other-c', node_index=1) + + # remote has misplaced rows too now + misplaced = remote_broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 2) + + # and the correct policy_index and object_count + info = remote_broker.get_info() + expectations = { + 'object_count': 0, + 'storage_policy_index': policy.idx, + } + for key, value in expectations.items(): + self.assertEqual(info[key], value) + + # and we should have also enqueued these rows in a single reconciler, + # since we forced the object timestamps to be in the same hour. + reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at']) + # but it may not be on the same node as us anymore though... + reconciler = self._get_broker(reconciler.account, + reconciler.container, node_index=0) + self.assertEqual(reconciler.get_info()['object_count'], 2) + objects = reconciler.list_objects_iter( + 10, '', None, None, None, None, storage_policy_index=0) + self.assertEqual(len(objects), 2) + # NB: reconciler work is for the *root* container! 
+ expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0, + 'application/x-put', obj_put_timestamp) + self.assertEqual(objects[0], expected) + # the second object's listing has ts_meta as its last modified time + # but its full composite timestamp is in the hash field. + expected = ('%s:/a/c/o2' % remote_policy.idx, ts_meta.internal, 0, + 'application/x-put', + encode_timestamps(ts_data, ts_ctype, ts_meta)) + self.assertEqual(objects[1], expected) + + # having safely enqueued to the reconciler we can advance + # our sync pointer + self.assertEqual(broker.get_reconciler_sync(), 2) + def test_multiple_out_sync_reconciler_enqueue_normalize(self): ts = (Timestamp(t).internal for t in itertools.count(int(time.time()))) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 1175c7620a..a2fa25569d 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -78,10 +78,25 @@ class BaseTestSharder(unittest.TestCase): broker.initialize() return broker + def _make_old_style_sharding_broker(self, account='a', container='c', + shard_bounds=(('', 'middle'), + ('middle', ''))): + broker = self._make_broker(account=account, container=container) + broker.set_sharding_sysmeta('Root', 'a/c') + old_db_id = broker.get_info()['id'] + broker.enable_sharding(next(self.ts_iter)) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.CLEAVED) + broker.merge_shard_ranges(shard_ranges) + self.assertTrue(broker.set_sharding_state()) + broker = ContainerBroker(broker.db_file, account='a', container='c') + self.assertNotEqual(old_db_id, broker.get_info()['id']) # sanity check + return broker + def _make_sharding_broker(self, account='a', container='c', shard_bounds=(('', 'middle'), ('middle', ''))): broker = self._make_broker(account=account, container=container) - broker.set_sharding_sysmeta('Root', 'a/c') + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') old_db_id = broker.get_info()['id'] 
broker.enable_sharding(next(self.ts_iter)) shard_ranges = self._make_shard_ranges( @@ -2279,7 +2294,7 @@ class TestSharder(BaseTestSharder): '.shards_', 'shard_c', (('l', 'mid'), ('mid', 'u'))) self.assertEqual(1, broker.get_own_shard_range().deleted) - def test_identify_sharding_candidate(self): + def test_identify_sharding_old_style_candidate(self): brokers = [self._make_broker(container='c%03d' % i) for i in range(6)] for broker in brokers: broker.set_sharding_sysmeta('Root', 'a/c') @@ -2333,6 +2348,60 @@ class TestSharder(BaseTestSharder): self._assert_recon_stats( expected_recon, sharder, 'sharding_candidates') + def test_identify_sharding_candidate(self): + brokers = [self._make_broker(container='c%03d' % i) for i in range(6)] + for broker in brokers: + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + node = {'index': 2} + # containers are all empty + with self._mock_sharder() as sharder: + for broker in brokers: + sharder._identify_sharding_candidate(broker, node) + expected_stats = {} + self._assert_stats(expected_stats, sharder, 'sharding_candidates') + + objects = [ + ['obj%3d' % i, next(self.ts_iter).internal, i, 'text/plain', + 'etag%s' % i, 0] for i in range(160)] + + # one container has 100 objects, which is below the sharding threshold + for obj in objects[:100]: + brokers[0].put_object(*obj) + conf = {'recon_cache_path': self.tempdir} + with self._mock_sharder(conf=conf) as sharder: + for broker in brokers: + sharder._identify_sharding_candidate(broker, node) + self.assertFalse(sharder.sharding_candidates) + expected_recon = { + 'found': 0, + 'top': []} + sharder._report_stats() + self._assert_recon_stats( + expected_recon, sharder, 'sharding_candidates') + + # reduce the sharding threshold and the container is reported + conf = {'shard_container_threshold': 100, + 'recon_cache_path': self.tempdir} + with self._mock_sharder(conf=conf) as sharder: + with mock_timestamp_now() as now: + for broker in brokers: + 
sharder._identify_sharding_candidate(broker, node) + stats_0 = {'path': brokers[0].db_file, + 'node_index': 2, + 'account': 'a', + 'container': 'c000', + 'root': 'a/c', + 'object_count': 100, + 'meta_timestamp': now.internal, + 'file_size': os.stat(brokers[0].db_file).st_size} + self.assertEqual([stats_0], sharder.sharding_candidates) + expected_recon = { + 'found': 1, + 'top': [stats_0]} + sharder._report_stats() + self._assert_recon_stats( + expected_recon, sharder, 'sharding_candidates') + # repeat with handoff node and db_file error with self._mock_sharder(conf=conf) as sharder: with mock.patch('os.stat', side_effect=OSError('test error')): @@ -3489,7 +3558,7 @@ class TestSharder(BaseTestSharder): self._check_objects([expected], expected_shard_dbs[0]) self._check_objects([], broker.db_file) - def _setup_find_ranges(self, account, cont, lower, upper): + def _setup_old_style_find_ranges(self, account, cont, lower, upper): broker = self._make_broker(account=account, container=cont) own_sr = ShardRange('%s/%s' % (account, cont), Timestamp.now(), lower, upper) @@ -3503,6 +3572,106 @@ class TestSharder(BaseTestSharder): broker.put_object(*obj) return broker, objects + def _check_old_style_find_shard_ranges_none_found(self, broker, objects): + with self._mock_sharder() as sharder: + num_found = sharder._find_shard_ranges(broker) + self.assertGreater(sharder.split_size, len(objects)) + self.assertEqual(0, num_found) + self.assertFalse(broker.get_shard_ranges()) + expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, + 'found': 0, 'min_time': mock.ANY, + 'max_time': mock.ANY} + stats = self._assert_stats(expected_stats, sharder, 'scanned') + self.assertGreaterEqual(stats['max_time'], stats['min_time']) + + with self._mock_sharder( + conf={'shard_container_threshold': 200}) as sharder: + num_found = sharder._find_shard_ranges(broker) + self.assertEqual(sharder.split_size, len(objects)) + self.assertEqual(0, num_found) + self.assertFalse(broker.get_shard_ranges()) 
+ expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, + 'found': 0, 'min_time': mock.ANY, + 'max_time': mock.ANY} + stats = self._assert_stats(expected_stats, sharder, 'scanned') + self.assertGreaterEqual(stats['max_time'], stats['min_time']) + + def test_old_style_find_shard_ranges_none_found_root(self): + broker, objects = self._setup_old_style_find_ranges('a', 'c', '', '') + self._check_old_style_find_shard_ranges_none_found(broker, objects) + + def test_old_style_find_shard_ranges_none_found_shard(self): + broker, objects = self._setup_old_style_find_ranges( + '.shards_a', 'c', 'lower', 'upper') + self._check_old_style_find_shard_ranges_none_found(broker, objects) + + def _check_old_style_find_shard_ranges_finds_two( + self, account, cont, lower, upper): + def check_ranges(): + self.assertEqual(2, len(broker.get_shard_ranges())) + expected_ranges = [ + ShardRange( + ShardRange.make_path('.int_shards_a', 'c', cont, now, 0), + now, lower, objects[98][0], 99), + ShardRange( + ShardRange.make_path('.int_shards_a', 'c', cont, now, 1), + now, objects[98][0], upper, 1), + ] + self._assert_shard_ranges_equal(expected_ranges, + broker.get_shard_ranges()) + + # first invocation finds both ranges + broker, objects = self._setup_old_style_find_ranges( + account, cont, lower, upper) + with self._mock_sharder(conf={'shard_container_threshold': 199, + 'auto_create_account_prefix': '.int_'} + ) as sharder: + with mock_timestamp_now() as now: + num_found = sharder._find_shard_ranges(broker) + self.assertEqual(99, sharder.split_size) + self.assertEqual(2, num_found) + check_ranges() + expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, + 'found': 2, 'min_time': mock.ANY, + 'max_time': mock.ANY} + stats = self._assert_stats(expected_stats, sharder, 'scanned') + self.assertGreaterEqual(stats['max_time'], stats['min_time']) + + # second invocation finds none + with self._mock_sharder(conf={'shard_container_threshold': 199, + 'auto_create_account_prefix': 
'.int_'} + ) as sharder: + num_found = sharder._find_shard_ranges(broker) + self.assertEqual(0, num_found) + self.assertEqual(2, len(broker.get_shard_ranges())) + check_ranges() + expected_stats = {'attempted': 0, 'success': 0, 'failure': 0, + 'found': 0, 'min_time': mock.ANY, + 'max_time': mock.ANY} + stats = self._assert_stats(expected_stats, sharder, 'scanned') + self.assertGreaterEqual(stats['max_time'], stats['min_time']) + + def test_old_style_find_shard_ranges_finds_two_root(self): + self._check_old_style_find_shard_ranges_finds_two('a', 'c', '', '') + + def test_old_style_find_shard_ranges_finds_two_shard(self): + self._check_old_style_find_shard_ranges_finds_two( + '.shards_a', 'c_', 'l', 'u') + + def _setup_find_ranges(self, account, cont, lower, upper): + broker = self._make_broker(account=account, container=cont) + own_sr = ShardRange('%s/%s' % (account, cont), Timestamp.now(), + lower, upper) + broker.merge_shard_ranges([own_sr]) + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + objects = [ + # some of these are misplaced objects + ['obj%3d' % i, self.ts_encoded(), i, 'text/plain', 'etag%s' % i, 0] + for i in range(100)] + for obj in objects: + broker.put_object(*obj) + return broker, objects + def _check_find_shard_ranges_none_found(self, broker, objects): with self._mock_sharder() as sharder: num_found = sharder._find_shard_ranges(broker) @@ -4144,7 +4313,7 @@ class TestSharder(BaseTestSharder): self._assert_stats(expected_stats, sharder, 'audit_root') mocked.assert_not_called() - def test_audit_shard_container(self): + def test_audit_old_style_shard_container(self): broker = self._make_broker(account='.shards_a', container='shard_c') broker.set_sharding_sysmeta('Root', 'a/c') # include overlaps to verify correct match for updating own shard range @@ -4282,6 +4451,144 @@ class TestSharder(BaseTestSharder): assert_ok() self.assertTrue(broker.is_deleted()) + def test_audit_shard_container(self): + broker = self._make_broker(account='.shards_a', 
container='shard_c') + broker.set_sharding_sysmeta('Quoted-Root', 'a/c') + # include overlaps to verify correct match for updating own shard range + shard_bounds = ( + ('a', 'j'), ('k', 't'), ('k', 's'), ('l', 's'), ('s', 'z')) + shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE) + shard_ranges[1].name = broker.path + expected_stats = {'attempted': 1, 'success': 0, 'failure': 1} + + def call_audit_container(exc=None): + with self._mock_sharder() as sharder: + sharder.logger = debug_logger() + with mock.patch.object(sharder, '_audit_root_container') \ + as mocked, mock.patch.object( + sharder, 'int_client') as mock_swift: + mock_response = mock.MagicMock() + mock_response.headers = {'x-backend-record-type': + 'shard'} + mock_response.body = json.dumps( + [dict(sr) for sr in shard_ranges]) + mock_swift.make_request.return_value = mock_response + mock_swift.make_request.side_effect = exc + mock_swift.make_path = (lambda a, c: + '/v1/%s/%s' % (a, c)) + sharder.reclaim_age = 0 + sharder._audit_container(broker) + mocked.assert_not_called() + return sharder, mock_swift + + # bad account name + broker.account = 'bad_account' + sharder, mock_swift = call_audit_container() + lines = sharder.logger.get_lines_for_level('warning') + self._assert_stats(expected_stats, sharder, 'audit_shard') + self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0]) + self.assertIn('account not in shards namespace', lines[0]) + self.assertNotIn('root has no matching shard range', lines[0]) + self.assertNotIn('unable to get shard ranges from root', lines[0]) + self.assertIn('Audit failed for shard %s' % broker.db_file, lines[1]) + self.assertIn('missing own shard range', lines[1]) + self.assertFalse(lines[2:]) + self.assertFalse(broker.is_deleted()) + + # missing own shard range + broker.get_info() + sharder, mock_swift = call_audit_container() + lines = sharder.logger.get_lines_for_level('warning') + self._assert_stats(expected_stats, sharder, 'audit_shard') 
+ self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0]) + self.assertIn('missing own shard range', lines[0]) + self.assertNotIn('unable to get shard ranges from root', lines[0]) + self.assertFalse(lines[1:]) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + self.assertFalse(broker.is_deleted()) + + # create own shard range, no match in root + expected_stats = {'attempted': 1, 'success': 1, 'failure': 0} + own_shard_range = broker.get_own_shard_range() # get the default + own_shard_range.lower = 'j' + own_shard_range.upper = 'k' + broker.merge_shard_ranges([own_shard_range]) + sharder, mock_swift = call_audit_container() + lines = sharder.logger.get_lines_for_level('warning') + self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0]) + self.assertNotIn('account not in shards namespace', lines[0]) + self.assertNotIn('missing own shard range', lines[0]) + self.assertIn('root has no matching shard range', lines[0]) + self.assertNotIn('unable to get shard ranges from root', lines[0]) + self._assert_stats(expected_stats, sharder, 'audit_shard') + self.assertFalse(lines[1:]) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + self.assertFalse(broker.is_deleted()) + expected_headers = {'X-Backend-Record-Type': 'shard', + 'X-Newest': 'true', + 'X-Backend-Include-Deleted': 'True', + 'X-Backend-Override-Deleted': 'true'} + params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'} + mock_swift.make_request.assert_called_once_with( + 'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,), + params=params) + + # create own shard range, failed response from root + expected_stats = {'attempted': 1, 'success': 1, 'failure': 0} + own_shard_range = broker.get_own_shard_range() # get the default + own_shard_range.lower = 'j' + own_shard_range.upper = 'k' + broker.merge_shard_ranges([own_shard_range]) + sharder, mock_swift = call_audit_container( + exc=internal_client.UnexpectedResponse('bad', 'resp')) + lines = 
sharder.logger.get_lines_for_level('warning') + self.assertIn('Failed to get shard ranges', lines[0]) + self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1]) + self.assertNotIn('account not in shards namespace', lines[1]) + self.assertNotIn('missing own shard range', lines[1]) + self.assertNotIn('root has no matching shard range', lines[1]) + self.assertIn('unable to get shard ranges from root', lines[1]) + self._assert_stats(expected_stats, sharder, 'audit_shard') + self.assertFalse(lines[2:]) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + self.assertFalse(broker.is_deleted()) + mock_swift.make_request.assert_called_once_with( + 'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,), + params=params) + + def assert_ok(): + sharder, mock_swift = call_audit_container() + self.assertFalse(sharder.logger.get_lines_for_level('warning')) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + self._assert_stats(expected_stats, sharder, 'audit_shard') + params = {'format': 'json', 'marker': 'k', 'end_marker': 't'} + mock_swift.make_request.assert_called_once_with( + 'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,), + params=params) + + # make own shard range match one in root, but different state + shard_ranges[1].timestamp = Timestamp.now() + broker.merge_shard_ranges([shard_ranges[1]]) + now = Timestamp.now() + shard_ranges[1].update_state(ShardRange.SHARDING, state_timestamp=now) + assert_ok() + self.assertFalse(broker.is_deleted()) + # own shard range state is updated from root version + own_shard_range = broker.get_own_shard_range() + self.assertEqual(ShardRange.SHARDING, own_shard_range.state) + self.assertEqual(now, own_shard_range.state_timestamp) + + own_shard_range.update_state(ShardRange.SHARDED, + state_timestamp=Timestamp.now()) + broker.merge_shard_ranges([own_shard_range]) + assert_ok() + + own_shard_range.deleted = 1 + own_shard_range.timestamp = Timestamp.now() + 
broker.merge_shard_ranges([own_shard_range]) + assert_ok() + self.assertTrue(broker.is_deleted()) + def test_find_and_enable_sharding_candidates(self): broker = self._make_broker() broker.enable_sharding(next(self.ts_iter)) @@ -4760,6 +5067,116 @@ class TestCleavingContext(BaseTestSharder): else: self.fail("Deleted context 'Context-%s' not found") + def test_store_old_style(self): + broker = self._make_old_style_sharding_broker() + old_db_id = broker.get_brokers()[0].get_info()['id'] + last_mod = Timestamp.now() + ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4) + with mock_timestamp_now(last_mod): + ctx.store(broker) + key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id + data = json.loads(broker.metadata[key][0]) + expected = {'ref': old_db_id, + 'cursor': 'curs', + 'max_row': 12, + 'cleave_to_row': 11, + 'last_cleave_to_row': 2, + 'cleaving_done': True, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + self.assertEqual(expected, data) + # last modified is the metadata timestamp + self.assertEqual(broker.metadata[key][1], last_mod.internal) + + def test_store_add_row_load_old_style(self): + # adding row to older db changes only max_row in the context + broker = self._make_old_style_sharding_broker() + old_broker = broker.get_brokers()[0] + old_db_id = old_broker.get_info()['id'] + old_broker.merge_items([old_broker._record_to_dict( + ('obj', next(self.ts_iter).internal, 0, 'text/plain', 'etag', 1))]) + old_max_row = old_broker.get_max_row() + self.assertEqual(1, old_max_row) # sanity check + ctx = CleavingContext(old_db_id, 'curs', 1, 1, 0, True, True) + ctx.store(broker) + + # adding a row changes max row + old_broker.merge_items([old_broker._record_to_dict( + ('obj', next(self.ts_iter).internal, 0, 'text/plain', 'etag', 1))]) + + new_ctx = CleavingContext.load(broker) + self.assertEqual(old_db_id, new_ctx.ref) + self.assertEqual('curs', new_ctx.cursor) + self.assertEqual(2, new_ctx.max_row) + self.assertEqual(1, 
new_ctx.cleave_to_row) + self.assertEqual(0, new_ctx.last_cleave_to_row) + self.assertTrue(new_ctx.misplaced_done) + self.assertTrue(new_ctx.cleaving_done) + + def test_store_reclaim_load_old_style(self): + # reclaiming rows from older db does not change context + broker = self._make_old_style_sharding_broker() + old_broker = broker.get_brokers()[0] + old_db_id = old_broker.get_info()['id'] + old_broker.merge_items([old_broker._record_to_dict( + ('obj', next(self.ts_iter).internal, 0, 'text/plain', 'etag', 1))]) + old_max_row = old_broker.get_max_row() + self.assertEqual(1, old_max_row) # sanity check + ctx = CleavingContext(old_db_id, 'curs', 1, 1, 0, True, True) + ctx.store(broker) + + self.assertEqual( + 1, len(old_broker.get_objects())) + now = next(self.ts_iter).internal + broker.get_brokers()[0].reclaim(now, now) + self.assertFalse(old_broker.get_objects()) + + new_ctx = CleavingContext.load(broker) + self.assertEqual(old_db_id, new_ctx.ref) + self.assertEqual('curs', new_ctx.cursor) + self.assertEqual(1, new_ctx.max_row) + self.assertEqual(1, new_ctx.cleave_to_row) + self.assertEqual(0, new_ctx.last_cleave_to_row) + self.assertTrue(new_ctx.misplaced_done) + self.assertTrue(new_ctx.cleaving_done) + + def test_store_modify_db_id_load_old_style(self): + # changing id changes ref, so results in a fresh context + broker = self._make_old_style_sharding_broker() + old_broker = broker.get_brokers()[0] + old_db_id = old_broker.get_info()['id'] + ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True) + ctx.store(broker) + + old_broker.newid('fake_remote_id') + new_db_id = old_broker.get_info()['id'] + self.assertNotEqual(old_db_id, new_db_id) + + new_ctx = CleavingContext.load(broker) + self.assertEqual(new_db_id, new_ctx.ref) + self.assertEqual('', new_ctx.cursor) + # note max_row is dynamically updated during load + self.assertEqual(-1, new_ctx.max_row) + self.assertEqual(None, new_ctx.cleave_to_row) + self.assertEqual(None, new_ctx.last_cleave_to_row) + 
self.assertFalse(new_ctx.misplaced_done) + self.assertFalse(new_ctx.cleaving_done) + + def test_load_modify_store_load_old_style(self): + broker = self._make_old_style_sharding_broker() + old_db_id = broker.get_brokers()[0].get_info()['id'] + ctx = CleavingContext.load(broker) + self.assertEqual(old_db_id, ctx.ref) + self.assertEqual('', ctx.cursor) # sanity check + ctx.cursor = 'curs' + ctx.misplaced_done = True + ctx.store(broker) + ctx = CleavingContext.load(broker) + self.assertEqual(old_db_id, ctx.ref) + self.assertEqual('curs', ctx.cursor) + self.assertTrue(ctx.misplaced_done) + def test_store(self): broker = self._make_sharding_broker() old_db_id = broker.get_brokers()[0].get_info()['id'] diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py index 06471fbab8..56f3c59438 100644 --- a/test/unit/container/test_updater.py +++ b/test/unit/container/test_updater.py @@ -350,7 +350,7 @@ class TestContainerUpdater(unittest.TestCase): self.assertEqual(info['reported_object_count'], 1) self.assertEqual(info['reported_bytes_used'], 3) - def test_shard_container(self): + def test_old_style_shard_container(self): cu = self._get_container_updater() cu.run_once() containers_dir = os.path.join(self.sda1, DATADIR) @@ -439,5 +439,94 @@ class TestContainerUpdater(unittest.TestCase): self.assertEqual(info['reported_object_count'], 0) self.assertEqual(info['reported_bytes_used'], 0) + def test_shard_container(self): + cu = self._get_container_updater() + cu.run_once() + containers_dir = os.path.join(self.sda1, DATADIR) + os.mkdir(containers_dir) + cu.run_once() + self.assertTrue(os.path.exists(containers_dir)) + subdir = os.path.join(containers_dir, 'subdir') + os.mkdir(subdir) + cb = ContainerBroker(os.path.join(subdir, 'hash.db'), + account='.shards_a', container='c') + cb.initialize(normalize_timestamp(1), 0) + cb.set_sharding_sysmeta('Quoted-Root', 'a/c') + self.assertFalse(cb.is_root_container()) + cu.run_once() + info = cb.get_info() + 
self.assertEqual(info['object_count'], 0) + self.assertEqual(info['bytes_used'], 0) + self.assertEqual(info['reported_put_timestamp'], '0') + self.assertEqual(info['reported_delete_timestamp'], '0') + self.assertEqual(info['reported_object_count'], 0) + self.assertEqual(info['reported_bytes_used'], 0) + + cb.put_object('o', normalize_timestamp(2), 3, 'text/plain', + '68b329da9893e34099c7d8ad5cb9c940') + # Fake us having already reported *bad* stats under swift 2.18.0 + cb.reported('0', '0', 1, 3) + + # Should fail with a bunch of connection-refused + cu.run_once() + info = cb.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 3) + self.assertEqual(info['reported_put_timestamp'], '0') + self.assertEqual(info['reported_delete_timestamp'], '0') + self.assertEqual(info['reported_object_count'], 1) + self.assertEqual(info['reported_bytes_used'], 3) + + def accept(sock, addr, return_code): + try: + with Timeout(3): + inc = sock.makefile('rb') + out = sock.makefile('wb') + out.write(b'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' % + return_code) + out.flush() + self.assertEqual(inc.readline(), + b'PUT /sda1/2/.shards_a/c HTTP/1.1\r\n') + headers = {} + line = inc.readline() + while line and line != b'\r\n': + headers[line.split(b':')[0].lower()] = \ + line.split(b':')[1].strip() + line = inc.readline() + self.assertIn(b'x-put-timestamp', headers) + self.assertIn(b'x-delete-timestamp', headers) + self.assertIn(b'x-object-count', headers) + self.assertIn(b'x-bytes-used', headers) + except BaseException as err: + import traceback + traceback.print_exc() + return err + return None + bindsock = listen_zero() + + def spawn_accepts(): + events = [] + for _junk in range(2): + sock, addr = bindsock.accept() + events.append(spawn(accept, sock, addr, 201)) + return events + + spawned = spawn(spawn_accepts) + for dev in cu.get_account_ring().devs: + if dev is not None: + dev['port'] = bindsock.getsockname()[1] + cu.run_once() + for event in 
spawned.wait(): + err = event.wait() + if err: + raise err + info = cb.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 3) + self.assertEqual(info['reported_put_timestamp'], '0000000001.00000') + self.assertEqual(info['reported_delete_timestamp'], '0') + self.assertEqual(info['reported_object_count'], 0) + self.assertEqual(info['reported_bytes_used'], 0) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 3e5a5f16f7..3113dc2de5 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -1066,6 +1066,7 @@ class TestObjectController(unittest.TestCase): # User-Agent is updated. expected_post_headers['User-Agent'] = 'object-updater %s' % os.getpid() expected_post_headers['X-Backend-Accept-Redirect'] = 'true' + expected_post_headers['X-Backend-Accept-Quoted-Location'] = 'true' self.assertDictEqual(expected_post_headers, actual_headers) self.assertFalse( os.listdir(os.path.join( @@ -1078,7 +1079,8 @@ class TestObjectController(unittest.TestCase): self._test_PUT_then_POST_async_pendings( POLICIES[1], update_etag='override_etag') - def _check_PUT_redirected_async_pending(self, container_path=None): + def _check_PUT_redirected_async_pending(self, container_path=None, + old_style=False): # When container update is redirected verify that the redirect location # is persisted in the async pending file. 
policy = POLICIES[0] @@ -1097,8 +1099,10 @@ class TestObjectController(unittest.TestCase): 'X-Container-Device': 'cdevice'} if container_path: - # the proxy may include this header - put_headers['X-Backend-Container-Path'] = container_path + # the proxy may include either header + hdr = ('X-Backend-Container-Path' if old_style + else 'X-Backend-Quoted-Container-Path') + put_headers[hdr] = container_path expected_update_path = '/cdevice/99/%s/o' % container_path else: expected_update_path = '/cdevice/99/a/c/o' @@ -1176,6 +1180,10 @@ class TestObjectController(unittest.TestCase): def test_PUT_redirected_async_pending_with_container_path(self): self._check_PUT_redirected_async_pending(container_path='.another/c') + def test_PUT_redirected_async_pending_with_old_style_container_path(self): + self._check_PUT_redirected_async_pending( + container_path='.another/c', old_style=True) + def test_POST_quarantine_zbyte(self): timestamp = normalize_timestamp(time()) req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -5379,7 +5387,7 @@ class TestObjectController(unittest.TestCase): 'X-Backend-Container-Update-Override-Content-Type': 'ignored', 'X-Backend-Container-Update-Override-Foo': 'ignored'}) - def test_PUT_container_update_to_shard(self): + def test_PUT_container_update_to_old_style_shard(self): # verify that alternate container update path is respected when # included in request headers def do_test(container_path, expected_path, expected_container_path): @@ -5469,6 +5477,96 @@ class TestObjectController(unittest.TestCase): do_test('too/many/parts', 'a/c', None) do_test('/leading/slash', 'a/c', None) + def test_PUT_container_update_to_shard(self): + # verify that alternate container update path is respected when + # included in request headers + def do_test(container_path, expected_path, expected_container_path): + policy = random.choice(list(POLICIES)) + container_updates = [] + + def capture_updates( + ip, port, method, path, headers, *args, 
**kwargs): + container_updates.append((ip, port, method, path, headers)) + + pickle_async_update_args = [] + + def fake_pickle_async_update(*args): + pickle_async_update_args.append(args) + + diskfile_mgr = self.object_controller._diskfile_router[policy] + diskfile_mgr.pickle_async_update = fake_pickle_async_update + + ts_put = next(self.ts) + headers = { + 'X-Timestamp': ts_put.internal, + 'X-Trans-Id': '123', + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice', + 'Content-Type': 'text/plain', + 'X-Object-Sysmeta-Ec-Frag-Index': 0, + 'X-Backend-Storage-Policy-Index': int(policy), + } + if container_path is not None: + headers['X-Backend-Quoted-Container-Path'] = container_path + + req = Request.blank('/sda1/0/a/c/o', method='PUT', + headers=headers, body='') + with mocked_http_conn( + 500, give_connect=capture_updates) as fake_conn: + with fake_spawn(): + resp = req.get_response(self.object_controller) + with self.assertRaises(StopIteration): + next(fake_conn.code_iter) + self.assertEqual(resp.status_int, 201) + self.assertEqual(len(container_updates), 1) + # verify expected path used in update request + ip, port, method, path, headers = container_updates[0] + self.assertEqual(ip, 'chost') + self.assertEqual(port, 'cport') + self.assertEqual(method, 'PUT') + self.assertEqual(path, '/cdevice/cpartition/%s/o' % expected_path) + + # verify that the picked update *always* has root container + self.assertEqual(1, len(pickle_async_update_args)) + (objdevice, account, container, obj, data, timestamp, + policy) = pickle_async_update_args[0] + self.assertEqual(objdevice, 'sda1') + self.assertEqual(account, 'a') # NB user account + self.assertEqual(container, 'c') # NB root container + self.assertEqual(obj, 'o') + self.assertEqual(timestamp, ts_put.internal) + self.assertEqual(policy, policy) + expected_data = { + 'headers': HeaderKeyDict({ + 'X-Size': '0', + 'User-Agent': 'object-server %s' % os.getpid(), + 
'X-Content-Type': 'text/plain', + 'X-Timestamp': ts_put.internal, + 'X-Trans-Id': '123', + 'Referer': 'PUT http://localhost/sda1/0/a/c/o', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}), + 'obj': 'o', + 'account': 'a', + 'container': 'c', + 'op': 'PUT'} + if expected_container_path: + expected_data['container_path'] = expected_container_path + self.assertEqual(expected_data, data) + + do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard') + do_test('', 'a/c', None) + do_test(None, 'a/c', None) + # TODO: should these cases trigger a 400 response rather than + # defaulting to root path? + do_test('garbage', 'a/c', None) + do_test('/', 'a/c', None) + do_test('/no-acct', 'a/c', None) + do_test('no-cont/', 'a/c', None) + do_test('too/many/parts', 'a/c', None) + do_test('/leading/slash', 'a/c', None) + def test_container_update_async(self): policy = random.choice(list(POLICIES)) req = Request.blank( diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index deaad94318..232d68db88 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -711,19 +711,24 @@ class TestObjectUpdater(unittest.TestCase): 'X-Backend-Storage-Policy-Index': str(int(policies[0])), 'User-Agent': 'object-updater %s' % os.getpid(), 'X-Backend-Accept-Redirect': 'true', + 'X-Backend-Accept-Quoted-Location': 'true', } - # always expect X-Backend-Accept-Redirect to be true + # always expect X-Backend-Accept-Redirect and + # X-Backend-Accept-Quoted-Location to be true do_test(headers_out, expected, container_path='.shards_a/shard_c') do_test(headers_out, expected) - # ...unless X-Backend-Accept-Redirect is already set + # ...unless they're already set expected['X-Backend-Accept-Redirect'] = 'false' + expected['X-Backend-Accept-Quoted-Location'] = 'false' headers_out_2 = dict(headers_out) headers_out_2['X-Backend-Accept-Redirect'] = 'false' + headers_out_2['X-Backend-Accept-Quoted-Location'] = 'false' 
do_test(headers_out_2, expected) # updater should add policy header if missing expected['X-Backend-Accept-Redirect'] = 'true' + expected['X-Backend-Accept-Quoted-Location'] = 'true' headers_out['X-Backend-Storage-Policy-Index'] = None do_test(headers_out, expected) @@ -747,7 +752,8 @@ class TestObjectUpdater(unittest.TestCase): 'X-Timestamp': timestamp.internal, 'X-Backend-Storage-Policy-Index': str(int(policy)), 'User-Agent': 'object-updater %s' % os.getpid(), - 'X-Backend-Accept-Redirect': 'true'} + 'X-Backend-Accept-Redirect': 'true', + 'X-Backend-Accept-Quoted-Location': 'true'} for request in requests: self.assertEqual('PUT', request['method']) self.assertDictEqual(expected_headers, request['headers']) @@ -954,9 +960,11 @@ class TestObjectUpdater(unittest.TestCase): # 1st round of redirects, newest redirect should be chosen (301, {'Location': '/.shards_a/c_shard_old/o', 'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}), - (301, {'Location': '/.shards_a/c_shard_new/o', + (301, {'Location': '/.shards_a/c%5Fshard%5Fnew/o', + 'X-Backend-Location-Is-Quoted': 'true', 'X-Backend-Redirect-Timestamp': ts_redirect_2.internal}), - (301, {'Location': '/.shards_a/c_shard_old/o', + (301, {'Location': '/.shards_a/c%5Fshard%5Fold/o', + 'X-Backend-Location-Is-Quoted': 'true', 'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}), # 2nd round of redirects (301, {'Location': '/.shards_a/c_shard_newer/o', diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 3b509c380d..1a63867c0a 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -3832,7 +3832,7 @@ class TestReplicatedObjectController( 'Host': 'localhost:80', 'Referer': '%s http://localhost/v1/a/c/o' % method, 'X-Backend-Storage-Policy-Index': '1', - 'X-Backend-Container-Path': shard_range.name + 'X-Backend-Quoted-Container-Path': shard_range.name }, } check_request(request, **expectations) @@ -3943,7 +3943,7 @@ class TestReplicatedObjectController( 'Host': 
'localhost:80', 'Referer': '%s http://localhost/v1/a/c/o' % method, 'X-Backend-Storage-Policy-Index': '1', - 'X-Backend-Container-Path': shard_ranges[1].name + 'X-Backend-Quoted-Container-Path': shard_ranges[1].name }, } check_request(request, **expectations) @@ -4045,7 +4045,7 @@ class TestReplicatedObjectController( 'Host': 'localhost:80', 'Referer': '%s http://localhost/v1/a/c/o' % method, 'X-Backend-Storage-Policy-Index': '1', - 'X-Backend-Container-Path': shard_ranges[1].name + 'X-Backend-Quoted-Container-Path': shard_ranges[1].name }, } check_request(request, **expectations)