diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 29f8a75245..f2cfd220ce 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -30,6 +30,8 @@ from swift.common.exceptions import ChunkReadTimeout from swift.common.utils import get_logger, readconf from swift.common.daemon import Daemon +now = datetime.datetime.now + class BadFileDownload(Exception): def __init__(self, status_code=None): @@ -234,31 +236,46 @@ class LogProcessorDaemon(Daemon): self.log_processor_container = c.get('container_name', 'log_processing_data') self.worker_count = int(c.get('worker_count', '1')) + self._keylist_mapping = None + self.processed_files_filename = 'processed_files.pickle.gz' - def run_once(self, *args, **kwargs): - for k in 'lookback_hours lookback_window'.split(): - if kwargs[k] is not None: - setattr(self, k, kwargs[k]) + def get_lookback_interval(self): + """ + :returns: lookback_start, lookback_end. + + Both or just lookback_end can be None. Otherwise, returns strings + of the form 'YYYYMMDDHH'. The interval returned is used as bounds + when looking for logs to processes. + + A returned None means don't limit the log files examined on that + side of the interval. + """ - self.logger.info(_("Beginning log processing")) - start = time.time() if self.lookback_hours == 0: lookback_start = None lookback_end = None else: delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = datetime.datetime.now() - delta_hours + lookback_start = now() - delta_hours lookback_start = lookback_start.strftime('%Y%m%d%H') if self.lookback_window == 0: lookback_end = None else: delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = datetime.datetime.now() - \ + lookback_end = now() - \ delta_hours + \ delta_window lookback_end = lookback_end.strftime('%Y%m%d%H') - self.logger.debug('lookback_start: %s' % lookback_start) - self.logger.debug('lookback_end: %s' % lookback_end) + return lookback_start, lookback_end + + def get_processed_files_list(self): + """ + :returns: a set of files that have already been processed or returns + None on error. + + Downloads the set from the stats account. Creates an empty set if + the an existing file cannot be found. + """ try: # Note: this file (or data set) will grow without bound. # In practice, if it becomes a problem (say, after many months of @@ -266,44 +283,52 @@ class LogProcessorDaemon(Daemon): # entries. Automatically pruning on each run could be dangerous. 
# There is not a good way to determine when an old entry should be # pruned (lookback_hours could be set to anything and could change) - processed_files_stream = self.log_processor.get_object_data( - self.log_processor_account, - self.log_processor_container, - 'processed_files.pickle.gz', - compressed=True) - buf = '\n'.join(x for x in processed_files_stream) + stream = self.log_processor.get_object_data( + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename, + compressed=True) + buf = '\n'.join(x for x in stream) if buf: - already_processed_files = cPickle.loads(buf) + files = cPickle.loads(buf) else: - already_processed_files = set() + return None except BadFileDownload, err: if err.status_code == 404: - already_processed_files = set() + files = set() else: - self.logger.error(_('Log processing unable to load list of ' - 'already processed log files')) - return - self.logger.debug(_('found %d processed files') % \ - len(already_processed_files)) - logs_to_process = self.log_processor.get_data_list(lookback_start, - lookback_end, - already_processed_files) - self.logger.info(_('loaded %d files to process') % - len(logs_to_process)) - if not logs_to_process: - self.logger.info(_("Log processing done (%0.2f minutes)") % - ((time.time() - start) / 60)) - return + return None + return files - # map - processor_args = (self.total_conf, self.logger) - results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count) + def get_aggregate_data(self, processed_files, input_data): + """ + Aggregates stats data by account/hour, summing as needed. + + :param processed_files: set of processed files + :param input_data: is the output from multiprocess_collate/the plugins. + + :returns: A dict containing data aggregated from the input_data + passed in. + + The dict returned has tuple keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + key:field_value + - key corresponds to something in one of the plugin's keylist + mapping, something like the tuple (source, level, verb, code) + - field_value is the sum of the field_values for the + corresponding values in the input + + Both input_data and the dict returned are hourly aggregations of + stats. + + Multiple values for the same (account, hour, tuple key) found in + input_data are summed in the dict returned. + """ - #reduce aggr_data = {} - processed_files = already_processed_files - for item, data in results: + for item, data in input_data: # since item contains the plugin and the log name, new plugins will # "reprocess" the file and the results will be in the final csv. processed_files.add(item) @@ -315,14 +340,30 @@ class LogProcessorDaemon(Daemon): # processing plugins need to realize this existing_data[i] = current + j aggr_data[k] = existing_data + return aggr_data + + def get_final_info(self, aggr_data): + """ + Aggregates data from aggr_data based on the keylist mapping. + + :param aggr_data: The results of the get_aggregate_data function. + :returns: a dict of further aggregated data + + The dict returned has keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + 'field_name': field_value (int) + + Data is aggregated as specified by the keylist mapping. The + keylist mapping specifies which keys to combine in aggr_data + and the final field_names for these combined keys in the dict + returned. Fields combined are summed. 
+ """ - # group - # reduce a large number of keys in aggr_data[k] to a small number of - # output keys - keylist_mapping = self.log_processor.generate_keylist_mapping() final_info = collections.defaultdict(dict) for account, data in aggr_data.items(): - for key, mapping in keylist_mapping.items(): + for key, mapping in self.keylist_mapping.items(): if isinstance(mapping, (list, set)): value = 0 for k in mapping: @@ -336,37 +377,154 @@ class LogProcessorDaemon(Daemon): except KeyError: value = 0 final_info[account][key] = value + return final_info - # output - sorted_keylist_mapping = sorted(keylist_mapping) - columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) - out_buf = [columns] + def store_processed_files_list(self, processed_files): + """ + Stores the proccessed files list in the stats account. + + :param processed_files: set of processed files + """ + + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename) + + def get_output(self, final_info): + """ + :returns: a list of rows to appear in the csv file. + + The first row contains the column headers for the rest of the + rows in the returned list. + + Each row after the first row corresponds to an account's data + for that hour. + """ + + sorted_keylist_mapping = sorted(self.keylist_mapping) + columns = ['data_ts', 'account'] + sorted_keylist_mapping + output = [columns] for (account, year, month, day, hour), d in final_info.items(): - data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) - row = [data_ts] - row.append('%s' % account) + data_ts = '%04d/%02d/%02d %02d:00:00' % \ + (int(year), int(month), int(day), int(hour)) + row = [data_ts, '%s' % (account)] for k in sorted_keylist_mapping: - row.append('%s' % d[k]) - out_buf.append(','.join(row)) - out_buf = '\n'.join(out_buf) + row.append(str(d[k])) + output.append(row) + return output + + def store_output(self, output): + """ + Takes the a list of rows and stores a csv file of the values in the + stats account. + + :param output: list of rows to appear in the csv file + + This csv file is final product of this script. + """ + + out_buf = '\n'.join([','.join(row) for row in output]) h = hashlib.md5(out_buf).hexdigest() upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h f = cStringIO.StringIO(out_buf) self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - upload_name) + self.log_processor_account, + self.log_processor_container, + upload_name) - # cleanup - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) - f = cStringIO.StringIO(s) - self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - 'processed_files.pickle.gz') + @property + def keylist_mapping(self): + """ + :returns: the keylist mapping. + + The keylist mapping determines how the stats fields are aggregated in + the final aggregation step. + """ + + if self._keylist_mapping == None: + self._keylist_mapping = \ + self.log_processor.generate_keylist_mapping() + return self._keylist_mapping + + def process_logs(self, logs_to_process, processed_files): + """ + :param logs_to_process: list of logs to process + :param processed_files: set of processed files + + :returns: returns a list of rows of processed data. + + The first row is the column headers. 
The rest of the rows contain + hourly aggregate data for the account specified in the row. + + Files processed are added to the processed_files set. + + When a large data structure is no longer needed, it is deleted in + an effort to conserve memory. + """ + + # map + processor_args = (self.total_conf, self.logger) + results = multiprocess_collate(processor_args, logs_to_process, + self.worker_count) + + # reduce + aggr_data = self.get_aggregate_data(processed_files, results) + del results + + # group + # reduce a large number of keys in aggr_data[k] to a small + # number of output keys + final_info = self.get_final_info(aggr_data) + del aggr_data + + # output + return self.get_output(final_info) + + def run_once(self, *args, **kwargs): + """ + Process log files that fall within the lookback interval. + + Upload resulting csv file to stats account. + + Update processed files list and upload to stats account. + """ + + for k in 'lookback_hours lookback_window'.split(): + if k in kwargs and kwargs[k] is not None: + setattr(self, k, kwargs[k]) + + start = time.time() + self.logger.info(_("Beginning log processing")) + + lookback_start, lookback_end = self.get_lookback_interval() + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + + processed_files = self.get_processed_files_list() + if processed_files == None: + self.logger.error(_('Log processing unable to load list of ' + 'already processed log files')) + return + self.logger.debug(_('found %d processed files') % + len(processed_files)) + + logs_to_process = self.log_processor.get_data_list(lookback_start, + lookback_end, processed_files) + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + + if logs_to_process: + output = self.process_logs(logs_to_process, processed_files) + self.store_output(output) + del output + + self.store_processed_files_list(processed_files) self.logger.info(_("Log processing done (%0.2f minutes)") % - ((time.time() - start) / 60)) + ((time.time() - start) / 60)) def multiprocess_collate(processor_args, logs_to_process, worker_count): diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 80b4822560..fa90ec5825 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -16,6 +16,10 @@ import unittest from test.unit import tmpfile import Queue +import datetime +import hashlib +import pickle +import time from swift.common import internal_proxy from swift.stats import log_processor @@ -26,7 +30,6 @@ class FakeUploadApp(object): def __init__(self, *args, **kwargs): pass - class DumbLogger(object): def __getattr__(self, n): return self.foo @@ -77,7 +80,7 @@ class DumbInternalProxy(object): return self.code, data() class TestLogProcessor(unittest.TestCase): - + access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ '09/Jul/2010/04/14/30 GET '\ '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ @@ -85,7 +88,7 @@ class TestLogProcessor(unittest.TestCase): '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' stats_test_line = 'account,1,2,3' proxy_config = {'log-processor': { - + } } @@ -426,3 +429,407 @@ use = egg:swift#proxy finally: log_processor.LogProcessor._internal_proxy = None log_processor.LogProcessor.get_object_data = orig_get_object_data + +class TestLogProcessorDaemon(unittest.TestCase): + + def test_get_lookback_interval(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, 
lookback_hours, lookback_window): + self.lookback_hours = lookback_hours + self.lookback_window = lookback_window + + try: + d = datetime.datetime + + for x in [ + [d(2011, 1, 1), 0, 0, None, None], + [d(2011, 1, 1), 120, 0, '2010122700', None], + [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], + [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], + [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], + [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], + ]: + + log_processor.now = lambda: x[0] + + d = MockLogProcessorDaemon(x[1], x[2]) + self.assertEquals((x[3], x[4]), d.get_lookback_interval()) + finally: + log_processor.now = datetime.datetime.now + + def test_get_processed_files_list(self): + class MockLogProcessor(): + def __init__(self, stream): + self.stream = stream + + def get_object_data(self, *args, **kwargs): + return self.stream + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, stream): + self.log_processor = MockLogProcessor(stream) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + file_list = set(['a', 'b', 'c']) + + for s, l in [['', None], + [pickle.dumps(set()).split('\n'), set()], + [pickle.dumps(file_list).split('\n'), file_list], + ]: + + self.assertEquals(l, + MockLogProcessorDaemon(s).get_processed_files_list()) + + def test_get_processed_files_list_bad_file_downloads(self): + class MockLogProcessor(): + def __init__(self, status_code): + self.err = log_processor.BadFileDownload(status_code) + + def get_object_data(self, *a, **k): + raise self.err + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, status_code): + self.log_processor = MockLogProcessor(status_code) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + for c, l in [[404, set()], [503, None], [None, None]]: + self.assertEquals(l, + MockLogProcessorDaemon(c).get_processed_files_list()) + + def test_get_aggregate_data(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can use keys that are easier to work with + + processed_files = set() + + data_in = [ + ['file1', { + 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + ], + ['file2', {'acct1_time1': {'field1': 10}}], + ] + + expected_data_out = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + pass + + d = MockLogProcessorDaemon() + data_out = d.get_aggregate_data(processed_files, data_in) + + for k, v in expected_data_out.items(): + self.assertEquals(v, data_out[k]) + + self.assertEquals(set(['file1', 'file2']), processed_files) + + def test_get_final_info(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can use keys/values that are easier to work with + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = { + 'out_field1':['field1', 'field2', 
'field3'], + 'out_field2':['field2', 'field3'], + 'out_field3':['field3'], + 'out_field4':'field4', + 'out_field5':['field6', 'field7', 'field8'], + 'out_field6':['field6'], + 'out_field7':'field7', + } + + data_in = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, + 'field4': 8, 'field5': 11}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + expected_data_out = { + 'acct1_time1': {'out_field1': 16, 'out_field2': 5, + 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct1_time2': {'out_field1': 9, 'out_field2': 5, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct2_time1': {'out_field1': 13, 'out_field2': 7, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct3_time3': {'out_field1': 17, 'out_field2': 9, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + } + + self.assertEquals(expected_data_out, + MockLogProcessorDaemon().get_final_info(data_in)) + + def test_store_processed_files_list(self): + class MockInternalProxy: + def __init__(self, test, daemon, processed_files): + self.test = test + self.daemon = daemon + self.processed_files = processed_files + + def upload_file(self, f, account, container, filename): + self.test.assertEquals(self.processed_files, + pickle.loads(f.getvalue())) + self.test.assertEquals(self.daemon.log_processor_account, + account) + self.test.assertEquals(self.daemon.log_processor_container, + container) + self.test.assertEquals(self.daemon.processed_files_filename, + filename) + + class MockLogProcessor: + def __init__(self, test, daemon, processed_files): + self.internal_proxy = MockInternalProxy(test, daemon, + processed_files) + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test, processed_files): + self.log_processor = \ + MockLogProcessor(test, self, processed_files) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + processed_files = set(['a', 'b', 'c']) + MockLogProcessorDaemon(self, processed_files).\ + store_processed_files_list(processed_files) + + def test_get_output(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = {'a':None, 'b':None, 'c':None} + + data_in = { + ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, + ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, + ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, + ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, + } + + expected_data_out = [ + ['data_ts', 'account', 'a', 'b', 'c'], + ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], + ] + + data_out = MockLogProcessorDaemon().get_output(data_in) + self.assertEquals(expected_data_out[0], data_out[0]) + + for row in data_out[1:]: + self.assert_(row in expected_data_out) + + for row in expected_data_out[1:]: + self.assert_(row in data_out) + + def test_store_output(self): + try: + real_strftime = time.strftime + mock_strftime_return = '2010/03/02/01/' + def mock_strftime(format): + self.assertEquals('%Y/%m/%d/%H/', format) + return mock_strftime_return + log_processor.time.strftime = mock_strftime + + data_in = [ + 
['data_ts', 'account', 'a', 'b', 'c'], + ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], + ] + + expected_output = '\n'.join([','.join(row) for row in data_in]) + h = hashlib.md5(expected_output).hexdigest() + expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) + + class MockInternalProxy: + def __init__(self, test, daemon, expected_filename, + expected_output): + self.test = test + self.daemon = daemon + self.expected_filename = expected_filename + self.expected_output = expected_output + + def upload_file(self, f, account, container, filename): + self.test.assertEquals(self.daemon.log_processor_account, + account) + self.test.assertEquals(self.daemon.log_processor_container, + container) + self.test.assertEquals(self.expected_filename, filename) + self.test.assertEquals(self.expected_output, f.getvalue()) + + class MockLogProcessor: + def __init__(self, test, daemon, expected_filename, + expected_output): + self.internal_proxy = MockInternalProxy(test, daemon, + expected_filename, expected_output) + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test, expected_filename, expected_output): + self.log_processor = MockLogProcessor(test, self, + expected_filename, expected_output) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + MockLogProcessorDaemon(self, expected_filename, expected_output).\ + store_output(data_in) + finally: + log_processor.time.strftime = real_strftime + + def test_keylist_mapping(self): + # Kind of lame test to see if the propery is both + # generated by a particular method and cached properly. + # The method that actually generates the mapping is + # tested elsewhere. 
+ + value_return = 'keylist_mapping' + class MockLogProcessor: + def __init__(self): + self.call_count = 0 + + def generate_keylist_mapping(self): + self.call_count += 1 + return value_return + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.log_processor = MockLogProcessor() + self._keylist_mapping = None + + d = MockLogProcessorDaemon() + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(1, d.log_processor.call_count) + + def test_process_logs(self): + try: + mock_logs_to_process = 'logs_to_process' + mock_processed_files = 'processed_files' + + real_multiprocess_collate = log_processor.multiprocess_collate + multiprocess_collate_return = 'multiprocess_collate_return' + + get_aggregate_data_return = 'get_aggregate_data_return' + get_final_info_return = 'get_final_info_return' + get_output_return = 'get_output_return' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test): + self.test = test + self.total_conf = 'total_conf' + self.logger = 'logger' + self.worker_count = 'worker_count' + + def get_aggregate_data(self, processed_files, results): + self.test.assertEquals(mock_processed_files, processed_files) + self.test.assertEquals(multiprocess_collate_return, results) + return get_aggregate_data_return + + def get_final_info(self, aggr_data): + self.test.assertEquals(get_aggregate_data_return, aggr_data) + return get_final_info_return + + def get_output(self, final_info): + self.test.assertEquals(get_final_info_return, final_info) + return get_output_return + + d = MockLogProcessorDaemon(self) + + def mock_multiprocess_collate(processor_args, logs_to_process, + worker_count): + self.assertEquals(d.total_conf, processor_args[0]) + self.assertEquals(d.logger, processor_args[1]) + + self.assertEquals(mock_logs_to_process, logs_to_process) + self.assertEquals(d.worker_count, worker_count) + + return multiprocess_collate_return + + log_processor.multiprocess_collate = mock_multiprocess_collate + + output = d.process_logs(mock_logs_to_process, mock_processed_files) + self.assertEquals(get_output_return, output) + finally: + log_processor.multiprocess_collate = real_multiprocess_collate + + def test_run_once_get_processed_files_list_returns_none(self): + class MockLogProcessor: + def get_data_list(self, lookback_start, lookback_end, + processed_files): + raise unittest.TestCase.failureException, \ + 'Method should not be called' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.logger = DumbLogger() + self.log_processor = MockLogProcessor() + + def get_lookback_interval(self): + return None, None + + def get_processed_files_list(self): + return None + + MockLogProcessorDaemon().run_once() + + def test_run_once_no_logs_to_process(self): + class MockLogProcessor(): + def __init__(self, daemon, test): + self.daemon = daemon + self.test = test + + def get_data_list(self, lookback_start, lookback_end, + processed_files): + self.test.assertEquals(self.daemon.lookback_start, + lookback_start) + self.test.assertEquals(self.daemon.lookback_end, + lookback_end) + self.test.assertEquals(self.daemon.processed_files, + processed_files) + return [] + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test): + self.logger = DumbLogger() + self.log_processor = MockLogProcessor(self, test) + self.lookback_start = 'lookback_start' + self.lookback_end = 'lookback_end' + 
                self.processed_files = ['a', 'b', 'c']
+
+            def get_lookback_interval(self):
+                return self.lookback_start, self.lookback_end
+
+            def get_processed_files_list(self):
+                return self.processed_files
+
+            def process_logs(self, logs_to_process, processed_files):
+                raise unittest.TestCase.failureException, \
+                    'Method should not be called'
+
+        MockLogProcessorDaemon(self).run_once()
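
For context on the aggregation steps introduced above, here is a minimal standalone sketch of what get_aggregate_data and get_final_info do with the plugin output, using toy data. The account key, field names, and keylist mapping below are invented for illustration only and are not taken from the patch.

import collections

# Output of multiprocess_collate: (item, data) pairs, one per processed log
# file, keyed by (account, year, month, day, hour) tuples.
input_data = [
    ('access/file1', {('acct1', 2011, 1, 1, 0): {'get_200': 2, 'put_201': 1}}),
    ('access/file2', {('acct1', 2011, 1, 1, 0): {'get_200': 3}}),
]

# get_aggregate_data: sum field values for identical (account, hour) keys.
aggr_data = {}
for item, data in input_data:
    for k, d in data.items():
        existing = aggr_data.get(k, {})
        for field, value in d.items():
            existing[field] = existing.get(field, 0) + value
        aggr_data[k] = existing
# aggr_data == {('acct1', 2011, 1, 1, 0): {'get_200': 5, 'put_201': 1}}

# get_final_info: collapse fields according to the keylist mapping; a list or
# set value means "sum these source fields", a plain string means "copy this
# single field" (missing fields count as 0).
keylist_mapping = {'requests': ['get_200', 'put_201'], 'puts': 'put_201'}
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
    for key, mapping in keylist_mapping.items():
        if isinstance(mapping, (list, set)):
            final_info[account][key] = sum(data.get(f, 0) for f in mapping)
        else:
            final_info[account][key] = data.get(mapping, 0)
# final_info == {('acct1', 2011, 1, 1, 0): {'requests': 6, 'puts': 1}}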