From d8ad8ae473c84f21ca98d6583b3ca7d1b6b9e5a5 Mon Sep 17 00:00:00 2001
From: John Dickinson
Date: Fri, 10 Sep 2010 15:08:06 -0500
Subject: [PATCH] set up log-stats-collector as a daemon process to create csv
 files

---
 bin/swift-log-stats-collector   |  27 +++++
 bin/swift-log-uploader          |   1 -
 etc/log-processing.conf-sample  |   4 +-
 swift/common/utils.py           |  27 +++--
 swift/stats/access_processor.py |  48 ++++++++-
 swift/stats/account_stats.py    |  10 +-
 swift/stats/log_processor.py    | 186 ++++++++++++++++++++++++++------
 swift/stats/stats_processor.py  |  21 +++-
 8 files changed, 265 insertions(+), 59 deletions(-)
 create mode 100644 bin/swift-log-stats-collector

diff --git a/bin/swift-log-stats-collector b/bin/swift-log-stats-collector
new file mode 100644
index 0000000000..d21135b35c
--- /dev/null
+++ b/bin/swift-log-stats-collector
@@ -0,0 +1,27 @@
+#!/usr/bin/python
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from swift.stats.log_processor import LogProcessorDaemon
+from swift.common import utils
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print "Usage: swift-log-stats-collector CONFIG_FILE"
+        sys.exit()
+    conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
+    stats = LogProcessorDaemon(conf).run(once=True)
diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader
index 83c8f67303..b557e4c167 100755
--- a/bin/swift-log-uploader
+++ b/bin/swift-log-uploader
@@ -15,7 +15,6 @@
 # limitations under the License.
import sys -import time from swift.stats.log_uploader import LogUploader from swift.common import utils diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample index 6ea8d2b0cb..97d33c15d3 100644 --- a/etc/log-processing.conf-sample +++ b/etc/log-processing.conf-sample @@ -1,8 +1,8 @@ # plugin section format is named "log-processor-" -# section "log-processor" is the generic defaults (overridden by plugins) [log-processor] -# working_dir = /tmp/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +# container_name = log_processing_data # proxy_server_conf = /etc/swift/proxy-server.conf # log_facility = LOG_LOCAL0 # log_level = INFO diff --git a/swift/common/utils.py b/swift/common/utils.py index 49813be668..5e0f62269a 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -530,19 +530,26 @@ def item_from_env(env, item_name): def cache_from_env(env): return item_from_env(env, 'swift.cache') -def readconf(conf, section_name, log_name=None): +def readconf(conf, section_name=None, log_name=None): c = ConfigParser() if not c.read(conf): print "Unable to read config file %s" % conf sys.exit(1) - if c.has_section(section_name): - conf = dict(c.items(section_name)) - else: - print "Unable to find %s config section in %s" % (section_name, conf) - sys.exit(1) - if "log_name" not in conf: - if log_name is not None: - conf['log_name'] = log_name + if section_name: + if c.has_section(section_name): + conf = dict(c.items(section_name)) else: - conf['log_name'] = section_name + print "Unable to find %s config section in %s" % (section_name, conf) + sys.exit(1) + if "log_name" not in conf: + if log_name is not None: + conf['log_name'] = log_name + else: + conf['log_name'] = section_name + else: + conf = {} + for s in c.sections(): + conf.update({s:dict(c.items(s))}) + if 'log_name' not in conf: + conf['log_name'] = log_name return conf diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py index a20d1741ed..08d2f1bf08 100644 --- a/swift/stats/access_processor.py +++ b/swift/stats/access_processor.py @@ -15,6 +15,7 @@ import collections from urllib import unquote +import copy from swift.common.utils import split_path @@ -116,7 +117,7 @@ class AccessLogProcessor(object): d['code'] = int(d['code']) return d - def process(self, obj_stream): + def process(self, obj_stream, account, container, object_name): '''generate hourly groupings of data from one access log file''' hourly_aggr_info = {} for line in obj_stream: @@ -176,3 +177,48 @@ class AccessLogProcessor(object): hourly_aggr_info[aggr_key] = d return hourly_aggr_info + + def keylist_mapping(self): + source_keys = 'service public'.split() + level_keys = 'account container object'.split() + verb_keys = 'GET PUT POST DELETE HEAD COPY'.split() + code_keys = '2xx 4xx 5xx'.split() + + keylist_mapping = { + # : or + 'service_bw_in': ('service', 'bytes_in'), + 'service_bw_out': ('service', 'bytes_out'), + 'public_bw_in': ('public', 'bytes_in'), + 'public_bw_out': ('public', 'bytes_out'), + 'account_requests': set(), + 'container_requests': set(), + 'object_requests': set(), + 'service_request': set(), + 'public_request': set(), + 'ops_count': set(), + } + for verb in verb_keys: + keylist_mapping[verb] = set() + for code in code_keys: + keylist_mapping[code] = set() + for source in source_keys: + for level in level_keys: + for verb in verb_keys: + for code in code_keys: + keylist_mapping['account_requests'].add( + (source, 'account', verb, code)) + keylist_mapping['container_requests'].add( + 
(source, 'container', verb, code)) + keylist_mapping['object_requests'].add( + (source, 'object', verb, code)) + keylist_mapping['service_request'].add( + ('service', level, verb, code)) + keylist_mapping['public_request'].add( + ('public', level, verb, code)) + keylist_mapping[verb].add( + (source, level, verb, code)) + keylist_mapping[code].add( + (source, level, verb, code)) + keylist_mapping['ops_count'].add( + (source,level,verb,code)) + return keylist_mapping diff --git a/swift/stats/account_stats.py b/swift/stats/account_stats.py index a514aec2e8..12c473cfdf 100644 --- a/swift/stats/account_stats.py +++ b/swift/stats/account_stats.py @@ -48,7 +48,7 @@ class AccountStat(Daemon): src_filename = time.strftime(self.filename_format) tmp_filename = os.path.join('/tmp', src_filename) with open(tmp_filename, 'wb') as statfile: - #statfile.write('Account Name, Container Count, Object Count, Bytes Used, Created At\n') + #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n') for device in os.listdir(self.devices): if self.mount_check and \ not os.path.ismount(os.path.join(self.devices, device)): @@ -68,16 +68,14 @@ class AccountStat(Daemon): broker = AccountBroker(os.path.join(root, filename)) if not broker.is_deleted(): account_name, - created_at, - _, _, + _, _, _, container_count, object_count, bytes_used, _, _ = broker.get_info() - line_data = '"%s",%d,%d,%d,%s\n' % (account_name, + line_data = '"%s",%d,%d,%d\n' % (account_name, container_count, object_count, - bytes_used, - created_at) + bytes_used) statfile.write(line_data) renamer(tmp_filename, os.path.join(self.target_dir, src_filename)) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 10fd456757..66344ae106 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -15,36 +15,23 @@ from ConfigParser import ConfigParser import zlib +import time +import datetime +import cStringIO +import collections from swift.common.internal_proxy import InternalProxy from swift.common.exceptions import ChunkReadTimeout -from swift.common.utils import get_logger - -class ConfigError(Exception): - pass - -class MissingProxyConfig(ConfigError): - pass +from swift.common.utils import get_logger, readconf class LogProcessor(object): def __init__(self, conf, logger): stats_conf = conf.get('log-processor', {}) - working_dir = stats_conf.get('working_dir', '/tmp/swift/') - if working_dir.endswith('/') and len(working_dir) > 1: - working_dir = working_dir[:-1] - self.working_dir = working_dir proxy_server_conf_loc = stats_conf.get('proxy_server_conf', '/etc/swift/proxy-server.conf') - try: - c = ConfigParser() - c.read(proxy_server_conf_loc) - proxy_server_conf = dict(c.items('proxy-server')) - except: - raise - raise MissingProxyConfig() - self.proxy_server_conf = proxy_server_conf + self.proxy_server_conf = readconf(proxy_server_conf_loc, 'proxy-server') if isinstance(logger, tuple): self.logger = get_logger(*logger) else: @@ -71,7 +58,10 @@ class LogProcessor(object): stream = self.get_object_data(account, container, object_name, compressed=compressed) # look up the correct plugin and send the stream to it - return self.plugins[plugin_name]['instance'].process(stream) + return self.plugins[plugin_name]['instance'].process(stream, + account, + container, + object_name) def get_data_list(self, start_date=None, end_date=None, listing_filter=None): total_list = [] @@ -81,6 +71,8 @@ class LogProcessor(object): l = self.get_container_listing(account, container, start_date, end_date, 
                                                        listing_filter)
             for i in l:
+                # The items in this list end up being passed as positional
+                # parameters to process_one_file.
                 total_list.append((p, account, container, i))
         return total_list
@@ -174,21 +166,146 @@ class LogProcessor(object):
         except ChunkReadTimeout:
             raise BadFileDownload()
 
-def multiprocess_collate(processor_args,
-                         start_date=None,
-                         end_date=None,
-                         listing_filter=None):
-    '''get listing of files and yield hourly data from them'''
-    p = LogProcessor(*processor_args)
-    all_files = p.get_data_list(start_date, end_date, listing_filter)
+    def generate_keylist_mapping(self):
+        keylist = {}
+        for plugin in self.plugins:
+            plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
+            for k, v in plugin_keylist.items():
+                o = keylist.get(k)
+                if o:
+                    if isinstance(o, set):
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                    else:
+                        o = set(o)
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                else:
+                    o = v
+                keylist[k] = o
+        return keylist
 
-    p.logger.info('loaded %d files to process' % len(all_files))
-    if not all_files:
-        # no work to do
-        return
+class LogProcessorDaemon(Daemon):
+    def __init__(self, conf):
+        super(LogProcessorDaemon, self).__init__(conf)
+        self.conf = conf
+        self.log_processor = LogProcessor(conf, self.logger)
+        c = conf.get('log-processor', {})
+        self.lookback_hours = int(c.get('lookback_hours', '120'))
+        self.lookback_window = int(c.get('lookback_window',
+                                         str(self.lookback_hours)))
+        self.log_processor_account = c['swift_account']
+        self.log_processor_container = c.get('container_name',
+                                             'log_processing_data')
 
-    worker_count = multiprocessing.cpu_count() - 1
+    def run_once(self):
+        self.logger.info("Beginning log processing")
+        start = time.time()
+        if self.lookback_hours == 0:
+            lookback_start = None
+            lookback_end = None
+        else:
+            lookback_start = datetime.datetime.now() - \
+                             datetime.timedelta(hours=self.lookback_hours)
+            lookback_start = lookback_start.strftime('%Y%m%d')
+            if self.lookback_window == 0:
+                lookback_end = None
+            else:
+                lookback_end = datetime.datetime.now() - \
+                               datetime.timedelta(hours=self.lookback_hours) + \
+                               datetime.timedelta(hours=self.lookback_window)
+                lookback_end = lookback_end.strftime('%Y%m%d')
+
+        try:
+            processed_files_stream = self.log_processor.get_object_data(
+                self.log_processor_account,
+                self.log_processor_container,
+                'processed_files.pickle.gz',
+                compressed=True)
+            buf = ''.join(x for x in processed_files_stream)
+            already_processed_files = cPickle.loads(buf)
+        except:
+            already_processed_files = set()
+
+        logs_to_process = self.log_processor.get_data_list(lookback_start,
+                                                  lookback_end,
+                                                  already_processed_files)
+        self.logger.info('loaded %d files to process' % len(logs_to_process))
+        if not logs_to_process:
+            self.logger.info("Log processing done (%0.2f minutes)" %
+                             ((time.time() - start) / 60))
+            return
+
+        # map
+        processor_args = (self.conf, self.logger)
+        results = multiprocess_collate(processor_args, logs_to_process)
+
+        # reduce
+        aggr_data = {}
+        processed_files = already_processed_files
+        for item, data in results.items():
+            # since item contains the plugin and the log name, new plugins will
+            # "reprocess" the file and the results will be in the final csv.
+            processed_files.add(item)
+            for k, d in data.items():
+                existing_data = aggr_data.get(k, {})
+                for i, j in d.items():
+                    current = existing_data.get(i, 0)
+                    # merging strategy for key collisions is addition
+                    # processing plugins need to realize this
+                    existing_data[i] = current + j
+                aggr_data[k] = existing_data
+
+        # group
+        # reduce a large number of keys in aggr_data[k] to a small number of
+        # output keys
+        keylist_mapping = self.log_processor.generate_keylist_mapping()
+        final_info = collections.defaultdict(dict)
+        for aggr_key, data in aggr_data.items():
+            for key, mapping in keylist_mapping.items():
+                if isinstance(mapping, (list, set)):
+                    value = 0
+                    for k in mapping:
+                        try:
+                            value += data[k]
+                        except KeyError:
+                            pass
+                else:
+                    try:
+                        value = data[mapping]
+                    except KeyError:
+                        value = 0
+                final_info[aggr_key][key] = value
+
+        # output
+        sorted_keylist_mapping = sorted(keylist_mapping)
+        columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping)
+        print columns
+        for (account, year, month, day, hour), d in final_info.items():
+            bill_ts = ''
+            data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
+            row = [bill_ts, data_ts]
+            row.append('%s' % account)
+            for k in sorted_keylist_mapping:
+                row.append('%s' % d[k])
+            print ','.join(row)
+
+        # cleanup
+        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
+        f = cStringIO.StringIO(s)
+        self.log_processor.internal_proxy.upload_file(f,
+            self.log_processor_account,
+            self.log_processor_container,
+            'processed_files.pickle.gz')
+
+        self.logger.info("Log processing done (%0.2f minutes)" %
+                         ((time.time() - start) / 60))
+
+def multiprocess_collate(processor_args, logs_to_process):
+    '''yield hourly data from logs_to_process'''
+    worker_count = multiprocessing.cpu_count()
     results = []
     in_queue = multiprocessing.Queue()
     out_queue = multiprocessing.Queue()
@@ -199,7 +316,7 @@
                                           out_queue))
         p.start()
         results.append(p)
-    for x in all_files:
+    for x in logs_to_process:
         in_queue.put(x)
     for _ in range(worker_count):
         in_queue.put(None)
@@ -229,6 +346,5 @@ def collate_worker(processor_args, in_queue, out_queue):
         except Queue.Empty:
             time.sleep(.1)
         else:
-            ret = None
-            ret = p.process_one_file(item)
+            ret = p.process_one_file(*item)
             out_queue.put((item, ret))
\ No newline at end of file
diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py
index 4921915176..c7b89c11e6 100644
--- a/swift/stats/stats_processor.py
+++ b/swift/stats/stats_processor.py
@@ -18,9 +18,10 @@ class StatsLogProcessor(object):
     def __init__(self, conf):
         pass
 
-    def process(self, obj_stream):
+    def process(self, obj_stream, account, container, object_name):
         '''generate hourly groupings of data from one stats log file'''
         account_totals = {}
+        year, month, day, hour, _ = object_name.split('/')
         for line in obj_stream:
             if not line:
                 continue
@@ -35,17 +36,29 @@
                 object_count = int(object_count.strip('"'))
                 bytes_used = int(bytes_used.strip('"'))
                 created_at = created_at.strip('"')
-                d = account_totals.get(account, {})
-                d['count'] = d.setdefault('count', 0) + 1
+                aggr_key = (account, year, month, day, hour)
+                d = account_totals.get(aggr_key, {})
+                d['replica_count'] = d.setdefault('replica_count', 0) + 1
                 d['container_count'] = d.setdefault('container_count', 0) + \
                                        container_count
                 d['object_count'] = d.setdefault('object_count', 0) + \
                                     object_count
                 d['bytes_used'] = d.setdefault('bytes_used', 0) + \
                                   bytes_used
-                d['created_at'] = created_at
-                account_totals[account] = d
+                account_totals[aggr_key] = d
             except (IndexError, ValueError):
                 # bad line data
                 pass
         return account_totals
+
+    def keylist_mapping(self):
+        '''
+        returns a dictionary of final keys mapped to source keys
+        '''
+        keylist_mapping = {
+            # <db key> : <row key> or <set of row keys>
+            'bytes_used': 'bytes_used',
+            'container_count': 'container_count',
+            'object_count': 'object_count',
+            'replica_count': 'replica_count',
+        }
+        return keylist_mapping
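
Note on the reduce/group step in LogProcessorDaemon.run_once() above: each plugin's
keylist_mapping() maps a final CSV column either to a single source key or to a set of
source keys whose values are summed, and key collisions during aggregation are always
resolved by addition. The following standalone sketch (not part of the patch; the
function name collapse_to_csv_rows and the sample data are made up for illustration)
shows the same collapse from hourly aggregate keys to CSV rows:

    # Illustrative sketch only -- mirrors the "group" and "output" logic in
    # LogProcessorDaemon.run_once(); names and sample data are hypothetical.
    import collections

    def collapse_to_csv_rows(aggr_data, keylist_mapping):
        '''Collapse per-(account, year, month, day, hour) counters into CSV rows.'''
        final_info = collections.defaultdict(dict)
        for aggr_key, counters in aggr_data.items():
            for column, mapping in keylist_mapping.items():
                if isinstance(mapping, (list, set)):
                    # a set of source keys is summed into one output column
                    value = sum(counters.get(k, 0) for k in mapping)
                else:
                    value = counters.get(mapping, 0)
                final_info[aggr_key][column] = value
        columns = sorted(keylist_mapping)
        rows = ['bill_ts,data_ts,account,' + ','.join(columns)]
        for (account, year, month, day, hour), counters in final_info.items():
            data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
            row = ['', data_ts, account] + ['%s' % counters[c] for c in columns]
            rows.append(','.join(row))
        return rows

    if __name__ == '__main__':
        # hypothetical hourly aggregates for one account
        aggr_data = {
            ('AUTH_test', '2010', '09', '10', '15'): {
                'bytes_used': 1024, 'object_count': 3, 'container_count': 1,
                'replica_count': 3,
                ('public', 'object', 'GET', '2xx'): 7,
                ('public', 'container', 'GET', '2xx'): 2,
            },
        }
        keylist_mapping = {
            'bytes_used': 'bytes_used',
            'object_count': 'object_count',
            'container_count': 'container_count',
            'replica_count': 'replica_count',
            'GET': set([('public', 'object', 'GET', '2xx'),
                        ('public', 'container', 'GET', '2xx')]),
        }
        for line in collapse_to_csv_rows(aggr_data, keylist_mapping):
            print(line)

Because addition is the only merge strategy, plugins should only emit counters that are
meaningful to sum; anything like a timestamp (for example the created_at field dropped
from the stats plugin in this patch) does not belong in the per-hour data.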