enable v2 api hbase tests

- update hbase connection to use in memory connection
- change so we always retrieve table from connection rather than
using obj variable to ensure we're using correct reference
- various fixes to get hbase to behave according to tests

Change-Id: Ie856654b94f1db8e1e4a5d8fcca56631a9726665
This commit is contained in:
Gordon Chung 2013-06-27 18:29:33 -04:00
parent 6215868820
commit dc042eb948
3 changed files with 128 additions and 83 deletions

View File

@ -27,7 +27,6 @@ import datetime
import happybase
import os
import re
from oslo.config import cfg
from ceilometer.openstack.common import log
@ -87,6 +86,13 @@ class Connection(base.Connection):
"""HBase connection.
"""
_memory_instance = None
PROJECT_TABLE = "project"
USER_TABLE = "user"
RESOURCE_TABLE = "resource"
METER_TABLE = "meter"
def __init__(self, conf):
"""Hbase Connection Initialization."""
opts = self._parse_connection_url(conf.database.connection)
@ -97,26 +103,17 @@ class Connection(base.Connection):
if url:
# Reparse URL, but from the env variable now
opts = self._parse_connection_url(url)
self.conn = self._get_connection(opts)
else:
# This is a in-memory usage for unit tests
self.conn = MConnection()
self.project = self.conn.table(self.PROJECT_TABLE)
self.user = self.conn.table(self.USER_TABLE)
self.resource = self.conn.table(self.RESOURCE_TABLE)
self.meter = self.conn.table(self.METER_TABLE)
return
self.conn = self._get_connection(opts)
if Connection._memory_instance is None:
LOG.debug('Creating a new in-memory HBase '
'Connection object')
Connection._memory_instance = MConnection()
self.conn = Connection._memory_instance
else:
self.conn = self._get_connection(opts)
self.conn.open()
self.project = self.conn.table(self.PROJECT_TABLE)
self.user = self.conn.table(self.USER_TABLE)
self.resource = self.conn.table(self.RESOURCE_TABLE)
self.meter = self.conn.table(self.METER_TABLE)
PROJECT_TABLE = "project"
USER_TABLE = "user"
RESOURCE_TABLE = "resource"
METER_TABLE = "meter"
def upgrade(self, version=None):
self.conn.create_table(self.PROJECT_TABLE, {'f': dict()})
@ -178,25 +175,29 @@ class Connection(base.Connection):
:param data: a dictionary such as returned by
ceilometer.meter.meter_message_from_counter
"""
project_table = self.conn.table(self.PROJECT_TABLE)
user_table = self.conn.table(self.USER_TABLE)
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
# Make sure we know about the user and project
if data['user_id']:
user = self.user.row(data['user_id'])
user = user_table.row(data['user_id'])
sources = _load_hbase_list(user, 's')
# Update if source is new
if data['source'] not in sources:
user['f:s_%s' % data['source']] = "1"
self.user.put(data['user_id'], user)
user_table.put(data['user_id'], user)
project = self.project.row(data['project_id'])
project = project_table.row(data['project_id'])
sources = _load_hbase_list(project, 's')
# Update if source is new
if data['source'] not in sources:
project['f:s_%s' % data['source']] = "1"
self.project.put(data['project_id'], project)
project_table.put(data['project_id'], project)
rts = reverse_timestamp(data['timestamp'])
resource = self.resource.row(data['resource_id'])
resource = resource_table.row(data['resource_id'])
new_meter = "%s!%s!%s" % (
data['counter_name'], data['counter_type'], data['counter_unit'])
new_resource = {'f:resource_id': data['resource_id'],
@ -207,10 +208,11 @@ class Connection(base.Connection):
'f:m_%s' % new_meter: "1"
}
# store metadata fields with prefix "r_"
resource_metadata = dict(('f:r_%s' % k, v)
for (k, v)
in data['resource_metadata'].iteritems())
new_resource.update(resource_metadata)
if data['resource_metadata']:
resource_metadata = dict(('f:r_%s' % k, v)
for (k, v)
in data['resource_metadata'].iteritems())
new_resource.update(resource_metadata)
# Update if resource has new information
if new_resource != resource:
@ -218,7 +220,7 @@ class Connection(base.Connection):
if new_meter not in meters:
new_resource['f:m_%s' % new_meter] = "1"
self.resource.put(data['resource_id'], new_resource)
resource_table.put(data['resource_id'], new_resource)
# Rowkey consists of reversed timestamp, meter and an md5 of
# user+resource+project for purposes of uniqueness
@ -253,34 +255,36 @@ class Connection(base.Connection):
data['timestamp'] = ts
# Save original meter.
record['f:message'] = json.dumps(data)
self.meter.put(row, record)
meter_table.put(row, record)
def get_users(self, source=None):
"""Return an iterable of user id strings.
:param source: Optional source filter.
"""
user_table = self.conn.table(self.USER_TABLE)
LOG.debug("source: %s" % source)
scan_args = {}
if source:
scan_args['columns'] = ['f:s_%s' % source]
return sorted(key for key, ignored in self.user.scan(**scan_args))
return sorted(key for key, ignored in user_table.scan(**scan_args))
def get_projects(self, source=None):
"""Return an iterable of project id strings.
:param source: Optional source filter.
"""
project_table = self.conn.table(self.PROJECT_TABLE)
LOG.debug("source: %s" % source)
scan_args = {}
if source:
scan_args['columns'] = ['f:s_%s' % source]
return (key for key, ignored in self.project.scan(**scan_args))
return (key for key, ignored in project_table.scan(**scan_args))
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None,
metaquery={}):
metaquery={}, resource=None):
"""Return an iterable of models.Resource instances
:param user: Optional ID for user that owns the resource.
@ -291,7 +295,9 @@ class Connection(base.Connection):
:param end_timestamp: Optional modified timestamp end range.
:param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter.
"""
def make_resource(data):
"""Transform HBase fields to Resource model."""
# convert HBase metadata e.g. f:r_display_name to display_name
@ -312,9 +318,13 @@ class Connection(base.Connection):
],
)
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
q, start_row, stop_row = make_query(user=user,
project=project,
source=source,
resource=resource,
start=start_timestamp,
start_op=start_timestamp_op,
end=end_timestamp,
@ -322,7 +332,8 @@ class Connection(base.Connection):
require_meter=False,
query_only=False)
LOG.debug("Query Meter table: %s" % q)
gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row)
gen = meter_table.scan(filter=q, row_start=start_row,
row_stop=stop_row)
# put all the resource_ids in a Set
resource_ids = Set()
@ -331,7 +342,7 @@ class Connection(base.Connection):
# handle metaquery
if len(metaquery) > 0:
for ignored, data in self.resource.rows(resource_ids):
for ignored, data in resource_table.rows(resource_ids):
for k, v in metaquery.iteritems():
# if metaquery matches, yield the resource model
# e.g. metaquery: metadata.display_name
@ -340,7 +351,7 @@ class Connection(base.Connection):
if data['f:r_' + k.split('.', 1)[1]] == v:
yield make_resource(data)
else:
for ignored, data in self.resource.rows(resource_ids):
for ignored, data in resource_table.rows(resource_ids):
yield make_resource(data)
def get_meters(self, user=None, project=None, resource=None, source=None,
@ -353,6 +364,8 @@ class Connection(base.Connection):
:param source: Optional source filter.
:param metaquery: Optional dict with metadata to match on.
"""
resource_table = self.conn.table(self.RESOURCE_TABLE)
q = make_query(user=user, project=project, resource=resource,
source=source, require_meter=False, query_only=True)
LOG.debug("Query Resource table: %s" % q)
@ -371,7 +384,7 @@ class Connection(base.Connection):
else:
q = meta_q # metaquery only
gen = self.resource.scan(filter=q)
gen = resource_table.scan(filter=q)
for ignored, data in gen:
# Meter columns are stored like this:
@ -407,11 +420,14 @@ class Connection(base.Connection):
data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
return models.Sample(**data)
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
q, start, stop = make_query_from_filter(sample_filter,
require_meter=False)
LOG.debug("Query Meter Table: %s" % q)
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop)
for ignored, meter in gen:
# TODO(shengjie) put this implementation here because it's failing
@ -424,7 +440,7 @@ class Connection(base.Connection):
break
if len(metaquery) > 0:
# metaquery checks resource table
resource = self.resource.row(meter['f:resource_id'])
resource = resource_table.row(meter['f:resource_id'])
for k, v in metaquery.iteritems():
if resource['f:r_' + k.split('.', 1)[1]] != v:
@ -438,7 +454,8 @@ class Connection(base.Connection):
limit -= 1
yield make_sample(meter)
def _update_meter_stats(self, stat, meter):
@staticmethod
def _update_meter_stats(stat, meter):
"""Do the stats calculation on a requested time bucket in stats dict
:param stats: dict where aggregated stats are kept
@ -473,12 +490,13 @@ class Connection(base.Connection):
because of all the Thrift traffic it is going to create.
"""
meter_table = self.conn.table(self.METER_TABLE)
q, start, stop = make_query_from_filter(sample_filter)
meters = list(meter for (ignored, meter) in
self.meter.scan(filter=q,
row_start=start,
row_stop=stop)
meter_table.scan(filter=q, row_start=start,
row_stop=stop)
)
if sample_filter.start:
@ -672,7 +690,7 @@ class MConnection(object):
return t
def delete_table(self, name, use_prefix=True):
self.tables.remove(self.tables[name])
del self.tables[name]
def table(self, name):
return self.create_table(name)

View File

@ -11,65 +11,67 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
# from . import acl
# from . import alarm
# from . import compute_duration_by_resource
# from . import list_events
from . import acl
from . import alarm
from . import compute_duration_by_resource
from . import list_events
from . import list_meters
# from . import list_resources
from . import list_resources
from . import post_samples
# from . import statistics
from . import statistics
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestAPIAcl(acl.TestAPIACL):
# database_connection = 'hbase://__test__'
class TestAPIAcl(acl.TestAPIACL):
database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestListEvents(list_events.TestListEvents):
# database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestListEmptyAlarms(alarm.TestListEmptyAlarms):
# database_connection = 'hbase://__test__'
class TestListEvents(list_events.TestListEvents):
database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestAlarms(alarm.TestAlarms):
# database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestComputeDurationByResource(
# compute_duration_by_resource.TestComputeDurationByResource):
# database_connection = 'hbase://__test__'
@testtools.skip('HBase alarms not implemented')
class TestListEmptyAlarms(alarm.TestListEmptyAlarms):
database_connection = 'hbase://__test__'
@testtools.skip('HBase alarms not implemented')
class TestAlarms(alarm.TestAlarms):
database_connection = 'hbase://__test__'
class TestComputeDurationByResource(
compute_duration_by_resource.TestComputeDurationByResource):
database_connection = 'hbase://__test__'
class TestListEmptyMeters(list_meters.TestListEmptyMeters):
database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestListMeters(list_meters.TestListMeters):
# database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestListResources(list_resources.TestListResources):
# database_connection = 'hbase://__test__'
class TestListMeters(list_meters.TestListMeters):
database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestMaxProjectVolume(statistics.TestMaxProjectVolume):
# database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestMaxResourceVolume(statistics.TestMaxResourceVolume):
# database_connection = 'hbase://__test__'
class TestListResources(list_resources.TestListResources):
database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestSumProjectVolume(statistics.TestSumProjectVolume):
# database_connection = 'hbase://__test__'
# TODO(jd) Fix the HBase driver to pass these tests!
# class TestSumResourceVolume(statistics.TestSumProjectVolume):
# database_connection = 'hbase://__test__'
class TestMaxProjectVolume(statistics.TestMaxProjectVolume):
database_connection = 'hbase://__test__'
class TestMaxResourceVolume(statistics.TestMaxResourceVolume):
database_connection = 'hbase://__test__'
class TestSumProjectVolume(statistics.TestSumProjectVolume):
database_connection = 'hbase://__test__'
class TestSumResourceVolume(statistics.TestSumProjectVolume):
database_connection = 'hbase://__test__'
class TestPostSamples(post_samples.TestPostSamples):

View File

@ -24,6 +24,10 @@
running the tests. Make sure the Thrift server is running on that server.
"""
from oslo.config import cfg
from ceilometer.storage.impl_hbase import Connection
from ceilometer.storage.impl_hbase import MConnection
from tests.storage import base
@ -31,6 +35,27 @@ class HBaseEngineTestBase(base.DBTestBase):
database_connection = 'hbase://__test__'
class ConnectionTest(HBaseEngineTestBase):
def test_hbase_connection(self):
cfg.CONF.database.connection = self.database_connection
conn = Connection(cfg.CONF)
self.assertIsInstance(conn.conn, MConnection)
class TestConn(object):
def __init__(self, host, port):
self.netloc = '%(host)s:%(port)s' % locals()
def open(self):
pass
cfg.CONF.database.connection = 'hbase://test_hbase:9090'
self.stubs.Set(Connection, '_get_connection',
lambda self, x: TestConn(x['host'], x['port']))
conn = Connection(cfg.CONF)
self.assertIsInstance(conn.conn, TestConn)
class UserTest(base.UserTest, HBaseEngineTestBase):
pass