Adding the scenarios module and the scenarios

Change-Id: Id7948b1bbf3c2a83d1d26dd6c8512bb9c8912ce8
Implements: blueprint cloudpulse-ext-handler
This commit is contained in:
Anand Shanmugam 2015-06-23 11:06:42 -07:00
parent 51aa7f5e8c
commit 623cbb217c
7 changed files with 354 additions and 0 deletions

View File

222
cloudpulse/scenario/base.py Normal file
View File

@ -0,0 +1,222 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudpulse.common import exception
from cloudpulse.common.plugin import discover
import itertools
def scenario(admin_only=False, operator=False, context=None):
"""Add extra fields to benchmark scenarios methods.
This method is used as decorator for the methods of benchmark scenarios
and it adds following extra fields to the methods.
'is_scenario' is set to True
'admin_only' is set to True if a scenario require admin endpoints
'operator' is set to True if the scenario is an operator scenario
"""
def wrapper(func):
func.is_scenario = True
func.admin_only = admin_only
func.operator = operator
func.context = context or {}
return func
return wrapper
class Scenario(object):
"""This is base class for any benchmark scenario.
You should create subclass of this class. And your test scenarios will
be auto discoverable and you will be able to specify it in test config.
"""
def __init__(self, context=None, admin_tests=None,
tenant_tests=None, operator_tests=None):
self._admin_tests = admin_tests
self.tenant_tests = tenant_tests
self.operator_tests = operator_tests
@staticmethod
def get_by_name(name):
"""Returns Scenario class by name."""
for scenario in discover.itersubclasses(Scenario):
if name == scenario.__name__:
return scenario
raise exception.NoSuchScenario(name=name)
@staticmethod
def is_scenario(cls, method_name):
"""Check whether a given method in scenario class is a scenario.
:param cls: scenario class
:param method_name: method name
:returns: True if the method is a benchmark scenario, False otherwise
"""
try:
getattr(cls, method_name)
except Exception:
return False
return Scenario.meta(cls, "is_scenario", method_name, default=False)
@staticmethod
def is_admin(cls, method_name):
"""Check whether a given method in scenario class is a scenario.
:param cls: scenario class
:param method_name: method name
:returns: True if the method is a benchmark scenario, False otherwise
"""
try:
getattr(cls, method_name)
except Exception:
return False
return Scenario.meta(cls, "admin_only", method_name, default=False)
@staticmethod
def is_operator(cls, method_name):
"""Check whether a given method in scenario class is a scenario.
:param cls: scenario class
:param method_name: method name
:returns: True if the method is a benchmark scenario, False otherwise
"""
try:
getattr(cls, method_name)
except Exception:
return False
return Scenario.meta(cls, "operator", method_name, default=False)
@classmethod
def list_operator_scenarios(scenario_cls):
"""Lists all the existing methods in the operator scenario classes.
Returns the method names in format <Class name>.<Method name>, which
is used in the test config.
:returns: List of strings
"""
scenario_classes = (list(discover.itersubclasses(scenario_cls)) +
[scenario_cls])
scenarios_list = [
["%s.%s" % (scenario.__name__, func)
for func in dir(scenario)
if Scenario.is_scenario(scenario, func)
and Scenario.is_operator(scenario, func)]
for scenario in scenario_classes
]
operator_scenarios = list(
itertools.chain.from_iterable(scenarios_list))
return operator_scenarios
@classmethod
def list_admin_scenarios(scenario_cls):
"""Lists all the existing methods in the operator scenario classes.
Returns the method names in format <Class name>.<Method name>, which
is used in the test config.
:returns: List of strings
"""
scenario_classes = (list(discover.itersubclasses(scenario_cls)) +
[scenario_cls])
scenarios_list = [
["%s.%s" % (scenario.__name__, func)
for func in dir(scenario)
if Scenario.is_scenario(scenario, func) and
Scenario.is_admin(scenario, func)]
for scenario in scenario_classes
]
scenarios_list_admin = list(
itertools.chain.from_iterable(scenarios_list))
return scenarios_list_admin
@classmethod
def list_tenant_scenarios(scenario_cls):
"""Lists all the existing methods in the operator scenario classes.
Returns the method names in format <Class name>.<Method name>, which
is used in the test config.
:returns: List of strings
"""
scenario_classes = (list(discover.itersubclasses(scenario_cls)) +
[scenario_cls])
scenarios_list = [
["%s.%s" % (scenario.__name__, func)
for func in dir(scenario)
if Scenario.is_scenario(scenario, func) and
not Scenario.is_admin(scenario, func) and
not Scenario.is_operator(scenario, func)]
for scenario in scenario_classes
]
tenant_scenarios = list(
itertools.chain.from_iterable(scenarios_list))
return tenant_scenarios
@classmethod
def list_all_scenarios(scenario_cls):
"""Lists all the existing methods in the operator scenario classes.
Returns the method names in format <Class name>.<Method name>, which
is used in the test config.
:returns: List of strings
"""
scenario_classes = (list(discover.itersubclasses(scenario_cls)) +
[scenario_cls])
scenarios_list = [
["%s.%s" % (scenario.__name__, func)
for func in dir(scenario) if Scenario.is_scenario(scenario, func)]
for scenario in scenario_classes
]
scenarios_list_flat = list(
itertools.chain.from_iterable(scenarios_list))
return scenarios_list_flat
@classmethod
def validate(cls, name, config, admin=None, users=None, task=None):
"""Semantic check of benchmark arguments."""
if cls.meta(name, "admin_only") is True:
print("Admin Access")
if cls.meta(name, "operator") is True:
print("Operator")
def setup(self, *args, **kwargs):
"""TODO:Implement setup and teardown"""
pass
def teardown(self, *args, **kwargs):
"""TODO:Implement setup and teardown"""
pass
@staticmethod
def meta(cls, attr_name, method_name=None, default=None):
"""Extract the named meta information out of the scenario name.
:param cls: Scenario (sub)class or string of form 'class.method'
:param attr_name: Name of method attribute holding meta information.
:param method_name: Name of method queried for meta information.
:param default: Value returned if no meta information is attached.
:returns: Meta value bound to method attribute or default.
"""
if isinstance(cls, str):
cls_name, method_name = cls.split(".", 1)
cls = Scenario.get_by_name(cls_name)
method = getattr(cls, method_name)
return getattr(method, attr_name, default)

View File

View File

@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudpulse.scenario import base
from oslo_config import cfg
TESTS_OPTS = [
cfg.IntOpt('dummy_cloudtest',
default=0,
help='The nova endpoint and interval')
]
CONF = cfg.CONF
periodic_test_group = cfg.OptGroup(name='periodic_tests',
title='Periodic tests to be run')
CONF.register_opts(TESTS_OPTS, periodic_test_group)
class dummy_scenario(base.Scenario):
@base.scenario(operator=True)
def dummy_cloudtest(self, *args, **kwargs):
return (200, "success", ['dummy_result'])

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudpulse.openstack.api.cinder_api import CinderHealth
from cloudpulse.openstack.api.glance_api import GlanceHealth
from cloudpulse.openstack.api.keystone_api import KeystoneHealth
from cloudpulse.openstack.api.neutron_api import NeutronHealth
from cloudpulse.openstack.api.nova_api import NovaHealth
from cloudpulse.scenario import base
from oslo_config import cfg
from oslo_utils import importutils
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
TESTS_OPTS = [
cfg.IntOpt('nova_endpoint',
default=0,
help='The nova endpoint and interval'),
cfg.IntOpt('neutron_endpoint',
default=0,
help='The neutron endpoint and interval'),
cfg.IntOpt('keystone_endpoint',
default=0,
help='The keystone endpoint and interval'),
cfg.IntOpt('glance_endpoint',
default=0,
help='The glance endpoint and interval'),
cfg.IntOpt('cinder_endpoint',
default=0,
help='The cinder endpoint and interval')
]
CONF = cfg.CONF
periodic_test_group = cfg.OptGroup(name='periodic_tests',
title='Periodic tests to be run')
CONF.register_group(periodic_test_group)
CONF.register_opts(TESTS_OPTS, periodic_test_group)
class endpoint_scenario(base.Scenario):
def _get_credentials(self):
importutils.import_module('keystonemiddleware.auth_token')
creds = {}
creds['username'] = cfg.CONF.keystone_authtoken.username
creds['tenant_name'] = cfg.CONF.keystone_authtoken.project_name
creds['password'] = cfg.CONF.keystone_authtoken.password
creds['auth_url'] = cfg.CONF.keystone_authtoken.auth_uri
return creds
def _get_nova_v2_credentials(self):
importutils.import_module('keystonemiddleware.auth_token')
creds = {}
creds['username'] = cfg.CONF.keystone_authtoken.username
creds['project_id'] = cfg.CONF.keystone_authtoken.project_name
creds['api_key'] = cfg.CONF.keystone_authtoken.password
creds['auth_url'] = cfg.CONF.keystone_authtoken.auth_uri
creds['version'] = 2
return creds
@base.scenario(admin_only=False, operator=False)
def nova_endpoint(self, *args, **kwargs):
creds = self._get_nova_v2_credentials()
nova = NovaHealth(creds)
return nova.nova_service_list()
@base.scenario(admin_only=False, operator=False)
def neutron_endpoint(self, *args, **kwargs):
creds = self._get_credentials()
neutron = NeutronHealth(creds)
return neutron.neutron_agent_list()
@base.scenario(admin_only=False, operator=False)
def keystone_endpoint(self, *args, **kwargs):
creds = self._get_credentials()
keystone = KeystoneHealth(creds)
return keystone.keystone_service_list()
@base.scenario(admin_only=False, operator=False)
def glance_endpoint(self, *args, **kwargs):
creds = self._get_credentials()
keystone = KeystoneHealth(creds)
glance = GlanceHealth(keystone)
return glance.glance_image_list()
@base.scenario(admin_only=False, operator=False)
def cinder_endpoint(self, *args, **kwargs):
creds = self._get_nova_v2_credentials()
cinder = CinderHealth(creds)
return cinder.cinder_list()