Merge "Implement separate db per tenancy"
This commit is contained in:
commit
a4addd0f5e
@ -20,6 +20,9 @@ influxdb_opts = [
|
||||
cfg.StrOpt('database_name',
|
||||
help='database name where metrics are stored',
|
||||
default='mon'),
|
||||
cfg.BoolOpt('db_per_tenant',
|
||||
help='Whether to use a separate database per tenant',
|
||||
default=False),
|
||||
cfg.HostAddressOpt('ip_address',
|
||||
help='Valid IP address or hostname '
|
||||
'to InfluxDB instance'),
|
||||
|
@ -27,5 +27,5 @@ class AbstractRepository(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def write_batch(self, data_points):
|
||||
def write_batch(self, data_points_by_tenant):
|
||||
pass
|
||||
|
@ -53,9 +53,12 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
|
||||
alarm_id.encode('utf8'),
|
||||
time_stamp)
|
||||
|
||||
return alarm_state_hist
|
||||
return alarm_state_hist, tenant_id
|
||||
|
||||
def write_batch(self, alarm_state_hists):
|
||||
def write_batch(self, alarm_state_hists_by_tenant):
|
||||
# TODO(brtknr): At the moment, Cassandra does not have database per
|
||||
# tenant implemented, so use chained list of values.
|
||||
alarm_state_hists = alarm_state_hists_by_tenant.chained()
|
||||
while alarm_state_hists:
|
||||
num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
|
||||
batch = alarm_state_hists[:num_rows]
|
||||
|
@ -136,6 +136,8 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list))
|
||||
metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
|
||||
|
||||
# TODO(brtknr): If database per tenant becomes the default and the
|
||||
# only option, recording tenant_id will be redundant.
|
||||
metric = Metric(id=metric_id,
|
||||
region=region,
|
||||
tenant_id=tenant_id,
|
||||
@ -165,7 +167,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
metric.dimension_names))
|
||||
self._metric_batch.add_metric_query(metric_update_bound_stmt)
|
||||
|
||||
return metric
|
||||
return metric, tenant_id
|
||||
|
||||
self._metric_id_cache[metric.id] = metric.id
|
||||
|
||||
@ -216,19 +218,17 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
metric.time_stamp))
|
||||
self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
|
||||
|
||||
return metric
|
||||
|
||||
def write_batch(self, metrics):
|
||||
return metric, tenant_id
|
||||
|
||||
def write_batch(self, metrics_by_tenant):
|
||||
# TODO(brtknr): At the moment, Cassandra does not have database per
|
||||
# tenant implemented, so join the list of values.
|
||||
metrics = metrics_by_tenant.chained()
|
||||
with self._lock:
|
||||
batch_list = self._metric_batch.get_all_batches()
|
||||
|
||||
results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
|
||||
|
||||
self._handle_results(results)
|
||||
|
||||
self._metric_batch.clear()
|
||||
|
||||
LOG.info("flushed %s metrics", len(metrics))
|
||||
|
||||
@staticmethod
|
||||
|
@ -19,6 +19,8 @@ import six
|
||||
|
||||
from monasca_persister.repositories import abstract_repository
|
||||
|
||||
DATABASE_NOT_FOUND_MSG = "database not found"
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
|
||||
@ -30,8 +32,30 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
|
||||
self.conf.influxdb.ip_address,
|
||||
self.conf.influxdb.port,
|
||||
self.conf.influxdb.user,
|
||||
self.conf.influxdb.password,
|
||||
self.conf.influxdb.database_name)
|
||||
self.conf.influxdb.password)
|
||||
|
||||
def write_batch(self, data_points):
|
||||
self._influxdb_client.write_points(data_points, 'ms', protocol='line')
|
||||
def write_batch(self, data_points_by_tenant):
|
||||
if self.conf.influxdb.db_per_tenant:
|
||||
for tenant_id, data_points in data_points_by_tenant.items():
|
||||
database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
|
||||
self._write_batch(data_points, database)
|
||||
else:
|
||||
# NOTE (brtknr): Chain list of values to avoid multiple calls to
|
||||
# database API endpoint (when db_per_tenant is False).
|
||||
data_points = data_points_by_tenant.chained()
|
||||
self._write_batch(data_points, self.conf.influxdb.database_name)
|
||||
|
||||
def _write_batch(self, data_points, database):
|
||||
# NOTE (brtknr): Loop twice to ensure database is created if missing.
|
||||
for retry in range(2):
|
||||
try:
|
||||
self._influxdb_client.write_points(data_points, 'ms',
|
||||
protocol='line',
|
||||
database=database)
|
||||
break
|
||||
except influxdb.exceptions.InfluxDBClientError as ex:
|
||||
if (str(ex).startswith(DATABASE_NOT_FOUND_MSG) and
|
||||
self.conf.influxdb.db_per_tenant):
|
||||
self._influxdb_client.create_database(database)
|
||||
else:
|
||||
raise
|
||||
|
@ -61,4 +61,4 @@ class AlarmStateHistInfluxdbRepository(
|
||||
|
||||
LOG.debug(line)
|
||||
|
||||
return line
|
||||
return line, tenant_id
|
||||
|
@ -36,6 +36,9 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
|
||||
value_meta) = parse_measurement_message(message)
|
||||
|
||||
tags = dimensions
|
||||
|
||||
# TODO(brtknr): If database per tenant becomes the default and the only
|
||||
# option, recording tenant_id will be redundant.
|
||||
tags[u'_tenant_id'] = tenant_id
|
||||
tags[u'_region'] = region
|
||||
|
||||
@ -61,4 +64,4 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
|
||||
|
||||
LOG.debug(data)
|
||||
|
||||
return data
|
||||
return data, tenant_id
|
||||
|
@ -24,12 +24,44 @@ from monasca_persister.repositories import singleton
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
class DataPoints(dict):
    """Mapping of tenant ID -> list of buffered data points.

    Mutation is restricted to ``append(key, value)`` and ``clear()`` so
    that the ``counter`` attribute (total number of points appended
    across all tenants) always stays in sync with the stored values.
    The inherited dict mutators are disabled and raise
    ``NotImplementedError`` to direct callers to the supported API.
    """

    def __init__(self):
        # Initialize the underlying dict explicitly.
        super(DataPoints, self).__init__()
        # Total number of data points across every tenant key.
        self.counter = 0

    def __setitem__(self, key, value):
        raise NotImplementedError('Use append(key, value) instead.')

    def __delitem__(self, key):
        raise NotImplementedError('Use clear() instead.')

    # Accept any call signature so that e.g. pop('key') raises the
    # intended NotImplementedError rather than a confusing TypeError.
    def pop(self, *args, **kwargs):
        raise NotImplementedError('Use clear() instead.')

    def popitem(self):
        raise NotImplementedError('Use clear() instead.')

    def update(self, *args, **kwargs):
        # update() adds entries, so point callers at append(), not clear().
        raise NotImplementedError('Use append(key, value) instead.')

    def chained(self):
        """Return all data points from every tenant as one flat list."""
        return [vi for vo in super(DataPoints, self).values() for vi in vo]

    def append(self, key, value):
        """Append value to the list stored under key, tracking the count."""
        super(DataPoints, self).setdefault(key, []).append(value)
        self.counter += 1

    def clear(self):
        """Remove all stored data points and reset the counter."""
        super(DataPoints, self).clear()
        self.counter = 0
|
||||
|
||||
|
||||
@six.add_metaclass(singleton.Singleton)
|
||||
class Persister(six.with_metaclass(ABCMeta, object)):
|
||||
|
||||
def __init__(self, kafka_conf, repository):
|
||||
self._data_points = []
|
||||
self._data_points = DataPoints()
|
||||
|
||||
self._kafka_topic = kafka_conf.topic
|
||||
self._batch_size = kafka_conf.batch_size
|
||||
self.repository = repository()
|
||||
@ -42,20 +74,20 @@ class Persister(six.with_metaclass(ABCMeta, object)):
|
||||
self.repository.write_batch(self._data_points)
|
||||
|
||||
LOG.info("Processed {} messages from topic '{}'".format(
|
||||
len(self._data_points), self._kafka_topic))
|
||||
self._data_points.counter, self._kafka_topic))
|
||||
|
||||
self._data_points = []
|
||||
self._data_points.clear()
|
||||
self._consumer.commit()
|
||||
except Exception as ex:
|
||||
if "partial write: points beyond retention policy dropped" in str(ex):
|
||||
LOG.warning("Some points older than retention policy were dropped")
|
||||
self._data_points = []
|
||||
self._data_points.clear()
|
||||
self._consumer.commit()
|
||||
|
||||
elif cfg.CONF.repositories.ignore_parse_point_error \
|
||||
and "unable to parse" in str(ex):
|
||||
LOG.warning("Some points were unable to be parsed and were dropped")
|
||||
self._data_points = []
|
||||
self._data_points.clear()
|
||||
self._consumer.commit()
|
||||
|
||||
else:
|
||||
@ -67,13 +99,13 @@ class Persister(six.with_metaclass(ABCMeta, object)):
|
||||
try:
|
||||
for message in self._consumer:
|
||||
try:
|
||||
data_point = self.repository.process_message(message)
|
||||
self._data_points.append(data_point)
|
||||
data_point, tenant_id = self.repository.process_message(message)
|
||||
self._data_points.append(tenant_id, data_point)
|
||||
except Exception:
|
||||
LOG.exception('Error processing message. Message is '
|
||||
'being dropped. {}'.format(message))
|
||||
|
||||
if len(self._data_points) >= self._batch_size:
|
||||
if self._data_points.counter >= self._batch_size:
|
||||
self._flush()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
|
@ -22,6 +22,8 @@ from oslo_config import cfg
|
||||
from monasca_persister.repositories.cassandra import alarm_state_history_repository
|
||||
from monasca_persister.repositories.cassandra import connection_util
|
||||
|
||||
from monasca_persister.repositories.persister import DataPoints
|
||||
|
||||
|
||||
class TestAlarmStateHistoryRepo(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
@ -83,8 +85,9 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
|
||||
b'"metric_definition":"dummy_definition"',
|
||||
b'"sub_alarm_state":"dummy_state"']
|
||||
|
||||
output = self.alarm_state_hist_repo.process_message(message)
|
||||
output, tenant_id = self.alarm_state_hist_repo.process_message(message)
|
||||
|
||||
self.assertEqual(tenant_id, 'dummytenantId')
|
||||
self.assertEqual(output[0], self.alarm_state_hist_repo._retention)
|
||||
self.assertEqual(output[1], b'"dummymetrics"')
|
||||
self.assertEqual(output[2], b'dummyoldState')
|
||||
@ -103,5 +106,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
|
||||
cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
|
||||
|
||||
self._session, self._upsert_stmt = Mock(), Mock()
|
||||
alarm_state_hists = ['elem']
|
||||
self.alarm_state_hist_repo.write_batch(alarm_state_hists)
|
||||
alarm_state_hists_by_tenant = DataPoints()
|
||||
alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
|
||||
self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
|
||||
|
@ -64,7 +64,10 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
|
||||
'\\"metric_definition\\":\\"dummy_definition\\"',
|
||||
'\\"sub_alarm_state\\":\\"dummy_state\\"',
|
||||
'\\"current_values\\":\\"dummy_values\\"']
|
||||
actual_output = self.alarm_state_repo.process_message(message)
|
||||
|
||||
actual_output, tenant_id = self.alarm_state_repo.process_message(message)
|
||||
|
||||
self.assertEqual(tenant_id, 'dummytenantId')
|
||||
self.assertIn(expected_output, actual_output)
|
||||
for elem in expected_dict:
|
||||
self.assertIn(elem, actual_output)
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
from six import string_types
|
||||
|
||||
from oslotest import base
|
||||
from oslo_config import cfg
|
||||
@ -35,7 +34,7 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
|
||||
metric = self._get_metric()
|
||||
with patch.object(cfg, 'CONF', return_value=None):
|
||||
metric_repo = MetricInfluxdbRepository()
|
||||
self.assertIsInstance(metric_repo.process_message(metric), string_types)
|
||||
self.assertIsInstance(metric_repo.process_message(metric), tuple)
|
||||
|
||||
def _get_metric(self):
|
||||
metric = '''
|
||||
|
@ -23,6 +23,7 @@ from oslo_config import cfg
|
||||
from monasca_common.kafka import consumer
|
||||
from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
|
||||
from monasca_persister.repositories.persister import LOG
|
||||
from monasca_persister.repositories.persister import DataPoints
|
||||
|
||||
|
||||
class FakeException(Exception):
|
||||
@ -84,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
||||
|
||||
def test_run_if_consumer_is_faulty(self):
|
||||
with patch.object(os, '_exit', return_value=None) as mock_exit:
|
||||
self.persister._data_points = []
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._consumer = Mock(side_effect=FakeException)
|
||||
self.persister.run()
|
||||
mock_exit.assert_called_once_with(1)
|
||||
@ -92,21 +93,22 @@ class TestPersisterRepo(base.BaseTestCase):
|
||||
def test_run_logs_exception_from_consumer(self):
|
||||
with patch.object(self.persister.repository, 'process_message',
|
||||
side_effect=FakeException):
|
||||
self.persister._data_points = ()
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._consumer = ['aa']
|
||||
self.persister.run()
|
||||
self.mock_log_exception.assert_called()
|
||||
|
||||
def test_run_commit_is_called_and_data_points_is_emptied(self):
|
||||
with patch.object(self.persister.repository, 'process_message',
|
||||
return_value='message'):
|
||||
return_value=('message', 'tenant_id')):
|
||||
with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
|
||||
self.persister._data_points = ['a']
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.persister._consumer.__iter__.return_value = ('aa', 'bb')
|
||||
self.persister._batch_size = 1
|
||||
self.persister.run()
|
||||
mock_consumer.commit.assert_called()
|
||||
self.assertEqual([], self.persister._data_points)
|
||||
self.assertEqual(0, self.persister._data_points.counter)
|
||||
|
||||
def test_flush_logs_warning_and_exception(self):
|
||||
exception_msgs = ['partial write: points beyond retention policy dropped',
|
||||
@ -115,7 +117,8 @@ class TestPersisterRepo(base.BaseTestCase):
|
||||
return_value=True)):
|
||||
for elem in exception_msgs:
|
||||
with patch.object(LOG, 'info', side_effect=FakeException(elem)):
|
||||
self.persister._data_points = ['some']
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.persister._flush()
|
||||
self.mock_log_warning.assert_called()
|
||||
|
||||
@ -124,6 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
||||
with(patch.object(cfg.CONF.repositories,
|
||||
'ignore_parse_point_error', return_value=False)):
|
||||
mock_log_info.side_effect.message = 'some msg'
|
||||
self.persister._data_points = ['some']
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.assertRaises(FakeException, self.persister._flush)
|
||||
self.mock_log_exception.assert_called()
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Configuration option `db_per_tenant` added for InfluxDB to allow
data points to be written to a dedicated database per tenant, where
the `database_name` option is used as a prefix to the tenant ID,
e.g. monasca_tenantid.
|
Loading…
x
Reference in New Issue
Block a user