Add performance tests to SwiftOnHPSS

Phil Bridges 2016-03-18 14:10:48 -05:00
parent ef2d07ea48
commit cb2c1022dc
2 changed files with 424 additions and 0 deletions

systemtest.cfg New file

@@ -0,0 +1,22 @@
[config]
# The Keystone URL to talk to
#authurl=http://localhost:5000/v2.0
# The username to login as
#username=admin
# The tenant name to scope into
#tenantname=admin
# The password for that user
#password=password
# How many workers to spin up
#num_workers=2
# How many write/read cycles to execute
#num_cycles=8
# The name of the SwiftOnHPSS storage policy configured on your Swift cluster
#hpss_storage_policy=hpss
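#
# Note: systemtest.py loads this file via ConfigParser as 'systemtest.cfg'
# from the current working directory, so run the test from the directory
# containing this file, with every option above uncommented and set.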

system_test/systemtest.py Executable file

@@ -0,0 +1,402 @@
#!/usr/bin/env python
#
# Copyright (c) 2015-2016 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import swiftclient
import multiprocessing
import os
import time
import ConfigParser
import collections
import itertools
from pprint import pprint as pretty_print
# This is here because you can't normally do multiprocessing on
# instance methods. We get around that restriction by using this
# top-level function in the worker process as the execution target
# to call it for us, with the class state and method name pickled and
# piped in through a queue. Who needs the Law of Demeter anyway?!
def _call_instance_method(instance, method_name, *args, **kwargs):
return getattr(instance.__class__, method_name)(instance, *args, **kwargs)
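# For example, the worker below launches itself with:
#   multiprocessing.Process(target=_call_instance_method,
#                           args=(self, '_do_it', semaphore))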
def _parse_config():
parser = ConfigParser.SafeConfigParser()
parser.read('systemtest.cfg')
options = {}
for opt in ['authurl', 'username', 'tenantname',
'password', 'num_workers', 'num_cycles',
'hpss_storage_policy']:
options[opt] = parser.get('config', opt)
# These are specifically numbers.
options['num_workers'] = int(options['num_workers'])
options['num_cycles'] = int(options['num_cycles'])
return options
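# Note: parser.get() raises ConfigParser.NoOptionError if any of the
# options listed above is missing from systemtest.cfg.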
def apply_ansi(string_to_format, ansi_codes):
input_codes = ansi_codes
    if isinstance(input_codes, int):
        input_codes = [ansi_codes]
    if len(input_codes) == 0:
return string_to_format
code_str = ";".join(map(str, input_codes))
return '\033[%sm%s\033[0m' % (code_str, string_to_format)
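# Example: apply_ansi('hello', [1, 31]) returns '\033[1;31mhello\033[0m',
# which renders "hello" in bold red on an ANSI terminal.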
def average(value_list):
if len(value_list) == 0:
return 0
return sum(value_list) / len(value_list)
def chain(value, *fns):
result = value
for fn in fns:
result = fn(result)
return result
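# Example: chain(samples, average, human_readable_size) averages a list of
# I/O rates and then formats the result, applying each function in turn.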
def human_readable_size(size, over_time=True):
if over_time:
prefixes = ['B/s', 'KiB/s', 'MiB/s', 'GiB/s', 'TiB/s', 'PiB/s', 'EiB/s']
else:
prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
tmp = size
magnitude = 0
while tmp >= 1024 and magnitude < len(prefixes)-1:
tmp /= 1024.0
magnitude += 1
return '%f %s' % (tmp, prefixes[magnitude])
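# Example: human_readable_size(1536, over_time=False) returns
# '1.500000 KiB'; with over_time=True the unit becomes 'KiB/s'.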
ansi_modes = {'clear': 0,
'bold': 1,
'blink': 5,
'fg_black': 30,
'fg_red': 31,
'fg_green': 32,
'fg_yellow': 33,
'fg_blue': 34,
'fg_magenta': 35,
'fg_cyan': 36,
'fg_white': 37,
'bg_black': 40,
'bg_red': 41,
'bg_green': 42,
'bg_yellow': 43,
'bg_blue': 44,
'bg_magenta': 45,
'bg_cyan': 46,
'bg_white': 47}
# These three things are all classes in their own right.
IORecord = collections.namedtuple('IORecord', ['size', 'elapsed_time'])
TestWorkerRecord = collections.namedtuple('TestWorkerRecord',
['handle', 'queue'])
TestResultRecord = collections.namedtuple('TestResultRecord',
['worker_pid', 'write_result',
'read_result', 'test_size', 'cycle'])
class TimedReader:
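    """File-like wrapper that records an IORecord (bytes and elapsed time)
    for every read() call, so an overall I/O rate can be computed."""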
def __init__(self, source, size=None):
self.source = source
self.read_samples = []
self.done = False
self.last_read_time = time.time()
self.size = size
self._total_read = 0
def read(self, size):
current = time.time()
result = self.source.read(size)
bytes_read = len(result)
self.read_samples.append(
IORecord(size=bytes_read,
elapsed_time=current - self.last_read_time))
self._total_read += bytes_read
self.last_read_time = current
        if result == '' or (self.size is not None and
                            self._total_read >= self.size):
self.done = True
return result
def reset(self):
self.source.seek(0)
self.read_samples = []
self.done = False
self.last_read_time = time.time()
self._total_read = 0
def time_so_far(self):
return sum([rec.elapsed_time for rec in self.read_samples])
def read_so_far(self):
return sum([rec.size for rec in self.read_samples])
def io_rate(self):
return self.read_so_far() / self.time_so_far()
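# Example use of TimedReader, timing a local file read (path illustrative):
#   timer = TimedReader(open('/tmp/somefile'), size=1024)
#   while not timer.done:
#       timer.read(65536)
#   rate = timer.io_rate()  # average bytes per second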
class PerformanceTestWorker():
def __init__(self, args, queue):
self.pid = None
self.args = args
self.queue = queue
self.files_written = []
self.swift_connection = None
self.results = []
    # This method is what the controller process calls to start execution.
def start(self, sem):
worker = multiprocessing.Process(target=_call_instance_method,
args=(self, '_do_it', sem))
worker.start()
# This function gets bootstrapped on the remote process side.
def _do_it(self, semaphore):
semaphore.acquire()
try:
self.pid = os.getpid()
num_cycles = int(self.args['num_cycles'])
MEGABYTE = 1024*1024
sizes = [1*MEGABYTE, 4*MEGABYTE, 16*MEGABYTE, 64*MEGABYTE]
self.swift_connection = swiftclient.client.Connection(
authurl=self.args['authurl'],
user=self.args['username'],
key=self.args['password'],
tenant_name=self.args['tenantname'],
auth_version='2')
for cycle in xrange(1, num_cycles+1):
for size in sizes:
result = self._test_cycle(size, cycle)
self.results.append(result)
except:
self.queue.put([])
raise
else:
self.queue.put(self.results)
finally:
semaphore.release()
def _test_cycle(self, size, cycle):
zero_name = '%d-%d-zero-%d' % (size, self.pid, cycle)
random_name = '%d-%d-random-%d' % (size, self.pid, cycle)
zero_file = open('/tmp/%s' % zero_name, 'w')
random_file = open('/tmp/%s' % random_name, 'w')
        # Stage the test data in local files first, so the timed transfers
        # measure network and server throughput rather than local data
        # generation.
        with open('/dev/zero') as zero_source:
            zero_file.write(zero_source.read(size))
        with open('/dev/urandom') as random_source:
            random_file.write(random_source.read(size))
        zero_file.close()
        random_file.close()
zero_file = open('/tmp/%s' % zero_name, 'r')
random_file = open('/tmp/%s' % random_name, 'r')
# Write the file
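        # put_object() accepts any file-like object with a read() method as
        # its contents argument, so the TimedReader wrapper times the upload.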
zero_timer = TimedReader(zero_file, size)
self.swift_connection.put_object('.performance_test',
zero_name,
zero_timer, size)
random_timer = TimedReader(random_file, size)
self.swift_connection.put_object('.performance_test',
random_name,
random_timer, size)
write_result = {'zero_avg': zero_timer.io_rate(),
'random_avg': random_timer.io_rate(),
}
# Read the file back
zero_stream = \
self.swift_connection.get_object('.performance_test', zero_name,
resp_chunk_size=65536)[1]
random_stream = \
self.swift_connection.get_object('.performance_test', random_name,
resp_chunk_size=65536)[1]
zero_timer = TimedReader(zero_stream, size)
random_timer = TimedReader(random_stream, size)
read_size = 65536
        while not (zero_timer.done and random_timer.done):
            if not zero_timer.done:
                zero_timer.read(read_size)
            if not random_timer.done:
                random_timer.read(read_size)
read_result = {'zero_avg': zero_timer.io_rate(),
'random_avg': random_timer.io_rate(),
}
# Cleanup
self.swift_connection.delete_object('.performance_test', zero_name)
self.swift_connection.delete_object('.performance_test', random_name)
os.unlink('/tmp/%s' % zero_name)
os.unlink('/tmp/%s' % random_name)
return TestResultRecord(write_result=write_result,
read_result=read_result,
worker_pid=self.pid,
test_size=size,
cycle=cycle)
class PerformanceTestController():
def __init__(self, args):
self.args = args
self.semaphore = None
self.workers = []
def create_worker(self):
queue = multiprocessing.Queue()
worker = PerformanceTestWorker(self.args, queue)
self.workers.append(TestWorkerRecord(worker, queue))
def start_test(self):
self.semaphore = multiprocessing.Semaphore(len(self.workers))
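        # Each worker acquires the semaphore while it runs and releases it
        # on exit, so is_done() can detect completion once the semaphore's
        # value climbs back to len(self.workers).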
for worker_record in self.workers:
worker_record.handle.start(self.semaphore)
def is_done(self):
if not self.semaphore:
return False
return self.semaphore.get_value() == len(self.workers)
def get_results(self):
worker_results = [worker.queue.get() for worker in self.workers]
results = list(itertools.chain(*worker_results))
# This next bit only works because Python's sort algorithm is stable.
# What this is supposed to do is sort this list so that we have
# the test size as the primary sort key, then worker_pid, then cycle.
# The order matters.
results = sorted(results, key=lambda x: x.cycle)
results = sorted(results, key=lambda x: x.worker_pid)
results = sorted(results, key=lambda x: x.test_size)
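        # Equivalent single pass: sorted(results, key=lambda x:
        # (x.test_size, x.worker_pid, x.cycle))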
return results
# TODO: is there any nicer way to handle the view for this test?
# maybe in the future some fancy graphical view? something to copy-paste
# into slide decks
class PerformanceTestViewer():
def __init__(self, results):
self.results = results
def title(self, content):
return apply_ansi(content, ansi_modes['bold'])
def header(self, content):
return apply_ansi(content, ansi_modes['fg_green'])
def error(self, content):
return apply_ansi(content, ansi_modes['fg_red'])
def _group_by(self, iterable, keyfunc):
return {k: list(v) for k, v
in itertools.groupby(iterable, key=keyfunc)}
# TODO: make this much more concise
def show_results(self):
size_to_results = self._group_by(self.results,
keyfunc=lambda x: x.test_size)
pretty_print(size_to_results)
for size in size_to_results:
pid_to_results = self._group_by(size_to_results[size],
keyfunc=lambda x: x.worker_pid)
print self.title('Testing with files of size %s:' %
human_readable_size(size, over_time=False))
print '---'
print self.header(' Ingesting random bytes:')
for pid in pid_to_results:
print self.header(' Worker %s:' % pid), \
map(lambda res: human_readable_size(
res.write_result['random_avg']),
pid_to_results[pid])
print self.header(' Overall average per worker:'), \
chain(map(lambda res: res.write_result['random_avg'],
size_to_results[size]), average, human_readable_size)
print ''
print self.header(' Ingesting zero bytes:')
for pid in pid_to_results:
print self.header(' Worker %s:' % pid), \
map(lambda res: human_readable_size(
res.write_result['zero_avg']),
pid_to_results[pid])
print self.header(' Overall average per worker:'), \
chain(map(lambda res: res.write_result['zero_avg'],
size_to_results[size]), average, human_readable_size)
print ''
print self.header(' Recalling random bytes:')
for pid in pid_to_results:
print self.header(' Worker %s:' % pid), \
map(lambda res: human_readable_size(
res.read_result['random_avg']),
pid_to_results[pid])
print self.header(' Overall average per worker:'), \
chain(map(lambda res: res.read_result['random_avg'],
size_to_results[size]), average, human_readable_size)
print ''
print self.header(' Recalling zero bytes:')
for pid in pid_to_results:
print self.header(' Worker %s:' % pid), \
map(lambda res: human_readable_size(
res.read_result['zero_avg']),
pid_to_results[pid])
print self.header(' Overall average per worker:'), \
chain(map(lambda res: res.read_result['zero_avg'],
size_to_results[size]), average, human_readable_size)
print '---'
print ''
def main():
args = _parse_config()
worker_count = int(args['num_workers'])
swift = swiftclient.client.Connection(authurl=args['authurl'],
user=args['username'],
tenant_name=args['tenantname'],
key=args['password'],
auth_version='2')
swift.put_container('.performance_test',
{'X-Storage-Policy':
args['hpss_storage_policy']})
controller = PerformanceTestController(args)
for i in xrange(0, worker_count):
controller.create_worker()
print "Starting %d worker(s)..." % worker_count
controller.start_test()
print "Waiting for test results..."
    # Give each worker a moment to acquire the semaphore before polling,
    # so is_done() does not report success before the test has begun.
    time.sleep(.5)
while not controller.is_done():
time.sleep(.1)
all_results = controller.get_results()
view = PerformanceTestViewer(all_results)
view.show_results()
if __name__ == "__main__":
main()