diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 0923ce15a4..ec8eb8ef9e 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -578,6 +578,8 @@ use = egg:swift#container_sync # Updating those will have to be done manually, as knowing what the true realm # endpoint should be cannot always be guessed. # allow_full_urls = true +# Set this to specify this clusters //realm/cluster as "current" in /info +# current = //REALM/CLUSTER # Note: Put it at the beginning of the pipleline to profile all middleware. But # it is safer to put this after catch_errors, gatekeeper and healthcheck. diff --git a/swift/common/middleware/container_sync.py b/swift/common/middleware/container_sync.py index c5393df4fa..1ea6480700 100644 --- a/swift/common/middleware/container_sync.py +++ b/swift/common/middleware/container_sync.py @@ -28,10 +28,10 @@ class ContainerSync(object): using the container-sync-realms.conf style of container sync. """ - def __init__(self, app, conf): + def __init__(self, app, conf, logger=None): self.app = app self.conf = conf - self.logger = get_logger(conf, log_route='container_sync') + self.logger = logger or get_logger(conf, log_route='container_sync') self.realms_conf = ContainerSyncRealms( os.path.join( conf.get('swift_dir', '/etc/swift'), @@ -39,6 +39,31 @@ class ContainerSync(object): self.logger) self.allow_full_urls = config_true_value( conf.get('allow_full_urls', 'true')) + # configure current realm/cluster for /info + self.realm = self.cluster = None + current = conf.get('current', None) + if current: + try: + self.realm, self.cluster = (p.upper() for p in + current.strip('/').split('/')) + except ValueError: + self.logger.error('Invalid current //REALM/CLUSTER (%s)', + current) + self.register_info() + + def register_info(self): + dct = {} + for realm in self.realms_conf.realms(): + clusters = self.realms_conf.clusters(realm) + if clusters: + dct[realm] = {'clusters': dict((c, {}) for c in clusters)} + if self.realm and self.cluster: + try: + dct[self.realm]['clusters'][self.cluster]['current'] = True + except KeyError: + self.logger.error('Unknown current //REALM/CLUSTER (%s)', + '//%s/%s' % (self.realm, self.cluster)) + register_swift_info('container_sync', realms=dct) @wsgify def __call__(self, req): @@ -102,12 +127,7 @@ class ContainerSync(object): req.environ['swift.authorize_override'] = True if req.path == '/info': # Ensure /info requests get the freshest results - dct = {} - for realm in self.realms_conf.realms(): - clusters = self.realms_conf.clusters(realm) - if clusters: - dct[realm] = {'clusters': dict((c, {}) for c in clusters)} - register_swift_info('container_sync', realms=dct) + self.register_info() return self.app diff --git a/swift/container/sync.py b/swift/container/sync.py index 0a3d0209e8..aa1ecc6929 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -35,6 +35,7 @@ from swift.common.utils import ( whataremyips) from swift.common.daemon import Daemon from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND +from swift.common.storage_policy import POLICIES, POLICY_INDEX class ContainerSync(Daemon): @@ -99,11 +100,9 @@ class ContainerSync(Daemon): section of the container-server.conf :param container_ring: If None, the /container.ring.gz will be loaded. This is overridden by unit tests. - :param object_ring: If None, the /object.ring.gz will be loaded. - This is overridden by unit tests. """ - def __init__(self, conf, container_ring=None, object_ring=None): + def __init__(self, conf, container_ring=None): #: The dict of configuration values from the [container-sync] section #: of the container-server.conf. self.conf = conf @@ -150,17 +149,24 @@ class ContainerSync(Daemon): self.container_failures = 0 #: Time of last stats report. self.reported = time() - swift_dir = conf.get('swift_dir', '/etc/swift') + self.swift_dir = conf.get('swift_dir', '/etc/swift') #: swift.common.ring.Ring for locating containers. - self.container_ring = container_ring or Ring(swift_dir, + self.container_ring = container_ring or Ring(self.swift_dir, ring_name='container') - #: swift.common.ring.Ring for locating objects. - self.object_ring = object_ring or Ring(swift_dir, ring_name='object') self._myips = whataremyips() self._myport = int(conf.get('bind_port', 6001)) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) + def get_object_ring(self, policy_idx): + """ + Get the ring object to use based on its policy. + + :policy_idx: policy index as defined in swift.conf + :returns: appropriate ring object + """ + return POLICIES.get_object_ring(policy_idx, self.swift_dir) + def run_forever(self, *args, **kwargs): """ Runs container sync scans until stopped. @@ -361,19 +367,22 @@ class ContainerSync(Daemon): self.logger.increment('deletes') self.logger.timing_since('deletes.timing', start_time) else: - part, nodes = self.object_ring.get_nodes( - info['account'], info['container'], - row['name']) + part, nodes = \ + self.get_object_ring(info['storage_policy_index']). \ + get_nodes(info['account'], info['container'], + row['name']) shuffle(nodes) exc = None looking_for_timestamp = float(row['created_at']) timestamp = -1 headers = body = None + headers_out = {POLICY_INDEX: str(info['storage_policy_index'])} for node in nodes: try: these_headers, this_body = direct_get_object( node, part, info['account'], info['container'], - row['name'], resp_chunk_size=65536) + row['name'], headers=headers_out, + resp_chunk_size=65536) this_timestamp = float(these_headers['x-timestamp']) if this_timestamp > timestamp: timestamp = this_timestamp diff --git a/test/probe/test_container_sync.py b/test/probe/test_container_sync.py new file mode 100644 index 0000000000..2ca483f08c --- /dev/null +++ b/test/probe/test_container_sync.py @@ -0,0 +1,100 @@ +#!/usr/bin/python -u +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import uuid +from urlparse import urlparse +import random +from nose import SkipTest + +from swiftclient import client + +from swift.common.storage_policy import POLICIES +from swift.common.manager import Manager +from test.probe.common import kill_servers, reset_environment + + +def get_current_realm_cluster(url): + parts = urlparse(url) + url = parts.scheme + '://' + parts.netloc + '/info' + http_conn = client.http_connection(url) + try: + info = client.get_capabilities(http_conn) + except client.ClientException: + raise SkipTest('Unable to retrieve cluster info') + try: + realms = info['container_sync']['realms'] + except KeyError: + raise SkipTest('Unable to find container sync realms') + for realm, realm_info in realms.items(): + for cluster, options in realm_info['clusters'].items(): + if options.get('current', False): + return realm, cluster + raise SkipTest('Unable find current realm cluster') + + +class TestContainerSync(unittest.TestCase): + + def setUp(self): + (self.pids, self.port2server, self.account_ring, self.container_ring, + self.object_ring, self.policy, self.url, self.token, + self.account, self.configs) = reset_environment() + self.realm, self.cluster = get_current_realm_cluster(self.url) + + def tearDown(self): + kill_servers(self.port2server, self.pids) + + def test_sync(self): + base_headers = {'X-Container-Sync-Key': 'secret'} + + # setup dest container + dest_container = 'dest-container-%s' % uuid.uuid4() + dest_headers = base_headers.copy() + dest_policy = None + if len(POLICIES) > 1: + dest_policy = random.choice(list(POLICIES)) + dest_headers['X-Storage-Policy'] = dest_policy.name + client.put_container(self.url, self.token, dest_container, + headers=dest_headers) + + # setup source container + source_container = 'source-container-%s' % uuid.uuid4() + source_headers = base_headers.copy() + sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account, + dest_container) + source_headers['X-Container-Sync-To'] = sync_to + if dest_policy: + source_policy = random.choice([p for p in POLICIES + if p is not dest_policy]) + source_headers['X-Storage-Policy'] = source_policy.name + client.put_container(self.url, self.token, source_container, + headers=source_headers) + + # upload to source + object_name = 'object-%s' % uuid.uuid4() + client.put_object(self.url, self.token, source_container, object_name, + 'test-body') + + # cycle container-sync + Manager(['container-sync']).once() + + # retrieve from sync'd container + headers, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(body, 'test-body') + + +if __name__ == "__main__": + get_current_realm_cluster('http://localhost:8080') + unittest.main() diff --git a/test/unit/common/middleware/test_container_sync.py b/test/unit/common/middleware/test_container_sync.py index 2956ccee2d..8ef4ad6ec7 100644 --- a/test/unit/common/middleware/test_container_sync.py +++ b/test/unit/common/middleware/test_container_sync.py @@ -19,12 +19,15 @@ import shutil import tempfile import unittest import uuid +import mock from swift.common import swob from swift.common.middleware import container_sync from swift.proxy.controllers.base import _get_cache_key from swift.proxy.controllers.info import InfoController +from test.unit import FakeLogger + class FakeApp(object): @@ -63,6 +66,94 @@ cluster_dfw1 = http://dfw1.host/v1/ def tearDown(self): shutil.rmtree(self.tempdir, ignore_errors=1) + def test_current_not_set(self): + # no 'current' option set by default + self.assertEqual(None, self.sync.realm) + self.assertEqual(None, self.sync.cluster) + info = {} + + def capture_swift_info(key, **options): + info[key] = options + + with mock.patch( + 'swift.common.middleware.container_sync.register_swift_info', + new=capture_swift_info): + self.sync.register_info() + + for realm, realm_info in info['container_sync']['realms'].items(): + for cluster, options in realm_info['clusters'].items(): + self.assertEqual(options.get('current', False), False) + + def test_current_invalid(self): + self.conf = {'swift_dir': self.tempdir, 'current': 'foo'} + self.sync = container_sync.ContainerSync(self.app, self.conf, + logger=FakeLogger()) + self.assertEqual(None, self.sync.realm) + self.assertEqual(None, self.sync.cluster) + info = {} + + def capture_swift_info(key, **options): + info[key] = options + + with mock.patch( + 'swift.common.middleware.container_sync.register_swift_info', + new=capture_swift_info): + self.sync.register_info() + + for realm, realm_info in info['container_sync']['realms'].items(): + for cluster, options in realm_info['clusters'].items(): + self.assertEqual(options.get('current', False), False) + + error_lines = self.sync.logger.get_lines_for_level('error') + self.assertEqual(error_lines, ['Invalid current ' + '//REALM/CLUSTER (foo)']) + + def test_current_in_realms_conf(self): + self.conf = {'swift_dir': self.tempdir, 'current': '//us/dfw1'} + self.sync = container_sync.ContainerSync(self.app, self.conf) + self.assertEqual('US', self.sync.realm) + self.assertEqual('DFW1', self.sync.cluster) + info = {} + + def capture_swift_info(key, **options): + info[key] = options + + with mock.patch( + 'swift.common.middleware.container_sync.register_swift_info', + new=capture_swift_info): + self.sync.register_info() + + for realm, realm_info in info['container_sync']['realms'].items(): + for cluster, options in realm_info['clusters'].items(): + if options.get('current'): + break + self.assertEqual(realm, self.sync.realm) + self.assertEqual(cluster, self.sync.cluster) + + def test_missing_from_realms_conf(self): + self.conf = {'swift_dir': self.tempdir, 'current': 'foo/bar'} + self.sync = container_sync.ContainerSync(self.app, self.conf, + logger=FakeLogger()) + self.assertEqual('FOO', self.sync.realm) + self.assertEqual('BAR', self.sync.cluster) + info = {} + + def capture_swift_info(key, **options): + info[key] = options + + with mock.patch( + 'swift.common.middleware.container_sync.register_swift_info', + new=capture_swift_info): + self.sync.register_info() + + for realm, realm_info in info['container_sync']['realms'].items(): + for cluster, options in realm_info['clusters'].items(): + self.assertEqual(options.get('current', False), False) + + for line in self.sync.logger.get_lines_for_level('error'): + self.assertEqual(line, 'Unknown current ' + '//REALM/CLUSTER (//FOO/BAR)') + def test_pass_through(self): req = swob.Request.blank('/v1/a/c') resp = req.get_response(self.sync) diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index c27cf748c2..0800937282 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -1,3 +1,4 @@ + # Copyright (c) 2010-2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,12 +19,12 @@ import unittest from contextlib import nested import mock - from test.unit import FakeLogger from swift.container import sync from swift.common import utils from swift.common.exceptions import ClientException - +from swift.common.storage_policy import StoragePolicy, POLICY_INDEX +from test.unit import patch_policies utils.HASH_PATH_SUFFIX = 'endcap' utils.HASH_PATH_PREFIX = 'endcap' @@ -67,6 +68,7 @@ class FakeContainerBroker(object): self.sync_point2 = sync_point2 +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestContainerSync(unittest.TestCase): def test_FileLikeIter(self): @@ -96,10 +98,8 @@ class TestContainerSync(unittest.TestCase): def test_init(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) self.assertTrue(cs.container_ring is cring) - self.assertTrue(cs.object_ring is oring) def test_run_forever(self): # This runs runs_forever with fakes to succeed for two loops, the first @@ -138,11 +138,11 @@ class TestContainerSync(unittest.TestCase): orig_audit_location_generator = sync.audit_location_generator try: sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c'}) + p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0}) sync.time = fake_time sync.sleep = fake_sleep - cs = sync.ContainerSync({}, container_ring=FakeRing(), - object_ring=FakeRing()) + cs = sync.ContainerSync({}, container_ring=FakeRing()) sync.audit_location_generator = fake_audit_location_generator cs.run_forever(1, 2, a=3, b=4, verbose=True) except Exception as err: @@ -194,10 +194,10 @@ class TestContainerSync(unittest.TestCase): orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c'}) + p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0}) sync.time = fake_time - cs = sync.ContainerSync({}, container_ring=FakeRing(), - object_ring=FakeRing()) + cs = sync.ContainerSync({}, container_ring=FakeRing()) sync.audit_location_generator = fake_audit_location_generator cs.run_once(1, 2, a=3, b=4, verbose=True) self.assertEquals(time_calls, [6]) @@ -218,14 +218,12 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_not_db(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) self.assertEquals(cs.container_failures, 0) def test_container_sync_missing_db(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) cs.container_sync('isa.db') self.assertEquals(cs.container_failures, 1) @@ -233,12 +231,12 @@ class TestContainerSync(unittest.TestCase): # Db could be there due to handoff replication so test that we ignore # those. cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c'}) + p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0}) cs._myips = ['127.0.0.1'] # No match cs._myport = 1 # No match cs.container_sync('isa.db') @@ -265,12 +263,12 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_deleted(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c'}, deleted=False) + p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0}, deleted=False) cs._myips = ['10.0.0.0'] # Match cs._myport = 1000 # Match # This complete match will cause the 1 container failure since the @@ -279,7 +277,8 @@ class TestContainerSync(unittest.TestCase): self.assertEquals(cs.container_failures, 1) sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c'}, deleted=True) + p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0}, deleted=True) # This complete match will not cause any more container failures # since the broker indicates deletion cs.container_sync('isa.db') @@ -289,12 +288,12 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_no_to_or_key(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}) cs._myips = ['10.0.0.0'] # Match @@ -307,6 +306,7 @@ class TestContainerSync(unittest.TestCase): sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)}) @@ -320,6 +320,7 @@ class TestContainerSync(unittest.TestCase): sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-key': ('key', 1)}) @@ -333,6 +334,7 @@ class TestContainerSync(unittest.TestCase): sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -348,6 +350,7 @@ class TestContainerSync(unittest.TestCase): sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -365,13 +368,13 @@ class TestContainerSync(unittest.TestCase): def test_container_stop_at(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker orig_time = sync.time try: sync.ContainerBroker = lambda p: FakeContainerBroker( p, info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -408,8 +411,7 @@ class TestContainerSync(unittest.TestCase): def test_container_first_loop(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) def fake_hash_path(account, container, obj, raw_digest=False): # Ensures that no rows match for full syncing, ordinal is 0 and @@ -418,6 +420,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': 2, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -443,6 +446,7 @@ class TestContainerSync(unittest.TestCase): return '\x01' * 16 fcb = FakeContainerBroker('path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': 1, 'x_container_sync_point2': 1}, metadata={'x-container-sync-to': @@ -467,6 +471,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': 2, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -489,6 +494,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': 2, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -513,6 +519,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': 2, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -536,8 +543,7 @@ class TestContainerSync(unittest.TestCase): def test_container_second_loop(self): cring = FakeRing() - oring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker orig_hash_path = sync.hash_path orig_delete_object = sync.delete_object @@ -554,6 +560,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -583,6 +590,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -603,6 +611,7 @@ class TestContainerSync(unittest.TestCase): fcb = FakeContainerBroker( 'path', info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, 'x_container_sync_point1': -1, 'x_container_sync_point2': -1}, metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), @@ -659,8 +668,7 @@ class TestContainerSync(unittest.TestCase): self.assertEqual(logger, fake_logger) sync.delete_object = fake_delete_object - cs = sync.ContainerSync({}, container_ring=FakeRing(), - object_ring=FakeRing()) + cs = sync.ContainerSync({}, container_ring=FakeRing()) cs.logger = fake_logger cs.http_proxies = ['http://proxy'] # Success @@ -668,8 +676,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': True, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), 'info', realm, - realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_deletes, 1) exc = [] @@ -684,8 +693,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': True, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), 'info', realm, - realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_deletes, 1) self.assertEquals(len(exc), 1) self.assertEquals(str(exc[-1]), 'test exception') @@ -700,8 +710,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': True, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), 'info', realm, - realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_deletes, 1) self.assertEquals(len(exc), 2) self.assertEquals(str(exc[-1]), 'test client exception') @@ -717,8 +728,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': True, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), 'info', realm, - realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_deletes, 2) self.assertEquals(len(exc), 3) self.assertEquals(str(exc[-1]), 'test client exception: 404') @@ -771,29 +783,31 @@ class TestContainerSync(unittest.TestCase): sync.put_object = fake_put_object - cs = sync.ContainerSync({}, container_ring=FakeRing(), - object_ring=FakeRing()) + cs = sync.ContainerSync({}, container_ring=FakeRing()) cs.logger = fake_logger cs.http_proxies = ['http://proxy'] - def fake_direct_get_object(*args, **kwargs): + def fake_direct_get_object(node, part, account, container, obj, + headers, resp_chunk_size=1): + self.assertEquals(headers[POLICY_INDEX], '0') return ({'other-header': 'other header value', 'etag': '"etagvalue"', 'x-timestamp': '1.2', 'content-type': 'text/plain; swift_bytes=123'}, iter('contents')) - sync.direct_get_object = fake_direct_get_object # Success as everything says it worked self.assertTrue(cs.container_sync_row( {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 1) - def fake_direct_get_object(*args, **kwargs): + def fake_direct_get_object(node, part, account, container, obj, + headers, resp_chunk_size=1): + self.assertEquals(headers[POLICY_INDEX], '0') return ({'date': 'date value', 'last-modified': 'last modified value', 'x-timestamp': '1.2', @@ -810,14 +824,16 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) exc = [] - def fake_direct_get_object(*args, **kwargs): + def fake_direct_get_object(node, part, account, container, obj, + headers, resp_chunk_size=1): + self.assertEquals(headers[POLICY_INDEX], '0') exc.append(Exception('test exception')) raise exc[-1] @@ -827,16 +843,18 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) self.assertEquals(len(exc), 3) self.assertEquals(str(exc[-1]), 'test exception') exc = [] - def fake_direct_get_object(*args, **kwargs): + def fake_direct_get_object(node, part, account, container, obj, + headers, resp_chunk_size=1): + self.assertEquals(headers[POLICY_INDEX], '0') if len(exc) == 0: exc.append(Exception('test other exception')) else: @@ -849,16 +867,18 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) self.assertEquals(len(exc), 3) self.assertEquals(str(exc[-3]), 'test other exception') self.assertEquals(str(exc[-2]), 'test client exception') self.assertEquals(str(exc[-1]), 'test client exception') - def fake_direct_get_object(*args, **kwargs): + def fake_direct_get_object(node, part, account, container, obj, + headers, resp_chunk_size=1): + self.assertEquals(headers[POLICY_INDEX], '0') return ({'other-header': 'other header value', 'x-timestamp': '1.2', 'etag': '"etagvalue"'}, iter('contents')) @@ -874,9 +894,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) self.assert_(re.match('Unauth ', cs.logger.log_dict['info'][0][0][0])) @@ -891,9 +911,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) self.assert_(re.match('Not found ', cs.logger.log_dict['info'][0][0][0])) @@ -907,9 +927,9 @@ class TestContainerSync(unittest.TestCase): {'deleted': False, 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', - 'key', FakeContainerBroker('broker'), { - 'account': 'a', - 'container': 'c'}, realm, realm_key)) + 'key', FakeContainerBroker('broker'), + {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, + realm, realm_key)) self.assertEquals(cs.container_puts, 2) self.assertTrue( cs.logger.log_dict['exception'][0][0][0].startswith( @@ -922,21 +942,18 @@ class TestContainerSync(unittest.TestCase): def test_select_http_proxy_None(self): cs = sync.ContainerSync( - {'sync_proxy': ''}, container_ring=FakeRing(), - object_ring=FakeRing()) + {'sync_proxy': ''}, container_ring=FakeRing()) self.assertEqual(cs.select_http_proxy(), None) def test_select_http_proxy_one(self): cs = sync.ContainerSync( - {'sync_proxy': 'http://one'}, container_ring=FakeRing(), - object_ring=FakeRing()) + {'sync_proxy': 'http://one'}, container_ring=FakeRing()) self.assertEqual(cs.select_http_proxy(), 'http://one') def test_select_http_proxy_multiple(self): cs = sync.ContainerSync( {'sync_proxy': 'http://one,http://two,http://three'}, - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) self.assertEqual( set(cs.http_proxies), set(['http://one', 'http://two', 'http://three']))