Refactored to correctly write the pickle to disk

This commit is contained in:
Chuck Thier 2010-09-23 16:09:30 +00:00
parent 1d26b15a4e
commit 044b065cda
4 changed files with 59 additions and 14 deletions

View File

@ -32,6 +32,9 @@ import ctypes.util
import fcntl import fcntl
import struct import struct
from ConfigParser import ConfigParser from ConfigParser import ConfigParser
from tempfile import mkstemp
import cPickle as pickle
import eventlet import eventlet
from eventlet import greenio, GreenPool, sleep, Timeout, listen from eventlet import greenio, GreenPool, sleep, Timeout, listen
@ -510,8 +513,8 @@ def unlink_older_than(path, mtime):
""" """
Remove any file in a given path that that was last modified before mtime. Remove any file in a given path that that was last modified before mtime.
:param path: Path to remove file from :param path: path to remove file from
:mtime: Timestamp of oldest file to keep :mtime: timestamp of oldest file to keep
""" """
if os.path.exists(path): if os.path.exists(path):
for fname in os.listdir(path): for fname in os.listdir(path):
@ -524,6 +527,14 @@ def unlink_older_than(path, mtime):
def item_from_env(env, item_name): def item_from_env(env, item_name):
"""
Get a value from the wsgi environment
:param env: wsgi environment dict
:param item_name: name of item to get
:returns: the value from the environment
"""
item = env.get(item_name, None) item = env.get(item_name, None)
if item is None: if item is None:
logging.error("ERROR: %s could not be found in env!" % item_name) logging.error("ERROR: %s could not be found in env!" % item_name)
@ -531,10 +542,27 @@ def item_from_env(env, item_name):
def cache_from_env(env): def cache_from_env(env):
"""
Get memcache connection pool from the environment (which had been
previously set by the memcache middleware
:param env: wsgi environment dict
:returns: swift.common.memcached.MemcacheRing from environment
"""
return item_from_env(env, 'swift.cache') return item_from_env(env, 'swift.cache')
def readconf(conf, section_name, log_name=None): def readconf(conf, section_name, log_name=None):
"""
Read config file and return config items as a dict
:param conf: path to config file
:param section_name: config section to read
:param log_name: name to be used with logging (will use section_name if
not defined)
:returns: dict of config items
"""
c = ConfigParser() c = ConfigParser()
if not c.read(conf): if not c.read(conf):
print "Unable to read config file %s" % conf print "Unable to read config file %s" % conf
@ -550,3 +578,21 @@ def readconf(conf, section_name, log_name=None):
else: else:
conf['log_name'] = section_name conf['log_name'] = section_name
return conf return conf
def write_pickle(obj, dest, tmp):
"""
Ensure that a pickle file gets written to disk. The file
is first written to a tmp location, ensure it is synced to disk, then
perform a move to its final location
:param obj: python object to be pickled
:param dest: path of final destination file
:param tmp: path to tmp to use
"""
fd, tmppath = mkstemp(dir=tmp)
with os.fdopen(fd, 'wb') as fo:
pickle.dump(obj, fo)
fo.flush()
os.fsync(fd)
renamer(tmppath, dest)

View File

@ -37,7 +37,7 @@ from eventlet import sleep, Timeout
from swift.common.utils import mkdirs, normalize_timestamp, \ from swift.common.utils import mkdirs, normalize_timestamp, \
storage_directory, hash_path, renamer, fallocate, \ storage_directory, hash_path, renamer, fallocate, \
split_path, drop_buffer_cache, get_logger split_path, drop_buffer_cache, get_logger, write_pickle
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \ from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_xml_encodable check_float, check_xml_encodable
@ -300,15 +300,13 @@ class ObjectController(object):
'%s:%s/%s transaction %s (saving for async update later)' % '%s:%s/%s transaction %s (saving for async update later)' %
(ip, port, contdevice, headers_in.get('x-cf-trans-id', '-'))) (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR) async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
fd, tmppath = mkstemp(dir=os.path.join(self.devices, objdevice, 'tmp'))
with os.fdopen(fd, 'wb') as fo:
pickle.dump({'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}, fo)
fo.flush()
os.fsync(fd)
ohash = hash_path(account, container, obj) ohash = hash_path(account, container, obj)
renamer(tmppath, os.path.join(async_dir, ohash[-3:], ohash + '-' + write_pickle(
normalize_timestamp(headers_out['x-timestamp']))) {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out},
os.path.join(async_dir, ohash[-3:], ohash + '-' +
normalize_timestamp(headers_out['x-timestamp'])),
os.path.join(self.devices, objdevice, 'tmp'))
def POST(self, request): def POST(self, request):
"""Handle HTTP POST requests for the Swift Object Server.""" """Handle HTTP POST requests for the Swift Object Server."""

View File

@ -25,7 +25,7 @@ from eventlet import patcher, Timeout
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer from swift.common.utils import get_logger, renamer, write_pickle
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.obj.server import ASYNCDIR from swift.obj.server import ASYNCDIR
@ -176,7 +176,7 @@ class ObjectUpdater(Daemon):
self.failures += 1 self.failures += 1
self.logger.debug('Update failed for %s %s' % (obj, update_path)) self.logger.debug('Update failed for %s %s' % (obj, update_path))
update['successes'] = successes update['successes'] = successes
pickle.dump(update, open(update_path,'w')) write_pickle(update, update_path, os.path.join(device, 'tmp'))
def object_update(self, node, part, op, obj, headers): def object_update(self, node, part, op, obj, headers):
""" """

View File

@ -48,6 +48,7 @@ class TestObjectUpdater(unittest.TestCase):
os.mkdir(self.devices_dir) os.mkdir(self.devices_dir)
self.sda1 = os.path.join(self.devices_dir, 'sda1') self.sda1 = os.path.join(self.devices_dir, 'sda1')
os.mkdir(self.sda1) os.mkdir(self.sda1)
os.mkdir(os.path.join(self.sda1,'tmp'))
def tearDown(self): def tearDown(self):
rmtree(self.testdir, ignore_errors=1) rmtree(self.testdir, ignore_errors=1)