diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst
index 303256e803..87ff33fb78 100644
--- a/doc/source/admin_guide.rst
+++ b/doc/source/admin_guide.rst
@@ -342,6 +342,442 @@ For example, to obtain quarantine stats from all hosts in zone "3"::
 ===============================================================================
 
+---------------------------
+Reporting Metrics to StatsD
+---------------------------
+
+If you have a StatsD_ server running, Swift may be configured to send it
+real-time operational metrics. To enable this, set the following
+configuration entries (see the sample configuration files)::
+
+    log_statsd_host = localhost
+    log_statsd_port = 8125
+    log_statsd_default_sample_rate = 1
+    log_statsd_metric_prefix = [empty-string]
+
+If `log_statsd_host` is not set, this feature is disabled. The default values
+for the other settings are given above.
+
+.. _StatsD: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
+.. _Graphite: http://graphite.wikidot.com/
+.. _Ganglia: http://ganglia.sourceforge.net/
+
+The sample rate is a real number between 0 and 1 which defines the
+probability of sending a sample for any given event or timing measurement.
+The sample rate is also sent along with each sample so that StatsD can
+compensate for the sampling: with a sample rate of 0.5, StatsD will multiply
+that counter's value by 2 when flushing the metric to an upstream monitoring
+system (Graphite_, Ganglia_, etc.). To get the best data, start with the
+default `log_statsd_default_sample_rate` value of 1 and only lower it as
+needed.
+
+The metric prefix will be prepended to every metric sent to the StatsD
+server. For example, with::
+
+    log_statsd_metric_prefix = proxy01
+
+the metric `proxy-server.errors` would be sent to StatsD as
+`proxy01.proxy-server.errors`. This is useful for differentiating different
+servers when sending statistics to a central StatsD server. If you run a
+local StatsD server per node, you could configure a per-node metric prefix
+there and leave `log_statsd_metric_prefix` blank.
+
+Note that metrics reported to StatsD are counters or timing data (which
+StatsD usually expands out to min, max, avg, count, and 90th percentile
+per timing metric). Some important "gauge" metrics will still need to
+be collected using another method. For example, the
+`object-server.async_pendings` StatsD metric counts the generation of
+async_pendings in real-time, but will not tell you the current number
+of async_pending container updates on disk at any point in time.
+
+Note also that the set of metrics collected, their names, and their semantics
+are not locked down and will change over time. StatsD logging is currently in
+a "beta" stage and will continue to evolve.
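+
+Each sample is a single UDP datagram. Based on the `StatsdClient` this patch
+adds to `swift/common/utils.py`, a counter increment at the default sample
+rate looks like `proxy-server.errors:1|c`, a timing sample looks like
+`account-server.GET.timing:12.5|ms`, and a sample rate below 1 is appended
+as a trailing field, e.g. `|@0.5`. Note that the client as shown in this
+patch always transmits and simply tags the sample rate; the following
+minimal, self-contained sketch (illustrative only, not Swift's actual
+client) models the probabilistic sending behavior described above::
+
+    import socket
+    from random import random
+
+    def send_counter(name, value=1, sample_rate=1,
+                     host='localhost', port=8125):
+        # Send only sample_rate of the events; StatsD re-scales received
+        # values by 1/sample_rate so aggregates remain statistically
+        # correct.
+        if sample_rate < 1 and random() >= sample_rate:
+            return
+        payload = '%s:%s|c' % (name, value)
+        if sample_rate < 1:
+            payload += '|@%s' % sample_rate
+        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        sock.sendto(payload, (host, port))
+
+    send_counter('proxy01.proxy-server.errors')  # hypothetical prefixed name
+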
+Metrics for `account-auditor`:
+
+========================== =========================================================
+Metric Name                Description
+-------------------------- ---------------------------------------------------------
+`account-auditor.errors`   Count of audit runs (across all account databases) which
+                           caught an Exception.
+`account-auditor.passes`   Count of individual account databases which passed audit.
+`account-auditor.failures` Count of individual account databases which failed audit.
+`account-auditor.timing`   Timing data for individual account database audits.
+========================== =========================================================
+
+Metrics for `account-reaper`:
+
+============================================== ====================================================
+Metric Name                                    Description
+---------------------------------------------- ----------------------------------------------------
+`account-reaper.errors`                        Count of devices failing the mount check.
+`account-reaper.timing`                        Timing data for each reap_account() call.
+`account-reaper.return_codes.X`                Count of HTTP return codes from various operations
+                                               (e.g. object listing, container deletion, etc.). The
+                                               value for X is the first digit of the return code
+                                               (2 for 201, 4 for 404, etc.).
+`account-reaper.containers_failures`           Count of failures to delete a container.
+`account-reaper.containers_deleted`            Count of containers successfully deleted.
+`account-reaper.containers_remaining`          Count of containers which failed to delete with
+                                               zero successes.
+`account-reaper.containers_possibly_remaining` Count of containers which failed to delete with
+                                               at least one success.
+`account-reaper.objects_failures`              Count of failures to delete an object.
+`account-reaper.objects_deleted`               Count of objects successfully deleted.
+`account-reaper.objects_remaining`             Count of objects which failed to delete with zero
+                                               successes.
+`account-reaper.objects_possibly_remaining`    Count of objects which failed to delete with at
+                                               least one success.
+============================================== ====================================================
+
+Metrics for `account-server` ("Not Found" is not considered an error and requests
+which increment `errors` are not included in the timing data):
+
+================================= ====================================================
+Metric Name                       Description
+--------------------------------- ----------------------------------------------------
+`account-server.DELETE.errors`    Count of errors handling DELETE requests: bad
+                                  request, not mounted, missing timestamp.
+`account-server.DELETE.timing`    Timing data for each DELETE request not resulting in
+                                  an error.
+`account-server.PUT.errors`       Count of errors handling PUT requests: bad request,
+                                  not mounted, conflict.
+`account-server.PUT.timing`       Timing data for each PUT request not resulting in an
+                                  error.
+`account-server.HEAD.errors`      Count of errors handling HEAD requests: bad request,
+                                  not mounted.
+`account-server.HEAD.timing`      Timing data for each HEAD request not resulting in
+                                  an error.
+`account-server.GET.errors`       Count of errors handling GET requests: bad request,
+                                  not mounted, bad delimiter, account listing limit
+                                  too high, bad accept header.
+`account-server.GET.timing`       Timing data for each GET request not resulting in
+                                  an error.
+`account-server.REPLICATE.errors` Count of errors handling REPLICATE requests: bad
+                                  request, not mounted.
+`account-server.REPLICATE.timing` Timing data for each REPLICATE request not resulting
+                                  in an error.
+`account-server.POST.errors`      Count of errors handling POST requests: bad request,
+                                  bad or missing timestamp, not mounted.
+`account-server.POST.timing`      Timing data for each POST request not resulting in
+                                  an error.
+================================= ====================================================
+
+Metrics for `account-replicator`:
+
+================================== ====================================================
+Metric Name                        Description
+---------------------------------- ----------------------------------------------------
+`account-replicator.diffs`         Count of syncs handled by sending differing rows.
+`account-replicator.diff_caps`     Count of "diffs" operations which failed because
+                                   "max_diffs" was hit.
+`account-replicator.no_changes`    Count of accounts found to be in sync.
+`account-replicator.hashmatches`   Count of accounts found to be in sync via hash
+                                   comparison (`broker.merge_syncs` was called).
+`account-replicator.rsyncs`        Count of completely missing accounts which were
+                                   sent via rsync.
+`account-replicator.remote_merges` Count of syncs handled by sending the entire
+                                   database via rsync.
+`account-replicator.attempts`      Count of database replication attempts.
+`account-replicator.failures`      Count of database replication attempts which failed
+                                   due to corruption (quarantined) or inability to
+                                   read, as well as attempts to individual nodes which
+                                   failed.
+`account-replicator.removes`       Count of databases deleted because the
+                                   delete_timestamp was greater than the put_timestamp
+                                   and the database had no rows, or because it was
+                                   successfully sync'ed to other locations and doesn't
+                                   belong here anymore.
+`account-replicator.successes`     Count of replication attempts to an individual node
+                                   which were successful.
+`account-replicator.timing`        Timing data for each database replication attempt
+                                   not resulting in a failure.
+================================== ====================================================
+
+Metrics for `container-auditor`:
+
+============================ ====================================================
+Metric Name                  Description
+---------------------------- ----------------------------------------------------
+`container-auditor.errors`   Incremented when an Exception is caught in an audit
+                             pass (only once per pass, max).
+`container-auditor.passes`   Count of individual containers passing an audit.
+`container-auditor.failures` Count of individual containers failing an audit.
+`container-auditor.timing`   Timing data for each container audit.
+============================ ====================================================
+
+Metrics for `container-replicator`:
+
+==================================== ====================================================
+Metric Name                          Description
+------------------------------------ ----------------------------------------------------
+`container-replicator.diffs`         Count of syncs handled by sending differing rows.
+`container-replicator.diff_caps`     Count of "diffs" operations which failed because
+                                     "max_diffs" was hit.
+`container-replicator.no_changes`    Count of containers found to be in sync.
+`container-replicator.hashmatches`   Count of containers found to be in sync via hash
+                                     comparison (`broker.merge_syncs` was called).
+`container-replicator.rsyncs`        Count of completely missing containers which were
+                                     sent via rsync.
+`container-replicator.remote_merges` Count of syncs handled by sending the entire
+                                     database via rsync.
+`container-replicator.attempts`      Count of database replication attempts.
+`container-replicator.failures`      Count of database replication attempts which failed
+                                     due to corruption (quarantined) or inability to
+                                     read, as well as attempts to individual nodes which
+                                     failed.
+`container-replicator.removes`       Count of databases deleted because the
+                                     delete_timestamp was greater than the put_timestamp
+                                     and the database had no rows, or because it was
+                                     successfully sync'ed to other locations and doesn't
+                                     belong here anymore.
+`container-replicator.successes`     Count of replication attempts to an individual node
+                                     which were successful.
+`container-replicator.timing`        Timing data for each database replication attempt
+                                     not resulting in a failure.
+==================================== ====================================================
+
+Metrics for `container-server` ("Not Found" is not considered an error and requests
+which increment `errors` are not included in the timing data):
+
+=================================== ====================================================
+Metric Name                         Description
+----------------------------------- ----------------------------------------------------
+`container-server.DELETE.errors`    Count of errors handling DELETE requests: bad
+                                    request, not mounted, missing timestamp, conflict.
+`container-server.DELETE.timing`    Timing data for each DELETE request not resulting in
+                                    an error.
+`container-server.PUT.errors`       Count of errors handling PUT requests: bad request,
+                                    missing timestamp, not mounted, conflict.
+`container-server.PUT.timing`       Timing data for each PUT request not resulting in an
+                                    error.
+`container-server.HEAD.errors`      Count of errors handling HEAD requests: bad request,
+                                    not mounted.
+`container-server.HEAD.timing`      Timing data for each HEAD request not resulting in
+                                    an error.
+`container-server.GET.errors`       Count of errors handling GET requests: bad request,
+                                    not mounted, parameters not utf8, bad accept header.
+`container-server.GET.timing`       Timing data for each GET request not resulting in
+                                    an error.
+`container-server.REPLICATE.errors` Count of errors handling REPLICATE requests: bad
+                                    request, not mounted.
+`container-server.REPLICATE.timing` Timing data for each REPLICATE request not resulting
+                                    in an error.
+`container-server.POST.errors`      Count of errors handling POST requests: bad request,
+                                    bad x-container-sync-to, not mounted.
+`container-server.POST.timing`      Timing data for each POST request not resulting in
+                                    an error.
+=================================== ====================================================
+
+Metrics for `container-sync`:
+
+=============================== ====================================================
+Metric Name                     Description
+------------------------------- ----------------------------------------------------
+`container-sync.skips`          Count of containers skipped because they don't have
+                                sync'ing enabled.
+`container-sync.failures`       Count of failures sync'ing individual containers.
+`container-sync.syncs`          Count of individual containers sync'ed successfully.
+`container-sync.deletes`        Count of container database rows sync'ed by
+                                deletion.
+`container-sync.deletes.timing` Timing data for each container database row
+                                synchronization via deletion.
+`container-sync.puts`           Count of container database rows sync'ed by PUTing.
+`container-sync.puts.timing`    Timing data for each container database row
+                                synchronization via PUTing.
+=============================== ====================================================
+
+Metrics for `container-updater`:
+
+============================== ====================================================
+Metric Name                    Description
+------------------------------ ----------------------------------------------------
+`container-updater.successes`  Count of containers which successfully updated their
+                               account.
+`container-updater.failures`   Count of containers which failed to update their
+                               account.
+`container-updater.no_changes` Count of containers which didn't need to update
+                               their account.
+`container-updater.timing`     Timing data for processing a container; only
+                               includes timing for containers which needed to
+                               update their accounts (i.e. "successes" and
+                               "failures" but not "no_changes").
+============================== ====================================================
+
+Metrics for `object-auditor`:
+
+============================ ====================================================
+Metric Name                  Description
+---------------------------- ----------------------------------------------------
+`object-auditor.quarantines` Count of objects failing audit and quarantined.
+`object-auditor.errors`      Count of errors encountered while auditing objects.
+`object-auditor.timing`      Timing data for each object audit (does not include
+                             any rate-limiting sleep time for
+                             max_files_per_second, but does include rate-limiting
+                             sleep time for max_bytes_per_second).
+============================ ====================================================
+
+Metrics for `object-expirer`:
+
+======================== ====================================================
+Metric Name              Description
+------------------------ ----------------------------------------------------
+`object-expirer.objects` Count of objects expired.
+`object-expirer.errors`  Count of errors encountered while attempting to
+                         expire an object.
+`object-expirer.timing`  Timing data for each object expiration attempt,
+                         including ones resulting in an error.
+======================== ====================================================
+
+Metrics for `object-replicator`:
+
+=================================================== ====================================================
+Metric Name                                         Description
+--------------------------------------------------- ----------------------------------------------------
+`object-replicator.partition.delete.count.<device>` A count of partitions on <device> which were
+                                                    replicated to another node because they didn't
+                                                    belong on this node. This metric is tracked
+                                                    per-device to allow for "quiescence detection" for
+                                                    object replication activity on each device.
+`object-replicator.partition.delete.timing`         Timing data for partitions replicated to another
+                                                    node because they didn't belong on this node. This
+                                                    metric is not tracked per device.
+`object-replicator.partition.update.count.<device>` A count of partitions on <device> which were
+                                                    replicated to another node, but also belong on this
+                                                    node. As with delete.count, this metric is tracked
+                                                    per-device.
+`object-replicator.partition.update.timing`         Timing data for partitions replicated which also
+                                                    belong on this node. This metric is not tracked
+                                                    per-device.
+`object-replicator.suffix.hashes`                   Count of suffix directories whose hash (of
+                                                    filenames) was recalculated.
+`object-replicator.suffix.syncs`                    Count of suffix directories replicated with rsync.
+=================================================== ====================================================
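+
+Because the `<device>` metrics above produce one StatsD series per disk, you
+can watch per-disk replication activity directly in your monitoring system.
+For example, with hypothetical object devices named `sda1` and `sdb1` (and an
+empty `log_statsd_metric_prefix`), one would expect counter series like::
+
+    object-replicator.partition.delete.count.sda1
+    object-replicator.partition.delete.count.sdb1
+
+A series which stops incrementing indicates that replication "delete"
+activity on that device has gone quiet ("quiescence detection").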
+
+Metrics for `object-server`:
+
+================================ ====================================================
+Metric Name                      Description
+-------------------------------- ----------------------------------------------------
+`object-server.quarantines`      Count of objects (files) found bad and moved to
+                                 quarantine.
+`object-server.async_pendings`   Count of container updates saved as async_pendings
+                                 (may result from PUT or DELETE requests).
+`object-server.POST.errors`      Count of errors handling POST requests: bad request,
+                                 missing timestamp, delete-at in past, not mounted.
+`object-server.POST.timing`      Timing data for each POST request not resulting in
+                                 an error.
+`object-server.PUT.errors`       Count of errors handling PUT requests: bad request,
+                                 not mounted, missing timestamp, object creation
+                                 constraint violation, delete-at in past.
+`object-server.PUT.timeouts`     Count of object PUTs which exceeded max_upload_time.
+`object-server.PUT.timing`       Timing data for each PUT request not resulting in an
+                                 error.
+`object-server.GET.errors`       Count of errors handling GET requests: bad request,
+                                 not mounted, header timestamps before the epoch.
+                                 File errors resulting in a quarantine are not
+                                 counted here.
+`object-server.GET.timing`       Timing data for each GET request not resulting in an
+                                 error. Includes requests which couldn't find the
+                                 object (including disk errors resulting in file
+                                 quarantine).
+`object-server.HEAD.errors`      Count of errors handling HEAD requests: bad request,
+                                 not mounted.
+`object-server.HEAD.timing`      Timing data for each HEAD request not resulting in
+                                 an error. Includes requests which couldn't find the
+                                 object (including disk errors resulting in file
+                                 quarantine).
+`object-server.DELETE.errors`    Count of errors handling DELETE requests: bad
+                                 request, missing timestamp, not mounted. Includes
+                                 requests which couldn't find or match the object.
+`object-server.DELETE.timing`    Timing data for each DELETE request not resulting
+                                 in an error.
+`object-server.REPLICATE.errors` Count of errors handling REPLICATE requests: bad
+                                 request, not mounted.
+`object-server.REPLICATE.timing` Timing data for each REPLICATE request not resulting
+                                 in an error.
+================================ ====================================================
+
+Metrics for `object-updater`:
+
+============================ ====================================================
+Metric Name                  Description
+---------------------------- ----------------------------------------------------
+`object-updater.errors`      Count of drives not mounted or async_pending files
+                             with an unexpected name.
+`object-updater.timing`      Timing data for object sweeps to flush async_pending
+                             container updates. Does not include object sweeps
+                             which did not find an existing async_pending storage
+                             directory.
+`object-updater.quarantines` Count of async_pending container updates which were
+                             corrupted and moved to quarantine.
+`object-updater.successes`   Count of successful container updates.
+`object-updater.failures`    Count of failed container updates.
+============================ ====================================================
+
+Metrics for `proxy-server` (in the table, `<type>` may be `Account`,
+`Container`, or `Object`, and corresponds to the internal Controller object
+which handled the request):
+
+========================================= ====================================================
+Metric Name                               Description
+----------------------------------------- ----------------------------------------------------
+`proxy-server.errors`                     Count of errors encountered while serving requests
+                                          before the controller type is determined. Includes
+                                          invalid Content-Length, errors finding the internal
+                                          controller to handle the request, invalid utf8, and
+                                          bad URLs.
+`proxy-server.<type>.errors`              Count of errors encountered after the controller
+                                          type is known. The details of which responses are
+                                          errors depend on the controller type and request
+                                          type (GET, PUT, etc.). Failed
+                                          authentication/authorization and "Not Found"
+                                          responses are not counted as errors.
+`proxy-server.<type>.client_timeouts`     Count of client timeouts (client did not read from
+                                          queue within `client_timeout` seconds).
+`proxy-server.<type>.client_disconnects`  Count of detected client disconnects.
+`proxy-server.<type>.method_not_allowed`  Count of MethodNotAllowed responses sent by the
+                                          proxy server for this controller type.
+`proxy-server.<type>.auth_short_circuits` Count of requests which short-circuited with an
+                                          authentication/authorization failure.
+`proxy-server.<type>.GET.timing`          Timing data for GET requests (excluding requests
+                                          with errors or failed authentication/authorization).
+`proxy-server.<type>.HEAD.timing`         Timing data for HEAD requests (excluding requests
+                                          with errors or failed authentication/authorization).
+`proxy-server.<type>.POST.timing`         Timing data for POST requests (excluding requests
+                                          with errors or failed authentication/authorization).
+                                          Requests with a client disconnect ARE included in
+                                          the timing data.
+`proxy-server.<type>.PUT.timing`          Timing data for PUT requests (excluding requests
+                                          with errors or failed authentication/authorization).
+                                          Account PUT requests which return MethodNotAllowed
+                                          because allow_account_management is disabled ARE
+                                          included.
+`proxy-server.<type>.DELETE.timing`       Timing data for DELETE requests (excluding requests
+                                          with errors or failed authentication/authorization).
+                                          Account DELETE requests which return
+                                          MethodNotAllowed because allow_account_management is
+                                          disabled ARE included.
+`proxy-server.Object.COPY.timing`         Timing data for object COPY requests (excluding
+                                          requests with errors or failed
+                                          authentication/authorization).
+========================================= ====================================================
+
+Metrics for `tempauth` (in the table, `<reseller_prefix>` represents the
+actual configured reseller_prefix or "`NONE`" if the reseller_prefix is the
+empty string):
+
+========================================= ====================================================
+Metric Name                               Description
+----------------------------------------- ----------------------------------------------------
+`tempauth.<reseller_prefix>.unauthorized` Count of regular requests which were denied with
+                                          HTTPUnauthorized.
+`tempauth.<reseller_prefix>.forbidden`    Count of regular requests which were denied with
+                                          HTTPForbidden.
+`tempauth.<reseller_prefix>.token_denied` Count of token requests which were denied.
+`tempauth.<reseller_prefix>.errors`       Count of errors.
+========================================= ====================================================
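+
+A quick way to verify that a node is actually emitting metrics, without
+standing up a full StatsD/Graphite stack, is to point `log_statsd_host` at a
+test machine and dump the raw UDP datagrams there. A minimal listener sketch
+(illustrative only; the port is assumed to match `log_statsd_port`)::
+
+    import socket
+
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(('0.0.0.0', 8125))  # assumes the default log_statsd_port
+    while True:
+        data, addr = sock.recvfrom(4096)
+        # Each datagram is one sample, e.g. "proxy-server.errors:1|c"
+        print '%s %s' % (addr[0], data)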
+
+
 ------------------------
 Debugging Tips and Tools
 ------------------------
diff --git a/doc/source/conf.py b/doc/source/conf.py
index 3a8fc7c5df..8f99eed00b 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -18,7 +18,8 @@
 # Swift documentation build configuration file, created by
 # sphinx-quickstart on Tue May 18 13:50:15 2010.
 #
-# This file is execfile()d with the current directory set to its containing dir.
+# This file is execfile()d with the current directory set to its containing
+# dir.
 #
 # Note that not all possible configuration values are present in this
 # autogenerated file.
@@ -26,28 +27,33 @@
 # All configuration values have a default; values that are commented out
 # serve to show the default.
 
-import sys, os
+import sys
+import os
 
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
 # documentation root, use os.path.abspath to make it absolute, like shown here.
-sys.path.append([os.path.abspath('../swift'), os.path.abspath('..'), os.path.abspath('../bin')])
+sys.path.append([os.path.abspath('../swift'), os.path.abspath('..'),
+                 os.path.abspath('../bin')])
 
-# -- General configuration -----------------------------------------------------
+# -- General configuration ----------------------------------------------------
 
-# Add any Sphinx extension module names here, as strings. They can be extensions
-# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
-extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.pngmath', 'sphinx.ext.ifconfig']
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
+extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx',
+              'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.pngmath',
+              'sphinx.ext.ifconfig']
 todo_include_todos = True
 
 # Add any paths that contain templates here, relative to this directory.
-# Changing the path so that the Hudson build output contains GA code and the source
-# docs do not contain the code so local, offline sphinx builds are "clean."
+# Changing the path so that the Hudson build output contains GA code and the
+# source docs do not contain the code so local, offline sphinx builds are
+# "clean."
 templates_path = []
 if os.getenv('HUDSON_PUBLISH_DOCS'):
-    templates_path = ['_ga', '_templates']
+    templates_path = ['_ga', '_templates']
 else:
-    templates_path = ['_templates']
+    templates_path = ['_templates']
 
 # The suffix of source filenames.
 source_suffix = '.rst'
@@ -89,7 +95,8 @@ release = __version__
 # for source files.
 exclude_trees = []
 
-# The reST default role (used for this markup: `text`) to use for all documents.
+# The reST default role (used for this markup: `text`) to use for all
+# documents.
 #default_role = None
 
 # If true, '()' will be appended to :func: etc. cross-reference text.
@@ -110,7 +117,7 @@ pygments_style = 'sphinx'
 
 modindex_common_prefix = ['swift.']
 
-# -- Options for HTML output ---------------------------------------------------
+# -- Options for HTML output -----------------------------------------------
 
 # The theme to use for HTML and HTML Help pages. Major themes that come with
 # Sphinx are currently 'default' and 'sphinxdoc'.
@@ -188,7 +195,7 @@ html_last_updated_fmt = os.popen(git_cmd).read() htmlhelp_basename = 'swiftdoc' -# -- Options for LaTeX output -------------------------------------------------- +# -- Options for LaTeX output ------------------------------------------------- # The paper size ('letter' or 'a4'). #latex_paper_size = 'letter' @@ -197,7 +204,8 @@ htmlhelp_basename = 'swiftdoc' #latex_font_size = '10pt' # Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, author, documentclass [howto/manual]). +# (source start file, target name, title, author, documentclass +# [howto/manual]). latex_documents = [ ('index', 'Swift.tex', u'Swift Documentation', u'Swift Team', 'manual'), @@ -224,4 +232,3 @@ latex_documents = [ intersphinx_mapping = {'python': ('http://docs.python.org/', None), 'nova': ('http://nova.openstack.org', None), 'glance': ('http://glance.openstack.org', None)} - diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample index 2e773220ab..05599b637e 100644 --- a/etc/account-server.conf-sample +++ b/etc/account-server.conf-sample @@ -11,6 +11,12 @@ # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO +# You can enable default statsD logging here and/or override it in sections +# below: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1 +# log_statsd_metric_prefix = # Normally Swift will try to preallocate disk space for new SQLite databases to # decrease fragmentation (at the cost of disk usage). You may turn this feature # off here. diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 452950f7d6..f84071babf 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -14,6 +14,12 @@ # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO +# You can enable default statsD logging here and/or override it in sections +# below: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1 +# log_statsd_metric_prefix = # Normally Swift will try to preallocate disk space for new SQLite databases to # decrease fragmentation (at the cost of disk usage). You may turn this feature # off here. 
diff --git a/etc/object-expirer.conf-sample b/etc/object-expirer.conf-sample index 17be2d41ef..5fe9793121 100644 --- a/etc/object-expirer.conf-sample +++ b/etc/object-expirer.conf-sample @@ -5,6 +5,11 @@ # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO +# You can enable default statsD logging here if you want: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1 +# log_statsd_metric_prefix = [object-expirer] # interval = 300 diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 95afadb2f0..8f6ed5d013 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -12,6 +12,12 @@ # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO +# You can enable default statsD logging here and/or override it in sections +# below: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1 +# log_statsd_metric_prefix = [pipeline:main] pipeline = recon object-server diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 366462148f..1e43e41f80 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -13,6 +13,12 @@ # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO +# You can enable default statsD logging here and/or override it in sections +# below: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1 +# log_statsd_metric_prefix = [pipeline:main] pipeline = catch_errors healthcheck cache tempauth proxy-server diff --git a/swift/account/auditor.py b/swift/account/auditor.py index 82052f1f94..108085f471 100644 --- a/swift/account/auditor.py +++ b/swift/account/auditor.py @@ -41,29 +41,34 @@ class AccountAuditor(Daemon): swift.common.db.DB_PREALLOCATION = \ conf.get('db_preallocation', 't').lower() in TRUE_VALUES + def _one_audit_pass(self, reported): + all_locs = audit_location_generator(self.devices, + account_server.DATADIR, mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + self.account_audit(path) + if time.time() - reported >= 3600: # once an hour + self.logger.info(_('Since %(time)s: Account audits: ' + '%(passed)s passed audit, %(failed)s failed audit'), + {'time': time.ctime(reported), + 'passed': self.account_passes, + 'failed': self.account_failures}) + reported = time.time() + self.account_passes = 0 + self.account_failures = 0 + return reported + def run_forever(self, *args, **kwargs): """Run the account audit until stopped.""" reported = time.time() time.sleep(random() * self.interval) while True: - self.logger.info(_('Begin account audit pass')) + self.logger.info(_('Begin account audit pass.')) begin = time.time() try: - all_locs = audit_location_generator(self.devices, - account_server.DATADIR, mount_check=self.mount_check, - logger=self.logger) - for path, device, partition in all_locs: - self.account_audit(path) - if time.time() - reported >= 3600: # once an hour - self.logger.info(_('Since %(time)s: Account audits: ' - '%(passed)s passed audit, %(failed)s failed audit'), - {'time': time.ctime(reported), - 'passed': self.account_passes, - 'failed': self.account_failures}) - reported = time.time() - self.account_passes = 0 - self.account_failures = 0 + reported = self._one_audit_pass(reported) except (Exception, Timeout): + self.logger.increment('errors') self.logger.exception(_('ERROR auditing')) elapsed = time.time() - begin if elapsed < self.interval: @@ -75,21 +80,7 @@ class 
AccountAuditor(Daemon): """Run the account audit once.""" self.logger.info(_('Begin account audit "once" mode')) begin = reported = time.time() - all_locs = audit_location_generator(self.devices, - account_server.DATADIR, - mount_check=self.mount_check, - logger=self.logger) - for path, device, partition in all_locs: - self.account_audit(path) - if time.time() - reported >= 3600: # once an hour - self.logger.info(_('Since %(time)s: Account audits: ' - '%(passed)s passed audit, %(failed)s failed audit'), - {'time': time.ctime(reported), - 'passed': self.account_passes, - 'failed': self.account_failures}) - reported = time.time() - self.account_passes = 0 - self.account_failures = 0 + self._one_audit_pass(reported) elapsed = time.time() - begin self.logger.info( _('Account audit "once" mode completed: %.02fs'), elapsed) @@ -100,15 +91,19 @@ class AccountAuditor(Daemon): :param path: the path to an account db """ + start_time = time.time() try: if not path.endswith('.db'): return broker = AccountBroker(path) if not broker.is_deleted(): info = broker.get_info() + self.logger.increment('passes') self.account_passes += 1 self.logger.debug(_('Audit passed for %s') % broker.db_file) except (Exception, Timeout): + self.logger.increment('failures') self.account_failures += 1 self.logger.exception(_('ERROR Could not get account info %s'), (broker.db_file)) + self.logger.timing_since('timing', start_time) diff --git a/swift/account/reaper.py b/swift/account/reaper.py index f5096581ad..28017e8042 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -119,6 +119,7 @@ class AccountReaper(Daemon): for device in os.listdir(self.devices): if self.mount_check and \ not os.path.ismount(os.path.join(self.devices, device)): + self.logger.increment('errors') self.logger.debug( _('Skipping %s as it is not mounted'), device) continue @@ -162,6 +163,7 @@ class AccountReaper(Daemon): if fname.endswith('.ts'): break elif fname.endswith('.db'): + self.start_time = time() broker = \ AccountBroker(os.path.join(hsh_path, fname)) if broker.is_status_deleted() and \ @@ -262,6 +264,7 @@ class AccountReaper(Daemon): log = log[:-2] log += _(', elapsed: %.02fs') % (time() - begin) self.logger.info(log) + self.logger.timing_since('timing', self.start_time) return True def reap_container(self, account, account_partition, account_nodes, @@ -313,12 +316,15 @@ class AccountReaper(Daemon): response_timeout=self.node_timeout)[1] self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 + self.logger.increment('return_codes.2') except ClientException, err: if self.logger.getEffectiveLevel() <= DEBUG: self.logger.exception( _('Exception with %(ip)s:%(port)s/%(device)s'), node) self.stats_return_codes[err.http_status / 100] = \ self.stats_return_codes.get(err.http_status / 100, 0) + 1 + self.logger.increment( + 'return_codes.%d' % (err.http_status / 100,)) if not objects: break try: @@ -348,19 +354,26 @@ class AccountReaper(Daemon): successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 + self.logger.increment('return_codes.2') except ClientException, err: if self.logger.getEffectiveLevel() <= DEBUG: self.logger.exception( _('Exception with %(ip)s:%(port)s/%(device)s'), node) failures += 1 + self.logger.increment('containers_failures') self.stats_return_codes[err.http_status / 100] = \ self.stats_return_codes.get(err.http_status / 100, 0) + 1 + self.logger.increment( + 'return_codes.%d' % (err.http_status / 100,)) if successes > failures: self.stats_containers_deleted += 1 + 
self.logger.increment('containers_deleted') elif not successes: self.stats_containers_remaining += 1 + self.logger.increment('containers_remaining') else: self.stats_containers_possibly_remaining += 1 + self.logger.increment('containers_possibly_remaining') def reap_object(self, account, container, container_partition, container_nodes, obj): @@ -399,16 +412,23 @@ class AccountReaper(Daemon): successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 + self.logger.increment('return_codes.2') except ClientException, err: if self.logger.getEffectiveLevel() <= DEBUG: self.logger.exception( _('Exception with %(ip)s:%(port)s/%(device)s'), node) failures += 1 + self.logger.increment('objects_failures') self.stats_return_codes[err.http_status / 100] = \ self.stats_return_codes.get(err.http_status / 100, 0) + 1 + self.logger.increment( + 'return_codes.%d' % (err.http_status / 100,)) if successes > failures: self.stats_objects_deleted += 1 + self.logger.increment('objects_deleted') elif not successes: self.stats_objects_remaining += 1 + self.logger.increment('objects_remaining') else: self.stats_objects_possibly_remaining += 1 + self.logger.increment('objects_possibly_remaining') diff --git a/swift/account/server.py b/swift/account/server.py index eaa62e1cf9..a1497f9776 100644 --- a/swift/account/server.py +++ b/swift/account/server.py @@ -65,32 +65,41 @@ class AccountController(object): def DELETE(self, req): """Handle HTTP DELETE request.""" + start_time = time.time() try: drive, part, account = split_path(unquote(req.path), 3) except ValueError, err: + self.logger.increment('DELETE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('DELETE.errors') return HTTPInsufficientStorage(drive=drive, request=req) if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): + self.logger.increment('DELETE.errors') return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain') broker = self._get_account_broker(drive, part, account) if broker.is_deleted(): + self.logger.timing_since('DELETE.timing', start_time) return HTTPNotFound(request=req) broker.delete_db(req.headers['x-timestamp']) + self.logger.timing_since('DELETE.timing', start_time) return HTTPNoContent(request=req) def PUT(self, req): """Handle HTTP PUT request.""" + start_time = time.time() try: drive, part, account, container = split_path(unquote(req.path), 3, 4) except ValueError, err: + self.logger.increment('PUT.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('PUT.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_account_broker(drive, part, account) if container: # put account container @@ -102,11 +111,13 @@ class AccountController(object): req.headers.get('x-timestamp') or time.time())) if req.headers.get('x-account-override-deleted', 'no').lower() != \ 'yes' and broker.is_deleted(): + self.logger.timing_since('PUT.timing', start_time) return HTTPNotFound(request=req) broker.put_container(container, req.headers['x-put-timestamp'], req.headers['x-delete-timestamp'], req.headers['x-object-count'], req.headers['x-bytes-used']) + self.logger.timing_since('PUT.timing', start_time) if req.headers['x-delete-timestamp'] > \ req.headers['x-put-timestamp']: return HTTPNoContent(request=req) @@ -118,11 +129,13 
@@ class AccountController(object): broker.initialize(timestamp) created = True elif broker.is_status_deleted(): + self.logger.timing_since('PUT.timing', start_time) return HTTPForbidden(request=req, body='Recently deleted') else: created = broker.is_deleted() broker.update_put_timestamp(timestamp) if broker.is_deleted(): + self.logger.increment('PUT.errors') return HTTPConflict(request=req) metadata = {} metadata.update((key, (value, timestamp)) @@ -130,6 +143,7 @@ class AccountController(object): if key.lower().startswith('x-account-meta-')) if metadata: broker.update_metadata(metadata) + self.logger.timing_since('PUT.timing', start_time) if created: return HTTPCreated(request=req) else: @@ -143,19 +157,23 @@ class AccountController(object): # container servers directly so this is no longer needed. We should # refactor out the container existence check here and retest # everything. + start_time = time.time() try: drive, part, account, container = split_path(unquote(req.path), 3, 4) except ValueError, err: + self.logger.increment('HEAD.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('HEAD.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_account_broker(drive, part, account) if not container: broker.pending_timeout = 0.1 broker.stale_reads_ok = True if broker.is_deleted(): + self.logger.timing_since('HEAD.timing', start_time) return HTTPNotFound(request=req) info = broker.get_info() headers = { @@ -171,21 +189,26 @@ class AccountController(object): headers.update((key, value) for key, (value, timestamp) in broker.metadata.iteritems() if value != '') + self.logger.timing_since('HEAD.timing', start_time) return HTTPNoContent(request=req, headers=headers) def GET(self, req): """Handle HTTP GET request.""" + start_time = time.time() try: drive, part, account = split_path(unquote(req.path), 3) except ValueError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('GET.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_account_broker(drive, part, account) broker.pending_timeout = 0.1 broker.stale_reads_ok = True if broker.is_deleted(): + self.logger.timing_since('GET.timing', start_time) return HTTPNotFound(request=req) info = broker.get_info() resp_headers = { @@ -202,18 +225,21 @@ class AccountController(object): delimiter = get_param(req, 'delimiter') if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254): # delimiters can be made more flexible later + self.logger.increment('GET.errors') return HTTPPreconditionFailed(body='Bad delimiter') limit = ACCOUNT_LISTING_LIMIT given_limit = get_param(req, 'limit') if given_limit and given_limit.isdigit(): limit = int(given_limit) if limit > ACCOUNT_LISTING_LIMIT: + self.logger.increment('GET.errors') return HTTPPreconditionFailed(request=req, body='Maximum limit is %d' % ACCOUNT_LISTING_LIMIT) marker = get_param(req, 'marker', '') end_marker = get_param(req, 'end_marker') query_format = get_param(req, 'format') except UnicodeDecodeError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body='parameters not utf8', content_type='text/plain', request=req) if query_format: @@ -224,6 +250,7 @@ class AccountController(object): 'application/xml', 'text/xml'], default_match='text/plain') except 
AssertionError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body='bad accept header: %s' % req.accept, content_type='text/plain', request=req) account_list = broker.list_containers_iter(limit, marker, end_marker, @@ -256,11 +283,13 @@ class AccountController(object): account_list = '\n'.join(output_list) else: if not account_list: + self.logger.timing_since('GET.timing', start_time) return HTTPNoContent(request=req, headers=resp_headers) account_list = '\n'.join(r[0] for r in account_list) + '\n' ret = Response(body=account_list, request=req, headers=resp_headers) ret.content_type = out_content_type ret.charset = 'utf-8' + self.logger.timing_since('GET.timing', start_time) return ret def REPLICATE(self, req): @@ -268,37 +297,47 @@ class AccountController(object): Handle HTTP REPLICATE request. Handler for RPC calls for account replication. """ + start_time = time.time() try: post_args = split_path(unquote(req.path), 3) except ValueError, err: + self.logger.increment('REPLICATE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) drive, partition, hash = post_args if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('REPLICATE.errors') return HTTPInsufficientStorage(drive=drive, request=req) try: args = simplejson.load(req.environ['wsgi.input']) except ValueError, err: + self.logger.increment('REPLICATE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain') ret = self.replicator_rpc.dispatch(post_args, args) ret.request = req + self.logger.timing_since('REPLICATE.timing', start_time) return ret def POST(self, req): """Handle HTTP POST request.""" + start_time = time.time() try: drive, part, account = split_path(unquote(req.path), 3) except ValueError, err: + self.logger.increment('POST.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): + self.logger.increment('POST.errors') return HTTPBadRequest(body='Missing or bad timestamp', request=req, content_type='text/plain') if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('POST.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_account_broker(drive, part, account) if broker.is_deleted(): + self.logger.timing_since('POST.timing', start_time) return HTTPNotFound(request=req) timestamp = normalize_timestamp(req.headers['x-timestamp']) metadata = {} @@ -307,6 +346,7 @@ class AccountController(object): if key.lower().startswith('x-account-meta-')) if metadata: broker.update_metadata(metadata) + self.logger.timing_since('POST.timing', start_time) return HTTPNoContent(request=req) def __call__(self, env, start_response): diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 62d1625872..8f7dd5e8aa 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -218,6 +218,7 @@ class Replicator(Daemon): :returns: boolean indicating completion and success """ self.stats['diff'] += 1 + self.logger.increment('diffs') self.logger.debug(_('Syncing chunks with %s'), http.host) sync_table = broker.get_syncs() objects = broker.get_items_since(point, self.per_diff) @@ -239,6 +240,7 @@ class Replicator(Daemon): '%s rows behind; moving on and will try again next pass.') % (broker.db_file, self.max_diffs * self.per_diff)) self.stats['diff_capped'] += 1 + self.logger.increment('diff_caps') else: with Timeout(self.node_timeout): response = 
http.replicate('merge_syncs', sync_table) @@ -262,9 +264,11 @@ class Replicator(Daemon): """ if max(rinfo['point'], local_sync) >= info['max_row']: self.stats['no_change'] += 1 + self.logger.increment('no_changes') return True if rinfo['hash'] == info['hash']: self.stats['hashmatch'] += 1 + self.logger.increment('hashmatches') broker.merge_syncs([{'remote_id': rinfo['id'], 'sync_point': rinfo['point']}], incoming=False) return True @@ -309,6 +313,7 @@ class Replicator(Daemon): return False elif response.status == HTTPNotFound.code: # completely missing, rsync self.stats['rsync'] += 1 + self.logger.increment('rsyncs') return self._rsync_db(broker, node, http, info['id']) elif response.status == HTTPInsufficientStorage.code: raise DriveNotMounted() @@ -321,6 +326,7 @@ class Replicator(Daemon): # more than 50%, rsync then do a remote merge. if rinfo['max_row'] / float(info['max_row']) < 0.5: self.stats['remote_merge'] += 1 + self.logger.increment('remote_merges') return self._rsync_db(broker, node, http, info['id'], replicate_method='rsync_then_merge', replicate_timeout=(info['count'] / 2000)) @@ -337,8 +343,10 @@ class Replicator(Daemon): :param object_file: DB file name to be replicated :param node_id: node id of the node to be replicated to """ + start_time = time.time() self.logger.debug(_('Replicating db %s'), object_file) self.stats['attempted'] += 1 + self.logger.increment('attempts') try: broker = self.brokerclass(object_file, pending_timeout=30) broker.reclaim(time.time() - self.reclaim_age, @@ -351,6 +359,7 @@ class Replicator(Daemon): else: self.logger.exception(_('ERROR reading db %s'), object_file) self.stats['failure'] += 1 + self.logger.increment('failures') return # The db is considered deleted if the delete_timestamp value is greater # than the put_timestamp, and there are no objects. 
@@ -370,6 +379,8 @@ class Replicator(Daemon): with lock_parent_directory(object_file): shutil.rmtree(os.path.dirname(object_file), True) self.stats['remove'] += 1 + self.logger.increment('removes') + self.logger.timing_since('timing', start_time) return responses = [] nodes = self.ring.get_part_nodes(int(partition)) @@ -391,6 +402,7 @@ class Replicator(Daemon): self.logger.exception(_('ERROR syncing %(file)s with node' ' %(node)s'), {'file': object_file, 'node': node}) self.stats['success' if success else 'failure'] += 1 + self.logger.increment('successes' if success else 'failures') responses.append(success) if not shouldbehere and all(responses): # If the db shouldn't be on this node and has been successfully @@ -398,6 +410,8 @@ class Replicator(Daemon): with lock_parent_directory(object_file): shutil.rmtree(os.path.dirname(object_file), True) self.stats['remove'] += 1 + self.logger.increment('removes') + self.logger.timing_since('timing', start_time) def roundrobin_datadirs(self, datadirs): """ diff --git a/swift/common/middleware/tempauth.py b/swift/common/middleware/tempauth.py index 417da9360a..a669ca20e7 100644 --- a/swift/common/middleware/tempauth.py +++ b/swift/common/middleware/tempauth.py @@ -69,6 +69,8 @@ class TempAuth(object): self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip() if self.reseller_prefix and self.reseller_prefix[-1] != '_': self.reseller_prefix += '_' + self.logger.set_statsd_prefix('tempauth.%s' % ( + self.reseller_prefix if self.reseller_prefix else 'NONE',)) self.auth_prefix = conf.get('auth_prefix', '/auth/') if not self.auth_prefix: self.auth_prefix = '/auth/' @@ -145,6 +147,7 @@ class TempAuth(object): if self.reseller_prefix: # Because I know I'm the definitive auth for this token, I # can deny it outright. + self.logger.increment('unauthorized') return HTTPUnauthorized()(env, start_response) # Because I'm not certain if I'm the definitive auth for empty # reseller_prefixed tokens, I won't overwrite swift.authorize. @@ -160,6 +163,7 @@ class TempAuth(object): 1, 2, True) except ValueError: version, rest = None, None + self.logger.increment('errors') if rest and rest.startswith(self.reseller_prefix): # Handle anonymous access to accounts I'm the definitive # auth for. @@ -230,6 +234,7 @@ class TempAuth(object): try: version, account, container, obj = split_path(req.path, 1, 4, True) except ValueError: + self.logger.increment('errors') return HTTPNotFound(request=req) if not account or not account.startswith(self.reseller_prefix): return self.denied_response(req) @@ -270,8 +275,10 @@ class TempAuth(object): depending on whether the REMOTE_USER is set or not. 
""" if req.remote_user: + self.logger.increment('forbidden') return HTTPForbidden(request=req) else: + self.logger.increment('unauthorized') return HTTPUnauthorized(request=req) def handle(self, env, start_response): @@ -306,6 +313,7 @@ class TempAuth(object): return response except (Exception, Timeout): print "EXCEPTION IN handle: %s: %s" % (format_exc(), env) + self.logger.increment('errors') start_response('500 Server Error', [('Content-Type', 'text/plain')]) return ['Internal server error.\n'] @@ -323,11 +331,13 @@ class TempAuth(object): version, account, user, _junk = split_path(req.path_info, minsegs=1, maxsegs=4, rest_with_last=True) except ValueError: + self.logger.increment('errors') return HTTPNotFound(request=req) if version in ('v1', 'v1.0', 'auth'): if req.method == 'GET': handler = self.handle_get_token if not handler: + self.logger.increment('errors') req.response = HTTPBadRequest(request=req) else: req.response = handler(req) @@ -362,6 +372,7 @@ class TempAuth(object): pathsegs = split_path(req.path_info, minsegs=1, maxsegs=3, rest_with_last=True) except ValueError: + self.logger.increment('errors') return HTTPNotFound(request=req) if pathsegs[0] == 'v1' and pathsegs[2] == 'auth': account = pathsegs[1] @@ -369,9 +380,11 @@ class TempAuth(object): if not user: user = req.headers.get('x-auth-user') if not user or ':' not in user: + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) account2, user = user.split(':', 1) if account != account2: + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) key = req.headers.get('x-storage-pass') if not key: @@ -381,6 +394,7 @@ class TempAuth(object): if not user: user = req.headers.get('x-storage-user') if not user or ':' not in user: + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) account, user = user.split(':', 1) key = req.headers.get('x-auth-key') @@ -389,12 +403,15 @@ class TempAuth(object): else: return HTTPBadRequest(request=req) if not all((account, user, key)): + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) # Authenticate user account_user = account + ':' + user if account_user not in self.users: + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) if self.users[account_user]['key'] != key: + self.logger.increment('token_denied') return HTTPUnauthorized(request=req) # Get memcache client memcache_client = cache_from_env(req.environ) diff --git a/swift/common/utils.py b/swift/common/utils.py index 47edce8c1c..615d1d863c 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -21,10 +21,11 @@ import os import pwd import sys import time +import functools from hashlib import md5 from random import shuffle from urllib import quote -from contextlib import contextmanager +from contextlib import contextmanager, closing import ctypes import ctypes.util from ConfigParser import ConfigParser, NoSectionError, NoOptionError, \ @@ -292,6 +293,54 @@ class LoggerFileObject(object): return self +class StatsdClient(object): + def __init__(self, host, port, base_prefix='', tail_prefix='', + default_sample_rate=1): + self._host = host + self._port = port + self._base_prefix = base_prefix + self.set_prefix(tail_prefix) + self._default_sample_rate = default_sample_rate + self._target = (self._host, self._port) + + def set_prefix(self, new_prefix): + if new_prefix and self._base_prefix: + self._prefix = '.'.join([self._base_prefix, new_prefix, '']) + elif new_prefix: + self._prefix = new_prefix + '.' 
+ elif self._base_prefix: + self._prefix = self._base_prefix + '.' + else: + self._prefix = '' + + def _send(self, m_name, m_value, m_type, sample_rate): + if sample_rate is None: + sample_rate = self._default_sample_rate + parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type] + if sample_rate < 1: + parts.append('@%s' % (sample_rate,)) + # Ideally, we'd cache a sending socket in self, but that + # results in a socket getting shared by multiple green threads. + with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: + return sock.sendto('|'.join(parts), self._target) + + def update_stats(self, m_name, m_value, sample_rate=None): + return self._send(m_name, m_value, 'c', sample_rate) + + def increment(self, metric, sample_rate=None): + return self.update_stats(metric, 1, sample_rate) + + def decrement(self, metric, sample_rate=None): + return self.update_stats(metric, -1, sample_rate) + + def timing(self, metric, timing_ms, sample_rate=None): + return self._send(metric, timing_ms, 'ms', sample_rate) + + def timing_since(self, metric, orig_time, sample_rate=None): + return self.timing(metric, (time.time() - orig_time) * 1000, + sample_rate) + + # double inheritance to support property with setter class LogAdapter(logging.LoggerAdapter, object): """ @@ -377,6 +426,44 @@ class LogAdapter(logging.LoggerAdapter, object): call = self._exception call('%s: %s' % (msg, emsg), *args, **kwargs) + def set_statsd_prefix(self, prefix): + """ + The StatsD client prefix defaults to the "name" of the logger. This + method may override that default with a specific value. Currently used + in the proxy-server to differentiate the Account, Container, and Object + controllers. + """ + if self.logger.statsd_client: + self.logger.statsd_client.set_prefix(prefix) + + def statsd_delegate(statsd_func_name): + """ + Factory which creates methods which delegate to methods on + self.logger.statsd_client (an instance of StatsdClient). The + created methods conditionally delegate to a method whose name is given + in 'statsd_func_name'. The created delegate methods are a no-op when + StatsD logging is not configured. The created delegate methods also + handle the defaulting of sample_rate (to either the default specified + in the config with 'log_statsd_default_sample_rate' or the value passed + into delegate function). + + :param statsd_func_name: the name of a method on StatsdClient. 
+ """ + + func = getattr(StatsdClient, statsd_func_name) + + @functools.wraps(func) + def wrapped(self, *a, **kw): + if getattr(self.logger, 'statsd_client'): + return func(self.logger.statsd_client, *a, **kw) + return wrapped + + update_stats = statsd_delegate('update_stats') + increment = statsd_delegate('increment') + decrement = statsd_delegate('decrement') + timing = statsd_delegate('timing') + timing_since = statsd_delegate('timing_since') + class SwiftLogFormatter(logging.Formatter): """ @@ -405,6 +492,9 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None, log_facility = LOG_LOCAL0 log_level = INFO log_name = swift + log_statsd_host = (disabled) + log_statsd_port = 8125 + log_statsd_default_sample_rate = 1 :param conf: Configuration dict to read settings from :param name: Name of the logger @@ -454,6 +544,20 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None, # set the level for the logger logger.setLevel( getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO)) + + # Setup logger with a StatsD client if so configured + statsd_host = conf.get('log_statsd_host') + if statsd_host: + statsd_port = int(conf.get('log_statsd_port', 8125)) + base_prefix = conf.get('log_statsd_metric_prefix', '') + default_sample_rate = float(conf.get( + 'log_statsd_default_sample_rate', 1)) + statsd_client = StatsdClient(statsd_host, statsd_port, base_prefix, + name, default_sample_rate) + logger.statsd_client = statsd_client + else: + logger.statsd_client = None + adapted_logger = LogAdapter(logger, name) return adapted_logger diff --git a/swift/container/auditor.py b/swift/container/auditor.py index ff0cad4fe6..a69ebb3c3d 100644 --- a/swift/container/auditor.py +++ b/swift/container/auditor.py @@ -42,41 +42,7 @@ class ContainerAuditor(Daemon): swift.common.db.DB_PREALLOCATION = \ conf.get('db_preallocation', 't').lower() in TRUE_VALUES - def run_forever(self, *args, **kwargs): - """Run the container audit until stopped.""" - reported = time.time() - time.sleep(random() * self.interval) - while True: - self.logger.info(_('Begin container audit pass.')) - begin = time.time() - try: - all_locs = audit_location_generator(self.devices, - container_server.DATADIR, mount_check=self.mount_check, - logger=self.logger) - for path, device, partition in all_locs: - self.container_audit(path) - if time.time() - reported >= 3600: # once an hour - self.logger.info( - _('Since %(time)s: Container audits: %(pass)s ' - 'passed audit, %(fail)s failed audit'), - {'time': time.ctime(reported), - 'pass': self.container_passes, - 'fail': self.container_failures}) - reported = time.time() - self.container_passes = 0 - self.container_failures = 0 - except (Exception, Timeout): - self.logger.exception(_('ERROR auditing')) - elapsed = time.time() - begin - if elapsed < self.interval: - time.sleep(self.interval - elapsed) - self.logger.info( - _('Container audit pass completed: %.02fs'), elapsed) - - def run_once(self, *args, **kwargs): - """Run the container audit once.""" - self.logger.info(_('Begin container audit "once" mode')) - begin = reported = time.time() + def _one_audit_pass(self, reported): all_locs = audit_location_generator(self.devices, container_server.DATADIR, mount_check=self.mount_check, @@ -93,6 +59,31 @@ class ContainerAuditor(Daemon): reported = time.time() self.container_passes = 0 self.container_failures = 0 + return reported + + def run_forever(self, *args, **kwargs): + """Run the container audit until stopped.""" + reported = time.time() + 
time.sleep(random() * self.interval) + while True: + self.logger.info(_('Begin container audit pass.')) + begin = time.time() + try: + reported = self._one_audit_pass(reported) + except (Exception, Timeout): + self.logger.increment('errors') + self.logger.exception(_('ERROR auditing')) + elapsed = time.time() - begin + if elapsed < self.interval: + time.sleep(self.interval - elapsed) + self.logger.info( + _('Container audit pass completed: %.02fs'), elapsed) + + def run_once(self, *args, **kwargs): + """Run the container audit once.""" + self.logger.info(_('Begin container audit "once" mode')) + begin = reported = time.time() + self._one_audit_pass(reported) elapsed = time.time() - begin self.logger.info( _('Container audit "once" mode completed: %.02fs'), elapsed) @@ -103,15 +94,19 @@ class ContainerAuditor(Daemon): :param path: the path to a container db """ + start_time = time.time() try: if not path.endswith('.db'): return broker = ContainerBroker(path) if not broker.is_deleted(): info = broker.get_info() + self.logger.increment('passes') self.container_passes += 1 self.logger.debug(_('Audit passed for %s'), broker.db_file) except (Exception, Timeout): + self.logger.increment('failures') self.container_failures += 1 self.logger.exception(_('ERROR Could not get container info %s'), (broker.db_file)) + self.logger.timing_since('timing', start_time) diff --git a/swift/container/server.py b/swift/container/server.py index ad355d6c4c..8dcf12b97e 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -140,17 +140,21 @@ class ContainerController(object): def DELETE(self, req): """Handle HTTP DELETE request.""" + start_time = time.time() try: drive, part, account, container, obj = split_path( unquote(req.path), 4, 5, True) except ValueError, err: + self.logger.increment('DELETE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): + self.logger.increment('DELETE.errors') return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain') if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('DELETE.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_container_broker(drive, part, account, container) if account.startswith(self.auto_create_account_prefix) and obj and \ @@ -158,20 +162,25 @@ class ContainerController(object): broker.initialize(normalize_timestamp( req.headers.get('x-timestamp') or time.time())) if not os.path.exists(broker.db_file): + self.logger.timing_since('DELETE.timing', start_time) return HTTPNotFound() if obj: # delete object broker.delete_object(obj, req.headers.get('x-timestamp')) + self.logger.timing_since('DELETE.timing', start_time) return HTTPNoContent(request=req) else: # delete container if not broker.empty(): + self.logger.increment('DELETE.errors') return HTTPConflict(request=req) existed = float(broker.get_info()['put_timestamp']) and \ not broker.is_deleted() broker.delete_db(req.headers['X-Timestamp']) if not broker.is_deleted(): + self.logger.increment('DELETE.errors') return HTTPConflict(request=req) resp = self.account_update(req, account, container, broker) + self.logger.timing_since('DELETE.timing', start_time) if resp: return resp if existed: @@ -180,22 +189,27 @@ class ContainerController(object): def PUT(self, req): """Handle HTTP PUT request.""" + start_time = time.time() try: drive, part, account, container, obj = split_path( 
unquote(req.path), 4, 5, True) except ValueError, err: + self.logger.increment('PUT.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): + self.logger.increment('PUT.errors') return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain') if 'x-container-sync-to' in req.headers: err = validate_sync_to(req.headers['x-container-sync-to'], self.allowed_sync_hosts) if err: + self.logger.increment('PUT.errors') return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('PUT.errors') return HTTPInsufficientStorage(drive=drive, request=req) timestamp = normalize_timestamp(req.headers['x-timestamp']) broker = self._get_container_broker(drive, part, account, container) @@ -204,9 +218,11 @@ class ContainerController(object): not os.path.exists(broker.db_file): broker.initialize(timestamp) if not os.path.exists(broker.db_file): + self.logger.timing_since('PUT.timing', start_time) return HTTPNotFound() broker.put_object(obj, timestamp, int(req.headers['x-size']), req.headers['x-content-type'], req.headers['x-etag']) + self.logger.timing_since('PUT.timing', start_time) return HTTPCreated(request=req) else: # put container if not os.path.exists(broker.db_file): @@ -216,6 +232,7 @@ class ContainerController(object): created = broker.is_deleted() broker.update_put_timestamp(timestamp) if broker.is_deleted(): + self.logger.increment('PUT.errors') return HTTPConflict(request=req) metadata = {} metadata.update((key, (value, timestamp)) @@ -230,6 +247,7 @@ class ContainerController(object): broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata) resp = self.account_update(req, account, container, broker) + self.logger.timing_since('PUT.timing', start_time) if resp: return resp if created: @@ -239,18 +257,22 @@ class ContainerController(object): def HEAD(self, req): """Handle HTTP HEAD request.""" + start_time = time.time() try: drive, part, account, container, obj = split_path( unquote(req.path), 4, 5, True) except ValueError, err: + self.logger.increment('HEAD.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('HEAD.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_container_broker(drive, part, account, container) broker.pending_timeout = 0.1 broker.stale_reads_ok = True if broker.is_deleted(): + self.logger.timing_since('HEAD.timing', start_time) return HTTPNotFound(request=req) info = broker.get_info() headers = { @@ -263,22 +285,27 @@ class ContainerController(object): for key, (value, timestamp) in broker.metadata.iteritems() if value != '' and (key.lower() in self.save_headers or key.lower().startswith('x-container-meta-'))) + self.logger.timing_since('HEAD.timing', start_time) return HTTPNoContent(request=req, headers=headers) def GET(self, req): """Handle HTTP GET request.""" + start_time = time.time() try: drive, part, account, container, obj = split_path( unquote(req.path), 4, 5, True) except ValueError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('GET.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_container_broker(drive, part, account, 
container) broker.pending_timeout = 0.1 broker.stale_reads_ok = True if broker.is_deleted(): + self.logger.timing_since('GET.timing', start_time) return HTTPNotFound(request=req) info = broker.get_info() resp_headers = { @@ -309,6 +336,7 @@ class ContainerController(object): body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT) query_format = get_param(req, 'format') except UnicodeDecodeError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body='parameters not utf8', content_type='text/plain', request=req) if query_format: @@ -319,6 +347,7 @@ class ContainerController(object): 'application/xml', 'text/xml'], default_match='text/plain') except AssertionError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body='bad accept header: %s' % req.accept, content_type='text/plain', request=req) container_list = broker.list_objects_iter(limit, marker, end_marker, @@ -371,53 +400,66 @@ class ContainerController(object): ''.join(xml_output), '']) else: if not container_list: + self.logger.timing_since('GET.timing', start_time) return HTTPNoContent(request=req, headers=resp_headers) container_list = '\n'.join(r[0] for r in container_list) + '\n' ret = Response(body=container_list, request=req, headers=resp_headers) ret.content_type = out_content_type ret.charset = 'utf-8' + self.logger.timing_since('GET.timing', start_time) return ret def REPLICATE(self, req): """ Handle HTTP REPLICATE request (json-encoded RPC calls for replication.) """ + start_time = time.time() try: post_args = split_path(unquote(req.path), 3) except ValueError, err: + self.logger.increment('REPLICATE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) drive, partition, hash = post_args if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('REPLICATE.errors') return HTTPInsufficientStorage(drive=drive, request=req) try: args = simplejson.load(req.environ['wsgi.input']) except ValueError, err: + self.logger.increment('REPLICATE.errors') return HTTPBadRequest(body=str(err), content_type='text/plain') ret = self.replicator_rpc.dispatch(post_args, args) ret.request = req + self.logger.timing_since('REPLICATE.timing', start_time) return ret def POST(self, req): """Handle HTTP POST request.""" + start_time = time.time() try: drive, part, account, container = split_path(unquote(req.path), 4) except ValueError, err: + self.logger.increment('POST.errors') return HTTPBadRequest(body=str(err), content_type='text/plain', request=req) if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): + self.logger.increment('POST.errors') return HTTPBadRequest(body='Missing or bad timestamp', request=req, content_type='text/plain') if 'x-container-sync-to' in req.headers: err = validate_sync_to(req.headers['x-container-sync-to'], self.allowed_sync_hosts) if err: + self.logger.increment('POST.errors') return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): + self.logger.increment('POST.errors') return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_container_broker(drive, part, account, container) if broker.is_deleted(): + self.logger.timing_since('POST.timing', start_time) return HTTPNotFound(request=req) timestamp = normalize_timestamp(req.headers['x-timestamp']) metadata = {} @@ -432,6 +474,7 @@ class ContainerController(object): broker.metadata['X-Container-Sync-To'][0]: broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata) + 
self.logger.timing_since('POST.timing', start_time) return HTTPNoContent(request=req) def __call__(self, env, start_response): diff --git a/swift/container/sync.py b/swift/container/sync.py index 6d8b8d26d7..8d09a46a38 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -273,6 +273,7 @@ class ContainerSync(Daemon): sync_key = value if not sync_to or not sync_key: self.container_skips += 1 + self.logger.increment('skips') return sync_to = sync_to.rstrip('/') err = validate_sync_to(sync_to, self.allowed_sync_hosts) @@ -282,6 +283,7 @@ class ContainerSync(Daemon): {'db_file': broker.db_file, 'validate_sync_to_err': err}) self.container_failures += 1 + self.logger.increment('failures') return stop_at = time() + self.container_time while time() < stop_at and sync_point2 < sync_point1: @@ -324,8 +326,10 @@ class ContainerSync(Daemon): sync_point1 = row['ROWID'] broker.set_x_container_sync_points(sync_point1, None) self.container_syncs += 1 + self.logger.increment('syncs') except (Exception, Timeout), err: self.container_failures += 1 + self.logger.increment('failures') self.logger.exception(_('ERROR Syncing %s'), (broker.db_file)) def container_sync_row(self, row, sync_to, sync_key, broker, info): @@ -343,6 +347,7 @@ class ContainerSync(Daemon): :returns: True on success """ try: + start_time = time() if row['deleted']: try: delete_object(sync_to, name=row['name'], @@ -353,6 +358,8 @@ class ContainerSync(Daemon): if err.http_status != HTTP_NOT_FOUND: raise self.container_deletes += 1 + self.logger.increment('deletes') + self.logger.timing_since('deletes.timing', start_time) else: part, nodes = self.object_ring.get_nodes( info['account'], info['container'], @@ -398,6 +405,8 @@ class ContainerSync(Daemon): put_object(sync_to, name=row['name'], headers=headers, contents=_Iter2FileLikeObject(body), proxy=self.proxy) self.container_puts += 1 + self.logger.increment('puts') + self.logger.timing_since('puts.timing', start_time) except ClientException, err: if err.http_status == HTTP_UNAUTHORIZED: self.logger.info(_('Unauth %(sync_from)r ' @@ -416,11 +425,13 @@ class ContainerSync(Daemon): _('ERROR Syncing %(db_file)s %(row)s'), {'db_file': broker.db_file, 'row': row}) self.container_failures += 1 + self.logger.increment('failures') return False except (Exception, Timeout), err: self.logger.exception( _('ERROR Syncing %(db_file)s %(row)s'), {'db_file': broker.db_file, 'row': row}) self.container_failures += 1 + self.logger.increment('failures') return False return True diff --git a/swift/container/updater.py b/swift/container/updater.py index db9621183e..74f30172ff 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -194,6 +194,7 @@ class ContainerUpdater(Daemon): :param dbfile: container DB to process """ + start_time = time.time() broker = ContainerBroker(dbfile, logger=self.logger) info = broker.get_info() # Don't send updates if the container was auto-created since it @@ -220,6 +221,7 @@ class ContainerUpdater(Daemon): else: failures += 1 if successes > failures: + self.logger.increment('successes') self.successes += 1 self.logger.debug( _('Update report sent for %(container)s %(dbfile)s'), @@ -228,6 +230,7 @@ class ContainerUpdater(Daemon): info['delete_timestamp'], info['object_count'], info['bytes_used']) else: + self.logger.increment('failures') self.failures += 1 self.logger.debug( _('Update report failed for %(container)s %(dbfile)s'), @@ -237,7 +240,10 @@ class ContainerUpdater(Daemon): if self.new_account_suppressions: print 
>>self.new_account_suppressions, \ info['account'], until + # Only track timing data for attempted updates: + self.logger.timing_since('timing', start_time) else: + self.logger.increment('no_changes') self.no_changes += 1 def container_report(self, node, part, container, put_timestamp, diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e77dfcff1e..212fb1b449 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -72,6 +72,7 @@ class AuditorWorker(object): for path, device, partition in all_locs: loop_time = time.time() self.object_audit(path, device, partition) + self.logger.timing_since('timing', loop_time) self.files_running_time = ratelimit_sleep( self.files_running_time, self.max_files_per_second) self.total_files_processed += 1 @@ -162,6 +163,7 @@ class AuditorWorker(object): finally: df.close(verify_file=False) except AuditException, err: + self.logger.increment('quarantines') self.quarantines += 1 self.logger.error(_('ERROR Object %(obj)s failed audit and will ' 'be quarantined: %(err)s'), {'obj': path, 'err': err}) @@ -169,6 +171,7 @@ class AuditorWorker(object): os.path.join(self.devices, device), path) return except (Exception, Timeout): + self.logger.increment('errors') self.errors += 1 self.logger.exception(_('ERROR Trying to audit %s'), path) return diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 3d428eae30..c8bd12389d 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -103,15 +103,19 @@ class ObjectExpirer(Daemon): timestamp = int(timestamp) if timestamp > int(time()): break + start_time = time() try: self.delete_actual_object(actual_obj, timestamp) self.swift.delete_object(self.expiring_objects_account, container, obj) self.report_objects += 1 + self.logger.increment('objects') except (Exception, Timeout), err: + self.logger.increment('errors') self.logger.exception( _('Exception while deleting object %s %s %s') % (container, obj, str(err))) + self.logger.timing_since('timing', start_time) self.report() try: self.swift.delete_container( diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index bd80ccd705..9bd16f8dc4 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -358,6 +358,7 @@ class ObjectReplicator(Daemon): return [suff for suff in os.listdir(path) if len(suff) == 3 and isdir(join(path, suff))] self.replication_count += 1 + self.logger.increment('partition.delete.count.%s' % (job['device'],)) begin = time.time() try: responses = [] @@ -380,6 +381,7 @@ class ObjectReplicator(Daemon): self.logger.exception(_("Error syncing handoff partition")) finally: self.partition_times.append(time.time() - begin) + self.logger.timing_since('partition.delete.timing', begin) def update(self, job): """ @@ -388,6 +390,7 @@ class ObjectReplicator(Daemon): :param job: a dict containing info about the partition to be replicated """ self.replication_count += 1 + self.logger.increment('partition.update.count.%s' % (job['device'],)) begin = time.time() try: hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'], @@ -397,6 +400,7 @@ class ObjectReplicator(Daemon): if isinstance(hashed, BaseException): raise hashed self.suffix_hash += hashed + self.logger.update_stats('suffix.hashes', hashed) attempts_left = len(job['nodes']) nodes = itertools.chain(job['nodes'], self.object_ring.get_more_nodes(int(job['partition']))) @@ -431,6 +435,7 @@ class ObjectReplicator(Daemon): # See tpooled_get_hashes "Hack". 
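+            # Editor's note (illustrative, not part of the patch): the
+            # update_stats delegate adds an arbitrary delta in a single
+            # datagram; for example, update_stats('suffix.hashes', 17)
+            # emits "object-replicator.suffix.hashes:17|c", which is far
+            # cheaper than seventeen separate increment() calls.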
if isinstance(hashed, BaseException): raise hashed + self.logger.update_stats('suffix.hashes', hashed) local_hash = recalc_hash suffixes = [suffix for suffix in local_hash if local_hash[suffix] != remote_hash.get(suffix, -1)] @@ -442,6 +447,7 @@ class ObjectReplicator(Daemon): headers={'Content-Length': '0'}) conn.getresponse().read() self.suffix_sync += len(suffixes) + self.logger.update_stats('suffix.syncs', len(suffixes)) except (Exception, Timeout): self.logger.exception(_("Error syncing with node: %s") % node) @@ -450,6 +456,7 @@ class ObjectReplicator(Daemon): self.logger.exception(_("Error syncing partition")) finally: self.partition_times.append(time.time() - begin) + self.logger.timing_since('partition.update.timing', begin) def stats_line(self): """ @@ -538,6 +545,7 @@ class ObjectReplicator(Daemon): nodes = [node for node in part_nodes if node['id'] != local_dev['id']] jobs.append(dict(path=join(obj_path, partition), + device=local_dev['device'], nodes=nodes, delete=len(nodes) > len(part_nodes) - 1, partition=partition)) diff --git a/swift/obj/server.py b/swift/obj/server.py index f155732967..1669385a8e 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -314,6 +314,7 @@ class DiskFile(object): if not (self.is_deleted() or self.quarantined_dir): self.quarantined_dir = quarantine_renamer(self.device_path, self.data_file) + self.logger.increment('quarantines') return self.quarantined_dir def get_data_file_size(self): @@ -420,6 +421,7 @@ class ObjectController(object): {'ip': ip, 'port': port, 'dev': contdevice}) async_dir = os.path.join(self.devices, objdevice, ASYNCDIR) ohash = hash_path(account, container, obj) + self.logger.increment('async_pendings') write_pickle( {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out}, @@ -479,27 +481,33 @@ class ObjectController(object): def POST(self, request): """Handle HTTP POST requests for the Swift Object Server.""" + start_time = time.time() try: device, partition, account, container, obj = \ split_path(unquote(request.path), 5, 5, True) except ValueError, err: + self.logger.increment('POST.errors') return HTTPBadRequest(body=str(err), request=request, content_type='text/plain') if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): + self.logger.increment('POST.errors') return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain') new_delete_at = int(request.headers.get('X-Delete-At') or 0) if new_delete_at and new_delete_at < time.time(): + self.logger.increment('POST.errors') return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain') if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('POST.errors') return HTTPInsufficientStorage(drive=device, request=request) file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, disk_chunk_size=self.disk_chunk_size) if 'X-Delete-At' in file.metadata and \ int(file.metadata['X-Delete-At']) <= time.time(): + self.logger.timing_since('POST.timing', start_time) return HTTPNotFound(request=request) if file.is_deleted(): response_class = HTTPNotFound @@ -527,27 +535,34 @@ class ObjectController(object): container, obj, request.headers, device) with file.mkstemp() as (fd, tmppath): file.put(fd, tmppath, metadata, extension='.meta') + self.logger.timing_since('POST.timing', start_time) return response_class(request=request) def PUT(self, request): """Handle HTTP PUT requests for the Swift 
Object Server.""" + start_time = time.time() try: device, partition, account, container, obj = \ split_path(unquote(request.path), 5, 5, True) except ValueError, err: + self.logger.increment('PUT.errors') return HTTPBadRequest(body=str(err), request=request, content_type='text/plain') if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('PUT.errors') return HTTPInsufficientStorage(drive=device, request=request) if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): + self.logger.increment('PUT.errors') return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain') error_response = check_object_creation(request, obj) if error_response: + self.logger.increment('PUT.errors') return error_response new_delete_at = int(request.headers.get('X-Delete-At') or 0) if new_delete_at and new_delete_at < time.time(): + self.logger.increment('PUT.errors') return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain') file = DiskFile(self.devices, device, partition, account, container, @@ -564,6 +579,7 @@ class ObjectController(object): for chunk in iter(lambda: reader(self.network_chunk_size), ''): upload_size += len(chunk) if time.time() > upload_expiration: + self.logger.increment('PUT.timeouts') return HTTPRequestTimeout(request=request) etag.update(chunk) while chunk: @@ -616,17 +632,21 @@ class ObjectController(object): 'x-trans-id': request.headers.get('x-trans-id', '-')}, device) resp = HTTPCreated(request=request, etag=etag) + self.logger.timing_since('PUT.timing', start_time) return resp def GET(self, request): """Handle HTTP GET requests for the Swift Object Server.""" + start_time = time.time() try: device, partition, account, container, obj = \ split_path(unquote(request.path), 5, 5, True) except ValueError, err: + self.logger.increment('GET.errors') return HTTPBadRequest(body=str(err), request=request, content_type='text/plain') if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('GET.errors') return HTTPInsufficientStorage(drive=device, request=request) file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, keep_data_fp=True, @@ -634,43 +654,52 @@ class ObjectController(object): if file.is_deleted() or ('X-Delete-At' in file.metadata and int(file.metadata['X-Delete-At']) <= time.time()): if request.headers.get('if-match') == '*': + self.logger.timing_since('GET.timing', start_time) return HTTPPreconditionFailed(request=request) else: + self.logger.timing_since('GET.timing', start_time) return HTTPNotFound(request=request) try: file_size = file.get_data_file_size() except (DiskFileError, DiskFileNotExist): file.quarantine() + self.logger.timing_since('GET.timing', start_time) return HTTPNotFound(request=request) if request.headers.get('if-match') not in (None, '*') and \ file.metadata['ETag'] not in request.if_match: file.close() + self.logger.timing_since('GET.timing', start_time) return HTTPPreconditionFailed(request=request) if request.headers.get('if-none-match') is not None: if file.metadata['ETag'] in request.if_none_match: resp = HTTPNotModified(request=request) resp.etag = file.metadata['ETag'] file.close() + self.logger.timing_since('GET.timing', start_time) return resp try: if_unmodified_since = request.if_unmodified_since except (OverflowError, ValueError): # catches timestamps before the epoch + self.logger.increment('GET.errors') return HTTPPreconditionFailed(request=request) if 
if_unmodified_since and \ datetime.fromtimestamp(float(file.metadata['X-Timestamp']), UTC) > \ if_unmodified_since: file.close() + self.logger.timing_since('GET.timing', start_time) return HTTPPreconditionFailed(request=request) try: if_modified_since = request.if_modified_since except (OverflowError, ValueError): # catches timestamps before the epoch + self.logger.increment('GET.errors') return HTTPPreconditionFailed(request=request) if if_modified_since and \ datetime.fromtimestamp(float(file.metadata['X-Timestamp']), UTC) < \ if_modified_since: file.close() + self.logger.timing_since('GET.timing', start_time) return HTTPNotModified(request=request) response = Response(app_iter=file, request=request, conditional_response=True) @@ -690,29 +719,35 @@ class ObjectController(object): if 'Content-Encoding' in file.metadata: response.content_encoding = file.metadata['Content-Encoding'] response.headers['X-Timestamp'] = file.metadata['X-Timestamp'] + self.logger.timing_since('GET.timing', start_time) return request.get_response(response) def HEAD(self, request): """Handle HTTP HEAD requests for the Swift Object Server.""" + start_time = time.time() try: device, partition, account, container, obj = \ split_path(unquote(request.path), 5, 5, True) except ValueError, err: + self.logger.increment('HEAD.errors') resp = HTTPBadRequest(request=request) resp.content_type = 'text/plain' resp.body = str(err) return resp if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('HEAD.errors') return HTTPInsufficientStorage(drive=device, request=request) file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, disk_chunk_size=self.disk_chunk_size) if file.is_deleted() or ('X-Delete-At' in file.metadata and int(file.metadata['X-Delete-At']) <= time.time()): + self.logger.timing_since('HEAD.timing', start_time) return HTTPNotFound(request=request) try: file_size = file.get_data_file_size() except (DiskFileError, DiskFileNotExist): file.quarantine() + self.logger.timing_since('HEAD.timing', start_time) return HTTPNotFound(request=request) response = Response(request=request, conditional_response=True) response.headers['Content-Type'] = file.metadata.get('Content-Type', @@ -729,21 +764,26 @@ class ObjectController(object): if 'Content-Encoding' in file.metadata: response.content_encoding = file.metadata['Content-Encoding'] response.headers['X-Timestamp'] = file.metadata['X-Timestamp'] + self.logger.timing_since('HEAD.timing', start_time) return response def DELETE(self, request): """Handle HTTP DELETE requests for the Swift Object Server.""" + start_time = time.time() try: device, partition, account, container, obj = \ split_path(unquote(request.path), 5, 5, True) except ValueError, e: + self.logger.increment('DELETE.errors') return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): + self.logger.increment('DELETE.errors') return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain') if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('DELETE.errors') return HTTPInsufficientStorage(drive=device, request=request) response_class = HTTPNoContent file = DiskFile(self.devices, device, partition, account, container, @@ -751,6 +791,7 @@ class ObjectController(object): if 'x-if-delete-at' in request.headers and \ int(request.headers['x-if-delete-at']) != \ 
int(file.metadata.get('X-Delete-At') or 0): + self.logger.timing_since('DELETE.timing', start_time) return HTTPPreconditionFailed(request=request, body='X-If-Delete-At and X-Delete-At do not match') orig_timestamp = file.metadata.get('X-Timestamp') @@ -773,6 +814,7 @@ class ObjectController(object): 'x-trans-id': request.headers.get('x-trans-id', '-')}, device) resp = response_class(request=request) + self.logger.timing_since('DELETE.timing', start_time) return resp def REPLICATE(self, request): @@ -780,13 +822,16 @@ class ObjectController(object): Handle REPLICATE requests for the Swift Object Server. This is used by the object replicator to get hashes for directories. """ + start_time = time.time() try: device, partition, suffix = split_path( unquote(request.path), 2, 3, True) except ValueError, e: + self.logger.increment('REPLICATE.errors') return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') if self.mount_check and not check_mount(self.devices, device): + self.logger.increment('REPLICATE.errors') return HTTPInsufficientStorage(drive=device, request=request) path = os.path.join(self.devices, device, DATADIR, partition) if not os.path.exists(path): @@ -796,7 +841,9 @@ class ObjectController(object): recalculate=suffixes) # See tpooled_get_hashes "Hack". if isinstance(hashes, BaseException): + self.logger.increment('REPLICATE.errors') raise hashes + self.logger.timing_since('REPLICATE.timing', start_time) return Response(body=pickle.dumps(hashes)) def __call__(self, env, start_response): diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 9be1919f95..25a282af87 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -69,6 +69,7 @@ class ObjectUpdater(Daemon): for device in os.listdir(self.devices): if self.mount_check and not \ os.path.ismount(os.path.join(self.devices, device)): + self.logger.increment('errors') self.logger.warn( _('Skipping %s as it is not mounted'), device) continue @@ -108,6 +109,7 @@ class ObjectUpdater(Daemon): for device in os.listdir(self.devices): if self.mount_check and \ not os.path.ismount(os.path.join(self.devices, device)): + self.logger.increment('errors') self.logger.warn( _('Skipping %s as it is not mounted'), device) continue @@ -124,6 +126,7 @@ class ObjectUpdater(Daemon): :param device: path to device """ + start_time = time.time() async_pending = os.path.join(device, ASYNCDIR) if not os.path.isdir(async_pending): return @@ -139,6 +142,7 @@ class ObjectUpdater(Daemon): try: obj_hash, timestamp = update.split('-') except ValueError: + self.logger.increment('errors') self.logger.error( _('ERROR async pending file with unexpected name %s') % (update_path)) @@ -153,6 +157,7 @@ class ObjectUpdater(Daemon): os.rmdir(prefix_path) except OSError: pass + self.logger.timing_since('timing', start_time) def process_object_update(self, update_path, device): """ @@ -166,6 +171,7 @@ class ObjectUpdater(Daemon): except Exception: self.logger.exception( _('ERROR Pickle problem, quarantining %s'), update_path) + self.logger.increment('quarantines') renamer(update_path, os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path))) return @@ -187,11 +193,13 @@ class ObjectUpdater(Daemon): new_successes = True if success: self.successes += 1 + self.logger.increment('successes') self.logger.debug(_('Update sent for %(obj)s %(path)s'), {'obj': obj, 'path': update_path}) os.unlink(update_path) else: self.failures += 1 + self.logger.increment('failures') self.logger.debug(_('Update failed for %(obj)s %(path)s'), 
{'obj': obj, 'path': update_path}) if new_successes: diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 77697be7f7..0c105a6db5 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -88,7 +88,8 @@ def update_headers(response, headers): def public(func): """ - Decorator to declare which methods are public accessible as HTTP requests + Decorator to declare which methods are publicly accessible as HTTP + requests :param func: function to make public """ @@ -106,7 +107,7 @@ def delay_denial(func): delayed. This is so the method can load the Request object up with additional information that may be needed by the authorization system. - :param func: function to delay authorization on + :param func: function for which authorization will be delayed """ func.delay_denial = True @@ -655,12 +656,12 @@ class Controller(object): @public def GET(self, req): """Handler for HTTP GET requests.""" - return self.GETorHEAD(req) + return self.GETorHEAD(req, stats_type='GET') @public def HEAD(self, req): """Handler for HTTP HEAD requests.""" - return self.GETorHEAD(req) + return self.GETorHEAD(req, stats_type='HEAD') def _make_app_iter_reader(self, node, source, queue): """ @@ -685,6 +686,7 @@ class Controller(object): self.app.logger.warn( _('Client did not read from queue within %ss') % self.app.client_timeout) + self.app.logger.increment('client_timeouts') except (Exception, Timeout): self.exception_occurred(node, _('Object'), _('Trying to read during GET')) @@ -906,13 +908,15 @@ class ObjectController(Controller): for obj in sublisting: yield obj - def GETorHEAD(self, req): + def GETorHEAD(self, req, stats_type): """Handle HTTP GET or HEAD requests.""" + start_time = time.time() _junk, _junk, req.acl, _junk, _junk, object_versions = \ self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: + self.app.logger.increment('auth_short_circuits') return aresp partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) @@ -931,6 +935,8 @@ class ObjectController(Controller): self.iter_nodes(partition, nodes, self.app.object_ring), req.path_info, len(nodes)) if 'x-object-manifest' not in resp2.headers: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return resp resp = resp2 req.range = req_range @@ -942,10 +948,14 @@ class ObjectController(Controller): listing = list(self._listing_iter(lcontainer, lprefix, req.environ)) except ListingIterNotFound: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return HTTPNotFound(request=req) except ListingIterNotAuthorized, err: + self.app.logger.increment('auth_short_circuits') return err.aresp except ListingIterError: + self.app.logger.increment('errors') return HTTPServerError(request=req) if len(listing) > CONTAINER_LISTING_LIMIT: @@ -964,6 +974,8 @@ class ObjectController(Controller): return iter([]) head_response.status_int = resp.status_int + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return head_response else: resp.app_iter = SegmentedIterable(self, lcontainer, @@ -993,31 +1005,34 @@ class ObjectController(Controller): resp.etag = etag resp.headers['accept-ranges'] = 'bytes' + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) return resp @public @delay_denial def GET(self, req): """Handler for HTTP GET requests.""" - return self.GETorHEAD(req) + return self.GETorHEAD(req, stats_type='GET') @public 
@delay_denial def HEAD(self, req): """Handler for HTTP HEAD requests.""" - return self.GETorHEAD(req) + return self.GETorHEAD(req, stats_type='HEAD') @public @delay_denial def POST(self, req): """HTTP POST request handler.""" + start_time = time.time() if 'x-delete-after' in req.headers: try: x_delete_after = int(req.headers['x-delete-after']) except ValueError: - return HTTPBadRequest(request=req, - content_type='text/plain', - body='Non-integer X-Delete-After') + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, + content_type='text/plain', + body='Non-integer X-Delete-After') req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after) if self.app.object_post_as_copy: req.method = 'PUT' @@ -1028,7 +1043,7 @@ class ObjectController(Controller): self.object_name)) req.headers['X-Fresh-Metadata'] = 'true' req.environ['swift_versioned_copy'] = True - resp = self.PUT(req) + resp = self.PUT(req, start_time=start_time, stats_type='POST') # Older editions returned 202 Accepted on object POSTs, so we'll # convert any 201 Created responses to that for compatibility with # picky clients. @@ -1038,6 +1053,7 @@ class ObjectController(Controller): else: error_response = check_metadata(req, 'object') if error_response: + self.app.logger.increment('errors') return error_response container_partition, containers, _junk, req.acl, _junk, _junk = \ self.container_info(self.account_name, self.container_name, @@ -1045,16 +1061,20 @@ class ObjectController(Controller): if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: + self.app.logger.increment('auth_short_circuits') return aresp if not containers: + self.app.logger.timing_since('POST.timing', start_time) return HTTPNotFound(request=req) if 'x-delete-at' in req.headers: try: x_delete_at = int(req.headers['x-delete-at']) if x_delete_at < time.time(): + self.app.logger.increment('errors') return HTTPBadRequest(body='X-Delete-At in past', request=req, content_type='text/plain') except ValueError: + self.app.logger.increment('errors') return HTTPBadRequest(request=req, content_type='text/plain', body='Non-integer X-Delete-At') @@ -1082,8 +1102,10 @@ class ObjectController(Controller): nheaders['X-Delete-At-Partition'] = delete_at_part nheaders['X-Delete-At-Device'] = node['device'] headers.append(nheaders) - return self.make_requests(req, self.app.object_ring, - partition, 'POST', req.path_info, headers) + resp = self.make_requests(req, self.app.object_ring, partition, + 'POST', req.path_info, headers) + self.app.logger.timing_since('POST.timing', start_time) + return resp def _send_file(self, conn, path): """Method for a file PUT coro""" @@ -1119,8 +1141,10 @@ class ObjectController(Controller): @public @delay_denial - def PUT(self, req): + def PUT(self, req, start_time=None, stats_type='PUT'): """HTTP PUT request handler.""" + if not start_time: + start_time = time.time() (container_partition, containers, _junk, req.acl, req.environ['swift_sync_key'], object_versions) = \ self.container_info(self.account_name, self.container_name, @@ -1128,13 +1152,17 @@ class ObjectController(Controller): if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: + self.app.logger.increment('auth_short_circuits') return aresp if not containers: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return HTTPNotFound(request=req) if 'x-delete-after' in req.headers: try: x_delete_after = int(req.headers['x-delete-after']) except ValueError: + 
self.app.logger.increment('errors')
                 return HTTPBadRequest(request=req,
                                       content_type='text/plain',
                                       body='Non-integer X-Delete-After')
@@ -1143,9 +1171,11 @@
             try:
                 x_delete_at = int(req.headers['x-delete-at'])
                 if x_delete_at < time.time():
+                    self.app.logger.increment('errors')
                     return HTTPBadRequest(body='X-Delete-At in past',
                                 request=req, content_type='text/plain')
             except ValueError:
+                self.app.logger.increment('errors')
                 return HTTPBadRequest(request=req, content_type='text/plain',
                                       body='Non-integer X-Delete-At')
             delete_at_container = str(x_delete_at /
@@ -1173,8 +1203,11 @@
                 if 'swift_x_timestamp' in hresp.environ and \
                     float(hresp.environ['swift_x_timestamp']) >= \
                     float(req.headers['x-timestamp']):
+                    self.app.logger.timing_since(
+                        '%s.timing' % (stats_type,), start_time)
                     return HTTPAccepted(request=req)
             except ValueError:
+                self.app.logger.increment('errors')
                 return HTTPBadRequest(request=req, content_type='text/plain',
                     body='X-Timestamp should be a UNIX timestamp float value; '
                     'was %r' % req.headers['x-timestamp'])
@@ -1189,6 +1222,7 @@
         content_type_manually_set = False
         error_response = check_object_creation(req, self.object_name)
         if error_response:
+            self.app.logger.increment('errors')
             return error_response
         if object_versions and not req.environ.get('swift_versioned_copy'):
             is_manifest = 'x-object-manifest' in req.headers or \
@@ -1237,6 +1271,7 @@
                 src_container_name, src_obj_name = \
                     source_header.split('/', 3)[2:]
             except ValueError:
+                self.app.logger.increment('errors')
                 return HTTPPreconditionFailed(request=req,
                     body='X-Copy-From header must be of the form'
                     '<container name>/<object name>')
@@ -1249,6 +1284,8 @@
             self.container_name = src_container_name
             source_resp = self.GET(source_req)
             if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
+                self.app.logger.timing_since(
+                    '%s.timing' % (stats_type,), start_time)
                 return source_resp
             self.object_name = orig_obj_name
             self.container_name = orig_container_name
@@ -1261,6 +1298,7 @@
             # which currently only happens because there are more than
             # CONTAINER_LISTING_LIMIT segments in a segmented object. In
             # this case, we're going to refuse to do the server-side copy.
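+            # Editor's note (illustrative, not part of the patch): POST
+            # (with object_post_as_copy) and COPY both funnel through this
+            # PUT handler with an explicit stats_type, so the timing
+            # metrics in this method are reported under the original verb,
+            # e.g. 'COPY.timing' rather than 'PUT.timing'.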
+ self.app.logger.increment('errors') return HTTPRequestEntityTooLarge(request=req) new_req.etag = source_resp.etag # we no longer need the X-Copy-From header @@ -1299,6 +1337,7 @@ class ObjectController(Controller): _('Object PUT returning 503, %(conns)s/%(nodes)s ' 'required connections'), {'conns': len(conns), 'nodes': len(nodes) // 2 + 1}) + self.app.logger.increment('errors') return HTTPServiceUnavailable(request=req) chunked = req.headers.get('transfer-encoding') try: @@ -1318,6 +1357,7 @@ class ObjectController(Controller): break req.bytes_transferred += len(chunk) if req.bytes_transferred > MAX_FILE_SIZE: + self.app.logger.increment('errors') return HTTPRequestEntityTooLarge(request=req) for conn in list(conns): if not conn.failed: @@ -1329,6 +1369,7 @@ class ObjectController(Controller): self.app.logger.error(_('Object PUT exceptions during' ' send, %(conns)s/%(nodes)s required connections'), {'conns': len(conns), 'nodes': len(nodes) / 2 + 1}) + self.app.logger.increment('errors') return HTTPServiceUnavailable(request=req) for conn in conns: if conn.queue.unfinished_tasks: @@ -1337,16 +1378,23 @@ class ObjectController(Controller): except ChunkReadTimeout, err: self.app.logger.warn( _('ERROR Client read timeout (%ss)'), err.seconds) + self.app.logger.increment('client_timeouts') return HTTPRequestTimeout(request=req) except (Exception, Timeout): req.client_disconnect = True self.app.logger.exception( _('ERROR Exception causing client disconnect')) + self.app.logger.increment('client_disconnects') + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return HTTPClientDisconnect(request=req) if req.content_length and req.bytes_transferred < req.content_length: req.client_disconnect = True self.app.logger.warn( _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return HTTPClientDisconnect(request=req) statuses = [] reasons = [] @@ -1372,6 +1420,7 @@ class ObjectController(Controller): if len(etags) > 1: self.app.logger.error( _('Object servers returned %s mismatched etags'), len(etags)) + self.app.logger.increment('errors') return HTTPServerError(request=req) etag = len(etags) and etags.pop() or None while len(statuses) < len(nodes): @@ -1392,12 +1441,14 @@ class ObjectController(Controller): # reset the bytes, since the user didn't actually send anything req.bytes_transferred = 0 resp.last_modified = float(req.headers['X-Timestamp']) + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) return resp @public @delay_denial def DELETE(self, req): """HTTP DELETE request handler.""" + start_time = time.time() (container_partition, containers, _junk, req.acl, req.environ['swift_sync_key'], object_versions) = \ self.container_info(self.account_name, self.container_name) @@ -1415,8 +1466,10 @@ class ObjectController(Controller): # no worries, last_item is None pass except ListingIterNotAuthorized, err: + self.app.logger.increment('auth_short_circuits') return err.aresp except ListingIterError: + self.app.logger.increment('errors') return HTTPServerError(request=req) if last_item: # there are older versions so copy the previous version to the @@ -1454,8 +1507,10 @@ class ObjectController(Controller): if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: + self.app.logger.increment('auth_short_circuits') return aresp if not containers: + self.app.logger.timing_since('DELETE.timing', 
start_time)
             return HTTPNotFound(request=req)
         partition, nodes = self.app.object_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
@@ -1465,6 +1520,7 @@
                 req.headers['X-Timestamp'] = \
                     normalize_timestamp(float(req.headers['x-timestamp']))
             except ValueError:
+                self.app.logger.increment('errors')
                 return HTTPBadRequest(request=req, content_type='text/plain',
                     body='X-Timestamp should be a UNIX timestamp float value; '
                     'was %r' % req.headers['x-timestamp'])
@@ -1478,15 +1534,19 @@
             nheaders['X-Container-Partition'] = container_partition
             nheaders['X-Container-Device'] = container['device']
             headers.append(nheaders)
-        return self.make_requests(req, self.app.object_ring,
+        resp = self.make_requests(req, self.app.object_ring,
                 partition, 'DELETE', req.path_info, headers)
+        self.app.logger.timing_since('DELETE.timing', start_time)
+        return resp
 
     @public
     @delay_denial
     def COPY(self, req):
         """HTTP COPY request handler."""
+        start_time = time.time()
         dest = req.headers.get('Destination')
         if not dest:
+            self.app.logger.increment('errors')
             return HTTPPreconditionFailed(request=req,
                                           body='Destination header required')
         dest = unquote(dest)
@@ -1495,6 +1555,7 @@
         try:
             _junk, dest_container, dest_object = dest.split('/', 2)
         except ValueError:
+            self.app.logger.increment('errors')
             return HTTPPreconditionFailed(request=req,
                 body='Destination header must be of the form '
                 '<container name>/<object name>')
@@ -1508,7 +1569,7 @@
         req.headers['Content-Length'] = 0
         req.headers['X-Copy-From'] = quote(source)
         del req.headers['Destination']
-        return self.PUT(req)
+        return self.PUT(req, start_time=start_time, stats_type='COPY')
 
 
 class ContainerController(Controller):
@@ -1537,9 +1598,12 @@
             return HTTPBadRequest(request=req, body=str(err))
         return None
 
-    def GETorHEAD(self, req):
+    def GETorHEAD(self, req, stats_type):
         """Handler for HTTP GET/HEAD requests."""
+        start_time = time.time()
         if not self.account_info(self.account_name)[1]:
+            self.app.logger.timing_since(
+                '%s.timing' % (stats_type,), start_time)
             return HTTPNotFound(request=req)
         part, nodes = self.app.container_ring.get_nodes(
             self.account_name, self.container_name)
@@ -1564,37 +1628,42 @@
                 req.acl = resp.headers.get('x-container-read')
                 aresp = req.environ['swift.authorize'](req)
                 if aresp:
+                    self.app.logger.increment('auth_short_circuits')
                     return aresp
         if not req.environ.get('swift_owner', False):
             for key in ('x-container-read', 'x-container-write',
                         'x-container-sync-key', 'x-container-sync-to'):
                 if key in resp.headers:
                     del resp.headers[key]
+        self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
         return resp
 
     @public
     @delay_denial
     def GET(self, req):
         """Handler for HTTP GET requests."""
-        return self.GETorHEAD(req)
+        return self.GETorHEAD(req, stats_type='GET')
 
     @public
     @delay_denial
     def HEAD(self, req):
         """Handler for HTTP HEAD requests."""
-        return self.GETorHEAD(req)
+        return self.GETorHEAD(req, stats_type='HEAD')
 
     @public
     def PUT(self, req):
         """HTTP PUT request handler."""
+        start_time = time.time()
         error_response = \
             self.clean_acls(req) or check_metadata(req, 'container')
         if error_response:
+            self.app.logger.increment('errors')
             return error_response
         if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
             resp = HTTPBadRequest(request=req)
             resp.body = 'Container name length of %d longer than %d' % \
                         (len(self.container_name),
MAX_CONTAINER_NAME_LENGTH) + self.app.logger.increment('errors') return resp account_partition, accounts, container_count = \ self.account_info(self.account_name, @@ -1607,6 +1676,7 @@ class ContainerController(Controller): self.app.max_containers_per_account return resp if not accounts: + self.app.logger.timing_since('PUT.timing', start_time) return HTTPNotFound(request=req) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) @@ -1624,20 +1694,25 @@ class ContainerController(Controller): cache_key = get_container_memcache_key(self.account_name, self.container_name) self.app.memcache.delete(cache_key) - return self.make_requests(req, self.app.container_ring, + resp = self.make_requests(req, self.app.container_ring, container_partition, 'PUT', req.path_info, headers) + self.app.logger.timing_since('PUT.timing', start_time) + return resp @public def POST(self, req): """HTTP POST request handler.""" + start_time = time.time() error_response = \ self.clean_acls(req) or check_metadata(req, 'container') if error_response: + self.app.logger.increment('errors') return error_response account_partition, accounts, container_count = \ self.account_info(self.account_name, autocreate=self.app.account_autocreate) if not accounts: + self.app.logger.timing_since('POST.timing', start_time) return HTTPNotFound(request=req) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) @@ -1649,16 +1724,20 @@ class ContainerController(Controller): cache_key = get_container_memcache_key(self.account_name, self.container_name) self.app.memcache.delete(cache_key) - return self.make_requests(req, self.app.container_ring, + resp = self.make_requests(req, self.app.container_ring, container_partition, 'POST', req.path_info, [headers] * len(containers)) + self.app.logger.timing_since('POST.timing', start_time) + return resp @public def DELETE(self, req): """HTTP DELETE request handler.""" + start_time = time.time() account_partition, accounts, container_count = \ self.account_info(self.account_name) if not accounts: + self.app.logger.timing_since('DELETE.timing', start_time) return HTTPNotFound(request=req) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) @@ -1677,6 +1756,7 @@ class ContainerController(Controller): resp = self.make_requests(req, self.app.container_ring, container_partition, 'DELETE', req.path_info, headers) # Indicates no server had the container + self.app.logger.timing_since('DELETE.timing', start_time) if resp.status_int == HTTP_ACCEPTED: return HTTPNotFound(request=req) return resp @@ -1690,8 +1770,9 @@ class AccountController(Controller): Controller.__init__(self, app) self.account_name = unquote(account_name) - def GETorHEAD(self, req): + def GETorHEAD(self, req, stats_type): """Handler for HTTP GET/HEAD requests.""" + start_time = time.time() partition, nodes = self.app.account_ring.get_nodes(self.account_name) shuffle(nodes) resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, @@ -1701,6 +1782,8 @@ class AccountController(Controller): resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) return resp headers = {'X-Timestamp': normalize_timestamp(time.time()), 'X-Trans-Id': self.trans_id, @@ -1714,20 +1797,25 @@ class AccountController(Controller): 
self.account_name) resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, req.path_info.rstrip('/'), len(nodes)) + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) return resp @public def PUT(self, req): """HTTP PUT request handler.""" + start_time = time.time() if not self.app.allow_account_management: + self.app.logger.timing_since('PUT.timing', start_time) return HTTPMethodNotAllowed(request=req) error_response = check_metadata(req, 'account') if error_response: + self.app.logger.increment('errors') return error_response if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.increment('errors') return resp account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) @@ -1737,14 +1825,18 @@ class AccountController(Controller): self.transfer_headers(req.headers, headers) if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.make_requests(req, self.app.account_ring, + resp = self.make_requests(req, self.app.account_ring, account_partition, 'PUT', req.path_info, [headers] * len(accounts)) + self.app.logger.timing_since('PUT.timing', start_time) + return resp @public def POST(self, req): """HTTP POST request handler.""" + start_time = time.time() error_response = check_metadata(req, 'account') if error_response: + self.app.logger.increment('errors') return error_response account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) @@ -1762,6 +1854,7 @@ class AccountController(Controller): resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.increment('errors') return resp resp = self.make_requests( Request.blank('/v1/' + self.account_name), @@ -1770,12 +1863,15 @@ class AccountController(Controller): if not is_success(resp.status_int): raise Exception('Could not autocreate account %r' % self.account_name) + self.app.logger.timing_since('POST.timing', start_time) return resp @public def DELETE(self, req): """HTTP DELETE request handler.""" + start_time = time.time() if not self.app.allow_account_management: + self.app.logger.timing_since('DELETE.timing', start_time) return HTTPMethodNotAllowed(request=req) account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) @@ -1784,9 +1880,11 @@ class AccountController(Controller): 'Connection': 'close'} if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.make_requests(req, self.app.account_ring, + resp = self.make_requests(req, self.app.account_ring, account_partition, 'DELETE', req.path_info, [headers] * len(accounts)) + self.app.logger.timing_since('DELETE.timing', start_time) + return resp class BaseApplication(object): @@ -1860,7 +1958,7 @@ class BaseApplication(object): :param path: path from request :returns: tuple of (controller class, path dictionary) - :raises: ValueError (thrown by split_path) id given invalid path + :raises: ValueError (thrown by split_path) if given invalid path """ version, account, container, obj = split_path(path, 1, 4, True) d = dict(version=version, @@ -1925,21 +2023,28 @@ class BaseApplication(object): :param req: webob.Request object """ try: + self.logger.set_statsd_prefix('proxy-server') if req.content_length and req.content_length < 0: + 
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index a75b37ba15..da70bd4dbd 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -3,6 +3,8 @@
 import sys
 import os
 import copy
+import logging
+from sys import exc_info
 from contextlib import contextmanager
 from tempfile import NamedTemporaryFile
 from eventlet.green import socket
@@ -99,15 +101,25 @@ def temptree(files, contents=''):
         rmtree(tempdir)


-class FakeLogger(Handler):
+class NullLoggingHandler(logging.Handler):
+
+    def emit(self, record):
+        pass
+
+
+class FakeLogger(object):
     # a thread safe logger

     def __init__(self, *args, **kwargs):
-        self.log_dict = dict(error=[], info=[], warning=[], debug=[])
+        self._clear()
         self.level = logging.NOTSET
         if 'facility' in kwargs:
             self.facility = kwargs['facility']

+    def _clear(self):
+        self.log_dict = dict(
+            error=[], info=[], warning=[], debug=[], exception=[])
+
     def error(self, *args, **kwargs):
         self.log_dict['error'].append((args, kwargs))

@@ -120,11 +132,21 @@ class FakeLogger(Handler):
     def debug(self, *args, **kwargs):
         self.log_dict['debug'].append((args, kwargs))

+    def exception(self, *args, **kwargs):
+        self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
+
+    # mock out the StatsD logging methods:
+    def set_statsd_prefix(self, *a, **kw):
+        pass
+
+    increment = decrement = timing = timing_since = update_stats = \
+        set_statsd_prefix
+
     def setFormatter(self, obj):
         self.formatter = obj

     def close(self):
-        self.log_dict = dict(error=[], info=[], warning=[], debug=[])
+        self._clear()

     def set_name(self, name):
         # don't touch _handlers
@@ -151,6 +173,7 @@ class FakeLogger(Handler):
     def handleError(self, record):
         pass

+
 original_syslog_handler = logging.handlers.SysLogHandler
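`FakeLogger` silences all five StatsD delegate methods with a single class-body assignment: `increment`, `decrement`, `timing`, `timing_since`, and `update_stats` are all bound to the same `set_statsd_prefix` function object, so every call becomes the same no-op. A standalone sketch of that aliasing idiom; the class name here is illustrative, not part of the patch::

    class QuietStats(object):

        def set_statsd_prefix(self, *args, **kwargs):
            # accept any statsd-style call signature and record nothing
            pass

        # one function object, five method names
        increment = decrement = timing = timing_since = update_stats = \
            set_statsd_prefix

    logger = QuietStats()
    logger.increment('some.counter')          # silently ignored
    logger.timing_since('op.timing', 123.45)  # silently ignored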
diff --git a/test/unit/common/test_manager.py b/test/unit/common/test_manager.py
index be8db5975a..2e3058fef5 100644
--- a/test/unit/common/test_manager.py
+++ b/test/unit/common/test_manager.py
@@ -14,7 +14,6 @@
 # limitations under the License.

 import unittest
-from nose import SkipTest
 from test.unit import temptree

 import os
diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py
index dff6e8073a..029a92c283 100644
--- a/test/unit/common/test_memcached.py
+++ b/test/unit/common/test_memcached.py
@@ -24,12 +24,7 @@ import unittest
 from uuid import uuid4

 from swift.common import memcached
-
-
-class NullLoggingHandler(logging.Handler):
-
-    def emit(self, record):
-        pass
+from test.unit import NullLoggingHandler


 class ExplodingMockMemcached(object):
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index c3eff86103..f21fd32ac9 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -17,14 +17,17 @@
 from __future__ import with_statement
 from test.unit import temptree

+import errno
 import logging
 import mimetools
 import os
-import errno
+import re
 import socket
 import sys
 import time
 import unittest
+from threading import Thread
+from Queue import Queue, Empty
 from getpass import getuser
 from shutil import rmtree
 from StringIO import StringIO
@@ -326,6 +329,8 @@ class TestUtils(unittest.TestCase):
                           'test1\ntest3\ntest4\n')
         # make sure notice lvl logs by default
         logger.notice('test6')
+        self.assertEquals(sio.getvalue(),
+                          'test1\ntest3\ntest4\ntest6\n')

     def test_clean_logger_exception(self):
         # setup stream logging
@@ -856,5 +861,261 @@ log_name = %(yarr)s'''
         self.assertFalse(utils.streq_const_time('ABC123', 'abc123'))


+class TestStatsdLogging(unittest.TestCase):
+    def test_get_logger_statsd_client_not_specified(self):
+        logger = utils.get_logger({}, 'some-name', log_route='some-route')
+        # white-box construction validation
+        self.assertEqual(None, logger.logger.statsd_client)
+
+    def test_get_logger_statsd_client_defaults(self):
+        logger = utils.get_logger({'log_statsd_host': 'some.host.com'},
+                                  'some-name', log_route='some-route')
+        # white-box construction validation
+        self.assert_(isinstance(logger.logger.statsd_client,
+                                utils.StatsdClient))
+        self.assertEqual(logger.logger.statsd_client._host, 'some.host.com')
+        self.assertEqual(logger.logger.statsd_client._port, 8125)
+        self.assertEqual(logger.logger.statsd_client._prefix, 'some-name.')
+        self.assertEqual(logger.logger.statsd_client._default_sample_rate, 1)
+
+        logger.set_statsd_prefix('some-name.more-specific')
+        self.assertEqual(logger.logger.statsd_client._prefix,
+                         'some-name.more-specific.')
+        logger.set_statsd_prefix('')
+        self.assertEqual(logger.logger.statsd_client._prefix, '')
+
+    def test_get_logger_statsd_client_non_defaults(self):
+        logger = utils.get_logger({
+            'log_statsd_host': 'another.host.com',
+            'log_statsd_port': 9876,
+            'log_statsd_default_sample_rate': 0.75,
+            'log_statsd_metric_prefix': 'tomato.sauce',
+        }, 'some-name', log_route='some-route')
+        self.assertEqual(logger.logger.statsd_client._prefix,
+                         'tomato.sauce.some-name.')
+        logger.set_statsd_prefix('some-name.more-specific')
+        self.assertEqual(logger.logger.statsd_client._prefix,
+                         'tomato.sauce.some-name.more-specific.')
+        logger.set_statsd_prefix('')
+        self.assertEqual(logger.logger.statsd_client._prefix, 'tomato.sauce.')
+        self.assertEqual(logger.logger.statsd_client._host,
+                         'another.host.com')
+        self.assertEqual(logger.logger.statsd_client._port, 9876)
+        self.assertEqual(logger.logger.statsd_client._default_sample_rate,
+                         0.75)
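The prefix assertions above pin down how the name sources compose: `log_statsd_metric_prefix` always leads, the logger name (or whatever `set_statsd_prefix` later installs) follows, and each non-empty part contributes a trailing dot. A tiny model of that rule, consistent with the expected strings in these tests; the helper name is ours, not Swift's::

    def compose_prefix(metric_prefix, statsd_prefix):
        # each non-empty part gets a trailing dot; metric_prefix leads
        parts = [p for p in (metric_prefix, statsd_prefix) if p]
        return ''.join(part + '.' for part in parts)

    assert compose_prefix('', 'some-name') == 'some-name.'
    assert compose_prefix('tomato.sauce', 'some-name') == \
        'tomato.sauce.some-name.'
    assert compose_prefix('tomato.sauce', '') == 'tomato.sauce.'
    assert compose_prefix('', '') == ''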
+
+class TestStatsdLoggingDelegation(unittest.TestCase):
+    def setUp(self):
+        self.port = 9177
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.sock.bind(('localhost', self.port))
+        self.queue = Queue()
+        self.reader_thread = Thread(target=self.statsd_reader)
+        self.reader_thread.setDaemon(1)
+        self.reader_thread.start()
+
+    def tearDown(self):
+        # The "no-op when disabled" test doesn't set up a real logger, so
+        # create one here so we can tell the reader thread to stop.
+        if not getattr(self, 'logger', None):
+            self.logger = utils.get_logger({
+                'log_statsd_host': 'localhost',
+                'log_statsd_port': str(self.port),
+            }, 'some-name')
+        self.logger.increment('STOP')
+        self.reader_thread.join(timeout=4)
+        self.sock.close()
+        del self.logger
+        time.sleep(0.15)  # avoid occasional "Address already in use"?
+
+    def statsd_reader(self):
+        while True:
+            try:
+                payload = self.sock.recv(4096)
+                if payload and 'STOP' in payload:
+                    return 42
+                self.queue.put(payload)
+            except Exception, e:
+                sys.stderr.write('statsd_reader thread: %r' % (e,))
+                break
+
+    def _send_and_get(self, sender_fn, *args, **kwargs):
+        """
+        Because the client library may not actually send a packet with
+        sample_rate < 1, we keep trying until we get one through.
+        """
+        got = None
+        while not got:
+            sender_fn(*args, **kwargs)
+            try:
+                got = self.queue.get(timeout=0.5)
+            except Empty:
+                pass
+        return got
+
+    def assertStat(self, expected, sender_fn, *args, **kwargs):
+        got = self._send_and_get(sender_fn, *args, **kwargs)
+        return self.assertEqual(expected, got)
+
+    def assertStatMatches(self, expected_regexp, sender_fn, *args, **kwargs):
+        got = self._send_and_get(sender_fn, *args, **kwargs)
+        return self.assert_(re.search(expected_regexp, got),
+                            [got, expected_regexp])
+
+    def test_methods_are_no_ops_when_not_enabled(self):
+        logger = utils.get_logger({
+            # No "log_statsd_host" means "disabled"
+            'log_statsd_port': str(self.port),
+        }, 'some-name')
+        # Delegate methods are no-ops
+        self.assertEqual(None, logger.update_stats('foo', 88))
+        self.assertEqual(None, logger.update_stats('foo', 88, 0.57))
+        self.assertEqual(None, logger.update_stats('foo', 88,
+                                                   sample_rate=0.61))
+        self.assertEqual(None, logger.increment('foo'))
+        self.assertEqual(None, logger.increment('foo', 0.57))
+        self.assertEqual(None, logger.increment('foo', sample_rate=0.61))
+        self.assertEqual(None, logger.decrement('foo'))
+        self.assertEqual(None, logger.decrement('foo', 0.57))
+        self.assertEqual(None, logger.decrement('foo', sample_rate=0.61))
+        self.assertEqual(None, logger.timing('foo', 88.048))
+        self.assertEqual(None, logger.timing('foo', 88.57, 0.34))
+        self.assertEqual(None, logger.timing('foo', 88.998, sample_rate=0.82))
+        self.assertEqual(None, logger.timing_since('foo', 8938))
+        self.assertEqual(None, logger.timing_since('foo', 8948, 0.57))
+        self.assertEqual(None, logger.timing_since('foo', 849398,
+                                                   sample_rate=0.61))
+        # Now, the queue should be empty (no UDP packets sent)
+        self.assertRaises(Empty, self.queue.get_nowait)
+
+    def test_delegate_methods_with_no_default_sample_rate(self):
+        self.logger = utils.get_logger({
+            'log_statsd_host': 'localhost',
+            'log_statsd_port': str(self.port),
+        }, 'some-name')
+        self.assertStat('some-name.some.counter:1|c', self.logger.increment,
+                        'some.counter')
+        self.assertStat('some-name.some.counter:-1|c', self.logger.decrement,
+                        'some.counter')
+        self.assertStat('some-name.some.operation:4900.0|ms',
+                        self.logger.timing, 'some.operation', 4.9 * 1000)
+        self.assertStatMatches('some-name\.another\.operation:\d+\.\d+\|ms',
+                               self.logger.timing_since, 'another.operation',
+                               time.time())
+        self.assertStat('some-name.another.counter:42|c',
+                        self.logger.update_stats, 'another.counter', 42)
+
+        # Each call can override the sample_rate (also, bonus prefix test)
+        self.logger.set_statsd_prefix('pfx')
+        self.assertStat('pfx.some.counter:1|c|@0.972', self.logger.increment,
+                        'some.counter', sample_rate=0.972)
+        self.assertStat('pfx.some.counter:-1|c|@0.972', self.logger.decrement,
+                        'some.counter', sample_rate=0.972)
+        self.assertStat('pfx.some.operation:4900.0|ms|@0.972',
+                        self.logger.timing, 'some.operation', 4.9 * 1000,
+                        sample_rate=0.972)
+        self.assertStatMatches('pfx\.another\.op:\d+\.\d+\|ms\|@0\.972',
+                               self.logger.timing_since, 'another.op',
+                               time.time(), sample_rate=0.972)
+        self.assertStat('pfx.another.counter:3|c|@0.972',
+                        self.logger.update_stats, 'another.counter', 3,
+                        sample_rate=0.972)
+
+        # Can override sample_rate with non-keyword arg
+        self.logger.set_statsd_prefix('')
+        self.assertStat('some.counter:1|c|@0.939', self.logger.increment,
+                        'some.counter', 0.939)
+        self.assertStat('some.counter:-1|c|@0.939', self.logger.decrement,
+                        'some.counter', 0.939)
+        self.assertStat('some.operation:4900.0|ms|@0.939',
+                        self.logger.timing, 'some.operation',
+                        4.9 * 1000, 0.939)
+        self.assertStatMatches('another\.op:\d+\.\d+\|ms\|@0\.939',
+                               self.logger.timing_since, 'another.op',
+                               time.time(), 0.939)
+        self.assertStat('another.counter:3|c|@0.939',
+                        self.logger.update_stats, 'another.counter', 3, 0.939)
+
+    def test_delegate_methods_with_default_sample_rate(self):
+        self.logger = utils.get_logger({
+            'log_statsd_host': 'localhost',
+            'log_statsd_port': str(self.port),
+            'log_statsd_default_sample_rate': '0.93',
+        }, 'pfx')
+        self.assertStat('pfx.some.counter:1|c|@0.93', self.logger.increment,
+                        'some.counter')
+        self.assertStat('pfx.some.counter:-1|c|@0.93', self.logger.decrement,
+                        'some.counter')
+        self.assertStat('pfx.some.operation:4760.0|ms|@0.93',
+                        self.logger.timing, 'some.operation', 4.76 * 1000)
+        self.assertStatMatches('pfx\.another\.op:\d+\.\d+\|ms\|@0\.93',
+                               self.logger.timing_since, 'another.op',
+                               time.time())
+        self.assertStat('pfx.another.counter:3|c|@0.93',
+                        self.logger.update_stats, 'another.counter', 3)
+
+        # Each call can override the sample_rate
+        self.assertStat('pfx.some.counter:1|c|@0.9912', self.logger.increment,
+                        'some.counter', sample_rate=0.9912)
+        self.assertStat('pfx.some.counter:-1|c|@0.9912',
+                        self.logger.decrement, 'some.counter',
+                        sample_rate=0.9912)
+        self.assertStat('pfx.some.operation:4900.0|ms|@0.9912',
+                        self.logger.timing, 'some.operation', 4.9 * 1000,
+                        sample_rate=0.9912)
+        self.assertStatMatches('pfx\.another\.op:\d+\.\d+\|ms\|@0\.9912',
+                               self.logger.timing_since, 'another.op',
+                               time.time(), sample_rate=0.9912)
+        self.assertStat('pfx.another.counter:3|c|@0.9912',
+                        self.logger.update_stats, 'another.counter', 3,
+                        sample_rate=0.9912)
+
+        # Can override sample_rate with non-keyword arg
+        self.logger.set_statsd_prefix('')
+        self.assertStat('some.counter:1|c|@0.987654', self.logger.increment,
+                        'some.counter', 0.987654)
+        self.assertStat('some.counter:-1|c|@0.987654', self.logger.decrement,
+                        'some.counter', 0.987654)
+        self.assertStat('some.operation:4900.0|ms|@0.987654',
+                        self.logger.timing, 'some.operation',
+                        4.9 * 1000, 0.987654)
+        self.assertStatMatches('another\.op:\d+\.\d+\|ms\|@0\.987654',
+                               self.logger.timing_since, 'another.op',
+                               time.time(), 0.987654)
+        self.assertStat('another.counter:3|c|@0.987654',
+                        self.logger.update_stats, 'another.counter',
+                        3, 0.987654)
+
+    def test_delegate_methods_with_metric_prefix(self):
+        self.logger = utils.get_logger({
+            'log_statsd_host': 'localhost',
+            'log_statsd_port': str(self.port),
+            'log_statsd_metric_prefix': 'alpha.beta',
+        }, 'pfx')
+        self.assertStat('alpha.beta.pfx.some.counter:1|c',
+                        self.logger.increment, 'some.counter')
+        self.assertStat('alpha.beta.pfx.some.counter:-1|c',
+                        self.logger.decrement, 'some.counter')
+        self.assertStat('alpha.beta.pfx.some.operation:4760.0|ms',
+                        self.logger.timing, 'some.operation', 4.76 * 1000)
+        self.assertStatMatches(
+            'alpha\.beta\.pfx\.another\.op:\d+\.\d+\|ms',
+            self.logger.timing_since, 'another.op', time.time())
+        self.assertStat('alpha.beta.pfx.another.counter:3|c',
+                        self.logger.update_stats, 'another.counter', 3)
+
+        self.logger.set_statsd_prefix('')
+        self.assertStat('alpha.beta.some.counter:1|c|@0.9912',
+                        self.logger.increment, 'some.counter',
+                        sample_rate=0.9912)
+        self.assertStat('alpha.beta.some.counter:-1|c|@0.9912',
+                        self.logger.decrement, 'some.counter', 0.9912)
+        self.assertStat('alpha.beta.some.operation:4900.0|ms|@0.9912',
+                        self.logger.timing, 'some.operation', 4.9 * 1000,
+                        sample_rate=0.9912)
+        self.assertStatMatches(
+            'alpha\.beta\.another\.op:\d+\.\d+\|ms\|@0\.9912',
+            self.logger.timing_since, 'another.op',
+            time.time(), sample_rate=0.9912)
+        self.assertStat('alpha.beta.another.counter:3|c|@0.9912',
+                        self.logger.update_stats, 'another.counter', 3,
+                        sample_rate=0.9912)
+
+
 if __name__ == '__main__':
     unittest.main()
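The expected UDP payloads in these tests spell out the plain StatsD wire format: `<prefix><metric>:<value>|c` for counters, `...|ms` for timers, with `|@<sample_rate>` appended only when the rate is below 1. A minimal sender consistent with those payloads; illustrative only, not the patch's StatsdClient, and written in Python 2 like the rest of this code::

    import random
    import socket

    def send_stat(metric, value, vtype, sample_rate=1,
                  host='localhost', port=8125):
        # sampled metrics go out only sample_rate of the time, and carry
        # the rate so the server can scale counts back up on flush
        if sample_rate < 1:
            if random.random() >= sample_rate:
                return
            payload = '%s:%s|%s|@%s' % (metric, value, vtype, sample_rate)
        else:
            payload = '%s:%s|%s' % (metric, value, vtype)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(payload, (host, port))

    send_stat('proxy-server.some.counter', 1, 'c')
    send_stat('proxy-server.some.operation', 4900.0, 'ms', sample_rate=0.5)

The occasional dropped packet with `sample_rate < 1` is exactly why `_send_and_get` above retries until a payload arrives.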
diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py
index 40b5020089..0e9612cf91 100644
--- a/test/unit/container/test_sync.py
+++ b/test/unit/container/test_sync.py
@@ -15,6 +15,8 @@

 import unittest

+import re
+from test.unit import FakeLogger
 from swift.container import sync
 from swift.common import utils
 from swift.common.client import ClientException
@@ -765,18 +767,6 @@ class TestContainerSync(unittest.TestCase):
                              contents=None, proxy=None):
                 raise ClientException('test client exception',
                                       http_status=401)

-            class FakeLogger(object):
-
-                def __init__(self):
-                    self.err = ''
-                    self.exc = ''
-
-                def info(self, err, *args, **kwargs):
-                    self.err = err
-
-                def exception(self, exc, *args, **kwargs):
-                    self.exc = exc
-
             sync.direct_get_object = fake_direct_get_object
             sync.put_object = fake_put_object
             cs.logger = FakeLogger()
@@ -786,7 +776,8 @@ class TestContainerSync(unittest.TestCase):
                 'key', FakeContainerBroker('broker'),
                 {'account': 'a', 'container': 'c'}))
             self.assertEquals(cs.container_puts, 2)
-            self.assertTrue(cs.logger.err.startswith('Unauth '))
+            self.assert_(re.match('Unauth ',
+                                  cs.logger.log_dict['info'][0][0][0]))

             def fake_put_object(sync_to, name=None, headers=None,
                                 contents=None, proxy=None):
@@ -794,12 +785,14 @@ class TestContainerSync(unittest.TestCase):

             sync.put_object = fake_put_object

             # Fail due to 404
+            cs.logger = FakeLogger()
             self.assertFalse(cs.container_sync_row({'deleted': False,
                 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
                 'key', FakeContainerBroker('broker'),
                 {'account': 'a', 'container': 'c'}))
             self.assertEquals(cs.container_puts, 2)
-            self.assertTrue(cs.logger.err.startswith('Not found '))
+            self.assert_(re.match('Not found ',
+                                  cs.logger.log_dict['info'][0][0][0]))

             def fake_put_object(sync_to, name=None, headers=None,
                                 contents=None, proxy=None):
@@ -812,7 +805,9 @@ class TestContainerSync(unittest.TestCase):
                 'key', FakeContainerBroker('broker'),
                 {'account': 'a', 'container': 'c'}))
             self.assertEquals(cs.container_puts, 2)
-            self.assertTrue(cs.logger.exc.startswith('ERROR Syncing '))
+            self.assertTrue(
+                cs.logger.log_dict['exception'][0][0][0].startswith(
+                    'ERROR Syncing '))
         finally:
             sync.shuffle = orig_shuffle
             sync.put_object = orig_put_object
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index a7429f7da3..9d9e4bfc92 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -309,11 +309,13 @@ class TestAuditor(unittest.TestCase):
         self.assertTrue(os.path.exists(ts_file_path))

     def test_sleeper(self):
-        auditor.SLEEP_BETWEEN_AUDITS = 0.01
+        auditor.SLEEP_BETWEEN_AUDITS = 0.10
         my_auditor = auditor.ObjectAuditor(self.conf)
         start = time.time()
         my_auditor._sleep()
-        self.assertEquals(round(time.time() - start, 2), 0.01)
+        delta_t = time.time() - start
+        self.assert_(delta_t > 0.08)
+        self.assert_(delta_t < 0.12)

     def test_object_run_fast_track_zero_check_closed(self):
         rat = [False]
diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py
index 2b5b9c2696..9cacce83b3 100644
--- a/test/unit/obj/test_expirer.py
+++ b/test/unit/obj/test_expirer.py
@@ -17,6 +17,7 @@ import json
 from sys import exc_info
 from time import time
 from unittest import main, TestCase
+from test.unit import FakeLogger

 from swift.common import internal_client
 from swift.obj import expirer
@@ -35,24 +36,8 @@ def not_sleep(seconds):
     last_not_sleep = seconds


-class MockLogger(object):
-
-    def __init__(self):
-        self.debugs = []
-        self.infos = []
-        self.exceptions = []
-
-    def debug(self, msg):
-        self.debugs.append(msg)
-
-    def info(self, msg):
-        self.infos.append(msg)
-
-    def exception(self, msg):
-        self.exceptions.append('%s: %s' % (msg, exc_info()[1]))
-
-
 class TestObjectExpirer(TestCase):
+    maxDiff = None

     def setUp(self):
         global not_sleep
@@ -69,31 +54,35 @@ class TestObjectExpirer(TestCase):

     def test_report(self):
         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()

-        x.logger.infos = []
         x.report()
-        self.assertEquals(x.logger.infos, [])
+        self.assertEquals(x.logger.log_dict['info'], [])

-        x.logger.infos = []
+        x.logger._clear()
         x.report(final=True)
-        self.assertTrue('completed' in x.logger.infos[-1], x.logger.infos)
-        self.assertTrue('so far' not in x.logger.infos[-1], x.logger.infos)
+        self.assertTrue('completed' in x.logger.log_dict['info'][-1][0][0],
+                        x.logger.log_dict['info'])
+        self.assertTrue('so far' not in x.logger.log_dict['info'][-1][0][0],
+                        x.logger.log_dict['info'])

-        x.logger.infos = []
+        x.logger._clear()
         x.report_last_time = time() - x.report_interval
         x.report()
-        self.assertTrue('completed' not in x.logger.infos[-1], x.logger.infos)
-        self.assertTrue('so far' in x.logger.infos[-1], x.logger.infos)
+        self.assertTrue(
+            'completed' not in x.logger.log_dict['info'][-1][0][0],
+            x.logger.log_dict['info'])
+        self.assertTrue('so far' in x.logger.log_dict['info'][-1][0][0],
+                        x.logger.log_dict['info'])

     def test_run_once_nothing_to_do(self):
         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.swift = 'throw error because a string does not have needed methods'
         x.run_once()
-        self.assertEquals(x.logger.exceptions,
-            ["Unhandled exception: 'str' object has no attribute "
-            "'get_account_info'"])
+        self.assertEquals(x.logger.log_dict['exception'],
+            [(("Unhandled exception",), {},
+              "'str' object has no attribute "
+              "'get_account_info'")])

     def test_run_once_calls_report(self):
         class InternalClient(object):
@@ -104,13 +93,15 @@ class TestObjectExpirer(TestCase):
             return []

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.swift = InternalClient()
         x.run_once()
-        self.assertEquals(x.logger.exceptions, [])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 0 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'], [])
+        self.assertEquals(
+            x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 0 objects expired',), {})])

     def test_container_timestamp_break(self):
         class InternalClient(object):
@@ -127,21 +118,24 @@ class TestObjectExpirer(TestCase):
             raise Exception('This should not have been called')

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.swift = InternalClient([{'name': str(int(time() + 86400))}])
         x.run_once()
-        self.assertEquals(x.logger.exceptions, [])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 0 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'], [])
+        self.assertEquals(
+            x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 0 objects expired',), {})])

         # Reverse test to be sure it still would blow up the way expected.
         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.swift = InternalClient([{'name': str(int(time() - 86400))}])
         x.run_once()
-        self.assertEquals(x.logger.exceptions,
-            ['Unhandled exception: This should not have been called'])
+        self.assertEquals(x.logger.log_dict['exception'],
+            [(('Unhandled exception',), {},
+              str(Exception('This should not have been called')))])

     def test_object_timestamp_break(self):
         class InternalClient(object):
@@ -165,27 +159,28 @@ class TestObjectExpirer(TestCase):
             raise Exception('This should not have been called')

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.swift = InternalClient([{'name': str(int(time() - 86400))}],
                                  [{'name': '%d-actual-obj' %
                                    int(time() + 86400)}])
-
         x.delete_actual_object = should_not_be_called
         x.run_once()
-        self.assertEquals(x.logger.exceptions, [])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 0 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'], [])
+        self.assertEquals(x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 0 objects expired',), {})])

         # Reverse test to be sure it still would blow up the way expected.
         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         ts = int(time() - 86400)
         x.swift = InternalClient([{'name': str(int(time() - 86400))}],
                                  [{'name': '%d-actual-obj' % ts}])
         x.delete_actual_object = should_not_be_called
         x.run_once()
-        self.assertEquals(x.logger.exceptions, ['Exception while deleting '
-            'object %d %d-actual-obj This should not have been called: This '
-            'should not have been called' % (ts, ts)])
+        self.assertEquals(x.logger.log_dict['exception'],
+            [(('Exception while deleting object %d %d-actual-obj '
+               'This should not have been called' % (ts, ts),), {},
+              'This should not have been called')])

     def test_failed_delete_keeps_entry(self):
         class InternalClient(object):
@@ -215,31 +210,34 @@ class TestObjectExpirer(TestCase):
             raise Exception('This should not have been called')

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.iter_containers = lambda: [str(int(time() - 86400))]
         ts = int(time() - 86400)
         x.delete_actual_object = deliberately_blow_up
         x.swift = InternalClient([{'name': str(int(time() - 86400))}],
                                  [{'name': '%d-actual-obj' % ts}])
         x.run_once()
-        self.assertEquals(x.logger.exceptions, ['Exception while deleting '
-            'object %d %d-actual-obj failed to delete actual object: failed '
-            'to delete actual object' % (ts, ts)])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 0 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'],
+            [(('Exception while deleting object %d %d-actual-obj '
+               'failed to delete actual object' % (ts, ts),), {},
+              'failed to delete actual object')])
+        self.assertEquals(x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 0 objects expired',), {})])

         # Reverse test to be sure it still would blow up the way expected.
         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         ts = int(time() - 86400)
         x.delete_actual_object = lambda o, t: None
         x.swift = InternalClient([{'name': str(int(time() - 86400))}],
                                  [{'name': '%d-actual-obj' % ts}])
         x.run_once()
-        self.assertEquals(x.logger.exceptions, ['Exception while deleting '
-            'object %d %d-actual-obj This should not have been called: This '
-            'should not have been called' % (ts, ts)])
+        self.assertEquals(x.logger.log_dict['exception'],
+            [(('Exception while deleting object %d %d-actual-obj This should '
+               'not have been called' % (ts, ts),), {},
+              'This should not have been called')])

     def test_success_gets_counted(self):
         class InternalClient(object):
@@ -263,17 +261,18 @@ class TestObjectExpirer(TestCase):
             return self.objects

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         x.delete_actual_object = lambda o, t: None
         self.assertEquals(x.report_objects, 0)
         x.swift = InternalClient([{'name': str(int(time() - 86400))}],
                                  [{'name': '%d-actual-obj' %
                                    int(time() - 86400)}])
         x.run_once()
         self.assertEquals(x.report_objects, 1)
-        self.assertEquals(x.logger.exceptions, [])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 1 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'], [])
+        self.assertEquals(x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 1 objects expired',), {})])

     def test_failed_delete_continues_on(self):
         class InternalClient(object):
@@ -300,7 +299,7 @@ class TestObjectExpirer(TestCase):
             raise Exception('failed to delete actual object')

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()

         cts = int(time() - 86400)
         ots = int(time() - 86400)
@@ -318,24 +317,29 @@ class TestObjectExpirer(TestCase):
         x.swift = InternalClient(containers, objects)
         x.delete_actual_object = fail_delete_actual_object
         x.run_once()
-        self.assertEquals(x.logger.exceptions, [
-            'Exception while deleting object %d %d-actual-obj failed to '
-            'delete actual object: failed to delete actual object' %
-            (cts, ots),
-            'Exception while deleting object %d %d-next-obj failed to delete '
-            'actual object: failed to delete actual object' % (cts, ots),
-            'Exception while deleting container %d failed to delete '
-            'container: failed to delete container' % cts,
-            'Exception while deleting object %d %d-actual-obj failed to '
-            'delete actual object: failed to delete actual object' %
-            (cts + 1, ots),
-            'Exception while deleting object %d %d-next-obj failed to delete '
-            'actual object: failed to delete actual object' % (cts + 1, ots),
-            'Exception while deleting container %d failed to delete '
-            'container: failed to delete container' % (cts + 1)])
-        self.assertEquals(x.logger.infos,
-            ['Pass beginning; 1 possible containers; 2 possible objects',
-             'Pass completed in 0s; 0 objects expired'])
+        self.assertEquals(x.logger.log_dict['exception'], [
+            (('Exception while deleting object %d %d-actual-obj failed to '
+              'delete actual object' % (cts, ots),), {},
+             'failed to delete actual object'),
+            (('Exception while deleting object %d %d-next-obj failed to '
+              'delete actual object' % (cts, ots),), {},
+             'failed to delete actual object'),
+            (('Exception while deleting container %d failed to delete '
+              'container' % (cts,),), {},
+             'failed to delete container'),
+            (('Exception while deleting object %d %d-actual-obj failed to '
+              'delete actual object' % (cts + 1, ots),), {},
+             'failed to delete actual object'),
+            (('Exception while deleting object %d %d-next-obj failed to '
+              'delete actual object' % (cts + 1, ots),), {},
+             'failed to delete actual object'),
+            (('Exception while deleting container %d failed to delete '
+              'container' % (cts + 1,),), {},
+             'failed to delete container')])
+        self.assertEquals(x.logger.log_dict['info'],
+            [(('Pass beginning; 1 possible containers; '
+               '2 possible objects',), {}),
+             (('Pass completed in 0s; 0 objects expired',), {})])

     def test_run_forever_initial_sleep_random(self):
         global last_not_sleep
@@ -372,7 +376,7 @@ class TestObjectExpirer(TestCase):
             raise SystemExit('exiting exception %d' % raises[0])

         x = expirer.ObjectExpirer({})
-        x.logger = MockLogger()
+        x.logger = FakeLogger()
         orig_sleep = expirer.sleep
         exc = None
         try:
@@ -384,8 +388,9 @@ class TestObjectExpirer(TestCase):
         finally:
             expirer.sleep = orig_sleep
         self.assertEquals(str(err), 'exiting exception 2')
-        self.assertEquals(x.logger.exceptions,
-            ['Unhandled exception: exception 1'])
+        self.assertEquals(x.logger.log_dict['exception'],
+                          [(('Unhandled exception',), {},
+                            'exception 1')])

     def test_delete_actual_object(self):
         got_env = [None]
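All of the expirer assertions above lean on the record shape `FakeLogger` keeps: plain log calls append `(args, kwargs)` to `log_dict[level]`, while `exception()` additionally captures `str(exc_info()[1])` as a third element. A quick illustration of reading those records back, assuming the `FakeLogger` defined in test/unit/__init__.py above::

    from test.unit import FakeLogger  # the stub introduced earlier

    fake = FakeLogger()
    fake.info('Pass completed in %ds', 0)
    args, kwargs = fake.log_dict['info'][-1]
    assert args == ('Pass completed in %ds', 0)

    try:
        raise ValueError('boom')
    except ValueError:
        # exception() records (args, kwargs, str(current exception))
        fake.exception('Unhandled exception')
    msg_args, msg_kwargs, exc_str = fake.log_dict['exception'][-1]
    assert msg_args == ('Unhandled exception',)
    assert exc_str == 'boom'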
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 075700e62c..c93ba67fcc 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -20,7 +20,6 @@ import os
 import sys
 import shutil
 import unittest
-from nose import SkipTest
 from shutil import rmtree
 from StringIO import StringIO
 from time import gmtime, sleep, strftime, time
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 5f9b8e4d61..ccdace72c8 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -20,7 +20,6 @@ from logging.handlers import SysLogHandler
 import os
 import sys
 import unittest
-from nose import SkipTest
 from ConfigParser import ConfigParser
 from contextlib import contextmanager
 from cStringIO import StringIO
@@ -38,7 +37,7 @@ import simplejson
 from webob import Request, Response
 from webob.exc import HTTPNotFound, HTTPUnauthorized

-from test.unit import connect_tcp, readuntil2crlfs
+from test.unit import connect_tcp, readuntil2crlfs, FakeLogger
 from swift.proxy import server as proxy_server
 from swift.account import server as account_server
 from swift.container import server as container_server
@@ -310,12 +309,6 @@ class FakeMemcacheReturnsNone(FakeMemcache):
         return None


-class NullLoggingHandler(logging.Handler):
-
-    def emit(self, record):
-        pass
-
-
 @contextmanager
 def save_globals():
     orig_http_connect = getattr(proxy_server, 'http_connect', None)
@@ -727,7 +720,7 @@ class TestProxyServer(unittest.TestCase):
         swift_dir = mkdtemp()
         try:
             baseapp = proxy_server.BaseApplication({'swift_dir': swift_dir},
-                FakeMemcache(), NullLoggingHandler(), FakeRing(), FakeRing(),
+                FakeMemcache(), FakeLogger(), FakeRing(), FakeRing(),
                 FakeRing())
             resp = baseapp.handle_request(
                 Request.blank('/', environ={'CONTENT_LENGTH': '-1'}))
@@ -745,7 +738,7 @@ class TestProxyServer(unittest.TestCase):
         try:
             baseapp = proxy_server.BaseApplication({'swift_dir': swift_dir,
                 'deny_host_headers': 'invalid_host.com'},
-                FakeMemcache(), NullLoggingHandler(), FakeRing(), FakeRing(),
+                FakeMemcache(), FakeLogger(), FakeRing(), FakeRing(),
                 FakeRing())
             resp = baseapp.handle_request(
                 Request.blank('/v1/a/c/o',
@@ -822,7 +815,7 @@ class TestObjectController(unittest.TestCase):
         with open(os.path.join(swift_dir, 'mime.types'), 'w') as fp:
             fp.write('foo/bar foo\n')
         ba = proxy_server.BaseApplication({'swift_dir': swift_dir},
-            FakeMemcache(), NullLoggingHandler(), FakeRing(), FakeRing(),
+            FakeMemcache(), FakeLogger(), FakeRing(), FakeRing(),
             FakeRing())
         self.assertEquals(proxy_server.mimetypes.guess_type('blah.foo')[0],
                           'foo/bar')
@@ -2148,13 +2141,8 @@ class TestObjectController(unittest.TestCase):
         (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis) = \
             _test_sockets

-        class Logger(object):
-
-            def info(self, msg):
-                self.msg = msg
-
         orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger
-        prosrv.logger = prosrv.access_logger = Logger()
+        prosrv.logger = prosrv.access_logger = FakeLogger()
         sock = connect_tcp(('localhost', prolis.getsockname()[1]))
         fd = sock.makefile()
         fd.write(
@@ -2167,7 +2155,8 @@ class TestObjectController(unittest.TestCase):
         exp = 'HTTP/1.1 200'
         self.assertEquals(headers[:len(exp)], exp)
         exp = '127.0.0.1 127.0.0.1'
-        self.assert_(exp in prosrv.logger.msg)
+        self.assertEquals(prosrv.logger.log_dict['exception'], [])
+        self.assert_(exp in prosrv.logger.log_dict['info'][-1][0][0])

     def test_chunked_put_logging(self):
         # GET account with a query string to test that
@@ -2178,13 +2167,8 @@ class TestObjectController(unittest.TestCase):
         (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis) = \
             _test_sockets

-        class Logger(object):
-
-            def info(self, msg):
-                self.msg = msg
-
         orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger
-        prosrv.logger = prosrv.access_logger = Logger()
+        prosrv.logger = prosrv.access_logger = FakeLogger()
         sock = connect_tcp(('localhost', prolis.getsockname()[1]))
         fd = sock.makefile()
         fd.write(
@@ -2196,13 +2180,14 @@ class TestObjectController(unittest.TestCase):
         headers = readuntil2crlfs(fd)
         exp = 'HTTP/1.1 200'
         self.assertEquals(headers[:len(exp)], exp)
-        self.assert_('/v1/a%3Fformat%3Djson' in prosrv.logger.msg,
-                     prosrv.logger.msg)
+        got_log_msg = prosrv.logger.log_dict['info'][-1][0][0]
+        self.assert_('/v1/a%3Fformat%3Djson' in got_log_msg,
+                     prosrv.logger.log_dict)
         exp = 'host1'
-        self.assertEquals(prosrv.logger.msg[:len(exp)], exp)
+        self.assertEquals(got_log_msg[:len(exp)], exp)

         # Turn on header logging.
-        prosrv.logger = prosrv.access_logger = Logger()
+        prosrv.logger = prosrv.access_logger = FakeLogger()
         prosrv.log_headers = True
         sock = connect_tcp(('localhost', prolis.getsockname()[1]))
         fd = sock.makefile()
@@ -2213,8 +2198,9 @@ class TestObjectController(unittest.TestCase):
         headers = readuntil2crlfs(fd)
         exp = 'HTTP/1.1 200'
         self.assertEquals(headers[:len(exp)], exp)
-        self.assert_('Goofy-Header%3A%20True' in prosrv.logger.msg,
-                     prosrv.logger.msg)
+        self.assert_('Goofy-Header%3A%20True' in
+                     prosrv.logger.log_dict['info'][-1][0][0],
+                     prosrv.logger.log_dict)
         prosrv.log_headers = False
         prosrv.logger, prosrv.access_logger = orig_logger, orig_access_logger