diff --git a/requirements.txt b/requirements.txt index af13cd0ba..e465fb092 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,5 +26,4 @@ enum34;python_version=='2.7' or python_version=='2.6' trollius>=1.0 autobahn>=0.10.1 # MIT License requests>=2.5.2 -taskflow>=1.16.0 futurist>=0.1.2 # Apache-2.0 diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 5dd2e27e4..ad0d633ba 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -14,13 +14,10 @@ # limitations under the License. from stevedore import driver -import uuid import futurist from oslo_log import log as logging from six.moves import urllib_parse -from taskflow import engines -from taskflow.patterns import unordered_flow as uf LOG = logging.getLogger(__name__) @@ -44,24 +41,12 @@ class NotifierDriver(object): subscribers = self.subscription_controller.list(queue_name, project) - wh_flow = uf.Flow('webhook_notifier_flow') - for sub in next(subscribers): s_type = urllib_parse.urlparse(sub['subscriber']).scheme - invoke_args = [uuid.uuid4()] - invoke_kwds = {'inject': {'subscription': sub, - 'messages': messages}} mgr = driver.DriverManager('zaqar.notification.tasks', s_type, - invoke_on_load=True, - invoke_args=invoke_args, - invoke_kwds=invoke_kwds) - wh_flow.add(mgr.driver) - - if wh_flow: - e = engines.load(wh_flow, executor=self.executor, - engine='parallel') - e.run() + invoke_on_load=True) + self.executor.submit(mgr.driver.execute, sub, messages) else: LOG.error('Failed to get subscription controller.') diff --git a/zaqar/notification/task/webhook.py b/zaqar/notification/task/webhook.py index 4b21446ed..b2b2dbb59 100644 --- a/zaqar/notification/task/webhook.py +++ b/zaqar/notification/task/webhook.py @@ -16,15 +16,11 @@ import json from oslo_log import log as logging import requests -from taskflow import task LOG = logging.getLogger(__name__) -class WebhookTask(task.Task): - def __init__(self, name, show_name=True, inject=None): - super(WebhookTask, self).__init__(name, inject=inject) - self._show_name = show_name +class WebhookTask(object): def execute(self, subscription, messages, **kwargs): try: