needed to push logging to where exception is caught
This commit is contained in:
parent
fb233f2060
commit
243fb2a49f
@ -468,7 +468,7 @@ class LogProcessorDaemon(Daemon):
|
||||
# map
|
||||
processor_args = (self.total_conf, self.logger)
|
||||
results = multiprocess_collate(processor_args, logs_to_process,
|
||||
self.worker_count, self.logger)
|
||||
self.worker_count)
|
||||
|
||||
# reduce
|
||||
aggr_data = self.get_aggregate_data(processed_files, results)
|
||||
@ -527,8 +527,7 @@ class LogProcessorDaemon(Daemon):
|
||||
((time.time() - start) / 60))
|
||||
|
||||
|
||||
def multiprocess_collate(processor_args, logs_to_process, worker_count,
|
||||
logger):
|
||||
def multiprocess_collate(processor_args, logs_to_process, worker_count):
|
||||
'''
|
||||
yield hourly data from logs_to_process
|
||||
Every item that this function yields will be added to the processed files
|
||||
@ -554,11 +553,7 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count,
|
||||
except Queue.Empty:
|
||||
time.sleep(.01)
|
||||
else:
|
||||
if isinstance(data, Exception):
|
||||
item_string = '/'.join(item[2:])
|
||||
logger.exception("Problem processing file '%s'" %
|
||||
(item_string))
|
||||
else:
|
||||
if not isinstance(data, Exception):
|
||||
yield item, data
|
||||
if not any(r.is_alive() for r in results) and out_queue.empty():
|
||||
# all the workers are done and nothing is in the queue
|
||||
@ -576,5 +571,7 @@ def collate_worker(processor_args, in_queue, out_queue):
|
||||
try:
|
||||
ret = p.process_one_file(*item)
|
||||
except Exception, err:
|
||||
item_string = '/'.join(item[2:])
|
||||
p.logger.exception("Unable to process file '%s'" % (item_string))
|
||||
ret = err
|
||||
out_queue.put((item, ret))
|
||||
|
@ -387,8 +387,7 @@ use = egg:swift#proxy
|
||||
logs_to_process = [item]
|
||||
results = log_processor.multiprocess_collate(processor_args,
|
||||
logs_to_process,
|
||||
1,
|
||||
DumbLogger())
|
||||
1)
|
||||
results = list(results)
|
||||
expected = [(item, {('acct', '2010', '07', '09', '04'):
|
||||
{('public', 'object', 'GET', '2xx'): 1,
|
||||
@ -422,8 +421,7 @@ use = egg:swift#proxy
|
||||
logs_to_process = [item]
|
||||
results = log_processor.multiprocess_collate(processor_args,
|
||||
logs_to_process,
|
||||
1,
|
||||
DumbLogger())
|
||||
1)
|
||||
results = list(results)
|
||||
expected = []
|
||||
self.assertEquals(results, expected)
|
||||
@ -763,13 +761,12 @@ class TestLogProcessorDaemon(unittest.TestCase):
|
||||
d = MockLogProcessorDaemon(self)
|
||||
|
||||
def mock_multiprocess_collate(processor_args, logs_to_process,
|
||||
worker_count, logger):
|
||||
worker_count):
|
||||
self.assertEquals(d.total_conf, processor_args[0])
|
||||
self.assertEquals(d.logger, processor_args[1])
|
||||
|
||||
self.assertEquals(mock_logs_to_process, logs_to_process)
|
||||
self.assertEquals(d.worker_count, worker_count)
|
||||
self.assertEquals(d.logger, logger)
|
||||
|
||||
return multiprocess_collate_return
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user