fixed edge case when log processor cannot get a log file for processing
This commit is contained in:
commit
52217d8e5f
@ -159,11 +159,10 @@ class LogProcessor(object):
|
||||
def get_object_data(self, swift_account, container_name, object_name,
|
||||
compressed=False):
|
||||
'''reads an object and yields its lines'''
|
||||
code, o = self.internal_proxy.get_object(swift_account,
|
||||
container_name,
|
||||
object_name)
|
||||
code, o = self.internal_proxy.get_object(swift_account, container_name,
|
||||
object_name)
|
||||
if code < 200 or code >= 300:
|
||||
return
|
||||
raise BadFileDownload()
|
||||
last_part = ''
|
||||
last_compressed_part = ''
|
||||
# magic in the following zlib.decompressobj argument is courtesy of
|
||||
@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon):
|
||||
already_processed_files = cPickle.loads(buf)
|
||||
else:
|
||||
already_processed_files = set()
|
||||
except Exception:
|
||||
except BadFileDownload:
|
||||
already_processed_files = set()
|
||||
self.logger.debug(_('found %d processed files') % \
|
||||
len(already_processed_files))
|
||||
@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon):
|
||||
|
||||
|
||||
def multiprocess_collate(processor_args, logs_to_process, worker_count):
|
||||
'''yield hourly data from logs_to_process'''
|
||||
'''
|
||||
yield hourly data from logs_to_process
|
||||
Every item that this function yields will be added to the processed files
|
||||
list.
|
||||
'''
|
||||
results = []
|
||||
in_queue = multiprocessing.Queue()
|
||||
out_queue = multiprocessing.Queue()
|
||||
@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
|
||||
for x in logs_to_process:
|
||||
in_queue.put(x)
|
||||
for _junk in range(worker_count):
|
||||
in_queue.put(None)
|
||||
count = 0
|
||||
in_queue.put(None) # tell the worker to end
|
||||
while True:
|
||||
try:
|
||||
item, data = out_queue.get_nowait()
|
||||
count += 1
|
||||
if data:
|
||||
yield item, data
|
||||
if count >= len(logs_to_process):
|
||||
# this implies that one result will come from every request
|
||||
break
|
||||
except Queue.Empty:
|
||||
time.sleep(.1)
|
||||
for r in results:
|
||||
r.join()
|
||||
time.sleep(.01)
|
||||
else:
|
||||
if not isinstance(data, BadFileDownload):
|
||||
yield item, data
|
||||
if not any(r.is_alive() for r in results) and out_queue.empty():
|
||||
# all the workers are done and nothing is in the queue
|
||||
break
|
||||
|
||||
|
||||
def collate_worker(processor_args, in_queue, out_queue):
|
||||
'''worker process for multiprocess_collate'''
|
||||
p = LogProcessor(*processor_args)
|
||||
while True:
|
||||
item = in_queue.get()
|
||||
if item is None:
|
||||
# no more work to process
|
||||
break
|
||||
try:
|
||||
item = in_queue.get_nowait()
|
||||
if item is None:
|
||||
break
|
||||
except Queue.Empty:
|
||||
time.sleep(.1)
|
||||
else:
|
||||
ret = p.process_one_file(*item)
|
||||
out_queue.put((item, ret))
|
||||
except BadFileDownload, err:
|
||||
ret = err
|
||||
out_queue.put((item, ret))
|
||||
|
@ -15,9 +15,11 @@
|
||||
|
||||
import unittest
|
||||
from test.unit import tmpfile
|
||||
import Queue
|
||||
|
||||
from swift.common import internal_proxy
|
||||
from swift.stats import log_processor
|
||||
from swift.common.exceptions import ChunkReadTimeout
|
||||
|
||||
|
||||
class FakeUploadApp(object):
|
||||
@ -33,6 +35,11 @@ class DumbLogger(object):
|
||||
pass
|
||||
|
||||
class DumbInternalProxy(object):
|
||||
def __init__(self, code=200, timeout=False, bad_compressed=False):
|
||||
self.code = code
|
||||
self.timeout = timeout
|
||||
self.bad_compressed = bad_compressed
|
||||
|
||||
def get_container_list(self, account, container, marker=None,
|
||||
end_marker=None):
|
||||
n = '2010/03/14/13/obj1'
|
||||
@ -46,22 +53,28 @@ class DumbInternalProxy(object):
|
||||
return []
|
||||
|
||||
def get_object(self, account, container, object_name):
|
||||
code = 200
|
||||
if object_name.endswith('.gz'):
|
||||
# same data as below, compressed with gzip -9
|
||||
def data():
|
||||
yield '\x1f\x8b\x08'
|
||||
yield '\x08"\xd79L'
|
||||
yield '\x02\x03te'
|
||||
yield 'st\x00\xcbO'
|
||||
yield '\xca\xe2JI,I'
|
||||
yield '\xe4\x02\x00O\xff'
|
||||
yield '\xa3Y\t\x00\x00\x00'
|
||||
if self.bad_compressed:
|
||||
# invalid compressed data
|
||||
def data():
|
||||
yield '\xff\xff\xff\xff\xff\xff\xff'
|
||||
else:
|
||||
# 'obj\ndata', compressed with gzip -9
|
||||
def data():
|
||||
yield '\x1f\x8b\x08'
|
||||
yield '\x08"\xd79L'
|
||||
yield '\x02\x03te'
|
||||
yield 'st\x00\xcbO'
|
||||
yield '\xca\xe2JI,I'
|
||||
yield '\xe4\x02\x00O\xff'
|
||||
yield '\xa3Y\t\x00\x00\x00'
|
||||
else:
|
||||
def data():
|
||||
yield 'obj\n'
|
||||
if self.timeout:
|
||||
raise ChunkReadTimeout
|
||||
yield 'data'
|
||||
return code, data()
|
||||
return self.code, data()
|
||||
|
||||
class TestLogProcessor(unittest.TestCase):
|
||||
|
||||
@ -159,6 +172,19 @@ use = egg:swift#proxy
|
||||
'prefix_query': 0}}
|
||||
self.assertEquals(result, expected)
|
||||
|
||||
def test_process_one_access_file_error(self):
|
||||
access_proxy_config = self.proxy_config.copy()
|
||||
access_proxy_config.update({
|
||||
'log-processor-access': {
|
||||
'source_filename_format':'%Y%m%d%H*',
|
||||
'class_path':
|
||||
'swift.stats.access_processor.AccessLogProcessor'
|
||||
}})
|
||||
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
|
||||
p._internal_proxy = DumbInternalProxy(code=500)
|
||||
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
|
||||
'access', 'a', 'c', 'o')
|
||||
|
||||
def test_get_container_listing(self):
|
||||
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
|
||||
p._internal_proxy = DumbInternalProxy()
|
||||
@ -193,6 +219,18 @@ use = egg:swift#proxy
|
||||
result = list(p.get_object_data('a', 'c', 'o.gz', True))
|
||||
self.assertEquals(result, expected)
|
||||
|
||||
def test_get_object_data_errors(self):
|
||||
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
|
||||
p._internal_proxy = DumbInternalProxy(code=500)
|
||||
result = p.get_object_data('a', 'c', 'o')
|
||||
self.assertRaises(log_processor.BadFileDownload, list, result)
|
||||
p._internal_proxy = DumbInternalProxy(bad_compressed=True)
|
||||
result = p.get_object_data('a', 'c', 'o.gz', True)
|
||||
self.assertRaises(log_processor.BadFileDownload, list, result)
|
||||
p._internal_proxy = DumbInternalProxy(timeout=True)
|
||||
result = p.get_object_data('a', 'c', 'o')
|
||||
self.assertRaises(log_processor.BadFileDownload, list, result)
|
||||
|
||||
def test_get_stat_totals(self):
|
||||
stats_proxy_config = self.proxy_config.copy()
|
||||
stats_proxy_config.update({
|
||||
@ -262,3 +300,130 @@ use = egg:swift#proxy
|
||||
# these only work for Py2.7+
|
||||
#self.assertIsInstance(k, str)
|
||||
self.assertTrue(isinstance(k, str), type(k))
|
||||
|
||||
def test_collate_worker(self):
|
||||
try:
|
||||
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
|
||||
def get_object_data(*a,**kw):
|
||||
return [self.access_test_line]
|
||||
orig_get_object_data = log_processor.LogProcessor.get_object_data
|
||||
log_processor.LogProcessor.get_object_data = get_object_data
|
||||
proxy_config = self.proxy_config.copy()
|
||||
proxy_config.update({
|
||||
'log-processor-access': {
|
||||
'source_filename_format':'%Y%m%d%H*',
|
||||
'class_path':
|
||||
'swift.stats.access_processor.AccessLogProcessor'
|
||||
}})
|
||||
processor_args = (proxy_config, DumbLogger())
|
||||
q_in = Queue.Queue()
|
||||
q_out = Queue.Queue()
|
||||
work_request = ('access', 'a','c','o')
|
||||
q_in.put(work_request)
|
||||
q_in.put(None)
|
||||
log_processor.collate_worker(processor_args, q_in, q_out)
|
||||
item, ret = q_out.get()
|
||||
self.assertEquals(item, work_request)
|
||||
expected = {('acct', '2010', '07', '09', '04'):
|
||||
{('public', 'object', 'GET', '2xx'): 1,
|
||||
('public', 'bytes_out'): 95,
|
||||
'marker_query': 0,
|
||||
'format_query': 1,
|
||||
'delimiter_query': 0,
|
||||
'path_query': 0,
|
||||
('public', 'bytes_in'): 6,
|
||||
'prefix_query': 0}}
|
||||
self.assertEquals(ret, expected)
|
||||
finally:
|
||||
log_processor.LogProcessor._internal_proxy = None
|
||||
log_processor.LogProcessor.get_object_data = orig_get_object_data
|
||||
|
||||
def test_collate_worker_error(self):
|
||||
def get_object_data(*a,**kw):
|
||||
raise log_processor.BadFileDownload()
|
||||
orig_get_object_data = log_processor.LogProcessor.get_object_data
|
||||
try:
|
||||
log_processor.LogProcessor.get_object_data = get_object_data
|
||||
proxy_config = self.proxy_config.copy()
|
||||
proxy_config.update({
|
||||
'log-processor-access': {
|
||||
'source_filename_format':'%Y%m%d%H*',
|
||||
'class_path':
|
||||
'swift.stats.access_processor.AccessLogProcessor'
|
||||
}})
|
||||
processor_args = (proxy_config, DumbLogger())
|
||||
q_in = Queue.Queue()
|
||||
q_out = Queue.Queue()
|
||||
work_request = ('access', 'a','c','o')
|
||||
q_in.put(work_request)
|
||||
q_in.put(None)
|
||||
log_processor.collate_worker(processor_args, q_in, q_out)
|
||||
item, ret = q_out.get()
|
||||
self.assertEquals(item, work_request)
|
||||
# these only work for Py2.7+
|
||||
#self.assertIsInstance(ret, log_processor.BadFileDownload)
|
||||
self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
|
||||
type(ret))
|
||||
finally:
|
||||
log_processor.LogProcessor.get_object_data = orig_get_object_data
|
||||
|
||||
def test_multiprocess_collate(self):
|
||||
try:
|
||||
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
|
||||
def get_object_data(*a,**kw):
|
||||
return [self.access_test_line]
|
||||
orig_get_object_data = log_processor.LogProcessor.get_object_data
|
||||
log_processor.LogProcessor.get_object_data = get_object_data
|
||||
proxy_config = self.proxy_config.copy()
|
||||
proxy_config.update({
|
||||
'log-processor-access': {
|
||||
'source_filename_format':'%Y%m%d%H*',
|
||||
'class_path':
|
||||
'swift.stats.access_processor.AccessLogProcessor'
|
||||
}})
|
||||
processor_args = (proxy_config, DumbLogger())
|
||||
item = ('access', 'a','c','o')
|
||||
logs_to_process = [item]
|
||||
results = log_processor.multiprocess_collate(processor_args,
|
||||
logs_to_process,
|
||||
1)
|
||||
results = list(results)
|
||||
expected = [(item, {('acct', '2010', '07', '09', '04'):
|
||||
{('public', 'object', 'GET', '2xx'): 1,
|
||||
('public', 'bytes_out'): 95,
|
||||
'marker_query': 0,
|
||||
'format_query': 1,
|
||||
'delimiter_query': 0,
|
||||
'path_query': 0,
|
||||
('public', 'bytes_in'): 6,
|
||||
'prefix_query': 0}})]
|
||||
self.assertEquals(results, expected)
|
||||
finally:
|
||||
log_processor.LogProcessor._internal_proxy = None
|
||||
log_processor.LogProcessor.get_object_data = orig_get_object_data
|
||||
|
||||
def test_multiprocess_collate_errors(self):
|
||||
def get_object_data(*a,**kw):
|
||||
raise log_processor.BadFileDownload()
|
||||
orig_get_object_data = log_processor.LogProcessor.get_object_data
|
||||
try:
|
||||
log_processor.LogProcessor.get_object_data = get_object_data
|
||||
proxy_config = self.proxy_config.copy()
|
||||
proxy_config.update({
|
||||
'log-processor-access': {
|
||||
'source_filename_format':'%Y%m%d%H*',
|
||||
'class_path':
|
||||
'swift.stats.access_processor.AccessLogProcessor'
|
||||
}})
|
||||
processor_args = (proxy_config, DumbLogger())
|
||||
item = ('access', 'a','c','o')
|
||||
logs_to_process = [item]
|
||||
results = log_processor.multiprocess_collate(processor_args,
|
||||
logs_to_process,
|
||||
1)
|
||||
results = list(results)
|
||||
expected = []
|
||||
self.assertEquals(results, expected)
|
||||
finally:
|
||||
log_processor.LogProcessor._internal_proxy = None
|
||||
log_processor.LogProcessor.get_object_data = orig_get_object_data
|
||||
|
Loading…
x
Reference in New Issue
Block a user