use replication_ip in ssync

Update ssync_sender to use replication_ip and replication_port from the ring.

Those attributes are supposed to allow for a separate replication network, and
are used by rsync replication.

Change-Id: Ib4cc3cbc1503b85dfdfa0edab58a49c95eac5993
This commit is contained in:
Michael Barton 2014-10-16 01:56:48 +00:00
parent 6986f7272c
commit 556568b1c3
2 changed files with 22 additions and 12 deletions

View File

@ -63,8 +63,8 @@ class Sender(object):
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
'%s:%s/%s/%s %s', self.node.get('ip'),
self.node.get('port'), self.node.get('device'),
'%s:%s/%s/%s %s', self.node.get('replication_ip'),
self.node.get('replication_port'), self.node.get('device'),
self.job.get('partition'), err)
except Exception:
# We don't want any exceptions to escape our code and possibly
@ -73,7 +73,8 @@ class Sender(object):
# no such thing.
self.daemon.logger.exception(
'%s:%s/%s/%s EXCEPTION in replication.Sender',
self.node.get('ip'), self.node.get('port'),
self.node.get('replication_ip'),
self.node.get('replication_port'),
self.node.get('device'), self.job.get('partition'))
except Exception:
# We don't want any exceptions to escape our code and possibly
@ -94,7 +95,8 @@ class Sender(object):
with exceptions.MessageTimeout(
self.daemon.conn_timeout, 'connect send'):
self.connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['ip'], self.node['port']))
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
self.connection.putrequest('REPLICATION', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')

View File

@ -132,7 +132,8 @@ class TestSender(unittest.TestCase):
raise exc
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -148,7 +149,8 @@ class TestSender(unittest.TestCase):
raise exceptions.ReplicationException('test connect')
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -159,7 +161,8 @@ class TestSender(unittest.TestCase):
self.assertEqual(str(call[1][-1]), 'test connect')
def test_call_catches_other_exceptions(self):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -172,7 +175,8 @@ class TestSender(unittest.TestCase):
'sda1', '9'))
def test_call_catches_exception_handling_exception(self):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = None # Will cause inside exception handler to fail
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -208,7 +212,8 @@ class TestSender(unittest.TestCase):
@patch_policies
def test_connect(self):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy_idx=1)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -240,7 +245,8 @@ class TestSender(unittest.TestCase):
def test_connect_send_timeout(self):
self.replicator.conn_timeout = 0.01
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -259,7 +265,8 @@ class TestSender(unittest.TestCase):
def test_connect_receive_timeout(self):
self.replicator.node_timeout = 0.02
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
@ -280,7 +287,8 @@ class TestSender(unittest.TestCase):
def test_connect_bad_status(self):
self.replicator.node_timeout = 0.02
node = dict(ip='1.2.3.4', port=5678, device='sda1')
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']