diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py index b4954dfbe3..e5610f9d29 100644 --- a/swift/common/direct_client.py +++ b/swift/common/direct_client.py @@ -28,7 +28,7 @@ from eventlet import sleep, Timeout from swift.common.bufferedhttp import http_connect from swiftclient import ClientException, json_loads -from swift.common.utils import normalize_timestamp +from swift.common.utils import normalize_timestamp, FileLikeIter from swift.common.http import HTTP_NO_CONTENT, HTTP_INSUFFICIENT_STORAGE, \ is_success, is_server_error from swift.common.swob import HeaderKeyDict @@ -305,7 +305,7 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5, def direct_put_object(node, part, account, container, name, contents, content_length=None, etag=None, content_type=None, headers=None, conn_timeout=5, response_timeout=15, - resp_chunk_size=None): + chunk_size=65535): """ Put object directly from the object server. @@ -324,7 +324,7 @@ def direct_put_object(node, part, account, container, name, contents, :param chunk_size: if defined, chunk size of data to send. :returns: etag from the server response """ - # TODO: Add chunked puts + path = '/%s/%s/%s' % (account, container, name) if headers is None: headers = {} @@ -332,6 +332,10 @@ def direct_put_object(node, part, account, container, name, contents, headers['ETag'] = etag.strip('"') if content_length is not None: headers['Content-Length'] = str(content_length) + else: + for n, v in headers.iteritems(): + if n.lower() == 'content-length': + content_length = int(v) if content_type is not None: headers['Content-Type'] = content_type else: @@ -340,11 +344,36 @@ def direct_put_object(node, part, account, container, name, contents, headers['Content-Length'] = '0' if isinstance(contents, basestring): contents = [contents] + #Incase the caller want to insert an object with specific age + add_ts = 'X-Timestamp' not in headers + + if content_length is None: + headers['Transfer-Encoding'] = 'chunked' + with Timeout(conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, - 'PUT', path, headers=gen_headers(headers, True)) - for chunk in contents: - conn.send(chunk) + 'PUT', path, headers=gen_headers(headers, add_ts)) + + contents_f = FileLikeIter(contents) + + if content_length is None: + chunk = contents_f.read(chunk_size) + while chunk: + conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + chunk = contents_f.read(chunk_size) + conn.send('0\r\n\r\n') + else: + left = content_length + while left > 0: + size = chunk_size + if size > left: + size = left + chunk = contents_f.read(size) + if not chunk: + break + conn.send(chunk) + left -= len(chunk) + with Timeout(response_timeout): resp = conn.getresponse() resp.read() diff --git a/test/unit/common/test_direct_client.py b/test/unit/common/test_direct_client.py index 7bcfc705ec..17ba91a3a1 100644 --- a/test/unit/common/test_direct_client.py +++ b/test/unit/common/test_direct_client.py @@ -13,12 +13,62 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: Tests - import unittest import os +import StringIO +import cPickle as pickle +from hashlib import md5 + from swift.common import direct_client +from swiftclient import ClientException, json_loads + + +def mock_http_connect(status, fake_headers=None, body=None): + + class FakeConn(object): + + def __init__(self, status, fake_headers, body, *args, **kwargs): + self.status = status + self.reason = 'Fake' + self.body = body + self.host = args[0] + self.port = args[1] + self.method = args[4] + self.path = args[5] + self.with_exc = False + self.headers = kwargs.get('headers', {}) + self.fake_headers = fake_headers + self.etag = md5() + + def getresponse(self): + if self.with_exc: + raise Exception('test') + + if self.fake_headers is not None and self.method == 'POST': + self.fake_headers.append(self.headers) + return self + + def getheader(self, header, default=None): + return self.headers.get(header.lower(), default) + + def getheaders(self): + if self.fake_headers is not None: + for key in self.fake_headers: + self.headers.update({key: self.fake_headers[key]}) + return self.headers.items() + + def read(self): + return self.body + + def send(self, data): + self.etag.update(data) + self.headers['etag'] = str(self.etag.hexdigest()) + + def close(self): + return + return lambda *args, **kwargs: FakeConn(status, fake_headers, body, + *args, **kwargs) class TestDirectClient(unittest.TestCase): @@ -53,6 +103,263 @@ class TestDirectClient(unittest.TestCase): assert hdrs['user-agent'] == 'direct-client %s' % os.getpid() assert len(hdrs.keys()) == 1 + def test_direct_get_account(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + headers = { + 'X-Account-Container-Count': '1', + 'X-Account-Object-Count': '1', + 'X-Account-Bytes-Used': '1', + 'X-Timestamp': '1234567890', + 'X-PUT-Timestamp': '1234567890'} + + body = '[{"count": 1, "bytes": 20971520, "name": "c1"}]' + + fake_headers = {} + for header, value in headers.items(): + fake_headers[header.lower()] = value + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, fake_headers, body) + + resp_headers, resp = direct_client.direct_get_account(node, part, account) + + fake_headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(fake_headers, resp_headers) + self.assertEqual(json_loads(body), resp) + + + direct_client.http_connect = mock_http_connect(204, fake_headers, body) + + resp_headers, resp = direct_client.direct_get_account(node, part, account) + + fake_headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(fake_headers, resp_headers) + self.assertEqual([], resp) + + + direct_client.http_connect = was_http_connector + + def test_direct_head_container(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'key':'value'} + + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, headers) + + resp = direct_client.direct_head_container(node, part, account, container) + + headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(headers, resp) + + direct_client.http_connect = was_http_connector + + def test_direct_get_container(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'key':'value'} + body = '[{"hash": "8f4e3", "last_modified": "317260", "bytes": 209}]' + + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, headers, body) + + resp_headers, resp = \ + direct_client.direct_get_container(node, part, account, container) + + headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(headers, resp_headers) + self.assertEqual(json_loads(body), resp) + + + direct_client.http_connect = mock_http_connect(204, headers, body) + + resp_headers, resp = \ + direct_client.direct_get_container(node, part, account, container) + + headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(headers, resp_headers) + self.assertEqual([], resp) + + + direct_client.http_connect = was_http_connector + + def test_direct_delete_container(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'key':'value'} + + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200) + + direct_client.direct_delete_container(node, part, account, container) + + direct_client.http_connect = was_http_connector + + def test_direct_head_object(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'key':'value'} + + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, headers) + + resp = direct_client.direct_head_object(node, part, account, + container, name) + headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(headers, resp) + + direct_client.http_connect = was_http_connector + + def test_direct_get_object(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, body=contents) + + resp_header, obj_body = \ + direct_client.direct_get_object(node, part, account, container, name) + self.assertEqual(obj_body, contents) + + direct_client.http_connect = was_http_connector + + pass + + def test_direct_post_object(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'Key':'value'} + + fake_headers = [] + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, fake_headers) + + direct_client.direct_post_object(node, part, account, + container, name, headers) + self.assertEqual(headers['Key'], fake_headers[0].get('Key')) + + direct_client.http_connect = was_http_connector + + def test_direct_delete_object(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200) + + direct_client.direct_delete_object(node, part, account, container, name) + + direct_client.http_connect = was_http_connector + + def test_direct_put_object(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200) + + resp = direct_client.direct_put_object(node, part, account, + container, name, contents, 6) + self.assertEqual(md5('123456').hexdigest(), resp) + + direct_client.http_connect = was_http_connector + + def test_direct_put_object_fail(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(500) + + self.assertRaises(direct_client.ClientException,\ + direct_client.direct_put_object, node, part, account,\ + container, name, contents) + + direct_client.http_connect = was_http_connector + + def test_direct_put_object_chunked(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200) + + resp = direct_client.direct_put_object(node, part, account,\ + container, name, contents) + self.assertEqual(md5('6\r\n123456\r\n0\r\n\r\n').hexdigest(), resp) + + direct_client.http_connect = was_http_connector + + def test_retry(self): + node = {'ip':'1.2.3.4', 'port':'6000', 'device':'sda'} + part = '0' + account = 'a' + container = 'c' + name = 'o' + contents = StringIO.StringIO('123456') + headers = {'key':'value'} + + was_http_connector = direct_client.http_connect + direct_client.http_connect = mock_http_connect(200, headers) + + attempts, resp = direct_client.retry(direct_client.direct_head_object,\ + node, part, account, container, name) + headers.update({'user-agent':'direct-client %s' % os.getpid()}) + self.assertEqual(headers, resp) + self.assertEqual(attempts, 1) + + + direct_client.http_connect = was_http_connector if __name__ == '__main__': unittest.main() +