4a13dcc4a8
The current rule inside the db_replicator is to rsync+merge containers during replication if the difference between rowids differ by more than 50%: # if the difference in rowids between the two differs by # more than 50%, rsync then do a remote merge. if rinfo['max_row'] / float(info['max_row']) < 0.5: This mean on smaller containers, that only have few rows, and differ by a small number still rsync+merge rather then copying rows. This change adds a new condition, the difference in the rowids must be greater than the defined per_diff otherwise usync will be used: # if the difference in rowids between the two differs by # more than 50% and the difference is greater than per_diff, # rsync then do a remote merge. # NOTE: difference > per_diff stops us from dropping to rsync # on smaller containers, who have only a few rows to sync. if rinfo['max_row'] / float(info['max_row']) < 0.5 and \ info['max_row'] - rinfo['max_row'] > self.per_diff: Change-Id: I9e779f71bf37714919a525404565dd075762b0d4 Closes-bug: #1019712
146 lines
5.6 KiB
Python
146 lines
5.6 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import time
|
|
import unittest
|
|
import shutil
|
|
|
|
from swift.account import replicator, backend, server
|
|
from swift.common.utils import normalize_timestamp
|
|
from swift.common.storage_policy import POLICIES
|
|
|
|
from test.unit.common import test_db_replicator
|
|
|
|
|
|
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
|
|
|
backend = backend.AccountBroker
|
|
datadir = server.DATADIR
|
|
replicator_daemon = replicator.AccountReplicator
|
|
|
|
def test_sync(self):
|
|
broker = self._get_broker('a', node_index=0)
|
|
put_timestamp = normalize_timestamp(time.time())
|
|
broker.initialize(put_timestamp)
|
|
# "replicate" to same database
|
|
daemon = replicator.AccountReplicator({})
|
|
part, node = self._get_broker_part_node(broker)
|
|
info = broker.get_replication_info()
|
|
success = daemon._repl_to_node(node, broker, part, info)
|
|
# nothing to do
|
|
self.assertTrue(success)
|
|
self.assertEqual(1, daemon.stats['no_change'])
|
|
|
|
def test_sync_remote_missing(self):
|
|
broker = self._get_broker('a', node_index=0)
|
|
put_timestamp = time.time()
|
|
broker.initialize(put_timestamp)
|
|
# "replicate" to all other nodes
|
|
part, node = self._get_broker_part_node(broker)
|
|
daemon = self._run_once(node)
|
|
# complete rsync
|
|
self.assertEqual(2, daemon.stats['rsync'])
|
|
local_info = self._get_broker(
|
|
'a', node_index=0).get_info()
|
|
for i in range(1, 3):
|
|
remote_broker = self._get_broker('a', node_index=i)
|
|
self.assertTrue(os.path.exists(remote_broker.db_file))
|
|
remote_info = remote_broker.get_info()
|
|
for k, v in local_info.items():
|
|
if k == 'id':
|
|
continue
|
|
self.assertEqual(remote_info[k], v,
|
|
"mismatch remote %s %r != %r" % (
|
|
k, remote_info[k], v))
|
|
|
|
def test_sync_remote_missing_most_rows(self):
|
|
put_timestamp = time.time()
|
|
# create "local" broker
|
|
broker = self._get_broker('a', node_index=0)
|
|
broker.initialize(put_timestamp)
|
|
# create "remote" broker
|
|
remote_broker = self._get_broker('a', node_index=1)
|
|
remote_broker.initialize(put_timestamp)
|
|
# add a row to "local" db
|
|
broker.put_container('/a/c', time.time(), 0, 0, 0,
|
|
POLICIES.default.idx)
|
|
# replicate
|
|
daemon = replicator.AccountReplicator({'per_diff': 1})
|
|
|
|
def _rsync_file(db_file, remote_file, **kwargs):
|
|
remote_server, remote_path = remote_file.split('/', 1)
|
|
dest_path = os.path.join(self.root, remote_path)
|
|
shutil.copy(db_file, dest_path)
|
|
return True
|
|
daemon._rsync_file = _rsync_file
|
|
part, node = self._get_broker_part_node(remote_broker)
|
|
info = broker.get_replication_info()
|
|
success = daemon._repl_to_node(node, broker, part, info)
|
|
self.assertTrue(success)
|
|
# row merge
|
|
self.assertEqual(1, daemon.stats['remote_merge'])
|
|
local_info = self._get_broker(
|
|
'a', node_index=0).get_info()
|
|
remote_info = self._get_broker(
|
|
'a', node_index=1).get_info()
|
|
for k, v in local_info.items():
|
|
if k == 'id':
|
|
continue
|
|
self.assertEqual(remote_info[k], v,
|
|
"mismatch remote %s %r != %r" % (
|
|
k, remote_info[k], v))
|
|
|
|
def test_sync_remote_missing_one_rows(self):
|
|
put_timestamp = time.time()
|
|
# create "local" broker
|
|
broker = self._get_broker('a', node_index=0)
|
|
broker.initialize(put_timestamp)
|
|
# create "remote" broker
|
|
remote_broker = self._get_broker('a', node_index=1)
|
|
remote_broker.initialize(put_timestamp)
|
|
# add some rows to both db
|
|
for i in range(10):
|
|
put_timestamp = time.time()
|
|
for db in (broker, remote_broker):
|
|
path = '/a/c_%s' % i
|
|
db.put_container(path, put_timestamp, 0, 0, 0,
|
|
POLICIES.default.idx)
|
|
# now a row to the "local" broker only
|
|
broker.put_container('/a/c_missing', time.time(), 0, 0, 0,
|
|
POLICIES.default.idx)
|
|
# replicate
|
|
daemon = replicator.AccountReplicator({})
|
|
part, node = self._get_broker_part_node(remote_broker)
|
|
info = broker.get_replication_info()
|
|
success = daemon._repl_to_node(node, broker, part, info)
|
|
self.assertTrue(success)
|
|
# row merge
|
|
self.assertEqual(1, daemon.stats['diff'])
|
|
local_info = self._get_broker(
|
|
'a', node_index=0).get_info()
|
|
remote_info = self._get_broker(
|
|
'a', node_index=1).get_info()
|
|
for k, v in local_info.items():
|
|
if k == 'id':
|
|
continue
|
|
self.assertEqual(remote_info[k], v,
|
|
"mismatch remote %s %r != %r" % (
|
|
k, remote_info[k], v))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|