Usage info in JSON format
Before this commit, slogging could only output usage info as a CSV file. To enhance usability, this commit adds JSON as a selectable output format via the new format_type option (default: csv).
commit d221703677 (parent f5234eed45)
@@ -9,6 +9,7 @@ swift_account = AUTH_test
 # lookback_hours = 120
 # lookback_window = 120
 # user = swift
+# format_type = csv

 [log-processor-access]
 # log_dir = /var/log/swift/
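For orientation only (not part of the commit): a sketch of how this sample setting reaches the daemon. readconf() comes from swift.common.utils, which the log processor already imports; the conf path and the assumption that the option lives in the same section as the swift_account setting shown above are mine.

# Sketch only: how the commented-out sample value would reach the daemon.
# The conf path and section name are assumptions for illustration.
from swift.common.utils import readconf

c = readconf('/etc/swift/log-processing.conf', 'log-processor')
# With "# format_type = csv" left commented out, the key is absent and the
# daemon falls back to 'csv'; uncommenting it and setting it to json
# switches the uploaded usage report to JSON.
print(c.get('format_type', 'csv'))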
slogging/common.py (new file, 27 lines added)
@@ -0,0 +1,27 @@
+# Copyright(c)2014 NTT corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def get_format_type(conf, logger, key, default):
+    """
+    Get and check format_type value.
+    """
+    format_type = conf.get(key, default).lower()
+    if format_type not in ('json', 'csv'):
+        logger.warning(
+            _("Invalid Parameter %s: %s, " % (key, format_type) +
+              "use default %s.") % default)
+        format_type = default
+    return format_type
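For illustration only (not part of the commit), here is a standalone sketch of the helper's behaviour. It replicates get_format_type() without the gettext _() wrapper so it can run on its own; the conf values fed in are made up.

import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('log-processor')


def get_format_type(conf, logger, key, default):
    # Standalone replica of slogging.common.get_format_type; the real one
    # wraps the warning text in the gettext _() helper installed by swift.
    format_type = conf.get(key, default).lower()
    if format_type not in ('json', 'csv'):
        logger.warning("Invalid Parameter %s: %s, use default %s."
                       % (key, format_type, default))
        format_type = default
    return format_type


print(get_format_type({'format_type': 'JSON'}, logger, 'format_type', 'csv'))  # -> 'json'
print(get_format_type({'format_type': 'xml'}, logger, 'format_type', 'csv'))   # warns, -> 'csv'
print(get_format_type({}, logger, 'format_type', 'csv'))                       # key absent -> 'csv'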
@@ -24,12 +24,15 @@ import multiprocessing
 import Queue
 import cPickle
 import hashlib
+import json
+import io

 from slogging.internal_proxy import InternalProxy
 from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon
 from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
     BadFileDownload
+from slogging import common

 now = datetime.datetime.now

@@ -53,6 +56,7 @@ class LogProcessor(LogProcessorCommon):
             module = __import__(import_target, fromlist=[import_target])
             klass = getattr(module, class_name)
             self.plugins[plugin_name]['instance'] = klass(plugin_conf)
+            self.plugins[plugin_name]['keylist_mapping'] = {}
             self.logger.debug(_('Loaded plugin "%s"') % plugin_name)

     def process_one_file(self, plugin_name, account, container, object_name):
@@ -90,7 +94,8 @@ class LogProcessor(LogProcessorCommon):
     def generate_keylist_mapping(self):
         keylist = {}
         for plugin in self.plugins:
-            plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
+            plugin_keylist = self.plugins[plugin]['keylist_mapping'] = \
+                self.plugins[plugin]['instance'].keylist_mapping()
             if not plugin_keylist:
                 continue
             for k, v in plugin_keylist.items():
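Together with the previous hunk, each plugin entry now caches its keylist_mapping next to the plugin instance; restructure_stats_dictionary() further down relies on that cache to split an hourly record into account_stats and access_stats. A rough sketch of the resulting structure, assuming the usual 'access' and 'stats' plugins and with made-up mapping keys:

# Illustrative shape of LogProcessor.plugins after __init__ and
# generate_keylist_mapping() have run; class names and mapping keys
# are placeholders, not the real plugin output.
plugins = {
    'access': {
        'instance': '<AccessLogProcessor instance>',
        'keylist_mapping': {'get_requests': '<column>', 'put_requests': '<column>'},
    },
    'stats': {
        'instance': '<StatsLogProcessor instance>',
        'keylist_mapping': {'bytes_used': '<column>', 'container_count': '<column>'},
    },
}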
@@ -134,6 +139,8 @@ class LogProcessorDaemon(Daemon):
         self.worker_count = int(c.get('worker_count', '1'))
         self._keylist_mapping = None
         self.processed_files_filename = 'processed_files.pickle.gz'
+        self.format_type = common.get_format_type(c, self.logger,
+                                                  'format_type', 'csv')

     def get_lookback_interval(self):
         """
@@ -291,41 +298,90 @@ class LogProcessorDaemon(Daemon):

     def get_output(self, final_info):
         """
-        :returns: a list of rows to appear in the csv file.
+        :returns: a list of rows to appear in the csv file or
+                  a dictionary to appear in the json file.

-        The first row contains the column headers for the rest of the
-        rows in the returned list.
+        csv file:
+        The first row contains the column headers for the rest
+        of the rows in the returned list.

-        Each row after the first row corresponds to an account's data
-        for that hour.
+        Each row after the first row corresponds to an account's
+        data.
+        json file:
+        First level just shows a label "stats_data".
+        Second level of stats_data lists account names.
+        Each account block starts with a time label, and it
+        contains stats of account usage.
         """

-        sorted_keylist_mapping = sorted(self.keylist_mapping)
-        columns = ['data_ts', 'account'] + sorted_keylist_mapping
-        output = [columns]
-        for (account, year, month, day, hour), d in final_info.items():
-            data_ts = '%04d/%02d/%02d %02d:00:00' % \
-                (int(year), int(month), int(day), int(hour))
-            row = [data_ts, '%s' % (account)]
-            for k in sorted_keylist_mapping:
-                row.append(str(d[k]))
-            output.append(row)
+        if self.format_type == 'json':
+            all_account_stats = collections.defaultdict(dict)
+            for (account, year, month, day, hour), d in final_info.items():
+                data_ts = datetime.datetime(int(year), int(month),
+                                            int(day), int(hour))
+                time_stamp = data_ts.strftime('%Y/%m/%d %H:00:00')
+                hourly_account_stats = \
+                    self.restructure_stats_dictionary(d)
+                all_account_stats[account].update({time_stamp:
+                                                   hourly_account_stats})
+            output = {'stats_data': all_account_stats}
+        else:  # csv
+            sorted_keylist_mapping = sorted(self.keylist_mapping)
+            columns = ['data_ts', 'account'] + sorted_keylist_mapping
+            output = [columns]
+            for (account, year, month, day, hour), d in final_info.items():
+                data_ts = '%04d/%02d/%02d %02d:00:00' % \
+                    (int(year), int(month), int(day), int(hour))
+                row = [data_ts, '%s' % (account)]
+                for k in sorted_keylist_mapping:
+                    row.append(str(d[k]))
+                output.append(row)
         return output

+    def restructure_stats_dictionary(self, target_dict):
+        """
+        Restructure stats dictionary for json format.
+
+        :param target_dict: dictionary of restructuring target
+
+        :returns: restructured stats dictionary
+        """
+
+        account_stats = {}
+        access_stats = {}
+        account_stats_key_list = \
+            self.log_processor.plugins['stats']['keylist_mapping']
+        access_stats_key_list = \
+            self.log_processor.plugins['access']['keylist_mapping']
+        hourly_stats = {'account_stats': account_stats,
+                        'access_stats': access_stats}
+        for k, v in target_dict.items():
+            if k in account_stats_key_list:
+                account_stats[k] = int(v)
+            elif k in access_stats_key_list:
+                access_stats[k] = int(v)
+        return hourly_stats
+
     def store_output(self, output):
         """
-        Takes the a list of rows and stores a csv file of the values in the
-        stats account.
+        Takes the dictionary or the list of rows and stores a json/csv file of
+        the values in the stats account.

-        :param output: list of rows to appear in the csv file
+        :param output: a dictonary or a list of row

-        This csv file is final product of this script.
+        This json or csv file is final product of this script.
         """

-        out_buf = '\n'.join([','.join(row) for row in output])
-        h = hashlib.md5(out_buf).hexdigest()
-        upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
-        f = cStringIO.StringIO(out_buf)
+        if self.format_type == 'json':
+            out_buf = json.dumps(output, indent=2)
+            h = hashlib.md5(out_buf).hexdigest()
+            upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.json.gz' % h
+            f = io.BytesIO(out_buf)
+        else:
+            out_buf = '\n'.join([','.join(row) for row in output])
+            h = hashlib.md5(out_buf).hexdigest()
+            upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
+            f = cStringIO.StringIO(out_buf)
         self.log_processor.internal_proxy.upload_file(f,
             self.log_processor_account,
             self.log_processor_container,
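To make the new JSON layout concrete, a hand-written sketch of what get_output() returns (and store_output() then serialises with json.dumps) for one account and one hour. Only the nesting ('stats_data', then account, then time label, then account_stats/access_stats) comes from the code above; the timestamp and the individual stat names are placeholders, and the account name just reuses AUTH_test from the sample conf.

import json

# Illustrative shape only; values and stat names are placeholders.
example_output = {
    'stats_data': {
        'AUTH_test': {
            '2014/01/01 00:00:00': {
                'account_stats': {'bytes_used': 1024, 'container_count': 2},
                'access_stats': {'get_requests': 120, 'put_requests': 30},
            },
        },
    },
}

print(json.dumps(example_output, indent=2))  # mirrors store_output()'s json branch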
@@ -384,7 +440,7 @@ class LogProcessorDaemon(Daemon):
         """
         Process log files that fall within the lookback interval.

-        Upload resulting csv file to stats account.
+        Upload resulting csv or json file to stats account.

         Update processed files list and upload to stats account.
         """
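The final hunk only updates the daemon's run-loop docstring. As a closing note, the operator-visible difference is the uploaded object's name and format: per store_output() above, the report lands under a date/hour prefix with a .json.gz suffix instead of .csv.gz. A small sketch of that naming (the .encode() call is only there so the snippet also runs under Python 3):

import hashlib
import time

out_buf = '{"stats_data": {}}'  # stand-in for the json.dumps(...) result
h = hashlib.md5(out_buf.encode()).hexdigest()
upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.json.gz' % h
print(upload_name)  # e.g. 2014/01/01/00/<md5>.json.gz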