move hbase event driver to event tree

this patch moves the event related code to the hbase driver in
event tree.

Change-Id: Ibc1be241db6360b626d7fb0b1e302f31c2aa4d30
Partially-Implements: blueprint dedicated-event-db
This commit is contained in:
gordon chung 2014-10-02 14:09:17 -04:00
parent dc2b44da29
commit 7169a42264
3 changed files with 215 additions and 159 deletions

View File

@ -12,10 +12,19 @@
# under the License. # under the License.
"""HBase storage backend """HBase storage backend
""" """
import operator
import os
import happybase
from oslo.utils import netutils
from six.moves.urllib import parse as urlparse
from ceilometer.event.storage import base
from ceilometer.event.storage import models
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.storage.hbase import inmemory as hbase_inmemory
from ceilometer.storage.hbase import utils as hbase_utils from ceilometer.storage.hbase import utils as hbase_utils
from ceilometer.storage import impl_hbase as base
from ceilometer import utils from ceilometer import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -39,7 +48,7 @@ class Connection(base.Connection):
- events: - events:
- row_key: timestamp of event's generation + uuid of event - row_key: timestamp of event's generation + uuid of event
in format: "%s+%s" % (ts, Event.message_id) in format: "%s:%s" % (ts, Event.message_id)
- Column Families: - Column Families:
f: contains the following qualifiers: f: contains the following qualifiers:
@ -50,7 +59,7 @@ class Connection(base.Connection):
.. code-block:: python .. code-block:: python
"%s+%s" % (trait_name, trait_type) "%s:%s" % (trait_name, trait_type)
""" """
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
@ -63,6 +72,27 @@ class Connection(base.Connection):
EVENT_TABLE = "event" EVENT_TABLE = "event"
def __init__(self, url):
"""Hbase Connection Initialization."""
opts = self._parse_connection_url(url)
if opts['host'] == '__test__':
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
if url:
# Reparse URL, but from the env variable now
opts = self._parse_connection_url(url)
self.conn_pool = self._get_connection_pool(opts)
else:
# This is a in-memory usage for unit tests
if Connection._memory_instance is None:
LOG.debug(_('Creating a new in-memory HBase '
'Connection object'))
Connection._memory_instance = (hbase_inmemory.
MConnectionPool())
self.conn_pool = Connection._memory_instance
else:
self.conn_pool = self._get_connection_pool(opts)
def upgrade(self): def upgrade(self):
tables = [self.EVENT_TABLE] tables = [self.EVENT_TABLE]
column_families = {'f': dict(max_versions=1)} column_families = {'f': dict(max_versions=1)}
@ -81,3 +111,173 @@ class Connection(base.Connection):
conn.delete_table(table) conn.delete_table(table)
except Exception: except Exception:
LOG.debug(_('Cannot delete table but ignoring error')) LOG.debug(_('Cannot delete table but ignoring error'))
@staticmethod
def _get_connection_pool(conf):
"""Return a connection pool to the database.
.. note::
The tests use a subclass to override this and return an
in-memory connection pool.
"""
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
{'host': conf['host'], 'port': conf['port']}))
return happybase.ConnectionPool(size=100, host=conf['host'],
port=conf['port'],
table_prefix=conf['table_prefix'])
@staticmethod
def _parse_connection_url(url):
"""Parse connection parameters from a database url.
.. note::
HBase Thrift does not support authentication and there is no
database name, so we are not looking for these in the url.
"""
opts = {}
result = netutils.urlsplit(url)
opts['table_prefix'] = urlparse.parse_qs(
result.query).get('table_prefix', [None])[0]
opts['dbtype'] = result.scheme
if ':' in result.netloc:
opts['host'], port = result.netloc.split(':')
else:
opts['host'] = result.netloc
port = 9090
opts['port'] = port and int(port) or 9090
return opts
def record_events(self, event_models):
"""Write the events to Hbase.
:param event_models: a list of models.Event objects.
:return problem_events: a list of events that could not be saved in a
(reason, event) tuple. From the reasons that are enumerated in
storage.models.Event only the UNKNOWN_PROBLEM is applicable here.
"""
problem_events = []
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
for event_model in event_models:
# Row key consists of timestamp and message_id from
# models.Event or purposes of storage event sorted by
# timestamp in the database.
ts = event_model.generated
row = hbase_utils.prepare_key(
hbase_utils.timestamp(ts, reverse=False),
event_model.message_id)
event_type = event_model.event_type
traits = {}
if event_model.traits:
for trait in event_model.traits:
key = hbase_utils.prepare_key(trait.name, trait.dtype)
traits[key] = trait.value
record = hbase_utils.serialize_entry(traits,
event_type=event_type,
timestamp=ts)
try:
events_table.put(row, record)
except Exception as ex:
LOG.debug(_("Failed to record event: %s") % ex)
problem_events.append((models.Event.UNKNOWN_PROBLEM,
event_model))
return problem_events
def get_events(self, event_filter):
"""Return an iter of models.Event objects.
:param event_filter: storage.EventFilter object, consists of filters
for events that are stored in database.
"""
q, start, stop = hbase_utils.make_events_query_from_filter(
event_filter)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
for event_id, data in gen:
traits = []
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_dtype = key
traits.append(models.Trait(name=trait_name,
dtype=int(trait_dtype),
value=value))
ts, mess = event_id.split(':')
yield models.Event(
message_id=hbase_utils.unquote(mess),
event_type=events_dict['event_type'],
generated=events_dict['timestamp'],
traits=sorted(traits,
key=operator.attrgetter('dtype'))
)
def get_event_types(self):
"""Return all event types as an iterable of strings."""
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan()
event_types = set()
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if not isinstance(key, tuple) and key.startswith('event_type'):
if value not in event_types:
event_types.add(value)
yield value
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of the trait.
Only trait types for the provided event_type are returned.
:param event_type: the type of the Event
"""
q = hbase_utils.make_query(event_type=event_type)
trait_names = set()
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_type = key
if trait_name not in trait_names:
# Here we check that our method return only unique
# trait types, for ex. if it is found the same trait
# types in different events with equal event_type,
# method will return only one trait type. It is
# proposed that certain trait name could have only one
# trait type.
trait_names.add(trait_name)
data_type = models.Trait.type_names[int(trait_type)]
yield {'name': trait_name, 'data_type': data_type}
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type.
If trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
q = hbase_utils.make_query(event_type=event_type,
trait_type=trait_type)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_type = key
yield models.Trait(name=trait_name,
dtype=int(trait_type), value=value)

View File

@ -23,7 +23,6 @@ from oslo.utils import timeutils
from six.moves.urllib import parse as urlparse from six.moves.urllib import parse as urlparse
import ceilometer import ceilometer
from ceilometer.event.storage import models as ev_models
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.storage import base from ceilometer.storage import base
@ -46,7 +45,6 @@ AVAILABLE_CAPABILITIES = {
'statistics': {'query': {'simple': True, 'statistics': {'query': {'simple': True,
'metadata': True}, 'metadata': True},
'aggregation': {'standard': True}}, 'aggregation': {'standard': True}},
'events': {'query': {'simple': True}},
} }
@ -111,22 +109,6 @@ class Connection(base.Connection):
"%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type, "%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type,
counter_unit) counter_unit)
- events:
- row_key: timestamp of event's generation + uuid of event
in format: "%s:%s" % (ts, Event.message_id)
- Column Families:
f: contains the following qualifiers:
- event_type: description of event's type
- timestamp: time stamp of event generation
- all traits for this event in format:
.. code-block:: python
"%s:%s" % (trait_name, trait_type)
""" """
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
@ -139,7 +121,6 @@ class Connection(base.Connection):
RESOURCE_TABLE = "resource" RESOURCE_TABLE = "resource"
METER_TABLE = "meter" METER_TABLE = "meter"
EVENT_TABLE = "event"
def __init__(self, url): def __init__(self, url):
"""Hbase Connection Initialization.""" """Hbase Connection Initialization."""
@ -163,7 +144,7 @@ class Connection(base.Connection):
self.conn_pool = self._get_connection_pool(opts) self.conn_pool = self._get_connection_pool(opts)
def upgrade(self): def upgrade(self):
tables = [self.RESOURCE_TABLE, self.METER_TABLE, self.EVENT_TABLE] tables = [self.RESOURCE_TABLE, self.METER_TABLE]
column_families = {'f': dict(max_versions=1)} column_families = {'f': dict(max_versions=1)}
with self.conn_pool.connection() as conn: with self.conn_pool.connection() as conn:
hbase_utils.create_tables(conn, tables, column_families) hbase_utils.create_tables(conn, tables, column_families)
@ -173,8 +154,7 @@ class Connection(base.Connection):
LOG.debug(_('Dropping HBase schema...')) LOG.debug(_('Dropping HBase schema...'))
with self.conn_pool.connection() as conn: with self.conn_pool.connection() as conn:
for table in [self.RESOURCE_TABLE, for table in [self.RESOURCE_TABLE,
self.METER_TABLE, self.METER_TABLE]:
self.EVENT_TABLE]:
try: try:
conn.disable_table(table) conn.disable_table(table)
except Exception: except Exception:
@ -498,136 +478,3 @@ class Connection(base.Connection):
) )
self._update_meter_stats(results[-1], meter[0]) self._update_meter_stats(results[-1], meter[0])
return results return results
def record_events(self, event_models):
"""Write the events to Hbase.
:param event_models: a list of models.Event objects.
:return problem_events: a list of events that could not be saved in a
(reason, event) tuple. From the reasons that are enumerated in
storage.models.Event only the UNKNOWN_PROBLEM is applicable here.
"""
problem_events = []
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
for event_model in event_models:
# Row key consists of timestamp and message_id from
# models.Event or purposes of storage event sorted by
# timestamp in the database.
ts = event_model.generated
row = hbase_utils.prepare_key(
hbase_utils.timestamp(ts, reverse=False),
event_model.message_id)
event_type = event_model.event_type
traits = {}
if event_model.traits:
for trait in event_model.traits:
key = hbase_utils.prepare_key(trait.name, trait.dtype)
traits[key] = trait.value
record = hbase_utils.serialize_entry(traits,
event_type=event_type,
timestamp=ts)
try:
events_table.put(row, record)
except Exception as ex:
LOG.debug(_("Failed to record event: %s") % ex)
problem_events.append((ev_models.Event.UNKNOWN_PROBLEM,
event_model))
return problem_events
def get_events(self, event_filter):
"""Return an iter of models.Event objects.
:param event_filter: storage.EventFilter object, consists of filters
for events that are stored in database.
"""
q, start, stop = hbase_utils.make_events_query_from_filter(
event_filter)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
for event_id, data in gen:
traits = []
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_dtype = key
traits.append(ev_models.Trait(name=trait_name,
dtype=int(trait_dtype),
value=value))
ts, mess = event_id.split(':')
yield ev_models.Event(
message_id=hbase_utils.unquote(mess),
event_type=events_dict['event_type'],
generated=events_dict['timestamp'],
traits=sorted(traits,
key=operator.attrgetter('dtype'))
)
def get_event_types(self):
"""Return all event types as an iterable of strings."""
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan()
event_types = set()
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if not isinstance(key, tuple) and key.startswith('event_type'):
if value not in event_types:
event_types.add(value)
yield value
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of the trait.
Only trait types for the provided event_type are returned.
:param event_type: the type of the Event
"""
q = hbase_utils.make_query(event_type=event_type)
trait_names = set()
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_type = key
if trait_name not in trait_names:
# Here we check that our method return only unique
# trait types, for ex. if it is found the same trait
# types in different events with equal event_type,
# method will return only one trait type. It is
# proposed that certain trait name could have only one
# trait type.
trait_names.add(trait_name)
data_type = ev_models.Trait.type_names[int(trait_type)]
yield {'name': trait_name, 'data_type': data_type}
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type.
If trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
q = hbase_utils.make_query(event_type=event_type,
trait_type=trait_type)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if isinstance(key, tuple):
trait_name, trait_type = key
yield ev_models.Trait(name=trait_name,
dtype=int(trait_type), value=value)

View File

@ -26,6 +26,7 @@
import mock import mock
from ceilometer.alarm.storage import impl_hbase as hbase_alarm from ceilometer.alarm.storage import impl_hbase as hbase_alarm
from ceilometer.event.storage import impl_hbase as hbase_event
from ceilometer.storage import impl_hbase as hbase from ceilometer.storage import impl_hbase as hbase
from ceilometer.tests import base as test_base from ceilometer.tests import base as test_base
from ceilometer.tests import db as tests_db from ceilometer.tests import db as tests_db
@ -87,7 +88,7 @@ class CapabilitiesTest(test_base.BaseTestCase):
'stddev': False, 'stddev': False,
'cardinality': False}} 'cardinality': False}}
}, },
'events': {'query': {'simple': True}}, 'events': {'query': {'simple': False}},
} }
actual_capabilities = hbase.Connection.get_capabilities() actual_capabilities = hbase.Connection.get_capabilities()
@ -104,6 +105,14 @@ class CapabilitiesTest(test_base.BaseTestCase):
actual_capabilities = hbase_alarm.Connection.get_capabilities() actual_capabilities = hbase_alarm.Connection.get_capabilities()
self.assertEqual(expected_capabilities, actual_capabilities) self.assertEqual(expected_capabilities, actual_capabilities)
def test_event_capabilities(self):
expected_capabilities = {
'events': {'query': {'simple': True}},
}
actual_capabilities = hbase_event.Connection.get_capabilities()
self.assertEqual(expected_capabilities, actual_capabilities)
def test_storage_capabilities(self): def test_storage_capabilities(self):
expected_capabilities = { expected_capabilities = {
'storage': {'production_ready': True}, 'storage': {'production_ready': True},