feat: implement support for the cephfs-share relation

This allows using the `cephfs-client` charm to connect
to the CephFS share without having to do a manual
setup.

Change-Id: I1823dc0624c66232cd282ed098c7a58ea7456fa8
This commit is contained in:
José Julián Espina 2024-05-03 18:00:57 -06:00
parent 8655c1b81a
commit d3288e9b87
No known key found for this signature in database
GPG Key ID: 1E1AFCAE118C0954
7 changed files with 216 additions and 7 deletions

2
.gitignore vendored
View File

@ -1,10 +1,10 @@
build build
.tox .tox
layers layers
interfaces
.testrepository .testrepository
__pycache__ __pycache__
*.pyc *.pyc
.idea .idea
.stestr .stestr
*.charm *.charm
.vscode

View File

@ -0,0 +1,3 @@
name: cephfs_share
summary: CephFS Share provider interface
version: 1

View File

@ -0,0 +1,99 @@
from charms.reactive import scopes, when, set_flag, clear_flag
from charms.reactive.endpoints import Endpoint
from charmhelpers.core import hookenv
from typing import Iterable, Dict, Set
import json
class _Transaction:
"""Store transaction information between data mappings."""
def __init__(self, added: Set, changed: Set, deleted: Set):
self.added: Set = added
self.changed: Set = changed
self.deleted: Set = deleted
def _eval(relation) -> _Transaction:
"""Evaluate the difference between data in an integration changed databag.
Args:
relation: Relation with the written data.
Returns:
_Transaction:
Transaction info containing the added, deleted, and changed
keys from the relation databag.
"""
# Retrieve the old data from the data key in the unit databag.
old_data = json.loads(relation.to_publish_raw.get("cache", "{}"))
# Retrieve the new data from the relation integration databag.
new_data = {
key: value for key, value in relation.received_app.items() if key != "cache"
}
# These are the keys that were added to the databag and triggered this event.
added = new_data.keys() - old_data.keys()
# These are the keys that were removed from the databag and triggered this event.
deleted = old_data.keys() - new_data.keys()
# These are the keys that were added or already existed in the databag, but had their values changed.
changed = added.union(
{key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key]}
)
# Convert the new_data to a serializable format and save it for a next diff check.
relation.to_publish_raw.update({
"cache": json.dumps(new_data)
})
# Return the transaction with all possible changes.
return _Transaction(added, changed, deleted)
class CephFSProvides(Endpoint):
@when('endpoint.{endpoint_name}.changed')
def changed(self):
if hookenv.is_leader():
for relation in self.relations:
transaction = _eval(relation)
if "name" in transaction.added:
set_flag(self.expand_name('{endpoint_name}.available'))
def manage_flags(self):
if not self.is_joined:
clear_flag(
self.expand_name('{endpoint_name}.available')
)
def set_share(self, share_info: Dict, auth_info: Dict) -> None:
"""Set info for mounting a CephFS share.
Args:
relation:
share_info: Dictionary with the information required to mount the CephFS share.
- fsid: ID of the Ceph cluster.
- name: Name of the exported Ceph filesystem.
- path: Exported path of the Ceph filesystem.
- monitor_hosts: Address list of the available Ceph MON nodes.
auth_info: Dictionary with the information required to authenticate against the Ceph cluster.
- username: Name of the user authorized to access the Ceph filesystem.
- key: Cephx key for the authorized user.
Notes:
Only the application leader unit can set the CephFS share data.
"""
if hookenv.is_leader():
share_info = json.dumps({
'fsid': share_info['fsid'],
'name': share_info['name'],
'path': share_info['path'],
'monitor_hosts': share_info['monitor_hosts']
})
auth_info = json.dumps({
'username': auth_info['username'],
'key': auth_info['key']
})
for relation in self.relations:
relation.to_publish_app_raw.update({
"share_info": share_info,
"auth": f"plain:{auth_info}",
})

View File

@ -1,4 +1,4 @@
includes: ['layer:ceph', 'interface:ceph-mds'] includes: ['layer:ceph', 'interface:ceph-mds', 'interface:cephfs_share']
options: options:
basic: basic:
use_venv: True use_venv: True

View File

@ -19,5 +19,9 @@ subordinate: false
requires: requires:
ceph-mds: ceph-mds:
interface: ceph-mds interface: ceph-mds
provides:
cephfs-share:
interface: cephfs_share
extra-bindings: extra-bindings:
public: public:

View File

@ -17,8 +17,10 @@ from charms import reactive
import charmhelpers.core as ch_core import charmhelpers.core as ch_core
from charmhelpers.core.hookenv import ( from charmhelpers.core.hookenv import (
service_name, service_name, application_name,
is_leader,
config) config)
from charmhelpers.contrib.storage.linux import ceph
import charms_openstack.bus import charms_openstack.bus
import charms_openstack.charm as charm import charms_openstack.charm as charm
@ -205,3 +207,37 @@ def storage_ceph_connected(ceph):
weight=metadata_weight, weight=metadata_weight,
app_name=ceph_mds.ceph_pool_app_name) app_name=ceph_mds.ceph_pool_app_name)
ceph_mds.request_cephfs(service, extra_pools=extra_pools) ceph_mds.request_cephfs(service, extra_pools=extra_pools)
# Must have a current request thanks to the call above
rq = ceph_mds.get_current_request()
rq.add_op({
'op': 'create-cephfs-client',
'fs_name': service,
'client_id': '{}-client'.format(service),
'path': "/",
'perms': 'rw',
})
ceph_mds.send_request_if_needed(rq)
@reactive.when_none('charm.paused', 'run-default-update-status')
@reactive.when('cephfs.configured', 'ceph-mds.pools.available',
'cephfs-share.available')
def cephfs_share_available():
cephfs_share = reactive.endpoint_from_flag('cephfs-share.available')
ceph_mds = reactive.endpoint_from_flag('ceph-mds.pools.available')
service = application_name()
if is_leader():
response_key = ceph.get_broker_rsp_key()
# After the `create-cephfs-client` request completes, the
# databag must contain the generated key for that user.
key = ceph_mds.all_joined_units.received[response_key]["key"]
cephfs_share.set_share(share_info={
"fsid": ceph_mds.fsid,
"name": service,
"path": "/",
"monitor_hosts": ceph_mds.mon_hosts(),
}, auth_info={
"username": '{}-client'.format(service),
"key": key
})

View File

@ -32,12 +32,27 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
] ]
hook_set = { hook_set = {
'when': { 'when': {
'config_changed': ('ceph-mds.pools.available',), 'config_changed': (
'storage_ceph_connected': ('ceph-mds.connected',), 'ceph-mds.pools.available',
),
'storage_ceph_connected': (
'ceph-mds.connected',
),
'cephfs_share_available': (
'cephfs.configured',
'ceph-mds.pools.available',
'cephfs-share.available',
),
}, },
'when_none': { 'when_none': {
'config_changed': ('charm.paused', 'config_changed': (
'run-default-update-status',), 'charm.paused',
'run-default-update-status',
),
'cephfs_share_available': (
'charm.paused',
'run-default-update-status',
),
}, },
} }
# test that the hooks were registered via the reactive.ceph_fs module # test that the hooks were registered via the reactive.ceph_fs module
@ -83,3 +98,55 @@ class TestCephFSHandlers(test_utils.PatchHelper):
handlers.config_changed() handlers.config_changed()
self.target.install.assert_called_once_with() self.target.install.assert_called_once_with()
self.target.upgrade_if_available.assert_called_once_with([ceph_mds]) self.target.upgrade_if_available.assert_called_once_with([ceph_mds])
def test_cephfs_share_available(self):
self.patch_object(handlers.reactive, 'endpoint_from_flag')
handlers.ch_core.hookenv.application_name.return_value = "ceph-fs"
handlers.ceph.get_broker_rsp_key.return_value = 'broker-rsp-ceph-fs-0'
ceph_mds = mock.MagicMock()
ceph_mds.fsid = "354ca7c4-f10d-11ee-93f8-1f85f87b7845"
ceph_mds.mon_hosts.return_value = [
"10.5.0.80:6789", "10.5.2.23:6789", "10.5.2.17:6789"]
ceph_mds.all_joined_units.received = {
"auth": "cephx",
"broker-rsp-ceph-fs-0": {
"exit-code": 0,
"key": "AQDvOE5mUfBIKxAAYT73/v7NzwWx2ovLB4nnOg==",
"request-id": "22dd9c7d8c7d392d44866b35219a654006fd90f0"},
"ceph-public-address": "10.143.60.15",
"fsid": "354ca7c4-f10d-11ee-93f8-1f85f87b7845",
"juju-2ffa43-1_mds_key":
"AQDwOE5mmkQ1LBAAVrx4OXWwWM+XmK/KjnJcdA==",
}
cephfs_share = mock.MagicMock()
def mock_eff(flag):
if flag == "ceph-mds.pools.available":
return ceph_mds
elif flag == "cephfs-share.available":
return cephfs_share
else:
raise Exception("invalid input")
self.endpoint_from_flag.side_effect = mock_eff
handlers.cephfs_share_available()
cephfs_share.set_share.assert_called_once_with(
share_info={
"fsid": "354ca7c4-f10d-11ee-93f8-1f85f87b7845",
"name": "ceph-fs",
"path": "/",
"monitor_hosts": [
"10.5.0.80:6789",
"10.5.2.23:6789",
"10.5.2.17:6789"
],
},
auth_info={
"username": "ceph-fs-client",
"key": "AQDvOE5mUfBIKxAAYT73/v7NzwWx2ovLB4nnOg=="
}
)