From 78d417dda72691c599121ad6b47dce226dc1bcb9 Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 16 Jun 2011 00:59:55 +0000 Subject: [PATCH] consync: More tests and slight refactor to be more testable --- swift/container/sync.py | 41 +- test/unit/container/test_sync.py | 656 ++++++++++++++++++++++++++++++- 2 files changed, 672 insertions(+), 25 deletions(-) diff --git a/swift/container/sync.py b/swift/container/sync.py index 9b40bf5bcb..8d0be56f8b 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -15,13 +15,15 @@ import os from time import ctime, time -import random +from random import random, shuffle from struct import unpack_from from eventlet import sleep from swift.container import server as container_server -from swift.common import client, direct_client +from swift.common.client import ClientException, delete_object, put_object, \ + quote +from swift.common.direct_client import direct_get_object from swift.common.ring import Ring from swift.common.db import ContainerBroker from swift.common.utils import audit_location_generator, get_logger, \ @@ -182,7 +184,7 @@ class ContainerSync(Daemon): """ Runs container sync scans until stopped. """ - sleep(random.random() * self.interval) + sleep(random() * self.interval) while True: begin = time() all_locs = audit_location_generator(self.devices, @@ -320,7 +322,7 @@ class ContainerSync(Daemon): sync_point1 = row['ROWID'] broker.set_x_container_sync_points(sync_point1, None) self.container_syncs += 1 - except Exception: + except Exception, err: self.container_failures += 1 self.logger.exception(_('ERROR Syncing %s'), (broker.db_file)) @@ -341,11 +343,11 @@ class ContainerSync(Daemon): try: if row['deleted']: try: - client.delete_object(sync_to, name=row['name'], + delete_object(sync_to, name=row['name'], headers={'X-Timestamp': row['created_at'], 'X-Container-Sync-Key': sync_key}, proxy=self.proxy) - except client.ClientException, err: + except ClientException, err: if err.http_status != 404: raise self.container_deletes += 1 @@ -353,16 +355,15 @@ class ContainerSync(Daemon): part, nodes = self.object_ring.get_nodes( info['account'], info['container'], row['name']) - random.shuffle(nodes) + shuffle(nodes) exc = None for node in nodes: try: - headers, body = \ - direct_client.direct_get_object(node, part, - info['account'], info['container'], - row['name'], resp_chunk_size=65536) + headers, body = direct_get_object(node, part, + info['account'], info['container'], row['name'], + resp_chunk_size=65536) break - except client.ClientException, err: + except ClientException, err: exc = err else: if exc: @@ -380,26 +381,22 @@ class ContainerSync(Daemon): headers['etag'] = headers['etag'].strip('"') headers['X-Timestamp'] = row['created_at'] headers['X-Container-Sync-Key'] = sync_key - client.put_object(sync_to, name=row['name'], - headers=headers, - contents=_Iter2FileLikeObject(body), - proxy=self.proxy) + put_object(sync_to, name=row['name'], headers=headers, + contents=_Iter2FileLikeObject(body), proxy=self.proxy) self.container_puts += 1 - except client.ClientException, err: + except ClientException, err: if err.http_status == 401: self.logger.info(_('Unauth %(sync_from)r ' '=> %(sync_to)r key: %(sync_key)r'), {'sync_from': '%s/%s' % - (client.quote(info['account']), - client.quote(info['container'])), + (quote(info['account']), quote(info['container'])), 'sync_to': sync_to, 'sync_key': sync_key}) elif err.http_status == 404: self.logger.info(_('Not found %(sync_from)r ' '=> %(sync_to)r key: %(sync_key)r'), {'sync_from': '%s/%s' % - (client.quote(info['account']), - client.quote(info['container'])), + (quote(info['account']), quote(info['container'])), 'sync_to': sync_to, 'sync_key': sync_key}) else: @@ -408,7 +405,7 @@ class ContainerSync(Daemon): {'db_file': broker.db_file, 'row': row}) self.container_failures += 1 return False - except Exception: + except Exception, err: self.logger.exception( _('ERROR Syncing %(db_file)s %(row)s'), {'db_file': broker.db_file, 'row': row}) diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index c824e654e3..357c5627fb 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -16,9 +16,14 @@ import unittest from swift.container import sync +from swift.common import utils +from swift.common.client import ClientException -class FakeRing(): +utils.HASH_PATH_SUFFIX = 'endcap' + + +class FakeRing(object): def __init__(self): self.replica_count = 3 @@ -29,6 +34,34 @@ class FakeRing(): return 1, list(self.devs) +class FakeContainerBroker(object): + + def __init__(self, path, metadata=None, info=None, deleted=False, + items_since=None): + self.db_file = path + self.metadata = metadata if metadata else {} + self.info = info if info else {} + self.deleted = deleted + self.items_since = items_since if items_since else [] + self.sync_point1 = -1 + self.sync_point2 = -1 + + def get_info(self): + return self.info + + def is_deleted(self): + return self.deleted + + def get_items_since(self, sync_point, limit): + if sync_point < 0: + sync_point = 0 + return self.items_since[sync_point:sync_point + limit] + + def set_x_container_sync_points(self, sync_point1, sync_point2): + self.sync_point1 = sync_point1 + self.sync_point2 = sync_point2 + + class TestContainerSync(unittest.TestCase): def test_Iter2FileLikeObject(self): @@ -100,7 +133,8 @@ class TestContainerSync(unittest.TestCase): sync.time = fake_time sync.sleep = fake_sleep sync.audit_location_generator = fake_audit_location_generator - cs = sync.ContainerSync({}) + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) cs.run_forever() except Exception, err: if str(err) != 'we are now done': @@ -151,7 +185,8 @@ class TestContainerSync(unittest.TestCase): try: sync.time = fake_time sync.audit_location_generator = fake_audit_location_generator - cs = sync.ContainerSync({}) + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) cs.run_once() self.assertEquals(time_calls, [6]) self.assertEquals(audit_location_generator_calls, [1]) @@ -168,6 +203,621 @@ class TestContainerSync(unittest.TestCase): self.assertEquals(audit_location_generator_calls, [2]) self.assertEquals(cs.reported, 3604) + def test_container_sync_not_db(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + self.assertEquals(cs.container_failures, 0) + + def test_container_sync_missing_db(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + + def test_container_sync_not_my_db(self): + # Db could be there due to handoff replication so test that we ignore + # those. + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c'}) + cs._myips = ['127.0.0.1'] # No match + cs._myport = 1 # No match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1 # No match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['127.0.0.1'] # No match + cs._myport = 1000 # Match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will cause the 1 container failure since the + # broker's info doesn't contain sync point keys + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_sync_deleted(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c'}, deleted=False) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will cause the 1 container failure since the + # broker's info doesn't contain sync point keys + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c'}, deleted=True) + # This complete match will not cause any more container failures + # since the broker indicates deletion + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_sync_no_to_or_key(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-to or x-container-sync-key + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 1) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-key + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 2) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-to + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 3) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = [] + # This complete match will cause a container failure since the + # sync-to won't validate as allowed. + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 3) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + # This complete match will succeed completely since the broker + # get_items_since will return no new rows. + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 3) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_stop_at(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_time = sync.time + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=['erroneous data']) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + # This sync will fail since the items_since data is bad. + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + + # Set up fake times to make the sync short-circuit as having taken + # too long + fake_times = [ + 1.0, # Compute the time to move on + 100000.0, # Compute if it's time to move on from first loop + 100000.0] # Compute if it's time to move on from second loop + + def fake_time(): + return fake_times.pop(0) + + sync.time = fake_time + # This same sync won't fail since it will look like it took so long + # as to be time to move on (before it ever actually tries to do + # anything). + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.time = orig_time + + def test_container_first_loop(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_hash_path = sync.hash_path + orig_delete_object = sync.delete_object + try: + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that no rows match for full syncing, ordinal is 0 and + # all hashes are 0 + return '\x00' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because no rows match + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, None) + self.assertEquals(fcb.sync_point2, 1) + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that all rows match for full syncing, ordinal is 0 + # and all hashes are 1 + return '\x01' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because the two sync points haven't deviated enough yet + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because container_sync_row will fail since the row has no + # 'deleted' key + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because delete_object fails + self.assertEquals(cs.container_failures, 2) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + def fake_delete_object(*args, **kwargs): + pass + + sync.delete_object = fake_delete_object + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because delete_object succeeds + self.assertEquals(cs.container_failures, 2) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, None) + self.assertEquals(fcb.sync_point2, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.hash_path = orig_hash_path + sync.delete_object = orig_delete_object + + def test_container_second_loop(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_hash_path = sync.hash_path + orig_delete_object = sync.delete_object + try: + # We'll ensure the first loop is always skipped by keeping the two + # sync points equal + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that no rows match for second loop, ordinal is 0 and + # all hashes are 1 + return '\x01' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because no rows match + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, 1) + self.assertEquals(fcb.sync_point2, None) + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that all rows match for second loop, ordinal is 0 and + # all hashes are 0 + return '\x00' * 16 + + def fake_delete_object(*args, **kwargs): + pass + + sync.hash_path = fake_hash_path + sync.delete_object = fake_delete_object + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because row is missing 'deleted' key + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because row now has 'deleted' key and delete_object + # succeeds + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, 1) + self.assertEquals(fcb.sync_point2, None) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.hash_path = orig_hash_path + sync.delete_object = orig_delete_object + + def test_container_sync_row_delete(self): + orig_delete_object = sync.delete_object + try: + + def fake_delete_object(path, name=None, headers=None, proxy=None): + self.assertEquals(path, 'http://sync/to/path') + self.assertEquals(name, 'object') + self.assertEquals(headers, + {'X-Container-Sync-Key': 'key', 'X-Timestamp': '1.2'}) + self.assertEquals(proxy, 'http://proxy') + + sync.delete_object = fake_delete_object + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) + cs.proxy = 'http://proxy' + # Success + self.assertTrue(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + + exc = [] + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(Exception('test exception')) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Failure because of delete_object exception + self.assertFalse(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + self.assertEquals(len(exc), 1) + self.assertEquals(str(exc[-1]), 'test exception') + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(ClientException('test client exception')) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Failure because of delete_object exception + self.assertFalse(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + self.assertEquals(len(exc), 2) + self.assertEquals(str(exc[-1]), 'test client exception') + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(ClientException('test client exception', + http_status=404)) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Success because the object wasn't even found + self.assertTrue(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 2) + self.assertEquals(len(exc), 3) + self.assertEquals(str(exc[-1]), 'test client exception: 404') + finally: + sync.delete_object = orig_delete_object + + def test_container_sync_row_put(self): + orig_shuffle = sync.shuffle + orig_put_object = sync.put_object + orig_direct_get_object = sync.direct_get_object + try: + sync.shuffle = lambda x: x + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + self.assertEquals(sync_to, 'http://sync/to/path') + self.assertEquals(name, 'object') + self.assertEquals(headers, {'X-Container-Sync-Key': 'key', + 'X-Timestamp': '1.2', + 'other-header': 'other header value', + 'etag': 'etagvalue'}) + self.assertEquals(contents.read(), 'contents') + self.assertEquals(proxy, 'http://proxy') + + sync.put_object = fake_put_object + + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) + cs.proxy = 'http://proxy' + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'other-header': 'other header value', + 'etag': '"etagvalue"'}, + iter('contents')) + + sync.direct_get_object = fake_direct_get_object + # Success as everything says it worked + self.assertTrue(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 1) + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'date': 'date value', + 'last-modified': 'last modified value', + 'other-header': 'other header value', + 'etag': '"etagvalue"'}, + iter('contents')) + + sync.direct_get_object = fake_direct_get_object + # Success as everything says it worked, also checks 'date' and + # 'last-modified' headers are removed and that 'etag' header is + # stripped of double quotes. + self.assertTrue(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + + exc = [] + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + exc.append(Exception('test exception')) + raise exc[-1] + + sync.direct_get_object = fake_direct_get_object + # Fail due to completely unexpected exception + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertEquals(len(exc), 1) + self.assertEquals(str(exc[-1]), 'test exception') + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + exc.append(ClientException('test client exception')) + raise exc[-1] + + sync.direct_get_object = fake_direct_get_object + # Fail due to all direct_get_object calls failing + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertEquals(len(exc), 4) + self.assertEquals(str(exc[-1]), 'test client exception') + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'other-header': 'other header value', + 'etag': '"etagvalue"'}, + iter('contents')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=401) + + class FakeLogger(object): + + def __init__(self): + self.err = '' + self.exc = '' + + def info(self, err, *args, **kwargs): + self.err = err + + def exception(self, exc, *args, **kwargs): + self.exc = exc + + sync.direct_get_object = fake_direct_get_object + sync.put_object = fake_put_object + cs.logger = FakeLogger() + # Fail due to 401 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.err.startswith('Unauth ')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=404) + + sync.put_object = fake_put_object + # Fail due to 404 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.err.startswith('Not found ')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=503) + + sync.put_object = fake_put_object + # Fail due to 503 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.exc.startswith('ERROR Syncing ')) + finally: + sync.shuffle = orig_shuffle + sync.put_object = orig_put_object + sync.direct_get_object = orig_direct_get_object + if __name__ == '__main__': unittest.main()