Notification for network/subnet/port create/delete/update.

blueprint quantum-notifications

Add host configuration into conf, which is determined by socket.gethostname()
function by default. Host name is part of publiser id, which is in the format
of 'network'.$host.

We add create/update/delete start and end notification for each kind of resource.
By default, the notification do nothing since the notifier driver is no_op_notifier.
We can define it in quantum.conf.

Change-Id: Ibc5eacac7a324584e6ccff120f573444932a88ef
This commit is contained in:
Yong Sheng Gong 2012-07-25 22:55:33 +08:00
parent 5df522c9d1
commit c3aa5bc1e3
3 changed files with 204 additions and 2 deletions

View File

@ -47,3 +47,112 @@ api_paste_config = api-paste.ini
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
# ============ Notification System Options =====================
# Notifications can be sent when network/subnet/port are create, updated or deleted.
# There are four methods of sending notifications, logging (via the
# log_file directive), rpc (via a message queue),
# noop (no notifications sent, the default) or list of them
# Defined in notifier api
# notification_driver = quantum.openstack.common.notifier.no_op_notifier
# default_notification_level = INFO
# myhost = myhost.com
# default_publisher_id = $myhost
# Defined in rabbit_notifier for rpc way
# notification_topics = notifications
# Defined in list_notifier
# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
# Defined in rpc __init__
# The messaging module to use, defaults to kombu.
# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu
# Size of RPC thread pool
# rpc_thread_pool_size = 64,
# Size of RPC connection pool
# rpc_conn_pool_size = 30
# Seconds to wait for a response from call or multicall
# rpc_response_timeout = 60
# Seconds to wait before a cast expires (TTL). Only supported by impl_zmq.
# rpc_cast_timeout = 30
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call.
# allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception
# AMQP exchange to connect to if using RabbitMQ or Qpid
# control_exchange = nova
# If passed, use a fake RabbitMQ provider
# fake_rabbit = False
# Configuration options if sending notifications via kombu rpc (these are
# the defaults)
# SSL version to use (valid only if SSL enabled)
# kombu_ssl_version =
# SSL key file (valid only if SSL enabled)
# kombu_ssl_keyfile =
# SSL cert file (valid only if SSL enabled)
# kombu_ssl_certfile =
# SSL certification authority file (valid only if SSL enabled)'
# kombu_ssl_ca_certs =
# IP address of the RabbitMQ installation
# rabbit_host = localhost
# Password of the RabbitMQ server
# rabbit_password = guest
# Port where RabbitMQ server is running/listening
# rabbit_port = 5672
# User ID used for RabbitMQ connections
# rabbit_userid = guest
# Location of a virtual RabbitMQ installation.
# rabbit_virtual_host = /
# Maximum retries with trying to connect to RabbitMQ
# (the default of 0 implies an infinite retry count)
# rabbit_max_retries = 0
# RabbitMQ connection retry interval
# rabbit_retry_interval = 1
# QPID
# rpc_backend=quantum.openstack.common.rpc.impl_qpid
# Qpid broker hostname
# qpid_hostname = localhost
# Qpid broker port
# qpid_port = 5672
# Username for qpid connection
# qpid_username = ''
# Password for qpid connection
# qpid_password = ''
# Space separated list of SASL mechanisms to use for auth
# qpid_sasl_mechanisms = ''
# Automatically reconnect
# qpid_reconnect = True
# Reconnection timeout in seconds
# qpid_reconnect_timeout = 0
# Max reconnections before giving up
# qpid_reconnect_limit = 0
# Minimum seconds between reconnection attempts
# qpid_reconnect_interval_min = 0
# Maximum seconds between reconnection attempts
# qpid_reconnect_interval_max = 0
# Equivalent to setting max and min to the same value
# qpid_reconnect_interval = 0
# Seconds between connection keepalive heartbeats
# qpid_heartbeat = 5
# Transport to use, either 'tcp' or 'ssl'
# qpid_protocol = tcp
# Disable Nagle algorithm
# qpid_tcp_nodelay = True
# ZMQ
# rpc_backend=quantum.openstack.common.rpc.impl_zmq
# ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP.
# The "host" option should point or resolve to this address.
# rpc_zmq_bind_address = *
# MatchMaker driver
# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port
# rpc_zmq_port = 9501
# Number of ZeroMQ contexts, defaults to 1
# rpc_zmq_contexts = 1
# Directory for holding IPC sockets
# rpc_zmq_ipc_dir = /var/run/openstack

View File

@ -14,12 +14,16 @@
# limitations under the License.
import logging
import socket
import webob.exc
from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource
from quantum.common import exceptions
from quantum.common import utils
from quantum.openstack.common import cfg
from quantum.openstack.common.notifier import api as notifier_api
from quantum import policy
from quantum import quota
@ -40,6 +44,14 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
QUOTAS = quota.QUOTAS
def _get_hostname():
return socket.gethostname()
# Register the configuration options
cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
def fields(request):
"""
Extracts the list of fields to return
@ -111,6 +123,7 @@ class Controller(object):
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if 'required_by_policy' in info
and info['required_by_policy']]
self._publisher_id = notifier_api.publisher_id('network')
def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
@ -189,6 +202,11 @@ class Controller(object):
def create(self, request, body=None):
"""Creates a new instance of the requested entity"""
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.start',
notifier_api.INFO,
body)
body = self._prepare_request_body(request.context, body, True,
allow_bulk=True)
action = "create_%s" % self._resource
@ -229,10 +247,21 @@ class Controller(object):
obj_creator = getattr(self._plugin, action)
kwargs = {self._resource: body}
obj = obj_creator(request.context, **kwargs)
return {self._resource: self._view(obj)}
result = {self._resource: self._view(obj)}
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.end',
notifier_api.INFO,
result)
return result
def delete(self, request, id):
"""Deletes the specified entity"""
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.delete.start',
notifier_api.INFO,
{self._resource + '_id': id})
action = "delete_%s" % self._resource
# Check authz
@ -246,9 +275,21 @@ class Controller(object):
obj_deleter = getattr(self._plugin, action)
obj_deleter(request.context, id)
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.delete.end',
notifier_api.INFO,
{self._resource + '_id': id})
def update(self, request, id, body=None):
"""Updates the specified entity's attributes"""
payload = body.copy()
payload['id'] = id
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.update.start',
notifier_api.INFO,
payload)
body = self._prepare_request_body(request.context, body, False)
action = "update_%s" % self._resource
@ -264,7 +305,13 @@ class Controller(object):
obj_updater = getattr(self._plugin, action)
kwargs = {self._resource: body}
obj = obj_updater(request.context, id, **kwargs)
return {self._resource: self._view(obj)}
result = {self._resource: self._view(obj)}
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.update.end',
notifier_api.INFO,
result)
return result
def _populate_tenant_id(self, context, res_dict, is_create):

View File

@ -32,6 +32,7 @@ from quantum import context
from quantum.extensions.extensions import PluginAwareExtensionManager
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.openstack.common.notifier import api as notifer_api
LOG = logging.getLogger(__name__)
@ -755,6 +756,51 @@ class V2Views(unittest.TestCase):
self._view(keys, 'subnets', 'subnet')
class NotificationTest(APIv2TestBase):
def _resource_op_notifier(self, opname, resource, expected_errors=False):
initial_input = {resource: {'name': 'myname'}}
instance = self.plugin.return_value
instance.get_networks.return_value = initial_input
expected_code = exc.HTTPCreated.code
with mock.patch.object(notifer_api, 'notify') as mynotifier:
if opname == 'create':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.post_json(
_get_path('networks'), initial_input, expected_errors)
if opname == 'update':
res = self.api.put_json(
_get_path('networks', id=_uuid()),
initial_input, expect_errors=expected_errors)
expected_code = exc.HTTPOk.code
if opname == 'delete':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.delete(
_get_path('networks', id=_uuid()),
expect_errors=expected_errors)
expected_code = exc.HTTPNoContent.code
expected = [mock.call(mock.ANY,
'network.' + cfg.CONF.host,
resource + "." + opname + ".start",
'INFO',
mock.ANY),
mock.call(mock.ANY,
'network.' + cfg.CONF.host,
resource + "." + opname + ".end",
'INFO',
mock.ANY)]
self.assertEqual(expected, mynotifier.call_args_list)
self.assertEqual(res.status_int, expected_code)
def test_network_create_notifer(self):
self._resource_op_notifier('create', 'network')
def test_network_delete_notifer(self):
self._resource_op_notifier('delete', 'network')
def test_network_update_notifer(self):
self._resource_op_notifier('update', 'network')
class QuotaTest(APIv2TestBase):
def test_create_network_quota(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')