Merge "py3: port obj/reconstructor tests"

This commit is contained in:
Zuul 2019-06-19 15:32:21 +00:00 committed by Gerrit Code Review
commit 808fe406d6
3 changed files with 88 additions and 74 deletions

View File

@ -39,6 +39,7 @@ import random
import errno
import xattr
import six
import six.moves.cPickle as pickle
from six import BytesIO
from six.moves import range
@ -1083,6 +1084,8 @@ def mocked_http_conn(*args, **kwargs):
requests = []
def capture_requests(ip, port, method, path, headers, qs, ssl):
if six.PY2 and not isinstance(ip, bytes):
ip = ip.encode('ascii')
req = {
'ip': ip,
'port': port,

View File

@ -18,6 +18,7 @@ import unittest
import os
from hashlib import md5
import mock
import six
import six.moves.cPickle as pickle
import tempfile
import time
@ -27,6 +28,7 @@ import random
import struct
import collections
from eventlet import Timeout, sleep, spawn
from eventlet.green import threading
from contextlib import closing, contextmanager
from gzip import GzipFile
@ -295,12 +297,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
policy=policy)
timestamp = timestamp or utils.Timestamp.now()
test_data = test_data or 'test data'
test_data = test_data or b'test data'
write_diskfile(df, timestamp, data=test_data, frag_index=frag_index,
legacy_durable=self.legacy_durable)
return df
def assert_expected_jobs(self, part_num, jobs):
# the dict diffs can be pretty big
self.maxDiff = 2048
for job in jobs:
del job['path']
del job['policy']
@ -660,19 +665,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
for job in jobs:
job_key = (job['partition'], job['frag_index'])
if job_key in expected_by_part_frag_index:
for k, value in job.items():
expected_value = \
expected_by_part_frag_index[job_key][k]
try:
if isinstance(value, list):
value.sort()
expected_value.sort()
self.assertEqual(value, expected_value)
except AssertionError as e:
extra_info = \
'\n\n... for %r in part num %s frag %s' % (
k, part_num, job_key[1])
raise AssertionError(str(e) + extra_info)
self.assertEqual(job, expected_by_part_frag_index[job_key])
else:
unexpected_jobs.append(job)
if unexpected_jobs:
@ -705,13 +698,29 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
new_device['device'] = device
new_device['id'] = next(id_counter)
self.policy.object_ring.devs.append(new_device)
self.reconstructor.stats_interval = 0
self.process_job = lambda j: sleep(0)
with mock_ssync_sender(), \
mocked_http_conn(*[200] * http_count,
body=pickle.dumps({})) as request_log:
self.reconstructor.run_once(devices=override_devices)
return request_log
self.reconstructor.stats_interval = object()
can_process = threading.Event()
can_do_stats = threading.Event()
can_do_stats.set()
def fake_sleep(secs=0):
if secs is not self.reconstructor.stats_interval:
return sleep(secs)
can_do_stats.wait()
can_do_stats.clear()
can_process.set()
def fake_process(job):
can_process.wait()
can_process.clear()
can_do_stats.set()
self.reconstructor.process_job = fake_process
with mock_ssync_sender(), mock.patch(
'swift.obj.reconstructor.sleep', fake_sleep):
self.reconstructor.run_once(devices=override_devices)
def test_run_once(self):
# sda1: 3 is done in setup
@ -720,7 +729,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
'sdc1': 1,
'sdd1': 0,
}
self._run_once(32, extra_devices)
with Timeout(60):
self._run_once(32, extra_devices)
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'reconstructed in' not in line:
@ -728,8 +738,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
stat_line = line.split('reconstructed', 1)[0].strip()
stats_lines.add(stat_line)
acceptable = set([
'2/8 (25.00%) partitions',
'3/8 (37.50%) partitions',
'4/8 (50.00%) partitions',
'5/8 (62.50%) partitions',
'6/8 (75.00%) partitions',
'7/8 (87.50%) partitions',
'8/8 (100.00%) partitions',
])
matched = stats_lines & acceptable
@ -746,7 +760,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
'sdc1': 1,
'sdd1': 0,
}
self._run_once(3, extra_devices, 'sdc1')
with Timeout(60):
self._run_once(3, extra_devices, 'sdc1')
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'reconstructed in' not in line:
@ -1332,7 +1347,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
'sda1', 2, self.policy, do_listdir=True)
stub_data.update({'7ca': {None: '8f19c38e1cf8e2390d4ca29051407ae3'}})
pickle_path = os.path.join(part_path, 'hashes.pkl')
with open(pickle_path, 'w') as f:
with open(pickle_path, 'wb') as f:
pickle.dump(stub_data, f)
# part 2 should be totally empty
@ -2908,7 +2923,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'replication_port': self.port + 1 if dev == 'sdb' else self.port,
} for i, dev in enumerate(local_devs)]
stub_ring_devs.append({
'id': i + 1,
'id': len(local_devs),
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
@ -2957,13 +2972,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'replication_port': self.port,
} for i, dev in enumerate(local_devs)]
stub_ring_devs.append({
'id': i + 1,
'id': len(local_devs),
'device': 'sdb',
'replication_ip': self.ip,
'replication_port': self.port + 1, # not local via port
})
stub_ring_devs.append({
'id': i + 2,
'id': len(local_devs) + 1,
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
@ -3388,7 +3403,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
job = jobs[0]
self.assertEqual(job['job_type'], object_reconstructor.SYNC)
self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(job['suffixes'], stub_hashes.keys())
self.assertEqual(job['suffixes'], list(stub_hashes.keys()))
self.assertEqual(set([n['index'] for n in job['sync_to']]),
set([(frag_index + 1) % ring.replicas,
(frag_index - 1) % ring.replicas,
@ -3548,7 +3563,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
expected = {
'job_type': object_reconstructor.REVERT,
'frag_index': None,
'suffixes': stub_hashes.keys(),
'suffixes': list(stub_hashes.keys()),
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
@ -3886,10 +3901,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
ssync_resp = mock.MagicMock()
ssync_resp.status = 200
ssync_resp.readline.side_effect = [
':MISSING_CHECK: START',
':MISSING_CHECK: END',
':UPDATES: START',
':UPDATES: END',
b':MISSING_CHECK: START',
b':MISSING_CHECK: END',
b':UPDATES: START',
b':UPDATES: END',
]
ssync_headers = []
@ -4445,7 +4460,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'c', 'data-obj', policy=self.policy)
ts = self.ts()
with df.create() as writer:
test_data = 'test data'
test_data = b'test data'
writer.write(test_data)
metadata = {
'X-Timestamp': ts.internal,
@ -4579,7 +4594,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
obj_path = '/a/c/o' # subclass overrides this
obj_path = b'/a/c/o' # subclass overrides this
def setUp(self):
super(TestReconstructFragmentArchive, self).setUp()
@ -4600,7 +4615,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
@ -4629,7 +4644,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
self.assertEqual(0, df.content_length)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4664,7 +4679,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -4687,7 +4702,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4701,7 +4716,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -4732,7 +4747,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
# ... this bad response should be ignored like any other failure
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -4749,7 +4764,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# make up some data (trim some amount to make it unaligned with
# segment size)
test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
# the scheme is 10+4, so this gets a parity node
@ -4770,7 +4785,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4839,7 +4854,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -4872,7 +4887,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4890,7 +4905,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -4911,7 +4926,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4927,7 +4942,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4945,7 +4960,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -4963,7 +4978,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -4983,7 +4998,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata))
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -5007,7 +5022,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
ts = make_timestamp_iter()
# create 3 different ec bodies
@ -5084,7 +5099,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[broken_index]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -5104,7 +5119,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -5143,7 +5158,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -5164,7 +5179,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -5193,7 +5208,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -5222,7 +5237,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
*codes, body_iter=body_iter, headers=headers) as mock_conn:
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -5235,9 +5250,11 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
path = unquote(
'%(ip)s:%(port)d%(path)s' % mock_conn.requests[0]
).encode('latin1').decode('utf8')
)
expected_warning = 'Invalid resp from %s policy#0%s' % (
path, warning_extra)
if six.PY2:
expected_warning = expected_warning.decode('utf8')
self.assertIn(expected_warning, warning_log_lines)
test_missing_header(
@ -5263,7 +5280,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -5287,7 +5304,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
*codes, body_iter=body_iter, headers=headers) as mock_conn:
df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -5300,11 +5317,13 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
path = unquote(
'%(ip)s:%(port)d%(path)s' % mock_conn.requests[0]
).encode('latin1').decode('utf8')
)
expected_warning = (
'Invalid resp from %s policy#0 '
'(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)'
% (path, invalid_frag_index))
if six.PY2:
expected_warning = expected_warning.decode('utf8')
self.assertIn(expected_warning, warning_log_lines)
for value in ('None', 'invalid'):
@ -5314,7 +5333,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
@patch_policies(with_ec_default=True)
class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive):
# repeat superclass tests with an object path that contains non-ascii chars
obj_path = '/a/c/o\xc3\xa8'
obj_path = b'/a/c/o\xc3\xa8'
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
@ -5344,7 +5363,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
@ -5381,7 +5400,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())

10
tox.ini
View File

@ -94,15 +94,7 @@ commands =
test/unit/common/test_utils.py \
test/unit/common/test_wsgi.py \
test/unit/container \
test/unit/obj/test_auditor.py \
test/unit/obj/test_diskfile.py \
test/unit/obj/test_expirer.py \
test/unit/obj/test_replicator.py \
test/unit/obj/test_server.py \
test/unit/obj/test_ssync.py \
test/unit/obj/test_ssync_receiver.py \
test/unit/obj/test_ssync_sender.py \
test/unit/obj/test_updater.py \
test/unit/obj \
test/unit/proxy}
[testenv:py36]