Add Storage Policy support to the Account Reaper
Extract X-Storage-Policy-Index header from container listing request and use it when making direct object DELETE requests. DocImpact Implements: blueprint storage-policies Change-Id: Icd4b2611b4169e46f216ff9a9839af732971a2bf
This commit is contained in:
parent
8326dc9f2a
commit
2e1ea825aa
@ -31,6 +31,7 @@ from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, whataremyips, ismount, \
|
||||
config_true_value
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.storage_policy import POLICIES, POLICY_INDEX
|
||||
|
||||
|
||||
class AccountReaper(Daemon):
|
||||
@ -54,9 +55,9 @@ class AccountReaper(Daemon):
|
||||
configuration parameters.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='account-reaper')
|
||||
self.logger = logger or get_logger(conf, log_route='account-reaper')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.interval = int(conf.get('interval', 3600))
|
||||
@ -89,11 +90,14 @@ class AccountReaper(Daemon):
|
||||
self.container_ring = Ring(self.swift_dir, ring_name='container')
|
||||
return self.container_ring
|
||||
|
||||
def get_object_ring(self):
|
||||
"""The object :class:`swift.common.ring.Ring` for the cluster."""
|
||||
if not self.object_ring:
|
||||
self.object_ring = Ring(self.swift_dir, ring_name='object')
|
||||
return self.object_ring
|
||||
def get_object_ring(self, policy_idx):
|
||||
"""
|
||||
Get the ring identified by the policy index
|
||||
|
||||
:param policy_idx: Storage policy index
|
||||
:returns: A ring matching the storage policy
|
||||
"""
|
||||
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""Main entry point when running the reaper in normal daemon mode.
|
||||
@ -177,6 +181,15 @@ class AccountReaper(Daemon):
|
||||
not broker.empty():
|
||||
self.reap_account(broker, partition, nodes)
|
||||
|
||||
def reset_stats(self):
|
||||
self.stats_return_codes = {}
|
||||
self.stats_containers_deleted = 0
|
||||
self.stats_objects_deleted = 0
|
||||
self.stats_containers_remaining = 0
|
||||
self.stats_objects_remaining = 0
|
||||
self.stats_containers_possibly_remaining = 0
|
||||
self.stats_objects_possibly_remaining = 0
|
||||
|
||||
def reap_account(self, broker, partition, nodes):
|
||||
"""
|
||||
Called once per pass for each account this server is the primary for
|
||||
@ -220,13 +233,7 @@ class AccountReaper(Daemon):
|
||||
return False
|
||||
account = info['account']
|
||||
self.logger.info(_('Beginning pass on account %s'), account)
|
||||
self.stats_return_codes = {}
|
||||
self.stats_containers_deleted = 0
|
||||
self.stats_objects_deleted = 0
|
||||
self.stats_containers_remaining = 0
|
||||
self.stats_objects_remaining = 0
|
||||
self.stats_containers_possibly_remaining = 0
|
||||
self.stats_objects_possibly_remaining = 0
|
||||
self.reset_stats()
|
||||
try:
|
||||
marker = ''
|
||||
while True:
|
||||
@ -324,11 +331,11 @@ class AccountReaper(Daemon):
|
||||
while True:
|
||||
objects = None
|
||||
try:
|
||||
objects = direct_get_container(
|
||||
headers, objects = direct_get_container(
|
||||
node, part, account, container,
|
||||
marker=marker,
|
||||
conn_timeout=self.conn_timeout,
|
||||
response_timeout=self.node_timeout)[1]
|
||||
response_timeout=self.node_timeout)
|
||||
self.stats_return_codes[2] = \
|
||||
self.stats_return_codes.get(2, 0) + 1
|
||||
self.logger.increment('return_codes.2')
|
||||
@ -343,11 +350,12 @@ class AccountReaper(Daemon):
|
||||
if not objects:
|
||||
break
|
||||
try:
|
||||
policy_index = headers.get(POLICY_INDEX, 0)
|
||||
for obj in objects:
|
||||
if isinstance(obj['name'], unicode):
|
||||
obj['name'] = obj['name'].encode('utf8')
|
||||
pool.spawn(self.reap_object, account, container, part,
|
||||
nodes, obj['name'])
|
||||
nodes, obj['name'], policy_index)
|
||||
pool.waitall()
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception(_('Exception with objects for container '
|
||||
@ -396,7 +404,7 @@ class AccountReaper(Daemon):
|
||||
self.logger.increment('containers_possibly_remaining')
|
||||
|
||||
def reap_object(self, account, container, container_partition,
|
||||
container_nodes, obj):
|
||||
container_nodes, obj, policy_index):
|
||||
"""
|
||||
Deletes the given object by issuing a delete request to each node for
|
||||
the object. The format of the delete request is such that each object
|
||||
@ -412,12 +420,14 @@ class AccountReaper(Daemon):
|
||||
container ring.
|
||||
:param container_nodes: The primary node dicts for the container.
|
||||
:param obj: The name of the object to delete.
|
||||
:param policy_index: The storage policy index of the object's container
|
||||
|
||||
* See also: :func:`swift.common.ring.Ring.get_nodes` for a description
|
||||
of the container node dicts.
|
||||
"""
|
||||
container_nodes = list(container_nodes)
|
||||
part, nodes = self.get_object_ring().get_nodes(account, container, obj)
|
||||
ring = self.get_object_ring(policy_index)
|
||||
part, nodes = ring.get_nodes(account, container, obj)
|
||||
successes = 0
|
||||
failures = 0
|
||||
for node in nodes:
|
||||
@ -429,7 +439,8 @@ class AccountReaper(Daemon):
|
||||
response_timeout=self.node_timeout,
|
||||
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
|
||||
'X-Container-Partition': str(container_partition),
|
||||
'X-Container-Device': cnode['device']})
|
||||
'X-Container-Device': cnode['device'],
|
||||
POLICY_INDEX: policy_index})
|
||||
successes += 1
|
||||
self.stats_return_codes[2] = \
|
||||
self.stats_return_codes.get(2, 0) + 1
|
||||
|
95
test/probe/test_account_reaper.py
Normal file
95
test/probe/test_account_reaper.py
Normal file
@ -0,0 +1,95 @@
|
||||
#!/usr/bin/python -u
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
from swiftclient import client
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.manager import Manager
|
||||
from swift.common.direct_client import direct_delete_account, \
|
||||
direct_get_object, direct_head_container, ClientException
|
||||
from test.probe.common import kill_servers, reset_environment, \
|
||||
get_to_final_state
|
||||
|
||||
|
||||
class TestAccountReaper(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
(self.pids, self.port2server, self.account_ring, self.container_ring,
|
||||
self.object_ring, self.policy, self.url, self.token,
|
||||
self.account, self.configs) = reset_environment()
|
||||
|
||||
def tearDown(self):
|
||||
kill_servers(self.port2server, self.pids)
|
||||
|
||||
def test_sync(self):
|
||||
all_objects = []
|
||||
# upload some containers
|
||||
for policy in POLICIES:
|
||||
container = 'container-%s-%s' % (policy.name, uuid.uuid4())
|
||||
client.put_container(self.url, self.token, container,
|
||||
headers={'X-Storage-Policy': policy.name})
|
||||
obj = 'object-%s' % uuid.uuid4()
|
||||
body = 'test-body'
|
||||
client.put_object(self.url, self.token, container, obj, body)
|
||||
all_objects.append((policy, container, obj))
|
||||
|
||||
Manager(['container-updater']).once()
|
||||
|
||||
headers = client.head_account(self.url, self.token)
|
||||
|
||||
self.assertEqual(int(headers['x-account-container-count']),
|
||||
len(POLICIES))
|
||||
self.assertEqual(int(headers['x-account-object-count']),
|
||||
len(POLICIES))
|
||||
self.assertEqual(int(headers['x-account-bytes-used']),
|
||||
len(POLICIES) * len(body))
|
||||
|
||||
part, nodes = self.account_ring.get_nodes(self.account)
|
||||
for node in nodes:
|
||||
direct_delete_account(node, part, self.account)
|
||||
|
||||
Manager(['account-reaper']).once()
|
||||
|
||||
get_to_final_state()
|
||||
|
||||
for policy, container, obj in all_objects:
|
||||
cpart, cnodes = self.container_ring.get_nodes(
|
||||
self.account, container)
|
||||
for cnode in cnodes:
|
||||
try:
|
||||
direct_head_container(cnode, cpart, self.account,
|
||||
container)
|
||||
except ClientException as err:
|
||||
self.assertEquals(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Found un-reaped /%s/%s on %r' %
|
||||
(self.account, container, node))
|
||||
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
|
||||
part, nodes = object_ring.get_nodes(self.account, container, obj)
|
||||
for node in nodes:
|
||||
try:
|
||||
direct_get_object(node, part, self.account,
|
||||
container, obj)
|
||||
except ClientException as err:
|
||||
self.assertEquals(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
|
||||
(self.account, container, obj, node, policy))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -15,12 +15,13 @@
|
||||
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from logging import DEBUG
|
||||
from mock import patch
|
||||
from mock import patch, call, DEFAULT
|
||||
from contextlib import nested
|
||||
|
||||
from swift.account import reaper
|
||||
@ -28,6 +29,9 @@ from swift.account.backend import DATADIR
|
||||
from swift.common.exceptions import ClientException
|
||||
from swift.common.utils import normalize_timestamp
|
||||
|
||||
from test import unit
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
|
||||
|
||||
|
||||
class FakeLogger(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -109,6 +113,7 @@ class FakeRing(object):
|
||||
def get_part_nodes(self, *args, **kwargs):
|
||||
return self.nodes
|
||||
|
||||
|
||||
acc_nodes = [{'device': 'sda1',
|
||||
'ip': '',
|
||||
'port': ''},
|
||||
@ -130,6 +135,10 @@ cont_nodes = [{'device': 'sda1',
|
||||
'port': ''}]
|
||||
|
||||
|
||||
@unit.patch_policies([StoragePolicy(0, 'zero', False,
|
||||
object_ring=unit.FakeRing()),
|
||||
StoragePolicy(1, 'one', True,
|
||||
object_ring=unit.FakeRing())])
|
||||
class TestReaper(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -150,9 +159,6 @@ class TestReaper(unittest.TestCase):
|
||||
self.amount_fail += 1
|
||||
raise self.myexp
|
||||
|
||||
def fake_object_ring(self):
|
||||
return FakeRing()
|
||||
|
||||
def fake_direct_delete_container(self, *args, **kwargs):
|
||||
if self.amount_delete_fail < self.max_delete_fail:
|
||||
self.amount_delete_fail += 1
|
||||
@ -265,30 +271,81 @@ class TestReaper(unittest.TestCase):
|
||||
reaper.time = time_orig
|
||||
|
||||
def test_reap_object(self):
|
||||
r = self.init_reaper({}, fakelogger=True)
|
||||
self.amount_fail = 0
|
||||
self.max_fail = 0
|
||||
with patch('swift.account.reaper.AccountReaper.get_object_ring',
|
||||
self.fake_object_ring):
|
||||
with patch('swift.account.reaper.direct_delete_object',
|
||||
self.fake_direct_delete_object):
|
||||
r.reap_object('a', 'c', 'partition', cont_nodes, 'o')
|
||||
self.assertEqual(r.stats_objects_deleted, 3)
|
||||
conf = {
|
||||
'mount_check': 'false',
|
||||
}
|
||||
r = reaper.AccountReaper(conf, logger=unit.debug_logger())
|
||||
ring = unit.FakeRing()
|
||||
mock_path = 'swift.account.reaper.direct_delete_object'
|
||||
for policy in POLICIES:
|
||||
r.reset_stats()
|
||||
with patch(mock_path) as fake_direct_delete:
|
||||
r.reap_object('a', 'c', 'partition', cont_nodes, 'o',
|
||||
policy.idx)
|
||||
for i, call_args in enumerate(
|
||||
fake_direct_delete.call_args_list):
|
||||
cnode = cont_nodes[i]
|
||||
host = '%(ip)s:%(port)s' % cnode
|
||||
device = cnode['device']
|
||||
headers = {
|
||||
'X-Container-Host': host,
|
||||
'X-Container-Partition': 'partition',
|
||||
'X-Container-Device': device,
|
||||
POLICY_INDEX: policy.idx
|
||||
}
|
||||
ring = r.get_object_ring(policy.idx)
|
||||
expected = call(ring.devs[i], 1, 'a', 'c', 'o',
|
||||
headers=headers, conn_timeout=0.5,
|
||||
response_timeout=10)
|
||||
self.assertEqual(call_args, expected)
|
||||
self.assertEqual(r.stats_objects_deleted, 3)
|
||||
|
||||
def test_reap_object_fail(self):
|
||||
r = self.init_reaper({}, fakelogger=True)
|
||||
self.amount_fail = 0
|
||||
self.max_fail = 1
|
||||
ctx = [patch('swift.account.reaper.AccountReaper.get_object_ring',
|
||||
self.fake_object_ring),
|
||||
patch('swift.account.reaper.direct_delete_object',
|
||||
self.fake_direct_delete_object)]
|
||||
with nested(*ctx):
|
||||
r.reap_object('a', 'c', 'partition', cont_nodes, 'o')
|
||||
policy = random.choice(list(POLICIES))
|
||||
with patch('swift.account.reaper.direct_delete_object',
|
||||
self.fake_direct_delete_object):
|
||||
r.reap_object('a', 'c', 'partition', cont_nodes, 'o',
|
||||
policy.idx)
|
||||
self.assertEqual(r.stats_objects_deleted, 1)
|
||||
self.assertEqual(r.stats_objects_remaining, 1)
|
||||
self.assertEqual(r.stats_objects_possibly_remaining, 1)
|
||||
|
||||
@patch('swift.account.reaper.Ring',
|
||||
lambda *args, **kwargs: unit.FakeRing())
|
||||
def test_reap_container(self):
|
||||
policy = random.choice(list(POLICIES))
|
||||
r = self.init_reaper({}, fakelogger=True)
|
||||
with patch.multiple('swift.account.reaper',
|
||||
direct_get_container=DEFAULT,
|
||||
direct_delete_object=DEFAULT,
|
||||
direct_delete_container=DEFAULT) as mocks:
|
||||
headers = {POLICY_INDEX: policy.idx}
|
||||
obj_listing = [{'name': 'o'}]
|
||||
|
||||
def fake_get_container(*args, **kwargs):
|
||||
try:
|
||||
obj = obj_listing.pop(0)
|
||||
except IndexError:
|
||||
obj_list = []
|
||||
else:
|
||||
obj_list = [obj]
|
||||
return headers, obj_list
|
||||
|
||||
mocks['direct_get_container'].side_effect = fake_get_container
|
||||
r.reap_container('a', 'partition', acc_nodes, 'c')
|
||||
mock_calls = mocks['direct_delete_object'].call_args_list
|
||||
self.assertEqual(3, len(mock_calls))
|
||||
for call_args in mock_calls:
|
||||
_args, kwargs = call_args
|
||||
self.assertEqual(kwargs['headers'][POLICY_INDEX],
|
||||
policy.idx)
|
||||
|
||||
self.assertEquals(mocks['direct_delete_container'].call_count, 3)
|
||||
self.assertEqual(r.stats_objects_deleted, 3)
|
||||
|
||||
def test_reap_container_get_object_fail(self):
|
||||
r = self.init_reaper({}, fakelogger=True)
|
||||
self.get_fail = True
|
||||
|
Loading…
x
Reference in New Issue
Block a user