Alistair Coles f679ed0cc8 Make container sync copy SLO manifests
Currently the container sync daemon fails to copy
an SLO manifest, and the error will stall progress
of the sync process on that container. There are
several reasons why the sync of an SLO manifest
may fail:

1. The GET of the manifest from the source
   container returns an X-Static-Large-Object header
   that is not allowed to be included with a PUT
   to the destination container.

2. The format of the manifest object that is read
   from the source is not in the syntax required
   for a SLO manifest PUT.

3. Assuming 2 were fixed, the PUT of the manifest
   includes an ETag header which will not match the
   md5 of the manifest generated by the receiving
   proxy's SLO middleware.

4. If the manifest is being synced to a different
   account and/or cluster, then the SLO segments may
   not have been synced and so the validation of the
   PUT manifest will fail.

This patch addresses all of these obstacles by
enabling the destination container-sync middleware to
cause the SLO middleware to be bypassed by setting a
swift.slo_override flag in the request environ. This
flag is only set for request that have been validated
as originating from a container sync peer.

This is justifed by noting that a SLO manifest PUT from
a container sync peer can be assumed to have valid syntax
because it was already been validated when written to
the source container.

Furthermore, we must allow SLO manifests to be synced
without requiring the semantic of their content to be
re-validated because we have no way to enforce or check
that segments have been synced prior to the manifest, nor
to check that the semantic of the manifest is still valid
at the source.

This does mean that GETs to synced SLO manifests may fail
if segments have not been synced. This is however
consistent with the expectation for synced DLO manifests
and indeed for the source SLO manifest if segments have
been deleted since it was written.

Co-Authored-By: Oshrit Feder <oshritf@il.ibm.com>
Change-Id: I8d503419b7996721a671ed6b2795224775a7d8c6
Closes-Bug: #1605597
2016-09-14 13:32:00 +01:00

import os
import unittest
from textwrap import dedent
import mock
import errno
from swift.common.utils import Timestamp
from test.unit import debug_logger
from swift.container import sync
from swift.common.db import DatabaseConnectionError
from swift.common import utils
from swift.common.wsgi import ConfigString
from swift.common.exceptions import ClientException
from swift.common.storage_policy import StoragePolicy
import test
from test.unit import patch_policies, with_tempdir
utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = 'endcap'
class FakeRing(object):
def __init__(self):
self.devs = [{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
for x in range(3)]
def get_nodes(self, account, container=None, obj=None):
return 1, list(self.devs)
class FakeContainerBroker(object):
def __init__(self, path, metadata=None, info=None, deleted=False,
self.db_file = path
self.db_dir = os.path.dirname(path)
self.metadata = metadata if metadata else {}
self.info = info if info else {}
self.deleted = deleted
self.items_since = items_since if items_since else []
self.sync_point1 = -1
self.sync_point2 = -1
def get_max_row(self):
return 1
def get_info(self):
return self.info
def is_deleted(self):
return self.deleted
def get_items_since(self, sync_point, limit):
if sync_point < 0:
sync_point = 0
return self.items_since[sync_point:sync_point + limit]
def set_x_container_sync_points(self, sync_point1, sync_point2):
self.sync_point1 = sync_point1
self.sync_point2 = sync_point2
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestContainerSync(unittest.TestCase):
def setUp(self):
self.logger = debug_logger('test-container-sync')
def test_FileLikeIter(self):
# Retained test to show new FileLikeIter acts just like the removed
# _Iter2FileLikeObject did.
flo = sync.FileLikeIter(iter(['123', '4567', '89', '0']))
expect = '1234567890'
got = flo.read(2)
self.assertTrue(len(got) <= 2)
self.assertEqual(got, expect[:len(got)])
expect = expect[len(got):]
got = flo.read(5)
self.assertTrue(len(got) <= 5)
self.assertEqual(got, expect[:len(got)])
expect = expect[len(got):]
self.assertEqual(flo.read(), expect)
self.assertEqual(flo.read(), '')
self.assertEqual(flo.read(2), '')
flo = sync.FileLikeIter(iter(['123', '4567', '89', '0']))
self.assertEqual(flo.read(), '1234567890')
self.assertEqual(flo.read(), '')
self.assertEqual(flo.read(2), '')
def assertLogMessage(self, msg_level, expected, skip=0):
for line in self.logger.get_lines_for_level(msg_level)[skip:]:
msg = 'expected %r not in %r' % (expected, line)
self.assertTrue(expected in line, msg)
def test_init(self, tempdir):
ic_conf_path = os.path.join(tempdir, 'internal-client.conf')
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
self.assertTrue(cs.container_ring is cring)
# specified but not exists will not start
conf = {'internal_client_conf_path': ic_conf_path}
self.assertRaises(SystemExit, sync.ContainerSync, conf,
container_ring=cring, logger=self.logger)
# not specified will use default conf
with mock.patch('swift.container.sync.InternalClient') as mock_ic:
cs = sync.ContainerSync({}, container_ring=cring,
self.assertTrue(cs.container_ring is cring)
conf_path, name, retry = mock_ic.call_args[0]
self.assertTrue(isinstance(conf_path, ConfigString))
self.assertLogMessage('warning', 'internal_client_conf_path')
self.assertLogMessage('warning', 'internal-client.conf-sample')
# correct
contents = dedent(sync.ic_conf_body)
with open(ic_conf_path, 'w') as f:
with mock.patch('swift.container.sync.InternalClient') as mock_ic:
cs = sync.ContainerSync(conf, container_ring=cring)
self.assertTrue(cs.container_ring is cring)
conf_path, name, retry = mock_ic.call_args[0]
self.assertEqual(conf_path, ic_conf_path)
sample_conf_filename = os.path.join(
with open(sample_conf_filename) as sample_conf_file:
sample_conf = sample_conf_file.read()
self.assertEqual(contents, sample_conf)
def test_run_forever(self):
# This runs runs_forever with fakes to succeed for two loops, the first
# causing a report but no interval sleep, the second no report but an
# interval sleep.
time_calls = [0]
sleep_calls = []
def fake_time():
time_calls[0] += 1
returns = [1, # Initialized reported time
1, # Start time
3602, # Is it report time (yes)
3602, # Report time
3602, # Elapsed time for "under interval" (no)
3602, # Start time
3603, # Is it report time (no)
3603] # Elapsed time for "under interval" (yes)
if time_calls[0] == len(returns) + 1:
raise Exception('we are now done')
return returns[time_calls[0] - 1]
def fake_sleep(amount):
gen_func = ('swift.container.sync_store.'
with mock.patch('swift.container.sync.InternalClient'), \
mock.patch('swift.container.sync.time', fake_time), \
mock.patch('swift.container.sync.sleep', fake_sleep), \
mock.patch(gen_func) as fake_generator, \
lambda p: FakeContainerBroker(p, info={
'account': 'a', 'container': 'c',
'storage_policy_index': 0})):
fake_generator.side_effect = [iter(['container.db']),
cs = sync.ContainerSync({}, container_ring=FakeRing())
except Exception as err:
if str(err) != 'we are now done':
self.assertEqual(time_calls, [9])
self.assertEqual(len(sleep_calls), 2)
self.assertLessEqual(sleep_calls[0], cs.interval)
self.assertEqual(cs.interval - 1, sleep_calls[1])
self.assertEqual(2, fake_generator.call_count)
self.assertEqual(cs.reported, 3602)
def test_run_once(self):
# This runs runs_once with fakes twice, the first causing an interim
# report, the second with no interim report.
time_calls = [0]
def fake_time():
time_calls[0] += 1
returns = [1, # Initialized reported time
1, # Start time
3602, # Is it report time (yes)
3602, # Report time
3602, # End report time
3602, # For elapsed
3602, # Start time
3603, # Is it report time (no)
3604, # End report time
3605] # For elapsed
if time_calls[0] == len(returns) + 1:
raise Exception('we are now done')
return returns[time_calls[0] - 1]
gen_func = ('swift.container.sync_store.'
with mock.patch('swift.container.sync.InternalClient'), \
mock.patch('swift.container.sync.time', fake_time), \
mock.patch(gen_func) as fake_generator, \
lambda p: FakeContainerBroker(p, info={
'account': 'a', 'container': 'c',
'storage_policy_index': 0})):
fake_generator.side_effect = [iter(['container.db']),
cs = sync.ContainerSync({}, container_ring=FakeRing())
self.assertEqual(time_calls, [6])
self.assertEqual(1, fake_generator.call_count)
self.assertEqual(cs.reported, 3602)
except Exception as err:
if str(err) != 'we are now done':
self.assertEqual(time_calls, [10])
self.assertEqual(2, fake_generator.call_count)
self.assertEqual(cs.reported, 3604)
def test_container_sync_not_db(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
self.assertEqual(cs.container_failures, 0)
def test_container_sync_missing_db(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
broker = 'swift.container.backend.ContainerBroker'
store = 'swift.container.sync_store.ContainerSyncStore'
# In this test we call the container_sync instance several
# times with a missing db in various combinations.
# Since we use the same ContainerSync instance for all tests
# its failures counter increases by one with each call.
# Test the case where get_info returns DatabaseConnectionError
# with DB does not exist, and we succeed in deleting it.
with mock.patch(broker + '.get_info') as fake_get_info:
with mock.patch(store + '.remove_synced_container') as fake_remove:
fake_get_info.side_effect = DatabaseConnectionError(
"DB doesn't exist")
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(1, fake_remove.call_count)
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
# Test the case where get_info returns DatabaseConnectionError
# with DB does not exist, and we fail to delete it.
with mock.patch(broker + '.get_info') as fake_get_info:
with mock.patch(store + '.remove_synced_container') as fake_remove:
fake_get_info.side_effect = DatabaseConnectionError(
"DB doesn't exist")
fake_remove.side_effect = OSError('1')
self.assertEqual(cs.container_failures, 2)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(1, fake_remove.call_count)
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
# Test the case where get_info returns DatabaseConnectionError
# with DB does not exist, and it returns an error != ENOENT.
with mock.patch(broker + '.get_info') as fake_get_info:
with mock.patch(store + '.remove_synced_container') as fake_remove:
fake_get_info.side_effect = DatabaseConnectionError(
"DB doesn't exist")
fake_remove.side_effect = OSError(errno.EPERM, 'a')
self.assertEqual(cs.container_failures, 3)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(1, fake_remove.call_count)
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
# Test the case where get_info returns DatabaseConnectionError
# error different than DB does not exist
with mock.patch(broker + '.get_info') as fake_get_info:
with mock.patch(store + '.remove_synced_container') as fake_remove:
fake_get_info.side_effect = DatabaseConnectionError('a', 'a')
self.assertEqual(cs.container_failures, 4)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(0, fake_remove.call_count)
def test_container_sync_not_my_db(self):
# Db could be there due to handoff replication so test that we ignore
# those.
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({
'bind_ip': '',
}, container_ring=cring)
# Plumbing test for bind_ip and whataremyips()
self.assertEqual([''], cs._myips)
orig_ContainerBroker = sync.ContainerBroker
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0})
cs._myips = [''] # No match
cs._myport = 1 # No match
self.assertEqual(cs.container_failures, 0)
cs._myips = [''] # Match
cs._myport = 1 # No match
self.assertEqual(cs.container_failures, 0)
cs._myips = [''] # No match
cs._myport = 1000 # Match
self.assertEqual(cs.container_failures, 0)
cs._myips = [''] # Match
cs._myport = 1000 # Match
# This complete match will cause the 1 container failure since the
# broker's info doesn't contain sync point keys
self.assertEqual(cs.container_failures, 1)
sync.ContainerBroker = orig_ContainerBroker
def test_container_sync_deleted(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
orig_ContainerBroker = sync.ContainerBroker
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0}, deleted=False)
cs._myips = [''] # Match
cs._myport = 1000 # Match
# This complete match will cause the 1 container failure since the
# broker's info doesn't contain sync point keys
self.assertEqual(cs.container_failures, 1)
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0}, deleted=True)
# This complete match will not cause any more container failures
# since the broker indicates deletion
self.assertEqual(cs.container_failures, 1)
sync.ContainerBroker = orig_ContainerBroker
def test_container_sync_no_to_or_key(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
orig_ContainerBroker = sync.ContainerBroker
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1})
cs._myips = [''] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-to or x-container-sync-key
self.assertEqual(cs.container_failures, 0)
self.assertEqual(cs.container_skips, 1)
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1)})
cs._myips = [''] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-key
self.assertEqual(cs.container_failures, 0)
self.assertEqual(cs.container_skips, 2)
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-key': ('key', 1)})
cs._myips = [''] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-to
self.assertEqual(cs.container_failures, 0)
self.assertEqual(cs.container_skips, 3)
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)})
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = []
# This complete match will cause a container failure since the
# sync-to won't validate as allowed.
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 3)
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)})
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# This complete match will succeed completely since the broker
# get_items_since will return no new rows.
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 3)
sync.ContainerBroker = orig_ContainerBroker
def test_container_stop_at(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
orig_ContainerBroker = sync.ContainerBroker
orig_time = sync.time
sync.ContainerBroker = lambda p: FakeContainerBroker(
p, info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=['erroneous data'])
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# This sync will fail since the items_since data is bad.
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
# Set up fake times to make the sync short-circuit as having taken
# too long
fake_times = [
1.0, # Compute the time to move on
100000.0, # Compute if it's time to move on from first loop
100000.0] # Compute if it's time to move on from second loop
def fake_time():
return fake_times.pop(0)
sync.time = fake_time
# This same sync won't fail since it will look like it took so long
# as to be time to move on (before it ever actually tries to do
# anything).
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
sync.ContainerBroker = orig_ContainerBroker
sync.time = orig_time
def test_container_first_loop(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring)
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that no rows match for full syncing, ordinal is 0 and
# all hashes are 0
return '\x00' * 16
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.hash_path', fake_hash_path):
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Succeeds because no rows match
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, None)
self.assertEqual(fcb.sync_point2, -1)
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that all rows match for full syncing, ordinal is 0
# and all hashes are 1
return '\x01' * 16
fcb = FakeContainerBroker('path', info={'account': 'a',
'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 1,
'x_container_sync_point2': 1},
('', 1),
('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.hash_path', fake_hash_path):
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Succeeds because the two sync points haven't deviated yet
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, -1)
self.assertEqual(fcb.sync_point2, -1)
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
with mock.patch('swift.container.sync.ContainerBroker', lambda p: fcb):
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Fails because container_sync_row will fail since the row has no
# 'deleted' key
self.assertEqual(cs.container_failures, 2)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, None)
self.assertEqual(fcb.sync_point2, -1)
def fake_delete_object(*args, **kwargs):
raise ClientException
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Fails because delete_object fails
self.assertEqual(cs.container_failures, 3)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, None)
self.assertEqual(fcb.sync_point2, -1)
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
lambda *x, **y: None):
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Succeeds because delete_object succeeds
self.assertEqual(cs.container_failures, 3)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, None)
self.assertEqual(fcb.sync_point2, 1)
def test_container_second_loop(self):
cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring,
orig_ContainerBroker = sync.ContainerBroker
orig_hash_path = sync.hash_path
orig_delete_object = sync.delete_object
# We'll ensure the first loop is always skipped by keeping the two
# sync points equal
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that no rows match for second loop, ordinal is 0 and
# all hashes are 1
return '\x01' * 16
sync.hash_path = fake_hash_path
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Succeeds because no rows match
self.assertEqual(cs.container_failures, 0)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, 1)
self.assertEqual(fcb.sync_point2, None)
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that all rows match for second loop, ordinal is 0 and
# all hashes are 0
return '\x00' * 16
def fake_delete_object(*args, **kwargs):
sync.hash_path = fake_hash_path
sync.delete_object = fake_delete_object
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Fails because row is missing 'deleted' key
# Nevertheless the fault is skipped
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, 1)
self.assertEqual(fcb.sync_point2, None)
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
sync.ContainerBroker = lambda p: fcb
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
# Succeeds because row now has 'deleted' key and delete_object
# succeeds
self.assertEqual(cs.container_failures, 1)
self.assertEqual(cs.container_skips, 0)
self.assertEqual(fcb.sync_point1, 1)
self.assertEqual(fcb.sync_point2, None)
sync.ContainerBroker = orig_ContainerBroker
sync.hash_path = orig_hash_path
sync.delete_object = orig_delete_object
def test_container_report(self):
container_stats = {'puts': 0,
'deletes': 0,
'bytes': 0}
def fake_container_sync_row(self, row, sync_to,
user_key, broker, info, realm, realm_key):
if 'deleted' in row:
container_stats['deletes'] += 1
return True
container_stats['puts'] += 1
container_stats['bytes'] += row['size']
return True
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that no rows match for second loop, ordinal is 0 and
# all hashes are 1
return '\x01' * 16
fcb = FakeContainerBroker(
info={'account': 'a', 'container': 'c',
'storage_policy_index': 0,
'x_container_sync_point1': 5,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o1', 'size': 0,
'deleted': True},
{'ROWID': 2, 'name': 'o2', 'size': 1010},
{'ROWID': 3, 'name': 'o3', 'size': 0,
'deleted': True},
{'ROWID': 4, 'name': 'o4', 'size': 90},
{'ROWID': 5, 'name': 'o5', 'size': 0}])
with mock.patch('swift.container.sync.InternalClient'), \
fake_hash_path), \
lambda p: fcb):
cring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring,
cs.container_stats = container_stats
cs._myips = [''] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['']
funcType = type(sync.ContainerSync.container_sync_row)
cs.container_sync_row = funcType(fake_container_sync_row,
cs, sync.ContainerSync)
# Succeeds because no rows match
log_line = cs.logger.get_lines_for_level('info')[0]
lines = log_line.split(',')
self.assertTrue('sync_point2: 5', lines.pop().strip())
self.assertTrue('sync_point1: 5', lines.pop().strip())
self.assertTrue('bytes: 1100', lines.pop().strip())
self.assertTrue('deletes: 2', lines.pop().strip())
self.assertTrue('puts: 3', lines.pop().strip())
def test_container_sync_row_delete(self):
self._test_container_sync_row_delete(None, None)
def test_container_sync_row_delete_using_realms(self):
self._test_container_sync_row_delete('US', 'realm_key')
def _test_container_sync_row_delete(self, realm, realm_key):
orig_uuid = sync.uuid
orig_delete_object = sync.delete_object
class FakeUUID(object):
class uuid4(object):
hex = 'abcdef'
sync.uuid = FakeUUID
ts_data = Timestamp(1.1)
def fake_delete_object(path, name=None, headers=None, proxy=None,
logger=None, timeout=None):
self.assertEqual(path, 'http://sync/to/path')
self.assertEqual(name, 'object')
if realm:
self.assertEqual(headers, {
'US abcdef a2401ecb1256f469494a0abcb0eb62ffa73eca63',
'x-timestamp': ts_data.internal})
{'x-container-sync-key': 'key',
'x-timestamp': ts_data.internal})
self.assertEqual(proxy, 'http://proxy')
self.assertEqual(timeout, 5.0)
self.assertEqual(logger, self.logger)
sync.delete_object = fake_delete_object
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=FakeRing(),
cs.http_proxies = ['http://proxy']
# Success.
# simulate a row with tombstone at 1.1 and later ctype, meta times
created_at = ts_data.internal + '+1388+1388' # last modified = 1.2
{'deleted': True,
'name': 'object',
'created_at': created_at,
'size': '1000'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_deletes, 1)
exc = []
def fake_delete_object(*args, **kwargs):
exc.append(Exception('test exception'))
raise exc[-1]
sync.delete_object = fake_delete_object
# Failure because of delete_object exception
{'deleted': True,
'name': 'object',
'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_deletes, 1)
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test exception')
def fake_delete_object(*args, **kwargs):
exc.append(ClientException('test client exception'))
raise exc[-1]
sync.delete_object = fake_delete_object
# Failure because of delete_object exception
{'deleted': True,
'name': 'object',
'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_deletes, 1)
self.assertEqual(len(exc), 2)
self.assertEqual(str(exc[-1]), 'test client exception')
def fake_delete_object(*args, **kwargs):
exc.append(ClientException('test client exception',
raise exc[-1]
sync.delete_object = fake_delete_object
# Success because the object wasn't even found
{'deleted': True,
'name': 'object',
'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_deletes, 2)
self.assertEqual(len(exc), 3)
self.assertEqual(str(exc[-1]), 'test client exception: 404')
sync.uuid = orig_uuid
sync.delete_object = orig_delete_object
def test_container_sync_row_put(self):
self._test_container_sync_row_put(None, None)
def test_container_sync_row_put_using_realms(self):
self._test_container_sync_row_put('US', 'realm_key')
def _test_container_sync_row_put(self, realm, realm_key):
orig_uuid = sync.uuid
orig_put_object = sync.put_object
orig_head_object = sync.head_object
class FakeUUID(object):
class uuid4(object):
hex = 'abcdef'
sync.uuid = FakeUUID
ts_data = Timestamp(1.1)
timestamp = Timestamp(1.2)
put_object_calls = []
def fake_put_object(*args, **kwargs):
put_object_calls.append((args, kwargs))
def check_put_object(extra_headers, sync_to, name=None,
headers=None, contents=None, proxy=None,
logger=None, timeout=None):
self.assertEqual(sync_to, 'http://sync/to/path')
self.assertEqual(name, 'object')
expected_headers = {
'x-timestamp': timestamp.internal,
'etag': 'etagvalue',
'other-header': 'other header value',
'content-type': 'text/plain'}
if realm:
'US abcdef a5fb3cf950738e6e3b364190e246bd7dd21dad3c'})
'x-container-sync-key': 'key'})
self.assertDictEqual(expected_headers, headers)
self.assertEqual(contents.read(), 'contents')
self.assertEqual(proxy, 'http://proxy')
self.assertEqual(timeout, 5.0)
self.assertEqual(logger, self.logger)
sync.put_object = fake_put_object
expected_put_count = 0
excepted_failure_count = 0
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=FakeRing(),
cs.http_proxies = ['http://proxy']
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
return (200,
{'other-header': 'other header value',
'etag': '"etagvalue"',
'x-timestamp': timestamp.internal,
'content-type': 'text/plain; swift_bytes=123'},
cs.swift.get_object = fake_get_object
# Success as everything says it worked.
# simulate a row with data at 1.1 and later ctype, meta times
created_at = ts_data.internal + '+1388+1388' # last modified = 1.2
def fake_object_in_rcontainer(row, sync_to, user_key,
broker, realm, realm_key):
return False
orig_object_in_rcontainer = cs._object_in_remote_container
cs._object_in_remote_container = fake_object_in_rcontainer
{'deleted': False,
'name': 'object',
'created_at': created_at,
'size': 50}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count)
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
return (200,
{'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': timestamp.internal,
'other-header': 'other header value',
'etag': '"etagvalue"',
'content-type': 'text/plain; swift_bytes=123'},
cs.swift.get_object = fake_get_object
# Success as everything says it worked, also checks 'date' and
# 'last-modified' headers are removed and that 'etag' header is
# stripped of double quotes.
put_object_calls = []
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 60}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count)
# Success as everything says it worked, also check that PUT
# timestamp equals GET timestamp when it is newer than created_at
# value.
put_object_calls = []
{'deleted': False,
'name': 'object',
'created_at': '1.1',
'size': 60}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count)
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
return (200,
{'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': timestamp.internal,
'other-header': 'other header value',
'etag': '"etagvalue"',
'x-static-large-object': 'true',
'content-type': 'text/plain; swift_bytes=123'},
cs.swift.get_object = fake_get_object
# Success as everything says it worked, also check that etag
# header removed in case of SLO
put_object_calls = []
{'deleted': False,
'name': 'object',
'created_at': '1.1',
'size': 60}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'x-static-large-object': 'true'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count)
exc = []
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
exc.append(Exception('test exception'))
raise exc[-1]
cs.swift.get_object = fake_get_object
# Fail due to completely unexpected exception
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 70}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test exception')
exc = []
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
exc.append(ClientException('test client exception'))
raise exc[-1]
cs.swift.get_object = fake_get_object
# Fail due to all direct_get_object calls failing
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 80}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test client exception')
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
return (200, {'other-header': 'other header value',
'x-timestamp': timestamp.internal,
'etag': '"etagvalue"'},
def fake_put_object(*args, **kwargs):
raise ClientException('test client exception', http_status=401)
cs.swift.get_object = fake_get_object
sync.put_object = fake_put_object
# Fail due to 401
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 90}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Unauth')
def fake_put_object(*args, **kwargs):
raise ClientException('test client exception', http_status=404)
sync.put_object = fake_put_object
# Fail due to 404
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 50}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Not found', 1)
def fake_put_object(*args, **kwargs):
raise ClientException('test client exception', http_status=503)
sync.put_object = fake_put_object
# Fail due to 503
{'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'size': 50}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('error', 'ERROR Syncing')
# Test the following cases:
# remote has the same date and a put doesn't take place
# remote has more up to date copy and a put doesn't take place
# head_object returns ClientException(404) and a put takes place
# head_object returns other ClientException put doesn't take place
# and we get failure
# head_object returns other Exception put does not take place
# and we get failure
# remote returns old copy and a put takes place
test_row = {'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'etag': '1111',
'size': 10}
test_info = {'account': 'a',
'container': 'c',
'storage_policy_index': 0}
actual_puts = []
def fake_put_object(*args, **kwargs):
actual_puts.append((args, kwargs))
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.2'}, '')
sync.put_object = fake_put_object
sync.head_object = fake_head_object
cs._object_in_remote_container = orig_object_in_rcontainer
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.3'}, '')
sync.head_object = fake_head_object
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
actual_puts = []
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=404)
sync.head_object = fake_head_object
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 1)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=401)
sync.head_object = fake_head_object
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise Exception()
sync.head_object = fake_head_object
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.1'}, '')
sync.head_object = fake_head_object
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 2)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
sync.uuid = orig_uuid
sync.put_object = orig_put_object
sync.head_object = orig_head_object
def test_select_http_proxy_None(self):
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync(
{'sync_proxy': ''}, container_ring=FakeRing())
self.assertEqual(cs.select_http_proxy(), None)
def test_select_http_proxy_one(self):
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync(
{'sync_proxy': 'http://one'}, container_ring=FakeRing())
self.assertEqual(cs.select_http_proxy(), 'http://one')
def test_select_http_proxy_multiple(self):
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync(
{'sync_proxy': 'http://one,http://two,http://three'},
set(['http://one', 'http://two', 'http://three']))
if __name__ == '__main__':