From 0bb5857da3af3afc2872255441efa1c0fbc23f3f Mon Sep 17 00:00:00 2001
From: Clay Gerrard
Date: Mon, 20 Sep 2010 17:52:58 -0500
Subject: [PATCH] pep8 issues, refactored log_processor a tiny bit for testing
 (lazy load internal proxy), added a few comments

---
 swift/stats/access_processor.py       | 16 ++++---
 swift/stats/account_stats.py          | 24 ++++----
 swift/stats/log_processor.py          | 66 +++++++++++++++++----------
 swift/stats/log_uploader.py           | 32 +++++++------
 swift/stats/stats_processor.py        |  2 +
 test/unit/stats/test_log_processor.py |  8 ++--
 6 files changed, 89 insertions(+), 59 deletions(-)

diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py
index e4be212efb..6fcdafc59c 100644
--- a/swift/stats/access_processor.py
+++ b/swift/stats/access_processor.py
@@ -21,7 +21,9 @@ from swift.common.utils import split_path
 
 month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
 
+
 class AccessLogProcessor(object):
+    """Transform proxy server access logs"""
 
     def __init__(self, conf):
         self.server_name = conf.get('server_name', 'proxy')
@@ -112,8 +114,8 @@ class AccessLogProcessor(object):
         d['account'] = account
         d['container_name'] = container_name
         d['object_name'] = object_name
-        d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
-        d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
+        d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
+        d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
         d['code'] = int(d['code'])
         return d
 
@@ -151,12 +153,12 @@ class AccessLogProcessor(object):
                 source = 'service'
             else:
                 source = 'public'
-        
+
         if line_data['client_ip'] in self.service_ips:
             source = 'service'
 
-        d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
-            bytes_out
+        d[(source, 'bytes_out')] = d.setdefault((
+            source, 'bytes_out'), 0) + bytes_out
         d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
             bytes_in
 
@@ -171,7 +173,7 @@ class AccessLogProcessor(object):
             path = line_data.get('path', 0)
             d['path_query'] = d.setdefault('path_query', 0) + path
 
-        code = '%dxx' % (code/100)
+        code = '%dxx' % (code / 100)
         key = (source, op_level, method, code)
         d[key] = d.setdefault(key, 0) + 1
 
@@ -220,5 +222,5 @@ class AccessLogProcessor(object):
                 keylist_mapping[code].add(
                     (source, level, verb, code))
                 keylist_mapping['ops_count'].add(
-                    (source,level,verb,code))
+                    (source, level, verb, code))
         return keylist_mapping
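The access_processor changes above are formatting-only. For orientation, the counters that process() builds are plain dicts keyed by (source, op_level, method, status-class) tuples and bumped with setdefault; a standalone sketch of that tally pattern follows, with invented sample requests:

```python
# Standalone sketch of the tally pattern used by AccessLogProcessor.process();
# the sample request tuples below are invented for illustration.
sample_requests = [
    ('public', 'object', 'GET', 200),
    ('public', 'object', 'GET', 404),
    ('service', 'container', 'PUT', 201),
]

d = {}
for source, op_level, method, status in sample_requests:
    code = '%dxx' % (status // 100)   # 200 -> '2xx' (the patch uses "/" under Python 2)
    key = (source, op_level, method, code)
    d[key] = d.setdefault(key, 0) + 1

print(d[('public', 'object', 'GET', '2xx')])   # 1
print(d[('public', 'object', 'GET', '4xx')])   # 1
```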
diff --git a/swift/stats/account_stats.py b/swift/stats/account_stats.py
index 3eac63c0f6..3ed8b6d88b 100644
--- a/swift/stats/account_stats.py
+++ b/swift/stats/account_stats.py
@@ -21,9 +21,15 @@ from swift.account.server import DATADIR as account_server_data_dir
 from swift.common.db import AccountBroker
 from swift.common.internal_proxy import InternalProxy
 from swift.common.utils import renamer, get_logger, readconf
+from swift.common.constraints import check_mount
 from swift.common.daemon import Daemon
 
+
 class AccountStat(Daemon):
+    """Extract storage stats from account databases on the account
+    storage nodes
+    """
+
     def __init__(self, stats_conf):
         super(AccountStat, self).__init__(stats_conf)
         target_dir = stats_conf.get('log_dir', '/var/log/swift')
@@ -45,7 +51,7 @@ class AccountStat(Daemon):
         start = time.time()
         self.find_and_process()
         self.logger.info("Gathering account stats complete (%0.2f minutes)" %
-            ((time.time()-start)/60))
+            ((time.time() - start) / 60))
 
     def find_and_process(self):
         #TODO: handle a counter in the filename to prevent overwrites?
@@ -53,10 +59,10 @@ class AccountStat(Daemon):
         #TODO: don't use /tmp?
         tmp_filename = os.path.join('/tmp', src_filename)
         with open(tmp_filename, 'wb') as statfile:
-            #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
+            #statfile.write(
+            #    'Account Name, Container Count, Object Count, Bytes Used\n')
             for device in os.listdir(self.devices):
-                if self.mount_check and \
-                        not os.path.ismount(os.path.join(self.devices, device)):
+                if self.mount_check and not check_mount(self.devices, device):
                     self.logger.error("Device %s is not mounted, skipping." %
                         device)
                     continue
@@ -70,7 +76,8 @@ class AccountStat(Daemon):
             for root, dirs, files in os.walk(accounts, topdown=False):
                 for filename in files:
                     if filename.endswith('.db'):
-                        broker = AccountBroker(os.path.join(root, filename))
+                        db_path = os.path.join(root, filename)
+                        broker = AccountBroker(db_path)
                         if not broker.is_deleted():
                             (account_name, _, _, _,
@@ -78,9 +85,8 @@ class AccountStat(Daemon):
                                 container_count,
                                 object_count,
                                 bytes_used,
                                 _, _) = broker.get_info()
-                            line_data = '"%s",%d,%d,%d\n' % (account_name,
-                                                             container_count,
-                                                             object_count,
-                                                             bytes_used)
+                            line_data = '"%s",%d,%d,%d\n' % (
+                                account_name, container_count,
+                                object_count, bytes_used)
                             statfile.write(line_data)
         renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
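The one functional change in account_stats.py is swapping the inline os.path.ismount() test for check_mount() from swift.common.constraints. A rough stand-in for what that helper is assumed to do here, based on the inline check it replaces rather than on the helper's real source:

```python
import os


def check_mount_sketch(root, drive):
    """Hypothetical stand-in for swift.common.constraints.check_mount:
    the device is usable only if <root>/<drive> exists and is a mount
    point, mirroring the inline check this patch removes."""
    path = os.path.join(root, drive)
    return os.path.exists(path) and os.path.ismount(path)


# find_and_process() then skips unusable devices with roughly:
#     if self.mount_check and not check_mount(self.devices, device):
#         self.logger.error("Device %s is not mounted, skipping." % device)
#         continue
```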
diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py
index 04c4a3b649..473beda112 100644
--- a/swift/stats/log_processor.py
+++ b/swift/stats/log_processor.py
@@ -29,26 +29,22 @@ from swift.common.exceptions import ChunkReadTimeout
 from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon
 
+
 class BadFileDownload(Exception):
     pass
 
+
 class LogProcessor(object):
+    """Load plugins, process logs"""
 
     def __init__(self, conf, logger):
         stats_conf = conf.get('log-processor', {})
-        
-        proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
-                                               '/etc/swift/proxy-server.conf')
-        self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
-                                           name='proxy-server')
+
         if isinstance(logger, tuple):
             self.logger = get_logger(*logger)
         else:
             self.logger = logger
 
-        self.internal_proxy = InternalProxy(self.proxy_server_conf,
-                                            self.logger,
-                                            retries=3)
-
+        # load the processing plugins
         self.plugins = {}
         plugin_prefix = 'log-processor-'
@@ -56,11 +52,25 @@ class LogProcessor(object):
             plugin_name = section[len(plugin_prefix):]
             plugin_conf = conf.get(section, {})
             self.plugins[plugin_name] = plugin_conf
-            import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
+            class_path = self.plugins[plugin_name]['class_path']
+            import_target, class_name = class_path.rsplit('.', 1)
             module = __import__(import_target, fromlist=[import_target])
             klass = getattr(module, class_name)
             self.plugins[plugin_name]['instance'] = klass(plugin_conf)
 
+    @property
+    def internal_proxy(self):
+        '''Lazy load internal proxy'''
+        if self._internal_proxy is None:
+            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
+                                                   '/etc/swift/proxy-server.conf')
+            self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
+                                               name='proxy-server')
+            self._internal_proxy = InternalProxy(self.proxy_server_conf,
+                                                 self.logger,
+                                                 retries=3)
+        return self._internal_proxy
+
     def process_one_file(self, plugin_name, account, container, object_name):
         # get an iter of the object data
         compressed = object_name.endswith('.gz')
@@ -72,7 +82,8 @@ class LogProcessor(object):
                                                              container,
                                                              object_name)
 
-    def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
         total_list = []
         for name, data in self.plugins.items():
             account = data['swift_account']
@@ -89,8 +100,9 @@ class LogProcessor(object):
                     total_list.append(x)
         return total_list
 
-    def get_container_listing(self, swift_account, container_name, start_date=None,
-                              end_date=None, listing_filter=None):
+    def get_container_listing(self, swift_account, container_name,
+                              start_date=None, end_date=None,
+                              listing_filter=None):
         '''
         Get a container listing, filtered by start_date, end_date, and
         listing_filter. Dates, if given, should be in YYYYMMDDHH format
@@ -162,15 +174,16 @@ class LogProcessor(object):
         last_part = ''
         last_compressed_part = ''
         # magic in the following zlib.decompressobj argument is courtesy of
-        # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
-        d = zlib.decompressobj(16+zlib.MAX_WBITS)
+        # Python decompressing gzip chunk-by-chunk
+        # http://stackoverflow.com/questions/2423866
+        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
         try:
             for chunk in o:
                 if compressed:
                     try:
                         chunk = d.decompress(chunk)
                     except zlib.error:
-                        raise BadFileDownload() # bad compressed data
+                        raise BadFileDownload()  # bad compressed data
                 parts = chunk.split('\n')
                 parts[0] = last_part + parts[0]
                 for part in parts[:-1]:
@@ -208,6 +221,8 @@ class LogProcessor(object):
 
 
 class LogProcessorDaemon(Daemon):
+    """Gather raw log data and farm out processing; results are printed"""
+
     def __init__(self, conf):
         c = conf.get('log-processor')
         super(LogProcessorDaemon, self).__init__(c)
@@ -228,15 +243,16 @@ class LogProcessorDaemon(Daemon):
             lookback_start = None
             lookback_end = None
         else:
-            lookback_start = datetime.datetime.now() - \
-                             datetime.timedelta(hours=self.lookback_hours)
+            delta_hours = datetime.timedelta(hours=self.lookback_hours)
+            lookback_start = datetime.datetime.now() - delta_hours
             lookback_start = lookback_start.strftime('%Y%m%d%H')
             if self.lookback_window == 0:
                 lookback_end = None
             else:
+                delta_window = datetime.timedelta(hours=self.lookback_window)
                 lookback_end = datetime.datetime.now() - \
-                               datetime.timedelta(hours=self.lookback_hours) + \
-                               datetime.timedelta(hours=self.lookback_window)
+                               delta_hours + \
+                               delta_window
                 lookback_end = lookback_end.strftime('%Y%m%d%H')
         self.logger.debug('lookback_start: %s' % lookback_start)
         self.logger.debug('lookback_end: %s' % lookback_end)
@@ -261,7 +277,7 @@ class LogProcessorDaemon(Daemon):
         self.logger.info('loaded %d files to process' % len(logs_to_process))
         if not logs_to_process:
             self.logger.info("Log processing done (%0.2f minutes)" %
-                ((time.time()-start)/60))
+                ((time.time() - start) / 60))
             return
 
         # map
@@ -315,7 +331,7 @@ class LogProcessorDaemon(Daemon):
             row = [bill_ts, data_ts]
             row.append('%s' % account)
             for k in sorted_keylist_mapping:
-                row.append('%s'%d[k])
+                row.append('%s' % d[k])
             print ','.join(row)
 
         # cleanup
@@ -327,7 +343,8 @@ class LogProcessorDaemon(Daemon):
                           'processed_files.pickle.gz')
 
         self.logger.info("Log processing done (%0.2f minutes)" %
-                ((time.time()-start)/60))
+                ((time.time() - start) / 60))
+
 
 def multiprocess_collate(processor_args, logs_to_process):
     '''yield hourly data from logs_to_process'''
@@ -361,6 +378,7 @@ def multiprocess_collate(processor_args, logs_to_process):
     for r in results:
         r.join()
 
+
 def collate_worker(processor_args, in_queue, out_queue):
     '''worker process for multiprocess_collate'''
     p = LogProcessor(*processor_args)
@@ -373,4 +391,4 @@ def collate_worker(processor_args, in_queue, out_queue):
                 time.sleep(.1)
         else:
             ret = p.process_one_file(*item)
-            out_queue.put((item, ret))
\ No newline at end of file
+            out_queue.put((item, ret))
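The headline change above is the lazy-loaded internal_proxy property, which keeps LogProcessor cheap to construct when no object downloads are needed. As written the property still reads stats_conf (a local of __init__) and nothing primes _internal_proxy, so the following is a minimal, self-contained sketch of the pattern with those pieces wired up; InternalProxy and appconfig are replaced by stand-ins so it runs on its own:

```python
class LazyProxyExample(object):
    """Toy illustration of the lazy-load pattern; not Swift code."""

    def __init__(self, conf, logger):
        self.conf = conf               # kept so the property can read it later
        self.logger = logger
        self._internal_proxy = None    # primed so the None check below works

    @property
    def internal_proxy(self):
        if self._internal_proxy is None:
            stats_conf = self.conf.get('log-processor', {})
            conf_loc = stats_conf.get('proxy_server_conf',
                                      '/etc/swift/proxy-server.conf')
            # stand-in for appconfig(...) + InternalProxy(..., retries=3)
            self._internal_proxy = 'InternalProxy(%s)' % conf_loc
        return self._internal_proxy


p = LazyProxyExample({'log-processor': {}}, logger=None)
p._internal_proxy = 'DumbInternalProxy()'   # tests can inject a stub this way
print(p.internal_proxy)                     # DumbInternalProxy()
```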
diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py
index f3fff42df5..d695e27338 100644
--- a/swift/stats/log_uploader.py
+++ b/swift/stats/log_uploader.py
@@ -25,18 +25,19 @@ from swift.common.internal_proxy import InternalProxy
 from swift.common.daemon import Daemon
 from swift.common import utils
 
+
 class LogUploader(Daemon):
     '''
     Given a local directory, a swift account, and a container name, LogParser
-    will upload all files in the local directory to the given account/container.
-    All but the newest files will be uploaded, and the files' md5 sum will be
-    computed. The hash is used to prevent duplicate data from being uploaded
-    multiple times in different files (ex: log lines). Since the hash is
-    computed, it is also used as the uploaded object's etag to ensure data
-    integrity.
-    
+    will upload all files in the local directory to the given account/
+    container. All but the newest files will be uploaded, and the files' md5
+    sum will be computed. The hash is used to prevent duplicate data from
+    being uploaded multiple times in different files (ex: log lines). Since
+    the hash is computed, it is also used as the uploaded object's etag to
+    ensure data integrity.
+
     Note that after the file is successfully uploaded, it will be unlinked.
-    
+
     The given proxy server config is used to instantiate a proxy server for
     the object uploads.
     '''
@@ -66,7 +67,7 @@ class LogUploader(Daemon):
         start = time.time()
         self.upload_all_logs()
         self.logger.info("Uploading logs complete (%0.2f minutes)" %
-            ((time.time()-start)/60))
+            ((time.time() - start) / 60))
 
     def upload_all_logs(self):
         i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
@@ -76,17 +77,17 @@ class LogUploader(Daemon):
         for start, c in i:
             offset = base_offset + start
             if c == '%Y':
-                year_offset = offset, offset+4
+                year_offset = offset, offset + 4
                 # Add in the difference between len(%Y) and the expanded
                 # version of %Y (????). This makes sure the codes after this
                 # one will align properly in the final filename.
                 base_offset += 2
             elif c == '%m':
-                month_offset = offset, offset+2
+                month_offset = offset, offset + 2
             elif c == '%d':
-                day_offset = offset, offset+2
+                day_offset = offset, offset + 2
             elif c == '%H':
-                hour_offset = offset, offset+2
+                hour_offset = offset, offset + 2
         if not (year_offset and month_offset and day_offset and hour_offset):
             # don't have all the parts, can't upload anything
             return
@@ -124,7 +125,8 @@ class LogUploader(Daemon):
                 continue
             if (time.time() - os.stat(filename).st_mtime) < 7200:
                 # don't process very new logs
-                self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
+                self.logger.debug(
+                    "Skipping log: %s (< 2 hours old)" % filename)
                 continue
             self.upload_one_log(filename, year, month, day, hour)
 
@@ -147,7 +149,7 @@ class LogUploader(Daemon):
         # By adding a hash to the filename, we ensure that uploaded files
         # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
-        target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
+        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
         if self.internal_proxy.upload_file(filename,
                                            self.swift_account,
                                            self.container_name,
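The LogUploader docstring above explains that each upload is named by the file's md5, so duplicate data collapses onto one object and the hash doubles as the upload etag. The hashing itself is outside this diff; a rough sketch of how such a filehash could be computed (the helper name and chunk size are invented):

```python
import hashlib


def file_md5(path, chunk_size=64 * 1024):
    # Illustrative only: reads the file in chunks and returns the hex md5,
    # which the uploader would use both in target_filename and as the etag.
    md5 = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            md5.update(chunk)
    return md5.hexdigest()


# filehash = file_md5(filename)
# target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
```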
diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py
index eb31d8c668..30500dfe95 100644
--- a/swift/stats/stats_processor.py
+++ b/swift/stats/stats_processor.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 class StatsLogProcessor(object):
+    """Transform account storage stat logs"""
 
     def __init__(self, conf):
         pass
diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py
index 1f4f14f513..f6576f6a43 100644
--- a/test/unit/stats/test_log_processor.py
+++ b/test/unit/stats/test_log_processor.py
@@ -112,7 +112,7 @@ class TestLogProcessor(unittest.TestCase):
 
     def test_get_container_listing(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         result = p.get_container_listing('a', 'foo')
         expected = ['2010/03/14/13/obj1']
         self.assertEquals(result, expected)
@@ -133,7 +133,7 @@ class TestLogProcessor(unittest.TestCase):
 
     def test_get_object_data(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         result = list(p.get_object_data('a', 'c', 'o', False))
         expected = ['obj','data']
         self.assertEquals(result, expected)
@@ -148,7 +148,7 @@ class TestLogProcessor(unittest.TestCase):
                         'swift.stats.stats_processor.StatsLogProcessor'
             }})
         p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         def get_object_data(*a,**kw):
             return [self.stats_test_line]
         p.get_object_data = get_object_data
@@ -158,4 +158,4 @@ class TestLogProcessor(unittest.TestCase):
                          'object_count': 2,
                          'container_count': 1,
                          'bytes_used': 3}}
-        self.assertEquals(result, expected)
\ No newline at end of file
+        self.assertEquals(result, expected)
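The tests above now inject their stub through the private _internal_proxy attribute, so the lazy property never builds a real proxy during unit tests. For reference, the plugin sections they configure (a 'log-processor-...' section carrying a class_path) are resolved by the rsplit/__import__ step in LogProcessor.__init__; a self-contained sketch of that resolution, pointed at a stdlib class purely so it runs anywhere:

```python
# Sketch of the class_path resolution LogProcessor.__init__ performs for each
# log-processor-* config section; the path below is a stdlib stand-in.
class_path = 'collections.OrderedDict'

import_target, class_name = class_path.rsplit('.', 1)
module = __import__(import_target, fromlist=[import_target])
klass = getattr(module, class_name)

instance = klass()
print(klass)   # <class 'collections.OrderedDict'>
```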