37c15ea4fb
Updates the locationservice, notificationservice and notificationclient containers to support ipv6 httpGet liveness probes. notificationservice-base and notificationservice-basev2: - Adds health.py which starts a simple http server that runs within the daemon. The k8s httpGet liveness probe can query this endpoint to verify that the service is running - Update the daemonset template and values to provide the required info for initializing the new endpoint locationservice-base: - Remove unused portions of the locationservice_start.sh config map. The location-query-server.py and location-announce.py were never active and are not required - Add locationservice_start.py in order to start the locationservice pecan WSGI application with either an ipv4 or ipv6 socket - Use existing pecan endpoint to respond to liveness probes notificationclient-base: - Add notificationclient_start.py to start the notificationclient pecan WSGI application with either an ipv4 or ipv6 socket - Use existing pecan endpoint to respond to liveness probes Daemonset: - Add required ip and port environment variables to support liveness probes on each container - Add a conditional section for enabling liveness probes. Disabled by default but can be enabled via helm overrides by setting "liveness: True" Misc: - Re-organized python imports in affected files - Incremented helm chart version to 2.0.1 Test-plan: Pass: Verify application build and install Pass: Verify containers build correctly Pass: Deploy ptp-notification and verify basic sanity (v1 and v2 get, subscribe, delete, list) Pass: Enable httpGet liveness probes for each container and verify operation Pass: Verify application removal Story: 2011090 Task: 49851 Signed-off-by: Cole Walker <cole.walker@windriver.com> Change-Id: I4671c7f8c67c4869a6d5e3b384eae66d8c57a284
144 lines
4.6 KiB
Python
144 lines
4.6 KiB
Python
#
|
|
# Copyright (c) 2024 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
import json
|
|
import logging
|
|
import multiprocessing as mp
|
|
import os
|
|
import time
|
|
|
|
import oslo_messaging
|
|
from locationservicesdk.client.locationproducer import LocationProducer
|
|
from locationservicesdk.common.helpers import log_helper, rpc_helper
|
|
from locationservicesdk.model.dto.resourcetype import ResourceType
|
|
from locationservicesdk.model.dto.rpc_endpoint import RpcEndpointInfo
|
|
from oslo_config import cfg
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
|
|
|
|
# Hand the logger to the project's log_helper for configuration.
# NOTE(review): presumably this sets handlers/level; confirm in
# locationservicesdk.common.helpers.log_helper.
log_helper.config_logger(LOG)
|
|
|
|
'''Entry point of Default Process Worker'''
|
|
|
|
|
|
def ProcessWorkerDefault(event, sqlalchemy_conf_json,
                         registration_endpoint, location_info_json):
    """Default worker entry point: build a location watcher and run it.

    All four arguments are forwarded unchanged to
    ``LocationWatcherDefault``; the call returns only when the watcher's
    ``run()`` returns.

    :param event: control event handed to the watcher.
    :param sqlalchemy_conf_json: JSON text forwarded to the watcher.
    :param registration_endpoint: transport endpoint forwarded to the
        watcher.
    :param location_info_json: JSON text forwarded to the watcher.
    """
    LocationWatcherDefault(
        event,
        sqlalchemy_conf_json,
        registration_endpoint,
        location_info_json,
    ).run()
|
|
|
|
|
|
class LocationWatcherDefault:
    """Announce this node's location and answer location queries.

    ``run()`` starts a listener through ``LocationProducer`` and then
    announces the location in a loop. Each iteration waits on the control
    event; when the wait times out the timeout is doubled, but never beyond
    ``MAX_EVENT_TIMEOUT``.
    """

    # Upper bound, in seconds, for the wait between announcements (1 hour).
    MAX_EVENT_TIMEOUT = 3600.0

    class LocationRequestHandlerDefault(object):
        """Handler given to the location listener; wakes the watcher."""

        def __init__(self, watcher):
            """Remember the watcher whose event should be signalled."""
            self.watcher = watcher

        def handle(self, **rpc_kwargs):
            """Signal the watcher; the keyword arguments are not used."""
            self.watcher.signal_location_event()

    def __init__(self, event, sqlalchemy_conf_json,
                 registration_transport_endpoint, location_info_json):
        """Decode the JSON inputs and create the location producer.

        :param event: control event; ``run()`` waits on it and
            ``signal_location_event()`` sets it.
        :param sqlalchemy_conf_json: JSON text, decoded into
            ``self.sqlalchemy_conf``.
        :param registration_transport_endpoint: value wrapped in
            ``RpcEndpointInfo``.
        :param location_info_json: JSON text, decoded into
            ``self.location_info``; it must contain a ``'NodeName'`` key.
        :raises KeyError: if the location info has no ``'NodeName'``.
        """
        self.sqlalchemy_conf = json.loads(sqlalchemy_conf_json)
        self.event = event
        # Initial wait between announcements, in seconds.
        self.event_timeout = 2.0
        self.event_iteration = 0
        self.location_info = json.loads(location_info_json)
        this_node_name = self.location_info['NodeName']

        self.registration_endpoint = RpcEndpointInfo(
            registration_transport_endpoint)
        self.LocationProducer = LocationProducer(
            this_node_name,
            self.registration_endpoint.TransportEndpoint)

    def signal_location_event(self):
        """Set the control event, or log a warning when there is none."""
        if self.event:
            self.event.set()
        else:
            LOG.warning("Unable to assert location event")

    def run(self):
        """Start the listener and announce the location until interrupted.

        The loop has no normal exit. The listener is stopped in a
        ``finally`` clause so that it is released when the loop is left
        through an exception.
        """
        # start location listener
        self.__start_listener()
        try:
            while True:
                # announce the location
                self.__announce_location()
                if self.event.wait(self.event_timeout):
                    LOG.debug("daemon control event is asserted")
                    self.event.clear()
                else:
                    # Nothing woke us up: wait longer next time.
                    self._backoff_timeout()
                    LOG.debug("daemon control event is timeout: %s",
                              self.event_timeout)
        finally:
            self.__stop_listener()

    def _backoff_timeout(self):
        """Double ``event_timeout`` without exceeding the 1 hour maximum."""
        self.event_timeout = min(
            self.event_timeout * 2, self.MAX_EVENT_TIMEOUT)

    def __start_listener(self):
        """Start listener to answer querying from clients."""
        LOG.debug("start listener to answer location querying")

        self.LocationProducer.start_location_listener(
            self.location_info,
            LocationWatcherDefault.LocationRequestHandlerDefault(self)
        )

    def __stop_listener(self):
        """Stop the listener started by ``__start_listener``."""
        LOG.debug("stop listener to answer location querying")

        self.LocationProducer.stop_location_listener(self.location_info)

    def __announce_location(self):
        """Announce the location info through the location producer."""
        LOG.debug("announce location info to clients")
        self.LocationProducer.announce_location(self.location_info)
|
|
|
|
|
|
class DaemonControl(object):
    """Hold the control event and the worker that handles location info.

    In daemon mode the worker is started in a separate process when the
    object is constructed. Otherwise no process is created and the worker
    is called inline from ``refresh()``.
    """

    def __init__(
            self, sqlalchemy_conf_json, registration_transport_endpoint,
            location_info, process_worker=None, daemon_mode=True):
        """Store the configuration and, in daemon mode, start the worker.

        :param sqlalchemy_conf_json: passed through to the worker.
        :param registration_transport_endpoint: value wrapped in
            ``RpcEndpointInfo``; its ``TransportEndpoint`` is passed to the
            worker.
        :param location_info: passed through to the worker.
        :param process_worker: callable taking ``(event,
            sqlalchemy_conf_json, transport_endpoint, location_info)``;
            ``ProcessWorkerDefault`` is used when this is falsy.
        :param daemon_mode: when true, launch the worker in its own
            process immediately.
        """
        self.daemon_mode = daemon_mode
        self.event = mp.Event()

        endpoint = RpcEndpointInfo(registration_transport_endpoint)
        self.registration_endpoint = endpoint
        self.registration_transport = rpc_helper.get_transport(endpoint)

        self.location_info = location_info
        self.sqlalchemy_conf_json = sqlalchemy_conf_json

        # Fall back to the default worker entry point when none is given.
        self.process_worker = process_worker or ProcessWorkerDefault

        if self.daemon_mode:
            worker_args = (
                self.event,
                self.sqlalchemy_conf_json,
                endpoint.TransportEndpoint,
                self.location_info,
            )
            self.mpinstance = mp.Process(
                target=self.process_worker, args=worker_args)
            self.mpinstance.start()

    def refresh(self):
        """Set the control event, running the worker inline first if needed.

        Outside daemon mode there is no worker process, so the worker
        callable is invoked directly before the event is set.
        """
        if not self.daemon_mode:
            self.process_worker(
                self.event,
                self.sqlalchemy_conf_json,
                self.registration_endpoint.TransportEndpoint,
                self.location_info,
            )

        self.event.set()
|