Merge "Tool to migrate existing data to db per tenant"

commit 2e6ce3842b

@@ -45,7 +45,7 @@ def _get_config_files():
     return conf_files
 
 
-def parse_args():
+def parse_args(description='Persists metrics & alarm history in TSDB'):
     global _CONF_LOADED
     if _CONF_LOADED:
         LOG.debug('Configuration has been already loaded')
@@ -58,7 +58,7 @@ def parse_args():
         project='monasca',
         version=version.version_str,
         default_config_files=_get_config_files(),
-        description='Persists metrics & alarm history in TSDB')
+        description=description)
 
     log.setup(CONF,
               product_name='monasca-persister',
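
This change makes the persister's configuration bootstrap reusable by
standalone tools: callers can now pass their own program description,
and omitting the argument keeps the previous behaviour. A minimal
sketch of a caller (this mirrors how the migration tool below invokes
it):

::

   from monasca_persister import config

   # Parse CLI/config options and set up logging under a custom
   # program description instead of the persister default.
   config.parse_args('Monasca InfluxDB database per tenant migration tool')
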
monasca_persister/tools/influxdb/db-per-tenant/README.rst (new file, 122 lines)
@@ -0,0 +1,122 @@
migrate-to-db-per-tenant.py
===========================

The general plan for the monasca project is to move in the direction of
having a database per tenant because:

- It not only gives finer-grained control over the retention policy per
  tenant, but may also speed up tenant queries by scoping them within
  their project.
- Security is improved through better isolation. For example, a previous
  bug in InfluxDB where the tenant ID was ignored in the query exposed
  data from outside a tenant's project. This is less likely to happen
  with a separate DB per tenant.
- It moves us in the direction of improving scalability for InfluxDB
  users without the Enterprise license. In the future a dedicated
  InfluxDB instance could optionally be used per project.

For further reading: https://storyboard.openstack.org/#!/story/2006331

All effort has been made to ensure this is a safe process, and it should
be safe to run the tool multiple times. However, it is provided AS IS
and you should use it at your own risk.

Usage
=====

Steps to use this tool:

- Log in to one node where monasca-persister is deployed.

- Identify the installation path of monasca-persister. This may be a
  virtual environment, which you may need to activate, such as
  `/opt/stack/venv/monasca-<version>/lib/python2.7/site-packages/monasca_persister`,
  or, as in devstack,
  `/opt/stack/monasca-persister/monasca_persister/`.

- Identify the existing configuration for monasca-persister. The likely
  location is `/etc/monasca/persister.conf` if using devstack.

- Optionally, make a backup of your database in case something goes
  wrong, although your original database should be left intact.

- Open `migrate-to-db-per-tenant.py` and edit the mapping between your
  existing projects and retention policies if you wish (see the sketch
  at the end of this section). You may also want to change
  `end_time_offset_override` to the length of history you want the
  migration tool to consider.

- Invoke the tool to migrate to a database per tenant. The arguments
  inside the square brackets default to the values shown when left
  undefined. Retention policy values are integers in units of
  `--migrate-time-unit`.

::

   sudo -u mon-persister python migrate-to-db-per-tenant.py --config-file /etc/monasca/persister.conf --migrate-retention-policy project:2,project2:1 --migrate-skip-regex ^log\\..+ [--migrate-time-unit w --migrate-start-time-offset 0 --migrate-end-time-offset 520]

- The progress of the migration will be logged to the persister log
  file specified in the persister config.
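
For reference, `main()` in the script expands each
`--migrate-retention-policy` entry into a per-project override of the
following shape, which you can also hardcode when editing the script
(the project ID and two-week policy are illustrative only):

::

   tenant_defaults = {
       'example-project-id': dict(
           # Only migrate the most recent 2 time units of history.
           end_time_offset_override=2,
           # Retention policy created on the tenant's new database.
           rp=dict(name='2w', duration='2w', replication='1', default=True),
       ),
   }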

FAQ
===

1. Will this interrupt the operation of Monasca?

   - In theory, you can run this migration on a live Monasca system
     because the migration leaves the original database untouched. In
     practice, however, it may degrade performance while the migration
     is taking place.

2. How do I ensure that I migrate *all* metrics to the new scheme? i.e.
   do I need to stop all persisters writing to InfluxDB and let Kafka
   buffer for a bit while the migration is taking place?

   - If you stop the persister and the migration takes longer than the
     Kafka buffer duration, this may cause data loss. It may be best to
     do the first batch of migration on the entire dataset and then
     perform iterative migrations of the smaller chunks collected while
     the migration was taking place.

3. Will the original index be left intact so that I can fall back if it
   doesn't work?

   - Yes.

4. What do I do after I have run the tool? I need to enable the feature
   before starting the persisters, right?

   - If you enable the feature by turning on the `db_per_tenant` flag
     and restarting your persister service, the persister will start
     writing to the database-per-tenant model. It may be best to switch
     this feature on after the first round of migration, restart the
     persister so that it starts writing to a database per tenancy, and
     then migrate the remainder of the data. Note that in the second
     run, you may need to delete `migrate-success` so that the metrics
     are not skipped. You may also want to try a smaller
     `--migrate-time-unit` option (default: `w` for week) if larger
     chunks of migration fail. Other options are `d` for day and `h`
     for hour.

5. How long does it take (rough estimate per GB of metrics)?

   - This depends on your storage backend. On one of the systems on
     which this tool was tested, moving 40GB of metrics was an
     overnight job.

6. Is the tool idempotent? i.e. could I migrate the bulk of the data
   with the persisters running, stop the persisters, and then migrate
   the delta in a short interval to minimise downtime? Can I run it
   again if it fails?

   - Yes. InfluxDB ensures that points with the same timestamp, tags,
     and fields are not duplicated, so copying things over twice is
     perfectly safe (see the sketch after this list).

7. Will it set fire to my cat?

   - That depends on whether your cat is well behaved.
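
To illustrate the idempotency noted in question 6, here is a minimal
sketch using the `python-influxdb` client (the database name, metric,
and values are hypothetical):

::

   from influxdb import InfluxDBClient

   client = InfluxDBClient(database='monasca_example-project-id')
   point = [{'measurement': 'cpu.idle_perc',
             'tags': {'_tenant_id': 'example-project-id'},
             'time': '2019-01-01T00:00:00Z',
             'fields': {'value': 99.0}}]
   client.write_points(point)
   # Writing the identical point again overwrites it rather than
   # duplicating it, so repeated migration passes are safe.
   client.write_points(point)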

License
=======

Copyright (c) 2019 StackHPC Limited

Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at

::

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py (new executable file, 237 lines)
@@ -0,0 +1,237 @@
# (C) Copyright 2019 StackHPC Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Monasca InfluxDB tool for migration to database per tenant

Used to move data from a monolithic database, e.g. `monasca`, to a
database per tenant model, e.g. `monasca_<tenant_id>`.

Please see the included README.rst for more details about creating an
appropriate configuration file.
"""

import os
import re
import sys

from oslo_config import cfg
from oslo_log import log

from monasca_persister import config
from monasca_persister.repositories.influxdb import metrics_repository

LOG = log.getLogger(__name__)

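# SELECT ... INTO with the :MEASUREMENT back reference writes each point
# into a measurement of the same name in the target database; GROUP BY *
# preserves tags as tags instead of flattening them into fields.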
MIGRATE_QUERY = ('SELECT * INTO "{target_db}"..:MEASUREMENT'
                 ' FROM "{measurement}"'
                 ' WHERE _tenant_id=\'{tenant_id}\''
                 ' AND time > {lower_time_offset}'
                 ' AND time <= {upper_time_offset}'
                 ' GROUP BY *')


class MigrationHelper(object):

    def __init__(self):
        repo = metrics_repository.MetricInfluxdbRepository()
        self.conf = repo.conf
        self.client = repo._influxdb_client
        self.client.switch_database(self.conf.influxdb.database_name)

    def _migrate(self, measurement, tenant_id, start_time_offset,
                 end_time_offset, retention_policy={}, time_unit='w',
                 db_per_tenant=True, **kwargs):

        total_written = 0
        first_upper_time_offset = None
        last_lower_time_offset = None
        time_offset = start_time_offset

        if db_per_tenant:
            target_db = "{}_{}".format(self.conf.influxdb.database_name,
                                       tenant_id)
            self.client.create_database(target_db)
            if retention_policy:
                self.client.create_retention_policy(database=target_db,
                                                    **retention_policy)
        else:
            # Keep target_db defined when writing back into the
            # original database.
            target_db = self.conf.influxdb.database_name
        LOG.info(' into {}:'.format(target_db))
        while end_time_offset > 0 and time_offset < end_time_offset:
            lower_time_offset = 'now()-{}{}'.format(time_offset + 1, time_unit)
            upper_time_offset = 'now()-{}{}'.format(time_offset, time_unit)
            if not first_upper_time_offset:
                first_upper_time_offset = upper_time_offset
            migrate_query = MIGRATE_QUERY.format(
                target_db=target_db,
                measurement=measurement,
                tenant_id=tenant_id,
                lower_time_offset=lower_time_offset,
                upper_time_offset=upper_time_offset,
            )
            LOG.debug(migrate_query)

            written = next(self.client.query(migrate_query)
                           .get_points('result')).get('written')
            total_written += written
            time_offset += 1
            if written > 0:
                last_lower_time_offset = lower_time_offset
                LOG.info(" migrated {} entries from {} -> {}"
                         " (cumulative {})".format(written,
                                                   lower_time_offset,
                                                   upper_time_offset,
                                                   total_written))
        LOG.info(" finished migrating a total of {} entries"
                 " from {} -> {}.".format(total_written,
                                          last_lower_time_offset,
                                          first_upper_time_offset))

    def get_measurements(self, fname):
        measurements = []
        if fname:
            with open(fname, 'a+') as f:
                # 'a+' may position the file pointer at the end, so seek
                # back to the start before reading any cached list.
                f.seek(0)
                measurements = [l.strip() for l in f.readlines()]
        if not measurements:
            result = self.client.query(
                'SHOW MEASUREMENTS').get_points('measurements')
            measurements = [m.get('name') for m in result]
            if fname:
                with open(fname, 'w') as f:
                    for r in measurements:
                        f.write(r + '\n')
        return measurements
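
    # SHOW TAG VALUES returns one series per measurement, so the result
    # can be keyed by measurement name to map each measurement to its
    # tenant IDs.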
    def get_tenancies(self, measurements):
        result = self.client.query('SHOW TAG VALUES WITH KEY = _tenant_id')
        return {m: [t.get('value') for t in result.get_points(m)]
                for m in measurements}

    def get_complete(self, fname):
        if fname:
            with open(fname, 'a+') as fd:
                # Seek to the start: 'a+' may open positioned at the end.
                fd.seek(0)
                return {l.strip() for l in fd.readlines()}
        else:
            # Return an empty set (not a dict) to match the branch above.
            return set()
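
    # migrate() tracks progress in three plain-text files, one
    # measurement per line: measurements_file caches the measurement
    # list, success_file records completed measurements so that reruns
    # skip them, and failure_file collects measurements that raised
    # errors.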
    def migrate(self,
                tenant_defaults={},
                default_start_time_offset=0,  # default: now
                default_end_time_offset=520,  # default: 520 weeks, ~10 years
                skip_regex=[],
                measurements_file=None, success_file=None,
                failure_file=None, **kwargs):
        measurements = self.get_measurements(measurements_file)
        tenancy = self.get_tenancies(measurements)
        done = self.get_complete(success_file)
        skip = set()
        fail = set()
        # Start each run with a fresh failure log.
        if failure_file and os.path.exists(failure_file):
            os.remove(failure_file)

        filtered_measurements = []
        for measurement in measurements:
            if any(f.match(measurement) for f in skip_regex):
                skip.add(measurement)
                LOG.debug('Skipping {} because it matches a skip'
                          ' regex.'.format(measurement))
            elif measurement in done:
                LOG.debug('Skipping {} because it is already'
                          ' done.'.format(measurement))
            else:
                filtered_measurements.append(measurement)

        for i, measurement in enumerate(filtered_measurements):
            LOG.info('Migrating {}'.format(measurement))
            try:
                for tenant_id in tenancy.get(measurement):
                    start_time_offset = tenant_defaults.get(
                        tenant_id, {}).get('start_time_offset_override',
                                           default_start_time_offset)
                    end_time_offset = tenant_defaults.get(
                        tenant_id, {}).get('end_time_offset_override',
                                           default_end_time_offset)
                    retention_policy = tenant_defaults.get(
                        tenant_id, {}).get('rp', {})
                    self._migrate(measurement, tenant_id,
                                  start_time_offset=start_time_offset,
                                  end_time_offset=end_time_offset,
                                  retention_policy=retention_policy,
                                  **kwargs)
                if success_file:
                    with open(success_file, 'a+') as fd:
                        fd.write('{}\n'.format(measurement))
                done.add(measurement)
            except Exception as e:
                LOG.error(e)
                if failure_file:
                    with open(failure_file, 'a+') as fe:
                        fe.write('{}\t{}\n'.format(measurement, e))
                fail.add(measurement)
            LOG.info("{}/{} (done {} + skip {} + fail {})/{}".format(
                i + 1, len(filtered_measurements), len(done), len(skip),
                len(fail), len(measurements)))


def main():
    CONF = cfg.CONF
    cli_opts = [
        cfg.StrOpt('migrate-time-unit', choices=['h', 'd', 'w'], default='w',
                   help='Unit of time, h=hour, d=day, w=week (default: "w").'),
        cfg.IntOpt('migrate-start-time-offset', default=0,
                   help='Start time offset in the given unit of time'
                        ' (default: 0).'),
        cfg.IntOpt('migrate-end-time-offset', default=520,
                   help='End time offset in the given unit of time'
                        ' (default: 520).'),
        cfg.DictOpt('migrate-retention-policy', default={},
                    help=('Custom retention policy for projects in the'
                          ' provided time unit.'
                          ' (e.g. project-id-x:2,project-id-y:4)')),
        cfg.ListOpt('migrate-skip-regex', default=[],
                    help=('Skip metrics that match this comma separated list'
                          ' of regex patterns (e.g. ^log\\\\..+,^cpu\\\\..+'
                          ' to skip metrics beginning with log. or cpu.)')),
    ]
    CONF.register_cli_opts(cli_opts)
    config.parse_args("Monasca InfluxDB database per tenant migration tool")

    # Configure a custom retention policy for your existing projects. For
    # example, '2w' is a retention policy of two weeks, which we can
    # assign to project example-project-id.
    tenant_defaults = dict()
    for k, v in CONF.migrate_retention_policy.items():
        if v.isdigit():
            rp = '{}{}'.format(v, CONF.migrate_time_unit)
            tenant_defaults[k] = dict(
                end_time_offset_override=int(v),
                rp=dict(name=rp, duration=rp, replication='1', default=True),
            )
            LOG.info('Retention policy {} will be applied to'
                     ' project {}.'.format(rp, k))
        else:
            raise ValueError('Retention policy for project {} must be an'
                             ' integer number of the given time unit.'
                             ' Current value: {}.'.format(k, v))

    skip_regex = []
    for p in CONF.migrate_skip_regex:
        skip_regex.append(re.compile(str(p)))
        LOG.info('Metrics matching pattern "{}" will be skipped.'.format(p))

    helper = MigrationHelper()
    helper.migrate(skip_regex=skip_regex,
                   tenant_defaults=tenant_defaults,
                   default_end_time_offset=CONF.migrate_end_time_offset,
                   default_start_time_offset=CONF.migrate_start_time_offset,
                   time_unit=CONF.migrate_time_unit,
                   measurements_file='migrate-measurements',
                   success_file='migrate-success',
                   failure_file='migrate-failure')
    return 0


if __name__ == "__main__":
    sys.exit(main())