Merge "Add observer role to benchmark tool"
This commit is contained in:
commit
3e25ea4718
55
README.rst
55
README.rst
@ -112,40 +112,45 @@ is used for all requests.
|
||||
|
||||
Run the benchmarking tool using the following command::
|
||||
|
||||
$ zaqar-bench-pc
|
||||
$ zaqar-bench
|
||||
|
||||
By default, the command will run a performance test for 3 seconds, using one
|
||||
consumer and one producer for each CPU on the system, with 2 greenlet workers
|
||||
per CPU per process. You can override these defaults in the config file or on
|
||||
the command line using a variety of options. For example, the following
|
||||
command runs a performance test for 10 seconds using 4 producer processes with
|
||||
20 workers each, plus 1 consumer process with 4 workers::
|
||||
By default, the command will run a performance test for 5 seconds, using one
|
||||
producer process with 10 greenlet workers, and one observer process with
|
||||
5 workers. The consumer role is disabled by default.
|
||||
|
||||
$ zaqar-bench-pc -pp 4 -pw 20 -cp 1 -cw 4 -t 10
|
||||
You can override these defaults in the config file or on the command line
|
||||
using a variety of options. For example, the following command runs a
|
||||
performance test for 30 seconds using 4 producer processes with
|
||||
20 workers each, plus 4 consumer processes with 20 workers each. Note that
|
||||
the observer role is also disabled in this example by setting its number of
|
||||
workers to zero::
|
||||
|
||||
By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag.
|
||||
Verbose output looks similar to the following::
|
||||
$ zaqar-bench -pp 4 -pw 20 -cp 4 -cw 20 -ow 0 -t 30
|
||||
|
||||
Starting Producer...
|
||||
By default, the results are in JSON. For more human-readable output add
|
||||
the ``--verbose`` flag. Verbose output looks similar to the following::
|
||||
|
||||
Starting Consumer...
|
||||
$ zaqar-bench --verbose
|
||||
|
||||
Consumer
|
||||
========
|
||||
duration_sec: 10.2
|
||||
ms_per_claim: 37.6
|
||||
ms_per_delete: 11.8
|
||||
reqs_per_sec: 82.0
|
||||
successful_reqs: 833.0
|
||||
total_reqs: 833.0
|
||||
Starting producer (pp=1, pw=10)...
|
||||
|
||||
Starting observer (op=1, ow=5)...
|
||||
|
||||
Producer
|
||||
========
|
||||
duration_sec: 10.2
|
||||
ms_per_req: 3.8
|
||||
reqs_per_sec: 1033.6
|
||||
successful_reqs: 10523.0
|
||||
total_reqs: 10523.0
|
||||
duration_sec: 5.1
|
||||
ms_per_req: 2.9
|
||||
reqs_per_sec: 344.5
|
||||
successful_reqs: 1742.0
|
||||
total_reqs: 1742.0
|
||||
|
||||
Observer
|
||||
========
|
||||
duration_sec: 5.0
|
||||
ms_per_req: 2.9
|
||||
reqs_per_sec: 339.3
|
||||
successful_reqs: 1706.0
|
||||
total_reqs: 1706.0
|
||||
|
||||
|
||||
.. _`OpenStack` : http://openstack.org/
|
||||
|
@ -1,5 +1,4 @@
|
||||
argparse>=1.2.1
|
||||
gevent>=1.0.1
|
||||
marktime>=0.2.0
|
||||
psutil>=2.1.1
|
||||
python-zaqarclient>=0.0.2
|
||||
|
@ -32,7 +32,7 @@ source-dir = doc/source
|
||||
|
||||
[entry_points]
|
||||
console_scripts =
|
||||
zaqar-bench-pc = zaqar.bench.conductor:main
|
||||
zaqar-bench = zaqar.bench.conductor:main
|
||||
zaqar-server = zaqar.cmd.server:run
|
||||
marconi-server = zaqar.cmd.server:run
|
||||
|
||||
|
@ -17,17 +17,52 @@ from __future__ import print_function
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
|
||||
from zaqar.bench.config import conf
|
||||
from zaqarclient.queues.v1 import client
|
||||
|
||||
from zaqar.bench import config
|
||||
from zaqar.bench import consumer
|
||||
from zaqar.bench import observer
|
||||
from zaqar.bench import producer
|
||||
|
||||
CONF = config.conf
|
||||
|
||||
|
||||
def _print_verbose_stats(name, stats):
|
||||
print(name.capitalize())
|
||||
print('=' * len(name))
|
||||
|
||||
values = sorted(stats.items(), key=lambda v: v[0])
|
||||
formatted_vals = ['{}: {:.1f}'.format(*v) for v in values]
|
||||
|
||||
print('\n'.join(formatted_vals))
|
||||
print() # Blank line
|
||||
|
||||
|
||||
def _reset_queues():
|
||||
cli = client.Client(CONF.server_url)
|
||||
|
||||
for i in range(CONF.num_queues):
|
||||
# TODO(kgriffs): DRY up name generation so it is done
|
||||
# in a helper, vs. being copy-pasted everywhere.
|
||||
queue = cli.queue(CONF.queue_prefix + '-' + str(i))
|
||||
queue.delete()
|
||||
|
||||
|
||||
def main():
|
||||
conf(project='zaqar', prog='zaqar-benchmark')
|
||||
CONF(project='zaqar', prog='zaqar-benchmark')
|
||||
|
||||
# NOTE(kgriffs): Reset queues since last time. We don't
|
||||
# clean them up after the performance test, in case
|
||||
# the user wants to examine the state of the system.
|
||||
if not CONF.skip_queue_reset:
|
||||
if CONF.verbose:
|
||||
print('Resetting queues...')
|
||||
|
||||
_reset_queues()
|
||||
|
||||
downstream_queue = mp.Queue()
|
||||
procs = [mp.Process(target=worker.run, args=(downstream_queue,))
|
||||
for worker in [producer, consumer]]
|
||||
for worker in [producer, consumer, observer]]
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.start()
|
||||
@ -39,29 +74,32 @@ def main():
|
||||
for each_proc in procs:
|
||||
stats.update(downstream_queue.get_nowait())
|
||||
|
||||
if conf.verbose:
|
||||
if CONF.verbose:
|
||||
print()
|
||||
|
||||
for name, stats_group in stats.items():
|
||||
print(name.capitalize())
|
||||
print('=' * len(name))
|
||||
for name in ('producer', 'observer', 'consumer'):
|
||||
stats_group = stats[name]
|
||||
|
||||
values = sorted(stats_group.items(), key=lambda v: v[0])
|
||||
formatted_vals = ["{}: {:.1f}".format(*v) for v in values]
|
||||
# Skip disabled workers
|
||||
if not stats_group['duration_sec']:
|
||||
continue
|
||||
|
||||
print("\n".join(formatted_vals))
|
||||
print('') # Blank line
|
||||
_print_verbose_stats(name, stats_group)
|
||||
|
||||
else:
|
||||
stats['params'] = {
|
||||
'producer': {
|
||||
'processes': conf.producer_processes,
|
||||
'workers': conf.producer_workers
|
||||
'processes': CONF.producer_processes,
|
||||
'workers': CONF.producer_workers
|
||||
},
|
||||
'consumer': {
|
||||
'processes': conf.consumer_processes,
|
||||
'workers': conf.consumer_workers
|
||||
}
|
||||
'processes': CONF.consumer_processes,
|
||||
'workers': CONF.consumer_workers
|
||||
},
|
||||
'observer': {
|
||||
'processes': CONF.observer_processes,
|
||||
'workers': CONF.observer_workers
|
||||
},
|
||||
}
|
||||
|
||||
print(json.dumps(stats))
|
||||
|
@ -13,38 +13,61 @@
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
import psutil
|
||||
|
||||
conf = cfg.CONF
|
||||
_CLI_OPTIONS = (
|
||||
cfg.IntOpt(
|
||||
'producer_processes',
|
||||
short='pp',
|
||||
default=psutil.NUM_CPUS,
|
||||
default=1,
|
||||
help='Number of Producer Processes'),
|
||||
cfg.IntOpt(
|
||||
'producer_workers',
|
||||
short='pw',
|
||||
default=psutil.NUM_CPUS * 2,
|
||||
default=10,
|
||||
help='Number of Producer Workers'),
|
||||
|
||||
cfg.IntOpt(
|
||||
'consumer_processes',
|
||||
short='cp',
|
||||
default=psutil.NUM_CPUS,
|
||||
default=1,
|
||||
help='Number of Consumer Processes'),
|
||||
cfg.IntOpt(
|
||||
'consumer_workers',
|
||||
short='cw',
|
||||
default=psutil.NUM_CPUS * 2,
|
||||
default=0,
|
||||
help='Number of Consumer Workers'),
|
||||
|
||||
cfg.IntOpt(
|
||||
'observer_processes',
|
||||
short='op',
|
||||
default=1,
|
||||
help='Number of Observer Processes'),
|
||||
cfg.IntOpt(
|
||||
'observer_workers',
|
||||
short='ow',
|
||||
default=5,
|
||||
help='Number of Observer Workers'),
|
||||
|
||||
cfg.IntOpt('messages_per_claim', short='cno', default=5,
|
||||
help=('Number of messages the consumer will attempt to '
|
||||
'claim at a time')),
|
||||
cfg.IntOpt('time', short='t', default=3,
|
||||
cfg.IntOpt('messages_per_list', short='lno', default=5,
|
||||
help=('Number of messages the observer will attempt to '
|
||||
'list at a time')),
|
||||
|
||||
cfg.IntOpt('time', short='t', default=5,
|
||||
help="Duration of the performance test, in seconds"),
|
||||
|
||||
cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
|
||||
|
||||
cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'),
|
||||
cfg.IntOpt('num_queues', short='qno', default=4),
|
||||
cfg.StrOpt('messages_path', short='m')
|
||||
|
||||
cfg.StrOpt('messages_path', short='m'),
|
||||
|
||||
cfg.BoolOpt('skip_queue_reset', default=False,
|
||||
help=('Do not reset queues before running '
|
||||
'the performance test')),
|
||||
)
|
||||
conf.register_cli_opts(_CLI_OPTIONS)
|
||||
|
@ -27,7 +27,9 @@ import marktime
|
||||
from zaqarclient.queues.v1 import client
|
||||
from zaqarclient.transport.errors import TransportError
|
||||
|
||||
from zaqar.bench.config import conf
|
||||
from zaqar.bench import config
|
||||
|
||||
CONF = config.conf
|
||||
|
||||
|
||||
def claim_delete(queues, stats, test_duration, ttl, grace, limit):
|
||||
@ -93,8 +95,8 @@ def claim_delete(queues, stats, test_duration, ttl, grace, limit):
|
||||
def load_generator(stats, num_workers, num_queues,
|
||||
test_duration, url, ttl, grace, limit):
|
||||
|
||||
cli = client.Client(conf.server_url)
|
||||
queues = [cli.queue(conf.queue_prefix + '-' + str(i))
|
||||
cli = client.Client(CONF.server_url)
|
||||
queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
|
||||
for i in range(num_queues)]
|
||||
|
||||
gevent.joinall([
|
||||
@ -125,9 +127,9 @@ def crunch(stats):
|
||||
|
||||
|
||||
def run(upstream_queue):
|
||||
num_procs = conf.consumer_processes
|
||||
num_workers = conf.consumer_workers
|
||||
num_queues = conf.num_queues
|
||||
num_procs = CONF.consumer_processes
|
||||
num_workers = CONF.consumer_workers
|
||||
num_queues = CONF.num_queues
|
||||
|
||||
# Stats that will be reported
|
||||
duration = 0
|
||||
@ -141,17 +143,18 @@ def run(upstream_queue):
|
||||
|
||||
# Performance test
|
||||
if num_procs and num_workers:
|
||||
test_duration = conf.time
|
||||
test_duration = CONF.time
|
||||
stats = mp.Queue()
|
||||
# TODO(TheSriram) : Make ttl and grace configurable
|
||||
args = (stats, num_workers, num_queues, test_duration,
|
||||
conf.server_url, 300, 200, conf.messages_per_claim)
|
||||
CONF.server_url, 300, 200, CONF.messages_per_claim)
|
||||
|
||||
procs = [mp.Process(target=load_generator, args=args)
|
||||
for _ in range(num_procs)]
|
||||
|
||||
if conf.verbose:
|
||||
print("\nStarting Consumer...")
|
||||
if CONF.verbose:
|
||||
print('\nStarting consumers (cp={0}, cw={1})...'.format(
|
||||
num_procs, num_workers))
|
||||
|
||||
start = time.time()
|
||||
|
||||
|
178
zaqar/bench/observer.py
Normal file
178
zaqar/bench/observer.py
Normal file
@ -0,0 +1,178 @@
|
||||
# Copyright (c) 2014 Rackspace, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import multiprocessing as mp
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
|
||||
from gevent import monkey as curious_george
|
||||
curious_george.patch_all(thread=False, select=False)
|
||||
import gevent
|
||||
import marktime
|
||||
from six.moves import urllib
|
||||
from zaqarclient.queues.v1 import client
|
||||
from zaqarclient.transport.errors import TransportError
|
||||
|
||||
from zaqar.bench import config
|
||||
|
||||
CONF = config.conf
|
||||
|
||||
|
||||
#
|
||||
# TODO(kgriffs): Factor out the common code from producer, consumer
|
||||
# and observer (DRY all the things!)
|
||||
#
|
||||
|
||||
|
||||
def _extract_marker(links):
|
||||
for link in links:
|
||||
if link['rel'] == 'next':
|
||||
href = link['href']
|
||||
break
|
||||
|
||||
query = urllib.parse.urlparse(href).query
|
||||
params = urllib.parse.parse_qs(query)
|
||||
return params['marker'][0]
|
||||
|
||||
|
||||
def observer(queues, stats, test_duration, limit):
|
||||
"""Observer Worker
|
||||
|
||||
The observer lists messages without claiming them.
|
||||
"""
|
||||
|
||||
end = time.time() + test_duration
|
||||
|
||||
total_elapsed = 0
|
||||
total_succeeded = 0
|
||||
total_failed = 0
|
||||
|
||||
queues = [{'q': q, 'm': None} for q in queues]
|
||||
|
||||
while time.time() < end:
|
||||
# NOTE(kgriffs): Distribute requests across all queues evenly.
|
||||
queue = random.choice(queues)
|
||||
|
||||
try:
|
||||
marktime.start('list_messages')
|
||||
cursor = queue['q'].messages(limit=limit, marker=queue['m'])
|
||||
total_elapsed += marktime.stop('list_messages').seconds
|
||||
total_succeeded += 1
|
||||
|
||||
messages = list(cursor)
|
||||
|
||||
if messages:
|
||||
# TODO(kgriffs): Figure out a less hacky way to do this
|
||||
# while preserving the ability to measure elapsed time
|
||||
# per request.
|
||||
queue['m'] = _extract_marker(cursor._links)
|
||||
|
||||
except TransportError as ex:
|
||||
sys.stderr.write("Could not list messages : {0}\n".format(ex))
|
||||
total_failed += 1
|
||||
|
||||
total_requests = total_succeeded + total_failed
|
||||
|
||||
stats.put({
|
||||
'total_requests': total_requests,
|
||||
'total_succeeded': total_succeeded,
|
||||
'total_elapsed': total_elapsed,
|
||||
})
|
||||
|
||||
|
||||
def load_generator(stats, num_workers, num_queues,
|
||||
test_duration, limit):
|
||||
|
||||
cli = client.Client(CONF.server_url)
|
||||
queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
|
||||
for i in range(num_queues)]
|
||||
|
||||
gevent.joinall([
|
||||
gevent.spawn(observer,
|
||||
queues, stats, test_duration, limit)
|
||||
|
||||
for _ in range(num_workers)
|
||||
])
|
||||
|
||||
|
||||
def crunch(stats):
|
||||
total_requests = 0
|
||||
total_succeeded = 0
|
||||
total_elapsed = 0.0
|
||||
|
||||
while not stats.empty():
|
||||
entry = stats.get_nowait()
|
||||
total_requests += entry['total_requests']
|
||||
total_succeeded += entry['total_succeeded']
|
||||
total_elapsed += entry['total_elapsed']
|
||||
|
||||
return (total_requests, total_succeeded, total_elapsed)
|
||||
|
||||
|
||||
def run(upstream_queue):
|
||||
num_procs = CONF.observer_processes
|
||||
num_workers = CONF.observer_workers
|
||||
num_queues = CONF.num_queues
|
||||
|
||||
# Stats that will be reported
|
||||
duration = 0
|
||||
total_requests = 0
|
||||
total_succeeded = 0
|
||||
throughput = 0
|
||||
latency = 0
|
||||
|
||||
# Performance test
|
||||
if num_procs and num_workers:
|
||||
test_duration = CONF.time
|
||||
stats = mp.Queue()
|
||||
args = (stats, num_workers, num_queues, test_duration,
|
||||
CONF.messages_per_list)
|
||||
|
||||
procs = [mp.Process(target=load_generator, args=args)
|
||||
for _ in range(num_procs)]
|
||||
|
||||
if CONF.verbose:
|
||||
print('\nStarting observer (op={0}, ow={1})...'.format(
|
||||
num_procs, num_workers))
|
||||
|
||||
start = time.time()
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.start()
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.join()
|
||||
|
||||
(total_requests, total_succeeded, total_elapsed) = crunch(stats)
|
||||
|
||||
duration = time.time() - start
|
||||
|
||||
throughput = total_succeeded / duration
|
||||
|
||||
if total_succeeded:
|
||||
latency = (1000 * total_elapsed / total_succeeded)
|
||||
|
||||
upstream_queue.put({
|
||||
'observer': {
|
||||
'duration_sec': duration,
|
||||
'total_reqs': total_requests,
|
||||
'successful_reqs': total_succeeded,
|
||||
'reqs_per_sec': throughput,
|
||||
'ms_per_req': latency,
|
||||
}
|
||||
})
|
@ -28,7 +28,9 @@ import marktime
|
||||
from zaqarclient.queues.v1 import client
|
||||
from zaqarclient.transport.errors import TransportError
|
||||
|
||||
from zaqar.bench.config import conf
|
||||
from zaqar.bench import config
|
||||
|
||||
CONF = config.conf
|
||||
|
||||
|
||||
def choose_message(message_pool):
|
||||
@ -48,7 +50,7 @@ def choose_message(message_pool):
|
||||
|
||||
def load_messages():
|
||||
default_file_name = 'zaqar-benchmark-messages.json'
|
||||
messages_path = conf.messages_path or conf.find_file(default_file_name)
|
||||
messages_path = CONF.messages_path or CONF.find_file(default_file_name)
|
||||
if messages_path:
|
||||
with open(messages_path) as f:
|
||||
message_pool = json.load(f)
|
||||
@ -102,8 +104,8 @@ def producer(queues, message_pool, stats, test_duration):
|
||||
# weight them, so can have some busy queues, some not.)
|
||||
def load_generator(stats, num_workers, num_queues, test_duration):
|
||||
|
||||
cli = client.Client(conf.server_url)
|
||||
queues = [cli.queue(conf.queue_prefix + '-' + str(i))
|
||||
cli = client.Client(CONF.server_url)
|
||||
queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
|
||||
for i in range(num_queues)]
|
||||
|
||||
message_pool = load_messages()
|
||||
@ -131,9 +133,9 @@ def crunch(stats):
|
||||
|
||||
|
||||
def run(upstream_queue):
|
||||
num_procs = conf.producer_processes
|
||||
num_workers = conf.producer_workers
|
||||
num_queues = conf.num_queues
|
||||
num_procs = CONF.producer_processes
|
||||
num_workers = CONF.producer_workers
|
||||
num_queues = CONF.num_queues
|
||||
|
||||
duration = 0
|
||||
total_requests = 0
|
||||
@ -142,7 +144,7 @@ def run(upstream_queue):
|
||||
latency = 0
|
||||
|
||||
if num_procs and num_workers:
|
||||
test_duration = conf.time
|
||||
test_duration = CONF.time
|
||||
stats = mp.Queue()
|
||||
args = (stats, num_workers, num_queues, test_duration)
|
||||
|
||||
@ -155,8 +157,9 @@ def run(upstream_queue):
|
||||
for _ in range(num_procs)
|
||||
]
|
||||
|
||||
if conf.verbose:
|
||||
print('\nStarting Producer...')
|
||||
if CONF.verbose:
|
||||
print('\nStarting producer (pp={0}, pw={1})...'.format(
|
||||
num_procs, num_workers))
|
||||
|
||||
start = time.time()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user