Merge "ssync: fix decoding of ts_meta when ts_data has offset"
This commit is contained in:
commit
2e89e92cb7
@ -45,8 +45,8 @@ def decode_missing(line):
|
||||
parts = line.decode('ascii').split()
|
||||
result['object_hash'] = urllib.parse.unquote(parts[0])
|
||||
t_data = urllib.parse.unquote(parts[1])
|
||||
result['ts_data'] = Timestamp(t_data)
|
||||
result['ts_meta'] = result['ts_ctype'] = result['ts_data']
|
||||
result['ts_data'] = ts_data = Timestamp(t_data)
|
||||
result['ts_meta'] = result['ts_ctype'] = ts_data
|
||||
result['durable'] = True # default to True in case this key isn't sent
|
||||
if len(parts) > 2:
|
||||
# allow for a comma separated list of k:v pairs to future-proof
|
||||
@ -54,9 +54,13 @@ def decode_missing(line):
|
||||
for item in [subpart for subpart in subparts if ':' in subpart]:
|
||||
k, v = item.split(':')
|
||||
if k == 'm':
|
||||
result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
|
||||
# ignore ts_data offset when calculating ts_meta
|
||||
result['ts_meta'] = Timestamp(ts_data.normal,
|
||||
delta=int(v, 16))
|
||||
elif k == 't':
|
||||
result['ts_ctype'] = Timestamp(t_data, delta=int(v, 16))
|
||||
# ignore ts_data offset when calculating ts_ctype
|
||||
result['ts_ctype'] = Timestamp(ts_data.normal,
|
||||
delta=int(v, 16))
|
||||
elif k == 'durable':
|
||||
result['durable'] = utils.config_true_value(v)
|
||||
return result
|
||||
|
@ -15,12 +15,15 @@
|
||||
# limitations under the License.
|
||||
|
||||
from unittest import main
|
||||
import random
|
||||
|
||||
from swiftclient import client
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.request_helpers import get_reserved_name
|
||||
from swift.obj import reconstructor
|
||||
|
||||
from test.probe.common import ReplProbeTest
|
||||
from test.probe.common import ReplProbeTest, ECProbeTest
|
||||
|
||||
|
||||
class TestObjectVersioning(ReplProbeTest):
|
||||
@ -229,5 +232,99 @@ class TestObjectVersioning(ReplProbeTest):
|
||||
self.assertEqual(data, b'new version')
|
||||
|
||||
|
||||
class TestECObjectVersioning(ECProbeTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestECObjectVersioning, self).setUp()
|
||||
self.part, self.nodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
|
||||
def test_versioning_with_metadata_replication(self):
|
||||
# Enable versioning
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers={
|
||||
'X-Storage-Policy': self.policy.name,
|
||||
'X-Versions-Enabled': 'True',
|
||||
})
|
||||
# create version with metadata in a handoff location
|
||||
failed_primary = random.choice(self.nodes)
|
||||
failed_primary_device_path = self.device_dir(failed_primary)
|
||||
self.kill_drive(failed_primary_device_path)
|
||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
||||
client.put_object(self.url, self.token, self.container_name,
|
||||
self.object_name, contents='some data',
|
||||
headers=headers)
|
||||
headers_post = {'x-object-meta-bar': 'meta-bar'}
|
||||
client.post_object(self.url, self.token, self.container_name,
|
||||
self.object_name, headers=headers_post)
|
||||
# find the handoff
|
||||
primary_ids = [n['id'] for n in self.nodes]
|
||||
for handoff in self.object_ring.devs:
|
||||
if handoff['id'] in primary_ids:
|
||||
continue
|
||||
try:
|
||||
headers, etag = self.direct_get(handoff, self.part)
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
else:
|
||||
break
|
||||
else:
|
||||
self.fail('unable to find object on handoffs')
|
||||
# we want to repair the fault, but avoid doing the handoff revert
|
||||
self.revive_drive(failed_primary_device_path)
|
||||
handoff_config = (handoff['id'] + 1) % 4
|
||||
failed_config = (failed_primary['id'] + 1) % 4
|
||||
partner_nodes = reconstructor._get_partners(
|
||||
failed_primary['index'], self.nodes)
|
||||
random.shuffle(partner_nodes)
|
||||
for partner in partner_nodes:
|
||||
fix_config = (partner['id'] + 1) % 4
|
||||
if fix_config not in (handoff_config, failed_config):
|
||||
break
|
||||
else:
|
||||
self.fail('unable to find fix_config in %r excluding %r & %r' % (
|
||||
[(d['device'], (d['id'] + 1) % 4) for d in partner_nodes],
|
||||
handoff_config, failed_config))
|
||||
|
||||
self.reconstructor.once(number=fix_config)
|
||||
# validate object in all locations
|
||||
missing = []
|
||||
etags = set()
|
||||
metadata = []
|
||||
for node in self.nodes:
|
||||
try:
|
||||
headers, etag = self.direct_get(node, self.part)
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
missing.append(node)
|
||||
continue
|
||||
etags.add(headers['X-Object-Sysmeta-Ec-Etag'])
|
||||
metadata.append(headers['X-Object-Meta-Bar'])
|
||||
if missing:
|
||||
self.fail('Ran reconstructor config #%s to repair %r but '
|
||||
'found 404 on primary: %r' % (
|
||||
fix_config, failed_primary['device'],
|
||||
[d['device'] for d in missing]))
|
||||
self.assertEqual(1, len(etags))
|
||||
self.assertEqual(['meta-bar'] * len(self.nodes), metadata)
|
||||
# process revert
|
||||
self.reconstructor.once(number=handoff_config)
|
||||
# validate object (still?) in primary locations
|
||||
etags = set()
|
||||
metadata = []
|
||||
for node in self.nodes:
|
||||
headers, etag = self.direct_get(node, self.part)
|
||||
etags.add(headers['X-Object-Sysmeta-Ec-Etag'])
|
||||
metadata.append(headers['X-Object-Meta-Bar'])
|
||||
self.assertEqual(1, len(etags))
|
||||
self.assertEqual(['meta-bar'] * len(self.nodes), metadata)
|
||||
# and removed form handoff
|
||||
with self.assertRaises(direct_client.DirectClientException) as ctx:
|
||||
headers, etag = self.direct_get(handoff, self.part)
|
||||
self.assertEqual(ctx.exception.http_status, 404)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
@ -1464,6 +1464,7 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
|
||||
# o5 is on tx with meta, rx is in sync with data and meta
|
||||
t5 = next(self.ts_iter)
|
||||
t5 = utils.Timestamp(t5, offset=1) # note: use an offset for this test
|
||||
rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
|
||||
tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
|
||||
t5_meta = next(self.ts_iter)
|
||||
|
@ -2053,6 +2053,15 @@ class TestModuleMethods(unittest.TestCase):
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# test encode and decode functions invert with offset
|
||||
t_data_offset = utils.Timestamp(t_data, offset=1)
|
||||
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
|
||||
'ts_data': t_data_offset, 'ts_ctype': t_type,
|
||||
'durable': False}
|
||||
msg = ssync_sender.encode_missing(**expected)
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_decode_wanted(self):
|
||||
parts = ['d']
|
||||
expected = {'data': True}
|
||||
|
Loading…
x
Reference in New Issue
Block a user