Merge "Add a tool for migrating alarms data from NoSQL to SQL"
This commit is contained in:
commit
7064ddc28a
162
aodh/cmd/data_migration.py
Normal file
162
aodh/cmd/data_migration.py
Normal file
@ -0,0 +1,162 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A tool for migrating alarms and alarms history data from NoSQL to SQL.
|
||||
|
||||
NOTES:
|
||||
|
||||
- Users need to specify the source NoSQL connection url and the destination SQL
|
||||
connection URL with this tool, an usage example:
|
||||
aodh-data-migration --nosql-conn \
|
||||
mongodb://aodh:password@127.0.0.1:27017/aodh --sql-conn \
|
||||
mysql+pymysql://root:password@127.0.0.1/aodh?charset=utf8
|
||||
|
||||
- Both the alarm data and alarm history data will be migrated when running this
|
||||
tool, but the history data migration can be avoid by specifying False of
|
||||
--migrate-history option.
|
||||
|
||||
- It is better to ensure the db connection is OK when running this tool, and
|
||||
this tool can be run repeatedly, the duplicated data will be skipped.
|
||||
|
||||
- This tool depends on the NoSQL and SQL drivers of Aodh, so it is should be
|
||||
used only before the removal of NoSQL drivers.
|
||||
|
||||
- This tool has been tested OK in devstack environment, but users need to be
|
||||
cautious with this, because the data migration between storage backends is
|
||||
a bit dangerous.
|
||||
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception
|
||||
from oslo_db import options as db_options
|
||||
from aodh.i18n import _LE, _LI, _LW
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from aodh import storage
|
||||
|
||||
root_logger = logging.getLogger('')
|
||||
|
||||
|
||||
def get_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='A tool for Migrating alarms and alarms history from'
|
||||
' NoSQL to SQL',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--nosql-conn',
|
||||
required=True,
|
||||
type=str,
|
||||
help='The source NoSQL database connection.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--sql-conn',
|
||||
required=True,
|
||||
type=str,
|
||||
help='The destination SQL database connection.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--migrate-history',
|
||||
default=True,
|
||||
type=bool,
|
||||
help='Migrate history data when migrate alarms or not,'
|
||||
' True as Default.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--debug',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help='Show the debug level log messages.',
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def _validate_conn_options(args):
|
||||
nosql_scheme = urlparse.urlparse(args.nosql_conn).scheme
|
||||
sql_scheme = urlparse.urlparse(args.sql_conn).scheme
|
||||
if nosql_scheme not in ('mongodb', 'hbase'):
|
||||
root_logger.error(_LE('Invalid source DB type %s, the source database '
|
||||
'connection should be one of: [mongodb, hbase]'
|
||||
), nosql_scheme)
|
||||
sys.exit(1)
|
||||
if sql_scheme not in ('mysql', 'mysql+pymysql', 'postgresql',
|
||||
'sqlite'):
|
||||
root_logger.error(_LE('Invalid destination DB type %s, the destination'
|
||||
' database connection should be one of: '
|
||||
'[mysql, postgresql, sqlite]'), sql_scheme)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
args = get_parser().parse_args()
|
||||
|
||||
# Set up logging to use the console
|
||||
console = logging.StreamHandler(sys.stderr)
|
||||
formatter = logging.Formatter(
|
||||
'[%(asctime)s] %(levelname)-8s %(message)s')
|
||||
console.setFormatter(formatter)
|
||||
root_logger.addHandler(console)
|
||||
if args.debug:
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
root_logger.setLevel(logging.INFO)
|
||||
|
||||
_validate_conn_options(args)
|
||||
|
||||
nosql_conf = cfg.ConfigOpts()
|
||||
db_options.set_defaults(nosql_conf, args.nosql_conn)
|
||||
nosql_conf.register_opts(storage.OPTS, 'database')
|
||||
nosql_conn = storage.get_connection_from_config(nosql_conf)
|
||||
|
||||
sql_conf = cfg.ConfigOpts()
|
||||
db_options.set_defaults(sql_conf, args.sql_conn)
|
||||
sql_conf.register_opts(storage.OPTS, 'database')
|
||||
sql_conn = storage.get_connection_from_config(sql_conf)
|
||||
|
||||
root_logger.info(
|
||||
_LI("Starting to migrate alarms data from NoSQL to SQL..."))
|
||||
|
||||
count = 0
|
||||
for alarm in nosql_conn.get_alarms():
|
||||
root_logger.debug("Migrating alarm %s..." % alarm.alarm_id)
|
||||
try:
|
||||
sql_conn.create_alarm(alarm)
|
||||
count += 1
|
||||
except exception.DBDuplicateEntry:
|
||||
root_logger.warning(_LW("Duplicated alarm %s found, skipped."),
|
||||
alarm.alarm_id)
|
||||
if not args.migrate_history:
|
||||
continue
|
||||
|
||||
history_count = 0
|
||||
for history in nosql_conn.get_alarm_changes(alarm.alarm_id, None):
|
||||
history_data = history.as_dict()
|
||||
root_logger.debug(" Migrating alarm history data with"
|
||||
" event_id %s..." % history_data['event_id'])
|
||||
try:
|
||||
sql_conn.record_alarm_change(history_data)
|
||||
history_count += 1
|
||||
except exception.DBDuplicateEntry:
|
||||
root_logger.warning(
|
||||
_LW(" Duplicated alarm history %s found, skipped."),
|
||||
history_data['event_id'])
|
||||
root_logger.info(_LI(" Migrated %(count)s history data of alarm "
|
||||
"%(alarm_id)s"),
|
||||
{'count': history_count, 'alarm_id': alarm.alarm_id})
|
||||
|
||||
root_logger.info(_LI("End alarms data migration from NoSQL to SQL, %s"
|
||||
" alarms have been migrated."), count)
|
112
aodh/tests/functional/storage/test_data_migration.py
Normal file
112
aodh/tests/functional/storage/test_data_migration.py
Normal file
@ -0,0 +1,112 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
|
||||
from aodh.cmd import data_migration
|
||||
from aodh import service
|
||||
from aodh import storage
|
||||
from aodh.storage import models as alarm_models
|
||||
from aodh.tests.functional import db as tests_db
|
||||
from aodh.tests.functional.storage import test_storage_scenarios
|
||||
|
||||
|
||||
@tests_db.run_with('hbase', 'mongodb')
|
||||
class TestDataMigration(test_storage_scenarios.AlarmTestBase):
|
||||
|
||||
def setUp(self):
|
||||
sql_conf = service.prepare_service(argv=[], config_files=[])
|
||||
self.sql_conf = self.useFixture(fixture_config.Config(sql_conf)).conf
|
||||
# using sqlite to represent the type of SQL dbs
|
||||
self.sql_conf.set_override('connection', "sqlite://",
|
||||
group="database", enforce_type=True)
|
||||
self.sql_namager = tests_db.SQLiteManager(self.sql_conf)
|
||||
self.useFixture(self.sql_namager)
|
||||
self.sql_conf.set_override('connection', self.sql_namager.url,
|
||||
group="database", enforce_type=True)
|
||||
self.sql_alarm_conn = storage.get_connection_from_config(self.sql_conf)
|
||||
self.sql_alarm_conn.upgrade()
|
||||
super(TestDataMigration, self).setUp()
|
||||
self.add_some_alarms()
|
||||
self._add_some_alarm_changes()
|
||||
|
||||
def tearDown(self):
|
||||
self.sql_alarm_conn.clear()
|
||||
self.sql_alarm_conn = None
|
||||
super(TestDataMigration, self).tearDown()
|
||||
|
||||
def _add_some_alarm_changes(self):
|
||||
alarms = list(self.alarm_conn.get_alarms())
|
||||
i = 0
|
||||
for alarm in alarms:
|
||||
for change_type in [alarm_models.AlarmChange.CREATION,
|
||||
alarm_models.AlarmChange.RULE_CHANGE,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION]:
|
||||
alarm_change = {
|
||||
"event_id": str(uuid.uuid4()),
|
||||
"alarm_id": alarm.alarm_id,
|
||||
"type": change_type,
|
||||
"detail": "detail %s" % alarm.name,
|
||||
"user_id": alarm.user_id,
|
||||
"project_id": alarm.project_id,
|
||||
"on_behalf_of": alarm.project_id,
|
||||
"timestamp": datetime.datetime(2014, 4, 7, 7, 30 + i)
|
||||
}
|
||||
self.alarm_conn.record_alarm_change(alarm_change=alarm_change)
|
||||
i += 1
|
||||
|
||||
def test_data_migration_without_history_data(self):
|
||||
alarms = list(self.alarm_conn.get_alarms())
|
||||
self.assertEqual(3, len(alarms))
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
self.assertEqual(0, len(alarms_sql))
|
||||
test_args = data_migration.get_parser().parse_args(
|
||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
||||
self.CONF.database.connection, '--migrate-history', False])
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
||||
# because get_connection_from_config has been mocked in
|
||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
||||
# this test can get nosql and sql storage connections
|
||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
||||
args_parser.return_value = test_args
|
||||
data_migration.main()
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
alarm_changes = list(self.sql_alarm_conn.query_alarm_history())
|
||||
self.assertEqual(0, len(alarm_changes))
|
||||
self.assertEqual(3, len(alarms_sql))
|
||||
self.assertEqual(sorted([a.alarm_id for a in alarms]),
|
||||
sorted([a.alarm_id for a in alarms_sql]))
|
||||
|
||||
def test_data_migration_with_history_data(self):
|
||||
test_args = data_migration.get_parser().parse_args(
|
||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
||||
self.CONF.database.connection])
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
||||
# because get_connection_from_config has been mocked in
|
||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
||||
# this test can get nosql and sql storage connections
|
||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
||||
args_parser.return_value = test_args
|
||||
data_migration.main()
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
self.assertEqual(3, len(alarms_sql))
|
||||
for alarm in alarms_sql:
|
||||
changes = list(self.sql_alarm_conn.get_alarm_changes(
|
||||
alarm.alarm_id, alarm.project_id))
|
||||
self.assertEqual(5, len(changes))
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
upgrade:
|
||||
- >
|
||||
Add a tool for migrating alarm and alarm history data from NoSQL storage
|
||||
to SQL storage. The migration tool has been tested OK in devstack
|
||||
environment, but users need to be cautious with this, because the data
|
||||
migration between storage backends is a bit dangerous.
|
Loading…
Reference in New Issue
Block a user