Expiring Objects Support
Please see the doc/source/overview_expiring_objects.rst for more detail. Change-Id: I4ab49e731248cf62ce10001016e0c819cc531738
This commit is contained in:
parent
74752c8d26
commit
872420efdb
24
bin/swift-object-expirer
Normal file
24
bin/swift-object-expirer
Normal file
@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (c) 2010-2011 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from swift.common.daemon import run_daemon
|
||||
from swift.common.utils import parse_options
|
||||
from swift.obj.expirer import ObjectExpirer
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
conf_file, options = parse_options(once=True)
|
||||
run_daemon(ObjectExpirer, conf_file, **options)
|
@ -45,6 +45,7 @@ Overview and Concepts
|
||||
ratelimit
|
||||
overview_large_objects
|
||||
overview_container_sync
|
||||
overview_expiring_objects
|
||||
|
||||
Developer Documentation
|
||||
=======================
|
||||
|
43
doc/source/overview_expiring_objects.rst
Normal file
43
doc/source/overview_expiring_objects.rst
Normal file
@ -0,0 +1,43 @@
|
||||
=======================
|
||||
Expiring Object Support
|
||||
=======================
|
||||
|
||||
The ``swift-object-expirer`` offers scheduled deletion of objects. The Swift client would use the ``X-Delete-At`` or ``X-Delete-After`` headers during an object ``PUT`` or ``POST`` and the cluster would automatically quit serving that object at the specified time and would shortly thereafter remove the object from the system.
|
||||
|
||||
The ``X-Delete-At`` header takes a Unix Epoch timestamp, in integer form; for example: ``1317070737`` represents ``Mon Sep 26 20:58:57 2011 UTC``.
|
||||
|
||||
The ``X-Delete-After`` header takes a integer number of seconds. The proxy server that receives the request will convert this header into an ``X-Delete-At`` header using its current time plus the value given.
|
||||
|
||||
As expiring objects are added to the system, the object servers will record the expirations in a hidden ``.expiring_objects`` account for the ``swift-object-expirer`` to handle later.
|
||||
|
||||
Just one instance of the ``swift-object-expirer`` daemon needs to run for a cluster. This isn't exactly automatic failover high availability, but if this daemon doesn't run for a few hours it should not be any real issue. The expired-but-not-yet-deleted objects will still ``404 Not Found`` if someone tries to ``GET`` or ``HEAD`` them and they'll just be deleted a bit later when the daemon is restarted.
|
||||
|
||||
The daemon uses the ``/etc/swift/object-expirer.conf`` by default, and here is a quick sample conf file::
|
||||
|
||||
[DEFAULT]
|
||||
# swift_dir = /etc/swift
|
||||
# user = swift
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
# log_facility = LOG_LOCAL0
|
||||
# log_level = INFO
|
||||
|
||||
[object-expirer]
|
||||
interval = 300
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors cache proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
# See proxy-server.conf-sample for options
|
||||
|
||||
[filter:cache]
|
||||
use = egg:swift#memcache
|
||||
# See proxy-server.conf-sample for options
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
# See proxy-server.conf-sample for options
|
||||
|
||||
The daemon needs to run on a machine with access to all the backend servers in the cluster, but does not need proxy server or public access. The daemon will use its own internal proxy code instance to access the backend servers.
|
25
etc/object-expirer.conf-sample
Normal file
25
etc/object-expirer.conf-sample
Normal file
@ -0,0 +1,25 @@
|
||||
[DEFAULT]
|
||||
# swift_dir = /etc/swift
|
||||
# user = swift
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
# log_facility = LOG_LOCAL0
|
||||
# log_level = INFO
|
||||
|
||||
[object-expirer]
|
||||
# interval = 300
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors cache proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
# See proxy-server.conf-sample for options
|
||||
|
||||
[filter:cache]
|
||||
use = egg:swift#memcache
|
||||
# See proxy-server.conf-sample for options
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
# See proxy-server.conf-sample for options
|
@ -7,6 +7,7 @@
|
||||
# swift_dir = /etc/swift
|
||||
# devices = /srv/node
|
||||
# mount_check = true
|
||||
# expiring_objects_container_divisor = 86400
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
# log_facility = LOG_LOCAL0
|
||||
@ -33,7 +34,7 @@ use = egg:swift#object
|
||||
# Comma separated list of headers that can be set in metadata on an object.
|
||||
# This list is in addition to X-Object-Meta-* headers and cannot include
|
||||
# Content-Type, etag, Content-Length, or deleted
|
||||
# allowed_headers = Content-Encoding, Content-Disposition, X-Object-Manifest
|
||||
# allowed_headers = Content-Disposition, Content-Encoding, X-Delete-At, X-Object-Manifest
|
||||
|
||||
[filter:recon]
|
||||
use = egg:swift#recon
|
||||
|
@ -7,6 +7,7 @@
|
||||
# user = swift
|
||||
# cert_file = /etc/swift/proxy.crt
|
||||
# key_file = /etc/swift/proxy.key
|
||||
# expiring_objects_container_divisor = 86400
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
# log_facility = LOG_LOCAL0
|
||||
|
2
setup.py
2
setup.py
@ -49,7 +49,7 @@ setup(
|
||||
'bin/swift-container-server', 'bin/swift-container-updater',
|
||||
'bin/swift-drive-audit', 'bin/swift-get-nodes',
|
||||
'bin/swift-init', 'bin/swift-object-auditor',
|
||||
'bin/swift-object-info',
|
||||
'bin/swift-object-expirer', 'bin/swift-object-info',
|
||||
'bin/swift-object-replicator',
|
||||
'bin/swift-object-server',
|
||||
'bin/swift-object-updater', 'bin/swift-proxy-server',
|
||||
|
@ -48,6 +48,8 @@ class AccountController(object):
|
||||
('true', 't', '1', 'on', 'yes', 'y')
|
||||
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
|
||||
self.mount_check, logger=self.logger)
|
||||
self.auto_create_account_prefix = \
|
||||
conf.get('auto_create_account_prefix') or '.'
|
||||
|
||||
def _get_account_broker(self, drive, part, account):
|
||||
hsh = hash_path(account)
|
||||
@ -88,6 +90,10 @@ class AccountController(object):
|
||||
if container: # put account container
|
||||
if 'x-trans-id' in req.headers:
|
||||
broker.pending_timeout = 3
|
||||
if account.startswith(self.auto_create_account_prefix) and \
|
||||
not os.path.exists(broker.db_file):
|
||||
broker.initialize(normalize_timestamp(
|
||||
req.headers.get('x-timestamp') or time.time()))
|
||||
if req.headers.get('x-account-override-deleted', 'no').lower() != \
|
||||
'yes' and broker.is_deleted():
|
||||
return HTTPNotFound(request=req)
|
||||
|
@ -793,11 +793,13 @@ def cache_from_env(env):
|
||||
return item_from_env(env, 'swift.cache')
|
||||
|
||||
|
||||
def readconf(conf, section_name=None, log_name=None, defaults=None, raw=False):
|
||||
def readconf(conffile, section_name=None, log_name=None, defaults=None,
|
||||
raw=False):
|
||||
"""
|
||||
Read config file and return config items as a dict
|
||||
|
||||
:param conf: path to config file, or a file-like object (hasattr readline)
|
||||
:param conffile: path to config file, or a file-like object (hasattr
|
||||
readline)
|
||||
:param section_name: config section to read (will return all sections if
|
||||
not defined)
|
||||
:param log_name: name to be used with logging (will use section_name if
|
||||
@ -811,18 +813,18 @@ def readconf(conf, section_name=None, log_name=None, defaults=None, raw=False):
|
||||
c = RawConfigParser(defaults)
|
||||
else:
|
||||
c = ConfigParser(defaults)
|
||||
if hasattr(conf, 'readline'):
|
||||
c.readfp(conf)
|
||||
if hasattr(conffile, 'readline'):
|
||||
c.readfp(conffile)
|
||||
else:
|
||||
if not c.read(conf):
|
||||
print _("Unable to read config file %s") % conf
|
||||
if not c.read(conffile):
|
||||
print _("Unable to read config file %s") % conffile
|
||||
sys.exit(1)
|
||||
if section_name:
|
||||
if c.has_section(section_name):
|
||||
conf = dict(c.items(section_name))
|
||||
else:
|
||||
print _("Unable to find %s config section in %s") % \
|
||||
(section_name, conf)
|
||||
(section_name, conffile)
|
||||
sys.exit(1)
|
||||
if "log_name" not in conf:
|
||||
if log_name is not None:
|
||||
@ -835,6 +837,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None, raw=False):
|
||||
conf.update({s: dict(c.items(s))})
|
||||
if 'log_name' not in conf:
|
||||
conf['log_name'] = log_name
|
||||
conf['__file__'] = conffile
|
||||
return conf
|
||||
|
||||
|
||||
|
@ -204,6 +204,7 @@ def make_pre_authed_request(env, method, path, body=None, headers=None,
|
||||
:param headers: Extra HTTP headers of new request; None by default
|
||||
|
||||
:returns: webob.Request object
|
||||
|
||||
(Stolen from Swauth: https://github.com/gholt/swauth)
|
||||
"""
|
||||
newenv = {'REQUEST_METHOD': method, 'HTTP_USER_AGENT': agent}
|
||||
|
@ -61,6 +61,8 @@ class ContainerController(object):
|
||||
if h.strip()]
|
||||
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR,
|
||||
ContainerBroker, self.mount_check, logger=self.logger)
|
||||
self.auto_create_account_prefix = \
|
||||
conf.get('auto_create_account_prefix') or '.'
|
||||
|
||||
def _get_container_broker(self, drive, part, account, container):
|
||||
"""
|
||||
@ -145,6 +147,10 @@ class ContainerController(object):
|
||||
if self.mount_check and not check_mount(self.root, drive):
|
||||
return Response(status='507 %s is not mounted' % drive)
|
||||
broker = self._get_container_broker(drive, part, account, container)
|
||||
if account.startswith(self.auto_create_account_prefix) and obj and \
|
||||
not os.path.exists(broker.db_file):
|
||||
broker.initialize(normalize_timestamp(
|
||||
req.headers.get('x-timestamp') or time.time()))
|
||||
if not os.path.exists(broker.db_file):
|
||||
return HTTPNotFound()
|
||||
if obj: # delete object
|
||||
@ -188,6 +194,9 @@ class ContainerController(object):
|
||||
timestamp = normalize_timestamp(req.headers['x-timestamp'])
|
||||
broker = self._get_container_broker(drive, part, account, container)
|
||||
if obj: # put container object
|
||||
if account.startswith(self.auto_create_account_prefix) and \
|
||||
not os.path.exists(broker.db_file):
|
||||
broker.initialize(timestamp)
|
||||
if not os.path.exists(broker.db_file):
|
||||
return HTTPNotFound()
|
||||
broker.put_object(obj, timestamp, int(req.headers['x-size']),
|
||||
|
246
swift/obj/expirer.py
Normal file
246
swift/obj/expirer.py
Normal file
@ -0,0 +1,246 @@
|
||||
# Copyright (c) 2010-2011 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from random import random
|
||||
from sys import exc_info
|
||||
from time import time
|
||||
from urllib import quote
|
||||
|
||||
from eventlet import sleep, TimeoutError
|
||||
from paste.deploy import loadapp
|
||||
from webob import Request
|
||||
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.utils import get_logger
|
||||
|
||||
try:
|
||||
import simplejson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
class ObjectExpirer(Daemon):
|
||||
"""
|
||||
Daemon that queries the internal hidden expiring_objects_account to
|
||||
discover objects that need to be deleted.
|
||||
|
||||
:param conf: The daemon configuration.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='object-expirer')
|
||||
self.interval = int(conf.get('interval') or 300)
|
||||
self.expiring_objects_account = \
|
||||
(conf.get('auto_create_account_prefix') or '.') + \
|
||||
'expiring_objects'
|
||||
self.retries = int(conf.get('retries') or 3)
|
||||
self.app = loadapp('config:' + (conf.get('__file__') or
|
||||
'/etc/swift/object-expirer.conf'))
|
||||
self.report_interval = int(conf.get('report_interval') or 300)
|
||||
self.report_first_time = self.report_last_time = time()
|
||||
self.report_objects = 0
|
||||
|
||||
def report(self, final=False):
|
||||
"""
|
||||
Emits a log line report of the progress so far, or the final progress
|
||||
is final=True.
|
||||
|
||||
:param final: Set to True for the last report once the expiration pass
|
||||
has completed.
|
||||
"""
|
||||
if final:
|
||||
elapsed = time() - self.report_first_time
|
||||
self.logger.info(_('Pass completed in %ds; %d objects expired') %
|
||||
(elapsed, self.report_objects))
|
||||
elif time() - self.report_last_time >= self.report_interval:
|
||||
elapsed = time() - self.report_first_time
|
||||
self.logger.info(_('Pass so far %ds; %d objects expired') %
|
||||
(elapsed, self.report_objects))
|
||||
self.report_last_time = time()
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
Executes a single pass, looking for objects to expire.
|
||||
|
||||
:param args: Extra args to fulfill the Daemon interface; this daemon
|
||||
has no additional args.
|
||||
:param kwargs: Extra keyword args to fulfill the Daemon interface; this
|
||||
daemon has no additional keyword args.
|
||||
"""
|
||||
self.report_first_time = self.report_last_time = time()
|
||||
self.report_objects = 0
|
||||
try:
|
||||
self.logger.debug(_('Run begin'))
|
||||
self.logger.info(_('Pass beginning; %s possible containers; %s '
|
||||
'possible objects') % self.get_account_info())
|
||||
for container in self.iter_containers():
|
||||
timestamp = int(container)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
for obj in self.iter_objects(container):
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
try:
|
||||
self.delete_actual_object(actual_obj, timestamp)
|
||||
self.delete_object(container, obj)
|
||||
self.report_objects += 1
|
||||
except (Exception, TimeoutError), err:
|
||||
self.logger.exception(
|
||||
_('Exception while deleting object %s %s %s') %
|
||||
(container, obj, str(err)))
|
||||
self.report()
|
||||
try:
|
||||
self.delete_container(container)
|
||||
except (Exception, TimeoutError), err:
|
||||
self.logger.exception(
|
||||
_('Exception while deleting container %s %s') %
|
||||
(container, str(err)))
|
||||
self.logger.debug(_('Run end'))
|
||||
self.report(final=True)
|
||||
except (Exception, TimeoutError):
|
||||
self.logger.exception(_('Unhandled exception'))
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""
|
||||
Executes passes forever, looking for objects to expire.
|
||||
|
||||
:param args: Extra args to fulfill the Daemon interface; this daemon
|
||||
has no additional args.
|
||||
:param kwargs: Extra keyword args to fulfill the Daemon interface; this
|
||||
daemon has no additional keyword args.
|
||||
"""
|
||||
sleep(random() * self.interval)
|
||||
while True:
|
||||
begin = time()
|
||||
try:
|
||||
self.run_once()
|
||||
except (Exception, TimeoutError):
|
||||
self.logger.exception(_('Unhandled exception'))
|
||||
elapsed = time() - begin
|
||||
if elapsed < self.interval:
|
||||
sleep(random() * (self.interval - elapsed))
|
||||
|
||||
def get_response(self, method, path, headers, acceptable_statuses):
|
||||
headers['user-agent'] = 'Swift Object Expirer'
|
||||
resp = exc_type = exc_value = exc_traceback = None
|
||||
for attempt in xrange(self.retries):
|
||||
req = Request.blank(path, environ={'REQUEST_METHOD': method},
|
||||
headers=headers)
|
||||
try:
|
||||
resp = req.get_response(self.app)
|
||||
if resp.status_int in acceptable_statuses or \
|
||||
resp.status_int // 100 in acceptable_statuses:
|
||||
return resp
|
||||
except (Exception, TimeoutError):
|
||||
exc_type, exc_value, exc_traceback = exc_info()
|
||||
sleep(2 ** (attempt + 1))
|
||||
if resp:
|
||||
raise Exception(_('Unexpected response %s') % (resp.status,))
|
||||
if exc_type:
|
||||
# To make pep8 tool happy, in place of raise t, v, tb:
|
||||
raise exc_type(*exc_value.args), None, exc_traceback
|
||||
|
||||
def get_account_info(self):
|
||||
"""
|
||||
Returns (container_count, object_count) tuple indicating the values for
|
||||
the hidden expiration account.
|
||||
"""
|
||||
resp = self.get_response('HEAD',
|
||||
'/v1/' + quote(self.expiring_objects_account), {}, (2, 404))
|
||||
if resp.status_int == 404:
|
||||
return (0, 0)
|
||||
return (int(resp.headers['x-account-container-count']),
|
||||
int(resp.headers['x-account-object-count']))
|
||||
|
||||
def iter_containers(self):
|
||||
"""
|
||||
Returns an iterator of container names of the hidden expiration account
|
||||
listing.
|
||||
"""
|
||||
path = '/v1/%s?format=json' % (quote(self.expiring_objects_account),)
|
||||
marker = ''
|
||||
while True:
|
||||
resp = self.get_response('GET', path + '&marker=' + quote(marker),
|
||||
{}, (2, 404))
|
||||
if resp.status_int in (204, 404):
|
||||
break
|
||||
data = json.loads(resp.body)
|
||||
if not data:
|
||||
break
|
||||
for item in data:
|
||||
yield item['name']
|
||||
marker = data[-1]['name']
|
||||
|
||||
def iter_objects(self, container):
|
||||
"""
|
||||
Returns an iterator of object names of the hidden expiration account's
|
||||
container listing for the container name given.
|
||||
|
||||
:param container: The name of the container to list.
|
||||
"""
|
||||
path = '/v1/%s/%s?format=json' % \
|
||||
(quote(self.expiring_objects_account), quote(container))
|
||||
marker = ''
|
||||
while True:
|
||||
resp = self.get_response('GET', path + '&' + quote(marker),
|
||||
{}, (2, 404))
|
||||
if resp.status_int in (204, 404):
|
||||
break
|
||||
data = json.loads(resp.body)
|
||||
if not data:
|
||||
break
|
||||
for item in data:
|
||||
yield item['name']
|
||||
marker = data[-1]['name']
|
||||
|
||||
def delete_actual_object(self, actual_obj, timestamp):
|
||||
"""
|
||||
Deletes the end-user object indicated by the actual object name given
|
||||
'<account>/<container>/<object>' if and only if the X-Delete-At value
|
||||
of the object is exactly the timestamp given.
|
||||
|
||||
:param actual_obj: The name of the end-user object to delete:
|
||||
'<account>/<container>/<object>'
|
||||
:param timestamp: The timestamp the X-Delete-At value must match to
|
||||
perform the actual delete.
|
||||
"""
|
||||
self.get_response('DELETE', '/v1/%s' % (quote(actual_obj),),
|
||||
{'X-If-Delete-At': str(timestamp)}, (2, 404, 412))
|
||||
|
||||
def delete_object(self, container, obj):
|
||||
"""
|
||||
Deletes an object from the hidden expiring object account.
|
||||
|
||||
:param container: The name of the container for the object.
|
||||
:param obj: The name of the object to delete.
|
||||
"""
|
||||
self.get_response('DELETE',
|
||||
'/v1/%s/%s/%s' % (quote(self.expiring_objects_account),
|
||||
quote(container), quote(obj)),
|
||||
{}, (2, 404))
|
||||
|
||||
def delete_container(self, container):
|
||||
"""
|
||||
Deletes a container from the hidden expiring object account.
|
||||
|
||||
:param container: The name of the container to delete.
|
||||
"""
|
||||
self.get_response('DELETE',
|
||||
'/v1/%s/%s' % (quote(self.expiring_objects_account),
|
||||
quote(container)),
|
||||
{}, (2, 404, 409))
|
@ -362,12 +362,68 @@ class ObjectController(object):
|
||||
self.max_upload_time = int(conf.get('max_upload_time', 86400))
|
||||
self.slow = int(conf.get('slow', 0))
|
||||
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
|
||||
default_allowed_headers = 'content-encoding, x-object-manifest, ' \
|
||||
'content-disposition'
|
||||
default_allowed_headers = '''
|
||||
content-disposition,
|
||||
content-encoding,
|
||||
x-delete-at,
|
||||
x-object-manifest,
|
||||
'''
|
||||
self.allowed_headers = set(i.strip().lower() for i in \
|
||||
conf.get('allowed_headers', \
|
||||
default_allowed_headers).split(',') if i.strip() and \
|
||||
i.strip().lower() not in DISALLOWED_HEADERS)
|
||||
self.expiring_objects_account = \
|
||||
(conf.get('auto_create_account_prefix') or '.') + \
|
||||
'expiring_objects'
|
||||
self.expiring_objects_container_divisor = \
|
||||
int(conf.get('expiring_objects_container_divisor') or 86400)
|
||||
|
||||
def async_update(self, op, account, container, obj, host, partition,
|
||||
contdevice, headers_out, objdevice):
|
||||
"""
|
||||
Sends or saves an async update.
|
||||
|
||||
:param op: operation performed (ex: 'PUT', or 'DELETE')
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name
|
||||
:param host: host that the container is on
|
||||
:param partition: partition that the container is on
|
||||
:param contdevice: device name that the container is on
|
||||
:param headers_out: dictionary of headers to send in the container
|
||||
request
|
||||
:param objdevice: device name that the object is in
|
||||
"""
|
||||
full_path = '/%s/%s/%s' % (account, container, obj)
|
||||
if all([host, partition, contdevice]):
|
||||
try:
|
||||
with ConnectionTimeout(self.conn_timeout):
|
||||
ip, port = host.rsplit(':', 1)
|
||||
conn = http_connect(ip, port, contdevice, partition, op,
|
||||
full_path, headers_out)
|
||||
with Timeout(self.node_timeout):
|
||||
response = conn.getresponse()
|
||||
response.read()
|
||||
if 200 <= response.status < 300:
|
||||
return
|
||||
else:
|
||||
self.logger.error(_('ERROR Container update failed '
|
||||
'(saving for async update later): %(status)d '
|
||||
'response from %(ip)s:%(port)s/%(dev)s'),
|
||||
{'status': response.status, 'ip': ip, 'port': port,
|
||||
'dev': contdevice})
|
||||
except (Exception, TimeoutError):
|
||||
self.logger.exception(_('ERROR container update failed with '
|
||||
'%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
|
||||
{'ip': ip, 'port': port, 'dev': contdevice})
|
||||
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
|
||||
ohash = hash_path(account, container, obj)
|
||||
write_pickle(
|
||||
{'op': op, 'account': account, 'container': container,
|
||||
'obj': obj, 'headers': headers_out},
|
||||
os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
||||
normalize_timestamp(headers_out['x-timestamp'])),
|
||||
os.path.join(self.devices, objdevice, 'tmp'))
|
||||
|
||||
def container_update(self, op, account, container, obj, headers_in,
|
||||
headers_out, objdevice):
|
||||
@ -388,35 +444,36 @@ class ObjectController(object):
|
||||
contdevice = headers_in.get('X-Container-Device', None)
|
||||
if not all([host, partition, contdevice]):
|
||||
return
|
||||
full_path = '/%s/%s/%s' % (account, container, obj)
|
||||
try:
|
||||
with ConnectionTimeout(self.conn_timeout):
|
||||
ip, port = host.rsplit(':', 1)
|
||||
conn = http_connect(ip, port, contdevice, partition, op,
|
||||
full_path, headers_out)
|
||||
with Timeout(self.node_timeout):
|
||||
response = conn.getresponse()
|
||||
response.read()
|
||||
if 200 <= response.status < 300:
|
||||
return
|
||||
else:
|
||||
self.logger.error(_('ERROR Container update failed '
|
||||
'(saving for async update later): %(status)d '
|
||||
'response from %(ip)s:%(port)s/%(dev)s'),
|
||||
{'status': response.status, 'ip': ip, 'port': port,
|
||||
'dev': contdevice})
|
||||
except (Exception, TimeoutError):
|
||||
self.logger.exception(_('ERROR container update failed with '
|
||||
'%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
|
||||
{'ip': ip, 'port': port, 'dev': contdevice})
|
||||
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
|
||||
ohash = hash_path(account, container, obj)
|
||||
write_pickle(
|
||||
{'op': op, 'account': account, 'container': container,
|
||||
'obj': obj, 'headers': headers_out},
|
||||
os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
||||
normalize_timestamp(headers_out['x-timestamp'])),
|
||||
os.path.join(self.devices, objdevice, 'tmp'))
|
||||
self.async_update(op, account, container, obj, host, partition,
|
||||
contdevice, headers_out, objdevice)
|
||||
|
||||
def delete_at_update(self, op, delete_at, account, container, obj,
|
||||
headers_in, objdevice):
|
||||
"""
|
||||
Update the expiring objects container when objects are updated.
|
||||
|
||||
:param op: operation performed (ex: 'PUT', or 'DELETE')
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name
|
||||
:param headers_in: dictionary of headers from the original request
|
||||
:param objdevice: device name that the object is in
|
||||
"""
|
||||
host = partition = contdevice = None
|
||||
headers_out = {'x-timestamp': headers_in['x-timestamp'],
|
||||
'x-trans-id': headers_in.get('x-trans-id', '-')}
|
||||
if op != 'DELETE':
|
||||
host = headers_in.get('X-Delete-At-Host', None)
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
contdevice = headers_in.get('X-Delete-At-Device', None)
|
||||
headers_out['x-size'] = '0'
|
||||
headers_out['x-content-type'] = 'text/plain'
|
||||
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
self.async_update(op, self.expiring_objects_account,
|
||||
str(delete_at / self.expiring_objects_container_divisor *
|
||||
self.expiring_objects_container_divisor),
|
||||
'%s-%s/%s/%s' % (delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice)
|
||||
|
||||
def POST(self, request):
|
||||
"""Handle HTTP POST requests for the Swift Object Server."""
|
||||
@ -430,11 +487,18 @@ class ObjectController(object):
|
||||
not check_float(request.headers['x-timestamp']):
|
||||
return HTTPBadRequest(body='Missing timestamp', request=request,
|
||||
content_type='text/plain')
|
||||
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
|
||||
if new_delete_at and new_delete_at < time.time():
|
||||
return HTTPBadRequest(body='X-Delete-At in past', request=request,
|
||||
content_type='text/plain')
|
||||
if self.mount_check and not check_mount(self.devices, device):
|
||||
return Response(status='507 %s is not mounted' % device)
|
||||
file = DiskFile(self.devices, device, partition, account, container,
|
||||
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
|
||||
|
||||
if 'X-Delete-At' in file.metadata and \
|
||||
int(file.metadata['X-Delete-At']) <= time.time():
|
||||
return HTTPNotFound(request=request)
|
||||
if file.is_deleted():
|
||||
response_class = HTTPNotFound
|
||||
else:
|
||||
@ -451,6 +515,14 @@ class ObjectController(object):
|
||||
if header_key in request.headers:
|
||||
header_caps = header_key.title()
|
||||
metadata[header_caps] = request.headers[header_key]
|
||||
old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
|
||||
if old_delete_at != new_delete_at:
|
||||
if new_delete_at:
|
||||
self.delete_at_update('PUT', new_delete_at, account, container,
|
||||
obj, request.headers, device)
|
||||
if old_delete_at:
|
||||
self.delete_at_update('DELETE', old_delete_at, account,
|
||||
container, obj, request.headers, device)
|
||||
with file.mkstemp() as (fd, tmppath):
|
||||
file.put(fd, tmppath, metadata, extension='.meta')
|
||||
return response_class(request=request)
|
||||
@ -472,6 +544,10 @@ class ObjectController(object):
|
||||
error_response = check_object_creation(request, obj)
|
||||
if error_response:
|
||||
return error_response
|
||||
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
|
||||
if new_delete_at and new_delete_at < time.time():
|
||||
return HTTPBadRequest(body='X-Delete-At in past', request=request,
|
||||
content_type='text/plain')
|
||||
file = DiskFile(self.devices, device, partition, account, container,
|
||||
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
|
||||
orig_timestamp = file.metadata.get('X-Timestamp')
|
||||
@ -517,6 +593,14 @@ class ObjectController(object):
|
||||
if header_key in request.headers:
|
||||
header_caps = header_key.title()
|
||||
metadata[header_caps] = request.headers[header_key]
|
||||
old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
|
||||
if old_delete_at != new_delete_at:
|
||||
if new_delete_at:
|
||||
self.delete_at_update('PUT', new_delete_at, account,
|
||||
container, obj, request.headers, device)
|
||||
if old_delete_at:
|
||||
self.delete_at_update('DELETE', old_delete_at, account,
|
||||
container, obj, request.headers, device)
|
||||
file.put(fd, tmppath, metadata)
|
||||
file.unlinkold(metadata['X-Timestamp'])
|
||||
if not orig_timestamp or \
|
||||
@ -545,7 +629,8 @@ class ObjectController(object):
|
||||
file = DiskFile(self.devices, device, partition, account, container,
|
||||
obj, self.logger, keep_data_fp=True,
|
||||
disk_chunk_size=self.disk_chunk_size)
|
||||
if file.is_deleted():
|
||||
if file.is_deleted() or ('X-Delete-At' in file.metadata and
|
||||
int(file.metadata['X-Delete-At']) <= time.time()):
|
||||
if request.headers.get('if-match') == '*':
|
||||
return HTTPPreconditionFailed(request=request)
|
||||
else:
|
||||
@ -619,7 +704,8 @@ class ObjectController(object):
|
||||
return Response(status='507 %s is not mounted' % device)
|
||||
file = DiskFile(self.devices, device, partition, account, container,
|
||||
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
|
||||
if file.is_deleted():
|
||||
if file.is_deleted() or ('X-Delete-At' in file.metadata and
|
||||
int(file.metadata['X-Delete-At']) <= time.time()):
|
||||
return HTTPNotFound(request=request)
|
||||
try:
|
||||
file_size = file.get_data_file_size()
|
||||
@ -660,6 +746,11 @@ class ObjectController(object):
|
||||
response_class = HTTPNoContent
|
||||
file = DiskFile(self.devices, device, partition, account, container,
|
||||
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
|
||||
if 'x-if-delete-at' in request.headers and \
|
||||
int(request.headers['x-if-delete-at']) != \
|
||||
int(file.metadata.get('X-Delete-At') or 0):
|
||||
return HTTPPreconditionFailed(request=request,
|
||||
body='X-If-Delete-At and X-Delete-At do not match')
|
||||
orig_timestamp = file.metadata.get('X-Timestamp')
|
||||
if file.is_deleted():
|
||||
response_class = HTTPNotFound
|
||||
@ -667,6 +758,10 @@ class ObjectController(object):
|
||||
'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,
|
||||
}
|
||||
with file.mkstemp() as (fd, tmppath):
|
||||
old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
|
||||
if old_delete_at:
|
||||
self.delete_at_update('DELETE', old_delete_at, account,
|
||||
container, obj, request.headers, device)
|
||||
file.put(fd, tmppath, metadata, extension='.ts')
|
||||
file.unlinkold(metadata['X-Timestamp'])
|
||||
if not orig_timestamp or \
|
||||
|
@ -64,7 +64,8 @@ def update_headers(response, headers):
|
||||
if name == 'etag':
|
||||
response.headers[name] = value.replace('"', '')
|
||||
elif name not in ('date', 'content-length', 'content-type',
|
||||
'connection', 'x-timestamp', 'x-put-timestamp'):
|
||||
'connection', 'x-timestamp', 'x-put-timestamp',
|
||||
'x-delete-after'):
|
||||
response.headers[name] = value
|
||||
|
||||
|
||||
@ -882,6 +883,14 @@ class ObjectController(Controller):
|
||||
@delay_denial
|
||||
def POST(self, req):
|
||||
"""HTTP POST request handler."""
|
||||
if 'x-delete-after' in req.headers:
|
||||
try:
|
||||
x_delete_after = int(req.headers['x-delete-after'])
|
||||
except ValueError:
|
||||
return HTTPBadRequest(request=req,
|
||||
content_type='text/plain',
|
||||
body='Non-integer X-Delete-After')
|
||||
req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
|
||||
if self.app.object_post_as_copy:
|
||||
req.method = 'PUT'
|
||||
req.path_info = '/%s/%s/%s' % (self.account_name,
|
||||
@ -910,6 +919,24 @@ class ObjectController(Controller):
|
||||
return aresp
|
||||
if not containers:
|
||||
return HTTPNotFound(request=req)
|
||||
if 'x-delete-at' in req.headers:
|
||||
try:
|
||||
x_delete_at = int(req.headers['x-delete-at'])
|
||||
if x_delete_at < time.time():
|
||||
return HTTPBadRequest(body='X-Delete-At in past',
|
||||
request=req, content_type='text/plain')
|
||||
except ValueError:
|
||||
return HTTPBadRequest(request=req,
|
||||
content_type='text/plain',
|
||||
body='Non-integer X-Delete-At')
|
||||
delete_at_container = str(x_delete_at /
|
||||
self.app.expiring_objects_container_divisor *
|
||||
self.app.expiring_objects_container_divisor)
|
||||
delete_at_part, delete_at_nodes = \
|
||||
self.app.container_ring.get_nodes(
|
||||
self.app.expiring_objects_account, delete_at_container)
|
||||
else:
|
||||
delete_at_part = delete_at_nodes = None
|
||||
partition, nodes = self.app.object_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
|
||||
@ -919,6 +946,11 @@ class ObjectController(Controller):
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
headers.append(nheaders)
|
||||
return self.make_requests(req, self.app.object_ring,
|
||||
partition, 'POST', req.path_info, headers)
|
||||
@ -969,6 +1001,31 @@ class ObjectController(Controller):
|
||||
return aresp
|
||||
if not containers:
|
||||
return HTTPNotFound(request=req)
|
||||
if 'x-delete-after' in req.headers:
|
||||
try:
|
||||
x_delete_after = int(req.headers['x-delete-after'])
|
||||
except ValueError:
|
||||
return HTTPBadRequest(request=req,
|
||||
content_type='text/plain',
|
||||
body='Non-integer X-Delete-After')
|
||||
req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
|
||||
if 'x-delete-at' in req.headers:
|
||||
try:
|
||||
x_delete_at = int(req.headers['x-delete-at'])
|
||||
if x_delete_at < time.time():
|
||||
return HTTPBadRequest(body='X-Delete-At in past',
|
||||
request=req, content_type='text/plain')
|
||||
except ValueError:
|
||||
return HTTPBadRequest(request=req, content_type='text/plain',
|
||||
body='Non-integer X-Delete-At')
|
||||
delete_at_container = str(x_delete_at /
|
||||
self.app.expiring_objects_container_divisor *
|
||||
self.app.expiring_objects_container_divisor)
|
||||
delete_at_part, delete_at_nodes = \
|
||||
self.app.container_ring.get_nodes(
|
||||
self.app.expiring_objects_account, delete_at_container)
|
||||
else:
|
||||
delete_at_part = delete_at_nodes = None
|
||||
partition, nodes = self.app.object_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
# Used by container sync feature
|
||||
@ -1064,6 +1121,11 @@ class ObjectController(Controller):
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
nheaders['Expect'] = '100-continue'
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
pile.spawn(self._connect_put_node, node_iter, partition,
|
||||
req.path_info, nheaders)
|
||||
conns = [conn for conn in pile if conn]
|
||||
@ -1549,6 +1611,11 @@ class BaseApplication(object):
|
||||
[os.path.join(swift_dir, 'mime.types')])
|
||||
self.account_autocreate = \
|
||||
conf.get('account_autocreate', 'no').lower() in TRUE_VALUES
|
||||
self.expiring_objects_account = \
|
||||
(conf.get('auto_create_account_prefix') or '.') + \
|
||||
'expiring_objects'
|
||||
self.expiring_objects_container_divisor = \
|
||||
int(conf.get('expiring_objects_container_divisor') or 86400)
|
||||
|
||||
def get_controller(self, path):
|
||||
"""
|
||||
|
@ -959,6 +959,24 @@ class TestAccountController(unittest.TestCase):
|
||||
resp = self.controller.GET(req)
|
||||
self.assert_(resp.status_int in (204, 412), resp.status_int)
|
||||
|
||||
def test_put_auto_create(self):
|
||||
headers = {'x-put-timestamp': normalize_timestamp(1),
|
||||
'x-delete-timestamp': normalize_timestamp(0),
|
||||
'x-object-count': '0',
|
||||
'x-bytes-used': '0'}
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/.a/c',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/a/.c',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -512,25 +512,33 @@ log_name = yarr'''
|
||||
# setup a file stream
|
||||
make_fp = lambda: StringIO(conf)
|
||||
for conf_object_maker in (make_filename, make_fp):
|
||||
result = utils.readconf(conf_object_maker())
|
||||
expected = {'log_name': None,
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile)
|
||||
expected = {'__file__': conffile,
|
||||
'log_name': None,
|
||||
'section1': {'foo': 'bar'},
|
||||
'section2': {'log_name': 'yarr'}}
|
||||
self.assertEquals(result, expected)
|
||||
result = utils.readconf(conf_object_maker(), 'section1')
|
||||
expected = {'log_name': 'section1', 'foo': 'bar'}
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile, 'section1')
|
||||
expected = {'__file__': conffile, 'log_name': 'section1',
|
||||
'foo': 'bar'}
|
||||
self.assertEquals(result, expected)
|
||||
result = utils.readconf(conf_object_maker(),
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile,
|
||||
'section2').get('log_name')
|
||||
expected = 'yarr'
|
||||
self.assertEquals(result, expected)
|
||||
result = utils.readconf(conf_object_maker(), 'section1',
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile, 'section1',
|
||||
log_name='foo').get('log_name')
|
||||
expected = 'foo'
|
||||
self.assertEquals(result, expected)
|
||||
result = utils.readconf(conf_object_maker(), 'section1',
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile, 'section1',
|
||||
defaults={'bar': 'baz'})
|
||||
expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
|
||||
expected = {'__file__': conffile, 'log_name': 'section1',
|
||||
'foo': 'bar', 'bar': 'baz'}
|
||||
self.assertEquals(result, expected)
|
||||
self.assertRaises(SystemExit, utils.readconf, '/tmp/test', 'section3')
|
||||
os.unlink('/tmp/test')
|
||||
@ -549,8 +557,10 @@ log_name = %(yarr)s'''
|
||||
# setup a file stream
|
||||
make_fp = lambda: StringIO(conf)
|
||||
for conf_object_maker in (make_filename, make_fp):
|
||||
result = utils.readconf(conf_object_maker(), raw=True)
|
||||
expected = {'log_name': None,
|
||||
conffile = conf_object_maker()
|
||||
result = utils.readconf(conffile, raw=True)
|
||||
expected = {'__file__': conffile,
|
||||
'log_name': None,
|
||||
'section1': {'foo': 'bar'},
|
||||
'section2': {'log_name': '%(yarr)s'}}
|
||||
self.assertEquals(result, expected)
|
||||
|
@ -892,6 +892,47 @@ class TestContainerController(unittest.TestCase):
|
||||
resp = self.controller.GET(req)
|
||||
self.assert_(resp.status_int in (204, 412), resp.status_int)
|
||||
|
||||
def test_put_auto_create(self):
|
||||
headers = {'x-timestamp': normalize_timestamp(1),
|
||||
'x-size': '0',
|
||||
'x-content-type': 'text/plain',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e'}
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/.a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/a/.c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
resp = self.controller.PUT(Request.blank('/sda1/p/a/.c/.o',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
def test_delete_auto_create(self):
|
||||
headers = {'x-timestamp': normalize_timestamp(1)}
|
||||
|
||||
resp = self.controller.DELETE(Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
resp = self.controller.DELETE(Request.blank('/sda1/p/.a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 204)
|
||||
|
||||
resp = self.controller.DELETE(Request.blank('/sda1/p/a/.c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
resp = self.controller.DELETE(Request.blank('/sda1/p/a/.c/.o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'}, headers=dict(headers)))
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
651
test/unit/obj/test_expirer.py
Normal file
651
test/unit/obj/test_expirer.py
Normal file
@ -0,0 +1,651 @@
|
||||
# Copyright (c) 2011 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from sys import exc_info
|
||||
from time import time
|
||||
from unittest import main, TestCase
|
||||
|
||||
from swift.obj import expirer
|
||||
from swift.proxy.server import Application
|
||||
|
||||
|
||||
def not_random():
|
||||
return 0.5
|
||||
|
||||
|
||||
last_not_sleep = 0
|
||||
|
||||
|
||||
def not_sleep(seconds):
|
||||
global last_not_sleep
|
||||
last_not_sleep = seconds
|
||||
|
||||
|
||||
class MockLogger(object):
|
||||
|
||||
def __init__(self):
|
||||
self.debugs = []
|
||||
self.infos = []
|
||||
self.exceptions = []
|
||||
|
||||
def debug(self, msg):
|
||||
self.debugs.append(msg)
|
||||
|
||||
def info(self, msg):
|
||||
self.infos.append(msg)
|
||||
|
||||
def exception(self, msg):
|
||||
self.exceptions.append('%s: %s' % (msg, exc_info()[1]))
|
||||
|
||||
|
||||
class FakeRing(object):
|
||||
|
||||
def __init__(self):
|
||||
self.devs = {1: {'ip': '10.0.0.1', 'port': 1000, 'device': 'sda'},
|
||||
2: {'ip': '10.0.0.2', 'port': 1000, 'device': 'sda'},
|
||||
3: {'ip': '10.0.0.3', 'port': 1000, 'device': 'sda'},
|
||||
4: {'ip': '10.0.0.4', 'port': 1000, 'device': 'sda'}}
|
||||
self.replica_count = 3
|
||||
|
||||
def get_nodes(self, account, container=None, obj=None):
|
||||
return 1, [self.devs[i] for i in xrange(1, self.replica_count + 1)]
|
||||
|
||||
def get_part_nodes(self, part):
|
||||
return self.get_nodes('')[1]
|
||||
|
||||
def get_more_nodes(self, nodes):
|
||||
yield self.devs[self.replica_count + 1]
|
||||
|
||||
|
||||
class TestObjectExpirer(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.orig_loadapp = expirer.loadapp
|
||||
expirer.loadapp = lambda x: Application({}, account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), object_ring=FakeRing())
|
||||
|
||||
def tearDown(self):
|
||||
expirer.loadapp = self.orig_loadapp
|
||||
|
||||
def test_report(self):
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
|
||||
x.logger.infos = []
|
||||
x.report()
|
||||
self.assertEquals(x.logger.infos, [])
|
||||
|
||||
x.logger.infos = []
|
||||
x.report(final=True)
|
||||
self.assertTrue('completed' in x.logger.infos[-1], x.logger.infos)
|
||||
self.assertTrue('so far' not in x.logger.infos[-1], x.logger.infos)
|
||||
|
||||
x.logger.infos = []
|
||||
x.report_last_time = time() - x.report_interval
|
||||
x.report()
|
||||
self.assertTrue('completed' not in x.logger.infos[-1], x.logger.infos)
|
||||
self.assertTrue('so far' in x.logger.infos[-1], x.logger.infos)
|
||||
|
||||
def test_run_once_nothing_to_do(self):
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = 'throw error because a string is not callable'
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions,
|
||||
["Unhandled exception: 'str' object is not callable"])
|
||||
|
||||
def test_run_once_calls_report(self):
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: []
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, [])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired'])
|
||||
|
||||
def test_container_timestamp_break(self):
|
||||
|
||||
def should_not_get_called(container):
|
||||
raise Exception('This should not have been called')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() + 86400))]
|
||||
x.iter_objects = should_not_get_called
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, [])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired'])
|
||||
|
||||
# Reverse test to be sure it still would blow up the way expected.
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
x.iter_objects = should_not_get_called
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions,
|
||||
['Unhandled exception: This should not have been called'])
|
||||
|
||||
|
||||
def test_object_timestamp_break(self):
|
||||
|
||||
def should_not_get_called(actual_obj, timestamp):
|
||||
raise Exception('This should not have been called')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % int(time() + 86400)]
|
||||
x.delete_actual_object = should_not_get_called
|
||||
x.delete_container = lambda c: None
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, [])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired'])
|
||||
|
||||
# Reverse test to be sure it still would blow up the way expected.
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
ts = int(time() - 86400)
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
|
||||
x.delete_actual_object = should_not_get_called
|
||||
x.delete_container = lambda c: None
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
|
||||
'object %d %d-actual-obj This should not have been called: This '
|
||||
'should not have been called' % (ts, ts)])
|
||||
|
||||
def test_failed_delete_keeps_entry(self):
|
||||
|
||||
def deliberately_blow_up(actual_obj, timestamp):
|
||||
raise Exception('failed to delete actual object')
|
||||
|
||||
def should_not_get_called(container, obj):
|
||||
raise Exception('This should not have been called')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
ts = int(time() - 86400)
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
|
||||
x.delete_actual_object = deliberately_blow_up
|
||||
x.delete_object = should_not_get_called
|
||||
x.delete_container = lambda c: None
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
|
||||
'object %d %d-actual-obj failed to delete actual object: failed '
|
||||
'to delete actual object' % (ts, ts)])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired'])
|
||||
|
||||
# Reverse test to be sure it still would blow up the way expected.
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
ts = int(time() - 86400)
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
|
||||
x.delete_actual_object = lambda o, t: None
|
||||
x.delete_object = should_not_get_called
|
||||
x.delete_container = lambda c: None
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
|
||||
'object %d %d-actual-obj This should not have been called: This '
|
||||
'should not have been called' % (ts, ts)])
|
||||
|
||||
def test_success_gets_counted(self):
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
x.iter_containers = lambda: [str(int(time() - 86400))]
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % int(time() - 86400)]
|
||||
x.delete_actual_object = lambda o, t: None
|
||||
x.delete_object = lambda c, o: None
|
||||
x.delete_container = lambda c: None
|
||||
self.assertEquals(x.report_objects, 0)
|
||||
x.run_once()
|
||||
self.assertEquals(x.report_objects, 1)
|
||||
self.assertEquals(x.logger.exceptions, [])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 1 objects expired'])
|
||||
|
||||
def test_failed_delete_continues_on(self):
|
||||
|
||||
def fail_delete_actual_object(actual_obj, timestamp):
|
||||
raise Exception('failed to delete actual object')
|
||||
|
||||
def fail_delete_container(container):
|
||||
raise Exception('failed to delete container')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
x.get_account_info = lambda: (1, 2)
|
||||
cts = int(time() - 86400)
|
||||
x.iter_containers = lambda: [str(cts), str(cts + 1)]
|
||||
ots = int(time() - 86400)
|
||||
x.iter_objects = lambda c: ['%d-actual-obj' % ots, '%d-next-obj' % ots]
|
||||
x.delete_actual_object = fail_delete_actual_object
|
||||
x.delete_object = lambda c, o: None
|
||||
x.delete_container = fail_delete_container
|
||||
x.run_once()
|
||||
self.assertEquals(x.logger.exceptions, [
|
||||
'Exception while deleting object %d %d-actual-obj failed to '
|
||||
'delete actual object: failed to delete actual object' %
|
||||
(cts, ots),
|
||||
'Exception while deleting object %d %d-next-obj failed to delete '
|
||||
'actual object: failed to delete actual object' % (cts, ots),
|
||||
'Exception while deleting container %d failed to delete '
|
||||
'container: failed to delete container' % cts,
|
||||
'Exception while deleting object %d %d-actual-obj failed to '
|
||||
'delete actual object: failed to delete actual object' %
|
||||
(cts + 1, ots),
|
||||
'Exception while deleting object %d %d-next-obj failed to delete '
|
||||
'actual object: failed to delete actual object' % (cts + 1, ots),
|
||||
'Exception while deleting container %d failed to delete '
|
||||
'container: failed to delete container' % (cts + 1)])
|
||||
self.assertEquals(x.logger.infos,
|
||||
['Pass beginning; 1 possible containers; 2 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired'])
|
||||
|
||||
def test_run_forever_initial_sleep_random(self):
|
||||
global last_not_sleep
|
||||
|
||||
def raise_system_exit():
|
||||
raise SystemExit('test_run_forever')
|
||||
|
||||
interval = 1234
|
||||
x = expirer.ObjectExpirer({'__file__': 'unit_test',
|
||||
'interval': interval})
|
||||
orig_random = expirer.random
|
||||
orig_sleep = expirer.sleep
|
||||
exc = None
|
||||
try:
|
||||
expirer.random = not_random
|
||||
expirer.sleep = not_sleep
|
||||
x.run_once = raise_system_exit
|
||||
x.run_forever()
|
||||
except SystemExit, err:
|
||||
exc = err
|
||||
finally:
|
||||
expirer.random = orig_random
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(str(err), 'test_run_forever')
|
||||
self.assertEquals(last_not_sleep, 0.5 * interval)
|
||||
|
||||
def test_run_forever_catches_usual_exceptions(self):
|
||||
raises = [0]
|
||||
|
||||
def raise_exceptions():
|
||||
raises[0] += 1
|
||||
if raises[0] < 2:
|
||||
raise Exception('exception %d' % raises[0])
|
||||
raise SystemExit('exiting exception %d' % raises[0])
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.logger = MockLogger()
|
||||
orig_sleep = expirer.sleep
|
||||
exc = None
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
x.run_once = raise_exceptions
|
||||
x.run_forever()
|
||||
except SystemExit, err:
|
||||
exc = err
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(str(err), 'exiting exception 2')
|
||||
self.assertEquals(x.logger.exceptions,
|
||||
['Unhandled exception: exception 1'])
|
||||
|
||||
def test_get_response_sets_user_agent(self):
|
||||
env_given = [None]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
env_given[0] = env
|
||||
start_response('200 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
resp = x.get_response('GET', '/', {}, (200,))
|
||||
self.assertEquals(env_given[0]['HTTP_USER_AGENT'],
|
||||
'Swift Object Expirer')
|
||||
|
||||
def test_get_response_retries(self):
|
||||
global last_not_sleep
|
||||
tries = [0]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
tries[0] += 1
|
||||
if tries[0] < 3:
|
||||
start_response('500 Internal Server Error',
|
||||
[('Content-Length', '0')])
|
||||
else:
|
||||
start_response('200 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
orig_sleep = expirer.sleep
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
resp = x.get_response('GET', '/', {}, (200,))
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(tries[0], 3)
|
||||
self.assertEquals(last_not_sleep, 4)
|
||||
|
||||
def test_get_response_method_path_headers(self):
|
||||
env_given = [None]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
env_given[0] = env
|
||||
start_response('200 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
for method in ('GET', 'SOMETHINGELSE'):
|
||||
resp = x.get_response(method, '/', {}, (200,))
|
||||
self.assertEquals(env_given[0]['REQUEST_METHOD'], method)
|
||||
for path in ('/one', '/two/three'):
|
||||
resp = x.get_response('GET', path, {'X-Test': path}, (200,))
|
||||
self.assertEquals(env_given[0]['PATH_INFO'], path)
|
||||
self.assertEquals(env_given[0]['HTTP_X_TEST'], path)
|
||||
|
||||
def test_get_response_codes(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('200 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
orig_sleep = expirer.sleep
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
resp = x.get_response('GET', '/', {}, (200,))
|
||||
resp = x.get_response('GET', '/', {}, (2,))
|
||||
resp = x.get_response('GET', '/', {}, (400, 200))
|
||||
resp = x.get_response('GET', '/', {}, (400, 2))
|
||||
try:
|
||||
resp = x.get_response('GET', '/', {}, (400,))
|
||||
except Exception, err:
|
||||
exc = err
|
||||
self.assertEquals(str(err), 'Unexpected response 200 Ok')
|
||||
try:
|
||||
resp = x.get_response('GET', '/', {}, (201,))
|
||||
except Exception, err:
|
||||
exc = err
|
||||
self.assertEquals(str(err), 'Unexpected response 200 Ok')
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
|
||||
def test_get_account_info(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('200 Ok', [('Content-Length', '0'),
|
||||
('X-Account-Container-Count', '80'),
|
||||
('X-Account-Object-Count', '90')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(x.get_account_info(), (80, 90))
|
||||
|
||||
def test_get_account_info_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(x.get_account_info(), (0, 0))
|
||||
|
||||
def test_iter_containers(self):
|
||||
calls = [0]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
calls[0] += 1
|
||||
if calls[0] == 1:
|
||||
body = json.dumps([{'name': 'one'}, {'name': 'two'}])
|
||||
start_response('200 Ok', [('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
elif calls[0] == 2:
|
||||
body = json.dumps([{'name': 'three'}, {'name': 'four'}])
|
||||
start_response('200 Ok', [('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
elif calls[0] == 3:
|
||||
start_response('204 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
raise Exception('Should not get here')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(list(x.iter_containers()),
|
||||
['one', 'two', 'three', 'four'])
|
||||
|
||||
def test_iter_containers_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(list(x.iter_containers()), [])
|
||||
|
||||
def test_iter_objects(self):
|
||||
calls = [0]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
calls[0] += 1
|
||||
if calls[0] == 1:
|
||||
body = json.dumps([{'name': 'one'}, {'name': 'two'}])
|
||||
start_response('200 Ok', [('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
elif calls[0] == 2:
|
||||
body = json.dumps([{'name': 'three'}, {'name': 'four'}])
|
||||
start_response('200 Ok', [('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
elif calls[0] == 3:
|
||||
start_response('204 Ok', [('Content-Length', '0')])
|
||||
return []
|
||||
raise Exception('Should not get here')
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(list(x.iter_objects('container')),
|
||||
['one', 'two', 'three', 'four'])
|
||||
|
||||
def test_iter_objects_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
self.assertEquals(list(x.iter_objects('container')), [])
|
||||
|
||||
def test_delete_actual_object(self):
|
||||
got_env = [None]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
got_env[0] = env
|
||||
start_response('204 No Content', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
ts = '1234'
|
||||
x.delete_actual_object('/path/to/object', ts)
|
||||
self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
|
||||
|
||||
def test_delete_actual_object_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
|
||||
def test_delete_actual_object_handles_412(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('412 Precondition Failed',
|
||||
[('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
|
||||
def test_delete_actual_object_does_not_handle_odd_stuff(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('503 Internal Server Error',
|
||||
[('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
orig_sleep = expirer.sleep
|
||||
exc = None
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
except Exception, err:
|
||||
exc = err
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(str(err),
|
||||
'Unexpected response 503 Internal Server Error')
|
||||
|
||||
def test_delete_object(self):
|
||||
got_env = [None]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
got_env[0] = env
|
||||
start_response('204 No Content', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_object('container', 'object')
|
||||
|
||||
def test_delete_object_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_object('container', 'object')
|
||||
|
||||
def test_delete_object_does_not_handle_odd_stuff(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('503 Internal Server Error',
|
||||
[('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
orig_sleep = expirer.sleep
|
||||
exc = None
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
x.delete_object('container', 'object')
|
||||
except Exception, err:
|
||||
exc = err
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(str(err),
|
||||
'Unexpected response 503 Internal Server Error')
|
||||
|
||||
def test_delete_container(self):
|
||||
got_env = [None]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
got_env[0] = env
|
||||
start_response('204 No Content', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_container('container')
|
||||
|
||||
def test_delete_container_handles_404(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_container('container')
|
||||
|
||||
def test_delete_container_handles_409(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('409 Conflict', [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
x.delete_container('container')
|
||||
|
||||
def test_delete_container_does_not_handle_odd_stuff(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('503 Internal Server Error',
|
||||
[('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.app = fake_app
|
||||
orig_sleep = expirer.sleep
|
||||
exc = None
|
||||
try:
|
||||
expirer.sleep = not_sleep
|
||||
x.delete_container('container')
|
||||
except Exception, err:
|
||||
exc = err
|
||||
finally:
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEquals(str(err),
|
||||
'Unexpected response 503 Internal Server Error')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -34,6 +34,7 @@ from test.unit import _getxattr as getxattr
|
||||
from test.unit import _setxattr as setxattr
|
||||
from test.unit import connect_tcp, readuntil2crlfs
|
||||
from swift.obj import server as object_server
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
NullLogger, storage_directory
|
||||
from swift.common.exceptions import DiskFileNotExist
|
||||
@ -290,6 +291,7 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
""" Set up for testing swift.object_server.ObjectController """
|
||||
utils.HASH_PATH_SUFFIX = 'endcap'
|
||||
self.testdir = \
|
||||
os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
@ -1482,6 +1484,555 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
self.assertEquals(resp.headers.get('x-object-manifest'), 'c/o/')
|
||||
|
||||
def test_async_update_http_connect(self):
|
||||
given_args = []
|
||||
|
||||
def fake_http_connect(*args):
|
||||
given_args.extend(args)
|
||||
raise Exception('test')
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
self.object_controller.async_update('PUT', 'a', 'c', 'o',
|
||||
'127.0.0.1:1234', 1, 'sdc1',
|
||||
{'x-timestamp': '1', 'x-out': 'set'}, 'sda1')
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
self.assertEquals(given_args, ['127.0.0.1', '1234', 'sdc1', 1, 'PUT',
|
||||
'/a/c/o', {'x-timestamp': '1', 'x-out': 'set'}])
|
||||
|
||||
def test_async_update_saves_on_exception(self):
|
||||
|
||||
def fake_http_connect(*args):
|
||||
raise Exception('test')
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
self.object_controller.async_update('PUT', 'a', 'c', 'o',
|
||||
'127.0.0.1:1234', 1, 'sdc1',
|
||||
{'x-timestamp': '1', 'x-out': 'set'}, 'sda1')
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
self.assertEquals(
|
||||
pickle.load(open(os.path.join(self.testdir, 'sda1',
|
||||
'async_pending', 'a83',
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))),
|
||||
{'headers': {'x-timestamp': '1', 'x-out': 'set'}, 'account': 'a',
|
||||
'container': 'c', 'obj': 'o', 'op': 'PUT'})
|
||||
|
||||
def test_async_update_saves_on_non_2xx(self):
|
||||
|
||||
def fake_http_connect(status):
|
||||
|
||||
class FakeConn(object):
|
||||
|
||||
def __init__(self, status):
|
||||
self.status = status
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
return lambda *args: FakeConn(status)
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
for status in (199, 300, 503):
|
||||
object_server.http_connect = fake_http_connect(status)
|
||||
self.object_controller.async_update('PUT', 'a', 'c', 'o',
|
||||
'127.0.0.1:1234', 1, 'sdc1',
|
||||
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1')
|
||||
self.assertEquals(
|
||||
pickle.load(open(os.path.join(self.testdir, 'sda1',
|
||||
'async_pending', 'a83',
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))),
|
||||
{'headers': {'x-timestamp': '1', 'x-out': str(status)},
|
||||
'account': 'a', 'container': 'c', 'obj': 'o',
|
||||
'op': 'PUT'})
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
def test_async_update_does_not_save_on_2xx(self):
|
||||
|
||||
def fake_http_connect(status):
|
||||
|
||||
class FakeConn(object):
|
||||
|
||||
def __init__(self, status):
|
||||
self.status = status
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
return lambda *args: FakeConn(status)
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
for status in (200, 299):
|
||||
object_server.http_connect = fake_http_connect(status)
|
||||
self.object_controller.async_update('PUT', 'a', 'c', 'o',
|
||||
'127.0.0.1:1234', 1, 'sdc1',
|
||||
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1')
|
||||
self.assertFalse(
|
||||
os.path.exists(os.path.join(self.testdir, 'sda1',
|
||||
'async_pending', 'a83',
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000')))
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
def test_delete_at_update_put(self):
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
{'x-timestamp': '1'}, 'sda1')
|
||||
self.assertEquals(given_args, ['PUT', '.expiring_objects', '0',
|
||||
'2-a/c/o', None, None, None,
|
||||
{'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain', 'x-timestamp': '1',
|
||||
'x-trans-id': '-'},
|
||||
'sda1'])
|
||||
|
||||
def test_delete_at_update_put_with_info(self):
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
{'x-timestamp': '1', 'X-Delete-At-Host': '127.0.0.1:1234',
|
||||
'X-Delete-At-Partition': '3', 'X-Delete-At-Device': 'sdc1'},
|
||||
'sda1')
|
||||
self.assertEquals(given_args, ['PUT', '.expiring_objects', '0',
|
||||
'2-a/c/o', '127.0.0.1:1234', '3', 'sdc1',
|
||||
{'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain', 'x-timestamp': '1',
|
||||
'x-trans-id': '-'},
|
||||
'sda1'])
|
||||
|
||||
def test_delete_at_update_delete(self):
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o',
|
||||
{'x-timestamp': '1'}, 'sda1')
|
||||
self.assertEquals(given_args, ['DELETE', '.expiring_objects', '0',
|
||||
'2-a/c/o', None, None, None,
|
||||
{'x-timestamp': '1', 'x-trans-id': '-'}, 'sda1'])
|
||||
|
||||
def test_POST_calls_delete_at(self):
|
||||
given_args = []
|
||||
|
||||
def fake_delete_at_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.delete_at_update = fake_delete_at_update
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time()),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertEquals(given_args, [])
|
||||
|
||||
sleep(.00001)
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time()),
|
||||
'Content-Type': 'application/x-test'})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
self.assertEquals(given_args, [])
|
||||
|
||||
sleep(.00001)
|
||||
timestamp1 = normalize_timestamp(time())
|
||||
delete_at_timestamp1 = str(int(time() + 1000))
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': timestamp1,
|
||||
'Content-Type': 'application/x-test',
|
||||
'X-Delete-At': delete_at_timestamp1})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
self.assertEquals(given_args, [
|
||||
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
{'X-Delete-At': delete_at_timestamp1,
|
||||
'Content-Type': 'application/x-test',
|
||||
'X-Timestamp': timestamp1,
|
||||
'Host': 'localhost:80'},
|
||||
'sda1'])
|
||||
|
||||
while given_args:
|
||||
given_args.pop()
|
||||
|
||||
sleep(.00001)
|
||||
timestamp2 = normalize_timestamp(time())
|
||||
delete_at_timestamp2 = str(int(time() + 2000))
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': timestamp2,
|
||||
'Content-Type': 'application/x-test',
|
||||
'X-Delete-At': delete_at_timestamp2})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
self.assertEquals(given_args, [
|
||||
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
|
||||
{'X-Delete-At': delete_at_timestamp2,
|
||||
'Content-Type': 'application/x-test',
|
||||
'X-Timestamp': timestamp2, 'Host': 'localhost:80'},
|
||||
'sda1',
|
||||
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
# This 2 timestamp is okay because it's ignored since it's just
|
||||
# part of the current request headers. The above 1 timestamp is the
|
||||
# important one.
|
||||
{'X-Delete-At': delete_at_timestamp2,
|
||||
'Content-Type': 'application/x-test',
|
||||
'X-Timestamp': timestamp2, 'Host': 'localhost:80'},
|
||||
'sda1'])
|
||||
|
||||
def test_PUT_calls_delete_at(self):
|
||||
given_args = []
|
||||
|
||||
def fake_delete_at_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.delete_at_update = fake_delete_at_update
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time()),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertEquals(given_args, [])
|
||||
|
||||
sleep(.00001)
|
||||
timestamp1 = normalize_timestamp(time())
|
||||
delete_at_timestamp1 = str(int(time() + 1000))
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp1,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Delete-At': delete_at_timestamp1})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertEquals(given_args, [
|
||||
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
{'X-Delete-At': delete_at_timestamp1,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Timestamp': timestamp1,
|
||||
'Host': 'localhost:80'},
|
||||
'sda1'])
|
||||
|
||||
while given_args:
|
||||
given_args.pop()
|
||||
|
||||
sleep(.00001)
|
||||
timestamp2 = normalize_timestamp(time())
|
||||
delete_at_timestamp2 = str(int(time() + 2000))
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp2,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Delete-At': delete_at_timestamp2})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertEquals(given_args, [
|
||||
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
|
||||
{'X-Delete-At': delete_at_timestamp2,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Timestamp': timestamp2, 'Host': 'localhost:80'},
|
||||
'sda1',
|
||||
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
# This 2 timestamp is okay because it's ignored since it's just
|
||||
# part of the current request headers. The above 1 timestamp is the
|
||||
# important one.
|
||||
{'X-Delete-At': delete_at_timestamp2,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Timestamp': timestamp2, 'Host': 'localhost:80'},
|
||||
'sda1'])
|
||||
|
||||
def test_GET_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
'X-Delete-At': str(int(test_time + 100)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = self.object_controller.GET(req)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1000),
|
||||
'X-Delete-At': str(int(time() + 1)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = self.object_controller.GET(req)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
|
||||
orig_time = object_server.time.time
|
||||
try:
|
||||
t = time() + 2
|
||||
object_server.time.time = lambda: t
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(t)})
|
||||
resp = self.object_controller.GET(req)
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
finally:
|
||||
object_server.time.time = orig_time
|
||||
|
||||
def test_HEAD_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
'X-Delete-At': str(int(test_time + 100)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = self.object_controller.HEAD(req)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1000),
|
||||
'X-Delete-At': str(int(time() + 1)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = self.object_controller.HEAD(req)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
|
||||
orig_time = object_server.time.time
|
||||
try:
|
||||
t = time() + 2
|
||||
object_server.time.time = lambda: t
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time())})
|
||||
resp = self.object_controller.HEAD(req)
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
finally:
|
||||
object_server.time.time = orig_time
|
||||
|
||||
def test_POST_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
'X-Delete-At': str(int(test_time + 100)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1500)})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1000),
|
||||
'X-Delete-At': str(int(time() + 1)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
orig_time = object_server.time.time
|
||||
try:
|
||||
t = time() + 2
|
||||
object_server.time.time = lambda: t
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time())})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
finally:
|
||||
object_server.time.time = orig_time
|
||||
|
||||
def test_DELETE_if_delete_at(self):
|
||||
test_time = time() + 10000
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 99),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 98)})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 204)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 97),
|
||||
'X-Delete-At': str(int(test_time - 1)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 95),
|
||||
'X-If-Delete-At': str(int(test_time))})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 412)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 95)})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 204)
|
||||
|
||||
delete_at_timestamp = str(int(test_time - 1))
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 94),
|
||||
'X-Delete-At': delete_at_timestamp,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 92),
|
||||
'X-If-Delete-At': str(int(test_time))})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 412)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 92),
|
||||
'X-If-Delete-At': delete_at_timestamp})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 204)
|
||||
|
||||
def test_DELETE_calls_delete_at(self):
|
||||
given_args = []
|
||||
|
||||
def fake_delete_at_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.delete_at_update = fake_delete_at_update
|
||||
|
||||
timestamp1 = normalize_timestamp(time())
|
||||
delete_at_timestamp1 = str(int(time() + 1000))
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp1,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Delete-At': delete_at_timestamp1})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
self.assertEquals(given_args, [
|
||||
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
{'X-Delete-At': delete_at_timestamp1,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Timestamp': timestamp1,
|
||||
'Host': 'localhost:80'},
|
||||
'sda1'])
|
||||
|
||||
while given_args:
|
||||
given_args.pop()
|
||||
|
||||
sleep(.00001)
|
||||
timestamp2 = normalize_timestamp(time())
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': timestamp2,
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
resp = self.object_controller.DELETE(req)
|
||||
self.assertEquals(resp.status_int, 204)
|
||||
self.assertEquals(given_args, [
|
||||
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
{'Content-Type': 'application/octet-stream',
|
||||
'Host': 'localhost:80', 'X-Timestamp': timestamp2},
|
||||
'sda1'])
|
||||
|
||||
def test_PUT_delete_at_in_past(self):
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time()),
|
||||
'X-Delete-At': str(int(time() - 1)),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
def test_POST_delete_at_in_past(self):
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time()),
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time() + 1),
|
||||
'X-Delete-At': str(int(time() - 1))})
|
||||
resp = self.object_controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -2872,6 +2872,203 @@ class TestObjectController(unittest.TestCase):
|
||||
res = controller.COPY(req)
|
||||
self.assert_(called[0])
|
||||
|
||||
def test_POST_converts_delete_after_to_delete_at(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
|
||||
self.app.memcache.store = {}
|
||||
orig_time = proxy_server.time.time
|
||||
try:
|
||||
t = time()
|
||||
proxy_server.time.time = lambda: t
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60'})
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assertEquals(res.status, '202 Fake')
|
||||
self.assertEquals(req.headers.get('x-delete-at'),
|
||||
str(int(t + 60)))
|
||||
|
||||
self.app.object_post_as_copy = False
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 202, 202, 202)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60'})
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assertEquals(res.status, '202 Fake')
|
||||
self.assertEquals(req.headers.get('x-delete-at'),
|
||||
str(int(t + 60)))
|
||||
finally:
|
||||
proxy_server.time.time = orig_time
|
||||
|
||||
|
||||
def test_POST_non_int_delete_after(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60.1'})
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assertEquals(res.status, '400 Bad Request')
|
||||
self.assertTrue('Non-integer X-Delete-After' in res.body)
|
||||
|
||||
def test_POST_negative_delete_after(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '-60'})
|
||||
self.app.update_request(req)
|
||||
res = controller.POST(req)
|
||||
self.assertEquals(res.status, '400 Bad Request')
|
||||
self.assertTrue('X-Delete-At in past' in res.body)
|
||||
|
||||
def test_POST_delete_at(self):
|
||||
with save_globals():
|
||||
given_headers = {}
|
||||
|
||||
def fake_make_requests(req, ring, part, method, path, headers,
|
||||
query_string=''):
|
||||
given_headers.update(headers[0])
|
||||
|
||||
self.app.object_post_as_copy = False
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
controller.make_requests = fake_make_requests
|
||||
proxy_server.http_connect = fake_http_connect(200, 200)
|
||||
self.app.memcache.store = {}
|
||||
t = str(int(time() + 100))
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
controller.POST(req)
|
||||
self.assertEquals(given_headers.get('X-Delete-At'), t)
|
||||
self.assertTrue('X-Delete-At-Host' in given_headers)
|
||||
self.assertTrue('X-Delete-At-Device' in given_headers)
|
||||
self.assertTrue('X-Delete-At-Partition' in given_headers)
|
||||
|
||||
t = str(int(time() + 100)) + '.1'
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
resp = controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('Non-integer X-Delete-At' in resp.body)
|
||||
|
||||
t = str(int(time() - 100))
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Type': 'foo/bar', 'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
resp = controller.POST(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
def test_PUT_converts_delete_after_to_delete_at(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
self.app.memcache.store = {}
|
||||
orig_time = proxy_server.time.time
|
||||
try:
|
||||
t = time()
|
||||
proxy_server.time.time = lambda: t
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-After': '60'})
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assertEquals(res.status, '201 Fake')
|
||||
self.assertEquals(req.headers.get('x-delete-at'),
|
||||
str(int(t + 60)))
|
||||
finally:
|
||||
proxy_server.time.time = orig_time
|
||||
|
||||
def test_PUT_non_int_delete_after(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-After': '60.1'})
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assertEquals(res.status, '400 Bad Request')
|
||||
self.assertTrue('Non-integer X-Delete-After' in res.body)
|
||||
|
||||
def test_PUT_negative_delete_after(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
proxy_server.http_connect = \
|
||||
fake_http_connect(200, 200, 201, 201, 201)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-After': '-60'})
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
self.assertEquals(res.status, '400 Bad Request')
|
||||
self.assertTrue('X-Delete-At in past' in res.body)
|
||||
|
||||
def test_PUT_delete_at(self):
|
||||
with save_globals():
|
||||
given_headers = {}
|
||||
|
||||
def fake_connect_put_node(nodes, part, path, headers):
|
||||
given_headers.update(headers)
|
||||
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
controller._connect_put_node = fake_connect_put_node
|
||||
proxy_server.http_connect = fake_http_connect(200, 200)
|
||||
self.app.memcache.store = {}
|
||||
t = str(int(time() + 100))
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
controller.PUT(req)
|
||||
self.assertEquals(given_headers.get('X-Delete-At'), t)
|
||||
self.assertTrue('X-Delete-At-Host' in given_headers)
|
||||
self.assertTrue('X-Delete-At-Device' in given_headers)
|
||||
self.assertTrue('X-Delete-At-Partition' in given_headers)
|
||||
|
||||
t = str(int(time() + 100)) + '.1'
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
resp = controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('Non-integer X-Delete-At' in resp.body)
|
||||
|
||||
t = str(int(time() - 100))
|
||||
req = Request.blank('/a/c/o', {},
|
||||
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
|
||||
'X-Delete-At': t})
|
||||
self.app.update_request(req)
|
||||
resp = controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"Test swift.proxy_server.ContainerController"
|
||||
|
Loading…
x
Reference in New Issue
Block a user