Add email driver for notification
DocImpact blueprint email-notification Change-Id: Ib09f7d3b47c3a4479da7fb9b9e2acceb691975ac
This commit is contained in:
parent
d73aaf1d91
commit
8ce9d3eaef
@ -76,6 +76,7 @@ zaqar.storage.redis.driver.queue.stages =
|
|||||||
zaqar.notification.tasks =
|
zaqar.notification.tasks =
|
||||||
http = zaqar.notification.task.webhook:WebhookTask
|
http = zaqar.notification.task.webhook:WebhookTask
|
||||||
https = zaqar.notification.task.webhook:WebhookTask
|
https = zaqar.notification.task.webhook:WebhookTask
|
||||||
|
mailto = zaqar.notification.task.mailto:MailtoTask
|
||||||
|
|
||||||
[nosetests]
|
[nosetests]
|
||||||
where=zaqar/tests
|
where=zaqar/tests
|
||||||
|
@ -54,7 +54,17 @@ _SIGNED_URL_OPTIONS = (
|
|||||||
_SIGNED_URL_GROUP = 'signed_url'
|
_SIGNED_URL_GROUP = 'signed_url'
|
||||||
|
|
||||||
|
|
||||||
|
_NOTIFICATION_OPTIONS = (
|
||||||
|
cfg.StrOpt('smtp_command', default='/usr/sbin/sendmail -t -oi',
|
||||||
|
help=('The command of smtp to send email. The format is '
|
||||||
|
'"command_name arg1 arg2".')),
|
||||||
|
)
|
||||||
|
|
||||||
|
_NOTIFICATION_GROUP = 'notification'
|
||||||
|
|
||||||
|
|
||||||
def _config_options():
|
def _config_options():
|
||||||
return [(None, _GENERAL_OPTIONS),
|
return [(None, _GENERAL_OPTIONS),
|
||||||
(_DRIVER_GROUP, _DRIVER_OPTIONS),
|
(_DRIVER_GROUP, _DRIVER_OPTIONS),
|
||||||
(_SIGNED_URL_GROUP, _SIGNED_URL_OPTIONS)]
|
(_SIGNED_URL_GROUP, _SIGNED_URL_OPTIONS),
|
||||||
|
(_NOTIFICATION_GROUP, _NOTIFICATION_OPTIONS)]
|
||||||
|
@ -40,10 +40,13 @@ class NotifierDriver(object):
|
|||||||
|
|
||||||
for sub in next(subscribers):
|
for sub in next(subscribers):
|
||||||
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
||||||
|
data_driver = self.subscription_controller.driver
|
||||||
|
conf = (getattr(data_driver, 'conf', None) or
|
||||||
|
getattr(data_driver, '_conf'))
|
||||||
mgr = driver.DriverManager('zaqar.notification.tasks',
|
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||||
s_type,
|
s_type,
|
||||||
invoke_on_load=True)
|
invoke_on_load=True)
|
||||||
self.executor.submit(mgr.driver.execute, sub, messages)
|
self.executor.submit(mgr.driver.execute, sub, messages,
|
||||||
|
conf=conf)
|
||||||
else:
|
else:
|
||||||
LOG.error('Failed to get subscription controller.')
|
LOG.error('Failed to get subscription controller.')
|
||||||
|
50
zaqar/notification/task/mailto.py
Normal file
50
zaqar/notification/task/mailto.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
# Copyright (c) 2015 Catalyst IT Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from email.mime import text
|
||||||
|
import json
|
||||||
|
from six.moves import urllib_parse
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from zaqar.i18n import _LE
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MailtoTask(object):
|
||||||
|
|
||||||
|
def execute(self, subscription, messages, **kwargs):
|
||||||
|
subscriber = urllib_parse.urlparse(subscription['subscriber'])
|
||||||
|
params = urllib_parse.parse_qs(subscriber.query)
|
||||||
|
params = dict((k.lower(), v) for k, v in params.items())
|
||||||
|
conf = kwargs.get('conf')
|
||||||
|
try:
|
||||||
|
for message in messages:
|
||||||
|
p = subprocess.Popen(conf.notification.smtp_command.split(' '),
|
||||||
|
stdin=subprocess.PIPE)
|
||||||
|
msg = text.MIMEText(json.dumps(message))
|
||||||
|
msg["to"] = subscriber.path
|
||||||
|
msg["from"] = subscription['options'].get('from', '')
|
||||||
|
subject_opt = subscription['options'].get('subject', '')
|
||||||
|
msg["subject"] = params.get('subject', subject_opt)
|
||||||
|
p.communicate(msg.as_string())
|
||||||
|
except OSError as err:
|
||||||
|
LOG.error(_LE('Failed to create process for sendmail, '
|
||||||
|
'because %s') % str(err))
|
||||||
|
except Exception as exc:
|
||||||
|
LOG.exception(_LE('Failed to send email'))
|
||||||
|
LOG.exception(exc)
|
@ -13,6 +13,7 @@
|
|||||||
# the License.
|
# the License.
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import msgpack
|
import msgpack
|
||||||
@ -78,7 +79,7 @@ class SubscriptionController(base.Subscription):
|
|||||||
'source': record[0],
|
'source': record[0],
|
||||||
'subscriber': record[1],
|
'subscriber': record[1],
|
||||||
'ttl': record[2],
|
'ttl': record[2],
|
||||||
'options': record[3],
|
'options': json.loads(record[3]),
|
||||||
}
|
}
|
||||||
marker_next['next'] = sid
|
marker_next['next'] = sid
|
||||||
|
|
||||||
|
@ -26,10 +26,6 @@ class NotifierTest(testing.TestBase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(NotifierTest, self).setUp()
|
super(NotifierTest, self).setUp()
|
||||||
self.subscription = [{'subscriber': 'http://trigger.me'},
|
|
||||||
{'subscriber': 'http://call.me'},
|
|
||||||
{'subscriber': 'http://ping.me'}
|
|
||||||
]
|
|
||||||
self.client_id = uuid.uuid4()
|
self.client_id = uuid.uuid4()
|
||||||
self.project = uuid.uuid4()
|
self.project = uuid.uuid4()
|
||||||
self.messages = [{"ttl": 300,
|
self.messages = [{"ttl": 300,
|
||||||
@ -42,38 +38,82 @@ class NotifierTest(testing.TestBase):
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def test_webhook(self):
|
||||||
|
subscription = [{'subscriber': 'http://trigger_me'},
|
||||||
|
{'subscriber': 'http://call_me'},
|
||||||
|
{'subscriber': 'http://ping_me'}]
|
||||||
ctlr = mock.MagicMock()
|
ctlr = mock.MagicMock()
|
||||||
ctlr.list = mock.Mock(return_value=iter([self.subscription]))
|
ctlr.list = mock.Mock(return_value=iter([subscription]))
|
||||||
self.driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||||
|
|
||||||
def test_post(self):
|
|
||||||
headers = {'Content-Type': 'application/json'}
|
headers = {'Content-Type': 'application/json'}
|
||||||
with mock.patch('requests.post') as mock_post:
|
with mock.patch('requests.post') as mock_post:
|
||||||
self.driver.post('fake_queue', self.messages,
|
driver.post('fake_queue', self.messages, self.client_id,
|
||||||
self.client_id, self.project)
|
self.project)
|
||||||
self.driver.executor.shutdown()
|
driver.executor.shutdown()
|
||||||
mock_post.assert_has_calls([
|
mock_post.assert_has_calls([
|
||||||
mock.call(self.subscription[0]['subscriber'],
|
mock.call(subscription[0]['subscriber'],
|
||||||
data=json.dumps(self.messages[0]),
|
data=json.dumps(self.messages[0]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
mock.call(self.subscription[1]['subscriber'],
|
mock.call(subscription[1]['subscriber'],
|
||||||
data=json.dumps(self.messages[0]),
|
data=json.dumps(self.messages[0]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
mock.call(self.subscription[2]['subscriber'],
|
mock.call(subscription[2]['subscriber'],
|
||||||
data=json.dumps(self.messages[0]),
|
data=json.dumps(self.messages[0]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
mock.call(self.subscription[0]['subscriber'],
|
mock.call(subscription[0]['subscriber'],
|
||||||
data=json.dumps(self.messages[1]),
|
data=json.dumps(self.messages[1]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
mock.call(self.subscription[1]['subscriber'],
|
mock.call(subscription[1]['subscriber'],
|
||||||
data=json.dumps(self.messages[1]),
|
data=json.dumps(self.messages[1]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
mock.call(self.subscription[2]['subscriber'],
|
mock.call(subscription[2]['subscriber'],
|
||||||
data=json.dumps(self.messages[1]),
|
data=json.dumps(self.messages[1]),
|
||||||
headers=headers),
|
headers=headers),
|
||||||
], any_order=True)
|
], any_order=True)
|
||||||
self.assertEqual(6, len(mock_post.mock_calls))
|
self.assertEqual(6, len(mock_post.mock_calls))
|
||||||
|
|
||||||
|
@mock.patch('subprocess.Popen')
|
||||||
|
def test_mailto(self, mock_popen):
|
||||||
|
subscription = [{'subscriber': 'mailto:aaa@example.com',
|
||||||
|
'options': {'subject': 'Hello',
|
||||||
|
'from': 'zaqar@example.com'}},
|
||||||
|
{'subscriber': 'mailto:bbb@example.com',
|
||||||
|
'options': {'subject': 'Hello',
|
||||||
|
'from': 'zaqar@example.com'}}]
|
||||||
|
ctlr = mock.MagicMock()
|
||||||
|
ctlr.list = mock.Mock(return_value=iter([subscription]))
|
||||||
|
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||||
|
called = set()
|
||||||
|
msg = ('Content-Type: text/plain; charset="us-ascii"\n'
|
||||||
|
'MIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nto:'
|
||||||
|
' %(to)s\nfrom: %(from)s\nsubject: %(subject)s\n\n%(body)s')
|
||||||
|
|
||||||
|
mail1 = msg % {'to': subscription[0]['subscriber'][7:],
|
||||||
|
'from': 'zaqar@example.com', 'subject': 'Hello',
|
||||||
|
'body': json.dumps(self.messages[0])}
|
||||||
|
mail2 = msg % {'to': subscription[0]['subscriber'][7:],
|
||||||
|
'from': 'zaqar@example.com', 'subject': 'Hello',
|
||||||
|
'body': json.dumps(self.messages[1])}
|
||||||
|
mail3 = msg % {'to': subscription[1]['subscriber'][7:],
|
||||||
|
'from': 'zaqar@example.com', 'subject': 'Hello',
|
||||||
|
'body': json.dumps(self.messages[0])}
|
||||||
|
mail4 = msg % {'to': subscription[1]['subscriber'][7:],
|
||||||
|
'from': 'zaqar@example.com', 'subject': 'Hello',
|
||||||
|
'body': json.dumps(self.messages[1])}
|
||||||
|
|
||||||
|
def _communicate(msg):
|
||||||
|
called.add(msg)
|
||||||
|
|
||||||
|
mock_process = mock.Mock()
|
||||||
|
attrs = {'communicate': _communicate}
|
||||||
|
mock_process.configure_mock(**attrs)
|
||||||
|
mock_popen.return_value = mock_process
|
||||||
|
driver.post('fake_queue', self.messages, self.client_id, self.project)
|
||||||
|
driver.executor.shutdown()
|
||||||
|
|
||||||
|
self.assertEqual(4, len(called))
|
||||||
|
self.assertEqual(called, {mail1, mail2, mail3, mail4})
|
||||||
|
|
||||||
def test_post_no_subscriber(self):
|
def test_post_no_subscriber(self):
|
||||||
ctlr = mock.MagicMock()
|
ctlr = mock.MagicMock()
|
||||||
ctlr.list = mock.Mock(return_value=iter([[]]))
|
ctlr.list = mock.Mock(return_value=iter([[]]))
|
||||||
@ -81,4 +121,5 @@ class NotifierTest(testing.TestBase):
|
|||||||
with mock.patch('requests.post') as mock_post:
|
with mock.patch('requests.post') as mock_post:
|
||||||
driver.post('fake_queue', self.messages, self.client_id,
|
driver.post('fake_queue', self.messages, self.client_id,
|
||||||
self.project)
|
self.project)
|
||||||
|
driver.executor.shutdown()
|
||||||
self.assertEqual(0, mock_post.call_count)
|
self.assertEqual(0, mock_post.call_count)
|
||||||
|
@ -72,7 +72,7 @@ _TRANSPORT_LIMITS_OPTIONS = (
|
|||||||
deprecated_group='limits:transport',
|
deprecated_group='limits:transport',
|
||||||
help='Defines the maximum message grace period in seconds.'),
|
help='Defines the maximum message grace period in seconds.'),
|
||||||
|
|
||||||
cfg.ListOpt('subscriber_types', default=['http', 'https'],
|
cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto'],
|
||||||
help='Defines supported subscriber types.'),
|
help='Defines supported subscriber types.'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user