Replace slowdown option with *_per_second option
container and object updaters sleeps "slowdown" (default 0.01) seconds after every processed container/object. Because time.sleep call adds overhead, use ratelimit_sleep from common.utils instead. Same as in auditor. Change-Id: I362aa0f13c78ad03ce1f76ee0257b0646f981212
This commit is contained in:
parent
55075ea8be
commit
a8bc94c7e3
@ -358,8 +358,10 @@ Number of reaper workers to spawn. The default is 4.
|
||||
Request timeout to external services. The default is 3 seconds.
|
||||
.IP \fBconn_timeout\fR
|
||||
Connection timeout to external services. The default is 0.5 seconds.
|
||||
.IP \fBcontainers_per_second\fR
|
||||
Maximum containers updated per second. Should be tuned according to individual system specs. 0 is unlimited. The default is 50.
|
||||
.IP \fBslowdown\fR
|
||||
Slowdown will sleep that amount between containers. The default is 0.01 seconds.
|
||||
Slowdown will sleep that amount between containers. The default is 0.01 seconds. Deprecated in favor of containers_per_second
|
||||
.IP \fBaccount_suppression_time\fR
|
||||
Seconds to suppress updating an account that has generated an error. The default is 60 seconds.
|
||||
.IP \fBrecon_cache_path\fR
|
||||
|
@ -500,8 +500,10 @@ Minimum time for a pass to take. The default is 300 seconds.
|
||||
Number of reaper workers to spawn. The default is 1.
|
||||
.IP \fBnode_timeout\fR
|
||||
Request timeout to external services. The default is 10 seconds.
|
||||
.IP \fBobjects_per_second\fR
|
||||
Maximum objects updated per second. Should be tuned according to individual system specs. 0 is unlimited. The default is 50.
|
||||
.IP \fBslowdown\fR
|
||||
Slowdown will sleep that amount between objects. The default is 0.01 seconds.
|
||||
Slowdown will sleep that amount between objects. The default is 0.01 seconds. Deprecated in favor of objects_per_second.
|
||||
.IP "\fBrecon_cache_path\fR"
|
||||
The recon_cache_path simply sets the directory where stats for a few items will be stored.
|
||||
Depending on the method of deployment you may need to create this directory manually
|
||||
|
@ -772,9 +772,9 @@ ionice_priority None I/O scheduling priority o
|
||||
|
||||
[object-updater]
|
||||
|
||||
================== =================== ==========================================
|
||||
=================== =================== ==========================================
|
||||
Option Default Description
|
||||
------------------ ------------------- ------------------------------------------
|
||||
------------------- ------------------- ------------------------------------------
|
||||
log_name object-updater Label used when logging
|
||||
log_facility LOG_LOCAL0 Syslog log facility
|
||||
log_level INFO Logging level
|
||||
@ -785,7 +785,11 @@ node_timeout DEFAULT or 10 Request timeout to external services. Th
|
||||
uses what's set here, or what's set in the
|
||||
DEFAULT section, or 10 (though other
|
||||
sections use 3 as the final default).
|
||||
slowdown 0.01 Time in seconds to wait between objects
|
||||
objects_per_second 50 Maximum objects updated per second.
|
||||
Should be tuned according to individual
|
||||
system specs. 0 is unlimited.
|
||||
slowdown 0.01 Time in seconds to wait between objects.
|
||||
Deprecated in favor of objects_per_second.
|
||||
recon_cache_path /var/cache/swift Path to recon cache
|
||||
nice_priority None Scheduling priority of server processes.
|
||||
Niceness values range from -20 (most
|
||||
@ -808,7 +812,7 @@ ionice_priority None I/O scheduling priority of server
|
||||
priority of the process. Work only with
|
||||
ionice_class.
|
||||
Ignored if IOPRIO_CLASS_IDLE is set.
|
||||
================== =================== ==========================================
|
||||
=================== =================== ==========================================
|
||||
|
||||
[object-auditor]
|
||||
|
||||
@ -1122,8 +1126,13 @@ node_timeout 3 Request timeout to external
|
||||
services
|
||||
conn_timeout 0.5 Connection timeout to external
|
||||
services
|
||||
containers_per_second 50 Maximum containers updated per second.
|
||||
Should be tuned according to individual
|
||||
system specs. 0 is unlimited.
|
||||
|
||||
slowdown 0.01 Time in seconds to wait between
|
||||
containers
|
||||
containers. Deprecated in favor of
|
||||
containers_per_second.
|
||||
account_suppression_time 60 Seconds to suppress updating an
|
||||
account that has generated an
|
||||
error (timeout, not yet found,
|
||||
|
@ -185,7 +185,11 @@ use = egg:swift#recon
|
||||
# node_timeout = 3
|
||||
# conn_timeout = 0.5
|
||||
#
|
||||
# slowdown will sleep that amount between containers
|
||||
# Send at most this many container updates per second
|
||||
# containers_per_second = 50
|
||||
#
|
||||
# slowdown will sleep that amount between containers. Deprecated; use
|
||||
# containers_per_second instead.
|
||||
# slowdown = 0.01
|
||||
#
|
||||
# Seconds to suppress updating an account that has generated an error
|
||||
|
@ -333,7 +333,12 @@ use = egg:swift#recon
|
||||
# interval = 300
|
||||
# concurrency = 1
|
||||
# node_timeout = <whatever's in the DEFAULT section or 10>
|
||||
# slowdown will sleep that amount between objects
|
||||
#
|
||||
# Send at most this many object updates per second
|
||||
# objects_per_second = 50
|
||||
#
|
||||
# slowdown will sleep that amount between objects. Deprecated; use
|
||||
# objects_per_second instead.
|
||||
# slowdown = 0.01
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
|
@ -31,7 +31,7 @@ from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, config_true_value, ismount, \
|
||||
dump_recon_cache, majority_size, Timestamp
|
||||
dump_recon_cache, majority_size, Timestamp, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
|
||||
|
||||
@ -48,7 +48,19 @@ class ContainerUpdater(Daemon):
|
||||
self.interval = int(conf.get('interval', 300))
|
||||
self.account_ring = None
|
||||
self.concurrency = int(conf.get('concurrency', 4))
|
||||
self.slowdown = float(conf.get('slowdown', 0.01))
|
||||
if 'slowdown' in conf:
|
||||
self.logger.warning(
|
||||
'The slowdown option is deprecated in favor of '
|
||||
'containers_per_second. This option may be ignored in a '
|
||||
'future release.')
|
||||
containers_per_second = 1 / (
|
||||
float(conf.get('slowdown', '0.01')) + 0.01)
|
||||
else:
|
||||
containers_per_second = 50
|
||||
self.containers_running_time = 0
|
||||
self.max_containers_per_second = \
|
||||
float(conf.get('containers_per_second',
|
||||
containers_per_second))
|
||||
self.node_timeout = float(conf.get('node_timeout', 3))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.no_changes = 0
|
||||
@ -206,7 +218,10 @@ class ContainerUpdater(Daemon):
|
||||
for file in files:
|
||||
if file.endswith('.db'):
|
||||
self.process_container(os.path.join(root, file))
|
||||
time.sleep(self.slowdown)
|
||||
|
||||
self.containers_running_time = ratelimit_sleep(
|
||||
self.containers_running_time,
|
||||
self.max_containers_per_second)
|
||||
|
||||
def process_container(self, dbfile):
|
||||
"""
|
||||
|
@ -27,7 +27,7 @@ from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, renamer, write_pickle, \
|
||||
dump_recon_cache, config_true_value, ismount
|
||||
dump_recon_cache, config_true_value, ismount, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.storage_policy import split_policy_string, PolicyError
|
||||
@ -47,7 +47,19 @@ class ObjectUpdater(Daemon):
|
||||
self.interval = int(conf.get('interval', 300))
|
||||
self.container_ring = None
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.slowdown = float(conf.get('slowdown', 0.01))
|
||||
if 'slowdown' in conf:
|
||||
self.logger.warning(
|
||||
'The slowdown option is deprecated in favor of '
|
||||
'objects_per_second. This option may be ignored in a '
|
||||
'future release.')
|
||||
objects_per_second = 1 / (
|
||||
float(conf.get('slowdown', '0.01')) + 0.01)
|
||||
else:
|
||||
objects_per_second = 50
|
||||
self.objects_running_time = 0
|
||||
self.max_objects_per_second = \
|
||||
float(conf.get('objects_per_second',
|
||||
objects_per_second))
|
||||
self.node_timeout = float(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.successes = 0
|
||||
@ -189,7 +201,10 @@ class ObjectUpdater(Daemon):
|
||||
self.process_object_update(update_path, device,
|
||||
policy)
|
||||
last_obj_hash = obj_hash
|
||||
time.sleep(self.slowdown)
|
||||
|
||||
self.objects_running_time = ratelimit_sleep(
|
||||
self.objects_running_time,
|
||||
self.max_objects_per_second)
|
||||
try:
|
||||
os.rmdir(prefix_path)
|
||||
except OSError:
|
||||
|
@ -83,6 +83,48 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
self.assertEqual(cu.account_suppression_time, 0)
|
||||
self.assertTrue(cu.get_account_ring() is not None)
|
||||
|
||||
def test_conf_params(self):
|
||||
# defaults
|
||||
daemon = container_updater.ContainerUpdater({})
|
||||
self.assertEqual(daemon.devices, '/srv/node')
|
||||
self.assertEqual(daemon.mount_check, True)
|
||||
self.assertEqual(daemon.swift_dir, '/etc/swift')
|
||||
self.assertEqual(daemon.interval, 300)
|
||||
self.assertEqual(daemon.concurrency, 4)
|
||||
self.assertEqual(daemon.max_containers_per_second, 50.0)
|
||||
|
||||
# non-defaults
|
||||
conf = {
|
||||
'devices': '/some/where/else',
|
||||
'mount_check': 'huh?',
|
||||
'swift_dir': '/not/here',
|
||||
'interval': '600',
|
||||
'concurrency': '2',
|
||||
'containers_per_second': '10.5',
|
||||
}
|
||||
daemon = container_updater.ContainerUpdater(conf)
|
||||
self.assertEqual(daemon.devices, '/some/where/else')
|
||||
self.assertEqual(daemon.mount_check, False)
|
||||
self.assertEqual(daemon.swift_dir, '/not/here')
|
||||
self.assertEqual(daemon.interval, 600)
|
||||
self.assertEqual(daemon.concurrency, 2)
|
||||
self.assertEqual(daemon.max_containers_per_second, 10.5)
|
||||
|
||||
# check deprecated option
|
||||
daemon = container_updater.ContainerUpdater({'slowdown': '0.04'})
|
||||
self.assertEqual(daemon.max_containers_per_second, 20.0)
|
||||
|
||||
def check_bad(conf):
|
||||
with self.assertRaises(ValueError):
|
||||
container_updater.ContainerUpdater(conf)
|
||||
|
||||
check_bad({'interval': 'foo'})
|
||||
check_bad({'interval': '300.0'})
|
||||
check_bad({'concurrency': 'bar'})
|
||||
check_bad({'concurrency': '1.0'})
|
||||
check_bad({'slowdown': 'baz'})
|
||||
check_bad({'containers_per_second': 'quux'})
|
||||
|
||||
@mock.patch.object(container_updater, 'ismount')
|
||||
@mock.patch.object(container_updater.ContainerUpdater, 'container_sweep')
|
||||
def test_run_once_with_device_unmounted(self, mock_sweep, mock_ismount):
|
||||
|
@ -92,6 +92,49 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
self.assertEqual(ou.node_timeout, 5.5)
|
||||
self.assertTrue(ou.get_container_ring() is not None)
|
||||
|
||||
def test_conf_params(self):
|
||||
# defaults
|
||||
daemon = object_updater.ObjectUpdater({}, logger=self.logger)
|
||||
self.assertEqual(daemon.devices, '/srv/node')
|
||||
self.assertEqual(daemon.mount_check, True)
|
||||
self.assertEqual(daemon.swift_dir, '/etc/swift')
|
||||
self.assertEqual(daemon.interval, 300)
|
||||
self.assertEqual(daemon.concurrency, 1)
|
||||
self.assertEqual(daemon.max_objects_per_second, 50.0)
|
||||
|
||||
# non-defaults
|
||||
conf = {
|
||||
'devices': '/some/where/else',
|
||||
'mount_check': 'huh?',
|
||||
'swift_dir': '/not/here',
|
||||
'interval': '600',
|
||||
'concurrency': '2',
|
||||
'objects_per_second': '10.5',
|
||||
}
|
||||
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
|
||||
self.assertEqual(daemon.devices, '/some/where/else')
|
||||
self.assertEqual(daemon.mount_check, False)
|
||||
self.assertEqual(daemon.swift_dir, '/not/here')
|
||||
self.assertEqual(daemon.interval, 600)
|
||||
self.assertEqual(daemon.concurrency, 2)
|
||||
self.assertEqual(daemon.max_objects_per_second, 10.5)
|
||||
|
||||
# check deprecated option
|
||||
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
|
||||
logger=self.logger)
|
||||
self.assertEqual(daemon.max_objects_per_second, 20.0)
|
||||
|
||||
def check_bad(conf):
|
||||
with self.assertRaises(ValueError):
|
||||
object_updater.ObjectUpdater(conf, logger=self.logger)
|
||||
|
||||
check_bad({'interval': 'foo'})
|
||||
check_bad({'interval': '300.0'})
|
||||
check_bad({'concurrency': 'bar'})
|
||||
check_bad({'concurrency': '1.0'})
|
||||
check_bad({'slowdown': 'baz'})
|
||||
check_bad({'objects_per_second': 'quux'})
|
||||
|
||||
@mock.patch('os.listdir')
|
||||
def test_listdir_with_exception(self, mock_listdir):
|
||||
e = OSError('permission_denied')
|
||||
|
Loading…
x
Reference in New Issue
Block a user