ssync: sync non-durable fragments from handoffs

Previously, ssync would neither sync nor clean up non-durable data
fragments on handoffs. When the reconstructor is syncing objects from
a handoff node (a 'revert' reconstructor job) it may be useful, and is
not harmful, to also send non-durable fragments if the receiver has
older or no fragment data.

Several changes are made to enable this. On the sending side:

  - For handoff (revert) jobs, the reconstructor instantiates
    SsyncSender with a new 'include_non_durable' option.
  - If configured with the include_non_durable option, the SsyncSender
    calls the diskfile yield_hashes function with options that allow
    non-durable fragments to be yielded.
  - The diskfile yield_hashes function is enhanced to include a
    'durable' flag in the data structure yielded for each object.
  - The SsyncSender includes the 'durable' flag in the metadata sent
    during the missing_check exchange with the receiver (see the sketch
    after this list).
  - If the receiver requests the non-durable object, the SsyncSender
    includes a new 'X-Backend-No-Commit' header when sending the PUT
    subrequest for the object.
  - The SsyncSender includes the non-durable object in the collection
    of synced objects returned to the reconstructor so that the
    non-durable fragment is removed from the handoff node.
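
As a minimal sketch (not part of the change itself), the round trip of
the new 'durable' flag through the missing_check encoding looks roughly
like this, assuming a tree with this patch applied (the hash and
timestamp values are made up):

    from swift.common.utils import Timestamp
    from swift.obj.ssync_sender import encode_missing
    from swift.obj.ssync_receiver import decode_missing

    ts = Timestamp('1609459200.00000')
    # 'durable' is only encoded in the less common case that it is False
    line = encode_missing('9373a92d072897b136b3fc06595b7456', ts,
                          durable=False)
    # b'9373a92d072897b136b3fc06595b7456 1609459200.00000 durable:False'
    decoded = decode_missing(line)
    # decoded['durable'] is False; when the pair is omitted the receiver
    # defaults it to True, preserving compatibility with old senders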

On the receiving side:

  - The object server includes a new 'X-Backend-Accept-No-Commit'
    header in its response to SSYNC requests. This indicates to the
    sender that the receiver has been upgraded to understand the
    'X-Backend-No-Commit' header.
  - The SsyncReceiver is enhanced to consider non-durable data when
    determining if the sender's data is wanted or not.
  - The object server PUT method is enhanced to check for an
    'X-Backend-No-Commit' header before committing a diskfile (a sketch
    follows this list).
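
The commit gate in the object server PUT path is small. A rough
standalone sketch follows (the real change is in the
ObjectController.PUT hunk below; maybe_commit() is a hypothetical
helper, and writer/request stand in for the DiskFileWriter and swob
request):

    from swift.common.utils import config_true_value

    def maybe_commit(writer, request):
        # legacy behaviour is to always commit; an ssync PUT subrequest
        # carrying 'X-Backend-No-Commit: True' now skips the commit and
        # leaves the fragment non-durable on the receiver
        if not config_true_value(
                request.headers.get('X-Backend-No-Commit', False)):
            writer.commit(request.timestamp)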

If a handoff sender has both a durable and newer non-durable fragment
for the same object and frag-index, only the newer non-durable
fragment will be synced and removed on the first reconstructor
pass. The durable fragment will be synced and removed on the next
reconstructor pass.
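
Because senders and receivers may be upgraded at different times, the
non-durable behaviour is negotiated during the SSYNC handshake. A rough
sketch of the sender-side fallback (the real logic lives in
Sender.connect below; negotiate_non_durable() is a hypothetical helper
used only for illustration):

    from swift.common.utils import config_true_value

    def negotiate_non_durable(response, include_non_durable, logger, node):
        # if the receiver's SSYNC response does not advertise
        # X-Backend-Accept-No-Commit it has not been upgraded, so fall
        # back to syncing only durable fragments
        if include_non_durable and not config_true_value(
                response.getheader('x-backend-accept-no-commit', False)):
            logger.warning('ssync receiver %s does not accept '
                           'non-durable fragments' % node)
            return False
        return include_non_durable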

Change-Id: I1d47b865e0a621f35d323bbed472a6cfd2a5971b
Closes-Bug: 1778002
Author: Alistair Coles
Date: 2021-01-08 20:23:37 +00:00
Commit: 1dceafa7d5 (parent: 128f199508)
14 changed files with 1195 additions and 169 deletions

View File

@ -1590,6 +1590,7 @@ class BaseDiskFileManager(object):
- ts_meta -> timestamp of meta file, if one exists
- ts_ctype -> timestamp of meta file containing most recent
content-type value, if one exists
- durable -> True if data file at ts_data is durable, False otherwise
where timestamps are instances of
:class:`~swift.common.utils.Timestamp`
@ -1611,11 +1612,15 @@ class BaseDiskFileManager(object):
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
key_preference = (
# define keys that we need to extract the result from the on disk info
# data:
# (x, y, z) -> result[x] should take the value of y[z]
key_map = (
('ts_meta', 'meta_info', 'timestamp'),
('ts_data', 'data_info', 'timestamp'),
('ts_data', 'ts_info', 'timestamp'),
('ts_ctype', 'ctype_info', 'ctype_timestamp'),
('durable', 'data_info', 'durable'),
)
# cleanup_ondisk_files() will remove empty hash dirs, and we'll
@ -1626,21 +1631,24 @@ class BaseDiskFileManager(object):
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
try:
results = self.cleanup_ondisk_files(
diskfile_info = self.cleanup_ondisk_files(
object_path, **kwargs)
if results['files']:
if diskfile_info['files']:
found_files = True
timestamps = {}
for ts_key, info_key, info_ts_key in key_preference:
if info_key not in results:
result = {}
for result_key, diskfile_info_key, info_key in key_map:
if diskfile_info_key not in diskfile_info:
continue
timestamps[ts_key] = results[info_key][info_ts_key]
if 'ts_data' not in timestamps:
info = diskfile_info[diskfile_info_key]
if info_key in info:
# durable key not returned from replicated Diskfile
result[result_key] = info[info_key]
if 'ts_data' not in result:
# file sets that do not include a .data or .ts
# file cannot be opened and therefore cannot
# be ssync'd
continue
yield (object_hash, timestamps)
yield object_hash, result
except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err))
@ -3489,6 +3497,11 @@ class ECDiskFileManager(BaseDiskFileManager):
break
if durable_info and durable_info['timestamp'] == timestamp:
durable_frag_set = frag_set
# a data frag filename may not have the #d part if durability
# is defined by a legacy .durable, so always mark all data
# frags as durable here
for frag in frag_set:
frag['durable'] = True
break # ignore frags that are older than durable timestamp
# Choose which frag set to use

View File

@ -864,7 +864,7 @@ class ObjectReconstructor(Daemon):
# ssync any out-of-sync suffixes with the remote node
success, _ = ssync_sender(
self, node, job, suffixes)()
self, node, job, suffixes, include_non_durable=False)()
# update stats for this attempt
self.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
@ -891,7 +891,8 @@ class ObjectReconstructor(Daemon):
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
self, node, job, job['suffixes'],
include_non_durable=True)()
if success:
syncd_with += 1
reverted_objs.update(in_sync_objs)

View File

@ -1048,6 +1048,8 @@ class ObjectController(BaseStorageServer):
if multi_stage_mime_state:
self._send_multi_stage_continue_headers(
request, **multi_stage_mime_state)
if not config_true_value(
request.headers.get('X-Backend-No-Commit', False)):
writer.commit(request.timestamp)
if multi_stage_mime_state:
self._drain_mime_request(**multi_stage_mime_state)
@ -1310,7 +1312,14 @@ class ObjectController(BaseStorageServer):
@replication
@timing_stats(sample_rate=0.1)
def SSYNC(self, request):
return Response(app_iter=ssync_receiver.Receiver(self, request)())
# the ssync sender may want to send PUT subrequests for non-durable
# data that should not be committed; legacy behaviour has been to
# commit all PUTs (subject to EC footer metadata), so we need to
# indicate to the sender that this object server has been upgraded to
# understand the X-Backend-No-Commit header.
headers = {'X-Backend-Accept-No-Commit': True}
return Response(app_iter=ssync_receiver.Receiver(self, request)(),
headers=headers)
def __call__(self, env, start_response):
"""WSGI Application entry point for the Swift Object Server."""

View File

@ -35,7 +35,8 @@ def decode_missing(line):
"""
Parse a string of the form generated by
:py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict
with keys ``object_hash``, ``ts_data``, ``ts_meta``, ``ts_ctype``.
with keys ``object_hash``, ``ts_data``, ``ts_meta``, ``ts_ctype``,
``durable``.
The encoder for this line is
:py:func:`~swift.obj.ssync_sender.encode_missing`
@ -46,6 +47,7 @@ def decode_missing(line):
t_data = urllib.parse.unquote(parts[1])
result['ts_data'] = Timestamp(t_data)
result['ts_meta'] = result['ts_ctype'] = result['ts_data']
result['durable'] = True # default to True in case this key isn't sent
if len(parts) > 2:
# allow for a comma separated list of k:v pairs to future-proof
subparts = urllib.parse.unquote(parts[2]).split(',')
@ -55,6 +57,8 @@ def decode_missing(line):
result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
elif k == 't':
result['ts_ctype'] = Timestamp(t_data, delta=int(v, 16))
elif k == 'durable':
result['durable'] = utils.config_true_value(v)
return result
@ -279,6 +283,7 @@ class Receiver(object):
except exceptions.DiskFileDeleted as err:
result = {'ts_data': err.timestamp}
except exceptions.DiskFileError:
# e.g. a non-durable EC frag
result = {}
else:
result = {
@ -286,25 +291,35 @@ class Receiver(object):
'ts_meta': df.timestamp,
'ts_ctype': df.content_type_timestamp,
}
if (make_durable and df.fragments and
if ((df.durable_timestamp is None or
df.durable_timestamp < remote['ts_data']) and
df.fragments and
remote['ts_data'] in df.fragments and
self.frag_index in df.fragments[remote['ts_data']] and
(df.durable_timestamp is None or
df.durable_timestamp < remote['ts_data'])):
# We have the frag, just missing durable state, so make the frag
# durable now. Try this just once to avoid looping if it fails.
self.frag_index in df.fragments[remote['ts_data']]):
# The remote is offering a fragment that we already have but is
# *newer* than anything *durable* that we have
if remote['durable']:
# We have the frag, just missing durable state, so make the
# frag durable now. Try this just once to avoid looping if
# it fails.
if make_durable:
try:
with df.create() as writer:
writer.commit(remote['ts_data'])
return self._check_local(remote, make_durable=False)
except Exception:
# if commit fails then log exception and fall back to wanting
# a full update
# if commit fails then log exception and fall back to
# wanting a full update
self.app.logger.exception(
'%s/%s/%s EXCEPTION in ssync.Receiver while '
'attempting commit of %s'
% (self.request.remote_addr, self.device, self.partition,
df._datadir))
% (self.request.remote_addr, self.device,
self.partition, df._datadir))
else:
# We have the non-durable frag that is on offer, but our
# ts_data may currently be set to an older durable frag, so
# bump our ts_data to prevent the remote frag being wanted.
result['ts_data'] = remote['ts_data']
return result
def _check_missing(self, line):
@ -454,10 +469,15 @@ class Receiver(object):
header = header.strip().lower()
value = value.strip()
subreq.headers[header] = value
if header != 'etag':
# make sure ssync doesn't cause 'Etag' to be added to
if header not in ('etag', 'x-backend-no-commit'):
# we'll use X-Backend-Replication-Headers to force the
# object server to write all sync'd metadata, but with some
# exceptions:
# - make sure ssync doesn't cause 'Etag' to be added to
# obj metadata in addition to 'ETag' which object server
# sets (note capitalization)
# - filter out x-backend-no-commit which ssync sender may
# have added to the subrequest
replication_headers.append(header)
if header == 'content-length':
content_length = int(value)

View File

@ -19,14 +19,17 @@ from six.moves import urllib
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
from swift.common.utils import config_true_value
def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None):
def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None,
**kwargs):
"""
Returns a string representing the object hash, its data file timestamp
and the delta forwards to its metafile and content-type timestamps, if
non-zero, in the form:
``<hash> <ts_data> [m:<hex delta to ts_meta>[,t:<hex delta to ts_ctype>]]``
Returns a string representing the object hash, its data file timestamp,
the delta forwards to its metafile and content-type timestamps, if
non-zero, and its durability, in the form:
``<hash> <ts_data> [m:<hex delta to ts_meta>[,t:<hex delta to ts_ctype>]
[,durable:False]]``
The decoder for this line is
:py:func:`~swift.obj.ssync_receiver.decode_missing`
@ -34,12 +37,18 @@ def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None):
msg = ('%s %s'
% (urllib.parse.quote(object_hash),
urllib.parse.quote(ts_data.internal)))
extra_parts = []
if ts_meta and ts_meta != ts_data:
delta = ts_meta.raw - ts_data.raw
msg = '%s m:%x' % (msg, delta)
extra_parts.append('m:%x' % delta)
if ts_ctype and ts_ctype != ts_data:
delta = ts_ctype.raw - ts_data.raw
msg = '%s,t:%x' % (msg, delta)
extra_parts.append('t:%x' % delta)
if 'durable' in kwargs and kwargs['durable'] is False:
# only send durable in the less common case that it is False
extra_parts.append('durable:%s' % kwargs['durable'])
if extra_parts:
msg = '%s %s' % (msg, ','.join(extra_parts))
return msg.encode('ascii')
@ -133,7 +142,8 @@ class Sender(object):
process is there.
"""
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None,
include_non_durable=False):
self.daemon = daemon
self.df_mgr = self.daemon._df_router[job['policy']]
self.node = node
@ -142,6 +152,7 @@ class Sender(object):
# When remote_check_objs is given in job, ssync_sender tries only to
# make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs
self.include_non_durable = include_non_durable
def __call__(self):
"""
@ -221,11 +232,11 @@ class Sender(object):
with the object server.
"""
connection = response = None
node_addr = '%s:%s' % (self.node['replication_ip'],
self.node['replication_port'])
with exceptions.MessageTimeout(
self.daemon.conn_timeout, 'connect send'):
connection = SsyncBufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
connection = SsyncBufferedHTTPConnection(node_addr)
connection.putrequest('SSYNC', '/%s/%s' % (
self.node['device'], self.job['partition']))
connection.putheader('Transfer-Encoding', 'chunked')
@ -248,6 +259,14 @@ class Sender(object):
raise exceptions.ReplicationException(
'Expected status %s; got %s (%s)' %
(http.HTTP_OK, response.status, err_msg))
if self.include_non_durable and not config_true_value(
response.getheader('x-backend-accept-no-commit', False)):
# fall back to legacy behaviour if receiver does not understand
# X-Backend-No-Commit
self.daemon.logger.warning(
'ssync receiver %s does not accept non-durable fragments' %
node_addr)
self.include_non_durable = False
return connection, response
def missing_check(self, connection, response):
@ -265,10 +284,14 @@ class Sender(object):
self.daemon.node_timeout, 'missing_check start'):
msg = b':MISSING_CHECK: START\r\n'
connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
# an empty frag_prefs list is sufficient to get non-durable frags
# yielded, in which case an older durable frag will not be yielded
frag_prefs = [] if self.include_non_durable else None
hash_gen = self.df_mgr.yield_hashes(
self.job['device'], self.job['partition'],
self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index'))
frag_index=self.job.get('frag_index'),
frag_prefs=frag_prefs)
if self.remote_check_objs is not None:
hash_gen = six.moves.filter(
lambda objhash_timestamps:
@ -330,13 +353,14 @@ class Sender(object):
self.daemon.node_timeout, 'updates start'):
msg = b':UPDATES: START\r\n'
connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
frag_prefs = [] if self.include_non_durable else None
for object_hash, want in send_map.items():
object_hash = urllib.parse.unquote(object_hash)
try:
df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
self.job['policy'], frag_index=self.job.get('frag_index'),
open_expired=True)
open_expired=True, frag_prefs=frag_prefs)
except exceptions.DiskFileNotExist:
continue
url_path = urllib.parse.quote(
@ -344,13 +368,15 @@ class Sender(object):
try:
df.open()
if want.get('data'):
is_durable = (df.durable_timestamp == df.data_timestamp)
# EC reconstructor may have passed a callback to build an
# alternative diskfile - construct it using the metadata
# from the data file only.
df_alt = self.job.get(
'sync_diskfile_builder', lambda *args: df)(
self.job, self.node, df.get_datafile_metadata())
self.send_put(connection, url_path, df_alt)
self.send_put(connection, url_path, df_alt,
durable=is_durable)
if want.get('meta') and df.data_timestamp != df.timestamp:
self.send_post(connection, url_path, df)
except exceptions.DiskFileDeleted as err:
@ -443,12 +469,16 @@ class Sender(object):
headers = {'X-Timestamp': timestamp.internal}
self.send_subrequest(connection, 'DELETE', url_path, headers, None)
def send_put(self, connection, url_path, df):
def send_put(self, connection, url_path, df, durable=True):
"""
Sends a PUT subrequest for the url_path using the source df
(DiskFile) and content_length.
"""
headers = {'Content-Length': str(df.content_length)}
if not durable:
# only send this header for the less common case; without this
# header object servers assume default commit behaviour
headers['X-Backend-No-Commit'] = 'True'
for key, value in df.get_datafile_metadata().items():
if key not in ('name', 'Content-Length'):
headers[key] = value

View File

@ -677,7 +677,8 @@ class ECProbeTest(ProbeTest):
def assert_direct_get_succeeds(self, onode, opart, require_durable=True,
extra_headers=None):
try:
self.direct_get(onode, opart, require_durable=require_durable,
return self.direct_get(onode, opart,
require_durable=require_durable,
extra_headers=extra_headers)
except direct_client.DirectClientException as err:
self.fail('Node data on %r was not available: %s' % (onode, err))
@ -715,6 +716,31 @@ class ECProbeTest(ProbeTest):
raise
return made_non_durable
def make_durable(self, nodes, opart):
# ensure all data files on the specified nodes are durable
made_durable = 0
for i, node in enumerate(nodes):
part_dir = self.storage_dir(node, part=opart)
for dirs, subdirs, files in os.walk(part_dir):
for fname in sorted(files, reverse=True):
# make the newest non-durable be durable
if (fname.endswith('.data') and
not fname.endswith('#d.data')):
made_durable += 1
durable_fname = fname.replace('.data', '#d.data')
os.rename(os.path.join(dirs, fname),
os.path.join(dirs, durable_fname))
break
headers, etag = self.assert_direct_get_succeeds(node, opart)
self.assertIn('X-Backend-Durable-Timestamp', headers)
try:
os.remove(os.path.join(part_dir, 'hashes.pkl'))
except OSError as e:
if e.errno != errno.ENOENT:
raise
return made_durable
if __name__ == "__main__":
for server in ('account', 'container'):

View File

@ -316,6 +316,137 @@ class TestReconstructorRevert(ECProbeTest):
else:
self.fail('Did not find rebuilt fragment on partner node')
def test_handoff_non_durable(self):
# verify that reconstructor reverts non-durable frags from handoff to
# primary (and also durable frag of same object on same handoff) and
# cleans up non-durable data files on handoffs after revert
headers = {'X-Storage-Policy': self.policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
# get our node lists
opart, onodes = self.object_ring.get_nodes(
self.account, self.container_name, self.object_name)
pdevs = [self.device_dir(onode) for onode in onodes]
hnodes = list(itertools.islice(
self.object_ring.get_more_nodes(opart), 2))
# kill a primary node so we can force data onto a handoff
self.kill_drive(pdevs[0])
# PUT object at t1
contents = Body(total=3.5 * 2 ** 20)
headers = {'x-object-meta-foo': 'meta-foo'}
headers_post = {'x-object-meta-bar': 'meta-bar'}
client.put_object(self.url, self.token, self.container_name,
self.object_name, contents=contents,
headers=headers)
client.post_object(self.url, self.token, self.container_name,
self.object_name, headers=headers_post)
# (Some versions of?) swiftclient will mutate the headers dict on post
headers_post.pop('X-Auth-Token', None)
# this primary can't serve the data; we expect 507 here and not 404
# because we're using mount_check to kill nodes
self.assert_direct_get_fails(onodes[0], opart, 507)
# these primaries and first handoff do have the data
for onode in (onodes[1:]):
self.assert_direct_get_succeeds(onode, opart)
_hdrs, older_frag_etag = self.assert_direct_get_succeeds(hnodes[0],
opart)
self.assert_direct_get_fails(hnodes[1], opart, 404)
# make sure we can GET the object; there's 5 primaries and 1 handoff
headers, older_obj_etag = self.proxy_get()
self.assertEqual(contents.etag, older_obj_etag)
self.assertEqual('meta-bar', headers.get('x-object-meta-bar'))
# PUT object at t2; make all frags non-durable so that the previous
# durable frags at t1 remain on object server; use InternalClient so
# that x-backend-no-commit is passed through
internal_client = self.make_internal_client()
contents2 = Body(total=2.5 * 2 ** 20) # different content
self.assertNotEqual(contents2.etag, older_obj_etag) # sanity check
headers = {'x-backend-no-commit': 'True',
'x-object-meta-bar': 'meta-bar-new'}
internal_client.upload_object(contents2, self.account,
self.container_name.decode('utf8'),
self.object_name.decode('utf8'),
headers)
# GET should still return the older durable object
headers, obj_etag = self.proxy_get()
self.assertEqual(older_obj_etag, obj_etag)
self.assertEqual('meta-bar', headers.get('x-object-meta-bar'))
# on handoff we have older durable and newer non-durable
_hdrs, frag_etag = self.assert_direct_get_succeeds(hnodes[0], opart)
self.assertEqual(older_frag_etag, frag_etag)
_hdrs, newer_frag_etag = self.assert_direct_get_succeeds(
hnodes[0], opart, require_durable=False)
self.assertNotEqual(older_frag_etag, newer_frag_etag)
# now make all the newer frags durable only on the 5 primaries
self.assertEqual(5, self.make_durable(onodes[1:], opart))
# now GET will return the newer object
headers, newer_obj_etag = self.proxy_get()
self.assertEqual(contents2.etag, newer_obj_etag)
self.assertNotEqual(older_obj_etag, newer_obj_etag)
self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar'))
# fix the 507'ing primary
self.revive_drive(pdevs[0])
# fire up reconstructor on handoff node only
hnode_id = (hnodes[0]['port'] % 100) // 10
self.reconstructor.once(number=hnode_id)
# primary now has only the newer non-durable frag
self.assert_direct_get_fails(onodes[0], opart, 404)
_hdrs, frag_etag = self.assert_direct_get_succeeds(
onodes[0], opart, require_durable=False)
self.assertEqual(newer_frag_etag, frag_etag)
# handoff has only the older durable
_hdrs, frag_etag = self.assert_direct_get_succeeds(hnodes[0], opart)
self.assertEqual(older_frag_etag, frag_etag)
headers, frag_etag = self.assert_direct_get_succeeds(
hnodes[0], opart, require_durable=False)
self.assertEqual(older_frag_etag, frag_etag)
self.assertEqual('meta-bar', headers.get('x-object-meta-bar'))
# fire up reconstructor on handoff node only, again
self.reconstructor.once(number=hnode_id)
# primary now has the newer non-durable frag and the older durable frag
headers, frag_etag = self.assert_direct_get_succeeds(onodes[0], opart)
self.assertEqual(older_frag_etag, frag_etag)
self.assertEqual('meta-bar', headers.get('x-object-meta-bar'))
headers, frag_etag = self.assert_direct_get_succeeds(
onodes[0], opart, require_durable=False)
self.assertEqual(newer_frag_etag, frag_etag)
self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar'))
# handoff has nothing
self.assert_direct_get_fails(hnodes[0], opart, 404,
require_durable=False)
# kill all but first two primaries
for pdev in pdevs[2:]:
self.kill_drive(pdev)
# fire up reconstructor on the remaining primary[1]; without the
# other primaries, primary[1] cannot rebuild the frag but it can let
# primary[0] know that its non-durable frag can be made durable
self.reconstructor.once(number=self.config_number(onodes[1]))
# first primary now has a *durable* *newer* frag - it *was* useful to
# sync the non-durable!
headers, frag_etag = self.assert_direct_get_succeeds(onodes[0], opart)
self.assertEqual(newer_frag_etag, frag_etag)
self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar'))
# revive primaries (in case we want to debug)
for pdev in pdevs[2:]:
self.revive_drive(pdev)
if __name__ == "__main__":
unittest.main()

View File

@ -70,10 +70,10 @@ class BaseTest(unittest.TestCase):
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
account='a', container='c', obj='o', body=b'test',
extra_metadata=None, policy=None,
frag_index=None, timestamp=None, df_mgr=None,
commit=True, verify=True):
commit=True, verify=True, **kwargs):
policy = policy or POLICIES.legacy
object_parts = account, container, obj
timestamp = Timestamp.now() if timestamp is None else timestamp
@ -81,7 +81,7 @@ class BaseTest(unittest.TestCase):
df_mgr = self.daemon._df_router[policy]
df = df_mgr.get_diskfile(
device, partition, *object_parts, policy=policy,
frag_index=frag_index)
frag_index=frag_index, **kwargs)
write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata,
commit=commit)
if commit and verify:
@ -99,9 +99,10 @@ class BaseTest(unittest.TestCase):
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body=b'test',
extra_metadata=None, policy=None,
frag_index=None, timestamp=None, df_mgr=None):
frag_index=None, timestamp=None, df_mgr=None,
commit=True, **kwargs):
df = self._make_diskfile(device, partition, account, container, obj,
body, extra_metadata, policy, frag_index,
timestamp, df_mgr)
timestamp, df_mgr, commit, **kwargs)
df.open()
return df

View File

@ -1539,8 +1539,9 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
invalidations_file = os.path.join(
part_dir, diskfile.HASH_INVALIDATIONS_FILE)
with open(invalidations_file) as f:
self.assertEqual('%s\n%s' % (df1_suffix, df2_suffix),
f.read().strip('\n')) # sanity
invalids = f.read().splitlines()
self.assertEqual(sorted((df1_suffix, df2_suffix)),
sorted(invalids)) # sanity
# next time get hashes runs
with mock.patch('time.time', mock_time):
@ -2768,55 +2769,59 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
expected)
def test_yield_hashes_legacy_durable(self):
old_ts = '1383180000.12345'
fresh_ts = Timestamp(time() - 10).internal
fresher_ts = Timestamp(time() - 1).internal
old_ts = Timestamp('1383180000.12345')
fresh_ts = Timestamp(time() - 10)
fresher_ts = Timestamp(time() - 1)
suffix_map = {
'abc': {
'9373a92d072897b136b3fc06595b4abc': [
fresh_ts + '.ts'],
fresh_ts.internal + '.ts'],
},
'456': {
'9373a92d072897b136b3fc06595b0456': [
old_ts + '#2.data',
old_ts + '.durable'],
old_ts.internal + '#2.data',
old_ts.internal + '.durable'],
'9373a92d072897b136b3fc06595b7456': [
fresh_ts + '.ts',
fresher_ts + '#2.data',
fresher_ts + '.durable'],
fresh_ts.internal + '.ts',
fresher_ts.internal + '#2.data',
fresher_ts.internal + '.durable'],
},
'def': {},
}
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
def test_yield_hashes(self):
old_ts = '1383180000.12345'
fresh_ts = Timestamp(time() - 10).internal
fresher_ts = Timestamp(time() - 1).internal
old_ts = Timestamp('1383180000.12345')
fresh_ts = Timestamp(time() - 10)
fresher_ts = Timestamp(time() - 1)
suffix_map = {
'abc': {
'9373a92d072897b136b3fc06595b4abc': [
fresh_ts + '.ts'],
fresh_ts.internal + '.ts'],
},
'456': {
'9373a92d072897b136b3fc06595b0456': [
old_ts + '#2#d.data'],
old_ts.internal + '#2#d.data'],
'9373a92d072897b136b3fc06595b7456': [
fresh_ts + '.ts',
fresher_ts + '#2#d.data'],
fresh_ts.internal + '.ts',
fresher_ts.internal + '#2#d.data'],
},
'def': {},
}
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -2847,9 +2852,11 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'ts_meta': ts3},
'ts_meta': ts3,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
'ts_meta': ts2},
'ts_meta': ts2,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@ -2885,9 +2892,11 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'ts_meta': ts3},
'ts_meta': ts3,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
'ts_meta': ts2},
'ts_meta': ts2,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@ -2921,8 +2930,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456'], frag_index=2)
@ -2947,8 +2958,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
'9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456'], frag_index=2)
@ -2965,7 +2978,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -2974,12 +2988,62 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b7456'] = [
ts1.internal + '#2#d.data']
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
def test_yield_hashes_optionally_yields_non_durable_data(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
ts1 = next(ts_iter)
ts2 = next(ts_iter)
suffix_map = {
'abc': {
'9373a92d072897b136b3fc06595b4abc': [
ts1.internal + '#2#d.data',
ts2.internal + '#2.data'], # newer non-durable
'9373a92d072897b136b3fc06595b0abc': [
ts1.internal + '#2.data', # older non-durable
ts2.internal + '#2#d.data'],
},
'456': {
'9373a92d072897b136b3fc06595b0456': [
ts1.internal + '#2#d.data'],
'9373a92d072897b136b3fc06595b7456': [
ts2.internal + '#2.data'],
},
}
# sanity check non-durables not yielded
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1,
'durable': True},
'9373a92d072897b136b3fc06595b0abc': {'ts_data': ts2,
'durable': True},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2, frag_prefs=None)
# an empty frag_prefs list is sufficient to get non-durables yielded
# (in preference over *older* durable)
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts2,
'durable': False},
'9373a92d072897b136b3fc06595b0abc': {'ts_data': ts2,
'durable': True},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts2,
'durable': False},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2, frag_prefs=[])
def test_yield_hashes_skips_missing_legacy_durable(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
ts1 = next(ts_iter)
@ -2993,7 +3057,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -3002,8 +3067,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append(
ts1.internal + '.durable')
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
'9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -3023,7 +3090,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@ -3034,7 +3102,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append(
ts2.internal + '.durable')
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts2},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts2,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@ -3055,7 +3124,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@ -3072,7 +3142,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts2},
'9373a92d072897b136b3fc06595b0456': {'ts_data': ts2,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@ -3130,12 +3201,16 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'9333a92d072897b136b3fc06595b0456': {'ts_data': ts1},
'9333a92d072897b136b3fc06595b0456': {'ts_data': ts1,
'durable': True},
'9999a92d072897b136b3fc06595bb456': {'ts_data': ts1,
'ts_meta': ts2},
'9333a92d072897b136b3fc06595b1456': {'ts_data': ts1},
'ts_meta': ts2,
'durable': True},
'9333a92d072897b136b3fc06595b1456': {'ts_data': ts1,
'durable': True},
'9999a92d072897b136b3fc06595bc456': {'ts_data': ts1,
'ts_meta': ts2},
'ts_meta': ts2,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -3170,9 +3245,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'1111111111111111111111111111127e': {'ts_data': ts1},
'2222222222222222222222222222227e': {'ts_data': ts2},
'3333333333333333333333333333300b': {'ts_data': ts3},
'1111111111111111111111111111127e': {'ts_data': ts1,
'durable': True},
'2222222222222222222222222222227e': {'ts_data': ts2,
'durable': True},
'3333333333333333333333333333300b': {'ts_data': ts3,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -3212,9 +3290,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
'1111111111111111111111111111127e': {'ts_data': ts1},
'2222222222222222222222222222227e': {'ts_data': ts2},
'3333333333333333333333333333300b': {'ts_data': ts3},
'1111111111111111111111111111127e': {'ts_data': ts1,
'durable': True},
'2222222222222222222222222222227e': {'ts_data': ts2,
'durable': True},
'3333333333333333333333333333300b': {'ts_data': ts3,
'durable': True},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@ -3271,7 +3352,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def _create_ondisk_file(self, df, data, timestamp, metadata=None,
ctype_timestamp=None,
ext='.data', legacy_durable=False):
ext='.data', legacy_durable=False, commit=True):
mkdirs(df._datadir)
if timestamp is None:
timestamp = time()
@ -3292,12 +3373,15 @@ class DiskFileMixin(BaseDiskFileTestMixin):
if ext == '.data' and df.policy.policy_type == EC_POLICY:
if legacy_durable:
filename = '%s#%s' % (timestamp.internal, df._frag_index)
durable_file = os.path.join(df._datadir,
'%s.durable' % timestamp.internal)
if commit:
durable_file = os.path.join(
df._datadir, '%s.durable' % timestamp.internal)
with open(durable_file, 'wb') as f:
pass
else:
elif commit:
filename = '%s#%s#d' % (timestamp.internal, df._frag_index)
else:
filename = '%s#%s' % (timestamp.internal, df._frag_index)
if ctype_timestamp:
metadata.update(
{'Content-Type-Timestamp':
@ -6300,6 +6384,35 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
df.open() # not quarantined
def test_ondisk_data_info_has_durable_key(self):
# non-durable; use frag_prefs=[] to allow it to be opened
df = self._simple_get_diskfile(obj='o1', frag_prefs=[])
self._create_ondisk_file(df, b'', ext='.data', timestamp=10,
metadata={'name': '/a/c/o1'}, commit=False)
with df.open():
self.assertIn('durable', df._ondisk_info['data_info'])
self.assertFalse(df._ondisk_info['data_info']['durable'])
# durable
df = self._simple_get_diskfile(obj='o2')
self._create_ondisk_file(df, b'', ext='.data', timestamp=10,
metadata={'name': '/a/c/o2'})
with df.open():
self.assertIn('durable', df._ondisk_info['data_info'])
self.assertTrue(df._ondisk_info['data_info']['durable'])
# legacy durable
df = self._simple_get_diskfile(obj='o3')
self._create_ondisk_file(df, b'', ext='.data', timestamp=10,
metadata={'name': '/a/c/o3'},
legacy_durable=True)
with df.open():
data_info = df._ondisk_info['data_info']
# sanity check it is legacy with no #d part in filename
self.assertEqual(data_info['filename'], '0000000010.00000#2.data')
self.assertIn('durable', data_info)
self.assertTrue(data_info['durable'])
@patch_policies(with_ec_default=True)
class TestSuffixHashes(unittest.TestCase):
@ -7066,7 +7179,9 @@ class TestSuffixHashes(unittest.TestCase):
df2.delete(self.ts())
# suffix2 should be in invalidations file
with open(invalidations_file, 'r') as f:
self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read())
invalids = f.read().splitlines()
self.assertEqual(sorted((suffix2, suffix2)),
sorted(invalids)) # sanity
# hashes file is not yet changed
with open(hashes_file, 'rb') as f:
found_hashes = pickle.load(f)

View File

@ -52,10 +52,11 @@ from test.unit.obj.common import write_diskfile
@contextmanager
def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
def fake_ssync(daemon, node, job, suffixes):
def fake_ssync(daemon, node, job, suffixes, **kwargs):
if ssync_calls is not None:
ssync_calls.append(
{'node': node, 'job': job, 'suffixes': suffixes})
call_args = {'node': node, 'job': job, 'suffixes': suffixes}
call_args.update(kwargs)
ssync_calls.append(call_args)
def fake_call():
if response_callback:
@ -1136,6 +1137,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.success = False
break
context['success'] = self.success
context.update(kwargs)
def __call__(self, *args, **kwargs):
return self.success, self.available_map if self.success else {}
@ -1168,6 +1170,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
expected_calls = []
for context in ssync_calls:
if context['job']['job_type'] == REVERT:
self.assertTrue(context.get('include_non_durable'))
for dirpath, files in visit_obj_dirs(context):
# sanity check - expect some files to be in dir,
# may not be for the reverted frag index
@ -1176,6 +1179,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
expected_calls.append(mock.call(context['job'],
context['available_map'],
context['node']['index']))
else:
self.assertFalse(context.get('include_non_durable'))
mock_delete.assert_has_calls(expected_calls, any_order=True)
# N.B. in this next test sequence we actually delete files after
@ -1193,12 +1199,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor.reconstruct()
for context in ssync_calls:
if context['job']['job_type'] == REVERT:
self.assertTrue(context.get('include_non_durable'))
data_file_tail = ('#%s.data'
% context['node']['index'])
for dirpath, files in visit_obj_dirs(context):
n_files_after += len(files)
for filename in files:
self.assertFalse(filename.endswith(data_file_tail))
else:
self.assertFalse(context.get('include_non_durable'))
# sanity check that some files were deleted
self.assertGreater(n_files, n_files_after)
@ -1225,13 +1234,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertEqual(len(captured_ssync), 2)
expected_ssync_calls = {
# device, part, frag_index, include_non_durable: expected_occurrences
('sda1', 2, 2): 1,
('sda1', 2, 0): 1,
('sda1', 2, 2, True): 1,
('sda1', 2, 0, True): 1,
}
self.assertEqual(expected_ssync_calls, dict(collections.Counter(
(context['job']['device'],
context['job']['partition'],
context['job']['frag_index'])
context['job']['frag_index'],
context['include_non_durable'])
for context in captured_ssync
)))
@ -1296,14 +1306,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor.reconstruct(override_partitions=[2])
expected_ssync_calls = sorted([
(u'10.0.0.0', REVERT, 2, [u'3c1']),
(u'10.0.0.2', REVERT, 2, [u'061']),
(u'10.0.0.0', REVERT, 2, [u'3c1'], True),
(u'10.0.0.2', REVERT, 2, [u'061'], True),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['job_type'],
c['job']['partition'],
c['suffixes'],
c.get('include_non_durable')
) for c in ssync_calls))
expected_stats = {
@ -3797,14 +3808,15 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
[(r['ip'], r['path']) for r in request_log.requests])
expected_ssync_calls = sorted([
(sync_to[0]['ip'], 0, set(['123', 'abc'])),
(sync_to[1]['ip'], 0, set(['123', 'abc'])),
(sync_to[2]['ip'], 0, set(['123', 'abc'])),
(sync_to[0]['ip'], 0, set(['123', 'abc']), False),
(sync_to[1]['ip'], 0, set(['123', 'abc']), False),
(sync_to[2]['ip'], 0, set(['123', 'abc']), False),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['partition'],
set(c['suffixes']),
c.get('include_non_durable'),
) for c in ssync_calls))
def test_sync_duplicates_to_remote_region(self):
@ -3966,12 +3978,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
for r in request_log.requests))
expected_ssync_calls = sorted([
(sync_to[1]['ip'], 0, ['abc']),
(sync_to[1]['ip'], 0, ['abc'], False),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['partition'],
c['suffixes'],
c.get('include_non_durable')
) for c in ssync_calls))
def test_process_job_primary_some_in_sync(self):
@ -4038,11 +4051,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(
dict(collections.Counter(
(c['node']['index'], tuple(sorted(c['suffixes'])))
(c['node']['index'], tuple(sorted(c['suffixes'])),
c.get('include_non_durable'))
for c in ssync_calls)),
{(sync_to[0]['index'], ('123',)): 1,
(sync_to[1]['index'], ('abc',)): 1,
(sync_to[2]['index'], ('123', 'abc')): 1,
{(sync_to[0]['index'], ('123',), False): 1,
(sync_to[1]['index'], ('abc',), False): 1,
(sync_to[2]['index'], ('123', 'abc'), False): 1,
})
def test_process_job_primary_down(self):
@ -4102,14 +4116,15 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(expected_suffix_calls, found_suffix_calls)
expected_ssync_calls = sorted([
('10.0.0.0', 0, set(['123', 'abc'])),
('10.0.0.1', 0, set(['123', 'abc'])),
('10.0.0.2', 0, set(['123', 'abc'])),
('10.0.0.0', 0, set(['123', 'abc']), False),
('10.0.0.1', 0, set(['123', 'abc']), False),
('10.0.0.2', 0, set(['123', 'abc']), False),
])
found_ssync_calls = sorted((
c['node']['ip'],
c['job']['partition'],
set(c['suffixes']),
c.get('include_non_durable')
) for c in ssync_calls)
self.assertEqual(expected_ssync_calls, found_ssync_calls)
@ -4276,10 +4291,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(
sorted(collections.Counter(
(c['node']['ip'], c['node']['port'], c['node']['device'],
tuple(sorted(c['suffixes'])))
tuple(sorted(c['suffixes'])),
c.get('include_non_durable'))
for c in ssync_calls).items()),
[((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'],
('123', 'abc')), 1)])
('123', 'abc'), True), 1)])
def test_process_job_will_not_revert_to_handoff(self):
frag_index = random.randint(
@ -4331,10 +4347,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(
sorted(collections.Counter(
(c['node']['ip'], c['node']['port'], c['node']['device'],
tuple(sorted(c['suffixes'])))
tuple(sorted(c['suffixes'])),
c.get('include_non_durable'))
for c in ssync_calls).items()),
[((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'],
('123', 'abc')), 1)])
('123', 'abc'), True), 1)])
def test_process_job_revert_is_handoff_fails(self):
frag_index = random.randint(
@ -4385,10 +4402,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(
sorted(collections.Counter(
(c['node']['ip'], c['node']['port'], c['node']['device'],
tuple(sorted(c['suffixes'])))
tuple(sorted(c['suffixes'])),
c.get('include_non_durable'))
for c in ssync_calls).items()),
[((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'],
('123', 'abc')), 1)])
('123', 'abc'), True), 1)])
self.assertEqual(self.reconstructor.handoffs_remaining, 1)
def test_process_job_revert_cleanup(self):

View File

@ -2629,14 +2629,15 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
def test_EC_GET_PUT_data(self):
def test_EC_PUT_GET_data(self):
for policy in self.ec_policies:
ts = next(self.ts)
raw_data = (b'VERIFY' * policy.ec_segment_size)[:-432]
frag_archives = encode_frag_archive_bodies(policy, raw_data)
frag_index = random.randint(0, len(frag_archives) - 1)
# put EC frag archive
req = Request.blank('/sda1/p/a/c/o', method='PUT', headers={
'X-Timestamp': next(self.ts).internal,
'X-Timestamp': ts.internal,
'Content-Type': 'application/verify',
'Content-Length': len(frag_archives[frag_index]),
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
@ -2654,6 +2655,59 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, frag_archives[frag_index])
# check the diskfile is durable
df_mgr = diskfile.ECDiskFileManager(self.conf,
self.object_controller.logger)
df = df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o', policy,
frag_prefs=[])
with df.open():
self.assertEqual(ts, df.data_timestamp)
self.assertEqual(df.data_timestamp, df.durable_timestamp)
def test_EC_PUT_GET_data_no_commit(self):
for policy in self.ec_policies:
ts = next(self.ts)
raw_data = (b'VERIFY' * policy.ec_segment_size)[:-432]
frag_archives = encode_frag_archive_bodies(policy, raw_data)
frag_index = random.randint(0, len(frag_archives) - 1)
# put EC frag archive
req = Request.blank('/sda1/p/a/c/o', method='PUT', headers={
'X-Timestamp': ts.internal,
'Content-Type': 'application/verify',
'Content-Length': len(frag_archives[frag_index]),
'X-Backend-No-Commit': 'true',
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Backend-Storage-Policy-Index': int(policy),
})
req.body = frag_archives[frag_index]
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# get EC frag archive will 404 - nothing durable...
req = Request.blank('/sda1/p/a/c/o', headers={
'X-Backend-Storage-Policy-Index': int(policy),
})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
# ...unless we explicitly request *any* fragment...
req = Request.blank('/sda1/p/a/c/o', headers={
'X-Backend-Storage-Policy-Index': int(policy),
'X-Backend-Fragment-Preferences': '[]',
})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, frag_archives[frag_index])
# check the diskfile is not durable
df_mgr = diskfile.ECDiskFileManager(self.conf,
self.object_controller.logger)
df = df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o', policy,
frag_prefs=[])
with df.open():
self.assertEqual(ts, df.data_timestamp)
self.assertIsNone(df.durable_timestamp)
def test_EC_GET_quarantine_invalid_frag_archive(self):
policy = random.choice(self.ec_policies)
raw_data = (b'VERIFY' * policy.ec_segment_size)[:-432]
@ -7109,6 +7163,8 @@ class TestObjectController(unittest.TestCase):
headers={})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual('True',
resp.headers.get('X-Backend-Accept-No-Commit'))
def test_PUT_with_full_drive(self):

View File

@ -123,7 +123,7 @@ class TestBaseSsync(BaseTest):
return self.obj_data[path]
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
frag_indexes=None, commit=True):
frag_indexes=None, commit=True, **kwargs):
frag_indexes = frag_indexes or [None]
metadata = {'Content-Type': 'plain/text'}
diskfiles = []
@ -136,22 +136,22 @@ class TestBaseSsync(BaseTest):
device=self.device, partition=self.partition, account='a',
container='c', obj=obj_name, body=object_data,
extra_metadata=metadata, timestamp=timestamp, policy=policy,
frag_index=frag_index, df_mgr=df_mgr, commit=commit)
frag_index=frag_index, df_mgr=df_mgr, commit=commit, **kwargs)
diskfiles.append(df)
return diskfiles
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
def _open_tx_diskfile(self, obj_name, policy, frag_index=None, **kwargs):
df_mgr = self.daemon._df_router[policy]
df = df_mgr.get_diskfile(
self.device, self.partition, account='a', container='c',
obj=obj_name, policy=policy, frag_index=frag_index)
obj=obj_name, policy=policy, frag_index=frag_index, **kwargs)
df.open()
return df
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
def _open_rx_diskfile(self, obj_name, policy, frag_index=None, **kwargs):
df = self.rx_controller.get_diskfile(
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
frag_index=frag_index, open_expired=True)
frag_index=frag_index, open_expired=True, **kwargs)
df.open()
return df
@ -261,7 +261,7 @@ class TestBaseSsync(BaseTest):
return results
def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
rx_frag_index=None):
rx_frag_index=None, **kwargs):
"""
Verify tx and rx files that should be in sync.
:param tx_objs: sender diskfiles
@ -278,7 +278,7 @@ class TestBaseSsync(BaseTest):
# this diskfile should have been sync'd,
# check rx file is ok
rx_df = self._open_rx_diskfile(
o_name, policy, rx_frag_index)
o_name, policy, rx_frag_index, **kwargs)
# for EC revert job or replication etags should match
match_etag = (tx_frag_index == rx_frag_index)
self._verify_diskfile_sync(
@ -453,7 +453,7 @@ class TestSsyncEC(TestBaseSsyncEC):
rx_df_mgr, obj_name, policy, t2, (12, 13), commit=False)
expected_subreqs['PUT'].append(obj_name)
# o3 on rx has frag at other time and non-durable - PUT required
# o3 on rx has frag at newer time and non-durable - PUT required
t3 = next(self.ts_iter)
obj_name = 'o3'
tx_objs[obj_name] = self._create_ondisk_files(
@ -520,6 +520,91 @@ class TestSsyncEC(TestBaseSsyncEC):
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
def test_handoff_non_durable_fragment(self):
# test that a sync_revert type job does PUT when the tx is non-durable
policy = POLICIES.default
rx_node_index = frag_index = 0
tx_node_index = 1
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
# o1 non-durable on tx and missing on rx
t1 = next(self.ts_iter) # newer non-durable tx .data
obj_name = 'o1'
tx_objs[obj_name] = self._create_ondisk_files(
tx_df_mgr, obj_name, policy, t1, (tx_node_index, rx_node_index,),
commit=False, frag_prefs=[])
expected_subreqs['PUT'].append(obj_name)
# o2 non-durable on tx and rx
t2 = next(self.ts_iter)
obj_name = 'o2'
tx_objs[obj_name] = self._create_ondisk_files(
tx_df_mgr, obj_name, policy, t2, (tx_node_index, rx_node_index,),
commit=False, frag_prefs=[])
rx_objs[obj_name] = self._create_ondisk_files(
rx_df_mgr, obj_name, policy, t2, (rx_node_index,), commit=False,
frag_prefs=[])
# o3 durable on tx and missing on rx, to check that include_non_durable
# does not exclude durables
t3 = next(self.ts_iter)
obj_name = 'o3'
tx_objs[obj_name] = self._create_ondisk_files(
tx_df_mgr, obj_name, policy, t3, (tx_node_index, rx_node_index,))
expected_subreqs['PUT'].append(obj_name)
suffixes = set()
for diskfiles in tx_objs.values():
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...with include_non_durable
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
sender = ssync_sender.Sender(self.daemon, node, job, suffixes,
include_non_durable=True)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(3, len(results['tx_missing']))
self.assertEqual(2, len(results['rx_missing']))
self.assertEqual(2, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
for subreq in results.get('tx_updates'):
obj = subreq['path'].split('/')[3]
method = subreq['method']
self.assertTrue(obj in expected_subreqs[method],
'Unexpected %s subreq for object %s, expected %s'
% (method, obj, expected_subreqs[method]))
expected_subreqs[method].remove(obj)
if method == 'PUT':
expected_body = self._get_object_data(
subreq['path'], frag_index=rx_node_index)
self.assertEqual(expected_body, subreq['body'])
# verify all expected subreqs consumed
for _method, expected in expected_subreqs.items():
self.assertFalse(expected)
# verify on disk files...
# tx_objs.pop('o4') # o4 should not have been sync'd
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index, frag_prefs=[])
def test_fragment_sync(self):
# check that a sync_only type job does call reconstructor to build a
# diskfile to send, and continues making progress despite an error

View File

@ -772,6 +772,8 @@ class TestReceiver(unittest.TestCase):
@patch_policies(with_ec_default=True)
def test_MISSING_CHECK_missing_durable(self):
# check that local non-durable frag is made durable if remote sends
# same ts for same frag, but only if remote is durable
self.controller.logger = mock.MagicMock()
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
@ -791,8 +793,31 @@ class TestReceiver(unittest.TestCase):
'X-Timestamp': ts1,
'Content-Length': '1'}
diskfile.write_metadata(fp, metadata1)
self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir)) # sanity
# make a request - expect no data to be wanted
# offer same non-durable frag - expect no data to be wanted
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + ts1 + ' durable:no\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[b':MISSING_CHECK: START',
b':MISSING_CHECK: END',
b':UPDATES: START', b':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
# the local frag is still not durable...
self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir))
# offer same frag but durable - expect no data to be wanted
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
@ -811,6 +836,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
# the local frag is now durable...
self.assertEqual([ts1 + '#2#d.data'], os.listdir(object_dir))
@patch_policies(with_ec_default=True)
@mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit')
@ -834,6 +861,7 @@ class TestReceiver(unittest.TestCase):
'X-Timestamp': ts1,
'Content-Length': '1'}
diskfile.write_metadata(fp, metadata1)
self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir)) # sanity
# make a request with commit disabled - expect data to be wanted
req = swob.Request.blank(
@ -881,6 +909,198 @@ class TestReceiver(unittest.TestCase):
'EXCEPTION in ssync.Receiver while attempting commit of',
self.controller.logger.exception.call_args[0][0])
@patch_policies(with_ec_default=True)
def test_MISSING_CHECK_local_non_durable(self):
# check that local non-durable fragment does not prevent other frags
# being wanted from the sender
self.controller.logger = mock.MagicMock()
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
ts_iter = make_timestamp_iter()
ts1 = next(ts_iter).internal
ts2 = next(ts_iter).internal
ts3 = next(ts_iter).internal
# make non-durable rx disk file at ts2
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, ts2 + '#2.data'), 'w+')
fp.write('1')
fp.flush()
metadata1 = {
'name': self.name1,
'X-Timestamp': ts2,
'Content-Length': '1'}
diskfile.write_metadata(fp, metadata1)
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) # sanity
def do_check(tx_missing_line, expected_rx_missing_lines):
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
body=':MISSING_CHECK: START\r\n' +
tx_missing_line + '\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[b':MISSING_CHECK: START'] +
[l.encode('ascii') for l in expected_rx_missing_lines] +
[b':MISSING_CHECK: END',
b':UPDATES: START', b':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
# check remote frag is always wanted - older, newer, durable or not...
do_check(self.hash1 + ' ' + ts1 + ' durable:no',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts1 + ' durable:yes',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts1, [self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3 + ' durable:no',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3 + ' durable:yes',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3, [self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
# ... except when at same timestamp
do_check(self.hash1 + ' ' + ts2 + ' durable:no', [])
self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir))
# durable remote frag at ts2 will make the local durable..
do_check(self.hash1 + ' ' + ts2 + ' durable:yes', [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
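# In one place, a simplified model (an assumption-labelled sketch, not
# swift.obj.ssync_receiver itself) of the wanted/not-wanted decision that this
# test and the two tests below exercise: a non-durable local frag never
# suppresses an offer at a different timestamp, a durable local frag only
# accepts newer offers, and an equal-timestamp offer is never re-fetched.
def offer_is_wanted(local_ts, local_durable, remote_ts):
    if remote_ts == local_ts:
        return False             # data already held locally
    if not local_durable:
        return True              # local frag may never become durable
    return remote_ts > local_ts  # durable local: only newer data wanted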
@patch_policies(with_ec_default=True)
def test_MISSING_CHECK_local_durable(self):
# check that local durable fragment does not prevent newer non-durable
# frags being wanted from the sender
self.controller.logger = mock.MagicMock()
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
ts_iter = make_timestamp_iter()
ts1 = next(ts_iter).internal
ts2 = next(ts_iter).internal
ts3 = next(ts_iter).internal
# make durable rx disk file at ts2
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, ts2 + '#2#d.data'), 'w+')
fp.write('1')
fp.flush()
metadata1 = {
'name': self.name1,
'X-Timestamp': ts2,
'Content-Length': '1'}
diskfile.write_metadata(fp, metadata1)
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir)) # sanity
def do_check(tx_missing_line, expected_rx_missing_lines):
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
body=':MISSING_CHECK: START\r\n' +
tx_missing_line + '\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[b':MISSING_CHECK: START'] +
[l.encode('ascii') for l in expected_rx_missing_lines] +
[b':MISSING_CHECK: END',
b':UPDATES: START', b':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
# check remote frag is only wanted if newer than the local durable frag...
do_check(self.hash1 + ' ' + ts1 + ' durable:no', [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts1 + ' durable:yes', [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts1, [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3 + ' durable:no',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3 + ' durable:yes',
[self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts3, [self.hash1 + ' dm'])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
# ... and a same-timestamp offer changes nothing, durable or not
do_check(self.hash1 + ' ' + ts2 + ' durable:no', [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
do_check(self.hash1 + ' ' + ts2 + ' durable:yes', [])
self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir))
@patch_policies(with_ec_default=True)
def test_MISSING_CHECK_local_durable_older_than_remote_non_durable(self):
# check that newer non-durable fragment is wanted
self.controller.logger = mock.MagicMock()
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
ts_iter = make_timestamp_iter()
ts1 = next(ts_iter).internal
ts2 = next(ts_iter).internal
# make durable rx disk file at ts1
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, ts1 + '#2#d.data'), 'w+')
fp.write('1')
fp.flush()
metadata1 = {
'name': self.name1,
'X-Timestamp': ts1,
'Content-Length': '1'}
diskfile.write_metadata(fp, metadata1)
# make a request offering non-durable at ts2
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + ts2 + ' durable:no\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[b':MISSING_CHECK: START',
(self.hash1 + ' dm').encode('ascii'),
b':MISSING_CHECK: END',
b':UPDATES: START', b':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_storage_policy(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
@ -1499,6 +1719,7 @@ class TestReceiver(unittest.TestCase):
'X-Object-Meta-Test1: one\r\n'
'Content-Encoding: gzip\r\n'
'Specialty-Header: value\r\n'
'X-Backend-No-Commit: True\r\n'
'\r\n'
'1')
resp = req.get_response(self.controller)
@ -1520,9 +1741,11 @@ class TestReceiver(unittest.TestCase):
'X-Object-Meta-Test1': 'one',
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'X-Backend-No-Commit': 'True',
'Host': 'localhost:80',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
# note: Etag and X-Backend-No-Commit not in replication-headers
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
'content-encoding specialty-header')})
@ -1530,7 +1753,8 @@ class TestReceiver(unittest.TestCase):
def test_UPDATES_PUT_replication_headers(self):
self.controller.logger = mock.MagicMock()
# sanity check - regular PUT will not persist Specialty-Header
# sanity check - regular PUT will not persist Specialty-Header or
# X-Backend-No-Commit
req = swob.Request.blank(
'/sda1/0/a/c/o1', body='1',
environ={'REQUEST_METHOD': 'PUT'},
@ -1540,6 +1764,7 @@ class TestReceiver(unittest.TestCase):
'X-Timestamp': '1364456113.12344',
'X-Object-Meta-Test1': 'one',
'Content-Encoding': 'gzip',
'X-Backend-No-Commit': 'False',
'Specialty-Header': 'value'})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201)
@ -1547,6 +1772,7 @@ class TestReceiver(unittest.TestCase):
'sda1', '0', 'a', 'c', 'o1', POLICIES.default)
df.open()
self.assertFalse('Specialty-Header' in df.get_metadata())
self.assertFalse('X-Backend-No-Commit' in df.get_metadata())
# an SSYNC request can override PUT header filtering...
req = swob.Request.blank(
@ -1561,6 +1787,7 @@ class TestReceiver(unittest.TestCase):
'X-Timestamp: 1364456113.12344\r\n'
'X-Object-Meta-Test1: one\r\n'
'Content-Encoding: gzip\r\n'
'X-Backend-No-Commit: False\r\n'
'Specialty-Header: value\r\n'
'\r\n'
'1')
@ -1572,7 +1799,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
# verify diskfile has metadata permitted by replication headers
# including Specialty-Header
# including Specialty-Header, but not Etag or X-Backend-No-Commit
df = self.controller.get_diskfile(
'sda1', '0', 'a', 'c', 'o2', POLICIES.default)
df.open()
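# For context, a hedged sketch of the server-side behaviour these header
# tests depend on: before committing a PUT, the object server checks for
# X-Backend-No-Commit and, when it is true, leaves the fragment non-durable.
# maybe_commit is an illustrative name; config_true_value is the real
# swift.common.utils helper and writer.commit() the real diskfile writer API.
from swift.common.utils import config_true_value

def maybe_commit(writer, headers, timestamp):
    # skip the durable commit for non-durable SSYNC subrequests
    if not config_true_value(headers.get('X-Backend-No-Commit', 'false')):
        writer.commit(timestamp)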
@ -2264,7 +2491,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_meta=t_data,
ts_data=t_data,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(expected,
ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2273,7 +2501,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_data=t_data,
ts_meta=t_meta,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(expected,
ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2283,7 +2512,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_data=t_data,
ts_meta=t_meta,
ts_ctype=t_ctype)
ts_ctype=t_ctype,
durable=True)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2298,7 +2528,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_data=t_data,
ts_meta=t_meta,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2307,7 +2538,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_meta=t_data,
ts_data=t_data,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(expected,
ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2318,7 +2550,8 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
@ -2329,10 +2562,45 @@ class TestModuleMethods(unittest.TestCase):
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data,
ts_ctype=t_data)
ts_ctype=t_data,
durable=True)
self.assertEqual(expected,
ssync_receiver.decode_missing(msg.encode('ascii')))
# not durable
def check_non_durable(durable_val):
msg = '%s %s m:%x,durable:%s' % (object_hash,
t_data.internal,
d_meta_data,
durable_val)
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data,
ts_ctype=t_data,
durable=False)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
check_non_durable('no')
check_non_durable('false')
check_non_durable('False')
# explicit durable (as opposed to True by default)
def check_durable(durable_val):
msg = '%s %s m:%x,durable:%s' % (object_hash,
t_data.internal,
d_meta_data,
durable_val)
expected = dict(object_hash=object_hash,
ts_meta=t_meta,
ts_data=t_data,
ts_ctype=t_data,
durable=True)
self.assertEqual(
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
check_durable('yes')
check_durable('true')
check_durable('True')
def test_encode_wanted(self):
ts_iter = make_timestamp_iter()
old_t_data = next(ts_iter)

View File

@ -55,7 +55,7 @@ class NullBufferedHTTPConnection(object):
class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
def __init__(self, chunk_body=''):
def __init__(self, chunk_body='', headers=None):
self.status = 200
self.close_called = False
if not six.PY2:
@ -65,6 +65,7 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
b'%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body))
self.ssync_response_buffer = b''
self.ssync_response_chunk_left = 0
self.headers = headers or {}
def read(self, *args, **kwargs):
return b''
@ -72,6 +73,12 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
def close(self):
self.close_called = True
def getheader(self, header_name, default=None):
return str(self.headers.get(header_name, default))
def getheaders(self):
return self.headers.items()
class FakeConnection(object):
@ -380,6 +387,56 @@ class TestSender(BaseTest):
method_name, mock_method.mock_calls,
expected_calls))
def _do_test_connect_include_non_durable(self,
include_non_durable,
resp_headers):
# construct sender and make connect call
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', backend_index=0)
job = dict(partition='9', policy=POLICIES[1])
sender = ssync_sender.Sender(self.daemon, node, job, None,
include_non_durable=include_non_durable)
self.assertEqual(include_non_durable, sender.include_non_durable)
with mock.patch(
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_conn.getresponse.return_value = FakeResponse('', resp_headers)
sender.connect()
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
return sender
def test_connect_legacy_receiver(self):
sender = self._do_test_connect_include_non_durable(False, {})
self.assertFalse(sender.include_non_durable)
warnings = self.daemon_logger.get_lines_for_level('warning')
self.assertEqual([], warnings)
def test_connect_upgraded_receiver(self):
resp_hdrs = {'x-backend-accept-no-commit': 'True'}
sender = self._do_test_connect_include_non_durable(False, resp_hdrs)
# 'x-backend-accept-no-commit' in response does not override
# sender.include_non_durable
self.assertFalse(sender.include_non_durable)
warnings = self.daemon_logger.get_lines_for_level('warning')
self.assertEqual([], warnings)
def test_connect_legacy_receiver_include_non_durable(self):
sender = self._do_test_connect_include_non_durable(True, {})
# no 'x-backend-accept-no-commit' in response,
# sender.include_non_durable has been overridden
self.assertFalse(sender.include_non_durable)
warnings = self.daemon_logger.get_lines_for_level('warning')
self.assertEqual(['ssync receiver 1.2.3.4:5678 does not accept '
'non-durable fragments'], warnings)
def test_connect_upgraded_receiver_include_non_durable(self):
resp_hdrs = {'x-backend-accept-no-commit': 'True'}
sender = self._do_test_connect_include_non_durable(True, resp_hdrs)
self.assertTrue(sender.include_non_durable)
warnings = self.daemon_logger.get_lines_for_level('warning')
self.assertEqual([], warnings)
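# The four connect tests above boil down to a small negotiation; this is a
# sketch under assumptions (negotiate_non_durable is a hypothetical name, the
# header and warning text mirror the tests): the sender only keeps
# include_non_durable if the receiver advertised X-Backend-Accept-No-Commit.
def negotiate_non_durable(include_non_durable, response, logger, node_addr):
    accepts = response.getheader('x-backend-accept-no-commit', '')
    if include_non_durable and accepts.lower() != 'true':
        logger.warning('ssync receiver %s does not accept non-durable '
                       'fragments', node_addr)
        return False
    return include_non_durable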
def test_call(self):
def patch_sender(sender, available_map, send_map):
connection = FakeConnection()
@ -1465,7 +1522,7 @@ class TestSender(BaseTest):
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
def _check_send_put(self, obj_name, meta_value):
def _check_send_put(self, obj_name, meta_value, durable=True):
ts_iter = make_timestamp_iter()
t1 = next(ts_iter)
body = b'test'
@ -1473,7 +1530,8 @@ class TestSender(BaseTest):
u'Unicode-Meta-Name': meta_value}
df = self._make_open_diskfile(obj=obj_name, body=body,
timestamp=t1,
extra_metadata=extra_metadata)
extra_metadata=extra_metadata,
commit=durable)
expected = dict(df.get_metadata())
expected['body'] = body if six.PY2 else body.decode('ascii')
expected['chunk_size'] = len(body)
@ -1481,14 +1539,17 @@ class TestSender(BaseTest):
wire_meta = meta_value if six.PY2 else meta_value.encode('utf8')
path = six.moves.urllib.parse.quote(expected['name'])
expected['path'] = path
expected['length'] = format(145 + len(path) + len(wire_meta), 'x')
no_commit = '' if durable else 'X-Backend-No-Commit: True\r\n'
expected['no_commit'] = no_commit
length = 145 + len(path) + len(wire_meta) + len(no_commit)
expected['length'] = format(length, 'x')
# .meta file metadata is not included in expected for data only PUT
t2 = next(ts_iter)
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
df.open()
connection = FakeConnection()
self.sender.send_put(connection, path, df)
self.sender.send_put(connection, path, df, durable=durable)
expected = (
'%(length)s\r\n'
'PUT %(path)s\r\n'
@ -1496,6 +1557,7 @@ class TestSender(BaseTest):
'ETag: %(ETag)s\r\n'
'Some-Other-Header: value\r\n'
'Unicode-Meta-Name: %(meta)s\r\n'
'%(no_commit)s'
'X-Timestamp: %(X-Timestamp)s\r\n'
'\r\n'
'\r\n'
@ -1508,6 +1570,9 @@ class TestSender(BaseTest):
def test_send_put(self):
self._check_send_put('o', 'meta')
def test_send_put_non_durable(self):
self._check_send_put('o', 'meta', durable=False)
def test_send_put_unicode(self):
if six.PY2:
self._check_send_put(
@ -1575,6 +1640,174 @@ class TestSender(BaseTest):
self.assertTrue(connection.closed)
@patch_policies(with_ec_default=True)
class TestSenderEC(BaseTest):
def setUp(self):
skip_if_no_xattrs()
super(TestSenderEC, self).setUp()
self.daemon_logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReplicator(self.daemon_conf,
self.daemon_logger)
job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__
self.sender = ssync_sender.Sender(self.daemon, None, job, None)
def test_missing_check_non_durable(self):
# sender has durable and non-durable data files for frag index 2
ts_iter = make_timestamp_iter()
frag_index = 2
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
object_hash = utils.hash_path(*object_parts)
# older durable data file at t1
t1 = next(ts_iter)
df_durable = self._make_diskfile(
device, part, *object_parts, timestamp=t1, policy=POLICIES.default,
frag_index=frag_index, commit=True, verify=False)
with df_durable.open():
self.assertEqual(t1, df_durable.durable_timestamp) # sanity
# newer non-durable data file at t2
t2 = next(ts_iter)
df_non_durable = self._make_diskfile(
device, part, *object_parts, timestamp=t2, policy=POLICIES.default,
frag_index=frag_index, commit=False, frag_prefs=[])
with df_non_durable.open():
self.assertNotEqual(df_non_durable.data_timestamp,
df_non_durable.durable_timestamp) # sanity
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.default,
'frag_index': frag_index,
}
self.sender.node = {}
# First call missing check with sender in default mode - expect the
# non-durable frag to be ignored
response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n')
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n' + object_hash.encode('utf8') +
b' ' + t1.internal.encode('utf8') + b'\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(
available_map, {object_hash: {'ts_data': t1, 'durable': True}})
# Now make sender send non-durables and repeat missing_check - this
# time the durable is ignored and the non-durable is included in
# available_map (but NOT sent to receiver)
self.sender.include_non_durable = True
response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n')
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'41\r\n' + object_hash.encode('utf8') +
b' ' + t2.internal.encode('utf8') + b' durable:False\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(
available_map, {object_hash: {'ts_data': t2, 'durable': False}})
# Finally, purge the non-durable frag and repeat missing-check to
# confirm that the durable frag is now found and sent to receiver
df_non_durable.purge(t2, frag_index)
response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n')
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n' + object_hash.encode('utf8') +
b' ' + t1.internal.encode('utf8') + b'\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(
available_map, {object_hash: {'ts_data': t1, 'durable': True}})
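# The byte strings asserted above are HTTP chunked-transfer frames:
# '<hex length>\r\n<payload>\r\n'. A 32-char hash + space + 16-char internal
# timestamp + CRLF is 51 bytes (0x33); appending ' durable:False' adds 14
# bytes for 65 (0x41). A quick standalone illustration (not Swift code):
def frame_chunk(payload):
    return b'%x\r\n%s\r\n' % (len(payload), payload)

assert frame_chunk(b'x' * 51).startswith(b'33\r\n')
assert frame_chunk(b'x' * 65).startswith(b'41\r\n')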
def test_updates_put_non_durable(self):
# sender has durable and non-durable data files for frag index 2;
# check_updates() below toggles whether non-durables are included
ts_iter = make_timestamp_iter()
frag_index = 2
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
object_hash = utils.hash_path(*object_parts)
# older durable data file
t1 = next(ts_iter)
df_durable = self._make_diskfile(
device, part, *object_parts, timestamp=t1, policy=POLICIES.default,
frag_index=frag_index, commit=True, verify=False)
with df_durable.open():
self.assertEqual(t1, df_durable.durable_timestamp) # sanity
# newer non-durable data file
t2 = next(ts_iter)
df_non_durable = self._make_diskfile(
device, part, *object_parts, timestamp=t2, policy=POLICIES.default,
frag_index=frag_index, commit=False, frag_prefs=[])
with df_non_durable.open():
self.assertNotEqual(df_non_durable.data_timestamp,
df_non_durable.durable_timestamp) # sanity
# pretend receiver requested data only
send_map = {object_hash: {'data': True}}
def check_updates(include_non_durable, expected_durable_kwarg):
# call updates and check that the call to send_put is as expected
self.sender.include_non_durable = include_non_durable
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.default,
'frag_index': frag_index,
}
self.sender.node = {}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
response = FakeResponse(
chunk_body=':UPDATES: START\r\n:UPDATES: END\r\n')
connection = FakeConnection()
self.sender.updates(connection, response, send_map)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
args, kwargs = self.sender.send_put.call_args
connection, path, df_non_durable = args
self.assertEqual(path, '/a/c/o')
self.assertEqual({'durable': expected_durable_kwarg}, kwargs)
# note that the put line isn't actually sent since we mock
# send_put; send_put is tested separately.
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
# note: we never expect the (False, False) case
check_updates(include_non_durable=False, expected_durable_kwarg=True)
# non-durable frag is newer so is sent
check_updates(include_non_durable=True, expected_durable_kwarg=False)
# remove the newer non-durable frag so that the durable frag is sent...
df_non_durable.purge(t2, frag_index)
check_updates(include_non_durable=True, expected_durable_kwarg=True)
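# On the wire, the durable kwarg asserted above becomes a request header; a
# minimal sketch of that step (build_put_headers is an illustrative name, not
# SsyncSender.send_put itself):
def build_put_headers(df_metadata, durable):
    headers = {key: str(value) for key, value in df_metadata.items()
               if key.lower() != 'name'}
    if not durable:
        # tell the receiving object server to store the frag without
        # marking it durable
        headers['X-Backend-No-Commit'] = 'True'
    return headers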
class TestModuleMethods(unittest.TestCase):
def test_encode_missing(self):
object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
@ -1618,15 +1851,35 @@ class TestModuleMethods(unittest.TestCase):
expected.encode('ascii'),
ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type))
# optional durable param
expected = ('%s %s m:%x,t:%x'
% (object_hash, t_data.internal, d_meta_data, d_type_data))
self.assertEqual(
expected.encode('ascii'),
ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type,
durable=None))
expected = ('%s %s m:%x,t:%x,durable:False'
% (object_hash, t_data.internal, d_meta_data, d_type_data))
self.assertEqual(
expected.encode('ascii'),
ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type,
durable=False))
expected = ('%s %s m:%x,t:%x'
% (object_hash, t_data.internal, d_meta_data, d_type_data))
self.assertEqual(
expected.encode('ascii'),
ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type,
durable=True))
# test encode and decode functions invert
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
'ts_data': t_data, 'ts_ctype': t_type}
'ts_data': t_data, 'ts_ctype': t_type, 'durable': False}
msg = ssync_sender.encode_missing(**expected)
actual = ssync_receiver.decode_missing(msg)
self.assertEqual(expected, actual)
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
'ts_data': t_meta, 'ts_ctype': t_meta}
'ts_data': t_meta, 'ts_ctype': t_meta, 'durable': True}
msg = ssync_sender.encode_missing(**expected)
actual = ssync_receiver.decode_missing(msg)
self.assertEqual(expected, actual)
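# To summarise the convention round-tripped above: the sender only emits a
# durable subparam when the frag is non-durable, and the receiver treats a
# missing durable key as True, so durable frags stay wire-compatible with
# older peers. A compact sketch of both halves (illustrative helpers, not the
# real encode_missing/decode_missing):
def encode_durable(durable):
    return '' if durable in (None, True) else ',durable:False'

def decode_durable(subparams):
    return subparams.get('durable', 'True').lower() in ('true', 'yes')

assert encode_durable(True) == '' and encode_durable(False) == ',durable:False'
assert decode_durable({}) and not decode_durable({'durable': 'no'})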