diff --git a/bin/swiftonhpss-nstool b/bin/swiftonhpss-nstool new file mode 100755 index 0000000..a9666ea --- /dev/null +++ b/bin/swiftonhpss-nstool @@ -0,0 +1,855 @@ +#!/usr/bin/python +# +# Copyright (c) 2015-2016 IBM Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import stat +import os +import multiprocessing +from pwd import getpwuid +import logging +import argparse +import swiftclient +from swiftclient.exceptions import ClientException +import requests +import getpass +from functools import wraps + +from simplejson.scanner import JSONDecodeError + +""" +Namespace reconciliation tool for SwiftOnHPSS. + +Depends on Keystone for authentication, preferably with the Keystone HPSS +identity backend. + +Has not been tested with any other authentication service. + +WARNING: Do not run nstool on a regular Swift JBOD disk, or 'fun' things +will happen to your metadata. + +LIMITATION: In order to save unnecessary file I/O on HPSS, it is highly +recommended to add 'object_post_as_copy=false' to the configuration of +the proxy server that will handle SwiftOnHPSS calls! 
+ +LIMITATION: Only supports the Keystone Identity V2 API +""" + +# TODO: pull OpenStack details from env vars + + +def trace_function(f): + @wraps(f) + def wrap(*args, **kwargs): + logging.info('entering %s' % f.__name__) + logging.info('args: %s' % str(args)) + logging.info('kwargs: %s' % str(kwargs)) + result = f(*args, **kwargs) + logging.info('result: %s' % str(result)) + logging.info('exiting %s' % f.__name__) + return result + return wrap + + +@trace_function +def main(program_args): + + nstool = Reconciler(program_args) + + if program_args.device.endswith('/'): + program_args.device = program_args.device[:-1] + + # Get and check authentication + password = getpass.getpass(prompt='Keystone password for %s@admin: ' + % program_args.username) + + try: + admin_keystone_api, admin_token =\ + nstool.auth_into_keystone(password, 'admin') + except ValueError: + logging.debug("Failed login!") + print "Authentication failed." + sys.exit(1) + + del password + + # Figure out what we're doing. + target_account, target_container = program_args.account,\ + program_args.container + + # Start doing it. + #pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-1) + + # Multiprocessing does not play nicely with the lazy loading that + # keystoneclient does, so let's not mess with it for now. + pool = None + + # Doing this convolution because if we scope into a project as root@admin, + # and initialize it, the symlink always gets written to 'admin' because + # SwiftOnHPSS only gets the name of the account currently scoped into, + # even if it's not the same as the account we're modifying containers in. + # So we just cheat and make ourselves a member of every project, and + # then scope in as root into that account to initialize it. 
+ # TODO: make this use a multiprocessing pool + # FIXME: this repeats itself, refactor to be less bad + if program_args.account == "": + for account in admin_keystone_api.list_tenants(): + try: + admin_keystone_api.grant_role_on_tenant(account['name'], + program_args.username, + '_member_') + admin_keystone_api.grant_role_on_tenant(account['name'], + program_args.username, + 'admin') + except KeyError: + sys.exit(1) + user_keystone_api = \ + LightweightKeystoneAPI(admin_keystone_api.url, + admin_keystone_api.username, + admin_keystone_api.password, + account['name'], + admin_keystone_api.version) + user_keystone_api.authenticate() + nstool.initialize_account(keystone_api=user_keystone_api, + target_account=account['name']) + else: + try: + admin_keystone_api.grant_role_on_tenant(target_account, + program_args.username, + '_member_') + admin_keystone_api.grant_role_on_tenant(target_account, + program_args.username, + 'admin') + except KeyError: + sys.exit(1) + user_keystone_api = \ + LightweightKeystoneAPI(admin_keystone_api.url, + admin_keystone_api.username, + admin_keystone_api.password, + target_account, + admin_keystone_api.version) + user_keystone_api.authenticate() + nstool.initialize_account(keystone_api=user_keystone_api, + target_account=target_account) + + # Handle reconciling one or all containers. 
+ if target_container != "": + swift_api = nstool.auth_into_swift(keystone_api=admin_keystone_api, + target_account=target_account) + nstool.reconcile_container(keystone_api=admin_keystone_api, + swift_api=swift_api, + target_account=target_account, + target_container=target_container) + elif target_account != "": + nstool.reconcile_account(keystone_api=admin_keystone_api, + target_account=target_account, + pool=pool) + else: + # reconcile everything + nstool.reconcile_all_accounts(keystone_api=admin_keystone_api, + pool=pool) + + if pool: + pool.join() + pool.close() + + print "Done" + + +def check_usage(): + """ + Checks the user arguments and parses them + + :returns: argument namespace + :rtype: argparse.Namespace + """ + parser = argparse.ArgumentParser() + parser.add_argument("-d", "--device", + type=str, + help="Swift device path", + required=True) + parser.add_argument('-a', "--account", + type=str, + help="Account to reconcile", + default="") + parser.add_argument('-c', '--container', + type=str, + help="Container to reconcile", + default="") + parser.add_argument("-k", "--keystone-url", + type=str, + help="Keystone url endpoint", + required=True) + + # TODO: get this from Keystone + parser.add_argument("-s", "--storage-url", + type=str, + help="Storage url endpoint", + required=True) + + parser.add_argument("-u", "--username", + type=str, + help="Admin user (default: root)", + default="root") + parser.add_argument("-p", "--prefix", + type=str, + help="Auth prefix for account", + required=True) + parser.add_argument("-l", "--logfile", + type=str, + help="Location of log file", + default='/var/log/swift/nstool.log') + parser.add_argument("-g", "--verify-ssl", + type=bool, + help="Whether Keystone should verify \ + SSL certificate (True or False)", + default=True) + parser.add_argument("-n", "--storage-policy-name", + type=str, + help="Swift storage policy name for SwiftOnHPSS", + required=True) + parser.add_argument("-v", "--verbose", + help="Show debug 
traces", + action="store_true") + return parser.parse_args() + + +class Reconciler: + + def __init__(self, args): + self._args = args + + def _report(self, msg): + if self._args.verbose: + print msg + + @trace_function + def auth_into_swift(self, keystone_api, target_account): + """ + Gets a Swift API client object that is authorized to make changes to + the specified Swift account/tenant by name. + + :param LightweightKeystoneAPI keystone_api: + Keystone API instance + :param str target_account: + Account name from Keystone we want to target + :returns: + Swift API instance that is authorized to alter the given account + """ + keystone_acct = keystone_api.get_tenant(target_account) + + account_url = "%s/%s_%s" % (self._args.storage_url, + self._args.prefix, + keystone_acct['id']) + swift_api = \ + swiftclient.client.Connection(preauthurl=account_url, + preauthtoken=keystone_api.auth_token) + return swift_api + + def auth_into_keystone(self, admin_password, target_project): + """ + Returns a Keystone client object and auth token if authentication is + successful. + + :param str admin_password: Password to authenticate with + :param str target_project: Project to scope into + :returns: Keystone API client, and authorization token + """ + + keystone_api = LightweightKeystoneAPI(url=self._args.keystone_url, + username=self._args.username, + password=admin_password, + tenant_name=target_project, + api_version="v2" + ) + if not keystone_api.authenticate(): + raise ValueError('Could not authenticate into Keystone!') + return keystone_api, keystone_api.auth_token + + @trace_function + def list_hpss_containers(self, swift_api, target_account): + """ + Lists containers in Swift metadata that use the given storage policy + name, for a given account. 
@trace_function
def list_hpss_containers(self, swift_api, target_account):
    """
    Lists containers in Swift metadata that use the configured SwiftOnHPSS
    storage policy name, for a given account.

    :param LightweightKeystoneAPI swift_api:
        Swift API client, authenticated for the given account
    :param str target_account: Account to look in (the connection is
        already scoped to it via its preauth URL)
    :returns: List of container dictionaries using the HPSS storage policy
    """
    hpss_containers = []
    # BUG FIX: swiftclient's get_account() takes no account positional --
    # passing target_account bound it to the 'marker' parameter and
    # silently skipped containers.  The connection is already scoped.
    containers = swift_api.get_account()[1]
    for container in containers:
        # BUG FIX: the container name must be passed; the original called
        # get_container() with no arguments.
        headers = swift_api.get_container(container['name'])[0]
        # swiftclient lowercases response header keys; accept either form.
        storage_policy = headers.get('X-Storage-Policy',
                                     headers.get('x-storage-policy'))
        if storage_policy == self._args.storage_policy_name:
            hpss_containers.append(container)
    return hpss_containers
+ for account in all_accounts: + self.reconcile_account(keystone_api, account, pool) + + @trace_function + def reconcile_account(self, keystone_api, target_account, pool): + """ + Reconciles all the containers in a single account. + + :param LightweightKeystoneAPI keystone_api: + Keystone API client + :param str target_account: Account name with containers to reconcile + :param multiprocessing.Pool pool: Process pool + :returns: nothing + """ + swift_containers = [] + account_directory = "%s/%s" % (self._args.device, target_account) + print "Reconciling account %s" % target_account + if not (os.path.isdir(account_directory) or + os.path.islink(account_directory)): + print "%s is not a directory" % account_directory + return + if target_account is None: + print "Account '%s' does not exist" % target_account + logging.exception("Account '%s' does not exist" % target_account) + return + + swift_api = self.auth_into_swift(keystone_api=keystone_api, + target_account=target_account) + + stats, containers = swift_api.get_account() + + for val in containers: + swift_containers.append(val['name']) + + good_containers = \ + self.clean_account(swift_api, target_account) + + # FIXME: Can we figure out a better way to deal with accounts with a + # ridiculous number of containers? + if len(good_containers) > 1000000: + print "Account has too many containers, exiting..." + return + + # If we're operating on all containers, spawn more worker processes + for cont in good_containers: + if pool: + pool.apply_async(func=self.reconcile_container, + args=(keystone_api, swift_api, target_account, + cont)) + else: + self.reconcile_container(keystone_api, swift_api, + target_account, cont) + + @trace_function + def reconcile_container(self, keystone_api, swift_api, target_account, + target_container): + """ + Entry point for worker processes. 
+ + Makes changes to Swift containers depending on the changes made in HPSS + :param LightweightKeystoneAPI keystone_api: Keystone API client + :param swiftclient.Connection swift_api: Swift API client + :param str target_account: Account name the target container belongs to + :param str target_container: Container name that needs reconciling + :returns: nothing + """ + + swift_containers = [account['name'] for account in + swift_api.get_account()[1]] + + # Check if container directory exists + container_dir = "%s/%s/%s" % (self._args.device, target_account, + target_container) + if not os.path.isdir(container_dir): + print "%s is not a directory" % container_dir + logging.error("%s is not a directory" % container_dir) + return + + # Check if container exists or else create it + if target_container not in swift_containers \ + and target_container is not "": + print "Container %s does not exist, creating" % target_container + try: + swift_api.put_container( + target_container, + headers={'X-Storage-Policy': + self._args.storage_policy_name}) + except ClientException as e: + print "Putting container %s went wrong" % target_container + logging.exception("Putting container %s went wrong" % + target_container) + raise e + print "Reconciling container %s/%s" % (target_account, target_container) + # Make sure those objects get added into the Swift metadata DBs + self.add_objects_from_hpss(swift_api, target_container, container_dir) + + # Clear out objects that exist only in Swift + self.clean_container(swift_api, target_account, target_container) + + # TODO: make sure we don't clobber permissions already existing! 
+ # Set up permissions for the container in Swift afterwards + self.configure_permissions(swift_api, keystone_api, target_account, + target_container) + + def _get_object_names(self, container_dir): + file_gen = os.walk(container_dir) + objects = [] + for path, directories, files in file_gen: + for file in files: + objects.append(os.path.relpath('%s/%s' % (path, file), + container_dir)) + return objects + + @trace_function + def add_objects_from_hpss(self, swift_api, target_container, container_dir): + """ + Update object metadata on object creates, and returns a list of all the + objects existing in the container from Swift. + + :param swiftclient.Connection swift_api: Swift API client + :param str target_container: Container to add objects to + :param str container_dir: Container directory to scan for objects in + :returns: List of objects from Swift metadata in container + """ + + # TODO: be smarter about what files we HEAD + + hpss_objects = self._get_object_names(container_dir) + logging.debug('hpss_objects is %s' % str(hpss_objects)) + for obj in hpss_objects: + print "Adding object %s..." % obj + try: + swift_api.head_object(target_container, obj) + except ClientException as err: + fail_reason = "Updating object %s in container %s went wrong"\ + % (obj, target_container) + print fail_reason + raise err + + # Get list of objects from this container + try: + container_stats, swift_objects =\ + swift_api.get_container(target_container) + logging.debug('swift_objects is %s' % str(swift_objects)) + except ClientException as err: + print "Get on container %s went wrong" % target_container + raise err + return swift_objects + + @trace_function + def clean_account(self, swift_api, target_account): + """ + Deletes containers in Swift that do not exist on the HPSS filesystem, + and returns a list of the containers that exist in HPSS. 
+ + :param swiftclient.Connection swift_api: Swift API client + :param str target_account: Account of the containers that need cleaning + :returns: List of containers existing on HPSS mount point + """ + # Check containers in HPSS + account_directory = "%s/%s" % (self._args.device, target_account) + + # Check containers in Swift + swift_containers = [container['name'] for container + in swift_api.get_account()[1]] + try: + hpss_containers = os.listdir(account_directory) + except OSError as err: + print "Unable to list files under directory: %s" % account_directory + raise err + + # Delete containers that only exist in Swift, but not HPSS + ghost_containers = list(set(swift_containers) - set(hpss_containers)) + for target_container in ghost_containers: + try: + for obj in swift_api.get_container(target_container)[1]: + try: + swift_api.delete_object(target_container, obj['name']) + except ClientException as e: + if e.http_status == 404: + pass + swift_api.delete_container(target_container) + except ClientException: + print "Deleting container '%s' went wrong!" % target_container + raise + + return hpss_containers + + @trace_function + def clean_container(self, swift_api, target_account, target_container): + """ + Deletes Swift objects that do not exist in HPSS from the container + + :param swiftclient.Connection swift_api: Swift API client + :param str target_account: Account that target container belongs to + :param str target_container: Container that needs cleaning + :returns: nothing + """ + + container_path = "%s/%s/%s" % (self._args.device, target_account, + target_container) + + hpss_objects = self._get_object_names(container_path) + swift_objects = [obj['name'] for obj in + swift_api.get_container(target_container)[1]] + known_good_objects = [] + swift_only_objects = list(set(swift_objects) - set(hpss_objects)) + + # If we have objects that only exist in the Swift metadata, delete those + # objects. 
+ for target_obj in swift_only_objects: + try: + swift_api.delete_object(target_container, target_obj) + except ClientException: + container_stats, post_delete_dict =\ + swift_api.get_container(target_container) + for obj in post_delete_dict: + known_good_objects.append(obj['name']) + if target_obj in known_good_objects: + fail_reason =\ + "Deleting object %s in container %s went wrong"\ + % (target_obj, target_container) + logging.error(fail_reason) + print fail_reason + raise IOError(fail_reason) + + # TODO: clean up this code + @trace_function + def configure_permissions(self, swift_api, keystone_api, target_account, + target_container): + """ + Configuring Swift Container ACLs + + :param swiftclient.Connection swift_api: + Swift API client + :param LightweightKeystoneAPI keystone_api: + Keystone API client + :param str target_account: Account that container belongs in + :param str target_container: Container name + :returns: nothing + """ + + # TODO: figure out how to deal with files that have more restrictive + # permissions than the container does + + path = "%s/%s/%s" % (self._args.device, target_account, + target_container) + # Obtain unix user permissions for path + try: + file_user = getpwuid(os.stat(path).st_uid).pw_name + mode = os.stat(path).st_mode + except KeyError as err: + fail_reason = "Cannot find permissions for %s" % path + print fail_reason + logging.exception(fail_reason) + raise err + + # Check if file user exists in Keystone + try: + keystone_users = [user['name'] for user + in keystone_api.list_users()] + except Exception as e: + fail_reason = "Couldn't get user list" + print fail_reason + logging.exception(fail_reason) + raise e + + if file_user not in keystone_users: + fail_reason = \ + "Cannot configure proper permissions for this path %s\ + because user %s does not exist in keystone" % (path, file_user) + print fail_reason + logging.error(fail_reason) + raise IOError(fail_reason) + + # Update container ACLs + try: + if mode & 
stat.S_IRUSR: + headers = {"X-Read-Container": "%s:%s" + % (target_account, file_user)} + swift_api.post_container(target_container, headers) + if mode & stat.S_IWUSR: + headers = {"X-Write-Container": "%s:%s" + % (target_account, file_user)} + swift_api.post_container(target_container, headers) + except requests.ConnectionError as err: + print "Cannot configure ACLs due to metadata header issue" + logging.exception( + "Cannot configure ACLs due to metadata header issue") + raise err + + +# This only exists because the keystoneclient library is so massive that it has +# to have a lazy-loading mechanism that ensures only one of it can be active, +# so we can't have handles to multiple different Keystone scopes simultaneously +class LightweightKeystoneAPI: + + MEMBER_ROLE_ID = '9fe2ff9ee4384b1894a90878d3e92bab' + + def __init__(self, url, username, password, tenant_name, api_version='v2'): + self.url = url + self.username = username + self.password = password + self.tenant_name = tenant_name + self.auth_token = None + self.version = api_version + logging.debug('New LightweightKeystoneAPI instance created for %s@%s,' + ' id = %s' % + (self.username, self.tenant_name, hex(id(self)))) + + def _get_keystone_response(self, method, url, headers=None, json=None): + resp = method(url=url, + headers=headers, + json=json) + logging.debug('Response code: %s' % str(resp.status_code)) + logging.debug('Response content: %s' % resp.content) + if not (200 <= resp.status_code <= 299): + raise IOError(resp.status_code, 'Request was unsuccessful') + try: + resp_json = resp.json() + logging.debug('response json: %s' % resp_json) + except JSONDecodeError: + logging.error('Malformed response from Keystone') + raise IOError('Malformed response from Keystone') + return resp.headers, resp_json + + @trace_function + def authenticate(self): + if self.auth_token: + return True + + if self.version == 'v2': + url = '%s/tokens' % self.url + token_req = {'auth': {'tenantName': self.tenant_name, + 
'passwordCredentials': { + 'username': self.username, + 'password': self.password + }}} + else: + url = '%s/auth/tokens' % self.url + token_req = {'auth': {'identity': + {'methods': ['password'], + 'password': { + 'user': { + 'name': self.username, + 'password': self.password, + 'domain': {'id': 'default'} + } + } + }, + 'scope': { + 'project': { + 'name': self.tenant_name, + 'domain': { + 'id': 'default' + } + } + } + } + } + try: + resp_headers, resp_json =\ + self._get_keystone_response(requests.post, + url, + None, + token_req) + except IOError: + return False + if self.version == 'v2': + self.auth_token = resp_json['access']['token']['id'] + else: + self.auth_token = resp_headers['X-Subject-Token'] + logging.debug('New auth token: %s' % self.auth_token) + return True + + @trace_function + def list_users(self): + headers = {'X-Auth-Token': self.auth_token} + resp_headers, resp_json = \ + self._get_keystone_response(requests.get, + '%s/users' % self.url, + headers) + return resp_json['users'] + + @trace_function + def list_tenants(self): + if self.version == 'v2': + url = '%s/tenants' % self.url + else: + url = '%s/projects' % self.url + resp_headers, resp_json = \ + self._get_keystone_response(requests.get, + url, + {'X-Auth-Token': self.auth_token}) + if self.version == 'v2': + return resp_json['tenants'] + else: + return resp_json['projects'] + + @trace_function + def list_roles(self): + if self.version == 'v2': + url = '%s/OS-KSADM/roles' % self.url + else: + url = '%s/roles' % self.url + resp_headers, resp_json =\ + self._get_keystone_response(requests.get, + url, + {'X-Auth-Token': + self.auth_token}) + return resp_json['roles'] + + @trace_function + def get_role(self, role_name): + role_id = self._get_id_for_role_name(role_name) + if self.version == 'v2': + url = '%s/OS-KSADM/%s' % (self.url, role_id) + else: + url = '%s/roles/%s' % (self.url, role_id) + resp_headers, resp_json = \ + self._get_keystone_response(requests.get, + url, + {'X-Auth-Token': + 
self.auth_token}) + return resp_json['role'] + + @trace_function + # TODO: write v3 version of this + def get_tenant(self, target_tenant): + url = '%s/tenants?name=%s' % (self.url, target_tenant) + headers = {'X-Auth-Token': self.auth_token, + 'name': target_tenant} + logging.debug('url: %s' % url) + logging.debug('headers: %s' % headers) + resp_headers, resp_json = self._get_keystone_response(requests.get, + url, + headers) + return resp_json['tenant'] + + @trace_function + def _get_id_for_user_name(self, user_name): + users = [user['id'] for user in + self.list_users() if user['username'] == user_name] + if len(users) == 0: + raise KeyError('%s is not a user' % user_name) + else: + return users[0] + + @trace_function + def _get_id_for_tenant_name(self, tenant_name): + tenants = [tenant['id'] for tenant in self.list_tenants() + if tenant['name'] == tenant_name] + if len(tenants) == 0: + raise KeyError('%s is not a tenant' % tenant_name) + else: + return tenants[0] + + @trace_function + def _get_id_for_role_name(self, role_name): + roles = [role['id'] for role in + self.list_roles() if role['name'] == role_name] + if len(roles) == 0: + raise KeyError('%s is not a role' % role_name) + else: + return roles[0] + + @trace_function + def grant_role_on_tenant(self, target_project, target_user, target_role): + try: + tenant_id = self._get_id_for_tenant_name(target_project) + user_id = self._get_id_for_user_name(target_user) + role_id = self._get_id_for_role_name(target_role) + except KeyError as e: + print "Could not grant role %s to %s@%s (reason: %s)" % \ + (target_role, target_user, target_project, e.message) + raise + + if self.version == 'v2': + url = '%s/tenants/%s/users/%s/roles/OS-KSADM/%s' %\ + (self.url, tenant_id, user_id, role_id) + else: + url = '%s/projects/%s/users/%s/roles/%s' %\ + (self.url, tenant_id, user_id, role_id) + + try: + self._get_keystone_response(requests.put, url, + {'X-Auth-Token': self.auth_token}) + except IOError as e: + if e.errno == 
409: + # We must've already granted ourselves this role. Carry on. + pass + else: + raise + + +if __name__ == "__main__": + _args = check_usage() + if os.getuid() != 0: + print 'swiftonhpss-nstool must be run as root' + sys.exit(1) + + # Initiating Log File + if _args.verbose: + log_level = logging.DEBUG + else: + log_level = logging.ERROR + logging.basicConfig(filename=_args.logfile, + level=log_level) + main(_args) \ No newline at end of file diff --git a/setup.py b/setup.py index 6329d20..71886d5 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,4 @@ +# Copyright (c) 2016 IBM Corporation # Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,26 +23,27 @@ setup( version=_pkginfo.full_version, description='SwiftOnHPSS', license='Apache License (2.0)', - author='IBM & Red Hat, Inc.', + author='IBM Corporation; Red Hat, Inc.', url='https://github.com/hpss-collaboration/swiftonhpss', packages=find_packages(exclude=['test', 'bin']), test_suite='nose.collector', classifiers=[ - 'Development Status :: 2 - Pre-Alpha' - 'Environment :: OpenStack' - 'Intended Audience :: Information Technology' - 'Intended Audience :: System Administrators' - 'License :: OSI Approved :: Apache Software License' - 'Operating System :: POSIX :: Linux' - 'Programming Language :: Python' - 'Programming Language :: Python :: 2' - 'Programming Language :: Python :: 2.6' + 'Development Status :: 2 - Pre-Alpha', + 'Environment :: OpenStack', + 'Intended Audience :: Information Technology', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7' ], install_requires=[], scripts=[ 'bin/swiftonhpss-print-metadata', 'bin/swiftonhpss-migrate-metadata', + 'bin/swiftonhpss-nstool' ], entry_points={ 
def get_etag(fd_or_path):
    """
    Return the object's ETag.

    Prefer the checksum HPSS already stores in xattrs ('system.hpss.hash',
    or the older 'user.hash.checksum'); fall back to reading the whole
    file and computing an MD5.  We're assuming the HPSS checksum is
    re-verified every time the file is actually opened for read/write.

    :param fd_or_path: open file descriptor (int) when invoked from
        DiskFile.open(), or a path string when invoked from
        DiskDir.list_objects_iter()
    :returns: hex digest string
    """
    try:
        attrs = xattr.xattr(fd_or_path)
        if 'system.hpss.hash' in attrs:
            return attrs['system.hpss.hash']
        elif 'user.hash.checksum' in attrs:
            return attrs['user.hash.checksum']
    except IOError:
        # Robustness fix: listing/reading xattrs raises IOError on
        # filesystems without xattr support -- fall back to computing
        # the checksum by reading the file instead of blowing up.
        pass

    if isinstance(fd_or_path, int):
        # We are given a file descriptor, so this is an invocation from
        # the DiskFile.open() method.
        etag = _read_for_etag(do_dup(fd_or_path))
        do_lseek(fd_or_path, 0, os.SEEK_SET)
    else:
        # We are given a path to the object, invoked from the
        # DiskDir.list_objects_iter method.
        fd = do_open(fd_or_path, os.O_RDONLY)
        etag = _read_for_etag(fd)
        do_close(fd)
    return etag
# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
# be back ported. See http://www.python.org/dev/peps/pep-0433/
# Linux O_CLOEXEC is 0x80000.  BUG FIX: the py3-style octal conversion of
# the old literal 02000000 accidentally gained a zero (0o20000000 ==
# 0x1000000, the wrong flag); the correct spelling is 0o2000000.
O_CLOEXEC = 0o2000000
self.close() - def put(self, metadata, purgelock): + # TODO: see if this is really the right way of getting the ETag + if not has_etag: + sleep(.5) + try: + xattrs = xattr.xattr(df._data_file) + if 'system.hpss.hash' in xattrs: + etag = xattrs['system.hpss.hash'] + elif 'user.hash.checksum' in xattrs: + etag = xattrs['user.hash.checksum'] + else: + raise DiskFileError( + 'ETag was not in HPSS xattrs for file %s' + % df._data_file) + metadata['ETag'] = etag + write_metadata(df._data_file, metadata) + except IOError as err: + raise DiskFileError( + err.errno, + "Could not get xattrs for file '%s', reason: %s" + % (df._data_file, err.strerror)) + + def put(self, metadata, purgelock=False, has_etag=True): """ Finalize writing the file on disk, and renames it from the temp file to the real location. This should be called after the data has been @@ -424,7 +447,7 @@ class DiskFileWriter(object): ' as a directory' % df._data_file) df._threadpool.force_run_in_thread(self._finalize_put, metadata, - purgelock) + purgelock, has_etag) # Avoid the unlink() system call as part of the mkstemp context # cleanup @@ -622,6 +645,7 @@ class DiskFile(object): self._put_datadir = self._container_path self._data_file = os.path.join(self._put_datadir, self._obj) + self._stat = do_stat(self._data_file) @property def timestamp(self): @@ -668,7 +692,7 @@ class DiskFile(object): obj_size = self._stat.st_size self._metadata = read_metadata(self._fd) - if not self._validate_object_metadata(self._fd): + if not self._validate_object_metadata(): self._create_object_metadata(self._fd) assert self._metadata is not None @@ -700,11 +724,10 @@ class DiskFile(object): return self - def _validate_object_metadata(self, fd): - + def _validate_object_metadata(self): # Has no Swift specific metadata saved as xattr. Probably because # object was added/replaced through filesystem interface. 
- if not self._metadata: + if not self._metadata and not self._is_dir: self._file_has_changed = True return False @@ -724,15 +747,16 @@ # Check if the file has been modified through filesystem # interface by comparing mtime stored in xattr during PUT # and current mtime of file. + obj_stat = os.stat(self._data_file) if normalize_timestamp(self._metadata[X_MTIME]) != \ - normalize_timestamp(self._stat.st_mtime): + normalize_timestamp(obj_stat.st_mtime): self._file_has_changed = True return False else: # Without X_MTIME key, comparing md5sum is the only way # to determine if file has changed or not. This is inefficient # but there's no other way! - self._etag = get_etag(fd) + self._etag = get_etag(self._data_file) if self._etag != self._metadata[X_ETAG]: self._file_has_changed = True return False @@ -749,7 +773,6 @@ class DiskFile(object): return False def _create_object_metadata(self, fd): - if self._etag is None: self._etag = md5().hexdigest() if self._is_dir \ else get_etag(fd) @@ -801,6 +824,23 @@ return True return False + def is_offline(self): + try: + raw_file_levels = xattr.getxattr(self._data_file, + "system.hpss.level") + except IOError as err: + raise SwiftOnFileSystemIOError( + err.errno, + '%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror + ) + try: + file_levels = raw_file_levels.split(";") + top_level = file_levels[0].split(':') + bytes_on_disk = top_level[2].rstrip(' ') + except ValueError: + raise SwiftOnFileSystemIOError("Couldn't get system.hpss.level!") + return int(bytes_on_disk) != self._stat.st_size + def __enter__(self): """ Context enter. @@ -848,15 +888,28 @@ def read_metadata(self): """ - Return the metadata for an object without requiring the caller to open - the object first. + Return the metadata for an object without opening the object's file on + disk.
:returns: metadata dictionary for an object :raises DiskFileError: this implementation will raise the same errors as the `open()` method. """ - with self.open(): - return self.get_metadata() + # FIXME: pull a lot of this and the copy of it from open() out to + # another function + + # Do not actually open the file, in order to duck hpssfs checksum + # validation and resulting timeouts + # This means we do a few things DiskFile.open() does. + try: + self._is_dir = os.path.isdir(self._data_file) + self._metadata = read_metadata(self._data_file) + except IOError: + raise DiskFileNotExist + if not self._validate_object_metadata(): + self._create_object_metadata(self._data_file) + self._filter_metadata() + return self._metadata def reader(self, iter_hook=None, keep_cache=False): """ diff --git a/swiftonhpss/swift/obj/server.py b/swiftonhpss/swift/obj/server.py index f278dbb..6660161 100644 --- a/swiftonhpss/swift/obj/server.py +++ b/swiftonhpss/swift/obj/server.py @@ -21,6 +21,7 @@ import time import xattr import os import hpssfs +import time from hashlib import md5 from swift.common.swob import HTTPConflict, HTTPBadRequest, HeaderKeyDict, \ HTTPInsufficientStorage, HTTPPreconditionFailed, HTTPRequestTimeout, \ @@ -78,6 +79,7 @@ class ObjectController(server.ObjectController): # Replaces Swift's DiskFileRouter object reference with ours. 
self._diskfile_router = SwiftOnFileDiskFileRouter(conf, self.logger) self.swift_dir = conf.get('swift_dir', '/etc/swift') + self.handle_md5 = conf.get('handle_md5', False) self.container_ring = None # This conf option will be deprecated and eventualy removed in # future releases @@ -152,7 +154,8 @@ class ObjectController(server.ObjectController): orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0) upload_expiration = time.time() + self.max_upload_time - etag = md5() + if self.handle_md5: + etag = md5() elapsed_time = 0 # (HPSS) Check for HPSS-specific metadata headers @@ -178,7 +181,8 @@ class ObjectController(server.ObjectController): if start_time > upload_expiration: self.logger.increment('PUT.timeouts') return HTTPRequestTimeout(request=request) - etag.update(chunk) + if self.handle_md5: + etag.update(chunk) upload_size = writer.write(chunk) elapsed_time += time.time() - start_time except ChunkReadTimeout: @@ -188,7 +192,10 @@ class ObjectController(server.ObjectController): elapsed_time, upload_size) if fsize and fsize != upload_size: return HTTPClientDisconnect(request=request) - etag = etag.hexdigest() + if self.handle_md5: + etag = etag.hexdigest() + else: + etag = '' if 'etag' in request.headers \ and request.headers['etag'].lower() != etag: return HTTPUnprocessableEntity(request=request) @@ -210,33 +217,40 @@ class ObjectController(server.ObjectController): metadata[header_caps] = request.headers[header_key] # (HPSS) Purge lock the file - writer.put(metadata, purgelock=purgelock) + writer.put(metadata, purgelock=purgelock, + has_etag=self.handle_md5) except DiskFileNoSpace: return HTTPInsufficientStorage(drive=device, request=request) except SwiftOnFileSystemIOError: return HTTPServiceUnavailable(request=request) - # (HPSS) Set checksum on file - try: - xattr.setxattr(disk_file._data_file, 'system.hpss.hash', - "md5:%s" % etag) - except IOError: - logging.exception("Error setting HPSS E2EDI checksum in " - "system.hpss.hash, storing ETag in " - 
"user.hash.checksum\n") + # FIXME: this stuff really should be handled in DiskFile somehow? + if self.handle_md5: + # (HPSS) Set checksum on file ourselves, if hpssfs won't do it + # for us. try: - xattr.setxattr(disk_file._data_file, - 'user.hash.checksum', etag) - xattr.setxattr(disk_file._data_file, - 'user.hash.algorithm', 'md5') - xattr.setxattr(disk_file._data_file, - 'user.hash.state', 'Valid') - xattr.setxattr(disk_file._data_file, - 'user.hash.filesize', str(upload_size)) - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, '%s, xattr.setxattr(...)' % err.strerror) + xattr.setxattr(disk_file._data_file, 'system.hpss.hash', + "md5:%s" % etag) + except IOError: + logging.debug("Could not write ETag to system.hpss.hash," + " trying user.hash.checksum") + try: + xattr.setxattr(disk_file._data_file, + 'user.hash.checksum', etag) + xattr.setxattr(disk_file._data_file, + 'user.hash.algorithm', 'md5') + xattr.setxattr(disk_file._data_file, + 'user.hash.state', 'Valid') + xattr.setxattr(disk_file._data_file, + 'user.hash.filesize', str(upload_size)) + xattr.setxattr(disk_file._data_file, + 'user.hash.app', 'swiftonhpss') + except IOError as err: + raise SwiftOnFileSystemIOError( + err.errno, + 'Could not write MD5 checksum to HPSS filesystem: ' + '%s' % err.strerror) # Update container metadata if orig_delete_at != new_delete_at: @@ -336,6 +350,7 @@ class ObjectController(server.ObjectController): try: disk_file = self.get_diskfile(device, partition, account, container, obj, policy=policy) + except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) @@ -370,7 +385,7 @@ class ObjectController(server.ObjectController): pass try: - self.get_hpss_xattr(request, response, disk_file) + self._get_hpss_xattr(request, response, disk_file) except SwiftOnFileSystemIOError: return HTTPServiceUnavailable(request=request) @@ -408,7 +423,7 @@ class ObjectController(server.ObjectController): # (HPSS) Our file could end up 
being on an offline # tape, so we need to check for it and return an # HTTP 'accepted, but still processing' response. - if self.is_offline(disk_file._data_file, request): + if disk_file.is_offline(): return HTTPAccepted(request=request) except (SwiftOnFileSystemIOError, SwiftOnFileFsException): return HTTPServiceUnavailable(request=request) @@ -435,7 +450,7 @@ class ObjectController(server.ObjectController): response.headers['X-Backend-Timestamp'] = file_x_ts.internal # (HPSS) Inject HPSS xattr metadata into headers try: - self.get_hpss_xattr(request, response, disk_file) + self._get_hpss_xattr(request, response, disk_file) except SwiftOnFileSystemIOError: return HTTPServiceUnavailable(request=request) return request.get_response(response) @@ -448,7 +463,7 @@ class ObjectController(server.ObjectController): # TODO: refactor this to live in DiskFile! # Along with all the other HPSS stuff - def get_hpss_xattr(self, request, response, diskfile): + def _get_hpss_xattr(self, request, response, diskfile): attrlist = {'X-HPSS-Account': 'account', 'X-HPSS-BitfileID': 'bitfile', 'X-HPSS-Comment': 'comment', @@ -476,27 +491,6 @@ class ObjectController(server.ObjectController): '%s, xattr.getxattr("%s", ...)' % (err.strerror, attr) ) - # TODO: move this to DiskFile - # TODO: make it more obvious how we're parsing the level xattr - def is_offline(self, path, request): - try: - byteslevel = xattr.getxattr(path, "system.hpss.level") - except IOError as err: - raise SwiftOnFileSystemIOError( - err.errno, - '%s, xattr.getxattr("system.hpss.level", ...)' % err.strerror - ) - - try: - byteslevelstring = byteslevel.split(";") - bytesfirstlevel = byteslevelstring[0].split(':') - bytesfile = bytesfirstlevel[2].rstrip(' ') - except ValueError: - raise SwiftOnFileFsException("Couldn't get system.hpss.level!") - setbytes = set(str(bytesfile)) - setsize = set(str(os.stat(path).st_size)) - return setbytes != setsize - @public @timing_stats() def POST(self, request): @@ -582,7 +576,14 @@ 
class ObjectController(server.ObjectController): orig_timestamp = e.timestamp orig_metadata = {} response_class = HTTPNotFound - except (DiskFileNotExist, DiskFileQuarantined): + + # If the file got deleted outside of Swift, we won't see it. So just say + # it got deleted, even if it never existed in the first place. + except DiskFileNotExist: + orig_timestamp = 0 + orig_metadata = {} + response_class = HTTPNoContent + except DiskFileQuarantined: orig_timestamp = 0 orig_metadata = {} response_class = HTTPNotFound