Updated service.py and its dependencies
This is to avoid service module dependency on importutils that are now moved to oslo.utils. The following changes are included: * neutron/openstack/common/eventlet_backdoor.py 5d40e14 Remove code that moved to oslo.i18n 90ae24b Remove redundant default=None for config options fcf517d Update oslo log messages with translation domains * neutron/openstack/common/loopingcall.py 5d40e14 Remove code that moved to oslo.i18n e377393 Changes calcuation of variable delay ab5d5f1 Use timestamp in loopingcall bc48099 Log the function name of looping call fb4e863 Remove deprecated LoopingCall fcf517d Update oslo log messages with translation domains * neutron/openstack/common/service.py 5d40e14 Remove code that moved to oslo.i18n 6ede600 rpc, notifier: remove deprecated modules * neutron/openstack/common/systemd.py 17c4e21 Fix docstring indentation in systemd * neutron/openstack/common/threadgroup.py 5a1a016 Make stop_timers() method public fdc8883 Add graceful stop function to ThreadGroup.stop 2d06d6c Simple typo correction 4d18b57 threadgroup: use threading rather than greenthread Change-Id: I4887545f861a93223e2c7cbcdd39efe991bff547
This commit is contained in:
parent
90efdc25ac
commit
e4589bc6bd
@ -29,7 +29,7 @@ import eventlet.backdoor
|
|||||||
import greenlet
|
import greenlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.openstack.common.gettextutils import _
|
from neutron.openstack.common._i18n import _LI
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
|
|
||||||
help_for_backdoor_port = (
|
help_for_backdoor_port = (
|
||||||
@ -41,7 +41,6 @@ help_for_backdoor_port = (
|
|||||||
"chosen port is displayed in the service's log file.")
|
"chosen port is displayed in the service's log file.")
|
||||||
eventlet_backdoor_opts = [
|
eventlet_backdoor_opts = [
|
||||||
cfg.StrOpt('backdoor_port',
|
cfg.StrOpt('backdoor_port',
|
||||||
default=None,
|
|
||||||
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
|
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -137,8 +136,10 @@ def initialize_if_enabled():
|
|||||||
# In the case of backdoor port being zero, a port number is assigned by
|
# In the case of backdoor port being zero, a port number is assigned by
|
||||||
# listen(). In any case, pull the port number out here.
|
# listen(). In any case, pull the port number out here.
|
||||||
port = sock.getsockname()[1]
|
port = sock.getsockname()[1]
|
||||||
LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
|
LOG.info(
|
||||||
{'port': port, 'pid': os.getpid()})
|
_LI('Eventlet backdoor listening on %(port)s for process %(pid)d') %
|
||||||
|
{'port': port, 'pid': os.getpid()}
|
||||||
|
)
|
||||||
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
|
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
|
||||||
locals=backdoor_locals)
|
locals=backdoor_locals)
|
||||||
return port
|
return port
|
||||||
|
@ -16,31 +16,36 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
from eventlet import event
|
from eventlet import event
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
|
|
||||||
from neutron.openstack.common.gettextutils import _
|
from neutron.openstack.common._i18n import _LE, _LW
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import timeutils
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# NOTE(zyluo): This lambda function was declared to avoid mocking collisions
|
||||||
|
# with time.time() called in the standard logging module
|
||||||
|
# during unittests.
|
||||||
|
_ts = lambda: time.time()
|
||||||
|
|
||||||
|
|
||||||
class LoopingCallDone(Exception):
|
class LoopingCallDone(Exception):
|
||||||
"""Exception to break out and stop a LoopingCall.
|
"""Exception to break out and stop a LoopingCallBase.
|
||||||
|
|
||||||
The poll-function passed to LoopingCall can raise this exception to
|
The poll-function passed to LoopingCallBase can raise this exception to
|
||||||
break out of the loop normally. This is somewhat analogous to
|
break out of the loop normally. This is somewhat analogous to
|
||||||
StopIteration.
|
StopIteration.
|
||||||
|
|
||||||
An optional return-value can be included as the argument to the exception;
|
An optional return-value can be included as the argument to the exception;
|
||||||
this return-value will be returned by LoopingCall.wait()
|
this return-value will be returned by LoopingCallBase.wait()
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, retvalue=True):
|
def __init__(self, retvalue=True):
|
||||||
""":param retvalue: Value that LoopingCall.wait() should return."""
|
""":param retvalue: Value that LoopingCallBase.wait() should return."""
|
||||||
self.retvalue = retvalue
|
self.retvalue = retvalue
|
||||||
|
|
||||||
|
|
||||||
@ -72,21 +77,22 @@ class FixedIntervalLoopingCall(LoopingCallBase):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
while self._running:
|
while self._running:
|
||||||
start = timeutils.utcnow()
|
start = _ts()
|
||||||
self.f(*self.args, **self.kw)
|
self.f(*self.args, **self.kw)
|
||||||
end = timeutils.utcnow()
|
end = _ts()
|
||||||
if not self._running:
|
if not self._running:
|
||||||
break
|
break
|
||||||
delay = interval - timeutils.delta_seconds(start, end)
|
delay = end - start - interval
|
||||||
if delay <= 0:
|
if delay > 0:
|
||||||
LOG.warn(_('task run outlasted interval by %s sec') %
|
LOG.warn(_LW('task %(func_name)s run outlasted '
|
||||||
-delay)
|
'interval by %(delay).2f sec'),
|
||||||
greenthread.sleep(delay if delay > 0 else 0)
|
{'func_name': repr(self.f), 'delay': delay})
|
||||||
|
greenthread.sleep(-delay if delay < 0 else 0)
|
||||||
except LoopingCallDone as e:
|
except LoopingCallDone as e:
|
||||||
self.stop()
|
self.stop()
|
||||||
done.send(e.retvalue)
|
done.send(e.retvalue)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_('in fixed duration looping call'))
|
LOG.exception(_LE('in fixed duration looping call'))
|
||||||
done.send_exception(*sys.exc_info())
|
done.send_exception(*sys.exc_info())
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
@ -98,11 +104,6 @@ class FixedIntervalLoopingCall(LoopingCallBase):
|
|||||||
return self.done
|
return self.done
|
||||||
|
|
||||||
|
|
||||||
# TODO(mikal): this class name is deprecated in Havana and should be removed
|
|
||||||
# in the I release
|
|
||||||
LoopingCall = FixedIntervalLoopingCall
|
|
||||||
|
|
||||||
|
|
||||||
class DynamicLoopingCall(LoopingCallBase):
|
class DynamicLoopingCall(LoopingCallBase):
|
||||||
"""A looping call which sleeps until the next known event.
|
"""A looping call which sleeps until the next known event.
|
||||||
|
|
||||||
@ -126,14 +127,15 @@ class DynamicLoopingCall(LoopingCallBase):
|
|||||||
|
|
||||||
if periodic_interval_max is not None:
|
if periodic_interval_max is not None:
|
||||||
idle = min(idle, periodic_interval_max)
|
idle = min(idle, periodic_interval_max)
|
||||||
LOG.debug(_('Dynamic looping call sleeping for %.02f '
|
LOG.debug('Dynamic looping call %(func_name)s sleeping '
|
||||||
'seconds'), idle)
|
'for %(idle).02f seconds',
|
||||||
|
{'func_name': repr(self.f), 'idle': idle})
|
||||||
greenthread.sleep(idle)
|
greenthread.sleep(idle)
|
||||||
except LoopingCallDone as e:
|
except LoopingCallDone as e:
|
||||||
self.stop()
|
self.stop()
|
||||||
done.send(e.retvalue)
|
done.send(e.retvalue)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_('in dynamic looping call'))
|
LOG.exception(_LE('in dynamic looping call'))
|
||||||
done.send_exception(*sys.exc_info())
|
done.send_exception(*sys.exc_info())
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
|
@ -38,14 +38,12 @@ from eventlet import event
|
|||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.openstack.common import eventlet_backdoor
|
from neutron.openstack.common import eventlet_backdoor
|
||||||
from neutron.openstack.common.gettextutils import _LE, _LI, _LW
|
from neutron.openstack.common._i18n import _LE, _LI, _LW
|
||||||
from neutron.openstack.common import importutils
|
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import systemd
|
from neutron.openstack.common import systemd
|
||||||
from neutron.openstack.common import threadgroup
|
from neutron.openstack.common import threadgroup
|
||||||
|
|
||||||
|
|
||||||
rpc = importutils.try_import('neutron.openstack.common.rpc')
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -180,12 +178,6 @@ class ServiceLauncher(Launcher):
|
|||||||
status = exc.code
|
status = exc.code
|
||||||
finally:
|
finally:
|
||||||
self.stop()
|
self.stop()
|
||||||
if rpc:
|
|
||||||
try:
|
|
||||||
rpc.cleanup()
|
|
||||||
except Exception:
|
|
||||||
# We're shutting down, so it doesn't matter at this point.
|
|
||||||
LOG.exception(_LE('Exception during rpc cleanup.'))
|
|
||||||
|
|
||||||
return status, signo
|
return status, signo
|
||||||
|
|
||||||
|
@ -50,14 +50,16 @@ def _sd_notify(unset_env, msg):
|
|||||||
|
|
||||||
def notify():
|
def notify():
|
||||||
"""Send notification to Systemd that service is ready.
|
"""Send notification to Systemd that service is ready.
|
||||||
|
|
||||||
For details see
|
For details see
|
||||||
http://www.freedesktop.org/software/systemd/man/sd_notify.html
|
http://www.freedesktop.org/software/systemd/man/sd_notify.html
|
||||||
"""
|
"""
|
||||||
_sd_notify(False, 'READY=1')
|
_sd_notify(False, 'READY=1')
|
||||||
|
|
||||||
|
|
||||||
def notify_once():
|
def notify_once():
|
||||||
"""Send notification once to Systemd that service is ready.
|
"""Send notification once to Systemd that service is ready.
|
||||||
|
|
||||||
Systemd sets NOTIFY_SOCKET environment variable with the name of the
|
Systemd sets NOTIFY_SOCKET environment variable with the name of the
|
||||||
socket listening for notifications from services.
|
socket listening for notifications from services.
|
||||||
This method removes the NOTIFY_SOCKET environment variable to ensure
|
This method removes the NOTIFY_SOCKET environment variable to ensure
|
||||||
|
@ -11,10 +11,10 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
import threading
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from eventlet import greenpool
|
from eventlet import greenpool
|
||||||
from eventlet import greenthread
|
|
||||||
|
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import loopingcall
|
from neutron.openstack.common import loopingcall
|
||||||
@ -51,7 +51,7 @@ class Thread(object):
|
|||||||
|
|
||||||
|
|
||||||
class ThreadGroup(object):
|
class ThreadGroup(object):
|
||||||
"""The point of the ThreadGroup classis to:
|
"""The point of the ThreadGroup class is to:
|
||||||
|
|
||||||
* keep track of timers and greenthreads (making it easier to stop them
|
* keep track of timers and greenthreads (making it easier to stop them
|
||||||
when need be).
|
when need be).
|
||||||
@ -85,8 +85,8 @@ class ThreadGroup(object):
|
|||||||
def thread_done(self, thread):
|
def thread_done(self, thread):
|
||||||
self.threads.remove(thread)
|
self.threads.remove(thread)
|
||||||
|
|
||||||
def stop(self):
|
def _stop_threads(self):
|
||||||
current = greenthread.getcurrent()
|
current = threading.current_thread()
|
||||||
|
|
||||||
# Iterate over a copy of self.threads so thread_done doesn't
|
# Iterate over a copy of self.threads so thread_done doesn't
|
||||||
# modify the list while we're iterating
|
# modify the list while we're iterating
|
||||||
@ -99,6 +99,7 @@ class ThreadGroup(object):
|
|||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
|
|
||||||
|
def stop_timers(self):
|
||||||
for x in self.timers:
|
for x in self.timers:
|
||||||
try:
|
try:
|
||||||
x.stop()
|
x.stop()
|
||||||
@ -106,6 +107,23 @@ class ThreadGroup(object):
|
|||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
self.timers = []
|
self.timers = []
|
||||||
|
|
||||||
|
def stop(self, graceful=False):
|
||||||
|
"""stop function has the option of graceful=True/False.
|
||||||
|
|
||||||
|
* In case of graceful=True, wait for all threads to be finished.
|
||||||
|
Never kill threads.
|
||||||
|
* In case of graceful=False, kill threads immediately.
|
||||||
|
"""
|
||||||
|
self.stop_timers()
|
||||||
|
if graceful:
|
||||||
|
# In case of graceful=True, wait for all threads to be
|
||||||
|
# finished, never kill threads
|
||||||
|
self.wait()
|
||||||
|
else:
|
||||||
|
# In case of graceful=False(Default), kill threads
|
||||||
|
# immediately
|
||||||
|
self._stop_threads()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
for x in self.timers:
|
for x in self.timers:
|
||||||
try:
|
try:
|
||||||
@ -114,7 +132,7 @@ class ThreadGroup(object):
|
|||||||
pass
|
pass
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
current = greenthread.getcurrent()
|
current = threading.current_thread()
|
||||||
|
|
||||||
# Iterate over a copy of self.threads so thread_done doesn't
|
# Iterate over a copy of self.threads so thread_done doesn't
|
||||||
# modify the list while we're iterating
|
# modify the list while we're iterating
|
||||||
|
Loading…
x
Reference in New Issue
Block a user