Merge "updater: Stop trying to quarantine missing asyncs"
commit 39acbc342f
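The object updater quarantines async_pending files it cannot unpickle. When more than one updater runs against the same device (say, a background daemon and a foreground run), one process can try to load a file the other has already processed and unlinked; before this change that harmless race was logged and counted as a quarantine. The early return added below skips the quarantine path whenever the load failure carries errno.ENOENT. A minimal, standalone sketch of that pattern (the helper name is illustrative, not code from this commit):

    # Illustrative sketch only: distinguish "the async_pending is simply
    # gone" from "the async_pending is corrupt" when loading it, and only
    # treat the latter as an error worth quarantining.
    import errno
    import pickle

    def load_async_pending(update_path):
        try:
            with open(update_path, 'rb') as fp:
                return pickle.load(fp)
        except Exception as e:
            # open() on a missing path raises IOError/OSError carrying
            # errno.ENOENT; another process already handled this update,
            # so there is nothing to quarantine.
            if getattr(e, 'errno', None) == errno.ENOENT:
                return None
            # Anything else (e.g. pickle.UnpicklingError on corrupt data)
            # really is a bad update.
            raise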
@@ -349,7 +349,9 @@ class ObjectUpdater(Daemon):
         """
         try:
             update = pickle.load(open(update_path, 'rb'))
-        except Exception:
+        except Exception as e:
+            if getattr(e, 'errno', None) == errno.ENOENT:
+                return
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
             self.stats.quarantines += 1
@@ -357,6 +359,13 @@ class ObjectUpdater(Daemon):
             target_path = os.path.join(device, 'quarantined', 'objects',
                                        os.path.basename(update_path))
             renamer(update_path, target_path, fsync=False)
+            try:
+                # If this was the last async_pending in the directory,
+                # then this will succeed. Otherwise, it'll fail, and
+                # that's okay.
+                os.rmdir(os.path.dirname(update_path))
+            except OSError:
+                pass
             return
 
         def do_update():
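The second hunk also cleans up after a genuine quarantine: once the corrupt async_pending has been renamed away, the updater tries to remove its parent hash directory. os.rmdir() refuses to remove a non-empty directory, so the call only succeeds when the quarantined file was the directory's last entry, and the OSError from any other outcome is deliberately swallowed. A small sketch of that opportunistic-cleanup idiom (helper name is illustrative, not code from this commit):

    # Illustrative helper: try to remove a directory and treat
    # "not empty" or "already gone" as a no-op.
    import os

    def remove_dir_if_empty(path):
        try:
            os.rmdir(path)  # only succeeds on an empty directory
            return True
        except OSError:
            # ENOTEMPTY (still has entries) or ENOENT (already removed
            # by someone else); either way there is nothing left to do.
            return False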
@@ -647,7 +647,6 @@ class TestObjectUpdater(unittest.TestCase):
             data, timestamp, policy)
 
     def test_obj_put_async_updates(self):
-        ts_iter = make_timestamp_iter()
         policies = list(POLICIES)
         random.shuffle(policies)
 
@@ -664,8 +663,8 @@ class TestObjectUpdater(unittest.TestCase):
         def do_test(headers_out, expected, container_path=None):
             # write an async
             dfmanager = DiskFileManager(conf, daemon.logger)
-            self._write_async_update(dfmanager, next(ts_iter), policies[0],
-                                     headers=headers_out,
+            self._write_async_update(dfmanager, next(self.ts_iter),
+                                     policies[0], headers=headers_out,
                                      container_path=container_path)
             request_log = []
 
@@ -691,7 +690,7 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertFalse(os.listdir(async_dir))
         daemon.logger.clear()
 
-        ts = next(ts_iter)
+        ts = next(self.ts_iter)
         # use a dict rather than HeaderKeyDict so we can vary the case of the
         # pickled headers
         headers_out = {
@@ -1153,6 +1152,69 @@ class TestObjectUpdater(unittest.TestCase):
             daemon.logger.get_increment_counts())
         self.assertFalse(os.listdir(async_dir))  # no async file
 
+    def test_obj_update_quarantine(self):
+        policies = list(POLICIES)
+        random.shuffle(policies)
+
+        # setup updater
+        conf = {
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+        }
+        daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+        async_dir = os.path.join(self.sda1, get_async_dir(policies[0]))
+        os.mkdir(async_dir)
+
+        ohash = hash_path('a', 'c', 'o')
+        odir = os.path.join(async_dir, ohash[-3:])
+        mkdirs(odir)
+        op_path = os.path.join(
+            odir,
+            '%s-%s' % (ohash, next(self.ts_iter).internal))
+        with open(op_path, 'wb') as async_pending:
+            async_pending.write(b'\xff')  # invalid pickle
+
+        with mocked_http_conn():
+            with mock.patch('swift.obj.updater.dump_recon_cache'):
+                daemon.run_once()
+
+        self.assertEqual(
+            {'quarantines': 1},
+            daemon.logger.get_increment_counts())
+        self.assertFalse(os.listdir(async_dir))  # no asyncs
+
+    def test_obj_update_gone_missing(self):
+        # if you've got multiple updaters running (say, both a background
+        # and foreground process), process_object_update may get a file
+        # that doesn't exist
+        policies = list(POLICIES)
+        random.shuffle(policies)
+
+        # setup updater
+        conf = {
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+        }
+        daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+        async_dir = os.path.join(self.sda1, get_async_dir(policies[0]))
+        os.mkdir(async_dir)
+
+        ohash = hash_path('a', 'c', 'o')
+        odir = os.path.join(async_dir, ohash[-3:])
+        mkdirs(odir)
+        op_path = os.path.join(
+            odir,
+            '%s-%s' % (ohash, next(self.ts_iter).internal))
+
+        with mocked_http_conn():
+            with mock.patch('swift.obj.updater.dump_recon_cache'):
+                daemon.process_object_update(op_path, self.sda1, policies[0])
+        self.assertEqual({}, daemon.logger.get_increment_counts())
+        self.assertEqual(os.listdir(async_dir), [ohash[-3:]])
+        self.assertFalse(os.listdir(odir))
+
 
 if __name__ == '__main__':
     unittest.main()
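The two new tests exercise the two branches: test_obj_update_quarantine writes a single b'\xff' byte, which is not a valid pickle, and expects exactly one quarantine plus an empty async dir afterwards (so the new rmdir cleanup also ran); test_obj_update_gone_missing points process_object_update at a path that was never created and expects no quarantine counter and an untouched, still-empty hash directory. A quick standalone check (not part of the test suite) of why those inputs hit different branches:

    # Illustrative check: b'\xff' fails to unpickle with no errno
    # attached, while a missing file fails with errno.ENOENT, which is
    # what the new early return keys on.
    import errno
    import pickle

    try:
        pickle.loads(b'\xff')
    except Exception as e:
        assert getattr(e, 'errno', None) is None          # corrupt -> quarantine
    try:
        open('/nonexistent/async_pending', 'rb')
    except Exception as e:
        assert getattr(e, 'errno', None) == errno.ENOENT  # missing -> early return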