Add support to increase object ring partition power
This patch adds methods to increase the partition power of an existing object
ring without downtime for its users, using a 3-step process. Data is not moved
to other nodes; objects using the new, increased partition power are located
on the same device and are hard-linked to avoid data movement.

1. A new setting "next_part_power" is added to the rings; once the proxy
   servers have reloaded the rings, they send this value to the object servers
   on any write operation. Object servers then create a hard link in the new
   location to the original DiskFile object. Already existing data is relinked
   into the new locations (again using hard links) by a new relinker tool.
2. The partition power itself is increased. Servers now use the new partition
   power for both reads and writes. Hard links in the old object locations
   that are no longer required are removed by the relinker tool; it reads the
   next_part_power setting to find object locations that need to be cleaned
   up.
3. The "next_part_power" flag is removed.

This mostly implements the spec in [1]; however, it does not use an "epoch" as
described there. The idea of the epoch was to store data written with
different partition powers in separate namespaces, avoiding conflicts with
auditors and replicators, and making it possible to abort the operation by
simply removing the new tree. That would require a substantial change to the
on-disk data layout, and other object-server implementations would have to
adopt the scheme too. Instead, the object replicator is now aware that a
partition power increase is in progress and skips replication of data in that
storage policy; the relinker tool is simply run, and afterwards the partition
power is increased. This should not take much time (it only walks the
filesystem and creates hard links), so the impact should be low. The relinker
should be run on all storage nodes at the same time, in parallel, to reduce
the required time (though this is not mandatory). Failures during relinking do
not affect cluster operations; relinking can even be aborted manually and
restarted later.

Auditors do not quarantine objects written to a path with a different
partition power and therefore work as before (though in the worst case they
read each object twice before the no-longer-needed hard links are removed).

Co-Authored-By: Alistair Coles <alistair.coles@hpe.com>
Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>

[1] https://specs.openstack.org/openstack/swift-specs/specs/in_progress/increasing_partition_power.html

Change-Id: I7d6371a04f5c1c4adbb8733a71f3c177ee5448bb
parent 45b17d89c7
commit e1140666d6
bin/swift-object-relinker (new executable file, 39 lines)
@@ -0,0 +1,39 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import sys

from swift.cli.relinker import main


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Relink and cleanup objects to increase partition power')
    parser.add_argument('action', choices=['relink', 'cleanup'])
    parser.add_argument('--swift-dir', default='/etc/swift',
                        dest='swift_dir', help='Path to swift directory')
    parser.add_argument('--devices', default='/srv/node',
                        dest='devices', help='Path to swift device directory')
    parser.add_argument('--skip-mount-check', default=False,
                        action="store_true", dest='skip_mount_check')
    parser.add_argument('--logfile', default=None,
                        dest='logfile')
    parser.add_argument('--debug', default=False, action='store_true')

    args = parser.parse_args()

    sys.exit(main(args))
@@ -62,6 +62,7 @@ Overview and Concepts
     overview_encryption
     overview_backing_store
     ring_background
+    ring_partpower
     associated_projects
 
 Developer Documentation
doc/source/ring_partpower.rst (new file, 184 lines)
@@ -0,0 +1,184 @@
==============================
Modifying Ring Partition Power
==============================

The ring partition power determines the on-disk location of data files and is
selected when creating a new ring. In normal operation, it is a fixed value.
This is because a different partition power results in a different on-disk
location for all data files.

However, increasing the partition power by 1 can be done by choosing locations
that are on the same disk. As a result, we can create hard-links for both the
new and old locations, avoiding data movement without impacting availability.
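
As a minimal illustration of why hard links avoid data movement (all paths
here are made up for the example), both names end up pointing at the same
inode, so no object data is copied::

    import os
    import tempfile

    d = tempfile.mkdtemp()
    old = os.path.join(d, 'objects', '16003', 'obj.data')
    new = os.path.join(d, 'objects', '32007', 'obj.data')

    os.makedirs(os.path.dirname(old))
    with open(old, 'w') as f:
        f.write('payload')

    os.makedirs(os.path.dirname(new))
    os.link(old, new)  # adds a directory entry; the data is not copied
    assert os.stat(old).st_ino == os.stat(new).st_ino  # same inode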

To enable a partition power change without interrupting user access, object
servers need to be aware of it in advance. Therefore a partition power change
needs to be done in multiple steps.

.. note::

    Do not increase the partition power on account and container rings.
    Increasing the partition power is *only* supported for object rings.
    Trying to increase the part_power for account and container rings *will*
    result in unavailability, maybe even data loss.


-------
Caveats
-------

There are a few caveats to consider before increasing the partition power:

* All hashes.pkl files will become invalid once hard links are created, and
  the replicators will need significantly more time on the first run after
  finishing the partition power increase.
* Object replicators will skip partitions during the partition power increase.
  Replicators are not aware of hard links and would simply copy the content;
  this would result in heavy data movement, and in the worst case all data
  would be stored twice.
* Because each object will be hard-linked from two locations, many more inodes
  will be used - expect around twice the amount. Check the free inode count
  *before* increasing the partition power; see the sketch after this list.
* Object auditors might read each object twice before cleanup removes the
  second hard link.
* The additional inodes need more memory to cache them, and your object
  servers should have plenty of available memory to avoid running out of
  inode cache. Setting ``vfs_cache_pressure`` to 1 might help with that.
* All nodes in the cluster *must* run at least Swift version 2.13.0 or later.
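
A quick way to check the inode headroom is ``df -i``; the following sketch
does the same check from Python (the device path is an assumption - adjust it
to your layout)::

    import os

    st = os.statvfs('/srv/node/sda1')
    used = st.f_files - st.f_ffree
    print('inodes used: %d, free: %d' % (used, st.f_ffree))
    # Relinking roughly doubles the used inodes, so the free count should
    # comfortably exceed the current usage before proceeding.
    assert st.f_ffree > used, 'not enough free inodes to relink safely'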

Due to these caveats you should only increase the partition power if really
needed, i.e. if the number of partitions per disk is extremely low and the
data is distributed unevenly across disks.

-----------------------------------
1. Prepare partition power increase
-----------------------------------

The swift-ring-builder is used to prepare the ring for an upcoming partition
power increase. It will store a new variable ``next_part_power`` with the
current partition power + 1. Object servers recognize this, and hard links to
the new location will be created (or deleted) on every PUT or DELETE. This
will make it possible to access newly written objects using the future
partition power::

    swift-ring-builder <builder-file> prepare_increase_partition_power
    swift-ring-builder <builder-file> write_ring
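
Conceptually, what an object server does with each write during this phase is
sketched below (a simplified illustration with a hypothetical helper; the
actual logic lives in the DiskFile code added by this patch, using
``relink_paths()``)::

    import os

    from swift.common.utils import replace_partition_in_path

    def link_to_future_location(target_path, next_part_power):
        # Hypothetical helper: mirror a freshly written object into the
        # location it will have once the partition power is increased.
        new_path = replace_partition_in_path(target_path, next_part_power)
        if new_path != target_path:
            new_dir = os.path.dirname(new_path)
            if not os.path.isdir(new_dir):
                os.makedirs(new_dir)
            os.link(target_path, new_path)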

Now you need to copy the updated .ring.gz to all nodes. Already existing data
needs to be relinked too; therefore an operator has to run a relinker command
on all object servers in this phase::

    swift-object-relinker relink

.. note::

    Start relinking after *all* the servers have re-read the modified ring
    files, which normally happens within 15 seconds after writing a modified
    ring. Also, make sure the modified rings are pushed to all nodes running
    object services (replicators, reconstructors and reconcilers) - they have
    to skip partitions during relinking.

Relinking might take some time; while no data is copied or actually moved, the
tool still needs to walk the whole file system and create new hard links as
required.

---------------------------
2. Increase partition power
---------------------------

Now that all existing data can be found using the new location, it's time to
actually increase the partition power itself::

    swift-ring-builder <builder-file> increase_partition_power
    swift-ring-builder <builder-file> write_ring

Now you need to copy the updated .ring.gz again to all nodes. Object servers
are now using the new, increased partition power and no longer create
additional hard links.

.. note::

    The object servers will create additional hard links for each modified or
    new object, and this requires more inodes.

.. note::

    If you decide you don't want to increase the partition power, you should
    instead cancel the increase. It is not possible to revert this operation
    once started. To abort the partition power increase, execute the following
    commands, copy the updated .ring.gz files to all nodes and continue with
    `3. Cleanup`_ afterwards::

        swift-ring-builder <builder-file> cancel_increase_partition_power
        swift-ring-builder <builder-file> write_ring

----------
3. Cleanup
----------

Existing hard links in the old locations need to be removed, and a cleanup
tool is provided to do this. Run the following command on each storage node::

    swift-object-relinker cleanup

.. note::

    The cleanup must be finished within your object servers' reclaim_age
    period (by default 1 week). Otherwise objects that have been overwritten
    between step #1 and step #2 and deleted afterwards can't be cleaned up
    anymore.

Afterwards it is required to update the rings one last time to inform servers
that all steps to increase the partition power are done, and replicators
should resume their job::

    swift-ring-builder <builder-file> finish_increase_partition_power
    swift-ring-builder <builder-file> write_ring

Now you need to copy the updated .ring.gz again to all nodes.

----------
Background
----------

An existing object that is currently located on partition X will be placed
either on partition 2*X or 2*X+1 after the partition power is increased. The
reason for this is the Ring.get_part() method, which does a bitwise shift to
the right.
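
A short sketch of that arithmetic (the object name and partition powers are
illustrative only): the partition is the top ``part_power`` bits of the first
four bytes of the hash, so raising the power by one keeps one extra bit, which
is exactly the doubling described above::

    import struct
    from hashlib import md5

    digest = md5(b'/account/container/object').digest()
    prefix = struct.unpack_from('>I', digest)[0]

    part_old = prefix >> (32 - 14)  # partition power 14
    part_new = prefix >> (32 - 15)  # partition power 15

    # One more hash bit is kept, so the new partition is 2*X or 2*X+1.
    assert part_new in (2 * part_old, 2 * part_old + 1)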

To avoid actual data movement to different disks or even nodes, the allocation
of partitions to nodes needs to be changed. The allocation is pairwise due to
the above mentioned new partition scheme. Therefore devices are allocated like
this, with the partition being the index and the value being the device id::

    old        new
    part dev   part dev
    ---- ---   ---- ---
      0   0      0   0
                 1   0
      1   3      2   3
                 3   3
      2   7      4   7
                 5   7
      3   5      6   5
                 7   5
      4   2      8   2
                 9   2
      5   1     10   1
                11   1

There is a helper method to compute the new path, and the following example
shows the mapping between old and new location::

    >>> from swift.common.utils import replace_partition_in_path
    >>> old = 'objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'
    >>> replace_partition_in_path(old, 14)
    'objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'
    >>> replace_partition_in_path(old, 15)
    'objects/32007/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'

Using the original partition power (14) it returns the same path; after an
increase to 15 it returns the new path, and the new partition is 2*X+1 in this
case.

@@ -51,6 +51,7 @@ scripts =
     bin/swift-object-info
     bin/swift-object-replicator
     bin/swift-object-reconstructor
+    bin/swift-object-relinker
     bin/swift-object-server
     bin/swift-object-updater
     bin/swift-oldies
swift/cli/relinker.py (new file, 159 lines)
@@ -0,0 +1,159 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
import os

from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
    DiskFileQuarantined
from swift.common.utils import replace_partition_in_path, \
    audit_location_generator, get_logger
from swift.obj import diskfile


def relink(swift_dir='/etc/swift',
           devices='/srv/node',
           skip_mount_check=False,
           logger=logging.getLogger()):
    mount_check = not skip_mount_check
    run = False
    relinked = errors = 0
    for policy in POLICIES:
        policy.object_ring = None  # Ensure it will be reloaded
        policy.load_ring(swift_dir)
        part_power = policy.object_ring.part_power
        next_part_power = policy.object_ring.next_part_power
        if not next_part_power or next_part_power == part_power:
            continue
        logging.info('Relinking files for policy %s under %s',
                     policy.name, devices)
        run = True
        locations = audit_location_generator(
            devices,
            diskfile.get_data_dir(policy),
            mount_check=mount_check)
        for fname, _, _ in locations:
            newfname = replace_partition_in_path(fname, next_part_power)
            try:
                diskfile.relink_paths(fname, newfname, check_existing=True)
                relinked += 1
            except OSError as exc:
                errors += 1
                logger.warning("Relinking %s to %s failed: %s",
                               fname, newfname, exc)

    if not run:
        logger.warning("No policy found to increase the partition power.")
        return 2
    logging.info('Relinked %d diskfiles (%d errors)', relinked, errors)
    if errors > 0:
        return 1
    return 0


def cleanup(swift_dir='/etc/swift',
            devices='/srv/node',
            skip_mount_check=False,
            logger=logging.getLogger()):
    mount_check = not skip_mount_check
    conf = {'devices': devices, 'mount_check': mount_check}
    diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
    errors = cleaned_up = 0
    run = False
    for policy in POLICIES:
        policy.object_ring = None  # Ensure it will be reloaded
        policy.load_ring(swift_dir)
        part_power = policy.object_ring.part_power
        next_part_power = policy.object_ring.next_part_power
        if not next_part_power or next_part_power != part_power:
            continue
        logging.info('Cleaning up files for policy %s under %s',
                     policy.name, devices)
        run = True
        locations = audit_location_generator(
            devices,
            diskfile.get_data_dir(policy),
            mount_check=mount_check)
        for fname, device, partition in locations:
            expected_fname = replace_partition_in_path(fname, part_power)
            if fname == expected_fname:
                continue
            # Make sure there is a valid object file in the expected new
            # location. Note that this could be newer than the original one
            # (which happens if there is another PUT after partition power
            # has been increased, but cleanup did not yet run)
            loc = diskfile.AuditLocation(
                os.path.dirname(expected_fname), device, partition, policy)
            diskfile_mgr = diskfile_router[policy]
            df = diskfile_mgr.get_diskfile_from_audit_location(loc)
            try:
                with df.open():
                    pass
            except DiskFileQuarantined as exc:
                logger.warning('ERROR Object %(obj)s failed audit and was'
                               ' quarantined: %(err)r',
                               {'obj': loc, 'err': exc})
                errors += 1
                continue
            except DiskFileDeleted:
                pass
            except DiskFileNotExist as exc:
                err = False
                if policy.policy_type == 'erasure_coding':
                    # Might be a non-durable fragment - check that there is
                    # a fragment in the new path. Will be fixed by the
                    # reconstructor then
                    if not os.path.isfile(expected_fname):
                        err = True
                else:
                    err = True
                if err:
                    logger.warning(
                        'Error cleaning up %s: %r', fname, exc)
                    errors += 1
                    continue
            try:
                os.remove(fname)
                cleaned_up += 1
                logging.debug("Removed %s", fname)
            except OSError as exc:
                logger.warning('Error cleaning up %s: %r', fname, exc)
                errors += 1

    if not run:
        logger.warning("No policy found to increase the partition power.")
        return 2
    logging.info('Cleaned up %d diskfiles (%d errors)', cleaned_up, errors)
    if errors > 0:
        return 1
    return 0


def main(args):
    logging.basicConfig(
        format='%(message)s',
        level=logging.DEBUG if args.debug else logging.INFO,
        filename=args.logfile)

    logger = logging.getLogger()

    if args.action == 'relink':
        return relink(
            args.swift_dir, args.devices, args.skip_mount_check, logger)

    if args.action == 'cleanup':
        return cleanup(
            args.swift_dir, args.devices, args.skip_mount_check, logger)
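
For reference, a hedged example of driving these functions directly from
Python (the argument values here are assumptions; the supported entry point is
the swift-object-relinker command shown in the documentation above)::

    import argparse

    from swift.cli.relinker import main

    args = argparse.Namespace(
        action='relink', swift_dir='/etc/swift', devices='/srv/node',
        skip_mount_check=False, logfile=None, debug=False)
    exit_code = main(args)  # 0 on success, 1 on errors, 2 if nothing to do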

@@ -497,12 +497,14 @@ swift-ring-builder <builder_file>
         print('The overload factor is %0.2f%% (%.6f)' % (
             builder.overload * 100, builder.overload))
 
+        ring_dict = None
+        builder_dict = builder.get_ring().to_dict()
+
         # compare ring file against builder file
         if not exists(ring_file):
             print('Ring file %s not found, '
                   'probably it hasn\'t been written yet' % ring_file)
         else:
-            builder_dict = builder.get_ring().to_dict()
             try:
                 ring_dict = RingData.load(ring_file).to_dict()
             except Exception as exc:
@@ -520,6 +522,24 @@ swift-ring-builder <builder_file>
         for dev in builder._iter_devs():
             flags = 'DEL' if dev in builder._remove_devs else ''
             print_dev_f(dev, balance_per_dev[dev['id']], flags)
 
+        # Print some helpful info if partition power increase in progress
+        if (builder.next_part_power and
+                builder.next_part_power == (builder.part_power + 1)):
+            print('\nPreparing increase of partition power (%d -> %d)' % (
+                builder.part_power, builder.next_part_power))
+            print('Run "swift-object-relinker relink" on all nodes before '
+                  'moving on to increase_partition_power.')
+        if (builder.next_part_power and
+                builder.part_power == builder.next_part_power):
+            print('\nIncreased partition power (%d -> %d)' % (
+                builder.part_power, builder.next_part_power))
+            if builder_dict != ring_dict:
+                print('First run "swift-ring-builder <builderfile> write_ring"'
+                      ' now and copy the updated .ring.gz file to all nodes.')
+            print('Run "swift-object-relinker cleanup" on all nodes before '
+                  'moving on to finish_increase_partition_power.')
+
         exit(EXIT_SUCCESS)
 
     @staticmethod
@@ -650,6 +670,11 @@ swift-ring-builder <builder_file> add
             print(Commands.add.__doc__.strip())
             exit(EXIT_ERROR)
 
+        if builder.next_part_power:
+            print('Partition power increase in progress. You need ')
+            print('to finish the increase first before adding devices.')
+            exit(EXIT_WARNING)
+
         try:
             for new_dev in _parse_add_values(argv[3:]):
                 for dev in builder.devs:
@@ -795,6 +820,11 @@ swift-ring-builder <builder_file> remove
             print(parse_search_value.__doc__.strip())
             exit(EXIT_ERROR)
 
+        if builder.next_part_power:
+            print('Partition power increase in progress. You need ')
+            print('to finish the increase first before removing devices.')
+            exit(EXIT_WARNING)
+
         devs, opts = _parse_remove_values(argv[3:])
 
         input_question = 'Are you sure you want to remove these ' \
@@ -857,6 +887,11 @@ swift-ring-builder <builder_file> rebalance [options]
         handler.setFormatter(formatter)
         logger.addHandler(handler)
 
+        if builder.next_part_power:
+            print('Partition power increase in progress.')
+            print('You need to finish the increase first before rebalancing.')
+            exit(EXIT_WARNING)
+
         devs_changed = builder.devs_changed
         min_part_seconds_left = builder.min_part_seconds_left
         try:
@@ -1221,6 +1256,159 @@ swift-ring-builder <builder_file> set_overload <overload>[%]
         builder.save(builder_file)
         exit(status)
 
+    @staticmethod
+    def prepare_increase_partition_power():
+        """
+swift-ring-builder <builder_file> prepare_increase_partition_power
+    Prepare the ring to increase the partition power by one.
+
+    A write_ring command is needed to make the change take effect.
+
+    Once the updated rings have been deployed to all servers you need to run
+    the swift-object-relinker tool to relink existing data.
+
+    *****************************
+    USE THIS WITH EXTREME CAUTION
+    *****************************
+
+    If you increase the partition power and deploy changed rings, you may
+    introduce unavailability in your cluster. This has an end-user impact. Make
+    sure you execute required operations to increase the partition power
+    accurately.
+
+        """
+        if len(argv) < 3:
+            print(Commands.prepare_increase_partition_power.__doc__.strip())
+            exit(EXIT_ERROR)
+
+        if "object" not in basename(builder_file):
+            print(
+                'Partition power increase is only supported for object rings.')
+            exit(EXIT_ERROR)
+
+        if not builder.prepare_increase_partition_power():
+            print('Ring is already prepared for partition power increase.')
+            exit(EXIT_ERROR)
+
+        builder.save(builder_file)
+
+        print('The next partition power is now %d.' % builder.next_part_power)
+        print('The change will take effect after the next write_ring.')
+        print('Ensure your proxy-servers, object-replicators and ')
+        print('reconstructors are using the changed rings and relink ')
+        print('(using swift-object-relinker) your existing data')
+        print('before the partition power increase')
+        exit(EXIT_SUCCESS)
+
+    @staticmethod
+    def increase_partition_power():
+        """
+swift-ring-builder <builder_file> increase_partition_power
+    Increases the partition power by one. Needs to be run after
+    prepare_increase_partition_power has been run and all existing data has
+    been relinked using the swift-object-relinker tool.
+
+    A write_ring command is needed to make the change take effect.
+
+    Once the updated rings have been deployed to all servers you need to run
+    the swift-object-relinker tool to cleanup old data.
+
+    *****************************
+    USE THIS WITH EXTREME CAUTION
+    *****************************
+
+    If you increase the partition power and deploy changed rings, you may
+    introduce unavailability in your cluster. This has an end-user impact. Make
+    sure you execute required operations to increase the partition power
+    accurately.
+
+        """
+        if len(argv) < 3:
+            print(Commands.increase_partition_power.__doc__.strip())
+            exit(EXIT_ERROR)
+
+        if builder.increase_partition_power():
+            print('The partition power is now %d.' % builder.part_power)
+            print('The change will take effect after the next write_ring.')
+
+            builder._update_last_part_moves()
+            builder.save(builder_file)
+
+            exit(EXIT_SUCCESS)
+        else:
+            print('Ring partition power cannot be increased. Either the ring')
+            print('was not prepared yet, or this operation has already run.')
+            exit(EXIT_ERROR)
+
+    @staticmethod
+    def cancel_increase_partition_power():
+        """
+swift-ring-builder <builder_file> cancel_increase_partition_power
+    Cancel the increase of the partition power.
+
+    A write_ring command is needed to make the change take effect.
+
+    Once the updated rings have been deployed to all servers you need to run
+    the swift-object-relinker tool to cleanup unneeded links.
+
+    *****************************
+    USE THIS WITH EXTREME CAUTION
+    *****************************
+
+    If you increase the partition power and deploy changed rings, you may
+    introduce unavailability in your cluster. This has an end-user impact. Make
+    sure you execute required operations to increase the partition power
+    accurately.
+
+        """
+        if len(argv) < 3:
+            print(Commands.cancel_increase_partition_power.__doc__.strip())
+            exit(EXIT_ERROR)
+
+        if not builder.cancel_increase_partition_power():
+            print('Ring partition power increase cannot be canceled.')
+            exit(EXIT_ERROR)
+
+        builder.save(builder_file)
+
+        print('The next partition power is now %d.' % builder.next_part_power)
+        print('The change will take effect after the next write_ring.')
+        print('Ensure your object-servers are using the changed rings and')
+        print('cleanup (using swift-object-relinker) the hard links')
+        exit(EXIT_SUCCESS)
+
+    @staticmethod
+    def finish_increase_partition_power():
+        """
+swift-ring-builder <builder_file> finish_increase_partition_power
+    Finally removes the next_part_power flag. Has to be run after the
+    swift-object-relinker tool has been used to cleanup old existing data.
+
+    A write_ring command is needed to make the change take effect.
+
+    *****************************
+    USE THIS WITH EXTREME CAUTION
+    *****************************
+
+    If you increase the partition power and deploy changed rings, you may
+    introduce unavailability in your cluster. This has an end-user impact. Make
+    sure you execute required operations to increase the partition power
+    accurately.
+
+        """
+        if len(argv) < 3:
+            print(Commands.finish_increase_partition_power.__doc__.strip())
+            exit(EXIT_ERROR)
+
+        if not builder.finish_increase_partition_power():
+            print('Ring partition power increase cannot be finished.')
+            exit(EXIT_ERROR)
+
+        print('The change will take effect after the next write_ring.')
+        builder.save(builder_file)
+
+        exit(EXIT_SUCCESS)
+
 
 def main(arguments=None):
     global argv, backup_dir, builder, builder_file, ring_file
@@ -86,6 +86,7 @@ class RingBuilder(object):
                              % (min_part_hours,))
 
         self.part_power = part_power
+        self.next_part_power = None
         self.replicas = replicas
         self.min_part_hours = min_part_hours
         self.parts = 2 ** self.part_power
@@ -210,6 +211,7 @@ class RingBuilder(object):
         """
         if hasattr(builder, 'devs'):
             self.part_power = builder.part_power
+            self.next_part_power = builder.next_part_power
             self.replicas = builder.replicas
             self.min_part_hours = builder.min_part_hours
             self.parts = builder.parts
@@ -225,6 +227,7 @@ class RingBuilder(object):
             self._id = getattr(builder, '_id', None)
         else:
             self.part_power = builder['part_power']
+            self.next_part_power = builder.get('next_part_power')
             self.replicas = builder['replicas']
             self.min_part_hours = builder['min_part_hours']
             self.parts = builder['parts']
@@ -261,6 +264,7 @@ class RingBuilder(object):
            copy_from.
         """
         return {'part_power': self.part_power,
+                'next_part_power': self.next_part_power,
                 'replicas': self.replicas,
                 'min_part_hours': self.min_part_hours,
                 'parts': self.parts,
@@ -341,7 +345,8 @@ class RingBuilder(object):
             self._ring = \
                 RingData([array('H', p2d) for p2d in
                           self._replica2part2dev],
-                         devs, self.part_shift)
+                         devs, self.part_shift,
+                         self.next_part_power)
         return self._ring
 
     def add_dev(self, dev):
@@ -1751,6 +1756,26 @@ class RingBuilder(object):
                 matched_devs.append(dev)
         return matched_devs
 
+    def prepare_increase_partition_power(self):
+        """
+        Prepares a ring for partition power increase.
+
+        This makes it possible to compute the future location of any object
+        based on the next partition power.
+
+        In this phase object servers should create hard links when finalizing
+        a write to the new location as well. A relinker will be run after
+        restarting object-servers, creating hard links to all existing
+        objects in their future location.
+
+        :returns: False if next_part_power was not set, otherwise True.
+        """
+        if self.next_part_power:
+            return False
+        self.next_part_power = self.part_power + 1
+        self.version += 1
+        return True
+
     def increase_partition_power(self):
         """
         Increases ring partition power by one.
@@ -1759,8 +1784,17 @@ class RingBuilder(object):
 
         OLD: 0, 3, 7, 5, 2, 1, ...
         NEW: 0, 0, 3, 3, 7, 7, 5, 5, 2, 2, 1, 1, ...
+
+        :returns: False if next_part_power was not set or is equal to current
+                  part_power, None if something went wrong, otherwise True.
         """
+
+        if not self.next_part_power:
+            return False
+
+        if self.next_part_power != (self.part_power + 1):
+            return False
+
         new_replica2part2dev = []
         for replica in self._replica2part2dev:
             new_replica = array('H')
@@ -1775,13 +1809,47 @@ class RingBuilder(object):
 
         # We need to update the time when a partition has been moved the last
         # time. Since this is an array of all partitions, we need to double it
-        # two
+        # too
         new_last_part_moves = []
         for partition in self._last_part_moves:
             new_last_part_moves.append(partition)
             new_last_part_moves.append(partition)
         self._last_part_moves = new_last_part_moves
 
-        self.part_power += 1
+        self.part_power = self.next_part_power
         self.parts *= 2
         self.version += 1
+        return True
+
+    def cancel_increase_partition_power(self):
+        """
+        Cancels a ring partition power increase.
+
+        This sets the next_part_power to the current part_power. Object
+        replicators will still skip replication, and a cleanup is still
+        required. Finally, a finish_increase_partition_power needs to be run.
+
+        :returns: False if next_part_power was not set or is equal to current
+                  part_power, otherwise True.
+        """
+
+        if not self.next_part_power:
+            return False
+
+        if self.next_part_power != (self.part_power + 1):
+            return False
+
+        self.next_part_power = self.part_power
+        self.version += 1
+        return True
+
+    def finish_increase_partition_power(self):
+        """Finish the partition power increase.
+
+        The hard links from the old object locations should be removed by now.
+        """
+        if self.next_part_power and self.next_part_power == self.part_power:
+            self.next_part_power = None
+            self.version += 1
+            return True
+        return False
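
The intended life cycle of these flags, sketched with a throwaway builder
(the parameters and device values are arbitrary examples, and the three new
methods exist only with this patch applied)::

    from swift.common.ring import RingBuilder

    b = RingBuilder(8, 3, 1)  # part_power=8, 3 replicas, min_part_hours=1
    for i in range(4):
        b.add_dev({'id': i, 'region': 1, 'zone': 1, 'weight': 100,
                   'ip': '127.0.0.1', 'port': 6200 + i,
                   'device': 'sda%d' % i})
    b.rebalance()

    assert b.prepare_increase_partition_power()  # next_part_power = 9
    assert b.increase_partition_power()          # part_power = 9, parts doubled
    assert b.finish_increase_partition_power()   # next_part_power cleared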

@@ -38,10 +38,12 @@ from swift.common.ring.utils import tiers_for_dev
 class RingData(object):
     """Partitioned consistent hashing ring data (used for serialization)."""
 
-    def __init__(self, replica2part2dev_id, devs, part_shift):
+    def __init__(self, replica2part2dev_id, devs, part_shift,
+                 next_part_power=None):
         self.devs = devs
         self._replica2part2dev_id = replica2part2dev_id
         self._part_shift = part_shift
+        self.next_part_power = next_part_power
 
         for dev in self.devs:
             if dev is not None:
@@ -113,18 +115,27 @@ class RingData(object):
 
         if not hasattr(ring_data, 'devs'):
             ring_data = RingData(ring_data['replica2part2dev_id'],
-                                 ring_data['devs'], ring_data['part_shift'])
+                                 ring_data['devs'], ring_data['part_shift'],
+                                 ring_data.get('next_part_power'))
         return ring_data
 
     def serialize_v1(self, file_obj):
         # Write out new-style serialization magic and version:
         file_obj.write(struct.pack('!4sH', 'R1NG', 1))
         ring = self.to_dict()
-        json_encoder = json.JSONEncoder(sort_keys=True)
-        json_text = json_encoder.encode(
-            {'devs': ring['devs'], 'part_shift': ring['part_shift'],
-             'replica_count': len(ring['replica2part2dev_id']),
-             'byteorder': sys.byteorder})
+
+        # Only include next_part_power if it is set in the
+        # builder, otherwise just ignore it
+        _text = {'devs': ring['devs'], 'part_shift': ring['part_shift'],
+                 'replica_count': len(ring['replica2part2dev_id']),
+                 'byteorder': sys.byteorder}
+
+        next_part_power = ring.get('next_part_power')
+        if next_part_power is not None:
+            _text['next_part_power'] = next_part_power
+
+        json_encoder = json.JSONEncoder(sort_keys=True)
+        json_text = json_encoder.encode(_text)
         json_len = len(json_text)
         file_obj.write(struct.pack('!I', json_len))
         file_obj.write(json_text)
@@ -155,7 +166,8 @@ class RingData(object):
     def to_dict(self):
         return {'devs': self.devs,
                 'replica2part2dev_id': self._replica2part2dev_id,
-                'part_shift': self._part_shift}
+                'part_shift': self._part_shift,
+                'next_part_power': self.next_part_power}
 
 
 class Ring(object):
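
A quick sketch of the extended serialization behaviour (toy values only; a
real ring would come from a builder, and the keyword argument exists only with
this patch applied)::

    from array import array
    from swift.common.ring import RingData

    rd = RingData([array('H', [0, 1, 0, 1])],
                  [{'id': 0, 'zone': 1}, {'id': 1, 'zone': 1}],
                  30, next_part_power=3)
    # The flag survives the dict round-trip used by serialize_v1().
    assert rd.to_dict()['next_part_power'] == 3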

@@ -244,6 +256,15 @@ class Ring(object):
             self._num_regions = len(regions)
             self._num_zones = len(zones)
             self._num_ips = len(ips)
+            self._next_part_power = ring_data.next_part_power
+
+    @property
+    def next_part_power(self):
+        return self._next_part_power
+
+    @property
+    def part_power(self):
+        return 32 - self._part_shift
 
     def _rebuild_tier_data(self):
         self.tier2devs = defaultdict(list)
@@ -17,6 +17,7 @@
 
 from __future__ import print_function
 
+import binascii
 import errno
 import fcntl
 import grp
@@ -27,6 +28,7 @@ import operator
 import os
 import pwd
 import re
+import struct
 import sys
 import time
 import uuid
@@ -4238,3 +4240,25 @@ def md5_hash_for_file(fname):
         for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''):
             md5sum.update(block)
     return md5sum.hexdigest()
+
+
+def replace_partition_in_path(path, part_power):
+    """
+    Takes a full path to a file and a partition power and returns
+    the same path, but with the correct partition number. Most useful when
+    increasing the partition power.
+
+    :param path: full path to a file, for example object .data file
+    :param part_power: partition power to compute correct partition number
+    :returns: Path with re-computed partition power
+    """
+
+    path_components = path.split(os.sep)
+    digest = binascii.unhexlify(path_components[-2])
+
+    part_shift = 32 - int(part_power)
+    part = struct.unpack_from('>I', digest)[0] >> part_shift
+
+    path_components[-4] = "%d" % part
+
+    return os.sep.join(path_components)
@@ -65,7 +65,7 @@ from swift.common.utils import mkdirs, Timestamp, \
     config_true_value, listdir, split_path, ismount, remove_file, \
     get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
     tpool_reraise, MD5_OF_EMPTY_STRING, link_fd_to_path, o_tmpfile_supported, \
-    O_TMPFILE, makedirs_count
+    O_TMPFILE, makedirs_count, replace_partition_in_path
 from swift.common.splice import splice, tee
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
     DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@@ -345,6 +345,37 @@ def invalidate_hash(suffix_dir):
             inv_fh.write(suffix + "\n")
 
 
+def relink_paths(target_path, new_target_path, check_existing=False):
+    """
+    Hard-links a file located in target_path using the second path
+    new_target_path. Creates intermediate directories if required.
+
+    :param target_path: current absolute filename
+    :param new_target_path: new absolute filename for the hardlink
+    :param check_existing: if True, check whether the link is already present
+                           before attempting to create a new one
+    """
+
+    if target_path != new_target_path:
+        logging.debug('Relinking %s to %s due to next_part_power set',
+                      target_path, new_target_path)
+        new_target_dir = os.path.dirname(new_target_path)
+        if not os.path.isdir(new_target_dir):
+            os.makedirs(new_target_dir)
+
+        link_exists = False
+        if check_existing:
+            try:
+                new_stat = os.stat(new_target_path)
+                orig_stat = os.stat(target_path)
+                link_exists = (new_stat.st_ino == orig_stat.st_ino)
+            except OSError:
+                pass  # if anything goes wrong, try anyway
+
+        if not link_exists:
+            os.link(target_path, new_target_path)
+
+
 def get_part_path(dev_path, policy, partition):
     """
     Given the device path, policy, and partition, returns the full
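
A small usage sketch for this helper (the paths are invented for the example;
the relinker tool above and the DiskFile writer below are the real callers)::

    from swift.obj.diskfile import relink_paths

    old = '/srv/node/sda1/objects/16003/a38/hashdir/1467658208.57179.data'
    new = '/srv/node/sda1/objects/32007/a38/hashdir/1467658208.57179.data'

    # Creates intermediate directories as needed; with check_existing=True an
    # already existing hard link makes this a no-op, so it is safe to re-run
    # after an aborted relink pass.
    relink_paths(old, new, check_existing=True)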

@@ -1455,9 +1486,11 @@ class BaseDiskFileWriter(object):
     :param tmppath: full path name of the opened file descriptor
     :param bytes_per_sync: number bytes written between sync calls
     :param diskfile: the diskfile creating this DiskFileWriter instance
+    :param next_part_power: the next partition power to be used
     """
 
-    def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile):
+    def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile,
+                 next_part_power):
         # Parameter tracking
         self._name = name
         self._datadir = datadir
@@ -1465,6 +1498,7 @@ class BaseDiskFileWriter(object):
         self._tmppath = tmppath
         self._bytes_per_sync = bytes_per_sync
         self._diskfile = diskfile
+        self.next_part_power = next_part_power
 
         # Internal attributes
         self._upload_size = 0
@@ -1528,6 +1562,21 @@ class BaseDiskFileWriter(object):
             # It was an unnamed temp file created by open() with O_TMPFILE
             link_fd_to_path(self._fd, target_path,
                             self._diskfile._dirs_created)
+
+        # Check if the partition power will/has been increased
+        new_target_path = None
+        if self.next_part_power:
+            new_target_path = replace_partition_in_path(
+                target_path, self.next_part_power)
+            if target_path != new_target_path:
+                try:
+                    fsync_dir(os.path.dirname(target_path))
+                    relink_paths(target_path, new_target_path)
+                except OSError as exc:
+                    self.manager.logger.exception(
+                        'Relinking %s to %s failed: %s',
+                        target_path, new_target_path, exc)
+
         # If rename is successful, flag put as succeeded. This is done to avoid
         # unnecessary os.unlink() of tempfile later. As renamer() has
         # succeeded, the tempfile would no longer exist at its original path.
@@ -1538,6 +1587,8 @@ class BaseDiskFileWriter(object):
         except OSError:
             logging.exception(_('Problem cleaning up %s'), self._datadir)
 
+        self._part_power_cleanup(target_path, new_target_path)
+
     def _put(self, metadata, cleanup=True, *a, **kw):
         """
         Helper method for subclasses.
@@ -1584,6 +1635,43 @@ class BaseDiskFileWriter(object):
         """
         pass
 
+    def _part_power_cleanup(self, cur_path, new_path):
+        """
+        Cleanup relative DiskFile directories.
+
+        If the partition power is increased soon or has just been increased
+        but the relinker did not yet clean up the old files, an additional
+        cleanup of the relative dirs has to be done. Otherwise there might be
+        some unused files left if a PUT or DELETE is done in the meantime.
+
+        :param cur_path: current full path to an object file
+        :param new_path: recomputed path to an object file, based on the
+                         next_part_power set in the ring
+        """
+        if new_path is None:
+            return
+
+        # Partition power will be increased soon
+        if new_path != cur_path:
+            new_target_dir = os.path.dirname(new_path)
+            try:
+                self.manager.cleanup_ondisk_files(new_target_dir)
+            except OSError:
+                logging.exception(
+                    _('Problem cleaning up %s'), new_target_dir)
+
+        # Partition power has been increased, cleanup not yet finished
+        else:
+            prev_part_power = int(self.next_part_power) - 1
+            old_target_path = replace_partition_in_path(
+                cur_path, prev_part_power)
+            old_target_dir = os.path.dirname(old_target_path)
+            try:
+                self.manager.cleanup_ondisk_files(old_target_dir)
+            except OSError:
+                logging.exception(
+                    _('Problem cleaning up %s'), old_target_dir)
+
 
 class BaseDiskFileReader(object):
     """
@@ -1922,6 +2010,7 @@ class BaseDiskFile(object):
     :param use_linkat: if True, use open() with linkat() to create obj file
     :param open_expired: if True, open() will not raise a DiskFileExpired if
                          object is expired
+    :param next_part_power: the next partition power to be used
     """
     reader_cls = None  # must be set by subclasses
     writer_cls = None  # must be set by subclasses
@@ -1929,7 +2018,8 @@ class BaseDiskFile(object):
     def __init__(self, mgr, device_path, partition,
                  account=None, container=None, obj=None, _datadir=None,
                  policy=None, use_splice=False, pipe_size=None,
-                 use_linkat=False, open_expired=False, **kwargs):
+                 use_linkat=False, open_expired=False, next_part_power=None,
+                 **kwargs):
         self._manager = mgr
         self._device_path = device_path
         self._logger = mgr.logger
@@ -1947,6 +2037,7 @@ class BaseDiskFile(object):
         # in all entry fops being carried out synchronously.
         self._dirs_created = 0
         self.policy = policy
+        self.next_part_power = next_part_power
         if account and container and obj:
             self._name = '/' + '/'.join((account, container, obj))
             self._account = account
@@ -2491,7 +2582,8 @@ class BaseDiskFile(object):
                 raise
             dfw = self.writer_cls(self._name, self._datadir, fd, tmppath,
                                   bytes_per_sync=self._bytes_per_sync,
-                                  diskfile=self)
+                                  diskfile=self,
+                                  next_part_power=self.next_part_power)
             yield dfw
         finally:
             try:
@@ -2712,10 +2804,27 @@ class ECDiskFileWriter(BaseDiskFileWriter):
 
     def _finalize_durable(self, data_file_path, durable_data_file_path):
         exc = None
+        new_data_file_path = new_durable_data_file_path = None
+        if self.next_part_power:
+            new_data_file_path = replace_partition_in_path(
+                data_file_path, self.next_part_power)
+            new_durable_data_file_path = replace_partition_in_path(
+                durable_data_file_path, self.next_part_power)
         try:
             try:
                 os.rename(data_file_path, durable_data_file_path)
                 fsync_dir(self._datadir)
+                if self.next_part_power and \
+                        data_file_path != new_data_file_path:
+                    try:
+                        os.rename(new_data_file_path,
+                                  new_durable_data_file_path)
+                    except OSError as exc:
+                        self.manager.logger.exception(
+                            'Renaming new path %s to %s failed: %s',
+                            new_data_file_path, new_durable_data_file_path,
+                            exc)
+
             except (OSError, IOError) as err:
                 if err.errno not in (errno.ENOSPC, errno.EDQUOT):
                     # re-raise to catch all handler
@ -2733,6 +2842,9 @@ class ECDiskFileWriter(BaseDiskFileWriter):
|
|||||||
self.manager.logger.exception(
|
self.manager.logger.exception(
|
||||||
_('Problem cleaning up %(datadir)s (%(err)s)'),
|
_('Problem cleaning up %(datadir)s (%(err)s)'),
|
||||||
{'datadir': self._datadir, 'err': os_err})
|
{'datadir': self._datadir, 'err': os_err})
|
||||||
|
self._part_power_cleanup(
|
||||||
|
durable_data_file_path, new_durable_data_file_path)
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
params = {'file': durable_data_file_path, 'err': err}
|
params = {'file': durable_data_file_path, 'err': err}
|
||||||
self.manager.logger.exception(
|
self.manager.logger.exception(
|
||||||
|
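For reference, all the writer needs from replace_partition_in_path is to swap the partition component of an object path; the suffix and hash directories stay the same because the path hash itself does not change. A minimal sketch of such a helper, assuming the canonical .../objects/<partition>/<suffix>/<hash>/<file> layout (the helper shipped in swift/common/utils.py may differ in detail):

    import binascii
    import os
    import struct

    def replace_partition_in_path(path, part_power):
        # The path hash is the second-to-last component; the partition is
        # four components from the end and is simply the top `part_power`
        # bits of the first four bytes of that hash.
        components = path.split(os.sep)
        part = struct.unpack_from(
            '>I', binascii.unhexlify(components[-2]))[0] >> (32 - part_power)
        components[-4] = '%d' % part
        return os.sep.join(components)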
@@ -916,6 +916,19 @@ class ObjectReconstructor(Daemon):
         all_parts = []
 
         for policy, local_devices in policy2devices.items():
+            # Skip replication if next_part_power is set. In this case
+            # every object is hard-linked twice, but the replicator
+            # can't detect them and would create a second copy of the
+            # file if not yet existing - and this might double the
+            # actual transferred and stored data
+            next_part_power = getattr(
+                policy.object_ring, 'next_part_power', None)
+            if next_part_power is not None:
+                self.logger.warning(
+                    _("next_part_power set in policy '%s'. Skipping"),
+                    policy.name)
+                continue
+
             df_mgr = self._df_router[policy]
             for local_dev in local_devices:
                 dev_path = df_mgr.get_dev_path(local_dev['device'])
@@ -1018,6 +1031,7 @@ class ObjectReconstructor(Daemon):
                 self.logger.info(_("Ring change detected. Aborting "
                                    "current reconstruction pass."))
                 return
+
             self.reconstruction_part_count += 1
             jobs = self.build_reconstruction_jobs(part_info)
             if not jobs:
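The "hard-linked twice" situation the comment describes is easy to picture: after relinking, each .data file has two directory entries pointing at a single inode, so it costs no extra space locally, but a replicator comparing file listings path by path would see two independent files and ship both. A quick standard-library illustration:

    import os
    import shutil
    import tempfile

    tmp = tempfile.mkdtemp()
    try:
        old_loc = os.path.join(tmp, 'old.data')
        new_loc = os.path.join(tmp, 'new.data')
        with open(old_loc, 'w') as f:
            f.write('object bytes')
        os.link(old_loc, new_loc)  # what the relinker does per object
        # one inode, two names: no extra space on this node ...
        assert os.stat(old_loc).st_ino == os.stat(new_loc).st_ino
        # ... but any path-based comparison now reports two files
        assert os.stat(old_loc).st_nlink == 2
    finally:
        shutil.rmtree(tmp)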
@@ -676,6 +676,19 @@ class ObjectReplicator(Daemon):
         jobs = []
         ips = whataremyips(self.bind_ip)
         for policy in POLICIES:
+            # Skip replication if next_part_power is set. In this case
+            # every object is hard-linked twice, but the replicator can't
+            # detect them and would create a second copy of the file if not
+            # yet existing - and this might double the actual transferred
+            # and stored data
+            next_part_power = getattr(
+                policy.object_ring, 'next_part_power', None)
+            if next_part_power is not None:
+                self.logger.warning(
+                    _("next_part_power set in policy '%s'. Skipping"),
+                    policy.name)
+                continue
+
             if policy.policy_type == REPL_POLICY:
                 if (override_policies is not None and
                         str(policy.idx) not in override_policies):
@@ -744,6 +757,7 @@ class ObjectReplicator(Daemon):
                 self.logger.info(_("Ring change detected. Aborting "
                                    "current replication pass."))
                 return
+
             try:
                 if isfile(job['path']):
                     # Clean up any (probably zero-byte) files where a
@@ -513,11 +513,13 @@ class ObjectController(BaseStorageServer):
         if new_delete_at and new_delete_at < time.time():
             return HTTPBadRequest(body='X-Delete-At in past', request=request,
                                   content_type='text/plain')
+        next_part_power = request.headers.get('X-Backend-Next-Part-Power')
         try:
             disk_file = self.get_diskfile(
                 device, partition, account, container, obj,
                 policy=policy, open_expired=config_true_value(
-                    request.headers.get('x-backend-replication', 'false')))
+                    request.headers.get('x-backend-replication', 'false')),
+                next_part_power=next_part_power)
         except DiskFileDeviceUnavailable:
             return HTTPInsufficientStorage(drive=device, request=request)
         try:
@@ -675,10 +677,12 @@ class ObjectController(BaseStorageServer):
         # nodes; handoff nodes should 409 subrequests to over-write an
         # existing data fragment until they offloaded the existing fragment
         frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
+        next_part_power = request.headers.get('X-Backend-Next-Part-Power')
         try:
             disk_file = self.get_diskfile(
                 device, partition, account, container, obj,
-                policy=policy, frag_index=frag_index)
+                policy=policy, frag_index=frag_index,
+                next_part_power=next_part_power)
         except DiskFileDeviceUnavailable:
             return HTTPInsufficientStorage(drive=device, request=request)
         try:
@@ -987,10 +991,11 @@ class ObjectController(BaseStorageServer):
         device, partition, account, container, obj, policy = \
             get_name_and_placement(request, 5, 5, True)
         req_timestamp = valid_timestamp(request)
+        next_part_power = request.headers.get('X-Backend-Next-Part-Power')
         try:
             disk_file = self.get_diskfile(
                 device, partition, account, container, obj,
-                policy=policy)
+                policy=policy, next_part_power=next_part_power)
         except DiskFileDeviceUnavailable:
             return HTTPInsufficientStorage(drive=device, request=request)
         try:
@@ -235,6 +235,9 @@ class BaseObjectController(Controller):
                 container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
         req.headers['X-Backend-Storage-Policy-Index'] = policy_index
+        next_part_power = getattr(obj_ring, 'next_part_power', None)
+        if next_part_power:
+            req.headers['X-Backend-Next-Part-Power'] = next_part_power
         partition, nodes = obj_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
 
@@ -643,6 +646,9 @@ class BaseObjectController(Controller):
 
         # pass the policy index to storage nodes via req header
         req.headers['X-Backend-Storage-Policy-Index'] = policy_index
+        next_part_power = getattr(obj_ring, 'next_part_power', None)
+        if next_part_power:
+            req.headers['X-Backend-Next-Part-Power'] = next_part_power
         req.acl = container_info['write_acl']
         req.environ['swift_sync_key'] = container_info['sync_key']
 
@@ -701,6 +707,9 @@ class BaseObjectController(Controller):
         obj_ring = self.app.get_object_ring(policy_index)
         # pass the policy index to storage nodes via req header
         req.headers['X-Backend-Storage-Policy-Index'] = policy_index
+        next_part_power = getattr(obj_ring, 'next_part_power', None)
+        if next_part_power:
+            req.headers['X-Backend-Next-Part-Power'] = next_part_power
         container_partition = container_info['partition']
         container_nodes = container_info['nodes']
         req.acl = container_info['write_acl']
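The three hunks above are what propagates next_part_power from the ring to the object servers on every write. On the storage node the header ends up in the DiskFile, whose writer then creates the extra hard link at commit time; a simplified sketch of that step (names illustrative; the real writer additionally handles EEXIST, fsyncs the new suffix directory, and triggers cleanup of both locations):

    import os

    def link_to_future_partition(target_path, next_part_power):
        # Hard-link the freshly written file into the partition it will
        # occupy once the partition power is increased; no data is copied.
        new_target_path = replace_partition_in_path(
            target_path, int(next_part_power))
        if new_target_path != target_path:
            dirname = os.path.dirname(new_target_path)
            if not os.path.exists(dirname):
                os.makedirs(dirname)
            os.link(target_path, new_target_path)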
test/probe/test_object_partpower_increase.py (new executable file, 199 lines)
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from errno import EEXIST
+from shutil import copyfile
+from tempfile import mkstemp
+from time import time
+from unittest import main
+from uuid import uuid4
+
+from swiftclient import client
+
+from swift.cli.relinker import relink, cleanup
+from swift.common.manager import Manager
+from swift.common.ring import RingBuilder
+from swift.common.utils import replace_partition_in_path
+from swift.obj.diskfile import get_data_dir
+from test.probe.common import ECProbeTest, ProbeTest, ReplProbeTest
+
+
+class TestPartPowerIncrease(ProbeTest):
+    def setUp(self):
+        super(TestPartPowerIncrease, self).setUp()
+        _, self.ring_file_backup = mkstemp()
+        _, self.builder_file_backup = mkstemp()
+        self.ring_file = self.object_ring.serialized_path
+        self.builder_file = self.ring_file.replace('ring.gz', 'builder')
+        copyfile(self.ring_file, self.ring_file_backup)
+        copyfile(self.builder_file, self.builder_file_backup)
+        # In case the test user is not allowed to write rings
+        self.assertTrue(os.access('/etc/swift', os.W_OK))
+        self.assertTrue(os.access('/etc/swift/backups', os.W_OK))
+        self.assertTrue(os.access('/etc/swift/object.builder', os.W_OK))
+        self.assertTrue(os.access('/etc/swift/object.ring.gz', os.W_OK))
+        # Ensure the test object will be erasure coded
+        self.data = ' ' * getattr(self.policy, 'ec_segment_size', 1)
+
+        self.devices = [
+            self.device_dir('object', {'ip': ip, 'port': port, 'device': ''})
+            for ip, port in set((dev['ip'], dev['port'])
+                                for dev in self.object_ring.devs)]
+
+    def tearDown(self):
+        # Keep a backup copy of the modified .builder file
+        backup_dir = os.path.join(
+            os.path.dirname(self.builder_file), 'backups')
+        try:
+            os.mkdir(backup_dir)
+        except OSError as err:
+            if err.errno != EEXIST:
+                raise
+        backup_name = (os.path.join(
+            backup_dir,
+            '%d.probe.' % time() + os.path.basename(self.builder_file)))
+        copyfile(self.builder_file, backup_name)
+
+        # Restore original ring
+        os.system('sudo mv %s %s' % (
+            self.ring_file_backup, self.ring_file))
+        os.system('sudo mv %s %s' % (
+            self.builder_file_backup, self.builder_file))
+
+    def _find_objs_ondisk(self, container, obj):
+        locations = []
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, container, obj)
+        for node in onodes:
+            start_dir = os.path.join(
+                self.device_dir('object', node),
+                get_data_dir(self.policy),
+                str(opart))
+            for root, dirs, files in os.walk(start_dir):
+                for filename in files:
+                    if filename.endswith('.data'):
+                        locations.append(os.path.join(root, filename))
+        return locations
+
+    def _test_main(self, cancel=False):
+        container = 'container-%s' % uuid4()
+        obj = 'object-%s' % uuid4()
+        obj2 = 'object-%s' % uuid4()
+
+        # Create container
+        headers = {'X-Storage-Policy': self.policy.name}
+        client.put_container(self.url, self.token, container, headers=headers)
+
+        # Create a new object
+        client.put_object(self.url, self.token, container, obj, self.data)
+        client.head_object(self.url, self.token, container, obj)
+
+        # Prepare partition power increase
+        builder = RingBuilder.load(self.builder_file)
+        builder.prepare_increase_partition_power()
+        builder.save(self.builder_file)
+        ring_data = builder.get_ring()
+        ring_data.save(self.ring_file)
+
+        # Ensure the proxy uses the changed ring
+        Manager(['proxy']).restart()
+
+        # Ensure object is still accessible
+        client.head_object(self.url, self.token, container, obj)
+
+        # Relink existing objects
+        for device in self.devices:
+            self.assertEqual(0, relink(skip_mount_check=True, devices=device))
+
+        # Create second object after relinking and ensure it is accessible
+        client.put_object(self.url, self.token, container, obj2, self.data)
+        client.head_object(self.url, self.token, container, obj2)
+
+        # Remember the original object locations
+        org_locations = self._find_objs_ondisk(container, obj)
+        org_locations += self._find_objs_ondisk(container, obj2)
+
+        # Remember the new object locations
+        new_locations = []
+        for loc in org_locations:
+            new_locations.append(replace_partition_in_path(
+                str(loc), self.object_ring.part_power + 1))
+
+        # Overwrite existing object - to ensure that older timestamp files
+        # will be cleaned up properly later
+        client.put_object(self.url, self.token, container, obj, self.data)
+
+        # Ensure objects are still accessible
+        client.head_object(self.url, self.token, container, obj)
+        client.head_object(self.url, self.token, container, obj2)
+
+        # Increase partition power
+        builder = RingBuilder.load(self.builder_file)
+        if not cancel:
+            builder.increase_partition_power()
+        else:
+            builder.cancel_increase_partition_power()
+        builder.save(self.builder_file)
+        ring_data = builder.get_ring()
+        ring_data.save(self.ring_file)
+
+        # Ensure the proxy uses the changed ring
+        Manager(['proxy']).restart()
+
+        # Ensure objects are still accessible
+        client.head_object(self.url, self.token, container, obj)
+        client.head_object(self.url, self.token, container, obj2)
+
+        # Overwrite existing object - to ensure that older timestamp files
+        # will be cleaned up properly later
+        client.put_object(self.url, self.token, container, obj, self.data)
+
+        # Cleanup old objects in the wrong location
+        for device in self.devices:
+            self.assertEqual(0, cleanup(skip_mount_check=True, devices=device))
+
+        # Ensure objects are still accessible
+        client.head_object(self.url, self.token, container, obj)
+        client.head_object(self.url, self.token, container, obj2)
+
+        # Ensure data in old or relinked object locations is removed
+        if not cancel:
+            for fn in org_locations:
+                self.assertFalse(os.path.exists(fn))
+        else:
+            for fn in new_locations:
+                self.assertFalse(os.path.exists(fn))
+
+
+class TestReplPartPowerIncrease(TestPartPowerIncrease, ReplProbeTest):
+    def test_main(self):
+        self._test_main()
+
+    def test_canceled(self):
+        self._test_main(cancel=True)
+
+
+class TestECPartPowerIncrease(TestPartPowerIncrease, ECProbeTest):
+    def test_main(self):
+        self._test_main()
+
+    def test_canceled(self):
+        self._test_main(cancel=True)
+
+
+if __name__ == '__main__':
+    main()
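The probe test above exercises the complete three-step procedure end to end. Condensed to the builder and relinker calls it makes, the operator workflow looks roughly like this (a sketch using the standard /etc/swift paths; in production the relinker runs on every object node, in parallel):

    from swift.cli.relinker import cleanup, relink
    from swift.common.ring import RingBuilder

    builder = RingBuilder.load('/etc/swift/object.builder')

    # Step 1: announce the next partition power, push the ring, relink.
    builder.prepare_increase_partition_power()
    builder.save('/etc/swift/object.builder')
    builder.get_ring().save('/etc/swift/object.ring.gz')
    relink(skip_mount_check=False)        # on every object node

    # Step 2: switch to the new partition power, then remove the
    # no longer needed hard links in the old locations.
    builder.increase_partition_power()
    builder.save('/etc/swift/object.builder')
    builder.get_ring().save('/etc/swift/object.ring.gz')
    cleanup(skip_mount_check=False)       # on every object node

    # Step 3: drop the next_part_power flag again.
    builder.finish_increase_partition_power()
    builder.save('/etc/swift/object.builder')
    builder.get_ring().save('/etc/swift/object.ring.gz')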
@@ -300,8 +300,7 @@ class FabricatedRing(Ring):
         self.nodes = nodes
         self.port = port
         self.replicas = replicas
-        self.part_power = part_power
-        self._part_shift = 32 - part_power
+        self._part_shift = 32 - self.part_power
         self._reload()
 
     def _reload(self, *args, **kwargs):
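The _part_shift relation is what makes the whole scheme work: a ring with partition power P maps an object to `part = hash_prefix >> (32 - P)`, the top P bits of the 32-bit path hash. Raising P by one keeps every higher bit, so partition X can only split into 2X or 2X+1 — which is exactly why relinking is a purely local operation. A worked illustration (ignoring swift's hash path prefix/suffix salting):

    import struct
    from hashlib import md5

    hash_prefix = struct.unpack_from(
        '>I', md5(b'/account/container/object').digest())[0]

    old_part = hash_prefix >> (32 - 8)   # part power 8
    new_part = hash_prefix >> (32 - 9)   # part power 9
    assert new_part in (2 * old_part, 2 * old_part + 1)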
test/unit/cli/test_relinker.py (new file, 172 lines)
@@ -0,0 +1,172 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import binascii
+import os
+import shutil
+import struct
+import tempfile
+import unittest
+
+from swift.cli import relinker
+from swift.common import exceptions, ring, utils
+from swift.common import storage_policy
+from swift.common.storage_policy import (
+    StoragePolicy, StoragePolicyCollection, POLICIES)
+
+from swift.obj.diskfile import write_metadata
+
+from test.unit import FakeLogger
+
+
+class TestRelinker(unittest.TestCase):
+    def setUp(self):
+        self.logger = FakeLogger()
+        self.testdir = tempfile.mkdtemp()
+        self.devices = os.path.join(self.testdir, 'node')
+        shutil.rmtree(self.testdir, ignore_errors=1)
+        os.mkdir(self.testdir)
+        os.mkdir(self.devices)
+
+        self.rb = ring.RingBuilder(8, 6.0, 1)
+
+        for i in range(6):
+            ip = "127.0.0.%s" % i
+            self.rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
+                             'ip': ip, 'port': 10000, 'device': 'sda1'})
+        self.rb.rebalance(seed=1)
+
+        self.existing_device = 'sda1'
+        os.mkdir(os.path.join(self.devices, self.existing_device))
+        self.objects = os.path.join(self.devices, self.existing_device,
+                                    'objects')
+        os.mkdir(self.objects)
+        self._hash = utils.hash_path('a/c/o')
+        digest = binascii.unhexlify(self._hash)
+        part = struct.unpack_from('>I', digest)[0] >> 24
+        self.next_part = struct.unpack_from('>I', digest)[0] >> 23
+        self.objdir = os.path.join(
+            self.objects, str(part), self._hash[-3:], self._hash)
+        os.makedirs(self.objdir)
+        self.object_fname = "1278553064.00000.data"
+        self.objname = os.path.join(self.objdir, self.object_fname)
+        with open(self.objname, "wb") as dummy:
+            dummy.write("Hello World!")
+            write_metadata(dummy, {'name': '/a/c/o', 'Content-Length': '12'})
+
+        test_policies = [StoragePolicy(0, 'platin', True)]
+        storage_policy._POLICIES = StoragePolicyCollection(test_policies)
+
+        self.expected_dir = os.path.join(
+            self.objects, str(self.next_part), self._hash[-3:], self._hash)
+        self.expected_file = os.path.join(self.expected_dir, self.object_fname)
+
+    def _save_ring(self):
+        rd = self.rb.get_ring()
+        for policy in POLICIES:
+            rd.save(os.path.join(
+                self.testdir, '%s.ring.gz' % policy.ring_name))
+            # Enforce ring reloading in relinker
+            policy.object_ring = None
+
+    def tearDown(self):
+        shutil.rmtree(self.testdir, ignore_errors=1)
+        storage_policy.reload_storage_policies()
+
+    def test_relink(self):
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True)
+
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+
+        stat_old = os.stat(os.path.join(self.objdir, self.object_fname))
+        stat_new = os.stat(self.expected_file)
+        self.assertEqual(stat_old.st_ino, stat_new.st_ino)
+
+    def _common_test_cleanup(self, relink=True):
+        # Create a ring that has prev_part_power set
+        self.rb.prepare_increase_partition_power()
+        self.rb.increase_partition_power()
+        self._save_ring()
+
+        os.makedirs(self.expected_dir)
+
+        if relink:
+            # Create a hardlink to the original object name. This is expected
+            # after a normal relinker run
+            os.link(os.path.join(self.objdir, self.object_fname),
+                    self.expected_file)
+
+    def test_cleanup(self):
+        self._common_test_cleanup()
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True))
+
+        # Old objectname should be removed, new should still exist
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+        self.assertFalse(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_cleanup_not_yet_relinked(self):
+        self._common_test_cleanup(relink=False)
+        self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True))
+
+        self.assertTrue(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_cleanup_deleted(self):
+        self._common_test_cleanup()
+
+        # Pretend the object got deleted in between and there is a tombstone
+        fname_ts = self.expected_file[:-4] + "ts"
+        os.rename(self.expected_file, fname_ts)
+
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True))
+
+    def test_cleanup_doesnotexist(self):
+        self._common_test_cleanup()
+
+        # Pretend the file in the new place got deleted in between
+        os.remove(self.expected_file)
+
+        self.assertEqual(
+            1, relinker.cleanup(self.testdir, self.devices, True, self.logger))
+        self.assertEqual(self.logger.get_lines_for_level('warning'),
+                         ['Error cleaning up %s: %s' % (self.objname,
+                          repr(exceptions.DiskFileNotExist()))])
+
+    def test_cleanup_non_durable_fragment(self):
+        self._common_test_cleanup()
+
+        # Actually all fragments are non-durable and raise a DiskFileNotExist
+        # in EC in this test. However, if the counterpart exists in the new
+        # location, this is ok - it will be fixed by the reconstructor later on
+        storage_policy._POLICIES[0].policy_type = 'erasure_coding'
+
+        self.assertEqual(
+            0, relinker.cleanup(self.testdir, self.devices, True, self.logger))
+        self.assertEqual(self.logger.get_lines_for_level('warning'), [])
+
+    def test_cleanup_quarantined(self):
+        self._common_test_cleanup()
+        # Pretend the object in the new place got corrupted
+        with open(self.expected_file, "wb") as obj:
+            obj.write('trash')
+
+        self.assertEqual(
+            1, relinker.cleanup(self.testdir, self.devices, True, self.logger))
+
+        self.assertIn('failed audit and was quarantined',
+                      self.logger.get_lines_for_level('warning')[0])
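Judging by the calls above, the two relinker entry points take the swift dir, the devices root and a skip-mount flag, plus an optional logger, and return 0 on success or 1 when any object could not be relinked or cleaned up:

    from swift.cli import relinker

    # step 1, after prepare_increase_partition_power():
    ret = relinker.relink('/etc/swift', '/srv/node', False)

    # step 2, after increase_partition_power():
    ret = relinker.cleanup('/etc/swift', '/srv/node', False)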
@@ -489,6 +489,16 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
         dev = ring.devs[-1]
         self.assertGreater(dev['region'], 0)
 
+    def test_add_device_part_power_increase(self):
+        self.create_sample_ring()
+        ring = RingBuilder.load(self.tmpfile)
+        ring.next_part_power = 1
+        ring.save(self.tmpfile)
+
+        argv = ["", self.tmpfile, "add",
+                "r0z0-127.0.1.1:6200/sda1_some meta data", "100"]
+        self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
+
     def test_remove_device(self):
         for search_value in self.search_values:
             self.create_sample_ring()
@@ -762,6 +772,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
                 "--ip", "unknown"]
         self.assertSystemExit(EXIT_ERROR, ringbuilder.main, argv)
 
+    def test_remove_device_part_power_increase(self):
+        self.create_sample_ring()
+        ring = RingBuilder.load(self.tmpfile)
+        ring.next_part_power = 1
+        ring.save(self.tmpfile)
+
+        argv = ["", self.tmpfile, "remove", "d0"]
+        self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
+
     def test_set_weight(self):
         for search_value in self.search_values:
             self.create_sample_ring()
@@ -1988,6 +2007,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
         ring = RingBuilder.load(self.tmpfile)
         self.assertEqual(last_replica2part2dev, ring._replica2part2dev)
 
+    def test_rebalance_part_power_increase(self):
+        self.create_sample_ring()
+        ring = RingBuilder.load(self.tmpfile)
+        ring.next_part_power = 1
+        ring.save(self.tmpfile)
+
+        argv = ["", self.tmpfile, "rebalance", "3"]
+        self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
+
     def test_write_ring(self):
         self.create_sample_ring()
         argv = ["", self.tmpfile, "rebalance"]
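These three tests pin down that swift-ring-builder refuses ring-changing commands (add, remove, rebalance) while a partition power increase is pending. A plausible shape for that guard, purely illustrative (EXIT_WARNING is the existing exit code in swift.cli.ringbuilder):

    # hypothetical guard inside the affected ringbuilder commands:
    if builder.next_part_power:
        print('Partition power increase pending; this command is '
              'not allowed until it is finished or canceled.')
        exit(EXIT_WARNING)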
@@ -2604,6 +2604,40 @@ class TestRingBuilder(unittest.TestCase):
         except exceptions.DuplicateDeviceError:
             self.fail("device hole not reused")
 
+    def test_prepare_increase_partition_power(self):
+        ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
+
+        rb = ring.RingBuilder(8, 3.0, 1)
+        self.assertEqual(rb.part_power, 8)
+
+        # add more devices than replicas to the ring
+        for i in range(10):
+            dev = "sdx%s" % i
+            rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
+                        'ip': '127.0.0.1', 'port': 10000, 'device': dev})
+        rb.rebalance(seed=1)
+
+        self.assertFalse(rb.cancel_increase_partition_power())
+        self.assertEqual(rb.part_power, 8)
+        self.assertIsNone(rb.next_part_power)
+
+        self.assertFalse(rb.finish_increase_partition_power())
+        self.assertEqual(rb.part_power, 8)
+        self.assertIsNone(rb.next_part_power)
+
+        self.assertTrue(rb.prepare_increase_partition_power())
+        self.assertEqual(rb.part_power, 8)
+        self.assertEqual(rb.next_part_power, 9)
+
+        # Save .ring.gz, and load ring from it to ensure prev/next is set
+        rd = rb.get_ring()
+        rd.save(ring_file)
+
+        r = ring.Ring(ring_file)
+        expected_part_shift = 32 - 8
+        self.assertEqual(expected_part_shift, r._part_shift)
+        self.assertEqual(9, r.next_part_power)
+
     def test_increase_partition_power(self):
         rb = ring.RingBuilder(8, 3.0, 1)
         self.assertEqual(rb.part_power, 8)
@@ -2623,13 +2657,18 @@ class TestRingBuilder(unittest.TestCase):
         old_part, old_nodes = r.get_nodes("acc", "cont", "obj")
         old_version = rb.version
 
-        rb.increase_partition_power()
+        self.assertTrue(rb.prepare_increase_partition_power())
+        self.assertTrue(rb.increase_partition_power())
         rb.validate()
         changed_parts, _balance, removed_devs = rb.rebalance()
 
         self.assertEqual(changed_parts, 0)
         self.assertEqual(removed_devs, 0)
 
+        # Make sure cancellation is not possible
+        # after increasing the partition power
+        self.assertFalse(rb.cancel_increase_partition_power())
+
         old_ring = r
         rd = rb.get_ring()
         rd.save(ring_file)
@@ -2637,8 +2676,9 @@ class TestRingBuilder(unittest.TestCase):
         new_part, new_nodes = r.get_nodes("acc", "cont", "obj")
 
         # sanity checks
-        self.assertEqual(rb.part_power, 9)
-        self.assertEqual(rb.version, old_version + 2)
+        self.assertEqual(9, rb.part_power)
+        self.assertEqual(9, rb.next_part_power)
+        self.assertEqual(rb.version, old_version + 3)
 
         # make sure there is always the same device assigned to every pair of
         # partitions
@@ -2670,6 +2710,107 @@ class TestRingBuilder(unittest.TestCase):
         # nodes after increasing the partition power
         self.assertEqual(old_nodes, new_nodes)
 
+    def test_finalize_increase_partition_power(self):
+        ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
+
+        rb = ring.RingBuilder(8, 3.0, 1)
+        self.assertEqual(rb.part_power, 8)
+
+        # add more devices than replicas to the ring
+        for i in range(10):
+            dev = "sdx%s" % i
+            rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
+                        'ip': '127.0.0.1', 'port': 10000, 'device': dev})
+        rb.rebalance(seed=1)
+
+        self.assertTrue(rb.prepare_increase_partition_power())
+
+        # Make sure this doesn't do any harm before actually increasing the
+        # partition power
+        self.assertFalse(rb.finish_increase_partition_power())
+        self.assertEqual(rb.next_part_power, 9)
+
+        self.assertTrue(rb.increase_partition_power())
+
+        self.assertFalse(rb.prepare_increase_partition_power())
+        self.assertEqual(rb.part_power, 9)
+        self.assertEqual(rb.next_part_power, 9)
+
+        self.assertTrue(rb.finish_increase_partition_power())
+
+        self.assertEqual(rb.part_power, 9)
+        self.assertIsNone(rb.next_part_power)
+
+        # Save .ring.gz, and load ring from it to ensure prev/next is set
+        rd = rb.get_ring()
+        rd.save(ring_file)
+
+        r = ring.Ring(ring_file)
+        expected_part_shift = 32 - 9
+        self.assertEqual(expected_part_shift, r._part_shift)
+        self.assertIsNone(r.next_part_power)
+
+    def test_prepare_increase_partition_power_failed(self):
+        rb = ring.RingBuilder(8, 3.0, 1)
+        self.assertEqual(rb.part_power, 8)
+
+        self.assertTrue(rb.prepare_increase_partition_power())
+        self.assertEqual(rb.next_part_power, 9)
+
+        # next_part_power is still set, do not increase again
+        self.assertFalse(rb.prepare_increase_partition_power())
+        self.assertEqual(rb.next_part_power, 9)
+
+    def test_increase_partition_power_failed(self):
+        rb = ring.RingBuilder(8, 3.0, 1)
+        self.assertEqual(rb.part_power, 8)
+
+        # add more devices than replicas to the ring
+        for i in range(10):
+            dev = "sdx%s" % i
+            rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
+                        'ip': '127.0.0.1', 'port': 10000, 'device': dev})
+        rb.rebalance(seed=1)
+
+        # next_part_power not set, can't increase the part power
+        self.assertFalse(rb.increase_partition_power())
+        self.assertEqual(rb.part_power, 8)
+
+        self.assertTrue(rb.prepare_increase_partition_power())
+
+        self.assertTrue(rb.increase_partition_power())
+        self.assertEqual(rb.part_power, 9)
+
+        # part_power already increased
+        self.assertFalse(rb.increase_partition_power())
+        self.assertEqual(rb.part_power, 9)
+
+    def test_cancel_increase_partition_power(self):
+        rb = ring.RingBuilder(8, 3.0, 1)
+        self.assertEqual(rb.part_power, 8)
+
+        # add more devices than replicas to the ring
+        for i in range(10):
+            dev = "sdx%s" % i
+            rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
+                        'ip': '127.0.0.1', 'port': 10000, 'device': dev})
+        rb.rebalance(seed=1)
+
+        old_version = rb.version
+        self.assertTrue(rb.prepare_increase_partition_power())
+
+        # sanity checks
+        self.assertEqual(8, rb.part_power)
+        self.assertEqual(9, rb.next_part_power)
+        self.assertEqual(rb.version, old_version + 1)
+
+        self.assertTrue(rb.cancel_increase_partition_power())
+        rb.validate()
+
+        self.assertEqual(8, rb.part_power)
+        self.assertEqual(8, rb.next_part_power)
+        self.assertEqual(rb.version, old_version + 2)
+
+
 class TestGetRequiredOverload(unittest.TestCase):
 
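Taken together, these tests pin down the builder's state machine; every transition also bumps builder.version:

    state after                            part_power  next_part_power
    (fresh builder)                        8           None
    prepare_increase_partition_power()     8           9
    increase_partition_power()             9           9
    finish_increase_partition_power()      9           None
    cancel_increase_partition_power()      8           8

After a cancel, next_part_power equals the current part power — apparently the same state the relinker's cleanup pass keys on — so the already-created hard links can still be found and removed before the flag is finally cleared.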
@@ -3809,6 +3809,28 @@ cluster_dfw1 = http://dfw1.host/v1/
             self.fail('Invalid results from pure function:\n%s' %
                       '\n'.join(failures))
 
+    def test_replace_partition_in_path(self):
+        # Check for new part = part * 2
+        old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'
+        new = '/s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77/f'
+        # Expected outcome
+        self.assertEqual(utils.replace_partition_in_path(old, 11), new)
+
+        # Make sure there is no change if the part power didn't change
+        self.assertEqual(utils.replace_partition_in_path(old, 10), old)
+        self.assertEqual(utils.replace_partition_in_path(new, 11), new)
+
+        # Check for new part = part * 2 + 1
+        old = '/s/n/d/o/693/c77/ad708baea4806dcaba30bf07d9e64c77/f'
+        new = '/s/n/d/o/1387/c77/ad708baea4806dcaba30bf07d9e64c77/f'
+
+        # Expected outcome
+        self.assertEqual(utils.replace_partition_in_path(old, 11), new)
+
+        # Make sure there is no change if the part power didn't change
+        self.assertEqual(utils.replace_partition_in_path(old, 10), old)
+        self.assertEqual(utils.replace_partition_in_path(new, 11), new)
+
+
 class ResellerConfReader(unittest.TestCase):
 
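The fixture values can be checked directly from the first four bytes of each hash:

    >>> 0xaf088bae >> (32 - 10), 0xaf088bae >> (32 - 11)
    (700, 1400)
    >>> 0xad708bae >> (32 - 10), 0xad708bae >> (32 - 11)
    (693, 1387)

So 700 doubles to 1400 because the eleventh hash bit happens to be 0, while 693 maps to 1387 = 2 * 693 + 1 because its eleventh bit is 1.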
@@ -28,6 +28,7 @@ import uuid
 import xattr
 import re
 import six
+import struct
 from collections import defaultdict
 from random import shuffle, randint
 from shutil import rmtree
@@ -3839,14 +3840,23 @@ class DiskFileMixin(BaseDiskFileTestMixin):
                           DiskFileNoSpace,
                           diskfile.write_metadata, 'n/a', metadata)
 
-    def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False):
+    def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False,
+                             partition=0, next_part_power=None,
+                             expect_error=False):
         timestamp = Timestamp(timestamp)
         df = self._simple_get_diskfile(account='a', container='c',
                                        obj='o_%s' % policy,
-                                       policy=policy)
+                                       policy=policy,
+                                       partition=partition,
+                                       next_part_power=next_part_power)
         frag_index = None
         if policy.policy_type == EC_POLICY:
             frag_index = df._frag_index or 7
+        if expect_error:
+            with self.assertRaises(Exception):
+                write_diskfile(df, timestamp, frag_index=frag_index,
+                               legacy_durable=legacy_durable)
+        else:
             write_diskfile(df, timestamp, frag_index=frag_index,
                            legacy_durable=legacy_durable)
         return df._datadir
@@ -3892,7 +3902,77 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         for policy in POLICIES:
             self._do_test_write_cleanup(policy, legacy_durable=True)
 
-    def test_commit_no_extra_fsync(self):
+    @mock.patch("swift.obj.diskfile.BaseDiskFileManager.cleanup_ondisk_files")
+    def test_write_cleanup_part_power_increase(self, mock_cleanup):
+        # Without next_part_power set we expect only one cleanup per DiskFile
+        # and no linking
+        for policy in POLICIES:
+            timestamp = Timestamp(time()).internal
+            df_dir = self._create_diskfile_dir(timestamp, policy)
+            self.assertEqual(1, mock_cleanup.call_count)
+            mock_cleanup.assert_called_once_with(df_dir)
+            mock_cleanup.reset_mock()
+
+        # With next_part_power set to part_power + 1 we expect two cleanups
+        # per DiskFile: first cleanup the current directory, but also cleanup
+        # the future directory where hardlinks are created
+        for policy in POLICIES:
+            timestamp = Timestamp(time()).internal
+            df_dir = self._create_diskfile_dir(
+                timestamp, policy, next_part_power=11)
+
+            self.assertEqual(2, mock_cleanup.call_count)
+            mock_cleanup.assert_any_call(df_dir)
+
+            # Make sure the translated path is also cleaned up
+            expected_fname = utils.replace_partition_in_path(
+                os.path.join(df_dir, "dummy"), 11)
+            expected_dir = os.path.dirname(expected_fname)
+            mock_cleanup.assert_any_call(expected_dir)
+
+            mock_cleanup.reset_mock()
+
+        # With next_part_power set to part_power we expect two cleanups per
+        # DiskFile: first cleanup the current directory, but also cleanup the
+        # previous old directory
+        for policy in POLICIES:
+            digest = utils.hash_path(
+                'a', 'c', 'o_%s' % policy, raw_digest=True)
+            partition = struct.unpack_from('>I', digest)[0] >> (32 - 10)
+            timestamp = Timestamp(time()).internal
+            df_dir = self._create_diskfile_dir(
+                timestamp, policy, partition=partition, next_part_power=10)
+
+            self.assertEqual(2, mock_cleanup.call_count)
+            mock_cleanup.assert_any_call(df_dir)
+
+            # Make sure the path using the old part power is also cleaned up
+            expected_fname = utils.replace_partition_in_path(
+                os.path.join(df_dir, "dummy"), 9)
+            expected_dir = os.path.dirname(expected_fname)
+            mock_cleanup.assert_any_call(expected_dir)
+
+            mock_cleanup.reset_mock()
+
+    @mock.patch.object(diskfile.BaseDiskFileManager, 'cleanup_ondisk_files',
+                       side_effect=Exception)
+    def test_killed_before_cleanup(self, mock_cleanup):
+        for policy in POLICIES:
+            timestamp = Timestamp(time()).internal
+            digest = utils.hash_path(
+                'a', 'c', 'o_%s' % policy, raw_digest=True)
+            partition = struct.unpack_from('>I', digest)[0] >> (32 - 10)
+            df_dir = self._create_diskfile_dir(timestamp, policy,
+                                               partition=partition,
+                                               next_part_power=11,
+                                               expect_error=True)
+            expected_fname = utils.replace_partition_in_path(
+                os.path.join(df_dir, "dummy"), 11)
+            expected_dir = os.path.dirname(expected_fname)
+
+            self.assertEqual(os.listdir(df_dir), os.listdir(expected_dir))
+
+    def test_commit_fsync(self):
         for policy in POLICIES:
             df = self._simple_get_diskfile(account='a', container='c',
                                            obj='o', policy=policy)
@@ -65,7 +65,26 @@ def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
         yield fake_ssync
 
 
-def _create_test_rings(path):
+def make_ec_archive_bodies(policy, test_body):
+    segment_size = policy.ec_segment_size
+    # split up the body into buffers
+    chunks = [test_body[x:x + segment_size]
+              for x in range(0, len(test_body), segment_size)]
+    # encode the buffers into fragment payloads
+    fragment_payloads = []
+    for chunk in chunks:
+        fragments = \
+            policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
+        if not fragments:
+            break
+        fragment_payloads.append(fragments)
+
+    # join up the fragment payloads per node
+    ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
+    return ec_archive_bodies
+
+
+def _create_test_rings(path, next_part_power=None):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
         [0, 1, 2],
@@ -87,14 +106,16 @@ def _create_test_rings(path):
     with closing(GzipFile(testgz, 'wb')) as f:
         pickle.dump(
             ring.RingData(intended_replica2part2dev_id,
-                          intended_devs, intended_part_shift),
+                          intended_devs, intended_part_shift,
+                          next_part_power),
             f)
 
     testgz = os.path.join(path, 'object-1.ring.gz')
     with closing(GzipFile(testgz, 'wb')) as f:
         pickle.dump(
             ring.RingData(intended_replica2part2dev_id,
-                          intended_devs, intended_part_shift),
+                          intended_devs, intended_part_shift,
+                          next_part_power),
             f)
 
 
@@ -1218,6 +1239,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         self.assertEqual(self.reconstructor.suffix_count, 0)
         self.assertEqual(len(found_jobs), 6)
 
+    def test_reconstructor_skipped_partpower_increase(self):
+        self.reconstructor._reset_stats()
+        _create_test_rings(self.testdir, 10)
+        # Enforce re-reading the EC ring
+        POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1')
+
+        self.reconstructor.reconstruct()
+
+        self.assertEqual(0, self.reconstructor.reconstruction_count)
+        warnings = self.reconstructor.logger.get_lines_for_level('warning')
+        self.assertIn(
+            "next_part_power set in policy 'one'. Skipping", warnings)
+
+
 class TestGlobalSetupObjectReconstructorLegacyDurable(
         TestGlobalSetupObjectReconstructor):
@@ -131,7 +131,7 @@ def _mock_process(ret):
     object_replicator.subprocess.Popen = orig_process
 
 
-def _create_test_rings(path, devs=None):
+def _create_test_rings(path, devs=None, next_part_power=None):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
         [0, 1, 2, 3, 4, 5, 6],
@@ -159,14 +159,14 @@ def _create_test_rings(path, devs=None):
     with closing(GzipFile(testgz, 'wb')) as f:
         pickle.dump(
             ring.RingData(intended_replica2part2dev_id,
-                          intended_devs, intended_part_shift),
+                          intended_devs, intended_part_shift, next_part_power),
             f)
 
     testgz = os.path.join(path, 'object-1.ring.gz')
     with closing(GzipFile(testgz, 'wb')) as f:
         pickle.dump(
             ring.RingData(intended_replica2part2dev_id,
-                          intended_devs, intended_part_shift),
+                          intended_devs, intended_part_shift, next_part_power),
             f)
     for policy in POLICIES:
         policy.object_ring = None  # force reload
@@ -1959,6 +1959,15 @@ class TestObjectReplicator(unittest.TestCase):
         # After 10 cycles every partition is seen exactly once
         self.assertEqual(sorted(range(partitions)), sorted(seen))
 
+    def test_replicate_skipped_partpower_increase(self):
+        _create_test_rings(self.testdir, next_part_power=4)
+        self.replicator.replicate()
+        self.assertEqual(0, self.replicator.job_count)
+        self.assertEqual(0, self.replicator.replication_count)
+        warnings = self.logger.get_lines_for_level('warning')
+        self.assertIn(
+            "next_part_power set in policy 'one'. Skipping", warnings)
+
+
 if __name__ == '__main__':
     unittest.main()