Merge "Fix EC ring validation at ring reload"
This commit is contained in:
commit
0944753b37
@ -145,6 +145,10 @@ class LockTimeout(MessageTimeout):
|
||||
pass
|
||||
|
||||
|
||||
class RingLoadError(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class RingBuilderError(SwiftException):
|
||||
pass
|
||||
|
||||
|
@ -29,6 +29,7 @@ from tempfile import NamedTemporaryFile
|
||||
|
||||
from six.moves import range
|
||||
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from swift.common.utils import hash_path, validate_configuration
|
||||
from swift.common.ring.utils import tiers_for_dev
|
||||
|
||||
@ -156,9 +157,14 @@ class Ring(object):
|
||||
|
||||
:param serialized_path: path to serialized RingData instance
|
||||
:param reload_time: time interval in seconds to check for a ring change
|
||||
:param ring_name: ring name string (basically specified from policy)
|
||||
:param validation_hook: hook point to validate ring configuration ontime
|
||||
|
||||
:raises: RingLoadError if the loaded ring data violates its constraint
|
||||
"""
|
||||
|
||||
def __init__(self, serialized_path, reload_time=15, ring_name=None):
|
||||
def __init__(self, serialized_path, reload_time=15, ring_name=None,
|
||||
validation_hook=lambda ring_data: None):
|
||||
# can't use the ring unless HASH_PATH_SUFFIX is set
|
||||
validate_configuration()
|
||||
if ring_name:
|
||||
@ -167,12 +173,24 @@ class Ring(object):
|
||||
else:
|
||||
self.serialized_path = os.path.join(serialized_path)
|
||||
self.reload_time = reload_time
|
||||
self._validation_hook = validation_hook
|
||||
self._reload(force=True)
|
||||
|
||||
def _reload(self, force=False):
|
||||
self._rtime = time() + self.reload_time
|
||||
if force or self.has_changed():
|
||||
ring_data = RingData.load(self.serialized_path)
|
||||
|
||||
try:
|
||||
self._validation_hook(ring_data)
|
||||
except RingLoadError:
|
||||
if force:
|
||||
raise
|
||||
else:
|
||||
# In runtime reload at working server, it's ok to use old
|
||||
# ring data if the new ring data is invalid.
|
||||
return
|
||||
|
||||
self._mtime = getmtime(self.serialized_path)
|
||||
self._devs = ring_data.devs
|
||||
# NOTE(akscram): Replication parameters like replication_ip
|
||||
|
@ -21,7 +21,7 @@ from swift.common.utils import (
|
||||
config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv)
|
||||
from swift.common.ring import Ring, RingData
|
||||
from swift.common.utils import quorum_size
|
||||
from swift.common.exceptions import RingValidationError
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
|
||||
|
||||
LEGACY_POLICY_NAME = 'Policy-0'
|
||||
@ -350,13 +350,6 @@ class BaseStoragePolicy(object):
|
||||
self._validate_policy_name(name)
|
||||
self.alias_list.insert(0, name)
|
||||
|
||||
def _validate_ring(self):
|
||||
"""
|
||||
Hook, called when the ring is loaded. Can be used to
|
||||
validate the ring against the StoragePolicy configuration.
|
||||
"""
|
||||
pass
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
@ -367,9 +360,6 @@ class BaseStoragePolicy(object):
|
||||
return
|
||||
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
|
||||
|
||||
# Validate ring to make sure it conforms to policy requirements
|
||||
self._validate_ring()
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
"""
|
||||
@ -552,25 +542,6 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
info.pop('ec_type')
|
||||
return info
|
||||
|
||||
def _validate_ring(self):
|
||||
"""
|
||||
EC specific validation
|
||||
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is considering
|
||||
the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
if not self.object_ring:
|
||||
raise PolicyError('Ring is not loaded')
|
||||
nodes_configured = self.object_ring.replica_count
|
||||
if nodes_configured != (self.ec_ndata + self.ec_nparity):
|
||||
raise RingValidationError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d nodes. Got %d.' % (
|
||||
self.name, self.ec_ndata + self.ec_nparity,
|
||||
nodes_configured))
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
"""
|
||||
@ -593,6 +564,37 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
"""
|
||||
return self._ec_quorum_size
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
|
||||
:param swift_dir: path to rings
|
||||
"""
|
||||
if self.object_ring:
|
||||
return
|
||||
|
||||
def validate_ring_data(ring_data):
|
||||
"""
|
||||
EC specific validation
|
||||
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is
|
||||
considering the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
|
||||
nodes_configured = len(ring_data._replica2part2dev_id)
|
||||
if nodes_configured != (self.ec_ndata + self.ec_nparity):
|
||||
raise RingLoadError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d replicas. Got %d.' % (
|
||||
self.name, self.ec_ndata + self.ec_nparity,
|
||||
nodes_configured))
|
||||
|
||||
self.object_ring = Ring(
|
||||
swift_dir, ring_name=self.ring_name,
|
||||
validation_hook=validate_ring_data)
|
||||
|
||||
|
||||
class StoragePolicyCollection(object):
|
||||
"""
|
||||
|
@ -26,7 +26,7 @@ from swift.common.storage_policy import (
|
||||
BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY,
|
||||
VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE, BindPortsCache)
|
||||
from swift.common.ring import RingData
|
||||
from swift.common.exceptions import RingValidationError
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from pyeclib.ec_iface import ECDriver
|
||||
|
||||
|
||||
@ -1146,23 +1146,32 @@ class TestStoragePolicies(unittest.TestCase):
|
||||
test_policies = [
|
||||
ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=8, ec_nparity=2,
|
||||
object_ring=FakeRing(replicas=8),
|
||||
is_default=True),
|
||||
ECStoragePolicy(1, 'ec10-4', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=10, ec_nparity=4,
|
||||
object_ring=FakeRing(replicas=10)),
|
||||
ec_ndata=10, ec_nparity=4),
|
||||
ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=4, ec_nparity=2,
|
||||
object_ring=FakeRing(replicas=7)),
|
||||
ec_ndata=4, ec_nparity=2),
|
||||
]
|
||||
actual_load_ring_replicas = [8, 10, 7]
|
||||
policies = StoragePolicyCollection(test_policies)
|
||||
|
||||
for policy in policies:
|
||||
msg = 'EC ring for policy %s needs to be configured with ' \
|
||||
'exactly %d nodes.' % \
|
||||
(policy.name, policy.ec_ndata + policy.ec_nparity)
|
||||
self.assertRaisesWithMessage(RingValidationError, msg,
|
||||
policy._validate_ring)
|
||||
def create_mock_ring_data(num_replica):
|
||||
class mock_ring_data_klass(object):
|
||||
def __init__(self):
|
||||
self._replica2part2dev_id = [0] * num_replica
|
||||
|
||||
return mock_ring_data_klass()
|
||||
|
||||
for policy, ring_replicas in zip(policies, actual_load_ring_replicas):
|
||||
with mock.patch('swift.common.ring.ring.RingData.load',
|
||||
return_value=create_mock_ring_data(ring_replicas)):
|
||||
with mock.patch(
|
||||
'swift.common.ring.ring.validate_configuration'):
|
||||
msg = 'EC ring for policy %s needs to be configured with ' \
|
||||
'exactly %d replicas.' % \
|
||||
(policy.name, policy.ec_ndata + policy.ec_nparity)
|
||||
self.assertRaisesWithMessage(RingLoadError, msg,
|
||||
policy.load_ring, 'mock')
|
||||
|
||||
def test_storage_policy_get_info(self):
|
||||
test_policies = [
|
||||
|
@ -51,6 +51,9 @@ class TestObjectController(test_server.TestObjectController):
|
||||
def test_PUT_ec_fragment_archive_etag_mismatch(self):
|
||||
pass
|
||||
|
||||
def test_reload_ring_ec(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestContainerController(test_server.TestContainerController):
|
||||
pass
|
||||
|
@ -24,7 +24,7 @@ import sys
|
||||
import traceback
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
from shutil import rmtree
|
||||
from shutil import rmtree, copyfile
|
||||
import gc
|
||||
import time
|
||||
from textwrap import dedent
|
||||
@ -3018,6 +3018,96 @@ class TestObjectController(unittest.TestCase):
|
||||
test_content_type('test.css', iter(['', '', 'text/css',
|
||||
'text/css', 'text/css']))
|
||||
|
||||
@unpatch_policies
|
||||
def test_reload_ring_ec(self):
|
||||
policy = POLICIES[3]
|
||||
self.put_container("ec", "ec-con")
|
||||
|
||||
orig_rtime = policy.object_ring._rtime
|
||||
# save original file as back up
|
||||
copyfile(policy.object_ring.serialized_path,
|
||||
policy.object_ring.serialized_path + '.bak')
|
||||
|
||||
try:
|
||||
# overwrite with 2 replica, 2 devices ring
|
||||
obj_devs = []
|
||||
obj_devs.append(
|
||||
{'port': _test_sockets[-3].getsockname()[1],
|
||||
'device': 'sdg1'})
|
||||
obj_devs.append(
|
||||
{'port': _test_sockets[-2].getsockname()[1],
|
||||
'device': 'sdh1'})
|
||||
write_fake_ring(policy.object_ring.serialized_path,
|
||||
*obj_devs)
|
||||
|
||||
def get_ring_reloaded_response(method):
|
||||
# force to reload at the request
|
||||
policy.object_ring._rtime = 0
|
||||
|
||||
trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method,
|
||||
'Host: localhost\r\n',
|
||||
'Connection: close\r\n',
|
||||
'X-Storage-Token: t\r\n']
|
||||
|
||||
if method == 'PUT':
|
||||
# small, so we don't get multiple EC stripes
|
||||
obj = 'abCD' * 10
|
||||
|
||||
extra_trans_data = [
|
||||
'Etag: "%s"\r\n' % md5(obj).hexdigest(),
|
||||
'Content-Length: %d\r\n' % len(obj),
|
||||
'Content-Type: application/octet-stream\r\n',
|
||||
'\r\n%s' % obj
|
||||
]
|
||||
trans_data.extend(extra_trans_data)
|
||||
else:
|
||||
trans_data.append('\r\n')
|
||||
|
||||
prolis = _test_sockets[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
fd.write(''.join(trans_data))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
|
||||
# use older ring with rollbacking
|
||||
return headers
|
||||
|
||||
for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'):
|
||||
headers = get_ring_reloaded_response(method)
|
||||
exp = 'HTTP/1.1 20'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# proxy didn't load newest ring, use older one
|
||||
self.assertEqual(3, policy.object_ring.replica_count)
|
||||
|
||||
if method == 'POST':
|
||||
# Take care fast post here!
|
||||
orig_post_as_copy = getattr(
|
||||
_test_servers[0], 'object_post_as_copy', None)
|
||||
try:
|
||||
_test_servers[0].object_post_as_copy = False
|
||||
with mock.patch.object(
|
||||
_test_servers[0],
|
||||
'object_post_as_copy', False):
|
||||
headers = get_ring_reloaded_response(method)
|
||||
finally:
|
||||
if orig_post_as_copy is None:
|
||||
del _test_servers[0].object_post_as_copy
|
||||
else:
|
||||
_test_servers[0].object_post_as_copy = \
|
||||
orig_post_as_copy
|
||||
|
||||
exp = 'HTTP/1.1 20'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
# sanity
|
||||
self.assertEqual(3, policy.object_ring.replica_count)
|
||||
|
||||
finally:
|
||||
policy.object_ring._rtime = orig_rtime
|
||||
os.rename(policy.object_ring.serialized_path + '.bak',
|
||||
policy.object_ring.serialized_path)
|
||||
|
||||
def test_custom_mime_types_files(self):
|
||||
swift_dir = mkdtemp()
|
||||
try:
|
||||
|
Loading…
x
Reference in New Issue
Block a user