Add method to compute a file's checksum to fileutils

File checksums are used in projects like glance and
glance_store to verify image uploads and so on, so a
generic oslo utility method could be useful.

Change-Id: I1b76ee1876771e7965fd5704ab6281318f36e810
This commit is contained in:
Rajath Agasthya 2017-07-16 12:05:27 -07:00
parent 6661cb619b
commit 750ed32771
2 changed files with 76 additions and 0 deletions

View File

@ -21,6 +21,7 @@ File utilities.
import contextlib
import errno
import hashlib
import os
import stat
import tempfile
@ -103,3 +104,23 @@ def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
finally:
os.close(fd)
return path
def compute_file_checksum(path, read_chunksize=65536, algorithm='sha256'):
"""Compute checksum of a file's contents.
:param path: Path to the file
:param read_chunksize: Maximum number of bytes to be read from the file
at once. Default is 65536 bytes or 64KB
:param algorithm: The hash algorithm name to use. For example, 'md5',
'sha256', 'sha512' and so on. Default is 'sha256'. Refer to
hashlib.algorithms_available for available algorithms
:return: Hex digest string of the checksum
.. versionadded:: 3.31.0
"""
checksum = hashlib.new(algorithm) # Raises appropriate exceptions.
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(read_chunksize), b''):
checksum.update(chunk)
return checksum.hexdigest()

View File

@ -14,6 +14,7 @@
# under the License.
import errno
import hashlib
import os
import shutil
import stat
@ -189,3 +190,57 @@ class WriteToTempfileTestCase(test_base.BaseTestCase):
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
self.check_file_content(res)
class TestComputeFileChecksum(test_base.BaseTestCase):
def setUp(self):
super(TestComputeFileChecksum, self).setUp()
self.content = 'fake_content'.encode('ascii')
def check_file_content(self, content, path):
with open(path, 'r') as fd:
ans = fd.read()
self.assertEqual(content, six.b(ans))
def test_compute_checksum_default_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
expected_checksum = hashlib.sha256()
expected_checksum.update(self.content)
actual_checksum = fileutils.compute_file_checksum(path)
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
def test_compute_checksum_named_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
expected_checksum = hashlib.sha512()
expected_checksum.update(self.content)
actual_checksum = fileutils.compute_file_checksum(path,
algorithm='sha512')
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
def test_compute_checksum_invalid_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
self.assertRaises(ValueError, fileutils.compute_file_checksum,
path, algorithm='foo')
def test_file_does_not_exist(self):
random_file_name = uuid.uuid4().hex
path = os.path.join('/tmp', random_file_name)
self.assertRaises(IOError, fileutils.compute_file_checksum, path)
def test_generic_io_error(self):
tempdir = tempfile.mkdtemp()
self.assertRaises(IOError, fileutils.compute_file_checksum, tempdir)