Merge "NSXv: eliminate task use from edge creation"

This commit is contained in:
Jenkins 2016-08-02 12:40:23 +00:00 committed by Gerrit Code Review
commit 6ba97577a0
6 changed files with 94 additions and 280 deletions

View File

@ -16,14 +16,17 @@
import time
from neutron.plugins.common import constants as plugin_const
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from vmware_nsx._i18n import _LE, _LI, _LW
from vmware_nsx._i18n import _, _LE, _LI, _LW
from vmware_nsx.common import exceptions as nsxv_exc
from vmware_nsx.common import nsxv_constants
from vmware_nsx.common import utils
from vmware_nsx.db import nsxv_db
from vmware_nsx.plugins.nsx_v.vshield.common import constants
from vmware_nsx.plugins.nsx_v.vshield.common import exceptions
from vmware_nsx.plugins.nsx_v.vshield import edge_utils
@ -373,66 +376,6 @@ class EdgeApplianceDriver(object):
LOG.debug("Deletion complete vnic %(vnic_index)s: on edge %(edge_id)s",
{'vnic_index': index, 'edge_id': edge_id})
def _deploy_edge(self, task):
userdata = task.userdata
LOG.debug("NSXv: start deploying edge")
request = userdata['request']
try:
header = self.vcns.deploy_edge(request)[0]
objuri = header['location']
job_id = objuri[objuri.rfind("/") + 1:]
response = self.vcns.get_edge_id(job_id)[1]
edge_id = response['edgeId']
LOG.debug("VCNS: deploying edge %s", edge_id)
userdata['edge_id'] = edge_id
status = task_constants.TaskStatus.PENDING
except exceptions.VcnsApiException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("NSXv: deploy edge failed."))
return status
def _status_edge(self, task):
edge_id = task.userdata['edge_id']
try:
response = self.vcns.get_edge_deploy_status(edge_id)[1]
task.userdata['retries'] = 0
system_status = response.get('systemStatus', None)
if system_status is None:
status = task_constants.TaskStatus.PENDING
elif system_status == 'good':
status = task_constants.TaskStatus.COMPLETED
else:
status = task_constants.TaskStatus.ERROR
except exceptions.VcnsApiException as e:
LOG.exception(_LE("VCNS: Edge %s status query failed."), edge_id)
raise e
except Exception as e:
retries = task.userdata.get('retries', 0) + 1
if retries < 3:
task.userdata['retries'] = retries
LOG.exception(_LE("VCNS: Unable to retrieve edge %(edge_id)s "
"status. Retry %(retries)d."),
{'edge_id': edge_id,
'retries': retries})
status = task_constants.TaskStatus.PENDING
else:
LOG.exception(_LE("VCNS: Unable to retrieve edge %s status. "
"Abort."), edge_id)
status = task_constants.TaskStatus.ERROR
LOG.debug("VCNS: Edge %s status", edge_id)
return status
def _result_edge(self, task):
edge_id = task.userdata.get('edge_id')
if task.status != task_constants.TaskStatus.COMPLETED:
LOG.error(_LE("NSXv: Failed to deploy edge %(edge_id)s "
"status %(status)d"),
{'edge_id': edge_id,
'status': task.status})
else:
LOG.debug("NSXv: Edge %s is deployed", edge_id)
def _update_edge(self, task):
edge_id = task.userdata['edge_id']
LOG.debug("start update edge %s", edge_id)
@ -495,11 +438,11 @@ class EdgeApplianceDriver(object):
LOG.exception(_LE("VCNS: Failed to get edges:\n%s"), e.response)
raise e
def deploy_edge(self, resource_id, name, internal_network, jobdata=None,
dist=False, wait_for_exec=False, loadbalancer_enable=True,
appliance_size=nsxv_constants.LARGE, async=True,
def deploy_edge(self, context, router_id, name, internal_network,
dist=False, loadbalancer_enable=True,
appliance_size=nsxv_constants.LARGE,
availability_zone=None):
task_name = 'deploying-%s' % name
edge_name = name
edge = self._assemble_edge(
edge_name, datacenter_moid=self.datacenter_moid,
@ -538,50 +481,35 @@ class EdgeApplianceDriver(object):
if not dist and loadbalancer_enable:
self._enable_loadbalancer(edge)
if async:
userdata = {
'dist': dist,
'request': edge,
'router_name': name,
'jobdata': jobdata
}
task = tasks.Task(task_name, resource_id,
self._deploy_edge,
status_callback=self._status_edge,
result_callback=self._result_edge,
userdata=userdata)
task.add_executed_monitor(self.callbacks.edge_deploy_started)
task.add_result_monitor(self.callbacks.edge_deploy_result)
self.task_manager.add(task)
edge_id = None
try:
header = self.vcns.deploy_edge(edge)[0]
edge_id = header.get('location', '/').split('/')[-1]
if wait_for_exec:
# wait until the deploy task is executed so edge_id is
# available
task.wait(task_constants.TaskState.EXECUTED)
if edge_id:
nsxv_db.update_nsxv_router_binding(
context.session, router_id, edge_id=edge_id)
if not dist:
# Init Edge vnic binding
nsxv_db.init_edge_vnic_binding(
context.session, edge_id)
else:
if router_id:
nsxv_db.update_nsxv_router_binding(
context.session, router_id,
status=plugin_const.ERROR)
error = _('Failed to deploy edge')
raise nsxv_exc.NsxPluginException(err_msg=error)
return task
else:
edge_id = None
try:
header = self.vcns.deploy_edge(edge,
async=False)[0]
edge_id = header['location'].split('/')[-1]
LOG.debug("VCNS: deploying edge %s", edge_id)
self.callbacks.complete_edge_creation(
context, edge_id, name, router_id, dist, True)
self.callbacks.edge_deploy_started_sync(
jobdata['context'], edge_id, name,
jobdata['router_id'], dist)
self.callbacks.edge_deploy_result_sync(
jobdata['context'], edge_id, name, jobdata['router_id'],
dist, True)
except exceptions.VcnsApiException:
self.callbacks.edge_deploy_result_sync(
jobdata['context'], edge_id, name, jobdata['router_id'],
dist, False)
with excutils.save_and_reraise_exception():
LOG.exception(_LE("NSXv: deploy edge failed."))
except exceptions.VcnsApiException:
self.callbacks.complete_edge_creation(
context, edge_id, name, router_id, dist, False)
with excutils.save_and_reraise_exception():
LOG.exception(_LE("NSXv: deploy edge failed."))
return edge_id
def update_edge(self, router_id, edge_id, name, internal_network,
jobdata=None, dist=False, loadbalancer_enable=True,

View File

@ -48,7 +48,6 @@ from vmware_nsx.plugins.nsx_v.vshield.common import (
from vmware_nsx.plugins.nsx_v.vshield.common import exceptions as nsxapi_exc
from vmware_nsx.plugins.nsx_v.vshield.tasks import (
constants as task_const)
from vmware_nsx.plugins.nsx_v.vshield.tasks import tasks
from vmware_nsx.plugins.nsx_v.vshield import vcns
WORKER_POOL_SIZE = 8
@ -158,25 +157,15 @@ class EdgeManager(object):
def _deploy_edge(self, context, lrouter,
lswitch=None, appliance_size=nsxv_constants.COMPACT,
edge_type=nsxv_constants.SERVICE_EDGE, async=True,
edge_type=nsxv_constants.SERVICE_EDGE,
availability_zone=None):
"""Create an edge for logical router support."""
router_id = lrouter['id']
# deploy edge
jobdata = {
'router_id': router_id,
'lrouter': lrouter,
'lswitch': lswitch,
'context': context
}
task = self.nsxv_manager.deploy_edge(
lrouter['id'], lrouter['name'], internal_network=None,
jobdata=jobdata, wait_for_exec=True,
self.nsxv_manager.deploy_edge(context, lrouter['id'],
lrouter['name'], internal_network=None,
appliance_size=appliance_size,
dist=(edge_type == nsxv_constants.VDR_EDGE), async=async,
dist=(edge_type == nsxv_constants.VDR_EDGE),
availability_zone=availability_zone)
return task
def _deploy_backup_edges_on_db(self, context, num,
appliance_size=nsxv_constants.COMPACT,
@ -211,7 +200,7 @@ class EdgeManager(object):
'name': router_id}
pool.spawn_n(self._deploy_edge, context, fake_router,
appliance_size=appliance_size,
edge_type=edge_type, async=False,
edge_type=edge_type,
availability_zone=availability_zone)
def _delete_edge(self, context, router_binding):
@ -571,7 +560,7 @@ class EdgeManager(object):
availability_zone=availability_zone.name)
self._deploy_edge(context, lrouter,
appliance_size=appliance_size,
edge_type=edge_type, async=False,
edge_type=edge_type,
availability_zone=availability_zone)
return
@ -598,7 +587,7 @@ class EdgeManager(object):
availability_zone=availability_zone.name)
self._deploy_edge(context, lrouter,
appliance_size=appliance_size,
edge_type=edge_type, async=False,
edge_type=edge_type,
availability_zone=availability_zone)
else:
LOG.debug("Select edge: %(edge_id)s from pool for %(name)s",
@ -624,16 +613,10 @@ class EdgeManager(object):
jobdata = {
'context': context,
'router_id': lrouter['id']}
fake_userdata = {'jobdata': jobdata,
'router_name': lrouter['name'],
'edge_id': edge_id,
'dist': dist}
fake_task = tasks.Task(name='fake-deploy-edge-task',
resource_id='fake-resource_id',
execute_callback=None,
userdata=fake_userdata)
fake_task.status = task_const.TaskStatus.COMPLETED
self.nsxv_manager.callbacks.edge_deploy_result(fake_task)
self.nsxv_manager.callbacks.complete_edge_creation(
context, edge_id, lrouter['name'], lrouter['id'], dist,
True)
# change edge's name at backend
task = self.nsxv_manager.update_edge(
resource_id, available_router_binding['edge_id'],
@ -1779,21 +1762,9 @@ def create_lrouter(nsxv_manager, context, lrouter, lswitch=None, dist=False,
availability_zone=availability_zone.name)
# deploy edge
jobdata = {
'router_id': router_id,
'lrouter': lrouter,
'lswitch': lswitch,
'context': context
}
# deploy and wait until the deploy request has been requested
# so we will have edge_id ready. The wait here should be fine
# as we're not in a database transaction now
task = nsxv_manager.deploy_edge(
router_id, router_name, internal_network=None,
dist=dist, jobdata=jobdata, appliance_size=appliance_size,
availability_zone=availability_zone)
task.wait(task_const.TaskState.RESULT)
nsxv_manager.deploy_edge(
context, router_id, router_name, internal_network=None, dist=dist,
appliance_size=appliance_size, availability_zone=availability_zone)
def delete_lrouter(nsxv_manager, context, router_id, dist=False):
@ -2265,55 +2236,12 @@ class NsxVCallbacks(object):
def __init__(self, plugin):
self.plugin = plugin
def edge_deploy_started(self, task):
"""callback when deployment task started."""
jobdata = task.userdata['jobdata']
context = jobdata['context']
router_id = jobdata.get('router_id')
edge_id = task.userdata.get('edge_id')
name = task.userdata.get('router_name')
dist = task.userdata.get('dist')
self.edge_deploy_started_sync(context, edge_id, name, router_id, dist)
def edge_deploy_started_sync(self, context, edge_id, name, router_id,
dist):
if edge_id:
LOG.debug("Start deploying %(edge_id)s for router %(name)s",
{'edge_id': edge_id,
'name': name})
nsxv_db.update_nsxv_router_binding(
context.session, router_id, edge_id=edge_id)
if not dist:
# Init Edge vnic binding
nsxv_db.init_edge_vnic_binding(
context.session, edge_id)
else:
LOG.debug("Failed to deploy Edge")
if router_id:
nsxv_db.update_nsxv_router_binding(
context.session, router_id,
status=plugin_const.ERROR)
def edge_deploy_result(self, task):
"""callback when deployment task finished."""
jobdata = task.userdata['jobdata']
context = jobdata['context']
name = task.userdata.get('router_name')
dist = task.userdata.get('dist')
router_id = jobdata['router_id']
edge_id = task.userdata.get('edge_id')
self.edge_deploy_result_sync(
context, edge_id, name, router_id, dist,
task.status == task_const.TaskStatus.COMPLETED)
def edge_deploy_result_sync(self, context, edge_id, name, router_id, dist,
deploy_successful):
def complete_edge_creation(
self, context, edge_id, name, router_id, dist, deploy_successful):
router_db = None
if uuidutils.is_uuid_like(router_id):
try:
router_db = self.plugin._get_router(
context, router_id)
router_db = self.plugin._get_router(context, router_id)
except l3.RouterNotFound:
# Router might have been deleted before deploy finished
LOG.warning(_LW("Router %s not found"), name)

View File

@ -142,10 +142,8 @@ class Vcns(object):
@retry_upon_exception(exceptions.ResourceNotFound)
@retry_upon_exception(exceptions.RequestBad)
def deploy_edge(self, request, async=True):
def deploy_edge(self, request):
uri = URI_PREFIX
if async:
uri += "?async=true"
return self.do_request(HTTP_POST, uri, request, decode=False)
def update_edge(self, edge_id, request):

View File

@ -87,7 +87,7 @@ class FakeVcns(object):
response = {"edgeJob": []}
return (header, response)
def deploy_edge(self, request, async=True):
def deploy_edge(self, request):
if (self._unique_router_name and
not self._validate_edge_name(request['name'])):
header = {
@ -104,39 +104,20 @@ class FakeVcns(object):
}
return (header, jsonutils.dumps(response))
if async:
self._job_idx = self._job_idx + 1
job_id = "jobdata-%d" % self._job_idx
self._edge_idx = self._edge_idx + 1
edge_id = "edge-%d" % self._edge_idx
self._jobs[job_id] = edge_id
self._edges[edge_id] = {
'name': request['name'],
'request': request,
'nat_rules': None,
'nat_rule_id': 0,
'interface_index': 1
}
header = {
'status': 200,
'location': 'https://host/api/4.0/jobs/%s' % job_id
}
response = ''
else:
self._edge_idx = self._edge_idx + 1
edge_id = "edge-%d" % self._edge_idx
self._edges[edge_id] = {
'name': request['name'],
'request': request,
'nat_rules': None,
'nat_rule_id': 0,
'interface_index': 1
}
header = {
'status': 200,
'location': 'https://host/api/4.0/edges/%s' % edge_id
}
response = ''
self._edge_idx = self._edge_idx + 1
edge_id = "edge-%d" % self._edge_idx
self._edges[edge_id] = {
'name': request['name'],
'request': request,
'nat_rules': None,
'nat_rule_id': 0,
'interface_index': 1
}
header = {
'status': 200,
'location': 'https://host/api/4.0/edges/%s' % edge_id
}
response = ''
return (header, response)
def update_edge(self, edge_id, request):

View File

@ -203,13 +203,9 @@ class EdgeUtilsTestCase(EdgeUtilsTestCaseMixin):
edge_utils.create_lrouter(self.nsxv_manager, self.ctx, lrouter,
lswitch=None, dist=False,
availability_zone=self.az)
self.nsxv_manager.deploy_edge.assert_called_once_with(
self.nsxv_manager.deploy_edge.assert_called_once_with(self.ctx,
lrouter['id'], (lrouter['name'] + '-' + lrouter['id']),
internal_network=None, dist=False, availability_zone=self.az,
jobdata={'router_id': lrouter['id'],
'lrouter': lrouter,
'lswitch': None,
'context': self.ctx},
appliance_size=vcns_const.SERVICE_SIZE_MAPPING['router'])
def _test_update_intereface_primary_addr(self, old_ip, new_ip, isUplink):

View File

@ -16,13 +16,16 @@
from eventlet import greenthread
import mock
from neutron import context as neutron_context
from neutron.tests import base
from oslo_config import cfg
import six
from vmware_nsx.common import exceptions as nsxv_exc
from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az
from vmware_nsx.plugins.nsx_v.vshield.common import (
constants as vcns_const)
from vmware_nsx.plugins.nsx_v.vshield import edge_appliance_driver as e_drv
from vmware_nsx.plugins.nsx_v.vshield.tasks import (
constants as ts_const)
from vmware_nsx.plugins.nsx_v.vshield.tasks import tasks as ts
@ -317,6 +320,9 @@ class VcnsDriverTestCase(base.BaseTestCase):
def setUp(self):
super(VcnsDriverTestCase, self).setUp()
self.ctx = neutron_context.get_admin_context()
self.temp_e_drv_nsxv_db = e_drv.nsxv_db
e_drv.nsxv_db = mock.MagicMock()
self.config_parse(args=['--config-file', VCNS_CONFIG_FILE])
self.fc = fake_vcns.FakeVcns()
@ -333,34 +339,22 @@ class VcnsDriverTestCase(base.BaseTestCase):
self.result = None
def tearDown(self):
e_drv.nsxv_db = self.temp_e_drv_nsxv_db
self.vcns_driver.task_manager.stop()
# Task manager should not leave running threads around
# if _thread is None it means it was killed in stop()
self.assertIsNone(self.vcns_driver.task_manager._thread)
super(VcnsDriverTestCase, self).tearDown()
def complete_edge_creation(
self, context, edge_id, name, router_id, dist, deploy_successful):
pass
def _deploy_edge(self):
task = self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True,
self.edge_id = self.vcns_driver.deploy_edge(
self.ctx, 'router-id', 'myedge', 'internal-network',
availability_zone=self.az)
self.assertEqual(self.edge_id, 'edge-1')
task.wait(ts_const.TaskState.RESULT)
return task
def edge_deploy_started(self, task):
self.edge_id = task.userdata['edge_id']
def edge_deploy_started_sync(self, context, edge_id, name, router_id,
dist):
pass
def edge_deploy_result(self, task):
if task.status == ts_const.TaskStatus.COMPLETED:
task.userdata['jobdata']['edge_deploy_result'] = True
def edge_deploy_result_sync(self, context, edge_id, name, router_id,
dist, deploy_successful):
pass
def edge_delete_result(self, task):
if task.status == ts_const.TaskStatus.COMPLETED:
@ -378,35 +372,24 @@ class VcnsDriverTestCase(base.BaseTestCase):
if task.status == ts_const.TaskStatus.COMPLETED:
task.userdata['jobdata']['interface_update_result'] = True
def test_deploy_edge_with_async(self):
jobdata = {}
task = self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', jobdata=jobdata,
wait_for_exec=True, availability_zone=self.az)
self.assertEqual(self.edge_id, 'edge-1')
task.wait(ts_const.TaskState.RESULT)
self.assertEqual(task.status, ts_const.TaskStatus.COMPLETED)
self.assertTrue(jobdata.get('edge_deploy_result'))
def test_deploy_edge_with_sync(self):
jobdata = {"context": "fake_context",
"router_id": "fake_router_id"}
def test_deploy_edge_with(self):
self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', jobdata=jobdata,
wait_for_exec=True, async=False, availability_zone=self.az)
self.ctx, 'router-id', 'myedge', 'internal-network',
availability_zone=self.az)
status = self.vcns_driver.get_edge_status('edge-1')
self.assertEqual(status, vcns_const.RouterStatus.ROUTER_STATUS_ACTIVE)
def test_deploy_edge_fail(self):
task1 = self.vcns_driver.deploy_edge(
'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True,
self.vcns_driver.deploy_edge(
self.ctx, 'router-1', 'myedge', 'internal-network',
availability_zone=self.az)
task2 = self.vcns_driver.deploy_edge(
'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True,
# self.vcns_driver.deploy_edge(
# self.ctx, 'router-2', 'myedge', 'internal-network',
# availability_zone=self.az)
self.assertRaises(
nsxv_exc.NsxPluginException, self.vcns_driver.deploy_edge,
self.ctx, 'router-2', 'myedge', 'internal-network',
availability_zone=self.az)
task1.wait(ts_const.TaskState.RESULT)
task2.wait(ts_const.TaskState.RESULT)
self.assertEqual(task2.status, ts_const.TaskStatus.ERROR)
def test_get_edge_status(self):
self._deploy_edge()
@ -540,11 +523,11 @@ class VcnsDriverHATestCase(VcnsDriverTestCase):
self.vcns_driver.vcns.orig_deploy = self.vcns_driver.vcns.deploy_edge
self.vcns_driver.vcns.deploy_edge = self._fake_deploy_edge
def _fake_deploy_edge(self, request, async=True):
def _fake_deploy_edge(self, request):
# validate the appliance structure in the request,
# and return the regular (fake) response
found_app = request['appliances']['appliances']
self.assertEqual(len(found_app), 2)
self.assertEqual(found_app[0]['datastoreId'], self._data_store)
self.assertEqual(found_app[1]['datastoreId'], self._ha_data_store)
return self.vcns_driver.vcns.orig_deploy(request, async)
return self.vcns_driver.vcns.orig_deploy(request)