Allow sending object metadata after data

This lets the proxy server send object metadata to the object server
after the object data. This is necessary for EC, as it allows us to
compute the etag of the object in the proxy server and still store it
with the object.

The wire format is a multipart MIME document. For sanity during a
rolling upgrade, the multipart MIME document is only sent to the
object server if it indicates, via 100 Continue header, that it knows
how to consume it.

Example 1 (new proxy, new obj server):

   proxy: PUT /p/a/c/o
          X-Backend-Obj-Metadata-Footer: yes

     obj: 100 Continue
        X-Obj-Metadata-Footer: yes

   proxy: --MIMEmimeMIMEmime...

Example 2 (new proxy, old obj server):

   proxy: PUT /p/a/c/o
          X-Backend-Obj-Metadata-Footer: yes

     obj: 100 Continue

   proxy: <obj body>

Co-Authored-By: Alistair Coles <alistair.coles@hp.com>
Co-Authored-By: Thiago da Silva <thiago@redhat.com>
Co-Authored-By: John Dickinson <me@not.mn>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com>
Co-Authored-By: Paul Luse <paul.e.luse@intel.com>
Co-Authored-By: Christian Schwede <christian.schwede@enovance.com>
Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com>
Change-Id: Id38f7e93e3473f19ff88123ae0501000ed9b2e89
This commit is contained in:
Samuel Merritt 2014-09-16 18:40:41 -07:00 committed by Clay Gerrard
parent fa89064933
commit b1eda4aef8
9 changed files with 1333 additions and 175 deletions

View File

@ -26,10 +26,12 @@ import time
from contextlib import contextmanager
from urllib import unquote
from swift import gettext_ as _
from swift.common.storage_policy import POLICIES
from swift.common.constraints import FORMAT2CONTENT_TYPE
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
HTTPServiceUnavailable)
from swift.common.utils import split_path, validate_device_partition
from swift.common.wsgi import make_subrequest
@ -82,21 +84,27 @@ def get_listing_content_type(req):
def get_name_and_placement(request, minsegs=1, maxsegs=None,
rest_with_last=False):
"""
Utility function to split and validate the request path and
storage_policy_index. The storage_policy_index is extracted from
the headers of the request and converted to an integer, and then the
args are passed through to :meth:`split_and_validate_path`.
Utility function to split and validate the request path and storage
policy. The storage policy index is extracted from the headers of
the request and converted to a StoragePolicy instance. The
remaining args are passed through to
:meth:`split_and_validate_path`.
:returns: a list, result of :meth:`split_and_validate_path` with
storage_policy_index appended on the end
:raises: HTTPBadRequest
the BaseStoragePolicy instance appended on the end
:raises: HTTPServiceUnavailable if the path is invalid or no policy exists
with the extracted policy_index.
"""
policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0')
policy_idx = int(policy_idx)
policy_index = request.headers.get('X-Backend-Storage-Policy-Index')
policy = POLICIES.get_by_index(policy_index)
if not policy:
raise HTTPServiceUnavailable(
body=_("No policy with index %s") % policy_index,
request=request, content_type='text/plain')
results = split_and_validate_path(request, minsegs=minsegs,
maxsegs=maxsegs,
rest_with_last=rest_with_last)
results.append(policy_idx)
results.append(policy)
return results

View File

@ -36,7 +36,7 @@ needs to change.
"""
from collections import defaultdict
from cStringIO import StringIO
from StringIO import StringIO
import UserDict
import time
from functools import partial
@ -128,6 +128,20 @@ class _UTC(tzinfo):
UTC = _UTC()
class WsgiStringIO(StringIO):
"""
This class adds support for the additional wsgi.input methods defined on
eventlet.wsgi.Input to the StringIO class which would otherwise be a fine
stand-in for the file-like object in the WSGI environment.
"""
def set_hundred_continue_response_headers(self, headers):
pass
def send_hundred_continue_response(self):
pass
def _datetime_property(header):
"""
Set and retrieve the datetime value of self.headers[header]
@ -743,16 +757,16 @@ def _req_environ_property(environ_field):
def _req_body_property():
"""
Set and retrieve the Request.body parameter. It consumes wsgi.input and
returns the results. On assignment, uses a StringIO to create a new
returns the results. On assignment, uses a WsgiStringIO to create a new
wsgi.input.
"""
def getter(self):
body = self.environ['wsgi.input'].read()
self.environ['wsgi.input'] = StringIO(body)
self.environ['wsgi.input'] = WsgiStringIO(body)
return body
def setter(self, value):
self.environ['wsgi.input'] = StringIO(value)
self.environ['wsgi.input'] = WsgiStringIO(value)
self.environ['CONTENT_LENGTH'] = str(len(value))
return property(getter, setter, doc="Get and set the request body str")
@ -820,7 +834,7 @@ class Request(object):
:param path: encoded, parsed, and unquoted into PATH_INFO
:param environ: WSGI environ dictionary
:param headers: HTTP headers
:param body: stuffed in a StringIO and hung on wsgi.input
:param body: stuffed in a WsgiStringIO and hung on wsgi.input
:param kwargs: any environ key with an property setter
"""
headers = headers or {}
@ -855,10 +869,10 @@ class Request(object):
}
env.update(environ)
if body is not None:
env['wsgi.input'] = StringIO(body)
env['wsgi.input'] = WsgiStringIO(body)
env['CONTENT_LENGTH'] = str(len(body))
elif 'wsgi.input' not in env:
env['wsgi.input'] = StringIO('')
env['wsgi.input'] = WsgiStringIO('')
req = Request(env)
for key, val in headers.iteritems():
req.headers[key] = val
@ -965,7 +979,7 @@ class Request(object):
env.update({
'REQUEST_METHOD': 'GET',
'CONTENT_LENGTH': '0',
'wsgi.input': StringIO(''),
'wsgi.input': WsgiStringIO(''),
})
return Request(env)
@ -1102,10 +1116,12 @@ class Response(object):
app_iter = _resp_app_iter_property()
def __init__(self, body=None, status=200, headers=None, app_iter=None,
request=None, conditional_response=False, **kw):
request=None, conditional_response=False,
conditional_etag=None, **kw):
self.headers = HeaderKeyDict(
[('Content-Type', 'text/html; charset=UTF-8')])
self.conditional_response = conditional_response
self._conditional_etag = conditional_etag
self.request = request
self.body = body
self.app_iter = app_iter
@ -1131,6 +1147,26 @@ class Response(object):
if 'charset' in kw and 'content_type' in kw:
self.charset = kw['charset']
@property
def conditional_etag(self):
"""
The conditional_etag keyword argument for Response will allow the
conditional match value of a If-Match request to be compared to a
non-standard value.
This is available for Storage Policies that do not store the client
object data verbatim on the storage nodes, but still need to support
conditional requests.
It's most effectively used with X-Backend-Etag-Is-At, which defines the
additional metadata key where the original ETag of the clear-form
client request data may be found.
"""
if self._conditional_etag is not None:
return self._conditional_etag
else:
return self.etag
def _prepare_for_ranges(self, ranges):
"""
Prepare the Response for multiple ranges.
@ -1161,15 +1197,16 @@ class Response(object):
return content_size, content_type
def _response_iter(self, app_iter, body):
etag = self.conditional_etag
if self.conditional_response and self.request:
if self.etag and self.request.if_none_match and \
self.etag in self.request.if_none_match:
if etag and self.request.if_none_match and \
etag in self.request.if_none_match:
self.status = 304
self.content_length = 0
return ['']
if self.etag and self.request.if_match and \
self.etag not in self.request.if_match:
if etag and self.request.if_match and \
etag not in self.request.if_match:
self.status = 412
self.content_length = 0
return ['']

View File

@ -16,10 +16,12 @@
""" Object Server for Swift """
import cPickle as pickle
import json
import os
import multiprocessing
import time
import traceback
import rfc822
import socket
import math
from swift import gettext_ as _
@ -30,7 +32,7 @@ from eventlet import sleep, wsgi, Timeout
from swift.common.utils import public, get_logger, \
config_true_value, timing_stats, replication, \
normalize_delete_at_timestamp, get_log_line, Timestamp, \
get_expirer_container
get_expirer_container, iter_multipart_mime_documents
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, \
valid_timestamp, check_utf8
@ -48,8 +50,35 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
HTTPConflict
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
HTTPConflict, HTTPServerError
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileRouter
def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
mime_documents_iter = iter_multipart_mime_documents(
wsgi_input, mime_boundary, read_chunk_size)
for file_like in mime_documents_iter:
hdrs = HeaderKeyDict(rfc822.Message(file_like, 0))
yield (hdrs, file_like)
def drain(file_like, read_size, timeout):
"""
Read and discard any bytes from file_like.
:param file_like: file-like object to read from
:param read_size: how big a chunk to read at a time
:param timeout: how long to wait for a read (use None for no timeout)
:raises ChunkReadTimeout: if no chunk was read in time
"""
while True:
with ChunkReadTimeout(timeout):
chunk = file_like.read(read_size)
if not chunk:
break
class EventletPlungerString(str):
@ -142,7 +171,7 @@ class ObjectController(BaseStorageServer):
# Common on-disk hierarchy shared across account, container and object
# servers.
self._diskfile_mgr = DiskFileManager(conf, self.logger)
self._diskfile_router = DiskFileRouter(conf, self.logger)
# This is populated by global_conf_callback way below as the semaphore
# is shared by all workers.
if 'replication_semaphore' in conf:
@ -156,7 +185,7 @@ class ObjectController(BaseStorageServer):
conf.get('replication_failure_ratio') or 1.0)
def get_diskfile(self, device, partition, account, container, obj,
policy_idx, **kwargs):
policy, **kwargs):
"""
Utility method for instantiating a DiskFile object supporting a given
REST API.
@ -165,11 +194,11 @@ class ObjectController(BaseStorageServer):
DiskFile class would simply over-ride this method to provide that
behavior.
"""
return self._diskfile_mgr.get_diskfile(
device, partition, account, container, obj, policy_idx, **kwargs)
return self._diskfile_router[policy].get_diskfile(
device, partition, account, container, obj, policy, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice, policy_index):
contdevice, headers_out, objdevice, policy):
"""
Sends or saves an async update.
@ -183,7 +212,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
:param policy_index: the associated storage policy index
:param policy: the associated BaseStoragePolicy instance
"""
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
@ -213,12 +242,11 @@ class ObjectController(BaseStorageServer):
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
timestamp = headers_out['x-timestamp']
self._diskfile_mgr.pickle_async_update(objdevice, account, container,
obj, data, timestamp,
policy_index)
self._diskfile_router[policy].pickle_async_update(
objdevice, account, container, obj, data, timestamp, policy)
def container_update(self, op, account, container, obj, request,
headers_out, objdevice, policy_idx):
headers_out, objdevice, policy):
"""
Update the container when objects are updated.
@ -230,6 +258,7 @@ class ObjectController(BaseStorageServer):
:param headers_out: dictionary of headers to send in the container
request(s)
:param objdevice: device name that the object is in
:param policy: the BaseStoragePolicy instance
"""
headers_in = request.headers
conthosts = [h.strip() for h in
@ -255,14 +284,14 @@ class ObjectController(BaseStorageServer):
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
headers_out['X-Backend-Storage-Policy-Index'] = policy_idx
headers_out['X-Backend-Storage-Policy-Index'] = int(policy)
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
objdevice, policy_idx)
objdevice, policy)
def delete_at_update(self, op, delete_at, account, container, obj,
request, objdevice, policy_index):
request, objdevice, policy):
"""
Update the expiring objects container when objects are updated.
@ -273,7 +302,7 @@ class ObjectController(BaseStorageServer):
:param obj: object name
:param request: the original request driving the update
:param objdevice: device name that the object is in
:param policy_index: the policy index to be used for tmp dir
:param policy: the BaseStoragePolicy instance (used for tmp dir)
"""
if config_true_value(
request.headers.get('x-backend-replication', 'f')):
@ -333,13 +362,66 @@ class ObjectController(BaseStorageServer):
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice,
policy_index)
policy)
def _make_timeout_reader(self, file_like):
def timeout_reader():
with ChunkReadTimeout(self.client_timeout):
return file_like.read(self.network_chunk_size)
return timeout_reader
def _read_put_commit_message(self, mime_documents_iter):
rcvd_commit = False
try:
with ChunkReadTimeout(self.client_timeout):
commit_hdrs, commit_iter = next(mime_documents_iter)
if commit_hdrs.get('X-Document', None) == "put commit":
rcvd_commit = True
drain(commit_iter, self.network_chunk_size, self.client_timeout)
except ChunkReadTimeout:
raise HTTPClientDisconnect()
except StopIteration:
raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
return rcvd_commit
def _read_metadata_footer(self, mime_documents_iter):
try:
with ChunkReadTimeout(self.client_timeout):
footer_hdrs, footer_iter = next(mime_documents_iter)
except ChunkReadTimeout:
raise HTTPClientDisconnect()
except StopIteration:
raise HTTPBadRequest(body="couldn't find footer MIME doc")
timeout_reader = self._make_timeout_reader(footer_iter)
try:
footer_body = ''.join(iter(timeout_reader, ''))
except ChunkReadTimeout:
raise HTTPClientDisconnect()
footer_md5 = footer_hdrs.get('Content-MD5')
if not footer_md5:
raise HTTPBadRequest(body="no Content-MD5 in footer")
if footer_md5 != md5(footer_body).hexdigest():
raise HTTPUnprocessableEntity(body="footer MD5 mismatch")
try:
return HeaderKeyDict(json.loads(footer_body))
except ValueError:
raise HTTPBadRequest("invalid JSON for footer doc")
def _check_container_override(self, update_headers, metadata):
for key, val in metadata.iteritems():
override_prefix = 'x-backend-container-update-override-'
if key.lower().startswith(override_prefix):
override = key.lower().replace(override_prefix, 'x-')
update_headers[override] = val
@public
@timing_stats()
def POST(self, request):
"""Handle HTTP POST requests for the Swift Object Server."""
device, partition, account, container, obj, policy_idx = \
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
@ -349,7 +431,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx)
policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -374,11 +456,11 @@ class ObjectController(BaseStorageServer):
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update('PUT', new_delete_at, account, container,
obj, request, device, policy_idx)
obj, request, device, policy)
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
policy_idx)
policy)
try:
disk_file.write_metadata(metadata)
except (DiskFileXattrNotSupported, DiskFileNoSpace):
@ -389,7 +471,7 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj, policy_idx = \
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
error_response = check_object_creation(request, obj)
@ -404,10 +486,22 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# In case of multipart-MIME put, the proxy sends a chunked request,
# but may let us know the real content length so we can verify that
# we have enough disk space to hold the object.
if fsize is None:
fsize = request.headers.get('X-Backend-Obj-Content-Length')
if fsize is not None:
try:
fsize = int(fsize)
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx)
policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -439,13 +533,51 @@ class ObjectController(BaseStorageServer):
with disk_file.create(size=fsize) as writer:
upload_size = 0
def timeout_reader():
with ChunkReadTimeout(self.client_timeout):
return request.environ['wsgi.input'].read(
self.network_chunk_size)
# If the proxy wants to send us object metadata after the
# object body, it sets some headers. We have to tell the
# proxy, in the 100 Continue response, that we're able to
# parse a multipart MIME document and extract the object and
# metadata from it. If we don't, then the proxy won't
# actually send the footer metadata.
have_metadata_footer = False
use_multiphase_commit = False
mime_documents_iter = iter([])
obj_input = request.environ['wsgi.input']
hundred_continue_headers = []
if config_true_value(
request.headers.get(
'X-Backend-Obj-Multiphase-Commit')):
use_multiphase_commit = True
hundred_continue_headers.append(
('X-Obj-Multiphase-Commit', 'yes'))
if config_true_value(
request.headers.get('X-Backend-Obj-Metadata-Footer')):
have_metadata_footer = True
hundred_continue_headers.append(
('X-Obj-Metadata-Footer', 'yes'))
if have_metadata_footer or use_multiphase_commit:
obj_input.set_hundred_continue_response_headers(
hundred_continue_headers)
mime_boundary = request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary')
if not mime_boundary:
return HTTPBadRequest("no MIME boundary")
try:
for chunk in iter(lambda: timeout_reader(), ''):
with ChunkReadTimeout(self.client_timeout):
mime_documents_iter = iter_mime_headers_and_bodies(
request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
timeout_reader = self._make_timeout_reader(obj_input)
try:
for chunk in iter(timeout_reader, ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
@ -461,9 +593,16 @@ class ObjectController(BaseStorageServer):
upload_size)
if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request)
footer_meta = {}
if have_metadata_footer:
footer_meta = self._read_metadata_footer(
mime_documents_iter)
request_etag = (footer_meta.get('etag') or
request.headers.get('etag', '')).lower()
etag = etag.hexdigest()
if 'etag' in request.headers and \
request.headers['etag'].lower() != etag:
if request_etag and request_etag != etag:
return HTTPUnprocessableEntity(request=request)
metadata = {
'X-Timestamp': request.timestamp.internal,
@ -473,6 +612,8 @@ class ObjectController(BaseStorageServer):
}
metadata.update(val for val in request.headers.iteritems()
if is_sys_or_user_meta('object', val[0]))
metadata.update(val for val in footer_meta.iteritems()
if is_sys_or_user_meta('object', val[0]))
headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
@ -482,39 +623,63 @@ class ObjectController(BaseStorageServer):
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
writer.put(metadata)
# if the PUT requires a two-phase commit (a data and a commit
# phase) send the proxy server another 100-continue response
# to indicate that we are finished writing object data
if use_multiphase_commit:
request.environ['wsgi.input'].\
send_hundred_continue_response()
if not self._read_put_commit_message(mime_documents_iter):
return HTTPServerError(request=request)
# got 2nd phase confirmation, write a timestamp.durable
# state file to indicate a successful PUT
writer.commit(request.timestamp)
# Drain any remaining MIME docs from the socket. There
# shouldn't be any, but we must read the whole request body.
try:
while True:
with ChunkReadTimeout(self.client_timeout):
_junk_hdrs, _junk_body = next(mime_documents_iter)
drain(_junk_body, self.network_chunk_size,
self.client_timeout)
except ChunkReadTimeout:
raise HTTPClientDisconnect()
except StopIteration:
pass
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update(
'PUT', new_delete_at, account, container, obj, request,
device, policy_idx)
device, policy)
if orig_delete_at:
self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj,
request, device, policy_idx)
request, device, policy)
update_headers = HeaderKeyDict({
'x-size': metadata['Content-Length'],
'x-content-type': metadata['Content-Type'],
'x-timestamp': metadata['X-Timestamp'],
'x-etag': metadata['ETag']})
# apply any container update header overrides sent with request
for key, val in request.headers.iteritems():
override_prefix = 'x-backend-container-update-override-'
if key.lower().startswith(override_prefix):
override = key.lower().replace(override_prefix, 'x-')
update_headers[override] = val
self._check_container_override(update_headers, request.headers)
self._check_container_override(update_headers, footer_meta)
self.container_update(
'PUT', account, container, obj, request,
update_headers,
device, policy_idx)
device, policy)
return HTTPCreated(request=request, etag=etag)
@public
@timing_stats()
def GET(self, request):
"""Handle HTTP GET requests for the Swift Object Server."""
device, partition, account, container, obj, policy_idx = \
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
keep_cache = self.keep_cache_private or (
'X-Auth-Token' not in request.headers and
@ -522,7 +687,7 @@ class ObjectController(BaseStorageServer):
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx)
policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -533,9 +698,14 @@ class ObjectController(BaseStorageServer):
keep_cache = (self.keep_cache_private or
('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers))
conditional_etag = None
if 'X-Backend-Etag-Is-At' in request.headers:
conditional_etag = metadata.get(
request.headers['X-Backend-Etag-Is-At'])
response = Response(
app_iter=disk_file.reader(keep_cache=keep_cache),
request=request, conditional_response=True)
request=request, conditional_response=True,
conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@ -567,12 +737,12 @@ class ObjectController(BaseStorageServer):
@timing_stats(sample_rate=0.8)
def HEAD(self, request):
"""Handle HTTP HEAD requests for the Swift Object Server."""
device, partition, account, container, obj, policy_idx = \
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx)
policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -585,7 +755,12 @@ class ObjectController(BaseStorageServer):
headers['X-Backend-Timestamp'] = e.timestamp.internal
return HTTPNotFound(request=request, headers=headers,
conditional_response=True)
response = Response(request=request, conditional_response=True)
conditional_etag = None
if 'X-Backend-Etag-Is-At' in request.headers:
conditional_etag = metadata.get(
request.headers['X-Backend-Etag-Is-At'])
response = Response(request=request, conditional_response=True,
conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@ -609,13 +784,13 @@ class ObjectController(BaseStorageServer):
@timing_stats()
def DELETE(self, request):
"""Handle HTTP DELETE requests for the Swift Object Server."""
device, partition, account, container, obj, policy_idx = \
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx)
policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -667,13 +842,13 @@ class ObjectController(BaseStorageServer):
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device,
policy_idx)
policy)
if orig_timestamp < req_timestamp:
disk_file.delete(req_timestamp)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp.internal}),
device, policy_idx)
device, policy)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
@ -694,7 +869,7 @@ class ObjectController(BaseStorageServer):
get_name_and_placement(request, 2, 3, True)
suffixes = suffix_parts.split('-') if suffix_parts else []
try:
hashes = self._diskfile_mgr.get_hashes(
hashes = self._diskfile_router[policy].get_hashes(
device, partition, suffixes, policy)
except DiskFileDeviceUnavailable:
resp = HTTPInsufficientStorage(drive=device, request=request)

View File

@ -24,6 +24,7 @@ from swift.common import exceptions
from swift.common import http
from swift.common import swob
from swift.common import utils
from swift.common import request_helpers
class Receiver(object):
@ -98,7 +99,7 @@ class Receiver(object):
if not self.app.replication_semaphore.acquire(False):
raise swob.HTTPServiceUnavailable()
try:
with self.app._diskfile_mgr.replication_lock(self.device):
with self.diskfile_mgr.replication_lock(self.device):
for data in self.missing_check():
yield data
for data in self.updates():
@ -166,14 +167,14 @@ class Receiver(object):
"""
# The following is the setting we talk about above in _ensure_flush.
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition = utils.split_path(
urllib.unquote(self.request.path), 2, 2, False)
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
self.policy_idx = \
int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
utils.validate_device_partition(self.device, self.partition)
if self.app._diskfile_mgr.mount_check and \
not constraints.check_mount(
self.app._diskfile_mgr.devices, self.device):
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if self.diskfile_mgr.mount_check and not constraints.check_mount(
self.diskfile_mgr.devices, self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
for data in self._ensure_flush():
@ -229,8 +230,8 @@ class Receiver(object):
object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
want = False
try:
df = self.app._diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy_idx)
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy)
except exceptions.DiskFileNotExist:
want = True
else:

View File

@ -882,6 +882,9 @@ class ObjectController(Controller):
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
# XXX hack for PUT to EC until the proxy learns how to encode
req.headers['X-Object-Sysmeta-Ec-Archive-Index'] = 0
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,

View File

@ -16,10 +16,13 @@
"""Tests for swift.common.request_helpers"""
import unittest
from swift.common.swob import Request
from swift.common.swob import Request, HTTPException
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
remove_items, copy_header_subset
remove_items, copy_header_subset, get_name_and_placement
from test.unit import patch_policies
server_types = ['account', 'container', 'object']
@ -81,3 +84,77 @@ class TestRequestHelpers(unittest.TestCase):
self.assertEqual(to_req.headers['A'], 'b')
self.assertFalse('c' in to_req.headers)
self.assertFalse('C' in to_req.headers)
@patch_policies(with_ec_default=True)
def test_get_name_and_placement_object_req(self):
path = '/device/part/account/container/object'
req = Request.blank(path, headers={
'X-Backend-Storage-Policy-Index': '0'})
device, part, account, container, obj, policy = \
get_name_and_placement(req, 5, 5, True)
self.assertEqual(device, 'device')
self.assertEqual(part, 'part')
self.assertEqual(account, 'account')
self.assertEqual(container, 'container')
self.assertEqual(obj, 'object')
self.assertEqual(policy, POLICIES[0])
self.assertEqual(policy.policy_type, EC_POLICY)
req.headers['X-Backend-Storage-Policy-Index'] = 1
device, part, account, container, obj, policy = \
get_name_and_placement(req, 5, 5, True)
self.assertEqual(device, 'device')
self.assertEqual(part, 'part')
self.assertEqual(account, 'account')
self.assertEqual(container, 'container')
self.assertEqual(obj, 'object')
self.assertEqual(policy, POLICIES[1])
self.assertEqual(policy.policy_type, REPL_POLICY)
req.headers['X-Backend-Storage-Policy-Index'] = 'foo'
try:
device, part, account, container, obj, policy = \
get_name_and_placement(req, 5, 5, True)
except HTTPException as e:
self.assertEqual(e.status_int, 503)
self.assertEqual(str(e), '503 Service Unavailable')
self.assertEqual(e.body, "No policy with index foo")
else:
self.fail('get_name_and_placement did not raise error '
'for invalid storage policy index')
@patch_policies(with_ec_default=True)
def test_get_name_and_placement_object_replication(self):
    # Replication-style paths carry the suffixes '-'.joined as the third
    # path segment (or omit the segment entirely); the policy still comes
    # from the X-Backend-Storage-Policy-Index header.
    cases = [
        # (path, policy index, expected suffixes, expected policy type)
        ('/device/part/012-345-678-9ab-cde', 0,
         '012-345-678-9ab-cde', EC_POLICY),
        # no suffix segment at all -> None (false-y)
        ('/device/part', 1, None, REPL_POLICY),
        # trailing slash -> empty string (still false-y)
        ('/device/part/', 1, '', REPL_POLICY),
    ]
    for path, index, expected_suffixes, expected_type in cases:
        req = Request.blank(path, headers={
            'X-Backend-Storage-Policy-Index': str(index)})
        device, partition, suffix_parts, policy = \
            get_name_and_placement(req, 2, 3, True)
        self.assertEqual(device, 'device')
        self.assertEqual(partition, 'part')
        self.assertEqual(suffix_parts, expected_suffixes)
        self.assertEqual(policy, POLICIES[index])
        self.assertEqual(policy.policy_type, expected_type)

View File

@ -1553,6 +1553,17 @@ class TestConditionalIfMatch(unittest.TestCase):
self.assertEquals(resp.status_int, 200)
self.assertEquals(body, 'hi')
def test_simple_conditional_etag_match(self):
    # When the request's If-Match value equals the response's private
    # conditional etag, the conditional machinery lets the response
    # through unchanged: 200 with the app's body.
    request = swift.common.swob.Request.blank(
        '/', headers={'If-Match': 'not-the-etag'})
    response = request.get_response(self.fake_app)
    response.conditional_response = True
    response._conditional_etag = 'not-the-etag'
    chunks = response(request.environ, self.fake_start_response)
    self.assertEqual(''.join(chunks), 'hi')
    self.assertEqual(response.status_int, 200)
def test_quoted_simple_match(self):
# double quotes or not, doesn't matter
req = swift.common.swob.Request.blank(
@ -1573,6 +1584,16 @@ class TestConditionalIfMatch(unittest.TestCase):
self.assertEquals(resp.status_int, 412)
self.assertEquals(body, '')
def test_simple_conditional_etag_no_match(self):
    # A mismatch between If-Match and the response's private conditional
    # etag converts the response into a 412 with an empty body.
    request = swift.common.swob.Request.blank(
        '/', headers={'If-Match': 'the-etag'})
    response = request.get_response(self.fake_app)
    response.conditional_response = True
    response._conditional_etag = 'not-the-etag'
    chunks = response(request.environ, self.fake_start_response)
    self.assertEqual(''.join(chunks), '')
    self.assertEqual(response.status_int, 412)
def test_match_star(self):
# "*" means match anything; see RFC 2616 section 14.24
req = swift.common.swob.Request.blank(

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@ from swift.common import constraints
from swift.common import exceptions
from swift.common import swob
from swift.common import utils
from swift.common.storage_policy import POLICIES
from swift.obj import diskfile
from swift.obj import server
from swift.obj import ssync_receiver
@ -34,6 +35,7 @@ from swift.obj import ssync_receiver
from test import unit
@unit.patch_policies()
class TestReceiver(unittest.TestCase):
def setUp(self):
@ -46,12 +48,12 @@ class TestReceiver(unittest.TestCase):
self.testdir = os.path.join(
tempfile.mkdtemp(), 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {
self.conf = {
'devices': self.testdir,
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.controller = server.ObjectController(conf)
self.controller = server.ObjectController(self.conf)
self.controller.bytes_per_sync = 1
self.account1 = 'a'
@ -111,8 +113,8 @@ class TestReceiver(unittest.TestCase):
def test_REPLICATION_calls_replication_lock(self):
with mock.patch.object(
self.controller._diskfile_mgr, 'replication_lock') as \
mocked_replication_lock:
self.controller._diskfile_router[POLICIES.legacy],
'replication_lock') as mocked_replication_lock:
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'REPLICATION'},
@ -140,9 +142,8 @@ class TestReceiver(unittest.TestCase):
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy_idx, 0)
self.assertEqual(rcvr.policy, POLICIES[0])
@unit.patch_policies()
def test_Receiver_with_storage_policy_index_header(self):
req = swob.Request.blank(
'/sda1/1',
@ -157,15 +158,30 @@ class TestReceiver(unittest.TestCase):
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy_idx, 1)
self.assertEqual(rcvr.policy, POLICIES[1])
def test_Receiver_with_bad_storage_policy_index_header(self):
    # One past the highest configured policy index is guaranteed not to
    # exist; the receiver should answer with a 503 error line rather
    # than proceeding with the sync protocol.
    bad_index = max(int(policy) for policy in POLICIES) + 1
    req = swob.Request.blank(
        '/sda1/1',
        environ={'REQUEST_METHOD': 'SSYNC',
                 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index},
        body=':MISSING_CHECK: START\r\n'
             ':MISSING_CHECK: END\r\n'
             ':UPDATES: START\r\n:UPDATES: END\r\n')
    self.controller.logger = mock.MagicMock()
    receiver = ssync_receiver.Receiver(self.controller, req)
    body_lines = [line.strip() for line in receiver() if line.strip()]
    self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"])
def test_REPLICATION_replication_lock_fail(self):
def _mock(path):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
eventlet.sleep(0.05)
with mock.patch.object(
self.controller._diskfile_mgr, 'replication_lock', _mock):
self.controller._diskfile_mgr
self.controller._diskfile_router[POLICIES.legacy],
'replication_lock', _mock):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
@ -190,7 +206,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[":ERROR: 0 'Invalid path: /device'"])
[":ERROR: 400 'Invalid path: /device'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@ -203,7 +219,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[":ERROR: 0 'Invalid path: /device/'"])
[":ERROR: 400 'Invalid path: /device/'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@ -230,7 +246,7 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[":ERROR: 0 'Invalid path: /device/partition/junk'"])
[":ERROR: 400 'Invalid path: /device/partition/junk'"])
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
@ -240,7 +256,8 @@ class TestReceiver(unittest.TestCase):
mock.patch.object(
self.controller, 'replication_semaphore'),
mock.patch.object(
self.controller._diskfile_mgr, 'mount_check', False),
self.controller._diskfile_router[POLICIES.legacy],
'mount_check', False),
mock.patch.object(
constraints, 'check_mount', return_value=False)) as (
mocked_replication_semaphore,
@ -259,7 +276,8 @@ class TestReceiver(unittest.TestCase):
mock.patch.object(
self.controller, 'replication_semaphore'),
mock.patch.object(
self.controller._diskfile_mgr, 'mount_check', True),
self.controller._diskfile_router[POLICIES.legacy],
'mount_check', True),
mock.patch.object(
constraints, 'check_mount', return_value=False)) as (
mocked_replication_semaphore,
@ -275,7 +293,8 @@ class TestReceiver(unittest.TestCase):
"device</p></html>'"])
self.assertEqual(resp.status_int, 200)
mocked_check_mount.assert_called_once_with(
self.controller._diskfile_mgr.devices, 'device')
self.controller._diskfile_router[POLICIES.legacy].devices,
'device')
mocked_check_mount.reset_mock()
mocked_check_mount.return_value = True
@ -287,7 +306,8 @@ class TestReceiver(unittest.TestCase):
[':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"'])
self.assertEqual(resp.status_int, 200)
mocked_check_mount.assert_called_once_with(
self.controller._diskfile_mgr.devices, 'device')
self.controller._diskfile_router[POLICIES.legacy].devices,
'device')
def test_REPLICATION_Exception(self):
@ -486,7 +506,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_exact(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
@ -515,10 +536,10 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
@unit.patch_policies
def test_MISSING_CHECK_storage_policy(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(1)),
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[1])),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
@ -550,7 +571,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_newer(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1)
@ -583,7 +605,8 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_older(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
'1', self.hash1)
utils.mkdirs(object_dir)
older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1)
@ -1072,7 +1095,6 @@ class TestReceiver(unittest.TestCase):
'content-encoding specialty-header')})
self.assertEqual(req.read_body, '1')
@unit.patch_policies()
def test_UPDATES_with_storage_policy(self):
_PUT_request = [None]