Add Storage Policy Support to Container Sync
Have container sync get its object ring from POLICIES now, update tests to use policy index from container_info and pass that along for use in ring selection. This change also introduced the option of specifiying in the cluster info which of the relam/cluster's is the current realm/cluster. DocImpact Implements: blueprint storage-policies Change-Id: If57d3b0ff8c395f21c81fda76458bc34fcb23257
This commit is contained in:
parent
019d7f5cda
commit
6da9799917
@ -578,6 +578,8 @@ use = egg:swift#container_sync
|
||||
# Updating those will have to be done manually, as knowing what the true realm
|
||||
# endpoint should be cannot always be guessed.
|
||||
# allow_full_urls = true
|
||||
# Set this to specify this clusters //realm/cluster as "current" in /info
|
||||
# current = //REALM/CLUSTER
|
||||
|
||||
# Note: Put it at the beginning of the pipleline to profile all middleware. But
|
||||
# it is safer to put this after catch_errors, gatekeeper and healthcheck.
|
||||
|
@ -28,10 +28,10 @@ class ContainerSync(object):
|
||||
using the container-sync-realms.conf style of container sync.
|
||||
"""
|
||||
|
||||
def __init__(self, app, conf):
|
||||
def __init__(self, app, conf, logger=None):
|
||||
self.app = app
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='container_sync')
|
||||
self.logger = logger or get_logger(conf, log_route='container_sync')
|
||||
self.realms_conf = ContainerSyncRealms(
|
||||
os.path.join(
|
||||
conf.get('swift_dir', '/etc/swift'),
|
||||
@ -39,6 +39,31 @@ class ContainerSync(object):
|
||||
self.logger)
|
||||
self.allow_full_urls = config_true_value(
|
||||
conf.get('allow_full_urls', 'true'))
|
||||
# configure current realm/cluster for /info
|
||||
self.realm = self.cluster = None
|
||||
current = conf.get('current', None)
|
||||
if current:
|
||||
try:
|
||||
self.realm, self.cluster = (p.upper() for p in
|
||||
current.strip('/').split('/'))
|
||||
except ValueError:
|
||||
self.logger.error('Invalid current //REALM/CLUSTER (%s)',
|
||||
current)
|
||||
self.register_info()
|
||||
|
||||
def register_info(self):
|
||||
dct = {}
|
||||
for realm in self.realms_conf.realms():
|
||||
clusters = self.realms_conf.clusters(realm)
|
||||
if clusters:
|
||||
dct[realm] = {'clusters': dict((c, {}) for c in clusters)}
|
||||
if self.realm and self.cluster:
|
||||
try:
|
||||
dct[self.realm]['clusters'][self.cluster]['current'] = True
|
||||
except KeyError:
|
||||
self.logger.error('Unknown current //REALM/CLUSTER (%s)',
|
||||
'//%s/%s' % (self.realm, self.cluster))
|
||||
register_swift_info('container_sync', realms=dct)
|
||||
|
||||
@wsgify
|
||||
def __call__(self, req):
|
||||
@ -102,12 +127,7 @@ class ContainerSync(object):
|
||||
req.environ['swift.authorize_override'] = True
|
||||
if req.path == '/info':
|
||||
# Ensure /info requests get the freshest results
|
||||
dct = {}
|
||||
for realm in self.realms_conf.realms():
|
||||
clusters = self.realms_conf.clusters(realm)
|
||||
if clusters:
|
||||
dct[realm] = {'clusters': dict((c, {}) for c in clusters)}
|
||||
register_swift_info('container_sync', realms=dct)
|
||||
self.register_info()
|
||||
return self.app
|
||||
|
||||
|
||||
|
@ -35,6 +35,7 @@ from swift.common.utils import (
|
||||
whataremyips)
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
|
||||
from swift.common.storage_policy import POLICIES, POLICY_INDEX
|
||||
|
||||
|
||||
class ContainerSync(Daemon):
|
||||
@ -99,11 +100,9 @@ class ContainerSync(Daemon):
|
||||
section of the container-server.conf
|
||||
:param container_ring: If None, the <swift_dir>/container.ring.gz will be
|
||||
loaded. This is overridden by unit tests.
|
||||
:param object_ring: If None, the <swift_dir>/object.ring.gz will be loaded.
|
||||
This is overridden by unit tests.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, container_ring=None, object_ring=None):
|
||||
def __init__(self, conf, container_ring=None):
|
||||
#: The dict of configuration values from the [container-sync] section
|
||||
#: of the container-server.conf.
|
||||
self.conf = conf
|
||||
@ -150,17 +149,24 @@ class ContainerSync(Daemon):
|
||||
self.container_failures = 0
|
||||
#: Time of last stats report.
|
||||
self.reported = time()
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
#: swift.common.ring.Ring for locating containers.
|
||||
self.container_ring = container_ring or Ring(swift_dir,
|
||||
self.container_ring = container_ring or Ring(self.swift_dir,
|
||||
ring_name='container')
|
||||
#: swift.common.ring.Ring for locating objects.
|
||||
self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
|
||||
self._myips = whataremyips()
|
||||
self._myport = int(conf.get('bind_port', 6001))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
|
||||
def get_object_ring(self, policy_idx):
|
||||
"""
|
||||
Get the ring object to use based on its policy.
|
||||
|
||||
:policy_idx: policy index as defined in swift.conf
|
||||
:returns: appropriate ring object
|
||||
"""
|
||||
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""
|
||||
Runs container sync scans until stopped.
|
||||
@ -361,19 +367,22 @@ class ContainerSync(Daemon):
|
||||
self.logger.increment('deletes')
|
||||
self.logger.timing_since('deletes.timing', start_time)
|
||||
else:
|
||||
part, nodes = self.object_ring.get_nodes(
|
||||
info['account'], info['container'],
|
||||
row['name'])
|
||||
part, nodes = \
|
||||
self.get_object_ring(info['storage_policy_index']). \
|
||||
get_nodes(info['account'], info['container'],
|
||||
row['name'])
|
||||
shuffle(nodes)
|
||||
exc = None
|
||||
looking_for_timestamp = float(row['created_at'])
|
||||
timestamp = -1
|
||||
headers = body = None
|
||||
headers_out = {POLICY_INDEX: str(info['storage_policy_index'])}
|
||||
for node in nodes:
|
||||
try:
|
||||
these_headers, this_body = direct_get_object(
|
||||
node, part, info['account'], info['container'],
|
||||
row['name'], resp_chunk_size=65536)
|
||||
row['name'], headers=headers_out,
|
||||
resp_chunk_size=65536)
|
||||
this_timestamp = float(these_headers['x-timestamp'])
|
||||
if this_timestamp > timestamp:
|
||||
timestamp = this_timestamp
|
||||
|
100
test/probe/test_container_sync.py
Normal file
100
test/probe/test_container_sync.py
Normal file
@ -0,0 +1,100 @@
|
||||
#!/usr/bin/python -u
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
import uuid
|
||||
from urlparse import urlparse
|
||||
import random
|
||||
from nose import SkipTest
|
||||
|
||||
from swiftclient import client
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.manager import Manager
|
||||
from test.probe.common import kill_servers, reset_environment
|
||||
|
||||
|
||||
def get_current_realm_cluster(url):
|
||||
parts = urlparse(url)
|
||||
url = parts.scheme + '://' + parts.netloc + '/info'
|
||||
http_conn = client.http_connection(url)
|
||||
try:
|
||||
info = client.get_capabilities(http_conn)
|
||||
except client.ClientException:
|
||||
raise SkipTest('Unable to retrieve cluster info')
|
||||
try:
|
||||
realms = info['container_sync']['realms']
|
||||
except KeyError:
|
||||
raise SkipTest('Unable to find container sync realms')
|
||||
for realm, realm_info in realms.items():
|
||||
for cluster, options in realm_info['clusters'].items():
|
||||
if options.get('current', False):
|
||||
return realm, cluster
|
||||
raise SkipTest('Unable find current realm cluster')
|
||||
|
||||
|
||||
class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
(self.pids, self.port2server, self.account_ring, self.container_ring,
|
||||
self.object_ring, self.policy, self.url, self.token,
|
||||
self.account, self.configs) = reset_environment()
|
||||
self.realm, self.cluster = get_current_realm_cluster(self.url)
|
||||
|
||||
def tearDown(self):
|
||||
kill_servers(self.port2server, self.pids)
|
||||
|
||||
def test_sync(self):
|
||||
base_headers = {'X-Container-Sync-Key': 'secret'}
|
||||
|
||||
# setup dest container
|
||||
dest_container = 'dest-container-%s' % uuid.uuid4()
|
||||
dest_headers = base_headers.copy()
|
||||
dest_policy = None
|
||||
if len(POLICIES) > 1:
|
||||
dest_policy = random.choice(list(POLICIES))
|
||||
dest_headers['X-Storage-Policy'] = dest_policy.name
|
||||
client.put_container(self.url, self.token, dest_container,
|
||||
headers=dest_headers)
|
||||
|
||||
# setup source container
|
||||
source_container = 'source-container-%s' % uuid.uuid4()
|
||||
source_headers = base_headers.copy()
|
||||
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
|
||||
dest_container)
|
||||
source_headers['X-Container-Sync-To'] = sync_to
|
||||
if dest_policy:
|
||||
source_policy = random.choice([p for p in POLICIES
|
||||
if p is not dest_policy])
|
||||
source_headers['X-Storage-Policy'] = source_policy.name
|
||||
client.put_container(self.url, self.token, source_container,
|
||||
headers=source_headers)
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'test-body')
|
||||
|
||||
# cycle container-sync
|
||||
Manager(['container-sync']).once()
|
||||
|
||||
# retrieve from sync'd container
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
get_current_realm_cluster('http://localhost:8080')
|
||||
unittest.main()
|
@ -19,12 +19,15 @@ import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
import uuid
|
||||
import mock
|
||||
|
||||
from swift.common import swob
|
||||
from swift.common.middleware import container_sync
|
||||
from swift.proxy.controllers.base import _get_cache_key
|
||||
from swift.proxy.controllers.info import InfoController
|
||||
|
||||
from test.unit import FakeLogger
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
||||
@ -63,6 +66,94 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir, ignore_errors=1)
|
||||
|
||||
def test_current_not_set(self):
|
||||
# no 'current' option set by default
|
||||
self.assertEqual(None, self.sync.realm)
|
||||
self.assertEqual(None, self.sync.cluster)
|
||||
info = {}
|
||||
|
||||
def capture_swift_info(key, **options):
|
||||
info[key] = options
|
||||
|
||||
with mock.patch(
|
||||
'swift.common.middleware.container_sync.register_swift_info',
|
||||
new=capture_swift_info):
|
||||
self.sync.register_info()
|
||||
|
||||
for realm, realm_info in info['container_sync']['realms'].items():
|
||||
for cluster, options in realm_info['clusters'].items():
|
||||
self.assertEqual(options.get('current', False), False)
|
||||
|
||||
def test_current_invalid(self):
|
||||
self.conf = {'swift_dir': self.tempdir, 'current': 'foo'}
|
||||
self.sync = container_sync.ContainerSync(self.app, self.conf,
|
||||
logger=FakeLogger())
|
||||
self.assertEqual(None, self.sync.realm)
|
||||
self.assertEqual(None, self.sync.cluster)
|
||||
info = {}
|
||||
|
||||
def capture_swift_info(key, **options):
|
||||
info[key] = options
|
||||
|
||||
with mock.patch(
|
||||
'swift.common.middleware.container_sync.register_swift_info',
|
||||
new=capture_swift_info):
|
||||
self.sync.register_info()
|
||||
|
||||
for realm, realm_info in info['container_sync']['realms'].items():
|
||||
for cluster, options in realm_info['clusters'].items():
|
||||
self.assertEqual(options.get('current', False), False)
|
||||
|
||||
error_lines = self.sync.logger.get_lines_for_level('error')
|
||||
self.assertEqual(error_lines, ['Invalid current '
|
||||
'//REALM/CLUSTER (foo)'])
|
||||
|
||||
def test_current_in_realms_conf(self):
|
||||
self.conf = {'swift_dir': self.tempdir, 'current': '//us/dfw1'}
|
||||
self.sync = container_sync.ContainerSync(self.app, self.conf)
|
||||
self.assertEqual('US', self.sync.realm)
|
||||
self.assertEqual('DFW1', self.sync.cluster)
|
||||
info = {}
|
||||
|
||||
def capture_swift_info(key, **options):
|
||||
info[key] = options
|
||||
|
||||
with mock.patch(
|
||||
'swift.common.middleware.container_sync.register_swift_info',
|
||||
new=capture_swift_info):
|
||||
self.sync.register_info()
|
||||
|
||||
for realm, realm_info in info['container_sync']['realms'].items():
|
||||
for cluster, options in realm_info['clusters'].items():
|
||||
if options.get('current'):
|
||||
break
|
||||
self.assertEqual(realm, self.sync.realm)
|
||||
self.assertEqual(cluster, self.sync.cluster)
|
||||
|
||||
def test_missing_from_realms_conf(self):
|
||||
self.conf = {'swift_dir': self.tempdir, 'current': 'foo/bar'}
|
||||
self.sync = container_sync.ContainerSync(self.app, self.conf,
|
||||
logger=FakeLogger())
|
||||
self.assertEqual('FOO', self.sync.realm)
|
||||
self.assertEqual('BAR', self.sync.cluster)
|
||||
info = {}
|
||||
|
||||
def capture_swift_info(key, **options):
|
||||
info[key] = options
|
||||
|
||||
with mock.patch(
|
||||
'swift.common.middleware.container_sync.register_swift_info',
|
||||
new=capture_swift_info):
|
||||
self.sync.register_info()
|
||||
|
||||
for realm, realm_info in info['container_sync']['realms'].items():
|
||||
for cluster, options in realm_info['clusters'].items():
|
||||
self.assertEqual(options.get('current', False), False)
|
||||
|
||||
for line in self.sync.logger.get_lines_for_level('error'):
|
||||
self.assertEqual(line, 'Unknown current '
|
||||
'//REALM/CLUSTER (//FOO/BAR)')
|
||||
|
||||
def test_pass_through(self):
|
||||
req = swob.Request.blank('/v1/a/c')
|
||||
resp = req.get_response(self.sync)
|
||||
|
@ -1,3 +1,4 @@
|
||||
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@ -18,12 +19,12 @@ import unittest
|
||||
from contextlib import nested
|
||||
|
||||
import mock
|
||||
|
||||
from test.unit import FakeLogger
|
||||
from swift.container import sync
|
||||
from swift.common import utils
|
||||
from swift.common.exceptions import ClientException
|
||||
|
||||
from swift.common.storage_policy import StoragePolicy, POLICY_INDEX
|
||||
from test.unit import patch_policies
|
||||
|
||||
utils.HASH_PATH_SUFFIX = 'endcap'
|
||||
utils.HASH_PATH_PREFIX = 'endcap'
|
||||
@ -67,6 +68,7 @@ class FakeContainerBroker(object):
|
||||
self.sync_point2 = sync_point2
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
|
||||
class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_FileLikeIter(self):
|
||||
@ -96,10 +98,8 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_init(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
self.assertTrue(cs.container_ring is cring)
|
||||
self.assertTrue(cs.object_ring is oring)
|
||||
|
||||
def test_run_forever(self):
|
||||
# This runs runs_forever with fakes to succeed for two loops, the first
|
||||
@ -138,11 +138,11 @@ class TestContainerSync(unittest.TestCase):
|
||||
orig_audit_location_generator = sync.audit_location_generator
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c'})
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})
|
||||
sync.time = fake_time
|
||||
sync.sleep = fake_sleep
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
cs.run_forever(1, 2, a=3, b=4, verbose=True)
|
||||
except Exception as err:
|
||||
@ -194,10 +194,10 @@ class TestContainerSync(unittest.TestCase):
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c'})
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})
|
||||
sync.time = fake_time
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
cs.run_once(1, 2, a=3, b=4, verbose=True)
|
||||
self.assertEquals(time_calls, [6])
|
||||
@ -218,14 +218,12 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_sync_not_db(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
|
||||
def test_container_sync_missing_db(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
|
||||
@ -233,12 +231,12 @@ class TestContainerSync(unittest.TestCase):
|
||||
# Db could be there due to handoff replication so test that we ignore
|
||||
# those.
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c'})
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})
|
||||
cs._myips = ['127.0.0.1'] # No match
|
||||
cs._myport = 1 # No match
|
||||
cs.container_sync('isa.db')
|
||||
@ -265,12 +263,12 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_sync_deleted(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c'}, deleted=False)
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0}, deleted=False)
|
||||
cs._myips = ['10.0.0.0'] # Match
|
||||
cs._myport = 1000 # Match
|
||||
# This complete match will cause the 1 container failure since the
|
||||
@ -279,7 +277,8 @@ class TestContainerSync(unittest.TestCase):
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c'}, deleted=True)
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0}, deleted=True)
|
||||
# This complete match will not cause any more container failures
|
||||
# since the broker indicates deletion
|
||||
cs.container_sync('isa.db')
|
||||
@ -289,12 +288,12 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_sync_no_to_or_key(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1})
|
||||
cs._myips = ['10.0.0.0'] # Match
|
||||
@ -307,6 +306,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)})
|
||||
@ -320,6 +320,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-key': ('key', 1)})
|
||||
@ -333,6 +334,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -348,6 +350,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -365,13 +368,13 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_stop_at(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
orig_time = sync.time
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -408,8 +411,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_first_loop(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
|
||||
def fake_hash_path(account, container, obj, raw_digest=False):
|
||||
# Ensures that no rows match for full syncing, ordinal is 0 and
|
||||
@ -418,6 +420,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': 2,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -443,6 +446,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
return '\x01' * 16
|
||||
fcb = FakeContainerBroker('path', info={'account': 'a',
|
||||
'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': 1,
|
||||
'x_container_sync_point2': 1},
|
||||
metadata={'x-container-sync-to':
|
||||
@ -467,6 +471,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': 2,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -489,6 +494,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': 2,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -513,6 +519,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': 2,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -536,8 +543,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_container_second_loop(self):
|
||||
cring = FakeRing()
|
||||
oring = FakeRing()
|
||||
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
orig_hash_path = sync.hash_path
|
||||
orig_delete_object = sync.delete_object
|
||||
@ -554,6 +560,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -583,6 +590,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -603,6 +611,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
fcb = FakeContainerBroker(
|
||||
'path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0,
|
||||
'x_container_sync_point1': -1,
|
||||
'x_container_sync_point2': -1},
|
||||
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
|
||||
@ -659,8 +668,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
self.assertEqual(logger, fake_logger)
|
||||
|
||||
sync.delete_object = fake_delete_object
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
cs.logger = fake_logger
|
||||
cs.http_proxies = ['http://proxy']
|
||||
# Success
|
||||
@ -668,8 +676,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': True,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), 'info', realm,
|
||||
realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_deletes, 1)
|
||||
|
||||
exc = []
|
||||
@ -684,8 +693,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': True,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), 'info', realm,
|
||||
realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_deletes, 1)
|
||||
self.assertEquals(len(exc), 1)
|
||||
self.assertEquals(str(exc[-1]), 'test exception')
|
||||
@ -700,8 +710,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': True,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), 'info', realm,
|
||||
realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_deletes, 1)
|
||||
self.assertEquals(len(exc), 2)
|
||||
self.assertEquals(str(exc[-1]), 'test client exception')
|
||||
@ -717,8 +728,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': True,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), 'info', realm,
|
||||
realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_deletes, 2)
|
||||
self.assertEquals(len(exc), 3)
|
||||
self.assertEquals(str(exc[-1]), 'test client exception: 404')
|
||||
@ -771,29 +783,31 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
sync.put_object = fake_put_object
|
||||
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
cs.logger = fake_logger
|
||||
cs.http_proxies = ['http://proxy']
|
||||
|
||||
def fake_direct_get_object(*args, **kwargs):
|
||||
def fake_direct_get_object(node, part, account, container, obj,
|
||||
headers, resp_chunk_size=1):
|
||||
self.assertEquals(headers[POLICY_INDEX], '0')
|
||||
return ({'other-header': 'other header value',
|
||||
'etag': '"etagvalue"', 'x-timestamp': '1.2',
|
||||
'content-type': 'text/plain; swift_bytes=123'},
|
||||
iter('contents'))
|
||||
|
||||
sync.direct_get_object = fake_direct_get_object
|
||||
# Success as everything says it worked
|
||||
self.assertTrue(cs.container_sync_row(
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 1)
|
||||
|
||||
def fake_direct_get_object(*args, **kwargs):
|
||||
def fake_direct_get_object(node, part, account, container, obj,
|
||||
headers, resp_chunk_size=1):
|
||||
self.assertEquals(headers[POLICY_INDEX], '0')
|
||||
return ({'date': 'date value',
|
||||
'last-modified': 'last modified value',
|
||||
'x-timestamp': '1.2',
|
||||
@ -810,14 +824,16 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
|
||||
exc = []
|
||||
|
||||
def fake_direct_get_object(*args, **kwargs):
|
||||
def fake_direct_get_object(node, part, account, container, obj,
|
||||
headers, resp_chunk_size=1):
|
||||
self.assertEquals(headers[POLICY_INDEX], '0')
|
||||
exc.append(Exception('test exception'))
|
||||
raise exc[-1]
|
||||
|
||||
@ -827,16 +843,18 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
self.assertEquals(len(exc), 3)
|
||||
self.assertEquals(str(exc[-1]), 'test exception')
|
||||
|
||||
exc = []
|
||||
|
||||
def fake_direct_get_object(*args, **kwargs):
|
||||
def fake_direct_get_object(node, part, account, container, obj,
|
||||
headers, resp_chunk_size=1):
|
||||
self.assertEquals(headers[POLICY_INDEX], '0')
|
||||
if len(exc) == 0:
|
||||
exc.append(Exception('test other exception'))
|
||||
else:
|
||||
@ -849,16 +867,18 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
self.assertEquals(len(exc), 3)
|
||||
self.assertEquals(str(exc[-3]), 'test other exception')
|
||||
self.assertEquals(str(exc[-2]), 'test client exception')
|
||||
self.assertEquals(str(exc[-1]), 'test client exception')
|
||||
|
||||
def fake_direct_get_object(*args, **kwargs):
|
||||
def fake_direct_get_object(node, part, account, container, obj,
|
||||
headers, resp_chunk_size=1):
|
||||
self.assertEquals(headers[POLICY_INDEX], '0')
|
||||
return ({'other-header': 'other header value',
|
||||
'x-timestamp': '1.2', 'etag': '"etagvalue"'},
|
||||
iter('contents'))
|
||||
@ -874,9 +894,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
self.assert_(re.match('Unauth ',
|
||||
cs.logger.log_dict['info'][0][0][0]))
|
||||
@ -891,9 +911,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
self.assert_(re.match('Not found ',
|
||||
cs.logger.log_dict['info'][0][0][0]))
|
||||
@ -907,9 +927,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.2'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'), {
|
||||
'account': 'a',
|
||||
'container': 'c'}, realm, realm_key))
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEquals(cs.container_puts, 2)
|
||||
self.assertTrue(
|
||||
cs.logger.log_dict['exception'][0][0][0].startswith(
|
||||
@ -922,21 +942,18 @@ class TestContainerSync(unittest.TestCase):
|
||||
|
||||
def test_select_http_proxy_None(self):
|
||||
cs = sync.ContainerSync(
|
||||
{'sync_proxy': ''}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
{'sync_proxy': ''}, container_ring=FakeRing())
|
||||
self.assertEqual(cs.select_http_proxy(), None)
|
||||
|
||||
def test_select_http_proxy_one(self):
|
||||
cs = sync.ContainerSync(
|
||||
{'sync_proxy': 'http://one'}, container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
{'sync_proxy': 'http://one'}, container_ring=FakeRing())
|
||||
self.assertEqual(cs.select_http_proxy(), 'http://one')
|
||||
|
||||
def test_select_http_proxy_multiple(self):
|
||||
cs = sync.ContainerSync(
|
||||
{'sync_proxy': 'http://one,http://two,http://three'},
|
||||
container_ring=FakeRing(),
|
||||
object_ring=FakeRing())
|
||||
container_ring=FakeRing())
|
||||
self.assertEqual(
|
||||
set(cs.http_proxies),
|
||||
set(['http://one', 'http://two', 'http://three']))
|
||||
|
Loading…
x
Reference in New Issue
Block a user