diff --git a/swift/account/backend.py b/swift/account/backend.py
index f29e64d93b..1a940c49ec 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -37,7 +37,7 @@ class AccountBroker(DatabaseBroker):
     db_contains_type = 'container'
     db_reclaim_timestamp = 'delete_timestamp'
 
-    def _initialize(self, conn, put_timestamp):
+    def _initialize(self, conn, put_timestamp, **kwargs):
         """
         Create a brand new account database (tables, indices, triggers,
         etc.)
diff --git a/swift/common/db.py b/swift/common/db.py
index e3346bea3e..e6812d86cd 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -215,11 +215,15 @@ class DatabaseBroker(object):
         """
         return self.db_file
 
-    def initialize(self, put_timestamp=None):
+    def initialize(self, put_timestamp=None, storage_policy_index=None):
         """
         Create the DB
 
+        The storage_policy_index is passed through to the subclass's
+        ``_initialize`` method.  It is ignored by ``AccountBroker``.
+
         :param put_timestamp: timestamp of initial PUT request
+        :param storage_policy_index: only required for containers
         """
         if self.db_file == ':memory:':
             tmp_db_file = None
@@ -277,7 +281,8 @@ class DatabaseBroker(object):
             """)
             if not put_timestamp:
                 put_timestamp = normalize_timestamp(0)
-            self._initialize(conn, put_timestamp)
+            self._initialize(conn, put_timestamp,
+                             storage_policy_index=storage_policy_index)
             conn.commit()
             if tmp_db_file:
                 conn.close()
@@ -428,6 +433,7 @@ class DatabaseBroker(object):
         :param put_timestamp: put timestamp
         :param delete_timestamp: delete timestamp
         """
+        current_status = self.is_deleted()
         with self.get() as conn:
             conn.execute('''
                 UPDATE %s_stat SET created_at=MIN(?, created_at),
@@ -435,6 +441,9 @@
                 delete_timestamp=MAX(?, delete_timestamp)
             ''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
             conn.commit()
+        if self.is_deleted() != current_status:
+            timestamp = normalize_timestamp(time.time())
+            self.update_status_changed_at(timestamp)
 
     def get_items_since(self, start, count):
         """
@@ -486,32 +495,36 @@ class DatabaseBroker(object):
             result.append({'remote_id': row[0], 'sync_point': row[1]})
         return result
 
+    def get_max_row(self):
+        query = '''
+            SELECT SQLITE_SEQUENCE.seq
+            FROM SQLITE_SEQUENCE
+            WHERE SQLITE_SEQUENCE.name == '%s'
+            LIMIT 1
+        ''' % (self.db_contains_type)
+        with self.get() as conn:
+            row = conn.execute(query).fetchone()
+        return row[0] if row else -1
+
    def get_replication_info(self):
        """
        Get information about the DB required for replication.

-        :returns: dict containing keys: hash, id, created_at, put_timestamp,
-            delete_timestamp, count, max_row, and metadata
+        :returns: dict containing keys from get_info plus max_row and metadata
+
+        Note: get_info's <db_contains_type>_count is translated to just
+            "count" and metadata is the raw string.
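+
+        For a container broker, for example, the result carries all of the
+        container_stat columns plus "count", "max_row" and the raw
+        "metadata" JSON string.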
""" + info = self.get_info() + info['count'] = info.pop('%s_count' % self.db_contains_type) + info['metadata'] = self.get_raw_metadata() + info['max_row'] = self.get_max_row() + return info + + def get_info(self): self._commit_puts_stale_ok() - query_part1 = ''' - SELECT hash, id, created_at, put_timestamp, delete_timestamp, - %s_count AS count, - CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL - THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row, ''' % \ - self.db_contains_type - query_part2 = ''' - FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE - ON SQLITE_SEQUENCE.name == '%s') LIMIT 1 - ''' % (self.db_type, self.db_contains_type) with self.get() as conn: - try: - curs = conn.execute(query_part1 + 'metadata' + query_part2) - except sqlite3.OperationalError as err: - if 'no such column: metadata' not in str(err): - raise - curs = conn.execute(query_part1 + "'' as metadata" + - query_part2) + curs = conn.execute('SELECT * from %s_stat' % self.db_type) curs.row_factory = dict_factory return curs.fetchone() @@ -621,13 +634,7 @@ class DatabaseBroker(object): with open(self.db_file, 'rb+') as fp: fallocate(fp.fileno(), int(prealloc_size)) - @property - def metadata(self): - """ - Returns the metadata dict for the database. The metadata dict values - are tuples of (value, timestamp) where the timestamp indicates when - that key was set to that value. - """ + def get_raw_metadata(self): with self.get() as conn: try: metadata = conn.execute('SELECT metadata FROM %s_stat' % @@ -636,6 +643,16 @@ class DatabaseBroker(object): if 'no such column: metadata' not in str(err): raise metadata = '' + return metadata + + @property + def metadata(self): + """ + Returns the metadata dict for the database. The metadata dict values + are tuples of (value, timestamp) where the timestamp indicates when + that key was set to that value. + """ + metadata = self.get_raw_metadata() if metadata: metadata = json.loads(metadata) utf8encodekeys(metadata) @@ -758,3 +775,16 @@ class DatabaseBroker(object): ' WHERE put_timestamp < ?' % self.db_type, (timestamp, timestamp)) conn.commit() + + def update_status_changed_at(self, timestamp): + """ + Update the status_changed_at field in the stat table. Only + modifies status_changed_at if the timestamp is greater than the + current status_changed_at timestamp. + """ + with self.get() as conn: + conn.execute( + 'UPDATE %s_stat SET status_changed_at = ?' + ' WHERE status_changed_at < ?' 
% self.db_type, + (timestamp, timestamp)) + conn.commit() diff --git a/swift/container/auditor.py b/swift/container/auditor.py index 104cc4889f..c7d3aec893 100644 --- a/swift/container/auditor.py +++ b/swift/container/auditor.py @@ -30,9 +30,9 @@ from swift.common.daemon import Daemon class ContainerAuditor(Daemon): """Audit containers.""" - def __init__(self, conf): + def __init__(self, conf, logger=None): self.conf = conf - self.logger = get_logger(conf, log_route='container-auditor') + self.logger = logger or get_logger(conf, log_route='container-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.interval = int(conf.get('interval', 1800)) diff --git a/swift/container/backend.py b/swift/container/backend.py index 38cdaac89e..584cb4ef4f 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -30,6 +30,109 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \ DATADIR = 'containers' +POLICY_STAT_TABLE_CREATE = ''' + CREATE TABLE policy_stat ( + storage_policy_index INTEGER PRIMARY KEY, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0 + ); +''' + +POLICY_STAT_TRIGGER_SCRIPT = ''' + CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object + BEGIN + UPDATE policy_stat + SET object_count = object_count + (1 - new.deleted), + bytes_used = bytes_used + new.size + WHERE storage_policy_index = new.storage_policy_index; + INSERT INTO policy_stat ( + storage_policy_index, object_count, bytes_used) + SELECT new.storage_policy_index, + (1 - new.deleted), + new.size + WHERE NOT EXISTS( + SELECT changes() as change + FROM policy_stat + WHERE change <> 0 + ); + UPDATE container_info + SET hash = chexor(hash, new.name, new.created_at); + END; + + CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object + BEGIN + UPDATE policy_stat + SET object_count = object_count - (1 - old.deleted), + bytes_used = bytes_used - old.size + WHERE storage_policy_index = old.storage_policy_index; + UPDATE container_info + SET hash = chexor(hash, old.name, old.created_at); + END; +''' + +CONTAINER_INFO_TABLE_SCRIPT = ''' + CREATE TABLE container_info ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '', + x_container_sync_point1 INTEGER DEFAULT -1, + x_container_sync_point2 INTEGER DEFAULT -1, + storage_policy_index INTEGER DEFAULT 0, + reconciler_sync_point INTEGER DEFAULT -1 + ); +''' + +CONTAINER_STAT_VIEW_SCRIPT = ''' + CREATE VIEW container_stat + AS SELECT ci.account, ci.container, ci.created_at, + ci.put_timestamp, ci.delete_timestamp, + ci.reported_put_timestamp, ci.reported_delete_timestamp, + ci.reported_object_count, ci.reported_bytes_used, ci.hash, + ci.id, ci.status, ci.status_changed_at, ci.metadata, + ci.x_container_sync_point1, ci.x_container_sync_point2, + ci.reconciler_sync_point, + ci.storage_policy_index, + coalesce(ps.object_count, 0) AS object_count, + coalesce(ps.bytes_used, 0) AS bytes_used + FROM container_info ci LEFT JOIN policy_stat ps + ON ci.storage_policy_index = ps.storage_policy_index; + + CREATE TRIGGER container_stat_update + INSTEAD OF UPDATE ON 
container_stat + BEGIN + UPDATE container_info + SET account = NEW.account, + container = NEW.container, + created_at = NEW.created_at, + put_timestamp = NEW.put_timestamp, + delete_timestamp = NEW.delete_timestamp, + reported_put_timestamp = NEW.reported_put_timestamp, + reported_delete_timestamp = NEW.reported_delete_timestamp, + reported_object_count = NEW.reported_object_count, + reported_bytes_used = NEW.reported_bytes_used, + hash = NEW.hash, + id = NEW.id, + status = NEW.status, + status_changed_at = NEW.status_changed_at, + metadata = NEW.metadata, + x_container_sync_point1 = NEW.x_container_sync_point1, + x_container_sync_point2 = NEW.x_container_sync_point2, + storage_policy_index = NEW.storage_policy_index, + reconciler_sync_point = NEW.reconciler_sync_point; + END; +''' + class ContainerBroker(DatabaseBroker): """Encapsulates working with a container database.""" @@ -37,7 +140,14 @@ class ContainerBroker(DatabaseBroker): db_contains_type = 'object' db_reclaim_timestamp = 'created_at' - def _initialize(self, conn, put_timestamp): + @property + def storage_policy_index(self): + if not hasattr(self, '_storage_policy_index'): + self._storage_policy_index = \ + self.get_info()['storage_policy_index'] + return self._storage_policy_index + + def _initialize(self, conn, put_timestamp, storage_policy_index): """ Create a brand new container database (tables, indices, triggers, etc.) """ @@ -48,7 +158,9 @@ class ContainerBroker(DatabaseBroker): raise ValueError( 'Attempting to create a new database with no container set') self.create_object_table(conn) - self.create_container_stat_table(conn, put_timestamp) + self.create_policy_stat_table(conn, storage_policy_index) + self.create_container_info_table(conn, put_timestamp, + storage_policy_index) def create_object_table(self, conn): """ @@ -65,74 +177,70 @@ class ContainerBroker(DatabaseBroker): size INTEGER, content_type TEXT, etag TEXT, - deleted INTEGER DEFAULT 0 + deleted INTEGER DEFAULT 0, + storage_policy_index INTEGER DEFAULT 0 ); CREATE INDEX ix_object_deleted_name ON object (deleted, name); - CREATE TRIGGER object_insert AFTER INSERT ON object - BEGIN - UPDATE container_stat - SET object_count = object_count + (1 - new.deleted), - bytes_used = bytes_used + new.size, - hash = chexor(hash, new.name, new.created_at); - END; - CREATE TRIGGER object_update BEFORE UPDATE ON object BEGIN SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); END; - CREATE TRIGGER object_delete AFTER DELETE ON object - BEGIN - UPDATE container_stat - SET object_count = object_count - (1 - old.deleted), - bytes_used = bytes_used - old.size, - hash = chexor(hash, old.name, old.created_at); - END; - """) + """ + POLICY_STAT_TRIGGER_SCRIPT) - def create_container_stat_table(self, conn, put_timestamp=None): + def create_container_info_table(self, conn, put_timestamp, + storage_policy_index): """ - Create the container_stat table which is specific to the container DB. + Create the container_info table which is specific to the container DB. Not a part of Pluggable Back-ends, internal to the baseline code. + Also creates the container_stat view. 
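+        The view joins container_info with policy_stat so that readers of
+        the legacy container_stat schema still see object_count and
+        bytes_used.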
:param conn: DB connection object :param put_timestamp: put timestamp + :param storage_policy_index: storage policy index """ if put_timestamp is None: put_timestamp = normalize_timestamp(0) - conn.executescript(""" - CREATE TABLE container_stat ( - account TEXT, - container TEXT, - created_at TEXT, - put_timestamp TEXT DEFAULT '0', - delete_timestamp TEXT DEFAULT '0', - object_count INTEGER, - bytes_used INTEGER, - reported_put_timestamp TEXT DEFAULT '0', - reported_delete_timestamp TEXT DEFAULT '0', - reported_object_count INTEGER DEFAULT 0, - reported_bytes_used INTEGER DEFAULT 0, - hash TEXT default '00000000000000000000000000000000', - id TEXT, - status TEXT DEFAULT '', - status_changed_at TEXT DEFAULT '0', - metadata TEXT DEFAULT '', - x_container_sync_point1 INTEGER DEFAULT -1, - x_container_sync_point2 INTEGER DEFAULT -1 - ); + # The container_stat view is for compatibility; old versions of Swift + # expected a container_stat table with columns "object_count" and + # "bytes_used", but when that stuff became per-storage-policy and + # moved to the policy_stat table, we stopped creating those columns in + # container_stat. + # + # To retain compatibility, we create the container_stat view with some + # triggers to make it behave like the old container_stat table. This + # way, if an old version of Swift encounters a database with the new + # schema, it can still work. + # + # Note that this can occur during a rolling Swift upgrade if a DB gets + # rsynced from an old node to a new, so it's necessary for + # availability during upgrades. The fact that it enables downgrades is + # a nice bonus. + conn.executescript(CONTAINER_INFO_TABLE_SCRIPT + + CONTAINER_STAT_VIEW_SCRIPT) + conn.execute(""" + INSERT INTO container_info (account, container, created_at, id, + put_timestamp, status_changed_at, storage_policy_index) + VALUES (?, ?, ?, ?, ?, ?, ?); + """, (self.account, self.container, normalize_timestamp(time.time()), + str(uuid4()), put_timestamp, put_timestamp, + storage_policy_index)) - INSERT INTO container_stat (object_count, bytes_used) - VALUES (0, 0); - """) - conn.execute(''' - UPDATE container_stat - SET account = ?, container = ?, created_at = ?, id = ?, - put_timestamp = ? - ''', (self.account, self.container, normalize_timestamp(time.time()), - str(uuid4()), put_timestamp)) + def create_policy_stat_table(self, conn, storage_policy_index=0): + """ + Create policy_stat table. + + :param conn: DB connection object + :param storage_policy_index: the policy_index the container is + being created with + """ + conn.executescript(POLICY_STAT_TABLE_CREATE) + conn.execute(""" + INSERT INTO policy_stat (storage_policy_index) + VALUES (?) 
+ """, (storage_policy_index,)) def get_db_version(self, conn): if self._db_version == -1: @@ -165,14 +273,19 @@ class ContainerBroker(DatabaseBroker): def _commit_puts_load(self, item_list, entry): """See :func:`swift.common.db.DatabaseBroker._commit_puts_load`""" - (name, timestamp, size, content_type, etag, deleted) = \ - pickle.loads(entry.decode('base64')) + data = pickle.loads(entry.decode('base64')) + (name, timestamp, size, content_type, etag, deleted) = data[:6] + if len(data) > 6: + storage_policy_index = data[6] + else: + storage_policy_index = 0 item_list.append({'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, - 'deleted': deleted}) + 'deleted': deleted, + 'storage_policy_index': storage_policy_index}) def empty(self): """ @@ -182,20 +295,30 @@ class ContainerBroker(DatabaseBroker): """ self._commit_puts_stale_ok() with self.get() as conn: - row = conn.execute( - 'SELECT object_count from container_stat').fetchone() + try: + row = conn.execute( + 'SELECT max(object_count) from policy_stat').fetchone() + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + row = conn.execute( + 'SELECT object_count from container_stat').fetchone() return (row[0] == 0) - def delete_object(self, name, timestamp): + def delete_object(self, name, timestamp, storage_policy_index=0): """ Mark an object deleted. :param name: object name to be deleted :param timestamp: timestamp when the object was marked as deleted """ - self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1) + self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', + deleted=1, storage_policy_index=storage_policy_index) - def put_object(self, name, timestamp, size, content_type, etag, deleted=0): + def put_object(self, name, timestamp, size, content_type, etag, deleted=0, + storage_policy_index=0): """ Creates an object in the DB with its metadata. @@ -206,10 +329,12 @@ class ContainerBroker(DatabaseBroker): :param etag: object etag :param deleted: if True, marks the object as deleted and sets the deteleted_at timestamp to timestamp + :param storage_policy_index: the storage policy index for the object """ record = {'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, - 'deleted': deleted} + 'deleted': deleted, + 'storage_policy_index': storage_policy_index} if self.db_file == ':memory:': self.merge_items([record]) return @@ -231,93 +356,101 @@ class ContainerBroker(DatabaseBroker): # delimiter fp.write(':') fp.write(pickle.dumps( - (name, timestamp, size, content_type, etag, deleted), + (name, timestamp, size, content_type, etag, deleted, + storage_policy_index), protocol=PICKLE_PROTOCOL).encode('base64')) fp.flush() - def is_deleted(self, timestamp=None): + def is_deleted(self, **kwargs): """ Check if the DB is considered to be deleted. :returns: True if the DB is considered to be deleted, False otherwise """ + _info, is_deleted = self.get_info_is_deleted(**kwargs) + return is_deleted + + def get_info_is_deleted(self, timestamp=None): + """ + Get the is_deleted status and info for the container. + + :returns: a tuple, in the form (info, is_deleted) info is a dict as + returned by get_info and is_deleted is a boolean. 
+ """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): - return True - self._commit_puts_stale_ok() - with self.get() as conn: - row = conn.execute(''' - SELECT put_timestamp, delete_timestamp, object_count - FROM container_stat''').fetchone() - # leave this db as a tombstone for a consistency window - if timestamp and row['delete_timestamp'] > timestamp: - return False - # The container is considered deleted if the delete_timestamp - # value is greater than the put_timestamp, and there are no - # objects in the container. - return (row['object_count'] in (None, '', 0, '0')) and \ - (float(row['delete_timestamp']) > float(row['put_timestamp'])) + return {}, True + info = self.get_info() + # leave this db as a tombstone for a consistency window + if timestamp and info['delete_timestamp'] > timestamp: + return info, False + # The container is considered deleted if the delete_timestamp + # value is greater than the put_timestamp, and there are no + # objects in the container. + is_deleted = (info['object_count'] in (None, '', 0, '0')) and \ + (float(info['delete_timestamp']) > float(info['put_timestamp'])) + return info, is_deleted def get_info(self): """ Get global data for the container. :returns: dict with keys: account, container, created_at, - put_timestamp, delete_timestamp, object_count, bytes_used, - reported_put_timestamp, reported_delete_timestamp, - reported_object_count, reported_bytes_used, hash, id, - x_container_sync_point1, and x_container_sync_point2. + put_timestamp, delete_timestamp, status_changed_at, + object_count, bytes_used, reported_put_timestamp, + reported_delete_timestamp, reported_object_count, + reported_bytes_used, hash, id, x_container_sync_point1, + x_container_sync_point2, and storage_policy_index. """ self._commit_puts_stale_ok() with self.get() as conn: data = None - trailing = 'x_container_sync_point1, x_container_sync_point2' + trailing_sync = 'x_container_sync_point1, x_container_sync_point2' + trailing_pol = 'storage_policy_index' + errors = set() while not data: try: - data = conn.execute(''' + data = conn.execute((''' SELECT account, container, created_at, put_timestamp, - delete_timestamp, object_count, bytes_used, + delete_timestamp, status_changed_at, + object_count, bytes_used, reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, - id, %s - FROM container_stat - ''' % (trailing,)).fetchone() + id, %s, %s + FROM container_stat + ''') % (trailing_sync, trailing_pol)).fetchone() except sqlite3.OperationalError as err: - if 'no such column: x_container_sync_point' in str(err): - trailing = '-1 AS x_container_sync_point1, ' \ - '-1 AS x_container_sync_point2' + err_msg = str(err) + if err_msg in errors: + # only attempt migration once + raise + errors.add(err_msg) + if 'no such column: storage_policy_index' in err_msg: + trailing_pol = '0 AS storage_policy_index' + elif 'no such column: x_container_sync_point' in err_msg: + trailing_sync = '-1 AS x_container_sync_point1, ' \ + '-1 AS x_container_sync_point2' else: raise data = dict(data) + # populate instance cache + self._storage_policy_index = data['storage_policy_index'] + self.account = data['account'] + self.container = data['container'] return data def set_x_container_sync_points(self, sync_point1, sync_point2): with self.get() as conn: - orig_isolation_level = conn.isolation_level try: - # We turn off auto-transactions to ensure the alter table - # commands are part of the transaction. 
- conn.isolation_level = None - conn.execute('BEGIN') - try: - self._set_x_container_sync_points(conn, sync_point1, - sync_point2) - except sqlite3.OperationalError as err: - if 'no such column: x_container_sync_point' not in \ - str(err): - raise - conn.execute(''' - ALTER TABLE container_stat - ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1 - ''') - conn.execute(''' - ALTER TABLE container_stat - ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1 - ''') - self._set_x_container_sync_points(conn, sync_point1, - sync_point2) - conn.execute('COMMIT') - finally: - conn.isolation_level = orig_isolation_level + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + except sqlite3.OperationalError as err: + if 'no such column: x_container_sync_point' not in \ + str(err): + raise + self._migrate_add_container_sync_points(conn) + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + conn.commit() def _set_x_container_sync_points(self, conn, sync_point1, sync_point2): if sync_point1 is not None and sync_point2 is not None: @@ -337,6 +470,79 @@ class ContainerBroker(DatabaseBroker): SET x_container_sync_point2 = ? ''', (sync_point2,)) + def get_policy_stats(self): + with self.get() as conn: + try: + info = conn.execute(''' + SELECT storage_policy_index, object_count, bytes_used + FROM policy_stat + ''').fetchall() + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + info = conn.execute(''' + SELECT 0 as storage_policy_index, object_count, bytes_used + FROM container_stat + ''').fetchall() + policy_stats = {} + for row in info: + stats = dict(row) + key = stats.pop('storage_policy_index') + policy_stats[key] = stats + return policy_stats + + def has_multiple_policies(self): + with self.get() as conn: + try: + curs = conn.execute(''' + SELECT count(storage_policy_index) + FROM policy_stat + ''').fetchone() + except sqlite3.OperationalError as err: + if 'no such table: policy_stat' not in str(err): + raise + # no policy_stat row + return False + if curs and curs[0] > 1: + return True + # only one policy_stat row + return False + + def set_storage_policy_index(self, policy_index, timestamp=None): + """ + Update the container_stat policy_index and status_changed_at. + """ + if timestamp is None: + timestamp = normalize_timestamp(time.time()) + + def _setit(conn): + conn.execute(''' + INSERT OR IGNORE INTO policy_stat (storage_policy_index) + VALUES (?) + ''', (policy_index,)) + conn.execute(''' + UPDATE container_stat + SET storage_policy_index = ?, + status_changed_at = MAX(?, status_changed_at) + WHERE storage_policy_index <> ? + ''', (policy_index, timestamp, policy_index)) + conn.commit() + + with self.get() as conn: + try: + _setit(conn) + except sqlite3.OperationalError as err: + if not any(msg in str(err) for msg in ( + "no such column: storage_policy_index", + "no such table: policy_stat")): + raise + self._migrate_add_storage_policy(conn) + _setit(conn) + + self._storage_policy_index = policy_index + def reported(self, put_timestamp, delete_timestamp, object_count, bytes_used): """ @@ -356,7 +562,7 @@ class ContainerBroker(DatabaseBroker): conn.commit() def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, - path=None): + path=None, storage_policy_index=0): """ Get a list of objects sorted by name starting at marker onward, up to limit entries. 
Entries will begin with the prefix and will not @@ -409,9 +615,27 @@ class ContainerBroker(DatabaseBroker): query += ' +deleted = 0' else: query += ' deleted = 0' - query += ' ORDER BY name LIMIT ?' - query_args.append(limit - len(results)) - curs = conn.execute(query, query_args) + orig_tail_query = ''' + ORDER BY name LIMIT ? + ''' + orig_tail_args = [limit - len(results)] + # storage policy filter + policy_tail_query = ''' + AND storage_policy_index = ? + ''' + orig_tail_query + policy_tail_args = [storage_policy_index] + orig_tail_args + tail_query, tail_args = \ + policy_tail_query, policy_tail_args + try: + curs = conn.execute(query + tail_query, + tuple(query_args + tail_args)) + except sqlite3.OperationalError as err: + if 'no such column: storage_policy_index' not in str(err): + raise + tail_query, tail_args = \ + orig_tail_query, orig_tail_args + curs = conn.execute(query + tail_query, + tuple(query_args + tail_args)) curs.row_factory = None if prefix is None: @@ -466,26 +690,34 @@ class ContainerBroker(DatabaseBroker): 'size', 'content_type', 'etag', 'deleted'} :param source: if defined, update incoming_sync with the source """ - with self.get() as conn: + def _really_merge_items(conn): max_rowid = -1 for rec in item_list: + rec.setdefault('storage_policy_index', 0) # legacy query = ''' DELETE FROM object WHERE name = ? AND (created_at < ?) + AND storage_policy_index = ? ''' if self.get_db_version(conn) >= 1: query += ' AND deleted IN (0, 1)' - conn.execute(query, (rec['name'], rec['created_at'])) - query = 'SELECT 1 FROM object WHERE name = ?' + conn.execute(query, (rec['name'], rec['created_at'], + rec['storage_policy_index'])) + query = ''' + SELECT 1 FROM object WHERE name = ? + AND storage_policy_index = ? + ''' if self.get_db_version(conn) >= 1: query += ' AND deleted IN (0, 1)' - if not conn.execute(query, (rec['name'],)).fetchall(): + if not conn.execute(query, ( + rec['name'], rec['storage_policy_index'])).fetchall(): conn.execute(''' INSERT INTO object (name, created_at, size, - content_type, etag, deleted) - VALUES (?, ?, ?, ?, ?, ?) + content_type, etag, deleted, storage_policy_index) + VALUES (?, ?, ?, ?, ?, ?, ?) ''', ([rec['name'], rec['created_at'], rec['size'], - rec['content_type'], rec['etag'], rec['deleted']])) + rec['content_type'], rec['etag'], rec['deleted'], + rec['storage_policy_index']])) if source: max_rowid = max(max_rowid, rec['ROWID']) if source: @@ -500,3 +732,158 @@ class ContainerBroker(DatabaseBroker): WHERE remote_id=? ''', (max_rowid, source)) conn.commit() + + with self.get() as conn: + try: + return _really_merge_items(conn) + except sqlite3.OperationalError as err: + if 'no such column: storage_policy_index' not in str(err): + raise + self._migrate_add_storage_policy(conn) + return _really_merge_items(conn) + + def get_reconciler_sync(self): + with self.get() as conn: + try: + return conn.execute(''' + SELECT reconciler_sync_point FROM container_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + if "no such column: reconciler_sync_point" not in str(err): + raise + return -1 + + def update_reconciler_sync(self, point): + query = ''' + UPDATE container_stat + SET reconciler_sync_point = ? 
+ ''' + with self.get() as conn: + try: + conn.execute(query, (point,)) + except sqlite3.OperationalError as err: + if "no such column: reconciler_sync_point" not in str(err): + raise + self._migrate_add_storage_policy(conn) + conn.execute(query, (point,)) + conn.commit() + + def get_misplaced_since(self, start, count): + """ + Get a list of objects which are in a storage policy different + from the container's storage policy. + + :param start: last reconciler sync point + :param count: maximum number of entries to get + + :returns: list of dicts with keys: name, created_at, size, + content_type, etag, storage_policy_index + """ + qry = ''' + SELECT ROWID, name, created_at, size, content_type, etag, + deleted, storage_policy_index + FROM object + WHERE ROWID > ? + AND storage_policy_index != ( + SELECT storage_policy_index FROM container_stat LIMIT 1) + ORDER BY ROWID ASC LIMIT ? + ''' + self._commit_puts_stale_ok() + with self.get() as conn: + try: + cur = conn.execute(qry, (start, count)) + except sqlite3.OperationalError as err: + if "no such column: storage_policy_index" not in str(err): + raise + return [] + return list(dict(row) for row in cur.fetchall()) + + def _migrate_add_container_sync_points(self, conn): + """ + Add the x_container_sync_point columns to the 'container_stat' table. + """ + conn.executescript(''' + BEGIN; + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1; + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1; + COMMIT; + ''') + + def _migrate_add_storage_policy(self, conn): + """ + Migrate the container schema to support tracking objects from + multiple storage policies. If the container_stat table has any + pending migrations, they are applied now before copying into + container_info. + + * create the 'policy_stat' table. + * copy the current 'object_count' and 'bytes_used' columns to a + row in the 'policy_stat' table. + * add the storage_policy_index column to the 'object' table. + * drop the 'object_insert' and 'object_delete' triggers. + * add the 'object_insert_policy_stat' and + 'object_delete_policy_stat' triggers. + * create container_info table for non-policy container info + * insert values from container_stat into container_info + * drop container_stat table + * create container_stat view + """ + + # I tried just getting the list of column names in the current + # container_stat table with a pragma table_info, but could never get + # it inside the same transaction as the DDL (non-DML) statements: + # https://docs.python.org/2/library/sqlite3.html + # #controlling-transactions + # So we just apply all pending migrations to container_stat and copy a + # static known list of column names into container_info. 
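+        # The table copy, trigger swap and view creation below run in one
+        # executescript wrapped in an explicit BEGIN/COMMIT, so the schema
+        # change lands atomically.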
+ try: + self._migrate_add_container_sync_points(conn) + except sqlite3.OperationalError as e: + if 'duplicate column' in str(e): + conn.execute('ROLLBACK;') + else: + raise + + try: + conn.executescript(""" + ALTER TABLE container_stat + ADD COLUMN metadata TEXT DEFAULT ''; + """) + except sqlite3.OperationalError as e: + if 'duplicate column' not in str(e): + raise + + column_names = ', '.join(( + 'account', 'container', 'created_at', 'put_timestamp', + 'delete_timestamp', 'reported_put_timestamp', + 'reported_object_count', 'reported_bytes_used', 'hash', 'id', + 'status', 'status_changed_at', 'metadata', + 'x_container_sync_point1', 'x_container_sync_point2')) + + conn.executescript( + 'BEGIN;' + + POLICY_STAT_TABLE_CREATE + + ''' + INSERT INTO policy_stat ( + storage_policy_index, object_count, bytes_used) + SELECT 0, object_count, bytes_used + FROM container_stat; + + ALTER TABLE object + ADD COLUMN storage_policy_index INTEGER DEFAULT 0; + + DROP TRIGGER object_insert; + DROP TRIGGER object_delete; + ''' + + POLICY_STAT_TRIGGER_SCRIPT + + CONTAINER_INFO_TABLE_SCRIPT + + ''' + INSERT INTO container_info (%s) + SELECT %s FROM container_stat; + + DROP TABLE IF EXISTS container_stat; + ''' % (column_names, column_names) + + CONTAINER_STAT_VIEW_SCRIPT + + 'COMMIT;') diff --git a/swift/container/server.py b/swift/container/server.py index 94763a1631..2a4eb375ad 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -38,12 +38,40 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.db_replicator import ReplicatorRpc from swift.common.http import HTTP_NOT_FOUND, is_success +from swift.common.storage_policy import POLICIES, POLICY_INDEX from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \ HTTPInsufficientStorage, HTTPException, HeaderKeyDict +def gen_resp_headers(info, is_deleted=False): + """ + Convert container info dict to headers. + """ + # backend headers are always included + headers = { + 'X-Backend-Timestamp': normalize_timestamp(info.get('created_at', 0)), + 'X-Backend-PUT-Timestamp': normalize_timestamp( + info.get('put_timestamp', 0)), + 'X-Backend-DELETE-Timestamp': normalize_timestamp( + info.get('delete_timestamp', 0)), + 'X-Backend-Status-Changed-At': normalize_timestamp( + info.get('status_changed_at', 0)), + POLICY_INDEX: info.get('storage_policy_index', 0), + } + if not is_deleted: + # base container info on deleted containers is not exposed to client + headers.update({ + 'X-Container-Object-Count': info.get('object_count', 0), + 'X-Container-Bytes-Used': info.get('bytes_used', 0), + 'X-Timestamp': normalize_timestamp(info.get('created_at', 0)), + 'X-PUT-Timestamp': normalize_timestamp( + info.get('put_timestamp', 0)), + }) + return headers + + class ContainerController(object): """WSGI Controller for the container server.""" @@ -102,6 +130,32 @@ class ContainerController(object): kwargs.setdefault('logger', self.logger) return ContainerBroker(db_path, **kwargs) + def get_and_validate_policy_index(self, req): + """ + Validate that the index supplied maps to a policy. 
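+        For example, assuming ``POLICY_INDEX`` names the
+        X-Backend-Storage-Policy-Index header, a request with that header
+        set to ``1`` returns ``1`` if a policy with index 1 is configured,
+        and raises HTTPBadRequest otherwise.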
+ + :returns: policy index from request, or None if not present + :raises: HTTPBadRequest if the supplied index is bogus + """ + + policy_index = req.headers.get(POLICY_INDEX, None) + if policy_index is None: + return None + + try: + policy_index = int(policy_index) + except ValueError: + raise HTTPBadRequest( + request=req, content_type="text/plain", + body=("Invalid X-Storage-Policy-Index %r" % policy_index)) + + policy = POLICIES.get_by_index(policy_index) + if policy is None: + raise HTTPBadRequest( + request=req, content_type="text/plain", + body=("Invalid X-Storage-Policy-Index %r" % policy_index)) + return int(policy) + def account_update(self, req, account, container, broker): """ Update the account server(s) with latest container info. @@ -199,9 +253,13 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container) if account.startswith(self.auto_create_account_prefix) and obj and \ not os.path.exists(broker.db_file): + requested_policy_index = (self.get_and_validate_policy_index(req) + or POLICIES.default.idx) try: - broker.initialize(normalize_timestamp( - req.headers.get('x-timestamp') or time.time())) + broker.initialize( + normalize_timestamp( + req.headers.get('x-timestamp') or time.time()), + requested_policy_index) except DatabaseAlreadyExists: pass if not os.path.exists(broker.db_file): @@ -225,19 +283,42 @@ class ContainerController(object): return HTTPNoContent(request=req) return HTTPNotFound() - def _update_or_create(self, req, broker, timestamp): + def _update_or_create(self, req, broker, timestamp, new_container_policy, + requested_policy_index): + """ + Create new database broker or update timestamps for existing database. + + :param req: the swob request object + :param broker: the broker instance for the container + :param timestamp: internalized timestamp + :param new_container_policy: the storage policy index to use + when creating the container + :param requested_policy_index: the storage policy index sent in the + request, may be None + :returns: created, a bool, if database did not previously exist + """ if not os.path.exists(broker.db_file): try: - broker.initialize(timestamp) + broker.initialize(timestamp, new_container_policy) except DatabaseAlreadyExists: pass else: return True # created - created = broker.is_deleted() + recreated = broker.is_deleted() + if recreated: + # only set storage policy on deleted containers + broker.set_storage_policy_index(new_container_policy, + timestamp=timestamp) + elif requested_policy_index is not None: + # validate requested policy with existing container + if requested_policy_index != broker.storage_policy_index: + raise HTTPConflict(request=req) broker.update_put_timestamp(timestamp) if broker.is_deleted(): raise HTTPConflict(request=req) - return created + if recreated: + broker.update_status_changed_at(timestamp) + return recreated @public @timing_stats() @@ -257,13 +338,14 @@ class ContainerController(object): return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) + requested_policy_index = self.get_and_validate_policy_index(req) timestamp = normalize_timestamp(req.headers['x-timestamp']) broker = self._get_container_broker(drive, part, account, container) if obj: # put container object if account.startswith(self.auto_create_account_prefix) and \ not os.path.exists(broker.db_file): try: - broker.initialize(timestamp) + broker.initialize(timestamp, 0) except DatabaseAlreadyExists: pass 
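+            # initialization may have raced with another request; if the
+            # DB still does not exist there is nothing to delete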
if not os.path.exists(broker.db_file): @@ -273,20 +355,28 @@ class ContainerController(object): req.headers['x-etag']) return HTTPCreated(request=req) else: # put container - created = self._update_or_create(req, broker, timestamp) + if requested_policy_index is None: + # use the default index sent by the proxy if available + new_container_policy = req.headers.get( + 'X-Backend-Storage-Policy-Default', int(POLICIES.default)) + else: + new_container_policy = requested_policy_index + created = self._update_or_create(req, broker, + timestamp, + new_container_policy, + requested_policy_index) metadata = {} metadata.update( (key, (value, timestamp)) for key, value in req.headers.iteritems() if key.lower() in self.save_headers or is_sys_or_user_meta('container', key)) - if metadata: - if 'X-Container-Sync-To' in metadata: - if 'X-Container-Sync-To' not in broker.metadata or \ - metadata['X-Container-Sync-To'][0] != \ - broker.metadata['X-Container-Sync-To'][0]: - broker.set_x_container_sync_points(-1, -1) - broker.update_metadata(metadata) + if 'X-Container-Sync-To' in metadata: + if 'X-Container-Sync-To' not in broker.metadata or \ + metadata['X-Container-Sync-To'][0] != \ + broker.metadata['X-Container-Sync-To'][0]: + broker.set_x_container_sync_points(-1, -1) + broker.update_metadata(metadata) resp = self.account_update(req, account, container, broker) if resp: return resp @@ -307,15 +397,10 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container, pending_timeout=0.1, stale_reads_ok=True) - if broker.is_deleted(): - return HTTPNotFound(request=req) - info = broker.get_info() - headers = { - 'X-Container-Object-Count': info['object_count'], - 'X-Container-Bytes-Used': info['bytes_used'], - 'X-Timestamp': info['created_at'], - 'X-PUT-Timestamp': info['put_timestamp'], - } + info, is_deleted = broker.get_info_is_deleted() + headers = gen_resp_headers(info, is_deleted=is_deleted) + if is_deleted: + return HTTPNotFound(request=req, headers=headers) headers.update( (key, value) for key, (value, timestamp) in broker.metadata.iteritems() @@ -377,22 +462,17 @@ class ContainerController(object): broker = self._get_container_broker(drive, part, account, container, pending_timeout=0.1, stale_reads_ok=True) - if broker.is_deleted(): - return HTTPNotFound(request=req) - info = broker.get_info() + info, is_deleted = broker.get_info_is_deleted() + resp_headers = gen_resp_headers(info, is_deleted=is_deleted) + if is_deleted: + return HTTPNotFound(request=req, headers=resp_headers) container_list = broker.list_objects_iter(limit, marker, end_marker, prefix, delimiter, path) - return self.create_listing(req, out_content_type, info, + return self.create_listing(req, out_content_type, info, resp_headers, broker.metadata, container_list, container) - def create_listing(self, req, out_content_type, info, metadata, - container_list, container): - resp_headers = { - 'X-Container-Object-Count': info['object_count'], - 'X-Container-Bytes-Used': info['bytes_used'], - 'X-Timestamp': info['created_at'], - 'X-PUT-Timestamp': info['put_timestamp'], - } + def create_listing(self, req, out_content_type, info, resp_headers, + metadata, container_list, container): for key, (value, timestamp) in metadata.iteritems(): if value and (key.lower() in self.save_headers or is_sys_or_user_meta('container', key)): diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 139d6609f0..a45a3a2f95 100644 --- a/swift/proxy/controllers/base.py +++ 
b/swift/proxy/controllers/base.py @@ -50,6 +50,7 @@ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \ HTTPException, HTTPRequestedRangeNotSatisfiable from swift.common.request_helpers import strip_sys_meta_prefix, \ strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta +from swift.common.storage_policy import POLICY_INDEX, POLICY, POLICIES def update_headers(response, headers): @@ -1201,6 +1202,16 @@ class Controller(object): container, obj, res) except ValueError: pass + # if a backend policy index is present in resp headers, translate it + # here with the friendly policy name + if POLICY_INDEX in res.headers and is_success(res.status_int): + policy = POLICIES.get_by_index(res.headers[POLICY_INDEX]) + if policy: + res.headers[POLICY] = policy.name + else: + self.app.logger.error( + 'Could not translate %s (%r) from %r to policy', + POLICY_INDEX, res.headers[POLICY_INDEX], path) return res def is_origin_allowed(self, cors_info, origin): diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 85e31fe4d7..77df764815 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -23,6 +23,7 @@ from swift.common import constraints from swift.common.http import HTTP_ACCEPTED from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, clear_info_cache +from swift.common.storage_policy import POLICIES, POLICY, POLICY_INDEX from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPNotFound @@ -34,7 +35,7 @@ class ContainerController(Controller): # Ensure these are all lowercase pass_through_headers = ['x-container-read', 'x-container-write', 'x-container-sync-key', 'x-container-sync-to', - 'x-versions-location'] + 'x-versions-location', POLICY_INDEX.lower()] def __init__(self, app, account_name, container_name, **kwargs): Controller.__init__(self, app) @@ -47,6 +48,27 @@ class ContainerController(Controller): 'x-remove-%s-write' % st, 'x-remove-versions-location'] + def _convert_policy_to_index(self, req): + """ + Helper method to convert a policy name (from a request from a client) + to a policy index (for a request to a backend). 
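+        For example, assuming a policy named gold with index 1 is
+        configured, a client header of ``X-Storage-Policy: gold`` becomes
+        ``1`` on the backend request.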
+ + :param req: incoming request + """ + policy_name = req.headers.get(POLICY) + if not policy_name: + return + policy = POLICIES.get_by_name(policy_name) + if not policy: + raise HTTPBadRequest(request=req, + content_type="text/plain", + body=("Invalid %s '%s'" + % (POLICY, policy_name))) + if policy.is_deprecated: + body = 'Storage Policy %r is deprecated' % (policy.name) + raise HTTPBadRequest(request=req, body=body) + return int(policy) + def clean_acls(self, req): if 'swift.clean_acl' in req.environ: for header in ('x-container-read', 'x-container-write'): @@ -101,6 +123,7 @@ class ContainerController(Controller): self.clean_acls(req) or check_metadata(req, 'container') if error_response: return error_response + policy_index = self._convert_policy_to_index(req) if not req.environ.get('swift_owner'): for key in self.app.swift_owner_headers: req.headers.pop(key, None) @@ -128,7 +151,8 @@ class ContainerController(Controller): container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) headers = self._backend_requests(req, len(containers), - account_partition, accounts) + account_partition, accounts, + policy_index) clear_info_cache(self.app, req.environ, self.account_name, self.container_name) resp = self.make_requests( @@ -183,9 +207,14 @@ class ContainerController(Controller): return HTTPNotFound(request=req) return resp - def _backend_requests(self, req, n_outgoing, - account_partition, accounts): + def _backend_requests(self, req, n_outgoing, account_partition, accounts, + policy_index=None): additional = {'X-Timestamp': normalize_timestamp(time.time())} + if policy_index is None: + additional['X-Backend-Storage-Policy-Default'] = \ + int(POLICIES.default) + else: + additional[POLICY_INDEX] = str(policy_index) headers = [self.generate_request_headers(req, transfer=True, additional=additional) for _junk in range(n_outgoing)] diff --git a/test/unit/__init__.py b/test/unit/__init__.py index b3e6a41bc5..d44ee7346c 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -22,6 +22,7 @@ import errno import sys from contextlib import contextmanager, closing from collections import defaultdict, Iterable +from numbers import Number from tempfile import NamedTemporaryFile import time from eventlet.green import socket @@ -34,7 +35,6 @@ from hashlib import md5 from eventlet import sleep, Timeout import logging.handlers from httplib import HTTPException -from numbers import Number from swift.common import storage_policy import functools import cPickle as pickle @@ -327,6 +327,22 @@ def temptree(files, contents=''): rmtree(tempdir) +def with_tempdir(f): + """ + Decorator to give a single test a tempdir as argument to test method. 
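+
+    Example (illustrative)::
+
+        @with_tempdir
+        def test_something(self, tempdir):
+            self.assertTrue(os.path.isdir(tempdir))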
+ """ + @functools.wraps(f) + def wrapped(*args, **kwargs): + tempdir = mkdtemp() + args = list(args) + args.append(tempdir) + try: + return f(*args, **kwargs) + finally: + rmtree(tempdir) + return wrapped + + class NullLoggingHandler(logging.Handler): def emit(self, record): diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 1892d19923..7f938f0cfe 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -28,6 +28,19 @@ from time import sleep, time from swift.common import ring, utils +class TestRingBase(unittest.TestCase): + + def setUp(self): + self._orig_hash_suffix = utils.HASH_PATH_SUFFIX + self._orig_hash_prefix = utils.HASH_PATH_PREFIX + utils.HASH_PATH_SUFFIX = 'endcap' + utils.HASH_PATH_PREFIX = '' + + def tearDown(self): + utils.HASH_PATH_SUFFIX = self._orig_hash_suffix + utils.HASH_PATH_PREFIX = self._orig_hash_prefix + + class TestRingData(unittest.TestCase): def setUp(self): @@ -109,11 +122,10 @@ class TestRingData(unittest.TestCase): '0644') -class TestRing(unittest.TestCase): +class TestRing(TestRingBase): def setUp(self): - utils.HASH_PATH_SUFFIX = 'endcap' - utils.HASH_PATH_PREFIX = '' + super(TestRing, self).setUp() self.testdir = mkdtemp() self.testgz = os.path.join(self.testdir, 'whatever.ring.gz') self.intended_replica2part2dev_id = [ @@ -147,6 +159,7 @@ class TestRing(unittest.TestCase): reload_time=self.intended_reload_time, ring_name='whatever') def tearDown(self): + super(TestRing, self).tearDown() rmtree(self.testdir, ignore_errors=1) def test_creation(self): diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index aa9ae11da4..5a9b79cd17 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -23,6 +23,9 @@ from uuid import uuid4 import simplejson import sqlite3 +import itertools +import time +import random from mock import patch, MagicMock from eventlet.timeout import Timeout @@ -31,7 +34,7 @@ import swift.common.db from swift.common.db import chexor, dict_factory, get_db_connection, \ DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \ GreenDBConnection -from swift.common.utils import normalize_timestamp, mkdirs +from swift.common.utils import normalize_timestamp, mkdirs, json from swift.common.exceptions import LockTimeout @@ -82,6 +85,30 @@ class TestChexor(unittest.TestCase): 'd41d8cd98f00b204e9800998ecf8427e', None, normalize_timestamp(1)) + def test_chexor(self): + ts = (normalize_timestamp(ts) for ts in + itertools.count(int(time.time()))) + + objects = [ + ('frank', ts.next()), + ('bob', ts.next()), + ('tom', ts.next()), + ('frank', ts.next()), + ('tom', ts.next()), + ('bob', ts.next()), + ] + hash_ = '0' + random.shuffle(objects) + for obj in objects: + hash_ = chexor(hash_, *obj) + + other_hash = '0' + random.shuffle(objects) + for obj in objects: + other_hash = chexor(other_hash, *obj) + + self.assertEqual(hash_, other_hash) + class TestGreenDBConnection(unittest.TestCase): @@ -147,6 +174,28 @@ class TestGetDBConnection(unittest.TestCase): mock_db_cmd.call_count)) +class ExampleBroker(DatabaseBroker): + + db_type = 'test' + db_contains_type = 'test' + + def _initialize(self, conn, timestamp, **kwargs): + conn.executescript(''' + CREATE TABLE test_stat ( + name TEXT, + timestamp TEXT DEFAULT 0, + status_changed_at TEXT DEFAULT 0 + ); + CREATE TABLE test ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT + ); + ''') + conn.execute(""" + INSERT INTO test_stat (name, timestamp) VALUES (?, ?) 
+ """, (self.account, timestamp)) + + class TestDatabaseBroker(unittest.TestCase): def setUp(self): @@ -226,7 +275,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.initialize, normalize_timestamp('1')) def test_delete_db(self): - def init_stub(conn, put_timestamp): + def init_stub(conn, put_timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', @@ -383,7 +432,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_contains_type = 'test' uuid1 = str(uuid4()) - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,)) @@ -433,7 +482,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_type = 'test' broker.db_contains_type = 'test' - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('INSERT INTO test (one) VALUES ("1")') conn.execute('INSERT INTO test (one) VALUES ("2")') @@ -456,7 +505,7 @@ class TestDatabaseBroker(unittest.TestCase): broker.db_contains_type = 'test' uuid1 = str(uuid4()) - def _initialize(conn, timestamp): + def _initialize(conn, timestamp, **kwargs): conn.execute('CREATE TABLE test (one TEXT)') conn.execute('CREATE TABLE test_stat (id TEXT)') conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,)) @@ -531,7 +580,7 @@ class TestDatabaseBroker(unittest.TestCase): broker_metadata = metadata and simplejson.dumps( {'Test': ('Value', normalize_timestamp(1))}) or '' - def _initialize(conn, put_timestamp): + def _initialize(conn, put_timestamp, **kwargs): if put_timestamp is None: put_timestamp = normalize_timestamp(0) conn.executescript(''' @@ -562,6 +611,7 @@ class TestDatabaseBroker(unittest.TestCase): created_at TEXT, put_timestamp TEXT DEFAULT '0', delete_timestamp TEXT DEFAULT '0', + status_changed_at TEXT DEFAULT '0', test_count INTEGER, hash TEXT default '00000000000000000000000000000000', id TEXT @@ -571,8 +621,10 @@ class TestDatabaseBroker(unittest.TestCase): ''' % (metadata and ", metadata TEXT DEFAULT ''" or "")) conn.execute(''' UPDATE test_stat - SET account = ?, created_at = ?, id = ?, put_timestamp = ? - ''', (broker.account, broker_creation, broker_uuid, put_timestamp)) + SET account = ?, created_at = ?, id = ?, put_timestamp = ?, + status_changed_at = ? 
+ ''', (broker.account, broker_creation, broker_uuid, put_timestamp, + put_timestamp)) if metadata: conn.execute('UPDATE test_stat SET metadata = ?', (broker_metadata,)) @@ -582,11 +634,11 @@ class TestDatabaseBroker(unittest.TestCase): broker.initialize(put_timestamp) info = broker.get_replication_info() self.assertEquals(info, { - 'count': 0, + 'account': broker.account, 'count': 0, 'hash': '00000000000000000000000000000000', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': -1, 'id': broker_uuid, 'metadata': broker_metadata}) insert_timestamp = normalize_timestamp(3) with broker.get() as conn: conn.execute(''' @@ -595,21 +647,21 @@ class TestDatabaseBroker(unittest.TestCase): conn.commit() info = broker.get_replication_info() self.assertEquals(info, { - 'count': 1, + 'account': broker.account, 'count': 1, 'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata}) with broker.get() as conn: conn.execute('DELETE FROM test') conn.commit() info = broker.get_replication_info() self.assertEquals(info, { - 'count': 0, + 'account': broker.account, 'count': 0, 'hash': '00000000000000000000000000000000', 'created_at': broker_creation, 'put_timestamp': put_timestamp, - 'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid, - 'metadata': broker_metadata}) + 'delete_timestamp': '0', 'status_changed_at': put_timestamp, + 'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata}) return broker def test_metadata(self): @@ -679,6 +731,74 @@ class TestDatabaseBroker(unittest.TestCase): [first_value, first_timestamp]) self.assert_('Second' not in broker.metadata) + def test_get_max_row(self): + broker = ExampleBroker(':memory:') + broker.initialize(0) + self.assertEquals(-1, broker.get_max_row()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO test (name) VALUES (?) + ''', ('test_name',)) + conn.commit() + self.assertEquals(1, broker.get_max_row()) + with broker.get() as conn: + conn.executescript(''' + DELETE FROM test; + ''') + conn.commit() + self.assertEquals(1, broker.get_max_row()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO test (name) VALUES (?) 
+ ''', ('test_name',)) + conn.commit() + self.assertEquals(2, broker.get_max_row()) + + def test_get_info(self): + broker = ExampleBroker(':memory:', account='test') + broker.initialize(normalize_timestamp(1)) + info = broker.get_info() + expected = { + 'name': 'test', + 'timestamp': '0000000001.00000', + 'status_changed_at': '0', + } + self.assertEqual(info, expected) + + def test_get_raw_metadata(self): + broker = ExampleBroker(':memory:', account='test') + broker.initialize(normalize_timestamp(0)) + self.assertEqual(broker.metadata, {}) + self.assertEqual(broker.get_raw_metadata(), '') + metadata = { + 'test': ['value', normalize_timestamp(1)] + } + broker.update_metadata(metadata) + self.assertEqual(broker.metadata, metadata) + self.assertEqual(broker.get_raw_metadata(), + json.dumps(metadata)) + + def test_status_changed_at(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = ExampleBroker(':memory:', account='test') + broker.initialize(ts.next()) + self.assertEquals(broker.get_info()['status_changed_at'], '0') + status_changed_at = ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # save the old and get a new status_changed_at + old_status_changed_at, status_changed_at = \ + status_changed_at, ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # status changed at won't go backwards... + broker.update_status_changed_at(old_status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_auditor.py b/test/unit/container/test_auditor.py index 5173141bfa..d1df39d2cd 100644 --- a/test/unit/container/test_auditor.py +++ b/test/unit/container/test_auditor.py @@ -21,8 +21,10 @@ import random from tempfile import mkdtemp from shutil import rmtree +from swift.common.utils import normalize_timestamp from swift.container import auditor -from test.unit import FakeLogger +from test.unit import debug_logger, with_tempdir +from test.unit.container import test_backend class FakeContainerBroker(object): @@ -45,7 +47,7 @@ class TestAuditor(unittest.TestCase): def setUp(self): self.testdir = os.path.join(mkdtemp(), 'tmp_test_container_auditor') - self.logger = FakeLogger() + self.logger = debug_logger() rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) fnames = ['true1.db', 'true2.db', 'true3.db', @@ -78,7 +80,7 @@ class TestAuditor(unittest.TestCase): return time.time() conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, logger=self.logger) with mock.patch('swift.container.auditor.time', FakeTime()): def fake_audit_location_generator(*args, **kwargs): @@ -94,7 +96,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker) def test_run_once(self): conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -109,7 +111,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker) def test_container_auditor(self): conf = {} - test_auditor = auditor.ContainerAuditor(conf) + test_auditor = auditor.ContainerAuditor(conf, 
logger=self.logger) files = os.listdir(self.testdir) for f in files: path = os.path.join(self.testdir, f) @@ -117,5 +119,45 @@ class TestAuditor(unittest.TestCase): self.assertEquals(test_auditor.container_failures, 2) self.assertEquals(test_auditor.container_passes, 3) + +class TestAuditorMigrations(unittest.TestCase): + + @with_tempdir + def test_db_migration(self, tempdir): + db_path = os.path.join(tempdir, 'sda', 'containers', '0', '0', '0', + 'test.db') + with test_backend.TestContainerBrokerBeforeSPI.old_broker() as \ + old_ContainerBroker: + broker = old_ContainerBroker(db_path, account='a', container='c') + broker.initialize(normalize_timestamp(0), -1) + + with broker.get() as conn: + try: + conn.execute('SELECT storage_policy_index ' + 'FROM container_stat') + except Exception as err: + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('TestContainerBrokerBeforeSPI broker class ' + 'was already migrated') + + conf = {'devices': tempdir, 'mount_check': False} + test_auditor = auditor.ContainerAuditor(conf, logger=debug_logger()) + test_auditor.run_once() + + broker = auditor.ContainerBroker(db_path, account='a', container='c') + info = broker.get_info() + expected = { + 'account': 'a', + 'container': 'c', + 'object_count': 0, + 'bytes_used': 0, + 'storage_policy_index': 0, + } + for k, v in expected.items(): + self.assertEqual(info[k], v) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 56f2c584ec..c159477d73 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -15,13 +15,25 @@ """ Tests for swift.container.backend """ +import os import hashlib import unittest from time import sleep, time from uuid import uuid4 +import itertools +import random +from collections import defaultdict +from contextlib import contextmanager +import sqlite3 +import pickle from swift.container.backend import ContainerBroker from swift.common.utils import normalize_timestamp +from swift.common.storage_policy import POLICIES + +import mock + +from test.unit import patch_policies, with_tempdir class TestContainerBroker(unittest.TestCase): @@ -31,18 +43,41 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.__init__ broker = ContainerBroker(':memory:', account='a', container='c') self.assertEqual(broker.db_file, ':memory:') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: curs = conn.cursor() curs.execute('SELECT 1') self.assertEqual(curs.fetchall()[0][0], 1) + @patch_policies + def test_storage_policy_property(self): + ts = itertools.count(1) + for policy in POLICIES: + broker = ContainerBroker(':memory:', account='a', + container='policy_%s' % policy.name) + broker.initialize(normalize_timestamp(ts.next()), policy.idx) + with broker.get() as conn: + try: + conn.execute('''SELECT storage_policy_index + FROM container_stat''') + except Exception: + is_migrated = False + else: + is_migrated = True + if not is_migrated: + # pre spi tests don't set policy on initialize + broker.set_storage_policy_index(policy.idx) + self.assertEqual(policy.idx, broker.storage_policy_index) + # make sure it's cached + with mock.patch.object(broker, 'get'): + self.assertEqual(policy.idx, broker.storage_policy_index) + def test_exception(self): # Test ContainerBroker throwing a conn away after # unhandled exception first_conn = None broker = ContainerBroker(':memory:', 
account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: first_conn = conn try: @@ -56,7 +91,7 @@ class TestContainerBroker(unittest.TestCase): def test_empty(self): # Test ContainerBroker.empty broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) self.assert_(broker.empty()) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') @@ -68,7 +103,7 @@ class TestContainerBroker(unittest.TestCase): def test_reclaim(self): broker = ContainerBroker(':memory:', account='test_account', container='test_container') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') with broker.get() as conn: @@ -125,10 +160,56 @@ class TestContainerBroker(unittest.TestCase): broker.reclaim(normalize_timestamp(time()), time()) broker.delete_db(normalize_timestamp(time())) + def test_get_info_is_deleted(self): + start = int(time()) + ts = (normalize_timestamp(t) for t in itertools.count(start)) + broker = ContainerBroker(':memory:', account='test_account', + container='test_container') + # create it + broker.initialize(ts.next(), POLICIES.default.idx) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(is_deleted, False) # sanity + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], '0') + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], + normalize_timestamp(start)) + + # delete it + delete_timestamp = ts.next() + broker.delete_db(delete_timestamp) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, True) # sanity + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assertEqual(info['status_changed_at'], delete_timestamp) + + # bring back to life + broker.put_object('obj', ts.next(), 0, 'text/plain', 'etag', + storage_policy_index=broker.storage_policy_index) + info, is_deleted = broker.get_info_is_deleted() + self.assertEqual(is_deleted, False) # sanity + self.assertEqual(is_deleted, broker.is_deleted()) + self.assertEqual(info, broker.get_info()) + self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) + self.assert_(float(info['created_at']) >= start) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assertEqual(info['status_changed_at'], delete_timestamp) + def test_delete_object(self): # Test ContainerBroker.delete_object broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') with broker.get() as conn: @@ -151,7 +232,7 @@ class 
TestContainerBroker(unittest.TestCase):
     def test_put_object(self):
         # Test ContainerBroker.put_object
         broker = ContainerBroker(':memory:', account='a', container='c')
-        broker.initialize(normalize_timestamp('1'))
+        broker.initialize(normalize_timestamp('1'), 0)
 
         # Create initial object
         timestamp = normalize_timestamp(time())
@@ -347,16 +428,150 @@ class TestContainerBroker(unittest.TestCase):
         self.assertEquals(conn.execute(
             "SELECT deleted FROM object").fetchone()[0], 0)
 
+    @patch_policies
+    def test_put_misplaced_object_does_not_affect_container_stats(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        info = broker.get_info()
+        self.assertEqual(1, info['object_count'])
+        self.assertEqual(123, info['bytes_used'])
+        other_policy = random.choice([p for p in POLICIES
+                                      if p is not policy])
+        broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=other_policy.idx)
+        # re-read the stats; the misplaced object must not show up
+        info = broker.get_info()
+        self.assertEqual(1, info['object_count'])
+        self.assertEqual(123, info['bytes_used'])
+
+    @patch_policies
+    def test_has_multiple_policies(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        self.assertFalse(broker.has_multiple_policies())
+        other_policy = [p for p in POLICIES if p is not policy][0]
+        broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=other_policy.idx)
+        self.assert_(broker.has_multiple_policies())
+
+    @patch_policies
+    def test_get_policy_info(self):
+        policy = random.choice(list(POLICIES))
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        broker = ContainerBroker(':memory:',
+                                 account='a', container='c')
+        broker.initialize(ts.next(), policy.idx)
+        # migration tests may not honor policy on initialize
+        if isinstance(self, ContainerBrokerMigrationMixin):
+            real_storage_policy_index = \
+                broker.get_info()['storage_policy_index']
+            policy = filter(lambda p: p.idx == real_storage_policy_index,
+                            POLICIES)[0]
+        policy_stats = broker.get_policy_stats()
+        expected = {policy.idx: {'bytes_used': 0, 'object_count': 0}}
+        self.assertEqual(policy_stats, expected)
+
+        # add an object
+        broker.put_object('correct_o', ts.next(), 123, 'text/plain',
+                          '5af83e3196bf99f440f31f2e1a6c9afe',
+                          storage_policy_index=policy.idx)
+        policy_stats = broker.get_policy_stats()
+        expected = {policy.idx: {'bytes_used': 123, 'object_count': 1}}
+
self.assertEqual(policy_stats, expected) + + # add a misplaced object + other_policy = random.choice([p for p in POLICIES + if p is not policy]) + broker.put_object('wrong_o', ts.next(), 123, 'text/plain', + '5af83e3196bf99f440f31f2e1a6c9afe', + storage_policy_index=other_policy.idx) + policy_stats = broker.get_policy_stats() + expected = { + policy.idx: {'bytes_used': 123, 'object_count': 1}, + other_policy.idx: {'bytes_used': 123, 'object_count': 1}, + } + self.assertEqual(policy_stats, expected) + + def test_policy_stat_tracking(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + broker = ContainerBroker(':memory:', + account='a', container='c') + broker.initialize(ts.next(), POLICIES.default.idx) + stats = defaultdict(dict) + + iters = 100 + for i in range(iters): + policy_index = random.randint(0, iters * 0.1) + name = 'object-%s' % random.randint(0, iters * 0.1) + size = random.randint(0, iters) + broker.put_object(name, ts.next(), size, 'text/plain', + '5af83e3196bf99f440f31f2e1a6c9afe', + storage_policy_index=policy_index) + # track the size of the latest timestamp put for each object + # in each storage policy + stats[policy_index][name] = size + policy_stats = broker.get_policy_stats() + # if no objects were added for the default policy we still + # expect an entry for the default policy in the returned info + # because the database was initialized with that storage policy + # - but it must be empty. + if POLICIES.default.idx not in stats: + default_stats = policy_stats.pop(POLICIES.default.idx) + expected = {'object_count': 0, 'bytes_used': 0} + self.assertEqual(default_stats, expected) + self.assertEqual(len(policy_stats), len(stats)) + for policy_index, stat in policy_stats.items(): + self.assertEqual(stat['object_count'], len(stats[policy_index])) + self.assertEqual(stat['bytes_used'], + sum(stats[policy_index].values())) + def test_get_info(self): # Test ContainerBroker.get_info broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['account'], 'test1') self.assertEquals(info['container'], 'test2') self.assertEquals(info['hash'], '00000000000000000000000000000000') + self.assertEqual(info['put_timestamp'], normalize_timestamp(1)) + self.assertEqual(info['delete_timestamp'], '0') + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], + normalize_timestamp(1)) info = broker.get_info() self.assertEquals(info['object_count'], 0) @@ -401,7 +616,7 @@ class TestContainerBroker(unittest.TestCase): def test_set_x_syncs(self): broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['x_container_sync_point1'], -1) @@ -415,7 +630,7 @@ class TestContainerBroker(unittest.TestCase): def test_get_report_info(self): broker = ContainerBroker(':memory:', account='test1', container='test2') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) info = broker.get_info() self.assertEquals(info['account'], 'test1') @@ -482,7 +697,7 @@ class TestContainerBroker(unittest.TestCase): def test_list_objects_iter(self): # Test 
ContainerBroker.list_objects_iter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) for obj1 in xrange(4): for obj2 in xrange(125): broker.put_object('%d/%04d' % (obj1, obj2), @@ -601,7 +816,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter using a # delimiter that is not a slash broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) for obj1 in xrange(4): for obj2 in xrange(125): broker.put_object('%d:%04d' % (obj1, obj2), @@ -718,7 +933,7 @@ class TestContainerBroker(unittest.TestCase): def test_list_objects_iter_prefix_delim(self): # Test ContainerBroker.list_objects_iter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object( '/pets/dogs/1', normalize_timestamp(0), 0, @@ -755,7 +970,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter for a # container that has an odd file with a trailing delimiter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('a/', normalize_timestamp(time()), 0, @@ -835,7 +1050,7 @@ class TestContainerBroker(unittest.TestCase): # Test ContainerBroker.list_objects_iter for a # container that has an odd file with a trailing delimiter broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(time()), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('a:', normalize_timestamp(time()), 0, @@ -913,7 +1128,7 @@ class TestContainerBroker(unittest.TestCase): def test_chexor(self): broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker.put_object('b', normalize_timestamp(2), 0, @@ -933,7 +1148,7 @@ class TestContainerBroker(unittest.TestCase): def test_newid(self): # test DatabaseBroker.newid broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) id = broker.get_info()['id'] broker.newid('someid') self.assertNotEquals(id, broker.get_info()['id']) @@ -941,7 +1156,7 @@ class TestContainerBroker(unittest.TestCase): def test_get_items_since(self): # test DatabaseBroker.get_items_since broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) broker.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') max_row = broker.get_replication_info()['max_row'] @@ -954,9 +1169,9 @@ class TestContainerBroker(unittest.TestCase): def test_sync_merging(self): # exercise the DatabaseBroker sync functions a bit broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + 
broker1.initialize(normalize_timestamp('1'), 0) broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) self.assertEquals(broker2.get_sync('12345'), -1) broker1.merge_syncs([{'sync_point': 3, 'remote_id': '12345'}]) broker2.merge_syncs(broker1.get_syncs()) @@ -964,9 +1179,9 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items(self): broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(1), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(2), 0, @@ -989,10 +1204,10 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items_overwrite(self): # test DatabaseBroker.merge_items broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) id = broker1.get_info()['id'] broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(2), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(3), 0, @@ -1014,10 +1229,10 @@ class TestContainerBroker(unittest.TestCase): def test_merge_items_post_overwrite_out_of_order(self): # test DatabaseBroker.merge_items broker1 = ContainerBroker(':memory:', account='a', container='c') - broker1.initialize(normalize_timestamp('1')) + broker1.initialize(normalize_timestamp('1'), 0) id = broker1.get_info()['id'] broker2 = ContainerBroker(':memory:', account='a', container='c') - broker2.initialize(normalize_timestamp('1')) + broker2.initialize(normalize_timestamp('1'), 0) broker1.put_object('a', normalize_timestamp(2), 0, 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') broker1.put_object('b', normalize_timestamp(3), 0, @@ -1056,8 +1271,165 @@ class TestContainerBroker(unittest.TestCase): self.assertEquals(rec['created_at'], normalize_timestamp(5)) self.assertEquals(rec['content_type'], 'text/plain') + def test_set_storage_policy_index(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + broker = ContainerBroker(':memory:', account='test_account', + container='test_container') + timestamp = ts.next() + broker.initialize(timestamp, 0) -def premetadata_create_container_stat_table(self, conn, put_timestamp=None): + info = broker.get_info() + self.assertEqual(0, info['storage_policy_index']) # sanity check + self.assertEqual(0, info['object_count']) + self.assertEqual(0, info['bytes_used']) + if self.__class__ in (TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI): + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(timestamp, info['status_changed_at']) + expected = {0: {'object_count': 0, 'bytes_used': 0}} + self.assertEqual(expected, broker.get_policy_stats()) + + timestamp = ts.next() + broker.set_storage_policy_index(111, timestamp) + self.assertEqual(broker.storage_policy_index, 111) + info = broker.get_info() + self.assertEqual(111, info['storage_policy_index']) + self.assertEqual(0, 
info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(timestamp, info['status_changed_at'])
+        expected[111] = {'object_count': 0, 'bytes_used': 0}
+        self.assertEqual(expected, broker.get_policy_stats())
+
+        timestamp = ts.next()
+        broker.set_storage_policy_index(222, timestamp)
+        self.assertEqual(broker.storage_policy_index, 222)
+        info = broker.get_info()
+        self.assertEqual(222, info['storage_policy_index'])
+        self.assertEqual(0, info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(timestamp, info['status_changed_at'])
+        expected[222] = {'object_count': 0, 'bytes_used': 0}
+        self.assertEqual(expected, broker.get_policy_stats())
+
+        old_timestamp, timestamp = timestamp, ts.next()
+        broker.set_storage_policy_index(222, timestamp)  # it's idempotent
+        info = broker.get_info()
+        self.assertEqual(222, info['storage_policy_index'])
+        self.assertEqual(0, info['object_count'])
+        self.assertEqual(0, info['bytes_used'])
+        self.assertEqual(old_timestamp, info['status_changed_at'])
+        self.assertEqual(expected, broker.get_policy_stats())
+
+    def test_set_storage_policy_index_empty(self):
+        # Putting an object may trigger migrations, so test with a
+        # never-had-an-object container to make sure we handle it
+        broker = ContainerBroker(':memory:', account='test_account',
+                                 container='test_container')
+        broker.initialize(normalize_timestamp('1'), 0)
+        info = broker.get_info()
+        self.assertEqual(0, info['storage_policy_index'])
+
+        broker.set_storage_policy_index(2)
+        info = broker.get_info()
+        self.assertEqual(2, info['storage_policy_index'])
+
+    def test_reconciler_sync(self):
+        broker = ContainerBroker(':memory:', account='test_account',
+                                 container='test_container')
+        broker.initialize(normalize_timestamp('1'), 0)
+        self.assertEquals(-1, broker.get_reconciler_sync())
+        broker.update_reconciler_sync(10)
+        self.assertEquals(10, broker.get_reconciler_sync())
+
+    @with_tempdir
+    def test_legacy_pending_files(self, tempdir):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time())))
+        db_path = os.path.join(tempdir, 'container.db')
+
+        # first init a container DB whose pending file will get entries
+        # that predate the storage_policy_index field
+        broker = ContainerBroker(db_path, account='a', container='c')
+        broker.initialize(ts.next(), 1)
+
+        # manually make some pending entries lacking storage_policy_index
+        with open(broker.pending_file, 'a+b') as fp:
+            for i in range(10):
+                name, timestamp, size, content_type, etag, deleted = (
+                    'o%s' % i, ts.next(), 0, 'c', 'e', 0)
+                fp.write(':')
+                fp.write(pickle.dumps(
+                    (name, timestamp, size, content_type, etag, deleted),
+                    protocol=2).encode('base64'))
+                fp.flush()
+
+        # use put_object to append some more entries with different
+        # values for storage_policy_index
+        for i in range(10, 30):
+            name = 'o%s' % i
+            if i < 20:
+                size = 1
+                storage_policy_index = 0
+            else:
+                size = 2
+                storage_policy_index = 1
+            broker.put_object(name, ts.next(), size, 'c', 'e', 0,
+                              storage_policy_index=storage_policy_index)
+
+        broker._commit_puts_stale_ok()
+
+        # 10 objects with 0 bytes each from the legacy pending entries
+        # 10 objects with 1 byte each in storage policy 0
+        # 10 objects with 2 bytes each in storage policy 1
+        expected = {
+            0: {'object_count': 20, 'bytes_used': 10},
+            1: {'object_count': 10, 'bytes_used': 20},
+        }
+        self.assertEqual(broker.get_policy_stats(), expected)
+
+
+class ContainerBrokerMigrationMixin(object):
+    """
+    Mixin for running ContainerBroker against databases created with
+    older schemas.
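+
+    ``setUp`` monkey-patches the broker's schema-creation methods so that
+    ``initialize`` builds an old-style database; the unpatched
+    ``ContainerBroker`` code paths then run against it to exercise the
+    lazy migrations. A minimal (hypothetical) subclass looks like::
+
+        class TestContainerBrokerBeforeFrob(ContainerBrokerMigrationMixin,
+                                            TestContainerBroker):
+            def setUp(self):
+                super(TestContainerBrokerBeforeFrob, self).setUp()
+                # swap in a pre-"frob" schema helper here, as the
+                # TestContainerBrokerBefore* classes below do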
+ """ + def setUp(self): + self._imported_create_object_table = \ + ContainerBroker.create_object_table + ContainerBroker.create_object_table = \ + prespi_create_object_table + self._imported_create_container_info_table = \ + ContainerBroker.create_container_info_table + ContainerBroker.create_container_info_table = \ + premetadata_create_container_info_table + self._imported_create_policy_stat_table = \ + ContainerBroker.create_policy_stat_table + ContainerBroker.create_policy_stat_table = lambda *args: None + + @classmethod + @contextmanager + def old_broker(cls): + cls.runTest = lambda *a, **k: None + case = cls() + case.setUp() + try: + yield ContainerBroker + finally: + case.tearDown() + + def tearDown(self): + ContainerBroker.create_container_info_table = \ + self._imported_create_container_info_table + ContainerBroker.create_object_table = \ + self._imported_create_object_table + ContainerBroker.create_policy_stat_table = \ + self._imported_create_policy_stat_table + + +def premetadata_create_container_info_table(self, conn, put_timestamp, + _spi=None): """ Copied from ContainerBroker before the metadata column was added; used for testing with TestContainerBrokerBeforeMetadata. @@ -1099,19 +1471,17 @@ def premetadata_create_container_stat_table(self, conn, put_timestamp=None): str(uuid4()), put_timestamp)) -class TestContainerBrokerBeforeMetadata(TestContainerBroker): +class TestContainerBrokerBeforeMetadata(ContainerBrokerMigrationMixin, + TestContainerBroker): """ Tests for ContainerBroker against databases created before the metadata column was added. """ def setUp(self): - self._imported_create_container_stat_table = \ - ContainerBroker.create_container_stat_table - ContainerBroker.create_container_stat_table = \ - premetadata_create_container_stat_table + super(TestContainerBrokerBeforeMetadata, self).setUp() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) exc = None with broker.get() as conn: try: @@ -1121,15 +1491,15 @@ class TestContainerBrokerBeforeMetadata(TestContainerBroker): self.assert_('no such column: metadata' in str(exc)) def tearDown(self): - ContainerBroker.create_container_stat_table = \ - self._imported_create_container_stat_table + super(TestContainerBrokerBeforeMetadata, self).tearDown() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: conn.execute('SELECT metadata FROM container_stat') -def prexsync_create_container_stat_table(self, conn, put_timestamp=None): +def prexsync_create_container_info_table(self, conn, put_timestamp, + _spi=None): """ Copied from ContainerBroker before the x_container_sync_point[12] columns were added; used for testing with @@ -1173,19 +1543,19 @@ def prexsync_create_container_stat_table(self, conn, put_timestamp=None): str(uuid4()), put_timestamp)) -class TestContainerBrokerBeforeXSync(TestContainerBroker): +class TestContainerBrokerBeforeXSync(ContainerBrokerMigrationMixin, + TestContainerBroker): """ Tests for ContainerBroker against databases created before the x_container_sync_point[12] columns were added. 
""" def setUp(self): - self._imported_create_container_stat_table = \ - ContainerBroker.create_container_stat_table - ContainerBroker.create_container_stat_table = \ - prexsync_create_container_stat_table + super(TestContainerBrokerBeforeXSync, self).setUp() + ContainerBroker.create_container_info_table = \ + prexsync_create_container_info_table broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) exc = None with broker.get() as conn: try: @@ -1196,9 +1566,300 @@ class TestContainerBrokerBeforeXSync(TestContainerBroker): self.assert_('no such column: x_container_sync_point1' in str(exc)) def tearDown(self): - ContainerBroker.create_container_stat_table = \ - self._imported_create_container_stat_table + super(TestContainerBrokerBeforeXSync, self).tearDown() broker = ContainerBroker(':memory:', account='a', container='c') - broker.initialize(normalize_timestamp('1')) + broker.initialize(normalize_timestamp('1'), 0) with broker.get() as conn: conn.execute('SELECT x_container_sync_point1 FROM container_stat') + + +def prespi_create_object_table(self, conn, *args, **kwargs): + conn.executescript(""" + CREATE TABLE object ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + created_at TEXT, + size INTEGER, + content_type TEXT, + etag TEXT, + deleted INTEGER DEFAULT 0 + ); + + CREATE INDEX ix_object_deleted_name ON object (deleted, name); + + CREATE TRIGGER object_insert AFTER INSERT ON object + BEGIN + UPDATE container_stat + SET object_count = object_count + (1 - new.deleted), + bytes_used = bytes_used + new.size, + hash = chexor(hash, new.name, new.created_at); + END; + + CREATE TRIGGER object_update BEFORE UPDATE ON object + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + + CREATE TRIGGER object_delete AFTER DELETE ON object + BEGIN + UPDATE container_stat + SET object_count = object_count - (1 - old.deleted), + bytes_used = bytes_used - old.size, + hash = chexor(hash, old.name, old.created_at); + END; + """) + + +def prespi_create_container_info_table(self, conn, put_timestamp, + _spi=None): + """ + Copied from ContainerBroker before the + storage_policy_index column was added; used for testing with + TestContainerBrokerBeforeSPI. + + Create the container_stat table which is specific to the container DB. + + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + if put_timestamp is None: + put_timestamp = normalize_timestamp(0) + conn.executescript(""" + CREATE TABLE container_stat ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + object_count INTEGER, + bytes_used INTEGER, + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '', + x_container_sync_point1 INTEGER DEFAULT -1, + x_container_sync_point2 INTEGER DEFAULT -1 + ); + + INSERT INTO container_stat (object_count, bytes_used) + VALUES (0, 0); + """) + conn.execute(''' + UPDATE container_stat + SET account = ?, container = ?, created_at = ?, id = ?, + put_timestamp = ? 
+ ''', (self.account, self.container, normalize_timestamp(time()), + str(uuid4()), put_timestamp)) + + +class TestContainerBrokerBeforeSPI(ContainerBrokerMigrationMixin, + TestContainerBroker): + """ + Tests for ContainerBroker against databases created + before the storage_policy_index column was added. + """ + + def setUp(self): + super(TestContainerBrokerBeforeSPI, self).setUp() + ContainerBroker.create_container_info_table = \ + prespi_create_container_info_table + + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1'), 0) + exc = None + with broker.get() as conn: + try: + conn.execute('''SELECT storage_policy_index + FROM container_stat''') + except BaseException as err: + exc = err + self.assert_('no such column: storage_policy_index' in str(exc)) + + def tearDown(self): + super(TestContainerBrokerBeforeSPI, self).tearDown() + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1'), 0) + with broker.get() as conn: + conn.execute('SELECT storage_policy_index FROM container_stat') + + @patch_policies + @with_tempdir + def test_object_table_migration(self, tempdir): + db_path = os.path.join(tempdir, 'container.db') + + # initialize an un-migrated database + broker = ContainerBroker(db_path, account='a', container='c') + put_timestamp = normalize_timestamp(int(time())) + broker.initialize(put_timestamp, None) + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM object + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from object table!') + + # manually insert an existing row to avoid automatic migration + obj_put_timestamp = normalize_timestamp(time()) + with broker.get() as conn: + conn.execute(''' + INSERT INTO object (name, created_at, size, + content_type, etag, deleted) + VALUES (?, ?, ?, ?, ?, ?) 
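+                -- deliberately no storage_policy_index column here: the
+                -- row must look like it was written by a pre-SPI broker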
+ ''', ('test_name', obj_put_timestamp, 123, + 'text/plain', '8f4c680e75ca4c81dc1917ddab0a0b5c', 0)) + conn.commit() + + # make sure we can iter objects without performing migration + for o in broker.list_objects_iter(1, None, None, None, None): + self.assertEqual(o, ('test_name', obj_put_timestamp, 123, + 'text/plain', + '8f4c680e75ca4c81dc1917ddab0a0b5c')) + + # get_info + info = broker.get_info() + expected = { + 'account': 'a', + 'container': 'c', + 'put_timestamp': put_timestamp, + 'delete_timestamp': '0', + 'status_changed_at': '0', + 'bytes_used': 123, + 'object_count': 1, + 'reported_put_timestamp': '0', + 'reported_delete_timestamp': '0', + 'reported_object_count': 0, + 'reported_bytes_used': 0, + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1, + 'storage_policy_index': 0, + } + for k, v in expected.items(): + self.assertEqual(info[k], v, + 'The value for %s was %r not %r' % ( + k, info[k], v)) + self.assert_(float(info['created_at']) > float(put_timestamp)) + self.assertNotEqual(int(info['hash'], 16), 0) + orig_hash = info['hash'] + # get_replication_info + info = broker.get_replication_info() + # translate object count for replicators + expected['count'] = expected.pop('object_count') + for k, v in expected.items(): + self.assertEqual(info[k], v) + self.assert_(float(info['created_at']) > float(put_timestamp)) + self.assertEqual(info['hash'], orig_hash) + self.assertEqual(info['max_row'], 1) + self.assertEqual(info['metadata'], '') + # get_policy_stats + info = broker.get_policy_stats() + expected = { + 0: {'bytes_used': 123, 'object_count': 1} + } + self.assertEqual(info, expected) + # empty & is_deleted + self.assertEqual(broker.empty(), False) + self.assertEqual(broker.is_deleted(), False) + + # no migrations have occurred yet + + # container_stat table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM container_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from container_stat table!') + + # object table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM object + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table doesn't have this column + self.assert_('no such column: storage_policy_index' in + str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from object table!') + + # policy_stat table + with broker.get() as conn: + try: + conn.execute(''' + SELECT storage_policy_index FROM policy_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the table does not exist yet + self.assert_('no such table: policy_stat' in str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select from storage_policy_index ' + 'from policy_stat table!') + + # now do a PUT with a different value for storage_policy_index + # which will update the DB schema as well as update policy_stats + # for legacy objects in the DB (those without an SPI) + second_object_put_timestamp = normalize_timestamp(time()) + other_policy = [p for p in POLICIES if p.idx != 0][0] + broker.put_object('test_second', second_object_put_timestamp, + 456, 'text/plain', + 
'cbac50c175793513fa3c581551c876ab', + storage_policy_index=other_policy.idx) + broker._commit_puts_stale_ok() + + # we are fully migrated and both objects have their + # storage_policy_index + with broker.get() as conn: + storage_policy_index = conn.execute(''' + SELECT storage_policy_index FROM container_stat + ''').fetchone()[0] + self.assertEqual(storage_policy_index, 0) + rows = conn.execute(''' + SELECT name, storage_policy_index FROM object + ''').fetchall() + for row in rows: + if row[0] == 'test_name': + self.assertEqual(row[1], 0) + else: + self.assertEqual(row[1], other_policy.idx) + + # and all stats tracking is in place + stats = broker.get_policy_stats() + self.assertEqual(len(stats), 2) + self.assertEqual(stats[0]['object_count'], 1) + self.assertEqual(stats[0]['bytes_used'], 123) + self.assertEqual(stats[other_policy.idx]['object_count'], 1) + self.assertEqual(stats[other_policy.idx]['bytes_used'], 456) + + # get info still reports on the legacy storage policy + info = broker.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 123) + + # unless you change the storage policy + broker.set_storage_policy_index(other_policy.idx) + info = broker.get_info() + self.assertEqual(info['object_count'], 1) + self.assertEqual(info['bytes_used'], 456) diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 4fd319ab90..7495b96648 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -17,6 +17,7 @@ import operator import os import mock import unittest +import itertools from contextlib import contextmanager from shutil import rmtree from StringIO import StringIO @@ -24,6 +25,8 @@ from tempfile import mkdtemp from test.unit import FakeLogger from time import gmtime from xml.dom import minidom +import time +import random from eventlet import spawn, Timeout, listen import simplejson @@ -35,8 +38,11 @@ from swift.common import constraints from swift.common.utils import (normalize_timestamp, mkdirs, public, replication, lock_parent_directory) from test.unit import fake_http_connect +from swift.common.storage_policy import POLICY_INDEX, POLICIES from swift.common.request_helpers import get_sys_meta_prefix +from test.unit import patch_policies + @contextmanager def save_globals(): @@ -48,6 +54,7 @@ def save_globals(): swift.container.server.http_connect = orig_http_connect +@patch_policies class TestContainerController(unittest.TestCase): """Test swift.container.server.ContainerController""" def setUp(self): @@ -60,11 +67,46 @@ class TestContainerController(unittest.TestCase): mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) self.controller = container_server.ContainerController( {'devices': self.testdir, 'mount_check': 'false'}) + # some of the policy tests want at least two policies + self.assert_(len(POLICIES) > 1) def tearDown(self): - """Tear down for testing swift.object_server.ObjectController""" rmtree(os.path.dirname(self.testdir), ignore_errors=1) + def _check_put_container_storage_policy(self, req, policy_index): + resp = req.get_response(self.controller) + self.assertEqual(201, resp.status_int) + req = Request.blank(req.path, method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(204, resp.status_int) + self.assertEqual(str(policy_index), resp.headers[POLICY_INDEX]) + + def test_get_and_validate_policy_index(self): + # no policy is OK + req = Request.blank('/sda1/p/a/container_default', method='PUT', + headers={'X-Timestamp': '0'}) + 
self._check_put_container_storage_policy(req, POLICIES.default.idx) + + # bogus policies + for policy in ('nada', 999): + req = Request.blank('/sda1/p/a/c_%s' % policy, method='PUT', + headers={ + 'X-Timestamp': '0', + POLICY_INDEX: policy + }) + resp = req.get_response(self.controller) + self.assertEqual(400, resp.status_int) + self.assert_('invalid' in resp.body.lower()) + + # good policies + for policy in POLICIES: + req = Request.blank('/sda1/p/a/c_%s' % policy.name, method='PUT', + headers={ + 'X-Timestamp': '0', + POLICY_INDEX: policy.idx, + }) + self._check_put_container_storage_policy(req, policy.idx) + def test_acl_container(self): # Ensure no acl by default req = Request.blank( @@ -120,31 +162,95 @@ class TestContainerController(unittest.TestCase): 'account:user') def test_HEAD(self): - req = Request.blank( - '/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT', - 'HTTP_X_TIMESTAMP': '0'}) + start = int(time.time()) + ts = itertools.count(start) + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'x-timestamp': normalize_timestamp(ts.next())}) req.get_response(self.controller) - req = Request.blank( - '/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD', - 'HTTP_X_TIMESTAMP': '0'}) + req = Request.blank('/sda1/p/a/c', method='HEAD') response = req.get_response(self.controller) - self.assert_(response.status.startswith('204')) - self.assertEquals(int(response.headers['x-container-bytes-used']), 0) - self.assertEquals(int(response.headers['x-container-object-count']), 0) - req2 = Request.blank( - '/sda1/p/a/c/o', environ={ - 'REQUEST_METHOD': 'PUT', - 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_SIZE': 42, - 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x'}) - req2.get_response(self.controller) + self.assertEqual(response.status_int, 204) + self.assertEqual(response.headers['x-container-bytes-used'], '0') + self.assertEqual(response.headers['x-container-object-count'], '0') + obj_put_request = Request.blank( + '/sda1/p/a/c/o', method='PUT', headers={ + 'x-timestamp': normalize_timestamp(ts.next()), + 'x-size': 42, + 'x-content-type': 'text/plain', + 'x-etag': 'x', + }) + obj_put_request.get_response(self.controller) + # re-issue HEAD request response = req.get_response(self.controller) - self.assertEquals(int(response.headers['x-container-bytes-used']), 42) - self.assertEquals(int(response.headers['x-container-object-count']), 1) + self.assertEqual(response.status_int // 100, 2) + self.assertEqual(response.headers['x-container-bytes-used'], '42') + self.assertEqual(response.headers['x-container-object-count'], '1') + # created at time... 
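+        # (x-timestamp reports the broker's created_at, which is stamped
+        # when the database is first initialized, hence >= start;
+        # x-put-timestamp echoes the PUT request's own timestamp)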
+ self.assert_(float(response.headers['x-timestamp']) >= start) + self.assertEqual(response.headers['x-put-timestamp'], + normalize_timestamp(start)) + + # backend headers + self.assertEqual(int(response.headers[POLICY_INDEX]), + int(POLICIES.default)) + self.assert_(float(response.headers['x-backend-timestamp']) >= start) + self.assertEqual(response.headers['x-backend-put-timestamp'], + normalize_timestamp(start)) + self.assertEqual(response.headers['x-backend-delete-timestamp'], + normalize_timestamp(0)) + self.assertEqual(response.headers['x-backend-status-changed-at'], + normalize_timestamp(start)) def test_HEAD_not_found(self): - req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) + req = Request.blank('/sda1/p/a/c', method='HEAD') resp = req.get_response(self.controller) - self.assertEquals(resp.status_int, 404) + self.assertEqual(resp.status_int, 404) + self.assertEqual(int(resp.headers[POLICY_INDEX]), 0) + self.assertEqual(resp.headers['x-backend-timestamp'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-put-timestamp'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-status-changed-at'], + normalize_timestamp(0)) + self.assertEqual(resp.headers['x-backend-delete-timestamp'], + normalize_timestamp(0)) + for header in ('x-container-object-count', 'x-container-bytes-used', + 'x-timestamp', 'x-put-timestamp'): + self.assertEqual(resp.headers[header], None) + + def test_deleted_headers(self): + ts = itertools.count(int(time.time())) + request_method_times = { + 'PUT': normalize_timestamp(ts.next()), + 'DELETE': normalize_timestamp(ts.next()), + } + # setup a deleted container + for method in ('PUT', 'DELETE'): + x_timestamp = request_method_times[method] + req = Request.blank('/sda1/p/a/c', method=method, + headers={'x-timestamp': x_timestamp}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int // 100, 2) + + for method in ('GET', 'HEAD'): + req = Request.blank('/sda1/p/a/c', method=method) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 404) + # backend headers + self.assertEqual(int(resp.headers[POLICY_INDEX]), + int(POLICIES.default)) + self.assert_(float(resp.headers['x-backend-timestamp']) >= + float(request_method_times['PUT'])) + self.assertEqual(resp.headers['x-backend-put-timestamp'], + request_method_times['PUT']) + self.assertEqual(resp.headers['x-backend-delete-timestamp'], + request_method_times['DELETE']) + self.assertEqual(resp.headers['x-backend-status-changed-at'], + request_method_times['DELETE']) + for header in ('x-container-object-count', + 'x-container-bytes-used', 'x-timestamp', + 'x-put-timestamp'): + self.assertEqual(resp.headers[header], None) def test_HEAD_invalid_partition(self): req = Request.blank('/sda1/./a/c', environ={'REQUEST_METHOD': 'HEAD', @@ -237,6 +343,233 @@ class TestContainerController(unittest.TestCase): resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 404) + def test_PUT_good_policy_specified(self): + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', + headers={'X-Timestamp': normalize_timestamp(1), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + + # now make sure we read it back + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + def 
test_PUT_no_policy_specified(self): + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1)}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + + # now make sure the default was used (pol 1) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.headers.get(POLICY_INDEX), + str(POLICIES.default.idx)) + + def test_PUT_bad_policy_specified(self): + # Set metadata header + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': normalize_timestamp(1), + POLICY_INDEX: 'nada'}) + resp = req.get_response(self.controller) + # make sure we get bad response + self.assertEquals(resp.status_int, 400) + + def test_PUT_no_policy_change(self): + ts = itertools.count(1) + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/c') + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + # make sure we get the right index back + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + # now try to update w/o changing the policy + for method in ('POST', 'PUT'): + req = Request.blank('/sda1/p/a/c', method=method, headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx + }) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int // 100, 2) + # make sure we get the right index back + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + def test_PUT_bad_policy_change(self): + ts = itertools.count(1) + policy = random.choice(list(POLICIES)) + # Set metadata header + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: policy.idx}) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 201) + req = Request.blank('/sda1/p/a/c') + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 204) + # make sure we get the right index back + self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx)) + + other_policies = [p for p in POLICIES if p != policy] + for other_policy in other_policies: + # now try to change it and make sure we get a conflict + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: other_policy.idx + }) + resp = req.get_response(self.controller) + self.assertEquals(resp.status_int, 409) + + # and make sure there is no change! 
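+            # (re-read the index to prove the 409 above was rejected
+            # without any side effect on the stored policy)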
+            req = Request.blank('/sda1/p/a/c')
+            resp = req.get_response(self.controller)
+            self.assertEquals(resp.status_int, 204)
+            # make sure we get the right index back
+            self.assertEquals(resp.headers.get(POLICY_INDEX),
+                              str(policy.idx))
+
+    def test_POST_ignores_policy_change(self):
+        ts = itertools.count(1)
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+            POLICY_INDEX: policy.idx})
+        resp = req.get_response(self.controller)
+        self.assertEquals(resp.status_int, 201)
+        req = Request.blank('/sda1/p/a/c')
+        resp = req.get_response(self.controller)
+        self.assertEquals(resp.status_int, 204)
+        # make sure we get the right index back
+        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+
+        other_policies = [p for p in POLICIES if p != policy]
+        for other_policy in other_policies:
+            # now try to change the policy index via POST; the request
+            # is accepted but the policy change is ignored
+            req = Request.blank('/sda1/p/a/c', method='POST', headers={
+                'X-Timestamp': normalize_timestamp(ts.next()),
+                POLICY_INDEX: other_policy.idx
+            })
+            resp = req.get_response(self.controller)
+            # valid request
+            self.assertEquals(resp.status_int // 100, 2)
+
+            # but it does nothing
+            req = Request.blank('/sda1/p/a/c')
+            resp = req.get_response(self.controller)
+            self.assertEquals(resp.status_int, 204)
+            # make sure we get the right index back
+            self.assertEquals(resp.headers.get(POLICY_INDEX),
+                              str(policy.idx))
+
+    def test_PUT_no_policy_for_existing_default(self):
+        ts = itertools.count(1)
+        # create a container with the default storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # check the policy index
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
+        # put again without specifying the storage policy
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': normalize_timestamp(ts.next()),
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 202)  # sanity check
+
+        # policy index is unchanged
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
+    def test_PUT_proxy_default_no_policy_for_existing_default(self):
+        # make it look like the proxy has a different default than we do,
+        # like during a config change restart across a multi-node cluster.
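+        # (the default in effect at create time wins; a later PUT that
+        # omits the policy must not drift to the proxy's new default)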
+ proxy_default = random.choice([p for p in POLICIES if not + p.is_default]) + ts = itertools.count(1) + # create a container with the default storage policy + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + 'X-Backend-Storage-Policy-Default': int(proxy_default), + }) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 201) # sanity check + + # check the policy index + req = Request.blank('/sda1/p/a/c', method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 204) + self.assertEqual(int(resp.headers[POLICY_INDEX]), + int(proxy_default)) + + # put again without proxy specifying the different default + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + 'X-Backend-Storage-Policy-Default': int(POLICIES.default), + }) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 202) # sanity check + + # policy index is unchanged + req = Request.blank('/sda1/p/a/c', method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 204) + self.assertEqual(int(resp.headers[POLICY_INDEX]), + int(proxy_default)) + + def test_PUT_no_policy_for_existing_non_default(self): + ts = itertools.count(1) + non_default_policy = [p for p in POLICIES if not p.is_default][0] + # create a container with the non-default storage policy + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + POLICY_INDEX: non_default_policy.idx, + }) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 201) # sanity check + + # check the policy index + req = Request.blank('/sda1/p/a/c', method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 204) + self.assertEqual(resp.headers[POLICY_INDEX], + str(non_default_policy.idx)) + + # put again without specifiying the storage policy + req = Request.blank('/sda1/p/a/c', method='PUT', headers={ + 'X-Timestamp': normalize_timestamp(ts.next()), + }) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 202) # sanity check + + # policy index is unchanged + req = Request.blank('/sda1/p/a/c', method='HEAD') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 204) + self.assertEqual(resp.headers[POLICY_INDEX], + str(non_default_policy.idx)) + def test_PUT_GET_metadata(self): # Set metadata header req = Request.blank( @@ -305,20 +638,20 @@ class TestContainerController(unittest.TestCase): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(1), key: 'Value'}) - resp = self.controller.PUT(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) - req = Request.blank('/sda1/p/a/c') - resp = self.controller.GET(req) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'Value') # Set another metadata header, ensuring old one doesn't disappear req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': normalize_timestamp(1), key2: 'Value2'}) - resp = self.controller.POST(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) - req = Request.blank('/sda1/p/a/c') - resp = self.controller.GET(req) + req 
= Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'Value') self.assertEquals(resp.headers.get(key2.lower()), 'Value2') @@ -326,10 +659,10 @@ class TestContainerController(unittest.TestCase): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(3), key: 'New Value'}) - resp = self.controller.PUT(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c') - resp = self.controller.GET(req) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'New Value') @@ -337,10 +670,10 @@ class TestContainerController(unittest.TestCase): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(2), key: 'Old Value'}) - resp = self.controller.PUT(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c') - resp = self.controller.GET(req) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'New Value') @@ -348,10 +681,10 @@ class TestContainerController(unittest.TestCase): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(4), key: ''}) - resp = self.controller.PUT(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c') - resp = self.controller.GET(req) + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assert_(key.lower() not in resp.headers) @@ -437,26 +770,26 @@ class TestContainerController(unittest.TestCase): key = '%sTest' % prefix req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(1)}) - resp = self.controller.PUT(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) # Set metadata header req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': normalize_timestamp(1), key: 'Value'}) - resp = self.controller.POST(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) - resp = self.controller.HEAD(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'Value') # Update metadata header req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': normalize_timestamp(3), key: 'New Value'}) - resp = self.controller.POST(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) - resp = self.controller.HEAD(req) + resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) self.assertEquals(resp.headers.get(key.lower()), 'New Value') @@ -464,10 +797,10 @@ class TestContainerController(unittest.TestCase): req = 
@@ -464,10 +797,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(2),
                                      key: 'Old Value'})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assertEquals(resp.headers.get(key.lower()), 'New Value')
@@ -475,10 +808,10 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
                             headers={'X-Timestamp': normalize_timestamp(4),
                                      key: ''})
-        resp = self.controller.POST(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
-        resp = self.controller.HEAD(req)
+        resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         self.assert_(key.lower() not in resp.headers)
@@ -780,11 +1113,22 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank(path, method='GET')
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 404)  # sanity
+        # backend headers
+        expectations = {
+            'x-backend-put-timestamp': normalize_timestamp(1),
+            'x-backend-delete-timestamp': normalize_timestamp(2),
+            'x-backend-status-changed-at': normalize_timestamp(2),
+        }
+        for header, value in expectations.items():
+            self.assertEqual(resp.headers[header], value,
+                             'response header %s was %s not %s' % (
+                                 header, resp.headers[header], value))
         db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
         self.assertEqual(True, db.is_deleted())
         info = db.get_info()
         self.assertEquals(info['put_timestamp'], normalize_timestamp('1'))
         self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
+        self.assertEquals(info['status_changed_at'], normalize_timestamp('2'))

         # recreate
         req = Request.blank(path, method='PUT', headers={'X-Timestamp': '4'})
@@ -795,6 +1139,20 @@ class TestContainerController(unittest.TestCase):
         info = db.get_info()
         self.assertEquals(info['put_timestamp'], normalize_timestamp('4'))
         self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
+        self.assertEquals(info['status_changed_at'], normalize_timestamp('4'))
+        for method in ('GET', 'HEAD'):
+            req = Request.blank(path)
+            resp = req.get_response(self.controller)
+            expectations = {
+                'x-put-timestamp': normalize_timestamp(4),
+                'x-backend-put-timestamp': normalize_timestamp(4),
+                'x-backend-delete-timestamp': normalize_timestamp(2),
+                'x-backend-status-changed-at': normalize_timestamp(4),
+            }
+            for header, expected in expectations.items():
+                self.assertEqual(resp.headers[header], expected,
+                                 'header %s was %s not the expected %s' % (
+                                     header, resp.headers[header], expected))

     def test_DELETE_PUT_recreate_replication_race(self):
         path = '/sda1/p/a/c'
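The new status_changed_at assertions encode two rules: a container database counts as deleted once its delete timestamp passes its put timestamp, and status_changed_at advances only when that deleted/not-deleted status actually flips. A simplified model of what these tests check (illustrative only; the real broker also folds object counts into its notion of "deleted"):

    def is_deleted(info):
        # deleted once the delete timestamp passes the put timestamp
        return info['delete_timestamp'] > info['put_timestamp']

    def merge_timestamps(info, put_ts, delete_ts, now):
        was_deleted = is_deleted(info)
        info['put_timestamp'] = max(info['put_timestamp'], put_ts)
        info['delete_timestamp'] = max(info['delete_timestamp'], delete_ts)
        if is_deleted(info) != was_deleted:
            # the DELETE at t=2 flips the status (status_changed_at -> 2)
            # and the recreating PUT at t=4 flips it back (-> 4)
            info['status_changed_at'] = now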
@@ -862,6 +1220,71 @@ class TestContainerController(unittest.TestCase):
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 404)

+    def test_change_storage_policy_via_DELETE_then_PUT(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        policy = random.choice(list(POLICIES))
+        req = Request.blank(
+            '/sda1/p/a/c', method='PUT',
+            headers={'X-Timestamp': ts.next(),
+                     POLICY_INDEX: policy.idx})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        # try to recreate with each of the other policies
+        other_policies = [p for p in POLICIES if p != policy]
+        for other_policy in other_policies:
+            # first delete the existing container
+            req = Request.blank('/sda1/p/a/c', method='DELETE', headers={
+                'X-Timestamp': ts.next()})
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.status_int, 204)  # sanity check
+
+            # at this point, the DB should still exist but be in a deleted
+            # state, so changing the policy index is perfectly acceptable
+            req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+                'X-Timestamp': ts.next(),
+                POLICY_INDEX: other_policy.idx})
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.status_int, 201)  # sanity check
+
+            req = Request.blank(
+                '/sda1/p/a/c', method='HEAD')
+            resp = req.get_response(self.controller)
+            self.assertEqual(resp.headers[POLICY_INDEX],
+                             str(other_policy.idx))
+
+    def test_change_to_default_storage_policy_via_DELETE_then_PUT(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        non_default_policy = random.choice([p for p in POLICIES
+                                            if not p.is_default])
+        req = Request.blank('/sda1/p/a/c', method='PUT', headers={
+            'X-Timestamp': ts.next(),
+            POLICY_INDEX: non_default_policy.idx,
+        })
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        req = Request.blank(
+            '/sda1/p/a/c', method='DELETE',
+            headers={'X-Timestamp': ts.next()})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 204)  # sanity check
+
+        # at this point, the DB should still exist but be in a deleted state,
+        # so changing the policy index is perfectly acceptable
+        req = Request.blank(
+            '/sda1/p/a/c', method='PUT',
+            headers={'X-Timestamp': ts.next()})
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.status_int, 201)  # sanity check
+
+        req = Request.blank('/sda1/p/a/c', method='HEAD')
+        resp = req.get_response(self.controller)
+        self.assertEqual(resp.headers[POLICY_INDEX],
+                         str(POLICIES.default.idx))
+
     def test_DELETE_object(self):
         req = Request.blank(
             '/sda1/p/a/c',
diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py
index 5c32dfa3b0..8c15b93ce8 100644
--- a/test/unit/container/test_updater.py
+++ b/test/unit/container/test_updater.py
@@ -91,7 +91,7 @@ class TestContainerUpdater(unittest.TestCase):
         os.mkdir(subdir)
         cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
                              container='c')
-        cb.initialize(normalize_timestamp(1))
+        cb.initialize(normalize_timestamp(1), 0)
         cu.run_once()
         info = cb.get_info()
         self.assertEquals(info['object_count'], 0)
@@ -172,7 +172,7 @@ class TestContainerUpdater(unittest.TestCase):
         os.mkdir(subdir)
         cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
                              container='\xce\xa9')
-        cb.initialize(normalize_timestamp(1))
+        cb.initialize(normalize_timestamp(1), 0)
         cb.put_object('\xce\xa9', normalize_timestamp(2), 3, 'text/plain',
                       '68b329da9893e34099c7d8ad5cb9c940')
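The updater fix-ups show the signature change in action: container brokers now take the storage policy index as the second initialize() argument (accounts ignore it). Illustrative usage under this patch, with a placeholder DB path:

    from swift.common.utils import normalize_timestamp
    from swift.container.backend import ContainerBroker

    broker = ContainerBroker('/tmp/hash.db', account='a', container='c')
    broker.initialize(normalize_timestamp(1), 0)  # policy index 0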
diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py
index 7c8ecf7075..eda000fe98 100644
--- a/test/unit/proxy/controllers/test_container.py
+++ b/test/unit/proxy/controllers/test_container.py
@@ -20,11 +20,17 @@ from swift.common.swob import Request
 from swift.proxy import server as proxy_server
 from swift.proxy.controllers.base import headers_to_container_info
 from test.unit import fake_http_connect, FakeRing, FakeMemcache
+from swift.common.storage_policy import StoragePolicy
 from swift.common.request_helpers import get_sys_meta_prefix
+from test.unit import patch_policies
+from test.unit.common.ring.test_ring import TestRingBase


-class TestContainerController(unittest.TestCase):
+
+@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
+class TestContainerController(TestRingBase):
     def setUp(self):
+        TestRingBase.setUp(self)
         self.app = proxy_server.Application(None, FakeMemcache(),
                                             account_ring=FakeRing(),
                                             container_ring=FakeRing(),
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 1e47ef9564..41654a0d55 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -27,6 +27,7 @@ from tempfile import mkdtemp
 import weakref
 import operator
 import re
+import random

 import mock
 from eventlet import sleep, spawn, wsgi, listen
@@ -34,7 +35,8 @@ import simplejson

 from test.unit import (
     connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
-    FakeMemcache, debug_logger, patch_policies, write_fake_ring)
+    FakeMemcache, debug_logger, patch_policies, write_fake_ring,
+    mocked_http_conn)
 from swift.proxy import server as proxy_server
 from swift.account import server as account_server
 from swift.container import server as container_server
@@ -49,10 +51,11 @@ from swift.proxy.controllers import base as proxy_base
 from swift.proxy.controllers.base import get_container_memcache_key, \
     get_account_memcache_key, cors_validation
 import swift.proxy.controllers
-from swift.common.request_helpers import get_sys_meta_prefix
-from swift.common.storage_policy import StoragePolicy
 from swift.common.swob import Request, Response, HTTPUnauthorized, \
     HTTPException
+from swift.common.storage_policy import StoragePolicy, \
+    POLICIES, POLICY, POLICY_INDEX
+from swift.common.request_helpers import get_sys_meta_prefix

 # mocks
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@@ -4170,6 +4173,11 @@ class TestObjectController(unittest.TestCase):
         ])


+@patch_policies([
+    StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
+    StoragePolicy(1, 'one', False, object_ring=FakeRing()),
+    StoragePolicy(2, 'two', False, True, object_ring=FakeRing())
+])
 class TestContainerController(unittest.TestCase):
     "Test swift.proxy_server.ContainerController"
@@ -4178,7 +4186,76 @@ class TestContainerController(unittest.TestCase):
             account_ring=FakeRing(),
             container_ring=FakeRing(),
             object_ring=FakeRing(),
-            logger=FakeLogger())
+            logger=debug_logger())
+
+    def test_convert_policy_to_index(self):
+        controller = swift.proxy.controllers.ContainerController(self.app,
+                                                                 'a', 'c')
+        expected = {
+            'zero': 0,
+            'ZeRo': 0,
+            'one': 1,
+            'OnE': 1,
+        }
+        for name, index in expected.items():
+            req = Request.blank('/a/c', headers={'Content-Length': '0',
+                                'Content-Type': 'text/plain',
+                                POLICY: name})
+            self.assertEqual(controller._convert_policy_to_index(req), index)
+        # default test
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                            'Content-Type': 'text/plain'})
+        self.assertEqual(controller._convert_policy_to_index(req), None)
+        # negative test
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                            'Content-Type': 'text/plain', POLICY: 'nada'})
+        self.assertRaises(HTTPException, controller._convert_policy_to_index,
+                          req)
+        # storage policy two is deprecated
+        req = Request.blank('/a/c', headers={'Content-Length': '0',
+                            'Content-Type': 'text/plain',
+                            POLICY: 'two'})
+        self.assertRaises(HTTPException, controller._convert_policy_to_index,
+                          req)
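The test above fixes the expected behaviour of the proxy's name-to-index conversion: lookups are case-insensitive, a missing header means "use the default", and unknown or deprecated names are client errors. A sketch of that behaviour as a standalone function (the real logic is a ContainerController method; this rendering is illustrative):

    from swift.common.storage_policy import POLICIES
    from swift.common.swob import HTTPBadRequest

    def convert_policy_to_index(req):
        policy_name = req.headers.get('X-Storage-Policy')
        if not policy_name:
            return None  # no header: the cluster default applies
        policy = POLICIES.get_by_name(policy_name)  # case-insensitive
        if not policy:
            raise HTTPBadRequest(
                request=req,
                body='Invalid X-Storage-Policy %r' % policy_name)
        if policy.is_deprecated:
            raise HTTPBadRequest(
                request=req,
                body='Storage Policy %r is deprecated' % policy_name)
        return policy.idx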
+
+    def test_convert_index_to_name(self):
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 200,
+                headers={POLICY_INDEX: int(policy)},
+        ) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(resp.headers[POLICY], policy.name)
+
+    def test_no_convert_index_to_name_when_container_not_found(self):
+        policy = random.choice(list(POLICIES))
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 404, 404, 404,
+                headers={POLICY_INDEX: int(policy)}) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 404)
+        self.assertEqual(resp.headers[POLICY], None)
+
+    def test_error_convert_index_to_name(self):
+        req = Request.blank('/v1/a/c')
+        with mocked_http_conn(
+                200, 200,
+                headers={POLICY_INDEX: '-1'}) as fake_conn:
+            resp = req.get_response(self.app)
+        self.assertRaises(StopIteration, fake_conn.code_iter.next)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(resp.headers[POLICY], None)
+        error_lines = self.app.logger.get_lines_for_level('error')
+        self.assertEqual(2, len(error_lines))
+        for msg in error_lines:
+            expected = "Could not translate " \
+                "X-Backend-Storage-Policy-Index ('-1')"
+            self.assertTrue(expected in msg)

     def test_transfer_headers(self):
         src_headers = {'x-remove-versions-location': 'x',
@@ -4287,6 +4364,59 @@ class TestContainerController(unittest.TestCase):
         self.app.account_autocreate = True
         test_status_map((404, 404, 404), 404, None, 404)

+    def test_PUT_policy_headers(self):
+        backend_requests = []
+
+        def capture_requests(ipaddr, port, device, partition, method,
+                             path, headers=None, query_string=None):
+            if method == 'PUT':
+                backend_requests.append(headers)
+
+        def test_policy(requested_policy):
+            with save_globals():
+                mock_conn = set_http_connect(200, 201, 201, 201,
+                                             give_connect=capture_requests)
+                self.app.memcache.store = {}
+                req = Request.blank('/v1/a/test', method='PUT',
+                                    headers={'Content-Length': 0})
+                if requested_policy:
+                    expected_policy = requested_policy
+                    req.headers[POLICY] = requested_policy.name
+                else:
+                    expected_policy = POLICIES.default
+                res = req.get_response(self.app)
+                if expected_policy.is_deprecated:
+                    self.assertEquals(res.status_int, 400)
+                    self.assertEqual(0, len(backend_requests))
+                    expected = 'is deprecated'
+                    self.assertTrue(expected in res.body,
+                                    '%r did not include %r' % (
+                                        res.body, expected))
+                    return
+                self.assertEquals(res.status_int, 201)
+                self.assertEqual(
+                    expected_policy.object_ring.replicas,
+                    len(backend_requests))
+                for headers in backend_requests:
+                    if not requested_policy:
+                        self.assertFalse(POLICY_INDEX in headers)
+                        self.assertTrue(
+                            'X-Backend-Storage-Policy-Default' in headers)
+                        self.assertEqual(
+                            int(expected_policy),
+                            int(headers['X-Backend-Storage-Policy-Default']))
+                    else:
+                        self.assertTrue(POLICY_INDEX in headers)
+                        self.assertEqual(int(headers[POLICY_INDEX]),
+                                         requested_policy.idx)
+                # make sure all mocked responses are consumed
+                self.assertRaises(StopIteration, mock_conn.code_iter.next)
+
+        test_policy(None)  # no policy header
+        for policy in POLICIES:
+            backend_requests = []  # reset backend requests
+            test_policy(policy)

     def test_PUT(self):
         with save_globals():
             controller = proxy_server.ContainerController(self.app, 'account',
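The PUT test above boils down to one header rule on the proxy side, sketched here under the same constant names the tests use (illustrative, not the controller's actual code):

    def backend_put_headers(policy_index, default_policy_index):
        headers = {}
        if policy_index is not None:
            # the client named a policy: send its index to every replica
            headers['X-Backend-Storage-Policy-Index'] = int(policy_index)
        else:
            # no policy requested: let the backend apply the cluster
            # default (and keep an existing container's policy unchanged)
            headers['X-Backend-Storage-Policy-Default'] = \
                int(default_policy_index)
        return headers

Sending the default in a separate header, rather than resolving it at the proxy, is what lets the backend prefer an existing container's policy over the default, per the container server tests earlier in this patch.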
@@ -5930,6 +6060,7 @@ class TestProxyObjectPerformance(unittest.TestCase):

 @patch_policies([StoragePolicy(0, 'migrated'),
                  StoragePolicy(1, 'ernie', True),
+                 StoragePolicy(2, 'deprecated', is_deprecated=True),
                  StoragePolicy(3, 'bert')])
 class TestSwiftInfo(unittest.TestCase):
     def setUp(self):
@@ -5971,6 +6102,8 @@ class TestSwiftInfo(unittest.TestCase):
         self.assertTrue('policies' in si)
         sorted_pols = sorted(si['policies'], key=operator.itemgetter('name'))
         self.assertEqual(len(sorted_pols), 3)
+        for policy in sorted_pols:
+            self.assertNotEquals(policy['name'], 'deprecated')
         self.assertEqual(sorted_pols[0]['name'], 'bert')
         self.assertEqual(sorted_pols[1]['name'], 'ernie')
         self.assertEqual(sorted_pols[2]['name'], 'migrated')
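Four policies are configured here but only three are asserted, which pins down the /info contract: deprecated policies are filtered out before the capability list is registered. A sketch of that filtering (illustrative only):

    def exposed_policies(policies):
        entries = []
        for pol in policies:
            if pol.is_deprecated:
                continue  # 'deprecated' is configured but never advertised
            entry = {'name': pol.name}
            if pol.is_default:
                entry['default'] = True
            entries.append(entry)
        return entries  # -> 'bert', 'ernie' and 'migrated' only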