Refactoring the PUT method

Extracting large chunks of the PUT method into smaller
methods to improve maintainability and reuse of code.

Based on the work that Clay Gerrard started:
https://review.openstack.org/#/c/77812/

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: Id479fc5b159a2782361ac4a6e4a6d8bbaee4fe85
Signed-off-by: Thiago da Silva <thiago@redhat.com>
This commit is contained in:
Thiago da Silva 2015-02-17 16:55:34 -05:00
parent 3d3db0ab78
commit 23d0842dec
5 changed files with 424 additions and 245 deletions

View File

@ -928,6 +928,10 @@ class Request(object):
if entity_path is not None: if entity_path is not None:
return '/' + entity_path return '/' + entity_path
@property
def is_chunked(self):
return 'chunked' in self.headers.get('transfer-encoding', '')
@property @property
def url(self): def url(self):
"Provides the full url of the request" "Provides the full url of the request"

View File

@ -52,12 +52,13 @@ from swift.common.http import (
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT) HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
from swift.common.storage_policy import POLICIES
from swift.proxy.controllers.base import Controller, delay_denial, \ from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \ HTTPServerError, HTTPServiceUnavailable, Request, \
HTTPClientDisconnect, HeaderKeyDict HTTPClientDisconnect, HeaderKeyDict, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
remove_items, copy_header_subset remove_items, copy_header_subset
@ -321,7 +322,13 @@ class ObjectController(Controller):
def _connect_put_node(self, nodes, part, path, headers, def _connect_put_node(self, nodes, part, path, headers,
logger_thread_locals): logger_thread_locals):
"""Method for a file PUT connect""" """
Make a connection for a replicated object.
Connects to the first working node that it finds in node_iter
and sends over the request headers. Returns an HTTPConnection
object to handle the rest of the streaming.
"""
self.app.logger.thread_locals = logger_thread_locals self.app.logger.thread_locals = logger_thread_locals
for node in nodes: for node in nodes:
try: try:
@ -350,7 +357,8 @@ class ObjectController(Controller):
self.app.error_limit(node, _('ERROR Insufficient Storage')) self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(resp.status): elif is_server_error(resp.status):
self.app.error_occurred( self.app.error_occurred(
node, _('ERROR %(status)d Expect: 100-continue ' node,
_('ERROR %(status)d Expect: 100-continue '
'From Object Server') % { 'From Object Server') % {
'status': resp.status}) 'status': resp.status})
except (Exception, Timeout): except (Exception, Timeout):
@ -358,28 +366,32 @@ class ObjectController(Controller):
node, _('Object'), node, _('Object'),
_('Expect: 100-continue on %s') % path) _('Expect: 100-continue on %s') % path)
def _get_put_responses(self, req, conns, nodes): def _await_response(self, conn, **kwargs):
statuses = []
reasons = []
bodies = []
etags = set()
def get_conn_response(conn):
try:
with Timeout(self.app.node_timeout): with Timeout(self.app.node_timeout):
if conn.resp: if conn.resp:
return (conn, conn.resp) return conn.resp
else: else:
return (conn, conn.getresponse()) return conn.getresponse()
def _get_conn_response(self, conn, req, **kwargs):
try:
resp = self._await_response(conn, **kwargs)
return (conn, resp)
except (Exception, Timeout): except (Exception, Timeout):
self.app.exception_occurred( self.app.exception_occurred(
conn.node, _('Object'), conn.node, _('Object'),
_('Trying to get final status of PUT to %s') % req.path) _('Trying to get final status of PUT to %s') % req.path)
return (None, None) return (None, None)
def _get_put_responses(self, req, conns, nodes):
statuses = []
reasons = []
bodies = []
etags = set()
pile = GreenAsyncPile(len(conns)) pile = GreenAsyncPile(len(conns))
for conn in conns: for conn in conns:
pile.spawn(get_conn_response, conn) pile.spawn(self._get_conn_response, conn, req)
def _handle_response(conn, response): def _handle_response(conn, response):
statuses.append(response.status) statuses.append(response.status)
@ -440,56 +452,135 @@ class ObjectController(Controller):
return req, delete_at_container, delete_at_part, delete_at_nodes return req, delete_at_container, delete_at_part, delete_at_nodes
@public def _handle_copy_request(self, req):
@cors_validation """
@delay_denial This method handles copying objects based on values set in the headers
def PUT(self, req): 'X-Copy-From' and 'X-Copy-From-Account'
"""HTTP PUT request handler."""
if req.if_none_match is not None and '*' not in req.if_none_match: This method was added as part of the refactoring of the PUT method and
# Sending an etag with if-none-match isn't currently supported the functionality is expected to be moved to middleware
return HTTPBadRequest(request=req, content_type='text/plain', """
body='If-None-Match only supports *') if req.environ.get('swift.orig_req_method', req.method) != 'POST':
req.environ.setdefault('swift.log_info', []).append(
'x-copy-from:%s' % req.headers['X-Copy-From'])
ver, acct, _rest = req.split_path(2, 3, True)
src_account_name = req.headers.get('X-Copy-From-Account', None)
if src_account_name:
src_account_name = check_account_format(req, src_account_name)
else:
src_account_name = acct
src_container_name, src_obj_name = check_copy_from_header(req)
source_header = '/%s/%s/%s/%s' % (
ver, src_account_name, src_container_name, src_obj_name)
source_req = req.copy_get()
# make sure the source request uses it's container_info
source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
source_req.path_info = source_header
source_req.headers['X-Newest'] = 'true'
orig_obj_name = self.object_name
orig_container_name = self.container_name
orig_account_name = self.account_name
sink_req = Request.blank(req.path_info,
environ=req.environ, headers=req.headers)
self.object_name = src_obj_name
self.container_name = src_container_name
self.account_name = src_account_name
source_resp = self.GET(source_req)
# This gives middlewares a way to change the source; for example,
# this lets you COPY a SLO manifest and have the new object be the
# concatenation of the segments (like what a GET request gives
# the client), not a copy of the manifest file.
hook = req.environ.get(
'swift.copy_hook',
(lambda source_req, source_resp, sink_req: source_resp))
source_resp = hook(source_req, source_resp, sink_req)
# reset names
self.object_name = orig_obj_name
self.container_name = orig_container_name
self.account_name = orig_account_name
if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
# this is a bit of ugly code, but I'm willing to live with it
# until copy request handling moves to middleware
return source_resp, None, None, None
if source_resp.content_length is None:
# This indicates a transfer-encoding: chunked source object,
# which currently only happens because there are more than
# CONTAINER_LISTING_LIMIT segments in a segmented object. In
# this case, we're going to refuse to do the server-side copy.
raise HTTPRequestEntityTooLarge(request=req)
if source_resp.content_length > constraints.MAX_FILE_SIZE:
raise HTTPRequestEntityTooLarge(request=req)
data_source = iter(source_resp.app_iter)
sink_req.content_length = source_resp.content_length
sink_req.etag = source_resp.etag
# we no longer need the X-Copy-From header
del sink_req.headers['X-Copy-From']
if 'X-Copy-From-Account' in sink_req.headers:
del sink_req.headers['X-Copy-From-Account']
if not req.content_type_manually_set:
sink_req.headers['Content-Type'] = \
source_resp.headers['Content-Type']
if config_true_value(
sink_req.headers.get('x-fresh-metadata', 'false')):
# post-as-copy: ignore new sysmeta, copy existing sysmeta
condition = lambda k: is_sys_meta('object', k)
remove_items(sink_req.headers, condition)
copy_header_subset(source_resp, sink_req, condition)
else:
# copy/update existing sysmeta and user meta
copy_headers_into(source_resp, sink_req)
copy_headers_into(req, sink_req)
# copy over x-static-large-object for POSTs and manifest copies
if 'X-Static-Large-Object' in source_resp.headers and \
req.params.get('multipart-manifest') == 'get':
sink_req.headers['X-Static-Large-Object'] = \
source_resp.headers['X-Static-Large-Object']
req = sink_req
def update_response(req, resp):
acct, path = source_resp.environ['PATH_INFO'].split('/', 3)[2:4]
resp.headers['X-Copied-From-Account'] = quote(acct)
resp.headers['X-Copied-From'] = quote(path)
if 'last-modified' in source_resp.headers:
resp.headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
copy_headers_into(req, resp)
return resp
# this is a bit of ugly code, but I'm willing to live with it
# until copy request handling moves to middleware
return None, req, data_source, update_response
def _handle_object_versions(self, req):
"""
This method handles versionining of objects in containers that
have the feature enabled.
When a new PUT request is sent, the proxy checks for previous versions
of that same object name. If found, it is copied to a different
container and the new version is stored in its place.
This method was added as part of the PUT method refactoring and the
functionality is expected to be moved to middleware
"""
container_info = self.container_info( container_info = self.container_info(
self.account_name, self.container_name, req) self.account_name, self.container_name, req)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index', policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy']) container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index) obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
object_versions = container_info['versions']
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not containers:
return HTTPNotFound(request=req)
# Sometimes the 'content-type' header exists, but is set to None.
content_type_manually_set = True
detect_content_type = \
config_true_value(req.headers.get('x-detect-content-type'))
if detect_content_type or not req.headers.get('content-type'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
req.headers['Content-Type'] = guessed_type or \
'application/octet-stream'
if detect_content_type:
req.headers.pop('x-detect-content-type')
else:
content_type_manually_set = False
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
partition, nodes = obj_ring.get_nodes( partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name) self.account_name, self.container_name, self.object_name)
object_versions = container_info['versions']
# do a HEAD request for checking object versions # do a HEAD request for checking object versions
if object_versions and not req.environ.get('swift_versioned_copy'): if object_versions and not req.environ.get('swift_versioned_copy'):
@ -502,20 +593,6 @@ class ObjectController(Controller):
hreq, _('Object'), obj_ring, partition, hreq, _('Object'), obj_ring, partition,
hreq.swift_entity_path) hreq.swift_entity_path)
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
except ValueError:
return HTTPBadRequest(
request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
req.headers['X-Timestamp'] = req_timestamp.internal
else:
req.headers['X-Timestamp'] = Timestamp(time.time()).internal
if object_versions and not req.environ.get('swift_versioned_copy'):
is_manifest = 'X-Object-Manifest' in req.headers or \ is_manifest = 'X-Object-Manifest' in req.headers or \
'X-Object-Manifest' in hresp.headers 'X-Object-Manifest' in hresp.headers
if hresp.status_int != HTTP_NOT_FOUND and not is_manifest: if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
@ -543,120 +620,41 @@ class ObjectController(Controller):
copy_resp = self.COPY(copy_req) copy_resp = self.COPY(copy_req)
if is_client_error(copy_resp.status_int): if is_client_error(copy_resp.status_int):
# missing container or bad permissions # missing container or bad permissions
return HTTPPreconditionFailed(request=req) raise HTTPPreconditionFailed(request=req)
elif not is_success(copy_resp.status_int): elif not is_success(copy_resp.status_int):
# could not copy the data, bail # could not copy the data, bail
return HTTPServiceUnavailable(request=req) raise HTTPServiceUnavailable(request=req)
reader = req.environ['wsgi.input'].read def _update_content_type(self, req):
data_source = iter(lambda: reader(self.app.client_chunk_size), '') # Sometimes the 'content-type' header exists, but is set to None.
source_header = req.headers.get('X-Copy-From') req.content_type_manually_set = True
source_resp = None detect_content_type = \
if source_header: config_true_value(req.headers.get('x-detect-content-type'))
if req.environ.get('swift.orig_req_method', req.method) != 'POST': if detect_content_type or not req.headers.get('content-type'):
req.environ.setdefault('swift.log_info', []).append( guessed_type, _junk = mimetypes.guess_type(req.path_info)
'x-copy-from:%s' % source_header) req.headers['Content-Type'] = guessed_type or \
ver, acct, _rest = req.split_path(2, 3, True) 'application/octet-stream'
src_account_name = req.headers.get('X-Copy-From-Account', None) if detect_content_type:
if src_account_name: req.headers.pop('x-detect-content-type')
src_account_name = check_account_format(req, src_account_name)
else: else:
src_account_name = acct req.content_type_manually_set = False
src_container_name, src_obj_name = check_copy_from_header(req)
source_header = '/%s/%s/%s/%s' % (
ver, src_account_name, src_container_name, src_obj_name)
source_req = req.copy_get()
# make sure the source request uses it's container_info def _update_x_timestamp(self, req):
source_req.headers.pop('X-Backend-Storage-Policy-Index', None) # Used by container sync feature
source_req.path_info = source_header if 'x-timestamp' in req.headers:
source_req.headers['X-Newest'] = 'true' try:
orig_obj_name = self.object_name req_timestamp = Timestamp(req.headers['X-Timestamp'])
orig_container_name = self.container_name except ValueError:
orig_account_name = self.account_name raise HTTPBadRequest(
self.object_name = src_obj_name request=req, content_type='text/plain',
self.container_name = src_container_name body='X-Timestamp should be a UNIX timestamp float value; '
self.account_name = src_account_name 'was %r' % req.headers['x-timestamp'])
sink_req = Request.blank(req.path_info, req.headers['X-Timestamp'] = req_timestamp.internal
environ=req.environ, headers=req.headers)
source_resp = self.GET(source_req)
# This gives middlewares a way to change the source; for example,
# this lets you COPY a SLO manifest and have the new object be the
# concatenation of the segments (like what a GET request gives
# the client), not a copy of the manifest file.
hook = req.environ.get(
'swift.copy_hook',
(lambda source_req, source_resp, sink_req: source_resp))
source_resp = hook(source_req, source_resp, sink_req)
if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
return source_resp
self.object_name = orig_obj_name
self.container_name = orig_container_name
self.account_name = orig_account_name
data_source = iter(source_resp.app_iter)
sink_req.content_length = source_resp.content_length
if sink_req.content_length is None:
# This indicates a transfer-encoding: chunked source object,
# which currently only happens because there are more than
# CONTAINER_LISTING_LIMIT segments in a segmented object. In
# this case, we're going to refuse to do the server-side copy.
return HTTPRequestEntityTooLarge(request=req)
if sink_req.content_length > constraints.MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
sink_req.etag = source_resp.etag
# we no longer need the X-Copy-From header
del sink_req.headers['X-Copy-From']
if 'X-Copy-From-Account' in sink_req.headers:
del sink_req.headers['X-Copy-From-Account']
if not content_type_manually_set:
sink_req.headers['Content-Type'] = \
source_resp.headers['Content-Type']
if config_true_value(
sink_req.headers.get('x-fresh-metadata', 'false')):
# post-as-copy: ignore new sysmeta, copy existing sysmeta
condition = lambda k: is_sys_meta('object', k)
remove_items(sink_req.headers, condition)
copy_header_subset(source_resp, sink_req, condition)
else: else:
# copy/update existing sysmeta and user meta req.headers['X-Timestamp'] = Timestamp(time.time()).internal
copy_headers_into(source_resp, sink_req) return None
copy_headers_into(req, sink_req)
# copy over x-static-large-object for POSTs and manifest copies
if 'X-Static-Large-Object' in source_resp.headers and \
req.params.get('multipart-manifest') == 'get':
sink_req.headers['X-Static-Large-Object'] = \
source_resp.headers['X-Static-Large-Object']
req = sink_req
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
node_iter = GreenthreadSafeIterator(
self.iter_nodes_local_first(obj_ring, partition))
pile = GreenPile(len(nodes))
te = req.headers.get('transfer-encoding', '')
chunked = ('chunked' in te)
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, containers,
delete_at_container, delete_at_part, delete_at_nodes)
for nheaders in outgoing_headers:
# RFC2616:8.2.3 disallows 100-continue without a body
if (req.content_length > 0) or chunked:
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
conns = [conn for conn in pile if conn]
min_conns = quorum_size(len(nodes))
def _check_failure_put_connections(self, conns, req, nodes):
if req.if_none_match is not None and '*' in req.if_none_match: if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [conn.resp.status for conn in conns if conn.resp] statuses = [conn.resp.status for conn in conns if conn.resp]
if HTTP_PRECONDITION_FAILED in statuses: if HTTP_PRECONDITION_FAILED in statuses:
@ -664,7 +662,7 @@ class ObjectController(Controller):
self.app.logger.debug( self.app.logger.debug(
_('Object PUT returning 412, %(statuses)r'), _('Object PUT returning 412, %(statuses)r'),
{'statuses': statuses}) {'statuses': statuses})
return HTTPPreconditionFailed(request=req) raise HTTPPreconditionFailed(request=req)
if any(conn for conn in conns if conn.resp and if any(conn for conn in conns if conn.resp and
conn.resp.status == HTTP_CONFLICT): conn.resp.status == HTTP_CONFLICT):
@ -675,14 +673,44 @@ class ObjectController(Controller):
'%(req_timestamp)s <= %(timestamps)r'), '%(req_timestamp)s <= %(timestamps)r'),
{'req_timestamp': req.timestamp.internal, {'req_timestamp': req.timestamp.internal,
'timestamps': ', '.join(timestamps)}) 'timestamps': ', '.join(timestamps)})
return HTTPAccepted(request=req) raise HTTPAccepted(request=req)
min_conns = quorum_size(len(nodes))
self._check_min_conn(req, conns, min_conns)
def _get_put_connections(self, req, nodes, partition, outgoing_headers,
policy, expect):
"""
Establish connections to storage nodes for PUT request
"""
obj_ring = policy.object_ring
node_iter = GreenthreadSafeIterator(
self.iter_nodes_local_first(obj_ring, partition))
pile = GreenPile(len(nodes))
for nheaders in outgoing_headers:
if expect:
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
conns = [conn for conn in pile if conn]
return conns
def _check_min_conn(self, req, conns, min_conns, msg=None):
msg = msg or 'Object PUT returning 503, %(conns)s/%(nodes)s ' \
'required connections'
if len(conns) < min_conns: if len(conns) < min_conns:
self.app.logger.error( self.app.logger.error((msg),
_('Object PUT returning 503, %(conns)s/%(nodes)s '
'required connections'),
{'conns': len(conns), 'nodes': min_conns}) {'conns': len(conns), 'nodes': min_conns})
return HTTPServiceUnavailable(request=req) raise HTTPServiceUnavailable(request=req)
def _transfer_data(self, req, data_source, conns, nodes):
min_conns = quorum_size(len(nodes))
bytes_transferred = 0 bytes_transferred = 0
try: try:
with ContextPool(len(nodes)) as pool: with ContextPool(len(nodes)) as pool:
@ -695,48 +723,90 @@ class ObjectController(Controller):
try: try:
chunk = next(data_source) chunk = next(data_source)
except StopIteration: except StopIteration:
if chunked: if req.is_chunked:
for conn in conns: for conn in conns:
conn.queue.put('0\r\n\r\n') conn.queue.put('0\r\n\r\n')
break break
bytes_transferred += len(chunk) bytes_transferred += len(chunk)
if bytes_transferred > constraints.MAX_FILE_SIZE: if bytes_transferred > constraints.MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req) raise HTTPRequestEntityTooLarge(request=req)
for conn in list(conns): for conn in list(conns):
if not conn.failed: if not conn.failed:
conn.queue.put( conn.queue.put(
'%x\r\n%s\r\n' % (len(chunk), chunk) '%x\r\n%s\r\n' % (len(chunk), chunk)
if chunked else chunk) if req.is_chunked else chunk)
else: else:
conn.close()
conns.remove(conn) conns.remove(conn)
if len(conns) < min_conns: self._check_min_conn(
self.app.logger.error(_( req, conns, min_conns,
'Object PUT exceptions during' msg='Object PUT exceptions during'
' send, %(conns)s/%(nodes)s required connections'), ' send, %(conns)s/%(nodes)s required connections')
{'conns': len(conns), 'nodes': min_conns})
return HTTPServiceUnavailable(request=req)
for conn in conns: for conn in conns:
if conn.queue.unfinished_tasks: if conn.queue.unfinished_tasks:
conn.queue.join() conn.queue.join()
conns = [conn for conn in conns if not conn.failed] conns = [conn for conn in conns if not conn.failed]
self._check_min_conn(
req, conns, min_conns,
msg='Object PUT exceptions after last send, '
'%(conns)s/%(nodes)s required connections')
except ChunkReadTimeout as err: except ChunkReadTimeout as err:
self.app.logger.warn( self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds) _('ERROR Client read timeout (%ss)'), err.seconds)
self.app.logger.increment('client_timeouts') self.app.logger.increment('client_timeouts')
return HTTPRequestTimeout(request=req) raise HTTPRequestTimeout(request=req)
except HTTPException:
raise
except (Exception, Timeout): except (Exception, Timeout):
self.app.logger.exception( self.app.logger.exception(
_('ERROR Exception causing client disconnect')) _('ERROR Exception causing client disconnect'))
return HTTPClientDisconnect(request=req) raise HTTPClientDisconnect(request=req)
if req.content_length and bytes_transferred < req.content_length: if req.content_length and bytes_transferred < req.content_length:
req.client_disconnect = True req.client_disconnect = True
self.app.logger.warn( self.app.logger.warn(
_('Client disconnected without sending enough data')) _('Client disconnected without sending enough data'))
self.app.logger.increment('client_disconnects') self.app.logger.increment('client_disconnects')
return HTTPClientDisconnect(request=req) raise HTTPClientDisconnect(request=req)
statuses, reasons, bodies, etags = self._get_put_responses(req, conns, def _store_object(self, req, data_source, nodes, partition,
nodes) outgoing_headers):
"""
Store a replicated object.
This method is responsible for establishing connection
with storage nodes and sending object to each one of those
nodes. After sending the data, the "best" reponse will be
returned based on statuses from all connections
"""
policy_idx = req.headers.get('X-Backend-Storage-Policy-Index')
policy = POLICIES.get_by_index(policy_idx)
if not nodes:
return HTTPNotFound()
# RFC2616:8.2.3 disallows 100-continue without a body
if (req.content_length > 0) or req.is_chunked:
expect = True
else:
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
self._check_failure_put_connections(conns, req, nodes)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
# get responses
statuses, reasons, bodies, etags = self._get_put_responses(
req, conns, nodes)
except HTTPException as resp:
return resp
finally:
for conn in conns:
conn.close()
if len(etags) > 1: if len(etags) > 1:
self.app.logger.error( self.app.logger.error(
@ -745,18 +815,83 @@ class ObjectController(Controller):
etag = etags.pop() if len(etags) else None etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies, resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag) _('Object PUT'), etag=etag)
if source_header:
acct, path = source_header.split('/', 3)[2:4]
resp.headers['X-Copied-From-Account'] = quote(acct)
resp.headers['X-Copied-From'] = quote(path)
if 'last-modified' in source_resp.headers:
resp.headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
copy_headers_into(req, resp)
resp.last_modified = math.ceil( resp.last_modified = math.ceil(
float(Timestamp(req.headers['X-Timestamp']))) float(Timestamp(req.headers['X-Timestamp'])))
return resp return resp
@public
@cors_validation
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
if req.if_none_match is not None and '*' not in req.if_none_match:
# Sending an etag with if-none-match isn't currently supported
return HTTPBadRequest(request=req, content_type='text/plain',
body='If-None-Match only supports *')
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
container_nodes = container_info['nodes']
container_partition = container_info['partition']
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
# is request authorized
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not container_info['nodes']:
return HTTPNotFound(request=req)
# update content type in case it is missing
self._update_content_type(req)
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
self._update_x_timestamp(req)
# check if versioning is enabled and handle copying previous version
self._handle_object_versions(req)
# check if request is a COPY of an existing object
source_header = req.headers.get('X-Copy-From')
if source_header:
error_response, req, data_source, update_response = \
self._handle_copy_request(req)
if error_response:
return error_response
else:
reader = req.environ['wsgi.input'].read
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
update_response = lambda req, resp: resp
# check if object is set to be automaticaly deleted (i.e. expired)
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes)
# send object to storage nodes
resp = self._store_object(
req, data_source, nodes, partition, outgoing_headers)
return update_response(req, resp)
@public @public
@cors_validation @cors_validation
@delay_denial @delay_denial

View File

@ -733,6 +733,9 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name, default=None): def getheader(self, name, default=None):
return swob.HeaderKeyDict(self.getheaders()).get(name, default) return swob.HeaderKeyDict(self.getheaders()).get(name, default)
def close(self):
pass
timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter)) timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter))
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter)) etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
if isinstance(kwargs.get('headers'), list): if isinstance(kwargs.get('headers'), list):

View File

@ -993,7 +993,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0', headers={'Content-Length': '0',
'Content-Type': 'text/plain'}) 'Content-Type': 'text/plain'})
self.app.update_request(req) self.app.update_request(req)
try:
res = method(req) res = method(req)
except HTTPException as res:
pass
self.assertEquals(res.status_int, expected) self.assertEquals(res.status_int, expected)
# repeat test # repeat test
@ -1003,7 +1006,10 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0', headers={'Content-Length': '0',
'Content-Type': 'text/plain'}) 'Content-Type': 'text/plain'})
self.app.update_request(req) self.app.update_request(req)
try:
res = method(req) res = method(req)
except HTTPException as res:
pass
self.assertEquals(res.status_int, expected) self.assertEquals(res.status_int, expected)
@unpatch_policies @unpatch_policies
@ -1734,7 +1740,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {}) req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0 req.content_length = 0
self.app.update_request(req) self.app.update_request(req)
try:
res = controller.PUT(req) res = controller.PUT(req)
except HTTPException as res:
pass
expected = str(expected) expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected) self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201) # connect exc test_status_map((200, 200, 201, 201, -1), 201) # connect exc
@ -1763,7 +1772,10 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT'}, environ={'REQUEST_METHOD': 'PUT'},
body='some data') body='some data')
self.app.update_request(req) self.app.update_request(req)
try:
res = controller.PUT(req) res = controller.PUT(req)
except HTTPException as res:
pass
expected = str(expected) expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected) self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, -1, 201), 201) test_status_map((200, 200, 201, -1, 201), 201)
@ -1805,7 +1817,10 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/v1/a/c/o.jpg', {}) req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0 req.content_length = 0
self.app.update_request(req) self.app.update_request(req)
try:
res = controller.PUT(req) res = controller.PUT(req)
except HTTPException as res:
pass
expected = str(expected) expected = str(expected)
self.assertEquals(res.status[:len(str(expected))], self.assertEquals(res.status[:len(str(expected))],
str(expected)) str(expected))
@ -3391,7 +3406,10 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req) self.app.update_request(req)
self.app.memcache.store = {} self.app.memcache.store = {}
try:
resp = controller.PUT(req) resp = controller.PUT(req)
except HTTPException as resp:
pass
self.assertEquals(resp.status_int, 413) self.assertEquals(resp.status_int, 413)
def test_basic_COPY(self): def test_basic_COPY(self):
@ -3632,7 +3650,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body) kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list, with self.controller_context(req, *status_list,
**kwargs) as controller: **kwargs) as controller:
try:
resp = controller.COPY(req) resp = controller.COPY(req)
except HTTPException as resp:
pass
self.assertEquals(resp.status_int, 413) self.assertEquals(resp.status_int, 413)
@_limit_max_file_size @_limit_max_file_size
@ -3656,7 +3677,10 @@ class TestObjectController(unittest.TestCase):
kwargs = dict(body=copy_from_obj_body) kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list, with self.controller_context(req, *status_list,
**kwargs) as controller: **kwargs) as controller:
try:
resp = controller.COPY(req) resp = controller.COPY(req)
except HTTPException as resp:
pass
self.assertEquals(resp.status_int, 413) self.assertEquals(resp.status_int, 413)
def test_COPY_newest(self): def test_COPY_newest(self):
@ -3698,24 +3722,29 @@ class TestObjectController(unittest.TestCase):
def test_COPY_delete_at(self): def test_COPY_delete_at(self):
with save_globals(): with save_globals():
given_headers = {} backend_requests = []
def fake_connect_put_node(nodes, part, path, headers, def capture_requests(ipaddr, port, device, partition, method, path,
logger_thread_locals): headers=None, query_string=None):
given_headers.update(headers) backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(self.app, 'a', controller = proxy_server.ObjectController(self.app, 'a',
'c', 'o') 'c', 'o')
controller._connect_put_node = fake_connect_put_node set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) give_connect=capture_requests)
self.app.memcache.store = {} self.app.memcache.store = {}
req = Request.blank('/v1/a/c/o', req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'}, environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'}) headers={'Destination': '/c/o'})
self.app.update_request(req) self.app.update_request(req)
controller.COPY(req) resp = controller.COPY(req)
self.assertEquals(given_headers.get('X-Delete-At'), '9876543210') self.assertEqual(201, resp.status_int) # sanity
for method, path, given_headers in backend_requests:
if method != 'PUT':
continue
self.assertEquals(given_headers.get('X-Delete-At'),
'9876543210')
self.assertTrue('X-Delete-At-Host' in given_headers) self.assertTrue('X-Delete-At-Host' in given_headers)
self.assertTrue('X-Delete-At-Device' in given_headers) self.assertTrue('X-Delete-At-Device' in given_headers)
self.assertTrue('X-Delete-At-Partition' in given_headers) self.assertTrue('X-Delete-At-Partition' in given_headers)
@ -3723,16 +3752,16 @@ class TestObjectController(unittest.TestCase):
def test_COPY_account_delete_at(self): def test_COPY_account_delete_at(self):
with save_globals(): with save_globals():
given_headers = {} backend_requests = []
def fake_connect_put_node(nodes, part, path, headers, def capture_requests(ipaddr, port, device, partition, method, path,
logger_thread_locals): headers=None, query_string=None):
given_headers.update(headers) backend_requests.append((method, path, headers))
controller = proxy_server.ObjectController(self.app, 'a', controller = proxy_server.ObjectController(self.app, 'a',
'c', 'o') 'c', 'o')
controller._connect_put_node = fake_connect_put_node set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201,
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) give_connect=capture_requests)
self.app.memcache.store = {} self.app.memcache.store = {}
req = Request.blank('/v1/a/c/o', req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'}, environ={'REQUEST_METHOD': 'COPY'},
@ -3740,8 +3769,13 @@ class TestObjectController(unittest.TestCase):
'Destination-Account': 'a1'}) 'Destination-Account': 'a1'})
self.app.update_request(req) self.app.update_request(req)
controller.COPY(req) resp = controller.COPY(req)
self.assertEquals(given_headers.get('X-Delete-At'), '9876543210') self.assertEqual(201, resp.status_int) # sanity
for method, path, given_headers in backend_requests:
if method != 'PUT':
continue
self.assertEquals(given_headers.get('X-Delete-At'),
'9876543210')
self.assertTrue('X-Delete-At-Host' in given_headers) self.assertTrue('X-Delete-At-Host' in given_headers)
self.assertTrue('X-Delete-At-Device' in given_headers) self.assertTrue('X-Delete-At-Device' in given_headers)
self.assertTrue('X-Delete-At-Partition' in given_headers) self.assertTrue('X-Delete-At-Partition' in given_headers)

View File

@ -70,6 +70,9 @@ class FakeServerConnection(WSGIContext):
def send(self, data): def send(self, data):
self.data += data self.data += data
def close(self):
pass
def __call__(self, ipaddr, port, device, partition, method, path, def __call__(self, ipaddr, port, device, partition, method, path,
headers=None, query_string=None): headers=None, query_string=None):
self.path = quote('/' + device + '/' + str(partition) + path) self.path = quote('/' + device + '/' + str(partition) + path)