diff --git a/slogging/log_common.py b/slogging/log_common.py index a3c1523..310cf8e 100644 --- a/slogging/log_common.py +++ b/slogging/log_common.py @@ -74,6 +74,11 @@ class LogProcessorCommon(object): def get_object_data(self, swift_account, container_name, object_name, compressed=False): '''reads an object and yields its lines''' + self.logger.debug('get_object_data(%r, %r, %r, compressed=%r)' % + (swift_account, + container_name, + object_name, + compressed)) code, o = self.internal_proxy.get_object(swift_account, container_name, object_name) if code < 200 or code >= 300: @@ -173,11 +178,15 @@ def multiprocess_collate(processor_klass, processor_args, processor_method, in_queue.put(x) for _junk in range(worker_count): in_queue.put(None) # tell the worker to end + in_queue.close() while True: try: item, data = out_queue.get_nowait() except Queue.Empty: sleep(.01) + except Exception: + if logger: + logger.exception('error reading from out queue') else: if isinstance(data, Exception): if logger: @@ -192,18 +201,24 @@ def multiprocess_collate(processor_klass, processor_args, processor_method, def collate_worker(processor_klass, processor_args, processor_method, in_queue, out_queue): '''worker process for multiprocess_collate''' - p = processor_klass(*processor_args) - while True: - item = in_queue.get() - if item is None: - # no more work to process - break - try: - method = getattr(p, processor_method) - except AttributeError: - return - try: - ret = method(*item) - except Exception, err: - ret = err - out_queue.put((item, ret)) + try: + p = processor_klass(*processor_args) + while True: + item = in_queue.get() + if item is None: + # no more work to process + break + try: + method = getattr(p, processor_method) + except AttributeError: + return + try: + ret = method(*item) + except Exception, err: + ret = err + out_queue.put((item, ret)) + except Exception, err: + print '****ERROR in worker****\n%r\n********' % err + finally: + in_queue.close() + out_queue.close()