diff --git a/swift/common/utils.py b/swift/common/utils.py index f2e186c03e..59c9ee7c3c 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -32,6 +32,9 @@ import ctypes.util import fcntl import struct from ConfigParser import ConfigParser +from tempfile import mkstemp +import cPickle as pickle + import eventlet from eventlet import greenio, GreenPool, sleep, Timeout, listen @@ -510,8 +513,8 @@ def unlink_older_than(path, mtime): """ Remove any file in a given path that that was last modified before mtime. - :param path: Path to remove file from - :mtime: Timestamp of oldest file to keep + :param path: path to remove file from + :mtime: timestamp of oldest file to keep """ if os.path.exists(path): for fname in os.listdir(path): @@ -524,6 +527,14 @@ def unlink_older_than(path, mtime): def item_from_env(env, item_name): + """ + Get a value from the wsgi environment + + :param env: wsgi environment dict + :param item_name: name of item to get + + :returns: the value from the environment + """ item = env.get(item_name, None) if item is None: logging.error("ERROR: %s could not be found in env!" % item_name) @@ -531,10 +542,27 @@ def item_from_env(env, item_name): def cache_from_env(env): + """ + Get memcache connection pool from the environment (which had been + previously set by the memcache middleware + + :param env: wsgi environment dict + + :returns: swift.common.memcached.MemcacheRing from environment + """ return item_from_env(env, 'swift.cache') def readconf(conf, section_name, log_name=None): + """ + Read config file and return config items as a dict + + :param conf: path to config file + :param section_name: config section to read + :param log_name: name to be used with logging (will use section_name if + not defined) + :returns: dict of config items + """ c = ConfigParser() if not c.read(conf): print "Unable to read config file %s" % conf @@ -550,3 +578,21 @@ def readconf(conf, section_name, log_name=None): else: conf['log_name'] = section_name return conf + + +def write_pickle(obj, dest, tmp): + """ + Ensure that a pickle file gets written to disk. The file + is first written to a tmp location, ensure it is synced to disk, then + perform a move to its final location + + :param obj: python object to be pickled + :param dest: path of final destination file + :param tmp: path to tmp to use + """ + fd, tmppath = mkstemp(dir=tmp) + with os.fdopen(fd, 'wb') as fo: + pickle.dump(obj, fo) + fo.flush() + os.fsync(fd) + renamer(tmppath, dest) diff --git a/swift/obj/server.py b/swift/obj/server.py index 49f707bdb3..1120d21f48 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -37,7 +37,7 @@ from eventlet import sleep, Timeout from swift.common.utils import mkdirs, normalize_timestamp, \ storage_directory, hash_path, renamer, fallocate, \ - split_path, drop_buffer_cache, get_logger + split_path, drop_buffer_cache, get_logger, write_pickle from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_xml_encodable @@ -300,15 +300,13 @@ class ObjectController(object): '%s:%s/%s transaction %s (saving for async update later)' % (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-'))) async_dir = os.path.join(self.devices, objdevice, ASYNCDIR) - fd, tmppath = mkstemp(dir=os.path.join(self.devices, objdevice, 'tmp')) - with os.fdopen(fd, 'wb') as fo: - pickle.dump({'op': op, 'account': account, 'container': container, - 'obj': obj, 'headers': headers_out}, fo) - fo.flush() - os.fsync(fd) - ohash = hash_path(account, container, obj) - renamer(tmppath, os.path.join(async_dir, ohash[-3:], ohash + '-' + - normalize_timestamp(headers_out['x-timestamp']))) + ohash = hash_path(account, container, obj) + write_pickle( + {'op': op, 'account': account, 'container': container, + 'obj': obj, 'headers': headers_out}, + os.path.join(async_dir, ohash[-3:], ohash + '-' + + normalize_timestamp(headers_out['x-timestamp'])), + os.path.join(self.devices, objdevice, 'tmp')) def POST(self, request): """Handle HTTP POST requests for the Swift Object Server.""" diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 22f259feae..3d6a15cc4f 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -25,7 +25,7 @@ from eventlet import patcher, Timeout from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring -from swift.common.utils import get_logger, renamer +from swift.common.utils import get_logger, renamer, write_pickle from swift.common.daemon import Daemon from swift.obj.server import ASYNCDIR @@ -176,7 +176,7 @@ class ObjectUpdater(Daemon): self.failures += 1 self.logger.debug('Update failed for %s %s' % (obj, update_path)) update['successes'] = successes - pickle.dump(update, open(update_path,'w')) + write_pickle(update, update_path, os.path.join(device, 'tmp')) def object_update(self, node, part, op, obj, headers): """ diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index dc0dd7129f..9887c6fcaf 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -48,6 +48,7 @@ class TestObjectUpdater(unittest.TestCase): os.mkdir(self.devices_dir) self.sda1 = os.path.join(self.devices_dir, 'sda1') os.mkdir(self.sda1) + os.mkdir(os.path.join(self.sda1,'tmp')) def tearDown(self): rmtree(self.testdir, ignore_errors=1)