Merge "Edge pool concurrency"
This commit is contained in:
commit
511cd42c09
@ -218,6 +218,7 @@ class NsxVMetadataProxyHandler:
|
||||
'timed out')
|
||||
raise nsxv_exc.NsxPluginException(err_msg=error)
|
||||
|
||||
rtr_id = None
|
||||
try:
|
||||
router_data = {
|
||||
'router': {
|
||||
|
@ -437,7 +437,7 @@ class EdgeApplianceDriver(object):
|
||||
|
||||
def deploy_edge(self, resource_id, name, internal_network, jobdata=None,
|
||||
dist=False, wait_for_exec=False, loadbalancer_enable=True,
|
||||
appliance_size=nsxv_constants.LARGE):
|
||||
appliance_size=nsxv_constants.LARGE, async=True):
|
||||
task_name = 'deploying-%s' % name
|
||||
edge_name = name
|
||||
edge = self._assemble_edge(
|
||||
@ -468,26 +468,51 @@ class EdgeApplianceDriver(object):
|
||||
edge['vnics']['vnics'].append(vnic_inside)
|
||||
if not dist and loadbalancer_enable:
|
||||
self._enable_loadbalancer(edge)
|
||||
userdata = {
|
||||
'dist': dist,
|
||||
'request': edge,
|
||||
'router_name': name,
|
||||
'jobdata': jobdata
|
||||
}
|
||||
task = tasks.Task(task_name, resource_id,
|
||||
self._deploy_edge,
|
||||
status_callback=self._status_edge,
|
||||
result_callback=self._result_edge,
|
||||
userdata=userdata)
|
||||
task.add_executed_monitor(self.callbacks.edge_deploy_started)
|
||||
task.add_result_monitor(self.callbacks.edge_deploy_result)
|
||||
self.task_manager.add(task)
|
||||
|
||||
if wait_for_exec:
|
||||
# waitl until the deploy task is executed so edge_id is available
|
||||
task.wait(task_constants.TaskState.EXECUTED)
|
||||
if async:
|
||||
userdata = {
|
||||
'dist': dist,
|
||||
'request': edge,
|
||||
'router_name': name,
|
||||
'jobdata': jobdata
|
||||
}
|
||||
task = tasks.Task(task_name, resource_id,
|
||||
self._deploy_edge,
|
||||
status_callback=self._status_edge,
|
||||
result_callback=self._result_edge,
|
||||
userdata=userdata)
|
||||
task.add_executed_monitor(self.callbacks.edge_deploy_started)
|
||||
task.add_result_monitor(self.callbacks.edge_deploy_result)
|
||||
self.task_manager.add(task)
|
||||
|
||||
return task
|
||||
if wait_for_exec:
|
||||
# wait until the deploy task is executed so edge_id is
|
||||
# available
|
||||
task.wait(task_constants.TaskState.EXECUTED)
|
||||
|
||||
return task
|
||||
else:
|
||||
edge_id = None
|
||||
try:
|
||||
header = self.vcns.deploy_edge(edge,
|
||||
async=False)[0]
|
||||
edge_id = header['location'].split('/')[-1]
|
||||
LOG.debug(_("VCNS: deploying edge %s"), edge_id)
|
||||
|
||||
self.callbacks.edge_deploy_started_sync(
|
||||
jobdata['context'], edge_id, name,
|
||||
jobdata['router_id'], dist)
|
||||
|
||||
self.callbacks.edge_deploy_result_sync(
|
||||
jobdata['context'], edge_id, name, jobdata['router_id'],
|
||||
dist, True)
|
||||
|
||||
except exceptions.VcnsApiException:
|
||||
self.callbacks.edge_deploy_result_sync(
|
||||
jobdata['context'], edge_id, name, jobdata['router_id'],
|
||||
dist, False)
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_("NSXv: deploy edge failed."))
|
||||
|
||||
def update_edge(self, router_id, edge_id, name, internal_network,
|
||||
jobdata=None, dist=False, loadbalancer_enable=True,
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
@ -37,6 +38,7 @@ from vmware_nsx.neutron.plugins.vmware.vshield.tasks import (
|
||||
from vmware_nsx.neutron.plugins.vmware.vshield.tasks import tasks
|
||||
|
||||
|
||||
WORKER_POOL_SIZE = 8
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
@ -100,11 +102,12 @@ class EdgeManager(object):
|
||||
self.edge_pool_dicts = parse_backup_edge_pool_opt()
|
||||
self.nsxv_plugin = nsxv_manager.callbacks.plugin
|
||||
self.plugin = plugin
|
||||
self.worker_pool = eventlet.GreenPool(WORKER_POOL_SIZE)
|
||||
self._check_backup_edge_pools()
|
||||
|
||||
def _deploy_edge(self, context, lrouter,
|
||||
lswitch=None, appliance_size=nsxv_constants.LARGE,
|
||||
edge_type=nsxv_constants.SERVICE_EDGE):
|
||||
edge_type=nsxv_constants.SERVICE_EDGE, async=True):
|
||||
"""Create an edge for logical router support."""
|
||||
router_id = lrouter['id']
|
||||
# deploy edge
|
||||
@ -119,7 +122,7 @@ class EdgeManager(object):
|
||||
lrouter['id'], lrouter['name'], internal_network=None,
|
||||
jobdata=jobdata, wait_for_exec=True,
|
||||
appliance_size=appliance_size,
|
||||
dist=(edge_type == nsxv_constants.VDR_EDGE))
|
||||
dist=(edge_type == nsxv_constants.VDR_EDGE), async=async)
|
||||
return task
|
||||
|
||||
def _deploy_backup_edges_on_db(self, context, num,
|
||||
@ -128,6 +131,7 @@ class EdgeManager(object):
|
||||
router_ids = [(vcns_const.BACKUP_ROUTER_PREFIX +
|
||||
_uuid())[:vcns_const.EDGE_NAME_LEN]
|
||||
for i in xrange(num)]
|
||||
|
||||
for router_id in router_ids:
|
||||
nsxv_db.add_nsxv_router_binding(
|
||||
context.session, router_id, None, None,
|
||||
@ -138,14 +142,19 @@ class EdgeManager(object):
|
||||
def _deploy_backup_edges_at_backend(self, context, router_ids,
|
||||
appliance_size=nsxv_constants.LARGE,
|
||||
edge_type=nsxv_constants.SERVICE_EDGE):
|
||||
"""Asynchronously deploy edges to populate edge pool."""
|
||||
|
||||
eventlet.spawn_n(self._pool_creator, context, router_ids,
|
||||
appliance_size, edge_type)
|
||||
|
||||
def _pool_creator(self, context, router_ids, appliance_size, edge_type):
|
||||
pool = self.worker_pool
|
||||
for router_id in router_ids:
|
||||
fake_router = {
|
||||
'id': router_id,
|
||||
'name': router_id}
|
||||
self._deploy_edge(context, fake_router,
|
||||
appliance_size=appliance_size,
|
||||
edge_type=edge_type)
|
||||
pool.spawn_n(self._deploy_edge, context, fake_router,
|
||||
appliance_size=appliance_size,
|
||||
edge_type=edge_type, async=False)
|
||||
|
||||
def _delete_edge(self, context, router_binding):
|
||||
if router_binding['status'] == plugin_const.ERROR:
|
||||
@ -1435,6 +1444,10 @@ class NsxVCallbacks(object):
|
||||
edge_id = task.userdata.get('edge_id')
|
||||
name = task.userdata.get('router_name')
|
||||
dist = task.userdata.get('dist')
|
||||
self.edge_deploy_started_sync(context, edge_id, name, router_id, dist)
|
||||
|
||||
def edge_deploy_started_sync(self, context, edge_id, name, router_id,
|
||||
dist):
|
||||
if edge_id:
|
||||
LOG.debug("Start deploying %(edge_id)s for router %(name)s",
|
||||
{'edge_id': edge_id,
|
||||
@ -1459,6 +1472,14 @@ class NsxVCallbacks(object):
|
||||
name = task.userdata.get('router_name')
|
||||
dist = task.userdata.get('dist')
|
||||
router_id = jobdata['router_id']
|
||||
edge_id = task.userdata.get('edge_id')
|
||||
|
||||
self.edge_deploy_result_sync(
|
||||
context, edge_id, name, router_id, dist,
|
||||
task.status == task_const.TaskStatus.COMPLETED)
|
||||
|
||||
def edge_deploy_result_sync(self, context, edge_id, name, router_id, dist,
|
||||
deploy_successful):
|
||||
router_db = None
|
||||
if uuidutils.is_uuid_like(router_id):
|
||||
try:
|
||||
@ -1468,9 +1489,9 @@ class NsxVCallbacks(object):
|
||||
# Router might have been deleted before deploy finished
|
||||
LOG.warning(_LW("Router %s not found"), name)
|
||||
|
||||
if task.status == task_const.TaskStatus.COMPLETED:
|
||||
if deploy_successful:
|
||||
LOG.debug("Successfully deployed %(edge_id)s for router %(name)s",
|
||||
{'edge_id': task.userdata['edge_id'],
|
||||
{'edge_id': edge_id,
|
||||
'name': name})
|
||||
if (router_db and
|
||||
router_db['status'] == plugin_const.PENDING_CREATE):
|
||||
@ -1485,9 +1506,9 @@ class NsxVCallbacks(object):
|
||||
nsxv_db.update_nsxv_router_binding(
|
||||
context.session, router_id,
|
||||
status=plugin_const.ERROR)
|
||||
if not dist and task.userdata.get('edge_id'):
|
||||
if not dist and edge_id:
|
||||
nsxv_db.clean_edge_vnic_binding(
|
||||
context.session, task.userdata['edge_id'])
|
||||
context.session, edge_id)
|
||||
|
||||
def edge_update_result(self, task):
|
||||
LOG.debug("edge_update_result %d", task.status)
|
||||
|
@ -117,8 +117,10 @@ class Vcns(object):
|
||||
uri = URI_PREFIX + "?lockUpdatesOnEdge=true"
|
||||
return self.do_request(HTTP_POST, uri, decode=False)
|
||||
|
||||
def deploy_edge(self, request):
|
||||
uri = URI_PREFIX + "?async=true"
|
||||
def deploy_edge(self, request, async=True):
|
||||
uri = URI_PREFIX
|
||||
if async:
|
||||
uri += "?async=true"
|
||||
return self.do_request(HTTP_POST, uri, request, decode=False)
|
||||
|
||||
def update_edge(self, edge_id, request):
|
||||
|
Loading…
x
Reference in New Issue
Block a user