Merge pull request #201 from dshulyak/graph_of_inputs
Support for revert of create and remove actions
This commit is contained in:
commit
6e65702d4f
@ -9,8 +9,8 @@ Then you can continue with standard solar things:
|
||||
```
|
||||
solar changes stage -d
|
||||
solar changes process
|
||||
solar changes run-once last
|
||||
watch -n 1 solar changes report last
|
||||
solar or run-once last
|
||||
watch -n 1 solar or report last
|
||||
```
|
||||
|
||||
Wait until all actions have state `SUCCESS`,
|
||||
@ -21,3 +21,60 @@ after that check `/etc/hosts` files on both nodes, it will contain entries like:
|
||||
10.0.0.4 second1441705178.0
|
||||
```
|
||||
|
||||
If you want to try out revert functionality - you can do it in a next way:
|
||||
|
||||
After you created all the stuff, print history like this:
|
||||
|
||||
`solar ch history`
|
||||
|
||||
Output:
|
||||
|
||||
```
|
||||
log task=hosts_file1.run uid=282fe919-6059-4100-affc-56a2b3992d9d
|
||||
log task=hosts_file2.run uid=774f5a49-00f1-4bae-8a77-90d1b2d54164
|
||||
log task=node1.run uid=2559f22c-5aa9-4c05-91c6-b70884190a56
|
||||
log task=node2.run uid=18f06abe-3e8d-4356-b172-128e1dded0e6
|
||||
```
|
||||
|
||||
Now you can try to revert creation of hosts_file1
|
||||
|
||||
```
|
||||
solar ch revert 282fe919-6059-4100-affc-56a2b3992d9d
|
||||
solar ch stage
|
||||
log task=hosts_file1.remove uid=1fe456c1-a847-4902-88bf-b7f2c5687d40
|
||||
solar ch process
|
||||
solar or run-once last
|
||||
watch -n 1 solar or report last
|
||||
```
|
||||
|
||||
For now this file will be simply cleaned (more cophisticated task can be added later).
|
||||
And you can create revert of your revert, which will lead to created hosts_file1
|
||||
resource and /etc/hosts with appropriate content
|
||||
|
||||
```
|
||||
solar ch revert 282fe919-6059-4100-affc-56a2b3992d9d
|
||||
solar ch stage
|
||||
log task=hosts_file1.remove uid=1fe456c1-a847-4902-88bf-b7f2c5687d40
|
||||
solar ch process
|
||||
solar changes run-once last
|
||||
watch -n 1 solar changes report last
|
||||
```
|
||||
|
||||
After this you can revert your result of your previous revert, which will
|
||||
create this file with relevant content.
|
||||
|
||||
```
|
||||
solar ch history -n 1
|
||||
log task=hosts_file1.remove uid=1fe456c1-a847-4902-88bf-b7f2c5687d40
|
||||
solar ch revert 1fe456c1-a847-4902-88bf-b7f2c5687d40
|
||||
solar ch stage
|
||||
log task=hosts_file1.run uid=493326b2-989f-4b94-a22c-0bbd0fc5e755
|
||||
solar ch process
|
||||
solar changes run-once last
|
||||
watch -n 1 solar changes report last
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
5
resources/hosts_file/actions/remove.yaml
Normal file
5
resources/hosts_file/actions/remove.yaml
Normal file
@ -0,0 +1,5 @@
|
||||
- hosts: [{{host}}]
|
||||
sudo: yes
|
||||
tasks:
|
||||
- name: Remove hosts file
|
||||
shell: echo '# flushed by ansible' > /etc/hosts
|
@ -222,6 +222,7 @@ def get_inputs(path):
|
||||
|
||||
@resource.command()
|
||||
@click.argument('name')
|
||||
def remove(name):
|
||||
@click.option('-f', default=False, help='force removal from database')
|
||||
def remove(name, f):
|
||||
res = sresource.load(name)
|
||||
res.delete()
|
||||
res.remove(force=f)
|
||||
|
@ -16,6 +16,7 @@ import sys
|
||||
|
||||
import click
|
||||
|
||||
from solar import errors
|
||||
from solar.core import testing
|
||||
from solar.core import resource
|
||||
from solar.system_log import change
|
||||
@ -96,8 +97,10 @@ def history(n, d, s):
|
||||
@changes.command()
|
||||
@click.argument('uid')
|
||||
def revert(uid):
|
||||
change.revert(uid)
|
||||
|
||||
try:
|
||||
change.revert(uid)
|
||||
except errors.SolarError as er:
|
||||
raise click.BadParameter(str(er))
|
||||
|
||||
@changes.command()
|
||||
@click.option('--name', default=None)
|
||||
|
@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from copy import deepcopy
|
||||
from multipledispatch import dispatch
|
||||
import os
|
||||
@ -40,6 +42,9 @@ def read_meta(base_path):
|
||||
return metadata
|
||||
|
||||
|
||||
RESOURCE_STATE = Enum('ResourceState', 'created operational removed error updated')
|
||||
|
||||
|
||||
class Resource(object):
|
||||
_metadata = {}
|
||||
|
||||
@ -53,6 +58,7 @@ class Resource(object):
|
||||
else:
|
||||
metadata = deepcopy(self._metadata)
|
||||
|
||||
self.base_path = base_path
|
||||
self.tags = tags or []
|
||||
self.virtual_resource = virtual_resource
|
||||
|
||||
@ -72,7 +78,7 @@ class Resource(object):
|
||||
'meta_inputs': inputs
|
||||
|
||||
})
|
||||
|
||||
self.db_obj.state = RESOURCE_STATE.created.name
|
||||
self.db_obj.save()
|
||||
|
||||
self.create_inputs(args)
|
||||
@ -82,6 +88,7 @@ class Resource(object):
|
||||
def __init__(self, resource_db):
|
||||
self.db_obj = resource_db
|
||||
self.name = resource_db.name
|
||||
self.base_path = resource_db.base_path
|
||||
# TODO: tags
|
||||
self.tags = []
|
||||
self.virtual_resource = None
|
||||
@ -139,6 +146,7 @@ class Resource(object):
|
||||
def update(self, args):
|
||||
# TODO: disconnect input when it is updated and end_node
|
||||
# for some input_to_input relation
|
||||
self.db_obj.state = RESOURCE_STATE.updated.name
|
||||
resource_inputs = self.resource_inputs()
|
||||
|
||||
for k, v in args.items():
|
||||
@ -149,6 +157,44 @@ class Resource(object):
|
||||
def delete(self):
|
||||
return self.db_obj.delete()
|
||||
|
||||
def remove(self, force=False):
|
||||
if force:
|
||||
self.delete()
|
||||
else:
|
||||
self.db_obj.state = RESOURCE_STATE.removed.name
|
||||
self.db_obj.save()
|
||||
|
||||
def set_operational(self):
|
||||
self.db_obj.state = RESOURCE_STATE.operational.name
|
||||
self.db_obj.save()
|
||||
|
||||
def set_error(self):
|
||||
self.db_obj.state = RESOURCE_STATE.error.name
|
||||
self.db_obj.save()
|
||||
|
||||
def to_be_removed(self):
|
||||
return self.db_obj.state == RESOURCE_STATE.removed.name
|
||||
|
||||
@property
|
||||
def connections(self):
|
||||
"""
|
||||
Gives you all incoming/outgoing connections for current resource,
|
||||
stored as:
|
||||
[(emitter, emitter_input, receiver, receiver_input), ...]
|
||||
"""
|
||||
rst = []
|
||||
for emitter, receiver, meta in self.db_obj.graph().edges(data=True):
|
||||
if meta:
|
||||
receiver_input = '{}:{}|{}'.format(receiver.name,
|
||||
meta['destination_key'], meta['tag'])
|
||||
else:
|
||||
receiver_input = receiver.name
|
||||
|
||||
rst.append(
|
||||
[emitter.resource.name, emitter.name,
|
||||
receiver.resource.name, receiver_input])
|
||||
return rst
|
||||
|
||||
def resource_inputs(self):
|
||||
return {
|
||||
i.name: i for i in self.db_obj.inputs.as_set()
|
||||
@ -179,6 +225,9 @@ class Resource(object):
|
||||
**self.to_dict()
|
||||
)
|
||||
|
||||
def load_commited(self):
|
||||
return orm.DBCommitedState.get_or_create(self.name)
|
||||
|
||||
|
||||
def load(name):
|
||||
r = orm.DBResource.load(name)
|
||||
|
@ -85,7 +85,12 @@ def location_and_transports(emitter, receiver, orig_mapping):
|
||||
inps_receiver = receiver.args
|
||||
# XXX: should be somehow parametrized (input attribute?)
|
||||
for single in ('transports_id', 'location_id'):
|
||||
_single(single, inps_emitter[single], inps_receiver[single])
|
||||
if single in inps_emitter and inps_receiver:
|
||||
_single(single, inps_emitter[single], inps_receiver[single])
|
||||
else:
|
||||
log.warning('Unable to create connection for %s with'
|
||||
' emitter %s, receiver %s',
|
||||
single, emitter.name, receiver.name)
|
||||
return
|
||||
|
||||
|
||||
|
@ -44,10 +44,10 @@ def add_event(ev):
|
||||
break
|
||||
else:
|
||||
rst.append(ev)
|
||||
resource_db = orm.DBResource.load(ev.parent)
|
||||
resource_events = orm.DBResourceEvents.get_or_create(ev.parent)
|
||||
event_db = orm.DBEvent(**ev.to_dict())
|
||||
event_db.save()
|
||||
resource_db.events.add(event_db)
|
||||
resource_events.events.add(event_db)
|
||||
|
||||
|
||||
def add_dep(parent, dep, actions, state='success'):
|
||||
@ -67,21 +67,21 @@ def add_react(parent, dep, actions, state='success'):
|
||||
|
||||
|
||||
def add_events(resource, lst):
|
||||
db_resource = orm.DBResource.load(resource)
|
||||
resource_events = orm.DBResourceEvents.get_or_create(resource)
|
||||
for ev in lst:
|
||||
event_db = orm.DBEvent(**ev.to_dict())
|
||||
event_db.save()
|
||||
db_resource.events.add(event_db)
|
||||
resource_events.events.add(event_db)
|
||||
|
||||
|
||||
def set_events(resource, lst):
|
||||
db_resource = orm.DBResource.load(resource)
|
||||
for ev in db_resource.events.as_set():
|
||||
resource_events = orm.DBResourceEvents.get_or_create(resource)
|
||||
for ev in resource_events.events.as_set():
|
||||
ev.delete()
|
||||
for ev in lst:
|
||||
event_db = orm.DBEvent(**ev.to_dict())
|
||||
event_db.save()
|
||||
db_resource.events.add(event_db)
|
||||
resource_events.events.add(event_db)
|
||||
|
||||
|
||||
def remove_event(ev):
|
||||
@ -90,7 +90,7 @@ def remove_event(ev):
|
||||
|
||||
|
||||
def all_events(resource):
|
||||
events = orm.DBResource.load(resource).events.as_set()
|
||||
events = orm.DBResourceEvents.get_or_create(resource).events.as_set()
|
||||
|
||||
if not events:
|
||||
return []
|
||||
|
@ -127,12 +127,12 @@ class BaseGraphDB(object):
|
||||
|
||||
COLLECTIONS = Enum(
|
||||
'Collections',
|
||||
'input resource state_data state_log plan_node plan_graph events stage_log commit_log'
|
||||
'input resource state_data state_log plan_node plan_graph events stage_log commit_log resource_events'
|
||||
)
|
||||
DEFAULT_COLLECTION=COLLECTIONS.resource
|
||||
RELATION_TYPES = Enum(
|
||||
'RelationTypes',
|
||||
'input_to_input resource_input plan_edge graph_to_node resource_event'
|
||||
'input_to_input resource_input plan_edge graph_to_node resource_event commited'
|
||||
)
|
||||
DEFAULT_RELATION=RELATION_TYPES.resource_input
|
||||
|
||||
|
@ -39,7 +39,7 @@ class RedisGraphDB(BaseGraphDB):
|
||||
source_collection = BaseGraphDB.COLLECTIONS.resource
|
||||
dest_collection = BaseGraphDB.COLLECTIONS.input
|
||||
elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_event.name:
|
||||
source_collection = BaseGraphDB.COLLECTIONS.resource
|
||||
source_collection = BaseGraphDB.COLLECTIONS.resource_events
|
||||
dest_collection = BaseGraphDB.COLLECTIONS.events
|
||||
|
||||
source = self.get(relation_db['source'], collection=source_collection)
|
||||
@ -146,7 +146,6 @@ class RedisGraphDB(BaseGraphDB):
|
||||
def get(self, name, collection=BaseGraphDB.DEFAULT_COLLECTION,
|
||||
return_empty=False):
|
||||
"""Fetch element with given name and collection type."""
|
||||
|
||||
try:
|
||||
collection_key = self._make_collection_key(collection, name)
|
||||
item = self._r.get(collection_key)
|
||||
|
@ -181,6 +181,18 @@ class DBRelatedField(object):
|
||||
|
||||
return ret
|
||||
|
||||
def as_list(self):
|
||||
relations = self.all()
|
||||
|
||||
ret = []
|
||||
|
||||
for rel in relations:
|
||||
ret.append(
|
||||
self.destination_db_class(**rel.end_node.properties)
|
||||
)
|
||||
|
||||
return ret
|
||||
|
||||
def sources(self, destination_db_object):
|
||||
"""
|
||||
Reverse of self.as_set, i.e. for given destination_db_object,
|
||||
@ -424,6 +436,20 @@ class DBResourceInput(DBObject):
|
||||
)
|
||||
super(DBResourceInput, self).delete()
|
||||
|
||||
def edges(self):
|
||||
|
||||
out = db.get_relations(
|
||||
source=self._db_node,
|
||||
type_=base.BaseGraphDB.RELATION_TYPES.input_to_input)
|
||||
incoming = db.get_relations(
|
||||
dest=self._db_node,
|
||||
type_=base.BaseGraphDB.RELATION_TYPES.input_to_input)
|
||||
for relation in out + incoming:
|
||||
meta = relation.properties
|
||||
source = DBResourceInput(**relation.start_node.properties)
|
||||
dest = DBResourceInput(**relation.end_node.properties)
|
||||
yield source, dest, meta
|
||||
|
||||
def check_other_val(self, other_val=None):
|
||||
if not other_val:
|
||||
return self
|
||||
@ -434,7 +460,6 @@ class DBResourceInput(DBObject):
|
||||
correct_input = inps[other_val]
|
||||
return correct_input.backtrack_value()
|
||||
|
||||
|
||||
def backtrack_value_emitter(self, level=None, other_val=None):
|
||||
# TODO: this is actually just fetching head element in linked list
|
||||
# so this whole algorithm can be moved to the db backend probably
|
||||
@ -543,6 +568,46 @@ class DBEvent(DBObject):
|
||||
super(DBEvent, self).delete()
|
||||
|
||||
|
||||
class DBResourceEvents(DBObject):
|
||||
|
||||
__metaclass__ = DBObjectMeta
|
||||
|
||||
_collection = base.BaseGraphDB.COLLECTIONS.resource_events
|
||||
|
||||
id = db_field(schema='str!', is_primary=True)
|
||||
events = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_event,
|
||||
DBEvent)
|
||||
|
||||
@classmethod
|
||||
def get_or_create(cls, name):
|
||||
r = db.get_or_create(
|
||||
name,
|
||||
properties={'id': name},
|
||||
collection=cls._collection)
|
||||
return cls(**r.properties)
|
||||
|
||||
|
||||
class DBCommitedState(DBObject):
|
||||
|
||||
__metaclass__ = DBObjectMeta
|
||||
|
||||
_collection = base.BaseGraphDB.COLLECTIONS.state_data
|
||||
|
||||
id = db_field(schema='str!', is_primary=True)
|
||||
inputs = db_field(schema={}, default_value={})
|
||||
connections = db_field(schema=[], default_value=[])
|
||||
base_path = db_field(schema='str')
|
||||
tags = db_field(schema=[], default_value=[])
|
||||
state = db_field(schema='str', default_value='removed')
|
||||
|
||||
@classmethod
|
||||
def get_or_create(cls, name):
|
||||
r = db.get_or_create(
|
||||
name,
|
||||
properties={'id': name},
|
||||
collection=cls._collection)
|
||||
return cls(**r.properties)
|
||||
|
||||
|
||||
class DBResource(DBObject):
|
||||
__metaclass__ = DBObjectMeta
|
||||
@ -559,11 +624,10 @@ class DBResource(DBObject):
|
||||
version = db_field(schema='str')
|
||||
tags = db_field(schema=[], default_value=[])
|
||||
meta_inputs = db_field(schema={}, default_value={})
|
||||
state = db_field(schema='str')
|
||||
|
||||
inputs = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_input,
|
||||
DBResourceInput)
|
||||
events = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_event,
|
||||
DBEvent)
|
||||
|
||||
def add_input(self, name, schema, value):
|
||||
# NOTE: Inputs need to have uuid added because there can be many
|
||||
@ -597,6 +661,12 @@ class DBResource(DBObject):
|
||||
input.delete()
|
||||
super(DBResource, self).delete()
|
||||
|
||||
def graph(self):
|
||||
mdg = networkx.MultiDiGraph()
|
||||
for input in self.inputs.as_list():
|
||||
mdg.add_edges_from(input.edges())
|
||||
return mdg
|
||||
|
||||
|
||||
# TODO: remove this
|
||||
if __name__ == '__main__':
|
||||
|
@ -23,6 +23,10 @@ from solar.interfaces.db import get_db
|
||||
from solar.system_log import data
|
||||
from solar.orchestration import graph
|
||||
from solar.events import api as evapi
|
||||
from solar.interfaces import orm
|
||||
from .consts import CHANGES
|
||||
from solar.core.resource.resource import RESOURCE_STATE
|
||||
from solar.errors import CannotFindID
|
||||
|
||||
db = get_db()
|
||||
|
||||
@ -31,73 +35,88 @@ def guess_action(from_, to):
|
||||
# NOTE(dshulyak) imo the way to solve this - is dsl for orchestration,
|
||||
# something where this action will be excplicitly specified
|
||||
if not from_:
|
||||
return 'run'
|
||||
return CHANGES.run.name
|
||||
elif not to:
|
||||
return 'remove'
|
||||
return CHANGES.remove.name
|
||||
else:
|
||||
return 'update'
|
||||
return CHANGES.update.name
|
||||
|
||||
|
||||
def create_diff(staged, commited):
|
||||
return list(dictdiffer.diff(commited, staged))
|
||||
|
||||
|
||||
def create_logitem(resource, action, diffed):
|
||||
def create_logitem(resource, action, diffed, connections_diffed,
|
||||
base_path=None):
|
||||
return data.LogItem(
|
||||
utils.generate_uuid(),
|
||||
resource,
|
||||
'{}.{}'.format(resource, action),
|
||||
diffed)
|
||||
action,
|
||||
diffed,
|
||||
connections_diffed,
|
||||
base_path=base_path)
|
||||
|
||||
|
||||
def _stage_changes(staged_resources, commited_resources, staged_log):
|
||||
def create_sorted_diff(staged, commited):
|
||||
staged.sort()
|
||||
commited.sort()
|
||||
return create_diff(staged, commited)
|
||||
|
||||
union = set(staged_resources.keys()) | set(commited_resources.keys())
|
||||
for res_uid in union:
|
||||
commited_data = commited_resources.get(res_uid, {})
|
||||
staged_data = staged_resources.get(res_uid, {})
|
||||
|
||||
df = create_diff(staged_data, commited_data)
|
||||
|
||||
if df:
|
||||
action = guess_action(commited_data, staged_data)
|
||||
log_item = create_logitem(res_uid, action, df)
|
||||
staged_log.append(log_item)
|
||||
return staged_log
|
||||
|
||||
|
||||
def stage_changes():
|
||||
log = data.SL()
|
||||
log.clean()
|
||||
staged = {r.name: r.args for r in resource.load_all()}
|
||||
commited = data.CD()
|
||||
return _stage_changes(staged, commited, log)
|
||||
|
||||
for resouce_obj in resource.load_all():
|
||||
commited = resouce_obj.load_commited()
|
||||
base_path = resouce_obj.base_path
|
||||
if resouce_obj.to_be_removed():
|
||||
resource_args = {}
|
||||
resource_connections = []
|
||||
else:
|
||||
resource_args = resouce_obj.args
|
||||
resource_connections = resouce_obj.connections
|
||||
|
||||
if commited.state == RESOURCE_STATE.removed.name:
|
||||
commited_args = {}
|
||||
commited_connections = []
|
||||
else:
|
||||
commited_args = commited.inputs
|
||||
commited_connections = commited.connections
|
||||
|
||||
inputs_diff = create_diff(resource_args, commited_args)
|
||||
connections_diff = create_sorted_diff(
|
||||
resource_connections, commited_connections)
|
||||
|
||||
# if new connection created it will be reflected in inputs
|
||||
# but using inputs to reverse connections is not possible
|
||||
if inputs_diff:
|
||||
log_item = create_logitem(
|
||||
resouce_obj.name,
|
||||
guess_action(commited_args, resource_args),
|
||||
inputs_diff,
|
||||
connections_diff,
|
||||
base_path=base_path)
|
||||
log.append(log_item)
|
||||
return log
|
||||
|
||||
|
||||
def send_to_orchestration():
|
||||
dg = nx.MultiDiGraph()
|
||||
staged = {r.name: r.args for r in resource.load_all()}
|
||||
commited = data.CD()
|
||||
events = {}
|
||||
changed_nodes = []
|
||||
|
||||
for res_uid in staged.keys():
|
||||
commited_data = commited.get(res_uid, {})
|
||||
staged_data = staged.get(res_uid, {})
|
||||
for logitem in data.SL():
|
||||
events[logitem.res] = evapi.all_events(logitem.res)
|
||||
changed_nodes.append(logitem.res)
|
||||
|
||||
df = create_diff(staged_data, commited_data)
|
||||
|
||||
if df:
|
||||
events[res_uid] = evapi.all_events(res_uid)
|
||||
changed_nodes.append(res_uid)
|
||||
action = guess_action(commited_data, staged_data)
|
||||
|
||||
state_change = evapi.StateChange(res_uid, action)
|
||||
state_change.insert(changed_nodes, dg)
|
||||
state_change = evapi.StateChange(logitem.res, logitem.action)
|
||||
state_change.insert(changed_nodes, dg)
|
||||
|
||||
evapi.build_edges(dg, events)
|
||||
|
||||
# what it should be?
|
||||
# what `name` should be?
|
||||
dg.graph['name'] = 'system_log'
|
||||
return graph.create_plan_from_graph(dg)
|
||||
|
||||
@ -110,14 +129,67 @@ def parameters(res, action, data):
|
||||
|
||||
|
||||
def revert_uids(uids):
|
||||
commited = data.CD()
|
||||
"""
|
||||
:param uids: iterable not generator
|
||||
"""
|
||||
history = data.CL()
|
||||
not_valid = []
|
||||
for uid in uids:
|
||||
if history.get(uid) is None:
|
||||
not_valid.append(uid)
|
||||
if not_valid:
|
||||
raise CannotFindID('UIDS: {} not in history.'.format(not_valid))
|
||||
|
||||
for uid in uids:
|
||||
item = history.get(uid)
|
||||
res_db = resource.load(item.res)
|
||||
args_to_update = dictdiffer.revert(
|
||||
item.diff, commited.get(item.res, {}))
|
||||
res_db.update(args_to_update)
|
||||
|
||||
if item.action == CHANGES.update.name:
|
||||
_revert_update(item)
|
||||
elif item.action == CHANGES.remove.name:
|
||||
_revert_remove(item)
|
||||
elif item.action == CHANGES.run.name:
|
||||
_revert_run(item)
|
||||
else:
|
||||
log.debug('Action %s for resource %s is a side'
|
||||
' effect of another action', item.action, item.res)
|
||||
|
||||
|
||||
def _revert_remove(logitem):
|
||||
"""Resource should be created with all previous connections
|
||||
"""
|
||||
commited = orm.DBCommitedState.load(logitem.res)
|
||||
args = dictdiffer.revert(logitem.diff, commited.inputs)
|
||||
connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections))
|
||||
resource.Resource(logitem.res, logitem.base_path, args=args, tags=commited.tags)
|
||||
for emitter, emitter_input, receiver, receiver_input in connections:
|
||||
emmiter_obj = resource.load(emitter)
|
||||
receiver_obj = resource.load(receiver)
|
||||
signals.connect(emmiter_obj, receiver_obj, {emitter_input: receiver_input})
|
||||
|
||||
|
||||
def _revert_update(logitem):
|
||||
"""Revert of update should update inputs and connections
|
||||
"""
|
||||
res_obj = resource.load(logitem.res)
|
||||
commited = res_obj.load_commited()
|
||||
args_to_update = dictdiffer.revert(logitem.diff, commited.inputs)
|
||||
res_obj.update(args_to_update)
|
||||
|
||||
for emitter, _, receiver, _ in commited.connections:
|
||||
emmiter_obj = resource.load(emitter)
|
||||
receiver_obj = resource.load(receiver)
|
||||
signals.disconnect(emmiter_obj, receiver_obj)
|
||||
|
||||
connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections))
|
||||
for emitter, emitter_input, receiver, receiver_input in connections:
|
||||
emmiter_obj = resource.load(emitter)
|
||||
receiver_obj = resource.load(receiver)
|
||||
signals.connect(emmiter_obj, receiver_obj, {emitter_input: receiver_input})
|
||||
|
||||
|
||||
def _revert_run(logitem):
|
||||
res_obj = resource.load(logitem.res)
|
||||
res_obj.remove()
|
||||
|
||||
|
||||
def revert(uid):
|
||||
|
20
solar/solar/system_log/consts.py
Normal file
20
solar/solar/system_log/consts.py
Normal file
@ -0,0 +1,20 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from enum import Enum
|
||||
|
||||
CHANGES = Enum(
|
||||
'Changes',
|
||||
'run remove update'
|
||||
)
|
@ -30,23 +30,24 @@ STATES = Enum('States', 'error inprogress pending success')
|
||||
def state_file(name):
|
||||
if 'log' in name:
|
||||
return Log(name)
|
||||
elif 'data' in name:
|
||||
return Data(name)
|
||||
|
||||
|
||||
CD = partial(state_file, 'commited_data')
|
||||
SL = partial(state_file, 'stage_log')
|
||||
CL = partial(state_file, 'commit_log')
|
||||
|
||||
|
||||
class LogItem(object):
|
||||
|
||||
def __init__(self, uid, res, log_action, diff, state=None):
|
||||
def __init__(self, uid, res, action, diff,
|
||||
signals_diff, state=None, base_path=None):
|
||||
self.uid = uid
|
||||
self.res = res
|
||||
self.log_action = log_action
|
||||
self.log_action = '{}.{}'.format(res, action)
|
||||
self.action = action
|
||||
self.diff = diff
|
||||
self.signals_diff = signals_diff
|
||||
self.state = state or STATES.pending
|
||||
self.base_path = base_path
|
||||
|
||||
def to_yaml(self):
|
||||
return utils.yaml_dump(self.to_dict())
|
||||
@ -54,9 +55,11 @@ class LogItem(object):
|
||||
def to_dict(self):
|
||||
return {'uid': self.uid,
|
||||
'res': self.res,
|
||||
'log_action': self.log_action,
|
||||
'diff': self.diff,
|
||||
'state': self.state.name}
|
||||
'state': self.state.name,
|
||||
'signals_diff': self.signals_diff,
|
||||
'base_path': self.base_path,
|
||||
'action': self.action}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, **kwargs):
|
||||
@ -88,6 +91,9 @@ def details(diff):
|
||||
elif type_ == 'change':
|
||||
rst.append('-+ {}: {} >> {}'.format(
|
||||
unwrap_change_val(val), change[0], change[1]))
|
||||
elif type_ == 'remove':
|
||||
for key, val in change:
|
||||
rst.append('-- {}: {}'.format(key ,val))
|
||||
return rst
|
||||
|
||||
|
||||
@ -147,35 +153,5 @@ class Log(object):
|
||||
def __iter__(self):
|
||||
return iter(self.collection())
|
||||
|
||||
|
||||
class Data(collections.MutableMapping):
|
||||
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
r = db.get(path, collection=db.COLLECTIONS.state_data,
|
||||
return_empty=True, db_convert=False)
|
||||
|
||||
if r:
|
||||
self.store = r.get('properties', {})
|
||||
else:
|
||||
self.store = {}
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.store[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.store[key] = value
|
||||
db.create(self.path, self.store, collection=db.COLLECTIONS.state_data)
|
||||
|
||||
def __delitem__(self, key):
|
||||
self.store.pop(key)
|
||||
db.create(self.path, self.store, collection=db.COLLECTIONS.state_data)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.store)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.store)
|
||||
|
||||
def clean(self):
|
||||
db.create(self.path, {}, collection=db.COLLECTIONS.state_data)
|
||||
return len(list(self.collection()))
|
||||
|
@ -14,12 +14,17 @@
|
||||
|
||||
from solar.system_log import data
|
||||
from dictdiffer import patch
|
||||
from solar.interfaces import orm
|
||||
from solar.core.resource import resource
|
||||
from .consts import CHANGES
|
||||
|
||||
|
||||
def set_error(log_action, *args, **kwargs):
|
||||
sl = data.SL()
|
||||
item = next((i for i in sl if i.log_action == log_action), None)
|
||||
if item:
|
||||
resource_obj = resource.load(item.res)
|
||||
resource.set_error()
|
||||
item.state = data.STATES.error
|
||||
sl.update(item)
|
||||
|
||||
@ -27,11 +32,26 @@ def set_error(log_action, *args, **kwargs):
|
||||
def move_to_commited(log_action, *args, **kwargs):
|
||||
sl = data.SL()
|
||||
item = next((i for i in sl if i.log_action == log_action), None)
|
||||
sl.pop(item.uid)
|
||||
if item:
|
||||
commited = data.CD()
|
||||
staged_data = patch(item.diff, commited.get(item.res, {}))
|
||||
sl.pop(item.uid)
|
||||
resource_obj = resource.load(item.res)
|
||||
commited = orm.DBCommitedState.get_or_create(item.res)
|
||||
|
||||
if item.action == CHANGES.remove.name:
|
||||
resource_obj.delete()
|
||||
commited.state = resource.RESOURCE_STATE.removed.name
|
||||
else:
|
||||
resource_obj.set_operational()
|
||||
commited.state = resource.RESOURCE_STATE.operational.name
|
||||
commited.inputs = patch(item.diff, commited.inputs)
|
||||
commited.tags = resource_obj.tags
|
||||
sorted_connections = sorted(commited.connections)
|
||||
commited.connections = patch(item.signals_diff, sorted_connections)
|
||||
commited.base_path = item.base_path
|
||||
|
||||
commited.save()
|
||||
cl = data.CL()
|
||||
item.state = data.STATES.success
|
||||
cl.append(item)
|
||||
commited[item.res] = staged_data
|
||||
|
||||
|
||||
|
@ -95,11 +95,3 @@ def resources():
|
||||
'connections': [['n.1', 'h.1', ['ip', 'ip']]],
|
||||
'tags': []}}
|
||||
return r
|
||||
|
||||
|
||||
def test_stage_changes(resources):
|
||||
commited = {}
|
||||
log = change._stage_changes(resources, commited, [])
|
||||
|
||||
assert len(log) == 3
|
||||
assert {l.res for l in log} == {'n.1', 'r.1', 'h.1'}
|
||||
|
@ -431,7 +431,7 @@ input:
|
||||
class TestEventORM(BaseResourceTest):
|
||||
|
||||
def test_return_emtpy_set(self):
|
||||
r = orm.DBResource(id='test1', name='test1', base_path='x')
|
||||
r = orm.DBResourceEvents(id='test1')
|
||||
r.save()
|
||||
self.assertEqual(r.events.as_set(), set())
|
||||
|
||||
@ -468,11 +468,11 @@ class TestEventORM(BaseResourceTest):
|
||||
self.assertEqual(len(orm.DBEvent.load_all()), 2)
|
||||
|
||||
def test_removal_of_event(self):
|
||||
r = orm.DBResource(id='n1', name='n1', base_path='x')
|
||||
r = orm.DBResourceEvents(id='test1')
|
||||
r.save()
|
||||
|
||||
ev = orm.DBEvent(
|
||||
parent='n1',
|
||||
parent='test1',
|
||||
parent_action='run',
|
||||
state='success',
|
||||
child_action='run',
|
||||
@ -484,5 +484,5 @@ class TestEventORM(BaseResourceTest):
|
||||
self.assertEqual(r.events.as_set(), {ev})
|
||||
ev.delete()
|
||||
|
||||
r = orm.DBResource.load('n1')
|
||||
r = orm.DBResourceEvents.load('test1')
|
||||
self.assertEqual(r.events.as_set(), set())
|
||||
|
@ -12,11 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from pytest import fixture
|
||||
from pytest import mark
|
||||
from solar.system_log import change
|
||||
from solar.system_log import data
|
||||
from solar.system_log import operations
|
||||
from solar.core import signals
|
||||
from solar.core.resource import resource
|
||||
from solar.interfaces import orm
|
||||
|
||||
@ -35,12 +38,145 @@ def test_revert_update():
|
||||
|
||||
log = data.SL()
|
||||
logitem =change.create_logitem(
|
||||
res.name, action, change.create_diff(commit, previous))
|
||||
res.name, action, change.create_diff(commit, previous), [],
|
||||
base_path=res.base_path)
|
||||
log.append(logitem)
|
||||
resource_obj.update(commit)
|
||||
operations.move_to_commited(logitem.log_action)
|
||||
|
||||
assert logitem.diff == [('change', 'a', ('9', '10'))]
|
||||
assert resource_obj.args == commit
|
||||
|
||||
change.revert(logitem.uid)
|
||||
assert resource_obj.args == previous
|
||||
|
||||
|
||||
def test_revert_update_connected():
|
||||
res1 = orm.DBResource(id='test1', name='test1', base_path='x')
|
||||
res1.save()
|
||||
res1.add_input('a', 'str', '9')
|
||||
|
||||
res2 = orm.DBResource(id='test2', name='test2', base_path='x')
|
||||
res2.save()
|
||||
res2.add_input('a', 'str', 0)
|
||||
|
||||
res3 = orm.DBResource(id='test3', name='test3', base_path='x')
|
||||
res3.save()
|
||||
res3.add_input('a', 'str', 0)
|
||||
|
||||
res1 = resource.load('test1')
|
||||
res2 = resource.load('test2')
|
||||
res3 = resource.load('test3')
|
||||
signals.connect(res1, res2)
|
||||
signals.connect(res2, res3)
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 3
|
||||
for item in staged_log:
|
||||
operations.move_to_commited(item.log_action)
|
||||
assert len(staged_log) == 0
|
||||
|
||||
signals.disconnect(res1, res2)
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 2
|
||||
to_revert = []
|
||||
for item in staged_log:
|
||||
operations.move_to_commited(item.log_action)
|
||||
to_revert.append(item.uid)
|
||||
|
||||
change.revert_uids(sorted(to_revert, reverse=True))
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 2
|
||||
for item in staged_log:
|
||||
assert item.diff == [['change', 'a', [0, '9']]]
|
||||
|
||||
|
||||
def test_revert_removal():
|
||||
res = orm.DBResource(id='test1', name='test1', base_path='x')
|
||||
res.save()
|
||||
res.add_input('a', 'str', '9')
|
||||
res.add_input('location_id', 'str', '1')
|
||||
res.add_input('transports_id', 'str', '1')
|
||||
|
||||
commited = orm.DBCommitedState.get_or_create('test1')
|
||||
commited.inputs = {'a': '9', 'location_id': '1', 'transports_id': '1'}
|
||||
commited.save()
|
||||
|
||||
logitem =change.create_logitem(
|
||||
res.name, 'remove', change.create_diff({}, {'a': '9'}), [],
|
||||
base_path=res.base_path)
|
||||
log = data.SL()
|
||||
log.append(logitem)
|
||||
resource_obj = resource.load(res.name)
|
||||
resource_obj.remove()
|
||||
operations.move_to_commited(logitem.log_action)
|
||||
|
||||
resources = orm.DBResource.load_all()
|
||||
|
||||
assert resources == []
|
||||
assert logitem.diff == [('remove', '', [('a', '9')])]
|
||||
|
||||
with mock.patch.object(resource, 'read_meta') as mread:
|
||||
mread.return_value = {'input': {'a': {'schema': 'str!'}}}
|
||||
change.revert(logitem.uid)
|
||||
resource_obj = resource.load('test1')
|
||||
assert resource_obj.args == {'a': '9', 'location_id': '1', 'transports_id': '1'}
|
||||
|
||||
|
||||
@mark.xfail(reason='With current approach child will be notice changes after parent is removed')
|
||||
def test_revert_removed_child():
|
||||
res1 = orm.DBResource(id='test1', name='test1', base_path='x')
|
||||
res1.save()
|
||||
res1.add_input('a', 'str', '9')
|
||||
|
||||
res2 = orm.DBResource(id='test2', name='test2', base_path='x')
|
||||
res2.save()
|
||||
res2.add_input('a', 'str', 0)
|
||||
|
||||
res1 = resource.load('test1')
|
||||
res2 = resource.load('test2')
|
||||
signals.connect(res1, res2)
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 2
|
||||
for item in staged_log:
|
||||
operations.move_to_commited(item.log_action)
|
||||
res2.remove()
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 1
|
||||
logitem = next(staged_log.collection())
|
||||
operations.move_to_commited(logitem.log_action)
|
||||
|
||||
with mock.patch.object(resource, 'read_meta') as mread:
|
||||
mread.return_value = {'input': {'a': {'schema': 'str!'}}}
|
||||
change.revert(logitem.uid)
|
||||
|
||||
res2 = resource.load('test2')
|
||||
assert res2.args == {'a': '9'}
|
||||
|
||||
|
||||
def test_revert_create():
|
||||
res = orm.DBResource(id='test1', name='test1', base_path='x')
|
||||
res.save()
|
||||
res.add_input('a', 'str', '9')
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 1
|
||||
logitem = next(staged_log.collection())
|
||||
|
||||
operations.move_to_commited(logitem.log_action)
|
||||
assert logitem.diff == [['add', '', [['a', '9']]]]
|
||||
|
||||
commited = orm.DBCommitedState.load('test1')
|
||||
assert commited.inputs == {'a': '9'}
|
||||
|
||||
change.revert(logitem.uid)
|
||||
|
||||
staged_log = change.stage_changes()
|
||||
assert len(staged_log) == 1
|
||||
for item in staged_log:
|
||||
operations.move_to_commited(item.log_action)
|
||||
resources = orm.DBResource.load_all()
|
||||
assert resources == []
|
||||
|
Loading…
x
Reference in New Issue
Block a user