Replaces network:* strings by constants

This patch replaces all occurences of the strings
prefixed by network:* by their constant equivalent.

Closes-bug: #1270863
Change-Id: I149cc0ab7bde08ea83057e6c0697f668edbe29db
This commit is contained in:
Sylvain Afchain 2014-01-17 06:40:43 +01:00 committed by Akihiro Motoki
parent 3099c30b90
commit 0133631403
21 changed files with 69 additions and 55 deletions

View File

@ -520,7 +520,7 @@ class Dnsmasq(DhcpLocalProcess):
# provides all dnsmasq ip as dns-server if there is more than # provides all dnsmasq ip as dns-server if there is more than
# one dnsmasq for a subnet and there is no dns-server submitted # one dnsmasq for a subnet and there is no dns-server submitted
# by the server # by the server
if port.device_owner == 'network:dhcp': if port.device_owner == constants.DEVICE_OWNER_DHCP:
for ip in port.fixed_ips: for ip in port.fixed_ips:
i = subnet_idx_map.get(ip.subnet_id) i = subnet_idx_map.get(ip.subnet_id)
if i is None: if i is None:

View File

@ -43,8 +43,6 @@ from neutron import wsgi
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
class MetadataProxyHandler(object): class MetadataProxyHandler(object):
OPTS = [ OPTS = [
@ -133,7 +131,7 @@ class MetadataProxyHandler(object):
else: else:
internal_ports = qclient.list_ports( internal_ports = qclient.list_ports(
device_id=router_id, device_id=router_id,
device_owner=DEVICE_OWNER_ROUTER_INTF)['ports'] device_owner=n_const.DEVICE_OWNER_ROUTER_INTF)['ports']
networks = [p['network_id'] for p in internal_ports] networks = [p['network_id'] for p in internal_ports]

View File

@ -38,8 +38,6 @@ from neutron.openstack.common import uuidutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
AGENT_OWNER_PREFIX = 'network:'
# Ports with the following 'device_owner' values will not prevent # Ports with the following 'device_owner' values will not prevent
# network deletion. If delete_network() finds that all ports on a # network deletion. If delete_network() finds that all ports on a
# network have these owners, it will explicitly delete each port # network have these owners, it will explicitly delete each port
@ -47,7 +45,7 @@ AGENT_OWNER_PREFIX = 'network:'
# finds out that all existing IP Allocations are associated with ports # finds out that all existing IP Allocations are associated with ports
# with these owners, it will allow subnet deletion to proceed with the # with these owners, it will allow subnet deletion to proceed with the
# IP allocations being cleaned up by cascade. # IP allocations being cleaned up by cascade.
AUTO_DELETE_PORT_OWNERS = ['network:dhcp'] AUTO_DELETE_PORT_OWNERS = [constants.DEVICE_OWNER_DHCP]
class CommonDbMixin(object): class CommonDbMixin(object):

View File

@ -191,7 +191,7 @@ class DhcpRpcCallbackMixin(object):
tenant_id=network['tenant_id'], tenant_id=network['tenant_id'],
mac_address=attributes.ATTR_NOT_SPECIFIED, mac_address=attributes.ATTR_NOT_SPECIFIED,
name='', name='',
device_owner='network:dhcp', device_owner=constants.DEVICE_OWNER_DHCP,
fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids]) fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
retval = self._port_action(plugin, context, {'port': port_dict}, retval = self._port_action(plugin, context, {'port': port_dict},

View File

@ -221,7 +221,7 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
interfaces = [] interfaces = []
mapped_router = self._map_state_and_status(router) mapped_router = self._map_state_and_status(router)
router_filter = { router_filter = {
'device_owner': ["network:router_interface"], 'device_owner': [const.DEVICE_OWNER_ROUTER_INTF],
'device_id': [router.get('id')] 'device_id': [router.get('id')]
} }
router_ports = self.get_ports(admin_context, router_ports = self.get_ports(admin_context,
@ -642,7 +642,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
net = super(NeutronRestProxyV2, net = super(NeutronRestProxyV2,
self).get_network(context, new_port["network_id"]) self).get_network(context, new_port["network_id"])
if self.add_meta_server_route: if self.add_meta_server_route:
if new_port['device_owner'] == 'network:dhcp': if new_port['device_owner'] == const.DEVICE_OWNER_DHCP:
destination = METADATA_SERVER_IP + '/32' destination = METADATA_SERVER_IP + '/32'
self._add_host_route(context, destination, new_port) self._add_host_route(context, destination, new_port)

View File

@ -24,6 +24,7 @@ import eventlet
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc from neutron.common import rpc as q_rpc
from neutron.common import topics from neutron.common import topics
@ -1180,7 +1181,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
:returns: port object :returns: port object
""" """
if ('device_id' in port['port'] and port['port']['device_owner'] in if ('device_id' in port['port'] and port['port']['device_owner'] in
['network:dhcp', 'network:router_interface']): [constants.DEVICE_OWNER_DHCP, constants.DEVICE_OWNER_ROUTER_INTF]):
p_profile_name = c_conf.CISCO_N1K.network_node_policy_profile p_profile_name = c_conf.CISCO_N1K.network_node_policy_profile
p_profile = self._get_policy_profile_by_name(p_profile_name) p_profile = self._get_policy_profile_by_name(p_profile_name)
if p_profile: if p_profile:

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info from heleosapi import info as h_info
from neutron.common import constants
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -47,7 +48,8 @@ def retrieve_ip_allocation_info(context, neutron_port):
return return
subnet = retrieve_subnet(context, subnet_id) subnet = retrieve_subnet(context, subnet_id)
allocated_ip = neutron_port["fixed_ips"][0]["ip_address"] allocated_ip = neutron_port["fixed_ips"][0]["ip_address"]
is_gw_port = neutron_port["device_owner"] == "network:router_gateway" is_gw_port = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
gateway_ip = subnet["gateway_ip"] gateway_ip = subnet["gateway_ip"]
ip_allocation_info = h_info.IpAllocationInfo( ip_allocation_info = h_info.IpAllocationInfo(

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info from heleosapi import info as h_info
from neutron.common import constants
from neutron import manager from neutron import manager
from neutron.plugins.embrane.l2base import support_base as base from neutron.plugins.embrane.l2base import support_base as base
@ -32,7 +33,8 @@ class FakePluginSupport(base.SupportBase):
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
network_id = neutron_port["network_id"] network_id = neutron_port["network_id"]
network = plugin._get_network(context, network_id) network = plugin._get_network(context, network_id)
is_gw = neutron_port["device_owner"] == "network:router_gateway" is_gw = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
result = h_info.UtifInfo(vlan=0, result = h_info.UtifInfo(vlan=0,
network_name=network["name"], network_name=network["name"],
network_id=network["id"], network_id=network["id"],

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info from heleosapi import info as h_info
from neutron.common import constants
from neutron import manager from neutron import manager
from neutron.plugins.embrane.l2base import support_base as base from neutron.plugins.embrane.l2base import support_base as base
from neutron.plugins.embrane.l2base import support_exceptions as exc from neutron.plugins.embrane.l2base import support_exceptions as exc
@ -45,7 +46,8 @@ class OpenvswitchSupport(base.SupportBase):
err_msg=_("No segmentation_id found for the network, " err_msg=_("No segmentation_id found for the network, "
"please be sure that tenant_network_type is vlan")) "please be sure that tenant_network_type is vlan"))
network = plugin._get_network(context, network_id) network = plugin._get_network(context, network_id)
is_gw = neutron_port["device_owner"] == "network:router_gateway" is_gw = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
result = h_info.UtifInfo(vlan=network_binding["segmentation_id"], result = h_info.UtifInfo(vlan=network_binding["segmentation_id"],
network_name=network["name"], network_name=network["name"],
network_id=network["id"], network_id=network["id"],

View File

@ -163,7 +163,7 @@ def _is_vif_port(port):
def _is_dhcp_port(port): def _is_dhcp_port(port):
"""Check whether the given port is a DHCP port.""" """Check whether the given port is a DHCP port."""
device_owner = port['device_owner'] device_owner = port['device_owner']
return device_owner.startswith('network:dhcp') return device_owner.startswith(constants.DEVICE_OWNER_DHCP)
def _check_resource_exists(func, id, name, raise_exc=False): def _check_resource_exists(func, id, name, raise_exc=False):
@ -1024,7 +1024,7 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
rport_qry = context.session.query(models_v2.Port) rport_qry = context.session.query(models_v2.Port)
dhcp_ports = rport_qry.filter_by( dhcp_ports = rport_qry.filter_by(
network_id=subnet["network_id"], network_id=subnet["network_id"],
device_owner='network:dhcp').all() device_owner=constants.DEVICE_OWNER_DHCP).all()
if dhcp_ports and dhcp_ports[0].fixed_ips: if dhcp_ports and dhcp_ports[0].fixed_ips:
metadata_gw_ip = dhcp_ports[0].fixed_ips[0].ip_address metadata_gw_ip = dhcp_ports[0].fixed_ips[0].ip_address
else: else:

View File

@ -25,6 +25,7 @@ import netaddr
from oslo.config import cfg from oslo.config import cfg
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db from neutron.db import external_net_db
from neutron.db import l3_db from neutron.db import l3_db
@ -184,7 +185,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
port_db = super(NeutronPluginPLUMgridV2, self).create_port(context, port_db = super(NeutronPluginPLUMgridV2, self).create_port(context,
port) port)
device_id = port_db["device_id"] device_id = port_db["device_id"]
if port_db["device_owner"] == "network:router_gateway": if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
router_db = self._get_router(context, device_id) router_db = self._get_router(context, device_id)
else: else:
router_db = None router_db = None
@ -212,7 +213,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
port_db = super(NeutronPluginPLUMgridV2, self).update_port( port_db = super(NeutronPluginPLUMgridV2, self).update_port(
context, port_id, port) context, port_id, port)
device_id = port_db["device_id"] device_id = port_db["device_id"]
if port_db["device_owner"] == "network:router_gateway": if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
router_db = self._get_router(context, device_id) router_db = self._get_router(context, device_id)
else: else:
router_db = None router_db = None
@ -242,7 +243,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
self.disassociate_floatingips(context, port_id) self.disassociate_floatingips(context, port_id)
super(NeutronPluginPLUMgridV2, self).delete_port(context, port_id) super(NeutronPluginPLUMgridV2, self).delete_port(context, port_id)
if port_db["device_owner"] == "network:router_gateway": if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
device_id = port_db["device_id"] device_id = port_db["device_id"]
router_db = self._get_router(context, device_id) router_db = self._get_router(context, device_id)
else: else:

View File

@ -1028,7 +1028,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Before deleting ports, ensure the peer of a NSX logical # Before deleting ports, ensure the peer of a NSX logical
# port with a patch attachment is removed too # port with a patch attachment is removed too
port_filter = {'network_id': [id], port_filter = {'network_id': [id],
'device_owner': ['network:router_interface']} 'device_owner': [constants.DEVICE_OWNER_ROUTER_INTF]}
router_iface_ports = self.get_ports(context, filters=port_filter) router_iface_ports = self.get_ports(context, filters=port_filter)
for port in router_iface_ports: for port in router_iface_ports:
nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id( nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id(

View File

@ -585,11 +585,12 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
with self.network() as network: with self.network() as network:
with self.subnet(network=network): with self.subnet(network=network):
net = network['network'] net = network['network']
p = self._create_resource('port', p = self._create_resource(
{'network_id': net['id'], 'port',
'tenant_id': net['tenant_id'], {'network_id': net['id'],
'device_owner': 'network:dhcp', 'tenant_id': net['tenant_id'],
'device_id': 'dhcp-port1'}) 'device_owner': constants.DEVICE_OWNER_DHCP,
'device_id': 'dhcp-port1'})
# Make sure that the port is created on OFC. # Make sure that the port is created on OFC.
portinfo = {'id': p['id'], 'port_no': 123} portinfo = {'id': p['id'], 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo]) self.rpcapi_update_ports(added=[portinfo])

View File

@ -31,6 +31,7 @@ from neutron.api.v2 import attributes
from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED
from neutron.api.v2.router import APIRouter from neutron.api.v2.router import APIRouter
from neutron.common import config from neutron.common import config
from neutron.common import constants
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.common.test_lib import test_config from neutron.common.test_lib import test_config
from neutron import context from neutron import context
@ -1132,7 +1133,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
admin_state_up=True) admin_state_up=True)
network = self.deserialize(self.fmt, res) network = self.deserialize(self.fmt, res)
network_id = network['network']['id'] network_id = network['network']['id']
self._create_port(self.fmt, network_id, device_owner='network:dhcp') self._create_port(self.fmt, network_id,
device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('networks', network_id) req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code) self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
@ -2412,7 +2414,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
cidr, ip_version=4) cidr, ip_version=4)
self._create_port(self.fmt, self._create_port(self.fmt,
network['network']['id'], network['network']['id'],
device_owner='network:dhcp') device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('subnets', subnet['subnet']['id']) req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code) self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)

View File

@ -15,6 +15,7 @@
import mock import mock
from neutron.common import constants
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.db import dhcp_rpc_base from neutron.db import dhcp_rpc_base
from neutron.openstack.common.db import exception as db_exc from neutron.openstack.common.db import exception as db_exc
@ -54,7 +55,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
def _test__port_action_with_failures(self, exc=None, action=None): def _test__port_action_with_failures(self, exc=None, action=None):
port = { port = {
'network_id': 'foo_network_id', 'network_id': 'foo_network_id',
'device_owner': 'network:dhcp', 'device_owner': constants.DEVICE_OWNER_DHCP,
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}] 'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
} }
self.plugin.create_port.side_effect = exc self.plugin.create_port.side_effect = exc
@ -187,7 +188,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
create_spec = dict(tenant_id='tenantid', device_id='devid', create_spec = dict(tenant_id='tenantid', device_id='devid',
network_id='netid', name='', network_id='netid', name='',
admin_state_up=True, admin_state_up=True,
device_owner='network:dhcp', device_owner=constants.DEVICE_OWNER_DHCP,
mac_address=mock.ANY) mac_address=mock.ANY)
create_retval = create_spec.copy() create_retval = create_spec.copy()
create_retval['id'] = 'port_id' create_retval['id'] = 'port_id'

View File

@ -19,6 +19,7 @@ import contextlib
from oslo.config import cfg from oslo.config import cfg
from webob import exc from webob import exc
from neutron.common import constants
from neutron.db import extraroute_db from neutron.db import extraroute_db
from neutron.extensions import extraroute from neutron.extensions import extraroute
from neutron.extensions import l3 from neutron.extensions import l3
@ -392,7 +393,6 @@ class ExtraRouteDBTestCaseBase(object):
p['port']['id']) p['port']['id'])
def test_router_update_on_external_port(self): def test_router_update_on_external_port(self):
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
with self.router() as r: with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s: with self.subnet(cidr='10.0.1.0/24') as s:
self._set_net_external(s['subnet']['network_id']) self._set_net_external(s['subnet']['network_id'])
@ -402,11 +402,12 @@ class ExtraRouteDBTestCaseBase(object):
body = self._show('routers', r['router']['id']) body = self._show('routers', r['router']['id'])
net_id = body['router']['external_gateway_info']['network_id'] net_id = body['router']['external_gateway_info']['network_id']
self.assertEqual(net_id, s['subnet']['network_id']) self.assertEqual(net_id, s['subnet']['network_id'])
port_res = self._list_ports('json', port_res = self._list_ports(
200, 'json',
s['subnet']['network_id'], 200,
tenant_id=r['router']['tenant_id'], s['subnet']['network_id'],
device_own=DEVICE_OWNER_ROUTER_GW) tenant_id=r['router']['tenant_id'],
device_own=constants.DEVICE_OWNER_ROUTER_GW)
port_list = self.deserialize('json', port_res) port_list = self.deserialize('json', port_res)
self.assertEqual(len(port_list['ports']), 1) self.assertEqual(len(port_list['ports']), 1)

View File

@ -1146,7 +1146,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
port_id=private_port['port']['id']) port_id=private_port['port']['id'])
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, 400)
for p in self._list('ports')['ports']: for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip': if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
self.fail('garbage port is not deleted') self.fail('garbage port is not deleted')
self._remove_external_gateway_from_router( self._remove_external_gateway_from_router(
r['router']['id'], r['router']['id'],
@ -1184,7 +1185,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.assertEqual(res.status_int, exc.HTTPConflict.code) self.assertEqual(res.status_int, exc.HTTPConflict.code)
for p in self._list('ports')['ports']: for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip': if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
self.fail('garbage port is not deleted') self.fail('garbage port is not deleted')
self._remove_external_gateway_from_router( self._remove_external_gateway_from_router(
@ -1341,7 +1343,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False found = False
with self.floatingip_with_assoc(): with self.floatingip_with_assoc():
for p in self._list('ports')['ports']: for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip': if p['device_owner'] == l3_constants.DEVICE_OWNER_FLOATINGIP:
self._delete('ports', p['id'], self._delete('ports', p['id'],
expected_code=exc.HTTPConflict.code) expected_code=exc.HTTPConflict.code)
found = True found = True
@ -1524,7 +1526,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False found = False
with self.floatingip_with_assoc(): with self.floatingip_with_assoc():
for p in self._list('ports')['ports']: for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:router_interface': if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
subnet_id = p['fixed_ips'][0]['subnet_id'] subnet_id = p['fixed_ips'][0]['subnet_id']
router_id = p['device_id'] router_id = p['device_id']
self._router_interface_action( self._router_interface_action(
@ -1538,7 +1540,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False found = False
with self.floatingip_with_assoc(): with self.floatingip_with_assoc():
for p in self._list('ports')['ports']: for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:router_interface': if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
router_id = p['device_id'] router_id = p['device_id']
self._router_interface_action( self._router_interface_action(
'remove', router_id, None, p['id'], 'remove', router_id, None, p['id'],

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from neutron.agent.common import config from neutron.agent.common import config
from neutron.agent.linux import dhcp from neutron.agent.linux import dhcp
from neutron.common import config as base_config from neutron.common import config as base_config
from neutron.common import constants
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.tests import base from neutron.tests import base
@ -80,7 +81,7 @@ class FakePort3:
class FakeRouterPort: class FakeRouterPort:
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr' id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
admin_state_up = True admin_state_up = True
device_owner = 'network:router_interface' device_owner = constants.DEVICE_OWNER_ROUTER_INTF
fixed_ips = [FakeIPAllocation('192.168.0.1', fixed_ips = [FakeIPAllocation('192.168.0.1',
'dddddddd-dddd-dddd-dddd-dddddddddddd')] 'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:rr:rr:rr' mac_address = '00:00:0f:rr:rr:rr'
@ -92,7 +93,7 @@ class FakeRouterPort:
class FakePortMultipleAgents1: class FakePortMultipleAgents1:
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr' id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
admin_state_up = True admin_state_up = True
device_owner = 'network:dhcp' device_owner = constants.DEVICE_OWNER_DHCP
fixed_ips = [FakeIPAllocation('192.168.0.5', fixed_ips = [FakeIPAllocation('192.168.0.5',
'dddddddd-dddd-dddd-dddd-dddddddddddd')] 'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:dd:dd:dd' mac_address = '00:00:0f:dd:dd:dd'
@ -104,7 +105,7 @@ class FakePortMultipleAgents1:
class FakePortMultipleAgents2: class FakePortMultipleAgents2:
id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
admin_state_up = True admin_state_up = True
device_owner = 'network:dhcp' device_owner = constants.DEVICE_OWNER_DHCP
fixed_ips = [FakeIPAllocation('192.168.0.6', fixed_ips = [FakeIPAllocation('192.168.0.6',
'dddddddd-dddd-dddd-dddd-dddddddddddd')] 'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:ee:ee:ee' mac_address = '00:00:0f:ee:ee:ee'

View File

@ -23,6 +23,7 @@ import testtools
import webob import webob
from neutron.agent.metadata import agent from neutron.agent.metadata import agent
from neutron.common import constants
from neutron.common import utils from neutron.common import utils
from neutron.tests import base from neutron.tests import base
@ -112,7 +113,7 @@ class TestMetadataProxyHandler(base.BaseTestCase):
expected.append( expected.append(
mock.call().list_ports( mock.call().list_ports(
device_id=router_id, device_id=router_id,
device_owner='network:router_interface' device_owner=constants.DEVICE_OWNER_ROUTER_INTF
) )
) )

View File

@ -17,6 +17,7 @@ import mock
from oslo.config import cfg from oslo.config import cfg
from neutron.common import constants as n_consts
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron import context from neutron import context
from neutron.db import api as db from neutron.db import api as db
@ -940,12 +941,12 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
def test_notify_ports_update_with_special_ports(self): def test_notify_ports_update_with_special_ports(self):
ports = [{'fixed_ips': [], ports = [{'fixed_ips': [],
'device_id': '', 'device_id': '',
'device_owner': 'network:dhcp', 'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': 'fa:16:3e:da:1d:46'}, 'mac_address': 'fa:16:3e:da:1d:46'},
{'fixed_ips': [{'subnet_id': 'foo_subnet_id', {'fixed_ips': [{'subnet_id': 'foo_subnet_id',
'ip_address': '1.2.3.4'}], 'ip_address': '1.2.3.4'}],
'device_id': 'foo_device_id', 'device_id': 'foo_device_id',
'device_owner': 'network:router_gateway', 'device_owner': n_consts.DEVICE_OWNER_ROUTER_GW,
'mac_address': 'fa:16:3e:da:1d:46'}] 'mac_address': 'fa:16:3e:da:1d:46'}]
call_args = mock.call( call_args = mock.call(
mock.ANY, 'foo_network_id', 'foo_subnet_id', dhcp=[], meta=[]) mock.ANY, 'foo_network_id', 'foo_subnet_id', dhcp=[], meta=[])
@ -1014,7 +1015,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
'admin_state_up': True, 'admin_state_up': True,
'network_id': 'foo_network_id', 'network_id': 'foo_network_id',
'tenant_id': 'foo_tenant_id', 'tenant_id': 'foo_tenant_id',
'device_owner': 'network:dhcp', 'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': mock.ANY, 'mac_address': mock.ANY,
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}], 'fixed_ips': [{'subnet_id': 'foo_subnet_id'}],
'device_id': '' 'device_id': ''
@ -1102,7 +1103,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
self.notifier.notify(mock.ANY, {'subnet': subnet}, 'subnet.delete.end') self.notifier.notify(mock.ANY, {'subnet': subnet}, 'subnet.delete.end')
filters = { filters = {
'network_id': [subnet['network_id']], 'network_id': [subnet['network_id']],
'device_owner': ['network:dhcp'] 'device_owner': [n_consts.DEVICE_OWNER_DHCP]
} }
self.plugin.get_ports.assert_called_once_with( self.plugin.get_ports.assert_called_once_with(
mock.ANY, filters=filters) mock.ANY, filters=filters)
@ -1149,7 +1150,7 @@ class DhcpTestCase(base.BaseTestCase):
} }
port = { port = {
'id': 'foo_port_id', 'id': 'foo_port_id',
'device_owner': 'network:dhcp', 'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': 'aa:bb:cc:dd:ee:ff', 'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 'foo_network_id', 'network_id': 'foo_network_id',
'fixed_ips': [{'subnet_id': subnet['id']}] 'fixed_ips': [{'subnet_id': subnet['id']}]
@ -1184,7 +1185,7 @@ class DhcpTestCase(base.BaseTestCase):
def test_handle_delete_dhcp_owner_port(self): def test_handle_delete_dhcp_owner_port(self):
port = { port = {
'id': 'foo_port_id', 'id': 'foo_port_id',
'device_owner': 'network:dhcp', 'device_owner': n_consts.DEVICE_OWNER_DHCP,
'network_id': 'foo_network_id', 'network_id': 'foo_network_id',
'fixed_ips': [], 'fixed_ips': [],
'mac_address': 'aa:bb:cc:dd:ee:ff' 'mac_address': 'aa:bb:cc:dd:ee:ff'
@ -1297,15 +1298,15 @@ class MetadataTestCase(base.BaseTestCase):
def test_handle_port_metadata_access_dhcp_port(self): def test_handle_port_metadata_access_dhcp_port(self):
self._test_handle_port_metadata_access_special_owners( self._test_handle_port_metadata_access_special_owners(
'network:dhcp', [{'subnet_id': 'foo_subnet'}]) n_consts.DEVICE_OWNER_DHCP, [{'subnet_id': 'foo_subnet'}])
def test_handle_port_metadata_access_router_port(self): def test_handle_port_metadata_access_router_port(self):
self._test_handle_port_metadata_access_special_owners( self._test_handle_port_metadata_access_special_owners(
'network:router_interface', [{'subnet_id': 'foo_subnet'}]) n_consts.DEVICE_OWNER_ROUTER_INTF, [{'subnet_id': 'foo_subnet'}])
def test_handle_port_metadata_access_no_device_id(self): def test_handle_port_metadata_access_no_device_id(self):
self._test_handle_port_metadata_access_special_owners( self._test_handle_port_metadata_access_special_owners(
'network:dhcp', '') n_consts.DEVICE_OWNER_DHCP, '')
def test_handle_port_metadata_access_no_fixed_ips(self): def test_handle_port_metadata_access_no_fixed_ips(self):
self._test_handle_port_metadata_access_special_owners( self._test_handle_port_metadata_access_special_owners(

View File

@ -221,7 +221,7 @@ class TestPortsV2(NsxPluginV2TestCase,
with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping', with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping',
side_effect=db_exception): side_effect=db_exception):
with self.network() as net: with self.network() as net:
with self.port(device_owner='network:dhcp'): with self.port(device_owner=constants.DEVICE_OWNER_DHCP):
self._verify_no_orphan_left(net['network']['id']) self._verify_no_orphan_left(net['network']['id'])
def test_create_port_maintenance_returns_503(self): def test_create_port_maintenance_returns_503(self):
@ -909,7 +909,7 @@ class TestL3NatTestCase(L3NatTest,
subnets = self._list('subnets')['subnets'] subnets = self._list('subnets')['subnets']
with self.subnet() as s: with self.subnet() as s:
with self.port(subnet=s, device_id='1234', with self.port(subnet=s, device_id='1234',
device_owner='network:dhcp'): device_owner=constants.DEVICE_OWNER_DHCP):
subnets = self._list('subnets')['subnets'] subnets = self._list('subnets')['subnets']
self.assertEqual(len(subnets), 1) self.assertEqual(len(subnets), 1)
self.assertEqual(subnets[0]['host_routes'][0]['nexthop'], self.assertEqual(subnets[0]['host_routes'][0]['nexthop'],