From 6cef24dcd757ad36d4d861353d3fc911a36a8e7a Mon Sep 17 00:00:00 2001 From: Chuck Thier Date: Mon, 27 Sep 2010 23:10:09 +0000 Subject: [PATCH] Initial checkin of benchmark code --- bin/swift-bench | 104 ++++++++++++++++ setup.py | 3 +- swift/common/bench.py | 273 ++++++++++++++++++++++++++++++++++++++++++ swift/common/utils.py | 6 +- 4 files changed, 383 insertions(+), 3 deletions(-) create mode 100755 bin/swift-bench create mode 100644 swift/common/bench.py diff --git a/bin/swift-bench b/bin/swift-bench new file mode 100755 index 0000000000..10df6a74a3 --- /dev/null +++ b/bin/swift-bench @@ -0,0 +1,104 @@ +#!/usr/bin/python + +import logging +import os +import sys +import signal +import uuid +from optparse import OptionParser + +from swift.common.bench import BenchController +from swift.common.utils import readconf, NamedLogger + +# The defaults should be sufficient to run the slapper on a SAIO +CONF_DEFAULTS = { + 'auth': 'http://saio:11000/v1.0', + 'user': 'test:tester', + 'key': 'testing', + 'object_sources': '', + 'put_concurrency': '10', + 'get_concurrency': '10', + 'del_concurrency': '10', + 'concurrency': '', + 'object_size': '1', + 'num_objects': '1000', + 'num_gets': '10000', + 'delete': 'yes', + 'container_name': uuid.uuid4().hex, + 'use_proxy': 'yes', + 'url': '', + 'devices': 'sdb', + 'logging_level': 'INFO', + 'timeout': '10', + } + +if __name__ == '__main__': + usage = "usage: %prog [OPTIONS] [CONF_FILE]" + usage += """\n\nConf file (with defaults): + + [bench] + auth = http://saio:11000/v1.0 + user = test:tester + key = testing + concurrency = 10 + object_size = 1 + num_objects = 1000 + num_gets = 10000 + delete = yes + """ + parser = OptionParser(usage=usage) + parser.add_option('-A', '--auth', dest='auth', + help='URL for obtaining an auth token') + parser.add_option('-U', '--user', dest='user', + help='User name for obtaining an auth token') + parser.add_option('-K', '--key', dest='key', + help='Key for obtaining an auth token') + parser.add_option('-u', '--url', dest='url', + help='Storage URL') + parser.add_option('-c', '--concurrency', dest='concurrency', + help='Number of concurrent connections to use') + parser.add_option('-s', '--object-size', dest='object_size', + help='Size of objects to PUT (in bytes)') + parser.add_option('-n', '--num-objects', dest='num_objects', + help='Number of objects to PUT') + parser.add_option('-g', '--num-gets', dest='num_gets', + help='Number of GET operations to perform') + parser.add_option('-x', '--no-delete', dest='delete', action='store_false', + help='If set, will not delete the objects created') + + _, args = parser.parse_args() + if args: + conf = args[0] + if not os.path.exists(conf): + sys.exit("No such conf file: %s" % conf) + conf = readconf(conf, 'bench', log_name='swift-bench', + defaults=CONF_DEFAULTS) + else: + conf = CONF_DEFAULTS + parser.set_defaults(**conf) + options, _ = parser.parse_args() + if options.concurrency is not '': + options.put_concurrency = options.concurrency + options.get_concurrency = options.concurrency + options.del_concurrency = options.concurrency + + def sigterm(signum, frame): + sys.exit('Termination signal received.') + signal.signal(signal.SIGTERM, sigterm) + + logger = logging.getLogger() + logger.setLevel({ + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL}.get( + options.logging_level, logging.INFO)) + loghandler = logging.StreamHandler() + logformat = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + loghandler.setFormatter(logformat) + logger.addHandler(loghandler) + logger = NamedLogger(logger, 'swift-bench') + + controller = BenchController(logger, options) + controller.run() diff --git a/setup.py b/setup.py index 56977c97d2..936bf92f8e 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,8 @@ setup( 'bin/swift-object-server', 'bin/swift-object-updater', 'bin/swift-proxy-server', 'bin/swift-ring-builder', 'bin/swift-stats-populate', - 'bin/swift-stats-report' + 'bin/swift-stats-report', + 'bin/swift-bench', ], entry_points={ 'paste.app_factory': [ diff --git a/swift/common/bench.py b/swift/common/bench.py new file mode 100644 index 0000000000..49c4873809 --- /dev/null +++ b/swift/common/bench.py @@ -0,0 +1,273 @@ +import hashlib +import uuid +import time +import random +from urlparse import urlparse + +import eventlet.pools +import eventlet.corolocal +from eventlet.green.httplib import HTTPSConnection, \ + HTTPResponse, CannotSendRequest, _UNKNOWN + +from swift.common.bufferedhttp \ + import BufferedHTTPConnection as HTTPConnection + + +class ConnectionPool(eventlet.pools.Pool): + def __init__(self, url, size): + self.url = url + self.url_parsed = urlparse(self.url) + eventlet.pools.Pool.__init__(self, size, size) + + def create(self): + if self.url_parsed[0] == 'https': + hc = HTTPSConnection(self.url_parsed[1]) + elif self.url_parsed[0] == 'http': + hc = HTTPConnection(self.url_parsed[1]) + else: + raise Exception("Can't handle %s" % self.url_parsed[0]) + return hc + + +class Bench(object): + def __init__(self, logger, conf, names): + self.logger = logger + self.user = conf.user + self.key = conf.key + self.auth_url = conf.auth + self.use_proxy = conf.use_proxy in ('1', 'yes', 'true', 'True') + if self.use_proxy: + # Get the auth token + parsed = urlparse(self.auth_url) + if parsed.scheme == 'http': + hc = HTTPConnection(parsed.netloc) + elif parsed.scheme == 'https': + hc = HTTPSConnection(parsed.netloc) + else: + raise ClientException( + 'Cannot handle protocol scheme %s for url %s' % + (parsed.scheme, self.auth_url)) + hc_args = ('GET', parsed.path, None, + {'X-Auth-User': self.user, 'X-Auth-Key': self.key}) + hc.request(*hc_args) + hcr = hc.getresponse() + hcrd = hcr.read() + if hcr.status != 204: + raise Exception("Could not authenticate (%s)" % hcr.status) + self.token = hcr.getheader('x-auth-token') + self.account = hcr.getheader('x-storage-url').split('/')[-1] + if conf.url == '': + self.url = hcr.getheader('x-storage-url') + else: + self.url = conf.url + else: + self.token = 'SlapChop!' + self.account = conf.account + self.url = conf.url + self.container_name = conf.container_name + + self.object_size = int(conf.object_size) + self.object_sources = conf.object_sources + self.files = [] + if self.object_sources: + self.object_sources = self.object_sources.split() + self.files = [file(f, 'rb').read() for f in self.object_sources] + + self.put_concurrency = int(conf.put_concurrency) + self.get_concurrency = int(conf.get_concurrency) + self.del_concurrency = int(conf.del_concurrency) + self.total_objects = int(conf.num_objects) + self.total_gets = int(conf.num_gets) + self.timeout = int(conf.timeout) + self.url_parsed = urlparse(self.url) + self.devices = conf.devices.split() + self.names = names + self.local = eventlet.corolocal.local() + self.conn_pool = ConnectionPool(self.url, + max(self.put_concurrency, self.get_concurrency, + self.del_concurrency)) + + def _log_status(self, title): + total = time.time() - self.beginbeat + self.logger.info('%s %s [%s failures], %.01f/s' % ( + self.complete, title, self.failures, + (float(self.complete) / total), + )) + + def _create_connection(self): + if self.url_parsed[0] == 'https': + hc = HTTPSConnection(self.url_parsed[1]) + elif self.url_parsed[0] == 'http': + hc = HTTPConnection(self.url_parsed[1]) + else: + raise Exception("Can't handle %s" % self.url_parsed[0]) + return hc + + def _send_request(self, *args): + hc = self.conn_pool.get() + try: + start = time.time() + try: + hc.request(*args) + hcr = hc.getresponse() + hcrd = hcr.read() + hcr.close() + except CannotSendRequest: + self.logger.info("CannonSendRequest. Skipping...") + try: + hc.close() + except: + pass + self.failures += 1 + hc = self._create_connection() + return + total = time.time() - start + self.logger.debug("%s %s: %04f" % + (args[0], args[1], total)) + if hcr.status < 200 or hcr.status > 299: + self.failures += 1 + return False + else: + return True + finally: + self.conn_pool.put(hc) + + def run(self): + pool = eventlet.GreenPool(self.concurrency) + events = [] + self.beginbeat = self.heartbeat = time.time() + self.heartbeat -= 13 # just to get the first report quicker + self.failures = 0 + self.complete = 0 + for i in xrange(self.total): + pool.spawn_n(self._run, i) + pool.waitall() + self._log_status(self.msg + ' **FINAL**') + + def _run(self, thread): + return + + +class BenchController(object): + def __init__(self, logger, conf): + self.logger = logger + self.conf = conf + self.names = [] + self.delete = conf.delete in ('1', 'true', 'True', 'yes') + self.gets = int(conf.num_gets) + + def run(self): + puts = BenchPUT(self.logger, self.conf, self.names) + puts.run() + if self.gets: + gets = BenchGET(self.logger, self.conf, self.names) + gets.run() + if self.delete: + dels = BenchDELETE(self.logger, self.conf, self.names) + dels.run() + + +class BenchDELETE(Bench): + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.del_concurrency + self.total = len(names) + self.msg = 'DEL' + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('DEL') + device, partition, path = self.names.pop() + headers = { + 'X-Timestamp': "%013.05f" % time.time(), + 'X-ID': str(uuid.uuid4()), + 'X-Auth-Token': self.token, + } + if self.use_proxy: + hc_args = ('DELETE', "/v1/%s/%s/%s" % + (self.account, self.container_name, path), '', headers) + else: + hc_args = ('DELETE', "/%s/%s/%s/%s/%s" % + (device, partition, self.account, self.container_name, path), + '', headers) + self._send_request(*hc_args) + self.complete += 1 + + +class BenchGET(Bench): + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.get_concurrency + self.total = self.total_gets + self.msg = 'GETS' + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('GETS') + device, partition, name = random.choice(self.names) + headers = { + 'X-Auth-Token': self.token, + 'X-Timestamp': "%013.05f" % time.time(), + } + if self.use_proxy: + hc_args = ('GET', '/v1/%s/%s/%s' % + (self.account, self.container_name, name), '', headers) + else: + hc_args = ('GET', '/%s/%s/%s/%s/%s' % + (device, partition, self.account, self.container_name, name), + '', headers) + self._send_request(*hc_args) + self.complete += 1 + + +class BenchPUT(Bench): + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.put_concurrency + self.total = self.total_objects + self.msg = 'PUTS' + if self.use_proxy: + # Create the container + if self.url.startswith('http://'): + hc = HTTPConnection(self.url.split('/')[2]) + else: + hc = HTTPSConnection(self.url.split('/')[2]) + hc_args = ('PUT', + '/v1/%s/%s' % (self.account, self.container_name), + None, {'X-Auth-Token': self.token}) + hc.request(*hc_args) + hcr = hc.getresponse() + hcrd = hcr.read() + if hcr.status < 200 or hcr.status > 299: + raise Exception('Could not create container %s: code: %s' % + (self.container_name, hcr.status)) + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('PUTS') + name = uuid.uuid4().hex + if self.object_sources: + source = random.choice(self.files) + else: + source = '0' * self.object_size + headers = { + 'Content-Type': 'application/octet-stream', + 'X-ID': str(uuid.uuid4()), + 'X-Auth-Token': self.token, + 'X-Timestamp': "%013.05f" % time.time(), + } + device = random.choice(self.devices) + partition = str(random.randint(1, 3000)) + if self.use_proxy: + hc_args = ('PUT', '/v1/%s/%s/%s' % + (self.account, self.container_name, name), source, headers) + else: + hc_args = ('PUT', '/%s/%s/%s/%s/%s' % + (device, partition, self.account, self.container_name, name), + source, headers) + if self._send_request(*hc_args): + self.names.append((device, partition, name)) + self.complete += 1 diff --git a/swift/common/utils.py b/swift/common/utils.py index f2e186c03e..7f193e66c3 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -534,8 +534,10 @@ def cache_from_env(env): return item_from_env(env, 'swift.cache') -def readconf(conf, section_name, log_name=None): - c = ConfigParser() +def readconf(conf, section_name, log_name=None, defaults=None): + if defaults is None: + defaults = {} + c = ConfigParser(defaults) if not c.read(conf): print "Unable to read config file %s" % conf sys.exit(1)