
* should be 100% * also comes with fixups Partially implements: blueprint image-based-provisioning Change-Id: I73ff92f65c0872ad82917d22900c3f410986629d
159 lines
5.9 KiB
Python
159 lines
5.9 KiB
Python
# Copyright 2011 Justin Santa Barbara
|
|
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import tempfile
|
|
import testtools
|
|
|
|
import mock
|
|
import stevedore
|
|
|
|
from fuel_agent import errors
|
|
from fuel_agent.openstack.common import processutils
|
|
from fuel_agent.utils import utils
|
|
|
|
|
|
class ExecuteTestCase(testtools.TestCase):
|
|
"""This class is partly based on the same class in openstack/ironic."""
|
|
|
|
def setUp(self):
|
|
super(ExecuteTestCase, self).setUp()
|
|
fake_driver = stevedore.extension.Extension('fake_driver', None, None,
|
|
'fake_obj')
|
|
self.drv_manager = stevedore.driver.DriverManager.make_test_instance(
|
|
fake_driver)
|
|
|
|
def test_parse_unit(self):
|
|
self.assertEqual(utils.parse_unit('1.00m', 'm', ceil=True), 1)
|
|
self.assertEqual(utils.parse_unit('1.00m', 'm', ceil=False), 1)
|
|
self.assertEqual(utils.parse_unit('1.49m', 'm', ceil=True), 2)
|
|
self.assertEqual(utils.parse_unit('1.49m', 'm', ceil=False), 1)
|
|
self.assertEqual(utils.parse_unit('1.51m', 'm', ceil=True), 2)
|
|
self.assertEqual(utils.parse_unit('1.51m', 'm', ceil=False), 1)
|
|
self.assertRaises(ValueError, utils.parse_unit, '1.00m', 'MiB')
|
|
self.assertRaises(ValueError, utils.parse_unit, '', 'MiB')
|
|
|
|
def test_B2MiB(self):
|
|
self.assertEqual(utils.B2MiB(1048575, ceil=False), 0)
|
|
self.assertEqual(utils.B2MiB(1048576, ceil=False), 1)
|
|
self.assertEqual(utils.B2MiB(1048575, ceil=True), 1)
|
|
self.assertEqual(utils.B2MiB(1048576, ceil=True), 1)
|
|
self.assertEqual(utils.B2MiB(1048577, ceil=True), 2)
|
|
|
|
def test_retry_on_failure(self):
|
|
fd, tmpfilename = tempfile.mkstemp()
|
|
_, tmpfilename2 = tempfile.mkstemp()
|
|
try:
|
|
fp = os.fdopen(fd, 'w+')
|
|
fp.write('''#!/bin/sh
|
|
# If stdin fails to get passed during one of the runs, make a note.
|
|
if ! grep -q foo
|
|
then
|
|
echo 'failure' > "$1"
|
|
fi
|
|
# If stdin has failed to get passed during this or a previous run, exit early.
|
|
if grep failure "$1"
|
|
then
|
|
exit 1
|
|
fi
|
|
runs="$(cat $1)"
|
|
if [ -z "$runs" ]
|
|
then
|
|
runs=0
|
|
fi
|
|
runs=$(($runs + 1))
|
|
echo $runs > "$1"
|
|
exit 1
|
|
''')
|
|
fp.close()
|
|
os.chmod(tmpfilename, 0o755)
|
|
self.assertRaises(processutils.ProcessExecutionError,
|
|
utils.execute,
|
|
tmpfilename, tmpfilename2, attempts=10,
|
|
process_input='foo',
|
|
delay_on_retry=False)
|
|
fp = open(tmpfilename2, 'r')
|
|
runs = fp.read()
|
|
fp.close()
|
|
self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
|
|
'always get passed '
|
|
'correctly')
|
|
runs = int(runs.strip())
|
|
self.assertEqual(10, runs,
|
|
'Ran %d times instead of 10.' % (runs,))
|
|
finally:
|
|
os.unlink(tmpfilename)
|
|
os.unlink(tmpfilename2)
|
|
|
|
def test_unknown_kwargs_raises_error(self):
|
|
self.assertRaises(processutils.UnknownArgumentError,
|
|
utils.execute,
|
|
'/usr/bin/env', 'true',
|
|
this_is_not_a_valid_kwarg=True)
|
|
|
|
def test_check_exit_code_boolean(self):
|
|
utils.execute('/usr/bin/env', 'false', check_exit_code=False)
|
|
self.assertRaises(processutils.ProcessExecutionError,
|
|
utils.execute,
|
|
'/usr/bin/env', 'false', check_exit_code=True)
|
|
|
|
def test_no_retry_on_success(self):
|
|
fd, tmpfilename = tempfile.mkstemp()
|
|
_, tmpfilename2 = tempfile.mkstemp()
|
|
try:
|
|
fp = os.fdopen(fd, 'w+')
|
|
fp.write('''#!/bin/sh
|
|
# If we've already run, bail out.
|
|
grep -q foo "$1" && exit 1
|
|
# Mark that we've run before.
|
|
echo foo > "$1"
|
|
# Check that stdin gets passed correctly.
|
|
grep foo
|
|
''')
|
|
fp.close()
|
|
os.chmod(tmpfilename, 0o755)
|
|
utils.execute(tmpfilename,
|
|
tmpfilename2,
|
|
process_input='foo',
|
|
attempts=2)
|
|
finally:
|
|
os.unlink(tmpfilename)
|
|
os.unlink(tmpfilename2)
|
|
|
|
@mock.patch('stevedore.driver.DriverManager')
|
|
def test_get_driver(self, mock_drv_manager):
|
|
mock_drv_manager.return_value = self.drv_manager
|
|
self.assertEqual('fake_obj', utils.get_driver('fake_driver'))
|
|
|
|
@mock.patch('jinja2.Environment')
|
|
@mock.patch('jinja2.FileSystemLoader')
|
|
@mock.patch('six.moves.builtins.open')
|
|
def test_render_and_save_fail(self, mock_open, mock_j_lo, mock_j_env):
|
|
mock_open.side_effect = Exception('foo')
|
|
self.assertRaises(errors.TemplateWriteError, utils.render_and_save,
|
|
'fake_dir', 'fake_tmpl_name', 'fake_data',
|
|
'fake_file_name')
|
|
|
|
@mock.patch('jinja2.Environment')
|
|
@mock.patch('jinja2.FileSystemLoader')
|
|
@mock.patch('six.moves.builtins.open')
|
|
def test_render_and_save_ok(self, mock_open, mock_j_lo, mock_j_env):
|
|
mock_render = mock.Mock()
|
|
mock_render.render.return_value = 'fake_data'
|
|
mock_j_env.get_template.return_value = mock_render
|
|
utils.render_and_save('fake_dir', 'fake_tmpl_name', 'fake_data',
|
|
'fake_file_name')
|
|
mock_open.assert_called_once_with('fake_file_name', 'w')
|