diff --git a/centos_stable_docker_images.inc b/centos_stable_docker_images.inc index e69de29..ff7bc4f 100644 --- a/centos_stable_docker_images.inc +++ b/centos_stable_docker_images.inc @@ -0,0 +1,3 @@ +notificationservice-base +locationservice-base +notificationclient-base diff --git a/locationservice-base/centos/docker/Dockerfile b/locationservice-base/centos/docker/Dockerfile new file mode 100644 index 0000000..a91644a --- /dev/null +++ b/locationservice-base/centos/docker/Dockerfile @@ -0,0 +1,22 @@ +ARG BASE +FROM ${BASE} + +ARG STX_REPO_FILE=/etc/yum.repos.d/stx.repo + +ENV KUBE_LATEST_VERSION="v1.18.3" + +RUN set -ex ;\ + yum install --disablerepo=* \ + $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ + -y \ + gcc python3-devel python3-pip \ + && pip3 install --user pecan \ + && pip3 install oslo-config \ + && pip3 install oslo-messaging \ + && pip3 install WSME + +WORKDIR /opt/ +COPY ./locationservice /opt/locationservice +RUN cd /opt/locationservice && python3 setup.py develop + +CMD ["bash"] diff --git a/locationservice-base/centos/docker/locationservice/apiserver/__init__.py b/locationservice-base/centos/docker/locationservice/apiserver/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/apiserver/app.py b/locationservice-base/centos/docker/locationservice/apiserver/app.py new file mode 100644 index 0000000..ae7479a --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/app.py @@ -0,0 +1,15 @@ +from pecan import make_app +from apiserver.repository.notification_control import notification_control + +from pecan import conf + +def setup_app(config): + + notification_control.refresh() + app_conf = dict(config.app) + + return make_app( + app_conf.pop('root'), + logging=getattr(config, 'logging', {}), + **app_conf + ) diff --git a/locationservice-base/centos/docker/locationservice/apiserver/controllers/__init__.py b/locationservice-base/centos/docker/locationservice/apiserver/controllers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/apiserver/controllers/root.py b/locationservice-base/centos/docker/locationservice/apiserver/controllers/root.py new file mode 100644 index 0000000..21c947d --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/controllers/root.py @@ -0,0 +1,20 @@ +#coding=utf-8 + +from pecan import expose, redirect, rest, route, response +from webob.exc import status_map + +from wsme import types as wtypes +from wsmeext.pecan import wsexpose + +class HealthController(rest.RestController): + + @wsexpose(wtypes.text) + def get(self): + return {'health': True} + + +class RootController(object): + pass + + +route(RootController, 'health', HealthController()) diff --git a/locationservice-base/centos/docker/locationservice/apiserver/model/__init__.py b/locationservice-base/centos/docker/locationservice/apiserver/model/__init__.py new file mode 100644 index 0000000..71f7c51 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/model/__init__.py @@ -0,0 +1,17 @@ +from pecan import conf # noqa + +def init_model(): + """ + This is a stub method which is called at application startup time. + + If you need to bind to a parsed database configuration, set up tables or + ORM classes, or perform any database initialization, this is the + recommended place to do it. + + For more information working with databases, and some common recipes, + see https://pecan.readthedocs.io/en/latest/databases.html + """ + + pass + + diff --git a/locationservice-base/centos/docker/locationservice/apiserver/repository/__init__.py b/locationservice-base/centos/docker/locationservice/apiserver/repository/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/apiserver/repository/notification_control.py b/locationservice-base/centos/docker/locationservice/apiserver/repository/notification_control.py new file mode 100644 index 0000000..c4a56a9 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/repository/notification_control.py @@ -0,0 +1,26 @@ +import os +import time +import json +from pecan import conf +from locationservicesdk.services.daemon import DaemonControl + +REGISTRATION_USER = os.environ.get("REGISTRATION_USER", "admin") +REGISTRATION_PASS = os.environ.get("REGISTRATION_PASS", "admin") +REGISTRATION_PORT = os.environ.get("REGISTRATION_PORT", "5672") +REGISTRATION_HOST = os.environ.get("REGISTRATION_HOST",'registration.notification.svc.cluster.local') +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') +THIS_POD_IP = os.environ.get("THIS_POD_IP",'127.0.0.1') + +REGISTRATION_TRANSPORT_ENDPOINT = 'rabbit://{0}:{1}@{2}:{3}'.format( + REGISTRATION_USER, REGISTRATION_PASS, REGISTRATION_HOST, REGISTRATION_PORT) + +sqlalchemy_conf_json=json.dumps({}) +LocationInfo = { + 'NodeName': THIS_NODE_NAME, + 'PodIP': THIS_POD_IP, + 'ResourceTypes': ['PTP'], + 'Timestamp': time.time() + } +location_info_json = json.dumps(LocationInfo) +notification_control = DaemonControl( + sqlalchemy_conf_json, REGISTRATION_TRANSPORT_ENDPOINT, location_info_json) diff --git a/locationservice-base/centos/docker/locationservice/apiserver/templates/layout.html b/locationservice-base/centos/docker/locationservice/apiserver/templates/layout.html new file mode 100644 index 0000000..2bdfe88 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/templates/layout.html @@ -0,0 +1,22 @@ + + + ${self.title()} + ${self.style()} + ${self.javascript()} + + + ${self.body()} + + + +<%def name="title()"> + Default Title + + +<%def name="style()"> + + + +<%def name="javascript()"> + + diff --git a/locationservice-base/centos/docker/locationservice/apiserver/tests/__init__.py b/locationservice-base/centos/docker/locationservice/apiserver/tests/__init__.py new file mode 100644 index 0000000..78ea527 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/tests/__init__.py @@ -0,0 +1,22 @@ +import os +from unittest import TestCase +from pecan import set_config +from pecan.testing import load_test_app + +__all__ = ['FunctionalTest'] + + +class FunctionalTest(TestCase): + """ + Used for functional tests where you need to test your + literal application and its integration with the framework. + """ + + def setUp(self): + self.app = load_test_app(os.path.join( + os.path.dirname(__file__), + 'config.py' + )) + + def tearDown(self): + set_config({}, overwrite=True) diff --git a/locationservice-base/centos/docker/locationservice/apiserver/tests/config.py b/locationservice-base/centos/docker/locationservice/apiserver/tests/config.py new file mode 100644 index 0000000..8f66b60 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/tests/config.py @@ -0,0 +1,25 @@ +# Server Specific Configurations +server = { + 'port': '8080', + 'host': '0.0.0.0' +} + +# Pecan Application Configurations +app = { + 'root': 'notificationclient.controllers.root.RootController', + 'modules': ['notificationclient'], + 'static_root': '%(confdir)s/../../public', + 'template_path': '%(confdir)s/../templates', + 'debug': True, + 'errors': { + '404': '/error/404', + '__force_dict__': True + } +} + +# Custom Configurations must be in Python dictionary format:: +# +# foo = {'bar':'baz'} +# +# All configurations are accessible at:: +# pecan.conf diff --git a/locationservice-base/centos/docker/locationservice/apiserver/tests/test_functional.py b/locationservice-base/centos/docker/locationservice/apiserver/tests/test_functional.py new file mode 100644 index 0000000..252c3aa --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/tests/test_functional.py @@ -0,0 +1,22 @@ +from unittest import TestCase +from webtest import TestApp +from notificationclient.tests import FunctionalTest + + +class TestRootController(FunctionalTest): + + def test_get(self): + response = self.app.get('/') + assert response.status_int == 200 + + def test_search(self): + response = self.app.post('/', params={'q': 'RestController'}) + assert response.status_int == 302 + assert response.headers['Location'] == ( + 'https://pecan.readthedocs.io/en/latest/search.html' + '?q=RestController' + ) + + def test_get_not_found(self): + response = self.app.get('/a/bogus/url', expect_errors=True) + assert response.status_int == 404 diff --git a/locationservice-base/centos/docker/locationservice/apiserver/tests/test_units.py b/locationservice-base/centos/docker/locationservice/apiserver/tests/test_units.py new file mode 100644 index 0000000..573fb68 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/apiserver/tests/test_units.py @@ -0,0 +1,7 @@ +from unittest import TestCase + + +class TestUnits(TestCase): + + def test_units(self): + assert 5 * 5 == 25 diff --git a/locationservice-base/centos/docker/locationservice/config.py b/locationservice-base/centos/docker/locationservice/config.py new file mode 100644 index 0000000..c0a9c3e --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/config.py @@ -0,0 +1,54 @@ +# Server Specific Configurations +server = { + 'port': '8080', + 'host': '0.0.0.0' +} + +# Pecan Application Configurations +app = { + 'root': 'apiserver.controllers.root.RootController', + 'modules': ['apiserver'], + 'static_root': '%(confdir)s/public', + 'template_path': '%(confdir)s/apiserver/templates', + 'debug': True, + 'errors': { + 404: '/error/404', + '__force_dict__': True + } +} + +logging = { + 'root': {'level': 'INFO', 'handlers': ['console']}, + 'loggers': { + 'apiserver': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, + 'pecan': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, + 'py.warnings': {'handlers': ['console']}, + '__force_dict__': True + }, + 'handlers': { + 'console': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'formatter': 'color' + } + }, + 'formatters': { + 'simple': { + 'format': ('%(asctime)s %(levelname)-5.5s [%(name)s]' + '[%(threadName)s] %(message)s') + }, + 'color': { + '()': 'pecan.log.ColorFormatter', + 'format': ('%(asctime)s [%(padded_color_levelname)s] [%(name)s]' + '[%(threadName)s] %(message)s'), + '__force_dict__': True + } + } +} + +# Custom Configurations must be in Python dictionary format:: +# +# foo = {'bar':'baz'} +# +# All configurations are accessible at:: +# pecan.conf diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/PKG-INFO b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/PKG-INFO new file mode 100644 index 0000000..464fa9c --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/PKG-INFO @@ -0,0 +1,11 @@ +Metadata-Version: 1.0 +Name: locationservice +Version: 0.1 +Summary: locationservice offers a container image for notificationservice +to manage location information +Home-page: UNKNOWN +Author: Bin Yang +Author-email: bin.yang@windriver.com +License: Apache License 2.0 +Description: UNKNOWN +Platform: UNKNOWN diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/SOURCES.txt b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/SOURCES.txt new file mode 100644 index 0000000..6e37901 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/SOURCES.txt @@ -0,0 +1,20 @@ +MANIFEST.in +setup.cfg +setup.py +apiserver/__init__.py +apiserver/app.py +apiserver.egg-info/PKG-INFO +apiserver.egg-info/SOURCES.txt +apiserver.egg-info/dependency_links.txt +apiserver.egg-info/not-zip-safe +apiserver.egg-info/requires.txt +apiserver.egg-info/top_level.txt +apiserver/controllers/__init__.py +apiserver/controllers/root.py +apiserver/model/__init__.py +apiserver/tests/__init__.py +apiserver/tests/config.py +apiserver/tests/test_functional.py +apiserver/tests/test_units.py +public/css/style.css +public/images/logo.png \ No newline at end of file diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/dependency_links.txt b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/not-zip-safe b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/not-zip-safe new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/not-zip-safe @@ -0,0 +1 @@ + diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/requires.txt b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/requires.txt new file mode 100644 index 0000000..ea2fd7b --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/requires.txt @@ -0,0 +1 @@ +pecan \ No newline at end of file diff --git a/locationservice-base/centos/docker/locationservice/locationservice.egg-info/top_level.txt b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/top_level.txt new file mode 100644 index 0000000..e6f379c --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservice.egg-info/top_level.txt @@ -0,0 +1 @@ +locationservice diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/__init__.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/client/base.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/client/base.py new file mode 100644 index 0000000..e3c8a47 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/client/base.py @@ -0,0 +1,107 @@ +import os +import json +import time +import oslo_messaging +from oslo_config import cfg +from locationservicesdk.common.helpers import rpc_helper +from locationservicesdk.model.dto.rpc_endpoint import RpcEndpointInfo + +import logging + +LOG = logging.getLogger(__name__) + +from locationservicesdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class BrokerClientBase(object): + def __init__(self, broker_name, broker_transport_endpoint): + self.broker_name = broker_name + self.listeners = {} + self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint) + self.transport = rpc_helper.get_transport(self.broker_endpoint) + LOG.debug("Created Broker client:{0}".format(broker_name)) + + def __del__(self): + self.transport.cleanup() + del self.transport + return + + def __create_listener(self, context): + target = oslo_messaging.Target( + topic=context['topic'], + server=context['server']) + endpoints = context['endpoints'] + server = oslo_messaging.get_rpc_server( + self.transport, target, endpoints, executor=None) + return server + + def _refresh(self): + for topic, servers in self.listeners.items(): + for servername, context in servers.items(): + try: + rpcserver = context.get('rpcserver', None) + isactive = context.get('active', False) + if isactive and not rpcserver: + rpcserver = self.__create_listener(context) + rpcserver.start() + context['rpcserver'] = rpcserver + LOG.debug("Started rpcserver@{0}@{1}".format(context['topic'], context['server'])) + elif not isactive and rpcserver: + rpcserver.stop() + rpcserver.wait() + context.pop('rpcserver') + LOG.debug("Stopped rpcserver@{0}@{1}".format(context['topic'], context['server'])) + except: + LOG.error("Failed to update listener for topic/server:{0}/{1}" + .format(topic, servername)) + continue + + def add_listener(self, topic, server, listener_endpoints=None): + context = self.listeners.get(topic,{}).get(server, {}) + if not context: + context = { + 'endpoints': listener_endpoints, + 'topic': topic, + 'server': server, + 'active': True + } + if not self.listeners.get(topic, None): + self.listeners[topic] = {} + self.listeners[topic][server] = context + else: + context['endpoints'] = listener_endpoints + context['active'] = True + + self._refresh() + + def remove_listener(self, topic, server): + context = self.listeners.get(topic,{}).get(server, {}) + if context: + context['active'] = False + self._refresh() + + def is_listening(self, topic, server): + context = self.listeners.get(topic,{}).get(server, {}) + return context.get('active', False) + + def any_listener(self): + for topic, servers in self.listeners.items(): + for servername, context in servers.items(): + isactive = context.get('active', False) + if isactive: + return True + return False + + def call(self, topic, server, api_name, **api_kwargs): + target = oslo_messaging.Target( + topic=topic, server=server, version=self.broker_endpoint.Version, + namespace=self.broker_endpoint.Namespace) + queryclient = oslo_messaging.RPCClient(self.transport, target, timeout = 2, retry = 0) + return queryclient.call({}, api_name, **api_kwargs) + + def cast(self, topic, api_name, **api_kwargs): + target = oslo_messaging.Target( + topic=topic, fanout=True, version=self.broker_endpoint.Version, + namespace=self.broker_endpoint.Namespace) + queryclient = oslo_messaging.RPCClient(self.transport, target) + queryclient.cast({}, api_name, **api_kwargs) diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/client/locationproducer.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/client/locationproducer.py new file mode 100644 index 0000000..bdf614d --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/client/locationproducer.py @@ -0,0 +1,84 @@ +import os +import json +import time +import oslo_messaging +from oslo_config import cfg + +from locationservicesdk.client.base import BrokerClientBase + +import logging + +LOG = logging.getLogger(__name__) + +from locationservicesdk.common.helpers import log_helper +log_helper.config_logger(LOG) + + +class LocationProducer(BrokerClientBase): + class ListenerEndpoint(object): + target = oslo_messaging.Target(namespace='notification', version='1.0') + + def __init__(self, location_info, handler=None): + self.location_info = location_info + self.handler = handler + pass + + def QueryLocation(self, ctx, **rpc_kwargs): + LOG.debug ("LocationProducer QueryLocation called %s" %rpc_kwargs) + return self.location_info + + def TriggerAnnouncement(self, ctx, **rpc_kwargs): + LOG.debug ("LocationProducer TriggerAnnouncement called %s" %rpc_kwargs) + if self.handler: + return self.handler.handle(**rpc_kwargs) + else: + return False + + def __init__(self, node_name, registrationservice_transport_endpoint): + self.Id = id(self) + self.node_name = node_name + super(LocationProducer, self).__init__( + 'locationproducer', registrationservice_transport_endpoint) + return + + def __del__(self): + super(LocationProducer, self).__del__() + return + + def announce_location(self, LocationInfo): + location_topic_all='LocationListener-*' + location_topic='LocationListener-{0}'.format(self.node_name) + server = None + while True: + try: + self.cast(location_topic_all, 'NotifyLocation', location_info=LocationInfo) + LOG.debug("Broadcast location info:{0}@Topic:{1}".format(LocationInfo, location_topic)) + except Exception as ex: + LOG.debug("Failed to publish location due to: {0}".format(str(ex))) + continue + else: + break + + def start_location_listener(self, location_info, handler=None): + + topic='LocationQuery' + server="LocationService-{0}".format(self.node_name) + endpoints = [LocationProducer.ListenerEndpoint(location_info, handler)] + + super(LocationProducer, self).add_listener( + topic, server, endpoints) + return True + + def stop_location_listener(self): + topic='LocationQuery' + server="LocationService-{0}".format(self.node_name) + super(LocationProducer, self).remove_listener( + topic, server) + + def is_listening(self): + topic='LocationQuery' + server="LocationService-{0}".format(self.node_name) + return super(LocationProducer, self).is_listening( + topic, server) + + diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/log_helper.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/log_helper.py new file mode 100644 index 0000000..d1a16e7 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/log_helper.py @@ -0,0 +1,12 @@ +import logging + +def get_logger(module_name): + logger = logging.getLogger(module_name) + return config_logger(logger) + +def config_logger(logger): + ''' + configure the logger: uncomment following lines for debugging + ''' + # logger.setLevel(level=logging.DEBUG) + return logger diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/rpc_helper.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/rpc_helper.py new file mode 100644 index 0000000..ed98405 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/common/helpers/rpc_helper.py @@ -0,0 +1,22 @@ +#coding=utf-8 + +import os +import json + +import oslo_messaging +from oslo_config import cfg + + +def setup_client(rpc_endpoint_info, topic, server): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + transport = oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) + target = oslo_messaging.Target(topic=topic, + version=rpc_endpoint_info.Version, + server=server, + namespace=rpc_endpoint_info.Namespace) + client = oslo_messaging.RPCClient(transport, target) + return client + +def get_transport(rpc_endpoint_info): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + return oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/model/__init__.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/__init__.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/location.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/location.py new file mode 100644 index 0000000..13ca17d --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/location.py @@ -0,0 +1,10 @@ +#coding=utf-8 + +from wsme import types as wtypes +from locationservicesdk.model.dto.resourcetype import EnumResourceType + +class LocationInfo(wtypes.Base): + NodeName = wtypes.text + PodIP = wtypes.text + Timestamp = float + ResourceTypes = [EnumResourceType] diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/resourcetype.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/resourcetype.py new file mode 100644 index 0000000..a04f96b --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/resourcetype.py @@ -0,0 +1,9 @@ +#coding=utf-8 + +from wsme import types as wtypes + +EnumResourceType = wtypes.Enum(str, 'PTP', 'FPGA') + +class ResourceType(object): + TypePTP = "PTP" + TypeFPGA = 'FPGA' diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/rpc_endpoint.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/rpc_endpoint.py new file mode 100644 index 0000000..150f57d --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/model/dto/rpc_endpoint.py @@ -0,0 +1,34 @@ +#coding=utf-8 + +from wsme import types as wtypes + +RPC_ENDPOINT_BASE = { + 'Version': '1.0', + 'Namespace': 'notification', + 'Exchange': 'notification_exchange', + 'TransportEndpoint': '', + 'Topic': '', + 'Server': '' +} + +class RpcEndpointInfo(wtypes.Base): + TransportEndpoint = wtypes.text + Exchange = wtypes.text + Topic = wtypes.text + Server = wtypes.text + Version = wtypes.text + Namespace = wtypes.text + + def __init__(self, transport_endpoint): + self.endpoint_json = { + 'Version': RPC_ENDPOINT_BASE['Version'], + 'Namespace': RPC_ENDPOINT_BASE['Namespace'], + 'Exchange': RPC_ENDPOINT_BASE['Exchange'], + 'TransportEndpoint': transport_endpoint, + 'Topic': RPC_ENDPOINT_BASE['Topic'], + 'Server': RPC_ENDPOINT_BASE['Server'] + } + super(RpcEndpointInfo, self).__init__(**self.endpoint_json) + + def to_dict(self): + return self.endpoint_json diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/services/__init__.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/locationservice-base/centos/docker/locationservice/locationservicesdk/services/daemon.py b/locationservice-base/centos/docker/locationservice/locationservicesdk/services/daemon.py new file mode 100644 index 0000000..02455bd --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/locationservicesdk/services/daemon.py @@ -0,0 +1,131 @@ + +import os +import json +import time +import oslo_messaging +from oslo_config import cfg +import logging + +import multiprocessing as mp + +from locationservicesdk.common.helpers import rpc_helper +from locationservicesdk.model.dto.rpc_endpoint import RpcEndpointInfo +from locationservicesdk.model.dto.resourcetype import ResourceType + +from locationservicesdk.client.locationproducer import LocationProducer + +LOG = logging.getLogger(__name__) + +from locationservicesdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +'''Entry point of Default Process Worker''' +def ProcessWorkerDefault(event, sqlalchemy_conf_json, registration_endpoint, location_info_json): + worker = LocationWatcherDefault(event, sqlalchemy_conf_json, registration_endpoint, location_info_json) + worker.run() + return + + +class LocationWatcherDefault: + class LocationRequestHandlerDefault(object): + def __init__(self, watcher): + self.watcher = watcher + + def handle(self, **rpc_kwargs): + self.watcher.signal_location_event() + + def __init__(self, event, sqlalchemy_conf_json, registration_transport_endpoint, location_info_json): + self.sqlalchemy_conf = json.loads(sqlalchemy_conf_json) + self.event = event + self.event_timeout = float(2.0) + self.event_iteration = 0 + self.location_info = json.loads(location_info_json) + this_node_name = self.location_info['NodeName'] + + self.registration_endpoint = RpcEndpointInfo(registration_transport_endpoint) + self.LocationProducer = LocationProducer( + this_node_name, + self.registration_endpoint.TransportEndpoint) + + def signal_location_event(self): + if self.event: + self.event.set() + else: + LOG.warning("Unable to assert location event") + pass + + def run(self): + # start location listener + self.__start_listener() + while True: + # annouce the location + self.__announce_location() + if self.event.wait(self.event_timeout): + LOG.debug("daemon control event is asserted") + self.event.clear() + else: + # max timeout: 1 hour + if self.event_timeout < float(3600): + self.event_timeout = self.event_timeout + self.event_timeout + LOG.debug("daemon control event is timeout") + continue + self.__stop_listener() + + '''Start listener to answer querying from clients''' + def __start_listener(self): + LOG.debug("start listener to answer location querying") + + self.LocationProducer.start_location_listener( + self.location_info, + LocationWatcherDefault.LocationRequestHandlerDefault(self) + ) + return + + def __stop_listener(self): + LOG.debug("stop listener to answer location querying") + + self.LocationProducer.stop_location_listener(self.location_info) + return + + '''announce location''' + def __announce_location(self): + LOG.debug("announce location info to clients") + self.LocationProducer.announce_location(self.location_info) + return + +class DaemonControl(object): + + def __init__( + self, sqlalchemy_conf_json, registration_transport_endpoint, + location_info, process_worker = None, daemon_mode=True): + + self.daemon_mode = daemon_mode + self.event = mp.Event() + self.registration_endpoint = RpcEndpointInfo(registration_transport_endpoint) + self.registration_transport = rpc_helper.get_transport(self.registration_endpoint) + self.location_info = location_info + self.sqlalchemy_conf_json = sqlalchemy_conf_json + + if not process_worker: + process_worker = ProcessWorkerDefault + self.process_worker = process_worker + + if not self.daemon_mode: + return + + self.mpinstance = mp.Process( + target=process_worker, + args=(self.event, self.sqlalchemy_conf_json, + self.registration_endpoint.TransportEndpoint, + self.location_info)) + self.mpinstance.start() + + pass + + def refresh(self): + if not self.daemon_mode: + self.process_worker( + self.event, self.sqlalchemy_conf_json, + self.registration_endpoint.TransportEndpoint, self.location_info) + + self.event.set() diff --git a/locationservice-base/centos/docker/locationservice/public/css/style.css b/locationservice-base/centos/docker/locationservice/public/css/style.css new file mode 100644 index 0000000..55c9db5 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/public/css/style.css @@ -0,0 +1,43 @@ +body { + background: #311F00; + color: white; + font-family: 'Helvetica Neue', 'Helvetica', 'Verdana', sans-serif; + padding: 1em 2em; +} + +a { + color: #FAFF78; + text-decoration: none; +} + +a:hover { + text-decoration: underline; +} + +div#content { + width: 800px; + margin: 0 auto; +} + +form { + margin: 0; + padding: 0; + border: 0; +} + +fieldset { + border: 0; +} + +input.error { + background: #FAFF78; +} + +header { + text-align: center; +} + +h1, h2, h3, h4, h5, h6 { + font-family: 'Futura-CondensedExtraBold', 'Futura', 'Helvetica', sans-serif; + text-transform: uppercase; +} diff --git a/locationservice-base/centos/docker/locationservice/public/images/logo.png b/locationservice-base/centos/docker/locationservice/public/images/logo.png new file mode 100644 index 0000000..39de6e5 Binary files /dev/null and b/locationservice-base/centos/docker/locationservice/public/images/logo.png differ diff --git a/locationservice-base/centos/docker/locationservice/setup.cfg b/locationservice-base/centos/docker/locationservice/setup.cfg new file mode 100644 index 0000000..9935a8c --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/setup.cfg @@ -0,0 +1,6 @@ +[nosetests] +match=^test +where=apiserver +nocapture=1 +cover-package=apiserver +cover-erase=1 diff --git a/locationservice-base/centos/docker/locationservice/setup.py b/locationservice-base/centos/docker/locationservice/setup.py new file mode 100644 index 0000000..73c70f4 --- /dev/null +++ b/locationservice-base/centos/docker/locationservice/setup.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +try: + from setuptools import setup, find_packages +except ImportError: + from ez_setup import use_setuptools + use_setuptools() + from setuptools import setup, find_packages + +setup( + name='apiserver', + version='0.1', + description='', + author='', + author_email='', + install_requires=[ + "pecan", + ], + test_suite='apiserver', + zip_safe=False, + include_package_data=True, + packages=find_packages(exclude=['ez_setup']) +) diff --git a/locationservice-base/centos/locationservice-base.stable_docker_image b/locationservice-base/centos/locationservice-base.stable_docker_image new file mode 100644 index 0000000..fe18da2 --- /dev/null +++ b/locationservice-base/centos/locationservice-base.stable_docker_image @@ -0,0 +1,2 @@ +BUILDER=docker +LABEL=locationservice-base \ No newline at end of file diff --git a/notificationclient-base/centos/docker/Dockerfile b/notificationclient-base/centos/docker/Dockerfile new file mode 100644 index 0000000..0c0addf --- /dev/null +++ b/notificationclient-base/centos/docker/Dockerfile @@ -0,0 +1,23 @@ +ARG BASE +FROM ${BASE} + +ARG STX_REPO_FILE=/etc/yum.repos.d/stx.repo + +ENV KUBE_LATEST_VERSION="v1.18.3" + +RUN set -ex ;\ + yum install --disablerepo=* \ + $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ + -y \ + gcc python3-devel python3-pip \ + && pip3 install --user pecan \ + && pip3 install oslo-config \ + && pip3 install oslo-messaging \ + && pip3 install WSME \ + && pip3 install sqlalchemy + +WORKDIR /opt/ +COPY ./notificationclient-sidecar /opt/notificationclient +RUN cd /opt/notificationclient && python3 setup.py develop + +CMD ["bash"] diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/MANIFEST.in b/notificationclient-base/centos/docker/notificationclient-sidecar/MANIFEST.in new file mode 100644 index 0000000..c922f11 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/MANIFEST.in @@ -0,0 +1 @@ +recursive-include public * diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/config.py b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py new file mode 100644 index 0000000..8a7714a --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py @@ -0,0 +1,66 @@ +import os +SIDECAR_API_PORT = os.environ.get("SIDECAR_API_PORT", "8080") +SIDECAR_API_HOST = os.environ.get("SIDECAR_API_HOST", "127.0.0.1") +# Server Specific Configurations +server = { + 'port': SIDECAR_API_PORT, + 'host': SIDECAR_API_HOST +} + +# Pecan Application Configurations +app = { + 'root': 'sidecar.controllers.root.RootController', + 'modules': ['sidecar'], + 'static_root': '%(confdir)s/public', + 'template_path': '%(confdir)s/sidecar/templates', + 'debug': True, + 'errors': { + 404: '/error/404', + '__force_dict__': True + } +} + +logging = { + 'root': {'level': 'INFO', 'handlers': ['console']}, + 'loggers': { + 'sidecar': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, + 'pecan': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, + 'py.warnings': {'handlers': ['console']}, + '__force_dict__': True + }, + 'handlers': { + 'console': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'formatter': 'color' + } + }, + 'formatters': { + 'simple': { + 'format': ('%(asctime)s %(levelname)-5.5s [%(name)s]' + '[%(threadName)s] %(message)s') + }, + 'color': { + '()': 'pecan.log.ColorFormatter', + 'format': ('%(asctime)s [%(padded_color_levelname)s] [%(name)s]' + '[%(threadName)s] %(message)s'), + '__force_dict__': True + } + } +} + +# Bindings and options to pass to SQLAlchemy's ``create_engine`` +sqlalchemy = { + 'url' : 'sqlite:///sidecar.db', + 'echo' : False, + 'echo_pool' : False, + 'pool_recycle' : 3600, + 'encoding' : 'utf-8' +} + +# Custom Configurations must be in Python dictionary format:: +# +# foo = {'bar':'baz'} +# +# All configurations are accessible at:: +# pecan.conf diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py new file mode 100644 index 0000000..8cccacf --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py @@ -0,0 +1,109 @@ +import os +import json +import time +import oslo_messaging +from oslo_config import cfg +from notificationclientsdk.common.helpers import rpc_helper +from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo + +import logging + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class BrokerClientBase(object): + def __init__(self, broker_name, broker_transport_endpoint): + self.broker_name = broker_name + self.listeners = {} + self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint) + self.transport = rpc_helper.get_transport(self.broker_endpoint) + LOG.debug("Created Broker client:{0}".format(broker_name)) + + def __del__(self): + self.transport.cleanup() + del self.transport + return + + def __create_listener(self, context): + target = oslo_messaging.Target( + topic=context['topic'], + server=context['server']) + endpoints = context['endpoints'] + server = oslo_messaging.get_rpc_server( + self.transport, target, endpoints, executor=None) + return server + + def _refresh(self): + for topic, servers in self.listeners.items(): + for servername, context in servers.items(): + try: + rpcserver = context.get('rpcserver', None) + isactive = context.get('active', False) + if isactive and not rpcserver: + rpcserver = self.__create_listener(context) + rpcserver.start() + context['rpcserver'] = rpcserver + LOG.debug("Started rpcserver@{0}@{1}".format(context['topic'], context['server'])) + elif not isactive and rpcserver: + rpcserver.stop() + rpcserver.wait() + context.pop('rpcserver') + LOG.debug("Stopped rpcserver@{0}@{1}".format(context['topic'], context['server'])) + except: + LOG.error("Failed to update listener for topic/server:{0}/{1}" + .format(topic, servername)) + continue + + def add_listener(self, topic, server, listener_endpoints=None): + context = self.listeners.get(topic,{}).get(server, {}) + if not context: + context = { + 'endpoints': listener_endpoints, + 'topic': topic, + 'server': server, + 'active': True + } + if not self.listeners.get(topic, None): + self.listeners[topic] = {} + self.listeners[topic][server] = context + else: + context['endpoints'] = listener_endpoints + context['active'] = True + + self._refresh() + + def remove_listener(self, topic, server): + context = self.listeners.get(topic,{}).get(server, {}) + if context: + context['active'] = False + self._refresh() + + def is_listening(self, topic, server): + context = self.listeners.get(topic,{}).get(server, {}) + return context.get('active', False) + + def any_listener(self): + for topic, servers in self.listeners.items(): + for servername, context in servers.items(): + isactive = context.get('active', False) + if isactive: + return True + return False + + def call(self, topic, server, api_name, timeout=None, retry=None, **api_kwargs): + target = oslo_messaging.Target( + topic=topic, server=server, version=self.broker_endpoint.Version, + namespace=self.broker_endpoint.Namespace) + # note: the call might stuck here on 'Connection failed' and retry forever + # due to the tcp connection is unreachable: 'AMQP server on : is unreachable: timed out' + queryclient = oslo_messaging.RPCClient(self.transport, target, timeout = timeout, retry = retry) + return queryclient.call({}, api_name, **api_kwargs) + + def cast(self, topic, api_name, timeout=None, retry=None, **api_kwargs): + target = oslo_messaging.Target( + topic=topic, fanout=True, version=self.broker_endpoint.Version, + namespace=self.broker_endpoint.Namespace) + queryclient = oslo_messaging.RPCClient(self.transport, target, timeout = timeout, retry = retry) + queryclient.cast({}, api_name, **api_kwargs) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/locationservice.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/locationservice.py new file mode 100644 index 0000000..53c18b5 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/locationservice.py @@ -0,0 +1,149 @@ +import os +import json +import time +import oslo_messaging +from oslo_config import cfg + +from notificationclientsdk.common.helpers import hostfile_helper + +from notificationclientsdk.client.base import BrokerClientBase + +from notificationclientsdk.client.notificationservice import NotificationServiceClient, NotificationHandlerBase + +import logging + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class LocationHandlerBase(object): + + def __init__(self): + self.NOTIFICATIONSERVICE_HOSTNAME = 'notificationservice-{0}' + pass + + def handle(self, location_info): + pass + +class LocationHandlerDefault(LocationHandlerBase): + def __init__(self, host_file_path='/etc/hosts'): + self.hostfile = host_file_path + super(LocationHandlerDefault, self).__init__() + + def handle(self, location_info): + LOG.debug("Received location info:{0}".format(location_info)) + nodename = location_info.get('NodeName', None) + podip = location_info.get("PodIP", None) + if not nodename or not podip: + LOG.warning("Mising NodeName or PodIP inside location info") + return False + + hostfile_helper.update_host( + self.NOTIFICATIONSERVICE_HOSTNAME.format(nodename), + podip) + LOG.debug("Updated location with IP:{0}".format(podip)) + return True + +class LocationServiceClient(BrokerClientBase): + class ListenerEndpoint(object): + target = oslo_messaging.Target(namespace='notification', version='1.0') + + def __init__(self, handler): + self.handler = handler + + def NotifyLocation(self, ctx, location_info): + LOG.debug("LocationServiceClient NotifyLocation called %s" % location_info) + self.handler.handle(location_info) + return time.time() + + def __init__(self, registrationservice_transport_endpoint): + self.Id = id(self) + super(LocationServiceClient, self).__init__( + 'locationservice', registrationservice_transport_endpoint) + return + + def __del__(self): + super(LocationServiceClient, self).__del__() + return + + def update_location(self, target_node_name, location_handler=None, timeout=None, retry=None): + if not location_handler: + location_handler = LocationHandlerDefault('/etc/hosts') + location_info = self.query_location(target_node_name, timeout=timeout, retry=retry) + if location_info: + location_handler.handle(location_info) + return True + else: + return False + + def query_location(self, target_node_name, timeout=None, retry=None): + topic = 'LocationQuery' + server = 'LocationService-{0}'.format(target_node_name) + return self.call(topic, server, 'QueryLocation', timeout=timeout, retry=retry) + + def trigger_location_annoucement(self, timeout=None, retry=None): + topic = 'LocationQuery' + return self.cast(topic, 'TriggerAnnouncement', timeout=timeout, retry=retry) + + def add_location_listener(self, target_node_name, location_handler=None): + if not location_handler: + location_handler = LocationHandlerDefault('/etc/hosts') + + topic='LocationListener-{0}'.format(target_node_name) + server="LocationListener-{0}".format(self.Id) + endpoints = [LocationServiceClient.ListenerEndpoint(location_handler)] + + super(LocationServiceClient, self).add_listener( + topic, server, endpoints) + return True + + def remove_location_listener(self, target_node_name): + topic='LocationListener-{0}'.format(target_node_name) + server="LocationListener-{0}".format(self.Id) + super(LocationServiceClient, self).remove_listener( + topic, server) + + def is_listening_on_location(self, target_node_name): + topic='LocationListener-{0}'.format(target_node_name) + server="LocationListener-{0}".format(self.Id) + return super(LocationServiceClient, self).is_listening( + topic, server) + + ### extensions + def trigger_publishing_status(self, resource_type, + timeout=None, retry=None, resource_qualifier_json=None): + topic = '{0}-Status'.format(resource_type) + try: + self.cast( + topic, 'TriggerDelivery', timeout=timeout, retry=retry, + QualifierJson=resource_qualifier_json) + except Exception as ex: + LOG.warning("Fail to trigger_publishing_status: {0}".format(str(ex))) + return False + return True + + def add_resource_status_listener(self, resource_type, status_handler=None): + if not status_handler: + status_handler = NotificationHandlerBase() + + topic='{0}-Event-*'.format(resource_type) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + endpoints = [NotificationServiceClient.ListenerEndpoint(status_handler)] + + super(LocationServiceClient, self).add_listener( + topic, server, endpoints) + return True + + def remove_resource_status_listener(self, resource_type): + topic='{0}-Event-*'.format(resource_type) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + super(LocationServiceClient, self).remove_listener( + topic, server) + pass + + def is_listening_on_resource(self, resource_type): + topic='{0}-Event-*'.format(resource_type) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + return super(LocationServiceClient, self).is_listening( + topic, server) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py new file mode 100644 index 0000000..52a8030 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py @@ -0,0 +1,85 @@ +import os +import json +import time +import oslo_messaging +from oslo_config import cfg + +from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo + +from notificationclientsdk.client.base import BrokerClientBase + +from notificationclientsdk.model.dto.subscription import SubscriptionInfo +from notificationclientsdk.repository.subscription_repo import SubscriptionRepo + +import logging + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class NotificationHandlerBase(object): + + def __init__(self): + pass + + def handle(self, notification_info): + return False + +class NotificationServiceClient(BrokerClientBase): + class ListenerEndpoint(object): + target = oslo_messaging.Target(namespace='notification', version='1.0') + + def __init__(self, handler): + self.handler = handler + + def NotifyStatus(self, ctx, notification): + LOG.debug("NotificationServiceClient NotifyStatus called %s" % notification) + self.handler.handle(notification) + return time.time() + + '''Init client to notification service''' + def __init__(self, target_node_name, notificationservice_transport_endpoint): + self.Id = id(self) + self.target_node_name = target_node_name + super(NotificationServiceClient, self).__init__( + '{0}'.format(target_node_name), + notificationservice_transport_endpoint) + return + + def __del__(self): + super(NotificationServiceClient, self).__del__() + return + + def query_resource_status(self, resource_type, + timeout=None, retry=None, resource_qualifier_json=None): + topic = '{0}-Status'.format(resource_type) + server = '{0}-Tracking-{1}'.format(resource_type, self.target_node_name) + return self.call( + topic, server, 'QueryStatus', timeout=timeout, retry=retry, + QualifierJson=resource_qualifier_json) + + def add_resource_status_listener(self, resource_type, status_handler=None): + if not status_handler: + status_handler = NotificationHandlerBase() + + topic='{0}-Event-{1}'.format(resource_type, self.broker_name) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + endpoints = [NotificationServiceClient.ListenerEndpoint(status_handler)] + + super(NotificationServiceClient, self).add_listener( + topic, server, endpoints) + return True + + def remove_resource_status_listener(self, resource_type): + topic='{0}-Event-{1}'.format(resource_type, self.broker_name) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + super(NotificationServiceClient, self).remove_listener( + topic, server) + pass + + def is_listening_on_resource(self, resource_type): + topic='{0}-Event-{1}'.format(resource_type, self.broker_name) + server="{0}-EventListener-{1}".format(resource_type, self.Id) + return super(NotificationServiceClient, self).is_listening( + topic, server) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/hostfile_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/hostfile_helper.py new file mode 100644 index 0000000..d78d14a --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/hostfile_helper.py @@ -0,0 +1,29 @@ +#coding:utf8 + +import os +import sys +import re + +def update_host(hostname, ip): + hostsfile="/etc/hosts" + Lines=[] + replaced = False + + with open(hostsfile) as fd: + for line in fd.readlines(): + if line.strip() == '': + Lines.append(line) + else: + h_name = line.strip().split()[1] + if h_name == hostname: + lin = "{0} {1}".format(ip, hostname) + Lines.append(lin) + replaced = True + else: + Lines.append(line) + + if replaced == False: + Lines.append("{0} {1}".format(ip, hostname)) + + with open(hostsfile, 'w') as fc: + fc.writelines(Lines) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py new file mode 100644 index 0000000..edaa55a --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py @@ -0,0 +1,12 @@ +import logging + +def get_logger(module_name): + logger = logging.getLogger(module_name) + return config_logger(logger) + +def config_logger(logger): + ''' + configure the logger: uncomment following lines for debugging + ''' + # logger.setLevel(level=logging.DEBUG) + return logger diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py new file mode 100644 index 0000000..33773d5 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py @@ -0,0 +1,71 @@ +import json +from notificationclientsdk.repository.node_repo import NodeRepo + +class NodeInfoHelper(object): + BROKER_NODE_ALL = '*' + residing_node_name = None + + @staticmethod + def set_residing_node(residing_node_name): + NodeInfoHelper.residing_node_name = residing_node_name + + @staticmethod + def get_residing_node(): + residing_node_name = NodeInfoHelper.residing_node_name + return residing_node_name + + @staticmethod + def expand_node_name(node_name_pattern): + if node_name_pattern == '.': + return NodeInfoHelper.residing_node_name + elif node_name_pattern == NodeInfoHelper.BROKER_NODE_ALL: + return NodeInfoHelper.BROKER_NODE_ALL + else: + return node_name_pattern + + @staticmethod + def default_node_name(node_name_pattern): + if node_name_pattern == '.' or node_name_pattern == '*': + return NodeInfoHelper.residing_node_name + else: + return node_name_pattern + + @staticmethod + def match_node_name(node_name_pattern, target_node_name): + if node_name_pattern == '*': + return True + elif node_name_pattern == '.': + return NodeInfoHelper.residing_node_name == target_node_name + else: + return node_name_pattern == target_node_name + + @staticmethod + def enumerate_nodes(node_name_pattern): + ''' + enumerate nodes from node repo by pattern + ''' + nodeinfos = [] + if not node_name_pattern: + raise ValueError("node name pattern is invalid") + + nodeinfo_repo = None + try: + nodeinfo_repo = NodeRepo(autocommit=True) + filter = {} + if node_name_pattern == '*': + pass + elif not node_name_pattern or node_name_pattern == '.': + filter = { 'NodeName': NodeInfoHelper.residing_node_name } + else: + filter = { 'NodeName': node_name_pattern } + + nodeinfos = [x.NodeName for x in nodeinfo_repo.get(Status=1, **filter)] + + except Exception as ex: + LOG.warning("Failed to enumerate nodes:{0}".format(str(ex))) + nodeinfos = None + finally: + if nodeinfo_repo: + del nodeinfo_repo + + return nodeinfos diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/rpc_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/rpc_helper.py new file mode 100644 index 0000000..f159b74 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/rpc_helper.py @@ -0,0 +1,22 @@ +#coding=utf-8 + +import os +import json + +import oslo_messaging +from oslo_config import cfg + + +def setup_client(rpc_endpoint_info, topic, server): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + transport = oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) + target = oslo_messaging.Target(topic=topic, + version=rpc_endpoint_info.Version, + server=server, + namespace=rpc_endpoint_info.Namespace) + client = oslo_messaging.RPCClient(transport, target) + return client + +def get_transport(rpc_endpoint_info): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + return oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py new file mode 100644 index 0000000..6c1fc18 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -0,0 +1,49 @@ +#coding=utf-8 + +import os +import json + +import requests +import logging + +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +def notify(subscriptioninfo, notification, timeout=2, retry=3): + result = False + while True: + retry = retry - 1 + try: + headers = {'Content-Type': 'application/json'} + data = json.dumps(notification) + url = subscriptioninfo.EndpointUri + response = requests.post(url, data=data, headers=headers, timeout=timeout) + response.raise_for_status() + result = True + return response + except requests.exceptions.ConnectionError as errc: + if retry > 0: + LOG.warning("Retry notifying due to: {0}".format(str(errc))) + continue + raise errc + except requests.exceptions.Timeout as errt: + if retry > 0: + LOG.warning("Retry notifying due to: {0}".format(str(errt))) + continue + raise errt + except requests.exceptions.RequestException as ex: + LOG.warning("Failed to notify due to: {0}".format(str(ex))) + raise ex + except requests.exceptions.HTTPError as ex: + LOG.warning("Failed to notify due to: {0}".format(str(ex))) + raise ex + except Exception as ex: + LOG.warning("Failed to notify due to: {0}".format(str(ex))) + raise ex + + return result + diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py new file mode 100644 index 0000000..1216f3f --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/__init__.py @@ -0,0 +1,16 @@ +from notificationclientsdk.model.dto.subscription import SubscriptionInfo +from notificationclientsdk.model.dto.subscription import ResourceQualifierPtp + +from wsme.rest.json import tojson + +@tojson.when_object(SubscriptionInfo) +def subscriptioninfo_tojson(datatype, value): + if value is None: + return None + return value.to_dict() + +@tojson.when_object(ResourceQualifierPtp) +def resourcequalifierptp_tojson(datatype, value): + if value is None: + return None + return value.to_dict() diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/location.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/location.py new file mode 100644 index 0000000..f9f7e0b --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/location.py @@ -0,0 +1,30 @@ +#coding=utf-8 + +import json + +from wsme import types as wtypes +from notificationclientsdk.model.dto.resourcetype import EnumResourceType + +class LocationInfo(wtypes.Base): + NodeName = wtypes.text + PodIP = wtypes.text + Timestamp = float + ResourceTypes = [EnumResourceType] + + def to_dict(self): + d = { + 'NodeName': self.NodeName, + 'PodIP': self.PodIP, + 'Timestamp': self.Timestamp, + 'ResourceTypes': [x for x in self.ResourceTypes] + } + return d + + def to_orm(self): + d = { + 'NodeName': self.NodeName, + 'PodIP': self.PodIP or '', + 'Timestamp': self.Timestamp, + 'ResourceTypes': json.dumps([x for x in self.ResourceTypes]) if self.ResourceTypes else '' + } + return d diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/resourcetype.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/resourcetype.py new file mode 100644 index 0000000..5e80cf3 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/resourcetype.py @@ -0,0 +1,9 @@ +#coding=utf-8 + +from wsme import types as wtypes + +EnumResourceType = wtypes.Enum(str, 'PTP', 'FPGA') + +class ResourceType(object): + TypePTP = "PTP" + TypeFPGA = "FPGA" diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/rpc_endpoint.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/rpc_endpoint.py new file mode 100644 index 0000000..a955655 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/rpc_endpoint.py @@ -0,0 +1,34 @@ +#coding=utf-8 + +from wsme import types as wtypes + +RPC_ENDPOINT_BASE = { + 'Version': '1.0', + 'Namespace': 'notification', + 'Exchange': 'notification_exchange', + 'TransportEndpoint': '', + 'Topic': '', + 'Server': '' +} + +class RpcEndpointInfo(wtypes.Base): + TransportEndpoint = wtypes.text + Exchange = wtypes.text + Topic = wtypes.text + Server = wtypes.text + Version = wtypes.text + Namespace = wtypes.text + + def __init__(self, transport_endpoint): + self.endpoint_json = { + 'Version': RPC_ENDPOINT_BASE['Version'], + 'Namespace': RPC_ENDPOINT_BASE['Namespace'], + 'Exchange': RPC_ENDPOINT_BASE['Exchange'], + 'TransportEndpoint': transport_endpoint, + 'Topic': RPC_ENDPOINT_BASE['Topic'], + 'Server': RPC_ENDPOINT_BASE['Server'] + } + super(RpcEndpointInfo, self).__init__(**self.endpoint_json) + + def to_dict(self): + return self.endpoint_json diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py new file mode 100644 index 0000000..12714af --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py @@ -0,0 +1,94 @@ +#coding=utf-8 + +import os +import json +from wsme import types as wtypes +import datetime +import time + +import uuid + +from notificationclientsdk.model.dto.resourcetype import EnumResourceType, ResourceType + +''' +Base for Resource Qualifiers +''' +class ResourceQualifierBase(wtypes.Base): + def __init__(self, **kw): + super(ResourceQualifierBase, self).__init__(**kw) + + def to_dict(self): + pass + + +''' +Resource Qualifiers PTP +''' +class ResourceQualifierPtp(ResourceQualifierBase): + NodeName = wtypes.text + + def __init__(self, **kw): + self.NodeName = kw.pop('NodeName', None) + super(ResourceQualifierPtp, self).__init__(**kw) + + def to_dict(self): + d = { + 'NodeName': self.NodeName + } + return d + +''' +ViewModel of Subscription +''' +class SubscriptionInfo(wtypes.Base): + SubscriptionId = wtypes.text + UriLocation = wtypes.text + ResourceType = EnumResourceType + EndpointUri = wtypes.text + + # dynamic type depending on ResourceType + def set_resource_qualifier(self, value): + if isinstance(value, wtypes.Base): + self._ResourceQualifer = value + else: + self._ResourceQualifierJson = value + self._ResourceQualifer = None + + def get_resource_qualifier(self): + if not self._ResourceQualifer: + if self.ResourceType == ResourceType.TypePTP: + self._ResourceQualifer = ResourceQualifierPtp(**self._ResourceQualifierJson) + + return self._ResourceQualifer + + ResourceQualifier = wtypes.wsproperty(wtypes.Base, + get_resource_qualifier, set_resource_qualifier, mandatory=True) + + + def __init__(self, orm_entry=None): + if orm_entry: + self.SubscriptionId = orm_entry.SubscriptionId + self.ResourceType = orm_entry.ResourceType + self.UriLocation = orm_entry.UriLocation + self.ResourceQualifier = json.loads(orm_entry.ResourceQualifierJson) + self.EndpointUri = orm_entry.EndpointUri + + def to_dict(self): + d = { + 'SubscriptionId': self.SubscriptionId, + 'ResourceType': self.ResourceType, + 'UriLocation': self.UriLocation, + 'EndpointUri': self.EndpointUri, + 'ResourceQualifier': self.ResourceQualifier.to_dict() + } + return d + + def to_orm(self): + d = { + 'SubscriptionId': self.SubscriptionId, + 'ResourceType': self.ResourceType or '', + 'UriLocation': self.UriLocation, + 'EndpointUri': self.EndpointUri or '', + 'ResourceQualifierJson': json.dumps(self.ResourceQualifier.to_dict()) or '' + } + return d diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/base.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/base.py new file mode 100644 index 0000000..b00c202 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/base.py @@ -0,0 +1,10 @@ +import sqlalchemy +from sqlalchemy import MetaData +from sqlalchemy.ext.declarative import declarative_base + +DefaultMetaData = MetaData() +OrmBase = declarative_base(metadata = DefaultMetaData) #生成orm基类 + +def create_tables(orm_engine): + OrmBase.metadata.create_all(orm_engine) #创建表结构 + return OrmBase.metadata diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/node.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/node.py new file mode 100644 index 0000000..19aa8b9 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/node.py @@ -0,0 +1,20 @@ +from sqlalchemy import Float, Integer, ForeignKey, String, Column + +from notificationclientsdk.model.orm.base import OrmBase + +''' +NodeName: literal Node Name +ResourceTypes: json dump of Enumerate string list: PTP, FPGA, etc +''' +class NodeInfo(OrmBase): + __tablename__ = 'nodeinfo' + NodeName = Column(String(128), primary_key=True) + PodIP = Column(String(256)) + ResourceTypes = Column(String(1024)) + Timestamp = Column(Float) + Status = Column(Integer) + CreateTime = Column(Float) + LastUpdateTime = Column(Float) + +def create_tables(orm_engine): + NodeInfo.metadata.create_all(orm_engine) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py new file mode 100644 index 0000000..0e2be72 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/orm/subscription.py @@ -0,0 +1,17 @@ +from sqlalchemy import Float, Integer, ForeignKey, String, Column + +from notificationclientsdk.model.orm.base import OrmBase + +class Subscription(OrmBase): + __tablename__ = 'subscription' + SubscriptionId = Column(String(128), primary_key=True) + UriLocation = Column(String(512)) + ResourceType = Column(String(64)) + EndpointUri = Column(String(512)) + Status = Column(Integer) + CreateTime = Column(Float) + LastUpdateTime = Column(Float) + ResourceQualifierJson = Column(String) + +def create_tables(orm_engine): + Subscription.metadata.create_all(orm_engine) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/dbcontext.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/dbcontext.py new file mode 100644 index 0000000..a3a3660 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/dbcontext.py @@ -0,0 +1,75 @@ +import logging + +from sqlalchemy import create_engine, MetaData +from sqlalchemy.orm import scoped_session, sessionmaker + +from notificationclientsdk.model.orm import base +from notificationclientsdk.model.orm import subscription +from notificationclientsdk.model.orm import node + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + + +class DbContext(object): + # static properties + DBSession = None + metadata = None + engine = None + + @staticmethod + def _engine_from_config(configuration): + configuration = dict(configuration) + url = configuration.pop('url') + return create_engine(url, **configuration) + + @staticmethod + def init_dbcontext(sqlalchemy_conf): + """ + This is a stub method which is called at application startup time. + + If you need to bind to a parsed database configuration, set up tables or + ORM classes, or perform any database initialization, this is the + recommended place to do it. + + For more information working with databases, and some common recipes, + see https://pecan.readthedocs.io/en/latest/databases.html + """ + + DbContext.engine = DbContext._engine_from_config(sqlalchemy_conf) + DbContext.DbSession = sessionmaker(bind=DbContext.engine) + + DbContext.metadata = base.create_tables(DbContext.engine) + DbContext.metadata.bind = DbContext.engine + + def __init__(self, session=None): + LOG.debug("initing DbContext ...") + if not session: + if not DbContext.engine: + raise Exception("DbContext must be inited with DbContext.init_dbcontext() first") + session = scoped_session(DbContext.DbSession) + self.session = session + + def __del__(self): + LOG.debug("deleting DbContext ...") + pass + + def get_session(self): + return self.session + + def start(self): + pass + + def start_read_only(self): + self.start() + + def commit(self): + self.session.commit() + + def rollback(self): + self.session.rollback() + + def clear(self): + self.session.remove() diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/node_repo.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/node_repo.py new file mode 100644 index 0000000..8eac79f --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/node_repo.py @@ -0,0 +1,81 @@ +import time, uuid +import logging + +from sqlalchemy.orm import scoped_session, sessionmaker + +from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm +from notificationclientsdk.repository.dbcontext import DbContext + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class NodeRepo(DbContext): + def __init__(self, session=None, autocommit=False): + self.autocommit = autocommit + super(NodeRepo, self).__init__(session) + if session: + self.own_session = False + else: + self.own_session = True + + def __del__(self): + if self.own_session: + self.clear() + + def add(self, nodeinfo): + try: + nodeinfo.Status = 1 + nodeinfo.CreateTime = time.time() + nodeinfo.LastUpdateTime = nodeinfo.CreateTime + self.session.add(nodeinfo) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + return nodeinfo + + def update(self, node_name, **data): + try: + data['LastUpdateTime'] = time.time() + self.session.query(NodeInfoOrm).filter_by(NodeName=node_name).update(data) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + + def get_one(self, **filter): + return self.session.query(NodeInfoOrm).filter_by(**filter).first() + + def get(self, **filter): + return self.session.query(NodeInfoOrm).filter_by(**filter) + + def delete_one(self, **filter): + try: + entry = self.session.query(NodeInfoOrm).filter_by(**filter).first() + self.session.delete(entry) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + + def delete(self, **filter): + try: + entry = self.session.query(NodeInfoOrm).filter_by(**filter).delete() + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/subscription_repo.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/subscription_repo.py new file mode 100644 index 0000000..dc305e3 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/repository/subscription_repo.py @@ -0,0 +1,86 @@ +import time, uuid +import logging + +from sqlalchemy.orm import scoped_session, sessionmaker + +from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm +from notificationclientsdk.repository.dbcontext import DbContext + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class SubscriptionRepo(DbContext): + + def __init__(self, session=None, autocommit=False): + self.autocommit = autocommit + super(SubscriptionRepo, self).__init__(session) + if session: + self.own_session = False + else: + self.own_session = True + + def __del__(self): + if self.own_session: + self.clear() + + def add(self, subscription): + try: + subscription.SubscriptionId = str(uuid.uuid1()) + subscription.Status = 1 + subscription.CreateTime = time.time() + subscription.LastUpdateTime = subscription.CreateTime + subscription.UriLocation = "{0}/{1}".format( + subscription.UriLocation, subscription.SubscriptionId) + + self.session.add(subscription) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + return subscription + + def update(self, subscriptionid, **data): + try: + data['LastUpdateTime'] = time.time() + self.session.query(SubscriptionOrm).filter_by(SubscriptionId=subscriptionid).update(data) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + + def get_one(self, **filter): + return self.session.query(SubscriptionOrm).filter_by(**filter).first() + + def get(self, **filter): + return self.session.query(SubscriptionOrm).filter_by(**filter) + + def delete_one(self, **filter): + try: + entry = self.session.query(SubscriptionOrm).filter_by(**filter).first() + self.session.delete(entry) + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() + + def delete(self, **filter): + try: + entry = self.session.query(SubscriptionOrm).filter_by(**filter).delete() + except Exception as ex: + if self.autocommit: + self.rollback() + raise ex + else: + if self.autocommit: + self.commit() diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py new file mode 100644 index 0000000..99a2648 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py @@ -0,0 +1,666 @@ + +import os +import json +import time +import oslo_messaging +from oslo_config import cfg +import logging + +import multiprocessing as mp +import threading +import sys +if sys.version > '3': + import queue as Queue +else: + import Queue + +from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper + +from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo +from notificationclientsdk.model.dto.subscription import SubscriptionInfo +from notificationclientsdk.model.dto.resourcetype import ResourceType +from notificationclientsdk.model.dto.location import LocationInfo + +from notificationclientsdk.repository.dbcontext import DbContext +from notificationclientsdk.repository.subscription_repo import SubscriptionRepo + +from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm + +from notificationclientsdk.repository.node_repo import NodeRepo + +from notificationclientsdk.client.locationservice import LocationServiceClient +from notificationclientsdk.client.notificationservice import NotificationServiceClient +from notificationclientsdk.client.notificationservice import NotificationHandlerBase + +from notificationclientsdk.client.locationservice import LocationHandlerDefault + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +'''Entry point of Default Process Worker''' +def ProcessWorkerDefault(event, subscription_event, daemon_context): + worker = NotificationWorker(event, subscription_event, daemon_context) + worker.run() + return + + +class NotificationWorker: + + class NotificationWatcher(NotificationHandlerBase): + def __init__(self, notification_watcher): + self.notification_watcher = notification_watcher + super(NotificationWorker.NotificationWatcher, self).__init__() + + def handle(self, notification_info): + LOG.debug("Received notification:{0}".format(notification_info)) + result = self.notification_watcher.handle_notification_delivery(notification_info) + return result + + class NodeInfoWatcher(LocationHandlerDefault): + def __init__(self, notification_watcher): + self.notification_watcher = notification_watcher + super(NotificationWorker.NodeInfoWatcher, self).__init__() + + def handle(self, location_info): + LOG.debug("Received location info:{0}".format(location_info)) + return self.notification_watcher.produce_location_event(location_info) + + def __init__( + self, event, subscription_event, daemon_context): + + self.daemon_context = daemon_context + self.residing_node_name = daemon_context['THIS_NODE_NAME'] + NodeInfoHelper.set_residing_node(self.residing_node_name) + + self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON']) + DbContext.init_dbcontext(self.sqlalchemy_conf) + self.event = event + self.subscription_event = subscription_event + + self.registration_endpoint = RpcEndpointInfo(daemon_context['REGISTRATION_TRANSPORT_ENDPOINT']) + self.locationservice_client = LocationServiceClient(self.registration_endpoint.TransportEndpoint) + # dict,key: node name, value , notificationservice client + self.notificationservice_clients = {} + + # Watcher callback + self.__NotificationWatcher = NotificationWorker.NotificationWatcher(self) + self.__NodeInfoWatcher = NotificationWorker.NodeInfoWatcher(self) + + self.__init_node_resources_map() + self.__init_node_info_channel() + self.__init_location_channel() + self.__init_notification_channel() + self.__init_node_sync_channel() + + def __init_node_resources_map(self): + self.node_resources_map = {} + self.node_resources_iteration = 0 + self.__node_resources_event = mp.Event() + + def __init_node_info_channel(self): + self.__node_info_event = mp.Event() + + def __init_location_channel(self): + self.location_event = mp.Event() + self.location_lock = threading.Lock() + # map index by node name + # only cache the latest loation info + self.location_channel = {} + self.location_keys_q = Queue.Queue() + + def __init_notification_channel(self): + self.notification_lock = threading.Lock() + self.notification_stat = {} + + def __init_node_sync_channel(self): + self.__node_sync_event = mp.Event() + # initial to be set + self.__node_sync_event.set() + + def __del__(self): + del self.locationservice_client + + def signal_location_event(self): + self.location_event.set() + + def signal_subscription_event(self): + self.subscription_event.set() + + def signal_node_sync_event(self): + self.__node_sync_event.set() + + def signal_nodeinfo_event(self): + self.__node_info_event.set() + + def signal_node_resources_event(self): + self.__node_resources_event.set() + + def signal_events(self): + self.event.set() + + def produce_location_event(self, location_info): + node_name = location_info.get('NodeName', None) + podip = location_info.get("PodIP", None) + if not node_name or not podip: + LOG.warning("Missing PodIP inside location info:{0}".format(location_info)) + return False + + result = True + timestamp = location_info.get('Timestamp', 0) + # acquire lock to sync threads which invoke this method + self.location_lock.acquire() + try: + current = self.location_channel.get(node_name, {}) + if current.get('Timestamp', 0) < timestamp: + if current.get('PodIP', None) != podip: + # update /etc/hosts must happen in threads to avoid blocking by the main thread + NOTIFICATIONSERVICE_HOSTNAME = 'notificationservice-{0}' + hostfile_helper.update_host( + NOTIFICATIONSERVICE_HOSTNAME.format(node_name), podip) + LOG.debug("Updated location with IP:{0}".format(podip)) + + # replace the location_info + self.location_channel[node_name] = location_info + self.location_keys_q.put(node_name) + # notify the consumer to process the update + self.signal_location_event() + self.signal_events() + result = True + except Exception as ex: + LOG.warning("failed to produce location event:{0}".format(str(ex))) + result = False + finally: + # release lock + self.location_lock.release() + + return result + + def consume_location_event(self): + LOG.debug("Start consuming location event") + need_to_sync_node = False + node_changed = False + node_resource_updated = False + nodeinfo_repo = NodeRepo(autocommit=True) + + while not self.location_keys_q.empty(): + node_name = self.location_keys_q.get(False) + location_info = self.location_channel.get(node_name, None) + if not location_info: + LOG.warning("consume location@{0} without location info".format(node_name)) + continue + + LOG.debug("process location event@{0}:{1}".format(node_name, location_info)) + + location_info2 = LocationInfo(**location_info) + + entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1) + if not entry: + entry = NodeInfoOrm(**location_info2.to_orm()) + nodeinfo_repo.add(entry) + node_resource_updated = True + node_changed = True + LOG.debug("Add NodeInfo: {0}".format(entry.NodeName)) + elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']: + # update the entry + if entry.ResourceTypes != location_info2.ResourceTypes: + node_resource_updated = True + nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm()) + LOG.debug("Update NodeInfo: {0}".format(entry.NodeName)) + else: + # do nothing + LOG.debug("Ignore the location for: {0}".format(entry.NodeName)) + continue + need_to_sync_node = True + continue + + del nodeinfo_repo + LOG.debug("Finished consuming location event") + if need_to_sync_node or node_resource_updated: + if node_changed: + LOG.debug("signal node changed event") + # node changes triggers rebuild map from subscription + # due to the potential subscriptions to all nodes + self.signal_subscription_event() + if node_resource_updated: + # signal the potential changes on node resources + LOG.debug("signal node resources updating event") + self.signal_nodeinfo_event() + if need_to_sync_node: + LOG.debug("signal node syncing event") + self.signal_node_sync_event() + self.signal_events() + pass + + def handle_notification_delivery(self, notification_info): + LOG.debug("start notification delivery") + result = True + subscription_repo = None + try: + self.notification_lock.acquire() + subscription_repo = SubscriptionRepo(autocommit=True) + resource_type = notification_info.get('ResourceType', None) + node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) + if not resource_type: + raise Exception("abnormal notification@{0}".format(node_name)) + + if resource_type == ResourceType.TypePTP: + pass + else: + raise Exception("notification with unsupported resource type:{0}".format(resource_type)) + + this_delivery_time = notification_info['EventTimestamp'] + + entries = subscription_repo.get(ResourceType=resource_type, Status=1) + for entry in entries: + subscriptionid = entry.SubscriptionId + ResourceQualifierJson = entry.ResourceQualifierJson or '{}' + ResourceQualifier = json.loads(ResourceQualifierJson) + # qualify by NodeName + entry_node_name = ResourceQualifier.get('NodeName', None) + node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) + if not node_name_matched: + continue + + subscription_dto2 = SubscriptionInfo(entry) + try: + last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + if last_delivery_time and last_delivery_time >= this_delivery_time: + # skip this entry since already delivered + LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId)) + raise Exception("notification timestamp indicate it is not lastest") + + subscription_helper.notify(subscription_dto2, notification_info) + LOG.debug("notification is delivered successfully to {0}".format( + entry.SubscriptionId)) + + if not self.notification_stat.get(node_name, None): + self.notification_stat[node_name] = { + subscriptionid: { + 'EventTimestamp': this_delivery_time + } + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + elif not self.notification_stat[node_name].get(subscriptionid, None): + self.notification_stat[node_name][subscriptionid] = { + 'EventTimestamp': this_delivery_time + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + else: + last_delivery_stat['EventTimestamp'] = this_delivery_time + LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( + node_name, subscriptionid)) + + except Exception as ex: + LOG.warning("notification is not delivered to {0}:{1}".format( + entry.SubscriptionId, str(ex))) + # remove the entry + continue + finally: + pass + except Exception as ex: + LOG.warning("Failed to delivery notification:{0}".format(str(ex))) + result = False + finally: + self.notification_lock.release() + if not subscription_repo: + del subscription_repo + + if result: + LOG.debug("Finished notification delivery") + else: + LOG.warning("Failed on notification delivery") + return result + + def process_sync_node_event(self): + LOG.debug("Start processing sync node event") + need_to_sync_node_again = False + for broker_node_name, node_resources in self.node_resources_map.items(): + try: + result = self.syncup_node(broker_node_name) + if not result: + need_to_sync_node_again = True + except Exception as ex: + LOG.warning("Failed to syncup node{0}:{1}".format(broker_node_name, str(ex))) + continue + + if need_to_sync_node_again: + # continue try in to next loop + self.signal_node_sync_event() + self.signal_events() + LOG.debug("Finished processing sync node event") + + def run(self): + # start location listener + self.__start_watch_all_nodes() + while True: + self.event.wait() + self.event.clear() + LOG.debug("daemon control event is asserted") + + if self.location_event.is_set(): + self.location_event.clear() + # process location notifications + self.consume_location_event() + + if self.subscription_event.is_set(): + self.subscription_event.clear() + # build node resources map from subscriptions + self.process_subscription_event() + + if self.__node_info_event.is_set(): + self.__node_info_event.clear() + # update node_resources_map from node info + self.__update_map_from_nodeinfos() + + if self.__node_resources_event.is_set(): + self.__node_resources_event.clear() + # update watchers from node_resources_map + self.__refresh_watchers_from_map() + + if self.__node_sync_event.is_set(): + self.__node_sync_event.clear() + # compensate for the possible loss of notification during reconnection + self.process_sync_node_event() + + continue + return + + def syncup_resource(self, broker_node_name, resource_type): + # check to sync up resource status on a node + LOG.debug("sync up resource@{0} :{1}".format(broker_node_name, resource_type)) + try: + if broker_node_name == NodeInfoHelper.BROKER_NODE_ALL: + self.locationservice_client.trigger_publishing_status( + resource_type, timeout=5, retry=10) + return True + + # 1, query resource status + broker_client = self.notificationservice_clients.get(broker_node_name, None) + if not broker_client: + raise Exception("notification service client is not setup for node {0}".format(broker_node_name)) + resource_status = broker_client.query_resource_status( + resource_type, timeout=5, retry=10) + + # 2, deliver resource by comparing LastDelivery time with EventTimestamp + # 3, update the LastDelivery with EventTimestamp + self.__NotificationWatcher.handle(resource_status) + except oslo_messaging.exceptions.MessagingTimeout as ex: + LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( + resource_type, broker_node_name, str(ex))) + return False + except Exception as ex: + LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( + resource_type, broker_node_name, str(ex))) + raise ex + finally: + pass + return True + + def syncup_node(self, broker_node_name): + all_resource_synced = True + # check to sync up resources status on a node + node_resources = self.node_resources_map.get(broker_node_name, None) + if node_resources: + LOG.debug("sync up resources@{0} :{1}".format(broker_node_name, node_resources)) + for resource_type, iteration in node_resources.items(): + if iteration == self.node_resources_iteration: + result = self.syncup_resource(broker_node_name, resource_type) + if not result: + all_resource_synced = False + return all_resource_synced + + def __cleanup_map(self): + for broker_node_name, node_resources in self.node_resources_map.items(): + resourcetypelist = [r for (r, i) in node_resources.items() if i 0 else True + + if isretrystopped: + LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + ptpstatus, topic)) + return isretrystopped == False + + def publish_status_all(self, ptpstatus, retry=3): + if not self.registration_broker_client: + return False + topic_all='PTP-Event-*' + server = None + isretrystopped = False + while not isretrystopped: + try: + self.registration_broker_client.cast( + topic_all, 'NotifyStatus', notification=ptpstatus) + LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic_all)) + break + except Exception as ex: + LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format( + ptpstatus, topic, str(ex))) + retry = retry - 1 + isretrystopped = False if retry > 0 else True + + if isretrystopped: + LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + ptpstatus, topic)) + return isretrystopped == False + + def start_status_listener(self, handler=None): + result = False + result1 = self.start_status_listener_local(handler) if self.local_broker_client else result + result2 = self.start_status_listener_all(handler) if self.registration_broker_client else result + result = result1 and result2 + return result + + def start_status_listener_local(self, handler=None): + if not self.local_broker_client: + return False + + topic='PTP-Status' + server='PTP-Tracking-{0}'.format(self.node_name) + endpoints = [PtpEventProducer.ListenerEndpoint(handler)] + + self.local_broker_client.add_listener( + topic, server, endpoints) + return True + + def start_status_listener_all(self, handler=None): + if not self.registration_broker_client: + return False + + topic='PTP-Status' + server='PTP-Tracking-{0}'.format(self.node_name) + endpoints = [PtpEventProducer.ListenerEndpoint(handler)] + + self.registration_broker_client.add_listener( + topic, server, endpoints) + return True + + def stop_status_listener(self): + result = False + result1 = self.stop_status_listener_local() if self.local_broker_client else result + result2 = self.stop_status_listener_all() if self.registration_broker_client else result + result = result1 and result2 + return result + + def stop_status_listener_local(self): + if not self.local_broker_client: + return False + + topic='PTP-Status' + server="PTP-Tracking-{0}".format(self.node_name) + self.local_broker_client.remove_listener( + topic, server) + + def stop_status_listener_all(self): + if not self.registration_broker_client: + return False + + topic='PTP-Status' + server="PTP-Tracking-{0}".format(self.node_name) + self.registration_broker_client.remove_listener( + topic, server) + + def is_listening(self): + result = False + result1 = self.is_listening_local() if self.local_broker_client else result + result2 = self.is_listening_all() if self.registration_broker_client else result + result = result1 and result2 + return result + + def is_listening_local(self): + if not self.local_broker_client: + return False + + topic='PTP-Status' + server="PTP-Tracking-{0}".format(self.node_name) + return self.local_broker_client.is_listening( + topic, server) + + def is_listening_all(self): + if not self.registration_broker_client: + return False + topic='PTP-Status' + server="PTP-Tracking-{0}".format(self.node_name) + return self.registration_broker_client.is_listening( + topic, server) diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py new file mode 100644 index 0000000..d1a16e7 --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py @@ -0,0 +1,12 @@ +import logging + +def get_logger(module_name): + logger = logging.getLogger(module_name) + return config_logger(logger) + +def config_logger(logger): + ''' + configure the logger: uncomment following lines for debugging + ''' + # logger.setLevel(level=logging.DEBUG) + return logger diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py new file mode 100644 index 0000000..fe96d0b --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py @@ -0,0 +1,5 @@ +#! /usr/bin/python3 + +'''ptp_status API stub''' +def ptp_status(holdover_time, freq, sync_state, event_time): + return new_event, sync_state, event_time diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/rpc_helper.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/rpc_helper.py new file mode 100644 index 0000000..71e5cbf --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/rpc_helper.py @@ -0,0 +1,19 @@ +#coding=utf-8 + +import oslo_messaging +from oslo_config import cfg + + +def setup_client(rpc_endpoint_info, topic, server): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + transport = oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) + target = oslo_messaging.Target(topic=topic, + version=rpc_endpoint_info.Version, + server=server, + namespace=rpc_endpoint_info.Namespace) + client = oslo_messaging.RPCClient(transport, target) + return client + +def get_transport(rpc_endpoint_info): + oslo_messaging.set_transport_defaults(rpc_endpoint_info.Exchange) + return oslo_messaging.get_rpc_transport(cfg.CONF, url=rpc_endpoint_info.TransportEndpoint) diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/__init__.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/__init__.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstate.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstate.py new file mode 100644 index 0000000..5415ac4 --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstate.py @@ -0,0 +1,11 @@ +#coding=utf-8 + +from wsme import types as wtypes + +EnumPtpState = wtypes.Enum(str, 'Locked', 'Freerun', 'Holdover') + +class PtpState(object): + Locked = "Locked" + Freerun = "Freerun" + Holdover = "Holdover" + diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstatus.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstatus.py new file mode 100644 index 0000000..0be94fc --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/ptpstatus.py @@ -0,0 +1,24 @@ +#coding=utf-8 + +from wsme import types as wtypes +from trackingfunctionsdk.model.dto.resourcetype import EnumResourceType +from trackingfunctionsdk.model.dto.ptpstate import PtpState + +class PtpStatus(wtypes.Base): + EventTimestamp = float + ResourceType = EnumResourceType + EventData_State = PtpState + ResourceQualifier_NodeName = wtypes.text + + def to_dict(self): + d = { + 'EventTimestamp': self.EventTimestamp, + 'ResourceType': self.ResourceType, + 'EventData': { + 'State': self.EventData_State + }, + 'ResourceQualifier': { + 'NodeName': self.ResourceQualifier_NodeName + } + } + return d diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/resourcetype.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/resourcetype.py new file mode 100644 index 0000000..788f477 --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/resourcetype.py @@ -0,0 +1,9 @@ +#coding=utf-8 + +from wsme import types as wtypes + +EnumResourceType = wtypes.Enum(str, 'PTP', 'FPGA') + +class ResourceType(object): + TypePTP = "PTP" + TypeFPGA = "FPGA" diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/rpc_endpoint.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/rpc_endpoint.py new file mode 100644 index 0000000..e4dcceb --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/rpc_endpoint.py @@ -0,0 +1,34 @@ +#coding=utf-8 + +from wsme import types as wtypes + +RPC_ENDPOINT_BASE = { + 'Version': '1.0', + 'Namespace': 'notification', + 'Exchange': 'notification_exchange', + 'TransportEndpoint': '', + 'Topic': '', + 'Server': '' +} + +class RpcEndpointInfo(wtypes.Base): + TransportEndpoint = wtypes.text + Exchange = wtypes.text + Topic = wtypes.text + Server = wtypes.text + Version = wtypes.text + Namespace = wtypes.text + + def __init__(self, transport_endpoint): + self.endpoint_json = { + 'Version': RPC_ENDPOINT_BASE['Version'], + 'Namespace': RPC_ENDPOINT_BASE['Namespace'], + 'Exchange': RPC_ENDPOINT_BASE['Exchange'], + 'TransportEndpoint': transport_endpoint, + 'Topic': RPC_ENDPOINT_BASE['Topic'], + 'Server': RPC_ENDPOINT_BASE['Server'] + } + super(RpcEndpointInfo, self).__init__(**self.endpoint_json) + + def to_dict(self): + return self.endpoint_json diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/services/__init__.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py new file mode 100644 index 0000000..5c08bc5 --- /dev/null +++ b/notificationservice-base/centos/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -0,0 +1,213 @@ + +import os +import json +import time +import oslo_messaging +from oslo_config import cfg +import logging + +import multiprocessing as mp +import threading + +from trackingfunctionsdk.common.helpers import rpc_helper +from trackingfunctionsdk.model.dto.rpc_endpoint import RpcEndpointInfo +from trackingfunctionsdk.model.dto.resourcetype import ResourceType +from trackingfunctionsdk.model.dto.ptpstate import PtpState + +from trackingfunctionsdk.client.ptpeventproducer import PtpEventProducer + +from trackingfunctionsdk.common.helpers import ptpsync as ptpsync + +LOG = logging.getLogger(__name__) + +from trackingfunctionsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') + + +'''Entry point of Default Process Worker''' +def ProcessWorkerDefault(event, sqlalchemy_conf_json, broker_transport_endpoint): + worker = PtpWatcherDefault(event, sqlalchemy_conf_json, broker_transport_endpoint) + worker.run() + return + + +class PtpWatcherDefault: + DEFAULT_PTPTRACKER_CONTEXT = { + 'holdover_seconds': 30, + 'poll_freq_seconds': 2 + } + + class PtpRequestHandlerDefault(object): + def __init__(self, watcher): + self.watcher = watcher + self.init_time = time.time() + + def query_status(self, **rpc_kwargs): + self.watcher.ptptracker_context_lock.acquire() + sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context.get('last_event_time', time.time()) + self.watcher.ptptracker_context_lock.release() + + lastStatus = { + 'ResourceType': ResourceType.TypePTP, + 'EventData': { + 'State': sync_state + }, + 'ResourceQualifier': { + 'NodeName': self.watcher.node_name + }, + 'EventTimestamp': last_event_time + } + return lastStatus + + def trigger_delivery(self, **rpc_kwargs): + self.watcher.forced_publishing = True + self.watcher.signal_ptp_event() + pass + + def __init__(self, event, sqlalchemy_conf_json, daemon_context_json): + self.sqlalchemy_conf = json.loads(sqlalchemy_conf_json) + self.event = event + self.init_time = time.time() + + self.daemon_context = json.loads(daemon_context_json) + self.ptptracker_context = self.daemon_context.get( + 'ptptracker_context', PtpWatcherDefault.DEFAULT_PTPTRACKER_CONTEXT) + self.ptptracker_context['sync_state'] = PtpState.Freerun + self.ptptracker_context['last_event_time'] = self.init_time + self.ptptracker_context_lock = threading.Lock() + + self.ptp_device_simulated = "true" == self.ptptracker_context.get('device_simulated', "False").lower() + + self.event_timeout = float(self.ptptracker_context['poll_freq_seconds']) + + self.node_name = self.daemon_context['THIS_NODE_NAME'] + self.namespace = self.daemon_context.get('THIS_NAMESPACE', 'notification') + + broker_transport_endpoint = self.daemon_context['NOTIFICATION_TRANSPORT_ENDPOINT'] + + registration_transport_endpoint = self.daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'] + + self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint) + self.registration_broker_endpoint = RpcEndpointInfo(registration_transport_endpoint) + self.ptpeventproducer = PtpEventProducer( + self.node_name, + self.broker_endpoint.TransportEndpoint, + self.registration_broker_endpoint.TransportEndpoint) + + self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self) + self.forced_publishing = False + + def signal_ptp_event(self): + if self.event: + self.event.set() + else: + LOG.warning("Unable to assert ptp event") + pass + + def run(self): + # start location listener + self.__start_listener() + while True: + # annouce the location + forced = self.forced_publishing + self.forced_publishing = False + self.__publish_ptpstatus(forced) + if self.event.wait(self.event_timeout): + LOG.debug("daemon control event is asserted") + self.event.clear() + else: + LOG.debug("daemon control event is timeout") + pass + continue + self.__stop_listener() + + '''Start listener to answer querying from clients''' + def __start_listener(self): + LOG.debug("start listener to answer location querying") + + self.ptpeventproducer.start_status_listener( + self.__ptprequest_handler + ) + return + + def __stop_listener(self): + LOG.debug("stop listener to answer location querying") + + self.ptpeventproducer.stop_status_listener(self.location_info) + return + + def __get_ptp_status(self, holdover_time, freq, sync_state, last_event_time): + new_event = False + new_event_time = last_event_time + if self.ptp_device_simulated: + now = time.time() + timediff = now - last_event_time + if timediff > holdover_time: + new_event = True + new_event_time = now + if sync_state == PtpState.Freerun: + sync_state = PtpState.Locked + elif sync_state == PtpState.Locked: + sync_state = PtpState.Holdover + elif sync_state == PtpState.Holdover: + sync_state = PtpState.Freerun + else: + sync_state = PtpState.Freerun + else: + new_event, sync_state, new_event_time = ptpsync.ptp_status( + holdover_time, freq, sync_state, last_event_time) + return new_event, sync_state, new_event_time + + '''announce location''' + def __publish_ptpstatus(self, forced=False): + holdover_time = float(self.ptptracker_context['holdover_seconds']) + freq = float(self.ptptracker_context['poll_freq_seconds']) + sync_state = self.ptptracker_context.get('sync_state', 'Unknown') + last_event_time = self.ptptracker_context.get('last_event_time', time.time()) + + new_event, sync_state, new_event_time = self.__get_ptp_status( + holdover_time, freq, sync_state, last_event_time) + + if new_event or forced: + # update context + self.ptptracker_context_lock.acquire() + self.ptptracker_context['sync_state'] = sync_state + self.ptptracker_context['last_event_time'] = new_event_time + self.ptptracker_context_lock.release() + + # publish new event + LOG.debug("publish ptp status to clients") + lastStatus = { + 'ResourceType': 'PTP', + 'EventData': { + 'State': sync_state + }, + 'ResourceQualifier': { + 'NodeName': self.node_name + }, + 'EventTimestamp': new_event_time + } + self.ptpeventproducer.publish_status(lastStatus) + return + + +class DaemonControl(object): + + def __init__(self, sqlalchemy_conf_json, daemon_context_json, process_worker = None): + self.event = mp.Event() + self.daemon_context = json.loads(daemon_context_json) + self.node_name = self.daemon_context['THIS_NODE_NAME'] + if not process_worker: + process_worker = ProcessWorkerDefault + + self. sqlalchemy_conf_json = sqlalchemy_conf_json + self.daemon_context_json = daemon_context_json + self.process_worker = process_worker + return + + def refresh(self): + self.process_worker(self.event, self.sqlalchemy_conf_json, self.daemon_context_json) + self.event.set() diff --git a/notificationservice-base/centos/notificationservice-base.stable_docker_image b/notificationservice-base/centos/notificationservice-base.stable_docker_image new file mode 100644 index 0000000..2bd26d6 --- /dev/null +++ b/notificationservice-base/centos/notificationservice-base.stable_docker_image @@ -0,0 +1,2 @@ +BUILDER=docker +LABEL=notificationservice-base \ No newline at end of file