From 6a6a4b108803c43d64e4c0805d64f99702df8cfb Mon Sep 17 00:00:00 2001 From: dekehn Date: Tue, 3 Dec 2013 15:28:55 -0700 Subject: [PATCH] extra_dhcp_opt add checks for empty strings When passing extra-dhcp-opt into the port-create where an empty string is provided as opt_value=' ', the create and update routine will load the empty string into the DB. The result when written to the opts file is: "tag:ece4c8aa-15c9-4f6b-8c42-7d4e285734bf,option:server-ip-address", which when read by dnsmasq has been tested to cause dnsmasq to segment fault. Change-Id: I31de4a3d27092bb219d20221c5ef5a6b22e050dc Closes-Bug: #1257467 --- neutron/api/v2/attributes.py | 18 ++- neutron/extensions/extra_dhcp_opt.py | 9 +- neutron/tests/unit/test_attributes.py | 18 +++ .../unit/test_extension_extradhcpopts.py | 108 ++++++++++++------ 4 files changed, 113 insertions(+), 40 deletions(-) diff --git a/neutron/api/v2/attributes.py b/neutron/api/v2/attributes.py index 590a4bbc56..33756f0142 100644 --- a/neutron/api/v2/attributes.py +++ b/neutron/api/v2/attributes.py @@ -74,9 +74,22 @@ def _validate_values(data, valid_values=None): return msg +def _validate_not_empty_string_or_none(data, max_len=None): + if data is not None: + return _validate_not_empty_string(data, max_len=max_len) + + +def _validate_not_empty_string(data, max_len=None): + msg = _validate_string(data, max_len=max_len) + if msg: + return msg + if not data.strip(): + return _("'%s' Blank strings are not permitted") % data + + def _validate_string_or_none(data, max_len=None): if data is not None: - return _validate_string(data, max_len=None) + return _validate_string(data, max_len=max_len) def _validate_string(data, max_len=None): @@ -527,6 +540,9 @@ validators = {'type:dict': _validate_dict, 'type:regex': _validate_regex, 'type:string': _validate_string, 'type:string_or_none': _validate_string_or_none, + 'type:not_empty_string': _validate_not_empty_string, + 'type:not_empty_string_or_none': + _validate_not_empty_string_or_none, 'type:subnet': _validate_subnet, 'type:subnet_list': _validate_subnet_list, 'type:uuid': _validate_uuid, diff --git a/neutron/extensions/extra_dhcp_opt.py b/neutron/extensions/extra_dhcp_opt.py index 92e95a5d6e..59a3a0c59e 100644 --- a/neutron/extensions/extra_dhcp_opt.py +++ b/neutron/extensions/extra_dhcp_opt.py @@ -26,8 +26,7 @@ class ExtraDhcpOptNotFound(exceptions.NotFound): class ExtraDhcpOptBadData(exceptions.InvalidInput): - message = _("Invalid data format for extra-dhcp-opt, " - "provide a list of dicts: %(data)s") + message = _("Invalid data format for extra-dhcp-opt: %(data)s") def _validate_list_of_dict_or_none(data, key_specs=None): @@ -39,6 +38,7 @@ def _validate_list_of_dict_or_none(data, key_specs=None): if msg: raise ExtraDhcpOptBadData(data=msg) + attr.validators['type:list_of_dict_or_none'] = _validate_list_of_dict_or_none # Attribute Map @@ -54,8 +54,9 @@ EXTENDED_ATTRIBUTES_2_0 = { 'validate': { 'type:list_of_dict_or_none': { 'id': {'type:uuid': None, 'required': False}, - 'opt_name': {'type:string': None, 'required': True}, - 'opt_value': {'type:string_or_none': None, + 'opt_name': {'type:not_empty_string': None, + 'required': True}, + 'opt_value': {'type:not_empty_string_or_none': None, 'required': True}}}}}} diff --git a/neutron/tests/unit/test_attributes.py b/neutron/tests/unit/test_attributes.py index ebfcbf2c49..61c08d36bf 100644 --- a/neutron/tests/unit/test_attributes.py +++ b/neutron/tests/unit/test_attributes.py @@ -65,6 +65,24 @@ class TestAttributes(base.BaseTestCase): msg = attributes._validate_values(7, (4, 6)) self.assertEqual(msg, "'7' is not in (4, 6)") + def test_validate_not_empty_string(self): + msg = attributes._validate_not_empty_string(' ', None) + self.assertEqual(msg, u"' ' Blank strings are not permitted") + + def test_validate_not_empty_string_or_none(self): + msg = attributes._validate_not_empty_string_or_none(' ', None) + self.assertEqual(msg, u"' ' Blank strings are not permitted") + + msg = attributes._validate_not_empty_string_or_none(None, None) + self.assertIsNone(msg) + + def test_validate_string_or_none(self): + msg = attributes._validate_not_empty_string_or_none('test', None) + self.assertIsNone(msg) + + msg = attributes._validate_not_empty_string_or_none(None, None) + self.assertIsNone(msg) + def test_validate_string(self): msg = attributes._validate_string(None, None) self.assertEqual(msg, "'None' is not a valid string") diff --git a/neutron/tests/unit/test_extension_extradhcpopts.py b/neutron/tests/unit/test_extension_extradhcpopts.py index 1589851bcc..8d0f1e26c9 100644 --- a/neutron/tests/unit/test_extension_extradhcpopts.py +++ b/neutron/tests/unit/test_extension_extradhcpopts.py @@ -17,6 +17,7 @@ # import copy +import webob.exc from neutron.db import db_base_plugin_v2 from neutron.db import extradhcpopt_db as edo_db @@ -87,22 +88,22 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): port['port'][edo_ext.EXTRADHCPOPTS]) def test_create_port_with_none_extradhcpopts(self): - new_list = [{'opt_name': 'bootfile-name', + opt_list = [{'opt_name': 'bootfile-name', 'opt_value': None}, {'opt_name': 'server-ip-address', 'opt_value': '123.123.123.456'}, {'opt_name': 'tftp-server', 'opt_value': '123.123.123.123'}] - new_dict = [{'opt_name': 'server-ip-address', + expected = [{'opt_name': 'server-ip-address', 'opt_value': '123.123.123.456'}, {'opt_name': 'tftp-server', 'opt_value': '123.123.123.123'}] - params = {edo_ext.EXTRADHCPOPTS: new_list, + params = {edo_ext.EXTRADHCPOPTS: opt_list, 'arg_list': (edo_ext.EXTRADHCPOPTS,)} with self.port(**params) as port: - self._check_opts(new_dict, + self._check_opts(expected, port['port'][edo_ext.EXTRADHCPOPTS]) def test_update_port_with_extradhcpopts_with_same(self): @@ -112,8 +113,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): {'opt_name': 'server-ip-address', 'opt_value': '123.123.123.456'}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] - new_opts = opt_list[:] - for i in new_opts: + expected_opts = opt_list[:] + for i in expected_opts: if i['opt_name'] == upd_opts[0]['opt_name']: i['opt_value'] = upd_opts[0]['opt_value'] break @@ -127,7 +128,27 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize('json', req.get_response(self.api)) - self._check_opts(new_opts, + self._check_opts(expected_opts, + port['port'][edo_ext.EXTRADHCPOPTS]) + + def test_update_port_with_additional_extradhcpopt(self): + opt_list = [{'opt_name': 'tftp-server', + 'opt_value': '123.123.123.123'}, + {'opt_name': 'server-ip-address', + 'opt_value': '123.123.123.456'}] + upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] + expected_opts = copy.deepcopy(opt_list) + expected_opts.append(upd_opts[0]) + params = {edo_ext.EXTRADHCPOPTS: opt_list, + 'arg_list': (edo_ext.EXTRADHCPOPTS,)} + + with self.port(**params) as port: + update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}} + + req = self.new_update_request('ports', update_port, + port['port']['id']) + port = self.deserialize('json', req.get_response(self.api)) + self._check_opts(expected_opts, port['port'][edo_ext.EXTRADHCPOPTS]) def test_update_port_with_extradhcpopts(self): @@ -137,8 +158,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): {'opt_name': 'server-ip-address', 'opt_value': '123.123.123.456'}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] - new_opts = copy.deepcopy(opt_list) - for i in new_opts: + expected_opts = copy.deepcopy(opt_list) + for i in expected_opts: if i['opt_name'] == upd_opts[0]['opt_name']: i['opt_value'] = upd_opts[0]['opt_value'] break @@ -152,28 +173,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize('json', req.get_response(self.api)) - self._check_opts(new_opts, - port['port'][edo_ext.EXTRADHCPOPTS]) - - def test_update_port_with_additional_extradhcpopt(self): - opt_list = [{'opt_name': 'tftp-server', - 'opt_value': '123.123.123.123'}, - {'opt_name': 'server-ip-address', - 'opt_value': '123.123.123.456'}] - upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}] - new_opts = copy.deepcopy(opt_list) - new_opts.append(upd_opts[0]) - - params = {edo_ext.EXTRADHCPOPTS: opt_list, - 'arg_list': (edo_ext.EXTRADHCPOPTS,)} - - with self.port(**params) as port: - update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}} - - req = self.new_update_request('ports', update_port, - port['port']['id']) - port = self.deserialize('json', req.get_response(self.api)) - self._check_opts(new_opts, + self._check_opts(expected_opts, port['port'][edo_ext.EXTRADHCPOPTS]) def test_update_port_with_extradhcpopt_delete(self): @@ -183,10 +183,10 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): {'opt_name': 'server-ip-address', 'opt_value': '123.123.123.456'}] upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}] - new_opts = [] + expected_opts = [] - new_opts = [opt for opt in opt_list - if opt['opt_name'] != 'bootfile-name'] + expected_opts = [opt for opt in opt_list + if opt['opt_name'] != 'bootfile-name'] params = {edo_ext.EXTRADHCPOPTS: opt_list, 'arg_list': (edo_ext.EXTRADHCPOPTS,)} @@ -197,7 +197,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize('json', req.get_response(self.api)) - self._check_opts(new_opts, + self._check_opts(expected_opts, port['port'][edo_ext.EXTRADHCPOPTS]) def test_update_port_without_extradhcpopt_delete(self): @@ -226,3 +226,41 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase): port = self.deserialize('json', req.get_response(self.api)) self._check_opts(opt_list, port['port'][edo_ext.EXTRADHCPOPTS]) + + def test_update_port_with_blank_string_extradhcpopt(self): + opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'}, + {'opt_name': 'tftp-server', + 'opt_value': '123.123.123.123'}, + {'opt_name': 'server-ip-address', + 'opt_value': '123.123.123.456'}] + upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': ' '}] + + params = {edo_ext.EXTRADHCPOPTS: opt_list, + 'arg_list': (edo_ext.EXTRADHCPOPTS,)} + + with self.port(**params) as port: + update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}} + + req = self.new_update_request('ports', update_port, + port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) + + def test_update_port_with_blank_name_extradhcpopt(self): + opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'}, + {'opt_name': 'tftp-server', + 'opt_value': '123.123.123.123'}, + {'opt_name': 'server-ip-address', + 'opt_value': '123.123.123.456'}] + upd_opts = [{'opt_name': ' ', 'opt_value': 'pxelinux.0'}] + + params = {edo_ext.EXTRADHCPOPTS: opt_list, + 'arg_list': (edo_ext.EXTRADHCPOPTS,)} + + with self.port(**params) as port: + update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}} + + req = self.new_update_request('ports', update_port, + port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)