Fix pep8 issues, refactor log_processor a bit for testability (lazy-load the internal proxy), and add a few comments
parent 55c997aa94
commit 0bb5857da3
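
Besides the pep8 cleanups, the substantive change is that LogProcessor no longer builds its InternalProxy (and parses the proxy-server config) in __init__; the proxy is now created on first access through a property, so unit tests can drop in a stub before it is ever constructed. A minimal sketch of the pattern, using placeholder names (DummyProxy, Processor) rather than Swift's classes:

    class DummyProxy(object):
        """Stand-in for the real internal proxy used by the tests."""
        def upload_file(self, *args, **kwargs):
            return True

    class Processor(object):
        def __init__(self, conf):
            self.conf = conf
            self._internal_proxy = None   # not built until first use

        @property
        def internal_proxy(self):
            """Lazily construct the expensive proxy object."""
            if self._internal_proxy is None:
                # real code would parse the proxy-server config here
                self._internal_proxy = DummyProxy()
            return self._internal_proxy

    # a test can inject a stub before the property ever fires:
    p = Processor({})
    p._internal_proxy = DummyProxy()
    assert p.internal_proxy.upload_file('some.log') is True
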
@@ -21,7 +21,9 @@ from swift.common.utils import split_path

month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()


class AccessLogProcessor(object):
    """Transform proxy server access logs"""

    def __init__(self, conf):
        self.server_name = conf.get('server_name', 'proxy')
@@ -112,8 +114,8 @@ class AccessLogProcessor(object):
        d['account'] = account
        d['container_name'] = container_name
        d['object_name'] = object_name
        d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
        d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
        d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
        d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
        d['code'] = int(d['code'])
        return d

@@ -151,12 +153,12 @@ class AccessLogProcessor(object):
            source = 'service'
        else:
            source = 'public'

        if line_data['client_ip'] in self.service_ips:
            source = 'service'

        d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
            bytes_out
        d[(source, 'bytes_out')] = d.setdefault((
            source, 'bytes_out'), 0) + bytes_out
        d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
            bytes_in

@@ -171,7 +173,7 @@ class AccessLogProcessor(object):
        path = line_data.get('path', 0)
        d['path_query'] = d.setdefault('path_query', 0) + path

        code = '%dxx' % (code/100)
        code = '%dxx' % (code / 100)
        key = (source, op_level, method, code)
        d[key] = d.setdefault(key, 0) + 1

@@ -220,5 +222,5 @@ class AccessLogProcessor(object):
                        keylist_mapping[code].add(
                            (source, level, verb, code))
                        keylist_mapping['ops_count'].add(
                            (source,level,verb,code))
                            (source, level, verb, code))
        return keylist_mapping

@@ -21,9 +21,15 @@ from swift.account.server import DATADIR as account_server_data_dir
from swift.common.db import AccountBroker
from swift.common.internal_proxy import InternalProxy
from swift.common.utils import renamer, get_logger, readconf
from swift.common.constraints import check_mount
from swift.common.daemon import Daemon


class AccountStat(Daemon):
    """Extract storage stats from account databases on the account
    storage nodes
    """

    def __init__(self, stats_conf):
        super(AccountStat, self).__init__(stats_conf)
        target_dir = stats_conf.get('log_dir', '/var/log/swift')
@@ -45,7 +51,7 @@ class AccountStat(Daemon):
        start = time.time()
        self.find_and_process()
        self.logger.info("Gathering account stats complete (%0.2f minutes)" %
            ((time.time()-start)/60))
            ((time.time() - start) / 60))

    def find_and_process(self):
        #TODO: handle a counter in the filename to prevent overwrites?
@@ -53,10 +59,10 @@
        #TODO: don't use /tmp?
        tmp_filename = os.path.join('/tmp', src_filename)
        with open(tmp_filename, 'wb') as statfile:
            #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
            #statfile.write(
            # 'Account Name, Container Count, Object Count, Bytes Used\n')
            for device in os.listdir(self.devices):
                if self.mount_check and \
                        not os.path.ismount(os.path.join(self.devices, device)):
                if self.mount_check and not check_mount(self.devices, device):
                    self.logger.error("Device %s is not mounted, skipping." %
                        device)
                    continue
@@ -70,7 +76,8 @@ class AccountStat(Daemon):
                for root, dirs, files in os.walk(accounts, topdown=False):
                    for filename in files:
                        if filename.endswith('.db'):
                            broker = AccountBroker(os.path.join(root, filename))
                            db_path = os.path.join(root, filename)
                            broker = AccountBroker(db_path)
                            if not broker.is_deleted():
                                (account_name,
                                _, _, _,
@@ -78,9 +85,8 @@
                                object_count,
                                bytes_used,
                                _, _) = broker.get_info()
                                line_data = '"%s",%d,%d,%d\n' % (account_name,
                                    container_count,
                                    object_count,
                                    bytes_used)
                                line_data = '"%s",%d,%d,%d\n' % (
                                    account_name, container_count,
                                    object_count, bytes_used)
                                statfile.write(line_data)
        renamer(tmp_filename, os.path.join(self.target_dir, src_filename))

@@ -29,26 +29,22 @@ from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon


class BadFileDownload(Exception):
    pass


class LogProcessor(object):
    """Load plugins, process logs"""

    def __init__(self, conf, logger):
        stats_conf = conf.get('log-processor', {})

        proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
                                               '/etc/swift/proxy-server.conf')
        self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
                                           name='proxy-server')

        if isinstance(logger, tuple):
            self.logger = get_logger(*logger)
        else:
            self.logger = logger
        self.internal_proxy = InternalProxy(self.proxy_server_conf,
                                            self.logger,
                                            retries=3)

        # load the processing plugins
        self.plugins = {}
        plugin_prefix = 'log-processor-'
@@ -56,11 +52,25 @@ class LogProcessor(object):
            plugin_name = section[len(plugin_prefix):]
            plugin_conf = conf.get(section, {})
            self.plugins[plugin_name] = plugin_conf
            import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
            class_path = self.plugins[plugin_name]['class_path']
            import_target, class_name = class_path.rsplit('.', 1)
            module = __import__(import_target, fromlist=[import_target])
            klass = getattr(module, class_name)
            self.plugins[plugin_name]['instance'] = klass(plugin_conf)

    @property
    def internal_proxy(self):
        '''Lazy load internal proxy'''
        if self._internal_proxy is None:
            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
                                                   '/etc/swift/proxy-server.conf')
            self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
                                               name='proxy-server')
            self._internal_proxy = InternalProxy(self.proxy_server_conf,
                                                 self.logger,
                                                 retries=3)
        return self._internal_proxy

    def process_one_file(self, plugin_name, account, container, object_name):
        # get an iter of the object data
        compressed = object_name.endswith('.gz')
@@ -72,7 +82,8 @@ class LogProcessor(object):
                                          container,
                                          object_name)

    def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
    def get_data_list(self, start_date=None, end_date=None,
                      listing_filter=None):
        total_list = []
        for name, data in self.plugins.items():
            account = data['swift_account']
@@ -89,8 +100,9 @@ class LogProcessor(object):
                    total_list.append(x)
        return total_list

    def get_container_listing(self, swift_account, container_name, start_date=None,
                              end_date=None, listing_filter=None):
    def get_container_listing(self, swift_account, container_name,
                              start_date=None, end_date=None,
                              listing_filter=None):
        '''
        Get a container listing, filtered by start_date, end_date, and
        listing_filter. Dates, if given, should be in YYYYMMDDHH format
@@ -162,15 +174,16 @@ class LogProcessor(object):
        last_part = ''
        last_compressed_part = ''
        # magic in the following zlib.decompressobj argument is courtesy of
        # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
        d = zlib.decompressobj(16+zlib.MAX_WBITS)
        # Python decompressing gzip chunk-by-chunk
        # http://stackoverflow.com/questions/2423866
        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
        try:
            for chunk in o:
                if compressed:
                    try:
                        chunk = d.decompress(chunk)
                    except zlib.error:
                        raise BadFileDownload() # bad compressed data
                        raise BadFileDownload()  # bad compressed data
                parts = chunk.split('\n')
                parts[0] = last_part + parts[0]
                for part in parts[:-1]:
@@ -208,6 +221,8 @@ class LogProcessor(object):


class LogProcessorDaemon(Daemon):
    """Gather raw log data and farm proccessing, results output via print"""

    def __init__(self, conf):
        c = conf.get('log-processor')
        super(LogProcessorDaemon, self).__init__(c)
@@ -228,15 +243,16 @@ class LogProcessorDaemon(Daemon):
            lookback_start = None
            lookback_end = None
        else:
            lookback_start = datetime.datetime.now() - \
                datetime.timedelta(hours=self.lookback_hours)
            delta_hours = datetime.timedelta(hours=self.lookback_hours)
            lookback_start = datetime.datetime.now() - delta_hours
            lookback_start = lookback_start.strftime('%Y%m%d%H')
            if self.lookback_window == 0:
                lookback_end = None
            else:
                delta_window = datetime.timedelta(hours=self.lookback_window)
                lookback_end = datetime.datetime.now() - \
                    datetime.timedelta(hours=self.lookback_hours) + \
                    datetime.timedelta(hours=self.lookback_window)
                    delta_hours + \
                    delta_window
                lookback_end = lookback_end.strftime('%Y%m%d%H')
        self.logger.debug('lookback_start: %s' % lookback_start)
        self.logger.debug('lookback_end: %s' % lookback_end)
@@ -261,7 +277,7 @@ class LogProcessorDaemon(Daemon):
        self.logger.info('loaded %d files to process' % len(logs_to_process))
        if not logs_to_process:
            self.logger.info("Log processing done (%0.2f minutes)" %
                ((time.time()-start)/60))
                ((time.time() - start) / 60))
            return

        # map
@@ -315,7 +331,7 @@ class LogProcessorDaemon(Daemon):
            row = [bill_ts, data_ts]
            row.append('%s' % account)
            for k in sorted_keylist_mapping:
                row.append('%s'%d[k])
                row.append('%s' % d[k])
            print ','.join(row)

        # cleanup
@@ -327,7 +343,8 @@ class LogProcessorDaemon(Daemon):
                              'processed_files.pickle.gz')

        self.logger.info("Log processing done (%0.2f minutes)" %
            ((time.time()-start)/60))
            ((time.time() - start) / 60))


def multiprocess_collate(processor_args, logs_to_process):
    '''yield hourly data from logs_to_process'''
@@ -361,6 +378,7 @@ def multiprocess_collate(processor_args, logs_to_process):
    for r in results:
        r.join()


def collate_worker(processor_args, in_queue, out_queue):
    '''worker process for multiprocess_collate'''
    p = LogProcessor(*processor_args)
@@ -373,4 +391,4 @@ def collate_worker(processor_args, in_queue, out_queue):
            time.sleep(.1)
        else:
            ret = p.process_one_file(*item)
            out_queue.put((item, ret))
            out_queue.put((item, ret))

@@ -25,18 +25,19 @@ from swift.common.internal_proxy import InternalProxy
from swift.common.daemon import Daemon
from swift.common import utils


class LogUploader(Daemon):
    '''
    Given a local directory, a swift account, and a container name, LogParser
    will upload all files in the local directory to the given account/container.
    All but the newest files will be uploaded, and the files' md5 sum will be
    computed. The hash is used to prevent duplicate data from being uploaded
    multiple times in different files (ex: log lines). Since the hash is
    computed, it is also used as the uploaded object's etag to ensure data
    integrity.
    will upload all files in the local directory to the given account/
    container. All but the newest files will be uploaded, and the files' md5
    sum will be computed. The hash is used to prevent duplicate data from
    being uploaded multiple times in different files (ex: log lines). Since
    the hash is computed, it is also used as the uploaded object's etag to
    ensure data integrity.

    Note that after the file is successfully uploaded, it will be unlinked.

    The given proxy server config is used to instantiate a proxy server for
    the object uploads.
    '''
@@ -66,7 +67,7 @@ class LogUploader(Daemon):
        start = time.time()
        self.upload_all_logs()
        self.logger.info("Uploading logs complete (%0.2f minutes)" %
            ((time.time()-start)/60))
            ((time.time() - start) / 60))

    def upload_all_logs(self):
        i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
@@ -76,17 +77,17 @@ class LogUploader(Daemon):
        for start, c in i:
            offset = base_offset + start
            if c == '%Y':
                year_offset = offset, offset+4
                year_offset = offset, offset + 4
                # Add in the difference between len(%Y) and the expanded
                # version of %Y (????). This makes sure the codes after this
                # one will align properly in the final filename.
                base_offset += 2
            elif c == '%m':
                month_offset = offset, offset+2
                month_offset = offset, offset + 2
            elif c == '%d':
                day_offset = offset, offset+2
                day_offset = offset, offset + 2
            elif c == '%H':
                hour_offset = offset, offset+2
                hour_offset = offset, offset + 2
        if not (year_offset and month_offset and day_offset and hour_offset):
            # don't have all the parts, can't upload anything
            return
@@ -124,7 +125,8 @@ class LogUploader(Daemon):
                continue
            if (time.time() - os.stat(filename).st_mtime) < 7200:
                # don't process very new logs
                self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
                self.logger.debug(
                    "Skipping log: %s (< 2 hours old)" % filename)
                continue
            self.upload_one_log(filename, year, month, day, hour)
@@ -147,7 +149,7 @@ class LogUploader(Daemon):
        # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
        target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
        if self.internal_proxy.upload_file(filename,
                                           self.swift_account,
                                           self.container_name,

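The LogUploader docstring above leans on one idea: the file's md5 is computed once and then reused, both to keep the same data from being uploaded twice under different names and as the object's etag. A rough, self-contained sketch of that naming scheme (hashed_object_name is an illustrative helper, not part of the module):

    import hashlib

    def hashed_object_name(path, year, month, day, hour, chunk_size=64 * 1024):
        """Return (object name, md5 hexdigest) for a log file upload."""
        md5 = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                md5.update(chunk)   # hash in chunks; logs can be large
        filehash = md5.hexdigest()
        # the hash-based name de-duplicates re-uploads of identical data,
        # and the same digest doubles as the etag checked on the PUT
        return '/'.join([year, month, day, hour, filehash + '.gz']), filehash
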
@@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.


class StatsLogProcessor(object):
    """Transform account storage stat logs"""

    def __init__(self, conf):
        pass

@@ -112,7 +112,7 @@ class TestLogProcessor(unittest.TestCase):

    def test_get_container_listing(self):
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        p.internal_proxy = DumbInternalProxy()
        p._internal_proxy = DumbInternalProxy()
        result = p.get_container_listing('a', 'foo')
        expected = ['2010/03/14/13/obj1']
        self.assertEquals(result, expected)
@@ -133,7 +133,7 @@ class TestLogProcessor(unittest.TestCase):

    def test_get_object_data(self):
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        p.internal_proxy = DumbInternalProxy()
        p._internal_proxy = DumbInternalProxy()
        result = list(p.get_object_data('a', 'c', 'o', False))
        expected = ['obj','data']
        self.assertEquals(result, expected)
@@ -148,7 +148,7 @@ class TestLogProcessor(unittest.TestCase):
                    'swift.stats.stats_processor.StatsLogProcessor'
                }})
        p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
        p.internal_proxy = DumbInternalProxy()
        p._internal_proxy = DumbInternalProxy()
        def get_object_data(*a,**kw):
            return [self.stats_test_line]
        p.get_object_data = get_object_data
@@ -158,4 +158,4 @@ class TestLogProcessor(unittest.TestCase):
                             'object_count': 2,
                             'container_count': 1,
                             'bytes_used': 3}}
        self.assertEquals(result, expected)
        self.assertEquals(result, expected)