diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index c7677c11ed..5ee246e55a 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -225,6 +225,21 @@ def remove_items(headers, condition): return removed +def copy_header_subset(from_r, to_r, condition): + """ + Will copy desired subset of headers from from_r to to_r. + + :param from_r: a swob Request or Response + :param to_r: a swob Request or Response + :param condition: a function that will be passed the header key as a + single argument and should return True if the header + is to be copied. + """ + for k, v in from_r.headers.items(): + if condition(k): + to_r.headers[k] = v + + def close_if_possible(maybe_closable): close_method = getattr(maybe_closable, 'close', None) if callable(close_method): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index be3cde7416..896225a978 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -49,6 +49,7 @@ from eventlet import Timeout from swift import gettext_ as _ from swift.common.constraints import check_mount +from swift.common.request_helpers import is_sys_meta from swift.common.utils import mkdirs, Timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ @@ -1315,7 +1316,8 @@ class DiskFile(object): self._metadata = self._failsafe_read_metadata(meta_file, meta_file) sys_metadata = dict( [(key, val) for key, val in datafile_metadata.iteritems() - if key.lower() in DATAFILE_SYSTEM_META]) + if key.lower() in DATAFILE_SYSTEM_META + or is_sys_meta('object', key)]) self._metadata.update(sys_metadata) else: self._metadata = datafile_metadata diff --git a/swift/obj/server.py b/swift/obj/server.py index 8fe1e7c8fe..21370b7ac9 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -38,7 +38,8 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \ DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout from swift.obj import ssync_receiver from swift.common.http import is_success -from swift.common.request_helpers import get_name_and_placement, is_user_meta +from swift.common.request_helpers import get_name_and_placement, \ + is_user_meta, is_sys_or_user_meta from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ @@ -445,7 +446,7 @@ class ObjectController(object): 'Content-Length': str(upload_size), } metadata.update(val for val in request.headers.iteritems() - if is_user_meta('object', val[0])) + if is_sys_or_user_meta('object', val[0])) for header_key in ( request.headers.get('X-Backend-Replication-Headers') or self.allowed_headers): @@ -503,7 +504,7 @@ class ObjectController(object): response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): - if is_user_meta('object', key) or \ + if is_sys_or_user_meta('object', key) or \ key.lower() in self.allowed_headers: response.headers[key] = value response.etag = metadata['ETag'] @@ -549,7 +550,7 @@ class ObjectController(object): response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): - if is_user_meta('object', key) or \ + if is_sys_or_user_meta('object', key) or \ key.lower() in self.allowed_headers: response.headers[key] = value response.etag = metadata['ETag'] diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 64889a3092..06d9cbf5f4 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -56,7 +56,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, Request, \ HTTPClientDisconnect, HTTPNotImplemented -from swift.common.request_helpers import is_user_meta +from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ + remove_items, copy_header_subset def copy_headers_into(from_r, to_r): @@ -67,7 +68,7 @@ def copy_headers_into(from_r, to_r): """ pass_headers = ['x-delete-at'] for k, v in from_r.headers.items(): - if is_user_meta('object', k) or k.lower() in pass_headers: + if is_sys_or_user_meta('object', k) or k.lower() in pass_headers: to_r.headers[k] = v @@ -624,8 +625,14 @@ class ObjectController(Controller): if not content_type_manually_set: sink_req.headers['Content-Type'] = \ source_resp.headers['Content-Type'] - if not config_true_value( + if config_true_value( sink_req.headers.get('x-fresh-metadata', 'false')): + # post-as-copy: ignore new sysmeta, copy existing sysmeta + condition = lambda k: is_sys_meta('object', k) + remove_items(sink_req.headers, condition) + copy_header_subset(source_resp, sink_req, condition) + else: + # copy/update existing sysmeta and user meta copy_headers_into(source_resp, sink_req) copy_headers_into(req, sink_req) # copy over x-static-large-object for POSTs and manifest copies diff --git a/test/probe/brain.py b/test/probe/brain.py new file mode 100644 index 0000000000..d37b68e94f --- /dev/null +++ b/test/probe/brain.py @@ -0,0 +1,206 @@ +#!/usr/bin/python -u +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import itertools +import uuid +from optparse import OptionParser +from urlparse import urlparse +import random + +from swift.common.manager import Manager +from swift.common import utils, ring +from swift.common.storage_policy import POLICIES +from swift.common.http import HTTP_NOT_FOUND + +from swiftclient import client, get_auth, ClientException + +TIMEOUT = 60 + + +def meta_command(name, bases, attrs): + """ + Look for attrs with a truthy attribute __command__ and add them to an + attribute __commands__ on the type that maps names to decorated methods. + The decorated methods' doc strings also get mapped in __docs__. + + Also adds a method run(command_name, *args, **kwargs) that will + execute the method mapped to the name in __commands__. + """ + commands = {} + docs = {} + for attr, value in attrs.items(): + if getattr(value, '__command__', False): + commands[attr] = value + # methods have always have a __doc__ attribute, sometimes empty + docs[attr] = (getattr(value, '__doc__', None) or + 'perform the %s command' % attr).strip() + attrs['__commands__'] = commands + attrs['__docs__'] = docs + + def run(self, command, *args, **kwargs): + return self.__commands__[command](self, *args, **kwargs) + attrs.setdefault('run', run) + return type(name, bases, attrs) + + +def command(f): + f.__command__ = True + return f + + +class BrainSplitter(object): + + __metaclass__ = meta_command + + def __init__(self, url, token, container_name='test', object_name='test', + server_type='container'): + self.url = url + self.token = token + self.account = utils.split_path(urlparse(url).path, 2, 2)[1] + self.container_name = container_name + self.object_name = object_name + server_list = ['%s-server' % server_type] if server_type else ['all'] + self.servers = Manager(server_list) + policies = list(POLICIES) + random.shuffle(policies) + self.policies = itertools.cycle(policies) + + o = object_name if server_type == 'object' else None + c = container_name if server_type in ('object', 'container') else None + part, nodes = ring.Ring( + '/etc/swift/%s.ring.gz' % server_type).get_nodes( + self.account, c, o) + node_ids = [n['id'] for n in nodes] + if all(n_id in node_ids for n_id in (0, 1)): + self.primary_numbers = (1, 2) + self.handoff_numbers = (3, 4) + else: + self.primary_numbers = (3, 4) + self.handoff_numbers = (1, 2) + + @command + def start_primary_half(self): + """ + start servers 1 & 2 + """ + tuple(self.servers.start(number=n) for n in self.primary_numbers) + + @command + def stop_primary_half(self): + """ + stop servers 1 & 2 + """ + tuple(self.servers.stop(number=n) for n in self.primary_numbers) + + @command + def start_handoff_half(self): + """ + start servers 3 & 4 + """ + tuple(self.servers.start(number=n) for n in self.handoff_numbers) + + @command + def stop_handoff_half(self): + """ + stop servers 3 & 4 + """ + tuple(self.servers.stop(number=n) for n in self.handoff_numbers) + + @command + def put_container(self, policy_index=None): + """ + put container with next storage policy + """ + policy = self.policies.next() + if policy_index is not None: + policy = POLICIES.get_by_index(int(policy_index)) + if not policy: + raise ValueError('Unknown policy with index %s' % policy) + headers = {'X-Storage-Policy': policy.name} + client.put_container(self.url, self.token, self.container_name, + headers=headers) + + @command + def delete_container(self): + """ + delete container + """ + client.delete_container(self.url, self.token, self.container_name) + + @command + def put_object(self, headers=None): + """ + issue put for zero byte test object + """ + client.put_object(self.url, self.token, self.container_name, + self.object_name, headers=headers) + + @command + def delete_object(self): + """ + issue delete for test object + """ + try: + client.delete_object(self.url, self.token, self.container_name, + self.object_name) + except ClientException as err: + if err.http_status != HTTP_NOT_FOUND: + raise + +parser = OptionParser('%prog [options] ' + '[:[,...]] [...]') +parser.usage += '\n\nCommands:\n\t' + \ + '\n\t'.join("%s - %s" % (name, doc) for name, doc in + BrainSplitter.__docs__.items()) +parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(), + help='set container name') +parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(), + help='set object name') +parser.add_option('-s', '--server_type', default='container', + help='set server type') + + +def main(): + options, commands = parser.parse_args() + if not commands: + parser.print_help() + return 'ERROR: must specify at least one command' + for cmd_args in commands: + cmd = cmd_args.split(':', 1)[0] + if cmd not in BrainSplitter.__commands__: + parser.print_help() + return 'ERROR: unknown command %s' % cmd + url, token = get_auth('http://127.0.0.1:8080/auth/v1.0', + 'test:tester', 'testing') + brain = BrainSplitter(url, token, options.container, options.object, + options.server_type) + for cmd_args in commands: + parts = cmd_args.split(':', 1) + command = parts[0] + if len(parts) > 1: + args = utils.list_from_csv(parts[1]) + else: + args = () + try: + brain.run(command, *args) + except ClientException as e: + print '**WARNING**: %s raised %s' % (command, e) + print 'STATUS'.join(['*' * 25] * 2) + brain.servers.status() + sys.exit() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/test/probe/test_container_merge_policy_index.py b/test/probe/test_container_merge_policy_index.py index 795009e1bd..5773ce7d2e 100644 --- a/test/probe/test_container_merge_policy_index.py +++ b/test/probe/test_container_merge_policy_index.py @@ -13,166 +13,26 @@ # limitations under the License. from hashlib import md5 -import sys -import itertools import time import unittest import uuid -from optparse import OptionParser -from urlparse import urlparse import random from nose import SkipTest from swift.common.manager import Manager from swift.common.internal_client import InternalClient -from swift.common import utils, direct_client, ring +from swift.common import utils, direct_client from swift.common.storage_policy import POLICIES from swift.common.http import HTTP_NOT_FOUND +from test.probe.brain import BrainSplitter from test.probe.common import reset_environment, get_to_final_state -from swiftclient import client, get_auth, ClientException +from swiftclient import client, ClientException TIMEOUT = 60 -def meta_command(name, bases, attrs): - """ - Look for attrs with a truthy attribute __command__ and add them to an - attribute __commands__ on the type that maps names to decorated methods. - The decorated methods' doc strings also get mapped in __docs__. - - Also adds a method run(command_name, *args, **kwargs) that will - execute the method mapped to the name in __commands__. - """ - commands = {} - docs = {} - for attr, value in attrs.items(): - if getattr(value, '__command__', False): - commands[attr] = value - # methods have always have a __doc__ attribute, sometimes empty - docs[attr] = (getattr(value, '__doc__', None) or - 'perform the %s command' % attr).strip() - attrs['__commands__'] = commands - attrs['__docs__'] = docs - - def run(self, command, *args, **kwargs): - return self.__commands__[command](self, *args, **kwargs) - attrs.setdefault('run', run) - return type(name, bases, attrs) - - -def command(f): - f.__command__ = True - return f - - -class BrainSplitter(object): - - __metaclass__ = meta_command - - def __init__(self, url, token, container_name='test', object_name='test'): - self.url = url - self.token = token - self.account = utils.split_path(urlparse(url).path, 2, 2)[1] - self.container_name = container_name - self.object_name = object_name - self.servers = Manager(['container-server']) - policies = list(POLICIES) - random.shuffle(policies) - self.policies = itertools.cycle(policies) - - container_part, container_nodes = ring.Ring( - '/etc/swift/container.ring.gz').get_nodes( - self.account, self.container_name) - container_node_ids = [n['id'] for n in container_nodes] - if all(n_id in container_node_ids for n_id in (0, 1)): - self.primary_numbers = (1, 2) - self.handoff_numbers = (3, 4) - else: - self.primary_numbers = (3, 4) - self.handoff_numbers = (1, 2) - - @command - def start_primary_half(self): - """ - start container servers 1 & 2 - """ - tuple(self.servers.start(number=n) for n in self.primary_numbers) - - @command - def stop_primary_half(self): - """ - stop container servers 1 & 2 - """ - tuple(self.servers.stop(number=n) for n in self.primary_numbers) - - @command - def start_handoff_half(self): - """ - start container servers 3 & 4 - """ - tuple(self.servers.start(number=n) for n in self.handoff_numbers) - - @command - def stop_handoff_half(self): - """ - stop container servers 3 & 4 - """ - tuple(self.servers.stop(number=n) for n in self.handoff_numbers) - - @command - def put_container(self, policy_index=None): - """ - put container with next storage policy - """ - policy = self.policies.next() - if policy_index is not None: - policy = POLICIES.get_by_index(int(policy_index)) - if not policy: - raise ValueError('Unknown policy with index %s' % policy) - headers = {'X-Storage-Policy': policy.name} - client.put_container(self.url, self.token, self.container_name, - headers=headers) - - @command - def delete_container(self): - """ - delete container - """ - client.delete_container(self.url, self.token, self.container_name) - - @command - def put_object(self, headers=None): - """ - issue put for zero byte test object - """ - client.put_object(self.url, self.token, self.container_name, - self.object_name, headers=headers) - - @command - def delete_object(self): - """ - issue delete for test object - """ - try: - client.delete_object(self.url, self.token, self.container_name, - self.object_name) - except ClientException as err: - if err.http_status != HTTP_NOT_FOUND: - raise - -parser = OptionParser('%prog split-brain [options] ' - '[:[,...]] [...]') -parser.usage += '\n\nCommands:\n\t' + \ - '\n\t'.join("%s - %s" % (name, doc) for name, doc in - BrainSplitter.__docs__.items()) -parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(), - help='set container name') -parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(), - help='set object name') - - class TestContainerMergePolicyIndex(unittest.TestCase): def setUp(self): @@ -184,7 +44,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase): self.container_name = 'container-%s' % uuid.uuid4() self.object_name = 'object-%s' % uuid.uuid4() self.brain = BrainSplitter(self.url, self.token, self.container_name, - self.object_name) + self.object_name, 'container') def test_merge_storage_policy_index(self): # generic split brain @@ -594,37 +454,5 @@ class TestContainerMergePolicyIndex(unittest.TestCase): self.fail('Found unexpected object %r in the queue' % obj) -def main(): - options, commands = parser.parse_args() - commands.remove('split-brain') - if not commands: - parser.print_help() - return 'ERROR: must specify at least one command' - for cmd_args in commands: - cmd = cmd_args.split(':', 1)[0] - if cmd not in BrainSplitter.__commands__: - parser.print_help() - return 'ERROR: unknown command %s' % cmd - url, token = get_auth('http://127.0.0.1:8080/auth/v1.0', - 'test:tester', 'testing') - brain = BrainSplitter(url, token, options.container, options.object) - for cmd_args in commands: - parts = cmd_args.split(':', 1) - command = parts[0] - if len(parts) > 1: - args = utils.list_from_csv(parts[1]) - else: - args = () - try: - brain.run(command, *args) - except ClientException as e: - print '**WARNING**: %s raised %s' % (command, e) - print 'STATUS'.join(['*' * 25] * 2) - brain.servers.status() - sys.exit() - - if __name__ == "__main__": - if any('split-brain' in arg for arg in sys.argv): - sys.exit(main()) unittest.main() diff --git a/test/probe/test_object_metadata_replication.py b/test/probe/test_object_metadata_replication.py new file mode 100644 index 0000000000..23ed2db193 --- /dev/null +++ b/test/probe/test_object_metadata_replication.py @@ -0,0 +1,186 @@ +#!/usr/bin/python -u +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from io import StringIO +from tempfile import mkdtemp +from textwrap import dedent + +import os +import shutil +import unittest +import uuid +from swift.common import internal_client + +from test.probe.brain import BrainSplitter +from test.probe.common import kill_servers, reset_environment, \ + get_to_final_state + + +class Test(unittest.TestCase): + def setUp(self): + """ + Reset all environment and start all servers. + """ + (self.pids, self.port2server, self.account_ring, self.container_ring, + self.object_ring, self.policy, self.url, self.token, + self.account, self.configs) = reset_environment() + self.container_name = 'container-%s' % uuid.uuid4() + self.object_name = 'object-%s' % uuid.uuid4() + self.brain = BrainSplitter(self.url, self.token, self.container_name, + self.object_name, 'object') + self.tempdir = mkdtemp() + conf_path = os.path.join(self.tempdir, 'internal_client.conf') + conf_body = """ + [DEFAULT] + swift_dir = /etc/swift + + [pipeline:main] + pipeline = catch_errors cache proxy-server + + [app:proxy-server] + use = egg:swift#proxy + object_post_as_copy = false + + [filter:cache] + use = egg:swift#memcache + + [filter:catch_errors] + use = egg:swift#catch_errors + """ + with open(conf_path, 'w') as f: + f.write(dedent(conf_body)) + self.int_client = internal_client.InternalClient(conf_path, 'test', 1) + + def tearDown(self): + """ + Stop all servers. + """ + kill_servers(self.port2server, self.pids) + shutil.rmtree(self.tempdir) + + def _put_object(self, headers=None): + headers = headers or {} + self.int_client.upload_object(StringIO(u'stuff'), self.account, + self.container_name, + self.object_name, headers) + + def _post_object(self, headers): + self.int_client.set_object_metadata(self.account, self.container_name, + self.object_name, headers) + + def _get_object_metadata(self): + return self.int_client.get_object_metadata(self.account, + self.container_name, + self.object_name) + + def test_sysmeta_after_replication_with_subsequent_post(self): + sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'} + usermeta = {'x-object-meta-bar': 'meta-bar'} + self.brain.put_container(policy_index=0) + # put object + self._put_object() + # put newer object with sysmeta to first server subset + self.brain.stop_primary_half() + self._put_object(headers=sysmeta) + metadata = self._get_object_metadata() + for key in sysmeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], sysmeta[key]) + self.brain.start_primary_half() + + # post some user meta to second server subset + self.brain.stop_handoff_half() + self._post_object(usermeta) + metadata = self._get_object_metadata() + for key in usermeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], usermeta[key]) + for key in sysmeta: + self.assertFalse(key in metadata) + self.brain.start_handoff_half() + + # run replicator + get_to_final_state() + + # check user metadata has been replicated to first server subset + # and sysmeta is unchanged + self.brain.stop_primary_half() + metadata = self._get_object_metadata() + expected = dict(sysmeta) + expected.update(usermeta) + for key in expected.keys(): + self.assertTrue(key in metadata, key) + self.assertEqual(metadata[key], expected[key]) + self.brain.start_primary_half() + + # check user metadata and sysmeta both on second server subset + self.brain.stop_handoff_half() + metadata = self._get_object_metadata() + for key in expected.keys(): + self.assertTrue(key in metadata, key) + self.assertEqual(metadata[key], expected[key]) + + def test_sysmeta_after_replication_with_prior_post(self): + sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'} + usermeta = {'x-object-meta-bar': 'meta-bar'} + self.brain.put_container(policy_index=0) + # put object + self._put_object() + + # put user meta to first server subset + self.brain.stop_handoff_half() + self._post_object(headers=usermeta) + metadata = self._get_object_metadata() + for key in usermeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], usermeta[key]) + self.brain.start_handoff_half() + + # put newer object with sysmeta to second server subset + self.brain.stop_primary_half() + self._put_object(headers=sysmeta) + metadata = self._get_object_metadata() + for key in sysmeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], sysmeta[key]) + self.brain.start_primary_half() + + # run replicator + get_to_final_state() + + # check stale user metadata is not replicated to first server subset + # and sysmeta is unchanged + self.brain.stop_primary_half() + metadata = self._get_object_metadata() + for key in sysmeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], sysmeta[key]) + for key in usermeta: + self.assertFalse(key in metadata) + self.brain.start_primary_half() + + # check stale user metadata is removed from second server subset + # and sysmeta is replicated + self.brain.stop_handoff_half() + metadata = self._get_object_metadata() + for key in sysmeta: + self.assertTrue(key in metadata) + self.assertEqual(metadata[key], sysmeta[key]) + for key in usermeta: + self.assertFalse(key in metadata) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index 8bb382db1d..c87a39979b 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -16,9 +16,10 @@ """Tests for swift.common.request_helpers""" import unittest +from swift.common.swob import Request from swift.common.request_helpers import is_sys_meta, is_user_meta, \ is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \ - remove_items + remove_items, copy_header_subset server_types = ['account', 'container', 'object'] @@ -68,3 +69,15 @@ class TestRequestHelpers(unittest.TestCase): rem = remove_items(src, test) self.assertEquals(src, {'c': 'd'}) self.assertEquals(rem, {'a': 'b'}) + + def test_copy_header_subset(self): + src = {'a': 'b', + 'c': 'd'} + from_req = Request.blank('/path', environ={}, headers=src) + to_req = Request.blank('/path', {}) + test = lambda x: x.lower() == 'a' + copy_header_subset(from_req, to_req, test) + self.assertTrue('A' in to_req.headers) + self.assertEqual(to_req.headers['A'], 'b') + self.assertFalse('c' in to_req.headers) + self.assertFalse('C' in to_req.headers) diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 43bd85d55b..b62dbf85cc 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -1080,6 +1080,25 @@ class TestDiskFile(unittest.TestCase): # new fast-post updateable keys are added self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2']) + def test_disk_file_preserves_sysmeta(self): + # build an object with some meta (ts 41) + orig_metadata = {'X-Object-Sysmeta-Key1': 'Value1', + 'Content-Type': 'text/garbage'} + df = self._get_open_disk_file(ts=41, extra_metadata=orig_metadata) + with df.open(): + self.assertEquals('1024', df._metadata['Content-Length']) + # write some new metadata (fast POST, don't send orig meta, ts 42) + df = self._simple_get_diskfile() + df.write_metadata({'X-Timestamp': Timestamp(42).internal, + 'X-Object-Sysmeta-Key1': 'Value2', + 'X-Object-Meta-Key3': 'Value3'}) + df = self._simple_get_diskfile() + with df.open(): + # non-fast-post updateable keys are preserved + self.assertEquals('text/garbage', df._metadata['Content-Type']) + # original sysmeta keys are preserved + self.assertEquals('Value1', df._metadata['X-Object-Sysmeta-Key1']) + def test_disk_file_reader_iter(self): df = self._create_test_file('1234567890') quarantine_msgs = [] diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index d521ff0a92..a43d420abe 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -730,6 +730,181 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 408) + def test_PUT_system_metadata(self): + # check that sysmeta is stored in diskfile + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + req.body = 'VERIFY SYSMETA' + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + objfile = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(0), 'p', + hash_path('a', 'c', 'o')), + timestamp + '.data') + self.assert_(os.path.isfile(objfile)) + self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA') + self.assertEquals(diskfile.read_metadata(objfile), + {'X-Timestamp': timestamp, + 'Content-Length': '14', + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'name': '/a/c/o', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + + def test_POST_system_metadata(self): + # check that diskfile sysmeta is not changed by a POST + timestamp1 = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp1, + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + req.body = 'VERIFY SYSMETA' + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + + timestamp2 = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': timestamp2, + 'X-Object-Meta-1': 'Not One', + 'X-Object-Sysmeta-1': 'Not One', + 'X-Object-Sysmeta-Two': 'Not Two'}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 202) + + # original .data file metadata should be unchanged + objfile = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(0), 'p', + hash_path('a', 'c', 'o')), + timestamp1 + '.data') + self.assert_(os.path.isfile(objfile)) + self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA') + self.assertEquals(diskfile.read_metadata(objfile), + {'X-Timestamp': timestamp1, + 'Content-Length': '14', + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'name': '/a/c/o', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + + # .meta file metadata should have only user meta items + metafile = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(0), 'p', + hash_path('a', 'c', 'o')), + timestamp2 + '.meta') + self.assert_(os.path.isfile(metafile)) + self.assertEquals(diskfile.read_metadata(metafile), + {'X-Timestamp': timestamp2, + 'name': '/a/c/o', + 'X-Object-Meta-1': 'Not One'}) + + def test_PUT_then_fetch_system_metadata(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + req.body = 'VERIFY SYSMETA' + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + + def check_response(resp): + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.content_length, 14) + self.assertEquals(resp.content_type, 'text/plain') + self.assertEquals(resp.headers['content-type'], 'text/plain') + self.assertEquals( + resp.headers['last-modified'], + strftime('%a, %d %b %Y %H:%M:%S GMT', + gmtime(math.ceil(float(timestamp))))) + self.assertEquals(resp.headers['etag'], + '"1000d172764c9dbc3a5798a67ec5bb76"') + self.assertEquals(resp.headers['x-object-meta-1'], 'One') + self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One') + self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two') + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + check_response(resp) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + check_response(resp) + + def test_PUT_then_POST_then_fetch_system_metadata(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'ETag': '1000d172764c9dbc3a5798a67ec5bb76', + 'X-Object-Meta-1': 'One', + 'X-Object-Sysmeta-1': 'One', + 'X-Object-Sysmeta-Two': 'Two'}) + req.body = 'VERIFY SYSMETA' + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + + timestamp2 = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': timestamp2, + 'X-Object-Meta-1': 'Not One', + 'X-Object-Sysmeta-1': 'Not One', + 'X-Object-Sysmeta-Two': 'Not Two'}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 202) + + def check_response(resp): + # user meta should be updated but not sysmeta + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.content_length, 14) + self.assertEquals(resp.content_type, 'text/plain') + self.assertEquals(resp.headers['content-type'], 'text/plain') + self.assertEquals( + resp.headers['last-modified'], + strftime('%a, %d %b %Y %H:%M:%S GMT', + gmtime(math.ceil(float(timestamp2))))) + self.assertEquals(resp.headers['etag'], + '"1000d172764c9dbc3a5798a67ec5bb76"') + self.assertEquals(resp.headers['x-object-meta-1'], 'Not One') + self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One') + self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two') + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + check_response(resp) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.object_controller) + check_response(resp) + def test_PUT_container_connection(self): def mock_http_connect(response, with_exc=False): diff --git a/test/unit/proxy/test_sysmeta.py b/test/unit/proxy/test_sysmeta.py new file mode 100644 index 0000000000..c15b51bc21 --- /dev/null +++ b/test/unit/proxy/test_sysmeta.py @@ -0,0 +1,361 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +import os +from tempfile import mkdtemp +from urllib import quote +from swift.common.storage_policy import StoragePolicy +from swift.common.swob import Request +from swift.common.utils import mkdirs, split_path +from swift.common.wsgi import monkey_patch_mimetools, WSGIContext +from swift.obj import server as object_server +from swift.proxy import server as proxy +import swift.proxy.controllers +from test.unit import FakeMemcache, debug_logger, FakeRing, \ + fake_http_connect, patch_policies + + +class FakeServerConnection(WSGIContext): + '''Fakes an HTTPConnection to a server instance.''' + def __init__(self, app): + super(FakeServerConnection, self).__init__(app) + self.data = '' + + def getheaders(self): + return self._response_headers + + def read(self, amt=None): + try: + result = self.resp_iter.next() + return result + except StopIteration: + return '' + + def getheader(self, name, default=None): + result = self._response_header_value(name) + return result if result else default + + def getresponse(self): + environ = {'REQUEST_METHOD': self.method} + req = Request.blank(self.path, environ, headers=self.req_headers, + body=self.data) + self.resp = self._app_call(req.environ) + self.resp_iter = iter(self.resp) + if self._response_headers is None: + self._response_headers = [] + status_parts = self._response_status.split(' ', 1) + self.status = int(status_parts[0]) + self.reason = status_parts[1] if len(status_parts) == 2 else '' + return self + + def getexpect(self): + class ContinueResponse(object): + status = 100 + return ContinueResponse() + + def send(self, data): + self.data = data + + def __call__(self, ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + self.path = quote('/' + device + '/' + str(partition) + path) + self.method = method + self.req_headers = headers + return self + + +def get_http_connect(account_func, container_func, object_func): + '''Returns a http_connect function that delegates to + entity-specific http_connect methods based on request path. + ''' + def http_connect(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + a, c, o = split_path(path, 1, 3, True) + if o: + func = object_func + elif c: + func = container_func + else: + func = account_func + resp = func(ipaddr, port, device, partition, method, path, + headers=headers, query_string=query_string) + return resp + + return http_connect + + +@patch_policies([StoragePolicy(0, 'zero', True, + object_ring=FakeRing(replicas=1))]) +class TestObjectSysmeta(unittest.TestCase): + '''Tests object sysmeta is correctly handled by combination + of proxy server and object server. + ''' + def _assertStatus(self, resp, expected): + self.assertEqual(resp.status_int, expected, + 'Expected %d, got %s' + % (expected, resp.status)) + + def _assertInHeaders(self, resp, expected): + for key, val in expected.iteritems(): + self.assertTrue(key in resp.headers, + 'Header %s missing from %s' % (key, resp.headers)) + self.assertEqual(val, resp.headers[key], + 'Expected header %s:%s, got %s:%s' + % (key, val, key, resp.headers[key])) + + def _assertNotInHeaders(self, resp, unexpected): + for key, val in unexpected.iteritems(): + self.assertFalse(key in resp.headers, + 'Header %s not expected in %s' + % (key, resp.headers)) + + def setUp(self): + self.app = proxy.Application(None, FakeMemcache(), + logger=debug_logger('proxy-ut'), + account_ring=FakeRing(replicas=1), + container_ring=FakeRing(replicas=1)) + monkey_patch_mimetools() + self.testdir = \ + os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController') + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + conf = {'devices': self.testdir, 'mount_check': 'false'} + self.obj_ctlr = object_server.ObjectController( + conf, logger=debug_logger('obj-ut')) + + http_connect = get_http_connect(fake_http_connect(200), + fake_http_connect(200), + FakeServerConnection(self.obj_ctlr)) + + swift.proxy.controllers.base.http_connect = http_connect + swift.proxy.controllers.obj.http_connect = http_connect + + original_sysmeta_headers_1 = {'x-object-sysmeta-test0': 'val0', + 'x-object-sysmeta-test1': 'val1'} + original_sysmeta_headers_2 = {'x-object-sysmeta-test2': 'val2'} + changed_sysmeta_headers = {'x-object-sysmeta-test0': '', + 'x-object-sysmeta-test1': 'val1 changed'} + new_sysmeta_headers = {'x-object-sysmeta-test3': 'val3'} + original_meta_headers_1 = {'x-object-meta-test0': 'meta0', + 'x-object-meta-test1': 'meta1'} + original_meta_headers_2 = {'x-object-meta-test2': 'meta2'} + changed_meta_headers = {'x-object-meta-test0': '', + 'x-object-meta-test1': 'meta1 changed'} + new_meta_headers = {'x-object-meta-test3': 'meta3'} + bad_headers = {'x-account-sysmeta-test1': 'bad1'} + + def test_PUT_sysmeta_then_GET(self): + path = '/v1/a/c/o' + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_meta_headers_1) + hdrs.update(self.bad_headers) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + req = Request.blank(path, environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.original_sysmeta_headers_1) + self._assertInHeaders(resp, self.original_meta_headers_1) + self._assertNotInHeaders(resp, self.bad_headers) + + def test_PUT_sysmeta_then_HEAD(self): + path = '/v1/a/c/o' + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_meta_headers_1) + hdrs.update(self.bad_headers) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + env = {'REQUEST_METHOD': 'HEAD'} + req = Request.blank(path, environ=env) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.original_sysmeta_headers_1) + self._assertInHeaders(resp, self.original_meta_headers_1) + self._assertNotInHeaders(resp, self.bad_headers) + + def test_sysmeta_replaced_by_PUT(self): + path = '/v1/a/c/o' + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_sysmeta_headers_2) + hdrs.update(self.original_meta_headers_1) + hdrs.update(self.original_meta_headers_2) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.changed_sysmeta_headers) + hdrs.update(self.new_sysmeta_headers) + hdrs.update(self.changed_meta_headers) + hdrs.update(self.new_meta_headers) + hdrs.update(self.bad_headers) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + req = Request.blank(path, environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertNotInHeaders(resp, self.original_sysmeta_headers_2) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertNotInHeaders(resp, self.original_meta_headers_2) + + def _test_sysmeta_not_updated_by_POST(self): + # check sysmeta is not changed by a POST but user meta is replaced + path = '/v1/a/c/o' + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_meta_headers_1) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + env = {'REQUEST_METHOD': 'POST'} + hdrs = dict(self.changed_sysmeta_headers) + hdrs.update(self.new_sysmeta_headers) + hdrs.update(self.changed_meta_headers) + hdrs.update(self.new_meta_headers) + hdrs.update(self.bad_headers) + req = Request.blank(path, environ=env, headers=hdrs) + resp = req.get_response(self.app) + self._assertStatus(resp, 202) + + req = Request.blank(path, environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.original_sysmeta_headers_1) + self._assertNotInHeaders(resp, self.new_sysmeta_headers) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertNotInHeaders(resp, self.bad_headers) + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.changed_sysmeta_headers) + hdrs.update(self.new_sysmeta_headers) + hdrs.update(self.bad_headers) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + req = Request.blank(path, environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertNotInHeaders(resp, self.original_sysmeta_headers_2) + + def test_sysmeta_not_updated_by_POST(self): + self.app.object_post_as_copy = False + self._test_sysmeta_not_updated_by_POST() + + def test_sysmeta_not_updated_by_POST_as_copy(self): + self.app.object_post_as_copy = True + self._test_sysmeta_not_updated_by_POST() + + def test_sysmeta_updated_by_COPY(self): + # check sysmeta is updated by a COPY in same way as user meta + path = '/v1/a/c/o' + dest = '/c/o2' + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_sysmeta_headers_2) + hdrs.update(self.original_meta_headers_1) + hdrs.update(self.original_meta_headers_2) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + env = {'REQUEST_METHOD': 'COPY'} + hdrs = dict(self.changed_sysmeta_headers) + hdrs.update(self.new_sysmeta_headers) + hdrs.update(self.changed_meta_headers) + hdrs.update(self.new_meta_headers) + hdrs.update(self.bad_headers) + hdrs.update({'Destination': dest}) + req = Request.blank(path, environ=env, headers=hdrs) + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertInHeaders(resp, self.original_sysmeta_headers_2) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertInHeaders(resp, self.original_meta_headers_2) + self._assertNotInHeaders(resp, self.bad_headers) + + req = Request.blank('/v1/a/c/o2', environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertInHeaders(resp, self.original_sysmeta_headers_2) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertInHeaders(resp, self.original_meta_headers_2) + self._assertNotInHeaders(resp, self.bad_headers) + + def test_sysmeta_updated_by_COPY_from(self): + # check sysmeta is updated by a COPY in same way as user meta + path = '/v1/a/c/o' + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.original_sysmeta_headers_1) + hdrs.update(self.original_sysmeta_headers_2) + hdrs.update(self.original_meta_headers_1) + hdrs.update(self.original_meta_headers_2) + req = Request.blank(path, environ=env, headers=hdrs, body='x') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + + env = {'REQUEST_METHOD': 'PUT'} + hdrs = dict(self.changed_sysmeta_headers) + hdrs.update(self.new_sysmeta_headers) + hdrs.update(self.changed_meta_headers) + hdrs.update(self.new_meta_headers) + hdrs.update(self.bad_headers) + hdrs.update({'X-Copy-From': '/c/o'}) + req = Request.blank('/v1/a/c/o2', environ=env, headers=hdrs, body='') + resp = req.get_response(self.app) + self._assertStatus(resp, 201) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertInHeaders(resp, self.original_sysmeta_headers_2) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertInHeaders(resp, self.original_meta_headers_2) + self._assertNotInHeaders(resp, self.bad_headers) + + req = Request.blank('/v1/a/c/o2', environ={}) + resp = req.get_response(self.app) + self._assertStatus(resp, 200) + self._assertInHeaders(resp, self.changed_sysmeta_headers) + self._assertInHeaders(resp, self.new_sysmeta_headers) + self._assertInHeaders(resp, self.original_sysmeta_headers_2) + self._assertInHeaders(resp, self.changed_meta_headers) + self._assertInHeaders(resp, self.new_meta_headers) + self._assertInHeaders(resp, self.original_meta_headers_2) + self._assertNotInHeaders(resp, self.bad_headers)