DHCP service conflict check

Change-Id: I6e51511c31adcaf84b76590a0d809998de3556c9
This commit is contained in:
qilongzhao 2017-02-21 20:09:08 +08:00
parent 44b0950438
commit 8d89037937
7 changed files with 803 additions and 0 deletions

View File

@ -0,0 +1,248 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
/deploy_server endpoint for Daisy v1 API
"""
import webob.exc
import re
import commands
import subprocess
from oslo_log import log as logging
from webob.exc import HTTPBadRequest
from webob.exc import HTTPForbidden
from daisy.api import policy
import daisy.api.v1
from daisy.api.v1 import controller
from daisy.api.v1 import filters
from daisy.common import exception
from daisy.common import utils
from daisy.common import wsgi
from daisy import i18n
from daisy import notifier
import daisy.registry.client.v1.api as registry
LOG = logging.getLogger(__name__)
_ = i18n._
_LE = i18n._LE
_LI = i18n._LI
_LW = i18n._LW
SUPPORTED_PARAMS = daisy.api.v1.SUPPORTED_PARAMS
SUPPORTED_FILTERS = daisy.api.v1.SUPPORTED_FILTERS
ACTIVE_IMMUTABLE = daisy.api.v1.ACTIVE_IMMUTABLE
class Controller(controller.BaseController):
"""
WSGI controller for deploy_servers resource in Daisy v1 API
The deploy_servers resource API is a RESTful web service for
template_service data.
The API is as follows::
GET /deploy_servers -- Returns a set of brief metadata about
deploy_servers
GET /deploy_servers/detail -- Returns a set of detailed metadata
about deploy_servers
HEAD /deploy_servers/<ID> --
Return metadata about an template_service with id <ID>
GET /deploy_servers/<ID> --
Return template_service data for template_service with id <ID>
POST /deploy_servers --
Store template_service data and return metadata about the
newly-stored template_service
PUT /deploy_servers/<ID> --
Update template_service metadata and/or upload template_service
data for a previously-reserved template_service
DELETE /deploy_servers/<ID> -- Delete the deploy_servers with id
"""
def __init__(self):
self.notifier = notifier.Notifier()
registry.configure_registry_client()
self.policy = policy.Enforcer()
def _enforce(self, req, action, target=None):
"""Authorize an action against our policies"""
if target is None:
target = {}
try:
self.policy.enforce(req.context, action, target)
except exception.Forbidden:
raise HTTPForbidden()
def _get_filters(self, req):
"""
Return a dictionary of query param filters from the request
:param req: the Request object coming from the wsgi layer
:retval a dict of key/value filters
"""
query_filters = {}
for param in req.params:
if param in SUPPORTED_FILTERS:
query_filters[param] = req.params.get(param)
if not filters.validate(param, query_filters[param]):
raise HTTPBadRequest(_('Bad value passed to filter '
'%(filter)s got %(val)s')
% {'filter': param,
'val': query_filters[param]})
return query_filters
def _get_query_params(self, req):
"""
Extracts necessary query params from request.
:param req: the WSGI Request object
:retval dict of parameters that can be used by registry client
"""
params = {'filters': self._get_filters(req)}
for PARAM in SUPPORTED_PARAMS:
if PARAM in req.params:
params[PARAM] = req.params.get(PARAM)
return params
def get_nics(self):
nics = set()
(status, output) = commands.getstatusoutput('ifconfig')
net_card_pattern = re.compile('\S*: ')
for net_card in re.finditer(net_card_pattern, str(output)):
nic_name = net_card.group().split(': ')[0]
if nic_name == "lo":
continue
eth_port_name = nic_name.split(":")[0].split(".")[0]
nics.add(eth_port_name)
return nics
def get_pxe_nic(self, pxe_server_ip):
if_addr_nic_cmd = "ip addr show | grep %s | awk '{ print $7 }'" \
% pxe_server_ip
(status, output) = commands.getstatusoutput(if_addr_nic_cmd)
if status:
LOG.warn("get_pxe_server_port error %s." % status)
return
return str(output).split(":")[0].split(".")[0]
def list_deploy_server(self, req):
"""
Returns detailed information for all available deploy_servers
:param req: The WSGI/Webob Request object
:retval The response body is a mapping of the following form::
{'deploy_servers': [
{'id': <ID>,
'name': <NAME>,
......
'created_at': <TIMESTAMP>,
'updated_at': <TIMESTAMP>,
'deleted_at': <TIMESTAMP>|<NONE>,}, ...
]}
"""
self._enforce(req, 'list_deploy_server')
params = self._get_query_params(req)
params['filters'] = {'type': 'system'}
try:
deploy_servers = registry.get_all_networks(req.context, **params)
except Exception:
LOG.error("Get system net plane failed.")
raise HTTPBadRequest(
explanation="Get system net plane failed.",
request=req)
if len(deploy_servers) != 1:
msg = (_("system net plane is not only, "
"%s." % len(deploy_servers)))
LOG.error(msg)
raise HTTPBadRequest(
explanation=msg,
request=req)
deploy_servers[0]["nics"] = self.get_nics()
deploy_servers[0]["pxe_nic"] = self.get_pxe_nic(
deploy_servers[0]["ip"])
return dict(deploy_servers=deploy_servers)
@utils.mutating
def pxe_env_check(self, req, deploy_server_meta):
def get_error_msg(in_child):
end_flag = 'end check'
error_flag = "[error]"
error_msg = ""
while True:
buff = in_child.stdout.readline()
if (buff == '' or buff.find(end_flag) > 0) and \
(in_child.poll() is not None):
break
if buff.find(error_flag) == 0:
error_msg += buff[len(error_flag):]
return error_msg
self._enforce(req, 'pxe_env_check')
interface = deploy_server_meta["deployment_interface"]
server_ip = deploy_server_meta["server_ip"]
cmd = "pxe_env_check %s %s" % (interface, server_ip)
try:
child = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
child.wait()
if child.returncode != 0:
msg = get_error_msg(child)
LOG.error(msg)
raise webob.exc.HTTPBadRequest(explanation=msg)
except subprocess.CalledProcessError as e:
msg = "pxe env check failed!%s" % e.output.strip()
LOG.error(msg)
raise webob.exc.HTTPBadRequest(explanation=msg)
except Exception as e:
msg = "Failed to pxe env check,%s" % e
LOG.error(msg)
raise webob.exc.HTTPBadRequest(explanation=msg)
else:
LOG.info("pxe env check ok!")
return {'deploy_server_meta': {'return_code': 0}}
class DeployServerDeserializer(wsgi.JSONRequestDeserializer):
"""Handles deserialization of specific controller method requests."""
def _deserialize(self, request):
result = {}
result["deploy_server_meta"] = utils.get_dict_meta(request)
return result
def pxe_env_check(self, request):
return self._deserialize(request)
class DeployServerSerializer(wsgi.JSONResponseSerializer):
"""Handles serialization of specific controller method responses."""
def __init__(self):
self.notifier = notifier.Notifier()
def pxe_env_check(self, response, result):
response.status = 201
response.headers['Content-Type'] = 'application/json'
response.body = self.to_json(result)
return response
def create_resource():
"""deploy server resource factory method"""
deserializer = DeployServerDeserializer()
serializer = DeployServerSerializer()
return wsgi.Resource(Controller(), deserializer, serializer)

View File

@ -39,6 +39,7 @@ from daisy.api.v1 import version_patchs
from daisy.api.v1 import template_configs
from daisy.api.v1 import template_funcs
from daisy.api.v1 import template_services
from daisy.api.v1 import deploy_server
class API(wsgi.Router):
@ -639,6 +640,16 @@ class API(wsgi.Router):
action="get_template_service",
conditions=dict(method=["GET"]))
deploy_server_resource = deploy_server.create_resource()
mapper.connect("/deploy_server",
controller=deploy_server_resource,
action="list_deploy_server",
conditions={'method': ['GET']})
mapper.connect("/deploy_servers/pxe_env_check",
controller=deploy_server_resource,
action='pxe_env_check',
conditions={'method': ['POST']})
path = os.path.join(os.path.abspath(os.path.dirname(
os.path.realpath(__file__))),
'ext')

View File

@ -0,0 +1,239 @@
import mock
import webob
from daisy import test
from daisy.context import RequestContext
from daisy.api.v1 import deploy_server
class MockStdout():
def __init__(self):
pass
def readline(self):
return ''
class MockPopen():
def __init__(self, returncode):
self.returncode = returncode
self.stdout = MockStdout()
def wait(self):
pass
def poll(self):
return True
class DeployServerFuncs(test.TestCase):
def setUp(self):
super(DeployServerFuncs, self).setUp()
self.controller = deploy_server.Controller()
self.req = webob.Request.blank('/')
self.req.context = RequestContext(is_admin=True, user='fake user',
tenant='fake tenant')
@mock.patch("commands.getstatusoutput")
def test_get_nics(self, mock_do_getstatusoutput):
def mock_getstatusoutput(*args, **kwargs):
return (0, 'docker0: eth0: eth1: ')
mock_do_getstatusoutput.side_effect = mock_getstatusoutput
nics = self.controller.get_nics()
self.assertEqual(set(['docker0', 'eth1', 'eth0']), nics)
@mock.patch("commands.getstatusoutput")
def test_get_nics_with_lo(self, mock_do_getstatusoutput):
def mock_getstatusoutput(*args, **kwargs):
return (0, 'docker0: eth0: eth1: lo: ')
mock_do_getstatusoutput.side_effect = mock_getstatusoutput
nics = self.controller.get_nics()
self.assertEqual(set(['docker0', 'eth1', 'eth0']), nics)
@mock.patch("commands.getstatusoutput")
def test_get_pxe_nic(self, mock_do_getstatusoutput):
def mock_getstatusoutput(*args, **kwargs):
return (0, 'bond0:100')
mock_do_getstatusoutput.side_effect = mock_getstatusoutput
pxe_nic = self.controller.get_pxe_nic("99.99.1.5")
self.assertEqual("bond0", pxe_nic)
@mock.patch("commands.getstatusoutput")
def test_get_pxe_nic_with_error(self, mock_do_getstatusoutput):
def mock_getstatusoutput(*args, **kwargs):
return (1, 'bond0:100')
mock_do_getstatusoutput.side_effect = mock_getstatusoutput
pxe_nic = self.controller.get_pxe_nic("99.99.1.5")
self.assertEqual(None, pxe_nic)
@mock.patch("daisy.registry.client.v1.api.get_all_networks")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_nics")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_pxe_nic")
def test_list_deploy_server(self, mock_do_get_pxe_nic,
mock_do_get_nics,
mock_do_get_all_networks):
def mock_get_all_networks(*args, **kwargs):
return [{"ip": "99.99.1.5"}]
def mock_get_nics(*args, **kwargs):
return set(['docker0', 'eth1', 'eth0'])
def mock_get_pxe_nic(*args, **kwargs):
return "bond0"
mock_do_get_all_networks.side_effect = mock_get_all_networks
mock_do_get_nics.side_effect = mock_get_nics
mock_do_get_pxe_nic.side_effect = mock_get_pxe_nic
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
actual = {'deploy_servers': [
{
'ip': '99.99.1.5',
'nics': set(['docker0', 'eth0', 'eth1']),
'pxe_nic': 'bond0'
}]}
deploy_servers = self.controller.list_deploy_server(req)
self.assertEqual(actual, deploy_servers)
@mock.patch("daisy.registry.client.v1.api.get_all_networks")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_nics")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_pxe_nic")
def test_list_more_deploy_server(self, mock_do_get_pxe_nic,
mock_do_get_nics,
mock_do_get_all_networks):
def mock_get_all_networks(*args, **kwargs):
return [{"ip": "99.99.1.5"}, {"ip": "99.99.1.5"}]
def mock_get_nics(*args, **kwargs):
return set(['docker0', 'eth1', 'eth0'])
def mock_get_pxe_nic(*args, **kwargs):
return "bond0"
mock_do_get_all_networks.side_effect = mock_get_all_networks
mock_do_get_nics.side_effect = mock_get_nics
mock_do_get_pxe_nic.side_effect = mock_get_pxe_nic
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.list_deploy_server,
req)
@mock.patch("daisy.registry.client.v1.api.get_all_networks")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_nics")
@mock.patch("daisy.api.v1.deploy_server.Controller.get_pxe_nic")
def test_list_more_deploy_server_with_error(
self, mock_do_get_pxe_nic, mock_do_get_nics,
mock_do_get_all_networks):
def mock_get_all_networks(*args, **kwargs):
raise webob.exc.HTTPBadRequest(
explanation="Get system net plane failed.",
request=self.req)
def mock_get_nics(*args, **kwargs):
return set(['docker0', 'eth1', 'eth0'])
def mock_get_pxe_nic(*args, **kwargs):
return "bond0"
mock_do_get_all_networks.side_effect = mock_get_all_networks
mock_do_get_nics.side_effect = mock_get_nics
mock_do_get_pxe_nic.side_effect = mock_get_pxe_nic
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.list_deploy_server,
req)
@mock.patch("subprocess.Popen")
def test_pxe_env_check(self, mock_do_popen):
def mock_popen(*args, **kwargs):
_popen = MockPopen(0)
return _popen
mock_do_popen.side_effect = mock_popen
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
deploy_server_meta = {
"deployment_interface": "bond0",
"server_ip": "99.99.1.5"}
actual = {'deploy_server_meta': {'return_code': 0}}
result = self.controller.pxe_env_check(req, deploy_server_meta)
self.assertEqual(actual, result)
@mock.patch("subprocess.Popen")
def test_pxe_env_check_with_error_popen(self, mock_do_popen):
def mock_popen(*args, **kwargs):
_popen = MockPopen(1)
return _popen
mock_do_popen.side_effect = mock_popen
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
deploy_server_meta = {
"deployment_interface": "bond0",
"server_ip": "99.99.1.5"}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.pxe_env_check,
req, deploy_server_meta)
@mock.patch("subprocess.Popen")
def test_pxe_env_check_call_process_error(self, mock_do_popen):
def mock_popen(*args, **kwargs):
e = subprocess.CalledProcessError(0, 'test')
e.output = 'test error'
raise e
mock_do_popen.side_effect = mock_popen
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
deploy_server_meta = {
"deployment_interface": "bond0",
"server_ip": "99.99.1.5"}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.pxe_env_check,
req, deploy_server_meta)
@mock.patch("subprocess.Popen")
def test_pxe_env_check_error(self, mock_do_popen):
def mock_popen(*args, **kwargs):
_popen = MockPopen(1)
return _popen
mock_do_popen.side_effect = mock_popen
req = webob.Request.blank('/')
req.context = RequestContext(is_admin=True,
user='fake user',
tenant='fake tenant')
deploy_server_meta = {
"deployment_interface": "bond0",
"server_ip": "99.99.1.5"}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.pxe_env_check,
req, deploy_server_meta)

View File

@ -36,6 +36,7 @@ from daisyclient.v1.backup_restore import BackupRestoreManager
from daisyclient.v1.backend_types import BackendTypesManager
from daisyclient.v1.versions import VersionManager
from daisyclient.v1.version_patchs import VersionPatchManager
from daisyclient.v1.deploy_server import DeployServerManager
class Client(object):
@ -74,3 +75,4 @@ class Client(object):
self.backend_types = BackendTypesManager(self.http_client)
self.versions = VersionManager(self.http_client)
self.version_patchs = VersionPatchManager(self.http_client)
self.deploy_server = DeployServerManager(self.http_client)

View File

@ -0,0 +1,234 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_utils import encodeutils
from oslo_utils import strutils
import six
import six.moves.urllib.parse as urlparse
from daisyclient.common import utils
from daisyclient.openstack.common.apiclient import base
PXE_ENV_CHECK_PARAMS = ('deployment_interface', 'server_ip')
DEFAULT_PAGE_SIZE = 200
SORT_DIR_VALUES = ('asc', 'desc')
SORT_KEY_VALUES = ('id', 'created_at', 'updated_at')
OS_REQ_ID_HDR = 'x-openstack-request-id'
class DeployServer(base.Resource):
def __repr__(self):
return "<DeployServer %s>" % self._info
def update(self, **fields):
self.manager.update(self, **fields)
def delete(self, **kwargs):
return self.manager.delete(self)
def data(self, **kwargs):
return self.manager.data(self, **kwargs)
class DeployServerManager(base.ManagerWithFind):
resource_class = DeployServer
def _deploy_server_meta_to_headers(self, fields):
headers = {}
fields_copy = copy.deepcopy(fields)
# NOTE(flaper87): Convert to str, headers
# that are not instance of basestring. All
# headers will be encoded later, before the
# request is sent.
for key, value in six.iteritems(fields_copy):
headers['%s' % key] = utils.to_str(value)
return headers
@staticmethod
def _format_deploy_server_meta_for_user(meta):
for key in ['size', 'min_ram', 'min_disk']:
if key in meta:
try:
meta[key] = int(meta[key]) if meta[key] else 0
except ValueError:
pass
return meta
def _list(self, url, response_key, obj_class=None, body=None):
resp, body = self.client.get(url)
if obj_class is None:
obj_class = self.resource_class
data = body[response_key]
return ([obj_class(self, res, loaded=True) for res in data if res],
resp)
def _build_params(self, parameters):
params = {'limit': parameters.get('page_size', DEFAULT_PAGE_SIZE)}
if 'marker' in parameters:
params['marker'] = parameters['marker']
sort_key = parameters.get('sort_key')
if sort_key is not None:
if sort_key in SORT_KEY_VALUES:
params['sort_key'] = sort_key
else:
raise ValueError('sort_key must be one of the following: %s.'
% ', '.join(SORT_KEY_VALUES))
sort_dir = parameters.get('sort_dir')
if sort_dir is not None:
if sort_dir in SORT_DIR_VALUES:
params['sort_dir'] = sort_dir
else:
raise ValueError('sort_dir must be one of the following: %s.'
% ', '.join(SORT_DIR_VALUES))
filters = parameters.get('filters', {})
params.update(filters)
return params
def get(self, id):
"""get deploy server information by id."""
pass
def list(self, **kwargs):
"""Get a list of deploy server.
:param page_size: number of items to request in each paginated request
:param limit: maximum number of services to return
:param marker: begin returning services that appear later in the
service ist than that represented by this service id
:param filters: dict of direct comparison filters that mimics the
structure of an service object
:param return_request_id: If an empty list is provided, populate this
list with the request ID value from the header
x-openstack-request-id
:rtype: list of :class:`DeployServer`
"""
absolute_limit = kwargs.get('limit')
page_size = kwargs.get('page_size', DEFAULT_PAGE_SIZE)
def paginate(qp, return_request_id=None):
for param, value in six.iteritems(qp):
if isinstance(value, six.string_types):
# Note(flaper87) Url encoding should
# be moved inside http utils, at least
# shouldn't be here.
#
# Making sure all params are str before
# trying to encode them
qp[param] = encodeutils.safe_decode(value)
url = '/v1/deploy_server?%s' % urlparse.urlencode(qp)
deploy_servers, resp = self._list(url, "deploy_servers")
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
for deploy_server in deploy_servers:
yield deploy_server
return_request_id = kwargs.get('return_req_id', None)
params = self._build_params(kwargs)
seen = 0
while True:
seen_last_page = 0
filtered = 0
for deploy_server in paginate(params, return_request_id):
last_deploy_server = deploy_server.id
if (absolute_limit is not None and
seen + seen_last_page >= absolute_limit):
# Note(kragniz): we've seen enough images
return
else:
seen_last_page += 1
yield deploy_server
seen += seen_last_page
if seen_last_page + filtered == 0:
"""
Note(kragniz): we didn't get any deploy_servers
in the last page
"""
return
if absolute_limit is not None and seen >= absolute_limit:
# Note(kragniz): reached the limit of deploy_servers to return
return
if page_size and seen_last_page + filtered < page_size:
"""
Note(kragniz): we've reached the last page
of the deploy_servers
"""
return
# Note(kragniz): there are more deploy_servers to come
params['marker'] = last_deploy_server
seen_last_page = 0
def add(self, **kwargs):
"""Add .
TODO(bcwaldon): document accepted params
"""
pass
def delete(self, id):
"""Delete."""
pass
def update(self, id, **kwargs):
"""Update"""
pass
def pxe_env_check(self, **kwargs):
"""pxe env check
TODO(bcwaldon): document accepted params
"""
fields = {}
for field in kwargs:
if field in PXE_ENV_CHECK_PARAMS:
fields[field] = kwargs[field]
elif field == 'return_req_id':
continue
else:
msg = "pxe_env_check() got an unexpected "\
"keyword argument '%s'"
raise TypeError(msg % field)
url = '/v1/deploy_servers/pxe_env_check'
hdrs = self._deploy_server_meta_to_headers(fields)
resp, body = self.client.post(url, headers=None, data=hdrs)
return DeployServer(
self, self._format_deploy_server_meta_for_user(
body['deploy_server_meta']))

View File

@ -45,6 +45,7 @@ from daisyclient.v1 import param_helper
import daisyclient.v1.backup_restore
import daisyclient.v1.versions
import daisyclient.v1.version_patchs
import daisyclient.v1.deploy_server
from daisy.common import utils as daisy_utils
_bool_strict = functools.partial(strutils.bool_from_string, strict=True)
@ -2601,3 +2602,42 @@ def do_version_patch_delete(dc, args):
print('[Fail]')
print('%s: Unable to delete version_patch %s'
% (e, version_patch))
@utils.arg('--page-size', metavar='<SIZE>', default=None, type=int,
help='Number to request in each paginated request.')
@utils.arg('--sort-key', default='id',
choices=daisyclient.v1.deploy_server.SORT_KEY_VALUES,
help='Sort deploy server list by specified field.')
@utils.arg('--sort-dir', default='asc',
choices=daisyclient.v1.deploy_server.SORT_DIR_VALUES,
help='Sort deploy server list in specified direction.')
def do_deploy_server_list(dc, args):
"""List deploy servers you can access."""
kwargs = {'filters': {}}
if args.page_size is not None:
kwargs['page_size'] = args.page_size
kwargs['sort_key'] = args.sort_key
kwargs['sort_dir'] = args.sort_dir
template_funcs = dc.deploy_server.list(**kwargs)
columns = ['ID', 'Name', 'Cluster_id', 'Description',
'Vlan_start', 'Vlan_end', 'Gateway', 'Cidr',
'Type', 'Ip_ranges', 'Segmentation_type',
'custom_name', 'nics', 'pxe_nic']
utils.print_list(template_funcs, columns)
@utils.arg('deployment_interface', metavar='<DEPLOYMENT_INTERFACE>',
help='The interface to deploy.')
@utils.arg('server_ip', metavar='<SERVER_IP>',
help='The server ip to deploy.')
def do_pxe_env_check(gc, args):
"""Check pxe env."""
fields = {}
fields.update({
'deployment_interface': args.deployment_interface,
'server_ip': args.server_ip})
pxe_env = gc.deploy_server.pxe_env_check(**fields)
_daisy_show(pxe_env)

View File

@ -0,0 +1,29 @@
import mock
import unittest
import webob
from daisyclient.v1 import client
from daisyclient.v1 import deploy_server
from daisyclient.common import http
endpoint = 'http://127.0.0.1:29292'
client_mata = {'ssl_compression': True, 'insecure': False, 'timeout': 600,
'cert': None, 'key': None, 'cacert': ''}
class TestDeployServerManager(unittest.TestCase):
def setUp(self):
super(TestDeployServerManager, self).setUp()
self.client = http.HTTPClient(endpoint, **client_mata)
self.manager = deploy_server.DeployServerManager(self.client)
@mock.patch('daisyclient.common.http.HTTPClient._request')
def test_pxe_env_check(self, mock_do_request):
def mock_request(method, url, **kwargs):
resp = webob.Response()
resp.status_code = 200
body = {'deploy_server_meta': {'return_code': 0}}
return resp, body
mock_do_request.side_effect = mock_request
pxe_env_check = self.manager.pxe_env_check(**version_meta)
self.assertEqual(0, pxe_env_check.return_code)