From bbeef9574409b886f1fb9d7b2ba9f7d7f61d4851 Mon Sep 17 00:00:00 2001 From: Cole Walker Date: Wed, 30 Oct 2024 14:22:26 -0400 Subject: [PATCH] Rework notification messages to remove instance tags Rework the pull status and subscription notification to remove the top level instance tags that were returned with the notification. These tags were originally introduced to identify which PTP instance a given notification was associated with. Updates to the ORAN standard have clarified that the instance identification should instead be included in the Resource Address field. This commit includes the following changes: - Update notification format to place instance identifier in Resource Address instead of a top level tag (default behaviour) - Provide a helm override to enable legacy notification format with top level instance tags for compatibility. This can be enabled by setting the override "notification_format=legacy" - Ensure GM clockClass is correctly reported for both BC and GM configs - Changes result in new images for notificationclient-base and notificationservice-base-v2 - Pull status notifications are now delivered as a list of dicts when there are multiple instances in a request. The "overall sync status" message is delivered as a dict because it only ever has a single notification - Push notifications are delivered as individual dicts for each notification, this behaviour remains the same Test plan: Pass: Verify ptp-notification builds and deploys Pass: Verify container image builds Pass: Verify ptp-notification operations (pull status, subscribe, list, delete) Pass: Verify standard and legacy format output for each notification type Partial-bug: 2089035 Signed-off-by: Cole Walker Change-Id: Ied6674a02b41ed31079a291fc9bace74d95d467a --- .../ptp-notification/templates/daemonset.yaml | 2 + .../ptp-notification/values.yaml | 1 + notificationclient-base/debian/Dockerfile | 1 + .../common/helpers/subscription_helper.py | 54 ++++-- .../services/notification_handler.py | 99 +++++++---- .../notificationclientsdk/services/ptp.py | 56 +++--- .../controllers/v2/resource_address.py | 29 +++- .../client/ptpeventproducer.py | 19 +- .../common/helpers/constants.py | 6 +- .../common/helpers/os_clock_monitor.py | 10 +- .../common/helpers/ptp_monitor.py | 18 +- .../common/helpers/ptpsync.py | 11 +- .../trackingfunctionsdk/services/daemon.py | 162 ++++++++++++++---- .../tests/test_os_clock_monitor.py | 4 +- 14 files changed, 338 insertions(+), 134 deletions(-) diff --git a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml index e2b1773..9e1bdf1 100644 --- a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml +++ b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml @@ -176,6 +176,8 @@ spec: value: "{{ .Values.ptptrackingv2.log_level }}" - name: CONTROL_TIMEOUT value: "{{ .Values.ptptrackingv2.control_timeout }}" + - name: NOTIFICATION_FORMAT + value: "{{ .Values.ptptrackingv2.notification_format }}" command: ["python3", "/mnt/ptptracking_start_v2.py"] {{- if .Values.ptptrackingv2.endpoint.liveness }} livenessProbe: diff --git a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml index 41eb36a..1b841df 100644 --- a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml +++ b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml @@ -103,6 +103,7 @@ ptptracking: ptptrackingv2: enabled: True imagePullSecrets: default-registry-key + notification_format: "standard" ptp4lSocket: /var/run/ptp4l-ptp4l-legacy ptp4lServiceName: True ptp4lClockClassLockedList: "6,7,135" diff --git a/notificationclient-base/debian/Dockerfile b/notificationclient-base/debian/Dockerfile index d5cf9b5..4917057 100644 --- a/notificationclient-base/debian/Dockerfile +++ b/notificationclient-base/debian/Dockerfile @@ -9,6 +9,7 @@ RUN apt-get -y update \ gcc \ python3-dev \ python3 \ + curl \ && apt-get -y clean \ && rm -rf /var/lib/apt/lists/* RUN pip3 install --user pecan \ diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index 133d27d..205d5aa 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -1,17 +1,16 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import json +import logging import re +from datetime import datetime import requests -import logging -from datetime import datetime -from notificationclientsdk.common.helpers import constants -from notificationclientsdk.common.helpers import log_helper +from notificationclientsdk.common.helpers import constants, log_helper from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) @@ -33,13 +32,36 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): timeout=timeout) response.raise_for_status() else: - # version 2 - for item in notification: - data = format_notification_data(subscriptioninfo, {item: notification[item]}) - data = json.dumps(data) - response = requests.post(url, data=data, headers=headers, - timeout=timeout) - response.raise_for_status() + if isinstance(notification, list): + # List-type notification response format + LOG.debug("Formatting subscription response: list") + # Post notification for each list item + for item in notification: + data = json.dumps(item) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + else: + # Dict type notification response format + LOG.debug("Formatting subscription response: dict") + if notification.get('id', None): + # Not a nested dict, post the data + data = json.dumps(notification) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + else: + for item in notification: + # Nested dict with instance tags, post each item + data = format_notification_data(subscriptioninfo, {item: notification[item]}) + data = json.dumps(data) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + if notification == {}: if hasattr(subscriptioninfo, 'ResourceType'): resource = "{'ResourceType':'" + \ @@ -64,6 +86,7 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): raise errt except requests.exceptions.RequestException as ex: LOG.warning("Failed to notify due to: {0}".format(str(ex))) + LOG.warning(" %s", (notification)) raise ex except requests.exceptions.HTTPError as ex: LOG.warning("Failed to notify due to: {0}".format(str(ex))) @@ -74,8 +97,11 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): return result - def format_notification_data(subscriptioninfo, notification): + if isinstance(notification, list): + return notification + + # Formatting for legacy notification if hasattr(subscriptioninfo, 'ResourceType'): LOG.debug("format_notification_data: Found v1 subscription, " "no formatting required.") @@ -112,7 +138,7 @@ def format_notification_data(subscriptioninfo, notification): float(this_delivery_time)).strftime('%Y-%m-%dT%H:%M:%S%fZ') instance['time'] = format_time else: - raise Exception("format_notification_data: No valid source " + LOG.warning("format_notification_data: No valid source " "address found in notification") LOG.debug("format_notification_data: Added parent key for client " "consumption: %s" % formatted_notification) diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index 412ff50..f6b7e22 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -48,34 +48,47 @@ class NotificationHandler(NotificationHandlerBase): resource_address = None try: self.notification_lock.acquire() + LOG.info("Notification handler notification_info %s", notification_info) subscription_repo = SubscriptionRepo(autocommit=True) - resource_type = notification_info.get('ResourceType', None) - # Get nodename from resource address - if resource_type: - node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) - if not resource_type: - raise Exception("abnormal notification@{0}".format(node_name)) - if not resource_type in self.__supported_resource_types: - raise Exception( - "notification with unsupported resource type:{0}".format(resource_type)) - this_delivery_time = notification_info['EventTimestamp'] - # Get subscriptions from DB to deliver notification to - entries = subscription_repo.get(Status=1, ResourceType=resource_type) - else: - parent_key = list(notification_info.keys())[0] - source = notification_info[parent_key].get('source', None) - values = notification_info[parent_key].get('data', {}).get('values', []) - resource_address = values[0].get('ResourceAddress', None) - this_delivery_time = notification_info[parent_key].get('time') - if not resource_address: - raise Exception("No resource address in notification source".format(source)) - _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) - # Get subscriptions from DB to deliver notification to. - # Unable to filter on resource_address here because resource_address may contain - # either an unknown node name (ie. controller-0) or a '/./' resulting in entries - # being missed. Instead, filter these v2 subscriptions in the for loop below once - # the resource path has been obtained. - entries = subscription_repo.get(Status=1) + if isinstance(notification_info, dict): + resource_type = notification_info.get('ResourceType', None) + # Get nodename from resource address + if resource_type: + node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) + if not resource_type: + raise Exception("abnormal notification@{0}".format(node_name)) + if not resource_type in self.__supported_resource_types: + raise Exception( + "notification with unsupported resource type:{0}".format(resource_type)) + this_delivery_time = notification_info['EventTimestamp'] + # Get subscriptions from DB to deliver notification to + entries = subscription_repo.get(Status=1, ResourceType=resource_type) + else: + parent_key = list(notification_info.keys())[0] + source = notification_info[parent_key].get('source', None) + values = notification_info[parent_key].get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = notification_info[parent_key].get('time') + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) + # Get subscriptions from DB to deliver notification to. + # Unable to filter on resource_address here because resource_address may contain + # either an unknown node name (ie. controller-0) or a '/./' resulting in entries + # being missed. Instead, filter these v2 subscriptions in the for loop below once + # the resource path has been obtained. + entries = subscription_repo.get(Status=1) + elif isinstance(notification_info, list): + LOG.debug("Handle list") + for item in notification_info: + source = item.get('source', None) + values = item.get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = item.get('time') + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) + entries = subscription_repo.get(Status=1) for entry in entries: subscriptionid = entry.SubscriptionId @@ -106,10 +119,11 @@ class NotificationHandler(NotificationHandlerBase): entry.SubscriptionId)) continue - subscription_helper.notify(subscription_dto2, notification_info) - LOG.debug("notification is delivered successfully to {0}".format( + notification_to_send = self.__format_timestamps(notification_info) + LOG.info("Sending notification to subscribers: %s", notification_to_send) + subscription_helper.notify(subscription_dto2, notification_to_send) + LOG.info("notification is delivered successfully to {0}".format( entry.SubscriptionId)) - self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) except Exception as ex: @@ -129,6 +143,28 @@ class NotificationHandler(NotificationHandlerBase): if not subscription_repo: del subscription_repo + def __format_timestamps(self, ptpstatus): + if isinstance(ptpstatus, list): + LOG.debug("Format timestamps for standard subscription response") + for item in ptpstatus: + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, dict): + LOG.debug("Format timestamps for response with instance tags") + try: + for item in ptpstatus: + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except (TypeError, AttributeError): + LOG.debug("Format timestamp for single notification") + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + return ptpstatus + def __get_latest_delivery_timestamp(self, node_name, subscriptionid): last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) @@ -152,6 +188,7 @@ class NotificationHandler(NotificationHandlerBase): else: last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + LOG.debug("last_delivery_time %s this_delivery_time %s" % (last_delivery_time, this_delivery_time)) if (last_delivery_time and last_delivery_time >= this_delivery_time): return last_delivery_stat['EventTimestamp'] = this_delivery_time diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index b5297e5..edf289b 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -1,30 +1,29 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -import oslo_messaging -import logging import json -import kombu -import requests +import logging from datetime import datetime -from notificationclientsdk.client.notificationservice \ - import NotificationServiceClient -from notificationclientsdk.common.helpers import subscription_helper -from notificationclientsdk.common.helpers import log_helper -from notificationclientsdk.common.helpers import constants +import kombu +import oslo_messaging +import requests +from notificationclientsdk.client.notificationservice import \ + NotificationServiceClient +from notificationclientsdk.common.helpers import (constants, log_helper, + subscription_helper) from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from notificationclientsdk.exception import client_exception from notificationclientsdk.model.dto.resourcetype import ResourceType -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 -from notificationclientsdk.model.orm.subscription \ - import Subscription as SubscriptionOrm +from notificationclientsdk.model.dto.subscription import (SubscriptionInfoV1, + SubscriptionInfoV2) +from notificationclientsdk.model.orm.subscription import \ + Subscription as SubscriptionOrm from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo -from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) log_helper.config_logger(LOG) @@ -272,18 +271,33 @@ class PtpService(object): timestamp = ptpstatus[constants.PTP_V1_KEY].get( 'EventTimestamp', None) ptpstatus = ptpstatus[constants.PTP_V1_KEY] - else: + elif isinstance(ptpstatus, list): + LOG.debug("Format timestamps for standard subscription response") for item in ptpstatus: - timestamp = ptpstatus[item].get('time', None) - # Change time from float to ascii format - ptpstatus[item]['time'] = datetime.fromtimestamp( - ptpstatus[item]['time']).strftime( + timestamp = item.get('time', None) + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, dict): + LOG.debug("Format timestamps for response with instance tags") + try: + for item in ptpstatus: + timestamp = ptpstatus[item].get('time', None) + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except (TypeError, AttributeError): + LOG.debug("Format timestamp for single notification") + timestamp = ptpstatus.get('time', None) + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( '%Y-%m-%dT%H:%M:%S%fZ') - nodes[default_node_name] = ptpstatus subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) subscription_orm.InitialDeliveryTimestamp = timestamp + LOG.debug("Setting initial delivery timestamp %s", timestamp) entry = self.subscription_repo.add(subscription_orm) # Delivery the initial notification of ptp status diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py index 1e26426..a74a1e2 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022 Wind River Systems, Inc. +# Copyright (c) 2022-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -36,7 +36,7 @@ class ResourceAddressController(object): self.resource_address) if nodename == constants.WILDCARD_CURRENT_NODE: nodename = notification_control.get_residing_nodename() - LOG.debug('Nodename to query: %s' % nodename) + LOG.info('Nodename to query: %s' % nodename) if not notification_control.in_service_nodenames(nodename): LOG.warning("Node {} is not available".format(nodename)) raise client_exception.NodeNotAvailable(nodename) @@ -46,11 +46,23 @@ class ResourceAddressController(object): ptpservice = PtpService(notification_control) ptpstatus = ptpservice.query(nodename, self.resource_address, optional) - LOG.debug('Got ptpstatus: %s' % ptpstatus) - for item in ptpstatus: - ptpstatus[item]['time'] = datetime.fromtimestamp( - ptpstatus[item]['time']).strftime( - '%Y-%m-%dT%H:%M:%S%fZ') + LOG.info('Received ptpstatus: %s', ptpstatus) + if isinstance(ptpstatus, dict): + try: + for item in ptpstatus: + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except TypeError: + # ptpstatus does not have instance tags + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, list): + for item in ptpstatus: + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') return ptpstatus except client_exception.NodeNotAvailable as ex: LOG.warning("{0}".format(str(ex))) @@ -66,7 +78,8 @@ class ResourceAddressController(object): # raise ex abort(400) except TypeError as ex: - LOG.error("Resource {0} not found on {1}".format(self.resource_address, nodename)) + LOG.error("Resource {0} not found on {1}, error: {2}".format( + self.resource_address, nodename, ex)) abort(404) except HTTPServerError as ex: LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py index d06d396..d0a21d5 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -16,7 +16,7 @@ import logging LOG = logging.getLogger(__name__) -from trackingfunctionsdk.common.helpers import log_helper +from trackingfunctionsdk.common.helpers import log_helper, constants log_helper.config_logger(LOG) @@ -32,7 +32,8 @@ class PtpEventProducer(object): pass def QueryStatus(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs) + # This is where the "GET" commands run through + LOG.info("PtpEventProducer QueryStatus called %s" % rpc_kwargs) if self.handler: return self.handler.query_status(**rpc_kwargs) else: @@ -85,16 +86,16 @@ class PtpEventProducer(object): try: self.local_broker_client.cast( topic, 'NotifyStatus', notification=ptpstatus) - LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic)) + LOG.debug("Published ptp status local:{0}@Topic:{1}".format(ptpstatus, topic)) break except Exception as ex: - LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format( + LOG.warning("Failed to publish ptp status local:{0}@Topic:{1} due to: {2}".format( ptpstatus, topic, str(ex))) retry = retry - 1 isretrystopped = False if retry > 0 else True if isretrystopped: - LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + LOG.error("Failed to publish ptp status local:{0}@Topic:{1}".format( ptpstatus, topic)) return isretrystopped == False @@ -108,16 +109,16 @@ class PtpEventProducer(object): try: self.registration_broker_client.cast( topic_all, 'NotifyStatus', notification=ptpstatus) - LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic_all)) + LOG.debug("Published ptp status all:{0}@Topic:{1}".format(ptpstatus, topic_all)) break except Exception as ex: - LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format( + LOG.warning("Failed to publish ptp status all:{0}@Topic:{1} due to: {2}".format( ptpstatus, topic_all, str(ex))) retry = retry - 1 isretrystopped = False if retry > 0 else True if isretrystopped: - LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + LOG.error("Failed to publish ptp status all:{0}@Topic:{1}".format( ptpstatus, topic_all)) return isretrystopped == False diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index 2434616..fac8711 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -1,10 +1,11 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # from os import path +import os # phc states constants FREERUN_PHC_STATE = "Freerun" @@ -41,6 +42,9 @@ GNSS_DPLL_1 = "DPLL1" UTC_OFFSET = "37" +# Notification formatting +NOTIFICATION_FORMAT = os.environ.get("NOTIFICATION_FORMAT", 'standard') + if path.exists('/ptp/linuxptp/ptpinstance'): LINUXPTP_CONFIG_PATH = '/ptp/linuxptp/ptpinstance/' elif path.exists('/ptp/ptpinstance'): diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py index 632b14a..bd64948 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py @@ -15,6 +15,7 @@ from glob import glob from trackingfunctionsdk.common.helpers import log_helper from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.model.dto.osclockstate import OsClockState +from trackingfunctionsdk.common.helpers import ptpsync as utils LOG = logging.getLogger(__name__) log_helper.config_logger(LOG) @@ -304,12 +305,17 @@ class OsClockMonitor: def set_os_clock_state(self): offset_int = int(self.offset) + _, _, phc2sys, _ = \ + utils.check_critical_resources('', self.phc2sys_instance) if offset_int > self.phc2sys_tolerance_high or \ offset_int < self.phc2sys_tolerance_low: LOG.warning("PHC2SYS offset is outside of tolerance") self._state = OsClockState.Freerun + elif not phc2sys: + LOG.warning("Phc2sys instance %s is not running", self.phc2sys_instance) + self._state = OsClockState.Freerun else: - LOG.info("PHC2SYS offset is within tolerance") + LOG.info("PHC2SYS offset is within tolerance: %s", offset_int) self._state = OsClockState.Locked # Perform an extra check for HA Phc2sys to ensure we have a source interface @@ -341,7 +347,7 @@ class OsClockMonitor: self._state = constants.HOLDOVER_PHC_STATE elif previous_sync_state == constants.HOLDOVER_PHC_STATE and \ time_in_holdover < max_holdover_time: - LOG.debug("OS Clock: Time in holdover is %s " + LOG.info("OS Clock: Time in holdover is %s " "Max time in holdover is %s" % (time_in_holdover, max_holdover_time)) self._state = constants.HOLDOVER_PHC_STATE diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py index 652c2ad..a6e3147 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -83,7 +83,7 @@ class PtpMonitor: def set_ptp_clock_class(self): try: - clock_class = self.pmc_query_results['clockClass'] + clock_class = self.pmc_query_results['gm.ClockClass'] # Reset retry counter upon getting clock class self._clock_class_retry = 3 except KeyError: @@ -132,11 +132,12 @@ class PtpMonitor: # max holdover time is calculated to be in a 'safety' zone max_holdover_time = (self.holdover_time - self.freq * 2) - pmc, ptp4l, phc2sys, ptp4lconf = \ + pmc, ptp4l, _, ptp4lconf = \ utils.check_critical_resources(self.ptp4l_service_name, self.phc2sys_service_name) # run pmc command if preconditions met - if pmc and ptp4l and phc2sys and ptp4lconf: + # Removed check for phc2sys, ptp4l status should not depend on it + if pmc and ptp4l and ptp4lconf: self.pmc_query_results, total_ptp_keywords, port_count = \ self.ptpsync() try: @@ -147,12 +148,11 @@ class PtpMonitor: sync_state = previous_sync_state else: LOG.warning("Missing critical resource: " - "PMC %s PTP4L %s PHC2SYS %s PTP4LCONF %s" - % (pmc, ptp4l, phc2sys, ptp4lconf)) + "PMC %s PTP4L %s PTP4LCONF %s" + % (pmc, ptp4l, ptp4lconf)) sync_state = PtpState.Freerun - # determine if transition into holdover mode (cannot be in holdover if - # system clock is not in sync) - if sync_state == PtpState.Freerun and phc2sys: + # determine if transition into holdover mode + if sync_state == PtpState.Freerun: if previous_sync_state in [constants.UNKNOWN_PHC_STATE, PtpState.Freerun]: sync_state = PtpState.Freerun diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py index ae040a6..646376f 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py @@ -1,6 +1,6 @@ #! /usr/bin/python3 # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -114,7 +114,12 @@ def parse_resource_address(resource_address): return clusterName, nodeName, resource_path -def format_resource_address(node_name, resource): +def format_resource_address(node_name, resource, instance=None): # Return a resource_address - resource_address = '/./' + node_name + resource + resource_address = '/./' + node_name + if instance: + resource_address = resource_address + '/' + instance + resource + else: + resource_address = resource_address + resource + LOG.debug("format_resource_address %s" % resource_address) return resource_address diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index d96f7f1..860fc4b 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -117,7 +117,12 @@ class PtpWatcherDefault: return lastStatus def query_status(self, **rpc_kwargs): + # Client PULL status requests come through here + # Dict is used for legacy notification format lastStatus = {} + # List is used for standard notification format + newStatus = [] + resource_address = rpc_kwargs.get('ResourceAddress', None) optional = rpc_kwargs.get('optional', None) if resource_address: @@ -137,8 +142,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_GNSS_SYNC_STATUS, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_GNSS_SYNC_STATUS), + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, optional), sync_state) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['GNSS_INSTANCES']: sync_state = \ @@ -151,8 +157,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_GNSS_SYNC_STATUS, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_GNSS_SYNC_STATUS), + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, config), sync_state) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.gnsstracker_context_lock.release() @@ -170,8 +177,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_CLOCK_CLASS, last_clock_class_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS), + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, optional), clock_class, constants.VALUE_TYPE_METRIC) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['PTP4L_INSTANCES']: clock_class = \ @@ -185,8 +193,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_CLOCK_CLASS, last_clock_class_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS), + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, config), clock_class, constants.VALUE_TYPE_METRIC) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.ptptracker_context_lock.release() @@ -204,8 +213,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_LOCK_STATE, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_LOCK_STATE), + constants.SOURCE_SYNC_PTP_LOCK_STATE, optional), sync_state) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['PTP4L_INSTANCES']: sync_state = \ @@ -218,8 +228,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_LOCK_STATE, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_LOCK_STATE), + constants.SOURCE_SYNC_PTP_LOCK_STATE, config), sync_state) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.ptptracker_context_lock.release() @@ -239,6 +250,7 @@ class PtpWatcherDefault: utils.format_resource_address(nodename, constants.SOURCE_SYNC_OS_CLOCK), sync_state) + newStatus.append(lastStatus['os_clock_status']) if resource_path == constants.SOURCE_SYNC_SYNC_STATE or \ resource_path == constants.SOURCE_SYNC_ALL: self.watcher.overalltracker_context_lock.acquire() @@ -253,9 +265,23 @@ class PtpWatcherDefault: utils.format_resource_address(nodename, constants.SOURCE_SYNC_SYNC_STATE), sync_state) - LOG.debug("query_status: {}".format(lastStatus)) + if resource_path == constants.SOURCE_SYNC_ALL: + newStatus.append(lastStatus['overall_sync_status']) + else: + # Special handling for overall_sync_status + # There will only ever be a single response from + # SOURCE_SYNC_SYNC_STATE. + # Return a dict rather than a list + newStatus = lastStatus['overall_sync_status'] + + + if constants.NOTIFICATION_FORMAT == 'standard': + LOG.info("PULL status returning: %s", newStatus) + return newStatus + else: + LOG.info("PULL status returning: {}".format(lastStatus)) + return lastStatus - return lastStatus def trigger_delivery(self, **rpc_kwargs): self.watcher.forced_publishing = True @@ -383,6 +409,16 @@ class PtpWatcherDefault: notificationservice_health = HealthServer() notificationservice_health.run() + # Need to give the notificationclient sidecar pods + # a few seconds to re-connect to the newly started + # RabbitMQ. If we don't wait here, the initial + # status delivieries can be sent before the clients + # are connected and they will never receive the + # notification + # This approach can probably be improved by + # checking the RabbitMQ endpoint + time.sleep(10) + while True: # announce the location forced = self.forced_publishing @@ -526,6 +562,7 @@ class PtpWatcherDefault: last_event_time = self.osclocktracker_context.get('last_event_time', time.time()) lastStatus = {} + newStatus = [] new_event, sync_state, new_event_time = self.__get_os_clock_status( holdover_time, freq, sync_state, last_event_time) @@ -559,13 +596,23 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_OS_CLOCK) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + + newStatus.append(lastStatus['os_clock_status']) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_OS_CLOCK) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_OS_CLOCK) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) def __publish_overall_sync_status(self, forced=False): lastStatus = {} + newStatus = [] holdover_time = float(self.overalltracker_context['holdover_seconds']) freq = float(self.overalltracker_context['poll_freq_seconds']) sync_state = self.overalltracker_context.get('sync_state', 'Unknown') @@ -605,14 +652,24 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_SYNC_STATE) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus['overall_sync_status']) + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_SYNC_STATE) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_SYNC_STATE) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) def __publish_gnss_status(self, forced=False): - lastStatus = {} + for gnss in self.observer_list: + # Ensure that status structs are cleared between each iteration + lastStatus = {} + newStatus = [] holdover_time = float( self.gnsstracker_context[ gnss.ts2phc_service_name]['holdover_seconds']) @@ -643,7 +700,9 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.node_name, + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, + gnss.ts2phc_service_name) lastStatus[gnss.ts2phc_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, @@ -663,15 +722,27 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus[gnss.ts2phc_service_name]) + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) def __publish_ptpstatus(self, forced=False): - lastStatus = {} - lastClockClassStatus = {} + for ptp_monitor in self.ptp_monitor_list: + # Ensure that status structs are cleared between each iteration + newStatus = [] + newClockClassStatus = [] + lastStatus = {} + lastClockClassStatus = {} + holdover_time = float(self.ptptracker_context[ ptp_monitor.ptp4l_service_name]['holdover_seconds']) freq = float(self.ptptracker_context[ @@ -717,7 +788,9 @@ class PtpWatcherDefault: lastStatus = {} # publish new event in API version v2 format resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.node_name, + constants.SOURCE_SYNC_PTP_LOCK_STATE, + ptp_monitor.ptp4l_service_name) lastStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, @@ -737,10 +810,18 @@ class PtpWatcherDefault: } } self.ptptracker_context_lock.release() - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus[ptp_monitor.ptp4l_service_name]) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) if new_clock_class_event or forced: # update context @@ -752,7 +833,9 @@ class PtpWatcherDefault: = clock_class_event_time resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.node_name, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, + ptp_monitor.ptp4l_service_name) lastClockClassStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), @@ -772,14 +855,23 @@ class PtpWatcherDefault: ] } } + newClockClassStatus.append(lastClockClassStatus[ptp_monitor.ptp4l_service_name]) self.ptptracker_context_lock.release() LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name, clock_class)) - self.ptpeventproducer.publish_status( - lastClockClassStatus, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS) - self.ptpeventproducer.publish_status(lastClockClassStatus, - constants.SOURCE_SYNC_ALL) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newClockClassStatus, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.ptpeventproducer.publish_status(newClockClassStatus, + constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastClockClassStatus, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.ptpeventproducer.publish_status(lastClockClassStatus, + constants.SOURCE_SYNC_ALL) class DaemonControl(object): diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py index 508be75..719912b 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py @@ -85,7 +85,9 @@ class OsClockMonitorTests(unittest.TestCase): self.clockmon.get_os_clock_offset() assert self.clockmon.offset == '37000000015' - def test_set_os_closck_state(self): + @mock.patch('trackingfunctionsdk.common.helpers.ptpsync.check_critical_resources', + side_effect=[b'True']) + def test_set_os_clock_state(self, critical_patched): self.clockmon = OsClockMonitor(phc2sys_config=phc2sys_test_config, init=False) self.clockmon.offset = '37000000015' self.clockmon.set_os_clock_state()