Merge "Latch shard-stat reporting"
This commit is contained in:
commit
c2b03f6779
@ -4869,6 +4869,8 @@ class ShardRange(object):
|
||||
value.
|
||||
:param epoch: optional epoch timestamp which represents the time at which
|
||||
sharding was enabled for a container.
|
||||
:param reported: optional indicator that this shard and its stats have
|
||||
been reported to the root container.
|
||||
"""
|
||||
FOUND = 10
|
||||
CREATED = 20
|
||||
@ -4919,7 +4921,8 @@ class ShardRange(object):
|
||||
|
||||
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
|
||||
object_count=0, bytes_used=0, meta_timestamp=None,
|
||||
deleted=False, state=None, state_timestamp=None, epoch=None):
|
||||
deleted=False, state=None, state_timestamp=None, epoch=None,
|
||||
reported=False):
|
||||
self.account = self.container = self._timestamp = \
|
||||
self._meta_timestamp = self._state_timestamp = self._epoch = None
|
||||
self._lower = ShardRange.MIN
|
||||
@ -4938,6 +4941,7 @@ class ShardRange(object):
|
||||
self.state = self.FOUND if state is None else state
|
||||
self.state_timestamp = state_timestamp
|
||||
self.epoch = epoch
|
||||
self.reported = reported
|
||||
|
||||
@classmethod
|
||||
def _encode(cls, value):
|
||||
@ -5118,8 +5122,14 @@ class ShardRange(object):
|
||||
cast to an int, or if meta_timestamp is neither None nor can be
|
||||
cast to a :class:`~swift.common.utils.Timestamp`.
|
||||
"""
|
||||
if self.object_count != int(object_count):
|
||||
self.object_count = int(object_count)
|
||||
self.reported = False
|
||||
|
||||
if self.bytes_used != int(bytes_used):
|
||||
self.bytes_used = int(bytes_used)
|
||||
self.reported = False
|
||||
|
||||
if meta_timestamp is None:
|
||||
self.meta_timestamp = Timestamp.now()
|
||||
else:
|
||||
@ -5200,6 +5210,14 @@ class ShardRange(object):
|
||||
def epoch(self, epoch):
|
||||
self._epoch = self._to_timestamp(epoch)
|
||||
|
||||
@property
|
||||
def reported(self):
|
||||
return self._reported
|
||||
|
||||
@reported.setter
|
||||
def reported(self, value):
|
||||
self._reported = bool(value)
|
||||
|
||||
def update_state(self, state, state_timestamp=None):
|
||||
"""
|
||||
Set state to the given value and optionally update the state_timestamp
|
||||
@ -5216,6 +5234,7 @@ class ShardRange(object):
|
||||
self.state = state
|
||||
if state_timestamp is not None:
|
||||
self.state_timestamp = state_timestamp
|
||||
self.reported = False
|
||||
return True
|
||||
|
||||
@property
|
||||
@ -5338,6 +5357,7 @@ class ShardRange(object):
|
||||
yield 'state', self.state
|
||||
yield 'state_timestamp', self.state_timestamp.internal
|
||||
yield 'epoch', self.epoch.internal if self.epoch is not None else None
|
||||
yield 'reported', 1 if self.reported else 0
|
||||
|
||||
def copy(self, timestamp=None, **kwargs):
|
||||
"""
|
||||
@ -5369,7 +5389,8 @@ class ShardRange(object):
|
||||
params['name'], params['timestamp'], params['lower'],
|
||||
params['upper'], params['object_count'], params['bytes_used'],
|
||||
params['meta_timestamp'], params['deleted'], params['state'],
|
||||
params['state_timestamp'], params['epoch'])
|
||||
params['state_timestamp'], params['epoch'],
|
||||
params.get('reported', 0))
|
||||
|
||||
|
||||
def find_shard_range(item, ranges):
|
||||
|
@ -60,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
# tuples and vice-versa
|
||||
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
|
||||
'bytes_used', 'meta_timestamp', 'deleted', 'state',
|
||||
'state_timestamp', 'epoch')
|
||||
'state_timestamp', 'epoch', 'reported')
|
||||
|
||||
POLICY_STAT_TABLE_CREATE = '''
|
||||
CREATE TABLE policy_stat (
|
||||
@ -267,6 +267,7 @@ def merge_shards(shard_data, existing):
|
||||
if existing['timestamp'] < shard_data['timestamp']:
|
||||
# note that currently we do not roll forward any meta or state from
|
||||
# an item that was created at older time, newer created time trumps
|
||||
shard_data['reported'] = 0 # reset the latch
|
||||
return True
|
||||
elif existing['timestamp'] > shard_data['timestamp']:
|
||||
return False
|
||||
@ -283,6 +284,18 @@ def merge_shards(shard_data, existing):
|
||||
else:
|
||||
new_content = True
|
||||
|
||||
# We can latch the reported flag
|
||||
if existing['reported'] and \
|
||||
existing['object_count'] == shard_data['object_count'] and \
|
||||
existing['bytes_used'] == shard_data['bytes_used'] and \
|
||||
existing['state'] == shard_data['state'] and \
|
||||
existing['epoch'] == shard_data['epoch']:
|
||||
shard_data['reported'] = 1
|
||||
else:
|
||||
shard_data.setdefault('reported', 0)
|
||||
if shard_data['reported'] and not existing['reported']:
|
||||
new_content = True
|
||||
|
||||
if (existing['state_timestamp'] == shard_data['state_timestamp']
|
||||
and shard_data['state'] > existing['state']):
|
||||
new_content = True
|
||||
@ -595,7 +608,8 @@ class ContainerBroker(DatabaseBroker):
|
||||
deleted INTEGER DEFAULT 0,
|
||||
state INTEGER,
|
||||
state_timestamp TEXT,
|
||||
epoch TEXT
|
||||
epoch TEXT,
|
||||
reported INTEGER DEFAULT 0
|
||||
);
|
||||
""" % SHARD_RANGE_TABLE)
|
||||
|
||||
@ -1428,10 +1442,13 @@ class ContainerBroker(DatabaseBroker):
|
||||
# sqlite3.OperationalError: cannot start a transaction
|
||||
# within a transaction
|
||||
conn.rollback()
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
|
||||
raise
|
||||
if 'no such column: reported' in str(err):
|
||||
self._migrate_add_shard_range_reported(conn)
|
||||
return _really_merge_items(conn)
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
self.create_shard_range_table(conn)
|
||||
return _really_merge_items(conn)
|
||||
raise
|
||||
|
||||
def get_reconciler_sync(self):
|
||||
with self.get() as conn:
|
||||
@ -1579,6 +1596,17 @@ class ContainerBroker(DatabaseBroker):
|
||||
CONTAINER_STAT_VIEW_SCRIPT +
|
||||
'COMMIT;')
|
||||
|
||||
def _migrate_add_shard_range_reported(self, conn):
|
||||
"""
|
||||
Add the reported column to the 'shard_range' table.
|
||||
"""
|
||||
conn.executescript('''
|
||||
BEGIN;
|
||||
ALTER TABLE %s
|
||||
ADD COLUMN reported INTEGER DEFAULT 0;
|
||||
COMMIT;
|
||||
''' % SHARD_RANGE_TABLE)
|
||||
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
@ -1628,7 +1656,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
elif states is not None:
|
||||
included_states.add(states)
|
||||
|
||||
def do_query(conn):
|
||||
def do_query(conn, use_reported_column=True):
|
||||
condition = ''
|
||||
conditions = []
|
||||
params = []
|
||||
@ -1646,21 +1674,27 @@ class ContainerBroker(DatabaseBroker):
|
||||
params.append(self.path)
|
||||
if conditions:
|
||||
condition = ' WHERE ' + ' AND '.join(conditions)
|
||||
if use_reported_column:
|
||||
columns = SHARD_RANGE_KEYS
|
||||
else:
|
||||
columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
|
||||
sql = '''
|
||||
SELECT %s
|
||||
FROM %s%s;
|
||||
''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition)
|
||||
''' % (', '.join(columns), SHARD_RANGE_TABLE, condition)
|
||||
data = conn.execute(sql, params)
|
||||
data.row_factory = None
|
||||
return [row for row in data]
|
||||
|
||||
try:
|
||||
with self.maybe_get(connection) as conn:
|
||||
try:
|
||||
return do_query(conn)
|
||||
except sqlite3.OperationalError as err:
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
|
||||
raise
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
return []
|
||||
if 'no such column: reported' in str(err):
|
||||
return do_query(conn, use_reported_column=False)
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def resolve_shard_range_states(cls, states):
|
||||
|
@ -618,7 +618,8 @@ class ContainerSharder(ContainerReplicator):
|
||||
|
||||
def _send_shard_ranges(self, account, container, shard_ranges,
|
||||
headers=None):
|
||||
body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
|
||||
body = json.dumps([dict(sr, reported=0)
|
||||
for sr in shard_ranges]).encode('ascii')
|
||||
part, nodes = self.ring.get_nodes(account, container)
|
||||
headers = headers or {}
|
||||
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
|
||||
@ -1469,7 +1470,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
|
||||
def _update_root_container(self, broker):
|
||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||
if not own_shard_range:
|
||||
if not own_shard_range or own_shard_range.reported:
|
||||
return
|
||||
|
||||
# persist the reported shard metadata
|
||||
@ -1479,9 +1480,12 @@ class ContainerSharder(ContainerReplicator):
|
||||
include_own=True,
|
||||
include_deleted=True)
|
||||
# send everything
|
||||
self._send_shard_ranges(
|
||||
broker.root_account, broker.root_container,
|
||||
shard_ranges)
|
||||
if self._send_shard_ranges(
|
||||
broker.root_account, broker.root_container, shard_ranges):
|
||||
# on success, mark ourselves as reported so we don't keep
|
||||
# hammering the root
|
||||
own_shard_range.reported = True
|
||||
broker.merge_shard_ranges(own_shard_range)
|
||||
|
||||
def _process_broker(self, broker, node, part):
|
||||
broker.get_info() # make sure account/container are populated
|
||||
|
@ -189,6 +189,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "meta_timestamp": "%s",' % now.internal,
|
||||
' "name": "a/c",',
|
||||
' "object_count": 0,',
|
||||
' "reported": 0,',
|
||||
' "state": "sharding",',
|
||||
' "state_timestamp": "%s",' % now.internal,
|
||||
' "timestamp": "%s",' % now.internal,
|
||||
@ -230,6 +231,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "meta_timestamp": "%s",' % now.internal,
|
||||
' "name": "a/c",',
|
||||
' "object_count": 0,',
|
||||
' "reported": 0,',
|
||||
' "state": "sharding",',
|
||||
' "state_timestamp": "%s",' % now.internal,
|
||||
' "timestamp": "%s",' % now.internal,
|
||||
|
@ -7354,7 +7354,8 @@ class TestShardRange(unittest.TestCase):
|
||||
upper='', object_count=0, bytes_used=0,
|
||||
meta_timestamp=ts_1.internal, deleted=0,
|
||||
state=utils.ShardRange.FOUND,
|
||||
state_timestamp=ts_1.internal, epoch=None)
|
||||
state_timestamp=ts_1.internal, epoch=None,
|
||||
reported=0)
|
||||
assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1),
|
||||
expect)
|
||||
assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect)
|
||||
@ -7363,11 +7364,13 @@ class TestShardRange(unittest.TestCase):
|
||||
upper='u', object_count=2, bytes_used=10,
|
||||
meta_timestamp=ts_2, deleted=0,
|
||||
state=utils.ShardRange.CREATED,
|
||||
state_timestamp=ts_3.internal, epoch=ts_4)
|
||||
state_timestamp=ts_3.internal, epoch=ts_4,
|
||||
reported=0)
|
||||
expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2,
|
||||
'bytes_used': 10, 'meta_timestamp': ts_2.internal,
|
||||
'state': utils.ShardRange.CREATED,
|
||||
'state_timestamp': ts_3.internal, 'epoch': ts_4})
|
||||
'state_timestamp': ts_3.internal, 'epoch': ts_4,
|
||||
'reported': 0})
|
||||
assert_initialisation_ok(good_run.copy(), expect)
|
||||
|
||||
# obj count and bytes used as int strings
|
||||
@ -7385,6 +7388,11 @@ class TestShardRange(unittest.TestCase):
|
||||
assert_initialisation_ok(good_deleted,
|
||||
dict(expect, deleted=1))
|
||||
|
||||
good_reported = good_run.copy()
|
||||
good_reported['reported'] = 1
|
||||
assert_initialisation_ok(good_reported,
|
||||
dict(expect, reported=1))
|
||||
|
||||
assert_initialisation_fails(dict(good_run, timestamp='water balloon'))
|
||||
|
||||
assert_initialisation_fails(
|
||||
@ -7423,7 +7431,7 @@ class TestShardRange(unittest.TestCase):
|
||||
'upper': upper, 'object_count': 10, 'bytes_used': 100,
|
||||
'meta_timestamp': ts_2.internal, 'deleted': 0,
|
||||
'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal,
|
||||
'epoch': ts_4}
|
||||
'epoch': ts_4, 'reported': 0}
|
||||
self.assertEqual(expected, sr_dict)
|
||||
self.assertIsInstance(sr_dict['lower'], six.string_types)
|
||||
self.assertIsInstance(sr_dict['upper'], six.string_types)
|
||||
@ -7438,6 +7446,14 @@ class TestShardRange(unittest.TestCase):
|
||||
for key in sr_dict:
|
||||
bad_dict = dict(sr_dict)
|
||||
bad_dict.pop(key)
|
||||
if key == 'reported':
|
||||
# This was added after the fact, and we need to be able to eat
|
||||
# data from old servers
|
||||
utils.ShardRange.from_dict(bad_dict)
|
||||
utils.ShardRange(**bad_dict)
|
||||
continue
|
||||
|
||||
# The rest were present from the beginning
|
||||
with self.assertRaises(KeyError):
|
||||
utils.ShardRange.from_dict(bad_dict)
|
||||
# But __init__ still (generally) works!
|
||||
|
@ -735,10 +735,12 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertEqual(info['put_timestamp'], start.internal)
|
||||
self.assertTrue(Timestamp(info['created_at']) >= start)
|
||||
self.assertEqual(info['delete_timestamp'], '0')
|
||||
if self.__class__ in (TestContainerBrokerBeforeMetadata,
|
||||
if self.__class__ in (
|
||||
TestContainerBrokerBeforeMetadata,
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges):
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(info['status_changed_at'],
|
||||
@ -1025,6 +1027,8 @@ class TestContainerBroker(unittest.TestCase):
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 0)
|
||||
|
||||
# Reput same event
|
||||
broker.merge_shard_ranges(
|
||||
@ -1050,6 +1054,64 @@ class TestContainerBroker(unittest.TestCase):
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 0)
|
||||
|
||||
# Mark it as reported
|
||||
broker.merge_shard_ranges(
|
||||
ShardRange('"a/{<shardrange \'&\' name>}"', timestamp,
|
||||
'low', 'up', meta_timestamp=meta_timestamp,
|
||||
reported=True))
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT name FROM shard_range").fetchone()[0],
|
||||
'"a/{<shardrange \'&\' name>}"')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT timestamp FROM shard_range").fetchone()[0],
|
||||
timestamp)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT meta_timestamp FROM shard_range").fetchone()[0],
|
||||
meta_timestamp)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT lower FROM shard_range").fetchone()[0], 'low')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT upper FROM shard_range").fetchone()[0], 'up')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT deleted FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 1)
|
||||
|
||||
# Reporting latches it
|
||||
broker.merge_shard_ranges(
|
||||
ShardRange('"a/{<shardrange \'&\' name>}"', timestamp,
|
||||
'low', 'up', meta_timestamp=meta_timestamp,
|
||||
reported=False))
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT name FROM shard_range").fetchone()[0],
|
||||
'"a/{<shardrange \'&\' name>}"')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT timestamp FROM shard_range").fetchone()[0],
|
||||
timestamp)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT meta_timestamp FROM shard_range").fetchone()[0],
|
||||
meta_timestamp)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT lower FROM shard_range").fetchone()[0], 'low')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT upper FROM shard_range").fetchone()[0], 'up')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT deleted FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 1)
|
||||
|
||||
# Put new event
|
||||
timestamp = next(self.ts).internal
|
||||
@ -1077,11 +1139,14 @@ class TestContainerBroker(unittest.TestCase):
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 1)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 2)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 0)
|
||||
|
||||
# Put old event
|
||||
broker.merge_shard_ranges(
|
||||
ShardRange('"a/{<shardrange \'&\' name>}"', old_put_timestamp,
|
||||
'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp))
|
||||
'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp,
|
||||
reported=True))
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT name FROM shard_range").fetchone()[0],
|
||||
@ -1102,6 +1167,8 @@ class TestContainerBroker(unittest.TestCase):
|
||||
"SELECT object_count FROM shard_range").fetchone()[0], 1)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT bytes_used FROM shard_range").fetchone()[0], 2)
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT reported FROM shard_range").fetchone()[0], 0)
|
||||
|
||||
# Put old delete event
|
||||
broker.merge_shard_ranges(
|
||||
@ -1978,10 +2045,12 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertEqual(info['hash'], '00000000000000000000000000000000')
|
||||
self.assertEqual(info['put_timestamp'], Timestamp(1).internal)
|
||||
self.assertEqual(info['delete_timestamp'], '0')
|
||||
if self.__class__ in (TestContainerBrokerBeforeMetadata,
|
||||
if self.__class__ in (
|
||||
TestContainerBrokerBeforeMetadata,
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges):
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(info['status_changed_at'],
|
||||
@ -3275,10 +3344,12 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertEqual(0, info['storage_policy_index']) # sanity check
|
||||
self.assertEqual(0, info['object_count'])
|
||||
self.assertEqual(0, info['bytes_used'])
|
||||
if self.__class__ in (TestContainerBrokerBeforeMetadata,
|
||||
if self.__class__ in (
|
||||
TestContainerBrokerBeforeMetadata,
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges):
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(timestamp.internal, info['status_changed_at'])
|
||||
@ -5315,6 +5386,75 @@ class TestContainerBrokerBeforeShardRanges(ContainerBrokerMigrationMixin,
|
||||
FROM shard_range''')
|
||||
|
||||
|
||||
def pre_reported_create_shard_range_table(self, conn):
|
||||
"""
|
||||
Copied from ContainerBroker before the
|
||||
reported column was added; used for testing with
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn.
|
||||
|
||||
Create a shard_range table with no 'reported' column.
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
conn.execute("""
|
||||
CREATE TABLE shard_range (
|
||||
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT,
|
||||
timestamp TEXT,
|
||||
lower TEXT,
|
||||
upper TEXT,
|
||||
object_count INTEGER DEFAULT 0,
|
||||
bytes_used INTEGER DEFAULT 0,
|
||||
meta_timestamp TEXT,
|
||||
deleted INTEGER DEFAULT 0,
|
||||
state INTEGER,
|
||||
state_timestamp TEXT,
|
||||
epoch TEXT
|
||||
);
|
||||
""")
|
||||
|
||||
conn.execute("""
|
||||
CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range
|
||||
BEGIN
|
||||
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
|
||||
END;
|
||||
""")
|
||||
|
||||
|
||||
class TestContainerBrokerBeforeShardRangeReportedColumn(
|
||||
ContainerBrokerMigrationMixin, TestContainerBroker):
|
||||
"""
|
||||
Tests for ContainerBroker against databases created
|
||||
before the shard_ranges table was added.
|
||||
"""
|
||||
# *grumble grumble* This should include container_info/policy_stat :-/
|
||||
expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
|
||||
'sqlite_sequence', 'container_stat', 'shard_range'}
|
||||
|
||||
def setUp(self):
|
||||
super(TestContainerBrokerBeforeShardRangeReportedColumn,
|
||||
self).setUp()
|
||||
ContainerBroker.create_shard_range_table = \
|
||||
pre_reported_create_shard_range_table
|
||||
|
||||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
with self.assertRaises(sqlite3.DatabaseError) as raised, \
|
||||
broker.get() as conn:
|
||||
conn.execute('''SELECT reported
|
||||
FROM shard_range''')
|
||||
self.assertIn('no such column: reported', str(raised.exception))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestContainerBrokerBeforeShardRangeReportedColumn,
|
||||
self).tearDown()
|
||||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
with broker.get() as conn:
|
||||
conn.execute('''SELECT reported
|
||||
FROM shard_range''')
|
||||
|
||||
|
||||
class TestUpdateNewItemFromExisting(unittest.TestCase):
|
||||
# TODO: add test scenarios that have swift_bytes in content_type
|
||||
t0 = '1234567890.00000'
|
||||
|
@ -4189,6 +4189,7 @@ class TestSharder(BaseTestSharder):
|
||||
def capture_send(conn, data):
|
||||
bodies.append(data)
|
||||
|
||||
self.assertFalse(broker.get_own_shard_range().reported) # sanity
|
||||
with self._mock_sharder() as sharder:
|
||||
with mocked_http_conn(204, 204, 204,
|
||||
give_send=capture_send) as mock_conn:
|
||||
@ -4198,6 +4199,7 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual('PUT', req['method'])
|
||||
self.assertEqual([expected_sent] * 3,
|
||||
[json.loads(b) for b in bodies])
|
||||
self.assertTrue(broker.get_own_shard_range().reported)
|
||||
|
||||
def test_update_root_container_own_range(self):
|
||||
broker = self._make_broker()
|
||||
@ -4230,6 +4232,32 @@ class TestSharder(BaseTestSharder):
|
||||
with annotate_failure(state):
|
||||
check_only_own_shard_range_sent(state)
|
||||
|
||||
def test_update_root_container_already_reported(self):
|
||||
broker = self._make_broker()
|
||||
|
||||
def check_already_reported_not_sent(state):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
|
||||
own_shard_range.reported = True
|
||||
self.assertTrue(own_shard_range.update_state(
|
||||
state, state_timestamp=next(self.ts_iter)))
|
||||
# Check that updating state clears the flag
|
||||
self.assertFalse(own_shard_range.reported)
|
||||
|
||||
# If we claim to have already updated...
|
||||
own_shard_range.reported = True
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
|
||||
# ... then there's nothing to send
|
||||
with self._mock_sharder() as sharder:
|
||||
with mocked_http_conn() as mock_conn:
|
||||
sharder._update_root_container(broker)
|
||||
self.assertFalse(mock_conn.requests)
|
||||
|
||||
for state in ShardRange.STATES:
|
||||
with annotate_failure(state):
|
||||
check_already_reported_not_sent(state)
|
||||
|
||||
def test_update_root_container_all_ranges(self):
|
||||
broker = self._make_broker()
|
||||
other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))
|
||||
|
Loading…
Reference in New Issue
Block a user