diff --git a/AUTHORS b/AUTHORS
index f6287945c8..ea0e8ce34f 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -15,6 +15,7 @@ Chuck Thier
 
 Contributors
 ------------
+Joe Arnold
 Chmouel Boudjnah
 Anne Gentle
 Clay Gerrard
diff --git a/bin/swift-account-stats-logger b/bin/swift-account-stats-logger
deleted file mode 100755
index b018ab55ab..0000000000
--- a/bin/swift-account-stats-logger
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from swift.stats.db_stats_collector import AccountStatsCollector
-from swift.common.utils import parse_options
-from swift.common.daemon import run_daemon
-
-if __name__ == '__main__':
-    conf_file, options = parse_options()
-    # currently AccountStatsCollector only supports run_once
-    options['once'] = True
-    run_daemon(AccountStatsCollector, conf_file,
-               section_name='log-processor-stats',
-               log_name="account-stats", **options)
diff --git a/bin/swift-container-stats-logger b/bin/swift-container-stats-logger
deleted file mode 100755
index 3b93c20cd4..0000000000
--- a/bin/swift-container-stats-logger
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from swift.stats.db_stats_collector import ContainerStatsCollector
-from swift.common.utils import parse_options
-from swift.common.daemon import run_daemon
-
-if __name__ == '__main__':
-    conf_file, options = parse_options()
-    # currently ContainerStatsCollector only supports run_once
-    options['once'] = True
-    run_daemon(ContainerStatsCollector, conf_file,
-               section_name='log-processor-container-stats',
-               log_name="container-stats", **options)
diff --git a/bin/swift-log-stats-collector b/bin/swift-log-stats-collector
deleted file mode 100755
index f2d5011ec2..0000000000
--- a/bin/swift-log-stats-collector
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from optparse import OptionParser
-
-from swift.stats.log_processor import LogProcessorDaemon
-from swift.common.utils import parse_options
-from swift.common.daemon import run_daemon
-
-if __name__ == '__main__':
-    parser = OptionParser(usage='Usage: %prog [options] ')
-
-    parser.add_option('--lookback_hours', type='int', dest='lookback_hours',
-                      help='Hours in the past to start looking for log files')
-    parser.add_option('--lookback_window', type='int', dest='lookback_window',
-                      help='Hours past lookback_hours to stop looking for log files')
-
-    conf_file, options = parse_options(parser)
-    # currently the LogProcessorDaemon only supports run_once
-    options['once'] = True
-    run_daemon(LogProcessorDaemon, conf_file, section_name=None,
-               log_name='log-stats-collector', **options)
diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader
deleted file mode 100755
index 3639dfffb4..0000000000
--- a/bin/swift-log-uploader
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-from optparse import OptionParser
-from swift.stats.log_uploader import LogUploader
-from swift.common.utils import parse_options
-from swift.common import utils
-
-if __name__ == '__main__':
-    parser = OptionParser("Usage: %prog CONFIG_FILE PLUGIN")
-    parser.add_option('-c', '--log_cutoff',
-                      help='Override new_log_cutoff.')
-    parser.add_option('-x', '--regex',
-                      help='Override source_filename_pattern regex.')
-    conf_file, options = parse_options(parser=parser)
-    try:
-        plugin = options['extra_args'][0]
-    except (IndexError, KeyError):
-        print "Error: missing plugin name"
-        sys.exit(1)
-
-    uploader_conf = utils.readconf(conf_file, 'log-processor')
-    section_name = 'log-processor-%s' % plugin
-    plugin_conf = utils.readconf(conf_file, section_name)
-    uploader_conf.update(plugin_conf)
-
-    # pre-configure logger
-    logger = utils.get_logger(uploader_conf, log_route='log-uploader',
-                              log_to_console=options.get('verbose', False))
-    # currently LogUploader only supports run_once
-    options['once'] = True
-    regex = options.get('regex')
-    cutoff = options.get('log_cutoff')
-    uploader = LogUploader(uploader_conf, plugin,
-                           regex=regex, cutoff=cutoff).run(**options)
diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst
index 255d478096..642bdcc670 100644
--- a/doc/source/development_saio.rst
+++ b/doc/source/development_saio.rst
@@ -630,7 +630,6 @@ Setting up scripts for running Swift
 #. `remakerings`
 #. `cd ~/swift/trunk; ./.unittests`
 #. `startmain` (The ``Unable to increase file descriptor limit. Running as non-root?`` warnings are expected and ok.)
- #. `recreateaccounts`
 #. Get an `X-Storage-Url` and `X-Auth-Token`: ``curl -v -H 'X-Storage-User: test:tester' -H 'X-Storage-Pass: testing' http://127.0.0.1:8080/auth/v1.0``
 #. Check that you can GET account: ``curl -v -H 'X-Auth-Token: ' ``
 #. 
Check that `swift` works: `swift -A http://127.0.0.1:8080/auth/v1.0 -U test:tester -K testing stat` diff --git a/doc/source/index.rst b/doc/source/index.rst index 67ef32ff1a..5a7cc228e5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -42,7 +42,6 @@ Overview and Concepts overview_reaper overview_auth overview_replication - overview_stats ratelimit overview_large_objects overview_container_sync diff --git a/doc/source/overview_stats.rst b/doc/source/overview_stats.rst deleted file mode 100644 index 3043b57ece..0000000000 --- a/doc/source/overview_stats.rst +++ /dev/null @@ -1,192 +0,0 @@ -================== -Swift stats system -================== - -The swift stats system is composed of three parts parts: log creation, log -uploading, and log processing. The system handles two types of logs (access -and account stats), but it can be extended to handle other types of logs. - ---------- -Log Types ---------- - -*********** -Access logs -*********** - -Access logs are the proxy server logs. Rackspace uses syslog-ng to redirect -the proxy log output to an hourly log file. For example, a proxy request that -is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412. -This allows easy log rotation and easy per-hour log processing. - -********************************* -Account / Container DB stats logs -********************************* - -DB stats logs are generated by a stats system process. -swift-account-stats-logger runs on each account server (via cron) and walks -the filesystem looking for account databases. When an account database is -found, the logger selects the account hash, bytes_used, container_count, and -object_count. These values are then written out as one line in a csv file. One -csv file is produced for every run of swift-account-stats-logger. This means -that, system wide, one csv file is produced for every storage node. Rackspace -runs the account stats logger every hour. Therefore, in a cluster of ten -account servers, ten csv files are produced every hour. Also, every account -will have one entry for every replica in the system. On average, there will be -three copies of each account in the aggregate of all account stat csv files -created in one system-wide run. The swift-container-stats-logger runs in a -similar fashion, scanning the container dbs. - ----------------------- -Log Processing plugins ----------------------- - -The swift stats system is written to allow a plugin to be defined for every -log type. Swift includes plugins for both access logs and storage stats logs. -Each plugin is responsible for defining, in a config section, where the logs -are stored on disk, where the logs will be stored in swift (account and -container), the filename format of the logs on disk, the location of the -plugin class definition, and any plugin-specific config values. - -The plugin class definition defines three methods. The constructor must accept -one argument (the dict representation of the plugin's config section). The -process method must accept an iterator, and the account, container, and object -name of the log. The keylist_mapping accepts no parameters. - -------------- -Log Uploading -------------- - -swift-log-uploader accepts a config file and a plugin name. It finds the log -files on disk according to the plugin config section and uploads them to the -swift cluster. This means one uploader process will run on each proxy server -node and each account server node. 
To not upload partially-written log files, -the uploader will not upload files with an mtime of less than two hours ago. -Rackspace runs this process once an hour via cron. - --------------- -Log Processing --------------- - -swift-log-stats-collector accepts a config file and generates a csv that is -uploaded to swift. It loads all plugins defined in the config file, generates -a list of all log files in swift that need to be processed, and passes an -iterable of the log file data to the appropriate plugin's process method. The -process method returns a dictionary of data in the log file keyed on (account, -year, month, day, hour). The log-stats-collector process then combines all -dictionaries from all calls to a process method into one dictionary. Key -collisions within each (account, year, month, day, hour) dictionary are -summed. Finally, the summed dictionary is mapped to the final csv values with -each plugin's keylist_mapping method. - -The resulting csv file has one line per (account, year, month, day, hour) for -all log files processed in that run of swift-log-stats-collector. - - -================================ -Running the stats system on SAIO -================================ - -#. Create a swift account to use for storing stats information, and note the - account hash. The hash will be used in config files. - -#. Edit /etc/rsyslog.d/10-swift.conf:: - - # Uncomment the following to have a log containing all logs together - #local1,local2,local3,local4,local5.* /var/log/swift/all.log - - $template HourlyProxyLog,"/var/log/swift/hourly/%$YEAR%%$MONTH%%$DAY%%$HOUR%" - local1.*;local1.!notice ?HourlyProxyLog - - local1.*;local1.!notice /var/log/swift/proxy.log - local1.notice /var/log/swift/proxy.error - local1.* ~ - -#. Edit /etc/rsyslog.conf and make the following change:: - $PrivDropToGroup adm - -#. `mkdir -p /var/log/swift/hourly` -#. `chown -R syslog.adm /var/log/swift` -#. `chmod 775 /var/log/swift /var/log/swift/hourly` -#. `service rsyslog restart` -#. `usermod -a -G adm ` -#. Relogin to let the group change take effect. -#. Create `/etc/swift/log-processor.conf`:: - - [log-processor] - swift_account = - user = - - [log-processor-access] - swift_account = - container_name = log_data - log_dir = /var/log/swift/hourly/ - source_filename_pattern = ^ - (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9]) - .*$ - class_path = swift.stats.access_processor.AccessLogProcessor - user = - - [log-processor-stats] - swift_account = - container_name = account_stats - log_dir = /var/log/swift/stats/ - class_path = swift.stats.stats_processor.StatsLogProcessor - devices = /srv/1/node - mount_check = false - user = - - [log-processor-container-stats] - swift_account = - container_name = container_stats - log_dir = /var/log/swift/stats/ - class_path = swift.stats.stats_processor.StatsLogProcessor - processable = false - devices = /srv/1/node - mount_check = false - user = - -#. Add the following under [app:proxy-server] in `/etc/swift/proxy-server.conf`:: - - log_facility = LOG_LOCAL1 - -#. Create a `cron` job to run once per hour to create the stats logs. In - `/etc/cron.d/swift-stats-log-creator`:: - - 0 * * * * /usr/local/bin/swift-account-stats-logger /etc/swift/log-processor.conf - -#. Create a `cron` job to run once per hour to create the container stats logs. In - `/etc/cron.d/swift-container-stats-log-creator`:: - - 5 * * * * /usr/local/bin/swift-container-stats-logger /etc/swift/log-processor.conf - -#. 
Create a `cron` job to run once per hour to upload the stats logs. In - `/etc/cron.d/swift-stats-log-uploader`:: - - 10 * * * * /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf stats - -#. Create a `cron` job to run once per hour to upload the stats logs. In - `/etc/cron.d/swift-stats-log-uploader`:: - - 15 * * * * /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf container-stats - -#. Create a `cron` job to run once per hour to upload the access logs. In - `/etc/cron.d/swift-access-log-uploader`:: - - 5 * * * * /usr/local/bin/swift-log-uploader /etc/swift/log-processor.conf access - -#. Create a `cron` job to run once per hour to process the logs. In - `/etc/cron.d/swift-stats-processor`:: - - 30 * * * * /usr/local/bin/swift-log-stats-collector /etc/swift/log-processor.conf - -After running for a few hours, you should start to see .csv files in the -log_processing_data container in the swift stats account that was created -earlier. This file will have one entry per account per hour for each account -with activity in that hour. One .csv file should be produced per hour. Note -that the stats will be delayed by at least two hours by default. This can be -changed with the new_log_cutoff variable in the config file. See -`log-processor.conf-sample` for more details. diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample index 1fac948619..34ba714b72 100644 --- a/etc/account-server.conf-sample +++ b/etc/account-server.conf-sample @@ -44,17 +44,6 @@ use = egg:swift#account # The replicator also performs reclamation # reclaim_age = 86400 -[account-stats] -# You can override the default log routing for this app here (don't use set!): -# log_name = account-stats -# log_facility = LOG_LOCAL0 -# log_level = INFO -# cf_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 -# container_name = account_stats -# proxy_server_conf = /etc/swift/proxy-server.conf -# log_facility = LOG_LOCAL0 -# log_level = INFO - [account-auditor] # You can override the default log routing for this app here (don't use set!): # log_name = account-auditor diff --git a/etc/log-processor.conf-sample b/etc/log-processor.conf-sample deleted file mode 100644 index 350ae73010..0000000000 --- a/etc/log-processor.conf-sample +++ /dev/null @@ -1,57 +0,0 @@ -# plugin section format is named "log-processor-" - -[log-processor] -swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 -# container_name = log_processing_data -# proxy_server_conf = /etc/swift/proxy-server.conf -# log_facility = LOG_LOCAL0 -# log_level = INFO -# lookback_hours = 120 -# lookback_window = 120 -# user = swift - -[log-processor-access] -# log_dir = /var/log/swift/ -swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 -container_name = log_data -source_filename_pattern = ^ - (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9]) - .*$ -# new_log_cutoff = 7200 -# unlink_log = True -class_path = swift.stats.access_processor.AccessLogProcessor -# service ips is for client ip addresses that should be counted as servicenet -# service_ips = -# load balancer private ips is for load balancer ip addresses that should be -# counted as servicenet -# lb_private_ips = -# server_name = proxy-server -# user = swift -# warn_percent = 0.8 - -[log-processor-stats] -# log_dir = /var/log/swift/ -swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 -container_name = account_stats -# new_log_cutoff = 7200 -# unlink_log = True -class_path = swift.stats.stats_processor.StatsLogProcessor -# devices = /srv/node -# 
mount_check = true -# user = swift - -[log-processor-container-stats] -# log_dir = /var/log/swift/ -swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 -container_name = container_stats -# new_log_cutoff = 7200 -# unlink_log = True -class_path = swift.stats.stats_processor.StatsLogProcessor -processable = false -# devices = /srv/node -# mount_check = true -# user = swift -# metadata_keys = comma separated list of user metadata keys to be collected diff --git a/setup.py b/setup.py index 9331f778ca..bb40f81563 100644 --- a/setup.py +++ b/setup.py @@ -92,10 +92,6 @@ setup( 'bin/swift-stats-report', 'bin/swift-dispersion-populate', 'bin/swift-dispersion-report', 'bin/swift-bench', - 'bin/swift-log-uploader', - 'bin/swift-log-stats-collector', - 'bin/swift-account-stats-logger', - 'bin/swift-container-stats-logger', ], entry_points={ 'paste.app_factory': [ diff --git a/swift/account/server.py b/swift/account/server.py index fd839a4909..ee55c0556b 100644 --- a/swift/account/server.py +++ b/swift/account/server.py @@ -29,7 +29,7 @@ import simplejson from swift.common.db import AccountBroker from swift.common.utils import get_logger, get_param, hash_path, \ - normalize_timestamp, split_path, storage_directory + normalize_timestamp, split_path, storage_directory, XML_EXTRA_ENTITIES from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \ check_mount, check_float, check_utf8 from swift.common.db_replicator import ReplicatorRpc @@ -79,6 +79,9 @@ class AccountController(object): try: drive, part, account, container = split_path(unquote(req.path), 3, 4) + if (account and not check_utf8(account)) or \ + (container and not check_utf8(container)): + raise ValueError('NULL characters not allowed in names') except ValueError, err: return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) @@ -201,8 +204,8 @@ class AccountController(object): marker = get_param(req, 'marker', '') end_marker = get_param(req, 'end_marker') query_format = get_param(req, 'format') - except UnicodeDecodeError, err: - return HTTPBadRequest(body='parameters not utf8', + except (UnicodeDecodeError, ValueError), err: + return HTTPBadRequest(body='parameters not utf8 or contain NULLs', content_type='text/plain', request=req) if query_format: req.accept = 'application/%s' % query_format.lower() @@ -228,7 +231,7 @@ class AccountController(object): output_list = ['', '' % account] for (name, object_count, bytes_used, is_subdir) in account_list: - name = saxutils.escape(name) + name = saxutils.escape(name, XML_EXTRA_ENTITIES) if is_subdir: output_list.append('' % name) else: diff --git a/swift/common/compressing_file_reader.py b/swift/common/compressing_file_reader.py deleted file mode 100644 index c581bddadd..0000000000 --- a/swift/common/compressing_file_reader.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import zlib -import struct - - -class CompressingFileReader(object): - ''' - Wraps a file object and provides a read method that returns gzip'd data. - - One warning: if read is called with a small value, the data returned may - be bigger than the value. In this case, the "compressed" data will be - bigger than the original data. To solve this, use a bigger read buffer. - - An example use case: - Given an uncompressed file on disk, provide a way to read compressed data - without buffering the entire file data in memory. Using this class, an - uncompressed log file could be uploaded as compressed data with chunked - transfer encoding. - - gzip header and footer code taken from the python stdlib gzip module - - :param file_obj: File object to read from - :param compresslevel: compression level - ''' - - def __init__(self, file_obj, compresslevel=9): - self._f = file_obj - self._compressor = zlib.compressobj(compresslevel, - zlib.DEFLATED, - -zlib.MAX_WBITS, - zlib.DEF_MEM_LEVEL, - 0) - self.done = False - self.first = True - self.crc32 = 0 - self.total_size = 0 - - def read(self, *a, **kw): - if self.done: - return '' - x = self._f.read(*a, **kw) - if x: - self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL - self.total_size += len(x) - compressed = self._compressor.compress(x) - if not compressed: - compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH) - else: - compressed = self._compressor.flush(zlib.Z_FINISH) - crc32 = struct.pack(" 299) \ - and tries < self.retries: - req_copy = webob_request_copy(req, source_file=source_file, - compress=compress) - resp = self.upload_app.handle_request(req_copy) - tries += 1 - return resp - - def upload_file(self, source_file, account, container, object_name, - compress=True, content_type='application/x-gzip', - etag=None): - """ - Upload a file to cloud files. - - :param source_file: path to or file like object to upload - :param account: account to upload to - :param container: container to upload to - :param object_name: name of object being uploaded - :param compress: if True, compresses object as it is uploaded - :param content_type: content-type of object - :param etag: etag for object to check successful upload - :returns: True if successful, False otherwise - """ - target_name = '/v1/%s/%s/%s' % (account, container, object_name) - - # create the container - if not self.create_container(account, container): - return False - - # upload the file to the account - req = webob.Request.blank(target_name, content_type=content_type, - environ={'REQUEST_METHOD': 'PUT'}, - headers={'Transfer-Encoding': 'chunked'}) - req.content_length = None # to make sure we send chunked data - if etag: - req.headers['etag'] = etag - resp = self._handle_request(req, source_file=source_file, - compress=compress) - if not (200 <= resp.status_int < 300): - return False - return True - - def get_object(self, account, container, object_name): - """ - Get object. - - :param account: account name object is in - :param container: container name object is in - :param object_name: name of object to get - :returns: iterator for object data - """ - req = webob.Request.blank('/v1/%s/%s/%s' % - (account, container, object_name), - environ={'REQUEST_METHOD': 'GET'}) - resp = self._handle_request(req) - return resp.status_int, resp.app_iter - - def create_container(self, account, container): - """ - Create container. 
- - :param account: account name to put the container in - :param container: container name to create - :returns: True if successful, otherwise False - """ - req = webob.Request.blank('/v1/%s/%s' % (account, container), - environ={'REQUEST_METHOD': 'PUT'}) - resp = self._handle_request(req) - return 200 <= resp.status_int < 300 - - def get_container_list(self, account, container, marker=None, - end_marker=None, limit=None, prefix=None, - delimiter=None, full_listing=True): - """ - Get a listing of objects for the container. - - :param account: account name for the container - :param container: container name to get a listing for - :param marker: marker query - :param end_marker: end marker query - :param limit: limit query - :param prefix: prefix query - :param delimeter: string to delimit the queries on - :param full_listing: if True, return a full listing, else returns a max - of 10000 listings - :returns: list of objects - """ - if full_listing: - rv = [] - listing = self.get_container_list(account, container, marker, - end_marker, limit, prefix, - delimiter, full_listing=False) - while listing: - rv.extend(listing) - if not delimiter: - marker = listing[-1]['name'] - else: - marker = listing[-1].get('name', listing[-1].get('subdir')) - listing = self.get_container_list(account, container, marker, - end_marker, limit, prefix, - delimiter, - full_listing=False) - return rv - path = '/v1/%s/%s' % (account, quote(container)) - qs = 'format=json' - if marker: - qs += '&marker=%s' % quote(marker) - if end_marker: - qs += '&end_marker=%s' % quote(end_marker) - if limit: - qs += '&limit=%d' % limit - if prefix: - qs += '&prefix=%s' % quote(prefix) - if delimiter: - qs += '&delimiter=%s' % quote(delimiter) - path += '?%s' % qs - req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) - resp = self._handle_request(req) - if resp.status_int < 200 or resp.status_int >= 300: - return [] # TODO: distinguish between 404 and empty container - if resp.status_int == 204: - return [] - return json_loads(resp.body) diff --git a/swift/common/middleware/tempauth.py b/swift/common/middleware/tempauth.py index df80b36af6..4977ed5ac2 100644 --- a/swift/common/middleware/tempauth.py +++ b/swift/common/middleware/tempauth.py @@ -89,7 +89,7 @@ class TempAuth(object): if ip == '0.0.0.0': ip = '127.0.0.1' url += ip - url += ':' + conf.get('bind_port', 80) + '/v1/' + \ + url += ':' + conf.get('bind_port', '8080') + '/v1/' + \ self.reseller_prefix + conf_key.split('_')[1] groups = values self.users[conf_key.split('_', 1)[1].replace('_', ':')] = { diff --git a/swift/common/utils.py b/swift/common/utils.py index ab97a44237..a896aa9e55 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -42,6 +42,7 @@ from eventlet import greenio, GreenPool, sleep, Timeout, listen from eventlet.green import socket, subprocess, ssl, thread, threading import netifaces +from swift.common.constraints import check_utf8 from swift.common.exceptions import LockTimeout, MessageTimeout # logging doesn't import patched as cleanly as one would like @@ -74,6 +75,8 @@ if hash_conf.read('/etc/swift/swift.conf'): # Used when reading config values TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y')) +# Used with xml.sax.saxutils.escape +XML_EXTRA_ENTITIES = dict((chr(x), '&#x%x;' % x) for x in xrange(1, 0x20)) def validate_configuration(): if HASH_PATH_SUFFIX == '': @@ -110,8 +113,8 @@ def get_param(req, name, default=None): :returns: HTTP request parameter value """ value = req.str_params.get(name, default) - if value: - 
value.decode('utf8') # Ensure UTF8ness + if value and not check_utf8(value): + raise ValueError('Not valid UTF-8 or contains NULL characters') return value @@ -144,12 +147,12 @@ def drop_buffer_cache(fd, offset, length): """ global _posix_fadvise if _posix_fadvise is None: - _posix_fadvise = load_libc_function('posix_fadvise') + _posix_fadvise = load_libc_function('posix_fadvise64') # 4 means "POSIX_FADV_DONTNEED" ret = _posix_fadvise(fd, ctypes.c_uint64(offset), ctypes.c_uint64(length), 4) if ret != 0: - logging.warn("posix_fadvise(%s, %s, %s, 4) -> %s" + logging.warn("posix_fadvise64(%s, %s, %s, 4) -> %s" % (fd, offset, length, ret)) diff --git a/swift/container/server.py b/swift/container/server.py index 39e3d4ac11..65dfe6ba0c 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -33,7 +33,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ from swift.common.db import ContainerBroker from swift.common.utils import get_logger, get_param, hash_path, \ normalize_timestamp, storage_directory, split_path, urlparse, \ - validate_sync_to + validate_sync_to, XML_EXTRA_ENTITIES from swift.common.constraints import CONTAINER_LISTING_LIMIT, \ check_mount, check_float, check_utf8 from swift.common.bufferedhttp import http_connect @@ -172,6 +172,10 @@ class ContainerController(object): try: drive, part, account, container, obj = split_path( unquote(req.path), 4, 5, True) + if (account and not check_utf8(account)) or \ + (container and not check_utf8(container)) or \ + (obj and not check_utf8(obj)): + raise ValueError('NULL characters not allowed in names') except ValueError, err: return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) @@ -289,7 +293,7 @@ class ContainerController(object): return HTTPPreconditionFailed(request=req, body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT) query_format = get_param(req, 'format') - except UnicodeDecodeError, err: + except (UnicodeDecodeError, ValueError), err: return HTTPBadRequest(body='parameters not utf8', content_type='text/plain', request=req) if query_format: @@ -324,21 +328,23 @@ class ContainerController(object): xml_output = [] for (name, created_at, size, content_type, etag) in container_list: # escape name and format date here - name = saxutils.escape(name) + name = saxutils.escape(name, XML_EXTRA_ENTITIES) created_at = datetime.utcfromtimestamp( float(created_at)).isoformat() if content_type is None: xml_output.append('%s' '' % (name, name)) else: - content_type = saxutils.escape(content_type) + content_type = saxutils.escape(content_type, + XML_EXTRA_ENTITIES) xml_output.append('%s%s'\ '%d%s'\ '%s' % \ (name, etag, size, content_type, created_at)) container_list = ''.join([ '\n', - '' % saxutils.quoteattr(container), + '' % + saxutils.quoteattr(container, XML_EXTRA_ENTITIES), ''.join(xml_output), '']) else: if not container_list: diff --git a/swift/stats/__init__.py b/swift/stats/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py deleted file mode 100644 index 897c23880a..0000000000 --- a/swift/stats/access_processor.py +++ /dev/null @@ -1,250 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import collections -from urllib import unquote -import copy - -from swift.common.utils import split_path, get_logger - -month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split() -LISTING_PARAMS = set( - 'path limit format delimiter marker end_marker prefix'.split()) - - -class AccessLogProcessor(object): - """Transform proxy server access logs""" - - def __init__(self, conf): - self.server_name = conf.get('server_name', 'proxy-server') - self.lb_private_ips = [x.strip() for x in \ - conf.get('lb_private_ips', '').split(',')\ - if x.strip()] - self.service_ips = [x.strip() for x in \ - conf.get('service_ips', '').split(',')\ - if x.strip()] - self.warn_percent = float(conf.get('warn_percent', '0.8')) - self.logger = get_logger(conf, log_route='access-processor') - - def log_line_parser(self, raw_log): - '''given a raw access log line, return a dict of the good parts''' - d = {} - try: - (unused, - server, - client_ip, - lb_ip, - timestamp, - method, - request, - http_version, - code, - referrer, - user_agent, - auth_token, - bytes_in, - bytes_out, - etag, - trans_id, - headers, - processing_time) = (unquote(x) for x in - raw_log[16:].split(' ')[:18]) - except ValueError: - self.logger.debug(_('Bad line data: %s') % repr(raw_log)) - return {} - if server != self.server_name: - # incorrect server name in log line - self.logger.debug(_('Bad server name: found "%(found)s" ' \ - 'expected "%(expected)s"') % - {'found': server, 'expected': self.server_name}) - return {} - try: - (version, account, container_name, object_name) = \ - split_path(request, 2, 4, True) - except ValueError, e: - self.logger.debug(_('Invalid path: %(error)s from data: %(log)s') % - {'error': e, 'log': repr(raw_log)}) - return {} - if container_name is not None: - container_name = container_name.split('?', 1)[0] - if object_name is not None: - object_name = object_name.split('?', 1)[0] - account = account.split('?', 1)[0] - query = None - if '?' in request: - request, query = request.split('?', 1) - args = query.split('&') - # Count each query argument. This is used later to aggregate - # the number of format, prefix, etc. queries. - for q in args: - if '=' in q: - k, v = q.split('=', 1) - else: - k = q - # Certain keys will get summmed in stats reporting - # (format, path, delimiter, etc.). Save a "1" here - # to indicate that this request is 1 request for - # its respective key. 
- if k in LISTING_PARAMS: - d[k] = 1 - d['client_ip'] = client_ip - d['lb_ip'] = lb_ip - d['method'] = method - d['request'] = request - if query: - d['query'] = query - d['http_version'] = http_version - d['code'] = code - d['referrer'] = referrer - d['user_agent'] = user_agent - d['auth_token'] = auth_token - d['bytes_in'] = bytes_in - d['bytes_out'] = bytes_out - d['etag'] = etag - d['trans_id'] = trans_id - d['processing_time'] = processing_time - day, month, year, hour, minute, second = timestamp.split('/') - d['day'] = day - month = ('%02s' % month_map.index(month)).replace(' ', '0') - d['month'] = month - d['year'] = year - d['hour'] = hour - d['minute'] = minute - d['second'] = second - d['tz'] = '+0000' - d['account'] = account - d['container_name'] = container_name - d['object_name'] = object_name - d['bytes_out'] = int(d['bytes_out'].replace('-', '0')) - d['bytes_in'] = int(d['bytes_in'].replace('-', '0')) - d['code'] = int(d['code']) - return d - - def process(self, obj_stream, data_object_account, data_object_container, - data_object_name): - '''generate hourly groupings of data from one access log file''' - hourly_aggr_info = {} - total_lines = 0 - bad_lines = 0 - for line in obj_stream: - line_data = self.log_line_parser(line) - total_lines += 1 - if not line_data: - bad_lines += 1 - continue - account = line_data['account'] - container_name = line_data['container_name'] - year = line_data['year'] - month = line_data['month'] - day = line_data['day'] - hour = line_data['hour'] - bytes_out = line_data['bytes_out'] - bytes_in = line_data['bytes_in'] - method = line_data['method'] - code = int(line_data['code']) - object_name = line_data['object_name'] - client_ip = line_data['client_ip'] - - op_level = None - if not container_name: - op_level = 'account' - elif container_name and not object_name: - op_level = 'container' - elif object_name: - op_level = 'object' - - aggr_key = (account, year, month, day, hour) - d = hourly_aggr_info.get(aggr_key, {}) - if line_data['lb_ip'] in self.lb_private_ips: - source = 'service' - else: - source = 'public' - - if line_data['client_ip'] in self.service_ips: - source = 'service' - - d[(source, 'bytes_out')] = d.setdefault(( - source, 'bytes_out'), 0) + bytes_out - d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \ - bytes_in - - d['format_query'] = d.setdefault('format_query', 0) + \ - line_data.get('format', 0) - d['marker_query'] = d.setdefault('marker_query', 0) + \ - line_data.get('marker', 0) - d['prefix_query'] = d.setdefault('prefix_query', 0) + \ - line_data.get('prefix', 0) - d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \ - line_data.get('delimiter', 0) - path = line_data.get('path', 0) - d['path_query'] = d.setdefault('path_query', 0) + path - - code = '%dxx' % (code / 100) - key = (source, op_level, method, code) - d[key] = d.setdefault(key, 0) + 1 - - hourly_aggr_info[aggr_key] = d - if bad_lines > (total_lines * self.warn_percent): - name = '/'.join([data_object_account, data_object_container, - data_object_name]) - self.logger.warning(_('I found a bunch of bad lines in %(name)s '\ - '(%(bad)d bad, %(total)d total)') % - {'name': name, 'bad': bad_lines, 'total': total_lines}) - return hourly_aggr_info - - def keylist_mapping(self): - source_keys = 'service public'.split() - level_keys = 'account container object'.split() - verb_keys = 'GET PUT POST DELETE HEAD COPY'.split() - code_keys = '2xx 4xx 5xx'.split() - - keylist_mapping = { - # : or - 'service_bw_in': ('service', 'bytes_in'), - 
'service_bw_out': ('service', 'bytes_out'), - 'public_bw_in': ('public', 'bytes_in'), - 'public_bw_out': ('public', 'bytes_out'), - 'account_requests': set(), - 'container_requests': set(), - 'object_requests': set(), - 'service_request': set(), - 'public_request': set(), - 'ops_count': set(), - } - for verb in verb_keys: - keylist_mapping[verb] = set() - for code in code_keys: - keylist_mapping[code] = set() - for source in source_keys: - for level in level_keys: - for verb in verb_keys: - for code in code_keys: - keylist_mapping['account_requests'].add( - (source, 'account', verb, code)) - keylist_mapping['container_requests'].add( - (source, 'container', verb, code)) - keylist_mapping['object_requests'].add( - (source, 'object', verb, code)) - keylist_mapping['service_request'].add( - ('service', level, verb, code)) - keylist_mapping['public_request'].add( - ('public', level, verb, code)) - keylist_mapping[verb].add( - (source, level, verb, code)) - keylist_mapping[code].add( - (source, level, verb, code)) - keylist_mapping['ops_count'].add( - (source, level, verb, code)) - return keylist_mapping diff --git a/swift/stats/db_stats_collector.py b/swift/stats/db_stats_collector.py deleted file mode 100644 index 95efaa8597..0000000000 --- a/swift/stats/db_stats_collector.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import time -from paste.deploy import appconfig -import shutil -import hashlib -import urllib - -from swift.account.server import DATADIR as account_server_data_dir -from swift.container.server import DATADIR as container_server_data_dir -from swift.common.db import AccountBroker, ContainerBroker -from swift.common.utils import renamer, get_logger, readconf, mkdirs, \ - TRUE_VALUES, remove_file -from swift.common.constraints import check_mount -from swift.common.daemon import Daemon - - -class DatabaseStatsCollector(Daemon): - """ - Extract storage stats from account databases on the account - storage nodes - - Any subclasses must define the function get_data. 
- """ - - def __init__(self, stats_conf, stats_type, data_dir, filename_format): - super(DatabaseStatsCollector, self).__init__(stats_conf) - self.stats_type = stats_type - self.data_dir = data_dir - self.filename_format = filename_format - self.devices = stats_conf.get('devices', '/srv/node') - self.mount_check = stats_conf.get('mount_check', - 'true').lower() in TRUE_VALUES - self.target_dir = stats_conf.get('log_dir', '/var/log/swift') - mkdirs(self.target_dir) - self.logger = get_logger(stats_conf, - log_route='%s-stats' % stats_type) - - def run_once(self, *args, **kwargs): - self.logger.info(_("Gathering %s stats" % self.stats_type)) - start = time.time() - self.find_and_process() - self.logger.info(_("Gathering %s stats complete (%0.2f minutes)") % - (self.stats_type, (time.time() - start) / 60)) - - def get_data(self): - raise NotImplementedError('Subclasses must override') - - def get_header(self): - raise NotImplementedError('Subclasses must override') - - def find_and_process(self): - src_filename = time.strftime(self.filename_format) - working_dir = os.path.join(self.target_dir, - '.%-stats_tmp' % self.stats_type) - shutil.rmtree(working_dir, ignore_errors=True) - mkdirs(working_dir) - tmp_filename = os.path.join(working_dir, src_filename) - hasher = hashlib.md5() - try: - with open(tmp_filename, 'wb') as statfile: - statfile.write(self.get_header()) - for device in os.listdir(self.devices): - if self.mount_check and not check_mount(self.devices, - device): - self.logger.error( - _("Device %s is not mounted, skipping.") % device) - continue - db_dir = os.path.join(self.devices, device, self.data_dir) - if not os.path.exists(db_dir): - self.logger.debug( - _("Path %s does not exist, skipping.") % db_dir) - continue - for root, dirs, files in os.walk(db_dir, topdown=False): - for filename in files: - if filename.endswith('.db'): - db_path = os.path.join(root, filename) - line_data = self.get_data(db_path) - if line_data: - statfile.write(line_data) - hasher.update(line_data) - - src_filename += hasher.hexdigest() - renamer(tmp_filename, os.path.join(self.target_dir, src_filename)) - finally: - shutil.rmtree(working_dir, ignore_errors=True) - - -class AccountStatsCollector(DatabaseStatsCollector): - """ - Extract storage stats from account databases on the account - storage nodes - """ - - def __init__(self, stats_conf): - super(AccountStatsCollector, self).__init__(stats_conf, 'account', - account_server_data_dir, - 'stats-%Y%m%d%H_') - - def get_data(self, db_path): - """ - Data for generated csv has the following columns: - Account Hash, Container Count, Object Count, Bytes Used - """ - line_data = None - broker = AccountBroker(db_path) - if not broker.is_deleted(): - info = broker.get_info() - line_data = '"%s",%d,%d,%d\n' % (info['account'], - info['container_count'], - info['object_count'], - info['bytes_used']) - return line_data - - def get_header(self): - return '' - - -class ContainerStatsCollector(DatabaseStatsCollector): - """ - Extract storage stats from container databases on the container - storage nodes - """ - - def __init__(self, stats_conf): - super(ContainerStatsCollector, self).__init__(stats_conf, 'container', - container_server_data_dir, - 'container-stats-%Y%m%d%H_') - # webob calls title on all the header keys - self.metadata_keys = ['X-Container-Meta-%s' % mkey.strip().title() - for mkey in stats_conf.get('metadata_keys', '').split(',') - if mkey.strip()] - - def get_header(self): - header = 'Account Hash,Container Name,Object Count,Bytes Used' - if 
self.metadata_keys: - xtra_headers = ','.join(self.metadata_keys) - header += ',%s' % xtra_headers - header += '\n' - return header - - def get_data(self, db_path): - """ - Data for generated csv has the following columns: - Account Hash, Container Name, Object Count, Bytes Used - This will just collect whether or not the metadata is set - using a 1 or ''. - """ - line_data = None - broker = ContainerBroker(db_path) - if not broker.is_deleted(): - info = broker.get_info(include_metadata=bool(self.metadata_keys)) - encoded_container_name = urllib.quote(info['container']) - line_data = '"%s","%s",%d,%d' % ( - info['account'], encoded_container_name, - info['object_count'], info['bytes_used']) - if self.metadata_keys: - metadata_results = ','.join( - [info['metadata'].get(mkey) and '1' or '' - for mkey in self.metadata_keys]) - line_data += ',%s' % metadata_results - line_data += '\n' - return line_data diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py deleted file mode 100644 index 7f76305675..0000000000 --- a/swift/stats/log_processor.py +++ /dev/null @@ -1,581 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ConfigParser import ConfigParser -import zlib -import time -import datetime -import cStringIO -import collections -from paste.deploy import appconfig -import multiprocessing -import Queue -import cPickle -import hashlib - -from swift.common.internal_proxy import InternalProxy -from swift.common.exceptions import ChunkReadTimeout -from swift.common.utils import get_logger, readconf, TRUE_VALUES -from swift.common.daemon import Daemon - -now = datetime.datetime.now - - -class BadFileDownload(Exception): - - def __init__(self, status_code=None): - self.status_code = status_code - - -class LogProcessor(object): - """Load plugins, process logs""" - - def __init__(self, conf, logger): - if isinstance(logger, tuple): - self.logger = get_logger(*logger, log_route='log-processor') - else: - self.logger = logger - - self.conf = conf - self._internal_proxy = None - - # load the processing plugins - self.plugins = {} - plugin_prefix = 'log-processor-' - for section in (x for x in conf if x.startswith(plugin_prefix)): - plugin_name = section[len(plugin_prefix):] - plugin_conf = conf.get(section, {}) - if plugin_conf.get('processable', 'true').lower() not in \ - TRUE_VALUES: - continue - self.plugins[plugin_name] = plugin_conf - class_path = self.plugins[plugin_name]['class_path'] - import_target, class_name = class_path.rsplit('.', 1) - module = __import__(import_target, fromlist=[import_target]) - klass = getattr(module, class_name) - self.plugins[plugin_name]['instance'] = klass(plugin_conf) - self.logger.debug(_('Loaded plugin "%s"') % plugin_name) - - @property - def internal_proxy(self): - if self._internal_proxy is None: - stats_conf = self.conf.get('log-processor', {}) - proxy_server_conf_loc = stats_conf.get('proxy_server_conf', - '/etc/swift/proxy-server.conf') - proxy_server_conf = appconfig( - 'config:%s' % 
proxy_server_conf_loc, - name='proxy-server') - self._internal_proxy = InternalProxy(proxy_server_conf, - self.logger, - retries=3) - return self._internal_proxy - - def process_one_file(self, plugin_name, account, container, object_name): - self.logger.info(_('Processing %(obj)s with plugin "%(plugin)s"') % - {'obj': '/'.join((account, container, object_name)), - 'plugin': plugin_name}) - # get an iter of the object data - compressed = object_name.endswith('.gz') - stream = self.get_object_data(account, container, object_name, - compressed=compressed) - # look up the correct plugin and send the stream to it - return self.plugins[plugin_name]['instance'].process(stream, - account, - container, - object_name) - - def get_data_list(self, start_date=None, end_date=None, - listing_filter=None): - total_list = [] - for plugin_name, data in self.plugins.items(): - account = data['swift_account'] - container = data['container_name'] - listing = self.get_container_listing(account, - container, - start_date, - end_date) - for object_name in listing: - # The items in this list end up being passed as positional - # parameters to process_one_file. - x = (plugin_name, account, container, object_name) - if x not in listing_filter: - total_list.append(x) - return total_list - - def get_container_listing(self, swift_account, container_name, - start_date=None, end_date=None, - listing_filter=None): - ''' - Get a container listing, filtered by start_date, end_date, and - listing_filter. Dates, if given, must be in YYYYMMDDHH format - ''' - search_key = None - if start_date is not None: - try: - parsed_date = time.strptime(start_date, '%Y%m%d%H') - except ValueError: - pass - else: - year = '%04d' % parsed_date.tm_year - month = '%02d' % parsed_date.tm_mon - day = '%02d' % parsed_date.tm_mday - hour = '%02d' % parsed_date.tm_hour - search_key = '/'.join([year, month, day, hour]) - end_key = None - if end_date is not None: - try: - parsed_date = time.strptime(end_date, '%Y%m%d%H') - except ValueError: - pass - else: - year = '%04d' % parsed_date.tm_year - month = '%02d' % parsed_date.tm_mon - day = '%02d' % parsed_date.tm_mday - # Since the end_marker filters by <, we need to add something - # to make sure we get all the data under the last hour. Adding - # one to the hour should be all-inclusive. 
- hour = '%02d' % (parsed_date.tm_hour + 1) - end_key = '/'.join([year, month, day, hour]) - container_listing = self.internal_proxy.get_container_list( - swift_account, - container_name, - marker=search_key, - end_marker=end_key) - results = [] - if listing_filter is None: - listing_filter = set() - for item in container_listing: - name = item['name'] - if name not in listing_filter: - results.append(name) - return results - - def get_object_data(self, swift_account, container_name, object_name, - compressed=False): - '''reads an object and yields its lines''' - code, o = self.internal_proxy.get_object(swift_account, container_name, - object_name) - if code < 200 or code >= 300: - raise BadFileDownload(code) - last_part = '' - last_compressed_part = '' - # magic in the following zlib.decompressobj argument is courtesy of - # Python decompressing gzip chunk-by-chunk - # http://stackoverflow.com/questions/2423866 - d = zlib.decompressobj(16 + zlib.MAX_WBITS) - try: - for chunk in o: - if compressed: - try: - chunk = d.decompress(chunk) - except zlib.error: - self.logger.debug(_('Bad compressed data for %s') - % '/'.join((swift_account, container_name, - object_name))) - raise BadFileDownload() # bad compressed data - parts = chunk.split('\n') - parts[0] = last_part + parts[0] - for part in parts[:-1]: - yield part - last_part = parts[-1] - if last_part: - yield last_part - except ChunkReadTimeout: - raise BadFileDownload() - - def generate_keylist_mapping(self): - keylist = {} - for plugin in self.plugins: - plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping() - if not plugin_keylist: - continue - for k, v in plugin_keylist.items(): - o = keylist.get(k) - if o: - if isinstance(o, set): - if isinstance(v, set): - o.update(v) - else: - o.update([v]) - else: - o = set(o) - if isinstance(v, set): - o.update(v) - else: - o.update([v]) - else: - o = v - keylist[k] = o - return keylist - - -class LogProcessorDaemon(Daemon): - """ - Gather raw log data and farm proccessing to generate a csv that is - uploaded to swift. - """ - - def __init__(self, conf): - c = conf.get('log-processor') - super(LogProcessorDaemon, self).__init__(c) - self.total_conf = conf - self.logger = get_logger(c, log_route='log-processor') - self.log_processor = LogProcessor(conf, self.logger) - self.lookback_hours = int(c.get('lookback_hours', '120')) - self.lookback_window = int(c.get('lookback_window', - str(self.lookback_hours))) - self.log_processor_account = c['swift_account'] - self.log_processor_container = c.get('container_name', - 'log_processing_data') - self.worker_count = int(c.get('worker_count', '1')) - self._keylist_mapping = None - self.processed_files_filename = 'processed_files.pickle.gz' - - def get_lookback_interval(self): - """ - :returns: lookback_start, lookback_end. - - Both or just lookback_end can be None. Otherwise, returns strings - of the form 'YYYYMMDDHH'. The interval returned is used as bounds - when looking for logs to processes. - - A returned None means don't limit the log files examined on that - side of the interval. 
- """ - - if self.lookback_hours == 0: - lookback_start = None - lookback_end = None - else: - delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = now() - delta_hours - lookback_start = lookback_start.strftime('%Y%m%d%H') - if self.lookback_window == 0: - lookback_end = None - else: - delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = now() - \ - delta_hours + \ - delta_window - lookback_end = lookback_end.strftime('%Y%m%d%H') - return lookback_start, lookback_end - - def get_processed_files_list(self): - """ - :returns: a set of files that have already been processed or returns - None on error. - - Downloads the set from the stats account. Creates an empty set if - the an existing file cannot be found. - """ - try: - # Note: this file (or data set) will grow without bound. - # In practice, if it becomes a problem (say, after many months of - # running), one could manually prune the file to remove older - # entries. Automatically pruning on each run could be dangerous. - # There is not a good way to determine when an old entry should be - # pruned (lookback_hours could be set to anything and could change) - stream = self.log_processor.get_object_data( - self.log_processor_account, - self.log_processor_container, - self.processed_files_filename, - compressed=True) - buf = '\n'.join(x for x in stream) - if buf: - files = cPickle.loads(buf) - else: - return None - except BadFileDownload, err: - if err.status_code == 404: - files = set() - else: - return None - return files - - def get_aggregate_data(self, processed_files, input_data): - """ - Aggregates stats data by account/hour, summing as needed. - - :param processed_files: set of processed files - :param input_data: is the output from multiprocess_collate/the plugins. - - :returns: A dict containing data aggregated from the input_data - passed in. - - The dict returned has tuple keys of the form: - (account, year, month, day, hour) - The dict returned has values that are dicts with items of this - form: - key:field_value - - key corresponds to something in one of the plugin's keylist - mapping, something like the tuple (source, level, verb, code) - - field_value is the sum of the field_values for the - corresponding values in the input - - Both input_data and the dict returned are hourly aggregations of - stats. - - Multiple values for the same (account, hour, tuple key) found in - input_data are summed in the dict returned. - """ - - aggr_data = {} - for item, data in input_data: - # since item contains the plugin and the log name, new plugins will - # "reprocess" the file and the results will be in the final csv. - processed_files.add(item) - for k, d in data.items(): - existing_data = aggr_data.get(k, {}) - for i, j in d.items(): - current = existing_data.get(i, 0) - # merging strategy for key collisions is addition - # processing plugins need to realize this - existing_data[i] = current + j - aggr_data[k] = existing_data - return aggr_data - - def get_final_info(self, aggr_data): - """ - Aggregates data from aggr_data based on the keylist mapping. - - :param aggr_data: The results of the get_aggregate_data function. - :returns: a dict of further aggregated data - - The dict returned has keys of the form: - (account, year, month, day, hour) - The dict returned has values that are dicts with items of this - form: - 'field_name': field_value (int) - - Data is aggregated as specified by the keylist mapping. 
The - keylist mapping specifies which keys to combine in aggr_data - and the final field_names for these combined keys in the dict - returned. Fields combined are summed. - """ - - final_info = collections.defaultdict(dict) - for account, data in aggr_data.items(): - for key, mapping in self.keylist_mapping.items(): - if isinstance(mapping, (list, set)): - value = 0 - for k in mapping: - try: - value += data[k] - except KeyError: - pass - else: - try: - value = data[mapping] - except KeyError: - value = 0 - final_info[account][key] = value - return final_info - - def store_processed_files_list(self, processed_files): - """ - Stores the proccessed files list in the stats account. - - :param processed_files: set of processed files - """ - - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) - f = cStringIO.StringIO(s) - self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - self.processed_files_filename) - - def get_output(self, final_info): - """ - :returns: a list of rows to appear in the csv file. - - The first row contains the column headers for the rest of the - rows in the returned list. - - Each row after the first row corresponds to an account's data - for that hour. - """ - - sorted_keylist_mapping = sorted(self.keylist_mapping) - columns = ['data_ts', 'account'] + sorted_keylist_mapping - output = [columns] - for (account, year, month, day, hour), d in final_info.items(): - data_ts = '%04d/%02d/%02d %02d:00:00' % \ - (int(year), int(month), int(day), int(hour)) - row = [data_ts, '%s' % (account)] - for k in sorted_keylist_mapping: - row.append(str(d[k])) - output.append(row) - return output - - def store_output(self, output): - """ - Takes the a list of rows and stores a csv file of the values in the - stats account. - - :param output: list of rows to appear in the csv file - - This csv file is final product of this script. - """ - - out_buf = '\n'.join([','.join(row) for row in output]) - h = hashlib.md5(out_buf).hexdigest() - upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h - f = cStringIO.StringIO(out_buf) - self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - upload_name) - - @property - def keylist_mapping(self): - """ - :returns: the keylist mapping. - - The keylist mapping determines how the stats fields are aggregated - in the final aggregation step. - """ - - if self._keylist_mapping == None: - self._keylist_mapping = \ - self.log_processor.generate_keylist_mapping() - return self._keylist_mapping - - def process_logs(self, logs_to_process, processed_files): - """ - :param logs_to_process: list of logs to process - :param processed_files: set of processed files - - :returns: returns a list of rows of processed data. - - The first row is the column headers. The rest of the rows contain - hourly aggregate data for the account specified in the row. - - Files processed are added to the processed_files set. - - When a large data structure is no longer needed, it is deleted in - an effort to conserve memory. 
- """ - - # map - processor_args = (self.total_conf, self.logger) - results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count) - - # reduce - aggr_data = self.get_aggregate_data(processed_files, results) - del results - - # group - # reduce a large number of keys in aggr_data[k] to a small - # number of output keys - final_info = self.get_final_info(aggr_data) - del aggr_data - - # output - return self.get_output(final_info) - - def run_once(self, *args, **kwargs): - """ - Process log files that fall within the lookback interval. - - Upload resulting csv file to stats account. - - Update processed files list and upload to stats account. - """ - - for k in 'lookback_hours lookback_window'.split(): - if k in kwargs and kwargs[k] is not None: - setattr(self, k, kwargs[k]) - - start = time.time() - self.logger.info(_("Beginning log processing")) - - lookback_start, lookback_end = self.get_lookback_interval() - self.logger.debug('lookback_start: %s' % lookback_start) - self.logger.debug('lookback_end: %s' % lookback_end) - - processed_files = self.get_processed_files_list() - if processed_files == None: - self.logger.error(_('Log processing unable to load list of ' - 'already processed log files')) - return - self.logger.debug(_('found %d processed files') % - len(processed_files)) - - logs_to_process = self.log_processor.get_data_list(lookback_start, - lookback_end, processed_files) - self.logger.info(_('loaded %d files to process') % - len(logs_to_process)) - - if logs_to_process: - output = self.process_logs(logs_to_process, processed_files) - self.store_output(output) - del output - - self.store_processed_files_list(processed_files) - - self.logger.info(_("Log processing done (%0.2f minutes)") % - ((time.time() - start) / 60)) - - -def multiprocess_collate(processor_args, logs_to_process, worker_count): - ''' - yield hourly data from logs_to_process - Every item that this function yields will be added to the processed files - list. - ''' - results = [] - in_queue = multiprocessing.Queue() - out_queue = multiprocessing.Queue() - for _junk in range(worker_count): - p = multiprocessing.Process(target=collate_worker, - args=(processor_args, - in_queue, - out_queue)) - p.start() - results.append(p) - for x in logs_to_process: - in_queue.put(x) - for _junk in range(worker_count): - in_queue.put(None) # tell the worker to end - while True: - try: - item, data = out_queue.get_nowait() - except Queue.Empty: - time.sleep(.01) - else: - if not isinstance(data, Exception): - yield item, data - if not any(r.is_alive() for r in results) and out_queue.empty(): - # all the workers are done and nothing is in the queue - break - - -def collate_worker(processor_args, in_queue, out_queue): - '''worker process for multiprocess_collate''' - p = LogProcessor(*processor_args) - while True: - item = in_queue.get() - if item is None: - # no more work to process - break - try: - ret = p.process_one_file(*item) - except Exception, err: - item_string = '/'.join(item[1:]) - p.logger.exception("Unable to process file '%s'" % (item_string)) - ret = err - out_queue.put((item, ret)) diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py deleted file mode 100644 index ea51061d54..0000000000 --- a/swift/stats/log_uploader.py +++ /dev/null @@ -1,188 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import with_statement -import os -import hashlib -import time -import gzip -import re -import sys -from paste.deploy import appconfig - -from swift.common.internal_proxy import InternalProxy -from swift.common.daemon import Daemon -from swift.common import utils - - -class LogUploader(Daemon): - ''' - Given a local directory, a swift account, and a container name, LogParser - will upload all files in the local directory to the given account/ - container. All but the newest files will be uploaded, and the files' md5 - sum will be computed. The hash is used to prevent duplicate data from - being uploaded multiple times in different files (ex: log lines). Since - the hash is computed, it is also used as the uploaded object's etag to - ensure data integrity. - - Note that after the file is successfully uploaded, it will be unlinked. - - The given proxy server config is used to instantiate a proxy server for - the object uploads. - - The default log file format is: plugin_name-%Y%m%d%H* . Any other format - of log file names must supply a regular expression that defines groups - for year, month, day, and hour. The regular expression will be evaluated - with re.VERBOSE. A common example may be: - source_filename_pattern = ^cdn_logger- - (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9]) - .*$ - ''' - - def __init__(self, uploader_conf, plugin_name, regex=None, cutoff=None): - super(LogUploader, self).__init__(uploader_conf) - log_name = '%s-log-uploader' % plugin_name - self.logger = utils.get_logger(uploader_conf, log_name, - log_route=plugin_name) - self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/') - self.swift_account = uploader_conf['swift_account'] - self.container_name = uploader_conf['container_name'] - proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', - '/etc/swift/proxy-server.conf') - proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, - name='proxy-server') - self.internal_proxy = InternalProxy(proxy_server_conf) - self.new_log_cutoff = int(cutoff or - uploader_conf.get('new_log_cutoff', '7200')) - self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \ - utils.TRUE_VALUES - self.filename_pattern = regex or \ - uploader_conf.get('source_filename_pattern', - ''' - ^%s- - (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9]) - .*$''' % plugin_name) - - def run_once(self, *args, **kwargs): - self.logger.info(_("Uploading logs")) - start = time.time() - self.upload_all_logs() - self.logger.info(_("Uploading logs complete (%0.2f minutes)") % - ((time.time() - start) / 60)) - - def get_relpath_to_files_under_log_dir(self): - """ - Look under log_dir recursively and return all filenames as relpaths - - :returns : list of strs, the relpath to all filenames under log_dir - """ - all_files = [] - for path, dirs, files in os.walk(self.log_dir): - all_files.extend(os.path.join(path, f) for f in files) - return [os.path.relpath(f, start=self.log_dir) for f in all_files] - - def filter_files(self, all_files): - """ - Filter files based on regex pattern - - :param all_files: list of strs, 
relpath of the filenames under log_dir - :param pattern: regex pattern to match against filenames - - :returns : dict mapping full path of file to match group dict - """ - filename2match = {} - found_match = False - for filename in all_files: - match = re.match(self.filename_pattern, filename, re.VERBOSE) - if match: - found_match = True - full_path = os.path.join(self.log_dir, filename) - filename2match[full_path] = match.groupdict() - else: - self.logger.debug(_('%(filename)s does not match ' - '%(pattern)s') % {'filename': filename, - 'pattern': self.filename_pattern}) - return filename2match - - def upload_all_logs(self): - """ - Match files under log_dir to source_filename_pattern and upload to - swift - """ - all_files = self.get_relpath_to_files_under_log_dir() - filename2match = self.filter_files(all_files) - if not filename2match: - self.logger.error(_('No files in %(log_dir)s match %(pattern)s') % - {'log_dir': self.log_dir, - 'pattern': self.filename_pattern}) - sys.exit(1) - if not self.internal_proxy.create_container(self.swift_account, - self.container_name): - self.logger.error(_('Unable to create container for ' - '%(account)s/%(container)s') % { - 'account': self.swift_account, - 'container': self.container_name}) - return - for filename, match in filename2match.items(): - # don't process very new logs - seconds_since_mtime = time.time() - os.stat(filename).st_mtime - if seconds_since_mtime < self.new_log_cutoff: - self.logger.debug(_("Skipping log: %(file)s " - "(< %(cutoff)d seconds old)") % { - 'file': filename, - 'cutoff': self.new_log_cutoff}) - continue - self.upload_one_log(filename, **match) - - def upload_one_log(self, filename, year, month, day, hour): - """ - Upload one file to swift - """ - if os.path.getsize(filename) == 0: - self.logger.debug(_("Log %s is 0 length, skipping") % filename) - return - self.logger.debug(_("Processing log: %s") % filename) - filehash = hashlib.md5() - already_compressed = True if filename.endswith('.gz') else False - opener = gzip.open if already_compressed else open - f = opener(filename, 'rb') - try: - for line in f: - # filter out bad lines here? - filehash.update(line) - finally: - f.close() - filehash = filehash.hexdigest() - # By adding a hash to the filename, we ensure that uploaded files - # have unique filenames and protect against uploading one file - # more than one time. By using md5, we get an etag for free. - target_filename = '/'.join([year, month, day, hour, filehash + '.gz']) - if self.internal_proxy.upload_file(filename, - self.swift_account, - self.container_name, - target_filename, - compress=(not already_compressed)): - self.logger.debug(_("Uploaded log %(file)s to %(target)s") % - {'file': filename, 'target': target_filename}) - if self.unlink_log: - os.unlink(filename) - else: - self.logger.error(_("ERROR: Upload of log %s failed!") % filename) diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py deleted file mode 100644 index f9496c1df9..0000000000 --- a/swift/stats/stats_processor.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from swift.common.utils import get_logger - - -class StatsLogProcessor(object): - """Transform account storage stat logs""" - - def __init__(self, conf): - self.logger = get_logger(conf, log_route='stats-processor') - - def process(self, obj_stream, data_object_account, data_object_container, - data_object_name): - '''generate hourly groupings of data from one stats log file''' - account_totals = {} - year, month, day, hour, _junk = data_object_name.split('/') - for line in obj_stream: - if not line: - continue - try: - (account, - container_count, - object_count, - bytes_used) = line.split(',') - except (IndexError, ValueError): - # bad line data - self.logger.debug(_('Bad line data: %s') % repr(line)) - continue - account = account.strip('"') - container_count = int(container_count.strip('"')) - object_count = int(object_count.strip('"')) - bytes_used = int(bytes_used.strip('"')) - aggr_key = (account, year, month, day, hour) - d = account_totals.get(aggr_key, {}) - d['replica_count'] = d.setdefault('replica_count', 0) + 1 - d['container_count'] = d.setdefault('container_count', 0) + \ - container_count - d['object_count'] = d.setdefault('object_count', 0) + \ - object_count - d['bytes_used'] = d.setdefault('bytes_used', 0) + \ - bytes_used - account_totals[aggr_key] = d - return account_totals - - def keylist_mapping(self): - ''' - returns a dictionary of final keys mapped to source keys - ''' - keylist_mapping = { - # : or - 'bytes_used': 'bytes_used', - 'container_count': 'container_count', - 'object_count': 'object_count', - 'replica_count': 'replica_count', - } - return keylist_mapping diff --git a/test/functionalnosetests/test_account.py b/test/functionalnosetests/test_account.py index 4b5da32da1..02f48e62ba 100755 --- a/test/functionalnosetests/test_account.py +++ b/test/functionalnosetests/test_account.py @@ -2,6 +2,7 @@ import unittest from nose import SkipTest +from uuid import uuid4 from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH @@ -132,6 +133,32 @@ class TestAccount(unittest.TestCase): resp.read() self.assertEquals(resp.status, 400) + def test_name_control_chars(self): + if skip: + raise SkipTest + + container = uuid4().hex + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s%%01test' % + (parsed.path, container), '', + {'X-Auth-Token': token, 'Content-Length': '0'}) + return check_response(conn) + + resp = retry(put) + resp.read() + self.assertTrue(resp.status in (201, 202)) + + def get(url, token, parsed, conn): + conn.request('GET', '%s?format=xml' % (parsed.path,), '', + {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(get) + body = resp.read() + self.assertEquals(resp.status, 200) + self.assertTrue('%stest' % (container,) in body) + if __name__ == '__main__': unittest.main() diff --git a/test/functionalnosetests/test_container.py b/test/functionalnosetests/test_container.py index 738231e02b..a89b224899 100755 --- a/test/functionalnosetests/test_container.py +++ b/test/functionalnosetests/test_container.py @@ -522,6 +522,50 @@ class 
TestContainer(unittest.TestCase): resp.read() self.assertEquals(resp.status, 201) + def test_name_control_chars(self): + if skip: + raise SkipTest + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s%%00test' % (parsed.path, self.name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(put) + resp.read() + # NULLs not allowed + self.assertEquals(resp.status, 412) + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s%%01test' % (parsed.path, self.name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(put) + resp.read() + # 0x01 allowed + self.assertTrue(resp.status in (201, 202)) + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/object%%01test' % + (parsed.path, self.name), '', + {'X-Auth-Token': token, 'Content-Length': '0'}) + return check_response(conn) + + resp = retry(put) + resp.read() + self.assertTrue(resp.status in (201, 202)) + + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s?format=xml' % (parsed.path, self.name), + '', {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(get) + body = resp.read() + self.assertEquals(resp.status, 200) + self.assertTrue('objecttest' in body) + if __name__ == '__main__': unittest.main() diff --git a/test/functionalnosetests/test_object.py b/test/functionalnosetests/test_object.py index 5975cf16a2..73bfae2f5e 100644 --- a/test/functionalnosetests/test_object.py +++ b/test/functionalnosetests/test_object.py @@ -541,6 +541,30 @@ class TestObject(unittest.TestCase): resp.read() self.assertEquals(resp.status, 204) + def test_name_control_chars(self): + if skip: + raise SkipTest + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/obj%%00test' % (parsed.path, + self.container), 'test', {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(put) + resp.read() + # NULLs not allowed + self.assertEquals(resp.status, 412) + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/obj%%01test' % (parsed.path, + self.container), 'test', {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(put) + resp.read() + # 0x01 allowed + self.assertEquals(resp.status, 201) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py index 238b7f3d18..cbc89395f1 100644 --- a/test/unit/account/test_server.py +++ b/test/unit/account/test_server.py @@ -22,6 +22,7 @@ from StringIO import StringIO import simplejson import xml.dom.minidom from webob import Request +from xml.parsers.expat import ExpatError from swift.account.server import AccountController, ACCOUNT_LISTING_LIMIT from swift.common.utils import normalize_timestamp @@ -450,7 +451,8 @@ class TestAccountController(unittest.TestCase): 'X-Bytes-Used': '0', 'X-Timestamp': normalize_timestamp(0)}) self.controller.PUT(req) - req = Request.blank('/sda1/p/a/c2', environ={'REQUEST_METHOD': 'PUT'}, + req = Request.blank('/sda1/p/a/c2%04', + environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Put-Timestamp': '2', 'X-Delete-Timestamp': '0', 'X-Object-Count': '0', @@ -462,7 +464,15 @@ class TestAccountController(unittest.TestCase): resp = self.controller.GET(req) self.assertEquals(resp.content_type, 'application/xml') self.assertEquals(resp.status_int, 200) - dom = xml.dom.minidom.parseString(resp.body) + try: + dom = xml.dom.minidom.parseString(resp.body) + except ExpatError, err: + # Expat doesn't like control characters, which are XML 1.1 + # compatible. 
Soooo, we have to replace them. We'll do a specific + # replace in this case, but real code that uses Expat will need + # something more resilient. + dom = xml.dom.minidom.parseString( + resp.body.replace('\x04', '\\x04')) self.assertEquals(dom.firstChild.nodeName, 'account') listing = \ [n for n in dom.firstChild.childNodes if n.nodeName != '#text'] @@ -483,7 +493,7 @@ class TestAccountController(unittest.TestCase): self.assertEquals(sorted([n.nodeName for n in container]), ['bytes', 'count', 'name']) node = [n for n in container if n.nodeName == 'name'][0] - self.assertEquals(node.firstChild.nodeValue, 'c2') + self.assertEquals(node.firstChild.nodeValue, 'c2\\x04') node = [n for n in container if n.nodeName == 'count'][0] self.assertEquals(node.firstChild.nodeValue, '0') node = [n for n in container if n.nodeName == 'bytes'][0] @@ -495,7 +505,8 @@ class TestAccountController(unittest.TestCase): 'X-Bytes-Used': '2', 'X-Timestamp': normalize_timestamp(0)}) self.controller.PUT(req) - req = Request.blank('/sda1/p/a/c2', environ={'REQUEST_METHOD': 'PUT'}, + req = Request.blank('/sda1/p/a/c2%04', + environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Put-Timestamp': '2', 'X-Delete-Timestamp': '0', 'X-Object-Count': '3', @@ -506,7 +517,15 @@ class TestAccountController(unittest.TestCase): environ={'REQUEST_METHOD': 'GET'}) resp = self.controller.GET(req) self.assertEquals(resp.status_int, 200) - dom = xml.dom.minidom.parseString(resp.body) + try: + dom = xml.dom.minidom.parseString(resp.body) + except ExpatError, err: + # Expat doesn't like control characters, which are XML 1.1 + # compatible. Soooo, we have to replace them. We'll do a specific + # replace in this case, but real code that uses Expat will need + # something more resilient. + dom = xml.dom.minidom.parseString( + resp.body.replace('\x04', '\\x04')) self.assertEquals(dom.firstChild.nodeName, 'account') listing = \ [n for n in dom.firstChild.childNodes if n.nodeName != '#text'] @@ -526,7 +545,7 @@ class TestAccountController(unittest.TestCase): self.assertEquals(sorted([n.nodeName for n in container]), ['bytes', 'count', 'name']) node = [n for n in container if n.nodeName == 'name'][0] - self.assertEquals(node.firstChild.nodeValue, 'c2') + self.assertEquals(node.firstChild.nodeValue, 'c2\\x04') node = [n for n in container if n.nodeName == 'count'][0] self.assertEquals(node.firstChild.nodeValue, '3') node = [n for n in container if n.nodeName == 'bytes'][0] @@ -959,6 +978,35 @@ class TestAccountController(unittest.TestCase): resp = self.controller.GET(req) self.assert_(resp.status_int in (204, 412), resp.status_int) + def test_params_no_null(self): + self.controller.PUT(Request.blank('/sda1/p/a', + headers={'X-Timestamp': normalize_timestamp(1)}, + environ={'REQUEST_METHOD': 'PUT'})) + for param in ('delimiter', 'format', 'limit', 'marker', + 'prefix'): + req = Request.blank('/sda1/p/a?%s=\x00' % param, + environ={'REQUEST_METHOD': 'GET'}) + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 400) + + def test_PUT_account_no_null(self): + req = Request.blank('/sda1/p/test\x00test', + environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 400) + + def test_PUT_container_no_null(self): + req = Request.blank('/sda1/p/a', + environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/test\x00test', + environ={'REQUEST_METHOD': 'PUT', 
'HTTP_X_PUT_TIMESTAMP': '1', + 'HTTP_X_DELETE_TIMESTAMP': '0', + 'HTTP_X_OBJECT_COUNT': '0', 'HTTP_X_BYTES_USED': '0'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 400) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/middleware/test_tempauth.py b/test/unit/common/middleware/test_tempauth.py index fdb988a699..df9a51cf8a 100644 --- a/test/unit/common/middleware/test_tempauth.py +++ b/test/unit/common/middleware/test_tempauth.py @@ -542,5 +542,23 @@ class TestAuth(unittest.TestCase): self.assertEquals(resp.status_int, 204) +class TestParseUserCreation(unittest.TestCase): + def test_parse_user_creation(self): + auth_filter = auth.filter_factory({ + 'user_test_tester3': 'testing', + 'user_admin_admin': 'admin .admin .reseller_admin', + })(FakeApp()) + self.assertEquals(auth_filter.users, { + 'admin:admin': { + 'url': 'http://127.0.0.1:8080/v1/AUTH_admin', + 'groups': ['.admin', '.reseller_admin'], + 'key': 'admin' + }, 'test:tester3': { + 'url': 'http://127.0.0.1:8080/v1/AUTH_test', + 'groups': [], + 'key': 'testing' + }, + }) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_compressing_file_reader.py b/test/unit/common/test_compressing_file_reader.py deleted file mode 100644 index 65c29554d7..0000000000 --- a/test/unit/common/test_compressing_file_reader.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Tests for swift.common.compressing_file_reader """ - -import unittest -import cStringIO - -from swift.common.compressing_file_reader import CompressingFileReader - -class TestCompressingFileReader(unittest.TestCase): - - def test_read(self): - plain = 'obj\ndata' - s = cStringIO.StringIO(plain) - expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\ - 'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\ - '\x00' - x = CompressingFileReader(s) - compressed = ''.join(iter(lambda: x.read(), '')) - self.assertEquals(compressed, expected) - self.assertEquals(x.read(), '') diff --git a/test/unit/common/test_internal_proxy.py b/test/unit/common/test_internal_proxy.py deleted file mode 100644 index a2e82f8d31..0000000000 --- a/test/unit/common/test_internal_proxy.py +++ /dev/null @@ -1,192 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# TODO: Tests - -import unittest -import webob -import tempfile -import json - -from swift.common import internal_proxy - -class DumbBaseApplicationFactory(object): - - def __init__(self, status_codes, body=''): - self.status_codes = status_codes[:] - self.body = body - - def __call__(self, *a, **kw): - app = DumbBaseApplication(*a, **kw) - app.status_codes = self.status_codes - try: - app.default_status_code = self.status_codes[-1] - except IndexError: - app.default_status_code = 200 - app.body = self.body - return app - -class DumbBaseApplication(object): - - def __init__(self, *a, **kw): - self.status_codes = [] - self.default_status_code = 200 - self.call_count = 0 - self.body = '' - - def handle_request(self, req): - self.call_count += 1 - req.path_info_pop() - if isinstance(self.body, list): - try: - body = self.body.pop(0) - except IndexError: - body = '' - else: - body = self.body - resp = webob.Response(request=req, body=body, - conditional_response=True) - try: - resp.status_int = self.status_codes.pop(0) - except IndexError: - resp.status_int = self.default_status_code - return resp - - def update_request(self, req): - return req - - -class TestInternalProxy(unittest.TestCase): - - def test_webob_request_copy(self): - req = webob.Request.blank('/') - req2 = internal_proxy.webob_request_copy(req) - self.assertEquals(req.path, req2.path) - self.assertEquals(req.path_info, req2.path_info) - self.assertFalse(req is req2) - - def test_handle_request(self): - status_codes = [200] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy() - req = webob.Request.blank('/') - orig_req = internal_proxy.webob_request_copy(req) - resp = p._handle_request(req) - self.assertEquals(req.path_info, orig_req.path_info) - - def test_handle_request_with_retries(self): - status_codes = [500, 200] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy(retries=3) - req = webob.Request.blank('/') - orig_req = internal_proxy.webob_request_copy(req) - resp = p._handle_request(req) - self.assertEquals(req.path_info, orig_req.path_info) - self.assertEquals(p.upload_app.call_count, 2) - self.assertEquals(resp.status_int, 200) - - def test_get_object(self): - status_codes = [200] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy() - code, body = p.get_object('a', 'c', 'o') - body = ''.join(body) - self.assertEquals(code, 200) - self.assertEquals(body, '') - - def test_create_container(self): - status_codes = [200] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy() - resp = p.create_container('a', 'c') - self.assertTrue(resp) - - def test_handle_request_with_retries_all_error(self): - status_codes = [500, 500, 500, 500, 500] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy(retries=3) - req = webob.Request.blank('/') - orig_req = internal_proxy.webob_request_copy(req) - resp = p._handle_request(req) - self.assertEquals(req.path_info, orig_req.path_info) - self.assertEquals(p.upload_app.call_count, 3) - self.assertEquals(resp.status_int, 500) - - def test_get_container_list_empty(self): - status_codes = [200] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes, body='[]') - p = internal_proxy.InternalProxy() - resp = p.get_container_list('a', 'c') - self.assertEquals(resp, []) - 
- def test_get_container_list_no_body(self): - status_codes = [204] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes, body='') - p = internal_proxy.InternalProxy() - resp = p.get_container_list('a', 'c') - self.assertEquals(resp, []) - - def test_get_container_list_full_listing(self): - status_codes = [200, 200] - obj_a = dict(name='foo', hash='foo', bytes=3, - content_type='text/plain', last_modified='2011/01/01') - obj_b = dict(name='bar', hash='bar', bytes=3, - content_type='text/plain', last_modified='2011/01/01') - body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes, body=body) - p = internal_proxy.InternalProxy() - resp = p.get_container_list('a', 'c') - expected = ['foo', 'bar'] - self.assertEquals([x['name'] for x in resp], expected) - - def test_get_container_list_full(self): - status_codes = [204] - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes, body='') - p = internal_proxy.InternalProxy() - resp = p.get_container_list('a', 'c', marker='a', end_marker='b', - limit=100, prefix='/', delimiter='.') - self.assertEquals(resp, []) - - def test_upload_file(self): - status_codes = [200, 200] # container PUT + object PUT - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy() - with tempfile.NamedTemporaryFile() as file_obj: - resp = p.upload_file(file_obj.name, 'a', 'c', 'o') - self.assertTrue(resp) - - def test_upload_file_with_retries(self): - status_codes = [200, 500, 200] # container PUT + error + object PUT - internal_proxy.BaseApplication = DumbBaseApplicationFactory( - status_codes) - p = internal_proxy.InternalProxy(retries=3) - with tempfile.NamedTemporaryFile() as file_obj: - resp = p.upload_file(file_obj, 'a', 'c', 'o') - self.assertTrue(resp) - self.assertEquals(p.upload_app.call_count, 3) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 117a0dccd5..7ef0cee5c3 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -246,6 +246,24 @@ class TestContainerController(unittest.TestCase): resp = self.controller.PUT(req) self.assertEquals(resp.status_int, 201) + def test_PUT_container_no_null(self): + req = Request.blank('/sda1/p/a/test\x00test', + environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 400) + + def test_PUT_object_no_null(self): + req = Request.blank('/sda1/p/a/test', + environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/test/test\x00test', + environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', + 'HTTP_X_SIZE': '0', 'HTTP_X_CONTENT_TYPE': 'text/plain', + 'HTTP_X_ETAG': 'd41d8cd98f00b204e9800998ecf8427e'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 400) + def test_PUT_account_update(self): bindsock = listen(('127.0.0.1', 0)) def accept(return_code, expected_timestamp): @@ -582,25 +600,25 @@ class TestContainerController(unittest.TestCase): resp = self.controller.PUT(req) # fill the container for i in range(3): - req = Request.blank('/sda1/p/a/xmlc/%s'%i, environ= - {'REQUEST_METHOD': 'PUT', - 'HTTP_X_TIMESTAMP': '1', - 'HTTP_X_CONTENT_TYPE': 'text/plain', - 'HTTP_X_ETAG': 'x', - 'HTTP_X_SIZE': 0}) + req = 
Request.blank('/sda1/p/a/xmlc/%s%%%02x' % (i, i + 1), + environ={'REQUEST_METHOD': 'PUT', + 'HTTP_X_TIMESTAMP': '1', + 'HTTP_X_CONTENT_TYPE': 'text/plain', + 'HTTP_X_ETAG': 'x', + 'HTTP_X_SIZE': 0}) resp = self.controller.PUT(req) self.assertEquals(resp.status_int, 201) xml_body = '\n' \ '' \ - '0x0' \ + '0x0' \ 'text/plain' \ '1970-01-01T00:00:01' \ '' \ - '1x0' \ + '1x0' \ 'text/plain' \ '1970-01-01T00:00:01' \ '' \ - '2x0' \ + '2x0' \ 'text/plain' \ '1970-01-01T00:00:01' \ '' \ @@ -822,6 +840,17 @@ class TestContainerController(unittest.TestCase): resp = self.controller.GET(req) self.assert_(resp.status_int in (204, 412), resp.status_int) + def test_params_no_null(self): + self.controller.PUT(Request.blank('/sda1/p/a/c', + headers={'X-Timestamp': normalize_timestamp(1)}, + environ={'REQUEST_METHOD': 'PUT'})) + for param in ('delimiter', 'format', 'limit', 'marker', 'path', + 'prefix'): + req = Request.blank('/sda1/p/a/c?%s=\x00' % param, + environ={'REQUEST_METHOD': 'GET'}) + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 400) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 604ca630da..f7b1edd328 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -2045,8 +2045,8 @@ class TestObjectController(unittest.TestCase): proxy_server.http_connect = fake_http_connect(201, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - req = Request.blank('/a/c/o', {}, headers={ - 'Transfer-Encoding': 'chunked', + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, + headers={'Transfer-Encoding': 'chunked', 'Content-Type': 'foo/bar'}) req.body_file = ChunkedFile(10) @@ -2058,8 +2058,8 @@ class TestObjectController(unittest.TestCase): # test 413 entity to large from swift.proxy import server proxy_server.http_connect = fake_http_connect(201, 201, 201, 201) - req = Request.blank('/a/c/o', {}, headers={ - 'Transfer-Encoding': 'chunked', + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, + headers={'Transfer-Encoding': 'chunked', 'Content-Type': 'foo/bar'}) req.body_file = ChunkedFile(11) self.app.memcache.store = {} @@ -2111,6 +2111,20 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 412' self.assertEquals(headers[:len(exp)], exp) + def test_chunked_put_bad_utf8_null(self): + # Check invalid utf-8 + (prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \ + _test_sockets + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a%00 HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Auth-Token: t\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 412' + self.assertEquals(headers[:len(exp)], exp) + def test_chunked_put_bad_path_no_controller(self): # Check bad path, no controller (prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \ diff --git a/test/unit/stats/__init__.py b/test/unit/stats/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/unit/stats/test_access_processor.py b/test/unit/stats/test_access_processor.py deleted file mode 100644 index 18e18803f2..0000000000 --- a/test/unit/stats/test_access_processor.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# TODO: Tests - -import unittest -from swift.stats import access_processor - - -class TestAccessProcessor(unittest.TestCase): - - def test_log_line_parser_query_args(self): - p = access_processor.AccessLogProcessor({}) - log_line = [str(x) for x in range(18)] - log_line[1] = 'proxy-server' - log_line[4] = '1/Jan/3/4/5/6' - query = 'foo' - for param in access_processor.LISTING_PARAMS: - query += '&%s=blah' % param - log_line[6] = '/v1/a/c/o?%s' % query - log_line = 'x'*16 + ' '.join(log_line) - res = p.log_line_parser(log_line) - expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', - 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', - 'http_version': '7', 'object_name': 'o', 'etag': '14', - 'method': '5', 'trans_id': '15', 'client_ip': '2', - 'bytes_out': 13, 'container_name': 'c', 'day': '1', - 'minute': '5', 'account': 'a', 'hour': '4', - 'referrer': '9', 'request': '/v1/a/c/o', - 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} - for param in access_processor.LISTING_PARAMS: - expected[param] = 1 - expected['query'] = query - self.assertEquals(res, expected) - - def test_log_line_parser_field_count(self): - p = access_processor.AccessLogProcessor({}) - # too few fields - log_line = [str(x) for x in range(17)] - log_line[1] = 'proxy-server' - log_line[4] = '1/Jan/3/4/5/6' - log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) - res = p.log_line_parser(log_line) - expected = {} - self.assertEquals(res, expected) - # right amount of fields - log_line = [str(x) for x in range(18)] - log_line[1] = 'proxy-server' - log_line[4] = '1/Jan/3/4/5/6' - log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) - res = p.log_line_parser(log_line) - expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', - 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', - 'http_version': '7', 'object_name': 'o', 'etag': '14', - 'method': '5', 'trans_id': '15', 'client_ip': '2', - 'bytes_out': 13, 'container_name': 'c', 'day': '1', - 'minute': '5', 'account': 'a', 'hour': '4', - 'referrer': '9', 'request': '/v1/a/c/o', - 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} - self.assertEquals(res, expected) - # too many fields - log_line = [str(x) for x in range(19)] - log_line[1] = 'proxy-server' - log_line[4] = '1/Jan/3/4/5/6' - log_line[6] = '/v1/a/c/o' - log_line = 'x'*16 + ' '.join(log_line) - res = p.log_line_parser(log_line) - expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', - 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', - 'http_version': '7', 'object_name': 'o', 'etag': '14', - 'method': '5', 'trans_id': '15', 'client_ip': '2', - 'bytes_out': 13, 'container_name': 'c', 'day': '1', - 'minute': '5', 'account': 'a', 'hour': '4', - 'referrer': '9', 'request': '/v1/a/c/o', - 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} - self.assertEquals(res, expected) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/stats/test_db_stats_collector.py b/test/unit/stats/test_db_stats_collector.py deleted file mode 100644 index 3c4949aff5..0000000000 --- a/test/unit/stats/test_db_stats_collector.py 
+++ /dev/null @@ -1,238 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import unittest -import os -import time -import uuid -from shutil import rmtree -from swift.stats import db_stats_collector -from tempfile import mkdtemp -from test.unit import FakeLogger -from swift.common.db import AccountBroker, ContainerBroker -from swift.common.utils import mkdirs - - -class TestDbStats(unittest.TestCase): - - def setUp(self): - self._was_logger = db_stats_collector.get_logger - db_stats_collector.get_logger = FakeLogger - self.testdir = os.path.join(mkdtemp(), 'tmp_test_db_stats') - self.devices = os.path.join(self.testdir, 'node') - rmtree(self.testdir, ignore_errors=1) - mkdirs(os.path.join(self.devices, 'sda')) - self.accounts = os.path.join(self.devices, 'sda', 'accounts') - self.containers = os.path.join(self.devices, 'sda', 'containers') - self.log_dir = '%s/log' % self.testdir - - self.conf = dict(devices=self.devices, - log_dir=self.log_dir, - mount_check='false') - - def tearDown(self): - db_stats_collector.get_logger = self._was_logger - rmtree(self.testdir) - - def test_account_stat_get_data(self): - stat = db_stats_collector.AccountStatsCollector(self.conf) - account_db = AccountBroker("%s/acc.db" % self.accounts, - account='test_acc') - account_db.initialize() - account_db.put_container('test_container', time.time(), - None, 10, 1000) - info = stat.get_data("%s/acc.db" % self.accounts) - self.assertEquals('''"test_acc",1,10,1000\n''', info) - - def test_container_stat_get_data(self): - stat = db_stats_collector.ContainerStatsCollector(self.conf) - container_db = ContainerBroker("%s/con.db" % self.containers, - account='test_acc', container='test_con') - container_db.initialize() - container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') - info = stat.get_data("%s/con.db" % self.containers) - self.assertEquals('''"test_acc","test_con",1,10\n''', info) - - def test_container_stat_get_metadata(self): - stat = db_stats_collector.ContainerStatsCollector(self.conf) - container_db = ContainerBroker("%s/con.db" % self.containers, - account='test_acc', container='test_con') - container_db.initialize() - container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') - info = stat.get_data("%s/con.db" % self.containers) - self.assertEquals('''"test_acc","test_con",1,10\n''', info) - container_db.update_metadata({'test1': ('val', 1000)}) - - def _gen_account_stat(self): - stat = db_stats_collector.AccountStatsCollector(self.conf) - output_data = set() - for i in range(10): - account_db = AccountBroker("%s/stats-201001010%s-%s.db" % - (self.accounts, i, uuid.uuid4().hex), - account='test_acc_%s' % i) - account_db.initialize() - account_db.put_container('test_container', time.time(), - None, 10, 1000) - # this will "commit" the data - account_db.get_info() - output_data.add('''"test_acc_%s",1,10,1000''' % i), - - self.assertEqual(len(output_data), 10) - return stat, output_data - - def _drop_metadata_col(self, broker, 
acc_name): - broker.conn.execute('''drop table container_stat''') - broker.conn.executescript(""" - CREATE TABLE container_stat ( - account TEXT DEFAULT '%s', - container TEXT DEFAULT 'test_con', - created_at TEXT, - put_timestamp TEXT DEFAULT '0', - delete_timestamp TEXT DEFAULT '0', - object_count INTEGER, - bytes_used INTEGER, - reported_put_timestamp TEXT DEFAULT '0', - reported_delete_timestamp TEXT DEFAULT '0', - reported_object_count INTEGER DEFAULT 0, - reported_bytes_used INTEGER DEFAULT 0, - hash TEXT default '00000000000000000000000000000000', - id TEXT, - status TEXT DEFAULT '', - status_changed_at TEXT DEFAULT '0' - ); - - INSERT INTO container_stat (object_count, bytes_used) - VALUES (1, 10); - """ % acc_name) - - def _gen_container_stat(self, set_metadata=False, drop_metadata=False): - if set_metadata: - self.conf['metadata_keys'] = 'test1,test2' - # webob runs title on all headers - stat = db_stats_collector.ContainerStatsCollector(self.conf) - output_data = set() - for i in range(10): - cont_db = ContainerBroker( - "%s/container-stats-201001010%s-%s.db" % (self.containers, i, - uuid.uuid4().hex), - account='test_acc_%s' % i, container='test_con') - cont_db.initialize() - cont_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') - metadata_output = '' - if set_metadata: - if i % 2: - cont_db.update_metadata({'X-Container-Meta-Test1': (5, 1)}) - metadata_output = ',1,' - else: - cont_db.update_metadata({'X-Container-Meta-Test2': (7, 2)}) - metadata_output = ',,1' - # this will "commit" the data - cont_db.get_info() - if drop_metadata: - output_data.add('''"test_acc_%s","test_con",1,10,,''' % i) - else: - output_data.add('''"test_acc_%s","test_con",1,10%s''' % - (i, metadata_output)) - if drop_metadata: - self._drop_metadata_col(cont_db, 'test_acc_%s' % i) - - self.assertEqual(len(output_data), 10) - return stat, output_data - - def test_account_stat_run_once_account(self): - stat, output_data = self._gen_account_stat() - stat.run_once() - stat_file = os.listdir(self.log_dir)[0] - with open(os.path.join(self.log_dir, stat_file)) as stat_handle: - for i in range(10): - data = stat_handle.readline() - output_data.discard(data.strip()) - - self.assertEqual(len(output_data), 0) - - def test_account_stat_run_once_container_metadata(self): - - stat, output_data = self._gen_container_stat(set_metadata=True) - stat.run_once() - stat_file = os.listdir(self.log_dir)[0] - with open(os.path.join(self.log_dir, stat_file)) as stat_handle: - headers = stat_handle.readline() - self.assert_(headers.startswith('Account Hash,Container Name,')) - for i in range(10): - data = stat_handle.readline() - output_data.discard(data.strip()) - - self.assertEqual(len(output_data), 0) - - def test_account_stat_run_once_container_no_metadata(self): - - stat, output_data = self._gen_container_stat(set_metadata=True, - drop_metadata=True) - stat.run_once() - stat_file = os.listdir(self.log_dir)[0] - with open(os.path.join(self.log_dir, stat_file)) as stat_handle: - headers = stat_handle.readline() - self.assert_(headers.startswith('Account Hash,Container Name,')) - for i in range(10): - data = stat_handle.readline() - output_data.discard(data.strip()) - - self.assertEqual(len(output_data), 0) - - def test_account_stat_run_once_both(self): - acc_stat, acc_output_data = self._gen_account_stat() - con_stat, con_output_data = self._gen_container_stat() - - acc_stat.run_once() - stat_file = os.listdir(self.log_dir)[0] - with open(os.path.join(self.log_dir, stat_file)) as stat_handle: - for i in range(10): 
- data = stat_handle.readline() - acc_output_data.discard(data.strip()) - - self.assertEqual(len(acc_output_data), 0) - - con_stat.run_once() - stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0] - with open(os.path.join(self.log_dir, stat_file)) as stat_handle: - headers = stat_handle.readline() - self.assert_(headers.startswith('Account Hash,Container Name,')) - for i in range(10): - data = stat_handle.readline() - con_output_data.discard(data.strip()) - - self.assertEqual(len(con_output_data), 0) - - def test_account_stat_run_once_fail(self): - stat, output_data = self._gen_account_stat() - rmtree(self.accounts) - stat.run_once() - self.assertEquals(len(stat.logger.log_dict['debug']), 1) - - def test_not_implemented(self): - db_stat = db_stats_collector.DatabaseStatsCollector(self.conf, - 'account', 'test_dir', 'stats-%Y%m%d%H_') - self.assertRaises(NotImplementedError, db_stat.get_data) - self.assertRaises(NotImplementedError, db_stat.get_header) - - def test_not_not_mounted(self): - self.conf['mount_check'] = 'true' - stat, output_data = self._gen_account_stat() - stat.run_once() - self.assertEquals(len(stat.logger.log_dict['error']), 1) - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py deleted file mode 100644 index c1b3b68b19..0000000000 --- a/test/unit/stats/test_log_processor.py +++ /dev/null @@ -1,834 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import unittest -from test.unit import tmpfile -import Queue -import datetime -import hashlib -import pickle -import time - -from swift.common import internal_proxy -from swift.stats import log_processor -from swift.common.exceptions import ChunkReadTimeout - - -class FakeUploadApp(object): - def __init__(self, *args, **kwargs): - pass - -class DumbLogger(object): - def __getattr__(self, n): - return self.foo - - def foo(self, *a, **kw): - pass - -class DumbInternalProxy(object): - def __init__(self, code=200, timeout=False, bad_compressed=False): - self.code = code - self.timeout = timeout - self.bad_compressed = bad_compressed - - def get_container_list(self, account, container, marker=None, - end_marker=None): - n = '2010/03/14/13/obj1' - if marker is None or n > marker: - if end_marker: - if n <= end_marker: - return [{'name': n}] - else: - return [] - return [{'name': n}] - return [] - - def get_object(self, account, container, object_name): - if object_name.endswith('.gz'): - if self.bad_compressed: - # invalid compressed data - def data(): - yield '\xff\xff\xff\xff\xff\xff\xff' - else: - # 'obj\ndata', compressed with gzip -9 - def data(): - yield '\x1f\x8b\x08' - yield '\x08"\xd79L' - yield '\x02\x03te' - yield 'st\x00\xcbO' - yield '\xca\xe2JI,I' - yield '\xe4\x02\x00O\xff' - yield '\xa3Y\t\x00\x00\x00' - else: - def data(): - yield 'obj\n' - if self.timeout: - raise ChunkReadTimeout - yield 'data' - return self.code, data() - -class TestLogProcessor(unittest.TestCase): - - access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ - '09/Jul/2010/04/14/30 GET '\ - '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ - 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ - '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' - stats_test_line = 'account,1,2,3' - proxy_config = {'log-processor': { - - } - } - - def test_lazy_load_internal_proxy(self): - # stub out internal_proxy's upload_app - internal_proxy.BaseApplication = FakeUploadApp - dummy_proxy_config = """[app:proxy-server] -use = egg:swift#proxy -""" - with tmpfile(dummy_proxy_config) as proxy_config_file: - conf = {'log-processor': { - 'proxy_server_conf': proxy_config_file, - } - } - p = log_processor.LogProcessor(conf, DumbLogger()) - self.assert_(isinstance(p._internal_proxy, - None.__class__)) - self.assert_(isinstance(p.internal_proxy, - log_processor.InternalProxy)) - self.assertEquals(p.internal_proxy, p._internal_proxy) - - # reset FakeUploadApp - reload(internal_proxy) - - def test_access_log_line_parser(self): - access_proxy_config = self.proxy_config.copy() - access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) - result = p.plugins['access']['instance'].log_line_parser(self.access_test_line) - self.assertEquals(result, {'code': 200, - 'processing_time': '0.0262', - 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd', - 'month': '07', - 'second': '30', - 'year': '2010', - 'query': 'format=json&foo', - 'tz': '+0000', - 'http_version': 'HTTP/1.0', - 'object_name': 'bar', - 'etag': '-', - 'method': 'GET', - 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90', - 'client_ip': '1.2.3.4', - 'format': 1, - 'bytes_out': 95, - 'container_name': 'foo', - 'day': '09', - 'minute': '14', - 'account': 'acct', - 'hour': '04', - 'referrer': '-', - 'request': '/v1/acct/foo/bar', - 'user_agent': 'curl', - 'bytes_in': 6, - 
'lb_ip': '4.5.6.7'}) - - def test_process_one_access_file(self): - access_proxy_config = self.proxy_config.copy() - access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) - def get_object_data(*a, **kw): - return [self.access_test_line] - p.get_object_data = get_object_data - result = p.process_one_file('access', 'a', 'c', 'o') - expected = {('acct', '2010', '07', '09', '04'): - {('public', 'object', 'GET', '2xx'): 1, - ('public', 'bytes_out'): 95, - 'marker_query': 0, - 'format_query': 1, - 'delimiter_query': 0, - 'path_query': 0, - ('public', 'bytes_in'): 6, - 'prefix_query': 0}} - self.assertEquals(result, expected) - - def test_process_one_access_file_error(self): - access_proxy_config = self.proxy_config.copy() - access_proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) - p._internal_proxy = DumbInternalProxy(code=500) - self.assertRaises(log_processor.BadFileDownload, p.process_one_file, - 'access', 'a', 'c', 'o') - - def test_get_container_listing(self): - p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) - p._internal_proxy = DumbInternalProxy() - result = p.get_container_listing('a', 'foo') - expected = ['2010/03/14/13/obj1'] - self.assertEquals(result, expected) - result = p.get_container_listing('a', 'foo', listing_filter=expected) - expected = [] - self.assertEquals(result, expected) - result = p.get_container_listing('a', 'foo', start_date='2010031412', - end_date='2010031414') - expected = ['2010/03/14/13/obj1'] - self.assertEquals(result, expected) - result = p.get_container_listing('a', 'foo', start_date='2010031414') - expected = [] - self.assertEquals(result, expected) - result = p.get_container_listing('a', 'foo', start_date='2010031410', - end_date='2010031412') - expected = [] - self.assertEquals(result, expected) - result = p.get_container_listing('a', 'foo', start_date='2010031412', - end_date='2010031413') - expected = ['2010/03/14/13/obj1'] - self.assertEquals(result, expected) - - def test_get_object_data(self): - p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) - p._internal_proxy = DumbInternalProxy() - result = list(p.get_object_data('a', 'c', 'o', False)) - expected = ['obj','data'] - self.assertEquals(result, expected) - result = list(p.get_object_data('a', 'c', 'o.gz', True)) - self.assertEquals(result, expected) - - def test_get_object_data_errors(self): - p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) - p._internal_proxy = DumbInternalProxy(code=500) - result = p.get_object_data('a', 'c', 'o') - self.assertRaises(log_processor.BadFileDownload, list, result) - p._internal_proxy = DumbInternalProxy(bad_compressed=True) - result = p.get_object_data('a', 'c', 'o.gz', True) - self.assertRaises(log_processor.BadFileDownload, list, result) - p._internal_proxy = DumbInternalProxy(timeout=True) - result = p.get_object_data('a', 'c', 'o') - self.assertRaises(log_processor.BadFileDownload, list, result) - - def test_get_stat_totals(self): - stats_proxy_config = self.proxy_config.copy() - stats_proxy_config.update({ - 'log-processor-stats': { - 'class_path': - 'swift.stats.stats_processor.StatsLogProcessor' - }}) - p = log_processor.LogProcessor(stats_proxy_config, 
DumbLogger()) - p._internal_proxy = DumbInternalProxy() - def get_object_data(*a,**kw): - return [self.stats_test_line] - p.get_object_data = get_object_data - result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o') - expected = {('account', 'y', 'm', 'd', 'h'): - {'replica_count': 1, - 'object_count': 2, - 'container_count': 1, - 'bytes_used': 3}} - self.assertEquals(result, expected) - - def test_generate_keylist_mapping(self): - p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) - result = p.generate_keylist_mapping() - expected = {} - self.assertEquals(result, expected) - - def test_generate_keylist_mapping_with_dummy_plugins(self): - class Plugin1(object): - def keylist_mapping(self): - return {'a': 'b', 'c': 'd', 'e': ['f', 'g']} - class Plugin2(object): - def keylist_mapping(self): - return {'a': '1', 'e': '2', 'h': '3'} - p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) - p.plugins['plugin1'] = {'instance': Plugin1()} - p.plugins['plugin2'] = {'instance': Plugin2()} - result = p.generate_keylist_mapping() - expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']), - 'h': '3'} - self.assertEquals(result, expected) - - def test_access_keylist_mapping_format(self): - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - p = log_processor.LogProcessor(proxy_config, DumbLogger()) - mapping = p.generate_keylist_mapping() - for k, v in mapping.items(): - # these only work for Py2.7+ - #self.assertIsInstance(k, str) - self.assertTrue(isinstance(k, str), type(k)) - - def test_stats_keylist_mapping_format(self): - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-stats': { - 'class_path': - 'swift.stats.stats_processor.StatsLogProcessor' - }}) - p = log_processor.LogProcessor(proxy_config, DumbLogger()) - mapping = p.generate_keylist_mapping() - for k, v in mapping.items(): - # these only work for Py2.7+ - #self.assertIsInstance(k, str) - self.assertTrue(isinstance(k, str), type(k)) - - def test_collate_worker(self): - try: - log_processor.LogProcessor._internal_proxy = DumbInternalProxy() - def get_object_data(*a,**kw): - return [self.access_test_line] - orig_get_object_data = log_processor.LogProcessor.get_object_data - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - q_in = Queue.Queue() - q_out = Queue.Queue() - work_request = ('access', 'a','c','o') - q_in.put(work_request) - q_in.put(None) - log_processor.collate_worker(processor_args, q_in, q_out) - item, ret = q_out.get() - self.assertEquals(item, work_request) - expected = {('acct', '2010', '07', '09', '04'): - {('public', 'object', 'GET', '2xx'): 1, - ('public', 'bytes_out'): 95, - 'marker_query': 0, - 'format_query': 1, - 'delimiter_query': 0, - 'path_query': 0, - ('public', 'bytes_in'): 6, - 'prefix_query': 0}} - self.assertEquals(ret, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_collate_worker_error(self): - def get_object_data(*a,**kw): - raise Exception() - orig_get_object_data = log_processor.LogProcessor.get_object_data - try: 
- log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - q_in = Queue.Queue() - q_out = Queue.Queue() - work_request = ('access', 'a','c','o') - q_in.put(work_request) - q_in.put(None) - log_processor.collate_worker(processor_args, q_in, q_out) - item, ret = q_out.get() - self.assertEquals(item, work_request) - # these only work for Py2.7+ - #self.assertIsInstance(ret, log_processor.BadFileDownload) - self.assertTrue(isinstance(ret, Exception)) - finally: - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_multiprocess_collate(self): - try: - log_processor.LogProcessor._internal_proxy = DumbInternalProxy() - def get_object_data(*a,**kw): - return [self.access_test_line] - orig_get_object_data = log_processor.LogProcessor.get_object_data - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - item = ('access', 'a','c','o') - logs_to_process = [item] - results = log_processor.multiprocess_collate(processor_args, - logs_to_process, - 1) - results = list(results) - expected = [(item, {('acct', '2010', '07', '09', '04'): - {('public', 'object', 'GET', '2xx'): 1, - ('public', 'bytes_out'): 95, - 'marker_query': 0, - 'format_query': 1, - 'delimiter_query': 0, - 'path_query': 0, - ('public', 'bytes_in'): 6, - 'prefix_query': 0}})] - self.assertEquals(results, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - - def test_multiprocess_collate_errors(self): - def get_object_data(*a,**kw): - raise log_processor.BadFileDownload() - orig_get_object_data = log_processor.LogProcessor.get_object_data - try: - log_processor.LogProcessor.get_object_data = get_object_data - proxy_config = self.proxy_config.copy() - proxy_config.update({ - 'log-processor-access': { - 'source_filename_format':'%Y%m%d%H*', - 'class_path': - 'swift.stats.access_processor.AccessLogProcessor' - }}) - processor_args = (proxy_config, DumbLogger()) - item = ('access', 'a','c','o') - logs_to_process = [item] - results = log_processor.multiprocess_collate(processor_args, - logs_to_process, - 1) - results = list(results) - expected = [] - self.assertEquals(results, expected) - finally: - log_processor.LogProcessor._internal_proxy = None - log_processor.LogProcessor.get_object_data = orig_get_object_data - -class TestLogProcessorDaemon(unittest.TestCase): - - def test_get_lookback_interval(self): - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, lookback_hours, lookback_window): - self.lookback_hours = lookback_hours - self.lookback_window = lookback_window - - try: - d = datetime.datetime - - for x in [ - [d(2011, 1, 1), 0, 0, None, None], - [d(2011, 1, 1), 120, 0, '2010122700', None], - [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], - [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], - [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], - [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], - ]: - - log_processor.now = 
lambda: x[0] - - d = MockLogProcessorDaemon(x[1], x[2]) - self.assertEquals((x[3], x[4]), d.get_lookback_interval()) - finally: - log_processor.now = datetime.datetime.now - - def test_get_processed_files_list(self): - class MockLogProcessor(): - def __init__(self, stream): - self.stream = stream - - def get_object_data(self, *args, **kwargs): - return self.stream - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, stream): - self.log_processor = MockLogProcessor(stream) - self.log_processor_account = 'account' - self.log_processor_container = 'container' - self.processed_files_filename = 'filename' - - file_list = set(['a', 'b', 'c']) - - for s, l in [['', None], - [pickle.dumps(set()).split('\n'), set()], - [pickle.dumps(file_list).split('\n'), file_list], - ]: - - self.assertEquals(l, - MockLogProcessorDaemon(s).get_processed_files_list()) - - def test_get_processed_files_list_bad_file_downloads(self): - class MockLogProcessor(): - def __init__(self, status_code): - self.err = log_processor.BadFileDownload(status_code) - - def get_object_data(self, *a, **k): - raise self.err - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, status_code): - self.log_processor = MockLogProcessor(status_code) - self.log_processor_account = 'account' - self.log_processor_container = 'container' - self.processed_files_filename = 'filename' - - for c, l in [[404, set()], [503, None], [None, None]]: - self.assertEquals(l, - MockLogProcessorDaemon(c).get_processed_files_list()) - - def test_get_aggregate_data(self): - # when run "for real" - # the various keys/values in the input and output - # dictionaries are often not simple strings - # for testing we can use keys that are easier to work with - - processed_files = set() - - data_in = [ - ['file1', { - 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, - 'acct1_time2': {'field1': 4, 'field2': 5}, - 'acct2_time1': {'field1': 6, 'field2': 7}, - 'acct3_time3': {'field1': 8, 'field2': 9}, - } - ], - ['file2', {'acct1_time1': {'field1': 10}}], - ] - - expected_data_out = { - 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, - 'acct1_time2': {'field1': 4, 'field2': 5}, - 'acct2_time1': {'field1': 6, 'field2': 7}, - 'acct3_time3': {'field1': 8, 'field2': 9}, - } - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self): - pass - - d = MockLogProcessorDaemon() - data_out = d.get_aggregate_data(processed_files, data_in) - - for k, v in expected_data_out.items(): - self.assertEquals(v, data_out[k]) - - self.assertEquals(set(['file1', 'file2']), processed_files) - - def test_get_final_info(self): - # when run "for real" - # the various keys/values in the input and output - # dictionaries are often not simple strings - # for testing we can use keys/values that are easier to work with - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self): - self._keylist_mapping = { - 'out_field1':['field1', 'field2', 'field3'], - 'out_field2':['field2', 'field3'], - 'out_field3':['field3'], - 'out_field4':'field4', - 'out_field5':['field6', 'field7', 'field8'], - 'out_field6':['field6'], - 'out_field7':'field7', - } - - data_in = { - 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, - 'field4': 8, 'field5': 11}, - 'acct1_time2': {'field1': 4, 'field2': 5}, - 'acct2_time1': {'field1': 6, 'field2': 7}, - 'acct3_time3': {'field1': 8, 'field2': 9}, - } - - expected_data_out = { - 'acct1_time1': {'out_field1': 16, 'out_field2': 5, - 
'out_field3': 3, 'out_field4': 8, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0,}, - 'acct1_time2': {'out_field1': 9, 'out_field2': 5, - 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0,}, - 'acct2_time1': {'out_field1': 13, 'out_field2': 7, - 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0,}, - 'acct3_time3': {'out_field1': 17, 'out_field2': 9, - 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, - 'out_field6': 0, 'out_field7': 0,}, - } - - self.assertEquals(expected_data_out, - MockLogProcessorDaemon().get_final_info(data_in)) - - def test_store_processed_files_list(self): - class MockInternalProxy: - def __init__(self, test, daemon, processed_files): - self.test = test - self.daemon = daemon - self.processed_files = processed_files - - def upload_file(self, f, account, container, filename): - self.test.assertEquals(self.processed_files, - pickle.loads(f.getvalue())) - self.test.assertEquals(self.daemon.log_processor_account, - account) - self.test.assertEquals(self.daemon.log_processor_container, - container) - self.test.assertEquals(self.daemon.processed_files_filename, - filename) - - class MockLogProcessor: - def __init__(self, test, daemon, processed_files): - self.internal_proxy = MockInternalProxy(test, daemon, - processed_files) - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, test, processed_files): - self.log_processor = \ - MockLogProcessor(test, self, processed_files) - self.log_processor_account = 'account' - self.log_processor_container = 'container' - self.processed_files_filename = 'filename' - - processed_files = set(['a', 'b', 'c']) - MockLogProcessorDaemon(self, processed_files).\ - store_processed_files_list(processed_files) - - def test_get_output(self): - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self): - self._keylist_mapping = {'a':None, 'b':None, 'c':None} - - data_in = { - ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, - ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, - ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, - ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, - } - - expected_data_out = [ - ['data_ts', 'account', 'a', 'b', 'c'], - ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], - ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], - ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], - ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], - ] - - data_out = MockLogProcessorDaemon().get_output(data_in) - self.assertEquals(expected_data_out[0], data_out[0]) - - for row in data_out[1:]: - self.assert_(row in expected_data_out) - - for row in expected_data_out[1:]: - self.assert_(row in data_out) - - def test_store_output(self): - try: - real_strftime = time.strftime - mock_strftime_return = '2010/03/02/01/' - def mock_strftime(format): - self.assertEquals('%Y/%m/%d/%H/', format) - return mock_strftime_return - log_processor.time.strftime = mock_strftime - - data_in = [ - ['data_ts', 'account', 'a', 'b', 'c'], - ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], - ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], - ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], - ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], - ] - - expected_output = '\n'.join([','.join(row) for row in data_in]) - h = hashlib.md5(expected_output).hexdigest() - expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) - - class MockInternalProxy: - def __init__(self, test, daemon, expected_filename, - 
expected_output): - self.test = test - self.daemon = daemon - self.expected_filename = expected_filename - self.expected_output = expected_output - - def upload_file(self, f, account, container, filename): - self.test.assertEquals(self.daemon.log_processor_account, - account) - self.test.assertEquals(self.daemon.log_processor_container, - container) - self.test.assertEquals(self.expected_filename, filename) - self.test.assertEquals(self.expected_output, f.getvalue()) - - class MockLogProcessor: - def __init__(self, test, daemon, expected_filename, - expected_output): - self.internal_proxy = MockInternalProxy(test, daemon, - expected_filename, expected_output) - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, test, expected_filename, expected_output): - self.log_processor = MockLogProcessor(test, self, - expected_filename, expected_output) - self.log_processor_account = 'account' - self.log_processor_container = 'container' - self.processed_files_filename = 'filename' - - MockLogProcessorDaemon(self, expected_filename, expected_output).\ - store_output(data_in) - finally: - log_processor.time.strftime = real_strftime - - def test_keylist_mapping(self): - # Kind of lame test to see if the propery is both - # generated by a particular method and cached properly. - # The method that actually generates the mapping is - # tested elsewhere. - - value_return = 'keylist_mapping' - class MockLogProcessor: - def __init__(self): - self.call_count = 0 - - def generate_keylist_mapping(self): - self.call_count += 1 - return value_return - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self): - self.log_processor = MockLogProcessor() - self._keylist_mapping = None - - d = MockLogProcessorDaemon() - self.assertEquals(value_return, d.keylist_mapping) - self.assertEquals(value_return, d.keylist_mapping) - self.assertEquals(1, d.log_processor.call_count) - - def test_process_logs(self): - try: - mock_logs_to_process = 'logs_to_process' - mock_processed_files = 'processed_files' - - real_multiprocess_collate = log_processor.multiprocess_collate - multiprocess_collate_return = 'multiprocess_collate_return' - - get_aggregate_data_return = 'get_aggregate_data_return' - get_final_info_return = 'get_final_info_return' - get_output_return = 'get_output_return' - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, test): - self.test = test - self.total_conf = 'total_conf' - self.logger = 'logger' - self.worker_count = 'worker_count' - - def get_aggregate_data(self, processed_files, results): - self.test.assertEquals(mock_processed_files, processed_files) - self.test.assertEquals(multiprocess_collate_return, results) - return get_aggregate_data_return - - def get_final_info(self, aggr_data): - self.test.assertEquals(get_aggregate_data_return, aggr_data) - return get_final_info_return - - def get_output(self, final_info): - self.test.assertEquals(get_final_info_return, final_info) - return get_output_return - - d = MockLogProcessorDaemon(self) - - def mock_multiprocess_collate(processor_args, logs_to_process, - worker_count): - self.assertEquals(d.total_conf, processor_args[0]) - self.assertEquals(d.logger, processor_args[1]) - - self.assertEquals(mock_logs_to_process, logs_to_process) - self.assertEquals(d.worker_count, worker_count) - - return multiprocess_collate_return - - log_processor.multiprocess_collate = mock_multiprocess_collate - - output = d.process_logs(mock_logs_to_process, mock_processed_files) - 
self.assertEquals(get_output_return, output) - finally: - log_processor.multiprocess_collate = real_multiprocess_collate - - def test_run_once_get_processed_files_list_returns_none(self): - class MockLogProcessor: - def get_data_list(self, lookback_start, lookback_end, - processed_files): - raise unittest.TestCase.failureException, \ - 'Method should not be called' - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self): - self.logger = DumbLogger() - self.log_processor = MockLogProcessor() - - def get_lookback_interval(self): - return None, None - - def get_processed_files_list(self): - return None - - MockLogProcessorDaemon().run_once() - - def test_run_once_no_logs_to_process(self): - class MockLogProcessor(): - def __init__(self, daemon, test): - self.daemon = daemon - self.test = test - - def get_data_list(self, lookback_start, lookback_end, - processed_files): - self.test.assertEquals(self.daemon.lookback_start, - lookback_start) - self.test.assertEquals(self.daemon.lookback_end, - lookback_end) - self.test.assertEquals(self.daemon.processed_files, - processed_files) - return [] - - class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): - def __init__(self, test): - self.logger = DumbLogger() - self.log_processor = MockLogProcessor(self, test) - self.lookback_start = 'lookback_start' - self.lookback_end = 'lookback_end' - self.processed_files = ['a', 'b', 'c'] - - def get_lookback_interval(self): - return self.lookback_start, self.lookback_end - - def get_processed_files_list(self): - return self.processed_files - - def process_logs(logs_to_process, processed_files): - raise unittest.TestCase.failureException, \ - 'Method should not be called' - - MockLogProcessorDaemon(self).run_once() diff --git a/test/unit/stats/test_log_uploader.py b/test/unit/stats/test_log_uploader.py deleted file mode 100644 index 01bb00ceaf..0000000000 --- a/test/unit/stats/test_log_uploader.py +++ /dev/null @@ -1,240 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import unittest -import os -from datetime import datetime -from tempfile import mkdtemp -from shutil import rmtree -from functools import partial -from collections import defaultdict -import random -import string - -from test.unit import temptree -from swift.stats import log_uploader - -import logging -logging.basicConfig(level=logging.DEBUG) -LOGGER = logging.getLogger() - -COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \ - '\xf3\xf3\xad\x04\x00\x00\x00' - -access_regex = ''' - ^ - (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9]) - .*$ - ''' - - -def mock_appconfig(*args, **kwargs): - pass - - -class MockInternalProxy(): - - def __init__(self, *args, **kwargs): - pass - - def create_container(self, *args, **kwargs): - return True - - def upload_file(self, *args, **kwargs): - return True - - -_orig_LogUploader = log_uploader.LogUploader - - -class MockLogUploader(_orig_LogUploader): - - def __init__(self, conf, logger=LOGGER): - conf['swift_account'] = conf.get('swift_account', '') - conf['container_name'] = conf.get('container_name', '') - conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0') - conf['source_filename_format'] = conf.get( - 'source_filename_format', conf.get('filename_format')) - log_uploader.LogUploader.__init__(self, conf, 'plugin') - self.logger = logger - self.uploaded_files = [] - - def upload_one_log(self, filename, year, month, day, hour): - d = {'year': year, 'month': month, 'day': day, 'hour': hour} - self.uploaded_files.append((filename, d)) - _orig_LogUploader.upload_one_log(self, filename, year, month, - day, hour) - - -class TestLogUploader(unittest.TestCase): - - def setUp(self): - # mock internal proxy - self._orig_InternalProxy = log_uploader.InternalProxy - self._orig_appconfig = log_uploader.appconfig - log_uploader.InternalProxy = MockInternalProxy - log_uploader.appconfig = mock_appconfig - - def tearDown(self): - log_uploader.appconfig = self._orig_appconfig - log_uploader.InternalProxy = self._orig_InternalProxy - - def test_bad_pattern_in_config(self): - files = [datetime.now().strftime('%Y%m%d%H')] - with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t: - # invalid pattern - conf = {'log_dir': t, - 'source_filename_pattern': '%Y%m%d%h'} # should be %H - uploader = MockLogUploader(conf) - self.assertRaises(SystemExit, uploader.upload_all_logs) - - conf = {'log_dir': t, 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - uploader.upload_all_logs() - self.assertEquals(len(uploader.uploaded_files), 1) - - def test_pattern_upload_all_logs(self): - - # test empty dir - with temptree([]) as t: - conf = {'log_dir': t} - uploader = MockLogUploader(conf) - self.assertRaises(SystemExit, uploader.run_once) - - def get_random_length_str(max_len=10, chars=string.ascii_letters): - return ''.join(random.choice(chars) for x in - range(random.randint(1, max_len))) - - template = 'prefix_%(random)s_%(digits)s.blah.' \ - '%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz' - pattern = '''prefix_.*_[0-9]+\.blah\. 
- (?P[0-9]{4}) - (?P[0-1][0-9]) - (?P[0-3][0-9]) - (?P[0-2][0-9])00-[0-9]{2}00 - -[0-9]?[0-9]\.gz''' - files_that_should_match = [] - # add some files that match - for i in range(24): - fname = template % { - 'random': get_random_length_str(), - 'digits': get_random_length_str(16, string.digits), - 'datestr': datetime.now().strftime('%Y%m%d'), - 'hour': i, - 'next_hour': i + 1, - 'number': random.randint(0, 20), - } - files_that_should_match.append(fname) - - # add some files that don't match - files = list(files_that_should_match) - for i in range(24): - fname = template % { - 'random': get_random_length_str(), - 'digits': get_random_length_str(16, string.digits), - 'datestr': datetime.now().strftime('%Y%m'), - 'hour': i, - 'next_hour': i + 1, - 'number': random.randint(0, 20), - } - files.append(fname) - - for fname in files: - print fname - - with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t: - self.assertEquals(len(os.listdir(t)), 48) - conf = {'source_filename_pattern': pattern, 'log_dir': t} - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(os.listdir(t)), 24) - self.assertEquals(len(uploader.uploaded_files), 24) - files_that_were_uploaded = set(x[0] for x in - uploader.uploaded_files) - for f in files_that_should_match: - self.assert_(os.path.join(t, f) in files_that_were_uploaded) - - def test_log_cutoff(self): - files = [datetime.now().strftime('%Y%m%d%H')] - with temptree(files) as t: - conf = {'log_dir': t, 'new_log_cutoff': '7200', - 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 0) - conf = {'log_dir': t, 'new_log_cutoff': '0', - 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 1) - - def test_create_container_fail(self): - files = [datetime.now().strftime('%Y%m%d%H')] - conf = {'source_filename_pattern': access_regex} - with temptree(files) as t: - conf['log_dir'] = t - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 1) - - with temptree(files) as t: - conf['log_dir'] = t - uploader = MockLogUploader(conf) - # mock create_container to fail - uploader.internal_proxy.create_container = lambda *args: False - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 0) - - def test_unlink_log(self): - files = [datetime.now().strftime('%Y%m%d%H')] - with temptree(files, contents=[COMPRESSED_DATA]) as t: - conf = {'log_dir': t, 'unlink_log': 'false', - 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 1) - # file still there - self.assertEquals(len(os.listdir(t)), 1) - - conf = {'log_dir': t, 'unlink_log': 'true', - 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - uploader.run_once() - self.assertEquals(len(uploader.uploaded_files), 1) - # file gone - self.assertEquals(len(os.listdir(t)), 0) - - def test_upload_file_failed(self): - files = ['plugin-%s' % datetime.now().strftime('%Y%m%d%H')] - with temptree(files, contents=[COMPRESSED_DATA]) as t: - conf = {'log_dir': t, 'unlink_log': 'true', - 'source_filename_pattern': access_regex} - uploader = MockLogUploader(conf) - - # mock upload_file to fail, and clean up mock - def mock_upload_file(self, *args, **kwargs): - uploader.uploaded_files.pop() - return False - uploader.internal_proxy.upload_file 
= mock_upload_file - self.assertRaises(SystemExit, uploader.run_once) - # file still there - self.assertEquals(len(os.listdir(t)), 1) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/stats/test_stats_processor.py b/test/unit/stats/test_stats_processor.py deleted file mode 100644 index c3af1c1b69..0000000000 --- a/test/unit/stats/test_stats_processor.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# TODO: Tests - -import unittest -from swift.stats import stats_processor - - -class TestStatsProcessor(unittest.TestCase): - - def test_placeholder(self): - pass - - -if __name__ == '__main__': - unittest.main()
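The deleted test_generate_keylist_mapping_with_dummy_plugins case documents how per-plugin keylist mappings were combined: each plugin contributes an {output_key: source_key(s)} dict, and when more than one plugin claims the same output key the source keys collapse into a set. A minimal sketch of that merge rule over plain dicts, outside the removed LogProcessor class (the helper name merge_keylist_mappings is illustrative):

    def merge_keylist_mappings(plugin_mappings):
        """Merge per-plugin {output_key: source_key(s)} dicts; keys claimed by
        more than one plugin collect all of their source keys into a set."""
        merged = {}
        for mapping in plugin_mappings:
            for key, sources in mapping.items():
                new = set(sources) if isinstance(sources, (list, set)) else {sources}
                if key in merged:
                    old = merged[key]
                    merged[key] = (old if isinstance(old, set) else {old}) | new
                elif len(new) > 1:
                    merged[key] = new
                else:
                    merged[key] = new.pop()
        return merged

    # Mirrors the dummy plugins in the deleted test: 'a' and 'e' end up as sets,
    # 'c' and 'h' stay plain strings.
    print(merge_keylist_mappings([
        {'a': 'b', 'c': 'd', 'e': ['f', 'g']},
        {'a': '1', 'e': '2', 'h': '3'},
    ]))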
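The deleted test_collate_worker and test_multiprocess_collate cases exercise a sentinel-terminated worker pool: workers pull (plugin, account, container, object) items from an input queue, push (item, result) pairs to an output queue, and an item whose download fails is dropped from the collated output instead of aborting the run. A minimal, generic sketch of that fan-out shape, assuming a picklable process_one callable in place of the removed per-object processing (Python 3 syntax):

    import multiprocessing


    def collate_worker(process_one, q_in, q_out):
        """Consume work items until the None sentinel, emitting (item, result)."""
        while True:
            item = q_in.get()
            if item is None:
                break
            try:
                result = process_one(item)
            except Exception as err:
                result = err               # errors travel back to the parent
            q_out.put((item, result))


    def multiprocess_collate(process_one, items, worker_count):
        """Fan items out to worker_count processes, yielding successful results."""
        q_in, q_out = multiprocessing.Queue(), multiprocessing.Queue()
        workers = [multiprocessing.Process(target=collate_worker,
                                           args=(process_one, q_in, q_out))
                   for _ in range(worker_count)]
        for w in workers:
            w.start()
        for item in items:
            q_in.put(item)
        for _ in workers:
            q_in.put(None)                 # one sentinel per worker
        for _ in items:
            item, result = q_out.get()
            if isinstance(result, Exception):
                continue                   # failed items are dropped, as in the tests
            yield item, result
        for w in workers:
            w.join()

Run with worker_count=1 and a process_one that parses a single log object, this yields the same (item, stats) pairs the deleted tests assert on; when every item raises, the collated result list is empty, which is the behaviour test_multiprocess_collate_errors checks.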
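The deleted test_get_aggregate_data case pins down the reduction step that follows collation: counters under the same account/time key are summed across files (field1 becomes 1 + 10 = 11 for acct1_time1), and every consumed filename is added to the processed set. A sketch of that merge as a free function over plain dict-of-dict counters, rather than the daemon method of the same name:

    def get_aggregate_data(processed_files, per_file_results):
        """Sum per-file counter dicts into one {group_key: {field: total}} dict,
        adding each consumed filename to processed_files as a side effect."""
        aggregate = {}
        for filename, groups in per_file_results:
            for group_key, fields in groups.items():
                totals = aggregate.setdefault(group_key, {})
                for field, value in fields.items():
                    totals[field] = totals.get(field, 0) + value
            processed_files.add(filename)
        return aggregate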
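The deleted uploader tests drive filename handling with verbose regexes whose named groups carry the timestamp (access_regex above, and the longer pattern in test_pattern_upload_all_logs). A small sketch of pulling (year, month, day, hour) out of a log filename with that style of pattern; the compiled pattern and the names FILENAME_PATTERN and parse_log_filename are assumed examples in the same spirit, not the uploader's configured source_filename_pattern:

    import re

    # Assumed example pattern: four named groups for the timestamp, anything after.
    FILENAME_PATTERN = re.compile(r'''
        ^
        (?P<year>[0-9]{4})
        (?P<month>[0-1][0-9])
        (?P<day>[0-3][0-9])
        (?P<hour>[0-2][0-9])
        .*$
        ''', re.VERBOSE)


    def parse_log_filename(filename):
        """Return (year, month, day, hour) strings, or None when the name
        does not look like an hourly log file."""
        match = FILENAME_PATTERN.match(filename)
        if not match:
            return None
        return (match.group('year'), match.group('month'),
                match.group('day'), match.group('hour'))

    # parse_log_filename('2011031704.access.log.gz') -> ('2011', '03', '17', '04')
    # parse_log_filename('not-a-log') -> None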