extra_dhcp_opt add checks for empty strings

When passing extra-dhcp-opt into the port-create where an empty string
is provided as opt_value='   ', the create and update routine will load the
empty string into the DB. The result when written to the opts file is:
"tag:ece4c8aa-15c9-4f6b-8c42-7d4e285734bf,option:server-ip-address", which
when read by dnsmasq has been tested to cause dnsmasq to segment fault.

Change-Id: I31de4a3d27092bb219d20221c5ef5a6b22e050dc
Closes-Bug: #1257467
This commit is contained in:
dekehn 2013-12-03 15:28:55 -07:00
parent c688211223
commit 6a6a4b1088
4 changed files with 113 additions and 40 deletions

View File

@ -74,9 +74,22 @@ def _validate_values(data, valid_values=None):
return msg return msg
def _validate_not_empty_string_or_none(data, max_len=None):
if data is not None:
return _validate_not_empty_string(data, max_len=max_len)
def _validate_not_empty_string(data, max_len=None):
msg = _validate_string(data, max_len=max_len)
if msg:
return msg
if not data.strip():
return _("'%s' Blank strings are not permitted") % data
def _validate_string_or_none(data, max_len=None): def _validate_string_or_none(data, max_len=None):
if data is not None: if data is not None:
return _validate_string(data, max_len=None) return _validate_string(data, max_len=max_len)
def _validate_string(data, max_len=None): def _validate_string(data, max_len=None):
@ -527,6 +540,9 @@ validators = {'type:dict': _validate_dict,
'type:regex': _validate_regex, 'type:regex': _validate_regex,
'type:string': _validate_string, 'type:string': _validate_string,
'type:string_or_none': _validate_string_or_none, 'type:string_or_none': _validate_string_or_none,
'type:not_empty_string': _validate_not_empty_string,
'type:not_empty_string_or_none':
_validate_not_empty_string_or_none,
'type:subnet': _validate_subnet, 'type:subnet': _validate_subnet,
'type:subnet_list': _validate_subnet_list, 'type:subnet_list': _validate_subnet_list,
'type:uuid': _validate_uuid, 'type:uuid': _validate_uuid,

View File

@ -26,8 +26,7 @@ class ExtraDhcpOptNotFound(exceptions.NotFound):
class ExtraDhcpOptBadData(exceptions.InvalidInput): class ExtraDhcpOptBadData(exceptions.InvalidInput):
message = _("Invalid data format for extra-dhcp-opt, " message = _("Invalid data format for extra-dhcp-opt: %(data)s")
"provide a list of dicts: %(data)s")
def _validate_list_of_dict_or_none(data, key_specs=None): def _validate_list_of_dict_or_none(data, key_specs=None):
@ -39,6 +38,7 @@ def _validate_list_of_dict_or_none(data, key_specs=None):
if msg: if msg:
raise ExtraDhcpOptBadData(data=msg) raise ExtraDhcpOptBadData(data=msg)
attr.validators['type:list_of_dict_or_none'] = _validate_list_of_dict_or_none attr.validators['type:list_of_dict_or_none'] = _validate_list_of_dict_or_none
# Attribute Map # Attribute Map
@ -54,8 +54,9 @@ EXTENDED_ATTRIBUTES_2_0 = {
'validate': { 'validate': {
'type:list_of_dict_or_none': { 'type:list_of_dict_or_none': {
'id': {'type:uuid': None, 'required': False}, 'id': {'type:uuid': None, 'required': False},
'opt_name': {'type:string': None, 'required': True}, 'opt_name': {'type:not_empty_string': None,
'opt_value': {'type:string_or_none': None, 'required': True},
'opt_value': {'type:not_empty_string_or_none': None,
'required': True}}}}}} 'required': True}}}}}}

View File

@ -65,6 +65,24 @@ class TestAttributes(base.BaseTestCase):
msg = attributes._validate_values(7, (4, 6)) msg = attributes._validate_values(7, (4, 6))
self.assertEqual(msg, "'7' is not in (4, 6)") self.assertEqual(msg, "'7' is not in (4, 6)")
def test_validate_not_empty_string(self):
msg = attributes._validate_not_empty_string(' ', None)
self.assertEqual(msg, u"' ' Blank strings are not permitted")
def test_validate_not_empty_string_or_none(self):
msg = attributes._validate_not_empty_string_or_none(' ', None)
self.assertEqual(msg, u"' ' Blank strings are not permitted")
msg = attributes._validate_not_empty_string_or_none(None, None)
self.assertIsNone(msg)
def test_validate_string_or_none(self):
msg = attributes._validate_not_empty_string_or_none('test', None)
self.assertIsNone(msg)
msg = attributes._validate_not_empty_string_or_none(None, None)
self.assertIsNone(msg)
def test_validate_string(self): def test_validate_string(self):
msg = attributes._validate_string(None, None) msg = attributes._validate_string(None, None)
self.assertEqual(msg, "'None' is not a valid string") self.assertEqual(msg, "'None' is not a valid string")

View File

@ -17,6 +17,7 @@
# #
import copy import copy
import webob.exc
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
from neutron.db import extradhcpopt_db as edo_db from neutron.db import extradhcpopt_db as edo_db
@ -87,22 +88,22 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_create_port_with_none_extradhcpopts(self): def test_create_port_with_none_extradhcpopts(self):
new_list = [{'opt_name': 'bootfile-name', opt_list = [{'opt_name': 'bootfile-name',
'opt_value': None}, 'opt_value': None},
{'opt_name': 'server-ip-address', {'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}, 'opt_value': '123.123.123.456'},
{'opt_name': 'tftp-server', {'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'}] 'opt_value': '123.123.123.123'}]
new_dict = [{'opt_name': 'server-ip-address', expected = [{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}, 'opt_value': '123.123.123.456'},
{'opt_name': 'tftp-server', {'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'}] 'opt_value': '123.123.123.123'}]
params = {edo_ext.EXTRADHCPOPTS: new_list, params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)} 'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port: with self.port(**params) as port:
self._check_opts(new_dict, self._check_opts(expected,
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopts_with_same(self): def test_update_port_with_extradhcpopts_with_same(self):
@ -112,8 +113,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
{'opt_name': 'server-ip-address', {'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}] 'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = opt_list[:] expected_opts = opt_list[:]
for i in new_opts: for i in expected_opts:
if i['opt_name'] == upd_opts[0]['opt_name']: if i['opt_name'] == upd_opts[0]['opt_name']:
i['opt_value'] = upd_opts[0]['opt_value'] i['opt_value'] = upd_opts[0]['opt_value']
break break
@ -127,7 +128,27 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port, req = self.new_update_request('ports', update_port,
port['port']['id']) port['port']['id'])
port = self.deserialize('json', req.get_response(self.api)) port = self.deserialize('json', req.get_response(self.api))
self._check_opts(new_opts, self._check_opts(expected_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_additional_extradhcpopt(self):
opt_list = [{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
expected_opts = copy.deepcopy(opt_list)
expected_opts.append(upd_opts[0])
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
self._check_opts(expected_opts,
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopts(self): def test_update_port_with_extradhcpopts(self):
@ -137,8 +158,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
{'opt_name': 'server-ip-address', {'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}] 'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = copy.deepcopy(opt_list) expected_opts = copy.deepcopy(opt_list)
for i in new_opts: for i in expected_opts:
if i['opt_name'] == upd_opts[0]['opt_name']: if i['opt_name'] == upd_opts[0]['opt_name']:
i['opt_value'] = upd_opts[0]['opt_value'] i['opt_value'] = upd_opts[0]['opt_value']
break break
@ -152,28 +173,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port, req = self.new_update_request('ports', update_port,
port['port']['id']) port['port']['id'])
port = self.deserialize('json', req.get_response(self.api)) port = self.deserialize('json', req.get_response(self.api))
self._check_opts(new_opts, self._check_opts(expected_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_additional_extradhcpopt(self):
opt_list = [{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = copy.deepcopy(opt_list)
new_opts.append(upd_opts[0])
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
self._check_opts(new_opts,
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopt_delete(self): def test_update_port_with_extradhcpopt_delete(self):
@ -183,9 +183,9 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
{'opt_name': 'server-ip-address', {'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}] 'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}]
new_opts = [] expected_opts = []
new_opts = [opt for opt in opt_list expected_opts = [opt for opt in opt_list
if opt['opt_name'] != 'bootfile-name'] if opt['opt_name'] != 'bootfile-name']
params = {edo_ext.EXTRADHCPOPTS: opt_list, params = {edo_ext.EXTRADHCPOPTS: opt_list,
@ -197,7 +197,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
req = self.new_update_request('ports', update_port, req = self.new_update_request('ports', update_port,
port['port']['id']) port['port']['id'])
port = self.deserialize('json', req.get_response(self.api)) port = self.deserialize('json', req.get_response(self.api))
self._check_opts(new_opts, self._check_opts(expected_opts,
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_without_extradhcpopt_delete(self): def test_update_port_without_extradhcpopt_delete(self):
@ -226,3 +226,41 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
port = self.deserialize('json', req.get_response(self.api)) port = self.deserialize('json', req.get_response(self.api))
self._check_opts(opt_list, self._check_opts(opt_list,
port['port'][edo_ext.EXTRADHCPOPTS]) port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_blank_string_extradhcpopt(self):
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': ' '}]
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_update_port_with_blank_name_extradhcpopt(self):
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': ' ', 'opt_value': 'pxelinux.0'}]
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)