From 4570e42803e97c0249e8d3bd87cdaf5673b3ce77 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 20 Sep 2016 10:41:02 +0200 Subject: [PATCH] Remove deprecated non-SQL drivers Change-Id: I1eb890fe422718e986bb92139a023868bace7d96 --- aodh/cmd/data_migration.py | 162 --------- aodh/storage/__init__.py | 7 - aodh/storage/hbase/__init__.py | 0 aodh/storage/hbase/base.py | 78 ----- aodh/storage/hbase/inmemory.py | 283 ---------------- aodh/storage/hbase/migration.py | 43 --- aodh/storage/hbase/utils.py | 250 -------------- aodh/storage/impl_hbase.py | 189 ----------- aodh/storage/impl_mongodb.py | 106 ------ aodh/storage/impl_sqlalchemy.py | 10 +- aodh/storage/mongo/__init__.py | 0 aodh/storage/mongo/utils.py | 297 ---------------- aodh/storage/pymongo_base.py | 317 ------------------ .../functional/api/v2/test_alarm_scenarios.py | 6 +- .../api/v2/test_complex_query_scenarios.py | 4 +- aodh/tests/functional/db.py | 83 +---- .../storage/sqlalchemy/test_migrations.py | 1 - .../functional/storage/test_data_migration.py | 112 ------- .../functional/storage/test_impl_hbase.py | 67 ---- .../functional/storage/test_impl_mongodb.py | 101 ------ .../storage/test_storage_scenarios.py | 1 - aodh/tests/mocks.py | 79 ----- devstack/plugin.sh | 49 +-- doc/source/configuration.rst | 41 --- doc/source/install/manual.rst | 87 +---- doc/source/testing.rst | 4 +- ...emove-no-sql-drivers-21dfdbd750751340.yaml | 3 + run-functional-tests.sh | 9 +- setup.cfg | 11 - tools/test_hbase_table_utils.py | 38 --- tox.ini | 18 +- 31 files changed, 31 insertions(+), 2425 deletions(-) delete mode 100644 aodh/cmd/data_migration.py delete mode 100644 aodh/storage/hbase/__init__.py delete mode 100644 aodh/storage/hbase/base.py delete mode 100644 aodh/storage/hbase/inmemory.py delete mode 100644 aodh/storage/hbase/migration.py delete mode 100644 aodh/storage/hbase/utils.py delete mode 100644 aodh/storage/impl_hbase.py delete mode 100644 aodh/storage/impl_mongodb.py delete mode 100644 aodh/storage/mongo/__init__.py delete mode 100644 aodh/storage/mongo/utils.py delete mode 100644 aodh/storage/pymongo_base.py delete mode 100644 aodh/tests/functional/storage/test_data_migration.py delete mode 100644 aodh/tests/functional/storage/test_impl_hbase.py delete mode 100644 aodh/tests/functional/storage/test_impl_mongodb.py delete mode 100644 aodh/tests/mocks.py create mode 100644 releasenotes/notes/remove-no-sql-drivers-21dfdbd750751340.yaml delete mode 100644 tools/test_hbase_table_utils.py diff --git a/aodh/cmd/data_migration.py b/aodh/cmd/data_migration.py deleted file mode 100644 index 6a9ea49ba..000000000 --- a/aodh/cmd/data_migration.py +++ /dev/null @@ -1,162 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A tool for migrating alarms and alarms history data from NoSQL to SQL. - -NOTES: - -- Users need to specify the source NoSQL connection url and the destination SQL - connection URL with this tool, an usage example: - aodh-data-migration --nosql-conn \ - mongodb://aodh:password@127.0.0.1:27017/aodh --sql-conn \ - mysql+pymysql://root:password@127.0.0.1/aodh?charset=utf8 - -- Both the alarm data and alarm history data will be migrated when running this - tool, but the history data migration can be avoid by specifying False of - --migrate-history option. - -- It is better to ensure the db connection is OK when running this tool, and - this tool can be run repeatedly, the duplicated data will be skipped. - -- This tool depends on the NoSQL and SQL drivers of Aodh, so it is should be - used only before the removal of NoSQL drivers. - -- This tool has been tested OK in devstack environment, but users need to be - cautious with this, because the data migration between storage backends is - a bit dangerous. - -""" - -import argparse -import logging -import sys - -from oslo_config import cfg -from oslo_db import exception -from oslo_db import options as db_options -from aodh.i18n import _LE, _LI, _LW -import six.moves.urllib.parse as urlparse - -from aodh import storage - -root_logger = logging.getLogger('') - - -def get_parser(): - parser = argparse.ArgumentParser( - description='A tool for Migrating alarms and alarms history from' - ' NoSQL to SQL', - ) - parser.add_argument( - '--nosql-conn', - required=True, - type=str, - help='The source NoSQL database connection.', - ) - parser.add_argument( - '--sql-conn', - required=True, - type=str, - help='The destination SQL database connection.', - ) - parser.add_argument( - '--migrate-history', - default=True, - type=bool, - help='Migrate history data when migrate alarms or not,' - ' True as Default.', - ) - parser.add_argument( - '--debug', - default=False, - action='store_true', - help='Show the debug level log messages.', - ) - return parser - - -def _validate_conn_options(args): - nosql_scheme = urlparse.urlparse(args.nosql_conn).scheme - sql_scheme = urlparse.urlparse(args.sql_conn).scheme - if nosql_scheme not in ('mongodb', 'hbase'): - root_logger.error(_LE('Invalid source DB type %s, the source database ' - 'connection should be one of: [mongodb, hbase]' - ), nosql_scheme) - sys.exit(1) - if sql_scheme not in ('mysql', 'mysql+pymysql', 'postgresql', - 'sqlite'): - root_logger.error(_LE('Invalid destination DB type %s, the destination' - ' database connection should be one of: ' - '[mysql, postgresql, sqlite]'), sql_scheme) - sys.exit(1) - - -def main(): - args = get_parser().parse_args() - - # Set up logging to use the console - console = logging.StreamHandler(sys.stderr) - formatter = logging.Formatter( - '[%(asctime)s] %(levelname)-8s %(message)s') - console.setFormatter(formatter) - root_logger.addHandler(console) - if args.debug: - root_logger.setLevel(logging.DEBUG) - else: - root_logger.setLevel(logging.INFO) - - _validate_conn_options(args) - - nosql_conf = cfg.ConfigOpts() - db_options.set_defaults(nosql_conf, args.nosql_conn) - nosql_conf.register_opts(storage.OPTS, 'database') - nosql_conn = storage.get_connection_from_config(nosql_conf) - - sql_conf = cfg.ConfigOpts() - db_options.set_defaults(sql_conf, args.sql_conn) - sql_conf.register_opts(storage.OPTS, 'database') - sql_conn = storage.get_connection_from_config(sql_conf) - - root_logger.info( - _LI("Starting to migrate alarms data from NoSQL to SQL...")) - - count = 0 - for alarm in nosql_conn.get_alarms(): - root_logger.debug("Migrating alarm %s..." % alarm.alarm_id) - try: - sql_conn.create_alarm(alarm) - count += 1 - except exception.DBDuplicateEntry: - root_logger.warning(_LW("Duplicated alarm %s found, skipped."), - alarm.alarm_id) - if not args.migrate_history: - continue - - history_count = 0 - for history in nosql_conn.get_alarm_changes(alarm.alarm_id, None): - history_data = history.as_dict() - root_logger.debug(" Migrating alarm history data with" - " event_id %s..." % history_data['event_id']) - try: - sql_conn.record_alarm_change(history_data) - history_count += 1 - except exception.DBDuplicateEntry: - root_logger.warning( - _LW(" Duplicated alarm history %s found, skipped."), - history_data['event_id']) - root_logger.info(_LI(" Migrated %(count)s history data of alarm " - "%(alarm_id)s"), - {'count': history_count, 'alarm_id': alarm.alarm_id}) - - root_logger.info(_LI("End alarms data migration from NoSQL to SQL, %s" - " alarms have been migrated."), count) diff --git a/aodh/storage/__init__.py b/aodh/storage/__init__.py index e1d104826..a3f428d66 100644 --- a/aodh/storage/__init__.py +++ b/aodh/storage/__init__.py @@ -22,7 +22,6 @@ from oslo_utils import timeutils import retrying import six.moves.urllib.parse as urlparse from stevedore import driver -import warnings _NAMESPACE = 'aodh.storage' @@ -58,12 +57,6 @@ def get_connection_from_config(conf): retries = conf.database.max_retries url = conf.database.connection connection_scheme = urlparse.urlparse(url).scheme - if connection_scheme not in ('mysql', 'mysql+pymysql', 'postgresql', - 'sqlite'): - msg = ('Storage backend %s is deprecated, and all the NoSQL backends ' - 'will be removed in Aodh 4.0, please use SQL backend.' % - connection_scheme) - warnings.warn(msg) LOG.debug('looking for %(name)r driver in %(namespace)r', {'name': connection_scheme, 'namespace': _NAMESPACE}) mgr = driver.DriverManager(_NAMESPACE, connection_scheme) diff --git a/aodh/storage/hbase/__init__.py b/aodh/storage/hbase/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/aodh/storage/hbase/base.py b/aodh/storage/hbase/base.py deleted file mode 100644 index d6f363e30..000000000 --- a/aodh/storage/hbase/base.py +++ /dev/null @@ -1,78 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import happybase -from oslo_log import log -from oslo_utils import netutils -from six.moves.urllib import parse as urlparse - -from aodh.storage.hbase import inmemory as hbase_inmemory - -LOG = log.getLogger(__name__) - - -class Connection(object): - """Base connection class for HBase.""" - - _memory_instance = None - - def __init__(self, conf, url): - """Hbase Connection Initialization.""" - opts = self._parse_connection_url(url) - - if opts['host'] == '__test__': - # This is a in-memory usage for unit tests - if Connection._memory_instance is None: - LOG.debug('Creating a new in-memory HBase Connection object') - Connection._memory_instance = (hbase_inmemory. - MConnectionPool()) - self.conn_pool = Connection._memory_instance - else: - self.conn_pool = self._get_connection_pool(opts) - - @staticmethod - def _get_connection_pool(conf): - """Return a connection pool to the database. - - .. note:: - - The tests use a subclass to override this and return an - in-memory connection pool. - """ - LOG.debug('connecting to HBase on %(host)s:%(port)s', - {'host': conf['host'], 'port': conf['port']}) - return happybase.ConnectionPool(size=100, host=conf['host'], - port=conf['port'], - table_prefix=conf['table_prefix']) - - @staticmethod - def _parse_connection_url(url): - """Parse connection parameters from a database url. - - .. note:: - - HBase Thrift does not support authentication and there is no - database name, so we are not looking for these in the url. - """ - opts = {} - result = netutils.urlsplit(url) - opts['table_prefix'] = urlparse.parse_qs( - result.query).get('table_prefix', [None])[0] - opts['dbtype'] = result.scheme - if ':' in result.netloc: - opts['host'], port = result.netloc.split(':') - else: - opts['host'] = result.netloc - port = 9090 - opts['port'] = port and int(port) or 9090 - return opts diff --git a/aodh/storage/hbase/inmemory.py b/aodh/storage/hbase/inmemory.py deleted file mode 100644 index 94826b478..000000000 --- a/aodh/storage/hbase/inmemory.py +++ /dev/null @@ -1,283 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" This is a very crude version of "in-memory HBase", which implements just - enough functionality of HappyBase API to support testing of our driver. -""" - -import copy -import re - -from oslo_log import log -import six - -import aodh - -LOG = log.getLogger(__name__) - - -class MTable(object): - """HappyBase.Table mock.""" - def __init__(self, name, families): - self.name = name - self.families = families - self._rows_with_ts = {} - - def row(self, key, columns=None): - if key not in self._rows_with_ts: - return {} - res = copy.copy(sorted(six.iteritems( - self._rows_with_ts.get(key)))[-1][1]) - if columns: - keys = res.keys() - for key in keys: - if key not in columns: - res.pop(key) - return res - - def rows(self, keys): - return ((k, self.row(k)) for k in keys) - - def put(self, key, data, ts=None): - # Note: Now we use 'timestamped' but only for one Resource table. - # That's why we may put ts='0' in case when ts is None. If it is - # needed to use 2 types of put in one table ts=0 cannot be used. - if ts is None: - ts = "0" - if key not in self._rows_with_ts: - self._rows_with_ts[key] = {ts: data} - else: - if ts in self._rows_with_ts[key]: - self._rows_with_ts[key][ts].update(data) - else: - self._rows_with_ts[key].update({ts: data}) - - def delete(self, key): - del self._rows_with_ts[key] - - def _get_latest_dict(self, row): - # The idea here is to return latest versions of columns. - # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}. - # res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})] - # sorted by ts, i.e. in this list ts_2 is the most latest. - # To get result as HBase provides we should iterate in reverse order - # and get from "latest" data only key-values that are not in newer data - data = {} - for i in sorted(six.iteritems(self._rows_with_ts[row])): - data.update(i[1]) - return data - - def scan(self, filter=None, columns=None, row_start=None, row_stop=None, - limit=None): - columns = columns or [] - sorted_keys = sorted(self._rows_with_ts) - # copy data between row_start and row_stop into a dict - rows = {} - for row in sorted_keys: - if row_start and row < row_start: - continue - if row_stop and row > row_stop: - break - rows[row] = self._get_latest_dict(row) - - if columns: - ret = {} - for row, data in six.iteritems(rows): - for key in data: - if key in columns: - ret[row] = data - rows = ret - if filter: - # TODO(jdanjou): we should really parse this properly, - # but at the moment we are only going to support AND here - filters = filter.split('AND') - for f in filters: - # Extract filter name and its arguments - g = re.search("(.*)\((.*),?\)", f) - fname = g.group(1).strip() - fargs = [s.strip().replace('\'', '') - for s in g.group(2).split(',')] - m = getattr(self, fname) - if callable(m): - # overwrite rows for filtering to take effect - # in case of multiple filters - rows = m(fargs, rows) - else: - raise aodh.NotImplementedError( - "%s filter is not implemented, " - "you may want to add it!" % filter) - for k in sorted(rows)[:limit]: - yield k, rows[k] - - @staticmethod - def SingleColumnValueFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'SingleColumnValueFilter' - is found in the 'filter' argument. - """ - op = args[2] - column = "%s:%s" % (args[0], args[1]) - value = args[3] - if value.startswith('binary:'): - value = value[7:] - r = {} - for row in rows: - data = rows[row] - if op == '=': - if column in data and data[column] == value: - r[row] = data - elif op == '<': - if column in data and data[column] < value: - r[row] = data - elif op == '<=': - if column in data and data[column] <= value: - r[row] = data - elif op == '>': - if column in data and data[column] > value: - r[row] = data - elif op == '>=': - if column in data and data[column] >= value: - r[row] = data - elif op == '!=': - if column in data and data[column] != value: - r[row] = data - return r - - @staticmethod - def ColumnPrefixFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'ColumnPrefixFilter' is found - in the 'filter' argument. - - :param args: a list of filter arguments, contain prefix of column - :param rows: a dict of row prefixes for filtering - """ - value = args[0] - column = 'f:' + value - r = {} - for row, data in rows.items(): - column_dict = {} - for key in data: - if key.startswith(column): - column_dict[key] = data[key] - r[row] = column_dict - return r - - @staticmethod - def RowFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'RowFilter' is found in the - 'filter' argument. - - :param args: a list of filter arguments, it contains operator and - sought string - :param rows: a dict of rows which are filtered - """ - op = args[0] - value = args[1] - if value.startswith('binary:'): - value = value[len('binary:'):] - if value.startswith('regexstring:'): - value = value[len('regexstring:'):] - r = {} - for row, data in rows.items(): - try: - g = re.search(value, row).group() - if op == '=': - if g == row: - r[row] = data - else: - raise aodh.NotImplementedError( - "In-memory " - "RowFilter doesn't support " - "the %s operation yet" % op) - except AttributeError: - pass - return r - - @staticmethod - def QualifierFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'QualifierFilter' is found in - the 'filter' argument - """ - op = args[0] - value = args[1] - is_regex = False - if value.startswith('binaryprefix:'): - value = value[len('binaryprefix:'):] - if value.startswith('regexstring:'): - value = value[len('regexstring:'):] - is_regex = True - column = 'f:' + value - r = {} - for row in rows: - data = rows[row] - r_data = {} - for key in data: - if ((op == '=' and key.startswith(column)) or - (op == '>=' and key >= column) or - (op == '<=' and key <= column) or - (op == '>' and key > column) or - (op == '<' and key < column) or - (is_regex and re.search(value, key))): - r_data[key] = data[key] - else: - raise aodh.NotImplementedError( - "In-memory QualifierFilter " - "doesn't support the %s " - "operation yet" % op) - if r_data: - r[row] = r_data - return r - - -class MConnectionPool(object): - def __init__(self): - self.conn = MConnection() - - def connection(self): - return self.conn - - -class MConnection(object): - """HappyBase.Connection mock.""" - def __init__(self): - self.tables = {} - - def __enter__(self, *args, **kwargs): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - @staticmethod - def open(): - LOG.debug("Opening in-memory HBase connection") - - def create_table(self, n, families=None): - families = families or {} - if n in self.tables: - return self.tables[n] - t = MTable(n, families) - self.tables[n] = t - return t - - def delete_table(self, name, use_prefix=True): - del self.tables[name] - - def table(self, name): - return self.create_table(name) diff --git a/aodh/storage/hbase/migration.py b/aodh/storage/hbase/migration.py deleted file mode 100644 index 97525c5fe..000000000 --- a/aodh/storage/hbase/migration.py +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""HBase storage backend migrations -""" - - -from aodh.storage.hbase import utils as hbase_utils - - -def migrate_alarm_history_table(conn, table): - """Migrate table 'alarm_h' in HBase. - - Change row format from ""%s_%s" % alarm_id, rts, - to new separator format "%s:%s" % alarm_id, rts - """ - alarm_h_table = conn.table(table) - alarm_h_filter = "RowFilter(=, 'regexstring:\\w*_\\d{19}')" - gen = alarm_h_table.scan(filter=alarm_h_filter) - for row, data in gen: - row_parts = row.rsplit('_', 1) - alarm_h_table.put(hbase_utils.prepare_key(*row_parts), data) - alarm_h_table.delete(row) - - -TABLE_MIGRATION_FUNCS = {'alarm_h': migrate_alarm_history_table} - - -def migrate_tables(conn, tables): - if type(tables) is not list: - tables = [tables] - for table in tables: - if table in TABLE_MIGRATION_FUNCS: - TABLE_MIGRATION_FUNCS.get(table)(conn, table) diff --git a/aodh/storage/hbase/utils.py b/aodh/storage/hbase/utils.py deleted file mode 100644 index e9b437004..000000000 --- a/aodh/storage/hbase/utils.py +++ /dev/null @@ -1,250 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" Various HBase helpers -""" -import copy -import datetime -import json - -import bson.json_util -from happybase.hbase import ttypes -from oslo_log import log -import six - -from aodh.i18n import _LW - -LOG = log.getLogger(__name__) - -OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>', 'ge': '>='} -# We need this additional dictionary because we have reverted timestamp in -# row-keys for stored metrics -OP_SIGN_REV = {'eq': '=', 'lt': '>', 'le': '>=', 'ne': '!=', 'gt': '<', - 'ge': '<='} - - -def timestamp(dt, reverse=True): - """Timestamp is count of milliseconds since start of epoch. - - If reverse=True then timestamp will be reversed. Such a technique is used - in HBase rowkey design when period queries are required. Because of the - fact that rows are sorted lexicographically it's possible to vary whether - the 'oldest' entries will be on top of the table or it should be the newest - ones (reversed timestamp case). - - :param dt: datetime which is translated to timestamp - :param reverse: a boolean parameter for reverse or straight count of - timestamp in milliseconds - :return: count or reversed count of milliseconds since start of epoch - """ - epoch = datetime.datetime(1970, 1, 1) - td = dt - epoch - ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000 - return 0x7fffffffffffffff - ts if reverse else ts - - -def make_timestamp_query(func, start=None, start_op=None, end=None, - end_op=None, bounds_only=False, **kwargs): - """Return a filter start and stop row for filtering and a query. - - Query is based on the fact that CF-name is 'rts'. - :param start: Optional start timestamp - :param start_op: Optional start timestamp operator, like gt, ge - :param end: Optional end timestamp - :param end_op: Optional end timestamp operator, like lt, le - :param bounds_only: if True than query will not be returned - :param func: a function that provide a format of row - :param kwargs: kwargs for :param func - """ - # We don't need to dump here because get_start_end_rts returns strings - rts_start, rts_end = get_start_end_rts(start, end) - start_row, end_row = func(rts_start, rts_end, **kwargs) - - if bounds_only: - return start_row, end_row - - q = [] - start_op = start_op or 'ge' - end_op = end_op or 'lt' - if rts_start: - q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" % - (OP_SIGN_REV[start_op], rts_start)) - if rts_end: - q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" % - (OP_SIGN_REV[end_op], rts_end)) - - res_q = None - if len(q): - res_q = " AND ".join(q) - - return start_row, end_row, res_q - - -def get_start_end_rts(start, end): - - rts_start = str(timestamp(start)) if start else "" - rts_end = str(timestamp(end)) if end else "" - return rts_start, rts_end - - -def make_query(**kwargs): - """Return a filter query string based on the selected parameters. - - :param kwargs: key-value pairs to filter on. Key should be a real - column name in db - """ - q = [] - - # Note: we use extended constructor for SingleColumnValueFilter here. - # It is explicitly specified that entry should not be returned if CF is not - # found in table. - for key, value in sorted(kwargs.items()): - if value is not None: - if key == 'source': - q.append("SingleColumnValueFilter " - "('f', 's_%s', =, 'binary:%s', true, true)" % - (value, dump('1'))) - elif key == 'trait_type': - q.append("ColumnPrefixFilter('%s')" % value) - elif key == 'event_id': - q.append("RowFilter ( = , 'regexstring:\d*:%s')" % value) - elif key == 'exclude': - for k, v in six.iteritems(value): - q.append("SingleColumnValueFilter " - "('f', '%(k)s', !=, 'binary:%(v)s', true, true)" % - {'k': quote(k), 'v': dump(v)}) - else: - q.append("SingleColumnValueFilter " - "('f', '%s', =, 'binary:%s', true, true)" % - (quote(key), dump(value))) - if len(q): - return " AND ".join(q) - - -def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): - """If it's filter on some_id without start and end. - - start_row = some_id while end_row = some_id + MAX_BYTE. - """ - if some_id is None: - return None, None - if not rts_start: - # NOTE(idegtiarov): Here we could not use chr > 122 because chr >= 123 - # will be quoted and character will be turn in a composition that is - # started with '%' (chr(37)) that lexicographically is less than chr - # of number - rts_start = chr(122) - end_row = prepare_key(some_id, rts_start) - start_row = prepare_key(some_id, rts_end) - - return start_row, end_row - - -def prepare_key(*args): - """Prepares names for rows and columns with correct separator. - - :param args: strings or numbers that we want our key construct of - :return: key with quoted args that are separated with character ":" - """ - key_quote = [] - for key in args: - if isinstance(key, six.integer_types): - key = str(key) - key_quote.append(quote(key)) - return ":".join(key_quote) - - -def deserialize_entry(entry): - """Return a list of flatten results. - - Result contains a dict of simple structures such as 'resource_id':1 - - :param entry: entry from HBase, without row name and timestamp - """ - flatten_result = {} - for k, v in entry.items(): - if ':' in k[2:]: - key = tuple([unquote(i) for i in k[2:].split(':')]) - else: - key = unquote(k[2:]) - flatten_result[key] = load(v) - - return flatten_result - - -def serialize_entry(data=None, **kwargs): - """Return a dict that is ready to be stored to HBase - - :param data: dict to be serialized - :param kwargs: additional args - """ - data = data or {} - entry_dict = copy.copy(data) - entry_dict.update(**kwargs) - - result = {} - for k, v in entry_dict.items(): - result['f:' + quote(k, ':')] = dump(v) - return result - - -def dump(data): - return json.dumps(data, default=bson.json_util.default) - - -def load(data): - return json.loads(data, object_hook=object_hook) - - -# We don't want to have tzinfo in decoded json.This object_hook is -# overwritten json_util.object_hook for $date -def object_hook(dct): - if "$date" in dct: - dt = bson.json_util.object_hook(dct) - return dt.replace(tzinfo=None) - return bson.json_util.object_hook(dct) - - -def create_tables(conn, tables, column_families): - for table in tables: - try: - conn.create_table(table, column_families) - except ttypes.AlreadyExists: - if conn.table_prefix: - table = ("%(table_prefix)s" - "%(separator)s" - "%(table_name)s" % - dict(table_prefix=conn.table_prefix, - separator=conn.table_prefix_separator, - table_name=table)) - - LOG.warning(_LW("Cannot create table %s because " - "it already exists. Ignoring error"), table) - - -def quote(s, *args): - """Return quoted string even if it is unicode one. - - :param s: string that should be quoted - :param args: any symbol we want to stay unquoted - """ - s_en = s.encode('utf8') - return six.moves.urllib.parse.quote(s_en, *args) - - -def unquote(s): - """Return unquoted and decoded string. - - :param s: string that should be unquoted - """ - s_de = six.moves.urllib.parse.unquote(s) - return s_de.decode('utf8') diff --git a/aodh/storage/impl_hbase.py b/aodh/storage/impl_hbase.py deleted file mode 100644 index 7c10928c6..000000000 --- a/aodh/storage/impl_hbase.py +++ /dev/null @@ -1,189 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import operator - -from oslo_log import log - -import aodh -from aodh import storage -from aodh.storage import base -from aodh.storage.hbase import base as hbase_base -from aodh.storage.hbase import migration as hbase_migration -from aodh.storage.hbase import utils as hbase_utils -from aodh.storage import models - -LOG = log.getLogger(__name__) - - -AVAILABLE_CAPABILITIES = { - 'alarms': {'query': {'simple': True, - 'complex': False}, - 'history': {'query': {'simple': True, - 'complex': False}}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -class Connection(hbase_base.Connection, base.Connection): - """Put the alarm data into a HBase database - - Collections: - - - alarm: - - - row_key: uuid of alarm - - Column Families: - - f: contains the raw incoming alarm data - - - alarm_h: - - - row_key: uuid of alarm + ":" + reversed timestamp - - Column Families: - - f: raw incoming alarm_history data. Timestamp becomes now() - if not determined - """ - - CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES, - AVAILABLE_CAPABILITIES) - STORAGE_CAPABILITIES = base.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - _memory_instance = None - - ALARM_TABLE = "alarm" - ALARM_HISTORY_TABLE = "alarm_h" - - def upgrade(self): - tables = [self.ALARM_HISTORY_TABLE, self.ALARM_TABLE] - column_families = {'f': dict()} - with self.conn_pool.connection() as conn: - hbase_utils.create_tables(conn, tables, column_families) - hbase_migration.migrate_tables(conn, tables) - - def clear(self): - LOG.debug('Dropping HBase schema...') - with self.conn_pool.connection() as conn: - for table in [self.ALARM_TABLE, - self.ALARM_HISTORY_TABLE]: - try: - conn.disable_table(table) - except Exception: - LOG.debug('Cannot disable table but ignoring error') - try: - conn.delete_table(table) - except Exception: - LOG.debug('Cannot delete table but ignoring error') - - def update_alarm(self, alarm, upsert=False): - """Create an alarm. - - :param alarm: The alarm to create. It is Alarm object, so we need to - call as_dict() - """ - _id = alarm.alarm_id - alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict()) - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - if not upsert: - q = hbase_utils.make_query(alarm_id=alarm.alarm_id) - query_alarm = alarm_table.scan(filter=q) - if len(list(query_alarm)) == 0: - raise storage.AlarmNotFound(alarm.alarm_id) - alarm_table.put(_id, alarm_to_store) - stored_alarm = hbase_utils.deserialize_entry( - alarm_table.row(_id)) - return models.Alarm(**stored_alarm) - - def create_alarm(self, alarm): - return self.update_alarm(alarm, upsert=True) - - def delete_alarm(self, alarm_id): - """Delete an alarm and its history data.""" - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - alarm_table.delete(alarm_id) - q = hbase_utils.make_query(alarm_id=alarm_id) - alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - for alarm_id, ignored in alarm_history_table.scan(filter=q): - alarm_history_table.delete(alarm_id) - - def get_alarms(self, name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, - alarm_type=None, severity=None, exclude=None, - pagination=None): - if pagination: - raise aodh.NotImplementedError('Pagination query not implemented') - if meter: - raise aodh.NotImplementedError( - 'Filter by meter not implemented') - - q = hbase_utils.make_query(alarm_id=alarm_id, name=name, - enabled=enabled, user_id=user, - project_id=project, state=state, - type=alarm_type, severity=severity, - exclude=exclude) - - with self.conn_pool.connection() as conn: - alarm_table = conn.table(self.ALARM_TABLE) - gen = alarm_table.scan(filter=q) - alarms = [hbase_utils.deserialize_entry(data) - for ignored, data in gen] - for alarm in sorted( - alarms, - key=operator.itemgetter('timestamp'), - reverse=True): - yield models.Alarm(**alarm) - - def get_alarm_changes(self, alarm_id, on_behalf_of, - user=None, project=None, alarm_type=None, - severity=None, start_timestamp=None, - start_timestamp_op=None, end_timestamp=None, - end_timestamp_op=None, pagination=None): - if pagination: - raise aodh.NotImplementedError('Pagination query not implemented') - q = hbase_utils.make_query(alarm_id=alarm_id, - on_behalf_of=on_behalf_of, type=alarm_type, - user_id=user, project_id=project, - severity=severity) - start_row, end_row = hbase_utils.make_timestamp_query( - hbase_utils.make_general_rowkey_scan, - start=start_timestamp, start_op=start_timestamp_op, - end=end_timestamp, end_op=end_timestamp_op, bounds_only=True, - some_id=alarm_id) - with self.conn_pool.connection() as conn: - alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - gen = alarm_history_table.scan(filter=q, row_start=start_row, - row_stop=end_row) - for ignored, data in gen: - stored_entry = hbase_utils.deserialize_entry(data) - yield models.AlarmChange(**stored_entry) - - def record_alarm_change(self, alarm_change): - """Record alarm change event.""" - alarm_change_dict = hbase_utils.serialize_entry(alarm_change) - ts = alarm_change.get('timestamp') or datetime.datetime.now() - rts = hbase_utils.timestamp(ts) - with self.conn_pool.connection() as conn: - alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) - alarm_history_table.put( - hbase_utils.prepare_key(alarm_change.get('alarm_id'), rts), - alarm_change_dict) diff --git a/aodh/storage/impl_mongodb.py b/aodh/storage/impl_mongodb.py deleted file mode 100644 index 0f1f158d4..000000000 --- a/aodh/storage/impl_mongodb.py +++ /dev/null @@ -1,106 +0,0 @@ -# -# Copyright 2012 New Dream Network, LLC (DreamHost) -# Copyright 2013 eNovance -# Copyright 2014-2015 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""MongoDB storage backend""" - -from oslo_log import log -import pymongo - -from aodh import storage -from aodh.storage.mongo import utils as pymongo_utils -from aodh.storage import pymongo_base - -LOG = log.getLogger(__name__) - - -class Connection(pymongo_base.Connection): - """Put the alarm data into a MongoDB database.""" - - CONNECTION_POOL = pymongo_utils.ConnectionPool() - - def __init__(self, conf, url): - self.conf = conf - # NOTE(jd) Use our own connection pooling on top of the Pymongo one./ - # We need that otherwise we overflow the MongoDB instance with new - # connection since we instantiate a Pymongo client each time someone - # requires a new storage connection. - self.conn = self.CONNECTION_POOL.connect( - url, - conf.database.max_retries, - conf.database.retry_interval) - - # Require MongoDB 2.4 to use $setOnInsert - if self.conn.server_info()['versionArray'] < [2, 4]: - raise storage.StorageBadVersion("Need at least MongoDB 2.4") - - connection_options = pymongo.uri_parser.parse_uri(url) - self.db = getattr(self.conn, connection_options['database']) - if connection_options.get('username'): - self.db.authenticate(connection_options['username'], - connection_options['password']) - - # NOTE(jd) Upgrading is just about creating index, so let's do this - # on connection to be sure at least the TTL is correctly updated if - # needed. - self.upgrade() - - @staticmethod - def update_ttl(ttl, ttl_index_name, index_field, coll): - """Update or ensure time_to_live indexes. - - :param ttl: time to live in seconds. - :param ttl_index_name: name of the index we want to update or ensure. - :param index_field: field with the index that we need to update. - :param coll: collection which indexes need to be updated. - """ - indexes = coll.index_information() - if ttl <= 0: - if ttl_index_name in indexes: - coll.drop_index(ttl_index_name) - return - - if ttl_index_name in indexes: - return coll.database.command( - 'collMod', coll.name, - index={'keyPattern': {index_field: pymongo.ASCENDING}, - 'expireAfterSeconds': ttl}) - - coll.ensure_index([(index_field, pymongo.ASCENDING)], - expireAfterSeconds=ttl, - name=ttl_index_name) - - def upgrade(self): - super(Connection, self).upgrade() - # Establish indexes - ttl = self.conf.database.alarm_history_time_to_live - self.update_ttl( - ttl, 'alarm_history_ttl', 'timestamp', self.db.alarm_history) - - def clear(self): - self.conn.drop_database(self.db.name) - # Connection will be reopened automatically if needed - self.conn.close() - - def clear_expired_alarm_history_data(self, alarm_history_ttl): - """Clear expired alarm history data from the backend storage system. - - Clearing occurs according to the time-to-live. - - :param alarm_history_ttl: Number of seconds to keep alarm history - records for. - """ - LOG.debug("Clearing expired alarm history data is based on native " - "MongoDB time to live feature and going in background.") diff --git a/aodh/storage/impl_sqlalchemy.py b/aodh/storage/impl_sqlalchemy.py index 80901e8e6..092097957 100644 --- a/aodh/storage/impl_sqlalchemy.py +++ b/aodh/storage/impl_sqlalchemy.py @@ -30,6 +30,7 @@ from sqlalchemy import desc from sqlalchemy import func from sqlalchemy.orm import exc +import aodh from aodh.i18n import _LI from aodh import storage from aodh.storage import base @@ -149,7 +150,7 @@ class Connection(base.Connection): return (self._row_to_alarm_model(x) for x in query.all()) @staticmethod - def _get_pagination_query(query, pagination, api_model, model): + def _get_pagination_query(session, query, pagination, api_model, model): if not pagination.get('sort'): pagination['sort'] = api_model.DEFAULT_SORT marker = None @@ -168,6 +169,9 @@ class Connection(base.Connection): # order when "severity" specified in sorts. for sort_key, sort_dir in pagination['sort'][::-1]: if sort_key == 'severity': + engine = session.connection() + if engine.dialect.name != "mysql": + raise aodh.NotImplementedError sort_dir_func = {'asc': asc, 'desc': desc}[sort_dir] query = query.order_by(sort_dir_func( func.field(getattr(model, sort_key), 'low', @@ -222,7 +226,7 @@ class Connection(base.Connection): query = query.filter(getattr(models.Alarm, key) != value) query = self._get_pagination_query( - query, pagination, alarm_api_models.Alarm, models.Alarm) + session, query, pagination, alarm_api_models.Alarm, models.Alarm) alarms = self._retrieve_alarms(query) # TODO(cmart): improve this by using sqlalchemy.func factory @@ -360,7 +364,7 @@ class Connection(base.Connection): models.AlarmChange.timestamp < end_timestamp) query = self._get_pagination_query( - query, pagination, alarm_api_models.AlarmChange, + session, query, pagination, alarm_api_models.AlarmChange, models.AlarmChange) return self._retrieve_alarm_history(query) diff --git a/aodh/storage/mongo/__init__.py b/aodh/storage/mongo/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/aodh/storage/mongo/utils.py b/aodh/storage/mongo/utils.py deleted file mode 100644 index 1fe65e92c..000000000 --- a/aodh/storage/mongo/utils.py +++ /dev/null @@ -1,297 +0,0 @@ -# -# Copyright Ericsson AB 2013. All rights reserved -# Copyright 2015 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Common functions for MongoDB backend -""" - -import weakref - -from oslo_log import log -from oslo_utils import netutils -import pymongo -import retrying - -from aodh.i18n import _LI, _LW - -LOG = log.getLogger(__name__) - - -def make_timestamp_range(start, end, - start_timestamp_op=None, end_timestamp_op=None): - - """Create the query document to find timestamps within that range. - - This is done by given two possible datetimes and their operations. - By default, using $gte for the lower bound and $lt for the upper bound. - """ - ts_range = {} - - if start: - if start_timestamp_op == 'gt': - start_timestamp_op = '$gt' - else: - start_timestamp_op = '$gte' - ts_range[start_timestamp_op] = start - - if end: - if end_timestamp_op == 'le': - end_timestamp_op = '$lte' - else: - end_timestamp_op = '$lt' - ts_range[end_timestamp_op] = end - return ts_range - - -class ConnectionPool(object): - - def __init__(self): - self._pool = {} - - def connect(self, url, max_retries, retry_interval): - connection_options = pymongo.uri_parser.parse_uri(url) - del connection_options['database'] - del connection_options['username'] - del connection_options['password'] - del connection_options['collection'] - pool_key = tuple(connection_options) - - if pool_key in self._pool: - client = self._pool.get(pool_key)() - if client: - return client - splitted_url = netutils.urlsplit(url) - log_data = {'db': splitted_url.scheme, - 'nodelist': connection_options['nodelist']} - LOG.info(_LI('Connecting to %(db)s on %(nodelist)s'), log_data) - try: - client = MongoProxy( - pymongo.MongoClient(url), - max_retries, - retry_interval, - ) - except pymongo.errors.ConnectionFailure as e: - LOG.warning(_LW('Unable to connect to the database server: ' - '%(errmsg)s.'), {'errmsg': e}) - raise - self._pool[pool_key] = weakref.ref(client) - return client - - -class QueryTransformer(object): - - operators = {"<": "$lt", - ">": "$gt", - "<=": "$lte", - "=<": "$lte", - ">=": "$gte", - "=>": "$gte", - "!=": "$ne", - "in": "$in", - "=~": "$regex"} - - complex_operators = {"or": "$or", - "and": "$and"} - - ordering_functions = {"asc": pymongo.ASCENDING, - "desc": pymongo.DESCENDING} - - def transform_orderby(self, orderby): - orderby_filter = [] - - for field in orderby: - field_name = list(field.keys())[0] - ordering = self.ordering_functions[list(field.values())[0]] - orderby_filter.append((field_name, ordering)) - return orderby_filter - - @staticmethod - def _move_negation_to_leaf(condition): - """Moves every not operator to the leafs. - - Moving is going by applying the De Morgan rules and annihilating - double negations. - """ - def _apply_de_morgan(tree, negated_subtree, negated_op): - if negated_op == "and": - new_op = "or" - else: - new_op = "and" - - tree[new_op] = [{"not": child} - for child in negated_subtree[negated_op]] - del tree["not"] - - def transform(subtree): - op = list(subtree.keys())[0] - if op in ["and", "or"]: - [transform(child) for child in subtree[op]] - elif op == "not": - negated_tree = subtree[op] - negated_op = list(negated_tree.keys())[0] - if negated_op == "and": - _apply_de_morgan(subtree, negated_tree, negated_op) - transform(subtree) - elif negated_op == "or": - _apply_de_morgan(subtree, negated_tree, negated_op) - transform(subtree) - elif negated_op == "not": - # two consecutive not annihilates themselves - value = list(negated_tree.values())[0] - new_op = list(value.keys())[0] - subtree[new_op] = negated_tree[negated_op][new_op] - del subtree["not"] - transform(subtree) - - transform(condition) - - def transform_filter(self, condition): - # in Mongo not operator can only be applied to - # simple expressions so we have to move every - # not operator to the leafs of the expression tree - self._move_negation_to_leaf(condition) - return self._process_json_tree(condition) - - def _handle_complex_op(self, complex_op, nodes): - element_list = [] - for node in nodes: - element = self._process_json_tree(node) - element_list.append(element) - complex_operator = self.complex_operators[complex_op] - op = {complex_operator: element_list} - return op - - def _handle_not_op(self, negated_tree): - # assumes that not is moved to the leaf already - # so we are next to a leaf - negated_op = list(negated_tree.keys())[0] - negated_field = list(negated_tree[negated_op].keys())[0] - value = negated_tree[negated_op][negated_field] - if negated_op == "=": - return {negated_field: {"$ne": value}} - elif negated_op == "!=": - return {negated_field: value} - else: - return {negated_field: {"$not": - {self.operators[negated_op]: value}}} - - def _handle_simple_op(self, simple_op, nodes): - field_name = list(nodes.keys())[0] - field_value = list(nodes.values())[0] - - # no operator for equal in Mongo - if simple_op == "=": - op = {field_name: field_value} - return op - - operator = self.operators[simple_op] - op = {field_name: {operator: field_value}} - return op - - def _process_json_tree(self, condition_tree): - operator_node = list(condition_tree.keys())[0] - nodes = list(condition_tree.values())[0] - - if operator_node in self.complex_operators: - return self._handle_complex_op(operator_node, nodes) - - if operator_node == "not": - negated_tree = condition_tree[operator_node] - return self._handle_not_op(negated_tree) - - return self._handle_simple_op(operator_node, nodes) - -MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection) - if not typ.startswith('_')]) -MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient) - if not typ.startswith('_')])) -MONGO_METHODS.update(set([typ for typ in dir(pymongo) - if not typ.startswith('_')])) - - -def _safe_mongo_call(max_retries, retry_interval): - return retrying.retry( - retry_on_exception=lambda e: isinstance( - e, pymongo.errors.AutoReconnect), - wait_fixed=retry_interval * 1000, - stop_max_attempt_number=max_retries if max_retries >= 0 else None - ) - - -class MongoProxy(object): - def __init__(self, conn, max_retries, retry_interval): - self.conn = conn - self.max_retries = max_retries - self.retry_interval = retry_interval - - def __getitem__(self, item): - """Create and return proxy around the method in the connection. - - :param item: name of the connection - """ - return MongoProxy(self.conn[item]) - - def find(self, *args, **kwargs): - # We need this modifying method to return a CursorProxy object so that - # we can handle the Cursor next function to catch the AutoReconnect - # exception. - return CursorProxy(self.conn.find(*args, **kwargs), - self.max_retries, - self.retry_interval) - - def __getattr__(self, item): - """Wrap MongoDB connection. - - If item is the name of an executable method, for example find or - insert, wrap this method to retry. - Else wrap getting attribute with MongoProxy. - """ - if item in ('name', 'database'): - return getattr(self.conn, item) - if item in MONGO_METHODS: - return _safe_mongo_call( - self.max_retries, - self.retry_interval, - )(getattr(self.conn, item)) - return MongoProxy(getattr(self.conn, item), - self.max_retries, - self.retry_interval) - - def __call__(self, *args, **kwargs): - return self.conn(*args, **kwargs) - - -class CursorProxy(pymongo.cursor.Cursor): - def __init__(self, cursor, max_retries, retry_interval): - self.cursor = cursor - self.next = _safe_mongo_call( - max_retries, retry_interval)(self._next) - - def __getitem__(self, item): - return self.cursor[item] - - def _next(self): - """Wrap Cursor next method. - - This method will be executed before each Cursor next method call. - """ - try: - save_cursor = self.cursor.clone() - return self.cursor.next() - except pymongo.errors.AutoReconnect: - self.cursor = save_cursor - raise - - def __getattr__(self, item): - return getattr(self.cursor, item) diff --git a/aodh/storage/pymongo_base.py b/aodh/storage/pymongo_base.py deleted file mode 100644 index 4d3df1df4..000000000 --- a/aodh/storage/pymongo_base.py +++ /dev/null @@ -1,317 +0,0 @@ -# -# Copyright Ericsson AB 2013. All rights reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Common functions for MongoDB backend -""" -import pymongo -import six - -import aodh -from aodh import storage -from aodh.storage import base -from aodh.storage import models -from aodh.storage.mongo import utils as pymongo_utils - - -COMMON_AVAILABLE_CAPABILITIES = { - 'alarms': {'query': {'simple': True, - 'complex': True}, - 'history': {'query': {'simple': True, - 'complex': True}}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -class Connection(base.Connection): - """Base Alarm Connection class for MongoDB driver.""" - CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES, - COMMON_AVAILABLE_CAPABILITIES) - - STORAGE_CAPABILITIES = base.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - - def upgrade(self): - # create collection if not present - if 'alarm' not in self.db.conn.collection_names(): - self.db.conn.create_collection('alarm') - if 'alarm_history' not in self.db.conn.collection_names(): - self.db.conn.create_collection('alarm_history') - - def update_alarm(self, alarm, upsert=False): - """Update alarm.""" - data = alarm.as_dict() - - self.db.alarm.update( - {'alarm_id': alarm.alarm_id}, - {'$set': data}, - upsert=upsert) - - alarms_found = self.db.alarm.find({'alarm_id': alarm.alarm_id}) - if alarms_found.count() == 0: - raise storage.AlarmNotFound(alarm.alarm_id) - stored_alarm = alarms_found[0] - del stored_alarm['_id'] - self._ensure_encapsulated_rule_format(stored_alarm) - self._ensure_time_constraints(stored_alarm) - return models.Alarm(**stored_alarm) - - def create_alarm(self, alarm): - return self.update_alarm(alarm, upsert=True) - - def delete_alarm(self, alarm_id): - """Delete an alarm and its history data.""" - self.db.alarm.remove({'alarm_id': alarm_id}) - self.db.alarm_history.remove({'alarm_id': alarm_id}) - - def record_alarm_change(self, alarm_change): - """Record alarm change event.""" - self.db.alarm_history.insert(alarm_change.copy()) - - def get_alarms(self, name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, - alarm_type=None, severity=None, exclude=None, - pagination=None): - """Yields a lists of alarms that match filters. - - :param name: Optional name for alarm. - :param user: Optional ID for user that owns the resource. - :param state: Optional string for alarm state. - :param meter: Optional string for alarms associated with meter. - :param project: Optional ID for project that owns the resource. - :param enabled: Optional boolean to list disable alarm. - :param alarm_id: Optional alarm_id to return one alarm. - :param alarm_type: Optional alarm type. - :param severity: Optional alarm severity. - :param exclude: Optional dict for inequality constraint. - :param pagination: Pagination query parameters. - """ - if pagination: - raise aodh.NotImplementedError('Pagination query not implemented') - q = {} - if user is not None: - q['user_id'] = user - if project is not None: - q['project_id'] = project - if name is not None: - q['name'] = name - if enabled is not None: - q['enabled'] = enabled - if alarm_id is not None: - q['alarm_id'] = alarm_id - if state is not None: - q['state'] = state - if meter is not None: - q['rule.meter_name'] = meter - if alarm_type is not None: - q['type'] = alarm_type - if severity is not None: - q['severity'] = severity - if exclude is not None: - for key, value in six.iteritems(exclude): - q[key] = {'$ne': value} - - return self._retrieve_alarms(q, - [("timestamp", - pymongo.DESCENDING)], - None) - - def get_alarm_changes(self, alarm_id, on_behalf_of, - user=None, project=None, alarm_type=None, - severity=None, start_timestamp=None, - start_timestamp_op=None, end_timestamp=None, - end_timestamp_op=None, pagination=None): - """Yields list of AlarmChanges describing alarm history - - Changes are always sorted in reverse order of occurrence, given - the importance of currency. - - Segregation for non-administrative users is done on the basis - of the on_behalf_of parameter. This allows such users to have - visibility on both the changes initiated by themselves directly - (generally creation, rule changes, or deletion) and also on those - changes initiated on their behalf by the alarming service (state - transitions after alarm thresholds are crossed). - - :param alarm_id: ID of alarm to return changes for - :param on_behalf_of: ID of tenant to scope changes query (None for - administrative user, indicating all projects) - :param user: Optional ID of user to return changes for - :param project: Optional ID of project to return changes for - :param alarm_type: Optional change type - :param severity: Optional change severity - :param start_timestamp: Optional modified timestamp start range - :param start_timestamp_op: Optional timestamp start range operation - :param end_timestamp: Optional modified timestamp end range - :param end_timestamp_op: Optional timestamp end range operation - :param pagination: Pagination query parameters. - """ - if pagination: - raise aodh.NotImplementedError('Pagination query not implemented') - q = dict(alarm_id=alarm_id) - if on_behalf_of is not None: - q['on_behalf_of'] = on_behalf_of - if user is not None: - q['user_id'] = user - if project is not None: - q['project_id'] = project - if alarm_type is not None: - q['type'] = alarm_type - if severity is not None: - q['severity'] = severity - if start_timestamp or end_timestamp: - ts_range = pymongo_utils.make_timestamp_range(start_timestamp, - end_timestamp, - start_timestamp_op, - end_timestamp_op) - if ts_range: - q['timestamp'] = ts_range - - return self._retrieve_alarm_changes(q, - [("timestamp", - pymongo.DESCENDING)], - None) - - def query_alarms(self, filter_expr=None, orderby=None, limit=None): - """Return an iterable of model.Alarm objects.""" - return self._retrieve_data(filter_expr, orderby, limit, - models.Alarm) - - def query_alarm_history(self, filter_expr=None, orderby=None, limit=None): - """Return an iterable of model.AlarmChange objects.""" - return self._retrieve_data(filter_expr, - orderby, - limit, - models.AlarmChange) - - def _retrieve_data(self, filter_expr, orderby, limit, model): - if limit == 0: - return [] - query_filter = {} - orderby_filter = [("timestamp", pymongo.DESCENDING)] - transformer = pymongo_utils.QueryTransformer() - if orderby is not None: - orderby_filter = transformer.transform_orderby(orderby) - if filter_expr is not None: - query_filter = transformer.transform_filter(filter_expr) - - retrieve = {models.Alarm: self._retrieve_alarms, - models.AlarmChange: self._retrieve_alarm_changes} - return retrieve[model](query_filter, orderby_filter, limit) - - def _retrieve_alarms(self, query_filter, orderby, limit): - if limit is not None: - alarms = self.db.alarm.find(query_filter, - limit=limit, - sort=orderby) - else: - alarms = self.db.alarm.find(query_filter, sort=orderby) - - for alarm in alarms: - a = {} - a.update(alarm) - del a['_id'] - self._ensure_encapsulated_rule_format(a) - self._ensure_time_constraints(a) - yield models.Alarm(**a) - - def _retrieve_alarm_changes(self, query_filter, orderby, limit): - if limit is not None: - alarms_history = self.db.alarm_history.find(query_filter, - limit=limit, - sort=orderby) - else: - alarms_history = self.db.alarm_history.find( - query_filter, sort=orderby) - - for alarm_history in alarms_history: - ah = {} - ah.update(alarm_history) - del ah['_id'] - yield models.AlarmChange(**ah) - - @classmethod - def _ensure_encapsulated_rule_format(cls, alarm): - """Ensure the alarm returned by the storage have the correct format. - - The previous format looks like: - { - 'alarm_id': '0ld-4l3rt', - 'enabled': True, - 'name': 'old-alert', - 'description': 'old-alert', - 'timestamp': None, - 'meter_name': 'cpu', - 'user_id': 'me', - 'project_id': 'and-da-boys', - 'comparison_operator': 'lt', - 'threshold': 36, - 'statistic': 'count', - 'evaluation_periods': 1, - 'period': 60, - 'state': "insufficient data", - 'state_timestamp': None, - 'ok_actions': [], - 'alarm_actions': ['http://nowhere/alarms'], - 'insufficient_data_actions': [], - 'repeat_actions': False, - 'matching_metadata': {'key': 'value'} - # or 'matching_metadata': [{'key': 'key', 'value': 'value'}] - } - """ - - if isinstance(alarm.get('rule'), dict): - return - - alarm['type'] = 'threshold' - alarm['rule'] = {} - alarm['matching_metadata'] = cls._decode_matching_metadata( - alarm['matching_metadata']) - for field in ['period', 'evaluation_periods', 'threshold', - 'statistic', 'comparison_operator', 'meter_name']: - if field in alarm: - alarm['rule'][field] = alarm[field] - del alarm[field] - - query = [] - for key in alarm['matching_metadata']: - query.append({'field': key, - 'op': 'eq', - 'value': alarm['matching_metadata'][key], - 'type': 'string'}) - del alarm['matching_metadata'] - alarm['rule']['query'] = query - - @staticmethod - def _decode_matching_metadata(matching_metadata): - if isinstance(matching_metadata, dict): - # note(sileht): keep compatibility with alarm - # with matching_metadata as a dict - return matching_metadata - else: - new_matching_metadata = {} - for elem in matching_metadata: - new_matching_metadata[elem['key']] = elem['value'] - return new_matching_metadata - - @staticmethod - def _ensure_time_constraints(alarm): - """Ensures the alarm has a time constraints field.""" - if 'time_constraints' not in alarm: - alarm['time_constraints'] = [] diff --git a/aodh/tests/functional/api/v2/test_alarm_scenarios.py b/aodh/tests/functional/api/v2/test_alarm_scenarios.py index ad695dfb6..c4189ed29 100644 --- a/aodh/tests/functional/api/v2/test_alarm_scenarios.py +++ b/aodh/tests/functional/api/v2/test_alarm_scenarios.py @@ -30,7 +30,6 @@ from aodh import messaging from aodh.storage import models from aodh.tests import constants from aodh.tests.functional.api import v2 -from aodh.tests.functional import db as tests_db def default_alarms(auth_headers): @@ -1365,7 +1364,7 @@ class TestAlarms(TestAlarmsBase): self.assertEqual(1, len(alarms)) # FIXME(sileht): This should really returns [] not None - # but the mongodb and sql just store the json dict as is... + # but SQL just stores the json dict as is... # migration script for sql will be a mess because we have # to parse all JSON :( # I guess we assume that wsme convert the None input to [] @@ -3300,7 +3299,6 @@ class TestAlarmsCompositeRule(TestAlarmsBase): response.json['error_message']['faultstring']) -@tests_db.run_with('mysql', 'pgsql', 'sqlite') class TestPaginationQuery(TestAlarmsBase): def setUp(self): super(TestPaginationQuery, self).setUp() @@ -3318,6 +3316,8 @@ class TestPaginationQuery(TestAlarmsBase): self.assertEqual(['name1', 'name2', 'name3', 'name4'], names) def test_sort_by_severity_with_its_value(self): + if self.engine != "mysql": + self.skipTest("This is only implemented for MySQL") data = self.get_json('/alarms?sort=severity:asc', headers=self.auth_headers) severities = [a['severity'] for a in data] diff --git a/aodh/tests/functional/api/v2/test_complex_query_scenarios.py b/aodh/tests/functional/api/v2/test_complex_query_scenarios.py index 10913d001..110278f99 100644 --- a/aodh/tests/functional/api/v2/test_complex_query_scenarios.py +++ b/aodh/tests/functional/api/v2/test_complex_query_scenarios.py @@ -21,7 +21,6 @@ from oslo_utils import timeutils from aodh.storage import models from aodh.tests.functional.api import v2 as tests_api -from aodh.tests.functional import db as tests_db admin_header = {"X-Roles": "admin", @@ -195,8 +194,9 @@ class TestQueryAlarmsController(tests_api.FunctionalTest): for alarm in data.json: self.assertEqual("alarm", alarm["state"]) - @tests_db.run_with('mysql', 'pgsql', 'sqlite') def test_query_with_orderby_severity(self): + if self.engine != "mysql": + self.skipTest("This is only implemented for MySQL") orderby = '[{"severity": "ASC"}]' data = self.post_json(self.alarm_url, headers=admin_header, diff --git a/aodh/tests/functional/db.py b/aodh/tests/functional/db.py index cd257d6d7..fd7dd50a7 100644 --- a/aodh/tests/functional/db.py +++ b/aodh/tests/functional/db.py @@ -20,29 +20,14 @@ import os import uuid import fixtures -import mock from oslo_config import fixture as fixture_config from oslotest import mockpatch import six from six.moves.urllib import parse as urlparse -from testtools import testcase from aodh import service from aodh import storage from aodh.tests import base as test_base -try: - from aodh.tests import mocks -except ImportError: - mocks = None # happybase module is not Python 3 compatible yet - - -class MongoDbManager(fixtures.Fixture): - - def __init__(self, conf): - self.url = '%(url)s_%(db)s' % { - 'url': conf.database.connection, - 'db': uuid.uuid4().hex, - } class SQLManager(fixtures.Fixture): @@ -80,36 +65,6 @@ class MySQLManager(SQLManager): conn.execute('CREATE DATABASE %s;' % db_name) -class HBaseManager(fixtures.Fixture): - def __init__(self, conf): - self.url = '%s?table_prefix=%s' % ( - conf.database.connection, - os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test") - ) - - def setUp(self): - super(HBaseManager, self).setUp() - # Unique prefix for each test to keep data is distinguished because - # all test data is stored in one table - data_prefix = str(uuid.uuid4().hex) - - def table(conn, name): - return mocks.MockHBaseTable(name, conn, data_prefix) - - # Mock only real HBase connection, MConnection "table" method - # stays origin. - mock.patch('happybase.Connection.table', new=table).start() - # We shouldn't delete data and tables after each test, - # because it last for too long. - # All tests tables will be deleted in setup-test-env.sh - mock.patch("happybase.Connection.disable_table", - new=mock.MagicMock()).start() - mock.patch("happybase.Connection.delete_table", - new=mock.MagicMock()).start() - mock.patch("happybase.Connection.create_table", - new=mock.MagicMock()).start() - - class SQLiteManager(fixtures.Fixture): def __init__(self, conf): @@ -120,13 +75,10 @@ class SQLiteManager(fixtures.Fixture): class TestBase(test_base.BaseTestCase): DRIVER_MANAGERS = { - 'mongodb': MongoDbManager, 'mysql': MySQLManager, 'postgresql': PgSQLManager, 'sqlite': SQLiteManager, } - if mocks is not None: - DRIVER_MANAGERS['hbase'] = HBaseManager def setUp(self): super(TestBase, self).setUp() @@ -137,23 +89,16 @@ class TestBase(test_base.BaseTestCase): engine = urlparse.urlparse(db_url).scheme # In case some drivers have additional specification, for example: # PyMySQL will have scheme mysql+pymysql. - engine = engine.split('+')[0] - - # NOTE(Alexei_987) Shortcut to skip expensive db setUp - test_method = self._get_test_method() - if (hasattr(test_method, '_run_with') - and engine not in test_method._run_with): - raise testcase.TestSkipped( - 'Test is not applicable for %s' % engine) + self.engine = engine.split('+')[0] conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf self.CONF.set_override('connection', db_url, group="database", enforce_type=True) - manager = self.DRIVER_MANAGERS.get(engine) + manager = self.DRIVER_MANAGERS.get(self.engine) if not manager: - self.skipTest("missing driver manager: %s" % engine) + self.skipTest("missing driver manager: %s" % self.engine) self.db_manager = manager(self.CONF) @@ -176,25 +121,3 @@ class TestBase(test_base.BaseTestCase): def _get_connection(self, conf): return self.alarm_conn - - -def run_with(*drivers): - """Used to mark tests that are only applicable for certain db driver. - - Skips test if driver is not available. - """ - def decorator(test): - if isinstance(test, type) and issubclass(test, TestBase): - # Decorate all test methods - for attr in dir(test): - if attr.startswith('test_'): - value = getattr(test, attr) - if callable(value): - if six.PY3: - value._run_with = drivers - else: - value.__func__._run_with = drivers - else: - test._run_with = drivers - return test - return decorator diff --git a/aodh/tests/functional/storage/sqlalchemy/test_migrations.py b/aodh/tests/functional/storage/sqlalchemy/test_migrations.py index 6566d9757..9d126a15e 100644 --- a/aodh/tests/functional/storage/sqlalchemy/test_migrations.py +++ b/aodh/tests/functional/storage/sqlalchemy/test_migrations.py @@ -27,7 +27,6 @@ class ABCSkip(base.SkipNotImplementedMeta, abc.ABCMeta): pass -@tests_db.run_with('mysql', 'pgsql', 'sqlite') class ModelsMigrationsSync( six.with_metaclass(ABCSkip, tests_db.TestBase, diff --git a/aodh/tests/functional/storage/test_data_migration.py b/aodh/tests/functional/storage/test_data_migration.py deleted file mode 100644 index 585ad84cf..000000000 --- a/aodh/tests/functional/storage/test_data_migration.py +++ /dev/null @@ -1,112 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import datetime -import uuid - -import mock -from oslo_config import fixture as fixture_config - -from aodh.cmd import data_migration -from aodh import service -from aodh import storage -from aodh.storage import models as alarm_models -from aodh.tests.functional import db as tests_db -from aodh.tests.functional.storage import test_storage_scenarios - - -@tests_db.run_with('hbase', 'mongodb') -class TestDataMigration(test_storage_scenarios.AlarmTestBase): - - def setUp(self): - sql_conf = service.prepare_service(argv=[], config_files=[]) - self.sql_conf = self.useFixture(fixture_config.Config(sql_conf)).conf - # using sqlite to represent the type of SQL dbs - self.sql_conf.set_override('connection', "sqlite://", - group="database", enforce_type=True) - self.sql_namager = tests_db.SQLiteManager(self.sql_conf) - self.useFixture(self.sql_namager) - self.sql_conf.set_override('connection', self.sql_namager.url, - group="database", enforce_type=True) - self.sql_alarm_conn = storage.get_connection_from_config(self.sql_conf) - self.sql_alarm_conn.upgrade() - super(TestDataMigration, self).setUp() - self.add_some_alarms() - self._add_some_alarm_changes() - - def tearDown(self): - self.sql_alarm_conn.clear() - self.sql_alarm_conn = None - super(TestDataMigration, self).tearDown() - - def _add_some_alarm_changes(self): - alarms = list(self.alarm_conn.get_alarms()) - i = 0 - for alarm in alarms: - for change_type in [alarm_models.AlarmChange.CREATION, - alarm_models.AlarmChange.RULE_CHANGE, - alarm_models.AlarmChange.STATE_TRANSITION, - alarm_models.AlarmChange.STATE_TRANSITION, - alarm_models.AlarmChange.STATE_TRANSITION]: - alarm_change = { - "event_id": str(uuid.uuid4()), - "alarm_id": alarm.alarm_id, - "type": change_type, - "detail": "detail %s" % alarm.name, - "user_id": alarm.user_id, - "project_id": alarm.project_id, - "on_behalf_of": alarm.project_id, - "timestamp": datetime.datetime(2014, 4, 7, 7, 30 + i) - } - self.alarm_conn.record_alarm_change(alarm_change=alarm_change) - i += 1 - - def test_data_migration_without_history_data(self): - alarms = list(self.alarm_conn.get_alarms()) - self.assertEqual(3, len(alarms)) - alarms_sql = list(self.sql_alarm_conn.get_alarms()) - self.assertEqual(0, len(alarms_sql)) - test_args = data_migration.get_parser().parse_args( - ['--sql-conn', 'sqlite://', '--nosql-conn', - self.CONF.database.connection, '--migrate-history', False]) - with mock.patch('argparse.ArgumentParser.parse_args') as args_parser: - # because get_connection_from_config has been mocked in - # aodh.tests.functional.db.TestBase#setUp, here re-mocked it that - # this test can get nosql and sql storage connections - with mock.patch('aodh.storage.get_connection_from_config') as conn: - conn.side_effect = [self.alarm_conn, self.sql_alarm_conn] - args_parser.return_value = test_args - data_migration.main() - alarms_sql = list(self.sql_alarm_conn.get_alarms()) - alarm_changes = list(self.sql_alarm_conn.query_alarm_history()) - self.assertEqual(0, len(alarm_changes)) - self.assertEqual(3, len(alarms_sql)) - self.assertEqual(sorted([a.alarm_id for a in alarms]), - sorted([a.alarm_id for a in alarms_sql])) - - def test_data_migration_with_history_data(self): - test_args = data_migration.get_parser().parse_args( - ['--sql-conn', 'sqlite://', '--nosql-conn', - self.CONF.database.connection]) - with mock.patch('argparse.ArgumentParser.parse_args') as args_parser: - # because get_connection_from_config has been mocked in - # aodh.tests.functional.db.TestBase#setUp, here re-mocked it that - # this test can get nosql and sql storage connections - with mock.patch('aodh.storage.get_connection_from_config') as conn: - conn.side_effect = [self.alarm_conn, self.sql_alarm_conn] - args_parser.return_value = test_args - data_migration.main() - alarms_sql = list(self.sql_alarm_conn.get_alarms()) - self.assertEqual(3, len(alarms_sql)) - for alarm in alarms_sql: - changes = list(self.sql_alarm_conn.get_alarm_changes( - alarm.alarm_id, alarm.project_id)) - self.assertEqual(5, len(changes)) diff --git a/aodh/tests/functional/storage/test_impl_hbase.py b/aodh/tests/functional/storage/test_impl_hbase.py deleted file mode 100644 index 96b07d94e..000000000 --- a/aodh/tests/functional/storage/test_impl_hbase.py +++ /dev/null @@ -1,67 +0,0 @@ -# -# Copyright 2012, 2013 Dell Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Tests for aodh/storage/impl_hbase.py - -.. note:: - In order to run the tests against real HBase server set the environment - variable aodh_TEST_HBASE_URL to point to that HBase instance before - running the tests. Make sure the Thrift server is running on that server. - -""" -import mock - -try: - import happybase # noqa -except ImportError: - import testtools.testcase - raise testtools.testcase.TestSkipped("happybase is needed") - -from aodh.storage import impl_hbase -from aodh.tests import base as test_base -from aodh.tests.functional import db as tests_db - - -class ConnectionTest(tests_db.TestBase): - - @tests_db.run_with('hbase') - def test_hbase_connection(self): - - class TestConn(object): - def __init__(self, host, port): - self.netloc = '%s:%s' % (host, port) - - def open(self): - pass - - def get_connection_pool(conf): - return TestConn(conf['host'], conf['port']) - - with mock.patch.object(impl_hbase.Connection, '_get_connection_pool', - side_effect=get_connection_pool): - conn = impl_hbase.Connection(self.CONF, 'hbase://test_hbase:9090') - self.assertIsInstance(conn.conn_pool, TestConn) - - -class CapabilitiesTest(test_base.BaseTestCase): - def test_alarm_capabilities(self): - expected_capabilities = { - 'alarms': {'query': {'simple': True, - 'complex': False}, - 'history': {'query': {'simple': True, - 'complex': False}}}, - } - - actual_capabilities = impl_hbase.Connection.get_capabilities() - self.assertEqual(expected_capabilities, actual_capabilities) diff --git a/aodh/tests/functional/storage/test_impl_mongodb.py b/aodh/tests/functional/storage/test_impl_mongodb.py deleted file mode 100644 index a3386dfe8..000000000 --- a/aodh/tests/functional/storage/test_impl_mongodb.py +++ /dev/null @@ -1,101 +0,0 @@ -# -# Copyright 2012 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Tests for aodh/storage/impl_mongodb.py - -.. note:: - In order to run the tests against another MongoDB server set the - environment variable aodh_TEST_MONGODB_URL to point to a MongoDB - server before running the tests. - -""" -import unittest - -try: - from aodh.storage import impl_mongodb -except ImportError: - impl_mongodb = None -from aodh.tests import base as test_base -from aodh.tests.functional import db as tests_db - - -@unittest.skipUnless(impl_mongodb, "pymongo not available") -@tests_db.run_with('mongodb') -class MongoDBConnection(tests_db.TestBase): - def test_connection_pooling(self): - test_conn = impl_mongodb.Connection(self.CONF, - self.CONF.database.connection) - self.assertEqual(self.alarm_conn.conn, test_conn.conn) - - def test_replica_set(self): - url = self.CONF.database.connection + '?replicaSet=foobar' - conn = impl_mongodb.Connection(self.CONF, url) - self.assertTrue(conn.conn) - - -@unittest.skipUnless(impl_mongodb, "pymongo not available") -@tests_db.run_with('mongodb') -class IndexTest(tests_db.TestBase): - def _test_ttl_index_absent(self, conn, coll_name, ttl_opt): - # create a fake index and check it is deleted - coll = getattr(conn.db, coll_name) - index_name = '%s_ttl' % coll_name - self.CONF.set_override(ttl_opt, -1, group='database', - enforce_type=True) - conn.upgrade() - self.assertNotIn(index_name, coll.index_information()) - - self.CONF.set_override(ttl_opt, 456789, group='database', - enforce_type=True) - conn.upgrade() - self.assertEqual(456789, - coll.index_information() - [index_name]['expireAfterSeconds']) - - def test_alarm_history_ttl_index_absent(self): - self._test_ttl_index_absent(self.alarm_conn, 'alarm_history', - 'alarm_history_time_to_live') - - def _test_ttl_index_present(self, conn, coll_name, ttl_opt): - coll = getattr(conn.db, coll_name) - self.CONF.set_override(ttl_opt, 456789, group='database', - enforce_type=True) - conn.upgrade() - index_name = '%s_ttl' % coll_name - self.assertEqual(456789, - coll.index_information() - [index_name]['expireAfterSeconds']) - - self.CONF.set_override(ttl_opt, -1, group='database', - enforce_type=True) - conn.upgrade() - self.assertNotIn(index_name, coll.index_information()) - - def test_alarm_history_ttl_index_present(self): - self._test_ttl_index_present(self.alarm_conn, 'alarm_history', - 'alarm_history_time_to_live') - - -class CapabilitiesTest(test_base.BaseTestCase): - @unittest.skipUnless(impl_mongodb, "pymongo not available") - def test_alarm_capabilities(self): - expected_capabilities = { - 'alarms': {'query': {'simple': True, - 'complex': True}, - 'history': {'query': {'simple': True, - 'complex': True}}}, - } - - actual_capabilities = impl_mongodb.Connection.get_capabilities() - self.assertEqual(expected_capabilities, actual_capabilities) diff --git a/aodh/tests/functional/storage/test_storage_scenarios.py b/aodh/tests/functional/storage/test_storage_scenarios.py index 0b6baa1bb..7cf07116d 100644 --- a/aodh/tests/functional/storage/test_storage_scenarios.py +++ b/aodh/tests/functional/storage/test_storage_scenarios.py @@ -265,7 +265,6 @@ class AlarmTest(AlarmTestBase): self.assertNotEqual(victim.name, s.name) -@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'hbase') class AlarmHistoryTest(AlarmTestBase): def setUp(self): diff --git a/aodh/tests/mocks.py b/aodh/tests/mocks.py deleted file mode 100644 index 48f11fad8..000000000 --- a/aodh/tests/mocks.py +++ /dev/null @@ -1,79 +0,0 @@ - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -import happybase - - -class MockHBaseTable(happybase.Table): - - def __init__(self, name, connection, data_prefix): - # data_prefix is added to all rows which are written - # in this test. It allows to divide data from different tests - self.data_prefix = data_prefix - # We create happybase Table with prefix from - # AODH_TEST_HBASE_TABLE_PREFIX - prefix = os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", 'test') - super(MockHBaseTable, self).__init__( - "%s_%s" % (prefix, name), - connection) - - def put(self, row, *args, **kwargs): - row = self.data_prefix + row - return super(MockHBaseTable, self).put(row, *args, - **kwargs) - - def scan(self, row_start=None, row_stop=None, row_prefix=None, - columns=None, filter=None, timestamp=None, - include_timestamp=False, batch_size=10, scan_batching=None, - limit=None, sorted_columns=False): - # Add data prefix for row parameters - # row_prefix could not be combined with row_start or row_stop - if not row_start and not row_stop: - row_prefix = self.data_prefix + (row_prefix or "") - row_start = None - row_stop = None - elif row_start and not row_stop: - # Adding data_prefix to row_start and row_stop does not work - # if it looks like row_start = %data_prefix%foo, - # row_stop = %data_prefix, because row_start > row_stop - filter = self._update_filter_row(filter) - row_start = self.data_prefix + row_start - else: - row_start = self.data_prefix + (row_start or "") - row_stop = self.data_prefix + (row_stop or "") - gen = super(MockHBaseTable, self).scan(row_start, row_stop, - row_prefix, columns, - filter, timestamp, - include_timestamp, batch_size, - scan_batching, limit, - sorted_columns) - data_prefix_len = len(self.data_prefix) - # Restore original row format - for row, data in gen: - yield (row[data_prefix_len:], data) - - def row(self, row, *args, **kwargs): - row = self.data_prefix + row - return super(MockHBaseTable, self).row(row, *args, **kwargs) - - def delete(self, row, *args, **kwargs): - row = self.data_prefix + row - return super(MockHBaseTable, self).delete(row, *args, **kwargs) - - def _update_filter_row(self, filter): - if filter: - return "PrefixFilter(%s) AND %s" % (self.data_prefix, filter) - else: - return "PrefixFilter(%s)" % self.data_prefix diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 2b6701d7e..9fdf43a2a 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -9,7 +9,7 @@ # By default all aodh services are started (see # devstack/settings). # -# AODH_BACKEND: Database backend (e.g. 'mysql', 'mongodb') +# AODH_BACKEND: Database backend (e.g. 'mysql') # AODH_COORDINATION_URL: URL for group membership service provided by tooz. # Support potential entry-points console scripts in VENV or not @@ -50,28 +50,6 @@ function aodh_service_url { } -# _install_mongdb - Install mongodb and python lib. -function _aodh_install_mongodb { - # Server package is the same on all - local packages=mongodb-server - - if is_fedora; then - # mongodb client - packages="${packages} mongodb" - fi - - install_package ${packages} - - if is_fedora; then - restart_service mongod - else - restart_service mongodb - fi - - # give time for service to restart - sleep 5 -} - # _install_redis() - Install the redis server and python lib. function _aodh_install_redis { if is_ubuntu; then @@ -102,18 +80,12 @@ function _aodh_config_apache_wsgi { fi sudo cp $AODH_DIR/devstack/apache-aodh.template $aodh_apache_conf - if [ "$AODH_BACKEND" = 'hbase' ] ; then - # Use one process to have single in-memory DB instance for data consistency - AODH_API_WORKERS=1 - else - AODH_API_WORKERS=$API_WORKERS - fi sudo sed -e " s|%PORT%|$AODH_SERVICE_PORT|g; s|%APACHE_NAME%|$APACHE_NAME|g; s|%WSGIAPP%|$AODH_WSGI_DIR/app|g; s|%USER%|$STACK_USER|g; - s|%APIWORKERS%|$AODH_API_WORKERS|g; + s|%APIWORKERS%|$API_WORKERS|g; s|%VIRTUALENV%|$venv_path|g " -i $aodh_apache_conf } @@ -127,14 +99,6 @@ function _aodh_prepare_coordination { fi } -# Install required services for storage backends -function _aodh_prepare_storage_backend { - if [ "$AODH_BACKEND" = 'mongodb' ] ; then - pip_install_gr pymongo - _aodh_install_mongodb - fi -} - # Create aodh related accounts in Keystone function _aodh_create_accounts { if is_service_enabled aodh-api; then @@ -170,9 +134,6 @@ function _aodh_cleanup_apache_wsgi { # cleanup_aodh() - Remove residual data files, anything left over # from previous runs that a clean run would need to clean up function cleanup_aodh { - if [ "$AODH_BACKEND" = 'mongodb' ] ; then - mongo aodh --eval "db.dropDatabase();" - fi if [ "$AODH_DEPLOY" == "mod_wsgi" ]; then _aodh_cleanup_apache_wsgi fi @@ -182,11 +143,6 @@ function cleanup_aodh { function _aodh_configure_storage_backend { if [ "$AODH_BACKEND" = 'mysql' ] || [ "$AODH_BACKEND" = 'postgresql' ] ; then iniset $AODH_CONF database connection $(database_connection_url aodh) - elif [ "$AODH_BACKEND" = 'mongodb' ] ; then - iniset $AODH_CONF database connection mongodb://localhost:27017/aodh - cleanup_aodh - elif [ "$AODH_BACKEND" = 'hbase' ] ; then - iniset $AODH_CONF database connection hbase://__test__ else die $LINENO "Unable to configure unknown AODH_BACKEND $AODH_BACKEND" fi @@ -291,7 +247,6 @@ function init_aodh { # otherwise makes sense to do the backend services). function install_aodh { _aodh_prepare_coordination - _aodh_prepare_storage_backend install_aodhclient sudo -H pip install -e "$AODH_DIR"[test,$AODH_BACKEND] sudo install -d -o $STACK_USER -m 755 $AODH_CONF_DIR diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 6cb9bc33c..585bde91f 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -18,47 +18,6 @@ Configuration ============= -HBase -=================== - -This storage implementation uses Thrift HBase interface. The default Thrift -connection settings should be changed to support using ConnectionPool in HBase. -To ensure proper configuration, please add the following lines to the -`hbase-site.xml` configuration file:: - - - hbase.thrift.minWorkerThreads - 200 - - -For pure development purposes, you can use HBase from Apache_ or some other -vendor like Cloudera or Hortonworks. To verify your installation, you can use -the `list` command in `HBase shell`, to list the tables in your -HBase server, as follows:: - - $ ${HBASE_HOME}/bin/hbase shell - - hbase> list - -.. note:: - This driver has been tested against HBase 0.94.2/CDH 4.2.0, - HBase 0.94.4/HDP 1.2, HBase 0.94.18/Apache, HBase 0.94.5/Apache, - HBase 0.96.2/Apache and HBase 0.98.0/Apache. - Versions earlier than 0.92.1 are not supported due to feature incompatibility. - -To find out more about supported storage backends please take a look on the -:doc:`install/manual/` guide. - -.. note:: - - If you are changing the configuration on the fly to use HBase, as a storage - backend, you will need to restart the Aodh services that use the - database to allow the changes to take affect, i.e. the collector and API - services. - -.. _Apache: https://hbase.apache.org/book/quickstart.html - - Aodh Sample Configuration File ============================== diff --git a/doc/source/install/manual.rst b/doc/source/install/manual.rst index 7d5157aba..8d431d838 100644 --- a/doc/source/install/manual.rst +++ b/doc/source/install/manual.rst @@ -28,86 +28,15 @@ This step is a prerequisite for the collector, notification agent and API services. You may use one of the listed database backends below to store Aodh data. -.. note:: - Please notice, MongoDB requires pymongo_ to be installed on the system. The - required minimum version of pymongo is 2.4. -.. +The recommended Aodh storage backend is any SQLAlchemy-supported database +(`PostgreSQL` or `MySQL`). You need to create a `aodh` database first and then +initialise it by running:: -SQLalchemy-supported DBs ------------------------- - - The recommended Aodh storage backend is any SQLAlchemy-supported - database (`PostgreSQL` or `MySQL`). - - In case of SQL-based database backends, you need to create a `aodh` - database first and then initialise it by running:: - - aodh-dbsync - - To use MySQL as the storage backend, change the 'database' section in - aodh.conf as follows:: - - [database] - connection = mysql+pymysql://username:password@host/aodh?charset=utf8 - - -MongoDB -------- - - Follow the instructions to install the MongoDB_ package for your operating - system, then start the service. The required minimum version of MongoDB is 2.4. - - To use MongoDB as the storage backend, change the 'database' section in - aodh.conf as follows:: - - [database] - connection = mongodb://username:password@host:27017/aodh - - If MongoDB is configured in replica set mode, add `?replicaSet=` in your - connection URL:: - - [database] - connection = mongodb://username:password@host:27017/aodh?replicaSet=foobar - - -HBase ------ - - HBase backend is implemented to use HBase Thrift interface, therefore it is - mandatory to have the HBase Thrift server installed and running. To start - the Thrift server, please run the following command:: - - ${HBASE_HOME}/bin/hbase thrift start - - The implementation uses `HappyBase`_, which is a wrapper library used to - interact with HBase via Thrift protocol. You can verify the thrift - connection by running a quick test from a client:: - - import happybase - - conn = happybase.Connection(host=$hbase-thrift-server, port=9090, table_prefix=None) - print conn.tables() # this returns a list of HBase tables in your HBase server - - .. note:: - HappyBase version 0.5 or greater is required. Additionally, version 0.7 - is not currently supported. - .. - - In case of HBase, the needed database tables (`project`, `user`, `resource`, - `meter`, `alarm`, `alarm_h`) should be created manually with `f` column - family for each one. - - To use HBase as the storage backend, change the 'database' section in - aodh.conf as follows:: - - [database] - connection = hbase://hbase-thrift-host:9090 - - -.. _HappyBase: http://happybase.readthedocs.org/en/latest/index.html# -.. _MongoDB: http://www.mongodb.org/ -.. _pymongo: https://pypi.python.org/pypi/pymongo/ - + aodh-dbsync +To use MySQL as the storage backend, change the 'database' section in +aodh.conf as follows:: + [database] + connection = mysql+pymysql://username:password@host/aodh?charset=utf8 diff --git a/doc/source/testing.rst b/doc/source/testing.rst index b7d81a3c7..0f5673f48 100644 --- a/doc/source/testing.rst +++ b/doc/source/testing.rst @@ -24,15 +24,13 @@ run through tox_. $ sudo pip install tox -2. On Ubuntu install ``mongodb`` and ``libmysqlclient-dev`` packages:: +2. On Ubuntu install ``libmysqlclient-dev`` packages:: - $ sudo apt-get install mongodb $ sudo apt-get install libmysqlclient-dev For Fedora20 there is no ``libmysqlclient-dev`` package, so you’ll need to install ``mariadb-devel.x86-64`` (or ``mariadb-devel.i386``) instead:: - $ sudo yum install mongodb $ sudo yum install mariadb-devel.x86_64 3. Run the unit and code-style tests:: diff --git a/releasenotes/notes/remove-no-sql-drivers-21dfdbd750751340.yaml b/releasenotes/notes/remove-no-sql-drivers-21dfdbd750751340.yaml new file mode 100644 index 000000000..886d9482b --- /dev/null +++ b/releasenotes/notes/remove-no-sql-drivers-21dfdbd750751340.yaml @@ -0,0 +1,3 @@ +--- +upgrade: + - All the deprecated non-SQL drivers have been removed. diff --git a/run-functional-tests.sh b/run-functional-tests.sh index 9f9af8df2..7ab73c10c 100755 --- a/run-functional-tests.sh +++ b/run-functional-tests.sh @@ -4,11 +4,4 @@ set -e export AODH_TEST_BACKEND=${AODH_TEST_BACKEND:-mysql} export AODH_SERVICE_URL=${AODH_SERVICE_URL:-http://127.0.0.1:8042} -case $AODH_TEST_BACKEND in - hbase) - AODH_TEST_STORAGE_URL="hbase://__test__" $* - ;; - *) - pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $* - ;; -esac +pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $* diff --git a/setup.cfg b/setup.cfg index 9ee6a447e..e4d05c6db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,14 +44,6 @@ postgresql = alembic>=0.7.2 psycopg2 -mongodb = - pymongo>=3.0.2 - -hbase = - happybase!=0.7,>=0.5,<1.0.0:python_version=='2.7' - # Required for bson - pymongo>=3.0.2 - zaqar = python-zaqarclient>=1.2.0 @@ -80,12 +72,10 @@ test = [entry_points] aodh.storage = log = aodh.storage.impl_log:Connection - mongodb = aodh.storage.impl_mongodb:Connection mysql = aodh.storage.impl_sqlalchemy:Connection mysql+pymysql = aodh.storage.impl_sqlalchemy:Connection postgresql = aodh.storage.impl_sqlalchemy:Connection sqlite = aodh.storage.impl_sqlalchemy:Connection - hbase = aodh.storage.impl_hbase:Connection aodh.alarm.rule = threshold = aodh.api.controllers.v2.alarm_rules.threshold:AlarmThresholdRule @@ -122,7 +112,6 @@ console_scripts = aodh-evaluator = aodh.cmd.alarm:evaluator aodh-notifier = aodh.cmd.alarm:notifier aodh-listener = aodh.cmd.alarm:listener - aodh-data-migration = aodh.cmd.data_migration:main aodh-combination-alarm-conversion = aodh.cmd.alarm_conversion:conversion oslo.config.opts = diff --git a/tools/test_hbase_table_utils.py b/tools/test_hbase_table_utils.py deleted file mode 100644 index 5e210c042..000000000 --- a/tools/test_hbase_table_utils.py +++ /dev/null @@ -1,38 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -from oslo_config import cfg - -from aodh import storage - - -def main(argv): - cfg.CONF([], project='aodh') - if os.getenv("AODH_TEST_STORAGE_URL"): - url = ("%s?table_prefix=%s" % - (os.getenv("AODH_TEST_STORAGE_URL"), - os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test"))) - cfg.CONF.set_override("connection", url, group="database", - enforce_type=True) - alarm_conn = storage.get_connection_from_config(cfg.CONF) - for arg in argv: - if arg == "--upgrade": - alarm_conn.upgrade() - if arg == "--clear": - alarm_conn.clear() - - -if __name__ == '__main__': - main(sys.argv[1:]) diff --git a/tox.ini b/tox.ini index fdc66cc3a..c08efebdf 100644 --- a/tox.ini +++ b/tox.ini @@ -15,17 +15,6 @@ commands = oslo-config-generator --config-file=etc/aodh/aodh-config-generator.conf whitelist_externals = bash -[testenv:py27-hbase] -deps = .[hbase,test] -setenv = OS_TEST_PATH=aodh/tests/functional/ - AODH_TEST_STORAGE_URL=hbase://__test__ - -[testenv:py27-mongodb] -deps = .[mongodb,test] -setenv = OS_TEST_PATH=aodh/tests/functional/ -commands = - pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- python setup.py testr --slowest --testr-args="{posargs}" - [testenv:py27-mysql] deps = .[mysql,test] setenv = OS_TEST_PATH=aodh/tests/functional/ @@ -39,7 +28,7 @@ commands = pifpaf -g AODH_TEST_STORAGE_URL run postgresql -- python setup.py testr --slowest --testr-args="{posargs}" [testenv:functional] -deps = .[mysql,postgresql,mongodb,hbase,test] +deps = .[mysql,postgresql,test] setenv = VIRTUAL_ENV={envdir} OS_TEST_PATH=aodh/tests/functional/ GABBI_LIVE_FAIL_IF_NO_TEST=1 @@ -91,11 +80,6 @@ setenv = PYTHONHASHSEED=0 [testenv:debug] commands = bash -x oslo_debug_helper {posargs} -[testenv:debug-mongodb] -deps = .[mongodb,test] -setenv = OS_TEST_PATH=aodh/tests/functional/ -commands = pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- oslo_debug_helper {posargs} - [testenv:debug-mysql] deps = .[mysql,test] setenv = OS_TEST_PATH=aodh/tests/functional/