client.py: Reset of streams during upload retries

This commit is contained in:
gholt 2011-01-18 21:37:34 +00:00 committed by Tarmac
commit 6024b39314
2 changed files with 126 additions and 16 deletions

View File

@ -696,7 +696,7 @@ class Connection(object):
"""Convenience class to make requests that will also retry the request"""
def __init__(self, authurl, user, key, retries=5, preauthurl=None,
preauthtoken=None, snet=False):
preauthtoken=None, snet=False, starting_backoff=1):
"""
:param authurl: authenitcation URL
:param user: user name to authenticate as
@ -716,6 +716,7 @@ class Connection(object):
self.token = preauthtoken
self.attempts = 0
self.snet = snet
self.starting_backoff = starting_backoff
def get_auth(self):
return get_auth(self.authurl, self.user, self.key, snet=self.snet)
@ -723,9 +724,9 @@ class Connection(object):
def http_connection(self):
return http_connection(self.url)
def _retry(self, func, *args, **kwargs):
def _retry(self, reset_func, func, *args, **kwargs):
self.attempts = 0
backoff = 1
backoff = self.starting_backoff
while self.attempts <= self.retries:
self.attempts += 1
try:
@ -754,10 +755,12 @@ class Connection(object):
raise
sleep(backoff)
backoff *= 2
if reset_func:
reset_func(func, *args, **kwargs)
def head_account(self):
"""Wrapper for :func:`head_account`"""
return self._retry(head_account)
return self._retry(None, head_account)
def get_account(self, marker=None, limit=None, prefix=None,
full_listing=False):
@ -765,16 +768,16 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_account, marker=marker, limit=limit,
return self._retry(None, get_account, marker=marker, limit=limit,
prefix=prefix, full_listing=full_listing)
def post_account(self, headers):
"""Wrapper for :func:`post_account`"""
return self._retry(post_account, headers)
return self._retry(None, post_account, headers)
def head_container(self, container):
"""Wrapper for :func:`head_container`"""
return self._retry(head_container, container)
return self._retry(None, head_container, container)
def get_container(self, container, marker=None, limit=None, prefix=None,
delimiter=None, full_listing=False):
@ -782,43 +785,55 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_container, container, marker=marker,
return self._retry(None, get_container, container, marker=marker,
limit=limit, prefix=prefix, delimiter=delimiter,
full_listing=full_listing)
def put_container(self, container, headers=None):
"""Wrapper for :func:`put_container`"""
return self._retry(put_container, container, headers=headers)
return self._retry(None, put_container, container, headers=headers)
def post_container(self, container, headers):
"""Wrapper for :func:`post_container`"""
return self._retry(post_container, container, headers)
return self._retry(None, post_container, container, headers)
def delete_container(self, container):
"""Wrapper for :func:`delete_container`"""
return self._retry(delete_container, container)
return self._retry(None, delete_container, container)
def head_object(self, container, obj):
"""Wrapper for :func:`head_object`"""
return self._retry(head_object, container, obj)
return self._retry(None, head_object, container, obj)
def get_object(self, container, obj, resp_chunk_size=None):
"""Wrapper for :func:`get_object`"""
return self._retry(get_object, container, obj,
return self._retry(None, get_object, container, obj,
resp_chunk_size=resp_chunk_size)
def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None,
headers=None):
"""Wrapper for :func:`put_object`"""
return self._retry(put_object, container, obj, contents,
def _default_reset(*args, **kwargs):
raise ClientException('put_object(%r, %r, ...) failure and no '
'ability to reset contents for reupload.' % (container, obj))
reset_func = _default_reset
tell = getattr(contents, 'tell', None)
seek = getattr(contents, 'seek', None)
if tell and seek:
orig_pos = tell()
reset_func = lambda *a, **k: seek(orig_pos)
return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag, chunk_size=chunk_size,
content_type=content_type, headers=headers)
def post_object(self, container, obj, headers):
"""Wrapper for :func:`post_object`"""
return self._retry(post_object, container, obj, headers)
return self._retry(None, post_object, container, obj, headers)
def delete_object(self, container, obj):
"""Wrapper for :func:`delete_object`"""
return self._retry(delete_object, container, obj)
return self._retry(None, delete_object, container, obj)

View File

@ -14,7 +14,10 @@
# limitations under the License.
# TODO: More tests
import socket
import unittest
from StringIO import StringIO
from urlparse import urlparse
# TODO: mock http connection class with more control over headers
from test.unit.proxy.test_server import fake_http_connect
@ -377,5 +380,97 @@ class TestConnection(MockHttpTest):
self.assertEquals(conn.url, 'http://www.new.com')
self.assertEquals(conn.token, 'new')
def test_reset_stream(self):
class LocalContents(object):
def __init__(self, tell_value=0):
self.already_read = False
self.seeks = []
self.tell_value = tell_value
def tell(self):
return self.tell_value
def seek(self, position):
self.seeks.append(position)
self.already_read = False
def read(self, size=-1):
if self.already_read:
return ''
else:
self.already_read = True
return 'abcdef'
class LocalConnection(object):
def putrequest(self, *args, **kwargs):
return
def putheader(self, *args, **kwargs):
return
def endheaders(self, *args, **kwargs):
return
def send(self, *args, **kwargs):
raise socket.error('oops')
def request(self, *args, **kwargs):
return
def getresponse(self, *args, **kwargs):
self.status = 200
return self
def getheader(self, *args, **kwargs):
return ''
def read(self, *args, **kwargs):
return ''
def local_http_connection(url):
parsed = urlparse(url)
return parsed, LocalConnection()
orig_conn = c.http_connection
try:
c.http_connection = local_http_connection
conn = c.Connection('http://www.example.com', 'asdf', 'asdf',
retries=1, starting_backoff=.0001)
contents = LocalContents()
exc = None
try:
conn.put_object('c', 'o', contents)
except socket.error, err:
exc = err
self.assertEquals(contents.seeks, [0])
self.assertEquals(str(exc), 'oops')
contents = LocalContents(tell_value=123)
exc = None
try:
conn.put_object('c', 'o', contents)
except socket.error, err:
exc = err
self.assertEquals(contents.seeks, [123])
self.assertEquals(str(exc), 'oops')
contents = LocalContents()
contents.tell = None
exc = None
try:
conn.put_object('c', 'o', contents)
except c.ClientException, err:
exc = err
self.assertEquals(contents.seeks, [])
self.assertEquals(str(exc), "put_object('c', 'o', ...) failure "
"and no ability to reset contents for reupload.")
finally:
c.http_connection = orig_conn
if __name__ == '__main__':
unittest.main()