Change-Id: I8fc0018f57563e9ae0becd5c4be56a9ee16023d2
This commit is contained in:
LingxianKong 2015-05-05 13:12:07 +08:00
parent c402b72d2f
commit 1916ff862a
6 changed files with 2 additions and 448 deletions

File diff suppressed because one or more lines are too long

View File

@ -16,4 +16,3 @@ six>=1.9.0
SQLAlchemy>=0.9.7,<=0.9.99
stevedore>=1.3.0 # Apache-2.0
keystonemiddleware>=1.5.0
libvirt-python>=1.2.5 # LGPLv2+

View File

@ -2,7 +2,7 @@
name = terracotta
summary = Dynamic Scheduling service for OpenStack Cloud
description-file =
README.rst
README.md
license = Apache License, Version 2.0
home-page = https://launchpad.net/terracotta
classifiers =

View File

@ -19,6 +19,7 @@ from oslo_messaging.rpc import client
from terracotta import context as auth_ctx
from terracotta import exceptions as exc
LOG = logging.getLogger(__name__)
@ -112,113 +113,6 @@ class EngineClient(base.Engine):
serializer=serializer
)
@wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, **params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'start_workflow',
workflow_name=wf_name,
workflow_input=wf_input or {},
params=params
)
def on_task_state_change(self, task_ex_id, state):
return self._client.call(
auth_ctx.ctx(),
'on_task_state_change',
task_ex_id=task_ex_id,
state=state
)
@wrap_messaging_exception
def on_action_complete(self, action_ex_id, result):
"""Conveys action result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a action execution once action has executed. One of the
clients of this method is Mistral REST API server that receives
action result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:return: Task.
"""
return self._client.call(
auth_ctx.ctx(),
'on_action_complete',
action_ex_id=action_ex_id,
result_data=result.data,
result_error=result.error
)
@wrap_messaging_exception
def pause_workflow(self, execution_id):
"""Stops the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'pause_workflow',
execution_id=execution_id
)
@wrap_messaging_exception
def resume_workflow(self, execution_id):
"""Resumes the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'resume_workflow',
execution_id=execution_id
)
@wrap_messaging_exception
def stop_workflow(self, execution_id, state, message=None):
"""Stops workflow execution with given status.
Once stopped, the workflow is complete with SUCCESS or ERROR,
and can not be resumed.
:param execution_id: Workflow execution id
:param state: State assigned to the workflow: SUCCESS or ERROR
:param message: Optional information string
:return: Workflow execution, model.Execution
"""
return self._client.call(
auth_ctx.ctx(),
'stop_workflow',
execution_id=execution_id,
state=state,
message=message
)
@wrap_messaging_exception
def rollback_workflow(self, execution_id):
"""Rolls back the workflow with the given execution id.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'rollback_workflow',
execution_id=execution_id
)
class LocalManagerServer(object):
"""RPC Executor server."""
@ -226,26 +120,6 @@ class LocalManagerServer(object):
def __init__(self, manager):
self._executor = manager
def run_action(self, rpc_ctx, action_ex_id, action_class_str,
attributes, params):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
"""
LOG.info(
"Received RPC request 'run_action'[rpc_ctx=%s,"
" action_ex_id=%s, action_class=%s, attributes=%s, params=%s]"
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
)
self._executor.run_action(
action_ex_id,
action_class_str,
attributes,
params
)
class ExecutorClient(base.Executor):
"""RPC Executor client."""
@ -266,20 +140,3 @@ class ExecutorClient(base.Executor):
messaging.Target(),
serializer=serializer
)
def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None):
"""Sends a request to run action to executor."""
kwargs = {
'action_ex_id': action_ex_id,
'action_class_str': action_class_str,
'attributes': attributes,
'params': action_params
}
self._client.prepare(topic=self.topic, server=target).cast(
auth_ctx.ctx(),
'run_action',
**kwargs
)