Refactor ceph-manager code to use sysinv-api instead of RPC

The monitor module of ceph-manager has support for executing
certain actions during an upgrade. This code is intended
to support alarm filtering or other actions necessary to
migrate ceph-related code from one version to another.

The code currently uses an RPC call to conductor in order
to fetch upgrade information from sysinv. This patch changes
ceph-manager to use the REST API instead of an RCP. The main
reason for this migration is a planned change on sysinv to
use ZeroMQ for RPC, instead of RabbitMQ. This change on
sysinv would require changes on ceph-manager to use the new
protocol. Using the REST API has other advantages over RPC,
such as decoupling and resilience.

Story: 2010087
Task: 46225

Test Plan:
1. Unit tests for the new code
2. Installed the new code and verified the code works by
   forcing the upgrade check to run (with and without an
   active upgrade)
3. Fresh installation of ISO

Signed-off-by: Isac Souza <IsacSacchi.Souza@windriver.com>
Change-Id: Ifd763825cd184192f100a119db6c3aee92708706
This commit is contained in:
Isac Souza 2022-09-07 15:53:46 -03:00 committed by Al Bailey
parent 971f9b6630
commit 958237c229
9 changed files with 222 additions and 46 deletions

View File

@ -6,6 +6,7 @@
- stx-utilities-tox-pep8 - stx-utilities-tox-pep8
- stx-utilities-tox-pylint - stx-utilities-tox-pylint
- stx-utilities-ceph-manager-tox-bandit - stx-utilities-ceph-manager-tox-bandit
- stx-utilities-ceph-manager-tox-py39
- stx-utilities-ceph-client-tox-bandit - stx-utilities-ceph-client-tox-bandit
- stx-utilities-pci-irq-affinity-agent-tox-py27 - stx-utilities-pci-irq-affinity-agent-tox-py27
- stx-utilities-pci-irq-affinity-agent-tox-py39 - stx-utilities-pci-irq-affinity-agent-tox-py39
@ -16,6 +17,7 @@
- stx-utilities-tox-pep8 - stx-utilities-tox-pep8
- stx-utilities-tox-pylint - stx-utilities-tox-pylint
- stx-utilities-ceph-manager-tox-bandit - stx-utilities-ceph-manager-tox-bandit
- stx-utilities-ceph-manager-tox-py39
- stx-utilities-ceph-client-tox-bandit - stx-utilities-ceph-client-tox-bandit
- stx-utilities-pci-irq-affinity-agent-tox-py27 - stx-utilities-pci-irq-affinity-agent-tox-py27
- stx-utilities-pci-irq-affinity-agent-tox-py39 - stx-utilities-pci-irq-affinity-agent-tox-py39
@ -71,6 +73,19 @@
tox_envlist: bandit tox_envlist: bandit
tox_extra_args: -c ./ceph/ceph-manager/ceph-manager/tox.ini tox_extra_args: -c ./ceph/ceph-manager/ceph-manager/tox.ini
- job:
name: stx-utilities-ceph-manager-tox-py39
parent: tox
description: |
Run py39 unittests for utilities ceph-mananger
nodeset: debian-bullseye
files:
- ./ceph/ceph-manager/ceph-manager/*
vars:
tox_envlist: py39
python_version: 3.9
tox_extra_args: -c ./ceph/ceph-manager/ceph-manager/tox.ini
- job: - job:
name: stx-utilities-ceph-client-tox-bandit name: stx-utilities-ceph-client-tox-bandit
parent: tox parent: tox

View File

@ -21,6 +21,7 @@ from ceph_manager.i18n import _
from ceph_manager.i18n import _LE from ceph_manager.i18n import _LE
from ceph_manager.i18n import _LI from ceph_manager.i18n import _LI
from ceph_manager.i18n import _LW from ceph_manager.i18n import _LW
from ceph_manager.sysinv_api import upgrade
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -35,12 +36,14 @@ LOG = logging.getLogger(__name__)
# #
class HandleUpgradesMixin(object): class HandleUpgradesMixin(object):
def __init__(self, service): def __init__(self, service, conf):
self.service = service self.service = service
self.sysinv_upgrade_api = upgrade.SysinvUpgradeApi(conf)
self.wait_for_upgrade_complete = False self.wait_for_upgrade_complete = False
def setup(self, config): def setup(self, config):
self._set_upgrade(self.service.retry_get_software_upgrade_status()) self._set_upgrade(
self.sysinv_upgrade_api.retry_get_software_upgrade_status())
def _set_upgrade(self, upgrade): def _set_upgrade(self, upgrade):
state = upgrade.get('state') state = upgrade.get('state')
@ -109,7 +112,7 @@ class HandleUpgradesMixin(object):
and (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET and (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET
in health['detail'])): in health['detail'])):
try: try:
upgrade = self.service.get_software_upgrade_status() upgrade = self.sysinv_upgrade_api.get_software_upgrade_status()
except Exception as ex: except Exception as ex:
LOG.warn(_LW( LOG.warn(_LW(
"Getting software upgrade status failed " "Getting software upgrade status failed "
@ -152,7 +155,7 @@ class HandleUpgradesMixin(object):
class Monitor(HandleUpgradesMixin): class Monitor(HandleUpgradesMixin):
def __init__(self, service): def __init__(self, service, conf):
self.service = service self.service = service
self.current_ceph_health = "" self.current_ceph_health = ""
self.tiers_size = {} self.tiers_size = {}
@ -160,7 +163,7 @@ class Monitor(HandleUpgradesMixin):
self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[ self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[
constants.SB_TIER_TYPE_CEPH] + constants.CEPH_CRUSH_TIER_SUFFIX constants.SB_TIER_TYPE_CEPH] + constants.CEPH_CRUSH_TIER_SUFFIX
self.cluster_is_up = False self.cluster_is_up = False
super(Monitor, self).__init__(service) super(Monitor, self).__init__(service, conf)
def setup(self, config): def setup(self, config):
super(Monitor, self).setup(config) super(Monitor, self).setup(config)

View File

@ -23,13 +23,10 @@ import oslo_messaging as messaging
from oslo_service import service from oslo_service import service
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from oslo_service.periodic_task import PeriodicTasks from oslo_service.periodic_task import PeriodicTasks
# noinspection PyUnresolvedReferences
from retrying import retry
from ceph_manager import constants from ceph_manager import constants
from ceph_manager import utils from ceph_manager import utils
from ceph_manager.i18n import _LI from ceph_manager.i18n import _LI
from ceph_manager.i18n import _LW
from ceph_manager.monitor import Monitor from ceph_manager.monitor import Monitor
from cephclient import wrapper from cephclient import wrapper
@ -94,38 +91,16 @@ class RpcEndpoint(PeriodicTasks):
return self.service.monitor.cluster_is_up return self.service.monitor.cluster_is_up
class SysinvConductorUpgradeApi(object): class Service(service.Service):
def __init__(self):
self.sysinv_conductor = None
super(SysinvConductorUpgradeApi, self).__init__()
def get_software_upgrade_status(self):
LOG.info(_LI("Getting software upgrade status from sysinv"))
cctxt = self.sysinv_conductor.prepare(timeout=2)
upgrade = cctxt.call({}, 'get_software_upgrade_status')
LOG.info(_LI("Software upgrade status: %s") % str(upgrade))
return upgrade
@retry(wait_fixed=1000,
retry_on_exception=lambda e:
LOG.warn(_LW(
"Getting software upgrade status failed "
"with: %s. Retrying... ") % str(e)) or True)
def retry_get_software_upgrade_status(self):
return self.get_software_upgrade_status()
class Service(SysinvConductorUpgradeApi, service.Service):
def __init__(self, conf): def __init__(self, conf):
super(Service, self).__init__() super(Service, self).__init__()
self.conf = conf self.conf = conf
self.rpc_server = None self.rpc_server = None
self.sysinv_conductor = None
self.ceph_api = None self.ceph_api = None
self.entity_instance_id = '' self.entity_instance_id = ''
self.fm_api = fm_api.FaultAPIs() self.fm_api = fm_api.FaultAPIs()
self.monitor = Monitor(self) self.monitor = Monitor(self, conf)
self.config = None self.config = None
self.config_desired = None self.config_desired = None
self.config_applied = None self.config_applied = None
@ -135,17 +110,13 @@ class Service(SysinvConductorUpgradeApi, service.Service):
# pylint: disable=protected-access # pylint: disable=protected-access
sysinv_conf = self.conf._namespace._normalized[0]['DEFAULT'] sysinv_conf = self.conf._namespace._normalized[0]['DEFAULT']
url = "rabbit://{user}:{password}@{host}:{port}".format( url = "rabbit://{user}:{password}@{host}:{port}"\
user=sysinv_conf['rabbit_userid'][0], "".format(user=sysinv_conf['rabbit_userid'][0],
password=sysinv_conf['rabbit_password'][0], password=sysinv_conf['rabbit_password'][0],
host=utils.ipv6_bracketed(sysinv_conf['rabbit_host'][0]), host=utils.ipv6_bracketed(
port=sysinv_conf['rabbit_port'][0] sysinv_conf['rabbit_host'][0]),
) port=sysinv_conf['rabbit_port'][0])
transport = messaging.get_transport(self.conf, url=url) transport = messaging.get_transport(self.conf, url=url)
self.sysinv_conductor = messaging.RPCClient(
transport,
messaging.Target(
topic=constants.SYSINV_CONDUCTOR_TOPIC))
self.ceph_api = wrapper.CephWrapper( self.ceph_api = wrapper.CephWrapper(
endpoint='http://localhost:{}'.format(constants.CEPH_MGR_PORT)) endpoint='http://localhost:{}'.format(constants.CEPH_MGR_PORT))

View File

@ -0,0 +1,3 @@
# Copyright (c) 2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0

View File

@ -0,0 +1,93 @@
# Copyright (c) 2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
from keystoneauth1 import session as ksession
from keystoneauth1.identity import v3
from oslo_log import log
from retrying import retry
from urllib3 import util
from ceph_manager.i18n import _LI
from ceph_manager.i18n import _LW
from ceph_manager.utils import ipv6_bracketed
LOG = log.getLogger(__name__)
MAX_RETRY = 5
class SysinvUpgradeApi(object):
def __init__(self, conf):
# pylint: disable=protected-access
sysinv_conf = conf._namespace._normalized[0]['DEFAULT']
sysinv_api_bind_ip = sysinv_conf['sysinv_api_bind_ip'][0]
sysinv_api_port = sysinv_conf['sysinv_api_port'][0]
self.base_url = util.Url(
scheme='http',
host=ipv6_bracketed(sysinv_api_bind_ip),
port=sysinv_api_port,
path='/v1').url
# pylint: disable=protected-access
auth_conf = conf._namespace._normalized[0]['keystone_authtoken']
self.auth_url = auth_conf['auth_url'][0]
self.auth_username = auth_conf['username'][0]
self.auth_password = auth_conf['password'][0]
self.auth_user_domain_name = auth_conf['user_domain_name'][0]
self.auth_project_name = auth_conf['project_name'][0]
self.auth_project_domain_name = auth_conf['project_domain_name'][0]
def _rest_api_request(self, method, api_cmd, api_cmd_headers=None,
api_cmd_payload=None):
headers = {}
headers['Accept'] = "application/json"
if api_cmd_headers is not None:
headers.update(api_cmd_headers)
session = self._get_session()
response = session.request(
api_cmd, method, headers=headers, json=api_cmd_payload)
return response.json()
def _get_session(self):
auth = v3.Password(auth_url=self.auth_url + "/v3",
username=self.auth_username,
password=self.auth_password,
project_name=self.auth_project_name,
user_domain_name=self.auth_user_domain_name,
project_domain_name=self.auth_project_domain_name)
session = ksession.Session(auth=auth)
return session
def _get_upgrades(self):
url = self.base_url + '/upgrade'
response = self._rest_api_request('GET', url)
return response.get('upgrades', [])
def get_software_upgrade_status(self):
LOG.info(_LI("Getting software upgrade status from sysinv"))
upgrade = {
'from_version': None,
'to_version': None,
'state': None
}
upgrades = self._get_upgrades()
if upgrades:
upgrade = upgrades[0]
LOG.info(_LI("Software upgrade status: %s") % str(upgrade))
return upgrade
@retry(stop_max_attempt_number=MAX_RETRY,
wait_fixed=1000,
retry_on_exception=lambda e:
LOG.warn(_LW(
"Getting software upgrade status failed "
"with: %s. Retrying... ") % str(e)) or True)
def retry_get_software_upgrade_status(self):
return self.get_software_upgrade_status()

View File

@ -0,0 +1,84 @@
import unittest
from keystoneauth1.exceptions import base
import mock
from ceph_manager.sysinv_api import upgrade
SYSINV_CONF = {
'sysinv_api_bind_ip': '192.168.1.1',
'sysinv_api_port': 12345
}
KEYSTONE_CONF = {
'auth_url': 'http://example.com',
'username': 'sysadmin',
'password': 'hunter2',
'user_domain_name': 'Default',
'project_name': 'sysinv',
'project_domain_name': 'Default'
}
UPGRADE_DICT = {
'from_version': '123',
'to_version': '456',
'state': 'done'
}
class SysinvUpgradeApiTest(unittest.TestCase):
def setUp(self):
conf = mock.MagicMock()
conf._namespace._normalized.return_value = [{'DEFAULT': SYSINV_CONF}]
conf._namespace._normalized.return_value = [
{'keystone_authtoken': KEYSTONE_CONF}]
self.api = upgrade.SysinvUpgradeApi(conf)
self.session_mock = mock.MagicMock()
self.response_mock = mock.MagicMock()
self.session_mock.request.return_value = self.response_mock
self.api._get_session = mock.MagicMock(return_value=self.session_mock)
def test_get_software_upgrade_status_has_upgrade(self):
self.response_mock.json.return_value = {'upgrades': [UPGRADE_DICT]}
status = self.api.get_software_upgrade_status()
self.session_mock.request.assert_called_once()
assert status == UPGRADE_DICT
def test_get_software_upgrade_status_no_upgrade(self):
expected = {
'from_version': None,
'to_version': None,
'state': None
}
self.response_mock.json.return_value = {'upgrades': []}
status = self.api.get_software_upgrade_status()
self.session_mock.request.assert_called_once()
assert status == expected
def test_retry_get_software_upgrade_status_should_retry(self):
self.response_mock.json.return_value = {'upgrades': [UPGRADE_DICT]}
self.session_mock.request.side_effect = [
base.ClientException('Boom!'), self.response_mock]
status = self.api.retry_get_software_upgrade_status()
assert self.session_mock.request.call_count == 2
assert status == UPGRADE_DICT
def test_retry_get_software_upgrade_status_retry_limit(self):
ex = base.ClientException('Boom!')
self.session_mock.request.side_effect = [
ex for _ in range(upgrade.MAX_RETRY+1)]
with self.assertRaises(base.ClientException) as context:
self.api.retry_get_software_upgrade_status()
assert context.exception == ex
assert self.session_mock.request.call_count == upgrade.MAX_RETRY

View File

@ -13,7 +13,7 @@ setuptools.setup(
version='1.0.0', version='1.0.0',
description='CEPH manager', description='CEPH manager',
license='Apache-2.0', license='Apache-2.0',
packages=['ceph_manager'], packages=['ceph_manager', 'ceph_manager.sysinv_api'],
entry_points={ entry_points={
} }
) )

View File

@ -6,7 +6,9 @@ bandit;python_version>="3.0"
mock mock
flake8 flake8
eventlet eventlet
keystoneauth1
pytest pytest
oslo.log oslo.log
oslo.i18n oslo.i18n
flake8-import-order flake8-import-order
retrying

View File

@ -2,7 +2,7 @@
[tox] [tox]
minversion = 1.6 minversion = 1.6
envlist = py27,pep8,bandit envlist = py27,py39,pep8,bandit
skipsdist = True skipsdist = True
# tox does not work if the path to the workdir is too long, so move it to /tmp # tox does not work if the path to the workdir is too long, so move it to /tmp
toxworkdir = /tmp/{env:USER}_utilities_ceph_manager_tox toxworkdir = /tmp/{env:USER}_utilities_ceph_manager_tox
@ -11,7 +11,9 @@ toxworkdir = /tmp/{env:USER}_utilities_ceph_manager_tox
basepython = python3.9 basepython = python3.9
setenv = VIRTUAL_ENV={envdir} setenv = VIRTUAL_ENV={envdir}
usedevelop = True usedevelop = True
install_command = pip install -U --force-reinstall {opts} {packages} install_command = pip install -U --force-reinstall \
-c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/starlingx/root/raw/branch/master/build-tools/requirements/debian/upper-constraints.txt} \
{opts} {packages}
deps = -r{toxinidir}/test-requirements.txt deps = -r{toxinidir}/test-requirements.txt
commands = pytest {posargs} commands = pytest {posargs}
whitelist_externals = bash whitelist_externals = bash
@ -19,6 +21,9 @@ passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
[testenv:py27] [testenv:py27]
basepython = python2.7 basepython = python2.7
install_command = pip install -U --force-reinstall \
-c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/stable/stein/upper-constraints.txt} \
{opts} {packages}
[testenv:pep8] [testenv:pep8]
commands = commands =
@ -40,6 +45,6 @@ skips = B104,B110
exclude = tests exclude = tests
[testenv:bandit] [testenv:bandit]
basepython = python3 basepython = python3.9
deps = -r{toxinidir}/test-requirements.txt deps = -r{toxinidir}/test-requirements.txt
commands = bandit --ini tox.ini -n 5 -r ceph_manager commands = bandit --ini tox.ini -n 5 -r ceph_manager