Merge "Event simulator II"
This commit is contained in:
commit
ba7ed70e1b
@ -190,7 +190,7 @@ paste.filter_factory = trove.common.wsgi:ContextMiddleware.factory
|
||||
paste.filter_factory = trove.common.wsgi:FaultWrapper.factory
|
||||
|
||||
[filter:ratelimit]
|
||||
paste.filter_factory = trove.common.limits:RateLimitingMiddleware.factory
|
||||
paste.filter_factory = trove.tests.fakes.limits:FakeRateLimitingMiddleware.factory
|
||||
|
||||
[app:troveapp]
|
||||
paste.app_factory = trove.common.api:app_factory
|
||||
|
111
run_tests.py
111
run_tests.py
@ -16,6 +16,7 @@
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import functools
|
||||
import gettext
|
||||
import os
|
||||
import urllib
|
||||
@ -34,6 +35,7 @@ import eventlet
|
||||
eventlet.monkey_patch(thread=False)
|
||||
|
||||
CONF = cfg.CONF
|
||||
original_excepthook = sys.excepthook
|
||||
|
||||
|
||||
def add_support_for_localization():
|
||||
@ -153,18 +155,76 @@ def initialize_fakes(app):
|
||||
wsgi_interceptor)
|
||||
from trove.tests.util import event_simulator
|
||||
event_simulator.monkey_patch()
|
||||
from trove.tests.fakes import taskmanager
|
||||
taskmanager.monkey_patch()
|
||||
|
||||
|
||||
def parse_args_for_test_config():
|
||||
test_conf = 'etc/tests/localhost.test.conf'
|
||||
repl = False
|
||||
new_argv = []
|
||||
for index in range(len(sys.argv)):
|
||||
arg = sys.argv[index]
|
||||
print(arg)
|
||||
if arg[:14] == "--test-config=":
|
||||
del sys.argv[index]
|
||||
return arg[14:]
|
||||
return 'etc/tests/localhost.test.conf'
|
||||
test_conf = arg[14:]
|
||||
elif arg == "--repl":
|
||||
repl = True
|
||||
else:
|
||||
new_argv.append(arg)
|
||||
sys.argv = new_argv
|
||||
return test_conf, repl
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
def run_tests(repl):
|
||||
"""Runs all of the tests."""
|
||||
|
||||
if repl:
|
||||
# Actually show errors in the repl.
|
||||
sys.excepthook = original_excepthook
|
||||
|
||||
def no_thanks(exit_code):
|
||||
print("Tests finished with exit code %d." % exit_code)
|
||||
sys.exit = no_thanks
|
||||
|
||||
proboscis.TestProgram().run_and_exit()
|
||||
|
||||
if repl:
|
||||
import code
|
||||
code.interact()
|
||||
|
||||
|
||||
def import_tests():
|
||||
# F401 unused imports needed for tox tests
|
||||
from trove.tests.api import backups # noqa
|
||||
from trove.tests.api import header # noqa
|
||||
from trove.tests.api import limits # noqa
|
||||
from trove.tests.api import flavors # noqa
|
||||
from trove.tests.api import versions # noqa
|
||||
from trove.tests.api import instances as rd_instances # noqa
|
||||
from trove.tests.api import instances_actions as rd_actions # noqa
|
||||
from trove.tests.api import instances_delete # noqa
|
||||
from trove.tests.api import instances_mysql_down # noqa
|
||||
from trove.tests.api import instances_resize # noqa
|
||||
from trove.tests.api import configurations # noqa
|
||||
from trove.tests.api import databases # noqa
|
||||
from trove.tests.api import datastores # noqa
|
||||
from trove.tests.api import replication # noqa
|
||||
from trove.tests.api import root # noqa
|
||||
from trove.tests.api import root_on_create # noqa
|
||||
from trove.tests.api import users # noqa
|
||||
from trove.tests.api import user_access # noqa
|
||||
from trove.tests.api.mgmt import accounts # noqa
|
||||
from trove.tests.api.mgmt import admin_required # noqa
|
||||
from trove.tests.api.mgmt import hosts # noqa
|
||||
from trove.tests.api.mgmt import instances as mgmt_instances # noqa
|
||||
from trove.tests.api.mgmt import instances_actions as mgmt_actions # noqa
|
||||
from trove.tests.api.mgmt import storage # noqa
|
||||
from trove.tests.api.mgmt import malformed_json # noqa
|
||||
from trove.tests.db import migrations # noqa
|
||||
|
||||
|
||||
def main(import_func):
|
||||
try:
|
||||
wsgi_install()
|
||||
add_support_for_localization()
|
||||
@ -175,44 +235,25 @@ if __name__ == "__main__":
|
||||
app = initialize_trove(config_file)
|
||||
# Initialize sqlite database.
|
||||
initialize_database()
|
||||
# Swap out WSGI, httplib, and several sleep functions
|
||||
# with test doubles.
|
||||
# Swap out WSGI, httplib, and other components with test doubles.
|
||||
initialize_fakes(app)
|
||||
|
||||
# Initialize the test configuration.
|
||||
test_config_file = parse_args_for_test_config()
|
||||
test_config_file, repl = parse_args_for_test_config()
|
||||
CONFIG.load_from_file(test_config_file)
|
||||
|
||||
# F401 unused imports needed for tox tests
|
||||
from trove.tests.api import backups # noqa
|
||||
from trove.tests.api import header # noqa
|
||||
from trove.tests.api import limits # noqa
|
||||
from trove.tests.api import flavors # noqa
|
||||
from trove.tests.api import versions # noqa
|
||||
from trove.tests.api import instances as rd_instances # noqa
|
||||
from trove.tests.api import instances_actions as rd_actions # noqa
|
||||
from trove.tests.api import instances_delete # noqa
|
||||
from trove.tests.api import instances_mysql_down # noqa
|
||||
from trove.tests.api import instances_resize # noqa
|
||||
from trove.tests.api import configurations # noqa
|
||||
from trove.tests.api import databases # noqa
|
||||
from trove.tests.api import datastores # noqa
|
||||
from trove.tests.api import replication # noqa
|
||||
from trove.tests.api import root # noqa
|
||||
from trove.tests.api import root_on_create # noqa
|
||||
from trove.tests.api import users # noqa
|
||||
from trove.tests.api import user_access # noqa
|
||||
from trove.tests.api.mgmt import accounts # noqa
|
||||
from trove.tests.api.mgmt import admin_required # noqa
|
||||
from trove.tests.api.mgmt import hosts # noqa
|
||||
from trove.tests.api.mgmt import instances as mgmt_instances # noqa
|
||||
from trove.tests.api.mgmt import instances_actions as mgmt_actions # noqa
|
||||
from trove.tests.api.mgmt import storage # noqa
|
||||
from trove.tests.api.mgmt import malformed_json # noqa
|
||||
from trove.tests.db import migrations # noqa
|
||||
import_func()
|
||||
|
||||
from trove.tests.util import event_simulator
|
||||
event_simulator.run_main(functools.partial(run_tests, repl))
|
||||
|
||||
except Exception as e:
|
||||
# Printing the error manually like this is necessary due to oddities
|
||||
# with sys.excepthook.
|
||||
print("Run tests failed: %s" % e)
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
proboscis.TestProgram().run_and_exit()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(import_tests)
|
||||
|
@ -207,7 +207,7 @@ class RateLimitingMiddleware(base_wsgi.TroveMiddleware):
|
||||
|
||||
delay, error = self._limiter.check_for_delay(verb, url, tenant_id)
|
||||
|
||||
if delay:
|
||||
if delay and self.enabled():
|
||||
msg = _("This request was rate-limited.")
|
||||
retry = time.time() + delay
|
||||
return base_wsgi.OverLimitFault(msg, error, retry)
|
||||
@ -216,6 +216,9 @@ class RateLimitingMiddleware(base_wsgi.TroveMiddleware):
|
||||
|
||||
return self.application
|
||||
|
||||
def enabled(self):
|
||||
return True
|
||||
|
||||
|
||||
class Limiter(object):
|
||||
"""
|
||||
|
@ -33,6 +33,7 @@ from trove.tests.util.users import Requirements
|
||||
from trove.tests.api.instances import instance_info
|
||||
from trove.tests.api.instances import VOLUME_SUPPORT
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
@ -52,15 +53,24 @@ class TestBase(object):
|
||||
volume, [], [])
|
||||
return result.id
|
||||
|
||||
def wait_for_instance_status(self, instance_id, status="ACTIVE"):
|
||||
def wait_for_instance_status(self, instance_id, status="ACTIVE",
|
||||
acceptable_states=None):
|
||||
if acceptable_states:
|
||||
acceptable_states.append(status)
|
||||
|
||||
def assert_state(instance):
|
||||
if acceptable_states:
|
||||
assert_true(instance.status in acceptable_states,
|
||||
"Invalid status: %s" % instance.status)
|
||||
return instance
|
||||
poll_until(lambda: self.dbaas.instances.get(instance_id),
|
||||
lambda instance: instance.status == status,
|
||||
time_out=3, sleep_time=1)
|
||||
lambda instance: assert_state(instance).status == status,
|
||||
time_out=30, sleep_time=1)
|
||||
|
||||
def wait_for_instance_task_status(self, instance_id, description):
|
||||
poll_until(lambda: self.dbaas.management.show(instance_id),
|
||||
lambda instance: instance.task_description == description,
|
||||
time_out=3, sleep_time=1)
|
||||
time_out=30, sleep_time=1)
|
||||
|
||||
def is_instance_deleted(self, instance_id):
|
||||
while True:
|
||||
|
@ -27,6 +27,7 @@ from trove.tests.util import create_dbaas_client
|
||||
from troveclient.compat import exceptions
|
||||
from datetime import datetime
|
||||
from trove.tests.util.users import Users
|
||||
from trove.tests.fakes import limits as fake_limits
|
||||
|
||||
GROUP = "dbaas.api.limits"
|
||||
DEFAULT_RATE = 200
|
||||
@ -35,6 +36,15 @@ DEFAULT_MAX_INSTANCES = 55
|
||||
DEFAULT_MAX_BACKUPS = 5
|
||||
|
||||
|
||||
def ensure_limits_are_not_faked(func):
|
||||
def _cd(*args, **kwargs):
|
||||
fake_limits.ENABLED = True
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
finally:
|
||||
fake_limits.ENABLED = False
|
||||
|
||||
|
||||
@test(groups=[GROUP])
|
||||
class Limits(object):
|
||||
|
||||
@ -81,6 +91,7 @@ class Limits(object):
|
||||
return d
|
||||
|
||||
@test
|
||||
@ensure_limits_are_not_faked
|
||||
def test_limits_index(self):
|
||||
"""Test_limits_index."""
|
||||
|
||||
@ -101,6 +112,7 @@ class Limits(object):
|
||||
assert_true(d[k].nextAvailable is not None)
|
||||
|
||||
@test
|
||||
@ensure_limits_are_not_faked
|
||||
def test_limits_get_remaining(self):
|
||||
"""Test_limits_get_remaining."""
|
||||
|
||||
@ -121,6 +133,7 @@ class Limits(object):
|
||||
assert_true(get.nextAvailable is not None)
|
||||
|
||||
@test
|
||||
@ensure_limits_are_not_faked
|
||||
def test_limits_exception(self):
|
||||
"""Test_limits_exception."""
|
||||
|
||||
|
26
trove/tests/fakes/limits.py
Normal file
26
trove/tests/fakes/limits.py
Normal file
@ -0,0 +1,26 @@
|
||||
# Copyright 2014 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from trove.common import limits
|
||||
|
||||
|
||||
ENABLED = False
|
||||
|
||||
|
||||
class FakeRateLimitingMiddleware(limits.RateLimitingMiddleware):
|
||||
|
||||
def enabled(self):
|
||||
return ENABLED
|
@ -297,7 +297,7 @@ class FakeServers(object):
|
||||
"available.")
|
||||
|
||||
server.schedule_status("ACTIVE", 1)
|
||||
LOG.info(_("FAKE_SERVERS_DB : %s") % str(FAKE_SERVERS_DB))
|
||||
LOG.info("FAKE_SERVERS_DB : %s" % str(FAKE_SERVERS_DB))
|
||||
return server
|
||||
|
||||
def _get_volumes_from_bdm(self, block_device_mapping):
|
||||
@ -734,6 +734,9 @@ class FakeSecurityGroups(object):
|
||||
self.securityGroups[secGrp.get_id()] = secGrp
|
||||
return secGrp
|
||||
|
||||
def delete(self, group_id):
|
||||
pass
|
||||
|
||||
def list(self):
|
||||
pass
|
||||
|
||||
|
53
trove/tests/fakes/taskmanager.py
Normal file
53
trove/tests/fakes/taskmanager.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright 2014 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import eventlet
|
||||
from trove.taskmanager import api
|
||||
from trove.taskmanager.manager import Manager
|
||||
|
||||
|
||||
class FakeApi(api.API):
|
||||
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
|
||||
def make_msg(self, method_name, *args, **kwargs):
|
||||
return {"name": method_name, "args": args, "kwargs": kwargs}
|
||||
|
||||
def call(self, context, msg):
|
||||
manager, method = self.get_tm_method(msg['name'])
|
||||
return method(manager, context, *msg['args'], **msg['kwargs'])
|
||||
|
||||
def cast(self, context, msg):
|
||||
manager, method = self.get_tm_method(msg['name'])
|
||||
|
||||
def func():
|
||||
method(manager, context, *msg['args'], **msg['kwargs'])
|
||||
|
||||
eventlet.spawn_after(0.1, func)
|
||||
|
||||
def get_tm_method(self, method_name):
|
||||
manager = Manager()
|
||||
method = getattr(Manager, method_name)
|
||||
return manager, method
|
||||
|
||||
|
||||
def monkey_patch():
|
||||
api.API = FakeApi
|
||||
|
||||
def fake_load(context, manager=None):
|
||||
return FakeApi(context)
|
||||
api.load = fake_load
|
@ -1,6 +1,4 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2013 Rackspace Hosting
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2014 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -18,116 +16,242 @@
|
||||
|
||||
"""
|
||||
Simulates time itself to make the fake mode tests run even faster.
|
||||
|
||||
Specifically, this forces all various threads of execution to run one at a time
|
||||
based on when they would have been scheduled using the various eventlet spawn
|
||||
functions. Because only one thing is running at a given time, it eliminates
|
||||
race conditions that would normally be present from testing multi-threaded
|
||||
scenarios. It also means that the simulated time.sleep does not actually have
|
||||
to sit around for the designated time, which greatly speeds up the time it
|
||||
takes to run the tests.
|
||||
"""
|
||||
|
||||
from proboscis.asserts import fail
|
||||
from trove.openstack.common import log as logging
|
||||
from trove.common import exception
|
||||
import eventlet
|
||||
from eventlet import spawn as true_spawn
|
||||
from eventlet.event import Event
|
||||
from eventlet.semaphore import Semaphore
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
class Coroutine(object):
|
||||
"""
|
||||
This class simulates a coroutine, which is ironic, as greenlet actually
|
||||
*is* a coroutine. But trying to use greenlet here gives nasty results
|
||||
since eventlet thoroughly monkey-patches things, making it difficult
|
||||
to run greenlet on its own.
|
||||
|
||||
Essentially think of this as a wrapper for eventlet's threads which has a
|
||||
run and sleep function similar to old school coroutines, meaning it won't
|
||||
start until told and when asked to sleep it won't wake back up without
|
||||
permission.
|
||||
"""
|
||||
|
||||
ALL = []
|
||||
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
self.my_sem = Semaphore(0) # This is held by the thread as it runs.
|
||||
self.caller_sem = None
|
||||
self.dead = False
|
||||
started = Event()
|
||||
self.id = 5
|
||||
self.ALL.append(self)
|
||||
|
||||
def go():
|
||||
self.id = eventlet.corolocal.get_ident()
|
||||
started.send(True)
|
||||
self.my_sem.acquire(blocking=True, timeout=None)
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
# except Exception as e:
|
||||
# print("Exception in coroutine! %s" % e)
|
||||
finally:
|
||||
self.dead = True
|
||||
self.caller_sem.release() # Relinquish control back to caller.
|
||||
for i in range(len(self.ALL)):
|
||||
if self.ALL[i].id == self.id:
|
||||
del self.ALL[i]
|
||||
break
|
||||
|
||||
true_spawn(go)
|
||||
started.wait()
|
||||
|
||||
@classmethod
|
||||
def get_current(cls):
|
||||
"""Finds the coroutine associated with the thread which calls it."""
|
||||
return cls.get_by_id(eventlet.corolocal.get_ident())
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, id):
|
||||
for cr in cls.ALL:
|
||||
if cr.id == id:
|
||||
return cr
|
||||
raise RuntimeError("Coroutine with id %s not found!" % id)
|
||||
|
||||
def sleep(self):
|
||||
"""Puts the coroutine to sleep until run is called again.
|
||||
|
||||
This should only be called by the thread which owns this object.
|
||||
"""
|
||||
# Only call this from it's own thread.
|
||||
assert eventlet.corolocal.get_ident() == self.id
|
||||
self.caller_sem.release() # Relinquish control back to caller.
|
||||
self.my_sem.acquire(blocking=True, timeout=None)
|
||||
|
||||
def run(self):
|
||||
"""Starts up the thread. Should be called from a different thread."""
|
||||
# Don't call this from the thread which it represents.
|
||||
assert eventlet.corolocal.get_ident() != self.id
|
||||
self.caller_sem = Semaphore(0)
|
||||
self.my_sem.release()
|
||||
self.caller_sem.acquire() # Wait for it to finish.
|
||||
|
||||
|
||||
allowable_empty_sleeps = 0
|
||||
pending_events = []
|
||||
sleep_entrance_count = 0
|
||||
|
||||
|
||||
def event_simulator_spawn_after(time_from_now_in_seconds, func, *args, **kw):
|
||||
"""Fakes events without doing any actual waiting."""
|
||||
def __cb():
|
||||
func(*args, **kw)
|
||||
pending_events.append({"time": time_from_now_in_seconds, "func": __cb})
|
||||
main_greenlet = None
|
||||
|
||||
|
||||
def event_simulator_spawn(func, *args, **kw):
|
||||
event_simulator_spawn_after(0, func, *args, **kw)
|
||||
fake_threads = []
|
||||
|
||||
|
||||
def event_simulator_sleep(time_to_sleep):
|
||||
"""Simulates waiting for an event.
|
||||
allowable_empty_sleeps = 1
|
||||
sleep_allowance = allowable_empty_sleeps
|
||||
|
||||
This is used to monkey patch the sleep methods, so that no actually waiting
|
||||
occurs but functions which would have run as threads are executed.
|
||||
|
||||
This function will also raise an assertion failure if there were no pending
|
||||
events ready to run. If this happens there are two possibilities:
|
||||
1. The test code (or potentially code in Trove task manager) is
|
||||
sleeping even though no action is taking place in
|
||||
another thread.
|
||||
2. The test code (or task manager code) is sleeping waiting for a
|
||||
condition that will never be met because the thread it was waiting
|
||||
on experienced an error or did not finish successfully.
|
||||
|
||||
A good example of this second case is when a bug in task manager causes the
|
||||
create instance method to fail right away, but the test code tries to poll
|
||||
the instance's status until it gets rate limited. That makes finding the
|
||||
real error a real hassle. Thus it makes more sense to raise an exception
|
||||
whenever the app seems to be napping for no reason.
|
||||
def other_threads_are_active():
|
||||
"""Returns True if concurrent activity is being simulated.
|
||||
|
||||
Specifically, this means there is a fake thread in action other than the
|
||||
"pulse" thread and the main test thread.
|
||||
"""
|
||||
global pending_events
|
||||
global allowable_empty_sleeps
|
||||
if len(pending_events) == 0:
|
||||
allowable_empty_sleeps -= 1
|
||||
if allowable_empty_sleeps < 0:
|
||||
fail("Trying to sleep when no events are pending.")
|
||||
return len(fake_threads) >= 2
|
||||
|
||||
global sleep_entrance_count
|
||||
sleep_entrance_count += 1
|
||||
time_to_sleep = float(time_to_sleep)
|
||||
|
||||
run_once = False # Ensure simulator runs even if the sleep time is zero.
|
||||
while not run_once or time_to_sleep > 0:
|
||||
run_once = True
|
||||
itr_sleep = 0.5
|
||||
for i in range(len(pending_events)):
|
||||
event = pending_events[i]
|
||||
event["time"] = event["time"] - itr_sleep
|
||||
if event["func"] is not None and event["time"] < 0:
|
||||
# Call event, but first delete it so this function can be
|
||||
# reentrant.
|
||||
func = event["func"]
|
||||
event["func"] = None
|
||||
try:
|
||||
func()
|
||||
except Exception:
|
||||
LOG.exception("Simulated event error.")
|
||||
time_to_sleep -= itr_sleep
|
||||
sleep_entrance_count -= 1
|
||||
if sleep_entrance_count < 1:
|
||||
# Clear out old events
|
||||
pending_events = [event for event in pending_events
|
||||
if event["func"] is not None]
|
||||
def fake_sleep(time_to_sleep):
|
||||
"""Simulates sleep.
|
||||
|
||||
Puts the coroutine which calls it to sleep. If a coroutine object is not
|
||||
associated with the caller this will fail.
|
||||
"""
|
||||
global sleep_allowance
|
||||
sleep_allowance -= 1
|
||||
if not other_threads_are_active():
|
||||
if sleep_allowance < -1:
|
||||
raise RuntimeError("Sleeping for no reason.")
|
||||
else:
|
||||
return # Forgive the thread for calling this for one time.
|
||||
sleep_allowance = allowable_empty_sleeps
|
||||
|
||||
cr = Coroutine.get_current()
|
||||
for ft in fake_threads:
|
||||
if ft['greenlet'].id == cr.id:
|
||||
ft['next_sleep_time'] = time_to_sleep
|
||||
|
||||
cr.sleep()
|
||||
|
||||
|
||||
def fake_poll_until(retriever, condition=lambda value: value,
|
||||
sleep_time=1, time_out=None):
|
||||
"""Retrieves object until it passes condition, then returns it.
|
||||
|
||||
If time_out_limit is passed in, PollTimeOut will be raised once that
|
||||
amount of time is eclipsed.
|
||||
|
||||
"""
|
||||
"""Fakes out poll until."""
|
||||
from trove.common import exception
|
||||
slept_time = 0
|
||||
while True:
|
||||
resource = retriever()
|
||||
if condition(resource):
|
||||
return resource
|
||||
event_simulator_sleep(sleep_time)
|
||||
fake_sleep(sleep_time)
|
||||
slept_time += sleep_time
|
||||
if time_out and slept_time >= time_out:
|
||||
raise exception.PollTimeOut()
|
||||
|
||||
|
||||
def run_main(func):
|
||||
"""Runs the given function as the initial thread of the event simulator."""
|
||||
global main_greenlet
|
||||
main_greenlet = Coroutine(main_loop)
|
||||
fake_spawn(0, func)
|
||||
main_greenlet.run()
|
||||
|
||||
|
||||
def main_loop():
|
||||
"""The coroutine responsible for calling each "fake thread."
|
||||
|
||||
The Coroutine which calls this is the only one that won't end up being
|
||||
associated with the fake_threads list. The reason is this loop needs to
|
||||
wait on whatever thread is running, meaning it has to be a Coroutine as
|
||||
well.
|
||||
"""
|
||||
while len(fake_threads) > 0:
|
||||
pulse(0.1)
|
||||
|
||||
|
||||
def fake_spawn_n(func, *args, **kw):
|
||||
fake_spawn(0, func, *args, **kw)
|
||||
|
||||
|
||||
def fake_spawn(time_from_now_in_seconds, func, *args, **kw):
|
||||
"""Fakes eventlet's spawn function by adding a fake thread."""
|
||||
def thread_start():
|
||||
#fake_sleep(time_from_now_in_seconds)
|
||||
return func(*args, **kw)
|
||||
|
||||
cr = Coroutine(thread_start)
|
||||
fake_threads.append({'sleep': time_from_now_in_seconds,
|
||||
'greenlet': cr,
|
||||
'name': str(func)})
|
||||
|
||||
|
||||
def pulse(seconds):
|
||||
"""
|
||||
Runs the event simulator for the amount of simulated time denoted by
|
||||
"seconds".
|
||||
"""
|
||||
index = 0
|
||||
while index < len(fake_threads):
|
||||
t = fake_threads[index]
|
||||
t['sleep'] -= seconds
|
||||
if t['sleep'] <= 0:
|
||||
t['sleep'] = 0
|
||||
t['next_sleep_time'] = None
|
||||
t['greenlet'].run()
|
||||
sleep_time = t['next_sleep_time']
|
||||
if sleep_time is None or isinstance(sleep_time, tuple):
|
||||
del fake_threads[index]
|
||||
index -= 1
|
||||
else:
|
||||
t['sleep'] = sleep_time
|
||||
index += 1
|
||||
|
||||
|
||||
def wait_until_all_activity_stops():
|
||||
"""In fake mode, wait for all simulated events to chill out.
|
||||
|
||||
This can be useful in situations where you need simulated activity (such
|
||||
as calls running in TaskManager) to "bleed out" and finish before running
|
||||
another test.
|
||||
|
||||
"""
|
||||
if main_greenlet is None:
|
||||
return
|
||||
while other_threads_are_active():
|
||||
fake_sleep(1)
|
||||
|
||||
|
||||
def monkey_patch():
|
||||
"""
|
||||
Changes global functions such as time.sleep, eventlet.spawn* and others
|
||||
to their event_simulator equivalents.
|
||||
"""
|
||||
import time
|
||||
time.sleep = event_simulator_sleep
|
||||
time.sleep = fake_sleep
|
||||
import eventlet
|
||||
from eventlet import greenthread
|
||||
eventlet.sleep = event_simulator_sleep
|
||||
greenthread.sleep = event_simulator_sleep
|
||||
eventlet.spawn_after = event_simulator_spawn_after
|
||||
eventlet.spawn_n = event_simulator_spawn
|
||||
eventlet.spawn = NotImplementedError
|
||||
eventlet.sleep = fake_sleep
|
||||
greenthread.sleep = fake_sleep
|
||||
eventlet.spawn_after = fake_spawn
|
||||
|
||||
def raise_error():
|
||||
raise RuntimeError("Illegal operation!")
|
||||
|
||||
eventlet.spawn_n = fake_spawn_n
|
||||
eventlet.spawn = raise_error
|
||||
from trove.common import utils
|
||||
utils.poll_until = fake_poll_until
|
||||
|
Loading…
x
Reference in New Issue
Block a user