Add a stats system for swift that takes logs generated by the system and creates a .csv file with aggregate info for each account in the system

John Dickinson 2010-10-08 22:27:37 +00:00 committed by Tarmac
commit 679efc0300
26 changed files with 2066 additions and 13 deletions

27
bin/swift-account-stats-logger Executable file

@@ -0,0 +1,27 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from swift.stats.account_stats import AccountStat
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-account-stats-logger CONFIG_FILE"
sys.exit()
stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats')
stats = AccountStat(stats_conf).run(once=True)

27
bin/swift-log-stats-collector Executable file

@@ -0,0 +1,27 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from swift.stats.log_processor import LogProcessorDaemon
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-log-stats-collector CONFIG_FILE"
sys.exit()
conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
stats = LogProcessorDaemon(conf).run(once=True)

31
bin/swift-log-uploader Executable file

@@ -0,0 +1,31 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from swift.stats.log_uploader import LogUploader
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 3:
print "Usage: swift-log-uploader CONFIG_FILE plugin"
sys.exit()
uploader_conf = utils.readconf(sys.argv[1], 'log-processor')
plugin = sys.argv[2]
section_name = 'log-processor-%s' % plugin
plugin_conf = utils.readconf(sys.argv[1], section_name)
uploader_conf.update(plugin_conf)
uploader = LogUploader(uploader_conf, plugin).run(once=True)

doc/source/index.rst

@@ -24,6 +24,7 @@ Overview:
     overview_reaper
     overview_auth
     overview_replication
+    overview_stats

 Development:

184
doc/source/overview_stats.rst Normal file

@@ -0,0 +1,184 @@
==================
Swift stats system
==================

The swift stats system is composed of three parts: log creation, log
uploading, and log processing. The system handles two types of logs (access
logs and account stats logs), but it can be extended to handle other types of
logs.

---------
Log Types
---------

***********
Access logs
***********

Access logs are the proxy server logs. Rackspace uses syslog-ng to redirect
the proxy log output to an hourly log file. For example, a proxy request that
is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412.
This allows easy log rotation and easy per-hour log processing.
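
As an aside, the hourly filename is simply the request's timestamp rendered
with the same `%Y%m%d%H` format string used in the uploader config below; a
minimal sketch (using the example timestamp above)::

    import time

    # a request made on August 4, 2010 at 12:37 belongs to hour 2010080412
    logged_at = time.strptime('2010-08-04 12:37', '%Y-%m-%d %H:%M')
    assert time.strftime('%Y%m%d%H', logged_at) == '2010080412'
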
******************
Account stats logs
******************

Account stats logs are generated by a stats system process.
swift-account-stats-logger runs on each account server (via cron) and walks
the filesystem looking for account databases. When an account database is
found, the logger selects the account hash, bytes_used, container_count, and
object_count. These values are then written out as one line in a csv file. One
csv file is produced for every run of swift-account-stats-logger. This means
that, system wide, one csv file is produced for every account server node per
run. Rackspace runs the account stats logger every hour, so a cluster of ten
account servers produces ten csv files every hour. Also, every account will
have one entry for every replica in the system, so, on average, the aggregate
of all account stats csv files created in one system-wide run will contain
three copies of each account.
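
As an illustration (the account hash and counts below are made up), the
columns are account name, container count, object count, and bytes used, so
two replicas of one account reporting in the same system-wide run would
contribute lines like::

    "AUTH_d5bef862-0ab7-4c7c-a128-bff1e8a33233",12,1456,9876543210
    "AUTH_d5bef862-0ab7-4c7c-a128-bff1e8a33233",12,1456,9876543210
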
----------------------
Log Processing plugins
----------------------

The swift stats system is written to allow a plugin to be defined for every
log type. Swift includes plugins for both access logs and storage stats logs.
Each plugin is responsible for defining, in a config section, where the logs
are stored on disk, where the logs will be stored in swift (account and
container), the filename format of the logs on disk, the location of the
plugin class definition, and any plugin-specific config values.
A plugin class must define three methods. The constructor must accept one
argument (the dict representation of the plugin's config section). The process
method must accept an iterator and the account, container, and object name of
the log. The keylist_mapping method accepts no parameters.
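
A do-nothing plugin satisfying this interface might look like the following
sketch (the class name is hypothetical)::

    class NullLogProcessor(object):
        def __init__(self, conf):
            # conf is the dict form of this plugin's config section
            self.conf = conf

        def process(self, obj_stream, account, container, object_name):
            # consume an iterator of log lines and return a dict keyed on
            # (account, year, month, day, hour)
            return {}

        def keylist_mapping(self):
            # map final csv column names to the keys emitted by process()
            return {}
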
-------------
Log Uploading
-------------

swift-log-uploader accepts a config file and a plugin name. It finds the log
files on disk according to the plugin config section and uploads them to the
swift cluster. This means one uploader process will run on each proxy server
node and each account server node. To avoid uploading partially-written log
files, the uploader skips any file whose mtime is within the last two hours
(the new_log_cutoff setting). Rackspace runs this process once an hour via
cron.

--------------
Log Processing
--------------

swift-log-stats-collector accepts a config file and generates a csv that is
uploaded to swift. It loads all plugins defined in the config file, generates
a list of all log files in swift that need to be processed, and passes an
iterable of the log file data to the appropriate plugin's process method. The
process method returns a dictionary of data in the log file keyed on (account,
year, month, day, hour). The log-stats-collector process then combines all
dictionaries from all calls to a process method into one dictionary. Values
for keys that collide within a given (account, year, month, day, hour)
dictionary are summed. Finally, the summed dictionary is mapped to the final
csv values with each plugin's keylist_mapping method.
The resulting csv file has one line per (account, year, month, day, hour) for
all log files processed in that run of swift-log-stats-collector.
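
For example, if only the account stats plugin were configured (the values
below are made up), the csv might begin::

    data_ts,account,bytes_used,container_count,object_count,replica_count
    2010/08/04 12:00:00,AUTH_d5bef862-0ab7-4c7c-a128-bff1e8a33233,9876543210,12,1456,3
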
================================
Running the stats system on SAIO
================================

#. Create a swift account to use for storing stats information, and note the
account hash. The hash will be used in config files.

#. Install syslog-ng::

     sudo apt-get install syslog-ng

#. Add the following to the end of `/etc/syslog-ng/syslog-ng.conf`::

     # Added for swift logging
     destination df_local1 { file("/var/log/swift/proxy.log" owner(<username>) group(<groupname>)); };
     destination df_local1_err { file("/var/log/swift/proxy.error" owner(<username>) group(<groupname>)); };
     destination df_local1_hourly { file("/var/log/swift/hourly/$YEAR$MONTH$DAY$HOUR" owner(<username>) group(<groupname>)); };
     filter f_local1 { facility(local1) and level(info); };
     filter f_local1_err { facility(local1) and not level(info); };

     # local1.info -/var/log/swift/proxy.log
     # write to local file and to remote log server
     log {
             source(s_all);
             filter(f_local1);
             destination(df_local1);
             destination(df_local1_hourly);
     };

     # local1.error -/var/log/swift/proxy.error
     # write to local file and to remote log server
     log {
             source(s_all);
             filter(f_local1_err);
             destination(df_local1_err);
     };

#. Restart syslog-ng

#. Create the log directories::

     mkdir /var/log/swift/hourly
     mkdir /var/log/swift/stats
     chown -R <username>:<groupname> /var/log/swift

#. Create `/etc/swift/log-processor.conf`::

     [log-processor]
     swift_account = <your-stats-account-hash>
     user = <your-user-name>

     [log-processor-access]
     swift_account = <your-stats-account-hash>
     container_name = log_data
     log_dir = /var/log/swift/hourly/
     source_filename_format = %Y%m%d%H
     class_path = swift.stats.access_processor.AccessLogProcessor
     user = <your-user-name>

     [log-processor-stats]
     swift_account = <your-stats-account-hash>
     container_name = account_stats
     log_dir = /var/log/swift/stats/
     source_filename_format = %Y%m%d%H_*
     class_path = swift.stats.stats_processor.StatsLogProcessor
     account_server_conf = /etc/swift/account-server/1.conf
     user = <your-user-name>

#. Add the following under [app:proxy-server] in `/etc/swift/proxy-server.conf`::

     log_facility = LOG_LOCAL1

#. Create a `cron` job to run once per hour to create the stats logs. In
   `/etc/cron.d/swift-stats-log-creator`::

     0 * * * * <your-user-name> swift-account-stats-logger /etc/swift/log-processor.conf

#. Create a `cron` job to run once per hour to upload the stats logs. In
   `/etc/cron.d/swift-stats-log-uploader`::

     10 * * * * <your-user-name> swift-log-uploader /etc/swift/log-processor.conf stats

#. Create a `cron` job to run once per hour to upload the access logs. In
   `/etc/cron.d/swift-access-log-uploader`::

     5 * * * * <your-user-name> swift-log-uploader /etc/swift/log-processor.conf access

#. Create a `cron` job to run once per hour to process the logs. In
   `/etc/cron.d/swift-stats-processor`::

     30 * * * * <your-user-name> swift-log-stats-collector /etc/swift/log-processor.conf

After running for a few hours, you should start to see .csv files in the
log_processing_data container in the swift stats account that was created
earlier. Each file will have one entry per account per hour for each account
with activity in that hour. One .csv file should be produced per hour. Note
that the stats will be delayed by at least two hours by default. This can be
changed with the new_log_cutoff variable in the config file. See
`log-processing.conf-sample` for more details.

39
etc/log-processing.conf-sample Normal file

@@ -0,0 +1,39 @@
# plugin section format is named "log-processor-<plugin>"
[log-processor]
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# container_name = log_processing_data
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
# lookback_hours = 120
# lookback_window = 120
# user = swift
[log-processor-access]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_format = access-%Y%m%d%H
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.access_processor.AccessLogProcessor
# service ips is for client ip addresses that should be counted as servicenet
# service_ips =
# load balancer private ips is for load balancer ip addresses that should be
# counted as servicenet
# lb_private_ips =
# server_name = proxy
# user = swift
# warn_percent = 0.8
[log-processor-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
source_filename_format = stats-%Y%m%d%H_*
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
# account_server_conf = /etc/swift/account-server.conf
# user = swift

setup.py

@@ -76,6 +76,9 @@ setup(
         'bin/swift-ring-builder', 'bin/swift-stats-populate',
         'bin/swift-stats-report',
         'bin/swift-bench',
+        'bin/swift-log-uploader',
+        'bin/swift-log-stats-collector',
+        'bin/swift-account-stats-logger',
     ],
     entry_points={
         'paste.app_factory': [

73
swift/common/compressing_file_reader.py Normal file

@@ -0,0 +1,73 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import zlib
import struct
class CompressingFileReader(object):
'''
Wraps a file object and provides a read method that returns gzip'd data.
One warning: if read is called with a small value, the data returned may
be bigger than the value. In this case, the "compressed" data will be
bigger than the original data. To solve this, use a bigger read buffer.
An example use case:
Given an uncompressed file on disk, provide a way to read compressed data
without buffering the entire file data in memory. Using this class, an
uncompressed log file could be uploaded as compressed data with chunked
transfer encoding.
gzip header and footer code taken from the python stdlib gzip module
:param file_obj: File object to read from
:param compresslevel: compression level
'''
def __init__(self, file_obj, compresslevel=9):
self._f = file_obj
self._compressor = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self.done = False
self.first = True
self.crc32 = 0
self.total_size = 0
def read(self, *a, **kw):
if self.done:
return ''
x = self._f.read(*a, **kw)
if x:
self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
self.total_size += len(x)
compressed = self._compressor.compress(x)
if not compressed:
compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
compressed = self._compressor.flush(zlib.Z_FINISH)
crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
size = struct.pack("<L", self.total_size & 0xffffffffL)
footer = crc32 + size
compressed += footer
self.done = True
if self.first:
self.first = False
header = '\037\213\010\000\000\000\000\000\002\377'
compressed = header + compressed
return compressed
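
A brief usage sketch for the reader above (the log path is hypothetical;
this is illustrative and not part of the module): draining the reader in
fixed-size chunks yields a complete gzip stream suitable for a chunked
transfer upload.

    from swift.common.compressing_file_reader import CompressingFileReader

    reader = CompressingFileReader(
        open('/var/log/swift/hourly/2010080412', 'rb'))
    chunks = []
    while True:
        chunk = reader.read(65536)
        if not chunk:
            break
        chunks.append(chunk)
    # ''.join(chunks) is now a valid gzip stream of the original file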

swift/common/daemon.py

@@ -34,12 +34,15 @@ class Daemon(object):
"""Override this to run forever"""
raise NotImplementedError('run_forever not implemented')
def run(self, once=False):
def run(self, once=False, capture_stdout=True, capture_stderr=True):
"""Run the daemon"""
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
if capture_stdout:
sys.stdout = utils.LoggerFileObject(self.logger)
if capture_stderr:
sys.stderr = utils.LoggerFileObject(self.logger)
utils.drop_privileges(self.conf.get('user', 'swift'))

210
swift/common/internal_proxy.py Normal file

@@ -0,0 +1,210 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import webob
from urllib import quote, unquote
from json import loads as json_loads
from swift.common.compressing_file_reader import CompressingFileReader
from swift.proxy.server import BaseApplication
class MemcacheStub(object):
def get(self, *a, **kw):
return None
def set(self, *a, **kw):
return None
def incr(self, *a, **kw):
return 0
def delete(self, *a, **kw):
return None
def set_multi(self, *a, **kw):
return None
def get_multi(self, *a, **kw):
return []
class InternalProxy(object):
"""
Set up a private instance of a proxy server that allows normal requests
to be made without having to actually send the request to the proxy.
This also doesn't log the requests to the normal proxy logs.
:param proxy_server_conf: proxy server configuration dictionary
:param logger: logger to log requests to
:param retries: number of times to retry each request
"""
def __init__(self, proxy_server_conf=None, logger=None, retries=0):
self.upload_app = BaseApplication(proxy_server_conf,
memcache=MemcacheStub(),
logger=logger)
self.retries = retries
def upload_file(self, source_file, account, container, object_name,
compress=True, content_type='application/x-gzip',
etag=None):
"""
Upload a file to swift.
:param source_file: path to or file like object to upload
:param account: account to upload to
:param container: container to upload to
:param object_name: name of object being uploaded
:param compress: if True, compresses object as it is uploaded
:param content_type: content-type of object
:param etag: etag for object to check successful upload
:returns: True if successful, False otherwise
"""
target_name = '/v1/%s/%s/%s' % (account, container, object_name)
# create the container
if not self.create_container(account, container):
return False
# upload the file to the account
req = webob.Request.blank(target_name,
environ={'REQUEST_METHOD': 'PUT'},
headers={'Transfer-Encoding': 'chunked'})
if compress:
if hasattr(source_file, 'read'):
compressed_file = CompressingFileReader(source_file)
else:
compressed_file = CompressingFileReader(
open(source_file, 'rb'))
req.body_file = compressed_file
else:
if not hasattr(source_file, 'read'):
source_file = open(source_file, 'rb')
req.body_file = source_file
req.account = account
req.content_type = content_type
req.content_length = None # to make sure we send chunked data
if etag:
req.etag = etag
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries = 1
while (resp.status_int < 200 or resp.status_int > 299) \
and tries <= self.retries:
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries += 1
if not (200 <= resp.status_int < 300):
return False
return True
def get_object(self, account, container, object_name):
"""
Get object.
:param account: account name object is in
:param container: container name object is in
:param object_name: name of object to get
:returns: iterator for object data
"""
req = webob.Request.blank('/v1/%s/%s/%s' %
(account, container, object_name),
environ={'REQUEST_METHOD': 'GET'})
req.account = account
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries = 1
while (resp.status_int < 200 or resp.status_int > 299) \
and tries <= self.retries:
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries += 1
return resp.status_int, resp.app_iter
def create_container(self, account, container):
"""
Create container.
:param account: account name to put the container in
:param container: container name to create
:returns: True if successful, otherwise False
"""
req = webob.Request.blank('/v1/%s/%s' % (account, container),
environ={'REQUEST_METHOD': 'PUT'})
req.account = account
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries = 1
while (resp.status_int < 200 or resp.status_int > 299) \
and tries <= self.retries:
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries += 1
return 200 <= resp.status_int < 300
def get_container_list(self, account, container, marker=None, limit=None,
prefix=None, delimiter=None, full_listing=True):
"""
Get container listing.
:param account: account name for the container
:param container: container name to get the listing of
:param marker: marker query
:param limit: limit to query
:param prefix: prefix query
:param delimiter: delimiter for query
:param full_listing: if True, make enough requests to get all listings
:returns: list of objects
"""
if full_listing:
rv = []
listing = self.get_container_list(account, container, marker,
limit, prefix, delimiter, full_listing=False)
while listing:
rv.extend(listing)
if not delimiter:
marker = listing[-1]['name']
else:
marker = listing[-1].get('name', listing[-1].get('subdir'))
listing = self.get_container_list(account, container, marker,
limit, prefix, delimiter, full_listing=False)
return rv
path = '/v1/%s/%s' % (account, container)
qs = 'format=json'
if marker:
qs += '&marker=%s' % quote(marker)
if limit:
qs += '&limit=%d' % limit
if prefix:
qs += '&prefix=%s' % quote(prefix)
if delimiter:
qs += '&delimiter=%s' % quote(delimiter)
path += '?%s' % qs
req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
req.account = account
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries = 1
while (resp.status_int < 200 or resp.status_int > 299) \
and tries <= self.retries:
resp = self.upload_app.handle_request(
self.upload_app.update_request(req))
tries += 1
if resp.status_int == 204:
return []
if 200 <= resp.status_int < 300:
return json_loads(resp.body)
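
A brief usage sketch for InternalProxy (the conf path and names below are
hypothetical; this mirrors how the log uploader drives it):

    from paste.deploy import appconfig
    from swift.common.internal_proxy import InternalProxy

    conf = appconfig('config:/etc/swift/proxy-server.conf',
                     name='proxy-server')
    proxy = InternalProxy(conf, retries=3)
    # gzips the file on the fly and uploads it with chunked transfer encoding
    proxy.upload_file('/var/log/swift/stats/2010080412_abcdef.csv',
                      'AUTH_test', 'account_stats',
                      '2010/08/04/12/abcdef.csv.gz', compress=True)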

swift/common/utils.py

@@ -553,12 +553,13 @@ def cache_from_env(env):
     return item_from_env(env, 'swift.cache')


-def readconf(conf, section_name, log_name=None, defaults=None):
+def readconf(conf, section_name=None, log_name=None, defaults=None):
     """
     Read config file and return config items as a dict

     :param conf: path to config file
-    :param section_name: config section to read
+    :param section_name: config section to read (will return all sections if
+                         not defined)
     :param log_name: name to be used with logging (will use section_name if
                      not defined)
     :param defaults: dict of default values to pre-populate the config with
@@ -570,16 +571,24 @@ def readconf(conf, section_name, log_name=None, defaults=None):
     if not c.read(conf):
         print "Unable to read config file %s" % conf
         sys.exit(1)
-    if c.has_section(section_name):
-        conf = dict(c.items(section_name))
-    else:
-        print "Unable to find %s config section in %s" % (section_name, conf)
-        sys.exit(1)
-    if "log_name" not in conf:
-        if log_name is not None:
-            conf['log_name'] = log_name
-        else:
-            conf['log_name'] = section_name
+    if section_name:
+        if c.has_section(section_name):
+            conf = dict(c.items(section_name))
+        else:
+            print "Unable to find %s config section in %s" % (section_name,
+                                                              conf)
+            sys.exit(1)
+        if "log_name" not in conf:
+            if log_name is not None:
+                conf['log_name'] = log_name
+            else:
+                conf['log_name'] = section_name
+    else:
+        conf = {}
+        for s in c.sections():
+            conf.update({s: dict(c.items(s))})
+        if 'log_name' not in conf:
+            conf['log_name'] = log_name
     return conf

0
swift/stats/__init__.py Normal file

239
swift/stats/access_processor.py Normal file

@@ -0,0 +1,239 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from urllib import unquote
import copy
from swift.common.utils import split_path, get_logger
month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
class AccessLogProcessor(object):
"""Transform proxy server access logs"""
def __init__(self, conf):
self.server_name = conf.get('server_name', 'proxy')
self.lb_private_ips = [x.strip() for x in \
conf.get('lb_private_ips', '').split(',')\
if x.strip()]
self.service_ips = [x.strip() for x in \
conf.get('service_ips', '').split(',')\
if x.strip()]
self.warn_percent = float(conf.get('warn_percent', '0.8'))
self.logger = get_logger(conf)
def log_line_parser(self, raw_log):
'''given a raw access log line, return a dict of the good parts'''
d = {}
try:
(_,
server,
client_ip,
lb_ip,
timestamp,
method,
request,
http_version,
code,
referrer,
user_agent,
auth_token,
bytes_in,
bytes_out,
etag,
trans_id,
headers,
processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
except ValueError:
self.logger.debug('Bad line data: %s' % repr(raw_log))
return {}
if server != self.server_name:
# incorrect server name in log line
self.logger.debug('Bad server name: found "%s" expected "%s"' \
% (server, self.server_name))
return {}
(version,
account,
container_name,
object_name) = split_path(request, 2, 4, True)
if container_name is not None:
container_name = container_name.split('?', 1)[0]
if object_name is not None:
object_name = object_name.split('?', 1)[0]
account = account.split('?', 1)[0]
query = None
if '?' in request:
request, query = request.split('?', 1)
args = query.split('&')
# Count each query argument. This is used later to aggregate
# the number of format, prefix, etc. queries.
for q in args:
if '=' in q:
k, v = q.split('=', 1)
else:
k = q
# Certain keys will get summed in stats reporting
# (format, path, delimiter, etc.). Save a "1" here
# to indicate that this request is 1 request for
# its respective key.
d[k] = 1
d['client_ip'] = client_ip
d['lb_ip'] = lb_ip
d['method'] = method
d['request'] = request
if query:
d['query'] = query
d['http_version'] = http_version
d['code'] = code
d['referrer'] = referrer
d['user_agent'] = user_agent
d['auth_token'] = auth_token
d['bytes_in'] = bytes_in
d['bytes_out'] = bytes_out
d['etag'] = etag
d['trans_id'] = trans_id
d['processing_time'] = processing_time
day, month, year, hour, minute, second = timestamp.split('/')
d['day'] = day
month = '%02d' % month_map.index(month)
d['month'] = month
d['year'] = year
d['hour'] = hour
d['minute'] = minute
d['second'] = second
d['tz'] = '+0000'
d['account'] = account
d['container_name'] = container_name
d['object_name'] = object_name
d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
d['code'] = int(d['code'])
return d
def process(self, obj_stream, account, container, object_name):
'''generate hourly groupings of data from one access log file'''
hourly_aggr_info = {}
total_lines = 0
bad_lines = 0
for line in obj_stream:
line_data = self.log_line_parser(line)
total_lines += 1
if not line_data:
bad_lines += 1
continue
account = line_data['account']
container_name = line_data['container_name']
year = line_data['year']
month = line_data['month']
day = line_data['day']
hour = line_data['hour']
bytes_out = line_data['bytes_out']
bytes_in = line_data['bytes_in']
method = line_data['method']
code = int(line_data['code'])
object_name = line_data['object_name']
client_ip = line_data['client_ip']
op_level = None
if not container_name:
op_level = 'account'
elif container_name and not object_name:
op_level = 'container'
elif object_name:
op_level = 'object'
aggr_key = (account, year, month, day, hour)
d = hourly_aggr_info.get(aggr_key, {})
if line_data['lb_ip'] in self.lb_private_ips:
source = 'service'
else:
source = 'public'
if line_data['client_ip'] in self.service_ips:
source = 'service'
d[(source, 'bytes_out')] = d.setdefault((
source, 'bytes_out'), 0) + bytes_out
d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
bytes_in
d['format_query'] = d.setdefault('format_query', 0) + \
line_data.get('format', 0)
d['marker_query'] = d.setdefault('marker_query', 0) + \
line_data.get('marker', 0)
d['prefix_query'] = d.setdefault('prefix_query', 0) + \
line_data.get('prefix', 0)
d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
line_data.get('delimiter', 0)
path = line_data.get('path', 0)
d['path_query'] = d.setdefault('path_query', 0) + path
code = '%dxx' % (code / 100)
key = (source, op_level, method, code)
d[key] = d.setdefault(key, 0) + 1
hourly_aggr_info[aggr_key] = d
if bad_lines > (total_lines * self.warn_percent):
name = '/'.join([account, container, object_name])
self.logger.warning('I found a bunch of bad lines in %s '\
'(%d bad, %d total)' % (name, bad_lines, total_lines))
return hourly_aggr_info
def keylist_mapping(self):
source_keys = 'service public'.split()
level_keys = 'account container object'.split()
verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
code_keys = '2xx 4xx 5xx'.split()
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'service_bw_in': ('service', 'bytes_in'),
'service_bw_out': ('service', 'bytes_out'),
'public_bw_in': ('public', 'bytes_in'),
'public_bw_out': ('public', 'bytes_out'),
'account_requests': set(),
'container_requests': set(),
'object_requests': set(),
'service_request': set(),
'public_request': set(),
'ops_count': set(),
}
for verb in verb_keys:
keylist_mapping[verb] = set()
for code in code_keys:
keylist_mapping[code] = set()
for source in source_keys:
for level in level_keys:
for verb in verb_keys:
for code in code_keys:
keylist_mapping['account_requests'].add(
(source, 'account', verb, code))
keylist_mapping['container_requests'].add(
(source, 'container', verb, code))
keylist_mapping['object_requests'].add(
(source, 'object', verb, code))
keylist_mapping['service_request'].add(
('service', level, verb, code))
keylist_mapping['public_request'].add(
('public', level, verb, code))
keylist_mapping[verb].add(
(source, level, verb, code))
keylist_mapping[code].add(
(source, level, verb, code))
keylist_mapping['ops_count'].add(
(source, level, verb, code))
return keylist_mapping

111
swift/stats/account_stats.py Normal file

@@ -0,0 +1,111 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from paste.deploy import appconfig
import shutil
import hashlib
from swift.account.server import DATADIR as account_server_data_dir
from swift.common.db import AccountBroker
from swift.common.internal_proxy import InternalProxy
from swift.common.utils import renamer, get_logger, readconf, mkdirs
from swift.common.constraints import check_mount
from swift.common.daemon import Daemon
class AccountStat(Daemon):
"""
Extract storage stats from account databases on the account
storage nodes
"""
def __init__(self, stats_conf):
super(AccountStat, self).__init__(stats_conf)
target_dir = stats_conf.get('log_dir', '/var/log/swift')
account_server_conf_loc = stats_conf.get('account_server_conf',
'/etc/swift/account-server.conf')
server_conf = appconfig('config:%s' % account_server_conf_loc,
name='account-server')
filename_format = stats_conf['source_filename_format']
if filename_format.count('*') > 1:
raise Exception('source filename format should have at most one *')
self.filename_format = filename_format
self.target_dir = target_dir
mkdirs(self.target_dir)
self.devices = server_conf.get('devices', '/srv/node')
self.mount_check = server_conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
def run_once(self):
self.logger.info("Gathering account stats")
start = time.time()
self.find_and_process()
self.logger.info("Gathering account stats complete (%0.2f minutes)" %
((time.time() - start) / 60))
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
working_dir = os.path.join(self.target_dir, '.stats_tmp')
shutil.rmtree(working_dir, ignore_errors=True)
mkdirs(working_dir)
tmp_filename = os.path.join(working_dir, src_filename)
hasher = hashlib.md5()
with open(tmp_filename, 'wb') as statfile:
# csv has the following columns:
# Account Name, Container Count, Object Count, Bytes Used
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices, device):
self.logger.error("Device %s is not mounted, skipping." %
device)
continue
accounts = os.path.join(self.devices,
device,
account_server_data_dir)
if not os.path.exists(accounts):
self.logger.debug("Path %s does not exist, skipping." %
accounts)
continue
for root, dirs, files in os.walk(accounts, topdown=False):
for filename in files:
if filename.endswith('.db'):
db_path = os.path.join(root, filename)
broker = AccountBroker(db_path)
if not broker.is_deleted():
(account_name,
_, _, _,
container_count,
object_count,
bytes_used,
_, _) = broker.get_info()
line_data = '"%s",%d,%d,%d\n' % (
account_name, container_count,
object_count, bytes_used)
statfile.write(line_data)
hasher.update(line_data)
file_hash = hasher.hexdigest()
hash_index = src_filename.find('*')
if hash_index < 0:
# if there is no * in the target filename, the uploader probably
# won't work because we are crafting a filename that doesn't
# fit the pattern
src_filename = '_'.join([src_filename, file_hash])
else:
parts = src_filename[:hash_index], src_filename[hash_index + 1:]
src_filename = ''.join([parts[0], file_hash, parts[1]])
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
shutil.rmtree(working_dir, ignore_errors=True)

424
swift/stats/log_processor.py Normal file

@@ -0,0 +1,424 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import zlib
import time
import datetime
import cStringIO
import collections
from paste.deploy import appconfig
import multiprocessing
import Queue
import cPickle
import hashlib
from swift.common.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon
class BadFileDownload(Exception):
pass
class LogProcessor(object):
"""Load plugins, process logs"""
def __init__(self, conf, logger):
if isinstance(logger, tuple):
self.logger = get_logger(*logger)
else:
self.logger = logger
self.conf = conf
self._internal_proxy = None
# load the processing plugins
self.plugins = {}
plugin_prefix = 'log-processor-'
for section in (x for x in conf if x.startswith(plugin_prefix)):
plugin_name = section[len(plugin_prefix):]
plugin_conf = conf.get(section, {})
self.plugins[plugin_name] = plugin_conf
class_path = self.plugins[plugin_name]['class_path']
import_target, class_name = class_path.rsplit('.', 1)
module = __import__(import_target, fromlist=[import_target])
klass = getattr(module, class_name)
self.plugins[plugin_name]['instance'] = klass(plugin_conf)
self.logger.debug('Loaded plugin "%s"' % plugin_name)
@property
def internal_proxy(self):
if self._internal_proxy is None:
stats_conf = self.conf.get('log-processor', {})
proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig(
'config:%s' % proxy_server_conf_loc,
name='proxy-server')
self._internal_proxy = InternalProxy(proxy_server_conf,
self.logger,
retries=3)
return self._internal_proxy
def process_one_file(self, plugin_name, account, container, object_name):
self.logger.info('Processing %s/%s/%s with plugin "%s"' % (account,
container,
object_name,
plugin_name))
# get an iter of the object data
compressed = object_name.endswith('.gz')
stream = self.get_object_data(account, container, object_name,
compressed=compressed)
# look up the correct plugin and send the stream to it
return self.plugins[plugin_name]['instance'].process(stream,
account,
container,
object_name)
def get_data_list(self, start_date=None, end_date=None,
listing_filter=None):
if listing_filter is None:
listing_filter = set()
total_list = []
for plugin_name, data in self.plugins.items():
account = data['swift_account']
container = data['container_name']
listing = self.get_container_listing(account,
container,
start_date,
end_date)
for object_name in listing:
# The items in this list end up being passed as positional
# parameters to process_one_file.
x = (plugin_name, account, container, object_name)
if x not in listing_filter:
total_list.append(x)
return total_list
def get_container_listing(self, swift_account, container_name,
start_date=None, end_date=None,
listing_filter=None):
'''
Get a container listing, filtered by start_date, end_date, and
listing_filter. Dates, if given, should be in YYYYMMDDHH format
'''
search_key = None
if start_date is not None:
date_parts = []
try:
year, start_date = start_date[:4], start_date[4:]
if year:
date_parts.append(year)
month, start_date = start_date[:2], start_date[2:]
if month:
date_parts.append(month)
day, start_date = start_date[:2], start_date[2:]
if day:
date_parts.append(day)
hour, start_date = start_date[:2], start_date[2:]
if hour:
date_parts.append(hour)
except IndexError:
pass
else:
search_key = '/'.join(date_parts)
end_key = None
if end_date is not None:
date_parts = []
try:
year, end_date = end_date[:4], end_date[4:]
if year:
date_parts.append(year)
month, end_date = end_date[:2], end_date[2:]
if month:
date_parts.append(month)
day, end_date = end_date[:2], end_date[2:]
if day:
date_parts.append(day)
hour, end_date = end_date[:2], end_date[2:]
if hour:
date_parts.append(hour)
except IndexError:
pass
else:
end_key = '/'.join(date_parts)
container_listing = self.internal_proxy.get_container_list(
swift_account,
container_name,
marker=search_key)
results = []
if container_listing is not None:
if listing_filter is None:
listing_filter = set()
for item in container_listing:
name = item['name']
if end_key and name > end_key:
break
if name not in listing_filter:
results.append(name)
return results
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account,
container_name,
object_name)
if code < 200 or code >= 300:
return
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
# Python decompressing gzip chunk-by-chunk
# http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
try:
for chunk in o:
if compressed:
try:
chunk = d.decompress(chunk)
except zlib.error:
self.logger.debug('Bad compressed data for %s/%s/%s' %
(swift_account,
container_name,
object_name))
raise BadFileDownload() # bad compressed data
parts = chunk.split('\n')
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
except ChunkReadTimeout:
raise BadFileDownload()
def generate_keylist_mapping(self):
keylist = {}
for plugin in self.plugins:
plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
if not plugin_keylist:
continue
for k, v in plugin_keylist.items():
o = keylist.get(k)
if o:
if isinstance(o, set):
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = set(o)
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = v
keylist[k] = o
return keylist
class LogProcessorDaemon(Daemon):
"""
Gather raw log data and farm processing out to generate a csv that is
uploaded to swift.
"""
def __init__(self, conf):
c = conf.get('log-processor')
super(LogProcessorDaemon, self).__init__(c)
self.total_conf = conf
self.logger = get_logger(c)
self.log_processor = LogProcessor(conf, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',
str(self.lookback_hours)))
self.log_processor_account = c['swift_account']
self.log_processor_container = c.get('container_name',
'log_processing_data')
self.worker_count = int(c.get('worker_count', '1'))
def run_once(self):
self.logger.info("Beginning log processing")
start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
lookback_start = datetime.datetime.now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
lookback_end = datetime.datetime.now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
processed_files_stream = self.log_processor.get_object_data(
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz',
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
if buf:
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except Exception:
already_processed_files = set()
self.logger.debug('found %d processed files' % \
len(already_processed_files))
logs_to_process = self.log_processor.get_data_list(lookback_start,
lookback_end,
already_processed_files)
self.logger.info('loaded %d files to process' % len(logs_to_process))
if not logs_to_process:
self.logger.info("Log processing done (%0.2f minutes)" %
((time.time() - start) / 60))
return
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
self.worker_count)
# reduce
aggr_data = {}
processed_files = already_processed_files
for item, data in results:
# since item contains the plugin and the log name, new plugins will
# "reprocess" the file and the results will be in the final csv.
processed_files.add(item)
for k, d in data.items():
existing_data = aggr_data.get(k, {})
for i, j in d.items():
current = existing_data.get(i, 0)
# merging strategy for key collisions is addition
# processing plugins need to realize this
existing_data[i] = current + j
aggr_data[k] = existing_data
# group
# reduce a large number of keys in aggr_data[k] to a small number of
# output keys
keylist_mapping = self.log_processor.generate_keylist_mapping()
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
for key, mapping in keylist_mapping.items():
if isinstance(mapping, (list, set)):
value = 0
for k in mapping:
try:
value += data[k]
except KeyError:
pass
else:
try:
value = data[mapping]
except KeyError:
value = 0
final_info[account][key] = value
# output
sorted_keylist_mapping = sorted(keylist_mapping)
columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
out_buf = [columns]
for (account, year, month, day, hour), d in final_info.items():
data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
row = [data_ts]
row.append('%s' % account)
for k in sorted_keylist_mapping:
row.append('%s' % d[k])
out_buf.append(','.join(row))
out_buf = '\n'.join(out_buf)
h = hashlib.md5(out_buf).hexdigest()
upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
f = cStringIO.StringIO(out_buf)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
upload_name)
# cleanup
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz')
self.logger.info("Log processing done (%0.2f minutes)" %
((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
'''yield hourly data from logs_to_process'''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
for _ in range(worker_count):
p = multiprocessing.Process(target=collate_worker,
args=(processor_args,
in_queue,
out_queue))
p.start()
results.append(p)
for x in logs_to_process:
in_queue.put(x)
for _ in range(worker_count):
in_queue.put(None)
count = 0
while True:
try:
item, data = out_queue.get_nowait()
count += 1
if data:
yield item, data
if count >= len(logs_to_process):
# this implies that one result will come from every request
break
except Queue.Empty:
time.sleep(.1)
for r in results:
r.join()
def collate_worker(processor_args, in_queue, out_queue):
'''worker process for multiprocess_collate'''
p = LogProcessor(*processor_args)
while True:
try:
item = in_queue.get_nowait()
if item is None:
break
except Queue.Empty:
time.sleep(.1)
else:
ret = p.process_one_file(*item)
out_queue.put((item, ret))

170
swift/stats/log_uploader.py Normal file

@@ -0,0 +1,170 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import hashlib
import time
import gzip
import glob
from paste.deploy import appconfig
from swift.common.internal_proxy import InternalProxy
from swift.common.daemon import Daemon
from swift.common import utils
class LogUploader(Daemon):
'''
Given a local directory, a swift account, and a container name, LogUploader
will upload all files in the local directory to the given account/
container. All but the newest files will be uploaded, and the files' md5
sum will be computed. The hash is used to prevent duplicate data from
being uploaded multiple times in different files (ex: log lines). Since
the hash is computed, it is also used as the uploaded object's etag to
ensure data integrity.
Note that after the file is successfully uploaded, it will be unlinked.
The given proxy server config is used to instantiate a proxy server for
the object uploads.
'''
def __init__(self, uploader_conf, plugin_name):
super(LogUploader, self).__init__(uploader_conf)
log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
swift_account = uploader_conf['swift_account']
container_name = uploader_conf['container_name']
source_filename_format = uploader_conf['source_filename_format']
proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
name='proxy-server')
new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
('true', 'on', '1', 'yes')
self.unlink_log = unlink_log
self.new_log_cutoff = new_log_cutoff
if not log_dir.endswith('/'):
log_dir = log_dir + '/'
self.log_dir = log_dir
self.swift_account = swift_account
self.container_name = container_name
self.filename_format = source_filename_format
self.internal_proxy = InternalProxy(proxy_server_conf)
log_name = 'swift-log-uploader-%s' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name)
def run_once(self):
self.logger.info("Uploading logs")
start = time.time()
self.upload_all_logs()
self.logger.info("Uploading logs complete (%0.2f minutes)" %
((time.time() - start) / 60))
def upload_all_logs(self):
i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
i.sort()
year_offset = month_offset = day_offset = hour_offset = None
base_offset = len(self.log_dir)
for start, c in i:
offset = base_offset + start
if c == '%Y':
year_offset = offset, offset + 4
# Add in the difference between len(%Y) and the expanded
# version of %Y (????). This makes sure the codes after this
# one will align properly in the final filename.
base_offset += 2
elif c == '%m':
month_offset = offset, offset + 2
elif c == '%d':
day_offset = offset, offset + 2
elif c == '%H':
hour_offset = offset, offset + 2
if not (year_offset and month_offset and day_offset and hour_offset):
# don't have all the parts, can't upload anything
return
glob_pattern = self.filename_format
glob_pattern = glob_pattern.replace('%Y', '????', 1)
glob_pattern = glob_pattern.replace('%m', '??', 1)
glob_pattern = glob_pattern.replace('%d', '??', 1)
glob_pattern = glob_pattern.replace('%H', '??', 1)
filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
current_hour = int(time.strftime('%H'))
today = int(time.strftime('%Y%m%d'))
self.internal_proxy.create_container(self.swift_account,
self.container_name)
for filename in filelist:
try:
# From the filename, we need to derive the year, month, day,
# and hour for the file. These values are used in the uploaded
# object's name, so they should be a reasonably accurate
# representation of the time for which the data in the file was
# collected. The file's last modified time is not a reliable
# representation of the data in the file. For example, an old
# log file (from hour A) may be uploaded or moved into the
# log_dir in hour Z. The file's modified time will be for hour
# Z, and therefore the object's name in the system will not
# represent the data in it.
# If the filename doesn't match the format, it shouldn't be
# uploaded.
year = filename[slice(*year_offset)]
month = filename[slice(*month_offset)]
day = filename[slice(*day_offset)]
hour = filename[slice(*hour_offset)]
except IndexError:
# unexpected filename format, move on
self.logger.error("Unexpected log: %s" % filename)
continue
if ((time.time() - os.stat(filename).st_mtime) <
self.new_log_cutoff):
# don't process very new logs
self.logger.debug(
"Skipping log: %s (< %d seconds old)" % (filename,
self.new_log_cutoff))
continue
self.upload_one_log(filename, year, month, day, hour)
def upload_one_log(self, filename, year, month, day, hour):
if os.path.getsize(filename) == 0:
self.logger.debug("Log %s is 0 length, skipping" % filename)
return
self.logger.debug("Processing log: %s" % filename)
filehash = hashlib.md5()
already_compressed = filename.endswith('.gz')
opener = gzip.open if already_compressed else open
f = opener(filename, 'rb')
try:
for line in f:
# filter out bad lines here?
filehash.update(line)
finally:
f.close()
filehash = filehash.hexdigest()
# By adding a hash to the filename, we ensure that uploaded files
# have unique filenames and protect against uploading one file
# more than one time. By using md5, we get an etag for free.
target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
if self.internal_proxy.upload_file(filename,
self.swift_account,
self.container_name,
target_filename,
compress=(not already_compressed)):
self.logger.debug("Uploaded log %s to %s" %
(filename, target_filename))
if self.unlink_log:
os.unlink(filename)
else:
self.logger.error("ERROR: Upload of log %s failed!" % filename)

68
swift/stats/stats_processor.py Normal file

@@ -0,0 +1,68 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.common.utils import get_logger
class StatsLogProcessor(object):
"""Transform account storage stat logs"""
def __init__(self, conf):
self.logger = get_logger(conf)
def process(self, obj_stream, account, container, object_name):
'''generate hourly groupings of data from one stats log file'''
account_totals = {}
year, month, day, hour, _ = object_name.split('/')
for line in obj_stream:
if not line:
continue
try:
(account,
container_count,
object_count,
bytes_used) = line.split(',')
except (IndexError, ValueError):
# bad line data
self.logger.debug('Bad line data: %s' % repr(line))
continue
account = account.strip('"')
container_count = int(container_count.strip('"'))
object_count = int(object_count.strip('"'))
bytes_used = int(bytes_used.strip('"'))
aggr_key = (account, year, month, day, hour)
d = account_totals.get(aggr_key, {})
d['replica_count'] = d.setdefault('replica_count', 0) + 1
d['container_count'] = d.setdefault('container_count', 0) + \
container_count
d['object_count'] = d.setdefault('object_count', 0) + \
object_count
d['bytes_used'] = d.setdefault('bytes_used', 0) + \
bytes_used
account_totals[aggr_key] = d
return account_totals
def keylist_mapping(self):
'''
returns a dictionary of final keys mapped to source keys
'''
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'bytes_used': 'bytes_used',
'container_count': 'container_count',
'object_count': 'object_count',
'replica_count': 'replica_count',
}
return keylist_mapping

34
test/unit/common/test_compressing_file_reader.py Normal file

@@ -0,0 +1,34 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Tests for swift.common.compressing_file_reader """
import unittest
import cStringIO
from swift.common.compressing_file_reader import CompressingFileReader
class TestCompressingFileReader(unittest.TestCase):
def test_read(self):
plain = 'obj\ndata'
s = cStringIO.StringIO(plain)
expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\
'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\
'\x00'
x = CompressingFileReader(s)
compressed = ''.join(iter(lambda: x.read(), ''))
self.assertEquals(compressed, expected)
self.assertEquals(x.read(), '')

29
test/unit/common/test_internal_proxy.py Normal file

@@ -0,0 +1,29 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
from swift.common import internal_proxy
class TestInternalProxy(unittest.TestCase):
def test_placeholder(self):
pass
if __name__ == '__main__':
unittest.main()

test/unit/common/test_utils.py

@@ -247,5 +247,33 @@ class TestUtils(unittest.TestCase):
         self.assert_(callable(
             utils.load_libc_function('some_not_real_function')))

+    def test_readconf(self):
+        conf = '''[section1]
+foo = bar
+[section2]
+log_name = yarr'''
+        f = open('/tmp/test', 'wb')
+        f.write(conf)
+        f.close()
+        result = utils.readconf('/tmp/test')
+        expected = {'log_name': None,
+                    'section1': {'foo': 'bar'},
+                    'section2': {'log_name': 'yarr'}}
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1')
+        expected = {'log_name': 'section1', 'foo': 'bar'}
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section2').get('log_name')
+        expected = 'yarr'
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1',
+                                log_name='foo').get('log_name')
+        expected = 'foo'
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1',
+                                defaults={'bar': 'baz'})
+        expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
+        self.assertEquals(result, expected)
+        os.unlink('/tmp/test')

 if __name__ == '__main__':
     unittest.main()

0
test/unit/stats/__init__.py Normal file

29
test/unit/stats/test_access_processor.py Normal file

@@ -0,0 +1,29 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests

import unittest

from swift.stats import access_processor


class TestAccessProcessor(unittest.TestCase):

    def test_placeholder(self):
        pass


if __name__ == '__main__':
    unittest.main()


@ -0,0 +1,29 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests

import unittest

from swift.stats import account_stats


class TestAccountStats(unittest.TestCase):

    def test_placeholder(self):
        pass


if __name__ == '__main__':
    unittest.main()


@ -0,0 +1,227 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

from swift.stats import log_processor


class DumbLogger(object):
    def __getattr__(self, n):
        return self.foo

    def foo(self, *a, **kw):
        pass


class DumbInternalProxy(object):
    def get_container_list(self, account, container, marker=None):
        n = '2010/03/14/13/obj1'
        if marker is None or n > marker:
            return [{'name': n}]
        else:
            return []

    def get_object(self, account, container, object_name):
        code = 200
        if object_name.endswith('.gz'):
            # same data as below, compressed with gzip -9
            def data():
                yield '\x1f\x8b\x08'
                yield '\x08"\xd79L'
                yield '\x02\x03te'
                yield 'st\x00\xcbO'
                yield '\xca\xe2JI,I'
                yield '\xe4\x02\x00O\xff'
                yield '\xa3Y\t\x00\x00\x00'
        else:
            def data():
                yield 'obj\n'
                yield 'data'
        return code, data()
class TestLogProcessor(unittest.TestCase):
    access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
        '09/Jul/2010/04/14/30 GET '\
        '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
        'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
        '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
    stats_test_line = 'account,1,2,3'
    proxy_config = {'log-processor': {}}

    def test_access_log_line_parser(self):
        access_proxy_config = self.proxy_config.copy()
        access_proxy_config.update({
            'log-processor-access': {
                'source_filename_format': '%Y%m%d%H*',
                'class_path':
                    'swift.stats.access_processor.AccessLogProcessor'
            }})
        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
        result = p.plugins['access']['instance'].log_line_parser(
            self.access_test_line)
        self.assertEquals(result, {
            'code': 200,
            'processing_time': '0.0262',
            'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
            'month': '07',
            'second': '30',
            'year': '2010',
            'query': 'format=json&foo',
            'tz': '+0000',
            'http_version': 'HTTP/1.0',
            'object_name': 'bar',
            'etag': '-',
            'foo': 1,
            'method': 'GET',
            'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
            'client_ip': '1.2.3.4',
            'format': 1,
            'bytes_out': 95,
            'container_name': 'foo',
            'day': '09',
            'minute': '14',
            'account': 'acct',
            'hour': '04',
            'referrer': '-',
            'request': '/v1/acct/foo/bar',
            'user_agent': 'curl',
            'bytes_in': 6,
            'lb_ip': '4.5.6.7'})
    def test_process_one_access_file(self):
        access_proxy_config = self.proxy_config.copy()
        access_proxy_config.update({
            'log-processor-access': {
                'source_filename_format': '%Y%m%d%H*',
                'class_path':
                    'swift.stats.access_processor.AccessLogProcessor'
            }})
        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())

        def get_object_data(*a, **kw):
            return [self.access_test_line]
        p.get_object_data = get_object_data
        result = p.process_one_file('access', 'a', 'c', 'o')
        expected = {('acct', '2010', '07', '09', '04'):
                    {('public', 'object', 'GET', '2xx'): 1,
                     ('public', 'bytes_out'): 95,
                     'marker_query': 0,
                     'format_query': 1,
                     'delimiter_query': 0,
                     'path_query': 0,
                     ('public', 'bytes_in'): 6,
                     'prefix_query': 0}}
        self.assertEquals(result, expected)
    def test_get_container_listing(self):
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        p._internal_proxy = DumbInternalProxy()
        result = p.get_container_listing('a', 'foo')
        expected = ['2010/03/14/13/obj1']
        self.assertEquals(result, expected)
        result = p.get_container_listing('a', 'foo', listing_filter=expected)
        expected = []
        self.assertEquals(result, expected)
        result = p.get_container_listing('a', 'foo', start_date='2010031412',
                                         end_date='2010031414')
        expected = ['2010/03/14/13/obj1']
        self.assertEquals(result, expected)
        result = p.get_container_listing('a', 'foo', start_date='2010031414')
        expected = []
        self.assertEquals(result, expected)
        result = p.get_container_listing('a', 'foo', start_date='2010031410',
                                         end_date='2010031412')
        expected = []
        self.assertEquals(result, expected)

    def test_get_object_data(self):
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        p._internal_proxy = DumbInternalProxy()
        result = list(p.get_object_data('a', 'c', 'o', False))
        expected = ['obj', 'data']
        self.assertEquals(result, expected)
        result = list(p.get_object_data('a', 'c', 'o.gz', True))
        self.assertEquals(result, expected)
    def test_get_stat_totals(self):
        stats_proxy_config = self.proxy_config.copy()
        stats_proxy_config.update({
            'log-processor-stats': {
                'class_path':
                    'swift.stats.stats_processor.StatsLogProcessor'
            }})
        p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
        p._internal_proxy = DumbInternalProxy()

        def get_object_data(*a, **kw):
            return [self.stats_test_line]
        p.get_object_data = get_object_data
        result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
        expected = {('account', 'y', 'm', 'd', 'h'):
                    {'replica_count': 1,
                     'object_count': 2,
                     'container_count': 1,
                     'bytes_used': 3}}
        self.assertEquals(result, expected)
    def test_generate_keylist_mapping(self):
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        result = p.generate_keylist_mapping()
        expected = {}
        self.assertEquals(result, expected)
    def test_generate_keylist_mapping_with_dummy_plugins(self):

        class Plugin1(object):
            def keylist_mapping(self):
                return {'a': 'b', 'c': 'd', 'e': ['f', 'g']}

        class Plugin2(object):
            def keylist_mapping(self):
                return {'a': '1', 'e': '2', 'h': '3'}

        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
        p.plugins['plugin1'] = {'instance': Plugin1()}
        p.plugins['plugin2'] = {'instance': Plugin2()}
        result = p.generate_keylist_mapping()
        expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']),
                    'h': '3'}
        self.assertEquals(result, expected)
    def test_access_keylist_mapping_format(self):
        proxy_config = self.proxy_config.copy()
        proxy_config.update({
            'log-processor-access': {
                'source_filename_format': '%Y%m%d%H*',
                'class_path':
                    'swift.stats.access_processor.AccessLogProcessor'
            }})
        p = log_processor.LogProcessor(proxy_config, DumbLogger())
        mapping = p.generate_keylist_mapping()
        for k, v in mapping.items():
            # self.assertIsInstance(k, str) would be cleaner, but it only
            # exists on Python 2.7+
            self.assertTrue(isinstance(k, str), type(k))

    def test_stats_keylist_mapping_format(self):
        proxy_config = self.proxy_config.copy()
        proxy_config.update({
            'log-processor-stats': {
                'class_path':
                    'swift.stats.stats_processor.StatsLogProcessor'
            }})
        p = log_processor.LogProcessor(proxy_config, DumbLogger())
        mapping = p.generate_keylist_mapping()
        for k, v in mapping.items():
            # self.assertIsInstance(k, str) would be cleaner, but it only
            # exists on Python 2.7+
            self.assertTrue(isinstance(k, str), type(k))
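
Outside the test harness, wiring both plugins into one LogProcessor follows the same config shape used in these tests. A minimal sketch with a stand-in logger (StubLogger is illustrative, not part of the commit):

from swift.stats import log_processor


class StubLogger(object):
    # swallows any log call; stands in for a real logger object
    def __getattr__(self, name):
        return lambda *a, **kw: None

conf = {
    'log-processor': {},
    'log-processor-access': {
        'source_filename_format': '%Y%m%d%H*',
        'class_path': 'swift.stats.access_processor.AccessLogProcessor',
    },
    'log-processor-stats': {
        'class_path': 'swift.stats.stats_processor.StatsLogProcessor',
    },
}
p = log_processor.LogProcessor(conf, StubLogger())
# one mapping merged across plugins, as the dummy-plugin test shows
print p.generate_keylist_mapping()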


@ -0,0 +1,29 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests

import unittest

from swift.stats import log_uploader


class TestLogUploader(unittest.TestCase):

    def test_placeholder(self):
        pass


if __name__ == '__main__':
    unittest.main()


@ -0,0 +1,29 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests

import unittest

from swift.stats import stats_processor


class TestStatsProcessor(unittest.TestCase):

    def test_placeholder(self):
        pass


if __name__ == '__main__':
    unittest.main()