From b02f0db126b873902bdb63f94a3c412082e14a54 Mon Sep 17 00:00:00 2001 From: Pete Zaitcev Date: Tue, 17 Jun 2014 22:35:59 -0700 Subject: [PATCH] Refactoring storage policies merge_timestamps * base implementation of is_deleted phrased to use _is_deleted * wrap pre-conn coded _is_deleted inside a transation for merge_timestamps Implements: blueprint storage-policies Change-Id: I6a948908c3e45b70707981d87171cb2cb910fe1e --- swift/account/backend.py | 38 ++- swift/common/db.py | 44 ++- swift/container/backend.py | 37 ++- test/unit/account/test_backend.py | 31 +- test/unit/common/test_db.py | 461 +++++++++++++++++++++++----- test/unit/container/test_backend.py | 18 ++ 6 files changed, 512 insertions(+), 117 deletions(-) diff --git a/swift/account/backend.py b/swift/account/backend.py index 815ccd5ef8..10cf18c134 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -154,9 +154,9 @@ class AccountBroker(DatabaseBroker): conn.execute(''' UPDATE account_stat SET account = ?, created_at = ?, id = ?, - put_timestamp = ? + put_timestamp = ?, status_changed_at = ? ''', (self.account, normalize_timestamp(time.time()), str(uuid4()), - put_timestamp)) + put_timestamp, put_timestamp)) def create_policy_stat_table(self, conn): """ @@ -284,23 +284,29 @@ class AccountBroker(DatabaseBroker): protocol=PICKLE_PROTOCOL).encode('base64')) fp.flush() - def is_deleted(self): + def _is_deleted_info(self, status, container_count, delete_timestamp, + put_timestamp): """ - Check if the account DB is considered to be deleted. + Apply delete logic to database info. - :returns: True if the account DB is considered to be deleted, False - otherwise + :returns: True if the DB is considered to be deleted, False otherwise """ - if self.db_file != ':memory:' and not os.path.exists(self.db_file): - return True - self._commit_puts_stale_ok() - with self.get() as conn: - row = conn.execute(''' - SELECT put_timestamp, delete_timestamp, container_count, status - FROM account_stat''').fetchone() - return row['status'] == 'DELETED' or ( - row['container_count'] in (None, '', 0, '0') and - row['delete_timestamp'] > row['put_timestamp']) + return status == 'DELETED' or ( + container_count in (None, '', 0, '0') and + float(delete_timestamp) > float(put_timestamp)) + + def _is_deleted(self, conn): + """ + Check account_stat table and evaluate info. + + :param conn: database conn + + :returns: True if the DB is considered to be deleted, False otherwise + """ + info = conn.execute(''' + SELECT put_timestamp, delete_timestamp, container_count, status + FROM account_stat''').fetchone() + return self._is_deleted_info(**info) def is_status_deleted(self): """Only returns true if the status field is set to DELETED.""" diff --git a/swift/common/db.py b/swift/common/db.py index e6812d86cd..10fcc1be9f 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -425,6 +425,28 @@ class DatabaseBroker(object): # Override for additional work when receiving an rsynced db. pass + def _is_deleted(self, conn): + """ + Check if the database is considered deleted + + :param conn: database conn + + :returns: True if the DB is considered to be deleted, False otherwise + """ + raise NotImplementedError() + + def is_deleted(self): + """ + Check if the DB is considered to be deleted. + + :returns: True if the DB is considered to be deleted, False otherwise + """ + if self.db_file != ':memory:' and not os.path.exists(self.db_file): + return True + self._commit_puts_stale_ok() + with self.get() as conn: + return self._is_deleted(conn) + def merge_timestamps(self, created_at, put_timestamp, delete_timestamp): """ Used in replication to handle updating timestamps. @@ -433,17 +455,18 @@ class DatabaseBroker(object): :param put_timestamp: put timestamp :param delete_timestamp: delete timestamp """ - current_status = self.is_deleted() with self.get() as conn: + old_status = self._is_deleted(conn) conn.execute(''' UPDATE %s_stat SET created_at=MIN(?, created_at), put_timestamp=MAX(?, put_timestamp), delete_timestamp=MAX(?, delete_timestamp) ''' % self.db_type, (created_at, put_timestamp, delete_timestamp)) + if old_status != self._is_deleted(conn): + timestamp = normalize_timestamp(time.time()) + self._update_status_changed_at(conn, timestamp) + conn.commit() - if self.is_deleted() != current_status: - timestamp = normalize_timestamp(time.time()) - self.update_status_changed_at(timestamp) def get_items_since(self, start, count): """ @@ -489,7 +512,7 @@ class DatabaseBroker(object): with self.get() as conn: curs = conn.execute(''' SELECT remote_id, sync_point FROM %s_sync - ''' % 'incoming' if incoming else 'outgoing') + ''' % ('incoming' if incoming else 'outgoing')) result = [] for row in curs: result.append({'remote_id': row[0], 'sync_point': row[1]}) @@ -783,8 +806,11 @@ class DatabaseBroker(object): current status_changed_at timestamp. """ with self.get() as conn: - conn.execute( - 'UPDATE %s_stat SET status_changed_at = ?' - ' WHERE status_changed_at < ?' % self.db_type, - (timestamp, timestamp)) + self._update_status_changed_at(conn, timestamp) conn.commit() + + def _update_status_changed_at(self, conn, timestamp): + conn.execute( + 'UPDATE %s_stat SET status_changed_at = ?' + ' WHERE status_changed_at < ?' % self.db_type, + (timestamp, timestamp)) diff --git a/swift/container/backend.py b/swift/container/backend.py index 584cb4ef4f..6a2f92bebd 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -361,16 +361,33 @@ class ContainerBroker(DatabaseBroker): protocol=PICKLE_PROTOCOL).encode('base64')) fp.flush() - def is_deleted(self, **kwargs): + def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp, + **kwargs): """ - Check if the DB is considered to be deleted. + Apply delete logic to database info. :returns: True if the DB is considered to be deleted, False otherwise """ - _info, is_deleted = self.get_info_is_deleted(**kwargs) - return is_deleted + # The container is considered deleted if the delete_timestamp + # value is greater than the put_timestamp, and there are no + # objects in the container. + return (object_count in (None, '', 0, '0')) and ( + float(delete_timestamp) > float(put_timestamp)) - def get_info_is_deleted(self, timestamp=None): + def _is_deleted(self, conn): + """ + Check container_stat view and evaluate info. + + :param conn: database conn + + :returns: True if the DB is considered to be deleted, False otherwise + """ + info = conn.execute(''' + SELECT put_timestamp, delete_timestamp, object_count + FROM container_stat''').fetchone() + return self._is_deleted_info(**info) + + def get_info_is_deleted(self): """ Get the is_deleted status and info for the container. @@ -380,15 +397,7 @@ class ContainerBroker(DatabaseBroker): if self.db_file != ':memory:' and not os.path.exists(self.db_file): return {}, True info = self.get_info() - # leave this db as a tombstone for a consistency window - if timestamp and info['delete_timestamp'] > timestamp: - return info, False - # The container is considered deleted if the delete_timestamp - # value is greater than the put_timestamp, and there are no - # objects in the container. - is_deleted = (info['object_count'] in (None, '', 0, '0')) and \ - (float(info['delete_timestamp']) > float(info['put_timestamp'])) - return info, is_deleted + return info, self._is_deleted_info(**info) def get_info(self): """ diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py index 61a4f66c52..5b47f102f1 100644 --- a/test/unit/account/test_backend.py +++ b/test/unit/account/test_backend.py @@ -26,6 +26,7 @@ from shutil import rmtree import sqlite3 import itertools from contextlib import contextmanager +import random from swift.account.backend import AccountBroker from swift.common.utils import normalize_timestamp @@ -33,6 +34,8 @@ from test.unit import patch_policies, with_tempdir from swift.common.db import DatabaseConnectionError from swift.common.storage_policy import StoragePolicy, POLICIES +from test.unit.common.test_db import TestExampleBroker + @patch_policies class TestAccountBroker(unittest.TestCase): @@ -167,7 +170,11 @@ class TestAccountBroker(unittest.TestCase): self.assertEqual(info['put_timestamp'], normalize_timestamp(start)) self.assert_(float(info['created_at']) >= start) self.assertEqual(info['delete_timestamp'], '0') - self.assertEqual(info['status_changed_at'], '0') + if self.__class__ == TestAccountBrokerBeforeMetadata: + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], + normalize_timestamp(start)) # delete it delete_timestamp = normalize_timestamp(ts.next()) @@ -320,7 +327,10 @@ class TestAccountBroker(unittest.TestCase): self.assertEqual(info['hash'], '00000000000000000000000000000000') self.assertEqual(info['put_timestamp'], normalize_timestamp(1)) self.assertEqual(info['delete_timestamp'], '0') - self.assertEqual(info['status_changed_at'], '0') + if self.__class__ == TestAccountBrokerBeforeMetadata: + self.assertEqual(info['status_changed_at'], '0') + else: + self.assertEqual(info['status_changed_at'], normalize_timestamp(1)) info = broker.get_info() self.assertEqual(info['container_count'], 0) @@ -725,6 +735,23 @@ def premetadata_create_account_stat_table(self, conn, put_timestamp): put_timestamp)) +class TestCommonAccountBroker(TestExampleBroker): + + broker_class = AccountBroker + + def setUp(self): + super(TestCommonAccountBroker, self).setUp() + self.policy = random.choice(list(POLICIES)) + + def put_item(self, broker, timestamp): + broker.put_container('test', timestamp, 0, 0, 0, + int(self.policy)) + + def delete_item(self, broker, timestamp): + broker.put_container('test', 0, timestamp, 0, 0, + int(self.policy)) + + class TestAccountBrokerBeforeMetadata(TestAccountBroker): """ Tests for AccountBroker against databases created before diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index 5a9b79cd17..461e869f6b 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -20,6 +20,7 @@ import unittest from tempfile import mkdtemp from shutil import rmtree, copy from uuid import uuid4 +import cPickle as pickle import simplejson import sqlite3 @@ -33,10 +34,12 @@ from eventlet.timeout import Timeout import swift.common.db from swift.common.db import chexor, dict_factory, get_db_connection, \ DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \ - GreenDBConnection + GreenDBConnection, PICKLE_PROTOCOL from swift.common.utils import normalize_timestamp, mkdirs, json from swift.common.exceptions import LockTimeout +from test.unit import with_tempdir + class TestDatabaseConnectionError(unittest.TestCase): @@ -175,25 +178,399 @@ class TestGetDBConnection(unittest.TestCase): class ExampleBroker(DatabaseBroker): + """ + Concrete enough implementation of a DatabaseBroker. + """ db_type = 'test' db_contains_type = 'test' - def _initialize(self, conn, timestamp, **kwargs): + def _initialize(self, conn, put_timestamp, **kwargs): conn.executescript(''' CREATE TABLE test_stat ( - name TEXT, - timestamp TEXT DEFAULT 0, - status_changed_at TEXT DEFAULT 0 + test_count INTEGER DEFAULT 0, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '' + ); CREATE TABLE test ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT + name TEXT, + created_at TEXT, + deleted INTEGER DEFAULT 0 ); + CREATE TRIGGER test_insert AFTER INSERT ON test + BEGIN + UPDATE test_stat + SET test_count = test_count + (1 - new.deleted); + END; + CREATE TRIGGER test_delete AFTER DELETE ON test + BEGIN + UPDATE test_stat + SET test_count = test_count - (1 - old.deleted); + END; ''') conn.execute(""" - INSERT INTO test_stat (name, timestamp) VALUES (?, ?) - """, (self.account, timestamp)) + INSERT INTO test_stat ( + created_at, put_timestamp, status_changed_at) + VALUES (?, ?, ?); + """, (normalize_timestamp(time.time()), put_timestamp, + put_timestamp)) + + def merge_items(self, item_list): + with self.get() as conn: + for rec in item_list: + conn.execute( + 'DELETE FROM test WHERE name = ? and created_at < ?', ( + rec['name'], rec['created_at'])) + if not conn.execute( + 'SELECT 1 FROM test WHERE name = ?', + (rec['name'],)).fetchall(): + conn.execute(''' + INSERT INTO test (name, created_at, deleted) + VALUES (?, ?, ?)''', ( + rec['name'], rec['created_at'], rec['deleted'])) + conn.commit() + + def _commit_puts_load(self, item_list, entry): + (name, timestamp, deleted) = pickle.loads(entry.decode('base64')) + item_list.append({ + 'name': name, + 'created_at': timestamp, + 'deleted': deleted, + }) + + def _load_item(self, name, timestamp, deleted): + if self.db_file == ':memory:': + record = { + 'name': name, + 'created_at': timestamp, + 'deleted': deleted, + } + self.merge_items([record]) + return + with open(self.pending_file, 'a+b') as fp: + fp.write(':') + fp.write(pickle.dumps( + (name, timestamp, deleted), + protocol=PICKLE_PROTOCOL).encode('base64')) + fp.flush() + + def put_test(self, name, timestamp): + self._load_item(name, timestamp, 0) + + def delete_test(self, name, timestamp): + self._load_item(name, timestamp, 1) + + def _is_deleted(self, conn): + info = conn.execute('SELECT * FROM test_stat').fetchone() + return (info['test_count'] in (None, '', 0, '0')) and \ + (normalize_timestamp(info['delete_timestamp']) > + normalize_timestamp(info['put_timestamp'])) + + +class TestExampleBroker(unittest.TestCase): + """ + Tests that use the mostly Concrete enough ExampleBroker to exercise some + of the abstract methods on DatabaseBroker. + """ + + broker_class = ExampleBroker + policy = 0 + + def test_merge_timestamps_simple_delete(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp) + created_at = broker.get_info()['created_at'] + broker.merge_timestamps(created_at, put_timestamp, '0') + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], '0') + self.assertEqual(info['status_changed_at'], put_timestamp) + # delete + delete_timestamp = ts.next() + broker.merge_timestamps(created_at, put_timestamp, delete_timestamp) + self.assert_(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assert_(info['status_changed_at'] > + normalize_timestamp(put_timestamp)) + + def put_item(self, broker, timestamp): + broker.put_test('test', timestamp) + + def delete_item(self, broker, timestamp): + broker.delete_test('test', timestamp) + + def test_merge_timestamps_delete_with_objects(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp, storage_policy_index=int(self.policy)) + created_at = broker.get_info()['created_at'] + broker.merge_timestamps(created_at, put_timestamp, '0') + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], '0') + self.assertEqual(info['status_changed_at'], put_timestamp) + # add object + self.put_item(broker, ts.next()) + self.assertEqual(broker.get_info()[ + '%s_count' % broker.db_contains_type], 1) + # delete + delete_timestamp = ts.next() + broker.merge_timestamps(created_at, put_timestamp, delete_timestamp) + self.assertFalse(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + # status is unchanged + self.assertEqual(info['status_changed_at'], put_timestamp) + # count is causing status to hold on + self.delete_item(broker, ts.next()) + self.assertEqual(broker.get_info()[ + '%s_count' % broker.db_contains_type], 0) + self.assert_(broker.is_deleted()) + + def test_merge_timestamps_simple_recreate(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp, storage_policy_index=int(self.policy)) + virgin_status_changed_at = broker.get_info()['status_changed_at'] + created_at = broker.get_info()['created_at'] + delete_timestamp = ts.next() + broker.merge_timestamps(created_at, put_timestamp, delete_timestamp) + self.assert_(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + orig_status_changed_at = info['status_changed_at'] + self.assert_(orig_status_changed_at > + normalize_timestamp(virgin_status_changed_at)) + # recreate + recreate_timestamp = ts.next() + status_changed_at = time.time() + with patch('swift.common.db.time.time', new=lambda: status_changed_at): + broker.merge_timestamps(created_at, recreate_timestamp, '0') + self.assertFalse(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], recreate_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assert_(info['status_changed_at'], status_changed_at) + + def test_merge_timestamps_recreate_with_objects(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp, storage_policy_index=int(self.policy)) + created_at = broker.get_info()['created_at'] + # delete + delete_timestamp = ts.next() + broker.merge_timestamps(created_at, put_timestamp, delete_timestamp) + self.assert_(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], put_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + orig_status_changed_at = info['status_changed_at'] + self.assert_(normalize_timestamp(orig_status_changed_at) >= + normalize_timestamp(put_timestamp)) + # add object + self.put_item(broker, ts.next()) + count_key = '%s_count' % broker.db_contains_type + self.assertEqual(broker.get_info()[count_key], 1) + self.assertFalse(broker.is_deleted()) + # recreate + recreate_timestamp = ts.next() + broker.merge_timestamps(created_at, recreate_timestamp, '0') + self.assertFalse(broker.is_deleted()) + info = broker.get_info() + self.assertEqual(info['created_at'], created_at) + self.assertEqual(info['put_timestamp'], recreate_timestamp) + self.assertEqual(info['delete_timestamp'], delete_timestamp) + self.assertEqual(info['status_changed_at'], orig_status_changed_at) + # count is not causing status to hold on + self.delete_item(broker, ts.next()) + self.assertFalse(broker.is_deleted()) + + def test_merge_timestamps_update_put_no_status_change(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp, storage_policy_index=int(self.policy)) + info = broker.get_info() + orig_status_changed_at = info['status_changed_at'] + created_at = info['created_at'] + new_put_timestamp = ts.next() + broker.merge_timestamps(created_at, new_put_timestamp, '0') + info = broker.get_info() + self.assertEqual(new_put_timestamp, info['put_timestamp']) + self.assertEqual(orig_status_changed_at, info['status_changed_at']) + + def test_merge_timestamps_update_delete_no_status_change(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + put_timestamp = ts.next() + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(put_timestamp, storage_policy_index=int(self.policy)) + created_at = broker.get_info()['created_at'] + broker.merge_timestamps(created_at, put_timestamp, ts.next()) + orig_status_changed_at = broker.get_info()['status_changed_at'] + new_delete_timestamp = ts.next() + broker.merge_timestamps(created_at, put_timestamp, + new_delete_timestamp) + info = broker.get_info() + self.assertEqual(new_delete_timestamp, info['delete_timestamp']) + self.assertEqual(orig_status_changed_at, info['status_changed_at']) + + def test_get_max_row(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(ts.next(), storage_policy_index=int(self.policy)) + self.assertEquals(-1, broker.get_max_row()) + self.put_item(broker, ts.next()) + self.assertEquals(1, broker.get_max_row()) + self.delete_item(broker, ts.next()) + self.assertEquals(2, broker.get_max_row()) + self.put_item(broker, ts.next()) + self.assertEquals(3, broker.get_max_row()) + + def test_get_info(self): + broker = self.broker_class(':memory:', account='test', container='c') + created_at = time.time() + with patch('swift.common.db.time.time', new=lambda: created_at): + broker.initialize(normalize_timestamp(1), + storage_policy_index=int(self.policy)) + info = broker.get_info() + count_key = '%s_count' % broker.db_contains_type + expected = { + count_key: 0, + 'created_at': normalize_timestamp(created_at), + 'put_timestamp': normalize_timestamp(1), + 'status_changed_at': normalize_timestamp(1), + 'delete_timestamp': '0', + } + for k, v in expected.items(): + self.assertEqual(info[k], v, + 'mismatch for %s, %s != %s' % ( + k, info[k], v)) + + def test_get_raw_metadata(self): + broker = self.broker_class(':memory:', account='test', container='c') + broker.initialize(normalize_timestamp(0), + storage_policy_index=int(self.policy)) + self.assertEqual(broker.metadata, {}) + self.assertEqual(broker.get_raw_metadata(), '') + key = u'test\u062a'.encode('utf-8') + value = u'value\u062a' + metadata = { + key: [value, normalize_timestamp(1)] + } + broker.update_metadata(metadata) + self.assertEqual(broker.metadata, metadata) + self.assertEqual(broker.get_raw_metadata(), + json.dumps(metadata)) + + def test_put_timestamp(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = self.broker_class(':memory:', account='a', container='c') + orig_put_timestamp = ts.next() + broker.initialize(orig_put_timestamp, + storage_policy_index=int(self.policy)) + self.assertEqual(broker.get_info()['put_timestamp'], + orig_put_timestamp) + # put_timestamp equal - no change + broker.update_put_timestamp(orig_put_timestamp) + self.assertEqual(broker.get_info()['put_timestamp'], + orig_put_timestamp) + # put_timestamp newer - gets newer + newer_put_timestamp = ts.next() + broker.update_put_timestamp(newer_put_timestamp) + self.assertEqual(broker.get_info()['put_timestamp'], + newer_put_timestamp) + # put_timestamp older - no change + broker.update_put_timestamp(orig_put_timestamp) + self.assertEqual(broker.get_info()['put_timestamp'], + newer_put_timestamp) + + def test_status_changed_at(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = self.broker_class(':memory:', account='test', container='c') + put_timestamp = ts.next() + created_at = time.time() + with patch('swift.common.db.time.time', new=lambda: created_at): + broker.initialize(put_timestamp, + storage_policy_index=int(self.policy)) + self.assertEquals(broker.get_info()['status_changed_at'], + put_timestamp) + self.assertEquals(broker.get_info()['created_at'], + normalize_timestamp(created_at)) + status_changed_at = ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # save the old and get a new status_changed_at + old_status_changed_at, status_changed_at = \ + status_changed_at, ts.next() + broker.update_status_changed_at(status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + # status changed at won't go backwards... + broker.update_status_changed_at(old_status_changed_at) + self.assertEqual(broker.get_info()['status_changed_at'], + status_changed_at) + + def test_get_syncs(self): + broker = self.broker_class(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp(time.time()), + storage_policy_index=int(self.policy)) + self.assertEqual([], broker.get_syncs()) + broker.merge_syncs([{'sync_point': 1, 'remote_id': 'remote1'}]) + self.assertEqual([{'sync_point': 1, 'remote_id': 'remote1'}], + broker.get_syncs()) + self.assertEqual([], broker.get_syncs(incoming=False)) + broker.merge_syncs([{'sync_point': 2, 'remote_id': 'remote2'}], + incoming=False) + self.assertEqual([{'sync_point': 2, 'remote_id': 'remote2'}], + broker.get_syncs(incoming=False)) + + @with_tempdir + def test_commit_pending(self, tempdir): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time.time()))) + broker = self.broker_class(os.path.join(tempdir, 'test.db'), + account='a', container='c') + broker.initialize(ts.next(), storage_policy_index=int(self.policy)) + self.put_item(broker, ts.next()) + qry = 'select * from %s_stat' % broker.db_type + with broker.get() as conn: + rows = [dict(x) for x in conn.execute(qry)] + info = rows[0] + count_key = '%s_count' % broker.db_contains_type + self.assertEqual(0, info[count_key]) + broker.get_info() + self.assertEqual(1, broker.get_info()[count_key]) class TestDatabaseBroker(unittest.TestCase): @@ -731,74 +1108,6 @@ class TestDatabaseBroker(unittest.TestCase): [first_value, first_timestamp]) self.assert_('Second' not in broker.metadata) - def test_get_max_row(self): - broker = ExampleBroker(':memory:') - broker.initialize(0) - self.assertEquals(-1, broker.get_max_row()) - with broker.get() as conn: - conn.execute(''' - INSERT INTO test (name) VALUES (?) - ''', ('test_name',)) - conn.commit() - self.assertEquals(1, broker.get_max_row()) - with broker.get() as conn: - conn.executescript(''' - DELETE FROM test; - ''') - conn.commit() - self.assertEquals(1, broker.get_max_row()) - with broker.get() as conn: - conn.execute(''' - INSERT INTO test (name) VALUES (?) - ''', ('test_name',)) - conn.commit() - self.assertEquals(2, broker.get_max_row()) - - def test_get_info(self): - broker = ExampleBroker(':memory:', account='test') - broker.initialize(normalize_timestamp(1)) - info = broker.get_info() - expected = { - 'name': 'test', - 'timestamp': '0000000001.00000', - 'status_changed_at': '0', - } - self.assertEqual(info, expected) - - def test_get_raw_metadata(self): - broker = ExampleBroker(':memory:', account='test') - broker.initialize(normalize_timestamp(0)) - self.assertEqual(broker.metadata, {}) - self.assertEqual(broker.get_raw_metadata(), '') - metadata = { - 'test': ['value', normalize_timestamp(1)] - } - broker.update_metadata(metadata) - self.assertEqual(broker.metadata, metadata) - self.assertEqual(broker.get_raw_metadata(), - json.dumps(metadata)) - - def test_status_changed_at(self): - ts = (normalize_timestamp(t) for t in - itertools.count(int(time.time()))) - broker = ExampleBroker(':memory:', account='test') - broker.initialize(ts.next()) - self.assertEquals(broker.get_info()['status_changed_at'], '0') - status_changed_at = ts.next() - broker.update_status_changed_at(status_changed_at) - self.assertEqual(broker.get_info()['status_changed_at'], - status_changed_at) - # save the old and get a new status_changed_at - old_status_changed_at, status_changed_at = \ - status_changed_at, ts.next() - broker.update_status_changed_at(status_changed_at) - self.assertEqual(broker.get_info()['status_changed_at'], - status_changed_at) - # status changed at won't go backwards... - broker.update_status_changed_at(old_status_changed_at) - self.assertEqual(broker.get_info()['status_changed_at'], - status_changed_at) - if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index c159477d73..41b0c3f10c 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -34,6 +34,7 @@ from swift.common.storage_policy import POLICIES import mock from test.unit import patch_policies, with_tempdir +from test.unit.common.test_db import TestExampleBroker class TestContainerBroker(unittest.TestCase): @@ -1390,6 +1391,23 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(broker.get_policy_stats(), expected) +class TestCommonContainerBroker(TestExampleBroker): + + broker_class = ContainerBroker + + def setUp(self): + super(TestCommonContainerBroker, self).setUp() + self.policy = random.choice(list(POLICIES)) + + def put_item(self, broker, timestamp): + broker.put_object('test', timestamp, 0, 'text/plain', 'x', + storage_policy_index=int(self.policy)) + + def delete_item(self, broker, timestamp): + broker.delete_object('test', timestamp, + storage_policy_index=int(self.policy)) + + class ContainerBrokerMigrationMixin(object): """ Mixin for running ContainerBroker against databases created with