Merge pull request #1 from notmyname/access_log_delivery

access log delivery feature
David 2011-08-08 09:03:45 -07:00
commit 447b2a1a88
10 changed files with 961 additions and 207 deletions

bin/swift-access-log-delivery Executable file (25 lines added)
View File

@@ -0,0 +1,25 @@
#!/usr/bin/python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from slogging.access_log_delivery import AccessLogDeliveryDaemon
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(AccessLogDeliveryDaemon, conf_file,
section_name='access-log-delivery',
log_name='access-log-delivery', **options)
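
For reference, the new daemon is started like the other slogging/swift daemons; parse_options(once=True) also accepts an optional "once" verb for a single pass. The config path below is only illustrative:

    swift-access-log-delivery /etc/swift/access-log-delivery.conf        # run_forever()
    swift-access-log-delivery /etc/swift/access-log-delivery.conf once   # run_once() and exit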

View File

@@ -0,0 +1,19 @@
[access-log-delivery]
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# container_name = access_log_delivery_data
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
# lookback_hours = 120
# lookback_window = 120
# user = swift
# processed_files_object_name = processed_files.pickle.gz
# frequency = 3600
# target_container = .ACCESS_LOGS
log_source_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# log_source_container_name = log_data
# metadata_key = x-container-meta-access-log-delivery
# memcache_servers =
# worker_count = 1
# server_name = proxy-server
# working_dir = /tmp/swift
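
The metadata_key above is what the daemon checks on each source container to decide whether that container's requests should be delivered; any value in swift's TRUE_VALUES (e.g. "yes") enables it, as the tests below exercise. A minimal sketch of opting a container in, written in Python 2 to match the codebase; host, port, account, container and token are placeholders:

    import httplib
    conn = httplib.HTTPConnection('proxy.example.com', 8080)
    conn.request('POST', '/v1/AUTH_test/images',
                 headers={'X-Auth-Token': '<token>',
                          'X-Container-Meta-Access-Log-Delivery': 'yes'})
    print conn.getresponse().status  # 204 on success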

setup.py
View File

@@ -42,6 +42,7 @@ setup(
install_requires=[], # removed for better compat
scripts=['bin/swift-account-stats-logger',
'bin/swift-container-stats-logger',
'bin/swift-log-stats-collector', 'bin/swift-log-uploader'
'bin/swift-log-stats-collector', 'bin/swift-log-uploader',
'bin/swift-access-log-delivery'
],
)

slogging/access_log_delivery.py Normal file
View File

@@ -0,0 +1,386 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import collections
import datetime
from uuid import uuid4
import Queue
from urllib import unquote
import os
import cPickle
import cStringIO
import functools
import random
import errno
from swift.common.daemon import Daemon
from swift.common.utils import get_logger, TRUE_VALUES, split_path
from swift.common.exceptions import LockTimeout, ChunkReadTimeout
from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
BadFileDownload, lock_file
month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
MEMOIZE_KEY_LIMIT = 10000
MEMOIZE_FLUSH_RATE = 0.25
def make_clf_from_parts(parts):
format = '%(client_ip)s - - [%(day)s/%(month)s/%(year)s:%(hour)s:' \
'%(minute)s:%(second)s %(tz)s] "%(method)s %(request)s ' \
'%(http_version)s" %(code)s %(bytes_out)s "%(referrer)s" ' \
'"%(user_agent)s"'
try:
return format % parts
except KeyError:
return None
def memoize(func):
cache = {}
@functools.wraps(func)
def wrapped(*args):
key = tuple(args)
if key in cache:
return cache[key]
result = func(*args)
len_cache = len(cache)
if len_cache > MEMOIZE_KEY_LIMIT:
cache_keys = cache.keys()
for _unused in xrange(int(len_cache * MEMOIZE_FLUSH_RATE)):
index_to_delete = random.randrange(0, len(cache_keys))
key_to_delete = cache_keys.pop(index_to_delete)
del cache[key_to_delete]
cache[key] = result
return result
return wrapped
class FileBuffer(object):
def __init__(self, limit, logger):
self.buffers = collections.defaultdict(list)
self.limit = limit
self.logger = logger
self.total_size = 0
def write(self, filename, data):
self.buffers[filename].append(data)
self.total_size += len(data)
if self.total_size >= self.limit:
self.flush()
def flush(self):
while self.buffers:
filename_list = self.buffers.keys()
for filename in filename_list:
out = '\n'.join(self.buffers[filename]) + '\n'
mid_dirs = os.path.dirname(filename)
try:
os.makedirs(mid_dirs)
except OSError, err:
if err.errno == errno.EEXIST:
pass
else:
raise
try:
with lock_file(filename, append=True, unlink=False) as f:
f.write(out)
except LockTimeout:
# couldn't write, we'll try again later
self.logger.debug(_('Timeout writing to %s' % filename))
else:
del self.buffers[filename]
self.total_size = 0
class AccessLogDelivery(LogProcessorCommon):
def __init__(self, conf, logger):
super(AccessLogDelivery, self).__init__(conf, logger,
'access-log-delivery')
self.frequency = int(conf.get('frequency', '3600'))
self.metadata_key = conf.get('metadata_key',
'x-container-meta-access-log-delivery').lower()
self.server_name = conf.get('server_name', 'proxy-server')
self.working_dir = conf.get('working_dir', '/tmp/swift').rstrip('/')
        buffer_limit = int(conf.get('buffer_limit', '10485760'))
        self.file_buffer = FileBuffer(buffer_limit, logger)
def process_one_file(self, account, container, object_name):
files_to_upload = set()
try:
year, month, day, hour, _unused = object_name.split('/', 4)
except ValueError:
self.logger.info(_('Odd object name: %s. Skipping' % object_name))
return
filename_pattern = '%s/%%s/%%s/%s/%s/%s/%s' % (self.working_dir, year,
month, day, hour)
self.logger.debug(_('Processing %s' % object_name))
# get an iter of the object data
compressed = object_name.endswith('.gz')
stream = self.get_object_data(account, container, object_name,
compressed=compressed)
buff = collections.defaultdict(list)
for line in stream:
clf, account, container = self.convert_log_line(line)
if not clf or not account or not container:
# bad log line
continue
if self.get_container_save_log_flag(account, container):
filename = filename_pattern % (account, container)
self.file_buffer.write(filename, clf)
files_to_upload.add(filename)
self.file_buffer.flush()
return files_to_upload
@memoize
def get_container_save_log_flag(self, account, container):
key = 'save-access-logs-%s-%s' % (account, container)
flag = self.memcache.get(key)
if flag is None:
metadata = self.internal_proxy.get_container_metadata(account,
container)
val = metadata.get(self.metadata_key)
flag = val in TRUE_VALUES
self.memcache.set(key, flag, timeout=self.frequency)
return flag
def convert_log_line(self, raw_log):
parts = self.log_line_parser(raw_log)
return (make_clf_from_parts(parts),
parts.get('account'),
parts.get('container_name'))
def log_line_parser(self, raw_log):
'''given a raw access log line, return a dict of the good parts'''
d = {}
try:
(unused,
server,
client_ip,
lb_ip,
timestamp,
method,
request,
http_version,
code,
referrer,
user_agent,
auth_token,
bytes_in,
bytes_out,
etag,
trans_id,
headers,
processing_time) = (unquote(x) for x in
raw_log[16:].split(' ')[:18])
except ValueError:
self.logger.debug(_('Bad line data: %s') % repr(raw_log))
return {}
if server != self.server_name:
# incorrect server name in log line
self.logger.debug(_('Bad server name: found "%(found)s" ' \
'expected "%(expected)s"') %
{'found': server, 'expected': self.server_name})
return {}
try:
(version, account, container_name, object_name) = \
split_path(request, 2, 4, True)
except ValueError, e:
self.logger.debug(_('Invalid path: %(error)s from data: %(log)s') %
{'error': e, 'log': repr(raw_log)})
return {}
if container_name is not None:
container_name = container_name.split('?', 1)[0]
if object_name is not None:
object_name = object_name.split('?', 1)[0]
account = account.split('?', 1)[0]
d['client_ip'] = client_ip
d['lb_ip'] = lb_ip
d['method'] = method
d['request'] = request
d['http_version'] = http_version
d['code'] = code
d['referrer'] = referrer
d['user_agent'] = user_agent
d['auth_token'] = auth_token
d['bytes_in'] = bytes_in
d['bytes_out'] = bytes_out
d['etag'] = etag
d['trans_id'] = trans_id
d['processing_time'] = processing_time
day, month, year, hour, minute, second = timestamp.split('/')
d['day'] = day
month = ('%02s' % month_map.index(month)).replace(' ', '0')
d['month'] = month
d['year'] = year
d['hour'] = hour
d['minute'] = minute
d['second'] = second
d['tz'] = '+0000'
d['account'] = account
d['container_name'] = container_name
d['object_name'] = object_name
d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
d['code'] = int(d['code'])
return d
class AccessLogDeliveryDaemon(Daemon):
"""
Processes access (proxy) logs to split them up by account and deliver the
split logs to their respective accounts.
"""
def __init__(self, c):
self.conf = c
super(AccessLogDeliveryDaemon, self).__init__(c)
self.logger = get_logger(c, log_route='access-log-delivery')
self.log_processor = AccessLogDelivery(c, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',
str(self.lookback_hours)))
self.log_delivery_account = c['swift_account']
self.log_delivery_container = c.get('container_name',
'access_log_delivery_data')
self.source_account = c['log_source_account']
self.source_container = c.get('log_source_container_name', 'log_data')
self.target_container = c.get('target_container', '.ACCESS_LOGS')
self.frequency = int(c.get('frequency', '3600'))
self.processed_files_object_name = c.get('processed_files_object_name',
'processed_files.pickle.gz')
self.worker_count = int(c.get('worker_count', '1'))
self.working_dir = c.get('working_dir', '/tmp/swift')
if self.working_dir.endswith('/'):
self.working_dir = self.working_dir.rstrip('/')
def run_once(self, *a, **kw):
self.logger.info(_("Beginning log processing"))
start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
lookback_start = datetime.datetime.now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
lookback_end = datetime.datetime.now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
processed_files_stream = self.log_processor.get_object_data(
self.log_delivery_account,
self.log_delivery_container,
self.processed_files_object_name,
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
if buf:
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except BadFileDownload, err:
if err.status_code == 404:
already_processed_files = set()
else:
self.logger.error(_('Access log delivery unable to load list '
'of already processed log files'))
return
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
logs_to_process = self.log_processor.get_container_listing(
self.source_account,
self.source_container,
lookback_start,
lookback_end,
already_processed_files)
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
if not logs_to_process:
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
return
logs_to_process = [(self.source_account, self.source_container, x)
for x in logs_to_process]
# map
processor_args = (self.conf, self.logger)
results = multiprocess_collate(AccessLogDelivery, processor_args,
'process_one_file', logs_to_process,
self.worker_count)
#reduce
processed_files = already_processed_files
files_to_upload = set()
for item, data in results:
a, c, o = item
processed_files.add(o)
if data:
files_to_upload.update(data)
len_working_dir = len(self.working_dir) + 1 # +1 for the trailing '/'
for filename in files_to_upload:
target_name = filename[len_working_dir:]
account, target_name = target_name.split('/', 1)
some_id = uuid4().hex
target_name = '%s/%s.log.gz' % (target_name, some_id)
success = self.log_processor.internal_proxy.upload_file(filename,
account,
self.target_container,
target_name)
if success:
os.unlink(filename)
self.logger.debug('Uploaded %s to account %s' % (filename,
account))
else:
self.logger.error('Could not upload %s to account %s' % (
filename, account))
# cleanup
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
success = self.log_processor.internal_proxy.upload_file(f,
self.log_delivery_account,
self.log_delivery_container,
self.processed_files_object_name)
if not success:
self.logger.error('Error uploading updated processed files log')
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
def run_forever(self, *a, **kw):
while True:
start_time = time.time()
self.run_once()
end_time = time.time()
# don't run more than once every self.frequency seconds
sleep_time = self.frequency - (end_time - start_time)
time.sleep(max(0, sleep_time))
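
To make the flow above concrete: process_one_file() buffers matching log lines under working_dir as <working_dir>/<account>/<container>/<year>/<month>/<day>/<hour>, and run_once() then uploads each buffered file into that account's target_container under a unique object name. A small illustration with invented values:

    from uuid import uuid4
    filename = '/tmp/swift/AUTH_test/images/2011/03/14/12'
    target_name = 'images/2011/03/14/12/%s.log.gz' % uuid4().hex
    # then: internal_proxy.upload_file(filename, 'AUTH_test', '.ACCESS_LOGS', target_name)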

slogging/internal_proxy.py
View File

@@ -16,6 +16,7 @@
import webob
from urllib import quote, unquote
from json import loads as json_loads
import copy
from slogging.compressing_file_reader import CompressingFileReader
from swift.proxy.server import BaseApplication
@@ -208,3 +209,13 @@ class InternalProxy(object):
if resp.status_int == 204:
return []
return json_loads(resp.body)
def get_container_metadata(self, account, container):
path = '/v1/%s/%s/' % (account, container)
req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'HEAD'})
resp = self._handle_request(req)
out = {}
for k, v in resp.headers.iteritems():
if k.lower().startswith('x-container-meta-'):
out[k] = v
return out
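
A short fragment showing how this new helper feeds get_container_save_log_flag() in access_log_delivery.py; it assumes an already-constructed InternalProxy instance named proxy, and that the proxy returns the header name lower-cased (the flag lookup uses the lower-cased metadata_key):

    from swift.common.utils import TRUE_VALUES
    metadata = proxy.get_container_metadata('AUTH_test', 'images')
    # e.g. {'x-container-meta-access-log-delivery': 'yes'}
    enabled = metadata.get('x-container-meta-access-log-delivery') in TRUE_VALUES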

slogging/log_common.py Normal file (240 lines added)
View File

@@ -0,0 +1,240 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import Queue
import datetime
import zlib
import time
from paste.deploy import appconfig
from contextlib import contextmanager
import os
import errno
import fcntl
from swift.common.memcached import MemcacheRing
from slogging.internal_proxy import InternalProxy
from swift.common.utils import get_logger
from swift.common.exceptions import ChunkReadTimeout, LockTimeout
class BadFileDownload(Exception):
def __init__(self, status_code=None):
self.status_code = status_code
class LogProcessorCommon(object):
def __init__(self, conf, logger, log_route='log-processor'):
if isinstance(logger, tuple):
self.logger = get_logger(*logger, log_route=log_route)
else:
self.logger = logger
self.memcache = MemcacheRing([s.strip() for s in
conf.get('memcache_servers', '').split(',')
if s.strip()])
self.conf = conf
self._internal_proxy = None
@property
def internal_proxy(self):
if self._internal_proxy is None:
stats_conf = self.conf.get('log-processor', {})
proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig(
'config:%s' % proxy_server_conf_loc,
name='proxy-server')
self._internal_proxy = InternalProxy(proxy_server_conf,
self.logger,
retries=3)
return self._internal_proxy
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
raise BadFileDownload(code)
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
# Python decompressing gzip chunk-by-chunk
# http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
try:
for chunk in o:
if compressed:
try:
chunk = d.decompress(chunk)
except zlib.error:
self.logger.debug(_('Bad compressed data for %s')
% '/'.join((swift_account, container_name,
object_name)))
raise BadFileDownload() # bad compressed data
parts = chunk.split('\n')
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
except ChunkReadTimeout:
raise BadFileDownload()
def get_container_listing(self, swift_account, container_name,
start_date=None, end_date=None,
listing_filter=None):
'''
Get a container listing, filtered by start_date, end_date, and
listing_filter. Dates, if given, must be in YYYYMMDDHH format
'''
search_key = None
if start_date is not None:
try:
parsed_date = time.strptime(start_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
hour = '%02d' % parsed_date.tm_hour
search_key = '/'.join([year, month, day, hour])
end_key = None
if end_date is not None:
try:
parsed_date = time.strptime(end_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
# Since the end_marker filters by <, we need to add something
# to make sure we get all the data under the last hour. Adding
# one to the hour should be all-inclusive.
hour = '%02d' % (parsed_date.tm_hour + 1)
end_key = '/'.join([year, month, day, hour])
container_listing = self.internal_proxy.get_container_list(
swift_account,
container_name,
marker=search_key,
end_marker=end_key)
results = []
if listing_filter is None:
listing_filter = set()
for item in container_listing:
name = item['name']
if name not in listing_filter:
results.append(name)
return results
def multiprocess_collate(processor_klass, processor_args, processor_method,
items_to_process, worker_count, logger=None):
    '''
    Generate (item, data) pairs by calling processor_method on an instance of
    processor_klass (constructed with processor_args) for each item in
    items_to_process, spread across worker_count worker processes. Items whose
    call raised an exception are logged (when a logger is given) and skipped.
    '''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
for _junk in range(worker_count):
p = multiprocessing.Process(target=collate_worker,
args=(processor_klass,
processor_args,
processor_method,
in_queue,
out_queue))
p.start()
results.append(p)
for x in items_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None) # tell the worker to end
while True:
try:
item, data = out_queue.get_nowait()
except Queue.Empty:
time.sleep(.01)
else:
if isinstance(data, Exception):
if logger:
logger.exception(data)
else:
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
def collate_worker(processor_klass, processor_args, processor_method, in_queue,
out_queue):
'''worker process for multiprocess_collate'''
p = processor_klass(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
method = getattr(p, processor_method)
except AttributeError:
return
try:
ret = method(*item)
except Exception, err:
ret = err
out_queue.put((item, ret))
@contextmanager
def lock_file(filename, timeout=10, append=False, unlink=True):
"""
Context manager that acquires a lock on a file. This will block until
the lock can be acquired, or the timeout time has expired (whichever occurs
first).
:param filename: file to be locked
:param timeout: timeout (in seconds)
:param append: True if file should be opened in append mode
:param unlink: True if the file should be unlinked at the end
"""
flags = os.O_CREAT | os.O_RDWR
if append:
flags |= os.O_APPEND
fd = os.open(filename, flags)
try:
with LockTimeout(timeout, filename):
while True:
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError, err:
if err.errno != errno.EAGAIN:
raise
                    time.sleep(0.01)
mode = 'r+'
if append:
mode = 'a+'
file_obj = os.fdopen(fd, mode)
yield file_obj
finally:
try:
file_obj.close()
except UnboundLocalError:
pass # may have not actually opened the file
if unlink:
os.unlink(filename)
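
The zlib.decompressobj(16 + zlib.MAX_WBITS) call in get_object_data() above is what allows a gzip stream to be inflated chunk by chunk without gzip.GzipFile. A self-contained illustration (Python 2, as in the codebase):

    import gzip, zlib, StringIO
    buf = StringIO.StringIO()
    g = gzip.GzipFile(fileobj=buf, mode='wb')
    g.write('line one\nline two\n')
    g.close()
    d = zlib.decompressobj(16 + zlib.MAX_WBITS)   # 16 + MAX_WBITS tells zlib to expect a gzip header
    print repr(d.decompress(buf.getvalue()))      # 'line one\nline two\n'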

slogging/log_processor.py
View File

@@ -26,30 +26,19 @@ import cPickle
import hashlib
from slogging.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf, TRUE_VALUES
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon
from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
BadFileDownload
now = datetime.datetime.now
class BadFileDownload(Exception):
def __init__(self, status_code=None):
self.status_code = status_code
class LogProcessor(object):
class LogProcessor(LogProcessorCommon):
"""Load plugins, process logs"""
def __init__(self, conf, logger):
if isinstance(logger, tuple):
self.logger = get_logger(*logger, log_route='log-processor')
else:
self.logger = logger
self.conf = conf
self._internal_proxy = None
super(LogProcessor, self).__init__(conf, logger, 'log-processor')
# load the processing plugins
self.plugins = {}
@@ -57,9 +46,6 @@ class LogProcessor(object):
for section in (x for x in conf if x.startswith(plugin_prefix)):
plugin_name = section[len(plugin_prefix):]
plugin_conf = conf.get(section, {})
if plugin_conf.get('processable', 'true').lower() not in \
TRUE_VALUES:
continue
self.plugins[plugin_name] = plugin_conf
class_path = self.plugins[plugin_name]['class_path']
import_target, class_name = class_path.rsplit('.', 1)
@@ -68,20 +54,6 @@ class LogProcessor(object):
self.plugins[plugin_name]['instance'] = klass(plugin_conf)
self.logger.debug(_('Loaded plugin "%s"') % plugin_name)
@property
def internal_proxy(self):
if self._internal_proxy is None:
stats_conf = self.conf.get('log-processor', {})
proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig(
'config:%s' % proxy_server_conf_loc,
name='proxy-server')
self._internal_proxy = InternalProxy(proxy_server_conf,
self.logger,
retries=3)
return self._internal_proxy
def process_one_file(self, plugin_name, account, container, object_name):
self.logger.info(_('Processing %(obj)s with plugin "%(plugin)s"') %
{'obj': '/'.join((account, container, object_name)),
@@ -114,87 +86,6 @@ class LogProcessor(object):
total_list.append(x)
return total_list
def get_container_listing(self, swift_account, container_name,
start_date=None, end_date=None,
listing_filter=None):
'''
Get a container listing, filtered by start_date, end_date, and
listing_filter. Dates, if given, must be in YYYYMMDDHH format
'''
search_key = None
if start_date is not None:
try:
parsed_date = time.strptime(start_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
hour = '%02d' % parsed_date.tm_hour
search_key = '/'.join([year, month, day, hour])
end_key = None
if end_date is not None:
try:
parsed_date = time.strptime(end_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
# Since the end_marker filters by <, we need to add something
# to make sure we get all the data under the last hour. Adding
# one to the hour should be all-inclusive.
hour = '%02d' % (parsed_date.tm_hour + 1)
end_key = '/'.join([year, month, day, hour])
container_listing = self.internal_proxy.get_container_list(
swift_account,
container_name,
marker=search_key,
end_marker=end_key)
results = []
if listing_filter is None:
listing_filter = set()
for item in container_listing:
name = item['name']
if name not in listing_filter:
results.append(name)
return results
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
raise BadFileDownload(code)
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
# Python decompressing gzip chunk-by-chunk
# http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
try:
for chunk in o:
if compressed:
try:
chunk = d.decompress(chunk)
except zlib.error:
self.logger.debug(_('Bad compressed data for %s')
% '/'.join((swift_account, container_name,
object_name)))
raise BadFileDownload() # bad compressed data
parts = chunk.split('\n')
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
except ChunkReadTimeout:
raise BadFileDownload()
def generate_keylist_mapping(self):
keylist = {}
for plugin in self.plugins:
@@ -471,7 +362,8 @@ class LogProcessorDaemon(Daemon):
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
results = multiprocess_collate(LogProcessor, processor_args,
'process_one_file', logs_to_process,
self.worker_count)
# reduce
@@ -529,53 +421,3 @@ class LogProcessorDaemon(Daemon):
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
'''
yield hourly data from logs_to_process
Every item that this function yields will be added to the processed files
list.
'''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
for _junk in range(worker_count):
p = multiprocessing.Process(target=collate_worker,
args=(processor_args,
in_queue,
out_queue))
p.start()
results.append(p)
for x in logs_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None) # tell the worker to end
while True:
try:
item, data = out_queue.get_nowait()
except Queue.Empty:
time.sleep(.01)
else:
if not isinstance(data, Exception):
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
def collate_worker(processor_args, in_queue, out_queue):
'''worker process for multiprocess_collate'''
p = LogProcessor(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
ret = p.process_one_file(*item)
except Exception, err:
item_string = '/'.join(item[1:])
p.logger.exception("Unable to process file '%s'" % (item_string))
ret = err
out_queue.put((item, ret))
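
The net effect of this refactor is that multiprocess_collate() and collate_worker() now take the processor class and method name explicitly, so LogProcessor and AccessLogDelivery can share them. A toy, self-contained sketch of the generalized call shape (Doubler is invented purely for illustration; real callers pass LogProcessor or AccessLogDelivery as shown above):

    from slogging.log_common import multiprocess_collate

    class Doubler(object):
        def __init__(self, factor):
            self.factor = factor
        def process_one_item(self, value):
            return value * self.factor

    if __name__ == '__main__':
        items = [(1,), (2,), (3,)]  # each item is the *args for one method call
        for item, data in multiprocess_collate(Doubler, (2,), 'process_one_item',
                                               items, worker_count=2):
            print item, data  # e.g. (2,) 4; ordering may vary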

View File

@@ -0,0 +1,218 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
from contextlib import contextmanager
from test.unit import temptree
from slogging import access_log_delivery
class DumbLogger(object):
def __getattr__(self, n):
return self.foo
def foo(self, *a, **kw):
pass
class FakeMemcache(object):
def __init__(self):
self.store = {}
def get(self, key):
return self.store.get(key)
def keys(self):
return self.store.keys()
def set(self, key, value, timeout=0):
self.store[key] = value
return True
def incr(self, key, timeout=0):
self.store[key] = self.store.setdefault(key, 0) + 1
return self.store[key]
@contextmanager
def soft_lock(self, key, timeout=0, retries=5):
yield True
def delete(self, key):
try:
del self.store[key]
except Exception:
pass
return True
class TestAccessLogDelivery(unittest.TestCase):
def test_log_line_parser_query_args(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o?foo'
log_line = 'x' * 16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o?foo',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
self.assertEquals(res, expected)
def test_log_line_parser_field_count(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
# too few fields
log_line = [str(x) for x in range(17)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x' * 16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {}
self.assertEquals(res, expected)
# right amount of fields
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x' * 16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
self.assertEquals(res, expected)
# too many fields
log_line = [str(x) for x in range(19)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x' * 16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
self.assertEquals(res, expected)
def test_make_clf_from_parts(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o?foo'
log_line = 'x' * 16 + ' '.join(log_line)
parts = p.log_line_parser(log_line)
clf = access_log_delivery.make_clf_from_parts(parts)
expect = '2 - - [1/01/3:4:5:6 +0000] "5 /v1/a/c/o?foo 7" 8 13 "9" "10"'
self.assertEquals(clf, expect)
def test_convert_log_line(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o?foo'
log_line = 'x' * 16 + ' '.join(log_line)
res = p.convert_log_line(log_line)
expected = (
'2 - - [1/01/3:4:5:6 +0000] "5 /v1/a/c/o?foo 7" 8 13 "9" "10"',
'a',
'c')
self.assertEquals(res, expected)
def test_get_container_save_log_flag(self):
p = access_log_delivery.AccessLogDelivery({}, DumbLogger())
def my_get_metadata_true(*a, **kw):
return {p.metadata_key: 'yes'}
def my_get_metadata_false(*a, **kw):
return {p.metadata_key: 'no'}
p.internal_proxy.get_container_metadata = my_get_metadata_false
p.memcache = FakeMemcache()
res = p.get_container_save_log_flag('a', 'c1')
expected = False
self.assertEquals(res, expected)
p.internal_proxy.get_container_metadata = my_get_metadata_true
p.memcache = FakeMemcache()
res = p.get_container_save_log_flag('a', 'c2')
expected = True
self.assertEquals(res, expected)
def test_process_one_file(self):
with temptree([]) as t:
conf = {'working_dir': t}
p = access_log_delivery.AccessLogDelivery(conf, DumbLogger())
def my_get_object_data(*a, **kw):
all_lines = []
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o?foo'
yield 'x' * 16 + ' '.join(log_line)
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
yield 'x' * 16 + ' '.join(log_line)
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a2/c2/o2'
yield 'x' * 16 + ' '.join(log_line)
def my_get_container_save_log_flag(*a, **kw):
return True
p.get_object_data = my_get_object_data
p.get_container_save_log_flag = my_get_container_save_log_flag
res = p.process_one_file('a', 'c', '2011/03/14/12/hash')
expected = ['%s/a2/c2/2011/03/14/12' % t,
'%s/a/c/2011/03/14/12' % t]
self.assertEquals(res, set(expected))
lines = [p.convert_log_line(x)[0] for x in my_get_object_data()]
with open(expected[0], 'rb') as f:
raw = f.read()
res = '\n'.join(lines[2:]) + '\n'
self.assertEquals(res, raw)
with open(expected[1], 'rb') as f:
raw = f.read()
res = '\n'.join(lines[:2]) + '\n'
self.assertEquals(res, raw)
if __name__ == '__main__':
unittest.main()

View File

@@ -23,6 +23,7 @@ import time
from slogging import internal_proxy
from slogging import log_processor
from slogging import log_common
from swift.common.exceptions import ChunkReadTimeout
@@ -189,7 +190,7 @@ use = egg:swift#proxy
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
self.assertRaises(log_common.BadFileDownload, p.process_one_file,
'access', 'a', 'c', 'o')
def test_get_container_listing(self):
@@ -230,13 +231,13 @@ use = egg:swift#proxy
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
self.assertRaises(log_common.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(bad_compressed=True)
result = p.get_object_data('a', 'c', 'o.gz', True)
self.assertRaises(log_processor.BadFileDownload, list, result)
self.assertRaises(log_common.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(timeout=True)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
self.assertRaises(log_common.BadFileDownload, list, result)
def test_get_stat_totals(self):
stats_proxy_config = self.proxy_config.copy()
@@ -331,7 +332,9 @@ use = egg:swift#proxy
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
@@ -367,11 +370,13 @@ use = egg:swift#proxy
work_request = ('access', 'a', 'c', 'o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
processor_klass = log_processor.LogProcessor
log_common.collate_worker(processor_klass, processor_args,
'process_one_file', q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
#self.assertIsInstance(ret, log_common.BadFileDownload)
self.assertTrue(isinstance(ret, Exception))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
@@ -394,7 +399,10 @@ use = egg:swift#proxy
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
processor_klass = log_processor.LogProcessor
results = log_processor.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1)
results = list(results)
@@ -414,7 +422,7 @@ use = egg:swift#proxy
def test_multiprocess_collate_errors(self):
def get_object_data(*a, **kw):
raise log_processor.BadFileDownload()
raise log_common.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
@@ -428,7 +436,10 @@ use = egg:swift#proxy
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a', 'c', 'o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
processor_klass = log_processor.LogProcessor
results = log_common.multiprocess_collate(processor_klass,
processor_args,
'process_one_file',
logs_to_process,
1)
results = list(results)
@@ -495,7 +506,7 @@ class TestLogProcessorDaemon(unittest.TestCase):
def test_get_processed_files_list_bad_file_downloads(self):
class MockLogProcessor():
def __init__(self, status_code):
self.err = log_processor.BadFileDownload(status_code)
self.err = log_common.BadFileDownload(status_code)
def get_object_data(self, *a, **k):
raise self.err
@@ -578,16 +589,16 @@ class TestLogProcessorDaemon(unittest.TestCase):
expected_data_out = {
'acct1_time1': {'out_field1': 16, 'out_field2': 5,
'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0, },
'out_field6': 0, 'out_field7': 0},
'acct1_time2': {'out_field1': 9, 'out_field2': 5,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0, },
'out_field6': 0, 'out_field7': 0},
'acct2_time1': {'out_field1': 13, 'out_field2': 7,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0, },
'out_field6': 0, 'out_field7': 0},
'acct3_time3': {'out_field1': 17, 'out_field2': 9,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0, },
'out_field6': 0, 'out_field7': 0},
}
self.assertEquals(expected_data_out,
@@ -776,7 +787,8 @@ class TestLogProcessorDaemon(unittest.TestCase):
d = MockLogProcessorDaemon(self)
def mock_multiprocess_collate(processor_args, logs_to_process,
def mock_multiprocess_collate(processor_klass, processor_args,
processor_method, logs_to_process,
worker_count):
self.assertEquals(d.total_conf, processor_args[0])
self.assertEquals(d.logger, processor_args[1])