Send correct number of X-Delete-At-* headers
Send just as many requests with X-Delete-At-* as we do X-Container-* to the object server. Furthermore, stop the object server from making an update to the expirer queue when it wasn't told to do so, and remove the log warning which would have been produced. Reason: It can be the case that the number of object replicas (OR) is larger than the number of container replicas (CR) for a given storage policy (most likely in the case of EC). Before this commit, only CR object servers received the x-delete-at-* headers, which means that OR - CR object servers did not receive the headers. The servers missing the header would produce a log warning and create the x-delete-at-container header and async update on their own, which could lead to a bug if the expiring_objects_container_divisor option was misconfigured. Change-Id: I20fc2f42f590fda995814a2fa7ba86019f9fddc1 Closes-Bug: #1733588
This commit is contained in:
parent
f994d99009
commit
d8f9045518
@ -391,15 +391,10 @@ class ObjectController(BaseStorageServer):
|
||||
if op != 'DELETE':
|
||||
delete_at_container = headers_in.get('X-Delete-At-Container', None)
|
||||
if not delete_at_container:
|
||||
self.logger.warning(
|
||||
'X-Delete-At-Container header must be specified for '
|
||||
'expiring objects background %s to work properly. Making '
|
||||
'best guess as to the container name for now.' % op)
|
||||
# TODO(gholt): In a future release, change the above warning to
|
||||
# a raised exception and remove the guess code below.
|
||||
delete_at_container = get_expirer_container(
|
||||
delete_at, self.expiring_objects_container_divisor,
|
||||
account, container, obj)
|
||||
# If header is missing, no update needed as sufficient other
|
||||
# object servers should perform the required update.
|
||||
return
|
||||
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
hosts = headers_in.get('X-Delete-At-Host', '')
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
|
@ -325,15 +325,30 @@ class BaseObjectController(Controller):
|
||||
headers[index].get('X-Container-Device'),
|
||||
container['device'])
|
||||
|
||||
def set_delete_at_headers(index, delete_at_node):
    # Stamp the expirer-queue bookkeeping headers onto the backend
    # request at `index`, pointing it at `delete_at_node`.  Host and
    # device are csv-appended because one backend request may carry
    # updates for several expirer-queue nodes.
    # NOTE(review): relies on `headers`, `delete_at_container` and
    # `delete_at_partition` from the enclosing scope.
    hdrs = headers[index]
    hdrs['X-Delete-At-Container'] = delete_at_container
    hdrs['X-Delete-At-Partition'] = delete_at_partition
    hdrs['X-Delete-At-Host'] = csv_append(
        hdrs.get('X-Delete-At-Host'),
        '%(ip)s:%(port)s' % delete_at_node)
    hdrs['X-Delete-At-Device'] = csv_append(
        hdrs.get('X-Delete-At-Device'),
        delete_at_node['device'])
|
||||
|
||||
n_updates_needed = num_container_updates(
|
||||
len(containers), quorum_size(len(containers)),
|
||||
n_outgoing, policy.quorum)
|
||||
|
||||
container_iter = itertools.cycle(containers)
|
||||
dan_iter = itertools.cycle(delete_at_nodes or [])
|
||||
existing_updates = 0
|
||||
while existing_updates < n_updates_needed:
|
||||
set_container_update(existing_updates % n_outgoing,
|
||||
next(container_iter))
|
||||
index = existing_updates % n_outgoing
|
||||
set_container_update(index, next(container_iter))
|
||||
if delete_at_nodes:
|
||||
# We reverse the index in order to distribute the updates
|
||||
# across all nodes.
|
||||
set_delete_at_headers(n_outgoing - 1 - index, next(dan_iter))
|
||||
existing_updates += 1
|
||||
|
||||
# Keep the number of expirer-queue deletes to a reasonable number.
|
||||
@ -360,18 +375,6 @@ class BaseObjectController(Controller):
|
||||
headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = (
|
||||
't' if i < n_desired_queue_updates else 'f')
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Delete-At-Container'] = delete_at_container
|
||||
headers[i]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[i]['X-Delete-At-Host'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % node)
|
||||
headers[i]['X-Delete-At-Device'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Device'),
|
||||
node['device'])
|
||||
|
||||
return headers
|
||||
|
||||
def _get_conn_response(self, putter, path, logger_thread_locals,
|
||||
|
@ -5582,11 +5582,7 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('warning'),
|
||||
['X-Delete-At-Container header must be specified for expiring '
|
||||
'objects background PUT to work properly. Making best guess as '
|
||||
'to the container name for now.'])
|
||||
self.assertEqual(given_args, [])
|
||||
|
||||
def test_delete_at_update_delete(self):
|
||||
policy = random.choice(list(POLICIES))
|
||||
|
@ -34,7 +34,7 @@ from six.moves import range
|
||||
import swift
|
||||
from swift.common import utils, swob, exceptions
|
||||
from swift.common.exceptions import ChunkWriteTimeout
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.utils import Timestamp, list_from_csv
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers import obj
|
||||
from swift.proxy.controllers.base import \
|
||||
@ -787,6 +787,111 @@ class BaseObjectControllerMixin(object):
|
||||
n_expected_updates, self.replicas(policy))
|
||||
self.assertEqual(n_expected_updates, n_container_updates)
|
||||
|
||||
def test_delete_at_backend_requests(self):
    """Backend PUT requests must carry X-Delete-At-* headers for
    enough object servers to cover the expirer-queue update quorum,
    and those headers must reference the given delete-at nodes."""
    expiry = str(int(time.time() + 100))
    for policy in POLICIES:
        req = swift.common.swob.Request.blank(
            '/v1/a/c/o', method='PUT',
            headers={'Content-Length': '0',
                     'X-Backend-Storage-Policy-Index': int(policy),
                     'X-Delete-At': expiry})
        controller = self.controller_cls(self.app, 'a', 'c', 'o')

        for num_del_at_nodes in range(1, 16):
            containers = [
                {'ip': '2.0.0.%s' % i,
                 'port': '70%s' % str(i).zfill(2),
                 'device': 'sdc'}
                for i in range(num_del_at_nodes)]
            del_at_nodes = [
                {'ip': '1.0.0.%s' % i,
                 'port': '60%s' % str(i).zfill(2),
                 'device': 'sdb'}
                for i in range(num_del_at_nodes)]

            backend_headers = controller._backend_requests(
                req, self.replicas(policy), 1, containers,
                delete_at_container='dac', delete_at_partition=2,
                delete_at_nodes=del_at_nodes)

            seen_devices = []
            seen_hosts = []
            n_partitions = 0
            n_containers = 0
            for hdrs in backend_headers:
                self.assertEqual(hdrs.get('X-Delete-At'), expiry)
                if 'X-Delete-At-Partition' in hdrs:
                    self.assertEqual(
                        hdrs.get('X-Delete-At-Partition'), '2')
                    n_partitions += 1
                if 'X-Delete-At-Container' in hdrs:
                    self.assertEqual(
                        hdrs.get('X-Delete-At-Container'), 'dac')
                    n_containers += 1
                seen_devices.extend(
                    list_from_csv(hdrs.get('X-Delete-At-Device')))
                seen_hosts.extend(
                    list_from_csv(hdrs.get('X-Delete-At-Host')))

            # same as in test_container_update_backend_requests
            n_can_fail = self.replicas(policy) - self.quorum(policy)
            n_expected_updates = (
                n_can_fail + utils.quorum_size(num_del_at_nodes))
            n_expected_hosts = max(
                n_expected_updates, num_del_at_nodes)

            self.assertEqual(len(seen_hosts), n_expected_hosts)
            self.assertEqual(len(seen_devices), n_expected_hosts)

            # partition/container headers aren't doubled up; they're
            # capped by the number of object requests
            n_expected_parts = min(
                n_expected_hosts, self.replicas(policy))
            self.assertEqual(n_partitions, n_expected_parts)
            self.assertEqual(n_containers, n_expected_parts)

            # every delete-at node shows up, and only its device
            self.assertEqual(
                set(seen_hosts),
                set('%s:%s' % (h['ip'], h['port'])
                    for h in del_at_nodes))
            self.assertEqual(set(seen_devices), {'sdb'})
|
||||
|
||||
def test_smooth_distributed_backend_requests(self):
    """Container updates and X-Delete-At-* headers should be spread
    from opposite ends of the backend request list, so no single
    object server carries all of the bookkeeping work."""
    expiry = str(int(time.time() + 100))
    for policy in POLICIES:
        req = swift.common.swob.Request.blank(
            '/v1/a/c/o', method='PUT',
            headers={'Content-Length': '0',
                     'X-Backend-Storage-Policy-Index': int(policy),
                     'X-Delete-At': expiry})
        controller = self.controller_cls(self.app, 'a', 'c', 'o')

        for num_containers in range(1, 16):
            containers = [
                {'ip': '2.0.0.%s' % i,
                 'port': '70%s' % str(i).zfill(2),
                 'device': 'sdc'}
                for i in range(num_containers)]
            del_at_nodes = [
                {'ip': '1.0.0.%s' % i,
                 'port': '60%s' % str(i).zfill(2),
                 'device': 'sdb'}
                for i in range(num_containers)]

            backend_headers = controller._backend_requests(
                req, self.replicas(policy), 1, containers,
                delete_at_container='dac', delete_at_partition=2,
                delete_at_nodes=del_at_nodes)

            # calculate the number of expected updates; see
            # test_container_update_backend_requests for the rationale
            n_expected_updates = min(
                max(self.replicas(policy) - self.quorum(policy) +
                    utils.quorum_size(num_containers),
                    num_containers),
                self.replicas(policy))

            # the first n_expected_updates servers receive a
            # container update ...
            self.assertTrue(
                all(h.get('X-Container-Partition')
                    for h in backend_headers[:n_expected_updates]))
            # ... and the last n_expected_updates servers receive the
            # x-delete-at* headers
            self.assertTrue(
                all(h.get('X-Delete-At-Container')
                    for h in backend_headers[-n_expected_updates:]))
|
||||
|
||||
def _check_write_affinity(
|
||||
self, conf, policy_conf, policy, affinity_regions, affinity_count):
|
||||
conf['policy_config'] = policy_conf
|
||||
|
@ -5486,9 +5486,9 @@ class TestReplicatedObjectController(
|
||||
'X-Delete-At-Partition': '0',
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': None,
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Partition': None,
|
||||
'X-Delete-At-Device': None}
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Device': None},
|
||||
])
|
||||
|
||||
@mock.patch('time.time', new=lambda: STATIC_TIME)
|
||||
|
Loading…
x
Reference in New Issue
Block a user