replace eventlet timer with thread manner
oslo.service doesn't do a monkey_patch, hence it is danger to mix eventlet thread and traditional thread, let's remove eventlet timer as we do for Ceilometer. Change-Id: Ie4e73a8d5e9c19cbaa9a3b18a92a9914f6d4edf1
This commit is contained in:
parent
adc15b1732
commit
5200ec703f
@ -17,8 +17,11 @@
|
|||||||
import abc
|
import abc
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from concurrent import futures
|
||||||
import croniter
|
import croniter
|
||||||
|
from futurist import periodics
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_service import service as os_service
|
from oslo_service import service as os_service
|
||||||
@ -232,20 +235,44 @@ class AlarmEvaluationService(os_service.Service):
|
|||||||
# allow time for coordination if necessary
|
# allow time for coordination if necessary
|
||||||
delay_start = self.partition_coordinator.is_active()
|
delay_start = self.partition_coordinator.is_active()
|
||||||
|
|
||||||
|
ef = lambda: futures.ThreadPoolExecutor(max_workers=10)
|
||||||
|
self.periodic = periodics.PeriodicWorker.create(
|
||||||
|
[], executor_factory=ef)
|
||||||
|
|
||||||
if self.evaluators:
|
if self.evaluators:
|
||||||
interval = self.conf.evaluation_interval
|
@periodics.periodic(spacing=self.conf.evaluation_interval,
|
||||||
self.tg.add_timer(
|
run_immediately=not delay_start)
|
||||||
interval,
|
def evaluate_alarms():
|
||||||
self._evaluate_assigned_alarms,
|
self._evaluate_assigned_alarms()
|
||||||
initial_delay=interval if delay_start else None)
|
|
||||||
|
self.periodic.add(evaluate_alarms)
|
||||||
|
|
||||||
if self.partition_coordinator.is_active():
|
if self.partition_coordinator.is_active():
|
||||||
heartbeat_interval = min(self.conf.coordination.heartbeat,
|
heartbeat_interval = min(self.conf.coordination.heartbeat,
|
||||||
self.conf.evaluation_interval / 4)
|
self.conf.evaluation_interval / 4)
|
||||||
self.tg.add_timer(heartbeat_interval,
|
|
||||||
self.partition_coordinator.heartbeat)
|
@periodics.periodic(spacing=heartbeat_interval,
|
||||||
|
run_immediately=True)
|
||||||
|
def heartbeat():
|
||||||
|
self.partition_coordinator.heartbeat()
|
||||||
|
|
||||||
|
self.periodic.add(heartbeat)
|
||||||
|
|
||||||
|
t = threading.Thread(target=self.periodic.start)
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# NOTE(sileht): We have to drop eventlet to drop this last eventlet
|
||||||
|
# thread
|
||||||
# Add a dummy thread to have wait() working
|
# Add a dummy thread to have wait() working
|
||||||
self.tg.add_timer(604800, lambda: None)
|
self.tg.add_timer(604800, lambda: None)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.periodic.stop()
|
||||||
|
self.periodic.wait()
|
||||||
|
self.partition_coordinator.stop()
|
||||||
|
super(AlarmEvaluationService, self).stop()
|
||||||
|
|
||||||
def _assigned_alarms(self):
|
def _assigned_alarms(self):
|
||||||
# NOTE(r-mibu): The 'event' type alarms will be evaluated by the
|
# NOTE(r-mibu): The 'event' type alarms will be evaluated by the
|
||||||
# event-driven alarm evaluator, so this periodical evaluator skips
|
# event-driven alarm evaluator, so this periodical evaluator skips
|
||||||
|
@ -69,20 +69,8 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
|
|||||||
self.svc.partition_coordinator.join_group.assert_called_once_with(
|
self.svc.partition_coordinator.join_group.assert_called_once_with(
|
||||||
self.svc.PARTITIONING_GROUP_NAME)
|
self.svc.PARTITIONING_GROUP_NAME)
|
||||||
|
|
||||||
initial_delay = test_interval if coordination_active else None
|
|
||||||
expected = [
|
|
||||||
mock.call(test_interval,
|
|
||||||
self.svc._evaluate_assigned_alarms,
|
|
||||||
initial_delay=initial_delay),
|
|
||||||
mock.call(604800, mock.ANY),
|
|
||||||
]
|
|
||||||
if coordination_active:
|
|
||||||
hb_interval = min(coordination_heartbeat, test_interval / 4)
|
|
||||||
hb_call = mock.call(hb_interval,
|
|
||||||
self.svc.partition_coordinator.heartbeat)
|
|
||||||
expected.insert(1, hb_call)
|
|
||||||
actual = self.svc.tg.add_timer.call_args_list
|
actual = self.svc.tg.add_timer.call_args_list
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual([mock.call(604800, mock.ANY)], actual)
|
||||||
|
|
||||||
def test_start_singleton(self):
|
def test_start_singleton(self):
|
||||||
self._do_test_start(coordination_active=False)
|
self._do_test_start(coordination_active=False)
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
|
|
||||||
retrying!=1.3.0,>=1.2.3 # Apache-2.0
|
retrying!=1.3.0,>=1.2.3 # Apache-2.0
|
||||||
croniter>=0.3.4 # MIT License
|
croniter>=0.3.4 # MIT License
|
||||||
|
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
|
||||||
|
futurist>=0.11.0 # Apache-2.0
|
||||||
jsonschema!=2.5.0,<3.0.0,>=2.0.0
|
jsonschema!=2.5.0,<3.0.0,>=2.0.0
|
||||||
keystonemiddleware>=2.2.0
|
keystonemiddleware>=2.2.0
|
||||||
gnocchiclient>=2.1.0 # Apache-2.0
|
gnocchiclient>=2.1.0 # Apache-2.0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user