PTP notification backend integration

The notification client and service are integrated which
allows subscription and queries of GNSS, OS clock and
overall sync states.

Test plan:
PASS: GNSS state subscribe and pull
PASS: OS Clock state subscribe and pull
PASS: Overall state subscribe and pull

Story: 2010056
Task: 45996

Signed-off-by: Teresa Ho <teresa.ho@windriver.com>
Change-Id: Ifcc0c5f33a29f2f6f522d2677e4f5498b5faaf4b
This commit is contained in:
Teresa Ho 2022-08-11 12:16:33 -04:00
parent d54873f806
commit b626d725e7
4 changed files with 81 additions and 28 deletions

View File

@ -10,6 +10,7 @@ import logging
import multiprocessing as mp import multiprocessing as mp
import threading import threading
import time import time
from datetime import datetime, timezone
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2
@ -65,8 +66,9 @@ class NotificationHandler(NotificationHandlerBase):
_,node_name,_ = subscription_helper.parse_resource_address(resource_address) _,node_name,_ = subscription_helper.parse_resource_address(resource_address)
this_delivery_time = notification_info['time'] this_delivery_time = notification_info['time']
# Change time from float to ascii format # Change time from float to ascii format
notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', notification_info['time'] = datetime.fromtimestamp(this_delivery_time).strftime('%Y-%m-%dT%H:%M:%S%fZ')
time.gmtime(this_delivery_time)) # notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(this_delivery_time))
entries = subscription_repo.get(Status=1) entries = subscription_repo.get(Status=1)
for entry in entries: for entry in entries:
@ -137,7 +139,7 @@ class NotificationHandler(NotificationHandlerBase):
else: else:
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None) last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
if (last_delivery_time >= this_delivery_time): if (last_delivery_time and last_delivery_time >= this_delivery_time):
return return
last_delivery_stat['EventTimestamp'] = this_delivery_time last_delivery_stat['EventTimestamp'] = this_delivery_time
LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(

View File

@ -8,6 +8,7 @@ import oslo_messaging
import logging import logging
import json import json
import kombu import kombu
from datetime import datetime, timezone
from notificationclientsdk.client.notificationservice import NotificationServiceClient from notificationclientsdk.client.notificationservice import NotificationServiceClient
from notificationclientsdk.common.helpers import subscription_helper from notificationclientsdk.common.helpers import subscription_helper
@ -114,9 +115,11 @@ class PtpService(object):
def add_subscription(self, subscription_dto): def add_subscription(self, subscription_dto):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
resource_address = None
if hasattr(subscription_dto, 'ResourceAddress'): if hasattr(subscription_dto, 'ResourceAddress'):
_,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress) _,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress)
broker_name = nodename broker_name = nodename
resource_address = subscription_dto.ResourceAddress
elif hasattr(subscription_dto, 'ResourceType'): elif hasattr(subscription_dto, 'ResourceType'):
broker_name = subscription_dto.ResourceQualifier.NodeName broker_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_name) default_node_name = NodeInfoHelper.default_node_name(broker_name)
@ -136,11 +139,19 @@ class PtpService(object):
if default_node_name: if default_node_name:
ptpstatus = None ptpstatus = None
ptpstatus = self._query(default_node_name, broker_pod_ip) ptpstatus = self._query(default_node_name, broker_pod_ip, resource_address)
LOG.info("initial ptpstatus:{0}".format(ptpstatus)) LOG.info("initial ptpstatus:{0}".format(ptpstatus))
# construct subscription entry # construct subscription entry
subscription_orm.InitialDeliveryTimestamp = ptpstatus.get('EventTimestamp', None) timestamp = ptpstatus.get('EventTimestamp', None)
if timestamp is None:
timestamp = ptpstatus.get('time', None)
# Change time from float to ascii format
ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ')
# ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(timestamp))
subscription_orm.InitialDeliveryTimestamp = timestamp
entry = self.subscription_repo.add(subscription_orm) entry = self.subscription_repo.add(subscription_orm)
# Delivery the initial notification of ptp status # Delivery the initial notification of ptp status

View File

@ -10,6 +10,7 @@ from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientErr
from wsme import types as wtypes from wsme import types as wtypes
from wsmeext.pecan import wsexpose from wsmeext.pecan import wsexpose
from datetime import datetime, timezone
import os import os
import logging import logging
import oslo_messaging import oslo_messaging
@ -45,6 +46,10 @@ class ResourceAddressController(object):
abort(404) abort(404)
ptpservice = PtpService(notification_control) ptpservice = PtpService(notification_control)
ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address) ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address)
# Change time from float to ascii format
# ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(ptpstatus['time']))
ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ')
return ptpstatus return ptpstatus
except client_exception.NodeNotAvailable as ex: except client_exception.NodeNotAvailable as ex:
LOG.warning("Node is not available:{0}".format(str(ex))) LOG.warning("Node is not available:{0}".format(str(ex)))

View File

@ -80,34 +80,69 @@ class PtpWatcherDefault:
self.watcher = watcher self.watcher = watcher
self.init_time = time.time() self.init_time = time.time()
def query_status(self, **rpc_kwargs): def _build_event_response(self, resource_path, last_event_time, resource_address, sync_state):
self.watcher.ptptracker_context_lock.acquire() if resource_path in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, constants.SOURCE_SYNCE_CLOCK_QUALITY]:
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) data_type = constants.DATA_TYPE_METRIC
last_event_time = self.watcher.ptptracker_context.get('last_event_time', time.time()) else:
self.watcher.ptptracker_context_lock.release() data_type = constants.DATA_TYPE_NOTIFICATION
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': resource_path,
'type': source_type[resource_path],
'time': last_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': data_type,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
return lastStatus
def query_status(self, **rpc_kwargs):
lastStatus = {}
resource_address = rpc_kwargs.get('ResourceAddress', None) resource_address = rpc_kwargs.get('ResourceAddress', None)
if resource_address: if resource_address:
_, nodename, resource_path = ptpsync.parse_resource_address(resource_address) _, nodename, resource_path = ptpsync.parse_resource_address(resource_address)
lastStatus = { if resource_path == constants.SOURCE_SYNC_ALL:
'id': uuidutils.generate_uuid(), resource_path = constants.SOURCE_SYNC_SYNC_STATE
'specversion': constants.SPEC_VERSION, if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS:
'source': resource_path, self.watcher.gnsstracker_context_lock.acquire()
'type': source_type[resource_path], sync_state = self.watcher.gnsstracker_context.get('sync_state', GnssState.Freerun)
'time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(last_event_time)), last_event_time = self.watcher.gnsstracker_context.get('last_event_time', time.time())
'data': { self.watcher.gnsstracker_context_lock.release()
'version': constants.DATA_VERSION, lastStatus = self._build_event_response(resource_path, last_event_time, resource_address, sync_state)
'values': [ # elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS:
{ elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE:
'data_type': constants.DATA_TYPE_NOTIFICATION, self.watcher.ptptracker_context_lock.acquire()
'ResourceAddress': resource_address, sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
'value_type': constants.VALUE_TYPE_ENUMERATION, last_event_time = self.watcher.ptptracker_context.get('last_event_time', time.time())
'value': sync_state self.watcher.ptptracker_context_lock.release()
} lastStatus = self._build_event_response(resource_path, last_event_time, resource_address, sync_state)
] elif resource_path == constants.SOURCE_SYNC_OS_CLOCK:
} self.watcher.osclocktracker_context_lock.acquire()
} sync_state = self.watcher.osclocktracker_context.get('sync_state', OsClockState.Freerun)
last_event_time = self.watcher.osclocktracker_context.get('last_event_time', time.time())
self.watcher.osclocktracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time, resource_address, sync_state)
elif resource_path == constants.SOURCE_SYNC_SYNC_STATE:
self.watcher.overalltracker_context_lock.acquire()
sync_state = self.watcher.overalltracker_context.get('sync_state', OverallClockState.Freerun)
last_event_time = self.watcher.overalltracker_context.get('last_event_time', time.time())
self.watcher.overalltracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time, resource_address, sync_state)
LOG.debug("query_status: {}".format(lastStatus))
else: else:
self.watcher.ptptracker_context_lock.acquire()
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context.get('last_event_time', time.time())
self.watcher.ptptracker_context_lock.release()
lastStatus = { lastStatus = {
'ResourceType': ResourceType.TypePTP, 'ResourceType': ResourceType.TypePTP,
'EventData': { 'EventData': {