diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py index 8eaf6c5..601939f 100644 --- a/tricircle/common/constants.py +++ b/tricircle/common/constants.py @@ -66,3 +66,7 @@ JS_Success = 'Success' JS_Fail = 'Fail' SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000' +TOP = 'top' + +# job type +JT_ROUTER = 'router' diff --git a/tricircle/common/xrpcapi.py b/tricircle/common/xrpcapi.py index c4b64f0..32ce085 100755 --- a/tricircle/common/xrpcapi.py +++ b/tricircle/common/xrpcapi.py @@ -24,6 +24,9 @@ import rpc from serializer import TricircleSerializer as Serializer import topics +from tricircle.common import constants + + CONF = cfg.CONF rpcapi_cap_opt = cfg.StrOpt('xjobapi', @@ -78,4 +81,5 @@ class XJobAPI(object): # specifying its control exchange, so the default value "openstack" is # used, thus we need to pass exchange as "openstack" here. self.client.prepare(exchange='openstack').cast( - ctxt, 'configure_extra_routes', payload={'router': router_id}) + ctxt, 'configure_extra_routes', + payload={constants.JT_ROUTER: router_id}) diff --git a/tricircle/db/api.py b/tricircle/db/api.py index 5ddd40a..27e4ccd 100644 --- a/tricircle/db/api.py +++ b/tricircle/db/api.py @@ -14,6 +14,7 @@ # under the License. import functools +import sqlalchemy as sql import time import uuid @@ -236,6 +237,24 @@ def register_job(context, _type, resource_id): context.session.close() +def get_latest_failed_jobs(context): + jobs = [] + query = context.session.query(models.Job.type, models.Job.resource_id, + sql.func.count(models.Job.id)) + query = query.group_by(models.Job.type, models.Job.resource_id) + for job_type, resource_id, count in query: + _query = context.session.query(models.Job) + _query = _query.filter_by(type=job_type, resource_id=resource_id) + _query = _query.order_by(sql.desc('timestamp')) + # when timestamps of job entries are the same, sort entries by status + # so "Fail" job is placed before "New" and "Success" jobs + _query = _query.order_by(sql.asc('status')) + latest_job = _query[0].to_dict() + if latest_job['status'] == constants.JS_Fail: + jobs.append(latest_job) + return jobs + + def get_latest_timestamp(context, status, _type, resource_id): jobs = core.query_resource( context, models.Job, diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py index a166742..4e0d506 100644 --- a/tricircle/tests/unit/xjob/test_xmanager.py +++ b/tricircle/tests/unit/xjob/test_xmanager.py @@ -267,6 +267,41 @@ class XManagerTest(unittest.TestCase): # nothing to assert, what we test is that fake_handle can exit when # timeout + def test_get_failed_jobs(self): + job_dict_list = [ + {'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0), + 'resource_id': 'uuid1', 'type': 'res1', + 'status': constants.JS_Fail}, # job_uuid1 + {'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0), + 'resource_id': 'uuid1', 'type': 'res1', + 'status': constants.JS_Fail}, # job_uuid3 + {'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0), + 'resource_id': 'uuid2', 'type': 'res2', + 'status': constants.JS_Fail}, # job_uuid5 + {'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0), + 'resource_id': 'uuid2', 'type': 'res2', + 'status': constants.JS_Fail}, # job_uuid7 + {'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0), + 'resource_id': 'uuid3', 'type': 'res3', + 'status': constants.JS_Fail}, # job_uuid9 + {'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0), + 'resource_id': 'uuid3', 'type': 'res3', + 'status': constants.JS_Success}] + for i, job_dict in enumerate(job_dict_list, 1): + job_dict['id'] = 'job_uuid%d' % (2 * i - 1) + job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1) + core.create_resource(self.context, models.Job, job_dict) + job_dict['id'] = 'job_uuid%d' % (2 * i) + job_dict['extra_id'] = 'extra_uuid%d' % (2 * i) + job_dict['status'] = constants.JS_New + core.create_resource(self.context, models.Job, job_dict) + + # for res3 + uuid3, the latest job's status is "Success", not returned + expected_ids = ['job_uuid3', 'job_uuid5'] + returned_jobs = db_api.get_latest_failed_jobs(self.context) + actual_ids = [job['id'] for job in returned_jobs] + self.assertItemsEqual(expected_ids, actual_ids) + def tearDown(self): core.ModelBase.metadata.drop_all(core.get_engine()) for res in RES_LIST: diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py index 566b74e..649ad25 100755 --- a/tricircle/xjob/xmanager.py +++ b/tricircle/xjob/xmanager.py @@ -16,6 +16,7 @@ import datetime import eventlet import netaddr +import random import six from oslo_config import cfg @@ -127,12 +128,13 @@ class XManager(PeriodicTasks): self.service_name = service_name # self.notifier = rpc.get_notifier(self.service_name, self.host) self.additional_endpoints = [] - self.clients = {'top': client.Client()} + self.clients = {constants.TOP: client.Client()} + self.job_handles = {constants.JT_ROUTER: self.configure_extra_routes} super(XManager, self).__init__() def _get_client(self, pod_name=None): if not pod_name: - return self.clients['top'] + return self.clients[constants.TOP] if pod_name not in self.clients: self.clients[pod_name] = client.Client(pod_name) return self.clients[pod_name] @@ -205,11 +207,29 @@ class XManager(PeriodicTasks): return info_text - @_job_handle('router') + @periodic_task.periodic_task + def redo_failed_job(self, ctx): + failed_jobs = db_api.get_latest_failed_jobs(ctx) + failed_jobs = [ + job for job in failed_jobs if job['type'] in self.job_handles] + if not failed_jobs: + return + # in one run we only pick one job to handle + job_index = random.randint(0, len(failed_jobs) - 1) + failed_job = failed_jobs[job_index] + job_type = failed_job['type'] + payload = {job_type: failed_job['resource_id']} + LOG.debug(_('Redo failed job for %(resource_id)s of type ' + '%(job_type)s'), + {'resource_id': failed_job['resource_id'], + 'job_type': job_type}) + self.job_handles[job_type](ctx, payload=payload) + + @_job_handle(constants.JT_ROUTER) def configure_extra_routes(self, ctx, payload): # TODO(zhiyuan) performance and reliability issue # better have a job tracking mechanism - t_router_id = payload['router'] + t_router_id = payload[constants.JT_ROUTER] b_pods, b_router_ids = zip(*db_api.get_bottom_mappings_by_top_id( ctx, t_router_id, constants.RT_ROUTER))