From 14c5b547f207d5d93e659a2aa4a78c725dcb081d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A9ri=20Le=20Bouder?= Date: Wed, 20 Nov 2013 16:26:03 +0100 Subject: [PATCH] test: improve db_replicator coverage This patch adds a test for ReplicatorRpc.complete_rsync() and complete extract_device() coverage. test_extract_device: test the case the parameter is invalid test_complete_rsync_with_bad_input: ensure the use of invalid parameters return a 404 erro test_complete_rsync: validate the returned code in case of success Change-Id: I59e0d26a1efe59d8beff1e81c2a7edc6de0872e9 --- test/unit/common/test_db_replicator.py | 34 ++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index a9a41f7084..12edc49fad 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -29,6 +29,7 @@ from swift.common import db_replicator from swift.common.utils import normalize_timestamp from swift.container import server as container_server from swift.common.exceptions import DriveNotMounted +from swift.common.swob import HTTPException from test.unit import FakeLogger @@ -201,6 +202,9 @@ class FakeBroker: def get_info(self): return self.info + def newid(self, remote_d): + pass + class FakeAccountBroker(FakeBroker): db_type = 'account' @@ -564,6 +568,8 @@ class TestDBReplicator(unittest.TestCase): replicator = TestReplicator({'devices': '/some/root'}) self.assertEqual('some_device', replicator.extract_device( '/some/root/some_device/deeper/and/deeper')) + self.assertEqual('UNKNOWN', replicator.extract_device( + '/some/foo/some_device/deeper/and/deeper')) # def test_dispatch(self): # rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) @@ -600,6 +606,34 @@ class TestDBReplicator(unittest.TestCase): rpc.merge_syncs(fake_broker, args) self.assertEquals(fake_broker.args, (args[0],)) + def test_complete_rsync_with_bad_input(self): + drive = '/some/root' + db_file = __file__ + args = ['old_file'] + rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) + resp = rpc.complete_rsync(drive, db_file, args) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEquals(404, resp.status_int) + resp = rpc.complete_rsync(drive, 'new_db_file', args) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEquals(404, resp.status_int) + + def test_complete_rsync(self): + drive = mkdtemp() + args = ['old_file'] + rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) + os.mkdir('%s/tmp' % drive) + old_file = '%s/tmp/old_file' % drive + new_file = '%s/new_db_file' % drive + try: + fp = open(old_file, 'w') + fp.write('void') + fp.close + resp = rpc.complete_rsync(drive, new_file, args) + self.assertEquals(204, resp.status_int) + finally: + rmtree(drive) + def test_roundrobin_datadirs(self): listdir_calls = [] isdir_calls = []