Don't calculate ETag ourselves when our FUSE process does it for us; add namespace sync tool

This commit is contained in:
Phil Bridges 2016-02-10 18:59:19 -06:00
parent 2d4cc93deb
commit 3f19b07d3a
5 changed files with 1003 additions and 85 deletions

855
bin/swiftonhpss-nstool Executable file
View File

@ -0,0 +1,855 @@
#!/usr/bin/python
#
# Copyright (c) 2015-2016 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import stat
import os
import multiprocessing
from pwd import getpwuid
import logging
import argparse
import swiftclient
from swiftclient.exceptions import ClientException
import requests
import getpass
from functools import wraps
from simplejson.scanner import JSONDecodeError
"""
Namespace reconciliation tool for SwiftOnHPSS.
Depends on Keystone for authentication, preferably with the Keystone HPSS
identity backend.
Has not been tested with any other authentication service.
WARNING: Do not run nstool on a regular Swift JBOD disk, or 'fun' things
will happen to your metadata.
LIMITATION: In order to save unnecessary file I/O on HPSS, it is highly
recommended to add 'object_post_as_copy=false' to the configuration of
the proxy server that will handle SwiftOnHPSS calls!
LIMITATION: Only supports the Keystone Identity V2 API
"""
# TODO: pull OpenStack details from env vars
def trace_function(f):
    """
    Decorator that records every call to ``f`` in the log at INFO level.

    Before the call the function name, positional arguments and keyword
    arguments are logged; after it returns, the result and an exit marker
    are logged. When ``f`` raises, the exception propagates unchanged and
    the two trailing lines are not written.

    :param f: callable to wrap
    :returns: wrapper carrying the metadata of ``f`` (functools.wraps)
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        emit = logging.info
        func_name = f.__name__
        emit('entering %s' % func_name)
        emit('args: %s' % str(args))
        emit('kwargs: %s' % str(kwargs))
        outcome = f(*args, **kwargs)
        emit('result: %s' % str(outcome))
        emit('exiting %s' % func_name)
        return outcome
    return wrap
@trace_function
def main(program_args):
    """
    Run a namespace reconciliation as described by the parsed arguments.

    Prompts for the Keystone password of the admin user, authenticates
    into the ``admin`` project, initializes the target account (or every
    Keystone tenant when no account was given) and then reconciles one
    container, one account, or all accounts. The process exits with
    status 1 when authentication or a role lookup fails.

    :param argparse.Namespace program_args: options from check_usage()
    :returns: nothing
    """
    nstool = Reconciler(program_args)

    # Reconciler keeps a reference to the same namespace object, so this
    # normalisation is visible to it as well.
    if program_args.device.endswith('/'):
        program_args.device = program_args.device[:-1]

    # Get and check authentication
    password = getpass.getpass(prompt='Keystone password for %s@admin: '
                               % program_args.username)
    try:
        admin_keystone_api, _ = nstool.auth_into_keystone(password, 'admin')
    except ValueError:
        logging.debug("Failed login!")
        print("Authentication failed.")
        sys.exit(1)
    del password

    # Figure out what we're doing.
    target_account = program_args.account
    target_container = program_args.container

    # Start doing it.
    # Multiprocessing does not play nicely with the lazy loading that
    # keystoneclient does, so no worker pool is created for now.
    pool = None

    # Doing this convolution because if we scope into a project as
    # root@admin, and initialize it, the symlink always gets written to
    # 'admin' because SwiftOnHPSS only gets the name of the account
    # currently scoped into, even if it's not the same as the account
    # we're modifying containers in. So we just cheat and make ourselves
    # a member of every project, and then scope in as root into that
    # account to initialize it.
    # TODO: make this use a multiprocessing pool
    if target_account == "":
        for account in admin_keystone_api.list_tenants():
            _grant_and_initialize(nstool, admin_keystone_api,
                                  program_args.username, account['name'])
    else:
        _grant_and_initialize(nstool, admin_keystone_api,
                              program_args.username, target_account)

    # Handle reconciling one or all containers.
    if target_container != "":
        swift_api = nstool.auth_into_swift(keystone_api=admin_keystone_api,
                                           target_account=target_account)
        nstool.reconcile_container(keystone_api=admin_keystone_api,
                                   swift_api=swift_api,
                                   target_account=target_account,
                                   target_container=target_container)
    elif target_account != "":
        nstool.reconcile_account(keystone_api=admin_keystone_api,
                                 target_account=target_account,
                                 pool=pool)
    else:
        # reconcile everything
        nstool.reconcile_all_accounts(keystone_api=admin_keystone_api,
                                      pool=pool)

    if pool:
        # close() must come first: join() on a pool that still accepts
        # work is rejected by multiprocessing.
        pool.close()
        pool.join()

    print("Done")


def _grant_and_initialize(nstool, admin_keystone_api, username,
                          account_name):
    """
    Make ``username`` a member and admin of one account, then initialize it.

    A second Keystone client is created with the admin client's URL,
    credentials and API version but scoped into ``account_name``; that
    client is what initialize_account() uses. The process exits with
    status 1 when the tenant, user or role cannot be looked up.

    :param Reconciler nstool: reconciler used to initialize the account
    :param LightweightKeystoneAPI admin_keystone_api:
        Keystone client scoped into the admin project
    :param str username: user that receives the roles
    :param str account_name: Keystone tenant name to initialize
    :returns: nothing
    """
    try:
        for role in ('_member_', 'admin'):
            admin_keystone_api.grant_role_on_tenant(account_name, username,
                                                    role)
    except KeyError:
        # grant_role_on_tenant already printed the reason.
        sys.exit(1)

    user_keystone_api = LightweightKeystoneAPI(admin_keystone_api.url,
                                               admin_keystone_api.username,
                                               admin_keystone_api.password,
                                               account_name,
                                               admin_keystone_api.version)
    user_keystone_api.authenticate()
    nstool.initialize_account(keystone_api=user_keystone_api,
                              target_account=account_name)
def check_usage():
    """
    Checks the user arguments and parses them

    ``--verify-ssl`` accepts a textual boolean (``True``/``False`` and a
    few common synonyms, case-insensitive); anything else is rejected by
    the parser.

    :returns: argument namespace
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--device",
                        type=str,
                        help="Swift device path",
                        required=True)
    parser.add_argument('-a', "--account",
                        type=str,
                        help="Account to reconcile",
                        default="")
    parser.add_argument('-c', '--container',
                        type=str,
                        help="Container to reconcile",
                        default="")
    parser.add_argument("-k", "--keystone-url",
                        type=str,
                        help="Keystone url endpoint",
                        required=True)
    # TODO: get this from Keystone
    parser.add_argument("-s", "--storage-url",
                        type=str,
                        help="Storage url endpoint",
                        required=True)
    parser.add_argument("-u", "--username",
                        type=str,
                        help="Admin user (default: root)",
                        default="root")
    parser.add_argument("-p", "--prefix",
                        type=str,
                        help="Auth prefix for account",
                        required=True)
    parser.add_argument("-l", "--logfile",
                        type=str,
                        help="Location of log file",
                        default='/var/log/swift/nstool.log')
    # type=bool would turn every non-empty string, including "False",
    # into True, so the value is parsed explicitly instead.
    parser.add_argument("-g", "--verify-ssl",
                        type=_parse_bool,
                        help="Whether Keystone should verify "
                             "SSL certificate (True or False)",
                        default=True)
    parser.add_argument("-n", "--storage-policy-name",
                        type=str,
                        help="Swift storage policy name for SwiftOnHPSS",
                        required=True)
    parser.add_argument("-v", "--verbose",
                        help="Show debug traces",
                        action="store_true")
    return parser.parse_args()


def _parse_bool(value):
    """
    Convert a command-line word into a bool.

    :param str value: text given on the command line
    :returns: True or False
    :raises argparse.ArgumentTypeError: if the text is not a known boolean
    """
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1', 'on'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        "expected True or False, got '%s'" % value)
class Reconciler(object):
    """
    Brings Swift's account/container/object listings in line with the
    directory tree found under the configured device path.

    The directory tree is treated as the source of truth: containers and
    objects known to Swift but missing on disk are deleted from Swift,
    and files found on disk are requested through Swift one by one.

    :param argparse.Namespace args:
        parsed options; this class reads ``device``, ``storage_url``,
        ``prefix``, ``keystone_url``, ``username``,
        ``storage_policy_name`` and ``verbose``
    """

    def __init__(self, args):
        self._args = args

    def _report(self, msg):
        """Print ``msg`` only when verbose output was requested."""
        if self._args.verbose:
            print(msg)

    @trace_function
    def auth_into_swift(self, keystone_api, target_account):
        """
        Gets a Swift API client object that is authorized to make changes to
        the specified Swift account/tenant by name.

        The account URL is ``<storage_url>/<prefix>_<tenant id>`` and the
        connection reuses the token already held by ``keystone_api``.

        :param LightweightKeystoneAPI keystone_api:
            Keystone API instance
        :param str target_account:
            Account name from Keystone we want to target
        :returns:
            Swift API instance that is authorized to alter the given account
        """
        keystone_acct = keystone_api.get_tenant(target_account)
        account_url = "%s/%s_%s" % (self._args.storage_url,
                                    self._args.prefix,
                                    keystone_acct['id'])
        return swiftclient.client.Connection(
            preauthurl=account_url,
            preauthtoken=keystone_api.auth_token)

    def auth_into_keystone(self, admin_password, target_project):
        """
        Returns a Keystone client object and auth token if authentication is
        successful.

        Deliberately not wrapped in trace_function, which would write the
        password argument to the log.

        :param str admin_password: Password to authenticate with
        :param str target_project: Project to scope into
        :returns: Keystone API client, and authorization token
        :raises ValueError: if Keystone rejects the credentials
        """
        keystone_api = LightweightKeystoneAPI(url=self._args.keystone_url,
                                              username=self._args.username,
                                              password=admin_password,
                                              tenant_name=target_project,
                                              api_version="v2")
        if not keystone_api.authenticate():
            raise ValueError('Could not authenticate into Keystone!')
        return keystone_api, keystone_api.auth_token

    @trace_function
    def list_hpss_containers(self, swift_api, target_account):
        """
        Lists containers in Swift metadata that use the given storage policy
        name, for a given account.

        :param swiftclient.client.Connection swift_api:
            Swift API client, authenticated for a given account
        :param target_account:
            Account to look in. Unused: ``swift_api`` is already bound to
            one account URL. Kept so existing callers keep working.
        :return:
            List of container dictionaries with this property
        """
        hpss_containers = []
        containers = swift_api.get_account(full_listing=True)[1]
        for container in containers:
            # swiftclient hands back response headers with lower-cased
            # names, and a HEAD is enough to read the policy.
            headers = swift_api.head_container(container['name'])
            storage_policy = headers.get('x-storage-policy')
            if storage_policy == self._args.storage_policy_name:
                hpss_containers.append(container)
        return hpss_containers

    @trace_function
    def list_accounts(self, keystone_api):
        """
        Convenience function to list all Keystone accounts.

        :param LightweightKeystoneAPI keystone_api: Keystone API client
        :returns: List of accounts in Keystone
        :rtype: List of Keystone tenant names
        """
        return [tenant['name'] for tenant in keystone_api.list_tenants()]

    @trace_function
    def initialize_account(self, keystone_api, target_account):
        """
        Initializes a single account in HPSS, or in other words creates the
        account directory in HPSS.

        This is done by putting a container named ``.deleteme`` (using the
        configured storage policy) and an empty object of the same name
        into it. Neither is removed again by this method.

        :param LightweightKeystoneAPI keystone_api:
            Keystone API client for root user scoped into target account
        :param str target_account: Account name to initialize
        """
        # TODO: decide if account needs initializing before here
        swift_api = self.auth_into_swift(keystone_api=keystone_api,
                                         target_account=target_account)
        temp_name = ".deleteme"
        print("Initializing account %s..." % target_account)
        swift_api.put_container(temp_name,
                                headers={'X-Storage-Policy':
                                         self._args.storage_policy_name})
        swift_api.put_object(temp_name, '.deleteme', contents=None)

    @trace_function
    def reconcile_all_accounts(self, keystone_api, pool):
        """
        Reconciles all accounts.

        :param LightweightKeystoneAPI keystone_api:
            Keystone API client
        :param multiprocessing.Pool pool: Process pool
        :returns: nothing
        """
        all_accounts = self.list_accounts(keystone_api)
        print("Reconciling all accounts...")
        for account in all_accounts:
            self.reconcile_account(keystone_api, account, pool)

    @trace_function
    def reconcile_account(self, keystone_api, target_account, pool):
        """
        Reconciles all the containers in a single account.

        :param LightweightKeystoneAPI keystone_api:
            Keystone API client
        :param str target_account: Account name with containers to reconcile
        :param multiprocessing.Pool pool:
            Process pool, or None to reconcile containers one after another
        :returns: nothing
        """
        # Checked before anything else so a missing name is reported as
        # such instead of as a missing "<device>/None" directory.
        if target_account is None:
            print("Account '%s' does not exist" % target_account)
            logging.error("Account '%s' does not exist" % target_account)
            return

        account_directory = "%s/%s" % (self._args.device, target_account)
        print("Reconciling account %s" % target_account)
        if not (os.path.isdir(account_directory) or
                os.path.islink(account_directory)):
            print("%s is not a directory" % account_directory)
            return

        swift_api = self.auth_into_swift(keystone_api=keystone_api,
                                         target_account=target_account)
        good_containers = self.clean_account(swift_api, target_account)

        # FIXME: Can we figure out a better way to deal with accounts with a
        # ridiculous number of containers?
        if len(good_containers) > 1000000:
            print("Account has too many containers, exiting...")
            return

        # If we're operating on all containers, spawn more worker processes
        for cont in good_containers:
            if pool:
                pool.apply_async(func=self.reconcile_container,
                                 args=(keystone_api, swift_api,
                                       target_account, cont))
            else:
                self.reconcile_container(keystone_api, swift_api,
                                         target_account, cont)

    @trace_function
    def reconcile_container(self, keystone_api, swift_api, target_account,
                            target_container):
        """
        Entry point for worker processes.

        Makes changes to Swift containers depending on the changes made in
        HPSS: creates the container in Swift if needed, requests every
        file found on disk, deletes Swift-only objects and finally sets
        the container ACLs.

        :param LightweightKeystoneAPI keystone_api: Keystone API client
        :param swiftclient.Connection swift_api: Swift API client
        :param str target_account: Account name the target container belongs to
        :param str target_container: Container name that needs reconciling
        :returns: nothing
        :raises ClientException: if the container cannot be created
        """
        # Check if container directory exists
        container_dir = "%s/%s/%s" % (self._args.device, target_account,
                                      target_container)
        if not os.path.isdir(container_dir):
            print("%s is not a directory" % container_dir)
            logging.error("%s is not a directory" % container_dir)
            return

        swift_containers = [entry['name'] for entry in
                            swift_api.get_account(full_listing=True)[1]]

        # Check if container exists or else create it
        if target_container not in swift_containers \
                and target_container != "":
            print("Container %s does not exist, creating" % target_container)
            try:
                swift_api.put_container(
                    target_container,
                    headers={'X-Storage-Policy':
                             self._args.storage_policy_name})
            except ClientException:
                print("Putting container %s went wrong" % target_container)
                logging.exception("Putting container %s went wrong" %
                                  target_container)
                raise

        print("Reconciling container %s/%s"
              % (target_account, target_container))
        # Make sure those objects get added into the Swift metadata DBs
        self.add_objects_from_hpss(swift_api, target_container, container_dir)

        # Clear out objects that exist only in Swift
        self.clean_container(swift_api, target_account, target_container)

        # TODO: make sure we don't clobber permissions already existing!
        # Set up permissions for the container in Swift afterwards
        self.configure_permissions(swift_api, keystone_api, target_account,
                                   target_container)

    def _get_object_names(self, container_dir):
        """
        Walk ``container_dir`` and return every file beneath it as a path
        relative to ``container_dir``.

        :param str container_dir: directory to scan
        :returns: list of relative file paths
        """
        objects = []
        for path, _directories, files in os.walk(container_dir):
            for file_name in files:
                objects.append(os.path.relpath(os.path.join(path, file_name),
                                               container_dir))
        return objects

    @trace_function
    def add_objects_from_hpss(self, swift_api, target_container,
                              container_dir):
        """
        Update object metadata on object creates, and returns a list of all the
        objects existing in the container from Swift.

        Every file under ``container_dir`` is requested with a HEAD through
        Swift; the container listing is fetched afterwards.

        :param swiftclient.Connection swift_api: Swift API client
        :param str target_container: Container to add objects to
        :param str container_dir: Container directory to scan for objects in
        :returns: List of objects from Swift metadata in container
        :raises ClientException: if a HEAD or the listing fails
        """
        # TODO: be smarter about what files we HEAD
        hpss_objects = self._get_object_names(container_dir)
        logging.debug('hpss_objects is %s' % str(hpss_objects))
        for obj in hpss_objects:
            print("Adding object %s..." % obj)
            try:
                swift_api.head_object(target_container, obj)
            except ClientException:
                fail_reason = "Updating object %s in container %s went wrong"\
                              % (obj, target_container)
                print(fail_reason)
                raise

        # Get list of objects from this container
        try:
            container_stats, swift_objects =\
                swift_api.get_container(target_container)
            logging.debug('swift_objects is %s' % str(swift_objects))
        except ClientException:
            print("Get on container %s went wrong" % target_container)
            raise
        return swift_objects

    @trace_function
    def clean_account(self, swift_api, target_account):
        """
        Deletes containers in Swift that do not exist on the HPSS filesystem,
        and returns a list of the containers that exist in HPSS.

        :param swiftclient.Connection swift_api: Swift API client
        :param str target_account: Account of the containers that need cleaning
        :returns: List of containers existing on HPSS mount point
        :raises OSError: if the account directory cannot be listed
        :raises ClientException: if a ghost container cannot be removed
        """
        # Check containers in HPSS
        account_directory = "%s/%s" % (self._args.device, target_account)

        # Check containers in Swift. full_listing follows the paging
        # markers so accounts beyond one page are seen completely.
        swift_containers = [container['name'] for container
                            in swift_api.get_account(full_listing=True)[1]]
        try:
            hpss_containers = os.listdir(account_directory)
        except OSError:
            print("Unable to list files under directory: %s"
                  % account_directory)
            raise

        # Delete containers that only exist in Swift, but not HPSS
        ghost_containers = set(swift_containers) - set(hpss_containers)
        for target_container in ghost_containers:
            try:
                listing = swift_api.get_container(target_container,
                                                  full_listing=True)[1]
                for obj in listing:
                    try:
                        swift_api.delete_object(target_container,
                                                obj['name'])
                    except ClientException as e:
                        # An object that is already gone is fine; any
                        # other failure must not be hidden.
                        if e.http_status != 404:
                            raise
                swift_api.delete_container(target_container)
            except ClientException:
                print("Deleting container '%s' went wrong!"
                      % target_container)
                raise
        return hpss_containers

    @trace_function
    def clean_container(self, swift_api, target_account, target_container):
        """
        Deletes Swift objects that do not exist in HPSS from the container

        :param swiftclient.Connection swift_api: Swift API client
        :param str target_account: Account that target container belongs to
        :param str target_container: Container that needs cleaning
        :returns: nothing
        :raises IOError:
            if a delete fails and the object is still listed afterwards
        """
        container_path = "%s/%s/%s" % (self._args.device, target_account,
                                       target_container)
        hpss_objects = self._get_object_names(container_path)
        swift_objects = [obj['name'] for obj in
                         swift_api.get_container(target_container,
                                                 full_listing=True)[1]]
        swift_only_objects = set(swift_objects) - set(hpss_objects)

        # If we have objects that only exist in the Swift metadata, delete
        # those objects.
        for target_obj in swift_only_objects:
            try:
                swift_api.delete_object(target_container, target_obj)
            except ClientException:
                # The delete may have failed because the object is already
                # gone; only complain if it is still listed.
                remaining = swift_api.get_container(target_container,
                                                    full_listing=True)[1]
                known_good_objects = set(obj['name'] for obj in remaining)
                if target_obj in known_good_objects:
                    fail_reason =\
                        "Deleting object %s in container %s went wrong"\
                        % (target_obj, target_container)
                    logging.error(fail_reason)
                    print(fail_reason)
                    raise IOError(fail_reason)

    # TODO: clean up this code
    @trace_function
    def configure_permissions(self, swift_api, keystone_api, target_account,
                              target_container):
        """
        Configuring Swift Container ACLs

        The owner of the container directory is granted read and/or write
        access on the Swift container, mirroring the owner read/write
        bits of the directory mode.

        :param swiftclient.Connection swift_api:
            Swift API client
        :param LightweightKeystoneAPI keystone_api:
            Keystone API client
        :param str target_account: Account that container belongs in
        :param str target_container: Container name
        :returns: nothing
        :raises KeyError: if the directory owner has no passwd entry
        :raises IOError: if the directory owner is not a Keystone user
        """
        # TODO: figure out how to deal with files that have more restrictive
        # permissions than the container does
        path = "%s/%s/%s" % (self._args.device, target_account,
                             target_container)

        # Obtain unix user permissions for path (a single stat serves both
        # the owner lookup and the mode bits).
        try:
            path_stat = os.stat(path)
            file_user = getpwuid(path_stat.st_uid).pw_name
        except KeyError:
            fail_reason = "Cannot find permissions for %s" % path
            print(fail_reason)
            logging.exception(fail_reason)
            raise
        mode = path_stat.st_mode

        # Check if file user exists in Keystone
        try:
            keystone_users = [user['name'] for user
                              in keystone_api.list_users()]
        except Exception:
            fail_reason = "Couldn't get user list"
            print(fail_reason)
            logging.exception(fail_reason)
            raise

        if file_user not in keystone_users:
            fail_reason = \
                "Cannot configure proper permissions for this path %s " \
                "because user %s does not exist in keystone" \
                % (path, file_user)
            print(fail_reason)
            logging.error(fail_reason)
            raise IOError(fail_reason)

        # Update container ACLs. Swift reads ACLs from X-Container-Read
        # and X-Container-Write; other header names are not ACL headers.
        acl = "%s:%s" % (target_account, file_user)
        headers = {}
        if mode & stat.S_IRUSR:
            headers["X-Container-Read"] = acl
        if mode & stat.S_IWUSR:
            headers["X-Container-Write"] = acl
        if not headers:
            return
        try:
            swift_api.post_container(target_container, headers)
        except requests.ConnectionError:
            print("Cannot configure ACLs due to metadata header issue")
            logging.exception(
                "Cannot configure ACLs due to metadata header issue")
            raise
# This only exists because the keystoneclient library is so massive that it has
# to have a lazy-loading mechanism that ensures only one of it can be active,
# so we can't have handles to multiple different Keystone scopes simultaneously
class LightweightKeystoneAPI(object):
    """
    Minimal Keystone client built directly on ``requests``.

    Each instance holds one set of credentials scoped into one tenant and,
    after authenticate(), the token obtained for that scope. Any
    ``api_version`` other than ``'v2'`` selects the v3-style URLs and
    request bodies.

    :param str url: Keystone endpoint, including the version path
    :param str username: user to authenticate as
    :param str password: password of that user
    :param str tenant_name: tenant/project to scope into
    :param str api_version: ``'v2'`` (default) or anything else for v3
    """

    # Not referenced anywhere in this class.
    # NOTE(review): presumably a deployment-specific role ID - confirm
    # before relying on it.
    MEMBER_ROLE_ID = '9fe2ff9ee4384b1894a90878d3e92bab'

    def __init__(self, url, username, password, tenant_name,
                 api_version='v2'):
        self.url = url
        self.username = username
        self.password = password
        self.tenant_name = tenant_name
        self.auth_token = None
        self.version = api_version
        logging.debug('New LightweightKeystoneAPI instance created for %s@%s,'
                      ' id = %s' %
                      (self.username, self.tenant_name, hex(id(self))))

    def _get_keystone_response(self, method, url, headers=None, json=None,
                               expect_body=True):
        """
        Issue one HTTP request and decode the JSON answer.

        :param method: ``requests`` function to call (get, post, put, ...)
        :param str url: full request URL
        :param dict headers: request headers
        :param json: request body, sent as JSON
        :param bool expect_body:
            when False, a successful response with an empty body is
            accepted and returned with ``None`` as its JSON part
        :returns: tuple of (response headers, decoded JSON)
        :raises IOError:
            with ``errno`` set to the HTTP status for a non-2xx answer,
            or without errno when the body is not valid JSON
        """
        resp = method(url=url,
                      headers=headers,
                      json=json)
        logging.debug('Response code: %s' % str(resp.status_code))
        logging.debug('Response content: %s' % resp.content)

        if not (200 <= resp.status_code <= 299):
            raise IOError(resp.status_code, 'Request was unsuccessful')

        if not expect_body and not resp.content:
            return resp.headers, None

        try:
            resp_json = resp.json()
            logging.debug('response json: %s' % resp_json)
        except ValueError:
            # Both simplejson's JSONDecodeError and the standard library's
            # decoding error derive from ValueError, so this works with
            # whichever JSON module requests picked up.
            logging.error('Malformed response from Keystone')
            raise IOError('Malformed response from Keystone')

        return resp.headers, resp_json

    @trace_function
    def authenticate(self):
        """
        Obtain a token for the configured user and tenant.

        A token that was already obtained is reused.

        :returns: True on success, False if the token request failed
        """
        if self.auth_token:
            return True

        if self.version == 'v2':
            url = '%s/tokens' % self.url
            token_req = {'auth': {'tenantName': self.tenant_name,
                                  'passwordCredentials': {
                                      'username': self.username,
                                      'password': self.password
                                  }}}
        else:
            url = '%s/auth/tokens' % self.url
            token_req = {
                'auth': {
                    'identity': {
                        'methods': ['password'],
                        'password': {
                            'user': {
                                'name': self.username,
                                'password': self.password,
                                'domain': {'id': 'default'}
                            }
                        }
                    },
                    'scope': {
                        'project': {
                            'name': self.tenant_name,
                            'domain': {'id': 'default'}
                        }
                    }
                }
            }
        try:
            resp_headers, resp_json =\
                self._get_keystone_response(requests.post,
                                            url,
                                            None,
                                            token_req)
        except IOError:
            return False

        # v2 returns the token in the body, v3 in a response header.
        if self.version == 'v2':
            self.auth_token = resp_json['access']['token']['id']
        else:
            self.auth_token = resp_headers['X-Subject-Token']

        logging.debug('New auth token: %s' % self.auth_token)
        return True

    @trace_function
    def list_users(self):
        """
        :returns: list of user dictionaries known to Keystone
        :raises IOError: if the request fails
        """
        headers = {'X-Auth-Token': self.auth_token}
        resp_headers, resp_json = \
            self._get_keystone_response(requests.get,
                                        '%s/users' % self.url,
                                        headers)
        return resp_json['users']

    @trace_function
    def list_tenants(self):
        """
        :returns: list of tenant (v2) or project (v3) dictionaries
        :raises IOError: if the request fails
        """
        if self.version == 'v2':
            url = '%s/tenants' % self.url
            key = 'tenants'
        else:
            url = '%s/projects' % self.url
            key = 'projects'
        resp_headers, resp_json = \
            self._get_keystone_response(requests.get,
                                        url,
                                        {'X-Auth-Token': self.auth_token})
        return resp_json[key]

    @trace_function
    def list_roles(self):
        """
        :returns: list of role dictionaries
        :raises IOError: if the request fails
        """
        if self.version == 'v2':
            url = '%s/OS-KSADM/roles' % self.url
        else:
            url = '%s/roles' % self.url
        resp_headers, resp_json =\
            self._get_keystone_response(requests.get,
                                        url,
                                        {'X-Auth-Token':
                                         self.auth_token})
        return resp_json['roles']

    @trace_function
    def get_role(self, role_name):
        """
        :param str role_name: name of the role to fetch
        :returns: role dictionary
        :raises KeyError: if no role has that name
        :raises IOError: if the request fails
        """
        role_id = self._get_id_for_role_name(role_name)
        if self.version == 'v2':
            # Same collection that list_roles() reads, plus the role ID.
            url = '%s/OS-KSADM/roles/%s' % (self.url, role_id)
        else:
            url = '%s/roles/%s' % (self.url, role_id)
        resp_headers, resp_json = \
            self._get_keystone_response(requests.get,
                                        url,
                                        {'X-Auth-Token':
                                         self.auth_token})
        return resp_json['role']

    # TODO: write v3 version of this
    @trace_function
    def get_tenant(self, target_tenant):
        """
        Look a tenant up by name (v2 API only).

        NOTE: ``target_tenant`` is placed into the query string as given,
        without URL escaping.

        :param str target_tenant: tenant name
        :returns: tenant dictionary
        :raises IOError: if the request fails
        """
        url = '%s/tenants?name=%s' % (self.url, target_tenant)
        headers = {'X-Auth-Token': self.auth_token,
                   'name': target_tenant}
        logging.debug('url: %s' % url)
        logging.debug('headers: %s' % headers)
        resp_headers, resp_json = self._get_keystone_response(requests.get,
                                                              url,
                                                              headers)
        return resp_json['tenant']

    @trace_function
    def _get_id_for_user_name(self, user_name):
        """
        :returns: ID of the first user whose name matches
        :raises KeyError: if there is no such user
        """
        # User entries are matched on 'name' (the key the rest of this
        # file reads) and on 'username', which this lookup used before.
        users = [user['id'] for user in self.list_users()
                 if user_name in (user.get('name'), user.get('username'))]
        if not users:
            raise KeyError('%s is not a user' % user_name)
        return users[0]

    @trace_function
    def _get_id_for_tenant_name(self, tenant_name):
        """
        :returns: ID of the first tenant whose name matches
        :raises KeyError: if there is no such tenant
        """
        tenants = [tenant['id'] for tenant in self.list_tenants()
                   if tenant['name'] == tenant_name]
        if not tenants:
            raise KeyError('%s is not a tenant' % tenant_name)
        return tenants[0]

    @trace_function
    def _get_id_for_role_name(self, role_name):
        """
        :returns: ID of the first role whose name matches
        :raises KeyError: if there is no such role
        """
        roles = [role['id'] for role in self.list_roles()
                 if role['name'] == role_name]
        if not roles:
            raise KeyError('%s is not a role' % role_name)
        return roles[0]

    @trace_function
    def grant_role_on_tenant(self, target_project, target_user, target_role):
        """
        Give ``target_user`` the role ``target_role`` on ``target_project``.

        A 409 answer is treated as "already granted" and ignored.

        :param str target_project: tenant/project name
        :param str target_user: user name
        :param str target_role: role name
        :returns: nothing
        :raises KeyError: if the tenant, user or role name is unknown
        :raises IOError: if the request fails for any other reason
        """
        try:
            tenant_id = self._get_id_for_tenant_name(target_project)
            user_id = self._get_id_for_user_name(target_user)
            role_id = self._get_id_for_role_name(target_role)
        except KeyError as e:
            # args[0] is the plain message; str() of a KeyError would add
            # quotes and .message only exists on Python 2.
            print("Could not grant role %s to %s@%s (reason: %s)" %
                  (target_role, target_user, target_project, e.args[0]))
            raise

        if self.version == 'v2':
            url = '%s/tenants/%s/users/%s/roles/OS-KSADM/%s' %\
                  (self.url, tenant_id, user_id, role_id)
        else:
            url = '%s/projects/%s/users/%s/roles/%s' %\
                  (self.url, tenant_id, user_id, role_id)

        try:
            # A successful grant may come back without a body; that must
            # not be reported as a malformed response.
            self._get_keystone_response(requests.put, url,
                                        {'X-Auth-Token': self.auth_token},
                                        expect_body=False)
        except IOError as e:
            # We must've already granted ourselves this role. Carry on.
            if e.errno != 409:
                raise
if __name__ == "__main__":
    # Arguments are parsed first, so --help and usage errors are reported
    # before the privilege check below.
    _args = check_usage()

    if os.getuid() != 0:
        print('swiftonhpss-nstool must be run as root')
        sys.exit(1)

    # Initiating Log File: verbose runs log from DEBUG upwards, all other
    # runs only record errors.
    log_level = logging.DEBUG if _args.verbose else logging.ERROR
    logging.basicConfig(level=log_level, filename=_args.logfile)

    main(_args)

View File

@ -1,3 +1,4 @@
# Copyright (c) 2016 IBM Corporation
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -22,26 +23,27 @@ setup(
version=_pkginfo.full_version,
description='SwiftOnHPSS',
license='Apache License (2.0)',
author='IBM & Red Hat, Inc.',
author='IBM Corporation; Red Hat, Inc.',
url='https://github.com/hpss-collaboration/swiftonhpss',
packages=find_packages(exclude=['test', 'bin']),
test_suite='nose.collector',
classifiers=[
'Development Status :: 2 - Pre-Alpha'
'Environment :: OpenStack'
'Intended Audience :: Information Technology'
'Intended Audience :: System Administrators'
'License :: OSI Approved :: Apache Software License'
'Operating System :: POSIX :: Linux'
'Programming Language :: Python'
'Programming Language :: Python :: 2'
'Programming Language :: Python :: 2.6'
'Development Status :: 2 - Pre-Alpha',
'Environment :: OpenStack',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7'
],
install_requires=[],
scripts=[
'bin/swiftonhpss-print-metadata',
'bin/swiftonhpss-migrate-metadata',
'bin/swiftonhpss-nstool'
],
entry_points={
'paste.app_factory': [

View File

@ -24,6 +24,7 @@ from eventlet import sleep
import cPickle as pickle
from cStringIO import StringIO
import pickletools
import xattr
from swiftonhpss.swift.common.exceptions import SwiftOnFileSystemIOError
from swift.common.exceptions import DiskFileNoSpace
from swift.common.db import utf8encodekeys
@ -235,24 +236,29 @@ def _read_for_etag(fp):
return etag.hexdigest()
def get_etag(fd):
def get_etag(fd_or_path):
"""
FIXME: It would be great to have a translator that returns the md5sum() of
the file as an xattr that can be simply fetched.
Either read the ETag from HPSS metadata, or read the entire file to
generate it.
"""
# Try to just get the MD5 sum from HPSS. We're assuming that we recheck
# this checksum every time we actually open the file for read/write.
attrs = xattr.xattr(fd_or_path)
if 'system.hpss.hash' in attrs:
return attrs['system.hpss.hash']
elif 'user.hash.checksum' in attrs:
return attrs['user.hash.checksum']
Since we don't have that we should yield after each chunk read and
computed so that we don't consume the worker thread.
"""
if isinstance(fd, int):
if isinstance(fd_or_path, int):
# We are given a file descriptor, so this is an invocation from the
# DiskFile.open() method.
fd = fd
etag = _read_for_etag(do_dup(fd))
do_lseek(fd, 0, os.SEEK_SET)
etag = _read_for_etag(do_dup(fd_or_path))
do_lseek(fd_or_path, 0, os.SEEK_SET)
else:
# We are given a path to the object when the DiskDir.list_objects_iter
# method invokes us.
path = fd
path = fd_or_path
fd = do_open(path, os.O_RDONLY)
etag = _read_for_etag(fd)
do_close(fd)
@ -264,6 +270,7 @@ def get_object_metadata(obj_path_or_fd, stats=None):
"""
Return metadata of object.
"""
logging.error('Entering get_object_metadata for %s' % obj_path_or_fd)
if not stats:
if isinstance(obj_path_or_fd, int):
# We are given a file descriptor, so this is an invocation from the

View File

@ -24,6 +24,7 @@ except ImportError:
import logging
import time
import hpssfs
import xattr
from uuid import uuid4
from hashlib import md5
from eventlet import sleep
@ -54,7 +55,7 @@ from swift.obj.diskfile import get_async_dir
# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
# be back ported. See http://www.python.org/dev/peps/pep-0433/
O_CLOEXEC = 02000000
# The flag value is octal 2000000 (0x80000); the ``0o`` spelling must keep
# exactly the digits of the earlier ``02000000`` literal.
O_CLOEXEC = 0o2000000
MAX_RENAME_ATTEMPTS = 10
MAX_OPEN_ATTEMPTS = 10
@ -298,7 +299,7 @@ class DiskFileWriter(object):
df._threadpool.run_in_thread(self._write_entire_chunk, chunk)
return self._upload_size
def _finalize_put(self, metadata, purgelock=False):
def _finalize_put(self, metadata, purgelock=False, has_etag=True):
# Write out metadata before fsync() to ensure it is also forced to
# disk.
write_metadata(self._fd, metadata)
@ -390,11 +391,33 @@ class DiskFileWriter(object):
else:
# Success!
break
# Close here so the calling context does not have to perform this
# in a thread.
self.close()
def put(self, metadata, purgelock):
# TODO: see if this is really the right way of getting the ETag
if not has_etag:
sleep(.5)
try:
xattrs = xattr.xattr(df._data_file)
if 'system.hpss.hash' in xattrs:
etag = xattrs['system.hpss.hash']
elif 'user.hash.checksum' in xattrs:
etag = xattrs['user.hash.checksum']
else:
raise DiskFileError(
'ETag was not in HPSS xattrs for file %s'
% df._data_file)
metadata['ETag'] = etag
write_metadata(df._data_file, metadata)
except IOError as err:
raise DiskFileError(
err.errno,
"Could not get xattrs for file '%s', reason: %s"
% (df._data_file, err.strerror))
def put(self, metadata, purgelock=False, has_etag=True):
"""
Finalize writing the file on disk, and renames it from the temp file
to the real location. This should be called after the data has been
@ -424,7 +447,7 @@ class DiskFileWriter(object):
' as a directory' % df._data_file)
df._threadpool.force_run_in_thread(self._finalize_put, metadata,
purgelock)
purgelock, has_etag)
# Avoid the unlink() system call as part of the mkstemp context
# cleanup
@ -622,6 +645,7 @@ class DiskFile(object):
self._put_datadir = self._container_path
self._data_file = os.path.join(self._put_datadir, self._obj)
self._stat = do_stat(self._data_file)
@property
def timestamp(self):
@ -668,7 +692,7 @@ class DiskFile(object):
obj_size = self._stat.st_size
self._metadata = read_metadata(self._fd)
if not self._validate_object_metadata(self._fd):
if not self._validate_object_metadata():
self._create_object_metadata(self._fd)
assert self._metadata is not None
@ -700,11 +724,10 @@ class DiskFile(object):
return self
def _validate_object_metadata(self, fd):
def _validate_object_metadata(self):
# Has no Swift specific metadata saved as xattr. Probably because
# object was added/replaced through filesystem interface.
if not self._metadata:
if not self._metadata and not self._is_dir:
self._file_has_changed = True
return False
@ -724,15 +747,16 @@ class DiskFile(object):
# Check if the file has been modified through filesystem
# interface by comparing mtime stored in xattr during PUT
# and current mtime of file.
obj_stat = os.stat(self._data_file)
if normalize_timestamp(self._metadata[X_MTIME]) != \
normalize_timestamp(self._stat.st_mtime):
normalize_timestamp(obj_stat.st_mtime):
self._file_has_changed = True
return False
else:
# Without X_MTIME key, comparing md5sum is the only way
# to determine if file has changed or not. This is inefficient
# but there's no other way!
self._etag = get_etag(fd)
self._etag = get_etag(self._data_file)
if self._etag != self._metadata[X_ETAG]:
self._file_has_changed = True
return False
@ -749,7 +773,6 @@ class DiskFile(object):
return False
def _create_object_metadata(self, fd):
if self._etag is None:
self._etag = md5().hexdigest() if self._is_dir \
else get_etag(fd)
@ -801,6 +824,23 @@ class DiskFile(object):
return True
return False
def is_offline(self):
    """
    Tell whether the byte count recorded for the first HPSS storage level
    differs from the size of the data file.

    Reads the "system.hpss.level" extended attribute of the data file,
    takes the first ';'-separated entry (called the top level here) and
    compares the third ':'-separated field of that entry with
    self._stat.st_size.

    NOTE(review): relies on self._stat having been populated before the
    call -- confirm this holds for every caller.

    :returns: True if the top-level byte count does not match the file
              size, False if it does
    :raises SwiftOnFileSystemIOError: if the extended attribute cannot be
                                      read or does not have the expected
                                      fields
    """
    try:
        raw_file_levels = xattr.getxattr(self._data_file,
                                         "system.hpss.level")
    except IOError as err:
        raise SwiftOnFileSystemIOError(
            err.errno,
            '%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror
        )

    try:
        file_levels = raw_file_levels.split(";")
        top_level = file_levels[0].split(':')
        bytes_on_disk = top_level[2].rstrip(' ')
    except (IndexError, ValueError):
        # A truncated attribute surfaces as a missing field (IndexError),
        # which the ValueError handler alone would let escape.
        raise SwiftOnFileSystemIOError("Couldn't get system.hpss.level!")

    # bytes_on_disk is text taken from the xattr while st_size is an
    # integer.  Comparing the two directly is always "not equal", which
    # would report every file as offline, so compare as strings.
    return bytes_on_disk != str(self._stat.st_size)
def __enter__(self):
"""
Context enter.
@ -848,15 +888,28 @@ class DiskFile(object):
def read_metadata(self):
    """
    Return the metadata for an object without opening the object's file on
    disk.

    The metadata is read by path, validated, regenerated if validation
    fails, and filtered before being returned.

    :returns: metadata dictionary for an object
    :raises DiskFileNotExist: if reading the metadata raises IOError
    """
    # FIXME: pull a lot of this and the copy of it from open() out to
    # another function

    # Do not actually open the file, in order to duck hpssfs checksum
    # validation and resulting timeouts
    # This means we do a few things DiskFile.open() does.
    # (An unconditional "with self.open(): return self.get_metadata()"
    # ahead of this code would make everything below unreachable and
    # defeat the purpose stated above, so it is not done here.)
    try:
        self._is_dir = os.path.isdir(self._data_file)
        # Module-level read_metadata(), called with the path rather than
        # an open file descriptor.
        self._metadata = read_metadata(self._data_file)
    except IOError:
        raise DiskFileNotExist

    if not self._validate_object_metadata():
        self._create_object_metadata(self._data_file)

    self._filter_metadata()
    return self._metadata
def reader(self, iter_hook=None, keep_cache=False):
"""

View File

@ -21,6 +21,7 @@ import time
import xattr
import os
import hpssfs
import time
from hashlib import md5
from swift.common.swob import HTTPConflict, HTTPBadRequest, HeaderKeyDict, \
HTTPInsufficientStorage, HTTPPreconditionFailed, HTTPRequestTimeout, \
@ -78,6 +79,7 @@ class ObjectController(server.ObjectController):
# Replaces Swift's DiskFileRouter object reference with ours.
self._diskfile_router = SwiftOnFileDiskFileRouter(conf, self.logger)
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.handle_md5 = conf.get('handle_md5', False)
self.container_ring = None
# This conf option will be deprecated and eventually removed in
# future releases
@ -152,7 +154,8 @@ class ObjectController(server.ObjectController):
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
upload_expiration = time.time() + self.max_upload_time
etag = md5()
if self.handle_md5:
etag = md5()
elapsed_time = 0
# (HPSS) Check for HPSS-specific metadata headers
@ -178,7 +181,8 @@ class ObjectController(server.ObjectController):
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
return HTTPRequestTimeout(request=request)
etag.update(chunk)
if self.handle_md5:
etag.update(chunk)
upload_size = writer.write(chunk)
elapsed_time += time.time() - start_time
except ChunkReadTimeout:
@ -188,7 +192,10 @@ class ObjectController(server.ObjectController):
elapsed_time, upload_size)
if fsize and fsize != upload_size:
return HTTPClientDisconnect(request=request)
etag = etag.hexdigest()
if self.handle_md5:
etag = etag.hexdigest()
else:
etag = ''
if 'etag' in request.headers \
and request.headers['etag'].lower() != etag:
return HTTPUnprocessableEntity(request=request)
@ -210,33 +217,40 @@ class ObjectController(server.ObjectController):
metadata[header_caps] = request.headers[header_key]
# (HPSS) Purge lock the file
writer.put(metadata, purgelock=purgelock)
writer.put(metadata, purgelock=purgelock,
has_etag=self.handle_md5)
except DiskFileNoSpace:
return HTTPInsufficientStorage(drive=device, request=request)
except SwiftOnFileSystemIOError:
return HTTPServiceUnavailable(request=request)
# (HPSS) Set checksum on file
try:
xattr.setxattr(disk_file._data_file, 'system.hpss.hash',
"md5:%s" % etag)
except IOError:
logging.exception("Error setting HPSS E2EDI checksum in "
"system.hpss.hash, storing ETag in "
"user.hash.checksum\n")
# FIXME: this stuff really should be handled in DiskFile somehow?
if self.handle_md5:
# (HPSS) Set checksum on file ourselves, if hpssfs won't do it
# for us.
try:
xattr.setxattr(disk_file._data_file,
'user.hash.checksum', etag)
xattr.setxattr(disk_file._data_file,
'user.hash.algorithm', 'md5')
xattr.setxattr(disk_file._data_file,
'user.hash.state', 'Valid')
xattr.setxattr(disk_file._data_file,
'user.hash.filesize', str(upload_size))
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno, '%s, xattr.setxattr(...)' % err.strerror)
xattr.setxattr(disk_file._data_file, 'system.hpss.hash',
"md5:%s" % etag)
except IOError:
logging.debug("Could not write ETag to system.hpss.hash,"
" trying user.hash.checksum")
try:
xattr.setxattr(disk_file._data_file,
'user.hash.checksum', etag)
xattr.setxattr(disk_file._data_file,
'user.hash.algorithm', 'md5')
xattr.setxattr(disk_file._data_file,
'user.hash.state', 'Valid')
xattr.setxattr(disk_file._data_file,
'user.hash.filesize', str(upload_size))
xattr.setxattr(disk_file._data_file,
'user.hash.app', 'swiftonhpss')
except IOError as err:
raise SwiftOnFileSystemIOError(
err.errno,
'Could not write MD5 checksum to HPSS filesystem: '
'%s' % err.strerror)
# Update container metadata
if orig_delete_at != new_delete_at:
@ -336,6 +350,7 @@ class ObjectController(server.ObjectController):
try:
disk_file = self.get_diskfile(device, partition, account, container,
obj, policy=policy)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
@ -370,7 +385,7 @@ class ObjectController(server.ObjectController):
pass
try:
self.get_hpss_xattr(request, response, disk_file)
self._get_hpss_xattr(request, response, disk_file)
except SwiftOnFileSystemIOError:
return HTTPServiceUnavailable(request=request)
@ -408,7 +423,7 @@ class ObjectController(server.ObjectController):
# (HPSS) Our file could end up being on an offline
# tape, so we need to check for it and return an
# HTTP 'accepted, but still processing' response.
if self.is_offline(disk_file._data_file, request):
if disk_file.is_offline():
return HTTPAccepted(request=request)
except (SwiftOnFileSystemIOError, SwiftOnFileFsException):
return HTTPServiceUnavailable(request=request)
@ -435,7 +450,7 @@ class ObjectController(server.ObjectController):
response.headers['X-Backend-Timestamp'] = file_x_ts.internal
# (HPSS) Inject HPSS xattr metadata into headers
try:
self.get_hpss_xattr(request, response, disk_file)
self._get_hpss_xattr(request, response, disk_file)
except SwiftOnFileSystemIOError:
return HTTPServiceUnavailable(request=request)
return request.get_response(response)
@ -448,7 +463,7 @@ class ObjectController(server.ObjectController):
# TODO: refactor this to live in DiskFile!
# Along with all the other HPSS stuff
def get_hpss_xattr(self, request, response, diskfile):
def _get_hpss_xattr(self, request, response, diskfile):
attrlist = {'X-HPSS-Account': 'account',
'X-HPSS-BitfileID': 'bitfile',
'X-HPSS-Comment': 'comment',
@ -476,27 +491,6 @@ class ObjectController(server.ObjectController):
'%s, xattr.getxattr("%s", ...)' % (err.strerror, attr)
)
# TODO: move this to DiskFile
# TODO: make it more obvious how we're parsing the level xattr
def is_offline(self, path, request):
    """
    Tell whether the byte count recorded for the first HPSS storage level
    differs from the size of the file at `path`.

    Reads the "system.hpss.level" extended attribute, takes the first
    ';'-separated entry and compares the third ':'-separated field of that
    entry with the size reported by os.stat().

    :param path: filesystem path of the object's data file
    :param request: unused; kept so existing callers keep working
    :returns: True if the first-level byte count does not match the file
              size, False if it does
    :raises SwiftOnFileSystemIOError: if the extended attribute cannot be
                                      read
    :raises SwiftOnFileFsException: if the extended attribute does not
                                    have the expected fields
    """
    try:
        byteslevel = xattr.getxattr(path, "system.hpss.level")
    except IOError as err:
        raise SwiftOnFileSystemIOError(
            err.errno,
            '%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror
        )

    try:
        byteslevelstring = byteslevel.split(";")
        bytesfirstlevel = byteslevelstring[0].split(':')
        bytesfile = bytesfirstlevel[2].rstrip(' ')
    except (IndexError, ValueError):
        # A truncated attribute surfaces as a missing field (IndexError).
        raise SwiftOnFileFsException("Couldn't get system.hpss.level!")

    # Compare the full decimal strings.  Comparing set(str(...)) only
    # compares which digit characters occur, so e.g. "1000" and "10"
    # would wrongly be treated as the same size.
    return str(bytesfile) != str(os.stat(path).st_size)
@public
@timing_stats()
def POST(self, request):
@ -582,7 +576,14 @@ class ObjectController(server.ObjectController):
orig_timestamp = e.timestamp
orig_metadata = {}
response_class = HTTPNotFound
except (DiskFileNotExist, DiskFileQuarantined):
# If the file got deleted outside of Swift, we won't see it. So just say
# it got deleted, even if it never existed in the first place.
except DiskFileNotExist:
orig_timestamp = 0
orig_metadata = {}
response_class = HTTPNoContent
except DiskFileQuarantined:
orig_timestamp = 0
orig_metadata = {}
response_class = HTTPNotFound