From c96d5c671db9c96f65067d93c0a307cf4b7d91b4 Mon Sep 17 00:00:00 2001 From: oshritf Date: Thu, 18 Feb 2016 14:50:08 +0200 Subject: [PATCH] Per container stat. report In addition to the container sync stat. report, keeping per container statistics allows administrator with more control over bytes transfered over a specific time per user account: The per container stats are crucial for billing purposes and provides the operator a 'progress bar' equivalent on the container's replication status. Change-Id: Ia8abcdaf53e466e8d60a957c76e32c2b2c5dc3fa --- doc/source/overview_container_sync.rst | 44 ++++++++ swift/container/sync.py | 142 +++++++++++++++++-------- test/unit/container/test_sync.py | 94 ++++++++++++++-- 3 files changed, 226 insertions(+), 54 deletions(-) diff --git a/doc/source/overview_container_sync.rst b/doc/source/overview_container_sync.rst index 25772bdf1e..e69ec2743e 100644 --- a/doc/source/overview_container_sync.rst +++ b/doc/source/overview_container_sync.rst @@ -121,6 +121,50 @@ should be noted there is no way for an end user to detect sync progress or problems other than HEADing both containers and comparing the overall information. + + +----------------------------- +Container Sync Statistics +----------------------------- + +Container Sync INFO level logs contains activity metrics and accounting +information foe insightful tracking. +Currently two different statistics are collected: + +About once an hour or so, accumulated statistics of all operations performed +by Container Sync are reported to the log file with the following format: +"Since (time): (sync) synced [(delete) deletes, (put) puts], (skip) skipped, +(fail) failed" +time: last report time +sync: number of containers with sync turned on that were successfully synced +delete: number of successful DELETE object requests to the target cluster +put: number of successful PUT object request to the target cluster +skip: number of containers whose sync has been turned off, but are not +yet cleared from the sync store +fail: number of containers with failure (due to exception, timeout or other +reason) + +For each container synced, per container statistics are reported with the +following format: +Container sync report: (container), time window start: (start), time window +end: %(end), puts: (puts), posts: (posts), deletes: (deletes), bytes: (bytes), +sync_point1: (point1), sync_point2: (point2), total_rows: (total) +container: account/container statistics are for +start: report start time +end: report end time +puts: number of successful PUT object requests to the target container +posts: N/A (0) +deletes: number of successful DELETE object requests to the target container +bytes: number of bytes sent over the network to the target container +point1: progress indication - the container's x_container_sync_point1 +point2: progress indication - the container's x_container_sync_point2 +total: number of objects processed at the container + +it is possible that more than one server syncs a container, therefore logfiles +from all servers need to be evaluated + + + ---------------------------------------------------------- Using the ``swift`` tool to set up synchronized containers ---------------------------------------------------------- diff --git a/swift/container/sync.py b/swift/container/sync.py index f72bc4838f..2ff4bff5c4 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import errno import os import uuid @@ -198,6 +199,14 @@ class ContainerSync(Daemon): self.container_skips = 0 #: Number of containers that had a failure of some type. self.container_failures = 0 + + #: Per container stats. These are collected per container. + #: puts - the number of puts that were done for the container + #: deletes - the number of deletes that were fot the container + #: bytes - the total number of bytes transferred per the container + self.container_stats = collections.defaultdict(int) + self.container_stats.clear() + #: Time of last stats report. self.reported = time() self.swift_dir = conf.get('swift_dir', '/etc/swift') @@ -239,6 +248,7 @@ class ContainerSync(Daemon): while True: begin = time() for path in self.sync_store.synced_containers_generator(): + self.container_stats.clear() self.container_sync(path) if time() - self.reported >= 3600: # once an hour self.report() @@ -282,6 +292,30 @@ class ContainerSync(Daemon): self.container_skips = 0 self.container_failures = 0 + def container_report(self, start, end, sync_point1, sync_point2, info, + max_row): + self.logger.info(_('Container sync report: %(container)s, ' + 'time window start: %(start)s, ' + 'time window end: %(end)s, ' + 'puts: %(puts)s, ' + 'posts: %(posts)s, ' + 'deletes: %(deletes)s, ' + 'bytes: %(bytes)s, ' + 'sync_point1: %(point1)s, ' + 'sync_point2: %(point2)s, ' + 'total_rows: %(total)s'), + {'container': '%s/%s' % (info['account'], + info['container']), + 'start': start, + 'end': end, + 'puts': self.container_stats['puts'], + 'posts': 0, + 'deletes': self.container_stats['deletes'], + 'bytes': self.container_stats['bytes'], + 'point1': sync_point1, + 'point2': sync_point2, + 'total': max_row}) + def container_sync(self, path): """ Checks the given path for a container database, determines if syncing @@ -339,51 +373,68 @@ class ContainerSync(Daemon): self.container_failures += 1 self.logger.increment('failures') return - stop_at = time() + self.container_time + start_at = time() + stop_at = start_at + self.container_time next_sync_point = None - while time() < stop_at and sync_point2 < sync_point1: - rows = broker.get_items_since(sync_point2, 1) - if not rows: - break - row = rows[0] - if row['ROWID'] > sync_point1: - break - # This node will only initially sync out one third of the - # objects (if 3 replicas, 1/4 if 4, etc.) and will skip - # problematic rows as needed in case of faults. - # This section will attempt to sync previously skipped - # rows in case the previous attempts by any of the nodes - # didn't succeed. - if not self.container_sync_row( - row, sync_to, user_key, broker, info, realm, - realm_key): - if not next_sync_point: - next_sync_point = sync_point2 - sync_point2 = row['ROWID'] - broker.set_x_container_sync_points(None, sync_point2) - if next_sync_point: - broker.set_x_container_sync_points(None, next_sync_point) - while time() < stop_at: - rows = broker.get_items_since(sync_point1, 1) - if not rows: - break - row = rows[0] - key = hash_path(info['account'], info['container'], - row['name'], raw_digest=True) - # This node will only initially sync out one third of the - # objects (if 3 replicas, 1/4 if 4, etc.). It'll come back - # around to the section above and attempt to sync - # previously skipped rows in case the other nodes didn't - # succeed or in case it failed to do so the first time. - if unpack_from('>I', key)[0] % \ - len(nodes) == ordinal: - self.container_sync_row( - row, sync_to, user_key, broker, info, realm, - realm_key) - sync_point1 = row['ROWID'] - broker.set_x_container_sync_points(sync_point1, None) - self.container_syncs += 1 - self.logger.increment('syncs') + sync_stage_time = start_at + try: + while time() < stop_at and sync_point2 < sync_point1: + rows = broker.get_items_since(sync_point2, 1) + if not rows: + break + row = rows[0] + if row['ROWID'] > sync_point1: + break + # This node will only initially sync out one third + # of the objects (if 3 replicas, 1/4 if 4, etc.) + # and will skip problematic rows as needed in case of + # faults. + # This section will attempt to sync previously skipped + # rows in case the previous attempts by any of the + # nodes didn't succeed. + if not self.container_sync_row( + row, sync_to, user_key, broker, info, realm, + realm_key): + if not next_sync_point: + next_sync_point = sync_point2 + sync_point2 = row['ROWID'] + broker.set_x_container_sync_points(None, sync_point2) + if next_sync_point: + broker.set_x_container_sync_points(None, + next_sync_point) + else: + next_sync_point = sync_point2 + sync_stage_time = time() + while sync_stage_time < stop_at: + rows = broker.get_items_since(sync_point1, 1) + if not rows: + break + row = rows[0] + key = hash_path(info['account'], info['container'], + row['name'], raw_digest=True) + # This node will only initially sync out one third of + # the objects (if 3 replicas, 1/4 if 4, etc.). + # It'll come back around to the section above + # and attempt to sync previously skipped rows in case + # the other nodes didn't succeed or in case it failed + # to do so the first time. + if unpack_from('>I', key)[0] % \ + len(nodes) == ordinal: + self.container_sync_row( + row, sync_to, user_key, broker, info, realm, + realm_key) + sync_point1 = row['ROWID'] + broker.set_x_container_sync_points(sync_point1, None) + sync_stage_time = time() + self.container_syncs += 1 + self.logger.increment('syncs') + except Exception as ex: + raise ex + finally: + self.container_report(start_at, sync_stage_time, + sync_point1, + next_sync_point, + info, broker.get_max_row()) except (Exception, Timeout): self.container_failures += 1 self.logger.increment('failures') @@ -506,6 +557,7 @@ class ContainerSync(Daemon): if err.http_status != HTTP_NOT_FOUND: raise self.container_deletes += 1 + self.container_stats['deletes'] += 1 self.logger.increment('deletes') self.logger.timing_since('deletes.timing', start_time) else: @@ -556,6 +608,8 @@ class ContainerSync(Daemon): proxy=self.select_http_proxy(), logger=self.logger, timeout=self.conn_timeout) self.container_puts += 1 + self.container_stats['puts'] += 1 + self.container_stats['bytes'] += row['size'] self.logger.increment('puts') self.logger.timing_since('puts.timing', start_time) except ClientException as err: diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index ef4a4f5a82..8adc282235 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -58,6 +58,9 @@ class FakeContainerBroker(object): self.sync_point1 = -1 self.sync_point2 = -1 + def get_max_row(self): + return 1 + def get_info(self): return self.info @@ -736,6 +739,67 @@ class TestContainerSync(unittest.TestCase): sync.hash_path = orig_hash_path sync.delete_object = orig_delete_object + def test_container_report(self): + container_stats = {'puts': 0, + 'deletes': 0, + 'bytes': 0} + + def fake_container_sync_row(self, row, sync_to, + user_key, broker, info, realm, realm_key): + if 'deleted' in row: + container_stats['deletes'] += 1 + return True + + container_stats['puts'] += 1 + container_stats['bytes'] += row['size'] + return True + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that no rows match for second loop, ordinal is 0 and + # all hashes are 1 + return '\x01' * 16 + + fcb = FakeContainerBroker( + 'path', + info={'account': 'a', 'container': 'c', + 'storage_policy_index': 0, + 'x_container_sync_point1': 5, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o1', 'size': 0, + 'deleted': True}, + {'ROWID': 2, 'name': 'o2', 'size': 1010}, + {'ROWID': 3, 'name': 'o3', 'size': 0, + 'deleted': True}, + {'ROWID': 4, 'name': 'o4', 'size': 90}, + {'ROWID': 5, 'name': 'o5', 'size': 0}]) + + with mock.patch('swift.container.sync.InternalClient'), \ + mock.patch('swift.container.sync.hash_path', + fake_hash_path), \ + mock.patch('swift.container.sync.ContainerBroker', + lambda p: fcb): + cring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, + logger=self.logger) + cs.container_stats = container_stats + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + funcType = type(sync.ContainerSync.container_sync_row) + cs.container_sync_row = funcType(fake_container_sync_row, + cs, sync.ContainerSync) + cs.container_sync('isa.db') + # Succeeds because no rows match + log_line = cs.logger.get_lines_for_level('info')[0] + lines = log_line.split(',') + self.assertTrue('sync_point2: 5', lines.pop().strip()) + self.assertTrue('sync_point1: 5', lines.pop().strip()) + self.assertTrue('bytes: 1100', lines.pop().strip()) + self.assertTrue('deletes: 2', lines.pop().strip()) + self.assertTrue('puts: 3', lines.pop().strip()) + def test_container_sync_row_delete(self): self._test_container_sync_row_delete(None, None) @@ -783,7 +847,8 @@ class TestContainerSync(unittest.TestCase): self.assertTrue(cs.container_sync_row( {'deleted': True, 'name': 'object', - 'created_at': created_at}, 'http://sync/to/path', + 'created_at': created_at, + 'size': '1000'}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -925,7 +990,8 @@ class TestContainerSync(unittest.TestCase): self.assertTrue(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': created_at}, 'http://sync/to/path', + 'created_at': created_at, + 'size': 50}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -953,7 +1019,8 @@ class TestContainerSync(unittest.TestCase): self.assertTrue(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 60}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -966,7 +1033,8 @@ class TestContainerSync(unittest.TestCase): self.assertTrue(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': '1.1'}, 'http://sync/to/path', + 'created_at': '1.1', + 'size': 60}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -987,7 +1055,8 @@ class TestContainerSync(unittest.TestCase): self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 70}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -1011,7 +1080,8 @@ class TestContainerSync(unittest.TestCase): self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 80}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -1038,7 +1108,8 @@ class TestContainerSync(unittest.TestCase): self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 90}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -1055,7 +1126,8 @@ class TestContainerSync(unittest.TestCase): self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 50}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -1072,7 +1144,8 @@ class TestContainerSync(unittest.TestCase): self.assertFalse(cs.container_sync_row( {'deleted': False, 'name': 'object', - 'created_at': timestamp.internal}, 'http://sync/to/path', + 'created_at': timestamp.internal, + 'size': 50}, 'http://sync/to/path', 'key', FakeContainerBroker('broker'), {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) @@ -1093,7 +1166,8 @@ class TestContainerSync(unittest.TestCase): test_row = {'deleted': False, 'name': 'object', 'created_at': timestamp.internal, - 'etag': '1111'} + 'etag': '1111', + 'size': 10} test_info = {'account': 'a', 'container': 'c', 'storage_policy_index': 0}