Relocate ceph-manager to stx-integ/ceph/ceph-manager

Move content from stx-upstream to stx-integ

Packages will be relocated to

stx-integ:
    ceph/
        ceph
        ceph-manager

Change-Id: I129faa448e2e52fc82101ae7ebc8ad5688f21523
Story: 2002801
Task: 22687
Signed-off-by: Scott Little <scott.little@windriver.com>
This commit is contained in:
Scott Little 2018-08-13 11:39:37 -04:00
parent d4a5366f53
commit 557dfe686c
23 changed files with 3279 additions and 0 deletions

6
ceph/ceph-manager/.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
!.distro
.distro/centos7/rpmbuild/RPMS
.distro/centos7/rpmbuild/SRPMS
.distro/centos7/rpmbuild/BUILD
.distro/centos7/rpmbuild/BUILDROOT
.distro/centos7/rpmbuild/SOURCES/ceph-manager*tar.gz

202
ceph/ceph-manager/LICENSE Normal file
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,13 @@
Metadata-Version: 1.1
Name: ceph-manager
Version: 1.0
Summary: Handle Ceph API calls and provide status updates via alarms
Home-page:
Author: Windriver
Author-email: info@windriver.com
License: Apache-2.0
Description: Handle Ceph API calls and provide status updates via alarms
Platform: UNKNOWN

View File

@ -0,0 +1,3 @@
SRC_DIR="ceph-manager"
COPY_LIST_TO_TAR="files scripts"
TIS_PATCH_VER=4

View File

@ -0,0 +1,70 @@
Summary: Handle Ceph API calls and provide status updates via alarms
Name: ceph-manager
Version: 1.0
Release: %{tis_patch_ver}%{?_tis_dist}
License: Apache-2.0
Group: base
Packager: Wind River <info@windriver.com>
URL: unknown
Source0: %{name}-%{version}.tar.gz
BuildRequires: python-setuptools
BuildRequires: systemd-units
BuildRequires: systemd-devel
Requires: sysinv
%description
Handle Ceph API calls and provide status updates via alarms.
Handle sysinv RPC calls for long running Ceph API operations:
- cache tiering enable
- cache tiering disable
%define local_bindir /usr/bin/
%define local_etc_initd /etc/init.d/
%define local_etc_logrotated /etc/logrotate.d/
%define pythonroot /usr/lib64/python2.7/site-packages
%define debug_package %{nil}
%prep
%setup
%build
%{__python} setup.py build
%install
%{__python} setup.py install --root=$RPM_BUILD_ROOT \
--install-lib=%{pythonroot} \
--prefix=/usr \
--install-data=/usr/share \
--single-version-externally-managed
install -d -m 755 %{buildroot}%{local_etc_initd}
install -p -D -m 700 scripts/init.d/ceph-manager %{buildroot}%{local_etc_initd}/ceph-manager
install -d -m 755 %{buildroot}%{local_bindir}
install -p -D -m 700 scripts/bin/ceph-manager %{buildroot}%{local_bindir}/ceph-manager
install -d -m 755 %{buildroot}%{local_etc_logrotated}
install -p -D -m 644 files/ceph-manager.logrotate %{buildroot}%{local_etc_logrotated}/ceph-manager.logrotate
install -d -m 755 %{buildroot}%{_unitdir}
install -m 644 -p -D files/%{name}.service %{buildroot}%{_unitdir}/%{name}.service
%clean
rm -rf $RPM_BUILD_ROOT
# Note: The package name is ceph-manager but the import name is ceph_manager so
# can't use '%{name}'.
%files
%defattr(-,root,root,-)
%doc LICENSE
%{local_bindir}/*
%{local_etc_initd}/*
%{_unitdir}/%{name}.service
%dir %{local_etc_logrotated}
%{local_etc_logrotated}/*
%dir %{pythonroot}/ceph_manager
%{pythonroot}/ceph_manager/*
%dir %{pythonroot}/ceph_manager-%{version}.0-py2.7.egg-info
%{pythonroot}/ceph_manager-%{version}.0-py2.7.egg-info/*

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,5 @@
#
# Copyright (c) 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#

View File

@ -0,0 +1,705 @@
#
# Copyright (c) 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import copy
import contextlib
import functools
import math
import subprocess
import time
import traceback
# noinspection PyUnresolvedReferences
import eventlet
# noinspection PyUnresolvedReferences
from eventlet.semaphore import Semaphore
# noinspection PyUnresolvedReferences
from oslo_log import log as logging
# noinspection PyUnresolvedReferences
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
from i18n import _LI, _LW, _LE
import constants
import exception
import ceph
LOG = logging.getLogger(__name__)
CEPH_POOLS = copy.deepcopy(constants.CEPH_POOLS)
MAX_WAIT = constants.CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC
MIN_WAIT = constants.CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC
class LockOwnership(object):
def __init__(self, sem):
self.sem = sem
@contextlib.contextmanager
def __call__(self):
try:
yield
finally:
if self.sem:
self.sem.release()
def transfer(self):
new_lo = LockOwnership(self.sem)
self.sem = None
return new_lo
class Lock(object):
def __init__(self):
self.sem = Semaphore(value=1)
def try_lock(self):
result = self.sem.acquire(blocking=False)
if result:
return LockOwnership(self.sem)
class CacheTiering(object):
def __init__(self, service):
self.service = service
self.lock = Lock()
# will be unlocked by set_initial_config()
self._init_config_lock = self.lock.try_lock()
self.config = None
self.config_desired = None
self.config_applied = None
self.target_max_bytes = {}
def set_initial_config(self, config):
with self._init_config_lock():
LOG.info("Setting Ceph cache tiering initial configuration")
self.config = ServiceConfig.from_dict(
config.get(constants.CACHE_TIERING, {})) or \
ServiceConfig()
self.config_desired = ServiceConfig.from_dict(
config.get(constants.CACHE_TIERING_DESIRED, {})) or \
ServiceConfig()
self.config_applied = ServiceConfig.from_dict(
config.get(constants.CACHE_TIERING_APPLIED, {})) or \
ServiceConfig()
if self.config_desired:
LOG.debug("set_initial_config config_desired %s " %
self.config_desired.to_dict())
if self.config_applied:
LOG.debug("set_initial_config config_applied %s " %
self.config_applied.to_dict())
# Check that previous caching tier operation completed
# successfully or perform recovery
if (self.config_desired and
self.config_applied and
(self.config_desired.cache_enabled !=
self.config_applied.cache_enabled)):
if self.config_desired.cache_enabled:
self.enable_cache(self.config_desired.to_dict(),
self.config_applied.to_dict(),
self._init_config_lock.transfer())
else:
self.disable_cache(self.config_desired.to_dict(),
self.config_applied.to_dict(),
self._init_config_lock.transfer())
def is_locked(self):
lock_ownership = self.lock.try_lock()
if not lock_ownership:
return True
with lock_ownership():
return False
def update_pools_info(self):
global CEPH_POOLS
cfg = self.service.sysinv_conductor.call(
{}, 'get_ceph_pools_config')
CEPH_POOLS = copy.deepcopy(cfg)
LOG.info(_LI("update_pools_info: pools: {}").format(CEPH_POOLS))
def enable_cache(self, new_config, applied_config, lock_ownership=None):
new_config = ServiceConfig.from_dict(new_config)
applied_config = ServiceConfig.from_dict(applied_config)
if not lock_ownership:
lock_ownership = self.lock.try_lock()
if not lock_ownership:
raise exception.CephCacheEnableFailure()
with lock_ownership():
eventlet.spawn(self.do_enable_cache,
new_config, applied_config,
lock_ownership.transfer())
def do_enable_cache(self, new_config, applied_config, lock_ownership):
LOG.info(_LI("cache_tiering_enable_cache: "
"new_config={}, applied_config={}").format(
new_config.to_dict(), applied_config.to_dict()))
_unwind_actions = []
with lock_ownership():
success = False
_exception = None
try:
self.config_desired.cache_enabled = True
self.update_pools_info()
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
self.cache_pool_create(pool)
_unwind_actions.append(
functools.partial(self.cache_pool_delete, pool))
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
self.cache_tier_add(pool)
_unwind_actions.append(
functools.partial(self.cache_tier_remove, pool))
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
self.cache_mode_set(pool, 'writeback')
self.cache_pool_set_config(pool, new_config)
self.cache_overlay_create(pool)
success = True
except Exception as e:
LOG.error(_LE('Failed to enable cache: reason=%s') %
traceback.format_exc())
for action in reversed(_unwind_actions):
try:
action()
except Exception:
LOG.warn(_LW('Failed cache enable '
'unwind action: reason=%s') %
traceback.format_exc())
success = False
_exception = str(e)
finally:
self.service.monitor.monitor_check_cache_tier(success)
if success:
self.config_applied.cache_enabled = True
self.service.sysinv_conductor.call(
{}, 'cache_tiering_enable_cache_complete',
success=success, exception=_exception,
new_config=new_config.to_dict(),
applied_config=applied_config.to_dict())
# Run first update of periodic target_max_bytes
self.update_cache_target_max_bytes()
@contextlib.contextmanager
def ignore_ceph_failure(self):
try:
yield
except exception.CephManagerException:
pass
def disable_cache(self, new_config, applied_config, lock_ownership=None):
new_config = ServiceConfig.from_dict(new_config)
applied_config = ServiceConfig.from_dict(applied_config)
if not lock_ownership:
lock_ownership = self.lock.try_lock()
if not lock_ownership:
raise exception.CephCacheDisableFailure()
with lock_ownership():
eventlet.spawn(self.do_disable_cache,
new_config, applied_config,
lock_ownership.transfer())
def do_disable_cache(self, new_config, applied_config, lock_ownership):
LOG.info(_LI("cache_tiering_disable_cache: "
"new_config={}, applied_config={}").format(
new_config, applied_config))
with lock_ownership():
success = False
_exception = None
try:
self.config_desired.cache_enabled = False
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
with self.ignore_ceph_failure():
self.cache_mode_set(
pool, 'forward')
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
retries_left = 3
while True:
try:
self.cache_flush(pool)
break
except exception.CephCacheFlushFailure:
retries_left -= 1
if not retries_left:
# give up
break
else:
time.sleep(1)
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
with self.ignore_ceph_failure():
self.cache_overlay_delete(pool)
self.cache_tier_remove(pool)
for pool in CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = \
self.service.monitor._get_object_pool_name()
pool['pool_name'] = object_pool_name
with self.ignore_ceph_failure():
self.cache_pool_delete(pool)
success = True
except Exception as e:
LOG.warn(_LE('Failed to disable cache: reason=%s') %
traceback.format_exc())
_exception = str(e)
finally:
self.service.monitor.monitor_check_cache_tier(False)
if success:
self.config_desired.cache_enabled = False
self.config_applied.cache_enabled = False
self.service.sysinv_conductor.call(
{}, 'cache_tiering_disable_cache_complete',
success=success, exception=_exception,
new_config=new_config.to_dict(),
applied_config=applied_config.to_dict())
def get_pool_pg_num(self, pool_name):
return self.service.sysinv_conductor.call(
{}, 'get_pool_pg_num',
pool_name=pool_name)
def cache_pool_create(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
pg_num = self.get_pool_pg_num(cache_pool)
if not ceph.osd_pool_exists(self.service.ceph_api, cache_pool):
ceph.osd_pool_create(
self.service.ceph_api, cache_pool,
pg_num, pg_num)
def cache_pool_delete(self, pool):
cache_pool = pool['pool_name'] + '-cache'
ceph.osd_pool_delete(
self.service.ceph_api, cache_pool)
def cache_tier_add(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self.service.ceph_api.osd_tier_add(
backing_pool, cache_pool,
force_nonempty="--force-nonempty",
body='json')
if response.ok:
LOG.info(_LI("Added OSD tier: "
"backing_pool={}, cache_pool={}").format(
backing_pool, cache_pool))
else:
e = exception.CephPoolAddTierFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_tier_remove(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self.service.ceph_api.osd_tier_remove(
backing_pool, cache_pool, body='json')
if response.ok:
LOG.info(_LI("Removed OSD tier: "
"backing_pool={}, cache_pool={}").format(
backing_pool, cache_pool))
else:
e = exception.CephPoolRemoveTierFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_mode_set(self, pool, mode):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self.service.ceph_api.osd_tier_cachemode(
cache_pool, mode, body='json')
if response.ok:
LOG.info(_LI("Set OSD tier cache mode: "
"cache_pool={}, mode={}").format(cache_pool, mode))
else:
e = exception.CephCacheSetModeFailure(
cache_pool=cache_pool,
mode=mode,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_pool_set_config(self, pool, config):
for name, value in config.params.iteritems():
self.cache_pool_set_param(pool, name, value)
def cache_pool_set_param(self, pool, name, value):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
ceph.osd_set_pool_param(
self.service.ceph_api, cache_pool, name, value)
def cache_overlay_create(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self.service.ceph_api.osd_tier_set_overlay(
backing_pool, cache_pool, body='json')
if response.ok:
LOG.info(_LI("Set OSD tier overlay: "
"backing_pool={}, cache_pool={}").format(
backing_pool, cache_pool))
else:
e = exception.CephCacheCreateOverlayFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_overlay_delete(self, pool):
backing_pool = pool['pool_name']
cache_pool = pool['pool_name']
response, body = self.service.ceph_api.osd_tier_remove_overlay(
backing_pool, body='json')
if response.ok:
LOG.info(_LI("Removed OSD tier overlay: "
"backing_pool={}").format(backing_pool))
else:
e = exception.CephCacheDeleteOverlayFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
@staticmethod
def rados_cache_flush_evict_all(pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
try:
subprocess.check_call(
['/usr/bin/rados', '-p', cache_pool, 'cache-flush-evict-all'])
LOG.info(_LI("Flushed OSD cache pool:"
"cache_pool={}").format(cache_pool))
except subprocess.CalledProcessError as e:
_e = exception.CephCacheFlushFailure(
cache_pool=cache_pool,
return_code=str(e.returncode),
cmd=" ".join(e.cmd),
output=e.output)
LOG.warn(_e)
raise _e
def cache_flush(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
try:
# set target_max_objects to a small value to force evacuation of
# objects from cache before we use rados cache-flush-evict-all
# WARNING: assuming cache_pool will be deleted after flush so
# we don't have to save/restore the value of target_max_objects
#
self.cache_pool_set_param(pool, 'target_max_objects', 1)
prev_object_count = None
wait_interval = MIN_WAIT
while True:
response, body = self.service.ceph_api.df(body='json')
if not response.ok:
LOG.warn(_LW(
"Failed to retrieve cluster free space stats: "
"status_code=%d, reason=%s") % (
response.status_code, response.reason))
break
stats = None
for s in body['output']['pools']:
if s['name'] == cache_pool:
stats = s['stats']
break
if not stats:
LOG.warn(_LW("Missing pool free space stats: "
"cache_pool=%s") % cache_pool)
break
object_count = stats['objects']
if object_count < constants.CACHE_FLUSH_OBJECTS_THRESHOLD:
break
if prev_object_count is not None:
delta_objects = object_count - prev_object_count
if delta_objects > 0:
LOG.warn(_LW("Unexpected increase in number "
"of objects in cache pool: "
"cache_pool=%s, prev_object_count=%d, "
"object_count=%d") % (
cache_pool, prev_object_count,
object_count))
break
if delta_objects == 0:
wait_interval *= 2
if wait_interval > MAX_WAIT:
LOG.warn(_LW(
"Cache pool number of objects did not "
"decrease: cache_pool=%s, object_count=%d, "
"wait_interval=%d") % (
cache_pool, object_count, wait_interval))
break
else:
wait_interval = MIN_WAIT
time.sleep(wait_interval)
prev_object_count = object_count
except exception.CephPoolSetParamFailure as e:
LOG.warn(e)
finally:
self.rados_cache_flush_evict_all(pool)
def update_cache_target_max_bytes(self):
"Dynamically compute target_max_bytes of caching pools"
# Only compute if cache tiering is enabled
if self.config_applied and self.config_desired:
if (not self.config_desired.cache_enabled or
not self.config_applied.cache_enabled):
LOG.debug("Cache tiering disabled, no need to update "
"target_max_bytes.")
return
LOG.debug("Updating target_max_bytes")
# Get available space
response, body = self.service.ceph_api.osd_df(body='json',
output_method='tree')
if not response.ok:
LOG.warn(_LW(
"Failed to retrieve cluster free space stats: "
"status_code=%d, reason=%s") % (
response.status_code, response.reason))
return
storage_tier_size = 0
cache_tier_size = 0
replication = constants.CEPH_REPLICATION_FACTOR
for node in body['output']['nodes']:
if node['name'] == 'storage-tier':
storage_tier_size = node['kb']*1024/replication
elif node['name'] == 'cache-tier':
cache_tier_size = node['kb']*1024/replication
if storage_tier_size == 0 or cache_tier_size == 0:
LOG.info("Failed to get cluster size "
"(storage_tier_size=%s, cache_tier_size=%s),"
"retrying on next cycle" %
(storage_tier_size, cache_tier_size))
return
# Get available pools
response, body = self.service.ceph_api.osd_lspools(body='json')
if not response.ok:
LOG.warn(_LW(
"Failed to retrieve available pools: "
"status_code=%d, reason=%s") % (
response.status_code, response.reason))
return
pools = [p['poolname'] for p in body['output']]
# Separate backing from caching for easy iteration
backing_pools = []
caching_pools = []
for p in pools:
if p.endswith('-cache'):
caching_pools.append(p)
else:
backing_pools.append(p)
LOG.debug("Pools: caching: %s, backing: %s" % (caching_pools,
backing_pools))
if not len(caching_pools):
# We do not have caching pools created yet
return
# Get quota from backing pools that are cached
stats = {}
for p in caching_pools:
backing_name = p.replace('-cache', '')
stats[backing_name] = {}
try:
quota = ceph.osd_pool_get_quota(self.service.ceph_api,
backing_name)
except exception.CephPoolGetQuotaFailure as e:
LOG.warn(_LW(
"Failed to retrieve quota: "
"exception: %s") % str(e))
return
stats[backing_name]['quota'] = quota['max_bytes']
stats[backing_name]['quota_pt'] = (quota['max_bytes']*100.0 /
storage_tier_size)
LOG.debug("Quota for pool: %s "
"is: %s B representing %s pt" %
(backing_name,
quota['max_bytes'],
stats[backing_name]['quota_pt']))
# target_max_bytes logic:
# - For computing target_max_bytes cache_tier_size must be equal than
# the sum of target_max_bytes of each caching pool
# - target_max_bytes for each caching pool is computed as the
# percentage of quota in corresponding backing pool
# - the caching tiers has to work at full capacity, so if the sum of
# all quotas in the backing tier is different than 100% we need to
# normalize
# - if the quota is zero for any pool we add CACHE_TIERING_MIN_QUOTA
# by default *after* normalization so that we have real minimum
# We compute the real percentage that need to be normalized after
# ensuring that we have CACHE_TIERING_MIN_QUOTA for each pool with
# a quota of 0
real_100pt = 90.0 # we start from max and decrease it for each 0 pool
# Note: We must avoid reaching 100% at all costs! and
# cache_target_full_ratio, the Ceph parameter that is supposed to
# protect the cluster against this does not work in Ceph v0.94.6!
# Therefore a value of 90% is better suited for this
for p in caching_pools:
backing_name = p.replace('-cache', '')
if stats[backing_name]['quota_pt'] == 0:
real_100pt -= constants.CACHE_TIERING_MIN_QUOTA
LOG.debug("Quota before normalization for %s is: %s pt" %
(p, stats[backing_name]['quota_pt']))
# Compute total percentage of quotas for all backing pools.
# Should be 100% if correctly configured
total_quota_pt = 0
for p in caching_pools:
backing_name = p.replace('-cache', '')
total_quota_pt += stats[backing_name]['quota_pt']
LOG.debug("Total quota pt is: %s" % total_quota_pt)
# Normalize quota pt to 100% (or real_100pt)
if total_quota_pt != 0: # to avoid divide by zero
for p in caching_pools:
backing_name = p.replace('-cache', '')
stats[backing_name]['quota_pt'] = \
(stats[backing_name]['quota_pt'] *
(real_100pt / total_quota_pt))
# Do not allow quota to be 0 for any pool
total = 0
for p in caching_pools:
backing_name = p.replace('-cache', '')
if stats[backing_name]['quota_pt'] == 0:
stats[backing_name]['quota_pt'] = \
constants.CACHE_TIERING_MIN_QUOTA
total += stats[backing_name]['quota_pt']
LOG.debug("Quota after normalization for %s is: %s:" %
(p, stats[backing_name]['quota_pt']))
if total > 100:
# Supplementary protection, we really have to avoid going above
# 100%. Note that real_100pt is less than 100% but we still got
# more than 100!
LOG.warn("Total sum of quotas should not go above 100% "
"but is: %s, recalculating in next cycle" % total)
return
LOG.debug("Total sum of quotas is %s pt" % total)
# Get current target_max_bytes. We cache it to reduce requests
# to ceph-rest-api. We are the ones changing it, so not an issue.
for p in caching_pools:
if p not in self.target_max_bytes:
try:
value = ceph.osd_get_pool_param(self.service.ceph_api, p,
constants.TARGET_MAX_BYTES)
except exception.CephPoolGetParamFailure as e:
LOG.warn(e)
return
self.target_max_bytes[p] = value
LOG.debug("Existing target_max_bytes got from "
"Ceph: %s" % self.target_max_bytes)
# Set TARGET_MAX_BYTES
LOG.debug("storage_tier_size: %s "
"cache_tier_size: %s" % (storage_tier_size,
cache_tier_size))
for p in caching_pools:
backing_name = p.replace('-cache', '')
s = stats[backing_name]
target_max_bytes = math.floor(s['quota_pt'] * cache_tier_size /
100.0)
target_max_bytes = int(target_max_bytes)
LOG.debug("New Target max bytes of pool: %s is: %s B" % (
p, target_max_bytes))
# Set the new target_max_bytes only if it changed
if self.target_max_bytes.get(p) == target_max_bytes:
LOG.debug("Target max bytes of pool: %s "
"is already updated" % p)
continue
try:
ceph.osd_set_pool_param(self.service.ceph_api, p,
constants.TARGET_MAX_BYTES,
target_max_bytes)
self.target_max_bytes[p] = target_max_bytes
except exception.CephPoolSetParamFailure as e:
LOG.warn(e)
continue
return

View File

@ -0,0 +1,164 @@
#
# Copyright (c) 2016-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import exception
from i18n import _LI
# noinspection PyUnresolvedReferences
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def osd_pool_set_quota(ceph_api, pool_name, max_bytes=0, max_objects=0):
"""Set the quota for an OSD pool_name
Setting max_bytes or max_objects to 0 will disable that quota param
:param pool_name: OSD pool_name
:param max_bytes: maximum bytes for OSD pool_name
:param max_objects: maximum objects for OSD pool_name
"""
# Update quota if needed
prev_quota = osd_pool_get_quota(ceph_api, pool_name)
if prev_quota["max_bytes"] != max_bytes:
resp, b = ceph_api.osd_set_pool_quota(pool_name, 'max_bytes',
max_bytes, body='json')
if resp.ok:
LOG.info(_LI("Set OSD pool_name quota: "
"pool_name={}, max_bytes={}").format(
pool_name, max_bytes))
else:
e = exception.CephPoolSetQuotaFailure(
pool=pool_name, name='max_bytes',
value=max_bytes, reason=resp.reason)
LOG.error(e)
raise e
if prev_quota["max_objects"] != max_objects:
resp, b = ceph_api.osd_set_pool_quota(pool_name, 'max_objects',
max_objects,
body='json')
if resp.ok:
LOG.info(_LI("Set OSD pool_name quota: "
"pool_name={}, max_objects={}").format(
pool_name, max_objects))
else:
e = exception.CephPoolSetQuotaFailure(
pool=pool_name, name='max_objects',
value=max_objects, reason=resp.reason)
LOG.error(e)
raise e
def osd_pool_get_quota(ceph_api, pool_name):
resp, quota = ceph_api.osd_get_pool_quota(pool_name, body='json')
if not resp.ok:
e = exception.CephPoolGetQuotaFailure(
pool=pool_name, reason=resp.reason)
LOG.error(e)
raise e
else:
return {"max_objects": quota["output"]["quota_max_objects"],
"max_bytes": quota["output"]["quota_max_bytes"]}
def osd_pool_exists(ceph_api, pool_name):
response, body = ceph_api.osd_pool_get(
pool_name, "pg_num", body='json')
if response.ok:
return True
return False
def osd_pool_create(ceph_api, pool_name, pg_num, pgp_num):
if pool_name.endswith("-cache"):
# ruleset 1: is the ruleset for the cache tier
# Name: cache_tier_ruleset
ruleset = 1
else:
# ruleset 0: is the default ruleset if no crushmap is loaded or
# the ruleset for the backing tier if loaded:
# Name: storage_tier_ruleset
ruleset = 0
response, body = ceph_api.osd_pool_create(
pool_name, pg_num, pgp_num, pool_type="replicated",
ruleset=ruleset, body='json')
if response.ok:
LOG.info(_LI("Created OSD pool: "
"pool_name={}, pg_num={}, pgp_num={}, "
"pool_type=replicated, ruleset={}").format(
pool_name, pg_num, pgp_num, ruleset))
else:
e = exception.CephPoolCreateFailure(
name=pool_name, reason=response.reason)
LOG.error(e)
raise e
# Explicitly assign the ruleset to the pool on creation since it is
# ignored in the create call
response, body = ceph_api.osd_set_pool_param(
pool_name, "crush_ruleset", ruleset, body='json')
if response.ok:
LOG.info(_LI("Assigned crush ruleset to OS pool: "
"pool_name={}, ruleset={}").format(
pool_name, ruleset))
else:
e = exception.CephPoolRulesetFailure(
name=pool_name, reason=response.reason)
LOG.error(e)
ceph_api.osd_pool_delete(
pool_name, pool_name,
sure='--yes-i-really-really-mean-it',
body='json')
raise e
def osd_pool_delete(ceph_api, pool_name):
"""Delete an osd pool
:param pool_name: pool name
"""
response, body = ceph_api.osd_pool_delete(
pool_name, pool_name,
sure='--yes-i-really-really-mean-it',
body='json')
if response.ok:
LOG.info(_LI("Deleted OSD pool {}").format(pool_name))
else:
e = exception.CephPoolDeleteFailure(
name=pool_name, reason=response.reason)
LOG.warn(e)
raise e
def osd_set_pool_param(ceph_api, pool_name, param, value):
response, body = ceph_api.osd_set_pool_param(
pool_name, param, value,
force=None, body='json')
if response.ok:
LOG.info('OSD set pool param: '
'pool={}, name={}, value={}'.format(
pool_name, param, value))
else:
raise exception.CephPoolSetParamFailure(
pool_name=pool_name,
param=param,
value=str(value),
reason=response.reason)
return response, body
def osd_get_pool_param(ceph_api, pool_name, param):
response, body = ceph_api.osd_get_pool_param(
pool_name, param, body='json')
if response.ok:
LOG.debug('OSD get pool param: '
'pool={}, name={}, value={}'.format(
pool_name, param, body['output'][param]))
else:
raise exception.CephPoolGetParamFailure(
pool_name=pool_name,
param=param,
reason=response.reason)
return body['output'][param]

View File

@ -0,0 +1,107 @@
#
# Copyright (c) 2016-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from i18n import _
# noinspection PyUnresolvedReferences
from sysinv.common import constants as sysinv_constants
CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL = \
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER = \
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER
CEPH_POOLS = sysinv_constants.BACKING_POOLS
CEPH_REPLICATION_FACTOR = sysinv_constants.CEPH_REPLICATION_FACTOR_DEFAULT
SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM = \
sysinv_constants.SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM
CACHE_TIERING_DEFAULTS = sysinv_constants.CACHE_TIERING_DEFAULTS
TARGET_MAX_BYTES = \
sysinv_constants.SERVICE_PARAM_CEPH_CACHE_TIER_TARGET_MAX_BYTES
# Cache tiering section shortener
CACHE_TIERING = \
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER
CACHE_TIERING_DESIRED = \
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED
CACHE_TIERING_APPLIED = \
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED
CACHE_TIERING_SECTIONS = \
[CACHE_TIERING, CACHE_TIERING_DESIRED, CACHE_TIERING_APPLIED]
# Cache flush parameters
CACHE_FLUSH_OBJECTS_THRESHOLD = 1000
CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC = 1
CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC = 128
CACHE_TIERING_MIN_QUOTA = 5
FM_ALARM_REASON_MAX_SIZE = 256
# TODO this will later change based on parsed health
# clock skew is vm malfunction, mon or osd is equipment mal
ALARM_CAUSE = 'equipment-malfunction'
ALARM_TYPE = 'equipment'
# Ceph health check interval (in seconds)
CEPH_HEALTH_CHECK_INTERVAL = 60
# Ceph health statuses
CEPH_HEALTH_OK = 'HEALTH_OK'
CEPH_HEALTH_WARN = 'HEALTH_WARN'
CEPH_HEALTH_ERR = 'HEALTH_ERR'
CEPH_HEALTH_DOWN = 'CEPH_DOWN'
# Statuses not reported by Ceph
CEPH_STATUS_CUSTOM = [CEPH_HEALTH_DOWN]
SEVERITY = {CEPH_HEALTH_DOWN: 'critical',
CEPH_HEALTH_ERR: 'critical',
CEPH_HEALTH_WARN: 'warning'}
SERVICE_AFFECTING = {CEPH_HEALTH_DOWN: True,
CEPH_HEALTH_ERR: True,
CEPH_HEALTH_WARN: False}
# TODO this will later change based on parsed health
ALARM_REASON_NO_OSD = _('no OSDs')
ALARM_REASON_OSDS_DOWN = _('OSDs are down')
ALARM_REASON_OSDS_OUT = _('OSDs are out')
ALARM_REASON_OSDS_DOWN_OUT = _('OSDs are down/out')
ALARM_REASON_PEER_HOST_DOWN = _('peer host down')
REPAIR_ACTION_MAJOR_CRITICAL_ALARM = _(
'Ensure storage hosts from replication group are unlocked and available.'
'Check if OSDs of each storage host are up and running.'
'If problem persists, contact next level of support.')
REPAIR_ACTION = _('If problem persists, contact next level of support.')
SYSINV_CONDUCTOR_TOPIC = 'sysinv.conductor_manager'
CEPH_MANAGER_TOPIC = 'sysinv.ceph_manager'
SYSINV_CONFIG_FILE = '/etc/sysinv/sysinv.conf'
# Titanium Cloud version strings
TITANIUM_SERVER_VERSION_16_10 = '16.10'
CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET = (
"all OSDs are running jewel or later but the "
"'require_jewel_osds' osdmap flag is not set")
UPGRADE_COMPLETED = \
sysinv_constants.UPGRADE_COMPLETED
UPGRADE_ABORTING = \
sysinv_constants.UPGRADE_ABORTING
UPGRADE_ABORT_COMPLETING = \
sysinv_constants.UPGRADE_ABORT_COMPLETING
UPGRADE_ABORTING_ROLLBACK = \
sysinv_constants.UPGRADE_ABORTING_ROLLBACK
CEPH_FLAG_REQUIRE_JEWEL_OSDS = 'require_jewel_osds'
# Tiers
CEPH_CRUSH_TIER_SUFFIX = sysinv_constants.CEPH_CRUSH_TIER_SUFFIX
SB_TIER_TYPE_CEPH = sysinv_constants.SB_TIER_TYPE_CEPH
SB_TIER_SUPPORTED = sysinv_constants.SB_TIER_SUPPORTED
SB_TIER_DEFAULT_NAMES = sysinv_constants.SB_TIER_DEFAULT_NAMES
SB_TIER_CEPH_POOLS = sysinv_constants.SB_TIER_CEPH_POOLS

View File

@ -0,0 +1,130 @@
#
# Copyright (c) 2016-2017 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# noinspection PyUnresolvedReferences
from i18n import _, _LW
# noinspection PyUnresolvedReferences
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class CephManagerException(Exception):
message = _("An unknown exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
message = self.message % kwargs
except TypeError:
LOG.warn(_LW('Exception in string format operation'))
for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
message = self.message
super(CephManagerException, self).__init__(message)
class CephPoolSetQuotaFailure(CephManagerException):
message = _("Error seting the OSD pool "
"quota %(name)s for %(pool)s to %(value)s") \
+ ": %(reason)s"
class CephPoolGetQuotaFailure(CephManagerException):
message = _("Error geting the OSD pool quota for %(pool)s") \
+ ": %(reason)s"
class CephPoolCreateFailure(CephManagerException):
message = _("Creating OSD pool %(name)s failed: %(reason)s")
class CephPoolDeleteFailure(CephManagerException):
message = _("Deleting OSD pool %(name)s failed: %(reason)s")
class CephPoolRulesetFailure(CephManagerException):
message = _("Assigning crush ruleset to OSD "
"pool %(name)s failed: %(reason)s")
class CephPoolAddTierFailure(CephManagerException):
message = _("Failed to add OSD tier: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephPoolRemoveTierFailure(CephManagerException):
message = _("Failed to remove tier: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheSetModeFailure(CephManagerException):
message = _("Failed to set OSD tier cache mode: "
"cache_pool=%(cache_pool)s, mode=%(mode)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephPoolSetParamFailure(CephManagerException):
message = _("Cannot set Ceph OSD pool parameter: "
"pool_name=%(pool_name)s, param=%(param)s, value=%(value)s. "
"Reason: %(reason)s")
class CephPoolGetParamFailure(CephManagerException):
message = _("Cannot get Ceph OSD pool parameter: "
"pool_name=%(pool_name)s, param=%(param)s. "
"Reason: %(reason)s")
class CephCacheCreateOverlayFailure(CephManagerException):
message = _("Failed to create overlay: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheDeleteOverlayFailure(CephManagerException):
message = _("Failed to delete overlay: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheFlushFailure(CephManagerException):
message = _("Failed to flush cache pool: "
"cache_pool=%(cache_pool)s, "
"return_code=%(return_code)s, "
"cmd=%(cmd)s, output=%(output)s")
class CephCacheEnableFailure(CephManagerException):
message = _("Cannot enable Ceph cache tier. "
"Reason: cache tiering operation in progress.")
class CephCacheDisableFailure(CephManagerException):
message = _("Cannot disable Ceph cache tier. "
"Reason: cache tiering operation in progress.")
class CephSetKeyFailure(CephManagerException):
message = _("Error setting the Ceph flag "
"'%(flag)s' %(extra)s: "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephApiFailure(CephManagerException):
message = _("API failure: "
"call=%(call)s, reason=%(reason)s")

View File

@ -0,0 +1,15 @@
#
# Copyright (c) 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import oslo_i18n
DOMAIN = 'ceph-manager'
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
_ = _translators.primary
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error

View File

@ -0,0 +1,893 @@
#
# Copyright (c) 2013-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
# noinspection PyUnresolvedReferences
from fm_api import fm_api
# noinspection PyUnresolvedReferences
from fm_api import constants as fm_constants
# noinspection PyUnresolvedReferences
from oslo_log import log as logging
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
# noinspection PyProtectedMember
from i18n import _, _LI, _LW, _LE
import constants
import exception
LOG = logging.getLogger(__name__)
# When upgrading from 16.10 to 17.x Ceph goes from Hammer release
# to Jewel release. After all storage nodes are upgraded to 17.x
# the cluster is in HEALTH_WARN until administrator explicitly
# enables require_jewel_osds flag - which signals Ceph that it
# can safely transition from Hammer to Jewel
#
# This class is needed only when upgrading from 16.10 to 17.x
# TODO: remove it after 1st 17.x release
#
class HandleUpgradesMixin(object):
def __init__(self, service):
self.service = service
self.surpress_require_jewel_osds_warning = False
def setup(self, config):
self._set_upgrade(self.service.retry_get_software_upgrade_status())
def _set_upgrade(self, upgrade):
state = upgrade.get('state')
from_version = upgrade.get('from_version')
if (state
and state != constants.UPGRADE_COMPLETED
and from_version == constants.TITANIUM_SERVER_VERSION_16_10):
LOG.info(_LI("Surpress require_jewel_osds health warning"))
self.surpress_require_jewel_osds_warning = True
def set_flag_require_jewel_osds(self):
try:
response, body = self.service.ceph_api.osd_set_key(
constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS,
body='json')
LOG.info(_LI("Set require_jewel_osds flag"))
except IOError as e:
raise exception.CephApiFailure(
call="osd_set_key",
reason=e.message)
else:
if not response.ok:
raise exception.CephSetKeyFailure(
flag=constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS,
extra=_("needed to complete upgrade to Jewel"),
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
def filter_health_status(self, health):
health = self.auto_heal(health)
# filter out require_jewel_osds warning
#
if not self.surpress_require_jewel_osds_warning:
return health
if health['health'] != constants.CEPH_HEALTH_WARN:
return health
if (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET
not in health['detail']):
return health
return self._remove_require_jewel_osds_warning(health)
def _remove_require_jewel_osds_warning(self, health):
reasons_list = []
for reason in health['detail'].split(';'):
reason = reason.strip()
if len(reason) == 0:
continue
if constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET in reason:
continue
reasons_list.append(reason)
if len(reasons_list) == 0:
health = {
'health': constants.CEPH_HEALTH_OK,
'detail': ''}
else:
health['detail'] = '; '.join(reasons_list)
return health
def auto_heal(self, health):
if (health['health'] == constants.CEPH_HEALTH_WARN
and (constants.CEPH_HEALTH_WARN_REQUIRE_JEWEL_OSDS_NOT_SET
in health['detail'])):
try:
upgrade = self.service.get_software_upgrade_status()
except Exception as ex:
LOG.warn(_LW(
"Getting software upgrade status failed "
"with: %s. Skip auto-heal attempt "
"(will retry on next ceph status poll).") % str(ex))
return
state = upgrade.get('state')
# surpress require_jewel_osds in case upgrade is
# in progress but not completed or aborting
if (not self.surpress_require_jewel_osds_warning
and (upgrade.get('from_version')
== constants.TITANIUM_SERVER_VERSION_16_10)
and state not in [
None,
constants.UPGRADE_COMPLETED,
constants.UPGRADE_ABORTING,
constants.UPGRADE_ABORT_COMPLETING,
constants.UPGRADE_ABORTING_ROLLBACK]):
LOG.info(_LI("Surpress require_jewel_osds health warning"))
self.surpress_require_jewel_osds_warning = True
# set require_jewel_osds in case upgrade is
# not in progress or completed
if (state in [None, constants.UPGRADE_COMPLETED]):
LOG.warn(_LW(
"No upgrade in progress or update completed "
"and require_jewel_osds health warning raised. "
"Set require_jewel_osds flag."))
self.set_flag_require_jewel_osds()
health = self._remove_require_jewel_osds_warning(health)
LOG.info(_LI("Unsurpress require_jewel_osds health warning"))
self.surpress_require_jewel_osds_warning = False
# unsurpress require_jewel_osds in case upgrade
# is aborting
if (self.surpress_require_jewel_osds_warning
and state in [
constants.UPGRADE_ABORTING,
constants.UPGRADE_ABORT_COMPLETING,
constants.UPGRADE_ABORTING_ROLLBACK]):
LOG.info(_LI("Unsurpress require_jewel_osds health warning"))
self.surpress_require_jewel_osds_warning = False
return health
class Monitor(HandleUpgradesMixin):
def __init__(self, service):
self.service = service
self.current_ceph_health = ""
self.cache_enabled = False
self.tiers_size = {}
self.known_object_pool_name = None
self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[
constants.SB_TIER_TYPE_CEPH] + constants.CEPH_CRUSH_TIER_SUFFIX
self.cluster_is_up = False
super(Monitor, self).__init__(service)
def setup(self, config):
self.set_caching_tier_config(config)
super(Monitor, self).setup(config)
def set_caching_tier_config(self, config):
conf = ServiceConfig().from_dict(
config.get(constants.CACHE_TIERING_APPLIED))
if conf:
self.cache_enabled = conf.cache_enabled
def monitor_check_cache_tier(self, enable_flag):
LOG.info(_LI("monitor_check_cache_tier: "
"enable_flag={}".format(enable_flag)))
self.cache_enabled = enable_flag
def run(self):
# Wait until Ceph cluster is up and we can get the fsid
while True:
self.ceph_get_fsid()
if self.service.entity_instance_id:
break
time.sleep(constants.CEPH_HEALTH_CHECK_INTERVAL)
# Start monitoring ceph status
while True:
self.ceph_poll_status()
self.ceph_poll_quotas()
time.sleep(constants.CEPH_HEALTH_CHECK_INTERVAL)
def ceph_get_fsid(self):
# Check whether an alarm has already been raised
self._get_current_alarms()
if self.current_health_alarm:
LOG.info(_LI("Current alarm: %s") %
str(self.current_health_alarm.__dict__))
fsid = self._get_fsid()
if not fsid:
# Raise alarm - it will not have an entity_instance_id
self._report_fault({'health': constants.CEPH_HEALTH_DOWN,
'detail': 'Ceph cluster is down.'},
fm_constants.FM_ALARM_ID_STORAGE_CEPH)
else:
# Clear alarm with no entity_instance_id
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH)
self.service.entity_instance_id = 'cluster=%s' % fsid
def ceph_poll_status(self):
# get previous data every time in case:
# * daemon restarted
# * alarm was cleared manually but stored as raised in daemon
self._get_current_alarms()
if self.current_health_alarm:
LOG.info(_LI("Current alarm: %s") %
str(self.current_health_alarm.__dict__))
# get ceph health
health = self._get_health()
LOG.info(_LI("Current Ceph health: "
"%(health)s detail: %(detail)s") % health)
health = self.filter_health_status(health)
if health['health'] != constants.CEPH_HEALTH_OK:
self._report_fault(health, fm_constants.FM_ALARM_ID_STORAGE_CEPH)
self._report_alarm_osds_health()
else:
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH)
self.clear_all_major_critical()
def filter_health_status(self, health):
return super(Monitor, self).filter_health_status(health)
def ceph_poll_quotas(self):
self._get_current_alarms()
if self.current_quota_alarms:
LOG.info(_LI("Current quota alarms %s") %
self.current_quota_alarms)
# Get current current size of each tier
previous_tiers_size = self.tiers_size
self.tiers_size = self._get_tiers_size()
# Make sure any removed tiers have the alarms cleared
for t in (set(previous_tiers_size)-set(self.tiers_size)):
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE,
"{0}.tier={1}".format(
self.service.entity_instance_id,
t[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)]))
# Check the quotas on each tier
for tier in self.tiers_size:
# TODO(rchurch): For R6 remove the tier from the default crushmap
# and remove this check. No longer supporting this tier in R5
if tier == 'cache-tier':
continue
# Extract the tier name from the crush equivalent
tier_name = tier[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)]
if self.tiers_size[tier] == 0:
LOG.info(_LI("'%s' tier cluster size not yet available")
% tier_name)
continue
pools_quota_sum = 0
if tier == self.primary_tier_name:
for pool in constants.CEPH_POOLS:
if (pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
pool['pool_name'] ==
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
object_pool_name = self._get_object_pool_name()
if object_pool_name is None:
LOG.error("Rados gateway object data pool does "
"not exist.")
else:
pools_quota_sum += \
self._get_osd_pool_quota(object_pool_name)
else:
pools_quota_sum += self._get_osd_pool_quota(
pool['pool_name'])
else:
for pool in constants.SB_TIER_CEPH_POOLS:
pool_name = "{0}-{1}".format(pool['pool_name'], tier_name)
pools_quota_sum += self._get_osd_pool_quota(pool_name)
# Currently, there is only one pool on the addtional tier(s),
# therefore allow a quota of 0
if (pools_quota_sum != self.tiers_size[tier] and
pools_quota_sum != 0):
self._report_fault(
{'tier_name': tier_name,
'tier_eid': "{0}.tier={1}".format(
self.service.entity_instance_id,
tier_name)},
fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE)
else:
self._clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE,
"{0}.tier={1}".format(self.service.entity_instance_id,
tier_name))
# CEPH HELPERS
def _get_fsid(self):
try:
response, fsid = self.service.ceph_api.fsid(
body='text', timeout=30)
except IOError as e:
LOG.warning(_LW("ceph_api.fsid failed: %s") % str(e.message))
self.cluster_is_up = False
return None
if not response.ok:
LOG.warning(_LW("Get fsid failed: %s") % response.reason)
self.cluster_is_up = False
return None
self.cluster_is_up = True
return fsid.strip()
def _get_health(self):
try:
# we use text since it has all info
response, body = self.service.ceph_api.health(
body='text', timeout=30)
except IOError as e:
LOG.warning(_LW("ceph_api.health failed: %s") % str(e.message))
self.cluster_is_up = False
return {'health': constants.CEPH_HEALTH_DOWN,
'detail': 'Ceph cluster is down.'}
if not response.ok:
LOG.warning(_LW("CEPH health check failed: %s") % response.reason)
health_info = [constants.CEPH_HEALTH_DOWN, response.reason]
self.cluster_is_up = False
else:
health_info = body.split(' ', 1)
self.cluster_is_up = True
health = health_info[0]
if len(health_info) > 1:
detail = health_info[1]
else:
detail = health_info[0]
return {'health': health.strip(),
'detail': detail.strip()}
def _get_object_pool_name(self):
if self.known_object_pool_name is None:
response, body = self.service.ceph_api.osd_pool_get(
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL,
"pg_num",
body='json')
if response.ok:
self.known_object_pool_name = \
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL
return self.known_object_pool_name
response, body = self.service.ceph_api.osd_pool_get(
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER,
"pg_num",
body='json')
if response.ok:
self.known_object_pool_name = \
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER
return self.known_object_pool_name
return self.known_object_pool_name
def _get_osd_pool_quota(self, pool_name):
try:
resp, quota = self.service.ceph_api.osd_get_pool_quota(
pool_name, body='json')
except IOError:
return 0
if not resp.ok:
LOG.error(_LE("Getting the quota for "
"%(name)s pool failed:%(reason)s)") %
{"name": pool_name, "reason": resp.reason})
return 0
else:
try:
quota_gib = int(quota["output"]["quota_max_bytes"])/(1024**3)
return quota_gib
except IOError:
return 0
# we have two root nodes 'cache-tier' and 'storage-tier'
# to calculate the space that is used by the pools, we must only
# use 'storage-tier'
# this function determines if a certain node is under a certain
# tree
def host_is_in_root(self, search_tree, node, root_name):
if node['type'] == 'root':
if node['name'] == root_name:
return True
else:
return False
return self.host_is_in_root(search_tree,
search_tree[node['parent']],
root_name)
# The information received from ceph is not properly
# structured for efficient parsing and searching, so
# it must be processed and transformed into a more
# structured form.
#
# Input received from ceph is an array of nodes with the
# following structure:
# [{'id':<node_id>, 'children':<array_of_children_ids>, ....},
# ...]
#
# We process this array and transform it into a dictionary
# (for efficient access) The transformed "search tree" is a
# dictionary with the following structure:
# {<node_id> : {'children':<array_of_children_ids>}
def _get_tiers_size(self):
try:
resp, body = self.service.ceph_api.osd_df(
body='json',
output_method='tree')
except IOError:
return 0
if not resp.ok:
LOG.error(_LE("Getting the cluster usage "
"information failed: %(reason)s - "
"%(body)s") % {"reason": resp.reason,
"body": body})
return {}
# A node is a crushmap element: root, chassis, host, osd. Create a
# dictionary for the nodes with the key as the id used for efficient
# searching through nodes.
#
# For example: storage-0's node has one child node => OSD 0
# {
# "id": -4,
# "name": "storage-0",
# "type": "host",
# "type_id": 1,
# "reweight": -1.000000,
# "kb": 51354096,
# "kb_used": 1510348,
# "kb_avail": 49843748,
# "utilization": 2.941047,
# "var": 1.480470,
# "pgs": 0,
# "children": [
# 0
# ]
# },
search_tree = {}
for node in body['output']['nodes']:
search_tree[node['id']] = node
# Extract the tiers as we will return a dict for the size of each tier
tiers = {k: v for k, v in search_tree.items() if v['type'] == 'root'}
# For each tier, traverse the heirarchy from the root->chassis->host.
# Sum the host sizes to determine the overall size of the tier
tier_sizes = {}
for tier in tiers.values():
tier_size = 0
for chassis_id in tier['children']:
chassis_size = 0
chassis = search_tree[chassis_id]
for host_id in chassis['children']:
host = search_tree[host_id]
if (chassis_size == 0 or
chassis_size > host['kb']):
chassis_size = host['kb']
tier_size += chassis_size/(1024 ** 2)
tier_sizes[tier['name']] = tier_size
return tier_sizes
# ALARM HELPERS
@staticmethod
def _check_storage_group(osd_tree, group_id,
hosts, osds, fn_report_alarm):
reasons = set()
degraded_hosts = set()
severity = fm_constants.FM_ALARM_SEVERITY_CRITICAL
for host_id in hosts:
if len(osds[host_id]) == 0:
reasons.add(constants.ALARM_REASON_NO_OSD)
degraded_hosts.add(host_id)
else:
for osd_id in osds[host_id]:
if osd_tree[osd_id]['status'] == 'up':
if osd_tree[osd_id]['reweight'] == 0.0:
reasons.add(constants.ALARM_REASON_OSDS_OUT)
degraded_hosts.add(host_id)
else:
severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
elif osd_tree[osd_id]['status'] == 'down':
reasons.add(constants.ALARM_REASON_OSDS_DOWN)
degraded_hosts.add(host_id)
if constants.ALARM_REASON_OSDS_OUT in reasons \
and constants.ALARM_REASON_OSDS_DOWN in reasons:
reasons.add(constants.ALARM_REASON_OSDS_DOWN_OUT)
reasons.remove(constants.ALARM_REASON_OSDS_OUT)
if constants.ALARM_REASON_OSDS_DOWN in reasons \
and constants.ALARM_REASON_OSDS_DOWN_OUT in reasons:
reasons.remove(constants.ALARM_REASON_OSDS_DOWN)
reason = "/".join(list(reasons))
if severity == fm_constants.FM_ALARM_SEVERITY_CRITICAL:
reason = "{} {}: {}".format(
fm_constants.ALARM_CRITICAL_REPLICATION,
osd_tree[group_id]['name'],
reason)
elif severity == fm_constants.FM_ALARM_SEVERITY_MAJOR:
reason = "{} {}: {}".format(
fm_constants.ALARM_MAJOR_REPLICATION,
osd_tree[group_id]['name'],
reason)
if len(degraded_hosts) == 0:
if len(hosts) < 2:
fn_report_alarm(
osd_tree[group_id]['name'],
"{} {}: {}".format(
fm_constants.ALARM_MAJOR_REPLICATION,
osd_tree[group_id]['name'],
constants.ALARM_REASON_PEER_HOST_DOWN),
fm_constants.FM_ALARM_SEVERITY_MAJOR)
elif len(degraded_hosts) == 1:
fn_report_alarm(
"{}.host={}".format(
osd_tree[group_id]['name'],
osd_tree[list(degraded_hosts)[0]]['name']),
reason, severity)
else:
fn_report_alarm(
osd_tree[group_id]['name'],
reason, severity)
def _check_storage_tier(self, osd_tree, tier_name, fn_report_alarm):
for tier_id in osd_tree:
if osd_tree[tier_id]['type'] != 'root':
continue
if osd_tree[tier_id]['name'] != tier_name:
continue
for group_id in osd_tree[tier_id]['children']:
if osd_tree[group_id]['type'] != 'chassis':
continue
if not osd_tree[group_id]['name'].startswith('group-'):
continue
hosts = []
osds = {}
for host_id in osd_tree[group_id]['children']:
if osd_tree[host_id]['type'] != 'host':
continue
hosts.append(host_id)
osds[host_id] = []
for osd_id in osd_tree[host_id]['children']:
if osd_tree[osd_id]['type'] == 'osd':
osds[host_id].append(osd_id)
self._check_storage_group(osd_tree, group_id, hosts,
osds, fn_report_alarm)
break
def _current_health_alarm_equals(self, reason, severity):
if not self.current_health_alarm:
return False
if getattr(self.current_health_alarm, 'severity', None) != severity:
return False
if getattr(self.current_health_alarm, 'reason_text', None) != reason:
return False
return True
def _report_alarm_osds_health(self):
response, osd_tree = self.service.ceph_api.osd_tree(body='json')
if not response.ok:
LOG.error(_LE("Failed to retrieve Ceph OSD tree: "
"status_code: %(status_code)s, reason: %(reason)s") %
{"status_code": response.status_code,
"reason": response.reason})
return
osd_tree = dict([(n['id'], n) for n in osd_tree['output']['nodes']])
alarms = []
self._check_storage_tier(osd_tree, "storage-tier",
lambda *args: alarms.append(args))
if self.cache_enabled:
self._check_storage_tier(osd_tree, "cache-tier",
lambda *args: alarms.append(args))
old_alarms = {}
for alarm_id in [
fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR,
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL]:
alarm_list = self.service.fm_api.get_faults_by_id(alarm_id)
if not alarm_list:
continue
for alarm in alarm_list:
if alarm.entity_instance_id not in old_alarms:
old_alarms[alarm.entity_instance_id] = []
old_alarms[alarm.entity_instance_id].append(
(alarm.alarm_id, alarm.reason_text))
for peer_group, reason, severity in alarms:
if self._current_health_alarm_equals(reason, severity):
continue
alarm_critical_major = fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR
if severity == fm_constants.FM_ALARM_SEVERITY_CRITICAL:
alarm_critical_major = (
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL)
entity_instance_id = (
self.service.entity_instance_id + '.peergroup=' + peer_group)
alarm_already_exists = False
if entity_instance_id in old_alarms:
for alarm_id, old_reason in old_alarms[entity_instance_id]:
if (reason == old_reason and
alarm_id == alarm_critical_major):
# if the alarm is exactly the same, we don't need
# to recreate it
old_alarms[entity_instance_id].remove(
(alarm_id, old_reason))
alarm_already_exists = True
elif (alarm_id == alarm_critical_major):
# if we change just the reason, then we just remove the
# alarm from the list so we don't remove it at the
# end of the function
old_alarms[entity_instance_id].remove(
(alarm_id, old_reason))
if (len(old_alarms[entity_instance_id]) == 0):
del old_alarms[entity_instance_id]
# in case the alarm is exactly the same, we skip the alarm set
if alarm_already_exists is True:
continue
major_repair_action = constants.REPAIR_ACTION_MAJOR_CRITICAL_ALARM
fault = fm_api.Fault(
alarm_id=alarm_critical_major,
alarm_type=fm_constants.FM_ALARM_TYPE_4,
alarm_state=fm_constants.FM_ALARM_STATE_SET,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER,
entity_instance_id=entity_instance_id,
severity=severity,
reason_text=reason,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_15,
proposed_repair_action=major_repair_action,
service_affecting=constants.SERVICE_AFFECTING['HEALTH_WARN'])
alarm_uuid = self.service.fm_api.set_fault(fault)
if alarm_uuid:
LOG.info(_LI(
"Created storage alarm %(alarm_uuid)s - "
"severity: %(severity)s, reason: %(reason)s, "
"service_affecting: %(service_affecting)s") % {
"alarm_uuid": str(alarm_uuid),
"severity": str(severity),
"reason": reason,
"service_affecting": str(
constants.SERVICE_AFFECTING['HEALTH_WARN'])})
else:
LOG.error(_LE(
"Failed to create storage alarm - "
"severity: %(severity)s, reason: %(reason)s, "
"service_affecting: %(service_affecting)s") % {
"severity": str(severity),
"reason": reason,
"service_affecting": str(
constants.SERVICE_AFFECTING['HEALTH_WARN'])})
for entity_instance_id in old_alarms:
for alarm_id, old_reason in old_alarms[entity_instance_id]:
self.service.fm_api.clear_fault(alarm_id, entity_instance_id)
@staticmethod
def _parse_reason(health):
""" Parse reason strings received from Ceph """
if health['health'] in constants.CEPH_STATUS_CUSTOM:
# Don't parse reason messages that we added
return "Storage Alarm Condition: %(health)s. %(detail)s" % health
reasons_lst = health['detail'].split(';')
parsed_reasons_text = ""
# Check if PGs have issues - we can't safely store the entire message
# as it tends to be long
for reason in reasons_lst:
if "pgs" in reason:
parsed_reasons_text += "PGs are degraded/stuck or undersized"
break
# Extract recovery status
parsed_reasons = [r.strip() for r in reasons_lst if 'recovery' in r]
if parsed_reasons:
parsed_reasons_text += ";" + ";".join(parsed_reasons)
# We need to keep the most important parts of the messages when storing
# them to fm alarms, therefore text between [] brackets is truncated if
# max size is reached.
# Add brackets, if needed
if len(parsed_reasons_text):
lbracket = " ["
rbracket = "]"
else:
lbracket = ""
rbracket = ""
msg = {"head": "Storage Alarm Condition: ",
"tail": ". Please check 'ceph -s' for more details."}
max_size = constants.FM_ALARM_REASON_MAX_SIZE - \
len(msg["head"]) - len(msg["tail"])
return (
msg['head'] +
(health['health'] + lbracket + parsed_reasons_text)[:max_size-1] +
rbracket + msg['tail'])
def _report_fault(self, health, alarm_id):
if alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH:
new_severity = constants.SEVERITY[health['health']]
new_reason_text = self._parse_reason(health)
new_service_affecting = \
constants.SERVICE_AFFECTING[health['health']]
# Raise or update alarm if necessary
if ((not self.current_health_alarm) or
(self.current_health_alarm.__dict__['severity'] !=
new_severity) or
(self.current_health_alarm.__dict__['reason_text'] !=
new_reason_text) or
(self.current_health_alarm.__dict__['service_affecting'] !=
str(new_service_affecting))):
fault = fm_api.Fault(
alarm_id=fm_constants.FM_ALARM_ID_STORAGE_CEPH,
alarm_type=fm_constants.FM_ALARM_TYPE_4,
alarm_state=fm_constants.FM_ALARM_STATE_SET,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER,
entity_instance_id=self.service.entity_instance_id,
severity=new_severity,
reason_text=new_reason_text,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_15,
proposed_repair_action=constants.REPAIR_ACTION,
service_affecting=new_service_affecting)
alarm_uuid = self.service.fm_api.set_fault(fault)
if alarm_uuid:
LOG.info(_LI(
"Created storage alarm %(alarm_uuid)s - "
"severity: %(severity)s, reason: %(reason)s, "
"service_affecting: %(service_affecting)s") % {
"alarm_uuid": alarm_uuid,
"severity": new_severity,
"reason": new_reason_text,
"service_affecting": new_service_affecting})
else:
LOG.error(_LE(
"Failed to create storage alarm - "
"severity: %(severity)s, reason: %(reason)s "
"service_affecting: %(service_affecting)s") % {
"severity": new_severity,
"reason": new_reason_text,
"service_affecting": new_service_affecting})
# Log detailed reason for later analysis
if (self.current_ceph_health != health['health'] or
self.detailed_health_reason != health['detail']):
LOG.info(_LI("Ceph status changed: %(health)s "
"detailed reason: %(detail)s") % health)
self.current_ceph_health = health['health']
self.detailed_health_reason = health['detail']
elif (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE and
not health['tier_eid'] in self.current_quota_alarms):
quota_reason_text = ("Quota/Space mismatch for the %s tier. The "
"sum of Ceph pool quotas does not match the "
"tier size." % health['tier_name'])
fault = fm_api.Fault(
alarm_id=fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE,
alarm_state=fm_constants.FM_ALARM_STATE_SET,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER,
entity_instance_id=health['tier_eid'],
severity=fm_constants.FM_ALARM_SEVERITY_MINOR,
reason_text=quota_reason_text,
alarm_type=fm_constants.FM_ALARM_TYPE_7,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_75,
proposed_repair_action=(
"Update ceph storage pool quotas to use all available "
"cluster space for the %s tier." % health['tier_name']),
service_affecting=False)
alarm_uuid = self.service.fm_api.set_fault(fault)
if alarm_uuid:
LOG.info(_LI(
"Created storage quota storage alarm %(alarm_uuid)s. "
"Reason: %(reason)s") % {
"alarm_uuid": alarm_uuid, "reason": quota_reason_text})
else:
LOG.error(_LE("Failed to create quota "
"storage alarm. Reason: %s") % quota_reason_text)
def _clear_fault(self, alarm_id, entity_instance_id=None):
# Only clear alarm if there is one already raised
if (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH and
self.current_health_alarm):
LOG.info(_LI("Clearing health alarm"))
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH,
self.service.entity_instance_id)
elif (alarm_id == fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE and
entity_instance_id in self.current_quota_alarms):
LOG.info(_LI("Clearing quota alarm with entity_instance_id %s")
% entity_instance_id)
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE,
entity_instance_id)
def clear_critical_alarm(self, group_name):
alarm_list = self.service.fm_api.get_faults_by_id(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL)
if alarm_list:
for alarm in range(len(alarm_list)):
group_id = alarm_list[alarm].entity_instance_id.find("group-")
group_instance_name = (
"group-" +
alarm_list[alarm].entity_instance_id[group_id + 6])
if group_name == group_instance_name:
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL,
alarm_list[alarm].entity_instance_id)
def clear_all_major_critical(self, group_name=None):
# clear major alarms
alarm_list = self.service.fm_api.get_faults_by_id(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR)
if alarm_list:
for alarm in range(len(alarm_list)):
if group_name is not None:
group_id = (
alarm_list[alarm].entity_instance_id.find("group-"))
group_instance_name = (
"group-" +
alarm_list[alarm].entity_instance_id[group_id+6])
if group_name == group_instance_name:
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR,
alarm_list[alarm].entity_instance_id)
else:
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR,
alarm_list[alarm].entity_instance_id)
# clear critical alarms
alarm_list = self.service.fm_api.get_faults_by_id(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL)
if alarm_list:
for alarm in range(len(alarm_list)):
if group_name is not None:
group_id = (
alarm_list[alarm].entity_instance_id.find("group-"))
group_instance_name = (
"group-" +
alarm_list[alarm].entity_instance_id[group_id + 6])
if group_name == group_instance_name:
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL,
alarm_list[alarm].entity_instance_id)
else:
self.service.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL,
alarm_list[alarm].entity_instance_id)
def _get_current_alarms(self):
""" Retrieve currently raised alarm """
self.current_health_alarm = self.service.fm_api.get_fault(
fm_constants.FM_ALARM_ID_STORAGE_CEPH,
self.service.entity_instance_id)
quota_faults = self.service.fm_api.get_faults_by_id(
fm_constants.FM_ALARM_ID_STORAGE_CEPH_FREE_SPACE)
if quota_faults:
self.current_quota_alarms = [f.entity_instance_id
for f in quota_faults]
else:
self.current_quota_alarms = []

View File

@ -0,0 +1,249 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2016-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# https://chrigl.de/posts/2014/08/27/oslo-messaging-example.html
# http://docs.openstack.org/developer/oslo.messaging/server.html
import sys
# noinspection PyUnresolvedReferences
import eventlet
# noinspection PyUnresolvedReferences
import oslo_messaging as messaging
# noinspection PyUnresolvedReferences
from fm_api import fm_api
# noinspection PyUnresolvedReferences
from oslo_config import cfg
# noinspection PyUnresolvedReferences
from oslo_log import log as logging
# noinspection PyUnresolvedReferences
from oslo_service import service
# noinspection PyUnresolvedReferences
from oslo_service.periodic_task import PeriodicTasks
# noinspection PyUnresolvedReferences
from oslo_service import loopingcall
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
# noinspection PyUnresolvedReferences
from cephclient import wrapper
from monitor import Monitor
from cache_tiering import CacheTiering
import exception
import constants
from i18n import _LI, _LW
from retrying import retry
eventlet.monkey_patch(all=True)
CONF = cfg.CONF
CONF.register_opts([
cfg.StrOpt('sysinv_api_bind_ip',
default='0.0.0.0',
help='IP for the Ceph Manager server to bind to')])
CONF.logging_default_format_string = (
'%(asctime)s.%(msecs)03d %(process)d '
'%(levelname)s %(name)s [-] %(message)s')
logging.register_options(CONF)
logging.setup(CONF, __name__)
LOG = logging.getLogger(__name__)
CONF.rpc_backend = 'rabbit'
class RpcEndpoint(PeriodicTasks):
def __init__(self, service=None):
self.service = service
def cache_tiering_enable_cache(self, _, new_config, applied_config):
LOG.info(_LI("Enabling cache"))
try:
self.service.cache_tiering.enable_cache(
new_config, applied_config)
except exception.CephManagerException as e:
self.service.sysinv_conductor.call(
{}, 'cache_tiering_enable_cache_complete',
success=False, exception=str(e.message),
new_config=new_config, applied_config=applied_config)
def cache_tiering_disable_cache(self, _, new_config, applied_config):
LOG.info(_LI("Disabling cache"))
try:
self.service.cache_tiering.disable_cache(
new_config, applied_config)
except exception.CephManagerException as e:
self.service.sysinv_conductor.call(
{}, 'cache_tiering_disable_cache_complete',
success=False, exception=str(e.message),
new_config=new_config, applied_config=applied_config)
def cache_tiering_operation_in_progress(self, _):
is_locked = self.service.cache_tiering.is_locked()
LOG.info(_LI("Cache tiering operation "
"is in progress: %s") % str(is_locked).lower())
return is_locked
def get_primary_tier_size(self, _):
"""Get the ceph size for the primary tier.
returns: an int for the size (in GB) of the tier
"""
tiers_size = self.service.monitor.tiers_size
primary_tier_size = tiers_size.get(
self.service.monitor.primary_tier_name, 0)
LOG.debug(_LI("Ceph cluster primary tier size: %s GB") %
str(primary_tier_size))
return primary_tier_size
def get_tiers_size(self, _):
"""Get the ceph cluster tier sizes.
returns: a dict of sizes (in GB) by tier name
"""
tiers_size = self.service.monitor.tiers_size
LOG.debug(_LI("Ceph cluster tiers (size in GB): %s") %
str(tiers_size))
return tiers_size
def is_cluster_up(self, _):
"""Report if the last health check was successful.
This is an independent view of the cluster accessibility that can be
used by the sysinv conductor to gate ceph API calls which would timeout
and potentially block other operations.
This view is only updated at the rate the monitor checks for a cluster
uuid or a health check (CEPH_HEALTH_CHECK_INTERVAL)
returns: boolean True if last health check was successful else False
"""
return self.service.monitor.cluster_is_up
# This class is needed only when upgrading from 16.10 to 17.x
# TODO: remove it after 1st 17.x release
#
class SysinvConductorUpgradeApi(object):
def __init__(self):
self.sysinv_conductor = None
super(SysinvConductorUpgradeApi, self).__init__()
def get_software_upgrade_status(self):
LOG.info(_LI("Getting software upgrade status from sysinv"))
cctxt = self.sysinv_conductor.prepare(timeout=2)
upgrade = cctxt.call({}, 'get_software_upgrade_status')
LOG.info(_LI("Software upgrade status: %s") % str(upgrade))
return upgrade
@retry(wait_fixed=1000,
retry_on_exception=lambda exception:
LOG.warn(_LW(
"Getting software upgrade status failed "
"with: %s. Retrying... ") % str(exception)) or True)
def retry_get_software_upgrade_status(self):
return self.get_software_upgrade_status()
class Service(SysinvConductorUpgradeApi, service.Service):
def __init__(self, conf):
super(Service, self).__init__()
self.conf = conf
self.rpc_server = None
self.sysinv_conductor = None
self.ceph_api = None
self.entity_instance_id = ''
self.fm_api = fm_api.FaultAPIs()
self.monitor = Monitor(self)
self.cache_tiering = CacheTiering(self)
self.config = None
self.config_desired = None
self.config_applied = None
def start(self):
super(Service, self).start()
transport = messaging.get_transport(self.conf)
self.sysinv_conductor = messaging.RPCClient(
transport,
messaging.Target(
topic=constants.SYSINV_CONDUCTOR_TOPIC))
self.ceph_api = wrapper.CephWrapper(
endpoint='http://localhost:5001/api/v0.1/')
# Get initial config from sysinv and send it to
# services that need it before starting them
config = self.get_caching_tier_config()
self.monitor.setup(config)
self.rpc_server = messaging.get_rpc_server(
transport,
messaging.Target(topic=constants.CEPH_MANAGER_TOPIC,
server=self.conf.sysinv_api_bind_ip),
[RpcEndpoint(self)],
executor='eventlet')
self.rpc_server.start()
self.cache_tiering.set_initial_config(config)
eventlet.spawn_n(self.monitor.run)
periodic = loopingcall.FixedIntervalLoopingCall(
self.update_ceph_target_max_bytes)
periodic.start(interval=300)
def get_caching_tier_config(self):
LOG.info("Getting cache tiering configuration from sysinv")
while True:
# Get initial configuration from sysinv,
# retry until sysinv starts
try:
cctxt = self.sysinv_conductor.prepare(timeout=2)
config = cctxt.call({}, 'cache_tiering_get_config')
for section in config:
if section == constants.CACHE_TIERING:
self.config = ServiceConfig().from_dict(
config[section])
elif section == constants.CACHE_TIERING_DESIRED:
self.config_desired = ServiceConfig().from_dict(
config[section])
elif section == constants.CACHE_TIERING_APPLIED:
self.config_applied = ServiceConfig().from_dict(
config[section])
LOG.info("Cache tiering configs: {}".format(config))
return config
except Exception as ex:
# In production we should retry on every error until connection
# is reestablished.
LOG.warn("Getting cache tiering configuration failed "
"with: {}. Retrying... ".format(str(ex)))
def stop(self):
try:
self.rpc_server.stop()
self.rpc_server.wait()
except Exception:
pass
super(Service, self).stop()
def update_ceph_target_max_bytes(self):
try:
self.cache_tiering.update_cache_target_max_bytes()
except Exception as ex:
LOG.exception("Updating Ceph target max bytes failed "
"with: {} retrying on next cycle.".format(str(ex)))
def run_service():
CONF(sys.argv[1:])
logging.setup(CONF, "ceph-manager")
launcher = service.launch(CONF, Service(CONF), workers=1)
launcher.wait()
if __name__ == "__main__":
run_service()

View File

@ -0,0 +1,309 @@
#
# Copyright (c) 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import unittest
import mock
import subprocess
import math
from ..cache_tiering import CacheTiering
from ..cache_tiering import LOG as CT_LOG
from ..constants import CACHE_FLUSH_OBJECTS_THRESHOLD
from ..constants import CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC as MIN_WAIT
from ..constants import CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC as MAX_WAIT
from ..exception import CephCacheFlushFailure
class TestCacheFlush(unittest.TestCase):
def setUp(self):
self.service = mock.Mock()
self.ceph_api = mock.Mock()
self.service.ceph_api = self.ceph_api
self.cache_tiering = CacheTiering(self.service)
@mock.patch('subprocess.check_call')
def test_set_param_fail(self, mock_proc_call):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=False, status_code=500, reason='denied'),
{})
self.cache_tiering.cache_flush({'pool_name': 'test'})
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('subprocess.check_call')
def test_df_fail(self, mock_proc_call):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.return_value = (
mock.Mock(ok=False, status_code=500, reason='denied'),
{})
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('subprocess.check_call')
def test_rados_evict_fail_raises(self, mock_proc_call):
mock_proc_call.side_effect = subprocess.CalledProcessError(1, ['cmd'])
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=False, status_code=500, reason='denied'),
{})
self.assertRaises(CephCacheFlushFailure,
self.cache_tiering.cache_flush,
{'pool_name': 'test'})
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('subprocess.check_call')
def test_df_missing_pool(self, mock_proc_call):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'rbd',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects': 0}}]},
'status': 'OK'})
with mock.patch.object(CT_LOG, 'warn') as mock_lw:
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.df.assert_called_once_with(body='json')
for c in mock_lw.call_args_list:
if 'Missing pool free space' in c[0][0]:
break
else:
self.fail('expected log warning')
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('subprocess.check_call')
def test_df_objects_empty(self, mock_proc_call):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects': 0}}]},
'status': 'OK'})
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.df.assert_called_once_with(body='json')
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('time.sleep')
@mock.patch('subprocess.check_call')
def test_df_objects_above_threshold(self, mock_proc_call, mock_time_sleep):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.side_effect = [
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects': CACHE_FLUSH_OBJECTS_THRESHOLD}}]},
'status': 'OK'}),
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]},
'status': 'OK'})]
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
self.ceph_api.df.assert_called_with(body='json')
mock_time_sleep.assert_called_once_with(MIN_WAIT)
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('time.sleep')
@mock.patch('subprocess.check_call')
def test_df_objects_interval_increase(self, mock_proc_call,
mock_time_sleep):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.side_effect = [
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
'status': 'OK'}),
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
'status': 'OK'}),
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
'status': 'OK'}),
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]},
'status': 'OK'})]
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
self.ceph_api.df.assert_called_with(body='json')
self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list],
[MIN_WAIT,
MIN_WAIT * 2,
MIN_WAIT * 4])
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('time.sleep')
@mock.patch('subprocess.check_call')
def test_df_objects_allways_over_threshold(self, mock_proc_call,
mock_time_sleep):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
'status': 'OK'})
# noinspection PyTypeChecker
mock_time_sleep.side_effect = \
[None]*int(math.ceil(math.log(float(MAX_WAIT)/MIN_WAIT, 2)) + 1) \
+ [Exception('too many sleeps')]
self.cache_tiering.cache_flush({'pool_name': 'test'})
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
self.ceph_api.df.assert_called_with(body='json')
expected_sleep = []
interval = MIN_WAIT
while interval <= MAX_WAIT:
expected_sleep.append(interval)
interval *= 2
self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list],
expected_sleep)
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
@mock.patch('time.sleep')
@mock.patch('subprocess.check_call')
def test_df_objects_increase(self, mock_proc_call, mock_time_sleep):
self.ceph_api.osd_set_pool_param = mock.Mock()
self.ceph_api.osd_set_pool_param.return_value = (
mock.Mock(ok=True, status_code=200, reason='OK'),
{})
self.ceph_api.df = mock.Mock()
self.ceph_api.df.side_effect = [
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
'status': 'OK'}),
(mock.Mock(ok=True, status_code=200, reason='OK'),
{'output': {
'pools': [
{'id': 0,
'name': 'test-cache',
'stats': {'bytes_used': 0,
'kb_used': 0,
'max_avail': 9588428800,
'objects':
CACHE_FLUSH_OBJECTS_THRESHOLD + 2}}]},
'status': 'OK'})]
with mock.patch.object(CT_LOG, 'warn') as mock_lw:
self.cache_tiering.cache_flush({'pool_name': 'test'})
for c in mock_lw.call_args_list:
if 'Unexpected increase' in c[0][0]:
break
else:
self.fail('expected log warning')
self.ceph_api.df.assert_called_with(body='json')
mock_time_sleep.assert_called_once_with(MIN_WAIT)
self.ceph_api.osd_set_pool_param.assert_called_once_with(
'test-cache', 'target_max_objects', 1, force=None, body='json')
mock_proc_call.assert_called_with(
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
#
# Copyright (c) 2013-2014, 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import setuptools
setuptools.setup(
name='ceph_manager',
version='1.0.0',
description='CEPH manager',
license='Apache-2.0',
packages=['ceph_manager'],
entry_points={
}
)

View File

@ -0,0 +1,10 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
mock
flake8
eventlet
pytest
oslo.log
oslo.i18n

View File

@ -0,0 +1,29 @@
# adapted from glance tox.ini
[tox]
minversion = 1.6
envlist = py27,pep8
skipsdist = True
# tox does not work if the path to the workdir is too long, so move it to /tmp
toxworkdir = /tmp/{env:USER}_ceph_manager_tox
[testenv]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install --no-use-wheel -U --force-reinstall {opts} {packages}
deps = -r{toxinidir}/test-requirements.txt
commands = py.test {posargs}
whitelist_externals = bash
passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
[testenv:py27]
basepython = python2.7
setenv =
PYTHONPATH={toxinidir}/../../../../sysinv/recipes-common/sysinv/sysinv:{toxinidir}/../../../../config/recipes-common/tsconfig/tsconfig
[testenv:pep8]
commands =
flake8 {posargs}
[flake8]
exclude = .venv,.git,.tox,dist,doc,etc,*glance/locale*,*lib/python*,*egg,build

View File

@ -0,0 +1,11 @@
/var/log/ceph-manager.log {
nodateext
size 10M
start 1
rotate 10
missingok
notifempty
compress
delaycompress
copytruncate
}

View File

@ -0,0 +1,17 @@
[Unit]
Description=Handle Ceph API calls and provide status updates via alarms
After=ceph.target
[Service]
Type=forking
Restart=no
KillMode=process
RemainAfterExit=yes
ExecStart=/etc/rc.d/init.d/ceph-manager start
ExecStop=/etc/rc.d/init.d/ceph-manager stop
ExecReload=/etc/rc.d/init.d/ceph-manager reload
PIDFile=/var/run/ceph/ceph-manager.pid
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
#
# Copyright (c) 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import sys
try:
from ceph_manager.server import run_service
except EnvironmentError as e:
print >> sys.stderr, "Error importing ceph_manager: ", str(e)
sys.exit(1)
run_service()

View File

@ -0,0 +1,103 @@
#!/bin/sh
#
# Copyright (c) 2013-2014, 2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
### BEGIN INIT INFO
# Provides: ceph-manager
# Required-Start: $ceph
# Required-Stop: $ceph
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Daemon for polling ceph status
# Description: Daemon for polling ceph status
### END INIT INFO
DESC="ceph-manager"
DAEMON="/usr/bin/ceph-manager"
RUNDIR="/var/run/ceph"
PIDFILE=$RUNDIR/$DESC.pid
CONFIGFILE="/etc/sysinv/sysinv.conf"
LOGFILE="/var/log/ceph-manager.log"
start()
{
if [ -e $PIDFILE ]; then
PIDDIR=/prod/$(cat $PIDFILE)
if [ -d ${PIDFILE} ]; then
echo "$DESC already running."
exit 0
else
echo "Removing stale PID file $PIDFILE"
rm -f $PIDFILE
fi
fi
echo -n "Starting $DESC..."
mkdir -p $RUNDIR
start-stop-daemon --start --quiet \
--pidfile ${PIDFILE} --exec ${DAEMON} \
--make-pidfile --background \
-- --log-file=$LOGFILE --config-file=$CONFIGFILE
if [ $? -eq 0 ]; then
echo "done."
else
echo "failed."
exit 1
fi
}
stop()
{
echo -n "Stopping $DESC..."
start-stop-daemon --stop --quiet --pidfile $PIDFILE --retry 60
if [ $? -eq 0 ]; then
echo "done."
else
echo "failed."
fi
rm -f $PIDFILE
}
status()
{
pid=`cat $PIDFILE 2>/dev/null`
if [ -n "$pid" ]; then
if ps -p $pid &> /dev/null ; then
echo "$DESC is running"
exit 0
else
echo "$DESC is not running but has pid file"
exit 1
fi
fi
echo "$DESC is not running"
exit 3
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart|force-reload|reload)
stop
start
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|force-reload|restart|reload|status}"
exit 1
;;
esac
exit 0