diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader new file mode 100755 index 0000000000..4ca9c17795 --- /dev/null +++ b/bin/swift-log-uploader @@ -0,0 +1,83 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import signal +import sys +import time +from ConfigParser import ConfigParser + +from swift.stats.log_uploader import LogUploader +from swift.common.utils import get_logger + +if __name__ == '__main__': + if len(sys.argv) < 3: + print "Usage: swift-log-uploader CONFIG_FILE plugin" + sys.exit() + + c = ConfigParser() + if not c.read(sys.argv[1]): + print "Unable to read config file." + sys.exit(1) + + if c.has_section('log-processor'): + parser_conf = dict(c.items('log-processor')) + else: + print "Unable to find log-processor config section in %s." % sys.argv[1] + sys.exit(1) + + plugin = sys.argv[2] + section_name = 'log-processor-%s' % plugin + if c.has_section(section_name): + uploader_conf.update(dict(c.items(section_name))) + else: + print "Unable to find %s config section in %s." 
% (section_name, + sys.argv[1]) + sys.exit(1) + + try: + os.setsid() + except OSError: + pass + + logger = get_logger(uploader_conf, 'swift-log-uploader') + + def kill_children(*args): + signal.signal(signal.SIGTERM, signal.SIG_IGN) + os.killpg(0, signal.SIGTERM) + sys.exit() + + signal.signal(signal.SIGTERM, kill_children) + + log_dir = uploader_conf.get('log_dir', '/var/log/swift/') + swift_account = uploader_conf['swift_account'] + container_name = uploader_conf['container_name'] + source_filename_format = uploader_conf['source_filename_format'] + proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', + '/etc/swift/proxy-server.conf') + try: + c = ConfigParser() + c.read(proxy_server_conf_loc) + proxy_server_conf = dict(c.items('proxy-server')) + except: + proxy_server_conf = None + uploader = LogUploader(log_dir, swift_account, container_name, + source_filename_format, proxy_server_conf, logger) + logger.info("Uploading logs") + start = time.time() + uploader.upload_all_logs() + logger.info("Uploading logs complete (%0.2f minutes)" % + ((time.time()-start)/60)) diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample new file mode 100644 index 0000000000..a73f9a0bc3 --- /dev/null +++ b/etc/log-processing.conf-sample @@ -0,0 +1,26 @@ +# plugin section format is named "log-processor-" +# section "log-processor" is the generic defaults (overridden by plugins) + +[log-processor] +# working_dir = /tmp/swift/ +# proxy_server_conf = /etc/swift/proxy-server.conf +# log_facility = LOG_LOCAL0 +# log_level = INFO +# lookback_hours = 120 +# lookback_window = 120 + +[log-processor-access] +# log_dir = /var/log/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +container_name = log_data +source_filename_format = %Y%m%d%H* +class_path = swift.stats.access_processor +# service ips is for client ip addresses that should be counted as servicenet +# service_ips = + +[log-processor-stats] +# log_dir = /var/log/swift/ +swift_account = 
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import zlib
import struct


class CompressedFileReader(object):
    '''
    Wraps a file object and provides a read method that returns gzip'd data.

    One warning: if read is called with a small value, the data returned may
    be bigger than the value. In this case, the "compressed" data will be
    bigger than the original data. To solve this, use a bigger read buffer.

    An example use case:
    Given an uncompressed file on disk, provide a way to read compressed data
    without buffering the entire file data in memory. Using this class, an
    uncompressed log file could be uploaded as compressed data with chunked
    transfer encoding.

    gzip header and footer layout taken from the python stdlib gzip module
    (see also RFC 1952 for the member format).

    :param file_obj: File object to read from
    :param compresslevel: compression level
    '''

    def __init__(self, file_obj, compresslevel=9):
        self._f = file_obj
        # Raw deflate stream (negative wbits = no zlib wrapper); the gzip
        # header and trailer are emitted by read() itself.
        self._compressor = zlib.compressobj(compresslevel,
                                            zlib.DEFLATED,
                                            -zlib.MAX_WBITS,
                                            zlib.DEF_MEM_LEVEL,
                                            0)
        self.done = False       # set once the trailer has been emitted
        self.first = True       # header not yet emitted
        self.crc32 = 0          # running CRC32 of the uncompressed data
        self.total_size = 0     # uncompressed byte count (gzip ISIZE)

    def read(self, *a, **kw):
        '''
        Read from the wrapped file and return the next chunk of gzip data.

        Returns an empty string only after the complete gzip stream
        (header, deflate data, trailer) has been returned.
        '''
        if self.done:
            return b''
        x = self._f.read(*a, **kw)
        if x:
            self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffff
            self.total_size += len(x)
            compressed = self._compressor.compress(x)
            if not compressed:
                # Force some output so callers' read loops make progress
                # even when the compressor is still buffering.
                compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
        else:
            # EOF on the source: flush remaining deflate data and append
            # the gzip trailer (CRC32 then ISIZE, both little-endian).
            compressed = self._compressor.flush(zlib.Z_FINISH)
            crc32 = struct.pack("<L", self.crc32 & 0xffffffff)
            size = struct.pack("<L", self.total_size & 0xffffffff)
            compressed += crc32 + size
            self.done = True
        if self.first:
            self.first = False
            # 10-byte gzip header: magic, deflate method, no flags,
            # zero mtime, xfl=2 (max compression), os=255 (unknown).
            header = b'\037\213\010\000\000\000\000\000\002\377'
            compressed = header + compressed
        return compressed
+ + :param account: account name to put the container in + :param container: container name to create + :returns: True if successful, otherwise False + """ + req = webob.Request.blank('/v1/%s/%s' % (account, container), + environ={'REQUEST_METHOD': 'PUT'}) + req.account = account + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) + tries += 1 + return 200 <= resp.status_int < 300 + + def get_container_list(self, account, container, marker=None, limit=None, + prefix=None, delimiter=None, full_listing=True): + """ + Get container listing. + + :param account: account name for the container + :param container: container name to get the listing of + :param marker: marker query + :param limit: limit to query + :param prefix: prefix query + :param delimeter: delimeter for query + :param full_listing: if True, make enough requests to get all listings + :returns: list of objects + """ + if full_listing: + rv = [] + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + while listing: + rv.extend(listing) + if not delimiter: + marker = listing[-1]['name'] + else: + marker = listing[-1].get('name', listing[-1].get('subdir')) + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + return rv + path = '/v1/%s/%s' % (account, container) + qs = 'format=json' + if marker: + qs += '&marker=%s' % quote(marker) + if limit: + qs += '&limit=%d' % limit + if prefix: + qs += '&prefix=%s' % quote(prefix) + if delimiter: + qs += '&delimiter=%s' % quote(delimiter) + path += '?%s' % qs + req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) + req.account = account + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) + tries = 1 + while 
(resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) + tries += 1 + if resp.status_int == 204: + return [] + if 200 <= resp.status_int < 300: + return json_loads(resp.body) diff --git a/swift/stats/__init__.py b/swift/stats/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py new file mode 100644 index 0000000000..8c45d64db5 --- /dev/null +++ b/swift/stats/log_uploader.py @@ -0,0 +1,135 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement +import os +import hashlib +import time +import gzip +import glob + +from swift.common.internal_proxy import InternalProxy + +class LogUploader(object): + ''' + Given a local directory, a swift account, and a container name, LogParser + will upload all files in the local directory to the given account/container. + All but the newest files will be uploaded, and the files' md5 sum will be + computed. The hash is used to prevent duplicate data from being uploaded + multiple times in different files (ex: log lines). Since the hash is + computed, it is also used as the uploaded object's etag to ensure data + integrity. + + Note that after the file is successfully uploaded, it will be unlinked. 
class LogUploader(object):
    '''
    Given a local directory, a swift account, and a container name,
    LogUploader will upload all files in the local directory to the given
    account/container.  All but the newest files will be uploaded, and the
    files' md5 sum will be computed. The hash is used to prevent duplicate
    data from being uploaded multiple times in different files (ex: log
    lines). Since the hash is computed, it is also used as the uploaded
    object's etag to ensure data integrity.

    Note that after the file is successfully uploaded, it will be unlinked.

    The given proxy server config is used to instantiate a proxy server for
    the object uploads.
    '''

    # datetime directives we understand and the width of their expansion
    # in an actual filename ('%Y' is 2 chars in the format but 4 in a name)
    DATETIME_WIDTHS = {'%Y': 4, '%m': 2, '%d': 2, '%H': 2}

    def __init__(self, log_dir, swift_account, container_name,
                 filename_format, proxy_server_conf, logger):
        if not log_dir.endswith('/'):
            log_dir = log_dir + '/'
        self.log_dir = log_dir
        self.swift_account = swift_account
        self.container_name = container_name
        self.filename_format = filename_format
        self.internal_proxy = InternalProxy(proxy_server_conf, logger)
        self.logger = logger

    def _parse_datetime_offsets(self):
        '''
        Locate the %Y/%m/%d/%H fields within filename_format.

        Offsets are computed in *expanded* coordinates, i.e. relative to
        the start of a filename matching the format, accounting for each
        directive expanding to a different width than it occupies in the
        format string.

        :returns: ((ystart, ystop), (mstart, mstop), (dstart, dstop),
                  (hstart, hstop)) or None if any field is missing or
                  preceded by a '*' wildcard (whose matched width is
                  unknowable).
        '''
        widths = self.DATETIME_WIDTHS
        offsets = {}
        fmt = self.filename_format
        pos = 0     # position within a matching filename
        i = 0       # position within the format string
        while i < len(fmt) and len(offsets) < len(widths):
            code = fmt[i:i + 2]
            if code in widths:
                offsets[code] = (pos, pos + widths[code])
                pos += widths[code]
                i += 2
            elif fmt[i] == '*':
                # arbitrary-length wildcard before all fields are found:
                # the remaining fields' positions cannot be determined
                return None
            else:
                # literal character or '?' -- both one char in a filename
                pos += 1
                i += 1
        if len(offsets) < len(widths):
            # don't have all the parts, can't upload anything
            return None
        return (offsets['%Y'], offsets['%m'], offsets['%d'], offsets['%H'])

    def upload_all_logs(self):
        '''
        Upload all eligible files in log_dir that match filename_format.

        Files newer than two hours are skipped (they may still be written
        to); files whose datetime fields are not all digits are logged and
        skipped.
        '''
        offsets = self._parse_datetime_offsets()
        if offsets is None:
            return
        year_offset, month_offset, day_offset, hour_offset = offsets
        # Build a glob pattern with each directive replaced by the right
        # number of single-char wildcards so offsets stay aligned.
        glob_pattern = self.filename_format
        for code, width in self.DATETIME_WIDTHS.items():
            glob_pattern = glob_pattern.replace(code, '?' * width)
        filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
        self.internal_proxy.create_container(self.swift_account,
                                             self.container_name)
        for filename in filelist:
            # From the filename, we derive the year, month, day, and hour
            # for the file. These values are used in the uploaded object's
            # name, so they should be a reasonably accurate representation
            # of the time for which the data in the file was collected.
            # The file's last modified time is not a reliable
            # representation of the data in the file. For example, an old
            # log file (from hour A) may be uploaded or moved into the
            # log_dir in hour Z. The file's modified time will be for hour
            # Z, and therefore the object's name in the system would not
            # represent the data in it.
            # Offsets are relative to the bare filename; strip the log_dir
            # prefix (log_dir always ends with '/', see __init__).
            relname = filename[len(self.log_dir):]
            year = relname[slice(*year_offset)]
            month = relname[slice(*month_offset)]
            day = relname[slice(*day_offset)]
            hour = relname[slice(*hour_offset)]
            if not (year.isdigit() and month.isdigit() and
                    day.isdigit() and hour.isdigit()):
                # unexpected filename format, move on
                self.logger.error("Unexpected log: %s" % filename)
                continue
            if (time.time() - os.stat(filename).st_mtime) < 7200:
                # don't process very new logs
                self.logger.debug(
                    "Skipping log: %s (< 2 hours old)" % filename)
                continue
            self.upload_one_log(filename, year, month, day, hour)

    def upload_one_log(self, filename, year, month, day, hour):
        '''
        Hash, upload, and (on success) unlink a single log file.

        :param filename: full path of the log file
        :param year: 4-digit year string for the object name
        :param month: 2-digit month string
        :param day: 2-digit day string
        :param hour: 2-digit hour string
        '''
        if os.path.getsize(filename) == 0:
            self.logger.debug("Log %s is 0 length, skipping" % filename)
            return
        self.logger.debug("Processing log: %s" % filename)
        filehash = hashlib.md5()
        already_compressed = filename.endswith('.gz')
        # hash the *uncompressed* content so identical data always yields
        # the same object name regardless of compression
        opener = gzip.open if already_compressed else open
        f = opener(filename, 'rb')
        try:
            for line in f:
                # filter out bad lines here?
                filehash.update(line)
        finally:
            f.close()
        filehash = filehash.hexdigest()
        # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
        target_filename = '/'.join([year, month, day, hour,
                                    filehash + '.gz'])
        if self.internal_proxy.upload_file(filename,
                                           self.swift_account,
                                           self.container_name,
                                           target_filename,
                                           compress=(not already_compressed)):
            self.logger.debug("Uploaded log %s to %s" %
                              (filename, target_filename))
            os.unlink(filename)
        else:
            self.logger.error("ERROR: Upload of log %s failed!" % filename)