Merge "feat(benchmarking) : Producer-Consumer scenario"
This commit is contained in:
commit
57842ef9b3
5
bench-requirements.txt
Normal file
5
bench-requirements.txt
Normal file
@ -0,0 +1,5 @@
|
||||
argparse>=1.2.1
|
||||
gevent>=1.0.1
|
||||
marktime>=0.2.0
|
||||
psutil>=2.1.1
|
||||
python-marconiclient>=0.0.2
|
36
marconi/bench/README.rst
Normal file
36
marconi/bench/README.rst
Normal file
@ -0,0 +1,36 @@
|
||||
Marconi Benchmarking
|
||||
====================
|
||||
|
||||
Structure
|
||||
---------
|
||||
The Benchmark tool fires up both a Producer Process and a Consumer Process, while
|
||||
accepting CLI parameters for the number of processes, number of workers and duration of test.
|
||||
|
||||
The Producer Process publishes messages to a given queue, while the Consumer consumes the messages
|
||||
claiming and deleting them.
|
||||
|
||||
Need of the Benchmark
|
||||
---------------------
|
||||
|
||||
Marconi is a performance oriented API. Any changes made need to performance tested, and this tool
|
||||
helps by a being quick way to test that.
|
||||
|
||||
Setup
|
||||
-----
|
||||
Benchmark dependencies need to be pip installed::
|
||||
|
||||
pip install -r bench-requirements.txt
|
||||
|
||||
Make sure you have a running instance of Marconi after following `README`_ for
|
||||
setting up Marconi running at port 8888::
|
||||
|
||||
Export an environment variable called MESSAGES_PATH and set it to the path of messages.json
|
||||
in marconi/bench
|
||||
|
||||
Note: This allows benchmarking with different set of messages rather than those specified in
|
||||
messages.json
|
||||
|
||||
$ marconi-bench-pc -p {Number of Processes} -w {Number of Workers} -t {Duration in Seconds}
|
||||
|
||||
|
||||
.. _`README` : https://github.com/openstack/marconi/blob/master/README.rst
|
0
marconi/bench/__init__.py
Normal file
0
marconi/bench/__init__.py
Normal file
33
marconi/bench/cli_config.py
Normal file
33
marconi/bench/cli_config.py
Normal file
@ -0,0 +1,33 @@
|
||||
# Copyright (c) 2014 Rackspace, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
import psutil
|
||||
|
||||
conf = cfg.CONF
|
||||
_CLI_OPTIONS = (
|
||||
cfg.IntOpt(
|
||||
'processes',
|
||||
short='p',
|
||||
default=psutil.NUM_CPUS,
|
||||
help='Number of Processes'),
|
||||
cfg.IntOpt(
|
||||
'workers',
|
||||
short='w',
|
||||
default=psutil.NUM_CPUS * 2,
|
||||
help='Number of Workers'),
|
||||
cfg.IntOpt('time', short='t', default=3, help="time in seconds"),
|
||||
)
|
||||
conf.register_cli_opts(_CLI_OPTIONS)
|
||||
conf(project='marconi', prog='marconi-queues')
|
29
marconi/bench/conductor.py
Normal file
29
marconi/bench/conductor.py
Normal file
@ -0,0 +1,29 @@
|
||||
# Copyright (c) 2014 Rackspace, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import multiprocessing as mp
|
||||
|
||||
from marconi.bench import consumer
|
||||
from marconi.bench import producer
|
||||
|
||||
|
||||
def main():
|
||||
procs = [mp.Process(target=worker.run)
|
||||
for worker in [producer, consumer]]
|
||||
for each_proc in procs:
|
||||
each_proc.start()
|
||||
for each_proc in procs:
|
||||
each_proc.join()
|
142
marconi/bench/consumer.py
Normal file
142
marconi/bench/consumer.py
Normal file
@ -0,0 +1,142 @@
|
||||
# Copyright (c) 2014 Rackspace, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division
|
||||
|
||||
import multiprocessing as mp
|
||||
import time
|
||||
|
||||
from gevent import monkey as curious_george
|
||||
curious_george.patch_all(thread=False, select=False)
|
||||
import gevent
|
||||
from marconiclient.queues.v1 import client
|
||||
from marconiclient.transport.errors import TransportError
|
||||
import marktime
|
||||
|
||||
from marconi.bench.cli_config import conf
|
||||
|
||||
URL = 'http://localhost:8888'
|
||||
QUEUE_PREFIX = 'ogre-test-queue-'
|
||||
|
||||
|
||||
def claim_delete(stats, test_duration, ttl, grace, limit):
|
||||
"""Consumer Worker
|
||||
|
||||
The Consumer Worker continuously claims and deletes messages
|
||||
for the specified duration. The time taken for each claim and
|
||||
delete is recorded for calculating throughput and latency.
|
||||
"""
|
||||
|
||||
cli = client.Client(URL)
|
||||
queue = cli.queue(QUEUE_PREFIX + '1')
|
||||
end = time.time() + test_duration
|
||||
total_elapsed = 0
|
||||
total_requests = 0
|
||||
claim_total_requests = 0
|
||||
delete_total_requests = 0
|
||||
|
||||
while time.time() < end:
|
||||
marktime.start('claim_message')
|
||||
try:
|
||||
claim = queue.claim(ttl=ttl, grace=grace, limit=limit)
|
||||
|
||||
except TransportError as ex:
|
||||
print ("Could not claim messages : {0}".format(ex))
|
||||
|
||||
else:
|
||||
total_elapsed += marktime.stop('claim_message').seconds
|
||||
total_requests += 1
|
||||
claim_total_requests += 1
|
||||
|
||||
try:
|
||||
marktime.start('delete_message')
|
||||
|
||||
for msg in claim:
|
||||
# TODO(TheSriram): Simulate actual work before deletion
|
||||
msg.delete()
|
||||
|
||||
total_elapsed += marktime.stop('delete_message').seconds
|
||||
delete_total_requests += 1
|
||||
total_requests += 1
|
||||
stats.put({'total_requests': total_requests,
|
||||
'claim_total_requests': claim_total_requests,
|
||||
'delete_total_requests': delete_total_requests,
|
||||
'total_elapsed': total_elapsed})
|
||||
|
||||
except TransportError as ex:
|
||||
print ("Could not claim and delete : {0}".format(ex))
|
||||
|
||||
|
||||
def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit):
|
||||
gevent.joinall([
|
||||
gevent.spawn(claim_delete, stats, test_duration, ttl,
|
||||
grace, limit)
|
||||
for _ in range(num_workers)
|
||||
])
|
||||
|
||||
|
||||
def crunch(stats):
|
||||
total_requests = 0
|
||||
total_latency = 0.0
|
||||
claim_total_requests = 0
|
||||
delete_total_requests = 0
|
||||
|
||||
while not stats.empty():
|
||||
entry = stats.get_nowait()
|
||||
total_requests += entry['total_requests']
|
||||
total_latency += entry['total_elapsed']
|
||||
claim_total_requests += entry['claim_total_requests']
|
||||
delete_total_requests += entry['delete_total_requests']
|
||||
|
||||
return (total_requests, total_latency, claim_total_requests,
|
||||
delete_total_requests)
|
||||
|
||||
|
||||
def run():
|
||||
num_procs = conf.processes
|
||||
num_workers = conf.workers
|
||||
test_duration = conf.time
|
||||
stats = mp.Queue()
|
||||
# TODO(TheSriram) : Make ttl,grace and limit configurable
|
||||
args = (stats, num_workers, test_duration, URL, 300, 200, 1)
|
||||
|
||||
procs = [mp.Process(target=load_generator, args=args)
|
||||
for _ in range(num_procs)]
|
||||
|
||||
print ("\nStarting Consumer...")
|
||||
|
||||
start = time.time()
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.start()
|
||||
for each_proc in procs:
|
||||
each_proc.join()
|
||||
|
||||
(total_requests, total_latency, claim_total_requests,
|
||||
delete_total_requests) = crunch(stats)
|
||||
|
||||
duration = time.time() - start
|
||||
throughput = total_requests / duration
|
||||
latency = 1000 * total_latency / total_requests
|
||||
|
||||
print('Duration: {0:.1f} sec'.format(duration))
|
||||
print('Total Requests: {0}'.format(total_requests))
|
||||
print('Throughput: {0:.0f} req/sec'.format(throughput))
|
||||
print('Latency: {0:.1f} ms/req'.format(latency))
|
||||
|
||||
print('') # Blank line
|
||||
|
||||
|
||||
def main():
|
||||
run()
|
72
marconi/bench/messages.json
Normal file
72
marconi/bench/messages.json
Normal file
@ -0,0 +1,72 @@
|
||||
[
|
||||
{
|
||||
"weight": 0.8,
|
||||
"doc": {
|
||||
"ttl": 60,
|
||||
"body": {
|
||||
"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
|
||||
"evt": "Wakeup"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"weight": 0.1,
|
||||
"doc": {
|
||||
"ttl": 3600,
|
||||
"body": {
|
||||
"ResultSet": {
|
||||
"totalResultsAvailable": 1827221,
|
||||
"totalResultsReturned": 2,
|
||||
"firstResultPosition": 1,
|
||||
"Result": [
|
||||
{
|
||||
"Title": "potato jpg",
|
||||
"Summary": "Kentang Si bungsu dari keluarga Solanum tuberosum L ini ternyata memiliki khasiat untuk mengurangi kerutan jerawat bintik hitam dan kemerahan pada kulit Gunakan seminggu sekali sebagai",
|
||||
"Url": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg",
|
||||
"ClickUrl": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg",
|
||||
"RefererUrl": "http://www.mediaindonesia.com/mediaperempuan/index.php?ar_id=Nzkw",
|
||||
"FileSize": 22630,
|
||||
"FileFormat": "jpeg",
|
||||
"Height": 362,
|
||||
"Width": 532,
|
||||
"Thumbnail": {
|
||||
"Url": "http://thm-a01.yimg.com/nimage/557094559c18f16a",
|
||||
"Height": 98,
|
||||
"Width": 145
|
||||
}
|
||||
},
|
||||
{
|
||||
"Title": "potato jpg",
|
||||
"Summary": "Introduction of puneri aloo This is a traditional potato preparation flavoured with curry leaves and peanuts and can be eaten on fasting day Preparation time 10 min",
|
||||
"Url": "http://www.infovisual.info/01/photo/potato.jpg",
|
||||
"ClickUrl": "http://www.infovisual.info/01/photo/potato.jpg",
|
||||
"RefererUrl": "http://sundayfood.com/puneri-aloo-indian-%20recipe",
|
||||
"FileSize": 119398,
|
||||
"FileFormat": "jpeg",
|
||||
"Height": 685,
|
||||
"Width": 1024,
|
||||
"Thumbnail": {
|
||||
"Url": "http://thm-a01.yimg.com/nimage/7fa23212efe84b64",
|
||||
"Height": 107,
|
||||
"Width": 160
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"weight": 0.1,
|
||||
"doc": {
|
||||
"ttl": 360,
|
||||
"body": {
|
||||
"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
|
||||
"evt": "StartBackup",
|
||||
"files": [
|
||||
"/foo/bar/stuff/thing.dat"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
169
marconi/bench/producer.py
Normal file
169
marconi/bench/producer.py
Normal file
@ -0,0 +1,169 @@
|
||||
# Copyright (c) 2014 Rackspace, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division
|
||||
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
|
||||
from gevent import monkey as curious_george
|
||||
curious_george.patch_all(thread=False, select=False)
|
||||
import gevent
|
||||
from marconiclient.queues.v1 import client
|
||||
from marconiclient.transport.errors import TransportError
|
||||
import marktime
|
||||
|
||||
from marconi.bench.cli_config import conf
|
||||
|
||||
|
||||
# TODO(TheSriram): Make configurable
|
||||
URL = 'http://localhost:8888'
|
||||
QUEUE_PREFIX = 'ogre-test-queue-'
|
||||
|
||||
# TODO(TheSriram) : Migrate from env variable to config
|
||||
if os.environ.get('MESSAGES_PATH'):
|
||||
with open(os.environ.get('MESSAGES_PATH')) as f:
|
||||
message_pool = json.loads(f.read())
|
||||
else:
|
||||
print("Error : $MESSAGES_PATH needs to be set")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
message_pool.sort(key=lambda msg: msg['weight'])
|
||||
|
||||
|
||||
def choose_message():
|
||||
"""Choose a message from our pool of possibilities."""
|
||||
|
||||
# Assume message_pool is sorted by weight, ascending
|
||||
position = random.random()
|
||||
accumulator = 0.00
|
||||
|
||||
for each_message in message_pool:
|
||||
accumulator += each_message['weight']
|
||||
if position < accumulator:
|
||||
return each_message['doc']
|
||||
|
||||
assert False
|
||||
|
||||
|
||||
def producer(stats, test_duration):
|
||||
"""Producer Worker
|
||||
|
||||
The Producer Worker continuously post messages for
|
||||
the specified duration. The time taken for each post
|
||||
is recorded for calculating throughput and latency.
|
||||
"""
|
||||
|
||||
cli = client.Client(URL)
|
||||
queue = cli.queue(QUEUE_PREFIX + '1')
|
||||
|
||||
total_requests = 0
|
||||
total_elapsed = 0
|
||||
end = time.time() + test_duration
|
||||
|
||||
while time.time() < end:
|
||||
marktime.start('post message')
|
||||
|
||||
# TODO(TheSriram): Track/report errors
|
||||
try:
|
||||
queue.post(choose_message())
|
||||
|
||||
except TransportError as ex:
|
||||
print("Could not post a message : {0}".format(ex))
|
||||
|
||||
else:
|
||||
total_elapsed += marktime.stop('post message').seconds
|
||||
total_requests += 1
|
||||
|
||||
stats.put({
|
||||
'total_requests': total_requests,
|
||||
'total_elapsed': total_elapsed
|
||||
})
|
||||
|
||||
|
||||
# TODO(TheSriram): make distributed across multiple machines
|
||||
# TODO(TheSriram): post across several queues (which workers to which queues?
|
||||
# weight them, so can have some busy queues, some not.)
|
||||
def load_generator(stats, num_workers, test_duration):
|
||||
# TODO(TheSriram): Have some way to get all of the workers to line up and
|
||||
# start at the same time (is this really useful?)
|
||||
|
||||
gevent.joinall([
|
||||
gevent.spawn(producer, stats, test_duration)
|
||||
for _ in range(num_workers)
|
||||
])
|
||||
|
||||
|
||||
def crunch(stats):
|
||||
total_requests = 0
|
||||
total_latency = 0.0
|
||||
|
||||
while not stats.empty():
|
||||
entry = stats.get_nowait()
|
||||
total_requests += entry['total_requests']
|
||||
total_latency += entry['total_elapsed']
|
||||
|
||||
return total_requests, total_latency
|
||||
|
||||
|
||||
def run():
|
||||
|
||||
num_procs = conf.processes
|
||||
num_workers = conf.workers
|
||||
test_duration = conf.time
|
||||
stats = mp.Queue()
|
||||
args = (stats, num_workers, test_duration)
|
||||
|
||||
# TODO(TheSriram): Multiple test runs, vary num workers and drain/delete
|
||||
# queues in between each run. Plot these on a graph, with
|
||||
# concurrency as the X axis.
|
||||
|
||||
procs = [
|
||||
mp.Process(target=load_generator, args=args)
|
||||
for _ in range(num_procs)
|
||||
]
|
||||
|
||||
print('\nStarting Producer...')
|
||||
start = time.time()
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.start()
|
||||
|
||||
for each_proc in procs:
|
||||
each_proc.join()
|
||||
|
||||
total_requests, total_latency = crunch(stats)
|
||||
|
||||
# TODO(TheSriram): Add one more stat: "attempted req/sec" so can
|
||||
# graph that on the x axis vs. achieved throughput and
|
||||
# latency.
|
||||
duration = time.time() - start
|
||||
throughput = total_requests / duration
|
||||
latency = 1000 * total_latency / total_requests
|
||||
|
||||
print('Duration: {0:.1f} sec'.format(duration))
|
||||
print('Total Requests: {0}'.format(total_requests))
|
||||
print('Throughput: {0:.0f} req/sec'.format(throughput))
|
||||
print('Latency: {0:.1f} ms/req'.format(latency))
|
||||
|
||||
print('') # Blank line
|
||||
|
||||
|
||||
def main():
|
||||
run()
|
Loading…
Reference in New Issue
Block a user