Rajaram/Gavri|notifications added for ip block and ip address creation and deletion

This commit is contained in:
Rajaram Mallya 2011-11-28 17:48:20 +05:30
parent 5f98fff5d1
commit e5247c8187
4 changed files with 320 additions and 179 deletions

View File

@ -24,71 +24,22 @@ from melange.common import messaging
from melange.common import utils
class NoopNotifier(object):
def __init__(self):
pass
def warn(self, msg):
pass
def info(self, msg):
pass
def error(self, msg):
pass
class LoggingNotifier(object):
def __init__(self):
self.logger = logging.getLogger('melange.notifier.logging_notifier')
def warn(self, msg):
self.logger.warn(msg)
def info(self, msg):
self.logger.info(msg)
def error(self, msg):
self.logger.error(msg)
class QueueNotifier(object):
def _send_message(self, message, priority):
topic = "%s.%s" % ("melange.notifier", priority)
with messaging.Queue(topic) as queue:
queue.put(message)
def warn(self, msg):
self._send_message(msg, "WARN")
def info(self, msg):
self._send_message(msg, "INFO")
def error(self, msg):
self._send_message(msg, "ERROR")
class Notifier(object):
STRATEGIES = {
"logging": LoggingNotifier,
"queue": QueueNotifier,
"noop": NoopNotifier,
}
def error(self, event_type, payload):
self._send_message("error", event_type, payload)
def __init__(self, notifier=None):
strategy = config.Config.get("notifier", "noop")
try:
self.strategy = self.STRATEGIES[strategy]()
except KeyError:
raise exception.InvalidNotifier(notifier=strategy)
def warn(self, event_type, payload):
self._send_message("warn", event_type, payload)
@staticmethod
def _generate_message(event_type, priority, payload):
def info(self, event_type, payload):
self._send_message("info", event_type, payload)
def _send_message(self, level, event_type, payload):
msg = self._generate_message(event_type, level, payload)
self.notify(level, msg)
def _generate_message(self, event_type, priority, payload):
return {
"message_id": str(utils.generate_uuid()),
"publisher_id": socket.gethostname(),
@ -98,14 +49,43 @@ class Notifier(object):
"timestamp": str(utils.utcnow()),
}
def warn(self, event_type, payload):
msg = self._generate_message(event_type, "WARN", payload)
self.strategy.warn(msg)
def notify(self, level, msg):
pass
def info(self, event_type, payload):
msg = self._generate_message(event_type, "INFO", payload)
self.strategy.info(msg)
def error(self, event_type, payload):
msg = self._generate_message(event_type, "ERROR", payload)
self.strategy.error(msg)
class NoopNotifier(Notifier):
def notify(self, level, msg):
pass
class LoggingNotifier(Notifier):
logger = logging.getLogger('melange.notifier.logging_notifier')
def notify(self, level, msg):
getattr(self.logger, level)(msg)
class QueueNotifier(Notifier):
def notify(self, level, msg):
topic = "%s.%s" % ("melange.notifier", level.upper())
with messaging.Queue(topic) as queue:
queue.put(msg)
def notifier():
STRATEGIES = {
"logging": LoggingNotifier,
"queue": QueueNotifier,
"noop": NoopNotifier,
}
strategy = config.Config.get("notifier", "noop")
try:
return STRATEGIES[strategy]()
except KeyError:
raise exception.InvalidNotifier(notifier=strategy)

View File

@ -21,13 +21,13 @@ import datetime
import logging
import netaddr
from melange import db
from melange import ipv6
from melange import ipv4
from melange.common import config
from melange.common import exception
from melange.common import notifier
from melange.common import utils
from melange.db import db_api
from melange.db import db_query
LOG = logging.getLogger('melange.ipam.models')
@ -38,13 +38,35 @@ class ModelBase(object):
_fields_for_type_conversion = {}
_auto_generated_attrs = ["id", "created_at", "updated_at"]
_data_fields = []
on_create_notification_fields = []
on_update_notification_fields = []
on_delete_notification_fields = []
@classmethod
def create(cls, **values):
values['id'] = utils.generate_uuid()
values['created_at'] = utils.utcnow()
instance = cls(**values)
return instance.save()
instance = cls(**values).save()
instance._notify_fields("create")
return instance
def _notify_fields(self, event):
fields = getattr(self, "on_%s_notification_fields" % event)
if not fields:
return
payload = self._notification_payload(fields)
event_with_model_name = event + " " + self.__class__.__name__
notifier.notifier().info(event_with_model_name, payload)
def _notification_payload(self, fields):
return dict((attr, getattr(self, attr)) for attr in fields)
def update(self, **values):
attrs = utils.exclude(values, *self._auto_generated_attrs)
self.merge_attributes(attrs)
result = self.save()
self._notify_fields("update")
return result
def save(self):
if not self.is_valid():
@ -52,10 +74,11 @@ class ModelBase(object):
self._convert_columns_to_proper_type()
self._before_save()
self['updated_at'] = utils.utcnow()
return db_api.save(self)
return db.db_api.save(self)
def delete(self):
db_api.delete(self)
db.db_api.delete(self)
self._notify_fields("delete")
def __init__(self, **kwargs):
self.merge_attributes(kwargs)
@ -126,7 +149,7 @@ class ModelBase(object):
@classmethod
def get_by(cls, **kwargs):
return db_api.find_by(cls, **cls._process_conditions(kwargs))
return db.db_api.find_by(cls, **cls._process_conditions(kwargs))
@classmethod
def _process_conditions(cls, raw_conditions):
@ -135,7 +158,7 @@ class ModelBase(object):
@classmethod
def find_all(cls, **kwargs):
return db_query.find_all(cls, **cls._process_conditions(kwargs))
return db.db_query.find_all(cls, **cls._process_conditions(kwargs))
@classmethod
def count(cls, **conditions):
@ -146,11 +169,6 @@ class ModelBase(object):
for k, v in values.iteritems():
self[k] = v
def update(self, **values):
attrs = utils.exclude(values, *self._auto_generated_attrs)
self.merge_attributes(attrs)
return self.save()
def __setitem__(self, key, value):
setattr(self, key, value)
@ -220,6 +238,8 @@ class IpBlock(ModelBase):
_data_fields = ['cidr', 'network_id', 'policy_id', 'tenant_id', 'gateway',
'parent_id', 'type', 'dns1', 'dns2', 'broadcast',
'netmask']
on_create_notification_fields = ['tenant_id', 'id', 'type', 'created_at']
on_delete_notification_fields = ['tenant_id', 'id', 'type', 'created_at']
@classmethod
def find_allocated_ip(cls, ip_block_id, tenant_id, **conditions):
@ -231,7 +251,7 @@ class IpBlock(ModelBase):
@classmethod
def delete_all_deallocated_ips(cls):
for block in db_api.find_all_blocks_with_deallocated_ips():
for block in db.db_api.find_all_blocks_with_deallocated_ips():
block.delete_deallocated_ips()
@property
@ -381,7 +401,7 @@ class IpBlock(ModelBase):
def delete_deallocated_ips(self):
self.update(is_full=False)
for ip in db_api.find_deallocated_ips(
for ip in db.db_api.find_deallocated_ips(
deallocated_by=self._deallocated_by_date(), ip_block_id=self.id):
ip.delete()
@ -450,7 +470,8 @@ class IpBlock(ModelBase):
def networked_top_level_blocks(self):
if not self.network_id:
return []
blocks = db_api.find_all_top_level_blocks_in_network(self.network_id)
blocks = db.db_api.find_all_top_level_blocks_in_network(
self.network_id)
return filter(lambda block: block != self and block != self.parent,
blocks)
@ -518,6 +539,12 @@ class IpBlock(ModelBase):
class IpAddress(ModelBase):
_data_fields = ['ip_block_id', 'address', 'version']
on_create_notification_fields = ['used_by_tenant_id', 'id', 'ip_block_id',
'used_by_device_id', 'created_at',
'address']
on_delete_notification_fields = ['used_by_tenant_id', 'id', 'ip_block_id',
'used_by_device_id', 'created_at',
'address']
def _validate(self):
self._validate_presence_of("used_by_tenant_id")
@ -537,13 +564,13 @@ class IpAddress(ModelBase):
@classmethod
def find_all_by_network(cls, network_id, **conditions):
return db_query.find_all_ips_in_network(cls,
return db.db_query.find_all_ips_in_network(cls,
network_id=network_id,
**conditions)
@classmethod
def find_all_allocated_ips(cls, **conditions):
return db_query.find_all_allocated_ips(cls, **conditions)
return db.db_query.find_all_allocated_ips(cls, **conditions)
def delete(self):
if self._explicitly_allowed_on_interfaces():
@ -556,8 +583,8 @@ class IpAddress(ModelBase):
super(IpAddress, self).delete()
def _explicitly_allowed_on_interfaces(self):
return db_query.find_allowed_ips(IpAddress,
ip_address_id=self.id).count() > 0
return db.db_query.find_allowed_ips(IpAddress,
ip_address_id=self.id).count() > 0
def _before_save(self):
self.address = self._formatted(self.address)
@ -567,7 +594,7 @@ class IpAddress(ModelBase):
return IpBlock.get(self.ip_block_id)
def add_inside_locals(self, ip_addresses):
db_api.save_nat_relationships([
db.db_api.save_nat_relationships([
{
'inside_global_address_id': self.id,
'inside_local_address_id': local_address.id,
@ -582,12 +609,12 @@ class IpAddress(ModelBase):
self.update(marked_for_deallocation=False, deallocated_at=None)
def inside_globals(self, **kwargs):
return db_query.find_inside_globals(IpAddress,
return db.db_query.find_inside_globals(IpAddress,
local_address_id=self.id,
**kwargs)
def add_inside_globals(self, ip_addresses):
db_api.save_nat_relationships([
db.db_api.save_nat_relationships([
{
'inside_global_address_id': global_address.id,
'inside_local_address_id': self.id,
@ -595,15 +622,15 @@ class IpAddress(ModelBase):
for global_address in ip_addresses])
def inside_locals(self, **kwargs):
return db_query.find_inside_locals(IpAddress,
global_address_id=self.id,
return db.db_query.find_inside_locals(IpAddress,
global_address_id=self.id,
**kwargs)
def remove_inside_globals(self, inside_global_address=None):
return db_api.remove_inside_globals(self.id, inside_global_address)
return db.db_api.remove_inside_globals(self.id, inside_global_address)
def remove_inside_locals(self, inside_local_address=None):
return db_api.remove_inside_locals(self.id, inside_local_address)
return db.db_api.remove_inside_locals(self.id, inside_local_address)
def locked(self):
return self.marked_for_deallocation
@ -620,6 +647,11 @@ class IpAddress(ModelBase):
def mac_address(self):
return MacAddress.get_by(interface_id=self.interface_id)
@property
def used_by_device_id(self):
if self.interface:
return self.interface.device_id
def data(self, **options):
data = super(IpAddress, self).data(**options)
iface = self.interface
@ -788,17 +820,17 @@ class Interface(ModelBase):
self.virtual_interface_id)
raise IpNotAllowedOnInterfaceError(err_msg)
db_api.save_allowed_ip(self.id, ip.id)
db.db_api.save_allowed_ip(self.id, ip.id)
def _ip_cannot_be_allowed(self, ip):
return (self.plugged_in_network_id() is None
or self.plugged_in_network_id() != ip.ip_block.network_id)
def disallow_ip(self, ip):
db_api.remove_allowed_ip(interface_id=self.id, ip_address_id=ip.id)
db.db_api.remove_allowed_ip(interface_id=self.id, ip_address_id=ip.id)
def ips_allowed(self):
explicitly_allowed = db_query.find_allowed_ips(
explicitly_allowed = db.db_query.find_allowed_ips(
IpAddress, allowed_on_interface_id=self.id)
allocated_ips = IpAddress.find_all_allocated_ips(interface_id=self.id)
return list(set(allocated_ips) | set(explicitly_allowed))

View File

@ -21,6 +21,7 @@ import netaddr
from melange import tests
from melange.common import exception
from melange.common import notifier
from melange.common import utils
from melange.db import db_query
from melange.ipam import models
@ -1021,6 +1022,32 @@ class TestIpBlock(tests.BaseTest):
self.assertModelsEqual(block1.ip_routes(), ip_routes)
def test_ip_block_creation_is_notified(self):
_setup_uuid(self.mock, "ip_block_uuid")
creation_time = datetime.datetime(2050, 1, 1)
mock_notifier = _setup_notifier(self.mock)
mock_notifier.info("create IpBlock", dict(tenant_id="tnt_id",
id="ip_block_uuid",
type="private",
created_at=creation_time))
self.mock.ReplayAll()
with unit.StubTime(time=creation_time):
factory_models.IpBlockFactory(tenant_id="tnt_id",
type="private",)
def test_ip_block_deletion_is_notified(self):
block = factory_models.IpBlockFactory(tenant_id="tnt_id",
type="private")
mock_notifier = _setup_notifier(self.mock)
mock_notifier.info("delete IpBlock", dict(tenant_id="tnt_id",
type=block.type,
id=block.id,
created_at=block.created_at))
self.mock.ReplayAll()
block.delete()
class TestIpAddress(tests.BaseTest):
@ -1312,6 +1339,46 @@ class TestIpAddress(tests.BaseTest):
self.assertEqual(ip.errors['used_by_tenant_id'],
["used_by_tenant_id should be present"])
def test_ip_addresss_creation_is_notified(self):
block = factory_models.IpBlockFactory(cidr="10.1.1.1/24")
_setup_uuid(self.mock, "ip_address_uuid")
creation_time = datetime.datetime(2050, 1, 1)
mock_notifier = _setup_notifier(self.mock)
mock_notifier.info("create IpAddress", dict(used_by_tenant_id="tnt_id",
id="ip_address_uuid",
address="10.1.1.1",
used_by_device_id="ins",
ip_block_id=block.id,
created_at=creation_time))
self.mock.ReplayAll()
with unit.StubTime(time=creation_time):
interface = factory_models.InterfaceFactory(device_id="ins")
factory_models.IpAddressFactory(used_by_tenant_id="tnt_id",
address="10.1.1.1",
ip_block_id=block.id,
interface_id=interface.id)
def test_ip_addresss_creation_is_notified(self):
block = factory_models.IpBlockFactory(cidr="10.1.1.1/24")
interface = factory_models.InterfaceFactory(device_id="ins")
ip = factory_models.IpAddressFactory(used_by_tenant_id="tnt_id",
address="10.1.1.1",
ip_block_id=block.id,
interface_id=interface.id,
)
mock_notifier = _setup_notifier(self.mock)
mock_notifier.info("delete IpAddress", dict(used_by_tenant_id="tnt_id",
id=ip.id,
address=ip.address,
used_by_device_id="ins",
ip_block_id=block.id,
created_at=ip.created_at))
self.mock.ReplayAll()
ip.delete()
class TestIpRoute(tests.BaseTest):
@ -2375,3 +2442,13 @@ class TestAllowedIp(tests.BaseTest):
def _allocate_ip(block, interface=None, **kwargs):
interface = interface or factory_models.InterfaceFactory()
return block.allocate_ip(interface=interface, **kwargs)
def _setup_notifier(mock):
mock.StubOutClassWithMocks(notifier, "NoopNotifier")
return notifier.NoopNotifier()
def _setup_uuid(mock, uuid):
mock.StubOutWithMock(utils, "generate_uuid")
utils.generate_uuid().MultipleTimes().AndReturn(uuid)

View File

@ -22,133 +22,185 @@ import socket
import mox
from melange import tests
from melange.tests import unit
from melange.common import exception
from melange.common import messaging
from melange.common import notifier
from melange.common import utils
from melange import db
from melange.ipam import models
from melange.tests import unit
class TestNotifier(tests.BaseTest):
class NotifierTestBase():
def test_raises_error_if_configured_with_invalid_notifer(self):
with unit.StubConfig(notifier="invalid_notifier"):
self.assertRaisesExcMessage(exception.InvalidNotifier,
("no such notifier invalid_notifier "
"exists"),
notifier.Notifier)
def test_warn_formats_msg_before_passing_on_to_relavent_notifier(self):
def _setup_expected_message(self, priority, event,
message):
self._setup_uuid_with("test_uuid")
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self._setup_expectation_on_noop_notifier_with("warn",
"test_event",
"test_message",
"test_uuid")
notifier.Notifier().warn("test_event", "test_message")
def test_info_formats_msg_before_passing_on_to_relavent_notifier(self):
self._setup_uuid_with("test_uuid")
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self._setup_expectation_on_noop_notifier_with("info",
"test_event",
"test_message",
"test_uuid")
notifier.Notifier().info("test_event", "test_message")
def test_error_formats_msg_before_passing_on_to_relavent_notifier(self):
self._setup_uuid_with("test_uuid")
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self._setup_expectation_on_noop_notifier_with("error",
"test_event",
"test_message",
"test_uuid")
notifier.Notifier().error("test_event", "test_message")
def _setup_expectation_on_noop_notifier_with(self, priority, event,
message, uuid):
self.mock.StubOutWithMock(notifier.NoopNotifier, priority)
priority_notifier_func = getattr(notifier.NoopNotifier, priority)
priority_notifier_func({
return {
'event_type': event,
'timestamp': str(utils.utcnow()),
'priority': priority.upper(),
'message_id': uuid,
'priority': priority,
'message_id': "test_uuid",
'payload': message,
'publisher_id': socket.gethostname(),
})
self.mock.ReplayAll()
}
def _setup_uuid_with(self, fake_uuid):
self.mock.StubOutWithMock(utils, "generate_uuid")
utils.generate_uuid().AndReturn(fake_uuid)
class TestLoggingNotifier(tests.BaseTest):
class TestLoggingNotifier(tests.BaseTest, NotifierTestBase):
def setUp(self):
super(TestLoggingNotifier, self).setUp()
with unit.StubConfig(notifier="logging"):
self.notifier = notifier.Notifier()
self.notifier = notifier.notifier()
self.logger = logging.getLogger('melange.notifier.logging_notifier')
def test_warn(self):
self.mock.StubOutWithMock(self.logger, "warn")
self.logger.warn(mox.IgnoreArg())
self.mock.ReplayAll()
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self.notifier.warn("test_event", "test_message")
self.mock.StubOutWithMock(self.logger, "warn")
self.logger.warn(self._setup_expected_message("warn",
"test_event",
"test_message"))
self.mock.ReplayAll()
self.notifier.warn("test_event", "test_message")
def test_info(self):
self.mock.StubOutWithMock(self.logger, "info")
self.logger.info(mox.IgnoreArg())
self.mock.ReplayAll()
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self.mock.StubOutWithMock(self.logger, "info")
self.logger.info(self._setup_expected_message("info",
"test_event",
"test_message"))
self.mock.ReplayAll()
self.notifier.info("test_event", "test_message")
self.notifier.info("test_event", "test_message")
def test_erorr(self):
self.mock.StubOutWithMock(self.logger, "error")
self.logger.error(mox.IgnoreArg())
self.mock.ReplayAll()
def test_error(self):
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self.mock.StubOutWithMock(self.logger, "error")
self.logger.error(self._setup_expected_message("error",
"test_event",
"test_message"))
self.mock.ReplayAll()
self.notifier.error("test_event", "test_message")
self.notifier.error("test_event", "test_message")
class TestQueueNotifier(tests.BaseTest):
class TestQueueNotifier(tests.BaseTest, NotifierTestBase):
def setUp(self):
super(TestQueueNotifier, self).setUp()
with unit.StubConfig(notifier="queue"):
self.notifier = notifier.notifier()
def _setup_queue_mock(self, level, event, msg):
self.mock_queue = self.mock.CreateMockAnything()
self.mock_queue.__enter__().AndReturn(self.mock_queue)
self.mock_queue.put(mox.IgnoreArg())
self.mock_queue.put(self._setup_expected_message(level, event, msg))
self.mock_queue.__exit__(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg())
with unit.StubConfig(notifier="queue"):
self.notifier = notifier.Notifier()
def test_warn(self):
self.mock.StubOutWithMock(messaging, "Queue")
messaging.Queue("melange.notifier.WARN").AndReturn(self.mock_queue)
self.mock.ReplayAll()
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self._setup_queue_mock("warn", "test_event", "test_message")
self.mock.StubOutWithMock(messaging, "Queue")
messaging.Queue("melange.notifier.WARN").AndReturn(self.mock_queue)
self.mock.ReplayAll()
self.notifier.warn("test_event", "test_message")
self.notifier.warn("test_event", "test_message")
def test_info(self):
self.mock.StubOutWithMock(messaging, "Queue")
messaging.Queue("melange.notifier.INFO").AndReturn(self.mock_queue)
self.mock.ReplayAll()
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self._setup_queue_mock("info", "test_event", "test_message")
self.mock.StubOutWithMock(messaging, "Queue")
messaging.Queue("melange.notifier.INFO").AndReturn(self.mock_queue)
self.mock.ReplayAll()
self.notifier.info("test_event", "test_message")
self.notifier.info("test_event", "test_message")
def test_error(self):
self.mock.StubOutWithMock(messaging, "Queue")
messaging.Queue("melange.notifier.ERROR").AndReturn(self.mock_queue)
with unit.StubTime(time=datetime.datetime(2050, 1, 1)):
self.mock.StubOutWithMock(messaging, "Queue")
self._setup_queue_mock("error", "test_event", "test_message")
messaging.Queue("melange.notifier.ERROR").AndReturn(
self.mock_queue)
self.mock.ReplayAll()
self.notifier.error("test_event", "test_message")
class TestModelNotification(tests.BaseTest):
class TestModel(models.ModelBase):
on_create_notification_fields = ['alt_id', 'name', 'desc']
on_update_notification_fields = ['alt_id', 'desc']
on_delete_notification_fields = ['alt_id', 'name']
def save(self):
return self
class TestNonNotifyingModel(models.ModelBase):
def save(self):
return self
def test_model_notifies_on_create(self):
mock_notifier = self._setup_default_notifier()
mock_notifier.info("create TestModel", dict(alt_id="model_id",
name="blah",
desc="blahblah"))
self.mock.ReplayAll()
self.notifier.error("test_event", "test_message")
self.TestModel.create(alt_id="model_id",
name="blah",
desc="blahblah")
def test_model_notifies_on_update(self):
m = self.TestModel.create(alt_id="model_id",
name='blah',
desc='blahblah')
mock_notifier = self._setup_default_notifier()
mock_notifier.info("update TestModel", dict(alt_id="model_id",
desc="new desc"))
self.mock.ReplayAll()
m.update(name="name", desc="new desc")
def test_model_notifies_on_delete(self):
m = self.TestModel.create(alt_id="model_id",
name='blah',
desc='blahblah')
self.mock.StubOutWithMock(db, "db_api")
mock_notifier = self._setup_default_notifier()
mock_notifier.info("delete TestModel", dict(alt_id="model_id",
name="blah"))
db.db_api.delete(m)
self.mock.ReplayAll()
m.delete()
def test_model_doesnt_notify_when_notification_fields_not_set(self):
self.info_called = False
class MockNotifier():
def info(*args, **kargs):
self.info_called = True
self.mock.StubOutClassWithMocks(notifier, "NoopNotifier")
self.mock.ReplayAll()
self.TestNonNotifyingModel.create()
self.assertFalse(self.info_called)
def _setup_default_notifier(self):
self.mock.StubOutClassWithMocks(notifier, "NoopNotifier")
return notifier.NoopNotifier()