only latest async pending is now sent
This commit is contained in:
commit
df0a61a649
@ -776,7 +776,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None):
|
||||
return conf
|
||||
|
||||
|
||||
def write_pickle(obj, dest, tmp, pickle_protocol=0):
|
||||
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
|
||||
"""
|
||||
Ensure that a pickle file gets written to disk. The file
|
||||
is first written to a tmp location, ensure it is synced to disk, then
|
||||
@ -784,9 +784,11 @@ def write_pickle(obj, dest, tmp, pickle_protocol=0):
|
||||
|
||||
:param obj: python object to be pickled
|
||||
:param dest: path of final destination file
|
||||
:param tmp: path to tmp to use
|
||||
:param tmp: path to tmp to use, defaults to None
|
||||
:param pickle_protocol: protocol to pickle the obj with, defaults to 0
|
||||
"""
|
||||
if tmp == None:
|
||||
tmp = os.path.dirname(dest)
|
||||
fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
|
||||
with os.fdopen(fd, 'wb') as fo:
|
||||
pickle.dump(obj, fo, pickle_protocol)
|
||||
|
@ -132,11 +132,23 @@ class ObjectUpdater(Daemon):
|
||||
prefix_path = os.path.join(async_pending, prefix)
|
||||
if not os.path.isdir(prefix_path):
|
||||
continue
|
||||
for update in os.listdir(prefix_path):
|
||||
last_obj_hash = None
|
||||
for update in sorted(os.listdir(prefix_path), reverse=True):
|
||||
update_path = os.path.join(prefix_path, update)
|
||||
if not os.path.isfile(update_path):
|
||||
continue
|
||||
self.process_object_update(update_path, device)
|
||||
try:
|
||||
obj_hash, timestamp = update.split('-')
|
||||
except ValueError:
|
||||
self.logger.error(
|
||||
_('ERROR async pending file with unexpected name %s')
|
||||
% (update_path))
|
||||
continue
|
||||
if obj_hash == last_obj_hash:
|
||||
os.unlink(update_path)
|
||||
else:
|
||||
self.process_object_update(update_path, device)
|
||||
last_obj_hash = obj_hash
|
||||
time.sleep(self.slowdown)
|
||||
try:
|
||||
os.rmdir(prefix_path)
|
||||
|
@ -20,14 +20,17 @@ import unittest
|
||||
from gzip import GzipFile
|
||||
from shutil import rmtree
|
||||
from time import time
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
from eventlet import spawn, TimeoutError, listen
|
||||
from eventlet.timeout import Timeout
|
||||
|
||||
from swift.obj import updater as object_updater, server as object_server
|
||||
from swift.obj.server import ASYNCDIR
|
||||
from swift.common.ring import RingData
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, normalize_timestamp, mkdirs
|
||||
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
|
||||
write_pickle
|
||||
|
||||
|
||||
class TestObjectUpdater(unittest.TestCase):
|
||||
@ -48,7 +51,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
os.mkdir(self.devices_dir)
|
||||
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
||||
os.mkdir(self.sda1)
|
||||
os.mkdir(os.path.join(self.sda1,'tmp'))
|
||||
os.mkdir(os.path.join(self.sda1, 'tmp'))
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
@ -70,6 +73,45 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
self.assertEquals(cu.node_timeout, 5)
|
||||
self.assert_(cu.get_container_ring() is not None)
|
||||
|
||||
def test_object_sweep(self):
|
||||
prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc')
|
||||
mkpath(prefix_dir)
|
||||
|
||||
objects = {
|
||||
'a': [1089.3, 18.37, 12.83, 1.3],
|
||||
'b': [49.4, 49.3, 49.2, 49.1],
|
||||
'c': [109984.123],
|
||||
}
|
||||
|
||||
expected = set()
|
||||
for o, timestamps in objects.iteritems():
|
||||
ohash = hash_path('account', 'container', o)
|
||||
for t in timestamps:
|
||||
o_path = os.path.join(prefix_dir, ohash + '-' +
|
||||
normalize_timestamp(t))
|
||||
if t == timestamps[0]:
|
||||
expected.add(o_path)
|
||||
write_pickle({}, o_path)
|
||||
|
||||
seen = set()
|
||||
|
||||
class MockObjectUpdater(object_updater.ObjectUpdater):
|
||||
def process_object_update(self, update_path, device):
|
||||
seen.add(update_path)
|
||||
os.unlink(update_path)
|
||||
|
||||
cu = MockObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
'interval': '1',
|
||||
'concurrency': '1',
|
||||
'node_timeout': '5',
|
||||
})
|
||||
cu.object_sweep(self.sda1)
|
||||
self.assert_(not os.path.exists(prefix_dir))
|
||||
self.assertEqual(expected, seen)
|
||||
|
||||
def test_run_once(self):
|
||||
cu = object_updater.ObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
@ -103,6 +145,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
self.assert_(os.path.exists(op_path))
|
||||
|
||||
bindsock = listen(('127.0.0.1', 0))
|
||||
|
||||
def accepter(sock, return_code):
|
||||
try:
|
||||
with Timeout(3):
|
||||
@ -123,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
except BaseException, err:
|
||||
return err
|
||||
return None
|
||||
|
||||
def accept(return_codes):
|
||||
codes = iter(return_codes)
|
||||
try:
|
||||
@ -139,7 +183,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
except BaseException, err:
|
||||
return err
|
||||
return None
|
||||
event = spawn(accept, [201,500])
|
||||
event = spawn(accept, [201, 500])
|
||||
for dev in cu.get_container_ring().devs:
|
||||
if dev is not None:
|
||||
dev['port'] = bindsock.getsockname()[1]
|
||||
|
Loading…
x
Reference in New Issue
Block a user