From 4fdd180b5ad0447d729236104014949dd44649d4 Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Sat, 3 Nov 2012 03:18:06 +0000 Subject: [PATCH] Add eventlet_backdoor and threadgroup from openstack-common In addition to this the list of imports is now in an alphabetical order. Change-Id: I34517ce3f740f63094d21b3cbebb542cc40661a2 --- openstack-common.conf | 2 +- quantum/openstack/common/eventlet_backdoor.py | 78 ++++++++++++ quantum/openstack/common/threadgroup.py | 119 ++++++++++++++++++ 3 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 quantum/openstack/common/eventlet_backdoor.py create mode 100644 quantum/openstack/common/threadgroup.py diff --git a/openstack-common.conf b/openstack-common.conf index c9a003964c..93c65eb901 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,5 +1,5 @@ [DEFAULT] # The list of modules to copy from openstack-common -modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,network_utils,notifier,timeutils,log,context,local,rpc,gettextutils,excutils,uuidutils,service,loopingcall +modules=cfg,context,eventlet_backdoor,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,loopingcall,network_utils,notifier,policy,rpc,service,setup,threadgroup,timeutils,uuidutils # The base module to hold the copy of openstack.common base=quantum diff --git a/quantum/openstack/common/eventlet_backdoor.py b/quantum/openstack/common/eventlet_backdoor.py new file mode 100644 index 0000000000..cee9bb55d5 --- /dev/null +++ b/quantum/openstack/common/eventlet_backdoor.py @@ -0,0 +1,78 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Openstack, LLC. +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gc +import pprint +import sys +import traceback + +import eventlet +import eventlet.backdoor +import greenlet + +from quantum.openstack.common import cfg + +eventlet_backdoor_opts = [ + cfg.IntOpt('backdoor_port', + default=None, + help='port for eventlet backdoor to listen') +] + +CONF = cfg.CONF +CONF.register_opts(eventlet_backdoor_opts) + + +def _dont_use_this(): + print "Don't use this, just disconnect instead" + + +def _find_objects(t): + return filter(lambda o: isinstance(o, t), gc.get_objects()) + + +def _print_greenthreads(): + for i, gt in enumerate(find_objects(greenlet.greenlet)): + print i, gt + traceback.print_stack(gt.gr_frame) + print + + +def initialize_if_enabled(): + backdoor_locals = { + 'exit': _dont_use_this, # So we don't exit the entire process + 'quit': _dont_use_this, # So we don't exit the entire process + 'fo': _find_objects, + 'pgt': _print_greenthreads, + } + + if CONF.backdoor_port is None: + return + + # NOTE(johannes): The standard sys.displayhook will print the value of + # the last expression and set it to __builtin__._, which overwrites + # the __builtin__._ that gettext sets. Let's switch to using pprint + # since it won't interact poorly with gettext, and it's easier to + # read the output too. + def displayhook(val): + if val is not None: + pprint.pprint(val) + sys.displayhook = displayhook + + eventlet.spawn_n(eventlet.backdoor.backdoor_server, + eventlet.listen(('localhost', CONF.backdoor_port)), + locals=backdoor_locals) diff --git a/quantum/openstack/common/threadgroup.py b/quantum/openstack/common/threadgroup.py new file mode 100644 index 0000000000..9d39d04983 --- /dev/null +++ b/quantum/openstack/common/threadgroup.py @@ -0,0 +1,119 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet import greenlet +from eventlet import greenpool +from eventlet import greenthread + +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging +from quantum.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + ''' + Callback function to be passed to GreenThread.link() when we spawn() + Calls the ThreadGroup to notify if. + ''' + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """ + Wrapper around a greenthread, that holds a reference to + the ThreadGroup. The Thread will notify the ThreadGroup + when it has done so it can be removed from the threads + list. + """ + def __init__(self, name, thread, group): + self.name = name + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.cancel() + + def wait(self): + return self.thread.wait() + + +class ThreadGroup(object): + """ + The point of this class is to: + - keep track of timers and greenthreads (making it easier to stop them + when need be). + - provide an easy API to add timers. + """ + def __init__(self, name, thread_pool_size=10): + self.name = name + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.LoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(callback.__name__, gt, self) + self.threads.append(th) + + def thread_done(self, thread): + self.threads.remove(thread) + + def stop(self): + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def wait(self): + for x in self.timers: + try: + x.wait() + except greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + continue + try: + x.wait() + except greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex)