Merge "add tests for _query_to_kwargs func"

This commit is contained in:
Jenkins 2013-09-12 10:21:00 +00:00 committed by Gerrit Code Review
commit d58a133597
2 changed files with 166 additions and 126 deletions

View File

@ -250,8 +250,8 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
'project_id': 'project',
'resource_id': 'resource'}
stamp = {}
trans = {}
metaquery = {}
kwargs = {}
for i in query:
if i.field == 'timestamp':
if i.op in ('lt', 'le'):
@ -261,22 +261,29 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
stamp['start_timestamp'] = i.value
stamp['start_timestamp_op'] = i.op
else:
LOG.warn('_query_to_kwargs ignoring %r unexpected op %r"' %
(i.field, i.op))
raise wsme.exc.InvalidInput('op', i.op,
'unimplemented operator for %s' %
i.field)
else:
if i.op != 'eq':
LOG.warn('_query_to_kwargs ignoring %r unimplemented op %r' %
(i.field, i.op))
elif i.field == 'search_offset':
stamp['search_offset'] = i.value
elif i.field.startswith('metadata.'):
metaquery[i.field] = i._get_value_as_type()
elif i.field.startswith('resource_metadata.'):
metaquery[i.field[9:]] = i._get_value_as_type()
if i.op == 'eq':
if i.field == 'search_offset':
stamp['search_offset'] = i.value
elif i.field.startswith('metadata.'):
metaquery[i.field] = i._get_value_as_type()
elif i.field.startswith('resource_metadata.'):
metaquery[i.field[9:]] = i._get_value_as_type()
else:
key = translation.get(i.field, i.field)
if key not in valid_keys:
msg = ("unrecognized field in query: %s, "
"valid keys: %s") % (query, valid_keys)
raise wsme.exc.UnknownArgument(key, msg)
kwargs[key] = i.value
else:
trans[translation.get(i.field, i.field)] = i.value
raise wsme.exc.InvalidInput('op', i.op,
'unimplemented operator for %s' %
i.field)
kwargs = {}
if metaquery and 'metaquery' in valid_keys:
kwargs['metaquery'] = metaquery
if stamp:
@ -295,14 +302,6 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
if 'end_timestamp_op' in stamp:
kwargs['end_timestamp_op'] = stamp['end_timestamp_op']
if trans:
for k in trans:
if k not in valid_keys:
msg = ("unrecognized field in query: %s, valid keys: %s" %
(query, valid_keys))
raise wsme.exc.UnknownArgument(k, msg)
kwargs[k] = trans[k]
return kwargs

View File

@ -14,14 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Test the methods related to query."""
import datetime
import wsme
from ceilometer import storage
from ceilometer.api.controllers import v2 as api
from ceilometer.api.controllers.v2 import Query
from ceilometer.api.controllers.v2 import _query_to_kwargs
from ceilometer.openstack.common import timeutils
from ceilometer.tests import base as tests_base
@ -135,108 +135,6 @@ class TestQuery(tests_base.TestCase):
expected = value
self.assertEqual(query._get_value_as_type(), expected)
def _fake_db_func(self, resource, on_behalf_of, x, y,
metaquery={}, user=None, project=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None, **kwargs):
pass
def test_query_to_kwargs_exclude_internal(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'on_behalf_of', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
_query_to_kwargs,
queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'},
internal_keys=['on_behalf_of'])
def test_query_to_kwargs_self_always_excluded(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['x', 'y']]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('self' in kwargs)
def test_query_to_kwargs_timestamp_mapping(self):
start = datetime.datetime.utcnow()
end = datetime.datetime.utcnow()
queries = [Query(field='timestamp',
op='gt',
value=start.isoformat(),
type='string'),
Query(field='timestamp',
op='le',
value=end.isoformat(),
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertEqual(kwargs.get('start_timestamp'), start)
self.assertEqual(kwargs.get('start_timestamp_op'), 'gt')
self.assertEqual(kwargs.get('end_timestamp'), end)
self.assertEqual(kwargs.get('end_timestamp_op'), 'le')
def test_query_to_kwargs_non_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='gt',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='le',
value='ramdisk',
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('metaquery' in kwargs)
def test_query_to_kwargs_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='eq',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='eq',
value='ramdisk',
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertTrue('metaquery' in kwargs)
metaquery = kwargs['metaquery']
self.assertEqual(metaquery.get('metadata.image_id'), 'image')
self.assertEqual(metaquery.get('metadata.ramdisk_id'), 'ramdisk')
def test_query_to_kwargs_translation(self):
queries = [Query(field=f,
op='eq',
value='fake_%s' % f,
type='string') for f in ['user_id',
'project_id',
'resource_id']]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
for o in ['user', 'project', 'resource']:
self.assertEqual(kwargs.get(o), 'fake_%s_id' % o)
def test_query_to_kwargs_unrecognized(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'z', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
_query_to_kwargs,
queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
class TestValidateGroupByFields(tests_base.TestCase):
@ -265,3 +163,146 @@ class TestValidateGroupByFields(tests_base.TestCase):
api._validate_groupby_fields(['user_id', 'source', 'user_id'])
)
self.assertEqual(result, set(['user_id', 'source']))
class TestQueryToKwArgs(tests_base.TestCase):
def setUp(self):
super(TestQueryToKwArgs, self).setUp()
self.stubs.Set(api, '_sanitize_query', lambda x, y, **z: x)
def test_sample_filter_single(self):
q = [Query(field='user_id',
op='eq',
value='uid')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertIn('user', kwargs)
self.assertEqual(len(kwargs), 1)
self.assertEqual(kwargs['user'], 'uid')
def test_sample_filter_multi(self):
q = [Query(field='user_id',
op='eq',
value='uid'),
Query(field='project_id',
op='eq',
value='pid'),
Query(field='resource_id',
op='eq',
value='rid'),
Query(field='source',
op='eq',
value='source_name'),
Query(field='meter',
op='eq',
value='meter_name')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 5)
self.assertEqual(kwargs['user'], 'uid')
self.assertEqual(kwargs['project'], 'pid')
self.assertEqual(kwargs['resource'], 'rid')
self.assertEqual(kwargs['source'], 'source_name')
self.assertEqual(kwargs['meter'], 'meter_name')
def test_sample_filter_timestamp(self):
ts_start = timeutils.utcnow()
ts_end = ts_start + datetime.timedelta(minutes=5)
q = [Query(field='timestamp',
op='lt',
value=str(ts_end)),
Query(field='timestamp',
op='gt',
value=str(ts_start))]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 4)
self.assertEqual(kwargs['start'], ts_start)
self.assertEqual(kwargs['end'], ts_end)
self.assertEqual(kwargs['start_timestamp_op'], 'gt')
self.assertEqual(kwargs['end_timestamp_op'], 'lt')
def test_sample_filter_meta(self):
q = [Query(field='metadata.size',
op='eq',
value='20'),
Query(field='resource_metadata.id',
op='eq',
value='meta_id')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 1)
self.assertEqual(len(kwargs['metaquery']), 2)
self.assertEqual(kwargs['metaquery']['metadata.size'], 20)
self.assertEqual(kwargs['metaquery']['metadata.id'], 'meta_id')
def test_sample_filter_non_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='gt',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='le',
value='ramdisk',
type='string')]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs,
queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
def test_sample_filter_invalid_field(self):
q = [Query(field='invalid',
op='eq',
value='20')]
self.assertRaises(
wsme.exc.UnknownArgument,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_invalid_op(self):
q = [Query(field='user_id',
op='lt',
value='20')]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_timestamp_invalid_op(self):
ts_start = timeutils.utcnow()
q = [Query(field='timestamp',
op='eq',
value=str(ts_start))]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_exclude_internal(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'on_behalf_of', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
api._query_to_kwargs,
queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'},
internal_keys=['on_behalf_of'])
def test_sample_filter_self_always_excluded(self):
queries = [Query(field='user_id',
op='eq',
value='20')]
kwargs = api._query_to_kwargs(queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('self' in kwargs)
def test_sample_filter_translation(self):
queries = [Query(field=f,
op='eq',
value='fake_%s' % f,
type='string') for f in ['user_id',
'project_id',
'resource_id']]
kwargs = api._query_to_kwargs(queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
for o in ['user', 'project', 'resource']:
self.assertEqual(kwargs.get(o), 'fake_%s_id' % o)