From d221703677932531217184c9e74bae4ed42d9bc4 Mon Sep 17 00:00:00 2001
From: Daisuke Morita
Date: Tue, 2 Sep 2014 11:51:45 +0900
Subject: [PATCH] Usage info in JSON format

Before this commit, Slogging only output usage info as a CSV file.
To enhance usability, this commit adds JSON as a selectable output
format.

---
 etc/log-processor.conf-sample |   1 +
 slogging/common.py            |  27 +++++++++
 slogging/log_processor.py     | 106 ++++++++++++++++++++++++++--------
 3 files changed, 109 insertions(+), 25 deletions(-)
 create mode 100644 slogging/common.py

diff --git a/etc/log-processor.conf-sample b/etc/log-processor.conf-sample
index 3a12250..9d2d1fb 100644
--- a/etc/log-processor.conf-sample
+++ b/etc/log-processor.conf-sample
@@ -9,6 +9,7 @@ swift_account = AUTH_test
 # lookback_hours = 120
 # lookback_window = 120
 # user = swift
+# format_type = csv
 
 [log-processor-access]
 # log_dir = /var/log/swift/
diff --git a/slogging/common.py b/slogging/common.py
new file mode 100644
index 0000000..bf21e20
--- /dev/null
+++ b/slogging/common.py
@@ -0,0 +1,27 @@
+# Copyright(c)2014 NTT corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def get_format_type(conf, logger, key, default):
+    """
+    Get and check format_type value.
+    """
+    format_type = conf.get(key, default).lower()
+    if format_type not in ('json', 'csv'):
+        logger.warning(
+            _("Invalid Parameter %s: %s, " % (key, format_type) +
+              "use default %s.") % default)
+        format_type = default
+    return format_type
diff --git a/slogging/log_processor.py b/slogging/log_processor.py
index f11f5db..9a31193 100644
--- a/slogging/log_processor.py
+++ b/slogging/log_processor.py
@@ -24,12 +24,15 @@ import multiprocessing
 import Queue
 import cPickle
 import hashlib
+import json
+import io
 
 from slogging.internal_proxy import InternalProxy
 from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon
 from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
     BadFileDownload
+from slogging import common
 
 now = datetime.datetime.now
 
@@ -53,6 +56,7 @@ class LogProcessor(LogProcessorCommon):
             module = __import__(import_target, fromlist=[import_target])
             klass = getattr(module, class_name)
             self.plugins[plugin_name]['instance'] = klass(plugin_conf)
+            self.plugins[plugin_name]['keylist_mapping'] = {}
             self.logger.debug(_('Loaded plugin "%s"') % plugin_name)
 
     def process_one_file(self, plugin_name, account, container, object_name):
@@ -90,7 +94,8 @@ class LogProcessor(LogProcessorCommon):
     def generate_keylist_mapping(self):
         keylist = {}
         for plugin in self.plugins:
-            plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
+            plugin_keylist = self.plugins[plugin]['keylist_mapping'] = \
+                self.plugins[plugin]['instance'].keylist_mapping()
             if not plugin_keylist:
                 continue
             for k, v in plugin_keylist.items():
@@ -134,6 +139,8 @@ class LogProcessorDaemon(Daemon):
         self.worker_count = int(c.get('worker_count', '1'))
         self._keylist_mapping = None
         self.processed_files_filename = 'processed_files.pickle.gz'
+        self.format_type = common.get_format_type(c, self.logger,
+                                                  'format_type', 'csv')
 
     def get_lookback_interval(self):
         """
@@ -291,41 +298,90 @@ class LogProcessorDaemon(Daemon):
 
     def get_output(self, final_info):
         """
-        :returns: a list of rows to appear in the csv file.
+        :returns: a list of rows to appear in the csv file or
+                  a dictionary to appear in the json file.
 
-        The first row contains the column headers for the rest of the
-        rows in the returned list.
+        csv file:
+        The first row contains the column headers for the rest
+        of the rows in the returned list.
 
-        Each row after the first row corresponds to an account's data
-        for that hour.
+        Each row after the first row corresponds to an account's
+        data for one hour.
+        json file:
+        The first level contains a single label, "stats_data".
+        The second level of stats_data lists account names.
+        Each account block is keyed by an hourly time label and
+        contains that hour's account usage stats.
         """
 
-        sorted_keylist_mapping = sorted(self.keylist_mapping)
-        columns = ['data_ts', 'account'] + sorted_keylist_mapping
-        output = [columns]
-        for (account, year, month, day, hour), d in final_info.items():
-            data_ts = '%04d/%02d/%02d %02d:00:00' % \
-                (int(year), int(month), int(day), int(hour))
-            row = [data_ts, '%s' % (account)]
-            for k in sorted_keylist_mapping:
-                row.append(str(d[k]))
-            output.append(row)
+        if self.format_type == 'json':
+            all_account_stats = collections.defaultdict(dict)
+            for (account, year, month, day, hour), d in final_info.items():
+                data_ts = datetime.datetime(int(year), int(month),
+                                            int(day), int(hour))
+                time_stamp = data_ts.strftime('%Y/%m/%d %H:00:00')
+                hourly_account_stats = \
+                    self.restructure_stats_dictionary(d)
+                all_account_stats[account].update({time_stamp:
+                                                   hourly_account_stats})
+            output = {'stats_data': all_account_stats}
+        else:  # csv
+            sorted_keylist_mapping = sorted(self.keylist_mapping)
+            columns = ['data_ts', 'account'] + sorted_keylist_mapping
+            output = [columns]
+            for (account, year, month, day, hour), d in final_info.items():
+                data_ts = '%04d/%02d/%02d %02d:00:00' % \
+                    (int(year), int(month), int(day), int(hour))
+                row = [data_ts, '%s' % (account)]
+                for k in sorted_keylist_mapping:
+                    row.append(str(d[k]))
+                output.append(row)
 
         return output
 
+    def restructure_stats_dictionary(self, target_dict):
+        """
+        Restructure a stats dictionary for the json format.
+
+        :param target_dict: stats dictionary to be restructured
+
+        :returns: restructured stats dictionary
+        """
+
+        account_stats = {}
+        access_stats = {}
+        account_stats_key_list = \
+            self.log_processor.plugins['stats']['keylist_mapping']
+        access_stats_key_list = \
+            self.log_processor.plugins['access']['keylist_mapping']
+        hourly_stats = {'account_stats': account_stats,
+                        'access_stats': access_stats}
+        for k, v in target_dict.items():
+            if k in account_stats_key_list:
+                account_stats[k] = int(v)
+            elif k in access_stats_key_list:
+                access_stats[k] = int(v)
+        return hourly_stats
+
     def store_output(self, output):
         """
-        Takes the a list of rows and stores a csv file of the values in the
-        stats account.
+        Takes a dictionary or a list of rows and stores a json/csv file of
+        the values in the stats account.
 
-        :param output: list of rows to appear in the csv file
+        :param output: a dictionary or a list of rows
 
-        This csv file is final product of this script.
+        This json or csv file is the final product of this script.
""" - out_buf = '\n'.join([','.join(row) for row in output]) - h = hashlib.md5(out_buf).hexdigest() - upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h - f = cStringIO.StringIO(out_buf) + if self.format_type == 'json': + out_buf = json.dumps(output, indent=2) + h = hashlib.md5(out_buf).hexdigest() + upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.json.gz' % h + f = io.BytesIO(out_buf) + else: + out_buf = '\n'.join([','.join(row) for row in output]) + h = hashlib.md5(out_buf).hexdigest() + upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h + f = cStringIO.StringIO(out_buf) self.log_processor.internal_proxy.upload_file(f, self.log_processor_account, self.log_processor_container, @@ -384,7 +440,7 @@ class LogProcessorDaemon(Daemon): """ Process log files that fall within the lookback interval. - Upload resulting csv file to stats account. + Upload resulting csv or json file to stats account. Update processed files list and upload to stats account. """