From 916f109e2fa9cb2e9d02c55fb801f7e0976b4322 Mon Sep 17 00:00:00 2001
From: Frode Nordahl
Date: Tue, 14 Jul 2020 14:32:54 +0200
Subject: [PATCH] Add cleanup action and OVS to OVN migration test

Add OVS to OVN migration at the end of the regular gate test. This adds
only 5-10 minutes to each job, and we want to confirm this works from
focal-ussuri onwards, as this is the point from which we recommend our
end users migrate from OVS to OVN.

Do ch-sync. Merge after https://github.com/juju/charm-helpers/pull/511

Func-Test-Pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/365
Depends-On: Ifa99988612eaaeb9d60a0d99db172f97e27cfc93
Change-Id: Ia4b1d3a9e642b540d1e04adc0363f9b3e11f37cd
---
 actions.yaml | 20 +
 actions/cleanup | 1 +
 actions/cleanup.py | 173 ++
 .../contrib/network/ovs/__init__.py | 76 +-
 .../charmhelpers/contrib/network/ovs/ovsdb.py | 64 +-
 .../charmhelpers/contrib/openstack/context.py | 96 ++
 .../contrib/openstack/templates/ceph.conf | 4 +
 .../section-ceph-bluestore-compression | 28 +
 .../contrib/storage/linux/ceph.py | 1407 ++++++++++++-----
 tests/bundles/focal-ussuri-dvr-snat.yaml | 48 +-
 tests/bundles/focal-victoria-dvr-snat.yaml | 46 +
 tests/bundles/groovy-victoria-dvr-snat.yaml | 46 +
 tests/tests.yaml | 44 +-
 unit_tests/test_actions_cleanup.py | 185 +++
 14 files changed, 1790 insertions(+), 448 deletions(-)
 create mode 120000 actions/cleanup
 create mode 100755 actions/cleanup.py
 create mode 100644 hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression
 create mode 100644 unit_tests/test_actions_cleanup.py

diff --git a/actions.yaml b/actions.yaml
index 71536a8f..2b97908f 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -1,3 +1,23 @@
+cleanup:
+  description: |
+    Clean up after the Neutron agents.
+  params:
+    i-really-mean-it:
+      type: boolean
+      default: false
+      description: |
+        The default of false will not run the action; set to true to perform
+        the cleanup.
+        .
+        WARNING: Running this action will interrupt instance connectivity and
+        it will not be restored until either the Neutron agents or a different
+        SDN reprograms connectivity on the hypervisor.
+        .
+        NOTE: The application must be configured with `firewall-driver`
+        'openvswitch' and the unit must be paused prior to running this
+        action.
+  required:
+    - i-really-mean-it
 pause:
   description: Pause the neutron-openvswitch unit. This action will stop neutron-openvswitch services.
 resume:
diff --git a/actions/cleanup b/actions/cleanup
new file mode 120000
index 00000000..ca939196
--- /dev/null
+++ b/actions/cleanup
@@ -0,0 +1 @@
+cleanup.py
\ No newline at end of file
diff --git a/actions/cleanup.py b/actions/cleanup.py
new file mode 100755
index 00000000..cbe86a44
--- /dev/null
+++ b/actions/cleanup.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
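+
+# Usage sketch (unit name hypothetical; the unit must be paused first and
+# the application configured with firewall-driver 'openvswitch'):
+#
+#     juju run-action --wait neutron-openvswitch/0 pause
+#     juju run-action --wait neutron-openvswitch/0 cleanup i-really-mean-it=true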
+
+import os
+import subprocess
+import sys
+import traceback
+
+sys.path.append('hooks/')
+
+import charmhelpers.core as ch_core
+import charmhelpers.contrib.openstack.utils as ch_openstack_utils
+import charmhelpers.contrib.network.ovs as ch_ovs
+import charmhelpers.contrib.network.ovs.ovsdb as ch_ovsdb
+
+
+class BaseDocException(Exception):
+    """Use docstring as default message for exception."""
+
+    def __init__(self, message=None):
+        self.message = message or self.__doc__
+
+    def __repr__(self):
+        return self.message
+
+    def __str__(self):
+        return self.message
+
+
+class UnitNotPaused(BaseDocException):
+    """Action requires unit to be paused but it was not paused."""
+    pass
+
+
+class MandatoryConfigurationNotSet(BaseDocException):
+    """Action requires certain configuration to be set to operate correctly."""
+    pass
+
+
+def remove_patch_ports(bridge):
+    """Remove patch ports from both ends starting with named bridge.
+
+    :param bridge: Name of bridge to look for patch ports to remove.
+    :type bridge: str
+    """
+    # NOTE: We need to consume all output from the `patch_ports_on_bridge`
+    # generator prior to removing anything, otherwise it will raise an error.
+    for patch in list(ch_ovs.patch_ports_on_bridge(bridge)):
+        ch_ovs.del_bridge_port(
+            patch.this_end.bridge,
+            patch.this_end.port,
+            linkdown=False)
+        ch_ovs.del_bridge_port(
+            patch.other_end.bridge,
+            patch.other_end.port,
+            linkdown=False)
+
+
+def remove_per_bridge_controllers():
+    """Remove per bridge controllers."""
+    bridges = ch_ovsdb.SimpleOVSDB('ovs-vsctl').bridge
+    for bridge in bridges:
+        if bridge['controller']:
+            bridges.clear(str(bridge['_uuid']), 'controller')
+
+
+def neutron_ipset_cleanup():
+    """Perform Neutron ipset cleanup."""
+    subprocess.check_call(
+        (
+            'neutron-ipset-cleanup',
+            '--config-file=/etc/neutron/neutron.conf',
+            '--config-file=/etc/neutron/plugins/ml2/openvswitch_agent.ini',
+        ))
+
+
+def neutron_netns_cleanup():
+    """Perform Neutron netns cleanup."""
+    # FIXME: remove once package dependencies have been backported LP: #1881852
+    subprocess.check_call(('apt', '-y', 'install', 'net-tools'))
+    _tmp_filters = '/etc/neutron/rootwrap.d/charm-n-ovs.filters'
+    with open(_tmp_filters, 'w') as fp:
+        fp.write(
+            '[Filters]\nneutron.cmd.netns_cleanup: CommandFilter, ip, root\n')
+    subprocess.check_call(
+        (
+            'neutron-netns-cleanup',
+            '--force',
+            *[
+                # Existence of these files depends on our configuration.
+                '--config-file={}'.format(cfg) for cfg in (
+                    '/etc/neutron/neutron.conf',
+                    '/etc/neutron/l3_agent.ini',
+                    '/etc/neutron/fwaas_driver.ini',
+                    '/etc/neutron/dhcp_agent.ini',
+                ) if os.path.exists(cfg)]
+        ))
+    os.unlink(_tmp_filters)
+
+
+def cleanup(args):
+    """Clean up after Neutron agents."""
+    # Check that prerequisites for operation are met
+    if not ch_openstack_utils.is_unit_paused_set():
+        raise UnitNotPaused()
+    if ch_core.hookenv.config('firewall-driver') != 'openvswitch':
+        raise MandatoryConfigurationNotSet(
+            'Action requires configuration option `firewall-driver` to be set '
+            'to "openvswitch" for successful operation.')
+    if not ch_core.hookenv.action_get('i-really-mean-it'):
+        raise MandatoryConfigurationNotSet(
+            'Action requires the `i-really-mean-it` parameter to be set to '
+            '"true".')
+
+    # The names used for the integration- and tunnel-bridge are
+    # configurable, but this configuration is not exposed in the charm.
+    #
+    # Assume default names are used.
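+    #
+    # For reference, the upstream Neutron ML2/OVS defaults are
+    # `integration_bridge` 'br-int' and `tunnel_bridge` 'br-tun'; the
+    # calls below assume those names.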
+ remove_patch_ports('br-int') + ch_ovs.del_bridge('br-tun') + + # The Neutron Open vSwitch agent configures each Open vSwitch bridge to + # establish an active OVSDB connection back to the Neutron Agent. + # + # Remove these + remove_per_bridge_controllers() + + # Remove namespaces set up by Neutron + neutron_netns_cleanup() + + # Remove ipsets set up by Neutron + neutron_ipset_cleanup() + + +# A dictionary of all the defined actions to callables (which take +# parsed arguments). +ACTIONS = {'cleanup': cleanup} + + +def main(args): + action_name = os.path.basename(args[0]) + try: + action = ACTIONS[action_name] + except KeyError: + msg = 'Action "{}" undefined'.format(action_name) + ch_core.hookenv.action_fail(msg) + return msg + else: + try: + action(args) + except Exception as e: + msg = 'Action "{}" failed: "{}"'.format(action_name, str(e)) + ch_core.hookenv.log( + '{} "{}"'.format(msg, traceback.format_exc()), + level=ch_core.hookenv.ERROR) + ch_core.hookenv.action_fail(msg) + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/hooks/charmhelpers/contrib/network/ovs/__init__.py b/hooks/charmhelpers/contrib/network/ovs/__init__.py index acb503be..3004125a 100644 --- a/hooks/charmhelpers/contrib/network/ovs/__init__.py +++ b/hooks/charmhelpers/contrib/network/ovs/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. ''' Helpers for interacting with OpenvSwitch ''' +import collections import hashlib import os import re @@ -20,9 +21,9 @@ import six import subprocess from charmhelpers import deprecate +from charmhelpers.contrib.network.ovs import ovsdb as ch_ovsdb from charmhelpers.fetch import apt_install - from charmhelpers.core.hookenv import ( log, WARNING, INFO, DEBUG ) @@ -592,3 +593,76 @@ def ovs_appctl(target, args): cmd = ['ovs-appctl', '-t', target] cmd.extend(args) return subprocess.check_output(cmd, universal_newlines=True) + + +def uuid_for_port(port_name): + """Get UUID of named port. + + :param port_name: Name of port. + :type port_name: str + :returns: Port UUID. + :rtype: Optional[uuid.UUID] + """ + for port in ch_ovsdb.SimpleOVSDB( + 'ovs-vsctl').port.find('name={}'.format(port_name)): + return port['_uuid'] + + +def bridge_for_port(port_uuid): + """Find which bridge a port is on. + + :param port_uuid: UUID of port. + :type port_uuid: uuid.UUID + :returns: Name of bridge or None. + :rtype: Optional[str] + """ + for bridge in ch_ovsdb.SimpleOVSDB( + 'ovs-vsctl').bridge: + # If there is a single port on a bridge the ports property will not be + # a list. ref: juju/charm-helpers#510 + if (isinstance(bridge['ports'], list) and + port_uuid in bridge['ports'] or + port_uuid == bridge['ports']): + return bridge['name'] + + +PatchPort = collections.namedtuple('PatchPort', ('bridge', 'port')) +Patch = collections.namedtuple('Patch', ('this_end', 'other_end')) + + +def patch_ports_on_bridge(bridge): + """Find patch ports on a bridge. + + :param bridge: Name of bridge + :type bridge: str + :returns: Iterator with bridge and port name for both ends of a patch. + :rtype: Iterator[Patch[PatchPort[str,str],PatchPort[str,str]]] + :raises: ValueError + """ + # On any given vSwitch there will be a small number of patch ports, so we + # start by iterating over ports with type `patch` then look up which bridge + # they belong to and act on any ports that match the criteria. 
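+    #
+    # Illustrative sketch (bridge and port names assumed, as used by a
+    # default Neutron deployment): a patch between br-int and br-tun would
+    # yield e.g.
+    #     Patch(this_end=PatchPort('br-int', 'patch-tun'),
+    #           other_end=PatchPort('br-tun', 'patch-int'))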
+    for interface in ch_ovsdb.SimpleOVSDB(
+            'ovs-vsctl').interface.find('type=patch'):
+        for port in ch_ovsdb.SimpleOVSDB(
+                'ovs-vsctl').port.find('name={}'.format(interface['name'])):
+            if bridge_for_port(port['_uuid']) == bridge:
+                this_end = PatchPort(bridge, port['name'])
+                other_end = PatchPort(bridge_for_port(
+                    uuid_for_port(
+                        interface['options']['peer'])),
+                    interface['options']['peer'])
+                yield Patch(this_end, other_end)
+            # We expect one result and it is ok if it turns out to be a port
+            # for a different bridge. However, we need a break here to satisfy
+            # the for/else check which is in place to detect an interface
+            # referring to a non-existent port.
+            break
+        else:
+            raise ValueError('Port for interface named "{}" unexpectedly '
+                             'does not exist.'.format(interface['name']))
+    else:
+        # Allow our caller to handle no patch ports found gracefully; in
+        # reference to PEP479, just doing a return will provide an empty
+        # iterator and not None.
+        return
diff --git a/hooks/charmhelpers/contrib/network/ovs/ovsdb.py b/hooks/charmhelpers/contrib/network/ovs/ovsdb.py
index 5e50bc36..2f1e53da 100644
--- a/hooks/charmhelpers/contrib/network/ovs/ovsdb.py
+++ b/hooks/charmhelpers/contrib/network/ovs/ovsdb.py
@@ -36,6 +36,11 @@ class SimpleOVSDB(object):
         for br in ovsdb.bridge:
             if br['name'] == 'br-test':
                 ovsdb.bridge.set(br['uuid'], 'external_ids:charm', 'managed')
+
+    WARNING: If a list type field has only one item `ovs-vsctl` will present
+    it as a single item. Since we do not know the schema we have no way of
+    knowing what fields should be de-serialized as lists, so the caller has
+    to be careful to check the type of values returned from this library.
     """

     # For validation we keep a complete map of currently known good tool and
@@ -157,6 +162,51 @@ class SimpleOVSDB(object):
             self._tool = tool
             self._table = table

+        def _deserialize_ovsdb(self, data):
+            """Deserialize OVSDB RFC7047 section 5.1 data.
+
+            :param data: Multidimensional list where the first row contains
+                         RFC7047 type information
+            :type data: List[str,any]
+            :returns: Deserialized data.
+            :rtype: any
+            """
+            # When using json formatted output from OVS commands, internal
+            # OVSDB notation may occur that requires further deserializing.
+            # Reference: https://tools.ietf.org/html/rfc7047#section-5.1
+            ovs_type_cb_map = {
+                'uuid': uuid.UUID,
+                # NOTE: OVSDB sets have overloaded type,
+                #       see special handling below
+                'set': list,
+                'map': dict,
+            }
+            assert len(data) > 1, ('Invalid data provided, expecting a list '
+                                   'with at least two elements.')
+            if data[0] == 'set':
+                # special handling for set
+                #
+                # it is either a list of strings or a list of typed lists.
+                # taste the first element to see which it is
+                for el in data[1]:
+                    # NOTE: We lock this handling down to the `uuid` type as
+                    # that is the only one we have a practical example of.
+                    # We could potentially just handle this generally based on
+                    # the types listed in `ovs_type_cb_map`, but let's open up
+                    # for that as soon as we have a concrete example to
+                    # validate on.
+                    if isinstance(
+                            el, list) and len(el) and el[0] == 'uuid':
+                        decoded_set = []
+                        for el in data[1]:
+                            decoded_set.append(self._deserialize_ovsdb(el))
+                        return decoded_set
+                    # fall back to normal processing below
+                    break
+
+            # Use map to deserialize data with fallback to `str`
+            f = ovs_type_cb_map.get(data[0], str)
+            return f(data[1])
+
         def _find_tbl(self, condition=None):
             """Run and parse output of OVSDB `find` command.
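For review, a sketch of what `_deserialize_ovsdb` above produces for
representative RFC 7047 values (inputs hypothetical; the method is internal
to the table handle):

    tbl = SimpleOVSDB('ovs-vsctl').bridge
    tbl._deserialize_ovsdb(['map', [['charm', 'managed']]])  # -> {'charm': 'managed'}
    tbl._deserialize_ovsdb(['set', ['a', 'b']])              # -> ['a', 'b']
    tbl._deserialize_ovsdb(
        ['uuid', '3e577bc2-7eb5-4288-8112-5cc4eb3a1f45'])    # -> uuid.UUID instance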
@@ -165,15 +215,6 @@ class SimpleOVSDB(object): :returns: Dictionary with data :rtype: Dict[str, any] """ - # When using json formatted output to OVS commands Internal OVSDB - # notation may occur that require further deserializing. - # Reference: https://tools.ietf.org/html/rfc7047#section-5.1 - ovs_type_cb_map = { - 'uuid': uuid.UUID, - # FIXME sets also appear to sometimes contain type/value tuples - 'set': list, - 'map': dict, - } cmd = [self._tool, '-f', 'json', 'find', self._table] if condition: cmd.append(condition) @@ -182,9 +223,8 @@ class SimpleOVSDB(object): for row in data['data']: values = [] for col in row: - if isinstance(col, list): - f = ovs_type_cb_map.get(col[0], str) - values.append(f(col[1])) + if isinstance(col, list) and len(col) > 1: + values.append(self._deserialize_ovsdb(col)) else: values.append(col) yield dict(zip(data['headings'], values)) diff --git a/hooks/charmhelpers/contrib/openstack/context.py b/hooks/charmhelpers/contrib/openstack/context.py index 42abccf7..54aed7ff 100644 --- a/hooks/charmhelpers/contrib/openstack/context.py +++ b/hooks/charmhelpers/contrib/openstack/context.py @@ -29,6 +29,8 @@ from subprocess import check_call, CalledProcessError import six +import charmhelpers.contrib.storage.linux.ceph as ch_ceph + from charmhelpers.contrib.openstack.audits.openstack_security_guide import ( _config_ini as config_ini ) @@ -56,6 +58,7 @@ from charmhelpers.core.hookenv import ( status_set, network_get_primary_address, WARNING, + service_name, ) from charmhelpers.core.sysctl import create as sysctl_create @@ -808,6 +811,12 @@ class CephContext(OSContextGenerator): ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts)) + if config('pool-type') and config('pool-type') == 'erasure-coded': + base_pool_name = config('rbd-pool') or config('rbd-pool-name') + if not base_pool_name: + base_pool_name = service_name() + ctxt['rbd_default_data_pool'] = base_pool_name + if not os.path.isdir('/etc/ceph'): os.mkdir('/etc/ceph') @@ -3175,3 +3184,90 @@ class SRIOVContext(OSContextGenerator): :rtype: Dict[str,int] """ return self._map + + +class CephBlueStoreCompressionContext(OSContextGenerator): + """Ceph BlueStore compression options.""" + + # Tuple with Tuples that map configuration option name to CephBrokerRq op + # property name + options = ( + ('bluestore-compression-algorithm', + 'compression-algorithm'), + ('bluestore-compression-mode', + 'compression-mode'), + ('bluestore-compression-required-ratio', + 'compression-required-ratio'), + ('bluestore-compression-min-blob-size', + 'compression-min-blob-size'), + ('bluestore-compression-min-blob-size-hdd', + 'compression-min-blob-size-hdd'), + ('bluestore-compression-min-blob-size-ssd', + 'compression-min-blob-size-ssd'), + ('bluestore-compression-max-blob-size', + 'compression-max-blob-size'), + ('bluestore-compression-max-blob-size-hdd', + 'compression-max-blob-size-hdd'), + ('bluestore-compression-max-blob-size-ssd', + 'compression-max-blob-size-ssd'), + ) + + def __init__(self): + """Initialize context by loading values from charm config. + + We keep two maps, one suitable for use with CephBrokerRq's and one + suitable for template generation. + """ + charm_config = config() + + # CephBrokerRq op map + self.op = {} + # Context exposed for template generation + self.ctxt = {} + for config_key, op_key in self.options: + value = charm_config.get(config_key) + self.ctxt.update({config_key.replace('-', '_'): value}) + self.op.update({op_key: value}) + + def __call__(self): + """Get context. 
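+
+        Example entries in the returned Dict (values hypothetical; the keys
+        are the charm config option names with dashes replaced by
+        underscores):
+
+            {'bluestore_compression_algorithm': 'lz4',
+             'bluestore_compression_mode': 'passive'}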
+ + :returns: Context + :rtype: Dict[str,any] + """ + return self.ctxt + + def get_op(self): + """Get values for use in CephBrokerRq op. + + :returns: Context values with CephBrokerRq op property name as key. + :rtype: Dict[str,any] + """ + return self.op + + def get_kwargs(self): + """Get values for use as keyword arguments. + + :returns: Context values with key suitable for use as kwargs to + CephBrokerRq add_op_create_*_pool methods. + :rtype: Dict[str,any] + """ + return { + k.replace('-', '_'): v + for k, v in self.op.items() + } + + def validate(self): + """Validate options. + + :raises: AssertionError + """ + # We slip in a dummy name on class instantiation to allow validation of + # the other options. It will not affect further use. + # + # NOTE: once we retire Python 3.5 we can fold this into a in-line + # dictionary comprehension in the call to the initializer. + dummy_op = {'name': 'dummy-name'} + dummy_op.update(self.op) + pool = ch_ceph.BasePool('dummy-service', op=dummy_op) + pool.validate() diff --git a/hooks/charmhelpers/contrib/openstack/templates/ceph.conf b/hooks/charmhelpers/contrib/openstack/templates/ceph.conf index a11ce8ab..c0f22360 100644 --- a/hooks/charmhelpers/contrib/openstack/templates/ceph.conf +++ b/hooks/charmhelpers/contrib/openstack/templates/ceph.conf @@ -22,3 +22,7 @@ rbd default features = {{ rbd_features }} {{ key }} = {{ value }} {% endfor -%} {%- endif %} + +{% if rbd_default_data_pool -%} +rbd default data pool = {{ rbd_default_data_pool }} +{% endif %} diff --git a/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression b/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression new file mode 100644 index 00000000..a6430100 --- /dev/null +++ b/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression @@ -0,0 +1,28 @@ +{# section header omitted as options can belong to multiple sections #} +{% if bluestore_compression_algorithm -%} +bluestore compression algorithm = {{ bluestore_compression_algorithm }} +{% endif -%} +{% if bluestore_compression_mode -%} +bluestore compression mode = {{ bluestore_compression_mode }} +{% endif -%} +{% if bluestore_compression_required_ratio -%} +bluestore compression required ratio = {{ bluestore_compression_required_ratio }} +{% endif -%} +{% if bluestore_compression_min_blob_size -%} +bluestore compression min blob size = {{ bluestore_compression_min_blob_size }} +{% endif -%} +{% if bluestore_compression_min_blob_size_hdd -%} +bluestore compression min blob size hdd = {{ bluestore_compression_min_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_min_blob_size_ssd -%} +bluestore compression min blob size ssd = {{ bluestore_compression_min_blob_size_ssd }} +{% endif -%} +{% if bluestore_compression_max_blob_size -%} +bluestore compression max blob size = {{ bluestore_compression_max_blob_size }} +{% endif -%} +{% if bluestore_compression_max_blob_size_hdd -%} +bluestore compression max blob size hdd = {{ bluestore_compression_max_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_max_blob_size_ssd -%} +bluestore compression max blob size ssd = {{ bluestore_compression_max_blob_size_ssd }} +{% endif -%} diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py index 814d5c72..526b95ad 100644 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -39,6 +39,7 @@ from subprocess import ( check_output, 
CalledProcessError, ) +from charmhelpers import deprecate from charmhelpers.core.hookenv import ( config, service_name, @@ -178,94 +179,293 @@ def send_osd_settings(): def validator(value, valid_type, valid_range=None): - """ - Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + """Helper function for type validation. + + Used to validate these: + https://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression + Example input: validator(value=1, valid_type=int, valid_range=[0, 2]) + This says I'm testing value=1. It must be an int inclusive in [0,2] - :param value: The value to validate + :param value: The value to validate. + :type value: any :param valid_type: The type that value should be. + :type valid_type: any :param valid_range: A range of values that value can assume. - :return: + :type valid_range: Optional[Union[List,Tuple]] + :raises: AssertionError, ValueError """ - assert isinstance(value, valid_type), "{} is not a {}".format( - value, - valid_type) + assert isinstance(value, valid_type), ( + "{} is not a {}".format(value, valid_type)) if valid_range is not None: - assert isinstance(valid_range, list), \ - "valid_range must be a list, was given {}".format(valid_range) + assert isinstance( + valid_range, list) or isinstance(valid_range, tuple), ( + "valid_range must be of type List or Tuple, " + "was given {} of type {}" + .format(valid_range, type(valid_range))) # If we're dealing with strings if isinstance(value, six.string_types): - assert value in valid_range, \ - "{} is not in the list {}".format(value, valid_range) + assert value in valid_range, ( + "{} is not in the list {}".format(value, valid_range)) # Integer, float should have a min and max else: if len(valid_range) != 2: raise ValueError( - "Invalid valid_range list of {} for {}. " + "Invalid valid_range list of {} for {}. " "List must be [min,max]".format(valid_range, value)) - assert value >= valid_range[0], \ - "{} is less than minimum allowed value of {}".format( - value, valid_range[0]) - assert value <= valid_range[1], \ - "{} is greater than maximum allowed value of {}".format( - value, valid_range[1]) + assert value >= valid_range[0], ( + "{} is less than minimum allowed value of {}" + .format(value, valid_range[0])) + assert value <= valid_range[1], ( + "{} is greater than maximum allowed value of {}" + .format(value, valid_range[1])) class PoolCreationError(Exception): - """ - A custom error to inform the caller that a pool creation failed. Provides an error message + """A custom exception to inform the caller that a pool creation failed. + + Provides an error message """ def __init__(self, message): super(PoolCreationError, self).__init__(message) -class Pool(object): - """ - An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool. - Do not call create() on this base class as it will not do anything. Instantiate a child class and call create(). - """ +class BasePool(object): + """An object oriented approach to Ceph pool creation. - def __init__(self, service, name): + This base class is inherited by ReplicatedPool and ErasurePool. Do not call + create() on this base class as it will raise an exception. + + Instantiate a child class and call create(). 
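+
+    A sketch of op-based initialization (the op dict follows the Ceph broker
+    request format; values hypothetical):
+
+        pool = ReplicatedPool('admin', op={'name': 'mypool', 'replicas': 3})
+        pool.create()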
+ """ + # Dictionary that maps pool operation properties to Tuples with valid type + # and valid range + op_validation_map = { + 'compression-algorithm': (str, ('lz4', 'snappy', 'zlib', 'zstd')), + 'compression-mode': (str, ('none', 'passive', 'aggressive', 'force')), + 'compression-required-ratio': (float, None), + 'compression-min-blob-size': (int, None), + 'compression-min-blob-size-hdd': (int, None), + 'compression-min-blob-size-ssd': (int, None), + 'compression-max-blob-size': (int, None), + 'compression-max-blob-size-hdd': (int, None), + 'compression-max-blob-size-ssd': (int, None), + } + + def __init__(self, service, name=None, percent_data=None, app_name=None, + op=None): + """Initialize BasePool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + :param service: The Ceph user name to run commands under. + :type service: str + :param name: Name of pool to operate on. + :type name: str + :param percent_data: The expected pool size in relation to all + available resources in the Ceph cluster. Will be + used to set the ``target_size_ratio`` pool + property. (default: 10.0) + :type percent_data: Optional[float] + :param app_name: Ceph application name, usually one of: + ('cephfs', 'rbd', 'rgw') (default: 'unknown') + :type app_name: Optional[str] + :param op: Broker request Op to compile pool data from. + :type op: Optional[Dict[str,any]] + :raises: KeyError + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. self.service = service - self.name = name + self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + self.op = op or {} + + if op: + # When initializing from op the `name` attribute is required and we + # will fail with KeyError if it is not provided. + self.name = op['name'] + self.percent_data = op.get('weight') + self.app_name = op.get('app-name') + else: + self.name = name + self.percent_data = percent_data + self.app_name = app_name + + # Set defaults for these if they are not provided + self.percent_data = self.percent_data or 10.0 + self.app_name = self.app_name or 'unknown' + + def validate(self): + """Check that value of supplied operation parameters are valid. + + :raises: ValueError + """ + for op_key, op_value in self.op.items(): + if op_key in self.op_validation_map and op_value is not None: + valid_type, valid_range = self.op_validation_map[op_key] + try: + validator(op_value, valid_type, valid_range) + except (AssertionError, ValueError) as e: + # Normalize on ValueError, also add information about which + # variable we had an issue with. + raise ValueError("'{}': {}".format(op_key, str(e))) + + def _create(self): + """Perform the pool creation, method MUST be overridden by child class. + """ + raise NotImplementedError + + def _post_create(self): + """Perform common post pool creation tasks. + + Note that pool properties subject to change during the lifetime of a + pool / deployment should go into the ``update`` method. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. 
+ """ + if self.nautilus_or_later: + # Ensure we set the expected pool ratio + update_pool( + client=self.service, + pool=self.name, + settings={ + 'target_size_ratio': str( + self.percent_data / 100.0), + }) + try: + set_app_name_for_pool(client=self.service, + pool=self.name, + name=self.app_name) + except CalledProcessError: + log('Could not set app name for pool {}' + .format(self.name), + level=WARNING) + if 'pg_autoscaler' in enabled_manager_modules(): + try: + enable_pg_autoscale(self.service, self.name) + except CalledProcessError as e: + log('Could not configure auto scaling for pool {}: {}' + .format(self.name, e), + level=WARNING) - # Create the pool if it doesn't exist already - # To be implemented by subclasses def create(self): - pass + """Create pool and perform any post pool creation tasks. + + To allow for sharing of common code among pool specific classes the + processing has been broken out into the private methods ``_create`` + and ``_post_create``. + + Do not add any pool type specific handling here, that should go into + one of the pool specific classes. + """ + if not pool_exists(self.service, self.name): + self.validate() + self._create() + self._post_create() + self.update() + + def set_quota(self): + """Set a quota if requested. + + :raises: CalledProcessError + """ + max_bytes = self.op.get('max-bytes') + max_objects = self.op.get('max-objects') + if max_bytes or max_objects: + set_pool_quota(service=self.service, pool_name=self.name, + max_bytes=max_bytes, max_objects=max_objects) + + def set_compression(self): + """Set compression properties if requested. + + :raises: CalledProcessError + """ + compression_properties = { + key.replace('-', '_'): value + for key, value in self.op.items() + if key in ( + 'compression-algorithm', + 'compression-mode', + 'compression-required-ratio', + 'compression-min-blob-size', + 'compression-min-blob-size-hdd', + 'compression-min-blob-size-ssd', + 'compression-max-blob-size', + 'compression-max-blob-size-hdd', + 'compression-max-blob-size-ssd') and value} + if compression_properties: + update_pool(self.service, self.name, compression_properties) + + def update(self): + """Update properties for an already existing pool. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. + """ + self.validate() + self.set_quota() + self.set_compression() def add_cache_tier(self, cache_pool, mode): - """ - Adds a new cache tier to an existing pool. - :param cache_pool: six.string_types. The cache tier pool name to add. - :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"] - :return: None + """Adds a new cache tier to an existing pool. + + :param cache_pool: The cache tier pool name to add. + :type cache_pool: str + :param mode: The caching mode to use for this pool. 
+ valid range = ["readonly", "writeback"] + :type mode: str """ # Check the input types and values validator(value=cache_pool, valid_type=six.string_types) - validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"]) + validator( + value=mode, valid_type=six.string_types, + valid_range=["readonly", "writeback"]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'add', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, mode, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'set-overlay', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom', + ]) def remove_cache_tier(self, cache_pool): - """ - Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. - :param cache_pool: six.string_types. The cache tier pool name to remove. - :return: None + """Removes a cache tier from Ceph. + + Flushes all dirty objects from writeback pools and waits for that to + complete. + + :param cache_pool: The cache tier pool name to remove. + :type cache_pool: str """ # read-only is easy, writeback is much harder mode = get_cache_mode(self.service, cache_pool) if mode == 'readonly': - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, 'none' + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool, + ]) elif mode == 'writeback': pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', @@ -276,9 +476,15 @@ class Pool(object): check_call(pool_forward_cmd) # Flush the cache and wait for it to return - check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'rados', '--id', self.service, + '-p', cache_pool, 'cache-flush-evict-all']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove-overlay', self.name]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool]) def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT, device_class=None): @@ -305,19 +511,23 @@ class Pool(object): selected for the specific rule, rather it is left to the user to tune in the form of 'expected-osd-count' config option. - :param pool_size: int. pool_size is either the number of replicas for + :param pool_size: pool_size is either the number of replicas for replicated pools or the K+M sum for erasure coded pools - :param percent_data: float. the percentage of data that is expected to + :type pool_size: int + :param percent_data: the percentage of data that is expected to be contained in the pool for the specific OSD set. 
        Default value is to assume 10% of the data is for this pool, which
        is a relatively low % of the data but allows for the pg_num to be
        increased. NOTE: the default is primarily to handle the scenario
        where related charms requiring pools has not been upgraded to
        include an update to indicate their relative usage of the pools.
+        :type percent_data: float
+        :param device_class: class of storage to use for basis of pgs
+                             calculation; ceph supports nvme, ssd and hdd by
+                             default based on presence of devices of each type
+                             in the deployment.
+        :type device_class: str
+        :returns: The number of pgs to use.
+        :rtype: int
         """

         # Note: This calculation follows the approach that is provided
@@ -357,7 +567,8 @@ class Pool(object):
             return LEGACY_PG_COUNT

         percent_data /= 100.0
-        target_pgs_per_osd = config('pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET
+        target_pgs_per_osd = config(
+            'pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET
         num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size

         # NOTE: ensure a sane minimum number of PGS otherwise we don't get any
@@ -380,147 +591,174 @@ class Pool(object):
         return int(nearest)


-class ReplicatedPool(Pool):
-    def __init__(self, service, name, pg_num=None, replicas=2,
-                 percent_data=10.0, app_name=None):
-        super(ReplicatedPool, self).__init__(service=service, name=name)
-        self.replicas = replicas
-        self.percent_data = percent_data
-        if pg_num:
+class Pool(BasePool):
+    """Compatibility shim for any descendants external to this library."""
+
+    @deprecate(
+        'The ``Pool`` baseclass has been replaced by the ``BasePool`` class.')
+    def __init__(self, service, name):
+        super(Pool, self).__init__(service, name=name)
+
+    def create(self):
+        pass
+
+
+class ReplicatedPool(BasePool):
+    def __init__(self, service, name=None, pg_num=None, replicas=None,
+                 percent_data=None, app_name=None, op=None):
+        """Initialize ReplicatedPool object.
+
+        Pool information is either initialized from individual keyword
+        arguments or from an individual CephBrokerRq operation Dict.
+
+        Please refer to the docstring of the ``BasePool`` class for
+        documentation of the common parameters.
+
+        :param pg_num: Express a wish for the number of Placement Groups (this
+                       value is subject to validation against a running cluster
+                       prior to use to avoid creating a pool with too many PGs)
+        :type pg_num: int
+        :param replicas: Number of copies there should be of each object added
+                         to this replicated pool.
+        :type replicas: int
+        :raises: KeyError
+        """
+        # NOTE: Do not perform initialization steps that require live data from
+        # a running cluster here. The *Pool classes may be used for validation.
+
+        # The common parameters are handled in our parent's initializer
+        super(ReplicatedPool, self).__init__(
+            service=service, name=name, percent_data=percent_data,
+            app_name=app_name, op=op)
+
+        if op:
+            # When initializing from op, `replicas` is a required attribute,
+            # and we will fail with KeyError if it is not provided.
+            self.replicas = op['replicas']
+            self.pg_num = op.get('pg_num')
+        else:
+            self.replicas = replicas or 2
+            self.pg_num = pg_num
+
+    def _create(self):
+        # Do extra validation on pg_num with data from live cluster
+        if self.pg_num:
             # Since the number of placement groups were specified, ensure
             # that there aren't too many created.
             max_pgs = self.get_pgs(self.replicas, 100.0)
-            self.pg_num = min(pg_num, max_pgs)
+            self.pg_num = min(self.pg_num, max_pgs)
         else:
-            self.pg_num = self.get_pgs(self.replicas, percent_data)
-        if app_name:
-            self.app_name = app_name
-        else:
-            self.app_name = 'unknown'
+            self.pg_num = self.get_pgs(self.replicas, self.percent_data)
+
+        # Create it
+        if self.nautilus_or_later:
+            cmd = [
+                'ceph', '--id', self.service, 'osd', 'pool', 'create',
+                '--pg-num-min={}'.format(
+                    min(AUTOSCALER_DEFAULT_PGS, self.pg_num)
+                ),
+                self.name, str(self.pg_num)
+            ]
+        else:
+            cmd = [
+                'ceph', '--id', self.service, 'osd', 'pool', 'create',
+                self.name, str(self.pg_num)
+            ]
+        check_call(cmd)

-    def create(self):
-        if not pool_exists(self.service, self.name):
-            nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0
-            # Create it
-            if nautilus_or_later:
-                cmd = [
-                    'ceph', '--id', self.service, 'osd', 'pool', 'create',
-                    '--pg-num-min={}'.format(
-                        min(AUTOSCALER_DEFAULT_PGS, self.pg_num)
-                    ),
-                    self.name, str(self.pg_num)
-                ]
-            else:
-                cmd = [
-                    'ceph', '--id', self.service, 'osd', 'pool', 'create',
-                    self.name, str(self.pg_num)
-                ]
-
-            try:
-                check_call(cmd)
-                # Set the pool replica size
-                update_pool(client=self.service,
-                            pool=self.name,
-                            settings={'size': str(self.replicas)})
-                if nautilus_or_later:
-                    # Ensure we set the expected pool ratio
-                    update_pool(client=self.service,
-                                pool=self.name,
-                                settings={'target_size_ratio': str(self.percent_data / 100.0)})
-                try:
-                    set_app_name_for_pool(client=self.service,
-                                          pool=self.name,
-                                          name=self.app_name)
-                except CalledProcessError:
-                    log('Could not set app name for pool {}'.format(self.name), level=WARNING)
-                if 'pg_autoscaler' in enabled_manager_modules():
-                    try:
-                        enable_pg_autoscale(self.service, self.name)
-                    except CalledProcessError as e:
-                        log('Could not configure auto scaling for pool {}: {}'.format(
-                            self.name, e), level=WARNING)
-            except CalledProcessError:
-                raise
+    def _post_create(self):
+        # Set the pool replica size
+        update_pool(client=self.service,
+                    pool=self.name,
+                    settings={'size': str(self.replicas)})
+        # Perform other common post pool creation tasks
+        super(ReplicatedPool, self)._post_create()


-# Default jerasure erasure coded pool
-class ErasurePool(Pool):
-    def __init__(self, service, name, erasure_code_profile="default",
-                 percent_data=10.0, app_name=None):
-        super(ErasurePool, self).__init__(service=service, name=name)
-        self.erasure_code_profile = erasure_code_profile
-        self.percent_data = percent_data
-        if app_name:
-            self.app_name = app_name
-        else:
-            self.app_name = 'unknown'
+class ErasurePool(BasePool):
+    """Default jerasure erasure coded pool."""
+
+    def __init__(self, service, name=None, erasure_code_profile=None,
+                 percent_data=None, app_name=None, op=None,
+                 allow_ec_overwrites=False):
+        """Initialize ErasurePool object.
+
+        Pool information is either initialized from individual keyword
+        arguments or from an individual CephBrokerRq operation Dict.
+
+        Please refer to the docstring of the ``BasePool`` class for
+        documentation of the common parameters.
+
+        :param erasure_code_profile: EC Profile to use (default: 'default')
+        :type erasure_code_profile: Optional[str]
+        """
+        # NOTE: Do not perform initialization steps that require live data from
+        # a running cluster here. The *Pool classes may be used for validation.
+ + # The common parameters are handled in our parents initializer + super(ErasurePool, self).__init__( + service=service, name=name, percent_data=percent_data, + app_name=app_name, op=op) + + if op: + # Note that the different default when initializing from op stems + # from different handling of this in the `charms.ceph` library. + self.erasure_code_profile = op.get('erasure-profile', + 'default-canonical') + self.allow_ec_overwrites = op.get('allow-ec-overwrites') else: - self.app_name = 'unknown' + # We keep the class default when initialized from keyword arguments + # to not break the API for any other consumers. + self.erasure_code_profile = erasure_code_profile or 'default' + self.allow_ec_overwrites = allow_ec_overwrites - def create(self): - if not pool_exists(self.service, self.name): - # Try to find the erasure profile information in order to properly - # size the number of placement groups. The size of an erasure - # coded placement group is calculated as k+m. - erasure_profile = get_erasure_profile(self.service, - self.erasure_code_profile) + def _create(self): + # Try to find the erasure profile information in order to properly + # size the number of placement groups. The size of an erasure + # coded placement group is calculated as k+m. + erasure_profile = get_erasure_profile(self.service, + self.erasure_code_profile) - # Check for errors - if erasure_profile is None: - msg = ("Failed to discover erasure profile named " - "{}".format(self.erasure_code_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) - if 'k' not in erasure_profile or 'm' not in erasure_profile: - # Error - msg = ("Unable to find k (data chunks) or m (coding chunks) " - "in erasure profile {}".format(erasure_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) + # Check for errors + if erasure_profile is None: + msg = ("Failed to discover erasure profile named " + "{}".format(self.erasure_code_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) + if 'k' not in erasure_profile or 'm' not in erasure_profile: + # Error + msg = ("Unable to find k (data chunks) or m (coding chunks) " + "in erasure profile {}".format(erasure_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) - k = int(erasure_profile['k']) - m = int(erasure_profile['m']) - pgs = self.get_pgs(k + m, self.percent_data) - nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 - # Create it - if nautilus_or_later: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - '--pg-num-min={}'.format( - min(AUTOSCALER_DEFAULT_PGS, pgs) - ), - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] - else: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] + k = int(erasure_profile['k']) + m = int(erasure_profile['m']) + pgs = self.get_pgs(k + m, self.percent_data) + self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + # Create it + if self.nautilus_or_later: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + '--pg-num-min={}'.format( + min(AUTOSCALER_DEFAULT_PGS, pgs) + ), + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + else: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + check_call(cmd) - try: - check_call(cmd) - try: - set_app_name_for_pool(client=self.service, - pool=self.name, - name=self.app_name) - except 
CalledProcessError:
-                    log('Could not set app name for pool {}'.format(self.name), level=WARNING)
-                if nautilus_or_later:
-                    # Ensure we set the expected pool ratio
-                    update_pool(client=self.service,
-                                pool=self.name,
-                                settings={'target_size_ratio': str(self.percent_data / 100.0)})
-                if 'pg_autoscaler' in enabled_manager_modules():
-                    try:
-                        enable_pg_autoscale(self.service, self.name)
-                    except CalledProcessError as e:
-                        log('Could not configure auto scaling for pool {}: {}'.format(
-                            self.name, e), level=WARNING)
-            except CalledProcessError:
-                raise
-
-    """Get an existing erasure code profile if it already exists.
-    Returns json formatted output"""
+    def _post_create(self):
+        super(ErasurePool, self)._post_create()
+        if self.allow_ec_overwrites:
+            update_pool(self.service, self.name,
+                        {'allow_ec_overwrites': 'true'})


 def enabled_manager_modules():
@@ -541,22 +779,28 @@ def enabled_manager_modules():


 def enable_pg_autoscale(service, pool_name):
-    """
-    Enable Ceph's PG autoscaler for the specified pool.
+    """Enable Ceph's PG autoscaler for the specified pool.

-    :param service: six.string_types. The Ceph user name to run the command under
-    :param pool_name: six.string_types. The name of the pool to enable sutoscaling on
-    :raise: CalledProcessError if the command fails
+    :param service: The Ceph user name to run the command under
+    :type service: str
+    :param pool_name: The name of the pool to enable autoscaling on
+    :type pool_name: str
+    :raises: CalledProcessError if the command fails
     """
-    check_call(['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on'])
+    check_call([
+        'ceph', '--id', service,
+        'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on'])


 def get_mon_map(service):
-    """
-    Returns the current monitor map.
-    :param service: six.string_types. The Ceph user name to run the command under
-    :return: json string. :raise: ValueError if the monmap fails to parse.
-      Also raises CalledProcessError if our ceph command fails
+    """Return the current monitor map.
+
+    :param service: The Ceph user name to run the command under
+    :type service: str
+    :returns: Dictionary with monitor map data
+    :rtype: Dict[str,any]
+    :raises: ValueError if the monmap fails to parse, CalledProcessError if our
+             ceph command fails.
     """
     try:
         mon_status = check_output(['ceph', '--id', service,
@@ -576,17 +820,16 @@ def get_mon_map(service):


 def hash_monitor_names(service):
-    """
+    """Get a sorted list of monitor hashes in ascending order.
+
     Uses the get_mon_map() function to get information about the monitor
-    cluster.
-    Hash the name of each monitor. Return a sorted list of monitor hashes
-    in an ascending order.
-    :param service: six.string_types. The Ceph user name to run the command under
-    :rtype : dict. json dict of monitor name, ip address and rank
-    example: {
-        'name': 'ip-172-31-13-165',
-        'rank': 0,
-        'addr': '172.31.13.165:6789/0'}
+    cluster. Hash the name of each monitor.
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :returns: a sorted list of monitor hashes in an ascending order.
+    :rtype: List[str]
+    :raises: CalledProcessError, ValueError
     """
     try:
         hash_list = []
@@ -603,46 +846,56 @@ def hash_monitor_names(service):


 def monitor_key_delete(service, key):
-    """
-    Delete a key and value pair from the monitor cluster
-    :param service: six.string_types. The Ceph user name to run the command under
+    """Delete a key and value pair from the monitor cluster.
+
+    Deletes a key value pair on the monitor cluster.
- :param key: six.string_types. The key to delete. + + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to delete. + :type key: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'del', str(key)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_set(service, key, value): - """ - Sets a key value pair on the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to set. - :param value: The value to set. This will be converted to a string - before setting + """Set a key value pair on the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service str + :param key: The key to set. + :type key: str + :param value: The value to set. This will be coerced into a string. + :type value: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'put', str(key), str(value)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_get(service, key): - """ - Gets the value of an existing key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for. + """Get the value of an existing key in the monitor cluster. + + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to search for. + :type key: str :return: Returns the value of that key or None if not found. + :rtype: Optional[str] """ try: output = check_output( @@ -650,19 +903,21 @@ def monitor_key_get(service, key): 'config-key', 'get', str(key)]).decode('UTF-8') return output except CalledProcessError as e: - log("Monitor config-key get failed with message: {}".format( - e.output)) + log("Monitor config-key get failed with message: {}" + .format(e.output)) return None def monitor_key_exists(service, key): - """ - Searches for the existence of a key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for - :return: Returns True if the key exists, False if not and raises an - exception if an unknown error occurs. :raise: CalledProcessError if - an unknown error occurs + """Search for existence of key in the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service: str + :param key: The key to search for. + :type key: str + :return: Returns True if the key exists, False if not. + :rtype: bool + :raises: CalledProcessError if an unknown error occurs. """ try: check_call( @@ -675,16 +930,20 @@ def monitor_key_exists(service, key): if e.returncode == errno.ENOENT: return False else: - log("Unknown error from ceph config-get exists: {} {}".format( - e.returncode, e.output)) + log("Unknown error from ceph config-get exists: {} {}" + .format(e.returncode, e.output)) raise def get_erasure_profile(service, name): - """ - :param service: six.string_types. The Ceph user name to run the command under - :param name: - :return: + """Get an existing erasure code profile if it exists. 
+ + :param service: The Ceph user name to run the command under. + :type service: str + :param name: Name of profile. + :type name: str + :returns: Dictionary with profile data. + :rtype: Optional[Dict[str]] """ try: out = check_output(['ceph', '--id', service, @@ -698,54 +957,61 @@ def get_erasure_profile(service, name): def pool_set(service, pool_name, key, value): + """Sets a value for a RADOS pool in ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to set property on. + :type pool_name: str + :param key: Property key. + :type key: str + :param value: Value, will be coerced into str and shifted to lowercase. + :type value: str + :raises: CalledProcessError """ - Sets a value for a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param key: six.string_types - :param value: - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, - str(value).lower()] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set', pool_name, key, str(value).lower()] + check_call(cmd) def snapshot_pool(service, pool_name, snapshot_name): + """Snapshots a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to snapshot. + :type pool_name: str + :param snapshot_name: Name of snapshot to create. + :type snapshot_name: str + :raises: CalledProcessError """ - Snapshots a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'mksnap', pool_name, snapshot_name] + check_call(cmd) def remove_pool_snapshot(service, pool_name, snapshot_name): + """Remove a snapshot from a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove snapshot from. + :type pool_name: str + :param snapshot_name: Name of snapshot to remove. + :type snapshot_name: str + :raises: CalledProcessError """ - Remove a snapshot from a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] + check_call(cmd) def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): - """ + """Set byte quota on a RADOS pool in Ceph. 
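+
+    Note: at least one of ``max_bytes`` or ``max_objects`` should be
+    supplied; the in-tree caller (``BasePool.set_quota``) only invokes this
+    when one of them is set.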
+ :param service: The Ceph user name to run the command under :type service: str :param pool_name: Name of pool @@ -756,7 +1022,9 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): :type max_objects: int :raises: subprocess.CalledProcessError """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name] + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name] if max_bytes: cmd = cmd + ['max_bytes', str(max_bytes)] if max_objects: @@ -765,119 +1033,216 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): def remove_pool_quota(service, pool_name): + """Remove byte quota on a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove quota from. + :type pool_name: str + :raises: CalledProcessError """ - Set a byte quota on a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] + check_call(cmd) def remove_erasure_profile(service, profile_name): + """Remove erasure code profile. + + :param service: The Ceph user name to run the command under + :type service: str + :param profile_name: Name of profile to remove. + :type profile_name: str + :raises: CalledProcessError """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', - profile_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'erasure-code-profile', 'rm', profile_name] + check_call(cmd) -def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', - failure_domain='host', +def create_erasure_profile(service, profile_name, + erasure_plugin_name='jerasure', + failure_domain=None, data_chunks=2, coding_chunks=1, locality=None, durability_estimator=None, - device_class=None): - """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :param erasure_plugin_name: six.string_types - :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', - 'room', 'root', 'row']) - :param data_chunks: int - :param coding_chunks: int - :param locality: int - :param durability_estimator: int - :param device_class: six.string_types - :return: None. 
Can raise CalledProcessError
-    """
-    # Ensure this failure_domain is allowed by Ceph
-    validator(failure_domain, six.string_types,
-              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
+                           helper_chunks=None,
+                           scalar_mds=None,
+                           crush_locality=None,
+                           device_class=None,
+                           erasure_plugin_technique=None):
+    """Create a new erasure code profile if one does not already exist.
 
-    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
-           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks)
-           ]
-    if locality is not None and durability_estimator is not None:
-        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
+    Updates the profile if it exists. Please refer to [0] for more details.
+
+    0: http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :param profile_name: Name of profile.
+    :type profile_name: str
+    :param erasure_plugin_name: Erasure code plugin.
+    :type erasure_plugin_name: str
+    :param failure_domain: Failure domain, one of:
+                           ('chassis', 'datacenter', 'host', 'osd', 'pdu',
+                            'pod', 'rack', 'region', 'room', 'root', 'row').
+    :type failure_domain: str
+    :param data_chunks: Number of data chunks.
+    :type data_chunks: int
+    :param coding_chunks: Number of coding chunks.
+    :type coding_chunks: int
+    :param locality: Locality.
+    :type locality: int
+    :param durability_estimator: Durability estimator.
+    :type durability_estimator: int
+    :param helper_chunks: Number of helper chunks (clay plugin).
+    :type helper_chunks: int
+    :param device_class: Restrict placement to devices of specific class.
+    :type device_class: str
+    :param scalar_mds: Plugin to use for CLAY layered construction, one of:
+                       ('isa', 'jerasure', 'shec').
+    :type scalar_mds: str
+    :param crush_locality: LRC locality failure domain, one of:
+                           ('chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod',
+                            'rack', 'region', 'room', 'root', 'row') or unset.
+    :type crush_locality: str
+    :param erasure_plugin_technique: Coding technique for EC plugin.
+    :type erasure_plugin_technique: str
+    :raises: CalledProcessError, ValueError or AssertionError
+    """
+    plugin_techniques = {
+        'jerasure': [
+            'reed_sol_van',
+            'reed_sol_r6_op',
+            'cauchy_orig',
+            'cauchy_good',
+            'liberation',
+            'blaum_roth',
+            'liber8tion'
+        ],
+        'lrc': [],
+        'isa': [
+            'reed_sol_van',
+            'cauchy',
+        ],
+        'shec': [
+            'single',
+            'multiple'
+        ],
+        'clay': [],
+    }
+    failure_domains = [
+        'chassis', 'datacenter',
+        'host', 'osd',
+        'pdu', 'pod',
+        'rack', 'region',
+        'room', 'root',
+        'row',
+    ]
+    device_classes = [
+        'ssd',
+        'hdd',
+        'nvme'
+    ]
+
+    validator(erasure_plugin_name, six.string_types,
+              list(plugin_techniques.keys()))
+
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'erasure-code-profile', 'set', profile_name,
+        'plugin={}'.format(erasure_plugin_name),
+        'k={}'.format(data_chunks),
+        'm={}'.format(coding_chunks),
+    ]
+
+    if erasure_plugin_technique:
+        validator(erasure_plugin_technique, six.string_types,
+                  plugin_techniques[erasure_plugin_name])
+        cmd.append('technique={}'.format(erasure_plugin_technique))
 
     luminous_or_later = cmp_pkgrevno('ceph-common', '12.0.0') >= 0
-    # failure_domain changed in luminous
-    if luminous_or_later:
-        cmd.append('crush-failure-domain=' + failure_domain)
-    else:
-        cmd.append('ruleset-failure-domain=' + failure_domain)
+
+    # Set failure domain from options if not provided in args
+    if not failure_domain and config('customize-failure-domain'):
+        # Defaults to 'host' so just need to deal with
+        # setting 'rack' if feature is enabled
+        failure_domain = 'rack'
+
+    if failure_domain:
+        validator(failure_domain, six.string_types, failure_domains)
+        # failure_domain changed in luminous
+        if luminous_or_later:
+            cmd.append('crush-failure-domain={}'.format(failure_domain))
+        else:
+            cmd.append('ruleset-failure-domain={}'.format(failure_domain))
 
     # device class new in luminous
     if luminous_or_later and device_class:
+        validator(device_class, six.string_types, device_classes)
         cmd.append('crush-device-class={}'.format(device_class))
     else:
         log('Skipping device class configuration (ceph < 12.0.0)',
             level=DEBUG)
 
     # Add plugin specific information
-    if locality is not None:
-        # For local erasure codes
-        cmd.append('l=' + str(locality))
-    if durability_estimator is not None:
-        # For Shec erasure codes
-        cmd.append('c=' + str(durability_estimator))
+    if erasure_plugin_name == 'lrc':
+        # LRC mandatory configuration
+        if locality:
+            cmd.append('l={}'.format(locality))
+        else:
+            raise ValueError("locality must be provided for lrc plugin")
+        # LRC optional configuration
+        if crush_locality:
+            validator(crush_locality, six.string_types, failure_domains)
+            cmd.append('crush-locality={}'.format(crush_locality))
+
+    if erasure_plugin_name == 'shec':
+        # SHEC optional configuration
+        if durability_estimator:
+            cmd.append('c={}'.format(durability_estimator))
+
+    if erasure_plugin_name == 'clay':
+        # CLAY optional configuration
+        if helper_chunks:
+            cmd.append('d={}'.format(helper_chunks))
+        if scalar_mds:
+            cmd.append('scalar-mds={}'.format(scalar_mds))
 
     if erasure_profile_exists(service, profile_name):
         cmd.append('--force')
 
-    try:
-        check_call(cmd)
-    except CalledProcessError:
-        raise
+    check_call(cmd)
 
 
 def rename_pool(service, old_name, new_name):
-    """
-    Rename a Ceph pool from old_name to new_name
-    :param service: six.string_types. The Ceph user name to run the command under
-    :param old_name: six.string_types
-    :param new_name: six.string_types
-    :return: None
+    """Rename a Ceph pool from old_name to new_name.
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :param old_name: Name of pool subject to rename.
+    :type old_name: str
+    :param new_name: Name to rename pool to.
+    :type new_name: str
     """
     validator(value=old_name, valid_type=six.string_types)
     validator(value=new_name, valid_type=six.string_types)
-    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
+    cmd = [
+        'ceph', '--id', service,
+        'osd', 'pool', 'rename', old_name, new_name]
     check_call(cmd)
 
 
 def erasure_profile_exists(service, name):
-    """
-    Check to see if an Erasure code profile already exists.
-    :param service: six.string_types. The Ceph user name to run the command under
-    :param name: six.string_types
-    :return: int or None
+    """Check to see if an Erasure code profile already exists.
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :param name: Name of profile to look for.
+    :type name: str
+    :returns: True if it exists, False otherwise.
+    :rtype: bool
     """
     validator(value=name, valid_type=six.string_types)
     try:
@@ -890,11 +1255,14 @@ def erasure_profile_exists(service, name):
 
 
 def get_cache_mode(service, pool_name):
-    """
-    Find the current caching mode of the pool_name given.
-    :param service: six.string_types. The Ceph user name to run the command under
-    :param pool_name: six.string_types
-    :return: int or None
+    """Find the current caching mode of the pool_name given.
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :param pool_name: Name of pool.
+    :type pool_name: str
+    :returns: Current cache mode.
+    :rtype: Optional[str]
     """
     validator(value=service, valid_type=six.string_types)
     validator(value=pool_name, valid_type=six.string_types)
@@ -976,17 +1344,23 @@ def create_rbd_image(service, pool, image, sizemb):
 
 
 def update_pool(client, pool, settings):
+    """Update pool properties.
+
+    :param client: Client/User-name to authenticate with.
+    :type client: str
+    :param pool: Name of pool to operate on.
+    :type pool: str
+    :param settings: Dictionary with key/value pairs to set.
+    :type settings: Dict[str, str]
+    :raises: CalledProcessError
+    """
     cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
     for k, v in six.iteritems(settings):
-        cmd.append(k)
-        cmd.append(v)
-
-    check_call(cmd)
+        check_call(cmd + [k, v])
 
 
 def set_app_name_for_pool(client, pool, name):
-    """
-    Calls `osd pool application enable` for the specified pool name
+    """Calls `osd pool application enable` for the specified pool name.
 
     :param client: Name of the ceph client to use
     :type client: str
@@ -1043,8 +1417,7 @@ def _keyring_path(service):
 
 
 def add_key(service, key):
-    """
-    Add a key to a keyring.
+    """Add a key to a keyring.
 
     Creates the keyring if it doesn't already exist.
 
@@ -1288,13 +1661,33 @@ class CephBrokerRq(object):
 
     The API is versioned and defaults to version 1.
     """
 
-    def __init__(self, api_version=1, request_id=None):
-        self.api_version = api_version
-        if request_id:
-            self.request_id = request_id
+    def __init__(self, api_version=1, request_id=None, raw_request_data=None):
+        """Initialize CephBrokerRq object.
+
+        Builds a new empty request or rebuilds a request from on-wire JSON
+        data.
+
+        :param api_version: API version for request (default: 1).
+        :type api_version: Optional[int]
+        :param request_id: Unique identifier for request.
+                           (default: string representation of generated UUID)
+        :type request_id: Optional[str]
+        :param raw_request_data: JSON-encoded string to build request from.
+        :type raw_request_data: Optional[str]
+        :raises: KeyError
+        """
+        if raw_request_data:
+            request_data = json.loads(raw_request_data)
+            self.api_version = request_data['api-version']
+            self.request_id = request_data['request-id']
+            self.set_ops(request_data['ops'])
         else:
-            self.request_id = str(uuid.uuid1())
-        self.ops = []
+            self.api_version = api_version
+            if request_id:
+                self.request_id = request_id
+            else:
+                self.request_id = str(uuid.uuid1())
+            self.ops = []
 
     def add_op(self, op):
         """Add an op if it is not already in the list.
@@ -1336,12 +1729,119 @@ class CephBrokerRq(object):
                 group=group, namespace=namespace, app_name=app_name,
                 max_bytes=max_bytes, max_objects=max_objects)
 
+    # Use function parameters and docstring to define types in a compatible
+    # manner.
+    #
+    # NOTE: Our callers should always use keyword arguments, so there is no
+    # need to maintain a fixed order/position for parameters. Please keep them
+    # sorted by name when adding new ones.
+    def _partial_build_common_op_create(self,
+                                        app_name=None,
+                                        compression_algorithm=None,
+                                        compression_mode=None,
+                                        compression_required_ratio=None,
+                                        compression_min_blob_size=None,
+                                        compression_min_blob_size_hdd=None,
+                                        compression_min_blob_size_ssd=None,
+                                        compression_max_blob_size=None,
+                                        compression_max_blob_size_hdd=None,
+                                        compression_max_blob_size_ssd=None,
+                                        group=None,
+                                        max_bytes=None,
+                                        max_objects=None,
+                                        namespace=None,
+                                        weight=None):
+        """Build common part of a create pool operation.
+
+        :param app_name: Tag pool with application name. Note that there
+                         are certain protocols emerging upstream with
+                         regard to meaningful application names to use.
+                         Examples are 'rbd' and 'rgw'.
+        :type app_name: Optional[str]
+        :param compression_algorithm: Compressor to use, one of:
+                                      ('lz4', 'snappy', 'zlib', 'zstd')
+        :type compression_algorithm: Optional[str]
+        :param compression_mode: When to compress data, one of:
+                                 ('none', 'passive', 'aggressive', 'force')
+        :type compression_mode: Optional[str]
+        :param compression_required_ratio: Minimum compression ratio for data
+                                           chunk; if the requested ratio is
+                                           not achieved the compressed version
+                                           will be thrown away and the
+                                           original stored.
+        :type compression_required_ratio: Optional[float]
+        :param compression_min_blob_size: Chunks smaller than this are never
+                                          compressed (unit: bytes).
+        :type compression_min_blob_size: Optional[int]
+        :param compression_min_blob_size_hdd: Chunks smaller than this are not
+                                              compressed when destined to
+                                              rotational media (unit: bytes).
+        :type compression_min_blob_size_hdd: Optional[int]
+        :param compression_min_blob_size_ssd: Chunks smaller than this are not
+                                              compressed when destined to flash
+                                              media (unit: bytes).
+        :type compression_min_blob_size_ssd: Optional[int]
+        :param compression_max_blob_size: Chunks larger than this are broken
+                                          into N * compression_max_blob_size
+                                          chunks before being compressed
+                                          (unit: bytes).
+        :type compression_max_blob_size: Optional[int]
+        :param compression_max_blob_size_hdd: Chunks larger than this are
+                                              broken into
+                                              N * compression_max_blob_size_hdd
+                                              chunks before being compressed
+                                              when destined for rotational
+                                              media (unit: bytes).
+        :type compression_max_blob_size_hdd: Optional[int]
+        :param compression_max_blob_size_ssd: Chunks larger than this are
+                                              broken into
+                                              N * compression_max_blob_size_ssd
+                                              chunks before being compressed
+                                              when destined for flash media
+                                              (unit: bytes).
+        :type compression_max_blob_size_ssd: Optional[int]
+        :param group: Group to add pool to
+        :type group: Optional[str]
+        :param max_bytes: Maximum bytes quota to apply
+        :type max_bytes: Optional[int]
+        :param max_objects: Maximum objects quota to apply
+        :type max_objects: Optional[int]
+        :param namespace: Group namespace
+        :type namespace: Optional[str]
+        :param weight: The percentage of data that is expected to be contained
+                       in the pool from the total available space on the OSDs.
+                       Used to calculate number of Placement Groups to create
+                       for pool.
+        :type weight: Optional[float]
+        :returns: Dictionary with kwarg name as key.
+        :rtype: Dict[str, Any]
+        :raises: AssertionError
+        """
+        return {
+            'app-name': app_name,
+            'compression-algorithm': compression_algorithm,
+            'compression-mode': compression_mode,
+            'compression-required-ratio': compression_required_ratio,
+            'compression-min-blob-size': compression_min_blob_size,
+            'compression-min-blob-size-hdd': compression_min_blob_size_hdd,
+            'compression-min-blob-size-ssd': compression_min_blob_size_ssd,
+            'compression-max-blob-size': compression_max_blob_size,
+            'compression-max-blob-size-hdd': compression_max_blob_size_hdd,
+            'compression-max-blob-size-ssd': compression_max_blob_size_ssd,
+            'group': group,
+            'max-bytes': max_bytes,
+            'max-objects': max_objects,
+            'group-namespace': namespace,
+            'weight': weight,
+        }
+
     def add_op_create_replicated_pool(self, name, replica_count=3, pg_num=None,
-                                      weight=None, group=None, namespace=None,
-                                      app_name=None, max_bytes=None,
-                                      max_objects=None):
+                                      **kwargs):
         """Adds an operation to create a replicated pool.
 
+        Refer to docstring for ``_partial_build_common_op_create`` for
+        documentation of keyword arguments.
+
         :param name: Name of pool to create
         :type name: str
         :param replica_count: Number of copies Ceph should keep of your data.
@@ -1349,66 +1849,114 @@
         :param pg_num: Request specific number of Placement Groups to create
                        for pool.
        :type pg_num: int
-        :param weight: The percentage of data that is expected to be contained
-                       in the pool from the total available space on the OSDs.
-                       Used to calculate number of Placement Groups to create
-                       for pool.
-        :type weight: float
-        :param group: Group to add pool to
-        :type group: str
-        :param namespace: Group namespace
-        :type namespace: str
-        :param app_name: (Optional) Tag pool with application name. Note that
-                         there is certain protocols emerging upstream with
-                         regard to meaningful application names to use.
-                         Examples are ``rbd`` and ``rgw``.
-        :type app_name: str
-        :param max_bytes: Maximum bytes quota to apply
-        :type max_bytes: int
-        :param max_objects: Maximum objects quota to apply
-        :type max_objects: int
+        :raises: AssertionError if provided data is of invalid type/range
         """
-        if pg_num and weight:
+        if pg_num and kwargs.get('weight'):
             raise ValueError('pg_num and weight are mutually exclusive')
 
-        self.add_op({'op': 'create-pool', 'name': name,
-                     'replicas': replica_count, 'pg_num': pg_num,
-                     'weight': weight, 'group': group,
-                     'group-namespace': namespace, 'app-name': app_name,
-                     'max-bytes': max_bytes, 'max-objects': max_objects})
+        op = {
+            'op': 'create-pool',
+            'name': name,
+            'replicas': replica_count,
+            'pg_num': pg_num,
+        }
+        op.update(self._partial_build_common_op_create(**kwargs))
+
+        # Initialize Pool-object to validate type and range of ops.
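+        # The Pool object is created purely for its client side validation;
+        # values of invalid type or range raise AssertionError here in the
+        # charm rather than failing after the request has been submitted to
+        # the ceph-mon unit.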
+        pool = ReplicatedPool('dummy-service', op=op)
+        pool.validate()
+
+        self.add_op(op)
 
     def add_op_create_erasure_pool(self, name, erasure_profile=None,
-                                   weight=None, group=None, app_name=None,
-                                   max_bytes=None, max_objects=None):
+                                   allow_ec_overwrites=False, **kwargs):
         """Adds an operation to create a erasure coded pool.
 
+        Refer to docstring for ``_partial_build_common_op_create`` for
+        documentation of keyword arguments.
+
         :param name: Name of pool to create
         :type name: str
         :param erasure_profile: Name of erasure code profile to use.  If not
                                 set the ceph-mon unit handling the broker
                                 request will set its default value.
         :type erasure_profile: str
-        :param weight: The percentage of data that is expected to be contained
-                       in the pool from the total available space on the OSDs.
-        :type weight: float
-        :param group: Group to add pool to
-        :type group: str
-        :param app_name: (Optional) Tag pool with application name. Note that
-                         there is certain protocols emerging upstream with
-                         regard to meaningful application names to use.
-                         Examples are ``rbd`` and ``rgw``.
-        :type app_name: str
-        :param max_bytes: Maximum bytes quota to apply
-        :type max_bytes: int
-        :param max_objects: Maximum objects quota to apply
-        :type max_objects: int
+        :param allow_ec_overwrites: Allow EC pools to be overwritten
+        :type allow_ec_overwrites: bool
+        :raises: AssertionError if provided data is of invalid type/range
         """
-        self.add_op({'op': 'create-pool', 'name': name,
-                     'pool-type': 'erasure',
-                     'erasure-profile': erasure_profile,
-                     'weight': weight,
-                     'group': group, 'app-name': app_name,
-                     'max-bytes': max_bytes, 'max-objects': max_objects})
+        op = {
+            'op': 'create-pool',
+            'name': name,
+            'pool-type': 'erasure',
+            'erasure-profile': erasure_profile,
+            'allow-ec-overwrites': allow_ec_overwrites,
+        }
+        op.update(self._partial_build_common_op_create(**kwargs))
+
+        # Initialize Pool-object to validate type and range of ops.
+        pool = ErasurePool('dummy-service', op)
+        pool.validate()
+
+        self.add_op(op)
+
+    def add_op_create_erasure_profile(self, name,
+                                      erasure_type='jerasure',
+                                      erasure_technique=None,
+                                      k=None, m=None,
+                                      failure_domain=None,
+                                      lrc_locality=None,
+                                      shec_durability_estimator=None,
+                                      clay_helper_chunks=None,
+                                      device_class=None,
+                                      clay_scalar_mds=None,
+                                      lrc_crush_locality=None):
+        """Adds an operation to create an erasure coding profile.
+
+        :param name: Name of profile to create
+        :type name: str
+        :param erasure_type: Which of the erasure coding plugins should be used
+        :type erasure_type: string
+        :param erasure_technique: EC plugin technique to use
+        :type erasure_technique: string
+        :param k: Number of data chunks
+        :type k: int
+        :param m: Number of coding chunks
+        :type m: int
+        :param lrc_locality: Group the coding and data chunks into sets of size locality
+                             (lrc plugin)
+        :type lrc_locality: int
+        :param shec_durability_estimator: The number of parity chunks each of which includes
+                                          a data chunk in its calculation range (shec plugin)
+        :type shec_durability_estimator: int
+        :param clay_helper_chunks: The number of helper chunks to use for recovery operations
+                                   (clay plugin)
+        :type clay_helper_chunks: int
+        :param failure_domain: Type of failure domain from Ceph bucket types
+                               to be used
+        :type failure_domain: string
+        :param device_class: Device class to use for profile (ssd, hdd)
+        :type device_class: string
+        :param clay_scalar_mds: Plugin to use for CLAY layered construction
+                                (jerasure|isa|shec)
+        :type clay_scalar_mds: string
+        :param lrc_crush_locality: Type of crush bucket in which set of chunks
+                                   defined by lrc_locality will be stored.
+        :type lrc_crush_locality: string
+        """
+        self.add_op({'op': 'create-erasure-profile',
+                     'name': name,
+                     'k': k,
+                     'm': m,
+                     'l': lrc_locality,
+                     'c': shec_durability_estimator,
+                     'd': clay_helper_chunks,
+                     'erasure-type': erasure_type,
+                     'erasure-technique': erasure_technique,
+                     'failure-domain': failure_domain,
+                     'device-class': device_class,
+                     'scalar-mds': clay_scalar_mds,
+                     'crush-locality': lrc_crush_locality})
 
     def set_ops(self, ops):
         """Set request ops to provided value.
@@ -1424,12 +1972,14 @@ class CephBrokerRq(object):
                          'request-id': self.request_id})
 
     def _ops_equal(self, other):
+        keys_to_compare = [
+            'replicas', 'name', 'op', 'pg_num', 'group-permission',
+            'object-prefix-permissions',
+        ]
+        keys_to_compare += list(self._partial_build_common_op_create().keys())
         if len(self.ops) == len(other.ops):
             for req_no in range(0, len(self.ops)):
-                for key in [
-                    'replicas', 'name', 'op', 'pg_num', 'weight',
-                    'group', 'group-namespace', 'group-permission',
-                    'object-prefix-permissions']:
+                for key in keys_to_compare:
                     if self.ops[req_no].get(key) != other.ops[req_no].get(key):
                         return False
         else:
@@ -1522,18 +2072,15 @@ class CephBrokerRsp(object):
 
 def get_previous_request(rid):
     """Return the last ceph broker request sent on a given relation
 
-    @param rid: Relation id to query for request
+    :param rid: Relation id to query for request
+    :type rid: str
+    :returns: CephBrokerRq object or None if relation data not found.
+ :rtype: Optional[CephBrokerRq] """ - request = None broker_req = relation_get(attribute='broker_req', rid=rid, unit=local_unit()) if broker_req: - request_data = json.loads(broker_req) - request = CephBrokerRq(api_version=request_data['api-version'], - request_id=request_data['request-id']) - request.set_ops(request_data['ops']) - - return request + return CephBrokerRq(raw_request_data=broker_req) def get_request_states(request, relation='ceph'): diff --git a/tests/bundles/focal-ussuri-dvr-snat.yaml b/tests/bundles/focal-ussuri-dvr-snat.yaml index 1e90eb84..55aa01ed 100644 --- a/tests/bundles/focal-ussuri-dvr-snat.yaml +++ b/tests/bundles/focal-ussuri-dvr-snat.yaml @@ -1,5 +1,5 @@ variables: - openstack-origin: &openstack-origin distro + openstack-origin: &openstack-origin distro-proposed series: &series focal @@ -20,6 +20,8 @@ machines: 9: constraints: "root-disk=20G mem=4G" 10: {} + 11: {} + 12: {} # We specify machine placements for these to improve iteration # time, given that machine "0" comes up way before machine "7" @@ -113,6 +115,27 @@ applications: openstack-origin: *openstack-origin to: - '10' + vault-mysql-router: + charm: cs:~openstack-charmers-next/mysql-router + vault: + charm: cs:~openstack-charmers-next/vault + num_units: 1 + to: + - '11' + ovn-central: + charm: cs:~openstack-charmers-next/ovn-central + num_units: 3 + options: + source: *openstack-origin + to: + - '12' + neutron-api-plugin-ovn: + charm: cs:~openstack-charmers-next/neutron-api-plugin-ovn + ovn-chassis: + charm: cs:~openstack-charmers-next/ovn-chassis + options: + # start new units paused to allow unit by unit OVS to OVN migration + new-units-paused: true relations: - - 'neutron-api:amqp' - 'rabbitmq-server:amqp' @@ -156,3 +179,26 @@ relations: - ["glance-mysql-router:db-router", "mysql-innodb-cluster:db-router"] - ["neutron-mysql-router:db-router", "mysql-innodb-cluster:db-router"] - ["placement-mysql-router:db-router", "mysql-innodb-cluster:db-router"] + # We need to defer the addition of the neutron-api-plugin-ovn subordinate + # relation to the functional test as the test will first validate the legacy + # Neutron ML2+OVS topology, migrate it to OVN and then confirm connectivity + # post migration. 
+ # + # - - neutron-api-plugin-ovn:neutron-plugin + # - neutron-api:neutron-plugin-api-subordinate + - - ovn-central:certificates + - vault:certificates + - - ovn-central:ovsdb-cms + - neutron-api-plugin-ovn:ovsdb-cms + - - ovn-chassis:nova-compute + - nova-compute:neutron-plugin + - - ovn-chassis:certificates + - vault:certificates + - - ovn-chassis:ovsdb + - ovn-central:ovsdb + - - vault:certificates + - neutron-api-plugin-ovn:certificates + - - vault:shared-db + - vault-mysql-router:shared-db + - - vault-mysql-router:db-router + - mysql-innodb-cluster:db-router diff --git a/tests/bundles/focal-victoria-dvr-snat.yaml b/tests/bundles/focal-victoria-dvr-snat.yaml index 7ff4037d..fc3f3ae9 100644 --- a/tests/bundles/focal-victoria-dvr-snat.yaml +++ b/tests/bundles/focal-victoria-dvr-snat.yaml @@ -20,6 +20,8 @@ machines: 9: constraints: "root-disk=20G mem=4G" 10: {} + 11: {} + 12: {} # We specify machine placements for these to improve iteration # time, given that machine "0" comes up way before machine "7" @@ -113,6 +115,27 @@ applications: openstack-origin: *openstack-origin to: - '10' + vault-mysql-router: + charm: cs:~openstack-charmers-next/mysql-router + vault: + charm: cs:~openstack-charmers-next/vault + num_units: 1 + to: + - '11' + ovn-central: + charm: cs:~openstack-charmers-next/ovn-central + num_units: 3 + options: + source: *openstack-origin + to: + - '12' + neutron-api-plugin-ovn: + charm: cs:~openstack-charmers-next/neutron-api-plugin-ovn + ovn-chassis: + charm: cs:~openstack-charmers-next/ovn-chassis + options: + # start new units paused to allow unit by unit OVS to OVN migration + new-units-paused: true relations: - - 'neutron-api:amqp' - 'rabbitmq-server:amqp' @@ -156,3 +179,26 @@ relations: - ["glance-mysql-router:db-router", "mysql-innodb-cluster:db-router"] - ["neutron-mysql-router:db-router", "mysql-innodb-cluster:db-router"] - ["placement-mysql-router:db-router", "mysql-innodb-cluster:db-router"] + # We need to defer the addition of the neutron-api-plugin-ovn subordinate + # relation to the functional test as the test will first validate the legacy + # Neutron ML2+OVS topology, migrate it to OVN and then confirm connectivity + # post migration. 
+  #
+  # - - neutron-api-plugin-ovn:neutron-plugin
+  #   - neutron-api:neutron-plugin-api-subordinate
+  - - ovn-central:certificates
+    - vault:certificates
+  - - ovn-central:ovsdb-cms
+    - neutron-api-plugin-ovn:ovsdb-cms
+  - - ovn-chassis:nova-compute
+    - nova-compute:neutron-plugin
+  - - ovn-chassis:certificates
+    - vault:certificates
+  - - ovn-chassis:ovsdb
+    - ovn-central:ovsdb
+  - - vault:certificates
+    - neutron-api-plugin-ovn:certificates
+  - - vault:shared-db
+    - vault-mysql-router:shared-db
+  - - vault-mysql-router:db-router
+    - mysql-innodb-cluster:db-router
diff --git a/tests/bundles/groovy-victoria-dvr-snat.yaml b/tests/bundles/groovy-victoria-dvr-snat.yaml
index 2b7d1b42..3de50a36 100644
--- a/tests/bundles/groovy-victoria-dvr-snat.yaml
+++ b/tests/bundles/groovy-victoria-dvr-snat.yaml
@@ -20,6 +20,8 @@ machines:
   9:
     constraints: "root-disk=20G mem=4G"
   10: {}
+  11: {}
+  12: {}
 
 # We specify machine placements for these to improve iteration
 # time, given that machine "0" comes up way before machine "7"
@@ -113,6 +115,27 @@ applications:
       openstack-origin: *openstack-origin
     to:
      - '10'
+  vault-mysql-router:
+    charm: cs:~openstack-charmers-next/mysql-router
+  vault:
+    charm: cs:~openstack-charmers-next/vault
+    num_units: 1
+    to:
+    - '11'
+  ovn-central:
+    charm: cs:~openstack-charmers-next/ovn-central
+    num_units: 3
+    options:
+      source: *openstack-origin
+    to:
+    - '12'
+  neutron-api-plugin-ovn:
+    charm: cs:~openstack-charmers-next/neutron-api-plugin-ovn
+  ovn-chassis:
+    charm: cs:~openstack-charmers-next/ovn-chassis
+    options:
+      # start new units paused to allow unit by unit OVS to OVN migration
+      new-units-paused: true
 relations:
   - - 'neutron-api:amqp'
     - 'rabbitmq-server:amqp'
@@ -156,3 +179,26 @@ relations:
   - ["glance-mysql-router:db-router", "mysql-innodb-cluster:db-router"]
   - ["neutron-mysql-router:db-router", "mysql-innodb-cluster:db-router"]
   - ["placement-mysql-router:db-router", "mysql-innodb-cluster:db-router"]
+  # We need to defer the addition of the neutron-api-plugin-ovn subordinate
+  # relation to the functional test as the test will first validate the legacy
+  # Neutron ML2+OVS topology, migrate it to OVN and then confirm connectivity
+  # post migration.
+  #
+  # - - neutron-api-plugin-ovn:neutron-plugin
+  #   - neutron-api:neutron-plugin-api-subordinate
+  - - ovn-central:certificates
+    - vault:certificates
+  - - ovn-central:ovsdb-cms
+    - neutron-api-plugin-ovn:ovsdb-cms
+  - - ovn-chassis:nova-compute
+    - nova-compute:neutron-plugin
+  - - ovn-chassis:certificates
+    - vault:certificates
+  - - ovn-chassis:ovsdb
+    - ovn-central:ovsdb
+  - - vault:certificates
+    - neutron-api-plugin-ovn:certificates
+  - - vault:shared-db
+    - vault-mysql-router:shared-db
+  - - vault-mysql-router:db-router
+    - mysql-innodb-cluster:db-router
diff --git a/tests/tests.yaml b/tests/tests.yaml
index 0ae839dd..5fd7f895 100644
--- a/tests/tests.yaml
+++ b/tests/tests.yaml
@@ -1,7 +1,12 @@
 charm_name: neutron-openvswitch
 
+# NOTE: the OVN migration test runs at the end of a regular gate check and adds
+# no more than 5-10 minutes to each job. We want this to run from focal-ussuri
+# and onwards as that is the point where we recommend our users to migrate from
+# OVS to OVN.
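+#
+# A `migrate-ovn: <bundle>` entry maps the bundle to the `migrate-ovn` test
+# alias, which selects the alternate `configure` and `tests` steps declared
+# under the same alias further down in this file.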
+
 smoke_bundles:
-  - bionic-ussuri-dvr-snat
+  - migrate-ovn: focal-ussuri-dvr-snat
 
 gate_bundles:
   - trusty-mitaka
@@ -14,21 +19,52 @@ gate_bundles:
   - bionic-stein-dvr-snat
   - bionic-train-dvr-snat
   - bionic-ussuri-dvr-snat
-  - focal-ussuri-dvr-snat
-  - focal-victoria-dvr-snat
+  - migrate-ovn: focal-ussuri-dvr-snat
+  - migrate-ovn: focal-victoria-dvr-snat
 
 dev_bundles:
-  - groovy-victoria-dvr-snat
+  - migrate-ovn: groovy-victoria-dvr-snat
 
 configure:
   - zaza.openstack.charm_tests.glance.setup.add_lts_image
   - zaza.openstack.charm_tests.neutron.setup.basic_overcloud_network
   - zaza.openstack.charm_tests.nova.setup.create_flavors
   - zaza.openstack.charm_tests.nova.setup.manage_ssh_key
+  - migrate-ovn:
+    - zaza.openstack.charm_tests.vault.setup.auto_initialize_no_validation
+    - zaza.openstack.charm_tests.glance.setup.add_lts_image
+    - zaza.openstack.charm_tests.neutron.setup.basic_overcloud_network
+    - zaza.openstack.charm_tests.nova.setup.create_flavors
+    - zaza.openstack.charm_tests.nova.setup.manage_ssh_key
+    - zaza.openstack.charm_tests.ovn.setup.pre_migration_configuration
+
+configure_options:
+  configure_gateway_ext_port_use_juju_wait: false
+
+target_deploy_status:
+  neutron-api-plugin-ovn:
+    workload-status: waiting
+  ovn-chassis:
+    workload-status: maintenance
+    workload-status-message: "Paused. Use 'resume' action to resume normal service."
+  ovn-central:
+    workload-status: waiting
+    workload-status-message: "'ovsdb-peer' incomplete, 'certificates' awaiting server certificate data"
+  vault:
+    workload-status: blocked
+    workload-status-message: Vault needs to be initialized
 
 tests:
   - zaza.openstack.charm_tests.neutron.tests.NeutronNetworkingTest
  - zaza.openstack.charm_tests.neutron.tests.NeutronOpenvSwitchTest
+  - migrate-ovn:
+    - zaza.openstack.charm_tests.neutron.tests.NeutronNetworkingTest
+    - zaza.openstack.charm_tests.ovn.tests.OVSOVNMigrationTest
+    - zaza.openstack.charm_tests.neutron.tests.NeutronNetworkingTest
 
 tests_options:
+  # NOTE: This allows us to run the NeutronNetworkingTest multiple times while
+  # reusing the instances created for the first run. This both saves time and
+  # allows verifying instances survive an SDN migration.
+  zaza.openstack.charm_tests.neutron.tests.NeutronNetworkingTest.test_instances_have_networking.run_resource_cleanup: false
   force_deploy:
     - groovy-victoria-dvr-snat
diff --git a/unit_tests/test_actions_cleanup.py b/unit_tests/test_actions_cleanup.py
new file mode 100644
index 00000000..706c79d7
--- /dev/null
+++ b/unit_tests/test_actions_cleanup.py
@@ -0,0 +1,185 @@
+# Copyright 2020 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
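+#
+# Unit tests for the `cleanup` action (actions/cleanup.py).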
+ +from unittest import mock + +import test_utils + +with mock.patch('neutron_ovs_utils.register_configs') as configs: + configs.return_value = 'test-config' + import cleanup as actions + + +class CleanupTestCase(test_utils.CharmTestCase): + + def setUp(self): + super(CleanupTestCase, self).setUp( + actions, [ + 'ch_core', + 'ch_openstack_utils', + 'ch_ovs', + 'neutron_ipset_cleanup', + 'neutron_netns_cleanup', + 'remove_patch_ports', + 'remove_per_bridge_controllers', + 'subprocess', + ]) + + def test_cleanup(self): + self.ch_openstack_utils.is_unit_paused_set.return_value = False + with self.assertRaises(actions.UnitNotPaused): + actions.cleanup([]) + self.ch_openstack_utils.is_unit_paused_set.return_value = True + with self.assertRaises(actions.MandatoryConfigurationNotSet): + actions.cleanup([]) + self.ch_core.hookenv.config.return_value = 'openvswitch' + self.ch_core.hookenv.action_get.return_value = False + with self.assertRaises(actions.MandatoryConfigurationNotSet): + actions.cleanup([]) + self.ch_core.hookenv.action_get.return_value = True + actions.cleanup([]) + self.remove_patch_ports.assert_called_once_with('br-int') + self.ch_ovs.del_bridge.assert_called_once_with('br-tun') + self.remove_per_bridge_controllers.assert_called_once_with() + self.neutron_netns_cleanup.assert_called_once_with() + self.neutron_ipset_cleanup.assert_called_once_with() + + +class HelperTestCase(test_utils.CharmTestCase): + + def setUp(self): + super(HelperTestCase, self).setUp( + actions, [ + 'ch_ovsdb', + ]) + + @mock.patch.object(actions.ch_ovs, 'del_bridge_port') + @mock.patch.object(actions.ch_ovs, 'patch_ports_on_bridge') + def test_remove_patch_ports( + self, _patch_ports_on_bridge, _del_bridge_port): + _patch_ports_on_bridge.return_value = [actions.ch_ovs.Patch( + this_end=actions.ch_ovs.PatchPort( + bridge='this-end-bridge', + port='this-end-port'), + other_end=actions.ch_ovs.PatchPort( + bridge='other-end-bridge', + port='other-end-port')), + ] + actions.remove_patch_ports('fake-bridge') + _patch_ports_on_bridge.assert_called_once_with( + 'fake-bridge') + _del_bridge_port.assert_has_calls([ + mock.call('this-end-bridge', 'this-end-port', linkdown=False), + mock.call('other-end-bridge', 'other-end-port', linkdown=False), + ]) + + def test_remove_per_bridge_controllers(self): + bridge = mock.MagicMock() + bridge.__getitem__.return_value = 'fake-uuid' + ovsdb = mock.MagicMock() + ovsdb.bridge.__iter__.return_value = [bridge] + self.ch_ovsdb.SimpleOVSDB.return_value = ovsdb + actions.remove_per_bridge_controllers() + ovsdb.bridge.clear.assert_called_once_with('fake-uuid', 'controller') + + @mock.patch.object(actions.subprocess, 'check_call') + def test_neutron_ipset_cleanup(self, _check_call): + actions.neutron_ipset_cleanup() + _check_call.assert_called_once_with( + ( + 'neutron-ipset-cleanup', + '--config-file=/etc/neutron/neutron.conf', + '--config-file=/etc/neutron/plugins/ml2/openvswitch_agent.ini', + )) + + @mock.patch.object(actions.os.path, 'exists') + @mock.patch.object(actions.os, 'unlink') + @mock.patch.object(actions.subprocess, 'check_call') + def test_neutron_netns_cleanup(self, _check_call, _unlink, _exists): + _exists.return_value = True + with test_utils.patch_open() as (_open, _file): + actions.neutron_netns_cleanup() + _open.assert_called_once_with( + '/etc/neutron/rootwrap.d/charm-n-ovs.filters', 'w') + _file.write.assert_called_once_with( + '[Filters]\n' + 'neutron.cmd.netns_cleanup: CommandFilter, ip, root\n') + _check_call.assert_has_calls([ + # FIXME: remove once package 
deps have been backported + mock.call(('apt', '-y', 'install', 'net-tools')), + mock.call( + ( + 'neutron-netns-cleanup', + '--force', + '--config-file=/etc/neutron/neutron.conf', + '--config-file=/etc/neutron/l3_agent.ini', + '--config-file=/etc/neutron/fwaas_driver.ini', + '--config-file=/etc/neutron/dhcp_agent.ini', + )), + ]) + _unlink.assert_called_once_with( + '/etc/neutron/rootwrap.d/charm-n-ovs.filters') + # Confirm behaviour when a config does not exist + _exists.reset_mock() + _exists.side_effect = [True, True, True, False] + _check_call.reset_mock() + actions.neutron_netns_cleanup() + _check_call.assert_has_calls([ + # FIXME: remove once package deps have been backported + mock.call(('apt', '-y', 'install', 'net-tools')), + mock.call( + ( + 'neutron-netns-cleanup', + '--force', + '--config-file=/etc/neutron/neutron.conf', + '--config-file=/etc/neutron/l3_agent.ini', + '--config-file=/etc/neutron/fwaas_driver.ini', + )), + ]) + + +class MainTestCase(test_utils.CharmTestCase): + + def setUp(self): + super(MainTestCase, self).setUp(actions, [ + 'ch_core' + ]) + + def test_invokes_action(self): + dummy_calls = [] + + def dummy_action(args): + dummy_calls.append(True) + + with mock.patch.dict(actions.ACTIONS, {'foo': dummy_action}): + actions.main(['foo']) + self.assertEqual(dummy_calls, [True]) + + def test_unknown_action(self): + """Unknown actions aren't a traceback.""" + exit_string = actions.main(['foo']) + self.assertEqual('Action "foo" undefined', exit_string) + + def test_failing_action(self): + """Actions which traceback trigger action_fail() calls.""" + dummy_calls = [] + + self.ch_core.hookenv.action_fail.side_effect = dummy_calls.append + + def dummy_action(args): + raise ValueError('uh oh') + + with mock.patch.dict(actions.ACTIONS, {'foo': dummy_action}): + actions.main(['foo']) + self.assertEqual(dummy_calls, ['Action "foo" failed: "uh oh"'])
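
For illustration only, a minimal sketch of the reworked broker request API in
use; the pool name and option values below are invented for the example, and
the keyword arguments are forwarded to _partial_build_common_op_create as
documented above:

    rq = CephBrokerRq()
    rq.add_op_create_replicated_pool(
        name='mypool',
        replica_count=3,
        app_name='rbd',
        compression_algorithm='zstd',
        compression_mode='aggressive')
    # The JSON-encoded payload in rq.request can then be set on the relation;
    # ops of invalid type or range raise AssertionError client side before
    # anything is sent to the ceph-mon unit.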