refactor hbase storage code
create a common hbase base connection that alarm, event, and metering dbs can inherit. Change-Id: Iaefcb17373539885499a64563aa13b8124c53c8d
This commit is contained in:
parent
a2657d99a9
commit
f0881cdb2c
@ -36,7 +36,6 @@ class Connection(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Constructor."""
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -10,21 +10,15 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""HBase storage backend
|
|
||||||
"""
|
|
||||||
import datetime
|
|
||||||
import os
|
|
||||||
|
|
||||||
import happybase
|
import datetime
|
||||||
from oslo.utils import netutils
|
|
||||||
from six.moves.urllib import parse as urlparse
|
|
||||||
|
|
||||||
import ceilometer
|
import ceilometer
|
||||||
from ceilometer.alarm.storage import base
|
from ceilometer.alarm.storage import base
|
||||||
from ceilometer.alarm.storage import models
|
from ceilometer.alarm.storage import models
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
from ceilometer.storage.hbase import base as hbase_base
|
||||||
from ceilometer.storage.hbase import migration as hbase_migration
|
from ceilometer.storage.hbase import migration as hbase_migration
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
@ -45,8 +39,8 @@ AVAILABLE_STORAGE_CAPABILITIES = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
class Connection(hbase_base.Connection, base.Connection):
|
||||||
"""Put the data into a HBase database
|
"""Put the alarm data into a HBase database
|
||||||
|
|
||||||
Collections:
|
Collections:
|
||||||
|
|
||||||
@ -78,25 +72,7 @@ class Connection(base.Connection):
|
|||||||
ALARM_HISTORY_TABLE = "alarm_h"
|
ALARM_HISTORY_TABLE = "alarm_h"
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Hbase Connection Initialization."""
|
super(Connection, self).__init__(url)
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
|
|
||||||
if opts['host'] == '__test__':
|
|
||||||
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
|
||||||
if url:
|
|
||||||
# Reparse URL, but from the env variable now
|
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
else:
|
|
||||||
# This is a in-memory usage for unit tests
|
|
||||||
if Connection._memory_instance is None:
|
|
||||||
LOG.debug(_('Creating a new in-memory HBase '
|
|
||||||
'Connection object'))
|
|
||||||
Connection._memory_instance = (hbase_inmemory.
|
|
||||||
MConnectionPool())
|
|
||||||
self.conn_pool = Connection._memory_instance
|
|
||||||
else:
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
|
|
||||||
def upgrade(self):
|
def upgrade(self):
|
||||||
tables = [self.ALARM_HISTORY_TABLE, self.ALARM_TABLE]
|
tables = [self.ALARM_HISTORY_TABLE, self.ALARM_TABLE]
|
||||||
@ -119,43 +95,6 @@ class Connection(base.Connection):
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.debug(_('Cannot delete table but ignoring error'))
|
LOG.debug(_('Cannot delete table but ignoring error'))
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_connection_pool(conf):
|
|
||||||
"""Return a connection pool to the database.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
The tests use a subclass to override this and return an
|
|
||||||
in-memory connection pool.
|
|
||||||
"""
|
|
||||||
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
|
|
||||||
{'host': conf['host'], 'port': conf['port']}))
|
|
||||||
return happybase.ConnectionPool(size=100, host=conf['host'],
|
|
||||||
port=conf['port'],
|
|
||||||
table_prefix=conf['table_prefix'])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_connection_url(url):
|
|
||||||
"""Parse connection parameters from a database url.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
HBase Thrift does not support authentication and there is no
|
|
||||||
database name, so we are not looking for these in the url.
|
|
||||||
"""
|
|
||||||
opts = {}
|
|
||||||
result = netutils.urlsplit(url)
|
|
||||||
opts['table_prefix'] = urlparse.parse_qs(
|
|
||||||
result.query).get('table_prefix', [None])[0]
|
|
||||||
opts['dbtype'] = result.scheme
|
|
||||||
if ':' in result.netloc:
|
|
||||||
opts['host'], port = result.netloc.split(':')
|
|
||||||
else:
|
|
||||||
opts['host'] = result.netloc
|
|
||||||
port = 9090
|
|
||||||
opts['port'] = port and int(port) or 9090
|
|
||||||
return opts
|
|
||||||
|
|
||||||
def update_alarm(self, alarm):
|
def update_alarm(self, alarm):
|
||||||
"""Create an alarm.
|
"""Create an alarm.
|
||||||
|
|
||||||
|
@ -26,7 +26,6 @@ class Connection(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Constructor."""
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -10,20 +10,14 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""HBase storage backend
|
|
||||||
"""
|
|
||||||
import operator
|
|
||||||
import os
|
|
||||||
|
|
||||||
import happybase
|
import operator
|
||||||
from oslo.utils import netutils
|
|
||||||
from six.moves.urllib import parse as urlparse
|
|
||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
from ceilometer.event.storage import models
|
from ceilometer.event.storage import models
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
from ceilometer.storage.hbase import base as hbase_base
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
|
|
||||||
@ -40,7 +34,7 @@ AVAILABLE_STORAGE_CAPABILITIES = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
class Connection(hbase_base.Connection, base.Connection):
|
||||||
"""Put the event data into a HBase database
|
"""Put the event data into a HBase database
|
||||||
|
|
||||||
Collections:
|
Collections:
|
||||||
@ -73,25 +67,7 @@ class Connection(base.Connection):
|
|||||||
EVENT_TABLE = "event"
|
EVENT_TABLE = "event"
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Hbase Connection Initialization."""
|
super(Connection, self).__init__(url)
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
|
|
||||||
if opts['host'] == '__test__':
|
|
||||||
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
|
||||||
if url:
|
|
||||||
# Reparse URL, but from the env variable now
|
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
else:
|
|
||||||
# This is a in-memory usage for unit tests
|
|
||||||
if Connection._memory_instance is None:
|
|
||||||
LOG.debug(_('Creating a new in-memory HBase '
|
|
||||||
'Connection object'))
|
|
||||||
Connection._memory_instance = (hbase_inmemory.
|
|
||||||
MConnectionPool())
|
|
||||||
self.conn_pool = Connection._memory_instance
|
|
||||||
else:
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
|
|
||||||
def upgrade(self):
|
def upgrade(self):
|
||||||
tables = [self.EVENT_TABLE]
|
tables = [self.EVENT_TABLE]
|
||||||
@ -112,43 +88,6 @@ class Connection(base.Connection):
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.debug(_('Cannot delete table but ignoring error'))
|
LOG.debug(_('Cannot delete table but ignoring error'))
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_connection_pool(conf):
|
|
||||||
"""Return a connection pool to the database.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
The tests use a subclass to override this and return an
|
|
||||||
in-memory connection pool.
|
|
||||||
"""
|
|
||||||
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
|
|
||||||
{'host': conf['host'], 'port': conf['port']}))
|
|
||||||
return happybase.ConnectionPool(size=100, host=conf['host'],
|
|
||||||
port=conf['port'],
|
|
||||||
table_prefix=conf['table_prefix'])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_connection_url(url):
|
|
||||||
"""Parse connection parameters from a database url.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
HBase Thrift does not support authentication and there is no
|
|
||||||
database name, so we are not looking for these in the url.
|
|
||||||
"""
|
|
||||||
opts = {}
|
|
||||||
result = netutils.urlsplit(url)
|
|
||||||
opts['table_prefix'] = urlparse.parse_qs(
|
|
||||||
result.query).get('table_prefix', [None])[0]
|
|
||||||
opts['dbtype'] = result.scheme
|
|
||||||
if ':' in result.netloc:
|
|
||||||
opts['host'], port = result.netloc.split(':')
|
|
||||||
else:
|
|
||||||
opts['host'] = result.netloc
|
|
||||||
port = 9090
|
|
||||||
opts['port'] = port and int(port) or 9090
|
|
||||||
return opts
|
|
||||||
|
|
||||||
def record_events(self, event_models):
|
def record_events(self, event_models):
|
||||||
"""Write the events to Hbase.
|
"""Write the events to Hbase.
|
||||||
|
|
||||||
|
@ -173,7 +173,6 @@ class Connection(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Constructor."""
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
88
ceilometer/storage/hbase/base.py
Normal file
88
ceilometer/storage/hbase/base.py
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import happybase
|
||||||
|
from oslo.utils import netutils
|
||||||
|
from six.moves.urllib import parse as urlparse
|
||||||
|
|
||||||
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
|
from ceilometer.openstack.common import log
|
||||||
|
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Connection(object):
|
||||||
|
"""Base connection class for HBase."""
|
||||||
|
|
||||||
|
_memory_instance = None
|
||||||
|
|
||||||
|
def __init__(self, url):
|
||||||
|
"""Hbase Connection Initialization."""
|
||||||
|
opts = self._parse_connection_url(url)
|
||||||
|
|
||||||
|
if opts['host'] == '__test__':
|
||||||
|
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
||||||
|
if url:
|
||||||
|
# Reparse URL, but from the env variable now
|
||||||
|
opts = self._parse_connection_url(url)
|
||||||
|
self.conn_pool = self._get_connection_pool(opts)
|
||||||
|
else:
|
||||||
|
# This is a in-memory usage for unit tests
|
||||||
|
if Connection._memory_instance is None:
|
||||||
|
LOG.debug(_('Creating a new in-memory HBase '
|
||||||
|
'Connection object'))
|
||||||
|
Connection._memory_instance = (hbase_inmemory.
|
||||||
|
MConnectionPool())
|
||||||
|
self.conn_pool = Connection._memory_instance
|
||||||
|
else:
|
||||||
|
self.conn_pool = self._get_connection_pool(opts)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_connection_pool(conf):
|
||||||
|
"""Return a connection pool to the database.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
The tests use a subclass to override this and return an
|
||||||
|
in-memory connection pool.
|
||||||
|
"""
|
||||||
|
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
|
||||||
|
{'host': conf['host'], 'port': conf['port']}))
|
||||||
|
return happybase.ConnectionPool(size=100, host=conf['host'],
|
||||||
|
port=conf['port'],
|
||||||
|
table_prefix=conf['table_prefix'])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_connection_url(url):
|
||||||
|
"""Parse connection parameters from a database url.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
HBase Thrift does not support authentication and there is no
|
||||||
|
database name, so we are not looking for these in the url.
|
||||||
|
"""
|
||||||
|
opts = {}
|
||||||
|
result = netutils.urlsplit(url)
|
||||||
|
opts['table_prefix'] = urlparse.parse_qs(
|
||||||
|
result.query).get('table_prefix', [None])[0]
|
||||||
|
opts['dbtype'] = result.scheme
|
||||||
|
if ':' in result.netloc:
|
||||||
|
opts['host'], port = result.netloc.split(':')
|
||||||
|
else:
|
||||||
|
opts['host'] = result.netloc
|
||||||
|
port = 9090
|
||||||
|
opts['port'] = port and int(port) or 9090
|
||||||
|
return opts
|
@ -10,23 +10,18 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""HBase storage backend
|
|
||||||
"""
|
|
||||||
import datetime
|
import datetime
|
||||||
import operator
|
import operator
|
||||||
import os
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import happybase
|
|
||||||
from oslo.utils import netutils
|
|
||||||
from oslo.utils import timeutils
|
from oslo.utils import timeutils
|
||||||
from six.moves.urllib import parse as urlparse
|
|
||||||
|
|
||||||
import ceilometer
|
import ceilometer
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage import base
|
from ceilometer.storage import base
|
||||||
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
from ceilometer.storage.hbase import base as hbase_base
|
||||||
from ceilometer.storage.hbase import migration as hbase_migration
|
from ceilometer.storage.hbase import migration as hbase_migration
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
from ceilometer.storage import models
|
from ceilometer.storage import models
|
||||||
@ -53,8 +48,8 @@ AVAILABLE_STORAGE_CAPABILITIES = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
class Connection(hbase_base.Connection, base.Connection):
|
||||||
"""Put the data into a HBase database
|
"""Put the metering data into a HBase database
|
||||||
|
|
||||||
Collections:
|
Collections:
|
||||||
|
|
||||||
@ -123,25 +118,7 @@ class Connection(base.Connection):
|
|||||||
METER_TABLE = "meter"
|
METER_TABLE = "meter"
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
"""Hbase Connection Initialization."""
|
super(Connection, self).__init__(url)
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
|
|
||||||
if opts['host'] == '__test__':
|
|
||||||
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
|
||||||
if url:
|
|
||||||
# Reparse URL, but from the env variable now
|
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
else:
|
|
||||||
# This is a in-memory usage for unit tests
|
|
||||||
if Connection._memory_instance is None:
|
|
||||||
LOG.debug(_('Creating a new in-memory HBase '
|
|
||||||
'Connection object'))
|
|
||||||
Connection._memory_instance = (hbase_inmemory.
|
|
||||||
MConnectionPool())
|
|
||||||
self.conn_pool = Connection._memory_instance
|
|
||||||
else:
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
|
|
||||||
def upgrade(self):
|
def upgrade(self):
|
||||||
tables = [self.RESOURCE_TABLE, self.METER_TABLE]
|
tables = [self.RESOURCE_TABLE, self.METER_TABLE]
|
||||||
@ -164,43 +141,6 @@ class Connection(base.Connection):
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.debug(_('Cannot delete table but ignoring error'))
|
LOG.debug(_('Cannot delete table but ignoring error'))
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_connection_pool(conf):
|
|
||||||
"""Return a connection pool to the database.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
The tests use a subclass to override this and return an
|
|
||||||
in-memory connection pool.
|
|
||||||
"""
|
|
||||||
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
|
|
||||||
{'host': conf['host'], 'port': conf['port']}))
|
|
||||||
return happybase.ConnectionPool(size=100, host=conf['host'],
|
|
||||||
port=conf['port'],
|
|
||||||
table_prefix=conf['table_prefix'])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_connection_url(url):
|
|
||||||
"""Parse connection parameters from a database url.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
HBase Thrift does not support authentication and there is no
|
|
||||||
database name, so we are not looking for these in the url.
|
|
||||||
"""
|
|
||||||
opts = {}
|
|
||||||
result = netutils.urlsplit(url)
|
|
||||||
opts['table_prefix'] = urlparse.parse_qs(
|
|
||||||
result.query).get('table_prefix', [None])[0]
|
|
||||||
opts['dbtype'] = result.scheme
|
|
||||||
if ':' in result.netloc:
|
|
||||||
opts['host'], port = result.netloc.split(':')
|
|
||||||
else:
|
|
||||||
opts['host'] = result.netloc
|
|
||||||
port = 9090
|
|
||||||
opts['port'] = port and int(port) or 9090
|
|
||||||
return opts
|
|
||||||
|
|
||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
"""Write the data to the backend storage system.
|
"""Write the data to the backend storage system.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user