ca87827db9
Change-Id: Ie0e8137615348b130b67323d0d2913dc5ebfd5fb
685 lines
28 KiB
Python
685 lines
28 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
|
|
|
|
"""Tests for swift.common.db"""
|
|
|
|
import os
|
|
import unittest
|
|
from tempfile import mkdtemp
|
|
from shutil import rmtree, copy
|
|
from uuid import uuid4
|
|
|
|
import simplejson
|
|
import sqlite3
|
|
from mock import patch, MagicMock
|
|
|
|
from eventlet.timeout import Timeout
|
|
|
|
import swift.common.db
|
|
from swift.common.db import chexor, dict_factory, get_db_connection, \
|
|
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
|
|
GreenDBConnection
|
|
from swift.common.utils import normalize_timestamp, mkdirs
|
|
from swift.common.exceptions import LockTimeout
|
|
|
|
|
|
class TestDatabaseConnectionError(unittest.TestCase):
    """Checks the string rendering of DatabaseConnectionError."""

    def test_str(self):
        """str(err) contains the path, the message and any timeout given."""
        path = ':memory:'
        message = 'No valid database connection'

        # Built without a timeout: path and message must both appear.
        rendered = str(DatabaseConnectionError(path, message))
        for fragment in (path, message):
            self.assert_(fragment in rendered)

        # Built with a timeout: the timeout value must appear as well.
        rendered = str(DatabaseConnectionError(path, message, timeout=1357))
        for fragment in (path, message, '1357'):
            self.assert_(fragment in rendered)
|
|
|
|
|
|
class TestDictFactory(unittest.TestCase):
    """Tests for dict_factory applied to rows fetched from a cursor."""

    def test_normal_case(self):
        """Each fetched row is turned into a dict keyed by column name."""
        conn = sqlite3.connect(':memory:')
        try:
            conn.execute('CREATE TABLE test (one TEXT, two INTEGER)')
            # SQL string literals are single-quoted; double-quoted strings
            # are only accepted by SQLite as a legacy compatibility quirk
            # that can be disabled at build time.
            conn.execute("INSERT INTO test (one, two) VALUES ('abc', 123)")
            conn.execute("INSERT INTO test (one, two) VALUES ('def', 456)")
            conn.commit()
            curs = conn.execute('SELECT one, two FROM test')
            # The next() builtin works on Python 2.6+ and 3, whereas the
            # cursor.next() method exists on Python 2 only.
            self.assertEquals(dict_factory(curs, next(curs)),
                              {'one': 'abc', 'two': 123})
            self.assertEquals(dict_factory(curs, next(curs)),
                              {'one': 'def', 'two': 456})
        finally:
            # Do not leak the connection when an assertion fails.
            conn.close()
|
|
|
|
|
|
class TestChexor(unittest.TestCase):
    """Exercises chexor with valid and invalid arguments."""

    def test_normal_case(self):
        """A known hash, name and timestamp produce a known digest."""
        old_hash = 'd41d8cd98f00b204e9800998ecf8427e'
        timestamp = normalize_timestamp(1)
        digest = chexor(old_hash, 'new name', timestamp)
        self.assertEquals(digest, '4f2ea31ac14d4273fe32ba08062b21de')

    def test_invalid_old_hash(self):
        """The old hash 'oldhash' makes chexor raise ValueError."""
        call_args = ('oldhash', 'name', normalize_timestamp(1))
        self.assertRaises(ValueError, chexor, *call_args)

    def test_no_name(self):
        """Passing None as the name makes chexor raise."""
        call_args = ('d41d8cd98f00b204e9800998ecf8427e', None,
                     normalize_timestamp(1))
        self.assertRaises(Exception, chexor, *call_args)
|
|
|
|
|
|
class TestGreenDBConnection(unittest.TestCase):
    """Checks that GreenDBConnection raises Timeout while the DB stays locked.

    Both tests replace a sqlite3 class with a subclass whose method is a
    MagicMock that always raises "database is locked".
    """

    def test_execute_when_locked(self):
        # This test is dependent on the code under test calling execute
        # as sqlite3.Cursor.execute in a subclass.
        class InterceptCursor(sqlite3.Cursor):
            pass
        db_error = sqlite3.OperationalError('database is locked')
        InterceptCursor.execute = MagicMock(side_effect=db_error)
        with patch('sqlite3.Cursor', new=InterceptCursor):
            conn = sqlite3.connect(':memory:', check_same_thread=False,
                                   factory=GreenDBConnection, timeout=0.1)
            self.assertRaises(Timeout, conn.execute, 'select 1')
            self.assertTrue(InterceptCursor.execute.called)
            # Every recorded call must have used the same arguments as the
            # last one.
            self.assertEqual(InterceptCursor.execute.call_args_list,
                             list((InterceptCursor.execute.call_args,) *
                                  InterceptCursor.execute.call_count))

    # Renamed from "text_commit_when_locked": without the "test" prefix the
    # method was never collected, so the commit path was silently untested.
    def test_commit_when_locked(self):
        # This test is dependent on the code under test calling commit
        # as sqlite3.Connection.commit in a subclass.
        class InterceptConnection(sqlite3.Connection):
            pass
        db_error = sqlite3.OperationalError('database is locked')
        InterceptConnection.commit = MagicMock(side_effect=db_error)
        with patch('sqlite3.Connection', new=InterceptConnection):
            conn = sqlite3.connect(':memory:', check_same_thread=False,
                                   factory=GreenDBConnection, timeout=0.1)
            self.assertRaises(Timeout, conn.commit)
            self.assertTrue(InterceptConnection.commit.called)
            # Every recorded call must have used the same arguments as the
            # last one.
            self.assertEqual(InterceptConnection.commit.call_args_list,
                             list((InterceptConnection.commit.call_args,) *
                                  InterceptConnection.commit.call_count))
|
|
|
|
|
|
class TestGetDBConnection(unittest.TestCase):
    """Covers get_db_connection for usable, invalid and locked databases."""

    def test_normal_case(self):
        """An in-memory database yields an object exposing execute."""
        connection = get_db_connection(':memory:')
        self.assert_(hasattr(connection, 'execute'))

    def test_invalid_path(self):
        """An unusable path raises DatabaseConnectionError."""
        bad_path = 'invalid database path / name'
        self.assertRaises(DatabaseConnectionError,
                          get_db_connection, bad_path)

    def test_locked_db(self):
        # Relies on the code under test invoking execute through
        # sqlite3.Cursor.execute, so that a patched-in subclass can
        # intercept the call.
        locked_error = sqlite3.OperationalError('database is locked')
        execute_mock = MagicMock(side_effect=locked_error)

        class InterceptCursor(sqlite3.Cursor):
            execute = execute_mock

        with patch('sqlite3.Cursor', new=InterceptCursor):
            self.assertRaises(Timeout, get_db_connection, ':memory:',
                              timeout=0.1)
            self.assertTrue(execute_mock.called)
            # All recorded calls must match the most recent one.
            repeated_calls = list(
                (execute_mock.call_args,) * execute_mock.call_count)
            self.assertEqual(execute_mock.call_args_list, repeated_calls)
|
|
|
|
|
|
class TestDatabaseBroker(unittest.TestCase):
    """Tests for DatabaseBroker against in-memory and on-disk databases.

    Each test gets its own temporary directory (``self.testdir``) that is
    removed again in tearDown.
    """

    def setUp(self):
        self.testdir = mkdtemp()

    def tearDown(self):
        rmtree(self.testdir, ignore_errors=True)

    def test_DB_PREALLOCATION_setting(self):
        """_preallocate is a no-op when disabled and raises OSError when on."""
        broker = DatabaseBroker(uuid4().hex)
        # patch() restores the module-level flag afterwards, so this test no
        # longer leaks a modified global into whichever test runs next.
        with patch('swift.common.db.DB_PREALLOCATION', False):
            broker._preallocate()
        with patch('swift.common.db.DB_PREALLOCATION', True):
            self.assertRaises(OSError, broker._preallocate)

    def test_memory_db_init(self):
        """An in-memory broker keeps ':memory:' as its db_file."""
        broker = DatabaseBroker(':memory:')
        self.assertEqual(broker.db_file, ':memory:')
        self.assertRaises(AttributeError, broker.initialize,
                          normalize_timestamp('0'))

    def test_disk_db_init(self):
        """A freshly built on-disk broker has no connection yet."""
        db_file = os.path.join(self.testdir, '1.db')
        broker = DatabaseBroker(db_file)
        self.assertEqual(broker.db_file, db_file)
        self.assert_(broker.conn is None)

    def test_disk_preallocate(self):
        """A one-byte file is preallocated up to the first 1 MB step."""
        test_size = [-1]

        def fallocate_stub(fd, size):
            test_size[0] = size

        # Enable preallocation explicitly rather than relying on another
        # test having left the module flag switched on.
        with patch('swift.common.db.DB_PREALLOCATION', True):
            with patch('swift.common.db.fallocate', fallocate_stub):
                db_file = os.path.join(self.testdir, 'pre.db')
                # Write 1 byte and hope that the fs will allocate less than
                # 1 MB.
                with open(db_file, "w") as f:
                    f.write('@')
                b = DatabaseBroker(db_file)
                b._preallocate()
                # We only wrote 1 byte, so we should end with the 1st step
                # or 1 MB.
                self.assertEquals(test_size[0], 1024 * 1024)

    def test_initialize(self):
        """initialize() calls _initialize and creates the sync tables."""
        self.assertRaises(AttributeError,
                          DatabaseBroker(':memory:').initialize,
                          normalize_timestamp('1'))
        stub_dict = {}

        def stub(*args, **kwargs):
            # Record only the most recent call.  clear() avoids deleting
            # keys while iterating over them.
            stub_dict.clear()
            stub_dict['args'] = args
            stub_dict.update(kwargs)
        broker = DatabaseBroker(':memory:')
        broker._initialize = stub
        broker.initialize(normalize_timestamp('1'))
        self.assert_(hasattr(stub_dict['args'][0], 'execute'))
        self.assertEquals(stub_dict['args'][1], '0000000001.00000')
        with broker.get() as conn:
            conn.execute('SELECT * FROM outgoing_sync')
            conn.execute('SELECT * FROM incoming_sync')
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        broker._initialize = stub
        broker.initialize(normalize_timestamp('1'))
        self.assert_(hasattr(stub_dict['args'][0], 'execute'))
        self.assertEquals(stub_dict['args'][1], '0000000001.00000')
        with broker.get() as conn:
            conn.execute('SELECT * FROM outgoing_sync')
            conn.execute('SELECT * FROM incoming_sync')
        # Initializing a second broker on the same file must be refused.
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        broker._initialize = stub
        self.assertRaises(DatabaseAlreadyExists,
                          broker.initialize, normalize_timestamp('1'))

    def test_delete_db(self):
        """delete_db() calls _delete_db and clears the broker metadata."""
        def init_stub(conn, put_timestamp):
            conn.execute('CREATE TABLE test (one TEXT)')
            conn.execute('CREATE TABLE test_stat (id TEXT)')
            # uuid4 must be called; str(uuid4) would store the repr of the
            # function object instead of a UUID.
            conn.execute('INSERT INTO test_stat (id) VALUES (?)',
                         (str(uuid4()),))
            conn.execute("INSERT INTO test (one) VALUES ('1')")
            conn.commit()
        stub_called = [False]

        def delete_stub(*a, **kw):
            stub_called[0] = True
        broker = DatabaseBroker(':memory:')
        broker.db_type = 'test'
        broker._initialize = init_stub
        # Initializes a good broker for us
        broker.initialize(normalize_timestamp('1'))
        self.assert_(broker.conn is not None)
        broker._delete_db = delete_stub
        stub_called[0] = False
        broker.delete_db('2')
        self.assert_(stub_called[0])
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        broker.db_type = 'test'
        broker._initialize = init_stub
        broker.initialize(normalize_timestamp('1'))
        broker._delete_db = delete_stub
        stub_called[0] = False
        broker.delete_db('2')
        self.assert_(stub_called[0])
        # ensure that metadata was cleared
        m2 = broker.metadata
        self.assert_(not any(v[0] for v in m2.values()))
        self.assert_(all(v[1] == normalize_timestamp('2')
                         for v in m2.values()))

    def test_get(self):
        """get() fails before initialization, rolls back, and quarantines."""
        broker = DatabaseBroker(':memory:')
        got_exc = False
        try:
            with broker.get() as conn:
                conn.execute('SELECT 1')
        except Exception:
            got_exc = True
        # Previously this flag was overwritten without ever being checked.
        self.assert_(got_exc)
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        got_exc = False
        try:
            with broker.get() as conn:
                conn.execute('SELECT 1')
        except Exception:
            got_exc = True
        self.assert_(got_exc)

        def stub(*args, **kwargs):
            pass
        broker._initialize = stub
        broker.initialize(normalize_timestamp('1'))
        with broker.get() as conn:
            conn.execute('CREATE TABLE test (one TEXT)')
        try:
            with broker.get() as conn:
                conn.execute("INSERT INTO test (one) VALUES ('1')")
                # Leave the block before any commit happens; the insert
                # must not be visible afterwards.
                raise Exception('test')
        except Exception:
            pass
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        with broker.get() as conn:
            self.assertEquals(
                [r[0] for r in conn.execute('SELECT * FROM test')], [])
        with broker.get() as conn:
            conn.execute("INSERT INTO test (one) VALUES ('1')")
            conn.commit()
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
        with broker.get() as conn:
            self.assertEquals(
                [r[0] for r in conn.execute('SELECT * FROM test')], ['1'])

        dbpath = os.path.join(self.testdir, 'dev', 'dbs', 'par', 'pre', 'db')
        mkdirs(dbpath)
        qpath = os.path.join(self.testdir, 'dev', 'quarantined', 'tests', 'db')
        with patch('swift.common.db.renamer', lambda a, b: b):
            # Test the malformed database first, then the corrupted one;
            # both example files live next to this test module.
            for problem in ('malformed', 'corrupted'):
                copy(os.path.join(os.path.dirname(__file__),
                                  '%s_example.db' % problem),
                     os.path.join(dbpath, '1.db'))
                broker = DatabaseBroker(os.path.join(dbpath, '1.db'))
                broker.db_type = 'test'
                exc = None
                try:
                    with broker.get() as conn:
                        conn.execute('SELECT * FROM test')
                except Exception as err:
                    exc = err
                self.assertEquals(
                    str(exc),
                    'Quarantined %s to %s due to %s database' %
                    (dbpath, qpath, problem))

    def test_lock(self):
        """lock() needs an existing DB and excludes a second broker."""
        broker = DatabaseBroker(os.path.join(self.testdir, '1.db'), timeout=.1)
        got_exc = False
        try:
            with broker.lock():
                pass
        except Exception:
            got_exc = True
        self.assert_(got_exc)

        def stub(*args, **kwargs):
            pass
        broker._initialize = stub
        broker.initialize(normalize_timestamp('1'))
        # The lock can be taken and released repeatedly.
        with broker.lock():
            pass
        with broker.lock():
            pass
        broker2 = DatabaseBroker(os.path.join(self.testdir, '1.db'),
                                 timeout=.1)
        broker2._initialize = stub
        with broker.lock():
            # While the first broker holds the lock the second times out.
            got_exc = False
            try:
                with broker2.lock():
                    pass
            except LockTimeout:
                got_exc = True
            self.assert_(got_exc)
        # An exception inside the block must still release the lock.
        try:
            with broker.lock():
                raise Exception('test')
        except Exception:
            pass
        with broker.lock():
            pass

    def test_newid(self):
        """newid() replaces the stored id and records an incoming sync."""
        broker = DatabaseBroker(':memory:')
        broker.db_type = 'test'
        broker.db_contains_type = 'test'
        uuid1 = str(uuid4())

        def _initialize(conn, timestamp):
            conn.execute('CREATE TABLE test (one TEXT)')
            conn.execute('CREATE TABLE test_stat (id TEXT)')
            conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,))
            conn.commit()
        broker._initialize = _initialize
        broker.initialize(normalize_timestamp('1'))
        uuid2 = str(uuid4())
        broker.newid(uuid2)
        with broker.get() as conn:
            uuids = [r[0] for r in conn.execute('SELECT * FROM test_stat')]
            self.assertEquals(len(uuids), 1)
            self.assertNotEquals(uuids[0], uuid1)
            # Remember the current id to compare against the next newid().
            uuid1 = uuids[0]
            points = [(r[0], r[1]) for r in conn.execute(
                'SELECT sync_point, '
                'remote_id FROM incoming_sync WHERE remote_id = ?', (uuid2,))]
            self.assertEquals(len(points), 1)
            self.assertEquals(points[0][0], -1)
            self.assertEquals(points[0][1], uuid2)
            conn.execute("INSERT INTO test (one) VALUES ('1')")
            conn.commit()
        uuid3 = str(uuid4())
        broker.newid(uuid3)
        with broker.get() as conn:
            uuids = [r[0] for r in conn.execute('SELECT * FROM test_stat')]
            self.assertEquals(len(uuids), 1)
            self.assertNotEquals(uuids[0], uuid1)
            uuid1 = uuids[0]
            points = [(r[0], r[1]) for r in conn.execute(
                'SELECT sync_point, '
                'remote_id FROM incoming_sync WHERE remote_id = ?', (uuid3,))]
            self.assertEquals(len(points), 1)
            self.assertEquals(points[0][1], uuid3)
        broker.newid(uuid2)
        with broker.get() as conn:
            uuids = [r[0] for r in conn.execute('SELECT * FROM test_stat')]
            self.assertEquals(len(uuids), 1)
            self.assertNotEquals(uuids[0], uuid1)
            points = [(r[0], r[1]) for r in conn.execute(
                'SELECT sync_point, '
                'remote_id FROM incoming_sync WHERE remote_id = ?', (uuid2,))]
            self.assertEquals(len(points), 1)
            self.assertEquals(points[0][1], uuid2)

    def test_get_items_since(self):
        """get_items_since(start, count) returns rows after a start point."""
        broker = DatabaseBroker(':memory:')
        broker.db_type = 'test'
        broker.db_contains_type = 'test'

        def _initialize(conn, timestamp):
            conn.execute('CREATE TABLE test (one TEXT)')
            conn.execute("INSERT INTO test (one) VALUES ('1')")
            conn.execute("INSERT INTO test (one) VALUES ('2')")
            conn.execute("INSERT INTO test (one) VALUES ('3')")
            conn.commit()
        broker._initialize = _initialize
        broker.initialize(normalize_timestamp('1'))
        self.assertEquals(broker.get_items_since(-1, 10),
                          [{'one': '1'}, {'one': '2'}, {'one': '3'}])
        self.assertEquals(broker.get_items_since(-1, 2),
                          [{'one': '1'}, {'one': '2'}])
        self.assertEquals(broker.get_items_since(1, 2),
                          [{'one': '2'}, {'one': '3'}])
        self.assertEquals(broker.get_items_since(3, 2), [])
        self.assertEquals(broker.get_items_since(999, 2), [])

    def test_get_sync(self):
        """get_sync() reports -1 for unknown ids and tracks both directions."""
        broker = DatabaseBroker(':memory:')
        broker.db_type = 'test'
        broker.db_contains_type = 'test'
        uuid1 = str(uuid4())

        def _initialize(conn, timestamp):
            conn.execute('CREATE TABLE test (one TEXT)')
            conn.execute('CREATE TABLE test_stat (id TEXT)')
            conn.execute('INSERT INTO test_stat (id) VALUES (?)', (uuid1,))
            conn.execute("INSERT INTO test (one) VALUES ('1')")
            conn.commit()
        broker._initialize = _initialize
        broker.initialize(normalize_timestamp('1'))
        uuid2 = str(uuid4())
        self.assertEquals(broker.get_sync(uuid2), -1)
        broker.newid(uuid2)
        self.assertEquals(broker.get_sync(uuid2), 1)
        uuid3 = str(uuid4())
        self.assertEquals(broker.get_sync(uuid3), -1)
        with broker.get() as conn:
            conn.execute("INSERT INTO test (one) VALUES ('2')")
            conn.commit()
        broker.newid(uuid3)
        self.assertEquals(broker.get_sync(uuid2), 1)
        self.assertEquals(broker.get_sync(uuid3), 2)
        self.assertEquals(broker.get_sync(uuid2, incoming=False), -1)
        self.assertEquals(broker.get_sync(uuid3, incoming=False), -1)
        broker.merge_syncs([{'sync_point': 1, 'remote_id': uuid2}],
                           incoming=False)
        self.assertEquals(broker.get_sync(uuid2), 1)
        self.assertEquals(broker.get_sync(uuid3), 2)
        self.assertEquals(broker.get_sync(uuid2, incoming=False), 1)
        self.assertEquals(broker.get_sync(uuid3, incoming=False), -1)
        broker.merge_syncs([{'sync_point': 2, 'remote_id': uuid3}],
                           incoming=False)
        self.assertEquals(broker.get_sync(uuid2, incoming=False), 1)
        self.assertEquals(broker.get_sync(uuid3, incoming=False), 2)

    def test_merge_syncs(self):
        """merge_syncs() stores incoming and outgoing points separately."""
        broker = DatabaseBroker(':memory:')

        def stub(*args, **kwargs):
            pass
        broker._initialize = stub
        broker.initialize(normalize_timestamp('1'))
        uuid2 = str(uuid4())
        broker.merge_syncs([{'sync_point': 1, 'remote_id': uuid2}])
        self.assertEquals(broker.get_sync(uuid2), 1)
        uuid3 = str(uuid4())
        broker.merge_syncs([{'sync_point': 2, 'remote_id': uuid3}])
        self.assertEquals(broker.get_sync(uuid2), 1)
        self.assertEquals(broker.get_sync(uuid3), 2)
        self.assertEquals(broker.get_sync(uuid2, incoming=False), -1)
        self.assertEquals(broker.get_sync(uuid3, incoming=False), -1)
        broker.merge_syncs([{'sync_point': 3, 'remote_id': uuid2},
                            {'sync_point': 4, 'remote_id': uuid3}],
                           incoming=False)
        self.assertEquals(broker.get_sync(uuid2, incoming=False), 3)
        self.assertEquals(broker.get_sync(uuid3, incoming=False), 4)
        self.assertEquals(broker.get_sync(uuid2), 1)
        self.assertEquals(broker.get_sync(uuid3), 2)
        broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}])
        self.assertEquals(broker.get_sync(uuid2), 5)

    def test_get_replication_info(self):
        self.get_replication_info_tester(metadata=False)

    def test_get_replication_info_with_metadata(self):
        self.get_replication_info_tester(metadata=True)

    def get_replication_info_tester(self, metadata=False):
        """Check get_replication_info() across an insert and a delete.

        Builds an in-memory broker with a ``test``/``test_stat`` schema and
        asserts the reported info when empty, after one insert and after
        deleting that row.

        :param metadata: when true, ``test_stat`` gets a ``metadata`` column
                         holding one JSON-encoded test item.
        :returns: the initialized broker, so callers can keep using it.
        """
        broker = DatabaseBroker(':memory:', account='a')
        broker.db_type = 'test'
        broker.db_contains_type = 'test'
        broker_creation = normalize_timestamp(1)
        broker_uuid = str(uuid4())
        broker_metadata = simplejson.dumps(
            {'Test': ('Value', normalize_timestamp(1))}) if metadata else ''
        # Extra column definition spliced into the CREATE TABLE below.
        metadata_column = ", metadata TEXT DEFAULT ''" if metadata else ""

        def _initialize(conn, put_timestamp):
            if put_timestamp is None:
                put_timestamp = normalize_timestamp(0)
            conn.executescript('''
                CREATE TABLE test (
                    ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE,
                    created_at TEXT
                );
                CREATE TRIGGER test_insert AFTER INSERT ON test
                BEGIN
                    UPDATE test_stat
                    SET test_count = test_count + 1,
                        hash = chexor(hash, new.name, new.created_at);
                END;
                CREATE TRIGGER test_update BEFORE UPDATE ON test
                BEGIN
                    SELECT RAISE(FAIL,
                                 'UPDATE not allowed; DELETE and INSERT');
                END;
                CREATE TRIGGER test_delete AFTER DELETE ON test
                BEGIN
                    UPDATE test_stat
                    SET test_count = test_count - 1,
                        hash = chexor(hash, old.name, old.created_at);
                END;
                CREATE TABLE test_stat (
                    account TEXT,
                    created_at TEXT,
                    put_timestamp TEXT DEFAULT '0',
                    delete_timestamp TEXT DEFAULT '0',
                    test_count INTEGER,
                    hash TEXT default '00000000000000000000000000000000',
                    id TEXT
                    %s
                );
                INSERT INTO test_stat (test_count) VALUES (0);
            ''' % metadata_column)
            conn.execute('''
                UPDATE test_stat
                SET account = ?, created_at = ?, id = ?, put_timestamp = ?
                ''', (broker.account, broker_creation, broker_uuid,
                      put_timestamp))
            if metadata:
                conn.execute('UPDATE test_stat SET metadata = ?',
                             (broker_metadata,))
            conn.commit()
        broker._initialize = _initialize
        put_timestamp = normalize_timestamp(2)
        broker.initialize(put_timestamp)
        info = broker.get_replication_info()
        self.assertEquals(info, {
            'count': 0,
            'hash': '00000000000000000000000000000000',
            'created_at': broker_creation, 'put_timestamp': put_timestamp,
            'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid,
            'metadata': broker_metadata})
        insert_timestamp = normalize_timestamp(3)
        with broker.get() as conn:
            conn.execute('''
                INSERT INTO test (name, created_at) VALUES ('test', ?)
            ''', (insert_timestamp,))
            conn.commit()
        info = broker.get_replication_info()
        self.assertEquals(info, {
            'count': 1,
            'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba',
            'created_at': broker_creation, 'put_timestamp': put_timestamp,
            'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
            'metadata': broker_metadata})
        with broker.get() as conn:
            conn.execute('DELETE FROM test')
            conn.commit()
        info = broker.get_replication_info()
        # After the delete the count and hash are back to their initial
        # values while max_row stays at 1.
        self.assertEquals(info, {
            'count': 0,
            'hash': '00000000000000000000000000000000',
            'created_at': broker_creation, 'put_timestamp': put_timestamp,
            'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
            'metadata': broker_metadata})
        return broker

    def test_metadata(self):
        """update_metadata() and _reclaim() add, update and expire items."""
        def reclaim(broker, timestamp):
            with broker.get() as conn:
                broker._reclaim(conn, timestamp)
                conn.commit()
        # Initializes a good broker for us
        broker = self.get_replication_info_tester(metadata=True)
        # Add our first item
        first_timestamp = normalize_timestamp(1)
        first_value = '1'
        broker.update_metadata({'First': [first_value, first_timestamp]})
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        # Add our second item
        second_timestamp = normalize_timestamp(2)
        second_value = '2'
        broker.update_metadata({'Second': [second_value, second_timestamp]})
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' in broker.metadata)
        self.assertEquals(broker.metadata['Second'],
                          [second_value, second_timestamp])
        # Update our first item
        first_timestamp = normalize_timestamp(3)
        first_value = '1b'
        broker.update_metadata({'First': [first_value, first_timestamp]})
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' in broker.metadata)
        self.assertEquals(broker.metadata['Second'],
                          [second_value, second_timestamp])
        # Delete our second item (by setting to empty string)
        second_timestamp = normalize_timestamp(4)
        second_value = ''
        broker.update_metadata({'Second': [second_value, second_timestamp]})
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' in broker.metadata)
        self.assertEquals(broker.metadata['Second'],
                          [second_value, second_timestamp])
        # Reclaim at point before second item was deleted
        reclaim(broker, normalize_timestamp(3))
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' in broker.metadata)
        self.assertEquals(broker.metadata['Second'],
                          [second_value, second_timestamp])
        # Reclaim at point second item was deleted
        reclaim(broker, normalize_timestamp(4))
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' in broker.metadata)
        self.assertEquals(broker.metadata['Second'],
                          [second_value, second_timestamp])
        # Reclaim after point second item was deleted
        reclaim(broker, normalize_timestamp(5))
        self.assert_('First' in broker.metadata)
        self.assertEquals(broker.metadata['First'],
                          [first_value, first_timestamp])
        self.assert_('Second' not in broker.metadata)
|
|
|
|
|
|
# Run the suite when this module is executed directly.
if __name__ == "__main__":
    unittest.main()
|