Avoid duplicate subscriptions for PTP notification

Currently both v1 and v2 of PTP notification APIs have no check for
existing subscription when responding to a new request, then creating a
duplicated one. The (undesirable) side effect of that is the multiple
notification messages for the same application.

Test Plan:
PASS: Submitting consecutive POST requests to APIs v1 & v2.
PASS: For v2, issued consecutive POST requests using '.' and the actual
      nodename.

Closes-Bug: #1996233
Signed-off-by: Douglas Henrique Koerich <douglashenrique.koerich@windriver.com>
Change-Id: Ia15476411ba493b2c1168757c4b01d1a7f168b38
This commit is contained in:
Douglas Henrique Koerich 2022-11-11 16:05:19 -03:00
parent 3a365e737a
commit e8b4375bb6
5 changed files with 228 additions and 131 deletions

View File

@ -26,7 +26,8 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3):
data = format_notification_data(subscriptioninfo, notification) data = format_notification_data(subscriptioninfo, notification)
data = json.dumps(data) data = json.dumps(data)
url = subscriptioninfo.EndpointUri url = subscriptioninfo.EndpointUri
response = requests.post(url, data=data, headers=headers, timeout=timeout) response = requests.post(url, data=data, headers=headers,
timeout=timeout)
response.raise_for_status() response.raise_for_status()
result = True result = True
return response return response
@ -55,36 +56,43 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3):
def format_notification_data(subscriptioninfo, notification): def format_notification_data(subscriptioninfo, notification):
if hasattr(subscriptioninfo, 'ResourceType'): if hasattr(subscriptioninfo, 'ResourceType'):
LOG.debug("format_notification_data: Found v1 subscription, no formatting required.") LOG.debug("format_notification_data: Found v1 subscription, "
"no formatting required.")
return notification return notification
elif hasattr(subscriptioninfo, 'ResourceAddress'): elif hasattr(subscriptioninfo, 'ResourceAddress'):
_, _, resource_path, _, _ = parse_resource_address(subscriptioninfo.ResourceAddress) _, _, resource_path, _, _ = parse_resource_address(
resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[resource_path] subscriptioninfo.ResourceAddress)
resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[
resource_path]
formatted_notification = {resource_mapped_value: []} formatted_notification = {resource_mapped_value: []}
for instance in notification: for instance in notification:
# Add the instance identifier to ResourceAddress for PTP lock-state # Add the instance identifier to ResourceAddress for PTP lock-state
# and PTP clockClass # and PTP clockClass
if notification[instance]['source'] in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, if notification[instance]['source'] in [
constants.SOURCE_SYNC_PTP_LOCK_STATE]: constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
temp_values = notification[instance].get('data', {}).get('values', []) constants.SOURCE_SYNC_PTP_LOCK_STATE]:
temp_values = notification[instance].get('data', {}).get(
'values', [])
resource_address = temp_values[0].get('ResourceAddress', None) resource_address = temp_values[0].get('ResourceAddress', None)
if instance not in resource_address: if instance not in resource_address:
add_instance_name = resource_address.split('/', 3) add_instance_name = resource_address.split('/', 3)
add_instance_name.insert(3, instance) add_instance_name.insert(3, instance)
resource_address = '/'.join(add_instance_name) resource_address = '/'.join(add_instance_name)
notification[instance]['data']['values'][0]['ResourceAddress'] = resource_address notification[instance]['data']['values'][0][
formatted_notification[resource_mapped_value].append(notification[instance]) 'ResourceAddress'] = resource_address
formatted_notification[resource_mapped_value].append(
notification[instance])
for instance in formatted_notification[resource_mapped_value]: for instance in formatted_notification[resource_mapped_value]:
this_delivery_time = instance['time'] this_delivery_time = instance['time']
if type(this_delivery_time) != str: if type(this_delivery_time) != str:
format_time = datetime.fromtimestamp(float(this_delivery_time)).\ format_time = datetime.fromtimestamp(
strftime('%Y-%m-%dT%H:%M:%S%fZ') float(this_delivery_time)).strftime('%Y-%m-%dT%H:%M:%S%fZ')
instance['time'] = format_time instance['time'] = format_time
else: else:
raise Exception("format_notification_data: No valid source address found in notification") raise Exception("format_notification_data: No valid source "
LOG.debug( "address found in notification")
"format_notification_data: Added parent key for client consumption: %s" % LOG.debug("format_notification_data: Added parent key for client "
formatted_notification) "consumption: %s" % formatted_notification)
return formatted_notification return formatted_notification
@ -100,10 +108,23 @@ def parse_resource_address(resource_address):
resource_path = resource_path.replace(remove_optional, '') resource_path = resource_path.replace(remove_optional, '')
resource_address = resource_address.replace(remove_optional, '') resource_address = resource_address.replace(remove_optional, '')
optional = resource_list[0] optional = resource_list[0]
LOG.debug("Optional hierarchy found when parsing resource address: %s" % optional) LOG.debug("Optional hierarchy found when parsing resource address: %s"
% optional)
else: else:
optional = None optional = None
# resource_address is the full address without any optional hierarchy # resource_address is the full address without any optional hierarchy
# resource_path is the specific identifier for the resource # resource_path is the specific identifier for the resource
return clusterName, nodeName, resource_path, optional, resource_address return clusterName, nodeName, resource_path, optional, resource_address
def set_nodename_in_resource_address(resource_address, nodename):
# The format of resource address is:
# /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource}
cluster, _, path, optional, _ = parse_resource_address(
resource_address)
resource_address = '/' + cluster + '/' + nodename
if optional:
resource_address += '/' + optional
resource_address += path
return resource_address

View File

@ -1,5 +1,5 @@
# #
# Copyright (c) 2021 Wind River Systems, Inc. # Copyright (c) 2021-2022 Wind River Systems, Inc.
# #
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# #
@ -7,6 +7,7 @@
class NodeNotAvailable(Exception): class NodeNotAvailable(Exception):
def __init__(self, node_name): def __init__(self, node_name):
self.node_name = node_name self.node_name = node_name
def __str__(self): def __str__(self):
return "Node:{0} not available".format(self.node_name) return "Node:{0} not available".format(self.node_name)
@ -15,6 +16,7 @@ class ResourceNotAvailable(Exception):
def __init__(self, node_name, resource_type): def __init__(self, node_name, resource_type):
self.node_name = node_name self.node_name = node_name
self.resource_type = resource_type self.resource_type = resource_type
def __str__(self): def __str__(self):
return "Resource with type:{0} is not available on node:{1}".format( return "Resource with type:{0} is not available on node:{1}".format(
self.resource_type, self.node_name) self.resource_type, self.node_name)
@ -33,4 +35,14 @@ class InvalidSubscription(Exception):
self.subscriptioninfo = subscriptioninfo self.subscriptioninfo = subscriptioninfo
def __str__(self): def __str__(self):
return "Subscription is invalid:{0}".format(self.subscriptioninfo.to_dict()) return "Subscription is invalid:{0}".format(
self.subscriptioninfo.to_dict())
class ServiceError(Exception):
def __init__(self, code, *args):
super().__init__(args)
self.code = code
def __str__(self):
return str(self.code)

View File

@ -8,9 +8,10 @@ import oslo_messaging
import logging import logging
import json import json
import kombu import kombu
from datetime import datetime, timezone from datetime import datetime
from notificationclientsdk.client.notificationservice import NotificationServiceClient from notificationclientsdk.client.notificationservice \
import NotificationServiceClient
from notificationclientsdk.common.helpers import subscription_helper from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.common.helpers import constants from notificationclientsdk.common.helpers import constants
@ -18,11 +19,10 @@ from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2
from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm from notificationclientsdk.model.orm.subscription \
import Subscription as SubscriptionOrm
from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.services.daemon import DaemonControl
from notificationclientsdk.exception import client_exception from notificationclientsdk.exception import client_exception
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -39,34 +39,40 @@ class PtpService(object):
def __del__(self): def __del__(self):
del self.subscription_repo del self.subscription_repo
self.locationservice_client = None self.locationservice_client = None
return
def __query_locationinfo(self, broker_name, timeout=5, retry=2): def __query_locationinfo(self, broker_name, timeout=5, retry=2):
try: try:
location_info = self.locationservice_client.query_location(broker_name, timeout, retry) location_info = \
LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info)) self.locationservice_client.query_location(broker_name,
timeout, retry)
LOG.debug("Pulled location info@{0}:{1}".format(
broker_name, location_info))
return location_info return location_info
except Exception as ex: except Exception as ex:
LOG.warning("Failed to query location of node:{0} due to: {1}".format( LOG.warning("Failed to query location of node:{0} "
broker_name, str(ex))) "due to: {1}".format(broker_name, str(ex)))
raise client_exception.NodeNotAvailable(broker_name) raise client_exception.NodeNotAvailable(broker_name)
def __get_node_info(self, default_node_name, timeout=5, retry=2): def __get_node_info(self, default_node_name, timeout=5, retry=2):
try: try:
nodeinfo_repo = NodeRepo(autocommit=False) nodeinfo_repo = NodeRepo(autocommit=False)
nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) nodeinfo = nodeinfo_repo.get_one(Status=1,
NodeName=default_node_name)
broker_pod_ip = None broker_pod_ip = None
supported_resource_types = [] supported_resource_types = []
if nodeinfo: if nodeinfo:
broker_pod_ip = nodeinfo.PodIP broker_pod_ip = nodeinfo.PodIP
supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]') supported_resource_types = json.loads(
nodeinfo.ResourceTypes or '[]')
if not broker_pod_ip: if not broker_pod_ip:
# try to query the broker ip # try to query the broker ip
location_info = self.__query_locationinfo(default_node_name, timeout, retry) location_info = self.__query_locationinfo(
default_node_name, timeout, retry)
broker_pod_ip = location_info.get("PodIP", None) broker_pod_ip = location_info.get("PodIP", None)
supported_resource_types = location_info.get("ResourceTypes", []) supported_resource_types = location_info.get(
"ResourceTypes", [])
return broker_pod_ip, supported_resource_types return broker_pod_ip, supported_resource_types
finally: finally:
@ -74,20 +80,25 @@ class PtpService(object):
def query(self, broker_name, resource_address=None, optional=None): def query(self, broker_name, resource_address=None, optional=None):
default_node_name = NodeInfoHelper.default_node_name(broker_name) default_node_name = NodeInfoHelper.default_node_name(broker_name)
broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) broker_pod_ip, supported_resource_types = self.__get_node_info(
default_node_name)
if not broker_pod_ip: if not broker_pod_ip:
LOG.warning("Node {0} is not available yet".format(default_node_name)) LOG.warning("Node {0} is not available yet".format(
default_node_name))
raise client_exception.NodeNotAvailable(broker_name) raise client_exception.NodeNotAvailable(broker_name)
if not ResourceType.TypePTP in supported_resource_types: if ResourceType.TypePTP not in supported_resource_types:
LOG.warning("Resource {0}@{1} is not available yet".format( LOG.warning("Resource {0}@{1} is not available yet".format(
ResourceType.TypePTP, default_node_name)) ResourceType.TypePTP, default_node_name))
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) raise client_exception.ResourceNotAvailable(broker_name,
ResourceType.TypePTP)
return self._query(default_node_name, broker_pod_ip, resource_address, optional) return self._query(default_node_name, broker_pod_ip,
resource_address, optional)
def _query(self, broker_name, broker_pod_ip, resource_address=None, optional=None): def _query(self, broker_name, broker_pod_ip, resource_address=None,
optional=None):
broker_host = "[{0}]".format(broker_pod_ip) broker_host = "[{0}]".format(broker_pod_ip)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'],
@ -99,14 +110,15 @@ class PtpService(object):
notificationservice_client = NotificationServiceClient( notificationservice_client = NotificationServiceClient(
broker_name, broker_transport_endpoint, broker_pod_ip) broker_name, broker_transport_endpoint, broker_pod_ip)
resource_status = notificationservice_client.query_resource_status( resource_status = notificationservice_client.query_resource_status(
ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address, ResourceType.TypePTP, timeout=5, retry=10,
optional=optional) resource_address=resource_address, optional=optional)
return resource_status return resource_status
except oslo_messaging.exceptions.MessagingTimeout as ex: except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("ptp status is not available @node {0} due to {1}".format( LOG.warning("ptp status is not available "
broker_name, str(ex))) "@node {0} due to {1}".format(broker_name, str(ex)))
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) raise client_exception.ResourceNotAvailable(broker_name,
except kombu.exceptions.OperationalError as ex: ResourceType.TypePTP)
except kombu.exceptions.OperationalError:
LOG.warning("Node {0} is unreachable yet".format(broker_name)) LOG.warning("Node {0} is unreachable yet".format(broker_name))
raise client_exception.NodeNotAvailable(broker_name) raise client_exception.NodeNotAvailable(broker_name)
finally: finally:
@ -115,54 +127,97 @@ class PtpService(object):
del notificationservice_client del notificationservice_client
def add_subscription(self, subscription_dto): def add_subscription(self, subscription_dto):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
resource_address = None resource_address = None
if hasattr(subscription_dto, 'ResourceAddress'): if hasattr(subscription_dto, 'ResourceAddress'):
_, nodename, _, _, _ = subscription_helper.parse_resource_address(subscription_dto. _, nodename, _, _, _ = subscription_helper.parse_resource_address(
ResourceAddress) subscription_dto.ResourceAddress)
broker_name = nodename broker_name = nodename
resource_address = subscription_dto.ResourceAddress resource_address = subscription_dto.ResourceAddress
elif hasattr(subscription_dto, 'ResourceType'): elif hasattr(subscription_dto, 'ResourceType'):
broker_name = subscription_dto.ResourceQualifier.NodeName broker_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_name) default_node_name = NodeInfoHelper.default_node_name(broker_name)
broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) broker_pod_ip, supported_resource_types = self.__get_node_info(
default_node_name)
if not broker_pod_ip: if not broker_pod_ip:
LOG.warning("Node {0} is not available yet".format(default_node_name)) LOG.warning("Node {0} is not available yet".format(
default_node_name))
raise client_exception.NodeNotAvailable(broker_name) raise client_exception.NodeNotAvailable(broker_name)
if not ResourceType.TypePTP in supported_resource_types: if ResourceType.TypePTP not in supported_resource_types:
LOG.warning("Resource {0}@{1} is not available yet".format( LOG.warning("Resource {0}@{1} is not available yet".format(
ResourceType.TypePTP, default_node_name)) ResourceType.TypePTP, default_node_name))
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) raise client_exception.ResourceNotAvailable(broker_name,
ResourceType.TypePTP)
# get initial resource status # get initial resource status
if default_node_name: if default_node_name:
ptpstatus = None ptpstatus = None
ptpstatus = self._query(default_node_name, ptpstatus = self._query(default_node_name, broker_pod_ip,
broker_pod_ip, resource_address, optional=None)
resource_address,
optional=None)
LOG.info("initial ptpstatus:{0}".format(ptpstatus)) LOG.info("initial ptpstatus:{0}".format(ptpstatus))
# construct subscription entry # construct subscription entry
if constants.PTP_V1_KEY in ptpstatus: if constants.PTP_V1_KEY in ptpstatus:
timestamp = ptpstatus[constants.PTP_V1_KEY].get('EventTimestamp', None) timestamp = ptpstatus[constants.PTP_V1_KEY].get(
'EventTimestamp', None)
ptpstatus = ptpstatus[constants.PTP_V1_KEY] ptpstatus = ptpstatus[constants.PTP_V1_KEY]
else: else:
for item in ptpstatus: for item in ptpstatus:
timestamp = ptpstatus[item].get('time', None) timestamp = ptpstatus[item].get('time', None)
# Change time from float to ascii format # Change time from float to ascii format
ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']) \ ptpstatus[item]['time'] = datetime.fromtimestamp(
.strftime('%Y-%m-%dT%H:%M:%S%fZ') ptpstatus[item]['time']).strftime(
'%Y-%m-%dT%H:%M:%S%fZ')
# avoid duplicated subscription
entry = None
if hasattr(subscription_dto, 'ResourceType'):
version = 1
resource_qualifier_dto = \
subscription_dto.ResourceQualifier.to_dict()
LOG.debug('Looking for existing subscription for '
'EndpointUri %s ResourceQualifier %s'
% (subscription_dto.EndpointUri,
resource_qualifier_dto))
entries = self.subscription_repo.get(
EndpointUri=subscription_dto.EndpointUri
)
for e in entries:
resource_qualifier_json = e.ResourceQualifierJson or '{}'
resource_qualifier_repo = json.loads(
resource_qualifier_json)
if resource_qualifier_dto == resource_qualifier_repo:
entry = e
break
else:
version = 2
# Replace eventual '.' in ResourceAddress by the actual
# nodename
subscription_dto.ResourceAddress = \
subscription_helper.set_nodename_in_resource_address(
subscription_dto.ResourceAddress, default_node_name)
LOG.debug('Looking for existing subscription for '
'EndpointUri %s ResourceAddress %s'
% (subscription_dto.EndpointUri,
subscription_dto.ResourceAddress))
entry = self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=subscription_dto.ResourceAddress
)
if entry:
LOG.debug('Found existing entry in subscription repo')
raise client_exception.ServiceError(409)
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
subscription_orm.InitialDeliveryTimestamp = timestamp subscription_orm.InitialDeliveryTimestamp = timestamp
entry = self.subscription_repo.add(subscription_orm) entry = self.subscription_repo.add(subscription_orm)
# Delivery the initial notification of ptp status # Delivery the initial notification of ptp status
if hasattr(subscription_dto, 'ResourceType'): if version == 1:
subscription_dto2 = SubscriptionInfoV1(entry) subscription_dto2 = SubscriptionInfoV1(entry)
else: else:
subscription_dto2 = SubscriptionInfoV2(entry) subscription_dto2 = SubscriptionInfoV2(entry)
@ -171,15 +226,18 @@ class PtpService(object):
subscription_helper.notify(subscription_dto2, ptpstatus) subscription_helper.notify(subscription_dto2, ptpstatus)
LOG.info("initial ptpstatus is delivered successfully") LOG.info("initial ptpstatus is delivered successfully")
except Exception as ex: except Exception as ex:
LOG.warning("initial ptpstatus is not delivered:{0}".format(str(ex))) LOG.warning("initial ptpstatus is not delivered:{0}".format(
raise client_exception.InvalidEndpoint(subscription_dto.EndpointUri) str(ex)))
raise client_exception.InvalidEndpoint(
subscription_dto.EndpointUri)
try: try:
# commit the subscription entry # commit the subscription entry
self.subscription_repo.commit() self.subscription_repo.commit()
self.daemon_control.refresh() self.daemon_control.refresh()
except Exception as ex: except Exception as ex:
LOG.warning("subscription is not added successfully:{0}".format(str(ex))) LOG.warning("subscription is not added successfully:"
"{0}".format(str(ex)))
raise ex raise ex
return subscription_dto2 return subscription_dto2
@ -191,6 +249,7 @@ class PtpService(object):
# 2, refresh daemon # 2, refresh daemon
self.daemon_control.refresh() self.daemon_control.refresh()
except Exception as ex: except Exception as ex:
LOG.warning("subscription {0} is not deleted due to:{1}/{2}".format( LOG.warning("subscription {0} is not deleted due to:"
self.subscriptionid, type(ex), str(ex))) "{1}/{2}".format(self.subscriptionid, type(ex),
str(ex)))
raise ex raise ex

View File

@ -1,4 +1,3 @@
#coding=utf-8
# #
# Copyright (c) 2021-2022 Wind River Systems, Inc. # Copyright (c) 2021-2022 Wind River Systems, Inc.
# #
@ -7,14 +6,13 @@
from pecan import conf from pecan import conf
from pecan import expose, rest, response, abort from pecan import expose, rest, response, abort
from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError from webob.exc import HTTPException, HTTPServerError
import os
import logging import logging
from wsme import types as wtypes
from wsmeext.pecan import wsexpose from wsmeext.pecan import wsexpose
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
@ -26,11 +24,8 @@ from sidecar.repository.notification_control import notification_control
from sidecar.repository.dbcontext_default import defaults from sidecar.repository.dbcontext_default import defaults
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG) log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
class SubscriptionsControllerV1(rest.RestController): class SubscriptionsControllerV1(rest.RestController):
@ -39,43 +34,48 @@ class SubscriptionsControllerV1(rest.RestController):
# decode the request body # decode the request body
try: try:
if subscription.ResourceType == ResourceType.TypePTP: if subscription.ResourceType == ResourceType.TypePTP:
LOG.info(' subscribe: {0}, {1} with callback uri {2}'.format( LOG.info('subscribe: {0}, {1} with callback uri {2}'.format(
subscription.ResourceType, subscription.ResourceType,
subscription.ResourceQualifier.NodeName, subscription.ResourceQualifier.NodeName,
subscription.EndpointUri)) subscription.EndpointUri))
else: else:
LOG.warning(' Subscribe with unsupported ResourceType:{0}'.format( LOG.warning('Subscribe with unsupported '
subscription.ResourceType)) 'ResourceType:{0}'.format(
subscription.ResourceType))
abort(404) abort(404)
if not self._validateV1(subscription): if not self._validateV1(subscription):
LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) LOG.warning('Invalid Request data:{0}'.format(
subscription.to_dict()))
abort(400) abort(400)
subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format( subscription.UriLocation = \
conf.server.get('protocol','http'), "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format(
conf.server.get('host', '127.0.0.1'), conf.server.get('protocol', 'http'),
conf.server.get('port', '8080') conf.server.get('host', '127.0.0.1'),
) conf.server.get('port', '8080')
)
if subscription.ResourceType == ResourceType.TypePTP: if subscription.ResourceType == ResourceType.TypePTP:
ptpservice = PtpService(notification_control) ptpservice = PtpService(notification_control)
entry = ptpservice.add_subscription(subscription)
del ptpservice
if not entry:
abort(404)
subscription.SubscriptionId = entry.SubscriptionId entry = ptpservice.add_subscription(subscription)
subscription.UriLocation = entry.UriLocation subscription.SubscriptionId = entry.SubscriptionId
LOG.info('created subscription: {0}'.format(subscription.to_dict())) subscription.UriLocation = entry.UriLocation
LOG.info('created subscription: {0}'.format(
subscription.to_dict()))
del ptpservice
return subscription return subscription
except client_exception.InvalidSubscription as ex: except client_exception.ServiceError as err:
abort(int(str(err)))
except client_exception.InvalidSubscription:
abort(400) abort(400)
except client_exception.InvalidEndpoint as ex: except client_exception.InvalidEndpoint:
abort(400) abort(400)
except client_exception.NodeNotAvailable as ex: except client_exception.NodeNotAvailable:
abort(404) abort(404)
except client_exception.ResourceNotAvailable as ex: except client_exception.ResourceNotAvailable:
abort(404) abort(404)
except HTTPException as ex: except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
@ -84,13 +84,14 @@ class SubscriptionsControllerV1(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
abort(500) abort(500)
except Exception as ex: except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500) abort(500)
@expose('json') @expose('json')
def get(self): def get(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entries = repo.get(Status=1) entries = repo.get(Status=1)
response.status = 200 response.status = 200
subs = [] subs = []
@ -106,7 +107,7 @@ class SubscriptionsControllerV1(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex raise ex
except Exception as ex: except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500) abort(500)
@expose() @expose()
@ -117,9 +118,8 @@ class SubscriptionsControllerV1(rest.RestController):
try: try:
assert subscription_request.ResourceType == 'PTP' assert subscription_request.ResourceType == 'PTP'
assert subscription_request.EndpointUri assert subscription_request.EndpointUri
return True return True
except: except Exception:
return False return False
@ -130,7 +130,8 @@ class SubscriptionController(rest.RestController):
@expose('json') @expose('json')
def get(self): def get(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1)
if not entry: if not entry:
@ -145,23 +146,23 @@ class SubscriptionController(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex raise ex
except Exception as ex: except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500) abort(500)
@wsexpose(status_code=204) @wsexpose(status_code=204)
def delete(self): def delete(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id) entry = repo.get_one(SubscriptionId=self.subscription_id)
if entry: if entry:
if entry.SubscriptionId: if entry.SubscriptionId:
ptpservice = PtpService(notification_control) ptpservice = PtpService(notification_control)
ptpservice.remove_subscription(entry.SubscriptionId) ptpservice.remove_subscription(entry.SubscriptionId)
del ptpservice del ptpservice
return
else: else:
repo.delete_one(SubscriptionId=self.subscription_id) repo.delete_one(SubscriptionId=self.subscription_id)
return return
abort(404) abort(404)
except HTTPException as ex: except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
@ -170,5 +171,5 @@ class SubscriptionController(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex raise ex
except Exception as ex: except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500) abort(500)

View File

@ -6,12 +6,10 @@
from pecan import conf from pecan import conf
from pecan import expose, rest, response, abort from pecan import expose, rest, response, abort
from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError from webob.exc import HTTPException, HTTPServerError
import os
import logging import logging
from wsme import types as wtypes
from wsmeext.pecan import wsexpose from wsmeext.pecan import wsexpose
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2
@ -27,8 +25,6 @@ from sidecar.repository.dbcontext_default import defaults
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG) log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0')
class SubscriptionsControllerV2(rest.RestController): class SubscriptionsControllerV2(rest.RestController):
@ -37,37 +33,43 @@ class SubscriptionsControllerV2(rest.RestController):
# decode the request body # decode the request body
try: try:
if subscription.ResourceAddress: if subscription.ResourceAddress:
LOG.info(' subscribe: ResourceAddress {0} with callback uri {1}'.format( LOG.info('subscribe: ResourceAddress {0} with '
subscription.ResourceAddress, 'callback uri {1}'.format(
subscription.EndpointUri)) subscription.ResourceAddress,
subscription.EndpointUri))
if not self._validateV2(subscription): if not self._validateV2(subscription):
LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) LOG.warning('Invalid Request data:{0}'.format(
subscription.to_dict()))
abort(400) abort(400)
subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( subscription.UriLocation = \
conf.server.get('protocol', 'http'), "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format(
conf.server.get('host', '127.0.0.1'), conf.server.get('protocol', 'http'),
conf.server.get('port', '8080') conf.server.get('host', '127.0.0.1'),
) conf.server.get('port', '8080')
)
if subscription.ResourceAddress: if subscription.ResourceAddress:
ptpservice = PtpService(notification_control) ptpservice = PtpService(notification_control)
entry = ptpservice.add_subscription(subscription) entry = ptpservice.add_subscription(subscription)
subscription.SubscriptionId = entry.SubscriptionId
subscription.UriLocation = entry.UriLocation
LOG.info('created subscription: {0}'.format(
subscription.to_dict()))
del ptpservice del ptpservice
if not entry:
abort(404)
subscription.SubscriptionId = entry.SubscriptionId
subscription.UriLocation = entry.UriLocation
LOG.info('created subscription: {0}'.format(subscription.to_dict()))
return subscription return subscription
except client_exception.InvalidSubscription as ex: except client_exception.ServiceError as err:
abort(int(str(err)))
except client_exception.InvalidSubscription:
abort(400) abort(400)
except client_exception.InvalidEndpoint as ex: except client_exception.InvalidEndpoint:
abort(400) abort(400)
except client_exception.NodeNotAvailable as ex: except client_exception.NodeNotAvailable:
abort(404) abort(404)
except client_exception.ResourceNotAvailable as ex: except client_exception.ResourceNotAvailable:
abort(404) abort(404)
except HTTPException as ex: except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
@ -82,7 +84,8 @@ class SubscriptionsControllerV2(rest.RestController):
@expose('json') @expose('json')
def get(self): def get(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entries = repo.get(Status=1) entries = repo.get(Status=1)
response.status = 200 response.status = 200
subs = [] subs = []
@ -109,9 +112,8 @@ class SubscriptionsControllerV2(rest.RestController):
try: try:
assert subscription_request.ResourceAddress assert subscription_request.ResourceAddress
assert subscription_request.EndpointUri assert subscription_request.EndpointUri
return True return True
except: except Exception:
return False return False
@ -122,7 +124,8 @@ class SubscriptionController(rest.RestController):
@expose('json') @expose('json')
def get(self): def get(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1)
if not entry: if not entry:
@ -145,7 +148,8 @@ class SubscriptionController(rest.RestController):
@wsexpose(status_code=204) @wsexpose(status_code=204)
def delete(self): def delete(self):
try: try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) repo = SubscriptionRepo(defaults['dbcontext'].get_session(),
autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id) entry = repo.get_one(SubscriptionId=self.subscription_id)
if entry: if entry:
if entry.SubscriptionId: if entry.SubscriptionId: