d90d1ca900
The io context handle should be opened only once in the life of the application; there's no need to open and close it with every I/O transaction. Change-Id: I9948a71dbdcd20b4732c109ad35a400bd5962766 Signed-off-by: Thiago da Silva <thiago@redhat.com>
461 lines
15 KiB
Python
461 lines
15 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import atexit
|
|
import cPickle as pickle
|
|
from contextlib import contextmanager
|
|
from hashlib import md5
|
|
from eventlet import Timeout
|
|
import time
|
|
|
|
from swift.common.utils import normalize_timestamp
|
|
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
|
DiskFileCollision, DiskFileNotOpen, DiskFileNoSpace, DiskFileError
|
|
from swift.common.swob import multi_range_iterator
|
|
from swift.obj.diskfile import METADATA_KEY
|
|
|
|
|
|
class RadosFileSystem(object):
    """
    Entry point to a RADOS pool used as the object store.

    One cluster handle and one I/O context are created lazily, on the first
    call to :meth:`open`, and are then shared by every handle returned
    afterwards. They are released by :meth:`_shutdown`, which is registered
    with ``atexit``.

    :param ceph_conf: value passed to ``Rados(conffile=...)``
    :param rados_user: value passed to ``Rados(rados_id=...)``
    :param rados_pool: name of the pool the I/O context is opened on
    :param logger: logger object (stored only; not used by this class)
    :param kwargs: ``rados`` may supply a replacement for the ``rados``
                   module
    """

    def __init__(self, ceph_conf, rados_user, rados_pool, logger, **kwargs):
        self._conf = ceph_conf
        self._user = rados_user
        self._pool = rados_pool
        self._logger = logger

        # Both handles stay None until the first successful _get_rados().
        self._rados = None
        self._ioctx = None
        try:
            import rados
        except ImportError:
            rados = None
        self.RADOS = kwargs.get('rados', rados)
        atexit.register(self._shutdown)

    def _get_rados(self, fs):
        """
        Return the shared ``(cluster, ioctx)`` pair, creating it on first use.

        The pair is cached only once both the connection and the I/O context
        have been obtained, so a failed attempt leaves nothing behind and
        the next call simply tries again.

        :param fs: object providing ``RADOS``, ``_conf`` and ``_user``
        :returns: tuple of (cluster handle, I/O context)
        """
        if self._rados is None:
            cluster = fs.RADOS.Rados(conffile=fs._conf, rados_id=fs._user)
            cluster.connect()
            try:
                ioctx = cluster.open_ioctx(self._pool)
            except Exception:
                # Do not leave a connected but unusable handle around.
                cluster.shutdown()
                raise
            self._rados, self._ioctx = cluster, ioctx
        return self._rados, self._ioctx

    def _shutdown(self):
        """
        Close the I/O context and shut the cluster handle down.

        Safe to call when nothing was ever opened and safe to call more than
        once: the cached handles are cleared before they are released.
        """
        cluster, ioctx = self._rados, self._ioctx
        self._rados = self._ioctx = None
        try:
            if ioctx is not None:
                ioctx.close()
        finally:
            if cluster is not None:
                cluster.shutdown()

    class _radosfs(object):
        """
        Object-level operations on top of the shared I/O context.

        :param fs: the owning RadosFileSystem
        :param _get_rados: callable returning ``(cluster, ioctx)`` for ``fs``
        """

        def __init__(self, fs, _get_rados):
            self._rados, self._ioctx = _get_rados(fs)
            self._fs = fs
            self._pool = fs._pool

        def del_object(self, obj):
            """Remove ``obj``; errors from the I/O context propagate."""
            self._ioctx.remove_object(obj)

        def get_metadata(self, obj):
            """
            Load the pickled metadata stored in the METADATA_KEY xattr.

            :param obj: object name
            :returns: the unpickled metadata, or None when it cannot be
                      read or decoded
            """
            # NOTE(review): pickle.loads executes whatever the stored pickle
            # encodes; this presumes the pool is only written by trusted
            # code -- confirm.
            try:
                return pickle.loads(
                    self._ioctx.get_xattr(obj, METADATA_KEY))
            except Exception:
                # Best effort: callers treat None as "no such object".
                # Only Exception is absorbed so that interpreter exits and
                # greenlet kills are no longer silently discarded.
                return None

        def put_metadata(self, obj, metadata):
            """Pickle ``metadata`` and store it in the METADATA_KEY xattr."""
            self._ioctx.set_xattr(obj, METADATA_KEY,
                                  pickle.dumps(metadata))

        def put_object(self, obj, metadata):
            """Flush the I/O context, then store ``metadata`` on ``obj``."""
            self._ioctx.aio_flush()
            self.put_metadata(obj, metadata)

        def size(self, obj):
            """
            Return the size reported by ``stat`` for ``obj``.

            :returns: the size, or 0 when the object cannot be stat'ed
            """
            try:
                size, _mtime = self._ioctx.stat(obj)
            except Exception:
                return 0
            return size

        def create(self, obj, size):
            """
            Truncate ``obj`` to ``size`` bytes.

            :raises DiskFileNoSpace: when the store reports NoSpace
            """
            try:
                self._ioctx.trunc(obj, size)
            except self._fs.RADOS.NoSpace:
                raise DiskFileNoSpace()

        def write(self, obj, offset, data):
            """
            Write ``data`` to ``obj`` starting at ``offset``.

            :returns: number of bytes written, always ``len(data)``
            :raises DiskFileNoSpace: when the store reports NoSpace
            :raises DiskFileError: on any other failure
            """
            try:
                self._ioctx.write(obj, data, offset)
            except self._fs.RADOS.NoSpace:
                raise DiskFileNoSpace()
            except Exception:
                raise DiskFileError()
            return len(data)

        def read(self, obj, off):
            """Read from ``obj`` at ``off`` using the default read length."""
            return self._ioctx.read(obj, offset=off)

        def quarantine(self, obj):
            """Quarantine ``obj``, which for this backend means deleting it."""
            # There is no way of swift recon monitor to get information
            # of the quarantined file, so better clean it up
            self.del_object(obj)

    def open(self):
        """Return a new handle bound to the shared I/O context."""
        return self._radosfs(self, self._get_rados)

    def get_diskfile(self, device, partition, account,
                     container, obj, **kwargs):
        """
        Build the DiskFile for the given object path.

        Extra keyword arguments are accepted and ignored.
        """
        return DiskFile(self, device, partition, account,
                        container, obj)
|
|
|
|
|
|
class DiskFileWriter(object):
    """
    .. note::
        RADOS based alternative pluggable on-disk backend implementation.

    Write-side state for one object: the handle to write through, the
    object name and the offset at which the next chunk will land. DiskFile's
    create() context manager yields an instance of this class.

    :param fs: handle providing ``write()`` and ``put_object()``
    :param name: standard object name
    """
    def __init__(self, fs, name):
        self._write_offset = 0
        self._name = name
        self._fs = fs

    def write(self, chunk):
        """
        Append a chunk of data to the object.

        The handle is called repeatedly until it has accepted the whole
        chunk; only then is the running offset advanced.

        :param chunk: the chunk of data to write as a string object
        :returns: total number of bytes written through this writer so far
        """
        total = len(chunk)
        done = 0
        while done != total:
            position = self._write_offset + done
            done += self._fs.write(self._name, position, chunk[done:])
        self._write_offset += total
        return self._write_offset

    def put(self, metadata):
        """
        Record the object name in the metadata and hand both to the handle's
        ``put_object()``.

        :param metadata: dictionary of metadata to be written; it is updated
                         in place with a ``name`` entry
        """
        metadata['name'] = self._name
        self._fs.put_object(self._name, metadata)

    def commit(self, timestamp):
        """Do nothing; present for interface compatibility."""
        pass
|
|
|
|
|
|
class DiskFileReader(object):
    """
    .. note::
        RADOS based alternative pluggable on-disk backend implementation.

    Encapsulation of the read context for servicing GET REST API
    requests. Instances are returned by DiskFile's reader() method and are
    iterated to produce the object's data.

    :param fs: file system object; ``fs.open()`` supplies the I/O handle
    :param name: object name
    :param obj_size: expected size of the object in bytes
    :param etag: MD5 hash of object from metadata
    :param iter_hook: called when __iter__ returns a chunk
    """
    def __init__(self, fs, name, obj_size, etag, iter_hook=None):
        self._fs = fs.open()
        self._name = name
        self._obj_size = obj_size
        self._etag = etag
        self._iter_hook = iter_hook
        self._iter_etag = None
        self._bytes_read = 0
        self._read_offset = 0
        self._started_at_0 = False
        self._read_to_eof = False
        self._suppress_file_closing = False

    def __iter__(self):
        """
        Yield the object's data chunk by chunk from the current offset.

        The audit state is recomputed on every pass: the running MD5 is only
        kept when the pass starts at offset 0. Without that reset, a ranged
        read starting at 0 followed by a later range that reaches EOF would
        leave a digest over non-contiguous bytes, and close() would
        quarantine (delete) a perfectly good object.
        """
        try:
            self._started_at_0 = self._read_offset == 0
            self._read_to_eof = False
            self._iter_etag = md5() if self._started_at_0 else None
            while True:
                chunk = self._fs.read(self._name, self._read_offset)
                if not chunk:
                    # An empty read marks the end of the object.
                    self._read_to_eof = True
                    break
                if self._iter_etag is not None:
                    self._iter_etag.update(chunk)
                self._read_offset += len(chunk)
                yield chunk
                if self._iter_hook:
                    self._iter_hook()
        finally:
            # Ranged readers close on their own once they are done.
            if not self._suppress_file_closing:
                self.close()

    def app_iter_range(self, start, stop):
        """
        Yield the bytes in ``[start, stop)``.

        :param start: offset of the first byte to return
        :param stop: offset one past the last byte to return, or None to
                     read to the end of the object
        """
        self._read_offset = start
        if stop is not None:
            length = stop - start
        else:
            length = None
        try:
            self._suppress_file_closing = True
            for chunk in self:
                if length is not None:
                    length -= len(chunk)
                    if length < 0:
                        # Chop off the extra:
                        yield chunk[:length]
                        break
                yield chunk
        finally:
            self._suppress_file_closing = False
            try:
                self.close()
            except DiskFileQuarantined:
                pass

    def app_iter_ranges(self, ranges, content_type, boundary, size):
        """
        Yield the body for a multi-range request.

        With no ranges a single empty string is produced; otherwise the
        chunks come from ``multi_range_iterator`` driven by
        :meth:`app_iter_range`.
        """
        if not ranges:
            yield ''
        else:
            try:
                self._suppress_file_closing = True
                for chunk in multi_range_iterator(
                        ranges, content_type, boundary, size,
                        self.app_iter_range):
                    yield chunk
            finally:
                self._suppress_file_closing = False
                try:
                    self.close()
                except DiskFileQuarantined:
                    pass

    def _quarantine(self, msg):
        """Quarantine the object through the I/O handle; ``msg`` is unused."""
        self._fs.quarantine(self._name)

    def _handle_close_quarantine(self):
        """Quarantine the object when its length or MD5 disagree with the
        values given at construction time."""
        if self._read_offset != self._obj_size:
            self._quarantine(
                "Bytes read: %d, does not match metadata: %d" % (
                    self._read_offset, self._obj_size))
        elif self._iter_etag and \
                self._etag != self._iter_etag.hexdigest():
            self._quarantine(
                "ETag %s and file's md5 %s do not match" % (
                    self._etag, self._iter_etag.hexdigest()))

    def close(self):
        """
        handle quarantining file if necessary.

        The check only runs after a pass that started at offset 0 and read
        to the end of the object.
        """
        try:
            if self._started_at_0 and self._read_to_eof:
                self._handle_close_quarantine()
        except (Exception, Timeout):
            # Deliberately best effort: a failed audit must not break the
            # response that is being served.
            pass
|
|
|
|
|
|
class DiskFile(object):
    """
    .. note::

        RADOS based alternative pluggable on-disk backend implementation.

    A single object stored in the RADOS file system, addressed by the name
    ``/device/partition/account/container/obj``.

    :param fs: file system object whose ``open()`` returns an I/O handle
    :param device: device component of the object name
    :param partition: partition component of the object name
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    """

    def __init__(self, fs, device, partition, account, container, obj):
        components = (device, partition, account, container, obj)
        self._name = '/' + '/'.join(components)
        self._metadata = None
        self._fs = fs

    def open(self):
        """
        Obtain an I/O handle, load the object's metadata and validate it.

        On success the _metadata attribute is populated.

        :returns: this DiskFile
        :raises DiskFileCollision: on name mis-match with metadata
        :raises DiskFileNotExist: if it does not exist
        :raises DiskFileQuarantined: if while reading metadata of the file
                                     some data did not pass cross checks
        """
        handle = self._fs.open()
        self._fs_inst = handle
        self._metadata = handle.get_metadata(self._name)
        if self._metadata is None:
            raise DiskFileNotExist()
        self._verify_data_file()
        if not self._metadata:
            self._metadata = {}
        return self

    def __enter__(self):
        """Enter the context; the file must have been opened already."""
        if self._metadata is None:
            raise DiskFileNotOpen()
        return self

    def __exit__(self, t, v, tb):
        """Nothing to release when the context is left."""
        pass

    def _quarantine(self, msg):
        """
        Quarantine the object through the I/O handle, then report it.

        :raises DiskFileQuarantined: always, carrying ``msg``
        """
        self._fs_inst.quarantine(self._name)
        raise DiskFileQuarantined(msg)

    def _verify_data_file(self):
        """
        Cross-check the loaded metadata against the requested name, the
        current time and the size reported by the store.

        :raises DiskFileCollision: if the metadata stored name does not match
                                   the referenced name of the file
        :raises DiskFileNotExist: if the object has expired
        :raises DiskFileQuarantined: if data inconsistencies were detected
                                     between the metadata and the file-system
                                     metadata
        """
        # 1. The stored name must be the name we were asked for.
        try:
            stored_name = self._metadata['name']
        except KeyError:
            self._quarantine("missing name metadata")
        else:
            if stored_name != self._name:
                raise DiskFileCollision('Client path does not match path '
                                        'stored in object metadata')

        # 2. An optional expiry must be an integer and lie in the future.
        expires_at = None
        try:
            expires_at = int(self._metadata['X-Delete-At'])
        except KeyError:
            pass
        except ValueError:
            # Quarantine, the x-delete-at key is present but not an
            # integer.
            self._quarantine(
                "bad metadata x-delete-at value %s" % (
                    self._metadata['X-Delete-At']))
        if expires_at is not None and expires_at <= time.time():
            raise DiskFileNotExist('Expired')

        # 3. The recorded length must be an integer ...
        try:
            expected_size = int(self._metadata['Content-Length'])
        except KeyError:
            self._quarantine(
                "missing content-length in metadata")
        except ValueError:
            # Quarantine, the content-length key is present but not an
            # integer.
            self._quarantine(
                "bad metadata content-length value %s" % (
                    self._metadata['Content-Length']))

        # 4. ... and agree with what the store reports.
        actual_size = self._fs_inst.size(self._name)
        if actual_size != expected_size:
            self._quarantine(
                "metadata content-length %s does"
                " not match actual object size %s" % (
                    expected_size, actual_size))

    def get_metadata(self):
        """
        Provide the metadata loaded by open().

        :returns: object's metadata dictionary
        :raises DiskFileNotOpen: if open() has not populated the metadata
        """
        if self._metadata is None:
            raise DiskFileNotOpen()
        return self._metadata

    def read_metadata(self):
        """
        Open the object and return its metadata in one step.

        :returns: metadata dictionary for an object
        """
        with self.open():
            return self.get_metadata()

    def reader(self, iter_hook=None, keep_cache=False):
        """
        Return a swift.common.swob.Response class compatible "app_iter"
        object built from the loaded metadata.

        :param iter_hook: forwarded to the DiskFileReader
        :param keep_cache: accepted but not used by this backend
        :raises DiskFileNotOpen: if open() has not populated the metadata
        """
        if self._metadata is None:
            raise DiskFileNotOpen()

        length = int(self._metadata['Content-Length'])
        etag = self._metadata['ETag']
        return DiskFileReader(self._fs, self._name, length, etag,
                              iter_hook=iter_hook)

    @contextmanager
    def create(self, size=None):
        """
        Context manager yielding a DiskFileWriter for this object.

        :param size: optional initial size of file to explicitly allocate on
                     disk
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        handle = self._fs.open()
        if size is not None:
            handle.create(self._name, size)

        yield DiskFileWriter(handle, self._name)

    def write_metadata(self, metadata):
        """
        Write a block of metadata to an object.
        """
        with self.create() as writer:
            writer.put(metadata)

    def delete(self, timestamp):
        """
        Delete the object if its stored X-Timestamp is older than the given
        timestamp.

        :param timestamp: timestamp to compare with the stored one
        """
        cutoff = normalize_timestamp(timestamp)
        handle = self._fs.open()
        stored = handle.get_metadata(self._name)
        if stored and stored['X-Timestamp'] < cutoff:
            handle.del_object(self._name)
|