diff --git a/bin/swift-access-log-delivery b/bin/swift-access-log-delivery
new file mode 100755
index 0000000..7e764b3
--- /dev/null
+++ b/bin/swift-access-log-delivery
@@ -0,0 +1,25 @@
+#!/usr/bin/python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from slogging.access_log_delivery import AccessLogDeliveryDaemon
+from swift.common.utils import parse_options
+from swift.common.daemon import run_daemon
+
+if __name__ == '__main__':
+    conf_file, options = parse_options(once=True)
+    run_daemon(AccessLogDeliveryDaemon, conf_file,
+               section_name='access-log-delivery',
+               log_name='access-log-delivery', **options)
diff --git a/etc/access-log-delivery.conf-sample b/etc/access-log-delivery.conf-sample
new file mode 100644
index 0000000..95b673b
--- /dev/null
+++ b/etc/access-log-delivery.conf-sample
@@ -0,0 +1,19 @@
+[access-log-delivery]
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+# container_name = access_log_delivery_data
+# proxy_server_conf = /etc/swift/proxy-server.conf
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# lookback_hours = 120
+# lookback_window = 120
+# user = swift
+# processed_files_object_name = processed_files.pickle.gz
+# frequency = 3600
+# target_container = .ACCESS_LOGS
+log_source_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+# log_source_container_name = log_data
+# metadata_key = x-container-meta-access-log-delivery
+# memcache_servers =
+# worker_count = 1
+# server_name = proxy-server
+# working_dir = /tmp/swift
diff --git a/setup.py b/setup.py
index e2120d2..213483c 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,7 @@ setup(
     install_requires=[],  # removed for better compat
     scripts=['bin/swift-account-stats-logger',
              'bin/swift-container-stats-logger',
-             'bin/swift-log-stats-collector', 'bin/swift-log-uploader'
+             'bin/swift-log-stats-collector', 'bin/swift-log-uploader',
+             'bin/swift-access-log-delivery'
              ],
     )
diff --git a/slogging/access_log_delivery.py b/slogging/access_log_delivery.py
new file mode 100644
index 0000000..5819d39
--- /dev/null
+++ b/slogging/access_log_delivery.py
@@ -0,0 +1,386 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import collections
+import datetime
+from uuid import uuid4
+import Queue
+from urllib import unquote
+import os
+import cPickle
+import cStringIO
+import functools
+import random
+import errno
+
+from swift.common.daemon import Daemon
+from swift.common.utils import get_logger, TRUE_VALUES, split_path
+from swift.common.exceptions import LockTimeout, ChunkReadTimeout
+from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
+    BadFileDownload, lock_file
+
+
+month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
+MEMOIZE_KEY_LIMIT = 10000
+MEMOIZE_FLUSH_RATE = 0.25
+
+
+def make_clf_from_parts(parts):
+    format = '%(client_ip)s - - [%(day)s/%(month)s/%(year)s:%(hour)s:' \
+             '%(minute)s:%(second)s %(tz)s] "%(method)s %(request)s ' \
+             '%(http_version)s" %(code)s %(bytes_out)s "%(referrer)s" ' \
+             '"%(user_agent)s"'
+    try:
+        return format % parts
+    except KeyError:
+        return None
+
+
+def memoize(func):
+    cache = {}
+
+    @functools.wraps(func)
+    def wrapped(*args):
+        key = tuple(args)
+        if key in cache:
+            return cache[key]
+        result = func(*args)
+        cache_keys = cache.keys()
+        len_cache = len(cache_keys)
+        if len_cache > MEMOIZE_KEY_LIMIT:
+            # evict a random MEMOIZE_FLUSH_RATE fraction of the cache;
+            # int() because xrange needs an integer count
+            for _unused in xrange(int(len_cache * MEMOIZE_FLUSH_RATE)):
+                index_to_delete = random.randrange(0, len(cache_keys))
+                key_to_delete = cache_keys.pop(index_to_delete)
+                del cache[key_to_delete]
+        cache[key] = result
+        return result
+    return wrapped
+
+
+class FileBuffer(object):
+
+    def __init__(self, limit, logger):
+        self.buffers = collections.defaultdict(list)
+        self.limit = limit
+        self.logger = logger
+        self.total_size = 0
+
+    def write(self, filename, data):
+        self.buffers[filename].append(data)
+        self.total_size += len(data)
+        if self.total_size >= self.limit:
+            self.flush()
+
+    def flush(self):
+        while self.buffers:
+            filename_list = self.buffers.keys()
+            for filename in filename_list:
+                out = '\n'.join(self.buffers[filename]) + '\n'
+                mid_dirs = os.path.dirname(filename)
+                try:
+                    os.makedirs(mid_dirs)
+                except OSError, err:
+                    if err.errno == errno.EEXIST:
+                        pass
+                    else:
+                        raise
+                try:
+                    with lock_file(filename, append=True, unlink=False) as f:
+                        f.write(out)
+                except LockTimeout:
+                    # couldn't write, we'll try again later
+                    self.logger.debug(_('Timeout writing to %s' % filename))
+                else:
+                    del self.buffers[filename]
+        self.total_size = 0
+
+
+class AccessLogDelivery(LogProcessorCommon):
+
+    def __init__(self, conf, logger):
+        super(AccessLogDelivery, self).__init__(conf, logger,
+                                                'access-log-delivery')
+        self.frequency = int(conf.get('frequency', '3600'))
+        self.metadata_key = conf.get('metadata_key',
+            'x-container-meta-access-log-delivery').lower()
+        self.server_name = conf.get('server_name', 'proxy-server')
+        self.working_dir = conf.get('working_dir', '/tmp/swift').rstrip('/')
+        # buffer_limit is bytes; cast the conf string to int so the size
+        # comparison in FileBuffer.write() works
+        buffer_limit = int(conf.get('buffer_limit', '10485760'))
+        self.file_buffer = FileBuffer(buffer_limit, logger)
+
+    def process_one_file(self, account, container, object_name):
+        files_to_upload = set()
+        try:
+            year, month, day, hour, _unused = object_name.split('/', 4)
+        except ValueError:
+            self.logger.info(_('Odd object name: %s. 
Skipping' % object_name)) + return + filename_pattern = '%s/%%s/%%s/%s/%s/%s/%s' % (self.working_dir, year, + month, day, hour) + self.logger.debug(_('Processing %s' % object_name)) + # get an iter of the object data + compressed = object_name.endswith('.gz') + stream = self.get_object_data(account, container, object_name, + compressed=compressed) + buff = collections.defaultdict(list) + for line in stream: + clf, account, container = self.convert_log_line(line) + if not clf or not account or not container: + # bad log line + continue + if self.get_container_save_log_flag(account, container): + filename = filename_pattern % (account, container) + self.file_buffer.write(filename, clf) + files_to_upload.add(filename) + self.file_buffer.flush() + return files_to_upload + + @memoize + def get_container_save_log_flag(self, account, container): + key = 'save-access-logs-%s-%s' % (account, container) + flag = self.memcache.get(key) + if flag is None: + metadata = self.internal_proxy.get_container_metadata(account, + container) + val = metadata.get(self.metadata_key) + flag = val in TRUE_VALUES + self.memcache.set(key, flag, timeout=self.frequency) + return flag + + def convert_log_line(self, raw_log): + parts = self.log_line_parser(raw_log) + return (make_clf_from_parts(parts), + parts.get('account'), + parts.get('container_name')) + + def log_line_parser(self, raw_log): + '''given a raw access log line, return a dict of the good parts''' + d = {} + try: + (unused, + server, + client_ip, + lb_ip, + timestamp, + method, + request, + http_version, + code, + referrer, + user_agent, + auth_token, + bytes_in, + bytes_out, + etag, + trans_id, + headers, + processing_time) = (unquote(x) for x in + raw_log[16:].split(' ')[:18]) + except ValueError: + self.logger.debug(_('Bad line data: %s') % repr(raw_log)) + return {} + if server != self.server_name: + # incorrect server name in log line + self.logger.debug(_('Bad server name: found "%(found)s" ' \ + 'expected "%(expected)s"') % + {'found': server, 'expected': self.server_name}) + return {} + try: + (version, account, container_name, object_name) = \ + split_path(request, 2, 4, True) + except ValueError, e: + self.logger.debug(_('Invalid path: %(error)s from data: %(log)s') % + {'error': e, 'log': repr(raw_log)}) + return {} + if container_name is not None: + container_name = container_name.split('?', 1)[0] + if object_name is not None: + object_name = object_name.split('?', 1)[0] + account = account.split('?', 1)[0] + d['client_ip'] = client_ip + d['lb_ip'] = lb_ip + d['method'] = method + d['request'] = request + d['http_version'] = http_version + d['code'] = code + d['referrer'] = referrer + d['user_agent'] = user_agent + d['auth_token'] = auth_token + d['bytes_in'] = bytes_in + d['bytes_out'] = bytes_out + d['etag'] = etag + d['trans_id'] = trans_id + d['processing_time'] = processing_time + day, month, year, hour, minute, second = timestamp.split('/') + d['day'] = day + month = ('%02s' % month_map.index(month)).replace(' ', '0') + d['month'] = month + d['year'] = year + d['hour'] = hour + d['minute'] = minute + d['second'] = second + d['tz'] = '+0000' + d['account'] = account + d['container_name'] = container_name + d['object_name'] = object_name + d['bytes_out'] = int(d['bytes_out'].replace('-', '0')) + d['bytes_in'] = int(d['bytes_in'].replace('-', '0')) + d['code'] = int(d['code']) + return d + + +class AccessLogDeliveryDaemon(Daemon): + """ + Processes access (proxy) logs to split them up by account and deliver the + split logs to their 
respective accounts. + """ + + def __init__(self, c): + self.conf = c + super(AccessLogDeliveryDaemon, self).__init__(c) + self.logger = get_logger(c, log_route='access-log-delivery') + self.log_processor = AccessLogDelivery(c, self.logger) + self.lookback_hours = int(c.get('lookback_hours', '120')) + self.lookback_window = int(c.get('lookback_window', + str(self.lookback_hours))) + self.log_delivery_account = c['swift_account'] + self.log_delivery_container = c.get('container_name', + 'access_log_delivery_data') + self.source_account = c['log_source_account'] + self.source_container = c.get('log_source_container_name', 'log_data') + self.target_container = c.get('target_container', '.ACCESS_LOGS') + self.frequency = int(c.get('frequency', '3600')) + self.processed_files_object_name = c.get('processed_files_object_name', + 'processed_files.pickle.gz') + self.worker_count = int(c.get('worker_count', '1')) + self.working_dir = c.get('working_dir', '/tmp/swift') + if self.working_dir.endswith('/'): + self.working_dir = self.working_dir.rstrip('/') + + def run_once(self, *a, **kw): + self.logger.info(_("Beginning log processing")) + start = time.time() + if self.lookback_hours == 0: + lookback_start = None + lookback_end = None + else: + delta_hours = datetime.timedelta(hours=self.lookback_hours) + lookback_start = datetime.datetime.now() - delta_hours + lookback_start = lookback_start.strftime('%Y%m%d%H') + if self.lookback_window == 0: + lookback_end = None + else: + delta_window = datetime.timedelta(hours=self.lookback_window) + lookback_end = datetime.datetime.now() - \ + delta_hours + \ + delta_window + lookback_end = lookback_end.strftime('%Y%m%d%H') + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + try: + # Note: this file (or data set) will grow without bound. + # In practice, if it becomes a problem (say, after many months of + # running), one could manually prune the file to remove older + # entries. Automatically pruning on each run could be dangerous. 
+ # There is not a good way to determine when an old entry should be + # pruned (lookback_hours could be set to anything and could change) + processed_files_stream = self.log_processor.get_object_data( + self.log_delivery_account, + self.log_delivery_container, + self.processed_files_object_name, + compressed=True) + buf = '\n'.join(x for x in processed_files_stream) + if buf: + already_processed_files = cPickle.loads(buf) + else: + already_processed_files = set() + except BadFileDownload, err: + if err.status_code == 404: + already_processed_files = set() + else: + self.logger.error(_('Access log delivery unable to load list ' + 'of already processed log files')) + return + self.logger.debug(_('found %d processed files') % \ + len(already_processed_files)) + logs_to_process = self.log_processor.get_container_listing( + self.source_account, + self.source_container, + lookback_start, + lookback_end, + already_processed_files) + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + if not logs_to_process: + self.logger.info(_("Log processing done (%0.2f minutes)") % + ((time.time() - start) / 60)) + return + + logs_to_process = [(self.source_account, self.source_container, x) + for x in logs_to_process] + + # map + processor_args = (self.conf, self.logger) + results = multiprocess_collate(AccessLogDelivery, processor_args, + 'process_one_file', logs_to_process, + self.worker_count) + + #reduce + processed_files = already_processed_files + files_to_upload = set() + for item, data in results: + a, c, o = item + processed_files.add(o) + if data: + files_to_upload.update(data) + len_working_dir = len(self.working_dir) + 1 # +1 for the trailing '/' + for filename in files_to_upload: + target_name = filename[len_working_dir:] + account, target_name = target_name.split('/', 1) + some_id = uuid4().hex + target_name = '%s/%s.log.gz' % (target_name, some_id) + success = self.log_processor.internal_proxy.upload_file(filename, + account, + self.target_container, + target_name) + if success: + os.unlink(filename) + self.logger.debug('Uploaded %s to account %s' % (filename, + account)) + else: + self.logger.error('Could not upload %s to account %s' % ( + filename, account)) + + # cleanup + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + success = self.log_processor.internal_proxy.upload_file(f, + self.log_delivery_account, + self.log_delivery_container, + self.processed_files_object_name) + if not success: + self.logger.error('Error uploading updated processed files log') + self.logger.info(_("Log processing done (%0.2f minutes)") % + ((time.time() - start) / 60)) + + def run_forever(self, *a, **kw): + while True: + start_time = time.time() + self.run_once() + end_time = time.time() + # don't run more than once every self.frequency seconds + sleep_time = self.frequency - (end_time - start_time) + time.sleep(max(0, sleep_time)) diff --git a/slogging/internal_proxy.py b/slogging/internal_proxy.py index 10d8977..354cce6 100644 --- a/slogging/internal_proxy.py +++ b/slogging/internal_proxy.py @@ -16,6 +16,7 @@ import webob from urllib import quote, unquote from json import loads as json_loads +import copy from slogging.compressing_file_reader import CompressingFileReader from swift.proxy.server import BaseApplication @@ -208,3 +209,13 @@ class InternalProxy(object): if resp.status_int == 204: return [] return json_loads(resp.body) + + def get_container_metadata(self, account, container): + path = '/v1/%s/%s/' % (account, container) + req = 
webob.Request.blank(path, environ={'REQUEST_METHOD': 'HEAD'}) + resp = self._handle_request(req) + out = {} + for k, v in resp.headers.iteritems(): + if k.lower().startswith('x-container-meta-'): + out[k] = v + return out diff --git a/slogging/log_common.py b/slogging/log_common.py new file mode 100644 index 0000000..0f7f43d --- /dev/null +++ b/slogging/log_common.py @@ -0,0 +1,240 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import Queue +import datetime +import zlib +import time +from paste.deploy import appconfig +from contextlib import contextmanager +import os +import errno +import fcntl + +from swift.common.memcached import MemcacheRing +from slogging.internal_proxy import InternalProxy +from swift.common.utils import get_logger +from swift.common.exceptions import ChunkReadTimeout, LockTimeout + + +class BadFileDownload(Exception): + def __init__(self, status_code=None): + self.status_code = status_code + + +class LogProcessorCommon(object): + + def __init__(self, conf, logger, log_route='log-processor'): + if isinstance(logger, tuple): + self.logger = get_logger(*logger, log_route=log_route) + else: + self.logger = logger + self.memcache = MemcacheRing([s.strip() for s in + conf.get('memcache_servers', '').split(',') + if s.strip()]) + self.conf = conf + self._internal_proxy = None + + @property + def internal_proxy(self): + if self._internal_proxy is None: + stats_conf = self.conf.get('log-processor', {}) + proxy_server_conf_loc = stats_conf.get('proxy_server_conf', + '/etc/swift/proxy-server.conf') + proxy_server_conf = appconfig( + 'config:%s' % proxy_server_conf_loc, + name='proxy-server') + self._internal_proxy = InternalProxy(proxy_server_conf, + self.logger, + retries=3) + return self._internal_proxy + + def get_object_data(self, swift_account, container_name, object_name, + compressed=False): + '''reads an object and yields its lines''' + code, o = self.internal_proxy.get_object(swift_account, container_name, + object_name) + if code < 200 or code >= 300: + raise BadFileDownload(code) + last_part = '' + last_compressed_part = '' + # magic in the following zlib.decompressobj argument is courtesy of + # Python decompressing gzip chunk-by-chunk + # http://stackoverflow.com/questions/2423866 + d = zlib.decompressobj(16 + zlib.MAX_WBITS) + try: + for chunk in o: + if compressed: + try: + chunk = d.decompress(chunk) + except zlib.error: + self.logger.debug(_('Bad compressed data for %s') + % '/'.join((swift_account, container_name, + object_name))) + raise BadFileDownload() # bad compressed data + parts = chunk.split('\n') + parts[0] = last_part + parts[0] + for part in parts[:-1]: + yield part + last_part = parts[-1] + if last_part: + yield last_part + except ChunkReadTimeout: + raise BadFileDownload() + + def get_container_listing(self, swift_account, container_name, + start_date=None, end_date=None, + listing_filter=None): + ''' + Get a container listing, filtered by start_date, end_date, and + 
listing_filter. Dates, if given, must be in YYYYMMDDHH format
+        '''
+        search_key = None
+        if start_date is not None:
+            try:
+                parsed_date = time.strptime(start_date, '%Y%m%d%H')
+            except ValueError:
+                pass
+            else:
+                year = '%04d' % parsed_date.tm_year
+                month = '%02d' % parsed_date.tm_mon
+                day = '%02d' % parsed_date.tm_mday
+                hour = '%02d' % parsed_date.tm_hour
+                search_key = '/'.join([year, month, day, hour])
+        end_key = None
+        if end_date is not None:
+            try:
+                parsed_date = time.strptime(end_date, '%Y%m%d%H')
+            except ValueError:
+                pass
+            else:
+                year = '%04d' % parsed_date.tm_year
+                month = '%02d' % parsed_date.tm_mon
+                day = '%02d' % parsed_date.tm_mday
+                # Since the end_marker filters by <, we need to add something
+                # to make sure we get all the data under the last hour. Adding
+                # one to the hour should be all-inclusive.
+                hour = '%02d' % (parsed_date.tm_hour + 1)
+                end_key = '/'.join([year, month, day, hour])
+        container_listing = self.internal_proxy.get_container_list(
+            swift_account,
+            container_name,
+            marker=search_key,
+            end_marker=end_key)
+        results = []
+        if listing_filter is None:
+            listing_filter = set()
+        for item in container_listing:
+            name = item['name']
+            if name not in listing_filter:
+                results.append(name)
+        return results
+
+
+def multiprocess_collate(processor_klass, processor_args, processor_method,
+                         items_to_process, worker_count, logger=None):
+    '''
+    Spawn worker_count processes that each run processor_method on a
+    processor_klass instance, and yield (item, result) pairs as the items
+    in items_to_process are finished. Results that are Exceptions are
+    logged (if a logger is given) rather than yielded.
+    '''
+    results = []
+    in_queue = multiprocessing.Queue()
+    out_queue = multiprocessing.Queue()
+    for _junk in range(worker_count):
+        p = multiprocessing.Process(target=collate_worker,
+                                    args=(processor_klass,
+                                          processor_args,
+                                          processor_method,
+                                          in_queue,
+                                          out_queue))
+        p.start()
+        results.append(p)
+    for x in items_to_process:
+        in_queue.put(x)
+    for _junk in range(worker_count):
+        in_queue.put(None)  # tell the worker to end
+    while True:
+        try:
+            item, data = out_queue.get_nowait()
+        except Queue.Empty:
+            time.sleep(.01)
+        else:
+            if isinstance(data, Exception):
+                if logger:
+                    logger.exception(data)
+            else:
+                yield item, data
+        if not any(r.is_alive() for r in results) and out_queue.empty():
+            # all the workers are done and nothing is in the queue
+            break
+
+
+def collate_worker(processor_klass, processor_args, processor_method, in_queue,
+                   out_queue):
+    '''worker process for multiprocess_collate'''
+    p = processor_klass(*processor_args)
+    while True:
+        item = in_queue.get()
+        if item is None:
+            # no more work to process
+            break
+        try:
+            method = getattr(p, processor_method)
+        except AttributeError:
+            return
+        try:
+            ret = method(*item)
+        except Exception, err:
+            ret = err
+        out_queue.put((item, ret))
+
+
+@contextmanager
+def lock_file(filename, timeout=10, append=False, unlink=True):
+    """
+    Context manager that acquires a lock on a file. This will block until
+    the lock can be acquired, or the timeout time has expired (whichever occurs
+    first).
+
+    :param filename: file to be locked
+    :param timeout: timeout (in seconds)
+    :param append: True if file should be opened in append mode
+    :param unlink: True if the file should be unlinked at the end
+    """
+    flags = os.O_CREAT | os.O_RDWR
+    if append:
+        flags |= os.O_APPEND
+    fd = os.open(filename, flags)
+    try:
+        with LockTimeout(timeout, filename):
+            while True:
+                try:
+                    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    break
+                except IOError, err:
+                    if err.errno != errno.EAGAIN:
+                        raise
+                # lock is held elsewhere; pause briefly before retrying
+                time.sleep(0.01)
+        mode = 'r+'
+        if append:
+            mode = 'a+'
+        file_obj = os.fdopen(fd, mode)
+        yield file_obj
+    finally:
+        try:
+            file_obj.close()
+        except UnboundLocalError:
+            pass  # may have not actually opened the file
+        if unlink:
+            os.unlink(filename)
diff --git a/slogging/log_processor.py b/slogging/log_processor.py
index dd13d56..eca0281 100644
--- a/slogging/log_processor.py
+++ b/slogging/log_processor.py
@@ -26,30 +26,19 @@ import cPickle
 import hashlib
 
 from slogging.internal_proxy import InternalProxy
-from swift.common.exceptions import ChunkReadTimeout
-from swift.common.utils import get_logger, readconf, TRUE_VALUES
+from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon
+from slogging.log_common import LogProcessorCommon, multiprocess_collate, \
+    BadFileDownload
 
 now = datetime.datetime.now
 
 
-class BadFileDownload(Exception):
-
-    def __init__(self, status_code=None):
-        self.status_code = status_code
-
-
-class LogProcessor(object):
+class LogProcessor(LogProcessorCommon):
     """Load plugins, process logs"""
 
     def __init__(self, conf, logger):
-        if isinstance(logger, tuple):
-            self.logger = get_logger(*logger, log_route='log-processor')
-        else:
-            self.logger = logger
-
-        self.conf = conf
-        self._internal_proxy = None
+        super(LogProcessor, self).__init__(conf, logger, 'log-processor')
 
         # load the processing plugins
         self.plugins = {}
@@ -57,9 +46,6 @@ class LogProcessor(object):
         for section in (x for x in conf if x.startswith(plugin_prefix)):
             plugin_name = section[len(plugin_prefix):]
             plugin_conf = conf.get(section, {})
-            if plugin_conf.get('processable', 'true').lower() not in \
-                    TRUE_VALUES:
-                continue
             self.plugins[plugin_name] = plugin_conf
             class_path = self.plugins[plugin_name]['class_path']
             import_target, class_name = class_path.rsplit('.', 1)
@@ -68,20 +54,6 @@ class LogProcessor(object):
             self.plugins[plugin_name]['instance'] = klass(plugin_conf)
             self.logger.debug(_('Loaded plugin "%s"') % plugin_name)
 
-    @property
-    def internal_proxy(self):
-        if self._internal_proxy is None:
-            stats_conf = self.conf.get('log-processor', {})
-            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
-                '/etc/swift/proxy-server.conf')
-            proxy_server_conf = appconfig(
-                'config:%s' % proxy_server_conf_loc,
-                name='proxy-server')
-            self._internal_proxy = InternalProxy(proxy_server_conf,
-                                                 self.logger,
-                                                 retries=3)
-        return self._internal_proxy
-
     def process_one_file(self, plugin_name, account, container, object_name):
         self.logger.info(_('Processing %(obj)s with plugin "%(plugin)s"') %
                          {'obj': '/'.join((account, container, object_name)),
@@ -114,87 +86,6 @@ class LogProcessor(object):
                 total_list.append(x)
         return total_list
 
-    def get_container_listing(self, swift_account, container_name,
-                              start_date=None, end_date=None,
-                              listing_filter=None):
-        '''
-        Get a container listing, filtered by start_date, end_date, and
-        listing_filter. 
Dates, if given, must be in YYYYMMDDHH format - ''' - search_key = None - if start_date is not None: - try: - parsed_date = time.strptime(start_date, '%Y%m%d%H') - except ValueError: - pass - else: - year = '%04d' % parsed_date.tm_year - month = '%02d' % parsed_date.tm_mon - day = '%02d' % parsed_date.tm_mday - hour = '%02d' % parsed_date.tm_hour - search_key = '/'.join([year, month, day, hour]) - end_key = None - if end_date is not None: - try: - parsed_date = time.strptime(end_date, '%Y%m%d%H') - except ValueError: - pass - else: - year = '%04d' % parsed_date.tm_year - month = '%02d' % parsed_date.tm_mon - day = '%02d' % parsed_date.tm_mday - # Since the end_marker filters by <, we need to add something - # to make sure we get all the data under the last hour. Adding - # one to the hour should be all-inclusive. - hour = '%02d' % (parsed_date.tm_hour + 1) - end_key = '/'.join([year, month, day, hour]) - container_listing = self.internal_proxy.get_container_list( - swift_account, - container_name, - marker=search_key, - end_marker=end_key) - results = [] - if listing_filter is None: - listing_filter = set() - for item in container_listing: - name = item['name'] - if name not in listing_filter: - results.append(name) - return results - - def get_object_data(self, swift_account, container_name, object_name, - compressed=False): - '''reads an object and yields its lines''' - code, o = self.internal_proxy.get_object(swift_account, container_name, - object_name) - if code < 200 or code >= 300: - raise BadFileDownload(code) - last_part = '' - last_compressed_part = '' - # magic in the following zlib.decompressobj argument is courtesy of - # Python decompressing gzip chunk-by-chunk - # http://stackoverflow.com/questions/2423866 - d = zlib.decompressobj(16 + zlib.MAX_WBITS) - try: - for chunk in o: - if compressed: - try: - chunk = d.decompress(chunk) - except zlib.error: - self.logger.debug(_('Bad compressed data for %s') - % '/'.join((swift_account, container_name, - object_name))) - raise BadFileDownload() # bad compressed data - parts = chunk.split('\n') - parts[0] = last_part + parts[0] - for part in parts[:-1]: - yield part - last_part = parts[-1] - if last_part: - yield last_part - except ChunkReadTimeout: - raise BadFileDownload() - def generate_keylist_mapping(self): keylist = {} for plugin in self.plugins: @@ -471,8 +362,9 @@ class LogProcessorDaemon(Daemon): # map processor_args = (self.total_conf, self.logger) - results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count) + results = multiprocess_collate(LogProcessor, processor_args, + 'process_one_file', logs_to_process, + self.worker_count) # reduce aggr_data = self.get_aggregate_data(processed_files, results) @@ -529,53 +421,3 @@ class LogProcessorDaemon(Daemon): self.logger.info(_("Log processing done (%0.2f minutes)") % ((time.time() - start) / 60)) - - -def multiprocess_collate(processor_args, logs_to_process, worker_count): - ''' - yield hourly data from logs_to_process - Every item that this function yields will be added to the processed files - list. 
- ''' - results = [] - in_queue = multiprocessing.Queue() - out_queue = multiprocessing.Queue() - for _junk in range(worker_count): - p = multiprocessing.Process(target=collate_worker, - args=(processor_args, - in_queue, - out_queue)) - p.start() - results.append(p) - for x in logs_to_process: - in_queue.put(x) - for _junk in range(worker_count): - in_queue.put(None) # tell the worker to end - while True: - try: - item, data = out_queue.get_nowait() - except Queue.Empty: - time.sleep(.01) - else: - if not isinstance(data, Exception): - yield item, data - if not any(r.is_alive() for r in results) and out_queue.empty(): - # all the workers are done and nothing is in the queue - break - - -def collate_worker(processor_args, in_queue, out_queue): - '''worker process for multiprocess_collate''' - p = LogProcessor(*processor_args) - while True: - item = in_queue.get() - if item is None: - # no more work to process - break - try: - ret = p.process_one_file(*item) - except Exception, err: - item_string = '/'.join(item[1:]) - p.logger.exception("Unable to process file '%s'" % (item_string)) - ret = err - out_queue.put((item, ret)) diff --git a/test_slogging/unit/test_access_log_delivery.py b/test_slogging/unit/test_access_log_delivery.py new file mode 100644 index 0000000..f10be92 --- /dev/null +++ b/test_slogging/unit/test_access_log_delivery.py @@ -0,0 +1,218 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# TODO: Tests + +import unittest +from contextlib import contextmanager + +from test.unit import temptree +from slogging import access_log_delivery + + +class DumbLogger(object): + def __getattr__(self, n): + return self.foo + + def foo(self, *a, **kw): + pass + + +class FakeMemcache(object): + + def __init__(self): + self.store = {} + + def get(self, key): + return self.store.get(key) + + def keys(self): + return self.store.keys() + + def set(self, key, value, timeout=0): + self.store[key] = value + return True + + def incr(self, key, timeout=0): + self.store[key] = self.store.setdefault(key, 0) + 1 + return self.store[key] + + @contextmanager + def soft_lock(self, key, timeout=0, retries=5): + yield True + + def delete(self, key): + try: + del self.store[key] + except Exception: + pass + return True + + +class TestAccessLogDelivery(unittest.TestCase): + + def test_log_line_parser_query_args(self): + p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o?foo' + log_line = 'x' * 16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o?foo', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + self.assertEquals(res, expected) + + def test_log_line_parser_field_count(self): + p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + # too few fields + log_line = [str(x) for x in range(17)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x' * 16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {} + self.assertEquals(res, expected) + # right amount of fields + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x' * 16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + self.assertEquals(res, expected) + # too many fields + log_line = [str(x) for x in range(19)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x' * 16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + self.assertEquals(res, expected) + + def test_make_clf_from_parts(self): + p = access_log_delivery.AccessLogDelivery({}, 
DumbLogger()) + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o?foo' + log_line = 'x' * 16 + ' '.join(log_line) + parts = p.log_line_parser(log_line) + clf = access_log_delivery.make_clf_from_parts(parts) + expect = '2 - - [1/01/3:4:5:6 +0000] "5 /v1/a/c/o?foo 7" 8 13 "9" "10"' + self.assertEquals(clf, expect) + + def test_convert_log_line(self): + p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o?foo' + log_line = 'x' * 16 + ' '.join(log_line) + res = p.convert_log_line(log_line) + expected = ( + '2 - - [1/01/3:4:5:6 +0000] "5 /v1/a/c/o?foo 7" 8 13 "9" "10"', + 'a', + 'c') + self.assertEquals(res, expected) + + def test_get_container_save_log_flag(self): + p = access_log_delivery.AccessLogDelivery({}, DumbLogger()) + + def my_get_metadata_true(*a, **kw): + return {p.metadata_key: 'yes'} + + def my_get_metadata_false(*a, **kw): + return {p.metadata_key: 'no'} + p.internal_proxy.get_container_metadata = my_get_metadata_false + p.memcache = FakeMemcache() + res = p.get_container_save_log_flag('a', 'c1') + expected = False + self.assertEquals(res, expected) + p.internal_proxy.get_container_metadata = my_get_metadata_true + p.memcache = FakeMemcache() + res = p.get_container_save_log_flag('a', 'c2') + expected = True + self.assertEquals(res, expected) + + def test_process_one_file(self): + with temptree([]) as t: + conf = {'working_dir': t} + p = access_log_delivery.AccessLogDelivery(conf, DumbLogger()) + + def my_get_object_data(*a, **kw): + all_lines = [] + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o?foo' + yield 'x' * 16 + ' '.join(log_line) + + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + yield 'x' * 16 + ' '.join(log_line) + + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a2/c2/o2' + yield 'x' * 16 + ' '.join(log_line) + + def my_get_container_save_log_flag(*a, **kw): + return True + p.get_object_data = my_get_object_data + p.get_container_save_log_flag = my_get_container_save_log_flag + res = p.process_one_file('a', 'c', '2011/03/14/12/hash') + expected = ['%s/a2/c2/2011/03/14/12' % t, + '%s/a/c/2011/03/14/12' % t] + self.assertEquals(res, set(expected)) + lines = [p.convert_log_line(x)[0] for x in my_get_object_data()] + with open(expected[0], 'rb') as f: + raw = f.read() + res = '\n'.join(lines[2:]) + '\n' + self.assertEquals(res, raw) + with open(expected[1], 'rb') as f: + raw = f.read() + res = '\n'.join(lines[:2]) + '\n' + self.assertEquals(res, raw) + + +if __name__ == '__main__': + unittest.main() diff --git a/test_slogging/unit/test_access_processor.py b/test_slogging/unit/test_access_processor.py index 4a0fd30..9b5418d 100644 --- a/test_slogging/unit/test_access_processor.py +++ b/test_slogging/unit/test_access_processor.py @@ -30,7 +30,7 @@ class TestAccessProcessor(unittest.TestCase): for param in access_processor.LISTING_PARAMS: query += '&%s=blah' % param log_line[6] = '/v1/a/c/o?%s' % query - log_line = 'x'*16 + ' '.join(log_line) + log_line = 'x' * 16 + ' '.join(log_line) res = p.log_line_parser(log_line) expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', 'month': '01', 'second': '6', 'year': 
'3', 'tz': '+0000', @@ -44,7 +44,7 @@ class TestAccessProcessor(unittest.TestCase): expected[param] = 1 expected['query'] = query self.assertEquals(res, expected) - + def test_log_line_parser_field_count(self): p = access_processor.AccessLogProcessor({}) # too few fields @@ -52,7 +52,7 @@ class TestAccessProcessor(unittest.TestCase): log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) + log_line = 'x' * 16 + ' '.join(log_line) res = p.log_line_parser(log_line) expected = {} self.assertEquals(res, expected) @@ -61,7 +61,7 @@ class TestAccessProcessor(unittest.TestCase): log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) + log_line = 'x' * 16 + ' '.join(log_line) res = p.log_line_parser(log_line) expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', @@ -77,7 +77,7 @@ class TestAccessProcessor(unittest.TestCase): log_line[1] = 'proxy-server' log_line[4] = '1/Jan/3/4/5/6' log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) + log_line = 'x' * 16 + ' '.join(log_line) res = p.log_line_parser(log_line) expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', diff --git a/test_slogging/unit/test_log_processor.py b/test_slogging/unit/test_log_processor.py index e2df63b..d5d49a6 100644 --- a/test_slogging/unit/test_log_processor.py +++ b/test_slogging/unit/test_log_processor.py @@ -23,6 +23,7 @@ import time from slogging import internal_proxy from slogging import log_processor +from slogging import log_common from swift.common.exceptions import ChunkReadTimeout @@ -119,10 +120,10 @@ use = egg:swift#proxy def test_access_log_line_parser(self): access_proxy_config = self.proxy_config.copy() access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' }}) p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) result = p.plugins['access']['instance'].log_line_parser( @@ -157,10 +158,10 @@ use = egg:swift#proxy def test_process_one_access_file(self): access_proxy_config = self.proxy_config.copy() access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' }}) p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) @@ -182,14 +183,14 @@ use = egg:swift#proxy def test_process_one_access_file_error(self): access_proxy_config = self.proxy_config.copy() access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' }}) p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) p._internal_proxy = DumbInternalProxy(code=500) - self.assertRaises(log_processor.BadFileDownload, p.process_one_file, + self.assertRaises(log_common.BadFileDownload, p.process_one_file, 'access', 'a', 'c', 'o') def 
test_get_container_listing(self): @@ -230,13 +231,13 @@ use = egg:swift#proxy p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) p._internal_proxy = DumbInternalProxy(code=500) result = p.get_object_data('a', 'c', 'o') - self.assertRaises(log_processor.BadFileDownload, list, result) + self.assertRaises(log_common.BadFileDownload, list, result) p._internal_proxy = DumbInternalProxy(bad_compressed=True) result = p.get_object_data('a', 'c', 'o.gz', True) - self.assertRaises(log_processor.BadFileDownload, list, result) + self.assertRaises(log_common.BadFileDownload, list, result) p._internal_proxy = DumbInternalProxy(timeout=True) result = p.get_object_data('a', 'c', 'o') - self.assertRaises(log_processor.BadFileDownload, list, result) + self.assertRaises(log_common.BadFileDownload, list, result) def test_get_stat_totals(self): stats_proxy_config = self.proxy_config.copy() @@ -284,10 +285,10 @@ use = egg:swift#proxy def test_access_keylist_mapping_format(self): proxy_config = self.proxy_config.copy() proxy_config.update({ - 'log-processor-access': { - 'source_filename_format': '%Y%m%d%H*', - 'class_path': - 'slogging.access_processor.AccessLogProcessor' + 'log-processor-access': { + 'source_filename_format': '%Y%m%d%H*', + 'class_path': + 'slogging.access_processor.AccessLogProcessor' }}) p = log_processor.LogProcessor(proxy_config, DumbLogger()) mapping = p.generate_keylist_mapping() @@ -331,7 +332,9 @@ use = egg:swift#proxy work_request = ('access', 'a', 'c', 'o') q_in.put(work_request) q_in.put(None) - log_processor.collate_worker(processor_args, q_in, q_out) + processor_klass = log_processor.LogProcessor + log_common.collate_worker(processor_klass, processor_args, + 'process_one_file', q_in, q_out) item, ret = q_out.get() self.assertEquals(item, work_request) expected = {('acct', '2010', '07', '09', '04'): @@ -367,11 +370,13 @@ use = egg:swift#proxy work_request = ('access', 'a', 'c', 'o') q_in.put(work_request) q_in.put(None) - log_processor.collate_worker(processor_args, q_in, q_out) + processor_klass = log_processor.LogProcessor + log_common.collate_worker(processor_klass, processor_args, + 'process_one_file', q_in, q_out) item, ret = q_out.get() self.assertEquals(item, work_request) # these only work for Py2.7+ - #self.assertIsInstance(ret, log_processor.BadFileDownload) + #self.assertIsInstance(ret, log_common.BadFileDownload) self.assertTrue(isinstance(ret, Exception)) finally: log_processor.LogProcessor.get_object_data = orig_get_object_data @@ -394,7 +399,10 @@ use = egg:swift#proxy processor_args = (proxy_config, DumbLogger()) item = ('access', 'a', 'c', 'o') logs_to_process = [item] - results = log_processor.multiprocess_collate(processor_args, + processor_klass = log_processor.LogProcessor + results = log_processor.multiprocess_collate(processor_klass, + processor_args, + 'process_one_file', logs_to_process, 1) results = list(results) @@ -414,7 +422,7 @@ use = egg:swift#proxy def test_multiprocess_collate_errors(self): def get_object_data(*a, **kw): - raise log_processor.BadFileDownload() + raise log_common.BadFileDownload() orig_get_object_data = log_processor.LogProcessor.get_object_data try: log_processor.LogProcessor.get_object_data = get_object_data @@ -428,9 +436,12 @@ use = egg:swift#proxy processor_args = (proxy_config, DumbLogger()) item = ('access', 'a', 'c', 'o') logs_to_process = [item] - results = log_processor.multiprocess_collate(processor_args, - logs_to_process, - 1) + processor_klass = log_processor.LogProcessor + results = 
log_common.multiprocess_collate(processor_klass, + processor_args, + 'process_one_file', + logs_to_process, + 1) results = list(results) expected = [] self.assertEquals(results, expected) @@ -495,7 +506,7 @@ class TestLogProcessorDaemon(unittest.TestCase): def test_get_processed_files_list_bad_file_downloads(self): class MockLogProcessor(): def __init__(self, status_code): - self.err = log_processor.BadFileDownload(status_code) + self.err = log_common.BadFileDownload(status_code) def get_object_data(self, *a, **k): raise self.err @@ -578,16 +589,16 @@ class TestLogProcessorDaemon(unittest.TestCase): expected_data_out = { 'acct1_time1': {'out_field1': 16, 'out_field2': 5, 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0, }, + 'out_field6': 0, 'out_field7': 0}, 'acct1_time2': {'out_field1': 9, 'out_field2': 5, 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0, }, + 'out_field6': 0, 'out_field7': 0}, 'acct2_time1': {'out_field1': 13, 'out_field2': 7, 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0, }, + 'out_field6': 0, 'out_field7': 0}, 'acct3_time3': {'out_field1': 17, 'out_field2': 9, 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0, }, + 'out_field6': 0, 'out_field7': 0}, } self.assertEquals(expected_data_out, @@ -776,8 +787,9 @@ class TestLogProcessorDaemon(unittest.TestCase): d = MockLogProcessorDaemon(self) - def mock_multiprocess_collate(processor_args, logs_to_process, - worker_count): + def mock_multiprocess_collate(processor_klass, processor_args, + processor_method, logs_to_process, + worker_count): self.assertEquals(d.total_conf, processor_args[0]) self.assertEquals(d.logger, processor_args[1])