vmware-nsx/vmware_nsx/db/extended_security_group.py
Gary Kotton b4c7750818 Fix broken unit tests
The commit dcb2a931b5b84fb7aa41f08b37a5148bf6e987bc broke us.
Prior to that there was commit cbc15d2e1db22fbefaac0a97363589aebf834b24
that tried to remove checks for specific IPs.

In the NSX|V plugin the DHCP port is assigned.

So the following was done:
1. Removed tests that are working in the base plugin
2. Updated necessary tests
3. Fixed a random fail with the security groups

Change-Id: I99c60dd7c5b78a2a14c8e71cf7408439b6004dd1
2016-06-10 11:10:37 -07:00

77 lines
3.3 KiB
Python

# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
from vmware_nsx.extensions import securitygrouplogging as sg_logging
class NsxExtendedSecurityGroupProperties(model_base.BASEV2):
__tablename__ = 'nsx_extended_security_group_properties'
security_group_id = sa.Column(sa.String(36),
sa.ForeignKey('securitygroups.id',
ondelete="CASCADE"),
primary_key=True)
logging = sa.Column(sa.Boolean, default=False, nullable=False)
security_group = orm.relationship(
securitygroups_db.SecurityGroup,
backref=orm.backref('ext_properties', lazy='joined',
uselist=False, cascade='delete'))
class ExtendedSecurityGroupPropertiesMixin(object):
def _process_security_group_properties_create(self, context,
sg_res, sg_req):
with context.session.begin(subtransactions=True):
properties = NsxExtendedSecurityGroupProperties(
security_group_id=sg_res['id'],
logging=sg_req.get(sg_logging.LOGGING, False))
context.session.add(properties)
sg_res[sg_logging.LOGGING] = sg_req.get(sg_logging.LOGGING, False)
def _get_security_group_properties(self, context, security_group_id):
return context.session.query(
NsxExtendedSecurityGroupProperties).filter_by(
security_group_id=security_group_id).one()
def _process_security_group_properties_update(self, context,
sg_res, sg_req):
if (sg_logging.LOGGING in sg_req
and (sg_req[sg_logging.LOGGING] !=
sg_res.get(sg_logging.LOGGING, False))):
prop = self._get_security_group_properties(context, sg_res['id'])
with context.session.begin(subtransactions=True):
prop.update({sg_logging.LOGGING: sg_req[sg_logging.LOGGING]})
sg_res[sg_logging.LOGGING] = sg_req[sg_logging.LOGGING]
def _is_security_group_logged(self, context, security_group_id):
prop = self._get_security_group_properties(context, security_group_id)
return prop.logging
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
ext_sg.SECURITYGROUPS, ['_extend_security_group_with_properties'])
def _extend_security_group_with_properties(self, sg_res, sg_db):
if sg_db.ext_properties:
sg_res[sg_logging.LOGGING] = sg_db.ext_properties.logging