Remove duplication notification delivery
Story: 2008529 Task: 41838 Signed-off-by: Bin Yang <bin.yang@windriver.com> Change-Id: I90480828c8262ba82da732efee82857dbe8910b6
This commit is contained in:
parent
7834ef4f3e
commit
435d2df181
@ -0,0 +1,31 @@
|
||||
|
||||
class NodeNotAvailable(Exception):
|
||||
def __init__(self, node_name):
|
||||
self.node_name = node_name
|
||||
def __str__(self):
|
||||
return "Node:{0} not available".format(self.node_name)
|
||||
|
||||
|
||||
class ResourceNotAvailable(Exception):
|
||||
def __init__(self, node_name, resource_type):
|
||||
self.node_name = node_name
|
||||
self.resource_type = resource_type
|
||||
def __str__(self):
|
||||
return "Resource with type:{0} is not available on node:{1}".format(
|
||||
self.resource_type, self.node_name)
|
||||
|
||||
|
||||
class InvalidEndpoint(Exception):
|
||||
def __init__(self, endpoint_uri):
|
||||
self.endpoint_uri = endpoint_uri
|
||||
|
||||
def __str__(self):
|
||||
return "Endpoint is invalid:{0}".format(self.endpoint_uri)
|
||||
|
||||
|
||||
class InvalidSubscription(Exception):
|
||||
def __init__(self, subscriptioninfo):
|
||||
self.subscriptioninfo = subscriptioninfo
|
||||
|
||||
def __str__(self):
|
||||
return "Subscription is invalid:{0}".format(self.subscriptioninfo.to_dict())
|
@ -8,6 +8,7 @@ class Subscription(OrmBase):
|
||||
UriLocation = Column(String(512))
|
||||
ResourceType = Column(String(64))
|
||||
EndpointUri = Column(String(512))
|
||||
InitialDeliveryTimestamp = Column(Float)
|
||||
Status = Column(Integer)
|
||||
CreateTime = Column(Float)
|
||||
LastUpdateTime = Column(Float)
|
||||
|
@ -118,6 +118,7 @@ class NotificationWorker:
|
||||
|
||||
def __init_node_sync_channel(self):
|
||||
self.__node_sync_event = mp.Event()
|
||||
self.__node_sync_q = Queue.Queue()
|
||||
# initial to be set
|
||||
self.__node_sync_event.set()
|
||||
|
||||
@ -203,12 +204,14 @@ class NotificationWorker:
|
||||
nodeinfo_repo.add(entry)
|
||||
node_resource_updated = True
|
||||
node_changed = True
|
||||
self.__node_sync_q.put(node_name)
|
||||
LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
|
||||
elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
|
||||
# update the entry
|
||||
if entry.ResourceTypes != location_info2.ResourceTypes:
|
||||
node_resource_updated = True
|
||||
nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
|
||||
self.__node_sync_q.put(node_name)
|
||||
LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
|
||||
else:
|
||||
# do nothing
|
||||
@ -235,6 +238,35 @@ class NotificationWorker:
|
||||
self.signal_events()
|
||||
pass
|
||||
|
||||
def __get_lastest_delivery_timestamp(self, node_name, subscriptionid):
|
||||
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
|
||||
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
|
||||
return last_delivery_time
|
||||
|
||||
def __update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time):
|
||||
if not self.notification_stat.get(node_name, None):
|
||||
self.notification_stat[node_name] = {
|
||||
subscriptionid: {
|
||||
'EventTimestamp': this_delivery_time
|
||||
}
|
||||
}
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
|
||||
node_name, subscriptionid))
|
||||
elif not self.notification_stat[node_name].get(subscriptionid, None):
|
||||
self.notification_stat[node_name][subscriptionid] = {
|
||||
'EventTimestamp': this_delivery_time
|
||||
}
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
|
||||
node_name, subscriptionid))
|
||||
else:
|
||||
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
|
||||
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
|
||||
if (last_delivery_time >= this_delivery_time):
|
||||
return
|
||||
last_delivery_stat['EventTimestamp'] = this_delivery_time
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(
|
||||
node_name, subscriptionid))
|
||||
|
||||
def handle_notification_delivery(self, notification_info):
|
||||
LOG.debug("start notification delivery")
|
||||
result = True
|
||||
@ -267,40 +299,22 @@ class NotificationWorker:
|
||||
|
||||
subscription_dto2 = SubscriptionInfo(entry)
|
||||
try:
|
||||
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
|
||||
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
|
||||
last_delivery_time = self.__get_lastest_delivery_timestamp(node_name, subscriptionid)
|
||||
if last_delivery_time and last_delivery_time >= this_delivery_time:
|
||||
# skip this entry since already delivered
|
||||
LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId))
|
||||
raise Exception("notification timestamp indicate it is not lastest")
|
||||
continue
|
||||
|
||||
subscription_helper.notify(subscription_dto2, notification_info)
|
||||
LOG.debug("notification is delivered successfully to {0}".format(
|
||||
entry.SubscriptionId))
|
||||
|
||||
if not self.notification_stat.get(node_name, None):
|
||||
self.notification_stat[node_name] = {
|
||||
subscriptionid: {
|
||||
'EventTimestamp': this_delivery_time
|
||||
}
|
||||
}
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
|
||||
node_name, subscriptionid))
|
||||
elif not self.notification_stat[node_name].get(subscriptionid, None):
|
||||
self.notification_stat[node_name][subscriptionid] = {
|
||||
'EventTimestamp': this_delivery_time
|
||||
}
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
|
||||
node_name, subscriptionid))
|
||||
else:
|
||||
last_delivery_stat['EventTimestamp'] = this_delivery_time
|
||||
LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(
|
||||
node_name, subscriptionid))
|
||||
self.__update_delivery_timestamp(node_name, subscriptionid, this_delivery_time)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.warning("notification is not delivered to {0}:{1}".format(
|
||||
entry.SubscriptionId, str(ex)))
|
||||
# remove the entry
|
||||
# proceed to next entry
|
||||
continue
|
||||
finally:
|
||||
pass
|
||||
@ -321,7 +335,9 @@ class NotificationWorker:
|
||||
def process_sync_node_event(self):
|
||||
LOG.debug("Start processing sync node event")
|
||||
need_to_sync_node_again = False
|
||||
for broker_node_name, node_resources in self.node_resources_map.items():
|
||||
|
||||
while not self.__node_sync_q.empty():
|
||||
broker_node_name = self.__node_sync_q.get(False)
|
||||
try:
|
||||
result = self.syncup_node(broker_node_name)
|
||||
if not result:
|
||||
@ -460,6 +476,11 @@ class NotificationWorker:
|
||||
node_map = {}
|
||||
self.node_resources_map[current_node_name] = node_map
|
||||
node_map[resource_type] = self.node_resources_iteration
|
||||
# update the initial delivery timestamp as well
|
||||
|
||||
self.__update_delivery_timestamp(
|
||||
NodeInfoHelper.default_node_name(broker_node_name),
|
||||
s.SubscriptionId, s.InitialDeliveryTimestamp)
|
||||
|
||||
# delete all entry with Status == 0
|
||||
subscription_repo.delete(Status=0)
|
||||
@ -610,16 +631,27 @@ class NotificationWorker:
|
||||
def __refresh_watchers_from_map(self):
|
||||
try:
|
||||
LOG.debug("refresh with {0} nodes".format(len(self.node_resources_map)))
|
||||
node_to_sync = []
|
||||
for broker_node_name, node_resources in self.node_resources_map.items():
|
||||
LOG.debug("check to watch resources@{0} :{1}".format(broker_node_name, node_resources))
|
||||
need_to_sync_node = False
|
||||
for resource_type, iteration in node_resources.items():
|
||||
# enable watchers
|
||||
if iteration == self.node_resources_iteration:
|
||||
self.__start_watch_resource(broker_node_name, resource_type)
|
||||
need_to_sync_node = True
|
||||
else:
|
||||
self.__stop_watch_resource(broker_node_name, resource_type)
|
||||
if need_to_sync_node:
|
||||
node_to_sync.append(broker_node_name)
|
||||
self.__refresh_location_watcher()
|
||||
self.__cleanup_map()
|
||||
if node_to_sync:
|
||||
# trigger the node sync up event
|
||||
for node_name in node_to_sync:
|
||||
self.__node_sync_q.put(node_name)
|
||||
self.signal_node_sync_event()
|
||||
self.signal_events()
|
||||
except Exception as ex:
|
||||
LOG.debug("exception: {0}".format(str(ex)))
|
||||
pass
|
||||
|
@ -11,6 +11,8 @@ from notificationclientsdk.client.notificationservice import NotificationService
|
||||
from notificationclientsdk.services.daemon import DaemonControl
|
||||
from notificationclientsdk.common.helpers import subscription_helper
|
||||
|
||||
from notificationclientsdk.exception import client_exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
from notificationclientsdk.common.helpers import log_helper
|
||||
@ -46,7 +48,7 @@ class PtpService(object):
|
||||
broker_node_name = subscription_dto.ResourceQualifier.NodeName
|
||||
default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
|
||||
nodeinfos = NodeInfoHelper.enumerate_nodes(broker_node_name)
|
||||
# 1, check node availability from DB
|
||||
# check node availability from DB
|
||||
if not nodeinfos or not default_node_name in nodeinfos:
|
||||
# update nodeinfo
|
||||
try:
|
||||
@ -55,17 +57,9 @@ class PtpService(object):
|
||||
except oslo_messaging.exceptions.MessagingTimeout as ex:
|
||||
LOG.warning("node {0} cannot be reached due to {1}".format(
|
||||
default_node_name, str(ex)))
|
||||
raise ex
|
||||
raise client_exception.NodeNotAvailable(broker_node_name)
|
||||
|
||||
# 2, add to DB
|
||||
entry = self.subscription_repo.add(subscription_orm)
|
||||
# must commit the transaction to make it visible to daemon worker
|
||||
self.subscription_repo.commit()
|
||||
|
||||
# 3, refresh daemon
|
||||
self.daemon_control.refresh()
|
||||
|
||||
# 4, get initial resource status
|
||||
# get initial resource status
|
||||
if default_node_name:
|
||||
ptpstatus = None
|
||||
try:
|
||||
@ -74,24 +68,28 @@ class PtpService(object):
|
||||
except oslo_messaging.exceptions.MessagingTimeout as ex:
|
||||
LOG.warning("ptp status is not available @node {0} due to {1}".format(
|
||||
default_node_name, str(ex)))
|
||||
# remove the entry
|
||||
self.subscription_repo.delete_one(SubscriptionId = entry.SubscriptionId)
|
||||
self.subscription_repo.commit()
|
||||
self.daemon_control.refresh()
|
||||
raise ex
|
||||
raise client_exception.ResourceNotAvailable(broker_node_name, subscription_dto.ResourceType)
|
||||
|
||||
# 5, initial delivery of ptp status
|
||||
# construct subscription entry
|
||||
subscription_orm.InitialDeliveryTimestamp = ptpstatus.get('EventTimestamp', None)
|
||||
entry = self.subscription_repo.add(subscription_orm)
|
||||
|
||||
# Delivery the initial notification of ptp status
|
||||
subscription_dto2 = SubscriptionInfo(entry)
|
||||
try:
|
||||
subscription_helper.notify(subscription_dto2, ptpstatus)
|
||||
LOG.info("initial ptpstatus is delivered successfully")
|
||||
except Exception as ex:
|
||||
LOG.warning("initial ptpstatus is not delivered:{0}".format(type(ex), str(ex)))
|
||||
# remove the entry
|
||||
self.subscription_repo.delete_one(SubscriptionId = entry.SubscriptionId)
|
||||
LOG.warning("initial ptpstatus is not delivered:{0}".format(str(ex)))
|
||||
raise client_exception.InvalidEndpoint(subscription_dto.EndpointUri)
|
||||
|
||||
try:
|
||||
# commit the subscription entry
|
||||
self.subscription_repo.commit()
|
||||
self.daemon_control.refresh()
|
||||
subscription_dto2 = None
|
||||
except Exception as ex:
|
||||
LOG.warning("subscription is not added successfully:{0}".format(str(ex)))
|
||||
raise ex
|
||||
return subscription_dto2
|
||||
|
||||
def remove_subscription(self, subscriptionid):
|
||||
|
@ -16,6 +16,7 @@ from notificationclientsdk.model.dto.subscription import SubscriptionInfo
|
||||
|
||||
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
|
||||
from notificationclientsdk.services.ptp import PtpService
|
||||
from notificationclientsdk.exception import client_exception
|
||||
|
||||
from sidecar.repository.notification_control import notification_control
|
||||
from sidecar.repository.dbcontext_default import defaults
|
||||
@ -63,14 +64,22 @@ class SubscriptionsController(rest.RestController):
|
||||
LOG.info('created subscription: {0}'.format(subscription.to_dict()))
|
||||
|
||||
return subscription
|
||||
except client_exception.InvalidSubscription as ex:
|
||||
abort(400)
|
||||
except client_exception.InvalidEndpoint as ex:
|
||||
abort(400)
|
||||
except client_exception.NodeNotAvailable as ex:
|
||||
abort(404)
|
||||
except client_exception.ResourceNotAvailable as ex:
|
||||
abort(404)
|
||||
except oslo_messaging.exceptions.MessagingTimeout as ex:
|
||||
abort(404)
|
||||
except HTTPException as ex:
|
||||
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
|
||||
raise ex
|
||||
abort(400)
|
||||
except HTTPServerError as ex:
|
||||
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
|
||||
raise ex
|
||||
abort(500)
|
||||
except Exception as ex:
|
||||
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
|
||||
abort(500)
|
||||
|
Loading…
x
Reference in New Issue
Block a user