Asynchronous job management (part 1)

Implement asynchronous job management to ensure jobs can be
successfully completed even if they temporarily fail for some
reason. The detailed design can be found in section 9 of the
design document.

This patch focuses on defining the database schema and building a
lock mechanism to avoid running jobs of the same type at the
same time.

Enabling workers to rerun failed jobs and to purge old job records
will be covered in the following patches.

Change-Id: I87d0056a95eb7cb963e1c3599062a60299472298
zhiyuan_cai 2016-03-02 09:15:46 +08:00
parent b756e09bbd
commit 2bc2db5ae6
7 changed files with 300 additions and 4 deletions
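
Before the diffs, the locking scheme deserves a one-paragraph summary: the new
job table carries a unique constraint over (type, status, resource_id,
extra_id), and every Running record uses the fixed all-zero SP_EXTRA_ID, so
inserting the Running row doubles as acquiring a per-job mutex. A minimal
sketch of the idea, assuming a SQLAlchemy session and the Job model defined
below (an illustration, not the patch's own helper):

    from oslo_db import exception as db_exc
    from oslo_utils import uuidutils

    SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'

    def try_lock(session, Job, job_type, resource_id):
        # Only one row with status 'Running' and the fixed extra_id can
        # exist per (type, resource_id), so the losing worker gets a
        # DBDuplicateEntry instead of a second lock.
        try:
            with session.begin():
                session.add(Job(id=uuidutils.generate_uuid(),
                                type=job_type,
                                status='Running',
                                resource_id=resource_id,
                                extra_id=SP_EXTRA_ID))
            return True
        except db_exc.DBDuplicateEntry:
            return False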


@@ -58,3 +58,11 @@ ns_bridge_port_name = 'ns_bridge_port_%s_%s_%s'
MAX_INT = 0x7FFFFFFF
expire_time = datetime.datetime(2000, 1, 1)
# job status
JS_New = 'New'
JS_Running = 'Running'
JS_Success = 'Success'
JS_Fail = 'Fail'
SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
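
Worth noting: these four values describe separate rows, not one row's
lifecycle. A request is filed as a New row that is never updated, the lock row
is born Running, and finish_job later rewrites only the lock row to Success or
Fail. As a hedged illustration with invented values, one successful run of a
'router' job leaves behind:

    # records after one successful run (cf. the unit tests below)
    [{'type': 'router', 'status': JS_New, 'extra_id': '<random uuid>'},
     {'type': 'router', 'status': JS_Success, 'extra_id': '<random uuid>'}]
    # while the work was in flight, the second row read
    # {'status': JS_Running, 'extra_id': SP_EXTRA_ID}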


@@ -21,7 +21,9 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import uuidutils
from tricircle.common import constants
from tricircle.common.context import is_admin_context as _is_admin_context
from tricircle.common import exceptions
from tricircle.common.i18n import _
@@ -202,6 +204,72 @@ def get_pod_by_name(context, pod_name):
return None

def new_job(context, _type, resource_id):
with context.session.begin():
job_dict = {'id': uuidutils.generate_uuid(),
'type': _type,
'status': constants.JS_New,
'resource_id': resource_id,
'extra_id': uuidutils.generate_uuid()}
job = core.create_resource(context, models.Job, job_dict)
return job

def register_job(context, _type, resource_id):
try:
context.session.begin()
job_dict = {'id': uuidutils.generate_uuid(),
'type': _type,
'status': constants.JS_Running,
'resource_id': resource_id,
'extra_id': constants.SP_EXTRA_ID}
job = core.create_resource(context, models.Job, job_dict)
context.session.commit()
return job
except db_exc.DBDuplicateEntry:
context.session.rollback()
return None
except db_exc.DBDeadlock:
context.session.rollback()
return None
finally:
context.session.close()

def get_latest_timestamp(context, status, _type, resource_id):
jobs = core.query_resource(
context, models.Job,
[{'key': 'status', 'comparator': 'eq', 'value': status},
{'key': 'type', 'comparator': 'eq', 'value': _type},
{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}],
[('timestamp', False)])
if jobs:
return jobs[0]['timestamp']
else:
return None

def get_running_job(context, _type, resource_id):
jobs = core.query_resource(
context, models.Job,
[{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id},
{'key': 'status', 'comparator': 'eq', 'value': constants.JS_Running},
{'key': 'type', 'comparator': 'eq', 'value': _type}], [])
if jobs:
return jobs[0]
else:
return None

def finish_job(context, job_id, successful, timestamp):
status = constants.JS_Success if successful else constants.JS_Fail
with context.session.begin():
job_dict = {'status': status,
'timestamp': timestamp,
'extra_id': uuidutils.generate_uuid()}
core.update_resource(context, models.Job, job_id, job_dict)

_DEFAULT_QUOTA_NAME = 'default'
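
A hedged sketch of how these helpers chain together; do_work is a hypothetical
handler body, and the real driver is the _job_handle decorator in xmanager.py
further down. Note that finish_job writes a fresh random extra_id, which is
what lets any number of Success/Fail records coexist under the unique
constraint while the fixed SP_EXTRA_ID keeps Running records exclusive:

    new_job(ctx, 'router', router_id)              # file the request (New row)
    time_new = get_latest_timestamp(ctx, constants.JS_New, 'router', router_id)
    job = register_job(ctx, 'router', router_id)   # None: another worker won
    if job:
        try:
            do_work()                              # hypothetical handler body
            finish_job(ctx, job['id'], True, time_new)
        except Exception:
            finish_job(ctx, job['id'], False, time_new)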


@@ -217,10 +217,25 @@ def upgrade(migrate_engine):
mysql_engine='InnoDB',
mysql_charset='utf8')
job = sql.Table(
'job', meta,
sql.Column('id', sql.String(length=36), primary_key=True),
sql.Column('type', sql.String(length=36)),
sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP')),
sql.Column('status', sql.String(length=36)),
sql.Column('resource_id', sql.String(length=36)),
sql.Column('extra_id', sql.String(length=36)),
migrate.UniqueConstraint(
'type', 'status', 'resource_id', 'extra_id',
name='job0type0status0resource_id0extra_id'),
mysql_engine='InnoDB',
mysql_charset='utf8')
tables = [aggregates, aggregate_metadata, instance_types,
instance_type_projects, instance_type_extra_specs, key_pairs,
quotas, quota_classes, quota_usages, reservations,
volume_types,
volume_types, job,
quality_of_service_specs, cascaded_pods_resource_routing]
for table in tables:
table.create()


@@ -385,3 +385,23 @@ class ResourceRouting(core.ModelBase, core.DictBase, models.TimestampMixin):
project_id = sql.Column('project_id', sql.String(length=36))
resource_type = sql.Column('resource_type', sql.String(length=64),
nullable=False)

class Job(core.ModelBase, core.DictBase):
__tablename__ = 'job'
__table_args__ = (
schema.UniqueConstraint(
'type', 'status', 'resource_id', 'extra_id',
name='job0type0status0resource_id0extra_id'),
)
attributes = ['id', 'type', 'timestamp', 'status', 'resource_id',
'extra_id']
id = sql.Column('id', sql.String(length=36), primary_key=True)
type = sql.Column('type', sql.String(length=36))
timestamp = sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP'))
status = sql.Column('status', sql.String(length=36))
resource_id = sql.Column('resource_id', sql.String(length=36))
extra_id = sql.Column('extra_id', sql.String(length=36))


@@ -13,15 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from mock import patch
import unittest
from oslo_config import cfg
from oslo_utils import uuidutils
from tricircle.common import constants
from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.xjob import xmanager
from tricircle.xjob import xservice

BOTTOM1_NETWORK = []
@@ -32,7 +38,8 @@ BOTTOM1_PORT = []
BOTTOM2_PORT = []
BOTTOM1_ROUTER = []
BOTTOM2_ROUTER = []
RES_LIST = [BOTTOM1_SUBNET, BOTTOM2_SUBNET, BOTTOM1_PORT, BOTTOM2_PORT]
RES_LIST = [BOTTOM1_NETWORK, BOTTOM2_NETWORK, BOTTOM1_SUBNET, BOTTOM2_SUBNET,
BOTTOM1_PORT, BOTTOM2_PORT, BOTTOM1_ROUTER, BOTTOM2_ROUTER]
RES_MAP = {'pod_1': {'network': BOTTOM1_NETWORK,
'subnet': BOTTOM1_SUBNET,
'port': BOTTOM1_PORT,
@@ -93,6 +100,10 @@ class XManagerTest(unittest.TestCase):
core.ModelBase.metadata.create_all(core.get_engine())
# enforce foreign key constraint for sqlite
core.get_engine().execute('pragma foreign_keys=on')
for opt in xservice.common_opts:
if opt.name in ('worker_handle_timeout', 'job_run_expire',
'worker_sleep_time'):
cfg.CONF.register_opt(opt)
self.context = context.Context()
self.xmanager = FakeXManager()
@@ -160,7 +171,7 @@
'ip_address': '10.0.3.1'}]})
self.xmanager.configure_extra_routes(self.context,
{'router': top_router_id})
payload={'router': top_router_id})
calls = [mock.call(self.context, 'router_1_id',
{'router': {
'routes': [{'nexthop': '100.0.1.2',
@@ -172,3 +183,91 @@
{'nexthop': '100.0.1.1',
'destination': '10.0.3.0/24'}]}})]
mock_update.assert_has_calls(calls)

def test_job_handle(self):
@xmanager._job_handle('fake_resource')
def fake_handle(self, ctx, payload):
pass
fake_id = 'fake_id'
payload = {'fake_resource': fake_id}
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.Job, [], [])
expected_status = [constants.JS_New, constants.JS_Success]
job_status = [job['status'] for job in jobs]
self.assertItemsEqual(expected_status, job_status)
self.assertEqual(fake_id, jobs[0]['resource_id'])
self.assertEqual(fake_id, jobs[1]['resource_id'])
self.assertEqual('fake_resource', jobs[0]['type'])
self.assertEqual('fake_resource', jobs[1]['type'])

def test_job_handle_exception(self):
@xmanager._job_handle('fake_resource')
def fake_handle(self, ctx, payload):
raise Exception()
fake_id = 'fake_id'
payload = {'fake_resource': fake_id}
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.Job, [], [])
expected_status = [constants.JS_New, constants.JS_Fail]
job_status = [job['status'] for job in jobs]
self.assertItemsEqual(expected_status, job_status)
self.assertEqual(fake_id, jobs[0]['resource_id'])
self.assertEqual(fake_id, jobs[1]['resource_id'])
self.assertEqual('fake_resource', jobs[0]['type'])
self.assertEqual('fake_resource', jobs[1]['type'])

def test_job_run_expire(self):
@xmanager._job_handle('fake_resource')
def fake_handle(self, ctx, payload):
pass
fake_id = uuidutils.generate_uuid()
payload = {'fake_resource': fake_id}
expired_job = {
'id': uuidutils.generate_uuid(),
'type': 'fake_resource',
'timestamp': datetime.datetime.now() - datetime.timedelta(0, 120),
'status': constants.JS_Running,
'resource_id': fake_id,
'extra_id': constants.SP_EXTRA_ID
}
core.create_resource(self.context, models.Job, expired_job)
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.Job, [], [])
expected_status = ['New', 'Fail', 'Success']
job_status = [job['status'] for job in jobs]
self.assertItemsEqual(expected_status, job_status)
for i in xrange(3):
self.assertEqual(fake_id, jobs[i]['resource_id'])
self.assertEqual('fake_resource', jobs[i]['type'])

@patch.object(db_api, 'get_running_job')
@patch.object(db_api, 'register_job')
def test_worker_handle_timeout(self, mock_register, mock_get):
@xmanager._job_handle('fake_resource')
def fake_handle(self, ctx, payload):
pass
cfg.CONF.set_override('worker_handle_timeout', 1)
mock_register.return_value = None
mock_get.return_value = None
fake_id = uuidutils.generate_uuid()
payload = {'fake_resource': fake_id}
fake_handle(None, self.context, payload=payload)
# nothing to assert, what we test is that fake_handle can exit when
# timeout

def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
for res in RES_LIST:
del res[:]


@@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import eventlet
import netaddr
import six
from oslo_config import cfg
from oslo_log import log as logging
@@ -23,7 +26,9 @@ from oslo_service import periodic_task
from tricircle.common import client
from tricircle.common import constants
from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common.i18n import _LI
from tricircle.common.i18n import _LW
import tricircle.db.api as db_api
@@ -31,6 +36,78 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)

def _job_handle(job_type):
def handle_func(func):
@six.wraps(func)
def handle_args(*args, **kwargs):
ctx = args[1]
payload = kwargs['payload']
resource_id = payload[job_type]
db_api.new_job(ctx, job_type, resource_id)
start_time = datetime.datetime.now()
while True:
current_time = datetime.datetime.now()
delta = current_time - start_time
if delta.seconds >= CONF.worker_handle_timeout:
# quit when this handle is running for a long time
break
time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
job_type, resource_id)
time_success = db_api.get_latest_timestamp(
ctx, constants.JS_Success, job_type, resource_id)
if time_success and time_success >= time_new:
break
job = db_api.register_job(ctx, job_type, resource_id)
if not job:
# fail to obtain the lock, let other worker handle the job
running_job = db_api.get_running_job(ctx, job_type,
resource_id)
if not running_job:
# there are two reasons that running_job is None. one
# is that the running job has just been finished, the
# other is that all workers fail to register the job
# due to deadlock exception. so we sleep and try again
eventlet.sleep(CONF.worker_sleep_time)
continue
job_time = running_job['timestamp']
current_time = datetime.datetime.now()
delta = current_time - job_time
if delta.seconds > CONF.job_run_expire:
# previous running job expires, we set its status to
# fail and try again to obtain the lock
db_api.finish_job(ctx, running_job['id'], False,
time_new)
LOG.warning(_LW('Job %(job)s of type %(job_type)s for '
'resource %(resource)s expires, set '
'its state to Fail'),
{'job': running_job['id'],
'job_type': job_type,
'resource': resource_id})
eventlet.sleep(CONF.worker_sleep_time)
continue
else:
# previous running job is still valid, we just leave
# the job to the worker who holds the lock
break
# successfully obtain the lock, start to execute handler
try:
func(*args, **kwargs)
except Exception:
db_api.finish_job(ctx, job['id'], False, time_new)
LOG.error(_LE('Job %(job)s of type %(job_type)s for '
'resource %(resource)s fails'),
{'job': job['id'],
'job_type': job_type,
'resource': resource_id})
break
db_api.finish_job(ctx, job['id'], True, time_new)
eventlet.sleep(CONF.worker_sleep_time)
return handle_args
return handle_func

class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
super(PeriodicTasks, self).__init__(CONF)
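
The exit test in the loop above rests on timestamp ordering: work is
outstanding exactly while the latest New timestamp is more recent than the
latest Success timestamp. A small illustration with invented times:

    import datetime

    t0 = datetime.datetime(2016, 3, 1, 9, 0, 0)
    time_new = t0                                      # latest 'New' row
    time_success = t0 + datetime.timedelta(seconds=5)  # latest 'Success' row

    # the success postdates the request, so the loop breaks
    assert time_success >= time_new

    # a fresh request arrives; its timestamp now exceeds the last success,
    # so workers keep contending for the lock until the job runs again
    time_new = t0 + datetime.timedelta(seconds=30)
    assert not (time_success >= time_new)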
@@ -128,6 +205,7 @@ class XManager(PeriodicTasks):
return info_text
@_job_handle('router')
def configure_extra_routes(self, ctx, payload):
# TODO(zhiyuan) performance and reliability issue
# better have a job tracking mechanism


@@ -47,7 +47,15 @@ common_opts = [
cfg.StrOpt('host', default='tricircle.xhost',
help=_("The host name for RPC server")),
cfg.IntOpt('workers', default=1,
help=_("number of workers")),
help=_("Number of workers")),
cfg.IntOpt('worker_handle_timeout', default=1800,
           help=_("Timeout for one turn of worker processing, in"
                  " seconds")),
cfg.IntOpt('job_run_expire', default=60,
help=_("Running job is considered expires after this time, in"
" seconds")),
cfg.FloatOpt('worker_sleep_time', default=0.1,
help=_("Seconds a worker sleeps after one run in a loop"))
]
service_opts = [
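
For reference, the unit tests above shrink these knobs at runtime through
oslo.config overrides; a hedged sketch of the same calls (the options must
already be registered, as in the tests' setUp):

    from oslo_config import cfg

    cfg.CONF.set_override('worker_handle_timeout', 1)  # stop looping after 1s
    cfg.CONF.set_override('job_run_expire', 60)        # lock stale after 60s
    cfg.CONF.set_override('worker_sleep_time', 0.1)    # pause between turns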