Support iptables-based security group in NEC plugin
blueprint nec-security-group This commit also refactors RPC API and callbacks in the plugin and agent to support security group RPC. Change-Id: I09d69ca3aff43e0468bbd5df6367de767af27acc
This commit is contained in:
parent
ea7a1740cb
commit
ca5a92d8e2
@ -39,6 +39,10 @@ polling_interval = 2
|
|||||||
# Change to "sudo" to skip the filtering and just run the comand directly
|
# Change to "sudo" to skip the filtering and just run the comand directly
|
||||||
root_helper = sudo
|
root_helper = sudo
|
||||||
|
|
||||||
|
[SECURITYGROUP]
|
||||||
|
# Firewall driver for realizing quantum security group function
|
||||||
|
firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
|
||||||
|
|
||||||
[OFC]
|
[OFC]
|
||||||
# Specify OpenFlow Controller Host, Port and Driver to connect.
|
# Specify OpenFlow Controller Host, Port and Driver to connect.
|
||||||
host = 127.0.0.1
|
host = 127.0.0.1
|
||||||
|
@ -86,9 +86,9 @@ class SecurityGroupAgentRpcMixin(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def init_firewall(self):
|
def init_firewall(self):
|
||||||
LOG.debug(_("Init firewall settings"))
|
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
|
||||||
self.firewall = importutils.import_object(
|
LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
|
||||||
cfg.CONF.SECURITYGROUP.firewall_driver)
|
self.firewall = importutils.import_object(firewall_driver)
|
||||||
|
|
||||||
def prepare_devices_filter(self, device_ids):
|
def prepare_devices_filter(self, device_ids):
|
||||||
if not device_ids:
|
if not device_ids:
|
||||||
|
@ -33,6 +33,7 @@ migration_for_plugins = [
|
|||||||
'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
|
'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
|
||||||
'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
|
'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
|
||||||
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
|
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
|
||||||
|
'quantum.plugins.nec.nec_plugin.NECPluginV2',
|
||||||
]
|
]
|
||||||
|
|
||||||
from alembic import op
|
from alembic import op
|
||||||
|
@ -20,23 +20,99 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
# @author: Ryota MIBU
|
# @author: Ryota MIBU
|
||||||
|
# @author: Akihiro MOTOKI
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
|
||||||
from quantum.agent.linux import ovs_lib
|
from quantum.agent.linux import ovs_lib
|
||||||
|
from quantum.agent import rpc as agent_rpc
|
||||||
|
from quantum.agent import securitygroups_rpc as sg_rpc
|
||||||
from quantum.common import config as logging_config
|
from quantum.common import config as logging_config
|
||||||
from quantum.common import topics
|
from quantum.common import topics
|
||||||
from quantum import context
|
from quantum import context as q_context
|
||||||
|
from quantum.extensions import securitygroup as ext_sg
|
||||||
from quantum.openstack.common import log as logging
|
from quantum.openstack.common import log as logging
|
||||||
from quantum.openstack.common import rpc
|
from quantum.openstack.common.rpc import dispatcher
|
||||||
|
from quantum.openstack.common.rpc import proxy
|
||||||
from quantum.plugins.nec.common import config
|
from quantum.plugins.nec.common import config
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NECPluginApi(agent_rpc.PluginApi):
|
||||||
|
BASE_RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
def update_ports(self, context, agent_id, datapath_id,
|
||||||
|
port_added, port_removed):
|
||||||
|
"""RPC to update information of ports on Quantum Server"""
|
||||||
|
LOG.info(_("Update ports: added=%(added)s, "
|
||||||
|
"removed=%(removed)s"),
|
||||||
|
{'added': port_added, 'removed': port_removed})
|
||||||
|
try:
|
||||||
|
self.call(context,
|
||||||
|
self.make_msg('update_ports',
|
||||||
|
topic=topics.AGENT,
|
||||||
|
agent_id=agent_id,
|
||||||
|
datapath_id=datapath_id,
|
||||||
|
port_added=port_added,
|
||||||
|
port_removed=port_removed))
|
||||||
|
except Exception as e:
|
||||||
|
LOG.warn(_("update_ports() failed."))
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class NECAgentRpcCallback(object):
|
||||||
|
|
||||||
|
RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
def __init__(self, context, agent, sg_agent):
|
||||||
|
self.context = context
|
||||||
|
self.agent = agent
|
||||||
|
self.sg_agent = sg_agent
|
||||||
|
|
||||||
|
def port_update(self, context, **kwargs):
|
||||||
|
LOG.debug(_("port_update received: %s"), kwargs)
|
||||||
|
port = kwargs.get('port')
|
||||||
|
# Validate that port is on OVS
|
||||||
|
vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
|
||||||
|
if not vif_port:
|
||||||
|
return
|
||||||
|
|
||||||
|
if ext_sg.SECURITYGROUPS in port:
|
||||||
|
self.sg_agent.refresh_firewall()
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityGroupServerRpcApi(proxy.RpcProxy,
|
||||||
|
sg_rpc.SecurityGroupServerRpcApiMixin):
|
||||||
|
|
||||||
|
def __init__(self, topic):
|
||||||
|
super(SecurityGroupServerRpcApi, self).__init__(
|
||||||
|
topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityGroupAgentRpcCallback(
|
||||||
|
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
|
||||||
|
|
||||||
|
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
|
||||||
|
|
||||||
|
def __init__(self, context, sg_agent):
|
||||||
|
self.context = context
|
||||||
|
self.sg_agent = sg_agent
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||||
|
|
||||||
|
def __init__(self, context):
|
||||||
|
self.context = context
|
||||||
|
self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
|
||||||
|
self.init_firewall()
|
||||||
|
|
||||||
|
|
||||||
class NECQuantumAgent(object):
|
class NECQuantumAgent(object):
|
||||||
|
|
||||||
def __init__(self, integ_br, root_helper, polling_interval):
|
def __init__(self, integ_br, root_helper, polling_interval):
|
||||||
@ -49,36 +125,46 @@ class NECQuantumAgent(object):
|
|||||||
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
|
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
|
||||||
self.polling_interval = polling_interval
|
self.polling_interval = polling_interval
|
||||||
|
|
||||||
|
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
|
||||||
|
self.setup_rpc()
|
||||||
|
|
||||||
|
def setup_rpc(self):
|
||||||
self.host = socket.gethostname()
|
self.host = socket.gethostname()
|
||||||
self.agent_id = 'nec-q-agent.%s' % self.host
|
self.agent_id = 'nec-q-agent.%s' % self.host
|
||||||
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
|
LOG.info(_("RPC agent_id: %s"), self.agent_id)
|
||||||
|
|
||||||
|
self.topic = topics.AGENT
|
||||||
|
self.context = q_context.get_admin_context_without_session()
|
||||||
|
|
||||||
|
self.plugin_rpc = NECPluginApi(topics.PLUGIN)
|
||||||
|
self.sg_agent = SecurityGroupAgentRpc(self.context)
|
||||||
|
|
||||||
# RPC network init
|
# RPC network init
|
||||||
self.context = context.get_admin_context_without_session()
|
# Handle updates from service
|
||||||
self.conn = rpc.create_connection(new=True)
|
self.callback_nec = NECAgentRpcCallback(self.context,
|
||||||
|
self, self.sg_agent)
|
||||||
def update_ports(self, port_added=[], port_removed=[]):
|
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
|
||||||
"""RPC to update information of ports on Quantum Server"""
|
self.sg_agent)
|
||||||
LOG.info(_("Update ports: added=%(port_added)s, "
|
self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
|
||||||
"removed=%(port_removed)s"),
|
self.callback_sg])
|
||||||
locals())
|
# Define the listening consumer for the agent
|
||||||
try:
|
consumers = [[topics.PORT, topics.UPDATE],
|
||||||
rpc.call(self.context,
|
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||||
topics.PLUGIN,
|
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||||
{'method': 'update_ports',
|
self.topic,
|
||||||
'args': {'topic': topics.AGENT,
|
consumers)
|
||||||
'agent_id': self.agent_id,
|
|
||||||
'datapath_id': self.datapath_id,
|
|
||||||
'port_added': port_added,
|
|
||||||
'port_removed': port_removed}})
|
|
||||||
except Exception as e:
|
|
||||||
LOG.warn(_("update_ports() failed."))
|
|
||||||
return
|
|
||||||
|
|
||||||
def _vif_port_to_port_info(self, vif_port):
|
def _vif_port_to_port_info(self, vif_port):
|
||||||
return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
|
return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
|
||||||
mac=vif_port.vif_mac)
|
mac=vif_port.vif_mac)
|
||||||
|
|
||||||
|
def _process_security_group(self, port_added, port_removed):
|
||||||
|
if port_added:
|
||||||
|
devices_added = [p['id'] for p in port_added]
|
||||||
|
self.sg_agent.prepare_devices_filter(devices_added)
|
||||||
|
if port_removed:
|
||||||
|
self.sg_agent.remove_devices_filter(port_removed)
|
||||||
|
|
||||||
def daemon_loop(self):
|
def daemon_loop(self):
|
||||||
"""Main processing loop for NEC Plugin Agent."""
|
"""Main processing loop for NEC Plugin Agent."""
|
||||||
old_ports = []
|
old_ports = []
|
||||||
@ -99,7 +185,10 @@ class NECQuantumAgent(object):
|
|||||||
port_removed.append(port_id)
|
port_removed.append(port_id)
|
||||||
|
|
||||||
if port_added or port_removed:
|
if port_added or port_removed:
|
||||||
self.update_ports(port_added, port_removed)
|
self.plugin_rpc.update_ports(self.context,
|
||||||
|
self.agent_id, self.datapath_id,
|
||||||
|
port_added, port_removed)
|
||||||
|
self._process_security_group(port_added, port_removed)
|
||||||
else:
|
else:
|
||||||
LOG.debug(_("No port changed."))
|
LOG.debug(_("No port changed."))
|
||||||
|
|
||||||
@ -108,6 +197,8 @@ class NECQuantumAgent(object):
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
config.CONF(project='quantum')
|
config.CONF(project='quantum')
|
||||||
|
|
||||||
logging_config.setup_logging(config.CONF)
|
logging_config.setup_logging(config.CONF)
|
||||||
|
@ -19,6 +19,10 @@ import sqlalchemy as sa
|
|||||||
|
|
||||||
from quantum.db import api as db
|
from quantum.db import api as db
|
||||||
from quantum.db import model_base
|
from quantum.db import model_base
|
||||||
|
from quantum.db import models_v2
|
||||||
|
from quantum.db import securitygroups_db as sg_db
|
||||||
|
from quantum.extensions import securitygroup as ext_sg
|
||||||
|
from quantum import manager
|
||||||
from quantum.openstack.common import log as logging
|
from quantum.openstack.common import log as logging
|
||||||
# NOTE (e0ne): this import is needed for config init
|
# NOTE (e0ne): this import is needed for config init
|
||||||
from quantum.plugins.nec.common import config
|
from quantum.plugins.nec.common import config
|
||||||
@ -117,3 +121,29 @@ def del_portinfo(id):
|
|||||||
except sa.orm.exc.NoResultFound:
|
except sa.orm.exc.NoResultFound:
|
||||||
LOG.warning(_("del_portinfo(): NotFound portinfo for "
|
LOG.warning(_("del_portinfo(): NotFound portinfo for "
|
||||||
"port_id: %s"), id)
|
"port_id: %s"), id)
|
||||||
|
|
||||||
|
|
||||||
|
def get_port_from_device(port_id):
|
||||||
|
"""Get port from database"""
|
||||||
|
LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)
|
||||||
|
session = db.get_session()
|
||||||
|
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
|
||||||
|
|
||||||
|
query = session.query(models_v2.Port,
|
||||||
|
sg_db.SecurityGroupPortBinding.security_group_id)
|
||||||
|
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
|
||||||
|
models_v2.Port.id == sg_binding_port)
|
||||||
|
query = query.filter(models_v2.Port.id == port_id)
|
||||||
|
port_and_sgs = query.all()
|
||||||
|
if not port_and_sgs:
|
||||||
|
return None
|
||||||
|
port = port_and_sgs[0][0]
|
||||||
|
plugin = manager.QuantumManager.get_plugin()
|
||||||
|
port_dict = plugin._make_port_dict(port)
|
||||||
|
port_dict[ext_sg.SECURITYGROUPS] = [
|
||||||
|
sg_id for port, sg_id in port_and_sgs if sg_id]
|
||||||
|
port_dict['security_group_rules'] = []
|
||||||
|
port_dict['security_group_source_groups'] = []
|
||||||
|
port_dict['fixed_ips'] = [ip['ip_address']
|
||||||
|
for ip in port['fixed_ips']]
|
||||||
|
return port_dict
|
||||||
|
@ -15,6 +15,9 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
# @author: Ryota MIBU
|
# @author: Ryota MIBU
|
||||||
|
|
||||||
|
from quantum.agent import securitygroups_rpc as sg_rpc
|
||||||
|
from quantum.common import constants as q_const
|
||||||
|
from quantum.common import exceptions as q_exc
|
||||||
from quantum.common import rpc as q_rpc
|
from quantum.common import rpc as q_rpc
|
||||||
from quantum.common import topics
|
from quantum.common import topics
|
||||||
from quantum import context
|
from quantum import context
|
||||||
@ -23,9 +26,12 @@ from quantum.db import l3_db
|
|||||||
from quantum.db import l3_rpc_base
|
from quantum.db import l3_rpc_base
|
||||||
#NOTE(amotoki): quota_db cannot be removed, it is for db model
|
#NOTE(amotoki): quota_db cannot be removed, it is for db model
|
||||||
from quantum.db import quota_db
|
from quantum.db import quota_db
|
||||||
|
from quantum.db import securitygroups_rpc_base as sg_db_rpc
|
||||||
from quantum.extensions import portbindings
|
from quantum.extensions import portbindings
|
||||||
|
from quantum.extensions import securitygroup as ext_sg
|
||||||
from quantum.openstack.common import log as logging
|
from quantum.openstack.common import log as logging
|
||||||
from quantum.openstack.common import rpc
|
from quantum.openstack.common import rpc
|
||||||
|
from quantum.openstack.common.rpc import proxy
|
||||||
from quantum.plugins.nec.common import config
|
from quantum.plugins.nec.common import config
|
||||||
from quantum.plugins.nec.common import exceptions as nexc
|
from quantum.plugins.nec.common import exceptions as nexc
|
||||||
from quantum.plugins.nec.db import api as ndb
|
from quantum.plugins.nec.db import api as ndb
|
||||||
@ -51,7 +57,9 @@ class OperationalStatus:
|
|||||||
ERROR = "ERROR"
|
ERROR = "ERROR"
|
||||||
|
|
||||||
|
|
||||||
class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
class NECPluginV2(nec_plugin_base.NECPluginV2Base,
|
||||||
|
l3_db.L3_NAT_db_mixin,
|
||||||
|
sg_db_rpc.SecurityGroupServerRpcMixin):
|
||||||
"""NECPluginV2 controls an OpenFlow Controller.
|
"""NECPluginV2 controls an OpenFlow Controller.
|
||||||
|
|
||||||
The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
|
The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
|
||||||
@ -65,7 +73,8 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
information to and from the plugin.
|
information to and from the plugin.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
supported_extension_aliases = ["router", "quotas", "binding"]
|
supported_extension_aliases = ["router", "quotas", "binding",
|
||||||
|
"security-group"]
|
||||||
|
|
||||||
binding_view = "extension:port_binding:view"
|
binding_view = "extension:port_binding:view"
|
||||||
binding_set = "extension:port_binding:set"
|
binding_set = "extension:port_binding:set"
|
||||||
@ -81,21 +90,28 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
|
|
||||||
self.setup_rpc()
|
self.setup_rpc()
|
||||||
|
|
||||||
|
def setup_rpc(self):
|
||||||
|
self.topic = topics.PLUGIN
|
||||||
|
self.conn = rpc.create_connection(new=True)
|
||||||
|
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
|
||||||
|
|
||||||
|
self.callback_nec = NECPluginV2RPCCallbacks(self)
|
||||||
|
self.callback_dhcp = DhcpRpcCallback()
|
||||||
|
self.callback_l3 = L3RpcCallback()
|
||||||
|
self.callback_sg = SecurityGroupServerRpcCallback()
|
||||||
|
callbacks = [self.callback_nec, self.callback_dhcp,
|
||||||
|
self.callback_l3, self.callback_sg]
|
||||||
|
self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
|
||||||
|
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
|
||||||
|
# Consume from all consumers in a thread
|
||||||
|
self.conn.consume_in_thread()
|
||||||
|
|
||||||
def _check_view_auth(self, context, resource, action):
|
def _check_view_auth(self, context, resource, action):
|
||||||
return policy.check(context, action, resource)
|
return policy.check(context, action, resource)
|
||||||
|
|
||||||
def _enforce_set_auth(self, context, resource, action):
|
def _enforce_set_auth(self, context, resource, action):
|
||||||
policy.enforce(context, action, resource)
|
policy.enforce(context, action, resource)
|
||||||
|
|
||||||
def setup_rpc(self):
|
|
||||||
self.topic = topics.PLUGIN
|
|
||||||
self.conn = rpc.create_connection(new=True)
|
|
||||||
self.callbacks = NECPluginV2RPCCallbacks(self)
|
|
||||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
|
||||||
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
|
|
||||||
# Consume from all consumers in a thread
|
|
||||||
self.conn.consume_in_thread()
|
|
||||||
|
|
||||||
def _update_resource_status(self, context, resource, id, status):
|
def _update_resource_status(self, context, resource, id, status):
|
||||||
"""Update status of specified resource."""
|
"""Update status of specified resource."""
|
||||||
request = {}
|
request = {}
|
||||||
@ -199,8 +215,12 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
"""Create a new network entry on DB, and create it on OFC."""
|
"""Create a new network entry on DB, and create it on OFC."""
|
||||||
LOG.debug(_("NECPluginV2.create_network() called, "
|
LOG.debug(_("NECPluginV2.create_network() called, "
|
||||||
"network=%s ."), network)
|
"network=%s ."), network)
|
||||||
session = context.session
|
#set up default security groups
|
||||||
with session.begin(subtransactions=True):
|
tenant_id = self._get_tenant_id_for_create(
|
||||||
|
context, network['network'])
|
||||||
|
self._ensure_default_security_group(context, tenant_id)
|
||||||
|
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
new_net = super(NECPluginV2, self).create_network(context, network)
|
new_net = super(NECPluginV2, self).create_network(context, network)
|
||||||
self._process_l3_create(context, network['network'], new_net['id'])
|
self._process_l3_create(context, network['network'], new_net['id'])
|
||||||
self._extend_network_dict_l3(context, new_net)
|
self._extend_network_dict_l3(context, new_net)
|
||||||
@ -337,12 +357,25 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
def create_port(self, context, port):
|
def create_port(self, context, port):
|
||||||
"""Create a new port entry on DB, then try to activate it."""
|
"""Create a new port entry on DB, then try to activate it."""
|
||||||
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
|
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
|
||||||
new_port = super(NECPluginV2, self).create_port(context, port)
|
with context.session.begin(subtransactions=True):
|
||||||
self._update_resource_status(context, "port", new_port['id'],
|
self._ensure_default_security_group_on_port(context, port)
|
||||||
OperationalStatus.BUILD)
|
sgids = self._get_security_groups_on_port(context, port)
|
||||||
|
port = super(NECPluginV2, self).create_port(context, port)
|
||||||
|
self._process_port_create_security_group(
|
||||||
|
context, port['id'], sgids)
|
||||||
|
self._extend_port_dict_security_group(context, port)
|
||||||
|
# Note: In order to allow dhcp packets,
|
||||||
|
# changes for dhcp ip should be notifified
|
||||||
|
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
|
||||||
|
self.notifier.security_groups_provider_updated(context)
|
||||||
|
else:
|
||||||
|
self.notifier.security_groups_member_updated(
|
||||||
|
context, port.get(ext_sg.SECURITYGROUPS))
|
||||||
|
|
||||||
self.activate_port_if_ready(context, new_port)
|
self._update_resource_status(context, "port", port['id'],
|
||||||
return self._extend_port_dict_binding(context, new_port)
|
OperationalStatus.BUILD)
|
||||||
|
self.activate_port_if_ready(context, port)
|
||||||
|
return self._extend_port_dict_binding(context, port)
|
||||||
|
|
||||||
def update_port(self, context, id, port):
|
def update_port(self, context, id, port):
|
||||||
"""Update port, and handle packetfilters associated with the port.
|
"""Update port, and handle packetfilters associated with the port.
|
||||||
@ -352,23 +385,37 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
"""
|
"""
|
||||||
LOG.debug(_("NECPluginV2.update_port() called, "
|
LOG.debug(_("NECPluginV2.update_port() called, "
|
||||||
"id=%(id)s port=%(port)s ."), locals())
|
"id=%(id)s port=%(port)s ."), locals())
|
||||||
|
need_port_update_notify = False
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
old_port = super(NECPluginV2, self).get_port(context, id)
|
old_port = super(NECPluginV2, self).get_port(context, id)
|
||||||
new_port = super(NECPluginV2, self).update_port(context, id, port)
|
new_port = super(NECPluginV2, self).update_port(context, id, port)
|
||||||
|
need_port_update_notify = self.update_security_group_on_port(
|
||||||
|
context, id, port, old_port, new_port)
|
||||||
|
|
||||||
changed = (old_port['admin_state_up'] is
|
need_port_update_notify |= self.is_security_group_member_updated(
|
||||||
not new_port['admin_state_up'])
|
context, old_port, new_port)
|
||||||
|
if need_port_update_notify:
|
||||||
|
self.notifier.port_update(context, new_port)
|
||||||
|
|
||||||
|
changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
|
||||||
if changed:
|
if changed:
|
||||||
if new_port['admin_state_up']:
|
if new_port['admin_state_up']:
|
||||||
self.activate_port_if_ready(context, new_port)
|
self.activate_port_if_ready(context, new_port)
|
||||||
else:
|
else:
|
||||||
self.deactivate_port(context, old_port)
|
self.deactivate_port(context, old_port)
|
||||||
|
|
||||||
|
# NOTE: _extend_port_dict_security_group() is called in
|
||||||
|
# update_security_group_on_port() above, so we don't need to
|
||||||
|
# call it here.
|
||||||
return self._extend_port_dict_binding(context, new_port)
|
return self._extend_port_dict_binding(context, new_port)
|
||||||
|
|
||||||
def delete_port(self, context, id, l3_port_check=True):
|
def delete_port(self, context, id, l3_port_check=True):
|
||||||
"""Delete port and packet_filters associated with the port."""
|
"""Delete port and packet_filters associated with the port."""
|
||||||
LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
|
LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
|
||||||
port = super(NECPluginV2, self).get_port(context, id)
|
# ext_sg.SECURITYGROUPS attribute for the port is required
|
||||||
|
# since notifier.security_groups_member_updated() need the attribute.
|
||||||
|
# Thus we need to call self.get_port() instead of super().get_port()
|
||||||
|
port = self.get_port(context, id)
|
||||||
|
|
||||||
self.deactivate_port(context, port)
|
self.deactivate_port(context, port)
|
||||||
|
|
||||||
@ -384,22 +431,27 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
# and l3-router. If so, we should prevent deletion.
|
# and l3-router. If so, we should prevent deletion.
|
||||||
if l3_port_check:
|
if l3_port_check:
|
||||||
self.prevent_l3_port_deletion(context, id)
|
self.prevent_l3_port_deletion(context, id)
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
self.disassociate_floatingips(context, id)
|
self.disassociate_floatingips(context, id)
|
||||||
|
self._delete_port_security_group_bindings(context, id)
|
||||||
super(NECPluginV2, self).delete_port(context, id)
|
super(NECPluginV2, self).delete_port(context, id)
|
||||||
|
self.notifier.security_groups_member_updated(
|
||||||
|
context, port.get(ext_sg.SECURITYGROUPS))
|
||||||
|
|
||||||
def get_port(self, context, id, fields=None):
|
def get_port(self, context, id, fields=None):
|
||||||
session = context.session
|
with context.session.begin(subtransactions=True):
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
port = super(NECPluginV2, self).get_port(context, id, fields)
|
port = super(NECPluginV2, self).get_port(context, id, fields)
|
||||||
|
self._extend_port_dict_security_group(context, port)
|
||||||
self._extend_port_dict_binding(context, port)
|
self._extend_port_dict_binding(context, port)
|
||||||
return self._fields(port, fields)
|
return self._fields(port, fields)
|
||||||
|
|
||||||
def get_ports(self, context, filters=None, fields=None):
|
def get_ports(self, context, filters=None, fields=None):
|
||||||
session = context.session
|
with context.session.begin(subtransactions=True):
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
ports = super(NECPluginV2, self).get_ports(context, filters,
|
ports = super(NECPluginV2, self).get_ports(context, filters,
|
||||||
fields)
|
fields)
|
||||||
|
# TODO(amotoki) filter by security group
|
||||||
for port in ports:
|
for port in ports:
|
||||||
|
self._extend_port_dict_security_group(context, port)
|
||||||
self._extend_port_dict_binding(context, port)
|
self._extend_port_dict_binding(context, port)
|
||||||
return [self._fields(port, fields) for port in ports]
|
return [self._fields(port, fields) for port in ports]
|
||||||
|
|
||||||
@ -529,8 +581,52 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
|
|||||||
super(NECPluginV2, self).delete_packet_filter(context, id)
|
super(NECPluginV2, self).delete_packet_filter(context, id)
|
||||||
|
|
||||||
|
|
||||||
class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
|
class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
|
||||||
l3_rpc_base.L3RpcCallbackMixin):
|
sg_rpc.SecurityGroupAgentRpcApiMixin):
|
||||||
|
'''RPC API for NEC plugin agent'''
|
||||||
|
|
||||||
|
BASE_RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
def __init__(self, topic):
|
||||||
|
super(NECPluginV2AgentNotifierApi, self).__init__(
|
||||||
|
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||||
|
self.topic_port_update = topics.get_topic_name(
|
||||||
|
topic, topics.PORT, topics.UPDATE)
|
||||||
|
|
||||||
|
def port_update(self, context, port):
|
||||||
|
self.fanout_cast(context,
|
||||||
|
self.make_msg('port_update',
|
||||||
|
port=port),
|
||||||
|
topic=self.topic_port_update)
|
||||||
|
|
||||||
|
|
||||||
|
class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin):
|
||||||
|
# DhcpPluginApi BASE_RPC_API_VERSION
|
||||||
|
RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
|
||||||
|
class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin):
|
||||||
|
# L3PluginApi BASE_RPC_API_VERSION
|
||||||
|
RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityGroupServerRpcCallback(
|
||||||
|
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||||
|
|
||||||
|
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_port_from_device(device):
|
||||||
|
port = ndb.get_port_from_device(device)
|
||||||
|
if port:
|
||||||
|
port['device'] = device
|
||||||
|
LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
|
||||||
|
"device=%(device)s => %(ret)s."),
|
||||||
|
{'device': device, 'ret': port})
|
||||||
|
return port
|
||||||
|
|
||||||
|
|
||||||
|
class NECPluginV2RPCCallbacks(object):
|
||||||
|
|
||||||
RPC_API_VERSION = '1.0'
|
RPC_API_VERSION = '1.0'
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase,
|
|||||||
test_bindings.PortBindingsTestCase):
|
test_bindings.PortBindingsTestCase):
|
||||||
|
|
||||||
VIF_TYPE = portbindings.VIF_TYPE_OVS
|
VIF_TYPE = portbindings.VIF_TYPE_OVS
|
||||||
HAS_SECURITY_GROUP = False
|
HAS_PORT_FILTER = True
|
||||||
|
|
||||||
|
|
||||||
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
|
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
|
||||||
|
88
quantum/tests/unit/nec/test_security_group.py
Normal file
88
quantum/tests/unit/nec/test_security_group.py
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# Copyright 2013, NEC Corporation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from quantum.api.v2 import attributes
|
||||||
|
from quantum.extensions import securitygroup as ext_sg
|
||||||
|
from quantum import manager
|
||||||
|
from quantum.plugins.nec.db import api as ndb
|
||||||
|
from quantum.tests.unit import test_extension_security_group as test_sg
|
||||||
|
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
|
||||||
|
|
||||||
|
PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
|
||||||
|
AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
|
||||||
|
NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
|
||||||
|
|
||||||
|
|
||||||
|
class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
|
||||||
|
_plugin_name = PLUGIN_NAME
|
||||||
|
|
||||||
|
def setUp(self, plugin=None):
|
||||||
|
self.addCleanup(mock.patch.stopall)
|
||||||
|
notifier_p = mock.patch(NOTIFIER)
|
||||||
|
notifier_cls = notifier_p.start()
|
||||||
|
self.notifier = mock.Mock()
|
||||||
|
notifier_cls.return_value = self.notifier
|
||||||
|
self._attribute_map_bk_ = {}
|
||||||
|
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
|
||||||
|
self._attribute_map_bk_[item] = (attributes.
|
||||||
|
RESOURCE_ATTRIBUTE_MAP[item].
|
||||||
|
copy())
|
||||||
|
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(NecSecurityGroupsTestCase, self).tearDown()
|
||||||
|
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
|
||||||
|
|
||||||
|
|
||||||
|
class TestNecSecurityGroups(NecSecurityGroupsTestCase,
|
||||||
|
test_sg.TestSecurityGroups,
|
||||||
|
test_sg_rpc.SGNotificationTestMixin):
|
||||||
|
|
||||||
|
def test_security_group_get_port_from_device(self):
|
||||||
|
with contextlib.nested(self.network(),
|
||||||
|
self.security_group()) as (n, sg):
|
||||||
|
with self.subnet(n):
|
||||||
|
res = self._create_port(self.fmt, n['network']['id'])
|
||||||
|
port = self.deserialize(self.fmt, res)
|
||||||
|
port_id = port['port']['id']
|
||||||
|
sg_id = sg['security_group']['id']
|
||||||
|
fixed_ips = port['port']['fixed_ips']
|
||||||
|
|
||||||
|
data = {'port': {'fixed_ips': fixed_ips,
|
||||||
|
'name': port['port']['name'],
|
||||||
|
ext_sg.SECURITYGROUPS: [sg_id]}}
|
||||||
|
req = self.new_update_request('ports', data, port_id)
|
||||||
|
res = self.deserialize(self.fmt,
|
||||||
|
req.get_response(self.api))
|
||||||
|
|
||||||
|
plugin = manager.QuantumManager.get_plugin()
|
||||||
|
port_dict = plugin.callback_sg.get_port_from_device(port_id)
|
||||||
|
self.assertEqual(port_id, port_dict['id'])
|
||||||
|
self.assertEqual([sg_id],
|
||||||
|
port_dict[ext_sg.SECURITYGROUPS])
|
||||||
|
self.assertEqual([], port_dict['security_group_rules'])
|
||||||
|
self.assertEqual([fixed_ips[0]['ip_address']],
|
||||||
|
port_dict['fixed_ips'])
|
||||||
|
self._delete('ports', port_id)
|
||||||
|
|
||||||
|
|
||||||
|
class TestNecSecurityGroupsXML(TestNecSecurityGroups):
|
||||||
|
fmt = 'xml'
|
Loading…
Reference in New Issue
Block a user