From b7d916aea37eebe6f5b94529e92250fccd4231f6 Mon Sep 17 00:00:00 2001 From: Mikhail Durnosvistov Date: Thu, 13 Feb 2014 11:41:57 +0200 Subject: [PATCH] Cleanup mock patch without `with` part 1 A good idea because it makes the code easier to read to get rid of the `with` using mock.patch and mock.patch.object in favor decorators, where possible in the tests. This patch only affects drivers. Change-Id: I564428a6315352f72ad3bad69bf7dd8cd88ffd9e --- ironic/tests/drivers/test_ipminative.py | 196 +++--- ironic/tests/drivers/test_ipmitool.py | 345 +++++------ ironic/tests/drivers/test_pxe.py | 325 +++++----- ironic/tests/drivers/test_ssh.py | 752 +++++++++++------------- 4 files changed, 758 insertions(+), 860 deletions(-) diff --git a/ironic/tests/drivers/test_ipminative.py b/ironic/tests/drivers/test_ipminative.py index 6e436e01c8..7730039001 100644 --- a/ironic/tests/drivers/test_ipminative.py +++ b/ironic/tests/drivers/test_ipminative.py @@ -50,9 +50,6 @@ class IPMINativePrivateMethodTestCase(base.TestCase): driver_info=INFO_DICT) self.dbapi = db_api.get_instance() self.info = ipminative._parse_driver_info(self.node) - ipmi_patch = mock.patch('pyghmi.ipmi.command.Command') - self.ipmi_mock = ipmi_patch.start() - self.addCleanup(ipmi_patch.stop) def test__parse_driver_info(self): # make sure we get back the expected things @@ -70,32 +67,36 @@ class IPMINativePrivateMethodTestCase(base.TestCase): ipminative._parse_driver_info, node) - def test__power_status_on(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__power_status_on(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.get_power.return_value = {'powerstate': 'on'} state = ipminative._power_status(self.info) ipmicmd.get_power.assert_called_once_with() self.assertEqual(states.POWER_ON, state) - def test__power_status_off(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__power_status_off(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.get_power.return_value = {'powerstate': 'off'} state = ipminative._power_status(self.info) ipmicmd.get_power.assert_called_once_with() self.assertEqual(states.POWER_OFF, state) - def test__power_status_error(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__power_status_error(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.get_power.return_value = {'powerstate': 'Error'} state = ipminative._power_status(self.info) ipmicmd.get_power.assert_called_once_with() self.assertEqual(states.ERROR, state) - def test__power_on(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__power_on(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.set_power.return_value = {'powerstate': 'on'} self.config(retry_timeout=400, group='ipmi') @@ -103,8 +104,9 @@ class IPMINativePrivateMethodTestCase(base.TestCase): ipmicmd.set_power.assert_called_once_with('on', 400) self.assertEqual(states.POWER_ON, state) - def test__power_off(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__power_off(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.set_power.return_value = {'powerstate': 'off'} self.config(retry_timeout=500, group='ipmi') @@ -112,8 +114,9 @@ class IPMINativePrivateMethodTestCase(base.TestCase): ipmicmd.set_power.assert_called_once_with('off', 500) self.assertEqual(states.POWER_OFF, state) - def test__reboot(self): - ipmicmd = self.ipmi_mock.return_value + @mock.patch('pyghmi.ipmi.command.Command') + def test__reboot(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value ipmicmd.set_power.return_value = {'powerstate': 'on'} self.config(retry_timeout=600, group='ipmi') @@ -138,75 +141,75 @@ class IPMINativeDriverTestCase(db_base.DbTestCase): self.dbapi = db_api.get_instance() self.info = ipminative._parse_driver_info(self.node) - def test_get_power_state(self): - with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock: - # Getting the mocked command. - cmd_mock = ipmi_mock.return_value - # Getting the get power mock. - get_power_mock = cmd_mock.get_power + @mock.patch('pyghmi.ipmi.command.Command') + def test_get_power_state(self, ipmi_mock): + # Getting the mocked command. + cmd_mock = ipmi_mock.return_value + # Getting the get power mock. + get_power_mock = cmd_mock.get_power - return_values = [{'powerstate': 'error'}, - {'powerstate': 'on'}, - {'powerstate': 'off'}] + return_values = [{'powerstate': 'error'}, + {'powerstate': 'on'}, + {'powerstate': 'off'}] - get_power_mock.side_effect = lambda: return_values.pop() + get_power_mock.side_effect = lambda: return_values.pop() - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.POWER_OFF, pstate) + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.POWER_OFF, pstate) - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.POWER_ON, pstate) + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.POWER_ON, pstate) - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.ERROR, pstate) - self.assertEqual(3, get_power_mock.call_count, - "pyghmi.ipmi.command.Command.get_power was not" - " called 3 times.") + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.ERROR, pstate) + self.assertEqual(3, get_power_mock.call_count, + "pyghmi.ipmi.command.Command.get_power was not" + " called 3 times.") - def test_set_power_on_ok(self): - with mock.patch.object(ipminative, '_power_on') as power_on_mock: - power_on_mock.return_value = states.POWER_ON + @mock.patch.object(ipminative, '_power_on') + def test_set_power_on_ok(self, power_on_mock): + power_on_mock.return_value = states.POWER_ON - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.driver.power.set_power_state( - task, self.node, states.POWER_ON) - power_on_mock.assert_called_once_with(self.info) + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.driver.power.set_power_state( + task, self.node, states.POWER_ON) + power_on_mock.assert_called_once_with(self.info) - def test_set_power_off_ok(self): - with mock.patch.object(ipminative, '_power_off') as power_off_mock: - power_off_mock.return_value = states.POWER_OFF + @mock.patch.object(ipminative, '_power_off') + def test_set_power_off_ok(self, power_off_mock): + power_off_mock.return_value = states.POWER_OFF - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.driver.power.set_power_state( - task, self.node, states.POWER_OFF) - power_off_mock.assert_called_once_with(self.info) + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.driver.power.set_power_state( + task, self.node, states.POWER_OFF) + power_off_mock.assert_called_once_with(self.info) - def test_set_power_on_fail(self): - with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock: - ipmicmd = ipmi_mock.return_value - ipmicmd.set_power.return_value = {'powerstate': 'error'} + @mock.patch('pyghmi.ipmi.command.Command') + def test_set_power_on_fail(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value + ipmicmd.set_power.return_value = {'powerstate': 'error'} - self.config(retry_timeout=500, group='ipmi') - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.assertRaises(exception.PowerStateFailure, - self.driver.power.set_power_state, - task, - self.node, - states.POWER_ON) - ipmicmd.set_power.assert_called_once_with('on', 500) + self.config(retry_timeout=500, group='ipmi') + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.assertRaises(exception.PowerStateFailure, + self.driver.power.set_power_state, + task, + self.node, + states.POWER_ON) + ipmicmd.set_power.assert_called_once_with('on', 500) - def test_set_boot_device_ok(self): - with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock: - ipmicmd = ipmi_mock.return_value - ipmicmd.set_bootdev.return_value = None + @mock.patch('pyghmi.ipmi.command.Command') + def test_set_boot_device_ok(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value + ipmicmd.set_bootdev.return_value = None - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.driver.vendor._set_boot_device(task, 'pxe') - ipmicmd.set_bootdev.assert_called_once_with('pxe') + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.driver.vendor._set_boot_device(task, 'pxe') + ipmicmd.set_bootdev.assert_called_once_with('pxe') def test_set_boot_device_bad_device(self): with task_manager.acquire(self.context, [self.node.uuid]) as task: @@ -215,28 +218,28 @@ class IPMINativeDriverTestCase(db_base.DbTestCase): task, 'fake-device') - def test_reboot_ok(self): - with mock.patch.object(ipminative, '_reboot') as reboot_mock: - reboot_mock.return_value = None + @mock.patch.object(ipminative, '_reboot') + def test_reboot_ok(self, reboot_mock): + reboot_mock.return_value = None - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.driver.power.reboot(task, self.node) - reboot_mock.assert_called_once_with(self.info) + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.driver.power.reboot(task, self.node) + reboot_mock.assert_called_once_with(self.info) - def test_reboot_fail(self): - with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock: - ipmicmd = ipmi_mock.return_value - ipmicmd.set_power.return_value = {'powerstate': 'error'} + @mock.patch('pyghmi.ipmi.command.Command') + def test_reboot_fail(self, ipmi_mock): + ipmicmd = ipmi_mock.return_value + ipmicmd.set_power.return_value = {'powerstate': 'error'} - self.config(retry_timeout=500, group='ipmi') - with task_manager.acquire(self.context, - [self.node.uuid]) as task: - self.assertRaises(exception.PowerStateFailure, - self.driver.power.reboot, - task, - self.node) - ipmicmd.set_power.assert_called_once_with('boot', 500) + self.config(retry_timeout=500, group='ipmi') + with task_manager.acquire(self.context, + [self.node.uuid]) as task: + self.assertRaises(exception.PowerStateFailure, + self.driver.power.reboot, + task, + self.node) + ipmicmd.set_power.assert_called_once_with('boot', 500) def test_vendor_passthru_validate__set_boot_device_good(self): with task_manager.acquire(self.context, @@ -267,12 +270,11 @@ class IPMINativeDriverTestCase(db_base.DbTestCase): self.driver.vendor.validate, task, method='non-existent-method') - def test_vendor_passthru_call__set_boot_device(self): + @mock.patch.object(ipminative.VendorPassthru, '_set_boot_device') + def test_vendor_passthru_call__set_boot_device(self, boot_mock): with task_manager.acquire(self.context, [self.node.uuid], shared=False) as task: - with mock.patch.object(ipminative.VendorPassthru, - '_set_boot_device') as boot_mock: - self.driver.vendor.vendor_passthru(task, - method='set_boot_device', - device='pxe') - boot_mock.assert_called_once_with(task, 'pxe', False) + self.driver.vendor.vendor_passthru(task, + method='set_boot_device', + device='pxe') + boot_mock.assert_called_once_with(task, 'pxe', False) diff --git a/ironic/tests/drivers/test_ipmitool.py b/ironic/tests/drivers/test_ipmitool.py index b8f7c85420..a6122c17e0 100644 --- a/ironic/tests/drivers/test_ipmitool.py +++ b/ironic/tests/drivers/test_ipmitool.py @@ -104,7 +104,9 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): ipmi._parse_driver_info, node) - def test__exec_ipmitool(self): + @mock.patch.object(ipmi, '_make_password_file', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool(self, mock_exec, mock_pwf): pw_file_handle = tempfile.NamedTemporaryFile() pw_file = pw_file_handle.name file_handle = open(pw_file, "w") @@ -118,19 +120,17 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): 'A', 'B', 'C', ] - with mock.patch.object(ipmi, '_make_password_file', - autospec=True) as mock_pwf: - mock_pwf.return_value = file_handle - with mock.patch.object(utils, 'execute', - autospec=True) as mock_exec: - mock_exec.return_value = (None, None) + mock_pwf.return_value = file_handle + mock_exec.return_value = (None, None) - ipmi._exec_ipmitool(self.info, 'A B C') + ipmi._exec_ipmitool(self.info, 'A B C') - mock_pwf.assert_called_once_with(self.info['password']) - mock_exec.assert_called_once_with(*args, attempts=3) + mock_pwf.assert_called_once_with(self.info['password']) + mock_exec.assert_called_once_with(*args, attempts=3) - def test__exec_ipmitool_without_password(self): + @mock.patch.object(ipmi, '_make_password_file', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_without_password(self, mock_exec, mock_pwf): self.info['password'] = None pw_file_handle = tempfile.NamedTemporaryFile() pw_file = pw_file_handle.name @@ -145,17 +145,15 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): 'A', 'B', 'C', ] - with mock.patch.object(ipmi, '_make_password_file', - autospec=True) as mock_pwf: - mock_pwf.return_value = file_handle - with mock.patch.object(utils, 'execute', - autospec=True) as mock_exec: - mock_exec.return_value = (None, None) - ipmi._exec_ipmitool(self.info, 'A B C') - self.assertTrue(mock_pwf.called) - mock_exec.assert_called_once_with(*args, attempts=3) + mock_pwf.return_value = file_handle + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + self.assertTrue(mock_pwf.called) + mock_exec.assert_called_once_with(*args, attempts=3) - def test__exec_ipmitool_without_username(self): + @mock.patch.object(ipmi, '_make_password_file', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_without_username(self, mock_exec, mock_pwf): self.info['username'] = None pw_file_handle = tempfile.NamedTemporaryFile() pw_file = pw_file_handle.name @@ -169,17 +167,15 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): 'A', 'B', 'C', ] - with mock.patch.object(ipmi, '_make_password_file', - autospec=True) as mock_pwf: - mock_pwf.return_value = file_handle - with mock.patch.object(utils, 'execute', - autospec=True) as mock_exec: - mock_exec.return_value = (None, None) - ipmi._exec_ipmitool(self.info, 'A B C') - self.assertTrue(mock_pwf.called) - mock_exec.assert_called_once_with(*args, attempts=3) + mock_pwf.return_value = file_handle + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + self.assertTrue(mock_pwf.called) + mock_exec.assert_called_once_with(*args, attempts=3) - def test__exec_ipmitool_exception(self): + @mock.patch.object(ipmi, '_make_password_file', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_exception(self, mock_exec, mock_pwf): pw_file_handle = tempfile.NamedTemporaryFile() pw_file = pw_file_handle.name file_handle = open(pw_file, "w") @@ -193,59 +189,52 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): 'A', 'B', 'C', ] - with mock.patch.object(ipmi, '_make_password_file', - autospec=True) as mock_pwf: - mock_pwf.return_value = file_handle - with mock.patch.object(utils, 'execute', - autospec=True) as mock_exec: - mock_exec.side_effect = processutils.ProcessExecutionError("x") - self.assertRaises(processutils.ProcessExecutionError, - ipmi._exec_ipmitool, - self.info, 'A B C') - mock_pwf.assert_called_once_with(self.info['password']) - mock_exec.assert_called_once_with(*args, attempts=3) + mock_pwf.return_value = file_handle + mock_exec.side_effect = processutils.ProcessExecutionError("x") + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + mock_pwf.assert_called_once_with(self.info['password']) + mock_exec.assert_called_once_with(*args, attempts=3) - def test__power_status_on(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - autospec=True) as mock_exec: - mock_exec.return_value = ["Chassis Power is on\n", None] + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test__power_status_on(self, mock_exec): + mock_exec.return_value = ["Chassis Power is on\n", None] - state = ipmi._power_status(self.info) + state = ipmi._power_status(self.info) - mock_exec.assert_called_once_with(self.info, "power status") - self.assertEqual(states.POWER_ON, state) + mock_exec.assert_called_once_with(self.info, "power status") + self.assertEqual(states.POWER_ON, state) - def test__power_status_off(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - autospec=True) as mock_exec: - mock_exec.return_value = ["Chassis Power is off\n", None] + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test__power_status_off(self, mock_exec): + mock_exec.return_value = ["Chassis Power is off\n", None] - state = ipmi._power_status(self.info) + state = ipmi._power_status(self.info) - mock_exec.assert_called_once_with(self.info, "power status") - self.assertEqual(states.POWER_OFF, state) + mock_exec.assert_called_once_with(self.info, "power status") + self.assertEqual(states.POWER_OFF, state) - def test__power_status_error(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - autospec=True) as mock_exec: - mock_exec.return_value = ["Chassis Power is badstate\n", None] + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test__power_status_error(self, mock_exec): + mock_exec.return_value = ["Chassis Power is badstate\n", None] - state = ipmi._power_status(self.info) + state = ipmi._power_status(self.info) - mock_exec.assert_called_once_with(self.info, "power status") - self.assertEqual(states.ERROR, state) + mock_exec.assert_called_once_with(self.info, "power status") + self.assertEqual(states.ERROR, state) - def test__power_status_exception(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - side_effect=processutils.ProcessExecutionError("error"), - autospec=True) as mock_exec: - self.assertRaises(exception.IPMIFailure, - ipmi._power_status, - self.info) - mock_exec.assert_called_once_with(self.info, "power status") + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test__power_status_exception(self, mock_exec): + mock_exec.side_effect = processutils.ProcessExecutionError("error") + self.assertRaises(exception.IPMIFailure, + ipmi._power_status, + self.info) + mock_exec.assert_called_once_with(self.info, "power status") + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) @mock.patch('eventlet.greenthread.sleep') - def test__power_on_max_retries(self, sleep_mock): + def test__power_on_max_retries(self, sleep_mock, mock_exec): self.config(retry_timeout=2, group='ipmi') def side_effect(driver_info, command): @@ -253,18 +242,16 @@ class IPMIToolPrivateMethodTestCase(base.TestCase): "power on": [None, None]} return resp_dict.get(command, ["Bad\n", None]) - with mock.patch.object(ipmi, '_exec_ipmitool', - autospec=True) as mock_exec: - mock_exec.side_effect = side_effect + mock_exec.side_effect = side_effect - expected = [mock.call(self.info, "power on"), - mock.call(self.info, "power status"), - mock.call(self.info, "power status")] + expected = [mock.call(self.info, "power on"), + mock.call(self.info, "power status"), + mock.call(self.info, "power status")] - state = ipmi._power_on(self.info) + state = ipmi._power_on(self.info) - self.assertEqual(mock_exec.call_args_list, expected) - self.assertEqual(states.ERROR, state) + self.assertEqual(mock_exec.call_args_list, expected) + self.assertEqual(states.ERROR, state) class IPMIToolDriverTestCase(db_base.DbTestCase): @@ -281,87 +268,83 @@ class IPMIToolDriverTestCase(db_base.DbTestCase): driver_info=INFO_DICT) self.info = ipmi._parse_driver_info(self.node) - def test_get_power_state(self): - returns = [["Chassis Power is off\n", None], - ["Chassis Power is on\n", None], - ["\n", None]] + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test_get_power_state(self, mock_exec): + returns = iter([["Chassis Power is off\n", None], + ["Chassis Power is on\n", None], + ["\n", None]]) expected = [mock.call(self.info, "power status"), mock.call(self.info, "power status"), mock.call(self.info, "power status")] - with mock.patch.object(ipmi, '_exec_ipmitool', side_effect=returns, - autospec=True) as mock_exec: + mock_exec.side_effect = returns - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.POWER_OFF, pstate) + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.POWER_OFF, pstate) - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.POWER_ON, pstate) + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.POWER_ON, pstate) - pstate = self.driver.power.get_power_state(None, self.node) - self.assertEqual(states.ERROR, pstate) + pstate = self.driver.power.get_power_state(None, self.node) + self.assertEqual(states.ERROR, pstate) - self.assertEqual(mock_exec.call_args_list, expected) + self.assertEqual(mock_exec.call_args_list, expected) - def test_get_power_state_exception(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - side_effect=processutils.ProcessExecutionError("error"), - autospec=True) as mock_exec: - self.assertRaises(exception.IPMIFailure, - self.driver.power.get_power_state, - None, - self.node) + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test_get_power_state_exception(self, mock_exec): + mock_exec.side_effect = processutils.ProcessExecutionError("error") + self.assertRaises(exception.IPMIFailure, + self.driver.power.get_power_state, + None, + self.node) mock_exec.assert_called_once_with(self.info, "power status") - def test_set_power_on_ok(self): + @mock.patch.object(ipmi, '_power_on', autospec=True) + @mock.patch.object(ipmi, '_power_off', autospec=True) + def test_set_power_on_ok(self, mock_off, mock_on): self.config(retry_timeout=0, group='ipmi') - with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on: - mock_on.return_value = states.POWER_ON - with mock.patch.object(ipmi, '_power_off', - autospec=True) as mock_off: + mock_on.return_value = states.POWER_ON + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.driver.power.set_power_state(task, + self.node, + states.POWER_ON) - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.driver.power.set_power_state( - task, self.node, states.POWER_ON) + mock_on.assert_called_once_with(self.info) + self.assertFalse(mock_off.called) - mock_on.assert_called_once_with(self.info) - self.assertFalse(mock_off.called) - - def test_set_power_off_ok(self): + @mock.patch.object(ipmi, '_power_on', autospec=True) + @mock.patch.object(ipmi, '_power_off', autospec=True) + def test_set_power_off_ok(self, mock_off, mock_on): self.config(retry_timeout=0, group='ipmi') - with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on: - with mock.patch.object(ipmi, '_power_off', - autospec=True) as mock_off: - mock_off.return_value = states.POWER_OFF + mock_off.return_value = states.POWER_OFF - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.driver.power.set_power_state( - task, self.node, states.POWER_OFF) + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.driver.power.set_power_state(task, + self.node, + states.POWER_OFF) - mock_off.assert_called_once_with(self.info) - self.assertFalse(mock_on.called) + mock_off.assert_called_once_with(self.info) + self.assertFalse(mock_on.called) - def test_set_power_on_fail(self): + @mock.patch.object(ipmi, '_power_on', autospec=True) + @mock.patch.object(ipmi, '_power_off', autospec=True) + def test_set_power_on_fail(self, mock_off, mock_on): self.config(retry_timeout=0, group='ipmi') - with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on: - mock_on.return_value = states.ERROR - with mock.patch.object(ipmi, '_power_off', - autospec=True) as mock_off: + mock_on.return_value = states.ERROR + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.assertRaises(exception.PowerStateFailure, + self.driver.power.set_power_state, + task, + self.node, + states.POWER_ON) - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.assertRaises(exception.PowerStateFailure, - self.driver.power.set_power_state, - task, - self.node, - states.POWER_ON) - - mock_on.assert_called_once_with(self.info) - self.assertFalse(mock_off.called) + mock_on.assert_called_once_with(self.info) + self.assertFalse(mock_off.called) def test_set_power_invalid_state(self): with task_manager.acquire(self.context, [self.node['uuid']]) as task: @@ -371,16 +354,15 @@ class IPMIToolDriverTestCase(db_base.DbTestCase): self.node, "fake state") - def test_set_boot_device_ok(self): - with mock.patch.object(ipmi, '_exec_ipmitool', - autospec=True) as mock_exec: - mock_exec.return_value = [None, None] + @mock.patch.object(ipmi, '_exec_ipmitool', autospec=True) + def test_set_boot_device_ok(self, mock_exec): + mock_exec.return_value = [None, None] - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.driver.vendor._set_boot_device(task, 'pxe') + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.driver.vendor._set_boot_device(task, 'pxe') - mock_exec.assert_called_once_with(self.info, "chassis bootdev pxe") + mock_exec.assert_called_once_with(self.info, "chassis bootdev pxe") def test_set_boot_device_bad_device(self): with task_manager.acquire(self.context, [self.node['uuid']]) as task: @@ -389,44 +371,42 @@ class IPMIToolDriverTestCase(db_base.DbTestCase): task, 'fake-device') - def test_reboot_ok(self): + @mock.patch.object(ipmi, '_power_off', autospec=False) + @mock.patch.object(ipmi, '_power_on', autospec=False) + def test_reboot_ok(self, mock_on, mock_off): manager = mock.MagicMock() #NOTE(rloo): if autospec is True, then manager.mock_calls is empty - with mock.patch.object(ipmi, '_power_off', autospec=False) as mock_off: - with mock.patch.object(ipmi, '_power_on', - autospec=False) as mock_on: - mock_on.return_value = states.POWER_ON - manager.attach_mock(mock_off, 'power_off') - manager.attach_mock(mock_on, 'power_on') - expected = [mock.call.power_off(self.info), - mock.call.power_on(self.info)] + mock_on.return_value = states.POWER_ON + manager.attach_mock(mock_off, 'power_off') + manager.attach_mock(mock_on, 'power_on') + expected = [mock.call.power_off(self.info), + mock.call.power_on(self.info)] - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.driver.power.reboot(task, self.node) + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.driver.power.reboot(task, self.node) - self.assertEqual(manager.mock_calls, expected) + self.assertEqual(manager.mock_calls, expected) - def test_reboot_fail(self): + @mock.patch.object(ipmi, '_power_off', autospec=False) + @mock.patch.object(ipmi, '_power_on', autospec=False) + def test_reboot_fail(self, mock_on, mock_off): manager = mock.MagicMock() #NOTE(rloo): if autospec is True, then manager.mock_calls is empty - with mock.patch.object(ipmi, '_power_off', autospec=False) as mock_off: - with mock.patch.object(ipmi, '_power_on', - autospec=False) as mock_on: - mock_on.return_value = states.ERROR - manager.attach_mock(mock_off, 'power_off') - manager.attach_mock(mock_on, 'power_on') - expected = [mock.call.power_off(self.info), - mock.call.power_on(self.info)] + mock_on.return_value = states.ERROR + manager.attach_mock(mock_off, 'power_off') + manager.attach_mock(mock_on, 'power_on') + expected = [mock.call.power_off(self.info), + mock.call.power_on(self.info)] - with task_manager.acquire(self.context, - [self.node['uuid']]) as task: - self.assertRaises(exception.PowerStateFailure, - self.driver.power.reboot, - task, - self.node) + with task_manager.acquire(self.context, + [self.node['uuid']]) as task: + self.assertRaises(exception.PowerStateFailure, + self.driver.power.reboot, + task, + self.node) - self.assertEqual(manager.mock_calls, expected) + self.assertEqual(manager.mock_calls, expected) def test_vendor_passthru_validate__set_boot_device_good(self): with task_manager.acquire(self.context, self.node['uuid']) as task: @@ -453,14 +433,13 @@ class IPMIToolDriverTestCase(db_base.DbTestCase): self.driver.vendor.validate, task, method='fake_method') - def test_vendor_passthru_call_set_boot_device(self): + @mock.patch.object(ipmi.VendorPassthru, '_set_boot_device') + def test_vendor_passthru_call_set_boot_device(self, boot_mock): with task_manager.acquire(self.context, [self.node['uuid']], shared=False) as task: - with mock.patch.object(ipmi.VendorPassthru, - '_set_boot_device') as boot_mock: - self.driver.vendor.vendor_passthru(task, - method='set_boot_device', - device='pxe') + self.driver.vendor.vendor_passthru(task, + method='set_boot_device', + device='pxe') boot_mock.assert_called_once_with(task, 'pxe', False) @mock.patch.object(ipmi, '_exec_ipmitool') diff --git a/ironic/tests/drivers/test_pxe.py b/ironic/tests/drivers/test_pxe.py index 1900610cfc..aaa5ea30d0 100644 --- a/ironic/tests/drivers/test_pxe.py +++ b/ironic/tests/drivers/test_pxe.py @@ -198,7 +198,8 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase): p = db_utils.get_test_port(**kwargs) return self.dbapi.create_port(p) - def test__get_tftp_image_info(self): + @mock.patch.object(base_image_service.BaseImageService, '_show') + def test__get_tftp_image_info(self, show_mock): properties = {'properties': {u'kernel_id': u'instance_kernel_uuid', u'ramdisk_id': u'instance_ramdisk_uuid'}} @@ -222,26 +223,24 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase): os.path.join(CONF.pxe.tftp_root, self.node.uuid, 'deploy_kernel')]} - with mock.patch.object(base_image_service.BaseImageService, '_show') \ - as show_mock: - show_mock.return_value = properties - image_info = pxe._get_tftp_image_info(self.node, self.context) - show_mock.assert_called_once_with('glance://image_uuid', - method='get') - self.assertEqual(expected_info, image_info) + show_mock.return_value = properties + image_info = pxe._get_tftp_image_info(self.node, self.context) + show_mock.assert_called_once_with('glance://image_uuid', + method='get') + self.assertEqual(expected_info, image_info) # test with saved info - with mock.patch.object(base_image_service.BaseImageService, '_show') \ - as show_mock: - image_info = pxe._get_tftp_image_info(self.node, self.context) - self.assertEqual(expected_info, image_info) - self.assertFalse(show_mock.called) - self.assertEqual('instance_kernel_uuid', - self.node.driver_info.get('pxe_kernel')) - self.assertEqual('instance_ramdisk_uuid', - self.node.driver_info.get('pxe_ramdisk')) + show_mock.reset_mock() + image_info = pxe._get_tftp_image_info(self.node, self.context) + self.assertEqual(expected_info, image_info) + self.assertFalse(show_mock.called) + self.assertEqual('instance_kernel_uuid', + self.node.driver_info.get('pxe_kernel')) + self.assertEqual('instance_ramdisk_uuid', + self.node.driver_info.get('pxe_ramdisk')) - def test__build_pxe_config(self): + @mock.patch.object(utils, 'random_alnum') + def test__build_pxe_config(self, random_alnum_mock): self.config(pxe_append_params='test_param', group='pxe') # NOTE: right '/' should be removed from url string self.config(api_url='http://192.168.122.184:6385/', group='conductor') @@ -250,32 +249,31 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase): pxe_config_template = open(template, 'r').read() fake_key = '0123456789ABCDEFGHIJKLMNOPQRSTUV' - with mock.patch.object(utils, 'random_alnum') as random_alnum_mock: - random_alnum_mock.return_value = fake_key + random_alnum_mock.return_value = fake_key - image_info = {'deploy_kernel': ['deploy_kernel', - os.path.join(CONF.pxe.tftp_root, - self.node.uuid, - 'deploy_kernel')], - 'deploy_ramdisk': ['deploy_ramdisk', - os.path.join(CONF.pxe.tftp_root, - self.node.uuid, - 'deploy_ramdisk')], - 'kernel': ['kernel_id', - os.path.join(CONF.pxe.tftp_root, - self.node.uuid, - 'kernel')], - 'ramdisk': ['ramdisk_id', - os.path.join(CONF.pxe.tftp_root, - self.node.uuid, - 'ramdisk')] + image_info = {'deploy_kernel': ['deploy_kernel', + os.path.join(CONF.pxe.tftp_root, + self.node.uuid, + 'deploy_kernel')], + 'deploy_ramdisk': ['deploy_ramdisk', + os.path.join(CONF.pxe.tftp_root, + self.node.uuid, + 'deploy_ramdisk')], + 'kernel': ['kernel_id', + os.path.join(CONF.pxe.tftp_root, + self.node.uuid, + 'kernel')], + 'ramdisk': ['ramdisk_id', + os.path.join(CONF.pxe.tftp_root, + self.node.uuid, + 'ramdisk')] } - pxe_config = pxe._build_pxe_config(self.node, - image_info, - self.context) + pxe_config = pxe._build_pxe_config(self.node, + image_info, + self.context) - random_alnum_mock.assert_called_once_with(32) - self.assertEqual(pxe_config_template, pxe_config) + random_alnum_mock.assert_called_once_with(32) + self.assertEqual(pxe_config_template, pxe_config) # test that deploy_key saved db_node = self.dbapi.get_node_by_uuid(self.node.uuid) @@ -312,54 +310,50 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase): result = pxe._get_node_vif_ids(task) self.assertEqual(expected, result) - def test__update_neutron(self): + @mock.patch.object(pxe, '_get_node_vif_ids') + @mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts') + def test__update_neutron(self, mock_updo, mock_gnvi): opts = pxe._dhcp_options_for_instance() - with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi: - mock_gnvi.return_value = {'port-uuid': 'vif-uuid'} - with mock.patch.object(neutron.NeutronAPI, - 'update_port_dhcp_opts') as mock_updo: - with task_manager.acquire(self.context, - self.node.uuid) as task: - pxe._update_neutron(task, self.node) - mock_updo.assertCalleOnceWith('vif-uuid', opts) + mock_gnvi.return_value = {'port-uuid': 'vif-uuid'} + with task_manager.acquire(self.context, + self.node.uuid) as task: + pxe._update_neutron(task, self.node) + mock_updo.assertCalleOnceWith('vif-uuid', opts) - def test__update_neutron_no_vif_data(self): - with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi: - mock_gnvi.return_value = {} - with mock.patch.object(neutron.NeutronAPI, - '__init__') as mock_init: - with task_manager.acquire(self.context, - self.node.uuid) as task: - pxe._update_neutron(task, self.node) - mock_init.assert_not_called() + @mock.patch.object(pxe, '_get_node_vif_ids') + @mock.patch.object(neutron.NeutronAPI, '__init__') + def test__update_neutron_no_vif_data(self, mock_init, mock_gnvi): + mock_gnvi.return_value = {} + with task_manager.acquire(self.context, + self.node.uuid) as task: + pxe._update_neutron(task, self.node) + mock_init.assert_not_called() - def test__update_neutron_some_failures(self): + @mock.patch.object(pxe, '_get_node_vif_ids') + @mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts') + def test__update_neutron_some_failures(self, mock_updo, mock_gnvi): # confirm update is called twice, one fails, but no exception raised - with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi: - mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'} - with mock.patch.object(neutron.NeutronAPI, - 'update_port_dhcp_opts') as mock_updo: - exc = exception.FailedToUpdateDHCPOptOnPort('fake exception') - mock_updo.side_effect = [None, exc] - with task_manager.acquire(self.context, - self.node.uuid) as task: - pxe._update_neutron(task, self.node) - self.assertEqual(2, mock_updo.call_count) + mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'} + exc = exception.FailedToUpdateDHCPOptOnPort('fake exception') + mock_updo.side_effect = [None, exc] + with task_manager.acquire(self.context, + self.node.uuid) as task: + pxe._update_neutron(task, self.node) + self.assertEqual(2, mock_updo.call_count) - def test__update_neutron_fails(self): + @mock.patch.object(pxe, '_get_node_vif_ids') + @mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts') + def test__update_neutron_fails(self, mock_updo, mock_gnvi): # confirm update is called twice, both fail, and exception is raised - with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi: - mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'} - with mock.patch.object(neutron.NeutronAPI, - 'update_port_dhcp_opts') as mock_updo: - exc = exception.FailedToUpdateDHCPOptOnPort('fake exception') - mock_updo.side_effect = [exc, exc] - with task_manager.acquire(self.context, - self.node.uuid) as task: - self.assertRaises(exception.FailedToUpdateDHCPOptOnPort, - pxe._update_neutron, - task, self.node) - self.assertEqual(2, mock_updo.call_count) + mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'} + exc = exception.FailedToUpdateDHCPOptOnPort('fake exception') + mock_updo.side_effect = [exc, exc] + with task_manager.acquire(self.context, + self.node.uuid) as task: + self.assertRaises(exception.FailedToUpdateDHCPOptOnPort, + pxe._update_neutron, + task, self.node) + self.assertEqual(2, mock_updo.call_count) def test__dhcp_options_for_instance(self): self.config(pxe_bootfile_name='test_pxe_bootfile', group='pxe') @@ -591,39 +585,37 @@ class PXEDriverTestCase(db_base.DbTestCase): mock_img_path.assert_called_once_with(task.node.uuid) mock_get_img_mb.assert_called_once_with(fake_img_path) - def test_deploy(self): - with mock.patch.object(pxe, '_update_neutron') as update_neutron_mock: - with mock.patch.object(manager_utils, - 'node_power_action') as node_power_mock: - with mock.patch.object(manager_utils, - 'node_set_boot_device') as node_set_boot_mock: - with task_manager.acquire(self.context, - self.node.uuid, shared=False) as task: - state = task.driver.deploy.deploy(task, self.node) - self.assertEqual(state, states.DEPLOYWAIT) - update_neutron_mock.assert_called_once_with(task, - self.node) - node_set_boot_mock.assert_called_once_with(task, - 'pxe', - persistent=True) - node_power_mock.assert_called_once_with(task, - self.node, - states.REBOOT) + @mock.patch.object(pxe, '_update_neutron') + @mock.patch.object(manager_utils, 'node_power_action') + @mock.patch.object(manager_utils, 'node_set_boot_device') + def test_deploy(self, node_set_boot_mock, node_power_mock, + update_neutron_mock): + with task_manager.acquire(self.context, + self.node.uuid, shared=False) as task: + state = task.driver.deploy.deploy(task, self.node) + self.assertEqual(state, states.DEPLOYWAIT) + update_neutron_mock.assert_called_once_with(task, + self.node) + node_set_boot_mock.assert_called_once_with(task, + 'pxe', + persistent=True) + node_power_mock.assert_called_once_with(task, + self.node, + states.REBOOT) - # ensure token file created - t_path = pxe._get_token_file_path(self.node.uuid) - token = open(t_path, 'r').read() - self.assertEqual(self.context.auth_token, token) + # ensure token file created + t_path = pxe._get_token_file_path(self.node.uuid) + token = open(t_path, 'r').read() + self.assertEqual(self.context.auth_token, token) - def test_tear_down(self): - with mock.patch.object(manager_utils, - 'node_power_action') as node_power_mock: - with task_manager.acquire(self.context, - self.node.uuid) as task: - state = task.driver.deploy.tear_down(task, self.node) - self.assertEqual(states.DELETED, state) - node_power_mock.assert_called_once_with(task, self.node, - states.POWER_OFF) + @mock.patch.object(manager_utils, 'node_power_action') + def test_tear_down(self, node_power_mock): + with task_manager.acquire(self.context, + self.node.uuid) as task: + state = task.driver.deploy.tear_down(task, self.node) + self.assertEqual(states.DELETED, state) + node_power_mock.assert_called_once_with(task, self.node, + states.POWER_OFF) @mock.patch.object(manager_utils, 'node_power_action') def test_tear_down_removes_internal_attrs(self, mock_npa): @@ -643,12 +635,12 @@ class PXEDriverTestCase(db_base.DbTestCase): self.assertNotIn('pxe_ramdisk', self.node.driver_info) mock_npa.assert_called_once_with(task, self.node, states.POWER_OFF) - def test_take_over(self): - with mock.patch.object(pxe, '_update_neutron') as update_neutron_mock: - with task_manager.acquire( - self.context, self.node.uuid, shared=True) as task: - task.driver.deploy.take_over(task, self.node) - update_neutron_mock.assert_called_once_with(task, self.node) + @mock.patch.object(pxe, '_update_neutron') + def test_take_over(self, update_neutron_mock): + with task_manager.acquire( + self.context, self.node.uuid, shared=True) as task: + task.driver.deploy.take_over(task, self.node) + update_neutron_mock.assert_called_once_with(task, self.node) def test_continue_deploy_good(self): token_path = self._create_token_file() @@ -743,7 +735,8 @@ class PXEDriverTestCase(db_base.DbTestCase): self.assertEqual(1, _continue_deploy_mock.call_count, "_continue_deploy was not called once.") - def clean_up_config(self, master=None): + @mock.patch.object(pxe, '_get_tftp_image_info') + def clean_up_config(self, get_tftp_image_info_mock, master=None): temp_dir = tempfile.mkdtemp() self.config(tftp_root=temp_dir, group='pxe') tftp_master_dir = os.path.join(CONF.pxe.tftp_root, @@ -770,57 +763,55 @@ class PXEDriverTestCase(db_base.DbTestCase): self.node.uuid, 'deploy_kernel') image_info = {'deploy_kernel': ['deploy_kernel_uuid', d_kernel_path]} - with mock.patch.object(pxe, '_get_tftp_image_info') \ - as get_tftp_image_info_mock: - get_tftp_image_info_mock.return_value = image_info + get_tftp_image_info_mock.return_value = image_info - pxecfg_dir = os.path.join(CONF.pxe.tftp_root, 'pxelinux.cfg') - os.makedirs(pxecfg_dir) + pxecfg_dir = os.path.join(CONF.pxe.tftp_root, 'pxelinux.cfg') + os.makedirs(pxecfg_dir) - instance_dir = os.path.join(CONF.pxe.tftp_root, - self.node.uuid) - image_dir = os.path.join(CONF.pxe.images_path, self.node.uuid) - os.makedirs(instance_dir) - os.makedirs(image_dir) - config_path = os.path.join(instance_dir, 'config') - deploy_kernel_path = os.path.join(instance_dir, 'deploy_kernel') - pxe_mac_path = os.path.join(pxecfg_dir, '01-aa-bb-cc') - image_path = os.path.join(image_dir, 'disk') - open(config_path, 'w').close() - os.link(config_path, pxe_mac_path) - if master: - master_deploy_kernel_path = os.path.join(tftp_master_dir, - 'deploy_kernel_uuid') - master_instance_path = os.path.join(instance_master_dir, - 'image_uuid') - open(master_deploy_kernel_path, 'w').close() - open(master_instance_path, 'w').close() + instance_dir = os.path.join(CONF.pxe.tftp_root, + self.node.uuid) + image_dir = os.path.join(CONF.pxe.images_path, self.node.uuid) + os.makedirs(instance_dir) + os.makedirs(image_dir) + config_path = os.path.join(instance_dir, 'config') + deploy_kernel_path = os.path.join(instance_dir, 'deploy_kernel') + pxe_mac_path = os.path.join(pxecfg_dir, '01-aa-bb-cc') + image_path = os.path.join(image_dir, 'disk') + open(config_path, 'w').close() + os.link(config_path, pxe_mac_path) + if master: + master_deploy_kernel_path = os.path.join(tftp_master_dir, + 'deploy_kernel_uuid') + master_instance_path = os.path.join(instance_master_dir, + 'image_uuid') + open(master_deploy_kernel_path, 'w').close() + open(master_instance_path, 'w').close() - os.link(master_deploy_kernel_path, deploy_kernel_path) - os.link(master_instance_path, image_path) - if master == 'in_use': - deploy_kernel_link = os.path.join(CONF.pxe.tftp_root, - 'deploy_kernel_link') - image_link = os.path.join(CONF.pxe.images_path, - 'image_link') - os.link(master_deploy_kernel_path, deploy_kernel_link) - os.link(master_instance_path, image_link) + os.link(master_deploy_kernel_path, deploy_kernel_path) + os.link(master_instance_path, image_path) + if master == 'in_use': + deploy_kernel_link = os.path.join(CONF.pxe.tftp_root, + 'deploy_kernel_link') + image_link = os.path.join(CONF.pxe.images_path, + 'image_link') + os.link(master_deploy_kernel_path, deploy_kernel_link) + os.link(master_instance_path, image_link) - else: - open(deploy_kernel_path, 'w').close() - open(image_path, 'w').close() - token_path = self._create_token_file() - self.config(image_cache_size=0, group='pxe') + else: + open(deploy_kernel_path, 'w').close() + open(image_path, 'w').close() + token_path = self._create_token_file() + self.config(image_cache_size=0, group='pxe') - with task_manager.acquire(self.context, [self.node.uuid], - shared=True) as task: - task.resources[0].driver.deploy.clean_up(task, self.node) - get_tftp_image_info_mock.called_once_with(self.node) - assert_false_path = [config_path, deploy_kernel_path, image_path, - pxe_mac_path, image_dir, instance_dir, - token_path] - for path in assert_false_path: - self.assertFalse(os.path.exists(path)) + with task_manager.acquire(self.context, [self.node.uuid], + shared=True) as task: + task.resources[0].driver.deploy.clean_up(task, self.node) + get_tftp_image_info_mock.called_once_with(self.node) + assert_false_path = [config_path, deploy_kernel_path, image_path, + pxe_mac_path, image_dir, instance_dir, + token_path] + for path in assert_false_path: + self.assertFalse(os.path.exists(path)) def test_clean_up_no_master_images(self): self.clean_up_config(master=None) diff --git a/ironic/tests/drivers/test_ssh.py b/ironic/tests/drivers/test_ssh.py index f6715cd39e..5128f9fe9d 100644 --- a/ironic/tests/drivers/test_ssh.py +++ b/ironic/tests/drivers/test_ssh.py @@ -192,116 +192,99 @@ class SSHPrivateMethodsTestCase(base.TestCase): driver_info=db_utils.get_test_ssh_info()) self.sshclient = paramiko.SSHClient() - # Set up the mock for processutils.ssh_execute because most tests use - # it. processutils.ssh_execute returns (stdout, stderr). - self.ssh_patcher = mock.patch.object(processutils, 'ssh_execute') - self.exec_ssh_mock = self.ssh_patcher.start() - self.exec_ssh_mock.return_value = ('', '') + @mock.patch.object(utils, 'ssh_connect') + def test__get_connection_client(self, ssh_connect_mock): + ssh_connect_mock.return_value = self.sshclient + client = ssh._get_connection(self.node) + self.assertEqual(self.sshclient, client) + driver_info = ssh._parse_driver_info(self.node) + ssh_connect_mock.assert_called_once_with(driver_info) - def stop_patcher(): - if self.ssh_patcher: - self.ssh_patcher.stop() + @mock.patch.object(utils, 'ssh_connect') + def test__get_connection_exception(self, ssh_connect_mock): + ssh_connect_mock.side_effect = exception.SSHConnectFailed(host='fake') + self.assertRaises(exception.SSHConnectFailed, + ssh._get_connection, + self.node) + driver_info = ssh._parse_driver_info(self.node) + ssh_connect_mock.assert_called_once_with(driver_info) - self.addCleanup(stop_patcher) - - def test__get_connection_client(self): - with mock.patch.object( - utils, 'ssh_connect') as ssh_connect_mock: - ssh_connect_mock.return_value = self.sshclient - client = ssh._get_connection(self.node) - self.assertEqual(self.sshclient, client) - driver_info = ssh._parse_driver_info(self.node) - ssh_connect_mock.assert_called_once_with(driver_info) - - def test__get_connection_exception(self): - with mock.patch.object( - utils, 'ssh_connect') as ssh_connect_mock: - ssh_connect_mock.side_effect = exception.SSHConnectFailed( - host='fake') - self.assertRaises(exception.SSHConnectFailed, - ssh._get_connection, - self.node) - driver_info = ssh._parse_driver_info(self.node) - ssh_connect_mock.assert_called_once_with(driver_info) - - def test__ssh_execute(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__ssh_execute(self, exec_ssh_mock): ssh_cmd = "somecmd" expected = ['a', 'b', 'c'] - self.exec_ssh_mock.return_value = ('\n'.join(expected), '') + exec_ssh_mock.return_value = ('\n'.join(expected), '') lst = ssh._ssh_execute(self.sshclient, ssh_cmd) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) + exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) self.assertEqual(expected, lst) - def test__ssh_execute_exception(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__ssh_execute_exception(self, exec_ssh_mock): ssh_cmd = "somecmd" - self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError + exec_ssh_mock.side_effect = processutils.ProcessExecutionError self.assertRaises(exception.SSHCommandFailed, ssh._ssh_execute, self.sshclient, ssh_cmd) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) + exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) - def test__get_power_status_on(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__get_power_status_on(self, get_hosts_name_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - self.exec_ssh_mock.return_value = ( - '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') - get_hosts_name_mock.return_value = "NodeName" + exec_ssh_mock.return_value = ( + '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') + get_hosts_name_mock.return_value = "NodeName" - pstate = ssh._get_power_status(self.sshclient, info) + pstate = ssh._get_power_status(self.sshclient, info) - ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['list_running']) - self.assertEqual(states.POWER_ON, pstate) - self.exec_ssh_mock.assert_called_once_with( - self.sshclient, ssh_cmd) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) + ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['list_running']) + self.assertEqual(states.POWER_ON, pstate) + exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) - def test__get_power_status_off(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__get_power_status_off(self, get_hosts_name_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - self.exec_ssh_mock.return_value = ( - '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') - get_hosts_name_mock.return_value = "NotNodeName" + exec_ssh_mock.return_value = ( + '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') + get_hosts_name_mock.return_value = "NotNodeName" - pstate = ssh._get_power_status(self.sshclient, info) + pstate = ssh._get_power_status(self.sshclient, info) - ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['list_running']) - self.assertEqual(states.POWER_OFF, pstate) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - ssh_cmd) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) + ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['list_running']) + self.assertEqual(states.POWER_OFF, pstate) + exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__get_power_status_error(self, get_hosts_name_mock, exec_ssh_mock): - def test__get_power_status_error(self): info = ssh._parse_driver_info(self.node) - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - self.exec_ssh_mock.return_value = ( - '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') - info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - get_hosts_name_mock.return_value = None - self.assertRaises(exception.NodeNotFound, - ssh._get_power_status, - self.sshclient, - info) + exec_ssh_mock.return_value = ( + '"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '') + info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] + get_hosts_name_mock.return_value = None + self.assertRaises(exception.NodeNotFound, + ssh._get_power_status, + self.sshclient, + info) - ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['list_running']) + ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['list_running']) - self.exec_ssh_mock.assert_called_once_with( - self.sshclient, ssh_cmd) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) + exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) - def test__get_power_status_exception(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__get_power_status_exception(self, exec_ssh_mock): info = ssh._parse_driver_info(self.node) - self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError + exec_ssh_mock.side_effect = processutils.ProcessExecutionError self.assertRaises(exception.SSHCommandFailed, ssh._get_power_status, @@ -309,10 +292,11 @@ class SSHPrivateMethodsTestCase(base.TestCase): info) ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], info['cmd_set']['list_running']) - self.exec_ssh_mock.assert_called_once_with( + exec_ssh_mock.assert_called_once_with( self.sshclient, ssh_cmd) - def test__get_hosts_name_for_node_match(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__get_hosts_name_for_node_match(self, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], @@ -321,7 +305,7 @@ class SSHPrivateMethodsTestCase(base.TestCase): cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], info['cmd_set']['get_node_macs']) cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - self.exec_ssh_mock.side_effect = [('NodeName', ''), + exec_ssh_mock.side_effect = [('NodeName', ''), ('52:54:00:cf:2d:31', '')] expected = [mock.call(self.sshclient, ssh_cmd), mock.call(self.sshclient, cmd_to_exec)] @@ -329,12 +313,13 @@ class SSHPrivateMethodsTestCase(base.TestCase): found_name = ssh._get_hosts_name_for_node(self.sshclient, info) self.assertEqual('NodeName', found_name) - self.assertEqual(expected, self.exec_ssh_mock.call_args_list) + self.assertEqual(expected, exec_ssh_mock.call_args_list) - def test__get_hosts_name_for_node_no_match(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__get_hosts_name_for_node_no_match(self, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "22:22:22:22:22:22"] - self.exec_ssh_mock.side_effect = [('NodeName', ''), + exec_ssh_mock.side_effect = [('NodeName', ''), ('52:54:00:cf:2d:31', '')] ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], @@ -350,9 +335,10 @@ class SSHPrivateMethodsTestCase(base.TestCase): found_name = ssh._get_hosts_name_for_node(self.sshclient, info) self.assertIsNone(found_name) - self.assertEqual(expected, self.exec_ssh_mock.call_args_list) + self.assertEqual(expected, exec_ssh_mock.call_args_list) - def test__get_hosts_name_for_node_exception(self): + @mock.patch.object(processutils, 'ssh_execute') + def test__get_hosts_name_for_node_exception(self, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'], @@ -362,8 +348,8 @@ class SSHPrivateMethodsTestCase(base.TestCase): info['cmd_set']['get_node_macs']) cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - self.exec_ssh_mock.side_effect = [('NodeName', ''), - processutils.ProcessExecutionError] + exec_ssh_mock.side_effect = [('NodeName', ''), + processutils.ProcessExecutionError] expected = [mock.call(self.sshclient, ssh_cmd), mock.call(self.sshclient, cmd_to_exec)] @@ -371,176 +357,149 @@ class SSHPrivateMethodsTestCase(base.TestCase): ssh._get_hosts_name_for_node, self.sshclient, info) - self.assertEqual(expected, self.exec_ssh_mock.call_args_list) + self.assertEqual(expected, exec_ssh_mock.call_args_list) - def test__power_on_good(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_on_good(self, get_hosts_name_mock, get_power_status_mock, + exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_status_mock: - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - get_power_status_mock.side_effect = [states.POWER_OFF, - states.POWER_ON] - get_hosts_name_mock.return_value = "NodeName" - expected = [mock.call(self.sshclient, info), - mock.call(self.sshclient, info)] + get_power_status_mock.side_effect = [states.POWER_OFF, + states.POWER_ON] + get_hosts_name_mock.return_value = "NodeName" + expected = [mock.call(self.sshclient, info), + mock.call(self.sshclient, info)] - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['start_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - current_state = ssh._power_on(self.sshclient, info) + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['start_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + current_state = ssh._power_on(self.sshclient, info) - self.assertEqual(states.POWER_ON, current_state) - self.assertEqual(expected, - get_power_status_mock.call_args_list) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertEqual(states.POWER_ON, current_state) + self.assertEqual(expected, get_power_status_mock.call_args_list) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) - def test__power_on_fail(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_on_fail(self, get_hosts_name_mock, get_power_status_mock, + exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_status_mock: - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - get_power_status_mock.side_effect = [states.POWER_OFF, - states.POWER_OFF] - get_hosts_name_mock.return_value = "NodeName" - expected = [mock.call(self.sshclient, info), - mock.call(self.sshclient, info)] + get_power_status_mock.side_effect = [states.POWER_OFF, + states.POWER_OFF] + get_hosts_name_mock.return_value = "NodeName" + expected = [mock.call(self.sshclient, info), + mock.call(self.sshclient, info)] - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['start_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - current_state = ssh._power_on(self.sshclient, info) + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['start_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + current_state = ssh._power_on(self.sshclient, info) - self.assertEqual(states.ERROR, current_state) - self.assertEqual(expected, - get_power_status_mock.call_args_list) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertEqual(states.ERROR, current_state) + self.assertEqual(expected, get_power_status_mock.call_args_list) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) - def test__power_on_exception(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_on_exception(self, get_hosts_name_mock, + get_power_status_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError - with mock.patch.object( - ssh, '_get_power_status') as get_power_status_mock: - with mock.patch.object( - ssh, '_get_hosts_name_for_node') as get_hosts_name_mock: - get_power_status_mock.side_effect = [states.POWER_OFF, - states.POWER_ON] - get_hosts_name_mock.return_value = "NodeName" + exec_ssh_mock.side_effect = processutils.ProcessExecutionError + get_power_status_mock.side_effect = [states.POWER_OFF, + states.POWER_ON] + get_hosts_name_mock.return_value = "NodeName" - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['start_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['start_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - self.assertRaises(exception.SSHCommandFailed, - ssh._power_on, - self.sshclient, - info) - get_power_status_mock.assert_called_once_with(self.sshclient, - info) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertRaises(exception.SSHCommandFailed, + ssh._power_on, + self.sshclient, + info) + get_power_status_mock.assert_called_once_with(self.sshclient, info) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) - def test__power_off_good(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_off_good(self, get_hosts_name_mock, + get_power_status_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_status_mock: - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - get_power_status_mock.side_effect = [states.POWER_ON, - states.POWER_OFF] - get_hosts_name_mock.return_value = "NodeName" - expected = [mock.call(self.sshclient, info), - mock.call(self.sshclient, info)] + get_power_status_mock.side_effect = [states.POWER_ON, + states.POWER_OFF] + get_hosts_name_mock.return_value = "NodeName" + expected = [mock.call(self.sshclient, info), + mock.call(self.sshclient, info)] - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['stop_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - current_state = ssh._power_off(self.sshclient, info) + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['stop_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + current_state = ssh._power_off(self.sshclient, info) - self.assertEqual(states.POWER_OFF, current_state) - self.assertEqual(expected, - get_power_status_mock.call_args_list) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertEqual(states.POWER_OFF, current_state) + self.assertEqual(expected, get_power_status_mock.call_args_list) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) - def test__power_off_fail(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_off_fail(self, get_hosts_name_mock, + get_power_status_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_status_mock: - with mock.patch.object(ssh, '_get_hosts_name_for_node') \ - as get_hosts_name_mock: - get_power_status_mock.side_effect = [states.POWER_ON, - states.POWER_ON] - get_hosts_name_mock.return_value = "NodeName" - expected = [mock.call(self.sshclient, info), - mock.call(self.sshclient, info)] + get_power_status_mock.side_effect = [states.POWER_ON, + states.POWER_ON] + get_hosts_name_mock.return_value = "NodeName" + expected = [mock.call(self.sshclient, info), + mock.call(self.sshclient, info)] - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['stop_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - current_state = ssh._power_off(self.sshclient, info) + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['stop_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + current_state = ssh._power_off(self.sshclient, info) - self.assertEqual(states.ERROR, current_state) - self.assertEqual(expected, - get_power_status_mock.call_args_list) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertEqual(states.ERROR, current_state) + self.assertEqual(expected, get_power_status_mock.call_args_list) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) - def test__power_off_exception(self): + @mock.patch.object(processutils, 'ssh_execute') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_get_hosts_name_for_node') + def test__power_off_exception(self, get_hosts_name_mock, + get_power_status_mock, exec_ssh_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError - with mock.patch.object( - ssh, '_get_power_status') as get_power_status_mock: - with mock.patch.object( - ssh, '_get_hosts_name_for_node') as get_hosts_name_mock: - self.exec_ssh_mock.side_effect = ( - processutils.ProcessExecutionError) - get_power_status_mock.side_effect = [states.POWER_ON, - states.POWER_OFF] - get_hosts_name_mock.return_value = "NodeName" + exec_ssh_mock.side_effect = processutils.ProcessExecutionError + get_power_status_mock.side_effect = [states.POWER_ON, + states.POWER_OFF] + get_hosts_name_mock.return_value = "NodeName" - cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], - info['cmd_set']['stop_cmd']) - cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') + cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'], + info['cmd_set']['stop_cmd']) + cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName') - self.assertRaises(exception.SSHCommandFailed, - ssh._power_off, - self.sshclient, - info) - get_power_status_mock.assert_called_once_with(self.sshclient, - info) - get_hosts_name_mock.assert_called_once_with(self.sshclient, - info) - self.exec_ssh_mock.assert_called_once_with(self.sshclient, - cmd_to_exec) + self.assertRaises(exception.SSHCommandFailed, ssh._power_off, + self.sshclient, info) + get_power_status_mock.assert_called_once_with(self.sshclient, info) + get_hosts_name_mock.assert_called_once_with(self.sshclient, info) + exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec) def test_exec_ssh_command_good(self): - # stop mocking the processutils.ssh_execute because we - # are testing it here - self.ssh_patcher.stop() - self.ssh_patcher = None - class Channel(object): def recv_exit_status(self): return 0 @@ -568,11 +527,6 @@ class SSHPrivateMethodsTestCase(base.TestCase): exec_command_mock.assert_called_once_with("command") def test_exec_ssh_command_fail(self): - # stop mocking the processutils.ssh_execute because we - # are testing it here - self.ssh_patcher.stop() - self.ssh_patcher = None - class Channel(object): def recv_exit_status(self): return 127 @@ -614,49 +568,20 @@ class SSHDriverTestCase(db_base.DbTestCase): node_id=self.node.id)) self.sshclient = paramiko.SSHClient() - #setup these mocks because most tests use them - self.parse_drv_info_patcher = mock.patch.object(ssh, - '_parse_driver_info') - self.parse_drv_info_mock = None - self.get_mac_addr_patcher = mock.patch.object( - driver_utils, - 'get_node_mac_addresses') - self.get_mac_addr_mock = self.get_mac_addr_patcher.start() - self.get_conn_patcher = mock.patch.object(ssh, '_get_connection') - self.get_conn_mock = self.get_conn_patcher.start() - - def stop_patchers(): - if self.parse_drv_info_mock: - self.parse_drv_info_patcher.stop() - if self.get_mac_addr_mock: - self.get_mac_addr_patcher.stop() - if self.get_conn_mock: - self.get_conn_patcher.stop() - - self.addCleanup(stop_patchers) - - def test__validate_info_ssh_connect_failed(self): + @mock.patch.object(utils, 'ssh_connect') + def test__validate_info_ssh_connect_failed(self, ssh_connect_mock): info = ssh._parse_driver_info(self.node) - self.get_conn_patcher.stop() - self.get_conn_mock = None - with mock.patch.object(utils, 'ssh_connect') \ - as ssh_connect_mock: - ssh_connect_mock.side_effect = exception.SSHConnectFailed( - host='fake') - with task_manager.acquire(self.context, [info['uuid']], - shared=False) as task: - self.assertRaises(exception.InvalidParameterValue, - task.resources[0].driver.power.validate, - task, self.node) - driver_info = ssh._parse_driver_info(self.node) - ssh_connect_mock.assert_called_once_with(driver_info) + ssh_connect_mock.side_effect = exception.SSHConnectFailed(host='fake') + with task_manager.acquire(self.context, [info['uuid']], + shared=False) as task: + self.assertRaises(exception.InvalidParameterValue, + task.resources[0].driver.power.validate, + task, self.node) + driver_info = ssh._parse_driver_info(self.node) + ssh_connect_mock.assert_called_once_with(driver_info) def test_validate_fail_no_port(self): - # stop the get_mac_addr mock, it's needed for this test - self.get_mac_addr_patcher.stop() - self.get_mac_addr_mock = None - new_node = obj_utils.create_test_node( self.context, id=321, @@ -669,185 +594,186 @@ class SSHDriverTestCase(db_base.DbTestCase): task.resources[0].driver.power.validate, task, new_node) - def test_reboot_good(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_power_off') + @mock.patch.object(ssh, '_power_on') + def test_reboot_good(self, power_on_mock, power_off_mock, + get_power_stat_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + get_power_stat_mock.return_value = states.POWER_ON + power_off_mock.return_value = None + power_on_mock.return_value = states.POWER_ON + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info + with task_manager.acquire(self.context, [info['uuid']], + shared=False) as task: + task.resources[0].driver.power.reboot(task, self.node) - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + get_power_stat_mock.assert_called_once_with(self.sshclient, info) + power_off_mock.assert_called_once_with(self.sshclient, info) + power_on_mock.assert_called_once_with(self.sshclient, info) - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_stat_mock: - with mock.patch.object(ssh, '_power_off') as power_off_mock: - with mock.patch.object(ssh, '_power_on') as power_on_mock: - get_power_stat_mock.return_value = states.POWER_ON - power_off_mock.return_value = None - power_on_mock.return_value = states.POWER_ON - - with task_manager.acquire(self.context, [info['uuid']], - shared=False) as task: - task.resources[0].driver.power.reboot(task, self.node) - - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, - self.node) - self.get_conn_mock.assert_called_once_with(self.node) - get_power_stat_mock.assert_called_once_with(self.sshclient, - info) - power_off_mock.assert_called_once_with(self.sshclient, - info) - power_on_mock.assert_called_once_with(self.sshclient, info) - - def test_reboot_fail(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_get_power_status') + @mock.patch.object(ssh, '_power_off') + @mock.patch.object(ssh, '_power_on') + def test_reboot_fail(self, power_on_mock, power_off_mock, + get_power_stat_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + get_power_stat_mock.return_value = states.POWER_ON + power_off_mock.return_value = None + power_on_mock.return_value = states.POWER_OFF + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info + with task_manager.acquire(self.context, [info['uuid']], + shared=False) as task: + self.assertRaises(exception.PowerStateFailure, + task.resources[0].driver.power.reboot, + task, + self.node) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + get_power_stat_mock.assert_called_once_with(self.sshclient, info) + power_off_mock.assert_called_once_with(self.sshclient, info) + power_on_mock.assert_called_once_with(self.sshclient, info) - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - - with mock.patch.object(ssh, '_get_power_status') \ - as get_power_stat_mock: - with mock.patch.object(ssh, '_power_off') as power_off_mock: - with mock.patch.object(ssh, '_power_on') as power_on_mock: - get_power_stat_mock.return_value = states.POWER_ON - power_off_mock.return_value = None - power_on_mock.return_value = states.POWER_OFF - - with task_manager.acquire(self.context, [info['uuid']], - shared=False) as task: - self.assertRaises( - exception.PowerStateFailure, - task.resources[0].driver.power.reboot, - task, - self.node) - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, - self.node) - self.get_conn_mock.assert_called_once_with(self.node) - get_power_stat_mock.assert_called_once_with(self.sshclient, - info) - power_off_mock.assert_called_once_with(self.sshclient, - info) - power_on_mock.assert_called_once_with(self.sshclient, info) - - def test_set_power_state_bad_state(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + def test_set_power_state_bad_state(self, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - - with task_manager.acquire(self.context, [info['uuid']], - shared=False) as task: - self.assertRaises( + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info + with task_manager.acquire(self.context, [info['uuid']], + shared=False) as task: + self.assertRaises( exception.InvalidParameterValue, task.resources[0].driver.power.set_power_state, task, self.node, "BAD_PSTATE") - self. parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) - self.get_conn_mock.assert_called_once_with(self.node) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) - def test_set_power_state_on_good(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_power_on') + def test_set_power_state_on_good(self, power_on_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - with mock.patch.object(ssh, '_power_on') as power_on_mock: - power_on_mock.return_value = states.POWER_ON - + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + power_on_mock.return_value = states.POWER_ON + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info with task_manager.acquire(self.context, [info['uuid']], shared=False) as task: - task.resources[0].driver.power.set_power_state(task, - self.node, + task.resources[0].driver.power.set_power_state(task, self.node, states.POWER_ON) - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) - self.get_conn_mock.assert_called_once_with(self.node) - power_on_mock.assert_called_once_with(self.sshclient, info) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + power_on_mock.assert_called_once_with(self.sshclient, info) - def test_set_power_state_on_fail(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_power_on') + def test_set_power_state_on_fail(self, power_on_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - - with mock.patch.object(ssh, '_power_on') as power_on_mock: - power_on_mock.return_value = states.POWER_OFF - + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + power_on_mock.return_value = states.POWER_OFF + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info with task_manager.acquire(self.context, [info['uuid']], shared=False) as task: self.assertRaises( - exception.PowerStateFailure, - task.resources[0].driver.power.set_power_state, - task, - self.node, - states.POWER_ON) + exception.PowerStateFailure, + task.resources[0].driver.power.set_power_state, + task, + self.node, + states.POWER_ON) - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) - self.get_conn_mock.assert_called_once_with(self.node) - power_on_mock.assert_called_once_with(self.sshclient, info) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + power_on_mock.assert_called_once_with(self.sshclient, info) - def test_set_power_state_off_good(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_power_off') + def test_set_power_state_off_good(self, power_off_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - - with mock.patch.object(ssh, '_power_off') as power_off_mock: - power_off_mock.return_value = states.POWER_OFF - + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + power_off_mock.return_value = states.POWER_OFF + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info with task_manager.acquire(self.context, [info['uuid']], shared=False) as task: - task.resources[0].driver.power.set_power_state(task, - self.node, states.POWER_OFF) + task.resources[0].driver.power.set_power_state( + task, self.node, states.POWER_OFF) - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) - self.get_conn_mock.assert_called_once_with(self.node) - power_off_mock.assert_called_once_with(self.sshclient, info) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + power_off_mock.assert_called_once_with(self.sshclient, info) - def test_set_power_state_off_fail(self): + @mock.patch.object(driver_utils, 'get_node_mac_addresses') + @mock.patch.object(ssh, '_get_connection') + @mock.patch.object(ssh, '_power_off') + def test_set_power_state_off_fail(self, power_off_mock, get_conn_mock, + get_mac_addr_mock): info = ssh._parse_driver_info(self.node) info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"] - - self.parse_drv_info_mock = self.parse_drv_info_patcher.start() - self.parse_drv_info_mock.return_value = info - self.get_mac_addr_mock.return_value = info['macs'] - self.get_conn_mock.return_value = self.sshclient - - with mock.patch.object(ssh, '_power_off') as power_off_mock: - power_off_mock.return_value = states.POWER_ON - + get_mac_addr_mock.return_value = info['macs'] + get_conn_mock.return_value = self.sshclient + power_off_mock.return_value = states.POWER_ON + with mock.patch.object(ssh, + '_parse_driver_info') as parse_drv_info_mock: + parse_drv_info_mock.return_value = info with task_manager.acquire(self.context, [info['uuid']], shared=False) as task: self.assertRaises( - exception.PowerStateFailure, - task.resources[0].driver.power.set_power_state, - task, - self.node, - states.POWER_OFF) + exception.PowerStateFailure, + task.resources[0].driver.power.set_power_state, + task, + self.node, + states.POWER_OFF) - self.parse_drv_info_mock.assert_called_once_with(self.node) - self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) - self.get_conn_mock.assert_called_once_with(self.node) - power_off_mock.assert_called_once_with(self.sshclient, info) + parse_drv_info_mock.assert_called_once_with(self.node) + get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node) + get_conn_mock.assert_called_once_with(self.node) + power_off_mock.assert_called_once_with(self.sshclient, info)