consync: Now queries all primary nodes for a put and uses the newest object if it is newer or equal to the object to sync

This commit is contained in:
gholt 2011-06-16 01:31:51 +00:00
parent 78d417dda7
commit f9fb4c739c
2 changed files with 22 additions and 14 deletions

View File

@ -344,8 +344,8 @@ class ContainerSync(Daemon):
if row['deleted']:
try:
delete_object(sync_to, name=row['name'],
headers={'X-Timestamp': row['created_at'],
'X-Container-Sync-Key': sync_key},
headers={'x-timestamp': row['created_at'],
'x-container-sync-key': sync_key},
proxy=self.proxy)
except ClientException, err:
if err.http_status != 404:
@ -357,15 +357,22 @@ class ContainerSync(Daemon):
row['name'])
shuffle(nodes)
exc = None
looking_for_timestamp = float(row['created_at'])
timestamp = -1
headers = body = None
for node in nodes:
try:
headers, body = direct_get_object(node, part,
info['account'], info['container'], row['name'],
resp_chunk_size=65536)
break
these_headers, this_body = direct_get_object(node,
part, info['account'], info['container'],
row['name'], resp_chunk_size=65536)
this_timestamp = float(these_headers['x-timestamp'])
if this_timestamp > timestamp:
timestamp = this_timestamp
headers = these_headers
body = this_body
except ClientException, err:
exc = err
else:
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(_('Unknown exception trying to GET: '
@ -379,8 +386,8 @@ class ContainerSync(Daemon):
del headers[key]
if 'etag' in headers:
headers['etag'] = headers['etag'].strip('"')
headers['X-Timestamp'] = row['created_at']
headers['X-Container-Sync-Key'] = sync_key
headers['x-timestamp'] = row['created_at']
headers['x-container-sync-key'] = sync_key
put_object(sync_to, name=row['name'], headers=headers,
contents=_Iter2FileLikeObject(body), proxy=self.proxy)
self.container_puts += 1

View File

@ -610,7 +610,7 @@ class TestContainerSync(unittest.TestCase):
self.assertEquals(path, 'http://sync/to/path')
self.assertEquals(name, 'object')
self.assertEquals(headers,
{'X-Container-Sync-Key': 'key', 'X-Timestamp': '1.2'})
{'x-container-sync-key': 'key', 'x-timestamp': '1.2'})
self.assertEquals(proxy, 'http://proxy')
sync.delete_object = fake_delete_object
@ -678,8 +678,8 @@ class TestContainerSync(unittest.TestCase):
contents=None, proxy=None):
self.assertEquals(sync_to, 'http://sync/to/path')
self.assertEquals(name, 'object')
self.assertEquals(headers, {'X-Container-Sync-Key': 'key',
'X-Timestamp': '1.2',
self.assertEquals(headers, {'x-container-sync-key': 'key',
'x-timestamp': '1.2',
'other-header': 'other header value',
'etag': 'etagvalue'})
self.assertEquals(contents.read(), 'contents')
@ -694,7 +694,7 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
return ({'other-header': 'other header value',
'etag': '"etagvalue"'},
'etag': '"etagvalue"', 'x-timestamp': '1.2'},
iter('contents'))
sync.direct_get_object = fake_direct_get_object
@ -709,6 +709,7 @@ class TestContainerSync(unittest.TestCase):
resp_chunk_size=1):
return ({'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': '1.2',
'other-header': 'other header value',
'etag': '"etagvalue"'},
iter('contents'))
@ -758,7 +759,7 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
return ({'other-header': 'other header value',
'etag': '"etagvalue"'},
'x-timestamp': '1.2', 'etag': '"etagvalue"'},
iter('contents'))
def fake_put_object(sync_to, name=None, headers=None,