Adding the TestManager code and the timerthread

Change-Id: I0cba5db5b3875bf450bbd7ffe85d7c90ff2966ab
This commit is contained in:
Anand Shanmugam 2015-06-23 11:17:15 -07:00
parent 623cbb217c
commit 23427d1cb6
5 changed files with 232 additions and 3 deletions

View File

@ -0,0 +1,145 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudpulse.common import context as cloudpulse_context
from cloudpulse.common.plugin import discover
from cloudpulse.db.sqlalchemy import api as dbapi
from cloudpulse import objects
from cloudpulse.openstack.common import service as os_service
from cloudpulse.scenario import base
import logging
from oslo_config import cfg
from oslo_utils import importutils
import textwrap
import threading
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
CONF = cfg.CONF
dblock = threading.RLock()
def acquireLock():
dblock.acquire()
def releaseLock():
dblock.release()
LOG = logging.getLogger(__name__)
class Periodic_Task(object):
def __init__(self, task):
self.task = task
def create_task_entry(self, context):
test = {}
test['state'] = 'created'
test['testtype'] = 'periodic'
test['name'] = self.task
new_test = objects.Cpulse(context, **test)
acquireLock()
new_test.create()
releaseLock()
return new_test
def run_task(self):
importutils.import_module('keystonemiddleware.auth_token')
username = cfg.CONF.keystone_authtoken.username
tenant_name = cfg.CONF.keystone_authtoken.project_name
# password = cfg.CONF.keystone_authtoken.password
auth_url = cfg.CONF.keystone_authtoken.auth_uri
context = cloudpulse_context.make_context(
auth_url=auth_url,
user=username,
project=tenant_name)
new_test = self.create_task_entry(context)
test_manager.run(test=new_test)
class Periodic_TestManager(os_service.Service):
def __init__(self):
super(Periodic_TestManager, self).__init__()
def start(self):
tasks = CONF.periodic_tests
for key in tasks.keys():
interval, task_name = tasks[key], key
if int(interval) > 0:
period_task = Periodic_Task(task_name)
self.tg.add_timer(interval, self.interval_task,
task=period_task)
@staticmethod
def interval_task(task):
task.run_task()
class TestManager(object):
def __init__(self):
self.command_ref = {}
discover.import_modules_from_package("cloudpulse.scenario.plugins")
for scenario_group in discover.itersubclasses(base.Scenario):
for method in dir(scenario_group):
scenario = scenario_group()
callback = getattr(scenario, method)
self.command_ref[method] = callback
def run(self, **kwargs):
Test = kwargs['test']
func = self.command_ref[Test['name']]
Test['state'] = 'running'
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
result = func()
if result[0] == 200:
Test['state'] = 'finished'
Test['result'] = 'success'
else:
Test['state'] = 'failed'
Test['result'] = textwrap.fill(str(result[1]), 40)
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
def run_periodic(self, **kwargs):
Test = kwargs['test']
func = self.command_ref[Test['name']]
result = func()
if result[0] == 200:
Test['result'] = 'success'
else:
Test['state'] = 'failed'
Test['result'] = textwrap.fill(str(result[1]), 40)
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
def update_test(self, tuuid, patch):
npatch = {}
npatch['state'] = patch['state']
npatch['id'] = patch['id']
npatch['name'] = patch['name']
npatch['result'] = patch['result']
conn = dbapi.get_backend()
conn.update_test(tuuid, npatch)
return npatch
test_manager = TestManager()

View File

View File

@ -16,6 +16,10 @@ import pecan
from cloudpulse.api import auth
from cloudpulse.api import config as api_config
from cloudpulse.api import middleware
from cloudpulse.common.timerthread import cpulseTimer
from cloudpulse.common.timerthread import timerfunc
from cloudpulse.TestManager.TestManager import Periodic_TestManager
# Register options for the service
bind_opts = [
@ -67,4 +71,7 @@ def setup_app(config=None):
# TBD Add test hook later
# cpulseTimer(10, timerfunc, "Cpulse")
cpulseTimer(30, timerfunc, "Cpulse")
tm = Periodic_TestManager()
tm.start()
return auth.install(app, CONF, config.app.acl_public_routes)

View File

@ -0,0 +1,80 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudpulse import objects
from cloudpulse.openstack.common._i18n import _LI
from cloudpulse.openstack.common import log as logging
from cloudpulse.TestManager import TestManager
import sys
import threading
from threading import Timer
LOG = logging.getLogger(__name__)
dblock = threading.RLock()
test_manager = TestManager.TestManager()
def acquireLock():
dblock.acquire()
def releaseLock():
dblock.release()
class cpulseTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(self, *self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def test_run(**kwargs):
LOG.debug(_LI('Running Openstack test%s') % kwargs['test']['name'])
test_manager.run(**kwargs)
testthreads = []
def timerfunc(*args, **kwargs):
context = None
acquireLock()
tests = objects.Cpulse.list(context)
releaseLock()
for test in tests:
LOG.debug(_LI('Dumping REPFUNCTION %s') % test['uuid'])
if test['state'] == 'created' and test['testtype'] == 'manual':
methodtocall = getattr(sys.modules[__name__], 'test_run')
# methodtocall()
testthr = threading.Thread(name=test['name'],
target=methodtocall,
kwargs={'test': test})
testthreads.append(testthr)
testthr.start()
LOG.debug(_LI('REPFUNCTION, exec test %s') % test['name'])

View File

@ -133,9 +133,6 @@ class Connection(api.Connection):
if not values.get('uuid'):
values['uuid'] = utils.generate_uuid()
if not values.get('id'):
values['id'] = 777
cpulse = models.cpulse()
cpulse.update(values)
# TODO(VINOD)