Split out quantum.client and quantum.common.

Change-Id: I1241bcd3305b6859c0cd2bb8c35b523e27aa3b18
This commit is contained in:
Monty Taylor 2012-01-19 11:45:25 +11:00
parent e85f2ea8bf
commit 13222958a5
30 changed files with 33 additions and 3568 deletions

2
.gitignore vendored
View File

@ -2,9 +2,9 @@
*.DS_Store *.DS_Store
build/* build/*
build-stamp build-stamp
quantum.egg-info/
run_tests.err.log run_tests.err.log
run_tests.log run_tests.log
tests/
.quantum-venv/ .quantum-venv/
.venv/ .venv/
quantum/vcsversion.py quantum/vcsversion.py

View File

@ -1,24 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Neworks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
sys.path.insert(0, os.getcwd())
import quantum.client.cli as cli
cli.main()

View File

@ -49,7 +49,7 @@ auth_admin_password = secrete
#auth_admin_token = <token-value> #auth_admin_token = <token-value>
[filter:extensions] [filter:extensions]
paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory paste.filter_factory = quantum.extensions.extensions:plugin_aware_extension_middleware_factory
[app:quantumversions] [app:quantumversions]
paste.app_factory = quantum.api.versions:Versions.factory paste.app_factory = quantum.api.versions:Versions.factory

View File

@ -18,7 +18,7 @@ api_extensions_path = ../../../../extensions
pipeline = extensions extensions_test_app pipeline = extensions extensions_test_app
[filter:extensions] [filter:extensions]
paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory paste.filter_factory = quantum.extensions.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app] [app:extensions_test_app]
paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory

View File

@ -1,359 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Tyler Smith, Cisco Systems
import logging
import httplib
import socket
import urllib
from quantum.common import exceptions
from quantum.common.serializer import Serializer
LOG = logging.getLogger('quantum.client')
EXCEPTIONS = {
400: exceptions.BadInputError,
401: exceptions.NotAuthorized,
420: exceptions.NetworkNotFound,
421: exceptions.NetworkInUse,
430: exceptions.PortNotFound,
431: exceptions.StateInvalid,
432: exceptions.PortInUseClient,
440: exceptions.AlreadyAttachedClient,
501: exceptions.NotImplementedError}
AUTH_TOKEN_HEADER = "X-Auth-Token"
class ApiCall(object):
"""A Decorator to add support for format and tenant overriding"""
def __init__(self, function):
self.function = function
def __get__(self, instance, owner):
def with_params(*args, **kwargs):
"""
Temporarily sets the format and tenant for this request
"""
(format, tenant) = (instance.format, instance.tenant)
if 'format' in kwargs:
instance.format = kwargs['format']
if 'tenant' in kwargs:
instance.tenant = kwargs['tenant']
ret = self.function(instance, *args)
(instance.format, instance.tenant) = (format, tenant)
return ret
return with_params
class Client(object):
"""A base client class - derived from Glance.BaseClient"""
#Metadata for deserializing xml
_serialization_metadata = {
"application/xml": {
"attributes": {
"network": ["id", "name"],
"port": ["id", "state"],
"attachment": ["id"]},
"plurals": {"networks": "network",
"ports": "port"}},
}
# Action query strings
networks_path = "/networks"
network_path = "/networks/%s"
ports_path = "/networks/%s/ports"
port_path = "/networks/%s/ports/%s"
attachment_path = "/networks/%s/ports/%s/attachment"
def __init__(self, host="127.0.0.1", port=9696, use_ssl=False, tenant=None,
format="xml", testingStub=None, key_file=None, cert_file=None,
auth_token=None, logger=None,
action_prefix="/v1.0/tenants/{tenant_id}"):
"""
Creates a new client to some service.
:param host: The host where service resides
:param port: The port where service resides
:param use_ssl: True to use SSL, False to use HTTP
:param tenant: The tenant ID to make requests with
:param format: The format to query the server with
:param testingStub: A class that stubs basic server methods for tests
:param key_file: The SSL key file to use if use_ssl is true
:param cert_file: The SSL cert file to use if use_ssl is true
:param auth_token: authentication token to be passed to server
:param logger: Logger object for the client library
:param action_prefix: prefix for request URIs
"""
self.host = host
self.port = port
self.use_ssl = use_ssl
self.tenant = tenant
self.format = format
self.connection = None
self.testingStub = testingStub
self.key_file = key_file
self.cert_file = cert_file
self.logger = logger
self.auth_token = auth_token
self.action_prefix = action_prefix
def get_connection_type(self):
"""
Returns the proper connection type
"""
if self.testingStub:
return self.testingStub
if self.use_ssl:
return httplib.HTTPSConnection
else:
return httplib.HTTPConnection
def _send_request(self, conn, method, action, body, headers):
# Salvatore: Isolating this piece of code in its own method to
# facilitate stubout for testing
if self.logger:
self.logger.debug("Quantum Client Request:\n" \
+ method + " " + action + "\n")
if body:
self.logger.debug(body)
conn.request(method, action, body, headers)
return conn.getresponse()
def do_request(self, method, action, body=None,
headers=None, params=None, exception_args={}):
"""
Connects to the server and issues a request.
Returns the result data, or raises an appropriate exception if
HTTP status code is not 2xx
:param method: HTTP method ("GET", "POST", "PUT", etc...)
:param body: string of data to send, or None (default)
:param headers: mapping of key/value pairs to add as headers
:param params: dictionary of key/value pairs to add to append
to action
"""
LOG.debug("Client issuing request: %s", action)
# Ensure we have a tenant id
if not self.tenant:
raise Exception("Tenant ID not set")
# Add format and tenant_id
action += ".%s" % self.format
action = self.action_prefix + action
action = action.replace('{tenant_id}', self.tenant)
if isinstance(params, dict):
action += '?' + urllib.urlencode(params)
if body:
body = self.serialize(body)
try:
connection_type = self.get_connection_type()
headers = headers or {"Content-Type":
"application/%s" % self.format}
# if available, add authentication token
if self.auth_token:
headers[AUTH_TOKEN_HEADER] = self.auth_token
# Open connection and send request, handling SSL certs
certs = {'key_file': self.key_file, 'cert_file': self.cert_file}
certs = dict((x, certs[x]) for x in certs if certs[x] is not None)
if self.use_ssl and len(certs):
conn = connection_type(self.host, self.port, **certs)
else:
conn = connection_type(self.host, self.port)
res = self._send_request(conn, method, action, body, headers)
status_code = self.get_status_code(res)
data = res.read()
if self.logger:
self.logger.debug("Quantum Client Reply (code = %s) :\n %s" \
% (str(status_code), data))
if status_code in (httplib.OK,
httplib.CREATED,
httplib.ACCEPTED,
httplib.NO_CONTENT):
return self.deserialize(data, status_code)
else:
error_message = res.read()
LOG.debug("Server returned error: %s", status_code)
LOG.debug("Error message: %s", error_message)
# Create exception with HTTP status code and message
if res.status in EXCEPTIONS:
raise EXCEPTIONS[res.status](**exception_args)
# Add error code and message to exception arguments
ex = Exception("Server returned error: %s" % status_code)
ex.args = ([dict(status_code=status_code,
message=error_message)],)
raise ex
except (socket.error, IOError), e:
msg = "Unable to connect to server. Got error: %s" % e
LOG.exception(msg)
raise Exception(msg)
def get_status_code(self, response):
"""
Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status
def serialize(self, data):
"""
Serializes a dictionary with a single key (which can contain any
structure) into either xml or json
"""
if data is None:
return None
elif isinstance(data, dict):
return Serializer().serialize(data, self.content_type())
else:
raise Exception("unable to serialize object of type = '%s'" \
% type(data))
def deserialize(self, data, status_code):
"""
Deserializes a an xml or json string into a dictionary
"""
if status_code == 204:
return data
return Serializer(self._serialization_metadata).\
deserialize(data, self.content_type())
def content_type(self, format=None):
"""
Returns the mime-type for either 'xml' or 'json'. Defaults to the
currently set format
"""
if not format:
format = self.format
return "application/%s" % (format)
@ApiCall
def list_networks(self):
"""
Fetches a list of all networks for a tenant
"""
return self.do_request("GET", self.networks_path)
@ApiCall
def show_network_details(self, network):
"""
Fetches the details of a certain network
"""
return self.do_request("GET", self.network_path % (network),
exception_args={"net_id": network})
@ApiCall
def create_network(self, body=None):
"""
Creates a new network
"""
return self.do_request("POST", self.networks_path, body=body)
@ApiCall
def update_network(self, network, body=None):
"""
Updates a network
"""
return self.do_request("PUT", self.network_path % (network), body=body,
exception_args={"net_id": network})
@ApiCall
def delete_network(self, network):
"""
Deletes the specified network
"""
return self.do_request("DELETE", self.network_path % (network),
exception_args={"net_id": network})
@ApiCall
def list_ports(self, network):
"""
Fetches a list of ports on a given network
"""
return self.do_request("GET", self.ports_path % (network))
@ApiCall
def show_port_details(self, network, port):
"""
Fetches the details of a certain port
"""
return self.do_request("GET", self.port_path % (network, port),
exception_args={"net_id": network, "port_id": port})
@ApiCall
def create_port(self, network, body=None):
"""
Creates a new port on a given network
"""
return self.do_request("POST", self.ports_path % (network), body=body,
exception_args={"net_id": network})
@ApiCall
def delete_port(self, network, port):
"""
Deletes the specified port from a network
"""
return self.do_request("DELETE", self.port_path % (network, port),
exception_args={"net_id": network, "port_id": port})
@ApiCall
def update_port(self, network, port, body=None):
"""
Sets the attributes of the specified port
"""
return self.do_request("PUT",
self.port_path % (network, port), body=body,
exception_args={"net_id": network,
"port_id": port})
@ApiCall
def show_port_attachment(self, network, port):
"""
Fetches the attachment-id associated with the specified port
"""
return self.do_request("GET", self.attachment_path % (network, port),
exception_args={"net_id": network, "port_id": port})
@ApiCall
def attach_resource(self, network, port, body=None):
"""
Sets the attachment-id of the specified port
"""
return self.do_request("PUT",
self.attachment_path % (network, port), body=body,
exception_args={"net_id": network,
"port_id": port,
"attach_id": str(body)})
@ApiCall
def detach_resource(self, network, port):
"""
Removes the attachment-id of the specified port
"""
return self.do_request("DELETE",
self.attachment_path % (network, port),
exception_args={"net_id": network, "port_id": port})

View File

@ -1,167 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# Copyright 2011 Citrix Systems
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Brad Hall, Nicira Networks, Inc.
# @author: Salvatore Orlando, Citrix
import gettext
import logging
import logging.handlers
import os
import sys
from optparse import OptionParser
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'quantum', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('quantum', unicode=1)
from quantum.client import cli_lib
from quantum.client import Client
#Configure logger for client - cli logger is a child of it
#NOTE(salvatore-orlando): logger name does not map to package
#this is deliberate. Simplifies logger configuration
LOG = logging.getLogger('quantum')
FORMAT = 'json'
commands = {
"list_nets": {
"func": cli_lib.list_nets,
"args": ["tenant-id"]},
"create_net": {
"func": cli_lib.create_net,
"args": ["tenant-id", "net-name"]},
"delete_net": {
"func": cli_lib.delete_net,
"args": ["tenant-id", "net-id"]},
"show_net": {
"func": cli_lib.show_net,
"args": ["tenant-id", "net-id"]},
"update_net": {
"func": cli_lib.update_net,
"args": ["tenant-id", "net-id", "new-name"]},
"list_ports": {
"func": cli_lib.list_ports,
"args": ["tenant-id", "net-id"]},
"create_port": {
"func": cli_lib.create_port,
"args": ["tenant-id", "net-id"]},
"delete_port": {
"func": cli_lib.delete_port,
"args": ["tenant-id", "net-id", "port-id"]},
"update_port": {
"func": cli_lib.update_port,
"args": ["tenant-id", "net-id", "port-id", "params"]},
"show_port": {
"func": cli_lib.show_port,
"args": ["tenant-id", "net-id", "port-id"]},
"plug_iface": {
"func": cli_lib.plug_iface,
"args": ["tenant-id", "net-id", "port-id", "iface-id"]},
"unplug_iface": {
"func": cli_lib.unplug_iface,
"args": ["tenant-id", "net-id", "port-id"]}, }
def help():
print "\nCommands:"
for k in commands.keys():
print " %s %s" % (k,
" ".join(["<%s>" % y for y in commands[k]["args"]]))
def build_args(cmd, cmdargs, arglist):
args = []
orig_arglist = arglist[:]
try:
for x in cmdargs:
args.append(arglist[0])
del arglist[0]
except:
LOG.error("Not enough arguments for \"%s\" (expected: %d, got: %d)" % (
cmd, len(cmdargs), len(orig_arglist)))
print "Usage:\n %s %s" % (cmd,
" ".join(["<%s>" % y for y in commands[cmd]["args"]]))
return None
if len(arglist) > 0:
LOG.error("Too many arguments for \"%s\" (expected: %d, got: %d)" % (
cmd, len(cmdargs), len(orig_arglist)))
print "Usage:\n %s %s" % (cmd,
" ".join(["<%s>" % y for y in commands[cmd]["args"]]))
return None
return args
def main():
usagestr = "Usage: %prog [OPTIONS] <command> [args]"
parser = OptionParser(usage=usagestr)
parser.add_option("-H", "--host", dest="host",
type="string", default="127.0.0.1", help="ip address of api host")
parser.add_option("-p", "--port", dest="port",
type="int", default=9696, help="api poort")
parser.add_option("-s", "--ssl", dest="ssl",
action="store_true", default=False, help="use ssl")
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true", default=False, help="turn on verbose logging")
parser.add_option("-f", "--logfile", dest="logfile",
type="string", default="syslog", help="log file path")
parser.add_option("-t", "--token", dest="token",
type="string", default=None, help="authentication token")
options, args = parser.parse_args()
if options.verbose:
LOG.setLevel(logging.DEBUG)
else:
LOG.setLevel(logging.WARN)
#logging.handlers.WatchedFileHandler
if options.logfile == "syslog":
LOG.addHandler(logging.handlers.SysLogHandler(address='/dev/log'))
else:
LOG.addHandler(logging.handlers.WatchedFileHandler(options.logfile))
# Set permissions on log file
os.chmod(options.logfile, 0644)
if len(args) < 1:
parser.print_help()
help()
sys.exit(1)
cmd = args[0]
if cmd not in commands.keys():
LOG.error("Unknown command: %s" % cmd)
help()
sys.exit(1)
args = build_args(cmd, commands[cmd]["args"], args[1:])
if not args:
sys.exit(1)
LOG.info("Executing command \"%s\" with args: %s" % (cmd, args))
client = Client(options.host, options.port, options.ssl,
args[0], FORMAT,
auth_token=options.token)
commands[cmd]["func"](client, *args)
LOG.info("Command execution completed")
sys.exit(0)

View File

@ -1,335 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# Copyright 2011 Citrix Systems
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Brad Hall, Nicira Networks, Inc.
# @author: Salvatore Orlando, Citrix
""" Functions providing implementation for CLI commands. """
import logging
import os
import sys
FORMAT = "json"
LOG = logging.getLogger('quantum.client.cli_lib')
class OutputTemplate(object):
""" A class for generating simple templated output.
Based on Python templating mechanism.
Templates can also express attributes on objects, such as network.id;
templates can also be nested, thus allowing for iteration on inner
templates.
Examples:
1) template with class attributes
Name: %(person.name)s \n
Surname: %(person.surname)s \n
2) template with iteration
Telephone numbers: \n
%(phone_numbers|Telephone number:%(number)s)
3) template with iteration and class attributes
Addresses: \n
%(Addresses|Street:%(address.street)s\nNumber%(address.number))
Instances of this class are initialized with a template string and
the dictionary for performing substition. The class implements the
__str__ method, so it can be directly printed.
"""
def __init__(self, template, data):
self._template = template
self.data = data
def __str__(self):
return self._template % self
def __getitem__(self, key):
items = key.split("|")
if len(items) == 1:
return self._make_attribute(key)
else:
# Note(salvatore-orlando): items[0] must be subscriptable
return self._make_list(self.data[items[0]], items[1])
def _make_attribute(self, item):
""" Renders an entity attribute key in the template.
e.g.: entity.attribute
"""
items = item.split('.')
if len(items) == 1:
return self.data[item]
elif len(items) == 2:
return self.data[items[0]][items[1]]
def _make_list(self, items, inner_template):
""" Renders a list key in the template.
e.g.: %(list|item data:%(item))
"""
#make sure list is subscriptable
if not hasattr(items, '__getitem__'):
raise Exception("Element is not iterable")
return "\n".join([inner_template % item for item in items])
class CmdOutputTemplate(OutputTemplate):
""" This class provides templated output for CLI commands.
Extends OutputTemplate loading a different template for each command.
"""
_templates = {
"list_nets": "Virtual Networks for Tenant %(tenant_id)s\n" +
"%(networks|\tNetwork ID: %(id)s)s",
"show_net": "Network ID: %(network.id)s\n" +
"network Name: %(network.name)s",
"create_net": "Created a new Virtual Network with ID: " +
"%(network_id)s\n" +
"for Tenant: %(tenant_id)s",
"update_net": "Updated Virtual Network with ID: %(network.id)s\n" +
"for Tenant: %(tenant_id)s\n",
"delete_net": "Deleted Virtual Network with ID: %(network_id)s\n" +
"for Tenant %(tenant_id)s",
"list_ports": "Ports on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s\n" +
"%(ports|\tLogical Port: %(id)s)s",
"create_port": "Created new Logical Port with ID: %(port_id)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s",
"show_port": "Logical Port ID: %(port.id)s\n" +
"administrative State: %(port.state)s\n" +
"interface: %(port.attachment)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s",
"update_port": "Updated Logical Port " +
"with ID: %(port.id)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for tenant: %(tenant_id)s",
"delete_port": "Deleted Logical Port with ID: %(port_id)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s",
"plug_iface": "Plugged interface %(attachment)s\n" +
"into Logical Port: %(port_id)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s",
"unplug_iface": "Unplugged interface from Logical Port:" +
"%(port_id)s\n" +
"on Virtual Network: %(network_id)s\n" +
"for Tenant: %(tenant_id)s"}
def __init__(self, cmd, data):
super(CmdOutputTemplate, self).__init__(self._templates[cmd], data)
def _handle_exception(ex):
LOG.exception(sys.exc_info())
print "Exception:%s - %s" % (sys.exc_info()[0], sys.exc_info()[1])
status_code = None
message = None
# Retrieve dict at 1st element of tuple at last argument
if ex.args and isinstance(ex.args[-1][0], dict):
status_code = ex.args[-1][0].get('status_code', None)
message = ex.args[-1][0].get('message', None)
msg_1 = "Command failed with error code: %s" \
% (status_code or '<missing>')
msg_2 = "Error message:%s" % (message or '<missing>')
LOG.exception(msg_1 + "-" + msg_2)
print msg_1
print msg_2
def prepare_output(cmd, tenant_id, response):
LOG.debug("Preparing output for response:%s", response)
response['tenant_id'] = tenant_id
output = str(CmdOutputTemplate(cmd, response))
LOG.debug("Finished preparing output for command:%s", cmd)
return output
def list_nets(client, *args):
tenant_id = args[0]
res = client.list_networks()
LOG.debug("Operation 'list_networks' executed.")
output = prepare_output("list_nets", tenant_id, res)
print output
def create_net(client, *args):
tenant_id, name = args
data = {'network': {'name': name}}
new_net_id = None
try:
res = client.create_network(data)
new_net_id = res["network"]["id"]
LOG.debug("Operation 'create_network' executed.")
output = prepare_output("create_net", tenant_id,
dict(network_id=new_net_id))
print output
except Exception as ex:
_handle_exception(ex)
def delete_net(client, *args):
tenant_id, network_id = args
try:
client.delete_network(network_id)
LOG.debug("Operation 'delete_network' executed.")
output = prepare_output("delete_net", tenant_id,
dict(network_id=network_id))
print output
except Exception as ex:
_handle_exception(ex)
def show_net(client, *args):
tenant_id, network_id = args
try:
#NOTE(salvatore-orlando) changed for returning exclusively
# output for GET /networks/{net-id} API operation
res = client.show_network_details(network_id)["network"]
LOG.debug("Operation 'show_network_details' executed.")
output = prepare_output("show_net", tenant_id,
dict(network=res))
print output
except Exception as ex:
_handle_exception(ex)
def update_net(client, *args):
tenant_id, network_id, param_data = args
data = {'network': {}}
for kv in param_data.split(","):
k, v = kv.split("=")
data['network'][k] = v
data['network']['id'] = network_id
try:
client.update_network(network_id, data)
LOG.debug("Operation 'update_network' executed.")
# Response has no body. Use data for populating output
output = prepare_output("update_net", tenant_id, data)
print output
except Exception as ex:
_handle_exception(ex)
def list_ports(client, *args):
tenant_id, network_id = args
try:
ports = client.list_ports(network_id)
LOG.debug("Operation 'list_ports' executed.")
data = ports
data['network_id'] = network_id
output = prepare_output("list_ports", tenant_id, data)
print output
except Exception as ex:
_handle_exception(ex)
def create_port(client, *args):
tenant_id, network_id = args
try:
res = client.create_port(network_id)
LOG.debug("Operation 'create_port' executed.")
new_port_id = res["port"]["id"]
output = prepare_output("create_port", tenant_id,
dict(network_id=network_id,
port_id=new_port_id))
print output
except Exception as ex:
_handle_exception(ex)
def delete_port(client, *args):
tenant_id, network_id, port_id = args
try:
client.delete_port(network_id, port_id)
LOG.debug("Operation 'delete_port' executed.")
output = prepare_output("delete_port", tenant_id,
dict(network_id=network_id,
port_id=port_id))
print output
except Exception as ex:
_handle_exception(ex)
return
def show_port(client, *args):
tenant_id, network_id, port_id = args
try:
port = client.show_port_details(network_id, port_id)["port"]
LOG.debug("Operation 'list_port_details' executed.")
#NOTE(salvatore-orland): current API implementation does not
#return attachment with GET operation on port. Once API alignment
#branch is merged, update client to use the detail action.
# (danwent) Until then, just make additonal webservice call.
attach = client.show_port_attachment(network_id, port_id)['attachment']
if "id" in attach:
port['attachment'] = attach['id']
else:
port['attachment'] = '<none>'
output = prepare_output("show_port", tenant_id,
dict(network_id=network_id,
port=port))
print output
except Exception as ex:
_handle_exception(ex)
def update_port(client, *args):
tenant_id, network_id, port_id, param_data = args
data = {'port': {}}
for kv in param_data.split(","):
k, v = kv.split("=")
data['port'][k] = v
data['network_id'] = network_id
data['port']['id'] = port_id
try:
client.update_port(network_id, port_id, data)
LOG.debug("Operation 'udpate_port' executed.")
# Response has no body. Use data for populating output
output = prepare_output("update_port", tenant_id, data)
print output
except Exception as ex:
_handle_exception(ex)
def plug_iface(client, *args):
tenant_id, network_id, port_id, attachment = args
try:
data = {'attachment': {'id': '%s' % attachment}}
client.attach_resource(network_id, port_id, data)
LOG.debug("Operation 'attach_resource' executed.")
output = prepare_output("plug_iface", tenant_id,
dict(network_id=network_id,
port_id=port_id,
attachment=attachment))
print output
except Exception as ex:
_handle_exception(ex)
def unplug_iface(client, *args):
tenant_id, network_id, port_id = args
try:
client.detach_resource(network_id, port_id)
LOG.debug("Operation 'detach_resource' executed.")
output = prepare_output("unplug_iface", tenant_id,
dict(network_id=network_id,
port_id=port_id))
print output
except Exception as ex:
_handle_exception(ex)

View File

@ -1,16 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.

View File

@ -1,327 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routines for configuring Quantum
"""
import ConfigParser
import logging
import logging.config
import logging.handlers
import optparse
import os
import re
import sys
from paste import deploy
from quantum.common import flags
from quantum.common import exceptions as exception
DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
FLAGS = flags.FLAGS
LOG = logging.getLogger('quantum.wsgi')
def parse_options(parser, cli_args=None):
"""
Returns the parsed CLI options, command to run and its arguments, merged
with any same-named options found in a configuration file.
The function returns a tuple of (options, args), where options is a
mapping of option key/str(value) pairs, and args is the set of arguments
(not options) supplied on the command-line.
The reason that the option values are returned as strings only is that
ConfigParser and paste.deploy only accept string values...
:param parser: The option parser
:param cli_args: (Optional) Set of arguments to process. If not present,
sys.argv[1:] is used.
:retval tuple of (options, args)
"""
(options, args) = parser.parse_args(cli_args)
return (vars(options), args)
def add_common_options(parser):
"""
Given a supplied optparse.OptionParser, adds an OptionGroup that
represents all common configuration options.
:param parser: optparse.OptionParser
"""
help_text = "The following configuration options are common to "\
"all quantum programs."
group = optparse.OptionGroup(parser, "Common Options", help_text)
group.add_option('-v', '--verbose', default=False, dest="verbose",
action="store_true",
help="Print more verbose output")
group.add_option('-d', '--debug', default=False, dest="debug",
action="store_true",
help="Print debugging output")
group.add_option('--config-file', default=None, metavar="PATH",
help="Path to the config file to use. When not specified "
"(the default), we generally look at the first "
"argument specified to be a config file, and if "
"that is also missing, we search standard "
"directories for a config file.")
parser.add_option_group(group)
def add_log_options(parser):
"""
Given a supplied optparse.OptionParser, adds an OptionGroup that
represents all the configuration options around logging.
:param parser: optparse.OptionParser
"""
help_text = "The following configuration options are specific to logging "\
"functionality for this program."
group = optparse.OptionGroup(parser, "Logging Options", help_text)
group.add_option('--log-config', default=None, metavar="PATH",
help="If this option is specified, the logging "
"configuration file specified is used and overrides "
"any other logging options specified. Please see "
"the Python logging module documentation for "
"details on logging configuration files.")
group.add_option('--log-date-format', metavar="FORMAT",
default=DEFAULT_LOG_DATE_FORMAT,
help="Format string for %(asctime)s in log records. "
"Default: %default")
group.add_option('--log-file', default=None, metavar="PATH",
help="(Optional) Name of log file to output to. "
"If not set, logging will go to stdout.")
group.add_option("--log-dir", default=None,
help="(Optional) The directory to keep log files in "
"(will be prepended to --logfile)")
parser.add_option_group(group)
def setup_logging(options, conf):
"""
Sets up the logging options for a log with supplied name
:param options: Mapping of typed option key/values
:param conf: Mapping of untyped key/values from config file
"""
if options.get('log_config', None):
# Use a logging configuration file for all settings...
if os.path.exists(options['log_config']):
logging.config.fileConfig(options['log_config'])
return
else:
raise RuntimeError("Unable to locate specified logging "
"config file: %s" % options['log_config'])
# If either the CLI option or the conf value
# is True, we set to True
debug = options.get('debug') or \
get_option(conf, 'debug', type='bool', default=False)
verbose = options.get('verbose') or \
get_option(conf, 'verbose', type='bool', default=False)
root_logger = logging.root
if debug:
root_logger.setLevel(logging.DEBUG)
elif verbose:
root_logger.setLevel(logging.INFO)
else:
root_logger.setLevel(logging.WARNING)
# Set log configuration from options...
# Note that we use a hard-coded log format in the options
# because of Paste.Deploy bug #379
# http://trac.pythonpaste.org/pythonpaste/ticket/379
log_format = options.get('log_format', DEFAULT_LOG_FORMAT)
log_date_format = options.get('log_date_format', DEFAULT_LOG_DATE_FORMAT)
formatter = logging.Formatter(log_format, log_date_format)
logfile = options.get('log_file')
if not logfile:
logfile = conf.get('log_file')
if logfile:
logdir = options.get('log_dir')
if not logdir:
logdir = conf.get('log_dir')
if logdir:
logfile = os.path.join(logdir, logfile)
logfile = logging.FileHandler(logfile)
logfile.setFormatter(formatter)
logfile.setFormatter(formatter)
root_logger.addHandler(logfile)
else:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
def find_config_file(options, args, config_file='quantum.conf'):
"""
Return the first config file found.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for the configuration file in standard directories:
* .
* ~.quantum/
* ~
* $FLAGS.state_path/etc/quantum
* $FLAGS.state_path/etc
:retval Full path to config file, or None if no config file found
"""
fix_path = lambda p: os.path.abspath(os.path.expanduser(p))
if options.get('config_file'):
if os.path.exists(options['config_file']):
return fix_path(options['config_file'])
elif args:
if os.path.exists(args[0]):
return fix_path(args[0])
dir_to_common = os.path.dirname(os.path.abspath(__file__))
root = os.path.join(dir_to_common, '..', '..', '..', '..')
# Handle standard directory search for the config file
config_file_dirs = [fix_path(os.path.join(os.getcwd(), 'etc')),
fix_path(os.path.join('~', '.venv', 'etc',
'quantum')),
fix_path('~'),
os.path.join(FLAGS.state_path, 'etc'),
os.path.join(FLAGS.state_path, 'etc', 'quantum'),
fix_path(os.path.join('~', '.local',
'etc', 'quantum')),
'/usr/etc/quantum',
'/usr/local/etc/quantum',
'/etc/quantum/',
'/etc']
if 'plugin' in options:
config_file_dirs = [os.path.join(x, 'quantum', 'plugins',
options['plugin'])
for x in config_file_dirs]
if os.path.exists(os.path.join(root, 'plugins')):
plugins = [fix_path(os.path.join(root, 'plugins', p, 'etc'))
for p in os.listdir(os.path.join(root, 'plugins'))]
plugins = [p for p in plugins if os.path.isdir(p)]
config_file_dirs.extend(plugins)
for cfg_dir in config_file_dirs:
cfg_file = os.path.join(cfg_dir, config_file)
if os.path.exists(cfg_file):
return cfg_file
def load_paste_config(app_name, options, args):
"""
Looks for a config file to use for an app and returns the
config file path and a configuration mapping from a paste config file.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for quantum.conf in standard directories:
* .
* ~.quantum/
* ~
* /etc/quantum
* /etc
:param app_name: Name of the application to load config for, or None.
None signifies to only load the [DEFAULT] section of
the config file.
:param options: Set of typed options returned from parse_options()
:param args: Command line arguments from argv[1:]
:retval Tuple of (conf_file, conf)
:raises RuntimeError when config file cannot be located or there was a
problem loading the configuration file.
"""
conf_file = find_config_file(options, args)
if not conf_file:
raise RuntimeError("Unable to locate any configuration file. "
"Cannot load application %s" % app_name)
try:
conf = deploy.appconfig("config:%s" % conf_file, name=app_name)
return conf_file, conf
except Exception, e:
raise RuntimeError("Error trying to load config %s: %s"
% (conf_file, e))
def load_paste_app(app_name, options, args):
"""
Builds and returns a WSGI app from a paste config file.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for quantum.conf in standard directories:
* .
* ~.quantum/
* ~
* /etc/quantum
* /etc
:param app_name: Name of the application to load
:param options: Set of typed options returned from parse_options()
:param args: Command line arguments from argv[1:]
:raises RuntimeError when config file cannot be located or application
cannot be loaded from config file
"""
conf_file, conf = load_paste_config(app_name, options, args)
try:
app = deploy.loadapp("config:%s" % conf_file, name=app_name)
except (LookupError, ImportError), e:
raise RuntimeError("Unable to load %(app_name)s from "
"configuration file %(conf_file)s."
"\nGot: %(e)r" % locals())
return conf, app
def get_option(options, option, **kwargs):
if option in options:
value = options[option]
type_ = kwargs.get('type', 'str')
if type_ == 'bool':
if hasattr(value, 'lower'):
return value.lower() == 'true'
else:
return value
elif type_ == 'int':
return int(value)
elif type_ == 'float':
return float(value)
else:
return value
elif 'default' in kwargs:
return kwargs['default']
else:
raise KeyError("option '%s' not found" % option)

View File

@ -1,178 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Quantum base exception handling, including decorator for re-raising
Quantum-type exceptions. SHOULD include dedicated exception logging.
"""
import logging
import gettext
gettext.install('quantum', unicode=1)
class QuantumException(Exception):
"""Base Quantum Exception
Taken from nova.exception.NovaException
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = _("An unknown exception occurred.")
def __init__(self, **kwargs):
try:
self._error_string = self.message % kwargs
except Exception:
# at least get the core message out if something happened
self._error_string = self.message
def __str__(self):
return self._error_string
class ProcessExecutionError(IOError):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
description, cmd, exit_code, stdout, stderr)
IOError.__init__(self, message)
class Error(Exception):
def __init__(self, message=None):
super(Error, self).__init__(message)
class ApiError(Error):
def __init__(self, message='Unknown', code='Unknown'):
self.message = message
self.code = code
super(ApiError, self).__init__('%s: %s' % (code, message))
class NotFound(QuantumException):
pass
class ClassNotFound(NotFound):
message = _("Class %(class_name)s could not be found")
class NetworkNotFound(NotFound):
message = _("Network %(net_id)s could not be found")
class PortNotFound(NotFound):
message = _("Port %(port_id)s could not be found " \
"on network %(net_id)s")
class StateInvalid(QuantumException):
message = _("Unsupported port state: %(port_state)s")
class NetworkInUse(QuantumException):
message = _("Unable to complete operation on network %(net_id)s. " \
"There is one or more attachments plugged into its ports.")
class PortInUse(QuantumException):
message = _("Unable to complete operation on port %(port_id)s " \
"for network %(net_id)s. The attachment '%(att_id)s" \
"is plugged into the logical port.")
class AlreadyAttached(QuantumException):
message = _("Unable to plug the attachment %(att_id)s into port " \
"%(port_id)s for network %(net_id)s. The attachment is " \
"already plugged into port %(att_port_id)s")
# NOTE: on the client side, we often do not know all of the information
# that is known on the server, thus, we create separate exception for
# those scenarios
class PortInUseClient(QuantumException):
message = _("Unable to complete operation on port %(port_id)s " \
"for network %(net_id)s. An attachment " \
"is plugged into the logical port.")
class AlreadyAttachedClient(QuantumException):
message = _("Unable to plug the attachment %(att_id)s into port " \
"%(port_id)s for network %(net_id)s. The attachment is " \
"already plugged into another port.")
class MalformedRequestBody(QuantumException):
message = _("Malformed request body: %(reason)s")
class Duplicate(Error):
pass
class NotAuthorized(Error):
pass
class NotEmpty(Error):
pass
class Invalid(Error):
pass
class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class BadInputError(Exception):
"""Error resulting from a client sending bad input to a server"""
pass
class MissingArgumentError(Error):
pass
class NotImplementedError(Error):
pass
def wrap_exception(f):
def _wrap(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception('Uncaught exception')
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise
_wrap.func_name = f.func_name
return _wrap

View File

@ -1,249 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Command-line flag library.
Wraps gflags.
Global flags should be defined here, the rest are defined where they're used.
"""
import getopt
import gflags
import os
import string
import sys
class FlagValues(gflags.FlagValues):
"""Extension of gflags.FlagValues that allows undefined and runtime flags.
Unknown flags will be ignored when parsing the command line, but the
command line will be kept so that it can be replayed if new flags are
defined after the initial parsing.
"""
def __init__(self, extra_context=None):
gflags.FlagValues.__init__(self)
self.__dict__['__dirty'] = []
self.__dict__['__was_already_parsed'] = False
self.__dict__['__stored_argv'] = []
self.__dict__['__extra_context'] = extra_context
def __call__(self, argv):
# We're doing some hacky stuff here so that we don't have to copy
# out all the code of the original verbatim and then tweak a few lines.
# We're hijacking the output of getopt so we can still return the
# leftover args at the end
sneaky_unparsed_args = {"value": None}
original_argv = list(argv)
if self.IsGnuGetOpt():
orig_getopt = getattr(getopt, 'gnu_getopt')
orig_name = 'gnu_getopt'
else:
orig_getopt = getattr(getopt, 'getopt')
orig_name = 'getopt'
def _sneaky(*args, **kw):
optlist, unparsed_args = orig_getopt(*args, **kw)
sneaky_unparsed_args['value'] = unparsed_args
return optlist, unparsed_args
try:
setattr(getopt, orig_name, _sneaky)
args = gflags.FlagValues.__call__(self, argv)
except gflags.UnrecognizedFlagError:
# Undefined args were found, for now we don't care so just
# act like everything went well
# (these three lines are copied pretty much verbatim from the end
# of the __call__ function we are wrapping)
unparsed_args = sneaky_unparsed_args['value']
if unparsed_args:
if self.IsGnuGetOpt():
args = argv[:1] + unparsed_args
else:
args = argv[:1] + original_argv[-len(unparsed_args):]
else:
args = argv[:1]
finally:
setattr(getopt, orig_name, orig_getopt)
# Store the arguments for later, we'll need them for new flags
# added at runtime
self.__dict__['__stored_argv'] = original_argv
self.__dict__['__was_already_parsed'] = True
self.ClearDirty()
return args
def Reset(self):
gflags.FlagValues.Reset(self)
self.__dict__['__dirty'] = []
self.__dict__['__was_already_parsed'] = False
self.__dict__['__stored_argv'] = []
def SetDirty(self, name):
"""Mark a flag as dirty so that accessing it will case a reparse."""
self.__dict__['__dirty'].append(name)
def IsDirty(self, name):
return name in self.__dict__['__dirty']
def ClearDirty(self):
self.__dict__['__is_dirty'] = []
def WasAlreadyParsed(self):
return self.__dict__['__was_already_parsed']
def ParseNewFlags(self):
if '__stored_argv' not in self.__dict__:
return
new_flags = FlagValues(self)
for k in self.__dict__['__dirty']:
new_flags[k] = gflags.FlagValues.__getitem__(self, k)
new_flags(self.__dict__['__stored_argv'])
for k in self.__dict__['__dirty']:
setattr(self, k, getattr(new_flags, k))
self.ClearDirty()
def __setitem__(self, name, flag):
gflags.FlagValues.__setitem__(self, name, flag)
if self.WasAlreadyParsed():
self.SetDirty(name)
def __getitem__(self, name):
if self.IsDirty(name):
self.ParseNewFlags()
return gflags.FlagValues.__getitem__(self, name)
def __getattr__(self, name):
if self.IsDirty(name):
self.ParseNewFlags()
val = gflags.FlagValues.__getattr__(self, name)
if isinstance(val, str):
tmpl = string.Template(val)
context = [self, self.__dict__['__extra_context']]
return tmpl.substitute(StrWrapper(context))
return val
class StrWrapper(object):
"""Wrapper around FlagValues objects.
Wraps FlagValues objects for string.Template so that we're
sure to return strings.
"""
def __init__(self, context_objs):
self.context_objs = context_objs
def __getitem__(self, name):
for context in self.context_objs:
val = getattr(context, name, False)
if val:
return str(val)
raise KeyError(name)
# Copied from gflags with small mods to get the naming correct.
# Originally gflags checks for the first module that is not gflags that is
# in the call chain, we want to check for the first module that is not gflags
# and not this module.
def _GetCallingModule():
"""Returns the name of the module that's calling into this module.
We generally use this function to get the name of the module calling a
DEFINE_foo... function.
"""
# Walk down the stack to find the first globals dict that's not ours.
for depth in range(1, sys.getrecursionlimit()):
if not sys._getframe(depth).f_globals is globals():
module_name = __GetModuleName(sys._getframe(depth).f_globals)
if module_name == 'gflags':
continue
if module_name is not None:
return module_name
raise AssertionError("No module was found")
# Copied from gflags because it is a private function
def __GetModuleName(globals_dict):
"""Given a globals dict, returns the name of the module that defines it.
Args:
globals_dict: A dictionary that should correspond to an environment
providing the values of the globals.
Returns:
A string (the name of the module) or None (if the module could not
be identified.
"""
for name, module in sys.modules.iteritems():
if getattr(module, '__dict__', None) is globals_dict:
if name == '__main__':
return sys.argv[0]
return name
return None
def _wrapper(func):
def _wrapped(*args, **kw):
kw.setdefault('flag_values', FLAGS)
func(*args, **kw)
_wrapped.func_name = func.func_name
return _wrapped
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags._GetCallingModule = _GetCallingModule
DEFINE = _wrapper(gflags.DEFINE)
DEFINE_string = _wrapper(gflags.DEFINE_string)
DEFINE_integer = _wrapper(gflags.DEFINE_integer)
DEFINE_bool = _wrapper(gflags.DEFINE_bool)
DEFINE_boolean = _wrapper(gflags.DEFINE_boolean)
DEFINE_float = _wrapper(gflags.DEFINE_float)
DEFINE_enum = _wrapper(gflags.DEFINE_enum)
DEFINE_list = _wrapper(gflags.DEFINE_list)
DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
DEFINE_flag = _wrapper(gflags.DEFINE_flag)
HelpFlag = gflags.HelpFlag
HelpshortFlag = gflags.HelpshortFlag
HelpXMLFlag = gflags.HelpXMLFlag
def DECLARE(name, module_string, flag_values=FLAGS):
if module_string not in sys.modules:
__import__(module_string, globals(), locals())
if name not in flag_values:
raise gflags.UnrecognizedFlag(
"%s not defined by %s" % (name, module_string))
# __GLOBAL FLAGS ONLY__
# Define any app-specific flags in their own files, docs at:
# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#a9
DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../../'),
"Top-level directory for maintaining quantum's state")

View File

@ -1,153 +0,0 @@
from xml.dom import minidom
import webob.exc
from quantum.common import utils
class Serializer(object):
"""Serializes and deserializes dictionaries to certain MIME types."""
def __init__(self, metadata=None, default_xmlns=None):
"""Create a serializer based on the given WSGI environment.
'metadata' is an optional dict mapping MIME types to information
needed to serialize a dictionary to that type.
"""
self.metadata = metadata or {}
self.default_xmlns = default_xmlns
def _get_serialize_handler(self, content_type):
handlers = {
'application/json': self._to_json,
'application/xml': self._to_xml,
}
try:
return handlers[content_type]
except Exception:
raise exception.InvalidContentType(content_type=content_type)
def serialize(self, data, content_type):
"""Serialize a dictionary into the specified content type."""
return self._get_serialize_handler(content_type)(data)
def deserialize(self, datastring, content_type):
"""Deserialize a string to a dictionary.
The string must be in the format of a supported MIME type.
"""
try:
return self.get_deserialize_handler(content_type)(datastring)
except Exception:
raise webob.exc.HTTPBadRequest("Could not deserialize data")
def get_deserialize_handler(self, content_type):
handlers = {
'application/json': self._from_json,
'application/xml': self._from_xml,
}
try:
return handlers[content_type]
except Exception:
raise exception.InvalidContentType(content_type=content_type)
def _from_json(self, datastring):
return utils.loads(datastring)
def _from_xml(self, datastring):
xmldata = self.metadata.get('application/xml', {})
plurals = set(xmldata.get('plurals', {}))
node = minidom.parseString(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
listnames is a collection of names of XML nodes whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames)
for n in node.childNodes if n.nodeType != node.TEXT_NODE]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child,
listnames)
return result
def _to_json(self, data):
return utils.dumps(data)
def _to_xml(self, data):
metadata = self.metadata.get('application/xml', {})
# We expect data to contain a single key which is the XML root.
root_key = data.keys()[0]
doc = minidom.Document()
node = self._to_xml_node(doc, metadata, root_key, data[root_key])
xmlns = node.getAttribute('xmlns')
if not xmlns and self.default_xmlns:
node.setAttribute('xmlns', self.default_xmlns)
return node.toprettyxml(indent='', newl='')
def _to_xml_node(self, doc, metadata, nodename, data):
"""Recursive method to convert data members to XML nodes."""
result = doc.createElement(nodename)
# Set the xml namespace if one is specified
# TODO(justinsb): We could also use prefixes on the keys
xmlns = metadata.get('xmlns', None)
if xmlns:
result.setAttribute('xmlns', xmlns)
if isinstance(data, list):
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
for item in data:
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(item))
result.appendChild(node)
return result
singular = metadata.get('plurals', {}).get(nodename, None)
if singular is None:
if nodename.endswith('s'):
singular = nodename[:-1]
else:
singular = 'item'
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
elif isinstance(data, dict):
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]
for k, v in data.items():
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(k))
text = doc.createTextNode(str(v))
node.appendChild(text)
result.appendChild(node)
return result
attrs = metadata.get('attributes', {}).get(nodename, {})
for k, v in data.items():
if k in attrs:
result.setAttribute(k, str(v))
else:
node = self._to_xml_node(doc, metadata, k, v)
result.appendChild(node)
else:
# Type is atom.
node = doc.createTextNode(str(data))
result.appendChild(node)
return result

View File

@ -1,291 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import gettext
import os
import unittest
import sys
import logging
from nose import result
from nose import core
from nose import config
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
from win32console import GetStdHandle, STD_OUT_HANDLE, \
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
class QuantumTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
result.TextTestResult.__init__(self, *args, **kw)
self._last_case = None
self.colorizer = None
# NOTE(vish, tfukushima): reset stdout for the terminal check
stdout = sys.__stdout__
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
def getDescription(self, test):
return str(test)
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
if self.showAll:
self.colorizer.write("OK", 'green')
self.stream.writeln()
elif self.dots:
self.stream.write('.')
self.stream.flush()
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
if self.showAll:
self.colorizer.write("FAIL", 'red')
self.stream.writeln()
elif self.dots:
self.stream.write('F')
self.stream.flush()
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for errorClasses.
If the exception is a registered class, the error will be added
to the list for that class, not errors.
"""
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# This is for compatibility with Python 2.3.
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passwd = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_details(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
if self.showAll:
self.colorizer.write("ERROR", 'red')
self.stream.writeln()
elif self.dots:
stream.write('E')
def startTest(self, test):
unittest.TestResult.startTest(self, test)
current_case = test.test.__class__.__name__
if self.showAll:
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
#NOTE(salvatore-orlando):
#slightly changed in order to print test case class
#together with unit test name
self.stream.write(
' %s' % str(test.test).ljust(60))
self.stream.flush()
class QuantumTestRunner(core.TextTestRunner):
def _makeResult(self):
return QuantumTestResult(self.stream,
self.descriptions,
self.verbosity,
self.config)
def run_tests(c=None):
logger = logging.getLogger()
hdlr = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
# NOTE(bgh): I'm not entirely sure why but nose gets confused here when
# calling run_tests from a plugin directory run_tests.py (instead of the
# main run_tests.py). It will call run_tests with no arguments and the
# testing of run_tests will fail (though the plugin tests will pass). For
# now we just return True to let the run_tests test pass.
if not c:
return True
runner = QuantumTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c)
return not core.run(config=c, testRunner=runner)
# describes parameters used by different unit/functional tests
# a plugin-specific testing mechanism should import this dictionary
# and override the values in it if needed (e.g., run_tests.py in
# quantum/plugins/openvswitch/ )
test_config = {
"plugin_name": "quantum.plugins.sample.SamplePlugin.FakePlugin",
"default_net_op_status": "UP",
"default_port_op_status": "UP",
}

View File

@ -1,267 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, Nicira Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Borrowed from nova code base, more utilities will be added/borrowed as and
# when needed.
# @author: Somik Behera, Nicira Networks, Inc.
"""Utilities and helper functions."""
import ConfigParser
import datetime
import exceptions as exception
import inspect
import logging
import os
import random
import subprocess
import socket
import sys
import base64
import functools
import json
import re
import string
import struct
import time
from quantum.common import flags
from quantum.common import exceptions as exception
from quantum.common.exceptions import ProcessExecutionError
def import_class(import_str):
"""Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc:
print(('Inner Exception: %s'), exc)
raise exception.ClassNotFound(class_name=class_str)
def import_object(import_str):
"""Returns an object including a module or module and class."""
try:
__import__(import_str)
return sys.modules[import_str]
except ImportError:
cls = import_class(import_str)
return cls()
def to_primitive(value):
if isinstance(value, (list, tuple)):
o = []
for v in value:
o.append(to_primitive(v))
return o
elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v)
return o
elif isinstance(value, datetime.datetime):
return str(value)
elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()))
elif hasattr(value, '__iter__'):
return to_primitive(list(value))
else:
return value
def dumps(value):
try:
return json.dumps(value)
except TypeError:
pass
return json.dumps(to_primitive(value))
def loads(s):
return json.loads(s)
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
FLAGS = flags.FLAGS
def int_from_bool_as_string(subject):
"""
Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject):
"""
Interpret a string as a boolean.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
if isinstance(subject, bool):
return subject
if hasattr(subject, 'startswith'): # str or unicode...
if subject.strip().lower() in ('true', 'on', '1'):
return True
return False
def fetchfile(url, target):
logging.debug("Fetching %s" % url)
# c = pycurl.Curl()
# fp = open(target, "wb")
# c.setopt(c.URL, url)
# c.setopt(c.WRITEDATA, fp)
# c.perform()
# c.close()
# fp.close()
execute("curl --fail %s -o %s" % (url, target))
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
logging.debug("Running cmd: %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close()
if obj.returncode:
logging.debug("Result was %s" % (obj.returncode))
if check_exit_code and obj.returncode != 0:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=obj.returncode,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return result
def abspath(s):
return os.path.join(os.path.dirname(__file__), s)
# TODO(sirp): when/if utils is extracted to common library, we should remove
# the argument's default.
#def default_flagfile(filename='nova.conf'):
def default_flagfile(filename='quantum.conf'):
for arg in sys.argv:
if arg.find('flagfile') != -1:
break
else:
if not os.path.isabs(filename):
# turn relative filename into an absolute path
script_dir = os.path.dirname(inspect.stack()[-1][1])
filename = os.path.abspath(os.path.join(script_dir, filename))
if os.path.exists(filename):
sys.argv = \
sys.argv[:1] + ['--flagfile=%s' % filename] + sys.argv[1:]
def debug(arg):
logging.debug('debug in callback: %s', arg)
return arg
def runthis(prompt, cmd, check_exit_code=True):
logging.debug("Running %s" % (cmd))
exit_code = subprocess.call(cmd.split(" "))
logging.debug(prompt % (exit_code))
if check_exit_code and exit_code != 0:
raise ProcessExecutionError(exit_code=exit_code,
stdout=None,
stderr=None,
cmd=cmd)
def generate_uid(topic, size=8):
return '%s-%s' % (topic, ''.join(
[random.choice('01234567890abcdefghijklmnopqrstuvwxyz')
for x in xrange(size)]))
def generate_mac():
mac = [0x02, 0x16, 0x3e, random.randint(0x00, 0x7f),
random.randint(0x00, 0xff), random.randint(0x00, 0xff)]
return ':'.join(map(lambda x: "%02x" % x, mac))
def last_octet(address):
return int(address.split(".")[-1])
def isotime(at=None):
if not at:
at = datetime.datetime.utcnow()
return at.strftime(TIME_FORMAT)
def parse_isotime(timestr):
return datetime.datetime.strptime(timestr, TIME_FORMAT)
def get_plugin_from_config(file="config.ini"):
Config = ConfigParser.ConfigParser()
Config.read(file)
return Config.get("PLUGIN", "provider")
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
def __get_backend(self):
if not self.__backend:
backend_name = self.__pivot.value
if backend_name not in self.__backends:
raise exception.Error('Invalid backend: %s' % backend_name)
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
logging.info('backend %s', self.__backend)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)

View File

@ -24,7 +24,7 @@ from webob import exc
from quantum.extensions import _credential_view as credential_view from quantum.extensions import _credential_view as credential_view
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.common import extensions from quantum.extensions import extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_exceptions as exception from quantum.plugins.cisco.common import cisco_exceptions as exception
from quantum.plugins.cisco.common import cisco_faults as faults from quantum.plugins.cisco.common import cisco_faults as faults

View File

@ -30,7 +30,7 @@ import quantum.extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum import wsgi from quantum import wsgi
LOG = logging.getLogger('quantum.common.extensions') LOG = logging.getLogger('quantum.extensions.extensions')
class PluginInterface(object): class PluginInterface(object):

View File

@ -24,7 +24,7 @@ from webob import exc
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.api.views import ports as port_view from quantum.api.views import ports as port_view
from quantum.common import extensions from quantum.extensions import extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_exceptions as exception from quantum.plugins.cisco.common import cisco_exceptions as exception
from quantum.plugins.cisco.common import cisco_faults as faults from quantum.plugins.cisco.common import cisco_faults as faults

View File

@ -23,7 +23,7 @@ from webob import exc
from quantum.extensions import _novatenant_view as novatenant_view from quantum.extensions import _novatenant_view as novatenant_view
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.common import exceptions as qexception from quantum.common import exceptions as qexception
from quantum.common import extensions from quantum.extensions import extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_faults as faults from quantum.plugins.cisco.common import cisco_faults as faults

View File

@ -24,7 +24,7 @@ from webob import exc
from quantum.extensions import _pprofiles as pprofiles_view from quantum.extensions import _pprofiles as pprofiles_view
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.common import exceptions as qexception from quantum.common import exceptions as qexception
from quantum.common import extensions from quantum.extensions import extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_exceptions as exception from quantum.plugins.cisco.common import cisco_exceptions as exception
from quantum.plugins.cisco.common import cisco_faults as faults from quantum.plugins.cisco.common import cisco_faults as faults

View File

@ -23,7 +23,7 @@ import logging
from webob import exc from webob import exc
from quantum.extensions import _qos_view as qos_view from quantum.extensions import _qos_view as qos_view
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.common import extensions from quantum.extensions import extensions
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_exceptions as exception from quantum.plugins.cisco.common import cisco_exceptions as exception
from quantum.plugins.cisco.common import cisco_faults as faults from quantum.plugins.cisco.common import cisco_faults as faults

View File

@ -32,12 +32,12 @@ from quantum.extensions import multiport
from quantum.plugins.cisco.db import api as db from quantum.plugins.cisco.db import api as db
from quantum import wsgi from quantum import wsgi
from quantum.common import config from quantum.common import config
from quantum.common import extensions from quantum.extensions import extensions
from quantum import api as server from quantum import api as server
from quantum.plugins.cisco.l2network_plugin import L2Network from quantum.plugins.cisco.l2network_plugin import L2Network
from quantum.tests.unit.extension_stubs import StubBaseAppController from quantum.tests.unit.extension_stubs import StubBaseAppController
from quantum.common.extensions import (PluginAwareExtensionManager, from quantum.extensions.extensions import (PluginAwareExtensionManager,
ExtensionMiddleware) ExtensionMiddleware)
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
from quantum.plugins.cisco import l2network_plugin from quantum.plugins.cisco import l2network_plugin

View File

@ -15,7 +15,7 @@
# under the License. # under the License.
from abc import abstractmethod from abc import abstractmethod
from quantum.common import extensions from quantum.extensions import extensions
from quantum import wsgi from quantum import wsgi

View File

@ -18,7 +18,7 @@
import json import json
from quantum import wsgi from quantum import wsgi
from quantum.common import extensions from quantum.extensions import extensions
from abc import abstractmethod from abc import abstractmethod

View File

@ -1,422 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 ????
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Salvatore Orlando, Citrix Systems
""" Module containing unit tests for Quantum
command line interface
"""
import logging
import sys
import unittest
from quantum import api as server
from quantum.client import cli_lib as cli
from quantum.client import Client
from quantum.db import api as db
from quantum.tests.unit.client_tools import stubs as client_stubs
LOG = logging.getLogger('quantum.tests.test_cli')
FORMAT = 'json'
class CLITest(unittest.TestCase):
def setUp(self):
"""Prepare the test environment"""
options = {}
options['plugin_provider'] = \
'quantum.plugins.sample.SamplePlugin.FakePlugin'
#TODO: make the version of the API router configurable
self.api = server.APIRouterV11(options)
self.tenant_id = "test_tenant"
self.network_name_1 = "test_network_1"
self.network_name_2 = "test_network_2"
# Prepare client and plugin manager
self.client = Client(tenant=self.tenant_id, format=FORMAT,
testingStub=client_stubs.FakeHTTPConnection)
# Redirect stdout
self.fake_stdout = client_stubs.FakeStdout()
sys.stdout = self.fake_stdout
def tearDown(self):
"""Clear the test environment"""
db.clear_db()
sys.stdout = sys.__stdout__
def _verify_list_networks(self):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
networks = [dict(id=nw.uuid, name=nw.name) for nw in nw_list]
# Fill CLI template
output = cli.prepare_output('list_nets', self.tenant_id,
dict(networks=networks))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_create_network(self):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
if len(nw_list) != 1:
self.fail("No network created")
network_id = nw_list[0].uuid
# Fill CLI template
output = cli.prepare_output('create_net', self.tenant_id,
dict(network_id=network_id))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_delete_network(self, network_id):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
if len(nw_list) != 0:
self.fail("DB should not contain any network")
# Fill CLI template
output = cli.prepare_output('delete_net', self.tenant_id,
dict(network_id=network_id))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_update_network(self):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
network_data = {'id': nw_list[0].uuid,
'name': nw_list[0].name}
# Fill CLI template
output = cli.prepare_output('update_net', self.tenant_id,
dict(network=network_data))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_show_network(self):
# Verification - get raw result from db
nw = db.network_list(self.tenant_id)[0]
network = dict(id=nw.uuid, name=nw.name)
# Fill CLI template
output = cli.prepare_output('show_net', self.tenant_id,
dict(network=network))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_list_ports(self, network_id):
# Verification - get raw result from db
port_list = db.port_list(network_id)
ports = [dict(id=port.uuid, state=port.state)
for port in port_list]
# Fill CLI template
output = cli.prepare_output('list_ports', self.tenant_id,
dict(network_id=network_id,
ports=ports))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_create_port(self, network_id):
# Verification - get raw result from db
port_list = db.port_list(network_id)
if len(port_list) != 1:
self.fail("No port created")
port_id = port_list[0].uuid
# Fill CLI template
output = cli.prepare_output('create_port', self.tenant_id,
dict(network_id=network_id,
port_id=port_id))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_delete_port(self, network_id, port_id):
# Verification - get raw result from db
port_list = db.port_list(network_id)
if len(port_list) != 0:
self.fail("DB should not contain any port")
# Fill CLI template
output = cli.prepare_output('delete_port', self.tenant_id,
dict(network_id=network_id,
port_id=port_id))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_update_port(self, network_id, port_id):
# Verification - get raw result from db
port = db.port_get(port_id, network_id)
port_data = {'id': port.uuid, 'state': port.state}
# Fill CLI template
output = cli.prepare_output('update_port', self.tenant_id,
dict(network_id=network_id,
port=port_data))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_show_port(self, network_id, port_id):
# Verification - get raw result from db
# TODO(salvatore-orlando): Must resolve this issue with
# attachment in separate bug fix.
port = db.port_get(port_id, network_id)
port_data = {'id': port.uuid, 'state': port.state,
'attachment': "<none>"}
if port.interface_id is not None:
port_data['attachment'] = port.interface_id
# Fill CLI template
output = cli.prepare_output('show_port', self.tenant_id,
dict(network_id=network_id,
port=port_data))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_plug_iface(self, network_id, port_id):
# Verification - get raw result from db
port = db.port_get(port_id, network_id)
# Fill CLI template
output = cli.prepare_output("plug_iface", self.tenant_id,
dict(network_id=network_id,
port_id=port['uuid'],
attachment=port['interface_id']))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_unplug_iface(self, network_id, port_id):
# Verification - get raw result from db
port = db.port_get(port_id, network_id)
# Fill CLI template
output = cli.prepare_output("unplug_iface", self.tenant_id,
dict(network_id=network_id,
port_id=port['uuid']))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def test_list_networks(self):
try:
# Pre-populate data for testing using db api
db.network_create(self.tenant_id, self.network_name_1)
db.network_create(self.tenant_id, self.network_name_2)
cli.list_nets(self.client, self.tenant_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_list_networks failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_list_networks()
def test_create_network(self):
try:
cli.create_net(self.client, self.tenant_id, "test")
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_create_network failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_create_network()
def test_delete_network(self):
try:
db.network_create(self.tenant_id, self.network_name_1)
network_id = db.network_list(self.tenant_id)[0]['uuid']
cli.delete_net(self.client, self.tenant_id, network_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_delete_network failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_delete_network(network_id)
def test_show_network(self):
try:
# Load some data into the datbase
net = db.network_create(self.tenant_id, self.network_name_1)
cli.show_net(self.client, self.tenant_id, net['uuid'])
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_detail_network failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_show_network()
def test_update_network(self):
try:
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
cli.update_net(self.client, self.tenant_id,
network_id, 'name=%s' % self.network_name_2)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_update_network failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_update_network()
def test_list_ports(self):
try:
# Pre-populate data for testing using db api
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
db.port_create(network_id)
db.port_create(network_id)
cli.list_ports(self.client, self.tenant_id, network_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_list_ports failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_list_ports(network_id)
def test_create_port(self):
network_id = None
try:
# Pre-populate data for testing using db api
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
cli.create_port(self.client, self.tenant_id, network_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_create_port failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_create_port(network_id)
def test_delete_port(self):
network_id = None
port_id = None
try:
# Pre-populate data for testing using db api
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(network_id)
port_id = port['uuid']
cli.delete_port(self.client, self.tenant_id, network_id, port_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_delete_port failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_delete_port(network_id, port_id)
def test_update_port(self):
try:
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(network_id)
port_id = port['uuid']
# Default state is DOWN - change to ACTIVE.
cli.update_port(self.client, self.tenant_id, network_id,
port_id, 'state=ACTIVE')
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_update_port failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_update_port(network_id, port_id)
def test_show_port_no_attach(self):
network_id = None
port_id = None
try:
# Pre-populate data for testing using db api
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(network_id)
port_id = port['uuid']
cli.show_port(self.client, self.tenant_id, network_id, port_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_show_port_no_attach failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_show_port(network_id, port_id)
def test_show_port_with_attach(self):
network_id = None
port_id = None
iface_id = "flavor crystals"
try:
# Pre-populate data for testing using db api
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(network_id)
port_id = port['uuid']
db.port_set_attachment(port_id, network_id, iface_id)
cli.show_port(self.client, self.tenant_id, network_id, port_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_show_port_with_attach failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_show_port(network_id, port_id)
def test_plug_iface(self):
network_id = None
port_id = None
try:
# Load some data into the datbase
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(net['uuid'])
port_id = port['uuid']
cli.plug_iface(self.client, self.tenant_id, network_id,
port_id, "test_iface_id")
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_plug_iface failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_plug_iface(network_id, port_id)
def test_unplug_iface(self):
network_id = None
port_id = None
try:
# Load some data into the datbase
net = db.network_create(self.tenant_id, self.network_name_1)
network_id = net['uuid']
port = db.port_create(net['uuid'])
port_id = port['uuid']
db.port_set_attachment(port_id, network_id, "test_iface_id")
cli.unplug_iface(self.client, self.tenant_id, network_id, port_id)
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_plug_iface failed due to an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_unplug_iface(network_id, port_id)

View File

@ -1,625 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Tyler Smith, Cisco Systems
import logging
import unittest
import re
from quantum.common.serializer import Serializer
from quantum.client import Client
LOG = logging.getLogger('quantum.tests.test_api')
# Set a couple tenants to use for testing
TENANT_1 = 'totore'
TENANT_2 = 'totore2'
class ServerStub():
"""This class stubs a basic server for the API client to talk to"""
class Response(object):
"""This class stubs a basic response to send the API client"""
def __init__(self, content=None, status=None):
self.content = content
self.status = status
def read(self):
return self.content
def status(self):
return self.status
# To test error codes, set the host to 10.0.0.1, and the port to the code
def __init__(self, host, port=9696, key_file="", cert_file=""):
self.host = host
self.port = port
self.key_file = key_file
self.cert_file = cert_file
def request(self, method, action, body, headers):
self.method = method
self.action = action
self.body = body
def status(self, status=None):
return status or 200
def getresponse(self):
res = self.Response(status=self.status())
# If the host is 10.0.0.1, return the port as an error code
if self.host == "10.0.0.1":
res.status = self.port
return res
# Extract important information from the action string to assure sanity
match = re.search('tenants/(.+?)/(.+)\.(json|xml)$', self.action)
tenant = match.group(1)
path = match.group(2)
format = match.group(3)
data = {'data': {'method': self.method, 'action': self.action,
'body': self.body, 'tenant': tenant, 'path': path,
'format': format, 'key_file': self.key_file,
'cert_file': self.cert_file}}
# Serialize it to the proper format so the API client can handle it
if data['data']['format'] == 'json':
res.content = Serializer().serialize(data, "application/json")
else:
res.content = Serializer().serialize(data, "application/xml")
return res
class APITest(unittest.TestCase):
def setUp(self):
""" Setups a test environment for the API client """
HOST = '127.0.0.1'
PORT = 9696
USE_SSL = False
self.client = Client(HOST, PORT, USE_SSL, TENANT_1, 'json', ServerStub)
def _assert_sanity(self, call, status, method, path, data=[], params={}):
""" Perform common assertions to test the sanity of client requests """
# Handle an error case first
if status != 200:
(self.client.host, self.client.port) = ("10.0.0.1", status)
self.assertRaises(Exception, call, *data, **params)
return
# Make the call, then get the data from the root node and assert it
data = call(*data, **params)['data']
self.assertEqual(data['method'], method)
self.assertEqual(data['format'], params['format'])
self.assertEqual(data['tenant'], params['tenant'])
self.assertEqual(data['path'], path)
return data
def _test_list_networks(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_networks - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_networks,
status,
"GET",
"networks",
data=[],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_networks - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_show_network_details(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_show_network_details - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.show_network_details,
status,
"GET",
"networks/001",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_show_network_details - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_create_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_create_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.create_network,
status,
"POST",
"networks",
data=[{'network': {'net-name': 'testNetwork'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_create_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_update_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_update_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.update_network,
status,
"PUT",
"networks/001",
data=["001",
{'network': {'net-name': 'newName'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_update_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_delete_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_delete_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.delete_network,
status,
"DELETE",
"networks/001",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_delete_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_list_ports(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_ports - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_ports,
status,
"GET",
"networks/001/ports",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_ports - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_show_port_details(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_show_port_details - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.show_port_details,
status,
"GET",
"networks/001/ports/001",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_show_port_details - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_create_port(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_create_port - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.create_port,
status,
"POST",
"networks/001/ports",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_create_port - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_delete_port(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_delete_port - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.delete_port,
status,
"DELETE",
"networks/001/ports/001",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_delete_port - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_update_port(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_update_port - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.update_port,
status,
"PUT",
"networks/001/ports/001",
data=["001", "001",
{'port': {'state': 'ACTIVE'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_update_port - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_show_port_attachment(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_show_port_attachment - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.show_port_attachment,
status,
"GET",
"networks/001/ports/001/attachment",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_show_port_attachment - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_attach_resource(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_attach_resource - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.attach_resource,
status,
"PUT",
"networks/001/ports/001/attachment",
data=["001", "001",
{'resource': {'id': '1234'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_attach_resource - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_detach_resource(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_detach_resource - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.detach_resource,
status,
"DELETE",
"networks/001/ports/001/attachment",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_detach_resource - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_ssl_certificates(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_ssl_certificates - tenant:%s "\
"- format:%s - START", format, tenant)
# Set SSL, and our cert file
self.client.use_ssl = True
cert_file = "/fake.cert"
self.client.key_file = self.client.cert_file = cert_file
data = self._assert_sanity(self.client.list_networks,
status,
"GET",
"networks",
data=[],
params={'tenant': tenant, 'format': format})
self.assertEquals(data["key_file"], cert_file)
self.assertEquals(data["cert_file"], cert_file)
LOG.debug("_test_ssl_certificates - tenant:%s "\
"- format:%s - END", format, tenant)
def test_list_networks_json(self):
self._test_list_networks(format='json')
def test_list_networks_xml(self):
self._test_list_networks(format='xml')
def test_list_networks_alt_tenant(self):
self._test_list_networks(tenant=TENANT_2)
def test_list_networks_error_470(self):
self._test_list_networks(status=470)
def test_list_networks_error_401(self):
self._test_list_networks(status=401)
def test_show_network_details_json(self):
self._test_show_network_details(format='json')
def test_show_network_details_xml(self):
self._test_show_network_details(format='xml')
def test_show_network_details_alt_tenant(self):
self._test_show_network_details(tenant=TENANT_2)
def test_show_network_details_error_470(self):
self._test_show_network_details(status=470)
def test_show_network_details_error_401(self):
self._test_show_network_details(status=401)
def test_show_network_details_error_420(self):
self._test_show_network_details(status=420)
def test_create_network_json(self):
self._test_create_network(format='json')
def test_create_network_xml(self):
self._test_create_network(format='xml')
def test_create_network_alt_tenant(self):
self._test_create_network(tenant=TENANT_2)
def test_create_network_error_470(self):
self._test_create_network(status=470)
def test_create_network_error_401(self):
self._test_create_network(status=401)
def test_create_network_error_400(self):
self._test_create_network(status=400)
def test_create_network_error_422(self):
self._test_create_network(status=422)
def test_update_network_json(self):
self._test_update_network(format='json')
def test_update_network_xml(self):
self._test_update_network(format='xml')
def test_update_network_alt_tenant(self):
self._test_update_network(tenant=TENANT_2)
def test_update_network_error_470(self):
self._test_update_network(status=470)
def test_update_network_error_401(self):
self._test_update_network(status=401)
def test_update_network_error_400(self):
self._test_update_network(status=400)
def test_update_network_error_420(self):
self._test_update_network(status=420)
def test_update_network_error_422(self):
self._test_update_network(status=422)
def test_delete_network_json(self):
self._test_delete_network(format='json')
def test_delete_network_xml(self):
self._test_delete_network(format='xml')
def test_delete_network_alt_tenant(self):
self._test_delete_network(tenant=TENANT_2)
def test_delete_network_error_470(self):
self._test_delete_network(status=470)
def test_delete_network_error_401(self):
self._test_delete_network(status=401)
def test_delete_network_error_420(self):
self._test_delete_network(status=420)
def test_delete_network_error_421(self):
self._test_delete_network(status=421)
def test_list_ports_json(self):
self._test_list_ports(format='json')
def test_list_ports_xml(self):
self._test_list_ports(format='xml')
def test_list_ports_alt_tenant(self):
self._test_list_ports(tenant=TENANT_2)
def test_list_ports_error_470(self):
self._test_list_ports(status=470)
def test_list_ports_error_401(self):
self._test_list_ports(status=401)
def test_list_ports_error_420(self):
self._test_list_ports(status=420)
def test_show_port_details_json(self):
self._test_list_ports(format='json')
def test_show_port_details_xml(self):
self._test_list_ports(format='xml')
def test_show_port_details_alt_tenant(self):
self._test_list_ports(tenant=TENANT_2)
def test_show_port_details_error_470(self):
self._test_show_port_details(status=470)
def test_show_port_details_error_401(self):
self._test_show_port_details(status=401)
def test_show_port_details_error_420(self):
self._test_show_port_details(status=420)
def test_show_port_details_error_430(self):
self._test_show_port_details(status=430)
def test_create_port_json(self):
self._test_create_port(format='json')
def test_create_port_xml(self):
self._test_create_port(format='xml')
def test_create_port_alt_tenant(self):
self._test_create_port(tenant=TENANT_2)
def test_create_port_error_470(self):
self._test_create_port(status=470)
def test_create_port_error_401(self):
self._test_create_port(status=401)
def test_create_port_error_400(self):
self._test_create_port(status=400)
def test_create_port_error_420(self):
self._test_create_port(status=420)
def test_create_port_error_430(self):
self._test_create_port(status=430)
def test_create_port_error_431(self):
self._test_create_port(status=431)
def test_delete_port_json(self):
self._test_delete_port(format='json')
def test_delete_port_xml(self):
self._test_delete_port(format='xml')
def test_delete_port_alt_tenant(self):
self._test_delete_port(tenant=TENANT_2)
def test_delete_port_error_470(self):
self._test_delete_port(status=470)
def test_delete_port_error_401(self):
self._test_delete_port(status=401)
def test_delete_port_error_420(self):
self._test_delete_port(status=420)
def test_delete_port_error_430(self):
self._test_delete_port(status=430)
def test_delete_port_error_432(self):
self._test_delete_port(status=432)
def test_update_port_json(self):
self._test_update_port(format='json')
def test_update_port_xml(self):
self._test_update_port(format='xml')
def test_update_port_alt_tenant(self):
self._test_update_port(tenant=TENANT_2)
def test_update_port_error_470(self):
self._test_update_port(status=470)
def test_update_port_error_401(self):
self._test_update_port(status=401)
def test_update_port_error_400(self):
self._test_update_port(status=400)
def test_update_port_error_420(self):
self._test_update_port(status=420)
def test_update_port_error_430(self):
self._test_update_port(status=430)
def test_update_port_error_431(self):
self._test_update_port(status=431)
def test_show_port_attachment_json(self):
self._test_show_port_attachment(format='json')
def test_show_port_attachment_xml(self):
self._test_show_port_attachment(format='xml')
def test_show_port_attachment_alt_tenant(self):
self._test_show_port_attachment(tenant=TENANT_2)
def test_show_port_attachment_error_470(self):
self._test_show_port_attachment(status=470)
def test_show_port_attachment_error_401(self):
self._test_show_port_attachment(status=401)
def test_show_port_attachment_error_400(self):
self._test_show_port_attachment(status=400)
def test_show_port_attachment_error_420(self):
self._test_show_port_attachment(status=420)
def test_show_port_attachment_error_430(self):
self._test_show_port_attachment(status=430)
def test_attach_resource_json(self):
self._test_attach_resource(format='json')
def test_attach_resource_xml(self):
self._test_attach_resource(format='xml')
def test_attach_resource_alt_tenant(self):
self._test_attach_resource(tenant=TENANT_2)
def test_attach_resource_error_470(self):
self._test_attach_resource(status=470)
def test_attach_resource_error_401(self):
self._test_attach_resource(status=401)
def test_attach_resource_error_400(self):
self._test_attach_resource(status=400)
def test_attach_resource_error_420(self):
self._test_attach_resource(status=420)
def test_attach_resource_error_430(self):
self._test_attach_resource(status=430)
def test_attach_resource_error_432(self):
self._test_attach_resource(status=432)
def test_attach_resource_error_440(self):
self._test_attach_resource(status=440)
def test_detach_resource_json(self):
self._test_detach_resource(format='json')
def test_detach_resource_xml(self):
self._test_detach_resource(format='xml')
def test_detach_resource_alt_tenant(self):
self._test_detach_resource(tenant=TENANT_2)
def test_detach_resource_error_470(self):
self._test_detach_resource(status=470)
def test_detach_resource_error_401(self):
self._test_detach_resource(status=401)
def test_detach_resource_error_420(self):
self._test_detach_resource(status=420)
def test_detach_resource_error_430(self):
self._test_detach_resource(status=430)
def test_ssl_certificates(self):
self._test_ssl_certificates()

View File

@ -27,7 +27,7 @@ from quantum import wsgi
from quantum.api import faults from quantum.api import faults
from quantum.common import config from quantum.common import config
from quantum.common import exceptions from quantum.common import exceptions
from quantum.common import extensions from quantum.extensions import extensions
import sys import sys
print sys.path print sys.path
from quantum.plugins.sample.SamplePlugin import QuantumEchoPlugin from quantum.plugins.sample.SamplePlugin import QuantumEchoPlugin
@ -36,12 +36,19 @@ from quantum.tests.unit.extension_stubs import (StubExtension, StubPlugin,
StubBaseAppController, StubBaseAppController,
ExtensionExpectingPluginInterface) ExtensionExpectingPluginInterface)
import quantum.tests.unit.extensions import quantum.tests.unit.extensions
from quantum.common.extensions import (ExtensionManager, from quantum.extensions.extensions import (ExtensionManager,
PluginAwareExtensionManager, PluginAwareExtensionManager,
ExtensionMiddleware) ExtensionMiddleware)
LOG = logging.getLogger('test_extensions') LOG = logging.getLogger('test_extensions')
from quantum.common import flags
FLAGS = flags.FLAGS
quantum_dir = os.path.dirname(os.path.abspath(quantum.__file__))
src_dir = os.path.abspath(os.path.join(quantum_dir, ".."))
FLAGS.state_path = src_dir
test_conf_file = config.find_config_file({}, None, "quantum.conf.test") test_conf_file = config.find_config_file({}, None, "quantum.conf.test")
extensions_path = ':'.join(quantum.tests.unit.extensions.__path__) extensions_path = ':'.join(quantum.tests.unit.extensions.__path__)

View File

@ -5,16 +5,6 @@
# openstack-nose https://github.com/jkoelker/openstack-nose # openstack-nose https://github.com/jkoelker/openstack-nose
verbosity=2 verbosity=2
detailed-errors=1 detailed-errors=1
with-coverage=1
cover-package=openstack.common
cover-html=1
cover-inclusive=1
with-tissue=1
tissue-repeat=1
tissue-show-pep8=1
tissue-show-source=1
tissue-inclusive=1
tissue-color=1
with-openstack=1 with-openstack=1
openstack-red=0.05 openstack-red=0.05
openstack-yellow=0.025 openstack-yellow=0.025

View File

@ -1,56 +0,0 @@
try:
from setuptools import setup, find_packages
except ImportError:
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup, find_packages
from quantum import version
Name = 'quantum-client'
Url = "https://launchpad.net/quantum"
Version = version.version_string()
License = 'Apache License 2.0'
Author = 'Netstack'
AuthorEmail = 'netstack@lists.launchpad.net'
Maintainer = ''
Summary = 'Client functionalities for Quantum'
ShortDescription = Summary
Description = Summary
requires = [
'quantum-common'
]
EagerResources = [
'quantum',
]
ProjectScripts = [
]
PackageData = {
}
setup(
name=Name,
version=Version,
url=Url,
author=Author,
author_email=AuthorEmail,
description=ShortDescription,
long_description=Description,
license=License,
scripts=ProjectScripts,
install_requires=requires,
include_package_data=False,
packages=["quantum.client"],
package_data=PackageData,
eager_resources=EagerResources,
entry_points={
'console_scripts': [
'quantum = quantum.client.cli:main'
]
},
)

View File

@ -1,67 +0,0 @@
try:
from setuptools import setup, find_packages
except ImportError:
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup, find_packages
from quantum import version
Name = 'quantum-common'
Url = "https://launchpad.net/quantum"
Version = version.version_string()
License = 'Apache License 2.0'
Author = 'Netstack'
AuthorEmail = 'netstack@lists.launchpad.net'
Maintainer = ''
Summary = 'Common functionalities for Quantum'
ShortDescription = Summary
Description = Summary
requires = [
'eventlet>=0.9.12',
'Routes>=1.12.3',
'nose',
'Paste',
'PasteDeploy',
'pep8>=0.6.1',
'python-gflags',
'simplejson',
'sqlalchemy',
'webob',
'webtest'
]
EagerResources = [
'quantum',
]
ProjectScripts = [
]
PackageData = {
}
exclude = ['quantum.client', 'quantum.client.*', 'quantum.server',
'quantum.server.*', 'quantum.tests', 'quantum.tests.*',
'quantum.plugins.*']
pkgs = find_packages('.', exclude=exclude)
pkgs = filter(lambda x: x.startswith("quantum"), pkgs)
setup(
name=Name,
version=Version,
url=Url,
author=Author,
author_email=AuthorEmail,
description=ShortDescription,
long_description=Description,
license=License,
scripts=ProjectScripts,
install_requires=requires,
include_package_data=False,
packages=pkgs,
package_data=PackageData,
eager_resources=EagerResources,
entry_points={},
)

View File

@ -1,15 +1,19 @@
Paste
PasteDeploy
Routes>=1.12.3
coverage coverage
distribute>=0.6.24 distribute>=0.6.24
eventlet>=0.9.12 eventlet>=0.9.12
Routes>=1.12.3
lxml==2.3 lxml==2.3
nose
nosexcover
Paste
PasteDeploy
pep8>=0.5.0
python-gflags python-gflags
simplejson simplejson
sqlalchemy sqlalchemy
webob webob
webtest webtest
nose
nosexcover
pep8==0.6.1
-e git+https://review.openstack.org/p/openstack/python-quantumclient#egg=python-quantumclient-dev
-e git+https://review.openstack.org/p/openstack-dev/openstack-nose.git#egg=openstack.nose_plugin