Merge "Add backend rate limiting middleware"
This commit is contained in:
commit
24acc6e56b
@ -102,6 +102,13 @@ AWS S3 Api
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
Backend Ratelimit
|
||||
=================
|
||||
|
||||
.. automodule:: swift.common.middleware.backend_ratelimit
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
.. _bulk:
|
||||
|
||||
Bulk Operations (Delete and Archive Auto Extraction)
|
||||
|
@ -80,7 +80,7 @@ bind_port = 6202
|
||||
# ionice_priority =
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck recon account-server
|
||||
pipeline = healthcheck recon backend_ratelimit account-server
|
||||
|
||||
[app:account-server]
|
||||
use = egg:swift#account
|
||||
@ -128,6 +128,18 @@ use = egg:swift#healthcheck
|
||||
use = egg:swift#recon
|
||||
# recon_cache_path = /var/cache/swift
|
||||
|
||||
[filter:backend_ratelimit]
|
||||
use = egg:swift#backend_ratelimit
|
||||
# Set the maximum rate of requests per second per device per worker. Beyond
|
||||
# this rate the server will return 529 responses and emit a 'backend.ratelimit'
|
||||
# statsd metric without logging. The default value of zero causes no
|
||||
# rate-limiting to be applied.
|
||||
# requests_per_device_per_second = 0.0
|
||||
#
|
||||
# Set the number of seconds of unused rate-limiting allowance that can
|
||||
# accumulate and be used to allow a subsequent burst of requests.
|
||||
# requests_per_device_rate_buffer = 1.0
|
||||
|
||||
[account-replicator]
|
||||
# You can override the default log routing for this app here (don't use set!):
|
||||
# log_name = account-replicator
|
||||
|
@ -86,7 +86,7 @@ bind_port = 6201
|
||||
# ionice_priority =
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck recon container-server
|
||||
pipeline = healthcheck recon backend_ratelimit container-server
|
||||
|
||||
[app:container-server]
|
||||
use = egg:swift#container
|
||||
@ -138,6 +138,18 @@ use = egg:swift#healthcheck
|
||||
use = egg:swift#recon
|
||||
#recon_cache_path = /var/cache/swift
|
||||
|
||||
[filter:backend_ratelimit]
|
||||
use = egg:swift#backend_ratelimit
|
||||
# Set the maximum rate of requests per second per device per worker. Beyond
|
||||
# this rate the server will return 529 responses and emit a 'backend.ratelimit'
|
||||
# statsd metric without logging. The default value of zero causes no
|
||||
# rate-limiting to be applied.
|
||||
# requests_per_device_per_second = 0.0
|
||||
#
|
||||
# Set the number of seconds of unused rate-limiting allowance that can
|
||||
# accumulate and be used to allow a subsequent burst of requests.
|
||||
# requests_per_device_rate_buffer = 1.0
|
||||
|
||||
[container-replicator]
|
||||
# You can override the default log routing for this app here (don't use set!):
|
||||
# log_name = container-replicator
|
||||
|
@ -117,7 +117,7 @@ bind_port = 6200
|
||||
# ionice_priority =
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck recon object-server
|
||||
pipeline = healthcheck recon backend_ratelimit object-server
|
||||
|
||||
[app:object-server]
|
||||
use = egg:swift#object
|
||||
@ -237,6 +237,18 @@ use = egg:swift#recon
|
||||
#recon_cache_path = /var/cache/swift
|
||||
#recon_lock_path = /var/lock
|
||||
|
||||
[filter:backend_ratelimit]
|
||||
use = egg:swift#backend_ratelimit
|
||||
# Set the maximum rate of requests per second per device per worker. Beyond
|
||||
# this rate the server will return 529 responses and emit a 'backend.ratelimit'
|
||||
# statsd metric without logging. The default value of zero causes no
|
||||
# rate-limiting to be applied.
|
||||
# requests_per_device_per_second = 0.0
|
||||
#
|
||||
# Set the number of seconds of unused rate-limiting allowance that can
|
||||
# accumulate and be used to allow a subsequent burst of requests.
|
||||
# requests_per_device_rate_buffer = 1.0
|
||||
|
||||
[object-replicator]
|
||||
# You can override the default log routing for this app here (don't use set!):
|
||||
# log_name = object-replicator
|
||||
|
@ -98,6 +98,7 @@ paste.filter_factory =
|
||||
memcache = swift.common.middleware.memcache:filter_factory
|
||||
read_only = swift.common.middleware.read_only:filter_factory
|
||||
ratelimit = swift.common.middleware.ratelimit:filter_factory
|
||||
backend_ratelimit = swift.common.middleware.backend_ratelimit:filter_factory
|
||||
cname_lookup = swift.common.middleware.cname_lookup:filter_factory
|
||||
catch_errors = swift.common.middleware.catch_errors:filter_factory
|
||||
domain_remap = swift.common.middleware.domain_remap:filter_factory
|
||||
|
86
swift/common/middleware/backend_ratelimit.py
Normal file
86
swift/common/middleware/backend_ratelimit.py
Normal file
@ -0,0 +1,86 @@
|
||||
# Copyright (c) 2022 NVIDIA
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
from swift.common.request_helpers import split_and_validate_path
|
||||
from swift.common.swob import Request, HTTPTooManyBackendRequests
|
||||
from swift.common.utils import get_logger, non_negative_float, \
|
||||
EventletRateLimiter
|
||||
|
||||
RATE_LIMITED_METHODS = ('GET', 'HEAD', 'PUT', 'POST', 'DELETE', 'UPDATE',
|
||||
'REPLICATE')
|
||||
|
||||
|
||||
class BackendRateLimitMiddleware(object):
|
||||
"""
|
||||
Backend rate-limiting middleware.
|
||||
|
||||
Rate-limits requests to backend storage node devices. Each device is
|
||||
independently rate-limited. All requests with a 'GET', 'HEAD', 'PUT',
|
||||
'POST', 'DELETE', 'UPDATE' or 'REPLICATE' method are included in a device's
|
||||
rate limit.
|
||||
|
||||
If a request would cause the rate-limit to be exceeded then a response with
|
||||
a 529 status code is returned.
|
||||
"""
|
||||
def __init__(self, app, conf, logger=None):
|
||||
self.app = app
|
||||
self.logger = logger or get_logger(conf, log_route='backend_ratelimit')
|
||||
self.requests_per_device_per_second = non_negative_float(
|
||||
conf.get('requests_per_device_per_second', 0.0))
|
||||
self.requests_per_device_rate_buffer = non_negative_float(
|
||||
conf.get('requests_per_device_rate_buffer', 1.0))
|
||||
|
||||
# map device -> RateLimiter
|
||||
self.rate_limiters = defaultdict(
|
||||
lambda: EventletRateLimiter(
|
||||
max_rate=self.requests_per_device_per_second,
|
||||
rate_buffer=self.requests_per_device_rate_buffer,
|
||||
running_time=time.time(),
|
||||
burst_after_idle=True))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
"""
|
||||
WSGI entry point.
|
||||
|
||||
:param env: WSGI environment dictionary
|
||||
:param start_response: WSGI callable
|
||||
"""
|
||||
req = Request(env)
|
||||
handler = self.app
|
||||
if req.method in RATE_LIMITED_METHODS:
|
||||
try:
|
||||
device, partition, _ = split_and_validate_path(req, 1, 3, True)
|
||||
int(partition) # check it's a valid partition
|
||||
rate_limiter = self.rate_limiters[device]
|
||||
if not rate_limiter.is_allowed():
|
||||
self.logger.increment('backend.ratelimit')
|
||||
handler = HTTPTooManyBackendRequests()
|
||||
except Exception: # noqa
|
||||
# request may not have device/partition e.g. a healthcheck req
|
||||
pass
|
||||
return handler(env, start_response)
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
conf = global_conf.copy()
|
||||
conf.update(local_conf)
|
||||
|
||||
def backend_ratelimit_filter(app):
|
||||
return BackendRateLimitMiddleware(app, conf)
|
||||
|
||||
return backend_ratelimit_filter
|
@ -114,6 +114,8 @@ RESPONSE_REASONS = {
|
||||
'backend server.'),
|
||||
507: ('Insufficient Storage', 'There was not enough space to save the '
|
||||
'resource. Drive: %(drive)s'),
|
||||
529: ('Too Many Backend Requests', 'The server is incapable of performing '
|
||||
'the requested operation due to too many requests. Slow down.')
|
||||
}
|
||||
|
||||
MAX_RANGE_OVERLAPS = 2
|
||||
@ -1619,3 +1621,4 @@ HTTPNotImplemented = status_map[501]
|
||||
HTTPBadGateway = status_map[502]
|
||||
HTTPServiceUnavailable = status_map[503]
|
||||
HTTPInsufficientStorage = status_map[507]
|
||||
HTTPTooManyBackendRequests = status_map[529]
|
||||
|
@ -325,9 +325,13 @@ def non_negative_float(value):
|
||||
:raises ValueError: if the value cannot be cast to a float or is negative.
|
||||
:return: a float
|
||||
"""
|
||||
value = float(value)
|
||||
if value < 0:
|
||||
raise ValueError
|
||||
try:
|
||||
value = float(value)
|
||||
if value < 0:
|
||||
raise ValueError
|
||||
except (TypeError, ValueError):
|
||||
raise ValueError('Value must be a non-negative float number, not "%s".'
|
||||
% value)
|
||||
return value
|
||||
|
||||
|
||||
|
@ -76,7 +76,7 @@ class FakeSwift(object):
|
||||
"""
|
||||
ALLOWED_METHODS = [
|
||||
'PUT', 'POST', 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'REPLICATE',
|
||||
'UPDATE']
|
||||
'SSYNC', 'UPDATE']
|
||||
|
||||
def __init__(self):
|
||||
self._calls = []
|
||||
|
170
test/unit/common/middleware/test_backend_ratelimit.py
Normal file
170
test/unit/common/middleware/test_backend_ratelimit.py
Normal file
@ -0,0 +1,170 @@
|
||||
# Copyright (c) 2022 NVIDIA
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Used by get_swift_info and register_swift_info to store information about
|
||||
# the swift cluster.
|
||||
import time
|
||||
import unittest
|
||||
from collections import defaultdict
|
||||
|
||||
import mock
|
||||
|
||||
from swift.common.middleware import backend_ratelimit
|
||||
from swift.common.middleware.backend_ratelimit import \
|
||||
BackendRateLimitMiddleware
|
||||
from swift.common.swob import Request, HTTPOk
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
def __init__(self):
|
||||
self.calls = []
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response('200 OK', {})
|
||||
return ['']
|
||||
|
||||
|
||||
class TestBackendRatelimitMiddleware(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(TestBackendRatelimitMiddleware, self).setUp()
|
||||
self.swift = FakeSwift()
|
||||
|
||||
def test_init(self):
|
||||
conf = {}
|
||||
factory = backend_ratelimit.filter_factory(conf)
|
||||
rl = factory(self.swift)
|
||||
self.assertEqual(0.0, rl.requests_per_device_per_second)
|
||||
self.assertEqual(1.0, rl.requests_per_device_rate_buffer)
|
||||
|
||||
conf = {'requests_per_device_per_second': 1.3,
|
||||
'requests_per_device_rate_buffer': 2.4}
|
||||
factory = backend_ratelimit.filter_factory(conf)
|
||||
rl = factory(self.swift)
|
||||
self.assertEqual(1.3, rl.requests_per_device_per_second)
|
||||
self.assertEqual(2.4, rl.requests_per_device_rate_buffer)
|
||||
|
||||
conf = {'requests_per_device_per_second': -1}
|
||||
factory = backend_ratelimit.filter_factory(conf)
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
factory(self.swift)
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "-1.0".',
|
||||
str(cm.exception))
|
||||
|
||||
conf = {'requests_per_device_rate_buffer': -1}
|
||||
factory = backend_ratelimit.filter_factory(conf)
|
||||
with self.assertRaises(ValueError):
|
||||
factory(self.swift)
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "-1.0".',
|
||||
str(cm.exception))
|
||||
|
||||
def _do_test_ratelimit(self, method, req_per_sec, rate_buffer):
|
||||
# send 20 requests, time increments by 0.01 between each request
|
||||
start = time.time()
|
||||
fake_time = [start]
|
||||
|
||||
def mock_time():
|
||||
return fake_time[0]
|
||||
|
||||
app = FakeSwift()
|
||||
logger = debug_logger()
|
||||
# apply a ratelimit
|
||||
conf = {'requests_per_device_per_second': req_per_sec,
|
||||
'requests_per_device_rate_buffer': rate_buffer}
|
||||
rl = BackendRateLimitMiddleware(app, conf, logger)
|
||||
success = defaultdict(int)
|
||||
ratelimited = 0
|
||||
|
||||
with mock.patch('swift.common.utils.time.time', mock_time):
|
||||
for i in range(20):
|
||||
for dev in ['sda1', 'sda2', 'sda3']:
|
||||
req = Request.blank('/%s/99/a/c/o' % dev,
|
||||
environ={'REQUEST_METHOD': method})
|
||||
app.register(method, req.path, HTTPOk, {})
|
||||
resp = req.get_response(rl)
|
||||
if resp.status_int == 200:
|
||||
success[dev] += 1
|
||||
else:
|
||||
self.assertEqual(529, resp.status_int)
|
||||
self.assertTrue(resp.status.startswith(
|
||||
'529 Too Many Backend Requests'))
|
||||
ratelimited += 1
|
||||
fake_time[0] += 0.01
|
||||
self.assertEqual(
|
||||
ratelimited,
|
||||
logger.get_increment_counts().get('backend.ratelimit', 0))
|
||||
return success
|
||||
|
||||
def test_ratelimited(self):
|
||||
def do_test_ratelimit(method):
|
||||
# no rate-limiting
|
||||
success_per_dev = self._do_test_ratelimit(method, 0, 0)
|
||||
self.assertEqual([20] * 3, list(success_per_dev.values()))
|
||||
|
||||
# rate-limited
|
||||
success_per_dev = self._do_test_ratelimit(method, 1, 0)
|
||||
self.assertEqual([1] * 3, list(success_per_dev.values()))
|
||||
|
||||
success_per_dev = self._do_test_ratelimit(method, 10, 0)
|
||||
self.assertEqual([2] * 3, list(success_per_dev.values()))
|
||||
|
||||
success_per_dev = self._do_test_ratelimit(method, 101, 0)
|
||||
self.assertEqual([20] * 3, list(success_per_dev.values()))
|
||||
|
||||
# startup burst of 1 seconds allowance plus current allowance...
|
||||
success_per_dev = self._do_test_ratelimit(method, 1, 1)
|
||||
self.assertEqual([2] * 3, list(success_per_dev.values()))
|
||||
success_per_dev = self._do_test_ratelimit(method, 10, 1)
|
||||
self.assertEqual([12] * 3, list(success_per_dev.values()))
|
||||
|
||||
do_test_ratelimit('GET')
|
||||
do_test_ratelimit('HEAD')
|
||||
do_test_ratelimit('PUT')
|
||||
do_test_ratelimit('POST')
|
||||
do_test_ratelimit('DELETE')
|
||||
do_test_ratelimit('UPDATE')
|
||||
do_test_ratelimit('REPLICATE')
|
||||
|
||||
def test_not_ratelimited(self):
|
||||
def do_test_no_ratelimit(method):
|
||||
# verify no rate-limiting
|
||||
success_per_dev = self._do_test_ratelimit(method, 1, 0)
|
||||
self.assertEqual([20] * 3, list(success_per_dev.values()))
|
||||
|
||||
do_test_no_ratelimit('OPTIONS')
|
||||
do_test_no_ratelimit('SSYNC')
|
||||
|
||||
def test_unhandled_request(self):
|
||||
app = FakeSwift()
|
||||
logger = debug_logger()
|
||||
conf = {'requests_per_device_per_second': 1,
|
||||
'requests_per_device_rate_buffer': 1}
|
||||
|
||||
def do_test(path):
|
||||
rl = BackendRateLimitMiddleware(app, conf, logger)
|
||||
req = Request.blank(path)
|
||||
app.register('GET', req.path, HTTPOk, {})
|
||||
for i in range(10):
|
||||
resp = req.get_response(rl)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
self.assertEqual(
|
||||
0, logger.get_increment_counts().get('backend.ratelimit', 0))
|
||||
|
||||
do_test('/recon/version')
|
||||
do_test('/healthcheck')
|
||||
do_test('/v1/a/c/o')
|
@ -3106,12 +3106,26 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
self.assertEqual(1, utils.non_negative_float(True))
|
||||
self.assertEqual(0, utils.non_negative_float(False))
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
utils.non_negative_float(-1.1)
|
||||
with self.assertRaises(ValueError):
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "-1.1".',
|
||||
str(cm.exception))
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
utils.non_negative_float('-1.1')
|
||||
with self.assertRaises(ValueError):
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "-1.1".',
|
||||
str(cm.exception))
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
utils.non_negative_float('one')
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "one".',
|
||||
str(cm.exception))
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
utils.non_negative_float(None)
|
||||
self.assertEqual(
|
||||
'Value must be a non-negative float number, not "None".',
|
||||
str(cm.exception))
|
||||
|
||||
def test_non_negative_int(self):
|
||||
self.assertEqual(0, utils.non_negative_int('0'))
|
||||
|
Loading…
Reference in New Issue
Block a user