diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 1f995c540..a06f16f74 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -16,36 +16,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Openstack Ceilometer HBase storage backend - -.. note:: - This driver is designed to enable Ceilometer store its data in HBase. - The implementation is using HBase Thrift interface so it's necessary to have - the HBase Thrift server installed and started: - (https://ccp.cloudera.com/display/CDHDOC/HBase+Installation) - - This driver has been tested against HBase 0.92.1/CDH 4.1.1, - HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache. - Versions earlier than 0.92.1 are not supported due to feature - incompatibility. - - Due to limitations of HBase the driver implements its own data aggregations - which may harm its performance. It is likely that the performance could be - improved if co-processors were used, however at the moment the co-processor - support is not exposed through Thrift API. - - The following four tables are expected to exist in HBase: - create 'project', {NAME=>'f'} - create 'user', {NAME=>'f'} - create 'resource', {NAME=>'f'} - create 'meter', {NAME=>'f'} - - The driver is using HappyBase which is a wrapper library used to interact - with HBase via Thrift protocol: - http://happybase.readthedocs.org/en/latest/index.html# - +"""HBase storage backend """ - +from sets import Set from urlparse import urlparse import json import hashlib @@ -223,16 +196,24 @@ class Connection(base.Connection): project['f:s_%s' % data['source']] = "1" self.project.put(data['project_id'], project) + rts = reverse_timestamp(data['timestamp']) + resource = self.resource.row(data['resource_id']) new_meter = "%s!%s!%s" % ( data['counter_name'], data['counter_type'], data['counter_unit']) new_resource = {'f:resource_id': data['resource_id'], 'f:project_id': data['project_id'], 'f:user_id': data['user_id'], - 'f:metadata': json.dumps(data['resource_metadata']), 'f:source': data["source"], - 'f:m_%s' % new_meter: "1", + # store meters with prefix "m_" + 'f:m_%s' % new_meter: "1" } + # store metadata fields with prefix "r_" + resource_metadata = dict(('f:r_%s' % k, v) + for (k, v) + in data['resource_metadata'].iteritems()) + new_resource.update(resource_metadata) + # Update if resource has new information if new_resource != resource: meters = _load_hbase_list(resource, 'm') @@ -249,7 +230,6 @@ class Connection(base.Connection): # We use reverse timestamps in rowkeys as they are sorted # alphabetically. - rts = reverse_timestamp(data['timestamp']) row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) # Convert timestamp to string as json.dumps won't @@ -309,34 +289,22 @@ class Connection(base.Connection): :param source: Optional source filter. :param start_timestamp: Optional modified timestamp start range. :param end_timestamp: Optional modified timestamp end range. + :param metaquery: Optional dict with metadata to match on. """ - q, start_row, end_row = make_query(user=user, - project=project, - source=source, - start=start_timestamp, - end=end_timestamp, - require_meter=False) - LOG.debug("q: %s" % q) - # TODO implement metaquery support - if len(metaquery) > 0: - raise NotImplementedError('metaquery not implemented') + def make_resource(data): + """ transform HBase fields to Resource model + """ + # convert HBase metadata e.g. f:r_display_name to display_name + data['f:metadata'] = dict((k[4:], v) + for k, v in data.iteritems() + if k.startswith('f:r_')) - resource_ids = {} - g = self.meter.scan(filter=q, row_start=start_row, - row_stop=end_row) - for ignored, data in g: - resource_ids[data['f:resource_id']] = data['f:resource_id'] - - q = make_query(user=user, project=project, source=source, - query_only=True, require_meter=False) - LOG.debug("q: %s" % q) - for resource_id, data in self.resource.rows(resource_ids): - yield models.Resource( - resource_id=resource_id, + return models.Resource( + resource_id=data['f:resource_id'], project_id=data['f:project_id'], source=data['f:source'], user_id=data['f:user_id'], - metadata=json.loads(data['f:metadata']), + metadata=data['f:metadata'], meter=[ models.ResourceMeter(*(m[4:].split("!"))) for m in data @@ -344,6 +312,35 @@ class Connection(base.Connection): ], ) + q, start_row, stop_row = make_query(user=user, + project=project, + source=source, + start=start_timestamp, + end=end_timestamp, + require_meter=False, + query_only=False) + LOG.debug("Query Meter table: %s" % q) + gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row) + + # put all the resource_ids in a Set + resource_ids = Set() + for ignored, data in gen: + resource_ids.add(data['f:resource_id']) + + # handle metaquery + if len(metaquery) > 0: + for ignored, data in self.resource.rows(resource_ids): + for k, v in metaquery.iteritems(): + # if metaquery matches, yield the resource model + # e.g. metaquery: metadata.display_name + # equals + # HBase: f:r_display_name + if data['f:r_' + k.split('.', 1)[1]] == v: + yield make_resource(data) + else: + for ignored, data in self.resource.rows(resource_ids): + yield make_resource(data) + def get_meters(self, user=None, project=None, resource=None, source=None, metaquery={}): """Return an iterable of models.Meter instances @@ -354,13 +351,23 @@ class Connection(base.Connection): :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. """ - q, ignored, ignored = make_query(user=user, project=project, - resource=resource, source=source, - require_meter=False) - LOG.debug("q: %s" % q) - # TODO implement metaquery support + q = make_query(user=user, project=project, resource=resource, + source=source, require_meter=False, query_only=True) + LOG.debug("Query Resource table: %s" % q) + + # handle metaquery if len(metaquery) > 0: - raise NotImplementedError('metaquery not implemented') + meta_q = [] + for k, v in metaquery.iteritems(): + meta_q.append( + "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')" + % ('r_' + k.split('.', 1)[1], v)) + meta_q = " AND ".join(meta_q) + # join query and metaquery + if q is not None: + q += " AND " + meta_q + else: + q = meta_q # metaquery only gen = self.resource.scan(filter=q) @@ -389,15 +396,36 @@ class Connection(base.Connection): def get_samples(self, sample_filter): """Return an iterable of models.Sample instances """ + def make_sample(data): + """ transform HBase fields to Sample model + """ + data = json.loads(data['f:message']) + data['timestamp'] = timeutils.parse_strtime(data['timestamp']) + return models.Sample(**data) + q, start, stop = make_query_from_filter(sample_filter, require_meter=False) - LOG.debug("q: %s" % q) + LOG.debug("Query Meter Table: %s" % q) gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + for ignored, meter in gen: - meter = json.loads(meter['f:message']) - meter['timestamp'] = timeutils.parse_strtime(meter['timestamp']) - yield models.Sample(**meter) + # TODO (shengjie) put this implementation here because it's failing + # the test. bp hbase-meter-table-enhancement will address this + # properly. + # handle metaquery + metaquery = sample_filter.metaquery + if len(metaquery) > 0: + # metaquery checks resource table + resource = self.resource.row(meter['f:resource_id']) + + for k, v in metaquery.iteritems(): + if resource['f:r_' + k.split('.', 1)[1]] != v: + break # if one metaquery doesn't match, break + else: + yield make_sample(meter) + else: + yield make_sample(meter) def _update_meter_stats(self, stat, meter): """Do the stats calculation on a requested time bucket in stats dict @@ -660,7 +688,8 @@ def reverse_timestamp(dt): def make_query(user=None, project=None, meter=None, resource=None, source=None, start=None, end=None, require_meter=True, query_only=False): - """Return a filter query based on the selected parameters. + """Return a filter query string based on the selected parameters. + :param user: Optional user-id :param project: Optional project-id :param meter: Optional counter-name @@ -687,23 +716,19 @@ def make_query(user=None, project=None, meter=None, if source: q.append("SingleColumnValueFilter " "('f', 'source', =, 'binary:%s')" % source) - # when start_time and end_time is provided, - # if it's filtered by meter, - # rowkey will be used in the query; - # if it's non meter filter query(eg. project_id, user_id etc), - # SingleColumnValueFilter against rts will be appended to the query - # query other tables should have no start and end passed in - stopRow, startRow = "", "" + + start_row, end_row = "", "" rts_start = str(reverse_timestamp(start) + 1) if start else "" rts_end = str(reverse_timestamp(end) + 1) if end else "" + # when start_time and end_time is provided, + # if it's filtered by meter, + # rowkey will be used in the query; + # else it's non meter filter query(e.g. project_id, user_id etc), + # SingleColumnValueFilter against rts will be appended to the query + # query other tables should have no start and end passed in if meter: - # if it's meter filter without start and end, - # startRow = meter while stopRow = meter + MAX_BYTE - if not rts_start: - rts_start = chr(127) - stopRow = "%s_%s" % (meter, rts_start) - startRow = "%s_%s" % (meter, rts_end) + start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end) elif require_meter: raise RuntimeError('Missing required meter specifier') else: @@ -717,10 +742,11 @@ def make_query(user=None, project=None, meter=None, sample_filter = None if len(q): sample_filter = " AND ".join(q) + if query_only: return sample_filter else: - return sample_filter, startRow, stopRow + return sample_filter, start_row, end_row def make_query_from_filter(sample_filter, require_meter=True): @@ -730,16 +756,24 @@ def make_query_from_filter(sample_filter, require_meter=True): :param require_meter: If true and the filter does not have a meter, raise an error. """ - if sample_filter.metaquery is not None and \ - len(sample_filter.metaquery) > 0: - raise NotImplementedError('metaquery not implemented') - return make_query(sample_filter.user, sample_filter.project, sample_filter.meter, sample_filter.resource, sample_filter.source, sample_filter.start, sample_filter.end, require_meter) +def _make_rowkey_scan(meter, rts_start=None, rts_end=None): + """ if it's meter filter without start and end, + start_row = meter while end_row = meter + MAX_BYTE + """ + if not rts_start: + rts_start = chr(127) + end_row = "%s_%s" % (meter, rts_start) + start_row = "%s_%s" % (meter, rts_end) + + return start_row, end_row + + def _load_hbase_list(d, prefix): """Deserialise dict stored as HBase column family """ diff --git a/tests/api/v1/test_impl_hbase.py b/tests/api/v1/test_impl_hbase.py index d5d666481..eec44c3a3 100644 --- a/tests/api/v1/test_impl_hbase.py +++ b/tests/api/v1/test_impl_hbase.py @@ -34,6 +34,10 @@ class TestListEvents(list_events.TestListEvents): database_connection = 'hbase://__test__' +class TestListEventsMetaQuery(list_events.TestListEventsMetaquery): + database_connection = 'hbase://__test__' + + class TestListEmptyMeters(list_meters.TestListEmptyMeters): database_connection = 'hbase://__test__' @@ -42,6 +46,10 @@ class TestListMeters(list_meters.TestListMeters): database_connection = 'hbase://__test__' +class TestListMetersMetaquery(list_meters.TestListMetersMetaquery): + database_connection = 'hbase://__test__' + + class TestListEmptyUsers(list_users.TestListEmptyUsers): database_connection = 'hbase://__test__' @@ -70,6 +78,10 @@ class TestListResources(list_resources.TestListResources): database_connection = 'hbase://__test__' +class TestListResourcesMetaquery(list_resources.TestListResourcesMetaquery): + database_connection = 'hbase://__test__' + + class TestListSource(list_sources.TestListSource): database_connection = 'hbase://__test__'