Add Storage Policy support to Containers

Containers now have a storage policy index associated with them,
stored in the container_stat table. This index is only settable at
container creation time (PUT request), and cannot be changed without
deleting and recreating the container. This is because a container's
policy index will apply to all its objects, so changing a container's
policy index would require moving large amounts of object data
around. If a user wants to change the policy for data in a container,
they must create a new container with the desired policy and move the
data over.

Keep status_changed_at up-to-date with status changes.

In particular during container recreation and replication.

When a container-server receives a PUT for a deleted database an extra UPDATE
is issued against the container_stat table to notate the x-timestamp of the
request.

During replication if merge_timestamps causes a container's status to change
(from DELETED to ACTIVE or vice-versa) the status_changed_at field is set to
the current time.

Accurate reporting of status_changed_at is useful for container replication
forensics and allows resolution of "set on create" attributes like the
upcoming storage_policy_index.

Expose Backend container info on deleted containers.

Include basic container info in backend headers on 404 responses from the
container server.  Default empty values are used as placeholders if the
database does not exist.

Specifically the X-Backend-Status-Changed-At, X-Backend-DELETE-Timestamp and
the X-Backend-Storage-Policy-Index value will be needed by the reconciler to
deal with reconciling out of order object writes in the face of recently
deleted containers.

 * Add "status_changed_at" key to the response from ContainerBroker.get_info.
 * Add "Status Timestamp" field to swift.cli.info.print_db_info_metadata.
 * Add "status_changed_at" key to the response from AccountBroker.get_info.

DocImpact
Implements: blueprint storage-policies
Change-Id: Ie6d388f067f5b096b0f96faef151120ba23c8748
This commit is contained in:
Clay Gerrard 2014-05-27 16:57:25 -07:00
parent 7624b198cf
commit 4321bb0af6
16 changed files with 2269 additions and 318 deletions

View File

@ -37,7 +37,7 @@ class AccountBroker(DatabaseBroker):
db_contains_type = 'container'
db_reclaim_timestamp = 'delete_timestamp'
def _initialize(self, conn, put_timestamp):
def _initialize(self, conn, put_timestamp, **kwargs):
"""
Create a brand new account database (tables, indices, triggers, etc.)

View File

@ -215,11 +215,15 @@ class DatabaseBroker(object):
"""
return self.db_file
def initialize(self, put_timestamp=None):
def initialize(self, put_timestamp=None, storage_policy_index=None):
"""
Create the DB
The storage_policy_index is passed through to the subclass's
``_initialize`` method. It is ignored by ``AccountBroker``.
:param put_timestamp: timestamp of initial PUT request
:param storage_policy_index: only required for containers
"""
if self.db_file == ':memory:':
tmp_db_file = None
@ -277,7 +281,8 @@ class DatabaseBroker(object):
""")
if not put_timestamp:
put_timestamp = normalize_timestamp(0)
self._initialize(conn, put_timestamp)
self._initialize(conn, put_timestamp,
storage_policy_index=storage_policy_index)
conn.commit()
if tmp_db_file:
conn.close()
@ -428,6 +433,7 @@ class DatabaseBroker(object):
:param put_timestamp: put timestamp
:param delete_timestamp: delete timestamp
"""
current_status = self.is_deleted()
with self.get() as conn:
conn.execute('''
UPDATE %s_stat SET created_at=MIN(?, created_at),
@ -435,6 +441,9 @@ class DatabaseBroker(object):
delete_timestamp=MAX(?, delete_timestamp)
''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
conn.commit()
if self.is_deleted() != current_status:
timestamp = normalize_timestamp(time.time())
self.update_status_changed_at(timestamp)
def get_items_since(self, start, count):
"""
@ -486,32 +495,36 @@ class DatabaseBroker(object):
result.append({'remote_id': row[0], 'sync_point': row[1]})
return result
def get_max_row(self):
query = '''
SELECT SQLITE_SEQUENCE.seq
FROM SQLITE_SEQUENCE
WHERE SQLITE_SEQUENCE.name == '%s'
LIMIT 1
''' % (self.db_contains_type)
with self.get() as conn:
row = conn.execute(query).fetchone()
return row[0] if row else -1
def get_replication_info(self):
"""
Get information about the DB required for replication.
:returns: dict containing keys: hash, id, created_at, put_timestamp,
delete_timestamp, count, max_row, and metadata
:returns: dict containing keys from get_info plus max_row and metadata
Note:: get_info's <db_contains_type>_count is translated to just
"count" and metadata is the raw string.
"""
info = self.get_info()
info['count'] = info.pop('%s_count' % self.db_contains_type)
info['metadata'] = self.get_raw_metadata()
info['max_row'] = self.get_max_row()
return info
def get_info(self):
self._commit_puts_stale_ok()
query_part1 = '''
SELECT hash, id, created_at, put_timestamp, delete_timestamp,
%s_count AS count,
CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL
THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row, ''' % \
self.db_contains_type
query_part2 = '''
FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE
ON SQLITE_SEQUENCE.name == '%s') LIMIT 1
''' % (self.db_type, self.db_contains_type)
with self.get() as conn:
try:
curs = conn.execute(query_part1 + 'metadata' + query_part2)
except sqlite3.OperationalError as err:
if 'no such column: metadata' not in str(err):
raise
curs = conn.execute(query_part1 + "'' as metadata" +
query_part2)
curs = conn.execute('SELECT * from %s_stat' % self.db_type)
curs.row_factory = dict_factory
return curs.fetchone()
@ -621,13 +634,7 @@ class DatabaseBroker(object):
with open(self.db_file, 'rb+') as fp:
fallocate(fp.fileno(), int(prealloc_size))
@property
def metadata(self):
"""
Returns the metadata dict for the database. The metadata dict values
are tuples of (value, timestamp) where the timestamp indicates when
that key was set to that value.
"""
def get_raw_metadata(self):
with self.get() as conn:
try:
metadata = conn.execute('SELECT metadata FROM %s_stat' %
@ -636,6 +643,16 @@ class DatabaseBroker(object):
if 'no such column: metadata' not in str(err):
raise
metadata = ''
return metadata
@property
def metadata(self):
"""
Returns the metadata dict for the database. The metadata dict values
are tuples of (value, timestamp) where the timestamp indicates when
that key was set to that value.
"""
metadata = self.get_raw_metadata()
if metadata:
metadata = json.loads(metadata)
utf8encodekeys(metadata)
@ -758,3 +775,16 @@ class DatabaseBroker(object):
' WHERE put_timestamp < ?' % self.db_type,
(timestamp, timestamp))
conn.commit()
def update_status_changed_at(self, timestamp):
"""
Update the status_changed_at field in the stat table. Only
modifies status_changed_at if the timestamp is greater than the
current status_changed_at timestamp.
"""
with self.get() as conn:
conn.execute(
'UPDATE %s_stat SET status_changed_at = ?'
' WHERE status_changed_at < ?' % self.db_type,
(timestamp, timestamp))
conn.commit()

View File

@ -30,9 +30,9 @@ from swift.common.daemon import Daemon
class ContainerAuditor(Daemon):
"""Audit containers."""
def __init__(self, conf):
def __init__(self, conf, logger=None):
self.conf = conf
self.logger = get_logger(conf, log_route='container-auditor')
self.logger = logger or get_logger(conf, log_route='container-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.interval = int(conf.get('interval', 1800))

View File

@ -30,6 +30,109 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
DATADIR = 'containers'
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
'''
POLICY_STAT_TRIGGER_SCRIPT = '''
CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size
WHERE storage_policy_index = new.storage_policy_index;
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT new.storage_policy_index,
(1 - new.deleted),
new.size
WHERE NOT EXISTS(
SELECT changes() as change
FROM policy_stat
WHERE change <> 0
);
UPDATE container_info
SET hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size
WHERE storage_policy_index = old.storage_policy_index;
UPDATE container_info
SET hash = chexor(hash, old.name, old.created_at);
END;
'''
CONTAINER_INFO_TABLE_SCRIPT = '''
CREATE TABLE container_info (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1,
storage_policy_index INTEGER DEFAULT 0,
reconciler_sync_point INTEGER DEFAULT -1
);
'''
CONTAINER_STAT_VIEW_SCRIPT = '''
CREATE VIEW container_stat
AS SELECT ci.account, ci.container, ci.created_at,
ci.put_timestamp, ci.delete_timestamp,
ci.reported_put_timestamp, ci.reported_delete_timestamp,
ci.reported_object_count, ci.reported_bytes_used, ci.hash,
ci.id, ci.status, ci.status_changed_at, ci.metadata,
ci.x_container_sync_point1, ci.x_container_sync_point2,
ci.reconciler_sync_point,
ci.storage_policy_index,
coalesce(ps.object_count, 0) AS object_count,
coalesce(ps.bytes_used, 0) AS bytes_used
FROM container_info ci LEFT JOIN policy_stat ps
ON ci.storage_policy_index = ps.storage_policy_index;
CREATE TRIGGER container_stat_update
INSTEAD OF UPDATE ON container_stat
BEGIN
UPDATE container_info
SET account = NEW.account,
container = NEW.container,
created_at = NEW.created_at,
put_timestamp = NEW.put_timestamp,
delete_timestamp = NEW.delete_timestamp,
reported_put_timestamp = NEW.reported_put_timestamp,
reported_delete_timestamp = NEW.reported_delete_timestamp,
reported_object_count = NEW.reported_object_count,
reported_bytes_used = NEW.reported_bytes_used,
hash = NEW.hash,
id = NEW.id,
status = NEW.status,
status_changed_at = NEW.status_changed_at,
metadata = NEW.metadata,
x_container_sync_point1 = NEW.x_container_sync_point1,
x_container_sync_point2 = NEW.x_container_sync_point2,
storage_policy_index = NEW.storage_policy_index,
reconciler_sync_point = NEW.reconciler_sync_point;
END;
'''
class ContainerBroker(DatabaseBroker):
"""Encapsulates working with a container database."""
@ -37,7 +140,14 @@ class ContainerBroker(DatabaseBroker):
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
def _initialize(self, conn, put_timestamp):
@property
def storage_policy_index(self):
if not hasattr(self, '_storage_policy_index'):
self._storage_policy_index = \
self.get_info()['storage_policy_index']
return self._storage_policy_index
def _initialize(self, conn, put_timestamp, storage_policy_index):
"""
Create a brand new container database (tables, indices, triggers, etc.)
"""
@ -48,7 +158,9 @@ class ContainerBroker(DatabaseBroker):
raise ValueError(
'Attempting to create a new database with no container set')
self.create_object_table(conn)
self.create_container_stat_table(conn, put_timestamp)
self.create_policy_stat_table(conn, storage_policy_index)
self.create_container_info_table(conn, put_timestamp,
storage_policy_index)
def create_object_table(self, conn):
"""
@ -65,74 +177,70 @@ class ContainerBroker(DatabaseBroker):
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
UPDATE container_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size,
hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER object_delete AFTER DELETE ON object
BEGIN
UPDATE container_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size,
hash = chexor(hash, old.name, old.created_at);
END;
""")
""" + POLICY_STAT_TRIGGER_SCRIPT)
def create_container_stat_table(self, conn, put_timestamp=None):
def create_container_info_table(self, conn, put_timestamp,
storage_policy_index):
"""
Create the container_stat table which is specific to the container DB.
Create the container_info table which is specific to the container DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
Also creates the container_stat view.
:param conn: DB connection object
:param put_timestamp: put timestamp
:param storage_policy_index: storage policy index
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1
);
# The container_stat view is for compatibility; old versions of Swift
# expected a container_stat table with columns "object_count" and
# "bytes_used", but when that stuff became per-storage-policy and
# moved to the policy_stat table, we stopped creating those columns in
# container_stat.
#
# To retain compatibility, we create the container_stat view with some
# triggers to make it behave like the old container_stat table. This
# way, if an old version of Swift encounters a database with the new
# schema, it can still work.
#
# Note that this can occur during a rolling Swift upgrade if a DB gets
# rsynced from an old node to a new, so it's necessary for
# availability during upgrades. The fact that it enables downgrades is
# a nice bonus.
conn.executescript(CONTAINER_INFO_TABLE_SCRIPT +
CONTAINER_STAT_VIEW_SCRIPT)
conn.execute("""
INSERT INTO container_info (account, container, created_at, id,
put_timestamp, status_changed_at, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?);
""", (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp, put_timestamp,
storage_policy_index))
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def create_policy_stat_table(self, conn, storage_policy_index=0):
"""
Create policy_stat table.
:param conn: DB connection object
:param storage_policy_index: the policy_index the container is
being created with
"""
conn.executescript(POLICY_STAT_TABLE_CREATE)
conn.execute("""
INSERT INTO policy_stat (storage_policy_index)
VALUES (?)
""", (storage_policy_index,))
def get_db_version(self, conn):
if self._db_version == -1:
@ -165,14 +273,19 @@ class ContainerBroker(DatabaseBroker):
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
(name, timestamp, size, content_type, etag, deleted) = \
pickle.loads(entry.decode('base64'))
data = pickle.loads(entry.decode('base64'))
(name, timestamp, size, content_type, etag, deleted) = data[:6]
if len(data) > 6:
storage_policy_index = data[6]
else:
storage_policy_index = 0
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted})
'deleted': deleted,
'storage_policy_index': storage_policy_index})
def empty(self):
"""
@ -182,20 +295,30 @@ class ContainerBroker(DatabaseBroker):
"""
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
try:
row = conn.execute(
'SELECT max(object_count) from policy_stat').fetchone()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return (row[0] == 0)
def delete_object(self, name, timestamp):
def delete_object(self, name, timestamp, storage_policy_index=0):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
deleted=1, storage_policy_index=storage_policy_index)
def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
storage_policy_index=0):
"""
Creates an object in the DB with its metadata.
@ -206,10 +329,12 @@ class ContainerBroker(DatabaseBroker):
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deteleted_at timestamp to timestamp
:param storage_policy_index: the storage policy index for the object
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted}
'deleted': deleted,
'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
@ -231,93 +356,101 @@ class ContainerBroker(DatabaseBroker):
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
(name, timestamp, size, content_type, etag, deleted,
storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def is_deleted(self, timestamp=None):
def is_deleted(self, **kwargs):
"""
Check if the DB is considered to be deleted.
:returns: True if the DB is considered to be deleted, False otherwise
"""
_info, is_deleted = self.get_info_is_deleted(**kwargs)
return is_deleted
def get_info_is_deleted(self, timestamp=None):
"""
Get the is_deleted status and info for the container.
:returns: a tuple, in the form (info, is_deleted) info is a dict as
returned by get_info and is_deleted is a boolean.
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
# leave this db as a tombstone for a consistency window
if timestamp and row['delete_timestamp'] > timestamp:
return False
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return (row['object_count'] in (None, '', 0, '0')) and \
(float(row['delete_timestamp']) > float(row['put_timestamp']))
return {}, True
info = self.get_info()
# leave this db as a tombstone for a consistency window
if timestamp and info['delete_timestamp'] > timestamp:
return info, False
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
is_deleted = (info['object_count'] in (None, '', 0, '0')) and \
(float(info['delete_timestamp']) > float(info['put_timestamp']))
return info, is_deleted
def get_info(self):
"""
Get global data for the container.
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, and x_container_sync_point2.
put_timestamp, delete_timestamp, status_changed_at,
object_count, bytes_used, reported_put_timestamp,
reported_delete_timestamp, reported_object_count,
reported_bytes_used, hash, id, x_container_sync_point1,
x_container_sync_point2, and storage_policy_index.
"""
self._commit_puts_stale_ok()
with self.get() as conn:
data = None
trailing = 'x_container_sync_point1, x_container_sync_point2'
trailing_sync = 'x_container_sync_point1, x_container_sync_point2'
trailing_pol = 'storage_policy_index'
errors = set()
while not data:
try:
data = conn.execute('''
data = conn.execute(('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
delete_timestamp, status_changed_at,
object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s
FROM container_stat
''' % (trailing,)).fetchone()
id, %s, %s
FROM container_stat
''') % (trailing_sync, trailing_pol)).fetchone()
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' in str(err):
trailing = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
err_msg = str(err)
if err_msg in errors:
# only attempt migration once
raise
errors.add(err_msg)
if 'no such column: storage_policy_index' in err_msg:
trailing_pol = '0 AS storage_policy_index'
elif 'no such column: x_container_sync_point' in err_msg:
trailing_sync = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
# populate instance cache
self._storage_policy_index = data['storage_policy_index']
self.account = data['account']
self.container = data['container']
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
orig_isolation_level = conn.isolation_level
try:
# We turn off auto-transactions to ensure the alter table
# commands are part of the transaction.
conn.isolation_level = None
conn.execute('BEGIN')
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1
''')
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1
''')
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.execute('COMMIT')
finally:
conn.isolation_level = orig_isolation_level
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
self._migrate_add_container_sync_points(conn)
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.commit()
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
@ -337,6 +470,79 @@ class ContainerBroker(DatabaseBroker):
SET x_container_sync_point2 = ?
''', (sync_point2,))
def get_policy_stats(self):
with self.get() as conn:
try:
info = conn.execute('''
SELECT storage_policy_index, object_count, bytes_used
FROM policy_stat
''').fetchall()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
info = conn.execute('''
SELECT 0 as storage_policy_index, object_count, bytes_used
FROM container_stat
''').fetchall()
policy_stats = {}
for row in info:
stats = dict(row)
key = stats.pop('storage_policy_index')
policy_stats[key] = stats
return policy_stats
def has_multiple_policies(self):
with self.get() as conn:
try:
curs = conn.execute('''
SELECT count(storage_policy_index)
FROM policy_stat
''').fetchone()
except sqlite3.OperationalError as err:
if 'no such table: policy_stat' not in str(err):
raise
# no policy_stat row
return False
if curs and curs[0] > 1:
return True
# only one policy_stat row
return False
def set_storage_policy_index(self, policy_index, timestamp=None):
"""
Update the container_stat policy_index and status_changed_at.
"""
if timestamp is None:
timestamp = normalize_timestamp(time.time())
def _setit(conn):
conn.execute('''
INSERT OR IGNORE INTO policy_stat (storage_policy_index)
VALUES (?)
''', (policy_index,))
conn.execute('''
UPDATE container_stat
SET storage_policy_index = ?,
status_changed_at = MAX(?, status_changed_at)
WHERE storage_policy_index <> ?
''', (policy_index, timestamp, policy_index))
conn.commit()
with self.get() as conn:
try:
_setit(conn)
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
self._migrate_add_storage_policy(conn)
_setit(conn)
self._storage_policy_index = policy_index
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""
@ -356,7 +562,7 @@ class ContainerBroker(DatabaseBroker):
conn.commit()
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None):
path=None, storage_policy_index=0):
"""
Get a list of objects sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not
@ -409,9 +615,27 @@ class ContainerBroker(DatabaseBroker):
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
orig_tail_query = '''
ORDER BY name LIMIT ?
'''
orig_tail_args = [limit - len(results)]
# storage policy filter
policy_tail_query = '''
AND storage_policy_index = ?
''' + orig_tail_query
policy_tail_args = [storage_policy_index] + orig_tail_args
tail_query, tail_args = \
policy_tail_query, policy_tail_args
try:
curs = conn.execute(query + tail_query,
tuple(query_args + tail_args))
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
tail_query, tail_args = \
orig_tail_query, orig_tail_args
curs = conn.execute(query + tail_query,
tuple(query_args + tail_args))
curs.row_factory = None
if prefix is None:
@ -466,26 +690,34 @@ class ContainerBroker(DatabaseBroker):
'size', 'content_type', 'etag', 'deleted'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
def _really_merge_items(conn):
max_rowid = -1
for rec in item_list:
rec.setdefault('storage_policy_index', 0) # legacy
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at']))
query = 'SELECT 1 FROM object WHERE name = ?'
conn.execute(query, (rec['name'], rec['created_at'],
rec['storage_policy_index']))
query = '''
SELECT 1 FROM object WHERE name = ?
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (rec['name'],)).fetchall():
if not conn.execute(query, (
rec['name'], rec['storage_policy_index'])).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
content_type, etag, deleted, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
rec['content_type'], rec['etag'], rec['deleted'],
rec['storage_policy_index']]))
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
@ -500,3 +732,158 @@ class ContainerBroker(DatabaseBroker):
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
with self.get() as conn:
try:
return _really_merge_items(conn)
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
self._migrate_add_storage_policy(conn)
return _really_merge_items(conn)
def get_reconciler_sync(self):
with self.get() as conn:
try:
return conn.execute('''
SELECT reconciler_sync_point FROM container_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
return -1
def update_reconciler_sync(self, point):
query = '''
UPDATE container_stat
SET reconciler_sync_point = ?
'''
with self.get() as conn:
try:
conn.execute(query, (point,))
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
self._migrate_add_storage_policy(conn)
conn.execute(query, (point,))
conn.commit()
def get_misplaced_since(self, start, count):
"""
Get a list of objects which are in a storage policy different
from the container's storage policy.
:param start: last reconciler sync point
:param count: maximum number of entries to get
:returns: list of dicts with keys: name, created_at, size,
content_type, etag, storage_policy_index
"""
qry = '''
SELECT ROWID, name, created_at, size, content_type, etag,
deleted, storage_policy_index
FROM object
WHERE ROWID > ?
AND storage_policy_index != (
SELECT storage_policy_index FROM container_stat LIMIT 1)
ORDER BY ROWID ASC LIMIT ?
'''
self._commit_puts_stale_ok()
with self.get() as conn:
try:
cur = conn.execute(qry, (start, count))
except sqlite3.OperationalError as err:
if "no such column: storage_policy_index" not in str(err):
raise
return []
return list(dict(row) for row in cur.fetchall())
def _migrate_add_container_sync_points(self, conn):
"""
Add the x_container_sync_point columns to the 'container_stat' table.
"""
conn.executescript('''
BEGIN;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1;
COMMIT;
''')
def _migrate_add_storage_policy(self, conn):
"""
Migrate the container schema to support tracking objects from
multiple storage policies. If the container_stat table has any
pending migrations, they are applied now before copying into
container_info.
* create the 'policy_stat' table.
* copy the current 'object_count' and 'bytes_used' columns to a
row in the 'policy_stat' table.
* add the storage_policy_index column to the 'object' table.
* drop the 'object_insert' and 'object_delete' triggers.
* add the 'object_insert_policy_stat' and
'object_delete_policy_stat' triggers.
* create container_info table for non-policy container info
* insert values from container_stat into container_info
* drop container_stat table
* create container_stat view
"""
# I tried just getting the list of column names in the current
# container_stat table with a pragma table_info, but could never get
# it inside the same transaction as the DDL (non-DML) statements:
# https://docs.python.org/2/library/sqlite3.html
# #controlling-transactions
# So we just apply all pending migrations to container_stat and copy a
# static known list of column names into container_info.
try:
self._migrate_add_container_sync_points(conn)
except sqlite3.OperationalError as e:
if 'duplicate column' in str(e):
conn.execute('ROLLBACK;')
else:
raise
try:
conn.executescript("""
ALTER TABLE container_stat
ADD COLUMN metadata TEXT DEFAULT '';
""")
except sqlite3.OperationalError as e:
if 'duplicate column' not in str(e):
raise
column_names = ', '.join((
'account', 'container', 'created_at', 'put_timestamp',
'delete_timestamp', 'reported_put_timestamp',
'reported_object_count', 'reported_bytes_used', 'hash', 'id',
'status', 'status_changed_at', 'metadata',
'x_container_sync_point1', 'x_container_sync_point2'))
conn.executescript(
'BEGIN;' +
POLICY_STAT_TABLE_CREATE +
'''
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT 0, object_count, bytes_used
FROM container_stat;
ALTER TABLE object
ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
DROP TRIGGER object_insert;
DROP TRIGGER object_delete;
''' +
POLICY_STAT_TRIGGER_SCRIPT +
CONTAINER_INFO_TABLE_SCRIPT +
'''
INSERT INTO container_info (%s)
SELECT %s FROM container_stat;
DROP TABLE IF EXISTS container_stat;
''' % (column_names, column_names) +
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')

View File

@ -38,12 +38,40 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.db_replicator import ReplicatorRpc
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPException, HeaderKeyDict
def gen_resp_headers(info, is_deleted=False):
"""
Convert container info dict to headers.
"""
# backend headers are always included
headers = {
'X-Backend-Timestamp': normalize_timestamp(info.get('created_at', 0)),
'X-Backend-PUT-Timestamp': normalize_timestamp(
info.get('put_timestamp', 0)),
'X-Backend-DELETE-Timestamp': normalize_timestamp(
info.get('delete_timestamp', 0)),
'X-Backend-Status-Changed-At': normalize_timestamp(
info.get('status_changed_at', 0)),
POLICY_INDEX: info.get('storage_policy_index', 0),
}
if not is_deleted:
# base container info on deleted containers is not exposed to client
headers.update({
'X-Container-Object-Count': info.get('object_count', 0),
'X-Container-Bytes-Used': info.get('bytes_used', 0),
'X-Timestamp': normalize_timestamp(info.get('created_at', 0)),
'X-PUT-Timestamp': normalize_timestamp(
info.get('put_timestamp', 0)),
})
return headers
class ContainerController(object):
"""WSGI Controller for the container server."""
@ -102,6 +130,32 @@ class ContainerController(object):
kwargs.setdefault('logger', self.logger)
return ContainerBroker(db_path, **kwargs)
def get_and_validate_policy_index(self, req):
"""
Validate that the index supplied maps to a policy.
:returns: policy index from request, or None if not present
:raises: HTTPBadRequest if the supplied index is bogus
"""
policy_index = req.headers.get(POLICY_INDEX, None)
if policy_index is None:
return None
try:
policy_index = int(policy_index)
except ValueError:
raise HTTPBadRequest(
request=req, content_type="text/plain",
body=("Invalid X-Storage-Policy-Index %r" % policy_index))
policy = POLICIES.get_by_index(policy_index)
if policy is None:
raise HTTPBadRequest(
request=req, content_type="text/plain",
body=("Invalid X-Storage-Policy-Index %r" % policy_index))
return int(policy)
def account_update(self, req, account, container, broker):
"""
Update the account server(s) with latest container info.
@ -199,9 +253,13 @@ class ContainerController(object):
broker = self._get_container_broker(drive, part, account, container)
if account.startswith(self.auto_create_account_prefix) and obj and \
not os.path.exists(broker.db_file):
requested_policy_index = (self.get_and_validate_policy_index(req)
or POLICIES.default.idx)
try:
broker.initialize(normalize_timestamp(
req.headers.get('x-timestamp') or time.time()))
broker.initialize(
normalize_timestamp(
req.headers.get('x-timestamp') or time.time()),
requested_policy_index)
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
@ -225,19 +283,42 @@ class ContainerController(object):
return HTTPNoContent(request=req)
return HTTPNotFound()
def _update_or_create(self, req, broker, timestamp):
def _update_or_create(self, req, broker, timestamp, new_container_policy,
requested_policy_index):
"""
Create new database broker or update timestamps for existing database.
:param req: the swob request object
:param broker: the broker instance for the container
:param timestamp: internalized timestamp
:param new_container_policy: the storage policy index to use
when creating the container
:param requested_policy_index: the storage policy index sent in the
request, may be None
:returns: created, a bool, if database did not previously exist
"""
if not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp)
broker.initialize(timestamp, new_container_policy)
except DatabaseAlreadyExists:
pass
else:
return True # created
created = broker.is_deleted()
recreated = broker.is_deleted()
if recreated:
# only set storage policy on deleted containers
broker.set_storage_policy_index(new_container_policy,
timestamp=timestamp)
elif requested_policy_index is not None:
# validate requested policy with existing container
if requested_policy_index != broker.storage_policy_index:
raise HTTPConflict(request=req)
broker.update_put_timestamp(timestamp)
if broker.is_deleted():
raise HTTPConflict(request=req)
return created
if recreated:
broker.update_status_changed_at(timestamp)
return recreated
@public
@timing_stats()
@ -257,13 +338,14 @@ class ContainerController(object):
return HTTPBadRequest(err)
if self.mount_check and not check_mount(self.root, drive):
return HTTPInsufficientStorage(drive=drive, request=req)
requested_policy_index = self.get_and_validate_policy_index(req)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
broker = self._get_container_broker(drive, part, account, container)
if obj: # put container object
if account.startswith(self.auto_create_account_prefix) and \
not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp)
broker.initialize(timestamp, 0)
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
@ -273,20 +355,28 @@ class ContainerController(object):
req.headers['x-etag'])
return HTTPCreated(request=req)
else: # put container
created = self._update_or_create(req, broker, timestamp)
if requested_policy_index is None:
# use the default index sent by the proxy if available
new_container_policy = req.headers.get(
'X-Backend-Storage-Policy-Default', int(POLICIES.default))
else:
new_container_policy = requested_policy_index
created = self._update_or_create(req, broker,
timestamp,
new_container_policy,
requested_policy_index)
metadata = {}
metadata.update(
(key, (value, timestamp))
for key, value in req.headers.iteritems()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata)
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata)
resp = self.account_update(req, account, container, broker)
if resp:
return resp
@ -307,15 +397,10 @@ class ContainerController(object):
broker = self._get_container_broker(drive, part, account, container,
pending_timeout=0.1,
stale_reads_ok=True)
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
headers = {
'X-Container-Object-Count': info['object_count'],
'X-Container-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
info, is_deleted = broker.get_info_is_deleted()
headers = gen_resp_headers(info, is_deleted=is_deleted)
if is_deleted:
return HTTPNotFound(request=req, headers=headers)
headers.update(
(key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
@ -377,22 +462,17 @@ class ContainerController(object):
broker = self._get_container_broker(drive, part, account, container,
pending_timeout=0.1,
stale_reads_ok=True)
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
info, is_deleted = broker.get_info_is_deleted()
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
if is_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
container_list = broker.list_objects_iter(limit, marker, end_marker,
prefix, delimiter, path)
return self.create_listing(req, out_content_type, info,
return self.create_listing(req, out_content_type, info, resp_headers,
broker.metadata, container_list, container)
def create_listing(self, req, out_content_type, info, metadata,
container_list, container):
resp_headers = {
'X-Container-Object-Count': info['object_count'],
'X-Container-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
def create_listing(self, req, out_content_type, info, resp_headers,
metadata, container_list, container):
for key, (value, timestamp) in metadata.iteritems():
if value and (key.lower() in self.save_headers or
is_sys_or_user_meta('container', key)):

View File

@ -50,6 +50,7 @@ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta
from swift.common.storage_policy import POLICY_INDEX, POLICY, POLICIES
def update_headers(response, headers):
@ -1201,6 +1202,16 @@ class Controller(object):
container, obj, res)
except ValueError:
pass
# if a backend policy index is present in resp headers, translate it
# here with the friendly policy name
if POLICY_INDEX in res.headers and is_success(res.status_int):
policy = POLICIES.get_by_index(res.headers[POLICY_INDEX])
if policy:
res.headers[POLICY] = policy.name
else:
self.app.logger.error(
'Could not translate %s (%r) from %r to policy',
POLICY_INDEX, res.headers[POLICY_INDEX], path)
return res
def is_origin_allowed(self, cors_info, origin):

View File

@ -23,6 +23,7 @@ from swift.common import constraints
from swift.common.http import HTTP_ACCEPTED
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, clear_info_cache
from swift.common.storage_policy import POLICIES, POLICY, POLICY_INDEX
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPNotFound
@ -34,7 +35,7 @@ class ContainerController(Controller):
# Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to',
'x-versions-location']
'x-versions-location', POLICY_INDEX.lower()]
def __init__(self, app, account_name, container_name, **kwargs):
Controller.__init__(self, app)
@ -47,6 +48,27 @@ class ContainerController(Controller):
'x-remove-%s-write' % st,
'x-remove-versions-location']
def _convert_policy_to_index(self, req):
"""
Helper method to convert a policy name (from a request from a client)
to a policy index (for a request to a backend).
:param req: incoming request
"""
policy_name = req.headers.get(POLICY)
if not policy_name:
return
policy = POLICIES.get_by_name(policy_name)
if not policy:
raise HTTPBadRequest(request=req,
content_type="text/plain",
body=("Invalid %s '%s'"
% (POLICY, policy_name)))
if policy.is_deprecated:
body = 'Storage Policy %r is deprecated' % (policy.name)
raise HTTPBadRequest(request=req, body=body)
return int(policy)
def clean_acls(self, req):
if 'swift.clean_acl' in req.environ:
for header in ('x-container-read', 'x-container-write'):
@ -101,6 +123,7 @@ class ContainerController(Controller):
self.clean_acls(req) or check_metadata(req, 'container')
if error_response:
return error_response
policy_index = self._convert_policy_to_index(req)
if not req.environ.get('swift_owner'):
for key in self.app.swift_owner_headers:
req.headers.pop(key, None)
@ -128,7 +151,8 @@ class ContainerController(Controller):
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = self._backend_requests(req, len(containers),
account_partition, accounts)
account_partition, accounts,
policy_index)
clear_info_cache(self.app, req.environ,
self.account_name, self.container_name)
resp = self.make_requests(
@ -183,9 +207,14 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
return resp
def _backend_requests(self, req, n_outgoing,
account_partition, accounts):
def _backend_requests(self, req, n_outgoing, account_partition, accounts,
policy_index=None):
additional = {'X-Timestamp': normalize_timestamp(time.time())}
if policy_index is None:
additional['X-Backend-Storage-Policy-Default'] = \
int(POLICIES.default)
else:
additional[POLICY_INDEX] = str(policy_index)
headers = [self.generate_request_headers(req, transfer=True,
additional=additional)
for _junk in range(n_outgoing)]

View File

@ -22,6 +22,7 @@ import errno
import sys
from contextlib import contextmanager, closing
from collections import defaultdict, Iterable
from numbers import Number
from tempfile import NamedTemporaryFile
import time
from eventlet.green import socket
@ -34,7 +35,6 @@ from hashlib import md5
from eventlet import sleep, Timeout
import logging.handlers
from httplib import HTTPException
from numbers import Number
from swift.common import storage_policy
import functools
import cPickle as pickle
@ -327,6 +327,22 @@ def temptree(files, contents=''):
rmtree(tempdir)
def with_tempdir(f):
"""
Decorator to give a single test a tempdir as argument to test method.
"""
@functools.wraps(f)
def wrapped(*args, **kwargs):
tempdir = mkdtemp()
args = list(args)
args.append(tempdir)
try:
return f(*args, **kwargs)
finally:
rmtree(tempdir)
return wrapped
class NullLoggingHandler(logging.Handler):
def emit(self, record):

View File

@ -28,6 +28,19 @@ from time import sleep, time
from swift.common import ring, utils
class TestRingBase(unittest.TestCase):
def setUp(self):
self._orig_hash_suffix = utils.HASH_PATH_SUFFIX
self._orig_hash_prefix = utils.HASH_PATH_PREFIX
utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = ''
def tearDown(self):
utils.HASH_PATH_SUFFIX = self._orig_hash_suffix
utils.HASH_PATH_PREFIX = self._orig_hash_prefix
class TestRingData(unittest.TestCase):
def setUp(self):
@ -109,11 +122,10 @@ class TestRingData(unittest.TestCase):
'0644')
class TestRing(unittest.TestCase):
class TestRing(TestRingBase):
def setUp(self):
utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = ''
super(TestRing, self).setUp()
self.testdir = mkdtemp()
self.testgz = os.path.join(self.testdir, 'whatever.ring.gz')
self.intended_replica2part2dev_id = [
@ -147,6 +159,7 @@ class TestRing(unittest.TestCase):
reload_time=self.intended_reload_time, ring_name='whatever')
def tearDown(self):
super(TestRing, self).tearDown()
rmtree(self.testdir, ignore_errors=1)
def test_creation(self):

View File

@ -23,6 +23,9 @@ from uuid import uuid4
import simplejson
import sqlite3
import itertools
import time
import random
from mock import patch, MagicMock
from eventlet.timeout import Timeout
@ -31,7 +34,7 @@ import swift.common.db
from swift.common.db import chexor, dict_factory, get_db_connection, \
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
GreenDBConnection
from swift.common.utils import normalize_timestamp, mkdirs
from swift.common.utils import normalize_timestamp, mkdirs, json
from swift.common.exceptions import LockTimeout
@ -82,6 +85,30 @@ class TestChexor(unittest.TestCase):
'd41d8cd98f00b204e9800998ecf8427e', None,
normalize_timestamp(1))
def test_chexor(self):
ts = (normalize_timestamp(ts) for ts in
itertools.count(int(time.time())))
objects = [
('frank', ts.next()),
('bob', ts.next()),
('tom', ts.next()),
('frank', ts.next()),
('tom', ts.next()),
('bob', ts.next()),
]
hash_ = '0'
random.shuffle(objects)
for obj in objects:
hash_ = chexor(hash_, *obj)
other_hash = '0'
random.shuffle(objects)
for obj in objects:
other_hash = chexor(other_hash, *obj)
self.assertEqual(hash_, other_hash)
class TestGreenDBConnection(unittest.TestCase):
@ -147,6 +174,28 @@ class TestGetDBConnection(unittest.TestCase):
mock_db_cmd.call_count))
class ExampleBroker(DatabaseBroker):
db_type = 'test'
db_contains_type = 'test'
def _initialize(self, conn, timestamp, **kwargs):
conn.executescript('''
CREATE TABLE test_stat (
name TEXT,
timestamp TEXT DEFAULT 0,
status_changed_at TEXT DEFAULT 0
);
CREATE TABLE test (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT
);
''')
conn.execute("""
INSERT INTO test_stat (name, timestamp) VALUES (?, ?)
""", (self.account, timestamp))
class TestDatabaseBroker(unittest.TestCase):
def setUp(self):
@ -226,7 +275,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker.initialize, normalize_timestamp('1'))
def test_delete_db(self):
def init_stub(conn, put_timestamp):
def init_stub(conn, put_timestamp, **kwargs):
conn.execute('CREATE TABLE test (one TEXT)')
conn.execute('CREATE TABLE test_stat (id TEXT)')
conn.execute('INSERT INTO test_stat (id) VALUES (?)',
@ -383,7 +432,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker.db_contains_type = 'test'
uuid1 = str(uuid4())
def _initialize(conn, timestamp):
def _initialize(conn, timestamp, **kwargs):
conn.execute('CREATE TABLE test (one TEXT)')
conn.execute('CREATE TABLE test_stat (id TEXT)')
conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,))
@ -433,7 +482,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker.db_type = 'test'
broker.db_contains_type = 'test'
def _initialize(conn, timestamp):
def _initialize(conn, timestamp, **kwargs):
conn.execute('CREATE TABLE test (one TEXT)')
conn.execute('INSERT INTO test (one) VALUES ("1")')
conn.execute('INSERT INTO test (one) VALUES ("2")')
@ -456,7 +505,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker.db_contains_type = 'test'
uuid1 = str(uuid4())
def _initialize(conn, timestamp):
def _initialize(conn, timestamp, **kwargs):
conn.execute('CREATE TABLE test (one TEXT)')
conn.execute('CREATE TABLE test_stat (id TEXT)')
conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,))
@ -531,7 +580,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker_metadata = metadata and simplejson.dumps(
{'Test': ('Value', normalize_timestamp(1))}) or ''
def _initialize(conn, put_timestamp):
def _initialize(conn, put_timestamp, **kwargs):
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript('''
@ -562,6 +611,7 @@ class TestDatabaseBroker(unittest.TestCase):
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
status_changed_at TEXT DEFAULT '0',
test_count INTEGER,
hash TEXT default '00000000000000000000000000000000',
id TEXT
@ -571,8 +621,10 @@ class TestDatabaseBroker(unittest.TestCase):
''' % (metadata and ", metadata TEXT DEFAULT ''" or ""))
conn.execute('''
UPDATE test_stat
SET account = ?, created_at = ?, id = ?, put_timestamp = ?
''', (broker.account, broker_creation, broker_uuid, put_timestamp))
SET account = ?, created_at = ?, id = ?, put_timestamp = ?,
status_changed_at = ?
''', (broker.account, broker_creation, broker_uuid, put_timestamp,
put_timestamp))
if metadata:
conn.execute('UPDATE test_stat SET metadata = ?',
(broker_metadata,))
@ -582,11 +634,11 @@ class TestDatabaseBroker(unittest.TestCase):
broker.initialize(put_timestamp)
info = broker.get_replication_info()
self.assertEquals(info, {
'count': 0,
'account': broker.account, 'count': 0,
'hash': '00000000000000000000000000000000',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid,
'metadata': broker_metadata})
'delete_timestamp': '0', 'status_changed_at': put_timestamp,
'max_row': -1, 'id': broker_uuid, 'metadata': broker_metadata})
insert_timestamp = normalize_timestamp(3)
with broker.get() as conn:
conn.execute('''
@ -595,21 +647,21 @@ class TestDatabaseBroker(unittest.TestCase):
conn.commit()
info = broker.get_replication_info()
self.assertEquals(info, {
'count': 1,
'account': broker.account, 'count': 1,
'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
'metadata': broker_metadata})
'delete_timestamp': '0', 'status_changed_at': put_timestamp,
'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata})
with broker.get() as conn:
conn.execute('DELETE FROM test')
conn.commit()
info = broker.get_replication_info()
self.assertEquals(info, {
'count': 0,
'account': broker.account, 'count': 0,
'hash': '00000000000000000000000000000000',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
'metadata': broker_metadata})
'delete_timestamp': '0', 'status_changed_at': put_timestamp,
'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata})
return broker
def test_metadata(self):
@ -679,6 +731,74 @@ class TestDatabaseBroker(unittest.TestCase):
[first_value, first_timestamp])
self.assert_('Second' not in broker.metadata)
def test_get_max_row(self):
broker = ExampleBroker(':memory:')
broker.initialize(0)
self.assertEquals(-1, broker.get_max_row())
with broker.get() as conn:
conn.execute('''
INSERT INTO test (name) VALUES (?)
''', ('test_name',))
conn.commit()
self.assertEquals(1, broker.get_max_row())
with broker.get() as conn:
conn.executescript('''
DELETE FROM test;
''')
conn.commit()
self.assertEquals(1, broker.get_max_row())
with broker.get() as conn:
conn.execute('''
INSERT INTO test (name) VALUES (?)
''', ('test_name',))
conn.commit()
self.assertEquals(2, broker.get_max_row())
def test_get_info(self):
broker = ExampleBroker(':memory:', account='test')
broker.initialize(normalize_timestamp(1))
info = broker.get_info()
expected = {
'name': 'test',
'timestamp': '0000000001.00000',
'status_changed_at': '0',
}
self.assertEqual(info, expected)
def test_get_raw_metadata(self):
broker = ExampleBroker(':memory:', account='test')
broker.initialize(normalize_timestamp(0))
self.assertEqual(broker.metadata, {})
self.assertEqual(broker.get_raw_metadata(), '')
metadata = {
'test': ['value', normalize_timestamp(1)]
}
broker.update_metadata(metadata)
self.assertEqual(broker.metadata, metadata)
self.assertEqual(broker.get_raw_metadata(),
json.dumps(metadata))
def test_status_changed_at(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
broker = ExampleBroker(':memory:', account='test')
broker.initialize(ts.next())
self.assertEquals(broker.get_info()['status_changed_at'], '0')
status_changed_at = ts.next()
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
# save the old and get a new status_changed_at
old_status_changed_at, status_changed_at = \
status_changed_at, ts.next()
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
# status changed at won't go backwards...
broker.update_status_changed_at(old_status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
if __name__ == '__main__':
unittest.main()

View File

@ -21,8 +21,10 @@ import random
from tempfile import mkdtemp
from shutil import rmtree
from swift.common.utils import normalize_timestamp
from swift.container import auditor
from test.unit import FakeLogger
from test.unit import debug_logger, with_tempdir
from test.unit.container import test_backend
class FakeContainerBroker(object):
@ -45,7 +47,7 @@ class TestAuditor(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_container_auditor')
self.logger = FakeLogger()
self.logger = debug_logger()
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
fnames = ['true1.db', 'true2.db', 'true3.db',
@ -78,7 +80,7 @@ class TestAuditor(unittest.TestCase):
return time.time()
conf = {}
test_auditor = auditor.ContainerAuditor(conf)
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
with mock.patch('swift.container.auditor.time', FakeTime()):
def fake_audit_location_generator(*args, **kwargs):
@ -94,7 +96,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
def test_run_once(self):
conf = {}
test_auditor = auditor.ContainerAuditor(conf)
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
def fake_audit_location_generator(*args, **kwargs):
files = os.listdir(self.testdir)
@ -109,7 +111,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
def test_container_auditor(self):
conf = {}
test_auditor = auditor.ContainerAuditor(conf)
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
files = os.listdir(self.testdir)
for f in files:
path = os.path.join(self.testdir, f)
@ -117,5 +119,45 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(test_auditor.container_failures, 2)
self.assertEquals(test_auditor.container_passes, 3)
class TestAuditorMigrations(unittest.TestCase):
@with_tempdir
def test_db_migration(self, tempdir):
db_path = os.path.join(tempdir, 'sda', 'containers', '0', '0', '0',
'test.db')
with test_backend.TestContainerBrokerBeforeSPI.old_broker() as \
old_ContainerBroker:
broker = old_ContainerBroker(db_path, account='a', container='c')
broker.initialize(normalize_timestamp(0), -1)
with broker.get() as conn:
try:
conn.execute('SELECT storage_policy_index '
'FROM container_stat')
except Exception as err:
self.assert_('no such column: storage_policy_index' in
str(err))
else:
self.fail('TestContainerBrokerBeforeSPI broker class '
'was already migrated')
conf = {'devices': tempdir, 'mount_check': False}
test_auditor = auditor.ContainerAuditor(conf, logger=debug_logger())
test_auditor.run_once()
broker = auditor.ContainerBroker(db_path, account='a', container='c')
info = broker.get_info()
expected = {
'account': 'a',
'container': 'c',
'object_count': 0,
'bytes_used': 0,
'storage_policy_index': 0,
}
for k, v in expected.items():
self.assertEqual(info[k], v)
if __name__ == '__main__':
unittest.main()

View File

@ -15,13 +15,25 @@
""" Tests for swift.container.backend """
import os
import hashlib
import unittest
from time import sleep, time
from uuid import uuid4
import itertools
import random
from collections import defaultdict
from contextlib import contextmanager
import sqlite3
import pickle
from swift.container.backend import ContainerBroker
from swift.common.utils import normalize_timestamp
from swift.common.storage_policy import POLICIES
import mock
from test.unit import patch_policies, with_tempdir
class TestContainerBroker(unittest.TestCase):
@ -31,18 +43,41 @@ class TestContainerBroker(unittest.TestCase):
# Test ContainerBroker.__init__
broker = ContainerBroker(':memory:', account='a', container='c')
self.assertEqual(broker.db_file, ':memory:')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
with broker.get() as conn:
curs = conn.cursor()
curs.execute('SELECT 1')
self.assertEqual(curs.fetchall()[0][0], 1)
@patch_policies
def test_storage_policy_property(self):
ts = itertools.count(1)
for policy in POLICIES:
broker = ContainerBroker(':memory:', account='a',
container='policy_%s' % policy.name)
broker.initialize(normalize_timestamp(ts.next()), policy.idx)
with broker.get() as conn:
try:
conn.execute('''SELECT storage_policy_index
FROM container_stat''')
except Exception:
is_migrated = False
else:
is_migrated = True
if not is_migrated:
# pre spi tests don't set policy on initialize
broker.set_storage_policy_index(policy.idx)
self.assertEqual(policy.idx, broker.storage_policy_index)
# make sure it's cached
with mock.patch.object(broker, 'get'):
self.assertEqual(policy.idx, broker.storage_policy_index)
def test_exception(self):
# Test ContainerBroker throwing a conn away after
# unhandled exception
first_conn = None
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
with broker.get() as conn:
first_conn = conn
try:
@ -56,7 +91,7 @@ class TestContainerBroker(unittest.TestCase):
def test_empty(self):
# Test ContainerBroker.empty
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
self.assert_(broker.empty())
broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain',
'd41d8cd98f00b204e9800998ecf8427e')
@ -68,7 +103,7 @@ class TestContainerBroker(unittest.TestCase):
def test_reclaim(self):
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain',
'd41d8cd98f00b204e9800998ecf8427e')
with broker.get() as conn:
@ -125,10 +160,56 @@ class TestContainerBroker(unittest.TestCase):
broker.reclaim(normalize_timestamp(time()), time())
broker.delete_db(normalize_timestamp(time()))
def test_get_info_is_deleted(self):
start = int(time())
ts = (normalize_timestamp(t) for t in itertools.count(start))
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
# create it
broker.initialize(ts.next(), POLICIES.default.idx)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(is_deleted, False) # sanity
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
self.assert_(float(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], '0')
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
normalize_timestamp(start))
# delete it
delete_timestamp = ts.next()
broker.delete_db(delete_timestamp)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, True) # sanity
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
self.assert_(float(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], delete_timestamp)
# bring back to life
broker.put_object('obj', ts.next(), 0, 'text/plain', 'etag',
storage_policy_index=broker.storage_policy_index)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, False) # sanity
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
self.assert_(float(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], delete_timestamp)
def test_delete_object(self):
# Test ContainerBroker.delete_object
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('o', normalize_timestamp(time()), 0, 'text/plain',
'd41d8cd98f00b204e9800998ecf8427e')
with broker.get() as conn:
@ -151,7 +232,7 @@ class TestContainerBroker(unittest.TestCase):
def test_put_object(self):
# Test ContainerBroker.put_object
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
# Create initial object
timestamp = normalize_timestamp(time())
@ -347,16 +428,150 @@ class TestContainerBroker(unittest.TestCase):
self.assertEquals(conn.execute(
"SELECT deleted FROM object").fetchone()[0], 0)
@patch_policies
def test_put_misplaced_object_does_not_effect_container_stats(self):
policy = random.choice(list(POLICIES))
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(ts.next(), policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
broker.get_info()['storage_policy_index']
policy = filter(lambda p: p.idx == real_storage_policy_index,
POLICIES)[0]
broker.put_object('correct_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
info = broker.get_info()
self.assertEqual(1, info['object_count'])
self.assertEqual(123, info['bytes_used'])
other_policy = random.choice([p for p in POLICIES
if p is not policy])
broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
self.assertEqual(1, info['object_count'])
self.assertEqual(123, info['bytes_used'])
@patch_policies
def test_has_multiple_policies(self):
policy = random.choice(list(POLICIES))
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(ts.next(), policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
broker.get_info()['storage_policy_index']
policy = filter(lambda p: p.idx == real_storage_policy_index,
POLICIES)[0]
broker.put_object('correct_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
self.assertFalse(broker.has_multiple_policies())
other_policy = [p for p in POLICIES if p is not policy][0]
broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
self.assert_(broker.has_multiple_policies())
@patch_policies
def test_get_policy_info(self):
policy = random.choice(list(POLICIES))
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(ts.next(), policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
broker.get_info()['storage_policy_index']
policy = filter(lambda p: p.idx == real_storage_policy_index,
POLICIES)[0]
policy_stats = broker.get_policy_stats()
expected = {policy.idx: {'bytes_used': 0, 'object_count': 0}}
self.assertEqual(policy_stats, expected)
# add an object
broker.put_object('correct_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
policy_stats = broker.get_policy_stats()
expected = {policy.idx: {'bytes_used': 123, 'object_count': 1}}
self.assertEqual(policy_stats, expected)
# add a misplaced object
other_policy = random.choice([p for p in POLICIES
if p is not policy])
broker.put_object('wrong_o', ts.next(), 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
policy_stats = broker.get_policy_stats()
expected = {
policy.idx: {'bytes_used': 123, 'object_count': 1},
other_policy.idx: {'bytes_used': 123, 'object_count': 1},
}
self.assertEqual(policy_stats, expected)
def test_policy_stat_tracking(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(ts.next(), POLICIES.default.idx)
stats = defaultdict(dict)
iters = 100
for i in range(iters):
policy_index = random.randint(0, iters * 0.1)
name = 'object-%s' % random.randint(0, iters * 0.1)
size = random.randint(0, iters)
broker.put_object(name, ts.next(), size, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy_index)
# track the size of the latest timestamp put for each object
# in each storage policy
stats[policy_index][name] = size
policy_stats = broker.get_policy_stats()
# if no objects were added for the default policy we still
# expect an entry for the default policy in the returned info
# because the database was initialized with that storage policy
# - but it must be empty.
if POLICIES.default.idx not in stats:
default_stats = policy_stats.pop(POLICIES.default.idx)
expected = {'object_count': 0, 'bytes_used': 0}
self.assertEqual(default_stats, expected)
self.assertEqual(len(policy_stats), len(stats))
for policy_index, stat in policy_stats.items():
self.assertEqual(stat['object_count'], len(stats[policy_index]))
self.assertEqual(stat['bytes_used'],
sum(stats[policy_index].values()))
def test_get_info(self):
# Test ContainerBroker.get_info
broker = ContainerBroker(':memory:', account='test1',
container='test2')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
info = broker.get_info()
self.assertEquals(info['account'], 'test1')
self.assertEquals(info['container'], 'test2')
self.assertEquals(info['hash'], '00000000000000000000000000000000')
self.assertEqual(info['put_timestamp'], normalize_timestamp(1))
self.assertEqual(info['delete_timestamp'], '0')
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
normalize_timestamp(1))
info = broker.get_info()
self.assertEquals(info['object_count'], 0)
@ -401,7 +616,7 @@ class TestContainerBroker(unittest.TestCase):
def test_set_x_syncs(self):
broker = ContainerBroker(':memory:', account='test1',
container='test2')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
info = broker.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
@ -415,7 +630,7 @@ class TestContainerBroker(unittest.TestCase):
def test_get_report_info(self):
broker = ContainerBroker(':memory:', account='test1',
container='test2')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
info = broker.get_info()
self.assertEquals(info['account'], 'test1')
@ -482,7 +697,7 @@ class TestContainerBroker(unittest.TestCase):
def test_list_objects_iter(self):
# Test ContainerBroker.list_objects_iter
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
for obj1 in xrange(4):
for obj2 in xrange(125):
broker.put_object('%d/%04d' % (obj1, obj2),
@ -601,7 +816,7 @@ class TestContainerBroker(unittest.TestCase):
# Test ContainerBroker.list_objects_iter using a
# delimiter that is not a slash
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
for obj1 in xrange(4):
for obj2 in xrange(125):
broker.put_object('%d:%04d' % (obj1, obj2),
@ -718,7 +933,7 @@ class TestContainerBroker(unittest.TestCase):
def test_list_objects_iter_prefix_delim(self):
# Test ContainerBroker.list_objects_iter
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object(
'/pets/dogs/1', normalize_timestamp(0), 0,
@ -755,7 +970,7 @@ class TestContainerBroker(unittest.TestCase):
# Test ContainerBroker.list_objects_iter for a
# container that has an odd file with a trailing delimiter
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('a', normalize_timestamp(time()), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker.put_object('a/', normalize_timestamp(time()), 0,
@ -835,7 +1050,7 @@ class TestContainerBroker(unittest.TestCase):
# Test ContainerBroker.list_objects_iter for a
# container that has an odd file with a trailing delimiter
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('a', normalize_timestamp(time()), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker.put_object('a:', normalize_timestamp(time()), 0,
@ -913,7 +1128,7 @@ class TestContainerBroker(unittest.TestCase):
def test_chexor(self):
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('a', normalize_timestamp(1), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker.put_object('b', normalize_timestamp(2), 0,
@ -933,7 +1148,7 @@ class TestContainerBroker(unittest.TestCase):
def test_newid(self):
# test DatabaseBroker.newid
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
id = broker.get_info()['id']
broker.newid('someid')
self.assertNotEquals(id, broker.get_info()['id'])
@ -941,7 +1156,7 @@ class TestContainerBroker(unittest.TestCase):
def test_get_items_since(self):
# test DatabaseBroker.get_items_since
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
broker.put_object('a', normalize_timestamp(1), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
max_row = broker.get_replication_info()['max_row']
@ -954,9 +1169,9 @@ class TestContainerBroker(unittest.TestCase):
def test_sync_merging(self):
# exercise the DatabaseBroker sync functions a bit
broker1 = ContainerBroker(':memory:', account='a', container='c')
broker1.initialize(normalize_timestamp('1'))
broker1.initialize(normalize_timestamp('1'), 0)
broker2 = ContainerBroker(':memory:', account='a', container='c')
broker2.initialize(normalize_timestamp('1'))
broker2.initialize(normalize_timestamp('1'), 0)
self.assertEquals(broker2.get_sync('12345'), -1)
broker1.merge_syncs([{'sync_point': 3, 'remote_id': '12345'}])
broker2.merge_syncs(broker1.get_syncs())
@ -964,9 +1179,9 @@ class TestContainerBroker(unittest.TestCase):
def test_merge_items(self):
broker1 = ContainerBroker(':memory:', account='a', container='c')
broker1.initialize(normalize_timestamp('1'))
broker1.initialize(normalize_timestamp('1'), 0)
broker2 = ContainerBroker(':memory:', account='a', container='c')
broker2.initialize(normalize_timestamp('1'))
broker2.initialize(normalize_timestamp('1'), 0)
broker1.put_object('a', normalize_timestamp(1), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker1.put_object('b', normalize_timestamp(2), 0,
@ -989,10 +1204,10 @@ class TestContainerBroker(unittest.TestCase):
def test_merge_items_overwrite(self):
# test DatabaseBroker.merge_items
broker1 = ContainerBroker(':memory:', account='a', container='c')
broker1.initialize(normalize_timestamp('1'))
broker1.initialize(normalize_timestamp('1'), 0)
id = broker1.get_info()['id']
broker2 = ContainerBroker(':memory:', account='a', container='c')
broker2.initialize(normalize_timestamp('1'))
broker2.initialize(normalize_timestamp('1'), 0)
broker1.put_object('a', normalize_timestamp(2), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker1.put_object('b', normalize_timestamp(3), 0,
@ -1014,10 +1229,10 @@ class TestContainerBroker(unittest.TestCase):
def test_merge_items_post_overwrite_out_of_order(self):
# test DatabaseBroker.merge_items
broker1 = ContainerBroker(':memory:', account='a', container='c')
broker1.initialize(normalize_timestamp('1'))
broker1.initialize(normalize_timestamp('1'), 0)
id = broker1.get_info()['id']
broker2 = ContainerBroker(':memory:', account='a', container='c')
broker2.initialize(normalize_timestamp('1'))
broker2.initialize(normalize_timestamp('1'), 0)
broker1.put_object('a', normalize_timestamp(2), 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker1.put_object('b', normalize_timestamp(3), 0,
@ -1056,8 +1271,165 @@ class TestContainerBroker(unittest.TestCase):
self.assertEquals(rec['created_at'], normalize_timestamp(5))
self.assertEquals(rec['content_type'], 'text/plain')
def test_set_storage_policy_index(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
timestamp = ts.next()
broker.initialize(timestamp, 0)
def premetadata_create_container_stat_table(self, conn, put_timestamp=None):
info = broker.get_info()
self.assertEqual(0, info['storage_policy_index']) # sanity check
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(timestamp, info['status_changed_at'])
expected = {0: {'object_count': 0, 'bytes_used': 0}}
self.assertEqual(expected, broker.get_policy_stats())
timestamp = ts.next()
broker.set_storage_policy_index(111, timestamp)
self.assertEqual(broker.storage_policy_index, 111)
info = broker.get_info()
self.assertEqual(111, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(timestamp, info['status_changed_at'])
expected[111] = {'object_count': 0, 'bytes_used': 0}
self.assertEqual(expected, broker.get_policy_stats())
timestamp = ts.next()
broker.set_storage_policy_index(222, timestamp)
self.assertEqual(broker.storage_policy_index, 222)
info = broker.get_info()
self.assertEqual(222, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(timestamp, info['status_changed_at'])
expected[222] = {'object_count': 0, 'bytes_used': 0}
self.assertEqual(expected, broker.get_policy_stats())
old_timestamp, timestamp = timestamp, ts.next()
broker.set_storage_policy_index(222, timestamp) # it's idempotent
info = broker.get_info()
self.assertEqual(222, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(old_timestamp, info['status_changed_at'])
self.assertEqual(expected, broker.get_policy_stats())
def test_set_storage_policy_index_empty(self):
# Putting an object may trigger migrations, so test with a
# never-had-an-object container to make sure we handle it
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(normalize_timestamp('1'), 0)
info = broker.get_info()
self.assertEqual(0, info['storage_policy_index'])
broker.set_storage_policy_index(2)
info = broker.get_info()
self.assertEqual(2, info['storage_policy_index'])
def test_reconciler_sync(self):
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(normalize_timestamp('1'), 0)
self.assertEquals(-1, broker.get_reconciler_sync())
broker.update_reconciler_sync(10)
self.assertEquals(10, broker.get_reconciler_sync())
@with_tempdir
def test_legacy_pending_files(self, tempdir):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time())))
db_path = os.path.join(tempdir, 'container.db')
# first init an acct DB without the policy_stat table present
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(ts.next(), 1)
# manually make some pending entries lacking storage_policy_index
with open(broker.pending_file, 'a+b') as fp:
for i in range(10):
name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, ts.next(), 0, 'c', 'e', 0)
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
protocol=2).encode('base64'))
fp.flush()
# use put_object to append some more entries with different
# values for storage_policy_index
for i in range(10, 30):
name = 'o%s' % i
if i < 20:
size = 1
storage_policy_index = 0
else:
size = 2
storage_policy_index = 1
broker.put_object(name, ts.next(), size, 'c', 'e', 0,
storage_policy_index=storage_policy_index)
broker._commit_puts_stale_ok()
# 10 objects with 0 bytes each in the legacy pending entries
# 10 objects with 1 bytes each in storage policy 0
# 10 objects with 2 bytes each in storage policy 1
expected = {
0: {'object_count': 20, 'bytes_used': 10},
1: {'object_count': 10, 'bytes_used': 20},
}
self.assertEqual(broker.get_policy_stats(), expected)
class ContainerBrokerMigrationMixin(object):
"""
Mixin for running ContainerBroker against databases created with
older schemas.
"""
def setUp(self):
self._imported_create_object_table = \
ContainerBroker.create_object_table
ContainerBroker.create_object_table = \
prespi_create_object_table
self._imported_create_container_info_table = \
ContainerBroker.create_container_info_table
ContainerBroker.create_container_info_table = \
premetadata_create_container_info_table
self._imported_create_policy_stat_table = \
ContainerBroker.create_policy_stat_table
ContainerBroker.create_policy_stat_table = lambda *args: None
@classmethod
@contextmanager
def old_broker(cls):
cls.runTest = lambda *a, **k: None
case = cls()
case.setUp()
try:
yield ContainerBroker
finally:
case.tearDown()
def tearDown(self):
ContainerBroker.create_container_info_table = \
self._imported_create_container_info_table
ContainerBroker.create_object_table = \
self._imported_create_object_table
ContainerBroker.create_policy_stat_table = \
self._imported_create_policy_stat_table
def premetadata_create_container_info_table(self, conn, put_timestamp,
_spi=None):
"""
Copied from ContainerBroker before the metadata column was
added; used for testing with TestContainerBrokerBeforeMetadata.
@ -1099,19 +1471,17 @@ def premetadata_create_container_stat_table(self, conn, put_timestamp=None):
str(uuid4()), put_timestamp))
class TestContainerBrokerBeforeMetadata(TestContainerBroker):
class TestContainerBrokerBeforeMetadata(ContainerBrokerMigrationMixin,
TestContainerBroker):
"""
Tests for ContainerBroker against databases created before
the metadata column was added.
"""
def setUp(self):
self._imported_create_container_stat_table = \
ContainerBroker.create_container_stat_table
ContainerBroker.create_container_stat_table = \
premetadata_create_container_stat_table
super(TestContainerBrokerBeforeMetadata, self).setUp()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
exc = None
with broker.get() as conn:
try:
@ -1121,15 +1491,15 @@ class TestContainerBrokerBeforeMetadata(TestContainerBroker):
self.assert_('no such column: metadata' in str(exc))
def tearDown(self):
ContainerBroker.create_container_stat_table = \
self._imported_create_container_stat_table
super(TestContainerBrokerBeforeMetadata, self).tearDown()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
with broker.get() as conn:
conn.execute('SELECT metadata FROM container_stat')
def prexsync_create_container_stat_table(self, conn, put_timestamp=None):
def prexsync_create_container_info_table(self, conn, put_timestamp,
_spi=None):
"""
Copied from ContainerBroker before the
x_container_sync_point[12] columns were added; used for testing with
@ -1173,19 +1543,19 @@ def prexsync_create_container_stat_table(self, conn, put_timestamp=None):
str(uuid4()), put_timestamp))
class TestContainerBrokerBeforeXSync(TestContainerBroker):
class TestContainerBrokerBeforeXSync(ContainerBrokerMigrationMixin,
TestContainerBroker):
"""
Tests for ContainerBroker against databases created
before the x_container_sync_point[12] columns were added.
"""
def setUp(self):
self._imported_create_container_stat_table = \
ContainerBroker.create_container_stat_table
ContainerBroker.create_container_stat_table = \
prexsync_create_container_stat_table
super(TestContainerBrokerBeforeXSync, self).setUp()
ContainerBroker.create_container_info_table = \
prexsync_create_container_info_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
exc = None
with broker.get() as conn:
try:
@ -1196,9 +1566,300 @@ class TestContainerBrokerBeforeXSync(TestContainerBroker):
self.assert_('no such column: x_container_sync_point1' in str(exc))
def tearDown(self):
ContainerBroker.create_container_stat_table = \
self._imported_create_container_stat_table
super(TestContainerBrokerBeforeXSync, self).tearDown()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
broker.initialize(normalize_timestamp('1'), 0)
with broker.get() as conn:
conn.execute('SELECT x_container_sync_point1 FROM container_stat')
def prespi_create_object_table(self, conn, *args, **kwargs):
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
UPDATE container_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size,
hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER object_delete AFTER DELETE ON object
BEGIN
UPDATE container_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size,
hash = chexor(hash, old.name, old.created_at);
END;
""")
def prespi_create_container_info_table(self, conn, put_timestamp,
_spi=None):
"""
Copied from ContainerBroker before the
storage_policy_index column was added; used for testing with
TestContainerBrokerBeforeSPI.
Create the container_stat table which is specific to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time()),
str(uuid4()), put_timestamp))
class TestContainerBrokerBeforeSPI(ContainerBrokerMigrationMixin,
TestContainerBroker):
"""
Tests for ContainerBroker against databases created
before the storage_policy_index column was added.
"""
def setUp(self):
super(TestContainerBrokerBeforeSPI, self).setUp()
ContainerBroker.create_container_info_table = \
prespi_create_container_info_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'), 0)
exc = None
with broker.get() as conn:
try:
conn.execute('''SELECT storage_policy_index
FROM container_stat''')
except BaseException as err:
exc = err
self.assert_('no such column: storage_policy_index' in str(exc))
def tearDown(self):
super(TestContainerBrokerBeforeSPI, self).tearDown()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'), 0)
with broker.get() as conn:
conn.execute('SELECT storage_policy_index FROM container_stat')
@patch_policies
@with_tempdir
def test_object_table_migration(self, tempdir):
db_path = os.path.join(tempdir, 'container.db')
# initialize an un-migrated database
broker = ContainerBroker(db_path, account='a', container='c')
put_timestamp = normalize_timestamp(int(time()))
broker.initialize(put_timestamp, None)
with broker.get() as conn:
try:
conn.execute('''
SELECT storage_policy_index FROM object
''').fetchone()[0]
except sqlite3.OperationalError as err:
# confirm that the table doesn't have this column
self.assert_('no such column: storage_policy_index' in
str(err))
else:
self.fail('broker did not raise sqlite3.OperationalError '
'trying to select from storage_policy_index '
'from object table!')
# manually insert an existing row to avoid automatic migration
obj_put_timestamp = normalize_timestamp(time())
with broker.get() as conn:
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ('test_name', obj_put_timestamp, 123,
'text/plain', '8f4c680e75ca4c81dc1917ddab0a0b5c', 0))
conn.commit()
# make sure we can iter objects without performing migration
for o in broker.list_objects_iter(1, None, None, None, None):
self.assertEqual(o, ('test_name', obj_put_timestamp, 123,
'text/plain',
'8f4c680e75ca4c81dc1917ddab0a0b5c'))
# get_info
info = broker.get_info()
expected = {
'account': 'a',
'container': 'c',
'put_timestamp': put_timestamp,
'delete_timestamp': '0',
'status_changed_at': '0',
'bytes_used': 123,
'object_count': 1,
'reported_put_timestamp': '0',
'reported_delete_timestamp': '0',
'reported_object_count': 0,
'reported_bytes_used': 0,
'x_container_sync_point1': -1,
'x_container_sync_point2': -1,
'storage_policy_index': 0,
}
for k, v in expected.items():
self.assertEqual(info[k], v,
'The value for %s was %r not %r' % (
k, info[k], v))
self.assert_(float(info['created_at']) > float(put_timestamp))
self.assertNotEqual(int(info['hash'], 16), 0)
orig_hash = info['hash']
# get_replication_info
info = broker.get_replication_info()
# translate object count for replicators
expected['count'] = expected.pop('object_count')
for k, v in expected.items():
self.assertEqual(info[k], v)
self.assert_(float(info['created_at']) > float(put_timestamp))
self.assertEqual(info['hash'], orig_hash)
self.assertEqual(info['max_row'], 1)
self.assertEqual(info['metadata'], '')
# get_policy_stats
info = broker.get_policy_stats()
expected = {
0: {'bytes_used': 123, 'object_count': 1}
}
self.assertEqual(info, expected)
# empty & is_deleted
self.assertEqual(broker.empty(), False)
self.assertEqual(broker.is_deleted(), False)
# no migrations have occurred yet
# container_stat table
with broker.get() as conn:
try:
conn.execute('''
SELECT storage_policy_index FROM container_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
# confirm that the table doesn't have this column
self.assert_('no such column: storage_policy_index' in
str(err))
else:
self.fail('broker did not raise sqlite3.OperationalError '
'trying to select from storage_policy_index '
'from container_stat table!')
# object table
with broker.get() as conn:
try:
conn.execute('''
SELECT storage_policy_index FROM object
''').fetchone()[0]
except sqlite3.OperationalError as err:
# confirm that the table doesn't have this column
self.assert_('no such column: storage_policy_index' in
str(err))
else:
self.fail('broker did not raise sqlite3.OperationalError '
'trying to select from storage_policy_index '
'from object table!')
# policy_stat table
with broker.get() as conn:
try:
conn.execute('''
SELECT storage_policy_index FROM policy_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
# confirm that the table does not exist yet
self.assert_('no such table: policy_stat' in str(err))
else:
self.fail('broker did not raise sqlite3.OperationalError '
'trying to select from storage_policy_index '
'from policy_stat table!')
# now do a PUT with a different value for storage_policy_index
# which will update the DB schema as well as update policy_stats
# for legacy objects in the DB (those without an SPI)
second_object_put_timestamp = normalize_timestamp(time())
other_policy = [p for p in POLICIES if p.idx != 0][0]
broker.put_object('test_second', second_object_put_timestamp,
456, 'text/plain',
'cbac50c175793513fa3c581551c876ab',
storage_policy_index=other_policy.idx)
broker._commit_puts_stale_ok()
# we are fully migrated and both objects have their
# storage_policy_index
with broker.get() as conn:
storage_policy_index = conn.execute('''
SELECT storage_policy_index FROM container_stat
''').fetchone()[0]
self.assertEqual(storage_policy_index, 0)
rows = conn.execute('''
SELECT name, storage_policy_index FROM object
''').fetchall()
for row in rows:
if row[0] == 'test_name':
self.assertEqual(row[1], 0)
else:
self.assertEqual(row[1], other_policy.idx)
# and all stats tracking is in place
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
self.assertEqual(stats[0]['object_count'], 1)
self.assertEqual(stats[0]['bytes_used'], 123)
self.assertEqual(stats[other_policy.idx]['object_count'], 1)
self.assertEqual(stats[other_policy.idx]['bytes_used'], 456)
# get info still reports on the legacy storage policy
info = broker.get_info()
self.assertEqual(info['object_count'], 1)
self.assertEqual(info['bytes_used'], 123)
# unless you change the storage policy
broker.set_storage_policy_index(other_policy.idx)
info = broker.get_info()
self.assertEqual(info['object_count'], 1)
self.assertEqual(info['bytes_used'], 456)

View File

@ -17,6 +17,7 @@ import operator
import os
import mock
import unittest
import itertools
from contextlib import contextmanager
from shutil import rmtree
from StringIO import StringIO
@ -24,6 +25,8 @@ from tempfile import mkdtemp
from test.unit import FakeLogger
from time import gmtime
from xml.dom import minidom
import time
import random
from eventlet import spawn, Timeout, listen
import simplejson
@ -35,8 +38,11 @@ from swift.common import constraints
from swift.common.utils import (normalize_timestamp, mkdirs, public,
replication, lock_parent_directory)
from test.unit import fake_http_connect
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
@contextmanager
def save_globals():
@ -48,6 +54,7 @@ def save_globals():
swift.container.server.http_connect = orig_http_connect
@patch_policies
class TestContainerController(unittest.TestCase):
"""Test swift.container.server.ContainerController"""
def setUp(self):
@ -60,11 +67,46 @@ class TestContainerController(unittest.TestCase):
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
self.controller = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false'})
# some of the policy tests want at least two policies
self.assert_(len(POLICIES) > 1)
def tearDown(self):
"""Tear down for testing swift.object_server.ObjectController"""
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
def _check_put_container_storage_policy(self, req, policy_index):
resp = req.get_response(self.controller)
self.assertEqual(201, resp.status_int)
req = Request.blank(req.path, method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(204, resp.status_int)
self.assertEqual(str(policy_index), resp.headers[POLICY_INDEX])
def test_get_and_validate_policy_index(self):
# no policy is OK
req = Request.blank('/sda1/p/a/container_default', method='PUT',
headers={'X-Timestamp': '0'})
self._check_put_container_storage_policy(req, POLICIES.default.idx)
# bogus policies
for policy in ('nada', 999):
req = Request.blank('/sda1/p/a/c_%s' % policy, method='PUT',
headers={
'X-Timestamp': '0',
POLICY_INDEX: policy
})
resp = req.get_response(self.controller)
self.assertEqual(400, resp.status_int)
self.assert_('invalid' in resp.body.lower())
# good policies
for policy in POLICIES:
req = Request.blank('/sda1/p/a/c_%s' % policy.name, method='PUT',
headers={
'X-Timestamp': '0',
POLICY_INDEX: policy.idx,
})
self._check_put_container_storage_policy(req, policy.idx)
def test_acl_container(self):
# Ensure no acl by default
req = Request.blank(
@ -120,31 +162,95 @@ class TestContainerController(unittest.TestCase):
'account:user')
def test_HEAD(self):
req = Request.blank(
'/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT',
'HTTP_X_TIMESTAMP': '0'})
start = int(time.time())
ts = itertools.count(start)
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'x-timestamp': normalize_timestamp(ts.next())})
req.get_response(self.controller)
req = Request.blank(
'/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD',
'HTTP_X_TIMESTAMP': '0'})
req = Request.blank('/sda1/p/a/c', method='HEAD')
response = req.get_response(self.controller)
self.assert_(response.status.startswith('204'))
self.assertEquals(int(response.headers['x-container-bytes-used']), 0)
self.assertEquals(int(response.headers['x-container-object-count']), 0)
req2 = Request.blank(
'/sda1/p/a/c/o', environ={
'REQUEST_METHOD': 'PUT',
'HTTP_X_TIMESTAMP': '1', 'HTTP_X_SIZE': 42,
'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x'})
req2.get_response(self.controller)
self.assertEqual(response.status_int, 204)
self.assertEqual(response.headers['x-container-bytes-used'], '0')
self.assertEqual(response.headers['x-container-object-count'], '0')
obj_put_request = Request.blank(
'/sda1/p/a/c/o', method='PUT', headers={
'x-timestamp': normalize_timestamp(ts.next()),
'x-size': 42,
'x-content-type': 'text/plain',
'x-etag': 'x',
})
obj_put_request.get_response(self.controller)
# re-issue HEAD request
response = req.get_response(self.controller)
self.assertEquals(int(response.headers['x-container-bytes-used']), 42)
self.assertEquals(int(response.headers['x-container-object-count']), 1)
self.assertEqual(response.status_int // 100, 2)
self.assertEqual(response.headers['x-container-bytes-used'], '42')
self.assertEqual(response.headers['x-container-object-count'], '1')
# created at time...
self.assert_(float(response.headers['x-timestamp']) >= start)
self.assertEqual(response.headers['x-put-timestamp'],
normalize_timestamp(start))
# backend headers
self.assertEqual(int(response.headers[POLICY_INDEX]),
int(POLICIES.default))
self.assert_(float(response.headers['x-backend-timestamp']) >= start)
self.assertEqual(response.headers['x-backend-put-timestamp'],
normalize_timestamp(start))
self.assertEqual(response.headers['x-backend-delete-timestamp'],
normalize_timestamp(0))
self.assertEqual(response.headers['x-backend-status-changed-at'],
normalize_timestamp(start))
def test_HEAD_not_found(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 404)
self.assertEqual(resp.status_int, 404)
self.assertEqual(int(resp.headers[POLICY_INDEX]), 0)
self.assertEqual(resp.headers['x-backend-timestamp'],
normalize_timestamp(0))
self.assertEqual(resp.headers['x-backend-put-timestamp'],
normalize_timestamp(0))
self.assertEqual(resp.headers['x-backend-status-changed-at'],
normalize_timestamp(0))
self.assertEqual(resp.headers['x-backend-delete-timestamp'],
normalize_timestamp(0))
for header in ('x-container-object-count', 'x-container-bytes-used',
'x-timestamp', 'x-put-timestamp'):
self.assertEqual(resp.headers[header], None)
def test_deleted_headers(self):
ts = itertools.count(int(time.time()))
request_method_times = {
'PUT': normalize_timestamp(ts.next()),
'DELETE': normalize_timestamp(ts.next()),
}
# setup a deleted container
for method in ('PUT', 'DELETE'):
x_timestamp = request_method_times[method]
req = Request.blank('/sda1/p/a/c', method=method,
headers={'x-timestamp': x_timestamp})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int // 100, 2)
for method in ('GET', 'HEAD'):
req = Request.blank('/sda1/p/a/c', method=method)
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
# backend headers
self.assertEqual(int(resp.headers[POLICY_INDEX]),
int(POLICIES.default))
self.assert_(float(resp.headers['x-backend-timestamp']) >=
float(request_method_times['PUT']))
self.assertEqual(resp.headers['x-backend-put-timestamp'],
request_method_times['PUT'])
self.assertEqual(resp.headers['x-backend-delete-timestamp'],
request_method_times['DELETE'])
self.assertEqual(resp.headers['x-backend-status-changed-at'],
request_method_times['DELETE'])
for header in ('x-container-object-count',
'x-container-bytes-used', 'x-timestamp',
'x-put-timestamp'):
self.assertEqual(resp.headers[header], None)
def test_HEAD_invalid_partition(self):
req = Request.blank('/sda1/./a/c', environ={'REQUEST_METHOD': 'HEAD',
@ -237,6 +343,233 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 404)
def test_PUT_good_policy_specified(self):
policy = random.choice(list(POLICIES))
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT',
headers={'X-Timestamp': normalize_timestamp(1),
POLICY_INDEX: policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# now make sure we read it back
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
def test_PUT_no_policy_specified(self):
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1)})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# now make sure the default was used (pol 1)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.headers.get(POLICY_INDEX),
str(POLICIES.default.idx))
def test_PUT_bad_policy_specified(self):
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1),
POLICY_INDEX: 'nada'})
resp = req.get_response(self.controller)
# make sure we get bad response
self.assertEquals(resp.status_int, 400)
def test_PUT_no_policy_change(self):
ts = itertools.count(1)
policy = random.choice(list(POLICIES))
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
# now try to update w/o changing the policy
for method in ('POST', 'PUT'):
req = Request.blank('/sda1/p/a/c', method=method, headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: policy.idx
})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int // 100, 2)
# make sure we get the right index back
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
def test_PUT_bad_policy_change(self):
ts = itertools.count(1)
policy = random.choice(list(POLICIES))
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
other_policies = [p for p in POLICIES if p != policy]
for other_policy in other_policies:
# now try to change it and make sure we get a conflict
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: other_policy.idx
})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 409)
# and make sure there is no change!
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
def test_POST_ignores_policy_change(self):
ts = itertools.count(1)
policy = random.choice(list(POLICIES))
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
other_policies = [p for p in POLICIES if p != policy]
for other_policy in other_policies:
# now try to change it and make sure we get a conflict
req = Request.blank('/sda1/p/a/c', method='POST', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: other_policy.idx
})
resp = req.get_response(self.controller)
# valid request
self.assertEquals(resp.status_int // 100, 2)
# but it does nothing
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
def test_PUT_no_policy_for_existing_default(self):
ts = itertools.count(1)
# create a container with the default storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
# check the policy index
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
str(POLICIES.default.idx))
# put again without specifying the storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 202) # sanity check
# policy index is unchanged
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
str(POLICIES.default.idx))
def test_PUT_proxy_default_no_policy_for_existing_default(self):
# make it look like the proxy has a different default than we do, like
# during a config change restart across a multi node cluster.
proxy_default = random.choice([p for p in POLICIES if not
p.is_default])
ts = itertools.count(1)
# create a container with the default storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
'X-Backend-Storage-Policy-Default': int(proxy_default),
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
# check the policy index
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(int(resp.headers[POLICY_INDEX]),
int(proxy_default))
# put again without proxy specifying the different default
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
'X-Backend-Storage-Policy-Default': int(POLICIES.default),
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 202) # sanity check
# policy index is unchanged
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(int(resp.headers[POLICY_INDEX]),
int(proxy_default))
def test_PUT_no_policy_for_existing_non_default(self):
ts = itertools.count(1)
non_default_policy = [p for p in POLICIES if not p.is_default][0]
# create a container with the non-default storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
POLICY_INDEX: non_default_policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
# check the policy index
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
str(non_default_policy.idx))
# put again without specifiying the storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': normalize_timestamp(ts.next()),
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 202) # sanity check
# policy index is unchanged
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
str(non_default_policy.idx))
def test_PUT_GET_metadata(self):
# Set metadata header
req = Request.blank(
@ -305,20 +638,20 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1),
key: 'Value'})
resp = self.controller.PUT(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()), 'Value')
# Set another metadata header, ensuring old one doesn't disappear
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(1),
key2: 'Value2'})
resp = self.controller.POST(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()), 'Value')
self.assertEquals(resp.headers.get(key2.lower()), 'Value2')
@ -326,10 +659,10 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(3),
key: 'New Value'})
resp = self.controller.PUT(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()),
'New Value')
@ -337,10 +670,10 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(2),
key: 'Old Value'})
resp = self.controller.PUT(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()),
'New Value')
@ -348,10 +681,10 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(4),
key: ''})
resp = self.controller.PUT(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assert_(key.lower() not in resp.headers)
@ -437,26 +770,26 @@ class TestContainerController(unittest.TestCase):
key = '%sTest' % prefix
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1)})
resp = self.controller.PUT(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(1),
key: 'Value'})
resp = self.controller.POST(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()), 'Value')
# Update metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(3),
key: 'New Value'})
resp = self.controller.POST(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()),
'New Value')
@ -464,10 +797,10 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(2),
key: 'Old Value'})
resp = self.controller.POST(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(key.lower()),
'New Value')
@ -475,10 +808,10 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(4),
key: ''})
resp = self.controller.POST(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assert_(key.lower() not in resp.headers)
@ -780,11 +1113,22 @@ class TestContainerController(unittest.TestCase):
req = Request.blank(path, method='GET')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 404) # sanity
# backend headers
expectations = {
'x-backend-put-timestamp': normalize_timestamp(1),
'x-backend-delete-timestamp': normalize_timestamp(2),
'x-backend-status-changed-at': normalize_timestamp(2),
}
for header, value in expectations.items():
self.assertEqual(resp.headers[header], value,
'response header %s was %s not %s' % (
header, resp.headers[header], value))
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
self.assertEqual(True, db.is_deleted())
info = db.get_info()
self.assertEquals(info['put_timestamp'], normalize_timestamp('1'))
self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
self.assertEquals(info['status_changed_at'], normalize_timestamp('2'))
# recreate
req = Request.blank(path, method='PUT',
headers={'X-Timestamp': '4'})
@ -795,6 +1139,20 @@ class TestContainerController(unittest.TestCase):
info = db.get_info()
self.assertEquals(info['put_timestamp'], normalize_timestamp('4'))
self.assertEquals(info['delete_timestamp'], normalize_timestamp('2'))
self.assertEquals(info['status_changed_at'], normalize_timestamp('4'))
for method in ('GET', 'HEAD'):
req = Request.blank(path)
resp = req.get_response(self.controller)
expectations = {
'x-put-timestamp': normalize_timestamp(4),
'x-backend-put-timestamp': normalize_timestamp(4),
'x-backend-delete-timestamp': normalize_timestamp(2),
'x-backend-status-changed-at': normalize_timestamp(4),
}
for header, expected in expectations.items():
self.assertEqual(resp.headers[header], expected,
'header %s was %s is not expected %s' % (
header, resp.headers[header], expected))
def test_DELETE_PUT_recreate_replication_race(self):
path = '/sda1/p/a/c'
@ -862,6 +1220,71 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 404)
def test_change_storage_policy_via_DELETE_then_PUT(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
policy = random.choice(list(POLICIES))
req = Request.blank(
'/sda1/p/a/c', method='PUT',
headers={'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
# try re-recreate with other policies
other_policies = [p for p in POLICIES if p != policy]
for other_policy in other_policies:
# first delete the existing container
req = Request.blank('/sda1/p/a/c', method='DELETE', headers={
'X-Timestamp': ts.next()})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204) # sanity check
# at this point, the DB should still exist but be in a deleted
# state, so changing the policy index is perfectly acceptable
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: other_policy.idx})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
req = Request.blank(
'/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.headers[POLICY_INDEX],
str(other_policy.idx))
def test_change_to_default_storage_policy_via_DELETE_then_PUT(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
non_default_policy = random.choice([p for p in POLICIES
if not p.is_default])
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: non_default_policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
req = Request.blank(
'/sda1/p/a/c', method='DELETE',
headers={'X-Timestamp': ts.next()})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204) # sanity check
# at this point, the DB should still exist but be in a deleted state,
# so changing the policy index is perfectly acceptable
req = Request.blank(
'/sda1/p/a/c', method='PUT',
headers={'X-Timestamp': ts.next()})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.headers[POLICY_INDEX],
str(POLICIES.default.idx))
def test_DELETE_object(self):
req = Request.blank(
'/sda1/p/a/c',

View File

@ -91,7 +91,7 @@ class TestContainerUpdater(unittest.TestCase):
os.mkdir(subdir)
cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
container='c')
cb.initialize(normalize_timestamp(1))
cb.initialize(normalize_timestamp(1), 0)
cu.run_once()
info = cb.get_info()
self.assertEquals(info['object_count'], 0)
@ -172,7 +172,7 @@ class TestContainerUpdater(unittest.TestCase):
os.mkdir(subdir)
cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
container='\xce\xa9')
cb.initialize(normalize_timestamp(1))
cb.initialize(normalize_timestamp(1), 0)
cb.put_object('\xce\xa9', normalize_timestamp(2), 3, 'text/plain',
'68b329da9893e34099c7d8ad5cb9c940')

View File

@ -20,11 +20,17 @@ from swift.common.swob import Request
from swift.proxy import server as proxy_server
from swift.proxy.controllers.base import headers_to_container_info
from test.unit import fake_http_connect, FakeRing, FakeMemcache
from swift.common.storage_policy import StoragePolicy
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
from test.unit.common.ring.test_ring import TestRingBase
class TestContainerController(unittest.TestCase):
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestContainerController(TestRingBase):
def setUp(self):
TestRingBase.setUp(self)
self.app = proxy_server.Application(None, FakeMemcache(),
account_ring=FakeRing(),
container_ring=FakeRing(),

View File

@ -27,6 +27,7 @@ from tempfile import mkdtemp
import weakref
import operator
import re
import random
import mock
from eventlet import sleep, spawn, wsgi, listen
@ -34,7 +35,8 @@ import simplejson
from test.unit import (
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
FakeMemcache, debug_logger, patch_policies, write_fake_ring)
FakeMemcache, debug_logger, patch_policies, write_fake_ring,
mocked_http_conn)
from swift.proxy import server as proxy_server
from swift.account import server as account_server
from swift.container import server as container_server
@ -49,10 +51,11 @@ from swift.proxy.controllers import base as proxy_base
from swift.proxy.controllers.base import get_container_memcache_key, \
get_account_memcache_key, cors_validation
import swift.proxy.controllers
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.storage_policy import StoragePolicy
from swift.common.swob import Request, Response, HTTPUnauthorized, \
HTTPException
from swift.common.storage_policy import StoragePolicy, \
POLICIES, POLICY, POLICY_INDEX
from swift.common.request_helpers import get_sys_meta_prefix
# mocks
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@ -4170,6 +4173,11 @@ class TestObjectController(unittest.TestCase):
])
@patch_policies([
StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
StoragePolicy(1, 'one', False, object_ring=FakeRing()),
StoragePolicy(2, 'two', False, True, object_ring=FakeRing())
])
class TestContainerController(unittest.TestCase):
"Test swift.proxy_server.ContainerController"
@ -4178,7 +4186,76 @@ class TestContainerController(unittest.TestCase):
account_ring=FakeRing(),
container_ring=FakeRing(),
object_ring=FakeRing(),
logger=FakeLogger())
logger=debug_logger())
def test_convert_policy_to_index(self):
controller = swift.proxy.controllers.ContainerController(self.app,
'a', 'c')
expected = {
'zero': 0,
'ZeRo': 0,
'one': 1,
'OnE': 1,
}
for name, index in expected.items():
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain',
POLICY: name})
self.assertEqual(controller._convert_policy_to_index(req), index)
# default test
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
self.assertEqual(controller._convert_policy_to_index(req), None)
# negative test
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain', POLICY: 'nada'})
self.assertRaises(HTTPException, controller._convert_policy_to_index,
req)
# storage policy two is deprecated
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain',
POLICY: 'two'})
self.assertRaises(HTTPException, controller._convert_policy_to_index,
req)
def test_convert_index_to_name(self):
policy = random.choice(list(POLICIES))
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 200,
headers={POLICY_INDEX: int(policy)},
) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers[POLICY], policy.name)
def test_no_convert_index_to_name_when_container_not_found(self):
policy = random.choice(list(POLICIES))
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 404, 404, 404,
headers={POLICY_INDEX: int(policy)}) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers[POLICY], None)
def test_error_convert_index_to_name(self):
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 200,
headers={POLICY_INDEX: '-1'}) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers[POLICY], None)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines))
for msg in error_lines:
expected = "Could not translate " \
"X-Backend-Storage-Policy-Index ('-1')"
self.assertTrue(expected in msg)
def test_transfer_headers(self):
src_headers = {'x-remove-versions-location': 'x',
@ -4287,6 +4364,59 @@ class TestContainerController(unittest.TestCase):
self.app.account_autocreate = True
test_status_map((404, 404, 404), 404, None, 404)
def test_PUT_policy_headers(self):
backend_requests = []
def capture_requests(ipaddr, port, device, partition, method,
path, headers=None, query_string=None):
if method == 'PUT':
backend_requests.append(headers)
def test_policy(requested_policy):
with save_globals():
mock_conn = set_http_connect(200, 201, 201, 201,
give_connect=capture_requests)
self.app.memcache.store = {}
req = Request.blank('/v1/a/test', method='PUT',
headers={'Content-Length': 0})
if requested_policy:
expected_policy = requested_policy
req.headers[POLICY] = policy.name
else:
expected_policy = POLICIES.default
res = req.get_response(self.app)
if expected_policy.is_deprecated:
self.assertEquals(res.status_int, 400)
self.assertEqual(0, len(backend_requests))
expected = 'is deprecated'
self.assertTrue(expected in res.body,
'%r did not include %r' % (
res.body, expected))
return
self.assertEquals(res.status_int, 201)
self.assertEqual(
expected_policy.object_ring.replicas,
len(backend_requests))
for headers in backend_requests:
if not requested_policy:
self.assertFalse(POLICY_INDEX in headers)
self.assertTrue(
'X-Backend-Storage-Policy-Default' in headers)
self.assertEqual(
int(expected_policy),
int(headers['X-Backend-Storage-Policy-Default']))
else:
self.assertTrue(POLICY_INDEX in headers)
self.assertEqual(int(headers[POLICY_INDEX]),
policy.idx)
# make sure all mocked responses are consumed
self.assertRaises(StopIteration, mock_conn.code_iter.next)
test_policy(None) # no policy header
for policy in POLICIES:
backend_requests = [] # reset backend requests
test_policy(policy)
def test_PUT(self):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
@ -5930,6 +6060,7 @@ class TestProxyObjectPerformance(unittest.TestCase):
@patch_policies([StoragePolicy(0, 'migrated'),
StoragePolicy(1, 'ernie', True),
StoragePolicy(2, 'deprecated', is_deprecated=True),
StoragePolicy(3, 'bert')])
class TestSwiftInfo(unittest.TestCase):
def setUp(self):
@ -5971,6 +6102,8 @@ class TestSwiftInfo(unittest.TestCase):
self.assertTrue('policies' in si)
sorted_pols = sorted(si['policies'], key=operator.itemgetter('name'))
self.assertEqual(len(sorted_pols), 3)
for policy in sorted_pols:
self.assertNotEquals(policy['name'], 'deprecated')
self.assertEqual(sorted_pols[0]['name'], 'bert')
self.assertEqual(sorted_pols[1]['name'], 'ernie')
self.assertEqual(sorted_pols[2]['name'], 'migrated')