diff --git a/.gitignore b/.gitignore index 7feedf87..f81252a5 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ vagrant-settings.yaml .solar_cli_uids .ssh/ +.cache diff --git a/solar/.cache/v/cache/lastfailed b/solar/.cache/v/cache/lastfailed new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/solar/.cache/v/cache/lastfailed @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/solar/solar/events/api.py b/solar/solar/events/api.py index 0cbdedc0..684f60b1 100644 --- a/solar/solar/events/api.py +++ b/solar/solar/events/api.py @@ -18,34 +18,36 @@ __all__ = ['add_dep', 'add_react'] import networkx as nx from solar.core.log import log -from solar.interfaces.db import get_db +from solar.interfaces import orm from solar.events.controls import Dep, React, StateChange -db = get_db() - - def create_event(event_dict): - etype = event_dict.pop('etype') + etype = event_dict['etype'] + kwargs = {'child': event_dict['child'], + 'parent': event_dict['parent'], + 'child_action': event_dict['child_action'], + 'parent_action': event_dict['parent_action'], + 'state': event_dict['state']} if etype == React.etype: - return React(**event_dict) + return React(**kwargs) elif etype == Dep.etype: - return Dep(**event_dict) + return Dep(**kwargs) else: raise Exception('No support for type %s', etype) def add_event(ev): - rst = all_events(ev.parent_node) + rst = all_events(ev.parent) for rev in rst: if ev == rev: break else: rst.append(ev) - db.create( - ev.parent_node, - [i.to_dict() for i in rst], - collection=db.COLLECTIONS.events) + resource_db = orm.DBResource.load(ev.parent) + event_db = orm.DBEvent(**ev.to_dict()) + event_db.save() + resource_db.events.add(event_db) def add_dep(parent, dep, actions, state='success'): @@ -59,36 +61,40 @@ def add_dep(parent, dep, actions, state='success'): def add_react(parent, dep, actions, state='success'): for act in actions: r = React(parent, act, state=state, - depend_node=dep, depend_action=act) + depend_node=dep, depend_action=act) add_event(r) log.debug('Added event: %s', r) +def add_events(resource, lst): + db_resource = orm.DBResource.load(resource) + for ev in lst: + event_db = orm.DBEvent(**ev.to_dict()) + event_db.save() + db_resource.events.add(event_db) + + def set_events(resource, lst): - db.create( - resource, - [i.to_dict() for i in lst], - collection=db.COLLECTIONS.events) + db_resource = orm.DBResource.load(resource) + for ev in db_resource.events.as_set(): + ev.delete() + for ev in lst: + event_db = orm.DBEvent(**ev.to_dict()) + event_db.save() + db_resource.events.add(event_db) def remove_event(ev): - rst = all_events(ev.parent_node) - set_events(ev.parent_node, [it for it in rst if not it == ev]) - - -def add_events(resource, lst): - rst = all_events(resource) - rst.extend(lst) - set_events(resource, rst) + event_db = orm.DBEvent(**ev.to_dict()) + event_db.delete() def all_events(resource): - events = db.get(resource, collection=db.COLLECTIONS.events, - return_empty=True) + events = orm.DBResource.load(resource).events.as_set() if not events: return [] - return [create_event(i) for i in events.properties] + return [create_event(i.to_dict()) for i in events] def bft_events_graph(start): @@ -105,41 +111,47 @@ def bft_events_graph(start): current_events = all_events(item) for ev in current_events: - dg.add_edge(ev.parent, ev.dependent, label=ev.state) + dg.add_edge(ev.parent_node, ev.child_node, label=ev.state) - if ev.depend_node in visited: + if ev.child in visited: continue # it is possible to have events leading to same resource but # different action - if ev.depend_node in stack: + if ev.child in stack: continue - stack.append(ev.depend_node) - visited.add(ev.parent_node) + stack.append(ev.child) + visited.add(ev.parent) return dg -def build_edges(changed_resources, changes_graph, events): +def build_edges(changes_graph, events): """ - :param changed_resources: list of resource names that were changed :param changes_graph: nx.DiGraph object with actions to be executed :param events: {res: [controls.Event objects]} """ - stack = changed_resources[:] - visited = [] + events_graph = nx.MultiDiGraph() + + for res_evts in events.values(): + for ev in res_evts: + events_graph.add_edge(ev.parent_node, ev.child_node, event=ev) + + stack = changes_graph.nodes() + visited = set() while stack: - node = stack.pop() + event_name = stack.pop(0) - if node in events: - log.debug('Events %s for resource %s', events[node], node) + if event_name in events_graph: + log.debug('Next events after %s are %s', event_name, events_graph.successors(event_name)) else: - log.debug('No dependencies based on %s', node) + log.debug('No outgoing events based on %s', event_name) - if node not in visited: - for ev in events.get(node, ()): - ev.insert(stack, changes_graph) + if event_name not in visited: + for parent, child, data in events_graph.edges(event_name, data=True): + succ_ev = data['event'] + succ_ev.insert(stack, changes_graph) - visited.append(node) + visited.add(event_name) return changes_graph diff --git a/solar/solar/events/controls.py b/solar/solar/events/controls.py index 786acffd..19093335 100644 --- a/solar/solar/events/controls.py +++ b/solar/solar/events/controls.py @@ -36,38 +36,41 @@ class Event(object): etype = None - def __init__(self, parent_node, parent_action, - state='', depend_node='', depend_action=''): - self.parent_node = parent_node + def __init__(self, parent, parent_action, + state='', child='', child_action=''): + self.parent = parent self.parent_action = parent_action self.state = state - self.depend_node = depend_node - self.depend_action = depend_action + self.child = child + self.child_action = child_action @property - def parent(self): - return '{}.{}'.format(self.parent_node, self.parent_action) + def parent_node(self): + return '{}.{}'.format(self.parent, self.parent_action) @property - def dependent(self): - return '{}.{}'.format(self.depend_node, self.depend_action) + def child_node(self): + return '{}.{}'.format(self.child, self.child_action) def to_dict(self): - rst = {'etype': self.etype} - rst.update(self.__dict__) - return rst + return {'etype': self.etype, + 'child': self.child, + 'parent': self.parent, + 'parent_action': self.parent_action, + 'child_action': self.child_action, + 'state': self.state} def __eq__(self, inst): if inst.__class__ != self.__class__: return False return all(( - self.parent == inst.parent, + self.parent_node == inst.parent_node, self.state == inst.state, - self.dependent == inst.dependent)) + self.child_node == inst.child_node)) def __repr__(self): return '{}: {} -> {} -> {}'.format( - self.etype, self.parent, self.state, self.dependent) + self.etype, self.parent_node, self.state, self.child_node) def __hash__(self): return hash(repr(self)) @@ -78,10 +81,10 @@ class Dependency(Event): etype = 'depends_on' def insert(self, changed_resources, changes_graph): - if (self.parent in changes_graph and - self.dependent in changes_graph): + if (self.parent_node in changes_graph and + self.child_node in changes_graph): changes_graph.add_edge( - self.parent, self.dependent, state=self.state) + self.parent_node, self.child_node, state=self.state) Dep = Dependency @@ -91,15 +94,16 @@ class React(Event): def insert(self, changed_resources, changes_graph): - if self.parent in changes_graph: - if self.dependent not in changes_graph: + if self.parent_node in changes_graph: + if self.child_node not in changes_graph: changes_graph.add_node( - self.dependent, status='PENDING', + self.child_node, status='PENDING', errmsg=None, type='solar_resource', - args=[self.depend_node, self.depend_action]) + args=[self.child, self.child_action]) - changes_graph.add_edge(self.parent, self.dependent, state=self.state) - changed_resources.append(self.depend_node) + changes_graph.add_edge( + self.parent_node, self.child_node, state=self.state) + changed_resources.append(self.child_node) class StateChange(Event): @@ -109,6 +113,6 @@ class StateChange(Event): def insert(self, changed_resources, changes_graph): changed_resources.append(self.parent) changes_graph.add_node( - self.parent, status='PENDING', + self.parent_node, status='PENDING', errmsg=None, type='solar_resource', - args=[self.parent_node, self.parent_action]) + args=[self.parent, self.parent_action]) diff --git a/solar/solar/interfaces/db/base.py b/solar/solar/interfaces/db/base.py index e6a9f9b1..d74fc262 100644 --- a/solar/solar/interfaces/db/base.py +++ b/solar/solar/interfaces/db/base.py @@ -132,7 +132,7 @@ class BaseGraphDB(object): DEFAULT_COLLECTION=COLLECTIONS.resource RELATION_TYPES = Enum( 'RelationTypes', - 'input_to_input resource_input plan_edge graph_to_node' + 'input_to_input resource_input plan_edge graph_to_node resource_event' ) DEFAULT_RELATION=RELATION_TYPES.resource_input @@ -172,6 +172,10 @@ class BaseGraphDB(object): def create(self, name, properties={}, collection=DEFAULT_COLLECTION): """Create element (node) with given name, args, of type `collection`.""" + @abc.abstractmethod + def delete(self, name, collection=DEFAULT_COLLECTION): + """Delete element with given name. of type `collection`.""" + @abc.abstractmethod def create_relation(self, source, diff --git a/solar/solar/interfaces/db/redis_graph_db.py b/solar/solar/interfaces/db/redis_graph_db.py index db631f2c..50aadf82 100644 --- a/solar/solar/interfaces/db/redis_graph_db.py +++ b/solar/solar/interfaces/db/redis_graph_db.py @@ -38,6 +38,9 @@ class RedisGraphDB(BaseGraphDB): elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_input.name: source_collection = BaseGraphDB.COLLECTIONS.resource dest_collection = BaseGraphDB.COLLECTIONS.input + elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_event.name: + source_collection = BaseGraphDB.COLLECTIONS.resource + dest_collection = BaseGraphDB.COLLECTIONS.events source = self.get(relation_db['source'], collection=source_collection) dest = self.get(relation_db['dest'], collection=dest_collection) @@ -152,6 +155,11 @@ class RedisGraphDB(BaseGraphDB): except TypeError: raise KeyError + def delete(self, name, collection=BaseGraphDB.DEFAULT_COLLECTION): + keys = self._r.keys(self._make_collection_key(collection, name)) + if keys: + self._r.delete(*keys) + def get_or_create(self, name, properties={}, diff --git a/solar/solar/interfaces/orm.py b/solar/solar/interfaces/orm.py index 8774ac1a..4f3e6a51 100644 --- a/solar/solar/interfaces/orm.py +++ b/solar/solar/interfaces/orm.py @@ -382,6 +382,12 @@ class DBObject(object): collection=self._collection ) + def delete(self): + db.delete( + self._db_key, + collection=self._collection + ) + class DBResourceInput(DBObject): __metaclass__ = DBObjectMeta @@ -483,6 +489,29 @@ class DBResourceInput(DBObject): return self.parse_backtracked_value(self.backtrack_value_emitter()) +class DBEvent(DBObject): + + __metaclass__ = DBObjectMeta + + _collection = base.BaseGraphDB.COLLECTIONS.events + + id = db_field(is_primary=True) + parent = db_field(schema='str!') + parent_action = db_field(schema='str!') + etype = db_field('str!') + state = db_field('str') + child = db_field('str') + child_action = db_field('str') + + def delete(self): + db.delete_relations( + dest=self._db_node, + type_=base.BaseGraphDB.RELATION_TYPES.resource_event + ) + super(DBEvent, self).delete() + + + class DBResource(DBObject): __metaclass__ = DBObjectMeta @@ -501,6 +530,8 @@ class DBResource(DBObject): inputs = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_input, DBResourceInput) + events = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_event, + DBEvent) def add_input(self, name, schema, value): # NOTE: Inputs need to have uuid added because there can be many @@ -516,6 +547,18 @@ class DBResource(DBObject): self.inputs.add(input) + def add_event(self, action, state, etype, child, child_action): + event = DBEvent( + parent=self.name, + parent_action=action, + state=state, + etype=etype, + child=child, + child_action=child_action + ) + event.save() + self.events.add(event) + # TODO: remove this if __name__ == '__main__': diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py index 4ba0c0b2..580c29d1 100644 --- a/solar/solar/system_log/change.py +++ b/solar/solar/system_log/change.py @@ -90,7 +90,7 @@ def send_to_orchestration(): state_change = evapi.StateChange(res_uid, action) state_change.insert(changed_nodes, dg) - evapi.build_edges(changed_nodes, dg, events) + evapi.build_edges(dg, events) # what it should be? dg.graph['name'] = 'system_log' diff --git a/solar/solar/test/test_events.py b/solar/solar/test/test_events.py index 25b6e083..48a4843d 100644 --- a/solar/solar/test/test_events.py +++ b/solar/solar/test/test_events.py @@ -17,6 +17,7 @@ import networkx as nx from pytest import fixture from solar.events import api as evapi +from solar.interfaces import orm from .base import BaseResourceTest @@ -31,19 +32,25 @@ def events_example(): def test_add_events(events_example): + r = orm.DBResource(id='e1', name='e1', base_path='x') + r.save() + evapi.add_events('e1', events_example) assert set(evapi.all_events('e1')) == set(events_example) def test_set_events(events_example): + r = orm.DBResource(id='e1', name='e1', base_path='x') + r.save() partial = events_example[:2] evapi.add_events('e1', events_example[:2]) evapi.set_events('e1', events_example[2:]) - assert evapi.all_events('e1') == events_example[2:] def test_remove_events(events_example): + r = orm.DBResource(id='e1', name='e1', base_path='x') + r.save() to_be_removed = events_example[2] evapi.add_events('e1', events_example) evapi.remove_event(to_be_removed) @@ -51,6 +58,8 @@ def test_remove_events(events_example): def test_single_event(events_example): + r = orm.DBResource(id='e1', name='e1', base_path='x') + r.save() evapi.add_events('e1', events_example[:2]) evapi.add_event(events_example[2]) assert set(evapi.all_events('e1')) == set(events_example) @@ -67,11 +76,10 @@ def nova_deps(): def test_nova_api_run_after_nova(nova_deps): - changed = ['nova', 'nova_api'] changes_graph = nx.DiGraph() changes_graph.add_node('nova.run') changes_graph.add_node('nova_api.run') - evapi.build_edges(changed, changes_graph, nova_deps) + evapi.build_edges(changes_graph, nova_deps) assert changes_graph.successors('nova.run') == ['nova_api.run'] @@ -80,10 +88,9 @@ def test_nova_api_react_on_update(nova_deps): """Test that nova_api:update will be called even if there is no changes in nova_api """ - changed = ['nova'] changes_graph = nx.DiGraph() changes_graph.add_node('nova.update') - evapi.build_edges(changed, changes_graph, nova_deps) + evapi.build_edges(changes_graph, nova_deps) assert changes_graph.successors('nova.update') == ['nova_api.update'] @@ -106,7 +113,6 @@ def rmq_deps(): def test_rmq(rmq_deps): - changed = ['rmq.1', 'rmq.2', 'rmq.3', 'rmq_cluster.1', 'rmq_cluster.2', 'rmq_cluster.3'] changes_graph = nx.DiGraph() changes_graph.add_node('rmq.1.run') changes_graph.add_node('rmq.2.run') @@ -114,7 +120,7 @@ def test_rmq(rmq_deps): changes_graph.add_node('rmq_cluster.1.create') changes_graph.add_node('rmq_cluster.2.join') changes_graph.add_node('rmq_cluster.3.join') - evapi.build_edges(changed, changes_graph, rmq_deps) + evapi.build_edges(changes_graph, rmq_deps) assert set(changes_graph.successors('rmq_cluster.1.create')) == { 'rmq_cluster.2.join', 'rmq_cluster.3.join'} @@ -123,15 +129,19 @@ def test_rmq(rmq_deps): def test_riak(): events = { - 'riak_service1': [evapi.React('riak_service1', 'run', 'success', 'riak_service2', 'join'), - evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'join')], - 'riak_service3': [evapi.React('riak_service3', 'join', 'success', 'riak_service1', 'commit')], - 'riak_service2': [evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')], + 'riak_service1': [ + evapi.React('riak_service1', 'run', 'success', 'riak_service2', 'run'), + evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'run')], + 'riak_service3': [ + evapi.React('riak_service3', 'join', 'success', 'riak_service1', 'commit'), + evapi.React('riak_service3', 'run', 'success', 'riak_service3', 'join')], + 'riak_service2': [ + evapi.React('riak_service2', 'run', 'success', 'riak_service2', 'join'), + evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')], } - changed = ['riak_service1'] - changes_graph = nx.DiGraph() + + changes_graph = nx.MultiDiGraph() changes_graph.add_node('riak_service1.run') - evapi.build_edges(changed, changes_graph, events) - assert nx.topological_sort(changes_graph) == [ - 'riak_service1.run', 'riak_service2.join', 'riak_service3.join', 'riak_service1.commit'] + evapi.build_edges(changes_graph, events) + assert set(changes_graph.predecessors('riak_service1.commit')) == {'riak_service2.join', 'riak_service3.join'} diff --git a/solar/solar/test/test_orm.py b/solar/solar/test/test_orm.py index a3824c99..43ad6a20 100644 --- a/solar/solar/test/test_orm.py +++ b/solar/solar/test/test_orm.py @@ -226,7 +226,6 @@ class TestResourceORM(BaseResourceTest): r.save() r.add_input('ip', 'str!', '10.0.0.2') - self.assertEqual(len(r.inputs.as_set()), 1) @@ -421,3 +420,63 @@ input: signals.disconnect(sample2, sample_dict_list) self.assertEqual(vi.backtrack_value_emitter(), [{'a': sample1.resource_inputs()['value']}]) + + +class TestEventORM(BaseResourceTest): + + def test_return_emtpy_set(self): + r = orm.DBResource(id='test1', name='test1', base_path='x') + r.save() + self.assertEqual(r.events.as_set(), set()) + + def test_save_and_load_by_parent(self): + ev = orm.DBEvent( + parent='n1', + parent_action='run', + state='success', + child_action='run', + child='n2', + etype='dependency') + ev.save() + + rst = orm.DBEvent.load(ev.id) + self.assertEqual(rst, ev) + + def test_save_several(self): + ev = orm.DBEvent( + parent='n1', + parent_action='run', + state='success', + child_action='run', + child='n2', + etype='dependency') + ev.save() + ev1 = orm.DBEvent( + parent='n1', + parent_action='run', + state='success', + child_action='run', + child='n3', + etype='dependency') + ev1.save() + self.assertEqual(len(orm.DBEvent.load_all()), 2) + + def test_removal_of_event(self): + r = orm.DBResource(id='n1', name='n1', base_path='x') + r.save() + + ev = orm.DBEvent( + parent='n1', + parent_action='run', + state='success', + child_action='run', + child='n2', + etype='dependency') + ev.save() + r.events.add(ev) + + self.assertEqual(r.events.as_set(), {ev}) + ev.delete() + + r = orm.DBResource.load('n1') + self.assertEqual(r.events.as_set(), set())