Plumb sharding stats though recon middleware
To make it easier to have access to the sharding stats add /recon/sharding as a recon middleware endpoint. This allows an easy way to ask a container server for it's sharding stats using REST inside the cluster: curl <container-server>/recon/sharding Also add a get_recon method to the direct client so it can also be used easily inside tooling and probe tests. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: I2a6024277d1198d8c996682682bfe28797344951
This commit is contained in:
parent
6ed82b106c
commit
b1309c95e5
@ -27,7 +27,7 @@ import six
|
||||
import six.moves.cPickle as pickle
|
||||
from six.moves.http_client import HTTPException
|
||||
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.bufferedhttp import http_connect, http_connect_raw
|
||||
from swift.common.exceptions import ClientException
|
||||
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER, \
|
||||
get_ip_port
|
||||
@ -56,6 +56,20 @@ class DirectClientException(ClientException):
|
||||
http_reason=resp.reason, http_headers=headers)
|
||||
|
||||
|
||||
class DirectClientReconException(ClientException):
|
||||
|
||||
def __init__(self, method, node, path, resp):
|
||||
if not isinstance(path, six.text_type):
|
||||
path = path.decode("utf-8")
|
||||
msg = 'server %s:%s direct %s %r gave status %s' % (
|
||||
node['ip'], node['port'], method, path, resp.status)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
super(DirectClientReconException, self).__init__(
|
||||
msg, http_host=node['ip'], http_port=node['port'],
|
||||
http_status=resp.status, http_reason=resp.reason,
|
||||
http_headers=headers)
|
||||
|
||||
|
||||
def _make_path(*components):
|
||||
return u'/' + u'/'.join(
|
||||
x.decode('utf-8') if isinstance(x, six.binary_type) else x
|
||||
@ -640,3 +654,31 @@ def retry(func, *args, **kwargs):
|
||||
http_device=args[0]['device'])
|
||||
else:
|
||||
raise ClientException('Raise too many retries')
|
||||
|
||||
|
||||
def direct_get_recon(node, recon_command, conn_timeout=5, response_timeout=15,
|
||||
headers=None):
|
||||
"""
|
||||
Get recon json directly from the storage server.
|
||||
|
||||
:param node: node dictionary from the ring
|
||||
:param recon_command: recon string (post /recon/)
|
||||
:param conn_timeout: timeout in seconds for establishing the connection
|
||||
:param response_timeout: timeout in seconds for getting the response
|
||||
:param headers: dict to be passed into HTTPConnection headers
|
||||
:returns: deserialized json response
|
||||
:raises DirectClientReconException: HTTP GET request failed
|
||||
"""
|
||||
if headers is None:
|
||||
headers = {}
|
||||
|
||||
ip, port = get_ip_port(node, headers)
|
||||
path = '/recon/%s' % recon_command
|
||||
with Timeout(conn_timeout):
|
||||
conn = http_connect_raw(ip, port, 'GET', path,
|
||||
headers=gen_headers(headers))
|
||||
with Timeout(response_timeout):
|
||||
resp = conn.getresponse()
|
||||
if not is_success(resp.status):
|
||||
raise DirectClientReconException('GET', node, path, resp)
|
||||
return json.loads(resp.read())
|
||||
|
@ -135,6 +135,12 @@ class ReconMiddleware(object):
|
||||
return self._from_recon_cache(['drive_audit_errors'],
|
||||
self.drive_recon_cache)
|
||||
|
||||
def get_sharding_info(self):
|
||||
"""get sharding info"""
|
||||
return self._from_recon_cache(["sharding_stats",
|
||||
"sharding_last"],
|
||||
self.container_recon_cache)
|
||||
|
||||
def get_replication_info(self, recon_type):
|
||||
"""get replication info"""
|
||||
replication_list = ['replication_time',
|
||||
@ -372,6 +378,8 @@ class ReconMiddleware(object):
|
||||
content = self.get_driveaudit_error()
|
||||
elif rcheck == "time":
|
||||
content = self.get_time()
|
||||
elif rcheck == "sharding":
|
||||
content = self.get_sharding_info()
|
||||
else:
|
||||
content = "Invalid path: %s" % req.path
|
||||
return Response(request=req, status="404 Not Found",
|
||||
|
@ -157,6 +157,9 @@ class FakeRecon(object):
|
||||
self.fake_replication_rtype = recon_type
|
||||
return {'replicationtest': "1"}
|
||||
|
||||
def fake_sharding(self):
|
||||
return {"sharding_stats": "1"}
|
||||
|
||||
def fake_updater(self, recon_type):
|
||||
self.fake_updater_rtype = recon_type
|
||||
return {'updatertest': "1"}
|
||||
@ -1107,6 +1110,84 @@ class TestReconSuccess(TestCase):
|
||||
rv = self.app.get_time()
|
||||
self.assertEqual(rv, now)
|
||||
|
||||
def test_get_sharding_info(self):
|
||||
from_cache_response = {
|
||||
"sharding_stats": {
|
||||
"attempted": 0,
|
||||
"deferred": 0,
|
||||
"diff": 0,
|
||||
"diff_capped": 0,
|
||||
"empty": 0,
|
||||
"failure": 0,
|
||||
"hashmatch": 0,
|
||||
"no_change": 0,
|
||||
"remote_merge": 0,
|
||||
"remove": 0,
|
||||
"rsync": 0,
|
||||
"start": 1614136398.5729735,
|
||||
"success": 0,
|
||||
"ts_repl": 0,
|
||||
"sharding": {
|
||||
"audit_root": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"success": 0,
|
||||
},
|
||||
"audit_shard": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"success": 0,
|
||||
},
|
||||
"cleaved": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"max_time": 0,
|
||||
"min_time": 0,
|
||||
"success": 0,
|
||||
},
|
||||
"created": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"success": 0,
|
||||
},
|
||||
"misplaced": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"found": 0,
|
||||
"placed": 0,
|
||||
"success": 0,
|
||||
"unplaced": 0,
|
||||
},
|
||||
"scanned": {
|
||||
"attempted": 0,
|
||||
"failure": 0,
|
||||
"found": 0,
|
||||
"max_time": 0,
|
||||
"min_time": 0,
|
||||
"success": 0,
|
||||
},
|
||||
"sharding_candidates": {
|
||||
"found": 0,
|
||||
"top": [],
|
||||
},
|
||||
"visited": {
|
||||
"attempted": 0,
|
||||
"completed": 0,
|
||||
"failure": 0,
|
||||
"skipped": 6,
|
||||
"success": 0,
|
||||
}
|
||||
},
|
||||
},
|
||||
"sharding_last": 1614136398.6680582}
|
||||
self.fakecache.fakeout_calls = []
|
||||
self.fakecache.fakeout = from_cache_response
|
||||
rv = self.app.get_sharding_info()
|
||||
self.assertEqual(self.fakecache.fakeout_calls,
|
||||
[((['sharding_stats', 'sharding_last'],
|
||||
'/var/cache/swift/container.recon'), {})])
|
||||
self.assertEqual(rv, from_cache_response)
|
||||
|
||||
|
||||
class TestReconMiddleware(unittest.TestCase):
|
||||
|
||||
@ -1139,6 +1220,7 @@ class TestReconMiddleware(unittest.TestCase):
|
||||
self.app.get_socket_info = self.frecon.fake_sockstat
|
||||
self.app.get_driveaudit_error = self.frecon.fake_driveaudit
|
||||
self.app.get_time = self.frecon.fake_time
|
||||
self.app.get_sharding_info = self.frecon.fake_sharding
|
||||
|
||||
def test_recon_get_mem(self):
|
||||
get_mem_resp = [b'{"memtest": "1"}']
|
||||
@ -1406,6 +1488,14 @@ class TestReconMiddleware(unittest.TestCase):
|
||||
resp = self.real_app_get_swift_conf_md5()
|
||||
self.assertIsNone(resp['/etc/swift/swift.conf'])
|
||||
|
||||
def test_recon_get_sharding(self):
|
||||
get_sharding_resp = [
|
||||
b'{"sharding_stats": "1"}']
|
||||
req = Request.blank('/recon/sharding',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = self.app(req.environ, start_response)
|
||||
self.assertEqual(resp, get_sharding_resp)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -86,11 +86,11 @@ class FakeConn(object):
|
||||
|
||||
@contextmanager
|
||||
def mocked_http_conn(*args, **kwargs):
|
||||
mocked = kwargs.pop('mocked', 'swift.common.bufferedhttp.http_connect_raw')
|
||||
fake_conn = FakeConn(*args, **kwargs)
|
||||
mock_http_conn = lambda *args, **kwargs: \
|
||||
fake_conn._update_raw_call_args(*args, **kwargs)
|
||||
with mock.patch('swift.common.bufferedhttp.http_connect_raw',
|
||||
new=mock_http_conn):
|
||||
with mock.patch(mocked, new=mock_http_conn):
|
||||
yield fake_conn
|
||||
|
||||
|
||||
@ -979,6 +979,36 @@ class TestDirectClient(unittest.TestCase):
|
||||
for line in error_lines:
|
||||
self.assertIn('Kaboom!', line)
|
||||
|
||||
def test_direct_get_recon(self):
|
||||
data = {
|
||||
"/etc/swift/account.ring.gz": "de7316d2809205fa13ebfc747566260c",
|
||||
"/etc/swift/container.ring.gz": "8e63c916fec81825cc40940eefe1d058",
|
||||
"/etc/swift/object.ring.gz": "a77f51c14bbf7075bb7be0c27fd00dc4",
|
||||
"/etc/swift/object-1.ring.gz": "f0222326f80ee5cb34b7546b18727923",
|
||||
"/etc/swift/object-2.ring.gz": "2228dc8a7ff1cf2eb89b116653ac6191"}
|
||||
body = json.dumps(data)
|
||||
with mocked_http_conn(
|
||||
200, {}, body,
|
||||
mocked='swift.common.direct_client.http_connect_raw') as conn:
|
||||
resp = direct_client.direct_get_recon(self.node, "ringmd5")
|
||||
self.assertEqual(conn.method, 'GET')
|
||||
self.assertEqual(conn.path, '/recon/ringmd5')
|
||||
self.assertEqual(conn.host, self.node['ip'])
|
||||
self.assertEqual(conn.port, self.node['port'])
|
||||
self.assertEqual(data, resp)
|
||||
|
||||
# Now check failure
|
||||
with mocked_http_conn(
|
||||
500,
|
||||
mocked='swift.common.direct_client.http_connect_raw') as conn:
|
||||
with self.assertRaises(ClientException) as raised:
|
||||
resp = direct_client.direct_get_recon(self.node, "ringmd5")
|
||||
self.assertEqual(conn.host, self.node['ip'])
|
||||
self.assertEqual(conn.port, self.node['port'])
|
||||
self.assertEqual(conn.method, 'GET')
|
||||
self.assertEqual(conn.path, '/recon/ringmd5')
|
||||
self.assertEqual(raised.exception.http_status, 500)
|
||||
|
||||
|
||||
class TestUTF8DirectClient(TestDirectClient):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user