Remove all usage of six library

Replace six with Python 3 style code.

Change-Id: I52aa4a0f679b56ffa57cdd2199933e36cf076992
This commit is contained in:
haixin 2020-10-07 11:48:23 +08:00
parent 87d2a1f9d4
commit cfe9881e6b
18 changed files with 122 additions and 263 deletions

View File

@ -29,7 +29,6 @@ PyYAML==3.13
requests==2.14.2
requestsexceptions==1.2.0
rfc3986==0.3.1
six==1.10.0
smmap==0.9.0
stestr==2.0.0
stevedore==1.20.0

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
def flatten_dict_to_keypairs(d, separator=':'):
"""Generator that produces sequence of keypairs for nested dictionaries.
@ -22,7 +20,7 @@ def flatten_dict_to_keypairs(d, separator=':'):
:param d: dictionaries which may be nested
:param separator: symbol between names
"""
for name, value in sorted(six.iteritems(d)):
for name, value in sorted(iter(d.items())):
if isinstance(value, dict):
for subname, subvalue in flatten_dict_to_keypairs(value,
separator):

View File

@ -15,8 +15,6 @@
import sys
import six
# NOTE(blk-u): This provides a symbol that can be overridden just for this
# module during testing. sys.getfilesystemencoding() is called by coverage so
@ -35,10 +33,10 @@ def safe_decode(text, incoming=None, errors='strict'):
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, (six.string_types, six.binary_type)):
if not isinstance(text, (str, bytes)):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, six.text_type):
if isinstance(text, str):
return text
if not incoming:
@ -81,7 +79,7 @@ def safe_encode(text, incoming=None,
See also to_utf8() function which is simpler and don't depend on
the locale encoding.
"""
if not isinstance(text, (six.string_types, six.binary_type)):
if not isinstance(text, (str, bytes)):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
@ -94,7 +92,7 @@ def safe_encode(text, incoming=None,
if hasattr(encoding, 'lower'):
encoding = encoding.lower()
if isinstance(text, six.text_type):
if isinstance(text, str):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
@ -111,9 +109,9 @@ def to_utf8(text):
.. versionadded:: 3.5
"""
if isinstance(text, six.binary_type):
if isinstance(text, bytes):
return text
elif isinstance(text, six.text_type):
elif isinstance(text, str):
return text.encode('utf-8')
else:
raise TypeError("bytes or Unicode expected, got %s"
@ -133,24 +131,6 @@ def exception_to_unicode(exc):
.. versionadded:: 1.6
"""
msg = None
if six.PY2:
# First try by calling the unicode type constructor. We should try
# unicode() before exc.__unicode__() because subclasses of unicode can
# be easily casted to unicode, whereas they have no __unicode__()
# method.
try:
msg = unicode(exc) # NOQA
except UnicodeError:
# unicode(exc) fail with UnicodeDecodeError on Python 2 if
# exc.__unicode__() or exc.__str__() returns a bytes string not
# decodable from the default encoding (ASCII)
if hasattr(exc, '__unicode__'):
# Call directly the __unicode__() method to avoid
# the implicit decoding from the default encoding
try:
msg = exc.__unicode__()
except UnicodeError: # nosec
pass
if msg is None:
# Don't call directly str(exc), because it fails with
@ -158,7 +138,7 @@ def exception_to_unicode(exc):
# string not encodable to the default encoding (ASCII)
msg = exc.__str__()
if isinstance(msg, six.text_type):
if isinstance(msg, str):
# This should be the default path on Python 3 and an *optional* path
# on Python 2 (if for some reason the exception message was already
# in unicode instead of the more typical bytes string); so avoid

View File

@ -18,14 +18,13 @@ Exception related utilities.
"""
import functools
import io
import logging
import os
import sys
import time
import traceback
import six
from oslo_utils import encodeutils
from oslo_utils import reflection
@ -70,7 +69,7 @@ class CausedByException(Exception):
if indent < 0:
raise ValueError("Provided 'indent' must be greater than"
" or equal to zero instead of %s" % indent)
buf = six.StringIO()
buf = io.StringIO()
if show_root_class:
buf.write(reflection.get_class_name(self, fully_qualified=False))
buf.write(": ")
@ -140,7 +139,7 @@ def raise_with_cause(exc_cls, message, *args, **kwargs):
# Leave no references around (especially with regards to
# tracebacks and any variables that it retains internally).
del(exc_type, exc, exc_tb)
six.raise_from(exc_cls(message, *args, **kwargs), kwargs.get('cause'))
raise exc_cls(message, *args, **kwargs) from kwargs.get('cause')
class save_and_reraise_exception(object):
@ -193,7 +192,15 @@ class save_and_reraise_exception(object):
if self.type_ is None and self.value is None:
raise RuntimeError("There is no (currently) captured exception"
" to force the reraising of")
six.reraise(self.type_, self.value, self.tb)
try:
if self.value is None:
self.value = self.type_()
if self.value.__traceback__ is not self.tb:
raise self.value.with_traceback(self.tb)
raise self.value
finally:
self.value = None
self.tb = None
def capture(self, check=True):
(type_, value, tb) = sys.exc_info()
@ -240,7 +247,7 @@ def forever_retry_uncaught_exceptions(*args, **kwargs):
retry_delay = max(0.0, float(kwargs.get('retry_delay', 1.0)))
same_log_delay = max(0.0, float(kwargs.get('same_log_delay', 60.0)))
@six.wraps(infunc)
@functools.wraps(infunc)
def wrapper(*args, **kwargs):
last_exc_message = None
same_failure_count = 0
@ -339,7 +346,15 @@ class exception_filter(object):
try:
if not self._should_ignore_ex(ex):
if exc_val is ex:
six.reraise(exc_type, exc_val, traceback)
try:
if exc_val is None:
exc_val = exc_type()
if exc_val.__traceback__ is not traceback:
raise exc_val.with_traceback(traceback)
raise exc_val
finally:
exc_val = None
traceback = None
else:
raise ex
finally:

View File

@ -21,11 +21,10 @@ import logging
import os
import re
import socket
from urllib import parse
import netaddr
import netifaces
import six
from six.moves.urllib import parse
from oslo_utils._i18n import _
@ -284,7 +283,7 @@ def is_valid_mac(address):
.. versionadded:: 3.17
"""
m = "[0-9a-f]{2}(:[0-9a-f]{2}){5}$"
return (isinstance(address, six.string_types) and
return (isinstance(address, str) and
re.match(m, address.lower()))

View File

@ -22,10 +22,9 @@ Reflection module.
import inspect
import logging
import operator
import types
import six
try:
_TYPE_TYPE = types.TypeType
except AttributeError:
@ -40,16 +39,9 @@ _BUILTIN_MODULES = ('builtins', '__builtin__', '__builtins__', 'exceptions')
LOG = logging.getLogger(__name__)
if six.PY3:
Parameter = inspect.Parameter
Signature = inspect.Signature
get_signature = inspect.signature
else:
# Provide an equivalent but use funcsigs instead...
import funcsigs
Parameter = funcsigs.Parameter
Signature = funcsigs.Signature
get_signature = funcsigs.signature
def get_members(obj, exclude_hidden=True):
@ -85,7 +77,7 @@ def get_class_name(obj, fully_qualified=True, truncate_builtins=True):
if inspect.ismethod(obj):
obj = get_method_self(obj)
if not isinstance(obj, six.class_types):
if not isinstance(obj, type):
obj = type(obj)
if truncate_builtins:
try:
@ -109,7 +101,7 @@ def get_all_class_names(obj, up_to=object,
in order of method resolution (mro). If up_to parameter is provided,
only name of classes that are sublcasses to that class are returned.
"""
if not isinstance(obj, six.class_types):
if not isinstance(obj, type):
obj = type(obj)
for cls in obj.mro():
if issubclass(cls, up_to):
@ -126,7 +118,7 @@ def get_callable_name(function):
method_self = get_method_self(function)
if method_self is not None:
# This is a bound method.
if isinstance(method_self, six.class_types):
if isinstance(method_self, type):
# This is a bound class method.
im_class = method_self
else:
@ -163,7 +155,7 @@ def get_method_self(method):
if not inspect.ismethod(method):
return None
try:
return six.get_method_self(method)
return operator.attrgetter("__self__")(method)
except AttributeError:
return None
@ -203,8 +195,8 @@ def is_same_callback(callback1, callback2, strict=True):
# another object if the objects have __eq__ methods that return true
# (when in fact it is a different bound method). Python u so crazy!
try:
self1 = six.get_method_self(callback1)
self2 = six.get_method_self(callback2)
self1 = operator.attrgetter("__self__")(callback1)
self2 = operator.attrgetter("__self__")(callback2)
return self1 is self2
except AttributeError: # nosec
pass
@ -231,8 +223,8 @@ def get_callable_args(function, required_only=False):
are not included into output.
"""
sig = get_signature(function)
function_args = list(six.iterkeys(sig.parameters))
for param_name, p in six.iteritems(sig.parameters):
function_args = list(iter(sig.parameters.keys()))
for param_name, p in iter(sig.parameters.items()):
if (p.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD) or
(required_only and p.default is not Parameter.empty)):
function_args.remove(param_name)
@ -243,4 +235,4 @@ def accepts_kwargs(function):
"""Returns ``True`` if function accepts kwargs otherwise ``False``."""
sig = get_signature(function)
return any(p.kind == Parameter.VAR_KEYWORD
for p in six.itervalues(sig.parameters))
for p in iter(sig.parameters.values()))

View File

@ -21,10 +21,9 @@ import collections
import math
import re
import unicodedata
import urllib
import pyparsing as pp
import six
from six.moves import urllib
from oslo_utils._i18n import _
from oslo_utils import encodeutils
@ -138,8 +137,8 @@ def bool_from_string(subject, strict=False, default=False):
"""
if isinstance(subject, bool):
return subject
if not isinstance(subject, six.string_types):
subject = six.text_type(subject)
if not isinstance(subject, str):
subject = str(subject)
lowered = subject.strip().lower()
@ -324,7 +323,7 @@ def mask_password(message, secret="***"): # nosec
"""
try:
message = six.text_type(message)
message = str(message)
except UnicodeDecodeError: # nosec
# NOTE(jecarey): Temporary fix to handle cases where message is a
# byte string. A better solution will be provided in Kilo.
@ -411,7 +410,7 @@ def mask_dict_password(dictionary, secret="***"): # nosec
# NOTE(jlvillal): Check to see if anything in the dictionary 'key'
# contains any key specified in _SANITIZE_KEYS.
k_matched = False
if isinstance(k, six.string_types):
if isinstance(k, str):
for sani_key in _SANITIZE_KEYS:
if sani_key in k.lower():
out[k] = secret
@ -420,7 +419,7 @@ def mask_dict_password(dictionary, secret="***"): # nosec
if not k_matched:
# We did not find a match for the key name in the
# _SANITIZE_KEYS, so we fall through to here
if isinstance(v, six.string_types):
if isinstance(v, str):
out[k] = mask_password(v, secret=secret)
else:
# Just leave it alone.
@ -438,7 +437,7 @@ def is_int_like(val):
.. versionadded:: 1.1
"""
try:
return six.text_type(int(val)) == six.text_type(val)
return str(int(val)) == str(val)
except (TypeError, ValueError):
return False
@ -457,7 +456,7 @@ def check_string_length(value, name=None, min_length=0, max_length=None):
if name is None:
name = value
if not isinstance(value, six.string_types):
if not isinstance(value, str):
msg = _("%s is not a string or unicode") % name
raise TypeError(msg)

View File

@ -19,7 +19,6 @@ import warnings
import eventlet
from eventlet import greenthread
from oslotest import base as test_base
import six
from oslo_utils import eventletutils
@ -44,7 +43,7 @@ class EventletUtilsTest(test_base.BaseTestCase):
self.assertEqual(1, len(capture))
w = capture[0]
self.assertEqual(RuntimeWarning, w.category)
self.assertIn('os', six.text_type(w.message))
self.assertIn('os', str(w.message))
@mock.patch("oslo_utils.eventletutils._patcher")
def test_warning_not_patched_none_provided(self, mock_patcher):
@ -57,7 +56,7 @@ class EventletUtilsTest(test_base.BaseTestCase):
w = capture[0]
self.assertEqual(RuntimeWarning, w.category)
for m in eventletutils._ALL_PATCH:
self.assertIn(m, six.text_type(w.message))
self.assertIn(m, str(w.message))
@mock.patch("oslo_utils.eventletutils._patcher")
def test_warning_not_patched_all(self, mock_patcher):
@ -70,7 +69,7 @@ class EventletUtilsTest(test_base.BaseTestCase):
w = capture[0]
self.assertEqual(RuntimeWarning, w.category)
for m in eventletutils._ALL_PATCH:
self.assertIn(m, six.text_type(w.message))
self.assertIn(m, str(w.message))
@mock.patch("oslo_utils.eventletutils._patcher")
def test_no_warning(self, mock_patcher):
@ -118,7 +117,7 @@ class EventletUtilsTest(test_base.BaseTestCase):
w = capture[0]
self.assertEqual(RuntimeWarning, w.category)
for m in ['os', 'thread']:
self.assertNotIn(m, six.text_type(w.message))
self.assertNotIn(m, str(w.message))
def test_invalid_patch_check(self):
self.assertRaises(ValueError,
@ -133,10 +132,7 @@ class EventletUtilsTest(test_base.BaseTestCase):
self.assertIsInstance(e_event, eventletutils.EventletEvent)
t_event = eventletutils.Event()
if six.PY3:
t_event_cls = threading.Event
else:
t_event_cls = threading._Event
self.assertIsInstance(t_event, t_event_cls)
public_methods = [m for m in dir(t_event) if not m.startswith("_") and

View File

@ -26,7 +26,6 @@ import uuid
import yaml
from oslotest import base as test_base
import six
from oslo_utils import fileutils
@ -131,7 +130,7 @@ class WriteToTempfileTestCase(test_base.BaseTestCase):
def check_file_content(self, path):
with open(path, 'r') as fd:
ans = fd.read()
self.assertEqual(self.content, six.b(ans))
self.assertEqual(self.content, ans.encode("latin-1"))
def test_file_without_path_and_suffix(self):
res = fileutils.write_to_tempfile(self.content)
@ -204,7 +203,7 @@ class TestComputeFileChecksum(test_base.BaseTestCase):
def check_file_content(self, content, path):
with open(path, 'r') as fd:
ans = fd.read()
self.assertEqual(content, six.b(ans))
self.assertEqual(content, ans.encode("latin-1"))
def test_compute_checksum_default_algorithm(self):
path = fileutils.write_to_tempfile(self.content)

View File

@ -17,7 +17,6 @@
import datetime
from oslotest import base as test_base
import six
from oslo_utils import fixture
from oslo_utils.fixture import uuidsentinel as uuids
@ -81,4 +80,4 @@ class UUIDSentinelsTest(test_base.BaseTestCase):
def test_with_underline_prefix(self):
ex = self.assertRaises(AttributeError, getattr, uuids, '_foo')
self.assertIn("Sentinels must not start with _", six.text_type(ex))
self.assertIn("Sentinels must not start with _", str(ex))

View File

@ -13,11 +13,9 @@
import fnmatch as standard_fnmatch
import ntpath
import posixpath
import sys
from unittest import mock
from oslotest import base
import six
fnmatch = None
@ -48,12 +46,3 @@ class TestFnmatch(base.BaseTestCase):
fnmatch = standard_fnmatch
self._test_fnmatch_posix_nt()
with mock.patch.object(sys, 'version_info', new=(2, 7, 11)):
from oslo_utils import fnmatch as oslo_fnmatch
fnmatch = oslo_fnmatch
self._test_fnmatch_posix_nt()
with mock.patch.object(sys, 'version_info', new=(2, 7, 0)):
six.moves.reload_module(oslo_fnmatch)
self._test_fnmatch_posix_nt()

View File

@ -14,13 +14,13 @@
# under the License.
import contextlib
import io
import socket
from unittest import mock
import netaddr
import netifaces
from oslotest import base as test_base
import six
from oslo_utils import netutils
@ -393,7 +393,7 @@ class MACbyIPv6TestCase(test_base.BaseTestCase):
@contextlib.contextmanager
def mock_file_content(content):
# Allows StringIO to act like a context manager-enabled file.
yield six.StringIO(content)
yield io.StringIO(content)
class TestIsIPv6Enabled(test_base.BaseTestCase):
@ -407,19 +407,19 @@ class TestIsIPv6Enabled(test_base.BaseTestCase):
self.addCleanup(reset_detection_flag)
@mock.patch('os.path.exists', return_value=True)
@mock.patch('six.moves.builtins.open', return_value=mock_file_content('0'))
@mock.patch('builtins.open', return_value=mock_file_content('0'))
def test_enabled(self, mock_open, exists):
enabled = netutils.is_ipv6_enabled()
self.assertTrue(enabled)
@mock.patch('os.path.exists', return_value=True)
@mock.patch('six.moves.builtins.open', return_value=mock_file_content('1'))
@mock.patch('builtins.open', return_value=mock_file_content('1'))
def test_disabled(self, mock_open, exists):
enabled = netutils.is_ipv6_enabled()
self.assertFalse(enabled)
@mock.patch('os.path.exists', return_value=False)
@mock.patch('six.moves.builtins.open',
@mock.patch('builtins.open',
side_effect=AssertionError('should not read'))
def test_disabled_non_exists(self, mock_open, exists):
enabled = netutils.is_ipv6_enabled()
@ -429,14 +429,14 @@ class TestIsIPv6Enabled(test_base.BaseTestCase):
def test_memoize_enabled(self, exists):
# Reset the flag to appear that we haven't looked for it yet.
netutils._IS_IPV6_ENABLED = None
with mock.patch('six.moves.builtins.open',
with mock.patch('builtins.open',
return_value=mock_file_content('0')) as mock_open:
enabled = netutils.is_ipv6_enabled()
self.assertTrue(mock_open.called)
self.assertTrue(netutils._IS_IPV6_ENABLED)
self.assertTrue(enabled)
# The second call should not use open again
with mock.patch('six.moves.builtins.open',
with mock.patch('builtins.open',
side_effect=AssertionError('should not be called')):
enabled = netutils.is_ipv6_enabled()
self.assertTrue(enabled)
@ -445,18 +445,18 @@ class TestIsIPv6Enabled(test_base.BaseTestCase):
def test_memoize_disabled(self, exists):
# Reset the flag to appear that we haven't looked for it yet.
netutils._IS_IPV6_ENABLED = None
with mock.patch('six.moves.builtins.open',
with mock.patch('builtins.open',
return_value=mock_file_content('1')):
enabled = netutils.is_ipv6_enabled()
self.assertFalse(enabled)
# The second call should not use open again
with mock.patch('six.moves.builtins.open',
with mock.patch('builtins.open',
side_effect=AssertionError('should not be called')):
enabled = netutils.is_ipv6_enabled()
self.assertFalse(enabled)
@mock.patch('os.path.exists', return_value=False)
@mock.patch('six.moves.builtins.open',
@mock.patch('builtins.open',
side_effect=AssertionError('should not read'))
def test_memoize_not_exists(self, mock_open, exists):
# Reset the flag to appear that we haven't looked for it yet.

View File

@ -14,26 +14,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import sys
from oslotest import base as test_base
import six
import testtools
from oslo_utils import reflection
if six.PY3:
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
'BaseException', 'object']
else:
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
'BaseException', 'object']
def dummy_decorator(f):
@six.wraps(f)
@functools.wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
@ -192,13 +187,8 @@ class GetCallableNameTest(test_base.BaseTestCase):
def test_static_method(self):
name = reflection.get_callable_name(Class.static_method)
if six.PY3:
self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
name)
else:
# NOTE(imelnikov): static method are just functions, class name
# is not recorded anywhere in them.
self.assertEqual('.'.join((__name__, 'static_method')), name)
def test_class_method(self):
name = reflection.get_callable_name(Class.class_method)
@ -218,9 +208,6 @@ class GetCallableNameTest(test_base.BaseTestCase):
'__call__')), name)
# These extended/special case tests only work on python 3, due to python 2
# being broken/incorrect with regard to these special cases...
@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
class GetCallableNameTestExtended(test_base.BaseTestCase):
# Tests items in http://legacy.python.org/dev/peps/pep-3155/

View File

@ -22,7 +22,6 @@ from unittest import mock
import ddt
from oslotest import base as test_base
import six
import testscenarios
from oslo_utils import strutils
@ -33,12 +32,6 @@ load_tests = testscenarios.load_tests_apply_scenarios
class StrUtilsTest(test_base.BaseTestCase):
@mock.patch("six.text_type")
def test_bool_bool_from_string_no_text(self, mock_text):
self.assertTrue(strutils.bool_from_string(True))
self.assertFalse(strutils.bool_from_string(False))
self.assertEqual(0, mock_text.call_count)
def test_bool_bool_from_string(self):
self.assertTrue(strutils.bool_from_string(True))
self.assertFalse(strutils.bool_from_string(False))
@ -85,7 +78,7 @@ class StrUtilsTest(test_base.BaseTestCase):
self._test_bool_from_string(lambda s: s)
def test_unicode_bool_from_string(self):
self._test_bool_from_string(six.text_type)
self._test_bool_from_string(str)
self.assertFalse(strutils.bool_from_string(u'使用', strict=False))
exc = self.assertRaises(ValueError, strutils.bool_from_string,
@ -93,7 +86,7 @@ class StrUtilsTest(test_base.BaseTestCase):
expected_msg = (u"Unrecognized value '使用', acceptable values are:"
u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
u" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, six.text_type(exc))
self.assertEqual(expected_msg, str(exc))
def test_other_bool_from_string(self):
self.assertFalse(strutils.bool_from_string(None))
@ -170,16 +163,17 @@ class StrUtilsTest(test_base.BaseTestCase):
def test_slugify(self):
to_slug = strutils.to_slug
self.assertRaises(TypeError, to_slug, True)
self.assertEqual(six.u("hello"), to_slug("hello"))
self.assertEqual(six.u("two-words"), to_slug("Two Words"))
self.assertEqual(six.u("ma-any-spa-ce-es"),
self.assertEqual("hello", to_slug("hello"))
self.assertEqual("two-words", to_slug("Two Words"))
self.assertEqual("ma-any-spa-ce-es",
to_slug("Ma-any\t spa--ce- es"))
self.assertEqual(six.u("excamation"), to_slug("exc!amation!"))
self.assertEqual(six.u("ampserand"), to_slug("&ampser$and"))
self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er"))
self.assertEqual(six.u("strip-"), to_slug(" strip - "))
self.assertEqual(six.u("perche"), to_slug(six.b("perch\xc3\xa9")))
self.assertEqual(six.u("strange"),
self.assertEqual("excamation", to_slug("exc!amation!"))
self.assertEqual("ampserand", to_slug("&ampser$and"))
self.assertEqual("ju5tnum8er", to_slug("ju5tnum8er"))
self.assertEqual("strip-", to_slug(" strip - "))
self.assertEqual("perche",
to_slug("perch\xc3\xa9".encode("latin-1")))
self.assertEqual("strange",
to_slug("\x80strange", errors="ignore"))
@ -619,12 +613,12 @@ class MaskPasswordTestCase(test_base.BaseTestCase):
self.assertEqual(expected, strutils.mask_password(payload))
payload = """{'adminPass':'TL0EfN33'}"""
payload = six.text_type(payload)
payload = str(payload)
expected = """{'adminPass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """{'token':'mytoken'}"""
payload = six.text_type(payload)
payload = str(payload)
expected = """{'token':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
@ -950,7 +944,7 @@ class ValidateIntegerTestCase(test_base.BaseTestCase):
"min_value": 300, "max_value": 300},
{"value": 55, "name": "doing 55 in a 54",
"max_value": 54},
{"value": six.unichr(129), "name": "UnicodeError",
{"value": chr(129), "name": "UnicodeError",
"max_value": 1000})
def test_invalid_inputs(self, value, name, **kwargs):
self.assertRaises(ValueError, strutils.validate_integer,

View File

@ -19,8 +19,6 @@ from unittest import mock
from oslo_i18n import fixture as oslo_i18n_fixture
from oslotest import base as test_base
import six
import testtools
from oslo_utils import encodeutils
@ -30,25 +28,23 @@ class EncodeUtilsTest(test_base.BaseTestCase):
def test_safe_decode(self):
safe_decode = encodeutils.safe_decode
self.assertRaises(TypeError, safe_decode, True)
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
self.assertEqual('ni\xf1o',
safe_decode("ni\xc3\xb1o".encode("latin-1"),
incoming="utf-8"))
if six.PY2:
# In Python 3, bytes.decode() doesn't support anymore
# bytes => bytes encodings like base64
self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
incoming='base64'))
self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
self.assertEqual("strange",
safe_decode('\x80strange'.encode("latin-1"),
errors='ignore'))
self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
self.assertEqual('\xc0', safe_decode('\xc0'.encode("latin-1"),
incoming='iso-8859-1'))
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
self.assertEqual('ni\xf1o',
safe_decode('ni\xc3\xb1o'.encode("latin-1"),
incoming='ascii'))
self.assertEqual(six.u('foo'), safe_decode(b'foo'))
self.assertEqual('foo', safe_decode(b'foo'))
def test_safe_encode_none_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, None)
@ -68,28 +64,18 @@ class EncodeUtilsTest(test_base.BaseTestCase):
def test_safe_encode_tuple_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, ('foo', 'bar', ))
def test_safe_encode_py2(self):
if six.PY2:
# In Python 3, str.encode() doesn't support anymore
# text => text encodings like base64
self.assertEqual(
six.b("dGVzdA==\n"),
encodeutils.safe_encode("test", encoding='base64'),
)
else:
self.skipTest("Requires py2.x")
def test_safe_encode_force_incoming_utf8_to_ascii(self):
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(
six.b('ni\xc3\xb1o'),
encodeutils.safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'),
'ni\xc3\xb1o'.encode("latin-1"),
encodeutils.safe_encode('ni\xc3\xb1o'.encode("latin-1"),
incoming='ascii'),
)
def test_safe_encode_same_encoding_different_cases(self):
with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()):
utf8 = encodeutils.safe_encode(
six.u('foo\xf1bar'), encoding='utf-8')
'foo\xf1bar', encoding='utf-8')
self.assertEqual(
encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'),
@ -101,11 +87,11 @@ class EncodeUtilsTest(test_base.BaseTestCase):
encodeutils.safe_decode.assert_has_calls([])
def test_safe_encode_different_encodings(self):
text = six.u('foo\xc3\xb1bar')
text = 'foo\xc3\xb1bar'
result = encodeutils.safe_encode(
text=text, incoming='utf-8', encoding='iso-8859-1')
self.assertNotEqual(text, result)
self.assertNotEqual(six.b("foo\xf1bar"), result)
self.assertNotEqual("foo\xf1bar".encode("latin-1"), result)
def test_to_utf8(self):
self.assertEqual(encodeutils.to_utf8(b'a\xe9\xff'), # bytes
@ -115,7 +101,7 @@ class EncodeUtilsTest(test_base.BaseTestCase):
self.assertRaises(TypeError, encodeutils.to_utf8, 123) # invalid
# oslo.i18n Message objects should also be accepted for convenience.
# It works because Message is a subclass of six.text_type. Use the
# It works because Message is a subclass of str. Use the
# lazy translation to get a Message instance of oslo_i18n.
msg = oslo_i18n_fixture.Translation().lazy("test")
self.assertEqual(encodeutils.to_utf8(msg),
@ -177,79 +163,6 @@ class ExceptionToUnicodeTest(test_base.BaseTestCase):
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'\u0420\u0443\u0441\u0441\u043a\u0438\u0439')
@testtools.skipIf(six.PY3, 'test specific to Python 2')
def test_unicode_exception(self):
# Exception with a __unicode__() method, but no __str__()
class UnicodeException(Exception):
def __init__(self, value):
Exception.__init__(self)
self.value = value
def __unicode__(self):
return self.value
# __unicode__() returns unicode
exc = UnicodeException(u'unicode \xe9\u20ac')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'unicode \xe9\u20ac')
# __unicode__() returns bytes (does this case really happen in the
# wild?)
exc = UnicodeException(b'utf-8 \xc3\xa9\xe2\x82\xac')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'utf-8 \xe9\u20ac')
@testtools.skipIf(six.PY3, 'test specific to Python 2')
def test_unicode_or_str_exception(self):
# Exception with __str__() and __unicode__() methods
class UnicodeOrStrException(Exception):
def __init__(self, unicode_value, str_value):
Exception.__init__(self)
self.unicode_value = unicode_value
self.str_value = str_value
def __unicode__(self):
return self.unicode_value
def __str__(self):
return self.str_value
# __unicode__() returns unicode
exc = UnicodeOrStrException(u'unicode \xe9\u20ac', b'str')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'unicode \xe9\u20ac')
# __unicode__() returns bytes (does this case really happen in the
# wild?)
exc = UnicodeOrStrException(b'utf-8 \xc3\xa9\xe2\x82\xac', b'str')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'utf-8 \xe9\u20ac')
@testtools.skipIf(six.PY3, 'test specific to Python 2')
def test_unicode_only_exception(self):
# Exception with a __unicode__() method and a __str__() which
# raises an exception (similar to the Message class of oslo_i18n)
class UnicodeOnlyException(Exception):
def __init__(self, value):
Exception.__init__(self)
self.value = value
def __unicode__(self):
return self.value
def __str__(self):
raise UnicodeError("use unicode()")
# __unicode__() returns unicode
exc = UnicodeOnlyException(u'unicode \xe9\u20ac')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'unicode \xe9\u20ac')
# __unicode__() returns bytes
exc = UnicodeOnlyException(b'utf-8 \xc3\xa9\xe2\x82\xac')
self.assertEqual(encodeutils.exception_to_unicode(exc),
u'utf-8 \xe9\u20ac')
def test_oslo_i18n_message(self):
# use the lazy translation to get a Message instance of oslo_i18n
exc = oslo_i18n_fixture.Translation().lazy("test")

View File

@ -19,13 +19,13 @@ Time related utilities and helper functions.
import calendar
import datetime
import functools
import logging
import time
from debtcollector import removals
import iso8601
import pytz
import six
from oslo_utils import reflection
@ -66,9 +66,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(six.text_type(e))
raise ValueError(str(e))
except TypeError as e:
raise ValueError(six.text_type(e))
raise ValueError(str(e))
@removals.remove(
@ -114,7 +114,7 @@ def is_older_than(before, seconds):
Accept datetime string with timezone information.
Fix comparison with timezone aware datetime.
"""
if isinstance(before, six.string_types):
if isinstance(before, str):
before = parse_isotime(before)
before = normalize_time(before)
@ -129,7 +129,7 @@ def is_newer_than(after, seconds):
Accept datetime string with timezone information.
Fix comparison with timezone aware datetime.
"""
if isinstance(after, six.string_types):
if isinstance(after, str):
after = parse_isotime(after)
after = normalize_time(after)
@ -377,7 +377,7 @@ def time_it(logger, log_level=logging.DEBUG,
if not enabled:
return func
@six.wraps(func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
with StopWatch() as w:
result = func(*args, **kwargs)

View File

@ -19,8 +19,9 @@ Helpers for comparing version strings.
.. versionadded:: 1.6
"""
import functools
import packaging.version
import six
from oslo_utils._i18n import _
@ -57,13 +58,13 @@ def convert_version_to_int(version):
.. versionadded:: 2.0
"""
try:
if isinstance(version, six.string_types):
if isinstance(version, str):
version = convert_version_to_tuple(version)
if isinstance(version, tuple):
return six.moves.reduce(lambda x, y: (x * 1000) + y, version)
return functools.reduce(lambda x, y: (x * 1000) + y, version)
except Exception as ex:
msg = _("Version %s is invalid.") % version
six.raise_from(ValueError(msg), ex)
raise ValueError(msg) from ex
def convert_version_to_str(version_int):
@ -75,7 +76,7 @@ def convert_version_to_str(version_int):
factor = 1000
while version_int != 0:
version_number = version_int - (version_int // factor * factor)
version_numbers.insert(0, six.text_type(version_number))
version_numbers.insert(0, str(version_number))
version_int = version_int // factor
return '.'.join(map(str, version_numbers))

View File

@ -8,7 +8,6 @@
# that is a likely indicator that the feature belongs somewhere else.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
six>=1.10.0 # MIT
iso8601>=0.1.11 # MIT
oslo.i18n>=3.15.3 # Apache-2.0
pytz>=2013.6 # MIT