Merge "Refactor notification framework"
This commit is contained in:
commit
8721513b8a
@ -73,6 +73,9 @@ zaqar.storage.mongodb.driver.queue.stages =
|
|||||||
zaqar.storage.redis.driver.queue.stages =
|
zaqar.storage.redis.driver.queue.stages =
|
||||||
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
|
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
|
||||||
|
|
||||||
|
zaqar.notification.tasks =
|
||||||
|
http = zaqar.notification.task.webhook:WebhookTask
|
||||||
|
https = zaqar.notification.task.webhook:WebhookTask
|
||||||
|
|
||||||
[nosetests]
|
[nosetests]
|
||||||
where=zaqar/tests
|
where=zaqar/tests
|
||||||
|
@ -13,16 +13,14 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from stevedore import driver
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import futurist
|
import futurist
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import six
|
from six.moves import urllib_parse
|
||||||
from taskflow import engines
|
from taskflow import engines
|
||||||
from taskflow.patterns import unordered_flow as uf
|
from taskflow.patterns import unordered_flow as uf
|
||||||
from taskflow import task
|
|
||||||
|
|
||||||
from zaqar.notification.task import webhook
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -40,17 +38,6 @@ class NotifierDriver(object):
|
|||||||
# TODO(flwang): Make the max_workers configurable
|
# TODO(flwang): Make the max_workers configurable
|
||||||
self.executor = futurist.ThreadPoolExecutor(max_workers=10)
|
self.executor = futurist.ThreadPoolExecutor(max_workers=10)
|
||||||
|
|
||||||
def _generate_task(self, subscriber_uri, message):
|
|
||||||
task_name = uuid.uuid4()
|
|
||||||
# TODO(flwang): Need to work out a better way to make tasks
|
|
||||||
s_type = six.moves.urllib_parse.urlparse(subscriber_uri).scheme
|
|
||||||
|
|
||||||
t = task.Task
|
|
||||||
if s_type in ('http', 'https'):
|
|
||||||
t = webhook.WebhookTask
|
|
||||||
|
|
||||||
return t(task_name, inject={'uri': subscriber_uri, 'message': message})
|
|
||||||
|
|
||||||
def post(self, queue_name, messages, client_uuid, project=None):
|
def post(self, queue_name, messages, client_uuid, project=None):
|
||||||
"""Send messages to the subscribers."""
|
"""Send messages to the subscribers."""
|
||||||
if self.subscription_controller:
|
if self.subscription_controller:
|
||||||
@ -59,9 +46,18 @@ class NotifierDriver(object):
|
|||||||
|
|
||||||
wh_flow = uf.Flow('webhook_notifier_flow')
|
wh_flow = uf.Flow('webhook_notifier_flow')
|
||||||
|
|
||||||
for s in list(next(subscribers)):
|
for sub in next(subscribers):
|
||||||
for m in messages:
|
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
||||||
wh_flow.add(self._generate_task(s['subscriber'], m))
|
invoke_args = [uuid.uuid4()]
|
||||||
|
invoke_kwds = {'inject': {'subscription': sub,
|
||||||
|
'messages': messages}}
|
||||||
|
|
||||||
|
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||||
|
s_type,
|
||||||
|
invoke_on_load=True,
|
||||||
|
invoke_args=invoke_args,
|
||||||
|
invoke_kwds=invoke_kwds)
|
||||||
|
wh_flow.add(mgr.driver)
|
||||||
|
|
||||||
if wh_flow:
|
if wh_flow:
|
||||||
e = engines.load(wh_flow, executor=self.executor,
|
e = engines.load(wh_flow, executor=self.executor,
|
||||||
|
@ -26,9 +26,11 @@ class WebhookTask(task.Task):
|
|||||||
super(WebhookTask, self).__init__(name, inject=inject)
|
super(WebhookTask, self).__init__(name, inject=inject)
|
||||||
self._show_name = show_name
|
self._show_name = show_name
|
||||||
|
|
||||||
def execute(self, uri, message, **kwargs):
|
def execute(self, subscription, messages, **kwargs):
|
||||||
try:
|
try:
|
||||||
requests.post(uri, data=json.dumps(message),
|
for msg in messages:
|
||||||
|
requests.post(subscription['subscriber'],
|
||||||
|
data=json.dumps(msg),
|
||||||
headers={'Content-Type': 'application/json'})
|
headers={'Content-Type': 'application/json'})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.error(e)
|
LOG.error(e)
|
||||||
|
@ -19,7 +19,6 @@ import uuid
|
|||||||
import mock
|
import mock
|
||||||
|
|
||||||
from zaqar.notification import notifier
|
from zaqar.notification import notifier
|
||||||
from zaqar.notification import task
|
|
||||||
from zaqar import tests as testing
|
from zaqar import tests as testing
|
||||||
|
|
||||||
|
|
||||||
@ -74,11 +73,6 @@ class NotifierTest(testing.TestBase):
|
|||||||
], any_order=True)
|
], any_order=True)
|
||||||
self.assertEqual(6, len(mock_post.mock_calls))
|
self.assertEqual(6, len(mock_post.mock_calls))
|
||||||
|
|
||||||
def test_generate_task(self):
|
|
||||||
subscriber = self.subscription[0]['subscriber']
|
|
||||||
new_task = self.driver._generate_task(subscriber, self.messages)
|
|
||||||
self.assertIsInstance(new_task, task.webhook.WebhookTask)
|
|
||||||
|
|
||||||
def test_post_no_subscriber(self):
|
def test_post_no_subscriber(self):
|
||||||
ctlr = mock.MagicMock()
|
ctlr = mock.MagicMock()
|
||||||
ctlr.list = mock.Mock(return_value=iter([[]]))
|
ctlr.list = mock.Mock(return_value=iter([[]]))
|
||||||
|
@ -72,7 +72,7 @@ _TRANSPORT_LIMITS_OPTIONS = (
|
|||||||
deprecated_group='limits:transport',
|
deprecated_group='limits:transport',
|
||||||
help='Defines the maximum message grace period in seconds.'),
|
help='Defines the maximum message grace period in seconds.'),
|
||||||
|
|
||||||
cfg.ListOpt('subscriber_types', default=['http'],
|
cfg.ListOpt('subscriber_types', default=['http', 'https'],
|
||||||
help='Defines supported subscriber types.'),
|
help='Defines supported subscriber types.'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user