fixed pattern matching/globbing in swift-log-uploader

This commit is contained in:
Clay Gerrard 2011-03-11 17:50:54 -06:00
parent 089de66dfa
commit 19e185749f
3 changed files with 369 additions and 112 deletions

View File

@ -14,7 +14,7 @@ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# log_dir = /var/log/swift/ # log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data container_name = log_data
source_filename_format = access-%Y%m%d%H source_filename_pattern = access-%Y%m%d%H
# new_log_cutoff = 7200 # new_log_cutoff = 7200
# unlink_log = True # unlink_log = True
class_path = swift.stats.access_processor.AccessLogProcessor class_path = swift.stats.access_processor.AccessLogProcessor
@ -31,7 +31,7 @@ class_path = swift.stats.access_processor.AccessLogProcessor
# log_dir = /var/log/swift/ # log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats container_name = account_stats
source_filename_format = stats-%Y%m%d%H_* source_filename_pattern = stats-%Y%m%d%H_.*
# new_log_cutoff = 7200 # new_log_cutoff = 7200
# unlink_log = True # unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor class_path = swift.stats.stats_processor.StatsLogProcessor

View File

@ -18,7 +18,7 @@ import os
import hashlib import hashlib
import time import time
import gzip import gzip
import glob import re
from paste.deploy import appconfig from paste.deploy import appconfig
from swift.common.internal_proxy import InternalProxy from swift.common.internal_proxy import InternalProxy
@ -44,29 +44,30 @@ class LogUploader(Daemon):
def __init__(self, uploader_conf, plugin_name): def __init__(self, uploader_conf, plugin_name):
super(LogUploader, self).__init__(uploader_conf) super(LogUploader, self).__init__(uploader_conf)
log_dir = uploader_conf.get('log_dir', '/var/log/swift/') log_name = '%s-log-uploader' % plugin_name
swift_account = uploader_conf['swift_account'] self.logger = utils.get_logger(uploader_conf, log_name,
container_name = uploader_conf['container_name'] log_route=plugin_name)
source_filename_format = uploader_conf['source_filename_format'] self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
self.swift_account = uploader_conf['swift_account']
self.container_name = uploader_conf['container_name']
proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf') '/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
name='proxy-server') name='proxy-server')
new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
('true', 'on', '1', 'yes')
self.unlink_log = unlink_log
self.new_log_cutoff = new_log_cutoff
if not log_dir.endswith('/'):
log_dir = log_dir + '/'
self.log_dir = log_dir
self.swift_account = swift_account
self.container_name = container_name
self.filename_format = source_filename_format
self.internal_proxy = InternalProxy(proxy_server_conf) self.internal_proxy = InternalProxy(proxy_server_conf)
log_name = '%s-log-uploader' % plugin_name self.new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
self.logger = utils.get_logger(uploader_conf, log_name, self.unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
log_route=plugin_name) utils.TRUE_VALUES
# source_filename_format is deprecated
source_filename_format = uploader_conf.get('source_filename_format')
source_filename_pattern = uploader_conf.get('source_filename_pattern')
if source_filename_format and not source_filename_pattern:
self.logger.warning(_('source_filename_format is unreliable and '
'deprecated; use source_filename_pattern'))
self.pattern = self.convert_glob_to_regex(source_filename_format)
else:
self.pattern = source_filename_pattern or '%Y%m%d%H'
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
self.logger.info(_("Uploading logs")) self.logger.info(_("Uploading logs"))
@ -75,70 +76,114 @@ class LogUploader(Daemon):
self.logger.info(_("Uploading logs complete (%0.2f minutes)") % self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
((time.time() - start) / 60)) ((time.time() - start) / 60))
def convert_glob_to_regex(self, glob):
"""
Make a best effort to support old style config globs
:param glob: old style config source_filename_format
:returns : new style config source_filename_pattern
"""
pattern = glob
pattern = pattern.replace('.', r'\.')
pattern = pattern.replace('*', r'.*')
pattern = pattern.replace('?', r'.?')
return pattern
def validate_filename_pattern(self):
"""
Validate source_filename_pattern
:returns : valid regex pattern based on source_filename_pattern with
group matches substituted for date fmt markers
"""
pattern = self.pattern
markers = {
'%Y': ('year', '(?P<year>[0-9]{4})'),
'%m': ('month', '(?P<month>[0-1][0-9])'),
'%d': ('day', '(?P<day>[0-3][0-9])'),
'%H': ('hour', '(?P<hour>[0-2][0-9])'),
}
for marker, (type, group) in markers.items():
if marker not in self.pattern:
self.logger.error(_('source_filename_pattern must contain a '
'marker %(marker)s to match the '
'%(type)s') % {'marker': marker,
'type': type})
return
pattern = pattern.replace(marker, group)
return pattern
def get_relpath_to_files_under_log_dir(self):
"""
Look under log_dir recursively and return all filenames as relpaths
:returns : list of strs, the relpath to all filenames under log_dir
"""
all_files = []
for path, dirs, files in os.walk(self.log_dir):
all_files.extend(os.path.join(path, f) for f in files)
return [os.path.relpath(f, start=self.log_dir) for f in all_files]
def filter_files(self, all_files, pattern):
"""
Filter files based on regex pattern
:param all_files: list of strs, relpath of the filenames under log_dir
:param pattern: regex pattern to match against filenames
:returns : dict mapping full path of file to match group dict
"""
filename2match = {}
found_match = False
for filename in all_files:
match = re.match(pattern, filename)
if match:
found_match = True
full_path = os.path.join(self.log_dir, filename)
filename2match[full_path] = match.groupdict()
else:
self.logger.debug(_('%(filename)s does not match '
'%(pattern)s') % {'filename': filename,
'pattern': pattern})
return filename2match
def upload_all_logs(self): def upload_all_logs(self):
i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()] """
i.sort() Match files under log_dir to source_filename_pattern and upload to swift
year_offset = month_offset = day_offset = hour_offset = None """
base_offset = len(self.log_dir.rstrip('/')) + 1 pattern = self.validate_filename_pattern()
for start, c in i: if not pattern:
offset = base_offset + start self.logger.error(_('Invalid filename_format'))
if c == '%Y':
year_offset = offset, offset + 4
# Add in the difference between len(%Y) and the expanded
# version of %Y (????). This makes sure the codes after this
# one will align properly in the final filename.
base_offset += 2
elif c == '%m':
month_offset = offset, offset + 2
elif c == '%d':
day_offset = offset, offset + 2
elif c == '%H':
hour_offset = offset, offset + 2
if not (year_offset and month_offset and day_offset and hour_offset):
# don't have all the parts, can't upload anything
return return
glob_pattern = self.filename_format all_files = self.get_relpath_to_files_under_log_dir()
glob_pattern = glob_pattern.replace('%Y', '????', 1) filename2match = self.filter_files(all_files, pattern)
glob_pattern = glob_pattern.replace('%m', '??', 1) if not filename2match:
glob_pattern = glob_pattern.replace('%d', '??', 1) self.logger.info(_('No files in %(log_dir)s match %(pattern)s') %
glob_pattern = glob_pattern.replace('%H', '??', 1) {'log_dir': self.log_dir, 'pattern': pattern})
filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) return
current_hour = int(time.strftime('%H')) if not self.internal_proxy.create_container(self.swift_account,
today = int(time.strftime('%Y%m%d')) self.container_name):
self.internal_proxy.create_container(self.swift_account, self.logger.error(_('Unable to create container for '
self.container_name) '%(account)s/%(container)s') % {
for filename in filelist: 'account': self.swift_account,
try: 'container': self.container_name})
# From the filename, we need to derive the year, month, day, return
# and hour for the file. These values are used in the uploaded for filename, match in filename2match.items():
# object's name, so they should be a reasonably accurate # don't process very new logs
# representation of the time for which the data in the file was seconds_since_mtime = time.time() - os.stat(filename).st_mtime
# collected. The file's last modified time is not a reliable if seconds_since_mtime < self.new_log_cutoff:
# representation of the data in the file. For example, an old self.logger.debug(_("Skipping log: %(file)s "
# log file (from hour A) may be uploaded or moved into the "(< %(cutoff)d seconds old)") % {
# log_dir in hour Z. The file's modified time will be for hour 'file': filename,
# Z, and therefore the object's name in the system will not 'cutoff': self.new_log_cutoff})
# represent the data in it.
# If the filename doesn't match the format, it shouldn't be
# uploaded.
year = filename[slice(*year_offset)]
month = filename[slice(*month_offset)]
day = filename[slice(*day_offset)]
hour = filename[slice(*hour_offset)]
except IndexError:
# unexpected filename format, move on
self.logger.error(_("Unexpected log: %s") % filename)
continue continue
if ((time.time() - os.stat(filename).st_mtime) < self.upload_one_log(filename, **match)
self.new_log_cutoff):
# don't process very new logs
self.logger.debug(
_("Skipping log: %(file)s (< %(cutoff)d seconds old)") %
{'file': filename, 'cutoff': self.new_log_cutoff})
continue
self.upload_one_log(filename, year, month, day, hour)
def upload_one_log(self, filename, year, month, day, hour): def upload_one_log(self, filename, year, month, day, hour):
"""
Upload one file to swift
"""
if os.path.getsize(filename) == 0: if os.path.getsize(filename) == 0:
self.logger.debug(_("Log %s is 0 length, skipping") % filename) self.logger.debug(_("Log %s is 0 length, skipping") % filename)
return return

View File

@ -20,7 +20,12 @@ import os
from datetime import datetime from datetime import datetime
from tempfile import mkdtemp from tempfile import mkdtemp
from shutil import rmtree from shutil import rmtree
from functools import partial
from collections import defaultdict
import random
import string
from test.unit import temptree
from swift.stats import log_uploader from swift.stats import log_uploader
import logging import logging
@ -29,34 +34,62 @@ LOGGER = logging.getLogger()
DEFAULT_GLOB = '%Y%m%d%H' DEFAULT_GLOB = '%Y%m%d%H'
COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \
'\xf3\xf3\xad\x04\x00\x00\x00'
def mock_appconfig(*args, **kwargs):
pass
class MockInternalProxy():
def __init__(self, *args, **kwargs):
pass
def create_container(self, *args, **kwargs):
return True
def upload_file(self, *args, **kwargs):
return True
_orig_LogUploader = log_uploader.LogUploader
class MockLogUploader(_orig_LogUploader):
def __init__(self, conf, logger=LOGGER):
conf['swift_account'] = conf.get('swift_account', '')
conf['container_name'] = conf.get('container_name', '')
conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0')
conf['source_filename_format'] = conf.get(
'source_filename_format', conf.get('filename_format'))
log_uploader.LogUploader.__init__(self, conf, 'plugin')
self.logger = logger
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
_orig_LogUploader.upload_one_log(self, filename, year, month,
day, hour)
class TestLogUploader(unittest.TestCase): class TestLogUploader(unittest.TestCase):
def test_upload_all_logs(self): def setUp(self):
# mock internal proxy
self._orig_InternalProxy = log_uploader.InternalProxy
self._orig_appconfig = log_uploader.appconfig
log_uploader.InternalProxy = MockInternalProxy
log_uploader.appconfig = mock_appconfig
class MockInternalProxy(): def tearDown(self):
log_uploader.appconfig = self._orig_appconfig
def create_container(self, *args, **kwargs): log_uploader.InternalProxy = self._orig_InternalProxy
pass
class MonkeyLogUploader(log_uploader.LogUploader):
def __init__(self, conf, logger=LOGGER):
self.log_dir = conf['log_dir']
self.filename_format = conf.get('filename_format',
DEFAULT_GLOB)
self.new_log_cutoff = 0
self.logger = logger
self.internal_proxy = MockInternalProxy()
self.swift_account = ''
self.container_name = ''
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
def test_deprecated_glob_style_upload_all_logs(self):
tmpdir = mkdtemp() tmpdir = mkdtemp()
try: try:
today = datetime.now() today = datetime.now()
@ -72,7 +105,7 @@ class TestLogUploader(unittest.TestCase):
open(os.path.join(tmpdir, ts), 'w').close() open(os.path.join(tmpdir, ts), 'w').close()
conf = {'log_dir': tmpdir} conf = {'log_dir': tmpdir}
uploader = MonkeyLogUploader(conf) uploader = MockLogUploader(conf)
uploader.upload_all_logs() uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24) self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)): for i, file_date in enumerate(sorted(uploader.uploaded_files)):
@ -112,7 +145,7 @@ class TestLogUploader(unittest.TestCase):
'log_dir': '%s/' % tmpdir, 'log_dir': '%s/' % tmpdir,
'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz', 'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz',
} }
uploader = MonkeyLogUploader(conf) uploader = MockLogUploader(conf)
uploader.upload_all_logs() uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24) self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)): for i, file_date in enumerate(sorted(uploader.uploaded_files)):
@ -146,22 +179,201 @@ class TestLogUploader(unittest.TestCase):
'log_dir': tmpdir, 'log_dir': tmpdir,
'filename_format': '*.%Y%m%d%H.log', 'filename_format': '*.%Y%m%d%H.log',
} }
uploader = MonkeyLogUploader(conf) uploader = MockLogUploader(conf)
uploader.upload_all_logs() uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24) self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)): fname_to_int = lambda x: int(os.path.basename(x[0]).split('.')[0])
numerically = lambda x, y: cmp(fname_to_int(x),
fname_to_int(y))
for i, file_date in enumerate(sorted(uploader.uploaded_files,
cmp=numerically)):
d = {'year': year, 'month': month, 'day': day, 'hour': i} d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items(): for k, v in d.items():
d[k] = '%0.2d' % v d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' % expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' %
(i, today_str, i)), d) (i, today_str, i)), d)
# TODO: support wildcards before the date pattern self.assertEquals(file_date, expected)
# (i.e. relative offsets)
#print file_date
#self.assertEquals(file_date, expected)
finally: finally:
rmtree(tmpdir) rmtree(tmpdir)
def test_bad_pattern_in_config(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
# invalid pattern
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%h'} # should be %H
uploader = MockLogUploader(conf)
self.assertFalse(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(uploader.uploaded_files, [])
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%H'}
uploader = MockLogUploader(conf)
self.assert_(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 1)
# deprecated warning on source_filename_format
class MockLogger():
def __init__(self):
self.msgs = defaultdict(list)
def log(self, level, msg):
self.msgs[level].append(msg)
def __getattr__(self, attr):
return partial(self.log, attr)
logger = MockLogger.logger = MockLogger()
def mock_get_logger(*args, **kwargs):
return MockLogger.logger
_orig_get_logger = log_uploader.utils.get_logger
try:
log_uploader.utils.get_logger = mock_get_logger
conf = {'source_filename_format': '%Y%m%d%H'}
uploader = MockLogUploader(conf, logger=logger)
self.assert_([m for m in logger.msgs['warning']
if 'deprecated' in m])
finally:
log_uploader.utils.get_logger = _orig_get_logger
# convert source_filename_format to regex
conf = {'source_filename_format': 'pattern-*.%Y%m%d%H.*.gz'}
uploader = MockLogUploader(conf)
expected = r'pattern-.*\.%Y%m%d%H\..*\.gz'
self.assertEquals(uploader.pattern, expected)
# use source_filename_pattern if we have the choice!
conf = {
'source_filename_format': 'bad',
'source_filename_pattern': 'good',
}
uploader = MockLogUploader(conf)
self.assertEquals(uploader.pattern, 'good')
def test_pattern_upload_all_logs(self):
# test empty dir
with temptree([]) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
def get_random_length_str(max_len=10, chars=string.ascii_letters):
return ''.join(random.choice(chars) for x in
range(random.randint(1, max_len)))
template = 'prefix_%(random)s_%(digits)s.blah.' \
'%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz'
pattern = r'prefix_.*_[0-9]+\.blah\.%Y%m%d%H00-[0-9]{2}00' \
'-[0-9]?[0-9]\.gz'
files_that_should_match = []
# add some files that match
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m%d'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files_that_should_match.append(fname)
# add some files that don't match
files = list(files_that_should_match)
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files.append(fname)
for fname in files:
print fname
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
self.assertEquals(len(os.listdir(t)), 48)
conf = {'source_filename_pattern': pattern, 'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(os.listdir(t)), 24)
self.assertEquals(len(uploader.uploaded_files), 24)
files_that_were_uploaded = set(x[0] for x in
uploader.uploaded_files)
for f in files_that_should_match:
self.assert_(os.path.join(t, f) in files_that_were_uploaded)
def test_log_cutoff(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
conf = {'log_dir': t, 'new_log_cutoff': '7200'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
conf = {'log_dir': t, 'new_log_cutoff': '0'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
def test_create_container_fail(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
with temptree(files) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
# mock create_container to fail
uploader.internal_proxy.create_container = lambda *args: False
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
def test_unlink_log(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'false'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
conf = {'log_dir': t, 'unlink_log': 'true'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file gone
self.assertEquals(len(os.listdir(t)), 0)
def test_upload_file_failed(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'true'}
uploader = MockLogUploader(conf)
# mock upload_file to fail, and clean up mock
def mock_upload_file(self, *args, **kwargs):
uploader.uploaded_files.pop()
return False
uploader.internal_proxy.upload_file = mock_upload_file
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()