Merge "Explicit Event API"
This commit is contained in:
commit
ead1470858
@ -16,6 +16,7 @@
|
||||
import json
|
||||
from oslo.config import cfg
|
||||
from pecan import request
|
||||
from pecan import response
|
||||
|
||||
from storyboard.common import event_types
|
||||
from storyboard.db.api import base as api_base
|
||||
@ -48,13 +49,15 @@ def event_create(values):
|
||||
new_event = api_base.entity_create(models.TimeLineEvent, values)
|
||||
|
||||
if CONF.enable_notifications:
|
||||
payload = {
|
||||
"author_id": request.current_user_id,
|
||||
"method": "POST",
|
||||
"resource": "timeline_events",
|
||||
"event_id": new_event.id
|
||||
}
|
||||
publish("timeline_events", payload)
|
||||
# Build the payload. Use of None is included to ensure that we don't
|
||||
# accidentally blow up the API call, but we don't anticipate it
|
||||
# happening.
|
||||
publish(author_id=request.current_user_id or None,
|
||||
method="POST",
|
||||
path=request.path or None,
|
||||
status=response.status_code or None,
|
||||
resource="timeline_events",
|
||||
resource_id=new_event.id or None)
|
||||
|
||||
return new_event
|
||||
|
||||
|
@ -31,52 +31,41 @@ class NotificationHook(hooks.PecanHook):
|
||||
return
|
||||
|
||||
request = state.request
|
||||
req_method = request.method
|
||||
req_author_id = request.current_user_id
|
||||
req_path = request.path
|
||||
req_resource_grp = self._parse(req_path)
|
||||
response = state.response
|
||||
|
||||
if not req_resource_grp:
|
||||
# Attempt to determine the type of the payload. This checks for
|
||||
# nested paths.
|
||||
(resource, resource_id, subresource, subresource_id) \
|
||||
= self._parse(request.path)
|
||||
if not resource:
|
||||
return
|
||||
|
||||
resource = req_resource_grp[0]
|
||||
|
||||
if req_resource_grp[1]:
|
||||
resource_id = req_resource_grp[1]
|
||||
else:
|
||||
if state.request.method == 'POST':
|
||||
# When a resource is created..
|
||||
response_str = state.response.body
|
||||
response = json.loads(response_str)
|
||||
if response:
|
||||
resource_id = response.get('id')
|
||||
response_body = json.loads(response.body)
|
||||
if response_body:
|
||||
resource_id = response_body.get('id')
|
||||
else:
|
||||
resource_id = None
|
||||
|
||||
# when adding/removing projects to project_groups..
|
||||
if req_resource_grp[3]:
|
||||
sub_resource_id = req_resource_grp[3]
|
||||
payload = {
|
||||
"author_id": req_author_id,
|
||||
"method": req_method,
|
||||
"resource": resource,
|
||||
"resource_id": resource_id,
|
||||
"sub_resource_id": sub_resource_id
|
||||
}
|
||||
|
||||
else:
|
||||
payload = {
|
||||
"author_id": req_author_id,
|
||||
"method": req_method,
|
||||
"resource": resource,
|
||||
"resource_id": resource_id
|
||||
}
|
||||
|
||||
publish(resource, payload)
|
||||
# Build the payload. Use of None is included to ensure that we don't
|
||||
# accidentally blow up the API call, but we don't anticipate it
|
||||
# happening.
|
||||
publish(author_id=request.current_user_id,
|
||||
method=request.method,
|
||||
path=request.path,
|
||||
status=response.status_code,
|
||||
resource=resource,
|
||||
resource_id=resource_id,
|
||||
sub_resource=subresource,
|
||||
sub_resource_id=subresource_id)
|
||||
|
||||
def _parse(self, s):
|
||||
url_pattern = re.match("^\/v1\/([a-z_]+)\/?([0-9]+)?"
|
||||
"\/?([a-z]+)?\/?([0-9]+)?$", s)
|
||||
if url_pattern and url_pattern.groups()[0] != "openid":
|
||||
return url_pattern.groups()
|
||||
else:
|
||||
return
|
||||
if not url_pattern or url_pattern.groups()[0] == "openid":
|
||||
return None, None, None, None
|
||||
|
||||
groups = url_pattern.groups()
|
||||
|
||||
return groups[0], groups[1], groups[2], groups[3]
|
||||
|
@ -131,13 +131,19 @@ class Payload(object):
|
||||
self.payload = payload
|
||||
|
||||
|
||||
def publish(topic, payload):
|
||||
"""Send a message with a given topic and payload to the storyboard
|
||||
exchange. The message will be automatically JSON encoded.
|
||||
def publish(resource, author_id=None, method=None, path=None, status=None,
|
||||
resource_id=None, sub_resource=None, sub_resource_id=None):
|
||||
"""Send a message for an API event to the storyboard exchange. The message
|
||||
will be automatically JSON encoded.
|
||||
|
||||
:param topic: The RabbitMQ topic.
|
||||
:param payload: The JSON-serializable payload.
|
||||
:return:
|
||||
:param resource: The extrapolated resource type (project, story, etc).
|
||||
:param author_id: The ID of the author who performed this action.
|
||||
:param method: The HTTP Method used.
|
||||
:param path: The HTTP Path used.
|
||||
:param status: The HTTP Status code of the response.
|
||||
:param resource_id: The ID of the resource.
|
||||
:param sub_resource: The extracted subresource (user_token, etc)
|
||||
:param sub_resource_id: THe ID of the subresource.
|
||||
"""
|
||||
global PUBLISHER
|
||||
|
||||
@ -146,4 +152,18 @@ def publish(topic, payload):
|
||||
PUBLISHER = Publisher(CONF.notifications)
|
||||
PUBLISHER.start()
|
||||
|
||||
PUBLISHER.publish_message(topic, payload)
|
||||
payload = {
|
||||
"author_id": author_id,
|
||||
"method": method,
|
||||
"path": path,
|
||||
"status": status,
|
||||
"resource": resource,
|
||||
"resource_id": resource_id,
|
||||
"sub_resource": sub_resource,
|
||||
"sub_resource_id": sub_resource_id
|
||||
}
|
||||
|
||||
if resource:
|
||||
PUBLISHER.publish_message(resource, payload)
|
||||
else:
|
||||
LOG.warning("Attempted to send payload with no destination resource.")
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
@ -65,7 +66,15 @@ def handle_event(ext, body):
|
||||
:param body: The body of the event.
|
||||
:return: The result of the handler.
|
||||
"""
|
||||
return ext.obj.handle(body)
|
||||
payload = json.loads(body)
|
||||
return ext.obj.handle(author_id=payload['author_id'] or None,
|
||||
method=payload['method'] or None,
|
||||
path=payload['path'] or None,
|
||||
status=payload['status'] or None,
|
||||
resource=payload['resource'] or None,
|
||||
resource_id=payload['resource_id'] or None,
|
||||
sub_resource=payload['sub_resource'] or None,
|
||||
sub_resource_id=payload['sub_resource_id'] or None)
|
||||
|
||||
|
||||
def check_enabled(ext):
|
||||
|
@ -33,5 +33,16 @@ class WorkerTaskBase(object):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle(self, body):
|
||||
"""Handle an event."""
|
||||
def handle(self, author_id, method, path, status, resource, resource_id,
|
||||
sub_resource=None, sub_resource_id=None):
|
||||
"""Handle an event.
|
||||
|
||||
:param author_id: ID of the author's user record.
|
||||
:param method: The HTTP Method.
|
||||
:param path: The full HTTP Path requested.
|
||||
:param status: The returned HTTP Status of the response.
|
||||
:param resource: The resource type.
|
||||
:param resource_id: The ID of the resource.
|
||||
:param sub_resource: The subresource type.
|
||||
:param sub_resource_id: The ID of the subresource.
|
||||
"""
|
||||
|
@ -12,8 +12,6 @@
|
||||
# implied. See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from storyboard.db.api import subscriptions
|
||||
from storyboard.db.api import timeline_events
|
||||
from storyboard.notifications.subscriptions_handler import handle_deletions
|
||||
@ -24,36 +22,38 @@ from storyboard.worker.task.base import WorkerTaskBase
|
||||
|
||||
|
||||
class Subscription(WorkerTaskBase):
|
||||
def handle(self, body):
|
||||
def handle(self, author_id, method, path, status, resource, resource_id,
|
||||
sub_resource=None, sub_resource_id=None):
|
||||
"""This worker handles API events and attempts to determine whether
|
||||
they correspond to user subscriptions.
|
||||
|
||||
:param body: The event message body.
|
||||
:return:
|
||||
:param author_id: ID of the author's user record.
|
||||
:param method: The HTTP Method.
|
||||
:param path: The full HTTP Path requested.
|
||||
:param status: The returned HTTP Status of the response.
|
||||
:param resource: The resource type.
|
||||
:param resource_id: The ID of the resource.
|
||||
:param sub_resource: The subresource type.
|
||||
:param sub_resource_id: The ID of the subresource.
|
||||
"""
|
||||
body_dict = json.loads(body)
|
||||
|
||||
subscribers = subscriptions.subscription_get_all_subscriber_ids(
|
||||
body_dict['resource'], body_dict['resource_id']
|
||||
resource, resource_id
|
||||
)
|
||||
|
||||
if 'event_id' in body_dict:
|
||||
event_id = body_dict['event_id']
|
||||
event = timeline_events.event_get(event_id)
|
||||
handle_timeline_events(event, body_dict['author_id'], subscribers)
|
||||
if resource == 'timeline_events':
|
||||
event = timeline_events.event_get(resource_id)
|
||||
handle_timeline_events(event, author_id, subscribers)
|
||||
|
||||
elif body_dict['resource'] == 'project_groups':
|
||||
handle_resources(method=body_dict['method'],
|
||||
resource_id=body_dict['resource_id'],
|
||||
sub_resource_id=getattr(body_dict,
|
||||
'sub_resource_id', None),
|
||||
author_id=body_dict['author_id'],
|
||||
elif resource == 'project_groups':
|
||||
handle_resources(method=method,
|
||||
resource_id=resource_id,
|
||||
sub_resource_id=sub_resource_id,
|
||||
author_id=author_id,
|
||||
subscribers=subscribers)
|
||||
|
||||
if body_dict['method'] == 'DELETE':
|
||||
resource_name = body_dict['resource']
|
||||
resource_id = body_dict['resource_id']
|
||||
if 'sub_resource_id' not in body_dict:
|
||||
handle_deletions(resource_name, resource_id)
|
||||
if method == 'DELETE' and not sub_resource_id:
|
||||
handle_deletions(resource, resource_id)
|
||||
|
||||
def enabled(self):
|
||||
return True
|
||||
|
Loading…
x
Reference in New Issue
Block a user