Refactoring storage policies merge_timestamps

 * base implementation of is_deleted refactored to use _is_deleted
 * wrap the conn-based _is_deleted inside a transaction for merge_timestamps

Implements: blueprint storage-policies
Change-Id: I6a948908c3e45b70707981d87171cb2cb910fe1e
This commit is contained in:
parent 00a162c4d4
commit b02f0db126
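At its core this change turns is_deleted into a template method: the DatabaseBroker base class keeps the file-existence check, the pending-file flush and the connection handling, and delegates the table-specific test to a new _is_deleted(conn) hook that AccountBroker and ContainerBroker implement. Because merge_timestamps can now call that hook on the connection it already holds, the before/after status comparison and the status_changed_at update happen inside the same transaction rather than via separate is_deleted() calls. Below is a minimal, runnable sketch of the pattern; the toy class, table and values are stand-ins for illustration, not Swift's actual code (created_at handling is omitted).

import sqlite3
import time


class ToyBroker(object):
    # Toy stand-in for DatabaseBroker; one shared in-memory connection.
    def __init__(self):
        self.conn = sqlite3.connect(':memory:')
        self.conn.row_factory = sqlite3.Row
        self.conn.executescript('''
            CREATE TABLE test_stat (
                put_timestamp TEXT DEFAULT '0',
                delete_timestamp TEXT DEFAULT '0',
                status_changed_at TEXT DEFAULT '0');
            INSERT INTO test_stat (put_timestamp) VALUES ('1');
        ''')

    def _is_deleted(self, conn):
        # The subclass hook: evaluate this broker's *_stat table.
        row = conn.execute('SELECT * FROM test_stat').fetchone()
        return float(row['delete_timestamp']) > float(row['put_timestamp'])

    def is_deleted(self):
        # Base entry point: grab a connection, delegate the actual check.
        return self._is_deleted(self.conn)

    def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
        conn = self.conn
        # Check before and after the UPDATE on the same connection, within
        # one transaction, so the status flip cannot race another writer.
        old_status = self._is_deleted(conn)
        conn.execute('''
            UPDATE test_stat SET put_timestamp = MAX(?, put_timestamp),
                                 delete_timestamp = MAX(?, delete_timestamp)
        ''', (put_timestamp, delete_timestamp))
        if old_status != self._is_deleted(conn):
            conn.execute('UPDATE test_stat SET status_changed_at = ?',
                         ('%016.5f' % time.time(),))
        conn.commit()


broker = ToyBroker()
broker.merge_timestamps('1', '1', '2')
assert broker.is_deleted()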
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -154,9 +154,9 @@ class AccountBroker(DatabaseBroker):
         conn.execute('''
             UPDATE account_stat SET account = ?, created_at = ?, id = ?,
-                   put_timestamp = ?
+                   put_timestamp = ?, status_changed_at = ?
             ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
-                  put_timestamp))
+                  put_timestamp, put_timestamp))
 
     def create_policy_stat_table(self, conn):
         """
@@ -284,23 +284,29 @@ class AccountBroker(DatabaseBroker):
                     protocol=PICKLE_PROTOCOL).encode('base64'))
                 fp.flush()
 
-    def is_deleted(self):
+    def _is_deleted_info(self, status, container_count, delete_timestamp,
+                         put_timestamp):
         """
-        Check if the account DB is considered to be deleted.
+        Apply delete logic to database info.
 
-        :returns: True if the account DB is considered to be deleted, False
-                  otherwise
+        :returns: True if the DB is considered to be deleted, False otherwise
         """
-        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
-            return True
-        self._commit_puts_stale_ok()
-        with self.get() as conn:
-            row = conn.execute('''
-                SELECT put_timestamp, delete_timestamp, container_count, status
-                FROM account_stat''').fetchone()
-            return row['status'] == 'DELETED' or (
-                row['container_count'] in (None, '', 0, '0') and
-                row['delete_timestamp'] > row['put_timestamp'])
+        return status == 'DELETED' or (
+            container_count in (None, '', 0, '0') and
+            float(delete_timestamp) > float(put_timestamp))
+
+    def _is_deleted(self, conn):
+        """
+        Check account_stat table and evaluate info.
+
+        :param conn: database conn
+
+        :returns: True if the DB is considered to be deleted, False otherwise
+        """
+        info = conn.execute('''
+            SELECT put_timestamp, delete_timestamp, container_count, status
+            FROM account_stat''').fetchone()
+        return self._is_deleted_info(**info)
 
     def is_status_deleted(self):
         """Only returns true if the status field is set to DELETED."""
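One detail worth noting in the new AccountBroker code: _is_deleted(conn) forwards the fetched row with self._is_deleted_info(**info). Keyword expansion of a row object works because the connections Swift hands out install a mapping-style row factory (sqlite3.Row supports keys() and string subscripting). A self-contained illustration follows; the table contents are made up for the example.

import sqlite3

conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row
conn.execute('CREATE TABLE account_stat (status TEXT, container_count INT, '
             'delete_timestamp TEXT, put_timestamp TEXT)')
conn.execute("INSERT INTO account_stat VALUES ('', 0, '2', '1')")


def _is_deleted_info(status, container_count, delete_timestamp,
                     put_timestamp):
    # Same rule as the diff above: explicit DELETE, or empty and
    # deleted more recently than it was put.
    return status == 'DELETED' or (
        container_count in (None, '', 0, '0') and
        float(delete_timestamp) > float(put_timestamp))


info = conn.execute('SELECT * FROM account_stat').fetchone()
# sqlite3.Row acts as a mapping, so ** expansion lines up with the signature.
print(_is_deleted_info(**info))  # True: no containers, delete beat the put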
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -425,6 +425,28 @@ class DatabaseBroker(object):
         # Override for additional work when receiving an rsynced db.
         pass
 
+    def _is_deleted(self, conn):
+        """
+        Check if the database is considered deleted
+
+        :param conn: database conn
+
+        :returns: True if the DB is considered to be deleted, False otherwise
+        """
+        raise NotImplementedError()
+
+    def is_deleted(self):
+        """
+        Check if the DB is considered to be deleted.
+
+        :returns: True if the DB is considered to be deleted, False otherwise
+        """
+        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+            return True
+        self._commit_puts_stale_ok()
+        with self.get() as conn:
+            return self._is_deleted(conn)
+
     def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
         """
         Used in replication to handle updating timestamps.
@@ -433,17 +455,18 @@ class DatabaseBroker(object):
         :param put_timestamp: put timestamp
         :param delete_timestamp: delete timestamp
         """
-        current_status = self.is_deleted()
         with self.get() as conn:
+            old_status = self._is_deleted(conn)
             conn.execute('''
                 UPDATE %s_stat SET created_at=MIN(?, created_at),
                                    put_timestamp=MAX(?, put_timestamp),
                                    delete_timestamp=MAX(?, delete_timestamp)
             ''' % self.db_type, (created_at, put_timestamp, delete_timestamp))
+            if old_status != self._is_deleted(conn):
+                timestamp = normalize_timestamp(time.time())
+                self._update_status_changed_at(conn, timestamp)
+
             conn.commit()
-        if self.is_deleted() != current_status:
-            timestamp = normalize_timestamp(time.time())
-            self.update_status_changed_at(timestamp)
 
     def get_items_since(self, start, count):
         """
@@ -489,7 +512,7 @@ class DatabaseBroker(object):
         with self.get() as conn:
             curs = conn.execute('''
                 SELECT remote_id, sync_point FROM %s_sync
-            ''' % 'incoming' if incoming else 'outgoing')
+            ''' % ('incoming' if incoming else 'outgoing'))
             result = []
             for row in curs:
                 result.append({'remote_id': row[0], 'sync_point': row[1]})
@@ -783,8 +806,11 @@ class DatabaseBroker(object):
         current status_changed_at timestamp.
         """
         with self.get() as conn:
-            conn.execute(
-                'UPDATE %s_stat SET status_changed_at = ?'
-                ' WHERE status_changed_at < ?' % self.db_type,
-                (timestamp, timestamp))
+            self._update_status_changed_at(conn, timestamp)
             conn.commit()
+
+    def _update_status_changed_at(self, conn, timestamp):
+        conn.execute(
+            'UPDATE %s_stat SET status_changed_at = ?'
+            ' WHERE status_changed_at < ?' % self.db_type,
+            (timestamp, timestamp))
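The one-line get_syncs change in this file fixes an operator-precedence bug, not just style: % binds tighter than a conditional expression, so the old line only formatted the query in the incoming case and evaluated to the bare string 'outgoing' otherwise. A quick demonstration:

# Why the parentheses in the get_syncs fix matter: without them the "else"
# branch replaces the whole query, not just the table-name substitution.
incoming = False
query = 'SELECT remote_id, sync_point FROM %s_sync'

broken = query % 'incoming' if incoming else 'outgoing'
fixed = query % ('incoming' if incoming else 'outgoing')

print(broken)  # 'outgoing' -- not even a query
print(fixed)   # 'SELECT remote_id, sync_point FROM outgoing_sync'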
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -361,16 +361,33 @@ class ContainerBroker(DatabaseBroker):
                     protocol=PICKLE_PROTOCOL).encode('base64'))
                 fp.flush()
 
-    def is_deleted(self, **kwargs):
+    def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
+                         **kwargs):
         """
-        Check if the DB is considered to be deleted.
+        Apply delete logic to database info.
 
         :returns: True if the DB is considered to be deleted, False otherwise
         """
-        _info, is_deleted = self.get_info_is_deleted(**kwargs)
-        return is_deleted
+        # The container is considered deleted if the delete_timestamp
+        # value is greater than the put_timestamp, and there are no
+        # objects in the container.
+        return (object_count in (None, '', 0, '0')) and (
+            float(delete_timestamp) > float(put_timestamp))
 
-    def get_info_is_deleted(self, timestamp=None):
+    def _is_deleted(self, conn):
+        """
+        Check container_stat view and evaluate info.
+
+        :param conn: database conn
+
+        :returns: True if the DB is considered to be deleted, False otherwise
+        """
+        info = conn.execute('''
+            SELECT put_timestamp, delete_timestamp, object_count
+            FROM container_stat''').fetchone()
+        return self._is_deleted_info(**info)
+
+    def get_info_is_deleted(self):
         """
         Get the is_deleted status and info for the container.
 
@@ -380,15 +397,7 @@ class ContainerBroker(DatabaseBroker):
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             return {}, True
         info = self.get_info()
-        # leave this db as a tombstone for a consistency window
-        if timestamp and info['delete_timestamp'] > timestamp:
-            return info, False
-        # The container is considered deleted if the delete_timestamp
-        # value is greater than the put_timestamp, and there are no
-        # objects in the container.
-        is_deleted = (info['object_count'] in (None, '', 0, '0')) and \
-            (float(info['delete_timestamp']) > float(info['put_timestamp']))
-        return info, is_deleted
+        return info, self._is_deleted_info(**info)
 
     def get_info(self):
         """
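The container-side delete rule is now isolated in _is_deleted_info, and get_info_is_deleted loses its old timestamp tombstone parameter along the way. The rule itself is easy to exercise standalone; the values below are made up for illustration.

# The container delete rule, pulled out of the old get_info_is_deleted.
def _is_deleted_info(object_count, put_timestamp, delete_timestamp, **kwargs):
    # Deleted only when the delete beat the put AND nothing is left inside.
    return (object_count in (None, '', 0, '0')) and (
        float(delete_timestamp) > float(put_timestamp))


print(_is_deleted_info(0, '0000000001.00000', '0000000002.00000'))  # True
print(_is_deleted_info(5, '0000000001.00000', '0000000002.00000'))  # False
print(_is_deleted_info(0, '0000000002.00000', '0000000001.00000'))  # False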
--- a/test/unit/account/test_backend.py
+++ b/test/unit/account/test_backend.py
@@ -26,6 +26,7 @@ from shutil import rmtree
 import sqlite3
 import itertools
 from contextlib import contextmanager
+import random
 
 from swift.account.backend import AccountBroker
 from swift.common.utils import normalize_timestamp
@@ -33,6 +34,8 @@ from test.unit import patch_policies, with_tempdir
 from swift.common.db import DatabaseConnectionError
 from swift.common.storage_policy import StoragePolicy, POLICIES
 
+from test.unit.common.test_db import TestExampleBroker
+
 
 @patch_policies
 class TestAccountBroker(unittest.TestCase):
@@ -167,7 +170,11 @@ class TestAccountBroker(unittest.TestCase):
         self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
         self.assert_(float(info['created_at']) >= start)
         self.assertEqual(info['delete_timestamp'], '0')
-        self.assertEqual(info['status_changed_at'], '0')
+        if self.__class__ == TestAccountBrokerBeforeMetadata:
+            self.assertEqual(info['status_changed_at'], '0')
+        else:
+            self.assertEqual(info['status_changed_at'],
+                             normalize_timestamp(start))
 
         # delete it
         delete_timestamp = normalize_timestamp(ts.next())
@@ -320,7 +327,10 @@ class TestAccountBroker(unittest.TestCase):
         self.assertEqual(info['hash'], '00000000000000000000000000000000')
         self.assertEqual(info['put_timestamp'], normalize_timestamp(1))
         self.assertEqual(info['delete_timestamp'], '0')
-        self.assertEqual(info['status_changed_at'], '0')
+        if self.__class__ == TestAccountBrokerBeforeMetadata:
+            self.assertEqual(info['status_changed_at'], '0')
+        else:
+            self.assertEqual(info['status_changed_at'], normalize_timestamp(1))
 
         info = broker.get_info()
         self.assertEqual(info['container_count'], 0)
@@ -725,6 +735,23 @@ def premetadata_create_account_stat_table(self, conn, put_timestamp):
                       put_timestamp))
 
 
+class TestCommonAccountBroker(TestExampleBroker):
+
+    broker_class = AccountBroker
+
+    def setUp(self):
+        super(TestCommonAccountBroker, self).setUp()
+        self.policy = random.choice(list(POLICIES))
+
+    def put_item(self, broker, timestamp):
+        broker.put_container('test', timestamp, 0, 0, 0,
+                             int(self.policy))
+
+    def delete_item(self, broker, timestamp):
+        broker.put_container('test', 0, timestamp, 0, 0,
+                             int(self.policy))
+
+
 class TestAccountBrokerBeforeMetadata(TestAccountBroker):
     """
     Tests for AccountBroker against databases created before
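TestCommonAccountBroker shows the reuse pattern these test changes introduce: the generic TestExampleBroker suite (added to test_db.py below) drives a small set of hooks, and each backend's test module overrides only broker_class, put_item and delete_item. A miniature of that shape, with toy classes that are not the real Swift tests:

import unittest


class ListBroker(object):
    # Toy backend: just records names.
    def __init__(self):
        self.names = []


class TestCommonBroker(unittest.TestCase):
    # Generic suite: knows nothing about any one backend.
    broker_class = ListBroker

    def put_item(self, broker, name):
        broker.names.append(name)  # default hook

    def test_put(self):
        broker = self.broker_class()
        self.put_item(broker, 'test')
        self.assertEqual(len(broker.names), 1)


class TestListBrokerVariant(TestCommonBroker):
    # Per-backend subclass: swaps the hook, inherits every test.
    def put_item(self, broker, name):
        broker.names.append(name.upper())


if __name__ == '__main__':
    unittest.main()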
--- a/test/unit/common/test_db.py
+++ b/test/unit/common/test_db.py
@@ -20,6 +20,7 @@ import unittest
 from tempfile import mkdtemp
 from shutil import rmtree, copy
 from uuid import uuid4
+import cPickle as pickle
 
 import simplejson
 import sqlite3
@@ -33,10 +34,12 @@ from eventlet.timeout import Timeout
 import swift.common.db
 from swift.common.db import chexor, dict_factory, get_db_connection, \
     DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
-    GreenDBConnection
+    GreenDBConnection, PICKLE_PROTOCOL
 from swift.common.utils import normalize_timestamp, mkdirs, json
 from swift.common.exceptions import LockTimeout
 
+from test.unit import with_tempdir
+
 
 class TestDatabaseConnectionError(unittest.TestCase):
@@ -175,25 +178,399 @@ class TestGetDBConnection(unittest.TestCase):
 
 
 class ExampleBroker(DatabaseBroker):
+    """
+    Concrete enough implementation of a DatabaseBroker.
+    """
 
     db_type = 'test'
     db_contains_type = 'test'
 
-    def _initialize(self, conn, timestamp, **kwargs):
+    def _initialize(self, conn, put_timestamp, **kwargs):
         conn.executescript('''
             CREATE TABLE test_stat (
-                name TEXT,
-                timestamp TEXT DEFAULT 0,
-                status_changed_at TEXT DEFAULT 0
+                test_count INTEGER DEFAULT 0,
+                created_at TEXT,
+                put_timestamp TEXT DEFAULT '0',
+                delete_timestamp TEXT DEFAULT '0',
+                status_changed_at TEXT DEFAULT '0',
+                metadata TEXT DEFAULT ''
             );
             CREATE TABLE test (
                 ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
-                name TEXT
+                name TEXT,
+                created_at TEXT,
+                deleted INTEGER DEFAULT 0
             );
+            CREATE TRIGGER test_insert AFTER INSERT ON test
+            BEGIN
+                UPDATE test_stat
+                SET test_count = test_count + (1 - new.deleted);
+            END;
+            CREATE TRIGGER test_delete AFTER DELETE ON test
+            BEGIN
+                UPDATE test_stat
+                SET test_count = test_count - (1 - old.deleted);
+            END;
         ''')
         conn.execute("""
-            INSERT INTO test_stat (name, timestamp) VALUES (?, ?)
-        """, (self.account, timestamp))
+            INSERT INTO test_stat (
+                created_at, put_timestamp, status_changed_at)
+            VALUES (?, ?, ?);
+        """, (normalize_timestamp(time.time()), put_timestamp,
+              put_timestamp))
+
+    def merge_items(self, item_list):
+        with self.get() as conn:
+            for rec in item_list:
+                conn.execute(
+                    'DELETE FROM test WHERE name = ? and created_at < ?', (
+                        rec['name'], rec['created_at']))
+                if not conn.execute(
+                        'SELECT 1 FROM test WHERE name = ?',
+                        (rec['name'],)).fetchall():
+                    conn.execute('''
+                        INSERT INTO test (name, created_at, deleted)
+                        VALUES (?, ?, ?)''', (
+                            rec['name'], rec['created_at'], rec['deleted']))
+            conn.commit()
+
+    def _commit_puts_load(self, item_list, entry):
+        (name, timestamp, deleted) = pickle.loads(entry.decode('base64'))
+        item_list.append({
+            'name': name,
+            'created_at': timestamp,
+            'deleted': deleted,
+        })
+
+    def _load_item(self, name, timestamp, deleted):
+        if self.db_file == ':memory:':
+            record = {
+                'name': name,
+                'created_at': timestamp,
+                'deleted': deleted,
+            }
+            self.merge_items([record])
+            return
+        with open(self.pending_file, 'a+b') as fp:
+            fp.write(':')
+            fp.write(pickle.dumps(
+                (name, timestamp, deleted),
+                protocol=PICKLE_PROTOCOL).encode('base64'))
+            fp.flush()
+
+    def put_test(self, name, timestamp):
+        self._load_item(name, timestamp, 0)
+
+    def delete_test(self, name, timestamp):
+        self._load_item(name, timestamp, 1)
+
+    def _is_deleted(self, conn):
+        info = conn.execute('SELECT * FROM test_stat').fetchone()
+        return (info['test_count'] in (None, '', 0, '0')) and \
+            (normalize_timestamp(info['delete_timestamp']) >
+             normalize_timestamp(info['put_timestamp']))
+
+
+class TestExampleBroker(unittest.TestCase):
+    """
+    Tests that use the mostly Concrete enough ExampleBroker to exercise some
+    of the abstract methods on DatabaseBroker.
+    """
+
+    broker_class = ExampleBroker
+    policy = 0
+
+    def test_merge_timestamps_simple_delete(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp)
+        created_at = broker.get_info()['created_at']
+        broker.merge_timestamps(created_at, put_timestamp, '0')
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], '0')
+        self.assertEqual(info['status_changed_at'], put_timestamp)
+        # delete
+        delete_timestamp = ts.next()
+        broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
+        self.assert_(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        self.assert_(info['status_changed_at'] >
+                     normalize_timestamp(put_timestamp))
+
+    def put_item(self, broker, timestamp):
+        broker.put_test('test', timestamp)
+
+    def delete_item(self, broker, timestamp):
+        broker.delete_test('test', timestamp)
+
+    def test_merge_timestamps_delete_with_objects(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
+        created_at = broker.get_info()['created_at']
+        broker.merge_timestamps(created_at, put_timestamp, '0')
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], '0')
+        self.assertEqual(info['status_changed_at'], put_timestamp)
+        # add object
+        self.put_item(broker, ts.next())
+        self.assertEqual(broker.get_info()[
+            '%s_count' % broker.db_contains_type], 1)
+        # delete
+        delete_timestamp = ts.next()
+        broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
+        self.assertFalse(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        # status is unchanged
+        self.assertEqual(info['status_changed_at'], put_timestamp)
+        # count is causing status to hold on
+        self.delete_item(broker, ts.next())
+        self.assertEqual(broker.get_info()[
+            '%s_count' % broker.db_contains_type], 0)
+        self.assert_(broker.is_deleted())
+
+    def test_merge_timestamps_simple_recreate(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
+        virgin_status_changed_at = broker.get_info()['status_changed_at']
+        created_at = broker.get_info()['created_at']
+        delete_timestamp = ts.next()
+        broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
+        self.assert_(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        orig_status_changed_at = info['status_changed_at']
+        self.assert_(orig_status_changed_at >
+                     normalize_timestamp(virgin_status_changed_at))
+        # recreate
+        recreate_timestamp = ts.next()
+        status_changed_at = time.time()
+        with patch('swift.common.db.time.time', new=lambda: status_changed_at):
+            broker.merge_timestamps(created_at, recreate_timestamp, '0')
+        self.assertFalse(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], recreate_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        self.assert_(info['status_changed_at'], status_changed_at)
+
+    def test_merge_timestamps_recreate_with_objects(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
+        created_at = broker.get_info()['created_at']
+        # delete
+        delete_timestamp = ts.next()
+        broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
+        self.assert_(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], put_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        orig_status_changed_at = info['status_changed_at']
+        self.assert_(normalize_timestamp(orig_status_changed_at) >=
+                     normalize_timestamp(put_timestamp))
+        # add object
+        self.put_item(broker, ts.next())
+        count_key = '%s_count' % broker.db_contains_type
+        self.assertEqual(broker.get_info()[count_key], 1)
+        self.assertFalse(broker.is_deleted())
+        # recreate
+        recreate_timestamp = ts.next()
+        broker.merge_timestamps(created_at, recreate_timestamp, '0')
+        self.assertFalse(broker.is_deleted())
+        info = broker.get_info()
+        self.assertEqual(info['created_at'], created_at)
+        self.assertEqual(info['put_timestamp'], recreate_timestamp)
+        self.assertEqual(info['delete_timestamp'], delete_timestamp)
+        self.assertEqual(info['status_changed_at'], orig_status_changed_at)
+        # count is not causing status to hold on
+        self.delete_item(broker, ts.next())
+        self.assertFalse(broker.is_deleted())
+
+    def test_merge_timestamps_update_put_no_status_change(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
+        info = broker.get_info()
+        orig_status_changed_at = info['status_changed_at']
+        created_at = info['created_at']
+        new_put_timestamp = ts.next()
+        broker.merge_timestamps(created_at, new_put_timestamp, '0')
+        info = broker.get_info()
+        self.assertEqual(new_put_timestamp, info['put_timestamp'])
+        self.assertEqual(orig_status_changed_at, info['status_changed_at'])
+
+    def test_merge_timestamps_update_delete_no_status_change(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        put_timestamp = ts.next()
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
+        created_at = broker.get_info()['created_at']
+        broker.merge_timestamps(created_at, put_timestamp, ts.next())
+        orig_status_changed_at = broker.get_info()['status_changed_at']
+        new_delete_timestamp = ts.next()
+        broker.merge_timestamps(created_at, put_timestamp,
+                                new_delete_timestamp)
+        info = broker.get_info()
+        self.assertEqual(new_delete_timestamp, info['delete_timestamp'])
+        self.assertEqual(orig_status_changed_at, info['status_changed_at'])
+
+    def test_get_max_row(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(ts.next(), storage_policy_index=int(self.policy))
+        self.assertEquals(-1, broker.get_max_row())
+        self.put_item(broker, ts.next())
+        self.assertEquals(1, broker.get_max_row())
+        self.delete_item(broker, ts.next())
+        self.assertEquals(2, broker.get_max_row())
+        self.put_item(broker, ts.next())
+        self.assertEquals(3, broker.get_max_row())
+
+    def test_get_info(self):
+        broker = self.broker_class(':memory:', account='test', container='c')
+        created_at = time.time()
+        with patch('swift.common.db.time.time', new=lambda: created_at):
+            broker.initialize(normalize_timestamp(1),
+                              storage_policy_index=int(self.policy))
+        info = broker.get_info()
+        count_key = '%s_count' % broker.db_contains_type
+        expected = {
+            count_key: 0,
+            'created_at': normalize_timestamp(created_at),
+            'put_timestamp': normalize_timestamp(1),
+            'status_changed_at': normalize_timestamp(1),
+            'delete_timestamp': '0',
+        }
+        for k, v in expected.items():
+            self.assertEqual(info[k], v,
+                             'mismatch for %s, %s != %s' % (
+                                 k, info[k], v))
+
+    def test_get_raw_metadata(self):
+        broker = self.broker_class(':memory:', account='test', container='c')
+        broker.initialize(normalize_timestamp(0),
+                          storage_policy_index=int(self.policy))
+        self.assertEqual(broker.metadata, {})
+        self.assertEqual(broker.get_raw_metadata(), '')
+        key = u'test\u062a'.encode('utf-8')
+        value = u'value\u062a'
+        metadata = {
+            key: [value, normalize_timestamp(1)]
+        }
+        broker.update_metadata(metadata)
+        self.assertEqual(broker.metadata, metadata)
+        self.assertEqual(broker.get_raw_metadata(),
+                         json.dumps(metadata))
+
+    def test_put_timestamp(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        broker = self.broker_class(':memory:', account='a', container='c')
+        orig_put_timestamp = ts.next()
+        broker.initialize(orig_put_timestamp,
+                          storage_policy_index=int(self.policy))
+        self.assertEqual(broker.get_info()['put_timestamp'],
+                         orig_put_timestamp)
+        # put_timestamp equal - no change
+        broker.update_put_timestamp(orig_put_timestamp)
+        self.assertEqual(broker.get_info()['put_timestamp'],
+                         orig_put_timestamp)
+        # put_timestamp newer - gets newer
+        newer_put_timestamp = ts.next()
+        broker.update_put_timestamp(newer_put_timestamp)
+        self.assertEqual(broker.get_info()['put_timestamp'],
+                         newer_put_timestamp)
+        # put_timestamp older - no change
+        broker.update_put_timestamp(orig_put_timestamp)
+        self.assertEqual(broker.get_info()['put_timestamp'],
+                         newer_put_timestamp)
+
+    def test_status_changed_at(self):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        broker = self.broker_class(':memory:', account='test', container='c')
+        put_timestamp = ts.next()
+        created_at = time.time()
+        with patch('swift.common.db.time.time', new=lambda: created_at):
+            broker.initialize(put_timestamp,
+                              storage_policy_index=int(self.policy))
+        self.assertEquals(broker.get_info()['status_changed_at'],
+                          put_timestamp)
+        self.assertEquals(broker.get_info()['created_at'],
+                          normalize_timestamp(created_at))
+        status_changed_at = ts.next()
+        broker.update_status_changed_at(status_changed_at)
+        self.assertEqual(broker.get_info()['status_changed_at'],
+                         status_changed_at)
+        # save the old and get a new status_changed_at
+        old_status_changed_at, status_changed_at = \
+            status_changed_at, ts.next()
+        broker.update_status_changed_at(status_changed_at)
+        self.assertEqual(broker.get_info()['status_changed_at'],
+                         status_changed_at)
+        # status changed at won't go backwards...
+        broker.update_status_changed_at(old_status_changed_at)
+        self.assertEqual(broker.get_info()['status_changed_at'],
+                         status_changed_at)
+
+    def test_get_syncs(self):
+        broker = self.broker_class(':memory:', account='a', container='c')
+        broker.initialize(normalize_timestamp(time.time()),
+                          storage_policy_index=int(self.policy))
+        self.assertEqual([], broker.get_syncs())
+        broker.merge_syncs([{'sync_point': 1, 'remote_id': 'remote1'}])
+        self.assertEqual([{'sync_point': 1, 'remote_id': 'remote1'}],
+                         broker.get_syncs())
+        self.assertEqual([], broker.get_syncs(incoming=False))
+        broker.merge_syncs([{'sync_point': 2, 'remote_id': 'remote2'}],
+                           incoming=False)
+        self.assertEqual([{'sync_point': 2, 'remote_id': 'remote2'}],
+                         broker.get_syncs(incoming=False))
+
+    @with_tempdir
+    def test_commit_pending(self, tempdir):
+        ts = (normalize_timestamp(t) for t in
+              itertools.count(int(time.time())))
+        broker = self.broker_class(os.path.join(tempdir, 'test.db'),
+                                   account='a', container='c')
+        broker.initialize(ts.next(), storage_policy_index=int(self.policy))
+        self.put_item(broker, ts.next())
+        qry = 'select * from %s_stat' % broker.db_type
+        with broker.get() as conn:
+            rows = [dict(x) for x in conn.execute(qry)]
+        info = rows[0]
+        count_key = '%s_count' % broker.db_contains_type
+        self.assertEqual(0, info[count_key])
+        broker.get_info()
+        self.assertEqual(1, broker.get_info()[count_key])
+
 
 class TestDatabaseBroker(unittest.TestCase):
@@ -731,74 +1108,6 @@ class TestDatabaseBroker(unittest.TestCase):
             [first_value, first_timestamp])
         self.assert_('Second' not in broker.metadata)
 
-    def test_get_max_row(self):
-        broker = ExampleBroker(':memory:')
-        broker.initialize(0)
-        self.assertEquals(-1, broker.get_max_row())
-        with broker.get() as conn:
-            conn.execute('''
-                INSERT INTO test (name) VALUES (?)
-            ''', ('test_name',))
-            conn.commit()
-        self.assertEquals(1, broker.get_max_row())
-        with broker.get() as conn:
-            conn.executescript('''
-                DELETE FROM test;
-            ''')
-            conn.commit()
-        self.assertEquals(1, broker.get_max_row())
-        with broker.get() as conn:
-            conn.execute('''
-                INSERT INTO test (name) VALUES (?)
-            ''', ('test_name',))
-            conn.commit()
-        self.assertEquals(2, broker.get_max_row())
-
-    def test_get_info(self):
-        broker = ExampleBroker(':memory:', account='test')
-        broker.initialize(normalize_timestamp(1))
-        info = broker.get_info()
-        expected = {
-            'name': 'test',
-            'timestamp': '0000000001.00000',
-            'status_changed_at': '0',
-        }
-        self.assertEqual(info, expected)
-
-    def test_get_raw_metadata(self):
-        broker = ExampleBroker(':memory:', account='test')
-        broker.initialize(normalize_timestamp(0))
-        self.assertEqual(broker.metadata, {})
-        self.assertEqual(broker.get_raw_metadata(), '')
-        metadata = {
-            'test': ['value', normalize_timestamp(1)]
-        }
-        broker.update_metadata(metadata)
-        self.assertEqual(broker.metadata, metadata)
-        self.assertEqual(broker.get_raw_metadata(),
-                         json.dumps(metadata))
-
-    def test_status_changed_at(self):
-        ts = (normalize_timestamp(t) for t in
-              itertools.count(int(time.time())))
-        broker = ExampleBroker(':memory:', account='test')
-        broker.initialize(ts.next())
-        self.assertEquals(broker.get_info()['status_changed_at'], '0')
-        status_changed_at = ts.next()
-        broker.update_status_changed_at(status_changed_at)
-        self.assertEqual(broker.get_info()['status_changed_at'],
-                         status_changed_at)
-        # save the old and get a new status_changed_at
-        old_status_changed_at, status_changed_at = \
-            status_changed_at, ts.next()
-        broker.update_status_changed_at(status_changed_at)
-        self.assertEqual(broker.get_info()['status_changed_at'],
-                         status_changed_at)
-        # status changed at won't go backwards...
-        broker.update_status_changed_at(old_status_changed_at)
-        self.assertEqual(broker.get_info()['status_changed_at'],
-                         status_changed_at)
-
 
 if __name__ == '__main__':
     unittest.main()
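Nearly every test above draws timestamps from the same generator idiom, which yields an endless, strictly increasing stream of normalized timestamps; ts.next() is the Python 2 spelling of next(ts). A standalone version, with normalize_timestamp re-implemented here using Swift's fixed-width format (an assumption of this sketch, not an import from the code above):

import itertools
import time


def normalize_timestamp(t):
    # Fixed-width seconds with five decimal places, so string comparison
    # orders the same way as numeric comparison.
    return '%016.5f' % float(t)


ts = (normalize_timestamp(t) for t in itertools.count(int(time.time())))
first, second = next(ts), next(ts)
assert second > first  # each draw is one second later
print(first, second)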
--- a/test/unit/container/test_backend.py
+++ b/test/unit/container/test_backend.py
@@ -34,6 +34,7 @@ from swift.common.storage_policy import POLICIES
 import mock
 
 from test.unit import patch_policies, with_tempdir
+from test.unit.common.test_db import TestExampleBroker
 
 
 class TestContainerBroker(unittest.TestCase):
@@ -1390,6 +1391,23 @@ class TestContainerBroker(unittest.TestCase):
         self.assertEqual(broker.get_policy_stats(), expected)
 
 
+class TestCommonContainerBroker(TestExampleBroker):
+
+    broker_class = ContainerBroker
+
+    def setUp(self):
+        super(TestCommonContainerBroker, self).setUp()
+        self.policy = random.choice(list(POLICIES))
+
+    def put_item(self, broker, timestamp):
+        broker.put_object('test', timestamp, 0, 'text/plain', 'x',
+                          storage_policy_index=int(self.policy))
+
+    def delete_item(self, broker, timestamp):
+        broker.delete_object('test', timestamp,
+                             storage_policy_index=int(self.policy))
+
+
 class ContainerBrokerMigrationMixin(object):
     """
     Mixin for running ContainerBroker against databases created with