Add support for persisting events
This change extends the persister to read OpenStack event objects from kafka topic and then stores them into ElasticSearch. Story: 2001112 Task: 4843 Change-Id: I35b0db67dc088e56ad281c84fc4b50fa7a064e44
This commit is contained in:
parent
ca5d223611
commit
8273de7ca6
52
monasca_persister/conf/elasticsearch.py
Normal file
52
monasca_persister/conf/elasticsearch.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from monasca_persister.conf import types
|
||||
|
||||
elasticsearch_opts = [
|
||||
cfg.StrOpt('index_name',
|
||||
help='Index name where events are stored',
|
||||
default='monevents'),
|
||||
cfg.ListOpt('hosts',
|
||||
help='List of Elasticsearch nodes in format host[:port]',
|
||||
default=['localhost:9200'],
|
||||
item_type=types.HostAddressPortType()),
|
||||
cfg.BoolOpt('sniff_on_start',
|
||||
help='Flag indicating whether to obtain a list of nodes from the cluser at startup time',
|
||||
default=False),
|
||||
cfg.BoolOpt('sniff_on_connection_fail',
|
||||
help='Flag controlling if connection failure triggers a sniff',
|
||||
default=False),
|
||||
cfg.IntOpt('sniffer_timeout',
|
||||
help='Number of seconds between automatic sniffs',
|
||||
default=None),
|
||||
cfg.IntOpt('max_retries',
|
||||
help='Maximum number of retries before an exception is propagated',
|
||||
default=3,
|
||||
min=1)
|
||||
]
|
||||
|
||||
elasticsearch_group = cfg.OptGroup(name='elasticsearch', title='elasticsearch')
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_group(elasticsearch_group)
|
||||
conf.register_opts(elasticsearch_opts, elasticsearch_group)
|
||||
|
||||
|
||||
def list_opts():
|
||||
return elasticsearch_group, elasticsearch_opts
|
54
monasca_persister/conf/kafka_events.py
Normal file
54
monasca_persister/conf/kafka_events.py
Normal file
@ -0,0 +1,54 @@
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from monasca_persister.conf import kafka_common
|
||||
from monasca_persister.conf import types
|
||||
|
||||
kafka_events_group = cfg.OptGroup(name='kafka_events',
|
||||
title='kafka_events')
|
||||
kafka_events_opts = [
|
||||
cfg.ListOpt('uri',
|
||||
help='Comma separated list of Kafka broker host:port',
|
||||
default=['127.0.0.1:9092'],
|
||||
item_type=types.HostAddressPortType()),
|
||||
cfg.StrOpt('group_id',
|
||||
help='Kafka Group from which persister get data',
|
||||
default='1_events'),
|
||||
cfg.StrOpt('topic',
|
||||
help='Kafka Topic from which persister get data',
|
||||
default='monevents'),
|
||||
cfg.StrOpt('zookeeper_path',
|
||||
help='Path in zookeeper for kafka consumer group partitioning algorithm',
|
||||
default='/persister_partitions/$kafka_events.topic'),
|
||||
]
|
||||
|
||||
# Replace Default OPT with reference to kafka group option
|
||||
kafka_common_opts = deepcopy(kafka_common.kafka_common_opts)
|
||||
for opt in kafka_common_opts:
|
||||
opt.default = '$kafka.{}'.format(opt.name)
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_group(kafka_events_group)
|
||||
conf.register_opts(kafka_events_opts + kafka_common_opts,
|
||||
kafka_events_group)
|
||||
|
||||
|
||||
def list_opts():
|
||||
return kafka_events_group, kafka_events_opts
|
@ -22,7 +22,10 @@ repositories_opts = [
|
||||
default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
|
||||
cfg.StrOpt(name='alarm_state_history_driver',
|
||||
help='The repository driver to use for alarm state history',
|
||||
default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository')]
|
||||
default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
|
||||
cfg.StrOpt(name='events_driver',
|
||||
help='The repository driver to use for events',
|
||||
default='monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository')]
|
||||
|
||||
repositories_group = cfg.OptGroup(name='repositories',
|
||||
title='repositories')
|
||||
|
@ -95,23 +95,24 @@ def start_process(respository, kafka_config):
|
||||
m_persister.run()
|
||||
|
||||
|
||||
def prepare_processes(conf, repo_driver):
|
||||
if conf.num_processors > 0:
|
||||
repository = simport.load(repo_driver)
|
||||
for proc in range(0, conf.num_processors):
|
||||
processors.append(multiprocessing.Process(
|
||||
target=start_process, args=(repository, conf)))
|
||||
|
||||
def main():
|
||||
"""Start persister."""
|
||||
|
||||
config.parse_args()
|
||||
|
||||
metric_repository = simport.load(cfg.CONF.repositories.metrics_driver)
|
||||
alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver)
|
||||
|
||||
# Add processors for metrics topic
|
||||
for proc in range(0, cfg.CONF.kafka_metrics.num_processors):
|
||||
processors.append(multiprocessing.Process(
|
||||
target=start_process, args=(metric_repository, cfg.CONF.kafka_metrics)))
|
||||
|
||||
prepare_processes(cfg.CONF.kafka_metrics, cfg.CONF.repositories.metrics_driver)
|
||||
# Add processors for alarm history topic
|
||||
for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors):
|
||||
processors.append(multiprocessing.Process(
|
||||
target=start_process, args=(alarm_state_history_repository, cfg.CONF.kafka_alarm_history)))
|
||||
prepare_processes(cfg.CONF.kafka_alarm_history, cfg.CONF.repositories.alarm_state_history_driver)
|
||||
# Add processors for events topic
|
||||
prepare_processes(cfg.CONF.kafka_events, cfg.CONF.repositories.events_driver)
|
||||
|
||||
# Start
|
||||
try:
|
||||
|
@ -0,0 +1,74 @@
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import ujson
|
||||
|
||||
from datetime import datetime
|
||||
from elasticsearch import Elasticsearch
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_persister.repositories import abstract_repository
|
||||
from monasca_persister.repositories import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
|
||||
def __init__(self):
|
||||
super(ElasticSearchEventsRepository, self).__init__()
|
||||
self.conf = cfg.CONF.elasticsearch
|
||||
self.es = Elasticsearch(
|
||||
hosts=self.conf.hosts,
|
||||
sniff_on_start=self.conf.sniff_on_start,
|
||||
sniff_on_connection_fail=self.conf.sniff_on_connection_fail,
|
||||
sniffer_timeout=self.conf.sniffer_timeout,
|
||||
max_retries=self.conf.max_retries
|
||||
)
|
||||
|
||||
def process_message(self, message):
|
||||
return utils.parse_events_message(message)
|
||||
|
||||
def write_batch(self, data_points):
|
||||
for data_point in data_points:
|
||||
(tenant_id, timestamp, event_type, payload) = data_point
|
||||
|
||||
index = '%s-%s-%s' % (self.conf.index_name, tenant_id,
|
||||
ElasticSearchEventsRepository._normalize_timestamp(timestamp))
|
||||
|
||||
body = {
|
||||
'tenant_id': tenant_id,
|
||||
'timestamp': timestamp,
|
||||
'event_type': event_type,
|
||||
'payload': payload
|
||||
}
|
||||
|
||||
self.es.create(
|
||||
index=index,
|
||||
doc_type='event',
|
||||
body=ujson.dumps(body)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_timestamp(timestamp):
|
||||
d = None
|
||||
if timestamp and len(timestamp) >= 10:
|
||||
try:
|
||||
d = datetime.strptime(timestamp[0:10], '%Y-%m-%d')
|
||||
except ValueError as e:
|
||||
LOG.warning("Unable to parse timestamp '%s' - %s" % (timestamp, str(e)))
|
||||
if not d:
|
||||
d = datetime.today()
|
||||
return d.strftime('%Y-%m-%d')
|
@ -1,4 +1,5 @@
|
||||
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -98,3 +99,14 @@ def parse_alarm_state_hist_message(message):
|
||||
return (alarm_id, metrics, new_state, old_state, link,
|
||||
lifecycle_state, state_change_reason,
|
||||
sub_alarms_json_snake_case, tenant_id, time_stamp)
|
||||
|
||||
|
||||
def parse_events_message(message):
|
||||
|
||||
decoded_message = json.loads(message.message.value)
|
||||
event_type = decoded_message['event_type']
|
||||
timestamp = decoded_message['timestamp']
|
||||
payload = decoded_message['payload']
|
||||
tenant_id = payload['tenant_id']
|
||||
|
||||
return tenant_id, timestamp, event_type, payload
|
||||
|
54
monasca_persister/tests/events.json
Normal file
54
monasca_persister/tests/events.json
Normal file
@ -0,0 +1,54 @@
|
||||
{
|
||||
"event_1": {
|
||||
"event_type": "compute.instance.create.start",
|
||||
"timestamp": "2017-06-01 09:15:11.494606",
|
||||
"payload": {
|
||||
"state_description": "",
|
||||
"availability_zone": "nova",
|
||||
"terminated_at": "",
|
||||
"ephemeral_gb": 0,
|
||||
"instance_type_id": 1,
|
||||
"deleted_at": "",
|
||||
"reservation_id": "r-74ndofdp",
|
||||
"instance_id": "cb724671-cc36-49cd-9987-d08f2c8356b9",
|
||||
"display_name": "fred",
|
||||
"hostname": "fred",
|
||||
"state": "building",
|
||||
"progress": "",
|
||||
"launched_at": "",
|
||||
"metadata": {
|
||||
},
|
||||
"node": null,
|
||||
"ramdisk_id": "",
|
||||
"access_ip_v6": null,
|
||||
"disk_gb": 1,
|
||||
"access_ip_v4": null,
|
||||
"kernel_id": "",
|
||||
"image_name": "cirros",
|
||||
"host": null,
|
||||
"user_id": "92e0ceb0f3d648ddabeae1bfde4071b2",
|
||||
"image_ref_url": "http://d00-19-99-b3-7e-2e.st.est.fujitsu.com:9292/images/e08428a7-aa34-42bd-9e91-6fe15e0ed2ae",
|
||||
"cell_name": "",
|
||||
"root_gb": 1,
|
||||
"tenant_id": "de98fbff448f4f278a56e9929db70b03",
|
||||
"created_at": "2017-06-01 09:15:10+00:00",
|
||||
"memory_mb": 512,
|
||||
"instance_type": "m1.tiny",
|
||||
"vcpus": 1,
|
||||
"image_meta": {
|
||||
"container_format": "bare",
|
||||
"min_ram": "0",
|
||||
"disk_format": "qcow2",
|
||||
"architecture": "x86_64",
|
||||
"min_disk": "1",
|
||||
"base_image_ref": "e08428a7-aa34-42bd-9e91-6fe15e0ed2ae"
|
||||
},
|
||||
"architecture": "x86_64",
|
||||
"os_type": null,
|
||||
"instance_flavor_id": "1"
|
||||
},
|
||||
"priority": "INFO",
|
||||
"publisher_id": "compute.d00-26-2d-0c-d5-64",
|
||||
"message_id": "5e60c9f1-1cf9-4f5d-9826-91fa329a79c1"
|
||||
}
|
||||
}
|
64
monasca_persister/tests/test_events.py
Normal file
64
monasca_persister/tests/test_events.py
Normal file
@ -0,0 +1,64 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
from oslotest import base
|
||||
from monasca_persister.repositories.elasticsearch import events_repository
|
||||
from monasca_persister.repositories import utils
|
||||
from mock import Mock
|
||||
from testtools import matchers
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class TestEvents(base.BaseTestCase):
|
||||
|
||||
def __init__(self, *args, **kwds):
|
||||
super(TestEvents, self).__init__(*args, **kwds)
|
||||
self.events = None
|
||||
|
||||
def setUp(self):
|
||||
super(TestEvents, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestEvents, self).tearDown()
|
||||
|
||||
def test_parse_event(self):
|
||||
event = self._load_event('event_1')
|
||||
tenant_id, timestamp, event_type, payload = utils.parse_events_message(event)
|
||||
self.assertEqual('de98fbff448f4f278a56e9929db70b03', tenant_id)
|
||||
self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
|
||||
self.assertEqual('compute.instance.create.start', event_type)
|
||||
self.assertIsNotNone(payload)
|
||||
self.assertThat(len(payload), matchers.GreaterThan(0))
|
||||
|
||||
def test_normalize_timestamp(self):
|
||||
today = datetime.today().strftime('%Y-%m-%d')
|
||||
normalize_timestamp = events_repository.ElasticSearchEventsRepository._normalize_timestamp
|
||||
|
||||
self.assertEqual(today, normalize_timestamp(None))
|
||||
self.assertEqual(today, normalize_timestamp(''))
|
||||
self.assertEqual(today, normalize_timestamp('foo'))
|
||||
self.assertEqual(today, normalize_timestamp('2017-02-3'))
|
||||
self.assertEqual(today, normalize_timestamp('2017-02-31'))
|
||||
|
||||
self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43'))
|
||||
|
||||
def _load_event(self, event_name):
|
||||
if self.events is None:
|
||||
filepath = os.path.join(os.path.dirname(__file__), 'events.json')
|
||||
self.events = json.load(open(filepath))
|
||||
# create a kafka message envelope
|
||||
value = json.dumps(self.events[event_name])
|
||||
return Mock(message=Mock(value=value))
|
@ -22,6 +22,7 @@ CONF = cfg.CONF
|
||||
|
||||
NUMBER_OF_METRICS_PROCESSES = 2
|
||||
NUMBER_OF_ALARM_HIST_PROCESSES = 3
|
||||
NUMBER_OF_EVENTS_PROCESSES = 0
|
||||
|
||||
|
||||
class FakeException(Exception):
|
||||
@ -57,6 +58,8 @@ class TestPersister(base.BaseTestCase):
|
||||
|
||||
self.mock_cfg.CONF.kafka_metrics.num_processors = NUMBER_OF_METRICS_PROCESSES
|
||||
self.mock_cfg.CONF.kafka_alarm_history.num_processors = NUMBER_OF_ALARM_HIST_PROCESSES
|
||||
self.mock_cfg.CONF.kafka_events.num_processors = NUMBER_OF_EVENTS_PROCESSES
|
||||
|
||||
self.mock_cfg.CONF.zookeeper = 'zookeeper'
|
||||
|
||||
self.mock_sleep.side_effect = [FakeException, None]
|
||||
|
@ -35,6 +35,8 @@ influxdb =
|
||||
influxdb>=2.9.2 # MIT
|
||||
cassandra =
|
||||
cassandra-driver!=3.6.0,>=2.1.4 # Apache-2.0
|
||||
elasticsearch =
|
||||
elasticsearch>=2.0.0,<=3.0.0 # Apache-2.0
|
||||
|
||||
[pbr]
|
||||
autodoc_index_modules = True
|
||||
|
2
tox.ini
2
tox.ini
@ -20,7 +20,7 @@ whitelist_externals = bash
|
||||
rm
|
||||
install_command =
|
||||
{toxinidir}/tools/tox_install.sh {env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
|
||||
deps = .[influxdb,cassandra]
|
||||
deps = .[influxdb,cassandra,elasticsearch]
|
||||
-r{toxinidir}/requirements.txt
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
commands =
|
||||
|
Loading…
x
Reference in New Issue
Block a user