From bd3fb2a4031cd7188fa212c9690d6759a254dce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frank=20Jia=20=28=E8=B4=BE=E6=80=9D=E7=91=9E=29?= Date: Wed, 19 Jul 2023 10:43:07 +0800 Subject: [PATCH] add rpc function Change-Id: I410bf1c8d7ca3d07ab9ed544223c5bec06f5f7a0 --- etc/venus/venus.conf | 8 +-- venus/cmd/api.py | 3 + venus/cmd/task.py | 51 +++++++++++++ venus/conf/common.py | 3 + venus/manager.py | 93 ++++++++++++++++++++++++ venus/rpc.py | 165 +++++++++++++++++++++++++++++++++++++++++++ venus/service.py | 27 ++++--- 7 files changed, 337 insertions(+), 13 deletions(-) create mode 100644 venus/cmd/task.py create mode 100644 venus/manager.py create mode 100644 venus/rpc.py diff --git a/etc/venus/venus.conf b/etc/venus/venus.conf index 93d1fe2..11ee1a7 100644 --- a/etc/venus/venus.conf +++ b/etc/venus/venus.conf @@ -21,12 +21,12 @@ os_region_name = RegionOne osapi_venus_listen_port = 10010 osapi_venus_workers = 1 log_dir = /var/log/nova/ -logging_default_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [- req-None - - - - -] %(instance)s%(message)s +logging_default_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [- - - - - - -] %(instance)s%(message)s logging_context_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(global_request_id)s %(request_id)s %(user_identity)s] %(instance)s%(message)s - - - +[oslo_messaging_notifications] +transport_url = rabbit://stackrabbit:secret@localhost:5672/ +driver = messagingv2 [elasticsearch] diff --git a/venus/cmd/api.py b/venus/cmd/api.py index 613dc67..7e10e26 100644 --- a/venus/cmd/api.py +++ b/venus/cmd/api.py @@ -23,6 +23,7 @@ from oslo_reports import guru_meditation_report as gmr from venus.conf import CONF from venus import i18n +from venus import rpc from venus import service from venus import utils from venus import version @@ -43,6 +44,8 @@ def main(): gmr.TextGuruMeditation.setup_autorun(version) + rpc.init(CONF) + server = service.WSGIService('osapi_venus') launcher = service.get_launcher() launcher.launch_service(server, workers=server.workers) diff --git a/venus/cmd/task.py b/venus/cmd/task.py new file mode 100644 index 0000000..9de438b --- /dev/null +++ b/venus/cmd/task.py @@ -0,0 +1,51 @@ +# Copyright 2020 Inspur +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Starter script for Venus API.""" + +import os +import sys + +from oslo_log import log as logging +from oslo_reports import guru_meditation_report as gmr + +from venus.conf import CONF +from venus import i18n +from venus import service +from venus import utils +from venus import version + +i18n.enable_lazy() + + +def main(): + CONF(sys.argv[1:], project='venus', + version=version.version_string()) + logdir = CONF.log_dir + is_exists = os.path.exists(logdir) + if not is_exists: + os.makedirs(logdir) + logging.setup(CONF, "venus") + utils.monkey_patch() + + gmr.TextGuruMeditation.setup_autorun(version) + + server = service.Service.create(binary='venus-task', + topic="venus-task") + service.serve(server) + service.wait() + + +if __name__ == "__main__": + main() diff --git a/venus/conf/common.py b/venus/conf/common.py index 356fb6d..e3e38a3 100644 --- a/venus/conf/common.py +++ b/venus/conf/common.py @@ -174,6 +174,9 @@ global_opts = [ help='Base URL that will be presented to users in links ' 'to the OpenStack Venus API', deprecated_name='osapi_compute_link_prefix'), + cfg.StrOpt('task_manager', + default="venus.manager.Manager", + help='Btask_manager') ] diff --git a/venus/manager.py b/venus/manager.py new file mode 100644 index 0000000..53a719e --- /dev/null +++ b/venus/manager.py @@ -0,0 +1,93 @@ +# Copyright 2020 Inspur +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Base Manager class. + +Managers will often provide methods for initial setup of a host or periodic +tasks to a wrapping service. + +This module provides Manager, a base class for managers. + +""" + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import periodic_task + +from venus.db import base +from venus import version + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class PeriodicTasks(periodic_task.PeriodicTasks): + def __init__(self): + super(PeriodicTasks, self).__init__(CONF) + + +class Manager(base.Base, PeriodicTasks): + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + target = messaging.Target(version=RPC_API_VERSION) + + def __init__(self, host=None, db_driver=None): + if not host: + host = CONF.host + self.host = host + self.additional_endpoints = [] + super(Manager, self).__init__(db_driver) + + def periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + return self.run_periodic_tasks(context, raise_on_error=raise_on_error) + + def init_host(self): + """Handle initialization if this is a standalone service. + + A hook point for services to execute tasks before the services are made + available (i.e. showing up on RPC and starting to accept RPC calls) to + other components. Child classes should override this method. + + """ + pass + + def init_host_with_rpc(self): + """A hook for service to do jobs after RPC is ready. + + Like init_host(), this method is a hook where services get a chance + to execute tasks that *need* RPC. Child classes should override + this method. + + """ + pass + + def service_version(self): + return version.version_string() + + def service_config(self): + config = {} + for key in CONF: + config[key] = CONF.get(key, None) + return config + + def is_working(self): + """Method indicating if service is working correctly. + + This method is supposed to be overriden by subclasses and return if + manager is working correctly. + """ + return True diff --git a/venus/rpc.py b/venus/rpc.py new file mode 100644 index 0000000..6a8b807 --- /dev/null +++ b/venus/rpc.py @@ -0,0 +1,165 @@ +# Copyright 2020 Inspur +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__all__ = [ + 'init', + 'cleanup', + 'set_defaults', + 'add_extra_exmods', + 'clear_extra_exmods', + 'get_allowed_exmods', + 'RequestContextSerializer', + 'get_client', + 'get_server', + 'get_notifier', + 'TRANSPORT_ALIASES', +] + +import oslo_messaging as messaging +from oslo_serialization import jsonutils +from osprofiler import profiler +from venus.conf import CONF + +import venus.context +import venus.exception + + +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + venus.exception.__name__, +] +EXTRA_EXMODS = [] + +# NOTE(flaper87): The venus.openstack.common.rpc entries are +# for backwards compat with Havana rpc_backend configuration +# values. The venus.rpc entries are for compat with Folsom values. +TRANSPORT_ALIASES = { + 'venus.openstack.common.rpc.impl_kombu': 'rabbit', + 'venus.openstack.common.rpc.impl_qpid': 'qpid', + 'venus.openstack.common.rpc.impl_zmq': 'zmq', + 'venus.rpc.impl_kombu': 'rabbit', + 'venus.rpc.impl_qpid': 'qpid', + 'venus.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods) + + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) + + +def initialized(): + return None not in [TRANSPORT, NOTIFIER] + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def set_defaults(control_exchange): + messaging.set_transport_defaults(control_exchange) + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +class JsonPayloadSerializer(messaging.NoOpSerializer): + @staticmethod + def serialize_entity(context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + +class RequestContextSerializer(messaging.Serializer): + + def __init__(self, base): + self._base = base + + def serialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.serialize_entity(context, entity) + + def deserialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.deserialize_entity(context, entity) + + def serialize_context(self, context): + _context = context.to_dict() + prof = profiler.get() + if prof: + trace_info = { + "hmac_key": prof.hmac_key, + "base_id": prof.get_base_id(), + "parent_id": prof.get_id() + } + _context.update({"trace_info": trace_info}) + return _context + + def deserialize_context(self, context): + trace_info = context.pop("trace_info", None) + if trace_info: + profiler.init(**trace_info) + + return venus.context.RequestContext.from_dict(context) + + +def get_transport_url(url_str=None): + return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES) + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/venus/service.py b/venus/service.py index fb8d86c..30f70da 100644 --- a/venus/service.py +++ b/venus/service.py @@ -17,21 +17,25 @@ import inspect import os +import oslo_messaging as messaging +import osprofiler.notifier +import osprofiler.web import random + from oslo_concurrency import processutils from oslo_service import loopingcall from oslo_service import service from oslo_utils import importutils -import osprofiler.notifier -from osprofiler import profiler -import osprofiler.web + from venus.common.utils import LOG from venus.conf import CONF from venus import context from venus import exception from venus.i18n import _, _LI, _LW +from venus.objects import base as objects_base +from venus import rpc from venus import version from venus.wsgi import common as wsgi_common from venus.wsgi import eventlet_server as wsgi @@ -71,11 +75,8 @@ class Service(service.Service): self.topic = topic self.manager_class_name = manager manager_class = importutils.import_class(self.manager_class_name) - manager_class = profiler.trace_cls("rpc")(manager_class) - self.manager = manager_class(host=self.host, - service_name=service_name, - *args, **kwargs) + self.manager = manager_class(*args, **kwargs) self.periodic_interval = periodic_interval self.periodic_fuzzy_delay = periodic_fuzzy_delay self.saved_args, self.saved_kwargs = args, kwargs @@ -185,7 +186,7 @@ class Service(service.Service): self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) -class WSGIService(service.ServiceBase): +class WSGIService(service.Service): """Provides ability to launch API from a 'paste' configuration.""" def __init__(self, name, loader=None): @@ -212,6 +213,7 @@ class WSGIService(service.ServiceBase): 'workers': self.workers}) raise exception.InvalidInput(msg) setup_profiler(name, self.host) + self.rpcserver = None self.server = wsgi.Server(name, self.app, @@ -250,8 +252,15 @@ class WSGIService(service.ServiceBase): """ if self.manager: self.manager.init_host() + self.topic = "xxxxxxx" + target = messaging.Target(topic=self.topic, server="10.49.38.57") + endpoints = [self.manager] + # endpoints.extend(self.manager.additional_endpoints) + serializer = objects_base.VenusObjectSerializer() + self.rpcserver = rpc.get_server(target, endpoints, serializer) + self.rpcserver.start() + self.server.start() - self.port = self.server.port def stop(self): """Stop serving this API.