From 7169a4226433272d9a28d326cc87036857bfb215 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Thu, 2 Oct 2014 14:09:17 -0400 Subject: [PATCH] move hbase event driver to event tree This patch moves the event-related code into the HBase driver in the event tree. Change-Id: Ibc1be241db6360b626d7fb0b1e302f31c2aa4d30 Partially-Implements: blueprint dedicated-event-db --- ceilometer/event/storage/impl_hbase.py | 206 +++++++++++++++++++- ceilometer/storage/impl_hbase.py | 157 +-------------- ceilometer/tests/storage/test_impl_hbase.py | 11 +- 3 files changed, 215 insertions(+), 159 deletions(-) diff --git a/ceilometer/event/storage/impl_hbase.py b/ceilometer/event/storage/impl_hbase.py index 5ebf48669..44c19efbc 100644 --- a/ceilometer/event/storage/impl_hbase.py +++ b/ceilometer/event/storage/impl_hbase.py @@ -12,10 +12,19 @@ # under the License. """HBase storage backend """ +import operator +import os + +import happybase +from oslo.utils import netutils +from six.moves.urllib import parse as urlparse + +from ceilometer.event.storage import base +from ceilometer.event.storage import models from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log +from ceilometer.storage.hbase import inmemory as hbase_inmemory from ceilometer.storage.hbase import utils as hbase_utils -from ceilometer.storage import impl_hbase as base from ceilometer import utils LOG = log.getLogger(__name__) @@ -39,7 +48,7 @@ class Connection(base.Connection): - events: - row_key: timestamp of event's generation + uuid of event - in format: "%s+%s" % (ts, Event.message_id) + in format: "%s:%s" % (ts, Event.message_id) - Column Families: f: contains the following qualifiers: @@ -50,7 +59,7 @@ class Connection(base.Connection): .. 
code-block:: python - "%s+%s" % (trait_name, trait_type) + "%s:%s" % (trait_name, trait_type) """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, @@ -63,6 +72,27 @@ class Connection(base.Connection): EVENT_TABLE = "event" + def __init__(self, url): + """Hbase Connection Initialization.""" + opts = self._parse_connection_url(url) + + if opts['host'] == '__test__': + url = os.environ.get('CEILOMETER_TEST_HBASE_URL') + if url: + # Reparse URL, but from the env variable now + opts = self._parse_connection_url(url) + self.conn_pool = self._get_connection_pool(opts) + else: + # This is an in-memory usage for unit tests + if Connection._memory_instance is None: + LOG.debug(_('Creating a new in-memory HBase ' + 'Connection object')) + Connection._memory_instance = (hbase_inmemory. + MConnectionPool()) + self.conn_pool = Connection._memory_instance + else: + self.conn_pool = self._get_connection_pool(opts) + def upgrade(self): tables = [self.EVENT_TABLE] column_families = {'f': dict(max_versions=1)} @@ -81,3 +111,173 @@ class Connection(base.Connection): conn.delete_table(table) except Exception: LOG.debug(_('Cannot delete table but ignoring error')) + + @staticmethod + def _get_connection_pool(conf): + """Return a connection pool to the database. + + .. note:: + + The tests use a subclass to override this and return an + in-memory connection pool. + """ + LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % ( + {'host': conf['host'], 'port': conf['port']})) + return happybase.ConnectionPool(size=100, host=conf['host'], + port=conf['port'], + table_prefix=conf['table_prefix']) + + @staticmethod + def _parse_connection_url(url): + """Parse connection parameters from a database url. + + .. note:: + + HBase Thrift does not support authentication and there is no + database name, so we are not looking for these in the url. 
+ """ + opts = {} + result = netutils.urlsplit(url) + opts['table_prefix'] = urlparse.parse_qs( + result.query).get('table_prefix', [None])[0] + opts['dbtype'] = result.scheme + if ':' in result.netloc: + opts['host'], port = result.netloc.split(':') + else: + opts['host'] = result.netloc + port = 9090 + opts['port'] = port and int(port) or 9090 + return opts + + def record_events(self, event_models): + """Write the events to Hbase. + + :param event_models: a list of models.Event objects. + :return problem_events: a list of events that could not be saved in a + (reason, event) tuple. From the reasons that are enumerated in + storage.models.Event only the UNKNOWN_PROBLEM is applicable here. + """ + problem_events = [] + + with self.conn_pool.connection() as conn: + events_table = conn.table(self.EVENT_TABLE) + for event_model in event_models: + # Row key consists of timestamp and message_id from + # models.Event so that events are stored sorted by + # timestamp in the database. + ts = event_model.generated + row = hbase_utils.prepare_key( + hbase_utils.timestamp(ts, reverse=False), + event_model.message_id) + event_type = event_model.event_type + traits = {} + if event_model.traits: + for trait in event_model.traits: + key = hbase_utils.prepare_key(trait.name, trait.dtype) + traits[key] = trait.value + record = hbase_utils.serialize_entry(traits, + event_type=event_type, + timestamp=ts) + try: + events_table.put(row, record) + except Exception as ex: + LOG.debug(_("Failed to record event: %s") % ex) + problem_events.append((models.Event.UNKNOWN_PROBLEM, + event_model)) + return problem_events + + def get_events(self, event_filter): + """Return an iter of models.Event objects. + + :param event_filter: storage.EventFilter object, consists of filters + for events that are stored in database. 
+ """ + q, start, stop = hbase_utils.make_events_query_from_filter( + event_filter) + with self.conn_pool.connection() as conn: + events_table = conn.table(self.EVENT_TABLE) + + gen = events_table.scan(filter=q, row_start=start, row_stop=stop) + + for event_id, data in gen: + traits = [] + events_dict = hbase_utils.deserialize_entry(data)[0] + for key, value in events_dict.items(): + if isinstance(key, tuple): + trait_name, trait_dtype = key + traits.append(models.Trait(name=trait_name, + dtype=int(trait_dtype), + value=value)) + ts, mess = event_id.split(':') + + yield models.Event( + message_id=hbase_utils.unquote(mess), + event_type=events_dict['event_type'], + generated=events_dict['timestamp'], + traits=sorted(traits, + key=operator.attrgetter('dtype')) + ) + + def get_event_types(self): + """Return all event types as an iterable of strings.""" + with self.conn_pool.connection() as conn: + events_table = conn.table(self.EVENT_TABLE) + gen = events_table.scan() + + event_types = set() + for event_id, data in gen: + events_dict = hbase_utils.deserialize_entry(data)[0] + for key, value in events_dict.items(): + if not isinstance(key, tuple) and key.startswith('event_type'): + if value not in event_types: + event_types.add(value) + yield value + + def get_trait_types(self, event_type): + """Return a dictionary containing the name and data type of the trait. + + Only trait types for the provided event_type are returned. + + :param event_type: the type of the Event + """ + + q = hbase_utils.make_query(event_type=event_type) + trait_names = set() + with self.conn_pool.connection() as conn: + events_table = conn.table(self.EVENT_TABLE) + gen = events_table.scan(filter=q) + for event_id, data in gen: + events_dict = hbase_utils.deserialize_entry(data)[0] + for key, value in events_dict.items(): + if isinstance(key, tuple): + trait_name, trait_type = key + if trait_name not in trait_names: + # Here we make sure the method yields only unique + # trait types, e.g. 
if the same trait type occurs + # in different events with equal event_type, the + # method will return only one trait type. It is + # assumed that a trait name can have only one + # trait type. + trait_names.add(trait_name) + data_type = models.Trait.type_names[int(trait_type)] + yield {'name': trait_name, 'data_type': data_type} + + def get_traits(self, event_type, trait_type=None): + """Return all trait instances associated with an event_type. + + If trait_type is specified, only return instances of that trait type. + :param event_type: the type of the Event to filter by + :param trait_type: the name of the Trait to filter by + """ + q = hbase_utils.make_query(event_type=event_type, + trait_type=trait_type) + with self.conn_pool.connection() as conn: + events_table = conn.table(self.EVENT_TABLE) + gen = events_table.scan(filter=q) + for event_id, data in gen: + events_dict = hbase_utils.deserialize_entry(data)[0] + for key, value in events_dict.items(): + if isinstance(key, tuple): + trait_name, trait_type = key + yield models.Trait(name=trait_name, + dtype=int(trait_type), value=value) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index cf3284d52..2499bea3b 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -23,7 +23,6 @@ from oslo.utils import timeutils from six.moves.urllib import parse as urlparse import ceilometer -from ceilometer.event.storage import models as ev_models from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.storage import base @@ -46,7 +45,6 @@ AVAILABLE_CAPABILITIES = { 'statistics': {'query': {'simple': True, 'metadata': True}, 'aggregation': {'standard': True}}, - 'events': {'query': {'simple': True}}, } @@ -111,22 +109,6 @@ class Connection(base.Connection): "%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type, counter_unit) - - - events: - - - row_key: timestamp of event's generation + 
uuid of event - in format: "%s:%s" % (ts, Event.message_id) - - Column Families: - - f: contains the following qualifiers: - - - event_type: description of event's type - - timestamp: time stamp of event generation - - all traits for this event in format: - - .. code-block:: python - - "%s:%s" % (trait_name, trait_type) """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, @@ -139,7 +121,6 @@ class Connection(base.Connection): RESOURCE_TABLE = "resource" METER_TABLE = "meter" - EVENT_TABLE = "event" def __init__(self, url): """Hbase Connection Initialization.""" @@ -163,7 +144,7 @@ class Connection(base.Connection): self.conn_pool = self._get_connection_pool(opts) def upgrade(self): - tables = [self.RESOURCE_TABLE, self.METER_TABLE, self.EVENT_TABLE] + tables = [self.RESOURCE_TABLE, self.METER_TABLE] column_families = {'f': dict(max_versions=1)} with self.conn_pool.connection() as conn: hbase_utils.create_tables(conn, tables, column_families) @@ -173,8 +154,7 @@ class Connection(base.Connection): LOG.debug(_('Dropping HBase schema...')) with self.conn_pool.connection() as conn: for table in [self.RESOURCE_TABLE, - self.METER_TABLE, - self.EVENT_TABLE]: + self.METER_TABLE]: try: conn.disable_table(table) except Exception: @@ -498,136 +478,3 @@ class Connection(base.Connection): ) self._update_meter_stats(results[-1], meter[0]) return results - - def record_events(self, event_models): - """Write the events to Hbase. - - :param event_models: a list of models.Event objects. - :return problem_events: a list of events that could not be saved in a - (reason, event) tuple. From the reasons that are enumerated in - storage.models.Event only the UNKNOWN_PROBLEM is applicable here. 
- """ - problem_events = [] - - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - for event_model in event_models: - # Row key consists of timestamp and message_id from - # models.Event or purposes of storage event sorted by - # timestamp in the database. - ts = event_model.generated - row = hbase_utils.prepare_key( - hbase_utils.timestamp(ts, reverse=False), - event_model.message_id) - event_type = event_model.event_type - traits = {} - if event_model.traits: - for trait in event_model.traits: - key = hbase_utils.prepare_key(trait.name, trait.dtype) - traits[key] = trait.value - record = hbase_utils.serialize_entry(traits, - event_type=event_type, - timestamp=ts) - try: - events_table.put(row, record) - except Exception as ex: - LOG.debug(_("Failed to record event: %s") % ex) - problem_events.append((ev_models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events - - def get_events(self, event_filter): - """Return an iter of models.Event objects. - - :param event_filter: storage.EventFilter object, consists of filters - for events that are stored in database. 
- """ - q, start, stop = hbase_utils.make_events_query_from_filter( - event_filter) - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - - gen = events_table.scan(filter=q, row_start=start, row_stop=stop) - - for event_id, data in gen: - traits = [] - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_dtype = key - traits.append(ev_models.Trait(name=trait_name, - dtype=int(trait_dtype), - value=value)) - ts, mess = event_id.split(':') - - yield ev_models.Event( - message_id=hbase_utils.unquote(mess), - event_type=events_dict['event_type'], - generated=events_dict['timestamp'], - traits=sorted(traits, - key=operator.attrgetter('dtype')) - ) - - def get_event_types(self): - """Return all event types as an iterable of strings.""" - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan() - - event_types = set() - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if not isinstance(key, tuple) and key.startswith('event_type'): - if value not in event_types: - event_types.add(value) - yield value - - def get_trait_types(self, event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are returned. 
- - :param event_type: the type of the Event - """ - - q = hbase_utils.make_query(event_type=event_type) - trait_names = set() - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan(filter=q) - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_type = key - if trait_name not in trait_names: - # Here we check that our method return only unique - # trait types, for ex. if it is found the same trait - # types in different events with equal event_type, - # method will return only one trait type. It is - # proposed that certain trait name could have only one - # trait type. - trait_names.add(trait_name) - data_type = ev_models.Trait.type_names[int(trait_type)] - yield {'name': trait_name, 'data_type': data_type} - - def get_traits(self, event_type, trait_type=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. 
- :param event_type: the type of the Event to filter by - :param trait_type: the name of the Trait to filter by - """ - q = hbase_utils.make_query(event_type=event_type, - trait_type=trait_type) - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan(filter=q) - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_type = key - yield ev_models.Trait(name=trait_name, - dtype=int(trait_type), value=value) diff --git a/ceilometer/tests/storage/test_impl_hbase.py b/ceilometer/tests/storage/test_impl_hbase.py index e5ca5badc..dd638e0a4 100644 --- a/ceilometer/tests/storage/test_impl_hbase.py +++ b/ceilometer/tests/storage/test_impl_hbase.py @@ -26,6 +26,7 @@ import mock from ceilometer.alarm.storage import impl_hbase as hbase_alarm +from ceilometer.event.storage import impl_hbase as hbase_event from ceilometer.storage import impl_hbase as hbase from ceilometer.tests import base as test_base from ceilometer.tests import db as tests_db @@ -87,7 +88,7 @@ class CapabilitiesTest(test_base.BaseTestCase): 'stddev': False, 'cardinality': False}} }, - 'events': {'query': {'simple': True}}, + 'events': {'query': {'simple': False}}, } actual_capabilities = hbase.Connection.get_capabilities() @@ -104,6 +105,14 @@ class CapabilitiesTest(test_base.BaseTestCase): actual_capabilities = hbase_alarm.Connection.get_capabilities() self.assertEqual(expected_capabilities, actual_capabilities) + def test_event_capabilities(self): + expected_capabilities = { + 'events': {'query': {'simple': True}}, + } + + actual_capabilities = hbase_event.Connection.get_capabilities() + self.assertEqual(expected_capabilities, actual_capabilities) + def test_storage_capabilities(self): expected_capabilities = { 'storage': {'production_ready': True},