diff --git a/bin/swift-bench b/bin/swift-bench index b69671ab81..edeb5111c1 100755 --- a/bin/swift-bench +++ b/bin/swift-bench @@ -21,7 +21,8 @@ import signal import uuid from optparse import OptionParser -from swift.common.bench import BenchController +from swift.common.bench import (BenchController, DistributedBenchController, + create_containers, delete_containers) from swift.common.utils import readconf, LogAdapter # The defaults should be sufficient to run swift-bench on a SAIO @@ -49,6 +50,8 @@ CONF_DEFAULTS = { 'devices': 'sdb1', # space-sep list 'log_level': 'INFO', 'timeout': '10', + 'auth_version': '1.0', + 'bench_clients': [], } SAIO_DEFAULTS = { @@ -81,6 +84,13 @@ if __name__ == '__main__': help='User name for obtaining an auth token') parser.add_option('-K', '--key', dest='key', help='Key for obtaining an auth token') + parser.add_option('-b', '--bench-clients', action='append', + metavar=':', + help=('A string of the form ":" which matches ' + 'the arguments supplied to a swift-bench-client ' + 'process. This argument must be specified ' + 'once per swift-bench-client you want to ' + 'utilize.')) parser.add_option('-u', '--url', dest='url', help='Storage URL') parser.add_option('-c', '--concurrency', dest='concurrency', @@ -125,6 +135,8 @@ if __name__ == '__main__': options.put_concurrency = options.concurrency options.get_concurrency = options.concurrency options.del_concurrency = options.concurrency + options.containers = ['%s_%d' % (options.container_name, i) + for i in xrange(int(options.num_containers))] def sigterm(signum, frame): sys.exit('Termination signal received.') @@ -145,5 +157,13 @@ if __name__ == '__main__': '%(message)s') loghandler.setFormatter(logformat) - controller = BenchController(logger, options) + if options.use_proxy: + create_containers(logger, options) + + controller_class = DistributedBenchController if options.bench_clients \ + else BenchController + controller = controller_class(logger, options) controller.run() + + if options.delete: + delete_containers(logger, options) diff --git a/bin/swift-bench-client b/bin/swift-bench-client new file mode 100755 index 0000000000..9473b4560c --- /dev/null +++ b/bin/swift-bench-client @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys +import signal +from optparse import OptionParser + +from swift.common.bench import BenchServer +from swift.common.utils import LogAdapter + +if __name__ == '__main__': + usage = "usage: %prog " + usage += "\n\nRun a client for distributed swift-bench runs." + parser = OptionParser(usage=usage) + parser.add_option('-o', '--log-level', dest='log_level', + default='info', + help='Logging level (debug, info, etc)') + + if len(sys.argv) != 3: + parser.print_help() + sys.exit(1) + options, args = parser.parse_args() + + logger = logging.getLogger() + logger.setLevel({ + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL}.get( + options.log_level.lower(), logging.INFO)) + loghandler = logging.StreamHandler() + logger.addHandler(loghandler) + logger = LogAdapter(logger, 'swift-bench-client') + logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s ' + '%(message)s') + loghandler.setFormatter(logformat) + + def sigterm(signum, frame): + sys.exit('Termination signal received.') + signal.signal(signal.SIGTERM, sigterm) + signal.signal(signal.SIGINT, sigterm) + + server = BenchServer(logger, args[0], args[1]) + server.run() diff --git a/setup.py b/setup.py index 185d11feed..edb0ce4b92 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ setup( 'bin/swift-account-replicator', 'bin/swift-account-server', 'bin/swift-bench', + 'bin/swift-bench-client', 'bin/swift-container-auditor', 'bin/swift-container-replicator', 'bin/swift-container-server', diff --git a/swift/common/bench.py b/swift/common/bench.py index 3edfaecccc..89e8a21124 100644 --- a/swift/common/bench.py +++ b/swift/common/bench.py @@ -13,21 +13,61 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re import sys import uuid import time import random import signal +import socket +import logging from contextlib import contextmanager +from optparse import Values +import eventlet import eventlet.pools from eventlet.green.httplib import CannotSendRequest -from swift.common.utils import TRUE_VALUES +from swift.common.utils import TRUE_VALUES, LogAdapter import swiftclient as client from swift.common import direct_client from swift.common.http import HTTP_CONFLICT +try: + import simplejson as json +except ImportError: + import json + + +def _func_on_containers(logger, conf, concurrency_key, func): + """Run a function on each container with concurrency.""" + + bench = Bench(logger, conf, []) + pool = eventlet.GreenPool(int(getattr(conf, concurrency_key))) + for container in conf.containers: + pool.spawn_n(func, bench.url, bench.token, container) + pool.waitall() + + +def delete_containers(logger, conf): + """Utility function to delete benchmark containers.""" + + def _deleter(url, token, container): + try: + client.delete_container(url, token, container) + except client.ClientException, e: + if e.http_status != HTTP_CONFLICT: + logger.warn("Unable to delete container '%s'. " + "Got http status '%d'." % (container, e.http_status)) + + _func_on_containers(logger, conf, 'del_concurrency', _deleter) + + +def create_containers(logger, conf): + """Utility function to create benchmark containers.""" + + _func_on_containers(logger, conf, 'put_concurrency', client.put_container) + class ConnectionPool(eventlet.pools.Pool): @@ -39,6 +79,62 @@ class ConnectionPool(eventlet.pools.Pool): return client.http_connection(self.url) +class BenchServer(object): + """ + A BenchServer binds to an IP/port and listens for bench jobs. A bench + job consists of the normal conf "dict" encoded in JSON, terminated with an + EOF. The log level is at least INFO, but DEBUG may also be specified in + the conf dict. + + The server will wait forever for jobs, running them one at a time. + """ + def __init__(self, logger, bind_ip, bind_port): + self.logger = logger + self.bind_ip = bind_ip + self.bind_port = int(bind_port) + + def run(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.logger.info('Binding to %s:%s', self.bind_ip, self.bind_port) + s.bind((self.bind_ip, self.bind_port)) + s.listen(20) + while True: + client, address = s.accept() + self.logger.debug('Accepting connection from %s:%s', *address) + client_file = client.makefile('rb+', 1) + json_data = client_file.read() + conf = Values(json.loads(json_data)) + + self.logger.info( + 'Starting run for %s:%s [put/get/del_concurrency: %s/%s/%s, ' + 'num_objects: %s, num_gets: %s]', address[0], address[1], + conf.put_concurrency, conf.get_concurrency, + conf.del_concurrency, conf.num_objects, conf.num_gets) + + logger = logging.getLogger('bench-server') + level = logging.DEBUG if conf.log_level.lower() == 'debug' \ + else logging.INFO + logger.setLevel(level) + loghandler = logging.StreamHandler(stream=client_file) + logformat = logging.Formatter( + '%(server)s %(asctime)s %(levelname)s %(message)s') + loghandler.setFormatter(logformat) + logger.addHandler(loghandler) + logger = LogAdapter(logger, 'swift-bench-server') + + controller = BenchController(logger, conf) + try: + controller.run() + except socket.error: + logger.warning('Socket error', exc_info=1) + + logger.logger.removeHandler(loghandler) + client_file.close() + client.close() + + self.logger.info('...bench run completed; waiting for next run.') + + class Bench(object): def __init__(self, logger, conf, names): @@ -64,8 +160,6 @@ class Bench(object): self.account = conf.account self.url = conf.url self.ip, self.port = self.url.split('/')[2].split(':') - self.containers = ['%s_%d' % (conf.container_name, i) - for i in xrange(int(conf.num_containers))] self.object_size = int(conf.object_size) self.object_sources = conf.object_sources @@ -129,6 +223,88 @@ class Bench(object): return +class DistributedBenchController(object): + """ + This class manages a distributed swift-bench run. For this Controller + class to make sense, the conf.bench_clients list must contain at least one + entry. + + The idea is to split the configured load between one or more + swift-bench-client processes, each of which use eventlet for concurrency. + We deliberately take a simple, naive approach with these limitations: + 1) Concurrency, num_objects, and num_gets are spread evenly between the + swift-bench-client processes. With a low concurrency to + swift-bench-client count ratio, rounding may result in a greater + than desired aggregate concurrency. + 2) Each swift-bench-client process runs independently so some may + finish up before others, i.e. the target aggregate concurrency is + not necessarily present the whole time. This may bias aggregate + reported rates lower than a more efficient architecture. + 3) Because of #2, some swift-bench-client processes may be running GETs + while others are still runinng their PUTs. Because of this + potential skew, distributed runs will not isolate one operation at a + time like a single swift-bench run will. + 3) Reported aggregate rates are simply the sum of each + swift-bench-client process reported FINAL number. That's probably + inaccurate somehow. + """ + + def __init__(self, logger, conf): + self.logger = logger + # ... INFO 1000 PUTS **FINAL** [0 failures], 34.9/s + self.final_re = re.compile( + 'INFO (\d+) (.*) \*\*FINAL\*\* \[(\d+) failures\], (\d+\.\d+)/s') + self.clients = conf.bench_clients + del conf.bench_clients + for k in ['put_concurrency', 'get_concurrency', 'del_concurrency', + 'num_objects', 'num_gets']: + setattr(conf, k, max(1, int(getattr(conf, k)) / len(self.clients))) + self.conf = conf + + def run(self): + eventlet.patcher.monkey_patch(socket=True) + pool = eventlet.GreenPool(size=len(self.clients)) + pile = eventlet.GreenPile(pool) + for client in self.clients: + pile.spawn(self.do_run, client) + results = { + 'PUTS': dict(count=0, failures=0, rate=0.0), + 'GETS': dict(count=0, failures=0, rate=0.0), + 'DEL': dict(count=0, failures=0, rate=0.0), + } + for result in pile: + for k, v in result.iteritems(): + target = results[k] + target['count'] += int(v['count']) + target['failures'] += int(v['failures']) + target['rate'] += float(v['rate']) + for k in ['PUTS', 'GETS', 'DEL']: + v = results[k] + self.logger.info('%d %s **FINAL** [%d failures], %.1f/s' % ( + v['count'], k, v['failures'], v['rate'])) + + def do_run(self, client): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ip, port = client.split(':') + s.connect((ip, int(port))) + s.sendall(json.dumps(self.conf.__dict__)) + s.shutdown(socket.SHUT_WR) + s_file = s.makefile('rb', 1) + result = {} + for line in s_file: + match = self.final_re.search(line) + if match: + g = match.groups() + result[g[1]] = { + 'count': g[0], + 'failures': g[2], + 'rate': g[3], + } + else: + sys.stderr.write('%s %s' % (client, line)) + return result + + class BenchController(object): def __init__(self, logger, conf): @@ -177,16 +353,6 @@ class BenchDELETE(Bench): self.total = len(names) self.msg = 'DEL' - def run(self): - Bench.run(self) - for container in self.containers: - try: - client.delete_container(self.url, self.token, container) - except client.ClientException, e: - if e.http_status != HTTP_CONFLICT: - self._log_status("Unable to delete container '%s'. " \ - "Got http status '%d'." % (container, e.http_status)) - def _run(self, thread): if time.time() - self.heartbeat >= 15: self.heartbeat = time.time() @@ -242,11 +408,7 @@ class BenchPUT(Bench): self.concurrency = self.put_concurrency self.total = self.total_objects self.msg = 'PUTS' - if self.use_proxy: - with self.connection() as conn: - for container_name in self.containers: - client.put_container(self.url, self.token, - container_name, http_conn=conn) + self.containers = conf.containers def _run(self, thread): if time.time() - self.heartbeat >= 15: