VCNS driver implementation

Implement API/driver interface for configuring vShield Edge Appliance.

Currently implemented functions:
    - Deploy an Edge
    - Destroy an Edge
    - Configuring interfaces
    - Configuring SNAT/DNAT rules
    - Configuring default gateway and static routes
    - Query Edge status
    - Task-based asynchronous model
    - Allow old routes/nat config to be skipped if new updates are coming

Implements: blueprint vcns-driver
Change-Id: I881bde907f4c90de4c919d008b76b8c2a2d0e1fd
This commit is contained in:
Kaiwei Fan 2013-08-20 14:28:58 -07:00
parent b60ff6b2a5
commit 90db4802d7
19 changed files with 2357 additions and 3 deletions

View File

@ -134,3 +134,34 @@
# that using the minimum chunk size will cause the interval between two
# requests to be less than min_sync_req_delay
# min_chunk_size = 500
[vcns]
# URL for VCNS manager
# manager_uri = https://management_ip
# User name for VCNS manager
# user = admin
# Password for VCNS manager
# password = default
# (Optional) Datacenter ID for Edge deployment
# datacenter_moid =
# (Optional) Deployment Container ID for NSX Edge deployment
# If not specified, either a default global container will be used, or
# the resource pool and datastore specified below will be used
# deployment_container_id =
# (Optional) Resource pool ID for NSX Edge deployment
# resource_pool_id =
# (Optional) Datastore ID for NSX Edge deployment
# datastore_id =
# (Required) UUID of logic switch for physical network connectivity
# external_network =
# (Optional) Asynchronous task status check interval
# default is 2000 (millisecond)
# task_status_check_interval = 2000

View File

@ -120,12 +120,43 @@ cluster_opts = [
"network connection")),
]
DEFAULT_STATUS_CHECK_INTERVAL = 2000
vcns_opts = [
cfg.StrOpt('user',
default='admin',
help=_('User name for vsm')),
cfg.StrOpt('password',
default='default',
secret=True,
help=_('Password for vsm')),
cfg.StrOpt('manager_uri',
help=_('uri for vsm')),
cfg.StrOpt('datacenter_moid',
help=_('Optional parameter identifying the ID of datacenter '
'to deploy NSX Edges')),
cfg.StrOpt('deployment_container_id',
help=_('Optional parameter identifying the ID of datastore to '
'deploy NSX Edges')),
cfg.StrOpt('resource_pool_id',
help=_('Optional parameter identifying the ID of resource to '
'deploy NSX Edges')),
cfg.StrOpt('datastore_id',
help=_('Optional parameter identifying the ID of datastore to '
'deploy NSX Edges')),
cfg.StrOpt('external_network',
help=_('Network ID for physical network connectivity')),
cfg.IntOpt('task_status_check_interval',
default=DEFAULT_STATUS_CHECK_INTERVAL,
help=_("Task status check interval"))
]
# Register the configuration options
cfg.CONF.register_opts(connection_opts)
cfg.CONF.register_opts(cluster_opts)
cfg.CONF.register_opts(nvp_opts, "NVP")
cfg.CONF.register_opts(sync_opts, "NVP_SYNC")
cfg.CONF.register_opts(vcns_opts, group="vcns")
# NOTE(armando-migliaccio): keep the following code until we support
# NVP configuration files in older format (Grizzly or older).
# ### BEGIN

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,84 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
import base64
import eventlet
from neutron.openstack.common import jsonutils
from neutron.plugins.nicira.vshield.common import exceptions
httplib2 = eventlet.import_patched('httplib2')
def xmldumps(obj):
config = ""
if isinstance(obj, dict):
for key, value in obj.iteritems():
cfg = "<%s>%s</%s>" % (key, xmldumps(value), key)
config += cfg
elif isinstance(obj, list):
for value in obj:
config += xmldumps(value)
else:
config = obj
return config
class VcnsApiHelper(object):
errors = {
303: exceptions.ResourceRedirect,
400: exceptions.RequestBad,
403: exceptions.Forbidden,
404: exceptions.ResourceNotFound,
415: exceptions.MediaTypeUnsupport,
503: exceptions.ServiceUnavailable
}
def __init__(self, address, user, password, format='json'):
self.authToken = base64.encodestring("%s:%s" % (user, password))
self.user = user
self.passwd = password
self.address = address
self.format = format
if format == 'json':
self.encode = jsonutils.dumps
else:
self.encode = xmldumps
def request(self, method, uri, params=None):
uri = self.address + uri
http = httplib2.Http()
http.disable_ssl_certificate_validation = True
headers = {
'Content-Type': 'application/' + self.format,
'Accept': 'application/' + 'json',
'Authorization': 'Basic ' + self.authToken
}
body = self.encode(params) if params else None
header, response = http.request(uri, method,
body=body, headers=headers)
status = int(header['status'])
if 200 <= status < 300:
return header, response
if status in self.errors:
cls = self.errors[status]
else:
cls = exceptions.VcnsApiException
raise cls(uri=uri, status=status, header=header, response=response)

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,45 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
EDGE_ID = 'edge_id'
ROUTER_ID = 'router_id'
# Interface
EXTERNAL_VNIC_INDEX = 0
INTERNAL_VNIC_INDEX = 1
EXTERNAL_VNIC_NAME = "external"
INTERNAL_VNIC_NAME = "internal"
INTEGRATION_LR_IPADDRESS = "169.254.2.1/28"
INTEGRATION_EDGE_IPADDRESS = "169.254.2.3"
INTEGRATION_SUBNET_NETMASK = "255.255.255.240"
# SNAT rule location
PREPEND = 0
APPEND = -1
# error code
VCNS_ERROR_CODE_EDGE_NOT_RUNNING = 10013
# router status by number
class RouterStatus(object):
ROUTER_STATUS_ACTIVE = 0
ROUTER_STATUS_DOWN = 1
ROUTER_STATUS_PENDING_CREATE = 2
ROUTER_STATUS_PENDING_DELETE = 3
ROUTER_STATUS_ERROR = 4

View File

@ -0,0 +1,64 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
from neutron.common import exceptions
class VcnsException(exceptions.NeutronException):
pass
class VcnsGeneralException(VcnsException):
def __init__(self, message):
self.message = message
super(VcnsGeneralException, self).__init__()
class VcnsApiException(VcnsException):
message = _("An unknown exception %(status)s occurred: %(response)s.")
def __init__(self, **kwargs):
super(VcnsApiException, self).__init__(**kwargs)
self.status = kwargs.get('status')
self.header = kwargs.get('header')
self.response = kwargs.get('response')
class ResourceRedirect(VcnsApiException):
message = _("Resource %(uri)s has been redirected")
class RequestBad(VcnsApiException):
message = _("Request %(uri)s is Bad, response %(response)s")
class Forbidden(VcnsApiException):
message = _("Forbidden: %(uri)s")
class ResourceNotFound(VcnsApiException):
message = _("Resource %(uri)s not found")
class MediaTypeUnsupport(VcnsApiException):
message = _("Media Type %(uri)s is not supported")
class ServiceUnavailable(VcnsApiException):
message = _("Service on available: %(uri)s")

View File

@ -0,0 +1,631 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kaiwei Fan, VMware, Inc.
# @author: Bo Link, VMware, Inc.
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.nicira.vshield.common import (
constants as vcns_const)
from neutron.plugins.nicira.vshield.common.constants import RouterStatus
from neutron.plugins.nicira.vshield.common import exceptions
from neutron.plugins.nicira.vshield.tasks.constants import TaskState
from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
from neutron.plugins.nicira.vshield.tasks import tasks
LOG = logging.getLogger(__name__)
class EdgeApplianceDriver(object):
def __init__(self):
# store the last task per edge that has the latest config
self.updated_task = {
'nat': {},
'route': {},
}
def _assemble_edge(self, name, appliance_size="compact",
deployment_container_id=None, datacenter_moid=None,
enable_aesni=True, hypervisor_assist=False,
enable_fips=False, remote_access=False):
edge = {
'name': name,
'fqdn': name,
'hypervisorAssist': hypervisor_assist,
'type': 'gatewayServices',
'enableAesni': enable_aesni,
'enableFips': enable_fips,
'cliSettings': {
'remoteAccess': remote_access
},
'appliances': {
'applianceSize': appliance_size
},
'vnics': {
'vnics': []
}
}
if deployment_container_id:
edge['appliances']['deploymentContainerId'] = (
deployment_container_id)
if datacenter_moid:
edge['datacenterMoid'] = datacenter_moid,
return edge
def _assemble_edge_appliance(self, resource_pool_id, datastore_id):
appliance = {}
if resource_pool_id:
appliance['resourcePoolId'] = resource_pool_id
if datastore_id:
appliance['datastoreId'] = datastore_id
return appliance
def _assemble_edge_vnic(self, name, index, portgroup_id,
primary_address=None, subnet_mask=None,
secondary=None,
type="internal",
enable_proxy_arp=False,
enable_send_redirects=True,
is_connected=True,
mtu=1500):
vnic = {
'index': index,
'name': name,
'type': type,
'portgroupId': portgroup_id,
'mtu': mtu,
'enableProxyArp': enable_proxy_arp,
'enableSendRedirects': enable_send_redirects,
'isConnected': is_connected
}
if primary_address and subnet_mask:
address_group = {
'primaryAddress': primary_address,
'subnetMask': subnet_mask
}
if secondary:
address_group['secondaryAddresses'] = {
'ipAddress': secondary
}
vnic['addressGroups'] = {
'addressGroups': [address_group]
}
return vnic
def _edge_status_to_level(self, status):
if status == 'GREEN':
status_level = RouterStatus.ROUTER_STATUS_ACTIVE
elif status in ('GREY', 'YELLOW'):
status_level = RouterStatus.ROUTER_STATUS_DOWN
else:
status_level = RouterStatus.ROUTER_STATUS_ERROR
return status_level
def get_edge_status(self, edge_id):
try:
response = self.vcns.get_edge_status(edge_id)[1]
status_level = self._edge_status_to_level(
response['edgeStatus'])
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to get edge status:\n%s"),
e.response)
status_level = RouterStatus.ROUTER_STATUS_ERROR
try:
desc = jsonutils.loads(e.response)
if desc.get('errorCode') == (
vcns_const.VCNS_ERROR_CODE_EDGE_NOT_RUNNING):
status_level = RouterStatus.ROUTER_STATUS_DOWN
except ValueError:
LOG.exception(e.response)
return status_level
def get_edges_statuses(self):
edges_status_level = {}
edges = self._get_edges()
for edge in edges['edgePage'].get('data', []):
edge_id = edge['id']
status = edge['edgeStatus']
edges_status_level[edge_id] = self._edge_status_to_level(status)
return edges_status_level
def _update_interface(self, task):
edge_id = task.userdata['edge_id']
config = task.userdata['config']
LOG.debug(_("VCNS: start updating vnic %s"), config)
try:
self.vcns.update_interface(edge_id, config)
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to update vnic %(config)s:\n"
"%(response)s"), {
'config': config,
'response': e.response})
raise e
except Exception as e:
LOG.exception(_("VCNS: Failed to update vnic %d"),
config['index'])
raise e
return TaskStatus.COMPLETED
def update_interface(self, router_id, edge_id, index, network,
address=None, netmask=None, secondary=None,
jobdata=None):
LOG.debug(_("VCNS: update vnic %(index)d: %(addr)s %(netmask)s"), {
'index': index, 'addr': address, 'netmask': netmask})
if index == vcns_const.EXTERNAL_VNIC_INDEX:
name = vcns_const.EXTERNAL_VNIC_NAME
intf_type = 'uplink'
elif index == vcns_const.INTERNAL_VNIC_INDEX:
name = vcns_const.INTERNAL_VNIC_NAME
intf_type = 'internal'
else:
msg = _("Vnic %d currently not supported") % index
raise exceptions.VcnsGeneralException(msg)
config = self._assemble_edge_vnic(
name, index, network, address, netmask, secondary, type=intf_type)
userdata = {
'edge_id': edge_id,
'config': config,
'jobdata': jobdata
}
task_name = "update-interface-%s-%d" % (edge_id, index)
task = tasks.Task(task_name, router_id,
self._update_interface, userdata=userdata)
task.add_result_monitor(self.callbacks.interface_update_result)
self.task_manager.add(task)
return task
def _deploy_edge(self, task):
userdata = task.userdata
name = userdata['router_name']
LOG.debug(_("VCNS: start deploying edge %s"), name)
request = userdata['request']
try:
header = self.vcns.deploy_edge(request)[0]
objuri = header['location']
job_id = objuri[objuri.rfind("/") + 1:]
response = self.vcns.get_edge_id(job_id)[1]
edge_id = response['edgeId']
LOG.debug(_("VCNS: deploying edge %s"), edge_id)
userdata['edge_id'] = edge_id
status = TaskStatus.PENDING
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: deploy edge failed for router %s."),
name)
raise e
return status
def _status_edge(self, task):
edge_id = task.userdata['edge_id']
try:
response = self.vcns.get_edge_deploy_status(edge_id)[1]
task.userdata['retries'] = 0
system_status = response.get('systemStatus', None)
if system_status is None:
status = TaskStatus.PENDING
elif system_status == 'good':
status = TaskStatus.COMPLETED
else:
status = TaskStatus.ERROR
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Edge %s status query failed."), edge_id)
raise e
except Exception as e:
retries = task.userdata.get('retries', 0) + 1
if retries < 3:
task.userdata['retries'] = retries
msg = _("VCNS: Unable to retrieve edge %(edge_id)s status. "
"Retry %(retries)d.") % {
'edge_id': edge_id,
'retries': retries}
LOG.exception(msg)
status = TaskStatus.PENDING
else:
msg = _("VCNS: Unable to retrieve edge %s status. "
"Abort.") % edge_id
LOG.exception(msg)
status = TaskStatus.ERROR
LOG.debug(_("VCNS: Edge %s status"), edge_id)
return status
def _result_edge(self, task):
router_name = task.userdata['router_name']
edge_id = task.userdata.get('edge_id')
if task.status != TaskStatus.COMPLETED:
LOG.error(_("VCNS: Failed to deploy edge %(edge_id)s "
"for %(name)s, status %(status)d"), {
'edge_id': edge_id,
'name': router_name,
'status': task.status
})
else:
LOG.debug(_("VCNS: Edge %(edge_id)s deployed for "
"router %(name)s"), {
'edge_id': edge_id, 'name': router_name
})
def _delete_edge(self, task):
edge_id = task.userdata['edge_id']
LOG.debug(_("VCNS: start destroying edge %s"), edge_id)
status = TaskStatus.COMPLETED
if edge_id:
try:
self.vcns.delete_edge(edge_id)
except exceptions.ResourceNotFound:
pass
except exceptions.VcnsApiException as e:
msg = _("VCNS: Failed to delete %{edge_id)s:\n"
"%(response)s") % {
'edge_id': edge_id, 'response': e.response}
LOG.exception(msg)
status = TaskStatus.ERROR
except Exception:
LOG.exception(_("VCNS: Failed to delete %s"), edge_id)
status = TaskStatus.ERROR
return status
def _get_edges(self):
try:
return self.vcns.get_edges()[1]
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to get edges:\n%s"), e.response)
raise e
def deploy_edge(self, router_id, name, internal_network, jobdata=None,
wait_for_exec=False):
task_name = 'deploying-%s' % name
edge_name = name
edge = self._assemble_edge(
edge_name, datacenter_moid=self.datacenter_moid,
deployment_container_id=self.deployment_container_id,
appliance_size='large', remote_access=True)
appliance = self._assemble_edge_appliance(self.resource_pool_id,
self.datastore_id)
if appliance:
edge['appliances']['appliances'] = [appliance]
vnic_external = self._assemble_edge_vnic(
vcns_const.EXTERNAL_VNIC_NAME, vcns_const.EXTERNAL_VNIC_INDEX,
self.external_network, type="uplink")
edge['vnics']['vnics'].append(vnic_external)
vnic_inside = self._assemble_edge_vnic(
vcns_const.INTERNAL_VNIC_NAME, vcns_const.INTERNAL_VNIC_INDEX,
internal_network,
vcns_const.INTEGRATION_EDGE_IPADDRESS,
vcns_const.INTEGRATION_SUBNET_NETMASK,
type="internal")
edge['vnics']['vnics'].append(vnic_inside)
userdata = {
'request': edge,
'router_name': name,
'jobdata': jobdata
}
task = tasks.Task(task_name, router_id,
self._deploy_edge,
status_callback=self._status_edge,
result_callback=self._result_edge,
userdata=userdata)
task.add_executed_monitor(self.callbacks.edge_deploy_started)
task.add_result_monitor(self.callbacks.edge_deploy_result)
self.task_manager.add(task)
if wait_for_exec:
# waitl until the deploy task is executed so edge_id is available
task.wait(TaskState.EXECUTED)
return task
def delete_edge(self, router_id, edge_id, jobdata=None):
task_name = 'delete-%s' % edge_id
userdata = {
'router_id': router_id,
'edge_id': edge_id,
'jobdata': jobdata
}
task = tasks.Task(task_name, router_id, self._delete_edge,
userdata=userdata)
task.add_result_monitor(self.callbacks.edge_delete_result)
self.task_manager.add(task)
return task
def _assemble_nat_rule(self, action, original_address,
translated_address,
vnic_index=vcns_const.EXTERNAL_VNIC_INDEX,
enabled=True):
nat_rule = {}
nat_rule['action'] = action
nat_rule['vnic'] = vnic_index
nat_rule['originalAddress'] = original_address
nat_rule['translatedAddress'] = translated_address
nat_rule['enabled'] = enabled
return nat_rule
def get_nat_config(self, edge_id):
try:
return self.vcns.get_nat_config(edge_id)[1]
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to get nat config:\n%s"),
e.response)
raise e
def _create_nat_rule(self, task):
# TODO(fank): use POST for optimization
# return rule_id for future reference
rule = task.userdata['rule']
LOG.debug(_("VCNS: start creating nat rules: %s"), rule)
edge_id = task.userdata['edge_id']
nat = self.get_nat_config(edge_id)
location = task.userdata['location']
del nat['version']
if location is None or location == vcns_const.APPEND:
nat['rules']['natRulesDtos'].append(rule)
else:
nat['rules']['natRulesDtos'].insert(location, rule)
try:
self.vcns.update_nat_config(edge_id, nat)
status = TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
e.response)
status = TaskStatus.ERROR
return status
def create_snat_rule(self, router_id, edge_id, src, translated,
jobdata=None, location=None):
LOG.debug(_("VCNS: create snat rule %(src)s/%(translated)s"), {
'src': src, 'translated': translated})
snat_rule = self._assemble_nat_rule("snat", src, translated)
userdata = {
'router_id': router_id,
'edge_id': edge_id,
'rule': snat_rule,
'location': location,
'jobdata': jobdata
}
task_name = "create-snat-%s-%s-%s" % (edge_id, src, translated)
task = tasks.Task(task_name, router_id, self._create_nat_rule,
userdata=userdata)
task.add_result_monitor(self.callbacks.snat_create_result)
self.task_manager.add(task)
return task
def _delete_nat_rule(self, task):
# TODO(fank): pass in rule_id for optimization
# handle routes update for optimization
edge_id = task.userdata['edge_id']
address = task.userdata['address']
addrtype = task.userdata['addrtype']
LOG.debug(_("VCNS: start deleting %(type)s rules: %(addr)s"), {
'type': addrtype, 'addr': address})
nat = self.get_nat_config(edge_id)
del nat['version']
status = TaskStatus.COMPLETED
for nat_rule in nat['rules']['natRulesDtos']:
if nat_rule[addrtype] == address:
rule_id = nat_rule['ruleId']
try:
self.vcns.delete_nat_rule(edge_id, rule_id)
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to delete snat rule:\n"
"%s"), e.response)
status = TaskStatus.ERROR
return status
def delete_snat_rule(self, router_id, edge_id, src, jobdata=None):
LOG.debug(_("VCNS: delete snat rule %s"), src)
userdata = {
'edge_id': edge_id,
'address': src,
'addrtype': 'originalAddress',
'jobdata': jobdata
}
task_name = "delete-snat-%s-%s" % (edge_id, src)
task = tasks.Task(task_name, router_id, self._delete_nat_rule,
userdata=userdata)
task.add_result_monitor(self.callbacks.snat_delete_result)
self.task_manager.add(task)
return task
def create_dnat_rule(self, router_id, edge_id, dst, translated,
jobdata=None, location=None):
# TODO(fank): use POST for optimization
# return rule_id for future reference
LOG.debug(_("VCNS: create dnat rule %(dst)s/%(translated)s"), {
'dst': dst, 'translated': translated})
dnat_rule = self._assemble_nat_rule(
"dnat", dst, translated)
userdata = {
'router_id': router_id,
'edge_id': edge_id,
'rule': dnat_rule,
'location': location,
'jobdata': jobdata
}
task_name = "create-dnat-%s-%s-%s" % (edge_id, dst, translated)
task = tasks.Task(task_name, router_id, self._create_nat_rule,
userdata=userdata)
task.add_result_monitor(self.callbacks.dnat_create_result)
self.task_manager.add(task)
return task
def delete_dnat_rule(self, router_id, edge_id, translated,
jobdata=None):
# TODO(fank): pass in rule_id for optimization
LOG.debug(_("VCNS: delete dnat rule %s"), translated)
userdata = {
'edge_id': edge_id,
'address': translated,
'addrtype': 'translatedAddress',
'jobdata': jobdata
}
task_name = "delete-dnat-%s-%s" % (edge_id, translated)
task = tasks.Task(task_name, router_id, self._delete_nat_rule,
userdata=userdata)
task.add_result_monitor(self.callbacks.dnat_delete_result)
self.task_manager.add(task)
return task
def _update_nat_rule(self, task):
# TODO(fank): use POST for optimization
# return rule_id for future reference
edge_id = task.userdata['edge_id']
if task != self.updated_task['nat'][edge_id]:
# this task does not have the latest config, abort now
# for speedup
return TaskStatus.ABORT
rules = task.userdata['rules']
LOG.debug(_("VCNS: start updating nat rules: %s"), rules)
nat = {
'rules': {
'natRulesDtos': rules
}
}
try:
self.vcns.update_nat_config(edge_id, nat)
status = TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
e.response)
status = TaskStatus.ERROR
return status
def update_nat_rules(self, router_id, edge_id, snats, dnats,
jobdata=None):
LOG.debug(_("VCNS: update nat rule\n"
"SNAT:%(snat)s\n"
"DNAT:%(dnat)s\n"), {
'snat': snats, 'dnat': dnats})
nat_rules = []
for dnat in dnats:
nat_rules.append(self._assemble_nat_rule(
'dnat', dnat['dst'], dnat['translated']))
nat_rules.append(self._assemble_nat_rule(
'snat', dnat['translated'], dnat['dst']))
for snat in snats:
nat_rules.append(self._assemble_nat_rule(
'snat', snat['src'], snat['translated']))
userdata = {
'edge_id': edge_id,
'rules': nat_rules,
'jobdata': jobdata,
}
task_name = "update-nat-%s" % edge_id
task = tasks.Task(task_name, router_id, self._update_nat_rule,
userdata=userdata)
task.add_result_monitor(self.callbacks.nat_update_result)
self.updated_task['nat'][edge_id] = task
self.task_manager.add(task)
return task
def _update_routes(self, task):
edge_id = task.userdata['edge_id']
if (task != self.updated_task['route'][edge_id] and
task.userdata.get('skippable', True)):
# this task does not have the latest config, abort now
# for speedup
return TaskStatus.ABORT
gateway = task.userdata['gateway']
routes = task.userdata['routes']
LOG.debug(_("VCNS: start updating routes for %s"), edge_id)
static_routes = []
for route in routes:
static_routes.append({
"route": {
"description": "",
"vnic": vcns_const.INTERNAL_VNIC_INDEX,
"network": route['cidr'],
"nextHop": route['nexthop']
}
})
request = {
"staticRouting": {
"staticRoutes": static_routes,
}
}
if gateway:
request["staticRouting"]["defaultRoute"] = {
"description": "default-gateway",
"gatewayAddress": gateway,
"vnic": vcns_const.EXTERNAL_VNIC_INDEX
}
try:
self.vcns.update_routes(edge_id, request)
status = TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to update routes:\n%s"),
e.response)
status = TaskStatus.ERROR
return status
def update_routes(self, router_id, edge_id, gateway, routes,
skippable=True, jobdata=None):
if gateway:
gateway = gateway.split('/')[0]
userdata = {
'edge_id': edge_id,
'gateway': gateway,
'routes': routes,
'skippable': skippable,
'jobdata': jobdata
}
task_name = "update-routes-%s" % (edge_id)
task = tasks.Task(task_name, router_id, self._update_routes,
userdata=userdata)
task.add_result_monitor(self.callbacks.routes_update_result)
self.updated_task['route'][edge_id] = task
self.task_manager.add(task)
return task
def create_lswitch(self, name, tz_config):
lsconfig = {
'display_name': name,
"tags": [],
"type": "LogicalSwitchConfig",
"_schema": "/ws.v1/schema/LogicalSwitchConfig",
"port_isolation_enabled": False,
"replication_mode": "service",
"transport_zones": tz_config
}
response = self.vcns.create_lswitch(lsconfig)[1]
return response
def delete_lswitch(self, lswitch_id):
self.vcns.delete_lswitch(lswitch_id)

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,46 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskStatus(object):
"""Task running status.
This is used by execution/status callback function to notify the
task manager what's the status of current task, and also used for
indication the final task execution result.
"""
PENDING = 1
COMPLETED = 2
ERROR = 3
ABORT = 4
class TaskState(object):
"""Current state of a task.
This is to keep track of the current state of a task.
NONE: the task is still in the queue
START: the task is pull out from the queue and is about to be executed
EXECUTED: the task has been executed
STATUS: we're running periodic status check for this task
RESULT: the task has finished and result is ready
"""
NONE = -1
START = 0
EXECUTED = 1
STATUS = 2
RESULT = 3

View File

@ -0,0 +1,385 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import uuid
from eventlet import event
from eventlet import greenthread
from eventlet.support import greenlets as greenlet
from neutron.common import exceptions
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.plugins.nicira.vshield.tasks.constants import TaskState
from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
DEFAULT_INTERVAL = 1000
LOG = logging.getLogger(__name__)
def nop(task):
return TaskStatus.COMPLETED
class TaskException(exceptions.NeutronException):
def __init__(self, message=None, **kwargs):
if message is not None:
self.message = message
super(TaskException, self).__init__(**kwargs)
class InvalidState(TaskException):
message = _("Invalid state %(state)d")
class TaskStateSkipped(TaskException):
message = _("State %(state)d skipped. Current state %(current)d")
class Task():
def __init__(self, name, resource_id, execute_callback,
status_callback=nop, result_callback=nop, userdata=None):
self.name = name
self.resource_id = resource_id
self._execute_callback = execute_callback
self._status_callback = status_callback
self._result_callback = result_callback
self.userdata = userdata
self.id = None
self.status = None
self._monitors = {
TaskState.START: [],
TaskState.EXECUTED: [],
TaskState.RESULT: []
}
self._states = [None, None, None, None]
self._state = TaskState.NONE
def _add_monitor(self, action, func):
self._monitors[action].append(func)
return self
def _move_state(self, state):
self._state = state
if self._states[state] is not None:
e = self._states[state]
self._states[state] = None
e.send()
for s in range(state):
if self._states[s] is not None:
e = self._states[s]
self._states[s] = None
e.send_exception(
TaskStateSkipped(state=s, current=self._state))
def _invoke_monitor(self, state):
for func in self._monitors[state]:
try:
func(self)
except Exception:
msg = _("Task %(task)s encountered exception in %(func)s "
"at state %(state)s") % {
'task': str(self),
'func': str(func),
'state': state}
LOG.exception(msg)
self._move_state(state)
return self
def _start(self):
return self._invoke_monitor(TaskState.START)
def _executed(self):
return self._invoke_monitor(TaskState.EXECUTED)
def _update_status(self, status):
if self.status == status:
return self
self.status = status
def _finished(self):
return self._invoke_monitor(TaskState.RESULT)
def add_start_monitor(self, func):
return self._add_monitor(TaskState.START, func)
def add_executed_monitor(self, func):
return self._add_monitor(TaskState.EXECUTED, func)
def add_result_monitor(self, func):
return self._add_monitor(TaskState.RESULT, func)
def wait(self, state):
if (state < TaskState.START or
state > TaskState.RESULT or
state == TaskState.STATUS):
raise InvalidState(state=state)
if state <= self._state:
# we already passed this current state, so no wait
return
e = event.Event()
self._states[state] = e
e.wait()
def __repr__(self):
return "Task-%s-%s-%s" % (
self.name, self.resource_id, self.id)
class TaskManager():
_instance = None
_default_interval = DEFAULT_INTERVAL
def __init__(self, interval=None):
self._interval = interval or TaskManager._default_interval
# A queue to pass tasks from other threads
self._tasks_queue = collections.deque()
# A dict to store resource -> resource's tasks
self._tasks = {}
# New request event
self._req = event.Event()
# TaskHandler stopped event
self._stopped = event.Event()
# Periodic function trigger
self._monitor = None
self._monitor_busy = False
self._monitor_stop = None
# Thread handling the task request
self._thread = None
def _execute(self, task):
"""Execute task."""
msg = _("Start task %s") % str(task)
LOG.debug(msg)
task._start()
try:
status = task._execute_callback(task)
except Exception:
msg = _("Task %(task)s encountered exception in %(cb)s") % {
'task': str(task),
'cb': str(task._execute_callback)}
LOG.exception(msg)
status = TaskStatus.ERROR
LOG.debug(_("Task %(task)s return %(status)s"), {
'task': str(task),
'status': status})
task._update_status(status)
task._executed()
return status
def _result(self, task):
"""Notify task execution result."""
try:
task._result_callback(task)
except Exception:
msg = _("Task %(task)s encountered exception in %(cb)s") % {
'task': str(task),
'cb': str(task._result_callback)}
LOG.exception(msg)
LOG.debug(_("Task %(task)s return %(status)s") % {
'task': str(task),
'status': task.status})
task._finished()
def _check_pending_tasks(self):
"""Check all pending tasks status."""
for resource_id in self._tasks.keys():
if self._monitor_stop:
# looping call is asked to stop, return now
return
tasks = self._tasks[resource_id]
# only the first task is executed and pending
task = tasks[0]
try:
status = task._status_callback(task)
except Exception:
msg = _("Task %(task)s encountered exception in %(cb)s") % {
'task': str(task),
'cb': str(task._status_callback)}
LOG.exception(msg)
status = TaskStatus.ERROR
task._update_status(status)
if status != TaskStatus.PENDING:
self._dequeue(task, True)
def _enqueue(self, task):
if task.resource_id in self._tasks:
# append to existing resource queue for ordered processing
self._tasks[task.resource_id].append(task)
else:
# put the task to a new resource queue
tasks = collections.deque()
tasks.append(task)
self._tasks[task.resource_id] = tasks
def _dequeue(self, task, run_next):
self._result(task)
tasks = self._tasks[task.resource_id]
tasks.remove(task)
if not tasks:
# no more tasks for this resource
del self._tasks[task.resource_id]
return
if run_next:
# process next task for this resource
while tasks:
task = tasks[0]
status = self._execute(task)
if status == TaskStatus.PENDING:
break
self._dequeue(task, False)
def _abort(self):
"""Abort all tasks."""
for resource_id in self._tasks.keys():
tasks = list(self._tasks[resource_id])
for task in tasks:
task._update_status(TaskStatus.ABORT)
self._dequeue(task, False)
def _get_task(self):
"""Get task request."""
while True:
for t in self._tasks_queue:
return self._tasks_queue.popleft()
self._req.wait()
self._req.reset()
def run(self):
while True:
try:
# get a task from queue, or timeout for periodic status check
task = self._get_task()
if task.resource_id in self._tasks:
# this resource already has some tasks under processing,
# append the task to same queue for ordered processing
self._enqueue(task)
continue
status = self._execute(task)
if status != TaskStatus.PENDING:
self._result(task)
continue
self._enqueue(task)
except greenlet.GreenletExit:
break
except Exception:
LOG.exception(_("TaskManager terminated"))
break
self._monitor.stop()
if self._monitor_busy:
self._monitor_stop = event.Event()
self._monitor_stop.wait()
self._monitor_stop = None
self._abort()
self._stopped.send()
def add(self, task):
task.id = uuid.uuid1()
self._tasks_queue.append(task)
if not self._req.ready():
self._req.send()
return task.id
def stop(self):
if not self._thread:
return
self._thread.kill()
self._stopped.wait()
self._thread = None
def has_pending_task(self):
if self._tasks_queue:
return True
if self._tasks:
return True
return False
def show_pending_tasks(self):
for task in self._tasks_queue:
print str(task)
for resource, tasks in self._tasks.iteritems():
for task in tasks:
print str(task)
def count(self):
count = 0
for resource_id, tasks in self._tasks.iteritems():
count += len(tasks)
return count
def start(self, interval=None):
def _inner():
self.run()
def _loopingcall_callback():
try:
self._monitor_busy = True
self._check_pending_tasks()
self._monitor_busy = False
if self._monitor_stop:
self._monitor_stop.send()
except Exception:
LOG.exception(_("Exception in _check_pending_tasks"))
if self._thread:
return self
if interval is None or interval == 0:
interval = self._interval
self._thread = greenthread.spawn(_inner)
self._monitor = loopingcall.FixedIntervalLoopingCall(
_loopingcall_callback)
self._monitor.start(interval / 1000.0,
interval / 1000.0)
return self
@classmethod
def set_default_interval(cls, interval):
cls._default_interval = interval

View File

@ -0,0 +1,111 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.nicira.vshield.common import VcnsApiClient
LOG = logging.getLogger(__name__)
HTTP_GET = "GET"
HTTP_POST = "POST"
HTTP_DELETE = "DELETE"
HTTP_PUT = "PUT"
URI_PREFIX = "/api/4.0/edges"
class Vcns(object):
def __init__(self, address, user, password):
self.address = address
self.user = user
self.password = password
self.jsonapi_client = VcnsApiClient.VcnsApiHelper(address, user,
password, 'json')
# TODO(fank): remove this after json syntax is fixed on VSM
self.xmlapi_client = VcnsApiClient.VcnsApiHelper(address, user,
password, 'xml')
def do_request(self, method, uri, params=None, format='json', **kwargs):
LOG.debug(_("VcnsApiHelper('%(method)s', '%(uri)s', '%(body)s')"), {
'method': method,
'uri': uri,
'body': jsonutils.dumps(params)})
if format == 'json':
header, content = self.jsonapi_client.request(method, uri, params)
else:
header, content = self.xmlapi_client.request(method, uri, params)
LOG.debug(_("Header: '%s'"), header)
LOG.debug(_("Content: '%s'"), content)
if content == '':
return header, {}
if kwargs.get('decode', True):
content = jsonutils.loads(content)
return header, content
def deploy_edge(self, request):
uri = URI_PREFIX + "?async=true"
return self.do_request(HTTP_POST, uri, request, decode=False)
def get_edge_id(self, job_id):
uri = URI_PREFIX + "/jobs/%s" % job_id
return self.do_request(HTTP_GET, uri, decode=True)
def get_edge_deploy_status(self, edge_id):
uri = URI_PREFIX + "/%s/status?getlatest=false" % edge_id
return self.do_request(HTTP_GET, uri, decode="True")
def delete_edge(self, edge_id):
uri = "%s/%s" % (URI_PREFIX, edge_id)
return self.do_request(HTTP_DELETE, uri)
def update_interface(self, edge_id, vnic):
uri = "%s/%s/vnics/%d" % (URI_PREFIX, edge_id, vnic['index'])
return self.do_request(HTTP_PUT, uri, vnic, decode=True)
def get_nat_config(self, edge_id):
uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
return self.do_request(HTTP_GET, uri, decode=True)
def update_nat_config(self, edge_id, nat):
uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
return self.do_request(HTTP_PUT, uri, nat, decode=True)
def delete_nat_rule(self, edge_id, rule_id):
uri = "%s/%s/nat/config/rules/%s" % (URI_PREFIX, edge_id, rule_id)
return self.do_request(HTTP_DELETE, uri, decode=True)
def get_edge_status(self, edge_id):
uri = "%s/%s/status?getlatest=false" % (URI_PREFIX, edge_id)
return self.do_request(HTTP_GET, uri, decode=True)
def get_edges(self):
uri = URI_PREFIX
return self.do_request(HTTP_GET, uri, decode=True)
def update_routes(self, edge_id, routes):
uri = "%s/%s/routing/config/static" % (URI_PREFIX, edge_id)
return self.do_request(HTTP_PUT, uri, routes, format='xml')
def create_lswitch(self, lsconfig):
uri = "/api/ws.v1/lswitch"
return self.do_request(HTTP_POST, uri, lsconfig, decode=True)
def delete_lswitch(self, lswitch_id):
uri = "/api/ws.v1/lswitch/%s" % lswitch_id
return self.do_request(HTTP_DELETE, uri)

View File

@ -0,0 +1,44 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
from oslo.config import cfg
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.vshield import edge_appliance_driver
from neutron.plugins.nicira.vshield.tasks import tasks
from neutron.plugins.nicira.vshield import vcns
class VcnsDriver(edge_appliance_driver.EdgeApplianceDriver):
def __init__(self, callbacks):
super(VcnsDriver, self).__init__()
self.callbacks = callbacks
self.vcns_uri = cfg.CONF.vcns.manager_uri
self.vcns_user = cfg.CONF.vcns.user
self.vcns_passwd = cfg.CONF.vcns.password
self.datacenter_moid = cfg.CONF.vcns.datacenter_moid
self.deployment_container_id = cfg.CONF.vcns.deployment_container_id
self.resource_pool_id = cfg.CONF.vcns.resource_pool_id
self.datastore_id = cfg.CONF.vcns.datastore_id
self.external_network = cfg.CONF.vcns.external_network
interval = cfg.CONF.vcns.task_status_check_interval
self.task_manager = tasks.TaskManager(interval)
self.task_manager.start()
self.vcns = vcns.Vcns(self.vcns_uri, self.vcns_user, self.vcns_passwd)

View File

@ -18,19 +18,22 @@
import os
import neutron.plugins.nicira.api_client.client_eventlet as client
from neutron.plugins.nicira import extensions
import neutron.plugins.nicira.NeutronPlugin as plugin
import neutron.plugins.nicira.NvpApiClient as nvpapi
from neutron.plugins.nicira.vshield import vcns
nvp_plugin = plugin.NvpPluginV2
api_helper = nvpapi.NVPApiHelper
nvp_client = client.NvpApiClientEventlet
vcns_class = vcns.Vcns
STUBS_PATH = os.path.join(os.path.dirname(__file__), 'etc')
NVPEXT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)),
"../../plugins/nicira/extensions")
NVPEXT_PATH = os.path.dirname(extensions.__file__)
NVPAPI_NAME = '%s.%s' % (api_helper.__module__, api_helper.__name__)
PLUGIN_NAME = '%s.%s' % (nvp_plugin.__module__, nvp_plugin.__name__)
CLIENT_NAME = '%s.%s' % (nvp_client.__module__, nvp_client.__name__)
VCNS_NAME = '%s.%s' % (vcns_class.__module__, vcns_class.__name__)
def get_fake_conf(filename):

View File

@ -0,0 +1,9 @@
[vcns]
manager_uri = https://fake-host
user = fake-user
passwordd = fake-password
datacenter_moid = fake-moid
resource_pool_id = fake-resgroup
datastore_id = fake-datastore
external_network = fake-ext-net
task_status_check_interval = 100

View File

@ -0,0 +1,541 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import greenthread
import mock
from neutron.common import config as n_config
from neutron.plugins.nicira.vshield.common import (
constants as vcns_const)
from neutron.plugins.nicira.vshield.common.constants import RouterStatus
from neutron.plugins.nicira.vshield.tasks.constants import TaskState
from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
from neutron.plugins.nicira.vshield.tasks import tasks as ts
from neutron.plugins.nicira.vshield import vcns_driver
from neutron.tests import base
from neutron.tests.unit.nicira import get_fake_conf
from neutron.tests.unit.nicira import VCNS_NAME
from neutron.tests.unit.nicira.vshield import fake_vcns
VCNS_CONFIG_FILE = get_fake_conf("vcns.ini.test")
ts.TaskManager.set_default_interval(100)
class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
def setUp(self):
super(VcnsDriverTaskManagerTestCase, self).setUp()
self.manager = ts.TaskManager()
self.manager.start(100)
def tearDown(self):
self.manager.stop()
super(VcnsDriverTaskManagerTestCase, self).tearDown()
def _test_task_manager_task_process_state(self, sync_exec=False):
def _task_failed(task, reason):
task.userdata['result'] = False
task.userdata['error'] = reason
def _check_state(task, exp_state):
if not task.userdata.get('result', True):
return False
state = task.userdata['state']
if state != exp_state:
msg = "state %d expect %d" % (
state, exp_state)
_task_failed(task, msg)
return False
task.userdata['state'] = state + 1
return True
def _exec(task):
if not _check_state(task, 1):
return TaskStatus.ERROR
if task.userdata['sync_exec']:
return TaskStatus.COMPLETED
else:
return TaskStatus.PENDING
def _status(task):
if task.userdata['sync_exec']:
_task_failed(task, "_status callback triggered")
state = task.userdata['state']
if state == 3:
_check_state(task, 3)
return TaskStatus.PENDING
else:
_check_state(task, 4)
return TaskStatus.COMPLETED
def _result(task):
if task.userdata['sync_exec']:
exp_state = 3
else:
exp_state = 5
_check_state(task, exp_state)
def _start_monitor(task):
_check_state(task, 0)
def _executed_monitor(task):
_check_state(task, 2)
def _result_monitor(task):
if task.userdata['sync_exec']:
exp_state = 4
else:
exp_state = 6
if _check_state(task, exp_state):
task.userdata['result'] = True
else:
task.userdata['result'] = False
userdata = {
'state': 0,
'sync_exec': sync_exec
}
task = ts.Task('name', 'res', _exec, _status, _result, userdata)
task.add_start_monitor(_start_monitor)
task.add_executed_monitor(_executed_monitor)
task.add_result_monitor(_result_monitor)
self.manager.add(task)
task.wait(TaskState.RESULT)
if 'error' in userdata:
print userdata['error']
self.assertTrue(userdata['result'])
def test_task_manager_task_sync_exec_process_state(self):
self._test_task_manager_task_process_state(sync_exec=True)
def test_task_manager_task_async_exec_process_state(self):
self._test_task_manager_task_process_state(sync_exec=False)
def test_task_manager_task_ordered_process(self):
def _task_failed(task, reason):
task.userdata['result'] = False
task.userdata['error'] = reason
def _exec(task):
task.userdata['executed'] = True
return TaskStatus.PENDING
def _status(task):
return TaskStatus.COMPLETED
def _result(task):
next_task = task.userdata.get('next')
if next_task:
if next_task.userdata.get('executed'):
_task_failed(next_task, "executed premature")
if task.userdata.get('result', True):
task.userdata['result'] = True
tasks = []
prev = None
last_task = None
for i in range(5):
name = "name-%d" % i
task = ts.Task(name, 'res', _exec, _status, _result, {})
tasks.append(task)
if prev:
prev.userdata['next'] = task
prev = task
last_task = task
for task in tasks:
self.manager.add(task)
last_task.wait(TaskState.RESULT)
for task in tasks:
if 'error' in task.userdata:
print "Task %s failed: " % (
tasks.name, tasks.userdata['error'])
for task in tasks:
self.assertTrue(task.userdata['result'])
def test_task_manager_task_parallel_process(self):
tasks = []
def _exec(task):
task.userdata['executed'] = True
return TaskStatus.PENDING
def _status(task):
for t in tasks:
if not t.userdata.get('executed'):
t.userdata['resut'] = False
return TaskStatus.COMPLETED
def _result(task):
if (task.userdata.get('result') is None and
task.status == TaskStatus.COMPLETED):
task.userdata['result'] = True
else:
task.userdata['result'] = False
for i in range(5):
name = "name-%d" % i
res = 'resource-%d' % i
task = ts.Task(name, res, _exec, _status, _result, {})
tasks.append(task)
self.manager.add(task)
for task in tasks:
task.wait(TaskState.RESULT)
self.assertTrue(task.userdata['result'])
def test_task_manager_stop(self):
def _exec(task):
return TaskStatus.PENDING
def _status(task):
greenthread.sleep(0.1)
return TaskStatus.PENDING
def _result(task):
pass
manager = ts.TaskManager().start(100)
alltasks = {}
for i in range(100):
res = 'res-%d' % i
tasks = []
for i in range(100):
task = ts.Task('name', res, _exec, _status, _result)
manager.add(task)
tasks.append(task)
alltasks[res] = tasks
greenthread.sleep(2)
manager.stop()
for res, tasks in alltasks.iteritems():
for task in tasks:
self.assertEqual(task.status, TaskStatus.ABORT)
class VcnsDriverTestCase(base.BaseTestCase):
def vcns_patch(self):
instance = self.mock_vcns.start()
instance.return_value.deploy_edge.side_effect = self.fc.deploy_edge
instance.return_value.get_edge_id.side_effect = self.fc.get_edge_id
instance.return_value.get_edge_deploy_status.side_effect = (
self.fc.get_edge_deploy_status)
instance.return_value.delete_edge.side_effect = self.fc.delete_edge
instance.return_value.update_interface.side_effect = (
self.fc.update_interface)
instance.return_value.get_nat_config.side_effect = (
self.fc.get_nat_config)
instance.return_value.update_nat_config.side_effect = (
self.fc.update_nat_config)
instance.return_value.delete_nat_rule.side_effect = (
self.fc.delete_nat_rule)
instance.return_value.get_edge_status.side_effect = (
self.fc.get_edge_status)
instance.return_value.get_edges.side_effect = self.fc.get_edges
instance.return_value.update_routes.side_effect = (
self.fc.update_routes)
instance.return_value.create_lswitch.side_effect = (
self.fc.create_lswitch)
instance.return_value.delete_lswitch.side_effect = (
self.fc.delete_lswitch)
def setUp(self):
super(VcnsDriverTestCase, self).setUp()
n_config.parse(['--config-file', VCNS_CONFIG_FILE])
self.fc = fake_vcns.FakeVcns()
self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
self.vcns_patch()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_vcns.stop)
self.vcns_driver = vcns_driver.VcnsDriver(self)
self.edge_id = None
self.result = None
def _deploy_edge(self):
task = self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True)
self.assertEqual(self.edge_id, 'edge-1')
task.wait(TaskState.RESULT)
return task
def edge_deploy_started(self, task):
self.edge_id = task.userdata['edge_id']
def edge_deploy_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['edge_deploy_result'] = True
def edge_delete_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['edge_delete_result'] = True
def snat_create_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['snat_create_result'] = True
def snat_delete_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['snat_delete_result'] = True
def dnat_create_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['dnat_create_result'] = True
def dnat_delete_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['dnat_delete_result'] = True
def nat_update_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['nat_update_result'] = True
def routes_update_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['routes_update_result'] = True
def interface_update_result(self, task):
if task.status == TaskStatus.COMPLETED:
task.userdata['jobdata']['interface_update_result'] = True
def test_deploy_edge(self):
jobdata = {}
task = self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', jobdata=jobdata,
wait_for_exec=True)
self.assertEqual(self.edge_id, 'edge-1')
task.wait(TaskState.RESULT)
self.assertEqual(task.status, TaskStatus.COMPLETED)
self.assertTrue(jobdata.get('edge_deploy_result'))
def test_deploy_edge_fail(self):
self.vcns_driver.deploy_edge(
'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True)
task = self.vcns_driver.deploy_edge(
'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True)
task.wait(TaskState.RESULT)
self.assertEqual(task.status, TaskStatus.ERROR)
def test_get_edge_status(self):
self._deploy_edge()
status = self.vcns_driver.get_edge_status(self.edge_id)
self.assertEqual(status, RouterStatus.ROUTER_STATUS_ACTIVE)
def test_get_edges(self):
self._deploy_edge()
edges = self.vcns_driver.get_edges_statuses()
found = False
for edge_id, status in edges.iteritems():
if edge_id == self.edge_id:
found = True
break
self.assertTrue(found)
def _create_nat_rule(self, edge_id, action, org, translated):
jobdata = {}
if action == 'snat':
task = self.vcns_driver.create_snat_rule(
'router-id', edge_id, org, translated, jobdata=jobdata)
key = 'snat_create_result'
else:
task = self.vcns_driver.create_dnat_rule(
'router-id', edge_id, org, translated, jobdata=jobdata)
key = 'dnat_create_result'
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get(key))
def _delete_nat_rule(self, edge_id, action, addr):
jobdata = {}
if action == 'snat':
task = self.vcns_driver.delete_snat_rule(
'router-id', edge_id, addr, jobdata=jobdata)
key = 'snat_delete_result'
else:
task = self.vcns_driver.delete_dnat_rule(
'router-id', edge_id, addr, jobdata=jobdata)
key = 'dnat_delete_result'
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get(key))
def _test_create_nat_rule(self, action):
self._deploy_edge()
addr = '192.168.1.1'
translated = '10.0.0.1'
self._create_nat_rule(self.edge_id, action, addr, translated)
natcfg = self.vcns_driver.get_nat_config(self.edge_id)
for rule in natcfg['rules']['natRulesDtos']:
if (rule['originalAddress'] == addr and
rule['translatedAddress'] == translated and
rule['action'] == action):
break
else:
self.assertTrue(False)
def _test_delete_nat_rule(self, action):
self._deploy_edge()
addr = '192.168.1.1'
translated = '10.0.0.1'
self._create_nat_rule(self.edge_id, action, addr, translated)
if action == 'snat':
self._delete_nat_rule(self.edge_id, action, addr)
else:
self._delete_nat_rule(self.edge_id, action, translated)
natcfg = self.vcns_driver.get_nat_config(self.edge_id)
for rule in natcfg['rules']['natRulesDtos']:
if (rule['originalAddress'] == addr and
rule['translatedAddress'] == translated and
rule['action'] == action):
self.assertTrue(False)
break
def test_create_snat_rule(self):
self._test_create_nat_rule('snat')
def test_delete_snat_rule(self):
self._test_delete_nat_rule('snat')
def test_create_dnat_rule(self):
self._test_create_nat_rule('dnat')
def test_delete_dnat_rule(self):
self._test_delete_nat_rule('dnat')
def test_update_nat_rules(self):
self._deploy_edge()
jobdata = {}
snats = [{
'src': '192.168.1.0/24',
'translated': '10.0.0.1'
}, {
'src': '192.168.2.0/24',
'translated': '10.0.0.2'
}, {
'src': '192.168.3.0/24',
'translated': '10.0.0.3'
}
]
dnats = [{
'dst': '100.0.0.4',
'translated': '192.168.1.1'
}, {
'dst': '100.0.0.5',
'translated': '192.168.2.1'
}
]
task = self.vcns_driver.update_nat_rules(
'router-id', self.edge_id, snats, dnats, jobdata=jobdata)
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get('nat_update_result'))
natcfg = self.vcns_driver.get_nat_config(self.edge_id)
rules = natcfg['rules']['natRulesDtos']
self.assertEqual(len(rules), 2 * len(dnats) + len(snats))
self.natEquals(rules[0], dnats[0])
self.natEquals(rules[1], self.snat_for_dnat(dnats[0]))
self.natEquals(rules[2], dnats[1])
self.natEquals(rules[3], self.snat_for_dnat(dnats[1]))
self.natEquals(rules[4], snats[0])
self.natEquals(rules[5], snats[1])
self.natEquals(rules[6], snats[2])
def snat_for_dnat(self, dnat):
return {
'src': dnat['translated'],
'translated': dnat['dst']
}
def natEquals(self, rule, exp):
addr = exp.get('src')
if not addr:
addr = exp.get('dst')
self.assertEqual(rule['originalAddress'], addr)
self.assertEqual(rule['translatedAddress'], exp['translated'])
def test_update_routes(self):
self._deploy_edge()
jobdata = {}
routes = [{
'cidr': '192.168.1.0/24',
'nexthop': '169.254.2.1'
}, {
'cidr': '192.168.2.0/24',
'nexthop': '169.254.2.1'
}, {
'cidr': '192.168.3.0/24',
'nexthop': '169.254.2.1'
}
]
task = self.vcns_driver.update_routes(
'router-id', self.edge_id, '10.0.0.1', routes, jobdata=jobdata)
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get('routes_update_result'))
def test_update_interface(self):
self._deploy_edge()
jobdata = {}
task = self.vcns_driver.update_interface(
'router-id', self.edge_id, vcns_const.EXTERNAL_VNIC_INDEX,
'network-id', address='100.0.0.3', netmask='255.255.255.0',
jobdata=jobdata)
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get('interface_update_result'))
def test_delete_edge(self):
self._deploy_edge()
jobdata = {}
task = self.vcns_driver.delete_edge(
'router-id', self.edge_id, jobdata=jobdata)
task.wait(TaskState.RESULT)
self.assertTrue(jobdata.get('edge_delete_result'))
def test_create_lswitch(self):
tz_config = [{
'transport_zone_uuid': 'tz-uuid'
}]
lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
self.assertEqual(lswitch['display_name'], 'lswitch')
self.assertEqual(lswitch['type'], 'LogicalSwitchConfig')
self.assertIn('uuid', lswitch)
def test_delete_lswitch(self):
tz_config = {
'transport_zone_uuid': 'tz-uuid'
}
lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
self.vcns_driver.delete_lswitch(lswitch['uuid'])

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,249 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 VMware, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
import copy
import json
from neutron.openstack.common import uuidutils
class FakeVcns(object):
def __init__(self, unique_router_name=True):
self._jobs = {}
self._job_idx = 0
self._edges = {}
self._edge_idx = 0
self._lswitches = {}
self._unique_router_name = unique_router_name
self._fake_nvpapi = None
def set_fake_nvpapi(self, fake_nvpapi):
self._fake_nvpapi = fake_nvpapi
def _validate_edge_name(self, name):
for edge_id, edge in self._edges.iteritems():
if edge['name'] == name:
return False
return True
def deploy_edge(self, request):
if (self._unique_router_name and
not self._validate_edge_name(request['name'])):
header = {
'status': 400
}
msg = ('Edge name should be unique for tenant. Edge %s '
'already exists for default tenant.') % request['name']
response = {
'details': msg,
'errorCode': 10085,
'rootCauseString': None,
'moduleName': 'vShield Edge',
'errorData': None
}
return (header, json.dumps(response))
self._job_idx = self._job_idx + 1
job_id = "jobdata-%d" % self._job_idx
self._edge_idx = self._edge_idx + 1
edge_id = "edge-%d" % self._edge_idx
self._jobs[job_id] = edge_id
self._edges[edge_id] = {
'name': request['name'],
'request': request,
'nat_rules': None,
'nat_rule_id': 0
}
header = {
'status': 200,
'location': 'https://host/api/4.0/jobs/%s' % job_id
}
response = ''
return (header, response)
def get_edge_id(self, job_id):
if job_id not in self._jobs:
raise Exception(_("Job %s does not nexist") % job_id)
header = {
'status': 200
}
response = {
'edgeId': self._jobs[job_id]
}
return (header, response)
def get_edge_deploy_status(self, edge_id):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
header = {
'status': 200,
}
response = {
'systemStatus': 'good'
}
return (header, response)
def delete_edge(self, edge_id):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
del self._edges[edge_id]
header = {
'status': 200
}
response = ''
return (header, response)
def update_interface(self, edge_id, vnic):
header = {
'status': 200
}
response = ''
return (header, response)
def get_nat_config(self, edge_id):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
edge = self._edges[edge_id]
rules = edge['nat_rules']
if rules is None:
rules = {
'rules': {
'natRulesDtos': []
},
'version': 1
}
header = {
'status': 200
}
rules['version'] = 1
return (header, rules)
def update_nat_config(self, edge_id, nat):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
edge = self._edges[edge_id]
max_rule_id = edge['nat_rule_id']
rules = copy.deepcopy(nat)
for rule in rules['rules']['natRulesDtos']:
rule_id = rule.get('ruleId', 0)
if rule_id > max_rule_id:
max_rule_id = rule_id
for rule in rules['rules']['natRulesDtos']:
if 'ruleId' not in rule:
max_rule_id = max_rule_id + 1
rule['ruleId'] = max_rule_id
edge['nat_rules'] = rules
edge['nat_rule_id'] = max_rule_id
header = {
'status': 200
}
response = ''
return (header, response)
def delete_nat_rule(self, edge_id, rule_id):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
edge = self._edges[edge_id]
rules = edge['nat_rules']
rule_to_delete = None
for rule in rules['rules']['natRulesDtos']:
if rule_id == rule['ruleId']:
rule_to_delete = rule
break
if rule_to_delete is None:
raise Exception(_("Rule id %d doest not exist") % rule_id)
rules['rules']['natRulesDtos'].remove(rule_to_delete)
header = {
'status': 200
}
response = ''
return (header, response)
def get_edge_status(self, edge_id):
if edge_id not in self._edges:
raise Exception(_("Edge %s does not exist") % edge_id)
header = {
'status': 200
}
response = {
'edgeStatus': 'GREEN'
}
return (header, response)
def get_edges(self):
header = {
'status': 200
}
edges = []
for edge_id in self._edges:
edges.append({
'id': edge_id,
'edgeStatus': 'GREEN'
})
response = {
'edgePage': {
'data': edges
}
}
return (header, response)
def update_routes(self, edge_id, routes):
header = {
'status': 200
}
response = ''
return (header, response)
def create_lswitch(self, lsconfig):
# The lswitch is created via VCNS API so the fake nvpapi wont
# see it. Added to fake nvpapi here.
if self._fake_nvpapi:
lswitch = self._fake_nvpapi._add_lswitch(json.dumps(lsconfig))
else:
lswitch = lsconfig
lswitch['uuid'] = uuidutils.generate_uuid()
self._lswitches[lswitch['uuid']] = lswitch
header = {
'status': 200
}
lswitch['_href'] = '/api/ws.v1/lswitch/%s' % lswitch['uuid']
return (header, lswitch)
def delete_lswitch(self, id):
if id not in self._lswitches:
raise Exception(_("Lswitch %s does not exist") % id)
del self._lswitches[id]
if self._fake_nvpapi:
# TODO(fank): fix the hack
del self._fake_nvpapi._fake_lswitch_dict[id]
header = {
'status': 200
}
response = ''
return (header, response)
def reset_all(self):
self._jobs.clear()
self._edges.clear()
self._lswitches.clear()