initial stats system commit. includes the log uploader

John Dickinson 2010-08-05 13:57:26 -05:00
parent d7ae75aa58
commit 46e8aecd03
6 changed files with 490 additions and 0 deletions

bin/swift-log-uploader Executable file (83 lines)

@@ -0,0 +1,83 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
import time
from ConfigParser import ConfigParser
from swift.stats.log_uploader import LogUploader
from swift.common.utils import get_logger
if __name__ == '__main__':
    if len(sys.argv) < 3:
        print "Usage: swift-log-uploader CONFIG_FILE plugin"
        sys.exit()
    c = ConfigParser()
    if not c.read(sys.argv[1]):
        print "Unable to read config file."
        sys.exit(1)
    if c.has_section('log-processor'):
        # generic defaults; the plugin section overrides these below
        uploader_conf = dict(c.items('log-processor'))
    else:
        print "Unable to find log-processor config section in %s." % sys.argv[1]
        sys.exit(1)
    plugin = sys.argv[2]
    section_name = 'log-processor-%s' % plugin
    if c.has_section(section_name):
        uploader_conf.update(dict(c.items(section_name)))
    else:
        print "Unable to find %s config section in %s." % (section_name,
                                                           sys.argv[1])
        sys.exit(1)
    try:
        os.setsid()
    except OSError:
        pass
    logger = get_logger(uploader_conf, 'swift-log-uploader')

    def kill_children(*args):
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(0, signal.SIGTERM)
        sys.exit()

    signal.signal(signal.SIGTERM, kill_children)
    log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
    swift_account = uploader_conf['swift_account']
    container_name = uploader_conf['container_name']
    source_filename_format = uploader_conf['source_filename_format']
    proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
                                              '/etc/swift/proxy-server.conf')
    try:
        c = ConfigParser()
        c.read(proxy_server_conf_loc)
        proxy_server_conf = dict(c.items('proxy-server'))
    except:
        proxy_server_conf = None
    uploader = LogUploader(log_dir, swift_account, container_name,
                           source_filename_format, proxy_server_conf, logger)
    logger.info("Uploading logs")
    start = time.time()
    uploader.upload_all_logs()
    logger.info("Uploading logs complete (%0.2f minutes)" %
                ((time.time() - start) / 60))

log processing configuration sample (new file, 26 lines)

@@ -0,0 +1,26 @@
# plugin section format is named "log-processor-<plugin>"
# section "log-processor" is the generic defaults (overridden by plugins)
[log-processor]
# working_dir = /tmp/swift/
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
# lookback_hours = 120
# lookback_window = 120

[log-processor-access]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_format = %Y%m%d%H*
class_path = swift.stats.access_processor
# service_ips lists client IP addresses that should be counted as servicenet
# service_ips =

[log-processor-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
source_filename_format = %Y%m%d%H*
class_path = swift.stats.stats_processor
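
With this sample in place, swift-log-uploader is run once per plugin section; the plugin argument picks which log-processor-<plugin> section overrides the [log-processor] defaults. Assuming the sample is installed at /etc/swift/log-processing.conf (a hypothetical path), the two plugins above would be driven as:

    swift-log-uploader /etc/swift/log-processing.conf access
    swift-log-uploader /etc/swift/log-processing.conf stats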

swift/common/compressed_file_reader.py Normal file (72 lines)

@@ -0,0 +1,72 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import zlib
import struct


class CompressedFileReader(object):
    '''
    Wraps a file object and provides a read method that returns gzip'd data.

    One warning: if read is called with a small value, the data returned may
    be bigger than the value. In this case, the "compressed" data will be
    bigger than the original data. To solve this, use a bigger read buffer.

    An example use case:
    Given an uncompressed file on disk, provide a way to read compressed data
    without buffering the entire file data in memory. Using this class, an
    uncompressed log file could be uploaded as compressed data with chunked
    transfer encoding.

    gzip header and footer code taken from the python stdlib gzip module

    :param file_obj: File object to read from
    :param compresslevel: compression level
    '''

    def __init__(self, file_obj, compresslevel=9):
        self._f = file_obj
        # raw deflate stream (negative window size); the gzip header and
        # footer are written by hand in read()
        self._compressor = zlib.compressobj(compresslevel,
                                            zlib.DEFLATED,
                                            -zlib.MAX_WBITS,
                                            zlib.DEF_MEM_LEVEL,
                                            0)
        self.done = False
        self.first = True
        self.crc32 = 0
        self.total_size = 0

    def read(self, *a, **kw):
        if self.done:
            return ''
        x = self._f.read(*a, **kw)
        if x:
            self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
            self.total_size += len(x)
            compressed = self._compressor.compress(x)
            if not compressed:
                compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
        else:
            # input exhausted: finish the deflate stream and append the
            # gzip footer (crc32 and uncompressed size, little-endian)
            compressed = self._compressor.flush(zlib.Z_FINISH)
            crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
            size = struct.pack("<L", self.total_size & 0xffffffffL)
            footer = crc32 + size
            compressed += footer
            self.done = True
        if self.first:
            self.first = False
            header = '\037\213\010\000\000\000\000\000\002\377'
            compressed = header + compressed
        return compressed
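
To illustrate the read contract described in the docstring, here is a minimal sketch (not part of the commit) that drains a CompressedFileReader and checks that the concatenated chunks form a valid gzip stream; StringIO stands in for a real log file:

    from StringIO import StringIO
    import gzip

    data = 'some log line\n' * 1000
    reader = CompressedFileReader(StringIO(data))
    chunks = []
    while True:
        chunk = reader.read(4096)  # reads up to 4096 bytes of *input*
        if not chunk:
            break  # read() returns '' once the footer has been emitted
        chunks.append(chunk)
    assert gzip.GzipFile(fileobj=StringIO(''.join(chunks))).read() == data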

swift/common/internal_proxy.py Normal file (174 lines)

@@ -0,0 +1,174 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import webob
from urllib import quote, unquote
from json import loads as json_loads

from swift.common.compressed_file_reader import CompressedFileReader
from swift.proxy.server import BaseApplication


class InternalProxy(object):
    """
    Set up a private instance of a proxy server that allows normal requests
    to be made without having to actually send the request to the proxy.
    This also doesn't log the requests to the normal proxy logs.

    :param proxy_server_conf: proxy server configuration dictionary
    :param logger: logger to log requests to
    :param retries: number of times to retry each request
    """

    def __init__(self, proxy_server_conf=None, logger=None, retries=0):
        self.upload_app = BaseApplication(proxy_server_conf, logger)
        self.retries = retries

    def upload_file(self, source_file, account, container, object_name,
                    compress=True, content_type='application/x-gzip'):
        """
        Upload a file to cloud files.

        :param source_file: path to or file like object to upload
        :param account: account to upload to
        :param container: container to upload to
        :param object_name: name of object being uploaded
        :param compress: if True, compresses object as it is uploaded
        :param content_type: content-type of object
        :returns: True if successful, False otherwise
        """
        log_create_pattern = '/v1/%s/%s/%s' % (account, container,
                                               object_name)

        # create the container
        if not self.create_container(account, container):
            return False

        # upload the file to the account
        req = webob.Request.blank(log_create_pattern,
                                  environ={'REQUEST_METHOD': 'PUT'},
                                  headers={'Transfer-Encoding': 'chunked'})
        if compress:
            if hasattr(source_file, 'read'):
                compressed_file = CompressedFileReader(source_file)
            else:
                compressed_file = CompressedFileReader(
                    open(source_file, 'rb'))
            req.body_file = compressed_file
        else:
            if not hasattr(source_file, 'read'):
                source_file = open(source_file, 'rb')
            req.body_file = source_file
        req.account = account
        req.content_type = content_type
        req.content_length = None   # to make sure we send chunked data
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        return 200 <= resp.status_int < 300

    def get_object(self, account, container, object_name):
        """
        Get object.

        :param account: account name object is in
        :param container: container name object is in
        :param object_name: name of object to get
        :returns: iterator for object data
        """
        req = webob.Request.blank('/v1/%s/%s/%s' %
                                  (account, container, object_name),
                                  environ={'REQUEST_METHOD': 'GET'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        for x in resp.app_iter:
            yield x

    def create_container(self, account, container):
        """
        Create container.

        :param account: account name to put the container in
        :param container: container name to create
        :returns: True if successful, otherwise False
        """
        req = webob.Request.blank('/v1/%s/%s' % (account, container),
                                  environ={'REQUEST_METHOD': 'PUT'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        return 200 <= resp.status_int < 300

    def get_container_list(self, account, container, marker=None, limit=None,
                           prefix=None, delimiter=None, full_listing=True):
        """
        Get container listing.

        :param account: account name for the container
        :param container: container name to get the listing of
        :param marker: marker query
        :param limit: limit to query
        :param prefix: prefix query
        :param delimiter: delimiter for query
        :param full_listing: if True, make enough requests to get all listings
        :returns: list of objects
        """
        if full_listing:
            rv = []
            listing = self.get_container_list(account, container, marker,
                                              limit, prefix, delimiter,
                                              full_listing=False)
            while listing:
                rv.extend(listing)
                if not delimiter:
                    marker = listing[-1]['name']
                else:
                    marker = listing[-1].get('name',
                                             listing[-1].get('subdir'))
                listing = self.get_container_list(account, container, marker,
                                                  limit, prefix, delimiter,
                                                  full_listing=False)
            return rv
        path = '/v1/%s/%s' % (account, container)
        qs = 'format=json'
        if marker:
            qs += '&marker=%s' % quote(marker)
        if limit:
            qs += '&limit=%d' % limit
        if prefix:
            qs += '&prefix=%s' % quote(prefix)
        if delimiter:
            qs += '&delimiter=%s' % quote(delimiter)
        path += '?%s' % qs
        req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        if resp.status_int == 204:
            return []
        if 200 <= resp.status_int < 300:
            return json_loads(resp.body)
        # unexpected status: return an empty listing rather than None
        return []
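
As a usage sketch (not part of the commit), the upload-and-list side of this API looks like the following, assuming proxy_server_conf and logger are set up as in bin/swift-log-uploader; the account and container are the sample values from the config above, and the local path and object name are hypothetical:

    proxy = InternalProxy(proxy_server_conf, logger, retries=3)
    account = 'AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31'
    if proxy.upload_file('/var/log/swift/2010080513.log', account,
                         'log_data', '2010/08/05/13/example.gz'):
        # get_container_list pages through the listing with marker queries
        for obj in proxy.get_container_list(account, 'log_data',
                                            prefix='2010/08/05'):
            print obj['name']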

swift/stats/__init__.py Normal file (empty)

swift/stats/log_uploader.py Normal file (135 lines)

@@ -0,0 +1,135 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import hashlib
import time
import gzip
import glob

from swift.common.internal_proxy import InternalProxy


class LogUploader(object):
    '''
    Given a local directory, a swift account, and a container name,
    LogUploader will upload all files in the local directory to the given
    account/container. All but the newest files will be uploaded, and the
    files' md5 sum will be computed. The hash is used to prevent duplicate
    data from being uploaded multiple times in different files (ex: log
    lines). Since the hash is computed, it is also used as the uploaded
    object's etag to ensure data integrity.

    Note that after the file is successfully uploaded, it will be unlinked.

    The given proxy server config is used to instantiate a proxy server for
    the object uploads.
    '''

    def __init__(self, log_dir, swift_account, container_name,
                 filename_format, proxy_server_conf, logger):
        if not log_dir.endswith('/'):
            log_dir = log_dir + '/'
        self.log_dir = log_dir
        self.swift_account = swift_account
        self.container_name = container_name
        self.filename_format = filename_format
        self.internal_proxy = InternalProxy(proxy_server_conf, logger)
        self.logger = logger

    def upload_all_logs(self):
        try:
            i = [(c, self.filename_format.index(c))
                 for c in '%Y %m %d %H'.split()]
        except ValueError:
            # a format component is missing; filenames can't be parsed,
            # so nothing can be safely uploaded
            return
        i.sort()
        year_offset = month_offset = day_offset = hour_offset = None
        for c, start in i:
            if c == '%Y':
                year_offset = start, start + 4
            elif c == '%m':
                month_offset = start, start + 2
            elif c == '%d':
                day_offset = start, start + 2
            elif c == '%H':
                hour_offset = start, start + 2
        if not (year_offset and month_offset and day_offset and hour_offset):
            # don't have all the parts, can't upload anything
            return
        glob_pattern = self.filename_format
        glob_pattern = glob_pattern.replace('%Y', '????')
        glob_pattern = glob_pattern.replace('%m', '??')
        glob_pattern = glob_pattern.replace('%d', '??')
        glob_pattern = glob_pattern.replace('%H', '??')
        filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
        current_hour = int(time.strftime('%H'))
        today = int(time.strftime('%Y%m%d'))
        self.internal_proxy.create_container(self.swift_account,
                                             self.container_name)
        for filename in filelist:
            try:
                # From the filename, we need to derive the year, month, day,
                # and hour for the file. These values are used in the
                # uploaded object's name, so they should be a reasonably
                # accurate representation of the time for which the data in
                # the file was collected. The file's last modified time is
                # not a reliable representation of the data in the file. For
                # example, an old log file (from hour A) may be uploaded or
                # moved into the log_dir in hour Z. The file's modified time
                # will be for hour Z, and therefore the object's name in the
                # system will not represent the data in it.
                # If the filename doesn't match the format, it shouldn't be
                # uploaded.
                # The offsets are relative to the filename format, so slice
                # the basename rather than the full path.
                basename = os.path.basename(filename)
                year = basename[slice(*year_offset)]
                month = basename[slice(*month_offset)]
                day = basename[slice(*day_offset)]
                hour = basename[slice(*hour_offset)]
            except IndexError:
                # unexpected filename format, move on
                self.logger.error("Unexpected log: %s" % filename)
                continue
            if (time.time() - os.stat(filename).st_mtime) < 7200:
                # don't process very new logs
                self.logger.debug(
                    "Skipping log: %s (< 2 hours old)" % filename)
                continue
            self.upload_one_log(filename, year, month, day, hour)

    def upload_one_log(self, filename, year, month, day, hour):
        if os.path.getsize(filename) == 0:
            self.logger.debug("Log %s is 0 length, skipping" % filename)
            return
        self.logger.debug("Processing log: %s" % filename)
        filehash = hashlib.md5()
        already_compressed = filename.endswith('.gz')
        opener = gzip.open if already_compressed else open
        f = opener(filename, 'rb')
        try:
            for line in f:
                # filter out bad lines here?
                filehash.update(line)
        finally:
            f.close()
        filehash = filehash.hexdigest()
        # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
        target_filename = '/'.join([year, month, day, hour,
                                    filehash + '.gz'])
        if self.internal_proxy.upload_file(filename,
                                           self.swift_account,
                                           self.container_name,
                                           target_filename,
                                           compress=(not already_compressed)):
            self.logger.debug("Uploaded log %s to %s" %
                              (filename, target_filename))
            os.unlink(filename)
        else:
            self.logger.error("ERROR: Upload of log %s failed!" % filename)