From 67452b94dbd5aad0d76b8e3e6da7169f92a65b19 Mon Sep 17 00:00:00 2001 From: Teresa Ho Date: Thu, 30 Jun 2022 16:53:41 -0400 Subject: [PATCH] Implement ORAN v2 api on notificationserver The commit added the support to publish ptp status for v2 API in the notification-service. It also renamed the previously name v0 API to v1 API. Test Plan: Pass: PTP status push notification v2 API Pass: PTP status push notification v1 API Pass: Subscribe/List/Delete subscription in v2 API Pass: Subscribe/List/Delete subscription in v1 API Story: 2010056 Task: 45809 Signed-off-by: Teresa Ho Change-Id: Id5a1ff955eade59d68b6bcadfea4ffe6ed1567cd --- .../client/notificationservice.py | 6 +- .../common/helpers/constants.py | 32 ++++ .../common/helpers/subscription_helper.py | 5 +- .../model/dto/__init__.py | 8 +- .../model/dto/subscription.py | 6 +- .../services/broker_state_manager.py | 15 +- .../services/notification_handler.py | 32 ++-- .../services/notification_worker.py | 6 +- .../notificationclientsdk/services/ptp.py | 29 +-- .../sidecar/controllers/root.py | 38 ++-- .../sidecar/controllers/v1/subscriptions.py | 102 +---------- .../sidecar/controllers/v2/__init__.py | 5 + .../controllers/v2/resource_address.py | 69 +++++++ .../sidecar/controllers/v2/subscriptions.py | 168 ++++++++++++++++++ .../sidecar/model/jsonify.py | 6 +- notificationservice-base/centos/Dockerfile | 1 + .../client/ptpeventproducer.py | 8 +- .../common/helpers/constants.py | 19 +- .../common/helpers/ptpsync.py | 17 +- .../trackingfunctionsdk/services/daemon.py | 87 +++++++-- 20 files changed, 480 insertions(+), 179 deletions(-) create mode 100644 notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py create mode 100644 notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/__init__.py create mode 100644 notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py create mode 100644 notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py index f112542..f2d205d 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -58,12 +58,12 @@ class NotificationServiceClient(BrokerClientBase): return def query_resource_status(self, resource_type, - timeout=None, retry=None, resource_qualifier_json=None): + timeout=None, retry=None, resource_qualifier_json=None, resource_address=None): topic = '{0}-Status'.format(resource_type) server = '{0}-Tracking-{1}'.format(resource_type, self.target_node_name) return self.call( topic, server, 'QueryStatus', timeout=timeout, retry=retry, - QualifierJson=resource_qualifier_json) + QualifierJson=resource_qualifier_json, ResourceAddress=resource_address) def add_resource_status_listener(self, resource_type, status_handler=None): if not status_handler: diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py new file mode 100644 index 0000000..f0d24b5 --- /dev/null +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +SPEC_VERSION = "1.0" +DATA_TYPE_NOTIFICATION = "notification" +DATA_TYPE_METRIC = "metric" +VALUE_TYPE_ENUMERATION = "enumeration" +VALUE_TYPE_METRIC = "metric" + +SOURCE_SYNC_ALL = '/sync' +SOURCE_SYNC_GNSS_SYNC_STATUS = '/sync/gnss-status/gnss-sync-status' +SOURCE_SYNC_PTP_CLOCK_CLASS = '/sync/ptp-status/clock-class' +SOURCE_SYNC_PTP_LOCK_STATE = '/sync/ptp-status/lock-state' +SOURCE_SYNC_OS_CLOCK = '/sync/sync-status/os-clock-sync-state' +SOURCE_SYNC_SYNC_STATE = '/sync/sync-status/sync-state' +SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality' +SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended' +SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state' + +VALID_SOURCE_URI = { + SOURCE_SYNC_ALL, + SOURCE_SYNC_GNSS_SYNC_STATUS, + SOURCE_SYNC_PTP_CLOCK_CLASS, + SOURCE_SYNC_PTP_LOCK_STATE, + SOURCE_SYNC_OS_CLOCK, + SOURCE_SYNC_SYNC_STATE, + SOURCE_SYNCE_CLOCK_QUALITY, + SOURCE_SYNCE_LOCK_STATE_EXTENDED, + SOURCE_SYNCE_LOCK_STATE +} diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index c846c66..020c0af 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -1,6 +1,5 @@ -#coding=utf-8 # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -59,7 +58,7 @@ def parse_resource_address(resource_address): # Assume no optional hierarchy for now clusterName = resource_address.split('/')[1] nodeName = resource_address.split('/')[2] - resource_path = re.split('[/]', resource_address, 3)[3] + resource_path = '/' + re.split('[/]', resource_address, 3)[3] return clusterName, nodeName, resource_path diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py index 5d0d1b7..3b5c878 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py @@ -1,16 +1,16 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.subscription import ResourceQualifierPtp from wsme.rest.json import tojson -@tojson.when_object(SubscriptionInfoV0) +@tojson.when_object(SubscriptionInfoV1) def subscriptioninfo_tojson(datatype, value): if value is None: return None @@ -22,7 +22,7 @@ def resourcequalifierptp_tojson(datatype, value): return None return value.to_dict() -@tojson.when_object(SubscriptionInfoV1) +@tojson.when_object(SubscriptionInfoV2) def subscriptioninfo_tojson(datatype, value): if value is None: return None diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py index 0b0a18f..d4ddfe5 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py @@ -1,6 +1,6 @@ #coding=utf-8 # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -45,7 +45,7 @@ class ResourceQualifierPtp(ResourceQualifierBase): ''' ViewModel of Subscription ''' -class SubscriptionInfoV0(wtypes.Base): +class SubscriptionInfoV1(wtypes.Base): SubscriptionId = wtypes.text UriLocation = wtypes.text ResourceType = EnumResourceType @@ -99,7 +99,7 @@ class SubscriptionInfoV0(wtypes.Base): } return d -class SubscriptionInfoV1(wtypes.Base): +class SubscriptionInfoV2(wtypes.Base): SubscriptionId = wtypes.text UriLocation = wtypes.text EndpointUri = wtypes.text diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py index 8e980a1..0107ab7 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py @@ -1,13 +1,13 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import json import logging -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper from notificationclientsdk.common.helpers import subscription_helper @@ -96,9 +96,8 @@ class BrokerStateManager: changed = False broker_name = None - LOG.info("__refresh_by_subscription: subscription_orm={}".format(subscription_orm)) if getattr(subscription_orm, 'ResourceType') is not None: - subscription = SubscriptionInfoV0(subscription_orm) + subscription = SubscriptionInfoV1(subscription_orm) resource = subscription.ResourceType # assume PTP and not wildcard if resource == ResourceType.TypePTP: @@ -108,22 +107,22 @@ class BrokerStateManager: LOG.debug("Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId)) return False else: - subscription = SubscriptionInfoV1(subscription_orm) + subscription = SubscriptionInfoV2(subscription_orm) _, nodename, resource = subscription_helper.parse_resource_address(subscription.ResourceAddress) broker_name = nodename - LOG.info("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) + LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) if subscription_orm.Status != 1: return False if not broker_name: # ignore the subscription due to unsupported type - LOG.info("Ignore the subscription for: {0}".format(subscription.SubscriptionId)) + LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId)) return False enumerated_broker_names = NodeInfoHelper.enumerate_nodes(broker_name) if not enumerated_broker_names: - LOG.info("Failed to enumerate broker names for {0}".format(broker_name)) + LOG.debug("Failed to enumerate broker names for {0}".format(broker_name)) return False for expanded_broker_name in enumerated_broker_names: diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index 7226e4d..cda17b1 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -1,6 +1,5 @@ - # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -10,9 +9,10 @@ import logging import multiprocessing as mp import threading +import time -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.repository.subscription_repo import SubscriptionRepo @@ -48,32 +48,38 @@ class NotificationHandler(NotificationHandlerBase): self.notification_lock.acquire() subscription_repo = SubscriptionRepo(autocommit=True) resource_type = notification_info.get('ResourceType', None) - resource_address = notification_info.get('ResourceAddress', None) # Get nodename from resource address - if resource_address: - _,node_name,_ = subscription_helper.parse_resource_address(resource_address) - else: + if resource_type: node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) if not resource_type: raise Exception("abnormal notification@{0}".format(node_name)) - if not resource_type in self.__supported_resource_types: raise Exception("notification with unsupported resource type:{0}".format(resource_type)) + this_delivery_time = notification_info['EventTimestamp'] + else: + source = notification_info.get('source', None) + values = notification_info.get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _,node_name,_ = subscription_helper.parse_resource_address(resource_address) + this_delivery_time = notification_info['time'] + # Change time from float to ascii format + notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', + time.gmtime(this_delivery_time)) - this_delivery_time = notification_info['EventTimestamp'] - - entries = subscription_repo.get(ResourceType=resource_type, Status=1) + entries = subscription_repo.get(Status=1) for entry in entries: subscriptionid = entry.SubscriptionId if entry.ResourceAddress: _,entry_node_name,_ = subscription_helper.parse_resource_address(entry.ResourceAddress) - subscription_dto2 = SubscriptionInfoV1(entry) + subscription_dto2 = SubscriptionInfoV2(entry) else: ResourceQualifierJson = entry.ResourceQualifierJson or '{}' ResourceQualifier = json.loads(ResourceQualifierJson) # qualify by NodeName entry_node_name = ResourceQualifier.get('NodeName', None) - subscription_dto2 = SubscriptionInfoV0(entry) + subscription_dto2 = SubscriptionInfoV1(entry) node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) if not node_name_matched: continue diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py index 17e1709..4206903 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -20,7 +20,7 @@ from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.location import LocationInfo @@ -242,7 +242,7 @@ class NotificationWorker: for s in subs: if s.ResourceType: - subinfo = SubscriptionInfoV0(s) + subinfo = SubscriptionInfoV1(s) # assume resource type being PTP and not wildcard resource_type = s.ResourceType if resource_type == ResourceType.TypePTP: diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 64dc23c..3090062 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -9,16 +9,17 @@ import logging import json import kombu +from notificationclientsdk.client.notificationservice import NotificationServiceClient +from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from notificationclientsdk.model.dto.resourcetype import ResourceType +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 +from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo -from notificationclientsdk.model.dto.resourcetype import ResourceType -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 -from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper -from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm -from notificationclientsdk.client.notificationservice import NotificationServiceClient from notificationclientsdk.services.daemon import DaemonControl -from notificationclientsdk.common.helpers import subscription_helper + from notificationclientsdk.exception import client_exception @@ -70,7 +71,7 @@ class PtpService(object): finally: del nodeinfo_repo - def query(self, broker_name): + def query(self, broker_name, resource_address=None): default_node_name = NodeInfoHelper.default_node_name(broker_name) broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) @@ -83,9 +84,9 @@ class PtpService(object): ResourceType.TypePTP, default_node_name)) raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) - return self._query(default_node_name, broker_pod_ip) + return self._query(default_node_name, broker_pod_ip, resource_address) - def _query(self, broker_name, broker_pod_ip): + def _query(self, broker_name, broker_pod_ip, resource_address=None): broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], @@ -97,7 +98,7 @@ class PtpService(object): notificationservice_client = NotificationServiceClient( broker_name, broker_transport_endpoint, broker_pod_ip) resource_status = notificationservice_client.query_resource_status( - ResourceType.TypePTP, timeout=5, retry=10) + ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address) return resource_status except oslo_messaging.exceptions.MessagingTimeout as ex: LOG.warning("ptp status is not available @node {0} due to {1}".format( @@ -144,9 +145,9 @@ class PtpService(object): # Delivery the initial notification of ptp status if hasattr(subscription_dto, 'ResourceType'): - subscription_dto2 = SubscriptionInfoV0(entry) - else: subscription_dto2 = SubscriptionInfoV1(entry) + else: + subscription_dto2 = SubscriptionInfoV2(entry) try: subscription_helper.notify(subscription_dto2, ptpstatus) diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/root.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/root.py index 16e0a34..636cdd0 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/root.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/root.py @@ -1,6 +1,5 @@ -#coding=utf-8 # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -14,9 +13,10 @@ from wsmeext.pecan import wsexpose THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') -from sidecar.controllers.v1.subscriptions import SubscriptionsControllerV0 from sidecar.controllers.v1.subscriptions import SubscriptionsControllerV1 +from sidecar.controllers.v2.subscriptions import SubscriptionsControllerV2 from sidecar.controllers.v1.resource.ptp import PtpController +from sidecar.controllers.v2.resource_address import ResourceAddressController import logging LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper @@ -40,21 +40,33 @@ class V1Controller(rest.RestController): @expose("json") def _lookup(self, primary_key, *remainder): - LOG.info("_lookup: primary_key={} remainder={}".format(primary_key, remainder)) - payload = None - if request.is_body_readable: - payload = request.json_body - LOG.info("_lookup: payload={}".format(payload)) if primary_key: if 'ptp' == primary_key.lower(): return PtpController(), remainder elif 'subscriptions' == primary_key.lower(): - if payload and 'ResourceType' in payload: - return SubscriptionsControllerV0(), remainder - else: - return SubscriptionsControllerV1(), remainder + return SubscriptionsControllerV1(), remainder abort(404) +class V2Controller(rest.RestController): + + @wsexpose(wtypes.text) + def get(self): + return 'v2controller' + + @expose("json") + def _lookup(self, primary_key, *remainder): + if primary_key: + if 'subscriptions' == primary_key.lower(): + return SubscriptionsControllerV2(), remainder + else: + if 'currentstate' == remainder[-1].lower(): + resource_address_array = remainder[:-1] + resource_address = '/' + primary_key + '/' + '/'.join(resource_address_array) + remainder = remainder[-1:] + return ResourceAddressController(resource_address), remainder + abort(404) + + class ocloudDaemonController(rest.RestController): # All supported API versions @@ -73,6 +85,8 @@ class ocloudDaemonController(rest.RestController): if primary_key: if 'v1' == primary_key.lower(): return V1Controller(), remainder + elif 'v2' == primary_key.lower(): + return V2Controller(), remainder abort(404) class RootController(object): diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py index 7ffed79..fa4eb4f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v1/subscriptions.py @@ -1,6 +1,6 @@ #coding=utf-8 # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -16,7 +16,6 @@ from wsme import types as wtypes from wsmeext.pecan import wsexpose from notificationclientsdk.model.dto.resourcetype import ResourceType -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 from notificationclientsdk.repository.subscription_repo import SubscriptionRepo @@ -33,9 +32,9 @@ log_helper.config_logger(LOG) THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') -class SubscriptionsControllerV0(rest.RestController): +class SubscriptionsControllerV1(rest.RestController): - @wsexpose(SubscriptionInfoV0, body=SubscriptionInfoV0, status_code=201) + @wsexpose(SubscriptionInfoV1, body=SubscriptionInfoV1, status_code=201) def post(self, subscription): # decode the request body try: @@ -49,7 +48,7 @@ class SubscriptionsControllerV0(rest.RestController): subscription.ResourceType)) abort(404) - if not self._validateV0(subscription): + if not self._validateV1(subscription): LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) abort(400) @@ -88,87 +87,6 @@ class SubscriptionsControllerV0(rest.RestController): LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) abort(500) - @expose('json') - def get(self): - try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) - entries = repo.get(Status=1) - - response.status = 200 - return [SubscriptionInfoV0(x).to_dict() for x in entries if x.Status == 1] - except HTTPException as ex: - LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) - raise ex - except HTTPServerError as ex: - LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) - raise ex - except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) - abort(500) - - @expose() - def _lookup(self, subscription_id, *remainder): - return SubscriptionController(subscription_id), remainder - - def _validateV0(self, subscription_request): - try: - assert subscription_request.ResourceType == 'PTP' - assert subscription_request.EndpointUri - - return True - except: - return False - - -class SubscriptionsControllerV1(rest.RestController): - - @wsexpose(SubscriptionInfoV1, body=SubscriptionInfoV1, status_code=201) - def post(self, subscription): - # decode the request body - try: - if subscription.ResourceAddress: - LOG.info(' subscribe: ResourceAddress {0} with callback uri {1}'.format( - subscription.ResourceAddress, - subscription.EndpointUri)) - - if not self._validateV1(subscription): - LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) - abort(400) - - subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format( - conf.server.get('protocol','http'), - conf.server.get('host', '127.0.0.1'), - conf.server.get('port', '8080') - ) - if subscription.ResourceAddress: - ptpservice = PtpService(notification_control) - entry = ptpservice.add_subscription(subscription) - del ptpservice - if not entry: - abort(404) - subscription.SubscriptionId = entry.SubscriptionId - subscription.UriLocation = entry.UriLocation - LOG.info('created subscription: {0}'.format(subscription.to_dict())) - - return subscription - except client_exception.InvalidSubscription as ex: - abort(400) - except client_exception.InvalidEndpoint as ex: - abort(400) - except client_exception.NodeNotAvailable as ex: - abort(404) - except client_exception.ResourceNotAvailable as ex: - abort(404) - except HTTPException as ex: - LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) - abort(400) - except HTTPServerError as ex: - LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) - abort(500) - except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) - abort(500) - @expose('json') def get(self): try: @@ -178,9 +96,7 @@ class SubscriptionsControllerV1(rest.RestController): subs = [] for x in entries: if x.Status == 1: - if getattr(x, 'ResourceType', None): - subs.append(SubscriptionInfoV0(x).to_dict()) - else: + if getattr(x, 'ResourceType', None) is not None: subs.append(SubscriptionInfoV1(x).to_dict()) return subs except HTTPException as ex: @@ -199,13 +115,14 @@ class SubscriptionsControllerV1(rest.RestController): def _validateV1(self, subscription_request): try: - assert subscription_request.ResourceAddress + assert subscription_request.ResourceType == 'PTP' assert subscription_request.EndpointUri return True except: return False + class SubscriptionController(rest.RestController): def __init__(self, subscription_id): self.subscription_id = subscription_id @@ -220,10 +137,7 @@ class SubscriptionController(rest.RestController): abort(404) else: response.status = 200 - if getattr(entry, 'ResourceType', None): - return SubscriptionInfoV0(entry).to_dict() - else: - return SubscriptionInfoV1(entry).to_dict() + return SubscriptionInfoV1(entry).to_dict() except HTTPException as ex: LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) raise ex diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/__init__.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/__init__.py new file mode 100644 index 0000000..12f4606 --- /dev/null +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py new file mode 100644 index 0000000..5c41ee9 --- /dev/null +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -0,0 +1,69 @@ +# +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +from pecan import expose, redirect, rest, route, response, abort +from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError + +from wsme import types as wtypes +from wsmeext.pecan import wsexpose + +import os +import logging +import oslo_messaging + +from notificationclientsdk.common.helpers import constants +from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.services.ptp import PtpService +from notificationclientsdk.exception import client_exception + +from sidecar.repository.notification_control import notification_control + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') + +class ResourceAddressController(object): + def __init__(self, resource_address): + self.resource_address = resource_address + + @expose('json') + def CurrentState(self): + try: + # validate resource address + _, nodename, resource = subscription_helper.parse_resource_address(self.resource_address) + if nodename != THIS_NODE_NAME and nodename != '.': + LOG.warning("Node {} is not available".format(nodename)) + abort(404) + if resource not in constants.VALID_SOURCE_URI: + LOG.warning("Resource {} is not valid".format(resource)) + abort(404) + ptpservice = PtpService(notification_control) + ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address) + return ptpstatus + except client_exception.NodeNotAvailable as ex: + LOG.warning("Node is not available:{0}".format(str(ex))) + abort(404) + except client_exception.ResourceNotAvailable as ex: + LOG.warning("Resource is not available:{0}".format(str(ex))) + abort(404) + except oslo_messaging.exceptions.MessagingTimeout as ex: + LOG.warning("Resource is not reachable:{0}".format(str(ex))) + abort(404) + except HTTPException as ex: + LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) + # raise ex + abort(400) + except HTTPServerError as ex: + LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) + # raise ex + abort(500) + except Exception as ex: + LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + abort(500) + diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py new file mode 100644 index 0000000..e06a5c1 --- /dev/null +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py @@ -0,0 +1,168 @@ +# +# Copyright (c) 2021-2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +from pecan import conf +from pecan import expose, rest, response, abort +from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError + +import os +import logging + +from wsme import types as wtypes +from wsmeext.pecan import wsexpose + +from notificationclientsdk.model.dto.resourcetype import ResourceType +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 + +from notificationclientsdk.repository.subscription_repo import SubscriptionRepo +from notificationclientsdk.services.ptp import PtpService +from notificationclientsdk.exception import client_exception + +from sidecar.repository.notification_control import notification_control +from sidecar.repository.dbcontext_default import defaults + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') + +class SubscriptionsControllerV2(rest.RestController): + + @wsexpose(SubscriptionInfoV2, body=SubscriptionInfoV2, status_code=201) + def post(self, subscription): + # decode the request body + try: + if subscription.ResourceAddress: + LOG.info(' subscribe: ResourceAddress {0} with callback uri {1}'.format( + subscription.ResourceAddress, + subscription.EndpointUri)) + + if not self._validateV2(subscription): + LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict())) + abort(400) + + subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( + conf.server.get('protocol','http'), + conf.server.get('host', '127.0.0.1'), + conf.server.get('port', '8080') + ) + if subscription.ResourceAddress: + ptpservice = PtpService(notification_control) + entry = ptpservice.add_subscription(subscription) + del ptpservice + if not entry: + abort(404) + subscription.SubscriptionId = entry.SubscriptionId + subscription.UriLocation = entry.UriLocation + LOG.info('created subscription: {0}'.format(subscription.to_dict())) + + return subscription + except client_exception.InvalidSubscription as ex: + abort(400) + except client_exception.InvalidEndpoint as ex: + abort(400) + except client_exception.NodeNotAvailable as ex: + abort(404) + except client_exception.ResourceNotAvailable as ex: + abort(404) + except HTTPException as ex: + LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) + abort(400) + except HTTPServerError as ex: + LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) + abort(500) + except Exception as ex: + LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + abort(500) + + @expose('json') + def get(self): + try: + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + entries = repo.get(Status=1) + response.status = 200 + subs = [] + for x in entries: + if x.Status == 1: + if getattr(x, 'ResourceAddress', None) is not None: + subs.append(SubscriptionInfoV2(x).to_dict()) + return subs + except HTTPException as ex: + LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except HTTPServerError as ex: + LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except Exception as ex: + LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + abort(500) + + @expose() + def _lookup(self, subscription_id, *remainder): + return SubscriptionController(subscription_id), remainder + + def _validateV2(self, subscription_request): + try: + assert subscription_request.ResourceAddress + assert subscription_request.EndpointUri + + return True + except: + return False + +class SubscriptionController(rest.RestController): + def __init__(self, subscription_id): + self.subscription_id = subscription_id + + @expose('json') + def get(self): + try: + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) + + if not entry: + abort(404) + else: + response.status = 200 + if getattr(entry, 'ResourceAddress', None): + return SubscriptionInfoV2(entry).to_dict() + + except HTTPException as ex: + LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except HTTPServerError as ex: + LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except Exception as ex: + LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + abort(500) + + @wsexpose(status_code=204) + def delete(self): + try: + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + entry = repo.get_one(SubscriptionId=self.subscription_id) + if entry: + if entry.SubscriptionId: + ptpservice = PtpService(notification_control) + ptpservice.remove_subscription(entry.SubscriptionId) + del ptpservice + return + else: + repo.delete_one(SubscriptionId=self.subscription_id) + return + abort(404) + except HTTPException as ex: + LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except HTTPServerError as ex: + LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) + raise ex + except Exception as ex: + LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + abort(500) diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/model/jsonify.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/model/jsonify.py index 19762c5..859d923 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/model/jsonify.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/model/jsonify.py @@ -4,13 +4,13 @@ # SPDX-License-Identifier: Apache-2.0 # -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0 from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.model.dto.subscription import ResourceQualifierPtp from pecan.jsonify import jsonify -@jsonify.register(SubscriptionInfoV0) +@jsonify.register(SubscriptionInfoV1) def jsonify_subscriptioninfo(subscriptionInfo): return subscriptionInfo.to_dict() @@ -18,7 +18,7 @@ def jsonify_subscriptioninfo(subscriptionInfo): def jsonify_resourcequalifierptp(resourceQualifierPtp): return resourceQualifierPtp.to_dict() -@jsonify.register(SubscriptionInfoV1) +@jsonify.register(SubscriptionInfoV2) def jsonify_subscriptioninfo(subscriptionInfo): return subscriptionInfo.to_dict() diff --git a/notificationservice-base/centos/Dockerfile b/notificationservice-base/centos/Dockerfile index a138932..cf916a0 100644 --- a/notificationservice-base/centos/Dockerfile +++ b/notificationservice-base/centos/Dockerfile @@ -11,6 +11,7 @@ RUN set -ex ;\ -y \ gcc python3-devel python3-pip \ && pip3 install --user pecan \ + && pip3 install pygtail \ && pip3 install oslo-config \ && pip3 install oslo-messaging \ && pip3 install WSME diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py index 94c745d..46fe83f 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -37,7 +37,7 @@ class PtpEventProducer(object): return None def TriggerDelivery(self, ctx, **rpc_kwargs): - LOG.debug ("PtpEventProducer TriggerDelivery called %s" %rpc_kwargs) + LOG.debug("PtpEventProducer TriggerDelivery called %s" %rpc_kwargs) if self.handler: return self.handler.trigger_delivery(**rpc_kwargs) else: @@ -71,10 +71,10 @@ class PtpEventProducer(object): result2 = self.publish_status_all(ptpstatus, retry) if self.registration_broker_client else result return result1, result2 - def publish_status_local(self, ptpstatus, retry=3): + def publish_status_local(self, ptpstatus, source, retry=3): if not self.local_broker_client: return False - topic='PTP-Event-{0}'.format(self.node_name) + topic='{0}-Event-{1}'.format(source, self.node_name) server = None isretrystopped = False while not isretrystopped: diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index d40a32d..67c8e70 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -46,3 +46,20 @@ PHC2SYS_TOLERANCE_HIGH = 37000001000 # testing values CGU_PATH_VALID = "/sys/kernel/debug/ice/0000:18:00.0/cgu" + +SPEC_VERSION = "1.0" +DATA_VERSION = "1.0" +DATA_TYPE_NOTIFICATION = "notification" +DATA_TYPE_METRIC = "metric" +VALUE_TYPE_ENUMERATION = "enumeration" +VALUE_TYPE_METRIC = "metric" + +SOURCE_SYNC_ALL = '/sync' +SOURCE_SYNC_GNSS_SYNC_STATUS = '/sync/gnss-status/gnss-sync-status' +SOURCE_SYNC_PTP_CLOCK_CLASS = '/sync/ptp-status/clock-class' +SOURCE_SYNC_PTP_LOCK_STATE = '/sync/ptp-status/lock-state' +SOURCE_SYNC_OS_CLOCK = '/sync/sync-status/os-clock-sync-state' +SOURCE_SYNC_SYNC_STATE = '/sync/sync-status/sync-state' +SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality' +SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended' +SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state' diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py index 57dd202..516c5ea 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py @@ -1,6 +1,6 @@ #! /usr/bin/python3 # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -14,6 +14,7 @@ # import errno, os import os.path +import re import sys import subprocess import datetime @@ -196,3 +197,17 @@ def ptp_status(holdover_time, freq, sync_state, event_time): else: new_event = "false" return new_event, sync_state, event_time + +def parse_resource_address(resource_address): + # The format of resource address is: + # /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource} + # Assume no optional hierarchy for now + clusterName = resource_address.split('/')[1] + nodeName = resource_address.split('/')[2] + resource_path = '/' + re.split('[/]', resource_address, 3)[3] + return clusterName, nodeName, resource_path + +def format_resource_address(node_name, resource): + # Return a resource_address + resource_address = '/./' + node_name + resource + return resource_address diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index 2c73421..2072a80 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -9,12 +9,15 @@ import json import time import oslo_messaging from oslo_config import cfg +from oslo_utils import uuidutils + import logging import multiprocessing as mp import threading from trackingfunctionsdk.client.ptpeventproducer import PtpEventProducer +from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.common.helpers import ptpsync from trackingfunctionsdk.common.helpers import log_helper from trackingfunctionsdk.common.helpers.dmesg_watcher import DmesgWatcher @@ -36,6 +39,18 @@ log_helper.config_logger(LOG) THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') +# Event source to event type mapping +source_type = { + '/sync/gnss-status/gnss-sync-status': 'event.sync.gnss-status.gnss-state-change', + '/sync/ptp-status/clock-class': 'event.sync.ptp-status.ptp-clock-class-change', + '/sync/ptp-status/lock-state': 'event.sync.ptp-status.ptp-state-change', + '/sync/sync-status/os-clock-sync-state': 'event.sync.sync-status.os-clock-sync-state-change', + '/sync/sync-status/sync-state': 'event.sync.sync-status.synchronization-state-change', + '/sync/synce-status/clock-quality': 'event.sync.synce-status.synce-clock-quality-change', + '/sync/synce-status/lock-state-extended': 'event.sync.synce-status.synce-state-change-extended', + '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', + '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', +} '''Entry point of Default Process Worker''' def ProcessWorkerDefault(event, sqlalchemy_conf_json, broker_transport_endpoint): @@ -61,16 +76,38 @@ class PtpWatcherDefault: last_event_time = self.watcher.ptptracker_context.get('last_event_time', time.time()) self.watcher.ptptracker_context_lock.release() - lastStatus = { - 'ResourceType': ResourceType.TypePTP, - 'EventData': { - 'State': sync_state - }, - 'ResourceQualifier': { - 'NodeName': self.watcher.node_name - }, - 'EventTimestamp': last_event_time - } + resource_address = rpc_kwargs.get('ResourceAddress', None) + if resource_address: + _, nodename, resource_path = ptpsync.parse_resource_address(resource_address) + lastStatus = { + 'id': uuidutils.generate_uuid(), + 'specversion': constants.SPEC_VERSION, + 'source': resource_path, + 'type': source_type[resource_path], + 'time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(last_event_time)), + 'data': { + 'version': constants.DATA_VERSION, + 'values': [ + { + 'data_type': constants.DATA_TYPE_NOTIFICATION, + 'ResourceAddress': resource_address, + 'value_type': constants.VALUE_TYPE_ENUMERATION, + 'value': sync_state + } + ] + } + } + else: + lastStatus = { + 'ResourceType': ResourceType.TypePTP, + 'EventData': { + 'State': sync_state + }, + 'ResourceQualifier': { + 'NodeName': self.watcher.node_name + }, + 'EventTimestamp': last_event_time + } return lastStatus def trigger_delivery(self, **rpc_kwargs): @@ -183,7 +220,7 @@ class PtpWatcherDefault: holdover_time, freq, sync_state, last_event_time) return new_event, sync_state, new_event_time - '''announce location''' + '''publish ptp status''' def __publish_ptpstatus(self, forced=False): holdover_time = float(self.ptptracker_context['holdover_seconds']) freq = float(self.ptptracker_context['poll_freq_seconds']) @@ -200,7 +237,7 @@ class PtpWatcherDefault: self.ptptracker_context['last_event_time'] = new_event_time self.ptptracker_context_lock.release() - # publish new event + # publish new event in API version v1 format LOG.debug("publish ptp status to clients") lastStatus = { 'ResourceType': 'PTP', @@ -212,7 +249,31 @@ class PtpWatcherDefault: }, 'EventTimestamp': new_event_time } - self.ptpeventproducer.publish_status(lastStatus) + self.ptpeventproducer.publish_status(lastStatus, 'PTP') + + # publish new event in API version v2 format + resource_address = ptpsync.format_resource_address( + self.node_name, constants.SOURCE_SYNC_SYNC_STATE) + lastStatus = { + 'id': uuidutils.generate_uuid(), + 'specversion': constants.SPEC_VERSION, + 'source': constants.SOURCE_SYNC_SYNC_STATE, + 'type': source_type[constants.SOURCE_SYNC_SYNC_STATE], + 'time': new_event_time, + 'data': { + 'version': constants.DATA_VERSION, + 'values': [ + { + 'data_type': constants.DATA_TYPE_NOTIFICATION, + 'ResourceAddress': resource_address, + 'value_type': constants.VALUE_TYPE_ENUMERATION, + 'value': sync_state + } + ] + } + } + self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_SYNC_STATE) + self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL) return