Merge "object: Plumb logger_thread_locals through _finalize_put"
This commit is contained in:
commit
443677f104
@ -1837,7 +1837,10 @@ class BaseDiskFileWriter(object):
|
||||
"""
|
||||
return self._upload_size, self._chunks_etag.hexdigest()
|
||||
|
||||
def _finalize_put(self, metadata, target_path, cleanup):
|
||||
def _finalize_put(self, metadata, target_path, cleanup,
|
||||
logger_thread_locals):
|
||||
if logger_thread_locals is not None:
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
# Write the metadata before calling fsync() so that both data and
|
||||
# metadata are flushed to disk.
|
||||
write_metadata(self._fd, metadata)
|
||||
@ -1914,7 +1917,9 @@ class BaseDiskFileWriter(object):
|
||||
metadata['name'] = self._name
|
||||
target_path = join(self._datadir, filename)
|
||||
|
||||
tpool.execute(self._finalize_put, metadata, target_path, cleanup)
|
||||
tpool.execute(
|
||||
self._finalize_put, metadata, target_path, cleanup,
|
||||
logger_thread_locals=getattr(self.logger, 'thread_locals', None))
|
||||
|
||||
def put(self, metadata):
|
||||
"""
|
||||
|
@ -2863,6 +2863,59 @@ class TestObjectController(unittest.TestCase):
|
||||
check_file(old_part)
|
||||
check_file(new_part)
|
||||
|
||||
def test_PUT_next_part_power_eexist(self):
|
||||
hash_path_ = hash_path('a', 'c', 'o')
|
||||
part_power = 10
|
||||
old_part = utils.get_partition_for_hash(hash_path_, part_power)
|
||||
new_part = utils.get_partition_for_hash(hash_path_, part_power + 1)
|
||||
policy = POLICIES.default
|
||||
timestamp = utils.Timestamp(int(time())).internal
|
||||
|
||||
# There's no substitute for the real thing ;-)
|
||||
tpool.execute = self._orig_tpool_exc
|
||||
|
||||
# This is a little disingenuous, but it's easier than reproducing
|
||||
# the actual race that could lead to this EEXIST
|
||||
headers = {'X-Timestamp': timestamp,
|
||||
'Content-Length': '6',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Trans-Id': 'txn1'}
|
||||
req = Request.blank(
|
||||
'/sda1/%s/a/c/o' % new_part, method='PUT',
|
||||
headers=headers, body=b'VERIFY')
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# The write should succeed, but the relink will fail
|
||||
headers = {'X-Timestamp': timestamp,
|
||||
'Content-Length': '6',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Next-Part-Power': part_power + 1,
|
||||
'X-Trans-Id': 'txn2'}
|
||||
req = Request.blank(
|
||||
'/sda1/%s/a/c/o' % old_part, method='PUT',
|
||||
headers=headers, body=b'VERIFY')
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
def check_file(part):
|
||||
data_file = os.path.join(
|
||||
self.testdir, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(int(policy)),
|
||||
part, hash_path_), timestamp + '.data')
|
||||
self.assertTrue(os.path.isfile(data_file))
|
||||
|
||||
check_file(old_part)
|
||||
check_file(new_part)
|
||||
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('[Errno 17] File exists', error_lines[0])
|
||||
self.assertEqual([], error_lines[1:])
|
||||
log_extras = self.logger.log_dict['error'][0][1]['extra']
|
||||
self.assertEqual('txn2', log_extras.get('txn_id'))
|
||||
|
||||
def test_PUT_next_part_power_races_around_makedirs_eexist(self):
|
||||
# simulate two 'concurrent' racing to create the new object dir in the
|
||||
# new partition and check that relinking tolerates the dir already
|
||||
|
Loading…
x
Reference in New Issue
Block a user