Merge "Split backends off swift/common/db.py"

This commit is contained in:
Jenkins 2013-09-11 02:46:40 +00:00 committed by Gerrit Code Review
commit 21d6654a58
18 changed files with 2699 additions and 2572 deletions

16
doc/source/backends.rst Normal file
View File

@ -0,0 +1,16 @@
======================================
Pluggable Back-ends: API Documentation
======================================
.. automodule:: swift.account.backend
:private-members:
:members:
:undoc-members:
.. automodule:: swift.container.backend
:private-members:
:members:
:undoc-members:
.. automodule:: swift.obj.diskfile
:members:

View File

@ -66,6 +66,7 @@ Developer Documentation
development_guidelines
development_saio
development_auth
backends
Administrator Documentation
===========================

View File

@ -20,7 +20,7 @@ from random import random
import swift.common.db
from swift.account import server as account_server
from swift.common.db import AccountBroker
from swift.account.backend import AccountBroker
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
from swift.common.daemon import Daemon

416
swift/account/backend.py Normal file
View File

@ -0,0 +1,416 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pluggable Back-end for Account Server
"""
from __future__ import with_statement
import os
from uuid import uuid4
import time
import cPickle as pickle
import errno
import sqlite3
from swift.common.utils import normalize_timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
class AccountBroker(DatabaseBroker):
"""Encapsulates working with a account database."""
db_type = 'account'
db_contains_type = 'container'
db_reclaim_timestamp = 'delete_timestamp'
def _initialize(self, conn, put_timestamp):
"""
Create a brand new database (tables, indices, triggers, etc.)
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
self.create_container_table(conn)
self.create_account_stat_table(conn, put_timestamp)
def create_container_table(self, conn):
"""
Create container table which is specific to the account DB.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
SET container_count = container_count + (1 - new.deleted),
object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used,
hash = chexor(hash, new.name,
new.put_timestamp || '-' ||
new.delete_timestamp || '-' ||
new.object_count || '-' || new.bytes_used);
END;
CREATE TRIGGER container_update BEFORE UPDATE ON container
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER container_delete AFTER DELETE ON container
BEGIN
UPDATE account_stat
SET container_count = container_count - (1 - old.deleted),
object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used,
hash = chexor(hash, old.name,
old.put_timestamp || '-' ||
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
""")
def create_account_stat_table(self, conn, put_timestamp):
"""
Create account_stat table which is specific to the account DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
conn.executescript("""
CREATE TABLE account_stat (
account TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
container_count INTEGER,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
INSERT INTO account_stat (container_count) VALUES (0);
""")
conn.execute('''
UPDATE account_stat SET account = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def _delete_db(self, conn, timestamp, force=False):
"""
Mark the DB as deleted.
:param conn: DB connection object
:param timestamp: timestamp to mark as deleted
"""
conn.execute("""
UPDATE account_stat
SET delete_timestamp = ?,
status = 'DELETED',
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _commit_puts_load(self, item_list, entry):
(name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted) = \
pickle.loads(entry.decode('base64'))
item_list.append(
{'name': name,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted})
def empty(self):
"""
Check if the account DB is empty.
:returns: True if the database has no active containers.
"""
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
return (row[0] == 0)
def put_container(self, name, put_timestamp, delete_timestamp,
object_count, bytes_used):
"""
Create a container with the given attributes.
:param name: name of the container to create
:param put_timestamp: put_timestamp of the container to create
:param delete_timestamp: delete_timestamp of the container to create
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
"""
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
deleted = 1
else:
deleted = 0
record = {'name': name, 'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def is_deleted(self):
"""
Check if the account DB is considered to be deleted.
:returns: True if the account DB is considered to be deleted, False
otherwise
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, container_count, status
FROM account_stat''').fetchone()
return row['status'] == 'DELETED' or (
row['container_count'] in (None, '', 0, '0') and
row['delete_timestamp'] > row['put_timestamp'])
def is_status_deleted(self):
"""Only returns true if the status field is set to DELETED."""
with self.get() as conn:
row = conn.execute('''
SELECT status
FROM account_stat''').fetchone()
return (row['status'] == "DELETED")
def get_info(self):
"""
Get global data for the account.
:returns: dict with keys: account, created_at, put_timestamp,
delete_timestamp, container_count, object_count,
bytes_used, hash, id
"""
self._commit_puts_stale_ok()
with self.get() as conn:
return dict(conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
container_count, object_count, bytes_used, hash, id
FROM account_stat
''').fetchone())
def list_containers_iter(self, limit, marker, end_marker, prefix,
delimiter):
"""
Get a list of containers sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not have
the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:returns: list of tuples of (name, object_count, bytes_used, 0)
"""
(marker, end_marker, prefix, delimiter) = utf8encode(
marker, end_marker, prefix, delimiter)
self._commit_puts_stale_ok()
if delimiter and not prefix:
prefix = ''
orig_marker = marker
with self.get() as conn:
results = []
while len(results) < limit:
query = """
SELECT name, object_count, bytes_used, 0
FROM container
WHERE deleted = 0 AND """
query_args = []
if end_marker:
query += ' name < ? AND'
query_args.append(end_marker)
if marker and marker >= prefix:
query += ' name > ? AND'
query_args.append(marker)
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
if prefix is None:
# A delimiter without a specified prefix is ignored
return [r for r in curs]
if not delimiter:
if not prefix:
# It is possible to have a delimiter but no prefix
# specified. As above, the prefix will be set to the
# empty string, so avoid performing the extra work to
# check against an empty prefix.
return [r for r in curs]
else:
return [r for r in curs if r[0].startswith(prefix)]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
marker = name = row[0]
if len(results) >= limit or not name.startswith(prefix):
curs.close()
return results
end = name.find(delimiter, len(prefix))
if end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
dir_name = name[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, 0, 0, 1])
curs.close()
break
results.append(row)
if not rowcount:
break
return results
def merge_items(self, item_list, source=None):
"""
Merge items into the container table.
:param item_list: list of dictionaries of {'name', 'put_timestamp',
'delete_timestamp', 'object_count', 'bytes_used',
'deleted'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
max_rowid = -1
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
query = '''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
curs = conn.execute(query, (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
conn.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()

View File

@ -24,7 +24,7 @@ from eventlet import GreenPool, sleep, Timeout
import swift.common.db
from swift.account.server import DATADIR
from swift.common.db import AccountBroker
from swift.account.backend import AccountBroker
from swift.common.direct_client import ClientException, \
direct_delete_container, direct_delete_object, direct_get_container
from swift.common.ring import Ring
@ -206,7 +206,7 @@ class AccountReaper(Daemon):
.. seealso::
:class:`swift.common.db.AccountBroker` for the broker class.
:class:`swift.account.backend.AccountBroker` for the broker class.
.. seealso::

View File

@ -14,11 +14,12 @@
# limitations under the License.
from swift.account import server as account_server
from swift.common import db, db_replicator
from swift.account.backend import AccountBroker
from swift.common import db_replicator
class AccountReplicator(db_replicator.Replicator):
server_type = 'account'
brokerclass = db.AccountBroker
brokerclass = AccountBroker
datadir = account_server.DATADIR
default_port = 6002

View File

@ -23,8 +23,9 @@ from swift import gettext_ as _
from eventlet import Timeout
import swift.common.db
from swift.account.backend import AccountBroker
from swift.account.utils import account_listing_response
from swift.common.db import AccountBroker, DatabaseConnectionError
from swift.common.db import DatabaseConnectionError, DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
from swift.common.utils import get_logger, hash_path, public, \
@ -119,7 +120,7 @@ class AccountController(object):
try:
broker.initialize(normalize_timestamp(
req.headers.get('x-timestamp') or time.time()))
except swift.common.db.DatabaseAlreadyExists:
except DatabaseAlreadyExists:
pass
if req.headers.get('x-account-override-deleted', 'no').lower() != \
'yes' and broker.is_deleted():
@ -140,7 +141,7 @@ class AccountController(object):
try:
broker.initialize(timestamp)
created = True
except swift.common.db.DatabaseAlreadyExists:
except DatabaseAlreadyExists:
pass
elif broker.is_status_deleted():
return self._deleted_response(broker, req, HTTPForbidden,

View File

@ -23,7 +23,6 @@ import os
from uuid import uuid4
import sys
import time
import cPickle as pickle
import errno
from swift import gettext_ as _
from tempfile import mkstemp
@ -731,855 +730,3 @@ class DatabaseBroker(object):
' WHERE put_timestamp < ?' % self.db_type,
(timestamp, timestamp))
conn.commit()
class ContainerBroker(DatabaseBroker):
"""Encapsulates working with a container database."""
db_type = 'container'
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
def _initialize(self, conn, put_timestamp):
"""Creates a brand new database (tables, indices, triggers, etc.)"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
if not self.container:
raise ValueError(
'Attempting to create a new database with no container set')
self.create_object_table(conn)
self.create_container_stat_table(conn, put_timestamp)
def create_object_table(self, conn):
"""
Create the object table which is specifc to the container DB.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
UPDATE container_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size,
hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER object_delete AFTER DELETE ON object
BEGIN
UPDATE container_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size,
hash = chexor(hash, old.name, old.created_at);
END;
""")
def create_container_stat_table(self, conn, put_timestamp=None):
"""
Create the container_stat table which is specific to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
reported_object_count = 0, reported_bytes_used = 0''')
def _delete_db(self, conn, timestamp):
"""
Mark the DB as deleted
:param conn: DB connection object
:param timestamp: timestamp to mark as deleted
"""
conn.execute("""
UPDATE container_stat
SET delete_timestamp = ?,
status = 'DELETED',
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _commit_puts_load(self, item_list, entry):
(name, timestamp, size, content_type, etag, deleted) = \
pickle.loads(entry.decode('base64'))
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted})
def empty(self):
"""
Check if the DB is empty.
:returns: True if the database has no active objects, False otherwise
"""
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return (row[0] == 0)
def delete_object(self, name, timestamp):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)
def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
"""
Creates an object in the DB with its metadata.
:param name: object name to be created
:param timestamp: timestamp of when the object was created
:param size: object size
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deteleted_at timestamp to timestamp
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def is_deleted(self, timestamp=None):
"""
Check if the DB is considered to be deleted.
:returns: True if the DB is considered to be deleted, False otherwise
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
# leave this db as a tombstone for a consistency window
if timestamp and row['delete_timestamp'] > timestamp:
return False
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return (row['object_count'] in (None, '', 0, '0')) and \
(float(row['delete_timestamp']) > float(row['put_timestamp']))
def get_info(self):
"""
Get global data for the container.
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, and x_container_sync_point2.
"""
self._commit_puts_stale_ok()
with self.get() as conn:
data = None
trailing = 'x_container_sync_point1, x_container_sync_point2'
while not data:
try:
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s
FROM container_stat
''' % (trailing,)).fetchone()
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' in str(err):
trailing = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
orig_isolation_level = conn.isolation_level
try:
# We turn off auto-transactions to ensure the alter table
# commands are part of the transaction.
conn.isolation_level = None
conn.execute('BEGIN')
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1
''')
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1
''')
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.execute('COMMIT')
finally:
conn.isolation_level = orig_isolation_level
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""
Update reported stats.
:param put_timestamp: put_timestamp to update
:param delete_timestamp: delete_timestamp to update
:param object_count: object_count to update
:param bytes_used: bytes_used to update
"""
with self.get() as conn:
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
reported_object_count = ?, reported_bytes_used = ?
''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit()
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None):
"""
Get a list of objects sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not
have the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimter based on
the path
:returns: list of tuples of (name, created_at, size, content_type,
etag)
"""
delim_force_gte = False
(marker, end_marker, prefix, delimiter, path) = utf8encode(
marker, end_marker, prefix, delimiter, path)
self._commit_puts_stale_ok()
if path is not None:
prefix = path
if path:
prefix = path = path.rstrip('/') + '/'
delimiter = '/'
elif delimiter and not prefix:
prefix = ''
orig_marker = marker
with self.get() as conn:
results = []
while len(results) < limit:
query = '''SELECT name, created_at, size, content_type, etag
FROM object WHERE'''
query_args = []
if end_marker:
query += ' name < ? AND'
query_args.append(end_marker)
if delim_force_gte:
query += ' name >= ? AND'
query_args.append(marker)
# Always set back to False
delim_force_gte = False
elif marker and marker >= prefix:
query += ' name > ? AND'
query_args.append(marker)
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
if prefix is None:
# A delimiter without a specified prefix is ignored
return [r for r in curs]
if not delimiter:
if not prefix:
# It is possible to have a delimiter but no prefix
# specified. As above, the prefix will be set to the
# empty string, so avoid performing the extra work to
# check against an empty prefix.
return [r for r in curs]
else:
return [r for r in curs if r[0].startswith(prefix)]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
marker = name = row[0]
if len(results) >= limit or not name.startswith(prefix):
curs.close()
return results
end = name.find(delimiter, len(prefix))
if path is not None:
if name == path:
continue
if end >= 0 and len(name) > end + len(delimiter):
marker = name[:end] + chr(ord(delimiter) + 1)
curs.close()
break
elif end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
# we want result to be inclusinve of delim+1
delim_force_gte = True
dir_name = name[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, '0', 0, None, ''])
curs.close()
break
results.append(row)
if not rowcount:
break
return results
def merge_items(self, item_list, source=None):
"""
Merge items into the object table.
:param item_list: list of dictionaries of {'name', 'created_at',
'size', 'content_type', 'etag', 'deleted'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
max_rowid = -1
for rec in item_list:
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at']))
query = 'SELECT 1 FROM object WHERE name = ?'
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (rec['name'],)).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
class AccountBroker(DatabaseBroker):
"""Encapsulates working with a account database."""
db_type = 'account'
db_contains_type = 'container'
db_reclaim_timestamp = 'delete_timestamp'
def _initialize(self, conn, put_timestamp):
"""
Create a brand new database (tables, indices, triggers, etc.)
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
self.create_container_table(conn)
self.create_account_stat_table(conn, put_timestamp)
def create_container_table(self, conn):
"""
Create container table which is specific to the account DB.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
SET container_count = container_count + (1 - new.deleted),
object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used,
hash = chexor(hash, new.name,
new.put_timestamp || '-' ||
new.delete_timestamp || '-' ||
new.object_count || '-' || new.bytes_used);
END;
CREATE TRIGGER container_update BEFORE UPDATE ON container
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER container_delete AFTER DELETE ON container
BEGIN
UPDATE account_stat
SET container_count = container_count - (1 - old.deleted),
object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used,
hash = chexor(hash, old.name,
old.put_timestamp || '-' ||
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
""")
def create_account_stat_table(self, conn, put_timestamp):
"""
Create account_stat table which is specific to the account DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
conn.executescript("""
CREATE TABLE account_stat (
account TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
container_count INTEGER,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
INSERT INTO account_stat (container_count) VALUES (0);
""")
conn.execute('''
UPDATE account_stat SET account = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def _delete_db(self, conn, timestamp, force=False):
"""
Mark the DB as deleted.
:param conn: DB connection object
:param timestamp: timestamp to mark as deleted
"""
conn.execute("""
UPDATE account_stat
SET delete_timestamp = ?,
status = 'DELETED',
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _commit_puts_load(self, item_list, entry):
(name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted) = \
pickle.loads(entry.decode('base64'))
item_list.append(
{'name': name,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted})
def empty(self):
"""
Check if the account DB is empty.
:returns: True if the database has no active containers.
"""
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
return (row[0] == 0)
def put_container(self, name, put_timestamp, delete_timestamp,
object_count, bytes_used):
"""
Create a container with the given attributes.
:param name: name of the container to create
:param put_timestamp: put_timestamp of the container to create
:param delete_timestamp: delete_timestamp of the container to create
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
"""
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
deleted = 1
else:
deleted = 0
record = {'name': name, 'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def is_deleted(self):
"""
Check if the account DB is considered to be deleted.
:returns: True if the account DB is considered to be deleted, False
otherwise
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, container_count, status
FROM account_stat''').fetchone()
return row['status'] == 'DELETED' or (
row['container_count'] in (None, '', 0, '0') and
row['delete_timestamp'] > row['put_timestamp'])
def is_status_deleted(self):
"""Only returns true if the status field is set to DELETED."""
with self.get() as conn:
row = conn.execute('''
SELECT status
FROM account_stat''').fetchone()
return (row['status'] == "DELETED")
def get_info(self):
"""
Get global data for the account.
:returns: dict with keys: account, created_at, put_timestamp,
delete_timestamp, container_count, object_count,
bytes_used, hash, id
"""
self._commit_puts_stale_ok()
with self.get() as conn:
return dict(conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
container_count, object_count, bytes_used, hash, id
FROM account_stat
''').fetchone())
def list_containers_iter(self, limit, marker, end_marker, prefix,
delimiter):
"""
Get a list of containers sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not have
the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:returns: list of tuples of (name, object_count, bytes_used, 0)
"""
(marker, end_marker, prefix, delimiter) = utf8encode(
marker, end_marker, prefix, delimiter)
self._commit_puts_stale_ok()
if delimiter and not prefix:
prefix = ''
orig_marker = marker
with self.get() as conn:
results = []
while len(results) < limit:
query = """
SELECT name, object_count, bytes_used, 0
FROM container
WHERE deleted = 0 AND """
query_args = []
if end_marker:
query += ' name < ? AND'
query_args.append(end_marker)
if marker and marker >= prefix:
query += ' name > ? AND'
query_args.append(marker)
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
if prefix is None:
# A delimiter without a specified prefix is ignored
return [r for r in curs]
if not delimiter:
if not prefix:
# It is possible to have a delimiter but no prefix
# specified. As above, the prefix will be set to the
# empty string, so avoid performing the extra work to
# check against an empty prefix.
return [r for r in curs]
else:
return [r for r in curs if r[0].startswith(prefix)]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
marker = name = row[0]
if len(results) >= limit or not name.startswith(prefix):
curs.close()
return results
end = name.find(delimiter, len(prefix))
if end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
dir_name = name[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, 0, 0, 1])
curs.close()
break
results.append(row)
if not rowcount:
break
return results
def merge_items(self, item_list, source=None):
"""
Merge items into the container table.
:param item_list: list of dictionaries of {'name', 'put_timestamp',
'delete_timestamp', 'object_count', 'bytes_used',
'deleted'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
max_rowid = -1
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
query = '''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
curs = conn.execute(query, (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
conn.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()

View File

@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.container import server as container_server
from swift.common.db import ContainerBroker
from swift.container.backend import ContainerBroker
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
from swift.common.daemon import Daemon

496
swift/container/backend.py Normal file
View File

@ -0,0 +1,496 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pluggable Back-ends for Container Server
"""
from __future__ import with_statement
import os
from uuid import uuid4
import time
import cPickle as pickle
import errno
import sqlite3
from swift.common.utils import normalize_timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
class ContainerBroker(DatabaseBroker):
"""Encapsulates working with a container database."""
db_type = 'container'
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
def _initialize(self, conn, put_timestamp):
"""Creates a brand new database (tables, indices, triggers, etc.)"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
if not self.container:
raise ValueError(
'Attempting to create a new database with no container set')
self.create_object_table(conn)
self.create_container_stat_table(conn, put_timestamp)
def create_object_table(self, conn):
"""
Create the object table which is specifc to the container DB.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
UPDATE container_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size,
hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER object_delete AFTER DELETE ON object
BEGIN
UPDATE container_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size,
hash = chexor(hash, old.name, old.created_at);
END;
""")
def create_container_stat_table(self, conn, put_timestamp=None):
"""
Create the container_stat table which is specific to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
reported_object_count = 0, reported_bytes_used = 0''')
def _delete_db(self, conn, timestamp):
"""
Mark the DB as deleted
:param conn: DB connection object
:param timestamp: timestamp to mark as deleted
"""
conn.execute("""
UPDATE container_stat
SET delete_timestamp = ?,
status = 'DELETED',
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _commit_puts_load(self, item_list, entry):
(name, timestamp, size, content_type, etag, deleted) = \
pickle.loads(entry.decode('base64'))
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted})
def empty(self):
"""
Check if the DB is empty.
:returns: True if the database has no active objects, False otherwise
"""
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return (row[0] == 0)
def delete_object(self, name, timestamp):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', 1)
def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
"""
Creates an object in the DB with its metadata.
:param name: object name to be created
:param timestamp: timestamp of when the object was created
:param size: object size
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deteleted_at timestamp to timestamp
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def is_deleted(self, timestamp=None):
"""
Check if the DB is considered to be deleted.
:returns: True if the DB is considered to be deleted, False otherwise
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
self._commit_puts_stale_ok()
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
# leave this db as a tombstone for a consistency window
if timestamp and row['delete_timestamp'] > timestamp:
return False
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return (row['object_count'] in (None, '', 0, '0')) and \
(float(row['delete_timestamp']) > float(row['put_timestamp']))
def get_info(self):
"""
Get global data for the container.
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, and x_container_sync_point2.
"""
self._commit_puts_stale_ok()
with self.get() as conn:
data = None
trailing = 'x_container_sync_point1, x_container_sync_point2'
while not data:
try:
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s
FROM container_stat
''' % (trailing,)).fetchone()
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' in str(err):
trailing = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
orig_isolation_level = conn.isolation_level
try:
# We turn off auto-transactions to ensure the alter table
# commands are part of the transaction.
conn.isolation_level = None
conn.execute('BEGIN')
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1
''')
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1
''')
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.execute('COMMIT')
finally:
conn.isolation_level = orig_isolation_level
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""
Update reported stats.
:param put_timestamp: put_timestamp to update
:param delete_timestamp: delete_timestamp to update
:param object_count: object_count to update
:param bytes_used: bytes_used to update
"""
with self.get() as conn:
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
reported_object_count = ?, reported_bytes_used = ?
''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit()
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None):
"""
Get a list of objects sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not
have the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimter based on
the path
:returns: list of tuples of (name, created_at, size, content_type,
etag)
"""
delim_force_gte = False
(marker, end_marker, prefix, delimiter, path) = utf8encode(
marker, end_marker, prefix, delimiter, path)
self._commit_puts_stale_ok()
if path is not None:
prefix = path
if path:
prefix = path = path.rstrip('/') + '/'
delimiter = '/'
elif delimiter and not prefix:
prefix = ''
orig_marker = marker
with self.get() as conn:
results = []
while len(results) < limit:
query = '''SELECT name, created_at, size, content_type, etag
FROM object WHERE'''
query_args = []
if end_marker:
query += ' name < ? AND'
query_args.append(end_marker)
if delim_force_gte:
query += ' name >= ? AND'
query_args.append(marker)
# Always set back to False
delim_force_gte = False
elif marker and marker >= prefix:
query += ' name > ? AND'
query_args.append(marker)
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
if prefix is None:
# A delimiter without a specified prefix is ignored
return [r for r in curs]
if not delimiter:
if not prefix:
# It is possible to have a delimiter but no prefix
# specified. As above, the prefix will be set to the
# empty string, so avoid performing the extra work to
# check against an empty prefix.
return [r for r in curs]
else:
return [r for r in curs if r[0].startswith(prefix)]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
marker = name = row[0]
if len(results) >= limit or not name.startswith(prefix):
curs.close()
return results
end = name.find(delimiter, len(prefix))
if path is not None:
if name == path:
continue
if end >= 0 and len(name) > end + len(delimiter):
marker = name[:end] + chr(ord(delimiter) + 1)
curs.close()
break
elif end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
# we want result to be inclusinve of delim+1
delim_force_gte = True
dir_name = name[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, '0', 0, None, ''])
curs.close()
break
results.append(row)
if not rowcount:
break
return results
def merge_items(self, item_list, source=None):
"""
Merge items into the object table.
:param item_list: list of dictionaries of {'name', 'created_at',
'size', 'content_type', 'etag', 'deleted'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
max_rowid = -1
for rec in item_list:
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at']))
query = 'SELECT 1 FROM object WHERE name = ?'
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (rec['name'],)).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()

View File

@ -14,12 +14,13 @@
# limitations under the License.
from swift.container import server as container_server
from swift.common import db, db_replicator
from swift.container.backend import ContainerBroker
from swift.common import db_replicator
class ContainerReplicator(db_replicator.Replicator):
server_type = 'container'
brokerclass = db.ContainerBroker
brokerclass = ContainerBroker
datadir = container_server.DATADIR
default_port = 6001

View File

@ -25,7 +25,8 @@ from xml.etree.cElementTree import Element, SubElement, tostring
from eventlet import Timeout
import swift.common.db
from swift.common.db import ContainerBroker
from swift.container.backend import ContainerBroker
from swift.common.db import DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
from swift.common.utils import get_logger, hash_path, public, \
@ -194,7 +195,7 @@ class ContainerController(object):
try:
broker.initialize(normalize_timestamp(
req.headers.get('x-timestamp') or time.time()))
except swift.common.db.DatabaseAlreadyExists:
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
return HTTPNotFound()
@ -241,7 +242,7 @@ class ContainerController(object):
not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp)
except swift.common.db.DatabaseAlreadyExists:
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
return HTTPNotFound()
@ -254,7 +255,7 @@ class ContainerController(object):
try:
broker.initialize(timestamp)
created = True
except swift.common.db.DatabaseAlreadyExists:
except DatabaseAlreadyExists:
pass
else:
created = broker.is_deleted()

View File

@ -24,9 +24,9 @@ import swift.common.db
from swift.container import server as container_server
from swiftclient import ClientException, delete_object, put_object, \
quote
from swift.container.backend import ContainerBroker
from swift.common.direct_client import direct_get_object
from swift.common.ring import Ring
from swift.common.db import ContainerBroker
from swift.common.utils import audit_location_generator, get_logger, \
hash_path, config_true_value, validate_sync_to, whataremyips, FileLikeIter
from swift.common.daemon import Daemon

View File

@ -25,9 +25,9 @@ from tempfile import mkstemp
from eventlet import spawn, patcher, Timeout
import swift.common.db
from swift.container.backend import ContainerBroker
from swift.container.server import DATADIR
from swift.common.bufferedhttp import http_connect
from swift.common.db import ContainerBroker
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, config_true_value, dump_recon_cache

View File

@ -0,0 +1,540 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Tests for swift.account.backend """
from __future__ import with_statement
import hashlib
import unittest
from time import sleep, time
from uuid import uuid4
from swift.account.backend import AccountBroker
from swift.common.utils import normalize_timestamp
class TestAccountBroker(unittest.TestCase):
"""Tests for AccountBroker"""
def test_creation(self):
# Test AccountBroker.__init__
broker = AccountBroker(':memory:', account='a')
self.assertEqual(broker.db_file, ':memory:')
got_exc = False
try:
with broker.get() as conn:
pass
except Exception:
got_exc = True
self.assert_(got_exc)
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
curs = conn.cursor()
curs.execute('SELECT 1')
self.assertEqual(curs.fetchall()[0][0], 1)
def test_exception(self):
# Test AccountBroker throwing a conn away after exception
first_conn = None
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
first_conn = conn
try:
with broker.get() as conn:
self.assertEquals(first_conn, conn)
raise Exception('OMG')
except Exception:
pass
self.assert_(broker.conn is None)
def test_empty(self):
# Test AccountBroker.empty
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
self.assert_(broker.empty())
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
self.assert_(not broker.empty())
sleep(.00001)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
self.assert_(broker.empty())
def test_reclaim(self):
broker = AccountBroker(':memory:', account='test_account')
broker.initialize(normalize_timestamp('1'))
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 1)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
broker.reclaim(normalize_timestamp(time() - 999), time())
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 1)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
broker.put_container('c', 0, normalize_timestamp(time()), 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 0)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 1)
broker.reclaim(normalize_timestamp(time() - 999), time())
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 0)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 1)
sleep(.00001)
broker.reclaim(normalize_timestamp(time()), time())
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 0)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
# Test reclaim after deletion. Create 3 test containers
broker.put_container('x', 0, 0, 0, 0)
broker.put_container('y', 0, 0, 0, 0)
broker.put_container('z', 0, 0, 0, 0)
broker.reclaim(normalize_timestamp(time()), time())
# self.assertEquals(len(res), 2)
# self.assert_(isinstance(res, tuple))
# containers, account_name = res
# self.assert_(containers is None)
# self.assert_(account_name is None)
# Now delete the account
broker.delete_db(normalize_timestamp(time()))
broker.reclaim(normalize_timestamp(time()), time())
# self.assertEquals(len(res), 2)
# self.assert_(isinstance(res, tuple))
# containers, account_name = res
# self.assertEquals(account_name, 'test_account')
# self.assertEquals(len(containers), 3)
# self.assert_('x' in containers)
# self.assert_('y' in containers)
# self.assert_('z' in containers)
# self.assert_('a' not in containers)
def test_delete_container(self):
# Test AccountBroker.delete_container
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 1)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 0").fetchone()[0], 0)
self.assertEquals(conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 1)
def test_put_container(self):
# Test AccountBroker.put_container
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
# Create initial container
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
# Reput same event
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
# Put old event
otimestamp = normalize_timestamp(float(timestamp) - 1)
broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
# Put old delete event
dtimestamp = normalize_timestamp(float(timestamp) - 1)
broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT delete_timestamp FROM container").fetchone()[0],
dtimestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
# Put new delete event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT delete_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 1)
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
with broker.get() as conn:
self.assertEquals(conn.execute(
"SELECT name FROM container").fetchone()[0],
'"{<container \'&\' name>}"')
self.assertEquals(conn.execute(
"SELECT put_timestamp FROM container").fetchone()[0],
timestamp)
self.assertEquals(conn.execute(
"SELECT deleted FROM container").fetchone()[0], 0)
def test_get_info(self):
# Test AccountBroker.get_info
broker = AccountBroker(':memory:', account='test1')
broker.initialize(normalize_timestamp('1'))
info = broker.get_info()
self.assertEquals(info['account'], 'test1')
self.assertEquals(info['hash'], '00000000000000000000000000000000')
info = broker.get_info()
self.assertEquals(info['container_count'], 0)
broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0)
info = broker.get_info()
self.assertEquals(info['container_count'], 1)
sleep(.00001)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
info = broker.get_info()
self.assertEquals(info['container_count'], 2)
sleep(.00001)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
info = broker.get_info()
self.assertEquals(info['container_count'], 2)
sleep(.00001)
broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0)
info = broker.get_info()
self.assertEquals(info['container_count'], 1)
sleep(.00001)
broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0)
info = broker.get_info()
self.assertEquals(info['container_count'], 0)
def test_list_containers_iter(self):
# Test AccountBroker.list_containers_iter
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
for cont1 in xrange(4):
for cont2 in xrange(125):
broker.put_container('%d-%04d' % (cont1, cont2),
normalize_timestamp(time()), 0, 0, 0)
for cont in xrange(125):
broker.put_container('2-0051-%04d' % cont,
normalize_timestamp(time()), 0, 0, 0)
for cont in xrange(125):
broker.put_container('3-%04d-0049' % cont,
normalize_timestamp(time()), 0, 0, 0)
listing = broker.list_containers_iter(100, '', None, None, '')
self.assertEquals(len(listing), 100)
self.assertEquals(listing[0][0], '0-0000')
self.assertEquals(listing[-1][0], '0-0099')
listing = broker.list_containers_iter(100, '', '0-0050', None, '')
self.assertEquals(len(listing), 50)
self.assertEquals(listing[0][0], '0-0000')
self.assertEquals(listing[-1][0], '0-0049')
listing = broker.list_containers_iter(100, '0-0099', None, None, '')
self.assertEquals(len(listing), 100)
self.assertEquals(listing[0][0], '0-0100')
self.assertEquals(listing[-1][0], '1-0074')
listing = broker.list_containers_iter(55, '1-0074', None, None, '')
self.assertEquals(len(listing), 55)
self.assertEquals(listing[0][0], '1-0075')
self.assertEquals(listing[-1][0], '2-0004')
listing = broker.list_containers_iter(10, '', None, '0-01', '')
self.assertEquals(len(listing), 10)
self.assertEquals(listing[0][0], '0-0100')
self.assertEquals(listing[-1][0], '0-0109')
listing = broker.list_containers_iter(10, '', None, '0-01', '-')
self.assertEquals(len(listing), 10)
self.assertEquals(listing[0][0], '0-0100')
self.assertEquals(listing[-1][0], '0-0109')
listing = broker.list_containers_iter(10, '', None, '0-', '-')
self.assertEquals(len(listing), 10)
self.assertEquals(listing[0][0], '0-0000')
self.assertEquals(listing[-1][0], '0-0009')
listing = broker.list_containers_iter(10, '', None, '', '-')
self.assertEquals(len(listing), 4)
self.assertEquals([row[0] for row in listing],
['0-', '1-', '2-', '3-'])
listing = broker.list_containers_iter(10, '2-', None, None, '-')
self.assertEquals(len(listing), 1)
self.assertEquals([row[0] for row in listing], ['3-'])
listing = broker.list_containers_iter(10, '', None, '2', '-')
self.assertEquals(len(listing), 1)
self.assertEquals([row[0] for row in listing], ['2-'])
listing = broker.list_containers_iter(10, '2-0050', None, '2-', '-')
self.assertEquals(len(listing), 10)
self.assertEquals(listing[0][0], '2-0051')
self.assertEquals(listing[1][0], '2-0051-')
self.assertEquals(listing[2][0], '2-0052')
self.assertEquals(listing[-1][0], '2-0059')
listing = broker.list_containers_iter(10, '3-0045', None, '3-', '-')
self.assertEquals(len(listing), 10)
self.assertEquals([row[0] for row in listing],
['3-0045-', '3-0046', '3-0046-', '3-0047',
'3-0047-', '3-0048', '3-0048-', '3-0049',
'3-0049-', '3-0050'])
broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0)
listing = broker.list_containers_iter(10, '3-0048', None, None, None)
self.assertEquals(len(listing), 10)
self.assertEquals([row[0] for row in listing],
['3-0048-0049', '3-0049', '3-0049-', '3-0049-0049',
'3-0050', '3-0050-0049', '3-0051', '3-0051-0049',
'3-0052', '3-0052-0049'])
listing = broker.list_containers_iter(10, '3-0048', None, '3-', '-')
self.assertEquals(len(listing), 10)
self.assertEquals([row[0] for row in listing],
['3-0048-', '3-0049', '3-0049-', '3-0050',
'3-0050-', '3-0051', '3-0051-', '3-0052',
'3-0052-', '3-0053'])
listing = broker.list_containers_iter(10, None, None, '3-0049-', '-')
self.assertEquals(len(listing), 2)
self.assertEquals([row[0] for row in listing],
['3-0049-', '3-0049-0049'])
def test_double_check_trailing_delimiter(self):
# Test AccountBroker.list_containers_iter for an
# account that has an odd container with a trailing delimiter
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
listing = broker.list_containers_iter(15, None, None, None, None)
self.assertEquals(len(listing), 10)
self.assertEquals([row[0] for row in listing],
['a', 'a-', 'a-a', 'a-a-a', 'a-a-b', 'a-b', 'b',
'b-a', 'b-b', 'c'])
listing = broker.list_containers_iter(15, None, None, '', '-')
self.assertEquals(len(listing), 5)
self.assertEquals([row[0] for row in listing],
['a', 'a-', 'b', 'b-', 'c'])
listing = broker.list_containers_iter(15, None, None, 'a-', '-')
self.assertEquals(len(listing), 4)
self.assertEquals([row[0] for row in listing],
['a-', 'a-a', 'a-a-', 'a-b'])
listing = broker.list_containers_iter(15, None, None, 'b-', '-')
self.assertEquals(len(listing), 2)
self.assertEquals([row[0] for row in listing], ['b-a', 'b-b'])
def test_chexor(self):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(1),
normalize_timestamp(0), 0, 0)
broker.put_container('b', normalize_timestamp(2),
normalize_timestamp(0), 0, 0)
hasha = hashlib.md5(
'%s-%s' % ('a', '0000000001.00000-0000000000.00000-0-0')
).digest()
hashb = hashlib.md5(
'%s-%s' % ('b', '0000000002.00000-0000000000.00000-0-0')
).digest()
hashc = \
''.join(('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEquals(broker.get_info()['hash'], hashc)
broker.put_container('b', normalize_timestamp(3),
normalize_timestamp(0), 0, 0)
hashb = hashlib.md5(
'%s-%s' % ('b', '0000000003.00000-0000000000.00000-0-0')
).digest()
hashc = \
''.join(('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEquals(broker.get_info()['hash'], hashc)
def test_merge_items(self):
broker1 = AccountBroker(':memory:', account='a')
broker1.initialize(normalize_timestamp('1'))
broker2 = AccountBroker(':memory:', account='a')
broker2.initialize(normalize_timestamp('1'))
broker1.put_container('a', normalize_timestamp(1), 0, 0, 0)
broker1.put_container('b', normalize_timestamp(2), 0, 0, 0)
id = broker1.get_info()['id']
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
self.assertEquals(len(items), 2)
self.assertEquals(['a', 'b'], sorted([rec['name'] for rec in items]))
broker1.put_container('c', normalize_timestamp(3), 0, 0, 0)
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
self.assertEquals(len(items), 3)
self.assertEquals(['a', 'b', 'c'],
sorted([rec['name'] for rec in items]))
def premetadata_create_account_stat_table(self, conn, put_timestamp):
"""
Copied from AccountBroker before the metadata column was
added; used for testing with TestAccountBrokerBeforeMetadata.
Create account_stat table which is specific to the account DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
conn.executescript('''
CREATE TABLE account_stat (
account TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
container_count INTEGER,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
);
INSERT INTO account_stat (container_count) VALUES (0);
''')
conn.execute('''
UPDATE account_stat SET account = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, normalize_timestamp(time()), str(uuid4()),
put_timestamp))
class TestAccountBrokerBeforeMetadata(TestAccountBroker):
"""
Tests for AccountBroker against databases created before
the metadata column was added.
"""
def setUp(self):
self._imported_create_account_stat_table = \
AccountBroker.create_account_stat_table
AccountBroker.create_account_stat_table = \
premetadata_create_account_stat_table
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
exc = None
with broker.get() as conn:
try:
conn.execute('SELECT metadata FROM account_stat')
except BaseException as err:
exc = err
self.assert_('no such column: metadata' in str(exc))
def tearDown(self):
AccountBroker.create_account_stat_table = \
self._imported_create_account_stat_table
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM account_stat')

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -26,7 +26,7 @@ from eventlet import spawn, Timeout, listen
from swift.common import utils
from swift.container import updater as container_updater
from swift.container import server as container_server
from swift.common.db import ContainerBroker
from swift.container.backend import ContainerBroker
from swift.common.ring import RingData
from swift.common.utils import normalize_timestamp