made stats processing handle generic exception in collate_worker()

This commit is contained in:
Greg Lange 2011-04-13 14:52:25 +00:00
parent f3ab8693fa
commit fb233f2060
2 changed files with 17 additions and 10 deletions

View File

@ -468,7 +468,7 @@ class LogProcessorDaemon(Daemon):
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
self.worker_count)
self.worker_count, self.logger)
# reduce
aggr_data = self.get_aggregate_data(processed_files, results)
@ -527,7 +527,8 @@ class LogProcessorDaemon(Daemon):
((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
def multiprocess_collate(processor_args, logs_to_process, worker_count,
logger):
'''
yield hourly data from logs_to_process
Every item that this function yields will be added to the processed files
@ -553,7 +554,11 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
except Queue.Empty:
time.sleep(.01)
else:
if not isinstance(data, BadFileDownload):
if isinstance(data, Exception):
item_string = '/'.join(item[2:])
logger.exception("Problem processing file '%s'" %
(item_string))
else:
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
@ -570,6 +575,6 @@ def collate_worker(processor_args, in_queue, out_queue):
break
try:
ret = p.process_one_file(*item)
except BadFileDownload, err:
except Exception, err:
ret = err
out_queue.put((item, ret))

View File

@ -342,7 +342,7 @@ use = egg:swift#proxy
def test_collate_worker_error(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
raise Exception()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
@ -364,8 +364,7 @@ use = egg:swift#proxy
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
type(ret))
self.assertTrue(isinstance(ret, Exception), type(ret))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
@ -388,7 +387,8 @@ use = egg:swift#proxy
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
1,
DumbLogger())
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
@ -422,7 +422,8 @@ use = egg:swift#proxy
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
1,
DumbLogger())
results = list(results)
expected = []
self.assertEquals(results, expected)
@ -762,12 +763,13 @@ class TestLogProcessorDaemon(unittest.TestCase):
d = MockLogProcessorDaemon(self)
def mock_multiprocess_collate(processor_args, logs_to_process,
worker_count):
worker_count, logger):
self.assertEquals(d.total_conf, processor_args[0])
self.assertEquals(d.logger, processor_args[1])
self.assertEquals(mock_logs_to_process, logs_to_process)
self.assertEquals(d.worker_count, worker_count)
self.assertEquals(d.logger, logger)
return multiprocess_collate_return