From 4c950f99e44c209f920f759084d09d74ea899fc2 Mon Sep 17 00:00:00 2001 From: Sriram Madapusi Vasudevan Date: Mon, 9 Jun 2014 16:11:49 -0400 Subject: [PATCH] feat(benchmarking) : Producer-Consumer scenario This patch adds the ability to benchmark Marconi. The benchmark tool is a console script, and can be triggered using $ marconi-bench-pc The Benchmark tool fires up both a Producer Process and a Consumer Process, while accepting CLI parameters for the number of processes, number of workers and duration of test. The Producer Process publishes messages to a given queue, while the Consumer consumes the messages by claiming and deleting them. Setup: Benchmark dependencies need to be pip installed: pip install -r bench-requirements.txt Export an environment variable called MESSAGES_PATH and set it to the path of messages.json in marconi/bench Note: This allows benchmarking with different set of messages rather than those specified in messages.json Usage: $ marconi-bench-pc -p {No. Processes} -w {No. Workers} -t {No. Seconds} Example: $ marconi-bench-pc -p 2 -w 2 -t 4 Partially Implements: blueprint basic-benchmarking Change-Id: I57ebe853554199490adba8b2a091423f399b0565 --- bench-requirements.txt | 5 ++ marconi/bench/README.rst | 36 ++++++++ marconi/bench/__init__.py | 0 marconi/bench/cli_config.py | 33 +++++++ marconi/bench/conductor.py | 29 +++++++ marconi/bench/consumer.py | 142 ++++++++++++++++++++++++++++++ marconi/bench/messages.json | 72 +++++++++++++++ marconi/bench/producer.py | 169 ++++++++++++++++++++++++++++++++++++ setup.cfg | 1 + 9 files changed, 487 insertions(+) create mode 100644 bench-requirements.txt create mode 100644 marconi/bench/README.rst create mode 100644 marconi/bench/__init__.py create mode 100644 marconi/bench/cli_config.py create mode 100644 marconi/bench/conductor.py create mode 100644 marconi/bench/consumer.py create mode 100644 marconi/bench/messages.json create mode 100644 marconi/bench/producer.py diff --git a/bench-requirements.txt b/bench-requirements.txt new file mode 100644 index 000000000..14347ef8f --- /dev/null +++ b/bench-requirements.txt @@ -0,0 +1,5 @@ +argparse>=1.2.1 +gevent>=1.0.1 +marktime>=0.2.0 +psutil>=2.1.1 +python-marconiclient>=0.0.2 diff --git a/marconi/bench/README.rst b/marconi/bench/README.rst new file mode 100644 index 000000000..5dc7e06e1 --- /dev/null +++ b/marconi/bench/README.rst @@ -0,0 +1,36 @@ +Marconi Benchmarking +==================== + +Structure +--------- +The Benchmark tool fires up both a Producer Process and a Consumer Process, while +accepting CLI parameters for the number of processes, number of workers and duration of test. + +The Producer Process publishes messages to a given queue, while the Consumer consumes the messages +claiming and deleting them. + +Need of the Benchmark +--------------------- + +Marconi is a performance oriented API. Any changes made need to performance tested, and this tool +helps by a being quick way to test that. + +Setup +----- +Benchmark dependencies need to be pip installed:: + + pip install -r bench-requirements.txt + +Make sure you have a running instance of Marconi after following `README`_ for +setting up Marconi running at port 8888:: + +Export an environment variable called MESSAGES_PATH and set it to the path of messages.json +in marconi/bench + +Note: This allows benchmarking with different set of messages rather than those specified in + messages.json + + $ marconi-bench-pc -p {Number of Processes} -w {Number of Workers} -t {Duration in Seconds} + + +.. _`README` : https://github.com/openstack/marconi/blob/master/README.rst \ No newline at end of file diff --git a/marconi/bench/__init__.py b/marconi/bench/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/bench/cli_config.py b/marconi/bench/cli_config.py new file mode 100644 index 000000000..e6e25c109 --- /dev/null +++ b/marconi/bench/cli_config.py @@ -0,0 +1,33 @@ +# Copyright (c) 2014 Rackspace, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg +import psutil + +conf = cfg.CONF +_CLI_OPTIONS = ( + cfg.IntOpt( + 'processes', + short='p', + default=psutil.NUM_CPUS, + help='Number of Processes'), + cfg.IntOpt( + 'workers', + short='w', + default=psutil.NUM_CPUS * 2, + help='Number of Workers'), + cfg.IntOpt('time', short='t', default=3, help="time in seconds"), +) +conf.register_cli_opts(_CLI_OPTIONS) +conf(project='marconi', prog='marconi-queues') diff --git a/marconi/bench/conductor.py b/marconi/bench/conductor.py new file mode 100644 index 000000000..ce8ebf862 --- /dev/null +++ b/marconi/bench/conductor.py @@ -0,0 +1,29 @@ +# Copyright (c) 2014 Rackspace, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import multiprocessing as mp + +from marconi.bench import consumer +from marconi.bench import producer + + +def main(): + procs = [mp.Process(target=worker.run) + for worker in [producer, consumer]] + for each_proc in procs: + each_proc.start() + for each_proc in procs: + each_proc.join() diff --git a/marconi/bench/consumer.py b/marconi/bench/consumer.py new file mode 100644 index 000000000..9ddfe0666 --- /dev/null +++ b/marconi/bench/consumer.py @@ -0,0 +1,142 @@ +# Copyright (c) 2014 Rackspace, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import multiprocessing as mp +import time + +from gevent import monkey as curious_george +curious_george.patch_all(thread=False, select=False) +import gevent +from marconiclient.queues.v1 import client +from marconiclient.transport.errors import TransportError +import marktime + +from marconi.bench.cli_config import conf + +URL = 'http://localhost:8888' +QUEUE_PREFIX = 'ogre-test-queue-' + + +def claim_delete(stats, test_duration, ttl, grace, limit): + """Consumer Worker + + The Consumer Worker continuously claims and deletes messages + for the specified duration. The time taken for each claim and + delete is recorded for calculating throughput and latency. + """ + + cli = client.Client(URL) + queue = cli.queue(QUEUE_PREFIX + '1') + end = time.time() + test_duration + total_elapsed = 0 + total_requests = 0 + claim_total_requests = 0 + delete_total_requests = 0 + + while time.time() < end: + marktime.start('claim_message') + try: + claim = queue.claim(ttl=ttl, grace=grace, limit=limit) + + except TransportError as ex: + print ("Could not claim messages : {0}".format(ex)) + + else: + total_elapsed += marktime.stop('claim_message').seconds + total_requests += 1 + claim_total_requests += 1 + + try: + marktime.start('delete_message') + + for msg in claim: + # TODO(TheSriram): Simulate actual work before deletion + msg.delete() + + total_elapsed += marktime.stop('delete_message').seconds + delete_total_requests += 1 + total_requests += 1 + stats.put({'total_requests': total_requests, + 'claim_total_requests': claim_total_requests, + 'delete_total_requests': delete_total_requests, + 'total_elapsed': total_elapsed}) + + except TransportError as ex: + print ("Could not claim and delete : {0}".format(ex)) + + +def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit): + gevent.joinall([ + gevent.spawn(claim_delete, stats, test_duration, ttl, + grace, limit) + for _ in range(num_workers) + ]) + + +def crunch(stats): + total_requests = 0 + total_latency = 0.0 + claim_total_requests = 0 + delete_total_requests = 0 + + while not stats.empty(): + entry = stats.get_nowait() + total_requests += entry['total_requests'] + total_latency += entry['total_elapsed'] + claim_total_requests += entry['claim_total_requests'] + delete_total_requests += entry['delete_total_requests'] + + return (total_requests, total_latency, claim_total_requests, + delete_total_requests) + + +def run(): + num_procs = conf.processes + num_workers = conf.workers + test_duration = conf.time + stats = mp.Queue() + # TODO(TheSriram) : Make ttl,grace and limit configurable + args = (stats, num_workers, test_duration, URL, 300, 200, 1) + + procs = [mp.Process(target=load_generator, args=args) + for _ in range(num_procs)] + + print ("\nStarting Consumer...") + + start = time.time() + + for each_proc in procs: + each_proc.start() + for each_proc in procs: + each_proc.join() + + (total_requests, total_latency, claim_total_requests, + delete_total_requests) = crunch(stats) + + duration = time.time() - start + throughput = total_requests / duration + latency = 1000 * total_latency / total_requests + + print('Duration: {0:.1f} sec'.format(duration)) + print('Total Requests: {0}'.format(total_requests)) + print('Throughput: {0:.0f} req/sec'.format(throughput)) + print('Latency: {0:.1f} ms/req'.format(latency)) + + print('') # Blank line + + +def main(): + run() diff --git a/marconi/bench/messages.json b/marconi/bench/messages.json new file mode 100644 index 000000000..97f909289 --- /dev/null +++ b/marconi/bench/messages.json @@ -0,0 +1,72 @@ +[ + { + "weight": 0.8, + "doc": { + "ttl": 60, + "body": { + "id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1", + "evt": "Wakeup" + } + } + }, + { + "weight": 0.1, + "doc": { + "ttl": 3600, + "body": { + "ResultSet": { + "totalResultsAvailable": 1827221, + "totalResultsReturned": 2, + "firstResultPosition": 1, + "Result": [ + { + "Title": "potato jpg", + "Summary": "Kentang Si bungsu dari keluarga Solanum tuberosum L ini ternyata memiliki khasiat untuk mengurangi kerutan jerawat bintik hitam dan kemerahan pada kulit Gunakan seminggu sekali sebagai", + "Url": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg", + "ClickUrl": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg", + "RefererUrl": "http://www.mediaindonesia.com/mediaperempuan/index.php?ar_id=Nzkw", + "FileSize": 22630, + "FileFormat": "jpeg", + "Height": 362, + "Width": 532, + "Thumbnail": { + "Url": "http://thm-a01.yimg.com/nimage/557094559c18f16a", + "Height": 98, + "Width": 145 + } + }, + { + "Title": "potato jpg", + "Summary": "Introduction of puneri aloo This is a traditional potato preparation flavoured with curry leaves and peanuts and can be eaten on fasting day Preparation time 10 min", + "Url": "http://www.infovisual.info/01/photo/potato.jpg", + "ClickUrl": "http://www.infovisual.info/01/photo/potato.jpg", + "RefererUrl": "http://sundayfood.com/puneri-aloo-indian-%20recipe", + "FileSize": 119398, + "FileFormat": "jpeg", + "Height": 685, + "Width": 1024, + "Thumbnail": { + "Url": "http://thm-a01.yimg.com/nimage/7fa23212efe84b64", + "Height": 107, + "Width": 160 + } + } + ] + } + } + } + }, + { + "weight": 0.1, + "doc": { + "ttl": 360, + "body": { + "id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1", + "evt": "StartBackup", + "files": [ + "/foo/bar/stuff/thing.dat" + ] + } + } + } +] diff --git a/marconi/bench/producer.py b/marconi/bench/producer.py new file mode 100644 index 000000000..3491075bf --- /dev/null +++ b/marconi/bench/producer.py @@ -0,0 +1,169 @@ +# Copyright (c) 2014 Rackspace, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import json +import multiprocessing as mp +import os +import random +import sys +import time + +from gevent import monkey as curious_george +curious_george.patch_all(thread=False, select=False) +import gevent +from marconiclient.queues.v1 import client +from marconiclient.transport.errors import TransportError +import marktime + +from marconi.bench.cli_config import conf + + +# TODO(TheSriram): Make configurable +URL = 'http://localhost:8888' +QUEUE_PREFIX = 'ogre-test-queue-' + +# TODO(TheSriram) : Migrate from env variable to config +if os.environ.get('MESSAGES_PATH'): + with open(os.environ.get('MESSAGES_PATH')) as f: + message_pool = json.loads(f.read()) +else: + print("Error : $MESSAGES_PATH needs to be set") + sys.exit(1) + + +message_pool.sort(key=lambda msg: msg['weight']) + + +def choose_message(): + """Choose a message from our pool of possibilities.""" + + # Assume message_pool is sorted by weight, ascending + position = random.random() + accumulator = 0.00 + + for each_message in message_pool: + accumulator += each_message['weight'] + if position < accumulator: + return each_message['doc'] + + assert False + + +def producer(stats, test_duration): + """Producer Worker + + The Producer Worker continuously post messages for + the specified duration. The time taken for each post + is recorded for calculating throughput and latency. + """ + + cli = client.Client(URL) + queue = cli.queue(QUEUE_PREFIX + '1') + + total_requests = 0 + total_elapsed = 0 + end = time.time() + test_duration + + while time.time() < end: + marktime.start('post message') + + # TODO(TheSriram): Track/report errors + try: + queue.post(choose_message()) + + except TransportError as ex: + print("Could not post a message : {0}".format(ex)) + + else: + total_elapsed += marktime.stop('post message').seconds + total_requests += 1 + + stats.put({ + 'total_requests': total_requests, + 'total_elapsed': total_elapsed + }) + + +# TODO(TheSriram): make distributed across multiple machines +# TODO(TheSriram): post across several queues (which workers to which queues? +# weight them, so can have some busy queues, some not.) +def load_generator(stats, num_workers, test_duration): + # TODO(TheSriram): Have some way to get all of the workers to line up and + # start at the same time (is this really useful?) + + gevent.joinall([ + gevent.spawn(producer, stats, test_duration) + for _ in range(num_workers) + ]) + + +def crunch(stats): + total_requests = 0 + total_latency = 0.0 + + while not stats.empty(): + entry = stats.get_nowait() + total_requests += entry['total_requests'] + total_latency += entry['total_elapsed'] + + return total_requests, total_latency + + +def run(): + + num_procs = conf.processes + num_workers = conf.workers + test_duration = conf.time + stats = mp.Queue() + args = (stats, num_workers, test_duration) + + # TODO(TheSriram): Multiple test runs, vary num workers and drain/delete + # queues in between each run. Plot these on a graph, with + # concurrency as the X axis. + + procs = [ + mp.Process(target=load_generator, args=args) + for _ in range(num_procs) + ] + + print('\nStarting Producer...') + start = time.time() + + for each_proc in procs: + each_proc.start() + + for each_proc in procs: + each_proc.join() + + total_requests, total_latency = crunch(stats) + + # TODO(TheSriram): Add one more stat: "attempted req/sec" so can + # graph that on the x axis vs. achieved throughput and + # latency. + duration = time.time() - start + throughput = total_requests / duration + latency = 1000 * total_latency / total_requests + + print('Duration: {0:.1f} sec'.format(duration)) + print('Total Requests: {0}'.format(total_requests)) + print('Throughput: {0:.0f} req/sec'.format(throughput)) + print('Latency: {0:.1f} ms/req'.format(latency)) + + print('') # Blank line + + +def main(): + run() diff --git a/setup.cfg b/setup.cfg index 48787223c..7109be12d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ source-dir = doc/source [entry_points] console_scripts = + marconi-bench-pc = marconi.bench.conductor:main marconi-server = marconi.cmd.server:run marconi.queues.data.storage =