Per-policy DiskFile classes

Adds specific disk file classes for EC policy types.

The new ECDiskFile and ECDiskFileWriter classes are used by the
ECDiskFileManager.

ECDiskFileManager is registered with the DiskFileRouter for use with
EC_POLICY type policies.

Refactors diskfile tests into BaseDiskFileMixin and BaseDiskFileManagerMixin
classes which are then extended in subclasses for the legacy
replication-type DiskFile* and ECDiskFile* classes.

Refactor to prefer use of a policy instance reference over a policy_index
int to refer to a policy.

Add additional verification to DiskFileManager.get_dev_path to validate the
device root with common.constraints.check_dir, even when mount_check is
disabled for use in on a virtual swift-all-in-one.

Co-Authored-By: Thiago da Silva <thiago@redhat.com>
Co-Authored-By: John Dickinson <me@not.mn>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com>
Co-Authored-By: Paul Luse <paul.e.luse@intel.com>
Co-Authored-By: Samuel Merritt <sam@swiftstack.com>
Co-Authored-By: Christian Schwede <christian.schwede@enovance.com>
Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com>
Change-Id: I22f915160dc67a9e18f4738c1ddf068344e8ad5d
This commit is contained in:
Alistair Coles 2015-03-17 08:32:57 +00:00 committed by Clay Gerrard
parent 8a58bbf75f
commit fa89064933
19 changed files with 4480 additions and 1349 deletions

View File

@ -24,7 +24,7 @@ from swift.common.request_helpers import is_sys_meta, is_user_meta, \
from swift.account.backend import AccountBroker, DATADIR as ABDATADIR
from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR
from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \
extract_policy_index
extract_policy
from swift.common.storage_policy import POLICIES
@ -341,10 +341,7 @@ def print_obj(datafile, check_etag=True, swift_dir='/etc/swift',
datadir = DATADIR_BASE
# try to extract policy index from datafile disk path
try:
policy_index = extract_policy_index(datafile)
except ValueError:
pass
policy_index = int(extract_policy(datafile) or POLICIES.legacy)
try:
if policy_index:

View File

@ -204,6 +204,19 @@ def check_object_creation(req, object_name):
return check_metadata(req, 'object')
def check_dir(root, drive):
"""
Verify that the path to the device is a directory and is a lesser
constraint that is enforced when a full mount_check isn't possible
with, for instance, a VM using loopback or partitions.
:param root: base path where the dir is
:param drive: drive name to be checked
:returns: True if it is a valid directoy, False otherwise
"""
return os.path.isdir(os.path.join(root, drive))
def check_mount(root, drive):
"""
Verify that the path to the device is a mount point and mounted. This

File diff suppressed because it is too large Load Diff

View File

@ -57,6 +57,12 @@ class InMemoryFileSystem(object):
def get_diskfile(self, account, container, obj, **kwargs):
return DiskFile(self, account, container, obj)
def pickle_async_update(self, *args, **kwargs):
"""
For now don't handle async updates.
"""
pass
class DiskFileWriter(object):
"""
@ -98,6 +104,16 @@ class DiskFileWriter(object):
metadata['name'] = self._name
self._filesystem.put_object(self._name, self._fp, metadata)
def commit(self, timestamp):
"""
Perform any operations necessary to mark the object as durable. For
mem_diskfile type this is a no-op.
:param timestamp: object put timestamp, an instance of
:class:`~swift.common.utils.Timestamp`
"""
pass
class DiskFileReader(object):
"""

View File

@ -15,15 +15,7 @@
""" In-Memory Object Server for Swift """
import os
from swift import gettext_ as _
from eventlet import Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import is_success
from swift.obj.mem_diskfile import InMemoryFileSystem
from swift.obj import server
@ -53,49 +45,6 @@ class ObjectController(server.ObjectController):
"""
return self._filesystem.get_diskfile(account, container, obj, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice, policy_idx):
"""
Sends or saves an async update.
:param op: operation performed (ex: 'PUT', or 'DELETE')
:param account: account name for the object
:param container: container name for the object
:param obj: object name
:param host: host that the container is on
:param partition: partition that the container is on
:param contdevice: device name that the container is on
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
:param policy_idx: the associated storage policy index
"""
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
if all([host, partition, contdevice]):
try:
with ConnectionTimeout(self.conn_timeout):
ip, port = host.rsplit(':', 1)
conn = http_connect(ip, port, contdevice, partition, op,
full_path, headers_out)
with Timeout(self.node_timeout):
response = conn.getresponse()
response.read()
if is_success(response.status):
return
else:
self.logger.error(_(
'ERROR Container update failed: %(status)d '
'response from %(ip)s:%(port)s/%(dev)s'),
{'status': response.status, 'ip': ip, 'port': port,
'dev': contdevice})
except (Exception, Timeout):
self.logger.exception(_(
'ERROR container update failed with '
'%(ip)s:%(port)s/%(dev)s'),
{'ip': ip, 'port': port, 'dev': contdevice})
# FIXME: For now don't handle async updates
def REPLICATE(self, request):
"""
Handle REPLICATE requests for the Swift Object Server. This is used

View File

@ -39,7 +39,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj import ssync_sender
from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
get_tmp_dir)
from swift.common.storage_policy import POLICIES
from swift.common.storage_policy import POLICIES, REPL_POLICY
hubs.use_hub(get_hub())
@ -110,14 +110,15 @@ class ObjectReplicator(Daemon):
"""
return self.sync_method(node, job, suffixes, *args, **kwargs)
def get_object_ring(self, policy_idx):
def load_object_ring(self, policy):
"""
Get the ring object to use to handle a request based on its policy.
Make sure the policy's rings are loaded.
:policy_idx: policy index as defined in swift.conf
:param policy: the StoragePolicy instance
:returns: appropriate ring object
"""
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
policy.load_ring(self.swift_dir)
return policy.object_ring
def _rsync(self, args):
"""
@ -196,7 +197,7 @@ class ObjectReplicator(Daemon):
had_any = True
if not had_any:
return False, set()
data_dir = get_data_dir(job['policy_idx'])
data_dir = get_data_dir(job['policy'])
args.append(join(rsync_module, node['device'],
data_dir, job['partition']))
return self._rsync(args) == 0, set()
@ -231,7 +232,7 @@ class ObjectReplicator(Daemon):
if len(suff) == 3 and isdir(join(path, suff))]
self.replication_count += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
try:
responses = []
@ -314,7 +315,7 @@ class ObjectReplicator(Daemon):
"""
self.replication_count += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
try:
hashed, local_hash = tpool_reraise(
@ -328,7 +329,8 @@ class ObjectReplicator(Daemon):
random.shuffle(job['nodes'])
nodes = itertools.chain(
job['nodes'],
job['object_ring'].get_more_nodes(int(job['partition'])))
job['policy'].object_ring.get_more_nodes(
int(job['partition'])))
while attempts_left > 0:
# If this throws StopIteration it will be caught way below
node = next(nodes)
@ -460,16 +462,15 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
def process_repl(self, policy, ips, override_devices=None,
def build_replication_jobs(self, policy, ips, override_devices=None,
override_partitions=None):
"""
Helper function for collect_jobs to build jobs for replication
using replication style storage policy
"""
jobs = []
obj_ring = self.get_object_ring(policy.idx)
data_dir = get_data_dir(policy.idx)
for local_dev in [dev for dev in obj_ring.devs
data_dir = get_data_dir(policy)
for local_dev in [dev for dev in policy.object_ring.devs
if (dev
and is_local_device(ips,
self.port,
@ -479,7 +480,7 @@ class ObjectReplicator(Daemon):
or dev['device'] in override_devices))]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
tmp_path = join(dev_path, get_tmp_dir(policy))
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), local_dev['device'])
continue
@ -497,7 +498,8 @@ class ObjectReplicator(Daemon):
try:
job_path = join(obj_path, partition)
part_nodes = obj_ring.get_part_nodes(int(partition))
part_nodes = policy.object_ring.get_part_nodes(
int(partition))
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
jobs.append(
@ -506,9 +508,8 @@ class ObjectReplicator(Daemon):
obj_path=obj_path,
nodes=nodes,
delete=len(nodes) > len(part_nodes) - 1,
policy_idx=policy.idx,
policy=policy,
partition=partition,
object_ring=obj_ring,
region=local_dev['region']))
except ValueError:
continue
@ -530,12 +531,14 @@ class ObjectReplicator(Daemon):
jobs = []
ips = whataremyips()
for policy in POLICIES:
if (override_policies is not None
and str(policy.idx) not in override_policies):
if policy.policy_type == REPL_POLICY:
if (override_policies is not None and
str(policy.idx) not in override_policies):
continue
# may need to branch here for future policy types
jobs += self.process_repl(policy, ips,
override_devices=override_devices,
# ensure rings are loaded for policy
self.load_object_ring(policy)
jobs += self.build_replication_jobs(
policy, ips, override_devices=override_devices,
override_partitions=override_partitions)
random.shuffle(jobs)
if self.handoffs_first:
@ -569,7 +572,7 @@ class ObjectReplicator(Daemon):
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device'])
continue
if not self.check_ring(job['object_ring']):
if not self.check_ring(job['policy'].object_ring):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return

View File

@ -685,12 +685,17 @@ class ObjectController(BaseStorageServer):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
by the object replicator to get hashes for directories.
Note that the name REPLICATE is preserved for historical reasons as
this verb really just returns the hashes information for the specified
parameters and is used, for example, by both replication and EC.
"""
device, partition, suffix, policy_idx = \
device, partition, suffix_parts, policy = \
get_name_and_placement(request, 2, 3, True)
suffixes = suffix_parts.split('-') if suffix_parts else []
try:
hashes = self._diskfile_mgr.get_hashes(device, partition, suffix,
policy_idx)
hashes = self._diskfile_mgr.get_hashes(
device, partition, suffixes, policy)
except DiskFileDeviceUnavailable:
resp = HTTPInsufficientStorage(drive=device, request=request)
else:

View File

@ -47,7 +47,7 @@ class Sender(object):
@property
def policy_idx(self):
return int(self.job.get('policy_idx', 0))
return int(self.job.get('policy', 0))
def __call__(self):
"""

View File

@ -29,7 +29,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, ismount
from swift.common.daemon import Daemon
from swift.obj.diskfile import get_tmp_dir, get_async_dir, ASYNCDIR_BASE
from swift.common.storage_policy import split_policy_string, PolicyError
from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
from swift.common.http import is_success, HTTP_NOT_FOUND, \
HTTP_INTERNAL_SERVER_ERROR
@ -148,28 +149,19 @@ class ObjectUpdater(Daemon):
start_time = time.time()
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
# skip stuff like "accounts", "containers", etc.
if not (asyncdir == ASYNCDIR_BASE or
asyncdir.startswith(ASYNCDIR_BASE + '-')):
continue
# we only care about directories
async_pending = os.path.join(device, asyncdir)
if not os.path.isdir(async_pending):
continue
if asyncdir == ASYNCDIR_BASE:
policy_idx = 0
else:
_junk, policy_idx = asyncdir.split('-', 1)
try:
policy_idx = int(policy_idx)
get_async_dir(policy_idx)
except ValueError:
self.logger.warn(_('Directory %s does not map to a '
'valid policy') % asyncdir)
if not asyncdir.startswith(ASYNCDIR_BASE):
# skip stuff like "accounts", "containers", etc.
continue
try:
base, policy = split_policy_string(asyncdir)
except PolicyError as e:
self.logger.warn(_('Directory %r does not map '
'to a valid policy (%s)') % (asyncdir, e))
continue
for prefix in self._listdir(async_pending):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
@ -193,7 +185,7 @@ class ObjectUpdater(Daemon):
os.unlink(update_path)
else:
self.process_object_update(update_path, device,
policy_idx)
policy)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
@ -202,13 +194,13 @@ class ObjectUpdater(Daemon):
pass
self.logger.timing_since('timing', start_time)
def process_object_update(self, update_path, device, policy_idx):
def process_object_update(self, update_path, device, policy):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
:param policy_idx: storage policy index of object update
:param policy: storage policy of object update
"""
try:
update = pickle.load(open(update_path, 'rb'))
@ -228,7 +220,7 @@ class ObjectUpdater(Daemon):
headers_out = update['headers'].copy()
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
headers_out.setdefault('X-Backend-Storage-Policy-Index',
str(policy_idx))
str(int(policy)))
events = [spawn(self.object_update,
node, part, update['op'], obj, headers_out)
for node in nodes if node['id'] not in successes]
@ -256,7 +248,7 @@ class ObjectUpdater(Daemon):
if new_successes:
update['successes'] = successes
write_pickle(update, update_path, os.path.join(
device, get_tmp_dir(policy_idx)))
device, get_tmp_dir(policy)))
def object_update(self, node, part, op, obj, headers_out):
"""

View File

@ -368,6 +368,11 @@ class TestConstraints(unittest.TestCase):
self.assertTrue('X-Delete-At' in req.headers)
self.assertEqual(req.headers['X-Delete-At'], expected)
def test_check_dir(self):
self.assertFalse(constraints.check_dir('', ''))
with mock.patch("os.path.isdir", MockTrue()):
self.assertTrue(constraints.check_dir('/srv', 'foo/bar'))
def test_check_mount(self):
self.assertFalse(constraints.check_mount('', ''))
with mock.patch("swift.common.utils.ismount", MockTrue()):

View File

@ -28,7 +28,7 @@ from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
get_data_dir, DiskFileManager, AuditLocation
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
storage_directory
from swift.common.storage_policy import StoragePolicy
from swift.common.storage_policy import StoragePolicy, POLICIES
_mocked_policies = [StoragePolicy(0, 'zero', False),
@ -48,12 +48,16 @@ class TestAuditor(unittest.TestCase):
os.mkdir(os.path.join(self.devices, 'sdb'))
# policy 0
self.objects = os.path.join(self.devices, 'sda', get_data_dir(0))
self.objects_2 = os.path.join(self.devices, 'sdb', get_data_dir(0))
self.objects = os.path.join(self.devices, 'sda',
get_data_dir(POLICIES[0]))
self.objects_2 = os.path.join(self.devices, 'sdb',
get_data_dir(POLICIES[0]))
os.mkdir(self.objects)
# policy 1
self.objects_p1 = os.path.join(self.devices, 'sda', get_data_dir(1))
self.objects_2_p1 = os.path.join(self.devices, 'sdb', get_data_dir(1))
self.objects_p1 = os.path.join(self.devices, 'sda',
get_data_dir(POLICIES[1]))
self.objects_2_p1 = os.path.join(self.devices, 'sdb',
get_data_dir(POLICIES[1]))
os.mkdir(self.objects_p1)
self.parts = self.parts_p1 = {}
@ -70,9 +74,10 @@ class TestAuditor(unittest.TestCase):
self.df_mgr = DiskFileManager(self.conf, self.logger)
# diskfiles for policy 0, 1
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', 0)
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES[0])
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
'o', 1)
'o', policy=POLICIES[1])
def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
@ -125,13 +130,15 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = auditor_worker.quarantines
auditor_worker.object_audit(
AuditLocation(disk_file._datadir, 'sda', '0'))
AuditLocation(disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
os.write(writer._fd, 'extra_data')
auditor_worker.object_audit(
AuditLocation(disk_file._datadir, 'sda', '0'))
AuditLocation(disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.quarantines,
pre_quarantines + 1)
run_tests(self.disk_file)
@ -156,10 +163,12 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = auditor_worker.quarantines
# remake so it will have metadata
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES.legacy)
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0'))
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
etag = md5()
etag.update('1' + '0' * 1023)
@ -171,7 +180,8 @@ class TestAuditor(unittest.TestCase):
writer.put(metadata)
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0'))
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_no_meta(self):
@ -186,7 +196,8 @@ class TestAuditor(unittest.TestCase):
self.rcache, self.devices)
pre_quarantines = auditor_worker.quarantines
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0'))
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_will_not_swallow_errors_in_tests(self):
@ -203,7 +214,8 @@ class TestAuditor(unittest.TestCase):
with mock.patch.object(DiskFileManager,
'get_diskfile_from_audit_location', blowup):
self.assertRaises(NameError, auditor_worker.object_audit,
AuditLocation(os.path.dirname(path), 'sda', '0'))
AuditLocation(os.path.dirname(path), 'sda', '0',
policy=POLICIES.legacy))
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
timestamp = str(normalize_timestamp(time.time()))
@ -216,9 +228,11 @@ class TestAuditor(unittest.TestCase):
def blowup(*args):
raise NameError('tpyo')
with mock.patch('swift.obj.diskfile.DiskFile', blowup):
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
blowup):
auditor_worker.failsafe_object_audit(
AuditLocation(os.path.dirname(path), 'sda', '0'))
AuditLocation(os.path.dirname(path), 'sda', '0',
policy=POLICIES.legacy))
self.assertEquals(auditor_worker.errors, 1)
def test_generic_exception_handling(self):
@ -240,7 +254,8 @@ class TestAuditor(unittest.TestCase):
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
with mock.patch('swift.obj.diskfile.DiskFile', lambda *_: 1 / 0):
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
lambda *_: 1 / 0):
auditor_worker.audit_all_objects()
self.assertEquals(auditor_worker.errors, pre_errors + 1)
@ -368,7 +383,8 @@ class TestAuditor(unittest.TestCase):
}
writer.put(metadata)
auditor_worker.audit_all_objects()
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob')
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob',
policy=POLICIES.legacy)
data = '1' * 10
etag = md5()
with self.disk_file.create() as writer:
@ -424,7 +440,7 @@ class TestAuditor(unittest.TestCase):
name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(
self.devices, 'sda',
storage_directory(get_data_dir(0), '0', name_hash))
storage_directory(get_data_dir(POLICIES[0]), '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path):
mkdirs(dir_path)
@ -474,9 +490,8 @@ class TestAuditor(unittest.TestCase):
DiskFile._quarantine(self, data_file, msg)
self.setup_bad_zero_byte()
was_df = auditor.diskfile.DiskFile
try:
auditor.diskfile.DiskFile = FakeFile
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
FakeFile):
kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
@ -484,8 +499,6 @@ class TestAuditor(unittest.TestCase):
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
self.assertTrue(rat[0])
finally:
auditor.diskfile.DiskFile = was_df
@mock.patch.object(auditor.ObjectAuditor, 'run_audit')
@mock.patch('os.fork', return_value=0)

File diff suppressed because it is too large Load Diff

View File

@ -173,9 +173,9 @@ class TestObjectReplicator(unittest.TestCase):
os.mkdir(self.devices)
os.mkdir(os.path.join(self.devices, 'sda'))
self.objects = os.path.join(self.devices, 'sda',
diskfile.get_data_dir(0))
diskfile.get_data_dir(POLICIES[0]))
self.objects_1 = os.path.join(self.devices, 'sda',
diskfile.get_data_dir(1))
diskfile.get_data_dir(POLICIES[1]))
os.mkdir(self.objects)
os.mkdir(self.objects_1)
self.parts = {}
@ -205,7 +205,7 @@ class TestObjectReplicator(unittest.TestCase):
object_replicator.http_connect = mock_http_connect(200)
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy_idx=0)
policy=POLICIES[0])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -216,7 +216,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
ring = replicator.get_object_ring(0)
ring = replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
@ -239,7 +239,7 @@ class TestObjectReplicator(unittest.TestCase):
object_replicator.http_connect = mock_http_connect(200)
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy_idx=1)
policy=POLICIES[1])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -250,7 +250,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects_1, cur_part, data_dir)
process_arg_checker = []
ring = replicator.get_object_ring(1)
ring = replicator.load_object_ring(POLICIES[1])
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
@ -266,7 +266,7 @@ class TestObjectReplicator(unittest.TestCase):
def test_check_ring(self):
for pol in POLICIES:
obj_ring = self.replicator.get_object_ring(pol.idx)
obj_ring = self.replicator.load_object_ring(pol)
self.assertTrue(self.replicator.check_ring(obj_ring))
orig_check = self.replicator.next_check
self.replicator.next_check = orig_check - 30
@ -300,7 +300,7 @@ class TestObjectReplicator(unittest.TestCase):
jobs_to_delete = [j for j in jobs if j['delete']]
jobs_by_pol_part = {}
for job in jobs:
jobs_by_pol_part[str(job['policy_idx']) + job['partition']] = job
jobs_by_pol_part[str(int(job['policy'])) + job['partition']] = job
self.assertEquals(len(jobs_to_delete), 2)
self.assertTrue('1', jobs_to_delete[0]['partition'])
self.assertEquals(
@ -392,7 +392,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_delete_partition(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -404,7 +405,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -421,7 +422,8 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.conf.pop('sync_method')
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -433,7 +435,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -470,7 +472,8 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -496,7 +499,7 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy_idx=1)
policy=POLICIES[1])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -508,7 +511,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects_1, '1', data_dir)
part_path = os.path.join(self.objects_1, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(1)
ring = self.replicator.load_object_ring(POLICIES[1])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -524,7 +527,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_delete_partition_with_failures(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -536,7 +540,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -559,7 +563,8 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.handoff_delete = 2
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -571,7 +576,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -593,7 +598,8 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.handoff_delete = 2
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -605,7 +611,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -627,7 +633,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -639,7 +646,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
@ -659,7 +666,8 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(os.access(part_path, os.F_OK))
def test_delete_partition_override_params(self):
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
@ -672,9 +680,10 @@ class TestObjectReplicator(unittest.TestCase):
self.assertFalse(os.access(part_path, os.F_OK))
def test_delete_policy_override_params(self):
df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o')
df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
policy=POLICIES.legacy)
df1 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
policy_idx=1)
policy=POLICIES[1])
mkdirs(df0._datadir)
mkdirs(df1._datadir)
@ -695,7 +704,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_delete_partition_ssync(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -743,7 +753,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_delete_partition_ssync_with_sync_failure(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -791,7 +802,8 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.logger = debug_logger('test-replicator')
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -832,7 +844,8 @@ class TestObjectReplicator(unittest.TestCase):
mock_http_connect(200)):
self.replicator.logger = mock_logger = \
debug_logger('test-replicator')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -927,7 +940,8 @@ class TestObjectReplicator(unittest.TestCase):
# Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted.
cur_part = '1'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -937,7 +951,7 @@ class TestObjectReplicator(unittest.TestCase):
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
ring = replicator.get_object_ring(0)
ring = replicator.load_object_ring(POLICIES[0])
process_arg_checker = []
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
@ -991,7 +1005,8 @@ class TestObjectReplicator(unittest.TestCase):
# Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted.
cur_part = '1'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o')
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
@ -1002,10 +1017,11 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
ring = replicator.get_object_ring(0)
ring = replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
cur_part)
@ -1069,8 +1085,8 @@ class TestObjectReplicator(unittest.TestCase):
expect = 'Error syncing partition'
for job in jobs:
set_default(self)
ring = self.replicator.get_object_ring(job['policy_idx'])
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
ring = job['policy'].object_ring
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
self.replicator.update(job)
self.assertTrue(error in mock_logger.error.call_args[0][0])
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
@ -1116,7 +1132,7 @@ class TestObjectReplicator(unittest.TestCase):
for job in jobs:
set_default(self)
# limit local job to policy 0 for simplicity
if job['partition'] == '0' and job['policy_idx'] == 0:
if job['partition'] == '0' and int(job['policy']) == 0:
local_job = job.copy()
continue
self.replicator.update(job)

View File

@ -68,13 +68,14 @@ class TestObjectController(unittest.TestCase):
self.tmpdir = mkdtemp()
self.testdir = os.path.join(self.tmpdir,
'tmp_test_object_server_ObjectController')
conf = {'devices': self.testdir, 'mount_check': 'false'}
mkdirs(os.path.join(self.testdir, 'sda1'))
self.conf = {'devices': self.testdir, 'mount_check': 'false'}
self.object_controller = object_server.ObjectController(
conf, logger=debug_logger())
self.conf, logger=debug_logger())
self.object_controller.bytes_per_sync = 1
self._orig_tpool_exc = tpool.execute
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
self.df_mgr = diskfile.DiskFileManager(conf,
self.df_mgr = diskfile.DiskFileManager(self.conf,
self.object_controller.logger)
self.logger = debug_logger('test-object-controller')
@ -86,7 +87,7 @@ class TestObjectController(unittest.TestCase):
def _stage_tmp_dir(self, policy):
mkdirs(os.path.join(self.testdir, 'sda1',
diskfile.get_tmp_dir(int(policy))))
diskfile.get_tmp_dir(policy)))
def check_all_api_methods(self, obj_name='o', alt_res=None):
path = '/sda1/p/a/c/%s' % obj_name
@ -419,7 +420,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o')
objfile = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
policy=POLICIES.legacy)
objfile.open()
file_name = os.path.basename(objfile._data_file)
with open(objfile._data_file) as fp:
@ -570,7 +572,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0),
storage_directory(diskfile.get_data_dir(POLICIES[0]),
'p', hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
self.assert_(os.path.isfile(objfile))
@ -605,7 +607,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
self.assert_(os.path.isfile(objfile))
@ -640,7 +642,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
self.assertTrue(os.path.isfile(objfile))
@ -717,7 +719,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
self.assert_(os.path.isfile(objfile))
@ -790,7 +792,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
@ -833,7 +835,7 @@ class TestObjectController(unittest.TestCase):
# original .data file metadata should be unchanged
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
timestamp1 + '.data')
self.assert_(os.path.isfile(objfile))
@ -851,7 +853,7 @@ class TestObjectController(unittest.TestCase):
# .meta file metadata should have only user meta items
metafile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
timestamp2 + '.meta')
self.assert_(os.path.isfile(metafile))
@ -1060,7 +1062,7 @@ class TestObjectController(unittest.TestCase):
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
os.unlink(objfile)
@ -1104,7 +1106,8 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o')
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
policy=POLICIES.legacy)
disk_file.open()
file_name = os.path.basename(disk_file._data_file)
@ -1203,7 +1206,7 @@ class TestObjectController(unittest.TestCase):
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
os.unlink(objfile)
@ -1694,7 +1697,8 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o')
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
policy=POLICIES.legacy)
disk_file.open()
file_name = os.path.basename(disk_file._data_file)
etag = md5()
@ -1726,7 +1730,8 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o')
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
policy=POLICIES.legacy)
disk_file.open()
file_name = os.path.basename(disk_file._data_file)
with open(disk_file._data_file) as fp:
@ -1754,7 +1759,8 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o')
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
policy=POLICIES.legacy)
disk_file.open()
file_name = os.path.basename(disk_file._data_file)
etag = md5()
@ -1812,7 +1818,6 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'DELETE'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 400)
# self.assertRaises(KeyError, self.object_controller.DELETE, req)
# The following should have created a tombstone file
timestamp = normalize_timestamp(1000)
@ -1823,7 +1828,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
ts_1000_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertTrue(os.path.isfile(ts_1000_file))
@ -1839,7 +1844,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
ts_999_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertFalse(os.path.isfile(ts_999_file))
@ -1859,7 +1864,7 @@ class TestObjectController(unittest.TestCase):
# There should now be 1000 ts and a 1001 data file.
data_1002_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
orig_timestamp + '.data')
self.assertTrue(os.path.isfile(data_1002_file))
@ -1875,7 +1880,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
ts_1001_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertFalse(os.path.isfile(ts_1001_file))
@ -1890,7 +1895,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 204)
ts_1003_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertTrue(os.path.isfile(ts_1003_file))
@ -1932,7 +1937,7 @@ class TestObjectController(unittest.TestCase):
orig_timestamp.internal)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertFalse(os.path.isfile(objfile))
@ -1951,7 +1956,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 204)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assert_(os.path.isfile(objfile))
@ -1970,7 +1975,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assert_(os.path.isfile(objfile))
@ -1989,7 +1994,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.ts')
self.assertFalse(os.path.isfile(objfile))
@ -2556,8 +2561,8 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set',
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
policy)
finally:
object_server.http_connect = orig_http_connect
self.assertEquals(
@ -2565,7 +2570,7 @@ class TestObjectController(unittest.TestCase):
['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', {
'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx}])
'X-Backend-Storage-Policy-Index': int(policy)}])
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
storage_policy.StoragePolicy(1, 'one'),
@ -2609,7 +2614,7 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': '12345',
'Content-Type': 'application/burrito',
'Content-Length': '0',
'X-Backend-Storage-Policy-Index': policy.idx,
'X-Backend-Storage-Policy-Index': int(policy),
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5',
'X-Container-Device': 'sdb1',
@ -2645,7 +2650,7 @@ class TestObjectController(unittest.TestCase):
'X-Backend-Storage-Policy-Index': '37',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'object-server %d' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx,
'X-Backend-Storage-Policy-Index': int(policy),
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[1],
@ -2790,7 +2795,7 @@ class TestObjectController(unittest.TestCase):
int(delete_at_timestamp) /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank('/sda1/p/a/c/o', method='PUT', body='', headers={
headers = {
'Content-Type': 'text/plain',
'X-Timestamp': put_timestamp,
'X-Container-Host': '10.0.0.1:6001',
@ -2801,8 +2806,9 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Partition': 'p',
'X-Delete-At-Host': '10.0.0.2:6002',
'X-Delete-At-Device': 'sda1',
'X-Backend-Storage-Policy-Index': int(policy),
})
'X-Backend-Storage-Policy-Index': int(policy)}
req = Request.blank(
'/sda1/p/a/c/o', method='PUT', body='', headers=headers)
with mocked_http_conn(
500, 500, give_connect=capture_updates) as fake_conn:
resp = req.get_response(self.object_controller)
@ -2838,7 +2844,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(headers[key], str(value))
# check async pendings
async_dir = os.path.join(self.testdir, 'sda1',
diskfile.get_async_dir(policy.idx))
diskfile.get_async_dir(policy))
found_files = []
for root, dirs, files in os.walk(async_dir):
for f in files:
@ -2848,7 +2854,7 @@ class TestObjectController(unittest.TestCase):
if data['account'] == 'a':
self.assertEquals(
int(data['headers']
['X-Backend-Storage-Policy-Index']), policy.idx)
['X-Backend-Storage-Policy-Index']), int(policy))
elif data['account'] == '.expiring_objects':
self.assertEquals(
int(data['headers']
@ -2872,12 +2878,12 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set',
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
policy)
finally:
object_server.http_connect = orig_http_connect
utils.HASH_PATH_PREFIX = _prefix
async_dir = diskfile.get_async_dir(policy.idx)
async_dir = diskfile.get_async_dir(policy)
self.assertEquals(
pickle.load(open(os.path.join(
self.testdir, 'sda1', async_dir, 'a83',
@ -2885,7 +2891,7 @@ class TestObjectController(unittest.TestCase):
utils.Timestamp(1).internal))),
{'headers': {'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx},
'X-Backend-Storage-Policy-Index': int(policy)},
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
def test_async_update_saves_on_non_2xx(self):
@ -2916,9 +2922,9 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status),
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
async_dir = diskfile.get_async_dir(policy.idx)
'X-Backend-Storage-Policy-Index': int(policy)}, 'sda1',
policy)
async_dir = diskfile.get_async_dir(policy)
self.assertEquals(
pickle.load(open(os.path.join(
self.testdir, 'sda1', async_dir, 'a83',
@ -2928,7 +2934,7 @@ class TestObjectController(unittest.TestCase):
'user-agent':
'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index':
policy.idx},
int(policy)},
'account': 'a', 'container': 'c', 'obj': 'o',
'op': 'PUT'})
finally:
@ -2992,8 +2998,8 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1',
policy.idx)
async_dir = diskfile.get_async_dir(int(policy))
policy)
async_dir = diskfile.get_async_dir(policy)
self.assertTrue(
os.path.exists(os.path.join(
self.testdir, 'sda1', async_dir, 'a83',
@ -3744,7 +3750,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.body, 'TEST')
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(test_timestamp).internal + '.data')
self.assert_(os.path.isfile(objfile))
@ -3969,10 +3975,10 @@ class TestObjectController(unittest.TestCase):
def my_tpool_execute(func, *args, **kwargs):
return func(*args, **kwargs)
was_get_hashes = diskfile.get_hashes
was_get_hashes = diskfile.DiskFileManager._get_hashes
was_tpool_exe = tpool.execute
try:
diskfile.get_hashes = fake_get_hashes
diskfile.DiskFileManager._get_hashes = fake_get_hashes
tpool.execute = my_tpool_execute
req = Request.blank('/sda1/p/suff',
environ={'REQUEST_METHOD': 'REPLICATE'},
@ -3983,7 +3989,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(p_data, {1: 2})
finally:
tpool.execute = was_tpool_exe
diskfile.get_hashes = was_get_hashes
diskfile.DiskFileManager._get_hashes = was_get_hashes
def test_REPLICATE_timeout(self):
@ -3993,10 +3999,10 @@ class TestObjectController(unittest.TestCase):
def my_tpool_execute(func, *args, **kwargs):
return func(*args, **kwargs)
was_get_hashes = diskfile.get_hashes
was_get_hashes = diskfile.DiskFileManager._get_hashes
was_tpool_exe = tpool.execute
try:
diskfile.get_hashes = fake_get_hashes
diskfile.DiskFileManager._get_hashes = fake_get_hashes
tpool.execute = my_tpool_execute
req = Request.blank('/sda1/p/suff',
environ={'REQUEST_METHOD': 'REPLICATE'},
@ -4004,7 +4010,7 @@ class TestObjectController(unittest.TestCase):
self.assertRaises(Timeout, self.object_controller.REPLICATE, req)
finally:
tpool.execute = was_tpool_exe
diskfile.get_hashes = was_get_hashes
diskfile.DiskFileManager._get_hashes = was_get_hashes
def test_REPLICATE_insufficient_storage(self):
conf = {'devices': self.testdir, 'mount_check': 'true'}
@ -4429,6 +4435,7 @@ class TestObjectController(unittest.TestCase):
self.assertTrue(os.path.isdir(object_dir))
@patch_policies
class TestObjectServer(unittest.TestCase):
def setUp(self):

View File

@ -25,15 +25,16 @@ import eventlet
import mock
from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES
from swift.obj import ssync_sender, diskfile
from test.unit import DebugLogger, patch_policies
from test.unit import debug_logger, patch_policies
class FakeReplicator(object):
def __init__(self, testdir):
self.logger = mock.MagicMock()
self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
@ -43,7 +44,7 @@ class FakeReplicator(object):
'devices': testdir,
'mount_check': 'false',
}
self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger())
self._diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
class NullBufferedHTTPConnection(object):
@ -90,24 +91,27 @@ class FakeConnection(object):
self.closed = True
@patch_policies()
class TestSender(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.testdir, 'dev'))
self.replicator = FakeReplicator(self.testdir)
self.sender = ssync_sender.Sender(self.replicator, None, None, None)
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=1)
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
extra_metadata=None, policy_idx=0):
extra_metadata=None, policy=None):
policy = policy or POLICIES.legacy
object_parts = account, container, obj
req_timestamp = utils.normalize_timestamp(time.time())
df = self.sender.daemon._diskfile_mgr.get_diskfile(
device, partition, *object_parts, policy_idx=policy_idx)
device, partition, *object_parts, policy=policy)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
@ -134,16 +138,16 @@ class TestSender(unittest.TestCase):
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '1 second: test connect')
error_lines = self.replicator.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
error_lines[0])
def test_call_catches_ReplicationException(self):
@ -153,45 +157,44 @@ class TestSender(unittest.TestCase):
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), 'test connect')
error_lines = self.replicator.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
error_lines[0])
def test_call_catches_other_exceptions(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.exception.mock_calls[0]
self.assertEqual(
call[1],
('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678,
'sda1', '9'))
error_lines = self.replicator.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
def test_call_catches_exception_handling_exception(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = None # Will cause inside exception handler to fail
job = node = None # Will cause inside exception handler to fail
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
self.replicator.logger.exception.assert_called_once_with(
'EXCEPTION in replication.Sender')
error_lines = self.replicator.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'EXCEPTION in replication.Sender'))
def test_call_calls_others(self):
self.sender.suffixes = ['abc']
@ -222,11 +225,10 @@ class TestSender(unittest.TestCase):
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
@patch_policies
def test_connect(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy_idx=1)
device='sda1', index=0)
job = dict(partition='9', policy=POLICIES[1])
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
@ -256,9 +258,9 @@ class TestSender(unittest.TestCase):
expected_calls))
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -269,7 +271,11 @@ class TestSender(unittest.TestCase):
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
@ -286,9 +292,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -297,7 +303,11 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
job = {'device': 'dev', 'partition': '9'}
job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
@ -315,9 +325,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -326,7 +336,11 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
job = {'device': 'dev', 'partition': '9'}
job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
@ -347,7 +361,7 @@ class TestSender(unittest.TestCase):
self.replicator.conn_timeout = 0.01
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -360,16 +374,16 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send')
error_lines = self.replicator.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
def test_connect_receive_timeout(self):
self.replicator.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
device='sda1', index=0)
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -384,16 +398,16 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive')
error_lines = self.replicator.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
def test_connect_bad_status(self):
self.replicator.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
job = dict(partition='9', policy=POLICIES.legacy)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -409,10 +423,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503')
error_lines = self.replicator.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
def test_readline_newline_in_buffer(self):
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
@ -473,16 +487,21 @@ class TestSender(unittest.TestCase):
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device != 'dev' or partition != '9' or policy_idx != 0 or
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device != 'dev' or partition != '9' or
policy != POLICIES.legacy or
suffixes != ['abc', 'def']):
yield # Just here to make this a generator
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
@ -498,8 +517,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.available_set, set())
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc', 'def']):
yield (
'/srv/node/dev/objects/9/abc/'
@ -519,10 +539,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
@ -544,8 +568,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.available_set, set(candidates))
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@ -555,10 +580,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='\r\n')
@ -577,8 +606,9 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@ -588,10 +618,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(
@ -611,8 +645,9 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@ -622,10 +657,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
@ -644,8 +683,9 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_send_list(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@ -655,10 +695,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
policy, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.job = {
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
@ -742,7 +786,11 @@ class TestSender(unittest.TestCase):
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
self.sender.job = {'device': device, 'partition': part}
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.legacy,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@ -771,7 +819,11 @@ class TestSender(unittest.TestCase):
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
self.sender.job = {'device': device, 'partition': part}
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.legacy,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.response = FakeResponse(
@ -797,7 +849,11 @@ class TestSender(unittest.TestCase):
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {'device': device, 'partition': part}
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES.legacy,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@ -821,18 +877,20 @@ class TestSender(unittest.TestCase):
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@patch_policies
def test_updates_storage_policy_index(self):
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
df = self._make_open_diskfile(device, part, *object_parts,
policy_idx=1)
policy=POLICIES[0])
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {'device': device, 'partition': part,
'policy_idx': 1}
self.sender.job = {
'device': device,
'partition': part,
'policy': POLICIES[0],
'frag_index': 0}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@ -847,7 +905,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(path, '/a/c/o')
self.assert_(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
self.assertEqual(os.path.join(self.testdir, 'dev/objects-1/9/',
self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/',
object_hash[-3:], object_hash),
df._datadir)

View File

@ -70,7 +70,7 @@ class TestObjectUpdater(unittest.TestCase):
self.sda1 = os.path.join(self.devices_dir, 'sda1')
os.mkdir(self.sda1)
for policy in POLICIES:
os.mkdir(os.path.join(self.sda1, get_tmp_dir(int(policy))))
os.mkdir(os.path.join(self.sda1, get_tmp_dir(policy)))
self.logger = debug_logger()
def tearDown(self):
@ -169,8 +169,8 @@ class TestObjectUpdater(unittest.TestCase):
seen = set()
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device, idx):
seen.add((update_path, idx))
def process_object_update(self, update_path, device, policy):
seen.add((update_path, int(policy)))
os.unlink(update_path)
cu = MockObjectUpdater({
@ -216,7 +216,7 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '1',
'node_timeout': '15'})
cu.run_once()
async_dir = os.path.join(self.sda1, get_async_dir(0))
async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(async_dir)
cu.run_once()
self.assert_(os.path.exists(async_dir))
@ -253,7 +253,7 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '1',
'node_timeout': '15'}, logger=self.logger)
cu.run_once()
async_dir = os.path.join(self.sda1, get_async_dir(0))
async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(async_dir)
cu.run_once()
self.assert_(os.path.exists(async_dir))
@ -393,7 +393,7 @@ class TestObjectUpdater(unittest.TestCase):
'mount_check': 'false',
'swift_dir': self.testdir,
}
async_dir = os.path.join(self.sda1, get_async_dir(policy.idx))
async_dir = os.path.join(self.sda1, get_async_dir(policy))
os.mkdir(async_dir)
account, container, obj = 'a', 'c', 'o'
@ -412,7 +412,7 @@ class TestObjectUpdater(unittest.TestCase):
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
dfmanager.pickle_async_update(self.sda1, account, container, obj,
data, ts.next(), policy.idx)
data, ts.next(), policy)
request_log = []
@ -428,7 +428,7 @@ class TestObjectUpdater(unittest.TestCase):
ip, part, method, path, headers, qs, ssl = request_args
self.assertEqual(method, op)
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
str(policy.idx))
str(int(policy)))
self.assertEqual(daemon.logger.get_increment_counts(),
{'successes': 1, 'unlinks': 1,
'async_pendings': 1})
@ -444,7 +444,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
async_dir = os.path.join(self.sda1, get_async_dir(policy.idx))
async_dir = os.path.join(self.sda1, get_async_dir(policy))
os.mkdir(async_dir)
# write an async
@ -456,12 +456,12 @@ class TestObjectUpdater(unittest.TestCase):
'x-content-type': 'text/plain',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-timestamp': ts.next(),
'X-Backend-Storage-Policy-Index': policy.idx,
'X-Backend-Storage-Policy-Index': int(policy),
})
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
dfmanager.pickle_async_update(self.sda1, account, container, obj,
data, ts.next(), policy.idx)
data, ts.next(), policy)
request_log = []
@ -481,7 +481,7 @@ class TestObjectUpdater(unittest.TestCase):
ip, part, method, path, headers, qs, ssl = request_args
self.assertEqual(method, 'PUT')
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
str(policy.idx))
str(int(policy)))
self.assertEqual(daemon.logger.get_increment_counts(),
{'successes': 1, 'unlinks': 1, 'async_pendings': 1})

View File

@ -34,6 +34,21 @@ class TestProxyServer(test_server.TestProxyServer):
class TestObjectController(test_server.TestObjectController):
def test_PUT_no_etag_fallocate(self):
# mem server doesn't call fallocate(), believe it or not
pass
# these tests all go looking in the filesystem
def test_policy_IO(self):
pass
def test_PUT_ec(self):
pass
def test_PUT_ec_multiple_segments(self):
pass
def test_PUT_ec_fragment_archive_etag_mismatch(self):
pass

View File

@ -87,10 +87,9 @@ def do_setup(the_object_server):
os.path.join(mkdtemp(), 'tmp_test_proxy_server_chunked')
mkdirs(_testdir)
rmtree(_testdir)
mkdirs(os.path.join(_testdir, 'sda1'))
mkdirs(os.path.join(_testdir, 'sda1', 'tmp'))
mkdirs(os.path.join(_testdir, 'sdb1'))
mkdirs(os.path.join(_testdir, 'sdb1', 'tmp'))
for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1',
'sdf1', 'sdg1', 'sdh1', 'sdi1'):
mkdirs(os.path.join(_testdir, drive, 'tmp'))
conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers':
'content-encoding, x-object-manifest, content-disposition, foo',
@ -1014,20 +1013,14 @@ class TestObjectController(unittest.TestCase):
@unpatch_policies
def test_policy_IO(self):
if hasattr(_test_servers[-1], '_filesystem'):
# ironically, the _filesystem attribute on the object server means
# the in-memory diskfile is in use, so this test does not apply
return
def check_file(policy_idx, cont, devs, check_val):
partition, nodes = prosrv.get_object_ring(policy_idx).get_nodes(
'a', cont, 'o')
def check_file(policy, cont, devs, check_val):
partition, nodes = policy.object_ring.get_nodes('a', cont, 'o')
conf = {'devices': _testdir, 'mount_check': 'false'}
df_mgr = diskfile.DiskFileManager(conf, FakeLogger())
for dev in devs:
file = df_mgr.get_diskfile(dev, partition, 'a',
cont, 'o',
policy_idx=policy_idx)
policy=policy)
if check_val is True:
file.open()
@ -1058,8 +1051,8 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body, obj)
check_file(0, 'c', ['sda1', 'sdb1'], True)
check_file(0, 'c', ['sdc1', 'sdd1', 'sde1', 'sdf1'], False)
check_file(POLICIES[0], 'c', ['sda1', 'sdb1'], True)
check_file(POLICIES[0], 'c', ['sdc1', 'sdd1', 'sde1', 'sdf1'], False)
# check policy 1: put file on c1, read it back, check loc on disk
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@ -1084,8 +1077,8 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body, obj)
check_file(1, 'c1', ['sdc1', 'sdd1'], True)
check_file(1, 'c1', ['sda1', 'sdb1', 'sde1', 'sdf1'], False)
check_file(POLICIES[1], 'c1', ['sdc1', 'sdd1'], True)
check_file(POLICIES[1], 'c1', ['sda1', 'sdb1', 'sde1', 'sdf1'], False)
# check policy 2: put file on c2, read it back, check loc on disk
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@ -1110,8 +1103,8 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body, obj)
check_file(2, 'c2', ['sde1', 'sdf1'], True)
check_file(2, 'c2', ['sda1', 'sdb1', 'sdc1', 'sdd1'], False)
check_file(POLICIES[2], 'c2', ['sde1', 'sdf1'], True)
check_file(POLICIES[2], 'c2', ['sda1', 'sdb1', 'sdc1', 'sdd1'], False)
@unpatch_policies
def test_policy_IO_override(self):
@ -1146,7 +1139,7 @@ class TestObjectController(unittest.TestCase):
conf = {'devices': _testdir, 'mount_check': 'false'}
df_mgr = diskfile.DiskFileManager(conf, FakeLogger())
df = df_mgr.get_diskfile(node['device'], partition, 'a',
'c1', 'wrong-o', policy_idx=2)
'c1', 'wrong-o', policy=POLICIES[2])
with df.open():
contents = ''.join(df.reader())
self.assertEqual(contents, "hello")
@ -1178,7 +1171,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(res.status_int, 204)
df = df_mgr.get_diskfile(node['device'], partition, 'a',
'c1', 'wrong-o', policy_idx=2)
'c1', 'wrong-o', policy=POLICIES[2])
try:
df.open()
except DiskFileNotExist as e:

View File

@ -135,7 +135,7 @@ class TestObjectSysmeta(unittest.TestCase):
self.tmpdir = mkdtemp()
self.testdir = os.path.join(self.tmpdir,
'tmp_test_object_server_ObjectController')
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
mkdirs(os.path.join(self.testdir, 'sda', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.obj_ctlr = object_server.ObjectController(
conf, logger=debug_logger('obj-ut'))