Container-Sync to perform HEAD before PUT object on remote

This change adds a remote HEAD object request before each call to
sync_row.
Currently, container-sync-row attempts to replicate the object
(using PUT) regardless of the existance of the object on the remote side,
thus causing each object to be transferred on the wire several times
(depending on the replication factor)

An alternative to HEAD is to do a conditional PUT (using, 100-continue).
However, this change is more involved and requires upgrade of both the
client and server side clusters to work.
In the Tokyo design summit it was decided to start with the HEAD approach.

Change-Id: I60d982dd2cc79a0f13b0924507cd03d7f9c9d70b
Closes-Bug: #1277223
This commit is contained in:
OSHRITF 2016-01-20 15:55:30 +02:00 committed by Alistair Coles
parent d9a4f18b49
commit 125d18e0ff
5 changed files with 326 additions and 90 deletions

View File

@ -769,6 +769,7 @@ class SimpleClient(object):
req.get_method = lambda: method
conn = urllib2.urlopen(req, timeout=timeout)
body = conn.read()
info = conn.info()
try:
body_data = json.loads(body)
except ValueError:
@ -792,13 +793,13 @@ class SimpleClient(object):
url,
conn.getcode(),
sent_content_length,
conn.info()['content-length'],
info['content-length'],
trans_start,
trans_stop,
trans_stop - trans_start,
additional_info
)))
return [None, body_data]
return [info, body_data]
def retry_request(self, method, **kwargs):
retries = kwargs.pop('retries', self.retries)
@ -837,6 +838,12 @@ class SimpleClient(object):
contents=contents.read(), **kwargs)
def head_object(url, **kwargs):
"""For usage with container sync """
client = SimpleClient(url=url)
return client.retry_request('HEAD', **kwargs)
def put_object(url, **kwargs):
"""For usage with container sync """
client = SimpleClient(url=url)

View File

@ -29,7 +29,8 @@ from swift.container.backend import ContainerBroker
from swift.container.sync_store import ContainerSyncStore
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.internal_client import (
delete_object, put_object, InternalClient, UnexpectedResponse)
delete_object, put_object, head_object,
InternalClient, UnexpectedResponse)
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
@ -396,10 +397,84 @@ class ContainerSync(Daemon):
self.logger.exception(_('ERROR Syncing %s'),
broker if broker else path)
def _update_sync_to_headers(self, name, sync_to, user_key,
realm, realm_key, method, headers):
"""
Updates container sync headers
:param name: The name of the object
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:param method: HTTP method to create sig with
:param headers: headers to update with container sync headers
"""
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(name)
sig = self.realms_conf.get_sig(method, path,
headers.get('x-timestamp', 0),
nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (realm,
nonce,
sig)
else:
headers['x-container-sync-key'] = user_key
def _object_in_remote_container(self, name, sync_to, user_key,
realm, realm_key, timestamp):
"""
Performs head object on remote to eliminate extra remote put and
local get object calls
:param name: The name of the object in the updated row in the local
database triggering the sync update.
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:param timestamp: last modified date of local object
:returns: True if object already exists in remote
"""
headers = {'x-timestamp': timestamp.internal}
self._update_sync_to_headers(name, sync_to, user_key, realm,
realm_key, 'HEAD', headers)
try:
metadata, _ = head_object(sync_to, name=name,
headers=headers,
proxy=self.select_http_proxy(),
logger=self.logger,
retries=0)
remote_ts = Timestamp(metadata.get('x-timestamp', 0))
self.logger.debug("remote obj timestamp %s local obj %s" %
(timestamp.internal, remote_ts.internal))
if timestamp <= remote_ts:
return True
# Object in remote should be updated
return False
except ClientException as http_err:
# Object not in remote
if http_err.http_status == 404:
return False
raise http_err
def container_sync_row(self, row, sync_to, user_key, broker, info,
realm, realm_key):
"""
Sends the update the row indicates to the sync_to container.
Update can be either delete or put.
:param row: The updated row in the local database triggering the sync
update.
@ -427,17 +502,9 @@ class ContainerSync(Daemon):
# timestamp of the source tombstone
try:
headers = {'x-timestamp': ts_data.internal}
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(
row['name'])
sig = self.realms_conf.get_sig(
'DELETE', path, headers['x-timestamp'], nonce,
realm_key, user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
self._update_sync_to_headers(row['name'], sync_to,
user_key, realm, realm_key,
'DELETE', headers)
delete_object(sync_to, name=row['name'], headers=headers,
proxy=self.select_http_proxy(),
logger=self.logger,
@ -451,6 +518,10 @@ class ContainerSync(Daemon):
else:
# when sync'ing a live object, use ts_meta - this is the time
# at which the source object was last modified by a PUT or POST
if self._object_in_remote_container(row['name'],
sync_to, user_key, realm,
realm_key, ts_meta):
return True
exc = None
# look up for the newest one
headers_out = {'X-Newest': True,
@ -485,16 +556,8 @@ class ContainerSync(Daemon):
if 'content-type' in headers:
headers['content-type'] = clean_content_type(
headers['content-type'])
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(row['name'])
sig = self.realms_conf.get_sig(
'PUT', path, headers['x-timestamp'], nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
self._update_sync_to_headers(row['name'], sync_to, user_key,
realm, realm_key, 'PUT', headers)
put_object(sync_to, name=row['name'], headers=headers,
contents=FileLikeIter(body),
proxy=self.select_http_proxy(), logger=self.logger,

View File

@ -266,6 +266,26 @@ class TestContainerSync(ReplProbeTest):
% item) for item in mismatched_headers])
self.fail(msg)
def test_sync_newer_remote(self):
source_container, dest_container = self._setup_synced_containers()
# upload to source
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, source_container, object_name,
'old-source-body')
# upload to dest with same name
client.put_object(self.url, self.token, dest_container, object_name,
'new-test-body')
# cycle container-sync
Manager(['container-sync']).once()
# verify that the remote object did not change
resp_headers, body = client.get_object(self.url, self.token,
dest_container, object_name)
self.assertEqual(body, 'new-test-body')
if __name__ == "__main__":
unittest.main()

View File

@ -343,6 +343,9 @@ class TestInternalClient(unittest.TestCase):
def read(self):
return json.dumps(body)
def info(self):
return {}
for timeout in (0.0, 42.0, None):
mocked_func = 'swift.common.internal_client.urllib2.urlopen'
with mock.patch(mocked_func) as mock_urlopen:
@ -1181,76 +1184,84 @@ class TestGetAuth(unittest.TestCase):
'http://127.0.0.1', 'user', 'key', auth_version=2.0)
mock_time_value = 1401224049.98
def mock_time():
global mock_time_value
mock_time_value += 1
return mock_time_value
class TestSimpleClient(unittest.TestCase):
def _test_get_head(self, request, urlopen, method):
mock_time_value = [1401224049.98]
def mock_time():
# global mock_time_value
mock_time_value[0] += 1
return mock_time_value[0]
with mock.patch('swift.common.internal_client.time', mock_time):
# basic request, only url as kwarg
request.return_value.get_type.return_value = "http"
urlopen.return_value.read.return_value = ''
urlopen.return_value.getcode.return_value = 200
urlopen.return_value.info.return_value = {'content-length': '345'}
sc = internal_client.SimpleClient(url='http://127.0.0.1')
logger = FakeLogger()
retval = sc.retry_request(
method, headers={'content-length': '123'}, logger=logger)
self.assertEqual(urlopen.call_count, 1)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'content-length': '123'},
data=None)
self.assertEqual([{'content-length': '345'}, None], retval)
self.assertEqual(method, request.return_value.get_method())
self.assertEqual(logger.log_dict['debug'], [(
('-> 2014-05-27T20:54:11 ' + method +
' http://127.0.0.1%3Fformat%3Djson 200 '
'123 345 1401224050.98 1401224051.98 1.0 -',), {})])
# Check if JSON is decoded
urlopen.return_value.read.return_value = '{}'
retval = sc.retry_request(method)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with token
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request(method)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with prefix
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request(method, prefix="pre_")
request.assert_called_with(
'http://127.0.0.1?format=json&prefix=pre_',
headers={'X-Auth-Token': 'token'}, data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with container name
retval = sc.retry_request(method, container='cont')
request.assert_called_with('http://127.0.0.1/cont?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with object name
retval = sc.retry_request(method, container='cont', name='obj')
request.assert_called_with('http://127.0.0.1/cont/obj',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
@mock.patch('swift.common.internal_client.time', mock_time)
def test_get(self, request, urlopen):
# basic GET request, only url as kwarg
request.return_value.get_type.return_value = "http"
urlopen.return_value.read.return_value = ''
urlopen.return_value.getcode.return_value = 200
urlopen.return_value.info.return_value = {'content-length': '345'}
sc = internal_client.SimpleClient(url='http://127.0.0.1')
logger = FakeLogger()
retval = sc.retry_request(
'GET', headers={'content-length': '123'}, logger=logger)
self.assertEqual(urlopen.call_count, 1)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'content-length': '123'},
data=None)
self.assertEqual([None, None], retval)
self.assertEqual('GET', request.return_value.get_method())
self.assertEqual(logger.log_dict['debug'], [(
('-> 2014-05-27T20:54:11 GET http://127.0.0.1%3Fformat%3Djson 200 '
'123 345 1401224050.98 1401224051.98 1.0 -',), {})])
self._test_get_head(request, urlopen, 'GET')
# Check if JSON is decoded
urlopen.return_value.read.return_value = '{}'
retval = sc.retry_request('GET')
self.assertEqual([None, {}], retval)
# same as above, now with token
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request('GET')
request.assert_called_with('http://127.0.0.1?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with prefix
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request('GET', prefix="pre_")
request.assert_called_with('http://127.0.0.1?format=json&prefix=pre_',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with container name
retval = sc.retry_request('GET', container='cont')
request.assert_called_with('http://127.0.0.1/cont?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with object name
retval = sc.retry_request('GET', container='cont', name='obj')
request.assert_called_with('http://127.0.0.1/cont/obj',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
def test_head(self, request, urlopen):
self._test_get_head(request, urlopen, 'HEAD')
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
@ -1272,6 +1283,7 @@ class TestSimpleClient(unittest.TestCase):
request.return_value.get_type.return_value = "http"
mock_resp = mock.MagicMock()
mock_resp.read.return_value = ''
mock_resp.info.return_value = {}
urlopen.side_effect = [urllib2.URLError(''), mock_resp]
sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1,
token='token')
@ -1283,13 +1295,14 @@ class TestSimpleClient(unittest.TestCase):
self.assertEqual(urlopen.call_count, 2)
request.assert_called_with('http://127.0.0.1?format=json', data=None,
headers={'X-Auth-Token': 'token'})
self.assertEqual([None, None], retval)
self.assertEqual([{}, None], retval)
self.assertEqual(sc.attempts, 2)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_get_with_retries_param(self, mock_urlopen):
mock_response = mock.MagicMock()
mock_response.read.return_value = ''
mock_response.info.return_value = {}
mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('')
c = internal_client.SimpleClient(url='http://127.0.0.1', token='token')
self.assertEqual(c.retries, 5)
@ -1315,7 +1328,7 @@ class TestSimpleClient(unittest.TestCase):
retval = c.retry_request('GET', retries=1)
self.assertEqual(mock_sleep.call_count, 1)
self.assertEqual(mock_urlopen.call_count, 2)
self.assertEqual([None, None], retval)
self.assertEqual([{}, None], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_request_with_retries_with_HTTPError(self, mock_urlopen):
@ -1380,9 +1393,13 @@ class TestSimpleClient(unittest.TestCase):
url = 'https://127.0.0.1:1/a'
class FakeConn(object):
def read(self):
return 'irrelevant'
def info(self):
return {}
mocked = 'swift.common.internal_client.urllib2.urlopen'
# module level methods

View File

@ -855,6 +855,8 @@ class TestContainerSync(unittest.TestCase):
def _test_container_sync_row_put(self, realm, realm_key):
orig_uuid = sync.uuid
orig_put_object = sync.put_object
orig_head_object = sync.head_object
try:
class FakeUUID(object):
class uuid4(object):
@ -891,6 +893,7 @@ class TestContainerSync(unittest.TestCase):
sync.put_object = fake_put_object
expected_put_count = 0
excepted_failure_count = 0
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=FakeRing(),
@ -911,6 +914,14 @@ class TestContainerSync(unittest.TestCase):
# Success as everything says it worked.
# simulate a row with data at 1.1 and later ctype, meta times
created_at = ts_data.internal + '+1388+1388' # last modified = 1.2
def fake_object_in_rcontainer(row, sync_to, user_key,
broker, realm, realm_key):
return False
orig_object_in_rcontainer = cs._object_in_remote_container
cs._object_in_remote_container = fake_object_in_rcontainer
self.assertTrue(cs.container_sync_row(
{'deleted': False,
'name': 'object',
@ -935,6 +946,7 @@ class TestContainerSync(unittest.TestCase):
iter('contents'))
cs.swift.get_object = fake_get_object
# Success as everything says it worked, also checks 'date' and
# 'last-modified' headers are removed and that 'etag' header is
# stripped of double quotes.
@ -980,6 +992,7 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test exception')
@ -1003,6 +1016,7 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test client exception')
@ -1029,6 +1043,8 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Unauth')
def fake_put_object(*args, **kwargs):
@ -1044,6 +1060,8 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Not found', 1)
def fake_put_object(*args, **kwargs):
@ -1059,10 +1077,121 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('error', 'ERROR Syncing')
# Test the following cases:
# remote has the same date and a put doesn't take place
# remote has more up to date copy and a put doesn't take place
# head_object returns ClientException(404) and a put takes place
# head_object returns other ClientException put doesn't take place
# and we get failure
# head_object returns other Exception put does not take place
# and we get failure
# remote returns old copy and a put takes place
test_row = {'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'etag': '1111'}
test_info = {'account': 'a',
'container': 'c',
'storage_policy_index': 0}
actual_puts = []
def fake_put_object(*args, **kwargs):
actual_puts.append((args, kwargs))
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.2'}, '')
sync.put_object = fake_put_object
sync.head_object = fake_head_object
cs._object_in_remote_container = orig_object_in_rcontainer
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info,
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.3'}, '')
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info,
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
actual_puts = []
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=404)
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 1)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=401)
sync.head_object = fake_head_object
self.assertFalse(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise Exception()
sync.head_object = fake_head_object
self.assertFalse(cs.container_sync_row(
test_row,
'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.1'}, '')
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 2)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
finally:
sync.uuid = orig_uuid
sync.put_object = orig_put_object
sync.head_object = orig_head_object
def test_select_http_proxy_None(self):