Quantum Security Groups API
Implements blueprint quantum-security-groups API In addition the the convention that if a URI has a '-' in it, it's replaced with a '_'. For example: POST security-groups will convert the body to {'security_group': ..} Change-Id: I2c5219ed1d44a43ce1bf03d49df9f5c1af23352b
This commit is contained in:
parent
5fe09f2437
commit
2cff3362a7
@ -111,7 +111,7 @@ class Controller(object):
|
||||
if member_actions is None:
|
||||
member_actions = []
|
||||
self._plugin = plugin
|
||||
self._collection = collection
|
||||
self._collection = collection.replace('-', '_')
|
||||
self._resource = resource
|
||||
self._attr_info = attr_info
|
||||
self._allow_bulk = allow_bulk
|
||||
|
466
quantum/db/securitygroups_db.py
Normal file
466
quantum/db/securitygroups_db.py
Normal file
@ -0,0 +1,466 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2012 Nicira Networks, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Aaron Rosen, Nicira, Inc
|
||||
#
|
||||
|
||||
import re
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.orm import scoped_session
|
||||
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.openstack.common import cfg
|
||||
from quantum.common import utils
|
||||
from quantum.db import model_base
|
||||
from quantum.db import models_v2
|
||||
from quantum.extensions import securitygroup as ext_sg
|
||||
|
||||
|
||||
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
||||
"""Represents a v2 quantum security group."""
|
||||
name = sa.Column(sa.String(255))
|
||||
description = sa.Column(sa.String(255))
|
||||
external_id = sa.Column(sa.Integer, unique=True)
|
||||
|
||||
|
||||
class SecurityGroupPortBinding(model_base.BASEV2):
|
||||
"""Represents binding between quantum ports and security profiles"""
|
||||
port_id = sa.Column(sa.String(36), sa.ForeignKey("ports.id"),
|
||||
primary_key=True)
|
||||
security_group_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("securitygroups.id"),
|
||||
primary_key=True)
|
||||
|
||||
|
||||
class SecurityGroupRule(model_base.BASEV2, models_v2.HasId,
|
||||
models_v2.HasTenant):
|
||||
"""Represents a v2 quantum security group rule."""
|
||||
external_id = sa.Column(sa.Integer)
|
||||
security_group_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("securitygroups.id",
|
||||
ondelete="CASCADE"),
|
||||
nullable=False)
|
||||
|
||||
source_group_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("securitygroups.id",
|
||||
ondelete="CASCADE"),
|
||||
nullable=True)
|
||||
|
||||
direction = sa.Column(sa.Enum('ingress', 'egress'))
|
||||
ethertype = sa.Column(sa.String(40))
|
||||
protocol = sa.Column(sa.String(40))
|
||||
port_range_min = sa.Column(sa.Integer)
|
||||
port_range_max = sa.Column(sa.Integer)
|
||||
source_ip_prefix = sa.Column(sa.String(255))
|
||||
security_group = orm.relationship(
|
||||
SecurityGroup,
|
||||
backref=orm.backref('rules', cascade='all,delete'),
|
||||
primaryjoin="SecurityGroup.id==SecurityGroupRule.security_group_id")
|
||||
source_group = orm.relationship(
|
||||
SecurityGroup,
|
||||
backref=orm.backref('source_rules', cascade='all,delete'),
|
||||
primaryjoin="SecurityGroup.id==SecurityGroupRule.source_group_id")
|
||||
|
||||
|
||||
class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
"""Mixin class to add security group to db_plugin_base_v2."""
|
||||
|
||||
__native_bulk_support = True
|
||||
sg_supported_protocols = ['tcp', 'udp', 'icmp']
|
||||
sg_supported_ethertypes = ['IPv4', 'IPv6']
|
||||
|
||||
def create_security_group_bulk(self, context, security_group_rule):
|
||||
return self._create_bulk('security_group', context,
|
||||
security_group_rule)
|
||||
|
||||
def create_security_group(self, context, security_group, default_sg=False):
|
||||
"""Create security group.
|
||||
If default_sg is true that means we are a default security group for
|
||||
a given tenant if it does not exist.
|
||||
"""
|
||||
s = security_group['security_group']
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
|
||||
raise ext_sg.SecurityGroupProxyModeNotAdmin()
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and not s.get('external_id')):
|
||||
raise ext_sg.SecurityGroupProxyMode()
|
||||
if not cfg.CONF.SECURITYGROUP.proxy_mode and s.get('external_id'):
|
||||
raise ext_sg.SecurityGroupNotProxyMode()
|
||||
|
||||
tenant_id = self._get_tenant_id_for_create(context, s)
|
||||
|
||||
# if in proxy mode a default security group will be created by source
|
||||
if not default_sg and not cfg.CONF.SECURITYGROUP.proxy_mode:
|
||||
self._ensure_default_security_group(context, tenant_id,
|
||||
security_group)
|
||||
if s.get('external_id'):
|
||||
try:
|
||||
# Check if security group already exists
|
||||
sg = self.get_security_group(context, s.get('external_id'))
|
||||
if sg:
|
||||
raise ext_sg.SecurityGroupAlreadyExists(
|
||||
name=sg.get('name', ''),
|
||||
external_id=s.get('external_id'))
|
||||
except ext_sg.SecurityGroupNotFound:
|
||||
pass
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
security_group_db = SecurityGroup(id=s.get('id') or (
|
||||
utils.str_uuid()),
|
||||
description=s['description'],
|
||||
tenant_id=tenant_id,
|
||||
name=s['name'],
|
||||
external_id=s.get('external_id'))
|
||||
context.session.add(security_group_db)
|
||||
if s.get('name') == 'default':
|
||||
for ethertype in self.sg_supported_ethertypes:
|
||||
# Allow all egress traffic
|
||||
db = SecurityGroupRule(
|
||||
id=utils.str_uuid(), tenant_id=tenant_id,
|
||||
security_group=security_group_db,
|
||||
direction='egress',
|
||||
ethertype=ethertype)
|
||||
context.session.add(db)
|
||||
# Allow intercommunication
|
||||
db = SecurityGroupRule(
|
||||
id=utils.str_uuid(), tenant_id=tenant_id,
|
||||
security_group=security_group_db,
|
||||
direction='ingress',
|
||||
source_group=security_group_db,
|
||||
ethertype=ethertype)
|
||||
context.session.add(db)
|
||||
|
||||
return self._make_security_group_dict(security_group_db)
|
||||
|
||||
def get_security_groups(self, context, filters=None, fields=None):
|
||||
return self._get_collection(context, SecurityGroup,
|
||||
self._make_security_group_dict,
|
||||
filters=filters, fields=fields)
|
||||
|
||||
def get_security_group(self, context, id, fields=None, tenant_id=None):
|
||||
"""Tenant id is given to handle the case when we
|
||||
are creating a security group or security group rule on behalf of
|
||||
another use.
|
||||
"""
|
||||
|
||||
if tenant_id:
|
||||
tmp_context_tenant_id = context.tenant_id
|
||||
context.tenant_id = tenant_id
|
||||
|
||||
try:
|
||||
ret = self._make_security_group_dict(self._get_security_group(
|
||||
context, id), fields)
|
||||
finally:
|
||||
if tenant_id:
|
||||
context.tenant_id = tmp_context_tenant_id
|
||||
return ret
|
||||
|
||||
def _get_security_group(self, context, id):
|
||||
try:
|
||||
query = self._model_query(context, SecurityGroup)
|
||||
if not re.match(attributes.UUID_PATTERN, str(id)):
|
||||
sg = query.filter(SecurityGroup.external_id == id).one()
|
||||
else:
|
||||
sg = query.filter(SecurityGroup.id == id).one()
|
||||
|
||||
except exc.NoResultFound:
|
||||
raise ext_sg.SecurityGroupNotFound(id=id)
|
||||
return sg
|
||||
|
||||
def delete_security_group(self, context, id):
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
|
||||
raise ext_sg.SecurityGroupProxyModeNotAdmin()
|
||||
|
||||
filters = {'security_group_id': [id]}
|
||||
ports = self._get_port_security_group_bindings(context, filters)
|
||||
if ports:
|
||||
raise ext_sg.SecurityGroupInUse(id=id)
|
||||
# confirm security group exists
|
||||
sg = self._get_security_group(context, id)
|
||||
|
||||
if sg['name'] == 'default':
|
||||
raise ext_sg.SecurityGroupCannotRemoveDefault()
|
||||
with context.session.begin(subtransactions=True):
|
||||
context.session.delete(sg)
|
||||
|
||||
def _make_security_group_dict(self, security_group, fields=None):
|
||||
res = {'id': security_group['id'],
|
||||
'name': security_group['name'],
|
||||
'tenant_id': security_group['tenant_id'],
|
||||
'description': security_group['description']}
|
||||
if security_group.get('external_id'):
|
||||
res['external_id'] = security_group['external_id']
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _make_security_group_binding_dict(self, security_group, fields=None):
|
||||
res = {'port_id': security_group['port_id'],
|
||||
'security_group_id': security_group['security_group_id']}
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _create_port_security_group_binding(self, context, port_id,
|
||||
security_group_id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
db = SecurityGroupPortBinding(port_id=port_id,
|
||||
security_group_id=security_group_id)
|
||||
context.session.add(db)
|
||||
|
||||
def _get_port_security_group_bindings(self, context,
|
||||
filters=None, fields=None):
|
||||
return self._get_collection(context, SecurityGroupPortBinding,
|
||||
self._make_security_group_binding_dict,
|
||||
filters=filters, fields=fields)
|
||||
|
||||
def _delete_port_security_group_bindings(self, context, port_id):
|
||||
query = self._model_query(context, SecurityGroupPortBinding)
|
||||
bindings = query.filter(
|
||||
SecurityGroupPortBinding.port_id == port_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
for binding in bindings:
|
||||
context.session.delete(binding)
|
||||
|
||||
def create_security_group_rule_bulk(self, context, security_group_rule):
|
||||
return self._create_bulk('security_group_rule', context,
|
||||
security_group_rule)
|
||||
|
||||
def create_security_group_rule_bulk_native(self, context,
|
||||
security_group_rule):
|
||||
r = security_group_rule['security_group_rules']
|
||||
|
||||
scoped_session(context.session)
|
||||
security_group_id = self._validate_security_group_rules(
|
||||
context, security_group_rule)
|
||||
with context.session.begin(subtransactions=True):
|
||||
if not self.get_security_group(context, security_group_id):
|
||||
raise ext_sg.SecurityGroupNotFound(id=security_group_id)
|
||||
|
||||
self._check_for_duplicate_rules(context, r)
|
||||
ret = []
|
||||
for rule_dict in r:
|
||||
rule = rule_dict['security_group_rule']
|
||||
tenant_id = self._get_tenant_id_for_create(context, rule)
|
||||
db = SecurityGroupRule(
|
||||
id=utils.str_uuid(), tenant_id=tenant_id,
|
||||
security_group_id=rule['security_group_id'],
|
||||
direction=rule['direction'],
|
||||
external_id=rule.get('external_id'),
|
||||
source_group_id=rule.get('source_group_id'),
|
||||
ethertype=rule['ethertype'],
|
||||
protocol=rule['protocol'],
|
||||
port_range_min=rule['port_range_min'],
|
||||
port_range_max=rule['port_range_max'],
|
||||
source_ip_prefix=rule.get('source_ip_prefix'))
|
||||
context.session.add(db)
|
||||
ret.append(self._make_security_group_rule_dict(db))
|
||||
return ret
|
||||
|
||||
def create_security_group_rule(self, context, security_group_rule):
|
||||
bulk_rule = {'security_group_rules': [security_group_rule]}
|
||||
return self.create_security_group_rule_bulk_native(context,
|
||||
bulk_rule)[0]
|
||||
|
||||
def _validate_security_group_rules(self, context, security_group_rule):
|
||||
"""Check that rules being installed all belong to the same security
|
||||
group, source_group_id/security_group_id belong to the same tenant,
|
||||
and rules are valid.
|
||||
"""
|
||||
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
|
||||
raise ext_sg.SecurityGroupProxyModeNotAdmin()
|
||||
|
||||
new_rules = set()
|
||||
tenant_ids = set()
|
||||
for rules in security_group_rule['security_group_rules']:
|
||||
rule = rules.get('security_group_rule')
|
||||
new_rules.add(rule['security_group_id'])
|
||||
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and
|
||||
not rule.get('external_id')):
|
||||
raise ext_sg.SecurityGroupProxyMode()
|
||||
if (not cfg.CONF.SECURITYGROUP.proxy_mode and
|
||||
rule.get('external_id')):
|
||||
raise ext_sg.SecurityGroupNotProxyMode()
|
||||
|
||||
# Check that protocol/ethertype are valid
|
||||
protocol = rule.get('protocol')
|
||||
if protocol and protocol not in self.sg_supported_protocols:
|
||||
raise ext_sg.SecurityGroupInvalidProtocolType(value=protocol)
|
||||
ethertype = rule.get('ethertype')
|
||||
if ethertype and ethertype not in self.sg_supported_ethertypes:
|
||||
raise ext_sg.SecurityGroupInvalidEtherType(value=ethertype)
|
||||
|
||||
# Check that port_range's are valid
|
||||
if (rule['port_range_min'] is None and
|
||||
rule['port_range_max'] is None):
|
||||
pass
|
||||
elif (rule['port_range_min'] is not None and
|
||||
rule['port_range_min'] <= rule['port_range_max']):
|
||||
if not rule['protocol']:
|
||||
raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
|
||||
else:
|
||||
raise ext_sg.SecurityGroupInvalidPortRange()
|
||||
|
||||
if rule['source_ip_prefix'] and rule['source_group_id']:
|
||||
raise ext_sg.SecurityGroupSourceGroupAndIpPrefix()
|
||||
|
||||
if rule['tenant_id'] not in tenant_ids:
|
||||
tenant_ids.add(rule['tenant_id'])
|
||||
source_group_id = rule.get('source_group_id')
|
||||
# Check that source_group_id exists for tenant
|
||||
if source_group_id:
|
||||
self.get_security_group(context, source_group_id,
|
||||
tenant_id=rule['tenant_id'])
|
||||
if len(new_rules) > 1:
|
||||
raise ext_sg.SecurityGroupNotSingleGroupRules()
|
||||
security_group_id = new_rules.pop()
|
||||
|
||||
# Confirm single tenant and that the tenant has permission
|
||||
# to add rules to this security group.
|
||||
if len(tenant_ids) > 1:
|
||||
raise ext_sg.SecurityGroupRulesNotSingleTenant()
|
||||
for tenant_id in tenant_ids:
|
||||
self.get_security_group(context, security_group_id,
|
||||
tenant_id=tenant_id)
|
||||
return security_group_id
|
||||
|
||||
def _make_security_group_rule_dict(self, security_group_rule, fields=None):
|
||||
res = {'id': security_group_rule['id'],
|
||||
'tenant_id': security_group_rule['tenant_id'],
|
||||
'security_group_id': security_group_rule['security_group_id'],
|
||||
'ethertype': security_group_rule['ethertype'],
|
||||
'direction': security_group_rule['direction'],
|
||||
'protocol': security_group_rule['protocol'],
|
||||
'port_range_min': security_group_rule['port_range_min'],
|
||||
'port_range_max': security_group_rule['port_range_max'],
|
||||
'source_ip_prefix': security_group_rule['source_ip_prefix'],
|
||||
'source_group_id': security_group_rule['source_group_id'],
|
||||
'external_id': security_group_rule['external_id']}
|
||||
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _make_security_group_rule_filter_dict(self, security_group_rule):
|
||||
sgr = security_group_rule['security_group_rule']
|
||||
res = {'tenant_id': [sgr['tenant_id']],
|
||||
'security_group_id': [sgr['security_group_id']],
|
||||
'direction': [sgr['direction']]}
|
||||
|
||||
include_if_present = ['protocol', 'port_range_max', 'port_range_min',
|
||||
'ethertype', 'source_ip_prefix',
|
||||
'source_group_id', 'external_id']
|
||||
for key in include_if_present:
|
||||
value = sgr.get(key)
|
||||
if value:
|
||||
res[key] = [value]
|
||||
return res
|
||||
|
||||
def _check_for_duplicate_rules(self, context, security_group_rules):
|
||||
for i in security_group_rules:
|
||||
found_self = False
|
||||
for j in security_group_rules:
|
||||
if i['security_group_rule'] == j['security_group_rule']:
|
||||
if found_self:
|
||||
raise ext_sg.DuplicateSecurityGroupRuleInPost(rule=i)
|
||||
found_self = True
|
||||
|
||||
# Check in database if rule exists
|
||||
filters = self._make_security_group_rule_filter_dict(i)
|
||||
if self.get_security_group_rules(context, filters):
|
||||
raise ext_sg.SecurityGroupRuleExists(rule=i)
|
||||
|
||||
def get_security_group_rules(self, context, filters=None, fields=None):
|
||||
return self._get_collection(context, SecurityGroupRule,
|
||||
self._make_security_group_rule_dict,
|
||||
filters=filters, fields=fields)
|
||||
|
||||
def get_security_group_rule(self, context, id, fields=None):
|
||||
security_group_rule = self._get_security_group_rule(context, id)
|
||||
return self._make_security_group_rule_dict(security_group_rule, fields)
|
||||
|
||||
def _get_security_group_rule(self, context, id):
|
||||
try:
|
||||
if not re.match(attributes.UUID_PATTERN, id):
|
||||
query = self._model_query(context, SecurityGroupRule)
|
||||
sgr = query.filter(SecurityGroupRule.external_id == id).one()
|
||||
else:
|
||||
query = self._model_query(context, SecurityGroupRule)
|
||||
sgr = query.filter(SecurityGroupRule.id == id).one()
|
||||
except exc.NoResultFound:
|
||||
raise ext_sg.SecurityGroupRuleNotFound(id=id)
|
||||
return sgr
|
||||
|
||||
def delete_security_group_rule(self, context, sgrid):
|
||||
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
|
||||
raise ext_sg.SecurityGroupProxyModeNotAdmin()
|
||||
with context.session.begin(subtransactions=True):
|
||||
rule = self._get_security_group_rule(context, sgrid)
|
||||
context.session.delete(rule)
|
||||
|
||||
def _extend_port_dict_security_group(self, context, port):
|
||||
filters = {'port_id': [port['id']]}
|
||||
fields = {'security_group_id': None}
|
||||
port[ext_sg.SECURITYGROUP] = []
|
||||
security_group_id = self._get_port_security_group_bindings(
|
||||
context, filters, fields)
|
||||
for security_group_id in security_group_id:
|
||||
port[ext_sg.SECURITYGROUP].append(
|
||||
security_group_id['security_group_id'])
|
||||
return port
|
||||
|
||||
def _process_port_create_security_group(self, context, port_id,
|
||||
security_group_id):
|
||||
if not security_group_id:
|
||||
return
|
||||
for security_group_id in security_group_id:
|
||||
self._create_port_security_group_binding(context, port_id,
|
||||
security_group_id)
|
||||
|
||||
def _ensure_default_security_group(self, context, tenant_id,
|
||||
security_group=None):
|
||||
"""Create a default security group if one doesn't exist.
|
||||
|
||||
:returns: the default security group id.
|
||||
"""
|
||||
# if in proxy mode a default security group will be created by source
|
||||
if not security_group and cfg.CONF.SECURITYGROUP.proxy_mode:
|
||||
return
|
||||
|
||||
filters = {'name': ['default'], 'tenant_id': [tenant_id]}
|
||||
default_group = self.get_security_groups(context, filters)
|
||||
if not default_group:
|
||||
security_group = {'security_group': {'name': 'default',
|
||||
'tenant_id': tenant_id,
|
||||
'description': 'default'}}
|
||||
if security_group:
|
||||
security_group['security_group']['external_id'] = (
|
||||
security_group['security_group'].get('external_id'))
|
||||
ret = self.create_security_group(context, security_group, True)
|
||||
return ret['id']
|
||||
else:
|
||||
return default_group[0]['id']
|
||||
|
||||
def _validate_security_groups_on_port(self, context, port):
|
||||
p = port['port']
|
||||
if not p.get(ext_sg.SECURITYGROUP):
|
||||
return
|
||||
|
||||
valid_groups = self.get_security_groups(context, fields={'id': None})
|
||||
valid_groups_set = set([x['id'] for x in valid_groups])
|
||||
req_sg_set = set(p[ext_sg.SECURITYGROUP])
|
||||
invalid_sg_set = req_sg_set - valid_groups_set
|
||||
if invalid_sg_set:
|
||||
msg = ' '.join(str(x) for x in invalid_sg_set)
|
||||
raise ext_sg.SecurityGroupNotFound(id=msg)
|
318
quantum/extensions/securitygroup.py
Normal file
318
quantum/extensions/securitygroup.py
Normal file
@ -0,0 +1,318 @@
|
||||
# Copyright (c) 2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from abc import abstractmethod
|
||||
|
||||
from quantum.api.v2 import attributes as attr
|
||||
from quantum.api.v2 import base
|
||||
from quantum.common import exceptions as qexception
|
||||
from quantum.extensions import extensions
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import cfg
|
||||
from quantum import quota
|
||||
|
||||
|
||||
# Security group Exceptions
|
||||
class SecurityGroupAlreadyExists(qexception.InUse):
|
||||
# This can only happen if the external_id database is cleared
|
||||
message = _("Security group %(name)s id %(external_id)s already exists")
|
||||
|
||||
|
||||
class SecurityGroupInvalidProtocolType(qexception.InvalidInput):
|
||||
message = _("Invalid protocol type %(value)s")
|
||||
|
||||
|
||||
class SecurityGroupInvalidEtherType(qexception.InvalidInput):
|
||||
message = _("Invalid/Unsupported ethertype %(value)s")
|
||||
|
||||
|
||||
class SecurityGroupInvalidPortRange(qexception.InvalidInput):
|
||||
message = _("For TCP/UDP protocols, port_range_min must be "
|
||||
"<= port_range_max")
|
||||
|
||||
|
||||
class SecurityGroupInvalidPortValue(qexception.InvalidInput):
|
||||
message = _("Invalid value for port %(port)s")
|
||||
|
||||
|
||||
class SecurityGroupInUse(qexception.InUse):
|
||||
message = _("Security Group %(id)s in use.")
|
||||
|
||||
|
||||
class SecurityGroupCannotRemoveDefault(qexception.InUse):
|
||||
message = _("Removing default security group not allowed.")
|
||||
|
||||
|
||||
class SecurityGroupDefaultAlreadyExists(qexception.InUse):
|
||||
message = _("Default security group already exists.")
|
||||
|
||||
|
||||
class SecurityGroupRuleInvalidProtocol(qexception.InUse):
|
||||
message = _("Security group rule protocol %(protocol)s not supported "
|
||||
"only protocol values %(values)s supported.")
|
||||
|
||||
|
||||
class SecurityGroupRulesNotSingleTenant(qexception.InvalidInput):
|
||||
message = _("Multiple tenant_ids in bulk security group rule create"
|
||||
" not allowed")
|
||||
|
||||
|
||||
class SecurityGroupSourceGroupAndIpPrefix(qexception.InvalidInput):
|
||||
message = _("Only source_ip_prefix or source_group_id may "
|
||||
"be provided.")
|
||||
|
||||
|
||||
class SecurityGroupProtocolRequiredWithPorts(qexception.InvalidInput):
|
||||
message = _("Must also specifiy protocol if port range is given.")
|
||||
|
||||
|
||||
class SecurityGroupNotSingleGroupRules(qexception.InvalidInput):
|
||||
message = _("Only allowed to update rules for "
|
||||
"one security profile at a time")
|
||||
|
||||
|
||||
class SecurityGroupSourceGroupNotFound(qexception.NotFound):
|
||||
message = _("source group id %(id)s does not exist")
|
||||
|
||||
|
||||
class SecurityGroupNotFound(qexception.NotFound):
|
||||
message = _("Security group %(id)s does not exist")
|
||||
|
||||
|
||||
class SecurityGroupRuleNotFound(qexception.NotFound):
|
||||
message = _("Security group rule %(id)s does not exist")
|
||||
|
||||
|
||||
class DuplicateSecurityGroupRuleInPost(qexception.InUse):
|
||||
message = _("Duplicate Security Group Rule in POST.")
|
||||
|
||||
|
||||
class SecurityGroupRuleExists(qexception.InUse):
|
||||
message = _("Security group rule exists %(rule)s")
|
||||
|
||||
|
||||
class SecurityGroupProxyMode(qexception.InUse):
|
||||
message = _("Did not recieve external id and in proxy mode")
|
||||
|
||||
|
||||
class SecurityGroupNotProxyMode(qexception.InUse):
|
||||
message = _("Recieve external id and not in proxy mode")
|
||||
|
||||
|
||||
class SecurityGroupProxyModeNotAdmin(qexception.InvalidExtenstionEnv):
|
||||
message = _("In Proxy Mode and not from admin")
|
||||
|
||||
|
||||
class SecurityGroupInvalidExternalID(qexception.InvalidInput):
|
||||
message = _("external_id wrong type %(data)s")
|
||||
|
||||
|
||||
def convert_validate_port_value(port):
|
||||
if port is None:
|
||||
return port
|
||||
try:
|
||||
val = int(port)
|
||||
except (ValueError, TypeError):
|
||||
raise SecurityGroupInvalidPortValue(port=port)
|
||||
|
||||
if val >= 0 and val <= 65535:
|
||||
return val
|
||||
else:
|
||||
raise SecurityGroupInvalidPortValue(port=port)
|
||||
|
||||
|
||||
def _validate_name_not_default(data, valid_values=None):
|
||||
if not cfg.CONF.SECURITYGROUP.proxy_mode and data == "default":
|
||||
raise SecurityGroupDefaultAlreadyExists()
|
||||
|
||||
|
||||
def _validate_external_id_and_mode(external_id, valid_values=None):
|
||||
if not cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
|
||||
return
|
||||
elif not cfg.CONF.SECURITYGROUP.proxy_mode and external_id:
|
||||
raise SecurityGroupNotProxyMode()
|
||||
try:
|
||||
int(external_id)
|
||||
except (ValueError, TypeError):
|
||||
raise SecurityGroupInvalidExternalID(data=external_id)
|
||||
if cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
|
||||
raise SecurityGroupProxyMode()
|
||||
|
||||
attr.validators['type:name_not_default'] = _validate_name_not_default
|
||||
attr.validators['type:external_id_and_mode'] = _validate_external_id_and_mode
|
||||
|
||||
# Attribute Map
|
||||
RESOURCE_ATTRIBUTE_MAP = {
|
||||
'security_groups': {
|
||||
'id': {'allow_post': False, 'allow_put': False,
|
||||
'validate': {'type:regex': attr.UUID_PATTERN},
|
||||
'is_visible': True},
|
||||
'name': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': '',
|
||||
'validate': {'type:name_not_default': None}},
|
||||
'description': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': ''},
|
||||
'external_id': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': None,
|
||||
'validate': {'type:external_id_and_mode': None}},
|
||||
'tenant_id': {'allow_post': True, 'allow_put': False,
|
||||
'required_by_policy': True,
|
||||
'is_visible': True},
|
||||
},
|
||||
'security_group_rules': {
|
||||
'id': {'allow_post': False, 'allow_put': False,
|
||||
'validate': {'type:regex': attr.UUID_PATTERN},
|
||||
'is_visible': True},
|
||||
# external_id can be used to be backwards compatible with nova
|
||||
'external_id': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': None,
|
||||
'validate': {'type:external_id_and_mode': None}},
|
||||
'security_group_id': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'required_by_policy': True},
|
||||
'source_group_id': {'allow_post': True, 'allow_put': False,
|
||||
'default': None, 'is_visible': True},
|
||||
'direction': {'allow_post': True, 'allow_put': True,
|
||||
'is_visible': True,
|
||||
'validate': {'type:values': ['ingress', 'egress']}},
|
||||
'protocol': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': None},
|
||||
'port_range_min': {'allow_post': True, 'allow_put': False,
|
||||
'convert_to': convert_validate_port_value,
|
||||
'default': None, 'is_visible': True},
|
||||
'port_range_max': {'allow_post': True, 'allow_put': False,
|
||||
'convert_to': convert_validate_port_value,
|
||||
'default': None, 'is_visible': True},
|
||||
'ethertype': {'allow_post': True, 'allow_put': False,
|
||||
'is_visible': True, 'default': 'IPv4'},
|
||||
'source_ip_prefix': {'allow_post': True, 'allow_put': False,
|
||||
'default': None, 'is_visible': True},
|
||||
'tenant_id': {'allow_post': True, 'allow_put': False,
|
||||
'required_by_policy': True,
|
||||
'is_visible': True},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SECURITYGROUP = 'security_groups'
|
||||
EXTENDED_ATTRIBUTES_2_0 = {
|
||||
'ports': {SECURITYGROUP: {'allow_post': True,
|
||||
'allow_put': True,
|
||||
'is_visible': True,
|
||||
'default': None}}}
|
||||
security_group_quota_opts = [
|
||||
cfg.IntOpt('quota_security_group',
|
||||
default=10,
|
||||
help='number of security groups allowed per tenant,'
|
||||
'-1 for unlimited'),
|
||||
cfg.IntOpt('quota_security_group_rule',
|
||||
default=100,
|
||||
help='number of security rules allowed per tenant, '
|
||||
'-1 for unlimited'),
|
||||
]
|
||||
cfg.CONF.register_opts(security_group_quota_opts, 'QUOTAS')
|
||||
|
||||
security_group_opts = [
|
||||
cfg.StrOpt('proxy_mode', default=False)
|
||||
]
|
||||
cfg.CONF.register_opts(security_group_opts, 'SECURITYGROUP')
|
||||
|
||||
|
||||
class Securitygroup(object):
|
||||
""" Security group extension"""
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return "security-group"
|
||||
|
||||
@classmethod
|
||||
def get_alias(cls):
|
||||
return "security-group"
|
||||
|
||||
@classmethod
|
||||
def get_description(cls):
|
||||
return "The security groups extension."
|
||||
|
||||
@classmethod
|
||||
def get_namespace(cls):
|
||||
# todo
|
||||
return "http://docs.openstack.org/ext/securitygroups/api/v2.0"
|
||||
|
||||
@classmethod
|
||||
def get_updated(cls):
|
||||
return "2012-10-05T10:00:00-00:00"
|
||||
|
||||
@classmethod
|
||||
def get_resources(cls):
|
||||
""" Returns Ext Resources """
|
||||
exts = []
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
for resource_name in ['security_group', 'security_group_rule']:
|
||||
collection_name = resource_name.replace('_', '-') + "s"
|
||||
params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
|
||||
quota.QUOTAS.register_resource_by_name(resource_name)
|
||||
controller = base.create_resource(collection_name,
|
||||
resource_name,
|
||||
plugin, params, allow_bulk=True)
|
||||
|
||||
ex = extensions.ResourceExtension(collection_name,
|
||||
controller)
|
||||
exts.append(ex)
|
||||
|
||||
return exts
|
||||
|
||||
def get_extended_resources(self, version):
|
||||
if version == "2.0":
|
||||
return EXTENDED_ATTRIBUTES_2_0
|
||||
else:
|
||||
return {}
|
||||
|
||||
|
||||
class SecurityGroupPluginBase(object):
|
||||
@abstractmethod
|
||||
def create_security_group(self, context, security_group):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_security_group(self, context, security_group):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_security_group(self, context, security_group):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_security_groups(self, context, filters=None, fields=None):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_security_group(self, context, id, fields=None):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_security_group_rule(self, context, security_group_rule):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_security_group_rule(self, context, sgrid):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_security_group_rules(self, context, filters=None, fields=None):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_security_group_rule(self, context, id, fields=None):
|
||||
pass
|
@ -258,7 +258,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
|
||||
|
||||
for arg in ('admin_state_up', 'device_id',
|
||||
'mac_address', 'name', 'fixed_ips',
|
||||
'tenant_id', 'device_owner'):
|
||||
'tenant_id', 'device_owner', 'security_groups'):
|
||||
# Arg must be present and not empty
|
||||
if arg in kwargs and kwargs[arg]:
|
||||
data['port'][arg] = kwargs[arg]
|
||||
|
829
quantum/tests/unit/test_extension_security_group.py
Normal file
829
quantum/tests/unit/test_extension_security_group.py
Normal file
@ -0,0 +1,829 @@
|
||||
# Copyright (c) 2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
|
||||
import mock
|
||||
import unittest2
|
||||
import webob.exc
|
||||
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.api.v2.router import APIRouter
|
||||
from quantum import context
|
||||
from quantum.common.test_lib import test_config
|
||||
from quantum.common import config
|
||||
from quantum.db import api as db
|
||||
from quantum.db import db_base_plugin_v2
|
||||
from quantum.db import securitygroups_db
|
||||
from quantum.extensions.extensions import PluginAwareExtensionManager
|
||||
from quantum.extensions import securitygroup as ext_sg
|
||||
from quantum.manager import QuantumManager
|
||||
from quantum.openstack.common import cfg
|
||||
from quantum.tests.unit import test_db_plugin
|
||||
from quantum.tests.unit import test_extensions
|
||||
from quantum.wsgi import JSONDeserializer
|
||||
|
||||
DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_security_group.'
|
||||
'SecurityGroupTestPlugin')
|
||||
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
|
||||
ETCDIR = os.path.join(ROOTDIR, 'etc')
|
||||
|
||||
|
||||
def etcdir(*p):
|
||||
return os.path.join(ETCDIR, *p)
|
||||
|
||||
|
||||
class SecurityGroupTestExtensionManager(object):
|
||||
|
||||
def get_resources(self):
|
||||
return ext_sg.Securitygroup.get_resources()
|
||||
|
||||
def get_actions(self):
|
||||
return []
|
||||
|
||||
def get_request_extensions(self):
|
||||
return []
|
||||
|
||||
|
||||
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase,
|
||||
unittest2.TestCase):
|
||||
def setUp(self, plugin=None):
|
||||
super(SecurityGroupsTestCase, self).setUp()
|
||||
db._ENGINE = None
|
||||
db._MAKER = None
|
||||
# Make sure at each test a new instance of the plugin is returned
|
||||
QuantumManager._instance = None
|
||||
# Make sure at each test according extensions for the plugin is loaded
|
||||
PluginAwareExtensionManager._instance = None
|
||||
# Save the attributes map in case the plugin will alter it
|
||||
# loading extensions
|
||||
# Note(salvatore-orlando): shallow copy is not good enough in
|
||||
# this case, but copy.deepcopy does not seem to work, since it
|
||||
# causes test failures
|
||||
self._attribute_map_bk = {}
|
||||
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
|
||||
self._attribute_map_bk[item] = (attributes.
|
||||
RESOURCE_ATTRIBUTE_MAP[item].
|
||||
copy())
|
||||
json_deserializer = JSONDeserializer()
|
||||
self._deserializers = {
|
||||
'application/json': json_deserializer,
|
||||
}
|
||||
|
||||
if not plugin:
|
||||
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
|
||||
|
||||
# Create the default configurations
|
||||
args = ['--config-file', etcdir('quantum.conf.test')]
|
||||
# If test_config specifies some config-file, use it, as well
|
||||
for config_file in test_config.get('config_files', []):
|
||||
args.extend(['--config-file', config_file])
|
||||
config.parse(args=args)
|
||||
# Update the plugin
|
||||
cfg.CONF.set_override('core_plugin', plugin)
|
||||
self.api = APIRouter()
|
||||
|
||||
def _is_native_bulk_supported():
|
||||
plugin_obj = QuantumManager.get_plugin()
|
||||
native_bulk_attr_name = ("_%s__native_bulk_support"
|
||||
% plugin_obj.__class__.__name__)
|
||||
return getattr(plugin_obj, native_bulk_attr_name, False)
|
||||
|
||||
self._skip_native_bulk = not _is_native_bulk_supported()
|
||||
|
||||
QuantumManager.get_plugin().supported_extension_aliases = (
|
||||
["security-groups"])
|
||||
ext_mgr = SecurityGroupTestExtensionManager()
|
||||
if ext_mgr:
|
||||
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
||||
|
||||
def tearDown(self):
|
||||
super(SecurityGroupsTestCase, self).tearDown()
|
||||
db._ENGINE = None
|
||||
db._MAKER = None
|
||||
cfg.CONF.reset()
|
||||
# Restore the original attribute map
|
||||
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
|
||||
|
||||
def _create_security_group(self, fmt, name, description, external_id=None,
|
||||
**kwargs):
|
||||
|
||||
data = {'security_group': {'name': name,
|
||||
'tenant_id': kwargs.get('tenant_id',
|
||||
'test_tenant'),
|
||||
'description': description}}
|
||||
if external_id:
|
||||
data['security_group']['external_id'] = external_id
|
||||
security_group_req = self.new_create_request('security-groups', data,
|
||||
fmt)
|
||||
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
|
||||
# create a specific auth context for this request
|
||||
security_group_req.environ['quantum.context'] = (
|
||||
context.Context('', kwargs['tenant_id']))
|
||||
return security_group_req.get_response(self.ext_api)
|
||||
|
||||
def _build_security_group_rule(self, security_group_id, direction,
|
||||
protocol, port_range_min, port_range_max,
|
||||
source_ip_prefix=None, source_group_id=None,
|
||||
external_id=None, tenant_id='test_tenant'):
|
||||
|
||||
data = {'security_group_rule': {'security_group_id': security_group_id,
|
||||
'direction': direction,
|
||||
'protocol': protocol,
|
||||
'port_range_min': port_range_min,
|
||||
'port_range_max': port_range_max,
|
||||
'tenant_id': tenant_id}}
|
||||
if external_id:
|
||||
data['security_group_rule']['external_id'] = external_id
|
||||
|
||||
if source_ip_prefix:
|
||||
data['security_group_rule']['source_ip_prefix'] = source_ip_prefix
|
||||
|
||||
if source_group_id:
|
||||
data['security_group_rule']['source_group_id'] = source_group_id
|
||||
|
||||
return data
|
||||
|
||||
def _create_security_group_rule(self, fmt, rules, **kwargs):
|
||||
|
||||
security_group_rule_req = self.new_create_request(
|
||||
'security-group-rules', rules, fmt)
|
||||
|
||||
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
|
||||
# create a specific auth context for this request
|
||||
security_group_rule_req.environ['quantum.context'] = (
|
||||
context.Context('', kwargs['tenant_id']))
|
||||
return security_group_rule_req.get_response(self.ext_api)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def security_group(self, name='webservers', description='webservers',
|
||||
external_id=None, fmt='json', no_delete=False):
|
||||
res = self._create_security_group(fmt, name, description,
|
||||
external_id)
|
||||
security_group = self.deserialize(fmt, res)
|
||||
if res.status_int >= 400:
|
||||
raise webob.exc.HTTPClientError(code=res.status_int)
|
||||
yield security_group
|
||||
if not no_delete:
|
||||
self._delete('security-groups',
|
||||
security_group['security_group']['id'])
|
||||
|
||||
@contextlib.contextmanager
|
||||
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
|
||||
'd1db38eb087',
|
||||
direction='ingress', protocol='tcp',
|
||||
port_range_min='22', port_range_max='22',
|
||||
source_ip_prefix=None, source_group_id=None,
|
||||
external_id=None, fmt='json', no_delete=False):
|
||||
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_ip_prefix,
|
||||
source_group_id, external_id)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
security_group_rule = self.deserialize(fmt, res)
|
||||
if res.status_int >= 400:
|
||||
raise webob.exc.HTTPClientError(code=res.status_int)
|
||||
yield security_group_rule
|
||||
if not no_delete:
|
||||
self._delete('security-group-rules',
|
||||
security_group_rule['security_group_rule']['id'])
|
||||
|
||||
|
||||
class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
securitygroups_db.SecurityGroupDbMixin):
|
||||
""" Test plugin that implements necessary calls on create/delete port for
|
||||
associating ports with security groups.
|
||||
"""
|
||||
|
||||
supported_extension_aliases = ["security-group"]
|
||||
|
||||
def create_port(self, context, port):
|
||||
tenant_id = self._get_tenant_id_for_create(context, port['port'])
|
||||
default_sg = self._ensure_default_security_group(context, tenant_id)
|
||||
if not port['port'].get(ext_sg.SECURITYGROUP):
|
||||
port['port'][ext_sg.SECURITYGROUP] = [default_sg]
|
||||
self._validate_security_groups_on_port(context, port)
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
sgids = port['port'].get(ext_sg.SECURITYGROUP)
|
||||
port = super(SecurityGroupTestPlugin, self).create_port(context,
|
||||
port)
|
||||
self._process_port_create_security_group(context, port['id'],
|
||||
sgids)
|
||||
self._extend_port_dict_security_group(context, port)
|
||||
return port
|
||||
|
||||
def update_port(self, context, id, port):
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
self._validate_security_groups_on_port(context, port)
|
||||
# delete the port binding and read it with the new rules
|
||||
self._delete_port_security_group_bindings(context, id)
|
||||
self._process_port_create_security_group(context, id,
|
||||
port['port'].get(
|
||||
ext_sg.SECURITYGROUP))
|
||||
port = super(SecurityGroupTestPlugin, self).update_port(
|
||||
context, id, port)
|
||||
self._extend_port_dict_security_group(context, port)
|
||||
return port
|
||||
|
||||
def delete_port(self, context, id):
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
super(SecurityGroupTestPlugin, self).delete_port(context, id)
|
||||
self._delete_port_security_group_bindings(context, id)
|
||||
|
||||
def create_network(self, context, network):
|
||||
tenant_id = self._get_tenant_id_for_create(context, network['network'])
|
||||
self._ensure_default_security_group(context, tenant_id)
|
||||
return super(SecurityGroupTestPlugin, self).create_network(context,
|
||||
network)
|
||||
|
||||
|
||||
class SecurityGroupDBTestCase(SecurityGroupsTestCase):
|
||||
def setUp(self, plugin=None):
|
||||
test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
|
||||
ext_mgr = SecurityGroupTestExtensionManager()
|
||||
test_config['extension_manager'] = ext_mgr
|
||||
super(SecurityGroupDBTestCase, self).setUp()
|
||||
|
||||
|
||||
class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
def test_create_security_group(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
keys = [('name', name,), ('description', description)]
|
||||
with self.security_group(name, description) as security_group:
|
||||
for k, v, in keys:
|
||||
self.assertEquals(security_group['security_group'][k], v)
|
||||
|
||||
def test_create_security_group_external_id(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
external_id = 10
|
||||
keys = [('name', name,), ('description', description),
|
||||
('external_id', external_id)]
|
||||
with self.security_group(name, description, external_id) as sg:
|
||||
for k, v, in keys:
|
||||
self.assertEquals(sg['security_group'][k], v)
|
||||
|
||||
def test_default_security_group(self):
|
||||
with self.network():
|
||||
res = self.new_list_request('security-groups')
|
||||
groups = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self.assertEquals(len(groups['security_groups']), 1)
|
||||
|
||||
def test_create_security_group_proxy_mode_not_admin(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
res = self._create_security_group('json', 'webservers',
|
||||
'webservers', '1',
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 500)
|
||||
|
||||
def test_create_security_group_no_external_id_proxy_mode(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
res = self._create_security_group('json', 'webservers',
|
||||
'webservers')
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 400)
|
||||
|
||||
def test_create_security_group_no_external_id_not_proxy_mode(self):
|
||||
res = self._create_security_group('json', 'webservers',
|
||||
'webservers', '1')
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_default_security_group_fail(self):
|
||||
name = 'default'
|
||||
description = 'my webservers'
|
||||
res = self._create_security_group('json', name, description)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_duplicate_external_id(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
external_id = 1
|
||||
with self.security_group(name, description, external_id):
|
||||
res = self._create_security_group('json', name, description,
|
||||
external_id)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_list_security_groups(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description):
|
||||
res = self.new_list_request('security-groups')
|
||||
groups = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self.assertEquals(len(groups['security_groups']), 2)
|
||||
|
||||
def test_get_security_group(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
source_group_id = sg['security_group']['id']
|
||||
res = self.new_show_request('security-groups', source_group_id)
|
||||
group = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self.assertEquals(group['security_group']['id'], source_group_id)
|
||||
|
||||
def test_delete_security_group(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description, no_delete=True) as sg:
|
||||
source_group_id = sg['security_group']['id']
|
||||
self._delete('security-groups', source_group_id, 204)
|
||||
|
||||
def test_delete_default_security_group_fail(self):
|
||||
with self.network():
|
||||
res = self.new_list_request('security-groups')
|
||||
sg = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self._delete('security-groups', sg['security_groups'][0]['id'],
|
||||
409)
|
||||
|
||||
def test_default_security_group_rules(self):
|
||||
with self.network():
|
||||
res = self.new_list_request('security-groups')
|
||||
groups = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self.assertEquals(len(groups['security_groups']), 1)
|
||||
res = self.new_list_request('security-group-rules')
|
||||
rules = self.deserialize('json', res.get_response(self.ext_api))
|
||||
self.assertEquals(len(rules['security_group_rules']), 4)
|
||||
# just generic rules to allow default egress and
|
||||
# intergroup communicartion
|
||||
for rule in rules['security_group_rules']:
|
||||
self.assertEquals(rule['port_range_max'], None)
|
||||
self.assertEquals(rule['port_range_min'], None)
|
||||
self.assertEquals(rule['protocol'], None)
|
||||
|
||||
def test_create_security_group_rule_source_ip_prefix(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
source_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
keys = [('source_ip_prefix', source_ip_prefix),
|
||||
('security_group_id', security_group_id),
|
||||
('direction', direction),
|
||||
('protocol', protocol),
|
||||
('port_range_min', port_range_min),
|
||||
('port_range_max', port_range_max)]
|
||||
with self.security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_ip_prefix) as rule:
|
||||
for k, v, in keys:
|
||||
self.assertEquals(rule['security_group_rule'][k], v)
|
||||
|
||||
def test_create_security_group_rule_group_id(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
with self.security_group(name, description) as sg2:
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
source_group_id = sg2['security_group']['id']
|
||||
protocol = 'tcp'
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
keys = [('source_group_id', source_group_id),
|
||||
('security_group_id', security_group_id),
|
||||
('direction', direction),
|
||||
('protocol', protocol),
|
||||
('port_range_min', port_range_min),
|
||||
('port_range_max', port_range_max)]
|
||||
with self.security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_group_id=source_group_id
|
||||
) as rule:
|
||||
for k, v, in keys:
|
||||
self.assertEquals(rule['security_group_rule'][k], v)
|
||||
|
||||
def test_create_security_group_source_group_ip_and_ip_prefix(self):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
source_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_ip_prefix,
|
||||
source_group_id)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 400)
|
||||
|
||||
def test_create_security_group_rule_bad_security_group_id(self):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
source_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_ip_prefix)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant(self):
|
||||
with self.security_group() as sg:
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': "bad_tenant"}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_security_group_rule_exteral_id_proxy_mode(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
with self.security_group(external_id=1) as sg:
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'external_id': '1',
|
||||
'tenant_id': 'test_tenant',
|
||||
'source_group_id': sg['security_group']['id']}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 201)
|
||||
|
||||
def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
|
||||
with self.security_group() as sg:
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'external_id': 1,
|
||||
'tenant_id': 'test_tenant',
|
||||
'source_group_id': sg['security_group']['id']}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_not_admin(self):
|
||||
cfg.CONF.SECURITYGROUP.proxy_mode = True
|
||||
with self.security_group(external_id='1') as sg:
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': 'bad_tenant',
|
||||
'external_id': 1,
|
||||
'source_group_id': sg['security_group']['id']}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule,
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 500)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant_source_group_id(self):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_security_group('json', 'webservers',
|
||||
'webservers',
|
||||
tenant_id='bad_tenant')
|
||||
sg2 = self.deserialize('json', res)
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg2['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': 'bad_tenant',
|
||||
'source_group_id': sg['security_group']['id']}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule,
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_security_group('json', 'webservers',
|
||||
'webservers',
|
||||
tenant_id='bad_tenant')
|
||||
self.deserialize('json', res)
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': 'bad_tenant'}}
|
||||
|
||||
res = self._create_security_group_rule('json', rule,
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_security_group_rule_bad_source_group_id(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
source_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
protocol = 'tcp'
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
protocol, port_range_min,
|
||||
port_range_max,
|
||||
source_group_id=source_group_id)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rules(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
|
||||
self._create_security_group_rule('json', rule)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_min_port_greater_max(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '50', '22')
|
||||
self._create_security_group_rule('json', rule)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 400)
|
||||
|
||||
def test_create_security_group_rule_ports_but_no_protocol(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', None, '22', '22')
|
||||
self._create_security_group_rule('json', rule)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 400)
|
||||
|
||||
def test_update_port_with_security_group(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_port('json', n['network']['id'])
|
||||
port = self.deserialize('json', res)
|
||||
|
||||
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
|
||||
'name': port['port']['name'],
|
||||
ext_sg.SECURITYGROUP:
|
||||
[sg['security_group']['id']]}}
|
||||
|
||||
req = self.new_update_request('ports', data,
|
||||
port['port']['id'])
|
||||
res = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
|
||||
sg['security_group']['id'])
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_with_multiple_security_groups(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
with self.security_group() as sg1:
|
||||
with self.security_group() as sg2:
|
||||
res = self._create_port(
|
||||
'json', n['network']['id'],
|
||||
security_groups=[sg1['security_group']['id'],
|
||||
sg2['security_group']['id']])
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(len(
|
||||
port['port'][ext_sg.SECURITYGROUP]), 2)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_remove_security_group(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_port('json', n['network']['id'],
|
||||
security_groups=(
|
||||
[sg['security_group']['id']]))
|
||||
port = self.deserialize('json', res)
|
||||
|
||||
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
|
||||
'name': port['port']['name']}}
|
||||
|
||||
req = self.new_update_request('ports', data,
|
||||
port['port']['id'])
|
||||
res = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEquals(res['port'][ext_sg.SECURITYGROUP], [])
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_with_bad_security_group(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
res = self._create_port('json', n['network']['id'],
|
||||
security_groups=['bad_id'])
|
||||
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 404)
|
||||
|
||||
def test_create_delete_security_group_port_in_use(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_port('json', n['network']['id'],
|
||||
security_groups=(
|
||||
[sg['security_group']['id']]))
|
||||
port = self.deserialize('json', res)
|
||||
self.assertEquals(port['port'][ext_sg.SECURITYGROUP][0],
|
||||
sg['security_group']['id'])
|
||||
# try to delete security group that's in use
|
||||
res = self._delete('security-groups',
|
||||
sg['security_group']['id'], 409)
|
||||
# delete the blocking port
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_security_group_rule_bulk_native(self):
|
||||
if self._skip_native_bulk:
|
||||
self.skipTest("Plugin does not support native bulk "
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule1 = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '23',
|
||||
'23', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 201)
|
||||
|
||||
def test_create_security_group_rule_bulk_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
|
||||
#ensures the API choose the emulation code path
|
||||
def fakehasattr(item, attr):
|
||||
if attr.endswith('__native_bulk_support'):
|
||||
return False
|
||||
return real_has_attr(item, attr)
|
||||
|
||||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
with self.security_group() as sg:
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
|
||||
'10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]
|
||||
}
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 201)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_in_post(self):
|
||||
if self._skip_native_bulk:
|
||||
self.skipTest("Plugin does not support native bulk "
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule['security_group_rule'],
|
||||
rule['security_group_rule']]}
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
rule = self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
|
||||
#ensures the API choose the emulation code path
|
||||
def fakehasattr(item, attr):
|
||||
if attr.endswith('__native_bulk_support'):
|
||||
return False
|
||||
return real_has_attr(item, attr)
|
||||
|
||||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule['security_group_rule'],
|
||||
rule['security_group_rule']]}
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
rule = self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_db(self):
|
||||
if self._skip_native_bulk:
|
||||
self.skipTest("Plugin does not support native bulk "
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule]}
|
||||
self._create_security_group_rule('json', rules)
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
rule = self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
|
||||
#ensures the API choose the emulation code path
|
||||
def fakehasattr(item, attr):
|
||||
if attr.endswith('__native_bulk_support'):
|
||||
return False
|
||||
return real_has_attr(item, attr)
|
||||
|
||||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule]}
|
||||
self._create_security_group_rule('json', rules)
|
||||
res = self._create_security_group_rule('json', rule)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
def test_create_security_group_rule_differnt_security_group_ids(self):
|
||||
if self._skip_native_bulk:
|
||||
self.skipTest("Plugin does not support native bulk "
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg1:
|
||||
with self.security_group() as sg2:
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
|
||||
'10.0.0.1/24')
|
||||
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]
|
||||
}
|
||||
res = self._create_security_group_rule('json', rules)
|
||||
self.deserialize('json', res)
|
||||
self.assertEquals(res.status_int, 400)
|
Loading…
x
Reference in New Issue
Block a user