Re-organise ssync tests

We have some tests that exercise both the sender and the receiver, but
they are spread across test_ssync_sender.py and test_ssync_receiver.py.
This change creates a new module, test_ssync.py, and moves the
end-to-end tests there.

Change-Id: Iea3e9932734924453f7241432afda90abbc75c06
parent ec9095bf58
commit 6858510b59
80   test/unit/obj/common.py   Normal file
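For orientation, the ssync test modules after this change are laid out roughly as follows (summary of the diff below):

80   test/unit/obj/common.py              shared FakeReplicator and BaseTest fixtures (new file)
909  test/unit/obj/test_ssync.py          end-to-end sender/receiver tests (new file)
     test/unit/obj/test_ssync_sender.py   sender-focused tests; end-to-end tests moved out
     test/unit/obj/test_ssync_receiver.py receiver-focused tests; end-to-end tests moved out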
@@ -0,0 +1,80 @@
# Copyright (c) 2013 - 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import shutil
import tempfile
import unittest
import time

from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import diskfile

from test.unit import debug_logger


class FakeReplicator(object):
    def __init__(self, testdir, policy=None):
        self.logger = debug_logger('test-ssync-sender')
        self.conn_timeout = 1
        self.node_timeout = 2
        self.http_timeout = 3
        self.network_chunk_size = 65536
        self.disk_chunk_size = 4096
        conf = {
            'devices': testdir,
            'mount_check': 'false',
        }
        policy = POLICIES.default if policy is None else policy
        self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
        self._diskfile_mgr = self._diskfile_router[policy]


class BaseTest(unittest.TestCase):
    def setUp(self):
        # daemon will be set in subclass setUp
        self.daemon = None
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _make_open_diskfile(self, device='dev', partition='9',
                            account='a', container='c', obj='o', body='test',
                            extra_metadata=None, policy=None,
                            frag_index=None, timestamp=None, df_mgr=None):
        policy = policy or POLICIES.legacy
        object_parts = account, container, obj
        timestamp = Timestamp(time.time()) if timestamp is None else timestamp
        if df_mgr is None:
            df_mgr = self.daemon._diskfile_router[policy]
        df = df_mgr.get_diskfile(
            device, partition, *object_parts, policy=policy,
            frag_index=frag_index)
        content_length = len(body)
        etag = hashlib.md5(body).hexdigest()
        with df.create() as writer:
            writer.write(body)
            metadata = {
                'X-Timestamp': timestamp.internal,
                'Content-Length': str(content_length),
                'ETag': etag,
            }
            if extra_metadata:
                metadata.update(extra_metadata)
            writer.put(metadata)
            writer.commit(timestamp)
        df.open()
        return df
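To illustrate how these shared fixtures are meant to be consumed, a minimal sketch (the TestExample class and the object name are hypothetical; the real consumers are the sender tests and the end-to-end tests in test_ssync.py below):

import os

from swift.common import utils

from test.unit.obj.common import BaseTest, FakeReplicator


class TestExample(BaseTest):
    def setUp(self):
        super(TestExample, self).setUp()
        # BaseTest leaves self.daemon as None; a subclass provides one so
        # that _make_open_diskfile can look up a diskfile manager.
        utils.mkdirs(os.path.join(self.tmpdir, 'dev'))
        self.daemon = FakeReplicator(self.tmpdir)

    def test_write_and_read(self):
        # create, write and open a diskfile under the temp dir
        df = self._make_open_diskfile(obj='o1', body='data')
        self.assertEqual('data', ''.join(df.reader()))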
909  test/unit/obj/test_ssync.py  Normal file
@@ -0,0 +1,909 @@
|
||||
# Copyright (c) 2013 - 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from collections import defaultdict
|
||||
|
||||
import mock
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import eventlet
|
||||
import itertools
|
||||
from six.moves import urllib
|
||||
|
||||
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
|
||||
DiskFileDeleted
|
||||
from swift.common import utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.obj import ssync_sender, server
|
||||
from swift.obj.reconstructor import RebuildingECDiskFileStream
|
||||
|
||||
from test.unit import patch_policies
|
||||
from test.unit.obj.common import BaseTest, FakeReplicator
|
||||
|
||||
|
||||
class TestBaseSsync(BaseTest):
|
||||
"""
|
||||
Provides a framework to test end to end interactions between sender and
|
||||
receiver. The basis for each test is actual diskfile state on either side.
|
||||
The connection between sender and receiver is wrapped to capture ssync
|
||||
traffic for subsequent verification of the protocol. Assertions are made
|
||||
about the final state of the sender and receiver diskfiles.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestBaseSsync, self).setUp()
|
||||
self.device = 'dev'
|
||||
self.partition = '9'
|
||||
# sender side setup
|
||||
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
|
||||
self.daemon = FakeReplicator(self.tx_testdir)
|
||||
|
||||
# rx side setup
|
||||
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
|
||||
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
|
||||
conf = {
|
||||
'devices': self.rx_testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false',
|
||||
'log_requests': 'false'}
|
||||
self.rx_controller = server.ObjectController(conf)
|
||||
self.ts_iter = (Timestamp(t)
|
||||
for t in itertools.count(int(time.time())))
|
||||
self.rx_ip = '127.0.0.1'
|
||||
sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
|
||||
self.rx_port = sock.getsockname()[1]
|
||||
self.rx_node = {'replication_ip': self.rx_ip,
|
||||
'replication_port': self.rx_port,
|
||||
'device': self.device}
|
||||
|
||||
def tearDown(self):
|
||||
self.rx_server.kill()
|
||||
super(TestBaseSsync, self).tearDown()
|
||||
|
||||
def make_connect_wrapper(self, sender):
|
||||
"""
|
||||
Make a wrapper function for the ssync_sender.Sender.connect() method
|
||||
that will in turn wrap the HTTPConnection.send() and the
|
||||
Sender.readline() so that ssync protocol messages can be captured.
|
||||
"""
|
||||
orig_connect = sender.connect
|
||||
trace = dict(messages=[])
|
||||
|
||||
def add_trace(type, msg):
|
||||
# record a protocol event for later analysis
|
||||
if msg.strip():
|
||||
trace['messages'].append((type, msg.strip()))
|
||||
|
||||
def make_send_wrapper(send):
|
||||
def wrapped_send(msg):
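                # The sender transmits with HTTP chunked transfer encoding,
                # so strip the leading chunk-size line and the trailing CRLF
                # to record only the ssync payload (an assumption based on
                # the split/rsplit below).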
|
||||
_msg = msg.split('\r\n', 1)[1]
|
||||
_msg = _msg.rsplit('\r\n', 1)[0]
|
||||
add_trace('tx', _msg)
|
||||
send(msg)
|
||||
return wrapped_send
|
||||
|
||||
def make_readline_wrapper(readline):
|
||||
def wrapped_readline():
|
||||
data = readline()
|
||||
add_trace('rx', data)
|
||||
bytes_read = trace.setdefault('readline_bytes', 0)
|
||||
trace['readline_bytes'] = bytes_read + len(data)
|
||||
return data
|
||||
return wrapped_readline
|
||||
|
||||
def wrapped_connect():
|
||||
orig_connect()
|
||||
sender.connection.send = make_send_wrapper(
|
||||
sender.connection.send)
|
||||
sender.readline = make_readline_wrapper(sender.readline)
|
||||
return wrapped_connect, trace
|
||||
|
||||
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
|
||||
frag_indexes=None):
|
||||
frag_indexes = [None] if frag_indexes is None else frag_indexes
|
||||
metadata = {'Content-Type': 'plain/text'}
|
||||
diskfiles = []
|
||||
for frag_index in frag_indexes:
|
||||
object_data = '/a/c/%s___%s' % (obj_name, frag_index)
|
||||
if frag_index is not None:
|
||||
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
|
||||
df = self._make_open_diskfile(
|
||||
device=self.device, partition=self.partition, account='a',
|
||||
container='c', obj=obj_name, body=object_data,
|
||||
extra_metadata=metadata, timestamp=timestamp, policy=policy,
|
||||
frag_index=frag_index, df_mgr=df_mgr)
|
||||
# sanity checks
|
||||
listing = os.listdir(df._datadir)
|
||||
self.assertTrue(listing)
|
||||
for filename in listing:
|
||||
self.assertTrue(filename.startswith(timestamp.internal))
|
||||
diskfiles.append(df)
|
||||
return diskfiles
|
||||
|
||||
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
|
||||
df_mgr = self.daemon._diskfile_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
self.device, self.partition, account='a', container='c',
|
||||
obj=obj_name, policy=policy, frag_index=frag_index)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
|
||||
df = self.rx_controller.get_diskfile(
|
||||
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
|
||||
frag_index=frag_index)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
|
||||
# verify that diskfiles' metadata match
|
||||
# sanity check, they are not the same ondisk files!
|
||||
self.assertNotEqual(tx_df._datadir, rx_df._datadir)
|
||||
rx_metadata = dict(rx_df.get_metadata())
|
||||
for k, v in tx_df.get_metadata().items():
|
||||
if k == 'X-Object-Sysmeta-Ec-Frag-Index':
|
||||
# if tx_df had a frag_index then rx_df should also have one
|
||||
self.assertTrue(k in rx_metadata)
|
||||
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
|
||||
elif k == 'ETag' and not same_etag:
|
||||
self.assertNotEqual(v, rx_metadata.pop(k, None))
|
||||
continue
|
||||
else:
|
||||
self.assertEqual(v, rx_metadata.pop(k), k)
|
||||
self.assertFalse(rx_metadata)
|
||||
expected_body = '%s___%s' % (tx_df._name, frag_index)
|
||||
actual_body = ''.join([chunk for chunk in rx_df.reader()])
|
||||
self.assertEqual(expected_body, actual_body)
|
||||
|
||||
def _analyze_trace(self, trace):
|
||||
"""
|
||||
Parse protocol trace captured by fake connection, making some
|
||||
assertions along the way, and return results as a dict of form:
|
||||
results = {'tx_missing': <list of messages>,
|
||||
'rx_missing': <list of messages>,
|
||||
'tx_updates': <list of subreqs>,
|
||||
'rx_updates': <list of messages>}
|
||||
|
||||
Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
|
||||
"""
|
||||
def tx_missing(results, line):
|
||||
self.assertEqual('tx', line[0])
|
||||
results['tx_missing'].append(line[1])
|
||||
|
||||
def rx_missing(results, line):
|
||||
self.assertEqual('rx', line[0])
|
||||
parts = line[1].split('\r\n')
|
||||
for part in parts:
|
||||
results['rx_missing'].append(part)
|
||||
|
||||
def tx_updates(results, line):
|
||||
self.assertEqual('tx', line[0])
|
||||
subrequests = results['tx_updates']
|
||||
if line[1].startswith(('PUT', 'DELETE', 'POST')):
|
||||
parts = line[1].split('\r\n')
|
||||
method, path = parts[0].split()
|
||||
subreq = {'method': method, 'path': path, 'req': line[1],
|
||||
'headers': parts[1:]}
|
||||
subrequests.append(subreq)
|
||||
else:
|
||||
self.assertTrue(subrequests)
|
||||
body = (subrequests[-1]).setdefault('body', '')
|
||||
body += line[1]
|
||||
subrequests[-1]['body'] = body
|
||||
|
||||
def rx_updates(results, line):
|
||||
self.assertEqual('rx', line[0])
|
||||
results['rx_updates'].append(line[1])
|
||||
|
||||
def unexpected(results, line):
|
||||
results.setdefault('unexpected', []).append(line)
|
||||
|
||||
# each trace line is a tuple of ([tx|rx], msg)
|
||||
handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
|
||||
(('tx', ':MISSING_CHECK: END'), unexpected),
|
||||
(('rx', ':MISSING_CHECK: START'), rx_missing),
|
||||
(('rx', ':MISSING_CHECK: END'), unexpected),
|
||||
(('tx', ':UPDATES: START'), tx_updates),
|
||||
(('tx', ':UPDATES: END'), unexpected),
|
||||
(('rx', ':UPDATES: START'), rx_updates),
|
||||
(('rx', ':UPDATES: END'), unexpected)])
|
||||
expect_handshake = next(handshakes)
|
||||
phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
|
||||
results = dict((k, []) for k in phases)
|
||||
handler = unexpected
|
||||
lines = list(trace.get('messages', []))
|
||||
lines.reverse()
|
||||
while lines:
|
||||
line = lines.pop()
|
||||
if line == expect_handshake[0]:
|
||||
handler = expect_handshake[1]
|
||||
try:
|
||||
expect_handshake = next(handshakes)
|
||||
except StopIteration:
|
||||
# should be the last line
|
||||
self.assertFalse(
|
||||
lines, 'Unexpected trailing lines %s' % lines)
|
||||
continue
|
||||
handler(results, line)
|
||||
|
||||
try:
|
||||
# check all handshakes occurred
|
||||
missed = next(handshakes)
|
||||
self.fail('Handshake %s not found' % str(missed[0]))
|
||||
except StopIteration:
|
||||
pass
|
||||
# check no message outside of a phase
|
||||
self.assertFalse(results.get('unexpected'),
|
||||
'Message outside of a phase: %s' % results.get('unexpected'))
|
||||
return results
|
||||
|
||||
def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
|
||||
rx_frag_index=None):
|
||||
"""
|
||||
Verify tx and rx files that should be in sync.
|
||||
:param tx_objs: sender diskfiles
|
||||
:param policy: storage policy instance
|
||||
:param tx_frag_index: the fragment index of tx diskfiles that should
|
||||
have been used as a source for sync'ing
|
||||
:param rx_frag_index: the fragment index of expected rx diskfiles
|
||||
"""
|
||||
for o_name, diskfiles in tx_objs.items():
|
||||
for tx_df in diskfiles:
|
||||
# check tx file still intact - ssync does not do any cleanup!
|
||||
tx_df.open()
|
||||
if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
|
||||
# this diskfile should have been sync'd,
|
||||
# check rx file is ok
|
||||
rx_df = self._open_rx_diskfile(
|
||||
o_name, policy, rx_frag_index)
|
||||
# for EC revert job or replication etags should match
|
||||
match_etag = (tx_frag_index == rx_frag_index)
|
||||
self._verify_diskfile_sync(
|
||||
tx_df, rx_df, rx_frag_index, match_etag)
|
||||
else:
|
||||
# this diskfile should not have been sync'd,
|
||||
# check no rx file,
|
||||
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
|
||||
o_name, policy,
|
||||
frag_index=tx_df._frag_index)
|
||||
|
||||
def _verify_tombstones(self, tx_objs, policy):
|
||||
# verify tx and rx tombstones that should be in sync
|
||||
for o_name, diskfiles in tx_objs.items():
|
||||
try:
|
||||
self._open_tx_diskfile(o_name, policy)
|
||||
self.fail('DiskFileDeleted expected')
|
||||
except DiskFileDeleted as exc:
|
||||
tx_delete_time = exc.timestamp
|
||||
try:
|
||||
self._open_rx_diskfile(o_name, policy)
|
||||
self.fail('DiskFileDeleted expected')
|
||||
except DiskFileDeleted as exc:
|
||||
rx_delete_time = exc.timestamp
|
||||
self.assertEqual(tx_delete_time, rx_delete_time)
|
||||
|
||||
|
||||
@patch_policies(with_ec_default=True)
|
||||
class TestSsyncEC(TestBaseSsync):
|
||||
def test_handoff_fragment_revert(self):
|
||||
# test that a sync_revert type job does send the correct frag archives
|
||||
# to the receiver
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
tx_node_index = 1
|
||||
# for a revert job we iterate over frag index that belongs on
|
||||
# remote node
|
||||
frag_index = rx_node_index
|
||||
|
||||
# create sender side diskfiles...
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 has primary and handoff fragment archives
|
||||
t1 = next(self.ts_iter)
|
||||
tx_objs['o1'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
|
||||
# o2 only has primary
|
||||
t2 = next(self.ts_iter)
|
||||
tx_objs['o2'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
|
||||
# o3 only has handoff
|
||||
t3 = next(self.ts_iter)
|
||||
tx_objs['o3'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
|
||||
# o4 primary and handoff fragment archives on tx, handoff in sync on rx
|
||||
t4 = next(self.ts_iter)
|
||||
tx_objs['o4'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
|
||||
rx_objs['o4'] = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
|
||||
# o5 is a tombstone, missing on receiver
|
||||
t5 = next(self.ts_iter)
|
||||
tx_tombstones['o5'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
|
||||
tx_tombstones['o5'][0].delete(t5)
|
||||
|
||||
suffixes = set()
|
||||
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
|
||||
for df in diskfiles:
|
||||
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
|
||||
|
||||
# create ssync sender instance...
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy,
|
||||
'frag_index': frag_index}
|
||||
node = dict(self.rx_node)
|
||||
node.update({'index': rx_node_index})
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
# run the sync protocol...
|
||||
sender()
|
||||
|
||||
# verify protocol
|
||||
results = self._analyze_trace(trace)
|
||||
# sender has handoff frags for o1, o3 and o4 and ts for o5
|
||||
self.assertEqual(4, len(results['tx_missing']))
|
||||
# receiver is missing frags for o1, o3 and ts for o5
|
||||
self.assertEqual(3, len(results['rx_missing']))
|
||||
self.assertEqual(3, len(results['tx_updates']))
|
||||
self.assertFalse(results['rx_updates'])
|
||||
sync_paths = []
|
||||
for subreq in results.get('tx_updates'):
|
||||
if subreq.get('method') == 'PUT':
|
||||
self.assertTrue(
|
||||
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
|
||||
in subreq.get('headers'))
|
||||
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
|
||||
self.assertEqual(expected_body, subreq['body'])
|
||||
elif subreq.get('method') == 'DELETE':
|
||||
self.assertEqual('/a/c/o5', subreq['path'])
|
||||
sync_paths.append(subreq.get('path'))
|
||||
self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
|
||||
|
||||
# verify on disk files...
|
||||
self._verify_ondisk_files(
|
||||
tx_objs, policy, frag_index, rx_node_index)
|
||||
self._verify_tombstones(tx_tombstones, policy)
|
||||
|
||||
def test_fragment_sync(self):
|
||||
# check that a sync_only type job does call reconstructor to build a
|
||||
# diskfile to send, and continues making progress despite an error
|
||||
# when building one diskfile
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
tx_node_index = 1
|
||||
# for a sync job we iterate over frag index that belongs on local node
|
||||
frag_index = tx_node_index
|
||||
|
||||
# create sender side diskfiles...
|
||||
tx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 only has primary
|
||||
t1 = next(self.ts_iter)
|
||||
tx_objs['o1'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
|
||||
# o2 only has primary
|
||||
t2 = next(self.ts_iter)
|
||||
tx_objs['o2'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
|
||||
# o3 only has primary
|
||||
t3 = next(self.ts_iter)
|
||||
tx_objs['o3'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
|
||||
# o4 primary fragment archives on tx, handoff in sync on rx
|
||||
t4 = next(self.ts_iter)
|
||||
tx_objs['o4'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
|
||||
rx_objs['o4'] = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
|
||||
# o5 is a tombstone, missing on receiver
|
||||
t5 = next(self.ts_iter)
|
||||
tx_tombstones['o5'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
|
||||
tx_tombstones['o5'][0].delete(t5)
|
||||
|
||||
suffixes = set()
|
||||
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
|
||||
for df in diskfiles:
|
||||
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
|
||||
|
||||
reconstruct_fa_calls = []
|
||||
|
||||
def fake_reconstruct_fa(job, node, metadata):
|
||||
reconstruct_fa_calls.append((job, node, policy, metadata))
|
||||
if len(reconstruct_fa_calls) == 2:
|
||||
# simulate second reconstruct failing
|
||||
raise DiskFileError
|
||||
content = '%s___%s' % (metadata['name'], rx_node_index)
|
||||
return RebuildingECDiskFileStream(
|
||||
metadata, rx_node_index, iter([content]))
|
||||
|
||||
# create ssync sender instance...
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy,
|
||||
'frag_index': frag_index,
|
||||
'sync_diskfile_builder': fake_reconstruct_fa}
|
||||
node = dict(self.rx_node)
|
||||
node.update({'index': rx_node_index})
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
# run the sync protocol...
|
||||
sender()
|
||||
|
||||
# verify protocol
|
||||
results = self._analyze_trace(trace)
|
||||
# sender has primary for o1, o2 and o3, o4 and ts for o5
|
||||
self.assertEqual(5, len(results['tx_missing']))
|
||||
# receiver is missing o1, o2 and o3 and ts for o5
|
||||
self.assertEqual(4, len(results['rx_missing']))
|
||||
# sender can only construct 2 out of 3 missing frags
|
||||
self.assertEqual(3, len(results['tx_updates']))
|
||||
self.assertEqual(3, len(reconstruct_fa_calls))
|
||||
self.assertFalse(results['rx_updates'])
|
||||
actual_sync_paths = []
|
||||
for subreq in results.get('tx_updates'):
|
||||
if subreq.get('method') == 'PUT':
|
||||
self.assertTrue(
|
||||
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
|
||||
in subreq.get('headers'))
|
||||
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
|
||||
self.assertEqual(expected_body, subreq['body'])
|
||||
elif subreq.get('method') == 'DELETE':
|
||||
self.assertEqual('/a/c/o5', subreq['path'])
|
||||
actual_sync_paths.append(subreq.get('path'))
|
||||
|
||||
# remove the failed df from expected synced df's
|
||||
expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
|
||||
failed_path = reconstruct_fa_calls[1][3]['name']
|
||||
expect_sync_paths.remove(failed_path)
|
||||
failed_obj = None
|
||||
for obj, diskfiles in tx_objs.items():
|
||||
if diskfiles[0]._name == failed_path:
|
||||
failed_obj = obj
|
||||
# sanity check
|
||||
self.assertTrue(tx_objs.pop(failed_obj))
|
||||
|
||||
# verify on disk files...
|
||||
self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
|
||||
self._verify_ondisk_files(
|
||||
tx_objs, policy, frag_index, rx_node_index)
|
||||
self._verify_tombstones(tx_tombstones, policy)
|
||||
|
||||
def test_send_with_frag_index_none(self):
|
||||
policy = POLICIES.default
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# create an ec fragment on the remote node
|
||||
ts1 = next(self.ts_iter)
|
||||
remote_df = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o', policy, ts1, (3,))[0]
|
||||
|
||||
# create a tombstone on the local node
|
||||
df = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o', policy, ts1, (3,))[0]
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
ts2 = next(self.ts_iter)
|
||||
df.delete(ts2)
|
||||
# a reconstructor revert job with only tombstones will have frag_index
|
||||
# explicitly set to None
|
||||
job = {
|
||||
'frag_index': None,
|
||||
'partition': self.partition,
|
||||
'policy': policy,
|
||||
'device': self.device,
|
||||
}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.rx_node, job, [suffix])
|
||||
success, _ = sender()
|
||||
self.assertTrue(success)
|
||||
try:
|
||||
remote_df.read_metadata()
|
||||
except DiskFileDeleted as e:
|
||||
self.assertEqual(e.timestamp, ts2)
|
||||
else:
|
||||
self.fail('Successfully opened remote DiskFile')
|
||||
|
||||
def test_send_invalid_frag_index(self):
|
||||
policy = POLICIES.default
|
||||
job = {'frag_index': 'Not a number',
|
||||
'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.rx_node, job, ['abc'])
|
||||
success, _ = sender()
|
||||
self.assertFalse(success)
|
||||
error_log_lines = self.daemon.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_log_lines))
|
||||
error_msg = error_log_lines[0]
|
||||
self.assertIn("Expected status 200; got 400", error_msg)
|
||||
self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'Not a number'",
|
||||
error_msg)
|
||||
|
||||
|
||||
@patch_policies
|
||||
class TestSsyncReplication(TestBaseSsync):
|
||||
def test_sync(self):
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
|
||||
# create sender side diskfiles...
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 and o2 are on tx only
|
||||
t1 = next(self.ts_iter)
|
||||
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
|
||||
t2 = next(self.ts_iter)
|
||||
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
|
||||
# o3 is on tx and older copy on rx
|
||||
t3a = next(self.ts_iter)
|
||||
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
|
||||
t3b = next(self.ts_iter)
|
||||
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
|
||||
# o4 in sync on rx and tx
|
||||
t4 = next(self.ts_iter)
|
||||
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
|
||||
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
|
||||
# o5 is a tombstone, missing on receiver
|
||||
t5 = next(self.ts_iter)
|
||||
tx_tombstones['o5'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o5', policy, t5)
|
||||
tx_tombstones['o5'][0].delete(t5)
|
||||
# o6 is a tombstone, in sync on tx and rx
|
||||
t6 = next(self.ts_iter)
|
||||
tx_tombstones['o6'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o6', policy, t6)
|
||||
tx_tombstones['o6'][0].delete(t6)
|
||||
rx_tombstones['o6'] = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o6', policy, t6)
|
||||
rx_tombstones['o6'][0].delete(t6)
|
||||
# o7 is a tombstone on tx, older data on rx
|
||||
t7a = next(self.ts_iter)
|
||||
rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
|
||||
t7b = next(self.ts_iter)
|
||||
tx_tombstones['o7'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o7', policy, t7b)
|
||||
tx_tombstones['o7'][0].delete(t7b)
|
||||
|
||||
suffixes = set()
|
||||
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
|
||||
for df in diskfiles:
|
||||
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
|
||||
|
||||
# create ssync sender instance...
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy}
|
||||
node = dict(self.rx_node)
|
||||
node.update({'index': rx_node_index})
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
# run the sync protocol...
|
||||
success, in_sync_objs = sender()
|
||||
|
||||
self.assertEqual(7, len(in_sync_objs))
|
||||
self.assertTrue(success)
|
||||
|
||||
# verify protocol
|
||||
results = self._analyze_trace(trace)
|
||||
self.assertEqual(7, len(results['tx_missing']))
|
||||
self.assertEqual(5, len(results['rx_missing']))
|
||||
self.assertEqual(5, len(results['tx_updates']))
|
||||
self.assertFalse(results['rx_updates'])
|
||||
sync_paths = []
|
||||
for subreq in results.get('tx_updates'):
|
||||
if subreq.get('method') == 'PUT':
|
||||
self.assertTrue(
|
||||
subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
|
||||
expected_body = '%s___None' % subreq['path']
|
||||
self.assertEqual(expected_body, subreq['body'])
|
||||
elif subreq.get('method') == 'DELETE':
|
||||
self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
|
||||
sync_paths.append(subreq.get('path'))
|
||||
self.assertEqual(
|
||||
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
|
||||
sorted(sync_paths))
|
||||
|
||||
# verify on disk files...
|
||||
self._verify_ondisk_files(tx_objs, policy)
|
||||
self._verify_tombstones(tx_tombstones, policy)
|
||||
|
||||
def test_nothing_to_sync(self):
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': POLICIES.default}
|
||||
node = {'replication_ip': self.rx_ip,
|
||||
'replication_port': self.rx_port,
|
||||
'device': self.device,
|
||||
'index': 0}
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
result, in_sync_objs = sender()
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertFalse(in_sync_objs)
|
||||
results = self._analyze_trace(trace)
|
||||
self.assertFalse(results['tx_missing'])
|
||||
self.assertFalse(results['rx_missing'])
|
||||
self.assertFalse(results['tx_updates'])
|
||||
self.assertFalse(results['rx_updates'])
|
||||
# Minimal receiver response as read by sender:
|
||||
# 2 <-- initial \r\n to start ssync exchange
|
||||
# + 23 <-- :MISSING CHECK START\r\n
|
||||
# + 2 <-- \r\n (minimal missing check response)
|
||||
# + 21 <-- :MISSING CHECK END\r\n
|
||||
# + 17 <-- :UPDATES START\r\n
|
||||
# + 15 <-- :UPDATES END\r\n
|
||||
# TOTAL = 80
|
||||
self.assertEqual(80, trace.get('readline_bytes'))
|
||||
|
||||
def test_meta_file_sync(self):
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
|
||||
# create diskfiles...
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
expected_subreqs = defaultdict(list)
|
||||
|
||||
# o1 on tx only with meta file
|
||||
t1 = next(self.ts_iter)
|
||||
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
|
||||
t1_meta = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t1_meta.internal,
|
||||
'X-Object-Meta-Test': 'o1',
|
||||
'X-Object-Sysmeta-Test': 'sys_o1'}
|
||||
tx_objs['o1'][0].write_metadata(metadata)
|
||||
expected_subreqs['PUT'].append('o1')
|
||||
expected_subreqs['POST'].append('o1')
|
||||
|
||||
# o2 on tx with meta, on rx without meta
|
||||
t2 = next(self.ts_iter)
|
||||
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
|
||||
t2_meta = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t2_meta.internal,
|
||||
'X-Object-Meta-Test': 'o2',
|
||||
'X-Object-Sysmeta-Test': 'sys_o2'}
|
||||
tx_objs['o2'][0].write_metadata(metadata)
|
||||
rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
|
||||
expected_subreqs['POST'].append('o2')
|
||||
|
||||
# o3 is on tx with meta, rx has newer data but no meta
|
||||
t3a = next(self.ts_iter)
|
||||
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
|
||||
t3b = next(self.ts_iter)
|
||||
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
|
||||
t3_meta = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t3_meta.internal,
|
||||
'X-Object-Meta-Test': 'o3',
|
||||
'X-Object-Sysmeta-Test': 'sys_o3'}
|
||||
tx_objs['o3'][0].write_metadata(metadata)
|
||||
expected_subreqs['POST'].append('o3')
|
||||
|
||||
# o4 is on tx with meta, rx has older data and up to date meta
|
||||
t4a = next(self.ts_iter)
|
||||
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
|
||||
t4b = next(self.ts_iter)
|
||||
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
|
||||
t4_meta = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t4_meta.internal,
|
||||
'X-Object-Meta-Test': 'o4',
|
||||
'X-Object-Sysmeta-Test': 'sys_o4'}
|
||||
tx_objs['o4'][0].write_metadata(metadata)
|
||||
rx_objs['o4'][0].write_metadata(metadata)
|
||||
expected_subreqs['PUT'].append('o4')
|
||||
|
||||
# o5 is on tx with meta, rx is in sync with data and meta
|
||||
t5 = next(self.ts_iter)
|
||||
rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
|
||||
tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
|
||||
t5_meta = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t5_meta.internal,
|
||||
'X-Object-Meta-Test': 'o5',
|
||||
'X-Object-Sysmeta-Test': 'sys_o5'}
|
||||
tx_objs['o5'][0].write_metadata(metadata)
|
||||
rx_objs['o5'][0].write_metadata(metadata)
|
||||
|
||||
# o6 is tombstone on tx, rx has older data and meta
|
||||
t6 = next(self.ts_iter)
|
||||
tx_tombstones['o6'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o6', policy, t6)
|
||||
rx_tombstones['o6'] = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o6', policy, t6)
|
||||
metadata = {'X-Timestamp': next(self.ts_iter).internal,
|
||||
'X-Object-Meta-Test': 'o6',
|
||||
'X-Object-Sysmeta-Test': 'sys_o6'}
|
||||
rx_tombstones['o6'][0].write_metadata(metadata)
|
||||
tx_tombstones['o6'][0].delete(next(self.ts_iter))
|
||||
expected_subreqs['DELETE'].append('o6')
|
||||
|
||||
# o7 is tombstone on rx, tx has older data and meta,
|
||||
# no subreqs expected...
|
||||
t7 = next(self.ts_iter)
|
||||
tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
|
||||
rx_tombstones['o7'] = self._create_ondisk_files(
|
||||
rx_df_mgr, 'o7', policy, t7)
|
||||
metadata = {'X-Timestamp': next(self.ts_iter).internal,
|
||||
'X-Object-Meta-Test': 'o7',
|
||||
'X-Object-Sysmeta-Test': 'sys_o7'}
|
||||
tx_objs['o7'][0].write_metadata(metadata)
|
||||
rx_tombstones['o7'][0].delete(next(self.ts_iter))
|
||||
|
||||
suffixes = set()
|
||||
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
|
||||
for df in diskfiles:
|
||||
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
|
||||
|
||||
# create ssync sender instance...
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy}
|
||||
node = dict(self.rx_node)
|
||||
node.update({'index': rx_node_index})
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
# run the sync protocol...
|
||||
success, in_sync_objs = sender()
|
||||
|
||||
self.assertEqual(7, len(in_sync_objs))
|
||||
self.assertTrue(success)
|
||||
|
||||
# verify protocol
|
||||
results = self._analyze_trace(trace)
|
||||
self.assertEqual(7, len(results['tx_missing']))
|
||||
self.assertEqual(5, len(results['rx_missing']))
|
||||
for subreq in results.get('tx_updates'):
|
||||
obj = subreq['path'].split('/')[3]
|
||||
method = subreq['method']
|
||||
self.assertTrue(obj in expected_subreqs[method],
|
||||
'Unexpected %s subreq for object %s, expected %s'
|
||||
% (method, obj, expected_subreqs[method]))
|
||||
expected_subreqs[method].remove(obj)
|
||||
if method == 'PUT':
|
||||
expected_body = '%s___None' % subreq['path']
|
||||
self.assertEqual(expected_body, subreq['body'])
|
||||
# verify all expected subreqs consumed
|
||||
for _method, expected in expected_subreqs.items():
|
||||
self.assertFalse(expected)
|
||||
self.assertFalse(results['rx_updates'])
|
||||
|
||||
# verify on disk files...
|
||||
del tx_objs['o7'] # o7 not expected to be sync'd
|
||||
self._verify_ondisk_files(tx_objs, policy)
|
||||
self._verify_tombstones(tx_tombstones, policy)
|
||||
for oname, rx_obj in rx_objs.items():
|
||||
df = rx_obj[0].open()
|
||||
metadata = df.get_metadata()
|
||||
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
|
||||
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
|
||||
|
||||
def test_meta_file_not_synced_to_legacy_receiver(self):
|
||||
# verify that the sender does sync a data file to a legacy receiver,
|
||||
# but does not PUT meta file content to a legacy receiver
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
|
||||
# create diskfiles...
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
# rx has data at t1 but no meta
|
||||
# object is on tx with data at t2, meta at t3,
|
||||
t1 = next(self.ts_iter)
|
||||
self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
|
||||
t2 = next(self.ts_iter)
|
||||
tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
|
||||
t3 = next(self.ts_iter)
|
||||
metadata = {'X-Timestamp': t3.internal,
|
||||
'X-Object-Meta-Test': 'o3',
|
||||
'X-Object-Sysmeta-Test': 'sys_o3'}
|
||||
tx_obj.write_metadata(metadata)
|
||||
|
||||
suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
|
||||
# create ssync sender instance...
|
||||
job = {'device': self.device,
|
||||
'partition': self.partition,
|
||||
'policy': policy}
|
||||
node = dict(self.rx_node)
|
||||
node.update({'index': rx_node_index})
|
||||
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||
# wrap connection from tx to rx to capture ssync messages...
|
||||
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||
|
||||
def _legacy_check_missing(self, line):
|
||||
# reproduces behavior of 'legacy' ssync receiver missing_checks()
|
||||
parts = line.split()
|
||||
object_hash = urllib.parse.unquote(parts[0])
|
||||
timestamp = urllib.parse.unquote(parts[1])
|
||||
want = False
|
||||
try:
|
||||
df = self.diskfile_mgr.get_diskfile_from_hash(
|
||||
self.device, self.partition, object_hash, self.policy,
|
||||
frag_index=self.frag_index)
|
||||
except DiskFileNotExist:
|
||||
want = True
|
||||
else:
|
||||
try:
|
||||
df.open()
|
||||
except DiskFileDeleted as err:
|
||||
want = err.timestamp < timestamp
|
||||
except DiskFileError:
|
||||
want = True
|
||||
else:
|
||||
want = df.timestamp < timestamp
|
||||
if want:
|
||||
return urllib.parse.quote(object_hash)
|
||||
return None
|
||||
|
||||
# run the sync protocol...
|
||||
func = 'swift.obj.ssync_receiver.Receiver._check_missing'
|
||||
with mock.patch(func, _legacy_check_missing):
|
||||
success, in_sync_objs = sender()
|
||||
|
||||
self.assertEqual(1, len(in_sync_objs))
|
||||
self.assertTrue(success)
|
||||
|
||||
# verify protocol, expecting only a PUT to legacy receiver
|
||||
results = self._analyze_trace(trace)
|
||||
self.assertEqual(1, len(results['tx_missing']))
|
||||
self.assertEqual(1, len(results['rx_missing']))
|
||||
self.assertEqual(1, len(results['tx_updates']))
|
||||
self.assertEqual('PUT', results['tx_updates'][0]['method'])
|
||||
self.assertFalse(results['rx_updates'])
|
||||
|
||||
# verify on disk files...
|
||||
rx_obj = self._open_rx_diskfile('o1', policy)
|
||||
tx_obj = self._open_tx_diskfile('o1', policy)
|
||||
# with legacy behavior rx_obj data and meta timestamps are equal
|
||||
self.assertEqual(t2, rx_obj.data_timestamp)
|
||||
self.assertEqual(t2, rx_obj.timestamp)
|
||||
# with legacy behavior rx_obj data timestamp should equal tx_obj
|
||||
self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
|
||||
# tx meta file should not have been sync'd to rx data file
|
||||
self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
@@ -26,7 +25,7 @@ import six
|
||||
from swift.common import bufferedhttp
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common.storage_policy import POLICIES, REPL_POLICY
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common import utils
|
||||
from swift.common.swob import HTTPException
|
||||
from swift.obj import diskfile
|
||||
@@ -1809,51 +1808,28 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
# server socket.
|
||||
|
||||
def setUp(self):
|
||||
self.ts = unit.make_timestamp_iter()
|
||||
self.rx_ip = '127.0.0.1'
|
||||
# dirs
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server')
|
||||
|
||||
self.rx_devices = os.path.join(self.tempdir, 'rx/node')
|
||||
self.tx_devices = os.path.join(self.tempdir, 'tx/node')
|
||||
self.devices = os.path.join(self.tempdir, 'srv/node')
|
||||
for device in ('sda1', 'sdb1'):
|
||||
for root in (self.rx_devices, self.tx_devices):
|
||||
os.makedirs(os.path.join(root, device))
|
||||
os.makedirs(os.path.join(self.devices, device))
|
||||
|
||||
self.conf = {
|
||||
'devices': self.rx_devices,
|
||||
'devices': self.devices,
|
||||
'swift_dir': self.tempdir,
|
||||
'mount_check': False,
|
||||
}
|
||||
self.rx_logger = debug_logger('test-object-server')
|
||||
self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
rx_server = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
self.sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger())
|
||||
eventlet.wsgi.server, self.sock, rx_server, utils.NullLogger())
|
||||
self.rx_port = self.sock.getsockname()[1]
|
||||
self.tx_logger = debug_logger('test-daemon')
|
||||
self.policy = POLICIES[0]
|
||||
self.conf['devices'] = self.tx_devices
|
||||
self.tx_logger = debug_logger('test-reconstructor')
|
||||
self.daemon = ObjectReconstructor(self.conf, self.tx_logger)
|
||||
self.daemon._diskfile_mgr = self.daemon._df_router[self.policy]
|
||||
|
||||
self.nodes = [
|
||||
{
|
||||
'device': 'sda1',
|
||||
'ip': '127.0.0.1',
|
||||
'replication_ip': '127.0.0.1',
|
||||
'port': self.rx_port,
|
||||
'replication_port': self.rx_port,
|
||||
},
|
||||
{
|
||||
'device': 'sdb1',
|
||||
'ip': '127.0.0.1',
|
||||
'replication_ip': '127.0.0.1',
|
||||
'port': self.rx_port,
|
||||
'replication_port': self.rx_port,
|
||||
},
|
||||
]
|
||||
self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]]
|
||||
|
||||
def tearDown(self):
|
||||
self.rx_server.kill()
|
||||
@@ -1926,89 +1902,6 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
def test_sender_job_missing_frag_node_indexes(self):
|
||||
# replication jobs don't send frag_index, so we'll use a REPL_POLICY
|
||||
repl_policy = POLICIES[1]
|
||||
self.assertEqual(repl_policy.policy_type, REPL_POLICY)
|
||||
repl_mgr = self.daemon._df_router[repl_policy]
|
||||
self.daemon._diskfile_mgr = repl_mgr
|
||||
device = self.nodes[0]['device']
|
||||
# create a replicated object, on sender
|
||||
df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o',
|
||||
policy=repl_policy)
|
||||
now = next(self.ts)
|
||||
metadata = {
|
||||
'X-Timestamp': now.internal,
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0',
|
||||
'ETag': hashlib.md5('').hexdigest(),
|
||||
}
|
||||
with df.create() as writer:
|
||||
writer.write('')
|
||||
writer.put(metadata)
|
||||
# sanity the object is on the sender
|
||||
self.assertTrue(df._datadir.startswith(self.tx_devices))
|
||||
# setup a ssync job
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
job = {
|
||||
'partition': 0,
|
||||
'policy': repl_policy,
|
||||
'device': device,
|
||||
}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.nodes[0], job, [suffix])
|
||||
success, _ = sender()
|
||||
self.assertTrue(success)
|
||||
# sanity object is synced to receiver
|
||||
remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile(
|
||||
device, '0', 'a', 'c', 'o', policy=repl_policy)
|
||||
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
|
||||
self.assertEqual(remote_df.read_metadata(), metadata)
|
||||
|
||||
def test_send_frag_index_none(self):
|
||||
# create an ec fragment on the remote node
|
||||
device = self.nodes[1]['device']
|
||||
remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile(
|
||||
device, '1', 'a', 'c', 'o', policy=self.policy)
|
||||
ts1 = next(self.ts)
|
||||
data = 'frag_archive'
|
||||
metadata = {
|
||||
'ETag': hashlib.md5(data).hexdigest(),
|
||||
'X-Timestamp': ts1.internal,
|
||||
'Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': '3',
|
||||
}
|
||||
with remote_df.create() as writer:
|
||||
writer.write(data)
|
||||
writer.put(metadata)
|
||||
writer.commit(ts1)
|
||||
# create a tombstone on the local node
|
||||
df = self.daemon._df_router[self.policy].get_diskfile(
|
||||
device, '1', 'a', 'c', 'o', policy=self.policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
ts2 = next(self.ts)
|
||||
df.delete(ts2)
|
||||
# a reconstructor revert job with only tombstones will have frag_index
|
||||
# explicitly set to None
|
||||
job = {
|
||||
'frag_index': None,
|
||||
'partition': 1,
|
||||
'policy': self.policy,
|
||||
'device': device,
|
||||
}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.nodes[1], job, [suffix])
|
||||
success, _ = sender()
|
||||
self.assertTrue(success)
|
||||
# diskfile tombstone synced to receiver's datadir with timestamp
|
||||
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
|
||||
try:
|
||||
remote_df.read_metadata()
|
||||
except exceptions.DiskFileDeleted as e:
|
||||
self.assertEqual(e.timestamp, ts2)
|
||||
else:
|
||||
self.fail('Successfully opened remote DiskFile')
|
||||
|
||||
def test_bad_request_invalid_frag_index(self):
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\
|
||||
as mock_missing_check:
|
||||
|
@@ -12,47 +12,21 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from collections import defaultdict
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import eventlet
|
||||
import itertools
|
||||
import mock
|
||||
import six
|
||||
from six.moves import urllib
|
||||
|
||||
from swift.common import exceptions, utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
|
||||
DiskFileDeleted
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.obj import ssync_sender, diskfile, server, ssync_receiver
|
||||
from swift.obj.reconstructor import RebuildingECDiskFileStream
|
||||
from swift.obj import ssync_sender, diskfile, ssync_receiver
|
||||
|
||||
from test.unit import debug_logger, patch_policies, make_timestamp_iter
|
||||
|
||||
|
||||
class FakeReplicator(object):
|
||||
def __init__(self, testdir, policy=None):
|
||||
self.logger = debug_logger('test-ssync-sender')
|
||||
self.conn_timeout = 1
|
||||
self.node_timeout = 2
|
||||
self.http_timeout = 3
|
||||
self.network_chunk_size = 65536
|
||||
self.disk_chunk_size = 4096
|
||||
conf = {
|
||||
'devices': testdir,
|
||||
'mount_check': 'false',
|
||||
}
|
||||
policy = POLICIES.default if policy is None else policy
|
||||
self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
||||
self._diskfile_mgr = self._diskfile_router[policy]
|
||||
from test.unit import patch_policies, make_timestamp_iter
|
||||
from test.unit.obj.common import FakeReplicator, BaseTest
|
||||
|
||||
|
||||
class NullBufferedHTTPConnection(object):
|
||||
@@ -105,49 +79,16 @@ class FakeConnection(object):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class BaseTestSender(unittest.TestCase):
|
||||
@patch_policies()
|
||||
class TestSender(BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
super(TestSender, self).setUp()
|
||||
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.testdir, 'dev'))
|
||||
self.daemon = FakeReplicator(self.testdir)
|
||||
self.sender = ssync_sender.Sender(self.daemon, None, None, None)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
|
||||
def _make_open_diskfile(self, device='dev', partition='9',
|
||||
account='a', container='c', obj='o', body='test',
|
||||
extra_metadata=None, policy=None,
|
||||
frag_index=None, timestamp=None, df_mgr=None):
|
||||
policy = policy or POLICIES.legacy
|
||||
object_parts = account, container, obj
|
||||
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
|
||||
if df_mgr is None:
|
||||
df_mgr = self.daemon._diskfile_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
device, partition, *object_parts, policy=policy,
|
||||
frag_index=frag_index)
|
||||
content_length = len(body)
|
||||
etag = hashlib.md5(body).hexdigest()
|
||||
with df.create() as writer:
|
||||
writer.write(body)
|
||||
metadata = {
|
||||
'X-Timestamp': timestamp.internal,
|
||||
'Content-Length': str(content_length),
|
||||
'ETag': etag,
|
||||
}
|
||||
if extra_metadata:
|
||||
metadata.update(extra_metadata)
|
||||
writer.put(metadata)
|
||||
writer.commit(timestamp)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
|
||||
@patch_policies()
|
||||
class TestSender(BaseTestSender):
|
||||
|
||||
def test_call_catches_MessageTimeout(self):
|
||||
|
||||
def connect(self):
|
||||
@@ -1598,826 +1539,6 @@ class TestSender(BaseTestSender):
|
||||
self.assertTrue(self.sender.connection.closed)
|
||||
|
||||
|
||||
class TestBaseSsync(BaseTestSender):
|
||||
"""
|
||||
Provides a framework to test end to end interactions between sender and
|
||||
receiver. The basis for each test is actual diskfile state on either side.
|
||||
The connection between sender and receiver is wrapped to capture ssync
|
||||
traffic for subsequent verification of the protocol. Assertions are made
|
||||
about the final state of the sender and receiver diskfiles.
|
||||
"""
|
||||
|
||||
def make_connect_wrapper(self, sender):
|
||||
"""
|
||||
Make a wrapper function for the ssync_sender.Sender.connect() method
|
||||
that will in turn wrap the HTTConnection.send() and the
|
||||
Sender.readline() so that ssync protocol messages can be captured.
|
||||
"""
|
||||
orig_connect = sender.connect
|
||||
trace = dict(messages=[])
|
||||
|
||||
def add_trace(type, msg):
|
||||
# record a protocol event for later analysis
|
||||
if msg.strip():
|
||||
trace['messages'].append((type, msg.strip()))
|
||||
|
||||
def make_send_wrapper(send):
|
||||
def wrapped_send(msg):
|
||||
_msg = msg.split('\r\n', 1)[1]
|
||||
_msg = _msg.rsplit('\r\n', 1)[0]
|
||||
add_trace('tx', _msg)
|
||||
send(msg)
|
||||
return wrapped_send
|
||||
|
||||
def make_readline_wrapper(readline):
|
||||
def wrapped_readline():
|
||||
data = readline()
|
||||
add_trace('rx', data)
|
||||
bytes_read = trace.setdefault('readline_bytes', 0)
|
||||
trace['readline_bytes'] = bytes_read + len(data)
|
||||
return data
|
||||
return wrapped_readline
|
||||
|
||||
def wrapped_connect():
|
||||
orig_connect()
|
||||
sender.connection.send = make_send_wrapper(
|
||||
sender.connection.send)
|
||||
sender.readline = make_readline_wrapper(sender.readline)
|
||||
return wrapped_connect, trace
|
||||
|
||||
def setUp(self):
|
||||
self.device = 'dev'
|
||||
self.partition = '9'
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
# sender side setup
|
||||
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
|
||||
self.daemon = FakeReplicator(self.tx_testdir)
|
||||
|
||||
# rx side setup
|
||||
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
|
||||
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
|
||||
conf = {
|
||||
'devices': self.rx_testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false',
|
||||
'log_requests': 'false'}
|
||||
self.rx_controller = server.ObjectController(conf)
|
||||
self.ts_iter = (Timestamp(t)
|
||||
for t in itertools.count(int(time.time())))
|
||||
self.rx_ip = '127.0.0.1'
|
||||
sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
|
||||
self.rx_port = sock.getsockname()[1]
|
||||
self.rx_node = {'replication_ip': self.rx_ip,
|
||||
'replication_port': self.rx_port,
|
||||
'device': self.device}
|
||||
|
||||
def tearDown(self):
|
||||
self.rx_server.kill()
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
|
||||
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
|
||||
frag_indexes=None):
|
||||
frag_indexes = [None] if frag_indexes is None else frag_indexes
|
||||
metadata = {'Content-Type': 'plain/text'}
|
||||
diskfiles = []
|
||||
for frag_index in frag_indexes:
|
||||
object_data = '/a/c/%s___%s' % (obj_name, frag_index)
|
||||
if frag_index is not None:
|
||||
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
|
||||
df = self._make_open_diskfile(
|
||||
device=self.device, partition=self.partition, account='a',
|
||||
container='c', obj=obj_name, body=object_data,
|
||||
extra_metadata=metadata, timestamp=timestamp, policy=policy,
|
||||
frag_index=frag_index, df_mgr=df_mgr)
|
||||
# sanity checks
|
||||
listing = os.listdir(df._datadir)
|
||||
self.assertTrue(listing)
|
||||
for filename in listing:
|
||||
self.assertTrue(filename.startswith(timestamp.internal))
|
||||
diskfiles.append(df)
|
||||
return diskfiles
|
||||
|
||||
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
|
||||
df_mgr = self.daemon._diskfile_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
self.device, self.partition, account='a', container='c',
|
||||
obj=obj_name, policy=policy, frag_index=frag_index)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
|
||||
df = self.rx_controller.get_diskfile(
|
||||
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
|
||||
frag_index=frag_index)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
|
||||
# verify that diskfiles' metadata match
|
||||
# sanity check, they are not the same ondisk files!
|
||||
self.assertNotEqual(tx_df._datadir, rx_df._datadir)
|
||||
rx_metadata = dict(rx_df.get_metadata())
|
||||
for k, v in tx_df.get_metadata().items():
|
||||
if k == 'X-Object-Sysmeta-Ec-Frag-Index':
|
||||
# if tx_df had a frag_index then rx_df should also have one
|
||||
self.assertTrue(k in rx_metadata)
|
||||
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
|
||||
elif k == 'ETag' and not same_etag:
|
||||
self.assertNotEqual(v, rx_metadata.pop(k, None))
|
||||
continue
|
||||
else:
|
||||
self.assertEqual(v, rx_metadata.pop(k), k)
|
||||
self.assertFalse(rx_metadata)
|
||||
expected_body = '%s___%s' % (tx_df._name, frag_index)
|
||||
actual_body = ''.join([chunk for chunk in rx_df.reader()])
|
||||
self.assertEqual(expected_body, actual_body)
|
||||
|
||||
def _analyze_trace(self, trace):
|
||||
"""
|
||||
Parse protocol trace captured by fake connection, making some
|
||||
assertions along the way, and return results as a dict of form:
|
||||
results = {'tx_missing': <list of messages>,
|
||||
'rx_missing': <list of messages>,
|
||||
'tx_updates': <list of subreqs>,
|
||||
'rx_updates': <list of messages>}
|
||||
|
||||
Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
|
||||
"""
        def tx_missing(results, line):
            self.assertEqual('tx', line[0])
            results['tx_missing'].append(line[1])

        def rx_missing(results, line):
            self.assertEqual('rx', line[0])
            parts = line[1].split('\r\n')
            for part in parts:
                results['rx_missing'].append(part)

        def tx_updates(results, line):
            self.assertEqual('tx', line[0])
            subrequests = results['tx_updates']
            if line[1].startswith(('PUT', 'DELETE', 'POST')):
                parts = line[1].split('\r\n')
                method, path = parts[0].split()
                subreq = {'method': method, 'path': path, 'req': line[1],
                          'headers': parts[1:]}
                subrequests.append(subreq)
            else:
                self.assertTrue(subrequests)
                body = subrequests[-1].setdefault('body', '')
                body += line[1]
                subrequests[-1]['body'] = body

        def rx_updates(results, line):
            self.assertEqual('rx', line[0])
            results['rx_updates'].append(line[1])

        def unexpected(results, line):
            results.setdefault('unexpected', []).append(line)

        # each trace line is a tuple of ([tx|rx], msg)
        handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
                           (('tx', ':MISSING_CHECK: END'), unexpected),
                           (('rx', ':MISSING_CHECK: START'), rx_missing),
                           (('rx', ':MISSING_CHECK: END'), unexpected),
                           (('tx', ':UPDATES: START'), tx_updates),
                           (('tx', ':UPDATES: END'), unexpected),
                           (('rx', ':UPDATES: START'), rx_updates),
                           (('rx', ':UPDATES: END'), unexpected)])
        expect_handshake = next(handshakes)
        phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
        results = dict((k, []) for k in phases)
        handler = unexpected
        lines = list(trace.get('messages', []))
        lines.reverse()
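        # replay the captured messages in order (popping from the reversed
        # list); each expected handshake marker switches to the handler for
        # the next phase, and any message seen outside a phase is recorded
        # as 'unexpected'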
        while lines:
            line = lines.pop()
            if line == expect_handshake[0]:
                handler = expect_handshake[1]
                try:
                    expect_handshake = next(handshakes)
                except StopIteration:
                    # should be the last line
                    self.assertFalse(
                        lines, 'Unexpected trailing lines %s' % lines)
                continue
            handler(results, line)

        try:
            # check all handshakes occurred
            missed = next(handshakes)
            self.fail('Handshake %s not found' % str(missed[0]))
        except StopIteration:
            pass
        # check no message outside of a phase
        self.assertFalse(
            results.get('unexpected'),
            'Message outside of a phase: %s' % results.get('unexpected'))
        return results

    def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
                             rx_frag_index=None):
        """
        Verify tx and rx files that should be in sync.

        :param tx_objs: sender diskfiles
        :param policy: storage policy instance
        :param tx_frag_index: the fragment index of tx diskfiles that should
            have been used as a source for sync'ing
        :param rx_frag_index: the fragment index of expected rx diskfiles
        """
        for o_name, diskfiles in tx_objs.items():
            for tx_df in diskfiles:
                # check tx file still intact - ssync does not do any cleanup!
                tx_df.open()
                if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
                    # this diskfile should have been sync'd,
                    # check rx file is ok
                    rx_df = self._open_rx_diskfile(
                        o_name, policy, rx_frag_index)
                    # for EC revert job or replication etags should match
                    match_etag = (tx_frag_index == rx_frag_index)
                    self._verify_diskfile_sync(
                        tx_df, rx_df, rx_frag_index, match_etag)
                else:
                    # this diskfile should not have been sync'd,
                    # check no rx file,
                    self.assertRaises(DiskFileNotExist,
                                      self._open_rx_diskfile,
                                      o_name, policy,
                                      frag_index=tx_df._frag_index)

    def _verify_tombstones(self, tx_objs, policy):
        # verify tx and rx tombstones that should be in sync
        for o_name, diskfiles in tx_objs.items():
            try:
                self._open_tx_diskfile(o_name, policy)
                self.fail('DiskFileDeleted expected')
            except DiskFileDeleted as exc:
                tx_delete_time = exc.timestamp
            try:
                self._open_rx_diskfile(o_name, policy)
                self.fail('DiskFileDeleted expected')
            except DiskFileDeleted as exc:
                rx_delete_time = exc.timestamp
            self.assertEqual(tx_delete_time, rx_delete_time)


@patch_policies(with_ec_default=True)
class TestSsyncEC(TestBaseSsync):
    def test_handoff_fragment_revert(self):
        # test that a sync_revert type job does send the correct frag archives
        # to the receiver
        policy = POLICIES.default
        rx_node_index = 0
        tx_node_index = 1
        # for a revert job we iterate over frag index that belongs on
        # remote node
        frag_index = rx_node_index

        # create sender side diskfiles...
        tx_objs = {}
        rx_objs = {}
        tx_tombstones = {}
        tx_df_mgr = self.daemon._diskfile_router[policy]
        rx_df_mgr = self.rx_controller._diskfile_router[policy]
        # o1 has primary and handoff fragment archives
        t1 = next(self.ts_iter)
        tx_objs['o1'] = self._create_ondisk_files(
            tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
        # o2 only has primary
        t2 = next(self.ts_iter)
        tx_objs['o2'] = self._create_ondisk_files(
            tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
        # o3 only has handoff
        t3 = next(self.ts_iter)
        tx_objs['o3'] = self._create_ondisk_files(
            tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
        # o4 primary and handoff fragment archives on tx, handoff in sync on rx
        t4 = next(self.ts_iter)
        tx_objs['o4'] = self._create_ondisk_files(
            tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
        rx_objs['o4'] = self._create_ondisk_files(
            rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
        # o5 is a tombstone, missing on receiver
        t5 = next(self.ts_iter)
        tx_tombstones['o5'] = self._create_ondisk_files(
            tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
        tx_tombstones['o5'][0].delete(t5)

        suffixes = set()
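        # collect the suffix dirs that the sender will be asked to sync; a
        # diskfile datadir path ends .../<partition>/<suffix>/<hash>, so the
        # parent dir name of the datadir is the suffix (this assumes the
        # standard diskfile on-disk layout)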
        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
            for df in diskfiles:
                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))

        # create ssync sender instance...
        job = {'device': self.device,
               'partition': self.partition,
               'policy': policy,
               'frag_index': frag_index}
        node = dict(self.rx_node)
        node.update({'index': rx_node_index})
        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)
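        # make_connect_wrapper() (defined earlier in this module) is assumed
        # to record each chunk sent and received as ('tx'|'rx', msg) tuples
        # in trace['messages'], which _analyze_trace() later parses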

        # run the sync protocol...
        sender()

        # verify protocol
        results = self._analyze_trace(trace)
        # sender has handoff frags for o1, o3 and o4 and ts for o5
        self.assertEqual(4, len(results['tx_missing']))
        # receiver is missing frags for o1, o3 and ts for o5
        self.assertEqual(3, len(results['rx_missing']))
        self.assertEqual(3, len(results['tx_updates']))
        self.assertFalse(results['rx_updates'])
        sync_paths = []
        for subreq in results.get('tx_updates'):
            if subreq.get('method') == 'PUT':
                self.assertTrue(
                    'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
                    in subreq.get('headers'))
                expected_body = '%s___%s' % (subreq['path'], rx_node_index)
                self.assertEqual(expected_body, subreq['body'])
            elif subreq.get('method') == 'DELETE':
                self.assertEqual('/a/c/o5', subreq['path'])
            sync_paths.append(subreq.get('path'))
        self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))

        # verify on disk files...
        self._verify_ondisk_files(
            tx_objs, policy, frag_index, rx_node_index)
        self._verify_tombstones(tx_tombstones, policy)

    def test_fragment_sync(self):
        # check that a sync_only type job does call reconstructor to build a
        # diskfile to send, and continues making progress despite an error
        # when building one diskfile
        policy = POLICIES.default
        rx_node_index = 0
        tx_node_index = 1
        # for a sync job we iterate over frag index that belongs on local node
        frag_index = tx_node_index

        # create sender side diskfiles...
        tx_objs = {}
        tx_tombstones = {}
        rx_objs = {}
        tx_df_mgr = self.daemon._diskfile_router[policy]
        rx_df_mgr = self.rx_controller._diskfile_router[policy]
        # o1 only has primary
        t1 = next(self.ts_iter)
        tx_objs['o1'] = self._create_ondisk_files(
            tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
        # o2 only has primary
        t2 = next(self.ts_iter)
        tx_objs['o2'] = self._create_ondisk_files(
            tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
        # o3 only has primary
        t3 = next(self.ts_iter)
        tx_objs['o3'] = self._create_ondisk_files(
            tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
        # o4 primary fragment archives on tx, handoff in sync on rx
        t4 = next(self.ts_iter)
        tx_objs['o4'] = self._create_ondisk_files(
            tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
        rx_objs['o4'] = self._create_ondisk_files(
            rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
        # o5 is a tombstone, missing on receiver
        t5 = next(self.ts_iter)
        tx_tombstones['o5'] = self._create_ondisk_files(
            tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
        tx_tombstones['o5'][0].delete(t5)

        suffixes = set()
        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
            for df in diskfiles:
                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))

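        # record each (job, node, policy, metadata) handed to the fake
        # builder so the test can later identify which rebuild failed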
        reconstruct_fa_calls = []

        def fake_reconstruct_fa(job, node, metadata):
            reconstruct_fa_calls.append((job, node, policy, metadata))
            if len(reconstruct_fa_calls) == 2:
                # simulate second reconstruct failing
                raise DiskFileError
            content = '%s___%s' % (metadata['name'], rx_node_index)
            return RebuildingECDiskFileStream(
                metadata, rx_node_index, iter([content]))
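        # RebuildingECDiskFileStream appears to wrap the rebuilt fragment's
        # metadata and a chunk iterator so the sender can stream it like a
        # regular diskfile; the fake above fails on its second call to
        # exercise the sender's error handling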

        # create ssync sender instance...
        job = {'device': self.device,
               'partition': self.partition,
               'policy': policy,
               'frag_index': frag_index,
               'sync_diskfile_builder': fake_reconstruct_fa}
        node = dict(self.rx_node)
        node.update({'index': rx_node_index})
        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)

        # run the sync protocol...
        sender()

        # verify protocol
        results = self._analyze_trace(trace)
        # sender has primaries for o1, o2, o3 and o4, and ts for o5
        self.assertEqual(5, len(results['tx_missing']))
        # receiver is missing o1, o2 and o3, and ts for o5
        self.assertEqual(4, len(results['rx_missing']))
        # sender can only construct 2 out of 3 missing frags
        self.assertEqual(3, len(results['tx_updates']))
        self.assertEqual(3, len(reconstruct_fa_calls))
        self.assertFalse(results['rx_updates'])
        actual_sync_paths = []
        for subreq in results.get('tx_updates'):
            if subreq.get('method') == 'PUT':
                self.assertTrue(
                    'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
                    in subreq.get('headers'))
                expected_body = '%s___%s' % (subreq['path'], rx_node_index)
                self.assertEqual(expected_body, subreq['body'])
            elif subreq.get('method') == 'DELETE':
                self.assertEqual('/a/c/o5', subreq['path'])
            actual_sync_paths.append(subreq.get('path'))

        # remove the failed df from expected synced df's
        expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
        failed_path = reconstruct_fa_calls[1][3]['name']
        expect_sync_paths.remove(failed_path)
        failed_obj = None
        for obj, diskfiles in tx_objs.items():
            if diskfiles[0]._name == failed_path:
                failed_obj = obj
        # sanity check
        self.assertTrue(tx_objs.pop(failed_obj))

        # verify on disk files...
        self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
        self._verify_ondisk_files(
            tx_objs, policy, frag_index, rx_node_index)
        self._verify_tombstones(tx_tombstones, policy)


@patch_policies
class TestSsyncReplication(TestBaseSsync):
    def test_sync(self):
        policy = POLICIES.default
        rx_node_index = 0

        # create sender side diskfiles...
        tx_objs = {}
        rx_objs = {}
        tx_tombstones = {}
        rx_tombstones = {}
        tx_df_mgr = self.daemon._diskfile_router[policy]
        rx_df_mgr = self.rx_controller._diskfile_router[policy]
        # o1 and o2 are on tx only
        t1 = next(self.ts_iter)
        tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
        t2 = next(self.ts_iter)
        tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
        # o3 is on tx and older copy on rx
        t3a = next(self.ts_iter)
        rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
        t3b = next(self.ts_iter)
        tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
        # o4 in sync on rx and tx
        t4 = next(self.ts_iter)
        tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
        rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
        # o5 is a tombstone, missing on receiver
        t5 = next(self.ts_iter)
        tx_tombstones['o5'] = self._create_ondisk_files(
            tx_df_mgr, 'o5', policy, t5)
        tx_tombstones['o5'][0].delete(t5)
        # o6 is a tombstone, in sync on tx and rx
        t6 = next(self.ts_iter)
        tx_tombstones['o6'] = self._create_ondisk_files(
            tx_df_mgr, 'o6', policy, t6)
        tx_tombstones['o6'][0].delete(t6)
        rx_tombstones['o6'] = self._create_ondisk_files(
            rx_df_mgr, 'o6', policy, t6)
        rx_tombstones['o6'][0].delete(t6)
        # o7 is a tombstone on tx, older data on rx
        t7a = next(self.ts_iter)
        rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
        t7b = next(self.ts_iter)
        tx_tombstones['o7'] = self._create_ondisk_files(
            tx_df_mgr, 'o7', policy, t7b)
        tx_tombstones['o7'][0].delete(t7b)

        suffixes = set()
        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
            for df in diskfiles:
                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))

        # create ssync sender instance...
        job = {'device': self.device,
               'partition': self.partition,
               'policy': policy}
        node = dict(self.rx_node)
        node.update({'index': rx_node_index})
        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)

        # run the sync protocol...
        success, in_sync_objs = sender()
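        # sender() is expected to return a success flag plus a collection
        # with an entry per object that is now in sync on the receiver,
        # hence the length assertion below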

        self.assertEqual(7, len(in_sync_objs), trace['messages'])
        self.assertTrue(success)

        # verify protocol
        results = self._analyze_trace(trace)
        self.assertEqual(7, len(results['tx_missing']))
        self.assertEqual(5, len(results['rx_missing']))
        self.assertEqual(5, len(results['tx_updates']))
        self.assertFalse(results['rx_updates'])
        sync_paths = []
        for subreq in results.get('tx_updates'):
            if subreq.get('method') == 'PUT':
                self.assertTrue(
                    subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
                expected_body = '%s___None' % subreq['path']
                self.assertEqual(expected_body, subreq['body'])
            elif subreq.get('method') == 'DELETE':
                self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
            sync_paths.append(subreq.get('path'))
        self.assertEqual(
            ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
            sorted(sync_paths))

        # verify on disk files...
        self._verify_ondisk_files(tx_objs, policy)
        self._verify_tombstones(tx_tombstones, policy)

    def test_nothing_to_sync(self):
        job = {'device': self.device,
               'partition': self.partition,
               'policy': POLICIES.default}
        node = {'replication_ip': self.rx_ip,
                'replication_port': self.rx_port,
                'device': self.device,
                'index': 0}
        sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)

        result, in_sync_objs = sender()

        self.assertTrue(result)
        self.assertFalse(in_sync_objs)
        results = self._analyze_trace(trace)
        self.assertFalse(results['tx_missing'])
        self.assertFalse(results['rx_missing'])
        self.assertFalse(results['tx_updates'])
        self.assertFalse(results['rx_updates'])
        # Minimal receiver response as read by sender:
        #      2 <-- initial \r\n to start ssync exchange
        #   + 23 <-- :MISSING_CHECK: START\r\n
        #    + 2 <-- \r\n (minimal missing check response)
        #   + 21 <-- :MISSING_CHECK: END\r\n
        #   + 17 <-- :UPDATES: START\r\n
        #   + 15 <-- :UPDATES: END\r\n
        # TOTAL = 80
        self.assertEqual(80, trace.get('readline_bytes'))

    def test_meta_file_sync(self):
        policy = POLICIES.default
        rx_node_index = 0

        # create diskfiles...
        tx_objs = {}
        rx_objs = {}
        tx_tombstones = {}
        rx_tombstones = {}
        tx_df_mgr = self.daemon._diskfile_router[policy]
        rx_df_mgr = self.rx_controller._diskfile_router[policy]

        expected_subreqs = defaultdict(list)
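        # expected_subreqs maps subrequest method -> object names for which
        # the sender should send that subrequest; entries are removed as they
        # are matched against the trace and the lists are checked empty below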

        # o1 on tx only with meta file
        t1 = next(self.ts_iter)
        tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
        t1_meta = next(self.ts_iter)
        metadata = {'X-Timestamp': t1_meta.internal,
                    'X-Object-Meta-Test': 'o1',
                    'X-Object-Sysmeta-Test': 'sys_o1'}
        tx_objs['o1'][0].write_metadata(metadata)
        expected_subreqs['PUT'].append('o1')
        expected_subreqs['POST'].append('o1')

        # o2 on tx with meta, on rx without meta
        t2 = next(self.ts_iter)
        tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
        t2_meta = next(self.ts_iter)
        metadata = {'X-Timestamp': t2_meta.internal,
                    'X-Object-Meta-Test': 'o2',
                    'X-Object-Sysmeta-Test': 'sys_o2'}
        tx_objs['o2'][0].write_metadata(metadata)
        rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
        expected_subreqs['POST'].append('o2')

        # o3 is on tx with meta, rx has newer data but no meta
        t3a = next(self.ts_iter)
        tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
        t3b = next(self.ts_iter)
        rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
        t3_meta = next(self.ts_iter)
        metadata = {'X-Timestamp': t3_meta.internal,
                    'X-Object-Meta-Test': 'o3',
                    'X-Object-Sysmeta-Test': 'sys_o3'}
        tx_objs['o3'][0].write_metadata(metadata)
        expected_subreqs['POST'].append('o3')

        # o4 is on tx with meta, rx has older data and up to date meta
        t4a = next(self.ts_iter)
        rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
        t4b = next(self.ts_iter)
        tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
        t4_meta = next(self.ts_iter)
        metadata = {'X-Timestamp': t4_meta.internal,
                    'X-Object-Meta-Test': 'o4',
                    'X-Object-Sysmeta-Test': 'sys_o4'}
        tx_objs['o4'][0].write_metadata(metadata)
        rx_objs['o4'][0].write_metadata(metadata)
        expected_subreqs['PUT'].append('o4')

        # o5 is on tx with meta, rx is in sync with data and meta
        t5 = next(self.ts_iter)
        rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
        tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
        t5_meta = next(self.ts_iter)
        metadata = {'X-Timestamp': t5_meta.internal,
                    'X-Object-Meta-Test': 'o5',
                    'X-Object-Sysmeta-Test': 'sys_o5'}
        tx_objs['o5'][0].write_metadata(metadata)
        rx_objs['o5'][0].write_metadata(metadata)

        # o6 is tombstone on tx, rx has older data and meta
        t6 = next(self.ts_iter)
        tx_tombstones['o6'] = self._create_ondisk_files(
            tx_df_mgr, 'o6', policy, t6)
        rx_tombstones['o6'] = self._create_ondisk_files(
            rx_df_mgr, 'o6', policy, t6)
        metadata = {'X-Timestamp': next(self.ts_iter).internal,
                    'X-Object-Meta-Test': 'o6',
                    'X-Object-Sysmeta-Test': 'sys_o6'}
        rx_tombstones['o6'][0].write_metadata(metadata)
        tx_tombstones['o6'][0].delete(next(self.ts_iter))
        expected_subreqs['DELETE'].append('o6')

        # o7 is tombstone on rx, tx has older data and meta,
        # no subreqs expected...
        t7 = next(self.ts_iter)
        tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
        rx_tombstones['o7'] = self._create_ondisk_files(
            rx_df_mgr, 'o7', policy, t7)
        metadata = {'X-Timestamp': next(self.ts_iter).internal,
                    'X-Object-Meta-Test': 'o7',
                    'X-Object-Sysmeta-Test': 'sys_o7'}
        tx_objs['o7'][0].write_metadata(metadata)
        rx_tombstones['o7'][0].delete(next(self.ts_iter))

        suffixes = set()
        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
            for df in diskfiles:
                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))

        # create ssync sender instance...
        job = {'device': self.device,
               'partition': self.partition,
               'policy': policy}
        node = dict(self.rx_node)
        node.update({'index': rx_node_index})
        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)

        # run the sync protocol...
        success, in_sync_objs = sender()

        self.assertEqual(7, len(in_sync_objs))
        self.assertTrue(success)

        # verify protocol
        results = self._analyze_trace(trace)
        self.assertEqual(7, len(results['tx_missing']))
        self.assertEqual(5, len(results['rx_missing']))
        for subreq in results.get('tx_updates'):
            obj = subreq['path'].split('/')[3]
            method = subreq['method']
            self.assertTrue(obj in expected_subreqs[method],
                            'Unexpected %s subreq for object %s, expected %s'
                            % (method, obj, expected_subreqs[method]))
            expected_subreqs[method].remove(obj)
            if method == 'PUT':
                expected_body = '%s___None' % subreq['path']
                self.assertEqual(expected_body, subreq['body'])
        # verify all expected subreqs consumed
        for _method, expected in expected_subreqs.items():
            self.assertFalse(expected)
        self.assertFalse(results['rx_updates'])

        # verify on disk files...
        del tx_objs['o7']  # o7 not expected to be sync'd
        self._verify_ondisk_files(tx_objs, policy)
        self._verify_tombstones(tx_tombstones, policy)
        for oname, rx_obj in rx_objs.items():
            df = rx_obj[0].open()
            metadata = df.get_metadata()
            self.assertEqual(metadata['X-Object-Meta-Test'], oname)
            self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)

    def test_meta_file_not_synced_to_legacy_receiver(self):
        # verify that the sender does sync a data file to a legacy receiver,
        # but does not PUT meta file content to a legacy receiver
        policy = POLICIES.default
        rx_node_index = 0

        # create diskfiles...
        tx_df_mgr = self.daemon._diskfile_router[policy]
        rx_df_mgr = self.rx_controller._diskfile_router[policy]

        # rx has data at t1 but no meta
        # object is on tx with data at t2, meta at t3
        t1 = next(self.ts_iter)
        self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
        t2 = next(self.ts_iter)
        tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
        t3 = next(self.ts_iter)
        metadata = {'X-Timestamp': t3.internal,
                    'X-Object-Meta-Test': 'o3',
                    'X-Object-Sysmeta-Test': 'sys_o3'}
        tx_obj.write_metadata(metadata)

        suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
        # create ssync sender instance...
        job = {'device': self.device,
               'partition': self.partition,
               'policy': policy}
        node = dict(self.rx_node)
        node.update({'index': rx_node_index})
        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
        # wrap connection from tx to rx to capture ssync messages...
        sender.connect, trace = self.make_connect_wrapper(sender)

        def _legacy_check_missing(self, line):
            # reproduces behavior of 'legacy' ssync receiver missing_checks()
            parts = line.split()
            object_hash = urllib.parse.unquote(parts[0])
            timestamp = urllib.parse.unquote(parts[1])
            want = False
            try:
                df = self.diskfile_mgr.get_diskfile_from_hash(
                    self.device, self.partition, object_hash, self.policy,
                    frag_index=self.frag_index)
            except exceptions.DiskFileNotExist:
                want = True
            else:
                try:
                    df.open()
                except exceptions.DiskFileDeleted as err:
                    want = err.timestamp < timestamp
                except exceptions.DiskFileError:
                    want = True
                else:
                    want = df.timestamp < timestamp
            if want:
                return urllib.parse.quote(object_hash)
            return None
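        # the patched _check_missing above reproduces a pre-metadata-aware
        # receiver: it compares only a single timestamp per object, so a
        # newer .meta on the sender does not cause the receiver to ask for it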

        # run the sync protocol...
        func = 'swift.obj.ssync_receiver.Receiver._check_missing'
        with mock.patch(func, _legacy_check_missing):
            success, in_sync_objs = sender()

        self.assertEqual(1, len(in_sync_objs))
        self.assertTrue(success)

        # verify protocol, expecting only a PUT to legacy receiver
        results = self._analyze_trace(trace)
        self.assertEqual(1, len(results['tx_missing']))
        self.assertEqual(1, len(results['rx_missing']))
        self.assertEqual(1, len(results['tx_updates']))
        self.assertEqual('PUT', results['tx_updates'][0]['method'])
        self.assertFalse(results['rx_updates'])

        # verify on disk files...
        rx_obj = self._open_rx_diskfile('o1', policy)
        tx_obj = self._open_tx_diskfile('o1', policy)
        # with legacy behavior rx_obj data and meta timestamps are equal
        self.assertEqual(t2, rx_obj.data_timestamp)
        self.assertEqual(t2, rx_obj.timestamp)
        # with legacy behavior rx_obj data timestamp should equal tx_obj
        self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
        # tx meta file should not have been sync'd to rx data file
        self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())


class TestModuleMethods(unittest.TestCase):
    def test_encode_missing(self):
        object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
@@ -2458,7 +1579,7 @@ class TestModuleMethods(unittest.TestCase):
         expected = {'data': True, 'meta': True}
         self.assertEqual(ssync_sender.decode_wanted(parts), expected)

-        # you don't really these next few...
+        # you don't really expect these next few...
         parts = ['md']
         expected = {'data': True, 'meta': True}
         self.assertEqual(ssync_sender.decode_wanted(parts), expected)