From 305e4b41f55b482d7defed967b435c171c01cf9d Mon Sep 17 00:00:00 2001 From: gholt Date: Wed, 23 Feb 2011 23:26:05 -0800 Subject: [PATCH] Initial work on container syncing --- bin/st | 127 +++++++++++----- bin/swift-container-sync | 23 +++ etc/container-server.conf-sample | 10 ++ setup.py | 2 +- swift/common/client.py | 38 +++-- swift/common/manager.py | 7 +- swift/common/middleware/auth.py | 9 ++ swift/common/middleware/swauth.py | 9 ++ swift/container/server.py | 9 +- swift/container/sync.py | 245 ++++++++++++++++++++++++++++++ swift/obj/server.py | 31 ++-- swift/proxy/server.py | 77 ++++++++-- 12 files changed, 509 insertions(+), 78 deletions(-) create mode 100755 bin/swift-container-sync create mode 100644 swift/container/sync.py diff --git a/bin/st b/bin/st index 4e6024f84f..5e06cb7961 100755 --- a/bin/st +++ b/bin/st @@ -30,22 +30,30 @@ from time import sleep import socket from cStringIO import StringIO -from httplib import HTTPException, HTTPSConnection +from httplib import HTTPException from re import compile, DOTALL from tokenize import generate_tokens, STRING, NAME, OP from urllib import quote as _quote, unquote from urlparse import urlparse, urlunparse +try: + from eventlet.green.httplib import HTTPSConnection +except ImportError: + from httplib import HTTPSConnection + try: from eventlet import sleep -except Exception: +except ImportError: from time import sleep try: from swift.common.bufferedhttp \ import BufferedHTTPConnection as HTTPConnection -except Exception: - from httplib import HTTPConnection +except ImportError: + try: + from eventlet.green.httplib import HTTPConnection + except ImportError: + from httplib import HTTPConnection def quote(value, safe='/'): @@ -226,7 +234,7 @@ def get_account(url, token, marker=None, limit=None, prefix=None, listing = \ get_account(url, token, marker, limit, prefix, http_conn)[1] if listing: - rv.extend(listing) + rv[1].extend(listing) return rv parsed, conn = http_conn qs = 'format=json' @@ -570,9 +578,9 @@ def head_object(url, token, container, name, http_conn=None): return resp_headers -def put_object(url, token, container, name, contents, content_length=None, - etag=None, chunk_size=65536, content_type=None, headers=None, - http_conn=None): +def put_object(url, token=None, container=None, name=None, contents=None, + content_length=None, etag=None, chunk_size=65536, + content_type=None, headers=None, http_conn=None): """ Put an object @@ -596,10 +604,17 @@ def put_object(url, token, container, name, contents, content_length=None, parsed, conn = http_conn else: parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - if not headers: + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: headers = {} - headers['X-Auth-Token'] = token + if token: + headers['X-Auth-Token'] = token if etag: headers['ETag'] = etag.strip('"') if content_length is not None: @@ -638,7 +653,7 @@ def put_object(url, token, container, name, contents, content_length=None, raise ClientException('Object PUT failed', http_scheme=parsed.scheme, http_host=conn.host, http_port=conn.port, http_path=path, http_status=resp.status, http_reason=resp.reason) - return resp.getheader('etag').strip('"') + return resp.getheader('etag', '').strip('"') def post_object(url, token, container, name, headers, http_conn=None): @@ -669,7 +684,8 @@ def post_object(url, token, container, name, headers, http_conn=None): http_status=resp.status, http_reason=resp.reason) -def delete_object(url, token, container, name, http_conn=None): +def delete_object(url, token=None, container=None, name=None, http_conn=None, + headers=None): """ Delete object @@ -685,8 +701,18 @@ def delete_object(url, token, container, name, http_conn=None): parsed, conn = http_conn else: parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('DELETE', path, '', {'X-Auth-Token': token}) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: + headers = {} + if token: + headers['X-Auth-Token'] = token + conn.request('DELETE', path, '', headers) resp = conn.getresponse() resp.read() if resp.status < 200 or resp.status >= 300: @@ -700,7 +726,7 @@ class Connection(object): """Convenience class to make requests that will also retry the request""" def __init__(self, authurl, user, key, retries=5, preauthurl=None, - preauthtoken=None, snet=False): + preauthtoken=None, snet=False, starting_backoff=1): """ :param authurl: authenitcation URL :param user: user name to authenticate as @@ -720,6 +746,7 @@ class Connection(object): self.token = preauthtoken self.attempts = 0 self.snet = snet + self.starting_backoff = starting_backoff def get_auth(self): return get_auth(self.authurl, self.user, self.key, snet=self.snet) @@ -727,9 +754,9 @@ class Connection(object): def http_connection(self): return http_connection(self.url) - def _retry(self, func, *args, **kwargs): + def _retry(self, reset_func, func, *args, **kwargs): self.attempts = 0 - backoff = 1 + backoff = self.starting_backoff while self.attempts <= self.retries: self.attempts += 1 try: @@ -758,10 +785,12 @@ class Connection(object): raise sleep(backoff) backoff *= 2 + if reset_func: + reset_func(func, *args, **kwargs) def head_account(self): """Wrapper for :func:`head_account`""" - return self._retry(head_account) + return self._retry(None, head_account) def get_account(self, marker=None, limit=None, prefix=None, full_listing=False): @@ -769,16 +798,16 @@ class Connection(object): # TODO(unknown): With full_listing=True this will restart the entire # listing with each retry. Need to make a better version that just # retries where it left off. - return self._retry(get_account, marker=marker, limit=limit, + return self._retry(None, get_account, marker=marker, limit=limit, prefix=prefix, full_listing=full_listing) def post_account(self, headers): """Wrapper for :func:`post_account`""" - return self._retry(post_account, headers) + return self._retry(None, post_account, headers) def head_container(self, container): """Wrapper for :func:`head_container`""" - return self._retry(head_container, container) + return self._retry(None, head_container, container) def get_container(self, container, marker=None, limit=None, prefix=None, delimiter=None, full_listing=False): @@ -786,46 +815,58 @@ class Connection(object): # TODO(unknown): With full_listing=True this will restart the entire # listing with each retry. Need to make a better version that just # retries where it left off. - return self._retry(get_container, container, marker=marker, + return self._retry(None, get_container, container, marker=marker, limit=limit, prefix=prefix, delimiter=delimiter, full_listing=full_listing) def put_container(self, container, headers=None): """Wrapper for :func:`put_container`""" - return self._retry(put_container, container, headers=headers) + return self._retry(None, put_container, container, headers=headers) def post_container(self, container, headers): """Wrapper for :func:`post_container`""" - return self._retry(post_container, container, headers) + return self._retry(None, post_container, container, headers) def delete_container(self, container): """Wrapper for :func:`delete_container`""" - return self._retry(delete_container, container) + return self._retry(None, delete_container, container) def head_object(self, container, obj): """Wrapper for :func:`head_object`""" - return self._retry(head_object, container, obj) + return self._retry(None, head_object, container, obj) def get_object(self, container, obj, resp_chunk_size=None): """Wrapper for :func:`get_object`""" - return self._retry(get_object, container, obj, + return self._retry(None, get_object, container, obj, resp_chunk_size=resp_chunk_size) def put_object(self, container, obj, contents, content_length=None, etag=None, chunk_size=65536, content_type=None, headers=None): """Wrapper for :func:`put_object`""" - return self._retry(put_object, container, obj, contents, + + def _default_reset(*args, **kwargs): + raise ClientException('put_object(%r, %r, ...) failure and no ' + 'ability to reset contents for reupload.' % (container, obj)) + + reset_func = _default_reset + tell = getattr(contents, 'tell', None) + seek = getattr(contents, 'seek', None) + if tell and seek: + orig_pos = tell() + reset_func = lambda *a, **k: seek(orig_pos) + + return self._retry(reset_func, put_object, container, obj, contents, content_length=content_length, etag=etag, chunk_size=chunk_size, content_type=content_type, headers=headers) def post_object(self, container, obj, headers): """Wrapper for :func:`post_object`""" - return self._retry(post_object, container, obj, headers) + return self._retry(None, post_object, container, obj, headers) def delete_object(self, container, obj): """Wrapper for :func:`delete_object`""" - return self._retry(delete_object, container, obj) + return self._retry(None, delete_object, container, obj) # End inclusion of swift.common.client # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -1297,10 +1338,14 @@ Container: %s Objects: %d Bytes: %d Read ACL: %s -Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], +Write ACL: %s + Sync To: %s + Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], object_count, bytes_used, headers.get('x-container-read', ''), - headers.get('x-container-write', ''))) + headers.get('x-container-write', ''), + headers.get('x-container-sync-to', ''), + headers.get('x-container-sync-key', ''))) for key, value in headers.items(): if key.startswith('x-container-meta-'): print_queue.put('%9s: %s' % ('Meta %s' % @@ -1309,7 +1354,8 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], if not key.startswith('x-container-meta-') and key not in ( 'content-length', 'date', 'x-container-object-count', 'x-container-bytes-used', 'x-container-read', - 'x-container-write'): + 'x-container-write', 'x-container-sync-to', + 'x-container-sync-key'): print_queue.put( '%9s: %s' % (key.title(), value)) except ClientException, err: @@ -1374,13 +1420,18 @@ def st_post(options, args, print_queue, error_queue): parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the ' 'Write ACL for containers. Quick summary of ACL syntax: account1, ' 'account2:user2') + parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the ' + 'Sync To for containers, for multi-cluster replication.') + parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the ' + 'Sync Key for containers, for multi-cluster replication.') parser.add_option('-m', '--meta', action='append', dest='meta', default=[], help='Sets a meta data item with the syntax name:value. This option ' 'may be repeated. Example: -m Color:Blue -m Size:Large') (options, args) = parse_args(parser, args) args = args[1:] - if (options.read_acl or options.write_acl) and not args: - exit('-r and -w options only allowed for containers') + if (options.read_acl or options.write_acl or options.sync_to or + options.sync_key) and not args: + exit('-r, -w, -t, and -k options only allowed for containers') conn = Connection(options.auth, options.user, options.key) if not args: headers = {} @@ -1408,6 +1459,10 @@ def st_post(options, args, print_queue, error_queue): headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: headers['X-Container-Write'] = options.write_acl + if options.sync_to is not None: + headers['X-Container-Sync-To'] = options.sync_to + if options.sync_key is not None: + headers['X-Container-Sync-Key'] = options.sync_key try: conn.post_container(args[0], headers=headers) except ClientException, err: diff --git a/bin/swift-container-sync b/bin/swift-container-sync new file mode 100755 index 0000000000..8310faa802 --- /dev/null +++ b/bin/swift-container-sync @@ -0,0 +1,23 @@ +#!/usr/bin/python +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from swift.container.sync import ContainerSync +from swift.common.utils import parse_options +from swift.common.daemon import run_daemon + +if __name__ == '__main__': + conf_file, options = parse_options(once=True) + run_daemon(ContainerSync, conf_file, **options) diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index fb3a47835e..7a69495c16 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -60,3 +60,13 @@ use = egg:swift#container # log_level = INFO # Will audit, at most, 1 container per device per interval # interval = 1800 + +[container-sync] +# You can override the default log routing for this app here (don't use set!): +# log_name = container-sync +# log_facility = LOG_LOCAL0 +# log_level = INFO +# Will sync, at most, each container once per interval +# interval = 300 +# Maximum amount of time to spend syncing each container +# container_time = 60 diff --git a/setup.py b/setup.py index c80d62ddc8..7227a8535f 100644 --- a/setup.py +++ b/setup.py @@ -83,7 +83,7 @@ setup( 'bin/swift-auth-recreate-accounts', 'bin/swift-auth-server', 'bin/swift-auth-update-reseller-prefixes', 'bin/swift-container-auditor', - 'bin/swift-container-replicator', + 'bin/swift-container-replicator', 'bin/swift-container-sync', 'bin/swift-container-server', 'bin/swift-container-updater', 'bin/swift-drive-audit', 'bin/swift-get-nodes', 'bin/swift-init', 'bin/swift-object-auditor', diff --git a/swift/common/client.py b/swift/common/client.py index 1fffaa493d..6b8d20d4f1 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -566,9 +566,9 @@ def head_object(url, token, container, name, http_conn=None): return resp_headers -def put_object(url, token, container, name, contents, content_length=None, - etag=None, chunk_size=65536, content_type=None, headers=None, - http_conn=None): +def put_object(url, token=None, container=None, name=None, contents=None, + content_length=None, etag=None, chunk_size=65536, + content_type=None, headers=None, http_conn=None): """ Put an object @@ -592,10 +592,17 @@ def put_object(url, token, container, name, contents, content_length=None, parsed, conn = http_conn else: parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - if not headers: + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: headers = {} - headers['X-Auth-Token'] = token + if token: + headers['X-Auth-Token'] = token if etag: headers['ETag'] = etag.strip('"') if content_length is not None: @@ -634,7 +641,7 @@ def put_object(url, token, container, name, contents, content_length=None, raise ClientException('Object PUT failed', http_scheme=parsed.scheme, http_host=conn.host, http_port=conn.port, http_path=path, http_status=resp.status, http_reason=resp.reason) - return resp.getheader('etag').strip('"') + return resp.getheader('etag', '').strip('"') def post_object(url, token, container, name, headers, http_conn=None): @@ -665,7 +672,8 @@ def post_object(url, token, container, name, headers, http_conn=None): http_status=resp.status, http_reason=resp.reason) -def delete_object(url, token, container, name, http_conn=None): +def delete_object(url, token=None, container=None, name=None, http_conn=None, + headers=None): """ Delete object @@ -681,8 +689,18 @@ def delete_object(url, token, container, name, http_conn=None): parsed, conn = http_conn else: parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('DELETE', path, '', {'X-Auth-Token': token}) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: + headers = {} + if token: + headers['X-Auth-Token'] = token + conn.request('DELETE', path, '', headers) resp = conn.getresponse() resp.read() if resp.status < 200 or resp.status >= 300: diff --git a/swift/common/manager.py b/swift/common/manager.py index b5b126a822..96ca319260 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -31,9 +31,10 @@ RUN_DIR = '/var/run/swift' # auth-server has been removed from ALL_SERVERS, start it explicitly ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor', - 'container-replicator', 'container-server', 'container-updater', - 'object-auditor', 'object-server', 'object-replicator', 'object-updater', - 'proxy-server', 'account-replicator', 'account-reaper'] + 'container-replicator', 'container-server', 'container-sync', + 'container-updater', 'object-auditor', 'object-server', + 'object-replicator', 'object-updater', 'proxy-server', + 'account-replicator', 'account-reaper'] MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server', 'object-server'] REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS] diff --git a/swift/common/middleware/auth.py b/swift/common/middleware/auth.py index a51788f7b7..ba9f639cdc 100644 --- a/swift/common/middleware/auth.py +++ b/swift/common/middleware/auth.py @@ -176,11 +176,20 @@ class DevAuth(object): return self.denied_response(req) user_groups = (req.remote_user or '').split(',') if '.reseller_admin' in user_groups: + req.environ['swift_owner'] = True return None if account in user_groups and \ (req.method not in ('DELETE', 'PUT') or container): # If the user is admin for the account and is not trying to do an # account DELETE or PUT... + req.environ['swift_owner'] = True + return None + # TODO: Restrict this further to only authenticated folks in the .sync + # group. Currently, anybody with the x-container-sync-key can do a + # sync. + if 'swift_sync_key' in req.environ and \ + req.environ['swift_sync_key'] == \ + req.headers.get('x-container-sync-key', None): return None referrers, groups = parse_acl(getattr(req, 'acl', None)) if referrer_allowed(req.referer, referrers): diff --git a/swift/common/middleware/swauth.py b/swift/common/middleware/swauth.py index 68b0d7afaf..8467d43b5c 100644 --- a/swift/common/middleware/swauth.py +++ b/swift/common/middleware/swauth.py @@ -269,11 +269,20 @@ class Swauth(object): if '.reseller_admin' in user_groups and \ account != self.reseller_prefix and \ account[len(self.reseller_prefix)] != '.': + req.environ['swift_owner'] = True return None if account in user_groups and \ (req.method not in ('DELETE', 'PUT') or container): # If the user is admin for the account and is not trying to do an # account DELETE or PUT... + req.environ['swift_owner'] = True + return None + # TODO: Restrict this further to only authenticated folks in the .sync + # group. Currently, anybody with the x-container-sync-key can do a + # sync. + if 'swift_sync_key' in req.environ and \ + req.environ['swift_sync_key'] == \ + req.headers.get('x-container-sync-key', None): return None referrers, groups = parse_acl(getattr(req, 'acl', None)) if referrer_allowed(req.referer, referrers): diff --git a/swift/container/server.py b/swift/container/server.py index 549fc47596..44af493f12 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -46,7 +46,8 @@ class ContainerController(object): """WSGI Controller for the container server.""" # Ensure these are all lowercase - save_headers = ['x-container-read', 'x-container-write'] + save_headers = ['x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'] def __init__(self, conf): self.logger = get_logger(conf, log_route='container-server') @@ -232,7 +233,8 @@ class ContainerController(object): } headers.update((key, value) for key, (value, timestamp) in broker.metadata.iteritems() - if value != '') + if value != '' and (key.lower() in self.save_headers or + key.lower().startswith('x-container-meta-'))) return HTTPNoContent(request=req, headers=headers) def GET(self, req): @@ -259,7 +261,8 @@ class ContainerController(object): } resp_headers.update((key, value) for key, (value, timestamp) in broker.metadata.iteritems() - if value != '') + if value != '' and (key.lower() in self.save_headers or + key.lower().startswith('x-container-meta-'))) try: path = get_param(req, 'path') prefix = get_param(req, 'prefix') diff --git a/swift/container/sync.py b/swift/container/sync.py new file mode 100644 index 0000000000..a4b450eb84 --- /dev/null +++ b/swift/container/sync.py @@ -0,0 +1,245 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +from random import random, shuffle + +from swift.container import server as container_server +from swift.common import client, direct_client +from swift.common.ring import Ring +from swift.common.db import ContainerBroker +from swift.common.utils import audit_location_generator, get_logger, \ + normalize_timestamp, TRUE_VALUES +from swift.common.daemon import Daemon + + +class Iter2FileLikeObject(object): + + def __init__(self, iterator): + self.iterator = iterator + self._chunk = '' + + def read(self, size=-1): + if size < 0: + chunk = self._chunk + self._chunk = '' + return chunk + ''.join(self.iterator) + chunk = '' + try: + chunk = self.iterator.next() + except StopIteration: + pass + if len(chunk) <= size: + return chunk + self._chunk = chunk[size:] + return chunk[:size] + + +class ContainerSync(Daemon): + """Sync syncable containers.""" + + def __init__(self, conf, object_ring=None): + self.conf = conf + self.logger = get_logger(conf, log_route='container-sync') + self.devices = conf.get('devices', '/srv/node') + self.mount_check = \ + conf.get('mount_check', 'true').lower() in TRUE_VALUES + self.interval = int(conf.get('interval', 300)) + self.container_time = int(conf.get('container_time', 60)) + swift_dir = conf.get('swift_dir', '/etc/swift') + self.container_syncs = 0 + self.container_deletes = 0 + self.container_puts = 0 + self.container_skips = 0 + self.container_failures = 0 + self.reported = time.time() + self.object_ring = object_ring or \ + Ring(os.path.join(swift_dir, 'object.ring.gz')) + + def run_forever(self): + """Run the container sync until stopped.""" + time.sleep(random() * self.interval) + while True: + begin = time.time() + all_locs = audit_location_generator(self.devices, + container_server.DATADIR, + mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + self.container_sync(path) + if time.time() - self.reported >= 3600: # once an hour + self._report() + elapsed = time.time() - begin + if elapsed < self.interval: + time.sleep(self.interval - elapsed) + + def run_once(self): + """Run the container sync once.""" + self.logger.info(_('Begin container sync "once" mode')) + begin = time.time() + all_locs = audit_location_generator(self.devices, + container_server.DATADIR, + mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + self.container_sync(path) + if time.time() - self.reported >= 3600: # once an hour + self._report() + self._report() + elapsed = time.time() - begin + self.logger.info( + _('Container sync "once" mode completed: %.02fs'), elapsed) + + def _report(self): + self.logger.info( + _('Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s ' + 'puts], %(skip)s skipped, %(fail)s failed'), + {'time': time.ctime(self.reported), + 'sync': self.container_syncs, + 'delete': self.container_deletes, + 'put': self.container_puts, + 'skip': self.container_skips, + 'fail': self.container_failures}) + self.reported = time.time() + self.container_syncs = 0 + self.container_deletes = 0 + self.container_puts = 0 + self.container_skips = 0 + self.container_failures = 0 + + def container_sync(self, path): + """ + Syncs the given container path + + :param path: the path to a container db + """ + try: + if not path.endswith('.db'): + return + broker = ContainerBroker(path) + info = broker.get_info() + if not broker.is_deleted(): + sync_to = None + sync_key = None + sync_row = -1 + for key, (value, timestamp) in broker.metadata.iteritems(): + if key.lower() == 'x-container-sync-to': + sync_to = value + elif key.lower() == 'x-container-sync-key': + sync_key = value + # TODO: Make this a separate column, not a metadata item. + # Each db should track what it has synced separately and + # these metadata get ovewritten by newer values from other + # dbs. Also, once a new column, it'll need special + # attention when doing a fresh db copy. + elif key.lower() == 'x-container-sync-row': + sync_row = int(value) + if not sync_to or not sync_key: + self.container_skips += 1 + return + sync_to = sync_to.rstrip('/') + stop_at = time.time() + self.container_time + while time.time() < stop_at: + rows = broker.get_items_since(sync_row, 1) + if not rows: + break + if not self.container_sync_row(rows[0], sync_to, sync_key, + broker, info): + return + sync_row = rows[0]['ROWID'] + broker.update_metadata({'X-Container-Sync-Row': + (str(sync_row), normalize_timestamp(time.time()))}) + self.container_syncs += 1 + except Exception: + self.container_failures += 1 + self.logger.exception(_('ERROR Syncing %s'), (broker.db_file)) + + def container_sync_row(self, row, sync_to, sync_key, broker, info): + try: + if row['deleted']: + try: + client.delete_object(sync_to, name=row['name'], + headers={'X-Timestamp': row['created_at'], + 'X-Container-Sync-Key': sync_key}) + except client.ClientException, err: + if err.http_status != 404: + raise + self.container_deletes += 1 + else: + part, nodes = self.object_ring.get_nodes( + info['account'], info['container'], + row['name']) + shuffle(nodes) + exc = None + for node in nodes: + try: + headers, body = \ + direct_client.direct_get_object(node, part, + info['account'], info['container'], + row['name'], resp_chunk_size=65536) + break + except client.ClientException, err: + exc = err + else: + if exc: + raise exc + raise Exception(_('Unknown exception trying to GET: ' + '%(node)r %(account)r %(container)r %(object)r'), + {'node': node, 'part': part, + 'account': info['account'], + 'container': info['container'], + 'object': row['name']}) + for key in ('date', 'last-modified'): + if key in headers: + del headers[key] + if 'etag' in headers: + headers['etag'] = headers['etag'].strip('"') + headers['X-Timestamp'] = row['created_at'] + headers['X-Container-Sync-Key'] = sync_key + client.put_object(sync_to, name=row['name'], + headers=headers, + contents=Iter2FileLikeObject(body)) + self.container_puts += 1 + except client.ClientException, err: + if err.http_status == 401: + self.logger.info(_('Unauth %(sync_from)r ' + '=> %(sync_to)r key: %(sync_key)r'), + {'sync_from': '%s/%s' % + (client.quote(info['account']), + client.quote(info['container'])), + 'sync_to': sync_to, + 'sync_key': sync_key}) + elif err.http_status == 404: + self.logger.info(_('Not found %(sync_from)r ' + '=> %(sync_to)r key: %(sync_key)r'), + {'sync_from': '%s/%s' % + (client.quote(info['account']), + client.quote(info['container'])), + 'sync_to': sync_to, + 'sync_key': sync_key}) + else: + self.logger.exception( + _('ERROR Syncing %(db_file)s %(row)s'), + {'db_file': broker.db_file, 'row': row}) + self.container_failures += 1 + return False + except Exception: + self.logger.exception( + _('ERROR Syncing %(db_file)s %(row)s'), + {'db_file': broker.db_file, 'row': row}) + self.container_failures += 1 + return False + return True diff --git a/swift/obj/server.py b/swift/obj/server.py index 9e95ec7c6f..41f15d2c12 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -376,6 +376,7 @@ class ObjectController(object): return error_response file = DiskFile(self.devices, device, partition, account, container, obj, disk_chunk_size=self.disk_chunk_size) + orig_timestamp = file.metadata.get('X-Timestamp') upload_expiration = time.time() + self.max_upload_time etag = md5() upload_size = 0 @@ -422,13 +423,16 @@ class ObjectController(object): request.headers['Content-Encoding'] file.put(fd, tmppath, metadata) file.unlinkold(metadata['X-Timestamp']) - self.container_update('PUT', account, container, obj, request.headers, - {'x-size': file.metadata['Content-Length'], - 'x-content-type': file.metadata['Content-Type'], - 'x-timestamp': file.metadata['X-Timestamp'], - 'x-etag': file.metadata['ETag'], - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, - device) + if not orig_timestamp or \ + orig_timestamp < request.headers['x-timestamp']: + self.container_update('PUT', account, container, obj, + request.headers, + {'x-size': file.metadata['Content-Length'], + 'x-content-type': file.metadata['Content-Type'], + 'x-timestamp': file.metadata['X-Timestamp'], + 'x-etag': file.metadata['ETag'], + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, + device) resp = HTTPCreated(request=request, etag=etag) return resp @@ -521,6 +525,8 @@ class ObjectController(object): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) + # Needed for container sync feature + response.headers['X-Timestamp'] = file.metadata['X-Timestamp'] response.content_length = int(file.metadata['Content-Length']) if 'Content-Encoding' in file.metadata: response.content_encoding = file.metadata['Content-Encoding'] @@ -543,6 +549,7 @@ class ObjectController(object): response_class = HTTPNoContent file = DiskFile(self.devices, device, partition, account, container, obj, disk_chunk_size=self.disk_chunk_size) + orig_timestamp = file.metadata.get('X-Timestamp') if file.is_deleted(): response_class = HTTPNotFound metadata = { @@ -551,10 +558,12 @@ class ObjectController(object): with file.mkstemp() as (fd, tmppath): file.put(fd, tmppath, metadata, extension='.ts') file.unlinkold(metadata['X-Timestamp']) - self.container_update('DELETE', account, container, obj, - request.headers, {'x-timestamp': metadata['X-Timestamp'], - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, - device) + if not orig_timestamp or \ + orig_timestamp < request.headers['x-timestamp']: + self.container_update('DELETE', account, container, obj, + request.headers, {'x-timestamp': metadata['X-Timestamp'], + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, + device) resp = response_class(request=request) return resp diff --git a/swift/proxy/server.py b/swift/proxy/server.py index cc09ac1459..af71346348 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -33,7 +33,7 @@ from random import shuffle from eventlet import sleep, TimeoutError from eventlet.timeout import Timeout -from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ +from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPMethodNotAllowed, \ HTTPNotFound, HTTPPreconditionFailed, \ HTTPRequestTimeout, HTTPServiceUnavailable, \ HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \ @@ -407,8 +407,8 @@ class Controller(object): :param account: account name for the container :param container: container name to look up :returns: tuple of (container partition, container nodes, container - read acl, container write acl) or (None, None, None, None) if - the container does not exist + read acl, container write acl, container sync key) or (None, + None, None, None, None) if the container does not exist """ partition, nodes = self.app.container_ring.get_nodes( account, container) @@ -420,15 +420,17 @@ class Controller(object): status = cache_value['status'] read_acl = cache_value['read_acl'] write_acl = cache_value['write_acl'] + sync_key = cache_value.get('sync_key') if status == 200: - return partition, nodes, read_acl, write_acl + return partition, nodes, read_acl, write_acl, sync_key elif status == 404: - return None, None, None, None + return None, None, None, None, None if not self.account_info(account)[1]: - return None, None, None, None + return None, None, None, None, None result_code = 0 read_acl = None write_acl = None + sync_key = None container_size = None attempts_left = self.app.container_ring.replica_count headers = {'x-cf-trans-id': self.trans_id} @@ -446,6 +448,7 @@ class Controller(object): result_code = 200 read_acl = resp.getheader('x-container-read') write_acl = resp.getheader('x-container-write') + sync_key = resp.getheader('x-container-sync-key') container_size = \ resp.getheader('X-Container-Object-Count') break @@ -474,11 +477,12 @@ class Controller(object): {'status': result_code, 'read_acl': read_acl, 'write_acl': write_acl, + 'sync_key': sync_key, 'container_size': container_size}, timeout=cache_timeout) if result_code == 200: - return partition, nodes, read_acl, write_acl - return None, None, None, None + return partition, nodes, read_acl, write_acl, sync_key + return None, None, None, None, None def iter_nodes(self, partition, nodes, ring): """ @@ -631,6 +635,9 @@ class Controller(object): raise res.app_iter = file_iter() update_headers(res, source.getheaders()) + # Used by container sync feature + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') res.status = source.status res.content_length = source.getheader('Content-Length') if source.getheader('Content-Type'): @@ -640,6 +647,9 @@ class Controller(object): elif 200 <= source.status <= 399: res = status_map[source.status](request=req) update_headers(res, source.getheaders()) + # Used by container sync feature + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') if req.method == 'HEAD': res.content_length = source.getheader('Content-Length') if source.getheader('Content-Type'): @@ -866,7 +876,7 @@ class ObjectController(Controller): error_response = check_metadata(req, 'object') if error_response: return error_response - container_partition, containers, _junk, req.acl = \ + container_partition, containers, _junk, req.acl, _junk = \ self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) @@ -908,7 +918,8 @@ class ObjectController(Controller): @delay_denial def PUT(self, req): """HTTP PUT request handler.""" - container_partition, containers, _junk, req.acl = \ + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key']) = \ self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) @@ -920,7 +931,27 @@ class ObjectController(Controller): self.app.container_ring) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + # For container sync PUTs, do a HEAD to see if we can + # shortcircuit + hreq = Request.blank(req.path_info, + environ={'REQUEST_METHOD': 'HEAD'}) + self.GETorHEAD_base(hreq, _('Object'), partition, nodes, + hreq.path_info, self.app.object_ring.replica_count) + if 'swift_x_timestamp' in hreq.environ and \ + float(hreq.environ['swift_x_timestamp']) >= \ + float(req.headers['x-timestamp']): + return HTTPAccepted(request=req) + except ValueError: + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) # Sometimes the 'content-type' header exists, but is set to None. content_type_manually_set = True if not req.headers.get('content-type'): @@ -1114,7 +1145,8 @@ class ObjectController(Controller): @delay_denial def DELETE(self, req): """HTTP DELETE request handler.""" - container_partition, containers, _junk, req.acl = \ + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key']) = \ self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) @@ -1126,7 +1158,17 @@ class ObjectController(Controller): self.app.container_ring) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + except ValueError: + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) statuses = [] reasons = [] bodies = [] @@ -1186,7 +1228,8 @@ class ContainerController(Controller): """WSGI controller for container requests""" # Ensure these are all lowercase - pass_through_headers = ['x-container-read', 'x-container-write'] + pass_through_headers = ['x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'] def __init__(self, app, account_name, container_name, **kwargs): Controller.__init__(self, app) @@ -1222,6 +1265,7 @@ class ContainerController(Controller): {'status': resp.status_int, 'read_acl': resp.headers.get('x-container-read'), 'write_acl': resp.headers.get('x-container-write'), + 'sync_key': resp.headers.get('x-container-sync-key'), 'container_size': resp.headers.get('x-container-object-count')}, timeout=self.app.recheck_container_existence) @@ -1230,6 +1274,11 @@ class ContainerController(Controller): aresp = req.environ['swift.authorize'](req) if aresp: return aresp + if not req.environ.get('owner', False): + for key in ('x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'): + if key in resp.headers: + del resp.headers[key] return resp @public