diff --git a/tools/simulator.py b/tools/simulator.py index 2f54e0b3d..843abc8ce 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -10,19 +10,14 @@ # License for the specific language governing permissions and limitations # under the License. -# Usage example: -# python tools/simulator.py \ -# --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server -# python tools/simulator.py -# --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client \ -# --exit-wait 15000 -p 64 -m 64 - import eventlet eventlet.monkey_patch() import argparse +import datetime import logging import sys +import threading import time from oslo_config import cfg @@ -30,8 +25,19 @@ import oslo_messaging as messaging from oslo_messaging import notify # noqa from oslo_messaging import rpc # noqa + LOG = logging.getLogger() +USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ + {notify-server,notify-client,rpc-server,rpc-client} ... + +Usage example: + python tools/simulator.py\ + --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server + python tools/simulator.py\ + --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\ + --exit-wait 15000 -p 64 -m 64""" + class LoggingNoParsingFilter(logging.Filter): def filter(self, record): @@ -75,7 +81,7 @@ class RpcEndpoint(object): self.wait_before_answer = wait_before_answer def info(self, ctxt, message): - i = int(message.replace('test ', '')) + i = int(message.split(' ')[-1]) if self.count is None: self.count = i elif i == 0: @@ -89,11 +95,29 @@ class RpcEndpoint(object): return "OK: %s" % message -def rpc_server(transport, wait_before_answer): - endpoints = [RpcEndpoint(wait_before_answer)] - target = messaging.Target(topic='t1', server='moi') - server = rpc.get_rpc_server(transport, target, - endpoints, executor='eventlet') +class RpcEndpointMonitor(RpcEndpoint): + def __init__(self, *args, **kwargs): + super(RpcEndpointMonitor, self).__init__(*args, **kwargs) + + self._count = self._prev_count = 0 + self._monitor() + + def _monitor(self): + threading.Timer(1.0, self._monitor).start() + print ("%d msg was received per second" + % (self._count - self._prev_count)) + self._prev_count = self._count + + def info(self, *args, **kwargs): + self._count += 1 + super(RpcEndpointMonitor, self).info(*args, **kwargs) + + +def rpc_server(transport, target, wait_before_answer, executor, show_stats): + endpoint_cls = RpcEndpointMonitor if show_stats else RpcEndpoint + endpoints = [endpoint_cls(wait_before_answer)] + server = rpc.get_rpc_server(transport, target, endpoints, + executor=executor) server.start() server.wait() @@ -105,23 +129,38 @@ def threads_spawner(threads, method, *args, **kwargs): p.waitall() -def rpc_call(_id, transport, messages, wait_after_msg, timeout): - target = messaging.Target(topic='t1', server='moi') - c = rpc.RPCClient(transport, target) - c = c.prepare(timeout=timeout) +def send_msg(_id, transport, target, messages, wait_after_msg, timeout, + is_cast): + client = rpc.RPCClient(transport, target) + client = client.prepare(timeout=timeout) + rpc_method = _rpc_cast if is_cast else _rpc_call + for i in range(0, messages): - payload = "test %d" % i - LOG.info("SEND: %s" % payload) - try: - res = c.call({}, 'info', message=payload) - except Exception: - LOG.exception('no RCV for %s' % i) - else: - LOG.info("RCV: %s" % res) + msg = "test message %d" % i + LOG.info("SEND: %s" % msg) + rpc_method(client, msg) if wait_after_msg > 0: time.sleep(wait_after_msg) +def _rpc_call(client, msg): + try: + res = client.call({}, 'info', message=msg) + except Exception as e: + LOG.exception('Error %s on CALL for message %s' % (str(e), msg)) + else: + LOG.info("SENT: %s, RCV: %s" % (msg, res)) + + +def _rpc_cast(client, msg): + try: + client.cast({}, 'info', message=msg) + except Exception as e: + LOG.exception('Error %s on CAST for message %s' % (str(e), msg)) + else: + LOG.info("SENT: %s" % msg) + + def notifier(_id, transport, messages, wait_after_msg, timeout): n1 = notify.Notifier(transport, topic="n-t1").prepare( publisher_id='publisher-%d' % _id) @@ -137,11 +176,26 @@ def notifier(_id, transport, messages, wait_after_msg, timeout): time.sleep(wait_after_msg) +def _setup_logging(is_debug): + log_level = logging.DEBUG if is_debug else logging.WARN + logging.basicConfig(stream=sys.stdout, level=log_level) + logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter()) + for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging' + 'oslo.messaging._drivers.amqp', ]: + logging.getLogger(i).setLevel(logging.WARN) + + def main(): - parser = argparse.ArgumentParser(description='RPC DEMO') + parser = argparse.ArgumentParser( + description='Tools to play with oslo.messaging\'s RPC', + usage=USAGE, + ) parser.add_argument('--url', dest='url', default='rabbit://guest:password@localhost/', help="oslo.messaging transport url") + parser.add_argument('-d', '--debug', dest='debug', type=bool, + default=False, + help="Turn on DEBUG logging level instead of WARN") subparsers = parser.add_subparsers(dest='mode', help='notify/rpc server/client mode') @@ -158,6 +212,11 @@ def main(): server = subparsers.add_parser('rpc-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) + server.add_argument('--show-stats', dest='show_stats', + type=bool, default=True) + server.add_argument('-e', '--executor', dest='executor', + type=str, default='eventlet', + help='name of a message executor') client = subparsers.add_parser('rpc-client') client.add_argument('-p', dest='threads', type=int, default=1, @@ -171,34 +230,41 @@ def main(): client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0, help='Keep connections open N seconds after calls ' 'have been done') + client.add_argument('--is-cast', dest='is_cast', type=bool, default=False, + help='Use `call` or `cast` RPC methods') args = parser.parse_args() - # Setup logging - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter()) - for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging' - 'oslo.messaging._drivers.amqp', ]: - logging.getLogger(i).setLevel(logging.WARN) + _setup_logging(is_debug=args.debug) # oslo.config defaults cfg.CONF.heartbeat_interval = 5 cfg.CONF.notification_topics = "notif" cfg.CONF.notification_driver = "messaging" - # the transport transport = messaging.get_transport(cfg.CONF, url=args.url) + target = messaging.Target(topic='profiler_topic', server='profiler_server') if args.mode == 'rpc-server': - rpc_server(transport, args.wait_before_answer) + if args.url.startswith('zmq'): + cfg.CONF.rpc_zmq_matchmaker = "redis" + transport._driver.matchmaker._redis.flushdb() + rpc_server(transport, target, args.wait_before_answer, args.executor, + args.show_stats) elif args.mode == 'notify-server': notify_server(transport) elif args.mode == 'notify-client': threads_spawner(args.threads, notifier, transport, args.messages, args.wait_after_msg, args.timeout) elif args.mode == 'rpc-client': - threads_spawner(args.threads, rpc_call, transport, args.messages, - args.wait_after_msg, args.timeout) + start = datetime.datetime.now() + threads_spawner(args.threads, send_msg, transport, target, + args.messages, args.wait_after_msg, args.timeout, + args.is_cast) + time_ellapsed = (datetime.datetime.now() - start).total_seconds() + msg_count = args.messages * args.threads + print ('%d messages was sent for %s seconds. Bandwight is %s msg/sec' + % (msg_count, time_ellapsed, (msg_count / time_ellapsed))) LOG.info("calls finished, wait %d seconds" % args.exit_wait) time.sleep(args.exit_wait)