From 613bab70b8d407c2546bacc9cd9849c0ccd45a8c Mon Sep 17 00:00:00 2001 From: Shinya Kawabata Date: Thu, 21 Jul 2016 05:55:46 +0900 Subject: [PATCH] Revival of Cassandra repository Revival and adding some modification to support schema change. See also: I914176d60bfce91fbe449702f7e78bb2f78706ce Change-Id: I2152057ef4843b555a21118cb20bd9eff36c3b7a --- monasca_persister/persister.conf | 8 ++ monasca_persister/persister.py | 2 +- .../repositories/cassandra/__init__.py | 22 ++++ .../cassandra/abstract_repository.py | 37 ++++++ .../alarm_state_history_repository.py | 71 +++++++++++ .../cassandra/metrics_repository.py | 115 ++++++++++++++++++ requirements.txt | 1 + 7 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 monasca_persister/repositories/cassandra/__init__.py create mode 100644 monasca_persister/repositories/cassandra/abstract_repository.py create mode 100644 monasca_persister/repositories/cassandra/alarm_state_history_repository.py create mode 100644 monasca_persister/repositories/cassandra/metrics_repository.py diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf index ed2670f2..c6d99437 100644 --- a/monasca_persister/persister.conf +++ b/monasca_persister/persister.conf @@ -10,9 +10,11 @@ verbose = true [repositories] # The driver to use for the metrics repository metrics_driver = monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository +#metrics_driver = monasca_persister.repositories.cassandra.metrics_repository:MetricCassandraRepository # The driver to use for the alarm state history repository alarm_state_history_driver = monasca_persister.repositories.influxdb.alarm_state_history_repository:AlarmStateHistInfluxdbRepository +#alarm_state_history_driver = monasca_persister.repositories.cassandra.alarm_state_history_repository:AlarmStateHistCassandraRepository [zookeeper] # Comma separated list of host:port @@ -61,3 +63,9 @@ ip_address = 192.168.10.4 port = 8086 user = mon_persister password = password + +# Uncomment, set cluster_ip_addresses, and change the repositories to point to the cassandra classes +#[cassandra] +# Comma separated list of Cassandra node IP addresses. No spaces. +#cluster_ip_addresses: 192.168.10.6 +#keyspace: monasca diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 4c541e8a..5a9e70b8 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -16,7 +16,7 @@ """Persister Module The Persister reads metrics and alarms from Kafka and then stores them - in into Influxdb + in into either Influxdb or Cassandra Start the perister as stand-alone process by running 'persister.py --config-file ' diff --git a/monasca_persister/repositories/cassandra/__init__.py b/monasca_persister/repositories/cassandra/__init__.py new file mode 100644 index 00000000..a9d04897 --- /dev/null +++ b/monasca_persister/repositories/cassandra/__init__.py @@ -0,0 +1,22 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from oslo_config import cfg + +cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), + cfg.StrOpt('keyspace')] + +cassandra_group = cfg.OptGroup(name='cassandra') +cfg.CONF.register_group(cassandra_group) +cfg.CONF.register_opts(cassandra_opts, cassandra_group) diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py new file mode 100644 index 00000000..a0d5e9d7 --- /dev/null +++ b/monasca_persister/repositories/cassandra/abstract_repository.py @@ -0,0 +1,37 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import abc +from cassandra.cluster import Cluster +from cassandra.query import BatchStatement +from oslo_config import cfg +import six + +from repositories.abstract_repository import AbstractRepository + + +@six.add_metaclass(abc.ABCMeta) +class AbstractCassandraRepository(AbstractRepository): + + def __init__(self): + super(AbstractCassandraRepository, self).__init__() + self.conf = cfg.CONF + + self._cassandra_cluster = Cluster( + self.conf.cassandra.cluster_ip_addresses.split(',')) + + self.cassandra_session = self._cassandra_cluster.connect( + self.conf.cassandra.keyspace) + + self._batch_stmt = BatchStatement() diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py new file mode 100644 index 00000000..bc583589 --- /dev/null +++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py @@ -0,0 +1,71 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from cassandra.query import BatchStatement +from oslo_log import log + +from repositories.cassandra.abstract_repository import AbstractCassandraRepository +from repositories.utils import parse_alarm_state_hist_message + +LOG = log.getLogger(__name__) + + +class AlarmStateHistCassandraRepository(AbstractCassandraRepository): + + def __init__(self): + + super(AlarmStateHistCassandraRepository, self).__init__() + + self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( + 'insert into alarm_state_history (tenant_id, alarm_id, ' + 'metrics, new_state, ' + 'old_state, reason, reason_data, ' + 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)') + + def process_message(self, message): + + (alarm_id, metrics, new_state, old_state, link, + lifecycle_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, + time_stamp) = parse_alarm_state_hist_message( + message) + + alarm_state_hist = ( + tenant_id.encode('utf8'), + alarm_id.encode('utf8'), + json.dumps(metrics, ensure_ascii=False).encode( + 'utf8'), + new_state.encode('utf8'), + old_state.encode('utf8'), + state_change_reason.encode('utf8'), + "{}".encode('utf8'), + sub_alarms_json_snake_case.encode('utf8'), + time_stamp + ) + + LOG.debug(alarm_state_hist) + + return alarm_state_hist + + def write_batch(self, alarm_state_hists): + + for alarm_state_hist in alarm_state_hists: + self._batch_stmt.add(self._insert_alarm_state_hist_stmt, + alarm_state_hist) + + self.cassandra_session.execute(self._batch_stmt) + + self._batch_stmt = BatchStatement() diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py new file mode 100644 index 00000000..eac73647 --- /dev/null +++ b/monasca_persister/repositories/cassandra/metrics_repository.py @@ -0,0 +1,115 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import hashlib +import json + +from cassandra.query import BatchStatement +from oslo_log import log +import urllib + +from repositories.cassandra.abstract_repository import AbstractCassandraRepository +from repositories.utils import parse_measurement_message + +LOG = log.getLogger(__name__) + + +class MetricCassandraRepository(AbstractCassandraRepository): + + def __init__(self): + + super(MetricCassandraRepository, self).__init__() + + self._insert_measurement_stmt = self.cassandra_session.prepare( + 'insert into measurements (tenant_id,' + 'region, metric_hash, time_stamp, value,' + 'value_meta) values (?, ?, ?, ?, ?, ?)') + + self._insert_metric_map_stmt = self.cassandra_session.prepare( + 'insert into metric_map (tenant_id,' + 'region, metric_hash, ' + 'metric_map) values' + '(?,?,?,?)') + + def process_message(self, message): + + (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) = parse_measurement_message(message) + + metric_hash, metric_map = create_metric_hash(metric_name, + dimensions) + + measurement = (tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + time_stamp, + value, + json.dumps(value_meta, ensure_ascii=False).encode( + 'utf8')) + + LOG.debug(measurement) + + return MetricMeasurementInfo( + tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + metric_map, + measurement) + + def write_batch(self, metric_measurement_infos): + + for metric_measurement_info in metric_measurement_infos: + + self._batch_stmt.add(self._insert_measurement_stmt, + metric_measurement_info.measurement) + + metric_map = (metric_measurement_info.tenant_id, + metric_measurement_info.region, + metric_measurement_info.metric_hash, + metric_measurement_info.metric_map) + + self._batch_stmt.add(self._insert_metric_map_stmt, + metric_map) + + self.cassandra_session.execute(self._batch_stmt) + + self._batch_stmt = BatchStatement() + + +class MetricMeasurementInfo(object): + + def __init__(self, tenant_id, region, metric_hash, metric_map, + measurement): + + self.tenant_id = tenant_id + self.region = region + self.metric_hash = metric_hash + self.metric_map = metric_map + self.measurement = measurement + + +def create_metric_hash(metric_name, dimensions): + + dimensions['__name__'] = urllib.quote_plus(metric_name) + + hash_string = '' + + for dim_name in sorted(dimensions.iterkeys()): + dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus( + dimensions[dim_name])) + hash_string += dimension + + sha1_hash = hashlib.sha1(hash_string).hexdigest() + + return bytearray.fromhex(sha1_hash), dimensions diff --git a/requirements.txt b/requirements.txt index 2db44f57..d0b8d220 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ oslo.log six>=1.9.0 #influxdb==2.8.0 +#cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0 monasca-common