Add metering extension and base class

This a part of the blueprint bandwidth-router-label

This patch initiates the blueprint by adding base class
to associate labels and metering rules to tenant's routers.

Change-Id: Ia93b49d881e79c3291730cff7b80f26c56fedb48
This commit is contained in:
Sylvain Afchain 2013-06-14 17:35:50 +02:00
parent e70b10ffc7
commit 1e4a5f1bdb
16 changed files with 1397 additions and 7 deletions

View File

@ -114,5 +114,13 @@
"get_network_profile": "", "get_network_profile": "",
"update_policy_profiles": "rule:admin_only", "update_policy_profiles": "rule:admin_only",
"get_policy_profiles": "", "get_policy_profiles": "",
"get_policy_profile": "" "get_policy_profile": "",
"create_metering_label": "rule:admin_only",
"delete_metering_label": "rule:admin_only",
"get_metering_label": "rule:admin_only",
"create_metering_label_rule": "rule:admin_only",
"delete_metering_label_rule": "rule:admin_only",
"get_metering_label_rule": "rule:admin_only"
} }

View File

@ -0,0 +1,96 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants
from neutron.common import topics
from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class MeteringAgentNotifyAPI(proxy.RpcProxy):
"""API for plugin to notify L3 metering agent."""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.METERING_AGENT):
super(MeteringAgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.NeutronManager.get_plugin()
l3_routers = {}
for router in routers:
l3_agents = plugin.get_l3_agents_hosting_routers(
adminContext, [router['id']],
admin_state_up=True,
active=True)
for l3_agent in l3_agents:
LOG.debug(_('Notify metering agent at %(topic)s.%(host)s '
'the message %(method)s'),
{'topic': self.topic,
'host': l3_agent.host,
'method': method})
l3_router = l3_routers.get(l3_agent.host, [])
l3_router.append(router)
l3_routers[l3_agent.host] = l3_router
for host, routers in l3_routers.iteritems():
self.cast(context, self.make_msg(method, routers=routers),
topic='%s.%s' % (self.topic, host))
def _notification_fanout(self, context, method, router_id):
LOG.debug(_('Fanout notify metering agent at %(topic)s the message '
'%(method)s on router %(router_id)s'),
{'topic': self.topic,
'method': method,
'router_id': router_id})
self.fanout_cast(
context, self.make_msg(method,
router_id=router_id),
topic=self.topic)
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_plugin()
if utils.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)
else:
self.fanout_cast(context, self.make_msg(method, routers=routers),
topic=self.topic)
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)
def routers_updated(self, context, routers):
if routers:
self._notification(context, 'routers_updated', routers)
def update_metering_label_rules(self, context, routers):
self._notification(context, 'update_metering_label_rules', routers)
def add_metering_label(self, context, routers):
self._notification(context, 'add_metering_label', routers)
def remove_metering_label(self, context, routers):
self._notification(context, 'remove_metering_label', routers)

View File

@ -30,6 +30,7 @@ DEVICE_OWNER_DHCP = "network:dhcp"
FLOATINGIP_KEY = '_floatingips' FLOATINGIP_KEY = '_floatingips'
INTERFACE_KEY = '_interfaces' INTERFACE_KEY = '_interfaces'
METERING_LABEL_KEY = '_metering_labels'
IPv4 = 'IPv4' IPv4 = 'IPv4'
IPv6 = 'IPv6' IPv6 = 'IPv6'

View File

@ -26,9 +26,12 @@ AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin' PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer' DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin' FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
L3_AGENT = 'l3_agent' L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent' DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
METERING_PLUGIN = 'metering_plugin'
def get_topic_name(prefix, table, operation): def get_topic_name(prefix, table, operation):

View File

@ -182,6 +182,11 @@ class CommonDbMixin(object):
def _get_collection_count(self, context, model, filters=None): def _get_collection_count(self, context, model, filters=None):
return self._get_collection_query(context, model, filters).count() return self._get_collection_query(context, model, filters).count()
def _get_marker_obj(self, context, resource, limit, marker):
if limit and marker:
return getattr(self, '_get_%s' % resource)(context, marker)
return None
class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
CommonDbMixin): CommonDbMixin):
@ -923,11 +928,6 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
context.session.rollback() context.session.rollback()
return objects return objects
def _get_marker_obj(self, context, resource, limit, marker):
if limit and marker:
return getattr(self, '_get_%s' % resource)(context, marker)
return None
def create_network_bulk(self, context, networks): def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks) return self._create_bulk('network', context, networks)

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,233 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import constants
from neutron.db import api as dbapi
from neutron.db import db_base_plugin_v2 as base_db
from neutron.db import l3_db
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import metering
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
class MeteringLabelRule(model_base.BASEV2, models_v2.HasId):
direction = sa.Column(sa.Enum('ingress', 'egress',
name='meteringlabels_direction'))
remote_ip_prefix = sa.Column(sa.String(64))
metering_label_id = sa.Column(sa.String(36),
sa.ForeignKey("meteringlabels.id",
ondelete="CASCADE"),
nullable=False)
excluded = sa.Column(sa.Boolean, default=False)
class MeteringLabel(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(1024))
rules = orm.relationship(MeteringLabelRule, backref="label",
cascade="delete", lazy="joined")
routers = orm.relationship(
l3_db.Router,
primaryjoin="MeteringLabel.tenant_id==Router.tenant_id",
foreign_keys='Router.tenant_id')
class MeteringDbMixin(metering.MeteringPluginBase,
base_db.CommonDbMixin):
def __init__(self):
dbapi.register_models()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
def _make_metering_label_dict(self, metering_label, fields=None):
res = {'id': metering_label['id'],
'name': metering_label['name'],
'description': metering_label['description'],
'tenant_id': metering_label['tenant_id']}
return self._fields(res, fields)
def create_metering_label(self, context, metering_label):
m = metering_label['metering_label']
tenant_id = self._get_tenant_id_for_create(context, m)
with context.session.begin(subtransactions=True):
metering_db = MeteringLabel(id=uuidutils.generate_uuid(),
description=m['description'],
tenant_id=tenant_id,
name=m['name'])
context.session.add(metering_db)
return self._make_metering_label_dict(metering_db)
def delete_metering_label(self, context, label_id):
with context.session.begin(subtransactions=True):
try:
label = self._get_by_id(context, MeteringLabel, label_id)
except orm.exc.NoResultFound:
raise metering.MeteringLabelNotFound(label_id=label_id)
context.session.delete(label)
def get_metering_label(self, context, label_id, fields=None):
try:
metering_label = self._get_by_id(context, MeteringLabel, label_id)
except orm.exc.NoResultFound:
raise metering.MeteringLabelNotFound(label_id=label_id)
return self._make_metering_label_dict(metering_label, fields)
def get_metering_labels(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'metering_labels', limit,
marker)
return self._get_collection(context, MeteringLabel,
self._make_metering_label_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def _make_metering_label_rule_dict(self, metering_label_rule, fields=None):
res = {'id': metering_label_rule['id'],
'metering_label_id': metering_label_rule['metering_label_id'],
'direction': metering_label_rule['direction'],
'remote_ip_prefix': metering_label_rule['remote_ip_prefix'],
'excluded': metering_label_rule['excluded']}
return self._fields(res, fields)
def get_metering_label_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'metering_label_rules',
limit, marker)
return self._get_collection(context, MeteringLabelRule,
self._make_metering_label_rule_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
def get_metering_label_rule(self, context, rule_id, fields=None):
try:
metering_label_rule = self._get_by_id(context,
MeteringLabelRule, rule_id)
except orm.exc.NoResultFound:
raise metering.MeteringLabelRuleNotFound(rule_id=rule_id)
return self._make_metering_label_rule_dict(metering_label_rule, fields)
def _validate_cidr(self, context, remote_ip_prefix, direction, excluded):
r_ips = self.get_metering_label_rules(context,
filters={'direction':
[direction],
'excluded':
[excluded]},
fields=['remote_ip_prefix'])
cidrs = [r['remote_ip_prefix'] for r in r_ips]
new_cidr_ipset = netaddr.IPSet([remote_ip_prefix])
if (netaddr.IPSet(cidrs) & new_cidr_ipset):
raise metering.MeteringLabelRuleOverlaps(remote_ip_prefix=
remote_ip_prefix)
def create_metering_label_rule(self, context, metering_label_rule):
m = metering_label_rule['metering_label_rule']
with context.session.begin(subtransactions=True):
label_id = m['metering_label_id']
ip_prefix = m['remote_ip_prefix']
direction = m['direction']
excluded = m['excluded']
self._validate_cidr(context, ip_prefix, direction, excluded)
metering_db = MeteringLabelRule(id=uuidutils.generate_uuid(),
metering_label_id=label_id,
direction=direction,
excluded=m['excluded'],
remote_ip_prefix=ip_prefix)
context.session.add(metering_db)
return self._make_metering_label_rule_dict(metering_db)
def delete_metering_label_rule(self, context, rule_id):
with context.session.begin(subtransactions=True):
try:
rule = self._get_by_id(context, MeteringLabelRule, rule_id)
except orm.exc.NoResultFound:
raise metering.MeteringLabelRuleNotFound(rule_id=rule_id)
context.session.delete(rule)
def _get_metering_rules_dict(self, metering_label):
rules = []
for rule in metering_label.rules:
rule_dict = self._make_metering_label_rule_dict(rule)
rules.append(rule_dict)
return rules
def _make_router_dict(self, router):
res = {'id': router['id'],
'name': router['name'],
'tenant_id': router['tenant_id'],
'admin_state_up': router['admin_state_up'],
'status': router['status'],
'gw_port_id': router['gw_port_id'],
constants.METERING_LABEL_KEY: []}
return res
def _process_sync_metering_data(self, labels):
routers_dict = {}
for label in labels:
routers = label.routers
for router in routers:
router_dict = routers_dict.get(
router['id'],
self._make_router_dict(router))
rules = self._get_metering_rules_dict(label)
data = {'id': label['id'], 'rules': rules}
router_dict[constants.METERING_LABEL_KEY].append(data)
routers_dict[router['id']] = router_dict
return routers_dict.values()
def get_sync_data_metering(self, context, label_id=None):
with context.session.begin(subtransactions=True):
if label_id:
label = self._get_by_id(context, MeteringLabel, label_id)
labels = [label]
else:
labels = self._get_collection_query(context, MeteringLabel)
return self._process_sync_metering_data(labels)

View File

@ -0,0 +1,77 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""metering
Revision ID: 569e98a8132b
Revises: 13de305df56e
Create Date: 2013-07-17 15:38:36.254595
"""
# revision identifiers, used by Alembic.
revision = '569e98a8132b'
down_revision = 'f9263d6df56'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = ['neutron.services.metering.metering_plugin.'
'MeteringPlugin']
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_table('meteringlabelrules')
op.drop_table('meteringlabels')
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_table('meteringlabels',
sa.Column('tenant_id', sa.String(length=255),
nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255),
nullable=True),
sa.Column('description', sa.String(length=255),
nullable=True),
sa.PrimaryKeyConstraint('id'))
op.create_table('meteringlabelrules',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('direction',
sa.Enum('ingress', 'egress',
name='meteringlabels_direction'),
nullable=True),
sa.Column('remote_ip_prefix', sa.String(length=64),
nullable=True),
sa.Column('metering_label_id', sa.String(length=36),
nullable=False),
sa.Column('excluded', sa.Boolean(),
autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['metering_label_id'],
['meteringlabels.id'],
name='meteringlabelrules_ibfk_1'),
sa.PrimaryKeyConstraint('id'))

View File

@ -0,0 +1,204 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base
from neutron.common import exceptions as qexception
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services import service_base
LOG = logging.getLogger(__name__)
class MeteringLabelNotFound(qexception.NotFound):
message = _("Metering label %(label_id)s does not exist")
class DuplicateMeteringRuleInPost(qexception.InUse):
message = _("Duplicate Metering Rule in POST.")
class MeteringLabelRuleNotFound(qexception.NotFound):
message = _("Metering label rule %(rule_id)s does not exist")
class MeteringLabelRuleOverlaps(qexception.NotFound):
message = _("Metering label rule with remote_ip_prefix "
"%(remote_ip_prefix)s overlaps another")
RESOURCE_ATTRIBUTE_MAP = {
'metering_labels': {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': ''},
'description': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': ''},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True}
},
'metering_label_rules': {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True,
'primary_key': True},
'metering_label_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True, 'required_by_policy': True},
'direction': {'allow_post': True, 'allow_put': True,
'is_visible': True,
'validate': {'type:values': ['ingress', 'egress']}},
'excluded': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': attr.convert_to_boolean},
'remote_ip_prefix': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'required_by_policy': True,
'validate': {'type:subnet': None}},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True}
}
}
class Metering(extensions.ExtensionDescriptor):
@classmethod
def get_name(cls):
return "Neutron Metering"
@classmethod
def get_alias(cls):
return "metering"
@classmethod
def get_description(cls):
return "Neutron Metering extension."
@classmethod
def get_namespace(cls):
return "http://wiki.openstack.org/wiki/Neutron/Metering/Bandwidth#API"
@classmethod
def get_updated(cls):
return "2013-06-12T10:00:00-00:00"
@classmethod
def get_plugin_interface(cls):
return MeteringPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
attr.PLURALS.update(dict(my_plurals))
exts = []
plugin = manager.NeutronManager.get_service_plugins()[
constants.METERING]
for resource_name in ['metering_label', 'metering_label_rule']:
collection_name = resource_name + "s"
collection_name = collection_name.replace('_', '-')
params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
controller = base.create_resource(collection_name,
resource_name,
plugin, params, allow_bulk=True,
allow_pagination=True,
allow_sorting=True)
ex = extensions.ResourceExtension(
collection_name,
controller,
path_prefix=constants.COMMON_PREFIXES[constants.METERING],
attr_map=params)
exts.append(ex)
return exts
def update_attributes_map(self, attributes):
super(Metering, self).update_attributes_map(
attributes, extension_attrs_map=RESOURCE_ATTRIBUTE_MAP)
def get_extended_resources(self, version):
if version == "2.0":
return RESOURCE_ATTRIBUTE_MAP
else:
return {}
class MeteringPluginBase(service_base.ServicePluginBase):
__metaclass__ = abc.ABCMeta
def get_plugin_name(self):
return constants.METERING
def get_plugin_description(self):
return constants.METERING
def get_plugin_type(self):
return constants.METERING
@abc.abstractmethod
def create_metering_label(self, context, metering_label):
"""Create a metering label."""
pass
@abc.abstractmethod
def delete_metering_label(self, context, label_id):
"""Delete a metering label."""
pass
@abc.abstractmethod
def get_metering_label(self, context, label_id, fields=None):
"""Get a metering label."""
pass
@abc.abstractmethod
def get_metering_labels(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
"""List all metering labels."""
pass
@abc.abstractmethod
def create_metering_label_rule(self, context, metering_label_rule):
"""Create a metering label rule."""
pass
@abc.abstractmethod
def get_metering_label_rule(self, context, rule_id, fields=None):
"""Get a metering label rule."""
pass
@abc.abstractmethod
def delete_metering_label_rule(self, context, rule_id):
"""Delete a metering label rule."""
pass
@abc.abstractmethod
def get_metering_label_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
"""List all metering label rules."""
pass

View File

@ -21,6 +21,7 @@ DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER" LOADBALANCER = "LOADBALANCER"
FIREWALL = "FIREWALL" FIREWALL = "FIREWALL"
VPN = "VPN" VPN = "VPN"
METERING = "METERING"
#maps extension alias to service type #maps extension alias to service type
EXT_TO_SERVICE_MAPPING = { EXT_TO_SERVICE_MAPPING = {
@ -28,10 +29,11 @@ EXT_TO_SERVICE_MAPPING = {
'lbaas': LOADBALANCER, 'lbaas': LOADBALANCER,
'fwaas': FIREWALL, 'fwaas': FIREWALL,
'vpnaas': VPN, 'vpnaas': VPN,
'metering': METERING,
} }
# TODO(salvatore-orlando): Move these (or derive them) from conf file # TODO(salvatore-orlando): Move these (or derive them) from conf file
ALLOWED_SERVICES = [CORE, DUMMY, LOADBALANCER, FIREWALL, VPN] ALLOWED_SERVICES = [CORE, DUMMY, LOADBALANCER, FIREWALL, VPN, METERING]
COMMON_PREFIXES = { COMMON_PREFIXES = {
CORE: "", CORE: "",
@ -39,6 +41,7 @@ COMMON_PREFIXES = {
LOADBALANCER: "/lb", LOADBALANCER: "/lb",
FIREWALL: "/fw", FIREWALL: "/fw",
VPN: "/vpn", VPN: "/vpn",
METERING: "/metering",
} }
# Service operation status constants # Service operation status constants

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,90 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import rpc as p_rpc
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.openstack.common import rpc
class MeteringCallbacks(metering_db.MeteringDbMixin):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
def get_sync_data_metering(self, context, **kwargs):
return super(MeteringCallbacks, self).get_sync_data_metering(context)
class MeteringPlugin(metering_db.MeteringDbMixin):
"""Implementation of the Neutron Metering Service Plugin."""
supported_extension_aliases = ["metering"]
def __init__(self):
super(MeteringPlugin, self).__init__()
self.callbacks = MeteringCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(
topics.METERING_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
self.conn.consume_in_thread()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
def create_metering_label(self, context, metering_label):
label = super(MeteringPlugin, self).create_metering_label(
context, metering_label)
data = self.get_sync_data_metering(context)
self.meter_rpc.add_metering_label(context, data)
return label
def delete_metering_label(self, context, label_id):
data = self.get_sync_data_metering(context, label_id)
label = super(MeteringPlugin, self).delete_metering_label(
context, label_id)
self.meter_rpc.remove_metering_label(context, data)
return label
def create_metering_label_rule(self, context, metering_label_rule):
rule = super(MeteringPlugin, self).create_metering_label_rule(
context, metering_label_rule)
data = self.get_sync_data_metering(context)
self.meter_rpc.update_metering_label_rules(context, data)
return rule
def delete_metering_label_rule(self, context, rule_id):
rule = super(MeteringPlugin, self).delete_metering_label_rule(
context, rule_id)
data = self.get_sync_data_metering(context)
self.meter_rpc.update_metering_label_rules(context, data)
return rule

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,268 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import webob.exc
from neutron.api.extensions import ExtensionMiddleware
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.common import config
from neutron import context
import neutron.extensions
from neutron.extensions import metering
from neutron.plugins.common import constants
from neutron.services.metering import metering_plugin
from neutron.tests.unit import test_db_plugin
LOG = logging.getLogger(__name__)
DB_METERING_PLUGIN_KLASS = (
"neutron.services.metering."
"metering_plugin.MeteringPlugin"
)
extensions_path = ':'.join(neutron.extensions.__path__)
class MeteringPluginDbTestCaseMixin(object):
def _create_metering_label(self, fmt, name, description, **kwargs):
data = {'metering_label': {'name': name,
'tenant_id': kwargs.get('tenant_id',
'test_tenant'),
'description': description}}
req = self.new_create_request('metering-labels', data,
fmt)
if kwargs.get('set_context') and 'tenant_id' in kwargs:
# create a specific auth context for this request
req.environ['neutron.context'] = (
context.Context('', kwargs['tenant_id'],
is_admin=kwargs.get('is_admin', True)))
return req.get_response(self.ext_api)
def _make_metering_label(self, fmt, name, description, **kwargs):
res = self._create_metering_label(fmt, name, description, **kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _create_metering_label_rule(self, fmt, metering_label_id, direction,
remote_ip_prefix, excluded, **kwargs):
data = {'metering_label_rule':
{'metering_label_id': metering_label_id,
'tenant_id': kwargs.get('tenant_id', 'test_tenant'),
'direction': direction,
'excluded': excluded,
'remote_ip_prefix': remote_ip_prefix}}
req = self.new_create_request('metering-label-rules',
data, fmt)
if kwargs.get('set_context') and 'tenant_id' in kwargs:
# create a specific auth context for this request
req.environ['neutron.context'] = (
context.Context('', kwargs['tenant_id']))
return req.get_response(self.ext_api)
def _make_metering_label_rule(self, fmt, metering_label_id, direction,
remote_ip_prefix, excluded, **kwargs):
res = self._create_metering_label_rule(fmt, metering_label_id,
direction, remote_ip_prefix,
excluded, **kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def metering_label(self, name='label', description='desc',
fmt=None, no_delete=False, **kwargs):
if not fmt:
fmt = self.fmt
metering_label = self._make_metering_label(fmt, name,
description, **kwargs)
try:
yield metering_label
finally:
if not no_delete:
self._delete('metering-labels',
metering_label['metering_label']['id'])
@contextlib.contextmanager
def metering_label_rule(self, metering_label_id=None, direction='ingress',
remote_ip_prefix='10.0.0.0/24',
excluded='false', fmt=None, no_delete=False):
if not fmt:
fmt = self.fmt
metering_label_rule = self._make_metering_label_rule(fmt,
metering_label_id,
direction,
remote_ip_prefix,
excluded)
try:
yield metering_label_rule
finally:
if not no_delete:
self._delete('metering-label-rules',
metering_label_rule['metering_label_rule']['id'])
class MeteringPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
MeteringPluginDbTestCaseMixin):
fmt = 'json'
resource_prefix_map = dict(
(k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING])
for k in metering.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self, plugin=None):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
super(MeteringPluginDbTestCase, self).setUp(
plugin=plugin,
service_plugins=service_plugins
)
self.plugin = metering_plugin.MeteringPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.METERING: self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
def test_create_metering_label(self):
name = 'my label'
description = 'my metering label'
keys = [('name', name,), ('description', description)]
with self.metering_label(name, description) as metering_label:
for k, v, in keys:
self.assertEqual(metering_label['metering_label'][k], v)
def test_delete_metering_label(self):
name = 'my label'
description = 'my metering label'
with self.metering_label(name, description,
no_delete=True) as metering_label:
metering_label_id = metering_label['metering_label']['id']
self._delete('metering-labels', metering_label_id, 204)
def test_list_metering_label(self):
name = 'my label'
description = 'my metering label'
with contextlib.nested(
self.metering_label(name, description),
self.metering_label(name, description)) as metering_label:
self._test_list_resources('metering-label', metering_label)
def test_create_metering_label_rule(self):
name = 'my label'
description = 'my metering label'
with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
direction = 'egress'
remote_ip_prefix = '192.168.0.0/24'
excluded = True
keys = [('metering_label_id', metering_label_id),
('direction', direction),
('excluded', excluded),
('remote_ip_prefix', remote_ip_prefix)]
with self.metering_label_rule(metering_label_id,
direction,
remote_ip_prefix,
excluded) as label_rule:
for k, v, in keys:
self.assertEqual(label_rule['metering_label_rule'][k], v)
def test_delete_metering_label_rule(self):
name = 'my label'
description = 'my metering label'
with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
direction = 'egress'
remote_ip_prefix = '192.168.0.0/24'
excluded = True
with self.metering_label_rule(metering_label_id,
direction,
remote_ip_prefix,
excluded,
no_delete=True) as label_rule:
rule_id = label_rule['metering_label_rule']['id']
self._delete('metering-label-rules', rule_id, 204)
def test_list_metering_label_rule(self):
name = 'my label'
description = 'my metering label'
with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
direction = 'egress'
remote_ip_prefix = '192.168.0.0/24'
excluded = True
with contextlib.nested(
self.metering_label_rule(metering_label_id,
direction,
remote_ip_prefix,
excluded),
self.metering_label_rule(metering_label_id,
'ingress',
remote_ip_prefix,
excluded)) as metering_label_rule:
self._test_list_resources('metering-label-rule',
metering_label_rule)
def test_create_metering_label_rules(self):
name = 'my label'
description = 'my metering label'
with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
direction = 'egress'
remote_ip_prefix = '192.168.0.0/24'
excluded = True
with contextlib.nested(
self.metering_label_rule(metering_label_id,
direction,
remote_ip_prefix,
excluded),
self.metering_label_rule(metering_label_id,
direction,
'0.0.0.0/0',
False)) as metering_label_rule:
self._test_list_resources('metering-label-rule',
metering_label_rule)
class TestMeteringDbXML(MeteringPluginDbTestCase):
fmt = 'xml'

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,347 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.api.v2 import attributes as attr
from neutron.common.test_lib import test_config
from neutron import context
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.tests.unit.db.metering import test_db_metering
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
_uuid = uuidutils.generate_uuid
DB_METERING_PLUGIN_KLASS = (
"neutron.services.metering."
"metering_plugin.MeteringPlugin"
)
class MeteringTestExtensionManager(object):
def get_resources(self):
attr.RESOURCE_ATTRIBUTE_MAP.update(ext_metering.RESOURCE_ATTRIBUTE_MAP)
attr.RESOURCE_ATTRIBUTE_MAP.update(ext_l3.RESOURCE_ATTRIBUTE_MAP)
l3_res = ext_l3.L3.get_resources()
metering_res = ext_metering.Metering.get_resources()
return l3_res + metering_res
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
resource_prefix_map = dict(
(k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING])
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
test_config['plugin_name_v2'] = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatPlugin')
ext_mgr = MeteringTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(TestMeteringPlugin, self).setUp(service_plugins=service_plugins)
self.uuid = '654f6b9d-0f36-4ae5-bd1b-01616794ca60'
uuid = 'neutron.openstack.common.uuidutils.generate_uuid'
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
fanout = ('neutron.openstack.common.rpc.proxy.RpcProxy.'
'fanout_cast')
self.fanout_patch = mock.patch(fanout)
self.mock_fanout = self.fanout_patch.start()
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
return_value=self.ctx)
self.mock_context = self.context_patch.start()
self.topic = 'metering_agent'
def tearDown(self):
self.uuid_patch.stop()
self.fanout_patch.stop()
self.context_patch.stop()
del test_config['extension_manager']
del test_config['plugin_name_v2']
super(TestMeteringPlugin, self).tearDown()
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
tenant_id_2 = '8a268a58-1610-4890-87e0-07abb8231206'
self.mock_uuid.return_value = second_uuid
with self.router(name='router2', tenant_id=tenant_id_2,
set_context=True):
self.mock_uuid.return_value = self.uuid
with self.router(name='router1', tenant_id=self.tenant_id,
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected,
topic=self.topic)
def test_remove_metering_label_rpc_call(self):
expected = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected,
topic=self.topic)
expected['method'] = 'remove_metering_label'
self.mock_fanout.assert_called_with(self.ctx, expected,
topic=self.topic)
def test_remove_one_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected_add = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid},
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected_remove = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'remove_metering_label'}
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected_add,
topic=self.topic)
self.mock_fanout.assert_called_with(self.ctx, expected_remove,
topic=self.topic)
def test_update_metering_label_rules_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected_add = {'args':
{'routers': [
{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid},
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'egress',
'metering_label_id': self.uuid,
'excluded': False,
'id': second_uuid}],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'update_metering_label_rules'}
expected_del = {'args':
{'routers': [
{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid}],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'update_metering_label_rules'}
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True) as label:
l = label['metering_label']
with self.metering_label_rule(l['id']):
self.mock_uuid.return_value = second_uuid
with self.metering_label_rule(l['id'], direction='egress'):
self.mock_fanout.assert_called_with(self.ctx,
expected_add,
topic=self.topic)
self.mock_fanout.assert_called_with(self.ctx,
expected_del,
topic=self.topic)
class TestRoutePlugin(agentschedulers_db.L3AgentSchedulerDbMixin,
test_l3_plugin.TestL3NatPlugin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
class TestMeteringPluginL3AgentScheduler(
test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
resource_prefix_map = dict(
(k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING])
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin_str = ('neutron.tests.unit.services.metering.'
'test_metering_plugin.TestRoutePlugin')
test_config['plugin_name_v2'] = plugin_str
ext_mgr = MeteringTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(TestMeteringPluginL3AgentScheduler,
self).setUp(service_plugins=service_plugins)
self.uuid = '654f6b9d-0f36-4ae5-bd1b-01616794ca60'
uuid = 'neutron.openstack.common.uuidutils.generate_uuid'
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
cast = 'neutron.openstack.common.rpc.proxy.RpcProxy.cast'
self.cast_patch = mock.patch(cast)
self.mock_cast = self.cast_patch.start()
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
return_value=self.ctx)
self.mock_context = self.context_patch.start()
self.l3routers_patch = mock.patch(plugin_str +
'.get_l3_agents_hosting_routers')
self.l3routers_mock = self.l3routers_patch.start()
self.topic = 'metering_agent'
def tearDown(self):
self.uuid_patch.stop()
self.cast_patch.stop()
self.context_patch.stop()
self.l3routers_patch.stop()
del test_config['extension_manager']
del test_config['plugin_name_v2']
super(TestMeteringPluginL3AgentScheduler, self).tearDown()
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid},
{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': second_uuid}]},
'namespace': None,
'method': 'add_metering_label'}
agent_host = 'l3_agent_host'
agent = agents_db.Agent(host=agent_host)
self.l3routers_mock.return_value = [agent]
with self.router(name='router1', tenant_id=self.tenant_id,
set_context=True):
self.mock_uuid.return_value = second_uuid
with self.router(name='router2', tenant_id=self.tenant_id,
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
topic = "%s.%s" % (self.topic, agent_host)
self.mock_cast.assert_called_with(self.ctx,
expected,
topic=topic)