Merge pull request #57 from Mirantis/events

Events
This commit is contained in:
CGenie 2015-09-02 09:35:36 +02:00
commit accc44f10d
20 changed files with 597 additions and 84 deletions

docs/events.md Normal file
View File

@ -0,0 +1,155 @@
[DRAFT] Events propagation
Possible events on a resource:
1. changed
   Configuration management was executed on the resource and changes were
   found; both Ansible and Puppet are able to report whether anything changed.
2. failed, error
   error - corresponds to problems in the infrastructure, which probably
   cannot be remediated in any way
   failed - the process of configuring the resource failed
   Does it make sense to keep this separation?
3. ok
   The resource executed without errors or changes, so successors may skip
   reloading or similar follow-up actions.
4. created ??
   Are there cases where we need to differentiate between an updated object
   and a created one?
--------------------------------------------------
Propagating events when no data has changed
Control syntax for specifying events:
on <emitter>.<emitter_action> <event> <subscriber>.<subscriber_action>
on mariadb.run changed keystone.run
on keystone.run changed keystone_config.run
on keystone_config.run changed haproxy.reload
+---------------------+
|     mariadb.run     |
+---------------------+
           |
           | changed
           v
+---------------------+
|    keystone.run     |
+---------------------+
           |
           | changed
           v
+---------------------+
| keystone_config.run |
+---------------------+
           |
           | changed
           v
+---------------------+
|   haproxy.reload    |
+---------------------+
<u>.<action> - <event> - <v>.<action>
When a data connection between several resources is created, event
connections should be created as well; e.g., when resource a is connected
to resource b:
on a.run changed b.reload
on a.remove changed b.run
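A sketch of how the two rules above might be expressed with the events API
added in this commit (solar/solar/events). The shipped helpers use
state='success'; the 'changed' state below follows this draft's wording, not
the implemented default:

from solar.events.api import add_event
from solar.events.controls import React

add_event(React('a', 'run', 'changed', 'b', 'reload'))    # on a.run changed b.reload
add_event(React('a', 'remove', 'changed', 'b', 'run'))    # on a.remove changed b.run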
-------------------------------------------------
Resolving cycles on the data plane
Resolving the rmq cycle with events. Let's say we have 4 objects:
- rmq.cluster
- rmq.1, rmq.2, rmq.3
rmq.cluster is a sink that will use data from all 3 nodes, and those nodes
will consume that sink - so there is a cycle. We cannot rely on data alone
to resolve this cycle.
The order of execution should be:
rmq.1.run rmq.2.run rmq.3.run
rmq.1.cluster_create
rmq.2.cluster_join, rmq.3.cluster_join
Also, the cluster operation should happen only when rmq.cluster is changed.
on rmq.cluster changed rmq.1.cluster_create
on rmq.1.cluster_create changed rmq.2.cluster_join
on rmq.1.cluster_create changed rmq.3.cluster_join
+----------------------+
| rmq.1.run |
+----------------------+
|
| changed
v
+----------------------+
| rmq.1.cluster_create | -+
+----------------------+ |
| |
| changed |
v |
+----------------------+ |
| rmq.2.cluster_join | |
+----------------------+ |
^ |
| changed | changed
| |
+----------------------+ |
| rmq.2.run | |
+----------------------+ |
+----------------------+ |
| rmq.3.run | |
+----------------------+ |
| |
| changed |
v |
+----------------------+ |
| rmq.3.cluster_join | <+
+----------------------+
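Encoded with the React control from this commit's events API, the three
rules above might look like the sketch below ('run' as the parent action of
rmq.cluster is an assumption; the draft leaves it unspecified):

from solar.events.api import add_event
from solar.events.controls import React

# 'run' as rmq.cluster's action is an assumption of this sketch
add_event(React('rmq.cluster', 'run', 'changed', 'rmq.1', 'cluster_create'))
add_event(React('rmq.1', 'cluster_create', 'changed', 'rmq.2', 'cluster_join'))
add_event(React('rmq.1', 'cluster_create', 'changed', 'rmq.3', 'cluster_join'))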
---------------------------------------------------
Resolving cycles at the execution level
We have 5 objects, which form 2 paths:
- keystone-config -> keystone-service -> haproxy-service
- glance-config -> glance-service -> haproxy-service
But we also have the keystone endpoint exposed via haproxy, and it is
consumed in glance-config, therefore there is a cycle. The proper resolution
for this cycle is to install haproxy after keystone is configured, then
configure glance, and only after that reload haproxy one more time to ensure
that glance is exposed via haproxy.
+----+
| g |
+----+
|
|
v
+----+
| gc | <+
+----+ |
| |
| |
v |
+----+ |
+> | ha | -+
| +----+
| +----+
| | k |
| +----+
| |
| |
| v
| +----+
+- | kc |
+----+
During traversal we should check whether an added node forms a cycle, find
the pair of nodes that created this cycle, and create a node with an
incremented action. In the example above this resolution works if ha.run is
incremented so that we have two actions - ha.run#0 and ha.run#1 - and
gc.run#0 leads to ha.run#1.
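A minimal sketch of this splitting step, using networkx (already a
dependency of this repo); the split_cycle_node helper and the #N suffix
handling are illustrative, not the shipped implementation:

import networkx as nx

def split_cycle_node(dg, node, closing_pred):
    """Replace `node` with node#0/node#1; the edge coming from
    `closing_pred` (the edge that closed the cycle) lands on node#1,
    every other edge keeps pointing at node#0."""
    first, second = node + '#0', node + '#1'
    dg.add_edge(first, second)  # run #0 strictly before #1
    for pred in list(dg.predecessors(node)):
        dg.add_edge(pred, second if pred == closing_pred else first)
    for succ in list(dg.successors(node)):
        dg.add_edge(first, succ)
    dg.remove_node(node)

# the example above: g -> gc -> ha, k -> kc -> ha, plus ha -> gc (the cycle)
dg = nx.DiGraph([('g', 'gc'), ('gc', 'ha'), ('ha', 'gc'),
                 ('k', 'kc'), ('kc', 'ha')])
split_cycle_node(dg, 'ha', closing_pred='gc')
assert nx.is_directed_acyclic_graph(dg)  # gc -> ha#1, kc -> ha#0 -> gc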

View File

@ -10,6 +10,7 @@ from solar.core import signals
from solar.core import validation
from solar.core.resource import virtual_resource as vr
from solar import errors
from solar import events as evapi
from solar.interfaces.db import get_db
@ -73,6 +74,7 @@ def setup_resources():
# KEYSTONE
keystone_puppet = vr.create('keystone_puppet', 'resources/keystone_puppet', {})[0]
evapi.add_dep(rabbitmq_service1.name, keystone_puppet.name, actions=('run', 'update'))
keystone_db = vr.create('keystone_db', 'resources/mariadb_db/', {
'db_name': 'keystone_db',
'login_user': 'root'
@ -351,17 +353,17 @@ def setup_resources():
signals.connect(cinder_puppet, cinder_api_puppet, {
'keystone_host': 'keystone_auth_host',
'keystone_port': 'keystone_auth_port'})
evapi.add_react(cinder_puppet.name, cinder_api_puppet.name, actions=('update',))
# CINDER SCHEDULER
cinder_scheduler_puppet = vr.create('cinder_scheduler_puppet', 'resources/cinder_scheduler_puppet', {})[0]
signals.connect(node1, cinder_scheduler_puppet)
signals.connect(cinder_puppet, cinder_scheduler_puppet)
evapi.add_react(cinder_puppet.name, cinder_scheduler_puppet.name, actions=('update',))
# CINDER VOLUME
cinder_volume_puppet = vr.create('cinder_volume_puppet', 'resources/cinder_volume_puppet', {})[0]
signals.connect(node1, cinder_volume_puppet)
signals.connect(cinder_puppet, cinder_volume_puppet)
evapi.add_react(cinder_puppet.name, cinder_volume_puppet.name, actions=('update',))
# NOVA
nova_puppet = vr.create('nova_puppet', 'resources/nova_puppet', {})[0]
nova_db = vr.create('nova_db', 'resources/mariadb_db/', {

solar/solar/cli/base.py Normal file
View File

@ -0,0 +1,26 @@
import click
class AliasedGroup(click.Group):
"""This class introduces iproute2-like behaviour, command will be inferred
by matching patterns.
If there will be more than 1 matches - exception will be raised
Examples:
>> solar ch stage
>> solar cha process
>> solar res action run rabbitmq_service1
"""
def get_command(self, ctx, cmd_name):
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
return rv
matches = [x for x in self.list_commands(ctx)
if x.startswith(cmd_name)]
if not matches:
return None
elif len(matches) == 1:
return click.Group.get_command(self, ctx, matches[0])
ctx.fail('Too many matches: %s' % ', '.join(sorted(matches)))

solar/solar/cli/events.py Normal file
View File

@ -0,0 +1,58 @@
import subprocess
import click
import networkx as nx
from solar.events import api as evapi
@click.group()
def events():
pass
@events.command()
@click.argument('resource')
def show(resource):
all_ = evapi.all_events(resource)
if all_:
click.echo('Resource: {}'.format(resource))
offset = ' ' * 4
for ev in all_:
click.echo(offset+repr(ev))
else:
click.echo('No events for resource {}'.format(resource))
@events.command()
@click.argument('etype')
@click.argument('parent_node')
@click.argument('parent_action')
@click.argument('state')
@click.argument('depend_node')
@click.argument('depend_action')
def add(etype, parent_node, parent_action, state, depend_node, depend_action):
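    # locals() bundles the six click arguments above into the event dict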
ev = evapi.create_event(locals())
evapi.add_event(ev)
@events.command()
@click.argument('etype')
@click.argument('parent_node')
@click.argument('parent_action')
@click.argument('state')
@click.argument('depend_node')
@click.argument('depend_action')
def rem(etype, parent_node, parent_action, state, depend_node, depend_action):
ev = evapi.create_event(locals())
evapi.remove_event(ev)
@events.command()
@click.argument('resource')
def trav(resource):
dg = evapi.bft_events_graph(resource)
nx.write_dot(dg, '{name}.dot'.format(name='events'))
subprocess.call(
'dot -Tpng {name}.dot -o {name}.png'.format(name='events'),
shell=True)
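Example invocations of the new group, assuming it is registered on the main
solar CLI as done later in this commit (nova/nova_api are placeholder
resource names):

solar events add react_on nova update success nova_api update
solar events show nova
solar events trav nova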

View File

@ -38,9 +38,11 @@ from solar.interfaces.db import get_db
from solar import errors
from solar.core.log import log
from solar.cli import base
from solar.cli import executors
from solar.cli.orch import orchestration
from solar.cli.system_log import changes
from solar.cli.events import events
# NOTE: these are extensions, they shouldn't be imported here
# Maybe each extension can also extend the CLI with parsers
@ -75,7 +77,7 @@ def show_emitter_connections(emitter_name, destinations):
)
-@click.group()
+@click.group(cls=base.AliasedGroup)
def main():
pass
@ -417,6 +419,7 @@ def run():
main.add_command(orchestration)
main.add_command(changes)
main.add_command(events)
main()

View File

@ -27,11 +27,11 @@ def validate():
@changes.command()
def stage():
-    log = change.stage_changes()
-    staged = list(log.reverse())
-    if not staged:
+    log = list(change.stage_changes().reverse())
+    for item in log:
+        click.echo(item)
+    if not log:
        click.echo('No changes')
-    click.echo(staged)
@changes.command()

View File

@ -20,7 +20,8 @@ class AnsibleTemplate(TempFileHandler):
call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file]
log.debug('EXECUTING: %s', ' '.join(call_args))
-    out = fabric_api.local(' '.join(call_args), capture=True)
+    with fabric_api.shell_env(ANSIBLE_HOST_KEY_CHECKING='False'):
+        out = fabric_api.local(' '.join(call_args), capture=True)
if out.failed:
raise errors.SolarError(out)

View File

@ -5,6 +5,8 @@ import networkx as nx
from solar.core.log import log
from solar.interfaces.db import get_db
from solar.events.api import add_events
from solar.events.controls import Dependency
db = get_db()
@ -149,6 +151,11 @@ def connect(emitter, receiver, mapping=None):
connect_single(emitter, src, receiver, dst)
events = [
Dependency(emitter.name, 'run', 'success', receiver.name, 'run'),
Dependency(emitter.name, 'update', 'success', receiver.name, 'update')
]
add_events(emitter.name, events)
#receiver.save()

View File

@ -0,0 +1,2 @@
from .api import *

solar/solar/events/api.py Normal file
View File

@ -0,0 +1,128 @@
__all__ = ['add_dep', 'add_react']
import networkx as nx
from solar.core.log import log
from solar.interfaces.db import get_db
from solar.events.controls import Dep, React, StateChange
db = get_db()
def create_event(event_dict):
etype = event_dict.pop('etype')
if etype == React.etype:
return React(**event_dict)
elif etype == Dep.etype:
return Dep(**event_dict)
else:
        raise Exception('No support for type %s' % etype)
def add_event(ev):
rst = all_events(ev.parent_node)
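    # for/else: the append below only runs when no equal event is stored yet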
for rev in rst:
if ev == rev:
break
else:
rst.append(ev)
db.save(
ev.parent_node,
[i.to_dict() for i in rst],
collection=db.COLLECTIONS.events)
def add_dep(parent, dep, actions, state='success'):
for act in actions:
d = Dep(parent, act, state=state,
depend_node=dep, depend_action=act)
add_event(d)
log.debug('Added event: %s', d)
def add_react(parent, dep, actions, state='success'):
for act in actions:
r = React(parent, act, state=state,
depend_node=dep, depend_action=act)
add_event(r)
log.debug('Added event: %s', r)
def remove_event(ev):
    rst = all_events(ev.parent_node)
    # drop the matching events before saving (Event defines __eq__)
    rst = [rev for rev in rst if not rev == ev]
    db.save(
        ev.parent_node,
        [i.to_dict() for i in rst],
        collection=db.COLLECTIONS.events)
def set_events(resource, lst):
db.save(
resource,
[i.to_dict() for i in lst],
collection=db.COLLECTIONS.events)
def add_events(resource, lst):
rst = all_events(resource)
rst.extend(lst)
set_events(resource, rst)
def all_events(resource):
events = db.read(resource, collection=db.COLLECTIONS.events)
if not events:
return []
return [create_event(i) for i in events]
def bft_events_graph(start):
"""Builds graph of events traversing events in breadth-first order
This graph doesnt necessary reflect deployment order, it is used
to show dependencies between resources
"""
dg = nx.DiGraph()
stack = [start]
visited = set()
while stack:
item = stack.pop()
current_events = all_events(item)
for ev in current_events:
dg.add_edge(ev.parent, ev.dependent, label=ev.state)
if ev.depend_node in visited:
continue
            # it is possible to have events leading to the same resource
            # but a different action
if ev.depend_node in stack:
continue
stack.append(ev.depend_node)
visited.add(ev.parent_node)
return dg
def build_edges(changed_resources, changes_graph, events):
"""
:param changed_resources: list of resource names that were changed
:param changes_graph: nx.DiGraph object with actions to be executed
:param events: {res: [controls.Event objects]}
"""
stack = changed_resources[:]
while stack:
node = stack.pop()
if node in events:
log.debug('Events %s for resource %s', events[node], node)
else:
log.debug('No dependencies based on %s', node)
for ev in events.get(node, ()):
ev.insert(stack, changes_graph)
return changes_graph
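A minimal usage sketch for build_edges; it mirrors the tests added at the
end of this commit:

import networkx as nx
from solar.events.api import build_edges
from solar.events.controls import React

events = {'nova': [React('nova', 'update', 'success', 'nova_api', 'update')]}
dg = nx.MultiDiGraph()
dg.add_node('nova.update')
build_edges(['nova'], dg, events)
# 'nova_api.update' is now a PENDING node ordered after 'nova.update'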

View File

@ -0,0 +1,99 @@
"""
Available controls:
*depends_on* implements relationship that will guarantee that depended action
on resource will be executed after parent, if parent will be executed. It means
that this control contributes only to ordering, and wont trigger any action
if dependent resource isnt changed.
depends_on:
- parent:run -> ok -> dependent:run
*react_on* - relationship which will guarantee that action on dependent resource
will be executed if parent action is going to be executed. This control will
trigger action even if no changes noticed on dependent resource.
react_on:
- parent:update -> ok -> dependent:update
"""
import re
class Event(object):
etype = None
def __init__(self, parent_node, parent_action,
state='', depend_node='', depend_action=''):
self.parent_node = parent_node
self.parent_action = parent_action
self.state = state
self.depend_node = depend_node
self.depend_action = depend_action
@property
def parent(self):
return '{}.{}'.format(self.parent_node, self.parent_action)
@property
def dependent(self):
return '{}.{}'.format(self.depend_node, self.depend_action)
def to_dict(self):
rst = {'etype': self.etype}
rst.update(self.__dict__)
return rst
def __eq__(self, inst):
if inst.__class__ != self.__class__:
return False
return all((
self.parent == inst.parent,
self.state == inst.state,
self.dependent == inst.dependent))
def __repr__(self):
return '{}: {} -> {} -> {}'.format(
self.etype, self.parent, self.state, self.dependent)
class Dependency(Event):
etype = 'depends_on'
def insert(self, changed_resources, changes_graph):
if (self.parent in changes_graph and
self.dependent in changes_graph):
changes_graph.add_edge(
self.parent, self.dependent, state=self.state)
Dep = Dependency
class React(Event):
etype = 'react_on'
def insert(self, changed_resources, changes_graph):
if self.parent in changes_graph:
if self.dependent not in changes_graph:
changes_graph.add_node(
self.dependent, status='PENDING',
errmsg=None, type='solar_resource',
args=[self.depend_node, self.depend_action])
changes_graph.add_edge(self.parent, self.dependent, state=self.state)
changed_resources.append(self.depend_node)
class StateChange(Event):
etype = 'state_change'
def insert(self, changed_resources, changes_graph):
changed_resources.append(self.parent)
changes_graph.add_node(
self.parent, status='PENDING',
errmsg=None, type='solar_resource',
args=[self.parent_node, self.parent_action])
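A small usage sketch for StateChange, which is how the reworked
send_to_orchestration later in this commit seeds the changes graph before
build_edges wires in reactions:

import networkx as nx
from solar.events.controls import StateChange

dg = nx.MultiDiGraph()
changed = []
StateChange('nova', 'run').insert(changed, dg)
# dg now holds node 'nova.run' with status='PENDING'; note that insert()
# appends the parent string 'nova.run' to `changed`, not the bare resource name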

View File

@ -10,7 +10,7 @@ from solar import errors
class RedisDB(object):
COLLECTIONS = Enum(
'Collections',
-        'connection resource state_data state_log'
+        'connection resource state_data state_log events'
)
DB = {
'host': 'localhost',

View File

@ -1,29 +0,0 @@
1. Core orchestration
1.1. Celery integration
1.2. Orchestrate stuff based on plan traversal
1.3. Different controls
1.4. Granular execution (e.g. execute start/end, execute certain tasks, execute a certain path)
2. User facing interface for orchestration
2.1. How to integrate this stuff with resources?
Two options:
- Use orchestrator as driver
- Orchestrate resources externally, all logic implemented in resource driver
2.2. How to allow reuse across different Profiles?
Solution: plan for each separate group of resources, e.g. plan for rabbitmq
deployment is not exposed, it is stored only in rabbitmq resources template,
but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready
3. Granular testing
3.1. How to integrate pre/post verifications with graph execution
4. Add back timeout support
Orchestration features
-------------------------
1. Controls
- Execute a task even if all predecessors failed
- Execute a task when one (or a specific number of) predecessors succeed
- Execute a task when a certain percentage of tasks succeed
- Ignore if some task failed
2. Granular execution

View File

@ -21,7 +21,7 @@ def save_graph(name, graph):
def get_graph(name):
-    dg = nx.DiGraph()
+    dg = nx.MultiDiGraph()
nodes = json.loads(r.get('{}:nodes'.format(name)))
edges = json.loads(r.get('{}:edges'.format(name)))
dg.graph = json.loads(r.get('{}:attributes'.format(name)))
@ -37,7 +37,7 @@ def parse_plan(plan_data):
""" parses yaml definition and returns graph
"""
plan = yaml.load(plan_data)
-    dg = nx.DiGraph()
+    dg = nx.MultiDiGraph()
dg.graph['name'] = plan['name']
for task in plan['tasks']:
dg.add_node(

View File

@ -11,6 +11,7 @@ ERROR - visited node, but failed, can be failed by timeout
SUCCESS - visited node, successful
INPROGRESS - task already scheduled, can be moved to ERROR or SUCCESS
SKIPPED - not visited, and should be skipped from execution
NOOP - task won't be executed, but should be treated as visited
"""

View File

@ -11,6 +11,7 @@ from solar.interfaces.db import get_db
from solar.core import actions
from solar.system_log import data
from solar.orchestration import graph
from solar.events import api as evapi
db = get_db()
@ -26,22 +27,6 @@ def guess_action(from_, to):
return 'update'
-def connections(res, graph):
-    result = []
-    for pred in graph.predecessors(res.name):
-        for num, edge in graph.get_edge_data(pred, res.name).items():
-            if 'label' in edge:
-                if ':' in edge['label']:
-                    parent, child = edge['label'].split(':')
-                    mapping = [parent, child]
-                else:
-                    mapping = [edge['label'], edge['label']]
-            else:
-                mapping = None
-            result.append([pred, res.name, mapping])
-    return result
def create_diff(staged, commited):
return list(diff(commited, staged))
@ -63,11 +48,12 @@ def _stage_changes(staged_resources, conn_graph,
df = create_diff(staged_data, commited_data)
if df:
+            action = guess_action(commited_data, staged_data)
            log_item = data.LogItem(
                utils.generate_uuid(),
                res_uid,
-                df,
-                guess_action(commited_data, staged_data))
+                '{}.{}'.format(res_uid, action),
+                df)
staged_log.append(log_item)
return staged_log
@ -83,26 +69,28 @@ def stage_changes():
def send_to_orchestration():
conn_graph = signals.detailed_connection_graph()
-    dg = nx.DiGraph()
+    dg = nx.MultiDiGraph()
staged = {r.name: r.args_show()
for r in resource.load_all().values()}
commited = data.CD()
+    events = {}
+    changed_nodes = []
-    for res_uid in nx.topological_sort(conn_graph):
+    for res_uid in staged.keys():
commited_data = commited.get(res_uid, {})
staged_data = staged.get(res_uid, {})
df = create_diff(staged_data, commited_data)
if df:
-            dg.add_node(
-                res_uid, status='PENDING',
-                errmsg=None,
-                **parameters(res_uid, guess_action(commited_data, staged_data)))
-            for pred in conn_graph.predecessors(res_uid):
-                if pred in dg:
-                    dg.add_edge(pred, res_uid)
+            events[res_uid] = evapi.all_events(res_uid)
+            changed_nodes.append(res_uid)
+            action = guess_action(commited_data, staged_data)
+            state_change = evapi.StateChange(res_uid, action)
+            state_change.insert(changed_nodes, dg)
+    evapi.build_edges(changed_nodes, dg, events)
# what it should be?
dg.graph['name'] = 'system_log'

View File

@ -30,12 +30,12 @@ CL = partial(state_file, 'commit_log')
class LogItem(object):
-    def __init__(self, uid, res, diff, action, state=None):
+    def __init__(self, uid, res, log_action, diff, state=None):
        self.uid = uid
        self.res = res
+        self.log_action = log_action
        self.diff = diff
        self.state = state or STATES.pending
-        self.action = action
def to_yaml(self):
return utils.yaml_dump(self.to_dict())
@ -43,9 +43,9 @@ class LogItem(object):
def to_dict(self):
return {'uid': self.uid,
'res': self.res,
+                'log_action': self.log_action,
                'diff': self.diff,
-                'state': self.state.name,
-                'action': self.action}
+                'state': self.state.name}
@classmethod
def from_dict(cls, **kwargs):
@ -54,10 +54,14 @@ class LogItem(object):
return cls(**kwargs)
def __str__(self):
-        return self.to_yaml()
+        return self.compact
    def __repr__(self):
-        return self.to_yaml()
+        return self.compact
+    @property
+    def compact(self):
+        return 'log task={} uid={}'.format(self.log_action, self.uid)
class Log(object):
@ -66,7 +70,7 @@ class Log(object):
self.ordered_log = db.get_set(path)
def append(self, logitem):
-        self.ordered_log.add([(logitem.res, logitem.to_dict())])
+        self.ordered_log.add([(logitem.log_action, logitem.to_dict())])
def pop(self, uid):
item = self.get(uid)
@ -76,7 +80,7 @@ class Log(object):
return item
def update(self, logitem):
-        self.ordered_log.update(logitem.res, logitem.to_dict())
+        self.ordered_log.update(logitem.log_action, logitem.to_dict())
def clean(self):
self.ordered_log.clean()

View File

@ -4,20 +4,20 @@ from solar.system_log import data
from dictdiffer import patch
-def set_error(task_uuid, *args, **kwargs):
+def set_error(log_action, *args, **kwargs):
    sl = data.SL()
-    item = sl.get(task_uuid)
+    item = sl.get(log_action)
    if item:
        item.state = data.STATES.error
        sl.update(item)
-def move_to_commited(task_uuid, *args, **kwargs):
+def move_to_commited(log_action, *args, **kwargs):
    sl = data.SL()
-    item = sl.pop(task_uuid)
+    item = sl.pop(log_action)
    if item:
        commited = data.CD()
-        staged_data = patch(item.diff, commited.get(item.res, {}))
+        staged_data = patch(item.diff, commited.get(item.log_action, {}))
cl = data.CL()
item.state = data.STATES.success
cl.append(item)

View File

@ -6,11 +6,11 @@ from solar.system_log.operations import set_error, move_to_commited
__all__ = ['error_logitem', 'commit_logitem']
-@app.task
+@app.task(name='error_logitem')
def error_logitem(task_uuid):
    return set_error(task_uuid.rsplit(':', 1)[-1])
-@app.task
+@app.task(name='commit_logitem')
def commit_logitem(task_uuid):
return move_to_commited(task_uuid.rsplit(':', 1)[-1])

View File

@ -0,0 +1,68 @@
import networkx as nx
from pytest import fixture
from solar.events import api as evapi
@fixture
def nova_deps():
rst = [
evapi.Dep('nova', 'run', 'success', 'nova_api', 'run'),
evapi.Dep('nova', 'update', 'success', 'nova_api', 'update'),
evapi.React('nova', 'update', 'success', 'nova_api', 'update')
]
return {'nova': rst}
def test_nova_api_run_after_nova(nova_deps):
changed = ['nova', 'nova_api']
changes_graph = nx.DiGraph()
changes_graph.add_node('nova.run')
changes_graph.add_node('nova_api.run')
evapi.build_edges(changed, changes_graph, nova_deps)
assert changes_graph.successors('nova.run') == ['nova_api.run']
def test_nova_api_react_on_update(nova_deps):
"""Test that nova_api:update will be called even if there is no changes
in nova_api
"""
changed = ['nova']
changes_graph = nx.DiGraph()
changes_graph.add_node('nova.update')
evapi.build_edges(changed, changes_graph, nova_deps)
assert changes_graph.successors('nova.update') == ['nova_api.update']
@fixture
def rmq_deps():
"""Example of a case when defaults are not good enough.
For example we need to run some stuff on first node before two others.
"""
    # NOTE(dshulyak) is it possible to put this create/join logic into the
    # puppet manifest, so that rmq_cluster.2 will check whether the cluster
    # already exists before joining?
return {
'rmq.1': [evapi.Dep('rmq.1', 'run', 'success', 'rmq_cluster.1', 'create')],
'rmq.2': [evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3': [evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq_cluster.1': [
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.2', 'join'),
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.3', 'join')]}
def test_rmq(rmq_deps):
changed = ['rmq.1', 'rmq.2', 'rmq.3', 'rmq_cluster.1', 'rmq_cluster.2', 'rmq_cluster.3']
changes_graph = nx.DiGraph()
changes_graph.add_node('rmq.1.run')
changes_graph.add_node('rmq.2.run')
changes_graph.add_node('rmq.3.run')
changes_graph.add_node('rmq_cluster.1.create')
changes_graph.add_node('rmq_cluster.2.join')
changes_graph.add_node('rmq_cluster.3.join')
evapi.build_edges(changed, changes_graph, rmq_deps)
assert set(changes_graph.successors('rmq_cluster.1.create')) == {
'rmq_cluster.2.join', 'rmq_cluster.3.join'}