Cleanup mock patch without with part 1

A good idea because it makes the code easier to read to get rid of
the `with` using mock.patch and mock.patch.object in favor decorators,
where possible in the tests.

This patch only affects drivers.

Change-Id: I564428a6315352f72ad3bad69bf7dd8cd88ffd9e
This commit is contained in:
Mikhail Durnosvistov 2014-02-13 11:41:57 +02:00
parent 0acc05fb33
commit b7d916aea3
4 changed files with 758 additions and 860 deletions

View File

@ -50,9 +50,6 @@ class IPMINativePrivateMethodTestCase(base.TestCase):
driver_info=INFO_DICT)
self.dbapi = db_api.get_instance()
self.info = ipminative._parse_driver_info(self.node)
ipmi_patch = mock.patch('pyghmi.ipmi.command.Command')
self.ipmi_mock = ipmi_patch.start()
self.addCleanup(ipmi_patch.stop)
def test__parse_driver_info(self):
# make sure we get back the expected things
@ -70,32 +67,36 @@ class IPMINativePrivateMethodTestCase(base.TestCase):
ipminative._parse_driver_info,
node)
def test__power_status_on(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__power_status_on(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.get_power.return_value = {'powerstate': 'on'}
state = ipminative._power_status(self.info)
ipmicmd.get_power.assert_called_once_with()
self.assertEqual(states.POWER_ON, state)
def test__power_status_off(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__power_status_off(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.get_power.return_value = {'powerstate': 'off'}
state = ipminative._power_status(self.info)
ipmicmd.get_power.assert_called_once_with()
self.assertEqual(states.POWER_OFF, state)
def test__power_status_error(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__power_status_error(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.get_power.return_value = {'powerstate': 'Error'}
state = ipminative._power_status(self.info)
ipmicmd.get_power.assert_called_once_with()
self.assertEqual(states.ERROR, state)
def test__power_on(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__power_on(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'on'}
self.config(retry_timeout=400, group='ipmi')
@ -103,8 +104,9 @@ class IPMINativePrivateMethodTestCase(base.TestCase):
ipmicmd.set_power.assert_called_once_with('on', 400)
self.assertEqual(states.POWER_ON, state)
def test__power_off(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__power_off(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'off'}
self.config(retry_timeout=500, group='ipmi')
@ -112,8 +114,9 @@ class IPMINativePrivateMethodTestCase(base.TestCase):
ipmicmd.set_power.assert_called_once_with('off', 500)
self.assertEqual(states.POWER_OFF, state)
def test__reboot(self):
ipmicmd = self.ipmi_mock.return_value
@mock.patch('pyghmi.ipmi.command.Command')
def test__reboot(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'on'}
self.config(retry_timeout=600, group='ipmi')
@ -138,75 +141,75 @@ class IPMINativeDriverTestCase(db_base.DbTestCase):
self.dbapi = db_api.get_instance()
self.info = ipminative._parse_driver_info(self.node)
def test_get_power_state(self):
with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock:
# Getting the mocked command.
cmd_mock = ipmi_mock.return_value
# Getting the get power mock.
get_power_mock = cmd_mock.get_power
@mock.patch('pyghmi.ipmi.command.Command')
def test_get_power_state(self, ipmi_mock):
# Getting the mocked command.
cmd_mock = ipmi_mock.return_value
# Getting the get power mock.
get_power_mock = cmd_mock.get_power
return_values = [{'powerstate': 'error'},
{'powerstate': 'on'},
{'powerstate': 'off'}]
return_values = [{'powerstate': 'error'},
{'powerstate': 'on'},
{'powerstate': 'off'}]
get_power_mock.side_effect = lambda: return_values.pop()
get_power_mock.side_effect = lambda: return_values.pop()
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_OFF, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_OFF, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_ON, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_ON, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.ERROR, pstate)
self.assertEqual(3, get_power_mock.call_count,
"pyghmi.ipmi.command.Command.get_power was not"
" called 3 times.")
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.ERROR, pstate)
self.assertEqual(3, get_power_mock.call_count,
"pyghmi.ipmi.command.Command.get_power was not"
" called 3 times.")
def test_set_power_on_ok(self):
with mock.patch.object(ipminative, '_power_on') as power_on_mock:
power_on_mock.return_value = states.POWER_ON
@mock.patch.object(ipminative, '_power_on')
def test_set_power_on_ok(self, power_on_mock):
power_on_mock.return_value = states.POWER_ON
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_ON)
power_on_mock.assert_called_once_with(self.info)
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_ON)
power_on_mock.assert_called_once_with(self.info)
def test_set_power_off_ok(self):
with mock.patch.object(ipminative, '_power_off') as power_off_mock:
power_off_mock.return_value = states.POWER_OFF
@mock.patch.object(ipminative, '_power_off')
def test_set_power_off_ok(self, power_off_mock):
power_off_mock.return_value = states.POWER_OFF
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_OFF)
power_off_mock.assert_called_once_with(self.info)
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_OFF)
power_off_mock.assert_called_once_with(self.info)
def test_set_power_on_fail(self):
with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock:
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'error'}
@mock.patch('pyghmi.ipmi.command.Command')
def test_set_power_on_fail(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'error'}
self.config(retry_timeout=500, group='ipmi')
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
ipmicmd.set_power.assert_called_once_with('on', 500)
self.config(retry_timeout=500, group='ipmi')
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
ipmicmd.set_power.assert_called_once_with('on', 500)
def test_set_boot_device_ok(self):
with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock:
ipmicmd = ipmi_mock.return_value
ipmicmd.set_bootdev.return_value = None
@mock.patch('pyghmi.ipmi.command.Command')
def test_set_boot_device_ok(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_bootdev.return_value = None
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.vendor._set_boot_device(task, 'pxe')
ipmicmd.set_bootdev.assert_called_once_with('pxe')
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.vendor._set_boot_device(task, 'pxe')
ipmicmd.set_bootdev.assert_called_once_with('pxe')
def test_set_boot_device_bad_device(self):
with task_manager.acquire(self.context, [self.node.uuid]) as task:
@ -215,28 +218,28 @@ class IPMINativeDriverTestCase(db_base.DbTestCase):
task,
'fake-device')
def test_reboot_ok(self):
with mock.patch.object(ipminative, '_reboot') as reboot_mock:
reboot_mock.return_value = None
@mock.patch.object(ipminative, '_reboot')
def test_reboot_ok(self, reboot_mock):
reboot_mock.return_value = None
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.reboot(task, self.node)
reboot_mock.assert_called_once_with(self.info)
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.driver.power.reboot(task, self.node)
reboot_mock.assert_called_once_with(self.info)
def test_reboot_fail(self):
with mock.patch('pyghmi.ipmi.command.Command') as ipmi_mock:
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'error'}
@mock.patch('pyghmi.ipmi.command.Command')
def test_reboot_fail(self, ipmi_mock):
ipmicmd = ipmi_mock.return_value
ipmicmd.set_power.return_value = {'powerstate': 'error'}
self.config(retry_timeout=500, group='ipmi')
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.reboot,
task,
self.node)
ipmicmd.set_power.assert_called_once_with('boot', 500)
self.config(retry_timeout=500, group='ipmi')
with task_manager.acquire(self.context,
[self.node.uuid]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.reboot,
task,
self.node)
ipmicmd.set_power.assert_called_once_with('boot', 500)
def test_vendor_passthru_validate__set_boot_device_good(self):
with task_manager.acquire(self.context,
@ -267,12 +270,11 @@ class IPMINativeDriverTestCase(db_base.DbTestCase):
self.driver.vendor.validate,
task, method='non-existent-method')
def test_vendor_passthru_call__set_boot_device(self):
@mock.patch.object(ipminative.VendorPassthru, '_set_boot_device')
def test_vendor_passthru_call__set_boot_device(self, boot_mock):
with task_manager.acquire(self.context, [self.node.uuid],
shared=False) as task:
with mock.patch.object(ipminative.VendorPassthru,
'_set_boot_device') as boot_mock:
self.driver.vendor.vendor_passthru(task,
method='set_boot_device',
device='pxe')
boot_mock.assert_called_once_with(task, 'pxe', False)
self.driver.vendor.vendor_passthru(task,
method='set_boot_device',
device='pxe')
boot_mock.assert_called_once_with(task, 'pxe', False)

View File

@ -104,7 +104,9 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
ipmi._parse_driver_info,
node)
def test__exec_ipmitool(self):
@mock.patch.object(ipmi, '_make_password_file', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool(self, mock_exec, mock_pwf):
pw_file_handle = tempfile.NamedTemporaryFile()
pw_file = pw_file_handle.name
file_handle = open(pw_file, "w")
@ -118,19 +120,17 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
'A', 'B', 'C',
]
with mock.patch.object(ipmi, '_make_password_file',
autospec=True) as mock_pwf:
mock_pwf.return_value = file_handle
with mock.patch.object(utils, 'execute',
autospec=True) as mock_exec:
mock_exec.return_value = (None, None)
mock_pwf.return_value = file_handle
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
ipmi._exec_ipmitool(self.info, 'A B C')
mock_pwf.assert_called_once_with(self.info['password'])
mock_exec.assert_called_once_with(*args, attempts=3)
mock_pwf.assert_called_once_with(self.info['password'])
mock_exec.assert_called_once_with(*args, attempts=3)
def test__exec_ipmitool_without_password(self):
@mock.patch.object(ipmi, '_make_password_file', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_without_password(self, mock_exec, mock_pwf):
self.info['password'] = None
pw_file_handle = tempfile.NamedTemporaryFile()
pw_file = pw_file_handle.name
@ -145,17 +145,15 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
'A', 'B', 'C',
]
with mock.patch.object(ipmi, '_make_password_file',
autospec=True) as mock_pwf:
mock_pwf.return_value = file_handle
with mock.patch.object(utils, 'execute',
autospec=True) as mock_exec:
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
self.assertTrue(mock_pwf.called)
mock_exec.assert_called_once_with(*args, attempts=3)
mock_pwf.return_value = file_handle
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
self.assertTrue(mock_pwf.called)
mock_exec.assert_called_once_with(*args, attempts=3)
def test__exec_ipmitool_without_username(self):
@mock.patch.object(ipmi, '_make_password_file', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_without_username(self, mock_exec, mock_pwf):
self.info['username'] = None
pw_file_handle = tempfile.NamedTemporaryFile()
pw_file = pw_file_handle.name
@ -169,17 +167,15 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
'A', 'B', 'C',
]
with mock.patch.object(ipmi, '_make_password_file',
autospec=True) as mock_pwf:
mock_pwf.return_value = file_handle
with mock.patch.object(utils, 'execute',
autospec=True) as mock_exec:
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
self.assertTrue(mock_pwf.called)
mock_exec.assert_called_once_with(*args, attempts=3)
mock_pwf.return_value = file_handle
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
self.assertTrue(mock_pwf.called)
mock_exec.assert_called_once_with(*args, attempts=3)
def test__exec_ipmitool_exception(self):
@mock.patch.object(ipmi, '_make_password_file', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_exception(self, mock_exec, mock_pwf):
pw_file_handle = tempfile.NamedTemporaryFile()
pw_file = pw_file_handle.name
file_handle = open(pw_file, "w")
@ -193,59 +189,52 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
'A', 'B', 'C',
]
with mock.patch.object(ipmi, '_make_password_file',
autospec=True) as mock_pwf:
mock_pwf.return_value = file_handle
with mock.patch.object(utils, 'execute',
autospec=True) as mock_exec:
mock_exec.side_effect = processutils.ProcessExecutionError("x")
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_pwf.assert_called_once_with(self.info['password'])
mock_exec.assert_called_once_with(*args, attempts=3)
mock_pwf.return_value = file_handle
mock_exec.side_effect = processutils.ProcessExecutionError("x")
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_pwf.assert_called_once_with(self.info['password'])
mock_exec.assert_called_once_with(*args, attempts=3)
def test__power_status_on(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
autospec=True) as mock_exec:
mock_exec.return_value = ["Chassis Power is on\n", None]
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test__power_status_on(self, mock_exec):
mock_exec.return_value = ["Chassis Power is on\n", None]
state = ipmi._power_status(self.info)
state = ipmi._power_status(self.info)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.POWER_ON, state)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.POWER_ON, state)
def test__power_status_off(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
autospec=True) as mock_exec:
mock_exec.return_value = ["Chassis Power is off\n", None]
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test__power_status_off(self, mock_exec):
mock_exec.return_value = ["Chassis Power is off\n", None]
state = ipmi._power_status(self.info)
state = ipmi._power_status(self.info)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.POWER_OFF, state)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.POWER_OFF, state)
def test__power_status_error(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
autospec=True) as mock_exec:
mock_exec.return_value = ["Chassis Power is badstate\n", None]
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test__power_status_error(self, mock_exec):
mock_exec.return_value = ["Chassis Power is badstate\n", None]
state = ipmi._power_status(self.info)
state = ipmi._power_status(self.info)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.ERROR, state)
mock_exec.assert_called_once_with(self.info, "power status")
self.assertEqual(states.ERROR, state)
def test__power_status_exception(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
side_effect=processutils.ProcessExecutionError("error"),
autospec=True) as mock_exec:
self.assertRaises(exception.IPMIFailure,
ipmi._power_status,
self.info)
mock_exec.assert_called_once_with(self.info, "power status")
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test__power_status_exception(self, mock_exec):
mock_exec.side_effect = processutils.ProcessExecutionError("error")
self.assertRaises(exception.IPMIFailure,
ipmi._power_status,
self.info)
mock_exec.assert_called_once_with(self.info, "power status")
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
@mock.patch('eventlet.greenthread.sleep')
def test__power_on_max_retries(self, sleep_mock):
def test__power_on_max_retries(self, sleep_mock, mock_exec):
self.config(retry_timeout=2, group='ipmi')
def side_effect(driver_info, command):
@ -253,18 +242,16 @@ class IPMIToolPrivateMethodTestCase(base.TestCase):
"power on": [None, None]}
return resp_dict.get(command, ["Bad\n", None])
with mock.patch.object(ipmi, '_exec_ipmitool',
autospec=True) as mock_exec:
mock_exec.side_effect = side_effect
mock_exec.side_effect = side_effect
expected = [mock.call(self.info, "power on"),
mock.call(self.info, "power status"),
mock.call(self.info, "power status")]
expected = [mock.call(self.info, "power on"),
mock.call(self.info, "power status"),
mock.call(self.info, "power status")]
state = ipmi._power_on(self.info)
state = ipmi._power_on(self.info)
self.assertEqual(mock_exec.call_args_list, expected)
self.assertEqual(states.ERROR, state)
self.assertEqual(mock_exec.call_args_list, expected)
self.assertEqual(states.ERROR, state)
class IPMIToolDriverTestCase(db_base.DbTestCase):
@ -281,87 +268,83 @@ class IPMIToolDriverTestCase(db_base.DbTestCase):
driver_info=INFO_DICT)
self.info = ipmi._parse_driver_info(self.node)
def test_get_power_state(self):
returns = [["Chassis Power is off\n", None],
["Chassis Power is on\n", None],
["\n", None]]
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test_get_power_state(self, mock_exec):
returns = iter([["Chassis Power is off\n", None],
["Chassis Power is on\n", None],
["\n", None]])
expected = [mock.call(self.info, "power status"),
mock.call(self.info, "power status"),
mock.call(self.info, "power status")]
with mock.patch.object(ipmi, '_exec_ipmitool', side_effect=returns,
autospec=True) as mock_exec:
mock_exec.side_effect = returns
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_OFF, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_OFF, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_ON, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.POWER_ON, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.ERROR, pstate)
pstate = self.driver.power.get_power_state(None, self.node)
self.assertEqual(states.ERROR, pstate)
self.assertEqual(mock_exec.call_args_list, expected)
self.assertEqual(mock_exec.call_args_list, expected)
def test_get_power_state_exception(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
side_effect=processutils.ProcessExecutionError("error"),
autospec=True) as mock_exec:
self.assertRaises(exception.IPMIFailure,
self.driver.power.get_power_state,
None,
self.node)
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test_get_power_state_exception(self, mock_exec):
mock_exec.side_effect = processutils.ProcessExecutionError("error")
self.assertRaises(exception.IPMIFailure,
self.driver.power.get_power_state,
None,
self.node)
mock_exec.assert_called_once_with(self.info, "power status")
def test_set_power_on_ok(self):
@mock.patch.object(ipmi, '_power_on', autospec=True)
@mock.patch.object(ipmi, '_power_off', autospec=True)
def test_set_power_on_ok(self, mock_off, mock_on):
self.config(retry_timeout=0, group='ipmi')
with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on:
mock_on.return_value = states.POWER_ON
with mock.patch.object(ipmi, '_power_off',
autospec=True) as mock_off:
mock_on.return_value = states.POWER_ON
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.set_power_state(task,
self.node,
states.POWER_ON)
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_ON)
mock_on.assert_called_once_with(self.info)
self.assertFalse(mock_off.called)
mock_on.assert_called_once_with(self.info)
self.assertFalse(mock_off.called)
def test_set_power_off_ok(self):
@mock.patch.object(ipmi, '_power_on', autospec=True)
@mock.patch.object(ipmi, '_power_off', autospec=True)
def test_set_power_off_ok(self, mock_off, mock_on):
self.config(retry_timeout=0, group='ipmi')
with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on:
with mock.patch.object(ipmi, '_power_off',
autospec=True) as mock_off:
mock_off.return_value = states.POWER_OFF
mock_off.return_value = states.POWER_OFF
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.set_power_state(
task, self.node, states.POWER_OFF)
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.set_power_state(task,
self.node,
states.POWER_OFF)
mock_off.assert_called_once_with(self.info)
self.assertFalse(mock_on.called)
mock_off.assert_called_once_with(self.info)
self.assertFalse(mock_on.called)
def test_set_power_on_fail(self):
@mock.patch.object(ipmi, '_power_on', autospec=True)
@mock.patch.object(ipmi, '_power_off', autospec=True)
def test_set_power_on_fail(self, mock_off, mock_on):
self.config(retry_timeout=0, group='ipmi')
with mock.patch.object(ipmi, '_power_on', autospec=True) as mock_on:
mock_on.return_value = states.ERROR
with mock.patch.object(ipmi, '_power_off',
autospec=True) as mock_off:
mock_on.return_value = states.ERROR
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
mock_on.assert_called_once_with(self.info)
self.assertFalse(mock_off.called)
mock_on.assert_called_once_with(self.info)
self.assertFalse(mock_off.called)
def test_set_power_invalid_state(self):
with task_manager.acquire(self.context, [self.node['uuid']]) as task:
@ -371,16 +354,15 @@ class IPMIToolDriverTestCase(db_base.DbTestCase):
self.node,
"fake state")
def test_set_boot_device_ok(self):
with mock.patch.object(ipmi, '_exec_ipmitool',
autospec=True) as mock_exec:
mock_exec.return_value = [None, None]
@mock.patch.object(ipmi, '_exec_ipmitool', autospec=True)
def test_set_boot_device_ok(self, mock_exec):
mock_exec.return_value = [None, None]
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.vendor._set_boot_device(task, 'pxe')
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.vendor._set_boot_device(task, 'pxe')
mock_exec.assert_called_once_with(self.info, "chassis bootdev pxe")
mock_exec.assert_called_once_with(self.info, "chassis bootdev pxe")
def test_set_boot_device_bad_device(self):
with task_manager.acquire(self.context, [self.node['uuid']]) as task:
@ -389,44 +371,42 @@ class IPMIToolDriverTestCase(db_base.DbTestCase):
task,
'fake-device')
def test_reboot_ok(self):
@mock.patch.object(ipmi, '_power_off', autospec=False)
@mock.patch.object(ipmi, '_power_on', autospec=False)
def test_reboot_ok(self, mock_on, mock_off):
manager = mock.MagicMock()
#NOTE(rloo): if autospec is True, then manager.mock_calls is empty
with mock.patch.object(ipmi, '_power_off', autospec=False) as mock_off:
with mock.patch.object(ipmi, '_power_on',
autospec=False) as mock_on:
mock_on.return_value = states.POWER_ON
manager.attach_mock(mock_off, 'power_off')
manager.attach_mock(mock_on, 'power_on')
expected = [mock.call.power_off(self.info),
mock.call.power_on(self.info)]
mock_on.return_value = states.POWER_ON
manager.attach_mock(mock_off, 'power_off')
manager.attach_mock(mock_on, 'power_on')
expected = [mock.call.power_off(self.info),
mock.call.power_on(self.info)]
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.reboot(task, self.node)
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.driver.power.reboot(task, self.node)
self.assertEqual(manager.mock_calls, expected)
self.assertEqual(manager.mock_calls, expected)
def test_reboot_fail(self):
@mock.patch.object(ipmi, '_power_off', autospec=False)
@mock.patch.object(ipmi, '_power_on', autospec=False)
def test_reboot_fail(self, mock_on, mock_off):
manager = mock.MagicMock()
#NOTE(rloo): if autospec is True, then manager.mock_calls is empty
with mock.patch.object(ipmi, '_power_off', autospec=False) as mock_off:
with mock.patch.object(ipmi, '_power_on',
autospec=False) as mock_on:
mock_on.return_value = states.ERROR
manager.attach_mock(mock_off, 'power_off')
manager.attach_mock(mock_on, 'power_on')
expected = [mock.call.power_off(self.info),
mock.call.power_on(self.info)]
mock_on.return_value = states.ERROR
manager.attach_mock(mock_off, 'power_off')
manager.attach_mock(mock_on, 'power_on')
expected = [mock.call.power_off(self.info),
mock.call.power_on(self.info)]
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.reboot,
task,
self.node)
with task_manager.acquire(self.context,
[self.node['uuid']]) as task:
self.assertRaises(exception.PowerStateFailure,
self.driver.power.reboot,
task,
self.node)
self.assertEqual(manager.mock_calls, expected)
self.assertEqual(manager.mock_calls, expected)
def test_vendor_passthru_validate__set_boot_device_good(self):
with task_manager.acquire(self.context, self.node['uuid']) as task:
@ -453,14 +433,13 @@ class IPMIToolDriverTestCase(db_base.DbTestCase):
self.driver.vendor.validate,
task, method='fake_method')
def test_vendor_passthru_call_set_boot_device(self):
@mock.patch.object(ipmi.VendorPassthru, '_set_boot_device')
def test_vendor_passthru_call_set_boot_device(self, boot_mock):
with task_manager.acquire(self.context, [self.node['uuid']],
shared=False) as task:
with mock.patch.object(ipmi.VendorPassthru,
'_set_boot_device') as boot_mock:
self.driver.vendor.vendor_passthru(task,
method='set_boot_device',
device='pxe')
self.driver.vendor.vendor_passthru(task,
method='set_boot_device',
device='pxe')
boot_mock.assert_called_once_with(task, 'pxe', False)
@mock.patch.object(ipmi, '_exec_ipmitool')

View File

@ -198,7 +198,8 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase):
p = db_utils.get_test_port(**kwargs)
return self.dbapi.create_port(p)
def test__get_tftp_image_info(self):
@mock.patch.object(base_image_service.BaseImageService, '_show')
def test__get_tftp_image_info(self, show_mock):
properties = {'properties': {u'kernel_id': u'instance_kernel_uuid',
u'ramdisk_id': u'instance_ramdisk_uuid'}}
@ -222,26 +223,24 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase):
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'deploy_kernel')]}
with mock.patch.object(base_image_service.BaseImageService, '_show') \
as show_mock:
show_mock.return_value = properties
image_info = pxe._get_tftp_image_info(self.node, self.context)
show_mock.assert_called_once_with('glance://image_uuid',
method='get')
self.assertEqual(expected_info, image_info)
show_mock.return_value = properties
image_info = pxe._get_tftp_image_info(self.node, self.context)
show_mock.assert_called_once_with('glance://image_uuid',
method='get')
self.assertEqual(expected_info, image_info)
# test with saved info
with mock.patch.object(base_image_service.BaseImageService, '_show') \
as show_mock:
image_info = pxe._get_tftp_image_info(self.node, self.context)
self.assertEqual(expected_info, image_info)
self.assertFalse(show_mock.called)
self.assertEqual('instance_kernel_uuid',
self.node.driver_info.get('pxe_kernel'))
self.assertEqual('instance_ramdisk_uuid',
self.node.driver_info.get('pxe_ramdisk'))
show_mock.reset_mock()
image_info = pxe._get_tftp_image_info(self.node, self.context)
self.assertEqual(expected_info, image_info)
self.assertFalse(show_mock.called)
self.assertEqual('instance_kernel_uuid',
self.node.driver_info.get('pxe_kernel'))
self.assertEqual('instance_ramdisk_uuid',
self.node.driver_info.get('pxe_ramdisk'))
def test__build_pxe_config(self):
@mock.patch.object(utils, 'random_alnum')
def test__build_pxe_config(self, random_alnum_mock):
self.config(pxe_append_params='test_param', group='pxe')
# NOTE: right '/' should be removed from url string
self.config(api_url='http://192.168.122.184:6385/', group='conductor')
@ -250,32 +249,31 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase):
pxe_config_template = open(template, 'r').read()
fake_key = '0123456789ABCDEFGHIJKLMNOPQRSTUV'
with mock.patch.object(utils, 'random_alnum') as random_alnum_mock:
random_alnum_mock.return_value = fake_key
random_alnum_mock.return_value = fake_key
image_info = {'deploy_kernel': ['deploy_kernel',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'deploy_kernel')],
'deploy_ramdisk': ['deploy_ramdisk',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'deploy_ramdisk')],
'kernel': ['kernel_id',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'kernel')],
'ramdisk': ['ramdisk_id',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'ramdisk')]
image_info = {'deploy_kernel': ['deploy_kernel',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'deploy_kernel')],
'deploy_ramdisk': ['deploy_ramdisk',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'deploy_ramdisk')],
'kernel': ['kernel_id',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'kernel')],
'ramdisk': ['ramdisk_id',
os.path.join(CONF.pxe.tftp_root,
self.node.uuid,
'ramdisk')]
}
pxe_config = pxe._build_pxe_config(self.node,
image_info,
self.context)
pxe_config = pxe._build_pxe_config(self.node,
image_info,
self.context)
random_alnum_mock.assert_called_once_with(32)
self.assertEqual(pxe_config_template, pxe_config)
random_alnum_mock.assert_called_once_with(32)
self.assertEqual(pxe_config_template, pxe_config)
# test that deploy_key saved
db_node = self.dbapi.get_node_by_uuid(self.node.uuid)
@ -312,54 +310,50 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase):
result = pxe._get_node_vif_ids(task)
self.assertEqual(expected, result)
def test__update_neutron(self):
@mock.patch.object(pxe, '_get_node_vif_ids')
@mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts')
def test__update_neutron(self, mock_updo, mock_gnvi):
opts = pxe._dhcp_options_for_instance()
with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi:
mock_gnvi.return_value = {'port-uuid': 'vif-uuid'}
with mock.patch.object(neutron.NeutronAPI,
'update_port_dhcp_opts') as mock_updo:
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
mock_updo.assertCalleOnceWith('vif-uuid', opts)
mock_gnvi.return_value = {'port-uuid': 'vif-uuid'}
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
mock_updo.assertCalleOnceWith('vif-uuid', opts)
def test__update_neutron_no_vif_data(self):
with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi:
mock_gnvi.return_value = {}
with mock.patch.object(neutron.NeutronAPI,
'__init__') as mock_init:
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
mock_init.assert_not_called()
@mock.patch.object(pxe, '_get_node_vif_ids')
@mock.patch.object(neutron.NeutronAPI, '__init__')
def test__update_neutron_no_vif_data(self, mock_init, mock_gnvi):
mock_gnvi.return_value = {}
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
mock_init.assert_not_called()
def test__update_neutron_some_failures(self):
@mock.patch.object(pxe, '_get_node_vif_ids')
@mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts')
def test__update_neutron_some_failures(self, mock_updo, mock_gnvi):
# confirm update is called twice, one fails, but no exception raised
with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi:
mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'}
with mock.patch.object(neutron.NeutronAPI,
'update_port_dhcp_opts') as mock_updo:
exc = exception.FailedToUpdateDHCPOptOnPort('fake exception')
mock_updo.side_effect = [None, exc]
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
self.assertEqual(2, mock_updo.call_count)
mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'}
exc = exception.FailedToUpdateDHCPOptOnPort('fake exception')
mock_updo.side_effect = [None, exc]
with task_manager.acquire(self.context,
self.node.uuid) as task:
pxe._update_neutron(task, self.node)
self.assertEqual(2, mock_updo.call_count)
def test__update_neutron_fails(self):
@mock.patch.object(pxe, '_get_node_vif_ids')
@mock.patch.object(neutron.NeutronAPI, 'update_port_dhcp_opts')
def test__update_neutron_fails(self, mock_updo, mock_gnvi):
# confirm update is called twice, both fail, and exception is raised
with mock.patch.object(pxe, '_get_node_vif_ids') as mock_gnvi:
mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'}
with mock.patch.object(neutron.NeutronAPI,
'update_port_dhcp_opts') as mock_updo:
exc = exception.FailedToUpdateDHCPOptOnPort('fake exception')
mock_updo.side_effect = [exc, exc]
with task_manager.acquire(self.context,
self.node.uuid) as task:
self.assertRaises(exception.FailedToUpdateDHCPOptOnPort,
pxe._update_neutron,
task, self.node)
self.assertEqual(2, mock_updo.call_count)
mock_gnvi.return_value = {'p1': 'v1', 'p2': 'v2'}
exc = exception.FailedToUpdateDHCPOptOnPort('fake exception')
mock_updo.side_effect = [exc, exc]
with task_manager.acquire(self.context,
self.node.uuid) as task:
self.assertRaises(exception.FailedToUpdateDHCPOptOnPort,
pxe._update_neutron,
task, self.node)
self.assertEqual(2, mock_updo.call_count)
def test__dhcp_options_for_instance(self):
self.config(pxe_bootfile_name='test_pxe_bootfile', group='pxe')
@ -591,39 +585,37 @@ class PXEDriverTestCase(db_base.DbTestCase):
mock_img_path.assert_called_once_with(task.node.uuid)
mock_get_img_mb.assert_called_once_with(fake_img_path)
def test_deploy(self):
with mock.patch.object(pxe, '_update_neutron') as update_neutron_mock:
with mock.patch.object(manager_utils,
'node_power_action') as node_power_mock:
with mock.patch.object(manager_utils,
'node_set_boot_device') as node_set_boot_mock:
with task_manager.acquire(self.context,
self.node.uuid, shared=False) as task:
state = task.driver.deploy.deploy(task, self.node)
self.assertEqual(state, states.DEPLOYWAIT)
update_neutron_mock.assert_called_once_with(task,
self.node)
node_set_boot_mock.assert_called_once_with(task,
'pxe',
persistent=True)
node_power_mock.assert_called_once_with(task,
self.node,
states.REBOOT)
@mock.patch.object(pxe, '_update_neutron')
@mock.patch.object(manager_utils, 'node_power_action')
@mock.patch.object(manager_utils, 'node_set_boot_device')
def test_deploy(self, node_set_boot_mock, node_power_mock,
update_neutron_mock):
with task_manager.acquire(self.context,
self.node.uuid, shared=False) as task:
state = task.driver.deploy.deploy(task, self.node)
self.assertEqual(state, states.DEPLOYWAIT)
update_neutron_mock.assert_called_once_with(task,
self.node)
node_set_boot_mock.assert_called_once_with(task,
'pxe',
persistent=True)
node_power_mock.assert_called_once_with(task,
self.node,
states.REBOOT)
# ensure token file created
t_path = pxe._get_token_file_path(self.node.uuid)
token = open(t_path, 'r').read()
self.assertEqual(self.context.auth_token, token)
# ensure token file created
t_path = pxe._get_token_file_path(self.node.uuid)
token = open(t_path, 'r').read()
self.assertEqual(self.context.auth_token, token)
def test_tear_down(self):
with mock.patch.object(manager_utils,
'node_power_action') as node_power_mock:
with task_manager.acquire(self.context,
self.node.uuid) as task:
state = task.driver.deploy.tear_down(task, self.node)
self.assertEqual(states.DELETED, state)
node_power_mock.assert_called_once_with(task, self.node,
states.POWER_OFF)
@mock.patch.object(manager_utils, 'node_power_action')
def test_tear_down(self, node_power_mock):
with task_manager.acquire(self.context,
self.node.uuid) as task:
state = task.driver.deploy.tear_down(task, self.node)
self.assertEqual(states.DELETED, state)
node_power_mock.assert_called_once_with(task, self.node,
states.POWER_OFF)
@mock.patch.object(manager_utils, 'node_power_action')
def test_tear_down_removes_internal_attrs(self, mock_npa):
@ -643,12 +635,12 @@ class PXEDriverTestCase(db_base.DbTestCase):
self.assertNotIn('pxe_ramdisk', self.node.driver_info)
mock_npa.assert_called_once_with(task, self.node, states.POWER_OFF)
def test_take_over(self):
with mock.patch.object(pxe, '_update_neutron') as update_neutron_mock:
with task_manager.acquire(
self.context, self.node.uuid, shared=True) as task:
task.driver.deploy.take_over(task, self.node)
update_neutron_mock.assert_called_once_with(task, self.node)
@mock.patch.object(pxe, '_update_neutron')
def test_take_over(self, update_neutron_mock):
with task_manager.acquire(
self.context, self.node.uuid, shared=True) as task:
task.driver.deploy.take_over(task, self.node)
update_neutron_mock.assert_called_once_with(task, self.node)
def test_continue_deploy_good(self):
token_path = self._create_token_file()
@ -743,7 +735,8 @@ class PXEDriverTestCase(db_base.DbTestCase):
self.assertEqual(1, _continue_deploy_mock.call_count,
"_continue_deploy was not called once.")
def clean_up_config(self, master=None):
@mock.patch.object(pxe, '_get_tftp_image_info')
def clean_up_config(self, get_tftp_image_info_mock, master=None):
temp_dir = tempfile.mkdtemp()
self.config(tftp_root=temp_dir, group='pxe')
tftp_master_dir = os.path.join(CONF.pxe.tftp_root,
@ -770,57 +763,55 @@ class PXEDriverTestCase(db_base.DbTestCase):
self.node.uuid, 'deploy_kernel')
image_info = {'deploy_kernel': ['deploy_kernel_uuid', d_kernel_path]}
with mock.patch.object(pxe, '_get_tftp_image_info') \
as get_tftp_image_info_mock:
get_tftp_image_info_mock.return_value = image_info
get_tftp_image_info_mock.return_value = image_info
pxecfg_dir = os.path.join(CONF.pxe.tftp_root, 'pxelinux.cfg')
os.makedirs(pxecfg_dir)
pxecfg_dir = os.path.join(CONF.pxe.tftp_root, 'pxelinux.cfg')
os.makedirs(pxecfg_dir)
instance_dir = os.path.join(CONF.pxe.tftp_root,
self.node.uuid)
image_dir = os.path.join(CONF.pxe.images_path, self.node.uuid)
os.makedirs(instance_dir)
os.makedirs(image_dir)
config_path = os.path.join(instance_dir, 'config')
deploy_kernel_path = os.path.join(instance_dir, 'deploy_kernel')
pxe_mac_path = os.path.join(pxecfg_dir, '01-aa-bb-cc')
image_path = os.path.join(image_dir, 'disk')
open(config_path, 'w').close()
os.link(config_path, pxe_mac_path)
if master:
master_deploy_kernel_path = os.path.join(tftp_master_dir,
'deploy_kernel_uuid')
master_instance_path = os.path.join(instance_master_dir,
'image_uuid')
open(master_deploy_kernel_path, 'w').close()
open(master_instance_path, 'w').close()
instance_dir = os.path.join(CONF.pxe.tftp_root,
self.node.uuid)
image_dir = os.path.join(CONF.pxe.images_path, self.node.uuid)
os.makedirs(instance_dir)
os.makedirs(image_dir)
config_path = os.path.join(instance_dir, 'config')
deploy_kernel_path = os.path.join(instance_dir, 'deploy_kernel')
pxe_mac_path = os.path.join(pxecfg_dir, '01-aa-bb-cc')
image_path = os.path.join(image_dir, 'disk')
open(config_path, 'w').close()
os.link(config_path, pxe_mac_path)
if master:
master_deploy_kernel_path = os.path.join(tftp_master_dir,
'deploy_kernel_uuid')
master_instance_path = os.path.join(instance_master_dir,
'image_uuid')
open(master_deploy_kernel_path, 'w').close()
open(master_instance_path, 'w').close()
os.link(master_deploy_kernel_path, deploy_kernel_path)
os.link(master_instance_path, image_path)
if master == 'in_use':
deploy_kernel_link = os.path.join(CONF.pxe.tftp_root,
'deploy_kernel_link')
image_link = os.path.join(CONF.pxe.images_path,
'image_link')
os.link(master_deploy_kernel_path, deploy_kernel_link)
os.link(master_instance_path, image_link)
os.link(master_deploy_kernel_path, deploy_kernel_path)
os.link(master_instance_path, image_path)
if master == 'in_use':
deploy_kernel_link = os.path.join(CONF.pxe.tftp_root,
'deploy_kernel_link')
image_link = os.path.join(CONF.pxe.images_path,
'image_link')
os.link(master_deploy_kernel_path, deploy_kernel_link)
os.link(master_instance_path, image_link)
else:
open(deploy_kernel_path, 'w').close()
open(image_path, 'w').close()
token_path = self._create_token_file()
self.config(image_cache_size=0, group='pxe')
else:
open(deploy_kernel_path, 'w').close()
open(image_path, 'w').close()
token_path = self._create_token_file()
self.config(image_cache_size=0, group='pxe')
with task_manager.acquire(self.context, [self.node.uuid],
shared=True) as task:
task.resources[0].driver.deploy.clean_up(task, self.node)
get_tftp_image_info_mock.called_once_with(self.node)
assert_false_path = [config_path, deploy_kernel_path, image_path,
pxe_mac_path, image_dir, instance_dir,
token_path]
for path in assert_false_path:
self.assertFalse(os.path.exists(path))
with task_manager.acquire(self.context, [self.node.uuid],
shared=True) as task:
task.resources[0].driver.deploy.clean_up(task, self.node)
get_tftp_image_info_mock.called_once_with(self.node)
assert_false_path = [config_path, deploy_kernel_path, image_path,
pxe_mac_path, image_dir, instance_dir,
token_path]
for path in assert_false_path:
self.assertFalse(os.path.exists(path))
def test_clean_up_no_master_images(self):
self.clean_up_config(master=None)

View File

@ -192,116 +192,99 @@ class SSHPrivateMethodsTestCase(base.TestCase):
driver_info=db_utils.get_test_ssh_info())
self.sshclient = paramiko.SSHClient()
# Set up the mock for processutils.ssh_execute because most tests use
# it. processutils.ssh_execute returns (stdout, stderr).
self.ssh_patcher = mock.patch.object(processutils, 'ssh_execute')
self.exec_ssh_mock = self.ssh_patcher.start()
self.exec_ssh_mock.return_value = ('', '')
@mock.patch.object(utils, 'ssh_connect')
def test__get_connection_client(self, ssh_connect_mock):
ssh_connect_mock.return_value = self.sshclient
client = ssh._get_connection(self.node)
self.assertEqual(self.sshclient, client)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
def stop_patcher():
if self.ssh_patcher:
self.ssh_patcher.stop()
@mock.patch.object(utils, 'ssh_connect')
def test__get_connection_exception(self, ssh_connect_mock):
ssh_connect_mock.side_effect = exception.SSHConnectFailed(host='fake')
self.assertRaises(exception.SSHConnectFailed,
ssh._get_connection,
self.node)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
self.addCleanup(stop_patcher)
def test__get_connection_client(self):
with mock.patch.object(
utils, 'ssh_connect') as ssh_connect_mock:
ssh_connect_mock.return_value = self.sshclient
client = ssh._get_connection(self.node)
self.assertEqual(self.sshclient, client)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
def test__get_connection_exception(self):
with mock.patch.object(
utils, 'ssh_connect') as ssh_connect_mock:
ssh_connect_mock.side_effect = exception.SSHConnectFailed(
host='fake')
self.assertRaises(exception.SSHConnectFailed,
ssh._get_connection,
self.node)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
def test__ssh_execute(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__ssh_execute(self, exec_ssh_mock):
ssh_cmd = "somecmd"
expected = ['a', 'b', 'c']
self.exec_ssh_mock.return_value = ('\n'.join(expected), '')
exec_ssh_mock.return_value = ('\n'.join(expected), '')
lst = ssh._ssh_execute(self.sshclient, ssh_cmd)
self.exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
self.assertEqual(expected, lst)
def test__ssh_execute_exception(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__ssh_execute_exception(self, exec_ssh_mock):
ssh_cmd = "somecmd"
self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError
exec_ssh_mock.side_effect = processutils.ProcessExecutionError
self.assertRaises(exception.SSHCommandFailed,
ssh._ssh_execute,
self.sshclient,
ssh_cmd)
self.exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
def test__get_power_status_on(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__get_power_status_on(self, get_hosts_name_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
self.exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
get_hosts_name_mock.return_value = "NodeName"
exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
get_hosts_name_mock.return_value = "NodeName"
pstate = ssh._get_power_status(self.sshclient, info)
pstate = ssh._get_power_status(self.sshclient, info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.assertEqual(states.POWER_ON, pstate)
self.exec_ssh_mock.assert_called_once_with(
self.sshclient, ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.assertEqual(states.POWER_ON, pstate)
exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
def test__get_power_status_off(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__get_power_status_off(self, get_hosts_name_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
self.exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
get_hosts_name_mock.return_value = "NotNodeName"
exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
get_hosts_name_mock.return_value = "NotNodeName"
pstate = ssh._get_power_status(self.sshclient, info)
pstate = ssh._get_power_status(self.sshclient, info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.assertEqual(states.POWER_OFF, pstate)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.assertEqual(states.POWER_OFF, pstate)
exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__get_power_status_error(self, get_hosts_name_mock, exec_ssh_mock):
def test__get_power_status_error(self):
info = ssh._parse_driver_info(self.node)
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
self.exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
get_hosts_name_mock.return_value = None
self.assertRaises(exception.NodeNotFound,
ssh._get_power_status,
self.sshclient,
info)
exec_ssh_mock.return_value = (
'"NodeName" {b43c4982-110c-4c29-9325-d5f41b053513}', '')
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
get_hosts_name_mock.return_value = None
self.assertRaises(exception.NodeNotFound,
ssh._get_power_status,
self.sshclient,
info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.exec_ssh_mock.assert_called_once_with(
self.sshclient, ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
exec_ssh_mock.assert_called_once_with(self.sshclient, ssh_cmd)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
def test__get_power_status_exception(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__get_power_status_exception(self, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError
exec_ssh_mock.side_effect = processutils.ProcessExecutionError
self.assertRaises(exception.SSHCommandFailed,
ssh._get_power_status,
@ -309,10 +292,11 @@ class SSHPrivateMethodsTestCase(base.TestCase):
info)
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['list_running'])
self.exec_ssh_mock.assert_called_once_with(
exec_ssh_mock.assert_called_once_with(
self.sshclient, ssh_cmd)
def test__get_hosts_name_for_node_match(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__get_hosts_name_for_node_match(self, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
@ -321,7 +305,7 @@ class SSHPrivateMethodsTestCase(base.TestCase):
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['get_node_macs'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
self.exec_ssh_mock.side_effect = [('NodeName', ''),
exec_ssh_mock.side_effect = [('NodeName', ''),
('52:54:00:cf:2d:31', '')]
expected = [mock.call(self.sshclient, ssh_cmd),
mock.call(self.sshclient, cmd_to_exec)]
@ -329,12 +313,13 @@ class SSHPrivateMethodsTestCase(base.TestCase):
found_name = ssh._get_hosts_name_for_node(self.sshclient, info)
self.assertEqual('NodeName', found_name)
self.assertEqual(expected, self.exec_ssh_mock.call_args_list)
self.assertEqual(expected, exec_ssh_mock.call_args_list)
def test__get_hosts_name_for_node_no_match(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__get_hosts_name_for_node_no_match(self, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "22:22:22:22:22:22"]
self.exec_ssh_mock.side_effect = [('NodeName', ''),
exec_ssh_mock.side_effect = [('NodeName', ''),
('52:54:00:cf:2d:31', '')]
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
@ -350,9 +335,10 @@ class SSHPrivateMethodsTestCase(base.TestCase):
found_name = ssh._get_hosts_name_for_node(self.sshclient, info)
self.assertIsNone(found_name)
self.assertEqual(expected, self.exec_ssh_mock.call_args_list)
self.assertEqual(expected, exec_ssh_mock.call_args_list)
def test__get_hosts_name_for_node_exception(self):
@mock.patch.object(processutils, 'ssh_execute')
def test__get_hosts_name_for_node_exception(self, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
ssh_cmd = "%s %s" % (info['cmd_set']['base_cmd'],
@ -362,8 +348,8 @@ class SSHPrivateMethodsTestCase(base.TestCase):
info['cmd_set']['get_node_macs'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
self.exec_ssh_mock.side_effect = [('NodeName', ''),
processutils.ProcessExecutionError]
exec_ssh_mock.side_effect = [('NodeName', ''),
processutils.ProcessExecutionError]
expected = [mock.call(self.sshclient, ssh_cmd),
mock.call(self.sshclient, cmd_to_exec)]
@ -371,176 +357,149 @@ class SSHPrivateMethodsTestCase(base.TestCase):
ssh._get_hosts_name_for_node,
self.sshclient,
info)
self.assertEqual(expected, self.exec_ssh_mock.call_args_list)
self.assertEqual(expected, exec_ssh_mock.call_args_list)
def test__power_on_good(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_on_good(self, get_hosts_name_mock, get_power_status_mock,
exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
with mock.patch.object(ssh, '_get_power_status') \
as get_power_status_mock:
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_on(self.sshclient, info)
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_on(self.sshclient, info)
self.assertEqual(states.POWER_ON, current_state)
self.assertEqual(expected,
get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertEqual(states.POWER_ON, current_state)
self.assertEqual(expected, get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test__power_on_fail(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_on_fail(self, get_hosts_name_mock, get_power_status_mock,
exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
with mock.patch.object(ssh, '_get_power_status') \
as get_power_status_mock:
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_on(self.sshclient, info)
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_on(self.sshclient, info)
self.assertEqual(states.ERROR, current_state)
self.assertEqual(expected,
get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertEqual(states.ERROR, current_state)
self.assertEqual(expected, get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test__power_on_exception(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_on_exception(self, get_hosts_name_mock,
get_power_status_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError
with mock.patch.object(
ssh, '_get_power_status') as get_power_status_mock:
with mock.patch.object(
ssh, '_get_hosts_name_for_node') as get_hosts_name_mock:
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
exec_ssh_mock.side_effect = processutils.ProcessExecutionError
get_power_status_mock.side_effect = [states.POWER_OFF,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['start_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
self.assertRaises(exception.SSHCommandFailed,
ssh._power_on,
self.sshclient,
info)
get_power_status_mock.assert_called_once_with(self.sshclient,
info)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertRaises(exception.SSHCommandFailed,
ssh._power_on,
self.sshclient,
info)
get_power_status_mock.assert_called_once_with(self.sshclient, info)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test__power_off_good(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_off_good(self, get_hosts_name_mock,
get_power_status_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
with mock.patch.object(ssh, '_get_power_status') \
as get_power_status_mock:
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_off(self.sshclient, info)
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_off(self.sshclient, info)
self.assertEqual(states.POWER_OFF, current_state)
self.assertEqual(expected,
get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertEqual(states.POWER_OFF, current_state)
self.assertEqual(expected, get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test__power_off_fail(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_off_fail(self, get_hosts_name_mock,
get_power_status_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
with mock.patch.object(ssh, '_get_power_status') \
as get_power_status_mock:
with mock.patch.object(ssh, '_get_hosts_name_for_node') \
as get_hosts_name_mock:
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_ON]
get_hosts_name_mock.return_value = "NodeName"
expected = [mock.call(self.sshclient, info),
mock.call(self.sshclient, info)]
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_off(self.sshclient, info)
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
current_state = ssh._power_off(self.sshclient, info)
self.assertEqual(states.ERROR, current_state)
self.assertEqual(expected,
get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertEqual(states.ERROR, current_state)
self.assertEqual(expected, get_power_status_mock.call_args_list)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test__power_off_exception(self):
@mock.patch.object(processutils, 'ssh_execute')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_get_hosts_name_for_node')
def test__power_off_exception(self, get_hosts_name_mock,
get_power_status_mock, exec_ssh_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.exec_ssh_mock.side_effect = processutils.ProcessExecutionError
with mock.patch.object(
ssh, '_get_power_status') as get_power_status_mock:
with mock.patch.object(
ssh, '_get_hosts_name_for_node') as get_hosts_name_mock:
self.exec_ssh_mock.side_effect = (
processutils.ProcessExecutionError)
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
exec_ssh_mock.side_effect = processutils.ProcessExecutionError
get_power_status_mock.side_effect = [states.POWER_ON,
states.POWER_OFF]
get_hosts_name_mock.return_value = "NodeName"
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
cmd_to_exec = "%s %s" % (info['cmd_set']['base_cmd'],
info['cmd_set']['stop_cmd'])
cmd_to_exec = cmd_to_exec.replace('{_NodeName_}', 'NodeName')
self.assertRaises(exception.SSHCommandFailed,
ssh._power_off,
self.sshclient,
info)
get_power_status_mock.assert_called_once_with(self.sshclient,
info)
get_hosts_name_mock.assert_called_once_with(self.sshclient,
info)
self.exec_ssh_mock.assert_called_once_with(self.sshclient,
cmd_to_exec)
self.assertRaises(exception.SSHCommandFailed, ssh._power_off,
self.sshclient, info)
get_power_status_mock.assert_called_once_with(self.sshclient, info)
get_hosts_name_mock.assert_called_once_with(self.sshclient, info)
exec_ssh_mock.assert_called_once_with(self.sshclient, cmd_to_exec)
def test_exec_ssh_command_good(self):
# stop mocking the processutils.ssh_execute because we
# are testing it here
self.ssh_patcher.stop()
self.ssh_patcher = None
class Channel(object):
def recv_exit_status(self):
return 0
@ -568,11 +527,6 @@ class SSHPrivateMethodsTestCase(base.TestCase):
exec_command_mock.assert_called_once_with("command")
def test_exec_ssh_command_fail(self):
# stop mocking the processutils.ssh_execute because we
# are testing it here
self.ssh_patcher.stop()
self.ssh_patcher = None
class Channel(object):
def recv_exit_status(self):
return 127
@ -614,49 +568,20 @@ class SSHDriverTestCase(db_base.DbTestCase):
node_id=self.node.id))
self.sshclient = paramiko.SSHClient()
#setup these mocks because most tests use them
self.parse_drv_info_patcher = mock.patch.object(ssh,
'_parse_driver_info')
self.parse_drv_info_mock = None
self.get_mac_addr_patcher = mock.patch.object(
driver_utils,
'get_node_mac_addresses')
self.get_mac_addr_mock = self.get_mac_addr_patcher.start()
self.get_conn_patcher = mock.patch.object(ssh, '_get_connection')
self.get_conn_mock = self.get_conn_patcher.start()
def stop_patchers():
if self.parse_drv_info_mock:
self.parse_drv_info_patcher.stop()
if self.get_mac_addr_mock:
self.get_mac_addr_patcher.stop()
if self.get_conn_mock:
self.get_conn_patcher.stop()
self.addCleanup(stop_patchers)
def test__validate_info_ssh_connect_failed(self):
@mock.patch.object(utils, 'ssh_connect')
def test__validate_info_ssh_connect_failed(self, ssh_connect_mock):
info = ssh._parse_driver_info(self.node)
self.get_conn_patcher.stop()
self.get_conn_mock = None
with mock.patch.object(utils, 'ssh_connect') \
as ssh_connect_mock:
ssh_connect_mock.side_effect = exception.SSHConnectFailed(
host='fake')
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(exception.InvalidParameterValue,
task.resources[0].driver.power.validate,
task, self.node)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
ssh_connect_mock.side_effect = exception.SSHConnectFailed(host='fake')
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(exception.InvalidParameterValue,
task.resources[0].driver.power.validate,
task, self.node)
driver_info = ssh._parse_driver_info(self.node)
ssh_connect_mock.assert_called_once_with(driver_info)
def test_validate_fail_no_port(self):
# stop the get_mac_addr mock, it's needed for this test
self.get_mac_addr_patcher.stop()
self.get_mac_addr_mock = None
new_node = obj_utils.create_test_node(
self.context,
id=321,
@ -669,185 +594,186 @@ class SSHDriverTestCase(db_base.DbTestCase):
task.resources[0].driver.power.validate,
task, new_node)
def test_reboot_good(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_power_off')
@mock.patch.object(ssh, '_power_on')
def test_reboot_good(self, power_on_mock, power_off_mock,
get_power_stat_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
get_power_stat_mock.return_value = states.POWER_ON
power_off_mock.return_value = None
power_on_mock.return_value = states.POWER_ON
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
task.resources[0].driver.power.reboot(task, self.node)
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
get_power_stat_mock.assert_called_once_with(self.sshclient, info)
power_off_mock.assert_called_once_with(self.sshclient, info)
power_on_mock.assert_called_once_with(self.sshclient, info)
with mock.patch.object(ssh, '_get_power_status') \
as get_power_stat_mock:
with mock.patch.object(ssh, '_power_off') as power_off_mock:
with mock.patch.object(ssh, '_power_on') as power_on_mock:
get_power_stat_mock.return_value = states.POWER_ON
power_off_mock.return_value = None
power_on_mock.return_value = states.POWER_ON
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
task.resources[0].driver.power.reboot(task, self.node)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY,
self.node)
self.get_conn_mock.assert_called_once_with(self.node)
get_power_stat_mock.assert_called_once_with(self.sshclient,
info)
power_off_mock.assert_called_once_with(self.sshclient,
info)
power_on_mock.assert_called_once_with(self.sshclient, info)
def test_reboot_fail(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_get_power_status')
@mock.patch.object(ssh, '_power_off')
@mock.patch.object(ssh, '_power_on')
def test_reboot_fail(self, power_on_mock, power_off_mock,
get_power_stat_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
get_power_stat_mock.return_value = states.POWER_ON
power_off_mock.return_value = None
power_on_mock.return_value = states.POWER_OFF
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(exception.PowerStateFailure,
task.resources[0].driver.power.reboot,
task,
self.node)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
get_power_stat_mock.assert_called_once_with(self.sshclient, info)
power_off_mock.assert_called_once_with(self.sshclient, info)
power_on_mock.assert_called_once_with(self.sshclient, info)
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh, '_get_power_status') \
as get_power_stat_mock:
with mock.patch.object(ssh, '_power_off') as power_off_mock:
with mock.patch.object(ssh, '_power_on') as power_on_mock:
get_power_stat_mock.return_value = states.POWER_ON
power_off_mock.return_value = None
power_on_mock.return_value = states.POWER_OFF
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(
exception.PowerStateFailure,
task.resources[0].driver.power.reboot,
task,
self.node)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY,
self.node)
self.get_conn_mock.assert_called_once_with(self.node)
get_power_stat_mock.assert_called_once_with(self.sshclient,
info)
power_off_mock.assert_called_once_with(self.sshclient,
info)
power_on_mock.assert_called_once_with(self.sshclient, info)
def test_set_power_state_bad_state(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
def test_set_power_state_bad_state(self, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(
exception.InvalidParameterValue,
task.resources[0].driver.power.set_power_state,
task,
self.node,
"BAD_PSTATE")
self. parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
self.get_conn_mock.assert_called_once_with(self.node)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
def test_set_power_state_on_good(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_power_on')
def test_set_power_state_on_good(self, power_on_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh, '_power_on') as power_on_mock:
power_on_mock.return_value = states.POWER_ON
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
power_on_mock.return_value = states.POWER_ON
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
task.resources[0].driver.power.set_power_state(task,
self.node,
task.resources[0].driver.power.set_power_state(task, self.node,
states.POWER_ON)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
self.get_conn_mock.assert_called_once_with(self.node)
power_on_mock.assert_called_once_with(self.sshclient, info)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
power_on_mock.assert_called_once_with(self.sshclient, info)
def test_set_power_state_on_fail(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_power_on')
def test_set_power_state_on_fail(self, power_on_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh, '_power_on') as power_on_mock:
power_on_mock.return_value = states.POWER_OFF
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
power_on_mock.return_value = states.POWER_OFF
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(
exception.PowerStateFailure,
task.resources[0].driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
exception.PowerStateFailure,
task.resources[0].driver.power.set_power_state,
task,
self.node,
states.POWER_ON)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
self.get_conn_mock.assert_called_once_with(self.node)
power_on_mock.assert_called_once_with(self.sshclient, info)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
power_on_mock.assert_called_once_with(self.sshclient, info)
def test_set_power_state_off_good(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_power_off')
def test_set_power_state_off_good(self, power_off_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh, '_power_off') as power_off_mock:
power_off_mock.return_value = states.POWER_OFF
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
power_off_mock.return_value = states.POWER_OFF
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
task.resources[0].driver.power.set_power_state(task,
self.node, states.POWER_OFF)
task.resources[0].driver.power.set_power_state(
task, self.node, states.POWER_OFF)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
self.get_conn_mock.assert_called_once_with(self.node)
power_off_mock.assert_called_once_with(self.sshclient, info)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
power_off_mock.assert_called_once_with(self.sshclient, info)
def test_set_power_state_off_fail(self):
@mock.patch.object(driver_utils, 'get_node_mac_addresses')
@mock.patch.object(ssh, '_get_connection')
@mock.patch.object(ssh, '_power_off')
def test_set_power_state_off_fail(self, power_off_mock, get_conn_mock,
get_mac_addr_mock):
info = ssh._parse_driver_info(self.node)
info['macs'] = ["11:11:11:11:11:11", "52:54:00:cf:2d:31"]
self.parse_drv_info_mock = self.parse_drv_info_patcher.start()
self.parse_drv_info_mock.return_value = info
self.get_mac_addr_mock.return_value = info['macs']
self.get_conn_mock.return_value = self.sshclient
with mock.patch.object(ssh, '_power_off') as power_off_mock:
power_off_mock.return_value = states.POWER_ON
get_mac_addr_mock.return_value = info['macs']
get_conn_mock.return_value = self.sshclient
power_off_mock.return_value = states.POWER_ON
with mock.patch.object(ssh,
'_parse_driver_info') as parse_drv_info_mock:
parse_drv_info_mock.return_value = info
with task_manager.acquire(self.context, [info['uuid']],
shared=False) as task:
self.assertRaises(
exception.PowerStateFailure,
task.resources[0].driver.power.set_power_state,
task,
self.node,
states.POWER_OFF)
exception.PowerStateFailure,
task.resources[0].driver.power.set_power_state,
task,
self.node,
states.POWER_OFF)
self.parse_drv_info_mock.assert_called_once_with(self.node)
self.get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
self.get_conn_mock.assert_called_once_with(self.node)
power_off_mock.assert_called_once_with(self.sshclient, info)
parse_drv_info_mock.assert_called_once_with(self.node)
get_mac_addr_mock.assert_called_once_with(mock.ANY, self.node)
get_conn_mock.assert_called_once_with(self.node)
power_off_mock.assert_called_once_with(self.sshclient, info)