change ite logic and add more tests
This commit is contained in:
parent
61e53372be
commit
5482035bc0
@ -287,6 +287,40 @@ def direct_put_object(node, part, account, container, name, contents,
|
||||
return resp.getheader('etag').strip('"')
|
||||
|
||||
|
||||
def direct_post_object(node, part, account, container, name, headers,
|
||||
conn_timeout=5, response_timeout=15):
|
||||
"""
|
||||
Put object directly from the object server.
|
||||
|
||||
:param node: node dictionary from the ring
|
||||
:param part: partition the container is on
|
||||
:param account: account name
|
||||
:param container: container name
|
||||
:param name: object name
|
||||
:param headers: headers to store as metadata
|
||||
:param conn_timeout: timeout in seconds for establishing the connection
|
||||
:param response_timeout: timeout in seconds for getting the response
|
||||
:raises ClientException: HTTP POST request failed
|
||||
"""
|
||||
path = '/%s/%s/%s' % (account, container, name)
|
||||
headers['X-Timestamp'] = normalize_timestamp(time())
|
||||
with Timeout(conn_timeout):
|
||||
conn = http_connect(node['ip'], node['port'], node['device'], part,
|
||||
'POST', path, headers=headers)
|
||||
with Timeout(response_timeout):
|
||||
resp = conn.getresponse()
|
||||
resp.read()
|
||||
if resp.status < 200 or resp.status >= 300:
|
||||
raise ClientException(
|
||||
'Object server %s:%s direct POST %s gave status %s' %
|
||||
(node['ip'], node['port'],
|
||||
repr('/%s/%s%s' % (node['device'], part, path)),
|
||||
resp.status),
|
||||
http_host=node['ip'], http_port=node['port'],
|
||||
http_device=node['device'], http_status=resp.status,
|
||||
http_reason=resp.reason)
|
||||
|
||||
|
||||
def direct_delete_object(node, part, account, container, obj,
|
||||
conn_timeout=5, response_timeout=15, headers={}):
|
||||
"""
|
||||
|
@ -155,7 +155,6 @@ class AuditorWorker(object):
|
||||
os.path.join(self.devices, device), path)
|
||||
return
|
||||
except Exception:
|
||||
#raise
|
||||
self.errors += 1
|
||||
self.logger.exception(_('ERROR Trying to audit %s'), path)
|
||||
return
|
||||
|
@ -132,7 +132,6 @@ class DiskFile(object):
|
||||
|
||||
def __init__(self, path, device, partition, account, container, obj,
|
||||
logger, keep_data_fp=False, disk_chunk_size=65536):
|
||||
|
||||
self.disk_chunk_size = disk_chunk_size
|
||||
self.name = '/' + '/'.join((account, container, obj))
|
||||
name_hash = hash_path(account, container, obj)
|
||||
@ -146,14 +145,13 @@ class DiskFile(object):
|
||||
self.data_file = None
|
||||
self.fp = None
|
||||
self.iter_etag = None
|
||||
self.last_iter_pos = 0
|
||||
self.started_at_0 = False
|
||||
self.read_to_eof = False
|
||||
self.quarantined_dir = None
|
||||
self.keep_cache = False
|
||||
if not os.path.exists(self.datadir):
|
||||
return
|
||||
files = sorted(os.listdir(self.datadir), reverse=True)
|
||||
|
||||
for file in files:
|
||||
if file.endswith('.ts'):
|
||||
self.data_file = self.meta_file = None
|
||||
@ -183,18 +181,14 @@ class DiskFile(object):
|
||||
try:
|
||||
dropped_cache = 0
|
||||
read = 0
|
||||
self.last_iter_pos = 0
|
||||
self.iter_etag = md5()
|
||||
if self.fp.tell() == 0:
|
||||
self.started_at_0 = True
|
||||
self.iter_etag = md5()
|
||||
while True:
|
||||
pre_read_pos = self.fp.tell()
|
||||
chunk = self.fp.read(self.disk_chunk_size)
|
||||
if chunk:
|
||||
if self.iter_etag and self.last_iter_pos == pre_read_pos:
|
||||
if self.iter_etag:
|
||||
self.iter_etag.update(chunk)
|
||||
self.last_iter_pos += len(chunk)
|
||||
else:
|
||||
# file has not been read sequentially
|
||||
self.iter_etag = None
|
||||
read += len(chunk)
|
||||
if read - dropped_cache > (1024 * 1024):
|
||||
self.drop_cache(self.fp.fileno(), dropped_cache,
|
||||
@ -228,7 +222,6 @@ class DiskFile(object):
|
||||
|
||||
def _handle_close_quarantine(self):
|
||||
"""Check if file needs to be quarantined"""
|
||||
obj_size = None
|
||||
try:
|
||||
obj_size = self.get_data_file_size()
|
||||
except DiskFileError, e:
|
||||
@ -237,17 +230,17 @@ class DiskFile(object):
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
|
||||
if (self.iter_etag and self.read_to_eof and self.metadata.get('ETag')
|
||||
and obj_size == self.last_iter_pos and
|
||||
self.iter_etag.hexdigest() != self.metadata['ETag']):
|
||||
if (self.iter_etag and self.started_at_0 and self.read_to_eof and
|
||||
self.metadata.has_key('ETag') and
|
||||
self.iter_etag.hexdigest() != self.metadata.get('ETag')):
|
||||
self.quarantine()
|
||||
|
||||
def close(self, verify_file=True):
|
||||
"""
|
||||
Close the file.
|
||||
Close the file. Will handle quarantining file if necessary.
|
||||
|
||||
:param verify_file: Defaults to True, will handle quarantining
|
||||
file if necessary.
|
||||
:param verify_file: Defaults to True. If false, will not check
|
||||
file to see if it needs quarantining.
|
||||
"""
|
||||
if self.fp:
|
||||
try:
|
||||
@ -463,13 +456,11 @@ class ObjectController(object):
|
||||
response_class = HTTPNotFound
|
||||
else:
|
||||
response_class = HTTPAccepted
|
||||
|
||||
try:
|
||||
file_size = file.get_data_file_size()
|
||||
except (DiskFileError, DiskFileNotExist):
|
||||
file.quarantine()
|
||||
return HTTPNotFound(request=request)
|
||||
|
||||
metadata = {'X-Timestamp': request.headers['x-timestamp']}
|
||||
metadata.update(val for val in request.headers.iteritems()
|
||||
if val[0].lower().startswith('x-object-meta-'))
|
||||
|
@ -42,13 +42,11 @@ class TestObjectFailures(unittest.TestCase):
|
||||
for file in files:
|
||||
return os.path.join(obj_dir, file)
|
||||
|
||||
def run_quarantine(self):
|
||||
container = 'container-%s' % uuid4()
|
||||
obj = 'object-%s' % uuid4()
|
||||
def _setup_data_file(self, container, obj, data):
|
||||
client.put_container(self.url, self.token, container)
|
||||
client.put_object(self.url, self.token, container, obj, 'VERIFY')
|
||||
client.put_object(self.url, self.token, container, obj, data)
|
||||
odata = client.get_object(self.url, self.token, container, obj)[-1]
|
||||
self.assertEquals(odata, 'VERIFY')
|
||||
self.assertEquals(odata, data)
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, container, obj)
|
||||
onode = onodes[0]
|
||||
@ -62,7 +60,14 @@ class TestObjectFailures(unittest.TestCase):
|
||||
device, opart,
|
||||
hash_str[-3:], hash_str)
|
||||
data_file = self._get_data_file_path(obj_dir)
|
||||
return onode, opart, data_file
|
||||
|
||||
|
||||
def run_quarantine(self):
|
||||
container = 'container-%s' % uuid4()
|
||||
obj = 'object-%s' % uuid4()
|
||||
onode, opart, data_file = self._setup_data_file(container, obj,
|
||||
'VERIFY')
|
||||
with open(data_file) as fp:
|
||||
metadata = read_metadata(fp)
|
||||
metadata['ETag'] = 'badetag'
|
||||
@ -82,25 +87,8 @@ class TestObjectFailures(unittest.TestCase):
|
||||
def run_quarantine_range_etag(self):
|
||||
container = 'container-range-%s' % uuid4()
|
||||
obj = 'object-range-%s' % uuid4()
|
||||
client.put_container(self.url, self.token, container)
|
||||
client.put_object(self.url, self.token, container, obj, 'RANGE')
|
||||
odata = client.get_object(self.url, self.token, container, obj)[-1]
|
||||
self.assertEquals(odata, 'RANGE')
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, container, obj)
|
||||
|
||||
onode = onodes[0]
|
||||
node_id = (onode['port'] - 6000) / 10
|
||||
device = onode['device']
|
||||
hash_str = hash_path(self.account, container, obj)
|
||||
|
||||
obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
|
||||
node_id)
|
||||
devices = obj_server_conf['app:object-server']['devices']
|
||||
obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
|
||||
device, opart,
|
||||
hash_str[-3:], hash_str)
|
||||
data_file = self._get_data_file_path(obj_dir)
|
||||
onode, opart, data_file = self._setup_data_file(container, obj,
|
||||
'RANGE')
|
||||
with open(data_file) as fp:
|
||||
metadata = read_metadata(fp)
|
||||
metadata['ETag'] = 'badetag'
|
||||
@ -122,32 +110,10 @@ class TestObjectFailures(unittest.TestCase):
|
||||
except client.ClientException, e:
|
||||
self.assertEquals(e.http_status, 404)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def run_quarantine_range_zero_byte(self):
|
||||
def run_quarantine_zero_byte_get(self):
|
||||
container = 'container-zbyte-%s' % uuid4()
|
||||
obj = 'object-zbyte-%s' % uuid4()
|
||||
client.put_container(self.url, self.token, container)
|
||||
client.put_object(self.url, self.token, container, obj, 'ZBYTE')
|
||||
odata = client.get_object(self.url, self.token, container, obj)[-1]
|
||||
self.assertEquals(odata, 'ZBYTE')
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, container, obj)
|
||||
onode = onodes[0]
|
||||
node_id = (onode['port'] - 6000) / 10
|
||||
device = onode['device']
|
||||
hash_str = hash_path(self.account, container, obj)
|
||||
obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
|
||||
node_id)
|
||||
devices = obj_server_conf['app:object-server']['devices']
|
||||
obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
|
||||
device, opart,
|
||||
hash_str[-3:], hash_str)
|
||||
data_file = self._get_data_file_path(obj_dir)
|
||||
|
||||
onode, opart, data_file = self._setup_data_file(container, obj, 'DATA')
|
||||
with open(data_file) as fp:
|
||||
metadata = read_metadata(fp)
|
||||
os.unlink(data_file)
|
||||
@ -163,11 +129,52 @@ class TestObjectFailures(unittest.TestCase):
|
||||
except client.ClientException, e:
|
||||
self.assertEquals(e.http_status, 404)
|
||||
|
||||
def run_quarantine_zero_byte_head(self):
|
||||
container = 'container-zbyte-%s' % uuid4()
|
||||
obj = 'object-zbyte-%s' % uuid4()
|
||||
onode, opart, data_file = self._setup_data_file(container, obj, 'DATA')
|
||||
with open(data_file) as fp:
|
||||
metadata = read_metadata(fp)
|
||||
os.unlink(data_file)
|
||||
|
||||
with open(data_file,'w') as fp:
|
||||
write_metadata(fp, metadata)
|
||||
try:
|
||||
resp = direct_client.direct_head_object(onode, opart, self.account,
|
||||
container, obj,
|
||||
conn_timeout=1,
|
||||
response_timeout=1)
|
||||
raise "Did not quarantine object"
|
||||
except client.ClientException, e:
|
||||
self.assertEquals(e.http_status, 404)
|
||||
|
||||
def run_quarantine_zero_byte_post(self):
|
||||
container = 'container-zbyte-%s' % uuid4()
|
||||
obj = 'object-zbyte-%s' % uuid4()
|
||||
onode, opart, data_file = self._setup_data_file(container, obj, 'DATA')
|
||||
with open(data_file) as fp:
|
||||
metadata = read_metadata(fp)
|
||||
os.unlink(data_file)
|
||||
|
||||
with open(data_file,'w') as fp:
|
||||
write_metadata(fp, metadata)
|
||||
try:
|
||||
resp = direct_client.direct_post_object(
|
||||
onode, opart, self.account,
|
||||
container, obj,
|
||||
{'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two'},
|
||||
conn_timeout=1,
|
||||
response_timeout=1)
|
||||
raise "Did not quarantine object"
|
||||
except client.ClientException, e:
|
||||
self.assertEquals(e.http_status, 404)
|
||||
|
||||
def test_runner(self):
|
||||
self.run_quarantine()
|
||||
self.run_quarantine_range_etag()
|
||||
self.run_quarantine_range_zero_byte()
|
||||
self.run_quarantine_zero_byte_get()
|
||||
self.run_quarantine_zero_byte_head()
|
||||
self.run_quarantine_zero_byte_post()
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -104,7 +104,7 @@ class TestAuditor(unittest.TestCase):
|
||||
}
|
||||
self.disk_file.put(fd, tmppath, metadata)
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
# remake to it will have metadata
|
||||
# remake so it will have metadata
|
||||
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
|
||||
self.logger)
|
||||
|
||||
|
@ -25,7 +25,6 @@ import fcntl
|
||||
import time
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
from eventlet import tpool
|
||||
from eventlet.green import subprocess
|
||||
from test.unit import FakeLogger
|
||||
from swift.common import utils
|
||||
|
@ -37,7 +37,7 @@ from swift.obj import server as object_server
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
NullLogger, storage_directory
|
||||
from swift.common.exceptions import DiskFileNotExist
|
||||
|
||||
from eventlet import tpool
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.server.DiskFile"""
|
||||
@ -47,6 +47,10 @@ class TestDiskFile(unittest.TestCase):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
|
||||
def fake_exe(*args, **kwargs):
|
||||
pass
|
||||
tpool.execute = fake_exe
|
||||
|
||||
def tearDown(self):
|
||||
""" Tear down for testing swift.object_server.ObjectController """
|
||||
rmtree(os.path.dirname(self.testdir))
|
||||
|
Loading…
Reference in New Issue
Block a user