From e8b4375bb62286af9eafaa105342db6c5c8ee41c Mon Sep 17 00:00:00 2001 From: Douglas Henrique Koerich Date: Fri, 11 Nov 2022 16:05:19 -0300 Subject: [PATCH] Avoid duplicate subscriptions for PTP notification Currently both v1 and v2 of PTP notification APIs have no check for existing subscription when responding to a new request, then creating a duplicated one. The (undesirable) side effect of that is the multiple notification messages for the same application. Test Plan: PASS: Submitting consecutive POST requests to APIs v1 & v2. PASS: For v2, issued consecutive POST requests using '.' and the actual nodename. Closes-Bug: #1996233 Signed-off-by: Douglas Henrique Koerich Change-Id: Ia15476411ba493b2c1168757c4b01d1a7f168b38 --- .../common/helpers/subscription_helper.py | 53 ++++-- .../exception/client_exception.py | 16 +- .../notificationclientsdk/services/ptp.py | 153 ++++++++++++------ .../sidecar/controllers/v1/subscriptions.py | 77 ++++----- .../sidecar/controllers/v2/subscriptions.py | 60 +++---- 5 files changed, 228 insertions(+), 131 deletions(-) diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index 25e1621..01c5334 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -26,7 +26,8 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): data = format_notification_data(subscriptioninfo, notification) data = json.dumps(data) url = subscriptioninfo.EndpointUri - response = requests.post(url, data=data, headers=headers, timeout=timeout) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) response.raise_for_status() result = True return response @@ -55,36 +56,43 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): def format_notification_data(subscriptioninfo, notification): if hasattr(subscriptioninfo, 'ResourceType'): - LOG.debug("format_notification_data: Found v1 subscription, no formatting required.") + LOG.debug("format_notification_data: Found v1 subscription, " + "no formatting required.") return notification elif hasattr(subscriptioninfo, 'ResourceAddress'): - _, _, resource_path, _, _ = parse_resource_address(subscriptioninfo.ResourceAddress) - resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[resource_path] + _, _, resource_path, _, _ = parse_resource_address( + subscriptioninfo.ResourceAddress) + resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[ + resource_path] formatted_notification = {resource_mapped_value: []} for instance in notification: # Add the instance identifier to ResourceAddress for PTP lock-state # and PTP clockClass - if notification[instance]['source'] in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, - constants.SOURCE_SYNC_PTP_LOCK_STATE]: - temp_values = notification[instance].get('data', {}).get('values', []) + if notification[instance]['source'] in [ + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, + constants.SOURCE_SYNC_PTP_LOCK_STATE]: + temp_values = notification[instance].get('data', {}).get( + 'values', []) resource_address = temp_values[0].get('ResourceAddress', None) if instance not in resource_address: add_instance_name = resource_address.split('/', 3) add_instance_name.insert(3, instance) resource_address = '/'.join(add_instance_name) - notification[instance]['data']['values'][0]['ResourceAddress'] = resource_address - formatted_notification[resource_mapped_value].append(notification[instance]) + notification[instance]['data']['values'][0][ + 'ResourceAddress'] = resource_address + formatted_notification[resource_mapped_value].append( + notification[instance]) for instance in formatted_notification[resource_mapped_value]: this_delivery_time = instance['time'] if type(this_delivery_time) != str: - format_time = datetime.fromtimestamp(float(this_delivery_time)).\ - strftime('%Y-%m-%dT%H:%M:%S%fZ') + format_time = datetime.fromtimestamp( + float(this_delivery_time)).strftime('%Y-%m-%dT%H:%M:%S%fZ') instance['time'] = format_time else: - raise Exception("format_notification_data: No valid source address found in notification") - LOG.debug( - "format_notification_data: Added parent key for client consumption: %s" % - formatted_notification) + raise Exception("format_notification_data: No valid source " + "address found in notification") + LOG.debug("format_notification_data: Added parent key for client " + "consumption: %s" % formatted_notification) return formatted_notification @@ -100,10 +108,23 @@ def parse_resource_address(resource_address): resource_path = resource_path.replace(remove_optional, '') resource_address = resource_address.replace(remove_optional, '') optional = resource_list[0] - LOG.debug("Optional hierarchy found when parsing resource address: %s" % optional) + LOG.debug("Optional hierarchy found when parsing resource address: %s" + % optional) else: optional = None # resource_address is the full address without any optional hierarchy # resource_path is the specific identifier for the resource return clusterName, nodeName, resource_path, optional, resource_address + + +def set_nodename_in_resource_address(resource_address, nodename): + # The format of resource address is: + # /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource} + cluster, _, path, optional, _ = parse_resource_address( + resource_address) + resource_address = '/' + cluster + '/' + nodename + if optional: + resource_address += '/' + optional + resource_address += path + return resource_address diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py index 3992d2d..22e7b7b 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -7,6 +7,7 @@ class NodeNotAvailable(Exception): def __init__(self, node_name): self.node_name = node_name + def __str__(self): return "Node:{0} not available".format(self.node_name) @@ -15,6 +16,7 @@ class ResourceNotAvailable(Exception): def __init__(self, node_name, resource_type): self.node_name = node_name self.resource_type = resource_type + def __str__(self): return "Resource with type:{0} is not available on node:{1}".format( self.resource_type, self.node_name) @@ -33,4 +35,14 @@ class InvalidSubscription(Exception): self.subscriptioninfo = subscriptioninfo def __str__(self): - return "Subscription is invalid:{0}".format(self.subscriptioninfo.to_dict()) + return "Subscription is invalid:{0}".format( + self.subscriptioninfo.to_dict()) + + +class ServiceError(Exception): + def __init__(self, code, *args): + super().__init__(args) + self.code = code + + def __str__(self): + return str(self.code) diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 62a33c4..4fd5a14 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -8,9 +8,10 @@ import oslo_messaging import logging import json import kombu -from datetime import datetime, timezone +from datetime import datetime -from notificationclientsdk.client.notificationservice import NotificationServiceClient +from notificationclientsdk.client.notificationservice \ + import NotificationServiceClient from notificationclientsdk.common.helpers import subscription_helper from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.common.helpers import constants @@ -18,11 +19,10 @@ from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 -from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm +from notificationclientsdk.model.orm.subscription \ + import Subscription as SubscriptionOrm from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo -from notificationclientsdk.services.daemon import DaemonControl - from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) @@ -39,34 +39,40 @@ class PtpService(object): def __del__(self): del self.subscription_repo self.locationservice_client = None - return def __query_locationinfo(self, broker_name, timeout=5, retry=2): try: - location_info = self.locationservice_client.query_location(broker_name, timeout, retry) - LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info)) + location_info = \ + self.locationservice_client.query_location(broker_name, + timeout, retry) + LOG.debug("Pulled location info@{0}:{1}".format( + broker_name, location_info)) return location_info except Exception as ex: - LOG.warning("Failed to query location of node:{0} due to: {1}".format( - broker_name, str(ex))) + LOG.warning("Failed to query location of node:{0} " + "due to: {1}".format(broker_name, str(ex))) raise client_exception.NodeNotAvailable(broker_name) def __get_node_info(self, default_node_name, timeout=5, retry=2): try: nodeinfo_repo = NodeRepo(autocommit=False) - nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) + nodeinfo = nodeinfo_repo.get_one(Status=1, + NodeName=default_node_name) broker_pod_ip = None supported_resource_types = [] if nodeinfo: broker_pod_ip = nodeinfo.PodIP - supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]') + supported_resource_types = json.loads( + nodeinfo.ResourceTypes or '[]') if not broker_pod_ip: # try to query the broker ip - location_info = self.__query_locationinfo(default_node_name, timeout, retry) + location_info = self.__query_locationinfo( + default_node_name, timeout, retry) broker_pod_ip = location_info.get("PodIP", None) - supported_resource_types = location_info.get("ResourceTypes", []) + supported_resource_types = location_info.get( + "ResourceTypes", []) return broker_pod_ip, supported_resource_types finally: @@ -74,20 +80,25 @@ class PtpService(object): def query(self, broker_name, resource_address=None, optional=None): default_node_name = NodeInfoHelper.default_node_name(broker_name) - broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) + broker_pod_ip, supported_resource_types = self.__get_node_info( + default_node_name) if not broker_pod_ip: - LOG.warning("Node {0} is not available yet".format(default_node_name)) + LOG.warning("Node {0} is not available yet".format( + default_node_name)) raise client_exception.NodeNotAvailable(broker_name) - if not ResourceType.TypePTP in supported_resource_types: + if ResourceType.TypePTP not in supported_resource_types: LOG.warning("Resource {0}@{1} is not available yet".format( ResourceType.TypePTP, default_node_name)) - raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) + raise client_exception.ResourceNotAvailable(broker_name, + ResourceType.TypePTP) - return self._query(default_node_name, broker_pod_ip, resource_address, optional) + return self._query(default_node_name, broker_pod_ip, + resource_address, optional) - def _query(self, broker_name, broker_pod_ip, resource_address=None, optional=None): + def _query(self, broker_name, broker_pod_ip, resource_address=None, + optional=None): broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], @@ -99,14 +110,15 @@ class PtpService(object): notificationservice_client = NotificationServiceClient( broker_name, broker_transport_endpoint, broker_pod_ip) resource_status = notificationservice_client.query_resource_status( - ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address, - optional=optional) + ResourceType.TypePTP, timeout=5, retry=10, + resource_address=resource_address, optional=optional) return resource_status except oslo_messaging.exceptions.MessagingTimeout as ex: - LOG.warning("ptp status is not available @node {0} due to {1}".format( - broker_name, str(ex))) - raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) - except kombu.exceptions.OperationalError as ex: + LOG.warning("ptp status is not available " + "@node {0} due to {1}".format(broker_name, str(ex))) + raise client_exception.ResourceNotAvailable(broker_name, + ResourceType.TypePTP) + except kombu.exceptions.OperationalError: LOG.warning("Node {0} is unreachable yet".format(broker_name)) raise client_exception.NodeNotAvailable(broker_name) finally: @@ -115,54 +127,97 @@ class PtpService(object): del notificationservice_client def add_subscription(self, subscription_dto): - subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) resource_address = None if hasattr(subscription_dto, 'ResourceAddress'): - _, nodename, _, _, _ = subscription_helper.parse_resource_address(subscription_dto. - ResourceAddress) + _, nodename, _, _, _ = subscription_helper.parse_resource_address( + subscription_dto.ResourceAddress) broker_name = nodename resource_address = subscription_dto.ResourceAddress elif hasattr(subscription_dto, 'ResourceType'): broker_name = subscription_dto.ResourceQualifier.NodeName default_node_name = NodeInfoHelper.default_node_name(broker_name) - broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) + broker_pod_ip, supported_resource_types = self.__get_node_info( + default_node_name) if not broker_pod_ip: - LOG.warning("Node {0} is not available yet".format(default_node_name)) + LOG.warning("Node {0} is not available yet".format( + default_node_name)) raise client_exception.NodeNotAvailable(broker_name) - if not ResourceType.TypePTP in supported_resource_types: + if ResourceType.TypePTP not in supported_resource_types: LOG.warning("Resource {0}@{1} is not available yet".format( ResourceType.TypePTP, default_node_name)) - raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) + raise client_exception.ResourceNotAvailable(broker_name, + ResourceType.TypePTP) # get initial resource status if default_node_name: - ptpstatus = None - ptpstatus = self._query(default_node_name, - broker_pod_ip, - resource_address, - optional=None) + ptpstatus = self._query(default_node_name, broker_pod_ip, + resource_address, optional=None) LOG.info("initial ptpstatus:{0}".format(ptpstatus)) # construct subscription entry if constants.PTP_V1_KEY in ptpstatus: - timestamp = ptpstatus[constants.PTP_V1_KEY].get('EventTimestamp', None) + timestamp = ptpstatus[constants.PTP_V1_KEY].get( + 'EventTimestamp', None) ptpstatus = ptpstatus[constants.PTP_V1_KEY] else: for item in ptpstatus: timestamp = ptpstatus[item].get('time', None) # Change time from float to ascii format - ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']) \ - .strftime('%Y-%m-%dT%H:%M:%S%fZ') + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + # avoid duplicated subscription + entry = None + if hasattr(subscription_dto, 'ResourceType'): + version = 1 + resource_qualifier_dto = \ + subscription_dto.ResourceQualifier.to_dict() + LOG.debug('Looking for existing subscription for ' + 'EndpointUri %s ResourceQualifier %s' + % (subscription_dto.EndpointUri, + resource_qualifier_dto)) + entries = self.subscription_repo.get( + EndpointUri=subscription_dto.EndpointUri + ) + for e in entries: + resource_qualifier_json = e.ResourceQualifierJson or '{}' + resource_qualifier_repo = json.loads( + resource_qualifier_json) + if resource_qualifier_dto == resource_qualifier_repo: + entry = e + break + else: + version = 2 + + # Replace eventual '.' in ResourceAddress by the actual + # nodename + subscription_dto.ResourceAddress = \ + subscription_helper.set_nodename_in_resource_address( + subscription_dto.ResourceAddress, default_node_name) + + LOG.debug('Looking for existing subscription for ' + 'EndpointUri %s ResourceAddress %s' + % (subscription_dto.EndpointUri, + subscription_dto.ResourceAddress)) + entry = self.subscription_repo.get_one( + EndpointUri=subscription_dto.EndpointUri, + ResourceAddress=subscription_dto.ResourceAddress + ) + if entry: + LOG.debug('Found existing entry in subscription repo') + raise client_exception.ServiceError(409) + + subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) subscription_orm.InitialDeliveryTimestamp = timestamp entry = self.subscription_repo.add(subscription_orm) # Delivery the initial notification of ptp status - if hasattr(subscription_dto, 'ResourceType'): + if version == 1: subscription_dto2 = SubscriptionInfoV1(entry) else: subscription_dto2 = SubscriptionInfoV2(entry) @@ -171,15 +226,18 @@ class PtpService(object): subscription_helper.notify(subscription_dto2, ptpstatus) LOG.info("initial ptpstatus is delivered successfully") except Exception as ex: - LOG.warning("initial ptpstatus is not delivered:{0}".format(str(ex))) - raise client_exception.InvalidEndpoint(subscription_dto.EndpointUri) + LOG.warning("initial ptpstatus is not delivered:{0}".format( + str(ex))) + raise client_exception.InvalidEndpoint( + subscription_dto.EndpointUri) try: # commit the subscription entry self.subscription_repo.commit() self.daemon_control.refresh() except Exception as ex: - LOG.warning("subscription is not added successfully:{0}".format(str(ex))) + LOG.warning("subscription is not added successfully:" + "{0}".format(str(ex))) raise ex return subscription_dto2 @@ -191,6 +249,7 @@ class PtpService(object): # 2, refresh daemon self.daemon_control.refresh() except Exception as ex: - LOG.warning("subscription {0} is not deleted due to:{1}/{2}".format( - self.subscriptionid, type(ex), str(ex))) + LOG.warning("subscription {0} is not deleted due to:" + "{1}/{2}".format(self.subscriptionid, type(ex), + str(ex))) raise ex diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py index fa4eb4f..962c22f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py @@ -1,4 +1,3 @@ -#coding=utf-8 # # Copyright (c) 2021-2022 Wind River Systems, Inc. # @@ -7,14 +6,13 @@ from pecan import conf from pecan import expose, rest, response, abort -from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError +from webob.exc import HTTPException, HTTPServerError -import os import logging -from wsme import types as wtypes from wsmeext.pecan import wsexpose +from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 @@ -26,11 +24,8 @@ from sidecar.repository.notification_control import notification_control from sidecar.repository.dbcontext_default import defaults LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) -THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') class SubscriptionsControllerV1(rest.RestController): @@ -39,43 +34,48 @@ class SubscriptionsControllerV1(rest.RestController): # decode the request body try: if subscription.ResourceType == ResourceType.TypePTP: - LOG.info(' subscribe: {0}, {1} with callback uri {2}'.format( + LOG.info('subscribe: {0}, {1} with callback uri {2}'.format( subscription.ResourceType, subscription.ResourceQualifier.NodeName, subscription.EndpointUri)) else: - LOG.warning(' Subscribe with unsupported ResourceType:{0}'.format( - subscription.ResourceType)) + LOG.warning('Subscribe with unsupported ' + 'ResourceType:{0}'.format( + subscription.ResourceType)) abort(404) if not self._validateV1(subscription): - LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) + LOG.warning('Invalid Request data:{0}'.format( + subscription.to_dict())) abort(400) - subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format( - conf.server.get('protocol','http'), - conf.server.get('host', '127.0.0.1'), - conf.server.get('port', '8080') - ) + subscription.UriLocation = \ + "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format( + conf.server.get('protocol', 'http'), + conf.server.get('host', '127.0.0.1'), + conf.server.get('port', '8080') + ) if subscription.ResourceType == ResourceType.TypePTP: ptpservice = PtpService(notification_control) - entry = ptpservice.add_subscription(subscription) - del ptpservice - if not entry: - abort(404) - subscription.SubscriptionId = entry.SubscriptionId - subscription.UriLocation = entry.UriLocation - LOG.info('created subscription: {0}'.format(subscription.to_dict())) + entry = ptpservice.add_subscription(subscription) + subscription.SubscriptionId = entry.SubscriptionId + subscription.UriLocation = entry.UriLocation + LOG.info('created subscription: {0}'.format( + subscription.to_dict())) + + del ptpservice return subscription - except client_exception.InvalidSubscription as ex: + except client_exception.ServiceError as err: + abort(int(str(err))) + except client_exception.InvalidSubscription: abort(400) - except client_exception.InvalidEndpoint as ex: + except client_exception.InvalidEndpoint: abort(400) - except client_exception.NodeNotAvailable as ex: + except client_exception.NodeNotAvailable: abort(404) - except client_exception.ResourceNotAvailable as ex: + except client_exception.ResourceNotAvailable: abort(404) except HTTPException as ex: LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) @@ -84,13 +84,14 @@ class SubscriptionsControllerV1(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) abort(500) except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entries = repo.get(Status=1) response.status = 200 subs = [] @@ -106,7 +107,7 @@ class SubscriptionsControllerV1(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose() @@ -117,9 +118,8 @@ class SubscriptionsControllerV1(rest.RestController): try: assert subscription_request.ResourceType == 'PTP' assert subscription_request.EndpointUri - return True - except: + except Exception: return False @@ -130,7 +130,8 @@ class SubscriptionController(rest.RestController): @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) if not entry: @@ -145,23 +146,23 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @wsexpose(status_code=204) def delete(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id) if entry: if entry.SubscriptionId: ptpservice = PtpService(notification_control) ptpservice.remove_subscription(entry.SubscriptionId) del ptpservice - return else: repo.delete_one(SubscriptionId=self.subscription_id) - return + return abort(404) except HTTPException as ex: LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) @@ -170,5 +171,5 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py index b44da1f..ddd69a6 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py @@ -6,12 +6,10 @@ from pecan import conf from pecan import expose, rest, response, abort -from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError +from webob.exc import HTTPException, HTTPServerError -import os import logging -from wsme import types as wtypes from wsmeext.pecan import wsexpose from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 @@ -27,8 +25,6 @@ from sidecar.repository.dbcontext_default import defaults LOG = logging.getLogger(__name__) log_helper.config_logger(LOG) -THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0') - class SubscriptionsControllerV2(rest.RestController): @@ -37,37 +33,43 @@ class SubscriptionsControllerV2(rest.RestController): # decode the request body try: if subscription.ResourceAddress: - LOG.info(' subscribe: ResourceAddress {0} with callback uri {1}'.format( - subscription.ResourceAddress, - subscription.EndpointUri)) + LOG.info('subscribe: ResourceAddress {0} with ' + 'callback uri {1}'.format( + subscription.ResourceAddress, + subscription.EndpointUri)) if not self._validateV2(subscription): - LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) + LOG.warning('Invalid Request data:{0}'.format( + subscription.to_dict())) abort(400) - subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( - conf.server.get('protocol', 'http'), - conf.server.get('host', '127.0.0.1'), - conf.server.get('port', '8080') - ) + subscription.UriLocation = \ + "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( + conf.server.get('protocol', 'http'), + conf.server.get('host', '127.0.0.1'), + conf.server.get('port', '8080') + ) if subscription.ResourceAddress: ptpservice = PtpService(notification_control) + entry = ptpservice.add_subscription(subscription) + subscription.SubscriptionId = entry.SubscriptionId + subscription.UriLocation = entry.UriLocation + LOG.info('created subscription: {0}'.format( + subscription.to_dict())) + del ptpservice - if not entry: - abort(404) - subscription.SubscriptionId = entry.SubscriptionId - subscription.UriLocation = entry.UriLocation - LOG.info('created subscription: {0}'.format(subscription.to_dict())) return subscription - except client_exception.InvalidSubscription as ex: + except client_exception.ServiceError as err: + abort(int(str(err))) + except client_exception.InvalidSubscription: abort(400) - except client_exception.InvalidEndpoint as ex: + except client_exception.InvalidEndpoint: abort(400) - except client_exception.NodeNotAvailable as ex: + except client_exception.NodeNotAvailable: abort(404) - except client_exception.ResourceNotAvailable as ex: + except client_exception.ResourceNotAvailable: abort(404) except HTTPException as ex: LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) @@ -82,7 +84,8 @@ class SubscriptionsControllerV2(rest.RestController): @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entries = repo.get(Status=1) response.status = 200 subs = [] @@ -109,9 +112,8 @@ class SubscriptionsControllerV2(rest.RestController): try: assert subscription_request.ResourceAddress assert subscription_request.EndpointUri - return True - except: + except Exception: return False @@ -122,7 +124,8 @@ class SubscriptionController(rest.RestController): @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) if not entry: @@ -145,7 +148,8 @@ class SubscriptionController(rest.RestController): @wsexpose(status_code=204) def delete(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), + autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id) if entry: if entry.SubscriptionId: