Move over a reflection module that taskflow uses
Taskflow has a quite useful reflection module that it would appear others would benefit from having access to, so instead of keeping it in taskflow, move it over to oslo.utils so that others can benefit from it. Potential users: - Oslo.db - Cinder - Taskflow (where the code used to exist) - Others doing name lookup or other reflection functionality Change-Id: I9fbdd86966afddb862cb7faed2f88298a9f956cf
This commit is contained in:
parent
281df4b601
commit
614a849400
208
oslo/utils/reflection.py
Normal file
208
oslo/utils/reflection.py
Normal file
@ -0,0 +1,208 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import types
|
||||
|
||||
import six
|
||||
|
||||
try:
|
||||
_TYPE_TYPE = types.TypeType
|
||||
except AttributeError:
|
||||
_TYPE_TYPE = type
|
||||
|
||||
# See: https://docs.python.org/2/library/__builtin__.html#module-__builtin__
|
||||
# and see https://docs.python.org/2/reference/executionmodel.html (and likely
|
||||
# others)...
|
||||
_BUILTIN_MODULES = ('builtins', '__builtin__', '__builtins__', 'exceptions')
|
||||
|
||||
|
||||
def _get_members(obj, exclude_hidden):
|
||||
"""Yields the members of an object, filtering by hidden/not hidden."""
|
||||
for (name, value) in inspect.getmembers(obj):
|
||||
if name.startswith("_") and exclude_hidden:
|
||||
continue
|
||||
yield (name, value)
|
||||
|
||||
|
||||
def get_member_names(obj, exclude_hidden=True):
|
||||
"""Get all the member names for a object."""
|
||||
return [name for (name, _obj) in _get_members(obj, exclude_hidden)]
|
||||
|
||||
|
||||
def get_class_name(obj, fully_qualified=True):
|
||||
"""Get class name for object.
|
||||
|
||||
If object is a type, fully qualified name of the type is returned.
|
||||
Else, fully qualified name of the type of the object is returned.
|
||||
For builtin types, just name is returned.
|
||||
"""
|
||||
if not isinstance(obj, six.class_types):
|
||||
obj = type(obj)
|
||||
try:
|
||||
built_in = obj.__module__ in _BUILTIN_MODULES
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
if built_in:
|
||||
try:
|
||||
return obj.__qualname__
|
||||
except AttributeError:
|
||||
return obj.__name__
|
||||
pieces = []
|
||||
try:
|
||||
pieces.append(obj.__qualname__)
|
||||
except AttributeError:
|
||||
pieces.append(obj.__name__)
|
||||
if fully_qualified:
|
||||
try:
|
||||
pieces.insert(0, obj.__module__)
|
||||
except AttributeError:
|
||||
pass
|
||||
return '.'.join(pieces)
|
||||
|
||||
|
||||
def get_all_class_names(obj, up_to=object):
|
||||
"""Get class names of object parent classes.
|
||||
|
||||
Iterate over all class names object is instance or subclass of,
|
||||
in order of method resolution (mro). If up_to parameter is provided,
|
||||
only name of classes that are sublcasses to that class are returned.
|
||||
"""
|
||||
if not isinstance(obj, six.class_types):
|
||||
obj = type(obj)
|
||||
for cls in obj.mro():
|
||||
if issubclass(cls, up_to):
|
||||
yield get_class_name(cls)
|
||||
|
||||
|
||||
def get_callable_name(function):
|
||||
"""Generate a name from callable.
|
||||
|
||||
Tries to do the best to guess fully qualified callable name.
|
||||
"""
|
||||
method_self = get_method_self(function)
|
||||
if method_self is not None:
|
||||
# This is a bound method.
|
||||
if isinstance(method_self, six.class_types):
|
||||
# This is a bound class method.
|
||||
im_class = method_self
|
||||
else:
|
||||
im_class = type(method_self)
|
||||
try:
|
||||
parts = (im_class.__module__, function.__qualname__)
|
||||
except AttributeError:
|
||||
parts = (im_class.__module__, im_class.__name__, function.__name__)
|
||||
elif inspect.ismethod(function) or inspect.isfunction(function):
|
||||
# This could be a function, a static method, a unbound method...
|
||||
try:
|
||||
parts = (function.__module__, function.__qualname__)
|
||||
except AttributeError:
|
||||
if hasattr(function, 'im_class'):
|
||||
# This is a unbound method, which exists only in python 2.x
|
||||
im_class = function.im_class
|
||||
parts = (im_class.__module__,
|
||||
im_class.__name__, function.__name__)
|
||||
else:
|
||||
parts = (function.__module__, function.__name__)
|
||||
else:
|
||||
im_class = type(function)
|
||||
if im_class is _TYPE_TYPE:
|
||||
im_class = function
|
||||
try:
|
||||
parts = (im_class.__module__, im_class.__qualname__)
|
||||
except AttributeError:
|
||||
parts = (im_class.__module__, im_class.__name__)
|
||||
return '.'.join(parts)
|
||||
|
||||
|
||||
def get_method_self(method):
|
||||
if not inspect.ismethod(method):
|
||||
return None
|
||||
try:
|
||||
return six.get_method_self(method)
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
|
||||
def is_same_callback(callback1, callback2, strict=True):
|
||||
"""Returns if the two callbacks are the same."""
|
||||
if callback1 is callback2:
|
||||
# This happens when plain methods are given (or static/non-bound
|
||||
# methods).
|
||||
return True
|
||||
if callback1 == callback2:
|
||||
if not strict:
|
||||
return True
|
||||
# Two bound methods are equal if functions themselves are equal and
|
||||
# objects they are applied to are equal. This means that a bound
|
||||
# method could be the same bound method on another object if the
|
||||
# objects have __eq__ methods that return true (when in fact it is a
|
||||
# different bound method). Python u so crazy!
|
||||
try:
|
||||
self1 = six.get_method_self(callback1)
|
||||
self2 = six.get_method_self(callback2)
|
||||
return self1 is self2
|
||||
except AttributeError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def is_bound_method(method):
|
||||
"""Returns if the given method is bound to an object."""
|
||||
return bool(get_method_self(method))
|
||||
|
||||
|
||||
def is_subclass(obj, cls):
|
||||
"""Returns if the object is class and it is subclass of a given class."""
|
||||
return inspect.isclass(obj) and issubclass(obj, cls)
|
||||
|
||||
|
||||
def _get_arg_spec(function):
|
||||
if isinstance(function, _TYPE_TYPE):
|
||||
bound = True
|
||||
function = function.__init__
|
||||
elif isinstance(function, (types.FunctionType, types.MethodType)):
|
||||
bound = is_bound_method(function)
|
||||
function = getattr(function, '__wrapped__', function)
|
||||
else:
|
||||
function = function.__call__
|
||||
bound = is_bound_method(function)
|
||||
return inspect.getargspec(function), bound
|
||||
|
||||
|
||||
def get_callable_args(function, required_only=False):
|
||||
"""Get names of callable arguments.
|
||||
|
||||
Special arguments (like *args and **kwargs) are not included into
|
||||
output.
|
||||
|
||||
If required_only is True, optional arguments (with default values)
|
||||
are not included into output.
|
||||
"""
|
||||
argspec, bound = _get_arg_spec(function)
|
||||
f_args = argspec.args
|
||||
if required_only and argspec.defaults:
|
||||
f_args = f_args[:-len(argspec.defaults)]
|
||||
if bound:
|
||||
f_args = f_args[1:]
|
||||
return f_args
|
||||
|
||||
|
||||
def accepts_kwargs(function):
|
||||
"""Returns True if function accepts kwargs."""
|
||||
argspec, _bound = _get_arg_spec(function)
|
||||
return bool(argspec.keywords)
|
279
tests/test_reflection.py
Normal file
279
tests/test_reflection.py
Normal file
@ -0,0 +1,279 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslotest import base as test_base
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from oslo.utils import reflection
|
||||
|
||||
|
||||
if six.PY3:
|
||||
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
|
||||
'BaseException', 'object']
|
||||
else:
|
||||
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
|
||||
'BaseException', 'object']
|
||||
|
||||
|
||||
def dummy_decorator(f):
|
||||
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def mere_function(a, b):
|
||||
pass
|
||||
|
||||
|
||||
def function_with_defs(a, b, optional=None):
|
||||
pass
|
||||
|
||||
|
||||
def function_with_kwargs(a, b, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class Class(object):
|
||||
|
||||
def method(self, c, d):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def static_method(e, f):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def class_method(cls, g, h):
|
||||
pass
|
||||
|
||||
|
||||
class CallableClass(object):
|
||||
def __call__(self, i, j):
|
||||
pass
|
||||
|
||||
|
||||
class ClassWithInit(object):
|
||||
def __init__(self, k, l):
|
||||
pass
|
||||
|
||||
|
||||
class CallbackEqualityTest(test_base.BaseTestCase):
|
||||
def test_different_simple_callbacks(self):
|
||||
|
||||
def a():
|
||||
pass
|
||||
|
||||
def b():
|
||||
pass
|
||||
|
||||
self.assertFalse(reflection.is_same_callback(a, b))
|
||||
|
||||
def test_static_instance_callbacks(self):
|
||||
|
||||
class A(object):
|
||||
|
||||
@staticmethod
|
||||
def b(a, b, c):
|
||||
pass
|
||||
|
||||
a = A()
|
||||
b = A()
|
||||
|
||||
self.assertTrue(reflection.is_same_callback(a.b, b.b))
|
||||
|
||||
def test_different_instance_callbacks(self):
|
||||
|
||||
class A(object):
|
||||
def b(self):
|
||||
pass
|
||||
|
||||
def __eq__(self, other):
|
||||
return True
|
||||
|
||||
b = A()
|
||||
c = A()
|
||||
|
||||
self.assertFalse(reflection.is_same_callback(b.b, c.b))
|
||||
self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False))
|
||||
|
||||
|
||||
class GetCallableNameTest(test_base.BaseTestCase):
|
||||
|
||||
def test_mere_function(self):
|
||||
name = reflection.get_callable_name(mere_function)
|
||||
self.assertEqual('.'.join((__name__, 'mere_function')), name)
|
||||
|
||||
def test_method(self):
|
||||
name = reflection.get_callable_name(Class.method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
|
||||
|
||||
def test_instance_method(self):
|
||||
name = reflection.get_callable_name(Class().method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
|
||||
|
||||
def test_static_method(self):
|
||||
name = reflection.get_callable_name(Class.static_method)
|
||||
if six.PY3:
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
|
||||
name)
|
||||
else:
|
||||
# NOTE(imelnikov): static method are just functions, class name
|
||||
# is not recorded anywhere in them.
|
||||
self.assertEqual('.'.join((__name__, 'static_method')), name)
|
||||
|
||||
def test_class_method(self):
|
||||
name = reflection.get_callable_name(Class.class_method)
|
||||
self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name)
|
||||
|
||||
def test_constructor(self):
|
||||
name = reflection.get_callable_name(Class)
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_callable_class(self):
|
||||
name = reflection.get_callable_name(CallableClass())
|
||||
self.assertEqual('.'.join((__name__, 'CallableClass')), name)
|
||||
|
||||
def test_callable_class_call(self):
|
||||
name = reflection.get_callable_name(CallableClass().__call__)
|
||||
self.assertEqual('.'.join((__name__, 'CallableClass',
|
||||
'__call__')), name)
|
||||
|
||||
|
||||
# These extended/special case tests only work on python 3, due to python 2
|
||||
# being broken/incorrect with regard to these special cases...
|
||||
@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
|
||||
class GetCallableNameTestExtended(test_base.BaseTestCase):
|
||||
# Tests items in http://legacy.python.org/dev/peps/pep-3155/
|
||||
|
||||
class InnerCallableClass(object):
|
||||
def __call__(self):
|
||||
pass
|
||||
|
||||
def test_inner_callable_class(self):
|
||||
obj = self.InnerCallableClass()
|
||||
name = reflection.get_callable_name(obj.__call__)
|
||||
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
|
||||
'InnerCallableClass', '__call__'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
def test_inner_callable_function(self):
|
||||
def a():
|
||||
|
||||
def b():
|
||||
pass
|
||||
|
||||
return b
|
||||
|
||||
name = reflection.get_callable_name(a())
|
||||
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
|
||||
'test_inner_callable_function', '<locals>',
|
||||
'a', '<locals>', 'b'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
def test_inner_class(self):
|
||||
obj = self.InnerCallableClass()
|
||||
name = reflection.get_callable_name(obj)
|
||||
expected_name = '.'.join((__name__,
|
||||
'GetCallableNameTestExtended',
|
||||
'InnerCallableClass'))
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
|
||||
class GetCallableArgsTest(test_base.BaseTestCase):
|
||||
|
||||
def test_mere_function(self):
|
||||
result = reflection.get_callable_args(mere_function)
|
||||
self.assertEqual(['a', 'b'], result)
|
||||
|
||||
def test_function_with_defaults(self):
|
||||
result = reflection.get_callable_args(function_with_defs)
|
||||
self.assertEqual(['a', 'b', 'optional'], result)
|
||||
|
||||
def test_required_only(self):
|
||||
result = reflection.get_callable_args(function_with_defs,
|
||||
required_only=True)
|
||||
self.assertEqual(['a', 'b'], result)
|
||||
|
||||
def test_method(self):
|
||||
result = reflection.get_callable_args(Class.method)
|
||||
self.assertEqual(['self', 'c', 'd'], result)
|
||||
|
||||
def test_instance_method(self):
|
||||
result = reflection.get_callable_args(Class().method)
|
||||
self.assertEqual(['c', 'd'], result)
|
||||
|
||||
def test_class_method(self):
|
||||
result = reflection.get_callable_args(Class.class_method)
|
||||
self.assertEqual(['g', 'h'], result)
|
||||
|
||||
def test_class_constructor(self):
|
||||
result = reflection.get_callable_args(ClassWithInit)
|
||||
self.assertEqual(['k', 'l'], result)
|
||||
|
||||
def test_class_with_call(self):
|
||||
result = reflection.get_callable_args(CallableClass())
|
||||
self.assertEqual(['i', 'j'], result)
|
||||
|
||||
def test_decorators_work(self):
|
||||
@dummy_decorator
|
||||
def special_fun(x, y):
|
||||
pass
|
||||
result = reflection.get_callable_args(special_fun)
|
||||
self.assertEqual(['x', 'y'], result)
|
||||
|
||||
|
||||
class AcceptsKwargsTest(test_base.BaseTestCase):
|
||||
|
||||
def test_no_kwargs(self):
|
||||
self.assertEqual(False, reflection.accepts_kwargs(mere_function))
|
||||
|
||||
def test_with_kwargs(self):
|
||||
self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs))
|
||||
|
||||
|
||||
class GetClassNameTest(test_base.BaseTestCase):
|
||||
|
||||
def test_std_exception(self):
|
||||
name = reflection.get_class_name(RuntimeError)
|
||||
self.assertEqual('RuntimeError', name)
|
||||
|
||||
def test_class(self):
|
||||
name = reflection.get_class_name(Class)
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_instance(self):
|
||||
name = reflection.get_class_name(Class())
|
||||
self.assertEqual('.'.join((__name__, 'Class')), name)
|
||||
|
||||
def test_int(self):
|
||||
name = reflection.get_class_name(42)
|
||||
self.assertEqual('int', name)
|
||||
|
||||
|
||||
class GetAllClassNamesTest(test_base.BaseTestCase):
|
||||
|
||||
def test_std_class(self):
|
||||
names = list(reflection.get_all_class_names(RuntimeError))
|
||||
self.assertEqual(RUNTIME_ERROR_CLASSES, names)
|
||||
|
||||
def test_std_class_up_to(self):
|
||||
names = list(reflection.get_all_class_names(RuntimeError,
|
||||
up_to=Exception))
|
||||
self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names)
|
Loading…
x
Reference in New Issue
Block a user