dd1b4d78fa
These updates, on the master branch, are to support testing the caracal packages and support of the charms for caracal. They do NOT lock the charms down, and don't change the testing branches to stable branches. Update unit-test to deal with Py3.11 which is run on Debian Buster and doesn't have /etc/lsb/release file (incorrect mocking issue). Change-Id: Icddaf9f7b091a09ef4627384cd349e43b34b1325
151 lines
4.2 KiB
Python
151 lines
4.2 KiB
Python
# Copyright 2014-2021 Canonical Limited.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
Charm helpers snap for classic charms.
|
|
|
|
If writing reactive charms, use the snap layer:
|
|
https://lists.ubuntu.com/archives/snapcraft/2016-September/001114.html
|
|
"""
|
|
import subprocess
|
|
import os
|
|
from time import sleep
|
|
from charmhelpers.core.hookenv import log
|
|
|
|
__author__ = 'Joseph Borg <joseph.borg@canonical.com>'
|
|
|
|
# The return code for "couldn't acquire lock" in Snap
|
|
# (hopefully this will be improved).
|
|
SNAP_NO_LOCK = 1
|
|
SNAP_NO_LOCK_RETRY_DELAY = 10 # Wait X seconds between Snap lock checks.
|
|
SNAP_NO_LOCK_RETRY_COUNT = 30 # Retry to acquire the lock X times.
|
|
SNAP_CHANNELS = [
|
|
'edge',
|
|
'beta',
|
|
'candidate',
|
|
'stable',
|
|
]
|
|
|
|
|
|
class CouldNotAcquireLockException(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidSnapChannel(Exception):
|
|
pass
|
|
|
|
|
|
def _snap_exec(commands):
|
|
"""
|
|
Execute snap commands.
|
|
|
|
:param commands: List commands
|
|
:return: Integer exit code
|
|
"""
|
|
assert isinstance(commands, list)
|
|
|
|
retry_count = 0
|
|
return_code = None
|
|
|
|
while return_code is None or return_code == SNAP_NO_LOCK:
|
|
try:
|
|
return_code = subprocess.check_call(['snap'] + commands,
|
|
env=os.environ)
|
|
except subprocess.CalledProcessError as e:
|
|
retry_count += + 1
|
|
if retry_count > SNAP_NO_LOCK_RETRY_COUNT:
|
|
raise CouldNotAcquireLockException(
|
|
'Could not acquire lock after {} attempts'
|
|
.format(SNAP_NO_LOCK_RETRY_COUNT))
|
|
return_code = e.returncode
|
|
log('Snap failed to acquire lock, trying again in {} seconds.'
|
|
.format(SNAP_NO_LOCK_RETRY_DELAY), level='WARN')
|
|
sleep(SNAP_NO_LOCK_RETRY_DELAY)
|
|
|
|
return return_code
|
|
|
|
|
|
def snap_install(packages, *flags):
|
|
"""
|
|
Install a snap package.
|
|
|
|
:param packages: String or List String package name
|
|
:param flags: List String flags to pass to install command
|
|
:return: Integer return code from snap
|
|
"""
|
|
if type(packages) is not list:
|
|
packages = [packages]
|
|
|
|
flags = list(flags)
|
|
|
|
message = 'Installing snap(s) "%s"' % ', '.join(packages)
|
|
if flags:
|
|
message += ' with option(s) "%s"' % ', '.join(flags)
|
|
|
|
log(message, level='INFO')
|
|
return _snap_exec(['install'] + flags + packages)
|
|
|
|
|
|
def snap_remove(packages, *flags):
|
|
"""
|
|
Remove a snap package.
|
|
|
|
:param packages: String or List String package name
|
|
:param flags: List String flags to pass to remove command
|
|
:return: Integer return code from snap
|
|
"""
|
|
if type(packages) is not list:
|
|
packages = [packages]
|
|
|
|
flags = list(flags)
|
|
|
|
message = 'Removing snap(s) "%s"' % ', '.join(packages)
|
|
if flags:
|
|
message += ' with options "%s"' % ', '.join(flags)
|
|
|
|
log(message, level='INFO')
|
|
return _snap_exec(['remove'] + flags + packages)
|
|
|
|
|
|
def snap_refresh(packages, *flags):
|
|
"""
|
|
Refresh / Update snap package.
|
|
|
|
:param packages: String or List String package name
|
|
:param flags: List String flags to pass to refresh command
|
|
:return: Integer return code from snap
|
|
"""
|
|
if type(packages) is not list:
|
|
packages = [packages]
|
|
|
|
flags = list(flags)
|
|
|
|
message = 'Refreshing snap(s) "%s"' % ', '.join(packages)
|
|
if flags:
|
|
message += ' with options "%s"' % ', '.join(flags)
|
|
|
|
log(message, level='INFO')
|
|
return _snap_exec(['refresh'] + flags + packages)
|
|
|
|
|
|
def valid_snap_channel(channel):
|
|
""" Validate snap channel exists
|
|
|
|
:raises InvalidSnapChannel: When channel does not exist
|
|
:return: Boolean
|
|
"""
|
|
if channel.lower() in SNAP_CHANNELS:
|
|
return True
|
|
else:
|
|
raise InvalidSnapChannel("Invalid Snap Channel: {}".format(channel))
|