Add a chunks_finished method to BaseDiskFileWriter
BaseDiskFileWriter now tracks an md5 of the data it writes and exposes upload_size and etag via the new chunks_finished method.

BaseDiskFileReader already tracks the md5/etag via _iter_etag; for parity we add a _chunks_etag to BaseDiskFileWriter.

Instead of returning the upload_size from every call to write, callers get the (upload_size, etag) tuple from a single call to chunks_finished.

Change-Id: I26c58719cff5fde941d0248c250a0204e0379ae5
parent 29f71c9119
commit 52ecbf9539
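In brief, the change moves per-chunk etag bookkeeping out of callers and into the writer. A minimal standalone sketch of the pattern (simplified names, not Swift's actual classes):

    from hashlib import md5

    class ChunkedWriter(object):
        """Sketch: track size and a running md5 while spooling chunks."""

        def __init__(self, fp):
            self._fp = fp
            self._upload_size = 0
            self._chunks_etag = md5()

        def write(self, chunk):
            # Fold the chunk into the running digest before writing it out.
            self._chunks_etag.update(chunk)
            self._fp.write(chunk)
            self._upload_size += len(chunk)

        def chunks_finished(self):
            # One call at the end replaces per-write bookkeeping in callers.
            return self._upload_size, self._chunks_etag.hexdigest()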
@@ -39,7 +39,7 @@ import os
 import re
 import time
 import uuid
-import hashlib
+from hashlib import md5
 import logging
 import traceback
 import xattr
@@ -194,14 +194,14 @@ def read_metadata(fd, add_missing_checksum=False):
         # exist. This is fine; it just means that this object predates the
         # introduction of metadata checksums.
         if add_missing_checksum:
-            new_checksum = hashlib.md5(metadata).hexdigest()
+            new_checksum = md5(metadata).hexdigest()
             try:
                 xattr.setxattr(fd, METADATA_CHECKSUM_KEY, new_checksum)
             except (IOError, OSError) as e:
                 logging.error("Error adding metadata: %s" % e)
 
     if metadata_checksum:
-        computed_checksum = hashlib.md5(metadata).hexdigest().encode('ascii')
+        computed_checksum = md5(metadata).hexdigest().encode('ascii')
         if metadata_checksum != computed_checksum:
             raise DiskFileBadMetadataChecksum(
                 "Metadata checksum mismatch for %s: "
@@ -226,7 +226,7 @@ def write_metadata(fd, metadata, xattr_size=65536):
     :param metadata: metadata to write
     """
     metastr = pickle.dumps(_encode_metadata(metadata), PICKLE_PROTOCOL)
-    metastr_md5 = hashlib.md5(metastr).hexdigest().encode('ascii')
+    metastr_md5 = md5(metastr).hexdigest().encode('ascii')
     key = 0
     try:
         while metastr:
@@ -1084,7 +1084,7 @@ class BaseDiskFileManager(object):
 
         :param path: full path to directory
         """
-        hashes = defaultdict(hashlib.md5)
+        hashes = defaultdict(md5)
         try:
             path_contents = sorted(os.listdir(path))
         except OSError as err:
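The defaultdict swap above works because defaultdict only needs a zero-argument factory, and the imported md5 name is exactly the hashlib.md5 constructor. A quick illustration of the idiom:

    from collections import defaultdict
    from hashlib import md5

    hashes = defaultdict(md5)    # md5() is called for each missing key
    hashes['suffix'].update(b'hash of some on-disk names')
    print(hashes['suffix'].hexdigest())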
@@ -1626,6 +1626,7 @@ class BaseDiskFileWriter(object):
         self._fd = None
         self._tmppath = None
         self._size = size
+        self._chunks_etag = md5()
         self._bytes_per_sync = bytes_per_sync
         self._diskfile = diskfile
         self.next_part_power = next_part_power
@@ -1716,13 +1717,10 @@ class BaseDiskFileWriter(object):
         For this implementation, the data is written into a temporary file.
 
         :param chunk: the chunk of data to write as a string object
-
-        :returns: the total number of bytes written to an object
         """
-
         if not self._fd:
             raise ValueError('Writer is not open')
-
+        self._chunks_etag.update(chunk)
         while chunk:
             written = os.write(self._fd, chunk)
             self._upload_size += written
@@ -1735,7 +1733,13 @@ class BaseDiskFileWriter(object):
                     drop_buffer_cache(self._fd, self._last_sync, diff)
                     self._last_sync = self._upload_size
 
-        return self._upload_size
+    def chunks_finished(self):
+        """
+        Expose internal stats about written chunks.
+
+        :returns: a tuple, (upload_size, etag)
+        """
+        return self._upload_size, self._chunks_etag.hexdigest()
 
     def _finalize_put(self, metadata, target_path, cleanup):
         # Write the metadata before calling fsync() so that both data and
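chunks_finished can report the object's etag because md5 hashing is incremental: feeding the digest chunk by chunk, as write() does via _chunks_etag.update(chunk), yields the same hexdigest as hashing the whole body at once. A self-contained check of that property:

    from hashlib import md5

    chunks = [b'first ', b'second ', b'third']

    incremental = md5()
    for chunk in chunks:
        incremental.update(chunk)    # mirrors the per-write update

    # Chunked updates and one-shot hashing produce identical digests.
    assert incremental.hexdigest() == md5(b''.join(chunks)).hexdigest()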
@@ -1940,7 +1944,7 @@ class BaseDiskFileReader(object):
     def _init_checks(self):
         if self._fp.tell() == 0:
             self._started_at_0 = True
-            self._iter_etag = hashlib.md5()
+            self._iter_etag = md5()
 
     def _update_checks(self, chunk):
         if self._iter_etag:
@@ -103,6 +103,7 @@ class DiskFileWriter(object):
         self._name = name
         self._fp = None
         self._upload_size = 0
+        self._chunks_etag = hashlib.md5()
 
     def open(self):
         """
@@ -130,7 +131,15 @@ class DiskFileWriter(object):
         """
         self._fp.write(chunk)
         self._upload_size += len(chunk)
-        return self._upload_size
+        self._chunks_etag.update(chunk)
+
+    def chunks_finished(self):
+        """
+        Expose internal stats about written chunks.
+
+        :returns: a tuple, (upload_size, etag)
+        """
+        return self._upload_size, self._chunks_etag.hexdigest()
 
     def put(self, metadata):
         """
@@ -796,12 +796,9 @@ class ObjectController(BaseStorageServer):
                 headers={'X-Backend-Timestamp': orig_timestamp.internal})
         orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
         upload_expiration = time.time() + self.max_upload_time
-        etag = md5()
         elapsed_time = 0
         try:
             with disk_file.create(size=fsize) as writer:
-                upload_size = 0
-
                 # If the proxy wants to send us object metadata after the
                 # object body, it sets some headers. We have to tell the
                 # proxy, in the 100 Continue response, that we're able to
@@ -853,13 +850,13 @@ class ObjectController(BaseStorageServer):
                         if start_time > upload_expiration:
                             self.logger.increment('PUT.timeouts')
                             return HTTPRequestTimeout(request=request)
-                        etag.update(chunk)
-                        upload_size = writer.write(chunk)
+                        writer.write(chunk)
                         elapsed_time += time.time() - start_time
                 except ChunkReadError:
                     return HTTPClientDisconnect(request=request)
                 except ChunkReadTimeout:
                     return HTTPRequestTimeout(request=request)
+                upload_size, etag = writer.chunks_finished()
                 if upload_size:
                     self.logger.transfer_rate(
                         'PUT.' + device + '.timing', elapsed_time,
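On the consumer side the PUT loop shrinks to plain writes, with a single call afterwards recovering both values. A rough sketch of that shape (spool and its arguments are illustrative; any object with the write/chunks_finished pair, such as the sketch after the commit message, fits):

    def spool(chunks, writer):
        # The loop no longer threads upload_size or an etag through each pass.
        for chunk in chunks:
            writer.write(chunk)
        # One call replaces etag.update()/etag.hexdigest() in the caller.
        return writer.chunks_finished()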
@@ -874,7 +871,6 @@ class ObjectController(BaseStorageServer):
 
                 request_etag = (footer_meta.get('etag') or
                                 request.headers.get('etag', '')).lower()
-                etag = etag.hexdigest()
                 if request_etag and request_etag != etag:
                     return HTTPUnprocessableEntity(request=request)
                 metadata = {
@@ -3571,7 +3571,6 @@ class DiskFileMixin(BaseDiskFileTestMixin):
             except IndexError:
                 data = archives[0]
 
-        etag = md5()
         if ts:
             timestamp = Timestamp(ts)
         else:
@@ -3582,9 +3581,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
             prealloc_size = None
 
         with df.create(size=prealloc_size) as writer:
-            upload_size = writer.write(data)
-            etag.update(data)
-            etag = etag.hexdigest()
+            writer.write(data)
+            upload_size, etag = writer.chunks_finished()
             metadata = {
                 'ETag': etag,
                 'X-Timestamp': timestamp.internal,
@@ -5863,7 +5863,8 @@ class BaseTestECObjectController(BaseTestObjectController):
         prolis = _test_sockets[0]
         prosrv = _test_servers[0]
         sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        with mock.patch('swift.obj.server.md5', busted_md5_constructor):
+        with mock.patch('swift.obj.diskfile.md5',
+                        busted_md5_constructor):
             fd = sock.makefile()
             fd.write('PUT /v1/a/%s/pimento HTTP/1.1\r\n'
                      'Host: localhost\r\n'
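The test's patch target moves because `from hashlib import md5` copies the name into the importing module's namespace; mock.patch must replace the name where it is looked up, so after this change swift.obj.diskfile.md5, not swift.obj.server.md5, is what the busted constructor has to shadow. A generic, self-contained illustration (mymod is a throwaway module built inline, not a real one):

    import sys
    import types
    import unittest.mock as mock

    # A throwaway module equivalent to `from hashlib import md5` plus a user.
    mymod = types.ModuleType('mymod')
    exec('from hashlib import md5\n'
         'def digest(data):\n'
         '    return md5(data).hexdigest()\n', mymod.__dict__)
    sys.modules['mymod'] = mymod

    # Patching 'hashlib.md5' would not reach mymod.digest(): the name was
    # bound into mymod at import time, so patch it there instead.
    with mock.patch('mymod.md5') as fake_md5:
        fake_md5.return_value.hexdigest.return_value = 'busted'
        assert mymod.digest(b'x') == 'busted'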