Add OVF metadata service

Implements support for OVF metadata service.

Sample ovf-env.xml file:
https://github.com/cloud-init/cloud-init/blob/master/doc/sources/ovf/example/ovf-env.xml

Change-Id: I262baa9138599a7462ec2069fd2a142df0a4eb1e
This commit is contained in:
Jose Dillet 2019-04-22 12:57:37 +03:00
parent 3c35d0502e
commit 6eeeb607f7
5 changed files with 391 additions and 0 deletions

View File

@ -22,6 +22,7 @@ _OPT_PATHS = (
'cloudbaseinit.conf.maas.MAASOptions',
'cloudbaseinit.conf.openstack.OpenStackOptions',
'cloudbaseinit.conf.azure.AzureOptions',
'cloudbaseinit.conf.ovf.OvfOptions',
)

51
cloudbaseinit/conf/ovf.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright 2019 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Config options available for the Ovf metadata service."""
from oslo_config import cfg
from cloudbaseinit.conf import base as conf_base
class OvfOptions(conf_base.Options):
"""Config options available for the Ovf metadata service."""
def __init__(self, config):
super(OvfOptions, self).__init__(config, group="ovf")
self._options = [
cfg.StrOpt(
"config_file_name",
default="ovf-env.xml",
help="Configuration file name"),
cfg.StrOpt(
"drive_label",
default="OVF ENV",
help="Look for configuration file in drives with this label"),
cfg.StrOpt(
"ns",
default="oe",
help="Namespace prefix for ovf environment"),
]
def register(self):
"""Register the current options to the global ConfigOpts object."""
group = cfg.OptGroup(self.group_name, title='Ovf Options')
self._config.register_group(group)
self._config.register_opts(self._options, group=group)
def list(self):
"""Return a list which contains all the available options."""
return self._options

View File

@ -0,0 +1,142 @@
# Copyright 2019 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import os
from oslo_log import log as oslo_logging
import untangle
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.osutils import factory as osutils_factory
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
INSTANCE_ID = 'iid-ovf'
class OvfService(base.BaseMetadataService):
def __init__(self):
super(OvfService, self).__init__()
self._config_drive_path = None
self._ovf_env = None
self._osutils = osutils_factory.get_os_utils()
def load(self):
super(OvfService, self).load()
try:
self._get_ovf_env()
return True
except Exception as ex:
LOG.exception(ex)
return False
def _get_config_drive_path(self):
if not self._config_drive_path:
for drive_letter in self._osutils.get_logical_drives():
label = self._osutils.get_volume_label(drive_letter)
if label and label.lower() == CONF.ovf.drive_label.lower():
self._config_drive_path = drive_letter
if not self._config_drive_path:
raise base.NotExistingMetadataException(
"No drive with label %s could be found" %
CONF.ovf.drive_label)
return self._config_drive_path
def _get_ovf_env_path(self):
drive_path = self._get_config_drive_path()
ovf_env_path = os.path.join(drive_path, CONF.ovf.config_file_name)
if not os.path.exists(ovf_env_path):
raise base.NotExistingMetadataException(
"File %s does not exist in drive %s" %
(CONF.ovf.config_file_name, drive_path))
return ovf_env_path
def _get_ovf_env(self):
if not self._ovf_env:
ovf_env_path = self._get_ovf_env_path()
self._ovf_env = untangle.parse(ovf_env_path)
return self._ovf_env
def _get_property_section(self):
ovf_env = self._get_ovf_env()
if not hasattr(ovf_env.Environment, 'PropertySection'):
LOG.warning("PropertySection not found in ovf file")
return None
return ovf_env.Environment.PropertySection
def _get_property_values(self, property_name):
prop_values = []
prop_section = self._get_property_section()
if not hasattr(prop_section, 'Property'):
LOG.warning("PropertySection in ovf file has no Property elements")
return None
for child_property in prop_section.Property:
property_key = child_property[CONF.ovf.ns + ':key']
if property_key and property_key == property_name:
property_value = child_property[CONF.ovf.ns + ':value']
if property_value:
prop_values.append(property_value.strip())
if not prop_values:
LOG.warning("Property %s not found in PropertySection in ovf file",
property_name)
return prop_values
def _get_property_value(self, property_name):
prop_values = self._get_property_values(property_name)
if len(prop_values) >= 1:
if len(prop_values) > 1:
LOG.warning("Expected one value for property %s, "
"found more. Returning first one",
property_name)
return prop_values[0]
return None
def get_instance_id(self):
instance_id = self._get_property_value('instance-id')
if instance_id is None:
instance_id = INSTANCE_ID
return instance_id
def get_user_data(self):
return self._get_property_value('user-data')
def get_decoded_user_data(self):
user_data = self.get_user_data()
if user_data:
return base64.b64decode(user_data)
return None
def get_host_name(self):
return self._get_property_value('hostname')
def get_public_keys(self):
return self._get_property_values('public-keys')
def get_admin_username(self):
return self._get_property_value('username')
def get_admin_password(self):
return self._get_property_value('password')
def _get_data(self, path):
pass

View File

@ -0,0 +1,177 @@
# Copyright 2019 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import importlib
import os
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.tests import testutils
CONF = cloudbaseinit_conf.CONF
MODPATH = "cloudbaseinit.metadata.services.ovfservice.OvfService"
class OvfServiceTest(unittest.TestCase):
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def setUp(self, mock_osutils):
self._mock_osutils = mock_osutils
self._mock_untangle = mock.MagicMock()
self._mock_ctypes = mock.MagicMock()
self._mock_wintypes = mock.MagicMock()
self._moves_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'untangle': self._mock_untangle,
'ctypes': self._mock_ctypes,
'ctypes.wintypes': self._mock_wintypes,
'six.moves': self._moves_mock
})
self._module_patcher.start()
self._ovfservice_module = importlib.import_module(
'cloudbaseinit.metadata.services.ovfservice')
self._ovfservice = self._ovfservice_module.OvfService()
self._logsnatcher = testutils.LogSnatcher(
'cloudbaseinit.metadata.services.ovfservice')
def tearDown(self):
self._module_patcher.stop()
@mock.patch('os.path.exists')
def _test__get_ovf_env_path(self, mock_path_exists,
path_exists=True):
mock_osutils = mock.Mock()
mock_osutils.get_logical_drives.return_value = ['fake_drive']
mock_osutils.get_volume_label.return_value = CONF.ovf.drive_label
mock_path_exists.return_value = path_exists
self._ovfservice._osutils = mock_osutils
if not path_exists:
self.assertRaises(base.NotExistingMetadataException,
self._ovfservice._get_ovf_env_path)
else:
res = self._ovfservice._get_ovf_env_path()
ovf_env_path = os.path.join(
"fake_drive", "ovf-env.xml")
self.assertEqual(res, ovf_env_path)
mock_path_exists.assert_called_once_with(ovf_env_path)
mock_osutils.get_logical_drives.assert_called_once_with()
mock_osutils.get_volume_label.assert_called_once_with("fake_drive")
def test_get_ovf_env_path_exists(self):
self._test__get_ovf_env_path()
def test_get_ovf_env_path_not_exists(self):
self._test__get_ovf_env_path(path_exists=False)
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_instance_id(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('instance-id')
res = self._ovfservice.get_instance_id()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, str(id(mock.sentinel.value)))
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_instance_id_unset(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = []
res = self._ovfservice.get_instance_id()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, 'iid-ovf')
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_decoded_user_data(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('user-data', True)
res = self._ovfservice.get_decoded_user_data()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, str(id(mock.sentinel.value)).encode())
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_host_name(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('hostname')
res = self._ovfservice.get_host_name()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, str(id(mock.sentinel.value)))
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_public_keys(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('public-keys')
res = self._ovfservice.get_public_keys()
mock_get_ovf_env.assert_called_once_with()
assert type(res) == list
assert len(res) == 1
self.assertEqual(res[0], str(id(mock.sentinel.value)))
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_admin_username(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('username')
res = self._ovfservice.get_admin_username()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, str(id(mock.sentinel.value)))
@mock.patch(MODPATH + "._get_ovf_env")
def test_get_admin_password(self, mock_get_ovf_env):
mock_ovf_env = mock.Mock()
mock_get_ovf_env.return_value = mock_ovf_env
mock_ovf_env.Environment.PropertySection.Property = \
self._get_test_properties('password')
res = self._ovfservice.get_admin_password()
mock_get_ovf_env.assert_called_once_with()
self.assertEqual(res, str(id(mock.sentinel.value)))
def _get_test_properties(self, property_name, is_encoded=False):
tested_prop = self._get_tested_property(property_name, is_encoded)
another_prop = self._get_another_property('AnotherProperty')
yet_another_prop = self._get_another_property('YetAnotherProperty')
return [another_prop, tested_prop, yet_another_prop]
def _get_tested_property(self, property_name, is_encoded):
if not is_encoded:
value = str(id(mock.sentinel.value))
else:
value = base64.b64encode(str(id(mock.sentinel.value)).encode())
return {'oe:key': property_name, 'oe:value': value}
def _get_another_property(self, property_name):
return {
'oe:key': property_name,
'oe:value': str(id(mock.sentinel.another_value))
}

View File

@ -180,6 +180,26 @@ Capabilities:
* authentication certificates (x509)
* user data
OVF
---
.. class:: cloudbaseinit.metadata.services.ovfservice.OvfService
The *OVF* provider searches data from OVF environment ISO transport.
Capabilities:
* instance ID
* host name
* public keys
* admin password
* user data
Config options:
* config_file_name (string: "ovf-env.xml")
* drive_label (string: "OVF ENV")
* ns (string: "oe")
----
Configuring available services