From 61a9d35fd58381b7c299f125ef01d00f9b0203fe Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 24 Dec 2014 11:52:34 +0800 Subject: [PATCH] Update container sync to use internal client This patch changes container sync to use Internal Client instead of Direct Client. In the current design, container sync uses direct_get_object to get the newest source object(which talks to storage node directly). This works fine for replication storage policies however in erasure coding policies, direct_get_object would only return part of the object(it's encoded as several pieces). Using Internal Client can get the original object in EC case. Note that for the container sync put/delete part, it's working in EC since it's using Simple Client. Signed-off-by: Yuan Zhou DocImpact Change-Id: I91952bc9337f354ce6024bf8392046a1ecf6ecc9 --- doc/manpages/container-server.conf.5 | 4 + etc/container-server.conf-sample | 5 + etc/internal-client.conf-sample | 42 ++++++ swift/common/wsgi.py | 36 ++++- swift/container/sync.py | 125 ++++++++++++---- test/unit/common/test_wsgi.py | 21 +++ test/unit/container/test_sync.py | 213 +++++++++++++++++---------- 7 files changed, 334 insertions(+), 112 deletions(-) create mode 100644 etc/internal-client.conf-sample diff --git a/doc/manpages/container-server.conf.5 b/doc/manpages/container-server.conf.5 index a6bb699758..93408cf7ad 100644 --- a/doc/manpages/container-server.conf.5 +++ b/doc/manpages/container-server.conf.5 @@ -270,6 +270,10 @@ If you need to use an HTTP Proxy, set it here; defaults to no proxy. Will audit, at most, each container once per interval. The default is 300 seconds. .IP \fBcontainer_time\fR Maximum amount of time to spend syncing each container per pass. The default is 60 seconds. +.IP \fBrequest_retries\fR +Server errors from requests will be retried by default. +.IP \fBinternal_client_conf_path\fR +Internal client config file path. .RE .PD diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 6e881d9e04..e7b8a802f8 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -170,6 +170,11 @@ use = egg:swift#recon # # Maximum amount of time in seconds for the connection attempt # conn_timeout = 5 +# Server errors from requests will be retried by default +# request_tries = 3 +# +# Internal client config file path +# internal_client_conf_path = /etc/swift/internal-client.conf # Note: Put it at the beginning of the pipeline to profile all middleware. But # it is safer to put this after healthcheck. diff --git a/etc/internal-client.conf-sample b/etc/internal-client.conf-sample new file mode 100644 index 0000000000..2d25d448b6 --- /dev/null +++ b/etc/internal-client.conf-sample @@ -0,0 +1,42 @@ +[DEFAULT] +# swift_dir = /etc/swift +# user = swift +# You can specify default log routing here if you want: +# log_name = swift +# log_facility = LOG_LOCAL0 +# log_level = INFO +# log_address = /dev/log +# +# comma separated list of functions to call to setup custom log handlers. +# functions get passed: conf, name, log_to_console, log_route, fmt, logger, +# adapted_logger +# log_custom_handlers = +# +# If set, log_udp_host will override log_address +# log_udp_host = +# log_udp_port = 514 +# +# You can enable StatsD logging here: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1.0 +# log_statsd_sample_rate_factor = 1.0 +# log_statsd_metric_prefix = + +[pipeline:main] +pipeline = catch_errors proxy-logging cache proxy-server + +[app:proxy-server] +use = egg:swift#proxy +# See proxy-server.conf-sample for options + +[filter:cache] +use = egg:swift#memcache +# See proxy-server.conf-sample for options + +[filter:proxy-logging] +use = egg:swift#proxy_logging + +[filter:catch_errors] +use = egg:swift#catch_errors +# See proxy-server.conf-sample for options diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index b1e1f5ea73..35df2077f2 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -25,6 +25,7 @@ import time import mimetools from swift import gettext_ as _ from StringIO import StringIO +from textwrap import dedent import eventlet import eventlet.debug @@ -96,13 +97,34 @@ def _loadconfigdir(object_type, uri, path, name, relative_to, global_conf): loadwsgi._loaders['config_dir'] = _loadconfigdir +class ConfigString(NamedConfigLoader): + """ + Wrap a raw config string up for paste.deploy. + + If you give one of these to our loadcontext (e.g. give it to our + appconfig) we'll intercept it and get it routed to the right loader. + """ + + def __init__(self, config_string): + self.contents = StringIO(dedent(config_string)) + self.filename = "string" + defaults = { + 'here': "string", + '__file__': "string", + } + self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults) + self.parser.optionxform = str # Don't lower-case keys + self.parser.readfp(self.contents) + + def wrap_conf_type(f): """ Wrap a function whos first argument is a paste.deploy style config uri, - such that you can pass it an un-adorned raw filesystem path and the config - directive (either config: or config_dir:) will be added automatically - based on the type of filesystem entity at the given path (either a file or - directory) before passing it through to the paste.deploy function. + such that you can pass it an un-adorned raw filesystem path (or config + string) and the config directive (either config:, config_dir:, or + config_str:) will be added automatically based on the type of entity + (either a file or directory, or if no such entity on the file system - + just a string) before passing it through to the paste.deploy function. """ def wrapper(conf_path, *args, **kwargs): if os.path.isdir(conf_path): @@ -332,6 +354,12 @@ class PipelineWrapper(object): def loadcontext(object_type, uri, name=None, relative_to=None, global_conf=None): + if isinstance(uri, loadwsgi.ConfigLoader): + # bypass loadcontext's uri parsing and loader routing and + # just directly return the context + if global_conf: + uri.update_defaults(global_conf, overwrite=False) + return uri.get_context(object_type, name, global_conf) add_conf_type = wrap_conf_type(lambda x: x) return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name, relative_to=relative_to, diff --git a/swift/container/sync.py b/swift/container/sync.py index 0f42de6e99..a409de4ac7 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import uuid from swift import gettext_ as _ @@ -25,8 +26,8 @@ from eventlet import sleep, Timeout import swift.common.db from swift.container.backend import ContainerBroker, DATADIR from swift.common.container_sync_realms import ContainerSyncRealms -from swift.common.direct_client import direct_get_object -from swift.common.internal_client import delete_object, put_object +from swift.common.internal_client import ( + delete_object, put_object, InternalClient, UnexpectedResponse) from swift.common.exceptions import ClientException from swift.common.ring import Ring from swift.common.ring.utils import is_local_device @@ -37,6 +38,55 @@ from swift.common.utils import ( from swift.common.daemon import Daemon from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND from swift.common.storage_policy import POLICIES +from swift.common.wsgi import ConfigString + + +# The default internal client config body is to support upgrades without +# requiring deployment of the new /etc/swift/internal-client.conf +ic_conf_body = """ +[DEFAULT] +# swift_dir = /etc/swift +# user = swift +# You can specify default log routing here if you want: +# log_name = swift +# log_facility = LOG_LOCAL0 +# log_level = INFO +# log_address = /dev/log +# +# comma separated list of functions to call to setup custom log handlers. +# functions get passed: conf, name, log_to_console, log_route, fmt, logger, +# adapted_logger +# log_custom_handlers = +# +# If set, log_udp_host will override log_address +# log_udp_host = +# log_udp_port = 514 +# +# You can enable StatsD logging here: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1.0 +# log_statsd_sample_rate_factor = 1.0 +# log_statsd_metric_prefix = + +[pipeline:main] +pipeline = catch_errors proxy-logging cache proxy-server + +[app:proxy-server] +use = egg:swift#proxy +# See proxy-server.conf-sample for options + +[filter:cache] +use = egg:swift#memcache +# See proxy-server.conf-sample for options + +[filter:proxy-logging] +use = egg:swift#proxy_logging + +[filter:catch_errors] +use = egg:swift#catch_errors +# See proxy-server.conf-sample for options +""".lstrip() class ContainerSync(Daemon): @@ -103,12 +153,12 @@ class ContainerSync(Daemon): loaded. This is overridden by unit tests. """ - def __init__(self, conf, container_ring=None): + def __init__(self, conf, container_ring=None, logger=None): #: The dict of configuration values from the [container-sync] section #: of the container-server.conf. self.conf = conf #: Logger to use for container-sync log lines. - self.logger = get_logger(conf, log_route='container-sync') + self.logger = logger or get_logger(conf, log_route='container-sync') #: Path to the local device mount points. self.devices = conf.get('devices', '/srv/node') #: Indicates whether mount points should be verified as actual mount @@ -159,6 +209,26 @@ class ContainerSync(Daemon): swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) self.conn_timeout = float(conf.get('conn_timeout', 5)) + request_tries = int(conf.get('request_tries') or 3) + + internal_client_conf_path = conf.get('internal_client_conf_path') + if not internal_client_conf_path: + self.logger.warning( + _('Configuration option internal_client_conf_path not ' + 'defined. Using default configuration, See ' + 'internal-client.conf-sample for options')) + internal_client_conf = ConfigString(ic_conf_body) + else: + internal_client_conf = internal_client_conf_path + try: + self.swift = InternalClient( + internal_client_conf, 'Swift Container Sync', request_tries) + except IOError as err: + if err.errno != errno.ENOENT: + raise + raise SystemExit( + _('Unable to load internal client from config: %r (%s)') % + (internal_client_conf_path, err)) def get_object_ring(self, policy_idx): """ @@ -380,39 +450,32 @@ class ContainerSync(Daemon): looking_for_timestamp = Timestamp(row['created_at']) timestamp = -1 headers = body = None - headers_out = {'X-Backend-Storage-Policy-Index': + # look up for the newest one + headers_out = {'X-Newest': True, + 'X-Backend-Storage-Policy-Index': str(info['storage_policy_index'])} - for node in nodes: - try: - these_headers, this_body = direct_get_object( - node, part, info['account'], info['container'], - row['name'], headers=headers_out, - resp_chunk_size=65536) - this_timestamp = Timestamp( - these_headers['x-timestamp']) - if this_timestamp > timestamp: - timestamp = this_timestamp - headers = these_headers - body = this_body - except ClientException as err: - # If any errors are not 404, make sure we report the - # non-404 one. We don't want to mistakenly assume the - # object no longer exists just because one says so and - # the others errored for some other reason. - if not exc or getattr( - exc, 'http_status', HTTP_NOT_FOUND) == \ - HTTP_NOT_FOUND: - exc = err - except (Exception, Timeout) as err: - exc = err + try: + source_obj_status, source_obj_info, source_obj_iter = \ + self.swift.get_object(info['account'], + info['container'], row['name'], + headers=headers_out, + acceptable_statuses=(2, 4)) + + except (Exception, UnexpectedResponse, Timeout) as err: + source_obj_info = {} + source_obj_iter = None + exc = err + timestamp = Timestamp(source_obj_info.get( + 'x-timestamp', 0)) + headers = source_obj_info + body = source_obj_iter if timestamp < looking_for_timestamp: if exc: raise exc raise Exception( - _('Unknown exception trying to GET: %(node)r ' + _('Unknown exception trying to GET: ' '%(account)r %(container)r %(object)r'), - {'node': node, 'part': part, - 'account': info['account'], + {'account': info['account'], 'container': info['container'], 'object': row['name']}) for key in ('date', 'last-modified'): diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 67142decdd..279eb8624b 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -156,6 +156,27 @@ class TestWSGI(unittest.TestCase): logger.info('testing') self.assertEquals('proxy-server', log_name) + @with_tempdir + def test_loadapp_from_file(self, tempdir): + conf_path = os.path.join(tempdir, 'object-server.conf') + conf_body = """ + [app:main] + use = egg:swift#object + """ + contents = dedent(conf_body) + with open(conf_path, 'w') as f: + f.write(contents) + app = wsgi.loadapp(conf_path) + self.assertTrue(isinstance(app, obj_server.ObjectController)) + + def test_loadapp_from_string(self): + conf_body = """ + [app:main] + use = egg:swift#object + """ + app = wsgi.loadapp(wsgi.ConfigString(conf_body)) + self.assertTrue(isinstance(app, obj_server.ObjectController)) + def test_init_request_processor_from_conf_dir(self): config_dir = { 'proxy-server.conf.d/pipeline.conf': """ diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index 3db5c41930..8c6d895323 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -14,17 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import re +import os import unittest from contextlib import nested +from textwrap import dedent import mock -from test.unit import FakeLogger +from test.unit import debug_logger from swift.container import sync from swift.common import utils +from swift.common.wsgi import ConfigString from swift.common.exceptions import ClientException from swift.common.storage_policy import StoragePolicy -from test.unit import patch_policies +import test +from test.unit import patch_policies, with_tempdir utils.HASH_PATH_SUFFIX = 'endcap' utils.HASH_PATH_PREFIX = 'endcap' @@ -71,6 +74,9 @@ class FakeContainerBroker(object): @patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestContainerSync(unittest.TestCase): + def setUp(self): + self.logger = debug_logger('test-container-sync') + def test_FileLikeIter(self): # Retained test to show new FileLikeIter acts just like the removed # _Iter2FileLikeObject did. @@ -96,11 +102,56 @@ class TestContainerSync(unittest.TestCase): self.assertEquals(flo.read(), '') self.assertEquals(flo.read(2), '') - def test_init(self): + def assertLogMessage(self, msg_level, expected, skip=0): + for line in self.logger.get_lines_for_level(msg_level)[skip:]: + msg = 'expected %r not in %r' % (expected, line) + self.assertTrue(expected in line, msg) + + @with_tempdir + def test_init(self, tempdir): + ic_conf_path = os.path.join(tempdir, 'internal-client.conf') cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) self.assertTrue(cs.container_ring is cring) + # specified but not exists will not start + conf = {'internal_client_conf_path': ic_conf_path} + self.assertRaises(SystemExit, sync.ContainerSync, conf, + container_ring=cring, logger=self.logger) + + # not specified will use default conf + with mock.patch('swift.container.sync.InternalClient') as mock_ic: + cs = sync.ContainerSync({}, container_ring=cring, + logger=self.logger) + self.assertTrue(cs.container_ring is cring) + self.assertTrue(mock_ic.called) + conf_path, name, retry = mock_ic.call_args[0] + self.assertTrue(isinstance(conf_path, ConfigString)) + self.assertEquals(conf_path.contents.getvalue(), + dedent(sync.ic_conf_body)) + self.assertLogMessage('warning', 'internal_client_conf_path') + self.assertLogMessage('warning', 'internal-client.conf-sample') + + # correct + contents = dedent(sync.ic_conf_body) + with open(ic_conf_path, 'w') as f: + f.write(contents) + with mock.patch('swift.container.sync.InternalClient') as mock_ic: + cs = sync.ContainerSync(conf, container_ring=cring) + self.assertTrue(cs.container_ring is cring) + self.assertTrue(mock_ic.called) + conf_path, name, retry = mock_ic.call_args[0] + self.assertEquals(conf_path, ic_conf_path) + + sample_conf_filename = os.path.join( + os.path.dirname(test.__file__), + '../etc/internal-client.conf-sample') + with open(sample_conf_filename) as sample_conf_file: + sample_conf = sample_conf_file.read() + self.assertEqual(contents, sample_conf) + def test_run_forever(self): # This runs runs_forever with fakes to succeed for two loops, the first # causing a report but no interval sleep, the second no report but an @@ -142,7 +193,9 @@ class TestContainerSync(unittest.TestCase): 'storage_policy_index': 0}) sync.time = fake_time sync.sleep = fake_sleep - cs = sync.ContainerSync({}, container_ring=FakeRing()) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=FakeRing()) sync.audit_location_generator = fake_audit_location_generator cs.run_forever(1, 2, a=3, b=4, verbose=True) except Exception as err: @@ -197,7 +250,9 @@ class TestContainerSync(unittest.TestCase): p, info={'account': 'a', 'container': 'c', 'storage_policy_index': 0}) sync.time = fake_time - cs = sync.ContainerSync({}, container_ring=FakeRing()) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=FakeRing()) sync.audit_location_generator = fake_audit_location_generator cs.run_once(1, 2, a=3, b=4, verbose=True) self.assertEquals(time_calls, [6]) @@ -218,12 +273,14 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_not_db(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) self.assertEquals(cs.container_failures, 0) def test_container_sync_missing_db(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) cs.container_sync('isa.db') self.assertEquals(cs.container_failures, 1) @@ -231,7 +288,8 @@ class TestContainerSync(unittest.TestCase): # Db could be there due to handoff replication so test that we ignore # those. cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( @@ -263,7 +321,8 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_deleted(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( @@ -288,7 +347,8 @@ class TestContainerSync(unittest.TestCase): def test_container_sync_no_to_or_key(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( @@ -368,7 +428,8 @@ class TestContainerSync(unittest.TestCase): def test_container_stop_at(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) orig_ContainerBroker = sync.ContainerBroker orig_time = sync.time try: @@ -411,7 +472,8 @@ class TestContainerSync(unittest.TestCase): def test_container_first_loop(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring) def fake_hash_path(account, container, obj, raw_digest=False): # Ensures that no rows match for full syncing, ordinal is 0 and @@ -543,7 +605,9 @@ class TestContainerSync(unittest.TestCase): def test_container_second_loop(self): cring = FakeRing() - cs = sync.ContainerSync({}, container_ring=cring) + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=cring, + logger=self.logger) orig_ContainerBroker = sync.ContainerBroker orig_hash_path = sync.hash_path orig_delete_object = sync.delete_object @@ -649,7 +713,6 @@ class TestContainerSync(unittest.TestCase): hex = 'abcdef' sync.uuid = FakeUUID - fake_logger = FakeLogger() def fake_delete_object(path, name=None, headers=None, proxy=None, logger=None, timeout=None): @@ -665,12 +728,14 @@ class TestContainerSync(unittest.TestCase): headers, {'x-container-sync-key': 'key', 'x-timestamp': '1.2'}) self.assertEquals(proxy, 'http://proxy') - self.assertEqual(logger, fake_logger) self.assertEqual(timeout, 5.0) + self.assertEqual(logger, self.logger) sync.delete_object = fake_delete_object - cs = sync.ContainerSync({}, container_ring=FakeRing()) - cs.logger = fake_logger + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=FakeRing(), + logger=self.logger) cs.http_proxies = ['http://proxy'] # Success self.assertTrue(cs.container_sync_row( @@ -749,7 +814,6 @@ class TestContainerSync(unittest.TestCase): orig_uuid = sync.uuid orig_shuffle = sync.shuffle orig_put_object = sync.put_object - orig_direct_get_object = sync.direct_get_object try: class FakeUUID(object): class uuid4(object): @@ -757,7 +821,6 @@ class TestContainerSync(unittest.TestCase): sync.uuid = FakeUUID sync.shuffle = lambda x: x - fake_logger = FakeLogger() def fake_put_object(sync_to, name=None, headers=None, contents=None, proxy=None, logger=None, @@ -781,24 +844,25 @@ class TestContainerSync(unittest.TestCase): 'content-type': 'text/plain'}) self.assertEquals(contents.read(), 'contents') self.assertEquals(proxy, 'http://proxy') - self.assertEqual(logger, fake_logger) self.assertEqual(timeout, 5.0) + self.assertEqual(logger, self.logger) sync.put_object = fake_put_object - cs = sync.ContainerSync({}, container_ring=FakeRing()) - cs.logger = fake_logger + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync({}, container_ring=FakeRing(), + logger=self.logger) cs.http_proxies = ['http://proxy'] - def fake_direct_get_object(node, part, account, container, obj, - headers, resp_chunk_size=1): - self.assertEquals(headers['X-Backend-Storage-Policy-Index'], - '0') - return ({'other-header': 'other header value', - 'etag': '"etagvalue"', 'x-timestamp': '1.2', - 'content-type': 'text/plain; swift_bytes=123'}, + def fake_get_object(acct, con, obj, headers, acceptable_statuses): + self.assertEqual(headers['X-Backend-Storage-Policy-Index'], + '0') + return (200, {'other-header': 'other header value', + 'etag': '"etagvalue"', 'x-timestamp': '1.2', + 'content-type': 'text/plain; swift_bytes=123'}, iter('contents')) - sync.direct_get_object = fake_direct_get_object + + cs.swift.get_object = fake_get_object # Success as everything says it worked self.assertTrue(cs.container_sync_row( {'deleted': False, @@ -809,19 +873,19 @@ class TestContainerSync(unittest.TestCase): realm, realm_key)) self.assertEquals(cs.container_puts, 1) - def fake_direct_get_object(node, part, account, container, obj, - headers, resp_chunk_size=1): + def fake_get_object(acct, con, obj, headers, acceptable_statuses): + self.assertEquals(headers['X-Newest'], True) self.assertEquals(headers['X-Backend-Storage-Policy-Index'], '0') - return ({'date': 'date value', - 'last-modified': 'last modified value', - 'x-timestamp': '1.2', - 'other-header': 'other header value', - 'etag': '"etagvalue"', - 'content-type': 'text/plain; swift_bytes=123'}, + return (200, {'date': 'date value', + 'last-modified': 'last modified value', + 'x-timestamp': '1.2', + 'other-header': 'other header value', + 'etag': '"etagvalue"', + 'content-type': 'text/plain; swift_bytes=123'}, iter('contents')) - sync.direct_get_object = fake_direct_get_object + cs.swift.get_object = fake_get_object # Success as everything says it worked, also checks 'date' and # 'last-modified' headers are removed and that 'etag' header is # stripped of double quotes. @@ -836,14 +900,14 @@ class TestContainerSync(unittest.TestCase): exc = [] - def fake_direct_get_object(node, part, account, container, obj, - headers, resp_chunk_size=1): + def fake_get_object(acct, con, obj, headers, acceptable_statuses): + self.assertEquals(headers['X-Newest'], True) self.assertEquals(headers['X-Backend-Storage-Policy-Index'], '0') exc.append(Exception('test exception')) raise exc[-1] - sync.direct_get_object = fake_direct_get_object + cs.swift.get_object = fake_get_object # Fail due to completely unexpected exception self.assertFalse(cs.container_sync_row( {'deleted': False, @@ -853,22 +917,20 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEquals(cs.container_puts, 2) - self.assertEquals(len(exc), 3) + self.assertEquals(len(exc), 1) self.assertEquals(str(exc[-1]), 'test exception') exc = [] - def fake_direct_get_object(node, part, account, container, obj, - headers, resp_chunk_size=1): + def fake_get_object(acct, con, obj, headers, acceptable_statuses): + self.assertEquals(headers['X-Newest'], True) self.assertEquals(headers['X-Backend-Storage-Policy-Index'], '0') - if len(exc) == 0: - exc.append(Exception('test other exception')) - else: - exc.append(ClientException('test client exception')) + + exc.append(ClientException('test client exception')) raise exc[-1] - sync.direct_get_object = fake_direct_get_object + cs.swift.get_object = fake_get_object # Fail due to all direct_get_object calls failing self.assertFalse(cs.container_sync_row( {'deleted': False, @@ -878,25 +940,22 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEquals(cs.container_puts, 2) - self.assertEquals(len(exc), 3) - self.assertEquals(str(exc[-3]), 'test other exception') - self.assertEquals(str(exc[-2]), 'test client exception') + self.assertEquals(len(exc), 1) self.assertEquals(str(exc[-1]), 'test client exception') - def fake_direct_get_object(node, part, account, container, obj, - headers, resp_chunk_size=1): + def fake_get_object(acct, con, obj, headers, acceptable_statuses): + self.assertEquals(headers['X-Newest'], True) self.assertEquals(headers['X-Backend-Storage-Policy-Index'], '0') - return ({'other-header': 'other header value', - 'x-timestamp': '1.2', 'etag': '"etagvalue"'}, + return (200, {'other-header': 'other header value', + 'x-timestamp': '1.2', 'etag': '"etagvalue"'}, iter('contents')) def fake_put_object(*args, **kwargs): raise ClientException('test client exception', http_status=401) - sync.direct_get_object = fake_direct_get_object + cs.swift.get_object = fake_get_object sync.put_object = fake_put_object - cs.logger = FakeLogger() # Fail due to 401 self.assertFalse(cs.container_sync_row( {'deleted': False, @@ -906,15 +965,13 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEquals(cs.container_puts, 2) - self.assert_(re.match('Unauth ', - cs.logger.log_dict['info'][0][0][0])) + self.assertLogMessage('info', 'Unauth') def fake_put_object(*args, **kwargs): raise ClientException('test client exception', http_status=404) sync.put_object = fake_put_object # Fail due to 404 - cs.logger = FakeLogger() self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', @@ -923,8 +980,7 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEquals(cs.container_puts, 2) - self.assert_(re.match('Not found ', - cs.logger.log_dict['info'][0][0][0])) + self.assertLogMessage('info', 'Not found', 1) def fake_put_object(*args, **kwargs): raise ClientException('test client exception', http_status=503) @@ -939,29 +995,32 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEquals(cs.container_puts, 2) - error_lines = cs.logger.get_lines_for_level('error') - self.assertEqual(len(error_lines), 1) - self.assertTrue(error_lines[0].startswith('ERROR Syncing ')) + self.assertLogMessage('error', 'ERROR Syncing') finally: sync.uuid = orig_uuid sync.shuffle = orig_shuffle sync.put_object = orig_put_object - sync.direct_get_object = orig_direct_get_object def test_select_http_proxy_None(self): - cs = sync.ContainerSync( - {'sync_proxy': ''}, container_ring=FakeRing()) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync( + {'sync_proxy': ''}, container_ring=FakeRing()) self.assertEqual(cs.select_http_proxy(), None) def test_select_http_proxy_one(self): - cs = sync.ContainerSync( - {'sync_proxy': 'http://one'}, container_ring=FakeRing()) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync( + {'sync_proxy': 'http://one'}, container_ring=FakeRing()) self.assertEqual(cs.select_http_proxy(), 'http://one') def test_select_http_proxy_multiple(self): - cs = sync.ContainerSync( - {'sync_proxy': 'http://one,http://two,http://three'}, - container_ring=FakeRing()) + + with mock.patch('swift.container.sync.InternalClient'): + cs = sync.ContainerSync( + {'sync_proxy': 'http://one,http://two,http://three'}, + container_ring=FakeRing()) self.assertEqual( set(cs.http_proxies), set(['http://one', 'http://two', 'http://three']))