[HBase] Add migration script for new row separate design
Add migration for HBase data for compatibility with new row format which presented in CR https://review.openstack.org/#/c/106376/ Change-Id: I5232c5dad17730b76a1b3ee864d8d973ba1b8749
This commit is contained in:
parent
919769480d
commit
3643ca5b3b
@ -25,6 +25,7 @@ from ceilometer.alarm.storage import models
|
|||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
||||||
|
from ceilometer.storage.hbase import migration as hbase_migration
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
|
|
||||||
@ -102,6 +103,7 @@ class Connection(base.Connection):
|
|||||||
column_families = {'f': dict()}
|
column_families = {'f': dict()}
|
||||||
with self.conn_pool.connection() as conn:
|
with self.conn_pool.connection() as conn:
|
||||||
hbase_utils.create_tables(conn, tables, column_families)
|
hbase_utils.create_tables(conn, tables, column_families)
|
||||||
|
hbase_migration.migrate_tables(conn, tables)
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
LOG.debug(_('Dropping HBase schema...'))
|
LOG.debug(_('Dropping HBase schema...'))
|
||||||
|
@ -214,8 +214,12 @@ class MTable(object):
|
|||||||
"""
|
"""
|
||||||
op = args[0]
|
op = args[0]
|
||||||
value = args[1]
|
value = args[1]
|
||||||
|
is_regex = False
|
||||||
if value.startswith('binaryprefix:'):
|
if value.startswith('binaryprefix:'):
|
||||||
value = value[len('binaryprefix:'):]
|
value = value[len('binaryprefix:'):]
|
||||||
|
if value.startswith('regexstring:'):
|
||||||
|
value = value[len('regexstring:'):]
|
||||||
|
is_regex = True
|
||||||
column = 'f:' + value
|
column = 'f:' + value
|
||||||
r = {}
|
r = {}
|
||||||
for row in rows:
|
for row in rows:
|
||||||
@ -226,7 +230,8 @@ class MTable(object):
|
|||||||
(op == '>=' and key >= column) or
|
(op == '>=' and key >= column) or
|
||||||
(op == '<=' and key <= column) or
|
(op == '<=' and key <= column) or
|
||||||
(op == '>' and key > column) or
|
(op == '>' and key > column) or
|
||||||
(op == '<' and key < column)):
|
(op == '<' and key < column) or
|
||||||
|
(is_regex and re.search(value, key))):
|
||||||
r_data[key] = data[key]
|
r_data[key] = data[key]
|
||||||
else:
|
else:
|
||||||
raise ceilometer.NotImplementedError(
|
raise ceilometer.NotImplementedError(
|
||||||
|
119
ceilometer/storage/hbase/migration.py
Normal file
119
ceilometer/storage/hbase/migration.py
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
"""HBase storage backend migrations
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_resource_table(conn, table):
|
||||||
|
"""Migrate table 'resource' in HBase.
|
||||||
|
|
||||||
|
Change qualifiers format from "%s+%s+%s!%s!%s" %
|
||||||
|
(rts, source, counter_name, counter_type,counter_unit)
|
||||||
|
in columns with meters f:m_*
|
||||||
|
to new separator format "%s:%s:%s:%s:%s" %
|
||||||
|
(rts, source, counter_name, counter_type,counter_unit)
|
||||||
|
"""
|
||||||
|
resource_table = conn.table(table)
|
||||||
|
resource_filter = ("QualifierFilter(=, "
|
||||||
|
"'regexstring:m_\\d{19}\\+"
|
||||||
|
"[\\w-\\._]*\\+[\\w-\\._!]')")
|
||||||
|
gen = resource_table.scan(filter=resource_filter)
|
||||||
|
for row, data in gen:
|
||||||
|
columns = []
|
||||||
|
updated_columns = dict()
|
||||||
|
column_prefix = "f:"
|
||||||
|
for column, value in data.items():
|
||||||
|
if column.startswith('f:m_'):
|
||||||
|
columns.append(column)
|
||||||
|
parts = column[2:].split("+", 2)
|
||||||
|
parts.extend(parts.pop(2).split("!"))
|
||||||
|
column = hbase_utils.prepare_key(*parts)
|
||||||
|
updated_columns[column_prefix + column] = value
|
||||||
|
resource_table.put(row, updated_columns)
|
||||||
|
resource_table.delete(row, columns)
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_meter_table(conn, table):
|
||||||
|
"""Migrate table 'meter' in HBase.
|
||||||
|
|
||||||
|
Change row format from "%s_%d_%s" % (counter_name, rts, message_signature)
|
||||||
|
to new separator format "%s:%s:%s" % (counter_name, rts, message_signature)
|
||||||
|
"""
|
||||||
|
meter_table = conn.table(table)
|
||||||
|
meter_filter = ("RowFilter(=, "
|
||||||
|
"'regexstring:[\\w\\._-]*_\\d{19}_\\w*')")
|
||||||
|
gen = meter_table.scan(filter=meter_filter)
|
||||||
|
for row, data in gen:
|
||||||
|
parts = row.rsplit('_', 2)
|
||||||
|
new_row = hbase_utils.prepare_key(*parts)
|
||||||
|
meter_table.put(new_row, data)
|
||||||
|
meter_table.delete(row)
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_event_table(conn, table):
|
||||||
|
"""Migrate table 'event' in HBase.
|
||||||
|
|
||||||
|
Change row format from ""%d_%s" % timestamp, event_id,
|
||||||
|
to new separator format "%s:%s" % timestamp, event_id
|
||||||
|
Also change trait columns from %s+%s % trait.name, trait.dtype
|
||||||
|
to %s:%s % trait.name, trait.dtype
|
||||||
|
"""
|
||||||
|
event_table = conn.table(table)
|
||||||
|
event_filter = "RowFilter(=, 'regexstring:\\d*_\\w*')"
|
||||||
|
gen = event_table.scan(filter=event_filter)
|
||||||
|
trait_pattern = re.compile("f:[\w\-_]*\+\w")
|
||||||
|
column_prefix = "f:"
|
||||||
|
for row, data in gen:
|
||||||
|
row_parts = row.split("_", 1)
|
||||||
|
update_data = {}
|
||||||
|
for column, value in data.items():
|
||||||
|
if trait_pattern.match(column):
|
||||||
|
trait_parts = column[2:].rsplit('+', 1)
|
||||||
|
column = hbase_utils.prepare_key(*trait_parts)
|
||||||
|
update_data[column_prefix + column] = value
|
||||||
|
new_row = hbase_utils.prepare_key(*row_parts)
|
||||||
|
event_table.put(new_row, update_data)
|
||||||
|
event_table.delete(row)
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_alarm_history_table(conn, table):
|
||||||
|
"""Migrate table 'alarm_h' in HBase.
|
||||||
|
|
||||||
|
Change row format from ""%s_%s" % alarm_id, rts,
|
||||||
|
to new separator format "%s:%s" % alarm_id, rts
|
||||||
|
"""
|
||||||
|
alarm_h_table = conn.table(table)
|
||||||
|
alarm_h_filter = "RowFilter(=, 'regexstring:\\w*_\\d{19}')"
|
||||||
|
gen = alarm_h_table.scan(filter=alarm_h_filter)
|
||||||
|
for row, data in gen:
|
||||||
|
row_parts = row.rsplit('_', 1)
|
||||||
|
alarm_h_table.put(hbase_utils.prepare_key(*row_parts), data)
|
||||||
|
alarm_h_table.delete(row)
|
||||||
|
|
||||||
|
|
||||||
|
TABLE_MIGRATION_FUNCS = {'resource': migrate_resource_table,
|
||||||
|
'alarm_h': migrate_alarm_history_table,
|
||||||
|
'meter': migrate_meter_table,
|
||||||
|
'event': migrate_event_table}
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_tables(conn, tables):
|
||||||
|
if type(tables) is not list:
|
||||||
|
tables = [tables]
|
||||||
|
for table in tables:
|
||||||
|
if table in TABLE_MIGRATION_FUNCS:
|
||||||
|
TABLE_MIGRATION_FUNCS.get(table)(conn, table)
|
@ -18,6 +18,7 @@ import json
|
|||||||
|
|
||||||
import bson.json_util
|
import bson.json_util
|
||||||
from happybase.hbase import ttypes
|
from happybase.hbase import ttypes
|
||||||
|
import six
|
||||||
|
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
@ -319,6 +320,20 @@ def format_meter_reference(c_name, c_type, c_unit, rts, source):
|
|||||||
return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
|
return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_key(*args):
|
||||||
|
"""Prepares names for rows and columns with correct separator.
|
||||||
|
|
||||||
|
:param args: strings or numbers that we want our key construct of
|
||||||
|
:return: key with quoted args that are separated with character ":"
|
||||||
|
"""
|
||||||
|
key_quote = []
|
||||||
|
for key in args:
|
||||||
|
if isinstance(key, int):
|
||||||
|
key = str(key)
|
||||||
|
key_quote.append(quote(key))
|
||||||
|
return ":".join(key_quote)
|
||||||
|
|
||||||
|
|
||||||
def timestamp_from_record_tuple(record):
|
def timestamp_from_record_tuple(record):
|
||||||
"""Extract timestamp from HBase tuple record."""
|
"""Extract timestamp from HBase tuple record."""
|
||||||
return record[0]['timestamp']
|
return record[0]['timestamp']
|
||||||
@ -439,3 +454,22 @@ def create_tables(conn, tables, column_families):
|
|||||||
LOG.warn(_("Cannot create table %(table_name)s "
|
LOG.warn(_("Cannot create table %(table_name)s "
|
||||||
"it already exists. Ignoring error")
|
"it already exists. Ignoring error")
|
||||||
% {'table_name': table})
|
% {'table_name': table})
|
||||||
|
|
||||||
|
|
||||||
|
def quote(s, *args):
|
||||||
|
"""Return quoted string even if it is unicode one.
|
||||||
|
|
||||||
|
:param s: string that should be quoted
|
||||||
|
:param args: any symbol we want to stay unquoted
|
||||||
|
"""
|
||||||
|
s_en = s.encode('utf8')
|
||||||
|
return six.moves.urllib.parse.quote(s_en, *args)
|
||||||
|
|
||||||
|
|
||||||
|
def unquote(s):
|
||||||
|
"""Return unquoted and decoded string.
|
||||||
|
|
||||||
|
:param s: string that should be unquoted
|
||||||
|
"""
|
||||||
|
s_de = six.moves.urllib.parse.unquote(s)
|
||||||
|
return s_de.decode('utf8')
|
||||||
|
@ -28,6 +28,7 @@ from ceilometer.openstack.common.gettextutils import _
|
|||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.storage import base
|
from ceilometer.storage import base
|
||||||
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
||||||
|
from ceilometer.storage.hbase import migration as hbase_migration
|
||||||
from ceilometer.storage.hbase import utils as hbase_utils
|
from ceilometer.storage.hbase import utils as hbase_utils
|
||||||
from ceilometer.storage import models
|
from ceilometer.storage import models
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
@ -166,6 +167,7 @@ class Connection(base.Connection):
|
|||||||
column_families = {'f': dict(max_versions=1)}
|
column_families = {'f': dict(max_versions=1)}
|
||||||
with self.conn_pool.connection() as conn:
|
with self.conn_pool.connection() as conn:
|
||||||
hbase_utils.create_tables(conn, tables, column_families)
|
hbase_utils.create_tables(conn, tables, column_families)
|
||||||
|
hbase_migration.migrate_tables(conn, tables)
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
LOG.debug(_('Dropping HBase schema...'))
|
LOG.debug(_('Dropping HBase schema...'))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user