29c10db0cb
ssync currently does the wrong thing when replicating object dirs
containing both a .data and a .meta file. The ssync sender uses a
single PUT to send both object content and metadata to the receiver,
using the metadata (.meta file) timestamp. This results in the object
content timestamp being advanced to the metadata timestamp,
potentially overwriting newer object data on the receiver and causing
an inconsistency with the container server record for the object.

For example, replicating an object dir with {t0.data(etag=x), t2.meta}
to a receiver with t1.data(etag=y) will result in the creation of
t2.data(etag=x) on the receiver. However, the container server will
continue to list the object as t1(etag=y).

This patch modifies ssync to replicate the content of .data and .meta
separately using a PUT request for the data (no change) and a POST
request for the metadata. In effect, ssync replication replicates the
client operations that generated the .data and .meta files so that the
result of replication is the same as if the original client requests
had persisted on all object servers.

Apart from maintaining correct timestamps across sync'd nodes, this
has the added benefit of not needing to PUT objects when only the
metadata has changed and a POST will suffice. Taking the same example,
the ssync sender will no longer PUT t0.data but will POST t2.meta,
resulting in the receiver having t1.data and t2.meta.

The changes are backwards compatible: an upgraded sender will only
sync data files to a legacy receiver and will not sync meta files
(fixing the erroneous behavior described above); a legacy sender will
operate as before when sync'ing to an upgraded receiver.

Changes:
- diskfile API provides methods to get the data file timestamp as
  distinct from the diskfile timestamp.
- diskfile yield_hashes return tuple now passes a dict mapping data
  and meta (if any) timestamps to their respective values in the
  timestamp field.
- ssync_sender will encode data and meta timestamps in the
  (hash_path, timestamp) tuple sent to the receiver during
  missing_checks.
- ssync_receiver compares sender's data and meta timestamps to any
  local diskfile and may specify that only data or meta parts are
  sent during updates phase by appending a qualifier to the hash
  returned in its 'wanted' list.
- ssync_sender now sends POST subrequests when a meta file exists and
  its content needs to be replicated.
- ssync_sender may send *only* a POST if the receiver indicates that
  is the only part required to be sync'd.
- object server will allow PUT and DELETE with earlier timestamp than
  a POST.
- Fixed TODO related to replicated objects with fast-POST and ssync.

Related spec change-id: I60688efc3df692d3a39557114dca8c5490f7837e

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Closes-Bug: 1501528
Change-Id: I97552d194e5cc342b0a3f4b9800de8aa6b9cb85b
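As an illustrative sketch of the comparison described above (not Swift's
actual ssync code, wire format, or API; the helper name and arguments here
are invented), a receiver could decide which parts to request like this:

def parts_wanted(local_data_ts, local_meta_ts, remote_data_ts, remote_meta_ts):
    """Return which parts ('data', 'meta') the receiver should ask for.

    Timestamps are assumed to be comparable values (e.g. Swift Timestamp
    strings); None means the corresponding file does not exist.
    """
    wanted = []
    if remote_data_ts is not None and (
            local_data_ts is None or remote_data_ts > local_data_ts):
        # object content is missing or stale locally: sync it with a PUT
        # subrequest during the updates phase
        wanted.append('data')
    local_newest = max([ts for ts in (local_data_ts, local_meta_ts)
                        if ts is not None] or [None])
    if remote_meta_ts is not None and (
            local_newest is None or remote_meta_ts > local_newest):
        # metadata is missing or stale locally: sync it with a POST
        # subrequest during the updates phase
        wanted.append('meta')
    return wanted

# Example from the message above: the receiver holds t1.data and no .meta,
# and the sender offers {t0.data, t2.meta} with t0 < t1 < t2:
#     parts_wanted('t1', None, 't0', 't2') == ['meta']
# so the receiver keeps t1.data and gains t2.meta instead of ending up
# with an incorrect t2.data.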
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
import unittest

import os
import shutil
import uuid
from swift.common.exceptions import DiskFileDeleted

from swift.container.backend import ContainerBroker
from swift.common import internal_client, utils
from swift.common.ring import Ring
from swift.common.utils import Timestamp, get_logger
from swift.obj.diskfile import DiskFileManager
from swift.common.storage_policy import POLICIES

from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest

class Test(ReplProbeTest):
    def setUp(self):
        """
        Reset all environment and start all servers.
        """
        super(Test, self).setUp()
        self.container_name = 'container-%s' % uuid.uuid4()
        self.object_name = 'object-%s' % uuid.uuid4()
        self.brain = BrainSplitter(self.url, self.token, self.container_name,
                                   self.object_name, 'object',
                                   policy=self.policy)
        self.tempdir = mkdtemp()
        conf_path = os.path.join(self.tempdir, 'internal_client.conf')
        conf_body = """
        [DEFAULT]
        swift_dir = /etc/swift

        [pipeline:main]
        pipeline = catch_errors cache proxy-server

        [app:proxy-server]
        use = egg:swift#proxy
        object_post_as_copy = false

        [filter:cache]
        use = egg:swift#memcache

        [filter:catch_errors]
        use = egg:swift#catch_errors
        """
        with open(conf_path, 'w') as f:
            f.write(dedent(conf_body))
        self.int_client = internal_client.InternalClient(conf_path, 'test', 1)

    def tearDown(self):
        super(Test, self).tearDown()
        shutil.rmtree(self.tempdir)

    def _get_object_info(self, account, container, obj, number,
                         policy=None):
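        """
        Return the on-disk object metadata as read from the object server
        identified by ``number``, or None if that server is not a primary
        for the object. Raises DiskFileDeleted if only a tombstone is found.
        """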
        obj_conf = self.configs['object-server']
        config_path = obj_conf[number]
        options = utils.readconf(config_path, 'app:object-server')
        swift_dir = options.get('swift_dir', '/etc/swift')
        ring = POLICIES.get_object_ring(policy, swift_dir)
        part, nodes = ring.get_nodes(account, container, obj)
        for node in nodes:
            # assumes one to one mapping
            if node['port'] == int(options.get('bind_port')):
                device = node['device']
                break
        else:
            return None
        mgr = DiskFileManager(options, get_logger(options))
        disk_file = mgr.get_diskfile(device, part, account, container, obj,
                                     policy)
        info = disk_file.read_metadata()
        return info

    def _assert_consistent_object_metadata(self):
        obj_info = []
        for i in range(1, 5):
            info_i = self._get_object_info(self.account, self.container_name,
                                           self.object_name, i)
            if info_i:
                obj_info.append(info_i)
        self.assertTrue(len(obj_info) > 1)
        for other in obj_info[1:]:
            self.assertEqual(obj_info[0], other,
                             'Object metadata mismatch: %s != %s'
                             % (obj_info[0], other))

    def _assert_consistent_deleted_object(self):
        for i in range(1, 5):
            try:
                info = self._get_object_info(self.account,
                                             self.container_name,
                                             self.object_name, i)
                if info is not None:
                    self.fail('Expected no disk file info but found %s'
                              % info)
            except DiskFileDeleted:
                pass

    def _get_db_info(self, account, container, number):
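        """
        Return the container DB info from the container server identified
        by ``number``, or None if that server is not a primary for the
        container.
        """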
        server_type = 'container'
        obj_conf = self.configs['%s-server' % server_type]
        config_path = obj_conf[number]
        options = utils.readconf(config_path, 'app:container-server')
        root = options.get('devices')

        swift_dir = options.get('swift_dir', '/etc/swift')
        ring = Ring(swift_dir, ring_name=server_type)
        part, nodes = ring.get_nodes(account, container)
        for node in nodes:
            # assumes one to one mapping
            if node['port'] == int(options.get('bind_port')):
                device = node['device']
                break
        else:
            return None

        path_hash = utils.hash_path(account, container)
        _dir = utils.storage_directory('%ss' % server_type, part, path_hash)
        db_dir = os.path.join(root, device, _dir)
        db_file = os.path.join(db_dir, '%s.db' % path_hash)
        db = ContainerBroker(db_file)
        return db.get_info()

    def _assert_consistent_container_dbs(self):
        db_info = []
        for i in range(1, 5):
            info_i = self._get_db_info(self.account, self.container_name, i)
            if info_i:
                db_info.append(info_i)
        self.assertTrue(len(db_info) > 1)
        for other in db_info[1:]:
            self.assertEqual(db_info[0]['hash'], other['hash'],
                             'Container db hash mismatch: %s != %s'
                             % (db_info[0]['hash'], other['hash']))

    def _assert_object_metadata_matches_listing(self, listing, metadata):
        self.assertEqual(listing['bytes'], int(metadata['content-length']))
        self.assertEqual(listing['hash'], metadata['etag'])
        self.assertEqual(listing['content_type'], metadata['content-type'])
        modified = Timestamp(metadata['x-timestamp']).isoformat
        self.assertEqual(listing['last_modified'], modified)

    def _put_object(self, headers=None, body=u'stuff'):
        headers = headers or {}
        self.int_client.upload_object(StringIO(body), self.account,
                                      self.container_name,
                                      self.object_name, headers)

    def _post_object(self, headers):
        self.int_client.set_object_metadata(self.account, self.container_name,
                                            self.object_name, headers)

    def _delete_object(self):
        self.int_client.delete_object(self.account, self.container_name,
                                      self.object_name)

    def _get_object(self, headers=None, expect_statuses=(2,)):
        return self.int_client.get_object(self.account,
                                          self.container_name,
                                          self.object_name,
                                          headers,
                                          acceptable_statuses=expect_statuses)

    def _get_object_metadata(self):
        return self.int_client.get_object_metadata(self.account,
                                                   self.container_name,
                                                   self.object_name)

    def test_object_delete_is_replicated(self):
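        """
        Check that an object DELETE issued while half of the object servers
        are down is replicated to those servers, and that a later PUT then
        wins everywhere after another replication pass.
        """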
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()

        # put newer object to first server subset
        self.brain.stop_primary_half()
        self._put_object()
        self.brain.start_primary_half()

        # delete object on second server subset
        self.brain.stop_handoff_half()
        self._delete_object()
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check object deletion has been replicated on first server subset
        self.brain.stop_primary_half()
        self._get_object(expect_statuses=(4,))
        self.brain.start_primary_half()

        # check object deletion persists on second server subset
        self.brain.stop_handoff_half()
        self._get_object(expect_statuses=(4,))

        # put newer object to second server subset
        self._put_object()
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check new object has been replicated on first server subset
        self.brain.stop_primary_half()
        self._get_object()
        self.brain.start_primary_half()

        # check new object persists on second server subset
        self.brain.stop_handoff_half()
        self._get_object()

    def test_object_after_replication_with_subsequent_post(self):
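        """
        Check that object data PUT to only half of the object servers is
        replicated to the other half after a later POST of user metadata,
        so that all servers end up with the newer data, its content-type
        and the POSTed user metadata.
        """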
        self.brain.put_container(policy_index=0)

        # put object
        self._put_object(headers={'Content-Type': 'foo'}, body=u'older')

        # put newer object to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers={'Content-Type': 'bar'}, body=u'newer')
        metadata = self._get_object_metadata()
        etag = metadata['etag']
        self.brain.start_primary_half()

        # post some user meta to all servers
        self._post_object({'x-object-meta-bar': 'meta-bar'})

        # run replicator
        self.get_to_final_state()

        # check that newer data has been replicated to second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        self.assertEqual(etag, metadata['etag'])
        self.assertEqual('bar', metadata['content-type'])
        self.assertEqual('meta-bar', metadata['x-object-meta-bar'])
        self.brain.start_handoff_half()

        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_subsequent_put(self):
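        """
        Check that when the two server subsets receive PUTs with different
        sysmeta and one subset then receives a POST of user metadata,
        replication leaves every server with the newest sysmeta plus the
        POSTed user metadata.
        """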
        sysmeta = {'x-object-sysmeta-foo': 'older'}
        sysmeta2 = {'x-object-sysmeta-foo': 'newer'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=0)

        # put object with sysmeta to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # put object with updated sysmeta to second server subset
        self.brain.stop_handoff_half()
        self._put_object(headers=sysmeta2)
        metadata = self._get_object_metadata()
        for key in sysmeta2:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta2[key])
        self._post_object(usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta2[key])

        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check newest sysmeta and user meta have been replicated to
        # first server subset
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], sysmeta2[key])
        self.brain.start_primary_half()

        # check user meta and newest sysmeta remain on second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], sysmeta2[key])

        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_subsequent_post(self):
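        """
        Check that user metadata POSTed to the servers holding an older
        object is merged, after replication, with the sysmeta from a newer
        PUT made to the other servers.
        """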
        sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()
        # put newer object with sysmeta to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # post some user meta to second server subset
        self.brain.stop_handoff_half()
        self._post_object(usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta:
            self.assertFalse(key in metadata)
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check user metadata has been replicated to first server subset
        # and sysmeta is unchanged
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        expected = dict(sysmeta)
        expected.update(usermeta)
        for key in expected.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], expected[key])
        self.brain.start_primary_half()

        # check user metadata and sysmeta both on second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in expected.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], expected[key])
        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_prior_post(self):
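        """
        Check that user metadata POSTed before a newer object PUT is
        discarded on all servers after replication, while the newer PUT's
        sysmeta survives everywhere.
        """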
        sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()

        # post user meta to first server subset
        self.brain.stop_handoff_half()
        self._post_object(headers=usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        self.brain.start_handoff_half()

        # put newer object with sysmeta to second server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # run replicator
        self.get_to_final_state()

        # check stale user metadata is not replicated to first server subset
        # and sysmeta is unchanged
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        for key in usermeta:
            self.assertFalse(key in metadata)
        self.brain.start_primary_half()

        # check stale user metadata is removed from second server subset
        # and sysmeta is replicated
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        for key in usermeta:
            self.assertFalse(key in metadata)
        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()


if __name__ == "__main__":
    unittest.main()