diff --git a/oslo_utils/fileutils.py b/oslo_utils/fileutils.py index b579addc..ed1bba90 100644 --- a/oslo_utils/fileutils.py +++ b/oslo_utils/fileutils.py @@ -21,6 +21,7 @@ File utilities. import contextlib import errno +import hashlib import os import stat import tempfile @@ -103,3 +104,23 @@ def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): finally: os.close(fd) return path + + +def compute_file_checksum(path, read_chunksize=65536, algorithm='sha256'): + """Compute checksum of a file's contents. + + :param path: Path to the file + :param read_chunksize: Maximum number of bytes to be read from the file + at once. Default is 65536 bytes or 64KB + :param algorithm: The hash algorithm name to use. For example, 'md5', + 'sha256', 'sha512' and so on. Default is 'sha256'. Refer to + hashlib.algorithms_available for available algorithms + :return: Hex digest string of the checksum + + .. versionadded:: 3.31.0 + """ + checksum = hashlib.new(algorithm) # Raises appropriate exceptions. + with open(path, 'rb') as f: + for chunk in iter(lambda: f.read(read_chunksize), b''): + checksum.update(chunk) + return checksum.hexdigest() diff --git a/oslo_utils/tests/test_fileutils.py b/oslo_utils/tests/test_fileutils.py index a16e28e4..33974adb 100644 --- a/oslo_utils/tests/test_fileutils.py +++ b/oslo_utils/tests/test_fileutils.py @@ -14,6 +14,7 @@ # under the License. import errno +import hashlib import os import shutil import stat @@ -189,3 +190,57 @@ class WriteToTempfileTestCase(test_base.BaseTestCase): self.assertTrue(basepath.startswith(tempfile.gettempdir())) self.check_file_content(res) + + +class TestComputeFileChecksum(test_base.BaseTestCase): + + def setUp(self): + super(TestComputeFileChecksum, self).setUp() + self.content = 'fake_content'.encode('ascii') + + def check_file_content(self, content, path): + with open(path, 'r') as fd: + ans = fd.read() + self.assertEqual(content, six.b(ans)) + + def test_compute_checksum_default_algorithm(self): + path = fileutils.write_to_tempfile(self.content) + self.assertTrue(os.path.exists(path)) + self.check_file_content(self.content, path) + + expected_checksum = hashlib.sha256() + expected_checksum.update(self.content) + + actual_checksum = fileutils.compute_file_checksum(path) + + self.assertEqual(expected_checksum.hexdigest(), actual_checksum) + + def test_compute_checksum_named_algorithm(self): + path = fileutils.write_to_tempfile(self.content) + self.assertTrue(os.path.exists(path)) + self.check_file_content(self.content, path) + + expected_checksum = hashlib.sha512() + expected_checksum.update(self.content) + + actual_checksum = fileutils.compute_file_checksum(path, + algorithm='sha512') + + self.assertEqual(expected_checksum.hexdigest(), actual_checksum) + + def test_compute_checksum_invalid_algorithm(self): + path = fileutils.write_to_tempfile(self.content) + self.assertTrue(os.path.exists(path)) + self.check_file_content(self.content, path) + + self.assertRaises(ValueError, fileutils.compute_file_checksum, + path, algorithm='foo') + + def test_file_does_not_exist(self): + random_file_name = uuid.uuid4().hex + path = os.path.join('/tmp', random_file_name) + self.assertRaises(IOError, fileutils.compute_file_checksum, path) + + def test_generic_io_error(self): + tempdir = tempfile.mkdtemp() + self.assertRaises(IOError, fileutils.compute_file_checksum, tempdir)