versioned writes middleware

Rewrite object versioning as middleware to simplify the PUT method
in the object controller.

The functionality remains basically the
same with the only major difference being the ability to now
version slo manifest files. dlo manifests are still not
supported as part of this patch.

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

DocImpact
Change-Id: Ie899290b3312e201979eafefb253d1a60b65b837
Signed-off-by: Thiago da Silva <thiago@redhat.com>
Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Thiago da Silva 2014-11-09 13:13:27 -05:00
parent 9f1787b653
commit 035a411660
22 changed files with 1816 additions and 980 deletions

View File

@ -9,7 +9,6 @@ user = <your-user-name>
log_facility = LOG_LOCAL2
recon_cache_path = /var/cache/swift
eventlet_debug = true
allow_versions = true
[pipeline:main]
pipeline = recon container-server

View File

@ -9,7 +9,6 @@ user = <your-user-name>
log_facility = LOG_LOCAL3
recon_cache_path = /var/cache/swift2
eventlet_debug = true
allow_versions = true
[pipeline:main]
pipeline = recon container-server

View File

@ -9,7 +9,6 @@ user = <your-user-name>
log_facility = LOG_LOCAL4
recon_cache_path = /var/cache/swift3
eventlet_debug = true
allow_versions = true
[pipeline:main]
pipeline = recon container-server

View File

@ -9,7 +9,6 @@ user = <your-user-name>
log_facility = LOG_LOCAL5
recon_cache_path = /var/cache/swift4
eventlet_debug = true
allow_versions = true
[pipeline:main]
pipeline = recon container-server

View File

@ -9,7 +9,7 @@ eventlet_debug = true
[pipeline:main]
# Yes, proxy-logging appears twice. This is so that
# middleware-originated requests get logged too.
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache bulk tempurl ratelimit crossdomain tempauth staticweb container-quotas account-quotas slo dlo proxy-logging proxy-server
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache bulk tempurl ratelimit crossdomain tempauth staticweb container-quotas account-quotas slo dlo versioned_writes proxy-logging proxy-server
[filter:catch_errors]
use = egg:swift#catch_errors
@ -60,6 +60,10 @@ use = egg:swift#memcache
[filter:gatekeeper]
use = egg:swift#gatekeeper
[filter:versioned_writes]
use = egg:swift#versioned_writes
allow_versioned_writes = true
[app:proxy-server]
use = egg:swift#proxy
allow_account_management = true

View File

@ -102,6 +102,7 @@ DLO :ref:`dynamic-large-objects`
LE :ref:`list_endpoints`
KS :ref:`keystoneauth`
RL :ref:`ratelimit`
VW :ref:`versioned_writes`
======================= =============================

View File

@ -155,6 +155,15 @@ Name Check (Forbidden Character Filter)
:members:
:show-inheritance:
.. _versioned_writes:
Object Versioning
=================
.. automodule:: swift.common.middleware.versioned_writes
:members:
:show-inheritance:
Proxy Logging
=============

View File

@ -1,89 +1,6 @@
=================
Object Versioning
=================
--------
Overview
--------
Object versioning in swift is implemented by setting a flag on the container
to tell swift to version all objects in the container. The flag is the
``X-Versions-Location`` header on the container, and its value is the
container where the versions are stored. It is recommended to use a different
``X-Versions-Location`` container for each container that is being versioned.
When data is ``PUT`` into a versioned container (a container with the
versioning flag turned on), the existing data in the file is redirected to a
new object and the data in the ``PUT`` request is saved as the data for the
versioned object. The new object name (for the previous version) is
``<versions_container>/<length><object_name>/<timestamp>``, where ``length``
is the 3-character zero-padded hexadecimal length of the ``<object_name>`` and
``<timestamp>`` is the timestamp of when the previous version was created.
A ``GET`` to a versioned object will return the current version of the object
without having to do any request redirects or metadata lookups.
A ``POST`` to a versioned object will update the object metadata as normal,
but will not create a new version of the object. In other words, new versions
are only created when the content of the object changes.
A ``DELETE`` to a versioned object will only remove the current version of the
object. If you have 5 total versions of the object, you must delete the
object 5 times to completely remove the object.
Note: A large object manifest file cannot be versioned, but a large object
manifest may point to versioned segments.
--------------------------------------------------
How to Enable Object Versioning in a Swift Cluster
--------------------------------------------------
Set ``allow_versions`` to ``True`` in the container server config.
-----------------------
Examples Using ``curl``
-----------------------
First, create a container with the ``X-Versions-Location`` header or add the
header to an existing container. Also make sure the container referenced by
the ``X-Versions-Location`` exists. In this example, the name of that
container is "versions"::
curl -i -XPUT -H "X-Auth-Token: <token>" \
-H "X-Versions-Location: versions" http://<storage_url>/container
curl -i -XPUT -H "X-Auth-Token: <token>" http://<storage_url>/versions
Create an object (the first version)::
curl -i -XPUT --data-binary 1 -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
Now create a new version of that object::
curl -i -XPUT --data-binary 2 -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
See a listing of the older versions of the object::
curl -i -H "X-Auth-Token: <token>" \
http://<storage_url>/versions?prefix=008myobject/
Now delete the current version of the object and see that the older version is
gone::
curl -i -XDELETE -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
curl -i -H "X-Auth-Token: <token>" \
http://<storage_url>/versions?prefix=008myobject/
---------------------------------------------------
How to Disable Object Versioning in a Swift Cluster
---------------------------------------------------
If you want to disable all functionality, set ``allow_versions`` back to
``False`` in the container server config.
Disable versioning a versioned container (x is any value except empty)::
curl -i -XPOST -H "X-Auth-Token: <token>" \
-H "X-Remove-Versions-Location: x" http://<storage_url>/container
.. automodule:: swift.common.middleware.versioned_writes
:members:
:show-inheritance:

View File

@ -77,7 +77,7 @@ bind_port = 8080
# eventlet_debug = false
[pipeline:main]
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache container_sync bulk tempurl ratelimit tempauth container-quotas account-quotas slo dlo proxy-logging proxy-server
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache container_sync bulk tempurl ratelimit tempauth container-quotas account-quotas slo dlo versioned_writes proxy-logging proxy-server
[app:proxy-server]
use = egg:swift#proxy
@ -703,3 +703,14 @@ use = egg:swift#xprofile
#
# unwind the iterator of applications
# unwind = false
# Note: Put after slo, dlo in the pipeline.
# If you don't put it in the pipeline, it will be inserted automatically.
[filter:versioned_writes]
use = egg:swift#versioned_writes
# Enables using versioned writes middleware and exposing configuration
# settings via HTTP GET /info.
# WARNING: Setting this option bypasses the "allow_versions" option
# in the container configuration file, which will be eventually
# deprecated. See documentation for more details.
# allow_versioned_writes = false

View File

@ -95,6 +95,7 @@ paste.filter_factory =
gatekeeper = swift.common.middleware.gatekeeper:filter_factory
container_sync = swift.common.middleware.container_sync:filter_factory
xprofile = swift.common.middleware.xprofile:filter_factory
versioned_writes = swift.common.middleware.versioned_writes:filter_factory
[build_sphinx]
all_files = 1

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import os
import urllib
import time
@ -406,28 +407,33 @@ def check_destination_header(req):
'<container name>/<object name>')
def check_account_format(req, account):
def check_name_format(req, name, target_type):
"""
Validate that the header contains valid account name.
We assume the caller ensures that
destination header is present in req.headers.
Validate that the header contains valid account or container name.
:param req: HTTP request object
:returns: A properly encoded account name
:param name: header value to validate
:param target_type: which header is being validated (Account or Container)
:returns: A properly encoded account name or container name
:raise: HTTPPreconditionFailed if account header
is not well formatted.
"""
if not account:
if not name:
raise HTTPPreconditionFailed(
request=req,
body='Account name cannot be empty')
if isinstance(account, unicode):
account = account.encode('utf-8')
if '/' in account:
body='%s name cannot be empty' % target_type)
if isinstance(name, unicode):
name = name.encode('utf-8')
if '/' in name:
raise HTTPPreconditionFailed(
request=req,
body='Account name cannot contain slashes')
return account
body='%s name cannot contain slashes' % target_type)
return name
check_account_format = functools.partial(check_name_format,
target_type='Account')
check_container_format = functools.partial(check_name_format,
target_type='Container')
def valid_api_version(version):

View File

@ -0,0 +1,490 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Object versioning in swift is implemented by setting a flag on the container
to tell swift to version all objects in the container. The flag is the
``X-Versions-Location`` header on the container, and its value is the
container where the versions are stored. It is recommended to use a different
``X-Versions-Location`` container for each container that is being versioned.
When data is ``PUT`` into a versioned container (a container with the
versioning flag turned on), the existing data in the file is redirected to a
new object and the data in the ``PUT`` request is saved as the data for the
versioned object. The new object name (for the previous version) is
``<versions_container>/<length><object_name>/<timestamp>``, where ``length``
is the 3-character zero-padded hexadecimal length of the ``<object_name>`` and
``<timestamp>`` is the timestamp of when the previous version was created.
A ``GET`` to a versioned object will return the current version of the object
without having to do any request redirects or metadata lookups.
A ``POST`` to a versioned object will update the object metadata as normal,
but will not create a new version of the object. In other words, new versions
are only created when the content of the object changes.
A ``DELETE`` to a versioned object will only remove the current version of the
object. If you have 5 total versions of the object, you must delete the
object 5 times to completely remove the object.
--------------------------------------------------
How to Enable Object Versioning in a Swift Cluster
--------------------------------------------------
This middleware was written as an effort to refactor parts of the proxy server,
so this functionality was already available in previous releases and every
attempt was made to maintain backwards compatibility. To allow operators to
perform a seamless upgrade, it is not required to add the middleware to the
proxy pipeline and the flag ``allow_versions`` in the container server
configuration files are still valid. In future releases, ``allow_versions``
will be deprecated in favor of adding this middleware to the pipeline to enable
or disable the feature.
In case the middleware is added to the proxy pipeline, you must also
set ``allow_versioned_writes`` to ``True`` in the middleware options
to enable the information about this middleware to be returned in a /info
request.
Upgrade considerations: If ``allow_versioned_writes`` is set in the filter
configuration, you can leave the ``allow_versions`` flag in the container
server configuration files untouched. If you decide to disable or remove the
``allow_versions`` flag, you must re-set any existing containers that had
the 'X-Versions-Location' flag configured so that it can now be tracked by the
versioned_writes middleware.
-----------------------
Examples Using ``curl``
-----------------------
First, create a container with the ``X-Versions-Location`` header or add the
header to an existing container. Also make sure the container referenced by
the ``X-Versions-Location`` exists. In this example, the name of that
container is "versions"::
curl -i -XPUT -H "X-Auth-Token: <token>" \
-H "X-Versions-Location: versions" http://<storage_url>/container
curl -i -XPUT -H "X-Auth-Token: <token>" http://<storage_url>/versions
Create an object (the first version)::
curl -i -XPUT --data-binary 1 -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
Now create a new version of that object::
curl -i -XPUT --data-binary 2 -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
See a listing of the older versions of the object::
curl -i -H "X-Auth-Token: <token>" \
http://<storage_url>/versions?prefix=008myobject/
Now delete the current version of the object and see that the older version is
gone::
curl -i -XDELETE -H "X-Auth-Token: <token>" \
http://<storage_url>/container/myobject
curl -i -H "X-Auth-Token: <token>" \
http://<storage_url>/versions?prefix=008myobject/
---------------------------------------------------
How to Disable Object Versioning in a Swift Cluster
---------------------------------------------------
If you want to disable all functionality, set ``allow_versioned_writes`` to
``False`` in the middleware options.
Disable versioning from a container (x is any value except empty)::
curl -i -XPOST -H "X-Auth-Token: <token>" \
-H "X-Remove-Versions-Location: x" http://<storage_url>/container
"""
import time
from urllib import quote, unquote
from swift.common.utils import get_logger, Timestamp, json, \
register_swift_info, config_true_value
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.wsgi import WSGIContext, make_pre_authed_request
from swift.common.swob import Request
from swift.common.constraints import (
check_account_format, check_container_format, check_destination_header)
from swift.proxy.controllers.base import get_container_info
from swift.common.http import (
is_success, is_client_error, HTTP_NOT_FOUND)
from swift.common.swob import HTTPPreconditionFailed, HTTPServiceUnavailable, \
HTTPServerError
from swift.common.exceptions import (
ListingIterNotFound, ListingIterError)
class VersionedWritesContext(WSGIContext):
def __init__(self, wsgi_app, logger):
WSGIContext.__init__(self, wsgi_app)
self.logger = logger
def _listing_iter(self, account_name, lcontainer, lprefix, env):
for page in self._listing_pages_iter(account_name,
lcontainer, lprefix, env):
for item in page:
yield item
def _listing_pages_iter(self, account_name, lcontainer, lprefix, env):
marker = ''
while True:
lreq = make_pre_authed_request(
env, method='GET', swift_source='VW',
path='/v1/%s/%s' % (account_name, lcontainer))
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
lresp = lreq.get_response(self.app)
if not is_success(lresp.status_int):
if lresp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound()
elif is_client_error(lresp.status_int):
raise HTTPPreconditionFailed()
else:
raise ListingIterError()
if not lresp.body:
break
sublisting = json.loads(lresp.body)
if not sublisting:
break
marker = sublisting[-1]['name'].encode('utf-8')
yield sublisting
def handle_obj_versions_put(self, req, object_versions,
object_name, policy_index):
ret = None
# do a HEAD request to check object versions
_headers = {'X-Newest': 'True',
'X-Backend-Storage-Policy-Index': policy_index,
'x-auth-token': req.headers.get('x-auth-token')}
# make a pre_auth request in case the user has write access
# to container, but not READ. This was allowed in previous version
# (i.e., before middleware) so keeping the same behavior here
head_req = make_pre_authed_request(
req.environ, path=req.path_info,
headers=_headers, method='HEAD', swift_source='VW')
hresp = head_req.get_response(self.app)
is_dlo_manifest = 'X-Object-Manifest' in req.headers or \
'X-Object-Manifest' in hresp.headers
# if there's an existing object, then copy it to
# X-Versions-Location
if is_success(hresp.status_int) and not is_dlo_manifest:
lcontainer = object_versions.split('/')[0]
prefix_len = '%03x' % len(object_name)
lprefix = prefix_len + object_name + '/'
ts_source = hresp.environ.get('swift_x_timestamp')
if ts_source is None:
ts_source = time.mktime(time.strptime(
hresp.headers['last-modified'],
'%a, %d %b %Y %H:%M:%S GMT'))
new_ts = Timestamp(ts_source).internal
vers_obj_name = lprefix + new_ts
copy_headers = {
'Destination': '%s/%s' % (lcontainer, vers_obj_name),
'x-auth-token': req.headers.get('x-auth-token')}
# COPY implementation sets X-Newest to True when it internally
# does a GET on source object. So, we don't have to explicity
# set it in request headers here.
copy_req = make_pre_authed_request(
req.environ, path=req.path_info,
headers=copy_headers, method='COPY', swift_source='VW')
copy_resp = copy_req.get_response(self.app)
if is_success(copy_resp.status_int):
# success versioning previous existing object
# return None and handle original request
ret = None
else:
if is_client_error(copy_resp.status_int):
# missing container or bad permissions
ret = HTTPPreconditionFailed(request=req)
else:
# could not copy the data, bail
ret = HTTPServiceUnavailable(request=req)
else:
if hresp.status_int == HTTP_NOT_FOUND or is_dlo_manifest:
# nothing to version
# return None and handle original request
ret = None
else:
# if not HTTP_NOT_FOUND, return error immediately
ret = hresp
return ret
def handle_obj_versions_delete(self, req, object_versions,
account_name, container_name, object_name):
lcontainer = object_versions.split('/')[0]
prefix_len = '%03x' % len(object_name)
lprefix = prefix_len + object_name + '/'
item_list = []
try:
for _item in self._listing_iter(account_name, lcontainer, lprefix,
req.environ):
item_list.append(_item)
except ListingIterNotFound:
pass
except HTTPPreconditionFailed:
return HTTPPreconditionFailed(request=req)
except ListingIterError:
return HTTPServerError(request=req)
if item_list:
# we're about to start making COPY requests - need to validate the
# write access to the versioned container
if 'swift.authorize' in req.environ:
container_info = get_container_info(
req.environ, self.app)
req.acl = container_info.get('write_acl')
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
while len(item_list) > 0:
previous_version = item_list.pop()
# there are older versions so copy the previous version to the
# current object and delete the previous version
prev_obj_name = previous_version['name'].encode('utf-8')
copy_path = '/v1/' + account_name + '/' + \
lcontainer + '/' + prev_obj_name
copy_headers = {'X-Newest': 'True',
'Destination': container_name + '/' + object_name,
'x-auth-token': req.headers.get('x-auth-token')}
copy_req = make_pre_authed_request(
req.environ, path=copy_path,
headers=copy_headers, method='COPY', swift_source='VW')
copy_resp = copy_req.get_response(self.app)
# if the version isn't there, keep trying with previous version
if copy_resp.status_int == HTTP_NOT_FOUND:
continue
if not is_success(copy_resp.status_int):
if is_client_error(copy_resp.status_int):
# some user error, maybe permissions
return HTTPPreconditionFailed(request=req)
else:
# could not copy the data, bail
return HTTPServiceUnavailable(request=req)
# reset these because the COPY changed them
new_del_req = make_pre_authed_request(
req.environ, path=copy_path, method='DELETE',
swift_source='VW')
req = new_del_req
# remove 'X-If-Delete-At', since it is not for the older copy
if 'X-If-Delete-At' in req.headers:
del req.headers['X-If-Delete-At']
break
# handle DELETE request here in case it was modified
return req.get_response(self.app)
def handle_container_request(self, env, start_response):
app_resp = self._app_call(env)
if self._response_headers is None:
self._response_headers = []
sysmeta_version_hdr = get_sys_meta_prefix('container') + \
'versions-location'
location = ''
for key, val in self._response_headers:
if key.lower() == sysmeta_version_hdr:
location = val
if location:
self._response_headers.extend([('X-Versions-Location', location)])
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return app_resp
class VersionedWritesMiddleware(object):
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf, log_route='versioned_writes')
def container_request(self, req, start_response, enabled):
sysmeta_version_hdr = get_sys_meta_prefix('container') + \
'versions-location'
# set version location header as sysmeta
if 'X-Versions-Location' in req.headers:
val = req.headers.get('X-Versions-Location')
if val:
# diferently from previous version, we are actually
# returning an error if user tries to set versions location
# while feature is explicitly disabled.
if not config_true_value(enabled) and \
req.method in ('PUT', 'POST'):
raise HTTPPreconditionFailed(
request=req, content_type='text/plain',
body='Versioned Writes is disabled')
location = check_container_format(req, val)
req.headers[sysmeta_version_hdr] = location
# reset original header to maintain sanity
# now only sysmeta is source of Versions Location
req.headers['X-Versions-Location'] = ''
# if both headers are in the same request
# adding location takes precendence over removing
if 'X-Remove-Versions-Location' in req.headers:
del req.headers['X-Remove-Versions-Location']
else:
# empty value is the same as X-Remove-Versions-Location
req.headers['X-Remove-Versions-Location'] = 'x'
# handle removing versions container
val = req.headers.get('X-Remove-Versions-Location')
if val:
req.headers.update({sysmeta_version_hdr: ''})
req.headers.update({'X-Versions-Location': ''})
del req.headers['X-Remove-Versions-Location']
# send request and translate sysmeta headers from response
vw_ctx = VersionedWritesContext(self.app, self.logger)
return vw_ctx.handle_container_request(req.environ, start_response)
def object_request(self, req, version, account, container, obj,
allow_versioned_writes):
account_name = unquote(account)
container_name = unquote(container)
object_name = unquote(obj)
container_info = None
resp = None
is_enabled = config_true_value(allow_versioned_writes)
if req.method in ('PUT', 'DELETE'):
container_info = get_container_info(
req.environ, self.app)
elif req.method == 'COPY' and 'Destination' in req.headers:
if 'Destination-Account' in req.headers:
account_name = req.headers.get('Destination-Account')
account_name = check_account_format(req, account_name)
container_name, object_name = check_destination_header(req)
req.environ['PATH_INFO'] = "/%s/%s/%s/%s" % (
version, account_name, container_name, object_name)
container_info = get_container_info(
req.environ, self.app)
if not container_info:
return self.app
# To maintain backwards compatibility, container version
# location could be stored as sysmeta or not, need to check both.
# If stored as sysmeta, check if middleware is enabled. If sysmeta
# is not set, but versions property is set in container_info, then
# for backwards compatibility feature is enabled.
object_versions = container_info.get(
'sysmeta', {}).get('versions-location')
if object_versions and isinstance(object_versions, unicode):
object_versions = object_versions.encode('utf-8')
elif not object_versions:
object_versions = container_info.get('versions')
# if allow_versioned_writes is not set in the configuration files
# but 'versions' is configured, enable feature to maintain
# backwards compatibility
if not allow_versioned_writes and object_versions:
is_enabled = True
if is_enabled and object_versions:
object_versions = unquote(object_versions)
vw_ctx = VersionedWritesContext(self.app, self.logger)
if req.method in ('PUT', 'COPY'):
policy_idx = req.headers.get(
'X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
resp = vw_ctx.handle_obj_versions_put(
req, object_versions, object_name, policy_idx)
else: # handle DELETE
resp = vw_ctx.handle_obj_versions_delete(
req, object_versions, account_name,
container_name, object_name)
if resp:
return resp
else:
return self.app
def __call__(self, env, start_response):
# making a duplicate, because if this is a COPY request, we will
# modify the PATH_INFO to find out if the 'Destination' is in a
# versioned container
req = Request(env.copy())
try:
(version, account, container, obj) = req.split_path(3, 4, True)
except ValueError:
return self.app(env, start_response)
# In case allow_versioned_writes is set in the filter configuration,
# the middleware becomes the authority on whether object
# versioning is enabled or not. In case it is not set, then
# the option in the container configuration is still checked
# for backwards compatibility
# For a container request, first just check if option is set,
# can be either true or false.
# If set, check if enabled when actually trying to set container
# header. If not set, let request be handled by container server
# for backwards compatibility.
# For an object request, also check if option is set (either T or F).
# If set, check if enabled when checking versions container in
# sysmeta property. If it is not set check 'versions' property in
# container_info
allow_versioned_writes = self.conf.get('allow_versioned_writes')
if allow_versioned_writes and container and not obj:
return self.container_request(req, start_response,
allow_versioned_writes)
elif obj and req.method in ('PUT', 'COPY', 'DELETE'):
return self.object_request(
req, version, account, container, obj,
allow_versioned_writes)(env, start_response)
else:
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
if config_true_value(conf.get('allow_versioned_writes')):
register_swift_info('versioned_writes')
def obj_versions_filter(app):
return VersionedWritesMiddleware(app, conf)
return obj_versions_filter

View File

@ -51,13 +51,12 @@ from swift.common.constraints import check_metadata, check_object_creation, \
check_account_format
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
PutterConnectError
from swift.common.http import (
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
@ -139,46 +138,6 @@ class BaseObjectController(Controller):
self.container_name = unquote(container_name)
self.object_name = unquote(object_name)
def _listing_iter(self, lcontainer, lprefix, env):
for page in self._listing_pages_iter(lcontainer, lprefix, env):
for item in page:
yield item
def _listing_pages_iter(self, lcontainer, lprefix, env):
lpartition = self.app.container_ring.get_part(
self.account_name, lcontainer)
marker = ''
while True:
lreq = Request.blank('i will be overridden by env', environ=env)
# Don't quote PATH_INFO, by WSGI spec
lreq.environ['PATH_INFO'] = \
'/v1/%s/%s' % (self.account_name, lcontainer)
lreq.environ['REQUEST_METHOD'] = 'GET'
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
container_node_iter = self.app.iter_nodes(self.app.container_ring,
lpartition)
lresp = self.GETorHEAD_base(
lreq, _('Container'), container_node_iter, lpartition,
lreq.swift_entity_path)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
aresp = env['swift.authorize'](lreq)
if aresp:
raise ListingIterNotAuthorized(aresp)
if lresp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound()
elif not is_success(lresp.status_int):
raise ListingIterError()
if not lresp.body:
break
sublisting = json.loads(lresp.body)
if not sublisting:
break
marker = sublisting[-1]['name'].encode('utf-8')
yield sublisting
def iter_nodes_local_first(self, ring, partition):
"""
Yields nodes for a ring partition.
@ -548,71 +507,6 @@ class BaseObjectController(Controller):
# until copy request handling moves to middleware
return None, req, data_source, update_response
def _handle_object_versions(self, req):
"""
This method handles versionining of objects in containers that
have the feature enabled.
When a new PUT request is sent, the proxy checks for previous versions
of that same object name. If found, it is copied to a different
container and the new version is stored in its place.
This method was added as part of the PUT method refactoring and the
functionality is expected to be moved to middleware
"""
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
object_versions = container_info['versions']
# do a HEAD request for checking object versions
if object_versions and not req.environ.get('swift_versioned_copy'):
# make sure proxy-server uses the right policy index
_headers = {'X-Backend-Storage-Policy-Index': policy_index,
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})
hnode_iter = self.app.iter_nodes(obj_ring, partition)
hresp = self.GETorHEAD_base(
hreq, _('Object'), hnode_iter, partition,
hreq.swift_entity_path)
is_manifest = 'X-Object-Manifest' in req.headers or \
'X-Object-Manifest' in hresp.headers
if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
# This is a version manifest and needs to be handled
# differently. First copy the existing data to a new object,
# then write the data from this request to the version manifest
# object.
lcontainer = object_versions.split('/')[0]
prefix_len = '%03x' % len(self.object_name)
lprefix = prefix_len + self.object_name + '/'
ts_source = hresp.environ.get('swift_x_timestamp')
if ts_source is None:
ts_source = time.mktime(time.strptime(
hresp.headers['last-modified'],
'%a, %d %b %Y %H:%M:%S GMT'))
new_ts = Timestamp(ts_source).internal
vers_obj_name = lprefix + new_ts
copy_headers = {
'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
copy_environ = {'REQUEST_METHOD': 'COPY',
'swift_versioned_copy': True
}
copy_req = Request.blank(req.path_info, headers=copy_headers,
environ=copy_environ)
copy_resp = self.COPY(copy_req)
if is_client_error(copy_resp.status_int):
# missing container or bad permissions
raise HTTPPreconditionFailed(request=req)
elif not is_success(copy_resp.status_int):
# could not copy the data, bail
raise HTTPServiceUnavailable(request=req)
def _update_content_type(self, req):
# Sometimes the 'content-type' header exists, but is set to None.
req.content_type_manually_set = True
@ -819,9 +713,6 @@ class BaseObjectController(Controller):
self._update_x_timestamp(req)
# check if versioning is enabled and handle copying previous version
self._handle_object_versions(req)
# check if request is a COPY of an existing object
source_header = req.headers.get('X-Copy-From')
if source_header:
@ -865,86 +756,10 @@ class BaseObjectController(Controller):
containers = container_info['nodes']
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
object_versions = container_info['versions']
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if object_versions:
# this is a version manifest and needs to be handled differently
object_versions = unquote(object_versions)
lcontainer = object_versions.split('/')[0]
prefix_len = '%03x' % len(self.object_name)
lprefix = prefix_len + self.object_name + '/'
item_list = []
try:
for _item in self._listing_iter(lcontainer, lprefix,
req.environ):
item_list.append(_item)
except ListingIterNotFound:
# no worries, last_item is None
pass
except ListingIterNotAuthorized as err:
return err.aresp
except ListingIterError:
return HTTPServerError(request=req)
while len(item_list) > 0:
previous_version = item_list.pop()
# there are older versions so copy the previous version to the
# current object and delete the previous version
orig_container = self.container_name
orig_obj = self.object_name
self.container_name = lcontainer
self.object_name = previous_version['name'].encode('utf-8')
copy_path = '/v1/' + self.account_name + '/' + \
self.container_name + '/' + self.object_name
copy_headers = {'X-Newest': 'True',
'Destination': orig_container + '/' + orig_obj
}
copy_environ = {'REQUEST_METHOD': 'COPY',
'swift_versioned_copy': True
}
creq = Request.blank(copy_path, headers=copy_headers,
environ=copy_environ)
copy_resp = self.COPY(creq)
if copy_resp.status_int == HTTP_NOT_FOUND:
# the version isn't there so we'll try with previous
self.container_name = orig_container
self.object_name = orig_obj
continue
if is_client_error(copy_resp.status_int):
# some user error, maybe permissions
return HTTPPreconditionFailed(request=req)
elif not is_success(copy_resp.status_int):
# could not copy the data, bail
return HTTPServiceUnavailable(request=req)
# reset these because the COPY changed them
self.container_name = lcontainer
self.object_name = previous_version['name'].encode('utf-8')
new_del_req = Request.blank(copy_path, environ=req.environ)
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
# pass the policy index to storage nodes via req header
new_del_req.headers['X-Backend-Storage-Policy-Index'] = \
policy_idx
container_partition = container_info['partition']
containers = container_info['nodes']
new_del_req.acl = container_info['write_acl']
new_del_req.path_info = copy_path
req = new_del_req
# remove 'X-If-Delete-At', since it is not for the older copy
if 'X-If-Delete-At' in req.headers:
del req.headers['X-If-Delete-At']
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
break
if not containers:
return HTTPNotFound(request=req)
partition, nodes = obj_ring.get_nodes(

View File

@ -64,6 +64,9 @@ required_filters = [
if pipe.startswith('catch_errors')
else [])},
{'name': 'dlo', 'after_fn': lambda _junk: [
'staticweb', 'tempauth', 'keystoneauth',
'catch_errors', 'gatekeeper', 'proxy_logging']},
{'name': 'versioned_writes', 'after_fn': lambda _junk: [
'staticweb', 'tempauth', 'keystoneauth',
'catch_errors', 'gatekeeper', 'proxy_logging']}]

View File

@ -236,6 +236,9 @@ class Connection(object):
if not cfg.get('no_auth_token'):
headers['X-Auth-Token'] = self.storage_token
if cfg.get('use_token'):
headers['X-Auth-Token'] = cfg.get('use_token')
if isinstance(hdrs, dict):
headers.update(hdrs)
return headers
@ -507,6 +510,18 @@ class Container(Base):
return self.conn.make_request('PUT', self.path, hdrs=hdrs,
parms=parms, cfg=cfg) in (201, 202)
def update_metadata(self, hdrs=None, cfg=None):
if hdrs is None:
hdrs = {}
if cfg is None:
cfg = {}
self.conn.make_request('POST', self.path, hdrs=hdrs, cfg=cfg)
if not 200 <= self.conn.response.status <= 299:
raise ResponseError(self.conn.response, 'POST',
self.conn.make_path(self.path))
return True
def delete(self, hdrs=None, parms=None):
if hdrs is None:
hdrs = {}
@ -637,6 +652,9 @@ class File(Base):
else:
headers['Content-Length'] = 0
if cfg.get('use_token'):
headers['X-Auth-Token'] = cfg.get('use_token')
if cfg.get('no_content_type'):
pass
elif self.content_type:
@ -711,13 +729,13 @@ class File(Base):
return self.conn.make_request('COPY', self.path, hdrs=headers,
parms=parms) == 201
def delete(self, hdrs=None, parms=None):
def delete(self, hdrs=None, parms=None, cfg=None):
if hdrs is None:
hdrs = {}
if parms is None:
parms = {}
if self.conn.make_request('DELETE', self.path, hdrs=hdrs,
parms=parms) != 204:
cfg=cfg, parms=parms) != 204:
raise ResponseError(self.conn.response, 'DELETE',
self.conn.make_path(self.path))

View File

@ -2598,7 +2598,7 @@ class TestObjectVersioningEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(tf.config)
cls.conn.authenticate()
cls.storage_url, cls.storage_token = cls.conn.authenticate()
cls.account = Account(cls.conn, tf.config.get('account',
tf.config['username']))
@ -2628,6 +2628,30 @@ class TestObjectVersioningEnv(object):
# if versioning is off, then X-Versions-Location won't persist
cls.versioning_enabled = 'versions' in container_info
# setup another account to test ACLs
config2 = deepcopy(tf.config)
config2['account'] = tf.config['account2']
config2['username'] = tf.config['username2']
config2['password'] = tf.config['password2']
cls.conn2 = Connection(config2)
cls.storage_url2, cls.storage_token2 = cls.conn2.authenticate()
cls.account2 = cls.conn2.get_account()
cls.account2.delete_containers()
# setup another account with no access to anything to test ACLs
config3 = deepcopy(tf.config)
config3['account'] = tf.config['account']
config3['username'] = tf.config['username3']
config3['password'] = tf.config['password3']
cls.conn3 = Connection(config3)
cls.storage_url3, cls.storage_token3 = cls.conn3.authenticate()
cls.account3 = cls.conn3.get_account()
@classmethod
def tearDown(cls):
cls.account.delete_containers()
cls.account2.delete_containers()
class TestCrossPolicyObjectVersioningEnv(object):
# tri-state: None initially, then True/False
@ -2650,14 +2674,14 @@ class TestCrossPolicyObjectVersioningEnv(object):
cls.multiple_policies_enabled = True
else:
cls.multiple_policies_enabled = False
# We have to lie here that versioning is enabled. We actually
# don't know, but it does not matter. We know these tests cannot
# run without multiple policies present. If multiple policies are
# present, we won't be setting this field to any value, so it
# should all still work.
cls.versioning_enabled = True
cls.versioning_enabled = False
return
if cls.versioning_enabled is None:
cls.versioning_enabled = 'versioned_writes' in cluster_info
if not cls.versioning_enabled:
return
policy = cls.policies.select()
version_policy = cls.policies.exclude(name=policy['name']).select()
@ -2691,6 +2715,25 @@ class TestCrossPolicyObjectVersioningEnv(object):
# if versioning is off, then X-Versions-Location won't persist
cls.versioning_enabled = 'versions' in container_info
# setup another account to test ACLs
config2 = deepcopy(tf.config)
config2['account'] = tf.config['account2']
config2['username'] = tf.config['username2']
config2['password'] = tf.config['password2']
cls.conn2 = Connection(config2)
cls.storage_url2, cls.storage_token2 = cls.conn2.authenticate()
cls.account2 = cls.conn2.get_account()
cls.account2.delete_containers()
# setup another account with no access to anything to test ACLs
config3 = deepcopy(tf.config)
config3['account'] = tf.config['account']
config3['username'] = tf.config['username3']
config3['password'] = tf.config['password3']
cls.conn3 = Connection(config3)
cls.storage_url3, cls.storage_token3 = cls.conn3.authenticate()
cls.account3 = cls.conn3.get_account()
class TestObjectVersioning(Base):
env = TestObjectVersioningEnv
@ -2709,40 +2752,103 @@ class TestObjectVersioning(Base):
def tearDown(self):
super(TestObjectVersioning, self).tearDown()
try:
# delete versions first!
# only delete files and not container
# as they were configured in self.env
self.env.versions_container.delete_files()
self.env.container.delete_files()
except ResponseError:
pass
def test_clear_version_option(self):
# sanity
self.assertEqual(self.env.container.info()['versions'],
self.env.versions_container.name)
self.env.container.update_metadata(
hdrs={'X-Versions-Location': ''})
self.assertEqual(self.env.container.info().get('versions'), None)
# set location back to the way it was
self.env.container.update_metadata(
hdrs={'X-Versions-Location': self.env.versions_container.name})
self.assertEqual(self.env.container.info()['versions'],
self.env.versions_container.name)
def test_overwriting(self):
container = self.env.container
versions_container = self.env.versions_container
cont_info = container.info()
self.assertEquals(cont_info['versions'], versions_container.name)
obj_name = Utils.create_name()
versioned_obj = container.file(obj_name)
versioned_obj.write("aaaaa")
versioned_obj.write("aaaaa", hdrs={'Content-Type': 'text/jibberish01'})
obj_info = versioned_obj.info()
self.assertEqual('text/jibberish01', obj_info['content_type'])
self.assertEqual(0, versions_container.info()['object_count'])
versioned_obj.write("bbbbb")
versioned_obj.write("bbbbb", hdrs={'Content-Type': 'text/jibberish02',
'X-Object-Meta-Foo': 'Bar'})
versioned_obj.initialize()
self.assertEqual(versioned_obj.content_type, 'text/jibberish02')
self.assertEqual(versioned_obj.metadata['foo'], 'Bar')
# the old version got saved off
self.assertEqual(1, versions_container.info()['object_count'])
versioned_obj_name = versions_container.files()[0]
self.assertEqual(
"aaaaa", versions_container.file(versioned_obj_name).read())
prev_version = versions_container.file(versioned_obj_name)
prev_version.initialize()
self.assertEqual("aaaaa", prev_version.read())
self.assertEqual(prev_version.content_type, 'text/jibberish01')
# make sure the new obj metadata did not leak to the prev. version
self.assertTrue('foo' not in prev_version.metadata)
# check that POST does not create a new version
versioned_obj.sync_metadata(metadata={'fu': 'baz'})
self.assertEqual(1, versions_container.info()['object_count'])
# if we overwrite it again, there are two versions
versioned_obj.write("ccccc")
self.assertEqual(2, versions_container.info()['object_count'])
versioned_obj_name = versions_container.files()[1]
prev_version = versions_container.file(versioned_obj_name)
prev_version.initialize()
self.assertEqual("bbbbb", prev_version.read())
self.assertEqual(prev_version.content_type, 'text/jibberish02')
self.assertTrue('foo' in prev_version.metadata)
self.assertTrue('fu' in prev_version.metadata)
# as we delete things, the old contents return
self.assertEqual("ccccc", versioned_obj.read())
# test copy from a different container
src_container = self.env.account.container(Utils.create_name())
self.assertTrue(src_container.create())
src_name = Utils.create_name()
src_obj = src_container.file(src_name)
src_obj.write("ddddd", hdrs={'Content-Type': 'text/jibberish04'})
src_obj.copy(container.name, obj_name)
self.assertEqual("ddddd", versioned_obj.read())
versioned_obj.initialize()
self.assertEqual(versioned_obj.content_type, 'text/jibberish04')
# make sure versions container has the previous version
self.assertEqual(3, versions_container.info()['object_count'])
versioned_obj_name = versions_container.files()[2]
prev_version = versions_container.file(versioned_obj_name)
prev_version.initialize()
self.assertEqual("ccccc", prev_version.read())
# test delete
versioned_obj.delete()
self.assertEqual("ccccc", versioned_obj.read())
versioned_obj.delete()
self.assertEqual("bbbbb", versioned_obj.read())
versioned_obj.delete()
self.assertEqual("aaaaa", versioned_obj.read())
self.assertEqual(0, versions_container.info()['object_count'])
versioned_obj.delete()
self.assertRaises(ResponseError, versioned_obj.read)
@ -2774,6 +2880,87 @@ class TestObjectVersioning(Base):
self.assertEqual(3, versions_container.info()['object_count'])
self.assertEqual("112233", man_file.read())
def test_versioning_container_acl(self):
# create versions container and DO NOT give write access to account2
versions_container = self.env.account.container(Utils.create_name())
self.assertTrue(versions_container.create(hdrs={
'X-Container-Write': ''
}))
# check account2 cannot write to versions container
fail_obj_name = Utils.create_name()
fail_obj = versions_container.file(fail_obj_name)
self.assertRaises(ResponseError, fail_obj.write, "should fail",
cfg={'use_token': self.env.storage_token2})
# create container and give write access to account2
# don't set X-Versions-Location just yet
container = self.env.account.container(Utils.create_name())
self.assertTrue(container.create(hdrs={
'X-Container-Write': self.env.conn2.user_acl}))
# check account2 cannot set X-Versions-Location on container
self.assertRaises(ResponseError, container.update_metadata, hdrs={
'X-Versions-Location': versions_container},
cfg={'use_token': self.env.storage_token2})
# good! now let admin set the X-Versions-Location
# p.s.: sticking a 'x-remove' header here to test precedence
# of both headers. Setting the location should succeed.
self.assertTrue(container.update_metadata(hdrs={
'X-Remove-Versions-Location': versions_container,
'X-Versions-Location': versions_container}))
# write object twice to container and check version
obj_name = Utils.create_name()
versioned_obj = container.file(obj_name)
self.assertTrue(versioned_obj.write("never argue with the data",
cfg={'use_token': self.env.storage_token2}))
self.assertEqual(versioned_obj.read(), "never argue with the data")
self.assertTrue(
versioned_obj.write("we don't have no beer, just tequila",
cfg={'use_token': self.env.storage_token2}))
self.assertEqual(versioned_obj.read(),
"we don't have no beer, just tequila")
self.assertEqual(1, versions_container.info()['object_count'])
# read the original uploaded object
for filename in versions_container.files():
backup_file = versions_container.file(filename)
break
self.assertEqual(backup_file.read(), "never argue with the data")
# user3 (some random user with no access to anything)
# tries to read from versioned container
self.assertRaises(ResponseError, backup_file.read,
cfg={'use_token': self.env.storage_token3})
# user3 cannot write or delete from source container either
self.assertRaises(ResponseError, versioned_obj.write,
"some random user trying to write data",
cfg={'use_token': self.env.storage_token3})
self.assertRaises(ResponseError, versioned_obj.delete,
cfg={'use_token': self.env.storage_token3})
# user2 can't read or delete from versions-location
self.assertRaises(ResponseError, backup_file.read,
cfg={'use_token': self.env.storage_token2})
self.assertRaises(ResponseError, backup_file.delete,
cfg={'use_token': self.env.storage_token2})
# but is able to delete from the source container
# this could be a helpful scenario for dev ops that want to setup
# just one container to hold object versions of multiple containers
# and each one of those containers are owned by different users
self.assertTrue(versioned_obj.delete(
cfg={'use_token': self.env.storage_token2}))
# tear-down since we create these containers here
# and not in self.env
versions_container.delete_recursive()
container.delete_recursive()
def test_versioning_check_acl(self):
container = self.env.container
versions_container = self.env.versions_container

View File

@ -76,7 +76,7 @@ class FakeSwift(object):
path += '?' + env['QUERY_STRING']
if 'swift.authorize' in env:
resp = env['swift.authorize']()
resp = env['swift.authorize'](swob.Request(env))
if resp:
return resp(env, start_response)

View File

@ -793,7 +793,7 @@ class TestDloGetManifest(DloTestCase):
def test_get_with_auth_overridden(self):
auth_got_called = [0]
def my_auth():
def my_auth(req):
auth_got_called[0] += 1
return None

View File

@ -0,0 +1,558 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from swift.common import swob
from swift.common.middleware import versioned_writes
from swift.common.swob import Request
from test.unit.common.middleware.helpers import FakeSwift
class FakeCache(object):
def __init__(self, val):
if 'status' not in val:
val['status'] = 200
self.val = val
def get(self, *args):
return self.val
class VersionedWritesTestCase(unittest.TestCase):
def setUp(self):
self.app = FakeSwift()
conf = {'allow_versioned_writes': 'true'}
self.vw = versioned_writes.filter_factory(conf)(self.app)
def call_app(self, req, app=None, expect_exception=False):
if app is None:
app = self.app
self.authorized = []
def authorize(req):
self.authorized.append(req)
if 'swift.authorize' not in req.environ:
req.environ['swift.authorize'] = authorize
req.headers.setdefault("User-Agent", "Marula Kruger")
status = [None]
headers = [None]
def start_response(s, h, ei=None):
status[0] = s
headers[0] = h
body_iter = app(req.environ, start_response)
body = ''
caught_exc = None
try:
for chunk in body_iter:
body += chunk
except Exception as exc:
if expect_exception:
caught_exc = exc
else:
raise
if expect_exception:
return status[0], headers[0], body, caught_exc
else:
return status[0], headers[0], body
def call_vw(self, req, **kwargs):
return self.call_app(req, app=self.vw, **kwargs)
def assertRequestEqual(self, req, other):
self.assertEqual(req.method, other.method)
self.assertEqual(req.path, other.path)
def test_put_container(self):
self.app.register('PUT', '/v1/a/c', swob.HTTPOk, {}, 'passed')
req = Request.blank('/v1/a/c',
headers={'X-Versions-Location': 'ver_cont'},
environ={'REQUEST_METHOD': 'PUT'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
# check for sysmeta header
calls = self.app.calls_with_headers
method, path, req_headers = calls[0]
self.assertEquals('PUT', method)
self.assertEquals('/v1/a/c', path)
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_container_allow_versioned_writes_false(self):
self.vw.conf = {'allow_versioned_writes': 'false'}
# PUT/POST container must fail as 412 when allow_versioned_writes
# set to false
for method in ('PUT', 'POST'):
req = Request.blank('/v1/a/c',
headers={'X-Versions-Location': 'ver_cont'},
environ={'REQUEST_METHOD': method})
try:
status, headers, body = self.call_vw(req)
except swob.HTTPException as e:
pass
self.assertEquals(e.status_int, 412)
# GET/HEAD performs as normal
self.app.register('GET', '/v1/a/c', swob.HTTPOk, {}, 'passed')
self.app.register('HEAD', '/v1/a/c', swob.HTTPOk, {}, 'passed')
for method in ('GET', 'HEAD'):
req = Request.blank('/v1/a/c',
headers={'X-Versions-Location': 'ver_cont'},
environ={'REQUEST_METHOD': method})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
def test_remove_versions_location(self):
self.app.register('POST', '/v1/a/c', swob.HTTPOk, {}, 'passed')
req = Request.blank('/v1/a/c',
headers={'X-Remove-Versions-Location': 'x'},
environ={'REQUEST_METHOD': 'POST'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
# check for sysmeta header
calls = self.app.calls_with_headers
method, path, req_headers = calls[0]
self.assertEquals('POST', method)
self.assertEquals('/v1/a/c', path)
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
self.assertTrue('x-versions-location' in req_headers)
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_remove_add_versions_precedence(self):
self.app.register(
'POST', '/v1/a/c', swob.HTTPOk,
{'x-container-sysmeta-versions-location': 'ver_cont'},
'passed')
req = Request.blank('/v1/a/c',
headers={'X-Remove-Versions-Location': 'x',
'X-Versions-Location': 'ver_cont'},
environ={'REQUEST_METHOD': 'POST'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertTrue(('X-Versions-Location', 'ver_cont') in headers)
# check for sysmeta header
calls = self.app.calls_with_headers
method, path, req_headers = calls[0]
self.assertEquals('POST', method)
self.assertEquals('/v1/a/c', path)
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
self.assertTrue('x-remove-versions-location' not in req_headers)
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_get_container(self):
self.app.register(
'GET', '/v1/a/c', swob.HTTPOk,
{'x-container-sysmeta-versions-location': 'ver_cont'}, None)
req = Request.blank(
'/v1/a/c',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertTrue(('X-Versions-Location', 'ver_cont') in headers)
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_get_head(self):
self.app.register('GET', '/v1/a/c/o', swob.HTTPOk, {}, None)
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk, {}, None)
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_put_object_no_versioning(self):
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
cache = FakeCache({})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_put_first_object_success(self):
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/c/o', swob.HTTPNotFound, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_PUT_versioning_with_nonzero_default_policy(self):
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/c/o', swob.HTTPNotFound, {}, None)
cache = FakeCache({'versions': 'ver_cont', 'storage_policy': '2'})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
# check for 'X-Backend-Storage-Policy-Index' in HEAD request
calls = self.app.calls_with_headers
method, path, req_headers = calls[0]
self.assertEquals('HEAD', method)
self.assertEquals('/v1/a/c/o', path)
self.assertTrue('X-Backend-Storage-Policy-Index' in req_headers)
self.assertEquals('2',
req_headers.get('X-Backend-Storage-Policy-Index'))
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_put_object_no_versioning_with_container_config_true(self):
# set False to versions_write obsously and expect no COPY occurred
self.vw.conf = {'allow_versioned_writes': 'false'}
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPCreated, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/c/o', swob.HTTPOk,
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
cache = FakeCache({'versions': 'ver_cont'})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '201 Created')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
called_method = [method for (method, path, hdrs) in self.app._calls]
self.assertTrue('COPY' not in called_method)
def test_delete_object_no_versioning_with_container_config_true(self):
# set False to versions_write obviously and expect no GET versioning
# container and COPY called (just delete object as normal)
self.vw.conf = {'allow_versioned_writes': 'false'}
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPNoContent, {}, 'passed')
cache = FakeCache({'versions': 'ver_cont'})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '204 No Content')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
called_method = \
[method for (method, path, rheaders) in self.app._calls]
self.assertTrue('COPY' not in called_method)
self.assertTrue('GET' not in called_method)
def test_copy_object_no_versioning_with_container_config_true(self):
# set False to versions_write obviously and expect no extra
# COPY called (just copy object as normal)
self.vw.conf = {'allow_versioned_writes': 'false'}
self.app.register(
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
cache = FakeCache({'versions': 'ver_cont'})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '201 Created')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
called_method = \
[method for (method, path, rheaders) in self.app._calls]
self.assertTrue('COPY' in called_method)
self.assertEquals(called_method.count('COPY'), 1)
def test_new_version_success(self):
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/c/o', swob.HTTPOk,
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
self.app.register(
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_new_version_sysmeta_precedence(self):
self.app.register(
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/c/o', swob.HTTPOk,
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
self.app.register(
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
# fill cache with two different values for versions location
# new middleware should use sysmeta first
cache = FakeCache({'versions': 'old_ver_cont',
'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
'CONTENT_LENGTH': '100'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
# check that sysmeta header was used
calls = self.app.calls_with_headers
method, path, req_headers = calls[1]
self.assertEquals('COPY', method)
self.assertEquals('/v1/a/c/o', path)
self.assertTrue(req_headers['Destination'].startswith('ver_cont/'))
def test_copy_first_version(self):
self.app.register(
'COPY', '/v1/a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/tgt_cont/tgt_obj', swob.HTTPNotFound, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/src_cont/src_obj',
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
'CONTENT_LENGTH': '100'},
headers={'Destination': 'tgt_cont/tgt_obj'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_copy_new_version(self):
self.app.register(
'COPY', '/v1/a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/a/tgt_cont/tgt_obj', swob.HTTPOk,
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
self.app.register(
'COPY', '/v1/a/tgt_cont/tgt_obj', swob.HTTPCreated, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/src_cont/src_obj',
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
'CONTENT_LENGTH': '100'},
headers={'Destination': 'tgt_cont/tgt_obj'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_copy_new_version_different_account(self):
self.app.register(
'COPY', '/v1/src_a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/tgt_a/tgt_cont/tgt_obj', swob.HTTPOk,
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
self.app.register(
'COPY', '/v1/tgt_a/tgt_cont/tgt_obj', swob.HTTPCreated, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/src_a/src_cont/src_obj',
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
'CONTENT_LENGTH': '100'},
headers={'Destination': 'tgt_cont/tgt_obj',
'Destination-Account': 'tgt_a'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_delete_first_object_success(self):
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
swob.HTTPNotFound, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_delete_latest_version_success(self):
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPCreated,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/2', swob.HTTPOk,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
headers={'X-If-Delete-At': 1},
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
# check that X-If-Delete-At was removed from DELETE request
calls = self.app.calls_with_headers
method, path, req_headers = calls.pop()
self.assertEquals('DELETE', method)
self.assertTrue(path.startswith('/v1/a/ver_cont/001o/2'))
self.assertFalse('x-if-delete-at' in req_headers or
'X-If-Delete-At' in req_headers)
def test_DELETE_on_expired_versioned_object(self):
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
# expired object
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPNotFound,
{}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/1', swob.HTTPCreated,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/1', swob.HTTPOk,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
def test_denied_DELETE_of_versioned_object(self):
authorize_call = []
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPForbidden,
{}, None)
def fake_authorize(req):
authorize_call.append(req)
return swob.HTTPForbidden()
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'swift.authorize': fake_authorize,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEquals(status, '403 Forbidden')
self.assertEqual(len(authorize_call), 1)
self.assertRequestEqual(req, authorize_call[0])

View File

@ -515,6 +515,24 @@ class TestConstraints(unittest.TestCase):
constraints.check_account_format,
req, req.headers['X-Copy-From-Account'])
def test_check_container_format(self):
invalid_versions_locations = (
'container/with/slashes',
'', # empty
)
for versions_location in invalid_versions_locations:
req = Request.blank(
'/v/a/c/o', headers={
'X-Versions-Location': versions_location})
try:
constraints.check_container_format(
req, req.headers['X-Versions-Location'])
except HTTPException as e:
self.assertTrue(e.body.startswith('Container name cannot'))
else:
self.fail('check_container_format did not raise error for %r' %
req.headers['X-Versions-Location'])
class TestConstraintsConfig(unittest.TestCase):

View File

@ -141,6 +141,11 @@ class TestWSGI(unittest.TestCase):
expected = swift.common.middleware.dlo.DynamicLargeObject
self.assertTrue(isinstance(app, expected))
app = app.app
expected = \
swift.common.middleware.versioned_writes.VersionedWritesMiddleware
self.assert_(isinstance(app, expected))
app = app.app
expected = swift.proxy.server.Application
self.assertTrue(isinstance(app, expected))
@ -1414,6 +1419,7 @@ class TestPipelineModification(unittest.TestCase):
['swift.common.middleware.catch_errors',
'swift.common.middleware.gatekeeper',
'swift.common.middleware.dlo',
'swift.common.middleware.versioned_writes',
'swift.proxy.server'])
def test_proxy_modify_wsgi_pipeline(self):
@ -1444,6 +1450,7 @@ class TestPipelineModification(unittest.TestCase):
['swift.common.middleware.catch_errors',
'swift.common.middleware.gatekeeper',
'swift.common.middleware.dlo',
'swift.common.middleware.versioned_writes',
'swift.common.middleware.healthcheck',
'swift.proxy.server'])
@ -1541,6 +1548,7 @@ class TestPipelineModification(unittest.TestCase):
'swift.common.middleware.catch_errors',
'swift.common.middleware.gatekeeper',
'swift.common.middleware.dlo',
'swift.common.middleware.versioned_writes',
'swift.common.middleware.healthcheck',
'swift.proxy.server'])
@ -1554,6 +1562,7 @@ class TestPipelineModification(unittest.TestCase):
'swift.common.middleware.healthcheck',
'swift.common.middleware.catch_errors',
'swift.common.middleware.dlo',
'swift.common.middleware.versioned_writes',
'swift.proxy.server'])
def test_catch_errors_gatekeeper_configured_not_at_start(self):
@ -1566,6 +1575,7 @@ class TestPipelineModification(unittest.TestCase):
'swift.common.middleware.catch_errors',
'swift.common.middleware.gatekeeper',
'swift.common.middleware.dlo',
'swift.common.middleware.versioned_writes',
'swift.proxy.server'])
@with_tempdir
@ -1598,7 +1608,7 @@ class TestPipelineModification(unittest.TestCase):
tempdir, policy.ring_name + '.ring.gz')
app = wsgi.loadapp(conf_path)
proxy_app = app.app.app.app.app
proxy_app = app.app.app.app.app.app
self.assertEqual(proxy_app.account_ring.serialized_path,
account_ring_path)
self.assertEqual(proxy_app.container_ring.serialized_path,

File diff suppressed because it is too large Load Diff