updaters: use replication network

In general, background processes ought to use the replication network;
this reserves the "normal" network for work that's in the client-request
path, including things like inline container updates.

Change-Id: I0d778aac68b101cdcb077d70c250a56a199175a2
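
A minimal sketch of the pattern this change applies (a hypothetical helper, not
code from this commit), assuming Swift's usual ring behavior where
'replication_ip'/'replication_port' are filled in from 'ip'/'port' when no
dedicated replication network is configured:

    # Background daemons connect to the replication endpoint of a ring device;
    # the client-facing 'ip'/'port' stay reserved for work in the client-request
    # path.
    def update_endpoint(node):
        return node['replication_ip'], node['replication_port']

    node = {'id': 0, 'device': 'sda1',
            'ip': '203.0.113.10', 'port': 6200,          # client-request path
            'replication_ip': '198.51.100.10',           # replication network
            'replication_port': 6200}
    assert update_endpoint(node) == ('198.51.100.10', 6200)
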
Authored by Tim Burke on 2021-12-02 15:34:43 -08:00; committed by Alistair Coles
parent 1859f2e161
commit eec595b123
5 changed files with 67 additions and 37 deletions


@@ -338,12 +338,13 @@ class ContainerUpdater(Daemon):
                     'X-Backend-Storage-Policy-Index': storage_policy_index,
                     'user-agent': self.user_agent}
                 conn = http_connect(
-                    node['ip'], node['port'], node['device'], part,
-                    'PUT', container, headers=headers)
+                    node['replication_ip'], node['replication_port'],
+                    node['device'], part, 'PUT', container, headers=headers)
             except (Exception, Timeout):
                 self.logger.exception(
                     'ERROR account update failed with '
-                    '%(ip)s:%(port)s/%(device)s (will retry later): ', node)
+                    '%(replication_ip)s:%(replication_port)s/%(device)s '
+                    '(will retry later): ', node)
                 return HTTP_INTERNAL_SERVER_ERROR
         with Timeout(self.node_timeout):
             try:
@@ -353,7 +354,9 @@ class ContainerUpdater(Daemon):
             except (Exception, Timeout):
                 if self.logger.getEffectiveLevel() <= logging.DEBUG:
                     self.logger.exception(
-                        'Exception with %(ip)s:%(port)s/%(device)s', node)
+                        'Exception with '
+                        '%(replication_ip)s:%(replication_port)s/%(device)s',
+                        node)
                 return HTTP_INTERNAL_SERVER_ERROR
             finally:
                 conn.close()


@@ -468,8 +468,9 @@ class ObjectUpdater(Daemon):
         redirect = None
         try:
             with ConnectionTimeout(self.conn_timeout):
-                conn = http_connect(node['ip'], node['port'], node['device'],
-                                    part, op, obj, headers_out)
+                conn = http_connect(
+                    node['replication_ip'], node['replication_port'],
+                    node['device'], part, op, obj, headers_out)
             with Timeout(self.node_timeout):
                 resp = conn.getresponse()
                 resp.read()
@@ -487,10 +488,12 @@ class ObjectUpdater(Daemon):
                 self.logger.debug(
                     'Error code %(status)d is returned from remote '
                     'server %(ip)s: %(port)s / %(device)s',
-                    {'status': resp.status, 'ip': node['ip'],
-                     'port': node['port'], 'device': node['device']})
+                    {'status': resp.status, 'ip': node['replication_ip'],
+                     'port': node['replication_port'],
+                     'device': node['device']})
             return success, node['id'], redirect
         except (Exception, Timeout):
-            self.logger.exception('ERROR with remote server '
-                                  '%(ip)s:%(port)s/%(device)s', node)
+            self.logger.exception(
+                'ERROR with remote server '
+                '%(replication_ip)s:%(replication_port)s/%(device)s', node)
         return HTTP_INTERNAL_SERVER_ERROR, node['id'], redirect


@@ -48,9 +48,15 @@ class TestContainerUpdater(unittest.TestCase):
         with closing(GzipFile(ring_file, 'wb')) as f:
             pickle.dump(
                 RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
-                         [{'id': 0, 'ip': '127.0.0.1', 'port': 12345,
+                         [{'id': 0, 'ip': '127.0.0.2', 'port': 12345,
+                           'replication_ip': '127.0.0.1',
+                           # replication_port may be overridden in tests but
+                           # include here for completeness...
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 0},
-                          {'id': 1, 'ip': '127.0.0.1', 'port': 12345,
+                          {'id': 1, 'ip': '127.0.0.2', 'port': 12345,
+                           'replication_ip': '127.0.0.1',
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 2}], 30),
                 f)
         self.devices_dir = os.path.join(self.testdir, 'devices')
@@ -264,12 +270,13 @@ class TestContainerUpdater(unittest.TestCase):
         spawned = spawn(spawn_accepts)
         for dev in cu.get_account_ring().devs:
             if dev is not None:
-                dev['port'] = bindsock.getsockname()[1]
+                dev['replication_port'] = bindsock.getsockname()[1]
         cu.run_once()
-        for event in spawned.wait():
-            err = event.wait()
-            if err:
-                raise err
+        with Timeout(5):
+            for event in spawned.wait():
+                err = event.wait()
+                if err:
+                    raise err
         info = cb.get_info()
         self.assertEqual(info['object_count'], 1)
         self.assertEqual(info['bytes_used'], 3)
@@ -338,12 +345,13 @@ class TestContainerUpdater(unittest.TestCase):
         spawned = spawn(spawn_accepts)
         for dev in cu.get_account_ring().devs:
             if dev is not None:
-                dev['port'] = bindsock.getsockname()[1]
+                dev['replication_port'] = bindsock.getsockname()[1]
         cu.run_once()
-        for event in spawned.wait():
-            err = event.wait()
-            if err:
-                raise err
+        with Timeout(5):
+            for event in spawned.wait():
+                err = event.wait()
+                if err:
+                    raise err
         info = cb.get_info()
         self.assertEqual(info['object_count'], 1)
         self.assertEqual(info['bytes_used'], 3)
@@ -425,12 +433,13 @@ class TestContainerUpdater(unittest.TestCase):
         spawned = spawn(spawn_accepts)
         for dev in cu.get_account_ring().devs:
             if dev is not None:
-                dev['port'] = bindsock.getsockname()[1]
+                dev['replication_port'] = bindsock.getsockname()[1]
         cu.run_once()
-        for event in spawned.wait():
-            err = event.wait()
-            if err:
-                raise err
+        with Timeout(5):
+            for event in spawned.wait():
+                err = event.wait()
+                if err:
+                    raise err
         info = cb.get_info()
         self.assertEqual(info['object_count'], 1)
         self.assertEqual(info['bytes_used'], 3)
@@ -514,12 +523,13 @@ class TestContainerUpdater(unittest.TestCase):
         spawned = spawn(spawn_accepts)
         for dev in cu.get_account_ring().devs:
             if dev is not None:
-                dev['port'] = bindsock.getsockname()[1]
+                dev['replication_port'] = bindsock.getsockname()[1]
         cu.run_once()
-        for event in spawned.wait():
-            err = event.wait()
-            if err:
-                raise err
+        with Timeout(5):
+            for event in spawned.wait():
+                err = event.wait()
+                if err:
+                    raise err
         info = cb.get_info()
         self.assertEqual(info['object_count'], 1)
         self.assertEqual(info['bytes_used'], 3)


@@ -1188,6 +1188,8 @@ class TestObjectController(unittest.TestCase):
             {'devices': self.testdir,
              'mount_check': 'false'}, logger=debug_logger())
         node = {'id': 1, 'ip': 'chost', 'port': 3200,
+                'replication_ip': 'chost_repl',
+                'replication_port': 6200,
                 'device': 'cdevice'}
         mock_ring = mock.MagicMock()
         mock_ring.get_nodes.return_value = (99, [node])
@@ -1197,6 +1199,8 @@ class TestObjectController(unittest.TestCase):
         self.assertEqual(1, len(conn.requests))
         self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o',
                          conn.requests[0]['path'])
+        self.assertEqual(6200, conn.requests[0]['port'])
+        self.assertEqual('chost_repl', conn.requests[0]['ip'])

     def test_PUT_redirected_async_pending(self):
         self._check_PUT_redirected_async_pending()


@@ -78,13 +78,23 @@ class TestObjectUpdater(unittest.TestCase):
                 RingData([[0, 1, 2, 0, 1, 2],
                           [1, 2, 0, 1, 2, 0],
                           [2, 3, 1, 2, 3, 1]],
-                         [{'id': 0, 'ip': '127.0.0.1', 'port': 1,
+                         [{'id': 0, 'ip': '127.0.0.2', 'port': 1,
+                           'replication_ip': '127.0.0.1',
+                           # replication_port may be overridden in tests but
+                           # include here for completeness...
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 0},
-                          {'id': 1, 'ip': '127.0.0.1', 'port': 1,
+                          {'id': 1, 'ip': '127.0.0.2', 'port': 1,
+                           'replication_ip': '127.0.0.1',
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 2},
-                          {'id': 2, 'ip': '127.0.0.1', 'port': 1,
+                          {'id': 2, 'ip': '127.0.0.2', 'port': 1,
+                           'replication_ip': '127.0.0.1',
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 4},
-                          {'id': 3, 'ip': '127.0.0.1', 'port': 1,
+                          {'id': 3, 'ip': '127.0.0.2', 'port': 1,
+                           'replication_ip': '127.0.0.1',
+                           'replication_port': 67890,
                            'device': 'sda1', 'zone': 6}], 30),
                 f)
         self.devices_dir = os.path.join(self.testdir, 'devices')
@@ -535,7 +545,7 @@ class TestObjectUpdater(unittest.TestCase):
         event = spawn(accept, [201, 500, 500])
         for dev in ou.get_container_ring().devs:
             if dev is not None:
-                dev['port'] = bindsock.getsockname()[1]
+                dev['replication_port'] = bindsock.getsockname()[1]
         ou.logger._clear()
         ou.run_once()