[NSX-v]: Validate DNS search domain values at API level
Currently the DNS search domain value is validated in the backend. But this validation takes place at the time of instance creation, since that is when the static edge bindings are created and pushed to the backend. This allows the subnet create/update operations to succeed even if an invalid DNS search domain value is specified. This patch adds validation to the DNS search domain extension. Change-Id: Ib392e8695f40023219df93b6889366ce0a305423
This commit is contained in:
parent
01ad2eb932
commit
4c00489dbb
@ -12,17 +12,65 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
from neutron.api import extensions
|
||||
from neutron.api.v2 import attributes
|
||||
|
||||
|
||||
DNS_LABEL_MAX_LEN = 63
|
||||
DNS_LABEL_REGEX = "[a-zA-Z0-9-]{1,%d}$" % DNS_LABEL_MAX_LEN
|
||||
|
||||
|
||||
def _validate_dns_format(data):
|
||||
if not data:
|
||||
return
|
||||
try:
|
||||
# Allow values ending in period '.'
|
||||
trimmed = data if not data.endswith('.') else data[:-1]
|
||||
names = trimmed.split('.')
|
||||
for name in names:
|
||||
if not name:
|
||||
raise TypeError(_("Encountered an empty component"))
|
||||
if name.endswith('-') or name[0] == '-':
|
||||
raise TypeError(
|
||||
_("Name '%s' must not start or end with a hyphen") % name)
|
||||
if not re.match(DNS_LABEL_REGEX, name):
|
||||
raise TypeError(
|
||||
_("Name '%s' must be 1-63 characters long, each of "
|
||||
"which can only be alphanumeric or a hyphen") % name)
|
||||
# RFC 1123 hints that a TLD can't be all numeric. last is a TLD if
|
||||
# it's an FQDN.
|
||||
if len(names) > 1 and re.match("^[0-9]+$", names[-1]):
|
||||
raise TypeError(_("TLD '%s' must not be all numeric") % names[-1])
|
||||
except TypeError as e:
|
||||
msg = _("'%(data)s' not a valid DNS search domain. Reason: "
|
||||
"%(reason)s") % {'data': data, 'reason': str(e)}
|
||||
return msg
|
||||
|
||||
|
||||
def _validate_dns_search_domain(data, max_len=attributes.NAME_MAX_LEN):
|
||||
msg = attributes._validate_string(data, max_len)
|
||||
if msg:
|
||||
return msg
|
||||
if not data:
|
||||
return
|
||||
msg = _validate_dns_format(data)
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
|
||||
attributes.validators['type:dns_search_domain'] = (_validate_dns_search_domain)
|
||||
|
||||
|
||||
DNS_SEARCH_DOMAIN = 'dns_search_domain'
|
||||
EXTENDED_ATTRIBUTES_2_0 = {
|
||||
'subnets': {
|
||||
DNS_SEARCH_DOMAIN: {'allow_post': True, 'allow_put': True,
|
||||
'default': attributes.ATTR_NOT_SPECIFIED,
|
||||
'validate': {'type:string': None},
|
||||
'is_visible': True},
|
||||
DNS_SEARCH_DOMAIN: {
|
||||
'allow_post': True, 'allow_put': True,
|
||||
'default': attributes.ATTR_NOT_SPECIFIED,
|
||||
'validate': {'type:dns_search_domain': attributes.NAME_MAX_LEN},
|
||||
'is_visible': True},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,14 +60,20 @@ class DnsSearchDomainExtensionTestCase(test_plugin.NsxVPluginV2TestCase):
|
||||
'dns_search_domain': dns_search_domain}}
|
||||
subnet_req = self.new_create_request('subnets', data)
|
||||
res = subnet_req.get_response(self.api)
|
||||
return self.deserialize(self.fmt, res)
|
||||
return res
|
||||
|
||||
def test_subnet_create_with_dns_search_domain(self):
|
||||
sub = self._create_subnet_with_dns_search_domain('vmware.com')
|
||||
res = self._create_subnet_with_dns_search_domain('vmware.com')
|
||||
sub = self.deserialize(self.fmt, res)
|
||||
self.assertEqual('vmware.com', sub['subnet']['dns_search_domain'])
|
||||
|
||||
def test_subnet_create_with_invalid_dns_search_domain_fail(self):
|
||||
res = self._create_subnet_with_dns_search_domain('vmw@re.com')
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_subnet_update_with_dns_search_domain(self):
|
||||
sub = self._create_subnet_with_dns_search_domain('vmware.com')
|
||||
res = self._create_subnet_with_dns_search_domain('vmware.com')
|
||||
sub = self.deserialize(self.fmt, res)
|
||||
data = {'subnet': {'dns_search_domain': 'eng.vmware.com'}}
|
||||
req = self.new_update_request('subnets', data, sub['subnet']['id'])
|
||||
updated_sub = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
|
Loading…
x
Reference in New Issue
Block a user