Add support for OpenNebula meta-data
All the needed details are taken from a context script file found on a ISO mounted inside the VM. Change-Id: I2259159ea09e03e17a0c98d2fe7039f597be9190
This commit is contained in:
parent
8befa6c833
commit
92bdd3fce2
@ -29,9 +29,11 @@ opts = [
|
||||
'cloudbaseinit.metadata.services.ec2service.EC2Service',
|
||||
'cloudbaseinit.metadata.services.maasservice.MaaSHttpService',
|
||||
'cloudbaseinit.metadata.services.cloudstack.CloudStack',
|
||||
'cloudbaseinit.metadata.services'
|
||||
'.opennebulaservice.OpenNebulaService',
|
||||
],
|
||||
help='List of enabled metadata service classes, '
|
||||
'to be tested fro availability in the provided order. '
|
||||
'to be tested for availability in the provided order. '
|
||||
'The first available service will be used to retrieve '
|
||||
'metadata')
|
||||
]
|
||||
|
234
cloudbaseinit/metadata/services/opennebulaservice.py
Normal file
234
cloudbaseinit/metadata/services/opennebulaservice.py
Normal file
@ -0,0 +1,234 @@
|
||||
# Copyright 2014 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# pylint: disable=missing-docstring, bad-builtin
|
||||
|
||||
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import struct
|
||||
|
||||
import six
|
||||
|
||||
from cloudbaseinit.metadata.services import base
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONTEXT_FILE = "context.sh"
|
||||
INSTANCE_ID = "iid-dsopennebula"
|
||||
|
||||
# metadata identifiers
|
||||
HOST_NAME = ["SET_HOSTNAME", "HOSTNAME"]
|
||||
USER_DATA = ["USER_DATA", "USERDATA"]
|
||||
PUBLIC_KEY = ["SSH_PUBLIC_KEY", "SSH_KEY"]
|
||||
|
||||
MAC = ["ETH{iid}_MAC"]
|
||||
ADDRESS = ["ETH{iid}_IP"]
|
||||
NETMASK = ["ETH{iid}_MASK"]
|
||||
GATEWAY = ["ETH{iid}_GATEWAY"]
|
||||
DNSNS = ["ETH{iid}_DNS"]
|
||||
|
||||
|
||||
class OpenNebulaService(base.BaseMetadataService):
|
||||
|
||||
"""Service handling ONE.
|
||||
|
||||
Service able to expose OpenNebula metadata
|
||||
using information found in a mounted ISO file.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(OpenNebulaService, self).__init__()
|
||||
self._context_path = None
|
||||
self._raw_content = None
|
||||
self._dict_content = {}
|
||||
|
||||
def _nic_count(self):
|
||||
"""Return the number of available interfaces."""
|
||||
mac = MAC[0]
|
||||
iid = 0
|
||||
while self._dict_content.get(mac.format(iid=iid)):
|
||||
iid += 1
|
||||
return iid
|
||||
|
||||
@staticmethod
|
||||
def _parse_shell_variables(content):
|
||||
"""Returns a dictionary with variables and their values.
|
||||
|
||||
This is a dummy approach, because it works only with simple literals.
|
||||
"""
|
||||
crlf_sep = "\r\n"
|
||||
sep = "\n"
|
||||
if crlf_sep in content:
|
||||
sep = crlf_sep
|
||||
# preprocess the content
|
||||
lines = []
|
||||
for line in content.split(sep):
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
lines.append(line)
|
||||
# for cleaner pattern matching
|
||||
lines.append("__REGEX_DUMMY__='__regex_dummy__'")
|
||||
new_content = sep.join(lines)
|
||||
# get pairs
|
||||
pairs = {}
|
||||
pattern = (r"(?P<key>\w+)=(['\"](?P<str_value>[\s\S]+?)['\"]|"
|
||||
r"(?P<int_value>\d+))(?=\s+\w+=)")
|
||||
for match in re.finditer(pattern, new_content):
|
||||
pairs[match.group("key")] = (match.group("str_value") or
|
||||
int(match.group("int_value")))
|
||||
return pairs
|
||||
|
||||
@staticmethod
|
||||
def _calculate_netmask(address, gateway):
|
||||
"""Try to determine a default netmask.
|
||||
|
||||
It is a simple, frequent and dummy prediction
|
||||
based on the provided IP and gateway addresses.
|
||||
"""
|
||||
address_chunks = address.split(".")
|
||||
gateway_chunks = gateway.split(".")
|
||||
netmask_chunks = []
|
||||
for achunk, gchunk in six.moves.zip(
|
||||
address_chunks, gateway_chunks):
|
||||
if achunk == gchunk:
|
||||
nchunk = "255"
|
||||
else:
|
||||
nchunk = "0"
|
||||
netmask_chunks.append(nchunk)
|
||||
return ".".join(netmask_chunks)
|
||||
|
||||
@staticmethod
|
||||
def _compute_broadcast(address, netmask):
|
||||
address_ulong, = struct.unpack("!L", socket.inet_aton(address))
|
||||
netmask_ulong, = struct.unpack("!L", socket.inet_aton(netmask))
|
||||
bitmask = 0xFFFFFFFF
|
||||
broadcast_ulong = address_ulong | ~netmask_ulong & bitmask
|
||||
broadcast = socket.inet_ntoa(struct.pack("!L", broadcast_ulong))
|
||||
return broadcast
|
||||
|
||||
def _get_data(self, name):
|
||||
# get the content if it's not already retrieved
|
||||
if not self._raw_content:
|
||||
if not self._context_path:
|
||||
msg = "No metadata file path found"
|
||||
LOG.debug(msg)
|
||||
raise base.NotExistingMetadataException(msg)
|
||||
with open(self._context_path, "r") as fin:
|
||||
self._raw_content = fin.read()
|
||||
# fill the dict with values
|
||||
vardict = OpenNebulaService._parse_shell_variables(
|
||||
self._raw_content
|
||||
)
|
||||
self._dict_content.update(vardict)
|
||||
# return the requested value
|
||||
if name not in self._dict_content:
|
||||
msg = "Metadata {} not found".format(name)
|
||||
LOG.debug(msg)
|
||||
raise base.NotExistingMetadataException(msg)
|
||||
return self._dict_content[name]
|
||||
|
||||
def _get_cache_data(self, names, iid=None):
|
||||
# Solves caching issues when working with
|
||||
# multiple names (lists not hashable).
|
||||
# This happens because the caching function used
|
||||
# to store already computed results inside a dictionary
|
||||
# and the keys were strings (and must be anything that
|
||||
# is hashable under a dictionary, that's why the exception
|
||||
# is thrown).
|
||||
names = names[:]
|
||||
if iid is not None:
|
||||
for ind, value in enumerate(names):
|
||||
names[ind] = value.format(iid=iid)
|
||||
for name in names:
|
||||
try:
|
||||
return super(OpenNebulaService, self)._get_cache_data(name)
|
||||
except base.NotExistingMetadataException:
|
||||
pass
|
||||
msg = "None of {} metadata was found".format(", ".join(names))
|
||||
LOG.debug(msg)
|
||||
raise base.NotExistingMetadataException(msg)
|
||||
|
||||
def load(self):
|
||||
"""Loads the context metadata from the ISO provided by OpenNebula."""
|
||||
super(OpenNebulaService, self).__init__()
|
||||
LOG.debug("Searching for a drive containing OpenNebula context data")
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
for drive in osutils.get_cdrom_drives():
|
||||
label = osutils.get_volume_label(drive)
|
||||
file_path = os.path.join(drive, CONTEXT_FILE)
|
||||
if os.path.isfile(file_path):
|
||||
LOG.info("Found drive %(label)s (%(drive)s) with "
|
||||
"OpenNebula metadata file %(file_path)s",
|
||||
{"label": label, "drive": drive,
|
||||
"file_path": file_path})
|
||||
self._context_path = file_path
|
||||
return True
|
||||
LOG.error("No drive or context file found")
|
||||
return False
|
||||
|
||||
def get_instance_id(self):
|
||||
# return a dummy default value
|
||||
return INSTANCE_ID
|
||||
|
||||
def get_host_name(self):
|
||||
return self._get_cache_data(HOST_NAME)
|
||||
|
||||
def get_user_data(self):
|
||||
return self._get_cache_data(USER_DATA)
|
||||
|
||||
def get_public_keys(self):
|
||||
return [self._get_cache_data(PUBLIC_KEY)]
|
||||
|
||||
def get_network_details(self):
|
||||
"""Return a list of NetworkDetails objects.
|
||||
|
||||
With each object from that list, the corresponding
|
||||
NIC (by mac) can be statically configured.
|
||||
If no such object is present, then is believed that
|
||||
this is handled by DHCP (user didn't provide sufficient data).
|
||||
"""
|
||||
network_details = []
|
||||
ncount = self._nic_count()
|
||||
# for every interface
|
||||
for iid in range(ncount):
|
||||
try:
|
||||
# get existing values
|
||||
mac = self._get_cache_data(MAC, iid=iid).upper()
|
||||
address = self._get_cache_data(ADDRESS, iid=iid)
|
||||
gateway = self._get_cache_data(GATEWAY, iid=iid)
|
||||
# try to find/predict and compute the rest
|
||||
try:
|
||||
netmask = self._get_cache_data(NETMASK, iid=iid)
|
||||
except base.NotExistingMetadataException:
|
||||
netmask = self._calculate_netmask(address, gateway)
|
||||
broadcast = self._compute_broadcast(address, netmask)
|
||||
# gather them as namedtuple objects
|
||||
details = base.NetworkDetails(
|
||||
mac,
|
||||
address,
|
||||
netmask,
|
||||
broadcast,
|
||||
gateway,
|
||||
self._get_cache_data(DNSNS, iid=iid).split(" ")
|
||||
)
|
||||
except base.NotExistingMetadataException:
|
||||
LOG.debug("Incomplete NIC details")
|
||||
else:
|
||||
network_details.append(details)
|
||||
return network_details
|
310
cloudbaseinit/tests/metadata/services/test_opennebulaservice.py
Normal file
310
cloudbaseinit/tests/metadata/services/test_opennebulaservice.py
Normal file
@ -0,0 +1,310 @@
|
||||
# Copyright 2014 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import re
|
||||
import textwrap
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from cloudbaseinit.metadata.services import base
|
||||
from cloudbaseinit.metadata.services import opennebulaservice
|
||||
|
||||
|
||||
MAC = "54:EE:75:19:F4:61" # output must be upper
|
||||
ADDRESS = "192.168.122.101"
|
||||
NETMASK = "255.255.255.0"
|
||||
BROADCAST = "192.168.122.255"
|
||||
GATEWAY = "192.168.122.1"
|
||||
DNSNS = "8.8.8.8"
|
||||
PUBLIC_KEY = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQDJitRvac/fr1jWrZw"
|
||||
"j6mgDxlrBN2xAtKExtm5cPkexQUuxTma61ZijP/aWiQg9Q93baSwsBi"
|
||||
"IPM0SO1ro0szv84cC9GmSHWVOnCVWGY3nojplqL5VfV9NDLlmSceFc5"
|
||||
"cLpUTMnoUiXt8QXfDm50gh/5vGgJJXuMz1BKwfJH232ajM5r9xUfKDZ"
|
||||
"jzhTVooPlWoJJmn6xJDOJG7cjszZpv2N+Xzq7GRo6fa7ygTASOnES5t"
|
||||
"vbcqM8432P6Bg7Hkr2bOjQF11RyJofFcOvECKfbX4jQ9JGzbocNnepw"
|
||||
"2YlV08UYa/8aoFgzyo/FiR6cc/jQupbFIe92xBSNiMEioeZ26nTac6C"
|
||||
"oRQXEKrb95Ntg7ysYUqjKQFWJdx6AW7hlE8mMjA6nRqvswXsp1atNdU"
|
||||
"DylyVxlvUHo9rEHEs3GKjkO4tr8KKR0N+oWVAO8S2RfSaD/wFcTokW8"
|
||||
"DeLz2Fnc04pyqOnCjdG7b7HqQVUupuxJNc3EUxZEjbUYiDi22MWF0Oa"
|
||||
"vM7e0xZHMOsdhUPUUnBWngETuOTVSo26bRfzOcUzjwyv2n5PS9rvzYz"
|
||||
"ooXIqcK4BdJ8TLh4OQZwV862PjiafxxWC1L90Tou+BkMTFvwoiWDGMc"
|
||||
"ckPkjvg6p9E2viSFgaKMq2S6EjbzsHG/9BilLBDHLOcbhUU6E76dqGk"
|
||||
"4jl0ZzQ== jfontan@zooloo")
|
||||
HOST_NAME = "ws2012r2"
|
||||
USER_DATA = """#cloud-config
|
||||
bootcmd:
|
||||
- ifdown -a
|
||||
runcmd:
|
||||
- curl http://10.0.1.1:8999/I_am_alive
|
||||
write_files:
|
||||
- encoding: b64
|
||||
content: RG9lcyBpdCB3b3JrPwo=
|
||||
owner: root:root
|
||||
path: /etc/test_file
|
||||
permissions: '\''0644'\''
|
||||
packages:
|
||||
- ruby2.0"""
|
||||
|
||||
CONTEXT = """
|
||||
DISK_ID='1'
|
||||
ETH0_DNS='{dnsns}'
|
||||
ETH0_GATEWAY='{gateway}'
|
||||
ETH0_IP='{address}'
|
||||
ETH0_MASK='{netmask}'
|
||||
ETH0_MAC='{mac}'
|
||||
ETH0_SEARCH_DOMAIN='example.org'
|
||||
NETWORK='YES'
|
||||
SET_HOSTNAME='{host_name}'
|
||||
SSH_PUBLIC_KEY='{public_key}'
|
||||
TARGET='hda'
|
||||
USER_DATA='{user_data}'
|
||||
""".format(
|
||||
dnsns=DNSNS,
|
||||
gateway=GATEWAY,
|
||||
address=ADDRESS,
|
||||
netmask=NETMASK,
|
||||
mac=MAC.lower(), # warning: mac is in lowercase
|
||||
host_name=HOST_NAME,
|
||||
public_key=PUBLIC_KEY,
|
||||
user_data=USER_DATA
|
||||
)
|
||||
|
||||
CONTEXT2 = ("""
|
||||
ETH1_DNS='{dnsns}'
|
||||
ETH1_GATEWAY='{gateway}'
|
||||
ETH1_IP='{address}'
|
||||
ETH1_MASK='{netmask}'
|
||||
ETH1_MAC='{mac}'
|
||||
""" + CONTEXT).format(
|
||||
dnsns=DNSNS,
|
||||
gateway=GATEWAY,
|
||||
address=ADDRESS,
|
||||
netmask=NETMASK,
|
||||
mac=MAC.lower()
|
||||
)
|
||||
|
||||
|
||||
def _get_nic_details():
|
||||
details = base.NetworkDetails(
|
||||
MAC,
|
||||
ADDRESS,
|
||||
NETMASK,
|
||||
BROADCAST,
|
||||
GATEWAY,
|
||||
DNSNS.split(" ")
|
||||
)
|
||||
return details
|
||||
|
||||
|
||||
class _TestOpenNebulaService(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._service = opennebulaservice.OpenNebulaService()
|
||||
|
||||
|
||||
class TestOpenNebulaService(_TestOpenNebulaService):
|
||||
|
||||
def _test_parse_shell_variables(self, crlf=False, comment=False):
|
||||
content = textwrap.dedent("""
|
||||
VAR1='1'
|
||||
var2='abcdef'
|
||||
VAR_VAR3='aaa.bbb.123.ccc'
|
||||
VaR4='aaa
|
||||
bbb
|
||||
x -- c
|
||||
d: e
|
||||
'
|
||||
""")
|
||||
if crlf:
|
||||
content = content.replace("\n", "\r\n")
|
||||
if comment:
|
||||
content += "# A simple comment\n"
|
||||
pairs = self._service._parse_shell_variables(content)
|
||||
_pairs = {
|
||||
"VAR1": "1",
|
||||
"var2": "abcdef",
|
||||
"VAR_VAR3": "aaa.bbb.123.ccc",
|
||||
"VaR4": "aaa\nbbb\nx -- c\nd: e\n"
|
||||
}
|
||||
if crlf:
|
||||
for key, value in _pairs.items():
|
||||
_pairs[key] = value.replace("\n", "\r\n")
|
||||
self.assertEqual(_pairs, pairs)
|
||||
|
||||
def test_parse_shell_variables(self):
|
||||
# 1. no CRLF, no comment
|
||||
# 2. CRLF, no comment
|
||||
# 3. no CRLF, comment
|
||||
for crlf, comment in (
|
||||
(False, False),
|
||||
(True, False),
|
||||
(False, True)):
|
||||
self._test_parse_shell_variables(crlf=crlf, comment=comment)
|
||||
|
||||
def test_calculate_netmask(self):
|
||||
address, gateway, _netmask = (
|
||||
"192.168.0.10",
|
||||
"192.168.1.1",
|
||||
"255.255.0.0"
|
||||
)
|
||||
netmask = self._service._calculate_netmask(address, gateway)
|
||||
self.assertEqual(_netmask, netmask)
|
||||
|
||||
def test_compute_broadcast(self):
|
||||
address, netmask, _broadcast = (
|
||||
"192.168.0.10",
|
||||
"255.255.0.0",
|
||||
"192.168.255.255"
|
||||
)
|
||||
broadcast = self._service._compute_broadcast(address, netmask)
|
||||
self.assertEqual(_broadcast, broadcast)
|
||||
|
||||
@mock.patch("cloudbaseinit.metadata.services"
|
||||
".opennebulaservice.os.path")
|
||||
@mock.patch("cloudbaseinit.metadata.services"
|
||||
".opennebulaservice.osutils_factory")
|
||||
def _test_load(self, mock_osutils_factory, mock_os_path, level=0):
|
||||
# fake data
|
||||
fakes = {
|
||||
"drive": "mount_point",
|
||||
"label": "fake_label",
|
||||
"context_path": "fake_path",
|
||||
"context_data": "fake_data"
|
||||
}
|
||||
# mocking part
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_osutils_factory.get_os_utils.return_value = mock_osutils
|
||||
mock_osutils.get_cdrom_drives.return_value = []
|
||||
# custom mocking according to level of testing
|
||||
if level > 1:
|
||||
mock_osutils.get_cdrom_drives.return_value = [fakes["drive"]]
|
||||
mock_osutils.get_volume_label.return_value = fakes["label"]
|
||||
mock_os_path.join.return_value = fakes["context_path"]
|
||||
mock_os_path.isfile.return_value = False
|
||||
if level > 2:
|
||||
mock_os_path.isfile.return_value = True
|
||||
# run the method being tested
|
||||
ret = self._service.load()
|
||||
# check calls
|
||||
if level > 0:
|
||||
mock_osutils_factory.get_os_utils.assert_called_once_with()
|
||||
mock_osutils.get_cdrom_drives.assert_called_once_with()
|
||||
if level > 1:
|
||||
(mock_osutils.get_volume_label
|
||||
.assert_called_once_with(fakes["drive"]))
|
||||
mock_os_path.join.assert_called_once()
|
||||
mock_os_path.isfile.assert_called_once()
|
||||
# check response and members
|
||||
if level in (1, 2):
|
||||
self.assertFalse(ret)
|
||||
elif level == 3:
|
||||
self.assertTrue(ret)
|
||||
self.assertEqual(fakes["context_path"],
|
||||
self._service._context_path)
|
||||
|
||||
def test_load_no_drives(self):
|
||||
self._test_load(level=1)
|
||||
|
||||
def test_load_no_relevant_drive(self):
|
||||
self._test_load(level=2)
|
||||
|
||||
def test_load_relevant_drive(self):
|
||||
self._test_load(level=3)
|
||||
|
||||
@mock.patch("six.moves.builtins.open",
|
||||
new=mock.mock_open(read_data=CONTEXT))
|
||||
def test_get_data(self):
|
||||
eclass = base.NotExistingMetadataException
|
||||
with self.assertRaises(eclass):
|
||||
self._service._get_data("smt")
|
||||
self._service._context_path = "path"
|
||||
with self.assertRaises(eclass):
|
||||
self._service._get_data("smt")
|
||||
open.assert_called_once_with("path", "r")
|
||||
var = opennebulaservice.ADDRESS[0].format(iid=0)
|
||||
ret = self._service._get_data(var)
|
||||
self.assertEqual(ADDRESS, ret)
|
||||
|
||||
|
||||
class TestLoadedOpenNebulaService(_TestOpenNebulaService):
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoadedOpenNebulaService, self).setUp()
|
||||
self.load_context()
|
||||
|
||||
def load_context(self, context=CONTEXT):
|
||||
self._service._raw_content = context
|
||||
vardict = self._service._parse_shell_variables(
|
||||
self._service._raw_content
|
||||
)
|
||||
self._service._dict_content = vardict
|
||||
|
||||
def test_get_cache_data(self):
|
||||
names = ["smt"]
|
||||
with self.assertRaises(base.NotExistingMetadataException):
|
||||
self._service._get_cache_data(names)
|
||||
names.append(opennebulaservice.ADDRESS[0].format(iid=0))
|
||||
ret = self._service._get_cache_data(names)
|
||||
self.assertEqual(ADDRESS, ret)
|
||||
|
||||
def test_get_instance_id(self):
|
||||
self.assertEqual(
|
||||
opennebulaservice.INSTANCE_ID,
|
||||
self._service.get_instance_id()
|
||||
)
|
||||
|
||||
def test_get_host_name(self):
|
||||
self.assertEqual(
|
||||
HOST_NAME,
|
||||
self._service.get_host_name()
|
||||
)
|
||||
|
||||
def test_get_user_data(self):
|
||||
self.assertEqual(
|
||||
USER_DATA,
|
||||
self._service.get_user_data()
|
||||
)
|
||||
|
||||
def test_get_public_keys(self):
|
||||
self.assertEqual(
|
||||
[PUBLIC_KEY],
|
||||
self._service.get_public_keys()
|
||||
)
|
||||
|
||||
def _test_get_network_details(self, netmask=True):
|
||||
if not netmask:
|
||||
context = re.sub(r"ETH0_MASK='(\d+\.){3}\d+'", "", CONTEXT)
|
||||
self.load_context(context=context)
|
||||
details = _get_nic_details()
|
||||
self.assertEqual(
|
||||
[details],
|
||||
self._service.get_network_details()
|
||||
)
|
||||
|
||||
def test_get_network_details(self):
|
||||
self._test_get_network_details(netmask=True)
|
||||
|
||||
def test_get_network_details_predict(self):
|
||||
self._test_get_network_details(netmask=False)
|
||||
|
||||
def test_multiple_nics(self):
|
||||
self.load_context(context=CONTEXT2)
|
||||
details = _get_nic_details()
|
||||
network_details = [details] * 2
|
||||
self.assertEqual(
|
||||
network_details,
|
||||
self._service.get_network_details()
|
||||
)
|
@ -14,9 +14,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
try:
|
||||
import mock
|
||||
except ImportError:
|
||||
import unittest.mock as mock
|
||||
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.metadata import factory
|
||||
|
||||
@ -24,8 +28,15 @@ from cloudbaseinit.metadata import factory
|
||||
class MetadataServiceFactoryTests(unittest.TestCase):
|
||||
|
||||
@mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_class')
|
||||
def _test_get_metadata_service(self, mock_load_class, ret_value):
|
||||
def _test_get_metadata_service(self, mock_load_class,
|
||||
ret_value=mock.MagicMock(),
|
||||
load_exception=False):
|
||||
mock_load_class.side_effect = ret_value
|
||||
if load_exception:
|
||||
mock_load_class()().load.side_effect = Exception
|
||||
with self.assertRaises(exception.CloudbaseInitException):
|
||||
factory.get_metadata_service()
|
||||
return
|
||||
if ret_value is exception.CloudbaseInitException:
|
||||
self.assertRaises(exception.CloudbaseInitException,
|
||||
factory.get_metadata_service)
|
||||
@ -34,9 +45,11 @@ class MetadataServiceFactoryTests(unittest.TestCase):
|
||||
self.assertEqual(mock_load_class()(), response)
|
||||
|
||||
def test_get_metadata_service(self):
|
||||
m = mock.MagicMock()
|
||||
self._test_get_metadata_service(ret_value=m)
|
||||
self._test_get_metadata_service()
|
||||
|
||||
def test_get_metadata_service_exception(self):
|
||||
self._test_get_metadata_service(
|
||||
ret_value=exception.CloudbaseInitException)
|
||||
|
||||
def test_get_metadata_service_load_exception(self):
|
||||
self._test_get_metadata_service(load_exception=True)
|
||||
|
Loading…
x
Reference in New Issue
Block a user