Refactored the log processing daemon to make it more testable, and added tests for the refactored code.
The refactor is not intended to change the daemon's behavior at all, but it needs extensive testing on staging before being pushed to production.
commit bac7d7380a
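The refactor breaks the old monolithic run_once() into small methods (get_lookback_interval, get_processed_files_list, process_logs, store_output, and so on) that the new tests drive in isolation by subclassing the daemon and stubbing its collaborators. A minimal sketch of that test pattern, assuming the refactored swift.stats.log_processor module is importable; the test and stub class names here are hypothetical and not part of this commit:

import unittest
from swift.stats import log_processor

class SketchLookbackInterval(unittest.TestCase):
    # Hypothetical example: bypass __init__ so no daemon config is needed,
    # then exercise one extracted method on its own.
    def test_zero_lookback_means_unbounded_interval(self):
        class StubDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                # only the attributes get_lookback_interval() reads
                self.lookback_hours = 0
                self.lookback_window = 0

        # lookback_hours == 0 disables both lookback bounds
        self.assertEquals((None, None),
                          StubDaemon().get_lookback_interval())

if __name__ == '__main__':
    unittest.main()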
@@ -30,6 +30,8 @@ from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon

now = datetime.datetime.now


class BadFileDownload(Exception):
    def __init__(self, status_code=None):
@@ -234,31 +236,46 @@ class LogProcessorDaemon(Daemon):
        self.log_processor_container = c.get('container_name',
                                             'log_processing_data')
        self.worker_count = int(c.get('worker_count', '1'))
        self._keylist_mapping = None
        self.processed_files_filename = 'processed_files.pickle.gz'

    def run_once(self, *args, **kwargs):
        for k in 'lookback_hours lookback_window'.split():
            if kwargs[k] is not None:
                setattr(self, k, kwargs[k])
    def get_lookback_interval(self):
        """
        :returns: lookback_start, lookback_end.

        Both or just lookback_end can be None. Otherwise, returns strings
        of the form 'YYYYMMDDHH'. The interval returned is used as bounds
        when looking for logs to processes.

        A returned None means don't limit the log files examined on that
        side of the interval.
        """

        self.logger.info(_("Beginning log processing"))
        start = time.time()
        if self.lookback_hours == 0:
            lookback_start = None
            lookback_end = None
        else:
            delta_hours = datetime.timedelta(hours=self.lookback_hours)
            lookback_start = datetime.datetime.now() - delta_hours
            lookback_start = now() - delta_hours
            lookback_start = lookback_start.strftime('%Y%m%d%H')
            if self.lookback_window == 0:
                lookback_end = None
            else:
                delta_window = datetime.timedelta(hours=self.lookback_window)
                lookback_end = datetime.datetime.now() - \
                lookback_end = now() - \
                    delta_hours + \
                    delta_window
                lookback_end = lookback_end.strftime('%Y%m%d%H')
        self.logger.debug('lookback_start: %s' % lookback_start)
        self.logger.debug('lookback_end: %s' % lookback_end)
        return lookback_start, lookback_end

    def get_processed_files_list(self):
        """
        :returns: a set of files that have already been processed or returns
        None on error.

        Downloads the set from the stats account. Creates an empty set if
        the an existing file cannot be found.
        """
        try:
            # Note: this file (or data set) will grow without bound.
            # In practice, if it becomes a problem (say, after many months of
@@ -266,44 +283,52 @@ class LogProcessorDaemon(Daemon):
            # entries. Automatically pruning on each run could be dangerous.
            # There is not a good way to determine when an old entry should be
            # pruned (lookback_hours could be set to anything and could change)
            processed_files_stream = self.log_processor.get_object_data(
                self.log_processor_account,
                self.log_processor_container,
                'processed_files.pickle.gz',
                compressed=True)
            buf = '\n'.join(x for x in processed_files_stream)
            stream = self.log_processor.get_object_data(
                self.log_processor_account,
                self.log_processor_container,
                self.processed_files_filename,
                compressed=True)
            buf = '\n'.join(x for x in stream)
            if buf:
                already_processed_files = cPickle.loads(buf)
                files = cPickle.loads(buf)
            else:
                already_processed_files = set()
                return None
        except BadFileDownload, err:
            if err.status_code == 404:
                already_processed_files = set()
                files = set()
            else:
                self.logger.error(_('Log processing unable to load list of '
                    'already processed log files'))
                return
        self.logger.debug(_('found %d processed files') % \
            len(already_processed_files))
        logs_to_process = self.log_processor.get_data_list(lookback_start,
            lookback_end,
            already_processed_files)
        self.logger.info(_('loaded %d files to process') %
            len(logs_to_process))
        if not logs_to_process:
            self.logger.info(_("Log processing done (%0.2f minutes)") %
                ((time.time() - start) / 60))
            return
            return None
        return files

        # map
        processor_args = (self.total_conf, self.logger)
        results = multiprocess_collate(processor_args, logs_to_process,
            self.worker_count)
    def get_aggregate_data(self, processed_files, input_data):
        """
        Aggregates stats data by account/hour, summing as needed.

        :param processed_files: set of processed files
        :param input_data: is the output from multiprocess_collate/the plugins.

        :returns: A dict containing data aggregated from the input_data
        passed in.

        The dict returned has tuple keys of the form:
        (account, year, month, day, hour)
        The dict returned has values that are dicts with items of this
        form:
        key:field_value
        - key corresponds to something in one of the plugin's keylist
        mapping, something like the tuple (source, level, verb, code)
        - field_value is the sum of the field_values for the
        corresponding values in the input

        Both input_data and the dict returned are hourly aggregations of
        stats.

        Multiple values for the same (account, hour, tuple key) found in
        input_data are summed in the dict returned.
        """

        #reduce
        aggr_data = {}
        processed_files = already_processed_files
        for item, data in results:
        for item, data in input_data:
            # since item contains the plugin and the log name, new plugins will
            # "reprocess" the file and the results will be in the final csv.
            processed_files.add(item)
@@ -315,14 +340,30 @@ class LogProcessorDaemon(Daemon):
                    # processing plugins need to realize this
                    existing_data[i] = current + j
                aggr_data[k] = existing_data
        return aggr_data

    def get_final_info(self, aggr_data):
        """
        Aggregates data from aggr_data based on the keylist mapping.

        :param aggr_data: The results of the get_aggregate_data function.
        :returns: a dict of further aggregated data

        The dict returned has keys of the form:
        (account, year, month, day, hour)
        The dict returned has values that are dicts with items of this
        form:
        'field_name': field_value (int)

        Data is aggregated as specified by the keylist mapping. The
        keylist mapping specifies which keys to combine in aggr_data
        and the final field_names for these combined keys in the dict
        returned. Fields combined are summed.
        """

        # group
        # reduce a large number of keys in aggr_data[k] to a small number of
        # output keys
        keylist_mapping = self.log_processor.generate_keylist_mapping()
        final_info = collections.defaultdict(dict)
        for account, data in aggr_data.items():
            for key, mapping in keylist_mapping.items():
            for key, mapping in self.keylist_mapping.items():
                if isinstance(mapping, (list, set)):
                    value = 0
                    for k in mapping:
@@ -336,37 +377,154 @@ class LogProcessorDaemon(Daemon):
                        except KeyError:
                            value = 0
                final_info[account][key] = value
        return final_info

        # output
        sorted_keylist_mapping = sorted(keylist_mapping)
        columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
        out_buf = [columns]
    def store_processed_files_list(self, processed_files):
        """
        Stores the proccessed files list in the stats account.

        :param processed_files: set of processed files
        """

        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
        f = cStringIO.StringIO(s)
        self.log_processor.internal_proxy.upload_file(f,
            self.log_processor_account,
            self.log_processor_container,
            self.processed_files_filename)

    def get_output(self, final_info):
        """
        :returns: a list of rows to appear in the csv file.

        The first row contains the column headers for the rest of the
        rows in the returned list.

        Each row after the first row corresponds to an account's data
        for that hour.
        """

        sorted_keylist_mapping = sorted(self.keylist_mapping)
        columns = ['data_ts', 'account'] + sorted_keylist_mapping
        output = [columns]
        for (account, year, month, day, hour), d in final_info.items():
            data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
            row = [data_ts]
            row.append('%s' % account)
            data_ts = '%04d/%02d/%02d %02d:00:00' % \
                (int(year), int(month), int(day), int(hour))
            row = [data_ts, '%s' % (account)]
            for k in sorted_keylist_mapping:
                row.append('%s' % d[k])
            out_buf.append(','.join(row))
        out_buf = '\n'.join(out_buf)
                row.append(str(d[k]))
            output.append(row)
        return output

    def store_output(self, output):
        """
        Takes the a list of rows and stores a csv file of the values in the
        stats account.

        :param output: list of rows to appear in the csv file

        This csv file is final product of this script.
        """

        out_buf = '\n'.join([','.join(row) for row in output])
        h = hashlib.md5(out_buf).hexdigest()
        upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
        f = cStringIO.StringIO(out_buf)
        self.log_processor.internal_proxy.upload_file(f,
            self.log_processor_account,
            self.log_processor_container,
            upload_name)
            self.log_processor_account,
            self.log_processor_container,
            upload_name)

        # cleanup
        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
        f = cStringIO.StringIO(s)
        self.log_processor.internal_proxy.upload_file(f,
            self.log_processor_account,
            self.log_processor_container,
            'processed_files.pickle.gz')
    @property
    def keylist_mapping(self):
        """
        :returns: the keylist mapping.

        The keylist mapping determines how the stats fields are aggregated in
        the final aggregation step.
        """

        if self._keylist_mapping == None:
            self._keylist_mapping = \
                self.log_processor.generate_keylist_mapping()
        return self._keylist_mapping

    def process_logs(self, logs_to_process, processed_files):
        """
        :param logs_to_process: list of logs to process
        :param processed_files: set of processed files

        :returns: returns a list of rows of processed data.

        The first row is the column headers. The rest of the rows contain
        hourly aggregate data for the account specified in the row.

        Files processed are added to the processed_files set.

        When a large data structure is no longer needed, it is deleted in
        an effort to conserve memory.
        """

        # map
        processor_args = (self.total_conf, self.logger)
        results = multiprocess_collate(processor_args, logs_to_process,
            self.worker_count)

        # reduce
        aggr_data = self.get_aggregate_data(processed_files, results)
        del results

        # group
        # reduce a large number of keys in aggr_data[k] to a small
        # number of output keys
        final_info = self.get_final_info(aggr_data)
        del aggr_data

        # output
        return self.get_output(final_info)

    def run_once(self, *args, **kwargs):
        """
        Process log files that fall within the lookback interval.

        Upload resulting csv file to stats account.

        Update processed files list and upload to stats account.
        """

        for k in 'lookback_hours lookback_window'.split():
            if k in kwargs and kwargs[k] is not None:
                setattr(self, k, kwargs[k])

        start = time.time()
        self.logger.info(_("Beginning log processing"))

        lookback_start, lookback_end = self.get_lookback_interval()
        self.logger.debug('lookback_start: %s' % lookback_start)
        self.logger.debug('lookback_end: %s' % lookback_end)

        processed_files = self.get_processed_files_list()
        if processed_files == None:
            self.logger.error(_('Log processing unable to load list of '
                'already processed log files'))
            return
        self.logger.debug(_('found %d processed files') %
            len(processed_files))

        logs_to_process = self.log_processor.get_data_list(lookback_start,
            lookback_end, processed_files)
        self.logger.info(_('loaded %d files to process') %
            len(logs_to_process))

        if logs_to_process:
            output = self.process_logs(logs_to_process, processed_files)
            self.store_output(output)
            del output

        self.store_processed_files_list(processed_files)

        self.logger.info(_("Log processing done (%0.2f minutes)") %
            ((time.time() - start) / 60))
            ((time.time() - start) / 60))


def multiprocess_collate(processor_args, logs_to_process, worker_count):
@@ -16,6 +16,10 @@
import unittest
from test.unit import tmpfile
import Queue
import datetime
import hashlib
import pickle
import time

from swift.common import internal_proxy
from swift.stats import log_processor
@@ -26,7 +30,6 @@ class FakeUploadApp(object):
    def __init__(self, *args, **kwargs):
        pass


class DumbLogger(object):
    def __getattr__(self, n):
        return self.foo
@@ -77,7 +80,7 @@ class DumbInternalProxy(object):
        return self.code, data()

class TestLogProcessor(unittest.TestCase):


    access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
        '09/Jul/2010/04/14/30 GET '\
        '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
@@ -85,7 +88,7 @@ class TestLogProcessor(unittest.TestCase):
        '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
    stats_test_line = 'account,1,2,3'
    proxy_config = {'log-processor': {


        }
    }

@@ -426,3 +429,407 @@ use = egg:swift#proxy
        finally:
            log_processor.LogProcessor._internal_proxy = None
            log_processor.LogProcessor.get_object_data = orig_get_object_data

class TestLogProcessorDaemon(unittest.TestCase):

    def test_get_lookback_interval(self):
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self, lookback_hours, lookback_window):
                self.lookback_hours = lookback_hours
                self.lookback_window = lookback_window

        try:
            d = datetime.datetime

            for x in [
                [d(2011, 1, 1), 0, 0, None, None],
                [d(2011, 1, 1), 120, 0, '2010122700', None],
                [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
                [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
                [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
                [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
                ]:

                log_processor.now = lambda: x[0]

                d = MockLogProcessorDaemon(x[1], x[2])
                self.assertEquals((x[3], x[4]), d.get_lookback_interval())
        finally:
            log_processor.now = datetime.datetime.now

    def test_get_processed_files_list(self):
        class MockLogProcessor():
            def __init__(self, stream):
                self.stream = stream

            def get_object_data(self, *args, **kwargs):
                return self.stream

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self, stream):
                self.log_processor = MockLogProcessor(stream)
                self.log_processor_account = 'account'
                self.log_processor_container = 'container'
                self.processed_files_filename = 'filename'

        file_list = set(['a', 'b', 'c'])

        for s, l in [['', None],
                [pickle.dumps(set()).split('\n'), set()],
                [pickle.dumps(file_list).split('\n'), file_list],
                ]:

            self.assertEquals(l,
                MockLogProcessorDaemon(s).get_processed_files_list())

    def test_get_processed_files_list_bad_file_downloads(self):
        class MockLogProcessor():
            def __init__(self, status_code):
                self.err = log_processor.BadFileDownload(status_code)

            def get_object_data(self, *a, **k):
                raise self.err

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self, status_code):
                self.log_processor = MockLogProcessor(status_code)
                self.log_processor_account = 'account'
                self.log_processor_container = 'container'
                self.processed_files_filename = 'filename'

        for c, l in [[404, set()], [503, None], [None, None]]:
            self.assertEquals(l,
                MockLogProcessorDaemon(c).get_processed_files_list())

    def test_get_aggregate_data(self):
        # when run "for real"
        # the various keys/values in the input and output
        # dictionaries are often not simple strings
        # for testing we can use keys that are easier to work with

        processed_files = set()

        data_in = [
            ['file1', {
                'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
                'acct1_time2': {'field1': 4, 'field2': 5},
                'acct2_time1': {'field1': 6, 'field2': 7},
                'acct3_time3': {'field1': 8, 'field2': 9},
                }
            ],
            ['file2', {'acct1_time1': {'field1': 10}}],
        ]

        expected_data_out = {
            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
            'acct1_time2': {'field1': 4, 'field2': 5},
            'acct2_time1': {'field1': 6, 'field2': 7},
            'acct3_time3': {'field1': 8, 'field2': 9},
        }

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                pass

        d = MockLogProcessorDaemon()
        data_out = d.get_aggregate_data(processed_files, data_in)

        for k, v in expected_data_out.items():
            self.assertEquals(v, data_out[k])

        self.assertEquals(set(['file1', 'file2']), processed_files)

    def test_get_final_info(self):
        # when run "for real"
        # the various keys/values in the input and output
        # dictionaries are often not simple strings
        # for testing we can use keys/values that are easier to work with

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                self._keylist_mapping = {
                    'out_field1':['field1', 'field2', 'field3'],
                    'out_field2':['field2', 'field3'],
                    'out_field3':['field3'],
                    'out_field4':'field4',
                    'out_field5':['field6', 'field7', 'field8'],
                    'out_field6':['field6'],
                    'out_field7':'field7',
                }

        data_in = {
            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
                'field4': 8, 'field5': 11},
            'acct1_time2': {'field1': 4, 'field2': 5},
            'acct2_time1': {'field1': 6, 'field2': 7},
            'acct3_time3': {'field1': 8, 'field2': 9},
        }

        expected_data_out = {
            'acct1_time1': {'out_field1': 16, 'out_field2': 5,
                'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
                'out_field6': 0, 'out_field7': 0,},
            'acct1_time2': {'out_field1': 9, 'out_field2': 5,
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
                'out_field6': 0, 'out_field7': 0,},
            'acct2_time1': {'out_field1': 13, 'out_field2': 7,
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
                'out_field6': 0, 'out_field7': 0,},
            'acct3_time3': {'out_field1': 17, 'out_field2': 9,
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
                'out_field6': 0, 'out_field7': 0,},
        }

        self.assertEquals(expected_data_out,
            MockLogProcessorDaemon().get_final_info(data_in))

    def test_store_processed_files_list(self):
        class MockInternalProxy:
            def __init__(self, test, daemon, processed_files):
                self.test = test
                self.daemon = daemon
                self.processed_files = processed_files

            def upload_file(self, f, account, container, filename):
                self.test.assertEquals(self.processed_files,
                    pickle.loads(f.getvalue()))
                self.test.assertEquals(self.daemon.log_processor_account,
                    account)
                self.test.assertEquals(self.daemon.log_processor_container,
                    container)
                self.test.assertEquals(self.daemon.processed_files_filename,
                    filename)

        class MockLogProcessor:
            def __init__(self, test, daemon, processed_files):
                self.internal_proxy = MockInternalProxy(test, daemon,
                    processed_files)

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self, test, processed_files):
                self.log_processor = \
                    MockLogProcessor(test, self, processed_files)
                self.log_processor_account = 'account'
                self.log_processor_container = 'container'
                self.processed_files_filename = 'filename'

        processed_files = set(['a', 'b', 'c'])
        MockLogProcessorDaemon(self, processed_files).\
            store_processed_files_list(processed_files)

    def test_get_output(self):
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                self._keylist_mapping = {'a':None, 'b':None, 'c':None}

        data_in = {
            ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
            ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
            ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
            ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
        }

        expected_data_out = [
            ['data_ts', 'account', 'a', 'b', 'c'],
            ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
            ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
            ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
            ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
        ]

        data_out = MockLogProcessorDaemon().get_output(data_in)
        self.assertEquals(expected_data_out[0], data_out[0])

        for row in data_out[1:]:
            self.assert_(row in expected_data_out)

        for row in expected_data_out[1:]:
            self.assert_(row in data_out)

    def test_store_output(self):
        try:
            real_strftime = time.strftime
            mock_strftime_return = '2010/03/02/01/'
            def mock_strftime(format):
                self.assertEquals('%Y/%m/%d/%H/', format)
                return mock_strftime_return
            log_processor.time.strftime = mock_strftime

            data_in = [
                ['data_ts', 'account', 'a', 'b', 'c'],
                ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
                ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
                ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
                ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
            ]

            expected_output = '\n'.join([','.join(row) for row in data_in])
            h = hashlib.md5(expected_output).hexdigest()
            expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)

            class MockInternalProxy:
                def __init__(self, test, daemon, expected_filename,
                    expected_output):
                    self.test = test
                    self.daemon = daemon
                    self.expected_filename = expected_filename
                    self.expected_output = expected_output

                def upload_file(self, f, account, container, filename):
                    self.test.assertEquals(self.daemon.log_processor_account,
                        account)
                    self.test.assertEquals(self.daemon.log_processor_container,
                        container)
                    self.test.assertEquals(self.expected_filename, filename)
                    self.test.assertEquals(self.expected_output, f.getvalue())

            class MockLogProcessor:
                def __init__(self, test, daemon, expected_filename,
                    expected_output):
                    self.internal_proxy = MockInternalProxy(test, daemon,
                        expected_filename, expected_output)

            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
                def __init__(self, test, expected_filename, expected_output):
                    self.log_processor = MockLogProcessor(test, self,
                        expected_filename, expected_output)
                    self.log_processor_account = 'account'
                    self.log_processor_container = 'container'
                    self.processed_files_filename = 'filename'

            MockLogProcessorDaemon(self, expected_filename, expected_output).\
                store_output(data_in)
        finally:
            log_processor.time.strftime = real_strftime

    def test_keylist_mapping(self):
        # Kind of lame test to see if the propery is both
        # generated by a particular method and cached properly.
        # The method that actually generates the mapping is
        # tested elsewhere.

        value_return = 'keylist_mapping'
        class MockLogProcessor:
            def __init__(self):
                self.call_count = 0

            def generate_keylist_mapping(self):
                self.call_count += 1
                return value_return

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                self.log_processor = MockLogProcessor()
                self._keylist_mapping = None

        d = MockLogProcessorDaemon()
        self.assertEquals(value_return, d.keylist_mapping)
        self.assertEquals(value_return, d.keylist_mapping)
        self.assertEquals(1, d.log_processor.call_count)

    def test_process_logs(self):
        try:
            mock_logs_to_process = 'logs_to_process'
            mock_processed_files = 'processed_files'

            real_multiprocess_collate = log_processor.multiprocess_collate
            multiprocess_collate_return = 'multiprocess_collate_return'

            get_aggregate_data_return = 'get_aggregate_data_return'
            get_final_info_return = 'get_final_info_return'
            get_output_return = 'get_output_return'

            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
                def __init__(self, test):
                    self.test = test
                    self.total_conf = 'total_conf'
                    self.logger = 'logger'
                    self.worker_count = 'worker_count'

                def get_aggregate_data(self, processed_files, results):
                    self.test.assertEquals(mock_processed_files, processed_files)
                    self.test.assertEquals(multiprocess_collate_return, results)
                    return get_aggregate_data_return

                def get_final_info(self, aggr_data):
                    self.test.assertEquals(get_aggregate_data_return, aggr_data)
                    return get_final_info_return

                def get_output(self, final_info):
                    self.test.assertEquals(get_final_info_return, final_info)
                    return get_output_return

            d = MockLogProcessorDaemon(self)

            def mock_multiprocess_collate(processor_args, logs_to_process,
                worker_count):
                self.assertEquals(d.total_conf, processor_args[0])
                self.assertEquals(d.logger, processor_args[1])

                self.assertEquals(mock_logs_to_process, logs_to_process)
                self.assertEquals(d.worker_count, worker_count)

                return multiprocess_collate_return

            log_processor.multiprocess_collate = mock_multiprocess_collate

            output = d.process_logs(mock_logs_to_process, mock_processed_files)
            self.assertEquals(get_output_return, output)
        finally:
            log_processor.multiprocess_collate = real_multiprocess_collate

    def test_run_once_get_processed_files_list_returns_none(self):
        class MockLogProcessor:
            def get_data_list(self, lookback_start, lookback_end,
                processed_files):
                raise unittest.TestCase.failureException, \
                    'Method should not be called'

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self):
                self.logger = DumbLogger()
                self.log_processor = MockLogProcessor()

            def get_lookback_interval(self):
                return None, None

            def get_processed_files_list(self):
                return None

        MockLogProcessorDaemon().run_once()

    def test_run_once_no_logs_to_process(self):
        class MockLogProcessor():
            def __init__(self, daemon, test):
                self.daemon = daemon
                self.test = test

            def get_data_list(self, lookback_start, lookback_end,
                processed_files):
                self.test.assertEquals(self.daemon.lookback_start,
                    lookback_start)
                self.test.assertEquals(self.daemon.lookback_end,
                    lookback_end)
                self.test.assertEquals(self.daemon.processed_files,
                    processed_files)
                return []

        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
            def __init__(self, test):
                self.logger = DumbLogger()
                self.log_processor = MockLogProcessor(self, test)
                self.lookback_start = 'lookback_start'
                self.lookback_end = 'lookback_end'
                self.processed_files = ['a', 'b', 'c']

            def get_lookback_interval(self):
                return self.lookback_start, self.lookback_end

            def get_processed_files_list(self):
                return self.processed_files

            def process_logs(logs_to_process, processed_files):
                raise unittest.TestCase.failureException, \
                    'Method should not be called'

        MockLogProcessorDaemon(self).run_once()