Merge "Replaces network:* strings by constants"
This commit is contained in:
commit
9b4b174a03
@ -512,7 +512,7 @@ class Dnsmasq(DhcpLocalProcess):
|
||||
# provides all dnsmasq ip as dns-server if there is more than
|
||||
# one dnsmasq for a subnet and there is no dns-server submitted
|
||||
# by the server
|
||||
if port.device_owner == 'network:dhcp':
|
||||
if port.device_owner == constants.DEVICE_OWNER_DHCP:
|
||||
for ip in port.fixed_ips:
|
||||
i = subnet_idx_map.get(ip.subnet_id)
|
||||
if i is None:
|
||||
|
@ -43,8 +43,6 @@ from neutron import wsgi
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
|
||||
|
||||
|
||||
class MetadataProxyHandler(object):
|
||||
OPTS = [
|
||||
@ -133,7 +131,7 @@ class MetadataProxyHandler(object):
|
||||
else:
|
||||
internal_ports = qclient.list_ports(
|
||||
device_id=router_id,
|
||||
device_owner=DEVICE_OWNER_ROUTER_INTF)['ports']
|
||||
device_owner=n_const.DEVICE_OWNER_ROUTER_INTF)['ports']
|
||||
|
||||
networks = [p['network_id'] for p in internal_ports]
|
||||
|
||||
|
@ -36,8 +36,6 @@ from neutron.openstack.common import uuidutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
AGENT_OWNER_PREFIX = 'network:'
|
||||
|
||||
# Ports with the following 'device_owner' values will not prevent
|
||||
# network deletion. If delete_network() finds that all ports on a
|
||||
# network have these owners, it will explicitly delete each port
|
||||
@ -45,7 +43,7 @@ AGENT_OWNER_PREFIX = 'network:'
|
||||
# finds out that all existing IP Allocations are associated with ports
|
||||
# with these owners, it will allow subnet deletion to proceed with the
|
||||
# IP allocations being cleaned up by cascade.
|
||||
AUTO_DELETE_PORT_OWNERS = ['network:dhcp']
|
||||
AUTO_DELETE_PORT_OWNERS = [constants.DEVICE_OWNER_DHCP]
|
||||
|
||||
|
||||
class CommonDbMixin(object):
|
||||
|
@ -191,7 +191,7 @@ class DhcpRpcCallbackMixin(object):
|
||||
tenant_id=network['tenant_id'],
|
||||
mac_address=attributes.ATTR_NOT_SPECIFIED,
|
||||
name='',
|
||||
device_owner='network:dhcp',
|
||||
device_owner=constants.DEVICE_OWNER_DHCP,
|
||||
fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
|
||||
|
||||
retval = self._port_action(plugin, context, {'port': port_dict},
|
||||
|
@ -214,7 +214,7 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
interfaces = []
|
||||
mapped_router = self._map_state_and_status(router)
|
||||
router_filter = {
|
||||
'device_owner': ["network:router_interface"],
|
||||
'device_owner': [const.DEVICE_OWNER_ROUTER_INTF],
|
||||
'device_id': [router.get('id')]
|
||||
}
|
||||
router_ports = self.get_ports(admin_context,
|
||||
@ -674,7 +674,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
|
||||
net = super(NeutronRestProxyV2,
|
||||
self).get_network(context, new_port["network_id"])
|
||||
if self.add_meta_server_route:
|
||||
if new_port['device_owner'] == 'network:dhcp':
|
||||
if new_port['device_owner'] == const.DEVICE_OWNER_DHCP:
|
||||
destination = METADATA_SERVER_IP + '/32'
|
||||
self._add_host_route(context, destination, new_port)
|
||||
|
||||
|
@ -24,6 +24,7 @@ import eventlet
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.common import rpc as q_rpc
|
||||
from neutron.common import topics
|
||||
@ -1180,7 +1181,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
:returns: port object
|
||||
"""
|
||||
if ('device_id' in port['port'] and port['port']['device_owner'] in
|
||||
['network:dhcp', 'network:router_interface']):
|
||||
[constants.DEVICE_OWNER_DHCP, constants.DEVICE_OWNER_ROUTER_INTF]):
|
||||
p_profile_name = c_conf.CISCO_N1K.network_node_policy_profile
|
||||
p_profile = self._get_policy_profile_by_name(p_profile_name)
|
||||
if p_profile:
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
from heleosapi import info as h_info
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.db import models_v2
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
@ -47,7 +48,8 @@ def retrieve_ip_allocation_info(context, neutron_port):
|
||||
return
|
||||
subnet = retrieve_subnet(context, subnet_id)
|
||||
allocated_ip = neutron_port["fixed_ips"][0]["ip_address"]
|
||||
is_gw_port = neutron_port["device_owner"] == "network:router_gateway"
|
||||
is_gw_port = (neutron_port["device_owner"] ==
|
||||
constants.DEVICE_OWNER_ROUTER_GW)
|
||||
gateway_ip = subnet["gateway_ip"]
|
||||
|
||||
ip_allocation_info = h_info.IpAllocationInfo(
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
from heleosapi import info as h_info
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron import manager
|
||||
from neutron.plugins.embrane.l2base import support_base as base
|
||||
|
||||
@ -32,7 +33,8 @@ class FakePluginSupport(base.SupportBase):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
network_id = neutron_port["network_id"]
|
||||
network = plugin._get_network(context, network_id)
|
||||
is_gw = neutron_port["device_owner"] == "network:router_gateway"
|
||||
is_gw = (neutron_port["device_owner"] ==
|
||||
constants.DEVICE_OWNER_ROUTER_GW)
|
||||
result = h_info.UtifInfo(vlan=0,
|
||||
network_name=network["name"],
|
||||
network_id=network["id"],
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
from heleosapi import info as h_info
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron import manager
|
||||
from neutron.plugins.embrane.l2base import support_base as base
|
||||
from neutron.plugins.embrane.l2base import support_exceptions as exc
|
||||
@ -45,7 +46,8 @@ class OpenvswitchSupport(base.SupportBase):
|
||||
err_msg=_("No segmentation_id found for the network, "
|
||||
"please be sure that tenant_network_type is vlan"))
|
||||
network = plugin._get_network(context, network_id)
|
||||
is_gw = neutron_port["device_owner"] == "network:router_gateway"
|
||||
is_gw = (neutron_port["device_owner"] ==
|
||||
constants.DEVICE_OWNER_ROUTER_GW)
|
||||
result = h_info.UtifInfo(vlan=network_binding["segmentation_id"],
|
||||
network_name=network["name"],
|
||||
network_id=network["id"],
|
||||
|
@ -163,7 +163,7 @@ def _is_vif_port(port):
|
||||
def _is_dhcp_port(port):
|
||||
"""Check whether the given port is a DHCP port."""
|
||||
device_owner = port['device_owner']
|
||||
return device_owner.startswith('network:dhcp')
|
||||
return device_owner.startswith(constants.DEVICE_OWNER_DHCP)
|
||||
|
||||
|
||||
def _check_resource_exists(func, id, name, raise_exc=False):
|
||||
@ -1024,7 +1024,7 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
rport_qry = context.session.query(models_v2.Port)
|
||||
dhcp_ports = rport_qry.filter_by(
|
||||
network_id=subnet["network_id"],
|
||||
device_owner='network:dhcp').all()
|
||||
device_owner=constants.DEVICE_OWNER_DHCP).all()
|
||||
if dhcp_ports and dhcp_ports[0].fixed_ips:
|
||||
metadata_gw_ip = dhcp_ports[0].fixed_ips[0].ip_address
|
||||
else:
|
||||
|
@ -25,6 +25,7 @@ import netaddr
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import l3_db
|
||||
@ -184,7 +185,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
port_db = super(NeutronPluginPLUMgridV2, self).create_port(context,
|
||||
port)
|
||||
device_id = port_db["device_id"]
|
||||
if port_db["device_owner"] == "network:router_gateway":
|
||||
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
|
||||
router_db = self._get_router(context, device_id)
|
||||
else:
|
||||
router_db = None
|
||||
@ -212,7 +213,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
port_db = super(NeutronPluginPLUMgridV2, self).update_port(
|
||||
context, port_id, port)
|
||||
device_id = port_db["device_id"]
|
||||
if port_db["device_owner"] == "network:router_gateway":
|
||||
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
|
||||
router_db = self._get_router(context, device_id)
|
||||
else:
|
||||
router_db = None
|
||||
@ -242,7 +243,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.disassociate_floatingips(context, port_id)
|
||||
super(NeutronPluginPLUMgridV2, self).delete_port(context, port_id)
|
||||
|
||||
if port_db["device_owner"] == "network:router_gateway":
|
||||
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
|
||||
device_id = port_db["device_id"]
|
||||
router_db = self._get_router(context, device_id)
|
||||
else:
|
||||
|
@ -1028,7 +1028,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
# Before deleting ports, ensure the peer of a NSX logical
|
||||
# port with a patch attachment is removed too
|
||||
port_filter = {'network_id': [id],
|
||||
'device_owner': ['network:router_interface']}
|
||||
'device_owner': [constants.DEVICE_OWNER_ROUTER_INTF]}
|
||||
router_iface_ports = self.get_ports(context, filters=port_filter)
|
||||
for port in router_iface_ports:
|
||||
nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id(
|
||||
|
@ -585,10 +585,11 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
|
||||
with self.network() as network:
|
||||
with self.subnet(network=network):
|
||||
net = network['network']
|
||||
p = self._create_resource('port',
|
||||
p = self._create_resource(
|
||||
'port',
|
||||
{'network_id': net['id'],
|
||||
'tenant_id': net['tenant_id'],
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': constants.DEVICE_OWNER_DHCP,
|
||||
'device_id': 'dhcp-port1'})
|
||||
# Make sure that the port is created on OFC.
|
||||
portinfo = {'id': p['id'], 'port_no': 123}
|
||||
|
@ -31,6 +31,7 @@ from neutron.api.v2 import attributes
|
||||
from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED
|
||||
from neutron.api.v2.router import APIRouter
|
||||
from neutron.common import config
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.common.test_lib import test_config
|
||||
from neutron import context
|
||||
@ -1132,7 +1133,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
|
||||
admin_state_up=True)
|
||||
network = self.deserialize(self.fmt, res)
|
||||
network_id = network['network']['id']
|
||||
self._create_port(self.fmt, network_id, device_owner='network:dhcp')
|
||||
self._create_port(self.fmt, network_id,
|
||||
device_owner=constants.DEVICE_OWNER_DHCP)
|
||||
req = self.new_delete_request('networks', network_id)
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
|
||||
@ -2398,7 +2400,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
||||
cidr, ip_version=4)
|
||||
self._create_port(self.fmt,
|
||||
network['network']['id'],
|
||||
device_owner='network:dhcp')
|
||||
device_owner=constants.DEVICE_OWNER_DHCP)
|
||||
req = self.new_delete_request('subnets', subnet['subnet']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.openstack.common.db import exception as db_exc
|
||||
@ -54,7 +55,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
|
||||
def _test__port_action_with_failures(self, exc=None, action=None):
|
||||
port = {
|
||||
'network_id': 'foo_network_id',
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': constants.DEVICE_OWNER_DHCP,
|
||||
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
|
||||
}
|
||||
self.plugin.create_port.side_effect = exc
|
||||
@ -187,7 +188,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
|
||||
create_spec = dict(tenant_id='tenantid', device_id='devid',
|
||||
network_id='netid', name='',
|
||||
admin_state_up=True,
|
||||
device_owner='network:dhcp',
|
||||
device_owner=constants.DEVICE_OWNER_DHCP,
|
||||
mac_address=mock.ANY)
|
||||
create_retval = create_spec.copy()
|
||||
create_retval['id'] = 'port_id'
|
||||
|
@ -19,6 +19,7 @@ import contextlib
|
||||
from oslo.config import cfg
|
||||
from webob import exc
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.extensions import extraroute
|
||||
from neutron.extensions import l3
|
||||
@ -392,7 +393,6 @@ class ExtraRouteDBTestCaseBase(object):
|
||||
p['port']['id'])
|
||||
|
||||
def test_router_update_on_external_port(self):
|
||||
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
|
||||
with self.router() as r:
|
||||
with self.subnet(cidr='10.0.1.0/24') as s:
|
||||
self._set_net_external(s['subnet']['network_id'])
|
||||
@ -402,11 +402,12 @@ class ExtraRouteDBTestCaseBase(object):
|
||||
body = self._show('routers', r['router']['id'])
|
||||
net_id = body['router']['external_gateway_info']['network_id']
|
||||
self.assertEqual(net_id, s['subnet']['network_id'])
|
||||
port_res = self._list_ports('json',
|
||||
port_res = self._list_ports(
|
||||
'json',
|
||||
200,
|
||||
s['subnet']['network_id'],
|
||||
tenant_id=r['router']['tenant_id'],
|
||||
device_own=DEVICE_OWNER_ROUTER_GW)
|
||||
device_own=constants.DEVICE_OWNER_ROUTER_GW)
|
||||
port_list = self.deserialize('json', port_res)
|
||||
self.assertEqual(len(port_list['ports']), 1)
|
||||
|
||||
|
@ -1146,7 +1146,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
||||
port_id=private_port['port']['id'])
|
||||
self.assertEqual(res.status_int, 400)
|
||||
for p in self._list('ports')['ports']:
|
||||
if p['device_owner'] == 'network:floatingip':
|
||||
if (p['device_owner'] ==
|
||||
l3_constants.DEVICE_OWNER_FLOATINGIP):
|
||||
self.fail('garbage port is not deleted')
|
||||
self._remove_external_gateway_from_router(
|
||||
r['router']['id'],
|
||||
@ -1184,7 +1185,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
||||
self.assertEqual(res.status_int, exc.HTTPConflict.code)
|
||||
|
||||
for p in self._list('ports')['ports']:
|
||||
if p['device_owner'] == 'network:floatingip':
|
||||
if (p['device_owner'] ==
|
||||
l3_constants.DEVICE_OWNER_FLOATINGIP):
|
||||
self.fail('garbage port is not deleted')
|
||||
|
||||
self._remove_external_gateway_from_router(
|
||||
@ -1341,7 +1343,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
||||
found = False
|
||||
with self.floatingip_with_assoc():
|
||||
for p in self._list('ports')['ports']:
|
||||
if p['device_owner'] == 'network:floatingip':
|
||||
if p['device_owner'] == l3_constants.DEVICE_OWNER_FLOATINGIP:
|
||||
self._delete('ports', p['id'],
|
||||
expected_code=exc.HTTPConflict.code)
|
||||
found = True
|
||||
@ -1524,7 +1526,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
||||
found = False
|
||||
with self.floatingip_with_assoc():
|
||||
for p in self._list('ports')['ports']:
|
||||
if p['device_owner'] == 'network:router_interface':
|
||||
if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
|
||||
subnet_id = p['fixed_ips'][0]['subnet_id']
|
||||
router_id = p['device_id']
|
||||
self._router_interface_action(
|
||||
@ -1538,7 +1540,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
|
||||
found = False
|
||||
with self.floatingip_with_assoc():
|
||||
for p in self._list('ports')['ports']:
|
||||
if p['device_owner'] == 'network:router_interface':
|
||||
if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
|
||||
router_id = p['device_id']
|
||||
self._router_interface_action(
|
||||
'remove', router_id, None, p['id'],
|
||||
|
@ -23,6 +23,7 @@ from oslo.config import cfg
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent.linux import dhcp
|
||||
from neutron.common import config as base_config
|
||||
from neutron.common import constants
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.tests import base
|
||||
|
||||
@ -80,7 +81,7 @@ class FakePort3:
|
||||
class FakeRouterPort:
|
||||
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
|
||||
admin_state_up = True
|
||||
device_owner = 'network:router_interface'
|
||||
device_owner = constants.DEVICE_OWNER_ROUTER_INTF
|
||||
fixed_ips = [FakeIPAllocation('192.168.0.1',
|
||||
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
|
||||
mac_address = '00:00:0f:rr:rr:rr'
|
||||
@ -92,7 +93,7 @@ class FakeRouterPort:
|
||||
class FakePortMultipleAgents1:
|
||||
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
|
||||
admin_state_up = True
|
||||
device_owner = 'network:dhcp'
|
||||
device_owner = constants.DEVICE_OWNER_DHCP
|
||||
fixed_ips = [FakeIPAllocation('192.168.0.5',
|
||||
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
|
||||
mac_address = '00:00:0f:dd:dd:dd'
|
||||
@ -104,7 +105,7 @@ class FakePortMultipleAgents1:
|
||||
class FakePortMultipleAgents2:
|
||||
id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
|
||||
admin_state_up = True
|
||||
device_owner = 'network:dhcp'
|
||||
device_owner = constants.DEVICE_OWNER_DHCP
|
||||
fixed_ips = [FakeIPAllocation('192.168.0.6',
|
||||
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
|
||||
mac_address = '00:00:0f:ee:ee:ee'
|
||||
|
@ -23,6 +23,7 @@ import testtools
|
||||
import webob
|
||||
|
||||
from neutron.agent.metadata import agent
|
||||
from neutron.common import constants
|
||||
from neutron.common import utils
|
||||
from neutron.tests import base
|
||||
|
||||
@ -112,7 +113,7 @@ class TestMetadataProxyHandler(base.BaseTestCase):
|
||||
expected.append(
|
||||
mock.call().list_ports(
|
||||
device_id=router_id,
|
||||
device_owner='network:router_interface'
|
||||
device_owner=constants.DEVICE_OWNER_ROUTER_INTF
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -17,6 +17,7 @@ import mock
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.common import constants as n_consts
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron import context
|
||||
from neutron.db import api as db
|
||||
@ -940,12 +941,12 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
|
||||
def test_notify_ports_update_with_special_ports(self):
|
||||
ports = [{'fixed_ips': [],
|
||||
'device_id': '',
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': n_consts.DEVICE_OWNER_DHCP,
|
||||
'mac_address': 'fa:16:3e:da:1d:46'},
|
||||
{'fixed_ips': [{'subnet_id': 'foo_subnet_id',
|
||||
'ip_address': '1.2.3.4'}],
|
||||
'device_id': 'foo_device_id',
|
||||
'device_owner': 'network:router_gateway',
|
||||
'device_owner': n_consts.DEVICE_OWNER_ROUTER_GW,
|
||||
'mac_address': 'fa:16:3e:da:1d:46'}]
|
||||
call_args = mock.call(
|
||||
mock.ANY, 'foo_network_id', 'foo_subnet_id', dhcp=[], meta=[])
|
||||
@ -1014,7 +1015,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
|
||||
'admin_state_up': True,
|
||||
'network_id': 'foo_network_id',
|
||||
'tenant_id': 'foo_tenant_id',
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': n_consts.DEVICE_OWNER_DHCP,
|
||||
'mac_address': mock.ANY,
|
||||
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}],
|
||||
'device_id': ''
|
||||
@ -1102,7 +1103,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
|
||||
self.notifier.notify(mock.ANY, {'subnet': subnet}, 'subnet.delete.end')
|
||||
filters = {
|
||||
'network_id': [subnet['network_id']],
|
||||
'device_owner': ['network:dhcp']
|
||||
'device_owner': [n_consts.DEVICE_OWNER_DHCP]
|
||||
}
|
||||
self.plugin.get_ports.assert_called_once_with(
|
||||
mock.ANY, filters=filters)
|
||||
@ -1149,7 +1150,7 @@ class DhcpTestCase(base.BaseTestCase):
|
||||
}
|
||||
port = {
|
||||
'id': 'foo_port_id',
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': n_consts.DEVICE_OWNER_DHCP,
|
||||
'mac_address': 'aa:bb:cc:dd:ee:ff',
|
||||
'network_id': 'foo_network_id',
|
||||
'fixed_ips': [{'subnet_id': subnet['id']}]
|
||||
@ -1184,7 +1185,7 @@ class DhcpTestCase(base.BaseTestCase):
|
||||
def test_handle_delete_dhcp_owner_port(self):
|
||||
port = {
|
||||
'id': 'foo_port_id',
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': n_consts.DEVICE_OWNER_DHCP,
|
||||
'network_id': 'foo_network_id',
|
||||
'fixed_ips': [],
|
||||
'mac_address': 'aa:bb:cc:dd:ee:ff'
|
||||
@ -1297,15 +1298,15 @@ class MetadataTestCase(base.BaseTestCase):
|
||||
|
||||
def test_handle_port_metadata_access_dhcp_port(self):
|
||||
self._test_handle_port_metadata_access_special_owners(
|
||||
'network:dhcp', [{'subnet_id': 'foo_subnet'}])
|
||||
n_consts.DEVICE_OWNER_DHCP, [{'subnet_id': 'foo_subnet'}])
|
||||
|
||||
def test_handle_port_metadata_access_router_port(self):
|
||||
self._test_handle_port_metadata_access_special_owners(
|
||||
'network:router_interface', [{'subnet_id': 'foo_subnet'}])
|
||||
n_consts.DEVICE_OWNER_ROUTER_INTF, [{'subnet_id': 'foo_subnet'}])
|
||||
|
||||
def test_handle_port_metadata_access_no_device_id(self):
|
||||
self._test_handle_port_metadata_access_special_owners(
|
||||
'network:dhcp', '')
|
||||
n_consts.DEVICE_OWNER_DHCP, '')
|
||||
|
||||
def test_handle_port_metadata_access_no_fixed_ips(self):
|
||||
self._test_handle_port_metadata_access_special_owners(
|
||||
|
@ -221,7 +221,7 @@ class TestPortsV2(NsxPluginV2TestCase,
|
||||
with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping',
|
||||
side_effect=db_exception):
|
||||
with self.network() as net:
|
||||
with self.port(device_owner='network:dhcp'):
|
||||
with self.port(device_owner=constants.DEVICE_OWNER_DHCP):
|
||||
self._verify_no_orphan_left(net['network']['id'])
|
||||
|
||||
def test_create_port_maintenance_returns_503(self):
|
||||
@ -909,7 +909,7 @@ class TestL3NatTestCase(L3NatTest,
|
||||
subnets = self._list('subnets')['subnets']
|
||||
with self.subnet() as s:
|
||||
with self.port(subnet=s, device_id='1234',
|
||||
device_owner='network:dhcp'):
|
||||
device_owner=constants.DEVICE_OWNER_DHCP):
|
||||
subnets = self._list('subnets')['subnets']
|
||||
self.assertEqual(len(subnets), 1)
|
||||
self.assertEqual(subnets[0]['host_routes'][0]['nexthop'],
|
||||
|
Loading…
Reference in New Issue
Block a user