drop remaining gnocchi specific code and fix pep8

This commit is contained in:
gordon chung 2015-12-16 11:00:37 -05:00
parent a5f4bd1692
commit b4ea93a306
9 changed files with 30 additions and 451 deletions

View File

@ -1,321 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import datetime
import functools
import logging
import math
import random
import time
import types
from cliff import show
import futurist
from oslo_utils import timeutils
import six.moves
from aodhclient.v1 import metric_cli
LOG = logging.getLogger(__name__)
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
six.moves.copyreg.pickle(types.MethodType, _pickle_method)
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF
args = [iter(iterable)] * n
return six.moves.zip(*args)
def _positive_non_zero_int(argument_value):
if argument_value is None:
return None
try:
value = int(argument_value)
except ValueError:
msg = "%s must be an integer" % argument_value
raise argparse.ArgumentTypeError(msg)
if value <= 0:
msg = "%s must be greater than 0" % argument_value
raise argparse.ArgumentTypeError(msg)
return value
def measure_job(fn, *args, **kwargs):
# NOTE(sileht): This is not part of BenchmarkPool
# because we cannot pickle BenchmarkPool class
sw = timeutils.StopWatch().start()
return fn(*args, **kwargs), sw.elapsed()
class BenchmarkPool(futurist.ProcessPoolExecutor):
def submit_job(self, times, fn, *args, **kwargs):
self.sw = timeutils.StopWatch()
self.sw.start()
self.times = times
return [self.submit(measure_job, fn, *args, **kwargs)
for i in six.moves.range(times)]
def map_job(self, fn, iterable, **kwargs):
self.sw = timeutils.StopWatch()
self.sw.start()
r = []
self.times = 0
for item in iterable:
r.append(self.submit(measure_job, fn, item, **kwargs))
self.times += 1
return r
def _log_progress(self, verb):
runtime = self.sw.elapsed()
done = self.statistics.executed
rate = done / runtime if runtime != 0 else 0
LOG.info(
"%d/%d, "
"total: %.2f seconds, "
"rate: %.2f %s/second"
% (done, self.times, runtime, rate, verb))
def wait_job(self, verb, futures):
while self.statistics.executed != self.times:
self._log_progress(verb)
time.sleep(1)
self._log_progress(verb)
self.shutdown(wait=True)
runtime = self.sw.elapsed()
results = []
latencies = []
for f in futures:
try:
result, latency = f.result()
results.append(result)
latencies.append(latency)
except Exception as e:
LOG.error("Error with %s metric: %s" % (verb, e))
latencies = sorted(latencies)
return results, runtime, {
'client workers': self._max_workers,
verb + ' runtime': "%.2f seconds" % runtime,
verb + ' executed': self.statistics.executed,
verb + ' speed': (
"%.2f %s/s" % (self.statistics.executed / runtime, verb)
),
verb + ' failures': self.statistics.failures,
verb + ' failures rate': (
"%.2f %%" % (
100
* self.statistics.failures
/ float(self.statistics.executed)
)
),
verb + ' latency min': min(latencies),
verb + ' latency max': max(latencies),
verb + ' latency mean': sum(latencies) / len(latencies),
verb + ' latency median': self._percentile(latencies, 0.5),
verb + ' latency 95%\'ile': self._percentile(latencies, 0.95),
verb + ' latency 99%\'ile': self._percentile(latencies, 0.99),
verb + ' latency 99.9%\'ile': self._percentile(latencies, 0.999),
}
@staticmethod
def _percentile(sorted_list, percent):
# NOTE(sileht): we don't to want depends on numpy
if not sorted_list:
return None
k = (len(sorted_list) - 1) * percent
f = math.floor(k)
c = math.ceil(k)
if f == c:
return sorted_list[int(k)]
d0 = sorted_list[int(f)] * (c - k)
d1 = sorted_list[int(c)] * (k - f)
return d0 + d1
class CliBenchmarkBase(show.ShowOne):
def get_parser(self, prog_name):
parser = super(CliBenchmarkBase, self).get_parser(prog_name)
parser.add_argument("--workers", "-w",
default=None,
type=_positive_non_zero_int,
help="Number of workers to use")
return parser
class CliBenchmarkMetricShow(CliBenchmarkBase,
metric_cli.CliMetricWithResourceID):
"""Do benchmark testing of metric show"""
def get_parser(self, prog_name):
parser = super(CliBenchmarkMetricShow, self).get_parser(prog_name)
parser.add_argument("metric", nargs='+',
help="ID or name of the metrics")
parser.add_argument("--count", "-n",
required=True,
type=_positive_non_zero_int,
help="Number of metrics to get")
return parser
def take_action(self, parsed_args):
pool = BenchmarkPool(parsed_args.workers)
LOG.info("Getting metrics")
futures = pool.map_job(self.app.client.metric.get,
parsed_args.metric * parsed_args.count,
resource_id=parsed_args.resource_id)
result, runtime, stats = pool.wait_job("show", futures)
return self.dict2columns(stats)
class CliBenchmarkMetricCreate(CliBenchmarkBase,
metric_cli.CliMetricCreateBase):
"""Do benchmark testing of metric creation"""
def get_parser(self, prog_name):
parser = super(CliBenchmarkMetricCreate, self).get_parser(prog_name)
parser.add_argument("--count", "-n",
required=True,
type=_positive_non_zero_int,
help="Number of metrics to create")
parser.add_argument("--keep", "-k",
action='store_true',
help="Keep created metrics")
return parser
def _take_action(self, metric, parsed_args):
pool = BenchmarkPool(parsed_args.workers)
LOG.info("Creating metrics")
futures = pool.submit_job(parsed_args.count,
self.app.client.metric.create,
metric, refetch_metric=False)
created_metrics, runtime, stats = pool.wait_job("create", futures)
if not parsed_args.keep:
LOG.info("Deleting metrics")
pool = BenchmarkPool(parsed_args.workers)
futures = pool.map_job(self.app.client.metric.delete,
[m['id'] for m in created_metrics])
_, runtime, dstats = pool.wait_job("delete", futures)
stats.update(dstats)
return self.dict2columns(stats)
class CliBenchmarkMeasuresAdd(CliBenchmarkBase,
metric_cli.CliMeasuresAddBase):
"""Do benchmark testing of adding measurements"""
def get_parser(self, prog_name):
parser = super(CliBenchmarkMeasuresAdd, self).get_parser(prog_name)
parser.add_argument("--count", "-n",
required=True,
type=_positive_non_zero_int,
help="Number of total measures to send")
parser.add_argument("--batch", "-b",
default=1,
type=_positive_non_zero_int,
help="Number of measures to send in each batch")
parser.add_argument("--timestamp-start", "-s",
default=(
timeutils.utcnow(True)
- datetime.timedelta(days=365)),
type=timeutils.parse_isotime,
help="First timestamp to use")
parser.add_argument("--timestamp-end", "-e",
default=timeutils.utcnow(True),
type=timeutils.parse_isotime,
help="Last timestamp to use")
return parser
def take_action(self, parsed_args):
pool = BenchmarkPool(parsed_args.workers)
LOG.info("Sending measures")
if parsed_args.timestamp_end <= parsed_args.timestamp_start:
raise ValueError("End timestamp must be after start timestamp")
# If batch size is bigger than the number of measures to send, we
# reduce it to make sure we send something.
if parsed_args.batch > parsed_args.count:
parsed_args.batch = parsed_args.count
start = int(parsed_args.timestamp_start.strftime("%s"))
end = int(parsed_args.timestamp_end.strftime("%s"))
count = parsed_args.count
if (end - start) < count:
raise ValueError(
"The specified time range is not large enough "
"for the number of points")
random_values = (random.randint(- 2 ** 32, 2 ** 32)
for _ in six.moves.range(count))
all_measures = ({"timestamp": ts, "value": v}
for ts, v
in six.moves.zip(
six.moves.range(start,
end,
(end - start) // count),
random_values))
measures = grouper(all_measures, parsed_args.batch)
futures = pool.map_job(functools.partial(
self.app.client.metric.add_measures,
parsed_args.metric), measures, resource_id=parsed_args.resource_id)
_, runtime, stats = pool.wait_job("push", futures)
stats['measures per request'] = parsed_args.batch
stats['measures push speed'] = (
"%.2f push/s" % (
parsed_args.batch * pool.statistics.executed / runtime
)
)
return self.dict2columns(stats)
class CliBenchmarkMeasuresShow(CliBenchmarkBase,
metric_cli.CliMeasuresShow):
"""Do benchmark testing of measurements show"""
def get_parser(self, prog_name):
parser = super(CliBenchmarkMeasuresShow, self).get_parser(prog_name)
parser.add_argument("--count", "-n",
required=True,
type=_positive_non_zero_int,
help="Number of total measures to send")
return parser
def take_action(self, parsed_args):
pool = BenchmarkPool(parsed_args.workers)
LOG.info("Getting measures")
futures = pool.submit_job(parsed_args.count,
self.app.client.metric.get_measures,
metric=parsed_args.metric,
resource_id=parsed_args.resource_id,
aggregation=parsed_args.aggregation,
start=parsed_args.start,
stop=parsed_args.stop)
result, runtime, stats = pool.wait_job("show", futures)
return self.dict2columns(stats)

View File

@ -11,8 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
class ClientException(Exception):
"""The base exception class for all exceptions this library raises."""
@ -76,26 +74,6 @@ class NotFound(ClientException):
message = "Not found"
class MetricNotFound(NotFound, MutipleMeaningException):
message = "Metric not found"
match = re.compile("Metric .* does not exist")
class ResourceNotFound(NotFound, MutipleMeaningException):
message = "Resource not found"
match = re.compile("Resource .* does not exist")
class ArchivePolicyNotFound(NotFound, MutipleMeaningException):
message = "Archive policy not found"
match = re.compile("Archive policy .* does not exist")
class ArchivePolicyRuleNotFound(NotFound, MutipleMeaningException):
message = "Archive policy rule not found"
match = re.compile("Archive policy rule .* does not exist")
class MethodNotAllowed(ClientException):
"""HTTP 405 - Method Not Allowed"""
http_status = 405
@ -114,26 +92,6 @@ class Conflict(ClientException):
message = "Conflict"
class NamedMetricAreadyExists(Conflict, MutipleMeaningException):
message = "Named metric already exists"
match = re.compile("Named metric .* does not exist")
class ResourceAlreadyExists(Conflict, MutipleMeaningException):
message = "Resource already exists"
match = re.compile("Resource .* already exists")
class ArchivePolicyAlreadyExists(Conflict, MutipleMeaningException):
message = "Archive policy already exists"
match = re.compile("Archive policy .* already exists")
class ArchivePolicyRuleAlreadyExists(Conflict, MutipleMeaningException):
message = "Archive policy rule already exists"
match = re.compile("Archive policy Rule .* already exists")
class OverLimit(RetryAfterException):
"""HTTP 413 - Over limit:
@ -164,13 +122,7 @@ class NotImplemented(ClientException):
_error_classes = [BadRequest, Unauthorized, Forbidden, NotFound,
MethodNotAllowed, NotAcceptable, Conflict, OverLimit,
RateLimit, NotImplemented]
_error_classes_enhanced = {
NotFound: [MetricNotFound, ResourceNotFound, ArchivePolicyNotFound,
ArchivePolicyRuleNotFound],
Conflict: [NamedMetricAreadyExists, ResourceAlreadyExists,
ArchivePolicyAlreadyExists,
ArchivePolicyRuleAlreadyExists]
}
_error_classes_enhanced = {}
_code_map = dict(
(c.http_status, (c, _error_classes_enhanced.get(c, [])))
for c in _error_classes)

View File

@ -23,48 +23,15 @@ from cliff import commandmanager
from keystoneauth1 import exceptions
from keystoneauth1 import loading
from aodhclient import benchmark
from aodhclient import client
from aodhclient import noauth
from aodhclient.v1 import archive_policy_cli
from aodhclient.v1 import archive_policy_rule_cli as ap_rule_cli
from aodhclient.v1 import capabilities_cli
from aodhclient.v1 import metric_cli
from aodhclient.v1 import resource_cli
from aodhclient.v1 import status_cli
from aodhclient.v2 import capabilities_cli
from aodhclient.version import __version__
class AodhCommandManager(commandmanager.CommandManager):
SHELL_COMMANDS = {
"status": status_cli.CliStatusShow,
"resource list": resource_cli.CliResourceList,
"resource show": resource_cli.CliResourceShow,
"resource history": resource_cli.CliResourceHistory,
"resource search": resource_cli.CliResourceSearch,
"resource create": resource_cli.CliResourceCreate,
"resource update": resource_cli.CliResourceUpdate,
"resource delete": resource_cli.CliResourceDelete,
"archive-policy list": archive_policy_cli.CliArchivePolicyList,
"archive-policy show": archive_policy_cli.CliArchivePolicyShow,
"archive-policy create": archive_policy_cli.CliArchivePolicyCreate,
"archive-policy delete": archive_policy_cli.CliArchivePolicyDelete,
"archive-policy-rule list": ap_rule_cli.CliArchivePolicyRuleList,
"archive-policy-rule show": ap_rule_cli.CliArchivePolicyRuleShow,
"archive-policy-rule create": ap_rule_cli.CliArchivePolicyRuleCreate,
"archive-policy-rule delete": ap_rule_cli.CliArchivePolicyRuleDelete,
"metric list": metric_cli.CliMetricList,
"metric show": metric_cli.CliMetricShow,
"metric create": metric_cli.CliMetricCreate,
"metric delete": metric_cli.CliMetricDelete,
"measures show": metric_cli.CliMeasuresShow,
"measures add": metric_cli.CliMeasuresAdd,
"measures aggregation": metric_cli.CliMeasuresAggregation,
"capabilities list": capabilities_cli.CliCapabilitiesList,
"benchmark metric create": benchmark.CliBenchmarkMetricCreate,
"benchmark metric show": benchmark.CliBenchmarkMetricShow,
"benchmark measures add": benchmark.CliBenchmarkMeasuresAdd,
"benchmark measures show": benchmark.CliBenchmarkMeasuresShow,
}
def load_commands(self, namespace):
@ -96,7 +63,7 @@ class AodhShell(app.App):
:paramtype version: str
"""
parser = super(AodhShell, self).build_option_parser(description,
version)
version)
# Global arguments, one day this should go to keystoneauth1
parser.add_argument(
'--os-region-name',

View File

@ -38,8 +38,8 @@ class AodhClient(object):
creds = ("--os-auth-plugin aodh-noauth "
"--user-id %s --project-id %s "
"--aodh-endpoint %s") % (self.user_id,
self.project_id,
self.endpoint)
self.project_id,
self.endpoint)
flags = creds + ' ' + flags

View File

@ -10,13 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from gnocchiclient.tests.functional import base
from aodhclient.tests.functional import base
class CapabilitiesClientTest(base.ClientTestBase):
def test_capabilities_scenario(self):
# GET
result = self.gnocchi('capabilities', params="list")
result = self.aodh('capabilities', params="list")
caps = self.parser.listing(result)[0]
self.assertIsNotNone(caps)
self.assertEqual('aggregation_methods', caps['Field'])

View File

@ -14,12 +14,7 @@
# under the License.
from aodhclient import client
from aodhclient.v1 import archive_policy
from aodhclient.v1 import archive_policy_rule
from aodhclient.v1 import capabilities
from aodhclient.v1 import metric
from aodhclient.v1 import resource
from aodhclient.v1 import status
from aodhclient.v2 import capabilities
class Client(object):
@ -29,14 +24,8 @@ class Client(object):
:type session: :py:class:`keystoneauth.adapter.Adapter`
"""
def __init__(self, session=None, service_type='metric', **kwargs):
def __init__(self, session=None, service_type='alarming', **kwargs):
"""Initialize a new client for the Aodh v2 API."""
self.api = client.SessionClient(session, service_type=service_type,
**kwargs)
self.resource = resource.ResourceManager(self)
self.archive_policy = archive_policy.ArchivePolicyManager(self)
self.archive_policy_rule = (
archive_policy_rule.ArchivePolicyRuleManager(self))
self.metric = metric.MetricManager(self)
self.capabilities = capabilities.CapabilitiesManager(self)
self.status = status.StatusManager(self)

View File

@ -11,7 +11,7 @@ Usage
To use aodhclient in a project::
>>> from aodhclient.v1 import client
>>> from aodhclient.v2 import client
>>> aodh = client.Client(...)
>>> aodh.alarm.list("alarm")
@ -23,5 +23,5 @@ For more information, see the reference:
.. toctree::
:maxdepth: 2
ref/v1/index
ref/v2/index

View File

@ -55,8 +55,7 @@ def gen_ref(ver, title, names):
"signs": "=" * len(name),
"pkg": pkg, "name": name})
gen_ref("v1", "Version 1 API", ["client", "resource", "archive_policy",
"archive_policy_rule", "metric"])
gen_ref("v2", "Version 2 API", ["client"])
# -- General configuration ----------------------------------------------------

View File

@ -17,43 +17,36 @@ clean_exit () {
return $error_code
}
GNOCCHI_DATA=`mktemp -d /tmp/gnocchi-data-XXXXX`
MYSQL_DATA=`mktemp -d /tmp/gnocchi-mysql-XXXXX`
trap "clean_exit \"$GNOCCHI_DATA\" \"$MYSQL_DATA\"" EXIT
AODH_DATA=`mktemp -d /tmp/aodh-data-XXXXX`
MYSQL_DATA=`mktemp -d /tmp/aodh-mysql-XXXXX`
trap "clean_exit \"$AODH_DATA\" \"$MYSQL_DATA\"" EXIT
mkfifo ${MYSQL_DATA}/out
PATH=$PATH:/usr/libexec
mysqld --no-defaults --datadir=${MYSQL_DATA} --pid-file=${MYSQL_DATA}/mysql.pid --socket=${MYSQL_DATA}/mysql.socket --skip-networking --skip-grant-tables &> ${MYSQL_DATA}/out &
# Wait for MySQL to start listening to connections
wait_for_line "mysqld: ready for connections." ${MYSQL_DATA}/out
export GNOCCHI_TEST_INDEXER_URL="mysql+pymysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket&charset=utf8"
export AODH_TEST_STORAGE_URL="mysql+pymysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket&charset=utf8"
mysql --no-defaults -S ${MYSQL_DATA}/mysql.socket -e 'CREATE DATABASE test;'
mkfifo ${GNOCCHI_DATA}/out
echo '{"default": ""}' > ${GNOCCHI_DATA}/policy.json
cat > ${GNOCCHI_DATA}/gnocchi.conf <<EOF
echo '{"default": ""}' > ${AODH_DATA}/policy.json
cat > ${AODH_DATA}/aodh.conf <<EOF
[oslo_policy]
policy_file = ${GNOCCHI_DATA}/policy.json
[storage]
metric_processing_delay = 1
file_basepath = ${GNOCCHI_DATA}
driver = file
coordination_url = file://${GNOCCHI_DATA}
[indexer]
url = mysql+pymysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket&charset=utf8
policy_file = ${AODH_DATA}/policy.json
[database]
connection = mysql+pymysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket&charset=utf8
EOF
cat <<EOF > ${GNOCCHI_DATA}/api-paste.ini
cat <<EOF > ${AODH_DATA}/api-paste.ini
[pipeline:main]
pipeline = gnocchi
[app:gnocchi]
paste.app_factory = gnocchi.rest.app:app_factory
pipeline = aodh
[app:aodh]
paste.app_factory = aodh.rest.app:app_factory
EOF
gnocchi-upgrade --config-file ${GNOCCHI_DATA}/gnocchi.conf
gnocchi-metricd --config-file ${GNOCCHI_DATA}/gnocchi.conf &>/dev/null &
gnocchi-api --config-file ${GNOCCHI_DATA}/gnocchi.conf &> ${GNOCCHI_DATA}/out &
# Wait for Gnocchi to start
wait_for_line "Running on http://0.0.0.0:8041/" ${GNOCCHI_DATA}/out
export GNOCCHI_ENDPOINT=http://localhost:8041/
aodh-dbsync --config-file ${AODH_DATA}/aodh.conf
aodh-api --config-file ${AODH_DATA}/aodh.conf &> ${AODH_DATA}/out &
# Wait for Aodh to start
wait_for_line "Running on http://0.0.0.0:8042/" ${AODH_DATA}/out
export AODH_ENDPOINT=http://localhost:8042/
$*