From 15009bb76ee648a8f2968e0e9fc16d9bd11f746d Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 10 Aug 2010 12:18:15 -0700 Subject: [PATCH] Added metadata to account and container servers --- swift/account/server.py | 61 +++++- swift/common/db.py | 126 +++++++++-- swift/common/db_replicator.py | 41 ++-- swift/container/server.py | 55 ++++- test/unit/account/test_server.py | 88 ++++++++ test/unit/common/test_db.py | 292 +++++++++++++++++++++++++ test/unit/common/test_db_replicator.py | 25 ++- test/unit/container/test_server.py | 92 ++++++++ 8 files changed, 718 insertions(+), 62 deletions(-) diff --git a/swift/account/server.py b/swift/account/server.py index 6e5dd7472e..0d784c3338 100644 --- a/swift/account/server.py +++ b/swift/account/server.py @@ -20,7 +20,6 @@ import time import traceback from urllib import unquote -from swift.common.utils import get_logger from webob import Request, Response from webob.exc import HTTPAccepted, HTTPBadRequest, \ @@ -30,8 +29,8 @@ import simplejson from xml.sax import saxutils from swift.common.db import AccountBroker -from swift.common.utils import get_param, split_path, storage_directory, \ - hash_path +from swift.common.utils import get_logger, get_param, hash_path, \ + normalize_timestamp, split_path, storage_directory from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \ check_mount, check_float, check_xml_encodable from swift.common.healthcheck import healthcheck @@ -100,13 +99,26 @@ class AccountController(object): else: return HTTPCreated(request=req) else: # put account + timestamp = normalize_timestamp(req.headers['x-timestamp']) if not os.path.exists(broker.db_file): - broker.initialize(req.headers['x-timestamp']) - return HTTPCreated(request=req) + broker.initialize(timestamp) + created = True elif broker.is_status_deleted(): return HTTPForbidden(request=req, body='Recently deleted') else: - broker.update_put_timestamp(req.headers['x-timestamp']) + created = broker.is_deleted() + broker.update_put_timestamp(timestamp) + if broker.is_deleted(): + return HTTPConflict(request=req) + metadata = {} + metadata.update((key, (value, timestamp)) + for key, value in req.headers.iteritems() + if key.lower().startswith('x-account-meta-')) + if metadata: + broker.metadata = metadata + if created: + return HTTPCreated(request=req) + else: return HTTPAccepted(request=req) def HEAD(self, req): @@ -142,6 +154,9 @@ class AccountController(object): container_ts = broker.get_container_timestamp(container) if container_ts is not None: headers['X-Container-Timestamp'] = container_ts + headers.update((key, value) + for key, (value, timestamp) in broker.metadata.iteritems() + if value != '') return HTTPNoContent(request=req, headers=headers) def GET(self, req): @@ -165,6 +180,9 @@ class AccountController(object): 'X-Account-Bytes-Used': info['bytes_used'], 'X-Timestamp': info['created_at'], 'X-PUT-Timestamp': info['put_timestamp']} + resp_headers.update((key, value) + for key, (value, timestamp) in broker.metadata.iteritems() + if value != '') try: prefix = get_param(req, 'prefix') delimiter = get_param(req, 'delimiter') @@ -229,9 +247,9 @@ class AccountController(object): ret.charset = 'utf8' return ret - def POST(self, req): + def REPLICATE(self, req): """ - Handle HTTP POST request. + Handle HTTP REPLICATE request. Handler for RPC calls for account replication. 
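+        (Client metadata updates now go through the POST method below;
+        REPLICATE is reserved for replication RPCs.)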
""" try: @@ -250,6 +268,31 @@ class AccountController(object): ret.request = req return ret + def POST(self, req): + """Handle HTTP POST request.""" + try: + drive, part, account = split_path(unquote(req.path), 3) + except ValueError, err: + return HTTPBadRequest(body=str(err), content_type='text/plain', + request=req) + if 'x-timestamp' not in req.headers or \ + not check_float(req.headers['x-timestamp']): + return HTTPBadRequest(body='Missing or bad timestamp', + request=req, content_type='text/plain') + if self.mount_check and not check_mount(self.root, drive): + return Response(status='507 %s is not mounted' % drive) + broker = self._get_account_broker(drive, part, account) + if broker.is_deleted(): + return HTTPNotFound(request=req) + timestamp = normalize_timestamp(req.headers['x-timestamp']) + metadata = {} + metadata.update((key, (value, timestamp)) + for key, value in req.headers.iteritems() + if key.lower().startswith('x-account-meta-')) + if metadata: + broker.metadata = metadata + return HTTPNoContent(request=req) + def __call__(self, env, start_response): start_time = time.time() req = Request(env) @@ -283,7 +326,7 @@ class AccountController(object): req.referer or '-', req.user_agent or '-', trans_time, additional_info) - if req.method.upper() == 'POST': + if req.method.upper() == 'REPLICATE': self.logger.debug(log_message) else: self.logger.info(log_message) diff --git a/swift/common/db.py b/swift/common/db.py index e2eb2a2901..828fd4378d 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -29,6 +29,7 @@ from random import randint from tempfile import mkstemp from eventlet import sleep +import simplejson as json import sqlite3 from swift.common.utils import normalize_timestamp, renamer, \ @@ -396,23 +397,32 @@ class DatabaseBroker(object): """ Get information about the DB required for replication. - :returns: tuple of (hash, id, created_at, put_timestamp, - delete_timestamp) from the DB + :returns: dict containing keys: hash, id, created_at, put_timestamp, + delete_timestamp, count, max_row, and metadata """ try: self._commit_puts() except LockTimeout: if not self.stale_reads_ok: raise + query_part1 = ''' + SELECT hash, id, created_at, put_timestamp, delete_timestamp, + %s_count AS count, + CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL + THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row, ''' % \ + self.db_contains_type + query_part2 = ''' + FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE + ON SQLITE_SEQUENCE.name == '%s') LIMIT 1 + ''' % (self.db_type, self.db_contains_type) with self.get() as conn: - curs = conn.execute(''' - SELECT hash, id, created_at, put_timestamp, delete_timestamp, - %s_count AS count, - CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL - THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row - FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE - ON SQLITE_SEQUENCE.name == '%s') LIMIT 1 - ''' % (self.db_contains_type, self.db_type, self.db_contains_type)) + try: + curs = conn.execute(query_part1 + 'metadata' + query_part2) + except sqlite3.OperationalError, err: + if 'no such column: metadata' not in str(err): + raise + curs = conn.execute(query_part1 + "'' as metadata" + + query_part2) curs.row_factory = dict_factory return curs.fetchone() @@ -472,6 +482,88 @@ class DatabaseBroker(object): with open(self.db_file, 'rb+') as fp: fallocate(fp.fileno(), int(prealloc_size)) + @property + def metadata(self): + """ + Returns the metadata dict for the database. The metadata dict values + are tuples of (value, timestamp) where the timestamp indicates when + that key was set to that value. 
+ """ + with self.get() as conn: + try: + metadata = conn.execute('SELECT metadata FROM %s_stat' % + self.db_type).fetchone()[0] + except sqlite3.OperationalError, err: + if 'no such column: metadata' not in str(err): + raise + metadata = '' + if metadata: + metadata = json.loads(metadata) + else: + metadata = {} + return metadata + + @metadata.setter + def metadata(self, new_metadata): + """ + Updates the metadata dict for the database. The metadata dict values + are tuples of (value, timestamp) where the timestamp indicates when + that key was set to that value. Key/values will only be overwritten if + the timestamp is newer. To delete a key, set its value to ('', + timestamp). These empty keys will eventually be removed by + :func:reclaim + """ + old_metadata = self.metadata + if set(new_metadata).issubset(set(old_metadata)): + for key, (value, timestamp) in new_metadata.iteritems(): + if timestamp > old_metadata[key][1]: + break + else: + return + with self.get() as conn: + try: + md = conn.execute('SELECT metadata FROM %s_stat' % + self.db_type).fetchone()[0] + md = md and json.loads(md) or {} + except sqlite3.OperationalError, err: + if 'no such column: metadata' not in str(err): + raise + conn.execute(""" + ALTER TABLE %s_stat + ADD COLUMN metadata TEXT DEFAULT '' """ % self.db_type) + md = {} + for key, value_timestamp in new_metadata.iteritems(): + value, timestamp = value_timestamp + if key not in md or timestamp > md[key][1]: + md[key] = value_timestamp + conn.execute('UPDATE %s_stat SET metadata = ?' % self.db_type, + (json.dumps(md),)) + conn.commit() + + def reclaim(self, timestamp): + """Removes any empty metadata values older than the timestamp""" + if not self.metadata: + return + with self.get() as conn: + try: + md = conn.execute('SELECT metadata FROM %s_stat' % + self.db_type).fetchone()[0] + if md: + md = json.loads(md) + keys_to_delete = [] + for key, (value, value_timestamp) in md.iteritems(): + if value == '' and value_timestamp < timestamp: + keys_to_delete.append(key) + if keys_to_delete: + for key in keys_to_delete: + del md[key] + conn.execute('UPDATE %s_stat SET metadata = ?' % + self.db_type, (json.dumps(md),)) + conn.commit() + except sqlite3.OperationalError, err: + if 'no such column: metadata' not in str(err): + raise + class ContainerBroker(DatabaseBroker): """Encapsulates working with a container database.""" @@ -532,7 +624,7 @@ class ContainerBroker(DatabaseBroker): def create_container_stat_table(self, conn, put_timestamp=None): """ - Create the container_stat table which is specifc to the container DB. + Create the container_stat table which is specific to the container DB. :param conn: DB connection object :param put_timestamp: put timestamp @@ -555,7 +647,8 @@ class ContainerBroker(DatabaseBroker): hash TEXT default '00000000000000000000000000000000', id TEXT, status TEXT DEFAULT '', - status_changed_at TEXT DEFAULT '0' + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '' ); INSERT INTO container_stat (object_count, bytes_used) @@ -658,6 +751,8 @@ class ContainerBroker(DatabaseBroker): from incoming_sync and outgoing_sync where the updated_at timestamp is < sync_timestamp. + In addition, this calls the DatabaseBroker's :func:reclaim method. 
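+        (For example, a metadata entry deleted by setting it to
+        ('', timestamp) is removed here once object_timestamp passes that
+        timestamp.)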
+ :param object_timestamp: max created_at timestamp of object rows to delete :param sync_timestamp: max update_at timestamp of sync rows to delete @@ -680,6 +775,7 @@ class ContainerBroker(DatabaseBroker): if 'no such column: updated_at' not in str(err): raise conn.commit() + DatabaseBroker.reclaim(self, object_timestamp) def delete_object(self, name, timestamp): """ @@ -1034,7 +1130,8 @@ class AccountBroker(DatabaseBroker): hash TEXT default '00000000000000000000000000000000', id TEXT, status TEXT DEFAULT '', - status_changed_at TEXT DEFAULT '0' + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '' ); INSERT INTO account_stat (container_count) VALUES (0); @@ -1133,6 +1230,8 @@ class AccountBroker(DatabaseBroker): from incoming_sync and outgoing_sync where the updated_at timestamp is < sync_timestamp. + In addition, this calls the DatabaseBroker's :func:reclaim method. + :param object_timestamp: max created_at timestamp of container rows to delete :param sync_timestamp: max update_at timestamp of sync rows to delete @@ -1156,6 +1255,7 @@ class AccountBroker(DatabaseBroker): if 'no such column: updated_at' not in str(err): raise conn.commit() + DatabaseBroker.reclaim(self, container_timestamp) def get_container_timestamp(self, container_name): """ diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 5f416cce31..a96b507e90 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -53,7 +53,7 @@ def quarantine_db(object_file, server_type): class ReplConnection(BufferedHTTPConnection): """ - Helper to simplify POSTing to a remote server. + Helper to simplify REPLICATEing to a remote server. """ def __init__(self, node, partition, hash_, logger): @@ -63,9 +63,9 @@ class ReplConnection(BufferedHTTPConnection): BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node) self.path = '/%s/%s/%s' % (node['device'], partition, hash_) - def post(self, *args): + def replicate(self, *args): """ - Make an HTTP POST request + Make an HTTP REPLICATE request :param args: list of json-encodable objects @@ -73,7 +73,7 @@ class ReplConnection(BufferedHTTPConnection): """ try: body = simplejson.dumps(args) - self.request('POST', self.path, body, + self.request('REPLICATE', self.path, body, {'Content-Type': 'application/json'}) response = self.getresponse() response.data = response.read() @@ -158,7 +158,7 @@ class Replicator(object): return proc.returncode == 0 def _rsync_db(self, broker, device, http, local_id, - post_method='complete_rsync', post_timeout=None): + replicate_method='complete_rsync', replicate_timeout=None): """ Sync a whole db using rsync. 
@@ -166,8 +166,8 @@ class Replicator(object): :param device: device to sync to :param http: ReplConnection object :param local_id: unique ID of the local database replica - :param post_method: remote operation to perform after rsync - :param post_timeout: timeout to wait in seconds + :param replicate_method: remote operation to perform after rsync + :param replicate_timeout: timeout to wait in seconds """ if self.vm_test_mode: remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'], @@ -186,8 +186,8 @@ class Replicator(object): with broker.lock(): if not self._rsync_file(broker.db_file, remote_file, False): return False - with Timeout(post_timeout or self.node_timeout): - response = http.post(post_method, local_id) + with Timeout(replicate_timeout or self.node_timeout): + response = http.replicate(replicate_method, local_id) return response and response.status >= 200 and response.status < 300 def _usync_db(self, point, broker, http, remote_id, local_id): @@ -208,7 +208,7 @@ class Replicator(object): objects = broker.get_items_since(point, self.per_diff) while len(objects): with Timeout(self.node_timeout): - response = http.post('merge_items', objects, local_id) + response = http.replicate('merge_items', objects, local_id) if not response or response.status >= 300 or response.status < 200: if response: self.logger.error('ERROR Bad response %s from %s' % @@ -217,7 +217,7 @@ class Replicator(object): point = objects[-1]['ROWID'] objects = broker.get_items_since(point, self.per_diff) with Timeout(self.node_timeout): - response = http.post('merge_syncs', sync_table) + response = http.replicate('merge_syncs', sync_table) if response and response.status >= 200 and response.status < 300: broker.merge_syncs([{'remote_id': remote_id, 'sync_point': point}], incoming=False) @@ -266,7 +266,8 @@ class Replicator(object): :param broker: DB broker for the DB to be replication :param partition: partition on the node to replicate to :param info: DB info as a dictionary of {'max_row', 'hash', 'id', - 'created_at', 'put_timestamp', 'delete_timestamp'} + 'created_at', 'put_timestamp', 'delete_timestamp', + 'metadata'} :returns: True if successful, False otherwise """ @@ -277,9 +278,9 @@ class Replicator(object): 'ERROR Unable to connect to remote server: %s' % node) return False with Timeout(self.node_timeout): - response = http.post('sync', info['max_row'], info['hash'], + response = http.replicate('sync', info['max_row'], info['hash'], info['id'], info['created_at'], info['put_timestamp'], - info['delete_timestamp']) + info['delete_timestamp'], info['metadata']) if not response: return False elif response.status == HTTPNotFound.code: # completely missing, rsync @@ -297,8 +298,8 @@ class Replicator(object): if rinfo['max_row'] / float(info['max_row']) < 0.5: self.stats['remote_merge'] += 1 return self._rsync_db(broker, node, http, info['id'], - post_method='rsync_then_merge', - post_timeout=(info['count'] / 2000)) + replicate_method='rsync_then_merge', + replicate_timeout=(info['count'] / 2000)) # else send diffs over to the remote server return self._usync_db(max(rinfo['point'], local_sync), broker, http, rinfo['id'], info['id']) @@ -445,11 +446,11 @@ class ReplicatorRpc(object): self.broker_class = broker_class self.mount_check = mount_check - def dispatch(self, post_args, args): + def dispatch(self, replicate_args, args): if not hasattr(args, 'pop'): return HTTPBadRequest(body='Invalid object type') op = args.pop(0) - drive, partition, hsh = post_args + drive, partition, hsh = replicate_args if self.mount_check 
and \ not os.path.ismount(os.path.join(self.root, drive)): return Response(status='507 %s is not mounted' % drive) @@ -469,7 +470,7 @@ class ReplicatorRpc(object): def sync(self, broker, args): (remote_sync, hash_, id_, created_at, put_timestamp, - delete_timestamp) = args + delete_timestamp, metadata) = args try: info = broker.get_replication_info() except Exception, e: @@ -479,6 +480,8 @@ class ReplicatorRpc(object): quarantine_db(broker.db_file, broker.db_type) return HTTPNotFound() raise + if metadata: + broker.metadata = simplejson.loads(metadata) if info['put_timestamp'] != put_timestamp or \ info['created_at'] != created_at or \ info['delete_timestamp'] != delete_timestamp: diff --git a/swift/container/server.py b/swift/container/server.py index 57775ee661..0ef0d03a81 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -31,7 +31,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ from swift.common.db import ContainerBroker from swift.common.utils import get_logger, get_param, hash_path, \ - storage_directory, split_path + normalize_timestamp, storage_directory, split_path from swift.common.constraints import CONTAINER_LISTING_LIMIT, \ check_mount, check_float, check_xml_encodable from swift.common.bufferedhttp import http_connect @@ -175,23 +175,29 @@ class ContainerController(object): content_type='text/plain') if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) + timestamp = normalize_timestamp(req.headers['x-timestamp']) broker = self._get_container_broker(drive, part, account, container) if obj: # put container object if not os.path.exists(broker.db_file): return HTTPNotFound() - broker.put_object(obj, req.headers['x-timestamp'], - int(req.headers['x-size']), req.headers['x-content-type'], - req.headers['x-etag']) + broker.put_object(obj, timestamp, int(req.headers['x-size']), + req.headers['x-content-type'], req.headers['x-etag']) return HTTPCreated(request=req) else: # put container if not os.path.exists(broker.db_file): - broker.initialize(req.headers['x-timestamp']) + broker.initialize(timestamp) created = True else: created = broker.is_deleted() - broker.update_put_timestamp(req.headers['x-timestamp']) + broker.update_put_timestamp(timestamp) if broker.is_deleted(): return HTTPConflict(request=req) + metadata = {} + metadata.update((key, (value, timestamp)) + for key, value in req.headers.iteritems() + if key.lower().startswith('x-container-meta-')) + if metadata: + broker.metadata = metadata resp = self.account_update(req, account, container, broker) if resp: return resp @@ -222,6 +228,9 @@ class ContainerController(object): 'X-Timestamp': info['created_at'], 'X-PUT-Timestamp': info['put_timestamp'], } + headers.update((key, value) + for key, (value, timestamp) in broker.metadata.iteritems() + if value != '') return HTTPNoContent(request=req, headers=headers) def GET(self, req): @@ -246,6 +255,9 @@ class ContainerController(object): 'X-Timestamp': info['created_at'], 'X-PUT-Timestamp': info['put_timestamp'], } + resp_headers.update((key, value) + for key, (value, timestamp) in broker.metadata.iteritems() + if value != '') try: path = get_param(req, 'path') prefix = get_param(req, 'prefix') @@ -324,9 +336,9 @@ class ContainerController(object): ret.charset = 'utf8' return ret - def POST(self, req): + def REPLICATE(self, req): """ - Handle HTTP POST request (json-encoded RPC calls for replication.) + Handle HTTP REPLICATE request (json-encoded RPC calls for replication.) 
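+        (Client metadata updates now go through the POST method below;
+        REPLICATE is reserved for replication RPCs.)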
""" try: post_args = split_path(unquote(req.path), 3) @@ -344,6 +356,31 @@ class ContainerController(object): ret.request = req return ret + def POST(self, req): + """Handle HTTP POST request.""" + try: + drive, part, account, container = split_path(unquote(req.path), 4) + except ValueError, err: + return HTTPBadRequest(body=str(err), content_type='text/plain', + request=req) + if 'x-timestamp' not in req.headers or \ + not check_float(req.headers['x-timestamp']): + return HTTPBadRequest(body='Missing or bad timestamp', + request=req, content_type='text/plain') + if self.mount_check and not check_mount(self.root, drive): + return Response(status='507 %s is not mounted' % drive) + broker = self._get_container_broker(drive, part, account, container) + if broker.is_deleted(): + return HTTPNotFound(request=req) + timestamp = normalize_timestamp(req.headers['x-timestamp']) + metadata = {} + metadata.update((key, (value, timestamp)) + for key, value in req.headers.iteritems() + if key.lower().startswith('x-container-meta-')) + if metadata: + broker.metadata = metadata + return HTTPNoContent(request=req) + def __call__(self, env, start_response): start_time = time.time() req = Request(env) @@ -373,7 +410,7 @@ class ContainerController(object): req.headers.get('x-cf-trans-id', '-'), req.referer or '-', req.user_agent or '-', trans_time) - if req.method.upper() == 'POST': + if req.method.upper() == 'REPLICATE': self.logger.debug(log_message) else: self.logger.info(log_message) diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py index 84ef0ed897..71a6ff053d 100644 --- a/test/unit/account/test_server.py +++ b/test/unit/account/test_server.py @@ -196,6 +196,94 @@ class TestAccountController(unittest.TestCase): self.assertEquals(resp.status_int, 403) self.assertEquals(resp.body, 'Recently deleted') + def test_PUT_GET_metadata(self): + # Set metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1), + 'X-Account-Meta-Test': 'Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'Value') + # Update metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(3), + 'X-Account-Meta-Test': 'New Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value') + # Send old update to metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(2), + 'X-Account-Meta-Test': 'Old Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value') + # Remove metadata header (by setting it to empty) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(4), + 'X-Account-Meta-Test': ''}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a') + resp = 
self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assert_('x-account-meta-test' not in resp.headers) + + def test_POST_HEAD_metadata(self): + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1)}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + # Set metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(1), + 'X-Account-Meta-Test': 'Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'Value') + # Update metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(3), + 'X-Account-Meta-Test': 'New Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value') + # Send old update to metadata header + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(2), + 'X-Account-Meta-Test': 'Old Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value') + # Remove metadata header (by setting it to empty) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(4), + 'X-Account-Meta-Test': ''}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assert_('x-account-meta-test' not in resp.headers) + def test_GET_not_found_plain(self): req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'}) resp = self.controller.GET(req) diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index 1979681e27..fbc161e1c7 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -367,6 +367,161 @@ class TestDatabaseBroker(unittest.TestCase): broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}]) self.assertEquals(broker.get_sync(uuid2), 5) + def test_get_replication_info(self): + self.get_replication_info_tester(metadata=False) + + def test_get_replication_info_with_metadata(self): + self.get_replication_info_tester(metadata=True) + + def get_replication_info_tester(self, metadata=False): + broker = DatabaseBroker(':memory:', account='a') + broker.db_type = 'test' + broker.db_contains_type = 'test' + broker_creation = normalize_timestamp(1) + broker_uuid = str(uuid4()) + broker_metadata = metadata and simplejson.dumps( + {'Test': ('Value', normalize_timestamp(1))}) or '' + def _initialize(conn, put_timestamp): + if put_timestamp is None: + put_timestamp = normalize_timestamp(0) + conn.executescript(''' + CREATE TABLE test ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE, + 
created_at TEXT + ); + CREATE TRIGGER test_insert AFTER INSERT ON test + BEGIN + UPDATE test_stat + SET test_count = test_count + 1, + hash = chexor(hash, new.name, new.created_at); + END; + CREATE TRIGGER test_update BEFORE UPDATE ON test + BEGIN + SELECT RAISE(FAIL, + 'UPDATE not allowed; DELETE and INSERT'); + END; + CREATE TRIGGER test_delete AFTER DELETE ON test + BEGIN + UPDATE test_stat + SET test_count = test_count - 1, + hash = chexor(hash, old.name, old.created_at); + END; + CREATE TABLE test_stat ( + account TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + test_count INTEGER, + hash TEXT default '00000000000000000000000000000000', + id TEXT + %s + ); + INSERT INTO test_stat (test_count) VALUES (0); + ''' % (metadata and ", metadata TEXT DEFAULT ''" or "")) + conn.execute(''' + UPDATE test_stat + SET account = ?, created_at = ?, id = ?, put_timestamp = ? + ''', (broker.account, broker_creation, broker_uuid, put_timestamp)) + if metadata: + conn.execute('UPDATE test_stat SET metadata = ?', + (broker_metadata,)) + conn.commit() + broker._initialize = _initialize + put_timestamp = normalize_timestamp(2) + broker.initialize(put_timestamp) + info = broker.get_replication_info() + self.assertEquals(info, {'count': 0, + 'hash': '00000000000000000000000000000000', + 'created_at': broker_creation, 'put_timestamp': put_timestamp, + 'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid, + 'metadata': broker_metadata}) + insert_timestamp = normalize_timestamp(3) + with broker.get() as conn: + conn.execute(''' + INSERT INTO test (name, created_at) VALUES ('test', ?) + ''', (insert_timestamp,)) + conn.commit() + info = broker.get_replication_info() + self.assertEquals(info, {'count': 1, + 'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba', + 'created_at': broker_creation, 'put_timestamp': put_timestamp, + 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, + 'metadata': broker_metadata}) + with broker.get() as conn: + conn.execute('DELETE FROM test') + conn.commit() + info = broker.get_replication_info() + self.assertEquals(info, {'count': 0, + 'hash': '00000000000000000000000000000000', + 'created_at': broker_creation, 'put_timestamp': put_timestamp, + 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, + 'metadata': broker_metadata}) + return broker + + def test_metadata(self): + # Initializes a good broker for us + broker = self.get_replication_info_tester(metadata=True) + # Add our first item + first_timestamp = normalize_timestamp(1) + first_value = '1' + broker.metadata = {'First': [first_value, first_timestamp]} + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + # Add our second item + second_timestamp = normalize_timestamp(2) + second_value = '2' + broker.metadata = {'Second': [second_value, second_timestamp]} + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' in broker.metadata) + self.assertEquals(broker.metadata['Second'], + [second_value, second_timestamp]) + # Update our first item + first_timestamp = normalize_timestamp(3) + first_value = '1b' + broker.metadata = {'First': [first_value, first_timestamp]} + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' in broker.metadata) + self.assertEquals(broker.metadata['Second'], + [second_value, second_timestamp]) + # 
Delete our second item (by setting to empty string) + second_timestamp = normalize_timestamp(4) + second_value = '' + broker.metadata = {'Second': [second_value, second_timestamp]} + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' in broker.metadata) + self.assertEquals(broker.metadata['Second'], + [second_value, second_timestamp]) + # Reclaim at point before second item was deleted + broker.reclaim(normalize_timestamp(3)) + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' in broker.metadata) + self.assertEquals(broker.metadata['Second'], + [second_value, second_timestamp]) + # Reclaim at point second item was deleted + broker.reclaim(normalize_timestamp(4)) + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' in broker.metadata) + self.assertEquals(broker.metadata['Second'], + [second_value, second_timestamp]) + # Reclaim after point second item was deleted + broker.reclaim(normalize_timestamp(5)) + self.assert_('First' in broker.metadata) + self.assertEquals(broker.metadata['First'], + [first_value, first_timestamp]) + self.assert_('Second' not in broker.metadata) + class TestContainerBroker(unittest.TestCase): """ Tests for swift.common.db.ContainerBroker """ @@ -1119,6 +1274,78 @@ class TestContainerBroker(unittest.TestCase): self.assertEquals(rec['content_type'], 'text/plain') +def premetadata_create_container_stat_table(self, conn, put_timestamp=None): + """ + Copied from swift.common.db.ContainerBroker before the metadata column was + added; used for testing with TestContainerBrokerBeforeMetadata. + + Create the container_stat table which is specifc to the container DB. + + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + if put_timestamp is None: + put_timestamp = normalize_timestamp(0) + conn.executescript(""" + CREATE TABLE container_stat ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + object_count INTEGER, + bytes_used INTEGER, + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0' + ); + + INSERT INTO container_stat (object_count, bytes_used) + VALUES (0, 0); + """) + conn.execute(''' + UPDATE container_stat + SET account = ?, container = ?, created_at = ?, id = ?, + put_timestamp = ? + ''', (self.account, self.container, normalize_timestamp(time()), + str(uuid4()), put_timestamp)) + + +class TestContainerBrokerBeforeMetadata(TestContainerBroker): + """ + Tests for swift.common.db.ContainerBroker against databases created before + the metadata column was added. 
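+    (On such databases the metadata setter is expected to add the column via
+    ALTER TABLE the first time metadata is written.)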
+ """ + + def setUp(self): + self._imported_create_container_stat_table = \ + ContainerBroker.create_container_stat_table + ContainerBroker.create_container_stat_table = \ + premetadata_create_container_stat_table + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1')) + exc = None + with broker.get() as conn: + try: + conn.execute('SELECT metadata FROM container_stat') + except BaseException, err: + exc = err + self.assert_('no such column: metadata' in str(exc)) + + def tearDown(self): + ContainerBroker.create_container_stat_table = \ + self._imported_create_container_stat_table + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1')) + with broker.get() as conn: + conn.execute('SELECT metadata FROM container_stat') + + class TestAccountBroker(unittest.TestCase): """ Tests for swift.common.db.AccountBroker """ @@ -1575,5 +1802,70 @@ class TestAccountBroker(unittest.TestCase): sorted([rec['name'] for rec in items])) +def premetadata_create_account_stat_table(self, conn, put_timestamp): + """ + Copied from swift.common.db.AccountBroker before the metadata column was + added; used for testing with TestAccountBrokerBeforeMetadata. + + Create account_stat table which is specific to the account DB. + + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + conn.executescript(""" + CREATE TABLE account_stat ( + account TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + container_count INTEGER, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0' + ); + + INSERT INTO account_stat (container_count) VALUES (0); + """) + + conn.execute(''' + UPDATE account_stat SET account = ?, created_at = ?, id = ?, + put_timestamp = ? + ''', (self.account, normalize_timestamp(time()), str(uuid4()), + put_timestamp)) + + +class TestAccountBrokerBeforeMetadata(TestAccountBroker): + """ + Tests for swift.common.db.AccountBroker against databases created before + the metadata column was added. 
+ """ + + def setUp(self): + self._imported_create_account_stat_table = \ + AccountBroker.create_account_stat_table + AccountBroker.create_account_stat_table = \ + premetadata_create_account_stat_table + broker = AccountBroker(':memory:', account='a') + broker.initialize(normalize_timestamp('1')) + exc = None + with broker.get() as conn: + try: + conn.execute('SELECT metadata FROM account_stat') + except BaseException, err: + exc = err + self.assert_('no such column: metadata' in str(exc)) + + def tearDown(self): + AccountBroker.create_account_stat_table = \ + self._imported_create_account_stat_table + broker = AccountBroker(':memory:', account='a') + broker.initialize(normalize_timestamp('1')) + with broker.get() as conn: + conn.execute('SELECT metadata FROM account_stat') + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index bd247772f0..4ee7f92089 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -19,7 +19,7 @@ import os import logging from swift.common import db_replicator -from swift.common import db, utils +from swift.common.utils import normalize_timestamp from swift.container import server as container_server @@ -61,13 +61,13 @@ def _mock_process(*args): yield db_replicator.subprocess.Popen = orig_process -class PostReplHttp: +class ReplHttp: def __init__(self, response=None): self.response = response - posted = False + replicated = False host = 'localhost' - def post(self, *args): - self.posted = True + def replicate(self, *args): + self.replicated = True class Response: status = 200 data = self.response @@ -125,18 +125,18 @@ class TestDBReplicator(unittest.TestCase): conn = db_replicator.ReplConnection(node, '1234567890', 'abcdefg', logging.getLogger()) def req(method, path, body, headers): - self.assertEquals(method, 'POST') + self.assertEquals(method, 'REPLICATE') self.assertEquals(headers['Content-Type'], 'application/json') class Resp: def read(self): return 'data' resp = Resp() conn.request = req conn.getresponse = lambda *args: resp - self.assertEquals(conn.post(1, 2, 3), resp) + self.assertEquals(conn.replicate(1, 2, 3), resp) def other_req(method, path, body, headers): raise Exception('blah') conn.request = other_req - self.assertEquals(conn.post(1, 2, 3), None) + self.assertEquals(conn.replicate(1, 2, 3), None) def test_rsync_file(self): replicator = TestReplicator({}, {}) @@ -153,7 +153,7 @@ class TestDBReplicator(unittest.TestCase): replicator = TestReplicator({}, {}) replicator._rsync_file = lambda *args: True fake_device = {'ip': '127.0.0.1', 'device': 'sda1'} - replicator._rsync_db(FakeBroker(), fake_device, PostReplHttp(), 'abcd') + replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd') def test_in_sync(self): replicator = TestReplicator({}, {}) @@ -175,7 +175,7 @@ class TestDBReplicator(unittest.TestCase): replicator.replicate_once() def test_usync(self): - fake_http = PostReplHttp() + fake_http = ReplHttp() replicator = TestReplicator({}, {}) replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890') @@ -184,8 +184,9 @@ class TestDBReplicator(unittest.TestCase): fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000} fake_info = {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b', 'created_at': 100, 'put_timestamp': 0, - 'delete_timestamp': 0} - replicator._http_connect = lambda *args: PostReplHttp('{"id": 3, "point": -1}') + 'delete_timestamp': 0, + 'metadata': {'Test': ('Value', 
normalize_timestamp(1))}} + replicator._http_connect = lambda *args: ReplHttp('{"id": 3, "point": -1}') self.assertEquals(replicator._repl_to_node( fake_node, FakeBroker(), '0', fake_info), True) diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index b38c5489e4..bc0001674a 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -93,6 +93,98 @@ class TestContainerController(unittest.TestCase): resp = self.controller.PUT(req) self.assertEquals(resp.status_int, 404) + def test_PUT_GET_metadata(self): + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1), + 'X-Container-Meta-Test': 'Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/c') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-container-meta-test'), 'Value') + # Update metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(3), + 'X-Container-Meta-Test': 'New Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a/c') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-container-meta-test'), + 'New Value') + # Send old update to metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(2), + 'X-Container-Meta-Test': 'Old Value'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a/c') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-container-meta-test'), + 'New Value') + # Remove metadata header (by setting it to empty) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(4), + 'X-Container-Meta-Test': ''}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + req = Request.blank('/sda1/p/a/c') + resp = self.controller.GET(req) + self.assertEquals(resp.status_int, 204) + self.assert_('x-container-meta-test' not in resp.headers) + + def test_POST_HEAD_metadata(self): + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1)}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(1), + 'X-Container-Meta-Test': 'Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-container-meta-test'), 'Value') + # Update metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(3), + 'X-Container-Meta-Test': 'New Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + 
self.assertEquals(resp.headers.get('x-container-meta-test'), + 'New Value') + # Send old update to metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(2), + 'X-Container-Meta-Test': 'Old Value'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('x-container-meta-test'), + 'New Value') + # Remove metadata header (by setting it to empty) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(4), + 'X-Container-Meta-Test': ''}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 204) + self.assert_('x-container-meta-test' not in resp.headers) + def test_DELETE_obj_not_found(self): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},