Updated direct_client to match the changes in client
This commit is contained in:
parent
2edfd2b951
commit
d0367fdf19
@ -86,7 +86,7 @@ def audit(coropool, connpool, account, container_ring, object_ring, options):
|
|||||||
retries_done[0] += attempts - 1
|
retries_done[0] += attempts - 1
|
||||||
found = True
|
found = True
|
||||||
if not estimated_objects:
|
if not estimated_objects:
|
||||||
estimated_objects = info[0]
|
estimated_objects = int(info['x-container-object-count'])
|
||||||
except ClientException, err:
|
except ClientException, err:
|
||||||
if err.http_status not in (404, 507):
|
if err.http_status not in (404, 507):
|
||||||
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
|
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
|
||||||
|
@ -310,7 +310,7 @@ class AccountReaper(Daemon):
|
|||||||
try:
|
try:
|
||||||
objects = direct_get_container(node, part, account, container,
|
objects = direct_get_container(node, part, account, container,
|
||||||
marker=marker, conn_timeout=self.conn_timeout,
|
marker=marker, conn_timeout=self.conn_timeout,
|
||||||
response_timeout=self.node_timeout)
|
response_timeout=self.node_timeout)[1]
|
||||||
self.stats_return_codes[2] = \
|
self.stats_return_codes[2] = \
|
||||||
self.stats_return_codes.get(2, 0) + 1
|
self.stats_return_codes.get(2, 0) + 1
|
||||||
except ClientException, err:
|
except ClientException, err:
|
||||||
|
@ -504,10 +504,6 @@ def get_object(url, token, container, name, http_conn=None,
|
|||||||
raise ClientException('Object GET failed', http_scheme=parsed.scheme,
|
raise ClientException('Object GET failed', http_scheme=parsed.scheme,
|
||||||
http_host=conn.host, http_port=conn.port, http_path=path,
|
http_host=conn.host, http_port=conn.port, http_path=path,
|
||||||
http_status=resp.status, http_reason=resp.reason)
|
http_status=resp.status, http_reason=resp.reason)
|
||||||
metadata = {}
|
|
||||||
for key, value in resp.getheaders():
|
|
||||||
if key.lower().startswith('x-object-meta-'):
|
|
||||||
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
|
|
||||||
if resp_chunk_size:
|
if resp_chunk_size:
|
||||||
|
|
||||||
def _object_body():
|
def _object_body():
|
||||||
@ -550,10 +546,6 @@ def head_object(url, token, container, name, http_conn=None):
|
|||||||
raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
|
raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
|
||||||
http_host=conn.host, http_port=conn.port, http_path=path,
|
http_host=conn.host, http_port=conn.port, http_path=path,
|
||||||
http_status=resp.status, http_reason=resp.reason)
|
http_status=resp.status, http_reason=resp.reason)
|
||||||
metadata = {}
|
|
||||||
for key, value in resp.getheaders():
|
|
||||||
if key.lower().startswith('x-object-meta-'):
|
|
||||||
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
|
|
||||||
resp_headers = {}
|
resp_headers = {}
|
||||||
for header, value in resp.getheaders():
|
for header, value in resp.getheaders():
|
||||||
resp_headers[header.lower()] = value
|
resp_headers[header.lower()] = value
|
||||||
|
@ -47,7 +47,8 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
|
|||||||
:param container: container name
|
:param container: container name
|
||||||
:param conn_timeout: timeout in seconds for establishing the connection
|
:param conn_timeout: timeout in seconds for establishing the connection
|
||||||
:param response_timeout: timeout in seconds for getting the response
|
:param response_timeout: timeout in seconds for getting the response
|
||||||
:returns: tuple of (object count, bytes used)
|
:returns: a dict containing the response's headers (all header names will
|
||||||
|
be lowercase)
|
||||||
"""
|
"""
|
||||||
path = '/%s/%s' % (account, container)
|
path = '/%s/%s' % (account, container)
|
||||||
with Timeout(conn_timeout):
|
with Timeout(conn_timeout):
|
||||||
@ -65,8 +66,10 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
return int(resp.getheader('x-container-object-count')), \
|
resp_headers = {}
|
||||||
int(resp.getheader('x-container-bytes-used'))
|
for header, value in resp.getheaders():
|
||||||
|
resp_headers[header.lower()] = value
|
||||||
|
return resp_headers
|
||||||
|
|
||||||
|
|
||||||
def direct_get_container(node, part, account, container, marker=None,
|
def direct_get_container(node, part, account, container, marker=None,
|
||||||
@ -85,7 +88,8 @@ def direct_get_container(node, part, account, container, marker=None,
|
|||||||
:param delimeter: delimeter for the query
|
:param delimeter: delimeter for the query
|
||||||
:param conn_timeout: timeout in seconds for establishing the connection
|
:param conn_timeout: timeout in seconds for establishing the connection
|
||||||
:param response_timeout: timeout in seconds for getting the response
|
:param response_timeout: timeout in seconds for getting the response
|
||||||
:returns: list of objects
|
:returns: a tuple of (response headers, a list of objects) The response
|
||||||
|
headers will be a dict and all header names will be lowercase.
|
||||||
"""
|
"""
|
||||||
path = '/%s/%s' % (account, container)
|
path = '/%s/%s' % (account, container)
|
||||||
qs = 'format=json'
|
qs = 'format=json'
|
||||||
@ -111,10 +115,13 @@ def direct_get_container(node, part, account, container, marker=None,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
|
resp_headers = {}
|
||||||
|
for header, value in resp.getheaders():
|
||||||
|
resp_headers[header.lower()] = value
|
||||||
if resp.status == 204:
|
if resp.status == 204:
|
||||||
resp.read()
|
resp.read()
|
||||||
return []
|
return resp_headers, []
|
||||||
return json_loads(resp.read())
|
return resp_headers, json_loads(resp.read())
|
||||||
|
|
||||||
|
|
||||||
def direct_delete_container(node, part, account, container, conn_timeout=5,
|
def direct_delete_container(node, part, account, container, conn_timeout=5,
|
||||||
@ -126,6 +133,7 @@ def direct_delete_container(node, part, account, container, conn_timeout=5,
|
|||||||
'DELETE', path, headers)
|
'DELETE', path, headers)
|
||||||
with Timeout(response_timeout):
|
with Timeout(response_timeout):
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
|
resp.read()
|
||||||
if resp.status < 200 or resp.status >= 300:
|
if resp.status < 200 or resp.status >= 300:
|
||||||
raise ClientException(
|
raise ClientException(
|
||||||
'Container server %s:%s direct DELETE %s gave status %s' %
|
'Container server %s:%s direct DELETE %s gave status %s' %
|
||||||
@ -135,7 +143,6 @@ def direct_delete_container(node, part, account, container, conn_timeout=5,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
def direct_head_object(node, part, account, container, obj, conn_timeout=5,
|
def direct_head_object(node, part, account, container, obj, conn_timeout=5,
|
||||||
@ -150,8 +157,8 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
|
|||||||
:param obj: object name
|
:param obj: object name
|
||||||
:param conn_timeout: timeout in seconds for establishing the connection
|
:param conn_timeout: timeout in seconds for establishing the connection
|
||||||
:param response_timeout: timeout in seconds for getting the response
|
:param response_timeout: timeout in seconds for getting the response
|
||||||
:returns: tuple of (content-type, object size, last modified timestamp,
|
:returns: a dict containing the response's headers (all header names will
|
||||||
etag, metadata dictionary)
|
be lowercase)
|
||||||
"""
|
"""
|
||||||
path = '/%s/%s/%s' % (account, container, obj)
|
path = '/%s/%s/%s' % (account, container, obj)
|
||||||
with Timeout(conn_timeout):
|
with Timeout(conn_timeout):
|
||||||
@ -169,19 +176,14 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
metadata = {}
|
resp_headers = {}
|
||||||
for key, value in resp.getheaders():
|
for header, value in resp.getheaders():
|
||||||
if key.lower().startswith('x-object-meta-'):
|
resp_headers[header.lower()] = value
|
||||||
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
|
return resp_headers
|
||||||
return resp.getheader('content-type'), \
|
|
||||||
int(resp.getheader('content-length')), \
|
|
||||||
resp.getheader('last-modified'), \
|
|
||||||
resp.getheader('etag').strip('"'), \
|
|
||||||
metadata
|
|
||||||
|
|
||||||
|
|
||||||
def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
||||||
response_timeout=15):
|
response_timeout=15, resp_chunk_size=None):
|
||||||
"""
|
"""
|
||||||
Get object directly from the object server.
|
Get object directly from the object server.
|
||||||
|
|
||||||
@ -192,7 +194,9 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
|||||||
:param obj: object name
|
:param obj: object name
|
||||||
:param conn_timeout: timeout in seconds for establishing the connection
|
:param conn_timeout: timeout in seconds for establishing the connection
|
||||||
:param response_timeout: timeout in seconds for getting the response
|
:param response_timeout: timeout in seconds for getting the response
|
||||||
:returns: object
|
:param resp_chunk_size: if defined, chunk size of data to read.
|
||||||
|
:returns: a tuple of (response headers, the object's contents) The response
|
||||||
|
headers will be a dict and all header names will be lowercase.
|
||||||
"""
|
"""
|
||||||
path = '/%s/%s/%s' % (account, container, obj)
|
path = '/%s/%s/%s' % (account, container, obj)
|
||||||
with Timeout(conn_timeout):
|
with Timeout(conn_timeout):
|
||||||
@ -201,6 +205,7 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
|||||||
with Timeout(response_timeout):
|
with Timeout(response_timeout):
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
if resp.status < 200 or resp.status >= 300:
|
if resp.status < 200 or resp.status >= 300:
|
||||||
|
resp.read()
|
||||||
raise ClientException(
|
raise ClientException(
|
||||||
'Object server %s:%s direct GET %s gave status %s' %
|
'Object server %s:%s direct GET %s gave status %s' %
|
||||||
(node['ip'], node['port'],
|
(node['ip'], node['port'],
|
||||||
@ -209,16 +214,20 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
metadata = {}
|
if resp_chunk_size:
|
||||||
for key, value in resp.getheaders():
|
|
||||||
if key.lower().startswith('x-object-meta-'):
|
def _object_body():
|
||||||
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
|
buf = resp.read(resp_chunk_size)
|
||||||
return (resp.getheader('content-type'),
|
while buf:
|
||||||
int(resp.getheader('content-length')),
|
yield buf
|
||||||
resp.getheader('last-modified'),
|
buf = resp.read(resp_chunk_size)
|
||||||
resp.getheader('etag').strip('"'),
|
object_body = _object_body()
|
||||||
metadata,
|
else:
|
||||||
resp.read())
|
object_body = resp.read()
|
||||||
|
resp_headers = {}
|
||||||
|
for header, value in resp.getheaders():
|
||||||
|
resp_headers[header.lower()] = value
|
||||||
|
return resp_headers, object_body
|
||||||
|
|
||||||
|
|
||||||
def direct_delete_object(node, part, account, container, obj,
|
def direct_delete_object(node, part, account, container, obj,
|
||||||
@ -242,6 +251,7 @@ def direct_delete_object(node, part, account, container, obj,
|
|||||||
'DELETE', path, headers)
|
'DELETE', path, headers)
|
||||||
with Timeout(response_timeout):
|
with Timeout(response_timeout):
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
|
resp.read()
|
||||||
if resp.status < 200 or resp.status >= 300:
|
if resp.status < 200 or resp.status >= 300:
|
||||||
raise ClientException(
|
raise ClientException(
|
||||||
'Object server %s:%s direct DELETE %s gave status %s' %
|
'Object server %s:%s direct DELETE %s gave status %s' %
|
||||||
@ -251,7 +261,6 @@ def direct_delete_object(node, part, account, container, obj,
|
|||||||
http_host=node['ip'], http_port=node['port'],
|
http_host=node['ip'], http_port=node['port'],
|
||||||
http_device=node['device'], http_status=resp.status,
|
http_device=node['device'], http_status=resp.status,
|
||||||
http_reason=resp.reason)
|
http_reason=resp.reason)
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
def retry(func, *args, **kwargs):
|
def retry(func, *args, **kwargs):
|
||||||
|
@ -52,7 +52,7 @@ class TestObjectAsyncUpdate(unittest.TestCase):
|
|||||||
((cnode['port'] - 6001) / 10)]).pid
|
((cnode['port'] - 6001) / 10)]).pid
|
||||||
sleep(2)
|
sleep(2)
|
||||||
self.assert_(not direct_client.direct_get_container(cnode, cpart,
|
self.assert_(not direct_client.direct_get_container(cnode, cpart,
|
||||||
self.account, container))
|
self.account, container)[1])
|
||||||
ps = []
|
ps = []
|
||||||
for n in xrange(1, 5):
|
for n in xrange(1, 5):
|
||||||
ps.append(Popen(['swift-object-updater',
|
ps.append(Popen(['swift-object-updater',
|
||||||
@ -60,7 +60,7 @@ class TestObjectAsyncUpdate(unittest.TestCase):
|
|||||||
for p in ps:
|
for p in ps:
|
||||||
p.wait()
|
p.wait()
|
||||||
objs = [o['name'] for o in direct_client.direct_get_container(cnode,
|
objs = [o['name'] for o in direct_client.direct_get_container(cnode,
|
||||||
cpart, self.account, container)]
|
cpart, self.account, container)[1]]
|
||||||
self.assert_(obj in objs)
|
self.assert_(obj in objs)
|
||||||
|
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ class TestObjectHandoff(unittest.TestCase):
|
|||||||
for cnode in cnodes:
|
for cnode in cnodes:
|
||||||
objs = [o['name'] for o in
|
objs = [o['name'] for o in
|
||||||
direct_client.direct_get_container(cnode, cpart,
|
direct_client.direct_get_container(cnode, cpart,
|
||||||
self.account, container)]
|
self.account, container)[1]]
|
||||||
if obj not in objs:
|
if obj not in objs:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
'Container server %s:%s did not know about object' %
|
'Container server %s:%s did not know about object' %
|
||||||
@ -127,9 +127,9 @@ class TestObjectHandoff(unittest.TestCase):
|
|||||||
kill(self.pids[self.port2server[onode['port']]], SIGTERM)
|
kill(self.pids[self.port2server[onode['port']]], SIGTERM)
|
||||||
client.post_object(self.url, self.token, container, obj,
|
client.post_object(self.url, self.token, container, obj,
|
||||||
headers={'x-object-meta-probe': 'value'})
|
headers={'x-object-meta-probe': 'value'})
|
||||||
ometadata = client.head_object(self.url, self.token, container, obj)
|
oheaders = client.head_object(self.url, self.token, container, obj)
|
||||||
if ometadata.get('x-object-meta-probe') != 'value':
|
if oheaders.get('x-object-meta-probe') != 'value':
|
||||||
raise Exception('Metadata incorrect, was %s' % repr(ometadata))
|
raise Exception('Metadata incorrect, was %s' % repr(oheaders))
|
||||||
exc = False
|
exc = False
|
||||||
try:
|
try:
|
||||||
direct_client.direct_get_object(another_onode, opart, self.account,
|
direct_client.direct_get_object(another_onode, opart, self.account,
|
||||||
@ -144,9 +144,9 @@ class TestObjectHandoff(unittest.TestCase):
|
|||||||
'/etc/swift/object-server/%d.conf' %
|
'/etc/swift/object-server/%d.conf' %
|
||||||
((onode['port'] - 6000) / 10)]).pid
|
((onode['port'] - 6000) / 10)]).pid
|
||||||
sleep(2)
|
sleep(2)
|
||||||
ometadata = direct_client.direct_get_object(onode, opart, self.account,
|
oheaders = direct_client.direct_get_object(onode, opart, self.account,
|
||||||
container, obj)[-2]
|
container, obj)[0]
|
||||||
if ometadata.get('probe') == 'value':
|
if oheaders.get('x-object-meta-probe') == 'value':
|
||||||
raise Exception('Previously downed object server had the new '
|
raise Exception('Previously downed object server had the new '
|
||||||
'metadata when it should not have it')
|
'metadata when it should not have it')
|
||||||
# Run the extra server last so it'll remove it's extra partition
|
# Run the extra server last so it'll remove it's extra partition
|
||||||
@ -160,9 +160,9 @@ class TestObjectHandoff(unittest.TestCase):
|
|||||||
call(['swift-object-replicator',
|
call(['swift-object-replicator',
|
||||||
'/etc/swift/object-server/%d.conf' %
|
'/etc/swift/object-server/%d.conf' %
|
||||||
((another_onode['port'] - 6000) / 10), 'once'])
|
((another_onode['port'] - 6000) / 10), 'once'])
|
||||||
ometadata = direct_client.direct_get_object(onode, opart, self.account,
|
oheaders = direct_client.direct_get_object(onode, opart, self.account,
|
||||||
container, obj)[-2]
|
container, obj)[0]
|
||||||
if ometadata.get('probe') != 'value':
|
if oheaders.get('x-object-meta-probe') != 'value':
|
||||||
raise Exception(
|
raise Exception(
|
||||||
'Previously downed object server did not have the new metadata')
|
'Previously downed object server did not have the new metadata')
|
||||||
|
|
||||||
@ -182,7 +182,7 @@ class TestObjectHandoff(unittest.TestCase):
|
|||||||
for cnode in cnodes:
|
for cnode in cnodes:
|
||||||
objs = [o['name'] for o in
|
objs = [o['name'] for o in
|
||||||
direct_client.direct_get_container(
|
direct_client.direct_get_container(
|
||||||
cnode, cpart, self.account, container)]
|
cnode, cpart, self.account, container)[1]]
|
||||||
if obj in objs:
|
if obj in objs:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
'Container server %s:%s still knew about object' %
|
'Container server %s:%s still knew about object' %
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
# TODO: Tests
|
# TODO: Tests
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from swift.common import direct_client
|
|
||||||
|
|
||||||
class TestAuditor(unittest.TestCase):
|
class TestAuditor(unittest.TestCase):
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user