Merge "Send correct number of X-Delete-At-* headers"
This commit is contained in:
commit
9e6e9fd1bf
@ -391,15 +391,10 @@ class ObjectController(BaseStorageServer):
|
||||
if op != 'DELETE':
|
||||
delete_at_container = headers_in.get('X-Delete-At-Container', None)
|
||||
if not delete_at_container:
|
||||
self.logger.warning(
|
||||
'X-Delete-At-Container header must be specified for '
|
||||
'expiring objects background %s to work properly. Making '
|
||||
'best guess as to the container name for now.' % op)
|
||||
# TODO(gholt): In a future release, change the above warning to
|
||||
# a raised exception and remove the guess code below.
|
||||
delete_at_container = get_expirer_container(
|
||||
delete_at, self.expiring_objects_container_divisor,
|
||||
account, container, obj)
|
||||
# If header is missing, no update needed as sufficient other
|
||||
# object servers should perform the required update.
|
||||
return
|
||||
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
hosts = headers_in.get('X-Delete-At-Host', '')
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
|
@ -325,15 +325,30 @@ class BaseObjectController(Controller):
|
||||
headers[index].get('X-Container-Device'),
|
||||
container['device'])
|
||||
|
||||
def set_delete_at_headers(index, delete_at_node):
|
||||
headers[index]['X-Delete-At-Container'] = delete_at_container
|
||||
headers[index]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[index]['X-Delete-At-Host'] = csv_append(
|
||||
headers[index].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % delete_at_node)
|
||||
headers[index]['X-Delete-At-Device'] = csv_append(
|
||||
headers[index].get('X-Delete-At-Device'),
|
||||
delete_at_node['device'])
|
||||
|
||||
n_updates_needed = num_container_updates(
|
||||
len(containers), quorum_size(len(containers)),
|
||||
n_outgoing, policy.quorum)
|
||||
|
||||
container_iter = itertools.cycle(containers)
|
||||
dan_iter = itertools.cycle(delete_at_nodes or [])
|
||||
existing_updates = 0
|
||||
while existing_updates < n_updates_needed:
|
||||
set_container_update(existing_updates % n_outgoing,
|
||||
next(container_iter))
|
||||
index = existing_updates % n_outgoing
|
||||
set_container_update(index, next(container_iter))
|
||||
if delete_at_nodes:
|
||||
# We reverse the index in order to distribute the updates
|
||||
# across all nodes.
|
||||
set_delete_at_headers(n_outgoing - 1 - index, next(dan_iter))
|
||||
existing_updates += 1
|
||||
|
||||
# Keep the number of expirer-queue deletes to a reasonable number.
|
||||
@ -360,18 +375,6 @@ class BaseObjectController(Controller):
|
||||
headers[i]['X-Backend-Clean-Expiring-Object-Queue'] = (
|
||||
't' if i < n_desired_queue_updates else 'f')
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Delete-At-Container'] = delete_at_container
|
||||
headers[i]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[i]['X-Delete-At-Host'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % node)
|
||||
headers[i]['X-Delete-At-Device'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Device'),
|
||||
node['device'])
|
||||
|
||||
return headers
|
||||
|
||||
def _get_conn_response(self, putter, path, logger_thread_locals,
|
||||
|
@ -5612,11 +5612,7 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('warning'),
|
||||
['X-Delete-At-Container header must be specified for expiring '
|
||||
'objects background PUT to work properly. Making best guess as '
|
||||
'to the container name for now.'])
|
||||
self.assertEqual(given_args, [])
|
||||
|
||||
def test_delete_at_update_delete(self):
|
||||
policy = random.choice(list(POLICIES))
|
||||
|
@ -34,7 +34,7 @@ from six.moves import range
|
||||
import swift
|
||||
from swift.common import utils, swob, exceptions
|
||||
from swift.common.exceptions import ChunkWriteTimeout
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.utils import Timestamp, list_from_csv
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers import obj
|
||||
from swift.proxy.controllers.base import \
|
||||
@ -787,6 +787,111 @@ class BaseObjectControllerMixin(object):
|
||||
n_expected_updates, self.replicas(policy))
|
||||
self.assertEqual(n_expected_updates, n_container_updates)
|
||||
|
||||
def test_delete_at_backend_requests(self):
|
||||
t = str(int(time.time() + 100))
|
||||
for policy in POLICIES:
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT',
|
||||
headers={'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Delete-At': t})
|
||||
controller = self.controller_cls(self.app, 'a', 'c', 'o')
|
||||
|
||||
for num_del_at_nodes in range(1, 16):
|
||||
containers = [
|
||||
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
|
||||
'device': 'sdc'} for i in range(num_del_at_nodes)]
|
||||
del_at_nodes = [
|
||||
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
|
||||
'device': 'sdb'} for i in range(num_del_at_nodes)]
|
||||
|
||||
backend_headers = controller._backend_requests(
|
||||
req, self.replicas(policy), 1, containers,
|
||||
delete_at_container='dac', delete_at_partition=2,
|
||||
delete_at_nodes=del_at_nodes)
|
||||
|
||||
devices = []
|
||||
hosts = []
|
||||
part = ctr = 0
|
||||
for given_headers in backend_headers:
|
||||
self.assertEqual(given_headers.get('X-Delete-At'), t)
|
||||
if 'X-Delete-At-Partition' in given_headers:
|
||||
self.assertEqual(
|
||||
given_headers.get('X-Delete-At-Partition'), '2')
|
||||
part += 1
|
||||
if 'X-Delete-At-Container' in given_headers:
|
||||
self.assertEqual(
|
||||
given_headers.get('X-Delete-At-Container'), 'dac')
|
||||
ctr += 1
|
||||
devices += (
|
||||
list_from_csv(given_headers.get('X-Delete-At-Device')))
|
||||
hosts += (
|
||||
list_from_csv(given_headers.get('X-Delete-At-Host')))
|
||||
|
||||
# same as in test_container_update_backend_requests
|
||||
n_can_fail = self.replicas(policy) - self.quorum(policy)
|
||||
n_expected_updates = (
|
||||
n_can_fail + utils.quorum_size(num_del_at_nodes))
|
||||
|
||||
n_expected_hosts = max(
|
||||
n_expected_updates, num_del_at_nodes)
|
||||
|
||||
self.assertEqual(len(hosts), n_expected_hosts)
|
||||
self.assertEqual(len(devices), n_expected_hosts)
|
||||
|
||||
# parts don't get doubled up, maximum is count of obj requests
|
||||
n_expected_parts = min(
|
||||
n_expected_hosts, self.replicas(policy))
|
||||
self.assertEqual(part, n_expected_parts)
|
||||
self.assertEqual(ctr, n_expected_parts)
|
||||
|
||||
# check that hosts are correct
|
||||
self.assertEqual(
|
||||
set(hosts),
|
||||
set('%s:%s' % (h['ip'], h['port']) for h in del_at_nodes))
|
||||
self.assertEqual(set(devices), set(('sdb',)))
|
||||
|
||||
def test_smooth_distributed_backend_requests(self):
|
||||
t = str(int(time.time() + 100))
|
||||
for policy in POLICIES:
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT',
|
||||
headers={'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Delete-At': t})
|
||||
controller = self.controller_cls(self.app, 'a', 'c', 'o')
|
||||
|
||||
for num_containers in range(1, 16):
|
||||
containers = [
|
||||
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
|
||||
'device': 'sdc'} for i in range(num_containers)]
|
||||
del_at_nodes = [
|
||||
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
|
||||
'device': 'sdb'} for i in range(num_containers)]
|
||||
|
||||
backend_headers = controller._backend_requests(
|
||||
req, self.replicas(policy), 1, containers,
|
||||
delete_at_container='dac', delete_at_partition=2,
|
||||
delete_at_nodes=del_at_nodes)
|
||||
|
||||
# caculate no of expected updates, see
|
||||
# test_container_update_backend_requests for explanation
|
||||
n_expected_updates = min(max(
|
||||
self.replicas(policy) - self.quorum(policy) +
|
||||
utils.quorum_size(num_containers), num_containers),
|
||||
self.replicas(policy))
|
||||
|
||||
# the first n_expected_updates servers should have received
|
||||
# a container update
|
||||
self.assertTrue(
|
||||
all([h.get('X-Container-Partition')
|
||||
for h in backend_headers[:n_expected_updates]]))
|
||||
# the last n_expected_updates servers should have received
|
||||
# the x-delete-at* headers
|
||||
self.assertTrue(
|
||||
all([h.get('X-Delete-At-Container')
|
||||
for h in backend_headers[-n_expected_updates:]]))
|
||||
|
||||
def _check_write_affinity(
|
||||
self, conf, policy_conf, policy, affinity_regions, affinity_count):
|
||||
conf['policy_config'] = policy_conf
|
||||
|
@ -5486,9 +5486,9 @@ class TestReplicatedObjectController(
|
||||
'X-Delete-At-Partition': '0',
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': None,
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Partition': None,
|
||||
'X-Delete-At-Device': None}
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Device': None},
|
||||
])
|
||||
|
||||
@mock.patch('time.time', new=lambda: STATIC_TIME)
|
||||
|
Loading…
x
Reference in New Issue
Block a user