From 23427d1cb6101d3aa716268f9258d314201d441d Mon Sep 17 00:00:00 2001 From: Anand Shanmugam Date: Tue, 23 Jun 2015 11:17:15 -0700 Subject: [PATCH] Adding the TestManager code and the timerthread Change-Id: I0cba5db5b3875bf450bbd7ffe85d7c90ff2966ab --- cloudpulse/TestManager/TestManager.py | 145 ++++++++++++++++++++++++++ cloudpulse/TestManager/__init__.py | 0 cloudpulse/api/app.py | 7 ++ cloudpulse/common/timerthread.py | 80 ++++++++++++++ cloudpulse/db/sqlalchemy/api.py | 3 - 5 files changed, 232 insertions(+), 3 deletions(-) create mode 100644 cloudpulse/TestManager/TestManager.py create mode 100644 cloudpulse/TestManager/__init__.py create mode 100755 cloudpulse/common/timerthread.py diff --git a/cloudpulse/TestManager/TestManager.py b/cloudpulse/TestManager/TestManager.py new file mode 100644 index 0000000..2ee1114 --- /dev/null +++ b/cloudpulse/TestManager/TestManager.py @@ -0,0 +1,145 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cloudpulse.common import context as cloudpulse_context +from cloudpulse.common.plugin import discover +from cloudpulse.db.sqlalchemy import api as dbapi +from cloudpulse import objects +from cloudpulse.openstack.common import service as os_service +from cloudpulse.scenario import base +import logging +from oslo_config import cfg +from oslo_utils import importutils +import textwrap +import threading + +cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', + group='keystone_authtoken') + + +CONF = cfg.CONF + +dblock = threading.RLock() + + +def acquireLock(): + dblock.acquire() + + +def releaseLock(): + dblock.release() + + +LOG = logging.getLogger(__name__) + + +class Periodic_Task(object): + def __init__(self, task): + self.task = task + + def create_task_entry(self, context): + test = {} + test['state'] = 'created' + test['testtype'] = 'periodic' + test['name'] = self.task + new_test = objects.Cpulse(context, **test) + acquireLock() + new_test.create() + releaseLock() + return new_test + + def run_task(self): + importutils.import_module('keystonemiddleware.auth_token') + username = cfg.CONF.keystone_authtoken.username + tenant_name = cfg.CONF.keystone_authtoken.project_name + # password = cfg.CONF.keystone_authtoken.password + auth_url = cfg.CONF.keystone_authtoken.auth_uri + + context = cloudpulse_context.make_context( + auth_url=auth_url, + user=username, + project=tenant_name) + + new_test = self.create_task_entry(context) + test_manager.run(test=new_test) + + +class Periodic_TestManager(os_service.Service): + def __init__(self): + super(Periodic_TestManager, self).__init__() + + def start(self): + tasks = CONF.periodic_tests + for key in tasks.keys(): + interval, task_name = tasks[key], key + if int(interval) > 0: + period_task = Periodic_Task(task_name) + self.tg.add_timer(interval, self.interval_task, + task=period_task) + + @staticmethod + def interval_task(task): + task.run_task() + + +class TestManager(object): + def __init__(self): + self.command_ref = {} + discover.import_modules_from_package("cloudpulse.scenario.plugins") + for scenario_group in discover.itersubclasses(base.Scenario): + for method in dir(scenario_group): + scenario = scenario_group() + callback = getattr(scenario, method) + self.command_ref[method] = callback + + def run(self, **kwargs): + Test = kwargs['test'] + func = self.command_ref[Test['name']] + Test['state'] = 'running' + acquireLock() + self.update_test(Test['uuid'], Test) + releaseLock() + result = func() + if result[0] == 200: + Test['state'] = 'finished' + Test['result'] = 'success' + else: + Test['state'] = 'failed' + Test['result'] = textwrap.fill(str(result[1]), 40) + acquireLock() + self.update_test(Test['uuid'], Test) + releaseLock() + + def run_periodic(self, **kwargs): + Test = kwargs['test'] + func = self.command_ref[Test['name']] + result = func() + if result[0] == 200: + Test['result'] = 'success' + else: + Test['state'] = 'failed' + Test['result'] = textwrap.fill(str(result[1]), 40) + acquireLock() + self.update_test(Test['uuid'], Test) + releaseLock() + + def update_test(self, tuuid, patch): + npatch = {} + npatch['state'] = patch['state'] + npatch['id'] = patch['id'] + npatch['name'] = patch['name'] + npatch['result'] = patch['result'] + conn = dbapi.get_backend() + conn.update_test(tuuid, npatch) + return npatch + +test_manager = TestManager() diff --git a/cloudpulse/TestManager/__init__.py b/cloudpulse/TestManager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cloudpulse/api/app.py b/cloudpulse/api/app.py index 450a64c..b776dd4 100644 --- a/cloudpulse/api/app.py +++ b/cloudpulse/api/app.py @@ -16,6 +16,10 @@ import pecan from cloudpulse.api import auth from cloudpulse.api import config as api_config from cloudpulse.api import middleware +from cloudpulse.common.timerthread import cpulseTimer +from cloudpulse.common.timerthread import timerfunc +from cloudpulse.TestManager.TestManager import Periodic_TestManager + # Register options for the service bind_opts = [ @@ -67,4 +71,7 @@ def setup_app(config=None): # TBD Add test hook later # cpulseTimer(10, timerfunc, "Cpulse") + cpulseTimer(30, timerfunc, "Cpulse") + tm = Periodic_TestManager() + tm.start() return auth.install(app, CONF, config.app.acl_public_routes) diff --git a/cloudpulse/common/timerthread.py b/cloudpulse/common/timerthread.py new file mode 100755 index 0000000..8f9bffb --- /dev/null +++ b/cloudpulse/common/timerthread.py @@ -0,0 +1,80 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cloudpulse import objects +from cloudpulse.openstack.common._i18n import _LI +from cloudpulse.openstack.common import log as logging +from cloudpulse.TestManager import TestManager +import sys +import threading +from threading import Timer + +LOG = logging.getLogger(__name__) +dblock = threading.RLock() + +test_manager = TestManager.TestManager() + + +def acquireLock(): + dblock.acquire() + + +def releaseLock(): + dblock.release() + + +class cpulseTimer(object): + def __init__(self, interval, function, *args, **kwargs): + self._timer = None + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.is_running = False + self.start() + + def _run(self): + self.is_running = False + self.start() + self.function(self, *self.args, **self.kwargs) + + def start(self): + if not self.is_running: + self._timer = Timer(self.interval, self._run) + self._timer.start() + self.is_running = True + + +def test_run(**kwargs): + LOG.debug(_LI('Running Openstack test%s') % kwargs['test']['name']) + test_manager.run(**kwargs) + + +testthreads = [] + + +def timerfunc(*args, **kwargs): + context = None + acquireLock() + tests = objects.Cpulse.list(context) + releaseLock() + for test in tests: + LOG.debug(_LI('Dumping REPFUNCTION %s') % test['uuid']) + if test['state'] == 'created' and test['testtype'] == 'manual': + methodtocall = getattr(sys.modules[__name__], 'test_run') + # methodtocall() + testthr = threading.Thread(name=test['name'], + target=methodtocall, + kwargs={'test': test}) + testthreads.append(testthr) + testthr.start() + LOG.debug(_LI('REPFUNCTION, exec test %s') % test['name']) diff --git a/cloudpulse/db/sqlalchemy/api.py b/cloudpulse/db/sqlalchemy/api.py index 29626e2..eebcf9b 100644 --- a/cloudpulse/db/sqlalchemy/api.py +++ b/cloudpulse/db/sqlalchemy/api.py @@ -133,9 +133,6 @@ class Connection(api.Connection): if not values.get('uuid'): values['uuid'] = utils.generate_uuid() - if not values.get('id'): - values['id'] = 777 - cpulse = models.cpulse() cpulse.update(values) # TODO(VINOD)