Fix to return latest resource metadata

Addresses the latest resource metadata not being returned in
the MongoDB, SQLAlchemy, DB2, and HBase drivers. A schema
change was required for HBase, because it was overwriting
historical metadata.

Closes-Bug: #1208547
Related-Bug: #1201701
Implements: blueprint hbase-meter-table-enhancement
Change-Id: Ib09e21cbc7bbd45a6ecc321403e9947df837e14b
This commit is contained in:
Thomas Maddox 2013-08-20 18:10:17 +00:00
parent ab452f8dd1
commit 64f17d6552
7 changed files with 397 additions and 257 deletions

View File

@ -54,7 +54,7 @@ def _handle_sort_key(model_name, sort_key=None):
"""
sort_keys_extra = {'alarm': ['name', 'user_id', 'project_id'],
'meter': ['user_id', 'project_id'],
'resource': ['user_id', 'project_id'],
'resource': ['user_id', 'project_id', 'timestamp'],
}
sort_keys = sort_keys_extra[model_name]

View File

@ -23,6 +23,7 @@
import copy
import weakref
import itertools
import bson.code
import bson.objectid
@ -179,6 +180,8 @@ class Connection(base.Connection):
'duration_end': 1,
}
SORT_OPERATION_MAP = {'desc': pymongo.DESCENDING, 'asc': pymongo.ASCENDING}
def __init__(self, conf):
url = conf.database.connection
@ -211,6 +214,27 @@ class Connection(base.Connection):
self.upgrade()
@classmethod
def _build_sort_instructions(cls, sort_keys=None, sort_dir='desc'):
    """Return pymongo sort instructions.

    Sort instructions are used in the query to determine what attributes
    to sort on and what direction to use.

    :param sort_keys: array of attributes by which results are sorted.
    :param sort_dir: direction in which results are sorted (asc, desc);
                     any unrecognized value falls back to 'desc'.
    :return: list of (key, direction) tuples suitable for pymongo sort().
    """
    # Avoid the shared-mutable-default pitfall: bind a fresh list per call.
    if sort_keys is None:
        sort_keys = []
    _sort_dir = cls.SORT_OPERATION_MAP.get(
        sort_dir, cls.SORT_OPERATION_MAP['desc'])
    return [(_sort_key, _sort_dir) for _sort_key in sort_keys]
def upgrade(self, version=None):
# Establish indexes
#
@ -395,30 +419,36 @@ class Connection(base.Connection):
if ts_range:
q['timestamp'] = ts_range
resource_ids = self.db.meter.find(q).distinct('resource_id')
if self._using_mongodb:
q = {'_id': {'$in': resource_ids}}
else:
q = {'_id': {'$in': [m['_id'] for m in resource_ids]}}
sort_keys = base._handle_sort_key('resource', 'timestamp')
sort_keys.insert(0, 'resource_id')
sort_instructions = self._build_sort_instructions(sort_keys=sort_keys,
sort_dir='desc')
resource = lambda x: x['resource_id']
meters = self.db.meter.find(q, sort=sort_instructions)
for resource_id, r_meters in itertools.groupby(meters, key=resource):
resource_meters = []
# Because we have to know first/last timestamp, and we need a full
# list of references to the resource's meters, we need a tuple
# here.
r_meters = tuple(r_meters)
for meter in r_meters:
resource_meters.append(models.ResourceMeter(
counter_name=meter['counter_name'],
counter_type=meter['counter_type'],
counter_unit=meter.get('counter_unit', ''))
)
latest_meter = r_meters[0]
last_ts = latest_meter['timestamp']
first_ts = r_meters[-1]['timestamp']
for resource in self.db.resource.find(q):
yield models.Resource(
resource_id=resource['_id'],
project_id=resource['project_id'],
first_sample_timestamp=None,
last_sample_timestamp=None,
source=resource['source'],
user_id=resource['user_id'],
metadata=resource['metadata'],
meter=[
models.ResourceMeter(
counter_name=meter['counter_name'],
counter_type=meter['counter_type'],
counter_unit=meter.get('counter_unit', ''),
)
for meter in resource['meter']
],
)
yield models.Resource(resource_id=latest_meter['resource_id'],
project_id=latest_meter['project_id'],
first_sample_timestamp=first_ts,
last_sample_timestamp=last_ts,
source=latest_meter['source'],
user_id=latest_meter['user_id'],
metadata=latest_meter['resource_metadata'],
meter=resource_meters)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}, pagination=None):

View File

@ -169,6 +169,14 @@ class Connection(base.Connection):
user_table = self.conn.table(self.USER_TABLE)
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
# store metadata fields with prefix "r_"
resource_metadata = {}
if data['resource_metadata']:
resource_metadata = dict(('f:r_%s' % k, v)
for (k, v)
in data['resource_metadata'].iteritems())
# Make sure we know about the user and project
if data['user_id']:
user = user_table.row(data['user_id'])
@ -188,7 +196,8 @@ class Connection(base.Connection):
rts = reverse_timestamp(data['timestamp'])
resource = resource_table.row(data['resource_id'])
new_meter = "%s!%s!%s" % (
new_meter = _format_meter_reference(
data['counter_name'], data['counter_type'], data['counter_unit'])
new_resource = {'f:resource_id': data['resource_id'],
'f:project_id': data['project_id'],
@ -197,12 +206,7 @@ class Connection(base.Connection):
# store meters with prefix "m_"
'f:m_%s' % new_meter: "1"
}
# store metadata fields with prefix "r_"
if data['resource_metadata']:
resource_metadata = dict(('f:r_%s' % k, v)
for (k, v)
in data['resource_metadata'].iteritems())
new_resource.update(resource_metadata)
new_resource.update(resource_metadata)
# Update if resource has new information
if new_resource != resource:
@ -240,6 +244,8 @@ class Connection(base.Connection):
# add in reversed_ts here for time range scan
'f:rts': str(rts)
}
# Need to record resource_metadata for more robust filtering.
record.update(resource_metadata)
# Don't want to be changing the original data object.
data = copy.copy(data)
data['timestamp'] = ts
@ -301,12 +307,10 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
def make_resource(data, first_ts, last_ts):
def make_resource(data, first_ts, last_ts, meter_refs):
"""Transform HBase fields to Resource model."""
# convert HBase metadata e.g. f:r_display_name to display_name
data['f:metadata'] = dict((k[4:], v)
for k, v in data.iteritems()
if k.startswith('f:r_'))
data['f:metadata'] = _metadata_from_document(data)
return models.Resource(
resource_id=data['f:resource_id'],
@ -317,13 +321,10 @@ class Connection(base.Connection):
user_id=data['f:user_id'],
metadata=data['f:metadata'],
meter=[
models.ResourceMeter(*(m[4:].split("!")))
for m in data
if m.startswith('f:m_')
models.ResourceMeter(*(m.split("!")))
for m in meter_refs
],
)
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
q, start_row, stop_row = make_query(user=user,
@ -340,32 +341,40 @@ class Connection(base.Connection):
meters = meter_table.scan(filter=q, row_start=start_row,
row_stop=stop_row)
resources = {}
for resource_id, r_meters in itertools.groupby(
meters, key=lambda x: x[1]['f:resource_id']):
timestamps = tuple(timeutils.parse_strtime(m[1]['f:timestamp'])
for m in r_meters)
resources[resource_id] = (min(timestamps), max(timestamps))
# We have to sort on resource_id before we can group by it. According
# to the itertools documentation a new group is generated when the
# value of the key function changes (it breaks there).
meters = sorted(meters, key=_resource_id_from_record_tuple)
# handle metaquery
if len(metaquery) > 0:
for ignored, data in resource_table.rows(resources.iterkeys()):
for resource_id, r_meters in itertools.groupby(
meters, key=_resource_id_from_record_tuple):
meter_rows = [data[1] for data in sorted(
r_meters, key=_timestamp_from_record_tuple)]
meter_references = [
_format_meter_reference(m['f:counter_name'],
m['f:counter_type'],
m['f:counter_unit'])
for m in meter_rows]
latest_data = meter_rows[-1]
min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
if metaquery:
for k, v in metaquery.iteritems():
# if metaquery matches, yield the resource model
# e.g. metaquery: metadata.display_name
# equals
# HBase: f:r_display_name
if data['f:r_' + k.split('.', 1)[1]] == v:
if latest_data['f:r_' + k.split('.', 1)[1]] == v:
yield make_resource(
data,
resources[data['f:resource_id']][0],
resources[data['f:resource_id']][1])
else:
for ignored, data in resource_table.rows(resources.iterkeys()):
latest_data,
min_ts,
max_ts,
meter_references
)
else:
yield make_resource(
data,
resources[data['f:resource_id']][0],
resources[data['f:resource_id']][1])
latest_data,
min_ts,
max_ts,
meter_references
)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}, pagination=None):
@ -894,3 +903,29 @@ def _load_hbase_list(d, prefix):
for key in (k for k in d if k.startswith(prefix)):
ret.append(key[len(prefix):])
return ret
def _format_meter_reference(counter_name, counter_type, counter_unit):
"""Format reference to meter data.
"""
return "%s!%s!%s" % (counter_name, counter_type, counter_unit)
def _metadata_from_document(doc):
    """Extract resource metadata from an HBase document.

    HBase stores metadata fields under the implementation-specific
    'f:r_' column prefix; strip that prefix to recover the plain keys.
    """
    prefix = 'f:r_'
    metadata = {}
    for key, value in doc.iteritems():
        if key.startswith(prefix):
            metadata[key[len(prefix):]] = value
    return metadata
def _timestamp_from_record_tuple(record):
    """Parse and return the timestamp stored in an HBase (row, data) tuple.
    """
    row_data = record[1]
    return timeutils.parse_strtime(row_data['f:timestamp'])
def _resource_id_from_record_tuple(record):
"""Extract resource_id from HBase tuple record
"""
return record[1]['f:resource_id']

View File

@ -298,6 +298,9 @@ class Connection(base.Connection):
return value;
}""")
SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'),
'asc': (pymongo.ASCENDING, '$gt')}
def __init__(self, conf):
url = conf.database.connection
@ -488,15 +491,7 @@ class Connection(base.Connection):
:return: sort parameters, query to use
"""
all_sort = []
sort_mapping = {'desc': (pymongo.DESCENDING, '$lt'),
'asc': (pymongo.ASCENDING, '$gt')
}
_sort_dir, _sort_flag = sort_mapping.get(sort_dir,
sort_mapping['desc'])
for _sort_key in sort_keys:
_all_sort = (_sort_key, _sort_dir)
all_sort.append(_all_sort)
all_sort, _op = cls._build_sort_instructions(sort_keys, sort_dir)
if marker is not None:
sort_criteria_list = []
@ -504,7 +499,7 @@ class Connection(base.Connection):
for i in range(0, len(sort_keys)):
sort_criteria_list.append(cls._recurse_sort_keys(
sort_keys[:(len(sort_keys) - i)],
marker, _sort_flag))
marker, _op))
metaquery = {"$or": sort_criteria_list}
else:
@ -512,6 +507,27 @@ class Connection(base.Connection):
return all_sort, metaquery
@classmethod
def _build_sort_instructions(cls, sort_keys=None, sort_dir='desc'):
    """Return a sort_instruction and paging operator.

    Sort instructions are used in the query to determine what attributes
    to sort on and what direction to use.

    :param sort_keys: array of attributes by which results are sorted.
    :param sort_dir: direction in which results are sorted (asc, desc);
                     any unrecognized value falls back to 'desc'.
    :return: (sort instructions, paging comparison operator) tuple.
    """
    # Avoid the shared-mutable-default pitfall: bind a fresh list per call.
    if sort_keys is None:
        sort_keys = []
    _sort_dir, operation = cls.SORT_OPERATION_MAPPING.get(
        sort_dir, cls.SORT_OPERATION_MAPPING['desc'])
    sort_instructions = [(_sort_key, _sort_dir) for _sort_key in sort_keys]
    return sort_instructions, operation
@classmethod
def paginate_query(cls, q, db_collection, limit=None, marker=None,
sort_keys=[], sort_dir='desc'):
@ -615,8 +631,12 @@ class Connection(base.Connection):
if ts_range:
q['timestamp'] = ts_range
sort_keys = base._handle_sort_key('resource')
sort_instructions = self._build_sort_instructions(sort_keys)[0]
aggregate = self.db.meter.aggregate([
{"$match": q},
{"$sort": dict(sort_instructions)},
{"$group": {
"_id": "$resource_id",
"user_id": {"$first": "$user_id"},

View File

@ -46,7 +46,6 @@ from ceilometer.storage.sqlalchemy.models import UniqueName
from ceilometer.storage.sqlalchemy.models import User
from ceilometer import utils
LOG = log.getLogger(__name__)
@ -111,7 +110,7 @@ def make_query_from_filter(query, sample_filter, require_meter=True):
if sample_filter.meter:
query = query.filter(Meter.counter_name == sample_filter.meter)
elif require_meter:
raise RuntimeError('Missing required meter specifier')
raise RuntimeError(_('Missing required meter specifier'))
if sample_filter.source:
query = query.filter(Meter.sources.any(id=sample_filter.source))
if sample_filter.start:
@ -134,7 +133,7 @@ def make_query_from_filter(query, sample_filter, require_meter=True):
query = query.filter_by(resource_id=sample_filter.resource)
if sample_filter.metaquery:
raise NotImplementedError('metaquery not implemented')
raise NotImplementedError(_('metaquery not implemented'))
return query
@ -289,35 +288,75 @@ class Connection(base.Connection):
:param pagination: Optional pagination query.
"""
# We probably want to raise these early, since we don't know from here
# if they will be handled. We don't want extra wait or work for it to
# just fail.
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
if metaquery:
raise NotImplementedError(_('metaquery not implemented'))
# (thomasm) We need to get the max timestamp first, since that's the
# most accurate. We also need to filter down in the subquery to
# constrain what we have to JOIN on later.
session = sqlalchemy_session.get_session()
query = session.query(
Meter,
func.min(Meter.timestamp),
func.max(Meter.timestamp),
ts_subquery = session.query(
Meter.resource_id,
func.max(Meter.timestamp).label("max_ts"),
func.min(Meter.timestamp).label("min_ts")
).group_by(Meter.resource_id)
if user is not None:
query = query.filter(Meter.user_id == user)
if source is not None:
query = query.filter(Meter.sources.any(id=source))
# Here are the basic 'eq' operation filters for the sample data.
for column, value in [(Meter.resource_id, resource),
(Meter.user_id, user),
(Meter.project_id, project)]:
if value:
ts_subquery = ts_subquery.filter(column == value)
if source:
ts_subquery = ts_subquery.filter(
Meter.sources.any(id=source))
# Here we limit the samples being used to a specific time period,
# if requested.
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(Meter.timestamp > start_timestamp)
ts_subquery = ts_subquery.filter(
Meter.timestamp > start_timestamp
)
else:
query = query.filter(Meter.timestamp >= start_timestamp)
ts_subquery = ts_subquery.filter(
Meter.timestamp >= start_timestamp
)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(Meter.timestamp <= end_timestamp)
ts_subquery = ts_subquery.filter(
Meter.timestamp <= end_timestamp
)
else:
query = query.filter(Meter.timestamp < end_timestamp)
if project is not None:
query = query.filter(Meter.project_id == project)
if resource is not None:
query = query.filter(Meter.resource_id == resource)
if metaquery:
raise NotImplementedError('metaquery not implemented')
ts_subquery = ts_subquery.filter(
Meter.timestamp < end_timestamp
)
ts_subquery = ts_subquery.subquery()
# Now we need to get the max Meter.id out of the leftover results, to
# break any ties.
agg_subquery = session.query(
func.max(Meter.id).label("max_id"),
ts_subquery
).filter(
Meter.resource_id == ts_subquery.c.resource_id,
Meter.timestamp == ts_subquery.c.max_ts
).group_by(Meter.resource_id).subquery()
query = session.query(
Meter,
agg_subquery.c.min_ts,
agg_subquery.c.max_ts
).filter(
Meter.id == agg_subquery.c.max_id
)
for meter, first_ts, last_ts in query.all():
yield api_models.Resource(
@ -353,6 +392,8 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
if metaquery:
raise NotImplementedError(_('metaquery not implemented'))
session = sqlalchemy_session.get_session()
@ -388,8 +429,6 @@ class Connection(base.Connection):
query = query.filter(Resource.id == resource)
if project is not None:
query = query.filter(Resource.project_id == project)
if metaquery:
raise NotImplementedError('metaquery not implemented')
for resource, meter in query.all():
yield api_models.Meter(
@ -501,7 +540,7 @@ class Connection(base.Connection):
for group in groupby:
if group not in ['user_id', 'project_id', 'resource_id']:
raise NotImplementedError(
"Unable to group by these fields")
_("Unable to group by these fields"))
if not period:
for res in self._make_stats_query(sample_filter, groupby):

View File

@ -58,4 +58,5 @@ class BaseTest(test_base.TestCase):
self.assertEqual(sort_keys_meter, ['foo', 'user_id', 'project_id'])
sort_keys_resource = base._handle_sort_key('resource', 'project_id')
self.assertEqual(sort_keys_resource, ['project_id', 'user_id'])
self.assertEquals(sort_keys_resource,
['project_id', 'user_id', 'timestamp'])

View File

@ -36,6 +36,28 @@ load_tests = testscenarios.load_tests_apply_scenarios
class DBTestBase(tests_db.TestBase):
def create_and_store_sample(self, timestamp=None, metadata=None,
                            name='instance',
                            sample_type=sample.TYPE_CUMULATIVE, unit='',
                            volume=1, user_id='user-id',
                            project_id='project-id',
                            resource_id='resource-id', source=None):
    """Record one metering sample and return the published message dict.

    :param timestamp: sample timestamp; defaults to the current UTC time
                      evaluated at call time.
    :param metadata: resource metadata dict; defaults to a fresh
                     test-server dict per call.
    :return: the message dict handed to the storage driver.
    """
    # Evaluate defaults at call time. The previous defaults were a
    # definition-time utcnow() (frozen at import) and a shared mutable
    # dict (mutations would leak between calls).
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()
    if metadata is None:
        metadata = {'display_name': 'test-server',
                    'tag': 'self.counter'}
    s = sample.Sample(
        name, sample_type, unit=unit, volume=volume, user_id=user_id,
        project_id=project_id, resource_id=resource_id,
        timestamp=timestamp,
        resource_metadata=metadata, source=source
    )
    msg = rpc.meter_message_from_counter(
        s, cfg.CONF.publisher_rpc.metering_secret
    )
    self.conn.record_metering_data(msg)
    return msg
def setUp(self):
super(DBTestBase, self).setUp()
self.prepare_data()
@ -48,6 +70,7 @@ class DBTestBase(tests_db.TestBase):
original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41),
(2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42),
(2012, 7, 2, 10, 43)]
timestamps_for_test_samples_default_order = [(2012, 7, 2, 10, 44),
(2011, 5, 30, 18, 3),
(2012, 12, 1, 1, 25),
@ -57,114 +80,46 @@ class DBTestBase(tests_db.TestBase):
timestamps_for_test_samples_default_order)
self.msgs = []
c = sample.Sample(
'instance',
sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='user-id',
project_id='project-id',
resource_id='resource-id',
self.msgs.append(self.create_and_store_sample(
timestamp=datetime.datetime(2012, 7, 2, 10, 39),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter',
},
source='test-1',
source='test-1')
)
self.msg0 = rpc.meter_message_from_counter(
c,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(self.msg0)
self.msgs.append(self.msg0)
self.counter = sample.Sample(
'instance',
sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='user-id',
project_id='project-id',
resource_id='resource-id',
self.msgs.append(self.create_and_store_sample(
timestamp=datetime.datetime(*timestamp_list[0]),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter',
},
source='test-1',
source='test-1')
)
self.msg1 = rpc.meter_message_from_counter(
self.counter,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(self.msg1)
self.msgs.append(self.msg1)
self.counter2 = sample.Sample(
'instance',
sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='user-id',
project_id='project-id',
resource_id='resource-id-alternate',
self.msgs.append(self.create_and_store_sample(
timestamp=datetime.datetime(*timestamp_list[1]),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter2',
},
source='test-2',
)
self.msg2 = rpc.meter_message_from_counter(
self.counter2,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(self.msg2)
self.msgs.append(self.msg2)
self.counter3 = sample.Sample(
'instance',
sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='user-id-alternate',
project_id='project-id',
resource_id='resource-id-alternate',
metadata={'display_name': 'test-server', 'tag': 'self.counter2'},
source='test-2')
)
self.msgs.append(self.create_and_store_sample(
timestamp=datetime.datetime(*timestamp_list[2]),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter3',
},
source='test-3',
resource_id='resource-id-alternate',
user_id='user-id-alternate',
metadata={'display_name': 'test-server', 'tag': 'self.counter3'},
source='test-3')
)
self.msg3 = rpc.meter_message_from_counter(
self.counter3,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(self.msg3)
self.msgs.append(self.msg3)
start_idx = 3
end_idx = len(timestamp_list)
for i, ts in zip(range(start_idx - 1, end_idx - 1),
timestamp_list[start_idx:end_idx]):
c = sample.Sample(
'instance',
sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='user-id-%s' % i,
project_id='project-id-%s' % i,
resource_id='resource-id-%s' % i,
timestamp=datetime.datetime(*ts),
resource_metadata={'display_name': 'test-server',
'tag': 'counter-%s' % i},
source='test',
self.msgs.append(
self.create_and_store_sample(
timestamp=datetime.datetime(*ts),
user_id='user-id-%s' % i,
project_id='project-id-%s' % i,
resource_id='resource-id-%s' % i,
metadata={
'display_name': 'test-server',
'tag': 'counter-%s' % i
},
source='test')
)
msg = rpc.meter_message_from_counter(
c,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(msg)
self.msgs.append(msg)
class UserTest(DBTestBase,
@ -179,7 +134,7 @@ class UserTest(DBTestBase,
def test_get_users_by_source(self):
users = self.conn.get_users(source='test-1')
assert list(users) == ['user-id']
self.assertEqual(list(users), ['user-id'])
class ProjectTest(DBTestBase,
@ -195,7 +150,7 @@ class ProjectTest(DBTestBase,
def test_get_projects_by_source(self):
projects = self.conn.get_projects(source='test-1')
expected = ['project-id']
assert list(projects) == expected
self.assertEqual(list(projects), expected)
class ResourceTest(DBTestBase,
@ -204,13 +159,6 @@ class ResourceTest(DBTestBase,
def test_get_resources(self):
expected_first_sample_timestamp = datetime.datetime(2012, 7, 2, 10, 39)
expected_last_sample_timestamp = datetime.datetime(2012, 7, 2, 10, 40)
#note(sileht): This is not normal, all backends should
# the same data...
if cfg.CONF.database.connection.startswith('db2://'):
expected_first_sample_timestamp = None
expected_last_sample_timestamp = None
msgs_sources = [msg['source'] for msg in self.msgs]
resources = list(self.conn.get_resources())
self.assertEqual(len(resources), 9)
@ -221,11 +169,11 @@ class ResourceTest(DBTestBase,
expected_first_sample_timestamp)
self.assertEqual(resource.last_sample_timestamp,
expected_last_sample_timestamp)
assert resource.resource_id == 'resource-id'
assert resource.project_id == 'project-id'
self.assertEqual(resource.resource_id, 'resource-id')
self.assertEqual(resource.project_id, 'project-id')
self.assertIn(resource.source, msgs_sources)
assert resource.user_id == 'user-id'
assert resource.metadata['display_name'] == 'test-server'
self.assertEqual(resource.user_id, 'user-id')
self.assertEqual(resource.metadata['display_name'], 'test-server')
self.assertIn(models.ResourceMeter('instance', 'cumulative', ''),
resource.meter)
break
@ -279,53 +227,54 @@ class ResourceTest(DBTestBase,
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2'])
self.assertEqual(set(resource_ids), set(['resource-id-2']))
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='ge',
end_timestamp_op='lt'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2'])
self.assertEqual(set(resource_ids), set(['resource-id-2']))
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='gt',
end_timestamp_op='lt'))
resource_ids = [r.resource_id for r in resources]
assert len(resource_ids) == 0
self.assertEqual(len(resource_ids), 0)
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='gt',
end_timestamp_op='le'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-3'])
self.assertEqual(set(resource_ids), set(['resource-id-3']))
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts,
start_timestamp_op='ge',
end_timestamp_op='le'))
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2', 'resource-id-3'])
self.assertEqual(set(resource_ids),
set(['resource-id-2', 'resource-id-3']))
def test_get_resources_by_source(self):
resources = list(self.conn.get_resources(source='test-1'))
assert len(resources) == 1
self.assertEqual(len(resources), 1)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id'])
self.assertEqual(ids, set(['resource-id']))
def test_get_resources_by_user(self):
resources = list(self.conn.get_resources(user='user-id'))
assert len(resources) == 2
self.assertEqual(len(resources), 2)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id', 'resource-id-alternate'])
self.assertEqual(ids, set(['resource-id', 'resource-id-alternate']))
def test_get_resources_by_project(self):
resources = list(self.conn.get_resources(project='project-id'))
assert len(resources) == 2
self.assertEqual(len(resources), 2)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id', 'resource-id-alternate'])
self.assertEqual(ids, set(['resource-id', 'resource-id-alternate']))
def test_get_resources_by_metaquery(self):
q = {'metadata.display_name': 'test-server'}
@ -336,6 +285,22 @@ class ResourceTest(DBTestBase,
resources = list(self.conn.get_resources(metaquery={}))
self.assertEqual(len(resources), 9)
def test_get_resources_most_recent_metadata_all(self):
    """Every resource should report the metadata of its newest sample."""
    expected_tags = set(['self.counter', 'self.counter3', 'counter-2',
                         'counter-3', 'counter-4', 'counter-5',
                         'counter-6', 'counter-7', 'counter-8'])
    for resource in self.conn.get_resources():
        self.assertIn(resource.metadata['tag'], expected_tags)
def test_get_resources_most_recent_metadata_single(self):
    """A single filtered resource should carry its latest sample's tag."""
    resources = list(
        self.conn.get_resources(resource='resource-id-alternate'))
    self.assertEqual(resources[0].metadata['tag'], 'self.counter3')
class ResourceTestPagination(DBTestBase,
tests_db.MixinTestsWithBackendScenarios):
@ -379,6 +344,57 @@ class ResourceTestPagination(DBTestBase,
[i.resource_id for i in results])
class ResourceTestOrdering(DBTestBase,
tests_db.MixinTestsWithBackendScenarios):
def prepare_data(self):
    """Store five samples per resource with deliberately shuffled timestamps.

    The per-resource timestamp lists are out of order so the ordering
    tests can verify that get_resources() picks the newest sample.
    """
    sample_timings = [('resource-id-1', [(2013, 8, 10, 10, 43),
                                         (2013, 8, 10, 10, 44),
                                         (2013, 8, 10, 10, 42),
                                         (2013, 8, 10, 10, 49),
                                         (2013, 8, 10, 10, 47)]),
                      ('resource-id-2', [(2013, 8, 10, 10, 43),
                                         (2013, 8, 10, 10, 48),
                                         (2013, 8, 10, 10, 42),
                                         (2013, 8, 10, 10, 48),
                                         (2013, 8, 10, 10, 47)]),
                      ('resource-id-3', [(2013, 8, 10, 10, 43),
                                         (2013, 8, 10, 10, 44),
                                         (2013, 8, 10, 10, 50),
                                         (2013, 8, 10, 10, 49),
                                         (2013, 8, 10, 10, 47)])]
    # Flatten to (resource, timestamp) pairs; enumerate gives each
    # sample a unique, monotonically increasing counter value.
    flattened = ((res, ts)
                 for res, stamps in sample_timings
                 for ts in stamps)
    for counter, (res, ts) in enumerate(flattened):
        self.create_and_store_sample(
            timestamp=datetime.datetime(*ts),
            resource_id=res,
            user_id=str(counter % 2),
            project_id=str(counter % 3),
            metadata={
                'display_name': 'test-server',
                'tag': 'sample-%s' % counter
            },
            source='test'
        )
def test_get_resources_ordering_all(self):
    """Each resource's metadata must come from its newest sample."""
    expected = set([
        ('resource-id-1', 'sample-3'),
        ('resource-id-2', 'sample-8'),
        ('resource-id-3', 'sample-12')
    ])
    received = set((res.resource_id, res.metadata['tag'])
                   for res in self.conn.get_resources())
    self.assertEqual(received, expected)
def test_get_resources_ordering_single(self):
    """Filtering on one resource should still yield its newest metadata."""
    matches = list(self.conn.get_resources(resource='resource-id-2'))
    resource = matches[0]
    self.assertEqual(resource.resource_id, 'resource-id-2')
    self.assertEqual(resource.metadata['tag'], 'sample-8')
class MeterTest(DBTestBase,
tests_db.MixinTestsWithBackendScenarios):
@ -391,11 +407,11 @@ class MeterTest(DBTestBase,
def test_get_meters_by_user(self):
results = list(self.conn.get_meters(user='user-id'))
assert len(results) == 1
self.assertEqual(len(results), 1)
def test_get_meters_by_project(self):
results = list(self.conn.get_meters(project='project-id'))
assert len(results) == 2
self.assertEqual(len(results), 2)
def test_get_meters_by_metaquery(self):
q = {'metadata.display_name': 'test-server'}
@ -483,7 +499,7 @@ class RawSampleTest(DBTestBase,
results = list(self.conn.get_samples(f))
self.assertEqual(len(results), 3)
for meter in results:
assert meter.as_dict() in [self.msg0, self.msg1, self.msg2]
self.assertIn(meter.as_dict(), self.msgs[:3])
def test_get_samples_by_user_limit(self):
f = storage.SampleFilter(user='user-id')
@ -500,8 +516,7 @@ class RawSampleTest(DBTestBase,
results = list(self.conn.get_samples(f))
assert results
for meter in results:
assert meter.as_dict() in [self.msg0, self.msg1,
self.msg2, self.msg3]
self.assertIn(meter.as_dict(), self.msgs[:4])
def test_get_samples_by_resource(self):
f = storage.SampleFilter(user='user-id', resource='resource-id')
@ -509,7 +524,7 @@ class RawSampleTest(DBTestBase,
assert results
meter = results[1]
assert meter is not None
self.assertEqual(meter.as_dict(), self.msg0)
self.assertEqual(meter.as_dict(), self.msgs[0])
def test_get_samples_by_metaquery(self):
q = {'metadata.display_name': 'test-server'}
@ -517,7 +532,7 @@ class RawSampleTest(DBTestBase,
results = list(self.conn.get_samples(f))
assert results
for meter in results:
assert meter.as_dict() in self.msgs
self.assertIn(meter.as_dict(), self.msgs)
def test_get_samples_by_start_time(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 41)
@ -527,17 +542,17 @@ class RawSampleTest(DBTestBase,
)
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == timestamp
self.assertEqual(len(results), 1)
self.assertEqual(results[0].timestamp, timestamp)
f.start_timestamp_op = 'ge'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == timestamp
self.assertEqual(len(results), 1)
self.assertEqual(results[0].timestamp, timestamp)
f.start_timestamp_op = 'gt'
results = list(self.conn.get_samples(f))
assert len(results) == 0
self.assertEqual(len(results), 0)
def test_get_samples_by_end_time(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 40)
@ -568,32 +583,32 @@ class RawSampleTest(DBTestBase,
)
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == start_ts
self.assertEqual(len(results), 1)
self.assertEqual(results[0].timestamp, start_ts)
f.start_timestamp_op = 'gt'
f.end_timestamp_op = 'lt'
results = list(self.conn.get_samples(f))
assert len(results) == 0
self.assertEqual(len(results), 0)
f.start_timestamp_op = 'ge'
f.end_timestamp_op = 'lt'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == start_ts
self.assertEqual(len(results), 1)
self.assertEqual(results[0].timestamp, start_ts)
f.start_timestamp_op = 'gt'
f.end_timestamp_op = 'le'
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0].timestamp == end_ts
self.assertEqual(len(results), 1)
self.assertEqual(results[0].timestamp, end_ts)
f.start_timestamp_op = 'ge'
f.end_timestamp_op = 'le'
results = list(self.conn.get_samples(f))
assert len(results) == 2
assert results[0].timestamp == end_ts
assert results[1].timestamp == start_ts
self.assertEqual(len(results), 2)
self.assertEqual(results[0].timestamp, end_ts)
self.assertEqual(results[1].timestamp, start_ts)
def test_get_samples_by_name(self):
f = storage.SampleFilter(user='user-id', meter='no-such-meter')
@ -701,12 +716,12 @@ class StatisticsTest(DBTestBase,
self.assertEqual(results.duration,
(datetime.datetime(2012, 9, 25, 12, 32)
- datetime.datetime(2012, 9, 25, 10, 30)).seconds)
assert results.count == 3
assert results.unit == 'GiB'
assert results.min == 8
assert results.max == 10
assert results.sum == 27
assert results.avg == 9
self.assertEqual(results.count, 3)
self.assertEqual(results.unit, 'GiB')
self.assertEqual(results.min, 8)
self.assertEqual(results.max, 10)
self.assertEqual(results.sum, 27)
self.assertEqual(results.avg, 9)
def test_no_period_in_query(self):
f = storage.SampleFilter(
@ -714,15 +729,15 @@ class StatisticsTest(DBTestBase,
meter='volume.size',
)
results = list(self.conn.get_meter_statistics(f))[0]
assert results.period == 0
self.assertEqual(results.period, 0)
def test_period_is_int(self):
f = storage.SampleFilter(
meter='volume.size',
)
results = list(self.conn.get_meter_statistics(f))[0]
assert(isinstance(results.period, int))
assert results.count == 6
self.assertIs(type(results.period), int)
self.assertEqual(results.count, 6)
def test_by_user_period(self):
f = storage.SampleFilter(
@ -834,12 +849,12 @@ class StatisticsTest(DBTestBase,
)
results = list(self.conn.get_meter_statistics(f))[0]
self.assertEqual(results.duration, 0)
assert results.count == 1
assert results.unit == 'GiB'
assert results.min == 6
assert results.max == 6
assert results.sum == 6
assert results.avg == 6
self.assertEqual(results.count, 1)
self.assertEqual(results.unit, 'GiB')
self.assertEqual(results.min, 6)
self.assertEqual(results.max, 6)
self.assertEqual(results.sum, 6)
self.assertEqual(results.avg, 6)
def test_one_resource(self):
f = storage.SampleFilter(
@ -850,12 +865,12 @@ class StatisticsTest(DBTestBase,
self.assertEqual(results.duration,
(datetime.datetime(2012, 9, 25, 12, 32)
- datetime.datetime(2012, 9, 25, 10, 30)).seconds)
assert results.count == 3
assert results.unit == 'GiB'
assert results.min == 5
assert results.max == 7
assert results.sum == 18
assert results.avg == 6
self.assertEqual(results.count, 3)
self.assertEqual(results.unit, 'GiB')
self.assertEqual(results.min, 5)
self.assertEqual(results.max, 7)
self.assertEqual(results.sum, 18)
self.assertEqual(results.avg, 6)
class StatisticsGroupByTest(DBTestBase,