swift-ceph-backend/swift_ceph_backend/rados_diskfile.py
Thiago da Silva d90d1ca900 open and close ioctx only once
The I/O context handle should be opened only once in the lifetime
of the application; there's no need to open and close it
with every I/O transaction

Change-Id: I9948a71dbdcd20b4732c109ad35a400bd5962766
Signed-off-by: Thiago da Silva <thiago@redhat.com>
2015-09-16 19:32:54 +00:00

461 lines
15 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import cPickle as pickle
from contextlib import contextmanager
from hashlib import md5
from eventlet import Timeout
import time
from swift.common.utils import normalize_timestamp
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNotOpen, DiskFileNoSpace, DiskFileError
from swift.common.swob import multi_range_iterator
from swift.obj.diskfile import METADATA_KEY
class RadosFileSystem(object):
    """
    Shared handle to the RADOS pool used as the object store.

    A single cluster connection and I/O context are created lazily on first
    use, reused by every per-request view returned from ``open()``, and
    released at interpreter exit through an ``atexit`` hook.

    :param ceph_conf: path to the Ceph configuration file
    :param rados_user: RADOS user id used to connect
    :param rados_pool: name of the pool objects are stored in
    :param logger: logger instance (stored as ``_logger``)
    :param rados: optional keyword argument replacing the ``rados`` module
                  (for example a fake used in tests)
    """

    def __init__(self, ceph_conf, rados_user, rados_pool, logger, **kwargs):
        self._conf = ceph_conf
        self._user = rados_user
        self._pool = rados_pool
        self._logger = logger
        self._rados = None
        self._ioctx = None
        try:
            import rados
        except ImportError:
            rados = None
        self.RADOS = kwargs.get('rados', rados)
        atexit.register(self._shutdown)

    def _get_rados(self, fs):
        """
        Return the shared ``(cluster, ioctx)`` pair, connecting on first use.

        The handles are cached only after both the connection and the I/O
        context were opened successfully. If either step fails, the
        partially initialised cluster handle is shut down and nothing is
        cached, so the next call retries from scratch instead of reusing a
        half-open connection (and ``_shutdown`` never sees ``_ioctx=None``).
        """
        if self._rados is None:
            cluster = fs.RADOS.Rados(conffile=fs._conf, rados_id=fs._user)
            opened = False
            try:
                cluster.connect()
                ioctx = cluster.open_ioctx(self._pool)
                opened = True
            finally:
                if not opened:
                    cluster.shutdown()
            self._rados, self._ioctx = cluster, ioctx
        return self._rados, self._ioctx

    def _shutdown(self):
        """
        Close the shared I/O context and cluster connection, if open.

        Registered with ``atexit``; safe to call more than once.
        """
        ioctx, cluster = self._ioctx, self._rados
        self._ioctx = None
        self._rados = None
        if cluster is not None:
            try:
                if ioctx is not None:
                    ioctx.close()
            finally:
                cluster.shutdown()

    class _radosfs(object):
        """Per-request view over the shared RADOS I/O context."""

        def __init__(self, fs, _get_rados):
            self._rados, self._ioctx = _get_rados(fs)
            self._fs = fs
            self._pool = fs._pool

        def del_object(self, obj):
            """Remove ``obj`` from the pool; errors propagate to the caller."""
            self._ioctx.remove_object(obj)

        def get_metadata(self, obj):
            """
            Return the unpickled metadata stored in ``obj``'s xattr.

            Returns None when the xattr cannot be read or decoded (for
            example because the object does not exist). Only ``Exception``
            subclasses are mapped to None; ``BaseException`` subclasses such
            as eventlet's ``Timeout`` propagate instead of being swallowed.
            """
            # NOTE: pickle.loads is only safe because this xattr is written
            # by put_metadata below; never point it at untrusted data.
            try:
                return pickle.loads(self._ioctx.get_xattr(obj, METADATA_KEY))
            except Exception:
                return None

        def put_metadata(self, obj, metadata):
            """Pickle ``metadata`` and store it in ``obj``'s xattr."""
            self._ioctx.set_xattr(obj, METADATA_KEY,
                                  pickle.dumps(metadata))

        def put_object(self, obj, metadata):
            """Flush pending asynchronous writes, then store ``metadata``."""
            self._ioctx.aio_flush()
            self.put_metadata(obj, metadata)

        def size(self, obj):
            """
            Return the size of ``obj`` in bytes, or 0 if it cannot be stat'ed.

            As in ``get_metadata``, only ``Exception`` subclasses are mapped
            to the fallback value; ``BaseException``s propagate.
            """
            try:
                size, _ts = self._ioctx.stat(obj)
            except Exception:
                return 0
            return size

        def create(self, obj, size):
            """
            Truncate/extend ``obj`` to ``size`` bytes.

            :raises DiskFileNoSpace: if the cluster reports it is out of space
            """
            try:
                self._ioctx.trunc(obj, size)
            except self._fs.RADOS.NoSpace:
                raise DiskFileNoSpace()

        def write(self, obj, offset, data):
            """
            Write ``data`` into ``obj`` at ``offset``.

            :returns: the number of bytes written (always ``len(data)``)
            :raises DiskFileNoSpace: if the cluster reports it is out of space
            :raises DiskFileError: on any other write failure
            """
            try:
                self._ioctx.write(obj, data, offset)
            except self._fs.RADOS.NoSpace:
                raise DiskFileNoSpace()
            except Exception:
                raise DiskFileError()
            return len(data)

        def read(self, obj, off):
            """
            Read a chunk of ``obj`` starting at byte ``off``.

            The chunk length is the rados binding's default for ``read``;
            callers treat an empty result as end of object.
            """
            return self._ioctx.read(obj, offset=off)

        def quarantine(self, obj):
            """Delete ``obj``.

            Swift's recon monitor cannot report on quarantined objects of
            this backend, so they are removed rather than moved aside.
            """
            self.del_object(obj)

    def open(self):
        """Return a per-request view bound to the shared I/O context."""
        return self._radosfs(self, self._get_rados)

    def get_diskfile(self, device, partition, account,
                     container, obj, **kwargs):
        """Return a DiskFile for the given object; extra kwargs are ignored."""
        return DiskFile(self, device, partition, account,
                        container, obj)
class DiskFileWriter(object):
    """
    .. note::
        RADOS based alternative pluggable on-disk backend implementation.

    Write context used while servicing PUT requests; this is the object
    yielded by ``DiskFile.create()``.

    :param fs: per-request filesystem view that receives the writes
    :param name: object name the data and metadata are stored under
    """

    def __init__(self, fs, name):
        self._fs = fs
        self._name = name
        self._write_offset = 0

    def write(self, chunk):
        """
        Append ``chunk`` at the current write offset.

        ``fs.write`` reports how many bytes it accepted; whatever remains of
        the chunk is re-submitted until all of it has been written.

        :param chunk: the data to append
        :returns: total number of bytes written through this writer so far
        """
        chunk_len = len(chunk)
        done = 0
        while done != chunk_len:
            done += self._fs.write(self._name,
                                   self._write_offset + done,
                                   chunk[done:])
        self._write_offset += chunk_len
        return self._write_offset

    def put(self, metadata):
        """
        Flush the writes issued so far and store ``metadata``.

        ``metadata`` is modified in place: its ``'name'`` entry is set to
        this writer's object name before it is stored.

        :param metadata: dictionary of metadata to be written
        """
        metadata['name'] = self._name
        self._fs.put_object(self._name, metadata)

    def commit(self, timestamp):
        """Intentionally does nothing in this backend."""
class DiskFileReader(object):
    """
    .. note::
        RADOS based alternative pluggable on-disk backend implementation.

    Read context used while servicing GET requests; returned by
    ``DiskFile.reader()``.

    When a pass over the data starts at offset 0 and reaches end of object,
    ``close()`` checks the number of bytes read and their MD5 against the
    metadata and quarantines the object on mismatch.

    :param fs: filesystem object; its ``open()`` supplies the view read from
    :param name: object name
    :param obj_size: size of the object in bytes according to the metadata
    :param etag: MD5 hex digest of the object according to the metadata
    :param iter_hook: called after each chunk yielded by ``__iter__``
    """

    def __init__(self, fs, name, obj_size, etag, iter_hook=None):
        self._fs = fs.open()
        self._name = name
        self._obj_size = obj_size
        self._etag = etag
        self._iter_hook = iter_hook
        self._iter_etag = None
        self._bytes_read = 0
        self._read_offset = 0
        self._started_at_0 = False
        self._read_to_eof = False
        self._suppress_file_closing = False

    def __iter__(self):
        """Yield chunks from the current read offset until end of object."""
        try:
            # Reset the per-pass state on EVERY pass. Range requests run
            # several passes over one reader; keeping ``_started_at_0`` and a
            # partial ``_iter_etag`` from an earlier pass that began at 0
            # would make a later mid-object pass that reaches EOF compare a
            # bogus MD5 with the ETag and quarantine a healthy object.
            self._started_at_0 = self._read_offset == 0
            self._read_to_eof = False
            self._iter_etag = md5() if self._started_at_0 else None
            while True:
                chunk = self._fs.read(self._name, self._read_offset)
                if not chunk:
                    self._read_to_eof = True
                    break
                if self._iter_etag is not None:
                    self._iter_etag.update(chunk)
                self._read_offset += len(chunk)
                yield chunk
                if self._iter_hook:
                    self._iter_hook()
        finally:
            if not self._suppress_file_closing:
                self.close()

    def app_iter_range(self, start, stop):
        """
        Yield the bytes in ``[start, stop)``; ``stop=None`` reads to the end.

        The reader is closed (and possibly verified) when the range is done.
        """
        self._read_offset = start
        length = None if stop is None else stop - start
        try:
            self._suppress_file_closing = True
            for chunk in self:
                if length is not None:
                    length -= len(chunk)
                    if length < 0:
                        # The chunk overshoots the range: chop off the extra.
                        yield chunk[:length]
                        break
                yield chunk
        finally:
            self._suppress_file_closing = False
            try:
                self.close()
            except DiskFileQuarantined:
                pass

    def app_iter_ranges(self, ranges, content_type, boundary, size):
        """
        Yield a multipart body for ``ranges`` built by swob's
        ``multi_range_iterator`` on top of ``app_iter_range``.

        Yields a single empty string when ``ranges`` is empty.
        """
        if not ranges:
            yield ''
        else:
            try:
                self._suppress_file_closing = True
                for chunk in multi_range_iterator(
                        ranges, content_type, boundary, size,
                        self.app_iter_range):
                    yield chunk
            finally:
                self._suppress_file_closing = False
                try:
                    self.close()
                except DiskFileQuarantined:
                    pass

    def _quarantine(self, msg):
        """Ask the filesystem view to quarantine the object.

        ``msg`` describes the reason but is currently not recorded anywhere.
        """
        self._fs.quarantine(self._name)

    def _handle_close_quarantine(self):
        """Quarantine if the bytes read or their MD5 disagree with metadata."""
        if self._read_offset != self._obj_size:
            self._quarantine(
                "Bytes read: %d, does not match metadata: %d" % (
                    self._read_offset, self._obj_size))
        elif self._iter_etag and \
                self._etag != self._iter_etag.hexdigest():
            self._quarantine(
                "ETag %s and file's md5 %s do not match" % (
                    self._etag, self._iter_etag.hexdigest()))

    def close(self):
        """
        Verify the object after a full pass from offset 0 to EOF.

        Best effort: any ``Exception`` or eventlet ``Timeout`` raised while
        verifying or quarantining is swallowed.
        """
        try:
            if self._started_at_0 and self._read_to_eof:
                self._handle_close_quarantine()
        except (Exception, Timeout):
            pass
class DiskFile(object):
    """
    .. note::
        RADOS based alternative pluggable on-disk backend implementation.

    Manage a single object stored in RADOS. The RADOS object name is
    ``/<device>/<partition>/<account>/<container>/<obj>``.

    :param fs: filesystem object whose ``open()`` returns the per-request
               view used for all I/O
    :param device: device component of the object name
    :param partition: partition component of the object name
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    """

    def __init__(self, fs, device, partition, account, container, obj):
        self._name = '/' + '/'.join((device, partition, account,
                                     container, obj))
        self._metadata = None
        self._fs = fs

    def open(self):
        """
        Open the object, then load and verify its metadata.

        On success ``_metadata`` holds the verified metadata. If the object
        is missing or verification raises, ``_metadata`` is left as None so
        the instance cannot be used as if it had been opened.

        :returns: self
        :raises DiskFileCollision: on name mis-match with metadata
        :raises DiskFileNotExist: if it does not exist or has expired
        :raises DiskFileQuarantined: if while reading metadata of the file
                                     some data did not pass cross checks
        """
        self._fs_inst = self._fs.open()
        metadata = self._fs_inst.get_metadata(self._name)
        if metadata is None:
            self._metadata = None
            raise DiskFileNotExist()
        self._metadata = metadata
        verified = False
        try:
            self._verify_data_file()
            verified = True
        finally:
            if not verified:
                # Never keep unverified (expired, mismatched, ...) metadata
                # around after a failed open.
                self._metadata = None
        return self

    def __enter__(self):
        """
        Return self; requires a successful ``open()`` first.

        :raises DiskFileNotOpen: if the metadata has not been loaded
        """
        if self._metadata is None:
            raise DiskFileNotOpen()
        return self

    def __exit__(self, t, v, tb):
        """Nothing to release; exceptions are not suppressed."""
        pass

    def _quarantine(self, msg):
        """Ask the filesystem view to quarantine the object, then raise.

        :raises DiskFileQuarantined: always, carrying ``msg``
        """
        self._fs_inst.quarantine(self._name)
        raise DiskFileQuarantined(msg)

    def _verify_data_file(self):
        """
        Verify the metadata's name value matches what we think the object is
        named, and cross-check expiry and size.

        :raises DiskFileCollision: if the metadata stored name does not match
                                   the referenced name of the file
        :raises DiskFileNotExist: if the object has expired
        :raises DiskFileQuarantined: if data inconsistencies were detected
                                     between the metadata and the object
        """
        try:
            mname = self._metadata['name']
        except KeyError:
            self._quarantine("missing name metadata")
        else:
            if mname != self._name:
                raise DiskFileCollision('Client path does not match path '
                                        'stored in object metadata')
        try:
            x_delete_at = int(self._metadata['X-Delete-At'])
        except KeyError:
            pass
        except ValueError:
            # Quarantine, the x-delete-at key is present but not an
            # integer.
            self._quarantine(
                "bad metadata x-delete-at value %s" % (
                    self._metadata['X-Delete-At']))
        else:
            if x_delete_at <= time.time():
                raise DiskFileNotExist('Expired')
        try:
            metadata_size = int(self._metadata['Content-Length'])
        except KeyError:
            self._quarantine(
                "missing content-length in metadata")
        except ValueError:
            # Quarantine, the content-length key is present but not an
            # integer.
            self._quarantine(
                "bad metadata content-length value %s" % (
                    self._metadata['Content-Length']))
        obj_size = self._fs_inst.size(self._name)
        if obj_size != metadata_size:
            self._quarantine(
                "metadata content-length %s does"
                " not match actual object size %s" % (
                    metadata_size, obj_size))

    def get_metadata(self):
        """
        Provide the metadata for an object as a dictionary.

        :returns: object's metadata dictionary
        :raises DiskFileNotOpen: if ``open()`` has not succeeded
        """
        if self._metadata is None:
            raise DiskFileNotOpen()
        return self._metadata

    def read_metadata(self):
        """
        Open the object and return its metadata.

        :returns: metadata dictionary for an object
        """
        with self.open():
            return self.get_metadata()

    def reader(self, iter_hook=None, keep_cache=False):
        """
        Return a swift.common.swob.Response class compatible "app_iter"
        object. The responsibility of verifying the data on close is passed
        to the DiskFileReader object.

        :param iter_hook: called after each chunk is yielded
        :param keep_cache: ignored by this backend
        :raises DiskFileNotOpen: if ``open()`` has not succeeded
        """
        if self._metadata is None:
            raise DiskFileNotOpen()
        return DiskFileReader(self._fs, self._name,
                              int(self._metadata['Content-Length']),
                              self._metadata['ETag'],
                              iter_hook=iter_hook)

    @contextmanager
    def create(self, size=None):
        """
        Context manager yielding a DiskFileWriter for this object.

        Data is written directly to the target RADOS object; no temporary
        object is used.

        :param size: optional size to pre-allocate for the object
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        fs_inst = self._fs.open()
        if size is not None:
            fs_inst.create(self._name, size)
        yield DiskFileWriter(fs_inst, self._name)

    def write_metadata(self, metadata):
        """
        Write a block of metadata to an object.
        """
        with self.create() as writer:
            writer.put(metadata)

    def delete(self, timestamp):
        """
        Delete the object if its stored X-Timestamp is older than
        ``timestamp``. Does nothing when no metadata is found.

        :param timestamp: timestamp of the delete request
        """
        timestamp = normalize_timestamp(timestamp)
        fs_inst = self._fs.open()
        md = fs_inst.get_metadata(self._name)
        # NOTE(review): string comparison; assumes the stored X-Timestamp is
        # in the same normalized format as normalize_timestamp() -- confirm.
        if md and md['X-Timestamp'] < timestamp:
            fs_inst.del_object(self._name)