Cisco VPNaaS and L3 router plugin integration

This completes the blueprint effort to integrate the Cisco VPNaaS
drivers with the Cisco L3 router plugin for a VPN solution that
uses an in-band Cisco CSR (versus out-of-band, as is currently).

The VPNaaS service driver is modified to no longer read router
configuration from an INI file, but instead, will query the L3
router plugin for this information. The config loading logic is
removed. The router information is transformed and passed to the
device driver for use by the REST client.

The L3 router plugin now provides two methods to provide the needed
router and hosting device info. In addition, the template and
snippets used to provision the Cisco CSR are modified to have the
needed VPN settings and to change the VRF configuration to the new
style to support VPN REST API calls with VRF info.

The VPNaaS device driver was modified to provide the VRF name as
part of the URI for several VPN REST API calls, as needed for this
solution. The device driver uses the additional VRF and interface
information that is provided by the service driver.

The unit tests were updated to reflect the usage of VRF names, and
the new logic for checking router information.

Change-Id: I6ee17f91a25db7bff3ebbeb8d482de1c31a77035
Implements: blueprint cisco-vpnaas-and-router-integration
This commit is contained in:
Paul Michali 2014-09-24 18:22:04 -04:00
parent 46dbc4d763
commit cb08c4e30f
10 changed files with 209 additions and 752 deletions

View File

@ -58,28 +58,32 @@ ENABLE_INTF = """
#=================================================#
# Create VRF
# $(config)ip routing
# $(config)ip vrf nrouter-e7d4y5
# $(config)vrf definition nrouter-e7d4y5
# $(config-vrf)address-family ipv4
# $(config-vrf-af)exit-address-family
# $(config-vrf)address-family ipv6
# $(config-vrf-af)exit-address-family
#=================================================#
CREATE_VRF = """
<config>
<cli-config-data>
<cmd>ip routing</cmd>
<cmd>ip vrf %s</cmd>
<cmd>vrf definition %s</cmd>
<cmd>address-family ipv4</cmd>
<cmd>exit-address-family</cmd>
<cmd>address-family ipv6</cmd>
<cmd>exit-address-family</cmd>
</cli-config-data>
</config>
"""
#=================================================#
# Remove VRF
# $(config)ip routing
# $(config)no ip vrf nrouter-e7d4y5
# $(config)no vrf definition nrouter-e7d4y5
#=================================================#
REMOVE_VRF = """
<config>
<cli-config-data>
<cmd>ip routing</cmd>
<cmd>no ip vrf %s</cmd>
<cmd>no vrf definition %s</cmd>
</cli-config-data>
</config>
"""
@ -96,7 +100,7 @@ CREATE_SUBINTERFACE = """
<cli-config-data>
<cmd>interface %s</cmd>
<cmd>encapsulation dot1Q %s</cmd>
<cmd>ip vrf forwarding %s</cmd>
<cmd>vrf forwarding %s</cmd>
<cmd>ip address %s %s</cmd>
</cli-config-data>
</config>
@ -127,7 +131,7 @@ SET_INTC_HSRP = """
<config>
<cli-config-data>
<cmd>interface %s</cmd>
<cmd>ip vrf forwarding %s</cmd>
<cmd>vrf forwarding %s</cmd>
<cmd>standby version 2</cmd>
<cmd>standby %s priority %s</cmd>
<cmd>standby %s ip %s</cmd>

View File

@ -22,10 +22,13 @@ from sqlalchemy.sql import expression as expr
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as n_context
from neutron.db import agents_db
from neutron.db import extraroute_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import portbindings_db as p_binding
from neutron.extensions import providernet as pr_net
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
@ -573,3 +576,53 @@ class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
active=True)
else:
return []
def get_active_routers_for_host(self, context, host):
query = context.session.query(
l3_models.RouterHostingDeviceBinding.router_id)
query = query.join(
models_v2.Port,
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
models_v2.Port.device_id)
query = query.join(p_binding.PortBindingPort)
query = query.filter(p_binding.PortBindingPort.host == host)
query = query.filter(models_v2.Port.name == 'mgmt')
router_ids = [item[0] for item in query]
if router_ids:
return self.get_sync_data_ext(context, router_ids=router_ids,
active=True)
else:
return []
@staticmethod
def _agent_state_filter(check_active, last_heartbeat):
"""Filters only active agents, if requested."""
if not check_active:
return True
return not agents_db.AgentDbMixin.is_agent_down(last_heartbeat)
def get_host_for_router(self, context, router, admin_state_up=None,
check_active=False):
query = context.session.query(agents_db.Agent.host,
agents_db.Agent.heartbeat_timestamp)
query = query.join(
p_binding.PortBindingPort,
p_binding.PortBindingPort.host == agents_db.Agent.host)
query = query.join(
models_v2.Port,
models_v2.Port.id == p_binding.PortBindingPort.port_id)
query = query.join(
l3_models.RouterHostingDeviceBinding,
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
models_v2.Port.device_id)
query = query.filter(
agents_db.Agent.topic == topics.L3_AGENT,
l3_models.RouterHostingDeviceBinding.router_id == router)
if admin_state_up is not None:
query = query.filter(
agents_db.Agent.admin_state_up == admin_state_up)
entry = query.first()
if entry and L3RouterApplianceDBMixin._agent_state_filter(check_active,
entry[1]):
return entry[0]
return ""

View File

@ -39,6 +39,13 @@ interface GigabitEthernet1
ip address <ip> <mask>
no shutdown
virtual-service csr_mgmt
ip shared host-interface GigabitEthernet1
activate
remote-management
restful-api local-port 55443
ip route 0.0.0.0 0.0.0.0 GigabitEthernet1 <gw>
ip name-server <name_server>

View File

@ -30,7 +30,6 @@ HEADER_CONTENT_TYPE_JSON = {'content-type': 'application/json'}
URL_BASE = 'https://%(host)s/api/v1/%(resource)s'
# CSR RESTapi URIs
URI_VPN_IPSEC_POLICIES = 'vpn-svc/ipsec/policies'
URI_VPN_IPSEC_POLICIES_ID = URI_VPN_IPSEC_POLICIES + '/%s'
URI_VPN_IKE_POLICIES = 'vpn-svc/ike/policies'
@ -62,10 +61,12 @@ class CsrRestClient(object):
def __init__(self, settings):
self.port = str(settings.get('protocol_port', 55443))
self.host = ':'.join([settings.get('rest_mgmt_ip', ''), self.port])
self.tunnel_ip = settings.get('external_ip', '')
self.auth = (settings['username'], settings['password'])
self.tunnel_if_name = settings.get('tunnel_if_name', '')
self.inner_if_name = settings.get('inner_if_name', '')
self.outer_if_name = settings.get('outer_if_name', '')
self.token = None
self.vrf = settings.get('vrf', '')
self.vrf_prefix = 'vrf/%s/' % self.vrf if self.vrf else ""
self.status = requests.codes.OK
self.timeout = settings.get('timeout')
self.max_tries = 5
@ -212,6 +213,8 @@ class CsrRestClient(object):
return self._do_request('DELETE', resource,
more_headers=HEADER_CONTENT_TYPE_JSON)
# VPN Specific APIs
def create_ike_policy(self, policy_info):
base_ike_policy_info = {u'version': u'v1',
u'local-auth-method': u'pre-share'}
@ -226,19 +229,22 @@ class CsrRestClient(object):
payload=base_ipsec_policy_info)
def create_pre_shared_key(self, psk_info):
return self.post_request(URI_VPN_IKE_KEYRINGS, payload=psk_info)
return self.post_request(self.vrf_prefix + URI_VPN_IKE_KEYRINGS,
payload=psk_info)
def create_ipsec_connection(self, connection_info):
base_conn_info = {
u'vpn-type': u'site-to-site',
u'ip-version': u'ipv4',
u'local-device': {
u'tunnel-ip-address': self.tunnel_ip,
u'ip-address': self.tunnel_if_name
u'tunnel-ip-address': self.outer_if_name,
u'ip-address': self.inner_if_name
}
}
connection_info.update(base_conn_info)
return self.post_request(URI_VPN_SITE_TO_SITE,
if self.vrf:
connection_info[u'tunnel-vrf'] = self.vrf
return self.post_request(self.vrf_prefix + URI_VPN_SITE_TO_SITE,
payload=connection_info)
def configure_ike_keepalive(self, keepalive_info):
@ -247,11 +253,12 @@ class CsrRestClient(object):
return self.put_request(URI_VPN_IKE_KEEPALIVE, keepalive_info)
def create_static_route(self, route_info):
return self.post_request(URI_ROUTING_STATIC_ROUTES,
return self.post_request(self.vrf_prefix + URI_ROUTING_STATIC_ROUTES,
payload=route_info)
def delete_static_route(self, route_id):
return self.delete_request(URI_ROUTING_STATIC_ROUTES_ID % route_id)
return self.delete_request(
self.vrf_prefix + URI_ROUTING_STATIC_ROUTES_ID % route_id)
def set_ipsec_connection_state(self, tunnel, admin_up=True):
"""Set the IPSec site-to-site connection (tunnel) admin state.
@ -259,10 +266,12 @@ class CsrRestClient(object):
Note: When a tunnel is created, it will be admin up.
"""
info = {u'vpn-interface-name': tunnel, u'enabled': admin_up}
return self.put_request(URI_VPN_SITE_TO_SITE_STATE % tunnel, info)
return self.put_request(
self.vrf_prefix + URI_VPN_SITE_TO_SITE_STATE % tunnel, info)
def delete_ipsec_connection(self, conn_id):
return self.delete_request(URI_VPN_SITE_TO_SITE_ID % conn_id)
return self.delete_request(
self.vrf_prefix + URI_VPN_SITE_TO_SITE_ID % conn_id)
def delete_ipsec_policy(self, policy_id):
return self.delete_request(URI_VPN_IPSEC_POLICIES_ID % policy_id)
@ -271,10 +280,12 @@ class CsrRestClient(object):
return self.delete_request(URI_VPN_IKE_POLICIES_ID % policy_id)
def delete_pre_shared_key(self, key_id):
return self.delete_request(URI_VPN_IKE_KEYRINGS_ID % key_id)
return self.delete_request(
self.vrf_prefix + URI_VPN_IKE_KEYRINGS_ID % key_id)
def read_tunnel_statuses(self):
results = self.get_request(URI_VPN_SITE_ACTIVE_SESSIONS)
results = self.get_request(self.vrf_prefix +
URI_VPN_SITE_ACTIVE_SESSIONS)
if self.status != requests.codes.OK or not results:
return []
tunnels = [(t[u'vpn-interface-name'], t[u'status'])

View File

@ -1,211 +0,0 @@
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Interim code to obtain router information for Cisco CSR
This obtains information on the Cisco CSR router from an INI file. This is
an interim solution, until the Cisco L3 router plugin code is up-streamed.
Once that happens, this code and UTs will be removed and the API calls to
the L3 router will be used.
To use this code, the Neutron server is started with a config_file that
points to an INI file with router configuration. The router would be created
(manually) in Nova, the INI file is then updated with the router information,
and then VPN IPSec site-to-site connections can be created using that router.
"""
import netaddr
import re
from oslo.config import cfg
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.openstack.common.gettextutils import _LE, _LI
from neutron.openstack.common import log as logging
from neutron.services.vpn.device_drivers import (
cisco_csr_rest_client as csr_client)
LOG = logging.getLogger(__name__)
tunnel_if_re = re.compile(r'^GigabitEthernet[123]')
def get_available_csrs_from_config(config_files):
"""Read INI for available Cisco CSRs that driver can use.
Loads management port, tunnel IP, user, and password information for
available CSRs from configuration file. Driver will use this info to
configure VPN connections. The CSR is associated 1:1 with a Neutron
router. To identify which CSR to use for a VPN service, the public
(GW) IP of the Neutron router will be used as an index into the CSR
config info.
"""
multi_parser = cfg.MultiConfigParser()
LOG.info(_LI("Scanning config files %s for Cisco CSR configurations"),
config_files)
try:
read_ok = multi_parser.read(config_files)
except cfg.ParseError as pe:
LOG.error(_LE("Config file parse error: %s"), pe)
return {}
if len(read_ok) != len(config_files):
raise cfg.Error(_("Unable to parse config files %s for Cisco CSR "
"info") % config_files)
csrs_found = {}
for parsed_file in multi_parser.parsed:
for parsed_item in parsed_file.keys():
device_type, sep, for_router = parsed_item.partition(':')
if device_type.lower() == 'cisco_csr_rest':
try:
netaddr.IPNetwork(for_router)
except netaddr.core.AddrFormatError:
LOG.error(_LE("Ignoring Cisco CSR configuration entry - "
"router IP %s is not valid"), for_router)
continue
entry = parsed_file[parsed_item]
# Check for missing fields
try:
rest_mgmt_ip = entry['rest_mgmt'][0]
tunnel_ip = entry['tunnel_ip'][0]
username = entry['username'][0]
password = entry['password'][0]
host = entry['host'][0]
tunnel_if = entry['tunnel_if'][0]
except KeyError as ke:
LOG.error(_LE("Ignoring Cisco CSR for router %(router)s "
"- missing %(field)s setting"),
{'router': for_router, 'field': str(ke)})
continue
# Validate fields
try:
timeout = float(entry['timeout'][0])
except ValueError:
LOG.error(_LE("Ignoring Cisco CSR for router %s - "
"timeout is not a floating point number"),
for_router)
continue
except KeyError:
timeout = csr_client.TIMEOUT
try:
netaddr.IPAddress(rest_mgmt_ip)
except netaddr.core.AddrFormatError:
LOG.error(_LE("Ignoring Cisco CSR for subnet %s - "
"REST management is not an IP address"),
for_router)
continue
try:
netaddr.IPAddress(tunnel_ip)
except netaddr.core.AddrFormatError:
LOG.error(_LE("Ignoring Cisco CSR for router %s - "
"local tunnel is not an IP address"),
for_router)
continue
m = tunnel_if_re.match(tunnel_if)
if not m:
LOG.error(_LE("Malformed interface name for Cisco "
"CSR router entry - %s"), tunnel_if)
continue
csrs_found[for_router] = {'rest_mgmt_ip': rest_mgmt_ip,
'tunnel_ip': tunnel_ip,
'username': username,
'password': password,
'host': host,
'tunnel_if': tunnel_if,
'timeout': timeout}
LOG.debug("Found CSR for router %(router)s: %(info)s",
{'router': for_router,
'info': csrs_found[for_router]})
LOG.debug("Found %d router entries", len(csrs_found))
return csrs_found
def _get_router_id_via_external_ip(context, external_ip):
'''Find router ID for router with matching GW port IP.'''
query = context.session.query(l3_db.Router)
query = query.join(models_v2.Port,
l3_db.Router.gw_port_id == models_v2.Port.id)
query = query.join(models_v2.IPAllocation,
models_v2.IPAllocation.port_id == models_v2.Port.id)
query = query.filter(models_v2.IPAllocation.ip_address == external_ip)
router = query.first()
if router:
return router.id
def get_active_routers_for_host(context, host):
'''Get list of routers from INI file that use host requested.'''
routers = []
configured_routers = get_available_csrs_from_config(cfg.CONF.config_file)
if not configured_routers:
LOG.error(_LE("No routers found in INI file!"))
return routers
for router_ip, info in configured_routers.items():
if host == info['host']:
router_id = _get_router_id_via_external_ip(context, router_ip)
if router_id:
LOG.debug("Found router %(router)s on host %(host)s",
{'router': router_id, 'host': host})
routers.append({
'id': router_id,
'hosting_device': {
'management_ip_address': info['rest_mgmt_ip'],
'credentials': {'username': info['username'],
'password': info['password']}
},
'tunnel_if': info['tunnel_if'],
'tunnel_ip': info['tunnel_ip']
})
else:
LOG.error(_LE("Unable to lookup router ID based on router's "
"public IP (%s) in INI file"), router_ip)
if not routers:
LOG.error(_LE("No matching routers on host %s"), host)
return routers
def _get_external_ip_for_router(context, router_id):
'''Find port that is the gateway port for router.'''
query = context.session.query(models_v2.Port)
query = query.join(l3_db.Router,
l3_db.Router.gw_port_id == models_v2.Port.id)
query = query.filter(l3_db.Router.id == router_id)
gw_port = query.first()
if gw_port:
return gw_port.fixed_ips[0]['ip_address']
def get_host_for_router(context, router_id):
'''Find out GW port for router and look-up in INI file to get host.
For this interim solution, there is the possibility that the INI file is
not configured correctly and either there are no entries or no matching
entry. An error will be reported in log and resource will remain in the
same state (e.g. PENDING_CREATE).
'''
routers = get_available_csrs_from_config(cfg.CONF.config_file)
if not routers:
LOG.error(_LE("No routers found in INI file!"))
return ''
router_public_ip = _get_external_ip_for_router(context, router_id)
if router_public_ip:
router = routers.get(router_public_ip)
if router:
LOG.debug("Found host %(host)s for router %(router)s",
{'host': router['host'], 'router': router_id})
return router['host']
LOG.error(_LE("Unable to find host for router %s"), router_id)
return ''

View File

@ -15,10 +15,10 @@
from neutron.common import rpc as n_rpc
from neutron.db.vpn import vpn_db
from neutron.openstack.common import log as logging
from neutron.plugins.cisco.l3.plugging_drivers import (
n1kv_plugging_constants as n1kv_constants)
from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers
from neutron.services.vpn.service_drivers import (
cisco_cfg_loader as via_cfg_file)
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_id_map
from neutron.services.vpn.service_drivers import cisco_validator
@ -30,6 +30,7 @@ LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
'IPSec Policy': {'min': 120, 'max': 2592000}}
MIN_CSR_MTU = 1500
MAX_CSR_MTU = 9192
VRF_SUFFIX_LEN = 6
class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback):
@ -59,7 +60,8 @@ class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback):
def get_vpn_services_on_host(self, context, host=None):
"""Returns info on the VPN services on the host."""
routers = via_cfg_file.get_active_routers_for_host(context, host)
routers = self.driver.l3_plugin.get_active_routers_for_host(context,
host)
host_vpn_services = []
for router in routers:
vpn_services = self.get_vpn_services_using(context, router['id'])
@ -96,11 +98,8 @@ class CiscoCsrIPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi,
admin_context = context.is_admin and context or context.elevated()
if not version:
version = self.RPC_API_VERSION
host = via_cfg_file.get_host_for_router(admin_context, router_id)
if not host:
# NOTE: This is a config error for workaround. At this point we
# can't set state of resource to error.
return
host = self.driver.l3_plugin.get_host_for_router(admin_context,
router_id)
LOG.debug('Notify agent at %(topic)s.%(host)s the message '
'%(method)s %(args)s for router %(router)s',
{'topic': self.topic,
@ -191,17 +190,27 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
'ike_policy_id': u'%d' % ike_id,
'ipsec_policy_id': u'%s' % ipsec_id}
def _create_tunnel_interface(self, router_info):
return router_info['tunnel_if']
def _create_interface(self, interface_info):
hosting_info = interface_info['hosting_info']
vlan = hosting_info['segmentation_id']
# Port name "currently" is t{1,2}_p:1, as only one router per CSR,
# but will keep a semi-generic algorithm
port_name = hosting_info['hosting_port_name']
name, sep, num = port_name.partition(':')
offset = 1 if name in n1kv_constants.T2_PORT_NAME else 0
if_num = int(num) * 2 + offset
return 'GigabitEthernet%d.%d' % (if_num, vlan)
def _get_router_info(self, router_info):
hosting_device = router_info['hosting_device']
return {'rest_mgmt_ip': hosting_device['management_ip_address'],
'external_ip': router_info['tunnel_ip'],
'username': hosting_device['credentials']['username'],
'password': hosting_device['credentials']['password'],
'tunnel_if_name': self._create_tunnel_interface(router_info),
# TODO(pcm): Add protocol_port, if avail from L3 router plugin
'inner_if_name': self._create_interface(
router_info['_interfaces'][0]),
'outer_if_name': self._create_interface(
router_info['gw_port']),
'vrf': 'nrouter-' + router_info['id'][:VRF_SUFFIX_LEN],
'timeout': 30} # Hard-coded for now
def _make_vpnservice_dict(self, context, vpnservice, router_info):

View File

@ -26,6 +26,7 @@ from neutron.tests import base
dummy_policy_id = 'dummy-ipsec-policy-id-name'
TEST_VRF = 'nrouter-123456'
BASE_URL = 'https://%s:55443/api/v1/'
LOCAL_URL = 'https://localhost:55443/api/v1/'
@ -33,18 +34,18 @@ URI_HOSTNAME = 'global/host-name'
URI_USERS = 'global/local-users'
URI_AUTH = 'auth/token-services'
URI_INTERFACE_GE1 = 'interfaces/GigabitEthernet1'
URI_PSK = 'vpn-svc/ike/keyrings'
URI_PSK = 'vrf/' + TEST_VRF + '/vpn-svc/ike/keyrings'
URI_PSK_ID = URI_PSK + '/%s'
URI_IKE_POLICY = 'vpn-svc/ike/policies'
URI_IKE_POLICY_ID = URI_IKE_POLICY + '/%s'
URI_IPSEC_POLICY = 'vpn-svc/ipsec/policies'
URI_IPSEC_POLICY_ID = URI_IPSEC_POLICY + '/%s'
URI_IPSEC_CONN = 'vpn-svc/site-to-site'
URI_IPSEC_CONN = 'vrf/' + TEST_VRF + '/vpn-svc/site-to-site'
URI_IPSEC_CONN_ID = URI_IPSEC_CONN + '/%s'
URI_KEEPALIVE = 'vpn-svc/ike/keepalive'
URI_ROUTES = 'routing-svc/static-routes'
URI_ROUTES = 'vrf/' + TEST_VRF + '/routing-svc/static-routes'
URI_ROUTES_ID = URI_ROUTES + '/%s'
URI_SESSIONS = 'vpn-svc/site-to-site/active/sessions'
URI_SESSIONS = 'vrf/' + TEST_VRF + '/vpn-svc/site-to-site/active/sessions'
# Note: Helper functions to test reuse of IDs.
@ -69,6 +70,7 @@ class CiscoCsrBaseTestCase(base.BaseTestCase):
self.base_url = BASE_URL % host
self.requests = self.useFixture(mock_fixture.Fixture())
info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'vrf': 'nrouter-123456',
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
@ -1131,6 +1133,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
u'ike-profile-id': None,
u'mtu': 1500,
u'tunnel-vrf': TEST_VRF,
u'local-device': {
u'ip-address': '10.3.0.1/24',
u'tunnel-ip-address': '10.10.10.10'
@ -1161,6 +1164,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
u'ike-profile-id': None,
u'vpn-type': u'site-to-site',
u'mtu': 1500,
u'tunnel-vrf': TEST_VRF,
u'ip-version': u'ipv4'}
expected_connection.update(connection_info)
location = self.csr.create_ipsec_connection(connection_info)
@ -1206,6 +1210,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
u'ike-profile-id': None,
u'vpn-type': u'site-to-site',
u'mtu': 1500,
u'tunnel-vrf': TEST_VRF,
u'ip-version': u'ipv4'}
expected_connection.update(connection_info)
location = self.csr.create_ipsec_connection(connection_info)
@ -1213,7 +1218,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
self.csr.delete_ipsec_connection,
tunnel_name)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/site-to-site/' + tunnel_name, location)
self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location)
# Check the hard-coded items that get set as well...
self._helper_register_ipsec_conn_get(tunnel_name, override={
@ -1247,6 +1252,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
}
expected_connection = {u'kind': u'object#vpn-site-to-site',
u'ike-profile-id': None,
u'tunnel-vrf': TEST_VRF,
u'vpn-type': u'site-to-site',
u'ip-version': u'ipv4'}
expected_connection.update(connection_info)
@ -1285,6 +1291,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
}
expected_connection = {u'kind': u'object#vpn-site-to-site',
u'ike-profile-id': None,
u'tunnel-vrf': TEST_VRF,
u'vpn-type': u'site-to-site',
u'ip-version': u'ipv4'}
expected_connection.update(connection_info)
@ -1427,6 +1434,7 @@ class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase):
}
expected_connection = {u'kind': u'object#vpn-site-to-site',
u'ike-profile-id': None,
u'tunnel-vrf': TEST_VRF,
u'vpn-type': u'site-to-site',
u'ip-version': u'ipv4'}
expected_connection.update(connection_info)

View File

@ -70,12 +70,10 @@ class TestCiscoCsrIPSecConnection(base.BaseTestCase):
'lifetime_value': 3600},
'cisco': {'site_conn_id': 'Tunnel0',
'ike_policy_id': 222,
'ipsec_policy_id': 333,
'router_public_ip': '172.24.4.23'}
'ipsec_policy_id': 333}
}
self.csr = mock.Mock(spec=csr_client.CsrRestClient)
self.csr.status = 201 # All calls to CSR REST API succeed
self.csr.tunnel_ip = '172.24.4.23'
self.ipsec_conn = ipsec_driver.CiscoCsrIPSecConnection(self.conn_info,
self.csr)
@ -241,8 +239,7 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'lifetime_value': 3600},
'cisco': {'site_conn_id': 'Tunnel0',
'ike_policy_id': 222,
'ipsec_policy_id': 333,
'router_public_ip': '172.24.4.23'}
'ipsec_policy_id': 333}
}
self.csr = mock.Mock(spec=csr_client.CsrRestClient)
self.csr.tunnel_ip = '172.24.4.23'
@ -437,12 +434,14 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
ipsec_driver.CiscoCsrIPSecConnection,
'set_admin_state').start()
self.csr = mock.Mock()
self.router_info = {u'router_info': {'rest_mgmt_ip': '2.2.2.2',
'tunnel_ip': '1.1.1.3',
'username': 'me',
'password': 'password',
'timeout': 120,
'external_ip': u'1.1.1.1'}}
self.router_info = {
u'router_info': {'rest_mgmt_ip': '2.2.2.2',
'tunnel_ip': '1.1.1.3',
'username': 'me',
'password': 'password',
'timeout': 120,
'outer_if_name': u'GigabitEthernet3.102',
'inner_if_name': u'GigabitEthernet3.101'}}
self.service123_data = {u'id': u'123',
u'status': constants.DOWN,
u'admin_state_up': False}

View File

@ -1,476 +0,0 @@
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test module for interim implementation - to be removed later.
This tests using an INI file to obtain Cisco CSR router information
for IPSec site-to-site connections. Once the Cisco L3 router plugin
blueprint has been up-streamed, this can be removed and production code
switched to use the L3 plugin methods for:
get_host_for_router()
get_active_routers_for_host()
TODO(pcm): remove module, when Cisco L3 router plugin is up-streamed.
"""
import os
import tempfile
import mock
from oslo.config import cfg
from neutron import context as ctx
from neutron.openstack.common import uuidutils
from neutron.services.vpn.device_drivers import (
cisco_csr_rest_client as csr_client)
from neutron.services.vpn.service_drivers import (
cisco_cfg_loader as cfg_loader)
from neutron.tests import base
_uuid = uuidutils.generate_uuid
FAKE_ROUTER_ID = _uuid()
CISCO_GET_ROUTER_IP = ('neutron.services.vpn.service_drivers.'
'cisco_cfg_loader._get_external_ip_for_router')
CISCO_GET_ROUTER_ID = ('neutron.services.vpn.service_drivers.'
'cisco_cfg_loader._get_router_id_via_external_ip')
def create_tempfile(contents):
(fd, path) = tempfile.mkstemp(prefix='test', suffix='.conf')
try:
os.write(fd, contents.encode('utf-8'))
finally:
os.close(fd)
return path
class TestCiscoCsrServiceDriverConfigLoading(base.BaseTestCase):
def test_loading_csr_configuration(self):
"""Ensure that Cisco CSR configs can be loaded from config files."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
'tunnel_ip': '3.2.1.3',
'username': 'me',
'password': 'secret',
'host': 'compute-node',
'tunnel_if': 'GigabitEthernet3',
'timeout': 5.0}}
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual(expected, csrs_found)
def test_loading_config_without_timeout(self):
"""Cisco CSR config without timeout will use default timeout."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n')
expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
'tunnel_ip': '3.2.1.3',
'username': 'me',
'password': 'secret',
'host': 'compute-node',
'tunnel_if': 'GigabitEthernet3',
'timeout': csr_client.TIMEOUT}}
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual(expected, csrs_found)
def test_skip_loading_duplicate_csr_configuration(self):
"""Failure test that duplicate configurations are ignored."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 5.5.5.3\n'
'tunnel_ip = 3.2.1.6\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n')
expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
'tunnel_ip': '3.2.1.3',
'username': 'me',
'password': 'secret',
'host': 'compute-node',
'tunnel_if': 'GigabitEthernet3',
'timeout': 5.0}}
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual(expected, csrs_found)
def test_fail_loading_config_with_invalid_timeout(self):
"""Failure test of invalid timeout in config info."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = yes\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_fail_loading_config_missing_required_info(self):
"""Failure test of config missing required info."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:1.1.1.0]\n'
# No rest_mgmt
'tunnel_ip = 1.1.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:2.2.2.0]\n'
'rest_mgmt = 10.20.30.2\n'
# No tunnel_ip
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:3.3.3.0]\n'
'rest_mgmt = 10.20.30.3\n'
'tunnel_ip = 3.3.3.3\n'
# No username
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:4.4.4.0]\n'
'rest_mgmt = 10.20.30.4\n'
'tunnel_ip = 4.4.4.4\n'
'username = me\n'
# No password
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:5.5.5.0]\n'
'rest_mgmt = 10.20.30.5\n'
'tunnel_ip = 5.5.5.5'
'username = me\n'
'password = secret\n'
# No host
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:6.6.6.0]\n'
'rest_mgmt = 10.20.30.6\n'
'tunnel_ip = 6.6.6.6'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
# No tunnel_if
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_fail_loading_config_with_invalid_router_id(self):
"""Failure test of config with invalid rotuer ID."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:4.3.2.1.9]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 4.3.2.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_fail_loading_config_with_invalid_mgmt_ip(self):
"""Failure test of configuration with invalid management IP address."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 1.1.1.1.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_fail_loading_config_with_invalid_tunnel_ip(self):
"""Failure test of configuration with invalid tunnel IP address."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 1.1.1.1\n'
'tunnel_ip = 3.2.1.4.5\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_failure_no_configurations_entries(self):
"""Failure test config file without any CSR definitions."""
cfg_file = create_tempfile('NO CISCO SECTION AT ALL\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_failure_no_csr_configurations_entries(self):
"""Failure test config file without any CSR definitions."""
cfg_file = create_tempfile('[SOME_CONFIG:123]\n'
'username = me\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_missing_config_value(self):
"""Failure test of config file missing a value for attribute."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = \n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
def test_ignores_invalid_attribute_in_config(self):
"""Test ignoring of config file with invalid attribute."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 1.1.1.1\n'
'bogus = abcdef\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 15.5\n')
expected = {'3.2.1.1': {'rest_mgmt_ip': '1.1.1.1',
'tunnel_ip': '3.2.1.3',
'username': 'me',
'password': 'secret',
'host': 'compute-node',
'tunnel_if': 'GigabitEthernet3',
'timeout': 15.5}}
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual(expected, csrs_found)
def test_invalid_management_interface(self):
"""Failure test of invalid management interface name."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 1.1.1.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = compute-node\n'
'tunnel_if = GigabitEthernet9\n'
'timeout = 5.0\n')
csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
self.assertEqual({}, csrs_found)
class TestCiscoCsrRouterInfo(base.BaseTestCase):
def setUp(self):
super(TestCiscoCsrRouterInfo, self).setUp()
self.context = ctx.get_admin_context()
def test_find_host_for_router(self):
"""Look up host in INI file for a router."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet1\n'
'mgmt_vlan = 100\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_IP, return_value='3.2.1.1').start()
self.assertEqual('ubuntu',
cfg_loader.get_host_for_router(self.context,
FAKE_ROUTER_ID))
def test_failed_to_find_host_as_no_routers_in_ini(self):
"""Fail to find host, as no router info in INI file."""
cfg_file = create_tempfile('\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_IP, return_value='5.5.5.5').start()
self.assertEqual('',
cfg_loader.get_host_for_router(self.context,
FAKE_ROUTER_ID))
def test_failed_no_matching_router_to_obtain_host(self):
"""Fail to find INI info for router provided."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_IP, return_value='5.5.5.5').start()
self.assertEqual('',
cfg_loader.get_host_for_router(self.context,
FAKE_ROUTER_ID))
def test_failed_to_find_router_ip(self):
"""Fail to lookup router IP, preventing search in INI file."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_IP, return_value=None).start()
self.assertEqual('',
cfg_loader.get_host_for_router(self.context,
FAKE_ROUTER_ID))
def _get_router_id_from_external_ip(self, context, ip):
if ip == '3.2.1.1':
return '123'
elif ip == '4.3.2.1':
return '456'
def test_get_one_active_router_for_host(self):
"""Get router info from INI for host specified."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.3\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet2\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_ID,
side_effect=self._get_router_id_from_external_ip).start()
expected = {
'id': '123',
'hosting_device': {
'management_ip_address': '10.20.30.1',
'credentials': {'username': 'me', 'password': 'secret'}
},
'tunnel_if': 'GigabitEthernet2',
'tunnel_ip': '3.2.1.3'
}
routers = cfg_loader.get_active_routers_for_host(self.context,
"ubuntu")
self.assertEqual([expected], routers)
def test_get_two_active_routers_for_host(self):
"""Get info for two routers, from INI file, for host specified."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:3.2.1.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 3.2.1.1\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet2\n'
'timeout = 5.0\n'
'[CISCO_CSR_REST:4.3.2.1]\n'
'rest_mgmt = 10.20.30.2\n'
'tunnel_ip = 4.3.2.1\n'
'username = you\n'
'password = insecure\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_ID,
side_effect=self._get_router_id_from_external_ip).start()
expected_a = {
'id': '123',
'hosting_device': {
'management_ip_address': '10.20.30.1',
'credentials': {'username': 'me', 'password': 'secret'}
},
'tunnel_if': 'GigabitEthernet2',
'tunnel_ip': '3.2.1.1'
}
expected_b = {
'id': '456',
'hosting_device': {
'management_ip_address': '10.20.30.2',
'credentials': {'username': 'you', 'password': 'insecure'}
},
'tunnel_if': 'GigabitEthernet3',
'tunnel_ip': '4.3.2.1'
}
routers = cfg_loader.get_active_routers_for_host(self.context,
"ubuntu")
sorted_routers = sorted(routers, key=lambda key: key['id'])
self.assertEqual([expected_a, expected_b], sorted_routers)
def test_failure_to_find_routers_for_host(self):
"""Fail to find a router in INI with matching host name."""
routers = cfg_loader.get_active_routers_for_host(self.context,
"bogus")
self.assertEqual([], routers)
def test_failure_to_lookup_router_id_for_host(self):
"""Fail to get router UUID for router in INI matching host name."""
cfg_file = create_tempfile(
'[CISCO_CSR_REST:6.6.6.1]\n'
'rest_mgmt = 10.20.30.1\n'
'tunnel_ip = 6.6.6.1\n'
'username = me\n'
'password = secret\n'
'host = ubuntu\n'
'tunnel_if = GigabitEthernet3\n'
'timeout = 5.0\n')
cfg.CONF.set_override('config_file', [cfg_file])
mock.patch(CISCO_GET_ROUTER_ID,
side_effect=self._get_router_id_from_external_ip).start()
routers = cfg_loader.get_active_routers_for_host(self.context,
"ubuntu")
self.assertEqual([], routers)

View File

@ -74,14 +74,10 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def setUp(self):
super(TestCiscoIPsecDriverValidation, self).setUp()
mock.patch('neutron.common.rpc.create_connection').start()
self.l3_plugin = mock.Mock()
mock.patch(
'neutron.manager.NeutronManager.get_service_plugins',
return_value={constants.L3_ROUTER_NAT: self.l3_plugin}).start()
self.core_plugin = mock.Mock()
mock.patch('neutron.manager.NeutronManager.get_plugin',
return_value=self.core_plugin).start()
self.context = n_ctx.Context('some_user', 'some_tenant')
self.vpn_service = {'router_id': '123'}
self.router = mock.Mock()
@ -337,18 +333,20 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase):
mock.patch('neutron.common.rpc.create_connection').start()
service_plugin = mock.Mock()
service_plugin.get_host_for_router.return_value = FAKE_HOST
# TODO(pcm): Remove when Cisco L3 router plugin support available
mock.patch('neutron.services.vpn.service_drivers.'
'cisco_cfg_loader.get_host_for_router',
return_value=FAKE_HOST).start()
service_plugin._get_vpnservice.return_value = {
'router_id': _uuid()
}
get_service_plugin = mock.patch(
'neutron.manager.NeutronManager.get_service_plugins').start()
get_service_plugin.return_value = {
constants.L3_ROUTER_NAT: service_plugin}
l3_plugin = mock.Mock()
mock.patch(
'neutron.manager.NeutronManager.get_service_plugins',
return_value={constants.L3_ROUTER_NAT: l3_plugin}).start()
l3_plugin.get_host_for_router.return_value = FAKE_HOST
l3_agent = mock.Mock()
l3_agent.host = 'some-host'
l3_plugin.get_l3_agents_hosting_routers.return_value = [l3_agent]
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin)
mock.patch.object(csr_db, 'create_tunnel_mapping').start()
self.context = n_ctx.Context('some_user', 'some_tenant')
@ -388,3 +386,58 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase):
self._test_update(self.driver.delete_vpnservice,
[FAKE_VPN_SERVICE],
{'reason': 'vpn-service-delete'})
class TestCiscoIPsecDriverRequests(base.BaseTestCase):
"""Test handling device driver requests for service info."""
def setUp(self):
super(TestCiscoIPsecDriverRequests, self).setUp()
mock.patch('neutron.common.rpc.create_connection').start()
service_plugin = mock.Mock()
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin)
def test_build_router_tunnel_interface_name(self):
"""Check formation of inner/outer interface name for CSR router."""
router_info = {
'_interfaces': [
{'hosting_info': {'segmentation_id': 100,
'hosting_port_name': 't1_p:1'}}
],
'gw_port':
{'hosting_info': {'segmentation_id': 200,
'hosting_port_name': 't2_p:1'}}
}
self.assertEqual(
'GigabitEthernet2.100',
self.driver._create_interface(router_info['_interfaces'][0]))
self.assertEqual(
'GigabitEthernet3.200',
self.driver._create_interface(router_info['gw_port']))
def test_build_router_info(self):
"""Check creation of CSR info to send to device driver."""
router_info = {
'hosting_device': {
'management_ip_address': '1.1.1.1',
'credentials': {'username': 'me', 'password': 'secret'}
},
'gw_port':
{'hosting_info': {'segmentation_id': 101,
'hosting_port_name': 't2_p:1'}},
'id': u'c607b58e-f150-4289-b83f-45623578d122',
'_interfaces': [
{'hosting_info': {'segmentation_id': 100,
'hosting_port_name': 't1_p:1'}}
]
}
expected = {'rest_mgmt_ip': '1.1.1.1',
'username': 'me',
'password': 'secret',
'inner_if_name': 'GigabitEthernet2.100',
'outer_if_name': 'GigabitEthernet3.101',
'vrf': 'nrouter-c607b5',
'timeout': 30}
self.assertEqual(expected, self.driver._get_router_info(router_info))