airshipctl/krm-functions/kubeval-validator/image/extract-openapi.py
Ruslan Aliev f7fca469de Add KRM function to validate site documents
Proposed KRM function can perform static validation of the site
documents based on Kubeval. It allows to specify additional CRD
locations to be able to validate custom resources. As input,
it uses document bundle created from desired phase's document
entry point or output from render command.

Change-Id: Ib76d88a6e6c7f3b29b29cab0abe038eef380686f
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
Relates-To: #19
2021-04-22 11:13:40 -05:00

137 lines
5.0 KiB
Python
Executable File

#!/usr/bin/python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import getenv
from os.path import basename, isfile, join
from urllib import request
from urllib.parse import urlparse
from openapi2jsonschema import command
from openapi2jsonschema.command import debug, info, error
from openapi_spec_validator import validate_v3_spec
from openapi_spec_validator.exceptions import OpenAPIValidationError
from ruamel.yaml import YAML
openapi_schema_path = "/workdir/schemas-cache/openapischema"
openapi_schema_pattern = """openapi: 3.0.0
info:
title: title
version: 1.0.1
paths: {}
components:
schemas: {}
"""
crd_kind = "CustomResourceDefinition"
crd_list = "/workdir/schemas-cache/crd-list"
phase_rendered = "phase-rendered.yaml"
rewrite_env = "VALIDATOR_REWRITE_SCHEMAS"
yaml = YAML()
def get_gvk(crd):
""" Extracts group, version(s), kind data from CRD """
group = crd["spec"]["group"].split(".")[0]
kind = crd["spec"]["names"]["kind"].lower()
try:
version = crd["spec"]["version"] # v1beta1 CRD
except KeyError:
version = crd["spec"]["versions"] # v1 CRD
return group, version, kind
def process_crd(crd, schemas, schemas_location, rewrite=False):
""" Processes CRD document, extracts GVK and corresponding OpenAPIV3Schema(s) """
g, v, k = get_gvk(crd) # get GVK as tuple
if isinstance(v, str): # process CRD as v1beta1
try:
gvk = g + '.' + v + '.' + k
kgv = k + "-" + g + "-" + v + ".json"
# do not rewrite schemas by default if already exists
if (not isfile(join(schemas_location, kgv)) and gvk not in schemas) or rewrite:
schemas[gvk] = crd["spec"]["validation"]["openAPIV3Schema"]
debug("Extracting OpenAPIV3Schema for {}".format(gvk))
else:
debug("OpenAPIV3Schema for {} was already processed, skipping".format(gvk))
except KeyError:
error("Cannot find OpenAPIV3Schema for {}".format(k))
return
if isinstance(v, list): # process CRD as v1
for version in v:
try:
gvk = g + '.' + version["name"] + '.' + k
kgv = k + "-" + g + "-" + version["name"] + ".json"
# do not rewrite schemas by default if already exists
if (not isfile(join(schemas_location, kgv)) and gvk not in schemas) or rewrite:
schemas[gvk] = version["schema"]["openAPIV3Schema"]
debug("Extracting OpenAPIV3Schema for {}".format(gvk))
else:
debug("OpenAPIV3Schema for {} was already processed, skipping".format(gvk))
except KeyError:
error("Cannot find OpenAPIV3Schema for {}".format(k))
continue
return
def check_yaml_kind(data):
""" Determines whether a YAML document has CRD kind """
return True if data is not None and "kind" in data and data["kind"] == crd_kind else False
def run():
"""
The main function. Reads CRDs from URLs, intelligently extracts OpenAPIV3Schema(s)
from each CRD, appends OpenAPIV3Schema to a designated file and verifies through OpenAPIValidator
"""
openapi_schema = yaml.load(openapi_schema_pattern)
schemas = openapi_schema["components"]["schemas"]
with open(crd_list, 'r') as crd_list_file: # read file with CRD locations
crd_list_data = yaml.load(crd_list_file)
for crd_path in crd_list_data['crdList']:
crd_data = yaml.load_all(request.urlopen(crd_path).read().decode('utf-8')) # read CRD from URL
for crd in crd_data:
try:
if check_yaml_kind(crd):
process_crd(crd, schemas, crd_list_data["schemasLocation"],
basename(urlparse(crd_path).path) == phase_rendered and getenv(rewrite_env) is not None)
except Exception as exc:
error("An error occurred while processing CRD data from {}\n{}".format(crd_path, exc))
# Validate output V3 spec
try:
validate_v3_spec(openapi_schema)
info("Validation of OpenAPIV3Schemas is successful")
except OpenAPIValidationError as exc:
error("An error occurred while validating OpenAPIV3Schema")
raise exc
# Rewrite openAPI schema file
with open(openapi_schema_path, 'w') as openapi_schema_file:
info("Saving OpenAPIV3Schemas")
yaml.dump(openapi_schema, openapi_schema_file)
# run openapi2jsonschema conversion
command.default()
if __name__ == "__main__":
run()