diff --git a/ceilometer/alarm/storage/impl_hbase.py b/ceilometer/alarm/storage/impl_hbase.py new file mode 100644 index 000000000..efdcb4722 --- /dev/null +++ b/ceilometer/alarm/storage/impl_hbase.py @@ -0,0 +1,224 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""HBase storage backend +""" +import datetime +import os + +import happybase +from six.moves.urllib import parse as urlparse + +from ceilometer.alarm.storage import base +from ceilometer.alarm.storage import models +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log +from ceilometer.openstack.common import network_utils +from ceilometer.storage.hbase import inmemory as hbase_inmemory +from ceilometer.storage.hbase import utils as hbase_utils +from ceilometer import utils + +LOG = log.getLogger(__name__) + + +AVAILABLE_CAPABILITIES = { + 'alarms': {'query': {'simple': True, + 'complex': False}, + 'history': {'query': {'simple': True, + 'complex': False}}}, +} + + +AVAILABLE_STORAGE_CAPABILITIES = { + 'storage': {'production_ready': True}, +} + + +class Connection(base.Connection): + """Put the data into a HBase database + + Collections: + + - alarm: + + - row_key: uuid of alarm + - Column Families: + + f: contains the raw incoming alarm data + + - alarm_h: + + - row_key: uuid of alarm + "_" + reversed timestamp + - Column Families: + + f: raw incoming alarm_history data. Timestamp becomes now() + if not determined + """ + + CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, + AVAILABLE_CAPABILITIES) + STORAGE_CAPABILITIES = utils.update_nested( + base.Connection.STORAGE_CAPABILITIES, + AVAILABLE_STORAGE_CAPABILITIES, + ) + _memory_instance = None + + ALARM_TABLE = "alarm" + ALARM_HISTORY_TABLE = "alarm_h" + + def __init__(self, url): + """Hbase Connection Initialization.""" + opts = self._parse_connection_url(url) + + if opts['host'] == '__test__': + url = os.environ.get('CEILOMETER_TEST_HBASE_URL') + if url: + # Reparse URL, but from the env variable now + opts = self._parse_connection_url(url) + self.conn_pool = self._get_connection_pool(opts) + else: + # This is a in-memory usage for unit tests + if Connection._memory_instance is None: + LOG.debug(_('Creating a new in-memory HBase ' + 'Connection object')) + Connection._memory_instance = (hbase_inmemory. + MConnectionPool()) + self.conn_pool = Connection._memory_instance + else: + self.conn_pool = self._get_connection_pool(opts) + + def upgrade(self): + with self.conn_pool.connection() as conn: + conn.create_table(self.ALARM_TABLE, {'f': dict()}) + conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()}) + + def clear(self): + LOG.debug(_('Dropping HBase schema...')) + with self.conn_pool.connection() as conn: + for table in [self.ALARM_TABLE, + self.ALARM_HISTORY_TABLE]: + try: + conn.disable_table(table) + except Exception: + LOG.debug(_('Cannot disable table but ignoring error')) + try: + conn.delete_table(table) + except Exception: + LOG.debug(_('Cannot delete table but ignoring error')) + + @staticmethod + def _get_connection_pool(conf): + """Return a connection pool to the database. + + .. note:: + + The tests use a subclass to override this and return an + in-memory connection pool. + """ + LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % ( + {'host': conf['host'], 'port': conf['port']})) + return happybase.ConnectionPool(size=100, host=conf['host'], + port=conf['port'], + table_prefix=conf['table_prefix']) + + @staticmethod + def _parse_connection_url(url): + """Parse connection parameters from a database url. + + .. note:: + + HBase Thrift does not support authentication and there is no + database name, so we are not looking for these in the url. + """ + opts = {} + result = network_utils.urlsplit(url) + opts['table_prefix'] = urlparse.parse_qs( + result.query).get('table_prefix', [None])[0] + opts['dbtype'] = result.scheme + if ':' in result.netloc: + opts['host'], port = result.netloc.split(':') + else: + opts['host'] = result.netloc + port = 9090 + opts['port'] = port and int(port) or 9090 + return opts + + def update_alarm(self, alarm): + """Create an alarm. + + :param alarm: The alarm to create. It is Alarm object, so we need to + call as_dict() + """ + _id = alarm.alarm_id + alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict()) + with self.conn_pool.connection() as conn: + alarm_table = conn.table(self.ALARM_TABLE) + alarm_table.put(_id, alarm_to_store) + stored_alarm = hbase_utils.deserialize_entry( + alarm_table.row(_id))[0] + return models.Alarm(**stored_alarm) + + create_alarm = update_alarm + + def delete_alarm(self, alarm_id): + with self.conn_pool.connection() as conn: + alarm_table = conn.table(self.ALARM_TABLE) + alarm_table.delete(alarm_id) + + def get_alarms(self, name=None, user=None, state=None, meter=None, + project=None, enabled=None, alarm_id=None, pagination=None): + + if pagination: + raise NotImplementedError('Pagination not implemented') + if meter: + raise NotImplementedError('Filter by meter not implemented') + + q = hbase_utils.make_query(alarm_id=alarm_id, name=name, + enabled=enabled, user_id=user, + project_id=project, state=state) + + with self.conn_pool.connection() as conn: + alarm_table = conn.table(self.ALARM_TABLE) + gen = alarm_table.scan(filter=q) + for ignored, data in gen: + stored_alarm = hbase_utils.deserialize_entry(data)[0] + yield models.Alarm(**stored_alarm) + + def get_alarm_changes(self, alarm_id, on_behalf_of, + user=None, project=None, type=None, + start_timestamp=None, start_timestamp_op=None, + end_timestamp=None, end_timestamp_op=None): + q = hbase_utils.make_query(alarm_id=alarm_id, + on_behalf_of=on_behalf_of, type=type, + user_id=user, project_id=project) + start_row, end_row = hbase_utils.make_timestamp_query( + hbase_utils.make_general_rowkey_scan, + start=start_timestamp, start_op=start_timestamp_op, + end=end_timestamp, end_op=end_timestamp_op, bounds_only=True, + some_id=alarm_id) + with self.conn_pool.connection() as conn: + alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) + gen = alarm_history_table.scan(filter=q, row_start=start_row, + row_stop=end_row) + for ignored, data in gen: + stored_entry = hbase_utils.deserialize_entry(data)[0] + yield models.AlarmChange(**stored_entry) + + def record_alarm_change(self, alarm_change): + """Record alarm change event.""" + alarm_change_dict = hbase_utils.serialize_entry(alarm_change) + ts = alarm_change.get('timestamp') or datetime.datetime.now() + rts = hbase_utils.timestamp(ts) + with self.conn_pool.connection() as conn: + alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) + alarm_history_table.put(alarm_change.get('alarm_id') + "_" + + str(rts), alarm_change_dict) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index b702e873c..ccf6aec70 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -21,8 +21,6 @@ import time import happybase from six.moves.urllib import parse as urlparse -from ceilometer.alarm.storage import base as alarm_base -from ceilometer.alarm.storage import models as alarm_models from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import network_utils @@ -47,10 +45,6 @@ AVAILABLE_CAPABILITIES = { 'metadata': True}, 'aggregation': {'standard': True}}, 'events': {'query': {'simple': True}}, - 'alarms': {'query': {'simple': True, - 'complex': False}, - 'history': {'query': {'simple': True, - 'complex': False}}}, } @@ -59,7 +53,7 @@ AVAILABLE_STORAGE_CAPABILITIES = { } -class Connection(base.Connection, alarm_base.Connection): +class Connection(base.Connection): """Put the data into a HBase database Collections: @@ -116,21 +110,6 @@ class Connection(base.Connection, alarm_base.Connection): "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type, counter_unit) - - alarm: - - - row_key: uuid of alarm - - Column Families: - - f: contains the raw incoming alarm data - - - alarm_h: - - - row_key: uuid of alarm + "_" + reversed timestamp - - Column Families: - - f: raw incoming alarm_history data. Timestamp becomes now() - if not determined - - events: - row_key: timestamp of event's generation + uuid of event @@ -158,8 +137,6 @@ class Connection(base.Connection, alarm_base.Connection): RESOURCE_TABLE = "resource" METER_TABLE = "meter" - ALARM_TABLE = "alarm" - ALARM_HISTORY_TABLE = "alarm_h" EVENT_TABLE = "event" def __init__(self, url): @@ -187,8 +164,6 @@ class Connection(base.Connection, alarm_base.Connection): with self.conn_pool.connection() as conn: conn.create_table(self.RESOURCE_TABLE, {'f': dict(max_versions=1)}) conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)}) - conn.create_table(self.ALARM_TABLE, {'f': dict()}) - conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()}) conn.create_table(self.EVENT_TABLE, {'f': dict(max_versions=1)}) def clear(self): @@ -196,8 +171,6 @@ class Connection(base.Connection, alarm_base.Connection): with self.conn_pool.connection() as conn: for table in [self.RESOURCE_TABLE, self.METER_TABLE, - self.ALARM_TABLE, - self.ALARM_HISTORY_TABLE, self.EVENT_TABLE]: try: conn.disable_table(table) @@ -245,77 +218,6 @@ class Connection(base.Connection, alarm_base.Connection): opts['port'] = port and int(port) or 9090 return opts - def update_alarm(self, alarm): - """Create an alarm. - - :param alarm: The alarm to create. It is Alarm object, so we need to - call as_dict() - """ - _id = alarm.alarm_id - alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict()) - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - alarm_table.put(_id, alarm_to_store) - stored_alarm = hbase_utils.deserialize_entry( - alarm_table.row(_id))[0] - return alarm_models.Alarm(**stored_alarm) - - create_alarm = update_alarm - - def delete_alarm(self, alarm_id): - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - alarm_table.delete(alarm_id) - - def get_alarms(self, name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, pagination=None): - - if pagination: - raise NotImplementedError('Pagination not implemented') - if meter: - raise NotImplementedError('Filter by meter not implemented') - - q = hbase_utils.make_query(alarm_id=alarm_id, name=name, - enabled=enabled, user_id=user, - project_id=project, state=state) - - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - gen = alarm_table.scan(filter=q) - for ignored, data in gen: - stored_alarm = hbase_utils.deserialize_entry(data)[0] - yield alarm_models.Alarm(**stored_alarm) - - def get_alarm_changes(self, alarm_id, on_behalf_of, - user=None, project=None, type=None, - start_timestamp=None, start_timestamp_op=None, - end_timestamp=None, end_timestamp_op=None): - q = hbase_utils.make_query(alarm_id=alarm_id, - on_behalf_of=on_behalf_of, type=type, - user_id=user, project_id=project) - start_row, end_row = hbase_utils.make_timestamp_query( - hbase_utils.make_general_rowkey_scan, - start=start_timestamp, start_op=start_timestamp_op, - end=end_timestamp, end_op=end_timestamp_op, bounds_only=True, - some_id=alarm_id) - with self.conn_pool.connection() as conn: - alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - gen = alarm_history_table.scan(filter=q, row_start=start_row, - row_stop=end_row) - for ignored, data in gen: - stored_entry = hbase_utils.deserialize_entry(data)[0] - yield alarm_models.AlarmChange(**stored_entry) - - def record_alarm_change(self, alarm_change): - """Record alarm change event.""" - alarm_change_dict = hbase_utils.serialize_entry(alarm_change) - ts = alarm_change.get('timestamp') or datetime.datetime.now() - rts = hbase_utils.timestamp(ts) - with self.conn_pool.connection() as conn: - alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - alarm_history_table.put(alarm_change.get('alarm_id') + "_" + - str(rts), alarm_change_dict) - def record_metering_data(self, data): """Write the data to the backend storage system. diff --git a/ceilometer/tests/storage/test_impl_hbase.py b/ceilometer/tests/storage/test_impl_hbase.py index 2f42a3bc5..1900c5566 100644 --- a/ceilometer/tests/storage/test_impl_hbase.py +++ b/ceilometer/tests/storage/test_impl_hbase.py @@ -25,6 +25,7 @@ """ import mock +from ceilometer.alarm.storage import impl_hbase as hbase_alarm from ceilometer.storage.hbase import inmemory as hbase_inmemory from ceilometer.storage import impl_hbase as hbase from ceilometer.tests import base as test_base @@ -90,16 +91,23 @@ class CapabilitiesTest(test_base.BaseTestCase): 'stddev': False, 'cardinality': False}} }, - 'alarms': {'query': {'simple': True, - 'complex': False}, - 'history': {'query': {'simple': True, - 'complex': False}}}, 'events': {'query': {'simple': True}}, } actual_capabilities = hbase.Connection.get_capabilities() self.assertEqual(expected_capabilities, actual_capabilities) + def test_alarm_capabilities(self): + expected_capabilities = { + 'alarms': {'query': {'simple': True, + 'complex': False}, + 'history': {'query': {'simple': True, + 'complex': False}}}, + } + + actual_capabilities = hbase_alarm.Connection.get_capabilities() + self.assertEqual(expected_capabilities, actual_capabilities) + def test_storage_capabilities(self): expected_capabilities = { 'storage': {'production_ready': True}, diff --git a/setup.cfg b/setup.cfg index 839679063..fd7ff3fb0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -167,7 +167,7 @@ ceilometer.alarm.storage = mysql = ceilometer.storage.impl_sqlalchemy:Connection postgresql = ceilometer.storage.impl_sqlalchemy:Connection sqlite = ceilometer.storage.impl_sqlalchemy:Connection - hbase = ceilometer.storage.impl_hbase:Connection + hbase = ceilometer.alarm.storage.impl_hbase:Connection db2 = ceilometer.alarm.storage.impl_db2:Connection ceilometer.metering.storage =