From b2e26aa7ed3adfbafbd20252e84b0d904e04d29a Mon Sep 17 00:00:00 2001 From: Michael Krotscheck Date: Wed, 1 Oct 2014 14:24:05 -0700 Subject: [PATCH] Stevedore worker plugins The deferred processor now loads plugins via stevedore, and passes all event messages to each plugin found. This provides the first extensibility mechanism for storyboard, by allowing anyone to write tasks that should be handled asynchronously when the system changes. Custom workers must implement two methods: 'enabled' and 'handle'. The first checks to see whether this plugin is enabled, and will be run only once during initialization. The second handles the event. The intent is to drive more advanced functionality off of this mechanism, such as emails, search indexes, and third party tool integration. Change-Id: I32b40eab9355c18db1e4ec132b09dc77561a3475 --- requirements.txt | 1 + setup.cfg | 2 + storyboard/notifications/subscriber.py | 60 +++++++++++++------------- storyboard/worker/task/__init__.py | 0 storyboard/worker/task/base.py | 37 ++++++++++++++++ storyboard/worker/task/subscription.py | 59 +++++++++++++++++++++++++ 6 files changed, 128 insertions(+), 31 deletions(-) create mode 100644 storyboard/worker/task/__init__.py create mode 100644 storyboard/worker/task/base.py create mode 100644 storyboard/worker/task/subscription.py diff --git a/requirements.txt b/requirements.txt index 41d1255f..b68c12b3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ WSME>=0.6 sqlalchemy-migrate>=0.8.2,!=0.8.4 SQLAlchemy-FullText-Search eventlet>=0.13.0 +stevedore>=1.0.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 01538726..19c3656e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,8 @@ console_scripts = storyboard-worker-daemon = storyboard.worker.daemon:run storyboard-db-manage = storyboard.db.migration.cli:main storyboard-migrate = storyboard.migrate.cli:main +storyboard.worker.task = + subscription = storyboard.worker.task.subscription:Subscription [build_sphinx] source-dir = doc/source diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/subscriber.py index 9de2e989..a5e3a452 100644 --- a/storyboard/notifications/subscriber.py +++ b/storyboard/notifications/subscriber.py @@ -13,19 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast import time from oslo.config import cfg from pika.exceptions import ConnectionClosed +from stevedore import enabled -from storyboard.db.api import timeline_events from storyboard.notifications.conf import NOTIFICATION_OPTS from storyboard.notifications.connection_service import ConnectionService -from storyboard.notifications.subscriptions_handler import handle_deletions -from storyboard.notifications.subscriptions_handler import handle_resources -from storyboard.notifications.subscriptions_handler import \ - handle_timeline_events from storyboard.openstack.common import log @@ -41,6 +36,13 @@ def subscribe(): subscriber = Subscriber(CONF.notifications) subscriber.start() + manager = enabled.EnabledExtensionManager( + namespace='storyboard.worker.task', + check_func=check_enabled, + invoke_on_load=True, + invoke_args=(CONF,) + ) + while subscriber.started: (method, properties, body) = subscriber.get() @@ -49,35 +51,31 @@ def subscribe(): time.sleep(5) continue - body_dict = ast.literal_eval(body) - if 'event_id' in body_dict: - event_id = body_dict['event_id'] - event = timeline_events.event_get(event_id) - handle_timeline_events(event, body_dict['author_id']) + manager.map(handle_event, body) - else: - if body_dict['resource'] == 'project_groups': - if 'sub_resource_id' in body_dict: - handle_resources(method=body_dict['method'], - resource_id=body_dict['resource_id'], - sub_resource_id=body_dict[ - 'sub_resource_id'], - author_id=body_dict['author_id']) - else: - handle_resources(method=body_dict['method'], - resource_id=body_dict['resource_id'], - author_id=body_dict['author_id']) - - if body_dict['method'] == 'DELETE': - resource_name = body_dict['resource'] - resource_id = body_dict['resource_id'] - if 'sub_resource_id' not in body_dict: - handle_deletions(resource_name, resource_id) - - # Handle the message + # Ack the message subscriber.ack(method.delivery_tag) +def handle_event(ext, body): + """Handle an event from the queue. + + :param ext: The extension that's handling this event. + :param body: The body of the event. + :return: The result of the handler. + """ + return ext.obj.handle(body) + + +def check_enabled(ext): + """Check to see whether an extension should be enabled. + + :param ext: The extension instance to check. + :return: True if it should be enabled. Otherwise false. + """ + return ext.obj.enabled() + + class Subscriber(ConnectionService): def __init__(self, conf): """Setup the subscriber instance based on our configuration. diff --git a/storyboard/worker/task/__init__.py b/storyboard/worker/task/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/worker/task/base.py b/storyboard/worker/task/base.py new file mode 100644 index 00000000..a5cd8162 --- /dev/null +++ b/storyboard/worker/task/base.py @@ -0,0 +1,37 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class WorkerTaskBase(object): + """Base class for a worker that listens to events that occur within the + API. + """ + + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + self.config = config + + @abc.abstractmethod + def enabled(self): + """A method which indicates whether this worker task is properly + configured and should be enabled. If it's ready to go, return True. + Otherwise, return False. + """ + + @abc.abstractmethod + def handle(self, body): + """Handle an event.""" diff --git a/storyboard/worker/task/subscription.py b/storyboard/worker/task/subscription.py new file mode 100644 index 00000000..56583344 --- /dev/null +++ b/storyboard/worker/task/subscription.py @@ -0,0 +1,59 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from storyboard.db.api import timeline_events +from storyboard.notifications.subscriptions_handler import handle_deletions +from storyboard.notifications.subscriptions_handler import handle_resources +from storyboard.notifications.subscriptions_handler import \ + handle_timeline_events +from storyboard.worker.task.base import WorkerTaskBase + + +class Subscription(WorkerTaskBase): + def handle(self, body): + """This worker handles API events and attempts to determine whether + they correspond to user subscriptions. + + :param body: The event message body. + :return: + """ + body_dict = json.loads(body) + if 'event_id' in body_dict: + event_id = body_dict['event_id'] + event = timeline_events.event_get(event_id) + handle_timeline_events(event, body_dict['author_id']) + + else: + if body_dict['resource'] == 'project_groups': + if 'sub_resource_id' in body_dict: + handle_resources(method=body_dict['method'], + resource_id=body_dict['resource_id'], + sub_resource_id=body_dict[ + 'sub_resource_id'], + author_id=body_dict['author_id']) + else: + handle_resources(method=body_dict['method'], + resource_id=body_dict['resource_id'], + author_id=body_dict['author_id']) + + if body_dict['method'] == 'DELETE': + resource_name = body_dict['resource'] + resource_id = body_dict['resource_id'] + if 'sub_resource_id' not in body_dict: + handle_deletions(resource_name, resource_id) + + def enabled(self): + return True