From 07ddeee517bcea5fae713958161e63a2e303dee6 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Fri, 22 Aug 2014 16:02:24 -0500 Subject: [PATCH] Add observer role to benchmark tool This patch introduces a new observer role, which lists messages but does not claim them. As part of this work the config options were updated and the defaults adjusted to provide a better "kick the tires" experience. The default number of procs and workers is now hard-coded rather than being based on number of available CPUs, since the number of workers you may want to run is more dependent on the size of your Zaqar deployment and your network bandwidth than it is on the number of CPUs on the load generator. Finally, the "-pc" suffix was removed from the command name. This was included in this patch because it didn't seem significant enough to split out. Change-Id: I8a8190fb2cebc3489c78da4f6e1e7c51d8b97017 --- README.rst | 55 ++++++------ bench-requirements.txt | 1 - setup.cfg | 2 +- zaqar/bench/conductor.py | 70 +++++++++++---- zaqar/bench/config.py | 37 ++++++-- zaqar/bench/consumer.py | 23 ++--- zaqar/bench/observer.py | 178 +++++++++++++++++++++++++++++++++++++++ zaqar/bench/producer.py | 23 ++--- 8 files changed, 319 insertions(+), 70 deletions(-) create mode 100644 zaqar/bench/observer.py diff --git a/README.rst b/README.rst index 5343de67b..16b0c2575 100644 --- a/README.rst +++ b/README.rst @@ -112,40 +112,45 @@ is used for all requests. Run the benchmarking tool using the following command:: - $ zaqar-bench-pc + $ zaqar-bench -By default, the command will run a performance test for 3 seconds, using one -consumer and one producer for each CPU on the system, with 2 greenlet workers -per CPU per process. You can override these defaults in the config file or on -the command line using a variety of options. For example, the following -command runs a performance test for 10 seconds using 4 producer processes with -20 workers each, plus 1 consumer process with 4 workers:: +By default, the command will run a performance test for 5 seconds, using one +producer process with 10 greenlet workers, and one observer process with +5 workers. The consumer role is disabled by default. - $ zaqar-bench-pc -pp 4 -pw 20 -cp 1 -cw 4 -t 10 +You can override these defaults in the config file or on the command line +using a variety of options. For example, the following command runs a +performance test for 30 seconds using 4 producer processes with +20 workers each, plus 4 consumer processes with 20 workers each. Note that +the observer role is also disabled in this example by setting its number of +workers to zero:: -By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag. -Verbose output looks similar to the following:: + $ zaqar-bench -pp 4 -pw 10 -cw 4 -cw 20 -ow 0 -t 30 - Starting Producer... +By default, the results are in JSON. For more human-readable output add +the ``--verbose`` flag. Verbose output looks similar to the following:: - Starting Consumer... + $ zaqar-bench --verbose - Consumer - ======== - duration_sec: 10.2 - ms_per_claim: 37.6 - ms_per_delete: 11.8 - reqs_per_sec: 82.0 - successful_reqs: 833.0 - total_reqs: 833.0 + Starting producer (pp=1 , pw=10)... + + Starting observer (op=1 , ow=5)... Producer ======== - duration_sec: 10.2 - ms_per_req: 3.8 - reqs_per_sec: 1033.6 - successful_reqs: 10523.0 - total_reqs: 10523.0 + duration_sec: 5.1 + ms_per_req: 2.9 + reqs_per_sec: 344.5 + successful_reqs: 1742.0 + total_reqs: 1742.0 + + Observer + ======== + duration_sec: 5.0 + ms_per_req: 2.9 + reqs_per_sec: 339.3 + successful_reqs: 1706.0 + total_reqs: 1706.0 .. _`OpenStack` : http://openstack.org/ diff --git a/bench-requirements.txt b/bench-requirements.txt index 939afe982..7b0d1dd25 100644 --- a/bench-requirements.txt +++ b/bench-requirements.txt @@ -1,5 +1,4 @@ argparse>=1.2.1 gevent>=1.0.1 marktime>=0.2.0 -psutil>=2.1.1 python-zaqarclient>=0.0.2 diff --git a/setup.cfg b/setup.cfg index ac53d0c05..7e507b675 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,7 +32,7 @@ source-dir = doc/source [entry_points] console_scripts = - zaqar-bench-pc = zaqar.bench.conductor:main + zaqar-bench = zaqar.bench.conductor:main zaqar-server = zaqar.cmd.server:run marconi-server = zaqar.cmd.server:run diff --git a/zaqar/bench/conductor.py b/zaqar/bench/conductor.py index 24ac8fea5..ba92fcd6a 100644 --- a/zaqar/bench/conductor.py +++ b/zaqar/bench/conductor.py @@ -17,17 +17,52 @@ from __future__ import print_function import json import multiprocessing as mp -from zaqar.bench.config import conf +from zaqarclient.queues.v1 import client + +from zaqar.bench import config from zaqar.bench import consumer +from zaqar.bench import observer from zaqar.bench import producer +CONF = config.conf + + +def _print_verbose_stats(name, stats): + print(name.capitalize()) + print('=' * len(name)) + + values = sorted(stats.items(), key=lambda v: v[0]) + formatted_vals = ['{}: {:.1f}'.format(*v) for v in values] + + print('\n'.join(formatted_vals)) + print() # Blank line + + +def _reset_queues(): + cli = client.Client(CONF.server_url) + + for i in range(CONF.num_queues): + # TODO(kgriffs): DRY up name generation so it is done + # in a helper, vs. being copy-pasted everywhere. + queue = cli.queue(CONF.queue_prefix + '-' + str(i)) + queue.delete() + def main(): - conf(project='zaqar', prog='zaqar-benchmark') + CONF(project='zaqar', prog='zaqar-benchmark') + + # NOTE(kgriffs): Reset queues since last time. We don't + # clean them up after the performance test, in case + # the user wants to examine the state of the system. + if not CONF.skip_queue_reset: + if CONF.verbose: + print('Resetting queues...') + + _reset_queues() downstream_queue = mp.Queue() procs = [mp.Process(target=worker.run, args=(downstream_queue,)) - for worker in [producer, consumer]] + for worker in [producer, consumer, observer]] for each_proc in procs: each_proc.start() @@ -39,29 +74,32 @@ def main(): for each_proc in procs: stats.update(downstream_queue.get_nowait()) - if conf.verbose: + if CONF.verbose: print() - for name, stats_group in stats.items(): - print(name.capitalize()) - print('=' * len(name)) + for name in ('producer', 'observer', 'consumer'): + stats_group = stats[name] - values = sorted(stats_group.items(), key=lambda v: v[0]) - formatted_vals = ["{}: {:.1f}".format(*v) for v in values] + # Skip disabled workers + if not stats_group['duration_sec']: + continue - print("\n".join(formatted_vals)) - print('') # Blank line + _print_verbose_stats(name, stats_group) else: stats['params'] = { 'producer': { - 'processes': conf.producer_processes, - 'workers': conf.producer_workers + 'processes': CONF.producer_processes, + 'workers': CONF.producer_workers }, 'consumer': { - 'processes': conf.consumer_processes, - 'workers': conf.consumer_workers - } + 'processes': CONF.consumer_processes, + 'workers': CONF.consumer_workers + }, + 'observer': { + 'processes': CONF.observer_processes, + 'workers': CONF.observer_workers + }, } print(json.dumps(stats)) diff --git a/zaqar/bench/config.py b/zaqar/bench/config.py index 03dcc108b..e2f7fc76a 100644 --- a/zaqar/bench/config.py +++ b/zaqar/bench/config.py @@ -13,38 +13,61 @@ # limitations under the License. from oslo.config import cfg -import psutil conf = cfg.CONF _CLI_OPTIONS = ( cfg.IntOpt( 'producer_processes', short='pp', - default=psutil.NUM_CPUS, + default=1, help='Number of Producer Processes'), cfg.IntOpt( 'producer_workers', short='pw', - default=psutil.NUM_CPUS * 2, + default=10, help='Number of Producer Workers'), + cfg.IntOpt( 'consumer_processes', short='cp', - default=psutil.NUM_CPUS, + default=1, help='Number of Consumer Processes'), cfg.IntOpt( 'consumer_workers', short='cw', - default=psutil.NUM_CPUS * 2, + default=0, help='Number of Consumer Workers'), + + cfg.IntOpt( + 'observer_processes', + short='op', + default=1, + help='Number of Observer Processes'), + cfg.IntOpt( + 'observer_workers', + short='ow', + default=5, + help='Number of Observer Workers'), + cfg.IntOpt('messages_per_claim', short='cno', default=5, help=('Number of messages the consumer will attempt to ' 'claim at a time')), - cfg.IntOpt('time', short='t', default=3, + cfg.IntOpt('messages_per_list', short='lno', default=5, + help=('Number of messages the obserer will attempt to ' + 'list at a time')), + + cfg.IntOpt('time', short='t', default=5, help="Duration of the performance test, in seconds"), + cfg.StrOpt('server_url', short='s', default='http://localhost:8888'), + cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'), cfg.IntOpt('num_queues', short='qno', default=4), - cfg.StrOpt('messages_path', short='m') + + cfg.StrOpt('messages_path', short='m'), + + cfg.BoolOpt('skip_queue_reset', default=False, + help=('Do not reset queues before running' + 'the performance test')), ) conf.register_cli_opts(_CLI_OPTIONS) diff --git a/zaqar/bench/consumer.py b/zaqar/bench/consumer.py index 44da977a2..68d3e2658 100644 --- a/zaqar/bench/consumer.py +++ b/zaqar/bench/consumer.py @@ -27,7 +27,9 @@ import marktime from zaqarclient.queues.v1 import client from zaqarclient.transport.errors import TransportError -from zaqar.bench.config import conf +from zaqar.bench import config + +CONF = config.conf def claim_delete(queues, stats, test_duration, ttl, grace, limit): @@ -93,8 +95,8 @@ def claim_delete(queues, stats, test_duration, ttl, grace, limit): def load_generator(stats, num_workers, num_queues, test_duration, url, ttl, grace, limit): - cli = client.Client(conf.server_url) - queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + cli = client.Client(CONF.server_url) + queues = [cli.queue(CONF.queue_prefix + '-' + str(i)) for i in range(num_queues)] gevent.joinall([ @@ -125,9 +127,9 @@ def crunch(stats): def run(upstream_queue): - num_procs = conf.consumer_processes - num_workers = conf.consumer_workers - num_queues = conf.num_queues + num_procs = CONF.consumer_processes + num_workers = CONF.consumer_workers + num_queues = CONF.num_queues # Stats that will be reported duration = 0 @@ -141,17 +143,18 @@ def run(upstream_queue): # Performance test if num_procs and num_workers: - test_duration = conf.time + test_duration = CONF.time stats = mp.Queue() # TODO(TheSriram) : Make ttl and grace configurable args = (stats, num_workers, num_queues, test_duration, - conf.server_url, 300, 200, conf.messages_per_claim) + CONF.server_url, 300, 200, CONF.messages_per_claim) procs = [mp.Process(target=load_generator, args=args) for _ in range(num_procs)] - if conf.verbose: - print("\nStarting Consumer...") + if CONF.verbose: + print('\nStarting consumers (cp={0}, cw={1})...'.format( + num_procs, num_workers)) start = time.time() diff --git a/zaqar/bench/observer.py b/zaqar/bench/observer.py new file mode 100644 index 000000000..d49a12a41 --- /dev/null +++ b/zaqar/bench/observer.py @@ -0,0 +1,178 @@ +# Copyright (c) 2014 Rackspace, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division +from __future__ import print_function + +import multiprocessing as mp +import random +import sys +import time + +from gevent import monkey as curious_george +curious_george.patch_all(thread=False, select=False) +import gevent +import marktime +from six.moves import urllib +from zaqarclient.queues.v1 import client +from zaqarclient.transport.errors import TransportError + +from zaqar.bench import config + +CONF = config.conf + + +# +# TODO(kgriffs): Factor out the common code from producer, consumer +# and worker (DRY all the things!) +# + + +def _extract_marker(links): + for link in links: + if link['rel'] == 'next': + href = link['href'] + break + + query = urllib.parse.urlparse(href).query + params = urllib.parse.parse_qs(query) + return params['marker'][0] + + +def observer(queues, stats, test_duration, limit): + """Observer Worker + + The observer lists messages without claiming them. + """ + + end = time.time() + test_duration + + total_elapsed = 0 + total_succeeded = 0 + total_failed = 0 + + queues = [{'q': q, 'm': None} for q in queues] + + while time.time() < end: + # NOTE(kgriffs): Distribute requests across all queues evenly. + queue = random.choice(queues) + + try: + marktime.start('list_messages') + cursor = queue['q'].messages(limit=limit, marker=queue['m']) + total_elapsed += marktime.stop('list_messages').seconds + total_succeeded += 1 + + messages = list(cursor) + + if messages: + # TODO(kgriffs): Figure out a less hacky way to do this + # while preserving the ability to measure elapsed time + # per request. + queue['m'] = _extract_marker(cursor._links) + + except TransportError as ex: + sys.stderr.write("Could not list messages : {0}\n".format(ex)) + total_failed += 1 + + total_requests = total_succeeded + total_failed + + stats.put({ + 'total_requests': total_requests, + 'total_succeeded': total_succeeded, + 'total_elapsed': total_elapsed, + }) + + +def load_generator(stats, num_workers, num_queues, + test_duration, limit): + + cli = client.Client(CONF.server_url) + queues = [cli.queue(CONF.queue_prefix + '-' + str(i)) + for i in range(num_queues)] + + gevent.joinall([ + gevent.spawn(observer, + queues, stats, test_duration, limit) + + for _ in range(num_workers) + ]) + + +def crunch(stats): + total_requests = 0 + total_succeeded = 0 + total_elapsed = 0.0 + + while not stats.empty(): + entry = stats.get_nowait() + total_requests += entry['total_requests'] + total_succeeded += entry['total_succeeded'] + total_elapsed += entry['total_elapsed'] + + return (total_requests, total_succeeded, total_elapsed) + + +def run(upstream_queue): + num_procs = CONF.observer_processes + num_workers = CONF.observer_workers + num_queues = CONF.num_queues + + # Stats that will be reported + duration = 0 + total_requests = 0 + total_succeeded = 0 + throughput = 0 + latency = 0 + + # Performance test + if num_procs and num_workers: + test_duration = CONF.time + stats = mp.Queue() + args = (stats, num_workers, num_queues, test_duration, + CONF.messages_per_list) + + procs = [mp.Process(target=load_generator, args=args) + for _ in range(num_procs)] + + if CONF.verbose: + print('\nStarting observer (op={0}, ow={1})...'.format( + num_procs, num_workers)) + + start = time.time() + + for each_proc in procs: + each_proc.start() + + for each_proc in procs: + each_proc.join() + + (total_requests, total_succeeded, total_elapsed) = crunch(stats) + + duration = time.time() - start + + throughput = total_succeeded / duration + + if total_succeeded: + latency = (1000 * total_elapsed / total_succeeded) + + upstream_queue.put({ + 'observer': { + 'duration_sec': duration, + 'total_reqs': total_requests, + 'successful_reqs': total_succeeded, + 'reqs_per_sec': throughput, + 'ms_per_req': latency, + } + }) diff --git a/zaqar/bench/producer.py b/zaqar/bench/producer.py index 38ec7e015..961a1ca8b 100644 --- a/zaqar/bench/producer.py +++ b/zaqar/bench/producer.py @@ -28,7 +28,9 @@ import marktime from zaqarclient.queues.v1 import client from zaqarclient.transport.errors import TransportError -from zaqar.bench.config import conf +from zaqar.bench import config + +CONF = config.conf def choose_message(message_pool): @@ -48,7 +50,7 @@ def choose_message(message_pool): def load_messages(): default_file_name = 'zaqar-benchmark-messages.json' - messages_path = conf.messages_path or conf.find_file(default_file_name) + messages_path = CONF.messages_path or CONF.find_file(default_file_name) if messages_path: with open(messages_path) as f: message_pool = json.load(f) @@ -102,8 +104,8 @@ def producer(queues, message_pool, stats, test_duration): # weight them, so can have some busy queues, some not.) def load_generator(stats, num_workers, num_queues, test_duration): - cli = client.Client(conf.server_url) - queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + cli = client.Client(CONF.server_url) + queues = [cli.queue(CONF.queue_prefix + '-' + str(i)) for i in range(num_queues)] message_pool = load_messages() @@ -131,9 +133,9 @@ def crunch(stats): def run(upstream_queue): - num_procs = conf.producer_processes - num_workers = conf.producer_workers - num_queues = conf.num_queues + num_procs = CONF.producer_processes + num_workers = CONF.producer_workers + num_queues = CONF.num_queues duration = 0 total_requests = 0 @@ -142,7 +144,7 @@ def run(upstream_queue): latency = 0 if num_procs and num_workers: - test_duration = conf.time + test_duration = CONF.time stats = mp.Queue() args = (stats, num_workers, num_queues, test_duration) @@ -155,8 +157,9 @@ def run(upstream_queue): for _ in range(num_procs) ] - if conf.verbose: - print('\nStarting Producer...') + if CONF.verbose: + print('\nStarting producer (pp={0}, pw={1})...'.format( + num_procs, num_workers)) start = time.time()