Merge "Fix time-shifting of objects PUT with container-sync"
This commit is contained in:
commit
5fc3ad7b53
@ -170,7 +170,7 @@ class ContainerSync(Daemon):
|
||||
#: running wild on near empty systems.
|
||||
self.interval = int(conf.get('interval', 300))
|
||||
#: Maximum amount of time to spend syncing a container before moving on
|
||||
#: to the next one. If a conatiner sync hasn't finished in this time,
|
||||
#: to the next one. If a container sync hasn't finished in this time,
|
||||
#: it'll just be resumed next scan.
|
||||
self.container_time = int(conf.get('container_time', 60))
|
||||
#: ContainerSyncCluster instance for validating sync-to values.
|
||||
@ -463,27 +463,22 @@ class ContainerSync(Daemon):
|
||||
shuffle(nodes)
|
||||
exc = None
|
||||
looking_for_timestamp = Timestamp(row['created_at'])
|
||||
timestamp = -1
|
||||
headers = body = None
|
||||
# look up for the newest one
|
||||
headers_out = {'X-Newest': True,
|
||||
'X-Backend-Storage-Policy-Index':
|
||||
str(info['storage_policy_index'])}
|
||||
try:
|
||||
source_obj_status, source_obj_info, source_obj_iter = \
|
||||
source_obj_status, headers, body = \
|
||||
self.swift.get_object(info['account'],
|
||||
info['container'], row['name'],
|
||||
headers=headers_out,
|
||||
acceptable_statuses=(2, 4))
|
||||
|
||||
except (Exception, UnexpectedResponse, Timeout) as err:
|
||||
source_obj_info = {}
|
||||
source_obj_iter = None
|
||||
headers = {}
|
||||
body = None
|
||||
exc = err
|
||||
timestamp = Timestamp(source_obj_info.get(
|
||||
'x-timestamp', 0))
|
||||
headers = source_obj_info
|
||||
body = source_obj_iter
|
||||
timestamp = Timestamp(headers.get('x-timestamp', 0))
|
||||
if timestamp < looking_for_timestamp:
|
||||
if exc:
|
||||
raise exc
|
||||
@ -501,7 +496,6 @@ class ContainerSync(Daemon):
|
||||
if 'content-type' in headers:
|
||||
headers['content-type'] = clean_content_type(
|
||||
headers['content-type'])
|
||||
headers['x-timestamp'] = row['created_at']
|
||||
if realm and realm_key:
|
||||
nonce = uuid.uuid4().hex
|
||||
path = urlparse(sync_to).path + '/' + quote(row['name'])
|
||||
|
@ -22,6 +22,7 @@ from swiftclient import client, ClientException
|
||||
|
||||
from swift.common.http import HTTP_NOT_FOUND
|
||||
from swift.common.manager import Manager
|
||||
from test.probe.brain import BrainSplitter
|
||||
from test.probe.common import ReplProbeTest, ENABLED_POLICIES
|
||||
|
||||
|
||||
@ -149,5 +150,70 @@ class TestContainerSync(ReplProbeTest):
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
def test_sync_with_stale_container_rows(self):
|
||||
source_container, dest_container = self._setup_synced_containers()
|
||||
brain = BrainSplitter(self.url, self.token, source_container,
|
||||
None, 'container')
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'test-body')
|
||||
|
||||
# check source container listing
|
||||
_, listing = client.get_container(
|
||||
self.url, self.token, source_container)
|
||||
for expected_obj_dict in listing:
|
||||
if expected_obj_dict['name'] == object_name:
|
||||
break
|
||||
else:
|
||||
self.fail('Failed to find source object %r in container listing %r'
|
||||
% (object_name, listing))
|
||||
|
||||
# stop all container servers
|
||||
brain.stop_primary_half()
|
||||
brain.stop_handoff_half()
|
||||
|
||||
# upload new object content to source - container updates will fail
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'new-test-body')
|
||||
source_headers = client.head_object(
|
||||
self.url, self.token, source_container, object_name)
|
||||
|
||||
# start all container servers
|
||||
brain.start_primary_half()
|
||||
brain.start_handoff_half()
|
||||
|
||||
# sanity check: source container listing should not have changed
|
||||
_, listing = client.get_container(
|
||||
self.url, self.token, source_container)
|
||||
for actual_obj_dict in listing:
|
||||
if actual_obj_dict['name'] == object_name:
|
||||
self.assertDictEqual(expected_obj_dict, actual_obj_dict)
|
||||
break
|
||||
else:
|
||||
self.fail('Failed to find source object %r in container listing %r'
|
||||
% (object_name, listing))
|
||||
|
||||
# cycle container-sync - object should be correctly sync'd despite
|
||||
# stale info in container row
|
||||
Manager(['container-sync']).once()
|
||||
|
||||
# verify sync'd object has same content and headers
|
||||
dest_headers, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'new-test-body')
|
||||
mismatched_headers = []
|
||||
for k in ('etag', 'content-length', 'content-type', 'x-timestamp',
|
||||
'last-modified'):
|
||||
if source_headers[k] == dest_headers[k]:
|
||||
continue
|
||||
mismatched_headers.append((k, source_headers[k], dest_headers[k]))
|
||||
if mismatched_headers:
|
||||
msg = '\n'.join([('Mismatched header %r, expected %r but got %r'
|
||||
% item) for item in mismatched_headers])
|
||||
self.fail(msg)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -885,6 +885,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
self.assertEqual(logger, self.logger)
|
||||
|
||||
sync.put_object = fake_put_object
|
||||
expected_put_count = 0
|
||||
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
@ -909,7 +910,8 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 1)
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
|
||||
self.assertEqual(headers['X-Newest'], True)
|
||||
@ -935,7 +937,21 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
# Success as everything says it worked, also check that PUT
|
||||
# timestamp equals GET timestamp when it is newer than created_at
|
||||
# value.
|
||||
self.assertTrue(cs.container_sync_row(
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.1'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
exc = []
|
||||
|
||||
@ -955,7 +971,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertEqual(len(exc), 1)
|
||||
self.assertEqual(str(exc[-1]), 'test exception')
|
||||
|
||||
@ -978,7 +994,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertEqual(len(exc), 1)
|
||||
self.assertEqual(str(exc[-1]), 'test client exception')
|
||||
|
||||
@ -1003,7 +1019,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('info', 'Unauth')
|
||||
|
||||
def fake_put_object(*args, **kwargs):
|
||||
@ -1018,7 +1034,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('info', 'Not found', 1)
|
||||
|
||||
def fake_put_object(*args, **kwargs):
|
||||
@ -1033,7 +1049,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('error', 'ERROR Syncing')
|
||||
finally:
|
||||
sync.uuid = orig_uuid
|
||||
|
Loading…
x
Reference in New Issue
Block a user