Add a container stats collector and unit tests, and refactor some of the stats code. Both the swift and rackswift conf files will need changes before this can be released. Please DO NOT approve this branch for merge until glange's stats work is all ready to go. Thanks.

David Goetz 2011-05-18 15:48:17 +00:00 committed by Tarmac
commit bdfe38917d
13 changed files with 466 additions and 391 deletions

View File

@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.stats.account_stats import AccountStat
from swift.stats.db_stats_collector import AccountStatsCollector
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options()
# currently AccountStat only supports run_once
# currently AccountStatsCollector only supports run_once
options['once'] = True
run_daemon(AccountStat, conf_file, section_name='log-processor-stats',
run_daemon(AccountStatsCollector, conf_file,
section_name='log-processor-stats',
log_name="account-stats", **options)
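For context, a minimal sketch of what this entry point boils down to when `once` is forced on (assuming the conf path and section layout from the SAIO docs below; readconf is used here only for illustration):

    from swift.common.utils import readconf
    from swift.stats.db_stats_collector import AccountStatsCollector

    # Load the [log-processor-stats] section and run the collector a single
    # time, which is what run_daemon does when options['once'] is True.
    conf = readconf('/etc/swift/log-processor.conf',
                    'log-processor-stats', log_name='account-stats')
    AccountStatsCollector(conf).run_once()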

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,17 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
from swift.stats import account_stats
class TestAccountStats(unittest.TestCase):
def test_placeholder(self):
pass
from swift.stats.db_stats_collector import ContainerStatsCollector
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
unittest.main()
conf_file, options = parse_options()
# currently ContainerStatsCollector only supports run_once
options['once'] = True
run_daemon(ContainerStatsCollector, conf_file,
section_name='log-processor-container-stats',
log_name="container-stats", **options)

View File

@@ -102,7 +102,6 @@ Running the stats system on SAIO
local1.* ~
#. Edit /etc/rsyslog.conf and make the following change::
$PrivDropToGroup adm
#. `mkdir -p /var/log/swift/hourly`
@@ -113,50 +112,75 @@ Running the stats system on SAIO
#. Relogin to let the group change take effect.
#. Create `/etc/swift/log-processor.conf`::
[log-processor]
swift_account = <your-stats-account-hash>
user = <your-user-name>
[log-processor]
swift_account = <your-stats-account-hash>
user = <your-user-name>
[log-processor-access]
swift_account = <your-stats-account-hash>
container_name = log_data
log_dir = /var/log/swift/hourly/
source_filename_format = %Y%m%d%H
class_path = swift.stats.access_processor.AccessLogProcessor
user = <your-user-name>
[log-processor-access]
swift_account = <your-stats-account-hash>
container_name = log_data
log_dir = /var/log/swift/hourly/
source_filename_pattern = ^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
class_path = swift.stats.access_processor.AccessLogProcessor
user = <your-user-name>
[log-processor-stats]
swift_account = <your-stats-account-hash>
container_name = account_stats
log_dir = /var/log/swift/stats/
source_filename_format = %Y%m%d%H_*
class_path = swift.stats.stats_processor.StatsLogProcessor
account_server_conf = /etc/swift/account-server/1.conf
user = <your-user-name>
[log-processor-stats]
swift_account = <your-stats-account-hash>
container_name = account_stats
log_dir = /var/log/swift/stats/
class_path = swift.stats.stats_processor.StatsLogProcessor
devices = /srv/1/node
mount_check = false
user = <your-user-name>
[log-processor-container-stats]
swift_account = <your-stats-account-hash>
container_name = container_stats
log_dir = /var/log/swift/stats/
class_path = swift.stats.stats_processor.StatsLogProcessor
processable = false
devices = /srv/1/node
mount_check = false
user = <your-user-name>
#. Add the following under [app:proxy-server] in `/etc/swift/proxy-server.conf`::
log_facility = LOG_LOCAL1
log_facility = LOG_LOCAL1
#. Create a `cron` job to run once per hour to create the account stats logs. In
`/etc/cron.d/swift-stats-log-creator`::
0 * * * * <your-user-name> swift-account-stats-logger /etc/swift/log-processor.conf
0 * * * * <your-user-name> /usr/local/bin/swift-account-stats-logger /etc/swift/log-processor.conf
#. Create a `cron` job to run once per hour to create the container stats logs. In
`/etc/cron.d/swift-container-stats-log-creator`::
5 * * * * <your-user-name> /usr/local/bin/swift-container-stats-logger /etc/swift/log-processor.conf
#. Create a `cron` job to run once per hour to upload the stats logs. In
`/etc/cron.d/swift-stats-log-uploader`::
10 * * * * <your-user-name> swift-log-uploader /etc/swift/log-processor.conf stats
10 * * * * <your-user-name> /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf stats
#. Create a `cron` job to run once per hour to upload the container stats logs. In
`/etc/cron.d/swift-container-stats-log-uploader`::
15 * * * * <your-user-name> /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf container-stats
#. Create a `cron` job to run once per hour to upload the access logs. In
`/etc/cron.d/swift-access-log-uploader`::
5 * * * * <your-user-name> swift-log-uploader /etc/swift/log-processor.conf access
5 * * * * <your-user-name> /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf access
#. Create a `cron` job to run once per hour to process the logs. In
`/etc/cron.d/swift-stats-processor`::
30 * * * * <your-user-name> swift-log-stats-collector /etc/swift/log-processor.conf
30 * * * * <your-user-name> /usr/local/bin/swift-log-stats-collector /etc/swift/log-processor.conf
After running for a few hours, you should start to see .csv files in the
log_processing_data container in the swift stats account that was created
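A quick sanity check on the `source_filename_pattern` used above (a sketch; the timestamp is illustrative). Per the log_uploader change below, patterns are evaluated with re.VERBOSE, so the line breaks inside the pattern are ignored:

    import re

    pattern = r'''^
    (?P<year>[0-9]{4})
    (?P<month>[0-1][0-9])
    (?P<day>[0-3][0-9])
    (?P<hour>[0-2][0-9])
    .*$'''

    # An hourly log created at 2011-05-18 15:00 is named 2011051815.
    match = re.match(pattern, '2011051815', re.VERBOSE)
    assert match.groupdict() == {'year': '2011', 'month': '05',
                                 'day': '18', 'hour': '15'}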

View File

@@ -14,7 +14,12 @@ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_pattern = access-%Y%m%d%H
source_filename_pattern = ^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.access_processor.AccessLogProcessor
@@ -31,9 +36,21 @@ class_path = swift.stats.access_processor.AccessLogProcessor
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
source_filename_pattern = stats-%Y%m%d%H_.*
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
# account_server_conf = /etc/swift/account-server.conf
# devices = /srv/node
# mount_check = true
# user = swift
[log-processor-container-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = container_stats
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
processable = false
# devices = /srv/node
# mount_check = true
# user = swift

View File

@@ -95,6 +95,7 @@ setup(
'bin/swift-log-uploader',
'bin/swift-log-stats-collector',
'bin/swift-account-stats-logger',
'bin/swift-container-stats-logger',
'bin/swauth-add-account', 'bin/swauth-add-user',
'bin/swauth-cleanup-tokens', 'bin/swauth-delete-account',
'bin/swauth-delete-user', 'bin/swauth-list', 'bin/swauth-prep',

View File

@@ -883,7 +883,7 @@ class ContainerBroker(DatabaseBroker):
"""
Get global data for the container.
:returns: a tuple of (account, container, created_at, put_timestamp,
:returns: a sqlite3.Row of (account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id)
@@ -1394,7 +1394,7 @@ class AccountBroker(DatabaseBroker):
"""
Get global data for the account.
:returns: a tuple of (account, created_at, put_timestamp,
:returns: a sqlite3.Row of (account, created_at, put_timestamp,
delete_timestamp, container_count, object_count,
bytes_used, hash, id)
"""

View File

@@ -1,112 +0,0 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from paste.deploy import appconfig
import shutil
import hashlib
from swift.account.server import DATADIR as account_server_data_dir
from swift.common.db import AccountBroker
from swift.common.utils import renamer, get_logger, readconf, mkdirs
from swift.common.constraints import check_mount
from swift.common.daemon import Daemon
class AccountStat(Daemon):
"""
Extract storage stats from account databases on the account
storage nodes
"""
def __init__(self, stats_conf):
super(AccountStat, self).__init__(stats_conf)
target_dir = stats_conf.get('log_dir', '/var/log/swift')
account_server_conf_loc = stats_conf.get('account_server_conf',
'/etc/swift/account-server.conf')
server_conf = appconfig('config:%s' % account_server_conf_loc,
name='account-server')
filename_format = stats_conf['source_filename_format']
if filename_format.count('*') > 1:
raise Exception('source filename format should have at max one *')
self.filename_format = filename_format
self.target_dir = target_dir
mkdirs(self.target_dir)
self.devices = server_conf.get('devices', '/srv/node')
self.mount_check = server_conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.logger = \
get_logger(stats_conf, log_route='account-stats')
def run_once(self, *args, **kwargs):
self.logger.info(_("Gathering account stats"))
start = time.time()
self.find_and_process()
self.logger.info(
_("Gathering account stats complete (%0.2f minutes)") %
((time.time() - start) / 60))
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
working_dir = os.path.join(self.target_dir, '.stats_tmp')
shutil.rmtree(working_dir, ignore_errors=True)
mkdirs(working_dir)
tmp_filename = os.path.join(working_dir, src_filename)
hasher = hashlib.md5()
with open(tmp_filename, 'wb') as statfile:
# csv has the following columns:
# Account Name, Container Count, Object Count, Bytes Used
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices, device):
self.logger.error(
_("Device %s is not mounted, skipping.") % device)
continue
accounts = os.path.join(self.devices,
device,
account_server_data_dir)
if not os.path.exists(accounts):
self.logger.debug(_("Path %s does not exist, skipping.") %
accounts)
continue
for root, dirs, files in os.walk(accounts, topdown=False):
for filename in files:
if filename.endswith('.db'):
db_path = os.path.join(root, filename)
broker = AccountBroker(db_path)
if not broker.is_deleted():
(account_name,
_junk, _junk, _junk,
container_count,
object_count,
bytes_used,
_junk, _junk) = broker.get_info()
line_data = '"%s",%d,%d,%d\n' % (
account_name, container_count,
object_count, bytes_used)
statfile.write(line_data)
hasher.update(line_data)
file_hash = hasher.hexdigest()
hash_index = src_filename.find('*')
if hash_index < 0:
# if there is no * in the target filename, the uploader probably
# won't work because we are crafting a filename that doesn't
# fit the pattern
src_filename = '_'.join([src_filename, file_hash])
else:
parts = src_filename[:hash_index], src_filename[hash_index + 1:]
src_filename = ''.join([parts[0], file_hash, parts[1]])
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
shutil.rmtree(working_dir, ignore_errors=True)

View File

@@ -0,0 +1,152 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from paste.deploy import appconfig
import shutil
import hashlib
import urllib
from swift.account.server import DATADIR as account_server_data_dir
from swift.container.server import DATADIR as container_server_data_dir
from swift.common.db import AccountBroker, ContainerBroker
from swift.common.utils import renamer, get_logger, readconf, mkdirs, \
TRUE_VALUES, remove_file
from swift.common.constraints import check_mount
from swift.common.daemon import Daemon
class DatabaseStatsCollector(Daemon):
"""
Extract storage stats from account or container databases on the
storage nodes.
Subclasses must define the method get_data.
"""
def __init__(self, stats_conf, stats_type, data_dir, filename_format):
super(DatabaseStatsCollector, self).__init__(stats_conf)
self.stats_type = stats_type
self.data_dir = data_dir
self.filename_format = filename_format
self.devices = stats_conf.get('devices', '/srv/node')
self.mount_check = stats_conf.get('mount_check',
'true').lower() in TRUE_VALUES
self.target_dir = stats_conf.get('log_dir', '/var/log/swift')
mkdirs(self.target_dir)
self.logger = get_logger(stats_conf,
log_route='%s-stats' % stats_type)
def run_once(self, *args, **kwargs):
self.logger.info(_("Gathering %s stats" % self.stats_type))
start = time.time()
self.find_and_process()
self.logger.info(_("Gathering %s stats complete (%0.2f minutes)") %
(self.stats_type, (time.time() - start) / 60))
def get_data(self):
raise NotImplementedError()
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
working_dir = os.path.join(self.target_dir,
'.%s-stats_tmp' % self.stats_type)
shutil.rmtree(working_dir, ignore_errors=True)
mkdirs(working_dir)
tmp_filename = os.path.join(working_dir, src_filename)
hasher = hashlib.md5()
try:
with open(tmp_filename, 'wb') as statfile:
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices,
device):
self.logger.error(
_("Device %s is not mounted, skipping.") % device)
continue
db_dir = os.path.join(self.devices, device, self.data_dir)
if not os.path.exists(db_dir):
self.logger.debug(
_("Path %s does not exist, skipping.") % db_dir)
continue
for root, dirs, files in os.walk(db_dir, topdown=False):
for filename in files:
if filename.endswith('.db'):
db_path = os.path.join(root, filename)
line_data = self.get_data(db_path)
if line_data:
statfile.write(line_data)
hasher.update(line_data)
src_filename += hasher.hexdigest()
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
finally:
shutil.rmtree(working_dir, ignore_errors=True)
class AccountStatsCollector(DatabaseStatsCollector):
"""
Extract storage stats from account databases on the account
storage nodes
"""
def __init__(self, stats_conf):
super(AccountStatsCollector, self).__init__(stats_conf, 'account',
account_server_data_dir,
'stats-%Y%m%d%H_')
def get_data(self, db_path):
"""
Data for generated csv has the following columns:
Account Hash, Container Count, Object Count, Bytes Used
"""
line_data = None
broker = AccountBroker(db_path)
if not broker.is_deleted():
info = broker.get_info()
line_data = '"%s",%d,%d,%d\n' % (info['account'],
info['container_count'],
info['object_count'],
info['bytes_used'])
return line_data
class ContainerStatsCollector(DatabaseStatsCollector):
"""
Extract storage stats from container databases on the container
storage nodes
"""
def __init__(self, stats_conf):
super(ContainerStatsCollector, self).__init__(stats_conf, 'container',
container_server_data_dir,
'container-stats-%Y%m%d%H_')
def get_data(self, db_path):
"""
Data for generated csv has the following columns:
Account Hash, Container Name, Object Count, Bytes Used
"""
line_data = None
broker = ContainerBroker(db_path)
if not broker.is_deleted():
info = broker.get_info()
encoded_container_name = urllib.quote(info['container'])
line_data = '"%s","%s",%d,%d\n' % (
info['account'],
encoded_container_name,
info['object_count'],
info['bytes_used'])
return line_data
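A minimal usage sketch mirroring the new unit tests further down (the paths and conf values are illustrative; mount_check is disabled so no real mount points are required):

    from swift.stats.db_stats_collector import ContainerStatsCollector

    conf = {'devices': '/srv/1/node',          # holds <device>/containers/*.db
            'log_dir': '/var/log/swift/stats',
            'mount_check': 'false'}
    collector = ContainerStatsCollector(conf)
    # Walks each device's containers dir, writes one CSV line per undeleted
    # container, then renames the file to container-stats-%Y%m%d%H_<md5>.
    collector.run_once()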

View File

@@ -27,13 +27,14 @@ import hashlib
from swift.common.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.utils import get_logger, readconf, TRUE_VALUES
from swift.common.daemon import Daemon
now = datetime.datetime.now
class BadFileDownload(Exception):
def __init__(self, status_code=None):
self.status_code = status_code
@@ -56,6 +57,9 @@ class LogProcessor(object):
for section in (x for x in conf if x.startswith(plugin_prefix)):
plugin_name = section[len(plugin_prefix):]
plugin_conf = conf.get(section, {})
if plugin_conf.get('processable', 'true').lower() not in \
TRUE_VALUES:
continue
self.plugins[plugin_name] = plugin_conf
class_path = self.plugins[plugin_name]['class_path']
import_target, class_name = class_path.rsplit('.', 1)
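To illustrate the gate this adds (a self-contained sketch; the section name and class path are examples, and TRUE_VALUES is abridged from swift.common.utils): a section marked `processable = false`, such as [log-processor-container-stats], still drives uploads but is never handed to the log processor:

    TRUE_VALUES = ('true', '1', 'yes', 'on', 't', 'y')   # abridged

    conf = {'log-processor-container-stats': {
                'processable': 'false',
                'class_path': 'swift.stats.stats_processor.StatsLogProcessor'}}

    plugins = {}
    for section in (s for s in conf if s.startswith('log-processor-')):
        plugin_conf = conf[section]
        if plugin_conf.get('processable', 'true').lower() not in TRUE_VALUES:
            continue   # upload-only sections are skipped by the processor
        plugins[section[len('log-processor-'):]] = plugin_conf

    assert plugins == {}   # container-stats never reaches the processing pass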

View File

@@ -19,6 +19,7 @@ import hashlib
import time
import gzip
import re
import sys
from paste.deploy import appconfig
from swift.common.internal_proxy import InternalProxy
@@ -40,6 +41,17 @@ class LogUploader(Daemon):
The given proxy server config is used to instantiate a proxy server for
the object uploads.
The default log file format is: plugin_name-%Y%m%d%H* . Any other format
of log file names must supply a regular expression that defines groups
for year, month, day, and hour. The regular expression will be evaluated
with re.VERBOSE. A common example may be:
source_filename_pattern = ^cdn_logger-
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
'''
def __init__(self, uploader_conf, plugin_name):
@@ -58,16 +70,14 @@
self.new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
self.unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
utils.TRUE_VALUES
# source_filename_format is deprecated
source_filename_format = uploader_conf.get('source_filename_format')
source_filename_pattern = uploader_conf.get('source_filename_pattern')
if source_filename_format and not source_filename_pattern:
self.logger.warning(_('source_filename_format is unreliable and '
'deprecated; use source_filename_pattern'))
self.pattern = self.convert_glob_to_regex(source_filename_format)
else:
self.pattern = source_filename_pattern or '%Y%m%d%H'
self.filename_pattern = uploader_conf.get('source_filename_pattern',
'''
^%s-
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$''' % plugin_name)
def run_once(self, *args, **kwargs):
self.logger.info(_("Uploading logs"))
@@ -76,44 +86,6 @@
self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
((time.time() - start) / 60))
def convert_glob_to_regex(self, glob):
"""
Make a best effort to support old style config globs
:param : old style config source_filename_format
:returns : new style config source_filename_pattern
"""
pattern = glob
pattern = pattern.replace('.', r'\.')
pattern = pattern.replace('*', r'.*')
pattern = pattern.replace('?', r'.?')
return pattern
def validate_filename_pattern(self):
"""
Validate source_filename_pattern
:returns : valid regex pattern based on source_filename_pattern with
group matches substituted for date fmt markers
"""
pattern = self.pattern
markers = {
'%Y': ('year', '(?P<year>[0-9]{4})'),
'%m': ('month', '(?P<month>[0-1][0-9])'),
'%d': ('day', '(?P<day>[0-3][0-9])'),
'%H': ('hour', '(?P<hour>[0-2][0-9])'),
}
for marker, (mtype, group) in markers.items():
if marker not in self.pattern:
self.logger.error(_('source_filename_pattern must contain a '
'marker %(marker)s to match the '
'%(mtype)s') % {'marker': marker,
'mtype': mtype})
return
pattern = pattern.replace(marker, group)
return pattern
def get_relpath_to_files_under_log_dir(self):
"""
Look under log_dir recursively and return all filenames as relpaths
@@ -125,7 +97,7 @@ class LogUploader(Daemon):
all_files.extend(os.path.join(path, f) for f in files)
return [os.path.relpath(f, start=self.log_dir) for f in all_files]
def filter_files(self, all_files, pattern):
def filter_files(self, all_files):
"""
Filter files based on regex pattern
@@ -137,15 +109,15 @@
filename2match = {}
found_match = False
for filename in all_files:
match = re.match(pattern, filename)
match = re.match(self.filename_pattern, filename, re.VERBOSE)
if match:
found_match = True
full_path = os.path.join(self.log_dir, filename)
filename2match[full_path] = match.groupdict()
else:
self.logger.debug(_('%(filename)s does not match '
'%(pattern)s') % {'filename': filename,
'pattern': pattern})
'%(pattern)s') % {'filename': filename,
'pattern': self.filename_pattern})
return filename2match
def upload_all_logs(self):
@@ -153,16 +125,13 @@
Match files under log_dir to source_filename_pattern and upload to
swift
"""
pattern = self.validate_filename_pattern()
if not pattern:
self.logger.error(_('Invalid filename_format'))
return
all_files = self.get_relpath_to_files_under_log_dir()
filename2match = self.filter_files(all_files, pattern)
filename2match = self.filter_files(all_files)
if not filename2match:
self.logger.info(_('No files in %(log_dir)s match %(pattern)s') %
{'log_dir': self.log_dir, 'pattern': pattern})
return
self.logger.error(_('No files in %(log_dir)s match %(pattern)s') %
{'log_dir': self.log_dir,
'pattern': self.filename_pattern})
sys.exit(1)
if not self.internal_proxy.create_container(self.swift_account,
self.container_name):
self.logger.error(_('Unable to create container for '
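To make the new default pattern concrete (a sketch; the plugin name and filename are illustrative): with plugin_name = 'access', files named access-YYYYMMDDHH... match and yield the date groups the uploader needs:

    import re

    plugin_name = 'access'
    default_pattern = '''
    ^%s-
    (?P<year>[0-9]{4})
    (?P<month>[0-1][0-9])
    (?P<day>[0-3][0-9])
    (?P<hour>[0-2][0-9])
    .*$''' % plugin_name

    match = re.match(default_pattern, 'access-2011051815.log.gz', re.VERBOSE)
    assert match and match.group('hour') == '15'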

View File

@@ -94,8 +94,8 @@ def temptree(files, contents=''):
class FakeLogger(object):
# a thread safe logger
def __init__(self):
self.log_dict = dict(error=[], info=[], warning=[])
def __init__(self, *args, **kwargs):
self.log_dict = dict(error=[], info=[], warning=[], debug=[])
def error(self, *args, **kwargs):
self.log_dict['error'].append((args, kwargs))
@@ -106,6 +106,9 @@ class FakeLogger(object):
def warning(self, *args, **kwargs):
self.log_dict['warning'].append((args, kwargs))
def debug(self, *args, **kwargs):
self.log_dict['debug'].append((args, kwargs))
class MockTrue(object):
"""

View File

@@ -0,0 +1,155 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import time
import uuid
from shutil import rmtree
from swift.stats import db_stats_collector
from tempfile import mkdtemp
from test.unit import FakeLogger
from swift.common.db import AccountBroker, ContainerBroker
from swift.common.utils import mkdirs
class TestDbStats(unittest.TestCase):
def setUp(self):
self._was_logger = db_stats_collector.get_logger
db_stats_collector.get_logger = FakeLogger
self.testdir = os.path.join(mkdtemp(), 'tmp_test_db_stats')
self.devices = os.path.join(self.testdir, 'node')
rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, 'sda'))
self.accounts = os.path.join(self.devices, 'sda', 'accounts')
self.containers = os.path.join(self.devices, 'sda', 'containers')
self.log_dir = '%s/log' % self.testdir
self.conf = dict(devices=self.devices,
log_dir=self.log_dir,
mount_check='false')
def tearDown(self):
db_stats_collector.get_logger = self._was_logger
rmtree(self.testdir)
def test_account_stat_get_data(self):
stat = db_stats_collector.AccountStatsCollector(self.conf)
account_db = AccountBroker("%s/acc.db" % self.accounts,
account='test_acc')
account_db.initialize()
account_db.put_container('test_container', time.time(),
None, 10, 1000)
info = stat.get_data("%s/acc.db" % self.accounts)
self.assertEquals('''"test_acc",1,10,1000\n''', info)
def test_container_stat_get_data(self):
stat = db_stats_collector.ContainerStatsCollector(self.conf)
container_db = ContainerBroker("%s/con.db" % self.containers,
account='test_acc', container='test_con')
container_db.initialize()
container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
info = stat.get_data("%s/con.db" % self.containers)
self.assertEquals('''"test_acc","test_con",1,10\n''', info)
def _gen_account_stat(self):
stat = db_stats_collector.AccountStatsCollector(self.conf)
output_data = set()
for i in range(10):
account_db = AccountBroker("%s/stats-201001010%s-%s.db" %
(self.accounts, i, uuid.uuid4().hex),
account='test_acc_%s' % i)
account_db.initialize()
account_db.put_container('test_container', time.time(),
None, 10, 1000)
# this will "commit" the data
account_db.get_info()
output_data.add('''"test_acc_%s",1,10,1000''' % i)
self.assertEqual(len(output_data), 10)
return stat, output_data
def _gen_container_stat(self):
stat = db_stats_collector.ContainerStatsCollector(self.conf)
output_data = set()
for i in range(10):
account_db = ContainerBroker(
"%s/container-stats-201001010%s-%s.db" % (self.containers, i,
uuid.uuid4().hex),
account='test_acc_%s' % i, container='test_con')
account_db.initialize()
account_db.put_object('test_obj', time.time(), 10, 'text',
'faketag')
# this will "commit" the data
account_db.get_info()
output_data.add('''"test_acc_%s","test_con",1,10''' % i)
self.assertEqual(len(output_data), 10)
return stat, output_data
def test_account_stat_run_once_account(self):
stat, output_data = self._gen_account_stat()
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_both(self):
acc_stat, acc_output_data = self._gen_account_stat()
con_stat, con_output_data = self._gen_container_stat()
acc_stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
for i in range(10):
data = stat_handle.readline()
acc_output_data.discard(data.strip())
self.assertEqual(len(acc_output_data), 0)
con_stat.run_once()
stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
for i in range(10):
data = stat_handle.readline()
con_output_data.discard(data.strip())
self.assertEqual(len(con_output_data), 0)
def test_account_stat_run_once_fail(self):
stat, output_data = self._gen_account_stat()
rmtree(self.accounts)
stat.run_once()
self.assertEquals(len(stat.logger.log_dict['debug']), 1)
def test_not_implemented(self):
db_stat = db_stats_collector.DatabaseStatsCollector(self.conf,
'account', 'test_dir', 'stats-%Y%m%d%H_')
self.assertRaises(Exception, db_stat.get_data)
def test_not_mounted(self):
self.conf['mount_check'] = 'true'
stat, output_data = self._gen_account_stat()
stat.run_once()
self.assertEquals(len(stat.logger.log_dict['error']), 1)
if __name__ == '__main__':
unittest.main()

View File

@@ -30,11 +30,18 @@ import logging
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
DEFAULT_GLOB = '%Y%m%d%H'
COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \
'\xf3\xf3\xad\x04\x00\x00\x00'
access_regex = '''
^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
'''
def mock_appconfig(*args, **kwargs):
pass
@@ -87,179 +94,27 @@ class TestLogUploader(unittest.TestCase):
log_uploader.appconfig = self._orig_appconfig
log_uploader.InternalProxy = self._orig_InternalProxy
def test_deprecated_glob_style_upload_all_logs(self):
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s%0.2d' % (today_str, i))
for ts in time_strs:
open(os.path.join(tmpdir, ts), 'w').close()
conf = {'log_dir': tmpdir}
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s%0.2d' %
(today_str, i)), d)
self.assertEquals(file_date, expected)
finally:
rmtree(tmpdir)
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s-%0.2d00' % (today_str, i))
for ts in time_strs:
open(os.path.join(tmpdir, 'swift-blah_98764.%s-2400.tar.gz' %
ts), 'w').close()
open(os.path.join(tmpdir, 'swift.blah_98764.%s-2400.tar.gz' % ts),
'w').close()
open(os.path.join(tmpdir, 'swift-blah_98764.%s-2400.tar.g' % ts),
'w').close()
open(os.path.join(tmpdir,
'swift-blah_201102160100.%s-2400.tar.gz' %
'201102160100'), 'w').close()
conf = {
'log_dir': '%s/' % tmpdir,
'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz',
}
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
filename, date_dict = file_date
filename = os.path.basename(filename)
self.assert_(today_str in filename, filename)
self.assert_(filename.startswith('swift'), filename)
self.assert_(filename.endswith('tar.gz'), filename)
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
self.assertEquals(d, date_dict)
finally:
rmtree(tmpdir)
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s%0.2d' % (today_str, i))
for i, ts in enumerate(time_strs):
open(os.path.join(tmpdir, '%s.%s.log' % (i, ts)), 'w').close()
conf = {
'log_dir': tmpdir,
'filename_format': '*.%Y%m%d%H.log',
}
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
fname_to_int = lambda x: int(os.path.basename(x[0]).split('.')[0])
numerically = lambda x, y: cmp(fname_to_int(x),
fname_to_int(y))
for i, file_date in enumerate(sorted(uploader.uploaded_files,
cmp=numerically)):
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' %
(i, today_str, i)), d)
self.assertEquals(file_date, expected)
finally:
rmtree(tmpdir)
def test_bad_pattern_in_config(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
# invalid pattern
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%h'} # should be %H
conf = {'log_dir': t,
'source_filename_pattern': '%Y%m%d%h'} # should be %H
uploader = MockLogUploader(conf)
self.assertFalse(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(uploader.uploaded_files, [])
self.assertRaises(SystemExit, uploader.upload_all_logs)
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%H'}
conf = {'log_dir': t, 'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
self.assert_(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 1)
# deprecated warning on source_filename_format
class MockLogger():
def __init__(self):
self.msgs = defaultdict(list)
def log(self, level, msg):
self.msgs[level].append(msg)
def __getattr__(self, attr):
return partial(self.log, attr)
logger = MockLogger.logger = MockLogger()
def mock_get_logger(*args, **kwargs):
return MockLogger.logger
_orig_get_logger = log_uploader.utils.get_logger
try:
log_uploader.utils.get_logger = mock_get_logger
conf = {'source_filename_format': '%Y%m%d%H'}
uploader = MockLogUploader(conf, logger=logger)
self.assert_([m for m in logger.msgs['warning']
if 'deprecated' in m])
finally:
log_uploader.utils.get_logger = _orig_get_logger
# convert source_filename_format to regex
conf = {'source_filename_format': 'pattern-*.%Y%m%d%H.*.gz'}
uploader = MockLogUploader(conf)
expected = r'pattern-.*\.%Y%m%d%H\..*\.gz'
self.assertEquals(uploader.pattern, expected)
# use source_filename_pattern if we have the choice!
conf = {
'source_filename_format': 'bad',
'source_filename_pattern': 'good',
}
uploader = MockLogUploader(conf)
self.assertEquals(uploader.pattern, 'good')
def test_pattern_upload_all_logs(self):
# test empty dir
with temptree([]) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
self.assertRaises(SystemExit, uploader.run_once)
def get_random_length_str(max_len=10, chars=string.ascii_letters):
return ''.join(random.choice(chars) for x in
@@ -267,8 +122,12 @@ class TestLogUploader(unittest.TestCase):
template = 'prefix_%(random)s_%(digits)s.blah.' \
'%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz'
pattern = r'prefix_.*_[0-9]+\.blah\.%Y%m%d%H00-[0-9]{2}00' \
'-[0-9]?[0-9]\.gz'
pattern = '''prefix_.*_[0-9]+\.blah\.
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])00-[0-9]{2}00
-[0-9]?[0-9]\.gz'''
files_that_should_match = []
# add some files that match
for i in range(24):
@@ -313,25 +172,28 @@ class TestLogUploader(unittest.TestCase):
def test_log_cutoff(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
conf = {'log_dir': t, 'new_log_cutoff': '7200'}
conf = {'log_dir': t, 'new_log_cutoff': '7200',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
conf = {'log_dir': t, 'new_log_cutoff': '0'}
conf = {'log_dir': t, 'new_log_cutoff': '0',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
def test_create_container_fail(self):
files = [datetime.now().strftime('%Y%m%d%H')]
conf = {'source_filename_pattern': access_regex}
with temptree(files) as t:
conf = {'log_dir': t}
conf['log_dir'] = t
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
with temptree(files) as t:
conf = {'log_dir': t}
conf['log_dir'] = t
uploader = MockLogUploader(conf)
# mock create_container to fail
uploader.internal_proxy.create_container = lambda *args: False
@@ -341,14 +203,16 @@ class TestLogUploader(unittest.TestCase):
def test_unlink_log(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'false'}
conf = {'log_dir': t, 'unlink_log': 'false',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
conf = {'log_dir': t, 'unlink_log': 'true'}
conf = {'log_dir': t, 'unlink_log': 'true',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
@@ -356,9 +220,10 @@
self.assertEquals(len(os.listdir(t)), 0)
def test_upload_file_failed(self):
files = [datetime.now().strftime('%Y%m%d%H')]
files = ['plugin-%s' % datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'true'}
conf = {'log_dir': t, 'unlink_log': 'true',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
# mock upload_file to fail, and clean up mock
@@ -366,12 +231,10 @@
uploader.uploaded_files.pop()
return False
uploader.internal_proxy.upload_file = mock_upload_file
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
self.assertRaises(SystemExit, uploader.run_once)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
if __name__ == '__main__':
unittest.main()