Update container sync to use internal client

This patch changes container sync to use Internal Client instead
of Direct Client.

In the current design, container sync uses direct_get_object
(which talks to the storage nodes directly) to get the newest
source object. This works fine for replication storage policies;
in erasure coding policies, however, direct_get_object would only
return part of the object, since it is stored as several encoded
fragments. Using Internal Client retrieves the fully reconstructed
object in the EC case.

Note that the put/delete part of container sync already works with
EC, since it uses Simple Client.
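
To make the difference concrete, here is a rough sketch of the two GET
paths (illustrative only, not part of the patch: the account/container/
object names are placeholders, policy index 1 is assumed to be an EC
policy, and the paths assume a standard /etc/swift layout):

    from swift.common.direct_client import direct_get_object
    from swift.common.internal_client import InternalClient
    from swift.common.ring import Ring

    # Direct client: pick one primary node from the object ring and GET from
    # it. Under an EC policy that node only stores one encoded fragment, so
    # the body that comes back is not the original object.
    object_ring = Ring('/etc/swift', ring_name='object-1')
    part, nodes = object_ring.get_nodes('AUTH_test', 'c', 'o')
    frag_headers, frag_body = direct_get_object(
        nodes[0], part, 'AUTH_test', 'c', 'o',
        headers={'X-Backend-Storage-Policy-Index': '1'})

    # Internal client: the GET goes through a local proxy pipeline, which
    # fetches enough fragments and decodes them, so the full object body is
    # returned regardless of the storage policy.
    swift = InternalClient('/etc/swift/internal-client.conf',
                           'Swift Container Sync', 3)
    status, obj_headers, obj_iter = swift.get_object(
        'AUTH_test', 'c', 'o', headers={'X-Newest': True},
        acceptable_statuses=(2, 4))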

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

DocImpact
Change-Id: I91952bc9337f354ce6024bf8392046a1ecf6ecc9
Yuan Zhou 2014-12-24 11:52:34 +08:00 committed by Clay Gerrard
parent decbcd24d4
commit 61a9d35fd5
7 changed files with 334 additions and 112 deletions

View File

@@ -270,6 +270,10 @@ If you need to use an HTTP Proxy, set it here; defaults to no proxy.
Will audit, at most, each container once per interval. The default is 300 seconds.
.IP \fBcontainer_time\fR
Maximum amount of time to spend syncing each container per pass. The default is 60 seconds.
.IP \fBrequest_tries\fR
Server errors from requests will be retried by default. The default is 3.
.IP \fBinternal_client_conf_path\fR
Internal client config file path.
.RE
.PD

View File

@@ -170,6 +170,11 @@ use = egg:swift#recon
#
# Maximum amount of time in seconds for the connection attempt
# conn_timeout = 5
# Server errors from requests will be retried by default
# request_tries = 3
#
# Internal client config file path
# internal_client_conf_path = /etc/swift/internal-client.conf
# Note: Put it at the beginning of the pipeline to profile all middleware. But
# it is safer to put this after healthcheck.

View File

@@ -0,0 +1,42 @@
[DEFAULT]
# swift_dir = /etc/swift
# user = swift
# You can specify default log routing here if you want:
# log_name = swift
# log_facility = LOG_LOCAL0
# log_level = INFO
# log_address = /dev/log
#
# comma separated list of functions to call to setup custom log handlers.
# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
# adapted_logger
# log_custom_handlers =
#
# If set, log_udp_host will override log_address
# log_udp_host =
# log_udp_port = 514
#
# You can enable StatsD logging here:
# log_statsd_host = localhost
# log_statsd_port = 8125
# log_statsd_default_sample_rate = 1.0
# log_statsd_sample_rate_factor = 1.0
# log_statsd_metric_prefix =
[pipeline:main]
pipeline = catch_errors proxy-logging cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
# See proxy-server.conf-sample for options
[filter:cache]
use = egg:swift#memcache
# See proxy-server.conf-sample for options
[filter:proxy-logging]
use = egg:swift#proxy_logging
[filter:catch_errors]
use = egg:swift#catch_errors
# See proxy-server.conf-sample for options

View File

@@ -25,6 +25,7 @@ import time
import mimetools
from swift import gettext_ as _
from StringIO import StringIO
from textwrap import dedent

import eventlet
import eventlet.debug

@@ -96,13 +97,34 @@ def _loadconfigdir(object_type, uri, path, name, relative_to, global_conf):
loadwsgi._loaders['config_dir'] = _loadconfigdir


class ConfigString(NamedConfigLoader):
    """
    Wrap a raw config string up for paste.deploy.

    If you give one of these to our loadcontext (e.g. give it to our
    appconfig) we'll intercept it and get it routed to the right loader.
    """

    def __init__(self, config_string):
        self.contents = StringIO(dedent(config_string))
        self.filename = "string"
        defaults = {
            'here': "string",
            '__file__': "string",
        }
        self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults)
        self.parser.optionxform = str  # Don't lower-case keys
        self.parser.readfp(self.contents)


def wrap_conf_type(f):
    """
    Wrap a function whos first argument is a paste.deploy style config uri,
-   such that you can pass it an un-adorned raw filesystem path and the config
-   directive (either config: or config_dir:) will be added automatically
-   based on the type of filesystem entity at the given path (either a file or
-   directory) before passing it through to the paste.deploy function.
+   such that you can pass it an un-adorned raw filesystem path (or config
+   string) and the config directive (either config:, config_dir:, or
+   config_str:) will be added automatically based on the type of entity
+   (either a file or directory, or if no such entity on the file system -
+   just a string) before passing it through to the paste.deploy function.
    """
    def wrapper(conf_path, *args, **kwargs):
        if os.path.isdir(conf_path):

@@ -332,6 +354,12 @@ class PipelineWrapper(object):

def loadcontext(object_type, uri, name=None, relative_to=None,
                global_conf=None):
    if isinstance(uri, loadwsgi.ConfigLoader):
        # bypass loadcontext's uri parsing and loader routing and
        # just directly return the context
        if global_conf:
            uri.update_defaults(global_conf, overwrite=False)
        return uri.get_context(object_type, name, global_conf)
    add_conf_type = wrap_conf_type(lambda x: x)
    return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name,
                                relative_to=relative_to,
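
The new ConfigString loader means a config does not have to live on disk at
all; a minimal usage sketch (it mirrors the test_loadapp_from_string test
added further down, with the small object-server app used purely for
convenience):

    from swift.common import wsgi

    conf_body = """
    [app:main]
    use = egg:swift#object
    """

    # ConfigString pre-parses the raw string with NicerConfigParser;
    # loadcontext recognizes the ConfigLoader instance and skips its usual
    # file/directory detection, so paste.deploy builds the app directly.
    app = wsgi.loadapp(wsgi.ConfigString(conf_body))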

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import uuid
from swift import gettext_ as _

@@ -25,8 +26,8 @@ from eventlet import sleep, Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.common.container_sync_realms import ContainerSyncRealms
-from swift.common.direct_client import direct_get_object
-from swift.common.internal_client import delete_object, put_object
+from swift.common.internal_client import (
+    delete_object, put_object, InternalClient, UnexpectedResponse)
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device

@@ -37,6 +38,55 @@ from swift.common.utils import (
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.storage_policy import POLICIES
from swift.common.wsgi import ConfigString
# The default internal client config body is to support upgrades without
# requiring deployment of the new /etc/swift/internal-client.conf
ic_conf_body = """
[DEFAULT]
# swift_dir = /etc/swift
# user = swift
# You can specify default log routing here if you want:
# log_name = swift
# log_facility = LOG_LOCAL0
# log_level = INFO
# log_address = /dev/log
#
# comma separated list of functions to call to setup custom log handlers.
# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
# adapted_logger
# log_custom_handlers =
#
# If set, log_udp_host will override log_address
# log_udp_host =
# log_udp_port = 514
#
# You can enable StatsD logging here:
# log_statsd_host = localhost
# log_statsd_port = 8125
# log_statsd_default_sample_rate = 1.0
# log_statsd_sample_rate_factor = 1.0
# log_statsd_metric_prefix =
[pipeline:main]
pipeline = catch_errors proxy-logging cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
# See proxy-server.conf-sample for options
[filter:cache]
use = egg:swift#memcache
# See proxy-server.conf-sample for options
[filter:proxy-logging]
use = egg:swift#proxy_logging
[filter:catch_errors]
use = egg:swift#catch_errors
# See proxy-server.conf-sample for options
""".lstrip()
class ContainerSync(Daemon):
@@ -103,12 +153,12 @@ class ContainerSync(Daemon):
    loaded. This is overridden by unit tests.
    """

-    def __init__(self, conf, container_ring=None):
+    def __init__(self, conf, container_ring=None, logger=None):
        #: The dict of configuration values from the [container-sync] section
        #: of the container-server.conf.
        self.conf = conf
        #: Logger to use for container-sync log lines.
-        self.logger = get_logger(conf, log_route='container-sync')
+        self.logger = logger or get_logger(conf, log_route='container-sync')
        #: Path to the local device mount points.
        self.devices = conf.get('devices', '/srv/node')
        #: Indicates whether mount points should be verified as actual mount

@@ -159,6 +209,26 @@
        swift.common.db.DB_PREALLOCATION = \
            config_true_value(conf.get('db_preallocation', 'f'))
        self.conn_timeout = float(conf.get('conn_timeout', 5))
        request_tries = int(conf.get('request_tries') or 3)

        internal_client_conf_path = conf.get('internal_client_conf_path')
        if not internal_client_conf_path:
            self.logger.warning(
                _('Configuration option internal_client_conf_path not '
                  'defined. Using default configuration, See '
                  'internal-client.conf-sample for options'))
            internal_client_conf = ConfigString(ic_conf_body)
        else:
            internal_client_conf = internal_client_conf_path
        try:
            self.swift = InternalClient(
                internal_client_conf, 'Swift Container Sync', request_tries)
        except IOError as err:
            if err.errno != errno.ENOENT:
                raise
            raise SystemExit(
                _('Unable to load internal client from config: %r (%s)') %
                (internal_client_conf_path, err))
    def get_object_ring(self, policy_idx):
        """

@@ -380,39 +450,32 @@
                looking_for_timestamp = Timestamp(row['created_at'])
                timestamp = -1
                headers = body = None
-                headers_out = {'X-Backend-Storage-Policy-Index':
-                               str(info['storage_policy_index'])}
-                for node in nodes:
-                    try:
-                        these_headers, this_body = direct_get_object(
-                            node, part, info['account'], info['container'],
-                            row['name'], headers=headers_out,
-                            resp_chunk_size=65536)
-                        this_timestamp = Timestamp(
-                            these_headers['x-timestamp'])
-                        if this_timestamp > timestamp:
-                            timestamp = this_timestamp
-                            headers = these_headers
-                            body = this_body
-                    except ClientException as err:
-                        # If any errors are not 404, make sure we report the
-                        # non-404 one. We don't want to mistakenly assume the
-                        # object no longer exists just because one says so and
-                        # the others errored for some other reason.
-                        if not exc or getattr(
-                                exc, 'http_status', HTTP_NOT_FOUND) == \
-                                HTTP_NOT_FOUND:
-                            exc = err
-                    except (Exception, Timeout) as err:
-                        exc = err
+                # look up for the newest one
+                headers_out = {'X-Newest': True,
+                               'X-Backend-Storage-Policy-Index':
+                               str(info['storage_policy_index'])}
+                try:
+                    source_obj_status, source_obj_info, source_obj_iter = \
+                        self.swift.get_object(info['account'],
+                                              info['container'], row['name'],
+                                              headers=headers_out,
+                                              acceptable_statuses=(2, 4))
+                except (Exception, UnexpectedResponse, Timeout) as err:
+                    source_obj_info = {}
+                    source_obj_iter = None
+                    exc = err
+                timestamp = Timestamp(source_obj_info.get(
+                    'x-timestamp', 0))
+                headers = source_obj_info
+                body = source_obj_iter
                if timestamp < looking_for_timestamp:
                    if exc:
                        raise exc
                    raise Exception(
-                        _('Unknown exception trying to GET: %(node)r '
+                        _('Unknown exception trying to GET: '
                          '%(account)r %(container)r %(object)r'),
-                        {'node': node, 'part': part,
-                         'account': info['account'],
+                        {'account': info['account'],
                         'container': info['container'],
                         'object': row['name']})
                for key in ('date', 'last-modified'):
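
Together with the wsgi.py change above, the no-config-file fallback amounts
to roughly the following (a sketch, assuming a node with the usual rings
under /etc/swift; in the daemon this happens inside ContainerSync.__init__):

    from swift.common.internal_client import InternalClient
    from swift.common.wsgi import ConfigString
    from swift.container.sync import ic_conf_body

    # internal_client_conf_path unset: wrap the embedded sample body so
    # paste.deploy can load the internal proxy pipeline straight from the
    # string, with no /etc/swift/internal-client.conf deployed.
    swift = InternalClient(ConfigString(ic_conf_body),
                           'Swift Container Sync', 3)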

View File

@@ -156,6 +156,27 @@ class TestWSGI(unittest.TestCase):
        logger.info('testing')
        self.assertEquals('proxy-server', log_name)

    @with_tempdir
    def test_loadapp_from_file(self, tempdir):
        conf_path = os.path.join(tempdir, 'object-server.conf')
        conf_body = """
        [app:main]
        use = egg:swift#object
        """
        contents = dedent(conf_body)
        with open(conf_path, 'w') as f:
            f.write(contents)
        app = wsgi.loadapp(conf_path)
        self.assertTrue(isinstance(app, obj_server.ObjectController))

    def test_loadapp_from_string(self):
        conf_body = """
        [app:main]
        use = egg:swift#object
        """
        app = wsgi.loadapp(wsgi.ConfigString(conf_body))
        self.assertTrue(isinstance(app, obj_server.ObjectController))

    def test_init_request_processor_from_conf_dir(self):
        config_dir = {
            'proxy-server.conf.d/pipeline.conf': """

View File

@@ -14,17 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-import re
+import os
import unittest
from contextlib import nested
from textwrap import dedent

import mock

-from test.unit import FakeLogger
+from test.unit import debug_logger
from swift.container import sync
from swift.common import utils
from swift.common.wsgi import ConfigString
from swift.common.exceptions import ClientException
from swift.common.storage_policy import StoragePolicy
-from test.unit import patch_policies
+import test
+from test.unit import patch_policies, with_tempdir

utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = 'endcap'

@@ -71,6 +74,9 @@ class FakeContainerBroker(object):

@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestContainerSync(unittest.TestCase):

    def setUp(self):
        self.logger = debug_logger('test-container-sync')

    def test_FileLikeIter(self):
        # Retained test to show new FileLikeIter acts just like the removed
        # _Iter2FileLikeObject did.
@@ -96,11 +102,56 @@
        self.assertEquals(flo.read(), '')
        self.assertEquals(flo.read(2), '')

-    def test_init(self):
+    def assertLogMessage(self, msg_level, expected, skip=0):
+        for line in self.logger.get_lines_for_level(msg_level)[skip:]:
+            msg = 'expected %r not in %r' % (expected, line)
+            self.assertTrue(expected in line, msg)
+
+    @with_tempdir
+    def test_init(self, tempdir):
+        ic_conf_path = os.path.join(tempdir, 'internal-client.conf')
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        self.assertTrue(cs.container_ring is cring)
+
+        # specified but not exists will not start
+        conf = {'internal_client_conf_path': ic_conf_path}
+        self.assertRaises(SystemExit, sync.ContainerSync, conf,
+                          container_ring=cring, logger=self.logger)
+
+        # not specified will use default conf
+        with mock.patch('swift.container.sync.InternalClient') as mock_ic:
+            cs = sync.ContainerSync({}, container_ring=cring,
+                                    logger=self.logger)
+        self.assertTrue(cs.container_ring is cring)
+        self.assertTrue(mock_ic.called)
+        conf_path, name, retry = mock_ic.call_args[0]
+        self.assertTrue(isinstance(conf_path, ConfigString))
+        self.assertEquals(conf_path.contents.getvalue(),
+                          dedent(sync.ic_conf_body))
+        self.assertLogMessage('warning', 'internal_client_conf_path')
+        self.assertLogMessage('warning', 'internal-client.conf-sample')
+
+        # correct
+        contents = dedent(sync.ic_conf_body)
+        with open(ic_conf_path, 'w') as f:
+            f.write(contents)
+        with mock.patch('swift.container.sync.InternalClient') as mock_ic:
+            cs = sync.ContainerSync(conf, container_ring=cring)
+        self.assertTrue(cs.container_ring is cring)
+        self.assertTrue(mock_ic.called)
+        conf_path, name, retry = mock_ic.call_args[0]
+        self.assertEquals(conf_path, ic_conf_path)
+
+        sample_conf_filename = os.path.join(
+            os.path.dirname(test.__file__),
+            '../etc/internal-client.conf-sample')
+        with open(sample_conf_filename) as sample_conf_file:
+            sample_conf = sample_conf_file.read()
+        self.assertEqual(contents, sample_conf)

    def test_run_forever(self):
        # This runs runs_forever with fakes to succeed for two loops, the first
        # causing a report but no interval sleep, the second no report but an
@@ -142,7 +193,9 @@
                    'storage_policy_index': 0})
            sync.time = fake_time
            sync.sleep = fake_sleep
-            cs = sync.ContainerSync({}, container_ring=FakeRing())
+            with mock.patch('swift.container.sync.InternalClient'):
+                cs = sync.ContainerSync({}, container_ring=FakeRing())
            sync.audit_location_generator = fake_audit_location_generator
            cs.run_forever(1, 2, a=3, b=4, verbose=True)
        except Exception as err:
@@ -197,7 +250,9 @@
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0})
            sync.time = fake_time
-            cs = sync.ContainerSync({}, container_ring=FakeRing())
+            with mock.patch('swift.container.sync.InternalClient'):
+                cs = sync.ContainerSync({}, container_ring=FakeRing())
            sync.audit_location_generator = fake_audit_location_generator
            cs.run_once(1, 2, a=3, b=4, verbose=True)
            self.assertEquals(time_calls, [6])
@@ -218,12 +273,14 @@
    def test_container_sync_not_db(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        self.assertEquals(cs.container_failures, 0)

    def test_container_sync_missing_db(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        cs.container_sync('isa.db')
        self.assertEquals(cs.container_failures, 1)
@@ -231,7 +288,8 @@
        # Db could be there due to handoff replication so test that we ignore
        # those.
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
@@ -263,7 +321,8 @@
    def test_container_sync_deleted(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
@@ -288,7 +347,8 @@
    def test_container_sync_no_to_or_key(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
@@ -368,7 +428,8 @@
    def test_container_stop_at(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        orig_time = sync.time
        try:
@@ -411,7 +472,8 @@
    def test_container_first_loop(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring)

        def fake_hash_path(account, container, obj, raw_digest=False):
            # Ensures that no rows match for full syncing, ordinal is 0 and
@@ -543,7 +605,9 @@
    def test_container_second_loop(self):
        cring = FakeRing()
-        cs = sync.ContainerSync({}, container_ring=cring)
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync({}, container_ring=cring,
+                                    logger=self.logger)
        orig_ContainerBroker = sync.ContainerBroker
        orig_hash_path = sync.hash_path
        orig_delete_object = sync.delete_object
@@ -649,7 +713,6 @@
                    hex = 'abcdef'
            sync.uuid = FakeUUID
-            fake_logger = FakeLogger()

            def fake_delete_object(path, name=None, headers=None, proxy=None,
                                   logger=None, timeout=None):

@@ -665,12 +728,14 @@
                    headers,
                    {'x-container-sync-key': 'key', 'x-timestamp': '1.2'})
                self.assertEquals(proxy, 'http://proxy')
-                self.assertEqual(logger, fake_logger)
                self.assertEqual(timeout, 5.0)
+                self.assertEqual(logger, self.logger)

            sync.delete_object = fake_delete_object
-            cs = sync.ContainerSync({}, container_ring=FakeRing())
-            cs.logger = fake_logger
+            with mock.patch('swift.container.sync.InternalClient'):
+                cs = sync.ContainerSync({}, container_ring=FakeRing(),
+                                        logger=self.logger)
            cs.http_proxies = ['http://proxy']
            # Success
            self.assertTrue(cs.container_sync_row(
@@ -749,7 +814,6 @@
            orig_uuid = sync.uuid
            orig_shuffle = sync.shuffle
            orig_put_object = sync.put_object
-            orig_direct_get_object = sync.direct_get_object
            try:
                class FakeUUID(object):
                    class uuid4(object):

@@ -757,7 +821,6 @@
                sync.uuid = FakeUUID
                sync.shuffle = lambda x: x
-                fake_logger = FakeLogger()

                def fake_put_object(sync_to, name=None, headers=None,
                                    contents=None, proxy=None, logger=None,

@@ -781,24 +844,25 @@
                         'content-type': 'text/plain'})
                    self.assertEquals(contents.read(), 'contents')
                    self.assertEquals(proxy, 'http://proxy')
-                    self.assertEqual(logger, fake_logger)
                    self.assertEqual(timeout, 5.0)
+                    self.assertEqual(logger, self.logger)

                sync.put_object = fake_put_object
-                cs = sync.ContainerSync({}, container_ring=FakeRing())
-                cs.logger = fake_logger
+                with mock.patch('swift.container.sync.InternalClient'):
+                    cs = sync.ContainerSync({}, container_ring=FakeRing(),
+                                            logger=self.logger)
                cs.http_proxies = ['http://proxy']

-                def fake_direct_get_object(node, part, account, container, obj,
-                                           headers, resp_chunk_size=1):
-                    self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
-                                      '0')
-                    return ({'other-header': 'other header value',
-                             'etag': '"etagvalue"', 'x-timestamp': '1.2',
-                             'content-type': 'text/plain; swift_bytes=123'},
+                def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+                    self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
+                                     '0')
+                    return (200, {'other-header': 'other header value',
+                                  'etag': '"etagvalue"', 'x-timestamp': '1.2',
+                                  'content-type': 'text/plain; swift_bytes=123'},
                            iter('contents'))

-                sync.direct_get_object = fake_direct_get_object
+                cs.swift.get_object = fake_get_object

                # Success as everything says it worked
                self.assertTrue(cs.container_sync_row(
                    {'deleted': False,
@@ -809,19 +873,19 @@
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 1)

-                def fake_direct_get_object(node, part, account, container, obj,
-                                           headers, resp_chunk_size=1):
+                def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+                    self.assertEquals(headers['X-Newest'], True)
                    self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                      '0')
-                    return ({'date': 'date value',
+                    return (200, {'date': 'date value',
                            'last-modified': 'last modified value',
                            'x-timestamp': '1.2',
                            'other-header': 'other header value',
                            'etag': '"etagvalue"',
                            'content-type': 'text/plain; swift_bytes=123'},
                            iter('contents'))

-                sync.direct_get_object = fake_direct_get_object
+                cs.swift.get_object = fake_get_object

                # Success as everything says it worked, also checks 'date' and
                # 'last-modified' headers are removed and that 'etag' header is
                # stripped of double quotes.
@@ -836,14 +900,14 @@
                exc = []

-                def fake_direct_get_object(node, part, account, container, obj,
-                                           headers, resp_chunk_size=1):
+                def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+                    self.assertEquals(headers['X-Newest'], True)
                    self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                      '0')
                    exc.append(Exception('test exception'))
                    raise exc[-1]

-                sync.direct_get_object = fake_direct_get_object
+                cs.swift.get_object = fake_get_object

                # Fail due to completely unexpected exception
                self.assertFalse(cs.container_sync_row(
                    {'deleted': False,
@@ -853,22 +917,20 @@
                    {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 2)
-                self.assertEquals(len(exc), 3)
+                self.assertEquals(len(exc), 1)
                self.assertEquals(str(exc[-1]), 'test exception')

                exc = []

-                def fake_direct_get_object(node, part, account, container, obj,
-                                           headers, resp_chunk_size=1):
+                def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+                    self.assertEquals(headers['X-Newest'], True)
                    self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                      '0')
-                    if len(exc) == 0:
-                        exc.append(Exception('test other exception'))
-                    else:
-                        exc.append(ClientException('test client exception'))
+                    exc.append(ClientException('test client exception'))
                    raise exc[-1]

-                sync.direct_get_object = fake_direct_get_object
+                cs.swift.get_object = fake_get_object

                # Fail due to all direct_get_object calls failing
                self.assertFalse(cs.container_sync_row(
                    {'deleted': False,
@@ -878,25 +940,22 @@
                    {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 2)
-                self.assertEquals(len(exc), 3)
-                self.assertEquals(str(exc[-3]), 'test other exception')
-                self.assertEquals(str(exc[-2]), 'test client exception')
+                self.assertEquals(len(exc), 1)
                self.assertEquals(str(exc[-1]), 'test client exception')

-                def fake_direct_get_object(node, part, account, container, obj,
-                                           headers, resp_chunk_size=1):
+                def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+                    self.assertEquals(headers['X-Newest'], True)
                    self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                      '0')
-                    return ({'other-header': 'other header value',
+                    return (200, {'other-header': 'other header value',
                            'x-timestamp': '1.2', 'etag': '"etagvalue"'},
                            iter('contents'))

                def fake_put_object(*args, **kwargs):
                    raise ClientException('test client exception', http_status=401)

-                sync.direct_get_object = fake_direct_get_object
+                cs.swift.get_object = fake_get_object
                sync.put_object = fake_put_object
-                cs.logger = FakeLogger()

                # Fail due to 401
                self.assertFalse(cs.container_sync_row(
                    {'deleted': False,
@@ -906,15 +965,13 @@
                    {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 2)
-                self.assert_(re.match('Unauth ',
-                                      cs.logger.log_dict['info'][0][0][0]))
+                self.assertLogMessage('info', 'Unauth')

                def fake_put_object(*args, **kwargs):
                    raise ClientException('test client exception', http_status=404)

                sync.put_object = fake_put_object
                # Fail due to 404
-                cs.logger = FakeLogger()
                self.assertFalse(cs.container_sync_row(
                    {'deleted': False,
                     'name': 'object',
@@ -923,8 +980,7 @@
                    {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 2)
-                self.assert_(re.match('Not found ',
-                                      cs.logger.log_dict['info'][0][0][0]))
+                self.assertLogMessage('info', 'Not found', 1)

                def fake_put_object(*args, **kwargs):
                    raise ClientException('test client exception', http_status=503)
@@ -939,29 +995,32 @@
                    {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                    realm, realm_key))
                self.assertEquals(cs.container_puts, 2)
-                error_lines = cs.logger.get_lines_for_level('error')
-                self.assertEqual(len(error_lines), 1)
-                self.assertTrue(error_lines[0].startswith('ERROR Syncing '))
+                self.assertLogMessage('error', 'ERROR Syncing')
            finally:
                sync.uuid = orig_uuid
                sync.shuffle = orig_shuffle
                sync.put_object = orig_put_object
-                sync.direct_get_object = orig_direct_get_object

    def test_select_http_proxy_None(self):
-        cs = sync.ContainerSync(
-            {'sync_proxy': ''}, container_ring=FakeRing())
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync(
+                {'sync_proxy': ''}, container_ring=FakeRing())
        self.assertEqual(cs.select_http_proxy(), None)

    def test_select_http_proxy_one(self):
-        cs = sync.ContainerSync(
-            {'sync_proxy': 'http://one'}, container_ring=FakeRing())
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync(
+                {'sync_proxy': 'http://one'}, container_ring=FakeRing())
        self.assertEqual(cs.select_http_proxy(), 'http://one')

    def test_select_http_proxy_multiple(self):
-        cs = sync.ContainerSync(
-            {'sync_proxy': 'http://one,http://two,http://three'},
-            container_ring=FakeRing())
+        with mock.patch('swift.container.sync.InternalClient'):
+            cs = sync.ContainerSync(
+                {'sync_proxy': 'http://one,http://two,http://three'},
+                container_ring=FakeRing())
        self.assertEqual(
            set(cs.http_proxies),
            set(['http://one', 'http://two', 'http://three']))