![Pierre Riteau](/assets/img/avatar_default.png)
If we try to create a new environment from a shared configuration inside the same repository, we can produce an infinite loop by copying the environment into itself. Ignore any existing environments directory during environment creation. Change-Id: Ic9d3d03192f515a055e41351a0bf25da7a7b0684 Story: 2002009 Task: 40038
252 lines
8.2 KiB
Python
252 lines
8.2 KiB
Python
# Copyright (c) 2017 StackHPC Ltd.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import base64
|
|
import glob
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
|
|
import yaml
|
|
|
|
from kayobe import exception
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def get_data_files_path(*relative_path):
    """Build the path to a data file.

    The given components are appended to the base directory returned by
    ``_get_base_path()``.

    :param relative_path: path components of the data file, relative to the
                          data files base directory.
    :returns: the base directory joined with the given components.
    """
    base_path = _get_base_path()
    return os.path.join(base_path, *relative_path)
|
|
|
|
|
|
def _detect_install_prefix(path):
    """Guess an installation prefix from the location of a file.

    Heuristic: every component of the resolved path that precedes the first
    ``lib`` component is considered part of the prefix.

    :param path: path of a file.
    :returns: the prefix path, or None if the resolved path contains no
              ``lib`` component.
    """
    resolved = os.path.normpath(os.path.realpath(path))
    parts = resolved.split(os.sep)
    try:
        lib_index = parts.index('lib')
    except ValueError:
        # No 'lib' component: the heuristic does not apply.
        return None
    return os.sep.join(parts[:lib_index])
|
|
|
|
|
|
def _get_base_path():
    """Locate the base directory of the data files.

    Candidates are tried in this order:

    1. the ``KAYOBE_DATA_FILES_PATH`` environment variable, if non-empty;
    2. the first line of a ``kayobe.egg-link`` file found below
       ``sys.prefix`` (editable pip install / ``python setup.py develop``);
    3. ``<prefix>/share/kayobe``, where the prefix is detected from the
       location of this module;
    4. a path relative to this module, assuming an uninstalled source tree.

    :returns: the data files base path.
    """
    env_path = os.environ.get("KAYOBE_DATA_FILES_PATH")
    if env_path:
        return os.path.join(env_path)

    # Editable installs leave an egg-link file whose first line is the
    # location of the source directory.
    pattern = os.path.join(
        sys.prefix, 'lib*', 'python*', '*-packages', 'kayobe.egg-link')
    matches = glob.glob(pattern)
    if matches:
        with open(matches[0], "r") as link_file:
            source_dir = link_file.readline().strip()
        return os.path.join(source_dir)

    install_prefix = _detect_install_prefix(__file__)
    if install_prefix:
        return os.path.join(install_prefix, "share", "kayobe")

    # Assume uninstalled.
    # NOTE(review): '..' is appended to the path of this file itself, not to
    # its directory - confirm that this is the intended location.
    return os.path.join(os.path.realpath(__file__), "..")
|
|
|
|
|
|
def galaxy_install(role_file, roles_path, force=False):
    """Install Ansible roles via Ansible Galaxy.

    Exits the process with the command's return code if the installation
    fails.

    :param role_file: path passed to ``--role-file``.
    :param roles_path: path passed to ``--roles-path``.
    :param force: whether to pass ``--force`` to ansible-galaxy.
    """
    cmd = [
        "ansible-galaxy", "install",
        "--roles-path", roles_path,
        "--role-file", role_file,
    ]
    if force:
        cmd.append("--force")
    try:
        run_command(cmd)
    except subprocess.CalledProcessError as err:
        LOG.error("Failed to install Ansible roles from %s via Ansible "
                  "Galaxy: returncode %d", role_file, err.returncode)
        sys.exit(err.returncode)
|
|
|
|
|
|
def galaxy_remove(roles_to_remove, roles_path):
    """Remove Ansible roles via Ansible Galaxy.

    Exits the process with the command's return code if the removal fails.

    :param roles_to_remove: names of the roles to remove.
    :param roles_path: path passed to ``--roles-path``.
    """
    cmd = ["ansible-galaxy", "remove", "--roles-path", roles_path]
    cmd.extend(roles_to_remove)
    try:
        run_command(cmd)
    except subprocess.CalledProcessError as err:
        role_names = ",".join(roles_to_remove)
        LOG.error("Failed to remove Ansible roles %s via Ansible "
                  "Galaxy: returncode %d",
                  role_names, err.returncode)
        sys.exit(err.returncode)
|
|
|
|
|
|
def read_file(path, mode="r"):
    """Return the entire content of a file.

    :param path: path of the file to read.
    :param mode: mode passed to ``open()``; defaults to text read mode.
    :returns: the content of the file.
    """
    with open(path, mode) as handle:
        content = handle.read()
    return content
|
|
|
|
|
|
def read_yaml_file(path):
    """Read a YAML file and return its decoded content.

    If the file cannot be read or is not valid YAML, an error message is
    printed and the process exits with status 1.

    :param path: path of the YAML file.
    :returns: the data decoded by ``yaml.safe_load``.
    """
    try:
        raw = read_file(path)
    except IOError as err:
        message = ("Failed to open config dump file %s: %s" %
                   (path, repr(err)))
        print(message)
        sys.exit(1)

    try:
        return yaml.safe_load(raw)
    except yaml.YAMLError as err:
        message = ("Failed to decode config dump YAML file %s: %s" %
                   (path, repr(err)))
        print(message)
        sys.exit(1)
|
|
|
|
|
|
def is_readable_dir(path):
    """Check whether a path references a readable directory.

    :param path: the path to check.
    :returns: a dict with a boolean ``result`` item and, when the check
              fails, a ``message`` item describing the first failure.
    """
    # Ordered checks; the first one that fails determines the message.
    checks = (
        (os.path.exists, "Path does not exist"),
        (os.path.isdir, "Path is not a directory"),
        (lambda p: os.access(p, os.R_OK), "Directory is not readable"),
    )
    for passes, message in checks:
        if not passes(path):
            return {"result": False, "message": message}
    return {"result": True}
|
|
|
|
|
|
def is_readable_file(path):
    """Check whether a path references a readable file.

    :param path: the path to check.
    :returns: a dict with a boolean ``result`` item and, when the check
              fails, a ``message`` item describing the first failure.
    """
    # Ordered checks; the first one that fails determines the message.
    checks = (
        (os.path.exists, "Path does not exist"),
        (os.path.isfile, "Path is not a file"),
        (lambda p: os.access(p, os.R_OK), "File is not readable"),
    )
    for passes, message in checks:
        if not passes(path):
            return {"result": False, "message": message}
    return {"result": True}
|
|
|
|
|
|
def run_command(cmd, quiet=False, check_output=False, **kwargs):
    """Run a command, checking its exit status.

    :param cmd: the command to run, either a string or a sequence of
                arguments.
    :param quiet: Discard the command's stdout and stderr. Takes precedence
                  over check_output: when both are true, no output is
                  returned.
    :param check_output: Whether to return the output of the command
    :param kwargs: additional keyword arguments passed to the subprocess
                   function. With quiet set, any ``stdout`` and ``stderr``
                   items are overridden.
    :returns: The output of the command if check_output is true and quiet is
              false, otherwise None
    :raises: subprocess.CalledProcessError if the command exits with a
             non-zero status.
    """
    cmd_string = cmd if isinstance(cmd, str) else " ".join(cmd)
    LOG.debug("Running command: %s", cmd_string)

    if quiet:
        # subprocess.DEVNULL is portable and avoids opening the null device
        # by a hard-coded path.
        kwargs["stdout"] = subprocess.DEVNULL
        kwargs["stderr"] = subprocess.DEVNULL
    elif check_output:
        return subprocess.check_output(cmd, **kwargs)

    subprocess.check_call(cmd, **kwargs)
|
|
|
|
|
|
def quote_and_escape(value):
    """Wrap a string in single quotes, escaping embedded single quotes.

    Each single quote inside the string is replaced by the sequence
    ``'\\''`` and the result is enclosed in single quotes. This is useful for
    passing 'extra vars' to Ansible, which otherwise only uses the part of
    the string up to the first whitespace.

    :param value: the string to quote and escape.
    :returns: the quoted and escaped string. Values that are not strings are
              returned unchanged.
    """
    if isinstance(value, str):
        escaped = value.replace("'", "'\\''")
        return "'" + escaped + "'"
    return value
|
|
|
|
|
|
def escape_jinja(string):
    """Escape a string so that jinja template variables are not expanded.

    :param string: the string to escape
    :return: a jinja expression that applies the ``b64decode`` filter to the
             base64 encoding of the string
    """
    # The string is base64 encoded so that no character needs escaping.
    # Ansible has some parsing quirks that make it fairly hard to escape
    # things in a generic way.
    # See: https://github.com/ansible/ansible/issues/10464
    encoded = base64.b64encode(string.encode()).decode()
    return "{{'" + encoded + "' | b64decode }}"
|
|
|
|
|
|
def intersect_limits(args_limit, cli_limit):
    """Build an Ansible host pattern intersecting two host patterns.

    :param args_limit: user-specified limit, or None.
    :param cli_limit: limit originating from this CLI, or None.
    :returns: a string representing an intersection of the two patterns.
              Empty or missing limits are left out of the result.
    :raises: exception.Error if args_limit contains both commas and colons.
    """
    # NOTE(mgoddard): Ansible uses either commas (,) or colons (:) to separate
    # parts of a host pattern. An intersection is specified using a separator
    # followed by an ampersand (&). If a mix of comma and colon separators is
    # used, Ansible picks one and treats the other as part of the host pattern.
    # This leads to hard to diagnose errors. Try to determine which separator
    # the user has specified, and be consistent. Error if both are used.
    separator = ':&'
    if args_limit and ',' in args_limit:
        if ':' in args_limit:
            raise exception.Error("Invalid format for host limit argument. "
                                  "Cannot mix commas and colons to separate "
                                  "hosts")
        separator = ',&'
    return separator.join(filter(None, (args_limit, cli_limit)))
|
|
|
|
|
|
def copy_dir(src, dest, exclude=None):
    """Copy recursively a directory.

    Directories are copied by recursion and other entries with
    ``shutil.copy2``.

    :param src: path of the source directory
    :param dest: destination path, will be created (including any missing
                 parent directories) if it does not exist
    :param exclude: names of files or directories at the root of the source
                    directory to exclude during copy. The exclusion is not
                    applied inside nested directories.
    """
    if exclude is None:
        exclude = []

    # makedirs rather than mkdir, so that a destination whose parent
    # directories are missing is created as documented.
    os.makedirs(dest, exist_ok=True)

    for name in os.listdir(src):
        if name in exclude:
            continue

        src_path = os.path.join(src, name)
        dest_path = os.path.join(dest, name)
        if os.path.isdir(src_path):
            # Exclusions only apply at the root, so none are passed down.
            copy_dir(src_path, dest_path)
        else:
            shutil.copy2(src_path, dest_path)
|