Stevedore worker plugins

The deferred processor now loads plugins via stevedore, and passes
all event messages to each plugin found. This provides the first
extensibility mechanism for storyboard, by allowing anyone to write
tasks that should be handled asynchronously when the system changes.
Custom workers must implement two methods: 'enabled' and 'handle'.
The first checks to see whether this plugin is enabled, and will be
run only once during initialization. The second handles the event.

The intent is to drive more advanced functionality off of this
mechanism, such as emails, search indexes, and third party tool
integration.

Change-Id: I32b40eab9355c18db1e4ec132b09dc77561a3475
This commit is contained in:
Michael Krotscheck 2014-10-01 14:24:05 -07:00
parent 9bf27916cb
commit b2e26aa7ed
6 changed files with 128 additions and 31 deletions

View File

@ -17,3 +17,4 @@ WSME>=0.6
sqlalchemy-migrate>=0.8.2,!=0.8.4
SQLAlchemy-FullText-Search
eventlet>=0.13.0
stevedore>=1.0.0

View File

@ -36,6 +36,8 @@ console_scripts =
storyboard-worker-daemon = storyboard.worker.daemon:run
storyboard-db-manage = storyboard.db.migration.cli:main
storyboard-migrate = storyboard.migrate.cli:main
storyboard.worker.task =
subscription = storyboard.worker.task.subscription:Subscription
[build_sphinx]
source-dir = doc/source

View File

@ -13,19 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import time
from oslo.config import cfg
from pika.exceptions import ConnectionClosed
from stevedore import enabled
from storyboard.db.api import timeline_events
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard.notifications.subscriptions_handler import handle_deletions
from storyboard.notifications.subscriptions_handler import handle_resources
from storyboard.notifications.subscriptions_handler import \
handle_timeline_events
from storyboard.openstack.common import log
@ -41,6 +36,13 @@ def subscribe():
subscriber = Subscriber(CONF.notifications)
subscriber.start()
manager = enabled.EnabledExtensionManager(
namespace='storyboard.worker.task',
check_func=check_enabled,
invoke_on_load=True,
invoke_args=(CONF,)
)
while subscriber.started:
(method, properties, body) = subscriber.get()
@ -49,35 +51,31 @@ def subscribe():
time.sleep(5)
continue
body_dict = ast.literal_eval(body)
if 'event_id' in body_dict:
event_id = body_dict['event_id']
event = timeline_events.event_get(event_id)
handle_timeline_events(event, body_dict['author_id'])
manager.map(handle_event, body)
else:
if body_dict['resource'] == 'project_groups':
if 'sub_resource_id' in body_dict:
handle_resources(method=body_dict['method'],
resource_id=body_dict['resource_id'],
sub_resource_id=body_dict[
'sub_resource_id'],
author_id=body_dict['author_id'])
else:
handle_resources(method=body_dict['method'],
resource_id=body_dict['resource_id'],
author_id=body_dict['author_id'])
if body_dict['method'] == 'DELETE':
resource_name = body_dict['resource']
resource_id = body_dict['resource_id']
if 'sub_resource_id' not in body_dict:
handle_deletions(resource_name, resource_id)
# Handle the message
# Ack the message
subscriber.ack(method.delivery_tag)
def handle_event(ext, body):
"""Handle an event from the queue.
:param ext: The extension that's handling this event.
:param body: The body of the event.
:return: The result of the handler.
"""
return ext.obj.handle(body)
def check_enabled(ext):
"""Check to see whether an extension should be enabled.
:param ext: The extension instance to check.
:return: True if it should be enabled. Otherwise false.
"""
return ext.obj.enabled()
class Subscriber(ConnectionService):
def __init__(self, conf):
"""Setup the subscriber instance based on our configuration.

View File

View File

@ -0,0 +1,37 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and
# limitations under the License.
import abc
class WorkerTaskBase(object):
"""Base class for a worker that listens to events that occur within the
API.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config):
self.config = config
@abc.abstractmethod
def enabled(self):
"""A method which indicates whether this worker task is properly
configured and should be enabled. If it's ready to go, return True.
Otherwise, return False.
"""
@abc.abstractmethod
def handle(self, body):
"""Handle an event."""

View File

@ -0,0 +1,59 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and
# limitations under the License.
import json
from storyboard.db.api import timeline_events
from storyboard.notifications.subscriptions_handler import handle_deletions
from storyboard.notifications.subscriptions_handler import handle_resources
from storyboard.notifications.subscriptions_handler import \
handle_timeline_events
from storyboard.worker.task.base import WorkerTaskBase
class Subscription(WorkerTaskBase):
def handle(self, body):
"""This worker handles API events and attempts to determine whether
they correspond to user subscriptions.
:param body: The event message body.
:return:
"""
body_dict = json.loads(body)
if 'event_id' in body_dict:
event_id = body_dict['event_id']
event = timeline_events.event_get(event_id)
handle_timeline_events(event, body_dict['author_id'])
else:
if body_dict['resource'] == 'project_groups':
if 'sub_resource_id' in body_dict:
handle_resources(method=body_dict['method'],
resource_id=body_dict['resource_id'],
sub_resource_id=body_dict[
'sub_resource_id'],
author_id=body_dict['author_id'])
else:
handle_resources(method=body_dict['method'],
resource_id=body_dict['resource_id'],
author_id=body_dict['author_id'])
if body_dict['method'] == 'DELETE':
resource_name = body_dict['resource']
resource_id = body_dict['resource_id']
if 'sub_resource_id' not in body_dict:
handle_deletions(resource_name, resource_id)
def enabled(self):
return True