From eec595b123612a58e26116c7d57c0afcde006429 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Thu, 2 Dec 2021 15:34:43 -0800 Subject: [PATCH] updaters: use replication network In general, background processes ought to use the replication network; this reserves the "normal" network for work that's in the client-request path, including things like inline container updates. Change-Id: I0d778aac68b101cdcb077d70c250a56a199175a2 --- swift/container/updater.py | 11 +++--- swift/obj/updater.py | 15 ++++---- test/unit/container/test_updater.py | 54 +++++++++++++++++------------ test/unit/obj/test_server.py | 4 +++ test/unit/obj/test_updater.py | 20 ++++++++--- 5 files changed, 67 insertions(+), 37 deletions(-) diff --git a/swift/container/updater.py b/swift/container/updater.py index 66375754bd..a22bf0b716 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -338,12 +338,13 @@ class ContainerUpdater(Daemon): 'X-Backend-Storage-Policy-Index': storage_policy_index, 'user-agent': self.user_agent} conn = http_connect( - node['ip'], node['port'], node['device'], part, - 'PUT', container, headers=headers) + node['replication_ip'], node['replication_port'], + node['device'], part, 'PUT', container, headers=headers) except (Exception, Timeout): self.logger.exception( 'ERROR account update failed with ' - '%(ip)s:%(port)s/%(device)s (will retry later): ', node) + '%(replication_ip)s:%(replication_port)s/%(device)s ' + '(will retry later): ', node) return HTTP_INTERNAL_SERVER_ERROR with Timeout(self.node_timeout): try: @@ -353,7 +354,9 @@ class ContainerUpdater(Daemon): except (Exception, Timeout): if self.logger.getEffectiveLevel() <= logging.DEBUG: self.logger.exception( - 'Exception with %(ip)s:%(port)s/%(device)s', node) + 'Exception with ' + '%(replication_ip)s:%(replication_port)s/%(device)s', + node) return HTTP_INTERNAL_SERVER_ERROR finally: conn.close() diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 6f22a97dec..d49f12f76f 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -468,8 +468,9 @@ class ObjectUpdater(Daemon): redirect = None try: with ConnectionTimeout(self.conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], - part, op, obj, headers_out) + conn = http_connect( + node['replication_ip'], node['replication_port'], + node['device'], part, op, obj, headers_out) with Timeout(self.node_timeout): resp = conn.getresponse() resp.read() @@ -487,10 +488,12 @@ class ObjectUpdater(Daemon): self.logger.debug( 'Error code %(status)d is returned from remote ' 'server %(ip)s: %(port)s / %(device)s', - {'status': resp.status, 'ip': node['ip'], - 'port': node['port'], 'device': node['device']}) + {'status': resp.status, 'ip': node['replication_ip'], + 'port': node['replication_port'], + 'device': node['device']}) return success, node['id'], redirect except (Exception, Timeout): - self.logger.exception('ERROR with remote server ' - '%(ip)s:%(port)s/%(device)s', node) + self.logger.exception( + 'ERROR with remote server ' + '%(replication_ip)s:%(replication_port)s/%(device)s', node) return HTTP_INTERNAL_SERVER_ERROR, node['id'], redirect diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py index dd13c23c4b..177477c713 100644 --- a/test/unit/container/test_updater.py +++ b/test/unit/container/test_updater.py @@ -48,9 +48,15 @@ class TestContainerUpdater(unittest.TestCase): with closing(GzipFile(ring_file, 'wb')) as f: pickle.dump( RingData([[0, 1, 0, 1], [1, 0, 1, 0]], - [{'id': 0, 'ip': '127.0.0.1', 'port': 12345, + [{'id': 0, 'ip': '127.0.0.2', 'port': 12345, + 'replication_ip': '127.0.0.1', + # replication_port may be overridden in tests but + # include here for completeness... + 'replication_port': 67890, 'device': 'sda1', 'zone': 0}, - {'id': 1, 'ip': '127.0.0.1', 'port': 12345, + {'id': 1, 'ip': '127.0.0.2', 'port': 12345, + 'replication_ip': '127.0.0.1', + 'replication_port': 67890, 'device': 'sda1', 'zone': 2}], 30), f) self.devices_dir = os.path.join(self.testdir, 'devices') @@ -264,12 +270,13 @@ class TestContainerUpdater(unittest.TestCase): spawned = spawn(spawn_accepts) for dev in cu.get_account_ring().devs: if dev is not None: - dev['port'] = bindsock.getsockname()[1] + dev['replication_port'] = bindsock.getsockname()[1] cu.run_once() - for event in spawned.wait(): - err = event.wait() - if err: - raise err + with Timeout(5): + for event in spawned.wait(): + err = event.wait() + if err: + raise err info = cb.get_info() self.assertEqual(info['object_count'], 1) self.assertEqual(info['bytes_used'], 3) @@ -338,12 +345,13 @@ class TestContainerUpdater(unittest.TestCase): spawned = spawn(spawn_accepts) for dev in cu.get_account_ring().devs: if dev is not None: - dev['port'] = bindsock.getsockname()[1] + dev['replication_port'] = bindsock.getsockname()[1] cu.run_once() - for event in spawned.wait(): - err = event.wait() - if err: - raise err + with Timeout(5): + for event in spawned.wait(): + err = event.wait() + if err: + raise err info = cb.get_info() self.assertEqual(info['object_count'], 1) self.assertEqual(info['bytes_used'], 3) @@ -425,12 +433,13 @@ class TestContainerUpdater(unittest.TestCase): spawned = spawn(spawn_accepts) for dev in cu.get_account_ring().devs: if dev is not None: - dev['port'] = bindsock.getsockname()[1] + dev['replication_port'] = bindsock.getsockname()[1] cu.run_once() - for event in spawned.wait(): - err = event.wait() - if err: - raise err + with Timeout(5): + for event in spawned.wait(): + err = event.wait() + if err: + raise err info = cb.get_info() self.assertEqual(info['object_count'], 1) self.assertEqual(info['bytes_used'], 3) @@ -514,12 +523,13 @@ class TestContainerUpdater(unittest.TestCase): spawned = spawn(spawn_accepts) for dev in cu.get_account_ring().devs: if dev is not None: - dev['port'] = bindsock.getsockname()[1] + dev['replication_port'] = bindsock.getsockname()[1] cu.run_once() - for event in spawned.wait(): - err = event.wait() - if err: - raise err + with Timeout(5): + for event in spawned.wait(): + err = event.wait() + if err: + raise err info = cb.get_info() self.assertEqual(info['object_count'], 1) self.assertEqual(info['bytes_used'], 3) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 70040ee6d8..28d88e9a61 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -1188,6 +1188,8 @@ class TestObjectController(unittest.TestCase): {'devices': self.testdir, 'mount_check': 'false'}, logger=debug_logger()) node = {'id': 1, 'ip': 'chost', 'port': 3200, + 'replication_ip': 'chost_repl', + 'replication_port': 6200, 'device': 'cdevice'} mock_ring = mock.MagicMock() mock_ring.get_nodes.return_value = (99, [node]) @@ -1197,6 +1199,8 @@ class TestObjectController(unittest.TestCase): self.assertEqual(1, len(conn.requests)) self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o', conn.requests[0]['path']) + self.assertEqual(6200, conn.requests[0]['port']) + self.assertEqual('chost_repl', conn.requests[0]['ip']) def test_PUT_redirected_async_pending(self): self._check_PUT_redirected_async_pending() diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 4a6aeb18c1..1ba4c8d6ed 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -78,13 +78,23 @@ class TestObjectUpdater(unittest.TestCase): RingData([[0, 1, 2, 0, 1, 2], [1, 2, 0, 1, 2, 0], [2, 3, 1, 2, 3, 1]], - [{'id': 0, 'ip': '127.0.0.1', 'port': 1, + [{'id': 0, 'ip': '127.0.0.2', 'port': 1, + 'replication_ip': '127.0.0.1', + # replication_port may be overridden in tests but + # include here for completeness... + 'replication_port': 67890, 'device': 'sda1', 'zone': 0}, - {'id': 1, 'ip': '127.0.0.1', 'port': 1, + {'id': 1, 'ip': '127.0.0.2', 'port': 1, + 'replication_ip': '127.0.0.1', + 'replication_port': 67890, 'device': 'sda1', 'zone': 2}, - {'id': 2, 'ip': '127.0.0.1', 'port': 1, + {'id': 2, 'ip': '127.0.0.2', 'port': 1, + 'replication_ip': '127.0.0.1', + 'replication_port': 67890, 'device': 'sda1', 'zone': 4}, - {'id': 3, 'ip': '127.0.0.1', 'port': 1, + {'id': 3, 'ip': '127.0.0.2', 'port': 1, + 'replication_ip': '127.0.0.1', + 'replication_port': 67890, 'device': 'sda1', 'zone': 6}], 30), f) self.devices_dir = os.path.join(self.testdir, 'devices') @@ -535,7 +545,7 @@ class TestObjectUpdater(unittest.TestCase): event = spawn(accept, [201, 500, 500]) for dev in ou.get_container_ring().devs: if dev is not None: - dev['port'] = bindsock.getsockname()[1] + dev['replication_port'] = bindsock.getsockname()[1] ou.logger._clear() ou.run_once()