From 6858510b59c23ebef527a92c3258dc3c5c6046f5 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Thu, 3 Sep 2015 16:13:17 +0100 Subject: [PATCH] Re-organise ssync tests We have some tests that exercise both the sender and receiver, but are spread across test_ssync_sender.py and test_ssync_receiver.py. This creates a new module test_ssync.py and moves the end-to-end tests into there. Change-Id: Iea3e9932734924453f7241432afda90abbc75c06 --- test/unit/obj/common.py | 80 +++ test/unit/obj/test_ssync.py | 909 +++++++++++++++++++++++++++ test/unit/obj/test_ssync_receiver.py | 123 +--- test/unit/obj/test_ssync_sender.py | 895 +------------------------- 4 files changed, 1005 insertions(+), 1002 deletions(-) create mode 100644 test/unit/obj/common.py create mode 100644 test/unit/obj/test_ssync.py diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py new file mode 100644 index 0000000000..33acb631d5 --- /dev/null +++ b/test/unit/obj/common.py @@ -0,0 +1,80 @@ +# Copyright (c) 2013 - 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import hashlib +import shutil +import tempfile +import unittest +import time + +from swift.common.storage_policy import POLICIES +from swift.common.utils import Timestamp +from swift.obj import diskfile + +from test.unit import debug_logger + + +class FakeReplicator(object): + def __init__(self, testdir, policy=None): + self.logger = debug_logger('test-ssync-sender') + self.conn_timeout = 1 + self.node_timeout = 2 + self.http_timeout = 3 + self.network_chunk_size = 65536 + self.disk_chunk_size = 4096 + conf = { + 'devices': testdir, + 'mount_check': 'false', + } + policy = POLICIES.default if policy is None else policy + self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger) + self._diskfile_mgr = self._diskfile_router[policy] + + +class BaseTest(unittest.TestCase): + def setUp(self): + # daemon will be set in subclass setUp + self.daemon = None + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _make_open_diskfile(self, device='dev', partition='9', + account='a', container='c', obj='o', body='test', + extra_metadata=None, policy=None, + frag_index=None, timestamp=None, df_mgr=None): + policy = policy or POLICIES.legacy + object_parts = account, container, obj + timestamp = Timestamp(time.time()) if timestamp is None else timestamp + if df_mgr is None: + df_mgr = self.daemon._diskfile_router[policy] + df = df_mgr.get_diskfile( + device, partition, *object_parts, policy=policy, + frag_index=frag_index) + content_length = len(body) + etag = hashlib.md5(body).hexdigest() + with df.create() as writer: + writer.write(body) + metadata = { + 'X-Timestamp': timestamp.internal, + 'Content-Length': str(content_length), + 'ETag': etag, + } + if extra_metadata: + metadata.update(extra_metadata) + writer.put(metadata) + writer.commit(timestamp) + df.open() + return df diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py new file mode 100644 index 0000000000..94c463606c --- /dev/null 
+++ b/test/unit/obj/test_ssync.py @@ -0,0 +1,909 @@ +# Copyright (c) 2013 - 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from collections import defaultdict + +import mock +import os +import time +import unittest + +import eventlet +import itertools +from six.moves import urllib + +from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ + DiskFileDeleted +from swift.common import utils +from swift.common.storage_policy import POLICIES +from swift.common.utils import Timestamp +from swift.obj import ssync_sender, server +from swift.obj.reconstructor import RebuildingECDiskFileStream + +from test.unit import patch_policies +from test.unit.obj.common import BaseTest, FakeReplicator + + +class TestBaseSsync(BaseTest): + """ + Provides a framework to test end to end interactions between sender and + receiver. The basis for each test is actual diskfile state on either side. + The connection between sender and receiver is wrapped to capture ssync + traffic for subsequent verification of the protocol. Assertions are made + about the final state of the sender and receiver diskfiles. 
+ """ + def setUp(self): + super(TestBaseSsync, self).setUp() + self.device = 'dev' + self.partition = '9' + # sender side setup + self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') + utils.mkdirs(os.path.join(self.tx_testdir, self.device)) + self.daemon = FakeReplicator(self.tx_testdir) + + # rx side setup + self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver') + utils.mkdirs(os.path.join(self.rx_testdir, self.device)) + conf = { + 'devices': self.rx_testdir, + 'mount_check': 'false', + 'replication_one_per_device': 'false', + 'log_requests': 'false'} + self.rx_controller = server.ObjectController(conf) + self.ts_iter = (Timestamp(t) + for t in itertools.count(int(time.time()))) + self.rx_ip = '127.0.0.1' + sock = eventlet.listen((self.rx_ip, 0)) + self.rx_server = eventlet.spawn( + eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger()) + self.rx_port = sock.getsockname()[1] + self.rx_node = {'replication_ip': self.rx_ip, + 'replication_port': self.rx_port, + 'device': self.device} + + def tearDown(self): + self.rx_server.kill() + super(TestBaseSsync, self).tearDown() + + def make_connect_wrapper(self, sender): + """ + Make a wrapper function for the ssync_sender.Sender.connect() method + that will in turn wrap the HTTConnection.send() and the + Sender.readline() so that ssync protocol messages can be captured. 
+ """ + orig_connect = sender.connect + trace = dict(messages=[]) + + def add_trace(type, msg): + # record a protocol event for later analysis + if msg.strip(): + trace['messages'].append((type, msg.strip())) + + def make_send_wrapper(send): + def wrapped_send(msg): + _msg = msg.split('\r\n', 1)[1] + _msg = _msg.rsplit('\r\n', 1)[0] + add_trace('tx', _msg) + send(msg) + return wrapped_send + + def make_readline_wrapper(readline): + def wrapped_readline(): + data = readline() + add_trace('rx', data) + bytes_read = trace.setdefault('readline_bytes', 0) + trace['readline_bytes'] = bytes_read + len(data) + return data + return wrapped_readline + + def wrapped_connect(): + orig_connect() + sender.connection.send = make_send_wrapper( + sender.connection.send) + sender.readline = make_readline_wrapper(sender.readline) + return wrapped_connect, trace + + def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp, + frag_indexes=None): + frag_indexes = [None] if frag_indexes is None else frag_indexes + metadata = {'Content-Type': 'plain/text'} + diskfiles = [] + for frag_index in frag_indexes: + object_data = '/a/c/%s___%s' % (obj_name, frag_index) + if frag_index is not None: + metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index) + df = self._make_open_diskfile( + device=self.device, partition=self.partition, account='a', + container='c', obj=obj_name, body=object_data, + extra_metadata=metadata, timestamp=timestamp, policy=policy, + frag_index=frag_index, df_mgr=df_mgr) + # sanity checks + listing = os.listdir(df._datadir) + self.assertTrue(listing) + for filename in listing: + self.assertTrue(filename.startswith(timestamp.internal)) + diskfiles.append(df) + return diskfiles + + def _open_tx_diskfile(self, obj_name, policy, frag_index=None): + df_mgr = self.daemon._diskfile_router[policy] + df = df_mgr.get_diskfile( + self.device, self.partition, account='a', container='c', + obj=obj_name, policy=policy, frag_index=frag_index) + df.open() + return df + + 
def _open_rx_diskfile(self, obj_name, policy, frag_index=None): + df = self.rx_controller.get_diskfile( + self.device, self.partition, 'a', 'c', obj_name, policy=policy, + frag_index=frag_index) + df.open() + return df + + def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False): + # verify that diskfiles' metadata match + # sanity check, they are not the same ondisk files! + self.assertNotEqual(tx_df._datadir, rx_df._datadir) + rx_metadata = dict(rx_df.get_metadata()) + for k, v in tx_df.get_metadata().items(): + if k == 'X-Object-Sysmeta-Ec-Frag-Index': + # if tx_df had a frag_index then rx_df should also have one + self.assertTrue(k in rx_metadata) + self.assertEqual(frag_index, int(rx_metadata.pop(k))) + elif k == 'ETag' and not same_etag: + self.assertNotEqual(v, rx_metadata.pop(k, None)) + continue + else: + self.assertEqual(v, rx_metadata.pop(k), k) + self.assertFalse(rx_metadata) + expected_body = '%s___%s' % (tx_df._name, frag_index) + actual_body = ''.join([chunk for chunk in rx_df.reader()]) + self.assertEqual(expected_body, actual_body) + + def _analyze_trace(self, trace): + """ + Parse protocol trace captured by fake connection, making some + assertions along the way, and return results as a dict of form: + results = {'tx_missing': , + 'rx_missing': , + 'tx_updates': , + 'rx_updates': } + + Each subreq is a dict with keys: 'method', 'path', 'headers', 'body' + """ + def tx_missing(results, line): + self.assertEqual('tx', line[0]) + results['tx_missing'].append(line[1]) + + def rx_missing(results, line): + self.assertEqual('rx', line[0]) + parts = line[1].split('\r\n') + for part in parts: + results['rx_missing'].append(part) + + def tx_updates(results, line): + self.assertEqual('tx', line[0]) + subrequests = results['tx_updates'] + if line[1].startswith(('PUT', 'DELETE', 'POST')): + parts = line[1].split('\r\n') + method, path = parts[0].split() + subreq = {'method': method, 'path': path, 'req': line[1], + 'headers': parts[1:]} + 
subrequests.append(subreq)
+            else:
+                self.assertTrue(subrequests)
+                body = (subrequests[-1]).setdefault('body', '')
+                body += line[1]
+                subrequests[-1]['body'] = body
+
+        def rx_updates(results, line):
+            self.assertEqual('rx', line[0])
+            # 'rx_updates' is pre-initialized in results below, so index the
+            # dict directly; subscripting the setdefault method object (a
+            # previous typo) raises TypeError at runtime.
+            results['rx_updates'].append(line[1])
+
+        def unexpected(results, line):
+            results.setdefault('unexpected', []).append(line)
+
+        # each trace line is a tuple of ([tx|rx], msg)
+        handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
+                           (('tx', ':MISSING_CHECK: END'), unexpected),
+                           (('rx', ':MISSING_CHECK: START'), rx_missing),
+                           (('rx', ':MISSING_CHECK: END'), unexpected),
+                           (('tx', ':UPDATES: START'), tx_updates),
+                           (('tx', ':UPDATES: END'), unexpected),
+                           (('rx', ':UPDATES: START'), rx_updates),
+                           (('rx', ':UPDATES: END'), unexpected)])
+        expect_handshake = next(handshakes)
+        phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
+        results = dict((k, []) for k in phases)
+        handler = unexpected
+        lines = list(trace.get('messages', []))
+        lines.reverse()
+        while lines:
+            line = lines.pop()
+            if line == expect_handshake[0]:
+                handler = expect_handshake[1]
+                try:
+                    expect_handshake = next(handshakes)
+                except StopIteration:
+                    # should be the last line
+                    self.assertFalse(
+                        lines, 'Unexpected trailing lines %s' % lines)
+                continue
+            handler(results, line)
+
+        try:
+            # check all handshakes occurred
+            missed = next(handshakes)
+            self.fail('Handshake %s not found' % str(missed[0]))
+        except StopIteration:
+            pass
+        # check no message outside of a phase; report the stray messages
+        # themselves (results.get(None) would always be None)
+        self.assertFalse(results.get('unexpected'),
+                         'Message outside of a phase: %s'
+                         % results.get('unexpected'))
+        return results
+
+    def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
+                             rx_frag_index=None):
+        """
+        Verify tx and rx files that should be in sync.
+ :param tx_objs: sender diskfiles + :param policy: storage policy instance + :param tx_frag_index: the fragment index of tx diskfiles that should + have been used as a source for sync'ing + :param rx_frag_index: the fragment index of expected rx diskfiles + """ + for o_name, diskfiles in tx_objs.items(): + for tx_df in diskfiles: + # check tx file still intact - ssync does not do any cleanup! + tx_df.open() + if tx_frag_index is None or tx_df._frag_index == tx_frag_index: + # this diskfile should have been sync'd, + # check rx file is ok + rx_df = self._open_rx_diskfile( + o_name, policy, rx_frag_index) + # for EC revert job or replication etags should match + match_etag = (tx_frag_index == rx_frag_index) + self._verify_diskfile_sync( + tx_df, rx_df, rx_frag_index, match_etag) + else: + # this diskfile should not have been sync'd, + # check no rx file, + self.assertRaises(DiskFileNotExist, self._open_rx_diskfile, + o_name, policy, + frag_index=tx_df._frag_index) + + def _verify_tombstones(self, tx_objs, policy): + # verify tx and rx tombstones that should be in sync + for o_name, diskfiles in tx_objs.items(): + try: + self._open_tx_diskfile(o_name, policy) + self.fail('DiskFileDeleted expected') + except DiskFileDeleted as exc: + tx_delete_time = exc.timestamp + try: + self._open_rx_diskfile(o_name, policy) + self.fail('DiskFileDeleted expected') + except DiskFileDeleted as exc: + rx_delete_time = exc.timestamp + self.assertEqual(tx_delete_time, rx_delete_time) + + +@patch_policies(with_ec_default=True) +class TestSsyncEC(TestBaseSsync): + def test_handoff_fragment_revert(self): + # test that a sync_revert type job does send the correct frag archives + # to the receiver + policy = POLICIES.default + rx_node_index = 0 + tx_node_index = 1 + # for a revert job we iterate over frag index that belongs on + # remote node + frag_index = rx_node_index + + # create sender side diskfiles... 
+ tx_objs = {} + rx_objs = {} + tx_tombstones = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # o1 has primary and handoff fragment archives + t1 = next(self.ts_iter) + tx_objs['o1'] = self._create_ondisk_files( + tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index)) + # o2 only has primary + t2 = next(self.ts_iter) + tx_objs['o2'] = self._create_ondisk_files( + tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) + # o3 only has handoff + t3 = next(self.ts_iter) + tx_objs['o3'] = self._create_ondisk_files( + tx_df_mgr, 'o3', policy, t3, (rx_node_index,)) + # o4 primary and handoff fragment archives on tx, handoff in sync on rx + t4 = next(self.ts_iter) + tx_objs['o4'] = self._create_ondisk_files( + tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,)) + rx_objs['o4'] = self._create_ondisk_files( + rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) + # o5 is a tombstone, missing on receiver + t5 = next(self.ts_iter) + tx_tombstones['o5'] = self._create_ondisk_files( + tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) + tx_tombstones['o5'][0].delete(t5) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + # create ssync sender instance... + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy, + 'frag_index': frag_index} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... 
+ sender() + + # verify protocol + results = self._analyze_trace(trace) + # sender has handoff frags for o1, o3 and o4 and ts for o5 + self.assertEqual(4, len(results['tx_missing'])) + # receiver is missing frags for o1, o3 and ts for o5 + self.assertEqual(3, len(results['rx_missing'])) + self.assertEqual(3, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + sync_paths = [] + for subreq in results.get('tx_updates'): + if subreq.get('method') == 'PUT': + self.assertTrue( + 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index + in subreq.get('headers')) + expected_body = '%s___%s' % (subreq['path'], rx_node_index) + self.assertEqual(expected_body, subreq['body']) + elif subreq.get('method') == 'DELETE': + self.assertEqual('/a/c/o5', subreq['path']) + sync_paths.append(subreq.get('path')) + self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths)) + + # verify on disk files... + self._verify_ondisk_files( + tx_objs, policy, frag_index, rx_node_index) + self._verify_tombstones(tx_tombstones, policy) + + def test_fragment_sync(self): + # check that a sync_only type job does call reconstructor to build a + # diskfile to send, and continues making progress despite an error + # when building one diskfile + policy = POLICIES.default + rx_node_index = 0 + tx_node_index = 1 + # for a sync job we iterate over frag index that belongs on local node + frag_index = tx_node_index + + # create sender side diskfiles... 
+ tx_objs = {} + tx_tombstones = {} + rx_objs = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # o1 only has primary + t1 = next(self.ts_iter) + tx_objs['o1'] = self._create_ondisk_files( + tx_df_mgr, 'o1', policy, t1, (tx_node_index,)) + # o2 only has primary + t2 = next(self.ts_iter) + tx_objs['o2'] = self._create_ondisk_files( + tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) + # o3 only has primary + t3 = next(self.ts_iter) + tx_objs['o3'] = self._create_ondisk_files( + tx_df_mgr, 'o3', policy, t3, (tx_node_index,)) + # o4 primary fragment archives on tx, handoff in sync on rx + t4 = next(self.ts_iter) + tx_objs['o4'] = self._create_ondisk_files( + tx_df_mgr, 'o4', policy, t4, (tx_node_index,)) + rx_objs['o4'] = self._create_ondisk_files( + rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) + # o5 is a tombstone, missing on receiver + t5 = next(self.ts_iter) + tx_tombstones['o5'] = self._create_ondisk_files( + tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) + tx_tombstones['o5'][0].delete(t5) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + reconstruct_fa_calls = [] + + def fake_reconstruct_fa(job, node, metadata): + reconstruct_fa_calls.append((job, node, policy, metadata)) + if len(reconstruct_fa_calls) == 2: + # simulate second reconstruct failing + raise DiskFileError + content = '%s___%s' % (metadata['name'], rx_node_index) + return RebuildingECDiskFileStream( + metadata, rx_node_index, iter([content])) + + # create ssync sender instance... 
+ job = {'device': self.device, + 'partition': self.partition, + 'policy': policy, + 'frag_index': frag_index, + 'sync_diskfile_builder': fake_reconstruct_fa} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + sender() + + # verify protocol + results = self._analyze_trace(trace) + # sender has primary for o1, o2 and o3, o4 and ts for o5 + self.assertEqual(5, len(results['tx_missing'])) + # receiver is missing o1, o2 and o3 and ts for o5 + self.assertEqual(4, len(results['rx_missing'])) + # sender can only construct 2 out of 3 missing frags + self.assertEqual(3, len(results['tx_updates'])) + self.assertEqual(3, len(reconstruct_fa_calls)) + self.assertFalse(results['rx_updates']) + actual_sync_paths = [] + for subreq in results.get('tx_updates'): + if subreq.get('method') == 'PUT': + self.assertTrue( + 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index + in subreq.get('headers')) + expected_body = '%s___%s' % (subreq['path'], rx_node_index) + self.assertEqual(expected_body, subreq['body']) + elif subreq.get('method') == 'DELETE': + self.assertEqual('/a/c/o5', subreq['path']) + actual_sync_paths.append(subreq.get('path')) + + # remove the failed df from expected synced df's + expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5'] + failed_path = reconstruct_fa_calls[1][3]['name'] + expect_sync_paths.remove(failed_path) + failed_obj = None + for obj, diskfiles in tx_objs.items(): + if diskfiles[0]._name == failed_path: + failed_obj = obj + # sanity check + self.assertTrue(tx_objs.pop(failed_obj)) + + # verify on disk files... 
+ self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths)) + self._verify_ondisk_files( + tx_objs, policy, frag_index, rx_node_index) + self._verify_tombstones(tx_tombstones, policy) + + def test_send_with_frag_index_none(self): + policy = POLICIES.default + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # create an ec fragment on the remote node + ts1 = next(self.ts_iter) + remote_df = self._create_ondisk_files( + rx_df_mgr, 'o', policy, ts1, (3,))[0] + + # create a tombstone on the local node + df = self._create_ondisk_files( + tx_df_mgr, 'o', policy, ts1, (3,))[0] + suffix = os.path.basename(os.path.dirname(df._datadir)) + ts2 = next(self.ts_iter) + df.delete(ts2) + # a reconstructor revert job with only tombstones will have frag_index + # explicitly set to None + job = { + 'frag_index': None, + 'partition': self.partition, + 'policy': policy, + 'device': self.device, + } + sender = ssync_sender.Sender( + self.daemon, self.rx_node, job, [suffix]) + success, _ = sender() + self.assertTrue(success) + try: + remote_df.read_metadata() + except DiskFileDeleted as e: + self.assertEqual(e.timestamp, ts2) + else: + self.fail('Successfully opened remote DiskFile') + + def test_send_invalid_frag_index(self): + policy = POLICIES.default + job = {'frag_index': 'Not a number', + 'device': self.device, + 'partition': self.partition, + 'policy': policy} + sender = ssync_sender.Sender( + self.daemon, self.rx_node, job, ['abc']) + success, _ = sender() + self.assertFalse(success) + error_log_lines = self.daemon.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_log_lines)) + error_msg = error_log_lines[0] + self.assertIn("Expected status 200; got 400", error_msg) + self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'Not a number'", + error_msg) + + +@patch_policies +class TestSsyncReplication(TestBaseSsync): + def test_sync(self): + policy = POLICIES.default + rx_node_index = 0 + + # 
create sender side diskfiles... + tx_objs = {} + rx_objs = {} + tx_tombstones = {} + rx_tombstones = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # o1 and o2 are on tx only + t1 = next(self.ts_iter) + tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1) + t2 = next(self.ts_iter) + tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2) + # o3 is on tx and older copy on rx + t3a = next(self.ts_iter) + rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a) + t3b = next(self.ts_iter) + tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b) + # o4 in sync on rx and tx + t4 = next(self.ts_iter) + tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4) + rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4) + # o5 is a tombstone, missing on receiver + t5 = next(self.ts_iter) + tx_tombstones['o5'] = self._create_ondisk_files( + tx_df_mgr, 'o5', policy, t5) + tx_tombstones['o5'][0].delete(t5) + # o6 is a tombstone, in sync on tx and rx + t6 = next(self.ts_iter) + tx_tombstones['o6'] = self._create_ondisk_files( + tx_df_mgr, 'o6', policy, t6) + tx_tombstones['o6'][0].delete(t6) + rx_tombstones['o6'] = self._create_ondisk_files( + rx_df_mgr, 'o6', policy, t6) + rx_tombstones['o6'][0].delete(t6) + # o7 is a tombstone on tx, older data on rx + t7a = next(self.ts_iter) + rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a) + t7b = next(self.ts_iter) + tx_tombstones['o7'] = self._create_ondisk_files( + tx_df_mgr, 'o7', policy, t7b) + tx_tombstones['o7'][0].delete(t7b) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + # create ssync sender instance... 
+ job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + success, in_sync_objs = sender() + + self.assertEqual(7, len(in_sync_objs)) + self.assertTrue(success) + + # verify protocol + results = self._analyze_trace(trace) + self.assertEqual(7, len(results['tx_missing'])) + self.assertEqual(5, len(results['rx_missing'])) + self.assertEqual(5, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + sync_paths = [] + for subreq in results.get('tx_updates'): + if subreq.get('method') == 'PUT': + self.assertTrue( + subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3')) + expected_body = '%s___None' % subreq['path'] + self.assertEqual(expected_body, subreq['body']) + elif subreq.get('method') == 'DELETE': + self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7')) + sync_paths.append(subreq.get('path')) + self.assertEqual( + ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'], + sorted(sync_paths)) + + # verify on disk files... + self._verify_ondisk_files(tx_objs, policy) + self._verify_tombstones(tx_tombstones, policy) + + def test_nothing_to_sync(self): + job = {'device': self.device, + 'partition': self.partition, + 'policy': POLICIES.default} + node = {'replication_ip': self.rx_ip, + 'replication_port': self.rx_port, + 'device': self.device, + 'index': 0} + sender = ssync_sender.Sender(self.daemon, node, job, ['abc']) + # wrap connection from tx to rx to capture ssync messages... 
+ sender.connect, trace = self.make_connect_wrapper(sender) + + result, in_sync_objs = sender() + + self.assertTrue(result) + self.assertFalse(in_sync_objs) + results = self._analyze_trace(trace) + self.assertFalse(results['tx_missing']) + self.assertFalse(results['rx_missing']) + self.assertFalse(results['tx_updates']) + self.assertFalse(results['rx_updates']) + # Minimal receiver response as read by sender: + # 2 <-- initial \r\n to start ssync exchange + # + 23 <-- :MISSING CHECK START\r\n + # + 2 <-- \r\n (minimal missing check response) + # + 21 <-- :MISSING CHECK END\r\n + # + 17 <-- :UPDATES START\r\n + # + 15 <-- :UPDATES END\r\n + # TOTAL = 80 + self.assertEqual(80, trace.get('readline_bytes')) + + def test_meta_file_sync(self): + policy = POLICIES.default + rx_node_index = 0 + + # create diskfiles... + tx_objs = {} + rx_objs = {} + tx_tombstones = {} + rx_tombstones = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + + expected_subreqs = defaultdict(list) + + # o1 on tx only with meta file + t1 = next(self.ts_iter) + tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1) + t1_meta = next(self.ts_iter) + metadata = {'X-Timestamp': t1_meta.internal, + 'X-Object-Meta-Test': 'o1', + 'X-Object-Sysmeta-Test': 'sys_o1'} + tx_objs['o1'][0].write_metadata(metadata) + expected_subreqs['PUT'].append('o1') + expected_subreqs['POST'].append('o1') + + # o2 on tx with meta, on rx without meta + t2 = next(self.ts_iter) + tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2) + t2_meta = next(self.ts_iter) + metadata = {'X-Timestamp': t2_meta.internal, + 'X-Object-Meta-Test': 'o2', + 'X-Object-Sysmeta-Test': 'sys_o2'} + tx_objs['o2'][0].write_metadata(metadata) + rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2) + expected_subreqs['POST'].append('o2') + + # o3 is on tx with meta, rx has newer data but no meta + t3a = next(self.ts_iter) + tx_objs['o3'] = 
self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a) + t3b = next(self.ts_iter) + rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b) + t3_meta = next(self.ts_iter) + metadata = {'X-Timestamp': t3_meta.internal, + 'X-Object-Meta-Test': 'o3', + 'X-Object-Sysmeta-Test': 'sys_o3'} + tx_objs['o3'][0].write_metadata(metadata) + expected_subreqs['POST'].append('o3') + + # o4 is on tx with meta, rx has older data and up to date meta + t4a = next(self.ts_iter) + rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a) + t4b = next(self.ts_iter) + tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b) + t4_meta = next(self.ts_iter) + metadata = {'X-Timestamp': t4_meta.internal, + 'X-Object-Meta-Test': 'o4', + 'X-Object-Sysmeta-Test': 'sys_o4'} + tx_objs['o4'][0].write_metadata(metadata) + rx_objs['o4'][0].write_metadata(metadata) + expected_subreqs['PUT'].append('o4') + + # o5 is on tx with meta, rx is in sync with data and meta + t5 = next(self.ts_iter) + rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5) + tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5) + t5_meta = next(self.ts_iter) + metadata = {'X-Timestamp': t5_meta.internal, + 'X-Object-Meta-Test': 'o5', + 'X-Object-Sysmeta-Test': 'sys_o5'} + tx_objs['o5'][0].write_metadata(metadata) + rx_objs['o5'][0].write_metadata(metadata) + + # o6 is tombstone on tx, rx has older data and meta + t6 = next(self.ts_iter) + tx_tombstones['o6'] = self._create_ondisk_files( + tx_df_mgr, 'o6', policy, t6) + rx_tombstones['o6'] = self._create_ondisk_files( + rx_df_mgr, 'o6', policy, t6) + metadata = {'X-Timestamp': next(self.ts_iter).internal, + 'X-Object-Meta-Test': 'o6', + 'X-Object-Sysmeta-Test': 'sys_o6'} + rx_tombstones['o6'][0].write_metadata(metadata) + tx_tombstones['o6'][0].delete(next(self.ts_iter)) + expected_subreqs['DELETE'].append('o6') + + # o7 is tombstone on rx, tx has older data and meta, + # no subreqs expected... 
+ t7 = next(self.ts_iter) + tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7) + rx_tombstones['o7'] = self._create_ondisk_files( + rx_df_mgr, 'o7', policy, t7) + metadata = {'X-Timestamp': next(self.ts_iter).internal, + 'X-Object-Meta-Test': 'o7', + 'X-Object-Sysmeta-Test': 'sys_o7'} + tx_objs['o7'][0].write_metadata(metadata) + rx_tombstones['o7'][0].delete(next(self.ts_iter)) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + # create ssync sender instance... + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + success, in_sync_objs = sender() + + self.assertEqual(7, len(in_sync_objs)) + self.assertTrue(success) + + # verify protocol + results = self._analyze_trace(trace) + self.assertEqual(7, len(results['tx_missing'])) + self.assertEqual(5, len(results['rx_missing'])) + for subreq in results.get('tx_updates'): + obj = subreq['path'].split('/')[3] + method = subreq['method'] + self.assertTrue(obj in expected_subreqs[method], + 'Unexpected %s subreq for object %s, expected %s' + % (method, obj, expected_subreqs[method])) + expected_subreqs[method].remove(obj) + if method == 'PUT': + expected_body = '%s___None' % subreq['path'] + self.assertEqual(expected_body, subreq['body']) + # verify all expected subreqs consumed + for _method, expected in expected_subreqs.items(): + self.assertFalse(expected) + self.assertFalse(results['rx_updates']) + + # verify on disk files... 
+ del tx_objs['o7'] # o7 not expected to be sync'd + self._verify_ondisk_files(tx_objs, policy) + self._verify_tombstones(tx_tombstones, policy) + for oname, rx_obj in rx_objs.items(): + df = rx_obj[0].open() + metadata = df.get_metadata() + self.assertEqual(metadata['X-Object-Meta-Test'], oname) + self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) + + def test_meta_file_not_synced_to_legacy_receiver(self): + # verify that the sender does sync a data file to a legacy receiver, + # but does not PUT meta file content to a legacy receiver + policy = POLICIES.default + rx_node_index = 0 + + # create diskfiles... + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + + # rx has data at t1 but no meta + # object is on tx with data at t2, meta at t3, + t1 = next(self.ts_iter) + self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1) + t2 = next(self.ts_iter) + tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0] + t3 = next(self.ts_iter) + metadata = {'X-Timestamp': t3.internal, + 'X-Object-Meta-Test': 'o3', + 'X-Object-Sysmeta-Test': 'sys_o3'} + tx_obj.write_metadata(metadata) + + suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))] + # create ssync sender instance... + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... 
+ sender.connect, trace = self.make_connect_wrapper(sender) + + def _legacy_check_missing(self, line): + # reproduces behavior of 'legacy' ssync receiver missing_checks() + parts = line.split() + object_hash = urllib.parse.unquote(parts[0]) + timestamp = urllib.parse.unquote(parts[1]) + want = False + try: + df = self.diskfile_mgr.get_diskfile_from_hash( + self.device, self.partition, object_hash, self.policy, + frag_index=self.frag_index) + except DiskFileNotExist: + want = True + else: + try: + df.open() + except DiskFileDeleted as err: + want = err.timestamp < timestamp + except DiskFileError: + want = True + else: + want = df.timestamp < timestamp + if want: + return urllib.parse.quote(object_hash) + return None + + # run the sync protocol... + func = 'swift.obj.ssync_receiver.Receiver._check_missing' + with mock.patch(func, _legacy_check_missing): + success, in_sync_objs = sender() + + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + + # verify protocol, expecting only a PUT to legacy receiver + results = self._analyze_trace(trace) + self.assertEqual(1, len(results['tx_missing'])) + self.assertEqual(1, len(results['rx_missing'])) + self.assertEqual(1, len(results['tx_updates'])) + self.assertEqual('PUT', results['tx_updates'][0]['method']) + self.assertFalse(results['rx_updates']) + + # verify on disk files... 
+ rx_obj = self._open_rx_diskfile('o1', policy) + tx_obj = self._open_tx_diskfile('o1', policy) + # with legacy behavior rx_obj data and meta timestamps are equal + self.assertEqual(t2, rx_obj.data_timestamp) + self.assertEqual(t2, rx_obj.timestamp) + # with legacy behavior rx_obj data timestamp should equal tx_obj + self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp) + # tx meta file should not have been sync'd to rx data file + self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata()) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 83fd2796fe..cb92608a85 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import hashlib import os import shutil import tempfile @@ -26,7 +25,7 @@ import six from swift.common import bufferedhttp from swift.common import exceptions from swift.common import swob -from swift.common.storage_policy import POLICIES, REPL_POLICY +from swift.common.storage_policy import POLICIES from swift.common import utils from swift.common.swob import HTTPException from swift.obj import diskfile @@ -1809,51 +1808,28 @@ class TestSsyncRxServer(unittest.TestCase): # server socket. 
def setUp(self): - self.ts = unit.make_timestamp_iter() self.rx_ip = '127.0.0.1' # dirs self.tmpdir = tempfile.mkdtemp() self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server') - self.rx_devices = os.path.join(self.tempdir, 'rx/node') - self.tx_devices = os.path.join(self.tempdir, 'tx/node') + self.devices = os.path.join(self.tempdir, 'srv/node') for device in ('sda1', 'sdb1'): - for root in (self.rx_devices, self.tx_devices): - os.makedirs(os.path.join(root, device)) + os.makedirs(os.path.join(self.devices, device)) self.conf = { - 'devices': self.rx_devices, + 'devices': self.devices, 'swift_dir': self.tempdir, - 'mount_check': False, } self.rx_logger = debug_logger('test-object-server') - self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger) + rx_server = server.ObjectController(self.conf, logger=self.rx_logger) self.sock = eventlet.listen((self.rx_ip, 0)) self.rx_server = eventlet.spawn( - eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger()) + eventlet.wsgi.server, self.sock, rx_server, utils.NullLogger()) self.rx_port = self.sock.getsockname()[1] - self.tx_logger = debug_logger('test-daemon') - self.policy = POLICIES[0] - self.conf['devices'] = self.tx_devices + self.tx_logger = debug_logger('test-reconstructor') self.daemon = ObjectReconstructor(self.conf, self.tx_logger) - self.daemon._diskfile_mgr = self.daemon._df_router[self.policy] - - self.nodes = [ - { - 'device': 'sda1', - 'ip': '127.0.0.1', - 'replication_ip': '127.0.0.1', - 'port': self.rx_port, - 'replication_port': self.rx_port, - }, - { - 'device': 'sdb1', - 'ip': '127.0.0.1', - 'replication_ip': '127.0.0.1', - 'port': self.rx_port, - 'replication_port': self.rx_port, - }, - ] + self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]] def tearDown(self): self.rx_server.kill() @@ -1926,89 +1902,6 @@ class TestSsyncRxServer(unittest.TestCase): # sanity check that the receiver did not proceed to missing_check self.assertFalse(mock_missing_check.called) - 
def test_sender_job_missing_frag_node_indexes(self): - # replication jobs don't send frag_index, so we'll use a REPL_POLICY - repl_policy = POLICIES[1] - self.assertEqual(repl_policy.policy_type, REPL_POLICY) - repl_mgr = self.daemon._df_router[repl_policy] - self.daemon._diskfile_mgr = repl_mgr - device = self.nodes[0]['device'] - # create a replicated object, on sender - df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o', - policy=repl_policy) - now = next(self.ts) - metadata = { - 'X-Timestamp': now.internal, - 'Content-Type': 'text/plain', - 'Content-Length': '0', - 'ETag': hashlib.md5('').hexdigest(), - } - with df.create() as writer: - writer.write('') - writer.put(metadata) - # sanity the object is on the sender - self.assertTrue(df._datadir.startswith(self.tx_devices)) - # setup a ssync job - suffix = os.path.basename(os.path.dirname(df._datadir)) - job = { - 'partition': 0, - 'policy': repl_policy, - 'device': device, - } - sender = ssync_sender.Sender( - self.daemon, self.nodes[0], job, [suffix]) - success, _ = sender() - self.assertTrue(success) - # sanity object is synced to receiver - remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile( - device, '0', 'a', 'c', 'o', policy=repl_policy) - self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) - self.assertEqual(remote_df.read_metadata(), metadata) - - def test_send_frag_index_none(self): - # create an ec fragment on the remote node - device = self.nodes[1]['device'] - remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile( - device, '1', 'a', 'c', 'o', policy=self.policy) - ts1 = next(self.ts) - data = 'frag_archive' - metadata = { - 'ETag': hashlib.md5(data).hexdigest(), - 'X-Timestamp': ts1.internal, - 'Content-Length': len(data), - 'X-Object-Sysmeta-Ec-Frag-Index': '3', - } - with remote_df.create() as writer: - writer.write(data) - writer.put(metadata) - writer.commit(ts1) - # create a tombstone on the local node - df = 
self.daemon._df_router[self.policy].get_diskfile( - device, '1', 'a', 'c', 'o', policy=self.policy) - suffix = os.path.basename(os.path.dirname(df._datadir)) - ts2 = next(self.ts) - df.delete(ts2) - # a reconstructor revert job with only tombstones will have frag_index - # explicitly set to None - job = { - 'frag_index': None, - 'partition': 1, - 'policy': self.policy, - 'device': device, - } - sender = ssync_sender.Sender( - self.daemon, self.nodes[1], job, [suffix]) - success, _ = sender() - self.assertTrue(success) - # diskfile tombstone synced to receiver's datadir with timestamp - self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) - try: - remote_df.read_metadata() - except exceptions.DiskFileDeleted as e: - self.assertEqual(e.timestamp, ts2) - else: - self.fail('Successfully opened remote DiskFile') - def test_bad_request_invalid_frag_index(self): with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\ as mock_missing_check: diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index e6b92a291f..60c42855b9 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -12,47 +12,21 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from collections import defaultdict - -import hashlib import os -import shutil -import tempfile import time import unittest import eventlet -import itertools import mock import six -from six.moves import urllib from swift.common import exceptions, utils from swift.common.storage_policy import POLICIES -from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ - DiskFileDeleted from swift.common.utils import Timestamp -from swift.obj import ssync_sender, diskfile, server, ssync_receiver -from swift.obj.reconstructor import RebuildingECDiskFileStream +from swift.obj import ssync_sender, diskfile, ssync_receiver -from test.unit import debug_logger, patch_policies, make_timestamp_iter - - -class FakeReplicator(object): - def __init__(self, testdir, policy=None): - self.logger = debug_logger('test-ssync-sender') - self.conn_timeout = 1 - self.node_timeout = 2 - self.http_timeout = 3 - self.network_chunk_size = 65536 - self.disk_chunk_size = 4096 - conf = { - 'devices': testdir, - 'mount_check': 'false', - } - policy = POLICIES.default if policy is None else policy - self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger) - self._diskfile_mgr = self._diskfile_router[policy] +from test.unit import patch_policies, make_timestamp_iter +from test.unit.obj.common import FakeReplicator, BaseTest class NullBufferedHTTPConnection(object): @@ -105,49 +79,16 @@ class FakeConnection(object): self.closed = True -class BaseTestSender(unittest.TestCase): +@patch_policies() +class TestSender(BaseTest): + def setUp(self): - self.tmpdir = tempfile.mkdtemp() + super(TestSender, self).setUp() self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') utils.mkdirs(os.path.join(self.testdir, 'dev')) self.daemon = FakeReplicator(self.testdir) self.sender = ssync_sender.Sender(self.daemon, None, None, None) - def tearDown(self): - shutil.rmtree(self.tmpdir, ignore_errors=True) - - def _make_open_diskfile(self, device='dev', partition='9', - account='a', 
container='c', obj='o', body='test', - extra_metadata=None, policy=None, - frag_index=None, timestamp=None, df_mgr=None): - policy = policy or POLICIES.legacy - object_parts = account, container, obj - timestamp = Timestamp(time.time()) if timestamp is None else timestamp - if df_mgr is None: - df_mgr = self.daemon._diskfile_router[policy] - df = df_mgr.get_diskfile( - device, partition, *object_parts, policy=policy, - frag_index=frag_index) - content_length = len(body) - etag = hashlib.md5(body).hexdigest() - with df.create() as writer: - writer.write(body) - metadata = { - 'X-Timestamp': timestamp.internal, - 'Content-Length': str(content_length), - 'ETag': etag, - } - if extra_metadata: - metadata.update(extra_metadata) - writer.put(metadata) - writer.commit(timestamp) - df.open() - return df - - -@patch_policies() -class TestSender(BaseTestSender): - def test_call_catches_MessageTimeout(self): def connect(self): @@ -1598,826 +1539,6 @@ class TestSender(BaseTestSender): self.assertTrue(self.sender.connection.closed) -class TestBaseSsync(BaseTestSender): - """ - Provides a framework to test end to end interactions between sender and - receiver. The basis for each test is actual diskfile state on either side. - The connection between sender and receiver is wrapped to capture ssync - traffic for subsequent verification of the protocol. Assertions are made - about the final state of the sender and receiver diskfiles. - """ - - def make_connect_wrapper(self, sender): - """ - Make a wrapper function for the ssync_sender.Sender.connect() method - that will in turn wrap the HTTConnection.send() and the - Sender.readline() so that ssync protocol messages can be captured. 
- """ - orig_connect = sender.connect - trace = dict(messages=[]) - - def add_trace(type, msg): - # record a protocol event for later analysis - if msg.strip(): - trace['messages'].append((type, msg.strip())) - - def make_send_wrapper(send): - def wrapped_send(msg): - _msg = msg.split('\r\n', 1)[1] - _msg = _msg.rsplit('\r\n', 1)[0] - add_trace('tx', _msg) - send(msg) - return wrapped_send - - def make_readline_wrapper(readline): - def wrapped_readline(): - data = readline() - add_trace('rx', data) - bytes_read = trace.setdefault('readline_bytes', 0) - trace['readline_bytes'] = bytes_read + len(data) - return data - return wrapped_readline - - def wrapped_connect(): - orig_connect() - sender.connection.send = make_send_wrapper( - sender.connection.send) - sender.readline = make_readline_wrapper(sender.readline) - return wrapped_connect, trace - - def setUp(self): - self.device = 'dev' - self.partition = '9' - self.tmpdir = tempfile.mkdtemp() - # sender side setup - self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - utils.mkdirs(os.path.join(self.tx_testdir, self.device)) - self.daemon = FakeReplicator(self.tx_testdir) - - # rx side setup - self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver') - utils.mkdirs(os.path.join(self.rx_testdir, self.device)) - conf = { - 'devices': self.rx_testdir, - 'mount_check': 'false', - 'replication_one_per_device': 'false', - 'log_requests': 'false'} - self.rx_controller = server.ObjectController(conf) - self.ts_iter = (Timestamp(t) - for t in itertools.count(int(time.time()))) - self.rx_ip = '127.0.0.1' - sock = eventlet.listen((self.rx_ip, 0)) - self.rx_server = eventlet.spawn( - eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger()) - self.rx_port = sock.getsockname()[1] - self.rx_node = {'replication_ip': self.rx_ip, - 'replication_port': self.rx_port, - 'device': self.device} - - def tearDown(self): - self.rx_server.kill() - shutil.rmtree(self.tmpdir, ignore_errors=True) - - def 
_create_ondisk_files(self, df_mgr, obj_name, policy, timestamp, - frag_indexes=None): - frag_indexes = [None] if frag_indexes is None else frag_indexes - metadata = {'Content-Type': 'plain/text'} - diskfiles = [] - for frag_index in frag_indexes: - object_data = '/a/c/%s___%s' % (obj_name, frag_index) - if frag_index is not None: - metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index) - df = self._make_open_diskfile( - device=self.device, partition=self.partition, account='a', - container='c', obj=obj_name, body=object_data, - extra_metadata=metadata, timestamp=timestamp, policy=policy, - frag_index=frag_index, df_mgr=df_mgr) - # sanity checks - listing = os.listdir(df._datadir) - self.assertTrue(listing) - for filename in listing: - self.assertTrue(filename.startswith(timestamp.internal)) - diskfiles.append(df) - return diskfiles - - def _open_tx_diskfile(self, obj_name, policy, frag_index=None): - df_mgr = self.daemon._diskfile_router[policy] - df = df_mgr.get_diskfile( - self.device, self.partition, account='a', container='c', - obj=obj_name, policy=policy, frag_index=frag_index) - df.open() - return df - - def _open_rx_diskfile(self, obj_name, policy, frag_index=None): - df = self.rx_controller.get_diskfile( - self.device, self.partition, 'a', 'c', obj_name, policy=policy, - frag_index=frag_index) - df.open() - return df - - def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False): - # verify that diskfiles' metadata match - # sanity check, they are not the same ondisk files! 
- self.assertNotEqual(tx_df._datadir, rx_df._datadir) - rx_metadata = dict(rx_df.get_metadata()) - for k, v in tx_df.get_metadata().items(): - if k == 'X-Object-Sysmeta-Ec-Frag-Index': - # if tx_df had a frag_index then rx_df should also have one - self.assertTrue(k in rx_metadata) - self.assertEqual(frag_index, int(rx_metadata.pop(k))) - elif k == 'ETag' and not same_etag: - self.assertNotEqual(v, rx_metadata.pop(k, None)) - continue - else: - self.assertEqual(v, rx_metadata.pop(k), k) - self.assertFalse(rx_metadata) - expected_body = '%s___%s' % (tx_df._name, frag_index) - actual_body = ''.join([chunk for chunk in rx_df.reader()]) - self.assertEqual(expected_body, actual_body) - - def _analyze_trace(self, trace): - """ - Parse protocol trace captured by fake connection, making some - assertions along the way, and return results as a dict of form: - results = {'tx_missing': , - 'rx_missing': , - 'tx_updates': , - 'rx_updates': } - - Each subreq is a dict with keys: 'method', 'path', 'headers', 'body' - """ - def tx_missing(results, line): - self.assertEqual('tx', line[0]) - results['tx_missing'].append(line[1]) - - def rx_missing(results, line): - self.assertEqual('rx', line[0]) - parts = line[1].split('\r\n') - for part in parts: - results['rx_missing'].append(part) - - def tx_updates(results, line): - self.assertEqual('tx', line[0]) - subrequests = results['tx_updates'] - if line[1].startswith(('PUT', 'DELETE', 'POST')): - parts = line[1].split('\r\n') - method, path = parts[0].split() - subreq = {'method': method, 'path': path, 'req': line[1], - 'headers': parts[1:]} - subrequests.append(subreq) - else: - self.assertTrue(subrequests) - body = (subrequests[-1]).setdefault('body', '') - body += line[1] - subrequests[-1]['body'] = body - - def rx_updates(results, line): - self.assertEqual('rx', line[0]) - results.setdefault['rx_updates'].append(line[1]) - - def unexpected(results, line): - results.setdefault('unexpected', []).append(line) - - # each trace line is 
a tuple of ([tx|rx], msg) - handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing), - (('tx', ':MISSING_CHECK: END'), unexpected), - (('rx', ':MISSING_CHECK: START'), rx_missing), - (('rx', ':MISSING_CHECK: END'), unexpected), - (('tx', ':UPDATES: START'), tx_updates), - (('tx', ':UPDATES: END'), unexpected), - (('rx', ':UPDATES: START'), rx_updates), - (('rx', ':UPDATES: END'), unexpected)]) - expect_handshake = next(handshakes) - phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates') - results = dict((k, []) for k in phases) - handler = unexpected - lines = list(trace.get('messages', [])) - lines.reverse() - while lines: - line = lines.pop() - if line == expect_handshake[0]: - handler = expect_handshake[1] - try: - expect_handshake = next(handshakes) - except StopIteration: - # should be the last line - self.assertFalse( - lines, 'Unexpected trailing lines %s' % lines) - continue - handler(results, line) - - try: - # check all handshakes occurred - missed = next(handshakes) - self.fail('Handshake %s not found' % str(missed[0])) - except StopIteration: - pass - # check no message outside of a phase - self.assertFalse(results.get('unexpected'), - 'Message outside of a phase: %s' % results.get(None)) - return results - - def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None, - rx_frag_index=None): - """ - Verify tx and rx files that should be in sync. - :param tx_objs: sender diskfiles - :param policy: storage policy instance - :param tx_frag_index: the fragment index of tx diskfiles that should - have been used as a source for sync'ing - :param rx_frag_index: the fragment index of expected rx diskfiles - """ - for o_name, diskfiles in tx_objs.items(): - for tx_df in diskfiles: - # check tx file still intact - ssync does not do any cleanup! 
- tx_df.open() - if tx_frag_index is None or tx_df._frag_index == tx_frag_index: - # this diskfile should have been sync'd, - # check rx file is ok - rx_df = self._open_rx_diskfile( - o_name, policy, rx_frag_index) - # for EC revert job or replication etags should match - match_etag = (tx_frag_index == rx_frag_index) - self._verify_diskfile_sync( - tx_df, rx_df, rx_frag_index, match_etag) - else: - # this diskfile should not have been sync'd, - # check no rx file, - self.assertRaises(DiskFileNotExist, self._open_rx_diskfile, - o_name, policy, - frag_index=tx_df._frag_index) - - def _verify_tombstones(self, tx_objs, policy): - # verify tx and rx tombstones that should be in sync - for o_name, diskfiles in tx_objs.items(): - try: - self._open_tx_diskfile(o_name, policy) - self.fail('DiskFileDeleted expected') - except DiskFileDeleted as exc: - tx_delete_time = exc.timestamp - try: - self._open_rx_diskfile(o_name, policy) - self.fail('DiskFileDeleted expected') - except DiskFileDeleted as exc: - rx_delete_time = exc.timestamp - self.assertEqual(tx_delete_time, rx_delete_time) - - -@patch_policies(with_ec_default=True) -class TestSsyncEC(TestBaseSsync): - def test_handoff_fragment_revert(self): - # test that a sync_revert type job does send the correct frag archives - # to the receiver - policy = POLICIES.default - rx_node_index = 0 - tx_node_index = 1 - # for a revert job we iterate over frag index that belongs on - # remote node - frag_index = rx_node_index - - # create sender side diskfiles... 
- tx_objs = {} - rx_objs = {} - tx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] - rx_df_mgr = self.rx_controller._diskfile_router[policy] - # o1 has primary and handoff fragment archives - t1 = next(self.ts_iter) - tx_objs['o1'] = self._create_ondisk_files( - tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index)) - # o2 only has primary - t2 = next(self.ts_iter) - tx_objs['o2'] = self._create_ondisk_files( - tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) - # o3 only has handoff - t3 = next(self.ts_iter) - tx_objs['o3'] = self._create_ondisk_files( - tx_df_mgr, 'o3', policy, t3, (rx_node_index,)) - # o4 primary and handoff fragment archives on tx, handoff in sync on rx - t4 = next(self.ts_iter) - tx_objs['o4'] = self._create_ondisk_files( - tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,)) - rx_objs['o4'] = self._create_ondisk_files( - rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) - # o5 is a tombstone, missing on receiver - t5 = next(self.ts_iter) - tx_tombstones['o5'] = self._create_ondisk_files( - tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) - tx_tombstones['o5'][0].delete(t5) - - suffixes = set() - for diskfiles in (tx_objs.values() + tx_tombstones.values()): - for df in diskfiles: - suffixes.add(os.path.basename(os.path.dirname(df._datadir))) - - # create ssync sender instance... - job = {'device': self.device, - 'partition': self.partition, - 'policy': policy, - 'frag_index': frag_index} - node = dict(self.rx_node) - node.update({'index': rx_node_index}) - sender = ssync_sender.Sender(self.daemon, node, job, suffixes) - # wrap connection from tx to rx to capture ssync messages... - sender.connect, trace = self.make_connect_wrapper(sender) - - # run the sync protocol... 
- sender() - - # verify protocol - results = self._analyze_trace(trace) - # sender has handoff frags for o1, o3 and o4 and ts for o5 - self.assertEqual(4, len(results['tx_missing'])) - # receiver is missing frags for o1, o3 and ts for o5 - self.assertEqual(3, len(results['rx_missing'])) - self.assertEqual(3, len(results['tx_updates'])) - self.assertFalse(results['rx_updates']) - sync_paths = [] - for subreq in results.get('tx_updates'): - if subreq.get('method') == 'PUT': - self.assertTrue( - 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index - in subreq.get('headers')) - expected_body = '%s___%s' % (subreq['path'], rx_node_index) - self.assertEqual(expected_body, subreq['body']) - elif subreq.get('method') == 'DELETE': - self.assertEqual('/a/c/o5', subreq['path']) - sync_paths.append(subreq.get('path')) - self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths)) - - # verify on disk files... - self._verify_ondisk_files( - tx_objs, policy, frag_index, rx_node_index) - self._verify_tombstones(tx_tombstones, policy) - - def test_fragment_sync(self): - # check that a sync_only type job does call reconstructor to build a - # diskfile to send, and continues making progress despite an error - # when building one diskfile - policy = POLICIES.default - rx_node_index = 0 - tx_node_index = 1 - # for a sync job we iterate over frag index that belongs on local node - frag_index = tx_node_index - - # create sender side diskfiles... 
- tx_objs = {} - tx_tombstones = {} - rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] - rx_df_mgr = self.rx_controller._diskfile_router[policy] - # o1 only has primary - t1 = next(self.ts_iter) - tx_objs['o1'] = self._create_ondisk_files( - tx_df_mgr, 'o1', policy, t1, (tx_node_index,)) - # o2 only has primary - t2 = next(self.ts_iter) - tx_objs['o2'] = self._create_ondisk_files( - tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) - # o3 only has primary - t3 = next(self.ts_iter) - tx_objs['o3'] = self._create_ondisk_files( - tx_df_mgr, 'o3', policy, t3, (tx_node_index,)) - # o4 primary fragment archives on tx, handoff in sync on rx - t4 = next(self.ts_iter) - tx_objs['o4'] = self._create_ondisk_files( - tx_df_mgr, 'o4', policy, t4, (tx_node_index,)) - rx_objs['o4'] = self._create_ondisk_files( - rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) - # o5 is a tombstone, missing on receiver - t5 = next(self.ts_iter) - tx_tombstones['o5'] = self._create_ondisk_files( - tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) - tx_tombstones['o5'][0].delete(t5) - - suffixes = set() - for diskfiles in (tx_objs.values() + tx_tombstones.values()): - for df in diskfiles: - suffixes.add(os.path.basename(os.path.dirname(df._datadir))) - - reconstruct_fa_calls = [] - - def fake_reconstruct_fa(job, node, metadata): - reconstruct_fa_calls.append((job, node, policy, metadata)) - if len(reconstruct_fa_calls) == 2: - # simulate second reconstruct failing - raise DiskFileError - content = '%s___%s' % (metadata['name'], rx_node_index) - return RebuildingECDiskFileStream( - metadata, rx_node_index, iter([content])) - - # create ssync sender instance... 
- job = {'device': self.device, - 'partition': self.partition, - 'policy': policy, - 'frag_index': frag_index, - 'sync_diskfile_builder': fake_reconstruct_fa} - node = dict(self.rx_node) - node.update({'index': rx_node_index}) - sender = ssync_sender.Sender(self.daemon, node, job, suffixes) - # wrap connection from tx to rx to capture ssync messages... - sender.connect, trace = self.make_connect_wrapper(sender) - - # run the sync protocol... - sender() - - # verify protocol - results = self._analyze_trace(trace) - # sender has primary for o1, o2 and o3, o4 and ts for o5 - self.assertEqual(5, len(results['tx_missing'])) - # receiver is missing o1, o2 and o3 and ts for o5 - self.assertEqual(4, len(results['rx_missing'])) - # sender can only construct 2 out of 3 missing frags - self.assertEqual(3, len(results['tx_updates'])) - self.assertEqual(3, len(reconstruct_fa_calls)) - self.assertFalse(results['rx_updates']) - actual_sync_paths = [] - for subreq in results.get('tx_updates'): - if subreq.get('method') == 'PUT': - self.assertTrue( - 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index - in subreq.get('headers')) - expected_body = '%s___%s' % (subreq['path'], rx_node_index) - self.assertEqual(expected_body, subreq['body']) - elif subreq.get('method') == 'DELETE': - self.assertEqual('/a/c/o5', subreq['path']) - actual_sync_paths.append(subreq.get('path')) - - # remove the failed df from expected synced df's - expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5'] - failed_path = reconstruct_fa_calls[1][3]['name'] - expect_sync_paths.remove(failed_path) - failed_obj = None - for obj, diskfiles in tx_objs.items(): - if diskfiles[0]._name == failed_path: - failed_obj = obj - # sanity check - self.assertTrue(tx_objs.pop(failed_obj)) - - # verify on disk files... 
- self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths)) - self._verify_ondisk_files( - tx_objs, policy, frag_index, rx_node_index) - self._verify_tombstones(tx_tombstones, policy) - - -@patch_policies -class TestSsyncReplication(TestBaseSsync): - def test_sync(self): - policy = POLICIES.default - rx_node_index = 0 - - # create sender side diskfiles... - tx_objs = {} - rx_objs = {} - tx_tombstones = {} - rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] - rx_df_mgr = self.rx_controller._diskfile_router[policy] - # o1 and o2 are on tx only - t1 = next(self.ts_iter) - tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1) - t2 = next(self.ts_iter) - tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2) - # o3 is on tx and older copy on rx - t3a = next(self.ts_iter) - rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a) - t3b = next(self.ts_iter) - tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b) - # o4 in sync on rx and tx - t4 = next(self.ts_iter) - tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4) - rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4) - # o5 is a tombstone, missing on receiver - t5 = next(self.ts_iter) - tx_tombstones['o5'] = self._create_ondisk_files( - tx_df_mgr, 'o5', policy, t5) - tx_tombstones['o5'][0].delete(t5) - # o6 is a tombstone, in sync on tx and rx - t6 = next(self.ts_iter) - tx_tombstones['o6'] = self._create_ondisk_files( - tx_df_mgr, 'o6', policy, t6) - tx_tombstones['o6'][0].delete(t6) - rx_tombstones['o6'] = self._create_ondisk_files( - rx_df_mgr, 'o6', policy, t6) - rx_tombstones['o6'][0].delete(t6) - # o7 is a tombstone on tx, older data on rx - t7a = next(self.ts_iter) - rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a) - t7b = next(self.ts_iter) - tx_tombstones['o7'] = self._create_ondisk_files( - tx_df_mgr, 'o7', policy, t7b) - 
tx_tombstones['o7'][0].delete(t7b) - - suffixes = set() - for diskfiles in (tx_objs.values() + tx_tombstones.values()): - for df in diskfiles: - suffixes.add(os.path.basename(os.path.dirname(df._datadir))) - - # create ssync sender instance... - job = {'device': self.device, - 'partition': self.partition, - 'policy': policy} - node = dict(self.rx_node) - node.update({'index': rx_node_index}) - sender = ssync_sender.Sender(self.daemon, node, job, suffixes) - # wrap connection from tx to rx to capture ssync messages... - sender.connect, trace = self.make_connect_wrapper(sender) - - # run the sync protocol... - success, in_sync_objs = sender() - - self.assertEqual(7, len(in_sync_objs), trace['messages']) - self.assertTrue(success) - - # verify protocol - results = self._analyze_trace(trace) - self.assertEqual(7, len(results['tx_missing'])) - self.assertEqual(5, len(results['rx_missing'])) - self.assertEqual(5, len(results['tx_updates'])) - self.assertFalse(results['rx_updates']) - sync_paths = [] - for subreq in results.get('tx_updates'): - if subreq.get('method') == 'PUT': - self.assertTrue( - subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3')) - expected_body = '%s___None' % subreq['path'] - self.assertEqual(expected_body, subreq['body']) - elif subreq.get('method') == 'DELETE': - self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7')) - sync_paths.append(subreq.get('path')) - self.assertEqual( - ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'], - sorted(sync_paths)) - - # verify on disk files... - self._verify_ondisk_files(tx_objs, policy) - self._verify_tombstones(tx_tombstones, policy) - - def test_nothing_to_sync(self): - job = {'device': self.device, - 'partition': self.partition, - 'policy': POLICIES.default} - node = {'replication_ip': self.rx_ip, - 'replication_port': self.rx_port, - 'device': self.device, - 'index': 0} - sender = ssync_sender.Sender(self.daemon, node, job, ['abc']) - # wrap connection from tx to rx to capture ssync messages... 
- sender.connect, trace = self.make_connect_wrapper(sender) - - result, in_sync_objs = sender() - - self.assertTrue(result) - self.assertFalse(in_sync_objs) - results = self._analyze_trace(trace) - self.assertFalse(results['tx_missing']) - self.assertFalse(results['rx_missing']) - self.assertFalse(results['tx_updates']) - self.assertFalse(results['rx_updates']) - # Minimal receiver response as read by sender: - # 2 <-- initial \r\n to start ssync exchange - # + 23 <-- :MISSING CHECK START\r\n - # + 2 <-- \r\n (minimal missing check response) - # + 21 <-- :MISSING CHECK END\r\n - # + 17 <-- :UPDATES START\r\n - # + 15 <-- :UPDATES END\r\n - # TOTAL = 80 - self.assertEqual(80, trace.get('readline_bytes')) - - def test_meta_file_sync(self): - policy = POLICIES.default - rx_node_index = 0 - - # create diskfiles... - tx_objs = {} - rx_objs = {} - tx_tombstones = {} - rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] - rx_df_mgr = self.rx_controller._diskfile_router[policy] - - expected_subreqs = defaultdict(list) - - # o1 on tx only with meta file - t1 = next(self.ts_iter) - tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1) - t1_meta = next(self.ts_iter) - metadata = {'X-Timestamp': t1_meta.internal, - 'X-Object-Meta-Test': 'o1', - 'X-Object-Sysmeta-Test': 'sys_o1'} - tx_objs['o1'][0].write_metadata(metadata) - expected_subreqs['PUT'].append('o1') - expected_subreqs['POST'].append('o1') - - # o2 on tx with meta, on rx without meta - t2 = next(self.ts_iter) - tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2) - t2_meta = next(self.ts_iter) - metadata = {'X-Timestamp': t2_meta.internal, - 'X-Object-Meta-Test': 'o2', - 'X-Object-Sysmeta-Test': 'sys_o2'} - tx_objs['o2'][0].write_metadata(metadata) - rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2) - expected_subreqs['POST'].append('o2') - - # o3 is on tx with meta, rx has newer data but no meta - t3a = next(self.ts_iter) - tx_objs['o3'] = 
self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a) - t3b = next(self.ts_iter) - rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b) - t3_meta = next(self.ts_iter) - metadata = {'X-Timestamp': t3_meta.internal, - 'X-Object-Meta-Test': 'o3', - 'X-Object-Sysmeta-Test': 'sys_o3'} - tx_objs['o3'][0].write_metadata(metadata) - expected_subreqs['POST'].append('o3') - - # o4 is on tx with meta, rx has older data and up to date meta - t4a = next(self.ts_iter) - rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a) - t4b = next(self.ts_iter) - tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b) - t4_meta = next(self.ts_iter) - metadata = {'X-Timestamp': t4_meta.internal, - 'X-Object-Meta-Test': 'o4', - 'X-Object-Sysmeta-Test': 'sys_o4'} - tx_objs['o4'][0].write_metadata(metadata) - rx_objs['o4'][0].write_metadata(metadata) - expected_subreqs['PUT'].append('o4') - - # o5 is on tx with meta, rx is in sync with data and meta - t5 = next(self.ts_iter) - rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5) - tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5) - t5_meta = next(self.ts_iter) - metadata = {'X-Timestamp': t5_meta.internal, - 'X-Object-Meta-Test': 'o5', - 'X-Object-Sysmeta-Test': 'sys_o5'} - tx_objs['o5'][0].write_metadata(metadata) - rx_objs['o5'][0].write_metadata(metadata) - - # o6 is tombstone on tx, rx has older data and meta - t6 = next(self.ts_iter) - tx_tombstones['o6'] = self._create_ondisk_files( - tx_df_mgr, 'o6', policy, t6) - rx_tombstones['o6'] = self._create_ondisk_files( - rx_df_mgr, 'o6', policy, t6) - metadata = {'X-Timestamp': next(self.ts_iter).internal, - 'X-Object-Meta-Test': 'o6', - 'X-Object-Sysmeta-Test': 'sys_o6'} - rx_tombstones['o6'][0].write_metadata(metadata) - tx_tombstones['o6'][0].delete(next(self.ts_iter)) - expected_subreqs['DELETE'].append('o6') - - # o7 is tombstone on rx, tx has older data and meta, - # no subreqs expected... 
- t7 = next(self.ts_iter) - tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7) - rx_tombstones['o7'] = self._create_ondisk_files( - rx_df_mgr, 'o7', policy, t7) - metadata = {'X-Timestamp': next(self.ts_iter).internal, - 'X-Object-Meta-Test': 'o7', - 'X-Object-Sysmeta-Test': 'sys_o7'} - tx_objs['o7'][0].write_metadata(metadata) - rx_tombstones['o7'][0].delete(next(self.ts_iter)) - - suffixes = set() - for diskfiles in (tx_objs.values() + tx_tombstones.values()): - for df in diskfiles: - suffixes.add(os.path.basename(os.path.dirname(df._datadir))) - - # create ssync sender instance... - job = {'device': self.device, - 'partition': self.partition, - 'policy': policy} - node = dict(self.rx_node) - node.update({'index': rx_node_index}) - sender = ssync_sender.Sender(self.daemon, node, job, suffixes) - # wrap connection from tx to rx to capture ssync messages... - sender.connect, trace = self.make_connect_wrapper(sender) - - # run the sync protocol... - success, in_sync_objs = sender() - - self.assertEqual(7, len(in_sync_objs)) - self.assertTrue(success) - - # verify protocol - results = self._analyze_trace(trace) - self.assertEqual(7, len(results['tx_missing'])) - self.assertEqual(5, len(results['rx_missing'])) - for subreq in results.get('tx_updates'): - obj = subreq['path'].split('/')[3] - method = subreq['method'] - self.assertTrue(obj in expected_subreqs[method], - 'Unexpected %s subreq for object %s, expected %s' - % (method, obj, expected_subreqs[method])) - expected_subreqs[method].remove(obj) - if method == 'PUT': - expected_body = '%s___None' % subreq['path'] - self.assertEqual(expected_body, subreq['body']) - # verify all expected subreqs consumed - for _method, expected in expected_subreqs.items(): - self.assertFalse(expected) - self.assertFalse(results['rx_updates']) - - # verify on disk files... 
- del tx_objs['o7'] # o7 not expected to be sync'd - self._verify_ondisk_files(tx_objs, policy) - self._verify_tombstones(tx_tombstones, policy) - for oname, rx_obj in rx_objs.items(): - df = rx_obj[0].open() - metadata = df.get_metadata() - self.assertEqual(metadata['X-Object-Meta-Test'], oname) - self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) - - def test_meta_file_not_synced_to_legacy_receiver(self): - # verify that the sender does sync a data file to a legacy receiver, - # but does not PUT meta file content to a legacy receiver - policy = POLICIES.default - rx_node_index = 0 - - # create diskfiles... - tx_df_mgr = self.daemon._diskfile_router[policy] - rx_df_mgr = self.rx_controller._diskfile_router[policy] - - # rx has data at t1 but no meta - # object is on tx with data at t2, meta at t3, - t1 = next(self.ts_iter) - self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1) - t2 = next(self.ts_iter) - tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0] - t3 = next(self.ts_iter) - metadata = {'X-Timestamp': t3.internal, - 'X-Object-Meta-Test': 'o3', - 'X-Object-Sysmeta-Test': 'sys_o3'} - tx_obj.write_metadata(metadata) - - suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))] - # create ssync sender instance... - job = {'device': self.device, - 'partition': self.partition, - 'policy': policy} - node = dict(self.rx_node) - node.update({'index': rx_node_index}) - sender = ssync_sender.Sender(self.daemon, node, job, suffixes) - # wrap connection from tx to rx to capture ssync messages... 
- sender.connect, trace = self.make_connect_wrapper(sender) - - def _legacy_check_missing(self, line): - # reproduces behavior of 'legacy' ssync receiver missing_checks() - parts = line.split() - object_hash = urllib.parse.unquote(parts[0]) - timestamp = urllib.parse.unquote(parts[1]) - want = False - try: - df = self.diskfile_mgr.get_diskfile_from_hash( - self.device, self.partition, object_hash, self.policy, - frag_index=self.frag_index) - except exceptions.DiskFileNotExist: - want = True - else: - try: - df.open() - except exceptions.DiskFileDeleted as err: - want = err.timestamp < timestamp - except exceptions.DiskFileError as err: - want = True - else: - want = df.timestamp < timestamp - if want: - return urllib.parse.quote(object_hash) - return None - - # run the sync protocol... - func = 'swift.obj.ssync_receiver.Receiver._check_missing' - with mock.patch(func, _legacy_check_missing): - success, in_sync_objs = sender() - - self.assertEqual(1, len(in_sync_objs)) - self.assertTrue(success) - - # verify protocol, expecting only a PUT to legacy receiver - results = self._analyze_trace(trace) - self.assertEqual(1, len(results['tx_missing'])) - self.assertEqual(1, len(results['rx_missing'])) - self.assertEqual(1, len(results['tx_updates'])) - self.assertEqual('PUT', results['tx_updates'][0]['method']) - self.assertFalse(results['rx_updates']) - - # verify on disk files... 
- rx_obj = self._open_rx_diskfile('o1', policy) - tx_obj = self._open_tx_diskfile('o1', policy) - # with legacy behavior rx_obj data and meta timestamps are equal - self.assertEqual(t2, rx_obj.data_timestamp) - self.assertEqual(t2, rx_obj.timestamp) - # with legacy behavior rx_obj data timestamp should equal tx_obj - self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp) - # tx meta file should not have been sync'd to rx data file - self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata()) - - class TestModuleMethods(unittest.TestCase): def test_encode_missing(self): object_hash = '9d41d8cd98f00b204e9800998ecf0abc' @@ -2458,7 +1579,7 @@ class TestModuleMethods(unittest.TestCase): expected = {'data': True, 'meta': True} self.assertEqual(ssync_sender.decode_wanted(parts), expected) - # you don't really these next few... + # you don't really expect these next few... parts = ['md'] expected = {'data': True, 'meta': True} self.assertEqual(ssync_sender.decode_wanted(parts), expected)