Fix guestagent.test_operating_system for Python3
* Tests for _write_file_as_root failed because tempfile.NamedTemporaryFile use binary mode by default, changing to text mode fixes. * Migrate to olso_serialization.base64 for Base64Codec. Also change the return value from bytearray to bytes for Base64Codec.deserialize. Base64Codec supports reverse encoding(i.e. binary data will be written to the dest file), in this situation, the dest file should be opened with binary mode(and when reading from the file, binary mode should also be used). * stream_codecs.StringConverter converts iterable objects with map function. The behavior of map function is different under Python 2.x and 3.x. However csv.writerows(before Python 3.5) and unpack_singleton in trove.common.utils both need a list object. Converting the return value of map function to a list explicitly before passing to these functions fixes. guestagent.test_operating_system is the last blacklist regex pattern for py3 unittests. With the above problems fixed, blacklist-py3.txt is not needed any more, and tox.ini is also updateted. Migrating from ostestr to stestr is also done while updating tox.ini. Here is the ML post for more information about os-testr and stestr: http://lists.openstack.org/pipermail/openstack-dev/2017-September/122135.html Partially implements: blueprint trove-python3 Change-Id: I31f1f97901d6ebff8a91c1b70a343e724ab806eb Signed-off-by: Zhao Chao <zhaochao1984@gmail.com>
This commit is contained in:
parent
d6bd37f3f3
commit
8ce9d735d2
@ -1,3 +1,3 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
test_path=${OS_TEST_PATH:-./trove/tests}
|
test_path=${OS_TEST_PATH:-./trove/tests/unittests}
|
||||||
top_dir=./
|
top_dir=./
|
||||||
|
@ -1,4 +0,0 @@
|
|||||||
[DEFAULT]
|
|
||||||
test_command=${PYTHON:-python} -m subunit.run discover ./trove/tests/unittests $LISTOPT $IDOPTION
|
|
||||||
test_id_option=--load-list $IDFILE
|
|
||||||
test_list_option=--list
|
|
@ -1,3 +0,0 @@
|
|||||||
# Use a blacklist of tests known to fail on Python 3, until
|
|
||||||
# all unit tests will pass on Python 3.
|
|
||||||
guestagent.test_operating_system
|
|
13
tox.ini
13
tox.ini
@ -32,20 +32,21 @@ commands =
|
|||||||
flake8
|
flake8
|
||||||
doc8 {posargs}
|
doc8 {posargs}
|
||||||
|
|
||||||
|
[stestr-base]
|
||||||
|
commands = stestr run --serial '{posargs}'
|
||||||
|
stestr slowest
|
||||||
|
|
||||||
[testenv:py27]
|
[testenv:py27]
|
||||||
commands = {[testenv]commands}
|
commands = {[testenv]commands}
|
||||||
ostestr {posargs} --slowest --serial
|
{[stestr-base]commands}
|
||||||
|
|
||||||
[py3base]
|
|
||||||
commands = ostestr --slowest --blacklist-file=blacklist-py3.txt --serial --regex '.*'
|
|
||||||
|
|
||||||
[testenv:py34]
|
[testenv:py34]
|
||||||
commands = {[testenv]commands}
|
commands = {[testenv]commands}
|
||||||
{[py3base]commands}
|
{[stestr-base]commands}
|
||||||
|
|
||||||
[testenv:py35]
|
[testenv:py35]
|
||||||
commands = {[testenv]commands}
|
commands = {[testenv]commands}
|
||||||
{[py3base]commands}
|
{[stestr-base]commands}
|
||||||
|
|
||||||
[testenv:apiexamples]
|
[testenv:apiexamples]
|
||||||
commands = {envpython} generate_examples.py
|
commands = {envpython} generate_examples.py
|
||||||
|
@ -31,8 +31,9 @@ IV_BIT_COUNT = 16
|
|||||||
|
|
||||||
|
|
||||||
def encode_data(data):
|
def encode_data(data):
|
||||||
if isinstance(data, six.text_type):
|
# NOTE(zhaochao) No need to encoding string object any more,
|
||||||
data = data.encode('utf-8')
|
# as Base64Codec is now using oslo_serialization.base64 which
|
||||||
|
# could take care of this.
|
||||||
return stream_codecs.Base64Codec().serialize(data)
|
return stream_codecs.Base64Codec().serialize(data)
|
||||||
|
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
|
|
||||||
import abc
|
import abc
|
||||||
import ast
|
import ast
|
||||||
import base64
|
|
||||||
import csv
|
import csv
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
@ -26,6 +25,8 @@ from six.moves import configparser
|
|||||||
import xmltodict
|
import xmltodict
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
from oslo_serialization import base64
|
||||||
|
|
||||||
from trove.common import utils as trove_utils
|
from trove.common import utils as trove_utils
|
||||||
|
|
||||||
|
|
||||||
@ -324,9 +325,14 @@ class PropertiesCodec(StreamCodec):
|
|||||||
key = row[0].strip()
|
key = row[0].strip()
|
||||||
# Ignore comment lines.
|
# Ignore comment lines.
|
||||||
if not key.strip().startswith(self._comment_markers):
|
if not key.strip().startswith(self._comment_markers):
|
||||||
items = self._string_converter.to_objects(
|
# NOTE(zhaochao): a list object is expected for
|
||||||
|
# trove_utils.unpack_singleton, however in python3
|
||||||
|
# map objects won't be treated as lists, so we
|
||||||
|
# convert the result of StringConverter.to_objects
|
||||||
|
# to a list explicitly.
|
||||||
|
items = list(self._string_converter.to_objects(
|
||||||
[v if v else None for v in
|
[v if v else None for v in
|
||||||
map(self._strip_comments, row[1:])])
|
map(self._strip_comments, row[1:])]))
|
||||||
current = data_dict.get(key)
|
current = data_dict.get(key)
|
||||||
if current is not None:
|
if current is not None:
|
||||||
current.append(trove_utils.unpack_singleton(items)
|
current.append(trove_utils.unpack_singleton(items)
|
||||||
@ -360,9 +366,14 @@ class PropertiesCodec(StreamCodec):
|
|||||||
header, self._string_converter.to_strings(items)))
|
header, self._string_converter.to_strings(items)))
|
||||||
else:
|
else:
|
||||||
# This is a single-row property with only one argument.
|
# This is a single-row property with only one argument.
|
||||||
|
# Note(zhaochao): csv.writerows expects a list object before
|
||||||
|
# python 3.5, but map objects won't be treated as lists in
|
||||||
|
# python 3, so we explicitly convert the result of
|
||||||
|
# StringConverter.to_strings to a list here to support py34
|
||||||
|
# unittests.
|
||||||
rows.append(
|
rows.append(
|
||||||
self._string_converter.to_strings(
|
list(self._string_converter.to_strings(
|
||||||
self._to_list(header, items)))
|
self._to_list(header, items))))
|
||||||
|
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
@ -434,7 +445,14 @@ class KeyValueCodec(StreamCodec):
|
|||||||
return self._line_terminator.join(lines)
|
return self._line_terminator.join(lines)
|
||||||
|
|
||||||
def deserialize(self, stream):
|
def deserialize(self, stream):
|
||||||
lines = stream.split(self._line_terminator)
|
# Note(zhaochao): In Python 3, when files are opened in text mode,
|
||||||
|
# newlines will be translated to '\n' by default, so we just split
|
||||||
|
# the stream by '\n'.
|
||||||
|
if sys.version_info[0] >= 3:
|
||||||
|
lines = stream.split('\n')
|
||||||
|
else:
|
||||||
|
lines = stream.split(self._line_terminator)
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
for line in lines:
|
for line in lines:
|
||||||
line = line.lstrip().rstrip()
|
line = line.lstrip().rstrip()
|
||||||
@ -500,22 +518,14 @@ class Base64Codec(StreamCodec):
|
|||||||
binary data before writing to a file as well.
|
binary data before writing to a file as well.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def serialize(self, data):
|
# NOTE(zhaochao): migrate to oslo_serialization.base64 to serialize(return
|
||||||
|
# a text object) and deserialize(return a bytes object) data.
|
||||||
|
|
||||||
try:
|
def serialize(self, data):
|
||||||
# py27str - if we've got text data, this should encode it
|
return base64.encode_as_text(data)
|
||||||
# py27aa/py34aa - if we've got a bytearray, this should work too
|
|
||||||
encoded = str(base64.b64encode(data).decode('utf-8'))
|
|
||||||
except TypeError:
|
|
||||||
# py34str - convert to bytes first, then we can encode
|
|
||||||
data_bytes = bytes([ord(item) for item in data])
|
|
||||||
encoded = base64.b64encode(data_bytes).decode('utf-8')
|
|
||||||
return encoded
|
|
||||||
|
|
||||||
def deserialize(self, stream):
|
def deserialize(self, stream):
|
||||||
|
return base64.decode_as_bytes(stream)
|
||||||
# py27 & py34 seem to understand bytearray the same
|
|
||||||
return bytearray([item for item in base64.b64decode(stream)])
|
|
||||||
|
|
||||||
|
|
||||||
class XmlCodec(StreamCodec):
|
class XmlCodec(StreamCodec):
|
||||||
|
@ -56,13 +56,18 @@ def read_file(path, codec=IdentityCodec(), as_root=False, decode=True):
|
|||||||
:raises: :class:`UnprocessableEntity` if codec not given.
|
:raises: :class:`UnprocessableEntity` if codec not given.
|
||||||
"""
|
"""
|
||||||
if path and exists(path, is_directory=False, as_root=as_root):
|
if path and exists(path, is_directory=False, as_root=as_root):
|
||||||
if as_root:
|
if decode:
|
||||||
return _read_file_as_root(path, codec, decode=decode)
|
open_flag = 'r'
|
||||||
|
convert_func = codec.deserialize
|
||||||
|
else:
|
||||||
|
open_flag = 'rb'
|
||||||
|
convert_func = codec.serialize
|
||||||
|
|
||||||
with open(path, 'r') as fp:
|
if as_root:
|
||||||
if decode:
|
return _read_file_as_root(path, open_flag, convert_func)
|
||||||
return codec.deserialize(fp.read())
|
|
||||||
return codec.serialize(fp.read())
|
with open(path, open_flag) as fp:
|
||||||
|
return convert_func(fp.read())
|
||||||
|
|
||||||
raise exception.UnprocessableEntity(_("File does not exist: %s") % path)
|
raise exception.UnprocessableEntity(_("File does not exist: %s") % path)
|
||||||
|
|
||||||
@ -97,24 +102,22 @@ def exists(path, is_directory=False, as_root=False):
|
|||||||
return found
|
return found
|
||||||
|
|
||||||
|
|
||||||
def _read_file_as_root(path, codec, decode=True):
|
def _read_file_as_root(path, open_flag, convert_func):
|
||||||
"""Read a file as root.
|
"""Read a file as root.
|
||||||
|
|
||||||
:param path Path to the written file.
|
:param path Path to the written file.
|
||||||
:type path string
|
:type path string
|
||||||
|
|
||||||
:param codec: A codec used to transform the data.
|
:param open_flag: The flag for opening a file
|
||||||
:type codec: StreamCodec
|
:type open_flag: string
|
||||||
|
|
||||||
:param decode: Should the codec decode the data.
|
:param convert_func: The function for converting data.
|
||||||
:type decode: boolean
|
:type convert_func: callable
|
||||||
"""
|
"""
|
||||||
with tempfile.NamedTemporaryFile() as fp:
|
with tempfile.NamedTemporaryFile(open_flag) as fp:
|
||||||
copy(path, fp.name, force=True, dereference=True, as_root=True)
|
copy(path, fp.name, force=True, dereference=True, as_root=True)
|
||||||
chmod(fp.name, FileMode.ADD_READ_ALL(), as_root=True)
|
chmod(fp.name, FileMode.ADD_READ_ALL(), as_root=True)
|
||||||
if decode:
|
return convert_func(fp.read())
|
||||||
return codec.deserialize(fp.read())
|
|
||||||
return codec.serialize(fp.read())
|
|
||||||
|
|
||||||
|
|
||||||
def write_file(path, data, codec=IdentityCodec(), as_root=False, encode=True):
|
def write_file(path, data, codec=IdentityCodec(), as_root=False, encode=True):
|
||||||
@ -141,20 +144,24 @@ def write_file(path, data, codec=IdentityCodec(), as_root=False, encode=True):
|
|||||||
:raises: :class:`UnprocessableEntity` if path not given.
|
:raises: :class:`UnprocessableEntity` if path not given.
|
||||||
"""
|
"""
|
||||||
if path:
|
if path:
|
||||||
if as_root:
|
if encode:
|
||||||
_write_file_as_root(path, data, codec, encode=encode)
|
open_flag = 'w'
|
||||||
|
convert_func = codec.serialize
|
||||||
else:
|
else:
|
||||||
with open(path, 'w') as fp:
|
open_flag = 'wb'
|
||||||
if encode:
|
convert_func = codec.deserialize
|
||||||
fp.write(codec.serialize(data))
|
|
||||||
else:
|
if as_root:
|
||||||
fp.write(codec.deserialize(data))
|
_write_file_as_root(path, data, open_flag, convert_func)
|
||||||
|
else:
|
||||||
|
with open(path, open_flag) as fp:
|
||||||
|
fp.write(convert_func(data))
|
||||||
fp.flush()
|
fp.flush()
|
||||||
else:
|
else:
|
||||||
raise exception.UnprocessableEntity(_("Invalid path: %s") % path)
|
raise exception.UnprocessableEntity(_("Invalid path: %s") % path)
|
||||||
|
|
||||||
|
|
||||||
def _write_file_as_root(path, data, codec, encode=True):
|
def _write_file_as_root(path, data, open_flag, convert_func):
|
||||||
"""Write a file as root. Overwrite any existing contents.
|
"""Write a file as root. Overwrite any existing contents.
|
||||||
|
|
||||||
:param path Path to the written file.
|
:param path Path to the written file.
|
||||||
@ -163,19 +170,16 @@ def _write_file_as_root(path, data, codec, encode=True):
|
|||||||
:param data: An object representing the file contents.
|
:param data: An object representing the file contents.
|
||||||
:type data: StreamCodec
|
:type data: StreamCodec
|
||||||
|
|
||||||
:param codec: A codec used to transform the data.
|
:param open_flag: The flag for opening a file
|
||||||
:type codec: StreamCodec
|
:type open_flag: string
|
||||||
|
|
||||||
:param encode: Should the codec encode the data.
|
:param convert_func: The function for converting data.
|
||||||
:type encode: boolean
|
:type convert_func: callable
|
||||||
"""
|
"""
|
||||||
# The files gets removed automatically once the managing object goes
|
# The files gets removed automatically once the managing object goes
|
||||||
# out of scope.
|
# out of scope.
|
||||||
with tempfile.NamedTemporaryFile('w', delete=False) as fp:
|
with tempfile.NamedTemporaryFile(open_flag, delete=False) as fp:
|
||||||
if encode:
|
fp.write(convert_func(data))
|
||||||
fp.write(codec.serialize(data))
|
|
||||||
else:
|
|
||||||
fp.write(codec.deserialize(data))
|
|
||||||
fp.flush()
|
fp.flush()
|
||||||
fp.close() # Release the resource before proceeding.
|
fp.close() # Release the resource before proceeding.
|
||||||
copy(fp.name, path, force=True, as_root=True)
|
copy(fp.name, path, force=True, as_root=True)
|
||||||
|
@ -18,7 +18,7 @@ import re
|
|||||||
import stat
|
import stat
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from mock import call, patch
|
from mock import call, patch, mock_open
|
||||||
from oslo_concurrency.processutils import UnknownArgumentError
|
from oslo_concurrency.processutils import UnknownArgumentError
|
||||||
import six
|
import six
|
||||||
from testtools import ExpectedException
|
from testtools import ExpectedException
|
||||||
@ -38,8 +38,12 @@ class TestOperatingSystem(trove_testtools.TestCase):
|
|||||||
|
|
||||||
def test_base64_codec(self):
|
def test_base64_codec(self):
|
||||||
data = "Line 1\nLine 2\n"
|
data = "Line 1\nLine 2\n"
|
||||||
self._test_file_codec(data, Base64Codec())
|
# Base64Codec.deserialize returns bytes instead of string.
|
||||||
|
self._test_file_codec(data, Base64Codec(),
|
||||||
|
expected_data=data.encode('utf-8'))
|
||||||
|
|
||||||
|
# when encoding is reversed for Base64Codec, reading from files
|
||||||
|
# will call Base64Codec.serialize which returns string.
|
||||||
data = "TGluZSAxCkxpbmUgMgo="
|
data = "TGluZSAxCkxpbmUgMgo="
|
||||||
self._test_file_codec(data, Base64Codec(), reverse_encoding=True)
|
self._test_file_codec(data, Base64Codec(), reverse_encoding=True)
|
||||||
|
|
||||||
@ -192,7 +196,7 @@ class TestOperatingSystem(trove_testtools.TestCase):
|
|||||||
@patch.object(operating_system, 'copy')
|
@patch.object(operating_system, 'copy')
|
||||||
def test_write_file_as_root(self, copy_mock):
|
def test_write_file_as_root(self, copy_mock):
|
||||||
target_file = tempfile.NamedTemporaryFile()
|
target_file = tempfile.NamedTemporaryFile()
|
||||||
temp_file = tempfile.NamedTemporaryFile()
|
temp_file = tempfile.NamedTemporaryFile('w')
|
||||||
|
|
||||||
with patch('tempfile.NamedTemporaryFile', return_value=temp_file):
|
with patch('tempfile.NamedTemporaryFile', return_value=temp_file):
|
||||||
operating_system.write_file(
|
operating_system.write_file(
|
||||||
@ -205,13 +209,172 @@ class TestOperatingSystem(trove_testtools.TestCase):
|
|||||||
side_effect=Exception("Error while executing 'copy'."))
|
side_effect=Exception("Error while executing 'copy'."))
|
||||||
def test_write_file_as_root_with_error(self, copy_mock):
|
def test_write_file_as_root_with_error(self, copy_mock):
|
||||||
target_file = tempfile.NamedTemporaryFile()
|
target_file = tempfile.NamedTemporaryFile()
|
||||||
temp_file = tempfile.NamedTemporaryFile()
|
temp_file = tempfile.NamedTemporaryFile('w')
|
||||||
with patch('tempfile.NamedTemporaryFile', return_value=temp_file):
|
with patch('tempfile.NamedTemporaryFile', return_value=temp_file):
|
||||||
with ExpectedException(Exception, "Error while executing 'copy'."):
|
with ExpectedException(Exception, "Error while executing 'copy'."):
|
||||||
operating_system.write_file(target_file.name,
|
operating_system.write_file(target_file.name,
|
||||||
"Lorem Ipsum", as_root=True)
|
"Lorem Ipsum", as_root=True)
|
||||||
self.assertFalse(os.path.exists(temp_file.name))
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
|
||||||
|
@patch.object(operating_system, 'exists', return_value=True)
|
||||||
|
@patch.object(operating_system, 'copy')
|
||||||
|
@patch.object(operating_system, 'chmod')
|
||||||
|
@patch.object(IdentityCodec, 'deserialize')
|
||||||
|
@patch.object(IdentityCodec, 'serialize')
|
||||||
|
@patch.object(operating_system, 'open',
|
||||||
|
mock_open(read_data='MockingRead'))
|
||||||
|
def test_read_file_with_flags_and_conv_func(self, mock_serialize,
|
||||||
|
mock_deserialize,
|
||||||
|
mock_chmod, mock_copy,
|
||||||
|
*args):
|
||||||
|
test_path = '/path/of/file'
|
||||||
|
test_data = 'MockingRead'
|
||||||
|
# use getattr to avoid pylint 'no-member' warning
|
||||||
|
mock_file = getattr(operating_system, 'open')
|
||||||
|
|
||||||
|
# simple read
|
||||||
|
operating_system.read_file(test_path)
|
||||||
|
mock_file.assert_called_once_with(test_path, 'r')
|
||||||
|
mock_file().read.assert_called_once()
|
||||||
|
mock_deserialize.called_once_with(test_data)
|
||||||
|
mock_file.reset_mock()
|
||||||
|
mock_deserialize.reset_mock()
|
||||||
|
|
||||||
|
# read with decode=False
|
||||||
|
operating_system.read_file(test_path, decode=False)
|
||||||
|
mock_file.assert_called_once_with(test_path, 'rb')
|
||||||
|
mock_file().read.assert_called_once()
|
||||||
|
mock_serialize.called_once_with(test_data)
|
||||||
|
mock_file.reset_mock()
|
||||||
|
mock_serialize.reset_mock()
|
||||||
|
|
||||||
|
# checking _read_file_as_root arguments
|
||||||
|
with patch.object(operating_system,
|
||||||
|
'_read_file_as_root') as mock_read_file_as_root:
|
||||||
|
# simple read as root,
|
||||||
|
operating_system.read_file(test_path, as_root=True)
|
||||||
|
mock_read_file_as_root.assert_called_once_with(
|
||||||
|
test_path, 'r', mock_deserialize)
|
||||||
|
mock_deserialize.assert_not_called()
|
||||||
|
mock_read_file_as_root.reset_mock()
|
||||||
|
|
||||||
|
# read as root with decode=False,
|
||||||
|
operating_system.read_file(test_path, as_root=True, decode=False)
|
||||||
|
mock_read_file_as_root.assert_called_once_with(
|
||||||
|
test_path, 'rb', mock_serialize)
|
||||||
|
mock_serialize.assert_not_called()
|
||||||
|
|
||||||
|
# simple read as root
|
||||||
|
temp_file = tempfile.NamedTemporaryFile('r')
|
||||||
|
with patch.object(tempfile, 'NamedTemporaryFile',
|
||||||
|
return_value=temp_file) as mock_temp_file:
|
||||||
|
operating_system.read_file(test_path, as_root=True)
|
||||||
|
mock_temp_file.assert_called_once_with('r')
|
||||||
|
mock_copy.called_once_with(test_path, temp_file.name,
|
||||||
|
force=True, dereference=True,
|
||||||
|
as_root=True)
|
||||||
|
mock_chmod.called_once_with(temp_file.name,
|
||||||
|
FileMode.ADD_READ_ALL(),
|
||||||
|
as_root=True)
|
||||||
|
mock_deserialize.assert_called_once_with('')
|
||||||
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
mock_copy.reset_mock()
|
||||||
|
mock_chmod.reset_mock()
|
||||||
|
mock_deserialize.reset_mock()
|
||||||
|
|
||||||
|
# read as root with decode=False
|
||||||
|
temp_file = tempfile.NamedTemporaryFile('rb')
|
||||||
|
with patch.object(tempfile, 'NamedTemporaryFile',
|
||||||
|
return_value=temp_file) as mock_temp_file:
|
||||||
|
operating_system.read_file(test_path, as_root=True,
|
||||||
|
decode=False)
|
||||||
|
mock_temp_file.assert_called_once_with('rb')
|
||||||
|
mock_copy.called_once_with(test_path, temp_file.name,
|
||||||
|
force=True, dereference=True,
|
||||||
|
as_root=True)
|
||||||
|
mock_chmod.called_once_with(temp_file.name,
|
||||||
|
FileMode.ADD_READ_ALL(),
|
||||||
|
as_root=True)
|
||||||
|
mock_serialize.assert_called_once_with(b'')
|
||||||
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
|
||||||
|
@patch.object(operating_system, 'copy')
|
||||||
|
@patch.object(operating_system, 'chmod')
|
||||||
|
@patch.object(IdentityCodec, 'deserialize',
|
||||||
|
return_value=b'DeseiralizedData')
|
||||||
|
@patch.object(IdentityCodec, 'serialize',
|
||||||
|
return_value='SerializedData')
|
||||||
|
@patch.object(operating_system, 'open', mock_open())
|
||||||
|
def test_write_file_with_flags_and_conv_func(self, mock_serialize,
|
||||||
|
mock_deserialize,
|
||||||
|
mock_chmod, mock_copy):
|
||||||
|
test_path = '/path/of/file'
|
||||||
|
test_data = 'MockingWrite'
|
||||||
|
test_serialize = 'SerializedData'
|
||||||
|
test_deserialize = b'DeseiralizedData'
|
||||||
|
mock_file = getattr(operating_system, 'open')
|
||||||
|
|
||||||
|
# simple write
|
||||||
|
operating_system.write_file(test_path, test_data)
|
||||||
|
mock_file.assert_called_once_with(test_path, 'w')
|
||||||
|
mock_serialize.called_once_with(test_data)
|
||||||
|
mock_file().write.assert_called_once_with(test_serialize)
|
||||||
|
mock_file().flush.assert_called_once()
|
||||||
|
mock_file.reset_mock()
|
||||||
|
mock_serialize.reset_mock()
|
||||||
|
|
||||||
|
# write with encode=False
|
||||||
|
operating_system.write_file(test_path, test_data, encode=False)
|
||||||
|
mock_file.assert_called_once_with(test_path, 'wb')
|
||||||
|
mock_deserialize.called_once_with(test_data)
|
||||||
|
mock_file().write.assert_called_once_with(test_deserialize)
|
||||||
|
mock_file().flush.assert_called_once()
|
||||||
|
mock_file.reset_mock()
|
||||||
|
mock_deserialize.reset_mock()
|
||||||
|
|
||||||
|
# checking _write_file_as_root arguments
|
||||||
|
with patch.object(operating_system,
|
||||||
|
'_write_file_as_root') as mock_write_file_as_root:
|
||||||
|
# simple write as root,
|
||||||
|
operating_system.write_file(test_path, test_data, as_root=True)
|
||||||
|
mock_write_file_as_root.assert_called_once_with(
|
||||||
|
test_path, test_data, 'w', mock_serialize)
|
||||||
|
mock_serialize.assert_not_called()
|
||||||
|
mock_write_file_as_root.reset_mock()
|
||||||
|
|
||||||
|
# read as root with encode=False,
|
||||||
|
operating_system.write_file(test_path, test_data,
|
||||||
|
as_root=True, encode=False)
|
||||||
|
mock_write_file_as_root.assert_called_once_with(
|
||||||
|
test_path, test_data, 'wb', mock_deserialize)
|
||||||
|
mock_deserialize.assert_not_called()
|
||||||
|
|
||||||
|
# simple write as root
|
||||||
|
temp_file = tempfile.NamedTemporaryFile('w')
|
||||||
|
with patch.object(tempfile, 'NamedTemporaryFile',
|
||||||
|
return_value=temp_file) as mock_temp_file:
|
||||||
|
operating_system.write_file(test_path, test_data, as_root=True)
|
||||||
|
mock_temp_file.assert_called_once_with('w', delete=False)
|
||||||
|
mock_serialize.assert_called_once_with(test_data)
|
||||||
|
mock_copy.called_once_with(temp_file.name, test_path,
|
||||||
|
force=True, as_root=True)
|
||||||
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
mock_copy.reset_mock()
|
||||||
|
mock_chmod.reset_mock()
|
||||||
|
mock_serialize.reset_mock()
|
||||||
|
|
||||||
|
# write as root with decode=False
|
||||||
|
temp_file = tempfile.NamedTemporaryFile('wb')
|
||||||
|
with patch.object(tempfile, 'NamedTemporaryFile',
|
||||||
|
return_value=temp_file) as mock_temp_file:
|
||||||
|
operating_system.write_file(test_path, test_data,
|
||||||
|
as_root=True, encode=False)
|
||||||
|
mock_temp_file.assert_called_once_with('wb', delete=False)
|
||||||
|
mock_deserialize.assert_called_once_with(test_data)
|
||||||
|
mock_copy.called_once_with(temp_file.name, test_path,
|
||||||
|
force=True, as_root=True)
|
||||||
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
|
||||||
def test_start_service(self):
|
def test_start_service(self):
|
||||||
self._assert_service_call(operating_system.start_service,
|
self._assert_service_call(operating_system.start_service,
|
||||||
'cmd_start')
|
'cmd_start')
|
||||||
|
Loading…
Reference in New Issue
Block a user