Initial work on container syncing

This commit is contained in:
gholt 2011-02-23 23:26:05 -08:00
parent aa14afe2bb
commit 305e4b41f5
12 changed files with 509 additions and 78 deletions

127
bin/st
View File

@ -30,22 +30,30 @@ from time import sleep
import socket
from cStringIO import StringIO
from httplib import HTTPException, HTTPSConnection
from httplib import HTTPException
from re import compile, DOTALL
from tokenize import generate_tokens, STRING, NAME, OP
from urllib import quote as _quote, unquote
from urlparse import urlparse, urlunparse
try:
from eventlet.green.httplib import HTTPSConnection
except ImportError:
from httplib import HTTPSConnection
try:
from eventlet import sleep
except Exception:
except ImportError:
from time import sleep
try:
from swift.common.bufferedhttp \
import BufferedHTTPConnection as HTTPConnection
except Exception:
from httplib import HTTPConnection
except ImportError:
try:
from eventlet.green.httplib import HTTPConnection
except ImportError:
from httplib import HTTPConnection
def quote(value, safe='/'):
@ -226,7 +234,7 @@ def get_account(url, token, marker=None, limit=None, prefix=None,
listing = \
get_account(url, token, marker, limit, prefix, http_conn)[1]
if listing:
rv.extend(listing)
rv[1].extend(listing)
return rv
parsed, conn = http_conn
qs = 'format=json'
@ -570,9 +578,9 @@ def head_object(url, token, container, name, http_conn=None):
return resp_headers
def put_object(url, token, container, name, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None, headers=None,
http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=65536,
content_type=None, headers=None, http_conn=None):
"""
Put an object
@ -596,10 +604,17 @@ def put_object(url, token, container, name, contents, content_length=None,
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
if not headers:
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
headers['X-Auth-Token'] = token
if token:
headers['X-Auth-Token'] = token
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
@ -638,7 +653,7 @@ def put_object(url, token, container, name, contents, content_length=None,
raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
return resp.getheader('etag').strip('"')
return resp.getheader('etag', '').strip('"')
def post_object(url, token, container, name, headers, http_conn=None):
@ -669,7 +684,8 @@ def post_object(url, token, container, name, headers, http_conn=None):
http_status=resp.status, http_reason=resp.reason)
def delete_object(url, token, container, name, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None,
headers=None):
"""
Delete object
@ -685,8 +701,18 @@ def delete_object(url, token, container, name, http_conn=None):
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
if token:
headers['X-Auth-Token'] = token
conn.request('DELETE', path, '', headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
@ -700,7 +726,7 @@ class Connection(object):
"""Convenience class to make requests that will also retry the request"""
def __init__(self, authurl, user, key, retries=5, preauthurl=None,
preauthtoken=None, snet=False):
preauthtoken=None, snet=False, starting_backoff=1):
"""
:param authurl: authenitcation URL
:param user: user name to authenticate as
@ -720,6 +746,7 @@ class Connection(object):
self.token = preauthtoken
self.attempts = 0
self.snet = snet
self.starting_backoff = starting_backoff
def get_auth(self):
return get_auth(self.authurl, self.user, self.key, snet=self.snet)
@ -727,9 +754,9 @@ class Connection(object):
def http_connection(self):
return http_connection(self.url)
def _retry(self, func, *args, **kwargs):
def _retry(self, reset_func, func, *args, **kwargs):
self.attempts = 0
backoff = 1
backoff = self.starting_backoff
while self.attempts <= self.retries:
self.attempts += 1
try:
@ -758,10 +785,12 @@ class Connection(object):
raise
sleep(backoff)
backoff *= 2
if reset_func:
reset_func(func, *args, **kwargs)
def head_account(self):
"""Wrapper for :func:`head_account`"""
return self._retry(head_account)
return self._retry(None, head_account)
def get_account(self, marker=None, limit=None, prefix=None,
full_listing=False):
@ -769,16 +798,16 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_account, marker=marker, limit=limit,
return self._retry(None, get_account, marker=marker, limit=limit,
prefix=prefix, full_listing=full_listing)
def post_account(self, headers):
"""Wrapper for :func:`post_account`"""
return self._retry(post_account, headers)
return self._retry(None, post_account, headers)
def head_container(self, container):
"""Wrapper for :func:`head_container`"""
return self._retry(head_container, container)
return self._retry(None, head_container, container)
def get_container(self, container, marker=None, limit=None, prefix=None,
delimiter=None, full_listing=False):
@ -786,46 +815,58 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_container, container, marker=marker,
return self._retry(None, get_container, container, marker=marker,
limit=limit, prefix=prefix, delimiter=delimiter,
full_listing=full_listing)
def put_container(self, container, headers=None):
"""Wrapper for :func:`put_container`"""
return self._retry(put_container, container, headers=headers)
return self._retry(None, put_container, container, headers=headers)
def post_container(self, container, headers):
"""Wrapper for :func:`post_container`"""
return self._retry(post_container, container, headers)
return self._retry(None, post_container, container, headers)
def delete_container(self, container):
"""Wrapper for :func:`delete_container`"""
return self._retry(delete_container, container)
return self._retry(None, delete_container, container)
def head_object(self, container, obj):
"""Wrapper for :func:`head_object`"""
return self._retry(head_object, container, obj)
return self._retry(None, head_object, container, obj)
def get_object(self, container, obj, resp_chunk_size=None):
"""Wrapper for :func:`get_object`"""
return self._retry(get_object, container, obj,
return self._retry(None, get_object, container, obj,
resp_chunk_size=resp_chunk_size)
def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None,
headers=None):
"""Wrapper for :func:`put_object`"""
return self._retry(put_object, container, obj, contents,
def _default_reset(*args, **kwargs):
raise ClientException('put_object(%r, %r, ...) failure and no '
'ability to reset contents for reupload.' % (container, obj))
reset_func = _default_reset
tell = getattr(contents, 'tell', None)
seek = getattr(contents, 'seek', None)
if tell and seek:
orig_pos = tell()
reset_func = lambda *a, **k: seek(orig_pos)
return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag, chunk_size=chunk_size,
content_type=content_type, headers=headers)
def post_object(self, container, obj, headers):
"""Wrapper for :func:`post_object`"""
return self._retry(post_object, container, obj, headers)
return self._retry(None, post_object, container, obj, headers)
def delete_object(self, container, obj):
"""Wrapper for :func:`delete_object`"""
return self._retry(delete_object, container, obj)
return self._retry(None, delete_object, container, obj)
# End inclusion of swift.common.client
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@ -1297,10 +1338,14 @@ Container: %s
Objects: %d
Bytes: %d
Read ACL: %s
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
Write ACL: %s
Sync To: %s
Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
object_count, bytes_used,
headers.get('x-container-read', ''),
headers.get('x-container-write', '')))
headers.get('x-container-write', ''),
headers.get('x-container-sync-to', ''),
headers.get('x-container-sync-key', '')))
for key, value in headers.items():
if key.startswith('x-container-meta-'):
print_queue.put('%9s: %s' % ('Meta %s' %
@ -1309,7 +1354,8 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
if not key.startswith('x-container-meta-') and key not in (
'content-length', 'date', 'x-container-object-count',
'x-container-bytes-used', 'x-container-read',
'x-container-write'):
'x-container-write', 'x-container-sync-to',
'x-container-sync-key'):
print_queue.put(
'%9s: %s' % (key.title(), value))
except ClientException, err:
@ -1374,13 +1420,18 @@ def st_post(options, args, print_queue, error_queue):
parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
'Write ACL for containers. Quick summary of ACL syntax: account1, '
'account2:user2')
parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the '
'Sync To for containers, for multi-cluster replication.')
parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the '
'Sync Key for containers, for multi-cluster replication.')
parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
help='Sets a meta data item with the syntax name:value. This option '
'may be repeated. Example: -m Color:Blue -m Size:Large')
(options, args) = parse_args(parser, args)
args = args[1:]
if (options.read_acl or options.write_acl) and not args:
exit('-r and -w options only allowed for containers')
if (options.read_acl or options.write_acl or options.sync_to or
options.sync_key) and not args:
exit('-r, -w, -t, and -k options only allowed for containers')
conn = Connection(options.auth, options.user, options.key)
if not args:
headers = {}
@ -1408,6 +1459,10 @@ def st_post(options, args, print_queue, error_queue):
headers['X-Container-Read'] = options.read_acl
if options.write_acl is not None:
headers['X-Container-Write'] = options.write_acl
if options.sync_to is not None:
headers['X-Container-Sync-To'] = options.sync_to
if options.sync_key is not None:
headers['X-Container-Sync-Key'] = options.sync_key
try:
conn.post_container(args[0], headers=headers)
except ClientException, err:

23
bin/swift-container-sync Executable file
View File

@ -0,0 +1,23 @@
#!/usr/bin/python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.container.sync import ContainerSync
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(ContainerSync, conf_file, **options)

View File

@ -60,3 +60,13 @@ use = egg:swift#container
# log_level = INFO
# Will audit, at most, 1 container per device per interval
# interval = 1800
[container-sync]
# You can override the default log routing for this app here (don't use set!):
# log_name = container-sync
# log_facility = LOG_LOCAL0
# log_level = INFO
# Will sync, at most, each container once per interval
# interval = 300
# Maximum amount of time to spend syncing each container
# container_time = 60

View File

@ -83,7 +83,7 @@ setup(
'bin/swift-auth-recreate-accounts', 'bin/swift-auth-server',
'bin/swift-auth-update-reseller-prefixes',
'bin/swift-container-auditor',
'bin/swift-container-replicator',
'bin/swift-container-replicator', 'bin/swift-container-sync',
'bin/swift-container-server', 'bin/swift-container-updater',
'bin/swift-drive-audit', 'bin/swift-get-nodes',
'bin/swift-init', 'bin/swift-object-auditor',

View File

@ -566,9 +566,9 @@ def head_object(url, token, container, name, http_conn=None):
return resp_headers
def put_object(url, token, container, name, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None, headers=None,
http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=65536,
content_type=None, headers=None, http_conn=None):
"""
Put an object
@ -592,10 +592,17 @@ def put_object(url, token, container, name, contents, content_length=None,
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
if not headers:
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
headers['X-Auth-Token'] = token
if token:
headers['X-Auth-Token'] = token
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
@ -634,7 +641,7 @@ def put_object(url, token, container, name, contents, content_length=None,
raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
return resp.getheader('etag').strip('"')
return resp.getheader('etag', '').strip('"')
def post_object(url, token, container, name, headers, http_conn=None):
@ -665,7 +672,8 @@ def post_object(url, token, container, name, headers, http_conn=None):
http_status=resp.status, http_reason=resp.reason)
def delete_object(url, token, container, name, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None,
headers=None):
"""
Delete object
@ -681,8 +689,18 @@ def delete_object(url, token, container, name, http_conn=None):
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
if token:
headers['X-Auth-Token'] = token
conn.request('DELETE', path, '', headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:

View File

@ -31,9 +31,10 @@ RUN_DIR = '/var/run/swift'
# auth-server has been removed from ALL_SERVERS, start it explicitly
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-server', 'container-updater',
'object-auditor', 'object-server', 'object-replicator', 'object-updater',
'proxy-server', 'account-replicator', 'account-reaper']
'container-replicator', 'container-server', 'container-sync',
'container-updater', 'object-auditor', 'object-server',
'object-replicator', 'object-updater', 'proxy-server',
'account-replicator', 'account-reaper']
MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server',
'object-server']
REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS]

View File

@ -176,11 +176,20 @@ class DevAuth(object):
return self.denied_response(req)
user_groups = (req.remote_user or '').split(',')
if '.reseller_admin' in user_groups:
req.environ['swift_owner'] = True
return None
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
# If the user is admin for the account and is not trying to do an
# account DELETE or PUT...
req.environ['swift_owner'] = True
return None
# TODO: Restrict this further to only authenticated folks in the .sync
# group. Currently, anybody with the x-container-sync-key can do a
# sync.
if 'swift_sync_key' in req.environ and \
req.environ['swift_sync_key'] == \
req.headers.get('x-container-sync-key', None):
return None
referrers, groups = parse_acl(getattr(req, 'acl', None))
if referrer_allowed(req.referer, referrers):

View File

@ -269,11 +269,20 @@ class Swauth(object):
if '.reseller_admin' in user_groups and \
account != self.reseller_prefix and \
account[len(self.reseller_prefix)] != '.':
req.environ['swift_owner'] = True
return None
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
# If the user is admin for the account and is not trying to do an
# account DELETE or PUT...
req.environ['swift_owner'] = True
return None
# TODO: Restrict this further to only authenticated folks in the .sync
# group. Currently, anybody with the x-container-sync-key can do a
# sync.
if 'swift_sync_key' in req.environ and \
req.environ['swift_sync_key'] == \
req.headers.get('x-container-sync-key', None):
return None
referrers, groups = parse_acl(getattr(req, 'acl', None))
if referrer_allowed(req.referer, referrers):

View File

@ -46,7 +46,8 @@ class ContainerController(object):
"""WSGI Controller for the container server."""
# Ensure these are all lowercase
save_headers = ['x-container-read', 'x-container-write']
save_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to']
def __init__(self, conf):
self.logger = get_logger(conf, log_route='container-server')
@ -232,7 +233,8 @@ class ContainerController(object):
}
headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
if value != '' and (key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-')))
return HTTPNoContent(request=req, headers=headers)
def GET(self, req):
@ -259,7 +261,8 @@ class ContainerController(object):
}
resp_headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
if value != '' and (key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-')))
try:
path = get_param(req, 'path')
prefix = get_param(req, 'prefix')

245
swift/container/sync.py Normal file
View File

@ -0,0 +1,245 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from random import random, shuffle
from swift.container import server as container_server
from swift.common import client, direct_client
from swift.common.ring import Ring
from swift.common.db import ContainerBroker
from swift.common.utils import audit_location_generator, get_logger, \
normalize_timestamp, TRUE_VALUES
from swift.common.daemon import Daemon
class Iter2FileLikeObject(object):
def __init__(self, iterator):
self.iterator = iterator
self._chunk = ''
def read(self, size=-1):
if size < 0:
chunk = self._chunk
self._chunk = ''
return chunk + ''.join(self.iterator)
chunk = ''
try:
chunk = self.iterator.next()
except StopIteration:
pass
if len(chunk) <= size:
return chunk
self._chunk = chunk[size:]
return chunk[:size]
class ContainerSync(Daemon):
"""Sync syncable containers."""
def __init__(self, conf, object_ring=None):
self.conf = conf
self.logger = get_logger(conf, log_route='container-sync')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = \
conf.get('mount_check', 'true').lower() in TRUE_VALUES
self.interval = int(conf.get('interval', 300))
self.container_time = int(conf.get('container_time', 60))
swift_dir = conf.get('swift_dir', '/etc/swift')
self.container_syncs = 0
self.container_deletes = 0
self.container_puts = 0
self.container_skips = 0
self.container_failures = 0
self.reported = time.time()
self.object_ring = object_ring or \
Ring(os.path.join(swift_dir, 'object.ring.gz'))
def run_forever(self):
"""Run the container sync until stopped."""
time.sleep(random() * self.interval)
while True:
begin = time.time()
all_locs = audit_location_generator(self.devices,
container_server.DATADIR,
mount_check=self.mount_check,
logger=self.logger)
for path, device, partition in all_locs:
self.container_sync(path)
if time.time() - self.reported >= 3600: # once an hour
self._report()
elapsed = time.time() - begin
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
def run_once(self):
"""Run the container sync once."""
self.logger.info(_('Begin container sync "once" mode'))
begin = time.time()
all_locs = audit_location_generator(self.devices,
container_server.DATADIR,
mount_check=self.mount_check,
logger=self.logger)
for path, device, partition in all_locs:
self.container_sync(path)
if time.time() - self.reported >= 3600: # once an hour
self._report()
self._report()
elapsed = time.time() - begin
self.logger.info(
_('Container sync "once" mode completed: %.02fs'), elapsed)
def _report(self):
self.logger.info(
_('Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s '
'puts], %(skip)s skipped, %(fail)s failed'),
{'time': time.ctime(self.reported),
'sync': self.container_syncs,
'delete': self.container_deletes,
'put': self.container_puts,
'skip': self.container_skips,
'fail': self.container_failures})
self.reported = time.time()
self.container_syncs = 0
self.container_deletes = 0
self.container_puts = 0
self.container_skips = 0
self.container_failures = 0
def container_sync(self, path):
"""
Syncs the given container path
:param path: the path to a container db
"""
try:
if not path.endswith('.db'):
return
broker = ContainerBroker(path)
info = broker.get_info()
if not broker.is_deleted():
sync_to = None
sync_key = None
sync_row = -1
for key, (value, timestamp) in broker.metadata.iteritems():
if key.lower() == 'x-container-sync-to':
sync_to = value
elif key.lower() == 'x-container-sync-key':
sync_key = value
# TODO: Make this a separate column, not a metadata item.
# Each db should track what it has synced separately and
# these metadata get ovewritten by newer values from other
# dbs. Also, once a new column, it'll need special
# attention when doing a fresh db copy.
elif key.lower() == 'x-container-sync-row':
sync_row = int(value)
if not sync_to or not sync_key:
self.container_skips += 1
return
sync_to = sync_to.rstrip('/')
stop_at = time.time() + self.container_time
while time.time() < stop_at:
rows = broker.get_items_since(sync_row, 1)
if not rows:
break
if not self.container_sync_row(rows[0], sync_to, sync_key,
broker, info):
return
sync_row = rows[0]['ROWID']
broker.update_metadata({'X-Container-Sync-Row':
(str(sync_row), normalize_timestamp(time.time()))})
self.container_syncs += 1
except Exception:
self.container_failures += 1
self.logger.exception(_('ERROR Syncing %s'), (broker.db_file))
def container_sync_row(self, row, sync_to, sync_key, broker, info):
try:
if row['deleted']:
try:
client.delete_object(sync_to, name=row['name'],
headers={'X-Timestamp': row['created_at'],
'X-Container-Sync-Key': sync_key})
except client.ClientException, err:
if err.http_status != 404:
raise
self.container_deletes += 1
else:
part, nodes = self.object_ring.get_nodes(
info['account'], info['container'],
row['name'])
shuffle(nodes)
exc = None
for node in nodes:
try:
headers, body = \
direct_client.direct_get_object(node, part,
info['account'], info['container'],
row['name'], resp_chunk_size=65536)
break
except client.ClientException, err:
exc = err
else:
if exc:
raise exc
raise Exception(_('Unknown exception trying to GET: '
'%(node)r %(account)r %(container)r %(object)r'),
{'node': node, 'part': part,
'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):
if key in headers:
del headers[key]
if 'etag' in headers:
headers['etag'] = headers['etag'].strip('"')
headers['X-Timestamp'] = row['created_at']
headers['X-Container-Sync-Key'] = sync_key
client.put_object(sync_to, name=row['name'],
headers=headers,
contents=Iter2FileLikeObject(body))
self.container_puts += 1
except client.ClientException, err:
if err.http_status == 401:
self.logger.info(_('Unauth %(sync_from)r '
'=> %(sync_to)r key: %(sync_key)r'),
{'sync_from': '%s/%s' %
(client.quote(info['account']),
client.quote(info['container'])),
'sync_to': sync_to,
'sync_key': sync_key})
elif err.http_status == 404:
self.logger.info(_('Not found %(sync_from)r '
'=> %(sync_to)r key: %(sync_key)r'),
{'sync_from': '%s/%s' %
(client.quote(info['account']),
client.quote(info['container'])),
'sync_to': sync_to,
'sync_key': sync_key})
else:
self.logger.exception(
_('ERROR Syncing %(db_file)s %(row)s'),
{'db_file': broker.db_file, 'row': row})
self.container_failures += 1
return False
except Exception:
self.logger.exception(
_('ERROR Syncing %(db_file)s %(row)s'),
{'db_file': broker.db_file, 'row': row})
self.container_failures += 1
return False
return True

View File

@ -376,6 +376,7 @@ class ObjectController(object):
return error_response
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
orig_timestamp = file.metadata.get('X-Timestamp')
upload_expiration = time.time() + self.max_upload_time
etag = md5()
upload_size = 0
@ -422,13 +423,16 @@ class ObjectController(object):
request.headers['Content-Encoding']
file.put(fd, tmppath, metadata)
file.unlinkold(metadata['X-Timestamp'])
self.container_update('PUT', account, container, obj, request.headers,
{'x-size': file.metadata['Content-Length'],
'x-content-type': file.metadata['Content-Type'],
'x-timestamp': file.metadata['X-Timestamp'],
'x-etag': file.metadata['ETag'],
'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
device)
if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']:
self.container_update('PUT', account, container, obj,
request.headers,
{'x-size': file.metadata['Content-Length'],
'x-content-type': file.metadata['Content-Type'],
'x-timestamp': file.metadata['X-Timestamp'],
'x-etag': file.metadata['ETag'],
'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
device)
resp = HTTPCreated(request=request, etag=etag)
return resp
@ -521,6 +525,8 @@ class ObjectController(object):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
# Needed for container sync feature
response.headers['X-Timestamp'] = file.metadata['X-Timestamp']
response.content_length = int(file.metadata['Content-Length'])
if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding']
@ -543,6 +549,7 @@ class ObjectController(object):
response_class = HTTPNoContent
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
orig_timestamp = file.metadata.get('X-Timestamp')
if file.is_deleted():
response_class = HTTPNotFound
metadata = {
@ -551,10 +558,12 @@ class ObjectController(object):
with file.mkstemp() as (fd, tmppath):
file.put(fd, tmppath, metadata, extension='.ts')
file.unlinkold(metadata['X-Timestamp'])
self.container_update('DELETE', account, container, obj,
request.headers, {'x-timestamp': metadata['X-Timestamp'],
'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
device)
if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']:
self.container_update('DELETE', account, container, obj,
request.headers, {'x-timestamp': metadata['X-Timestamp'],
'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
device)
resp = response_class(request=request)
return resp

View File

@ -33,7 +33,7 @@ from random import shuffle
from eventlet import sleep, TimeoutError
from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \
HTTPRequestTimeout, HTTPServiceUnavailable, \
HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \
@ -407,8 +407,8 @@ class Controller(object):
:param account: account name for the container
:param container: container name to look up
:returns: tuple of (container partition, container nodes, container
read acl, container write acl) or (None, None, None, None) if
the container does not exist
read acl, container write acl, container sync key) or (None,
None, None, None, None) if the container does not exist
"""
partition, nodes = self.app.container_ring.get_nodes(
account, container)
@ -420,15 +420,17 @@ class Controller(object):
status = cache_value['status']
read_acl = cache_value['read_acl']
write_acl = cache_value['write_acl']
sync_key = cache_value.get('sync_key')
if status == 200:
return partition, nodes, read_acl, write_acl
return partition, nodes, read_acl, write_acl, sync_key
elif status == 404:
return None, None, None, None
return None, None, None, None, None
if not self.account_info(account)[1]:
return None, None, None, None
return None, None, None, None, None
result_code = 0
read_acl = None
write_acl = None
sync_key = None
container_size = None
attempts_left = self.app.container_ring.replica_count
headers = {'x-cf-trans-id': self.trans_id}
@ -446,6 +448,7 @@ class Controller(object):
result_code = 200
read_acl = resp.getheader('x-container-read')
write_acl = resp.getheader('x-container-write')
sync_key = resp.getheader('x-container-sync-key')
container_size = \
resp.getheader('X-Container-Object-Count')
break
@ -474,11 +477,12 @@ class Controller(object):
{'status': result_code,
'read_acl': read_acl,
'write_acl': write_acl,
'sync_key': sync_key,
'container_size': container_size},
timeout=cache_timeout)
if result_code == 200:
return partition, nodes, read_acl, write_acl
return None, None, None, None
return partition, nodes, read_acl, write_acl, sync_key
return None, None, None, None, None
def iter_nodes(self, partition, nodes, ring):
"""
@ -631,6 +635,9 @@ class Controller(object):
raise
res.app_iter = file_iter()
update_headers(res, source.getheaders())
# Used by container sync feature
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
res.status = source.status
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
@ -640,6 +647,9 @@ class Controller(object):
elif 200 <= source.status <= 399:
res = status_map[source.status](request=req)
update_headers(res, source.getheaders())
# Used by container sync feature
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
if req.method == 'HEAD':
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
@ -866,7 +876,7 @@ class ObjectController(Controller):
error_response = check_metadata(req, 'object')
if error_response:
return error_response
container_partition, containers, _junk, req.acl = \
container_partition, containers, _junk, req.acl, _junk = \
self.container_info(self.account_name, self.container_name)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
@ -908,7 +918,8 @@ class ObjectController(Controller):
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
container_partition, containers, _junk, req.acl = \
(container_partition, containers, _junk, req.acl,
req.environ['swift_sync_key']) = \
self.container_info(self.account_name, self.container_name)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
@ -920,7 +931,27 @@ class ObjectController(Controller):
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req.headers['X-Timestamp'] = \
normalize_timestamp(float(req.headers['x-timestamp']))
# For container sync PUTs, do a HEAD to see if we can
# shortcircuit
hreq = Request.blank(req.path_info,
environ={'REQUEST_METHOD': 'HEAD'})
self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
hreq.path_info, self.app.object_ring.replica_count)
if 'swift_x_timestamp' in hreq.environ and \
float(hreq.environ['swift_x_timestamp']) >= \
float(req.headers['x-timestamp']):
return HTTPAccepted(request=req)
except ValueError:
return HTTPBadRequest(request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
else:
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Sometimes the 'content-type' header exists, but is set to None.
content_type_manually_set = True
if not req.headers.get('content-type'):
@ -1114,7 +1145,8 @@ class ObjectController(Controller):
@delay_denial
def DELETE(self, req):
"""HTTP DELETE request handler."""
container_partition, containers, _junk, req.acl = \
(container_partition, containers, _junk, req.acl,
req.environ['swift_sync_key']) = \
self.container_info(self.account_name, self.container_name)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
@ -1126,7 +1158,17 @@ class ObjectController(Controller):
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req.headers['X-Timestamp'] = \
normalize_timestamp(float(req.headers['x-timestamp']))
except ValueError:
return HTTPBadRequest(request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
else:
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
statuses = []
reasons = []
bodies = []
@ -1186,7 +1228,8 @@ class ContainerController(Controller):
"""WSGI controller for container requests"""
# Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write']
pass_through_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to']
def __init__(self, app, account_name, container_name, **kwargs):
Controller.__init__(self, app)
@ -1222,6 +1265,7 @@ class ContainerController(Controller):
{'status': resp.status_int,
'read_acl': resp.headers.get('x-container-read'),
'write_acl': resp.headers.get('x-container-write'),
'sync_key': resp.headers.get('x-container-sync-key'),
'container_size': resp.headers.get('x-container-object-count')},
timeout=self.app.recheck_container_existence)
@ -1230,6 +1274,11 @@ class ContainerController(Controller):
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not req.environ.get('owner', False):
for key in ('x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to'):
if key in resp.headers:
del resp.headers[key]
return resp
@public