Merge pull request #33 from notmyname/billing_catchup

Billing catchup
This commit is contained in:
Greg Lange 2012-02-27 10:48:02 -08:00
commit 65da19b204

View File

@ -74,6 +74,11 @@ class LogProcessorCommon(object):
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
self.logger.debug('get_object_data(%r, %r, %r, compressed=%r)' %
(swift_account,
container_name,
object_name,
compressed))
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
@ -173,11 +178,15 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None) # tell the worker to end
in_queue.close()
while True:
try:
item, data = out_queue.get_nowait()
except Queue.Empty:
sleep(.01)
except Exception:
if logger:
logger.exception('error reading from out queue')
else:
if isinstance(data, Exception):
if logger:
@ -192,18 +201,24 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
def collate_worker(processor_klass, processor_args, processor_method, in_queue,
out_queue):
'''worker process for multiprocess_collate'''
p = processor_klass(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
method = getattr(p, processor_method)
except AttributeError:
return
try:
ret = method(*item)
except Exception, err:
ret = err
out_queue.put((item, ret))
try:
p = processor_klass(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
method = getattr(p, processor_method)
except AttributeError:
return
try:
ret = method(*item)
except Exception, err:
ret = err
out_queue.put((item, ret))
except Exception, err:
print '****ERROR in worker****\n%r\n********' % err
finally:
in_queue.close()
out_queue.close()