port pci whitelist from nova to zun

Change-Id: I7a755d608428f61cf05b82f5ee9089b6fe16f1de
Partially-Implements: blueprint support-pcipassthroughfilter
This commit is contained in:
ShunliZhou 2017-08-09 10:57:44 +08:00
parent 127794e2dd
commit 5db86474fb
2 changed files with 177 additions and 0 deletions

94
zun/pci/whitelist.py Normal file
View File

@ -0,0 +1,94 @@
# Copyright (c) 2017 Intel, Inc.
# Copyright (c) 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
from zun.common import exception
import zun.conf
from zun.pci import devspec
CONF = zun.conf.CONF
class Whitelist(object):
"""White list class to decide assignable pci devices.
Not all devices on compute node can be assigned to guest, the
cloud administrator decides the devices that can be assigned
based on vendor_id or product_id etc. If no white list specified,
no device will be assignable.
"""
def _parse_white_list_from_config(self, whitelists):
"""Parse and validate the pci whitelist from the zun config."""
specs = []
for jsonspec in whitelists:
try:
dev_spec = jsonutils.loads(jsonspec)
except ValueError:
raise exception.PciConfigInvalidWhitelist(
reason=_("Invalid entry: '%s'") % jsonspec)
if isinstance(dev_spec, dict):
dev_spec = [dev_spec]
elif not isinstance(dev_spec, list):
raise exception.PciConfigInvalidWhitelist(
reason=_("Invalid entry: '%s'; "
"Expecting list or dict") % jsonspec)
for ds in dev_spec:
if not isinstance(ds, dict):
raise exception.PciConfigInvalidWhitelist(
reason=_("Invalid entry: '%s'; "
"Expecting dict") % ds)
spec = devspec.PciDeviceSpec(ds)
specs.append(spec)
return specs
def __init__(self, whitelist_spec=None):
"""White list constructor
For example, followed json string specifies that devices whose
vendor_id is '8086' and product_id is '1520' can be assigned
to guest.
'[{"product_id":"1520", "vendor_id":"8086"}]'
:param whitelist_spec: A json string for a list of dictionaries,
each dictionary specifies the pci device
properties requirement.
"""
super(Whitelist, self).__init__()
if whitelist_spec:
self.specs = self._parse_white_list_from_config(whitelist_spec)
else:
self.specs = []
def device_assignable(self, dev):
"""Check if a device can be assigned to a guest.
:param dev: A dictionary describing the device properties
"""
for spec in self.specs:
if spec.match(dev):
return True
return False
def get_devspec(self, pci_dev):
for spec in self.specs:
if spec.match_pci_obj(pci_dev):
return spec

View File

@ -0,0 +1,83 @@
# Copyright (c) 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zun.pci import whitelist
from zun.tests import base
dev_dict = {
'compute_node_id': 1,
'address': '0000:00:0a.1',
'product_id': '0001',
'vendor_id': '8086',
'status': 'available',
'phys_function': '0000:00:0a.0',
}
class WhitelistTestCase(base.TestCase):
def test_whitelist(self):
white_list = '{"product_id":"0001", "vendor_id":"8086"}'
parsed = whitelist.Whitelist([white_list])
self.assertEqual(1, len(parsed.specs))
def test_whitelist_list_format(self):
white_list = '[{"product_id":"0001", "vendor_id":"8086"},'\
'{"product_id":"0002", "vendor_id":"8086"}]'
parsed = whitelist.Whitelist([white_list])
self.assertEqual(2, len(parsed.specs))
def test_whitelist_empty(self):
parsed = whitelist.Whitelist()
self.assertFalse(parsed.device_assignable(dev_dict))
def test_whitelist_multiple(self):
wl1 = '{"product_id":"0001", "vendor_id":"8086"}'
wl2 = '{"product_id":"0002", "vendor_id":"8087"}'
parsed = whitelist.Whitelist([wl1, wl2])
self.assertEqual(2, len(parsed.specs))
def test_device_assignable_glob(self):
white_list = '{"address":"*:00:0a.*", "physical_network":"hr_net"}'
parsed = whitelist.Whitelist(
[white_list])
self.assertTrue(parsed.device_assignable(dev_dict))
def test_device_not_assignable_glob(self):
white_list = '{"address":"*:00:0b.*", "physical_network":"hr_net"}'
parsed = whitelist.Whitelist(
[white_list])
self.assertFalse(parsed.device_assignable(dev_dict))
def test_device_assignable_regex(self):
white_list = ('{"address":{"domain": ".*", "bus": ".*", '
'"slot": "0a", "function": "[0-1]"}, '
'"physical_network":"hr_net"}')
parsed = whitelist.Whitelist(
[white_list])
self.assertTrue(parsed.device_assignable(dev_dict))
def test_device_not_assignable_regex(self):
white_list = ('{"address":{"domain": ".*", "bus": ".*", '
'"slot": "0a", "function": "[2-3]"}, '
'"physical_network":"hr_net"}')
parsed = whitelist.Whitelist(
[white_list])
self.assertFalse(parsed.device_assignable(dev_dict))
def test_device_assignable(self):
white_list = '{"product_id":"0001", "vendor_id":"8086"}'
parsed = whitelist.Whitelist([white_list])
self.assertTrue(parsed.device_assignable(dev_dict))