From 8ce9d3eaeff938f89b82eab048151818094eae9c Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Thu, 16 Jul 2015 15:29:06 +1200 Subject: [PATCH] Add email driver for notification DocImpact blueprint email-notification Change-Id: Ib09f7d3b47c3a4479da7fb9b9e2acceb691975ac --- setup.cfg | 1 + zaqar/common/configs.py | 12 ++- zaqar/notification/notifier.py | 7 +- zaqar/notification/task/mailto.py | 50 +++++++++++++ zaqar/storage/redis/subscriptions.py | 3 +- .../tests/unit/notification/test_notifier.py | 75 ++++++++++++++----- zaqar/transport/validation.py | 2 +- 7 files changed, 128 insertions(+), 22 deletions(-) create mode 100644 zaqar/notification/task/mailto.py diff --git a/setup.cfg b/setup.cfg index c2f2edf5e..8b50f7b9e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -76,6 +76,7 @@ zaqar.storage.redis.driver.queue.stages = zaqar.notification.tasks = http = zaqar.notification.task.webhook:WebhookTask https = zaqar.notification.task.webhook:WebhookTask + mailto = zaqar.notification.task.mailto:MailtoTask [nosetests] where=zaqar/tests diff --git a/zaqar/common/configs.py b/zaqar/common/configs.py index 4b12eacd5..af49b0a5d 100644 --- a/zaqar/common/configs.py +++ b/zaqar/common/configs.py @@ -54,7 +54,17 @@ _SIGNED_URL_OPTIONS = ( _SIGNED_URL_GROUP = 'signed_url' +_NOTIFICATION_OPTIONS = ( + cfg.StrOpt('smtp_command', default='/usr/sbin/sendmail -t -oi', + help=('The command of smtp to send email. The format is ' + '"command_name arg1 arg2".')), +) + +_NOTIFICATION_GROUP = 'notification' + + def _config_options(): return [(None, _GENERAL_OPTIONS), (_DRIVER_GROUP, _DRIVER_OPTIONS), - (_SIGNED_URL_GROUP, _SIGNED_URL_OPTIONS)] + (_SIGNED_URL_GROUP, _SIGNED_URL_OPTIONS), + (_NOTIFICATION_GROUP, _NOTIFICATION_OPTIONS)] diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 4a7d077d0..4e7c701c0 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -40,10 +40,13 @@ class NotifierDriver(object): for sub in next(subscribers): s_type = urllib_parse.urlparse(sub['subscriber']).scheme - + data_driver = self.subscription_controller.driver + conf = (getattr(data_driver, 'conf', None) or + getattr(data_driver, '_conf')) mgr = driver.DriverManager('zaqar.notification.tasks', s_type, invoke_on_load=True) - self.executor.submit(mgr.driver.execute, sub, messages) + self.executor.submit(mgr.driver.execute, sub, messages, + conf=conf) else: LOG.error('Failed to get subscription controller.') diff --git a/zaqar/notification/task/mailto.py b/zaqar/notification/task/mailto.py new file mode 100644 index 000000000..99b65f4d4 --- /dev/null +++ b/zaqar/notification/task/mailto.py @@ -0,0 +1,50 @@ +# Copyright (c) 2015 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from email.mime import text +import json +from six.moves import urllib_parse +import subprocess + +from oslo_log import log as logging + +from zaqar.i18n import _LE + +LOG = logging.getLogger(__name__) + + +class MailtoTask(object): + + def execute(self, subscription, messages, **kwargs): + subscriber = urllib_parse.urlparse(subscription['subscriber']) + params = urllib_parse.parse_qs(subscriber.query) + params = dict((k.lower(), v) for k, v in params.items()) + conf = kwargs.get('conf') + try: + for message in messages: + p = subprocess.Popen(conf.notification.smtp_command.split(' '), + stdin=subprocess.PIPE) + msg = text.MIMEText(json.dumps(message)) + msg["to"] = subscriber.path + msg["from"] = subscription['options'].get('from', '') + subject_opt = subscription['options'].get('subject', '') + msg["subject"] = params.get('subject', subject_opt) + p.communicate(msg.as_string()) + except OSError as err: + LOG.error(_LE('Failed to create process for sendmail, ' + 'because %s') % str(err)) + except Exception as exc: + LOG.exception(_LE('Failed to send email')) + LOG.exception(exc) diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index 7cef7d73c..b7b40cf66 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -13,6 +13,7 @@ # the License. import functools +import json import uuid import msgpack @@ -78,7 +79,7 @@ class SubscriptionController(base.Subscription): 'source': record[0], 'subscriber': record[1], 'ttl': record[2], - 'options': record[3], + 'options': json.loads(record[3]), } marker_next['next'] = sid diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index fec31a69f..1be9b8fbf 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -26,10 +26,6 @@ class NotifierTest(testing.TestBase): def setUp(self): super(NotifierTest, self).setUp() - self.subscription = [{'subscriber': 'http://trigger.me'}, - {'subscriber': 'http://call.me'}, - {'subscriber': 'http://ping.me'} - ] self.client_id = uuid.uuid4() self.project = uuid.uuid4() self.messages = [{"ttl": 300, @@ -42,38 +38,82 @@ class NotifierTest(testing.TestBase): } ] + def test_webhook(self): + subscription = [{'subscriber': 'http://trigger_me'}, + {'subscriber': 'http://call_me'}, + {'subscriber': 'http://ping_me'}] ctlr = mock.MagicMock() - ctlr.list = mock.Mock(return_value=iter([self.subscription])) - self.driver = notifier.NotifierDriver(subscription_controller=ctlr) - - def test_post(self): + ctlr.list = mock.Mock(return_value=iter([subscription])) + driver = notifier.NotifierDriver(subscription_controller=ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: - self.driver.post('fake_queue', self.messages, - self.client_id, self.project) - self.driver.executor.shutdown() + driver.post('fake_queue', self.messages, self.client_id, + self.project) + driver.executor.shutdown() mock_post.assert_has_calls([ - mock.call(self.subscription[0]['subscriber'], + mock.call(subscription[0]['subscriber'], data=json.dumps(self.messages[0]), headers=headers), - mock.call(self.subscription[1]['subscriber'], + mock.call(subscription[1]['subscriber'], data=json.dumps(self.messages[0]), headers=headers), - mock.call(self.subscription[2]['subscriber'], + mock.call(subscription[2]['subscriber'], data=json.dumps(self.messages[0]), headers=headers), - mock.call(self.subscription[0]['subscriber'], + mock.call(subscription[0]['subscriber'], data=json.dumps(self.messages[1]), headers=headers), - mock.call(self.subscription[1]['subscriber'], + mock.call(subscription[1]['subscriber'], data=json.dumps(self.messages[1]), headers=headers), - mock.call(self.subscription[2]['subscriber'], + mock.call(subscription[2]['subscriber'], data=json.dumps(self.messages[1]), headers=headers), ], any_order=True) self.assertEqual(6, len(mock_post.mock_calls)) + @mock.patch('subprocess.Popen') + def test_mailto(self, mock_popen): + subscription = [{'subscriber': 'mailto:aaa@example.com', + 'options': {'subject': 'Hello', + 'from': 'zaqar@example.com'}}, + {'subscriber': 'mailto:bbb@example.com', + 'options': {'subject': 'Hello', + 'from': 'zaqar@example.com'}}] + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=iter([subscription])) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + called = set() + msg = ('Content-Type: text/plain; charset="us-ascii"\n' + 'MIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nto:' + ' %(to)s\nfrom: %(from)s\nsubject: %(subject)s\n\n%(body)s') + + mail1 = msg % {'to': subscription[0]['subscriber'][7:], + 'from': 'zaqar@example.com', 'subject': 'Hello', + 'body': json.dumps(self.messages[0])} + mail2 = msg % {'to': subscription[0]['subscriber'][7:], + 'from': 'zaqar@example.com', 'subject': 'Hello', + 'body': json.dumps(self.messages[1])} + mail3 = msg % {'to': subscription[1]['subscriber'][7:], + 'from': 'zaqar@example.com', 'subject': 'Hello', + 'body': json.dumps(self.messages[0])} + mail4 = msg % {'to': subscription[1]['subscriber'][7:], + 'from': 'zaqar@example.com', 'subject': 'Hello', + 'body': json.dumps(self.messages[1])} + + def _communicate(msg): + called.add(msg) + + mock_process = mock.Mock() + attrs = {'communicate': _communicate} + mock_process.configure_mock(**attrs) + mock_popen.return_value = mock_process + driver.post('fake_queue', self.messages, self.client_id, self.project) + driver.executor.shutdown() + + self.assertEqual(4, len(called)) + self.assertEqual(called, {mail1, mail2, mail3, mail4}) + def test_post_no_subscriber(self): ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([[]])) @@ -81,4 +121,5 @@ class NotifierTest(testing.TestBase): with mock.patch('requests.post') as mock_post: driver.post('fake_queue', self.messages, self.client_id, self.project) + driver.executor.shutdown() self.assertEqual(0, mock_post.call_count) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index be8d40ffc..40cfc6825 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -72,7 +72,7 @@ _TRANSPORT_LIMITS_OPTIONS = ( deprecated_group='limits:transport', help='Defines the maximum message grace period in seconds.'), - cfg.ListOpt('subscriber_types', default=['http', 'https'], + cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto'], help='Defines supported subscriber types.'), )