diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/exception/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/exception/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py new file mode 100644 index 0000000..f6eb144 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py @@ -0,0 +1,31 @@ + +class NodeNotAvailable(Exception): + def __init__(self, node_name): + self.node_name = node_name + def __str__(self): + return "Node:{0} not available".format(self.node_name) + + +class ResourceNotAvailable(Exception): + def __init__(self, node_name, resource_type): + self.node_name = node_name + self.resource_type = resource_type + def __str__(self): + return "Resource with type:{0} is not available on node:{1}".format( + self.resource_type, self.node_name) + + +class InvalidEndpoint(Exception): + def __init__(self, endpoint_uri): + self.endpoint_uri = endpoint_uri + + def __str__(self): + return "Endpoint is invalid:{0}".format(self.endpoint_uri) + + +class InvalidSubscription(Exception): + def __init__(self, subscriptioninfo): + self.subscriptioninfo = subscriptioninfo + + def __str__(self): + return "Subscription is invalid:{0}".format(self.subscriptioninfo.to_dict()) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py index 0e2be72..c06a9bf 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py @@ -8,6 +8,7 @@ class Subscription(OrmBase): UriLocation = Column(String(512)) ResourceType = Column(String(64)) EndpointUri = Column(String(512)) + InitialDeliveryTimestamp = Column(Float) Status = Column(Integer) CreateTime = Column(Float) LastUpdateTime = Column(Float) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py index 99a2648..9b6eff2 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py @@ -118,6 +118,7 @@ class NotificationWorker: def __init_node_sync_channel(self): self.__node_sync_event = mp.Event() + self.__node_sync_q = Queue.Queue() # initial to be set self.__node_sync_event.set() @@ -203,12 +204,14 @@ class NotificationWorker: nodeinfo_repo.add(entry) node_resource_updated = True node_changed = True + self.__node_sync_q.put(node_name) LOG.debug("Add NodeInfo: {0}".format(entry.NodeName)) elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']: # update the entry if entry.ResourceTypes != location_info2.ResourceTypes: node_resource_updated = True nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm()) + self.__node_sync_q.put(node_name) LOG.debug("Update NodeInfo: {0}".format(entry.NodeName)) else: # do nothing @@ -235,6 +238,35 @@ class NotificationWorker: self.signal_events() pass + def __get_lastest_delivery_timestamp(self, node_name, subscriptionid): + last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + return last_delivery_time + + def __update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): + if not self.notification_stat.get(node_name, None): + self.notification_stat[node_name] = { + subscriptionid: { + 'EventTimestamp': this_delivery_time + } + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + elif not self.notification_stat[node_name].get(subscriptionid, None): + self.notification_stat[node_name][subscriptionid] = { + 'EventTimestamp': this_delivery_time + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + else: + last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + if (last_delivery_time >= this_delivery_time): + return + last_delivery_stat['EventTimestamp'] = this_delivery_time + LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( + node_name, subscriptionid)) + def handle_notification_delivery(self, notification_info): LOG.debug("start notification delivery") result = True @@ -267,40 +299,22 @@ class NotificationWorker: subscription_dto2 = SubscriptionInfo(entry) try: - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + last_delivery_time = self.__get_lastest_delivery_timestamp(node_name, subscriptionid) if last_delivery_time and last_delivery_time >= this_delivery_time: # skip this entry since already delivered LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId)) - raise Exception("notification timestamp indicate it is not lastest") + continue subscription_helper.notify(subscription_dto2, notification_info) LOG.debug("notification is delivered successfully to {0}".format( entry.SubscriptionId)) - if not self.notification_stat.get(node_name, None): - self.notification_stat[node_name] = { - subscriptionid: { - 'EventTimestamp': this_delivery_time - } - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - elif not self.notification_stat[node_name].get(subscriptionid, None): - self.notification_stat[node_name][subscriptionid] = { - 'EventTimestamp': this_delivery_time - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - else: - last_delivery_stat['EventTimestamp'] = this_delivery_time - LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( - node_name, subscriptionid)) + self.__update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) except Exception as ex: LOG.warning("notification is not delivered to {0}:{1}".format( entry.SubscriptionId, str(ex))) - # remove the entry + # proceed to next entry continue finally: pass @@ -321,7 +335,9 @@ class NotificationWorker: def process_sync_node_event(self): LOG.debug("Start processing sync node event") need_to_sync_node_again = False - for broker_node_name, node_resources in self.node_resources_map.items(): + + while not self.__node_sync_q.empty(): + broker_node_name = self.__node_sync_q.get(False) try: result = self.syncup_node(broker_node_name) if not result: @@ -460,6 +476,11 @@ class NotificationWorker: node_map = {} self.node_resources_map[current_node_name] = node_map node_map[resource_type] = self.node_resources_iteration + # update the initial delivery timestamp as well + + self.__update_delivery_timestamp( + NodeInfoHelper.default_node_name(broker_node_name), + s.SubscriptionId, s.InitialDeliveryTimestamp) # delete all entry with Status == 0 subscription_repo.delete(Status=0) @@ -610,16 +631,27 @@ class NotificationWorker: def __refresh_watchers_from_map(self): try: LOG.debug("refresh with {0} nodes".format(len(self.node_resources_map))) + node_to_sync = [] for broker_node_name, node_resources in self.node_resources_map.items(): LOG.debug("check to watch resources@{0} :{1}".format(broker_node_name, node_resources)) + need_to_sync_node = False for resource_type, iteration in node_resources.items(): # enable watchers if iteration == self.node_resources_iteration: self.__start_watch_resource(broker_node_name, resource_type) + need_to_sync_node = True else: self.__stop_watch_resource(broker_node_name, resource_type) + if need_to_sync_node: + node_to_sync.append(broker_node_name) self.__refresh_location_watcher() self.__cleanup_map() + if node_to_sync: + # trigger the node sync up event + for node_name in node_to_sync: + self.__node_sync_q.put(node_name) + self.signal_node_sync_event() + self.signal_events() except Exception as ex: LOG.debug("exception: {0}".format(str(ex))) pass diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 61d8d32..7f82b48 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -11,6 +11,8 @@ from notificationclientsdk.client.notificationservice import NotificationService from notificationclientsdk.services.daemon import DaemonControl from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.exception import client_exception + LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper @@ -46,7 +48,7 @@ class PtpService(object): broker_node_name = subscription_dto.ResourceQualifier.NodeName default_node_name = NodeInfoHelper.default_node_name(broker_node_name) nodeinfos = NodeInfoHelper.enumerate_nodes(broker_node_name) - # 1, check node availability from DB + # check node availability from DB if not nodeinfos or not default_node_name in nodeinfos: # update nodeinfo try: @@ -55,17 +57,9 @@ class PtpService(object): except oslo_messaging.exceptions.MessagingTimeout as ex: LOG.warning("node {0} cannot be reached due to {1}".format( default_node_name, str(ex))) - raise ex + raise client_exception.NodeNotAvailable(broker_node_name) - # 2, add to DB - entry = self.subscription_repo.add(subscription_orm) - # must commit the transaction to make it visible to daemon worker - self.subscription_repo.commit() - - # 3, refresh daemon - self.daemon_control.refresh() - - # 4, get initial resource status + # get initial resource status if default_node_name: ptpstatus = None try: @@ -74,24 +68,28 @@ class PtpService(object): except oslo_messaging.exceptions.MessagingTimeout as ex: LOG.warning("ptp status is not available @node {0} due to {1}".format( default_node_name, str(ex))) - # remove the entry - self.subscription_repo.delete_one(SubscriptionId = entry.SubscriptionId) - self.subscription_repo.commit() - self.daemon_control.refresh() - raise ex + raise client_exception.ResourceNotAvailable(broker_node_name, subscription_dto.ResourceType) - # 5, initial delivery of ptp status + # construct subscription entry + subscription_orm.InitialDeliveryTimestamp = ptpstatus.get('EventTimestamp', None) + entry = self.subscription_repo.add(subscription_orm) + + # Delivery the initial notification of ptp status subscription_dto2 = SubscriptionInfo(entry) try: subscription_helper.notify(subscription_dto2, ptpstatus) LOG.info("initial ptpstatus is delivered successfully") except Exception as ex: - LOG.warning("initial ptpstatus is not delivered:{0}".format(type(ex), str(ex))) - # remove the entry - self.subscription_repo.delete_one(SubscriptionId = entry.SubscriptionId) + LOG.warning("initial ptpstatus is not delivered:{0}".format(str(ex))) + raise client_exception.InvalidEndpoint(subscription_dto.EndpointUri) + + try: + # commit the subscription entry self.subscription_repo.commit() self.daemon_control.refresh() - subscription_dto2 = None + except Exception as ex: + LOG.warning("subscription is not added successfully:{0}".format(str(ex))) + raise ex return subscription_dto2 def remove_subscription(self, subscriptionid): diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py b/notificationclient-base/centos/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py index 246a073..18f0cc1 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py @@ -16,6 +16,7 @@ from notificationclientsdk.model.dto.subscription import SubscriptionInfo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.services.ptp import PtpService +from notificationclientsdk.exception import client_exception from sidecar.repository.notification_control import notification_control from sidecar.repository.dbcontext_default import defaults @@ -63,14 +64,22 @@ class SubscriptionsController(rest.RestController): LOG.info('created subscription: {0}'.format(subscription.to_dict())) return subscription + except client_exception.InvalidSubscription as ex: + abort(400) + except client_exception.InvalidEndpoint as ex: + abort(400) + except client_exception.NodeNotAvailable as ex: + abort(404) + except client_exception.ResourceNotAvailable as ex: + abort(404) except oslo_messaging.exceptions.MessagingTimeout as ex: abort(404) except HTTPException as ex: LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) - raise ex + abort(400) except HTTPServerError as ex: LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) - raise ex + abort(500) except Exception as ex: LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) abort(500)