Merge "Moved some code out of swift.obj.replicator"

Jenkins 2013-05-17 04:47:21 +00:00, committed by Gerrit Code Review
commit d754b59cf8
7 changed files with 261 additions and 223 deletions


@@ -43,7 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult
import itertools
import eventlet
from eventlet import GreenPool, sleep, Timeout
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet.green import socket, threading
import netifaces
import codecs
@@ -1716,3 +1716,18 @@ class InputProxy(object):
raise
self.bytes_received += len(line)
return line
def tpool_reraise(func, *args, **kwargs):
"""
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
"""
def inner():
try:
return func(*args, **kwargs)
except BaseException, err:
return err
resp = tpool.execute(inner)
if isinstance(resp, BaseException):
raise resp
return resp
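
For orientation (not part of the diff; the helper name and file path below are invented), a minimal sketch of how a caller uses the relocated utility: the wrapped function runs in eventlet's thread pool, and anything it raises is re-raised in the calling greenthread.

    import hashlib

    from swift.common.utils import tpool_reraise


    def checksum_path(path):
        # Hypothetical blocking helper that we want to run off the event loop.
        with open(path, 'rb') as fp:
            return hashlib.md5(fp.read()).hexdigest()

    # checksum_path('/tmp/example.dat') runs in a real OS thread via eventlet's
    # tpool; if it raises anything -- even a BaseException subclass such as
    # eventlet.Timeout, which plain tpool.execute() would hand back as a return
    # value -- the exception is re-raised here.
    digest = tpool_reraise(checksum_path, '/tmp/example.dat')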

swift/obj/base.py (new file, 207 lines)

@@ -0,0 +1,207 @@
# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""General object server functions."""
import cPickle as pickle
import errno
import hashlib
import logging
import os
import time
import uuid
from os.path import basename, dirname, join
from swift.common.exceptions import PathNotDir
from swift.common.utils import lock_path, renamer, write_pickle
PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
HASH_FILE = 'hashes.pkl'
def quarantine_renamer(device_path, corrupted_file_path):
"""
In the case that a file is corrupted, move it to a quarantined
area to allow replication to fix it.
:params device_path: The path to the device the corrupted file is on.
:params corrupted_file_path: The path to the file you want quarantined.
:returns: path (str) of directory the file was moved to
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
exceptions from rename
"""
from_dir = dirname(corrupted_file_path)
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
invalidate_hash(dirname(from_dir))
try:
renamer(from_dir, to_dir)
except OSError, e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
renamer(from_dir, to_dir)
return to_dir
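
A worked example of the path handling (all paths are invented, not from this diff): the whole hash directory containing the corrupted file is moved under the device's quarantine area.

    from os.path import basename, dirname, join

    # Invented example paths:
    device_path = '/srv/node/sda'
    corrupted_file_path = join(device_path, 'objects', '1234', 'abc',
                               'ffa74e2c1c1b9a9b0bc6e9fa3a0c8c2d',
                               '1368766041.00000.data')

    from_dir = dirname(corrupted_file_path)
    # /srv/node/sda/objects/1234/abc/ffa74e2c1c1b9a9b0bc6e9fa3a0c8c2d
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
    # /srv/node/sda/quarantined/objects/ffa74e2c1c1b9a9b0bc6e9fa3a0c8c2d
    # The suffix dir /srv/node/sda/objects/1234/abc has its cached hash
    # invalidated before the rename; if to_dir already exists, a
    # '-<uuid4 hex>' suffix is appended and the rename is retried.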
def hash_suffix(path, reclaim_age):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param reclaim_age: age in seconds at which to remove tombstones
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
"""
md5 = hashlib.md5()
try:
path_contents = sorted(os.listdir(path))
except OSError, err:
if err.errno in (errno.ENOTDIR, errno.ENOENT):
raise PathNotDir()
raise
for hsh in path_contents:
hsh_path = join(path, hsh)
try:
files = os.listdir(hsh_path)
except OSError, err:
if err.errno == errno.ENOTDIR:
partition_path = dirname(path)
objects_path = dirname(partition_path)
device_path = dirname(objects_path)
quar_path = quarantine_renamer(device_path, hsh_path)
logging.exception(
_('Quarantined %s to %s because it is not a directory') %
(hsh_path, quar_path))
continue
raise
if len(files) == 1:
if files[0].endswith('.ts'):
# remove tombstones older than reclaim_age
ts = files[0].rsplit('.', 1)[0]
if (time.time() - float(ts)) > reclaim_age:
os.unlink(join(hsh_path, files[0]))
files.remove(files[0])
elif files:
files.sort(reverse=True)
meta = data = tomb = None
for filename in list(files):
if not meta and filename.endswith('.meta'):
meta = filename
if not data and filename.endswith('.data'):
data = filename
if not tomb and filename.endswith('.ts'):
tomb = filename
if (filename < tomb or # any file older than tomb
filename < data or # any file older than data
(filename.endswith('.meta') and
filename < meta)): # old meta
os.unlink(join(hsh_path, filename))
files.remove(filename)
if not files:
os.rmdir(hsh_path)
for filename in files:
md5.update(filename)
try:
os.rmdir(path)
except OSError:
pass
return md5.hexdigest()
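
A minimal sketch of the reclamation behaviour (directory names and timestamps are invented, and it assumes the swift.obj.base module from this change is importable): a lone tombstone older than reclaim_age is removed and the emptied directories are cleaned up.

    import os
    import tempfile
    import time

    from swift.obj.base import hash_suffix

    # Invented layout: a suffix dir with one hash dir holding a stale tombstone.
    suffix_dir = tempfile.mkdtemp()
    hsh_dir = os.path.join(suffix_dir, 'ffa74e2c1c1b9a9b0bc6e9fa3a0c8c2d')
    os.mkdir(hsh_dir)
    stale_ts = '%.5f.ts' % (time.time() - 14 * 86400)  # two-week-old tombstone
    open(os.path.join(hsh_dir, stale_ts), 'w').close()

    # With a one-week reclaim_age the tombstone is unlinked, the emptied hash
    # dir and suffix dir are removed, and the md5 of the remaining file names
    # (here there are none) is returned.
    digest = hash_suffix(suffix_dir, 604800)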
def invalidate_hash(suffix_dir):
"""
Invalidates the hash for a suffix_dir in the partition's hashes file.
:param suffix_dir: absolute path to suffix dir whose hash needs
invalidating
"""
suffix = os.path.basename(suffix_dir)
partition_dir = os.path.dirname(suffix_dir)
hashes_file = join(partition_dir, HASH_FILE)
with lock_path(partition_dir):
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
if suffix in hashes and not hashes[suffix]:
return
except Exception:
return
hashes[suffix] = None
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
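
A usage sketch with invented paths, for reference:

    from swift.obj.base import invalidate_hash

    # Invented layout: partition 1234 on device sda, suffix 'abc'.
    suffix_dir = '/srv/node/sda/objects/1234/abc'
    invalidate_hash(suffix_dir)
    # /srv/node/sda/objects/1234/hashes.pkl now maps 'abc' to None (the call
    # is a no-op if the pickle is missing, unreadable, or already marks 'abc'
    # invalid), so the next get_hashes() pass re-hashes that suffix.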
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
reclaim_age=ONE_WEEK):
"""
Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
the hash cache for suffix existence at the (unexpectedly high) cost of a
listdir. reclaim_age is just passed on to hash_suffix.
:param partition_dir: absolute path of partition to get hashes for
:param recalculate: list of suffixes which should be recalculated when got
:param do_listdir: force existence check for all hashes in the partition
:param reclaim_age: age at which to remove tombstones
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
"""
hashed = 0
hashes_file = join(partition_dir, HASH_FILE)
modified = False
force_rewrite = False
hashes = {}
mtime = -1
if recalculate is None:
recalculate = []
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
mtime = os.path.getmtime(hashes_file)
except Exception:
do_listdir = True
force_rewrite = True
if do_listdir:
for suff in os.listdir(partition_dir):
if len(suff) == 3:
hashes.setdefault(suff, None)
modified = True
hashes.update((hash_, None) for hash_ in recalculate)
for suffix, hash_ in hashes.items():
if not hash_:
suffix_dir = join(partition_dir, suffix)
try:
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
hashed += 1
except PathNotDir:
del hashes[suffix]
except OSError:
logging.exception(_('Error hashing suffix'))
modified = True
if modified:
with lock_path(partition_dir):
if force_rewrite or not os.path.exists(hashes_file) or \
os.path.getmtime(hashes_file) == mtime:
write_pickle(
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes
return get_hashes(partition_dir, recalculate, do_listdir,
reclaim_age)
else:
return hashed, hashes
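
A usage sketch with invented paths; the replicator typically drives this through tpool_reraise so the listdir and md5 work stays off the event loop.

    from swift.obj.base import ONE_WEEK, get_hashes

    # Invented partition path and suffix name.
    partition_dir = '/srv/node/sda/objects/1234'
    hashed, hashes = get_hashes(partition_dir, recalculate=['abc'],
                                do_listdir=False, reclaim_age=ONE_WEEK)
    # 'hashed' counts the suffix dirs that were re-hashed on this call;
    # 'hashes' maps each suffix (e.g. 'abc') to its md5 hex digest, with
    # suffixes whose directories no longer exist dropped from the dict.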


@@ -14,16 +14,12 @@
# limitations under the License.
import os
from os.path import basename, dirname, isdir, isfile, join
from os.path import isdir, isfile, join
import random
import shutil
import time
import logging
import hashlib
import itertools
import cPickle as pickle
import errno
import uuid
import eventlet
from eventlet import GreenPool, tpool, Timeout, sleep, hubs
@@ -31,209 +27,18 @@ from eventlet.green import subprocess
from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
tpool_reraise
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.common.exceptions import PathNotDir
from swift.obj.base import get_hashes
hubs.use_hub(get_hub())
PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
HASH_FILE = 'hashes.pkl'
def quarantine_renamer(device_path, corrupted_file_path):
"""
In the case that a file is corrupted, move it to a quarantined
area to allow replication to fix it.
:params device_path: The path to the device the corrupted file is on.
:params corrupted_file_path: The path to the file you want quarantined.
:returns: path (str) of directory the file was moved to
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
exceptions from rename
"""
from_dir = dirname(corrupted_file_path)
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
invalidate_hash(dirname(from_dir))
try:
renamer(from_dir, to_dir)
except OSError, e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
renamer(from_dir, to_dir)
return to_dir
def hash_suffix(path, reclaim_age):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param reclaim_age: age in seconds at which to remove tombstones
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
"""
md5 = hashlib.md5()
try:
path_contents = sorted(os.listdir(path))
except OSError, err:
if err.errno in (errno.ENOTDIR, errno.ENOENT):
raise PathNotDir()
raise
for hsh in path_contents:
hsh_path = join(path, hsh)
try:
files = os.listdir(hsh_path)
except OSError, err:
if err.errno == errno.ENOTDIR:
partition_path = dirname(path)
objects_path = dirname(partition_path)
device_path = dirname(objects_path)
quar_path = quarantine_renamer(device_path, hsh_path)
logging.exception(
_('Quarantined %s to %s because it is not a directory') %
(hsh_path, quar_path))
continue
raise
if len(files) == 1:
if files[0].endswith('.ts'):
# remove tombstones older than reclaim_age
ts = files[0].rsplit('.', 1)[0]
if (time.time() - float(ts)) > reclaim_age:
os.unlink(join(hsh_path, files[0]))
files.remove(files[0])
elif files:
files.sort(reverse=True)
meta = data = tomb = None
for filename in list(files):
if not meta and filename.endswith('.meta'):
meta = filename
if not data and filename.endswith('.data'):
data = filename
if not tomb and filename.endswith('.ts'):
tomb = filename
if (filename < tomb or # any file older than tomb
filename < data or # any file older than data
(filename.endswith('.meta') and
filename < meta)): # old meta
os.unlink(join(hsh_path, filename))
files.remove(filename)
if not files:
os.rmdir(hsh_path)
for filename in files:
md5.update(filename)
try:
os.rmdir(path)
except OSError:
pass
return md5.hexdigest()
def invalidate_hash(suffix_dir):
"""
Invalidates the hash for a suffix_dir in the partition's hashes file.
:param suffix_dir: absolute path to suffix dir whose hash needs
invalidating
"""
suffix = os.path.basename(suffix_dir)
partition_dir = os.path.dirname(suffix_dir)
hashes_file = join(partition_dir, HASH_FILE)
with lock_path(partition_dir):
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
if suffix in hashes and not hashes[suffix]:
return
except Exception:
return
hashes[suffix] = None
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
reclaim_age=ONE_WEEK):
"""
Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
the hash cache for suffix existence at the (unexpectedly high) cost of a
listdir. reclaim_age is just passed on to hash_suffix.
:param partition_dir: absolute path of partition to get hashes for
:param recalculate: list of suffixes which should be recalculated when got
:param do_listdir: force existence check for all hashes in the partition
:param reclaim_age: age at which to remove tombstones
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
"""
hashed = 0
hashes_file = join(partition_dir, HASH_FILE)
modified = False
force_rewrite = False
hashes = {}
mtime = -1
if recalculate is None:
recalculate = []
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
mtime = os.path.getmtime(hashes_file)
except Exception:
do_listdir = True
force_rewrite = True
if do_listdir:
for suff in os.listdir(partition_dir):
if len(suff) == 3:
hashes.setdefault(suff, None)
modified = True
hashes.update((hash_, None) for hash_ in recalculate)
for suffix, hash_ in hashes.items():
if not hash_:
suffix_dir = join(partition_dir, suffix)
try:
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
hashed += 1
except PathNotDir:
del hashes[suffix]
except OSError:
logging.exception(_('Error hashing suffix'))
modified = True
if modified:
with lock_path(partition_dir):
if force_rewrite or not os.path.exists(hashes_file) or \
os.path.getmtime(hashes_file) == mtime:
write_pickle(
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes
return get_hashes(partition_dir, recalculate, do_listdir,
reclaim_age)
else:
return hashed, hashes
def tpool_reraise(func, *args, **kwargs):
"""
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
"""
def inner():
try:
return func(*args, **kwargs)
except BaseException, err:
return err
resp = tpool.execute(inner)
if isinstance(resp, BaseException):
raise resp
return resp
class ObjectReplicator(Daemon):
"""


@@ -33,13 +33,14 @@ from eventlet import sleep, Timeout, tpool
from swift.common.utils import mkdirs, normalize_timestamp, public, \
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
split_path, drop_buffer_cache, get_logger, write_pickle, \
config_true_value, validate_device_partition, timing_stats
config_true_value, validate_device_partition, timing_stats, \
tpool_reraise
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
from swift.obj.replicator import tpool_reraise, invalidate_hash, \
from swift.obj.base import invalidate_hash, \
quarantine_renamer, get_hashes
from swift.common.http import is_success
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \


@@ -36,7 +36,7 @@ from StringIO import StringIO
from functools import partial
from tempfile import TemporaryFile, NamedTemporaryFile
from mock import patch
from mock import MagicMock, patch
from swift.common.exceptions import (Timeout, MessageTimeout,
ConnectionTimeout)
@@ -1313,6 +1313,16 @@ log_name = %(yarr)s'''
ts = utils.get_trans_id_time('tx1df4ff4f55ea45f7b2ec2-almostright')
self.assertEquals(ts, None)
def test_tpool_reraise(self):
with patch.object(utils.tpool, 'execute', lambda f: f()):
self.assertTrue(
utils.tpool_reraise(MagicMock(return_value='test1')), 'test1')
self.assertRaises(Exception,
utils.tpool_reraise, MagicMock(side_effect=Exception('test2')))
self.assertRaises(BaseException,
utils.tpool_reraise,
MagicMock(side_effect=BaseException('test3')))
class TestStatsdLogging(unittest.TestCase):
def test_get_logger_statsd_client_not_specified(self):


@@ -26,7 +26,7 @@ from swift.obj import server as object_server
from swift.obj.server import DiskFile, write_metadata, DATADIR
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
storage_directory
from swift.obj.replicator import invalidate_hash
from swift.obj.base import invalidate_hash
class TestAuditor(unittest.TestCase):


@@ -30,7 +30,7 @@ from test.unit import FakeLogger, mock as unit_mock
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
from swift.obj import replicator as object_replicator
from swift.obj import base as object_base, replicator as object_replicator
from swift.obj.server import DiskFile
@@ -242,7 +242,7 @@ class TestObjectReplicator(unittest.TestCase):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
part = os.path.join(self.objects, '0')
open(os.path.join(part, object_replicator.HASH_FILE), 'w')
open(os.path.join(part, object_base.HASH_FILE), 'w')
# Now the hash file is zero bytes.
i = [0]
def getmtime(filename):
@@ -283,7 +283,7 @@ class TestObjectReplicator(unittest.TestCase):
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
orig_quarantine_renamer = object_replicator.quarantine_renamer
orig_quarantine_renamer = object_base.quarantine_renamer
called = [False]
def wrapped(*args, **kwargs):
@@ -291,10 +291,10 @@ class TestObjectReplicator(unittest.TestCase):
return orig_quarantine_renamer(*args, **kwargs)
try:
object_replicator.quarantine_renamer = wrapped
object_replicator.hash_suffix(whole_path_from, 101)
object_base.quarantine_renamer = wrapped
object_base.hash_suffix(whole_path_from, 101)
finally:
object_replicator.quarantine_renamer = orig_quarantine_renamer
object_base.quarantine_renamer = orig_quarantine_renamer
self.assertTrue(called[0])
def test_hash_suffix_one_file(self):
@@ -308,10 +308,10 @@ class TestObjectReplicator(unittest.TestCase):
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
object_replicator.hash_suffix(whole_path_from, 101)
object_base.hash_suffix(whole_path_from, 101)
self.assertEquals(len(os.listdir(self.parts['0'])), 1)
object_replicator.hash_suffix(whole_path_from, 99)
object_base.hash_suffix(whole_path_from, 99)
self.assertEquals(len(os.listdir(self.parts['0'])), 0)
def test_hash_suffix_multi_file_one(self):
@@ -331,7 +331,7 @@ class TestObjectReplicator(unittest.TestCase):
hsh_path = os.listdir(whole_path_from)[0]
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
object_replicator.hash_suffix(whole_path_from, 99)
object_base.hash_suffix(whole_path_from, 99)
# only the tombstone should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
@@ -355,7 +355,7 @@ class TestObjectReplicator(unittest.TestCase):
hsh_path = os.listdir(whole_path_from)[0]
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
object_replicator.hash_suffix(whole_path_from, 99)
object_base.hash_suffix(whole_path_from, 99)
# only the meta and data should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
@@ -372,17 +372,17 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hashes_file = os.path.join(self.objects, '0',
object_replicator.HASH_FILE)
object_base.HASH_FILE)
# test that a missing hashes file is handled (invalidate_hash returns None)
self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
self.assertEquals(object_base.invalidate_hash(whole_path_from),
None)
# test that hashes get cleared
check_pickle_data = pickle.dumps({data_dir: None},
object_replicator.PICKLE_PROTOCOL)
object_base.PICKLE_PROTOCOL)
for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
with open(hashes_file, 'wb') as fp:
pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
object_replicator.invalidate_hash(whole_path_from)
pickle.dump(data_hash, fp, object_base.PICKLE_PROTOCOL)
object_base.invalidate_hash(whole_path_from)
assertFileData(hashes_file, check_pickle_data)
def test_check_ring(self):
@@ -539,7 +539,7 @@ class TestObjectReplicator(unittest.TestCase):
('2', True), ('3', True)]:
self.assertEquals(os.access(
os.path.join(self.objects,
i, object_replicator.HASH_FILE),
i, object_base.HASH_FILE),
os.F_OK), result)
finally:
object_replicator.http_connect = was_connector