diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index e80d19fb5..69ef2a6c0 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -27,7 +27,6 @@ import datetime import happybase import os import re - from oslo.config import cfg from ceilometer.openstack.common import log @@ -87,6 +86,13 @@ class Connection(base.Connection): """HBase connection. """ + _memory_instance = None + + PROJECT_TABLE = "project" + USER_TABLE = "user" + RESOURCE_TABLE = "resource" + METER_TABLE = "meter" + def __init__(self, conf): """Hbase Connection Initialization.""" opts = self._parse_connection_url(conf.database.connection) @@ -97,26 +103,17 @@ class Connection(base.Connection): if url: # Reparse URL, but from the env variable now opts = self._parse_connection_url(url) + self.conn = self._get_connection(opts) else: # This is a in-memory usage for unit tests - self.conn = MConnection() - self.project = self.conn.table(self.PROJECT_TABLE) - self.user = self.conn.table(self.USER_TABLE) - self.resource = self.conn.table(self.RESOURCE_TABLE) - self.meter = self.conn.table(self.METER_TABLE) - return - - self.conn = self._get_connection(opts) + if Connection._memory_instance is None: + LOG.debug('Creating a new in-memory HBase ' + 'Connection object') + Connection._memory_instance = MConnection() + self.conn = Connection._memory_instance + else: + self.conn = self._get_connection(opts) self.conn.open() - self.project = self.conn.table(self.PROJECT_TABLE) - self.user = self.conn.table(self.USER_TABLE) - self.resource = self.conn.table(self.RESOURCE_TABLE) - self.meter = self.conn.table(self.METER_TABLE) - - PROJECT_TABLE = "project" - USER_TABLE = "user" - RESOURCE_TABLE = "resource" - METER_TABLE = "meter" def upgrade(self, version=None): self.conn.create_table(self.PROJECT_TABLE, {'f': dict()}) @@ -178,25 +175,29 @@ class Connection(base.Connection): :param data: a dictionary such as returned by ceilometer.meter.meter_message_from_counter """ + project_table = self.conn.table(self.PROJECT_TABLE) + user_table = self.conn.table(self.USER_TABLE) + resource_table = self.conn.table(self.RESOURCE_TABLE) + meter_table = self.conn.table(self.METER_TABLE) # Make sure we know about the user and project if data['user_id']: - user = self.user.row(data['user_id']) + user = user_table.row(data['user_id']) sources = _load_hbase_list(user, 's') # Update if source is new if data['source'] not in sources: user['f:s_%s' % data['source']] = "1" - self.user.put(data['user_id'], user) + user_table.put(data['user_id'], user) - project = self.project.row(data['project_id']) + project = project_table.row(data['project_id']) sources = _load_hbase_list(project, 's') # Update if source is new if data['source'] not in sources: project['f:s_%s' % data['source']] = "1" - self.project.put(data['project_id'], project) + project_table.put(data['project_id'], project) rts = reverse_timestamp(data['timestamp']) - resource = self.resource.row(data['resource_id']) + resource = resource_table.row(data['resource_id']) new_meter = "%s!%s!%s" % ( data['counter_name'], data['counter_type'], data['counter_unit']) new_resource = {'f:resource_id': data['resource_id'], @@ -207,10 +208,11 @@ class Connection(base.Connection): 'f:m_%s' % new_meter: "1" } # store metadata fields with prefix "r_" - resource_metadata = dict(('f:r_%s' % k, v) - for (k, v) - in data['resource_metadata'].iteritems()) - new_resource.update(resource_metadata) + if data['resource_metadata']: + resource_metadata = dict(('f:r_%s' % k, v) + for (k, v) + in data['resource_metadata'].iteritems()) + new_resource.update(resource_metadata) # Update if resource has new information if new_resource != resource: @@ -218,7 +220,7 @@ class Connection(base.Connection): if new_meter not in meters: new_resource['f:m_%s' % new_meter] = "1" - self.resource.put(data['resource_id'], new_resource) + resource_table.put(data['resource_id'], new_resource) # Rowkey consists of reversed timestamp, meter and an md5 of # user+resource+project for purposes of uniqueness @@ -253,34 +255,36 @@ class Connection(base.Connection): data['timestamp'] = ts # Save original meter. record['f:message'] = json.dumps(data) - self.meter.put(row, record) + meter_table.put(row, record) def get_users(self, source=None): """Return an iterable of user id strings. :param source: Optional source filter. """ + user_table = self.conn.table(self.USER_TABLE) LOG.debug("source: %s" % source) scan_args = {} if source: scan_args['columns'] = ['f:s_%s' % source] - return sorted(key for key, ignored in self.user.scan(**scan_args)) + return sorted(key for key, ignored in user_table.scan(**scan_args)) def get_projects(self, source=None): """Return an iterable of project id strings. :param source: Optional source filter. """ + project_table = self.conn.table(self.PROJECT_TABLE) LOG.debug("source: %s" % source) scan_args = {} if source: scan_args['columns'] = ['f:s_%s' % source] - return (key for key, ignored in self.project.scan(**scan_args)) + return (key for key, ignored in project_table.scan(**scan_args)) def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, - metaquery={}): + metaquery={}, resource=None): """Return an iterable of models.Resource instances :param user: Optional ID for user that owns the resource. @@ -291,7 +295,9 @@ class Connection(base.Connection): :param end_timestamp: Optional modified timestamp end range. :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. + :param resource: Optional resource filter. """ + def make_resource(data): """Transform HBase fields to Resource model.""" # convert HBase metadata e.g. f:r_display_name to display_name @@ -312,9 +318,13 @@ class Connection(base.Connection): ], ) + resource_table = self.conn.table(self.RESOURCE_TABLE) + meter_table = self.conn.table(self.METER_TABLE) + q, start_row, stop_row = make_query(user=user, project=project, source=source, + resource=resource, start=start_timestamp, start_op=start_timestamp_op, end=end_timestamp, @@ -322,7 +332,8 @@ class Connection(base.Connection): require_meter=False, query_only=False) LOG.debug("Query Meter table: %s" % q) - gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row) + gen = meter_table.scan(filter=q, row_start=start_row, + row_stop=stop_row) # put all the resource_ids in a Set resource_ids = Set() @@ -331,7 +342,7 @@ class Connection(base.Connection): # handle metaquery if len(metaquery) > 0: - for ignored, data in self.resource.rows(resource_ids): + for ignored, data in resource_table.rows(resource_ids): for k, v in metaquery.iteritems(): # if metaquery matches, yield the resource model # e.g. metaquery: metadata.display_name @@ -340,7 +351,7 @@ class Connection(base.Connection): if data['f:r_' + k.split('.', 1)[1]] == v: yield make_resource(data) else: - for ignored, data in self.resource.rows(resource_ids): + for ignored, data in resource_table.rows(resource_ids): yield make_resource(data) def get_meters(self, user=None, project=None, resource=None, source=None, @@ -353,6 +364,8 @@ class Connection(base.Connection): :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. """ + + resource_table = self.conn.table(self.RESOURCE_TABLE) q = make_query(user=user, project=project, resource=resource, source=source, require_meter=False, query_only=True) LOG.debug("Query Resource table: %s" % q) @@ -371,7 +384,7 @@ class Connection(base.Connection): else: q = meta_q # metaquery only - gen = self.resource.scan(filter=q) + gen = resource_table.scan(filter=q) for ignored, data in gen: # Meter columns are stored like this: @@ -407,11 +420,14 @@ class Connection(base.Connection): data['timestamp'] = timeutils.parse_strtime(data['timestamp']) return models.Sample(**data) + resource_table = self.conn.table(self.RESOURCE_TABLE) + meter_table = self.conn.table(self.METER_TABLE) + q, start, stop = make_query_from_filter(sample_filter, require_meter=False) LOG.debug("Query Meter Table: %s" % q) - gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + gen = meter_table.scan(filter=q, row_start=start, row_stop=stop) for ignored, meter in gen: # TODO(shengjie) put this implementation here because it's failing @@ -424,7 +440,7 @@ class Connection(base.Connection): break if len(metaquery) > 0: # metaquery checks resource table - resource = self.resource.row(meter['f:resource_id']) + resource = resource_table.row(meter['f:resource_id']) for k, v in metaquery.iteritems(): if resource['f:r_' + k.split('.', 1)[1]] != v: @@ -438,7 +454,8 @@ class Connection(base.Connection): limit -= 1 yield make_sample(meter) - def _update_meter_stats(self, stat, meter): + @staticmethod + def _update_meter_stats(stat, meter): """Do the stats calculation on a requested time bucket in stats dict :param stats: dict where aggregated stats are kept @@ -473,12 +490,13 @@ class Connection(base.Connection): because of all the Thrift traffic it is going to create. """ + meter_table = self.conn.table(self.METER_TABLE) + q, start, stop = make_query_from_filter(sample_filter) meters = list(meter for (ignored, meter) in - self.meter.scan(filter=q, - row_start=start, - row_stop=stop) + meter_table.scan(filter=q, row_start=start, + row_stop=stop) ) if sample_filter.start: @@ -672,7 +690,7 @@ class MConnection(object): return t def delete_table(self, name, use_prefix=True): - self.tables.remove(self.tables[name]) + del self.tables[name] def table(self, name): return self.create_table(name) diff --git a/tests/api/v2/test_impl_hbase.py b/tests/api/v2/test_impl_hbase.py index 5c00b0566..5a857b6d2 100644 --- a/tests/api/v2/test_impl_hbase.py +++ b/tests/api/v2/test_impl_hbase.py @@ -11,65 +11,67 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import testtools -# from . import acl -# from . import alarm -# from . import compute_duration_by_resource -# from . import list_events +from . import acl +from . import alarm +from . import compute_duration_by_resource +from . import list_events from . import list_meters -# from . import list_resources +from . import list_resources from . import post_samples -# from . import statistics +from . import statistics -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestAPIAcl(acl.TestAPIACL): -# database_connection = 'hbase://__test__' +class TestAPIAcl(acl.TestAPIACL): + database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestListEvents(list_events.TestListEvents): -# database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestListEmptyAlarms(alarm.TestListEmptyAlarms): -# database_connection = 'hbase://__test__' +class TestListEvents(list_events.TestListEvents): + database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestAlarms(alarm.TestAlarms): -# database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestComputeDurationByResource( -# compute_duration_by_resource.TestComputeDurationByResource): -# database_connection = 'hbase://__test__' +@testtools.skip('HBase alarms not implemented') +class TestListEmptyAlarms(alarm.TestListEmptyAlarms): + database_connection = 'hbase://__test__' + + +@testtools.skip('HBase alarms not implemented') +class TestAlarms(alarm.TestAlarms): + database_connection = 'hbase://__test__' + + +class TestComputeDurationByResource( + compute_duration_by_resource.TestComputeDurationByResource): + database_connection = 'hbase://__test__' class TestListEmptyMeters(list_meters.TestListEmptyMeters): database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestListMeters(list_meters.TestListMeters): -# database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestListResources(list_resources.TestListResources): -# database_connection = 'hbase://__test__' +class TestListMeters(list_meters.TestListMeters): + database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestMaxProjectVolume(statistics.TestMaxProjectVolume): -# database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestMaxResourceVolume(statistics.TestMaxResourceVolume): -# database_connection = 'hbase://__test__' +class TestListResources(list_resources.TestListResources): + database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestSumProjectVolume(statistics.TestSumProjectVolume): -# database_connection = 'hbase://__test__' -# TODO(jd) Fix the HBase driver to pass these tests! -# class TestSumResourceVolume(statistics.TestSumProjectVolume): -# database_connection = 'hbase://__test__' +class TestMaxProjectVolume(statistics.TestMaxProjectVolume): + database_connection = 'hbase://__test__' + + +class TestMaxResourceVolume(statistics.TestMaxResourceVolume): + database_connection = 'hbase://__test__' + + +class TestSumProjectVolume(statistics.TestSumProjectVolume): + database_connection = 'hbase://__test__' + + +class TestSumResourceVolume(statistics.TestSumProjectVolume): + database_connection = 'hbase://__test__' class TestPostSamples(post_samples.TestPostSamples): diff --git a/tests/storage/test_impl_hbase.py b/tests/storage/test_impl_hbase.py index 9b21f7ad9..93703bd91 100644 --- a/tests/storage/test_impl_hbase.py +++ b/tests/storage/test_impl_hbase.py @@ -24,6 +24,10 @@ running the tests. Make sure the Thrift server is running on that server. """ +from oslo.config import cfg + +from ceilometer.storage.impl_hbase import Connection +from ceilometer.storage.impl_hbase import MConnection from tests.storage import base @@ -31,6 +35,27 @@ class HBaseEngineTestBase(base.DBTestBase): database_connection = 'hbase://__test__' +class ConnectionTest(HBaseEngineTestBase): + + def test_hbase_connection(self): + cfg.CONF.database.connection = self.database_connection + conn = Connection(cfg.CONF) + self.assertIsInstance(conn.conn, MConnection) + + class TestConn(object): + def __init__(self, host, port): + self.netloc = '%(host)s:%(port)s' % locals() + + def open(self): + pass + + cfg.CONF.database.connection = 'hbase://test_hbase:9090' + self.stubs.Set(Connection, '_get_connection', + lambda self, x: TestConn(x['host'], x['port'])) + conn = Connection(cfg.CONF) + self.assertIsInstance(conn.conn, TestConn) + + class UserTest(base.UserTest, HBaseEngineTestBase): pass