Merge "Move over a reflection module that taskflow uses"
This commit is contained in:
commit
7b16c2dfac
208
oslo/utils/reflection.py
Normal file
208
oslo/utils/reflection.py
Normal file
@ -0,0 +1,208 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import types
|
||||
|
||||
import six
|
||||
|
||||
try:
|
||||
_TYPE_TYPE = types.TypeType
|
||||
except AttributeError:
|
||||
_TYPE_TYPE = type
|
||||
|
||||
# See: https://docs.python.org/2/library/__builtin__.html#module-__builtin__
|
||||
# and see https://docs.python.org/2/reference/executionmodel.html (and likely
|
||||
# others)...
|
||||
_BUILTIN_MODULES = ('builtins', '__builtin__', '__builtins__', 'exceptions')
|
||||
|
||||
|
||||
def _get_members(obj, exclude_hidden):
|
||||
"""Yields the members of an object, filtering by hidden/not hidden."""
|
||||
for (name, value) in inspect.getmembers(obj):
|
||||
if name.startswith("_") and exclude_hidden:
|
||||
continue
|
||||
yield (name, value)
|
||||
|
||||
|
||||
def get_member_names(obj, exclude_hidden=True):
|
||||
"""Get all the member names for a object."""
|
||||
return [name for (name, _obj) in _get_members(obj, exclude_hidden)]
|
||||
|
||||
|
||||
def get_class_name(obj, fully_qualified=True):
|
||||
"""Get class name for object.
|
||||
|
||||
If object is a type, fully qualified name of the type is returned.
|
||||
Else, fully qualified name of the type of the object is returned.
|
||||
For builtin types, just name is returned.
|
||||
"""
|
||||
if not isinstance(obj, six.class_types):
|
||||
obj = type(obj)
|
||||
try:
|
||||
built_in = obj.__module__ in _BUILTIN_MODULES
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
if built_in:
|
||||
try:
|
||||
return obj.__qualname__
|
||||
except AttributeError:
|
||||
return obj.__name__
|
||||
pieces = []
|
||||
try:
|
||||
pieces.append(obj.__qualname__)
|
||||
except AttributeError:
|
||||
pieces.append(obj.__name__)
|
||||
if fully_qualified:
|
||||
try:
|
||||
pieces.insert(0, obj.__module__)
|
||||
except AttributeError:
|
||||
pass
|
||||
return '.'.join(pieces)
|
||||
|
||||
|
||||
def get_all_class_names(obj, up_to=object):
|
||||
"""Get class names of object parent classes.
|
||||
|
||||
Iterate over all class names object is instance or subclass of,
|
||||
in order of method resolution (mro). If up_to parameter is provided,
|
||||
only name of classes that are sublcasses to that class are returned.
|
||||
"""
|
||||
if not isinstance(obj, six.class_types):
|
||||
obj = type(obj)
|
||||
for cls in obj.mro():
|
||||
if issubclass(cls, up_to):
|
||||
yield get_class_name(cls)
|
||||
|
||||
|
||||
def get_callable_name(function):
|
||||
"""Generate a name from callable.
|
||||
|
||||
Tries to do the best to guess fully qualified callable name.
|
||||
"""
|
||||
method_self = get_method_self(function)
|
||||
if method_self is not None:
|
||||
# This is a bound method.
|
||||
if isinstance(method_self, six.class_types):
|
||||
# This is a bound class method.
|
||||
im_class = method_self
|
||||
else:
|
||||
im_class = type(method_self)
|
||||
try:
|
||||
parts = (im_class.__module__, function.__qualname__)
|
||||
except AttributeError:
|
||||
parts = (im_class.__module__, im_class.__name__, function.__name__)
|
||||
elif inspect.ismethod(function) or inspect.isfunction(function):
|
||||
# This could be a function, a static method, a unbound method...
|
||||
try:
|
||||
parts = (function.__module__, function.__qualname__)
|
||||
except AttributeError:
|
||||
if hasattr(function, 'im_class'):
|
||||
# This is a unbound method, which exists only in python 2.x
|
||||
im_class = function.im_class
|
||||
parts = (im_class.__module__,
|
||||
im_class.__name__, function.__name__)
|
||||
else:
|
||||
parts = (function.__module__, function.__name__)
|
||||
else:
|
||||
im_class = type(function)
|
||||
if im_class is _TYPE_TYPE:
|
||||
im_class = function
|
||||
try:
|
||||
parts = (im_class.__module__, im_class.__qualname__)
|
||||
except AttributeError:
|
||||
parts = (im_class.__module__, im_class.__name__)
|
||||
return '.'.join(parts)
|
||||
|
||||
|
||||
def get_method_self(method):
|
||||
if not inspect.ismethod(method):
|
||||
return None
|
||||
try:
|
||||
return six.get_method_self(method)
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
|
||||
def is_same_callback(callback1, callback2, strict=True):
|
||||
"""Returns if the two callbacks are the same."""
|
||||
if callback1 is callback2:
|
||||
# This happens when plain methods are given (or static/non-bound
|
||||
# methods).
|
||||
return True
|
||||
if callback1 == callback2:
|
||||
if not strict:
|
||||
return True
|
||||
# Two bound methods are equal if functions themselves are equal and
|
||||
# objects they are applied to are equal. This means that a bound
|
||||
# method could be the same bound method on another object if the
|
||||
# objects have __eq__ methods that return true (when in fact it is a
|
||||
# different bound method). Python u so crazy!
|
||||
try:
|
||||
self1 = six.get_method_self(callback1)
|
||||
self2 = six.get_method_self(callback2)
|
||||
return self1 is self2
|
||||
except AttributeError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def is_bound_method(method):
|
||||
"""Returns if the given method is bound to an object."""
|
||||
return bool(get_method_self(method))
|
||||
|
||||
|
||||
def is_subclass(obj, cls):
|
||||
"""Returns if the object is class and it is subclass of a given class."""
|
||||
return inspect.isclass(obj) and issubclass(obj, cls)
|
||||
|
||||
|
||||
def _get_arg_spec(function):
|
||||
if isinstance(function, _TYPE_TYPE):
|
||||
bound = True
|
||||
function = function.__init__
|
||||
elif isinstance(function, (types.FunctionType, types.MethodType)):
|
||||
bound = is_bound_method(function)
|
||||
function = getattr(function, '__wrapped__', function)
|
||||
else:
|
||||
function = function.__call__
|
||||
bound = is_bound_method(function)
|
||||
return inspect.getargspec(function), bound
|
||||
|
||||
|
||||
def get_callable_args(function, required_only=False):
|
||||
"""Get names of callable arguments.
|
||||
|
||||
Special arguments (like *args and **kwargs) are not included into
|
||||
output.
|
||||
|
||||
If required_only is True, optional arguments (with default values)
|
||||
are not included into output.
|
||||
"""
|
||||
argspec, bound = _get_arg_spec(function)
|
||||
f_args = argspec.args
|
||||
if required_only and argspec.defaults:
|
||||
f_args = f_args[:-len(argspec.defaults)]
|
||||
if bound:
|
||||
f_args = f_args[1:]
|
||||
return f_args
|
||||
|
||||
|
||||
def accepts_kwargs(function):
|
||||
"""Returns True if function accepts kwargs."""
|
||||
argspec, _bound = _get_arg_spec(function)
|
||||
return bool(argspec.keywords)
|
279
tests/test_reflection.py
Normal file
279
tests/test_reflection.py
Normal file
@ -0,0 +1,279 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslotest import base as test_base
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from oslo.utils import reflection
|
||||
|
||||
|
||||
if six.PY3:
|
||||
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
|
||||
'BaseException', 'object']
|
||||
else:
|
||||
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
|
||||
'BaseException', 'object']
|
||||
|
||||
|
||||
def dummy_decorator(f):
|
||||
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def mere_function(a, b):
|
||||
pass
|
||||
|
||||
|
||||
def function_with_defs(a, b, optional=None):
|
||||
pass
|
||||
|
||||
|
||||
def function_with_kwargs(a, b, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class Class(object):
|
||||
|
||||
def method(self, c, d):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def static_method(e, f):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def class_method(cls, g, h):
|
||||
pass
|
||||
|
||||
|
||||
class CallableClass(object):
|
||||
def __call__(self, i, j):
|
||||
pass
|
||||
|
||||
|
||||
class ClassWithInit(object):
|
||||
def __init__(self, k, l):
|
||||
pass
|
||||
|
||||
|
||||
class CallbackEqualityTest(test_base.BaseTestCase):
|
||||
def test_different_simple_callbacks(self):
|
||||
|
||||
def a():
|
||||
pass
|
||||
|
||||
def b():
|
||||
pass
|
||||
|
||||
self.assertFalse(reflection.is_same_callback(a, b))
|
||||
|
||||
def test_static_instance_callbacks(self):
|
||||
|
||||
class A(object):
|
||||
|
||||
@staticmethod
|
||||
def b(a, b, c):
|
||||
pass
|
||||
|
||||
a = A()
|
||||
b = A()
|
||||
|
||||
self.assertTrue(reflection.is_same_callback(a.b, b.b))
|
||||
|
||||
def test_different_instance_callbacks(self):
|
||||
|
||||
class A(object):
|
||||
def b(self):
|
||||
pass
|
||||
|
||||
def __eq__(self, other):
|
||||
return True
|
||||
|
||||
b = A()
|
||||
c = A()
|
||||
|
||||
self.assertFalse(reflection.is_same_callback(b.b, c.b))
|
||||
self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False))
|
||||
|
||||
|
||||
class GetCallableNameTest(test_base.BaseTestCase):
|
||||
|
||||
def test_mere_function(self):
|
||||
name = reflection.get_callable_name(mere_function)
|
||||
self.assertEqual('.'.join((__name__, 'mere_function')), name)
|
||||
|
||||
def test_method(self):
|
||||
name = reflection.get_callable_name(Class.method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
|
||||
|
||||
def test_instance_method(self):
|
||||
name = reflection.get_callable_name(Class().method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
|
||||
|
||||
def test_static_method(self):
|
||||
name = reflection.get_callable_name(Class.static_method)
|
||||
if six.PY3:
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
|
||||
name)
|
||||
else:
|
||||
# NOTE(imelnikov): static method are just functions, class name
|
||||
# is not recorded anywhere in them.
|
||||
self.assertEqual('.'.join((__name__, 'static_method')), name)
|
||||
|
||||
def test_class_method(self):
|
||||
name = reflection.get_callable_name(Class.class_method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name)
|
||||
|
||||
def test_constructor(self):
|
||||
name = reflection.get_callable_name(Class)
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_callable_class(self):
|
||||
name = reflection.get_callable_name(CallableClass())
|
||||
self.assertEqual('.'.join((__name__, 'CallableClass')), name)
|
||||
|
||||
def test_callable_class_call(self):
|
||||
name = reflection.get_callable_name(CallableClass().__call__)
|
||||
self.assertEqual('.'.join((__name__, 'CallableClass',
|
||||
'__call__')), name)
|
||||
|
||||
|
||||
# These extended/special case tests only work on python 3, due to python 2
|
||||
# being broken/incorrect with regard to these special cases...
|
||||
@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
|
||||
class GetCallableNameTestExtended(test_base.BaseTestCase):
|
||||
# Tests items in http://legacy.python.org/dev/peps/pep-3155/
|
||||
|
||||
class InnerCallableClass(object):
|
||||
def __call__(self):
|
||||
pass
|
||||
|
||||
def test_inner_callable_class(self):
|
||||
obj = self.InnerCallableClass()
|
||||
name = reflection.get_callable_name(obj.__call__)
|
||||
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
|
||||
'InnerCallableClass', '__call__'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
def test_inner_callable_function(self):
|
||||
def a():
|
||||
|
||||
def b():
|
||||
pass
|
||||
|
||||
return b
|
||||
|
||||
name = reflection.get_callable_name(a())
|
||||
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
|
||||
'test_inner_callable_function', '<locals>',
|
||||
'a', '<locals>', 'b'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
def test_inner_class(self):
|
||||
obj = self.InnerCallableClass()
|
||||
name = reflection.get_callable_name(obj)
|
||||
expected_name = '.'.join((__name__,
|
||||
'GetCallableNameTestExtended',
|
||||
'InnerCallableClass'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
|
||||
class GetCallableArgsTest(test_base.BaseTestCase):
|
||||
|
||||
def test_mere_function(self):
|
||||
result = reflection.get_callable_args(mere_function)
|
||||
self.assertEqual(['a', 'b'], result)
|
||||
|
||||
def test_function_with_defaults(self):
|
||||
result = reflection.get_callable_args(function_with_defs)
|
||||
self.assertEqual(['a', 'b', 'optional'], result)
|
||||
|
||||
def test_required_only(self):
|
||||
result = reflection.get_callable_args(function_with_defs,
|
||||
required_only=True)
|
||||
self.assertEqual(['a', 'b'], result)
|
||||
|
||||
def test_method(self):
|
||||
result = reflection.get_callable_args(Class.method)
|
||||
self.assertEqual(['self', 'c', 'd'], result)
|
||||
|
||||
def test_instance_method(self):
|
||||
result = reflection.get_callable_args(Class().method)
|
||||
self.assertEqual(['c', 'd'], result)
|
||||
|
||||
def test_class_method(self):
|
||||
result = reflection.get_callable_args(Class.class_method)
|
||||
self.assertEqual(['g', 'h'], result)
|
||||
|
||||
def test_class_constructor(self):
|
||||
result = reflection.get_callable_args(ClassWithInit)
|
||||
self.assertEqual(['k', 'l'], result)
|
||||
|
||||
def test_class_with_call(self):
|
||||
result = reflection.get_callable_args(CallableClass())
|
||||
self.assertEqual(['i', 'j'], result)
|
||||
|
||||
def test_decorators_work(self):
|
||||
@dummy_decorator
|
||||
def special_fun(x, y):
|
||||
pass
|
||||
result = reflection.get_callable_args(special_fun)
|
||||
self.assertEqual(['x', 'y'], result)
|
||||
|
||||
|
||||
class AcceptsKwargsTest(test_base.BaseTestCase):
|
||||
|
||||
def test_no_kwargs(self):
|
||||
self.assertEqual(False, reflection.accepts_kwargs(mere_function))
|
||||
|
||||
def test_with_kwargs(self):
|
||||
self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs))
|
||||
|
||||
|
||||
class GetClassNameTest(test_base.BaseTestCase):
|
||||
|
||||
def test_std_exception(self):
|
||||
name = reflection.get_class_name(RuntimeError)
|
||||
self.assertEqual('RuntimeError', name)
|
||||
|
||||
def test_class(self):
|
||||
name = reflection.get_class_name(Class)
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_instance(self):
|
||||
name = reflection.get_class_name(Class())
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_int(self):
|
||||
name = reflection.get_class_name(42)
|
||||
self.assertEqual('int', name)
|
||||
|
||||
|
||||
class GetAllClassNamesTest(test_base.BaseTestCase):
|
||||
|
||||
def test_std_class(self):
|
||||
names = list(reflection.get_all_class_names(RuntimeError))
|
||||
self.assertEqual(RUNTIME_ERROR_CLASSES, names)
|
||||
|
||||
def test_std_class_up_to(self):
|
||||
names = list(reflection.get_all_class_names(RuntimeError,
|
||||
up_to=Exception))
|
||||
self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names)
|
Loading…
x
Reference in New Issue
Block a user