Update pep8 checks
* Update max line length to 100 * Clean up codes for pep8 checks from tox.ini Change-Id: I974c0c31dc51784506cbf54b87bc450f2334845e Signed-off-by: Jui Chandwaskar <jchandwaskar@op5.com>
This commit is contained in:
parent
c567f52f1b
commit
6abefff1b2
@ -18,27 +18,32 @@ from oslo_config import cfg
|
||||
from monasca_persister.conf import types
|
||||
|
||||
elasticsearch_opts = [
|
||||
cfg.StrOpt('index_name',
|
||||
help='Index name where events are stored',
|
||||
default='monevents'),
|
||||
cfg.ListOpt('hosts',
|
||||
help='List of Elasticsearch nodes in format host[:port]',
|
||||
default=['localhost:9200'],
|
||||
item_type=types.HostAddressPortType()),
|
||||
cfg.BoolOpt('sniff_on_start',
|
||||
help='Flag indicating whether to obtain a list of nodes from the cluser at startup time',
|
||||
default=False),
|
||||
cfg.BoolOpt('sniff_on_connection_fail',
|
||||
help='Flag controlling if connection failure triggers a sniff',
|
||||
default=False),
|
||||
cfg.IntOpt('sniffer_timeout',
|
||||
help='Number of seconds between automatic sniffs',
|
||||
default=None),
|
||||
cfg.IntOpt('max_retries',
|
||||
help='Maximum number of retries before an exception is propagated',
|
||||
default=3,
|
||||
min=1)
|
||||
]
|
||||
cfg.StrOpt(
|
||||
'index_name',
|
||||
help='Index name where events are stored',
|
||||
default='monevents'),
|
||||
cfg.ListOpt(
|
||||
'hosts',
|
||||
help='List of Elasticsearch nodes in format host[:port]',
|
||||
default=['localhost:9200'],
|
||||
item_type=types.HostAddressPortType()),
|
||||
cfg.BoolOpt(
|
||||
'sniff_on_start',
|
||||
help='Flag indicating whether to obtain a list of nodes from the cluser at startup time',
|
||||
default=False),
|
||||
cfg.BoolOpt(
|
||||
'sniff_on_connection_fail',
|
||||
help='Flag controlling if connection failure triggers a sniff',
|
||||
default=False),
|
||||
cfg.IntOpt(
|
||||
'sniffer_timeout',
|
||||
help='Number of seconds between automatic sniffs',
|
||||
default=None),
|
||||
cfg.IntOpt(
|
||||
'max_retries',
|
||||
help='Maximum number of retries before an exception is propagated',
|
||||
default=3,
|
||||
min=1)]
|
||||
|
||||
elasticsearch_group = cfg.OptGroup(name='elasticsearch', title='elasticsearch')
|
||||
|
||||
|
@ -42,9 +42,10 @@ kafka_alarm_history_opts = [
|
||||
cfg.StrOpt('zookeeper_path',
|
||||
help='Path in zookeeper for kafka consumer group partitioning algorithm',
|
||||
default='/persister_partitions/$kafka_alarm_history.topic'),
|
||||
cfg.IntOpt('batch_size',
|
||||
help='Maximum number of alarm state history messages to buffer before writing to database',
|
||||
default=1),
|
||||
cfg.IntOpt(
|
||||
'batch_size',
|
||||
help='Maximum number of alarm state history messages to buffer before writing to database',
|
||||
default=1),
|
||||
]
|
||||
|
||||
|
||||
|
@ -17,15 +17,21 @@
|
||||
from oslo_config import cfg
|
||||
|
||||
repositories_opts = [
|
||||
cfg.StrOpt(name='metrics_driver',
|
||||
help='The repository driver to use for metrics',
|
||||
default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
|
||||
cfg.StrOpt(name='alarm_state_history_driver',
|
||||
help='The repository driver to use for alarm state history',
|
||||
default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
|
||||
cfg.StrOpt(name='events_driver',
|
||||
help='The repository driver to use for events',
|
||||
default='monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository')]
|
||||
cfg.StrOpt(
|
||||
name='metrics_driver',
|
||||
help='The repository driver to use for metrics',
|
||||
default=('monasca_persister.repositories.influxdb.metrics_repository:'
|
||||
'MetricInfluxdbRepository')),
|
||||
cfg.StrOpt(
|
||||
name='alarm_state_history_driver',
|
||||
help='The repository driver to use for alarm state history',
|
||||
default=('monasca_persister.repositories.influxdb.metrics_repository:'
|
||||
'MetricInfluxdbRepository')),
|
||||
cfg.StrOpt(
|
||||
name='events_driver',
|
||||
help='The repository driver to use for events',
|
||||
default=('monasca_persister.repositories.elasticsearch.events_repository:'
|
||||
'ElasticSearchEventsRepository'))]
|
||||
|
||||
repositories_group = cfg.OptGroup(name='repositories',
|
||||
title='repositories')
|
||||
|
@ -61,7 +61,8 @@ def clean_exit(signum, frame=None):
|
||||
for process in processors:
|
||||
try:
|
||||
if process.is_alive():
|
||||
process.terminate() # Sends sigterm which any processes after a notification is sent attempt to handle
|
||||
# Sends sigterm which any processes after a notification is sent attempt to handle
|
||||
process.terminate()
|
||||
wait_for_exit = True
|
||||
except Exception: # nosec
|
||||
# There is really nothing to do if the kill fails, so just go on.
|
||||
|
@ -25,9 +25,10 @@ from monasca_persister.repositories.utils import parse_alarm_state_hist_message
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
UPSERT_CQL = ('update monasca.alarm_state_history USING TTL ? '
|
||||
'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? '
|
||||
'where tenant_id = ? and alarm_id = ? and time_stamp = ?')
|
||||
UPSERT_CQL = (
|
||||
'update monasca.alarm_state_history USING TTL ? '
|
||||
'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? '
|
||||
'where tenant_id = ? and alarm_id = ? and time_stamp = ?')
|
||||
|
||||
|
||||
class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository):
|
||||
|
@ -47,5 +47,6 @@ def create_cluster():
|
||||
def create_session(cluster):
|
||||
session = cluster.connect(conf.cassandra.keyspace)
|
||||
session.default_timeout = conf.cassandra.read_timeout
|
||||
session.default_consistency_level = ConsistencyLevel.name_to_value[conf.cassandra.consistency_level]
|
||||
session.default_consistency_level = \
|
||||
ConsistencyLevel.name_to_value[conf.cassandra.consistency_level]
|
||||
return session
|
||||
|
@ -68,7 +68,10 @@ class MetricBatch(object):
|
||||
self.batch_query_by_replicas(bound_stmt, self.measurement_queries)
|
||||
|
||||
def batch_query_by_replicas(self, bound_stmt, query_map):
|
||||
hosts = tuple(self.lb_policy.make_query_plan(working_keyspace=bound_stmt.keyspace, query=bound_stmt))
|
||||
hosts = tuple(
|
||||
self.lb_policy.make_query_plan(
|
||||
working_keyspace=bound_stmt.keyspace,
|
||||
query=bound_stmt))
|
||||
|
||||
queue = query_map.get(hosts, None)
|
||||
if not queue:
|
||||
@ -96,18 +99,21 @@ class MetricBatch(object):
|
||||
|
||||
@staticmethod
|
||||
def log_token_batch_map(name, query_map):
|
||||
LOG.info('%s : Size: %s; Tokens: |%s|' % (name, len(query_map),
|
||||
'|'.join(['%s: %s' % (
|
||||
token,
|
||||
','.join([str(counter.value()) for (batch, counter) in queue]))
|
||||
for token, queue in query_map.items()])))
|
||||
LOG.info('%s : Size: %s; Tokens: |%s|' %
|
||||
(name, len(query_map),
|
||||
'|'.join(['%s: %s' % (
|
||||
token,
|
||||
','.join([str(counter.value()) for (batch, counter) in queue]))
|
||||
for token, queue in query_map.items()])))
|
||||
|
||||
@staticmethod
|
||||
def log_replica_batch_map(name, query_map):
|
||||
LOG.info('%s : Size: %s; Replicas: |%s|' % (name, len(query_map), '|'.join([
|
||||
'%s: %s' % (
|
||||
','.join([h.address for h in hosts]), ','.join([str(counter.value()) for (batch, counter) in queue]))
|
||||
for hosts, queue in query_map.items()])))
|
||||
LOG.info('%s : Size: %s; Replicas: |%s|' %
|
||||
(name, len(query_map), '|'.join([
|
||||
'%s: %s' % (
|
||||
','.join([h.address for h in hosts]),
|
||||
','.join([str(counter.value()) for (batch, counter) in queue]))
|
||||
for hosts, queue in query_map.items()])))
|
||||
|
||||
def get_all_batches(self):
|
||||
self.log_token_batch_map("metric batches", self.metric_queries)
|
||||
|
@ -31,9 +31,10 @@ from monasca_persister.repositories.utils import parse_measurement_message
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
MEASUREMENT_INSERT_CQL = ('update monasca.measurements USING TTL ? '
|
||||
'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? '
|
||||
'where metric_id = ? and time_stamp = ?')
|
||||
MEASUREMENT_INSERT_CQL = (
|
||||
'update monasca.measurements USING TTL ? '
|
||||
'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? '
|
||||
'where metric_id = ? and time_stamp = ?')
|
||||
|
||||
MEASUREMENT_UPDATE_CQL = ('update monasca.measurements USING TTL ? '
|
||||
'set value = ?, value_meta = ? where metric_id = ? and time_stamp = ?')
|
||||
@ -66,8 +67,16 @@ RETRIEVE_METRIC_DIMENSION_CQL = ('select region, tenant_id, metric_name, '
|
||||
'WHERE token(region, tenant_id, metric_name) > ? '
|
||||
'and token(region, tenant_id, metric_name) <= ? ')
|
||||
|
||||
Metric = namedtuple('Metric', ['id', 'region', 'tenant_id', 'name', 'dimension_list', 'dimension_names',
|
||||
'time_stamp', 'value', 'value_meta'])
|
||||
Metric = namedtuple('Metric',
|
||||
['id',
|
||||
'region',
|
||||
'tenant_id',
|
||||
'name',
|
||||
'dimension_list',
|
||||
'dimension_names',
|
||||
'time_stamp',
|
||||
'value',
|
||||
'value_meta'])
|
||||
|
||||
|
||||
class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
|
||||
@ -101,7 +110,10 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
|
||||
self._retrieve_metric_dimension_stmt = self._session.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
|
||||
|
||||
self._metric_batch = MetricBatch(self._cluster.metadata, self._cluster.load_balancing_policy, self._max_batches)
|
||||
self._metric_batch = MetricBatch(
|
||||
self._cluster.metadata,
|
||||
self._cluster.load_balancing_policy,
|
||||
self._max_batches)
|
||||
|
||||
self._metric_id_cache = LRUCache(self._cache_size)
|
||||
self._dimension_cache = LRUCache(self._cache_size)
|
||||
@ -178,34 +190,29 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
self._metric_batch.add_dimension_query(dimension_bound_stmt)
|
||||
self._dimension_cache[dim_key] = dim_key
|
||||
|
||||
metric_dim_key = self._get_metric_dimnesion_key(metric.region, metric.tenant_id, metric.name, name,
|
||||
value)
|
||||
metric_dim_key = self._get_metric_dimnesion_key(
|
||||
metric.region, metric.tenant_id, metric.name, name, value)
|
||||
if not self._metric_dimension_cache.get(metric_dim_key, None):
|
||||
dimension_metric_bound_stmt = self._dimension_metric_stmt.bind((metric.region,
|
||||
metric.tenant_id,
|
||||
name,
|
||||
value,
|
||||
metric.name))
|
||||
dimension_metric_bound_stmt = self._dimension_metric_stmt.bind(
|
||||
(metric.region, metric.tenant_id, name, value, metric.name))
|
||||
self._metric_batch.add_dimension_metric_query(dimension_metric_bound_stmt)
|
||||
|
||||
metric_dimension_bound_stmt = self._metric_dimension_stmt.bind((metric.region,
|
||||
metric.tenant_id,
|
||||
metric.name,
|
||||
name,
|
||||
value))
|
||||
metric_dimension_bound_stmt = self._metric_dimension_stmt.bind(
|
||||
(metric.region, metric.tenant_id, metric.name, name, value))
|
||||
self._metric_batch.add_metric_dimension_query(metric_dimension_bound_stmt)
|
||||
|
||||
self._metric_dimension_cache[metric_dim_key] = metric_dim_key
|
||||
|
||||
measurement_insert_bound_stmt = self._measurement_insert_stmt.bind((self._retention,
|
||||
metric.value,
|
||||
metric.value_meta,
|
||||
metric.region,
|
||||
metric.tenant_id,
|
||||
metric.name,
|
||||
metric.dimension_list,
|
||||
id_bytes,
|
||||
metric.time_stamp))
|
||||
measurement_insert_bound_stmt = self._measurement_insert_stmt.bind(
|
||||
(self._retention,
|
||||
metric.value,
|
||||
metric.value_meta,
|
||||
metric.region,
|
||||
metric.tenant_id,
|
||||
metric.name,
|
||||
metric.dimension_list,
|
||||
id_bytes,
|
||||
metric.time_stamp))
|
||||
self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
|
||||
|
||||
return metric
|
||||
@ -240,7 +247,9 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
key = self._get_dimnesion_key(row.region, row.tenant_id, row.name, row.value)
|
||||
self._dimension_cache[key] = key
|
||||
|
||||
LOG.info("loaded %s dimension entries cache from database into cache." % self._dimension_cache.currsize)
|
||||
LOG.info(
|
||||
"loaded %s dimension entries cache from database into cache." %
|
||||
self._dimension_cache.currsize)
|
||||
|
||||
@staticmethod
|
||||
def _get_dimnesion_key(region, tenant_id, name, value):
|
||||
@ -258,16 +267,22 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
||||
|
||||
cnt = 0
|
||||
for row in rows:
|
||||
key = self._get_metric_dimnesion_key(row.region, row.tenant_id, row.metric_name, row.dimension_name,
|
||||
row.dimension_value)
|
||||
key = self._get_metric_dimnesion_key(
|
||||
row.region,
|
||||
row.tenant_id,
|
||||
row.metric_name,
|
||||
row.dimension_name,
|
||||
row.dimension_value)
|
||||
self._metric_dimension_cache[key] = key
|
||||
cnt += 1
|
||||
|
||||
LOG.info("loaded %s metric dimension entries from database into cache." % cnt)
|
||||
LOG.info(
|
||||
"total loaded %s metric dimension entries in cache." % self._metric_dimension_cache.currsize)
|
||||
"total loaded %s metric dimension entries in cache." %
|
||||
self._metric_dimension_cache.currsize)
|
||||
|
||||
@staticmethod
|
||||
def _get_metric_dimnesion_key(region, tenant_id, metric_name, dimension_name, dimension_value):
|
||||
|
||||
return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, dimension_name, dimension_value)
|
||||
return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name,
|
||||
dimension_name, dimension_value)
|
||||
|
@ -46,4 +46,8 @@ class MonascaRetryPolicy(RetryPolicy):
|
||||
|
||||
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
|
||||
|
||||
return (self.RETRY_NEXT_HOST, consistency) if retry_num < self.unavailable_attempts else (self.RETHROW, None)
|
||||
return (
|
||||
self.RETRY_NEXT_HOST,
|
||||
consistency) if retry_num < self.unavailable_attempts else (
|
||||
self.RETHROW,
|
||||
None)
|
||||
|
@ -63,5 +63,6 @@ class TokenRangeQueryManager(object):
|
||||
|
||||
|
||||
def execute_query_token_range(token_range):
|
||||
results = TokenRangeQueryManager.session.execute(TokenRangeQueryManager.prepared.bind(token_range))
|
||||
results = TokenRangeQueryManager.session.execute(
|
||||
TokenRangeQueryManager.prepared.bind(token_range))
|
||||
TokenRangeQueryManager.result_handler(results)
|
||||
|
@ -24,7 +24,7 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class AlarmStateHistInfluxdbRepository(
|
||||
abstract_repository.AbstractInfluxdbRepository):
|
||||
abstract_repository.AbstractInfluxdbRepository):
|
||||
|
||||
def __init__(self):
|
||||
|
||||
@ -36,7 +36,7 @@ class AlarmStateHistInfluxdbRepository(
|
||||
lifecycle_state, state_change_reason,
|
||||
sub_alarms_json_snake_case, tenant_id,
|
||||
time_stamp) = parse_alarm_state_hist_message(
|
||||
message)
|
||||
message)
|
||||
|
||||
name = u'alarm_state_history'
|
||||
fields = []
|
||||
|
@ -55,7 +55,8 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
|
||||
value_field = u'value={}'.format(value)
|
||||
value_meta_field = u'value_meta=' + value_meta_str
|
||||
|
||||
data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp))
|
||||
data = key_values + u' ' + value_field + u',' + \
|
||||
value_meta_field + u' ' + str(int(time_stamp))
|
||||
|
||||
LOG.debug(data)
|
||||
|
||||
|
@ -33,14 +33,14 @@ class Persister(object):
|
||||
self._batch_size = kafka_conf.batch_size
|
||||
|
||||
self._consumer = consumer.KafkaConsumer(
|
||||
kafka_conf.uri,
|
||||
zookeeper_conf.uri,
|
||||
kafka_conf.zookeeper_path,
|
||||
kafka_conf.group_id,
|
||||
kafka_conf.topic,
|
||||
repartition_callback=self._flush,
|
||||
commit_callback=self._flush,
|
||||
commit_timeout=kafka_conf.max_wait_time_seconds)
|
||||
kafka_conf.uri,
|
||||
zookeeper_conf.uri,
|
||||
kafka_conf.zookeeper_path,
|
||||
kafka_conf.group_id,
|
||||
kafka_conf.topic,
|
||||
repartition_callback=self._flush,
|
||||
commit_callback=self._flush,
|
||||
commit_timeout=kafka_conf.max_wait_time_seconds)
|
||||
|
||||
self.repository = repository()
|
||||
|
||||
@ -52,7 +52,7 @@ class Persister(object):
|
||||
self.repository.write_batch(self._data_points)
|
||||
|
||||
LOG.info("Processed {} messages from topic '{}'".format(
|
||||
len(self._data_points), self._kafka_topic))
|
||||
len(self._data_points), self._kafka_topic))
|
||||
|
||||
self._data_points = []
|
||||
self._consumer.commit()
|
||||
@ -79,7 +79,7 @@ class Persister(object):
|
||||
self._flush()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
'Persister encountered fatal exception processing '
|
||||
'messages. '
|
||||
'Shutting down all threads and exiting')
|
||||
'Persister encountered fatal exception processing '
|
||||
'messages. '
|
||||
'Shutting down all threads and exiting')
|
||||
os._exit(1)
|
||||
|
@ -78,21 +78,21 @@ def parse_alarm_state_hist_message(message):
|
||||
sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)
|
||||
|
||||
sub_alarms_json_snake_case = sub_alarms_json.replace(
|
||||
'"subAlarmExpression":',
|
||||
'"sub_alarm_expression":')
|
||||
'"subAlarmExpression":',
|
||||
'"sub_alarm_expression":')
|
||||
|
||||
sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
|
||||
'"currentValues":',
|
||||
'"current_values":')
|
||||
'"currentValues":',
|
||||
'"current_values":')
|
||||
|
||||
# jobrs: I do not think that this shows up
|
||||
sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
|
||||
'"metricDefinition":',
|
||||
'"metric_definition":')
|
||||
'"metricDefinition":',
|
||||
'"metric_definition":')
|
||||
|
||||
sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
|
||||
'"subAlarmState":',
|
||||
'"sub_alarm_state":')
|
||||
'"subAlarmState":',
|
||||
'"sub_alarm_state":')
|
||||
else:
|
||||
sub_alarms_json_snake_case = "[]"
|
||||
|
||||
|
@ -119,7 +119,7 @@ class TestPersister(base.BaseTestCase):
|
||||
def test_active_children_are_killed_during_exit(self):
|
||||
|
||||
with patch.object(self.persister.multiprocessing, 'active_children') as active_children,\
|
||||
patch.object(self.persister.os, 'kill') as mock_kill:
|
||||
patch.object(self.persister.os, 'kill') as mock_kill:
|
||||
|
||||
active_children.return_value = [Mock(name='child-1', pid=1),
|
||||
Mock(name='child-2', pid=2)]
|
||||
|
5
tox.ini
5
tox.ini
@ -67,11 +67,10 @@ commands =
|
||||
flake8 monasca_persister
|
||||
|
||||
[flake8]
|
||||
max-line-length = 120
|
||||
max-line-length = 100
|
||||
# TODO: ignored checks should be enabled in the future
|
||||
# H405 multi line docstring summary not separated with an empty line
|
||||
# H904 Wrap long lines in parentheses instead of a backslash
|
||||
ignore = F821,H405,H904,E126,E125,H306,E302,E122
|
||||
ignore = F821,H405,H306,E302
|
||||
exclude=.venv,.git,.tox,dist,*egg,build
|
||||
|
||||
[bandit]
|
||||
|
Loading…
Reference in New Issue
Block a user