From 4321bb0af6d7f7593ec507b20a519909055fd9f0 Mon Sep 17 00:00:00 2001
From: Clay Gerrard
Date: Tue, 27 May 2014 16:57:25 -0700
Subject: [PATCH] Add Storage Policy support to Containers

Containers now have a storage policy index associated with them, stored in
the container_stat table. This index is only settable at container creation
time (PUT request) and cannot be changed without deleting and recreating the
container. This is because a container's policy index applies to all of its
objects, so changing it would require moving large amounts of object data
around. If a user wants to change the policy for data in a container, they
must create a new container with the desired policy and move the data over.

Keep status_changed_at up-to-date with status changes, in particular during
container recreation and replication. When a container server receives a PUT
for a deleted database, an extra UPDATE is issued against the container_stat
table to record the x-timestamp of the request. During replication, if
merge_timestamps causes a container's status to change (from DELETED to
ACTIVE or vice versa), the status_changed_at field is set to the current
time. Accurate reporting of status_changed_at is useful for container
replication forensics and allows resolution of "set on create" attributes
like the upcoming storage_policy_index.

Expose backend container info on deleted containers. Include basic container
info in backend headers on 404 responses from the container server. Default
empty values are used as placeholders if the database does not exist.
Specifically, the X-Backend-Status-Changed-At, X-Backend-DELETE-Timestamp
and X-Backend-Storage-Policy-Index values will be needed by the reconciler
to deal with reconciling out-of-order object writes in the face of recently
deleted containers.

 * Add "status_changed_at" key to the response from ContainerBroker.get_info.
 * Add "Status Timestamp" field to swift.cli.info.print_db_info_metadata.
 * Add "status_changed_at" key to the response from AccountBroker.get_info.

DocImpact
Implements: blueprint storage-policies
Change-Id: Ie6d388f067f5b096b0f96faef151120ba23c8748
---
 swift/account/backend.py                      |    2 +-
 swift/common/db.py                            |   86 +-
 swift/container/auditor.py                    |    4 +-
 swift/container/backend.py                    |  637 ++++++++++++---
 swift/container/server.py                     |  152 +++-
 swift/proxy/controllers/base.py               |   11 +
 swift/proxy/controllers/container.py          |   37 +-
 test/unit/__init__.py                         |   18 +-
 test/unit/common/ring/test_ring.py            |   19 +-
 test/unit/common/test_db.py                   |  154 +++-
 test/unit/container/test_auditor.py           |   52 +-
 test/unit/container/test_backend.py           |  751 ++++++++++++++++--
 test/unit/container/test_server.py            |  511 +++++++++++-
 test/unit/container/test_updater.py           |    4 +-
 test/unit/proxy/controllers/test_container.py |    8 +-
 test/unit/proxy/test_server.py                |  141 +++-
 16 files changed, 2269 insertions(+), 318 deletions(-)

diff --git a/swift/account/backend.py b/swift/account/backend.py
index f29e64d93b..1a940c49ec 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -37,7 +37,7 @@ class AccountBroker(DatabaseBroker):
     db_contains_type = 'container'
     db_reclaim_timestamp = 'delete_timestamp'
 
-    def _initialize(self, conn, put_timestamp):
+    def _initialize(self, conn, put_timestamp, **kwargs):
         """
         Create a brand new account database (tables, indices, triggers, etc.)
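Since a container's policy index is immutable after its first PUT, the
policy migration described in the commit message is a client-driven copy. A
minimal sketch of that workflow, assuming python-swiftclient; the
move_container_policy helper name is illustrative, not part of this patch
(X-Storage-Policy is the user-facing header the proxy changes below accept):

    # Sketch: create a container with the desired policy, then copy the
    # data over; `conn` is an authenticated swiftclient.client.Connection.
    def move_container_policy(conn, old, new, policy_name):
        # the policy can only be chosen at container creation time (PUT)
        conn.put_container(new, headers={'X-Storage-Policy': policy_name})
        _headers, listing = conn.get_container(old, full_listing=True)
        for entry in listing:
            hdrs, body = conn.get_object(old, entry['name'])
            conn.put_object(new, entry['name'], body,
                            content_type=hdrs.get('content-type'))
            conn.delete_object(old, entry['name'])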
diff --git a/swift/common/db.py b/swift/common/db.py
index e3346bea3e..e6812d86cd 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -215,11 +215,15 @@ class DatabaseBroker(object):
         """
         return self.db_file
 
-    def initialize(self, put_timestamp=None):
+    def initialize(self, put_timestamp=None, storage_policy_index=None):
         """
         Create the DB
 
+        The storage_policy_index is passed through to the subclass's
+        ``_initialize`` method.  It is ignored by ``AccountBroker``.
+
         :param put_timestamp: timestamp of initial PUT request
+        :param storage_policy_index: only required for containers
         """
         if self.db_file == ':memory:':
             tmp_db_file = None
@@ -277,7 +281,8 @@ class DatabaseBroker(object):
             """)
             if not put_timestamp:
                 put_timestamp = normalize_timestamp(0)
-            self._initialize(conn, put_timestamp)
+            self._initialize(conn, put_timestamp,
+                             storage_policy_index=storage_policy_index)
             conn.commit()
             if tmp_db_file:
                 conn.close()
@@ -428,6 +433,7 @@ class DatabaseBroker(object):
         :param put_timestamp: put timestamp
         :param delete_timestamp: delete timestamp
         """
+        current_status = self.is_deleted()
         with self.get() as conn:
             conn.execute('''
                 UPDATE %s_stat SET created_at=MIN(?, created_at),
@@ -435,6 +441,9 @@ class DatabaseBroker(object):
                     delete_timestamp=MAX(?, delete_timestamp)
             ''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
             conn.commit()
+        if self.is_deleted() != current_status:
+            timestamp = normalize_timestamp(time.time())
+            self.update_status_changed_at(timestamp)
 
     def get_items_since(self, start, count):
         """
@@ -486,32 +495,36 @@ class DatabaseBroker(object):
                 result.append({'remote_id': row[0], 'sync_point': row[1]})
             return result
 
+    def get_max_row(self):
+        query = '''
+            SELECT SQLITE_SEQUENCE.seq
+            FROM SQLITE_SEQUENCE
+            WHERE SQLITE_SEQUENCE.name == '%s'
+            LIMIT 1
+        ''' % (self.db_contains_type)
+        with self.get() as conn:
+            row = conn.execute(query).fetchone()
+        return row[0] if row else -1
+
     def get_replication_info(self):
         """
         Get information about the DB required for replication.
 
-        :returns: dict containing keys: hash, id, created_at, put_timestamp,
-            delete_timestamp, count, max_row, and metadata
+        :returns: dict containing keys from get_info plus max_row and metadata
+
+        Note: get_info's <db_contains_type>_count is translated to just
+        "count" and metadata is the raw string.
         """
+        info = self.get_info()
+        info['count'] = info.pop('%s_count' % self.db_contains_type)
+        info['metadata'] = self.get_raw_metadata()
+        info['max_row'] = self.get_max_row()
+        return info
+
+    def get_info(self):
         self._commit_puts_stale_ok()
-        query_part1 = '''
-            SELECT hash, id, created_at, put_timestamp, delete_timestamp,
-                %s_count AS count,
-                CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL
-                THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row, ''' % \
-            self.db_contains_type
-        query_part2 = '''
-            FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE
-            ON SQLITE_SEQUENCE.name == '%s') LIMIT 1
-        ''' % (self.db_type, self.db_contains_type)
         with self.get() as conn:
-            try:
-                curs = conn.execute(query_part1 + 'metadata' + query_part2)
-            except sqlite3.OperationalError as err:
-                if 'no such column: metadata' not in str(err):
-                    raise
-                curs = conn.execute(query_part1 + "'' as metadata" +
-                                    query_part2)
+            curs = conn.execute('SELECT * from %s_stat' % self.db_type)
             curs.row_factory = dict_factory
             return curs.fetchone()
 
@@ -621,13 +634,7 @@ class DatabaseBroker(object):
             with open(self.db_file, 'rb+') as fp:
                 fallocate(fp.fileno(), int(prealloc_size))
 
-    @property
-    def metadata(self):
-        """
-        Returns the metadata dict for the database.
The metadata dict values - are tuples of (value, timestamp) where the timestamp indicates when - that key was set to that value. - """ + def get_raw_metadata(self): with self.get() as conn: try: metadata = conn.execute('SELECT metadata FROM %s_stat' % @@ -636,6 +643,16 @@ class DatabaseBroker(object): if 'no such column: metadata' not in str(err): raise metadata = '' + return metadata + + @property + def metadata(self): + """ + Returns the metadata dict for the database. The metadata dict values + are tuples of (value, timestamp) where the timestamp indicates when + that key was set to that value. + """ + metadata = self.get_raw_metadata() if metadata: metadata = json.loads(metadata) utf8encodekeys(metadata) @@ -758,3 +775,16 @@ class DatabaseBroker(object): ' WHERE put_timestamp < ?' % self.db_type, (timestamp, timestamp)) conn.commit() + + def update_status_changed_at(self, timestamp): + """ + Update the status_changed_at field in the stat table. Only + modifies status_changed_at if the timestamp is greater than the + current status_changed_at timestamp. + """ + with self.get() as conn: + conn.execute( + 'UPDATE %s_stat SET status_changed_at = ?' + ' WHERE status_changed_at < ?' % self.db_type, + (timestamp, timestamp)) + conn.commit() diff --git a/swift/container/auditor.py b/swift/container/auditor.py index 104cc4889f..c7d3aec893 100644 --- a/swift/container/auditor.py +++ b/swift/container/auditor.py @@ -30,9 +30,9 @@ from swift.common.daemon import Daemon class ContainerAuditor(Daemon): """Audit containers.""" - def __init__(self, conf): + def __init__(self, conf, logger=None): self.conf = conf - self.logger = get_logger(conf, log_route='container-auditor') + self.logger = logger or get_logger(conf, log_route='container-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.interval = int(conf.get('interval', 1800)) diff --git a/swift/container/backend.py b/swift/container/backend.py index 38cdaac89e..584cb4ef4f 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -30,6 +30,109 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \ DATADIR = 'containers' +POLICY_STAT_TABLE_CREATE = ''' + CREATE TABLE policy_stat ( + storage_policy_index INTEGER PRIMARY KEY, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0 + ); +''' + +POLICY_STAT_TRIGGER_SCRIPT = ''' + CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object + BEGIN + UPDATE policy_stat + SET object_count = object_count + (1 - new.deleted), + bytes_used = bytes_used + new.size + WHERE storage_policy_index = new.storage_policy_index; + INSERT INTO policy_stat ( + storage_policy_index, object_count, bytes_used) + SELECT new.storage_policy_index, + (1 - new.deleted), + new.size + WHERE NOT EXISTS( + SELECT changes() as change + FROM policy_stat + WHERE change <> 0 + ); + UPDATE container_info + SET hash = chexor(hash, new.name, new.created_at); + END; + + CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object + BEGIN + UPDATE policy_stat + SET object_count = object_count - (1 - old.deleted), + bytes_used = bytes_used - old.size + WHERE storage_policy_index = old.storage_policy_index; + UPDATE container_info + SET hash = chexor(hash, old.name, old.created_at); + END; +''' + +CONTAINER_INFO_TABLE_SCRIPT = ''' + CREATE TABLE container_info ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + reported_put_timestamp 
TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '', + x_container_sync_point1 INTEGER DEFAULT -1, + x_container_sync_point2 INTEGER DEFAULT -1, + storage_policy_index INTEGER DEFAULT 0, + reconciler_sync_point INTEGER DEFAULT -1 + ); +''' + +CONTAINER_STAT_VIEW_SCRIPT = ''' + CREATE VIEW container_stat + AS SELECT ci.account, ci.container, ci.created_at, + ci.put_timestamp, ci.delete_timestamp, + ci.reported_put_timestamp, ci.reported_delete_timestamp, + ci.reported_object_count, ci.reported_bytes_used, ci.hash, + ci.id, ci.status, ci.status_changed_at, ci.metadata, + ci.x_container_sync_point1, ci.x_container_sync_point2, + ci.reconciler_sync_point, + ci.storage_policy_index, + coalesce(ps.object_count, 0) AS object_count, + coalesce(ps.bytes_used, 0) AS bytes_used + FROM container_info ci LEFT JOIN policy_stat ps + ON ci.storage_policy_index = ps.storage_policy_index; + + CREATE TRIGGER container_stat_update + INSTEAD OF UPDATE ON container_stat + BEGIN + UPDATE container_info + SET account = NEW.account, + container = NEW.container, + created_at = NEW.created_at, + put_timestamp = NEW.put_timestamp, + delete_timestamp = NEW.delete_timestamp, + reported_put_timestamp = NEW.reported_put_timestamp, + reported_delete_timestamp = NEW.reported_delete_timestamp, + reported_object_count = NEW.reported_object_count, + reported_bytes_used = NEW.reported_bytes_used, + hash = NEW.hash, + id = NEW.id, + status = NEW.status, + status_changed_at = NEW.status_changed_at, + metadata = NEW.metadata, + x_container_sync_point1 = NEW.x_container_sync_point1, + x_container_sync_point2 = NEW.x_container_sync_point2, + storage_policy_index = NEW.storage_policy_index, + reconciler_sync_point = NEW.reconciler_sync_point; + END; +''' + class ContainerBroker(DatabaseBroker): """Encapsulates working with a container database.""" @@ -37,7 +140,14 @@ class ContainerBroker(DatabaseBroker): db_contains_type = 'object' db_reclaim_timestamp = 'created_at' - def _initialize(self, conn, put_timestamp): + @property + def storage_policy_index(self): + if not hasattr(self, '_storage_policy_index'): + self._storage_policy_index = \ + self.get_info()['storage_policy_index'] + return self._storage_policy_index + + def _initialize(self, conn, put_timestamp, storage_policy_index): """ Create a brand new container database (tables, indices, triggers, etc.) 
""" @@ -48,7 +158,9 @@ class ContainerBroker(DatabaseBroker): raise ValueError( 'Attempting to create a new database with no container set') self.create_object_table(conn) - self.create_container_stat_table(conn, put_timestamp) + self.create_policy_stat_table(conn, storage_policy_index) + self.create_container_info_table(conn, put_timestamp, + storage_policy_index) def create_object_table(self, conn): """ @@ -65,74 +177,70 @@ class ContainerBroker(DatabaseBroker): size INTEGER, content_type TEXT, etag TEXT, - deleted INTEGER DEFAULT 0 + deleted INTEGER DEFAULT 0, + storage_policy_index INTEGER DEFAULT 0 ); CREATE INDEX ix_object_deleted_name ON object (deleted, name); - CREATE TRIGGER object_insert AFTER INSERT ON object - BEGIN - UPDATE container_stat - SET object_count = object_count + (1 - new.deleted), - bytes_used = bytes_used + new.size, - hash = chexor(hash, new.name, new.created_at); - END; - CREATE TRIGGER object_update BEFORE UPDATE ON object BEGIN SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); END; - CREATE TRIGGER object_delete AFTER DELETE ON object - BEGIN - UPDATE container_stat - SET object_count = object_count - (1 - old.deleted), - bytes_used = bytes_used - old.size, - hash = chexor(hash, old.name, old.created_at); - END; - """) + """ + POLICY_STAT_TRIGGER_SCRIPT) - def create_container_stat_table(self, conn, put_timestamp=None): + def create_container_info_table(self, conn, put_timestamp, + storage_policy_index): """ - Create the container_stat table which is specific to the container DB. + Create the container_info table which is specific to the container DB. Not a part of Pluggable Back-ends, internal to the baseline code. + Also creates the container_stat view. :param conn: DB connection object :param put_timestamp: put timestamp + :param storage_policy_index: storage policy index """ if put_timestamp is None: put_timestamp = normalize_timestamp(0) - conn.executescript(""" - CREATE TABLE container_stat ( - account TEXT, - container TEXT, - created_at TEXT, - put_timestamp TEXT DEFAULT '0', - delete_timestamp TEXT DEFAULT '0', - object_count INTEGER, - bytes_used INTEGER, - reported_put_timestamp TEXT DEFAULT '0', - reported_delete_timestamp TEXT DEFAULT '0', - reported_object_count INTEGER DEFAULT 0, - reported_bytes_used INTEGER DEFAULT 0, - hash TEXT default '00000000000000000000000000000000', - id TEXT, - status TEXT DEFAULT '', - status_changed_at TEXT DEFAULT '0', - metadata TEXT DEFAULT '', - x_container_sync_point1 INTEGER DEFAULT -1, - x_container_sync_point2 INTEGER DEFAULT -1 - ); + # The container_stat view is for compatibility; old versions of Swift + # expected a container_stat table with columns "object_count" and + # "bytes_used", but when that stuff became per-storage-policy and + # moved to the policy_stat table, we stopped creating those columns in + # container_stat. + # + # To retain compatibility, we create the container_stat view with some + # triggers to make it behave like the old container_stat table. This + # way, if an old version of Swift encounters a database with the new + # schema, it can still work. + # + # Note that this can occur during a rolling Swift upgrade if a DB gets + # rsynced from an old node to a new, so it's necessary for + # availability during upgrades. The fact that it enables downgrades is + # a nice bonus. 
+ conn.executescript(CONTAINER_INFO_TABLE_SCRIPT + + CONTAINER_STAT_VIEW_SCRIPT) + conn.execute(""" + INSERT INTO container_info (account, container, created_at, id, + put_timestamp, status_changed_at, storage_policy_index) + VALUES (?, ?, ?, ?, ?, ?, ?); + """, (self.account, self.container, normalize_timestamp(time.time()), + str(uuid4()), put_timestamp, put_timestamp, + storage_policy_index)) - INSERT INTO container_stat (object_count, bytes_used) - VALUES (0, 0); - """) - conn.execute(''' - UPDATE container_stat - SET account = ?, container = ?, created_at = ?, id = ?, - put_timestamp = ? - ''', (self.account, self.container, normalize_timestamp(time.time()), - str(uuid4()), put_timestamp)) + def create_policy_stat_table(self, conn, storage_policy_index=0): + """ + Create policy_stat table. + + :param conn: DB connection object + :param storage_policy_index: the policy_index the container is + being created with + """ + conn.executescript(POLICY_STAT_TABLE_CREATE) + conn.execute(""" + INSERT INTO policy_stat (storage_policy_index) + VALUES (?) + """, (storage_policy_index,)) def get_db_version(self, conn): if self._db_version == -1: @@ -165,14 +273,19 @@ class ContainerBroker(DatabaseBroker): def _commit_puts_load(self, item_list, entry): """See :func:`swift.common.db.DatabaseBroker._commit_puts_load`""" - (name, timestamp, size, content_type, etag, deleted) = \ - pickle.loads(entry.decode('base64')) + data = pickle.loads(entry.decode('base64')) + (name, timestamp, size, content_type, etag, deleted) = data[:6] + if len(data) > 6: + storage_policy_index = data[6] + else: + storage_policy_index = 0 item_list.append({'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, - 'deleted': deleted}) + 'deleted': deleted, + 'storage_policy_index': storage_policy_index}) def empty(self): """ @@ -182,20 +295,30 @@ class ContainerBroker(DatabaseBroker): """ self._commit_puts_stale_ok() with self.get() as conn: - row = conn.execute( - 'SELECT object_count from container_stat').fetchone() + try: + row = conn.execute( + 'SELECT max(object_count) from policy_stat').fetchone() + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + row = conn.execute( + 'SELECT object_count from container_stat').fetchone() return (row[0] == 0) - def delete_object(self, name, timestamp): + def delete_object(self, name, timestamp, storage_policy_index=0): """ Mark an object deleted. :param name: object name to be deleted :param timestamp: timestamp when the object was marked as deleted """ - self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1) + self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', + deleted=1, storage_policy_index=storage_policy_index) - def put_object(self, name, timestamp, size, content_type, etag, deleted=0): + def put_object(self, name, timestamp, size, content_type, etag, deleted=0, + storage_policy_index=0): """ Creates an object in the DB with its metadata. 
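The pending-file record format (loaded in _commit_puts_load above, written
in put_object below) simply appends the policy index as a seventh element of
the pickled tuple, so entries written before this patch still load. A
self-contained sketch of that round trip (stdlib only, Python 2 idioms to
match the codebase; the load_pending_entry name is illustrative):

    import pickle

    def load_pending_entry(entry):
        # entries are base64-encoded pickled tuples; pre-policy records
        # have six fields, newer ones also carry storage_policy_index
        data = pickle.loads(entry.decode('base64'))
        name, timestamp, size, content_type, etag, deleted = data[:6]
        policy_index = data[6] if len(data) > 6 else 0
        return {'name': name, 'created_at': timestamp, 'size': size,
                'content_type': content_type, 'etag': etag,
                'deleted': deleted, 'storage_policy_index': policy_index}

    old = pickle.dumps(('o', '1', 0, 'text/x', 'etag', 0)).encode('base64')
    new = pickle.dumps(('o', '1', 0, 'text/x', 'etag', 0, 2)).encode('base64')
    assert load_pending_entry(old)['storage_policy_index'] == 0
    assert load_pending_entry(new)['storage_policy_index'] == 2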
@@ -206,10 +329,12 @@ class ContainerBroker(DatabaseBroker):
         :param etag: object etag
         :param deleted: if True, marks the object as deleted and sets the
                         deleted_at timestamp to timestamp
+        :param storage_policy_index: the storage policy index for the object
         """
         record = {'name': name, 'created_at': timestamp, 'size': size,
                   'content_type': content_type, 'etag': etag,
-                  'deleted': deleted}
+                  'deleted': deleted,
+                  'storage_policy_index': storage_policy_index}
         if self.db_file == ':memory:':
             self.merge_items([record])
             return
@@ -231,93 +356,101 @@ class ContainerBroker(DatabaseBroker):
                 # delimiter
                 fp.write(':')
                 fp.write(pickle.dumps(
-                    (name, timestamp, size, content_type, etag, deleted),
+                    (name, timestamp, size, content_type, etag, deleted,
+                     storage_policy_index),
                     protocol=PICKLE_PROTOCOL).encode('base64'))
                 fp.flush()
 
-    def is_deleted(self, timestamp=None):
+    def is_deleted(self, **kwargs):
         """
         Check if the DB is considered to be deleted.
 
         :returns: True if the DB is considered to be deleted, False otherwise
         """
+        _info, is_deleted = self.get_info_is_deleted(**kwargs)
+        return is_deleted
+
+    def get_info_is_deleted(self, timestamp=None):
+        """
+        Get the is_deleted status and info for the container.
+
+        :returns: a tuple, in the form (info, is_deleted); info is a dict as
+            returned by get_info and is_deleted is a boolean.
+        """
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
-            return True
-        self._commit_puts_stale_ok()
-        with self.get() as conn:
-            row = conn.execute('''
-                SELECT put_timestamp, delete_timestamp, object_count
-                FROM container_stat''').fetchone()
-            # leave this db as a tombstone for a consistency window
-            if timestamp and row['delete_timestamp'] > timestamp:
-                return False
-            # The container is considered deleted if the delete_timestamp
-            # value is greater than the put_timestamp, and there are no
-            # objects in the container.
-            return (row['object_count'] in (None, '', 0, '0')) and \
-                (float(row['delete_timestamp']) > float(row['put_timestamp']))
+            return {}, True
+        info = self.get_info()
+        # leave this db as a tombstone for a consistency window
+        if timestamp and info['delete_timestamp'] > timestamp:
+            return info, False
+        # The container is considered deleted if the delete_timestamp
+        # value is greater than the put_timestamp, and there are no
+        # objects in the container.
+        is_deleted = (info['object_count'] in (None, '', 0, '0')) and \
+            (float(info['delete_timestamp']) > float(info['put_timestamp']))
+        return info, is_deleted
 
     def get_info(self):
         """
         Get global data for the container.
 
         :returns: dict with keys: account, container, created_at,
-            put_timestamp, delete_timestamp, object_count, bytes_used,
-            reported_put_timestamp, reported_delete_timestamp,
-            reported_object_count, reported_bytes_used, hash, id,
-            x_container_sync_point1, and x_container_sync_point2.
+            put_timestamp, delete_timestamp, status_changed_at,
+            object_count, bytes_used, reported_put_timestamp,
+            reported_delete_timestamp, reported_object_count,
+            reported_bytes_used, hash, id, x_container_sync_point1,
+            x_container_sync_point2, and storage_policy_index.
""" self._commit_puts_stale_ok() with self.get() as conn: data = None - trailing = 'x_container_sync_point1, x_container_sync_point2' + trailing_sync = 'x_container_sync_point1, x_container_sync_point2' + trailing_pol = 'storage_policy_index' + errors = set() while not data: try: - data = conn.execute(''' + data = conn.execute((''' SELECT account, container, created_at, put_timestamp, - delete_timestamp, object_count, bytes_used, + delete_timestamp, status_changed_at, + object_count, bytes_used, reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, - id, %s - FROM container_stat - ''' % (trailing,)).fetchone() + id, %s, %s + FROM container_stat + ''') % (trailing_sync, trailing_pol)).fetchone() except sqlite3.OperationalError as err: - if 'no such column: x_container_sync_point' in str(err): - trailing = '-1 AS x_container_sync_point1, ' \ - '-1 AS x_container_sync_point2' + err_msg = str(err) + if err_msg in errors: + # only attempt migration once + raise + errors.add(err_msg) + if 'no such column: storage_policy_index' in err_msg: + trailing_pol = '0 AS storage_policy_index' + elif 'no such column: x_container_sync_point' in err_msg: + trailing_sync = '-1 AS x_container_sync_point1, ' \ + '-1 AS x_container_sync_point2' else: raise data = dict(data) + # populate instance cache + self._storage_policy_index = data['storage_policy_index'] + self.account = data['account'] + self.container = data['container'] return data def set_x_container_sync_points(self, sync_point1, sync_point2): with self.get() as conn: - orig_isolation_level = conn.isolation_level try: - # We turn off auto-transactions to ensure the alter table - # commands are part of the transaction. - conn.isolation_level = None - conn.execute('BEGIN') - try: - self._set_x_container_sync_points(conn, sync_point1, - sync_point2) - except sqlite3.OperationalError as err: - if 'no such column: x_container_sync_point' not in \ - str(err): - raise - conn.execute(''' - ALTER TABLE container_stat - ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1 - ''') - conn.execute(''' - ALTER TABLE container_stat - ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1 - ''') - self._set_x_container_sync_points(conn, sync_point1, - sync_point2) - conn.execute('COMMIT') - finally: - conn.isolation_level = orig_isolation_level + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + except sqlite3.OperationalError as err: + if 'no such column: x_container_sync_point' not in \ + str(err): + raise + self._migrate_add_container_sync_points(conn) + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + conn.commit() def _set_x_container_sync_points(self, conn, sync_point1, sync_point2): if sync_point1 is not None and sync_point2 is not None: @@ -337,6 +470,79 @@ class ContainerBroker(DatabaseBroker): SET x_container_sync_point2 = ? 
''', (sync_point2,)) + def get_policy_stats(self): + with self.get() as conn: + try: + info = conn.execute(''' + SELECT storage_policy_index, object_count, bytes_used + FROM policy_stat + ''').fetchall() + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + info = conn.execute(''' + SELECT 0 as storage_policy_index, object_count, bytes_used + FROM container_stat + ''').fetchall() + policy_stats = {} + for row in info: + stats = dict(row) + key = stats.pop('storage_policy_index') + policy_stats[key] = stats + return policy_stats + + def has_multiple_policies(self): + with self.get() as conn: + try: + curs = conn.execute(''' + SELECT count(storage_policy_index) + FROM policy_stat + ''').fetchone() + except sqlite3.OperationalError as err: + if 'no such table: policy_stat' not in str(err): + raise + # no policy_stat row + return False + if curs and curs[0] > 1: + return True + # only one policy_stat row + return False + + def set_storage_policy_index(self, policy_index, timestamp=None): + """ + Update the container_stat policy_index and status_changed_at. + """ + if timestamp is None: + timestamp = normalize_timestamp(time.time()) + + def _setit(conn): + conn.execute(''' + INSERT OR IGNORE INTO policy_stat (storage_policy_index) + VALUES (?) + ''', (policy_index,)) + conn.execute(''' + UPDATE container_stat + SET storage_policy_index = ?, + status_changed_at = MAX(?, status_changed_at) + WHERE storage_policy_index <> ? + ''', (policy_index, timestamp, policy_index)) + conn.commit() + + with self.get() as conn: + try: + _setit(conn) + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + self._migrate_add_storage_policy(conn) + _setit(conn) + + self._storage_policy_index = policy_index + def reported(self, put_timestamp, delete_timestamp, object_count, bytes_used): """ @@ -356,7 +562,7 @@ class ContainerBroker(DatabaseBroker): conn.commit() def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, - path=None): + path=None, storage_policy_index=0): """ Get a list of objects sorted by name starting at marker onward, up to limit entries. Entries will begin with the prefix and will not @@ -409,9 +615,27 @@ class ContainerBroker(DatabaseBroker): query += ' +deleted = 0' else: query += ' deleted = 0' - query += ' ORDER BY name LIMIT ?' - query_args.append(limit - len(results)) - curs = conn.execute(query, query_args) + orig_tail_query = ''' + ORDER BY name LIMIT ? + ''' + orig_tail_args = [limit - len(results)] + # storage policy filter + policy_tail_query = ''' + AND storage_policy_index = ? 
+ ''' + orig_tail_query + policy_tail_args = [storage_policy_index] + orig_tail_args + tail_query, tail_args = \ + policy_tail_query, policy_tail_args + try: + curs = conn.execute(query + tail_query, + tuple(query_args + tail_args)) + except sqlite3.OperationalError as err: + if 'no such column: storage_policy_index' not in str(err): + raise + tail_query, tail_args = \ + orig_tail_query, orig_tail_args + curs = conn.execute(query + tail_query, + tuple(query_args + tail_args)) curs.row_factory = None if prefix is None: @@ -466,26 +690,34 @@ class ContainerBroker(DatabaseBroker): 'size', 'content_type', 'etag', 'deleted'} :param source: if defined, update incoming_sync with the source """ - with self.get() as conn: + def _really_merge_items(conn): max_rowid = -1 for rec in item_list: + rec.setdefault('storage_policy_index', 0) # legacy query = ''' DELETE FROM object WHERE name = ? AND (created_at < ?) + AND storage_policy_index = ? ''' if self.get_db_version(conn) >= 1: query += ' AND deleted IN (0, 1)' - conn.execute(query, (rec['name'], rec['created_at'])) - query = 'SELECT 1 FROM object WHERE name = ?' + conn.execute(query, (rec['name'], rec['created_at'], + rec['storage_policy_index'])) + query = ''' + SELECT 1 FROM object WHERE name = ? + AND storage_policy_index = ? + ''' if self.get_db_version(conn) >= 1: query += ' AND deleted IN (0, 1)' - if not conn.execute(query, (rec['name'],)).fetchall(): + if not conn.execute(query, ( + rec['name'], rec['storage_policy_index'])).fetchall(): conn.execute(''' INSERT INTO object (name, created_at, size, - content_type, etag, deleted) - VALUES (?, ?, ?, ?, ?, ?) + content_type, etag, deleted, storage_policy_index) + VALUES (?, ?, ?, ?, ?, ?, ?) ''', ([rec['name'], rec['created_at'], rec['size'], - rec['content_type'], rec['etag'], rec['deleted']])) + rec['content_type'], rec['etag'], rec['deleted'], + rec['storage_policy_index']])) if source: max_rowid = max(max_rowid, rec['ROWID']) if source: @@ -500,3 +732,158 @@ class ContainerBroker(DatabaseBroker): WHERE remote_id=? ''', (max_rowid, source)) conn.commit() + + with self.get() as conn: + try: + return _really_merge_items(conn) + except sqlite3.OperationalError as err: + if 'no such column: storage_policy_index' not in str(err): + raise + self._migrate_add_storage_policy(conn) + return _really_merge_items(conn) + + def get_reconciler_sync(self): + with self.get() as conn: + try: + return conn.execute(''' + SELECT reconciler_sync_point FROM container_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + if "no such column: reconciler_sync_point" not in str(err): + raise + return -1 + + def update_reconciler_sync(self, point): + query = ''' + UPDATE container_stat + SET reconciler_sync_point = ? + ''' + with self.get() as conn: + try: + conn.execute(query, (point,)) + except sqlite3.OperationalError as err: + if "no such column: reconciler_sync_point" not in str(err): + raise + self._migrate_add_storage_policy(conn) + conn.execute(query, (point,)) + conn.commit() + + def get_misplaced_since(self, start, count): + """ + Get a list of objects which are in a storage policy different + from the container's storage policy. + + :param start: last reconciler sync point + :param count: maximum number of entries to get + + :returns: list of dicts with keys: name, created_at, size, + content_type, etag, storage_policy_index + """ + qry = ''' + SELECT ROWID, name, created_at, size, content_type, etag, + deleted, storage_policy_index + FROM object + WHERE ROWID > ? 
+ AND storage_policy_index != ( + SELECT storage_policy_index FROM container_stat LIMIT 1) + ORDER BY ROWID ASC LIMIT ? + ''' + self._commit_puts_stale_ok() + with self.get() as conn: + try: + cur = conn.execute(qry, (start, count)) + except sqlite3.OperationalError as err: + if "no such column: storage_policy_index" not in str(err): + raise + return [] + return list(dict(row) for row in cur.fetchall()) + + def _migrate_add_container_sync_points(self, conn): + """ + Add the x_container_sync_point columns to the 'container_stat' table. + """ + conn.executescript(''' + BEGIN; + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1; + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1; + COMMIT; + ''') + + def _migrate_add_storage_policy(self, conn): + """ + Migrate the container schema to support tracking objects from + multiple storage policies. If the container_stat table has any + pending migrations, they are applied now before copying into + container_info. + + * create the 'policy_stat' table. + * copy the current 'object_count' and 'bytes_used' columns to a + row in the 'policy_stat' table. + * add the storage_policy_index column to the 'object' table. + * drop the 'object_insert' and 'object_delete' triggers. + * add the 'object_insert_policy_stat' and + 'object_delete_policy_stat' triggers. + * create container_info table for non-policy container info + * insert values from container_stat into container_info + * drop container_stat table + * create container_stat view + """ + + # I tried just getting the list of column names in the current + # container_stat table with a pragma table_info, but could never get + # it inside the same transaction as the DDL (non-DML) statements: + # https://docs.python.org/2/library/sqlite3.html + # #controlling-transactions + # So we just apply all pending migrations to container_stat and copy a + # static known list of column names into container_info. 
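        # For context, a sketch of how a reconciler-style consumer is
        # expected to drive the methods added above (get_reconciler_sync,
        # get_misplaced_since, update_reconciler_sync); db_path and the
        # enqueue() helper are hypothetical, not part of this patch:
        #
        #     broker = ContainerBroker(db_path, account='a', container='c')
        #     start = broker.get_reconciler_sync()
        #     rows = broker.get_misplaced_since(start, 100)
        #     for row in rows:
        #         enqueue(row)  # hand off to the (upcoming) reconciler
        #     if rows:
        #         broker.update_reconciler_sync(rows[-1]['ROWID'])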
+ try: + self._migrate_add_container_sync_points(conn) + except sqlite3.OperationalError as e: + if 'duplicate column' in str(e): + conn.execute('ROLLBACK;') + else: + raise + + try: + conn.executescript(""" + ALTER TABLE container_stat + ADD COLUMN metadata TEXT DEFAULT ''; + """) + except sqlite3.OperationalError as e: + if 'duplicate column' not in str(e): + raise + + column_names = ', '.join(( + 'account', 'container', 'created_at', 'put_timestamp', + 'delete_timestamp', 'reported_put_timestamp', + 'reported_object_count', 'reported_bytes_used', 'hash', 'id', + 'status', 'status_changed_at', 'metadata', + 'x_container_sync_point1', 'x_container_sync_point2')) + + conn.executescript( + 'BEGIN;' + + POLICY_STAT_TABLE_CREATE + + ''' + INSERT INTO policy_stat ( + storage_policy_index, object_count, bytes_used) + SELECT 0, object_count, bytes_used + FROM container_stat; + + ALTER TABLE object + ADD COLUMN storage_policy_index INTEGER DEFAULT 0; + + DROP TRIGGER object_insert; + DROP TRIGGER object_delete; + ''' + + POLICY_STAT_TRIGGER_SCRIPT + + CONTAINER_INFO_TABLE_SCRIPT + + ''' + INSERT INTO container_info (%s) + SELECT %s FROM container_stat; + + DROP TABLE IF EXISTS container_stat; + ''' % (column_names, column_names) + + CONTAINER_STAT_VIEW_SCRIPT + + 'COMMIT;') diff --git a/swift/container/server.py b/swift/container/server.py index 94763a1631..2a4eb375ad 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -38,12 +38,40 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.db_replicator import ReplicatorRpc from swift.common.http import HTTP_NOT_FOUND, is_success +from swift.common.storage_policy import POLICIES, POLICY_INDEX from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \ HTTPInsufficientStorage, HTTPException, HeaderKeyDict +def gen_resp_headers(info, is_deleted=False): + """ + Convert container info dict to headers. + """ + # backend headers are always included + headers = { + 'X-Backend-Timestamp': normalize_timestamp(info.get('created_at', 0)), + 'X-Backend-PUT-Timestamp': normalize_timestamp( + info.get('put_timestamp', 0)), + 'X-Backend-DELETE-Timestamp': normalize_timestamp( + info.get('delete_timestamp', 0)), + 'X-Backend-Status-Changed-At': normalize_timestamp( + info.get('status_changed_at', 0)), + POLICY_INDEX: info.get('storage_policy_index', 0), + } + if not is_deleted: + # base container info on deleted containers is not exposed to client + headers.update({ + 'X-Container-Object-Count': info.get('object_count', 0), + 'X-Container-Bytes-Used': info.get('bytes_used', 0), + 'X-Timestamp': normalize_timestamp(info.get('created_at', 0)), + 'X-PUT-Timestamp': normalize_timestamp( + info.get('put_timestamp', 0)), + }) + return headers + + class ContainerController(object): """WSGI Controller for the container server.""" @@ -102,6 +130,32 @@ class ContainerController(object): kwargs.setdefault('logger', self.logger) return ContainerBroker(db_path, **kwargs) + def get_and_validate_policy_index(self, req): + """ + Validate that the index supplied maps to a policy. 
+ + :returns: policy index from request, or None if not present + :raises: HTTPBadRequest if the supplied index is bogus + """ + + policy_index = req.headers.get(POLICY_INDEX, None) + if policy_index is None: + return None + + try: + policy_index = int(policy_index) + except ValueError: + raise HTTPBadRequest( + request=req, content_type="text/plain", + body=("Invalid X-Storage-Policy-Index %r" % policy_index)) + + policy = POLICIES.get_by_index(policy_index) + if policy is None: + raise HTTPBadRequest( + request=req, content_type="text/plain", + body=("Invalid X-Storage-Policy-Index %r" % policy_index)) + return int(policy) + def account_update(self, req, account, container, broker): """ Update the account server(s) with latest container info. @@ -199,9 +253,13 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container) if account.startswith(self.auto_create_account_prefix) and obj and \ not os.path.exists(broker.db_file): + requested_policy_index = (self.get_and_validate_policy_index(req) + or POLICIES.default.idx) try: - broker.initialize(normalize_timestamp( - req.headers.get('x-timestamp') or time.time())) + broker.initialize( + normalize_timestamp( + req.headers.get('x-timestamp') or time.time()), + requested_policy_index) except DatabaseAlreadyExists: pass if not os.path.exists(broker.db_file): @@ -225,19 +283,42 @@ class ContainerController(object): return HTTPNoContent(request=req) return HTTPNotFound() - def _update_or_create(self, req, broker, timestamp): + def _update_or_create(self, req, broker, timestamp, new_container_policy, + requested_policy_index): + """ + Create new database broker or update timestamps for existing database. + + :param req: the swob request object + :param broker: the broker instance for the container + :param timestamp: internalized timestamp + :param new_container_policy: the storage policy index to use + when creating the container + :param requested_policy_index: the storage policy index sent in the + request, may be None + :returns: created, a bool, if database did not previously exist + """ if not os.path.exists(broker.db_file): try: - broker.initialize(timestamp) + broker.initialize(timestamp, new_container_policy) except DatabaseAlreadyExists: pass else: return True # created - created = broker.is_deleted() + recreated = broker.is_deleted() + if recreated: + # only set storage policy on deleted containers + broker.set_storage_policy_index(new_container_policy, + timestamp=timestamp) + elif requested_policy_index is not None: + # validate requested policy with existing container + if requested_policy_index != broker.storage_policy_index: + raise HTTPConflict(request=req) broker.update_put_timestamp(timestamp) if broker.is_deleted(): raise HTTPConflict(request=req) - return created + if recreated: + broker.update_status_changed_at(timestamp) + return recreated @public @timing_stats() @@ -257,13 +338,14 @@ class ContainerController(object): return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) + requested_policy_index = self.get_and_validate_policy_index(req) timestamp = normalize_timestamp(req.headers['x-timestamp']) broker = self._get_container_broker(drive, part, account, container) if obj: # put container object if account.startswith(self.auto_create_account_prefix) and \ not os.path.exists(broker.db_file): try: - broker.initialize(timestamp) + broker.initialize(timestamp, 0) except DatabaseAlreadyExists: pass 
if not os.path.exists(broker.db_file): @@ -273,20 +355,28 @@ class ContainerController(object): req.headers['x-etag']) return HTTPCreated(request=req) else: # put container - created = self._update_or_create(req, broker, timestamp) + if requested_policy_index is None: + # use the default index sent by the proxy if available + new_container_policy = req.headers.get( + 'X-Backend-Storage-Policy-Default', int(POLICIES.default)) + else: + new_container_policy = requested_policy_index + created = self._update_or_create(req, broker, + timestamp, + new_container_policy, + requested_policy_index) metadata = {} metadata.update( (key, (value, timestamp)) for key, value in req.headers.iteritems() if key.lower() in self.save_headers or is_sys_or_user_meta('container', key)) - if metadata: - if 'X-Container-Sync-To' in metadata: - if 'X-Container-Sync-To' not in broker.metadata or \ - metadata['X-Container-Sync-To'][0] != \ - broker.metadata['X-Container-Sync-To'][0]: - broker.set_x_container_sync_points(-1, -1) - broker.update_metadata(metadata) + if 'X-Container-Sync-To' in metadata: + if 'X-Container-Sync-To' not in broker.metadata or \ + metadata['X-Container-Sync-To'][0] != \ + broker.metadata['X-Container-Sync-To'][0]: + broker.set_x_container_sync_points(-1, -1) + broker.update_metadata(metadata) resp = self.account_update(req, account, container, broker) if resp: return resp @@ -307,15 +397,10 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container, pending_timeout=0.1, stale_reads_ok=True) - if broker.is_deleted(): - return HTTPNotFound(request=req) - info = broker.get_info() - headers = { - 'X-Container-Object-Count': info['object_count'], - 'X-Container-Bytes-Used': info['bytes_used'], - 'X-Timestamp': info['created_at'], - 'X-PUT-Timestamp': info['put_timestamp'], - } + info, is_deleted = broker.get_info_is_deleted() + headers = gen_resp_headers(info, is_deleted=is_deleted) + if is_deleted: + return HTTPNotFound(request=req, headers=headers) headers.update( (key, value) for key, (value, timestamp) in broker.metadata.iteritems() @@ -377,22 +462,17 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container, pending_timeout=0.1, stale_reads_ok=True) - if broker.is_deleted(): - return HTTPNotFound(request=req) - info = broker.get_info() + info, is_deleted = broker.get_info_is_deleted() + resp_headers = gen_resp_headers(info, is_deleted=is_deleted) + if is_deleted: + return HTTPNotFound(request=req, headers=resp_headers) container_list = broker.list_objects_iter(limit, marker, end_marker, prefix, delimiter, path) - return self.create_listing(req, out_content_type, info, + return self.create_listing(req, out_content_type, info, resp_headers, broker.metadata, container_list, container) - def create_listing(self, req, out_content_type, info, metadata, - container_list, container): - resp_headers = { - 'X-Container-Object-Count': info['object_count'], - 'X-Container-Bytes-Used': info['bytes_used'], - 'X-Timestamp': info['created_at'], - 'X-PUT-Timestamp': info['put_timestamp'], - } + def create_listing(self, req, out_content_type, info, resp_headers, + metadata, container_list, container): for key, (value, timestamp) in metadata.iteritems(): if value and (key.lower() in self.save_headers or is_sys_or_user_meta('container', key)): diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 139d6609f0..a45a3a2f95 100644 --- a/swift/proxy/controllers/base.py +++ 
b/swift/proxy/controllers/base.py @@ -50,6 +50,7 @@ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \ HTTPException, HTTPRequestedRangeNotSatisfiable from swift.common.request_helpers import strip_sys_meta_prefix, \ strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta +from swift.common.storage_policy import POLICY_INDEX, POLICY, POLICIES def update_headers(response, headers): @@ -1201,6 +1202,16 @@ class Controller(object): container, obj, res) except ValueError: pass + # if a backend policy index is present in resp headers, translate it + # here with the friendly policy name + if POLICY_INDEX in res.headers and is_success(res.status_int): + policy = POLICIES.get_by_index(res.headers[POLICY_INDEX]) + if policy: + res.headers[POLICY] = policy.name + else: + self.app.logger.error( + 'Could not translate %s (%r) from %r to policy', + POLICY_INDEX, res.headers[POLICY_INDEX], path) return res def is_origin_allowed(self, cors_info, origin): diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 85e31fe4d7..77df764815 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -23,6 +23,7 @@ from swift.common import constraints from swift.common.http import HTTP_ACCEPTED from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, clear_info_cache +from swift.common.storage_policy import POLICIES, POLICY, POLICY_INDEX from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPNotFound @@ -34,7 +35,7 @@ class ContainerController(Controller): # Ensure these are all lowercase pass_through_headers = ['x-container-read', 'x-container-write', 'x-container-sync-key', 'x-container-sync-to', - 'x-versions-location'] + 'x-versions-location', POLICY_INDEX.lower()] def __init__(self, app, account_name, container_name, **kwargs): Controller.__init__(self, app) @@ -47,6 +48,27 @@ class ContainerController(Controller): 'x-remove-%s-write' % st, 'x-remove-versions-location'] + def _convert_policy_to_index(self, req): + """ + Helper method to convert a policy name (from a request from a client) + to a policy index (for a request to a backend). 
+ + :param req: incoming request + """ + policy_name = req.headers.get(POLICY) + if not policy_name: + return + policy = POLICIES.get_by_name(policy_name) + if not policy: + raise HTTPBadRequest(request=req, + content_type="text/plain", + body=("Invalid %s '%s'" + % (POLICY, policy_name))) + if policy.is_deprecated: + body = 'Storage Policy %r is deprecated' % (policy.name) + raise HTTPBadRequest(request=req, body=body) + return int(policy) + def clean_acls(self, req): if 'swift.clean_acl' in req.environ: for header in ('x-container-read', 'x-container-write'): @@ -101,6 +123,7 @@ class ContainerController(Controller): self.clean_acls(req) or check_metadata(req, 'container') if error_response: return error_response + policy_index = self._convert_policy_to_index(req) if not req.environ.get('swift_owner'): for key in self.app.swift_owner_headers: req.headers.pop(key, None) @@ -128,7 +151,8 @@ class ContainerController(Controller): container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) headers = self._backend_requests(req, len(containers), - account_partition, accounts) + account_partition, accounts, + policy_index) clear_info_cache(self.app, req.environ, self.account_name, self.container_name) resp = self.make_requests( @@ -183,9 +207,14 @@ class ContainerController(Controller): return HTTPNotFound(request=req) return resp - def _backend_requests(self, req, n_outgoing, - account_partition, accounts): + def _backend_requests(self, req, n_outgoing, account_partition, accounts, + policy_index=None): additional = {'X-Timestamp': normalize_timestamp(time.time())} + if policy_index is None: + additional['X-Backend-Storage-Policy-Default'] = \ + int(POLICIES.default) + else: + additional[POLICY_INDEX] = str(policy_index) headers = [self.generate_request_headers(req, transfer=True, additional=additional) for _junk in range(n_outgoing)] diff --git a/test/unit/__init__.py b/test/unit/__init__.py index b3e6a41bc5..d44ee7346c 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -22,6 +22,7 @@ import errno import sys from contextlib import contextmanager, closing from collections import defaultdict, Iterable +from numbers import Number from tempfile import NamedTemporaryFile import time from eventlet.green import socket @@ -34,7 +35,6 @@ from hashlib import md5 from eventlet import sleep, Timeout import logging.handlers from httplib import HTTPException -from numbers import Number from swift.common import storage_policy import functools import cPickle as pickle @@ -327,6 +327,22 @@ def temptree(files, contents=''): rmtree(tempdir) +def with_tempdir(f): + """ + Decorator to give a single test a tempdir as argument to test method. 
+ """ + @functools.wraps(f) + def wrapped(*args, **kwargs): + tempdir = mkdtemp() + args = list(args) + args.append(tempdir) + try: + return f(*args, **kwargs) + finally: + rmtree(tempdir) + return wrapped + + class NullLoggingHandler(logging.Handler): def emit(self, record): diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 1892d19923..7f938f0cfe 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -28,6 +28,19 @@ from time import sleep, time from swift.common import ring, utils +class TestRingBase(unittest.TestCase): + + def setUp(self): + self._orig_hash_suffix = utils.HASH_PATH_SUFFIX + self._orig_hash_prefix = utils.HASH_PATH_PREFIX + utils.HASH_PATH_SUFFIX = 'endcap' + utils.HASH_PATH_PREFIX = '' + + def tearDown(self): + utils.HASH_PATH_SUFFIX = self._orig_hash_suffix + utils.HASH_PATH_PREFIX = self._orig_hash_prefix + + class TestRingData(unittest.TestCase): def setUp(self): @@ -109,11 +122,10 @@ class TestRingData(unittest.TestCase): '0644') -class TestRing(unittest.TestCase): +class TestRing(TestRingBase): def setUp(self): - utils.HASH_PATH_SUFFIX = 'endcap' - utils.HASH_PATH_PREFIX = '' + super(TestRing, self).setUp() self.testdir = mkdtemp() self.testgz = os.path.join(self.testdir, 'whatever.ring.gz') self.intended_replica2part2dev_id = [ @@ -147,6 +159,7 @@ class TestRing(unittest.TestCase): reload_time=self.intended_reload_time, ring_name='whatever') def tearDown(self): + super(TestRing, self).tearDown() rmtree(self.testdir, ignore_errors=1) def test_creation(self): diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index aa9ae11da4..5a9b79cd17 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -23,6 +23,9 @@ from uuid import uuid4 import simplejson import sqlite3 +import itertools +import time +import random from mock import patch, MagicMock from eventlet.timeout import Timeout @@ -31,7 +34,7 @@ import swift.common.db from swift.common.db import chexor, dict_factory, get_db_connection, \ DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \ GreenDBConnection -from swift.common.utils import normalize_timestamp, mkdirs +from swift.common.utils import normalize_timestamp, mkdirs, json from swift.common.exceptions import LockTimeout @@ -82,6 +85,30 @@ class TestChexor(unittest.TestCase): 'd41d8cd98f00b204e9800998ecf8427e', None, normalize_timestamp(1)) + def test_chexor(self): + ts = (normalize_timestamp(ts) for ts in + itertools.count(int(time.time()))) + + objects = [ + ('frank', ts.next()), + ('bob', ts.next()), + ('tom', ts.next()), + ('frank', ts.next()), + ('tom', ts.next()), + ('bob', ts.next()), + ] + hash_ = '0' + random.shuffle(objects) + for obj in objects: + hash_ = chexor(hash_, *obj) + + other_hash = '0' + random.shuffle(objects) + for obj in objects: + other_hash = chexor(other_hash, *obj) + + self.assertEqual(hash_, other_hash) + class TestGreenDBConnection(unittest.TestCase): @@ -147,6 +174,28 @@ class TestGetDBConnection(unittest.TestCase): mock_db_cmd.call_count)) +class ExampleBroker(DatabaseBroker): + + db_type = 'test' + db_contains_type = 'test' + + def _initialize(self, conn, timestamp, **kwargs): + conn.executescript(''' + CREATE TABLE test_stat ( + name TEXT, + timestamp TEXT DEFAULT 0, + status_changed_at TEXT DEFAULT 0 + ); + CREATE TABLE test ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT + ); + ''') + conn.execute(""" + INSERT INTO test_stat (name, timestamp) VALUES (?, ?) 
+ """, (self.account, timestamp)) + + class TestDatabaseBroker(unittest.TestCase): def setUp(self): @@ -226,7 +275,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.initialize, normalize_timestamp('1')) def test_delete_db(self): - def init_stub(conn, put_timestamp): + def init_stub(conn, put_timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', @@ -383,7 +432,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_contains_type = 'test' uuid1 = str(uuid4()) - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,)) @@ -433,7 +482,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_type = 'test' broker.db_contains_type = 'test' - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('INSERT INTO test (one) VALUES ("1")') conn.execute('INSERT INTO test (one) VALUES ("2")') @@ -456,7 +505,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_contains_type = 'test' uuid1 = str(uuid4()) - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,)) @@ -531,7 +580,7 @@ class TestDatabaseBroker(unittest.TestCase): broker_metadata = metadata and simplejson.dumps( {'Test': ('Value', normalize_timestamp(1))}) or '' - def _initialize(conn, put_timestamp): + def _initialize(conn, put_timestamp, **kwargs): if put_timestamp is None: put_timestamp = normalize_timestamp(0) conn.executescript(''' @@ -562,6 +611,7 @@ class TestDatabaseBroker(unittest.TestCase): created_at TEXT, put_timestamp TEXT DEFAULT '0', delete_timestamp TEXT DEFAULT '0', + status_changed_at TEXT DEFAULT '0', test_count INTEGER, hash TEXT default '00000000000000000000000000000000', id TEXT @@ -571,8 +621,10 @@ class TestDatabaseBroker(unittest.TestCase): ''' % (metadata and ", metadata TEXT DEFAULT ''" or "")) conn.execute(''' UPDATE test_stat - SET account = ?, created_at = ?, id = ?, put_timestamp = ? - ''', (broker.account, broker_creation, broker_uuid, put_timestamp)) + SET account = ?, created_at = ?, id = ?, put_timestamp = ?, + status_changed_at = ? 
+ ''', (broker.account, broker_creation, broker_uuid, put_timestamp, + put_timestamp)) if metadata: conn.execute('UPDATE test_stat SET metadata = ?', (broker_metadata,)) @@ -582,11 +634,11 @@ class TestDatabaseBroker(unittest.TestCase): broker.initialize(put_timestamp) info = broker.get_replication_info() self.assertEquals(info, { - 'count': 0, + 'account': broker.account, 'count': 0, 'hash': '00000000000000000000000000000000', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': -1, 'id': broker_uuid, 'metadata': broker_metadata}) insert_timestamp = normalize_timestamp(3) with broker.get() as conn: conn.execute(''' @@ -595,21 +647,21 @@ class TestDatabaseBroker(unittest.TestCase): conn.commit() info = broker.get_replication_info() self.assertEquals(info, { - 'count': 1, + 'account': broker.account, 'count': 1, 'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata}) with broker.get() as conn: conn.execute('DELETE FROM test') conn.commit() info = broker.get_replication_info() self.assertEquals(info, { - 'count': 0, + 'account': broker.account, 'count': 0, 'hash': '00000000000000000000000000000000', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata}) return broker def test_metadata(self): @@ -679,6 +731,74 @@ class TestDatabaseBroker(unittest.TestCase): [first_value, first_timestamp]) self.assert_('Second' not in broker.metadata) + def test_get_max_row(self): + broker = ExampleBroker(':memory:') + broker.initialize(0) + self.assertEquals(-1, broker.get_max_row()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO test (name) VALUES (?) + ''', ('test_name',)) + conn.commit() + self.assertEquals(1, broker.get_max_row()) + with broker.get() as conn: + conn.executescript(''' + DELETE FROM test; + ''') + conn.commit() + self.assertEquals(1, broker.get_max_row()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO test (name) VALUES (?) 
+ ''', ('test_name',)) + conn.commit() + self.assertEquals(2, broker.get_max_row()) + + def test_get_info(self): + broker = ExampleBroker(':memory:', account='test') + broker.initialize(normalize_timestamp(1)) + info = broker.get_info() + expected = { + 'name': 'test', + 'timestamp': '0000000001.00000', + 'status_changed_at': '0', + } + self.assertEqual(info, expected) + + def test_get_raw_metadata(self): + broker = ExampleBroker(':memory:', account='test') + broker.initialize(normalize_timestamp(0)) + self.assertEqual(broker.metadata, {}) + self.assertEqual(broker.get_raw_metadata(), '') + metadata = { + 'test': ['value', normalize_timestamp(1)] + } + broker.update_metadata(metadata) + self.assertEqual(broker.metadata, metadata) + self.assertEqual(broker.get_raw_metadata(), + json.dumps(metadata)) + + def test_status_changed_at(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = ExampleBroker(':memory:', account='test') + broker.initialize(ts.next()) + self.assertEquals(broker.get_info()['status_changed_at'], '0') + status_changed_at = ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # save the old and get a new status_changed_at + old_status_changed_at, status_changed_at = \ + status_changed_at, ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # status changed at won't go backwards... + broker.update_status_changed_at(old_status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_auditor.py b/test/unit/container/test_auditor.py index 5173141bfa..d1df39d2cd 100644 --- a/test/unit/container/test_auditor.py +++ b/test/unit/container/test_auditor.py @@ -21,8 +21,10 @@ import random from tempfile import mkdtemp from shutil import rmtree +from swift.common.utils import normalize_timestamp from swift.container import auditor -from test.unit import FakeLogger +from test.unit import debug_logger, with_tempdir +from test.unit.container import test_backend class FakeContainerBroker(object): @@ -45,7 +47,7 @@ class TestAuditor(unittest.TestCase): def setUp(self): self.testdir = os.path.join(mkdtemp(), 'tmp_test_container_auditor') - self.logger = FakeLogger() + self.logger = debug_logger() rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) fnames = ['true1.db', 'true2.db', 'true3.db', @@ -78,7 +80,7 @@ class TestAuditor(unittest.TestCase): return time.time() conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, logger=self.logger) with mock.patch('swift.container.auditor.time', FakeTime()): def fake_audit_location_generator(*args, **kwargs): @@ -94,7 +96,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker) def test_run_once(self): conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -109,7 +111,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker) def test_container_auditor(self): conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, 
logger=self.logger) files = os.listdir(self.testdir) for f in files: path = os.path.join(self.testdir, f) @@ -117,5 +119,45 @@ class TestAuditor(unittest.TestCase): self.assertEquals(test_auditor.container_failures, 2) self.assertEquals(test_auditor.container_passes, 3) + +class TestAuditorMigrations(unittest.TestCase): + + @with_tempdir + def test_db_migration(self, tempdir): + db_path = os.path.join(tempdir, 'sda', 'containers', '0', '0', '0', + 'test.db') + with test_backend.TestContainerBrokerBeforeSPI.old_broker() as \ + old_ContainerBroker: + broker = old_ContainerBroker(db_path, account='a', container='c') + broker.initialize(normalize_timestamp(0), -1) + + with broker.get() as conn: + try: + conn.execute('SELECT storage_policy_index ' + 'FROM container_stat') + except Exception as err: + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('TestContainerBrokerBeforeSPI broker class ' + 'was already migrated') + + conf = {'devices': tempdir, 'mount_check': False} + test_auditor = auditor.ContainerAuditor(conf, logger=debug_logger()) + test_auditor.run_once() + + broker = auditor.ContainerBroker(db_path, account='a', container='c') + info = broker.get_info() + expected = { + 'account': 'a', + 'container': 'c', + 'object_count': 0, + 'bytes_used': 0, + 'storage_policy_index': 0, + } + for k, v in expected.items(): + self.assertEqual(info[k], v) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 56f2c584ec..c159477d73 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -15,13 +15,25 @@ """ Tests for swift.container.backend """ +import os import hashlib import unittest from time import sleep, time from uuid import uuid4 +import itertools +import random +from collections import defaultdict +from contextlib import contextmanager +import sqlite3 +import pickle from swift.container.backend import ContainerBroker from swift.common.utils import normalize_timestamp +from swift.common.storage_policy import POLICIES + +import mock + +from test.unit import patch_policies, with_tempdir class TestContainerBroker(unittest.TestCase): @@ -31,18 +43,41 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.__init__ broker = ContainerBroker(':memory:', account='a', container='c') self.assertEqual(broker.db_file, ':memory:') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: curs = conn.cursor() curs.execute('SELECT 1') self.assertEqual(curs.fetchall()[0][0], 1) + @patch_policies + def test_storage_policy_property(self): + ts = itertools.count(1) + for policy in POLICIES: + broker = ContainerBroker(':memory:', account='a', + container='policy_%s' % policy.name) + broker.initialize(normalize_timestamp(ts.next()), policy.idx) + with broker.get() as conn: + try: + conn.execute('''SELECT storage_policy_index + FROM container_stat''') + except Exception: + is_migrated = False + else: + is_migrated = True + if not is_migrated: + # pre spi tests don't set policy on initialize + broker.set_storage_policy_index(policy.idx) + self.assertEqual(policy.idx, broker.storage_policy_index) + # make sure it's cached + with mock.patch.object(broker, 'get'): + self.assertEqual(policy.idx, broker.storage_policy_index) + def test_exception(self): # Test ContainerBroker throwing a conn away after # unhandled exception first_conn = None broker = ContainerBroker(':memory:', 
account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: first_conn = conn try: @@ -56,7 +91,7 @@ class TestContainerBroker(unittest.TestCase): def test_empty(self): # Test ContainerBroker.empty broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) self.assert_(broker.empty()) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') @@ -68,7 +103,7 @@ class TestContainerBroker(unittest.TestCase): def test_reclaim(self): broker = ContainerBroker(':memory:', account='test_account', container='test_container') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') with broker.get() as conn: @@ -125,10 +160,56 @@ class TestContainerBroker(unittest.TestCase): broker.reclaim(normalize_timestamp(time()), time()) broker.delete_db(normalize_timestamp(time())) + def test_get_info_is_deleted(self): + start = int(time()) + ts = (normalize_timestamp(t) for t in itertools.count(start)) + broker = ContainerBroker(':memory:', account='test_account', + container='test_container') + # create it + broker.initialize(ts.next(), POLICIES.default.idx) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(is_deleted, False) # sanity + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], '0') + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], + normalize_timestamp(start)) + + # delete it + delete_timestamp = ts.next() + broker.delete_db(delete_timestamp) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, True) # sanity + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assertEqual(info['status_changed_at'], delete_timestamp) + + # bring back to life + broker.put_object('obj', ts.next(), 0, 'text/plain', 'etag', + storage_policy_index=broker.storage_policy_index) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, False) # sanity + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assertEqual(info['status_changed_at'], delete_timestamp) + def test_delete_object(self): # Test ContainerBroker.delete_object broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') with broker.get() as conn: @@ -151,7 +232,7 @@ class 
TestContainerBroker(unittest.TestCase):
     def test_put_object(self):
         # Test ContainerBroker.put_object
         broker = ContainerBroker(':memory:', account='a', container='c')
-        broker.initialize(normalize_timestamp('1'))
+        broker.initialize(normalize_timestamp('1'), 0)
 
         # Create initial object
         timestamp = normalize_timestamp(time())
@@ -347,16 +428,150 @@ class TestContainerBroker(unittest.TestCase):
         self.assertEquals(conn.execute(
             "SELECT deleted FROM object").fetchone()[0], 0)
 
+    @patch_policies
+    def test_put_misplaced_object_does_not_affect_container_stats(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        info = broker.get_info()
+        self.assertEqual(1, info['object_count'])
+        self.assertEqual(123, info['bytes_used'])
+        other_policy = random.choice([p for p in POLICIES
+                                      if p is not policy])
+        broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=other_policy.idx)
+        # re-fetch the stats so we assert against the current DB state,
+        # not the dict captured before the misplaced put
+        info = broker.get_info()
+        self.assertEqual(1, info['object_count'])
+        self.assertEqual(123, info['bytes_used'])
+
+    @patch_policies
+    def test_has_multiple_policies(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        self.assertFalse(broker.has_multiple_policies())
+        other_policy = [p for p in POLICIES if p is not policy][0]
+        broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=other_policy.idx)
+        self.assert_(broker.has_multiple_policies())
+
+    @patch_policies
+    def test_get_policy_info(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        policy_stats = broker.get_policy_stats()
+        expected = {policy.idx: {'bytes_used': 0, 'object_count': 0}}
+        self.assertEqual(policy_stats, expected)
+
+        # add an object
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        policy_stats = broker.get_policy_stats()
+        expected = {policy.idx: {'bytes_used': 123, 'object_count': 1}}
+
self.assertEqual(policy_stats, expected) + + # add a misplaced object + other_policy = random.choice([p for p in POLICIES + if p is not policy]) + broker.put_object('wrong_o', ts.next(), 123, 'text/plain', + '5af83e3196bf99f440f31f2e1a6c9afe', + storage_policy_index=other_policy.idx) + policy_stats = broker.get_policy_stats() + expected = { + policy.idx: {'bytes_used': 123, 'object_count': 1}, + other_policy.idx: {'bytes_used': 123, 'object_count': 1}, + } + self.assertEqual(policy_stats, expected) + + def test_policy_stat_tracking(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + broker = ContainerBroker(':memory:', + account='a', container='c') + broker.initialize(ts.next(), POLICIES.default.idx) + stats = defaultdict(dict) + + iters = 100 + for i in range(iters): + policy_index = random.randint(0, iters * 0.1) + name = 'object-%s' % random.randint(0, iters * 0.1) + size = random.randint(0, iters) + broker.put_object(name, ts.next(), size, 'text/plain', + '5af83e3196bf99f440f31f2e1a6c9afe', + storage_policy_index=policy_index) + # track the size of the latest timestamp put for each object + # in each storage policy + stats[policy_index][name] = size + policy_stats = broker.get_policy_stats() + # if no objects were added for the default policy we still + # expect an entry for the default policy in the returned info + # because the database was initialized with that storage policy + # - but it must be empty. + if POLICIES.default.idx not in stats: + default_stats = policy_stats.pop(POLICIES.default.idx) + expected = {'object_count': 0, 'bytes_used': 0} + self.assertEqual(default_stats, expected) + self.assertEqual(len(policy_stats), len(stats)) + for policy_index, stat in policy_stats.items(): + self.assertEqual(stat['object_count'], len(stats[policy_index])) + self.assertEqual(stat['bytes_used'], + sum(stats[policy_index].values())) + def test_get_info(self): # Test ContainerBroker.get_info broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['account'], 'test1') self.assertEquals(info['container'], 'test2') self.assertEquals(info['hash'], '00000000000000000000000000000000') + self.assertEqual(info['put_timestamp'], normalize_timestamp(1)) + self.assertEqual(info['delete_timestamp'], '0') + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], + normalize_timestamp(1)) info = broker.get_info() self.assertEquals(info['object_count'], 0) @@ -401,7 +616,7 @@ class TestContainerBroker(unittest.TestCase): def test_set_x_syncs(self): broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['x_container_sync_point1'], -1) @@ -415,7 +630,7 @@ class TestContainerBroker(unittest.TestCase): def test_get_report_info(self): broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['account'], 'test1') @@ -482,7 +697,7 @@ class TestContainerBroker(unittest.TestCase): def test_list_objects_iter(self): # Test 
ContainerBroker.list_objects_iter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) for obj1 in xrange(4): for obj2 in xrange(125): broker.put_object('%d/%04d' % (obj1, obj2), @@ -601,7 +816,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter using a # delimiter that is not a slash broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) for obj1 in xrange(4): for obj2 in xrange(125): broker.put_object('%d:%04d' % (obj1, obj2), @@ -718,7 +933,7 @@ class TestContainerBroker(unittest.TestCase): def test_list_objects_iter_prefix_delim(self): # Test ContainerBroker.list_objects_iter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object( '/pets/dogs/1', normalize_timestamp(0), 0, @@ -755,7 +970,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter for a # container that has an odd file with a trailing delimiter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('a/', normalize_timestamp(time()), 0, @@ -835,7 +1050,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter for a # container that has an odd file with a trailing delimiter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('a:', normalize_timestamp(time()), 0, @@ -913,7 +1128,7 @@ class TestContainerBroker(unittest.TestCase): def test_chexor(self): broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('b', normalize_timestamp(2), 0, @@ -933,7 +1148,7 @@ class TestContainerBroker(unittest.TestCase): def test_newid(self): # test DatabaseBroker.newid broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) id = broker.get_info()['id'] broker.newid('someid') self.assertNotEquals(id, broker.get_info()['id']) @@ -941,7 +1156,7 @@ class TestContainerBroker(unittest.TestCase): def test_get_items_since(self): # test DatabaseBroker.get_items_since broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') max_row = broker.get_replication_info()['max_row'] @@ -954,9 +1169,9 @@ class TestContainerBroker(unittest.TestCase): def test_sync_merging(self): # exercise the DatabaseBroker sync functions a bit broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + 
broker1.initialize(normalize_timestamp('1'), 0) broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) self.assertEquals(broker2.get_sync('12345'), -1) broker1.merge_syncs([{'sync_point': 3, 'remote_id': '12345'}]) broker2.merge_syncs(broker1.get_syncs()) @@ -964,9 +1179,9 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items(self): broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(2), 0, @@ -989,10 +1204,10 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items_overwrite(self): # test DatabaseBroker.merge_items broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) id = broker1.get_info()['id'] broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(2), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(3), 0, @@ -1014,10 +1229,10 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items_post_overwrite_out_of_order(self): # test DatabaseBroker.merge_items broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) id = broker1.get_info()['id'] broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(2), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(3), 0, @@ -1056,8 +1271,165 @@ class TestContainerBroker(unittest.TestCase): self.assertEquals(rec['created_at'], normalize_timestamp(5)) self.assertEquals(rec['content_type'], 'text/plain') + def test_set_storage_policy_index(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + broker = ContainerBroker(':memory:', account='test_account', + container='test_container') + timestamp = ts.next() + broker.initialize(timestamp, 0) -def premetadata_create_container_stat_table(self, conn, put_timestamp=None): + info = broker.get_info() + self.assertEqual(0, info['storage_policy_index']) # sanity check + self.assertEqual(0, info['object_count']) + self.assertEqual(0, info['bytes_used']) + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(timestamp, info['status_changed_at']) + expected = {0: {'object_count': 0, 'bytes_used': 0}} + self.assertEqual(expected, broker.get_policy_stats()) + + timestamp = ts.next() + broker.set_storage_policy_index(111, timestamp) + self.assertEqual(broker.storage_policy_index, 111) + info = broker.get_info() + self.assertEqual(111, info['storage_policy_index']) + self.assertEqual(0, 
info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(timestamp, info['status_changed_at'])
+        expected[111] = {'object_count': 0, 'bytes_used': 0}
+        self.assertEqual(expected, broker.get_policy_stats())
+
+        timestamp = ts.next()
+        broker.set_storage_policy_index(222, timestamp)
+        self.assertEqual(broker.storage_policy_index, 222)
+        info = broker.get_info()
+        self.assertEqual(222, info['storage_policy_index'])
+        self.assertEqual(0, info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(timestamp, info['status_changed_at'])
+        expected[222] = {'object_count': 0, 'bytes_used': 0}
+        self.assertEqual(expected, broker.get_policy_stats())
+
+        old_timestamp, timestamp = timestamp, ts.next()
+        broker.set_storage_policy_index(222, timestamp)  # it's idempotent
+        info = broker.get_info()
+        self.assertEqual(222, info['storage_policy_index'])
+        self.assertEqual(0, info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(old_timestamp, info['status_changed_at'])
+        self.assertEqual(expected, broker.get_policy_stats())
+
+    def test_set_storage_policy_index_empty(self):
+        # Putting an object may trigger migrations, so test with a
+        # never-had-an-object container to make sure we handle it
+        broker = ContainerBroker(':memory:', account='test_account',
+                                 container='test_container')
+        broker.initialize(normalize_timestamp('1'), 0)
+        info = broker.get_info()
+        self.assertEqual(0, info['storage_policy_index'])
+
+        broker.set_storage_policy_index(2)
+        info = broker.get_info()
+        self.assertEqual(2, info['storage_policy_index'])
+
+    def test_reconciler_sync(self):
+        broker = ContainerBroker(':memory:', account='test_account',
+                                 container='test_container')
+        broker.initialize(normalize_timestamp('1'), 0)
+        self.assertEquals(-1, broker.get_reconciler_sync())
+        broker.update_reconciler_sync(10)
+        self.assertEquals(10, broker.get_reconciler_sync())
+
+    @with_tempdir
+    def test_legacy_pending_files(self, tempdir):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        db_path = os.path.join(tempdir, 'container.db')
+
+        # first init a container DB without the policy_stat table present
+        broker = ContainerBroker(db_path, account='a', container='c')
+        broker.initialize(ts.next(), 1)
+
+        # manually make some pending entries lacking storage_policy_index
+        with open(broker.pending_file, 'a+b') as fp:
+            for i in range(10):
+                name, timestamp, size, content_type, etag, deleted = (
+                    'o%s' % i, ts.next(), 0, 'c', 'e', 0)
+                fp.write(':')
+                fp.write(pickle.dumps(
+                    (name, timestamp, size, content_type, etag, deleted),
+                    protocol=2).encode('base64'))
+                fp.flush()
+
+        # use put_object to append some more entries with different
+        # values for storage_policy_index
+        for i in range(10, 30):
+            name = 'o%s' % i
+            if i < 20:
+                size = 1
+                storage_policy_index = 0
+            else:
+                size = 2
+                storage_policy_index = 1
+            broker.put_object(name, ts.next(), size, 'c', 'e', 0,
+                              storage_policy_index=storage_policy_index)
+
+        broker._commit_puts_stale_ok()
+
+        # 10 objects with 0 bytes each in the legacy pending entries
+        # 10 objects with 1 byte each in storage policy 0
+        # 10 objects with 2 bytes each in storage policy 1
+        expected = {
+            0: {'object_count': 20, 'bytes_used': 10},
+            1: {'object_count': 10, 'bytes_used': 20},
+        }
+        self.assertEqual(broker.get_policy_stats(), expected)
+
+
+class ContainerBrokerMigrationMixin(object):
+    """
+    Mixin for running ContainerBroker against databases created with
+    older schemas.
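+
+    ``setUp`` swaps ContainerBroker's schema-creation methods for the
+    pre-migration versions defined in this module, and ``tearDown``
+    restores the originals so later tests see the current schema.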
+ """ + def setUp(self): + self._imported_create_object_table = \ + ContainerBroker.create_object_table + ContainerBroker.create_object_table = \ + prespi_create_object_table + self._imported_create_container_info_table = \ + ContainerBroker.create_container_info_table + ContainerBroker.create_container_info_table = \ + premetadata_create_container_info_table + self._imported_create_policy_stat_table = \ + ContainerBroker.create_policy_stat_table + ContainerBroker.create_policy_stat_table = lambda *args: None + + @classmethod + @contextmanager + def old_broker(cls): + cls.runTest = lambda *a, **k: None + case = cls() + case.setUp() + try: + yield ContainerBroker + finally: + case.tearDown() + + def tearDown(self): + ContainerBroker.create_container_info_table = \ + self._imported_create_container_info_table + ContainerBroker.create_object_table = \ + self._imported_create_object_table + ContainerBroker.create_policy_stat_table = \ + self._imported_create_policy_stat_table + + +def premetadata_create_container_info_table(self, conn, put_timestamp, + _spi=None): """ Copied from ContainerBroker before the metadata column was added; used for testing with TestContainerBrokerBeforeMetadata. @@ -1099,19 +1471,17 @@ def premetadata_create_container_stat_table(self, conn, put_timestamp=None): str(uuid4()), put_timestamp)) -class TestContainerBrokerBeforeMetadata(TestContainerBroker): +class TestContainerBrokerBeforeMetadata(ContainerBrokerMigrationMixin, + TestContainerBroker): """ Tests for ContainerBroker against databases created before the metadata column was added. """ def setUp(self): - self._imported_create_container_stat_table = \ - ContainerBroker.create_container_stat_table - ContainerBroker.create_container_stat_table = \ - premetadata_create_container_stat_table + super(TestContainerBrokerBeforeMetadata, self).setUp() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) exc = None with broker.get() as conn: try: @@ -1121,15 +1491,15 @@ class TestContainerBrokerBeforeMetadata(TestContainerBroker): self.assert_('no such column: metadata' in str(exc)) def tearDown(self): - ContainerBroker.create_container_stat_table = \ - self._imported_create_container_stat_table + super(TestContainerBrokerBeforeMetadata, self).tearDown() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: conn.execute('SELECT metadata FROM container_stat') -def prexsync_create_container_stat_table(self, conn, put_timestamp=None): +def prexsync_create_container_info_table(self, conn, put_timestamp, + _spi=None): """ Copied from ContainerBroker before the x_container_sync_point[12] columns were added; used for testing with @@ -1173,19 +1543,19 @@ def prexsync_create_container_stat_table(self, conn, put_timestamp=None): str(uuid4()), put_timestamp)) -class TestContainerBrokerBeforeXSync(TestContainerBroker): +class TestContainerBrokerBeforeXSync(ContainerBrokerMigrationMixin, + TestContainerBroker): """ Tests for ContainerBroker against databases created before the x_container_sync_point[12] columns were added. 
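+
+    setUp patches in ``prexsync_create_container_info_table`` so new
+    brokers are created with the pre-x-sync container_stat schema.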
""" def setUp(self): - self._imported_create_container_stat_table = \ - ContainerBroker.create_container_stat_table - ContainerBroker.create_container_stat_table = \ - prexsync_create_container_stat_table + super(TestContainerBrokerBeforeXSync, self).setUp() + ContainerBroker.create_container_info_table = \ + prexsync_create_container_info_table broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) exc = None with broker.get() as conn: try: @@ -1196,9 +1566,300 @@ class TestContainerBrokerBeforeXSync(TestContainerBroker): self.assert_('no such column: x_container_sync_point1' in str(exc)) def tearDown(self): - ContainerBroker.create_container_stat_table = \ - self._imported_create_container_stat_table + super(TestContainerBrokerBeforeXSync, self).tearDown() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: conn.execute('SELECT x_container_sync_point1 FROM container_stat') + + +def prespi_create_object_table(self, conn, *args, **kwargs): + conn.executescript(""" + CREATE TABLE object ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + created_at TEXT, + size INTEGER, + content_type TEXT, + etag TEXT, + deleted INTEGER DEFAULT 0 + ); + + CREATE INDEX ix_object_deleted_name ON object (deleted, name); + + CREATE TRIGGER object_insert AFTER INSERT ON object + BEGIN + UPDATE container_stat + SET object_count = object_count + (1 - new.deleted), + bytes_used = bytes_used + new.size, + hash = chexor(hash, new.name, new.created_at); + END; + + CREATE TRIGGER object_update BEFORE UPDATE ON object + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + + CREATE TRIGGER object_delete AFTER DELETE ON object + BEGIN + UPDATE container_stat + SET object_count = object_count - (1 - old.deleted), + bytes_used = bytes_used - old.size, + hash = chexor(hash, old.name, old.created_at); + END; + """) + + +def prespi_create_container_info_table(self, conn, put_timestamp, + _spi=None): + """ + Copied from ContainerBroker before the + storage_policy_index column was added; used for testing with + TestContainerBrokerBeforeSPI. + + Create the container_stat table which is specific to the container DB. + + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + if put_timestamp is None: + put_timestamp = normalize_timestamp(0) + conn.executescript(""" + CREATE TABLE container_stat ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + object_count INTEGER, + bytes_used INTEGER, + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '', + x_container_sync_point1 INTEGER DEFAULT -1, + x_container_sync_point2 INTEGER DEFAULT -1 + ); + + INSERT INTO container_stat (object_count, bytes_used) + VALUES (0, 0); + """) + conn.execute(''' + UPDATE container_stat + SET account = ?, container = ?, created_at = ?, id = ?, + put_timestamp = ? 
+ ''', (self.account, self.container, normalize_timestamp(time()), + str(uuid4()), put_timestamp)) + + +class TestContainerBrokerBeforeSPI(ContainerBrokerMigrationMixin, + TestContainerBroker): + """ + Tests for ContainerBroker against databases created + before the storage_policy_index column was added. + """ + + def setUp(self): + super(TestContainerBrokerBeforeSPI, self).setUp() + ContainerBroker.create_container_info_table = \ + prespi_create_container_info_table + + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1'), 0) + exc = None + with broker.get() as conn: + try: + conn.execute('''SELECT storage_policy_index + FROM container_stat''') + except BaseException as err: + exc = err + self.assert_('no such column: storage_policy_index' in str(exc)) + + def tearDown(self): + super(TestContainerBrokerBeforeSPI, self).tearDown() + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1'), 0) + with broker.get() as conn: + conn.execute('SELECT storage_policy_index FROM container_stat') + + @patch_policies + @with_tempdir + def test_object_table_migration(self, tempdir): + db_path = os.path.join(tempdir, 'container.db') + + # initialize an un-migrated database + broker = ContainerBroker(db_path, account='a', container='c') + put_timestamp = normalize_timestamp(int(time())) + broker.initialize(put_timestamp, None) + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM object + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from object table!') + + # manually insert an existing row to avoid automatic migration + obj_put_timestamp = normalize_timestamp(time()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO object (name, created_at, size, + content_type, etag, deleted) + VALUES (?, ?, ?, ?, ?, ?) 
+ ''', ('test_name', obj_put_timestamp, 123, + 'text/plain', '8f4c680e75ca4c81dc1917ddab0a0b5c', 0)) + conn.commit() + + # make sure we can iter objects without performing migration + for o in broker.list_objects_iter(1, None, None, None, None): + self.assertEqual(o, ('test_name', obj_put_timestamp, 123, + 'text/plain', + '8f4c680e75ca4c81dc1917ddab0a0b5c')) + + # get_info + info = broker.get_info() + expected = { + 'account': 'a', + 'container': 'c', + 'put_timestamp': put_timestamp, + 'delete_timestamp': '0', + 'status_changed_at': '0', + 'bytes_used': 123, + 'object_count': 1, + 'reported_put_timestamp': '0', + 'reported_delete_timestamp': '0', + 'reported_object_count': 0, + 'reported_bytes_used': 0, + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1, + 'storage_policy_index': 0, + } + for k, v in expected.items(): + self.assertEqual(info[k], v, + 'The value for %s was %r not %r' % ( + k, info[k], v)) + self.assert_(float(info['created_at']) > float(put_timestamp)) + self.assertNotEqual(int(info['hash'], 16), 0) + orig_hash = info['hash'] + # get_replication_info + info = broker.get_replication_info() + # translate object count for replicators + expected['count'] = expected.pop('object_count') + for k, v in expected.items(): + self.assertEqual(info[k], v) + self.assert_(float(info['created_at']) > float(put_timestamp)) + self.assertEqual(info['hash'], orig_hash) + self.assertEqual(info['max_row'], 1) + self.assertEqual(info['metadata'], '') + # get_policy_stats + info = broker.get_policy_stats() + expected = { + 0: {'bytes_used': 123, 'object_count': 1} + } + self.assertEqual(info, expected) + # empty & is_deleted + self.assertEqual(broker.empty(), False) + self.assertEqual(broker.is_deleted(), False) + + # no migrations have occurred yet + + # container_stat table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM container_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from container_stat table!') + + # object table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM object + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from object table!') + + # policy_stat table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM policy_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table does not exist yet + self.assert_('no such table: policy_stat' in str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from policy_stat table!') + + # now do a PUT with a different value for storage_policy_index + # which will update the DB schema as well as update policy_stats + # for legacy objects in the DB (those without an SPI) + second_object_put_timestamp = normalize_timestamp(time()) + other_policy = [p for p in POLICIES if p.idx != 0][0] + broker.put_object('test_second', second_object_put_timestamp, + 456, 'text/plain', + 
'cbac50c175793513fa3c581551c876ab', + storage_policy_index=other_policy.idx) + broker._commit_puts_stale_ok() + + # we are fully migrated and both objects have their + # storage_policy_index + with broker.get() as conn: + storage_policy_index = conn.execute(''' + SELECT storage_policy_index FROM container_stat + ''').fetchone()[0] + self.assertEqual(storage_policy_index, 0) + rows = conn.execute(''' + SELECT name, storage_policy_index FROM object + ''').fetchall() + for row in rows: + if row[0] == 'test_name': + self.assertEqual(row[1], 0) + else: + self.assertEqual(row[1], other_policy.idx) + + # and all stats tracking is in place + stats = broker.get_policy_stats() + self.assertEqual(len(stats), 2) + self.assertEqual(stats[0]['object_count'], 1) + self.assertEqual(stats[0]['bytes_used'], 123) + self.assertEqual(stats[other_policy.idx]['object_count'], 1) + self.assertEqual(stats[other_policy.idx]['bytes_used'], 456) + + # get info still reports on the legacy storage policy + info = broker.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 123) + + # unless you change the storage policy + broker.set_storage_policy_index(other_policy.idx) + info = broker.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 456) diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 4fd319ab90..7495b96648 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -17,6 +17,7 @@ import operator import os import mock import unittest +import itertools from contextlib import contextmanager from shutil import rmtree from StringIO import StringIO @@ -24,6 +25,8 @@ from tempfile import mkdtemp from test.unit import FakeLogger from time import gmtime from xml.dom import minidom +import time +import random from eventlet import spawn, Timeout, listen import simplejson @@ -35,8 +38,11 @@ from swift.common import constraints from swift.common.utils import (normalize_timestamp, mkdirs, public, replication, lock_parent_directory) from test.unit import fake_http_connect +from swift.common.storage_policy import POLICY_INDEX, POLICIES from swift.common.request_helpers import get_sys_meta_prefix +from test.unit import patch_policies + @contextmanager def save_globals(): @@ -48,6 +54,7 @@ def save_globals(): swift.container.server.http_connect = orig_http_connect +@patch_policies class TestContainerController(unittest.TestCase): """Test swift.container.server.ContainerController""" def setUp(self): @@ -60,11 +67,46 @@ class TestContainerController(unittest.TestCase): mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) self.controller = container_server.ContainerController( {'devices': self.testdir, 'mount_check': 'false'}) + # some of the policy tests want at least two policies + self.assert_(len(POLICIES) > 1) def tearDown(self): - """Tear down for testing swift.object_server.ObjectController""" rmtree(os.path.dirname(self.testdir), ignore_errors=1) + def _check_put_container_storage_policy(self, req, policy_index): + resp = req.get_response(self.controller) + self.assertEqual(201, resp.status_int) + req = Request.blank(req.path, method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(204, resp.status_int) + self.assertEqual(str(policy_index), resp.headers[POLICY_INDEX]) + + def test_get_and_validate_policy_index(self): + # no policy is OK + req = Request.blank('/sda1/p/a/container_default', method='PUT', + headers={'X-Timestamp': '0'}) + 
self._check_put_container_storage_policy(req, POLICIES.default.idx) + + # bogus policies + for policy in ('nada', 999): + req = Request.blank('/sda1/p/a/c_%s' % policy, method='PUT', + headers={ + 'X-Timestamp': '0', + POLICY_INDEX: policy + }) + resp = req.get_response(self.controller) + self.assertEqual(400, resp.status_int) + self.assert_('invalid' in resp.body.lower()) + + # good policies + for policy in POLICIES: + req = Request.blank('/sda1/p/a/c_%s' % policy.name, method='PUT', + headers={ + 'X-Timestamp': '0', + POLICY_INDEX: policy.idx, + }) + self._check_put_container_storage_policy(req, policy.idx) + def test_acl_container(self): # Ensure no acl by default req = Request.blank( @@ -120,31 +162,95 @@ class TestContainerController(unittest.TestCase): 'account:user') def test_HEAD(self): - req = Request.blank( - '/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT', - 'HTTP_X_TIMESTAMP': '0'}) + start = int(time.time()) + ts = itertools.count(start) + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'x-timestamp': normalize_timestamp(ts.next())}) req.get_response(self.controller) - req = Request.blank( - '/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD', - 'HTTP_X_TIMESTAMP': '0'}) + req = Request.blank('/sda1/p/a/c', method='HEAD') response = req.get_response(self.controller) - self.assert_(response.status.startswith('204')) - self.assertEquals(int(response.headers['x-container-bytes-used']), 0) - self.assertEquals(int(response.headers['x-container-object-count']), 0) - req2 = Request.blank( - '/sda1/p/a/c/o', environ={ - 'REQUEST_METHOD': 'PUT', - 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_SIZE': 42, - 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x'}) - req2.get_response(self.controller) + self.assertEqual(response.status_int, 204) + self.assertEqual(response.headers['x-container-bytes-used'], '0') + self.assertEqual(response.headers['x-container-object-count'], '0') + obj_put_request = Request.blank( + '/sda1/p/a/c/o', method='PUT', headers={ + 'x-timestamp': normalize_timestamp(ts.next()), + 'x-size': 42, + 'x-content-type': 'text/plain', + 'x-etag': 'x', + }) + obj_put_request.get_response(self.controller) + # re-issue HEAD request response = req.get_response(self.controller) - self.assertEquals(int(response.headers['x-container-bytes-used']), 42) - self.assertEquals(int(response.headers['x-container-object-count']), 1) + self.assertEqual(response.status_int // 100, 2) + self.assertEqual(response.headers['x-container-bytes-used'], '42') + self.assertEqual(response.headers['x-container-object-count'], '1') + # created at time... 
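+        # (x-timestamp reflects the DB's created_at, which is stamped
+        # when the container DB is first initialized, so it can only be
+        # at or after start)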
+ self.assert_(float(response.headers['x-timestamp']) >= start) + self.assertEqual(response.headers['x-put-timestamp'], + normalize_timestamp(start)) + + # backend headers + self.assertEqual(int(response.headers[POLICY_INDEX]), + int(POLICIES.default)) + self.assert_(float(response.headers['x-backend-timestamp']) >= start) + self.assertEqual(response.headers['x-backend-put-timestamp'], + normalize_timestamp(start)) + self.assertEqual(response.headers['x-backend-delete-timestamp'], + normalize_timestamp(0)) + self.assertEqual(response.headers['x-backend-status-changed-at'], + normalize_timestamp(start)) def test_HEAD_not_found(self): - req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + req = Request.blank('/sda1/p/a/c', method='HEAD') resp = req.get_response(self.controller) - self.assertEquals(resp.status_int, 404) + self.assertEqual(resp.status_int, 404) + self.assertEqual(int(resp.headers[POLICY_INDEX]), 0) + self.assertEqual(resp.headers['x-backend-timestamp'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-put-timestamp'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-status-changed-at'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-delete-timestamp'], + normalize_timestamp(0)) + for header in ('x-container-object-count', 'x-container-bytes-used', + 'x-timestamp', 'x-put-timestamp'): + self.assertEqual(resp.headers[header], None) + + def test_deleted_headers(self): + ts = itertools.count(int(time.time())) + request_method_times = { + 'PUT': normalize_timestamp(ts.next()), + 'DELETE': normalize_timestamp(ts.next()), + } + # setup a deleted container + for method in ('PUT', 'DELETE'): + x_timestamp = request_method_times[method] + req = Request.blank('/sda1/p/a/c', method=method, + headers={'x-timestamp': x_timestamp}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int // 100, 2) + + for method in ('GET', 'HEAD'): + req = Request.blank('/sda1/p/a/c', method=method) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 404) + # backend headers + self.assertEqual(int(resp.headers[POLICY_INDEX]), + int(POLICIES.default)) + self.assert_(float(resp.headers['x-backend-timestamp']) >= + float(request_method_times['PUT'])) + self.assertEqual(resp.headers['x-backend-put-timestamp'], + request_method_times['PUT']) + self.assertEqual(resp.headers['x-backend-delete-timestamp'], + request_method_times['DELETE']) + self.assertEqual(resp.headers['x-backend-status-changed-at'], + request_method_times['DELETE']) + for header in ('x-container-object-count', + 'x-container-bytes-used', 'x-timestamp', + 'x-put-timestamp'): + self.assertEqual(resp.headers[header], None) def test_HEAD_invalid_partition(self): req = Request.blank('/sda1/./a/c', environ={'REQUEST_METHOD': 'HEAD', @@ -237,6 +343,233 @@ class TestContainerController(unittest.TestCase): resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 404) + def test_PUT_good_policy_specified(self): + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', + headers={'X-Timestamp': normalize_timestamp(1), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + + # now make sure we read it back + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + def 
test_PUT_no_policy_specified(self): + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1)}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + + # now make sure the default was used (pol 1) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.headers.get(POLICY_INDEX), + str(POLICIES.default.idx)) + + def test_PUT_bad_policy_specified(self): + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1), + POLICY_INDEX: 'nada'}) + resp = req.get_response(self.controller) + # make sure we get bad response + self.assertEquals(resp.status_int, 400) + + def test_PUT_no_policy_change(self): + ts = itertools.count(1) + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/c') + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + # make sure we get the right index back + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + # now try to update w/o changing the policy + for method in ('POST', 'PUT'): + req = Request.blank('/sda1/p/a/c', method=method, headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx + }) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int // 100, 2) + # make sure we get the right index back + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + def test_PUT_bad_policy_change(self): + ts = itertools.count(1) + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/c') + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + # make sure we get the right index back + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + other_policies = [p for p in POLICIES if p != policy] + for other_policy in other_policies: + # now try to change it and make sure we get a conflict + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: other_policy.idx + }) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 409) + + # and make sure there is no change! 
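+            # a rejected policy change must leave the container's
+            # original policy index intact for subsequent reads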
+            req = Request.blank('/sda1/p/a/c')
+            resp = req.get_response(self.controller)
+            self.assertEquals(resp.status_int, 204)
+            # make sure we get the right index back
+            self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+
+    def test_POST_ignores_policy_change(self):
+        ts = itertools.count(1)
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+            POLICY_INDEX: policy.idx})
+        resp = req.get_response(self.controller)
+        self.assertEquals(resp.status_int, 201)
+        req = Request.blank('/sda1/p/a/c')
+        resp = req.get_response(self.controller)
+        self.assertEquals(resp.status_int, 204)
+        # make sure we get the right index back
+        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+
+        other_policies = [p for p in POLICIES if p != policy]
+        for other_policy in other_policies:
+            # now try to change it via POST and make sure the new index
+            # is ignored
+            req = Request.blank('/sda1/p/a/c', method='POST', headers={
+                'X-Timestamp': normalize_timestamp(ts.next()),
+                POLICY_INDEX: other_policy.idx
+            })
+            resp = req.get_response(self.controller)
+            # valid request
+            self.assertEquals(resp.status_int // 100, 2)
+
+            # but it does nothing
+            req = Request.blank('/sda1/p/a/c')
+            resp = req.get_response(self.controller)
+            self.assertEquals(resp.status_int, 204)
+            # make sure we get the right index back
+            self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+
+    def test_PUT_no_policy_for_existing_default(self):
+        ts = itertools.count(1)
+        # create a container with the default storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # check the policy index
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
+        # put again without specifying the storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 202)  # sanity check
+
+        # policy index is unchanged
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
+    def test_PUT_proxy_default_no_policy_for_existing_default(self):
+        # make it look like the proxy has a different default than we do, like
+        # during a config change restart across a multi node cluster.
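+        # the proxy's default arrives via X-Backend-Storage-Policy-Default;
+        # an existing container must keep its recorded index regardless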
+        proxy_default = random.choice([p for p in POLICIES if not
+                                       p.is_default])
+        ts = itertools.count(1)
+        # create a container with the default storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+            'X-Backend-Storage-Policy-Default': int(proxy_default),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # check the policy index
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+                         int(proxy_default))
+
+        # put again without proxy specifying the different default
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+            'X-Backend-Storage-Policy-Default': int(POLICIES.default),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 202)  # sanity check
+
+        # policy index is unchanged
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+                         int(proxy_default))
+
+    def test_PUT_no_policy_for_existing_non_default(self):
+        ts = itertools.count(1)
+        non_default_policy = [p for p in POLICIES if not p.is_default][0]
+        # create a container with the non-default storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+            POLICY_INDEX: non_default_policy.idx,
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # check the policy index
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(non_default_policy.idx))
+
+        # put again without specifying the storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 202)  # sanity check
+
+        # policy index is unchanged
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(non_default_policy.idx))
+
     def test_PUT_GET_metadata(self):
         # Set metadata header
         req = Request.blank(
@@ -305,20 +638,20 @@ class TestContainerController(unittest.TestCase):
             req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                                 headers={'X-Timestamp': normalize_timestamp(1),
                                          key: 'Value'})
-        resp = self.controller.PUT(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)
-        req = Request.blank('/sda1/p/a/c')
-        resp = self.controller.GET(req)
+        req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'Value')
         # Set another metadata header, ensuring old one doesn't disappear
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(1),
                                      key2: 'Value2'})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
-        req = Request.blank('/sda1/p/a/c')
-        resp = self.controller.GET(req)
+        req
+        req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'Value')
         self.assertEquals(resp.headers.get(key2.lower()), 'Value2')
@@ -326,10 +659,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'X-Timestamp': normalize_timestamp(3),
                                      key: 'New Value'})
-        resp = self.controller.PUT(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 202)
-        req = Request.blank('/sda1/p/a/c')
-        resp = self.controller.GET(req)
+        req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'New Value')
@@ -337,10 +670,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'X-Timestamp': normalize_timestamp(2),
                                      key: 'Old Value'})
-        resp = self.controller.PUT(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 202)
-        req = Request.blank('/sda1/p/a/c')
-        resp = self.controller.GET(req)
+        req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'New Value')
@@ -348,10 +681,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'X-Timestamp': normalize_timestamp(4),
                                      key: ''})
-        resp = self.controller.PUT(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 202)
-        req = Request.blank('/sda1/p/a/c')
-        resp = self.controller.GET(req)
+        req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assert_(key.lower() not in resp.headers)
@@ -437,26 +770,26 @@ class TestContainerController(unittest.TestCase):
         key = '%sTest' % prefix
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'X-Timestamp': normalize_timestamp(1)})
-        resp = self.controller.PUT(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)
         # Set metadata header
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(1),
                                      key: 'Value'})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'Value')
         # Update metadata header
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(3),
                                      key: 'New Value'})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'New Value')
@@ -464,10 +797,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(2),
                                      key: 'Old Value'})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'New Value')
@@ -475,10 +808,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(4),
                                      key: ''})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assert_(key.lower() not in resp.headers)
@@ -780,11 +1113,22 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank(path, method='GET')
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 404)  # sanity
+        # backend headers
+        expectations = {
+            'x-backend-put-timestamp': normalize_timestamp(1),
+            'x-backend-delete-timestamp': normalize_timestamp(2),
+            'x-backend-status-changed-at': normalize_timestamp(2),
+        }
+        for header, value in expectations.items():
+            self.assertEqual(resp.headers[header], value,
+                             'response header %s was %s not %s' % (
+                                 header, resp.headers[header], value))
         db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
         self.assertEqual(True, db.is_deleted())
         info = db.get_info()
         self.assertEquals(info['put_timestamp'], normalize_timestamp('1'))
         self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
+        self.assertEquals(info['status_changed_at'], normalize_timestamp('2'))
         # recreate
         req = Request.blank(path, method='PUT', headers={'X-Timestamp': '4'})
@@ -795,6 +1139,20 @@ class TestContainerController(unittest.TestCase):
         info = db.get_info()
         self.assertEquals(info['put_timestamp'], normalize_timestamp('4'))
         self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
+        self.assertEquals(info['status_changed_at'], normalize_timestamp('4'))
+        for method in ('GET', 'HEAD'):
+            req = Request.blank(path)
+            resp = req.get_response(self.controller)
+            expectations = {
+                'x-put-timestamp': normalize_timestamp(4),
+                'x-backend-put-timestamp': normalize_timestamp(4),
+                'x-backend-delete-timestamp': normalize_timestamp(2),
+                'x-backend-status-changed-at': normalize_timestamp(4),
+            }
+            for header, expected in expectations.items():
+                self.assertEqual(resp.headers[header], expected,
+                                 'header %s was %s, not the expected %s' % (
+                                     header, resp.headers[header], expected))

     def test_DELETE_PUT_recreate_replication_race(self):
         path = '/sda1/p/a/c'
@@ -862,6 +1220,71 @@ class TestContainerController(unittest.TestCase):
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 404)

+    def test_change_storage_policy_via_DELETE_then_PUT(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        policy = random.choice(list(POLICIES))
+        req = Request.blank(
+            '/sda1/p/a/c', method='PUT',
+            headers={'X-Timestamp': ts.next(),
+                     POLICY_INDEX: policy.idx})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # try to re-create with other policies
+        other_policies = [p for p in POLICIES if p != policy]
+        for other_policy in other_policies:
+            # first delete the existing container
+            req = Request.blank('/sda1/p/a/c', method='DELETE', headers={
+                'X-Timestamp': ts.next()})
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.status_int, 204)  # sanity check
+
+            # at this point, the DB should still exist but be in a deleted
+            # state, so changing the policy index is perfectly acceptable
+            req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+                'X-Timestamp': ts.next(),
+                POLICY_INDEX: other_policy.idx})
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.status_int, 201)  # sanity check
+
+            req = Request.blank(
+                '/sda1/p/a/c', method='HEAD')
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.headers[POLICY_INDEX],
+                             str(other_policy.idx))
+
+    def test_change_to_default_storage_policy_via_DELETE_then_PUT(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        non_default_policy = random.choice([p for p in POLICIES
+                                            if not p.is_default])
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': ts.next(),
+            POLICY_INDEX: non_default_policy.idx,
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        req = Request.blank(
+            '/sda1/p/a/c', method='DELETE',
+            headers={'X-Timestamp': ts.next()})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)  # sanity check
+
+        # at this point, the DB should still exist but be in a deleted state,
+        # so changing the policy index is perfectly acceptable
+        req = Request.blank(
+            '/sda1/p/a/c', method='PUT',
+            headers={'X-Timestamp': ts.next()})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
     def test_DELETE_object(self):
         req = Request.blank(
             '/sda1/p/a/c',
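The two tests above pin down the only supported way to change a container's
policy: the index is fixed at creation time, so a client has to delete the
container and create it again. As a rough client-side sketch of that flow
(illustrative only, not part of this patch; python-swiftclient, the endpoint,
the credentials, and the 'silver'/'gold' policy names are all assumptions):

    # Sketch: the DELETE-then-PUT policy change exercised by the tests
    # above, seen from a client.  Endpoint, credentials and policy
    # names are placeholders.
    from swiftclient.client import Connection

    conn = Connection(authurl='http://127.0.0.1:8080/auth/v1.0',
                      user='test:tester', key='testing')
    conn.put_container('mycontainer',
                       headers={'X-Storage-Policy': 'silver'})
    # The policy index is only settable at creation time, so switching
    # policies means deleting and re-creating the container.  Any
    # objects must be copied elsewhere first; DELETE only succeeds on
    # an empty container.
    conn.delete_container('mycontainer')
    conn.put_container('mycontainer',
                       headers={'X-Storage-Policy': 'gold'})
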
diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py
index 5c32dfa3b0..8c15b93ce8 100644
--- a/test/unit/container/test_updater.py
+++ b/test/unit/container/test_updater.py
@@ -91,7 +91,7 @@ class TestContainerUpdater(unittest.TestCase):
         os.mkdir(subdir)
         cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
                              container='c')
-        cb.initialize(normalize_timestamp(1))
+        cb.initialize(normalize_timestamp(1), 0)
         cu.run_once()
         info = cb.get_info()
         self.assertEquals(info['object_count'], 0)
@@ -172,7 +172,7 @@ class TestContainerUpdater(unittest.TestCase):
         os.mkdir(subdir)
         cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
                              container='\xce\xa9')
-        cb.initialize(normalize_timestamp(1))
+        cb.initialize(normalize_timestamp(1), 0)
         cb.put_object('\xce\xa9', normalize_timestamp(2), 3, 'text/plain',
                       '68b329da9893e34099c7d8ad5cb9c940')
diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py
index 7c8ecf7075..eda000fe98 100644
--- a/test/unit/proxy/controllers/test_container.py
+++ b/test/unit/proxy/controllers/test_container.py
@@ -20,11 +20,17 @@ from swift.common.swob import Request
 from swift.proxy import server as proxy_server
 from swift.proxy.controllers.base import headers_to_container_info
 from test.unit import fake_http_connect, FakeRing, FakeMemcache
+from swift.common.storage_policy import StoragePolicy
 from swift.common.request_helpers import get_sys_meta_prefix
+from test.unit import patch_policies
+from test.unit.common.ring.test_ring import TestRingBase

-class TestContainerController(unittest.TestCase):
+
+@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
+class TestContainerController(TestRingBase):
     def setUp(self):
+        TestRingBase.setUp(self)
         self.app = proxy_server.Application(None, FakeMemcache(),
                                             account_ring=FakeRing(),
                                             container_ring=FakeRing(),
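The updater tests above pick up the new broker initialization signature: a
container broker now takes a storage policy index after the put timestamp.
A minimal sketch of direct broker usage under that assumption (the db path
is a placeholder, and policy index 0 is assumed to be configured):

    # Sketch: initializing a container broker with an explicit storage
    # policy index, matching the new signature used in the tests above.
    from swift.container.backend import ContainerBroker
    from swift.common.utils import normalize_timestamp

    broker = ContainerBroker('/tmp/hash.db', account='a', container='c')
    broker.initialize(normalize_timestamp(1), 0)  # policy index 0
    info = broker.get_info()
    # get_info now also reports status_changed_at alongside the put and
    # delete timestamps.
    print(info['status_changed_at'])
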
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 1e47ef9564..41654a0d55 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -27,6 +27,7 @@ from tempfile import mkdtemp
 import weakref
 import operator
 import re
+import random
 import mock
 from eventlet import sleep, spawn, wsgi, listen
@@ -34,7 +35,8 @@ import simplejson

 from test.unit import (
     connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
-    FakeMemcache, debug_logger, patch_policies, write_fake_ring)
+    FakeMemcache, debug_logger, patch_policies, write_fake_ring,
+    mocked_http_conn)
 from swift.proxy import server as proxy_server
 from swift.account import server as account_server
 from swift.container import server as container_server
@@ -49,10 +51,11 @@ from swift.proxy.controllers import base as proxy_base
 from swift.proxy.controllers.base import get_container_memcache_key, \
     get_account_memcache_key, cors_validation
 import swift.proxy.controllers
-from swift.common.request_helpers import get_sys_meta_prefix
-from swift.common.storage_policy import StoragePolicy
 from swift.common.swob import Request, Response, HTTPUnauthorized, \
     HTTPException
+from swift.common.storage_policy import StoragePolicy, \
+    POLICIES, POLICY, POLICY_INDEX
+from swift.common.request_helpers import get_sys_meta_prefix

 # mocks
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@@ -4170,6 +4173,11 @@ class TestObjectController(unittest.TestCase):
         ])


+@patch_policies([
+    StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
+    StoragePolicy(1, 'one', False, object_ring=FakeRing()),
+    StoragePolicy(2, 'two', False, True, object_ring=FakeRing())
+])
 class TestContainerController(unittest.TestCase):
     "Test swift.proxy_server.ContainerController"

@@ -4178,7 +4186,76 @@ class TestContainerController(unittest.TestCase):
                                             account_ring=FakeRing(),
                                             container_ring=FakeRing(),
                                             object_ring=FakeRing(),
-                                            logger=FakeLogger())
+                                            logger=debug_logger())
+
+    def test_convert_policy_to_index(self):
+        controller = swift.proxy.controllers.ContainerController(self.app,
+                                                                 'a', 'c')
+        expected = {
+            'zero': 0,
+            'ZeRo': 0,
+            'one': 1,
+            'OnE': 1,
+        }
+        for name, index in expected.items():
+            req = Request.blank('/a/c', headers={'Content-Length': '0',
+                                                 'Content-Type': 'text/plain',
+                                                 POLICY: name})
+            self.assertEqual(controller._convert_policy_to_index(req), index)
+        # default test
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                                             'Content-Type': 'text/plain'})
+        self.assertEqual(controller._convert_policy_to_index(req), None)
+        # negative test
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                                             'Content-Type': 'text/plain',
+                                             POLICY: 'nada'})
+        self.assertRaises(HTTPException, controller._convert_policy_to_index,
+                          req)
+        # storage policy two is deprecated
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                                             'Content-Type': 'text/plain',
+                                             POLICY: 'two'})
+        self.assertRaises(HTTPException, controller._convert_policy_to_index,
+                          req)
+
+    def test_convert_index_to_name(self):
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 200,
+                headers={POLICY_INDEX: int(policy)},
+        ) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(resp.headers[POLICY], policy.name)
+
+    def test_no_convert_index_to_name_when_container_not_found(self):
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 404, 404, 404,
+                headers={POLICY_INDEX: int(policy)}) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 404)
+        self.assertEqual(resp.headers[POLICY], None)
+
+    def test_error_convert_index_to_name(self):
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 200,
+                headers={POLICY_INDEX: '-1'}) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(resp.headers[POLICY], None)
+        error_lines = self.app.logger.get_lines_for_level('error')
+        self.assertEqual(2, len(error_lines))
+        for msg in error_lines:
+            expected = "Could not translate " \
+                "X-Backend-Storage-Policy-Index ('-1')"
+            self.assertTrue(expected in msg)

     def test_transfer_headers(self):
         src_headers = {'x-remove-versions-location': 'x',
@@ -4287,6 +4364,59 @@ class TestContainerController(unittest.TestCase):
             self.app.account_autocreate = True
             test_status_map((404, 404, 404), 404, None, 404)

+    def test_PUT_policy_headers(self):
+        backend_requests = []
+
+        def capture_requests(ipaddr, port, device, partition, method,
+                             path, headers=None, query_string=None):
+            if method == 'PUT':
+                backend_requests.append(headers)
+
+        def test_policy(requested_policy):
+            with save_globals():
+                mock_conn = set_http_connect(200, 201, 201, 201,
+                                             give_connect=capture_requests)
+                self.app.memcache.store = {}
+                req = Request.blank('/v1/a/test', method='PUT',
+                                    headers={'Content-Length': 0})
+                if requested_policy:
+                    expected_policy = requested_policy
+                    req.headers[POLICY] = policy.name
+                else:
+                    expected_policy = POLICIES.default
+                res = req.get_response(self.app)
+                if expected_policy.is_deprecated:
+                    self.assertEquals(res.status_int, 400)
+                    self.assertEqual(0, len(backend_requests))
+                    expected = 'is deprecated'
+                    self.assertTrue(expected in res.body,
+                                    '%r did not include %r' % (
+                                        res.body, expected))
+                    return
+                self.assertEquals(res.status_int, 201)
+                self.assertEqual(
+                    expected_policy.object_ring.replicas,
+                    len(backend_requests))
+                for headers in backend_requests:
+                    if not requested_policy:
+                        self.assertFalse(POLICY_INDEX in headers)
+                        self.assertTrue(
+                            'X-Backend-Storage-Policy-Default' in headers)
+                        self.assertEqual(
+                            int(expected_policy),
+                            int(headers['X-Backend-Storage-Policy-Default']))
+                    else:
+                        self.assertTrue(POLICY_INDEX in headers)
+                        self.assertEqual(int(headers[POLICY_INDEX]),
+                                         policy.idx)
+                # make sure all mocked responses are consumed
+                self.assertRaises(StopIteration, mock_conn.code_iter.next)
+
+        test_policy(None)  # no policy header
+        for policy in POLICIES:
+            backend_requests = []  # reset backend requests
+            test_policy(policy)
+
     def test_PUT(self):
         with save_globals():
             controller = proxy_server.ContainerController(self.app, 'account',
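The proxy tests above pin down the header translation in both directions:
clients name policies via X-Storage-Policy, the proxy converts the name to an
X-Backend-Storage-Policy-Index header for the backends on PUT (rejecting
deprecated names with a 400 before any backend request is made), and converts
the index back to a name on GET/HEAD responses. A client-side sketch of the
round trip (illustrative only; python-swiftclient, the endpoint, the
credentials, and the 'gold' policy name are assumptions):

    # Sketch: the name-to-index-and-back translation as a client sees it.
    from swiftclient.client import Connection

    conn = Connection(authurl='http://127.0.0.1:8080/auth/v1.0',
                      user='test:tester', key='testing')
    conn.put_container('mycontainer',
                       headers={'X-Storage-Policy': 'gold'})
    headers = conn.head_container('mycontainer')
    # swiftclient lower-cases response header names
    print(headers.get('x-storage-policy'))  # expected: 'gold'
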
@@ -5930,6 +6060,7 @@ class TestProxyObjectPerformance(unittest.TestCase):

 @patch_policies([StoragePolicy(0, 'migrated'),
                  StoragePolicy(1, 'ernie', True),
+                 StoragePolicy(2, 'deprecated', is_deprecated=True),
                  StoragePolicy(3, 'bert')])
 class TestSwiftInfo(unittest.TestCase):
     def setUp(self):
@@ -5971,6 +6102,8 @@ class TestSwiftInfo(unittest.TestCase):
         self.assertTrue('policies' in si)
         sorted_pols = sorted(si['policies'], key=operator.itemgetter('name'))
         self.assertEqual(len(sorted_pols), 3)
+        for policy in sorted_pols:
+            self.assertNotEquals(policy['name'], 'deprecated')
         self.assertEqual(sorted_pols[0]['name'], 'bert')
         self.assertEqual(sorted_pols[1]['name'], 'ernie')
         self.assertEqual(sorted_pols[2]['name'], 'migrated')
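The TestSwiftInfo changes assert that deprecated policies are filtered out of
the cluster's advertised capabilities. A sketch of checking the same thing
against a running proxy, assuming the standard /info endpoint (the URL is a
placeholder and the JSON layout is an assumption based on the swift-info
registration exercised by the test):

    # Sketch: /info should list only non-deprecated policies, so a name
    # like 'deprecated' from the decorator above must not appear here.
    import json
    import urllib2

    info = json.loads(
        urllib2.urlopen('http://127.0.0.1:8080/info').read())
    for policy in info['swift']['policies']:
        print(policy['name'])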