Add streaming download support
So we don't run out of memory when fetching blobs, use cherrypy's streaming support. Change-Id: I6c05240a81f0b6b2dd0992e42d097883ca4fdc2e
This commit is contained in:
parent
6d2894de8b
commit
61af2796b2
@ -57,6 +57,17 @@ class FilesystemDriver(storageutils.StorageDriver):
|
|||||||
with open(path, 'rb') as f:
|
with open(path, 'rb') as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
|
|
||||||
|
def stream_object(self, path):
|
||||||
|
path = os.path.join(self.root, path)
|
||||||
|
if not os.path.exists(path):
|
||||||
|
return None
|
||||||
|
with open(path, 'rb') as f:
|
||||||
|
while True:
|
||||||
|
chunk = f.read(4096)
|
||||||
|
if not chunk:
|
||||||
|
return
|
||||||
|
yield chunk
|
||||||
|
|
||||||
def delete_object(self, path):
|
def delete_object(self, path):
|
||||||
path = os.path.join(self.root, path)
|
path = os.path.join(self.root, path)
|
||||||
if os.path.exists(path):
|
if os.path.exists(path):
|
||||||
|
@ -106,20 +106,29 @@ class RegistryAPI:
|
|||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.config(**{'tools.auth_basic.checkpassword': require_read})
|
@cherrypy.config(**{'tools.auth_basic.checkpassword': require_read})
|
||||||
def get_blob(self, repository, digest):
|
def head_blob(self, repository, digest):
|
||||||
namespace = self.get_namespace()
|
namespace = self.get_namespace()
|
||||||
method = cherrypy.request.method
|
self.log.info('Head blob %s %s', repository, digest)
|
||||||
self.log.info('%s blob %s %s', method, repository, digest)
|
|
||||||
size = self.storage.blob_size(namespace, digest)
|
size = self.storage.blob_size(namespace, digest)
|
||||||
if size is None:
|
if size is None:
|
||||||
return self.not_found()
|
return self.not_found()
|
||||||
res = cherrypy.response
|
res = cherrypy.response
|
||||||
res.headers['Docker-Content-Digest'] = digest
|
res.headers['Docker-Content-Digest'] = digest
|
||||||
if method != 'HEAD':
|
|
||||||
data = self.storage.get_blob(namespace, digest)
|
|
||||||
return data
|
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.config(**{'tools.auth_basic.checkpassword': require_read,
|
||||||
|
'response.stream': True})
|
||||||
|
def get_blob(self, repository, digest):
|
||||||
|
namespace = self.get_namespace()
|
||||||
|
self.log.info('Get blob %s %s', repository, digest)
|
||||||
|
size = self.storage.blob_size(namespace, digest)
|
||||||
|
if size is None:
|
||||||
|
return self.not_found()
|
||||||
|
res = cherrypy.response
|
||||||
|
res.headers['Docker-Content-Digest'] = digest
|
||||||
|
return self.storage.stream_blob(namespace, digest)
|
||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
|
@cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
|
||||||
def start_upload(self, repository, digest=None):
|
def start_upload(self, repository, digest=None):
|
||||||
@ -260,6 +269,10 @@ class RegistryServer:
|
|||||||
conditions=dict(method=['GET']),
|
conditions=dict(method=['GET']),
|
||||||
controller=api, action='get_manifest')
|
controller=api, action='get_manifest')
|
||||||
route_map.connect('api', '/v2/{repository:.*}/blobs/{digest}',
|
route_map.connect('api', '/v2/{repository:.*}/blobs/{digest}',
|
||||||
|
conditions=dict(method=['HEAD']),
|
||||||
|
controller=api, action='head_blob')
|
||||||
|
route_map.connect('api', '/v2/{repository:.*}/blobs/{digest}',
|
||||||
|
conditions=dict(method=['GET']),
|
||||||
controller=api, action='get_blob')
|
controller=api, action='get_blob')
|
||||||
|
|
||||||
conf = {
|
conf = {
|
||||||
|
@ -139,6 +139,11 @@ class Storage:
|
|||||||
path = os.path.join(namespace, 'blobs', digest, 'data')
|
path = os.path.join(namespace, 'blobs', digest, 'data')
|
||||||
return self.backend.get_object(path)
|
return self.backend.get_object(path)
|
||||||
|
|
||||||
|
def stream_blob(self, namespace, digest):
|
||||||
|
path = os.path.join(namespace, 'blobs',
|
||||||
|
self._path_from_digest(digest), 'data')
|
||||||
|
return self.backend.stream_object(path)
|
||||||
|
|
||||||
def start_upload(self, namespace):
|
def start_upload(self, namespace):
|
||||||
"""Start an upload.
|
"""Start an upload.
|
||||||
|
|
||||||
|
@ -91,6 +91,19 @@ class StorageDriver(metaclass=ABCMeta):
|
|||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def stream_object(self, path):
|
||||||
|
"""Retrieve an object, streaming.
|
||||||
|
|
||||||
|
Return a generator with the content of the object at `path`.
|
||||||
|
|
||||||
|
:arg str path: The object path.
|
||||||
|
|
||||||
|
:returns: The contents of the object.
|
||||||
|
:rtype: generator of bytearray
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def delete_object(self, path):
|
def delete_object(self, path):
|
||||||
"""Delete an object.
|
"""Delete an object.
|
||||||
|
@ -120,6 +120,14 @@ class SwiftDriver(storageutils.StorageDriver):
|
|||||||
return None
|
return None
|
||||||
return ret.content
|
return ret.content
|
||||||
|
|
||||||
|
def stream_object(self, path):
|
||||||
|
try:
|
||||||
|
ret = retry_function(
|
||||||
|
lambda: self.conn.session.get(self.get_url(path), stream=True))
|
||||||
|
except keystoneauth1.exceptions.http.NotFound:
|
||||||
|
return None
|
||||||
|
return ret.iter_content(chunk_size=4096)
|
||||||
|
|
||||||
def delete_object(self, path):
|
def delete_object(self, path):
|
||||||
retry_function(
|
retry_function(
|
||||||
lambda: self.conn.session.delete(
|
lambda: self.conn.session.delete(
|
||||||
|
Loading…
Reference in New Issue
Block a user