Merge "Sync periodic_task from oslo-incubator"

This commit is contained in:
Jenkins 2014-05-30 16:22:50 +00:00 committed by Gerrit Code Review
commit 6d8f16a5dd
2 changed files with 27 additions and 31 deletions

View File

@ -11,22 +11,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import time
from oslo.config import cfg
import six
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help=('Some periodic tasks can be run in a separate process. '
'Should we run them here?')),
help='Some periodic tasks can be run in a separate process. '
'Should we run them here?'),
]
CONF = cfg.CONF
@ -46,8 +44,8 @@ def periodic_task(*args, **kwargs):
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every cycle
of the periodic scheduler.
1. Without arguments '@periodic_task', this will be run on the default
interval of 60 seconds.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]])
@ -78,7 +76,7 @@ def periodic_task(*args, **kwargs):
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = timeutils.utcnow()
f._periodic_last_run = time.time()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
@ -113,11 +111,6 @@ class _PeriodicTasksMeta(type):
except AttributeError:
cls._periodic_tasks = []
try:
cls._periodic_last_run = cls._periodic_last_run.copy()
except AttributeError:
cls._periodic_last_run = {}
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
@ -129,28 +122,33 @@ class _PeriodicTasksMeta(type):
name = task.__name__
if task._periodic_spacing < 0:
LOG.info(_('Skipping periodic task %(task)s because '
'its interval is negative'),
LOG.info(_LI('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_('Skipping periodic task %(task)s because '
'it is disabled'),
LOG.info(_LI('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run every pass
# be run on the default interval to avoid running too
# frequently.
if task._periodic_spacing == 0:
task._periodic_spacing = None
task._periodic_spacing = DEFAULT_INTERVAL
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
cls._periodic_last_run[name] = task._periodic_last_run
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def __init__(self):
super(PeriodicTasks, self).__init__()
self._periodic_last_run = {}
for name, task in self._periodic_tasks:
self._periodic_last_run[name] = task._periodic_last_run
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
@ -158,30 +156,27 @@ class PeriodicTasks(object):
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
now = timeutils.utcnow()
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# If a periodic task is _nearly_ due, then we'll run it early
if spacing is not None and last_run is not None:
due = last_run + datetime.timedelta(seconds=spacing)
if not timeutils.is_soon(due, 0.2):
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
idle_for = min(idle_for, spacing)
if last_run is not None:
delta = last_run + spacing - time.time()
if delta > 0.2:
idle_for = min(idle_for, delta)
continue
if spacing is not None:
idle_for = min(idle_for, spacing)
LOG.debug(_("Running periodic task %(full_task_name)s"),
LOG.debug("Running periodic task %(full_task_name)s",
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = timeutils.utcnow()
self._periodic_last_run[task_name] = time.time()
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)

View File

@ -58,6 +58,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
# object individually;
def __init__(self, conf):
super(LbaasAgentManager, self).__init__()
self.conf = conf
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.LbaasAgentApi(