From b3577a0cc024be3647769dfd42ec65ae71cedf9a Mon Sep 17 00:00:00 2001 From: vinod pandarinathan Date: Sat, 6 Jun 2015 05:46:55 -0700 Subject: [PATCH] Add openstack common service module Change-Id: I2a4ab17de6b7ba86141145f109275beacee51a9b --- cloudpulse/openstack/common/service.py | 507 +++++++++++++++++++++ cloudpulse/openstack/common/systemd.py | 105 +++++ cloudpulse/openstack/common/threadgroup.py | 150 ++++++ openstack-common.conf | 1 + 4 files changed, 763 insertions(+) create mode 100644 cloudpulse/openstack/common/service.py create mode 100644 cloudpulse/openstack/common/systemd.py create mode 100644 cloudpulse/openstack/common/threadgroup.py diff --git a/cloudpulse/openstack/common/service.py b/cloudpulse/openstack/common/service.py new file mode 100644 index 0000000..ae741f0 --- /dev/null +++ b/cloudpulse/openstack/common/service.py @@ -0,0 +1,507 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Generic Node base class for all workers that run on hosts.""" + +import errno +import io +import logging +import os +import random +import signal +import sys +import time + +import eventlet +from eventlet import event +from oslo_config import cfg + +from cloudpulse.openstack.common import eventlet_backdoor +from cloudpulse.openstack.common._i18n import _LE, _LI, _LW +from cloudpulse.openstack.common import systemd +from cloudpulse.openstack.common import threadgroup + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +def _sighup_supported(): + return hasattr(signal, 'SIGHUP') + + +def _is_daemon(): + # The process group for a foreground process will match the + # process group of the controlling terminal. If those values do + # not match, or ioctl() fails on the stdout file handle, we assume + # the process is running in the background as a daemon. + # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics + try: + is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) + except io.UnsupportedOperation: + # Could not get the fileno for stdout, so we must be a daemon. + is_daemon = True + except OSError as err: + if err.errno == errno.ENOTTY: + # Assume we are a daemon because there is no terminal. + is_daemon = True + else: + raise + return is_daemon + + +def _is_sighup_and_daemon(signo): + if not (_sighup_supported() and signo == signal.SIGHUP): + # Avoid checking if we are a daemon, because the signal isn't + # SIGHUP. + return False + return _is_daemon() + + +def _signo_to_signame(signo): + signals = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'} + if _sighup_supported(): + signals[signal.SIGHUP] = 'SIGHUP' + return signals[signo] + + +def _set_signals_handler(handler): + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) + if _sighup_supported(): + signal.signal(signal.SIGHUP, handler) + + +class Launcher(object): + """Launch one or more services and wait for them to complete.""" + + def __init__(self): + """Initialize the service launcher. + + :returns: None + + """ + self.services = Services() + self.backdoor_port = eventlet_backdoor.initialize_if_enabled() + + def launch_service(self, service): + """Load and start the given service. + + :param service: The service you would like to start. + :returns: None + + """ + service.backdoor_port = self.backdoor_port + self.services.add(service) + + def stop(self): + """Stop all services which are currently running. + + :returns: None + + """ + self.services.stop() + + def wait(self): + """Waits until all services have been stopped, and then returns. + + :returns: None + + """ + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() + + +class SignalExit(SystemExit): + def __init__(self, signo, exccode=1): + super(SignalExit, self).__init__(exccode) + self.signo = signo + + +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + # Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + raise SignalExit(signo) + + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _wait_for_exit_or_signal(self, ready_callback=None): + status = None + signo = 0 + + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, logging.DEBUG) + + try: + if ready_callback: + ready_callback() + super(ServiceLauncher, self).wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + finally: + self.stop() + + return status, signo + + def wait(self, ready_callback=None): + systemd.notify_once() + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal(ready_callback) + if not _is_sighup_and_daemon(signo): + return status + self.restart() + + +class ServiceWrapper(object): + def __init__(self, service, workers): + self.service = service + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + _signal_handlers_set = set() + + @classmethod + def _handle_class_signals(cls, *args, **kwargs): + for handler in cls._signal_handlers_set: + handler(*args, **kwargs) + + def __init__(self, wait_interval=0.01): + """Constructor. + + :param wait_interval: The interval to sleep for between checks + of child process exit. + """ + self.children = {} + self.sigcaught = None + self.running = True + self.wait_interval = wait_interval + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() + + def handle_signal(self): + self._signal_handlers_set.add(self._handle_signal) + _set_signals_handler(self._handle_class_signals) + + def _handle_signal(self, signo, frame): + self.sigcaught = signo + self.running = False + + # Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read(1) + + LOG.info(_LI('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process_handle_signal(self): + # Setup child signal handlers differently + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + + # Parent signals with SIGTERM when it wants us to go away. + signal.signal(signal.SIGTERM, signal.SIG_DFL) + if _sighup_supported(): + signal.signal(signal.SIGHUP, _sighup) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def _child_wait_for_exit_or_signal(self, launcher): + status = 0 + signo = 0 + + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + try: + launcher.wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Child caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_LE('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn_n(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.launch_service(service) + return launcher + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_LI('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if not _is_sighup_and_daemon(signo): + break + launcher.restart() + + os._exit(status) + + LOG.info(_LI('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_service(self, service, workers=1): + wrap = ServiceWrapper(service, workers) + + LOG.info(_LI('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + # Don't block if no child processes have exited + pid, status = os.waitpid(0, os.WNOHANG) + if not pid: + return None + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) + else: + code = os.WEXITSTATUS(status) + LOG.info(_LI('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) + + if pid not in self.children: + LOG.warning(_LW('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def _respawn_children(self): + while self.running: + wrap = self._wait_child() + if not wrap: + # Yield to other threads if no children have exited + # Sleep for a short time to avoid excessive CPU usage + # (see bug #1095346) + eventlet.greenthread.sleep(self.wait_interval) + continue + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + systemd.notify_once() + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, logging.DEBUG) + + try: + while True: + self.handle_signal() + self._respawn_children() + # No signal means that stop was called. Don't clean up here. + if not self.sigcaught: + return + + signame = _signo_to_signame(self.sigcaught) + LOG.info(_LI('Caught %s, stopping children'), signame) + if not _is_sighup_and_daemon(self.sigcaught): + break + + cfg.CONF.reload_config_files() + for service in set( + [wrap.service for wrap in self.children.values()]): + service.reset() + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + + self.running = True + self.sigcaught = None + except eventlet.greenlet.GreenletExit: + LOG.info(_LI("Wait called after thread killed. Cleaning up.")) + + self.stop() + + def stop(self): + """Terminate child processes and wait on each.""" + self.running = False + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Wait for children to die + if self.children: + LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + +class Service(object): + """Service object for binaries running on hosts.""" + + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup(threads) + + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + + def start(self): + pass + + def stop(self, graceful=False): + self.tg.stop(graceful) + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() + + def wait(self): + self.tg.wait() + + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + + +def launch(service, workers=1): + if workers is None or workers == 1: + launcher = ServiceLauncher() + launcher.launch_service(service) + else: + launcher = ProcessLauncher() + launcher.launch_service(service, workers=workers) + + return launcher diff --git a/cloudpulse/openstack/common/systemd.py b/cloudpulse/openstack/common/systemd.py new file mode 100644 index 0000000..36243b3 --- /dev/null +++ b/cloudpulse/openstack/common/systemd.py @@ -0,0 +1,105 @@ +# Copyright 2012-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper module for systemd service readiness notification. +""" + +import logging +import os +import socket +import sys + + +LOG = logging.getLogger(__name__) + + +def _abstractify(socket_name): + if socket_name.startswith('@'): + # abstract namespace socket + socket_name = '\0%s' % socket_name[1:] + return socket_name + + +def _sd_notify(unset_env, msg): + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.connect(_abstractify(notify_socket)) + sock.sendall(msg) + if unset_env: + del os.environ['NOTIFY_SOCKET'] + except EnvironmentError: + LOG.debug("Systemd notification failed", exc_info=True) + finally: + sock.close() + + +def notify(): + """Send notification to Systemd that service is ready. + + For details see + http://www.freedesktop.org/software/systemd/man/sd_notify.html + """ + _sd_notify(False, 'READY=1') + + +def notify_once(): + """Send notification once to Systemd that service is ready. + + Systemd sets NOTIFY_SOCKET environment variable with the name of the + socket listening for notifications from services. + This method removes the NOTIFY_SOCKET environment variable to ensure + notification is sent only once. + """ + _sd_notify(True, 'READY=1') + + +def onready(notify_socket, timeout): + """Wait for systemd style notification on the socket. + + :param notify_socket: local socket address + :type notify_socket: string + :param timeout: socket timeout + :type timeout: float + :returns: 0 service ready + 1 service not ready + 2 timeout occurred + """ + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.settimeout(timeout) + sock.bind(_abstractify(notify_socket)) + try: + msg = sock.recv(512) + except socket.timeout: + return 2 + finally: + sock.close() + if 'READY=1' in msg: + return 0 + else: + return 1 + + +if __name__ == '__main__': + # simple CLI for testing + if len(sys.argv) == 1: + notify() + elif len(sys.argv) >= 2: + timeout = float(sys.argv[1]) + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + retval = onready(notify_socket, timeout) + sys.exit(retval) diff --git a/cloudpulse/openstack/common/threadgroup.py b/cloudpulse/openstack/common/threadgroup.py new file mode 100644 index 0000000..b7cbee4 --- /dev/null +++ b/cloudpulse/openstack/common/threadgroup.py @@ -0,0 +1,150 @@ +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import threading + +import eventlet +from eventlet import greenpool + +from cloudpulse.openstack.common._i18n import _LE +from cloudpulse.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + """Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. + """ + def __init__(self, thread, group): + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + + +class ThreadGroup(object): + """The point of the ThreadGroup class is to: + + * keep track of timers and greenthreads (making it easier to stop them + when need be). + * provide an easy API to add timers. + """ + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_dynamic_timer(self, callback, initial_delay=None, + periodic_interval_max=None, *args, **kwargs): + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + self.timers.append(timer) + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + return th + + def thread_done(self, thread): + self.threads.remove(thread) + + def _stop_threads(self): + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except eventlet.greenlet.GreenletExit: + pass + except Exception: + LOG.exception(_LE('Error stopping thread.')) + + def stop_timers(self): + for x in self.timers: + try: + x.stop() + except Exception: + LOG.exception(_LE('Error stopping timer.')) + self.timers = [] + + def stop(self, graceful=False): + """stop function has the option of graceful=True/False. + + * In case of graceful=True, wait for all threads to be finished. + Never kill threads. + * In case of graceful=False, kill threads immediately. + """ + self.stop_timers() + if graceful: + # In case of graceful=True, wait for all threads to be + # finished, never kill threads + self.wait() + else: + # In case of graceful=False(Default), kill threads + # immediately + self._stop_threads() + + def wait(self): + for x in self.timers: + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception: + LOG.exception(_LE('Error waiting on ThreadGroup.')) + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + continue + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) diff --git a/openstack-common.conf b/openstack-common.conf index ac97626..09c2dfb 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -5,6 +5,7 @@ module=cliutils module=eventlet_backdoor module=loopingcall module=periodic_task +module=service # The base module to hold the copy of openstack.common base=cloudpulse