Add first and last sample timestamp
When listing resources, it's useful to know where it started and where it stopped. We don't have this precise information, but we can at least say when we get the first and last timestamp. Blueprint: api-v2-improvement Change-Id: Icf678820bf7c3e6e12288b15623b653d9a942f21 Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
parent
df126b7652
commit
92bed7c1cc
@ -18,9 +18,9 @@
|
||||
# under the License.
|
||||
"""HBase storage backend
|
||||
"""
|
||||
from sets import Set
|
||||
import json
|
||||
import hashlib
|
||||
import itertools
|
||||
import copy
|
||||
import datetime
|
||||
import happybase
|
||||
@ -296,7 +296,7 @@ class Connection(base.Connection):
|
||||
:param resource: Optional resource filter.
|
||||
"""
|
||||
|
||||
def make_resource(data):
|
||||
def make_resource(data, first_ts, last_ts):
|
||||
"""Transform HBase fields to Resource model."""
|
||||
# convert HBase metadata e.g. f:r_display_name to display_name
|
||||
data['f:metadata'] = dict((k[4:], v)
|
||||
@ -305,6 +305,8 @@ class Connection(base.Connection):
|
||||
|
||||
return models.Resource(
|
||||
resource_id=data['f:resource_id'],
|
||||
first_sample_timestamp=first_ts,
|
||||
last_sample_timestamp=last_ts,
|
||||
project_id=data['f:project_id'],
|
||||
source=data['f:source'],
|
||||
user_id=data['f:user_id'],
|
||||
@ -330,27 +332,35 @@ class Connection(base.Connection):
|
||||
require_meter=False,
|
||||
query_only=False)
|
||||
LOG.debug("Query Meter table: %s" % q)
|
||||
gen = meter_table.scan(filter=q, row_start=start_row,
|
||||
row_stop=stop_row)
|
||||
meters = meter_table.scan(filter=q, row_start=start_row,
|
||||
row_stop=stop_row)
|
||||
|
||||
# put all the resource_ids in a Set
|
||||
resource_ids = Set()
|
||||
for ignored, data in gen:
|
||||
resource_ids.add(data['f:resource_id'])
|
||||
resources = {}
|
||||
for resource_id, r_meters in itertools.groupby(
|
||||
meters, key=lambda x: x[1]['f:resource_id']):
|
||||
timestamps = tuple(timeutils.parse_strtime(m[1]['f:timestamp'])
|
||||
for m in r_meters)
|
||||
resources[resource_id] = (min(timestamps), max(timestamps))
|
||||
|
||||
# handle metaquery
|
||||
if len(metaquery) > 0:
|
||||
for ignored, data in resource_table.rows(resource_ids):
|
||||
for ignored, data in resource_table.rows(resources.iterkeys()):
|
||||
for k, v in metaquery.iteritems():
|
||||
# if metaquery matches, yield the resource model
|
||||
# e.g. metaquery: metadata.display_name
|
||||
# equals
|
||||
# HBase: f:r_display_name
|
||||
if data['f:r_' + k.split('.', 1)[1]] == v:
|
||||
yield make_resource(data)
|
||||
yield make_resource(
|
||||
data,
|
||||
resources[data['f:resource_id']][0],
|
||||
resources[data['f:resource_id']][1])
|
||||
else:
|
||||
for ignored, data in resource_table.rows(resource_ids):
|
||||
yield make_resource(data)
|
||||
for ignored, data in resource_table.rows(resources.iterkeys()):
|
||||
yield make_resource(
|
||||
data,
|
||||
resources[data['f:resource_id']][0],
|
||||
resources[data['f:resource_id']][1])
|
||||
|
||||
def get_meters(self, user=None, project=None, resource=None, source=None,
|
||||
metaquery={}):
|
||||
|
@ -594,6 +594,8 @@ class Connection(base.Connection):
|
||||
"user_id": {"$first": "$user_id"},
|
||||
"project_id": {"$first": "$project_id"},
|
||||
"source": {"$first": "$source"},
|
||||
"first_sample_timestamp": {"$min": "$timestamp"},
|
||||
"last_sample_timestamp": {"$max": "$timestamp"},
|
||||
"metadata": {"$first": "$resource_metadata"},
|
||||
"meters_name": {"$push": "$counter_name"},
|
||||
"meters_type": {"$push": "$counter_type"},
|
||||
@ -610,6 +612,8 @@ class Connection(base.Connection):
|
||||
resource_id=result['_id'],
|
||||
user_id=result['user_id'],
|
||||
project_id=result['project_id'],
|
||||
first_sample_timestamp=result['first_sample_timestamp'],
|
||||
last_sample_timestamp=result['last_sample_timestamp'],
|
||||
source=result['source'],
|
||||
metadata=result['metadata'],
|
||||
meter=[
|
||||
|
@ -287,7 +287,11 @@ class Connection(base.Connection):
|
||||
:param resource: Optional resource filter.
|
||||
"""
|
||||
session = sqlalchemy_session.get_session()
|
||||
query = session.query(Meter,).group_by(Meter.resource_id)
|
||||
query = session.query(
|
||||
Meter,
|
||||
func.min(Meter.timestamp),
|
||||
func.max(Meter.timestamp),
|
||||
).group_by(Meter.resource_id)
|
||||
if user is not None:
|
||||
query = query.filter(Meter.user_id == user)
|
||||
if source is not None:
|
||||
@ -309,10 +313,12 @@ class Connection(base.Connection):
|
||||
if metaquery:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
|
||||
for meter in query.all():
|
||||
for meter, first_ts, last_ts in query.all():
|
||||
yield api_models.Resource(
|
||||
resource_id=meter.resource_id,
|
||||
project_id=meter.project_id,
|
||||
first_sample_timestamp=first_ts,
|
||||
last_sample_timestamp=last_ts,
|
||||
source=meter.sources[0].id,
|
||||
user_id=meter.user_id,
|
||||
metadata=meter.resource_metadata,
|
||||
|
@ -88,12 +88,17 @@ class Resource(Model):
|
||||
"""Something for which sample data has been collected.
|
||||
"""
|
||||
|
||||
def __init__(self, resource_id, project_id, source, user_id, metadata,
|
||||
def __init__(self, resource_id, project_id,
|
||||
first_sample_timestamp,
|
||||
last_sample_timestamp,
|
||||
source, user_id, metadata,
|
||||
meter):
|
||||
"""Create a new resource.
|
||||
|
||||
:param resource_id: UUID of the resource
|
||||
:param project_id: UUID of project owning the resource
|
||||
:param first_sample_timestamp: first sample timestamp captured
|
||||
:param last_sample_timestamp: last sample timestamp captured
|
||||
:param source: the identifier for the user/project id definition
|
||||
:param user_id: UUID of user owning the resource
|
||||
:param metadata: most current metadata for the resource (a dict)
|
||||
@ -101,6 +106,8 @@ class Resource(Model):
|
||||
"""
|
||||
Model.__init__(self,
|
||||
resource_id=resource_id,
|
||||
first_sample_timestamp=first_sample_timestamp,
|
||||
last_sample_timestamp=last_sample_timestamp,
|
||||
project_id=project_id,
|
||||
source=source,
|
||||
user_id=user_id,
|
||||
|
@ -58,6 +58,27 @@ class DBTestBase(test_db.TestBase):
|
||||
timestamps_for_test_samples_default_order)
|
||||
|
||||
self.msgs = []
|
||||
c = sample.Sample(
|
||||
'instance',
|
||||
sample.TYPE_CUMULATIVE,
|
||||
unit='',
|
||||
volume=1,
|
||||
user_id='user-id',
|
||||
project_id='project-id',
|
||||
resource_id='resource-id',
|
||||
timestamp=datetime.datetime(2012, 7, 2, 10, 39),
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'self.counter',
|
||||
},
|
||||
source='test-1',
|
||||
)
|
||||
self.msg0 = rpc.meter_message_from_counter(
|
||||
c,
|
||||
cfg.CONF.publisher_rpc.metering_secret,
|
||||
)
|
||||
self.conn.record_metering_data(self.msg0)
|
||||
self.msgs.append(self.msg0)
|
||||
|
||||
self.counter = sample.Sample(
|
||||
'instance',
|
||||
sample.TYPE_CUMULATIVE,
|
||||
@ -185,6 +206,10 @@ class ResourceTest(DBTestBase):
|
||||
for resource in resources:
|
||||
if resource.resource_id != 'resource-id':
|
||||
continue
|
||||
self.assertEqual(resource.first_sample_timestamp,
|
||||
datetime.datetime(2012, 7, 2, 10, 39))
|
||||
self.assertEqual(resource.last_sample_timestamp,
|
||||
datetime.datetime(2012, 7, 2, 10, 40))
|
||||
assert resource.resource_id == 'resource-id'
|
||||
assert resource.project_id == 'project-id'
|
||||
self.assertIn(resource.source, msgs_sources)
|
||||
@ -455,9 +480,9 @@ class RawSampleTest(DBTestBase):
|
||||
def test_get_samples_by_user(self):
|
||||
f = storage.SampleFilter(user='user-id')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert len(results) == 2
|
||||
self.assertEqual(len(results), 3)
|
||||
for meter in results:
|
||||
assert meter.as_dict() in [self.msg1, self.msg2]
|
||||
assert meter.as_dict() in [self.msg0, self.msg1, self.msg2]
|
||||
|
||||
def test_get_samples_by_user_limit(self):
|
||||
f = storage.SampleFilter(user='user-id')
|
||||
@ -467,22 +492,23 @@ class RawSampleTest(DBTestBase):
|
||||
def test_get_samples_by_user_limit_bigger(self):
|
||||
f = storage.SampleFilter(user='user-id')
|
||||
results = list(self.conn.get_samples(f, limit=42))
|
||||
self.assertEqual(len(results), 2)
|
||||
self.assertEqual(len(results), 3)
|
||||
|
||||
def test_get_samples_by_project(self):
|
||||
f = storage.SampleFilter(project='project-id')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
for meter in results:
|
||||
assert meter.as_dict() in [self.msg1, self.msg2, self.msg3]
|
||||
assert meter.as_dict() in [self.msg0, self.msg1,
|
||||
self.msg2, self.msg3]
|
||||
|
||||
def test_get_samples_by_resource(self):
|
||||
f = storage.SampleFilter(user='user-id', resource='resource-id')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
meter = results[0]
|
||||
meter = results[1]
|
||||
assert meter is not None
|
||||
assert meter.as_dict() == self.msg1
|
||||
self.assertEqual(meter.as_dict(), self.msg0)
|
||||
|
||||
def test_get_samples_by_metaquery(self):
|
||||
q = {'metadata.display_name': 'test-server'}
|
||||
@ -525,16 +551,17 @@ class RawSampleTest(DBTestBase):
|
||||
)
|
||||
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert len(results) == 0
|
||||
self.assertEqual(len(results), 1)
|
||||
|
||||
f.end_timestamp_op = 'lt'
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert len(results) == 0
|
||||
self.assertEqual(len(results), 1)
|
||||
|
||||
f.end_timestamp_op = 'le'
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert len(results) == 1
|
||||
assert results[0].timestamp == timestamp
|
||||
self.assertEqual(len(results), 2)
|
||||
self.assertEqual(results[1].timestamp,
|
||||
datetime.datetime(2012, 7, 2, 10, 39))
|
||||
|
||||
def test_get_samples_by_both_times(self):
|
||||
start_ts = datetime.datetime(2012, 7, 2, 10, 42)
|
||||
@ -585,8 +612,7 @@ class RawSampleTest(DBTestBase):
|
||||
def test_get_samples_by_source(self):
|
||||
f = storage.SampleFilter(source='test-1')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
assert len(results) == 1
|
||||
self.assertEqual(len(results), 2)
|
||||
|
||||
def test_clear_metering_data(self):
|
||||
timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45)
|
||||
@ -620,7 +646,7 @@ class RawSampleTest(DBTestBase):
|
||||
|
||||
f = storage.SampleFilter(meter='instance')
|
||||
results = list(self.conn.get_samples(f))
|
||||
self.assertEqual(len(results), 10)
|
||||
self.assertEqual(len(results), 11)
|
||||
results = list(self.conn.get_users())
|
||||
self.assertEqual(len(results), 9)
|
||||
results = list(self.conn.get_projects())
|
||||
|
Loading…
x
Reference in New Issue
Block a user