Merge "versioned writes middleware"
This commit is contained in:
commit
0279411c58
@ -9,7 +9,6 @@ user = <your-user-name>
|
||||
log_facility = LOG_LOCAL2
|
||||
recon_cache_path = /var/cache/swift
|
||||
eventlet_debug = true
|
||||
allow_versions = true
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = recon container-server
|
||||
|
@ -9,7 +9,6 @@ user = <your-user-name>
|
||||
log_facility = LOG_LOCAL3
|
||||
recon_cache_path = /var/cache/swift2
|
||||
eventlet_debug = true
|
||||
allow_versions = true
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = recon container-server
|
||||
|
@ -9,7 +9,6 @@ user = <your-user-name>
|
||||
log_facility = LOG_LOCAL4
|
||||
recon_cache_path = /var/cache/swift3
|
||||
eventlet_debug = true
|
||||
allow_versions = true
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = recon container-server
|
||||
|
@ -9,7 +9,6 @@ user = <your-user-name>
|
||||
log_facility = LOG_LOCAL5
|
||||
recon_cache_path = /var/cache/swift4
|
||||
eventlet_debug = true
|
||||
allow_versions = true
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = recon container-server
|
||||
|
@ -9,7 +9,7 @@ eventlet_debug = true
|
||||
[pipeline:main]
|
||||
# Yes, proxy-logging appears twice. This is so that
|
||||
# middleware-originated requests get logged too.
|
||||
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache bulk tempurl ratelimit crossdomain tempauth staticweb container-quotas account-quotas slo dlo proxy-logging proxy-server
|
||||
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache bulk tempurl ratelimit crossdomain tempauth staticweb container-quotas account-quotas slo dlo versioned_writes proxy-logging proxy-server
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
@ -60,6 +60,10 @@ use = egg:swift#memcache
|
||||
[filter:gatekeeper]
|
||||
use = egg:swift#gatekeeper
|
||||
|
||||
[filter:versioned_writes]
|
||||
use = egg:swift#versioned_writes
|
||||
allow_versioned_writes = true
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
allow_account_management = true
|
||||
|
@ -102,6 +102,7 @@ DLO :ref:`dynamic-large-objects`
|
||||
LE :ref:`list_endpoints`
|
||||
KS :ref:`keystoneauth`
|
||||
RL :ref:`ratelimit`
|
||||
VW :ref:`versioned_writes`
|
||||
======================= =============================
|
||||
|
||||
|
||||
|
@ -155,6 +155,15 @@ Name Check (Forbidden Character Filter)
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
.. _versioned_writes:
|
||||
|
||||
Object Versioning
|
||||
=================
|
||||
|
||||
.. automodule:: swift.common.middleware.versioned_writes
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
Proxy Logging
|
||||
=============
|
||||
|
||||
|
@ -1,89 +1,6 @@
|
||||
=================
|
||||
Object Versioning
|
||||
=================
|
||||
|
||||
--------
|
||||
Overview
|
||||
--------
|
||||
|
||||
Object versioning in swift is implemented by setting a flag on the container
|
||||
to tell swift to version all objects in the container. The flag is the
|
||||
``X-Versions-Location`` header on the container, and its value is the
|
||||
container where the versions are stored. It is recommended to use a different
|
||||
``X-Versions-Location`` container for each container that is being versioned.
|
||||
|
||||
When data is ``PUT`` into a versioned container (a container with the
|
||||
versioning flag turned on), the existing data in the file is redirected to a
|
||||
new object and the data in the ``PUT`` request is saved as the data for the
|
||||
versioned object. The new object name (for the previous version) is
|
||||
``<versions_container>/<length><object_name>/<timestamp>``, where ``length``
|
||||
is the 3-character zero-padded hexadecimal length of the ``<object_name>`` and
|
||||
``<timestamp>`` is the timestamp of when the previous version was created.
|
||||
|
||||
A ``GET`` to a versioned object will return the current version of the object
|
||||
without having to do any request redirects or metadata lookups.
|
||||
|
||||
A ``POST`` to a versioned object will update the object metadata as normal,
|
||||
but will not create a new version of the object. In other words, new versions
|
||||
are only created when the content of the object changes.
|
||||
|
||||
A ``DELETE`` to a versioned object will only remove the current version of the
|
||||
object. If you have 5 total versions of the object, you must delete the
|
||||
object 5 times to completely remove the object.
|
||||
|
||||
Note: A large object manifest file cannot be versioned, but a large object
|
||||
manifest may point to versioned segments.
|
||||
|
||||
--------------------------------------------------
|
||||
How to Enable Object Versioning in a Swift Cluster
|
||||
--------------------------------------------------
|
||||
|
||||
Set ``allow_versions`` to ``True`` in the container server config.
|
||||
|
||||
-----------------------
|
||||
Examples Using ``curl``
|
||||
-----------------------
|
||||
|
||||
First, create a container with the ``X-Versions-Location`` header or add the
|
||||
header to an existing container. Also make sure the container referenced by
|
||||
the ``X-Versions-Location`` exists. In this example, the name of that
|
||||
container is "versions"::
|
||||
|
||||
curl -i -XPUT -H "X-Auth-Token: <token>" \
|
||||
-H "X-Versions-Location: versions" http://<storage_url>/container
|
||||
curl -i -XPUT -H "X-Auth-Token: <token>" http://<storage_url>/versions
|
||||
|
||||
Create an object (the first version)::
|
||||
|
||||
curl -i -XPUT --data-binary 1 -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
|
||||
Now create a new version of that object::
|
||||
|
||||
curl -i -XPUT --data-binary 2 -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
|
||||
See a listing of the older versions of the object::
|
||||
|
||||
curl -i -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/versions?prefix=008myobject/
|
||||
|
||||
Now delete the current version of the object and see that the older version is
|
||||
gone::
|
||||
|
||||
curl -i -XDELETE -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
curl -i -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/versions?prefix=008myobject/
|
||||
|
||||
---------------------------------------------------
|
||||
How to Disable Object Versioning in a Swift Cluster
|
||||
---------------------------------------------------
|
||||
|
||||
If you want to disable all functionality, set ``allow_versions`` back to
|
||||
``False`` in the container server config.
|
||||
|
||||
Disable versioning a versioned container (x is any value except empty)::
|
||||
|
||||
curl -i -XPOST -H "X-Auth-Token: <token>" \
|
||||
-H "X-Remove-Versions-Location: x" http://<storage_url>/container
|
||||
.. automodule:: swift.common.middleware.versioned_writes
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
@ -77,7 +77,7 @@ bind_port = 8080
|
||||
# eventlet_debug = false
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache container_sync bulk tempurl ratelimit tempauth container-quotas account-quotas slo dlo proxy-logging proxy-server
|
||||
pipeline = catch_errors gatekeeper healthcheck proxy-logging cache container_sync bulk tempurl ratelimit tempauth container-quotas account-quotas slo dlo versioned_writes proxy-logging proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
@ -703,3 +703,14 @@ use = egg:swift#xprofile
|
||||
#
|
||||
# unwind the iterator of applications
|
||||
# unwind = false
|
||||
|
||||
# Note: Put after slo, dlo in the pipeline.
|
||||
# If you don't put it in the pipeline, it will be inserted automatically.
|
||||
[filter:versioned_writes]
|
||||
use = egg:swift#versioned_writes
|
||||
# Enables using versioned writes middleware and exposing configuration
|
||||
# settings via HTTP GET /info.
|
||||
# WARNING: Setting this option bypasses the "allow_versions" option
|
||||
# in the container configuration file, which will be eventually
|
||||
# deprecated. See documentation for more details.
|
||||
# allow_versioned_writes = false
|
||||
|
@ -95,6 +95,7 @@ paste.filter_factory =
|
||||
gatekeeper = swift.common.middleware.gatekeeper:filter_factory
|
||||
container_sync = swift.common.middleware.container_sync:filter_factory
|
||||
xprofile = swift.common.middleware.xprofile:filter_factory
|
||||
versioned_writes = swift.common.middleware.versioned_writes:filter_factory
|
||||
|
||||
[build_sphinx]
|
||||
all_files = 1
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import functools
|
||||
import os
|
||||
import urllib
|
||||
import time
|
||||
@ -406,28 +407,33 @@ def check_destination_header(req):
|
||||
'<container name>/<object name>')
|
||||
|
||||
|
||||
def check_account_format(req, account):
|
||||
def check_name_format(req, name, target_type):
|
||||
"""
|
||||
Validate that the header contains valid account name.
|
||||
We assume the caller ensures that
|
||||
destination header is present in req.headers.
|
||||
Validate that the header contains valid account or container name.
|
||||
|
||||
:param req: HTTP request object
|
||||
:returns: A properly encoded account name
|
||||
:param name: header value to validate
|
||||
:param target_type: which header is being validated (Account or Container)
|
||||
:returns: A properly encoded account name or container name
|
||||
:raise: HTTPPreconditionFailed if account header
|
||||
is not well formatted.
|
||||
"""
|
||||
if not account:
|
||||
if not name:
|
||||
raise HTTPPreconditionFailed(
|
||||
request=req,
|
||||
body='Account name cannot be empty')
|
||||
if isinstance(account, unicode):
|
||||
account = account.encode('utf-8')
|
||||
if '/' in account:
|
||||
body='%s name cannot be empty' % target_type)
|
||||
if isinstance(name, unicode):
|
||||
name = name.encode('utf-8')
|
||||
if '/' in name:
|
||||
raise HTTPPreconditionFailed(
|
||||
request=req,
|
||||
body='Account name cannot contain slashes')
|
||||
return account
|
||||
body='%s name cannot contain slashes' % target_type)
|
||||
return name
|
||||
|
||||
check_account_format = functools.partial(check_name_format,
|
||||
target_type='Account')
|
||||
check_container_format = functools.partial(check_name_format,
|
||||
target_type='Container')
|
||||
|
||||
|
||||
def valid_api_version(version):
|
||||
|
490
swift/common/middleware/versioned_writes.py
Normal file
490
swift/common/middleware/versioned_writes.py
Normal file
@ -0,0 +1,490 @@
|
||||
# Copyright (c) 2014 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Object versioning in swift is implemented by setting a flag on the container
|
||||
to tell swift to version all objects in the container. The flag is the
|
||||
``X-Versions-Location`` header on the container, and its value is the
|
||||
container where the versions are stored. It is recommended to use a different
|
||||
``X-Versions-Location`` container for each container that is being versioned.
|
||||
|
||||
When data is ``PUT`` into a versioned container (a container with the
|
||||
versioning flag turned on), the existing data in the file is redirected to a
|
||||
new object and the data in the ``PUT`` request is saved as the data for the
|
||||
versioned object. The new object name (for the previous version) is
|
||||
``<versions_container>/<length><object_name>/<timestamp>``, where ``length``
|
||||
is the 3-character zero-padded hexadecimal length of the ``<object_name>`` and
|
||||
``<timestamp>`` is the timestamp of when the previous version was created.
|
||||
|
||||
A ``GET`` to a versioned object will return the current version of the object
|
||||
without having to do any request redirects or metadata lookups.
|
||||
|
||||
A ``POST`` to a versioned object will update the object metadata as normal,
|
||||
but will not create a new version of the object. In other words, new versions
|
||||
are only created when the content of the object changes.
|
||||
|
||||
A ``DELETE`` to a versioned object will only remove the current version of the
|
||||
object. If you have 5 total versions of the object, you must delete the
|
||||
object 5 times to completely remove the object.
|
||||
|
||||
--------------------------------------------------
|
||||
How to Enable Object Versioning in a Swift Cluster
|
||||
--------------------------------------------------
|
||||
|
||||
This middleware was written as an effort to refactor parts of the proxy server,
|
||||
so this functionality was already available in previous releases and every
|
||||
attempt was made to maintain backwards compatibility. To allow operators to
|
||||
perform a seamless upgrade, it is not required to add the middleware to the
|
||||
proxy pipeline and the flag ``allow_versions`` in the container server
|
||||
configuration files are still valid. In future releases, ``allow_versions``
|
||||
will be deprecated in favor of adding this middleware to the pipeline to enable
|
||||
or disable the feature.
|
||||
|
||||
In case the middleware is added to the proxy pipeline, you must also
|
||||
set ``allow_versioned_writes`` to ``True`` in the middleware options
|
||||
to enable the information about this middleware to be returned in a /info
|
||||
request.
|
||||
|
||||
Upgrade considerations: If ``allow_versioned_writes`` is set in the filter
|
||||
configuration, you can leave the ``allow_versions`` flag in the container
|
||||
server configuration files untouched. If you decide to disable or remove the
|
||||
``allow_versions`` flag, you must re-set any existing containers that had
|
||||
the 'X-Versions-Location' flag configured so that it can now be tracked by the
|
||||
versioned_writes middleware.
|
||||
|
||||
-----------------------
|
||||
Examples Using ``curl``
|
||||
-----------------------
|
||||
|
||||
First, create a container with the ``X-Versions-Location`` header or add the
|
||||
header to an existing container. Also make sure the container referenced by
|
||||
the ``X-Versions-Location`` exists. In this example, the name of that
|
||||
container is "versions"::
|
||||
|
||||
curl -i -XPUT -H "X-Auth-Token: <token>" \
|
||||
-H "X-Versions-Location: versions" http://<storage_url>/container
|
||||
curl -i -XPUT -H "X-Auth-Token: <token>" http://<storage_url>/versions
|
||||
|
||||
Create an object (the first version)::
|
||||
|
||||
curl -i -XPUT --data-binary 1 -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
|
||||
Now create a new version of that object::
|
||||
|
||||
curl -i -XPUT --data-binary 2 -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
|
||||
See a listing of the older versions of the object::
|
||||
|
||||
curl -i -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/versions?prefix=008myobject/
|
||||
|
||||
Now delete the current version of the object and see that the older version is
|
||||
gone::
|
||||
|
||||
curl -i -XDELETE -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/container/myobject
|
||||
curl -i -H "X-Auth-Token: <token>" \
|
||||
http://<storage_url>/versions?prefix=008myobject/
|
||||
|
||||
---------------------------------------------------
|
||||
How to Disable Object Versioning in a Swift Cluster
|
||||
---------------------------------------------------
|
||||
|
||||
If you want to disable all functionality, set ``allow_versioned_writes`` to
|
||||
``False`` in the middleware options.
|
||||
|
||||
Disable versioning from a container (x is any value except empty)::
|
||||
|
||||
curl -i -XPOST -H "X-Auth-Token: <token>" \
|
||||
-H "X-Remove-Versions-Location: x" http://<storage_url>/container
|
||||
"""
|
||||
|
||||
import time
|
||||
from urllib import quote, unquote
|
||||
from swift.common.utils import get_logger, Timestamp, json, \
|
||||
register_swift_info, config_true_value
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
from swift.common.wsgi import WSGIContext, make_pre_authed_request
|
||||
from swift.common.swob import Request
|
||||
from swift.common.constraints import (
|
||||
check_account_format, check_container_format, check_destination_header)
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
from swift.common.http import (
|
||||
is_success, is_client_error, HTTP_NOT_FOUND)
|
||||
from swift.common.swob import HTTPPreconditionFailed, HTTPServiceUnavailable, \
|
||||
HTTPServerError
|
||||
from swift.common.exceptions import (
|
||||
ListingIterNotFound, ListingIterError)
|
||||
|
||||
|
||||
class VersionedWritesContext(WSGIContext):
|
||||
|
||||
def __init__(self, wsgi_app, logger):
|
||||
WSGIContext.__init__(self, wsgi_app)
|
||||
self.logger = logger
|
||||
|
||||
def _listing_iter(self, account_name, lcontainer, lprefix, env):
|
||||
for page in self._listing_pages_iter(account_name,
|
||||
lcontainer, lprefix, env):
|
||||
for item in page:
|
||||
yield item
|
||||
|
||||
def _listing_pages_iter(self, account_name, lcontainer, lprefix, env):
|
||||
marker = ''
|
||||
while True:
|
||||
lreq = make_pre_authed_request(
|
||||
env, method='GET', swift_source='VW',
|
||||
path='/v1/%s/%s' % (account_name, lcontainer))
|
||||
lreq.environ['QUERY_STRING'] = \
|
||||
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
|
||||
quote(marker))
|
||||
lresp = lreq.get_response(self.app)
|
||||
if not is_success(lresp.status_int):
|
||||
if lresp.status_int == HTTP_NOT_FOUND:
|
||||
raise ListingIterNotFound()
|
||||
elif is_client_error(lresp.status_int):
|
||||
raise HTTPPreconditionFailed()
|
||||
else:
|
||||
raise ListingIterError()
|
||||
|
||||
if not lresp.body:
|
||||
break
|
||||
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
marker = sublisting[-1]['name'].encode('utf-8')
|
||||
yield sublisting
|
||||
|
||||
def handle_obj_versions_put(self, req, object_versions,
|
||||
object_name, policy_index):
|
||||
ret = None
|
||||
|
||||
# do a HEAD request to check object versions
|
||||
_headers = {'X-Newest': 'True',
|
||||
'X-Backend-Storage-Policy-Index': policy_index,
|
||||
'x-auth-token': req.headers.get('x-auth-token')}
|
||||
|
||||
# make a pre_auth request in case the user has write access
|
||||
# to container, but not READ. This was allowed in previous version
|
||||
# (i.e., before middleware) so keeping the same behavior here
|
||||
head_req = make_pre_authed_request(
|
||||
req.environ, path=req.path_info,
|
||||
headers=_headers, method='HEAD', swift_source='VW')
|
||||
hresp = head_req.get_response(self.app)
|
||||
|
||||
is_dlo_manifest = 'X-Object-Manifest' in req.headers or \
|
||||
'X-Object-Manifest' in hresp.headers
|
||||
|
||||
# if there's an existing object, then copy it to
|
||||
# X-Versions-Location
|
||||
if is_success(hresp.status_int) and not is_dlo_manifest:
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(object_name)
|
||||
lprefix = prefix_len + object_name + '/'
|
||||
ts_source = hresp.environ.get('swift_x_timestamp')
|
||||
if ts_source is None:
|
||||
ts_source = time.mktime(time.strptime(
|
||||
hresp.headers['last-modified'],
|
||||
'%a, %d %b %Y %H:%M:%S GMT'))
|
||||
new_ts = Timestamp(ts_source).internal
|
||||
vers_obj_name = lprefix + new_ts
|
||||
copy_headers = {
|
||||
'Destination': '%s/%s' % (lcontainer, vers_obj_name),
|
||||
'x-auth-token': req.headers.get('x-auth-token')}
|
||||
|
||||
# COPY implementation sets X-Newest to True when it internally
|
||||
# does a GET on source object. So, we don't have to explicity
|
||||
# set it in request headers here.
|
||||
copy_req = make_pre_authed_request(
|
||||
req.environ, path=req.path_info,
|
||||
headers=copy_headers, method='COPY', swift_source='VW')
|
||||
copy_resp = copy_req.get_response(self.app)
|
||||
|
||||
if is_success(copy_resp.status_int):
|
||||
# success versioning previous existing object
|
||||
# return None and handle original request
|
||||
ret = None
|
||||
else:
|
||||
if is_client_error(copy_resp.status_int):
|
||||
# missing container or bad permissions
|
||||
ret = HTTPPreconditionFailed(request=req)
|
||||
else:
|
||||
# could not copy the data, bail
|
||||
ret = HTTPServiceUnavailable(request=req)
|
||||
|
||||
else:
|
||||
if hresp.status_int == HTTP_NOT_FOUND or is_dlo_manifest:
|
||||
# nothing to version
|
||||
# return None and handle original request
|
||||
ret = None
|
||||
else:
|
||||
# if not HTTP_NOT_FOUND, return error immediately
|
||||
ret = hresp
|
||||
|
||||
return ret
|
||||
|
||||
def handle_obj_versions_delete(self, req, object_versions,
|
||||
account_name, container_name, object_name):
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(object_name)
|
||||
lprefix = prefix_len + object_name + '/'
|
||||
item_list = []
|
||||
try:
|
||||
for _item in self._listing_iter(account_name, lcontainer, lprefix,
|
||||
req.environ):
|
||||
item_list.append(_item)
|
||||
except ListingIterNotFound:
|
||||
pass
|
||||
except HTTPPreconditionFailed:
|
||||
return HTTPPreconditionFailed(request=req)
|
||||
except ListingIterError:
|
||||
return HTTPServerError(request=req)
|
||||
|
||||
if item_list:
|
||||
# we're about to start making COPY requests - need to validate the
|
||||
# write access to the versioned container
|
||||
if 'swift.authorize' in req.environ:
|
||||
container_info = get_container_info(
|
||||
req.environ, self.app)
|
||||
req.acl = container_info.get('write_acl')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
|
||||
while len(item_list) > 0:
|
||||
previous_version = item_list.pop()
|
||||
|
||||
# there are older versions so copy the previous version to the
|
||||
# current object and delete the previous version
|
||||
prev_obj_name = previous_version['name'].encode('utf-8')
|
||||
|
||||
copy_path = '/v1/' + account_name + '/' + \
|
||||
lcontainer + '/' + prev_obj_name
|
||||
|
||||
copy_headers = {'X-Newest': 'True',
|
||||
'Destination': container_name + '/' + object_name,
|
||||
'x-auth-token': req.headers.get('x-auth-token')}
|
||||
|
||||
copy_req = make_pre_authed_request(
|
||||
req.environ, path=copy_path,
|
||||
headers=copy_headers, method='COPY', swift_source='VW')
|
||||
copy_resp = copy_req.get_response(self.app)
|
||||
|
||||
# if the version isn't there, keep trying with previous version
|
||||
if copy_resp.status_int == HTTP_NOT_FOUND:
|
||||
continue
|
||||
|
||||
if not is_success(copy_resp.status_int):
|
||||
if is_client_error(copy_resp.status_int):
|
||||
# some user error, maybe permissions
|
||||
return HTTPPreconditionFailed(request=req)
|
||||
else:
|
||||
# could not copy the data, bail
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
|
||||
# reset these because the COPY changed them
|
||||
new_del_req = make_pre_authed_request(
|
||||
req.environ, path=copy_path, method='DELETE',
|
||||
swift_source='VW')
|
||||
req = new_del_req
|
||||
|
||||
# remove 'X-If-Delete-At', since it is not for the older copy
|
||||
if 'X-If-Delete-At' in req.headers:
|
||||
del req.headers['X-If-Delete-At']
|
||||
break
|
||||
|
||||
# handle DELETE request here in case it was modified
|
||||
return req.get_response(self.app)
|
||||
|
||||
def handle_container_request(self, env, start_response):
|
||||
app_resp = self._app_call(env)
|
||||
if self._response_headers is None:
|
||||
self._response_headers = []
|
||||
sysmeta_version_hdr = get_sys_meta_prefix('container') + \
|
||||
'versions-location'
|
||||
location = ''
|
||||
for key, val in self._response_headers:
|
||||
if key.lower() == sysmeta_version_hdr:
|
||||
location = val
|
||||
|
||||
if location:
|
||||
self._response_headers.extend([('X-Versions-Location', location)])
|
||||
|
||||
start_response(self._response_status,
|
||||
self._response_headers,
|
||||
self._response_exc_info)
|
||||
return app_resp
|
||||
|
||||
|
||||
class VersionedWritesMiddleware(object):
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='versioned_writes')
|
||||
|
||||
def container_request(self, req, start_response, enabled):
|
||||
sysmeta_version_hdr = get_sys_meta_prefix('container') + \
|
||||
'versions-location'
|
||||
|
||||
# set version location header as sysmeta
|
||||
if 'X-Versions-Location' in req.headers:
|
||||
val = req.headers.get('X-Versions-Location')
|
||||
if val:
|
||||
# diferently from previous version, we are actually
|
||||
# returning an error if user tries to set versions location
|
||||
# while feature is explicitly disabled.
|
||||
if not config_true_value(enabled) and \
|
||||
req.method in ('PUT', 'POST'):
|
||||
raise HTTPPreconditionFailed(
|
||||
request=req, content_type='text/plain',
|
||||
body='Versioned Writes is disabled')
|
||||
|
||||
location = check_container_format(req, val)
|
||||
req.headers[sysmeta_version_hdr] = location
|
||||
|
||||
# reset original header to maintain sanity
|
||||
# now only sysmeta is source of Versions Location
|
||||
req.headers['X-Versions-Location'] = ''
|
||||
|
||||
# if both headers are in the same request
|
||||
# adding location takes precendence over removing
|
||||
if 'X-Remove-Versions-Location' in req.headers:
|
||||
del req.headers['X-Remove-Versions-Location']
|
||||
else:
|
||||
# empty value is the same as X-Remove-Versions-Location
|
||||
req.headers['X-Remove-Versions-Location'] = 'x'
|
||||
|
||||
# handle removing versions container
|
||||
val = req.headers.get('X-Remove-Versions-Location')
|
||||
if val:
|
||||
req.headers.update({sysmeta_version_hdr: ''})
|
||||
req.headers.update({'X-Versions-Location': ''})
|
||||
del req.headers['X-Remove-Versions-Location']
|
||||
|
||||
# send request and translate sysmeta headers from response
|
||||
vw_ctx = VersionedWritesContext(self.app, self.logger)
|
||||
return vw_ctx.handle_container_request(req.environ, start_response)
|
||||
|
||||
def object_request(self, req, version, account, container, obj,
|
||||
allow_versioned_writes):
|
||||
account_name = unquote(account)
|
||||
container_name = unquote(container)
|
||||
object_name = unquote(obj)
|
||||
container_info = None
|
||||
resp = None
|
||||
is_enabled = config_true_value(allow_versioned_writes)
|
||||
if req.method in ('PUT', 'DELETE'):
|
||||
container_info = get_container_info(
|
||||
req.environ, self.app)
|
||||
elif req.method == 'COPY' and 'Destination' in req.headers:
|
||||
if 'Destination-Account' in req.headers:
|
||||
account_name = req.headers.get('Destination-Account')
|
||||
account_name = check_account_format(req, account_name)
|
||||
container_name, object_name = check_destination_header(req)
|
||||
req.environ['PATH_INFO'] = "/%s/%s/%s/%s" % (
|
||||
version, account_name, container_name, object_name)
|
||||
container_info = get_container_info(
|
||||
req.environ, self.app)
|
||||
|
||||
if not container_info:
|
||||
return self.app
|
||||
|
||||
# To maintain backwards compatibility, container version
|
||||
# location could be stored as sysmeta or not, need to check both.
|
||||
# If stored as sysmeta, check if middleware is enabled. If sysmeta
|
||||
# is not set, but versions property is set in container_info, then
|
||||
# for backwards compatibility feature is enabled.
|
||||
object_versions = container_info.get(
|
||||
'sysmeta', {}).get('versions-location')
|
||||
if object_versions and isinstance(object_versions, unicode):
|
||||
object_versions = object_versions.encode('utf-8')
|
||||
elif not object_versions:
|
||||
object_versions = container_info.get('versions')
|
||||
# if allow_versioned_writes is not set in the configuration files
|
||||
# but 'versions' is configured, enable feature to maintain
|
||||
# backwards compatibility
|
||||
if not allow_versioned_writes and object_versions:
|
||||
is_enabled = True
|
||||
|
||||
if is_enabled and object_versions:
|
||||
object_versions = unquote(object_versions)
|
||||
vw_ctx = VersionedWritesContext(self.app, self.logger)
|
||||
if req.method in ('PUT', 'COPY'):
|
||||
policy_idx = req.headers.get(
|
||||
'X-Backend-Storage-Policy-Index',
|
||||
container_info['storage_policy'])
|
||||
resp = vw_ctx.handle_obj_versions_put(
|
||||
req, object_versions, object_name, policy_idx)
|
||||
else: # handle DELETE
|
||||
resp = vw_ctx.handle_obj_versions_delete(
|
||||
req, object_versions, account_name,
|
||||
container_name, object_name)
|
||||
|
||||
if resp:
|
||||
return resp
|
||||
else:
|
||||
return self.app
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
# making a duplicate, because if this is a COPY request, we will
|
||||
# modify the PATH_INFO to find out if the 'Destination' is in a
|
||||
# versioned container
|
||||
req = Request(env.copy())
|
||||
try:
|
||||
(version, account, container, obj) = req.split_path(3, 4, True)
|
||||
except ValueError:
|
||||
return self.app(env, start_response)
|
||||
|
||||
# In case allow_versioned_writes is set in the filter configuration,
|
||||
# the middleware becomes the authority on whether object
|
||||
# versioning is enabled or not. In case it is not set, then
|
||||
# the option in the container configuration is still checked
|
||||
# for backwards compatibility
|
||||
|
||||
# For a container request, first just check if option is set,
|
||||
# can be either true or false.
|
||||
# If set, check if enabled when actually trying to set container
|
||||
# header. If not set, let request be handled by container server
|
||||
# for backwards compatibility.
|
||||
# For an object request, also check if option is set (either T or F).
|
||||
# If set, check if enabled when checking versions container in
|
||||
# sysmeta property. If it is not set check 'versions' property in
|
||||
# container_info
|
||||
allow_versioned_writes = self.conf.get('allow_versioned_writes')
|
||||
if allow_versioned_writes and container and not obj:
|
||||
return self.container_request(req, start_response,
|
||||
allow_versioned_writes)
|
||||
elif obj and req.method in ('PUT', 'COPY', 'DELETE'):
|
||||
return self.object_request(
|
||||
req, version, account, container, obj,
|
||||
allow_versioned_writes)(env, start_response)
|
||||
else:
|
||||
return self.app(env, start_response)
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
conf = global_conf.copy()
|
||||
conf.update(local_conf)
|
||||
if config_true_value(conf.get('allow_versioned_writes')):
|
||||
register_swift_info('versioned_writes')
|
||||
|
||||
def obj_versions_filter(app):
|
||||
return VersionedWritesMiddleware(app, conf)
|
||||
|
||||
return obj_versions_filter
|
@ -51,13 +51,12 @@ from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
check_account_format
|
||||
from swift.common import constraints
|
||||
from swift.common.exceptions import ChunkReadTimeout, \
|
||||
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
||||
ListingIterNotAuthorized, ListingIterError, ResponseTimeout, \
|
||||
ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \
|
||||
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
|
||||
PutterConnectError
|
||||
from swift.common.http import (
|
||||
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
|
||||
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
|
||||
is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
|
||||
HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR,
|
||||
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
|
||||
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, is_informational)
|
||||
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
|
||||
@ -139,46 +138,6 @@ class BaseObjectController(Controller):
|
||||
self.container_name = unquote(container_name)
|
||||
self.object_name = unquote(object_name)
|
||||
|
||||
def _listing_iter(self, lcontainer, lprefix, env):
|
||||
for page in self._listing_pages_iter(lcontainer, lprefix, env):
|
||||
for item in page:
|
||||
yield item
|
||||
|
||||
def _listing_pages_iter(self, lcontainer, lprefix, env):
|
||||
lpartition = self.app.container_ring.get_part(
|
||||
self.account_name, lcontainer)
|
||||
marker = ''
|
||||
while True:
|
||||
lreq = Request.blank('i will be overridden by env', environ=env)
|
||||
# Don't quote PATH_INFO, by WSGI spec
|
||||
lreq.environ['PATH_INFO'] = \
|
||||
'/v1/%s/%s' % (self.account_name, lcontainer)
|
||||
lreq.environ['REQUEST_METHOD'] = 'GET'
|
||||
lreq.environ['QUERY_STRING'] = \
|
||||
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
|
||||
quote(marker))
|
||||
container_node_iter = self.app.iter_nodes(self.app.container_ring,
|
||||
lpartition)
|
||||
lresp = self.GETorHEAD_base(
|
||||
lreq, _('Container'), container_node_iter, lpartition,
|
||||
lreq.swift_entity_path)
|
||||
if 'swift.authorize' in env:
|
||||
lreq.acl = lresp.headers.get('x-container-read')
|
||||
aresp = env['swift.authorize'](lreq)
|
||||
if aresp:
|
||||
raise ListingIterNotAuthorized(aresp)
|
||||
if lresp.status_int == HTTP_NOT_FOUND:
|
||||
raise ListingIterNotFound()
|
||||
elif not is_success(lresp.status_int):
|
||||
raise ListingIterError()
|
||||
if not lresp.body:
|
||||
break
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
marker = sublisting[-1]['name'].encode('utf-8')
|
||||
yield sublisting
|
||||
|
||||
def iter_nodes_local_first(self, ring, partition):
|
||||
"""
|
||||
Yields nodes for a ring partition.
|
||||
@ -548,71 +507,6 @@ class BaseObjectController(Controller):
|
||||
# until copy request handling moves to middleware
|
||||
return None, req, data_source, update_response
|
||||
|
||||
def _handle_object_versions(self, req):
|
||||
"""
|
||||
This method handles versionining of objects in containers that
|
||||
have the feature enabled.
|
||||
|
||||
When a new PUT request is sent, the proxy checks for previous versions
|
||||
of that same object name. If found, it is copied to a different
|
||||
container and the new version is stored in its place.
|
||||
|
||||
This method was added as part of the PUT method refactoring and the
|
||||
functionality is expected to be moved to middleware
|
||||
"""
|
||||
container_info = self.container_info(
|
||||
self.account_name, self.container_name, req)
|
||||
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
|
||||
container_info['storage_policy'])
|
||||
obj_ring = self.app.get_object_ring(policy_index)
|
||||
partition, nodes = obj_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
object_versions = container_info['versions']
|
||||
|
||||
# do a HEAD request for checking object versions
|
||||
if object_versions and not req.environ.get('swift_versioned_copy'):
|
||||
# make sure proxy-server uses the right policy index
|
||||
_headers = {'X-Backend-Storage-Policy-Index': policy_index,
|
||||
'X-Newest': 'True'}
|
||||
hreq = Request.blank(req.path_info, headers=_headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
hnode_iter = self.app.iter_nodes(obj_ring, partition)
|
||||
hresp = self.GETorHEAD_base(
|
||||
hreq, _('Object'), hnode_iter, partition,
|
||||
hreq.swift_entity_path)
|
||||
|
||||
is_manifest = 'X-Object-Manifest' in req.headers or \
|
||||
'X-Object-Manifest' in hresp.headers
|
||||
if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
|
||||
# This is a version manifest and needs to be handled
|
||||
# differently. First copy the existing data to a new object,
|
||||
# then write the data from this request to the version manifest
|
||||
# object.
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(self.object_name)
|
||||
lprefix = prefix_len + self.object_name + '/'
|
||||
ts_source = hresp.environ.get('swift_x_timestamp')
|
||||
if ts_source is None:
|
||||
ts_source = time.mktime(time.strptime(
|
||||
hresp.headers['last-modified'],
|
||||
'%a, %d %b %Y %H:%M:%S GMT'))
|
||||
new_ts = Timestamp(ts_source).internal
|
||||
vers_obj_name = lprefix + new_ts
|
||||
copy_headers = {
|
||||
'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
|
||||
copy_environ = {'REQUEST_METHOD': 'COPY',
|
||||
'swift_versioned_copy': True
|
||||
}
|
||||
copy_req = Request.blank(req.path_info, headers=copy_headers,
|
||||
environ=copy_environ)
|
||||
copy_resp = self.COPY(copy_req)
|
||||
if is_client_error(copy_resp.status_int):
|
||||
# missing container or bad permissions
|
||||
raise HTTPPreconditionFailed(request=req)
|
||||
elif not is_success(copy_resp.status_int):
|
||||
# could not copy the data, bail
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
|
||||
def _update_content_type(self, req):
|
||||
# Sometimes the 'content-type' header exists, but is set to None.
|
||||
req.content_type_manually_set = True
|
||||
@ -819,9 +713,6 @@ class BaseObjectController(Controller):
|
||||
|
||||
self._update_x_timestamp(req)
|
||||
|
||||
# check if versioning is enabled and handle copying previous version
|
||||
self._handle_object_versions(req)
|
||||
|
||||
# check if request is a COPY of an existing object
|
||||
source_header = req.headers.get('X-Copy-From')
|
||||
if source_header:
|
||||
@ -865,86 +756,10 @@ class BaseObjectController(Controller):
|
||||
containers = container_info['nodes']
|
||||
req.acl = container_info['write_acl']
|
||||
req.environ['swift_sync_key'] = container_info['sync_key']
|
||||
object_versions = container_info['versions']
|
||||
if 'swift.authorize' in req.environ:
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
if object_versions:
|
||||
# this is a version manifest and needs to be handled differently
|
||||
object_versions = unquote(object_versions)
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(self.object_name)
|
||||
lprefix = prefix_len + self.object_name + '/'
|
||||
item_list = []
|
||||
try:
|
||||
for _item in self._listing_iter(lcontainer, lprefix,
|
||||
req.environ):
|
||||
item_list.append(_item)
|
||||
except ListingIterNotFound:
|
||||
# no worries, last_item is None
|
||||
pass
|
||||
except ListingIterNotAuthorized as err:
|
||||
return err.aresp
|
||||
except ListingIterError:
|
||||
return HTTPServerError(request=req)
|
||||
|
||||
while len(item_list) > 0:
|
||||
previous_version = item_list.pop()
|
||||
# there are older versions so copy the previous version to the
|
||||
# current object and delete the previous version
|
||||
orig_container = self.container_name
|
||||
orig_obj = self.object_name
|
||||
self.container_name = lcontainer
|
||||
self.object_name = previous_version['name'].encode('utf-8')
|
||||
|
||||
copy_path = '/v1/' + self.account_name + '/' + \
|
||||
self.container_name + '/' + self.object_name
|
||||
|
||||
copy_headers = {'X-Newest': 'True',
|
||||
'Destination': orig_container + '/' + orig_obj
|
||||
}
|
||||
copy_environ = {'REQUEST_METHOD': 'COPY',
|
||||
'swift_versioned_copy': True
|
||||
}
|
||||
creq = Request.blank(copy_path, headers=copy_headers,
|
||||
environ=copy_environ)
|
||||
copy_resp = self.COPY(creq)
|
||||
if copy_resp.status_int == HTTP_NOT_FOUND:
|
||||
# the version isn't there so we'll try with previous
|
||||
self.container_name = orig_container
|
||||
self.object_name = orig_obj
|
||||
continue
|
||||
if is_client_error(copy_resp.status_int):
|
||||
# some user error, maybe permissions
|
||||
return HTTPPreconditionFailed(request=req)
|
||||
elif not is_success(copy_resp.status_int):
|
||||
# could not copy the data, bail
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
# reset these because the COPY changed them
|
||||
self.container_name = lcontainer
|
||||
self.object_name = previous_version['name'].encode('utf-8')
|
||||
new_del_req = Request.blank(copy_path, environ=req.environ)
|
||||
container_info = self.container_info(
|
||||
self.account_name, self.container_name, req)
|
||||
policy_idx = container_info['storage_policy']
|
||||
obj_ring = self.app.get_object_ring(policy_idx)
|
||||
# pass the policy index to storage nodes via req header
|
||||
new_del_req.headers['X-Backend-Storage-Policy-Index'] = \
|
||||
policy_idx
|
||||
container_partition = container_info['partition']
|
||||
containers = container_info['nodes']
|
||||
new_del_req.acl = container_info['write_acl']
|
||||
new_del_req.path_info = copy_path
|
||||
req = new_del_req
|
||||
# remove 'X-If-Delete-At', since it is not for the older copy
|
||||
if 'X-If-Delete-At' in req.headers:
|
||||
del req.headers['X-If-Delete-At']
|
||||
if 'swift.authorize' in req.environ:
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
break
|
||||
if not containers:
|
||||
return HTTPNotFound(request=req)
|
||||
partition, nodes = obj_ring.get_nodes(
|
||||
|
@ -64,6 +64,9 @@ required_filters = [
|
||||
if pipe.startswith('catch_errors')
|
||||
else [])},
|
||||
{'name': 'dlo', 'after_fn': lambda _junk: [
|
||||
'staticweb', 'tempauth', 'keystoneauth',
|
||||
'catch_errors', 'gatekeeper', 'proxy_logging']},
|
||||
{'name': 'versioned_writes', 'after_fn': lambda _junk: [
|
||||
'staticweb', 'tempauth', 'keystoneauth',
|
||||
'catch_errors', 'gatekeeper', 'proxy_logging']}]
|
||||
|
||||
|
@ -236,6 +236,9 @@ class Connection(object):
|
||||
if not cfg.get('no_auth_token'):
|
||||
headers['X-Auth-Token'] = self.storage_token
|
||||
|
||||
if cfg.get('use_token'):
|
||||
headers['X-Auth-Token'] = cfg.get('use_token')
|
||||
|
||||
if isinstance(hdrs, dict):
|
||||
headers.update(hdrs)
|
||||
return headers
|
||||
@ -507,6 +510,18 @@ class Container(Base):
|
||||
return self.conn.make_request('PUT', self.path, hdrs=hdrs,
|
||||
parms=parms, cfg=cfg) in (201, 202)
|
||||
|
||||
def update_metadata(self, hdrs=None, cfg=None):
|
||||
if hdrs is None:
|
||||
hdrs = {}
|
||||
if cfg is None:
|
||||
cfg = {}
|
||||
|
||||
self.conn.make_request('POST', self.path, hdrs=hdrs, cfg=cfg)
|
||||
if not 200 <= self.conn.response.status <= 299:
|
||||
raise ResponseError(self.conn.response, 'POST',
|
||||
self.conn.make_path(self.path))
|
||||
return True
|
||||
|
||||
def delete(self, hdrs=None, parms=None):
|
||||
if hdrs is None:
|
||||
hdrs = {}
|
||||
@ -637,6 +652,9 @@ class File(Base):
|
||||
else:
|
||||
headers['Content-Length'] = 0
|
||||
|
||||
if cfg.get('use_token'):
|
||||
headers['X-Auth-Token'] = cfg.get('use_token')
|
||||
|
||||
if cfg.get('no_content_type'):
|
||||
pass
|
||||
elif self.content_type:
|
||||
@ -711,13 +729,13 @@ class File(Base):
|
||||
return self.conn.make_request('COPY', self.path, hdrs=headers,
|
||||
parms=parms) == 201
|
||||
|
||||
def delete(self, hdrs=None, parms=None):
|
||||
def delete(self, hdrs=None, parms=None, cfg=None):
|
||||
if hdrs is None:
|
||||
hdrs = {}
|
||||
if parms is None:
|
||||
parms = {}
|
||||
if self.conn.make_request('DELETE', self.path, hdrs=hdrs,
|
||||
parms=parms) != 204:
|
||||
cfg=cfg, parms=parms) != 204:
|
||||
|
||||
raise ResponseError(self.conn.response, 'DELETE',
|
||||
self.conn.make_path(self.path))
|
||||
|
@ -2598,7 +2598,7 @@ class TestObjectVersioningEnv(object):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.conn = Connection(tf.config)
|
||||
cls.conn.authenticate()
|
||||
cls.storage_url, cls.storage_token = cls.conn.authenticate()
|
||||
|
||||
cls.account = Account(cls.conn, tf.config.get('account',
|
||||
tf.config['username']))
|
||||
@ -2628,6 +2628,30 @@ class TestObjectVersioningEnv(object):
|
||||
# if versioning is off, then X-Versions-Location won't persist
|
||||
cls.versioning_enabled = 'versions' in container_info
|
||||
|
||||
# setup another account to test ACLs
|
||||
config2 = deepcopy(tf.config)
|
||||
config2['account'] = tf.config['account2']
|
||||
config2['username'] = tf.config['username2']
|
||||
config2['password'] = tf.config['password2']
|
||||
cls.conn2 = Connection(config2)
|
||||
cls.storage_url2, cls.storage_token2 = cls.conn2.authenticate()
|
||||
cls.account2 = cls.conn2.get_account()
|
||||
cls.account2.delete_containers()
|
||||
|
||||
# setup another account with no access to anything to test ACLs
|
||||
config3 = deepcopy(tf.config)
|
||||
config3['account'] = tf.config['account']
|
||||
config3['username'] = tf.config['username3']
|
||||
config3['password'] = tf.config['password3']
|
||||
cls.conn3 = Connection(config3)
|
||||
cls.storage_url3, cls.storage_token3 = cls.conn3.authenticate()
|
||||
cls.account3 = cls.conn3.get_account()
|
||||
|
||||
@classmethod
|
||||
def tearDown(cls):
|
||||
cls.account.delete_containers()
|
||||
cls.account2.delete_containers()
|
||||
|
||||
|
||||
class TestCrossPolicyObjectVersioningEnv(object):
|
||||
# tri-state: None initially, then True/False
|
||||
@ -2650,14 +2674,14 @@ class TestCrossPolicyObjectVersioningEnv(object):
|
||||
cls.multiple_policies_enabled = True
|
||||
else:
|
||||
cls.multiple_policies_enabled = False
|
||||
# We have to lie here that versioning is enabled. We actually
|
||||
# don't know, but it does not matter. We know these tests cannot
|
||||
# run without multiple policies present. If multiple policies are
|
||||
# present, we won't be setting this field to any value, so it
|
||||
# should all still work.
|
||||
cls.versioning_enabled = True
|
||||
cls.versioning_enabled = False
|
||||
return
|
||||
|
||||
if cls.versioning_enabled is None:
|
||||
cls.versioning_enabled = 'versioned_writes' in cluster_info
|
||||
if not cls.versioning_enabled:
|
||||
return
|
||||
|
||||
policy = cls.policies.select()
|
||||
version_policy = cls.policies.exclude(name=policy['name']).select()
|
||||
|
||||
@ -2691,6 +2715,25 @@ class TestCrossPolicyObjectVersioningEnv(object):
|
||||
# if versioning is off, then X-Versions-Location won't persist
|
||||
cls.versioning_enabled = 'versions' in container_info
|
||||
|
||||
# setup another account to test ACLs
|
||||
config2 = deepcopy(tf.config)
|
||||
config2['account'] = tf.config['account2']
|
||||
config2['username'] = tf.config['username2']
|
||||
config2['password'] = tf.config['password2']
|
||||
cls.conn2 = Connection(config2)
|
||||
cls.storage_url2, cls.storage_token2 = cls.conn2.authenticate()
|
||||
cls.account2 = cls.conn2.get_account()
|
||||
cls.account2.delete_containers()
|
||||
|
||||
# setup another account with no access to anything to test ACLs
|
||||
config3 = deepcopy(tf.config)
|
||||
config3['account'] = tf.config['account']
|
||||
config3['username'] = tf.config['username3']
|
||||
config3['password'] = tf.config['password3']
|
||||
cls.conn3 = Connection(config3)
|
||||
cls.storage_url3, cls.storage_token3 = cls.conn3.authenticate()
|
||||
cls.account3 = cls.conn3.get_account()
|
||||
|
||||
|
||||
class TestObjectVersioning(Base):
|
||||
env = TestObjectVersioningEnv
|
||||
@ -2709,40 +2752,103 @@ class TestObjectVersioning(Base):
|
||||
def tearDown(self):
|
||||
super(TestObjectVersioning, self).tearDown()
|
||||
try:
|
||||
# delete versions first!
|
||||
# only delete files and not container
|
||||
# as they were configured in self.env
|
||||
self.env.versions_container.delete_files()
|
||||
self.env.container.delete_files()
|
||||
except ResponseError:
|
||||
pass
|
||||
|
||||
def test_clear_version_option(self):
|
||||
# sanity
|
||||
self.assertEqual(self.env.container.info()['versions'],
|
||||
self.env.versions_container.name)
|
||||
self.env.container.update_metadata(
|
||||
hdrs={'X-Versions-Location': ''})
|
||||
self.assertEqual(self.env.container.info().get('versions'), None)
|
||||
|
||||
# set location back to the way it was
|
||||
self.env.container.update_metadata(
|
||||
hdrs={'X-Versions-Location': self.env.versions_container.name})
|
||||
self.assertEqual(self.env.container.info()['versions'],
|
||||
self.env.versions_container.name)
|
||||
|
||||
def test_overwriting(self):
|
||||
container = self.env.container
|
||||
versions_container = self.env.versions_container
|
||||
cont_info = container.info()
|
||||
self.assertEquals(cont_info['versions'], versions_container.name)
|
||||
|
||||
obj_name = Utils.create_name()
|
||||
|
||||
versioned_obj = container.file(obj_name)
|
||||
versioned_obj.write("aaaaa")
|
||||
versioned_obj.write("aaaaa", hdrs={'Content-Type': 'text/jibberish01'})
|
||||
obj_info = versioned_obj.info()
|
||||
self.assertEqual('text/jibberish01', obj_info['content_type'])
|
||||
|
||||
self.assertEqual(0, versions_container.info()['object_count'])
|
||||
|
||||
versioned_obj.write("bbbbb")
|
||||
versioned_obj.write("bbbbb", hdrs={'Content-Type': 'text/jibberish02',
|
||||
'X-Object-Meta-Foo': 'Bar'})
|
||||
versioned_obj.initialize()
|
||||
self.assertEqual(versioned_obj.content_type, 'text/jibberish02')
|
||||
self.assertEqual(versioned_obj.metadata['foo'], 'Bar')
|
||||
|
||||
# the old version got saved off
|
||||
self.assertEqual(1, versions_container.info()['object_count'])
|
||||
versioned_obj_name = versions_container.files()[0]
|
||||
self.assertEqual(
|
||||
"aaaaa", versions_container.file(versioned_obj_name).read())
|
||||
prev_version = versions_container.file(versioned_obj_name)
|
||||
prev_version.initialize()
|
||||
self.assertEqual("aaaaa", prev_version.read())
|
||||
self.assertEqual(prev_version.content_type, 'text/jibberish01')
|
||||
|
||||
# make sure the new obj metadata did not leak to the prev. version
|
||||
self.assertTrue('foo' not in prev_version.metadata)
|
||||
|
||||
# check that POST does not create a new version
|
||||
versioned_obj.sync_metadata(metadata={'fu': 'baz'})
|
||||
self.assertEqual(1, versions_container.info()['object_count'])
|
||||
|
||||
# if we overwrite it again, there are two versions
|
||||
versioned_obj.write("ccccc")
|
||||
self.assertEqual(2, versions_container.info()['object_count'])
|
||||
versioned_obj_name = versions_container.files()[1]
|
||||
prev_version = versions_container.file(versioned_obj_name)
|
||||
prev_version.initialize()
|
||||
self.assertEqual("bbbbb", prev_version.read())
|
||||
self.assertEqual(prev_version.content_type, 'text/jibberish02')
|
||||
self.assertTrue('foo' in prev_version.metadata)
|
||||
self.assertTrue('fu' in prev_version.metadata)
|
||||
|
||||
# as we delete things, the old contents return
|
||||
self.assertEqual("ccccc", versioned_obj.read())
|
||||
|
||||
# test copy from a different container
|
||||
src_container = self.env.account.container(Utils.create_name())
|
||||
self.assertTrue(src_container.create())
|
||||
src_name = Utils.create_name()
|
||||
src_obj = src_container.file(src_name)
|
||||
src_obj.write("ddddd", hdrs={'Content-Type': 'text/jibberish04'})
|
||||
src_obj.copy(container.name, obj_name)
|
||||
|
||||
self.assertEqual("ddddd", versioned_obj.read())
|
||||
versioned_obj.initialize()
|
||||
self.assertEqual(versioned_obj.content_type, 'text/jibberish04')
|
||||
|
||||
# make sure versions container has the previous version
|
||||
self.assertEqual(3, versions_container.info()['object_count'])
|
||||
versioned_obj_name = versions_container.files()[2]
|
||||
prev_version = versions_container.file(versioned_obj_name)
|
||||
prev_version.initialize()
|
||||
self.assertEqual("ccccc", prev_version.read())
|
||||
|
||||
# test delete
|
||||
versioned_obj.delete()
|
||||
self.assertEqual("ccccc", versioned_obj.read())
|
||||
versioned_obj.delete()
|
||||
self.assertEqual("bbbbb", versioned_obj.read())
|
||||
versioned_obj.delete()
|
||||
self.assertEqual("aaaaa", versioned_obj.read())
|
||||
self.assertEqual(0, versions_container.info()['object_count'])
|
||||
versioned_obj.delete()
|
||||
self.assertRaises(ResponseError, versioned_obj.read)
|
||||
|
||||
@ -2774,6 +2880,87 @@ class TestObjectVersioning(Base):
|
||||
self.assertEqual(3, versions_container.info()['object_count'])
|
||||
self.assertEqual("112233", man_file.read())
|
||||
|
||||
def test_versioning_container_acl(self):
|
||||
# create versions container and DO NOT give write access to account2
|
||||
versions_container = self.env.account.container(Utils.create_name())
|
||||
self.assertTrue(versions_container.create(hdrs={
|
||||
'X-Container-Write': ''
|
||||
}))
|
||||
|
||||
# check account2 cannot write to versions container
|
||||
fail_obj_name = Utils.create_name()
|
||||
fail_obj = versions_container.file(fail_obj_name)
|
||||
self.assertRaises(ResponseError, fail_obj.write, "should fail",
|
||||
cfg={'use_token': self.env.storage_token2})
|
||||
|
||||
# create container and give write access to account2
|
||||
# don't set X-Versions-Location just yet
|
||||
container = self.env.account.container(Utils.create_name())
|
||||
self.assertTrue(container.create(hdrs={
|
||||
'X-Container-Write': self.env.conn2.user_acl}))
|
||||
|
||||
# check account2 cannot set X-Versions-Location on container
|
||||
self.assertRaises(ResponseError, container.update_metadata, hdrs={
|
||||
'X-Versions-Location': versions_container},
|
||||
cfg={'use_token': self.env.storage_token2})
|
||||
|
||||
# good! now let admin set the X-Versions-Location
|
||||
# p.s.: sticking a 'x-remove' header here to test precedence
|
||||
# of both headers. Setting the location should succeed.
|
||||
self.assertTrue(container.update_metadata(hdrs={
|
||||
'X-Remove-Versions-Location': versions_container,
|
||||
'X-Versions-Location': versions_container}))
|
||||
|
||||
# write object twice to container and check version
|
||||
obj_name = Utils.create_name()
|
||||
versioned_obj = container.file(obj_name)
|
||||
self.assertTrue(versioned_obj.write("never argue with the data",
|
||||
cfg={'use_token': self.env.storage_token2}))
|
||||
self.assertEqual(versioned_obj.read(), "never argue with the data")
|
||||
|
||||
self.assertTrue(
|
||||
versioned_obj.write("we don't have no beer, just tequila",
|
||||
cfg={'use_token': self.env.storage_token2}))
|
||||
self.assertEqual(versioned_obj.read(),
|
||||
"we don't have no beer, just tequila")
|
||||
self.assertEqual(1, versions_container.info()['object_count'])
|
||||
|
||||
# read the original uploaded object
|
||||
for filename in versions_container.files():
|
||||
backup_file = versions_container.file(filename)
|
||||
break
|
||||
self.assertEqual(backup_file.read(), "never argue with the data")
|
||||
|
||||
# user3 (some random user with no access to anything)
|
||||
# tries to read from versioned container
|
||||
self.assertRaises(ResponseError, backup_file.read,
|
||||
cfg={'use_token': self.env.storage_token3})
|
||||
|
||||
# user3 cannot write or delete from source container either
|
||||
self.assertRaises(ResponseError, versioned_obj.write,
|
||||
"some random user trying to write data",
|
||||
cfg={'use_token': self.env.storage_token3})
|
||||
self.assertRaises(ResponseError, versioned_obj.delete,
|
||||
cfg={'use_token': self.env.storage_token3})
|
||||
|
||||
# user2 can't read or delete from versions-location
|
||||
self.assertRaises(ResponseError, backup_file.read,
|
||||
cfg={'use_token': self.env.storage_token2})
|
||||
self.assertRaises(ResponseError, backup_file.delete,
|
||||
cfg={'use_token': self.env.storage_token2})
|
||||
|
||||
# but is able to delete from the source container
|
||||
# this could be a helpful scenario for dev ops that want to setup
|
||||
# just one container to hold object versions of multiple containers
|
||||
# and each one of those containers are owned by different users
|
||||
self.assertTrue(versioned_obj.delete(
|
||||
cfg={'use_token': self.env.storage_token2}))
|
||||
|
||||
# tear-down since we create these containers here
|
||||
# and not in self.env
|
||||
versions_container.delete_recursive()
|
||||
container.delete_recursive()
|
||||
|
||||
def test_versioning_check_acl(self):
|
||||
container = self.env.container
|
||||
versions_container = self.env.versions_container
|
||||
|
@ -76,7 +76,7 @@ class FakeSwift(object):
|
||||
path += '?' + env['QUERY_STRING']
|
||||
|
||||
if 'swift.authorize' in env:
|
||||
resp = env['swift.authorize']()
|
||||
resp = env['swift.authorize'](swob.Request(env))
|
||||
if resp:
|
||||
return resp(env, start_response)
|
||||
|
||||
|
@ -793,7 +793,7 @@ class TestDloGetManifest(DloTestCase):
|
||||
def test_get_with_auth_overridden(self):
|
||||
auth_got_called = [0]
|
||||
|
||||
def my_auth():
|
||||
def my_auth(req):
|
||||
auth_got_called[0] += 1
|
||||
return None
|
||||
|
||||
|
558
test/unit/common/middleware/test_versioned_writes.py
Normal file
558
test/unit/common/middleware/test_versioned_writes.py
Normal file
@ -0,0 +1,558 @@
|
||||
# Copyright (c) 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from swift.common import swob
|
||||
from swift.common.middleware import versioned_writes
|
||||
from swift.common.swob import Request
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
class FakeCache(object):
|
||||
|
||||
def __init__(self, val):
|
||||
if 'status' not in val:
|
||||
val['status'] = 200
|
||||
self.val = val
|
||||
|
||||
def get(self, *args):
|
||||
return self.val
|
||||
|
||||
|
||||
class VersionedWritesTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app = FakeSwift()
|
||||
conf = {'allow_versioned_writes': 'true'}
|
||||
self.vw = versioned_writes.filter_factory(conf)(self.app)
|
||||
|
||||
def call_app(self, req, app=None, expect_exception=False):
|
||||
if app is None:
|
||||
app = self.app
|
||||
|
||||
self.authorized = []
|
||||
|
||||
def authorize(req):
|
||||
self.authorized.append(req)
|
||||
|
||||
if 'swift.authorize' not in req.environ:
|
||||
req.environ['swift.authorize'] = authorize
|
||||
|
||||
req.headers.setdefault("User-Agent", "Marula Kruger")
|
||||
|
||||
status = [None]
|
||||
headers = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers[0] = h
|
||||
|
||||
body_iter = app(req.environ, start_response)
|
||||
body = ''
|
||||
caught_exc = None
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
except Exception as exc:
|
||||
if expect_exception:
|
||||
caught_exc = exc
|
||||
else:
|
||||
raise
|
||||
|
||||
if expect_exception:
|
||||
return status[0], headers[0], body, caught_exc
|
||||
else:
|
||||
return status[0], headers[0], body
|
||||
|
||||
def call_vw(self, req, **kwargs):
|
||||
return self.call_app(req, app=self.vw, **kwargs)
|
||||
|
||||
def assertRequestEqual(self, req, other):
|
||||
self.assertEqual(req.method, other.method)
|
||||
self.assertEqual(req.path, other.path)
|
||||
|
||||
def test_put_container(self):
|
||||
self.app.register('PUT', '/v1/a/c', swob.HTTPOk, {}, 'passed')
|
||||
req = Request.blank('/v1/a/c',
|
||||
headers={'X-Versions-Location': 'ver_cont'},
|
||||
environ={'REQUEST_METHOD': 'PUT'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
|
||||
# check for sysmeta header
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls[0]
|
||||
self.assertEquals('PUT', method)
|
||||
self.assertEquals('/v1/a/c', path)
|
||||
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_container_allow_versioned_writes_false(self):
|
||||
self.vw.conf = {'allow_versioned_writes': 'false'}
|
||||
|
||||
# PUT/POST container must fail as 412 when allow_versioned_writes
|
||||
# set to false
|
||||
for method in ('PUT', 'POST'):
|
||||
req = Request.blank('/v1/a/c',
|
||||
headers={'X-Versions-Location': 'ver_cont'},
|
||||
environ={'REQUEST_METHOD': method})
|
||||
try:
|
||||
status, headers, body = self.call_vw(req)
|
||||
except swob.HTTPException as e:
|
||||
pass
|
||||
self.assertEquals(e.status_int, 412)
|
||||
|
||||
# GET/HEAD performs as normal
|
||||
self.app.register('GET', '/v1/a/c', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register('HEAD', '/v1/a/c', swob.HTTPOk, {}, 'passed')
|
||||
|
||||
for method in ('GET', 'HEAD'):
|
||||
req = Request.blank('/v1/a/c',
|
||||
headers={'X-Versions-Location': 'ver_cont'},
|
||||
environ={'REQUEST_METHOD': method})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
|
||||
def test_remove_versions_location(self):
|
||||
self.app.register('POST', '/v1/a/c', swob.HTTPOk, {}, 'passed')
|
||||
req = Request.blank('/v1/a/c',
|
||||
headers={'X-Remove-Versions-Location': 'x'},
|
||||
environ={'REQUEST_METHOD': 'POST'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
|
||||
# check for sysmeta header
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls[0]
|
||||
self.assertEquals('POST', method)
|
||||
self.assertEquals('/v1/a/c', path)
|
||||
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
|
||||
self.assertTrue('x-versions-location' in req_headers)
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_remove_add_versions_precedence(self):
|
||||
self.app.register(
|
||||
'POST', '/v1/a/c', swob.HTTPOk,
|
||||
{'x-container-sysmeta-versions-location': 'ver_cont'},
|
||||
'passed')
|
||||
req = Request.blank('/v1/a/c',
|
||||
headers={'X-Remove-Versions-Location': 'x',
|
||||
'X-Versions-Location': 'ver_cont'},
|
||||
environ={'REQUEST_METHOD': 'POST'})
|
||||
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertTrue(('X-Versions-Location', 'ver_cont') in headers)
|
||||
|
||||
# check for sysmeta header
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls[0]
|
||||
self.assertEquals('POST', method)
|
||||
self.assertEquals('/v1/a/c', path)
|
||||
self.assertTrue('x-container-sysmeta-versions-location' in req_headers)
|
||||
self.assertTrue('x-remove-versions-location' not in req_headers)
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_get_container(self):
|
||||
self.app.register(
|
||||
'GET', '/v1/a/c', swob.HTTPOk,
|
||||
{'x-container-sysmeta-versions-location': 'ver_cont'}, None)
|
||||
req = Request.blank(
|
||||
'/v1/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertTrue(('X-Versions-Location', 'ver_cont') in headers)
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_get_head(self):
|
||||
self.app.register('GET', '/v1/a/c/o', swob.HTTPOk, {}, None)
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk, {}, None)
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_put_object_no_versioning(self):
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
|
||||
cache = FakeCache({})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_put_first_object_success(self):
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/c/o', swob.HTTPNotFound, {}, None)
|
||||
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_PUT_versioning_with_nonzero_default_policy(self):
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/c/o', swob.HTTPNotFound, {}, None)
|
||||
|
||||
cache = FakeCache({'versions': 'ver_cont', 'storage_policy': '2'})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
|
||||
# check for 'X-Backend-Storage-Policy-Index' in HEAD request
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls[0]
|
||||
self.assertEquals('HEAD', method)
|
||||
self.assertEquals('/v1/a/c/o', path)
|
||||
self.assertTrue('X-Backend-Storage-Policy-Index' in req_headers)
|
||||
self.assertEquals('2',
|
||||
req_headers.get('X-Backend-Storage-Policy-Index'))
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_put_object_no_versioning_with_container_config_true(self):
|
||||
# set False to versions_write obsously and expect no COPY occurred
|
||||
self.vw.conf = {'allow_versioned_writes': 'false'}
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPCreated, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/c/o', swob.HTTPOk,
|
||||
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
|
||||
cache = FakeCache({'versions': 'ver_cont'})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '201 Created')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
called_method = [method for (method, path, hdrs) in self.app._calls]
|
||||
self.assertTrue('COPY' not in called_method)
|
||||
|
||||
def test_delete_object_no_versioning_with_container_config_true(self):
|
||||
# set False to versions_write obviously and expect no GET versioning
|
||||
# container and COPY called (just delete object as normal)
|
||||
self.vw.conf = {'allow_versioned_writes': 'false'}
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/c/o', swob.HTTPNoContent, {}, 'passed')
|
||||
cache = FakeCache({'versions': 'ver_cont'})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '204 No Content')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
called_method = \
|
||||
[method for (method, path, rheaders) in self.app._calls]
|
||||
self.assertTrue('COPY' not in called_method)
|
||||
self.assertTrue('GET' not in called_method)
|
||||
|
||||
def test_copy_object_no_versioning_with_container_config_true(self):
|
||||
# set False to versions_write obviously and expect no extra
|
||||
# COPY called (just copy object as normal)
|
||||
self.vw.conf = {'allow_versioned_writes': 'false'}
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
|
||||
cache = FakeCache({'versions': 'ver_cont'})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '201 Created')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
called_method = \
|
||||
[method for (method, path, rheaders) in self.app._calls]
|
||||
self.assertTrue('COPY' in called_method)
|
||||
self.assertEquals(called_method.count('COPY'), 1)
|
||||
|
||||
def test_new_version_success(self):
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/c/o', swob.HTTPOk,
|
||||
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_new_version_sysmeta_precedence(self):
|
||||
self.app.register(
|
||||
'PUT', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/c/o', swob.HTTPOk,
|
||||
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/c/o', swob.HTTPCreated, {}, None)
|
||||
|
||||
# fill cache with two different values for versions location
|
||||
# new middleware should use sysmeta first
|
||||
cache = FakeCache({'versions': 'old_ver_cont',
|
||||
'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
# check that sysmeta header was used
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls[1]
|
||||
self.assertEquals('COPY', method)
|
||||
self.assertEquals('/v1/a/c/o', path)
|
||||
self.assertTrue(req_headers['Destination'].startswith('ver_cont/'))
|
||||
|
||||
def test_copy_first_version(self):
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/tgt_cont/tgt_obj', swob.HTTPNotFound, {}, None)
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/src_cont/src_obj',
|
||||
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'},
|
||||
headers={'Destination': 'tgt_cont/tgt_obj'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_copy_new_version(self):
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/a/tgt_cont/tgt_obj', swob.HTTPOk,
|
||||
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/tgt_cont/tgt_obj', swob.HTTPCreated, {}, None)
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/src_cont/src_obj',
|
||||
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'},
|
||||
headers={'Destination': 'tgt_cont/tgt_obj'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_copy_new_version_different_account(self):
|
||||
self.app.register(
|
||||
'COPY', '/v1/src_a/src_cont/src_obj', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'HEAD', '/v1/tgt_a/tgt_cont/tgt_obj', swob.HTTPOk,
|
||||
{'last-modified': 'Wed, 19 Nov 2014 18:19:02 GMT'}, 'passed')
|
||||
self.app.register(
|
||||
'COPY', '/v1/tgt_a/tgt_cont/tgt_obj', swob.HTTPCreated, {}, None)
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/src_a/src_cont/src_obj',
|
||||
environ={'REQUEST_METHOD': 'COPY', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '100'},
|
||||
headers={'Destination': 'tgt_cont/tgt_obj',
|
||||
'Destination-Account': 'tgt_a'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_delete_first_object_success(self):
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '0'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_delete_latest_version_success(self):
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
|
||||
swob.HTTPOk, {},
|
||||
'[{"hash": "x", '
|
||||
'"last_modified": "2014-11-21T14:14:27.409100", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/1", '
|
||||
'"content_type": "text/plain"}, '
|
||||
'{"hash": "y", '
|
||||
'"last_modified": "2014-11-21T14:23:02.206740", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/2", '
|
||||
'"content_type": "text/plain"}]')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
|
||||
'&marker=001o/2',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPCreated,
|
||||
{}, None)
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/ver_cont/001o/2', swob.HTTPOk,
|
||||
{}, None)
|
||||
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
headers={'X-If-Delete-At': 1},
|
||||
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '0'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
# check that X-If-Delete-At was removed from DELETE request
|
||||
calls = self.app.calls_with_headers
|
||||
method, path, req_headers = calls.pop()
|
||||
self.assertEquals('DELETE', method)
|
||||
self.assertTrue(path.startswith('/v1/a/ver_cont/001o/2'))
|
||||
self.assertFalse('x-if-delete-at' in req_headers or
|
||||
'X-If-Delete-At' in req_headers)
|
||||
|
||||
def test_DELETE_on_expired_versioned_object(self):
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
|
||||
swob.HTTPOk, {},
|
||||
'[{"hash": "x", '
|
||||
'"last_modified": "2014-11-21T14:14:27.409100", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/1", '
|
||||
'"content_type": "text/plain"}, '
|
||||
'{"hash": "y", '
|
||||
'"last_modified": "2014-11-21T14:23:02.206740", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/2", '
|
||||
'"content_type": "text/plain"}]')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
|
||||
'&marker=001o/2',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
|
||||
# expired object
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPNotFound,
|
||||
{}, None)
|
||||
self.app.register(
|
||||
'COPY', '/v1/a/ver_cont/001o/1', swob.HTTPCreated,
|
||||
{}, None)
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/ver_cont/001o/1', swob.HTTPOk,
|
||||
{}, None)
|
||||
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
|
||||
'CONTENT_LENGTH': '0'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '200 OK')
|
||||
self.assertEqual(len(self.authorized), 1)
|
||||
self.assertRequestEqual(req, self.authorized[0])
|
||||
|
||||
def test_denied_DELETE_of_versioned_object(self):
|
||||
authorize_call = []
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&marker=',
|
||||
swob.HTTPOk, {},
|
||||
'[{"hash": "x", '
|
||||
'"last_modified": "2014-11-21T14:14:27.409100", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/1", '
|
||||
'"content_type": "text/plain"}, '
|
||||
'{"hash": "y", '
|
||||
'"last_modified": "2014-11-21T14:23:02.206740", '
|
||||
'"bytes": 3, '
|
||||
'"name": "001o/2", '
|
||||
'"content_type": "text/plain"}]')
|
||||
self.app.register(
|
||||
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
|
||||
'&marker=001o/2',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
self.app.register(
|
||||
'DELETE', '/v1/a/c/o', swob.HTTPForbidden,
|
||||
{}, None)
|
||||
|
||||
def fake_authorize(req):
|
||||
authorize_call.append(req)
|
||||
return swob.HTTPForbidden()
|
||||
|
||||
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
|
||||
'swift.authorize': fake_authorize,
|
||||
'CONTENT_LENGTH': '0'})
|
||||
status, headers, body = self.call_vw(req)
|
||||
self.assertEquals(status, '403 Forbidden')
|
||||
self.assertEqual(len(authorize_call), 1)
|
||||
self.assertRequestEqual(req, authorize_call[0])
|
@ -515,6 +515,24 @@ class TestConstraints(unittest.TestCase):
|
||||
constraints.check_account_format,
|
||||
req, req.headers['X-Copy-From-Account'])
|
||||
|
||||
def test_check_container_format(self):
|
||||
invalid_versions_locations = (
|
||||
'container/with/slashes',
|
||||
'', # empty
|
||||
)
|
||||
for versions_location in invalid_versions_locations:
|
||||
req = Request.blank(
|
||||
'/v/a/c/o', headers={
|
||||
'X-Versions-Location': versions_location})
|
||||
try:
|
||||
constraints.check_container_format(
|
||||
req, req.headers['X-Versions-Location'])
|
||||
except HTTPException as e:
|
||||
self.assertTrue(e.body.startswith('Container name cannot'))
|
||||
else:
|
||||
self.fail('check_container_format did not raise error for %r' %
|
||||
req.headers['X-Versions-Location'])
|
||||
|
||||
|
||||
class TestConstraintsConfig(unittest.TestCase):
|
||||
|
||||
|
@ -141,6 +141,11 @@ class TestWSGI(unittest.TestCase):
|
||||
expected = swift.common.middleware.dlo.DynamicLargeObject
|
||||
self.assertTrue(isinstance(app, expected))
|
||||
|
||||
app = app.app
|
||||
expected = \
|
||||
swift.common.middleware.versioned_writes.VersionedWritesMiddleware
|
||||
self.assert_(isinstance(app, expected))
|
||||
|
||||
app = app.app
|
||||
expected = swift.proxy.server.Application
|
||||
self.assertTrue(isinstance(app, expected))
|
||||
@ -1414,6 +1419,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
['swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.gatekeeper',
|
||||
'swift.common.middleware.dlo',
|
||||
'swift.common.middleware.versioned_writes',
|
||||
'swift.proxy.server'])
|
||||
|
||||
def test_proxy_modify_wsgi_pipeline(self):
|
||||
@ -1444,6 +1450,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
['swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.gatekeeper',
|
||||
'swift.common.middleware.dlo',
|
||||
'swift.common.middleware.versioned_writes',
|
||||
'swift.common.middleware.healthcheck',
|
||||
'swift.proxy.server'])
|
||||
|
||||
@ -1541,6 +1548,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
'swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.gatekeeper',
|
||||
'swift.common.middleware.dlo',
|
||||
'swift.common.middleware.versioned_writes',
|
||||
'swift.common.middleware.healthcheck',
|
||||
'swift.proxy.server'])
|
||||
|
||||
@ -1554,6 +1562,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
'swift.common.middleware.healthcheck',
|
||||
'swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.dlo',
|
||||
'swift.common.middleware.versioned_writes',
|
||||
'swift.proxy.server'])
|
||||
|
||||
def test_catch_errors_gatekeeper_configured_not_at_start(self):
|
||||
@ -1566,6 +1575,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
'swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.gatekeeper',
|
||||
'swift.common.middleware.dlo',
|
||||
'swift.common.middleware.versioned_writes',
|
||||
'swift.proxy.server'])
|
||||
|
||||
@with_tempdir
|
||||
@ -1598,7 +1608,7 @@ class TestPipelineModification(unittest.TestCase):
|
||||
tempdir, policy.ring_name + '.ring.gz')
|
||||
|
||||
app = wsgi.loadapp(conf_path)
|
||||
proxy_app = app.app.app.app.app
|
||||
proxy_app = app.app.app.app.app.app
|
||||
self.assertEqual(proxy_app.account_ring.serialized_path,
|
||||
account_ring_path)
|
||||
self.assertEqual(proxy_app.container_ring.serialized_path,
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user