From 3eef684d05118874a0208f0b871162607e84eb8a Mon Sep 17 00:00:00 2001
From: Nataliia Uvarova
Date: Thu, 31 Jul 2014 12:29:42 +0200
Subject: [PATCH] Improve benchmarking tool

This commit adds several enhancements to the benchmarking tool: the
server_url and the path to the messages file can now be configured in a
config file.

The default output of the program has changed: it now prints values as
JSON so they can be parsed more easily. The previous human-readable
representation is available via the --verbose flag.

The `total_requests` metric now counts all performed requests (failed
as well as successful), and a new metric - `successful_requests` - was
introduced to store the count of successful requests.

Change-Id: Id6fe4b2046394a348ba07eb5b2b003c6024b78b0
Partially-implements: blueprint gen-bench-reports
---
 README.rst                                    | 53 ++++++++++++
 .../zaqar-benchmark-messages.json             |  0
 etc/zaqar-benchmark.conf.sample               |  5 ++
 zaqar/bench/conductor.py                      | 17 +++-
 zaqar/bench/{cli_config.py => config.py}      |  5 +-
 zaqar/bench/consumer.py                       | 59 ++++++-------
 zaqar/bench/producer.py                       | 86 +++++++++----------
 7 files changed, 151 insertions(+), 74 deletions(-)
 rename zaqar/bench/messages.json => etc/zaqar-benchmark-messages.json (100%)
 create mode 100644 etc/zaqar-benchmark.conf.sample
 rename zaqar/bench/{cli_config.py => config.py} (81%)

diff --git a/README.rst b/README.rst
index e713a3ac4..f6dbaad3d 100644
--- a/README.rst
+++ b/README.rst
@@ -82,6 +82,59 @@ And then run tests::
 
 You can read more about running functional tests in separate `TESTS_README`_.
 
+Running the benchmarking tool
+-----------------------------
+
+First install and run zaqar-server (see above).
+
+Then install additional requirements::
+
+    pip install -r bench-requirements.txt
+
+Copy the configuration file to ``~/.zaqar``::
+
+    cp etc/zaqar-benchmark.conf.sample ~/.zaqar/zaqar-benchmark.conf
+
+In the configuration file, specify where zaqar-server can be found::
+
+    server_url = http://localhost:8888
+
+The benchmarking tool needs a set of messages to work with. Specify the
+path to the messages file in the configuration file. Alternatively, put
+it in the same directory as the configuration file and name it
+``zaqar-benchmark-messages.json``. As a starting point, you can use the
+sample file from the ``etc`` directory::
+
+    cp etc/zaqar-benchmark-messages.json ~/.zaqar/
+
+If the file is not found or no file is specified, a single hard-coded
+message is used for all requests.
+
+Run the benchmarking tool using the following command::
+
+    zaqar-bench-pc --processes 2 --workers 2 --time 10
+
+By default, the results are printed as JSON. For more human-readable
+output, add the ``--verbose`` flag. Verbose output looks similar to the
+following::
+
+    Starting Producer...
+
+    Starting Consumer...
+    Params
+    processes: 2.0
+    workers: 2.0
+
+    Consumer
+    duration_sec: 4.2
+    ms_per_req: 38.9
+    total_reqs: 104.0
+    successful_reqs: 104.0
+    reqs_per_sec: 24.8
+
+    Producer
+    duration_sec: 4.1
+    ms_per_req: 6.9
+    total_reqs: 575.0
+    successful_reqs: 575.0
+    reqs_per_sec: 138.6
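+
+The default JSON output carries the same metrics. For a run like the one
+above it would look roughly as follows (the tool prints it on a single
+line; it is wrapped here for readability, and key order may vary)::
+
+    {"params": {"processes": 2, "workers": 2},
+     "consumer": {"duration_sec": 4.2, "ms_per_req": 38.9, "total_reqs": 104,
+                  "successful_reqs": 104, "reqs_per_sec": 24.8},
+     "producer": {"duration_sec": 4.1, "ms_per_req": 6.9, "total_reqs": 575,
+                  "successful_reqs": 575, "reqs_per_sec": 138.6}}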
+
 .. _`OpenStack` : http://openstack.org/
 .. _`MongoDB` : http://docs.mongodb.org/manual/installation/
 .. _`pyenv` : https://github.com/yyuu/pyenv/
diff --git a/zaqar/bench/messages.json b/etc/zaqar-benchmark-messages.json
similarity index 100%
rename from zaqar/bench/messages.json
rename to etc/zaqar-benchmark-messages.json
diff --git a/etc/zaqar-benchmark.conf.sample b/etc/zaqar-benchmark.conf.sample
new file mode 100644
index 000000000..6e190d4b2
--- /dev/null
+++ b/etc/zaqar-benchmark.conf.sample
@@ -0,0 +1,5 @@
+[DEFAULT]
+# verbose = False
+# server_url = http://localhost:8888
+# messages_path = some/path/to/messages.json
+# queue_prefix = ogre-test-queue-
diff --git a/zaqar/bench/conductor.py b/zaqar/bench/conductor.py
index 77bce0dbf..2d395a5f9 100644
--- a/zaqar/bench/conductor.py
+++ b/zaqar/bench/conductor.py
@@ -14,16 +14,31 @@ from __future__ import print_function
 
+import json
 import multiprocessing as mp
 
+from zaqar.bench.config import conf
 from zaqar.bench import consumer
 from zaqar.bench import producer
 
 
 def main():
-    procs = [mp.Process(target=worker.run)
+    downstream_queue = mp.Queue()
+    procs = [mp.Process(target=worker.run, args=(downstream_queue,))
              for worker in [producer, consumer]]
 
     for each_proc in procs:
         each_proc.start()
 
     for each_proc in procs:
         each_proc.join()
+
+    stats = {'params': {'processes': conf.processes, 'workers': conf.workers}}
+    for each_proc in procs:
+        stats.update(downstream_queue.get_nowait())
+
+    if conf.verbose:
+        for name, stat in stats.items():
+            print(name.capitalize())
+            print("\n".join("{}: {:.1f}".format(*it) for it in stat.items()))
+            print('')  # Blank line
+    else:
+        print(json.dumps(stats))
diff --git a/zaqar/bench/cli_config.py b/zaqar/bench/config.py
similarity index 81%
rename from zaqar/bench/cli_config.py
rename to zaqar/bench/config.py
index c160899c2..84fe99352 100644
--- a/zaqar/bench/cli_config.py
+++ b/zaqar/bench/config.py
@@ -28,6 +28,9 @@ _CLI_OPTIONS = (
         default=psutil.NUM_CPUS * 2,
         help='Number of Workers'),
     cfg.IntOpt('time', short='t', default=3, help="time in seconds"),
+    cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
+    cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'),
+    cfg.StrOpt('messages_path', short='m')
 )
 conf.register_cli_opts(_CLI_OPTIONS)
-conf(project='zaqar', prog='zaqar-queues')
+conf(project='zaqar', prog='zaqar-benchmark')
diff --git a/zaqar/bench/consumer.py b/zaqar/bench/consumer.py
index 2de6a8c53..38a9d674f 100644
--- a/zaqar/bench/consumer.py
+++ b/zaqar/bench/consumer.py
@@ -13,8 +13,10 @@
 # limitations under the License.
 
 from __future__ import division
+from __future__ import print_function
 
 import multiprocessing as mp
+import sys
 import time
 
 from gevent import monkey as curious_george
@@ -24,10 +26,7 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.cli_config import conf
-
-URL = 'http://localhost:8888'
-QUEUE_PREFIX = 'ogre-test-queue-'
+from zaqar.bench.config import conf
 
 
 def claim_delete(stats, test_duration, ttl, grace, limit):
@@ -38,8 +37,8 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
     delete is recorded for calculating throughput and latency.
""" - cli = client.Client(URL) - queue = cli.queue(QUEUE_PREFIX + '1') + cli = client.Client(conf.server_url) + queue = cli.queue(conf.queue_prefix + '1') end = time.time() + test_duration total_elapsed = 0 total_requests = 0 @@ -52,11 +51,10 @@ def claim_delete(stats, test_duration, ttl, grace, limit): claim = queue.claim(ttl=ttl, grace=grace, limit=limit) except TransportError as ex: - print ("Could not claim messages : {0}".format(ex)) + sys.stderr.write("Could not claim messages : {0}\n".format(ex)) else: total_elapsed += marktime.stop('claim_message').seconds - total_requests += 1 claim_total_requests += 1 try: @@ -68,14 +66,19 @@ def claim_delete(stats, test_duration, ttl, grace, limit): total_elapsed += marktime.stop('delete_message').seconds delete_total_requests += 1 - total_requests += 1 - stats.put({'total_requests': total_requests, - 'claim_total_requests': claim_total_requests, - 'delete_total_requests': delete_total_requests, - 'total_elapsed': total_elapsed}) except TransportError as ex: - print ("Could not claim and delete : {0}".format(ex)) + sys.stderr.write("Could not delete messages: {0}\n".format(ex)) + + finally: + total_requests += 1 + finally: + total_requests += 1 + + stats.put({'total_requests': total_requests, + 'claim_total_requests': claim_total_requests, + 'delete_total_requests': delete_total_requests, + 'total_elapsed': total_elapsed}) def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit): @@ -103,18 +106,19 @@ def crunch(stats): delete_total_requests) -def run(): +def run(upstream_queue): num_procs = conf.processes num_workers = conf.workers test_duration = conf.time stats = mp.Queue() # TODO(TheSriram) : Make ttl,grace and limit configurable - args = (stats, num_workers, test_duration, URL, 300, 200, 1) + args = (stats, num_workers, test_duration, conf.server_url, 300, 200, 1) procs = [mp.Process(target=load_generator, args=args) for _ in range(num_procs)] - print ("\nStarting Consumer...") + if conf.verbose: + print("\nStarting Consumer...") start = time.time() @@ -126,17 +130,14 @@ def run(): (total_requests, total_latency, claim_total_requests, delete_total_requests) = crunch(stats) + successful_requests = claim_total_requests + delete_total_requests duration = time.time() - start - throughput = total_requests / duration - latency = 1000 * total_latency / total_requests + throughput = successful_requests / duration + latency = 1000 * total_latency / successful_requests - print('Duration: {0:.1f} sec'.format(duration)) - print('Total Requests: {0}'.format(total_requests)) - print('Throughput: {0:.0f} req/sec'.format(throughput)) - print('Latency: {0:.1f} ms/req'.format(latency)) - - print('') # Blank line - - -def main(): - run() + upstream_queue.put({'consumer': { + 'duration_sec': duration, + 'total_reqs': total_requests, + 'successful_reqs': successful_requests, + 'reqs_per_sec': throughput, + 'ms_per_req': latency}}) diff --git a/zaqar/bench/producer.py b/zaqar/bench/producer.py index 890e89492..54ce18c08 100644 --- a/zaqar/bench/producer.py +++ b/zaqar/bench/producer.py @@ -13,10 +13,10 @@ # limitations under the License. 
 from __future__ import division
+from __future__ import print_function
 
 import json
 import multiprocessing as mp
-import os
 import random
 import sys
 import time
@@ -28,26 +28,10 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.cli_config import conf
+from zaqar.bench.config import conf
 
-# TODO(TheSriram): Make configurable
-URL = 'http://localhost:8888'
-QUEUE_PREFIX = 'ogre-test-queue-'
-
-# TODO(TheSriram) : Migrate from env variable to config
-if os.environ.get('MESSAGES_PATH'):
-    with open(os.environ.get('MESSAGES_PATH')) as f:
-        message_pool = json.loads(f.read())
-else:
-    print("Error : $MESSAGES_PATH needs to be set")
-    sys.exit(1)
-
-
-message_pool.sort(key=lambda msg: msg['weight'])
-
-
-def choose_message():
+def choose_message(message_pool):
     """Choose a message from our pool of possibilities."""
 
     # Assume message_pool is sorted by weight, ascending
@@ -62,6 +46,21 @@
     assert False
 
 
+def load_messages():
+    default_file_name = 'zaqar-benchmark-messages.json'
+    messages_path = conf.messages_path or conf.find_file(default_file_name)
+    if messages_path:
+        with open(messages_path) as f:
+            message_pool = json.load(f)
+        message_pool.sort(key=lambda msg: msg['weight'])
+        return message_pool
+    else:
+        return [{"weight": 1.0,
+                 "doc": {"ttl": 60,
+                         "body": {"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
+                                  "evt": "Single"}}}]
+
+
 def producer(stats, test_duration):
     """Producer Worker
 
@@ -70,10 +69,12 @@
     is recorded for calculating throughput and latency.
     """
 
-    cli = client.Client(URL)
-    queue = cli.queue(QUEUE_PREFIX + '1')
+    cli = client.Client(conf.server_url)
+    queue = cli.queue(conf.queue_prefix + '1')
+    message_pool = load_messages()
 
     total_requests = 0
+    successful_requests = 0
     total_elapsed = 0
     end = time.time() + test_duration
 
@@ -82,16 +83,20 @@
 
         # TODO(TheSriram): Track/report errors
         try:
-            queue.post(choose_message())
+            queue.post(choose_message(message_pool))
 
         except TransportError as ex:
-            print("Could not post a message : {0}".format(ex))
+            sys.stderr.write("Could not post a message : {0}\n".format(ex))
 
         else:
+            successful_requests += 1
             total_elapsed += marktime.stop('post message').seconds
+
+        finally:
             total_requests += 1
 
     stats.put({
+        'successful_requests': successful_requests,
         'total_requests': total_requests,
         'total_elapsed': total_elapsed
     })
@@ -113,17 +118,18 @@ def load_generator(stats, num_workers, test_duration):
 
 def crunch(stats):
     total_requests = 0
     total_latency = 0.0
+    successful_requests = 0
     while not stats.empty():
         entry = stats.get_nowait()
         total_requests += entry['total_requests']
         total_latency += entry['total_elapsed']
+        successful_requests += entry['successful_requests']
 
-    return total_requests, total_latency
+    return successful_requests, total_requests, total_latency
 
 
-def run():
-
+def run(upstream_queue):
     num_procs = conf.processes
     num_workers = conf.workers
     test_duration = conf.time
@@ -139,7 +145,8 @@
         for _ in range(num_procs)
     ]
 
-    print('\nStarting Producer...')
+    if conf.verbose:
+        print('\nStarting Producer...')
 
     start = time.time()
 
     for each_proc in procs:
         each_proc.start()
 
     for each_proc in procs:
         each_proc.join()
 
-    total_requests, total_latency = crunch(stats)
+    successful_requests, total_requests, total_latency = crunch(stats)
 
-    # TODO(TheSriram): Add one more stat: "attempted req/sec" so can
-    #                  graph that on the x axis vs. achieved throughput and
-    #                  latency.
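+    # Throughput and latency are computed from successful requests only;
+    # failed requests still count toward total_reqs.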
     duration = time.time() - start
-    throughput = total_requests / duration
-    latency = 1000 * total_latency / total_requests
+    throughput = successful_requests / duration
+    latency = 1000 * total_latency / successful_requests
 
-    print('Duration: {0:.1f} sec'.format(duration))
-    print('Total Requests: {0}'.format(total_requests))
-    print('Throughput: {0:.0f} req/sec'.format(throughput))
-    print('Latency: {0:.1f} ms/req'.format(latency))
-
-    print('')  # Blank line
-
-
-def main():
-    run()
+    upstream_queue.put({'producer': {
+        'duration_sec': duration,
+        'total_reqs': total_requests,
+        'successful_reqs': successful_requests,
+        'reqs_per_sec': throughput,
+        'ms_per_req': latency}})