Merge "Make container sync copy SLO manifests"
commit 4b4ef8d15d
@@ -14,13 +14,24 @@ synchronization key.
 
 .. note::
 
-    If you are using the large objects feature and syncing to another cluster
-    then you will need to ensure that manifest files and segment files are
-    synced. If segment files are in a different container than their manifest
-    then both the manifest's container and the segments' container must be
-    synced. The target container for synced segment files must always have the
-    same name as their source container in order for them to be resolved by
-    synced manifests.
+    If you are using the :ref:`Large Objects <large-objects>` feature and
+    syncing to another cluster then you will need to ensure that manifest files
+    and segment files are synced. If segment files are in a different container
+    than their manifest then both the manifest's container and the segments'
+    container must be synced. The target container for synced segment files
+    must always have the same name as their source container in order for them
+    to be resolved by synced manifests.
+
+    Be aware that manifest files may be synced before segment files even if
+    they are in the same container and were created after the segment files.
+
+    In the case of :ref:`Static Large Objects <static-large-objects>`, a GET
+    request for a manifest whose segments have yet to be completely synced will
+    fail with none or only part of the large object content being returned.
+
+    In the case of :ref:`Dynamic Large Objects <dynamic-large-objects>`, a GET
+    request for a manifest whose segments have yet to be completely synced will
+    either fail or return unexpected (and most likely incorrect) content.
 
 .. note::
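The container layout that the updated note describes could be prepared roughly as follows. This is only a sketch using python-swiftclient's put_container; the realm, cluster, account, key, URL and container names are placeholders rather than values taken from this change, and the matching destination containers are assumed to already exist with the same sync key:

    # Sketch only: sync an SLO manifest container and its segments container
    # to another cluster.  All names and URLs below are illustrative.
    from swiftclient import client

    REALM, CLUSTER = 'realm1', 'clusterb'   # hypothetical sync realm/cluster
    DEST_ACCOUNT = 'AUTH_dest'              # hypothetical remote account
    SYNC_KEY = 'secret'

    def sync_container(url, token, name):
        # Point the local container at the remote container of the same name
        # and share the sync key.  The remote container must already exist
        # with the same X-Container-Sync-Key set.
        sync_to = '//%s/%s/%s/%s' % (REALM, CLUSTER, DEST_ACCOUNT, name)
        client.put_container(url, token, name,
                             headers={'X-Container-Sync-To': sync_to,
                                      'X-Container-Sync-Key': SYNC_KEY})

    # The manifest container may have any name, but the segments container
    # must keep the same name on both clusters so that synced manifests can
    # still resolve their segment paths.
    sync_container('http://src.example.com/v1/AUTH_src', 'src-token',
                   'slo-manifests')
    sync_container('http://src.example.com/v1/AUTH_src', 'src-token',
                   'slo-segments')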
@@ -130,6 +130,11 @@ class ContainerSync(object):
                 raise exc
             else:
                 req.environ['swift.authorize_override'] = True
+                # An SLO manifest will already be in the internal manifest
+                # syntax and might be synced before its segments, so stop SLO
+                # middleware from performing the usual manifest validation.
+                req.environ['swift.slo_override'] = True
+
         if req.path == '/info':
             # Ensure /info requests get the freshest results
             self.register_info()
@@ -1041,6 +1041,9 @@ class StaticLargeObject(object):
         """
         WSGI entry point
         """
+        if env.get('swift.slo_override'):
+            return self.app(env, start_response)
+
         req = Request(env)
         try:
             vrs, account, container, obj = req.split_path(4, 4, True)
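Taken together, the two hunks above are the producer and consumer of a single WSGI environ flag: container-sync marks requests it has already authorized, and the SLO middleware steps aside when it sees the mark. A minimal, self-contained sketch of that pattern is below; the middleware class names are invented for illustration, and only the 'swift.slo_override' key mirrors the change:

    # Illustration only: two toy WSGI middlewares communicating through the
    # request environ, the same way container-sync signals the SLO
    # middleware above.
    def storage_app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return [b'stored as-is\n']

    class ManifestValidator(object):
        """Stands in for the SLO middleware's PUT-time manifest validation."""

        def __init__(self, app):
            self.app = app

        def __call__(self, environ, start_response):
            if environ.get('swift.slo_override'):
                # An upstream middleware vouched for this request; pass the
                # body through untouched instead of validating it.
                return self.app(environ, start_response)
            # ... normal manifest validation would happen here ...
            return self.app(environ, start_response)

    class SyncAuthorizer(object):
        """Stands in for container-sync: flags requests it has verified."""

        def __init__(self, app):
            self.app = app

        def __call__(self, environ, start_response):
            environ['swift.slo_override'] = True
            return self.app(environ, start_response)

    # Pipeline order mirrors Swift's: container-sync runs before SLO.
    pipeline = SyncAuthorizer(ManifestValidator(storage_app))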
@@ -361,6 +361,16 @@ class ProbeTest(unittest.TestCase):
         self.ipport2server[proxy_ipport] = 'proxy'
         self.url, self.token, self.account = check_server(
             proxy_ipport, self.ipport2server)
+        self.account_1 = {
+            'url': self.url, 'token': self.token, 'account': self.account}
+
+        url2, token2 = get_auth(
+            'http://%s:%d/auth/v1.0' % proxy_ipport,
+            'test2:tester2', 'testing2')
+        self.account_2 = {
+            'url': url2, 'token': token2, 'account': url2.split('/')[-1]}
+        head_account(url2, token2)  # sanity check
+
         self.replicators = Manager(
             ['account-replicator', 'container-replicator',
              'object-replicator'])
@@ -11,7 +11,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import uuid
 import random
 from nose import SkipTest
@@ -51,35 +51,47 @@ class TestContainerSync(ReplProbeTest):
         super(TestContainerSync, self).setUp()
         self.realm, self.cluster = get_current_realm_cluster(self.url)
 
-    def _setup_synced_containers(self, skey='secret', dkey='secret'):
+    def _setup_synced_containers(
+            self, source_overrides=None, dest_overrides=None):
+        # these defaults are used to create both source and dest containers
+        # unless overridden by source_overrides and/or dest_overrides
+        default_params = {'url': self.url,
+                          'token': self.token,
+                          'account': self.account,
+                          'sync_key': 'secret'}
+
         # setup dest container
-        dest_container = 'dest-container-%s' % uuid.uuid4()
+        dest = dict(default_params)
+        dest['name'] = 'dest-container-%s' % uuid.uuid4()
+        dest.update(dest_overrides or {})
         dest_headers = {}
         dest_policy = None
         if len(ENABLED_POLICIES) > 1:
             dest_policy = random.choice(ENABLED_POLICIES)
             dest_headers['X-Storage-Policy'] = dest_policy.name
-        if dkey is not None:
-            dest_headers['X-Container-Sync-Key'] = dkey
-        client.put_container(self.url, self.token, dest_container,
+        if dest['sync_key'] is not None:
+            dest_headers['X-Container-Sync-Key'] = dest['sync_key']
+        client.put_container(dest['url'], dest['token'], dest['name'],
                              headers=dest_headers)
 
         # setup source container
-        source_container = 'source-container-%s' % uuid.uuid4()
+        source = dict(default_params)
+        source['name'] = 'source-container-%s' % uuid.uuid4()
+        source.update(source_overrides or {})
         source_headers = {}
-        sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
-                                     dest_container)
+        sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, dest['account'],
+                                     dest['name'])
         source_headers['X-Container-Sync-To'] = sync_to
-        if skey is not None:
-            source_headers['X-Container-Sync-Key'] = skey
+        if source['sync_key'] is not None:
+            source_headers['X-Container-Sync-Key'] = source['sync_key']
         if dest_policy:
             source_policy = random.choice([p for p in ENABLED_POLICIES
                                            if p is not dest_policy])
             source_headers['X-Storage-Policy'] = source_policy.name
-        client.put_container(self.url, self.token, source_container,
+        client.put_container(source['url'], source['token'], source['name'],
                              headers=source_headers)
 
-        return source_container, dest_container
+        return source['name'], dest['name']
 
     def _test_sync(self, object_post_as_copy):
         source_container, dest_container = self._setup_synced_containers()
@@ -148,10 +160,97 @@ class TestContainerSync(ReplProbeTest):
     def test_sync_with_fast_post(self):
         self._test_sync(False)
 
+    def test_sync_slo_manifest(self):
+        # Verify that SLO manifests are sync'd even if their segments can not
+        # be found in the destination account at time of sync'ing.
+        # Create source and dest containers for manifest in separate accounts.
+        dest_account = self.account_2
+        source_container, dest_container = self._setup_synced_containers(
+            dest_overrides=dest_account
+        )
+
+        # Create source and dest containers for segments in separate accounts.
+        # These containers must have same name for the destination SLO manifest
+        # to be able to resolve segments. Initially the destination has no sync
+        # key so segments will not sync.
+        segs_container = 'segments-%s' % uuid.uuid4()
+        dest_segs_info = dict(dest_account)
+        dest_segs_info.update({'name': segs_container, 'sync_key': None})
+        self._setup_synced_containers(
+            source_overrides={'name': segs_container, 'sync_key': 'segs_key'},
+            dest_overrides=dest_segs_info)
+
+        # upload a segment to source
+        segment_name = 'segment-%s' % uuid.uuid4()
+        segment_data = 'segment body'  # it's ok for first segment to be small
+        segment_etag = client.put_object(
+            self.url, self.token, segs_container, segment_name,
+            segment_data)
+
+        manifest = [{'etag': segment_etag,
+                     'size_bytes': len(segment_data),
+                     'path': '/%s/%s' % (segs_container, segment_name)}]
+        manifest_name = 'manifest-%s' % uuid.uuid4()
+        put_headers = {'X-Object-Meta-Test': 'put_value'}
+        client.put_object(
+            self.url, self.token, source_container, manifest_name,
+            json.dumps(manifest), headers=put_headers,
+            query_string='multipart-manifest=put')
+
+        resp_headers, manifest_body = client.get_object(
+            self.url, self.token, source_container, manifest_name,
+            query_string='multipart-manifest=get')
+        int_manifest = json.loads(manifest_body)
+
+        # cycle container-sync
+        Manager(['container-sync']).once()
+
+        # verify manifest was sync'd
+        resp_headers, dest_listing = client.get_container(
+            dest_account['url'], dest_account['token'], dest_container)
+        self.assertFalse(dest_listing[1:])
+        self.assertEqual(manifest_name, dest_listing[0]['name'])
+
+        # verify manifest body
+        resp_headers, body = client.get_object(
+            dest_account['url'], dest_account['token'], dest_container,
+            manifest_name, query_string='multipart-manifest=get')
+        self.assertEqual(int_manifest, json.loads(body))
+        self.assertIn('x-object-meta-test', resp_headers)
+        self.assertEqual('put_value', resp_headers['x-object-meta-test'])
+
+        # attempt to GET the SLO will fail because the segment wasn't sync'd
+        with self.assertRaises(ClientException) as cm:
+            client.get_object(dest_account['url'], dest_account['token'],
+                              dest_container, manifest_name)
+        self.assertEqual(409, cm.exception.http_status)
+
+        # now set sync key on destination segments container
+        client.put_container(
+            dest_account['url'], dest_account['token'], segs_container,
+            headers={'X-Container-Sync-Key': 'segs_key'})
+
+        # cycle container-sync
+        Manager(['container-sync']).once()
+
+        # sanity check - verify manifest body
+        resp_headers, body = client.get_object(
+            dest_account['url'], dest_account['token'], dest_container,
+            manifest_name, query_string='multipart-manifest=get')
+        self.assertEqual(int_manifest, json.loads(body))
+        self.assertIn('x-object-meta-test', resp_headers)
+        self.assertEqual('put_value', resp_headers['x-object-meta-test'])
+
+        # verify GET of SLO manifest now succeeds
+        resp_headers, body = client.get_object(
+            dest_account['url'], dest_account['token'], dest_container,
+            manifest_name)
+        self.assertEqual(segment_data, body)
+
     def test_sync_lazy_skey(self):
         # Create synced containers, but with no key at source
         source_container, dest_container =\
-            self._setup_synced_containers(None, 'secret')
+            self._setup_synced_containers(source_overrides={'sync_key': None})
 
         # upload to source
         object_name = 'object-%s' % uuid.uuid4()
@@ -178,7 +277,7 @@ class TestContainerSync(ReplProbeTest):
     def test_sync_lazy_dkey(self):
         # Create synced containers, but with no key at dest
         source_container, dest_container =\
-            self._setup_synced_containers('secret', None)
+            self._setup_synced_containers(dest_overrides={'sync_key': None})
 
         # upload to source
         object_name = 'object-%s' % uuid.uuid4()
@@ -213,9 +213,9 @@ cluster_dfw1 = http://dfw1.host/v1/
             resp.body,
             'X-Container-Sync-Auth header not valid; contact cluster operator '
             'for support.')
-        self.assertTrue(
-            'cs:invalid-sig' in req.environ.get('swift.log_info'),
-            req.environ.get('swift.log_info'))
+        self.assertIn('cs:invalid-sig', req.environ.get('swift.log_info'))
+        self.assertNotIn('swift.authorize_override', req.environ)
+        self.assertNotIn('swift.slo_override', req.environ)
 
     def test_valid_sig(self):
         ts = '1455221706.726999_0123456789abcdef'
@@ -233,6 +233,8 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertIn('cs:valid', req.environ.get('swift.log_info'))
         self.assertIn('X-Timestamp', resp.headers)
         self.assertEqual(ts, resp.headers['X-Timestamp'])
+        self.assertIn('swift.authorize_override', req.environ)
+        self.assertIn('swift.slo_override', req.environ)
 
     def test_valid_sig2(self):
         sig = self.sync.realms_conf.get_sig(
@@ -245,9 +247,9 @@ cluster_dfw1 = http://dfw1.host/v1/
         resp = req.get_response(self.sync)
         self.assertEqual(resp.status, '200 OK')
         self.assertEqual(resp.body, 'Response to Authorized Request')
-        self.assertTrue(
-            'cs:valid' in req.environ.get('swift.log_info'),
-            req.environ.get('swift.log_info'))
+        self.assertIn('cs:valid', req.environ.get('swift.log_info'))
+        self.assertIn('swift.authorize_override', req.environ)
+        self.assertIn('swift.slo_override', req.environ)
 
     def test_info(self):
         req = swob.Request.blank('/info')
@@ -120,6 +120,25 @@ class TestSloMiddleware(SloTestCase):
         self.assertTrue(
             resp.startswith('X-Static-Large-Object is a reserved header'))
 
+    def test_slo_PUT_env_override(self):
+        path = '/v1/a/c/o'
+        body = 'manifest body not checked when override flag set'
+        resp_status = []
+
+        def start_response(status, headers, *args):
+            resp_status.append(status)
+
+        req = Request.blank(
+            path, headers={'x-static-large-object': "true"},
+            environ={'REQUEST_METHOD': 'PUT', 'swift.slo_override': True},
+            body=body)
+        self.app.register('PUT', path, swob.HTTPCreated, {})
+        resp_iter = self.slo(req.environ, start_response)
+        self.assertEqual('', ''.join(resp_iter))
+        self.assertEqual(self.app.calls, [('PUT', path)])
+        self.assertEqual(body, self.app.uploaded[path][1])
+        self.assertEqual(resp_status[0], '201 Created')
+
     def _put_bogus_slo(self, manifest_text,
                        manifest_path='/v1/a/c/the-manifest'):
         with self.assertRaises(HTTPException) as catcher:
@@ -930,27 +930,30 @@ class TestContainerSync(unittest.TestCase):
         sync.uuid = FakeUUID
         ts_data = Timestamp(1.1)
         timestamp = Timestamp(1.2)
+        put_object_calls = []
 
-        def fake_put_object(sync_to, name=None, headers=None,
-                            contents=None, proxy=None, logger=None,
-                            timeout=None):
+        def fake_put_object(*args, **kwargs):
+            put_object_calls.append((args, kwargs))
+
+        def check_put_object(extra_headers, sync_to, name=None,
+                             headers=None, contents=None, proxy=None,
+                             logger=None, timeout=None):
             self.assertEqual(sync_to, 'http://sync/to/path')
             self.assertEqual(name, 'object')
+            expected_headers = {
+                'x-timestamp': timestamp.internal,
+                'etag': 'etagvalue',
+                'other-header': 'other header value',
+                'content-type': 'text/plain'}
             if realm:
-                self.assertEqual(headers, {
+                expected_headers.update({
                     'x-container-sync-auth':
-                    'US abcdef a5fb3cf950738e6e3b364190e246bd7dd21dad3c',
-                    'x-timestamp': timestamp.internal,
-                    'etag': 'etagvalue',
-                    'other-header': 'other header value',
-                    'content-type': 'text/plain'})
+                        'US abcdef a5fb3cf950738e6e3b364190e246bd7dd21dad3c'})
             else:
-                self.assertEqual(headers, {
-                    'x-container-sync-key': 'key',
-                    'x-timestamp': timestamp.internal,
-                    'other-header': 'other header value',
-                    'etag': 'etagvalue',
-                    'content-type': 'text/plain'})
+                expected_headers.update({
+                    'x-container-sync-key': 'key'})
+            expected_headers.update(extra_headers)
+            self.assertDictEqual(expected_headers, headers)
             self.assertEqual(contents.read(), 'contents')
             self.assertEqual(proxy, 'http://proxy')
             self.assertEqual(timeout, 5.0)
@@ -995,6 +998,9 @@ class TestContainerSync(unittest.TestCase):
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
+        self.assertEqual(1, len(put_object_calls))
+        check_put_object({'etag': 'etagvalue'},
+                         *put_object_calls[0][0], **put_object_calls[0][1])
         expected_put_count += 1
         self.assertEqual(cs.container_puts, expected_put_count)
 
@@ -1016,6 +1022,7 @@ class TestContainerSync(unittest.TestCase):
         # Success as everything says it worked, also checks 'date' and
         # 'last-modified' headers are removed and that 'etag' header is
         # stripped of double quotes.
+        put_object_calls = []
         self.assertTrue(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
@@ -1024,12 +1031,16 @@ class TestContainerSync(unittest.TestCase):
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
+        self.assertEqual(1, len(put_object_calls))
+        check_put_object({'etag': 'etagvalue'},
+                         *put_object_calls[0][0], **put_object_calls[0][1])
         expected_put_count += 1
         self.assertEqual(cs.container_puts, expected_put_count)
 
         # Success as everything says it worked, also check that PUT
         # timestamp equals GET timestamp when it is newer than created_at
         # value.
+        put_object_calls = []
         self.assertTrue(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
@@ -1038,6 +1049,42 @@ class TestContainerSync(unittest.TestCase):
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
+        self.assertEqual(1, len(put_object_calls))
+        check_put_object({'etag': 'etagvalue'},
+                         *put_object_calls[0][0], **put_object_calls[0][1])
         expected_put_count += 1
         self.assertEqual(cs.container_puts, expected_put_count)
+
+        def fake_get_object(acct, con, obj, headers, acceptable_statuses):
+            self.assertEqual(headers['X-Newest'], True)
+            self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
+                             '0')
+            return (200,
+                    {'date': 'date value',
+                     'last-modified': 'last modified value',
+                     'x-timestamp': timestamp.internal,
+                     'other-header': 'other header value',
+                     'etag': '"etagvalue"',
+                     'x-static-large-object': 'true',
+                     'content-type': 'text/plain; swift_bytes=123'},
+                    iter('contents'))
+
+        cs.swift.get_object = fake_get_object
+
+        # Success as everything says it worked, also check that etag
+        # header removed in case of SLO
+        put_object_calls = []
+        self.assertTrue(cs.container_sync_row(
+            {'deleted': False,
+             'name': 'object',
+             'created_at': '1.1',
+             'size': 60}, 'http://sync/to/path',
+            'key', FakeContainerBroker('broker'),
+            {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
+            realm, realm_key))
+        self.assertEqual(1, len(put_object_calls))
+        check_put_object({'x-static-large-object': 'true'},
+                         *put_object_calls[0][0], **put_object_calls[0][1])
+        expected_put_count += 1
+        self.assertEqual(cs.container_puts, expected_put_count)