Initial checkin of benchmark code
This commit is contained in:
parent
f828723075
commit
6cef24dcd7
104
bin/swift-bench
Executable file
104
bin/swift-bench
Executable file
@ -0,0 +1,104 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import uuid
|
||||
from optparse import OptionParser
|
||||
|
||||
from swift.common.bench import BenchController
|
||||
from swift.common.utils import readconf, NamedLogger
|
||||
|
||||
# The defaults should be sufficient to run the slapper on a SAIO
|
||||
CONF_DEFAULTS = {
|
||||
'auth': 'http://saio:11000/v1.0',
|
||||
'user': 'test:tester',
|
||||
'key': 'testing',
|
||||
'object_sources': '',
|
||||
'put_concurrency': '10',
|
||||
'get_concurrency': '10',
|
||||
'del_concurrency': '10',
|
||||
'concurrency': '',
|
||||
'object_size': '1',
|
||||
'num_objects': '1000',
|
||||
'num_gets': '10000',
|
||||
'delete': 'yes',
|
||||
'container_name': uuid.uuid4().hex,
|
||||
'use_proxy': 'yes',
|
||||
'url': '',
|
||||
'devices': 'sdb',
|
||||
'logging_level': 'INFO',
|
||||
'timeout': '10',
|
||||
}
|
||||
|
||||
if __name__ == '__main__':
|
||||
usage = "usage: %prog [OPTIONS] [CONF_FILE]"
|
||||
usage += """\n\nConf file (with defaults):
|
||||
|
||||
[bench]
|
||||
auth = http://saio:11000/v1.0
|
||||
user = test:tester
|
||||
key = testing
|
||||
concurrency = 10
|
||||
object_size = 1
|
||||
num_objects = 1000
|
||||
num_gets = 10000
|
||||
delete = yes
|
||||
"""
|
||||
parser = OptionParser(usage=usage)
|
||||
parser.add_option('-A', '--auth', dest='auth',
|
||||
help='URL for obtaining an auth token')
|
||||
parser.add_option('-U', '--user', dest='user',
|
||||
help='User name for obtaining an auth token')
|
||||
parser.add_option('-K', '--key', dest='key',
|
||||
help='Key for obtaining an auth token')
|
||||
parser.add_option('-u', '--url', dest='url',
|
||||
help='Storage URL')
|
||||
parser.add_option('-c', '--concurrency', dest='concurrency',
|
||||
help='Number of concurrent connections to use')
|
||||
parser.add_option('-s', '--object-size', dest='object_size',
|
||||
help='Size of objects to PUT (in bytes)')
|
||||
parser.add_option('-n', '--num-objects', dest='num_objects',
|
||||
help='Number of objects to PUT')
|
||||
parser.add_option('-g', '--num-gets', dest='num_gets',
|
||||
help='Number of GET operations to perform')
|
||||
parser.add_option('-x', '--no-delete', dest='delete', action='store_false',
|
||||
help='If set, will not delete the objects created')
|
||||
|
||||
_, args = parser.parse_args()
|
||||
if args:
|
||||
conf = args[0]
|
||||
if not os.path.exists(conf):
|
||||
sys.exit("No such conf file: %s" % conf)
|
||||
conf = readconf(conf, 'bench', log_name='swift-bench',
|
||||
defaults=CONF_DEFAULTS)
|
||||
else:
|
||||
conf = CONF_DEFAULTS
|
||||
parser.set_defaults(**conf)
|
||||
options, _ = parser.parse_args()
|
||||
if options.concurrency is not '':
|
||||
options.put_concurrency = options.concurrency
|
||||
options.get_concurrency = options.concurrency
|
||||
options.del_concurrency = options.concurrency
|
||||
|
||||
def sigterm(signum, frame):
|
||||
sys.exit('Termination signal received.')
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel({
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL}.get(
|
||||
options.logging_level, logging.INFO))
|
||||
loghandler = logging.StreamHandler()
|
||||
logformat = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
||||
loghandler.setFormatter(logformat)
|
||||
logger.addHandler(loghandler)
|
||||
logger = NamedLogger(logger, 'swift-bench')
|
||||
|
||||
controller = BenchController(logger, options)
|
||||
controller.run()
|
3
setup.py
3
setup.py
@ -74,7 +74,8 @@ setup(
|
||||
'bin/swift-object-server',
|
||||
'bin/swift-object-updater', 'bin/swift-proxy-server',
|
||||
'bin/swift-ring-builder', 'bin/swift-stats-populate',
|
||||
'bin/swift-stats-report'
|
||||
'bin/swift-stats-report',
|
||||
'bin/swift-bench',
|
||||
],
|
||||
entry_points={
|
||||
'paste.app_factory': [
|
||||
|
273
swift/common/bench.py
Normal file
273
swift/common/bench.py
Normal file
@ -0,0 +1,273 @@
|
||||
import hashlib
|
||||
import uuid
|
||||
import time
|
||||
import random
|
||||
from urlparse import urlparse
|
||||
|
||||
import eventlet.pools
|
||||
import eventlet.corolocal
|
||||
from eventlet.green.httplib import HTTPSConnection, \
|
||||
HTTPResponse, CannotSendRequest, _UNKNOWN
|
||||
|
||||
from swift.common.bufferedhttp \
|
||||
import BufferedHTTPConnection as HTTPConnection
|
||||
|
||||
|
||||
class ConnectionPool(eventlet.pools.Pool):
|
||||
def __init__(self, url, size):
|
||||
self.url = url
|
||||
self.url_parsed = urlparse(self.url)
|
||||
eventlet.pools.Pool.__init__(self, size, size)
|
||||
|
||||
def create(self):
|
||||
if self.url_parsed[0] == 'https':
|
||||
hc = HTTPSConnection(self.url_parsed[1])
|
||||
elif self.url_parsed[0] == 'http':
|
||||
hc = HTTPConnection(self.url_parsed[1])
|
||||
else:
|
||||
raise Exception("Can't handle %s" % self.url_parsed[0])
|
||||
return hc
|
||||
|
||||
|
||||
class Bench(object):
|
||||
def __init__(self, logger, conf, names):
|
||||
self.logger = logger
|
||||
self.user = conf.user
|
||||
self.key = conf.key
|
||||
self.auth_url = conf.auth
|
||||
self.use_proxy = conf.use_proxy in ('1', 'yes', 'true', 'True')
|
||||
if self.use_proxy:
|
||||
# Get the auth token
|
||||
parsed = urlparse(self.auth_url)
|
||||
if parsed.scheme == 'http':
|
||||
hc = HTTPConnection(parsed.netloc)
|
||||
elif parsed.scheme == 'https':
|
||||
hc = HTTPSConnection(parsed.netloc)
|
||||
else:
|
||||
raise ClientException(
|
||||
'Cannot handle protocol scheme %s for url %s' %
|
||||
(parsed.scheme, self.auth_url))
|
||||
hc_args = ('GET', parsed.path, None,
|
||||
{'X-Auth-User': self.user, 'X-Auth-Key': self.key})
|
||||
hc.request(*hc_args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
if hcr.status != 204:
|
||||
raise Exception("Could not authenticate (%s)" % hcr.status)
|
||||
self.token = hcr.getheader('x-auth-token')
|
||||
self.account = hcr.getheader('x-storage-url').split('/')[-1]
|
||||
if conf.url == '':
|
||||
self.url = hcr.getheader('x-storage-url')
|
||||
else:
|
||||
self.url = conf.url
|
||||
else:
|
||||
self.token = 'SlapChop!'
|
||||
self.account = conf.account
|
||||
self.url = conf.url
|
||||
self.container_name = conf.container_name
|
||||
|
||||
self.object_size = int(conf.object_size)
|
||||
self.object_sources = conf.object_sources
|
||||
self.files = []
|
||||
if self.object_sources:
|
||||
self.object_sources = self.object_sources.split()
|
||||
self.files = [file(f, 'rb').read() for f in self.object_sources]
|
||||
|
||||
self.put_concurrency = int(conf.put_concurrency)
|
||||
self.get_concurrency = int(conf.get_concurrency)
|
||||
self.del_concurrency = int(conf.del_concurrency)
|
||||
self.total_objects = int(conf.num_objects)
|
||||
self.total_gets = int(conf.num_gets)
|
||||
self.timeout = int(conf.timeout)
|
||||
self.url_parsed = urlparse(self.url)
|
||||
self.devices = conf.devices.split()
|
||||
self.names = names
|
||||
self.local = eventlet.corolocal.local()
|
||||
self.conn_pool = ConnectionPool(self.url,
|
||||
max(self.put_concurrency, self.get_concurrency,
|
||||
self.del_concurrency))
|
||||
|
||||
def _log_status(self, title):
|
||||
total = time.time() - self.beginbeat
|
||||
self.logger.info('%s %s [%s failures], %.01f/s' % (
|
||||
self.complete, title, self.failures,
|
||||
(float(self.complete) / total),
|
||||
))
|
||||
|
||||
def _create_connection(self):
|
||||
if self.url_parsed[0] == 'https':
|
||||
hc = HTTPSConnection(self.url_parsed[1])
|
||||
elif self.url_parsed[0] == 'http':
|
||||
hc = HTTPConnection(self.url_parsed[1])
|
||||
else:
|
||||
raise Exception("Can't handle %s" % self.url_parsed[0])
|
||||
return hc
|
||||
|
||||
def _send_request(self, *args):
|
||||
hc = self.conn_pool.get()
|
||||
try:
|
||||
start = time.time()
|
||||
try:
|
||||
hc.request(*args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
hcr.close()
|
||||
except CannotSendRequest:
|
||||
self.logger.info("CannonSendRequest. Skipping...")
|
||||
try:
|
||||
hc.close()
|
||||
except:
|
||||
pass
|
||||
self.failures += 1
|
||||
hc = self._create_connection()
|
||||
return
|
||||
total = time.time() - start
|
||||
self.logger.debug("%s %s: %04f" %
|
||||
(args[0], args[1], total))
|
||||
if hcr.status < 200 or hcr.status > 299:
|
||||
self.failures += 1
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
finally:
|
||||
self.conn_pool.put(hc)
|
||||
|
||||
def run(self):
|
||||
pool = eventlet.GreenPool(self.concurrency)
|
||||
events = []
|
||||
self.beginbeat = self.heartbeat = time.time()
|
||||
self.heartbeat -= 13 # just to get the first report quicker
|
||||
self.failures = 0
|
||||
self.complete = 0
|
||||
for i in xrange(self.total):
|
||||
pool.spawn_n(self._run, i)
|
||||
pool.waitall()
|
||||
self._log_status(self.msg + ' **FINAL**')
|
||||
|
||||
def _run(self, thread):
|
||||
return
|
||||
|
||||
|
||||
class BenchController(object):
|
||||
def __init__(self, logger, conf):
|
||||
self.logger = logger
|
||||
self.conf = conf
|
||||
self.names = []
|
||||
self.delete = conf.delete in ('1', 'true', 'True', 'yes')
|
||||
self.gets = int(conf.num_gets)
|
||||
|
||||
def run(self):
|
||||
puts = BenchPUT(self.logger, self.conf, self.names)
|
||||
puts.run()
|
||||
if self.gets:
|
||||
gets = BenchGET(self.logger, self.conf, self.names)
|
||||
gets.run()
|
||||
if self.delete:
|
||||
dels = BenchDELETE(self.logger, self.conf, self.names)
|
||||
dels.run()
|
||||
|
||||
|
||||
class BenchDELETE(Bench):
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.del_concurrency
|
||||
self.total = len(names)
|
||||
self.msg = 'DEL'
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('DEL')
|
||||
device, partition, path = self.names.pop()
|
||||
headers = {
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
'X-ID': str(uuid.uuid4()),
|
||||
'X-Auth-Token': self.token,
|
||||
}
|
||||
if self.use_proxy:
|
||||
hc_args = ('DELETE', "/v1/%s/%s/%s" %
|
||||
(self.account, self.container_name, path), '', headers)
|
||||
else:
|
||||
hc_args = ('DELETE', "/%s/%s/%s/%s/%s" %
|
||||
(device, partition, self.account, self.container_name, path),
|
||||
'', headers)
|
||||
self._send_request(*hc_args)
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchGET(Bench):
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.get_concurrency
|
||||
self.total = self.total_gets
|
||||
self.msg = 'GETS'
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('GETS')
|
||||
device, partition, name = random.choice(self.names)
|
||||
headers = {
|
||||
'X-Auth-Token': self.token,
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
}
|
||||
if self.use_proxy:
|
||||
hc_args = ('GET', '/v1/%s/%s/%s' %
|
||||
(self.account, self.container_name, name), '', headers)
|
||||
else:
|
||||
hc_args = ('GET', '/%s/%s/%s/%s/%s' %
|
||||
(device, partition, self.account, self.container_name, name),
|
||||
'', headers)
|
||||
self._send_request(*hc_args)
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchPUT(Bench):
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.put_concurrency
|
||||
self.total = self.total_objects
|
||||
self.msg = 'PUTS'
|
||||
if self.use_proxy:
|
||||
# Create the container
|
||||
if self.url.startswith('http://'):
|
||||
hc = HTTPConnection(self.url.split('/')[2])
|
||||
else:
|
||||
hc = HTTPSConnection(self.url.split('/')[2])
|
||||
hc_args = ('PUT',
|
||||
'/v1/%s/%s' % (self.account, self.container_name),
|
||||
None, {'X-Auth-Token': self.token})
|
||||
hc.request(*hc_args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
if hcr.status < 200 or hcr.status > 299:
|
||||
raise Exception('Could not create container %s: code: %s' %
|
||||
(self.container_name, hcr.status))
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('PUTS')
|
||||
name = uuid.uuid4().hex
|
||||
if self.object_sources:
|
||||
source = random.choice(self.files)
|
||||
else:
|
||||
source = '0' * self.object_size
|
||||
headers = {
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-ID': str(uuid.uuid4()),
|
||||
'X-Auth-Token': self.token,
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
}
|
||||
device = random.choice(self.devices)
|
||||
partition = str(random.randint(1, 3000))
|
||||
if self.use_proxy:
|
||||
hc_args = ('PUT', '/v1/%s/%s/%s' %
|
||||
(self.account, self.container_name, name), source, headers)
|
||||
else:
|
||||
hc_args = ('PUT', '/%s/%s/%s/%s/%s' %
|
||||
(device, partition, self.account, self.container_name, name),
|
||||
source, headers)
|
||||
if self._send_request(*hc_args):
|
||||
self.names.append((device, partition, name))
|
||||
self.complete += 1
|
@ -534,8 +534,10 @@ def cache_from_env(env):
|
||||
return item_from_env(env, 'swift.cache')
|
||||
|
||||
|
||||
def readconf(conf, section_name, log_name=None):
|
||||
c = ConfigParser()
|
||||
def readconf(conf, section_name, log_name=None, defaults=None):
|
||||
if defaults is None:
|
||||
defaults = {}
|
||||
c = ConfigParser(defaults)
|
||||
if not c.read(conf):
|
||||
print "Unable to read config file %s" % conf
|
||||
sys.exit(1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user