Add POST capability to ssync for .meta files

ssync currently does the wrong thing when replicating object dirs
containing both a .data and a .meta file. The ssync sender uses a
single PUT to send both object content and metadata to the receiver,
using the metadata (.meta file) timestamp. This results in the object
content timestamp being advanced to the metadata timestamp,
potentially overwriting newer object data on the receiver and causing
an inconsistency with the container server record for the object.

For example, replicating an object dir with {t0.data(etag=x), t2.meta}
to a receiver with t1.data(etag=y) will result in the creation of
t2.data(etag=x) on the receiver. However, the container server will
continue to list the object as t1(etag=y).

This patch modifies ssync to replicate the content of .data and .meta
separately using a PUT request for the data (no change) and a POST
request for the metadata. In effect, ssync replication replicates the
client operations that generated the .data and .meta files so that
the result of replication is the same as if the original client requests
had persisted on all object servers.

Apart from maintaining correct timestamps across sync'd nodes, this has
the added benefit of not needing to PUT objects when only the metadata
has changed and a POST will suffice.

Taking the same example, ssync sender will no longer PUT t0.data but will
POST t2.meta resulting in the receiver having t1.data and t2.meta.

The changes are backwards compatible: an upgraded sender will only sync
data files to a legacy receiver and will not sync meta files (fixing the
erroneous behavior described above); a legacy sender will operate as
before when sync'ing to an upgraded receiver.

Changes:
- diskfile API provides methods to get the data file timestamp
  as distinct from the diskfile timestamp.

- diskfile yield_hashes return tuple now passes a dict mapping data and
  meta (if any) timestamps to their respective values in the timestamp
  field.

- ssync_sender will encode data and meta timestamps in the
  (hash_path, timestamp) tuple sent to the receiver during
  missing_checks.

- ssync_receiver compares sender's data and meta timestamps to any
  local diskfile and may specify that only data or meta parts are sent
  during updates phase by appending a qualifier to the hash returned
  in its 'wanted' list.

- ssync_sender now sends POST subrequests when a meta file
  exists and its content needs to be replicated.

- ssync_sender may send *only* a POST if the receiver indicates that
  is the only part required to be sync'd.

- object server will allow PUT and DELETE with earlier timestamp than
  a POST

- Fixed TODO related to replicated objects with fast-POST and ssync

Related spec change-id: I60688efc3df692d3a39557114dca8c5490f7837e

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Closes-Bug: 1501528
Change-Id: I97552d194e5cc342b0a3f4b9800de8aa6b9cb85b
This commit is contained in:
Alistair Coles 2015-04-22 12:56:50 +01:00
parent 167f3c8cbd
commit 29c10db0cb
14 changed files with 1659 additions and 284 deletions

View File

@ -692,6 +692,7 @@ def drop_buffer_cache(fd, offset, length):
NORMAL_FORMAT = "%016.05f" NORMAL_FORMAT = "%016.05f"
INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x' INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x'
MAX_OFFSET = (16 ** 16) - 1 MAX_OFFSET = (16 ** 16) - 1
PRECISION = 1e-5
# Setting this to True will cause the internal format to always display # Setting this to True will cause the internal format to always display
# extended digits - even when the value is equivalent to the normalized form. # extended digits - even when the value is equivalent to the normalized form.
# This isn't ideal during an upgrade when some servers might not understand # This isn't ideal during an upgrade when some servers might not understand
@ -736,7 +737,20 @@ class Timestamp(object):
compatible for normalized timestamps which do not include an offset. compatible for normalized timestamps which do not include an offset.
""" """
def __init__(self, timestamp, offset=0): def __init__(self, timestamp, offset=0, delta=0):
"""
Create a new Timestamp.
:param timestamp: time in seconds since the Epoch, may be any of:
* a float or integer
* normalized/internalized string
* another instance of this class (offset is preserved)
:param offset: the second internal offset vector, an int
:param delta: deca-microsecond difference from the base timestamp
param, an int
"""
if isinstance(timestamp, basestring): if isinstance(timestamp, basestring):
parts = timestamp.split('_', 1) parts = timestamp.split('_', 1)
self.timestamp = float(parts.pop(0)) self.timestamp = float(parts.pop(0))
@ -754,6 +768,14 @@ class Timestamp(object):
raise ValueError('offset must be non-negative') raise ValueError('offset must be non-negative')
if self.offset > MAX_OFFSET: if self.offset > MAX_OFFSET:
raise ValueError('offset must be smaller than %d' % MAX_OFFSET) raise ValueError('offset must be smaller than %d' % MAX_OFFSET)
self.raw = int(round(self.timestamp / PRECISION))
# add delta
if delta:
self.raw = self.raw + delta
if self.raw <= 0:
raise ValueError(
'delta must be greater than %d' % (-1 * self.raw))
self.timestamp = float(self.raw * PRECISION)
def __repr__(self): def __repr__(self):
return INTERNAL_FORMAT % (self.timestamp, self.offset) return INTERNAL_FORMAT % (self.timestamp, self.offset)

View File

@ -887,12 +887,18 @@ class BaseDiskFileManager(object):
def yield_hashes(self, device, partition, policy, def yield_hashes(self, device, partition, policy,
suffixes=None, **kwargs): suffixes=None, **kwargs):
""" """
Yields tuples of (full_path, hash_only, timestamp) for object Yields tuples of (full_path, hash_only, timestamps) for object
information stored for the given device, partition, and information stored for the given device, partition, and
(optionally) suffixes. If suffixes is None, all stored (optionally) suffixes. If suffixes is None, all stored
suffixes will be searched for object hashes. Note that if suffixes will be searched for object hashes. Note that if
suffixes is not None but empty, such as [], then nothing will suffixes is not None but empty, such as [], then nothing will
be yielded. be yielded.
timestamps is a dict which may contain items mapping:
ts_data -> timestamp of data or tombstone file,
ts_meta -> timestamp of meta file, if one exists
where timestamps are instances of
:class:`~swift.common.utils.Timestamp`
""" """
dev_path = self.get_dev_path(device) dev_path = self.get_dev_path(device)
if not dev_path: if not dev_path:
@ -906,27 +912,36 @@ class BaseDiskFileManager(object):
suffixes = ( suffixes = (
(os.path.join(partition_path, suffix), suffix) (os.path.join(partition_path, suffix), suffix)
for suffix in suffixes) for suffix in suffixes)
key_preference = (
('ts_meta', '.meta'),
('ts_data', '.data'),
('ts_data', '.ts'),
)
for suffix_path, suffix in suffixes: for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path): for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash) object_path = os.path.join(suffix_path, object_hash)
newest_valid_file = None
try: try:
results = self.cleanup_ondisk_files( results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs) object_path, self.reclaim_age, **kwargs)
newest_valid_file = (results.get('.meta') timestamps = {}
or results.get('.data') for ts_key, ext in key_preference:
or results.get('.ts')) if ext not in results:
if newest_valid_file: continue
timestamp = self.parse_on_disk_filename( timestamps[ts_key] = self.parse_on_disk_filename(
newest_valid_file)['timestamp'] results[ext])['timestamp']
yield (object_path, object_hash, timestamp.internal) if 'ts_data' not in timestamps:
# file sets that do not include a .data or .ts
# file can not be opened and therefore can not
# be ssync'd
continue
yield (object_path, object_hash, timestamps)
except AssertionError as err: except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % ( self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err)) object_path, err))
except DiskFileError as err: except DiskFileError as err:
self.logger.debug( self.logger.debug(
'Invalid diskfile filename %r in %r (%s)' % ( 'Invalid diskfile filename in %r (%s)' % (
newest_valid_file, object_path, err)) object_path, err))
class BaseDiskFileWriter(object): class BaseDiskFileWriter(object):
@ -1414,6 +1429,8 @@ class BaseDiskFile(object):
self._datadir = None self._datadir = None
self._tmpdir = join(device_path, get_tmp_dir(policy)) self._tmpdir = join(device_path, get_tmp_dir(policy))
self._metadata = None self._metadata = None
self._datafile_metadata = None
self._metafile_metadata = None
self._data_file = None self._data_file = None
self._fp = None self._fp = None
self._quarantined_dir = None self._quarantined_dir = None
@ -1454,6 +1471,12 @@ class BaseDiskFile(object):
raise DiskFileNotOpen() raise DiskFileNotOpen()
return Timestamp(self._metadata.get('X-Timestamp')) return Timestamp(self._metadata.get('X-Timestamp'))
@property
def data_timestamp(self):
if self._datafile_metadata is None:
raise DiskFileNotOpen()
return Timestamp(self._datafile_metadata.get('X-Timestamp'))
@classmethod @classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy): def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
return cls(mgr, device_path, None, partition, _datadir=hash_dir_path, return cls(mgr, device_path, None, partition, _datadir=hash_dir_path,
@ -1693,16 +1716,21 @@ class BaseDiskFile(object):
:func:`swift.obj.diskfile.DiskFile._verify_data_file` :func:`swift.obj.diskfile.DiskFile._verify_data_file`
""" """
fp = open(data_file, 'rb') fp = open(data_file, 'rb')
datafile_metadata = self._failsafe_read_metadata(fp, data_file) self._datafile_metadata = self._failsafe_read_metadata(fp, data_file)
self._metadata = {}
if meta_file: if meta_file:
self._metadata = self._failsafe_read_metadata(meta_file, meta_file) self._metafile_metadata = self._failsafe_read_metadata(
meta_file, meta_file)
sys_metadata = dict( sys_metadata = dict(
[(key, val) for key, val in datafile_metadata.items() [(key, val) for key, val in self._datafile_metadata.items()
if key.lower() in DATAFILE_SYSTEM_META if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)]) or is_sys_meta('object', key)])
self._metadata.update(self._metafile_metadata)
self._metadata.update(sys_metadata) self._metadata.update(sys_metadata)
# diskfile writer added 'name' to metafile, so remove it here
self._metafile_metadata.pop('name', None)
else: else:
self._metadata = datafile_metadata self._metadata.update(self._datafile_metadata)
if self._name is None: if self._name is None:
# If we don't know our name, we were just given a hash dir at # If we don't know our name, we were just given a hash dir at
# instantiation, so we'd better validate that the name hashes back # instantiation, so we'd better validate that the name hashes back
@ -1712,6 +1740,37 @@ class BaseDiskFile(object):
self._verify_data_file(data_file, fp) self._verify_data_file(data_file, fp)
return fp return fp
def get_metafile_metadata(self):
"""
Provide the metafile metadata for a previously opened object as a
dictionary. This is metadata that was written by a POST and does not
include any persistent metadata that was set by the original PUT.
:returns: object's .meta file metadata dictionary, or None if there is
no .meta file
:raises DiskFileNotOpen: if the
:func:`swift.obj.diskfile.DiskFile.open` method was not previously
invoked
"""
if self._metadata is None:
raise DiskFileNotOpen()
return self._metafile_metadata
def get_datafile_metadata(self):
"""
Provide the datafile metadata for a previously opened object as a
dictionary. This is metadata that was included when the object was
first PUT, and does not include metadata set by any subsequent POST.
:returns: object's datafile metadata dictionary
:raises DiskFileNotOpen: if the
:func:`swift.obj.diskfile.DiskFile.open` method was not previously
invoked
"""
if self._datafile_metadata is None:
raise DiskFileNotOpen()
return self._datafile_metadata
def get_metadata(self): def get_metadata(self):
""" """
Provide the metadata for a previously opened object as a dictionary. Provide the metadata for a previously opened object as a dictionary.
@ -1956,9 +2015,9 @@ class DiskFileManager(BaseDiskFileManager):
if have_valid_fileset() or not accept_first(): if have_valid_fileset() or not accept_first():
# newer .data or .ts already found so discard this # newer .data or .ts already found so discard this
discard() discard()
# if not have_valid_fileset(): if not have_valid_fileset():
# # remove any .meta that may have been previously found # remove any .meta that may have been previously found
# context['.meta'] = None context.pop('.meta', None)
set_valid_fileset() set_valid_fileset()
elif ext == '.meta': elif ext == '.meta':
if have_valid_fileset() or not accept_first(): if have_valid_fileset() or not accept_first():
@ -1972,14 +2031,14 @@ class DiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, **kwargs): def _verify_on_disk_files(self, accepted_files, **kwargs):
""" """
Verify that the final combination of on disk files complies with the Verify that the final combination of on disk files complies with the
diskfile contract. replicated diskfile contract.
:param accepted_files: files that have been found and accepted :param accepted_files: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise :returns: True if the file combination is compliant, False otherwise
""" """
# mimic legacy behavior - .meta is ignored when .ts is found # mimic legacy behavior - .meta is ignored when .ts is found
if accepted_files.get('.ts'): if accepted_files.get('.ts'):
accepted_files['.meta'] = None accepted_files.pop('.meta', None)
data_file, meta_file, ts_file, durable_file = tuple( data_file, meta_file, ts_file, durable_file = tuple(
[accepted_files.get(ext) [accepted_files.get(ext)
@ -2298,7 +2357,7 @@ class ECDiskFileManager(BaseDiskFileManager):
discard() discard()
if not have_valid_fileset(): if not have_valid_fileset():
# remove any .meta that may have been previously found # remove any .meta that may have been previously found
context['.meta'] = None context.pop('.meta', None)
set_valid_fileset() set_valid_fileset()
elif ext in ('.meta', '.durable'): elif ext in ('.meta', '.durable'):
if have_valid_fileset() or not accept_first(): if have_valid_fileset() or not accept_first():
@ -2312,7 +2371,7 @@ class ECDiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs): def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs):
""" """
Verify that the final combination of on disk files complies with the Verify that the final combination of on disk files complies with the
diskfile contract. erasure-coded diskfile contract.
:param accepted_files: files that have been found and accepted :param accepted_files: files that have been found and accepted
:param frag_index: specifies a specific fragment index .data file :param frag_index: specifies a specific fragment index .data file

View File

@ -413,3 +413,11 @@ class DiskFile(object):
fp, md = self._filesystem.get_object(self._name) fp, md = self._filesystem.get_object(self._name)
if md and md['X-Timestamp'] < Timestamp(timestamp): if md and md['X-Timestamp'] < Timestamp(timestamp):
self._filesystem.del_object(self._name) self._filesystem.del_object(self._name)
@property
def timestamp(self):
if self._metadata is None:
raise DiskFileNotOpen()
return Timestamp(self._metadata.get('X-Timestamp'))
data_timestamp = timestamp

View File

@ -71,24 +71,27 @@ class RebuildingECDiskFileStream(object):
metadata in the DiskFile interface for ssync. metadata in the DiskFile interface for ssync.
""" """
def __init__(self, metadata, frag_index, rebuilt_fragment_iter): def __init__(self, datafile_metadata, frag_index, rebuilt_fragment_iter):
# start with metadata from a participating FA # start with metadata from a participating FA
self.metadata = metadata self.datafile_metadata = datafile_metadata
# the new FA is going to have the same length as others in the set # the new FA is going to have the same length as others in the set
self._content_length = self.metadata['Content-Length'] self._content_length = self.datafile_metadata['Content-Length']
# update the FI and delete the ETag, the obj server will # update the FI and delete the ETag, the obj server will
# recalc on the other side... # recalc on the other side...
self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
for etag_key in ('ETag', 'Etag'): for etag_key in ('ETag', 'Etag'):
self.metadata.pop(etag_key, None) self.datafile_metadata.pop(etag_key, None)
self.frag_index = frag_index self.frag_index = frag_index
self.rebuilt_fragment_iter = rebuilt_fragment_iter self.rebuilt_fragment_iter = rebuilt_fragment_iter
def get_metadata(self): def get_metadata(self):
return self.metadata return self.datafile_metadata
def get_datafile_metadata(self):
return self.datafile_metadata
@property @property
def content_length(self): def content_length(self):
@ -218,7 +221,7 @@ class ObjectReconstructor(Daemon):
'full_path': self._full_path(node, part, path, policy)}) 'full_path': self._full_path(node, part, path, policy)})
return resp return resp
def reconstruct_fa(self, job, node, metadata): def reconstruct_fa(self, job, node, datafile_metadata):
""" """
Reconstructs a fragment archive - this method is called from ssync Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that is missing this object - the local after a remote node responds that is missing this object - the local
@ -227,7 +230,8 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender :param job: job from ssync_sender
:param node: node that we're rebuilding to :param node: node that we're rebuilding to
:param metadata: the metadata to attach to the rebuilt archive :param datafile_metadata: the datafile metadata to attach to
the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync :returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed :raises DiskFileError: if the fragment archive cannot be reconstructed
""" """
@ -244,7 +248,7 @@ class ObjectReconstructor(Daemon):
headers = self.headers.copy() headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
pile = GreenAsyncPile(len(part_nodes)) pile = GreenAsyncPile(len(part_nodes))
path = metadata['name'] path = datafile_metadata['name']
for node in part_nodes: for node in part_nodes:
pile.spawn(self._get_response, node, job['partition'], pile.spawn(self._get_response, node, job['partition'],
path, headers, job['policy']) path, headers, job['policy'])
@ -277,14 +281,14 @@ class ObjectReconstructor(Daemon):
'to reconstruct %s with ETag %s' % ( 'to reconstruct %s with ETag %s' % (
len(responses), job['policy'].ec_ndata, len(responses), job['policy'].ec_ndata,
self._full_path(node, job['partition'], self._full_path(node, job['partition'],
metadata['name'], job['policy']), datafile_metadata['name'], job['policy']),
etag)) etag))
raise DiskFileError('Unable to reconstruct EC archive') raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:job['policy'].ec_ndata], path, job['policy'], responses[:job['policy'].ec_ndata], path, job['policy'],
fi_to_rebuild) fi_to_rebuild)
return RebuildingECDiskFileStream(metadata, fi_to_rebuild, return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter) rebuilt_fragment_iter)
def _reconstruct(self, policy, fragment_payload, frag_index): def _reconstruct(self, policy, fragment_payload, frag_index):
@ -536,17 +540,17 @@ class ObjectReconstructor(Daemon):
:param frag_index: (int) the fragment index of data files to be deleted :param frag_index: (int) the fragment index of data files to be deleted
""" """
df_mgr = self._df_router[job['policy']] df_mgr = self._df_router[job['policy']]
for object_hash, timestamp in objects.items(): for object_hash, timestamps in objects.items():
try: try:
df = df_mgr.get_diskfile_from_hash( df = df_mgr.get_diskfile_from_hash(
job['local_dev']['device'], job['partition'], job['local_dev']['device'], job['partition'],
object_hash, job['policy'], object_hash, job['policy'],
frag_index=frag_index) frag_index=frag_index)
df.purge(Timestamp(timestamp), frag_index) df.purge(timestamps['ts_data'], frag_index)
except DiskFileError: except DiskFileError:
self.logger.exception( self.logger.exception(
'Unable to purge DiskFile (%r %r %r)', 'Unable to purge DiskFile (%r %r %r)',
object_hash, timestamp, frag_index) object_hash, timestamps['ts_data'], frag_index)
continue continue
def process_job(self, job): def process_job(self, job):

View File

@ -487,7 +487,11 @@ class ObjectController(BaseStorageServer):
self._preserve_slo_manifest(metadata, orig_metadata) self._preserve_slo_manifest(metadata, orig_metadata)
metadata.update(val for val in request.headers.items() metadata.update(val for val in request.headers.items()
if is_user_meta('object', val[0])) if is_user_meta('object', val[0]))
for header_key in self.allowed_headers: headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
list(self.allowed_headers))
for header_key in headers_to_copy:
if header_key in request.headers: if header_key in request.headers:
header_caps = header_key.title() header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key] metadata[header_caps] = request.headers[header_key]
@ -549,10 +553,12 @@ class ObjectController(BaseStorageServer):
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
try: try:
orig_metadata = disk_file.read_metadata() orig_metadata = disk_file.read_metadata()
orig_timestamp = disk_file.data_timestamp
except DiskFileXattrNotSupported: except DiskFileXattrNotSupported:
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
except (DiskFileNotExist, DiskFileQuarantined): except (DiskFileNotExist, DiskFileQuarantined):
orig_metadata = {} orig_metadata = {}
orig_timestamp = 0
# Checks for If-None-Match # Checks for If-None-Match
if request.if_none_match is not None and orig_metadata: if request.if_none_match is not None and orig_metadata:
@ -563,7 +569,6 @@ class ObjectController(BaseStorageServer):
# The current ETag matches, so return 412 # The current ETag matches, so return 412
return HTTPPreconditionFailed(request=request) return HTTPPreconditionFailed(request=request)
orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0))
if orig_timestamp >= req_timestamp: if orig_timestamp >= req_timestamp:
return HTTPConflict( return HTTPConflict(
request=request, request=request,
@ -856,7 +861,7 @@ class ObjectController(BaseStorageServer):
orig_metadata = {} orig_metadata = {}
response_class = HTTPNotFound response_class = HTTPNotFound
else: else:
orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0)) orig_timestamp = disk_file.data_timestamp
if orig_timestamp < req_timestamp: if orig_timestamp < req_timestamp:
response_class = HTTPNoContent response_class = HTTPNoContent
else: else:

View File

@ -24,6 +24,62 @@ from swift.common import http
from swift.common import swob from swift.common import swob
from swift.common import utils from swift.common import utils
from swift.common import request_helpers from swift.common import request_helpers
from swift.common.utils import Timestamp
def decode_missing(line):
"""
Parse a string of the form generated by
:py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict
with keys ``object_hash``, ``ts_data``, ``ts_meta``.
The encoder for this line is
:py:func:`~swift.obj.ssync_sender.encode_missing`
"""
result = {}
parts = line.split()
result['object_hash'], t_data = (urllib.unquote(v) for v in parts[:2])
result['ts_data'] = result['ts_meta'] = Timestamp(t_data)
if len(parts) > 2:
# allow for a comma separated list of k:v pairs to future-proof
subparts = urllib.unquote(parts[2]).split(',')
for item in [subpart for subpart in subparts if ':' in subpart]:
k, v = item.split(':')
if k == 'm':
result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
return result
def encode_wanted(remote, local):
"""
Compare a remote and local results and generate a wanted line.
:param remote: a dict, with ts_data and ts_meta keys in the form
returned by :py:func:`decode_missing`
:param local: a dict, possibly empty, with ts_data and ts_meta keys
in the form returned :py:meth:`Receiver._check_local`
The decoder for this line is
:py:func:`~swift.obj.ssync_sender.decode_wanted`
"""
want = {}
if 'ts_data' in local:
# we have something, let's get just the right stuff
if remote['ts_data'] > local['ts_data']:
want['data'] = True
if 'ts_meta' in local and remote['ts_meta'] > local['ts_meta']:
want['meta'] = True
else:
# we got nothing, so we'll take whatever the remote has
want['data'] = True
want['meta'] = True
if want:
# this is the inverse of _decode_wanted's key_map
key_map = dict(data='d', meta='m')
parts = ''.join(v for k, v in sorted(key_map.items()) if want.get(k))
return '%s %s' % (urllib.quote(remote['object_hash']), parts)
return None
class Receiver(object): class Receiver(object):
@ -185,6 +241,42 @@ class Receiver(object):
raise swob.HTTPInsufficientStorage(drive=self.device) raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input'] self.fp = self.request.environ['wsgi.input']
def _check_local(self, object_hash):
"""
Parse local diskfile and return results of current
representative for comparison to remote.
:param object_hash: the hash of the remote object being offered
"""
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash,
self.policy, frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
return {}
try:
df.open()
except exceptions.DiskFileDeleted as err:
return {'ts_data': err.timestamp}
except exceptions.DiskFileError as err:
return {}
return {
'ts_data': df.data_timestamp,
'ts_meta': df.timestamp,
}
def _check_missing(self, line):
"""
Parse offered object from sender, and compare to local diskfile,
responding with proper protocol line to represented needed data
or None if in sync.
Anchor point for tests to mock legacy protocol changes.
"""
remote = decode_missing(line)
local = self._check_local(remote['object_hash'])
return encode_wanted(remote, local)
def missing_check(self): def missing_check(self):
""" """
Handles the receiver-side of the MISSING_CHECK step of a Handles the receiver-side of the MISSING_CHECK step of a
@ -208,8 +300,14 @@ class Receiver(object):
4. Receiver gets `:MISSING_CHECK: END`, responds with 4. Receiver gets `:MISSING_CHECK: END`, responds with
`:MISSING_CHECK: START`, followed by the list of `:MISSING_CHECK: START`, followed by the list of
hashes it collected as being wanted (one per line), <wanted_hash> specifiers it collected as being wanted
`:MISSING_CHECK: END`, and flushes any buffers. (one per line), `:MISSING_CHECK: END`, and flushes any
buffers.
Each <wanted_hash> specifier has the form <hash>[ <parts>] where
<parts> is a string containing characters 'd' and/or 'm'
indicating that only data or meta part of object respectively is
required to be sync'd.
5. Sender gets `:MISSING_CHECK: START` and reads the list 5. Sender gets `:MISSING_CHECK: START` and reads the list
of hashes desired by the receiver until reading of hashes desired by the receiver until reading
@ -232,26 +330,9 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size) line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END': if not line or line.strip() == ':MISSING_CHECK: END':
break break
parts = line.split() want = self._check_missing(line)
object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy,
frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
try:
df.open()
except exceptions.DiskFileDeleted as err:
want = err.timestamp < timestamp
except exceptions.DiskFileError as err:
want = True
else:
want = df.timestamp < timestamp
if want: if want:
object_hashes.append(object_hash) object_hashes.append(want)
yield ':MISSING_CHECK: START\r\n' yield ':MISSING_CHECK: START\r\n'
if object_hashes: if object_hashes:
yield '\r\n'.join(object_hashes) yield '\r\n'.join(object_hashes)
@ -338,10 +419,11 @@ class Receiver(object):
if header == 'content-length': if header == 'content-length':
content_length = int(value) content_length = int(value)
# Establish subrequest body, if needed. # Establish subrequest body, if needed.
if method == 'DELETE': if method in ('DELETE', 'POST'):
if content_length not in (None, 0): if content_length not in (None, 0):
raise Exception( raise Exception(
'DELETE subrequest with content-length %s' % path) '%s subrequest with content-length %s'
% (method, path))
elif method == 'PUT': elif method == 'PUT':
if content_length is None: if content_length is None:
raise Exception( raise Exception(

View File

@ -20,6 +20,48 @@ from swift.common import exceptions
from swift.common import http from swift.common import http
def encode_missing(object_hash, ts_data, ts_meta=None):
"""
Returns a string representing the object hash, its data file timestamp
and the delta forwards to its metafile timestamp, if non-zero, in the form:
``<hash> <timestamp> m:<hex delta>``
The decoder for this line is
:py:func:`~swift.obj.ssync_receiver.decode_missing`
"""
msg = '%s %s' % (urllib.quote(object_hash), urllib.quote(ts_data.internal))
if ts_meta and ts_meta != ts_data:
delta = ts_meta.raw - ts_data.raw
msg = '%s m:%x' % (msg, delta)
return msg
def decode_wanted(parts):
"""
Parse missing_check line parts to determine which parts of local
diskfile were wanted by the receiver.
The encoder for parts is
:py:func:`~swift.obj.ssync_receiver.encode_wanted`
"""
wanted = {}
key_map = dict(d='data', m='meta')
if parts:
# receiver specified data and/or meta wanted, so use those as
# conditions for sending PUT and/or POST subrequests
for k in key_map:
if k in parts[0]:
wanted[key_map[k]] = True
if not wanted:
# assume legacy receiver which will only accept PUTs. There is no
# way to send any meta file content without morphing the timestamp
# of either the data or the metadata, so we just send data file
# content to a legacy receiver. Once the receiver gets updated we
# will be able to send it the meta file content.
wanted['data'] = True
return wanted
class Sender(object): class Sender(object):
""" """
Sends SSYNC requests to the object server. Sends SSYNC requests to the object server.
@ -40,14 +82,15 @@ class Sender(object):
self.response_buffer = '' self.response_buffer = ''
self.response_chunk_left = 0 self.response_chunk_left = 0
# available_map has an entry for each object in given suffixes that # available_map has an entry for each object in given suffixes that
# is available to be sync'd; each entry is a hash => timestamp # is available to be sync'd; each entry is a hash => dict of timestamps
# of data file or tombstone file and/or meta file
self.available_map = {} self.available_map = {}
# When remote_check_objs is given in job, ssync_sender trys only to # When remote_check_objs is given in job, ssync_sender trys only to
# make sure those objects exist or not in remote. # make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs self.remote_check_objs = remote_check_objs
# send_list has an entry for each object that the receiver wants to # send_map has an entry for each object that the receiver wants to
# be sync'ed; each entry is an object hash # be sync'ed; each entry maps an object hash => dict of wanted parts
self.send_list = [] self.send_map = {}
self.failures = 0 self.failures = 0
def __call__(self): def __call__(self):
@ -57,7 +100,8 @@ class Sender(object):
:returns: a 2-tuple, in the form (success, can_delete_objs) where :returns: a 2-tuple, in the form (success, can_delete_objs) where
success is a boolean and can_delete_objs is the map of success is a boolean and can_delete_objs is the map of
objects that are in sync with the receiver. Each entry in objects that are in sync with the receiver. Each entry in
can_delete_objs maps a hash => timestamp can_delete_objs maps a hash => timestamp of data file or
tombstone file
""" """
if not self.suffixes: if not self.suffixes:
return True, {} return True, {}
@ -79,7 +123,7 @@ class Sender(object):
# *send* any requested updates; instead we only collect # *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion # what's already in sync and safe for deletion
in_sync_hashes = (set(self.available_map.keys()) - in_sync_hashes = (set(self.available_map.keys()) -
set(self.send_list)) set(self.send_map.keys()))
can_delete_obj = dict((hash_, self.available_map[hash_]) can_delete_obj = dict((hash_, self.available_map[hash_])
for hash_ in in_sync_hashes) for hash_ in in_sync_hashes)
if not self.failures: if not self.failures:
@ -220,17 +264,15 @@ class Sender(object):
frag_index=self.job.get('frag_index')) frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None: if self.remote_check_objs is not None:
hash_gen = ifilter( hash_gen = ifilter(
lambda path_objhash_timestamp: lambda path_objhash_timestamps:
path_objhash_timestamp[1] in path_objhash_timestamps[1] in
self.remote_check_objs, hash_gen) self.remote_check_objs, hash_gen)
for path, object_hash, timestamp in hash_gen: for path, object_hash, timestamps in hash_gen:
self.available_map[object_hash] = timestamp self.available_map[object_hash] = timestamps
with exceptions.MessageTimeout( with exceptions.MessageTimeout(
self.daemon.node_timeout, self.daemon.node_timeout,
'missing_check send line'): 'missing_check send line'):
msg = '%s %s\r\n' % ( msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
urllib.quote(object_hash),
urllib.quote(timestamp))
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout( with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'): self.daemon.node_timeout, 'missing_check end'):
@ -260,7 +302,7 @@ class Sender(object):
break break
parts = line.split() parts = line.split()
if parts: if parts:
self.send_list.append(parts[0]) self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self): def updates(self):
""" """
@ -270,12 +312,13 @@ class Sender(object):
Full documentation of this can be found at Full documentation of this can be found at
:py:meth:`.Receiver.updates`. :py:meth:`.Receiver.updates`.
""" """
# First, send all our subrequests based on the send_list. # First, send all our subrequests based on the send_map.
with exceptions.MessageTimeout( with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates start'): self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n' msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash in self.send_list: for object_hash, want in self.send_map.items():
object_hash = urllib.unquote(object_hash)
try: try:
df = self.df_mgr.get_diskfile_from_hash( df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash, self.job['device'], self.job['partition'], object_hash,
@ -286,16 +329,21 @@ class Sender(object):
'/%s/%s/%s' % (df.account, df.container, df.obj)) '/%s/%s/%s' % (df.account, df.container, df.obj))
try: try:
df.open() df.open()
# EC reconstructor may have passed a callback to build if want.get('data'):
# an alternative diskfile... # EC reconstructor may have passed a callback to build an
df = self.job.get('sync_diskfile_builder', lambda *args: df)( # alternative diskfile - construct it using the metadata
self.job, self.node, df.get_metadata()) # from the data file only.
df_alt = self.job.get(
'sync_diskfile_builder', lambda *args: df)(
self.job, self.node, df.get_datafile_metadata())
self.send_put(url_path, df_alt)
if want.get('meta') and df.data_timestamp != df.timestamp:
self.send_post(url_path, df)
except exceptions.DiskFileDeleted as err: except exceptions.DiskFileDeleted as err:
self.send_delete(url_path, err.timestamp) if want.get('data'):
self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError: except exceptions.DiskFileError:
pass pass
else:
self.send_put(url_path, df)
with exceptions.MessageTimeout( with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates end'): self.daemon.node_timeout, 'updates end'):
msg = ':UPDATES: END\r\n' msg = ':UPDATES: END\r\n'
@ -343,7 +391,7 @@ class Sender(object):
""" """
msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)] msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)]
# Sorted to make it easier to test. # Sorted to make it easier to test.
for key, value in sorted(df.get_metadata().items()): for key, value in sorted(df.get_datafile_metadata().items()):
if key not in ('name', 'Content-Length'): if key not in ('name', 'Content-Length'):
msg.append('%s: %s' % (key, value)) msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n' msg = '\r\n'.join(msg) + '\r\n\r\n'
@ -354,6 +402,19 @@ class Sender(object):
self.daemon.node_timeout, 'send_put chunk'): self.daemon.node_timeout, 'send_put chunk'):
self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk)) self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
def send_post(self, url_path, df):
metadata = df.get_metafile_metadata()
if metadata is None:
return
msg = ['POST ' + url_path]
# Sorted to make it easier to test.
for key, value in sorted(metadata.items()):
msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n'
with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_post'):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
def disconnect(self): def disconnect(self):
""" """
Closes down the connection to the object server once done Closes down the connection to the object server once done

View File

@ -16,54 +16,24 @@
from io import StringIO from io import StringIO
from tempfile import mkdtemp from tempfile import mkdtemp
from textwrap import dedent from textwrap import dedent
import functools
import unittest import unittest
import os import os
import shutil import shutil
import uuid import uuid
from swift.common.exceptions import DiskFileDeleted
from swift.container.backend import ContainerBroker
from swift.common import internal_client, utils from swift.common import internal_client, utils
from swift.common.ring import Ring
from swift.common.utils import Timestamp, get_logger
from swift.obj.diskfile import DiskFileManager
from swift.common.storage_policy import POLICIES
from test.probe.brain import BrainSplitter from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest from test.probe.common import ReplProbeTest
def _sync_methods(object_server_config_paths):
"""
Get the set of all configured sync_methods for the object-replicator
sections in the list of config paths.
"""
sync_methods = set()
for config_path in object_server_config_paths:
options = utils.readconf(config_path, 'object-replicator')
sync_methods.add(options.get('sync_method', 'rsync'))
return sync_methods
def expected_failure_with_ssync(m):
"""
Wrapper for probetests that don't pass if you use ssync
"""
@functools.wraps(m)
def wrapper(self, *args, **kwargs):
obj_conf = self.configs['object-server']
config_paths = [v for k, v in obj_conf.items()
if k in self.brain.handoff_numbers]
using_ssync = 'ssync' in _sync_methods(config_paths)
failed = False
try:
return m(self, *args, **kwargs)
except AssertionError:
failed = True
if not using_ssync:
raise
finally:
if using_ssync and not failed:
self.fail('This test is expected to fail with ssync')
return wrapper
class Test(ReplProbeTest): class Test(ReplProbeTest):
def setUp(self): def setUp(self):
""" """
@ -102,9 +72,97 @@ class Test(ReplProbeTest):
super(Test, self).tearDown() super(Test, self).tearDown()
shutil.rmtree(self.tempdir) shutil.rmtree(self.tempdir)
def _put_object(self, headers=None): def _get_object_info(self, account, container, obj, number,
policy=None):
obj_conf = self.configs['object-server']
config_path = obj_conf[number]
options = utils.readconf(config_path, 'app:object-server')
swift_dir = options.get('swift_dir', '/etc/swift')
ring = POLICIES.get_object_ring(policy, swift_dir)
part, nodes = ring.get_nodes(account, container, obj)
for node in nodes:
# assumes one to one mapping
if node['port'] == int(options.get('bind_port')):
device = node['device']
break
else:
return None
mgr = DiskFileManager(options, get_logger(options))
disk_file = mgr.get_diskfile(device, part, account, container, obj,
policy)
info = disk_file.read_metadata()
return info
def _assert_consistent_object_metadata(self):
obj_info = []
for i in range(1, 5):
info_i = self._get_object_info(self.account, self.container_name,
self.object_name, i)
if info_i:
obj_info.append(info_i)
self.assertTrue(len(obj_info) > 1)
for other in obj_info[1:]:
self.assertEqual(obj_info[0], other,
'Object metadata mismatch: %s != %s'
% (obj_info[0], other))
def _assert_consistent_deleted_object(self):
for i in range(1, 5):
try:
info = self._get_object_info(self.account, self.container_name,
self.object_name, i)
if info is not None:
self.fail('Expected no disk file info but found %s' % info)
except DiskFileDeleted:
pass
def _get_db_info(self, account, container, number):
server_type = 'container'
obj_conf = self.configs['%s-server' % server_type]
config_path = obj_conf[number]
options = utils.readconf(config_path, 'app:container-server')
root = options.get('devices')
swift_dir = options.get('swift_dir', '/etc/swift')
ring = Ring(swift_dir, ring_name=server_type)
part, nodes = ring.get_nodes(account, container)
for node in nodes:
# assumes one to one mapping
if node['port'] == int(options.get('bind_port')):
device = node['device']
break
else:
return None
path_hash = utils.hash_path(account, container)
_dir = utils.storage_directory('%ss' % server_type, part, path_hash)
db_dir = os.path.join(root, device, _dir)
db_file = os.path.join(db_dir, '%s.db' % path_hash)
db = ContainerBroker(db_file)
return db.get_info()
def _assert_consistent_container_dbs(self):
db_info = []
for i in range(1, 5):
info_i = self._get_db_info(self.account, self.container_name, i)
if info_i:
db_info.append(info_i)
self.assertTrue(len(db_info) > 1)
for other in db_info[1:]:
self.assertEqual(db_info[0]['hash'], other['hash'],
'Container db hash mismatch: %s != %s'
% (db_info[0]['hash'], other['hash']))
def _assert_object_metadata_matches_listing(self, listing, metadata):
self.assertEqual(listing['bytes'], int(metadata['content-length']))
self.assertEqual(listing['hash'], metadata['etag'])
self.assertEqual(listing['content_type'], metadata['content-type'])
modified = Timestamp(metadata['x-timestamp']).isoformat
self.assertEqual(listing['last_modified'], modified)
def _put_object(self, headers=None, body=u'stuff'):
headers = headers or {} headers = headers or {}
self.int_client.upload_object(StringIO(u'stuff'), self.account, self.int_client.upload_object(StringIO(body), self.account,
self.container_name, self.container_name,
self.object_name, headers) self.object_name, headers)
@ -171,7 +229,96 @@ class Test(ReplProbeTest):
self.brain.stop_handoff_half() self.brain.stop_handoff_half()
self._get_object() self._get_object()
@expected_failure_with_ssync def test_object_after_replication_with_subsequent_post(self):
self.brain.put_container(policy_index=0)
# put object
self._put_object(headers={'Content-Type': 'foo'}, body=u'older')
# put newer object to first server subset
self.brain.stop_primary_half()
self._put_object(headers={'Content-Type': 'bar'}, body=u'newer')
metadata = self._get_object_metadata()
etag = metadata['etag']
self.brain.start_primary_half()
# post some user meta to all servers
self._post_object({'x-object-meta-bar': 'meta-bar'})
# run replicator
self.get_to_final_state()
# check that newer data has been replicated to second server subset
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
self.assertEqual(etag, metadata['etag'])
self.assertEqual('bar', metadata['content-type'])
self.assertEqual('meta-bar', metadata['x-object-meta-bar'])
self.brain.start_handoff_half()
self._assert_consistent_object_metadata()
self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_put(self):
sysmeta = {'x-object-sysmeta-foo': 'older'}
sysmeta2 = {'x-object-sysmeta-foo': 'newer'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
self.brain.put_container(policy_index=0)
# put object with sysmeta to first server subset
self.brain.stop_primary_half()
self._put_object(headers=sysmeta)
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
self.brain.start_primary_half()
# put object with updated sysmeta to second server subset
self.brain.stop_handoff_half()
self._put_object(headers=sysmeta2)
metadata = self._get_object_metadata()
for key in sysmeta2:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta2[key])
self._post_object(usermeta)
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
for key in sysmeta2:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta2[key])
self.brain.start_handoff_half()
# run replicator
self.get_to_final_state()
# check sysmeta has been replicated to first server subset
self.brain.stop_primary_half()
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
for key in sysmeta2.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], sysmeta2[key])
self.brain.start_primary_half()
# check user sysmeta ok on second server subset
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
for key in sysmeta2.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], sysmeta2[key])
self._assert_consistent_object_metadata()
self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_post(self): def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'} sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'} usermeta = {'x-object-meta-bar': 'meta-bar'}
@ -218,6 +365,8 @@ class Test(ReplProbeTest):
for key in expected.keys(): for key in expected.keys():
self.assertTrue(key in metadata, key) self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key]) self.assertEqual(metadata[key], expected[key])
self._assert_consistent_object_metadata()
self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_prior_post(self): def test_sysmeta_after_replication_with_prior_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'} sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
@ -267,6 +416,8 @@ class Test(ReplProbeTest):
self.assertEqual(metadata[key], sysmeta[key]) self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta: for key in usermeta:
self.assertFalse(key in metadata) self.assertFalse(key in metadata)
self._assert_consistent_object_metadata()
self._assert_consistent_container_dbs()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -405,6 +405,44 @@ class TestTimestamp(unittest.TestCase):
'%r is not bigger than %f given %r' % ( '%r is not bigger than %f given %r' % (
timestamp, float(normal), value)) timestamp, float(normal), value))
def test_raw(self):
expected = 140243640891203
timestamp = utils.Timestamp(1402436408.91203)
self.assertEqual(expected, timestamp.raw)
# 'raw' does not include offset
timestamp = utils.Timestamp(1402436408.91203, 0xf0)
self.assertEqual(expected, timestamp.raw)
def test_delta(self):
def _assertWithinBounds(expected, timestamp):
tolerance = 0.00001
minimum = expected - tolerance
maximum = expected + tolerance
self.assertTrue(float(timestamp) > minimum)
self.assertTrue(float(timestamp) < maximum)
timestamp = utils.Timestamp(1402436408.91203, delta=100)
_assertWithinBounds(1402436408.91303, timestamp)
self.assertEqual(140243640891303, timestamp.raw)
timestamp = utils.Timestamp(1402436408.91203, delta=-100)
_assertWithinBounds(1402436408.91103, timestamp)
self.assertEqual(140243640891103, timestamp.raw)
timestamp = utils.Timestamp(1402436408.91203, delta=0)
_assertWithinBounds(1402436408.91203, timestamp)
self.assertEqual(140243640891203, timestamp.raw)
# delta is independent of offset
timestamp = utils.Timestamp(1402436408.91203, offset=42, delta=100)
self.assertEqual(140243640891303, timestamp.raw)
self.assertEqual(42, timestamp.offset)
# cannot go negative
self.assertRaises(ValueError, utils.Timestamp, 1402436408.91203,
delta=-140243640891203)
def test_int(self): def test_int(self):
expected = 1402437965 expected = 1402437965
test_values = ( test_values = (

View File

@ -33,12 +33,13 @@ from shutil import rmtree
from time import time from time import time
from tempfile import mkdtemp from tempfile import mkdtemp
from hashlib import md5 from hashlib import md5
from contextlib import closing, nested from contextlib import closing, nested, contextmanager
from gzip import GzipFile from gzip import GzipFile
from eventlet import hubs, timeout, tpool from eventlet import hubs, timeout, tpool
from test.unit import (FakeLogger, mock as unit_mock, temptree, from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger, EMPTY_ETAG) patch_policies, debug_logger, EMPTY_ETAG,
make_timestamp_iter)
from nose import SkipTest from nose import SkipTest
from swift.obj import diskfile from swift.obj import diskfile
@ -921,9 +922,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
return files return files
self.fail('Unexpected listdir of %r' % path) self.fail('Unexpected listdir of %r' % path)
expected_items = [ expected_items = [
(os.path.join(part_path, hash_[-3:], hash_), hash_, (os.path.join(part_path, hash_[-3:], hash_), hash_, timestamps)
Timestamp(ts).internal) for hash_, timestamps in expected.items()]
for hash_, ts in expected.items()]
with nested( with nested(
mock.patch('os.listdir', _listdir), mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')): mock.patch('os.unlink')):
@ -932,8 +932,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
device, part, policy, **kwargs)) device, part, policy, **kwargs))
expected = sorted(expected_items) expected = sorted(expected_items)
actual = sorted(hash_items) actual = sorted(hash_items)
self.assertEqual(actual, expected, # default list diff easiest to debug
'Expected %s but got %s' % (expected, actual)) self.assertEqual(actual, expected)
def test_yield_hashes_tombstones(self): def test_yield_hashes_tombstones(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
@ -965,9 +965,9 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
} }
} }
expected = { expected = {
'1111111111111111111111111111127e': ts1.internal, '1111111111111111111111111111127e': {'ts_data': ts1.internal},
'2222222222222222222222222222227e': ts2.internal, '2222222222222222222222222222227e': {'ts_data': ts2.internal},
'3333333333333333333333333333300b': ts3.internal, '3333333333333333333333333333300b': {'ts_data': ts3.internal},
} }
for policy in POLICIES: for policy in POLICIES:
self._check_yield_hashes(policy, suffix_map, expected, self._check_yield_hashes(policy, suffix_map, expected,
@ -1084,9 +1084,9 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {}, 'def': {},
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b4abc': fresh_ts, '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
'9373a92d072897b136b3fc06595b0456': old_ts, '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': fresher_ts, '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected) self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@ -1097,24 +1097,30 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts3 = next(ts_iter) ts3 = next(ts_iter)
suffix_map = { suffix_map = {
'abc': { 'abc': {
'9373a92d072897b136b3fc06595b4abc': [ # only tombstone is yield/sync -able
'9333a92d072897b136b3fc06595b4abc': [
ts1.internal + '.ts', ts1.internal + '.ts',
ts2.internal + '.meta'], ts2.internal + '.meta'],
}, },
'456': { '456': {
'9373a92d072897b136b3fc06595b0456': [ # only latest metadata timestamp
'9444a92d072897b136b3fc06595b0456': [
ts1.internal + '.data', ts1.internal + '.data',
ts2.internal + '.meta', ts2.internal + '.meta',
ts3.internal + '.meta'], ts3.internal + '.meta'],
'9373a92d072897b136b3fc06595b7456': [ # exemplary datadir with .meta
'9555a92d072897b136b3fc06595b7456': [
ts1.internal + '.data', ts1.internal + '.data',
ts2.internal + '.meta'], ts2.internal + '.meta'],
}, },
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b4abc': ts2, '9333a92d072897b136b3fc06595b4abc':
'9373a92d072897b136b3fc06595b0456': ts3, {'ts_data': ts1},
'9373a92d072897b136b3fc06595b7456': ts2, '9444a92d072897b136b3fc06595b0456':
{'ts_data': ts1, 'ts_meta': ts3},
'9555a92d072897b136b3fc06595b7456':
{'ts_data': ts1, 'ts_meta': ts2},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected) self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@ -1138,8 +1144,8 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {}, 'def': {},
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': old_ts, '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': fresher_ts, '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456']) suffixes=['456'])
@ -1156,7 +1162,7 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}, },
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts1, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
} }
try: try:
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
@ -1369,6 +1375,27 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_get_ondisk_files_with_stray_meta(self): def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file # get_ondisk_files does not tolerate a stray .meta file
class_under_test = self._get_diskfile(POLICIES.default)
@contextmanager
def create_files(df, files):
os.makedirs(df._datadir)
for fname in files:
fpath = os.path.join(df._datadir, fname)
with open(fpath, 'w') as f:
diskfile.write_metadata(f, {'name': df._name,
'Content-Length': 0})
yield
rmtree(df._datadir, ignore_errors=True)
# sanity
files = [
'0000000006.00000#1.data',
'0000000006.00000.durable',
]
with create_files(class_under_test, files):
class_under_test.open()
scenarios = [['0000000007.00000.meta'], scenarios = [['0000000007.00000.meta'],
['0000000007.00000.meta', ['0000000007.00000.meta',
@ -1382,8 +1409,13 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'0000000005.00000#1.data'] '0000000005.00000#1.data']
] ]
for files in scenarios: for files in scenarios:
class_under_test = self._get_diskfile(POLICIES.default) with create_files(class_under_test, files):
self.assertRaises(DiskFileNotExist, class_under_test.open) try:
class_under_test.open()
except DiskFileNotExist:
continue
self.fail('expected DiskFileNotExist opening %s with %r' % (
class_under_test.__class__.__name__, files))
def test_parse_on_disk_filename(self): def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default] mgr = self.df_router[POLICIES.default]
@ -1550,9 +1582,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {}, 'def': {},
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b4abc': fresh_ts, '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
'9373a92d072897b136b3fc06595b0456': old_ts, '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': fresher_ts, '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2) frag_index=2)
@ -1581,22 +1613,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}, },
} }
expected = { expected = {
# TODO: differs from repl DiskFileManager which *will* '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
# return meta timestamp when only meta and ts on disk '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'9373a92d072897b136b3fc06595b4abc': ts1, 'ts_meta': ts3},
'9373a92d072897b136b3fc06595b0456': ts3, '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
'9373a92d072897b136b3fc06595b7456': ts2, 'ts_meta': ts2},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected) self._check_yield_hashes(POLICIES.default, suffix_map, expected)
# but meta timestamp is not returned if specified frag index # but meta timestamp is *not* returned if specified frag index
# is not found # is not found
expected = { expected = {
# TODO: differs from repl DiskFileManager which *will* '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
# return meta timestamp when only meta and ts on disk
'9373a92d072897b136b3fc06595b4abc': ts1,
'9373a92d072897b136b3fc06595b0456': ts3,
'9373a92d072897b136b3fc06595b7456': ts2,
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=3) frag_index=3)
@ -1623,8 +1651,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {}, 'def': {},
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': old_ts, '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': fresher_ts, '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456'], frag_index=2) suffixes=['456'], frag_index=2)
@ -1642,7 +1670,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}, },
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts1, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2) frag_index=2)
@ -1651,8 +1679,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append( suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append(
ts1.internal + '.durable') ts1.internal + '.durable')
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts1, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b7456': ts1, '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2) frag_index=2)
@ -1672,7 +1700,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}, },
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts1, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None) frag_index=None)
@ -1683,7 +1711,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append( suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append(
ts2.internal + '.durable') ts2.internal + '.durable')
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts2, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None) frag_index=None)
@ -1698,27 +1726,40 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts2 = next(ts_iter) ts2 = next(ts_iter)
suffix_map = { suffix_map = {
'456': { '456': {
'9373a92d072897b136b3fc06595b0456': [ # this one is fine
'9333a92d072897b136b3fc06595b0456': [
ts1.internal + '#2.data', ts1.internal + '#2.data',
ts1.internal + '.durable'], ts1.internal + '.durable'],
'9373a92d072897b136b3fc06595b7456': [ # missing frag index
'9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'], ts1.internal + '.data'],
'9373a92d072897b136b3fc06595b8456': [ '9555a92d072897b136b3fc06595b8456': [
'junk_file'], 'junk_file'],
'9373a92d072897b136b3fc06595b9456': [ # missing .durable
ts1.internal + '.data', '9666a92d072897b136b3fc06595b9456': [
ts1.internal + '#2.data',
ts2.internal + '.meta'], ts2.internal + '.meta'],
'9373a92d072897b136b3fc06595ba456': [ # .meta files w/o .data files can't be opened, and are ignored
'9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'], ts1.internal + '.meta'],
'9373a92d072897b136b3fc06595bb456': [ '9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta', ts1.internal + '.meta',
ts2.internal + '.meta'], ts2.internal + '.meta'],
# this is good with meta
'9999a92d072897b136b3fc06595bb456': [
ts1.internal + '#2.data',
ts1.internal + '.durable',
ts2.internal + '.meta'],
# this one is wrong frag index
'9aaaa92d072897b136b3fc06595b0456': [
ts1.internal + '#7.data',
ts1.internal + '.durable'],
}, },
} }
expected = { expected = {
'9373a92d072897b136b3fc06595b0456': ts1, '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595ba456': ts1, '9999a92d072897b136b3fc06595bb456': {'ts_data': ts1,
'9373a92d072897b136b3fc06595bb456': ts2, 'ts_meta': ts2},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2) frag_index=2)
@ -1758,9 +1799,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}, },
} }
expected = { expected = {
'1111111111111111111111111111127e': ts1, '1111111111111111111111111111127e': {'ts_data': ts1},
'2222222222222222222222222222227e': ts2, '2222222222222222222222222222227e': {'ts_data': ts2},
'3333333333333333333333333333300b': ts3, '3333333333333333333333333333300b': {'ts_data': ts3},
} }
self._check_yield_hashes(POLICIES.default, suffix_map, expected, self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2) frag_index=2)
@ -1976,17 +2017,62 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metadata_not_opened(self): def test_get_metadata_not_opened(self):
df = self._simple_get_diskfile() df = self._simple_get_diskfile()
self.assertRaises(DiskFileNotOpen, df.get_metadata) with self.assertRaises(DiskFileNotOpen):
df.get_metadata()
def test_get_datafile_metadata(self):
ts_iter = make_timestamp_iter()
body = '1234567890'
ts_data = ts_iter.next()
metadata = {'X-Object-Meta-Test': 'test1',
'X-Object-Sysmeta-Test': 'test1'}
df = self._create_test_file(body, timestamp=ts_data.internal,
metadata=metadata)
expected = df.get_metadata()
ts_meta = ts_iter.next()
df.write_metadata({'X-Timestamp': ts_meta.internal,
'X-Object-Meta-Test': 'changed',
'X-Object-Sysmeta-Test': 'ignored'})
df.open()
self.assertEqual(expected, df.get_datafile_metadata())
expected.update({'X-Timestamp': ts_meta.internal,
'X-Object-Meta-Test': 'changed'})
self.assertEqual(expected, df.get_metadata())
def test_get_datafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotOpen):
df.get_datafile_metadata()
def test_get_metafile_metadata(self):
ts_iter = make_timestamp_iter()
body = '1234567890'
ts_data = ts_iter.next()
metadata = {'X-Object-Meta-Test': 'test1',
'X-Object-Sysmeta-Test': 'test1'}
df = self._create_test_file(body, timestamp=ts_data.internal,
metadata=metadata)
self.assertIsNone(df.get_metafile_metadata())
# now create a meta file
ts_meta = ts_iter.next()
df.write_metadata({'X-Timestamp': ts_meta.internal,
'X-Object-Meta-Test': 'changed'})
df.open()
expected = {'X-Timestamp': ts_meta.internal,
'X-Object-Meta-Test': 'changed'}
self.assertEqual(expected, df.get_metafile_metadata())
def test_get_metafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotOpen):
df.get_metafile_metadata()
def test_not_opened(self): def test_not_opened(self):
df = self._simple_get_diskfile() df = self._simple_get_diskfile()
try: with self.assertRaises(DiskFileNotOpen):
with df: with df:
pass pass
except DiskFileNotOpen:
pass
else:
self.fail("Expected DiskFileNotOpen exception")
def test_disk_file_default_disallowed_metadata(self): def test_disk_file_default_disallowed_metadata(self):
# build an object with some meta (at t0+1s) # build an object with some meta (at t0+1s)
@ -3078,11 +3164,31 @@ class DiskFileMixin(BaseDiskFileTestMixin):
self.assertEqual(str(exc), '') self.assertEqual(str(exc), '')
def test_diskfile_timestamp(self): def test_diskfile_timestamp(self):
ts = Timestamp(time()) ts_1 = self.ts()
self._get_open_disk_file(ts=ts.internal) self._get_open_disk_file(ts=ts_1.internal)
df = self._simple_get_diskfile() df = self._simple_get_diskfile()
with df.open(): with df.open():
self.assertEqual(df.timestamp, ts.internal) self.assertEqual(df.timestamp, ts_1.internal)
ts_2 = self.ts()
df.write_metadata({'X-Timestamp': ts_2.internal})
with df.open():
self.assertEqual(df.timestamp, ts_2.internal)
def test_data_timestamp(self):
ts_1 = self.ts()
self._get_open_disk_file(ts=ts_1.internal)
df = self._simple_get_diskfile()
with df.open():
self.assertEqual(df.data_timestamp, ts_1.internal)
ts_2 = self.ts()
df.write_metadata({'X-Timestamp': ts_2.internal})
with df.open():
self.assertEqual(df.data_timestamp, ts_1.internal)
def test_data_timestamp_not_open(self):
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotOpen):
df.data_timestamp
def test_error_in_hash_cleanup_listdir(self): def test_error_in_hash_cleanup_listdir(self):

View File

@ -840,8 +840,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.job['policy'], self.suffixes, self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index')) frag_index=self.job.get('frag_index'))
self.available_map = {} self.available_map = {}
for path, hash_, ts in hash_gen: for path, hash_, timestamps in hash_gen:
self.available_map[hash_] = ts self.available_map[hash_] = timestamps
context['available_map'] = self.available_map context['available_map'] = self.available_map
ssync_calls.append(context) ssync_calls.append(context)
@ -2403,7 +2403,7 @@ class TestObjectReconstructor(unittest.TestCase):
} }
def ssync_response_callback(*args): def ssync_response_callback(*args):
return True, {ohash: ts} return True, {ohash: {'ts_data': ts}}
ssync_calls = [] ssync_calls = []
with mock_ssync_sender(ssync_calls, with mock_ssync_sender(ssync_calls,
@ -2459,7 +2459,7 @@ class TestObjectReconstructor(unittest.TestCase):
} }
def ssync_response_callback(*args): def ssync_response_callback(*args):
return True, {ohash: ts} return True, {ohash: {'ts_data': ts}}
ssync_calls = [] ssync_calls = []
with mock_ssync_sender(ssync_calls, with mock_ssync_sender(ssync_calls,

View File

@ -44,7 +44,8 @@ from nose import SkipTest
from swift import __version__ as swift_version from swift import __version__ as swift_version
from swift.common.http import is_success from swift.common.http import is_success
from test.unit import FakeLogger, debug_logger, mocked_http_conn from test.unit import FakeLogger, debug_logger, mocked_http_conn, \
make_timestamp_iter
from test.unit import connect_tcp, readuntil2crlfs, patch_policies from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server from swift.obj import server as object_server
from swift.obj import diskfile from swift.obj import diskfile
@ -339,6 +340,41 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.status_int, 409) self.assertEqual(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp) self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
def test_POST_conflicts_with_later_POST(self):
ts_iter = make_timestamp_iter()
t_put = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': t_put,
'Content-Length': 0,
'Content-Type': 'plain/text'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
t_post1 = ts_iter.next().internal
t_post2 = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': t_post2})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': t_post1})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 409)
obj_dir = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')))
ts_file = os.path.join(obj_dir, t_post2 + '.meta')
self.assertTrue(os.path.isfile(ts_file))
meta_file = os.path.join(obj_dir, t_post1 + '.meta')
self.assertFalse(os.path.isfile(meta_file))
def test_POST_not_exist(self): def test_POST_not_exist(self):
timestamp = normalize_timestamp(time()) timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/fail', req = Request.blank('/sda1/p/a/c/fail',
@ -1082,6 +1118,44 @@ class TestObjectController(unittest.TestCase):
'X-Object-Sysmeta-1': 'One', 'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'}) 'X-Object-Sysmeta-Two': 'Two'})
def test_PUT_succeeds_with_later_POST(self):
ts_iter = make_timestamp_iter()
t_put = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': t_put,
'Content-Length': 0,
'Content-Type': 'plain/text'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
t_put2 = ts_iter.next().internal
t_post = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': t_post})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': t_put2,
'Content-Length': 0,
'Content-Type': 'plain/text'},
)
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
obj_dir = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')))
ts_file = os.path.join(obj_dir, t_put2 + '.data')
self.assertTrue(os.path.isfile(ts_file))
meta_file = os.path.join(obj_dir, t_post + '.meta')
self.assertTrue(os.path.isfile(meta_file))
def test_POST_system_metadata(self): def test_POST_system_metadata(self):
# check that diskfile sysmeta is not changed by a POST # check that diskfile sysmeta is not changed by a POST
timestamp1 = normalize_timestamp(time()) timestamp1 = normalize_timestamp(time())
@ -2394,6 +2468,42 @@ class TestObjectController(unittest.TestCase):
self.assertTrue(os.path.isfile(ts_1003_file)) self.assertTrue(os.path.isfile(ts_1003_file))
self.assertEqual(len(os.listdir(os.path.dirname(ts_1003_file))), 1) self.assertEqual(len(os.listdir(os.path.dirname(ts_1003_file))), 1)
def test_DELETE_succeeds_with_later_POST(self):
ts_iter = make_timestamp_iter()
t_put = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': t_put,
'Content-Length': 0,
'Content-Type': 'plain/text'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
t_delete = ts_iter.next().internal
t_post = ts_iter.next().internal
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': t_post})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': t_delete},
)
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 204)
obj_dir = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')))
ts_file = os.path.join(obj_dir, t_delete + '.ts')
self.assertTrue(os.path.isfile(ts_file))
meta_file = os.path.join(obj_dir, t_post + '.meta')
self.assertTrue(os.path.isfile(meta_file))
def test_DELETE_container_updates(self): def test_DELETE_container_updates(self):
# Test swift.obj.server.ObjectController.DELETE and container # Test swift.obj.server.ObjectController.DELETE and container
# updates, making sure container update is called in the correct # updates, making sure container update is called in the correct

View File

@ -36,7 +36,7 @@ from swift.obj import ssync_receiver, ssync_sender
from swift.obj.reconstructor import ObjectReconstructor from swift.obj.reconstructor import ObjectReconstructor
from test import unit from test import unit
from test.unit import debug_logger, patch_policies from test.unit import debug_logger, patch_policies, make_timestamp_iter
@unit.patch_policies() @unit.patch_policies()
@ -611,8 +611,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash1, self.hash1 + ' dm',
self.hash2, self.hash2 + ' dm',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
@ -637,8 +637,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash1, self.hash1 + ' dm',
self.hash2, self.hash2 + ' dm',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
@ -670,7 +670,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash2, self.hash2 + ' dm',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
@ -706,7 +706,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash2, self.hash2 + ' dm',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
@ -740,14 +740,14 @@ class TestReceiver(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash2, self.hash2 + ' dm',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_have_one_older(self): def test_MISSING_CHECK_have_newer_meta(self):
object_dir = utils.storage_directory( object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])), diskfile.get_data_dir(POLICIES[0])),
@ -760,22 +760,67 @@ class TestReceiver(unittest.TestCase):
fp.flush() fp.flush()
self.metadata1['Content-Length'] = '1' self.metadata1['Content-Length'] = '1'
diskfile.write_metadata(fp, self.metadata1) diskfile.write_metadata(fp, self.metadata1)
# write newer .meta file
metadata = {'name': self.name1, 'X-Timestamp': self.ts2,
'X-Object-Meta-Test': 'test'}
fp = open(os.path.join(object_dir, self.ts2 + '.meta'), 'w+')
diskfile.write_metadata(fp, metadata)
# receiver has .data at older_ts, .meta at ts2
# sender has .data at ts1
self.controller.logger = mock.MagicMock() self.controller.logger = mock.MagicMock()
req = swob.Request.blank( req = swob.Request.blank(
'/sda1/1', '/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC'}, environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' + body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' + self.hash1 + ' ' + self.ts1 + '\r\n'
self.hash2 + ' ' + self.ts2 + '\r\n'
':MISSING_CHECK: END\r\n' ':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n') ':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller) resp = req.get_response(self.controller)
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
[':MISSING_CHECK: START', [':MISSING_CHECK: START',
self.hash1, self.hash1 + ' d',
self.hash2, ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_have_older_meta(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1)
self.metadata1['X-Timestamp'] = older_ts1
fp = open(os.path.join(object_dir, older_ts1 + '.data'), 'w+')
fp.write('1')
fp.flush()
self.metadata1['Content-Length'] = '1'
diskfile.write_metadata(fp, self.metadata1)
# write .meta file at ts1
metadata = {'name': self.name1, 'X-Timestamp': self.ts1,
'X-Object-Meta-Test': 'test'}
fp = open(os.path.join(object_dir, self.ts1 + '.meta'), 'w+')
diskfile.write_metadata(fp, metadata)
# receiver has .data at older_ts, .meta at ts1
# sender has .data at older_ts, .meta at ts2
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + older_ts1 + ' m:30d40\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
self.hash1 + ' m',
':MISSING_CHECK: END', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END']) ':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
@ -1303,6 +1348,46 @@ class TestReceiver(unittest.TestCase):
actual = df.get_metadata() actual = df.get_metadata()
self.assertEqual(expected, actual) self.assertEqual(expected, actual)
def test_UPDATES_POST(self):
_POST_request = [None]
@server.public
def _POST(request):
_POST_request[0] = request
return swob.HTTPAccepted()
with mock.patch.object(self.controller, 'POST', _POST):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'POST /a/c/o\r\n'
'X-Timestamp: 1364456113.12344\r\n'
'X-Object-Meta-Test1: one\r\n'
'Specialty-Header: value\r\n\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
req = _POST_request[0]
self.assertEqual(req.path, '/device/partition/a/c/o')
self.assertEqual(req.content_length, None)
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.12344',
'X-Object-Meta-Test1': 'one',
'Specialty-Header': 'value',
'Host': 'localhost:80',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'x-timestamp x-object-meta-test1 specialty-header')})
def test_UPDATES_with_storage_policy(self): def test_UPDATES_with_storage_policy(self):
# update router post policy patch # update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter( self.controller._diskfile_router = diskfile.DiskFileRouter(
@ -1489,6 +1574,11 @@ class TestReceiver(unittest.TestCase):
request.read_body = request.environ['wsgi.input'].read() request.read_body = request.environ['wsgi.input'].read()
return swob.HTTPCreated() return swob.HTTPCreated()
@server.public
def _POST(request):
_requests.append(request)
return swob.HTTPOk()
@server.public @server.public
def _DELETE(request): def _DELETE(request):
_requests.append(request) _requests.append(request)
@ -1496,6 +1586,7 @@ class TestReceiver(unittest.TestCase):
with contextlib.nested( with contextlib.nested(
mock.patch.object(self.controller, 'PUT', _PUT), mock.patch.object(self.controller, 'PUT', _PUT),
mock.patch.object(self.controller, 'POST', _POST),
mock.patch.object(self.controller, 'DELETE', _DELETE)): mock.patch.object(self.controller, 'DELETE', _DELETE)):
self.controller.logger = mock.MagicMock() self.controller.logger = mock.MagicMock()
req = swob.Request.blank( req = swob.Request.blank(
@ -1529,7 +1620,17 @@ class TestReceiver(unittest.TestCase):
'\r\n' '\r\n'
'DELETE /a/c/o6\r\n' 'DELETE /a/c/o6\r\n'
'X-Timestamp: 1364456113.00006\r\n' 'X-Timestamp: 1364456113.00006\r\n'
'\r\n') '\r\n'
'PUT /a/c/o7\r\n'
'Content-Length: 7\r\n'
'X-Timestamp: 1364456113.00007\r\n'
'\r\n'
'1234567'
'POST /a/c/o7\r\n'
'X-Object-Meta-Test-User: user_meta\r\n'
'X-Timestamp: 1364456113.00008\r\n'
'\r\n'
)
resp = req.get_response(self.controller) resp = req.get_response(self.controller)
self.assertEqual( self.assertEqual(
self.body_lines(resp.body), self.body_lines(resp.body),
@ -1538,7 +1639,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.error.called)
self.assertEqual(len(_requests), 6) # sanity self.assertEqual(len(_requests), 8) # sanity
req = _requests.pop(0) req = _requests.pop(0)
self.assertEqual(req.method, 'PUT') self.assertEqual(req.method, 'PUT')
self.assertEqual(req.path, '/device/partition/a/c/o1') self.assertEqual(req.path, '/device/partition/a/c/o1')
@ -1609,6 +1710,31 @@ class TestReceiver(unittest.TestCase):
'X-Backend-Storage-Policy-Index': '0', 'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True', 'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'}) 'X-Backend-Replication-Headers': 'x-timestamp'})
req = _requests.pop(0)
self.assertEqual(req.method, 'PUT')
self.assertEqual(req.path, '/device/partition/a/c/o7')
self.assertEqual(req.content_length, 7)
self.assertEqual(req.headers, {
'Content-Length': '7',
'X-Timestamp': '1364456113.00007',
'Host': 'localhost:80',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
self.assertEqual(req.read_body, '1234567')
req = _requests.pop(0)
self.assertEqual(req.method, 'POST')
self.assertEqual(req.path, '/device/partition/a/c/o7')
self.assertEqual(req.content_length, None)
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00008',
'X-Object-Meta-Test-User': 'user_meta',
'Host': 'localhost:80',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'x-object-meta-test-user x-timestamp')})
self.assertEqual(_requests, []) self.assertEqual(_requests, [])
def test_UPDATES_subreq_does_not_read_all(self): def test_UPDATES_subreq_does_not_read_all(self):
@ -1916,5 +2042,125 @@ class TestSsyncRxServer(unittest.TestCase):
self.assertFalse(mock_missing_check.called) self.assertFalse(mock_missing_check.called)
class TestModuleMethods(unittest.TestCase):
def test_decode_missing(self):
object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# legacy single timestamp string
msg = '%s %s' % (object_hash, t_data.internal)
expected = dict(object_hash=object_hash,
ts_meta=t_data,
ts_data=t_data)
self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# hex meta delta encoded as extra message part
msg = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
expected = dict(object_hash=object_hash,
ts_data=t_data,
ts_meta=t_meta)
self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected zero delta is tolerated
msg = '%s %s m:0' % (object_hash, t_data.internal)
expected = dict(object_hash=object_hash,
ts_meta=t_data,
ts_data=t_data)
self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected subparts in timestamp delta part are tolerated
msg = '%s %s c:12345,m:%x,junk' % (object_hash,
t_data.internal,
d_meta_data)
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg))
# extra message parts tolerated
msg = '%s %s m:%x future parts' % (object_hash,
t_data.internal,
d_meta_data)
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data)
self.assertEqual(expected, ssync_receiver.decode_missing(msg))
def test_encode_wanted(self):
ts_iter = make_timestamp_iter()
old_t_data = ts_iter.next()
t_data = ts_iter.next()
old_t_meta = ts_iter.next()
t_meta = ts_iter.next()
remote = {
'object_hash': 'theremotehash',
'ts_data': t_data,
'ts_meta': t_meta,
}
# missing
local = {}
expected = 'theremotehash dm'
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# in-sync
local = {
'ts_data': t_data,
'ts_meta': t_meta,
}
expected = None
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# out-of-sync
local = {
'ts_data': old_t_data,
'ts_meta': old_t_meta,
}
expected = 'theremotehash dm'
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# old data
local = {
'ts_data': old_t_data,
'ts_meta': t_meta,
}
expected = 'theremotehash d'
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# old metadata
local = {
'ts_data': t_data,
'ts_meta': old_t_meta,
}
expected = 'theremotehash m'
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# in-sync tombstone
local = {
'ts_data': t_data,
}
expected = None
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
# old tombstone
local = {
'ts_data': old_t_data,
}
expected = 'theremotehash d'
self.assertEqual(ssync_receiver.encode_wanted(remote, local),
expected)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -12,6 +12,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from collections import defaultdict
import hashlib import hashlib
import os import os
@ -19,6 +20,7 @@ import shutil
import tempfile import tempfile
import time import time
import unittest import unittest
import urllib
import eventlet import eventlet
import itertools import itertools
@ -30,10 +32,10 @@ from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted DiskFileDeleted
from swift.common.utils import Timestamp from swift.common.utils import Timestamp
from swift.obj import ssync_sender, diskfile, server from swift.obj import ssync_sender, diskfile, server, ssync_receiver
from swift.obj.reconstructor import RebuildingECDiskFileStream from swift.obj.reconstructor import RebuildingECDiskFileStream
from test.unit import debug_logger, patch_policies from test.unit import debug_logger, patch_policies, make_timestamp_iter
class FakeReplicator(object): class FakeReplicator(object):
@ -461,7 +463,7 @@ class TestSender(BaseTestSender):
self.daemon, node, job, ['ignored'], self.daemon, node, job, ['ignored'],
remote_check_objs=None) remote_check_objs=None)
patch_sender(sender) patch_sender(sender)
sender.send_list = [wanted] sender.send_map = {wanted: []}
sender.available_map = available_map sender.available_map = available_map
success, candidates = sender() success, candidates = sender()
self.assertTrue(success) self.assertTrue(success)
@ -475,7 +477,7 @@ class TestSender(BaseTestSender):
self.daemon, node, job, ['ignored'], self.daemon, node, job, ['ignored'],
remote_check_objs=remote_check_objs) remote_check_objs=remote_check_objs)
patch_sender(sender) patch_sender(sender)
sender.send_list = [wanted] sender.send_map = {wanted: []}
sender.available_map = available_map sender.available_map = available_map
success, candidates = sender() success, candidates = sender()
self.assertTrue(success) self.assertTrue(success)
@ -485,7 +487,7 @@ class TestSender(BaseTestSender):
'1380144474.44444')]) '1380144474.44444')])
self.assertEqual(expected_map, candidates) self.assertEqual(expected_map, candidates)
def test_call_and_missing_check(self): def test_call_and_missing_check_metadata_legacy_response(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \ if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == POLICIES.legacy: and policy == POLICIES.legacy:
@ -493,12 +495,14 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000),
'ts_meta': Timestamp(1380155570.00005)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r' % (device, partition, suffixes)) 'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.node = {}
self.sender.job = { self.sender.job = {
'device': 'dev', 'device': 'dev',
'partition': '9', 'partition': '9',
@ -510,6 +514,52 @@ class TestSender(BaseTestSender):
chunk_body=( chunk_body=(
':MISSING_CHECK: START\r\n' ':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n' '9d41d8cd98f00b204e9800998ecf0abc\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
':UPDATES: END\r\n'
))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
found_post = found_put = False
for chunk in self.sender.connection.sent:
if 'POST' in chunk:
found_post = True
if 'PUT' in chunk:
found_put = True
self.assertFalse(found_post)
self.assertTrue(found_put)
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.node = {}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
'frag_index': 0,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n')) ':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock() self.sender.connect = mock.MagicMock()
@ -517,8 +567,9 @@ class TestSender(BaseTestSender):
self.sender.disconnect = mock.MagicMock() self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender() success, candidates = self.sender()
self.assertTrue(success) self.assertTrue(success)
self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', self.assertEqual(candidates,
'1380144470.00000')])) dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
self.assertEqual(self.sender.failures, 0) self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self): def test_call_and_missing_check_with_obj_list(self):
@ -529,7 +580,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r' % (device, partition, suffixes)) 'No match for %r %r %r' % (device, partition, suffixes))
@ -552,8 +603,9 @@ class TestSender(BaseTestSender):
self.sender.disconnect = mock.MagicMock() self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender() success, candidates = self.sender()
self.assertTrue(success) self.assertTrue(success)
self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', self.assertEqual(candidates,
'1380144470.00000')])) dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
self.assertEqual(self.sender.failures, 0) self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self): def test_call_and_missing_check_with_obj_list_but_required(self):
@ -564,7 +616,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r' % (device, partition, suffixes)) 'No match for %r %r %r' % (device, partition, suffixes))
@ -574,13 +626,13 @@ class TestSender(BaseTestSender):
'policy': POLICIES.legacy, 'policy': POLICIES.legacy,
'frag_index': 0, 'frag_index': 0,
} }
self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'], self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc']) ['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':MISSING_CHECK: START\r\n' ':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n')) ':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock() self.sender.connect = mock.MagicMock()
@ -753,7 +805,7 @@ class TestSender(BaseTestSender):
''.join(self.sender.connection.sent), ''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n' '17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, []) self.assertEqual(self.sender.send_map, {})
self.assertEqual(self.sender.available_map, {}) self.assertEqual(self.sender.available_map, {})
def test_missing_check_has_suffixes(self): def test_missing_check_has_suffixes(self):
@ -765,17 +817,18 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
yield ( yield (
'/srv/node/dev/objects/9/def/' '/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf0def', '9d41d8cd98f00b204e9800998ecf0def',
'9d41d8cd98f00b204e9800998ecf0def', '9d41d8cd98f00b204e9800998ecf0def',
'1380144472.22222') {'ts_data': Timestamp(1380144472.22222)})
yield ( yield (
'/srv/node/dev/objects/9/def/' '/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf1def', '9d41d8cd98f00b204e9800998ecf1def',
'9d41d8cd98f00b204e9800998ecf1def', '9d41d8cd98f00b204e9800998ecf1def',
'1380144474.44444') {'ts_data': Timestamp(1380144474.44444),
'ts_meta': Timestamp(1380144475.44444)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -799,12 +852,17 @@ class TestSender(BaseTestSender):
'17\r\n:MISSING_CHECK: START\r\n\r\n' '17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n' '3b\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
'm:186a0\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, []) self.assertEqual(self.sender.send_map, {})
candidates = [('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000'), candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
('9d41d8cd98f00b204e9800998ecf0def', '1380144472.22222'), dict(ts_data=Timestamp(1380144470.00000))),
('9d41d8cd98f00b204e9800998ecf1def', '1380144474.44444')] ('9d41d8cd98f00b204e9800998ecf0def',
dict(ts_data=Timestamp(1380144472.22222))),
('9d41d8cd98f00b204e9800998ecf1def',
dict(ts_data=Timestamp(1380144474.44444),
ts_meta=Timestamp(1380144475.44444)))]
self.assertEqual(self.sender.available_map, dict(candidates)) self.assertEqual(self.sender.available_map, dict(candidates))
def test_missing_check_far_end_disconnect(self): def test_missing_check_far_end_disconnect(self):
@ -816,7 +874,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -844,7 +902,7 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map, self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc', dict([('9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')])) dict(ts_data=Timestamp(1380144470.00000)))]))
def test_missing_check_far_end_disconnect2(self): def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -855,7 +913,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -884,7 +942,7 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map, self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc', dict([('9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')])) {'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_far_end_unexpected(self): def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -895,7 +953,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -923,9 +981,9 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map, self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc', dict([('9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')])) {'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_send_list(self): def test_missing_check_send_map(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and policy == POLICIES.legacy and
@ -934,7 +992,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -950,7 +1008,7 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':MISSING_CHECK: START\r\n' ':MISSING_CHECK: START\r\n'
'0123abc\r\n' '0123abc dm\r\n'
':MISSING_CHECK: END\r\n')) ':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check() self.sender.missing_check()
@ -959,10 +1017,11 @@ class TestSender(BaseTestSender):
'17\r\n:MISSING_CHECK: START\r\n\r\n' '17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n') '15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, ['0123abc']) self.assertEqual(
self.sender.send_map, {'0123abc': {'data': True, 'meta': True}})
self.assertEqual(self.sender.available_map, self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc', dict([('9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')])) {'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_extra_line_parts(self): def test_missing_check_extra_line_parts(self):
# check that sender tolerates extra parts in missing check # check that sender tolerates extra parts in missing check
@ -975,7 +1034,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/' '/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc', '9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000') {'ts_data': Timestamp(1380144470.00000)})
else: else:
raise Exception( raise Exception(
'No match for %r %r %r %r' % (device, partition, 'No match for %r %r %r %r' % (device, partition,
@ -991,14 +1050,15 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':MISSING_CHECK: START\r\n' ':MISSING_CHECK: START\r\n'
'0123abc extra response parts\r\n' '0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n')) ':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check() self.sender.missing_check()
self.assertEqual(self.sender.send_list, ['0123abc']) self.assertEqual(self.sender.send_map,
{'0123abc': {'data': True}})
self.assertEqual(self.sender.available_map, self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc', dict([('9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')])) {'ts_data': Timestamp(1380144470.00000)})]))
def test_updates_timeout(self): def test_updates_timeout(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
@ -1006,9 +1066,9 @@ class TestSender(BaseTestSender):
self.sender.daemon.node_timeout = 0.01 self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates) self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
def test_updates_empty_send_list(self): def test_updates_empty_send_map(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1021,7 +1081,7 @@ class TestSender(BaseTestSender):
def test_updates_unexpected_response_lines1(self): def test_updates_unexpected_response_lines1(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
'abc\r\n' 'abc\r\n'
@ -1040,7 +1100,7 @@ class TestSender(BaseTestSender):
def test_updates_unexpected_response_lines2(self): def test_updates_unexpected_response_lines2(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1073,7 +1133,7 @@ class TestSender(BaseTestSender):
'frag_index': 0, 'frag_index': 0,
} }
self.sender.node = {} self.sender.node = {}
self.sender.send_list = [object_hash] self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock() self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock() self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
@ -1107,7 +1167,7 @@ class TestSender(BaseTestSender):
'frag_index': 0, 'frag_index': 0,
} }
self.sender.node = {} self.sender.node = {}
self.sender.send_list = [object_hash] self.sender.send_map = {object_hash: {'data': True}}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1124,11 +1184,19 @@ class TestSender(BaseTestSender):
) )
def test_updates_put(self): def test_updates_put(self):
# sender has data file and meta file
ts_iter = make_timestamp_iter()
device = 'dev' device = 'dev'
part = '9' part = '9'
object_parts = ('a', 'c', 'o') object_parts = ('a', 'c', 'o')
df = self._make_open_diskfile(device, part, *object_parts) t1 = ts_iter.next()
df = self._make_open_diskfile(
device, part, *object_parts, timestamp=t1)
t2 = ts_iter.next()
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
object_hash = utils.hash_path(*object_parts) object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata() expected = df.get_metadata()
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.job = { self.sender.job = {
@ -1138,15 +1206,18 @@ class TestSender(BaseTestSender):
'frag_index': 0, 'frag_index': 0,
} }
self.sender.node = {} self.sender.node = {}
self.sender.send_list = [object_hash] # receiver requested data only
self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock() self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock() self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
':UPDATES: END\r\n')) ':UPDATES: END\r\n'))
self.sender.updates() self.sender.updates()
self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls)) self.assertEqual(1, len(self.sender.send_put.mock_calls))
args, _kwargs = self.sender.send_put.call_args args, _kwargs = self.sender.send_put.call_args
path, df = args path, df = args
@ -1160,6 +1231,105 @@ class TestSender(BaseTestSender):
'11\r\n:UPDATES: START\r\n\r\n' '11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n') 'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_post(self):
ts_iter = make_timestamp_iter()
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
t1 = ts_iter.next()
df = self._make_open_diskfile(
device, part, *object_parts, timestamp=t1)
t2 = ts_iter.next()
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.legacy,
'frag_index': 0,
}
self.sender.node = {}
# receiver requested only meta
self.sender.send_map = {object_hash: {'meta': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_put.mock_calls, [])
self.assertEqual(1, len(self.sender.send_post.mock_calls))
args, _kwargs = self.sender.send_post.call_args
path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
# note that the post line isn't actually sent since we mock send_post;
# send_post is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_put_and_post(self):
ts_iter = make_timestamp_iter()
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
t1 = ts_iter.next()
df = self._make_open_diskfile(
device, part, *object_parts, timestamp=t1)
t2 = ts_iter.next()
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.legacy,
'frag_index': 0,
}
self.sender.node = {}
# receiver requested data and meta
self.sender.send_map = {object_hash: {'meta': True, 'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
self.assertEqual(1, len(self.sender.send_post.mock_calls))
args, _kwargs = self.sender.send_put.call_args
path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
args, _kwargs = self.sender.send_post.call_args
path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_storage_policy_index(self): def test_updates_storage_policy_index(self):
device = 'dev' device = 'dev'
part = '9' part = '9'
@ -1175,7 +1345,7 @@ class TestSender(BaseTestSender):
'policy': POLICIES[0], 'policy': POLICIES[0],
'frag_index': 0} 'frag_index': 0}
self.sender.node = {} self.sender.node = {}
self.sender.send_list = [object_hash] self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock() self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock() self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
@ -1194,7 +1364,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_timeout_start(self): def test_updates_read_response_timeout_start(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1211,7 +1381,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_disconnect_start(self): def test_updates_read_response_disconnect_start(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse(chunk_body='\r\n') self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None exc = None
try: try:
@ -1226,7 +1396,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_unexp_start(self): def test_updates_read_response_unexp_start(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
'anything else\r\n' 'anything else\r\n'
@ -1245,7 +1415,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_timeout_end(self): def test_updates_read_response_timeout_end(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1264,7 +1434,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_disconnect_end(self): def test_updates_read_response_disconnect_end(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1282,7 +1452,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_unexp_end(self): def test_updates_read_response_unexp_end(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_list = [] self.sender.send_map = {}
self.sender.response = FakeResponse( self.sender.response = FakeResponse(
chunk_body=( chunk_body=(
':UPDATES: START\r\n' ':UPDATES: START\r\n'
@ -1358,13 +1528,20 @@ class TestSender(BaseTestSender):
self.assertEqual(str(exc), '0.01 seconds: send_put chunk') self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
def test_send_put(self): def test_send_put(self):
ts_iter = make_timestamp_iter()
t1 = ts_iter.next()
body = 'test' body = 'test'
extra_metadata = {'Some-Other-Header': 'value'} extra_metadata = {'Some-Other-Header': 'value'}
df = self._make_open_diskfile(body=body, df = self._make_open_diskfile(body=body, timestamp=t1,
extra_metadata=extra_metadata) extra_metadata=extra_metadata)
expected = dict(df.get_metadata()) expected = dict(df.get_metadata())
expected['body'] = body expected['body'] = body
expected['chunk_size'] = len(body) expected['chunk_size'] = len(body)
# .meta file metadata is not included in expected for data only PUT
t2 = ts_iter.next()
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
df.open()
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.send_put('/a/c/o', df) self.sender.send_put('/a/c/o', df)
self.assertEqual( self.assertEqual(
@ -1380,6 +1557,32 @@ class TestSender(BaseTestSender):
'%(chunk_size)s\r\n' '%(chunk_size)s\r\n'
'%(body)s\r\n' % expected) '%(body)s\r\n' % expected)
def test_send_post(self):
# create .data file
extra_metadata = {'X-Object-Meta-Foo': 'old_value',
'X-Object-Sysmeta-Test': 'test_sysmeta',
'Content-Type': 'test_content_type'}
ts_0 = next(make_timestamp_iter())
df = self._make_open_diskfile(extra_metadata=extra_metadata,
timestamp=ts_0)
# create .meta file
ts_1 = next(make_timestamp_iter())
newer_metadata = {'X-Object-Meta-Foo': 'new_value',
'X-Timestamp': ts_1.internal}
df.write_metadata(newer_metadata)
self.sender.connection = FakeConnection()
with df.open():
self.sender.send_post('/a/c/o', df)
self.assertEqual(
''.join(self.sender.connection.sent),
'4c\r\n'
'POST /a/c/o\r\n'
'X-Object-Meta-Foo: new_value\r\n'
'X-Timestamp: %s\r\n'
'\r\n'
'\r\n' % ts_1.internal)
def test_disconnect_timeout(self): def test_disconnect_timeout(self):
self.sender.connection = FakeConnection() self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1) self.sender.connection.send = lambda d: eventlet.sleep(1)
@ -1556,7 +1759,7 @@ class TestBaseSsync(BaseTestSender):
def tx_updates(results, line): def tx_updates(results, line):
self.assertEqual('tx', line[0]) self.assertEqual('tx', line[0])
subrequests = results['tx_updates'] subrequests = results['tx_updates']
if line[1].startswith(('PUT', 'DELETE')): if line[1].startswith(('PUT', 'DELETE', 'POST')):
parts = line[1].split('\r\n') parts = line[1].split('\r\n')
method, path = parts[0].split() method, path = parts[0].split()
subreq = {'method': method, 'path': path, 'req': line[1], subreq = {'method': method, 'path': path, 'req': line[1],
@ -1626,6 +1829,8 @@ class TestBaseSsync(BaseTestSender):
""" """
for o_name, diskfiles in tx_objs.items(): for o_name, diskfiles in tx_objs.items():
for tx_df in diskfiles: for tx_df in diskfiles:
# check tx file still intact - ssync does not do any cleanup!
tx_df.open()
if tx_frag_index is None or tx_df._frag_index == tx_frag_index: if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
# this diskfile should have been sync'd, # this diskfile should have been sync'd,
# check rx file is ok # check rx file is ok
@ -1641,24 +1846,21 @@ class TestBaseSsync(BaseTestSender):
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile, self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
o_name, policy, o_name, policy,
frag_index=tx_df._frag_index) frag_index=tx_df._frag_index)
# check tx file still intact - ssync does not do any cleanup!
tx_df.open()
def _verify_tombstones(self, tx_objs, policy): def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync # verify tx and rx tombstones that should be in sync
for o_name, diskfiles in tx_objs.items(): for o_name, diskfiles in tx_objs.items():
for tx_df_ in diskfiles: try:
try: self._open_tx_diskfile(o_name, policy)
self._open_tx_diskfile(o_name, policy) self.fail('DiskFileDeleted expected')
self.fail('DiskFileDeleted expected') except DiskFileDeleted as exc:
except DiskFileDeleted as exc: tx_delete_time = exc.timestamp
tx_delete_time = exc.timestamp try:
try: self._open_rx_diskfile(o_name, policy)
self._open_rx_diskfile(o_name, policy) self.fail('DiskFileDeleted expected')
self.fail('DiskFileDeleted expected') except DiskFileDeleted as exc:
except DiskFileDeleted as exc: rx_delete_time = exc.timestamp
rx_delete_time = exc.timestamp self.assertEqual(tx_delete_time, rx_delete_time)
self.assertEqual(tx_delete_time, rx_delete_time)
@patch_policies(with_ec_default=True) @patch_policies(with_ec_default=True)
@ -1879,7 +2081,7 @@ class TestSsyncReplication(TestBaseSsync):
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2) tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx # o3 is on tx and older copy on rx
t3a = next(self.ts_iter) t3a = next(self.ts_iter)
rx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a) rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter) t3b = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b) tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx # o4 in sync on rx and tx
@ -1925,7 +2127,7 @@ class TestSsyncReplication(TestBaseSsync):
# run the sync protocol... # run the sync protocol...
success, in_sync_objs = sender() success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs)) self.assertEqual(7, len(in_sync_objs), trace['messages'])
self.assertTrue(success) self.assertTrue(success)
# verify protocol # verify protocol
@ -1983,6 +2185,287 @@ class TestSsyncReplication(TestBaseSsync):
# TOTAL = 80 # TOTAL = 80
self.assertEqual(80, trace.get('readline_bytes')) self.assertEqual(80, trace.get('readline_bytes'))
def test_meta_file_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
# o1 on tx only with meta file
t1 = self.ts_iter.next()
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t1_meta = self.ts_iter.next()
metadata = {'X-Timestamp': t1_meta.internal,
'X-Object-Meta-Test': 'o1',
'X-Object-Sysmeta-Test': 'sys_o1'}
tx_objs['o1'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o1')
expected_subreqs['POST'].append('o1')
# o2 on tx with meta, on rx without meta
t2 = self.ts_iter.next()
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
t2_meta = self.ts_iter.next()
metadata = {'X-Timestamp': t2_meta.internal,
'X-Object-Meta-Test': 'o2',
'X-Object-Sysmeta-Test': 'sys_o2'}
tx_objs['o2'][0].write_metadata(metadata)
rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
expected_subreqs['POST'].append('o2')
# o3 is on tx with meta, rx has newer data but no meta
t3a = self.ts_iter.next()
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
t3b = self.ts_iter.next()
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
t3_meta = self.ts_iter.next()
metadata = {'X-Timestamp': t3_meta.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_objs['o3'][0].write_metadata(metadata)
expected_subreqs['POST'].append('o3')
# o4 is on tx with meta, rx has older data and up to date meta
t4a = self.ts_iter.next()
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
t4b = self.ts_iter.next()
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
t4_meta = self.ts_iter.next()
metadata = {'X-Timestamp': t4_meta.internal,
'X-Object-Meta-Test': 'o4',
'X-Object-Sysmeta-Test': 'sys_o4'}
tx_objs['o4'][0].write_metadata(metadata)
rx_objs['o4'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o4')
# o5 is on tx with meta, rx is in sync with data and meta
t5 = self.ts_iter.next()
rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
t5_meta = self.ts_iter.next()
metadata = {'X-Timestamp': t5_meta.internal,
'X-Object-Meta-Test': 'o5',
'X-Object-Sysmeta-Test': 'sys_o5'}
tx_objs['o5'][0].write_metadata(metadata)
rx_objs['o5'][0].write_metadata(metadata)
# o6 is tombstone on tx, rx has older data and meta
t6 = self.ts_iter.next()
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
metadata = {'X-Timestamp': self.ts_iter.next().internal,
'X-Object-Meta-Test': 'o6',
'X-Object-Sysmeta-Test': 'sys_o6'}
rx_tombstones['o6'][0].write_metadata(metadata)
tx_tombstones['o6'][0].delete(self.ts_iter.next())
expected_subreqs['DELETE'].append('o6')
# o7 is tombstone on rx, tx has older data and meta,
# no subreqs expected...
t7 = self.ts_iter.next()
tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
rx_tombstones['o7'] = self._create_ondisk_files(
rx_df_mgr, 'o7', policy, t7)
metadata = {'X-Timestamp': self.ts_iter.next().internal,
'X-Object-Meta-Test': 'o7',
'X-Object-Sysmeta-Test': 'sys_o7'}
tx_objs['o7'][0].write_metadata(metadata)
rx_tombstones['o7'][0].delete(self.ts_iter.next())
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
for subreq in results.get('tx_updates'):
obj = subreq['path'].split('/')[3]
method = subreq['method']
self.assertTrue(obj in expected_subreqs[method],
'Unexpected %s subreq for object %s, expected %s'
% (method, obj, expected_subreqs[method]))
expected_subreqs[method].remove(obj)
if method == 'PUT':
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
# verify all expected subreqs consumed
for _method, expected in expected_subreqs.items():
self.assertFalse(expected)
self.assertFalse(results['rx_updates'])
# verify on disk files...
del tx_objs['o7'] # o7 not expected to be sync'd
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
for oname, rx_obj in rx_objs.items():
df = rx_obj[0].open()
metadata = df.get_metadata()
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
def test_meta_file_not_synced_to_legacy_receiver(self):
# verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# rx has data at t1 but no meta
# object is on tx with data at t2, meta at t3,
t1 = self.ts_iter.next()
self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
t2 = self.ts_iter.next()
tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
t3 = self.ts_iter.next()
metadata = {'X-Timestamp': t3.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_obj.write_metadata(metadata)
suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
def _legacy_check_missing(self, line):
# reproduces behavior of 'legacy' ssync receiver missing_checks()
parts = line.split()
object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy,
frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
try:
df.open()
except exceptions.DiskFileDeleted as err:
want = err.timestamp < timestamp
except exceptions.DiskFileError as err:
want = True
else:
want = df.timestamp < timestamp
if want:
return urllib.quote(object_hash)
return None
# run the sync protocol...
func = 'swift.obj.ssync_receiver.Receiver._check_missing'
with mock.patch(func, _legacy_check_missing):
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
# verify protocol, expecting only a PUT to legacy receiver
results = self._analyze_trace(trace)
self.assertEqual(1, len(results['tx_missing']))
self.assertEqual(1, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertEqual('PUT', results['tx_updates'][0]['method'])
self.assertFalse(results['rx_updates'])
# verify on disk files...
rx_obj = self._open_rx_diskfile('o1', policy)
tx_obj = self._open_tx_diskfile('o1', policy)
# with legacy behavior rx_obj data and meta timestamps are equal
self.assertEqual(t2, rx_obj.data_timestamp)
self.assertEqual(t2, rx_obj.timestamp)
# with legacy behavior rx_obj data timestamp should equal tx_obj
self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
# tx meta file should not have been sync'd to rx data file
self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())
class TestModuleMethods(unittest.TestCase):
def test_encode_missing(self):
object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# equal data and meta timestamps -> legacy single timestamp string
expected = '%s %s' % (object_hash, t_data.internal)
self.assertEqual(
expected,
ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_data))
# newer meta timestamp -> hex data delta encoded as extra message part
expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
self.assertEqual(
expected,
ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_meta))
# test encode and decode functions invert
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
'ts_data': t_data}
msg = ssync_sender.encode_missing(**expected)
actual = ssync_receiver.decode_missing(msg)
self.assertEqual(expected, actual)
def test_decode_wanted(self):
parts = ['d']
expected = {'data': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
parts = ['m']
expected = {'meta': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
parts = ['dm']
expected = {'data': True, 'meta': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
# you don't really these next few...
parts = ['md']
expected = {'data': True, 'meta': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
parts = ['xcy', 'funny', {'business': True}]
expected = {'data': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()