Rename Security Groups related methods for VMware NSX plugin

Second to last step for the renaming/refactoring of nvplib
and related modules. This is about security groups.

Partial-implements blueprint nicira-plugin-renaming

Change-Id: I0099bf25be77fdec90d515c890b728a477409ef9
This commit is contained in:
armando-migliaccio 2014-01-16 23:08:20 -08:00
parent 770e672455
commit a13cbe1410
18 changed files with 500 additions and 520 deletions

View File

@ -79,10 +79,9 @@ from neutron.plugins.nicira.extensions import nvp_qos as ext_qos
from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import queue as queuelib from neutron.plugins.nicira.nsxlib import queue as queuelib
from neutron.plugins.nicira.nsxlib import router as routerlib from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
from neutron.plugins.nicira.nsxlib import switch as switchlib from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
LOG = logging.getLogger("NeutronPlugin") LOG = logging.getLogger("NeutronPlugin")
@ -467,7 +466,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context.session, port_data['id'], context.session, port_data['id'],
selected_lswitch['uuid'], lport['uuid']) selected_lswitch['uuid'], lport['uuid'])
if port_data['device_owner'] not in self.port_special_owners: if port_data['device_owner'] not in self.port_special_owners:
switchlib.plug_interface( switchlib.plug_vif_interface(
self.cluster, selected_lswitch['uuid'], self.cluster, selected_lswitch['uuid'],
lport['uuid'], "VifAttachment", port_data['id']) lport['uuid'], "VifAttachment", port_data['id'])
LOG.debug(_("_nvp_create_port completed for port %(name)s " LOG.debug(_("_nvp_create_port completed for port %(name)s "
@ -1692,7 +1691,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id( nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id(
context.session, self.cluster, port_id) context.session, self.cluster, port_id)
# Unplug current attachment from lswitch port # Unplug current attachment from lswitch port
switchlib.plug_interface(self.cluster, nsx_switch_id, switchlib.plug_vif_interface(self.cluster, nsx_switch_id,
nsx_port_id, "NoAttachment") nsx_port_id, "NoAttachment")
# Create logical router port and plug patch attachment # Create logical router port and plug patch attachment
self._create_and_attach_router_port( self._create_and_attach_router_port(
@ -2107,9 +2106,9 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
if not default_sg: if not default_sg:
self._ensure_default_security_group(context, tenant_id) self._ensure_default_security_group(context, tenant_id)
nvp_secgroup = nvplib.create_security_profile(self.cluster, nsx_secgroup = secgrouplib.create_security_profile(self.cluster,
tenant_id, s) tenant_id, s)
security_group['security_group']['id'] = nvp_secgroup['uuid'] security_group['security_group']['id'] = nsx_secgroup['uuid']
return super(NvpPluginV2, self).create_security_group( return super(NvpPluginV2, self).create_security_group(
context, security_group, default_sg) context, security_group, default_sg)
@ -2132,7 +2131,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, filters): context, filters):
raise ext_sg.SecurityGroupInUse(id=security_group['id']) raise ext_sg.SecurityGroupInUse(id=security_group['id'])
try: try:
nvplib.delete_security_profile( secgrouplib.delete_security_profile(
self.cluster, security_group['id']) self.cluster, security_group['id'])
except q_exc.NotFound: except q_exc.NotFound:
LOG.info(_("Security group: %s was already deleted " LOG.info(_("Security group: %s was already deleted "
@ -2187,7 +2186,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# of them to PUT to NVP. # of them to PUT to NVP.
combined_rules = self._merge_security_group_rules_with_current( combined_rules = self._merge_security_group_rules_with_current(
context, s, security_group['id']) context, s, security_group['id'])
nvplib.update_security_group_rules(self.cluster, secgrouplib.update_security_group_rules(self.cluster,
security_group['id'], security_group['id'],
combined_rules) combined_rules)
return super( return super(
@ -2212,7 +2211,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._remove_security_group_with_id_and_id_field( self._remove_security_group_with_id_and_id_field(
current_rules, sgrid) current_rules, sgrid)
nvplib.update_security_group_rules( secgrouplib.update_security_group_rules(
self.cluster, sgid, current_rules) self.cluster, sgid, current_rules)
return super(NvpPluginV2, self).delete_security_group_rule(context, return super(NvpPluginV2, self).delete_security_group_rule(context,
sgrid) sgrid)

View File

@ -31,6 +31,7 @@ from neutron.openstack.common import log as logging
from neutron.plugins.common import constants as service_constants from neutron.plugins.common import constants as service_constants
from neutron.plugins.nicira.common import config # noqa from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.common import exceptions as nvp_exc from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.dbexts import servicerouter as sr_db from neutron.plugins.nicira.dbexts import servicerouter as sr_db
from neutron.plugins.nicira.dbexts import vcns_db from neutron.plugins.nicira.dbexts import vcns_db
from neutron.plugins.nicira.dbexts import vcns_models from neutron.plugins.nicira.dbexts import vcns_models
@ -39,7 +40,6 @@ from neutron.plugins.nicira import NeutronPlugin
from neutron.plugins.nicira.nsxlib import router as routerlib from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira.vshield.common import ( from neutron.plugins.nicira.vshield.common import (
constants as vcns_const) constants as vcns_const)
from neutron.plugins.nicira.vshield.common.constants import RouterStatus from neutron.plugins.nicira.vshield.common.constants import RouterStatus
@ -1612,7 +1612,7 @@ class VcnsCallbacks(object):
def _process_base_create_lswitch_args(*args, **kwargs): def _process_base_create_lswitch_args(*args, **kwargs):
tags = [{"tag": nvplib.NEUTRON_VERSION, "scope": "quantum"}] tags = utils.get_tags()
tags.append({"tag": args[1], tags.append({"tag": args[1],
"scope": "quantum_net_id"}) "scope": "quantum_net_id"})
if args[2]: if args[2]:

View File

@ -32,6 +32,11 @@ from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib from neutron.plugins.nicira import nvplib
# Maximum page size for a single request
# NOTE(salv-orlando): This might become a version-dependent map should the
# limit be raised in future versions
MAX_PAGE_SIZE = 5000
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -466,7 +471,7 @@ class NvpSynchronizer():
# API. In this case the request should be split in multiple # API. In this case the request should be split in multiple
# requests. This is not ideal, and therefore a log warning will # requests. This is not ideal, and therefore a log warning will
# be emitted. # be emitted.
num_requests = page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1 num_requests = page_size / (MAX_PAGE_SIZE + 1) + 1
if num_requests > 1: if num_requests > 1:
LOG.warn(_("Requested page size is %(cur_chunk_size)d." LOG.warn(_("Requested page size is %(cur_chunk_size)d."
"It might be necessary to do %(num_requests)d " "It might be necessary to do %(num_requests)d "
@ -475,12 +480,12 @@ class NvpSynchronizer():
"is less than %(max_page_size)d"), "is less than %(max_page_size)d"),
{'cur_chunk_size': page_size, {'cur_chunk_size': page_size,
'num_requests': num_requests, 'num_requests': num_requests,
'max_page_size': nvplib.MAX_PAGE_SIZE}) 'max_page_size': MAX_PAGE_SIZE})
# Only the first request might return the total size, # Only the first request might return the total size,
# subsequent requests will definetely not # subsequent requests will definetely not
results, cursor, total_size = nvplib.get_single_query_page( results, cursor, total_size = nvplib.get_single_query_page(
uri, self._cluster, cursor, uri, self._cluster, cursor,
min(page_size, nvplib.MAX_PAGE_SIZE)) min(page_size, MAX_PAGE_SIZE))
for _req in range(num_requests - 1): for _req in range(num_requests - 1):
# If no cursor is returned break the cycle as there is no # If no cursor is returned break the cycle as there is no
# actual need to perform multiple requests (all fetched) # actual need to perform multiple requests (all fetched)
@ -491,7 +496,7 @@ class NvpSynchronizer():
break break
req_results, cursor = nvplib.get_single_query_page( req_results, cursor = nvplib.get_single_query_page(
uri, self._cluster, cursor, uri, self._cluster, cursor,
min(page_size, nvplib.MAX_PAGE_SIZE))[:2] min(page_size, MAX_PAGE_SIZE))[:2]
results.extend(req_results) results.extend(req_results)
# reset cursor before returning if we queried just to # reset cursor before returning if we queried just to
# know the number of entities # know the number of entities

View File

@ -18,8 +18,8 @@ import json
from neutron.openstack.common import log from neutron.openstack.common import log
from neutron.plugins.nicira.common import utils from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import switch
from neutron.plugins.nicira.nvplib import _build_uri_path from neutron.plugins.nicira.nvplib import _build_uri_path
from neutron.plugins.nicira.nvplib import _plug_interface
from neutron.plugins.nicira.nvplib import do_request from neutron.plugins.nicira.nvplib import do_request
from neutron.plugins.nicira.nvplib import get_all_query_pages from neutron.plugins.nicira.nvplib import get_all_query_pages
@ -69,7 +69,7 @@ def plug_l2_gw_service(cluster, lswitch_id, lport_id,
'l2_gateway_service_uuid': gateway_id} 'l2_gateway_service_uuid': gateway_id}
if vlan_id: if vlan_id:
att_obj['vlan_id'] = vlan_id att_obj['vlan_id'] = vlan_id
return _plug_interface(cluster, lswitch_id, lport_id, att_obj) return switch.plug_interface(cluster, lswitch_id, lport_id, att_obj)
def get_l2_gw_service(cluster, gateway_id): def get_l2_gw_service(cluster, gateway_id):

View File

@ -0,0 +1,113 @@
# Copyright 2014 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from neutron.common import constants
from neutron.common import exceptions
from neutron.openstack.common import log
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nvplib import do_request
from neutron.plugins.nicira.nvplib import format_exception
HTTP_GET = "GET"
HTTP_POST = "POST"
HTTP_DELETE = "DELETE"
HTTP_PUT = "PUT"
LOG = log.getLogger(__name__)
def mk_body(**kwargs):
"""Convenience function creates and dumps dictionary to string.
:param kwargs: the key/value pirs to be dumped into a json string.
:returns: a json string.
"""
return json.dumps(kwargs, ensure_ascii=False)
def create_security_profile(cluster, tenant_id, security_profile):
path = "/ws.v1/security-profile"
# Allow all dhcp responses and all ingress traffic
hidden_rules = {'logical_port_egress_rules':
[{'ethertype': 'IPv4',
'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'}],
'logical_port_ingress_rules':
[{'ethertype': 'IPv4'},
{'ethertype': 'IPv6'}]}
display_name = utils.check_and_truncate(security_profile.get('name'))
body = mk_body(
tags=utils.get_tags(os_tid=tenant_id), display_name=display_name,
logical_port_ingress_rules=(
hidden_rules['logical_port_ingress_rules']),
logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
)
rsp = do_request(HTTP_POST, path, body, cluster=cluster)
if security_profile.get('name') == 'default':
# If security group is default allow ip traffic between
# members of the same security profile is allowed and ingress traffic
# from the switch
rules = {'logical_port_egress_rules': [{'ethertype': 'IPv4',
'profile_uuid': rsp['uuid']},
{'ethertype': 'IPv6',
'profile_uuid': rsp['uuid']}],
'logical_port_ingress_rules': [{'ethertype': 'IPv4'},
{'ethertype': 'IPv6'}]}
update_security_group_rules(cluster, rsp['uuid'], rules)
LOG.debug(_("Created Security Profile: %s"), rsp)
return rsp
def update_security_group_rules(cluster, spid, rules):
path = "/ws.v1/security-profile/%s" % spid
# Allow all dhcp responses in
rules['logical_port_egress_rules'].append(
{'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'})
# If there are no ingress rules add bunk rule to drop all ingress traffic
if not rules['logical_port_ingress_rules']:
rules['logical_port_ingress_rules'].append(
{'ethertype': 'IPv4', 'ip_prefix': '127.0.0.1/32'})
try:
body = mk_body(
logical_port_ingress_rules=rules['logical_port_ingress_rules'],
logical_port_egress_rules=rules['logical_port_egress_rules'])
rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
except exceptions.NotFound as e:
LOG.error(format_exception("Unknown", e, locals()))
#FIXME(salvatore-orlando): This should not raise NeutronException
raise exceptions.NeutronException()
LOG.debug(_("Updated Security Profile: %s"), rsp)
return rsp
def delete_security_profile(cluster, spid):
path = "/ws.v1/security-profile/%s" % spid
try:
do_request(HTTP_DELETE, path, cluster=cluster)
except exceptions.NotFound:
# This is not necessarily an error condition
LOG.warn(_("Unable to find security profile %s on NSX backend"),
spid)
raise

View File

@ -23,7 +23,6 @@ from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import utils from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.nvplib import _build_uri_path from neutron.plugins.nicira.nvplib import _build_uri_path
from neutron.plugins.nicira.nvplib import _plug_interface
from neutron.plugins.nicira.nvplib import do_request from neutron.plugins.nicira.nvplib import do_request
from neutron.plugins.nicira.nvplib import get_all_query_pages from neutron.plugins.nicira.nvplib import get_all_query_pages
@ -369,11 +368,21 @@ def get_port_status(cluster, lswitch_id, port_id):
return constants.PORT_STATUS_DOWN return constants.PORT_STATUS_DOWN
def plug_interface(cluster, lswitch_id, port, port_type, attachment=None): def plug_interface(cluster, lswitch_id, lport_id, att_obj):
return do_request(HTTP_PUT,
_build_uri_path(LSWITCHPORT_RESOURCE,
lport_id, lswitch_id,
is_attachment=True),
json.dumps(att_obj),
cluster=cluster)
def plug_vif_interface(
cluster, lswitch_id, port_id, port_type, attachment=None):
"""Plug a VIF Attachment object in a logical port.""" """Plug a VIF Attachment object in a logical port."""
lport_obj = {} lport_obj = {}
if attachment: if attachment:
lport_obj["vif_uuid"] = attachment lport_obj["vif_uuid"] = attachment
lport_obj["type"] = port_type lport_obj["type"] = port_type
return _plug_interface(cluster, lswitch_id, port, lport_obj) return plug_interface(cluster, lswitch_id, port_id, lport_obj)

View File

@ -22,15 +22,11 @@
import json import json
#FIXME(danwent): I'd like this file to get to the point where it has
# no neutron-specific logic in it
from neutron.common import constants
from neutron.common import exceptions as exception from neutron.common import exceptions as exception
from neutron.openstack.common import log from neutron.openstack.common import log
from neutron.plugins.nicira.common import exceptions as nvp_exc from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.version import version_info
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -45,9 +41,6 @@ URI_PREFIX = "/ws.v1"
LSWITCH_RESOURCE = "lswitch" LSWITCH_RESOURCE = "lswitch"
LSWITCHPORT_RESOURCE = "lport/%s" % LSWITCH_RESOURCE LSWITCHPORT_RESOURCE = "lport/%s" % LSWITCH_RESOURCE
# Current neutron version
NEUTRON_VERSION = version_info.release_string()
# Maximum page size for a single request # Maximum page size for a single request
# NOTE(salv-orlando): This might become a version-dependent map should the # NOTE(salv-orlando): This might become a version-dependent map should the
# limit be raised in future versions # limit be raised in future versions
@ -89,24 +82,6 @@ def _build_uri_path(resource,
return uri_path return uri_path
def get_cluster_version(cluster):
"""Return major/minor version #."""
# Get control-cluster nodes
uri = "/ws.v1/control-cluster/node?_page_length=1&fields=uuid"
res = do_request(HTTP_GET, uri, cluster=cluster)
if res["result_count"] == 0:
return None
node_uuid = res["results"][0]["uuid"]
# Get control-cluster node status. It's unsupported to have controllers
# running different version so we just need the first node version.
uri = "/ws.v1/control-cluster/node/%s/status" % node_uuid
res = do_request(HTTP_GET, uri, cluster=cluster)
version_parts = res["version"].split(".")
version = "%s.%s" % tuple(version_parts[:2])
LOG.info(_("NVP controller cluster version: %s"), version)
return version
def get_single_query_page(path, cluster, page_cursor=None, def get_single_query_page(path, cluster, page_cursor=None,
page_length=1000, neutron_only=True): page_length=1000, neutron_only=True):
params = [] params = []
@ -138,20 +113,6 @@ def get_all_query_pages(path, c):
return result_list return result_list
def _plug_interface(cluster, lswitch_id, lport_id, att_obj):
uri = _build_uri_path(LSWITCHPORT_RESOURCE, lport_id, lswitch_id,
is_attachment=True)
return do_request(HTTP_PUT, uri, json.dumps(att_obj),
cluster=cluster)
#------------------------------------------------------------------------------
# Security Profile convenience functions.
#------------------------------------------------------------------------------
EXT_SECURITY_PROFILE_ID_SCOPE = 'nova_spid'
TENANT_ID_SCOPE = 'os_tid'
def format_exception(etype, e, exception_locals): def format_exception(etype, e, exception_locals):
"""Consistent formatting for exceptions. """Consistent formatting for exceptions.
@ -185,90 +146,3 @@ def do_request(*args, **kwargs):
raise exception.NotFound() raise exception.NotFound()
except NvpApiClient.ReadOnlyMode: except NvpApiClient.ReadOnlyMode:
raise nvp_exc.MaintenanceInProgress() raise nvp_exc.MaintenanceInProgress()
def mk_body(**kwargs):
"""Convenience function creates and dumps dictionary to string.
:param kwargs: the key/value pirs to be dumped into a json string.
:returns: a json string.
"""
return json.dumps(kwargs, ensure_ascii=False)
# -----------------------------------------------------------------------------
# Security Group API Calls
# -----------------------------------------------------------------------------
def create_security_profile(cluster, tenant_id, security_profile):
path = "/ws.v1/security-profile"
# Allow all dhcp responses and all ingress traffic
hidden_rules = {'logical_port_egress_rules':
[{'ethertype': 'IPv4',
'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'}],
'logical_port_ingress_rules':
[{'ethertype': 'IPv4'},
{'ethertype': 'IPv6'}]}
tags = [dict(scope='os_tid', tag=tenant_id),
dict(scope='quantum', tag=NEUTRON_VERSION)]
display_name = utils.check_and_truncate(security_profile.get('name'))
body = mk_body(
tags=tags, display_name=display_name,
logical_port_ingress_rules=(
hidden_rules['logical_port_ingress_rules']),
logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
)
rsp = do_request(HTTP_POST, path, body, cluster=cluster)
if security_profile.get('name') == 'default':
# If security group is default allow ip traffic between
# members of the same security profile is allowed and ingress traffic
# from the switch
rules = {'logical_port_egress_rules': [{'ethertype': 'IPv4',
'profile_uuid': rsp['uuid']},
{'ethertype': 'IPv6',
'profile_uuid': rsp['uuid']}],
'logical_port_ingress_rules': [{'ethertype': 'IPv4'},
{'ethertype': 'IPv6'}]}
update_security_group_rules(cluster, rsp['uuid'], rules)
LOG.debug(_("Created Security Profile: %s"), rsp)
return rsp
def update_security_group_rules(cluster, spid, rules):
path = "/ws.v1/security-profile/%s" % spid
# Allow all dhcp responses in
rules['logical_port_egress_rules'].append(
{'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'})
# If there are no ingress rules add bunk rule to drop all ingress traffic
if not rules['logical_port_ingress_rules']:
rules['logical_port_ingress_rules'].append(
{'ethertype': 'IPv4', 'ip_prefix': '127.0.0.1/32'})
try:
body = mk_body(
logical_port_ingress_rules=rules['logical_port_ingress_rules'],
logical_port_egress_rules=rules['logical_port_egress_rules'])
rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
except exception.NotFound as e:
LOG.error(format_exception("Unknown", e, locals()))
#FIXME(salvatore-orlando): This should not raise NeutronException
raise exception.NeutronException()
LOG.debug(_("Updated Security Profile: %s"), rsp)
return rsp
def delete_security_profile(cluster, spid):
path = "/ws.v1/security-profile/%s" % spid
try:
do_request(HTTP_DELETE, path, cluster=cluster)
except exception.NotFound:
# This is not necessarily an error condition
LOG.warn(_("Unable to find security profile %s on NSX backend"),
spid)
raise

View File

@ -0,0 +1,97 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.tests import base
from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import NVPAPI_NAME
from neutron.tests.unit.nicira import STUBS_PATH
from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
class NsxlibTestCase(base.BaseTestCase):
def setUp(self):
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
instance.return_value.request.side_effect = _fake_request
self.fake_cluster = nsx_cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NsxlibTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nsxapi.stop)
def _build_tag_dict(self, tags):
# This syntax is needed for python 2.6 compatibility
return dict((t['scope'], t['tag']) for t in tags)
class NsxlibNegativeBaseTestCase(base.BaseTestCase):
def setUp(self):
# mock nsx api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
# Choose 3.0, but the version is irrelevant for the aim of
# these tests as calls are throwing up errors anyway
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
def _faulty_request(*args, **kwargs):
raise nvplib.NvpApiClient.NvpApiException
instance.return_value.request.side_effect = _faulty_request
self.fake_cluster = nsx_cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NsxlibNegativeBaseTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nsxapi.stop)

View File

@ -17,14 +17,13 @@
from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import switch as switchlib from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import nvplib from neutron.plugins.nicira import nvplib
from neutron.tests.unit.nicira.test_nvplib import NsxlibNegativeBaseTestCase from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid _uuid = test_api_v2._uuid
class L2GatewayNegativeTestCase(NsxlibNegativeBaseTestCase): class L2GatewayNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_l2_gw_service_on_failure(self): def test_create_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException, self.assertRaises(nvplib.NvpApiClient.NvpApiException,
@ -55,7 +54,7 @@ class L2GatewayNegativeTestCase(NsxlibNegativeBaseTestCase):
'pluto') 'pluto')
class L2GatewayTestCase(NvplibTestCase): class L2GatewayTestCase(base.NsxlibTestCase):
def _create_gw_service(self, node_uuid, display_name, def _create_gw_service(self, node_uuid, display_name,
tenant_id='fake_tenant'): tenant_id='fake_tenant'):

View File

@ -19,10 +19,10 @@ import mock
from neutron.common import exceptions from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import queue as queuelib from neutron.plugins.nicira.nsxlib import queue as queuelib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase from neutron.tests.unit.nicira.nsxlib import base
class TestLogicalQueueLib(NvplibTestCase): class TestLogicalQueueLib(base.NsxlibTestCase):
def setUp(self): def setUp(self):
super(TestLogicalQueueLib, self).setUp() super(TestLogicalQueueLib, self).setUp()

View File

@ -19,18 +19,18 @@ import mock
from neutron.common import exceptions from neutron.common import exceptions
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import router as routerlib from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib from neutron.plugins.nicira import nvplib
from neutron.tests.unit.nicira.test_nvplib import NsxlibNegativeBaseTestCase from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid _uuid = test_api_v2._uuid
class TestNatRules(NvplibTestCase): class TestNatRules(base.NsxlibTestCase):
def _test_create_lrouter_dnat_rule(self, version): def _test_create_lrouter_dnat_rule(self, version):
with mock.patch.object(self.fake_cluster.api_client, with mock.patch.object(self.fake_cluster.api_client,
@ -61,7 +61,7 @@ class TestNatRules(NvplibTestCase):
self._test_create_lrouter_dnat_rule('3.1') self._test_create_lrouter_dnat_rule('3.1')
class TestExplicitLRouters(NvplibTestCase): class TestExplicitLRouters(base.NsxlibTestCase):
def setUp(self): def setUp(self):
self.fake_version = '3.2' self.fake_version = '3.2'
@ -72,8 +72,7 @@ class TestExplicitLRouters(NvplibTestCase):
router = {'display_name': router_name, router = {'display_name': router_name,
'uuid': router_id, 'uuid': router_id,
'tags': [{'scope': 'quantum', 'tag': nvplib.NEUTRON_VERSION}, 'tags': utils.get_tags(os_tid=tenant_id),
{'scope': 'os_tid', 'tag': '%s' % tenant_id}],
'distributed': False, 'distributed': False,
'routing_config': {'type': 'RoutingTableRoutingConfig', 'routing_config': {'type': 'RoutingTableRoutingConfig',
'_schema': schema}, '_schema': schema},
@ -113,10 +112,8 @@ class TestExplicitLRouters(NvplibTestCase):
{'gateway_ip_address': 'fake_address', {'gateway_ip_address': 'fake_address',
'type': 'RouterNextHop'}, 'type': 'RouterNextHop'},
'type': 'SingleDefaultRouteImplicitRoutingConfig'}, 'type': 'SingleDefaultRouteImplicitRoutingConfig'},
'tags': [{'scope': 'os_tid', 'tag': 'fake_tenant_id'}, 'tags': utils.get_tags(os_tid='fake_tenant_id',
{'scope': 'q_router_id', 'tag': 'pipita_higuain'}, q_router_id='pipita_higuain'),
{'scope': 'quantum',
'tag': nvplib.NEUTRON_VERSION}],
'type': 'LogicalRouterConfig'} 'type': 'LogicalRouterConfig'}
self.assertEqual(expected, body) self.assertEqual(expected, body)
@ -129,11 +126,8 @@ class TestExplicitLRouters(NvplibTestCase):
tenant_id, router_type) tenant_id, router_type)
expected = {'display_name': 'fake_router_name', expected = {'display_name': 'fake_router_name',
'routing_config': {'type': 'RoutingTableRoutingConfig'}, 'routing_config': {'type': 'RoutingTableRoutingConfig'},
'tags': [{'scope': 'os_tid', 'tag': 'fake_tenant_id'}, 'tags': utils.get_tags(os_tid='fake_tenant_id',
{'scope': 'q_router_id', q_router_id='marekiaro_hamsik'),
'tag': 'marekiaro_hamsik'},
{'scope': 'quantum',
'tag': nvplib.NEUTRON_VERSION}],
'type': 'LogicalRouterConfig'} 'type': 'LogicalRouterConfig'}
self.assertEqual(expected, body) self.assertEqual(expected, body)
@ -253,7 +247,7 @@ class TestExplicitLRouters(NvplibTestCase):
self.fake_cluster, router_id, new_routes) self.fake_cluster, router_id, new_routes)
class RouterNegativeTestCase(NsxlibNegativeBaseTestCase): class RouterNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_lrouter_on_failure(self): def test_create_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException, self.assertRaises(nvplib.NvpApiClient.NvpApiException,
@ -285,7 +279,7 @@ class RouterNegativeTestCase(NsxlibNegativeBaseTestCase):
'new_hop') 'new_hop')
class TestLogicalRouters(NvplibTestCase): class TestLogicalRouters(base.NsxlibTestCase):
def _verify_lrouter(self, res_lrouter, def _verify_lrouter(self, res_lrouter,
expected_uuid, expected_uuid,

View File

@ -0,0 +1,121 @@
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
from neutron.plugins.nicira import nvplib
from neutron.tests.unit.nicira.nsxlib import base
class SecurityProfileTestCase(base.NsxlibTestCase):
def test_create_and_get_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
sec_prof_res = nvplib.do_request(
secgrouplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1)
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
def test_create_and_get_default_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'default'})
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3)
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
def test_update_security_profile_rules(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
ingress_rule = {'ethertype': 'IPv4'}
egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
new_rules = {'logical_port_egress_rules': [egress_rule],
'logical_port_ingress_rules': [ingress_rule]}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
self.assertIn(egress_rule,
sec_prof_res['logical_port_egress_rules'])
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
self.assertIn(ingress_rule,
sec_prof_res['logical_port_ingress_rules'])
def test_update_security_profile_rules_noingress(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
hidden_ingress_rule = {'ethertype': 'IPv4',
'ip_prefix': '127.0.0.1/32'}
egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
new_rules = {'logical_port_egress_rules': [egress_rule],
'logical_port_ingress_rules': []}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
self.assertIn(egress_rule,
sec_prof_res['logical_port_egress_rules'])
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
self.assertIn(hidden_ingress_rule,
sec_prof_res['logical_port_ingress_rules'])
def test_update_non_existing_securityprofile_raises(self):
self.assertRaises(exceptions.NeutronException,
secgrouplib.update_security_group_rules,
self.fake_cluster, 'whatever',
{'logical_port_egress_rules': [],
'logical_port_ingress_rules': []})
def test_delete_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
secgrouplib.delete_security_profile(
self.fake_cluster, sec_prof['uuid'])
self.assertRaises(exceptions.NotFound,
nvplib.do_request,
nvplib.HTTP_GET,
nvplib._build_uri_path(
'security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
def test_delete_non_existing_securityprofile_raises(self):
self.assertRaises(exceptions.NeutronException,
secgrouplib.delete_security_profile,
self.fake_cluster, 'whatever')

View File

@ -21,13 +21,13 @@ from neutron.common import constants
from neutron.common import exceptions from neutron.common import exceptions
from neutron.plugins.nicira.common import utils from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import switch as switchlib from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid _uuid = test_api_v2._uuid
class LogicalSwitchesTestCase(NvplibTestCase): class LogicalSwitchesTestCase(base.NsxlibTestCase):
def test_create_and_get_lswitches_single(self): def test_create_and_get_lswitches_single(self):
tenant_id = 'pippo' tenant_id = 'pippo'
@ -141,7 +141,7 @@ class LogicalSwitchesTestCase(NvplibTestCase):
self.fake_cluster, 'whatever', ['whatever']) self.fake_cluster, 'whatever', ['whatever'])
class LogicalPortsTestCase(NvplibTestCase): class LogicalPortsTestCase(base.NsxlibTestCase):
def _create_switch_and_port(self, tenant_id='pippo', def _create_switch_and_port(self, tenant_id='pippo',
neutron_port_id='whatever', neutron_port_id='whatever',
@ -169,7 +169,7 @@ class LogicalPortsTestCase(NvplibTestCase):
def test_plug_interface(self): def test_plug_interface(self):
lswitch, lport = self._create_switch_and_port() lswitch, lport = self._create_switch_and_port()
switchlib.plug_interface(self.fake_cluster, lswitch['uuid'], switchlib.plug_vif_interface(self.fake_cluster, lswitch['uuid'],
lport['uuid'], 'VifAttachment', 'fake') lport['uuid'], 'VifAttachment', 'fake')
lport_res = switchlib.get_port(self.fake_cluster, lport_res = switchlib.get_port(self.fake_cluster,
lswitch['uuid'], lport['uuid']) lswitch['uuid'], lport['uuid'])

View File

@ -26,8 +26,8 @@ from neutron import context
from neutron.extensions import l3 from neutron.extensions import l3
from neutron.manager import NeutronManager from neutron.manager import NeutronManager
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NeutronServicePlugin as nsp from neutron.plugins.nicira import NeutronServicePlugin as nsp
from neutron.plugins.nicira import nvplib
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.nicira import NVPEXT_PATH from neutron.tests.unit.nicira import NVPEXT_PATH
from neutron.tests.unit.nicira import SERVICE_PLUGIN_NAME from neutron.tests.unit.nicira import SERVICE_PLUGIN_NAME
@ -246,11 +246,8 @@ class TestProxyCreateLswitch(base.BaseTestCase):
{'zone_uuid': 'foo_zone', {'zone_uuid': 'foo_zone',
'transport_type': 'stt'} 'transport_type': 'stt'}
] ]
self.tags = [ self.tags = utils.get_tags(quantum_net_id='foo_id',
{'scope': 'quantum', 'tag': nvplib.NEUTRON_VERSION}, os_tid=self.tenant_id)
{'scope': 'quantum_net_id', 'tag': 'foo_id'},
{'scope': 'os_tid', 'tag': self.tenant_id}
]
self.cluster = None self.cluster = None
def test_create_lswitch_with_basic_args(self): def test_create_lswitch_with_basic_args(self):
@ -261,7 +258,7 @@ class TestProxyCreateLswitch(base.BaseTestCase):
self.tz_config) self.tz_config)
self.assertEqual(self.display_name, result[0]) self.assertEqual(self.display_name, result[0])
self.assertEqual(self.tz_config, result[1]) self.assertEqual(self.tz_config, result[1])
self.assertEqual(self.tags, result[2]) self.assertEqual(sorted(self.tags), sorted(result[2]))
def test_create_lswitch_with_shared_as_kwarg(self): def test_create_lswitch_with_shared_as_kwarg(self):
result = nsp._process_base_create_lswitch_args(self.cluster, result = nsp._process_base_create_lswitch_args(self.cluster,
@ -271,7 +268,7 @@ class TestProxyCreateLswitch(base.BaseTestCase):
self.tz_config, self.tz_config,
shared=True) shared=True)
expected = self.tags + [{'scope': 'shared', 'tag': 'true'}] expected = self.tags + [{'scope': 'shared', 'tag': 'true'}]
self.assertEqual(expected, result[2]) self.assertEqual(sorted(expected), sorted(result[2]))
def test_create_lswitch_with_shared_as_arg(self): def test_create_lswitch_with_shared_as_arg(self):
result = nsp._process_base_create_lswitch_args(self.cluster, result = nsp._process_base_create_lswitch_args(self.cluster,
@ -282,7 +279,7 @@ class TestProxyCreateLswitch(base.BaseTestCase):
True) True)
additional_tags = [{'scope': 'shared', 'tag': 'true'}] additional_tags = [{'scope': 'shared', 'tag': 'true'}]
expected = self.tags + additional_tags expected = self.tags + additional_tags
self.assertEqual(expected, result[2]) self.assertEqual(sorted(expected), sorted(result[2]))
def test_create_lswitch_with_additional_tags(self): def test_create_lswitch_with_additional_tags(self):
more_tags = [{'scope': 'foo_scope', 'tag': 'foo_tag'}] more_tags = [{'scope': 'foo_scope', 'tag': 'foo_tag'}]
@ -293,4 +290,4 @@ class TestProxyCreateLswitch(base.BaseTestCase):
self.tz_config, self.tz_config,
tags=more_tags) tags=more_tags)
expected = self.tags + more_tags expected = self.tags + more_tags
self.assertEqual(expected, result[2]) self.assertEqual(sorted(expected), sorted(result[2]))

View File

@ -50,7 +50,6 @@ from neutron.plugins.nicira import NeutronPlugin
from neutron.plugins.nicira import nsxlib from neutron.plugins.nicira import nsxlib
from neutron.plugins.nicira import NvpApiClient from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.NvpApiClient import NVPVersion from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.plugins.nicira import nvplib
from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nicira import fake_nvpapiclient from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import get_fake_conf from neutron.tests.unit.nicira import get_fake_conf
@ -266,7 +265,7 @@ class TestNiciraPortsV2(NiciraPluginV2TestCase,
def test_create_port_maintenance_returns_503(self): def test_create_port_maintenance_returns_503(self):
with self.network() as net: with self.network() as net:
with mock.patch.object(nvplib, 'do_request', with mock.patch.object(nsxlib.switch, 'do_request',
side_effect=nvp_exc.MaintenanceInProgress): side_effect=nvp_exc.MaintenanceInProgress):
data = {'port': {'network_id': net['network']['id'], data = {'port': {'network_id': net['network']['id'],
'admin_state_up': False, 'admin_state_up': False,
@ -368,7 +367,7 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
data = {'network': {'name': 'foo', data = {'network': {'name': 'foo',
'admin_state_up': True, 'admin_state_up': True,
'tenant_id': self._tenant_id}} 'tenant_id': self._tenant_id}}
with mock.patch.object(nvplib, 'do_request', with mock.patch.object(nsxlib.switch, 'do_request',
side_effect=nvp_exc.MaintenanceInProgress): side_effect=nvp_exc.MaintenanceInProgress):
net_req = self.new_create_request('networks', data, self.fmt) net_req = self.new_create_request('networks', data, self.fmt)
res = net_req.get_response(self.api) res = net_req.get_response(self.api)

View File

@ -19,9 +19,14 @@ import mock
from neutron.db import api as db_api from neutron.db import api as db_api
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import nsx_utils from neutron.plugins.nicira.common import nsx_utils
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.nicira import nicira_method from neutron.tests.unit.nicira import nicira_method
from neutron.tests.unit.nicira.nsxlib import base as nsx_base
class NsxUtilsTestCase(base.BaseTestCase): class NsxUtilsTestCase(base.BaseTestCase):
@ -189,3 +194,98 @@ class NsxUtilsTestCase(base.BaseTestCase):
module_name='nsxlib.router'), module_name='nsxlib.router'),
return_value=[]): return_value=[]):
self._verify_get_nsx_router_id(None) self._verify_get_nsx_router_id(None)
def test_check_and_truncate_name_with_none(self):
name = None
result = utils.check_and_truncate(name)
self.assertEqual('', result)
def test_check_and_truncate_name_with_short_name(self):
name = 'foo_port_name'
result = utils.check_and_truncate(name)
self.assertEqual(name, result)
def test_check_and_truncate_name_long_name(self):
name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
result = utils.check_and_truncate(name)
self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
def test_build_uri_path_plain(self):
result = nvplib._build_uri_path('RESOURCE')
self.assertEqual("%s/%s" % (nvplib.URI_PREFIX, 'RESOURCE'), result)
def test_build_uri_path_with_field(self):
result = nvplib._build_uri_path('RESOURCE', fields='uuid')
expected = "%s/%s?fields=uuid" % (nvplib.URI_PREFIX, 'RESOURCE')
self.assertEqual(expected, result)
def test_build_uri_path_with_filters(self):
filters = {"tag": 'foo', "tag_scope": "scope_foo"}
result = nvplib._build_uri_path('RESOURCE', filters=filters)
expected = (
"%s/%s?tag_scope=scope_foo&tag=foo" %
(nvplib.URI_PREFIX, 'RESOURCE'))
self.assertEqual(expected, result)
def test_build_uri_path_with_resource_id(self):
res = 'RESOURCE'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, resource_id=res_id)
expected = "%s/%s/%s" % (nvplib.URI_PREFIX, res, res_id)
self.assertEqual(expected, result)
def test_build_uri_path_with_parent_and_resource_id(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(
res, parent_resource_id=par_id, resource_id=res_id)
expected = ("%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res, par_id, child_res, res_id))
self.assertEqual(expected, result)
def test_build_uri_path_with_attachment(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, is_attachment=True)
expected = ("%s/%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'attachment'))
self.assertEqual(expected, result)
def test_build_uri_path_with_extra_action(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, extra_action='doh')
expected = ("%s/%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'doh'))
self.assertEqual(expected, result)
class ClusterManagementTestCase(nsx_base.NsxlibTestCase):
def test_cluster_in_readonly_mode(self):
with mock.patch.object(self.fake_cluster.api_client,
'request',
side_effect=NvpApiClient.ReadOnlyMode):
self.assertRaises(nvp_exc.MaintenanceInProgress,
nvplib.do_request, cluster=self.fake_cluster)
def test_cluster_method_not_implemented(self):
self.assertRaises(NvpApiClient.NvpApiException,
nvplib.do_request,
nvplib.HTTP_GET,
nvplib._build_uri_path('MY_FAKE_RESOURCE',
resource_id='foo'),
cluster=self.fake_cluster)

View File

@ -462,7 +462,7 @@ class NvpSyncTestCase(base.BaseTestCase):
with self._populate_data(ctx, net_size=net_size, with self._populate_data(ctx, net_size=net_size,
port_size=port_size, port_size=port_size,
router_size=router_size): router_size=router_size):
with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15): with mock.patch.object(sync, 'MAX_PAGE_SIZE', 15):
# The following mock is just for counting calls, # The following mock is just for counting calls,
# but we will still run the actual function # but we will still run the actual function
with mock.patch.object( with mock.patch.object(

View File

@ -1,327 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @author: Salvatore Orlando, VMware
import mock
from neutron.common import exceptions
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.tests import base
from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import NVPAPI_NAME
from neutron.tests.unit.nicira import STUBS_PATH
from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
class NvplibTestCase(base.BaseTestCase):
def setUp(self):
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
instance.return_value.login.return_value = "the_cookie"
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
instance.return_value.request.side_effect = _fake_request
self.fake_cluster = nsx_cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NvplibTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
def _build_tag_dict(self, tags):
# This syntax is needed for python 2.6 compatibility
return dict((t['scope'], t['tag']) for t in tags)
class NsxlibNegativeBaseTestCase(base.BaseTestCase):
def setUp(self):
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
instance.return_value.login.return_value = "the_cookie"
# Choose 3.0, but the version is irrelevant for the aim of
# these tests as calls are throwing up errors anyway
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
def _faulty_request(*args, **kwargs):
raise nvplib.NvpApiClient.NvpApiException
instance.return_value.request.side_effect = _faulty_request
self.fake_cluster = nsx_cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NsxlibNegativeBaseTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
class TestNvplibSecurityProfile(NvplibTestCase):
def test_create_and_get_security_profile(self):
sec_prof = nvplib.create_security_profile(self.fake_cluster,
'pippo', {'name': 'test'})
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1)
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
def test_create_and_get_default_security_profile(self):
sec_prof = nvplib.create_security_profile(self.fake_cluster,
'pippo',
{'name': 'default'})
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3)
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
def test_update_security_profile_rules(self):
sec_prof = nvplib.create_security_profile(self.fake_cluster,
'pippo', {'name': 'test'})
ingress_rule = {'ethertype': 'IPv4'}
egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
new_rules = {'logical_port_egress_rules': [egress_rule],
'logical_port_ingress_rules': [ingress_rule]}
nvplib.update_security_group_rules(self.fake_cluster,
sec_prof['uuid'],
new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
self.assertIn(egress_rule,
sec_prof_res['logical_port_egress_rules'])
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
self.assertIn(ingress_rule,
sec_prof_res['logical_port_ingress_rules'])
def test_update_security_profile_rules_noingress(self):
sec_prof = nvplib.create_security_profile(self.fake_cluster,
'pippo', {'name': 'test'})
hidden_ingress_rule = {'ethertype': 'IPv4',
'ip_prefix': '127.0.0.1/32'}
egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
new_rules = {'logical_port_egress_rules': [egress_rule],
'logical_port_ingress_rules': []}
nvplib.update_security_group_rules(self.fake_cluster,
sec_prof['uuid'],
new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
self.assertIn(egress_rule,
sec_prof_res['logical_port_egress_rules'])
self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
self.assertIn(hidden_ingress_rule,
sec_prof_res['logical_port_ingress_rules'])
def test_update_non_existing_securityprofile_raises(self):
self.assertRaises(exceptions.NeutronException,
nvplib.update_security_group_rules,
self.fake_cluster, 'whatever',
{'logical_port_egress_rules': [],
'logical_port_ingress_rules': []})
def test_delete_security_profile(self):
sec_prof = nvplib.create_security_profile(self.fake_cluster,
'pippo', {'name': 'test'})
nvplib.delete_security_profile(self.fake_cluster, sec_prof['uuid'])
self.assertRaises(exceptions.NotFound,
nvplib.do_request,
nvplib.HTTP_GET,
nvplib._build_uri_path(
'security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
def test_delete_non_existing_securityprofile_raises(self):
self.assertRaises(exceptions.NeutronException,
nvplib.delete_security_profile,
self.fake_cluster, 'whatever')
class TestNvplibClusterManagement(NvplibTestCase):
def test_get_cluster_version(self):
def fakedorequest(*args, **kwargs):
uri = args[1]
if 'node/xyz' in uri:
return {'version': '3.0.9999'}
elif 'node' in uri:
return {'result_count': 1,
'results': [{'uuid': 'xyz'}]}
with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
version = nvplib.get_cluster_version('whatever')
self.assertEqual(version, '3.0')
def test_get_cluster_version_no_nodes(self):
def fakedorequest(*args, **kwargs):
uri = args[1]
if 'node' in uri:
return {'result_count': 0}
with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
version = nvplib.get_cluster_version('whatever')
self.assertIsNone(version)
def test_cluster_in_readonly_mode(self):
with mock.patch.object(self.fake_cluster.api_client,
'request',
side_effect=NvpApiClient.ReadOnlyMode):
self.assertRaises(nvp_exc.MaintenanceInProgress,
nvplib.do_request, cluster=self.fake_cluster)
def test_cluster_method_not_implemetned(self):
self.assertRaises(NvpApiClient.NvpApiException,
nvplib.do_request,
nvplib.HTTP_GET,
nvplib._build_uri_path('MY_FAKE_RESOURCE',
resource_id='foo'),
cluster=self.fake_cluster)
class NvplibMiscTestCase(base.BaseTestCase):
def test_check_and_truncate_name_with_none(self):
name = None
result = utils.check_and_truncate(name)
self.assertEqual('', result)
def test_check_and_truncate_name_with_short_name(self):
name = 'foo_port_name'
result = utils.check_and_truncate(name)
self.assertEqual(name, result)
def test_check_and_truncate_name_long_name(self):
name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
result = utils.check_and_truncate(name)
self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
def test_build_uri_path_plain(self):
result = nvplib._build_uri_path('RESOURCE')
self.assertEqual("%s/%s" % (nvplib.URI_PREFIX, 'RESOURCE'), result)
def test_build_uri_path_with_field(self):
result = nvplib._build_uri_path('RESOURCE', fields='uuid')
expected = "%s/%s?fields=uuid" % (nvplib.URI_PREFIX, 'RESOURCE')
self.assertEqual(expected, result)
def test_build_uri_path_with_filters(self):
filters = {"tag": 'foo', "tag_scope": "scope_foo"}
result = nvplib._build_uri_path('RESOURCE', filters=filters)
expected = (
"%s/%s?tag_scope=scope_foo&tag=foo" %
(nvplib.URI_PREFIX, 'RESOURCE'))
self.assertEqual(expected, result)
def test_build_uri_path_with_resource_id(self):
res = 'RESOURCE'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, resource_id=res_id)
expected = "%s/%s/%s" % (nvplib.URI_PREFIX, res, res_id)
self.assertEqual(expected, result)
def test_build_uri_path_with_parent_and_resource_id(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(
res, parent_resource_id=par_id, resource_id=res_id)
expected = ("%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res, par_id, child_res, res_id))
self.assertEqual(expected, result)
def test_build_uri_path_with_attachment(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, is_attachment=True)
expected = ("%s/%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'attachment'))
self.assertEqual(expected, result)
def test_build_uri_path_with_extra_action(self):
parent_res = 'RESOURCE_PARENT'
child_res = 'RESOURCE_CHILD'
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
result = nvplib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, extra_action='doh')
expected = ("%s/%s/%s/%s/%s/%s" %
(nvplib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'doh'))
self.assertEqual(expected, result)