Merge "Refactor write_affinity DELETE handling"
This commit is contained in:
commit
43b22a30aa
@ -697,7 +697,8 @@ class BaseObjectController(Controller):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def _delete_object(self, req, obj_ring, partition, headers):
|
def _delete_object(self, req, obj_ring, partition, headers,
|
||||||
|
node_count=None, node_iterator=None):
|
||||||
"""Delete object considering write-affinity.
|
"""Delete object considering write-affinity.
|
||||||
|
|
||||||
When deleting object in write affinity deployment, also take configured
|
When deleting object in write affinity deployment, also take configured
|
||||||
@ -711,37 +712,12 @@ class BaseObjectController(Controller):
|
|||||||
:param headers: system headers to storage nodes
|
:param headers: system headers to storage nodes
|
||||||
:return: Response object
|
:return: Response object
|
||||||
"""
|
"""
|
||||||
policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
|
|
||||||
policy = POLICIES.get_by_index(policy_index)
|
|
||||||
|
|
||||||
node_count = None
|
|
||||||
node_iterator = None
|
|
||||||
|
|
||||||
policy_options = self.app.get_policy_options(policy)
|
|
||||||
is_local = policy_options.write_affinity_is_local_fn
|
|
||||||
if is_local is not None:
|
|
||||||
primaries = obj_ring.get_part_nodes(partition)
|
|
||||||
node_count = len(primaries)
|
|
||||||
|
|
||||||
local_handoffs = policy_options.write_affinity_handoff_delete_count
|
|
||||||
if local_handoffs is None:
|
|
||||||
local_primaries = [node for node in primaries
|
|
||||||
if is_local(node)]
|
|
||||||
local_handoffs = len(primaries) - len(local_primaries)
|
|
||||||
|
|
||||||
node_count += local_handoffs
|
|
||||||
|
|
||||||
node_iterator = self.iter_nodes_local_first(
|
|
||||||
obj_ring, partition, policy=policy, local_handoffs_first=True
|
|
||||||
)
|
|
||||||
|
|
||||||
status_overrides = {404: 204}
|
status_overrides = {404: 204}
|
||||||
resp = self.make_requests(req, obj_ring,
|
resp = self.make_requests(req, obj_ring,
|
||||||
partition, 'DELETE', req.swift_entity_path,
|
partition, 'DELETE', req.swift_entity_path,
|
||||||
headers, overrides=status_overrides,
|
headers, overrides=status_overrides,
|
||||||
node_count=node_count,
|
node_count=node_count,
|
||||||
node_iterator=node_iterator)
|
node_iterator=node_iterator)
|
||||||
|
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def _post_object(self, req, obj_ring, partition, headers):
|
def _post_object(self, req, obj_ring, partition, headers):
|
||||||
@ -861,6 +837,7 @@ class BaseObjectController(Controller):
|
|||||||
|
|
||||||
# Include local handoff nodes if write-affinity is enabled.
|
# Include local handoff nodes if write-affinity is enabled.
|
||||||
node_count = len(nodes)
|
node_count = len(nodes)
|
||||||
|
node_iterator = None
|
||||||
policy = POLICIES.get_by_index(policy_index)
|
policy = POLICIES.get_by_index(policy_index)
|
||||||
policy_options = self.app.get_policy_options(policy)
|
policy_options = self.app.get_policy_options(policy)
|
||||||
is_local = policy_options.write_affinity_is_local_fn
|
is_local = policy_options.write_affinity_is_local_fn
|
||||||
@ -870,11 +847,16 @@ class BaseObjectController(Controller):
|
|||||||
local_primaries = [node for node in nodes if is_local(node)]
|
local_primaries = [node for node in nodes if is_local(node)]
|
||||||
local_handoffs = len(nodes) - len(local_primaries)
|
local_handoffs = len(nodes) - len(local_primaries)
|
||||||
node_count += local_handoffs
|
node_count += local_handoffs
|
||||||
|
node_iterator = self.iter_nodes_local_first(
|
||||||
|
obj_ring, partition, policy=policy, local_handoffs_first=True
|
||||||
|
)
|
||||||
|
|
||||||
headers = self._backend_requests(
|
headers = self._backend_requests(
|
||||||
req, node_count, container_partition, container_nodes,
|
req, node_count, container_partition, container_nodes,
|
||||||
container_path=container_path)
|
container_path=container_path)
|
||||||
return self._delete_object(req, obj_ring, partition, headers)
|
return self._delete_object(req, obj_ring, partition, headers,
|
||||||
|
node_count=node_count,
|
||||||
|
node_iterator=node_iterator)
|
||||||
|
|
||||||
|
|
||||||
@ObjectControllerRouter.register(REPL_POLICY)
|
@ObjectControllerRouter.register(REPL_POLICY)
|
||||||
|
@ -531,7 +531,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
|||||||
self.assertNotIn('X-Delete-At-Host', headers)
|
self.assertNotIn('X-Delete-At-Host', headers)
|
||||||
self.assertNotIn('X-Delete-At-Device', headers)
|
self.assertNotIn('X-Delete-At-Device', headers)
|
||||||
|
|
||||||
def test_DELETE_write_affinity_before_replication(self):
|
def test_DELETE_write_affinity_after_replication(self):
|
||||||
policy_conf = self.app.get_policy_options(self.policy)
|
policy_conf = self.app.get_policy_options(self.policy)
|
||||||
policy_conf.write_affinity_handoff_delete_count = self.replicas() // 2
|
policy_conf.write_affinity_handoff_delete_count = self.replicas() // 2
|
||||||
policy_conf.write_affinity_is_local_fn = (
|
policy_conf.write_affinity_is_local_fn = (
|
||||||
@ -545,7 +545,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
|||||||
|
|
||||||
self.assertEqual(resp.status_int, 204)
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
def test_DELETE_write_affinity_after_replication(self):
|
def test_DELETE_write_affinity_before_replication(self):
|
||||||
policy_conf = self.app.get_policy_options(self.policy)
|
policy_conf = self.app.get_policy_options(self.policy)
|
||||||
policy_conf.write_affinity_handoff_delete_count = self.replicas() // 2
|
policy_conf.write_affinity_handoff_delete_count = self.replicas() // 2
|
||||||
policy_conf.write_affinity_is_local_fn = (
|
policy_conf.write_affinity_is_local_fn = (
|
||||||
@ -1416,6 +1416,36 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
self.assertIn(req.swift_entity_path.decode('utf-8'), log_lines[0])
|
self.assertIn(req.swift_entity_path.decode('utf-8'), log_lines[0])
|
||||||
self.assertIn('ERROR with Object server', log_lines[0])
|
self.assertIn('ERROR with Object server', log_lines[0])
|
||||||
|
|
||||||
|
def test_DELETE_with_write_affinity(self):
|
||||||
|
policy_conf = self.app.get_policy_options(self.policy)
|
||||||
|
policy_conf.write_affinity_handoff_delete_count = self.replicas() // 2
|
||||||
|
policy_conf.write_affinity_is_local_fn = (
|
||||||
|
lambda node: node['region'] == 1)
|
||||||
|
|
||||||
|
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||||
|
|
||||||
|
codes = [204, 204, 404, 204]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
codes = [204, 404, 404, 204]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
policy_conf.write_affinity_handoff_delete_count = 2
|
||||||
|
|
||||||
|
codes = [204, 204, 404, 204, 404]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
codes = [204, 404, 404, 204, 204]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
def test_PUT_error_during_transfer_data(self):
|
def test_PUT_error_during_transfer_data(self):
|
||||||
class FakeReader(object):
|
class FakeReader(object):
|
||||||
def read(self, size):
|
def read(self, size):
|
||||||
@ -1816,15 +1846,39 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
|
|
||||||
@patch_policies(
|
@patch_policies(
|
||||||
[StoragePolicy(0, '1-replica', True),
|
[StoragePolicy(0, '1-replica', True),
|
||||||
StoragePolicy(1, '5-replica', False),
|
StoragePolicy(1, '4-replica', False),
|
||||||
StoragePolicy(2, '8-replica', False),
|
StoragePolicy(2, '8-replica', False),
|
||||||
StoragePolicy(3, '15-replica', False)],
|
StoragePolicy(3, '15-replica', False)],
|
||||||
fake_ring_args=[
|
fake_ring_args=[
|
||||||
{'replicas': 1}, {'replicas': 5}, {'replicas': 8}, {'replicas': 15}])
|
{'replicas': 1}, {'replicas': 4}, {'replicas': 8}, {'replicas': 15}])
|
||||||
class TestReplicatedObjControllerVariousReplicas(CommonObjectControllerMixin,
|
class TestReplicatedObjControllerVariousReplicas(CommonObjectControllerMixin,
|
||||||
unittest.TestCase):
|
unittest.TestCase):
|
||||||
controller_cls = obj.ReplicatedObjectController
|
controller_cls = obj.ReplicatedObjectController
|
||||||
|
|
||||||
|
def test_DELETE_with_write_affinity(self):
|
||||||
|
policy_index = 1
|
||||||
|
self.policy = POLICIES[policy_index]
|
||||||
|
policy_conf = self.app.get_policy_options(self.policy)
|
||||||
|
self.app.container_info['storage_policy'] = policy_index
|
||||||
|
policy_conf.write_affinity_handoff_delete_count = \
|
||||||
|
self.replicas(self.policy) // 2
|
||||||
|
policy_conf.write_affinity_is_local_fn = (
|
||||||
|
lambda node: node['region'] == 1)
|
||||||
|
|
||||||
|
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||||
|
|
||||||
|
codes = [204, 204, 404, 404, 204, 204]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
policy_conf.write_affinity_handoff_delete_count = 1
|
||||||
|
|
||||||
|
codes = [204, 204, 404, 404, 204]
|
||||||
|
with mocked_http_conn(*codes):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
|
||||||
@patch_policies()
|
@patch_policies()
|
||||||
class TestReplicatedObjControllerMimePutter(BaseObjectControllerMixin,
|
class TestReplicatedObjControllerMimePutter(BaseObjectControllerMixin,
|
||||||
|
Loading…
Reference in New Issue
Block a user