#!/usr/bin/env python # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Yet another cloud deployment tool import datetime import Elastic import Grafana import json import logging import time import Tools import WorkloadBase from openstack import connection from openstack import exceptions import os import requests try: from ostag import ostag except ImportError: ostag = None from collections import deque class Yoda(WorkloadBase.WorkloadBase): def __init__(self, config): self.logger = logging.getLogger('browbeat.yoda') self.config = config self.tools = Tools.Tools(self.config) self.grafana = Grafana.Grafana(self.config) self.elastic = Elastic.Elastic(self.config, self.__class__.__name__.lower()) self.error_count = 0 self.pass_count = 0 self.test_count = 0 self.scenario_count = 0 def get_stats(self): self.logger.info( "Current number of YODA tests executed: {}".format( self.test_count)) self.logger.info( "Current number of YODA tests passed: {}".format( self.pass_count)) self.logger.info( "Current number of YODA tests failed: {}".format( self.error_count)) def update_tests(self): self.test_count += 1 self.update_total_tests() def update_pass_tests(self): self.pass_count += 1 self.update_total_pass_tests() def update_fail_tests(self): self.error_count += 1 self.update_total_fail_tests() def update_scenarios(self): self.scenario_count += 1 self.update_total_scenarios() def state_tracker_extend(self, state, state_list): if state is None: return state_list elif state_list is None: return [state] elif state in state_list[-1]: return state_list else: state_list.append(state) return state_list def node_is_cleaning(self, provision_state): ret = provision_state is not None ret = ret and 'clean' in provision_state ret = ret and 'fail' not in provision_state return ret def is_cleaning(self, conn): for node in conn.bare_metal.nodes(): if self.node_is_cleaning(node.provision_state): return True return False def failed_cleaning_count(self, conn): count = 0 for node in conn.bare_metal.nodes(): if self.node_is_cleaning(node.provision_state): count += 1 return count def wait_for_clean(self, env_setup, conn): wait_time = 1 # 15 minute timeout timeout = (60 * 15) while self.is_cleaning(conn): # Cleans can fail, so we just try again if wait_time % 1000 == 0: self.set_ironic_node_state("manage", env_setup, conn) time.sleep(30) self.set_ironic_node_state("provide", env_setup, conn) time.sleep(1) wait_time += 1 if wait_time > timeout: self.logger.error("Node Cleaning failed") exit(1) # Required to use console commands because of this bug # https://bugs.launchpad.net/python-openstacksdk/+bug/1668767 def set_ironic_node_state(self, state, env_setup, conn, node_uuid=""): if node_uuid != "": nodes = [node_uuid] else: nodes = deque(map(lambda node: node.id, conn.bare_metal.nodes())) if state == "manage": cmd_base = "{} openstack baremetal node manage {}" for _ in range(len(nodes)): node = nodes.pop() node_obj = conn.bare_metal.get_node(node) if "manage" not in node_obj.provision_state: nodes.append(node) elif state == "provide": cmd_base = "{} openstack baremetal node provide {}" for _ in range(len(nodes)): node = nodes.pop() node_obj = conn.bare_metal.get_node(node) prov_state = node_obj.provision_state if prov_state is not None and "available" not in prov_state: nodes.append(node) elif state == "inspect": cmd_base = "{} openstack baremetal introspection start {}" elif state == "off": cmd_base = "{} openstack baremetal node power off {}" for _ in range(len(nodes)): node = nodes.pop() node_obj = conn.bare_metal.get_node(node) if "off" not in node_obj.power_state: nodes.append(node) elif state == "on": cmd_base = "{} openstack baremetal node power on {}" for _ in range(len(nodes)): node = nodes.pop() node_obj = conn.bare_metal.get_node(node) if "on" not in node_obj.power_state: nodes.append(node) elif state == "delete": cmd_base = "{} openstack baremetal node delete {}" else: self.logger.error("set_ironic_node_state() called with invalid state") exit(1) for node in nodes: cmd = cmd_base.format(env_setup, node) self.tools.run_async_cmd(cmd) time.sleep(.5) # Gathers metrics on the instack env import def import_instackenv(self, filepath, env_setup, conn): results = {} filepath = os.path.abspath(os.path.expandvars(filepath)) cmd = "{} openstack overcloud node import {}".format(env_setup, filepath) start_time = datetime.datetime.utcnow() out = self.tools.run_cmd(cmd) nodes = conn.bare_metal.nodes() for node in nodes: while 'enroll' in node.provision_state: node = conn.bare_metal.get_node(node) time.sleep(1) end_time = datetime.datetime.utcnow() results['import_time'] = (end_time - start_time).total_seconds() if out['stderr'] == '' or 'Error' not in out['stderr']: results['import_status'] = "success" else: results['import_status'] = "failure" self.logger.error("Instackenv import returned 1, printing stderr") self.logger.error(out['stderr']) return results # Introspection with exactly the documented workflow def introspection_bulk(self, timeout, env_setup, conn): results = {} nodes = deque(map(lambda node: node.id, conn.bare_metal.nodes())) cmd = "{} openstack overcloud node introspect --all-manageable".format(env_setup) results['nodes'] = {} for node in conn.bare_metal.nodes(details=True): results['nodes'][node.id] = {} results['nodes'][node.id]["last_error"] = node.last_error results['nodes'][node.id]["driver"] = node.driver results['nodes'][node.id]["driver_info"] = node.driver_info results['nodes'][node.id]["properties"] = node.properties results['nodes'][node.id]["failures"] = 0 results['nodes'][node.id]["state_list"] = None self.tools.run_async_cmd(cmd) out = self.watch_introspecting_nodes(nodes, timeout, conn, results) failed = out[0] results['raw'] = out[1] results["failure_count"] = len(failed) return results def watch_introspecting_nodes(self, nodes, timeout, conn, results): start_time = datetime.datetime.utcnow() times = [] timeout = datetime.timedelta(seconds=timeout) while len(nodes): node = nodes.pop() # rate limit time.sleep(1) node_obj = conn.bare_metal.get_node(node) if node_obj is None: self.logger.error("Can't find node " + node + " Which existed at the start of introspection \ did you delete it manually?") continue # == works here for string comparison because they are in fact # the same object if not changed stored_properties = str(results['nodes'][node_obj.id]["properties"]) node_properties = str(node_obj.properties) changed = not stored_properties == node_properties powered_off = 'off' in node_obj.power_state not_cleaning = 'clean' not in node_obj.provision_state if changed and powered_off and not_cleaning: results['nodes'][node_obj.id]["properties"] = node_obj.properties results['nodes'][node_obj.id]["state_list"] = \ self.state_tracker_extend(node_obj.provision_state, results['nodes'][node_obj.id]["state_list"]) times.append((datetime.datetime.utcnow() - start_time).total_seconds()) elif (datetime.datetime.utcnow() - start_time) > timeout: for node in nodes: node_obj = conn.bare_metal.get_node(node) results['nodes'][node_obj.id]['failures'] += 1 if results['nodes'][node_obj.id]['failures'] > 10: self.logger.error("Node " + node_obj.id + "has failed more than 10 introspections") self.logger.error("This probably means it's misconfigured, exiting") exit(1) break else: results['nodes'][node_obj.id]["state_list"] = \ self.state_tracker_extend(node_obj.provision_state, results['nodes'][node_obj.id]["state_list"]) nodes.appendleft(node) return (nodes, times) # Introspection with robust failure handling def introspection_individual(self, batch_size, timeout, env_setup, conn): nodes = deque(map(lambda node: node.id, conn.bare_metal.nodes())) failure_count = 0 batch = deque() results = {} results['raw'] = [] results['nodes'] = {} for node in conn.bare_metal.nodes(details=True): results['nodes'][node.id] = {} results['nodes'][node.id]["last_error"] = node.last_error results['nodes'][node.id]["driver"] = node.driver results['nodes'][node.id]["driver_info"] = node.driver_info results['nodes'][node.id]["properties"] = node.properties results['nodes'][node.id]["failures"] = 0 results['nodes'][node.id]["state_list"] = None while len(nodes): node = nodes.pop() self.set_ironic_node_state("inspect", env_setup, conn, node) batch.append(node) if len(batch) >= batch_size or (len(nodes) == 0 and len(batch) != 0): out = self.watch_introspecting_nodes(batch, timeout, conn, results) failed = out[0] results['raw'].extend(out[1]) failure_count = failure_count + len(failed) nodes.extend(failed) batch.clear() results["failure_count"] = failure_count return results def delete_stack(self, conn): wait_time = 0 # 30 minute timeout timeout = (60 * 30) while conn.orchestration.find_stack("overcloud") is not None: # Deletes can fail, so we just try again if wait_time % 2000 == 0: conn.orchestration.delete_stack("overcloud") time.sleep(5) wait_time += 5 if wait_time > timeout: self.logger.error("Overcloud stack delete failed") exit(1) def setup_nodes_dict(self, benchmark): nodes = {} for service in benchmark['cloud']: nodes[service['node']] = service['start_scale'] nodes["previous_" + service['node']] = -1 return nodes def update_nodes_dict(self, benchmark, nodes, changed): # update settings for next round, note if changes are made step = benchmark['step'] nodes_added = 0 for service in benchmark['cloud']: node_type = service['node'] end_scale = service['end_scale'] nodes["previous_" + node_type] = nodes[node_type] if nodes[node_type] < end_scale: difference = end_scale - nodes[node_type] allowed_difference = step - nodes_added add = min(difference, allowed_difference) nodes[node_type] += add nodes_added += add changed = True # edge cases, note we must round up otherwise we get # stuck forever if step is 1, this also means we must # violate the step rules to both ensure a valid deployment # and progression if 'control' in nodes and nodes['control'] == 2: nodes['control'] = 3 if 'ceph' in nodes and nodes['ceph'] > 0 and nodes['ceph'] < 3: nodes['ceph'] = 3 return (nodes, changed) def deploy_overcloud(self, start_time, results, ntp_server, conn, env_setup, benchmark): if type(ntp_server) != str: self.logger.error("Please configure an NTP server!") exit(1) cmd = env_setup + "openstack overcloud deploy --templates " for template in benchmark['templates']: cmd = cmd + " " + template + " " for service in benchmark['cloud']: cmd = cmd + " --" + service['node'] + "-scale " + str(results[service['node']]) cmd = cmd + " --timeout=" + str(benchmark['timeout']) + " --ntp-server=" + str(ntp_server) self.logger.debug("Openstack deployment command is " + cmd) results["overcloud_deploy_command"] = cmd deploy_process = self.tools.run_async_cmd(cmd) results['cleaning_failures'] = self.failed_cleaning_count(conn) results['nodes'] = {} while deploy_process.poll() is None: time.sleep(5) try: for node in conn.compute.servers(): time.sleep(1) # look for new instances to add to our metadata if node.name not in results['nodes']: results['nodes'][node.name] = {} create_time = datetime.datetime.strptime(node.created_at, "%Y-%m-%dT%H:%M:%SZ") results['nodes'][node.name]['created_at'] = \ (create_time - start_time).total_seconds() results['nodes'][node.name]['scheduler_hints'] = \ node.scheduler_hints results['nodes'][node.name]['state_list'] = None # try and figure out which baremetal node this # instance is scheduled on if 'bm_node' not in results['nodes'][node.name]: try: bm_node = next(conn.bare_metal.nodes(details=True, instance_id=node.id)) results['nodes'][node.name]['bm_node'] = \ bm_node.id results['nodes'][node.name]['bm_node_properties'] = \ bm_node.properties results['nodes'][node.name]['bm_node_driver'] = \ bm_node.driver results['nodes'][node.name]['bm_last_error'] = \ bm_node.last_error except StopIteration: continue update_time = datetime.datetime.strptime(node.updated_at, "%Y-%m-%dT%H:%M:%SZ") results['nodes'][node.name]['last_updated_at'] = \ (update_time - start_time).total_seconds() results['nodes'][node.name]['final_status'] = node.status bm_node = next(conn.bare_metal.nodes(details=True, instance_id=node.id)) state_list = results['nodes'][node.name]['state_list'] state_list = \ self.state_tracker_extend(bm_node.provision_state, state_list) rentry = results['nodes'][node.name] # Populate this field so it gets indexed every time # even if nodes are never pingable rentry['ping_time'] = -1 condition = 'private' in node.addresses condition = condition and 'pingable_at' not in rentry ping = self.tools.is_pingable(node.addresses['private']) condition = condition and ping if condition: ping_time = datetime.datetime.utcnow() rentry['ping_time'] = (ping_time - start_time).total_seconds() except exceptions.HttpException: self.logger.error("OpenStack bare_metal API is returning NULL") self.logger.error("This sometimes happens during stack creates") return results def elastic_insert(self, results, run, start_time, benchmark, results_dir): scenario_name = benchmark['name'] results['action'] = scenario_name.strip() results['browbeat_rerun'] = run results['timestamp'] = str(start_time).replace(" ","T") results['grafana_url'] = self.grafana.grafana_urls() results['scenario'] = benchmark['name'] results['scenario_config'] = benchmark # Create list of objects for Elastic insertion rather than # dict of dicts. Insert key to not lose name data nodes_data = [] for key in results['nodes']: results['nodes'][key]['name'] = key nodes_data.append(results['nodes'][key]) results['nodes'] = nodes_data results = self.elastic.combine_metadata(results) if not self.elastic.index_result(results, scenario_name, results_dir): self.update_index_failures() def dump_scenario_json(self, results_dir, json, time): with open(results_dir + "/" + str(time).strip() + ".json", 'w') as outfile: outfile.write(json) def setup_scenario(self, benchmark_name, dir_ts): results_dir = self.tools.create_results_dir(self.config['browbeat']['results'], dir_ts, benchmark_name, benchmark_name) if type(results_dir) is bool: self.logger.error("Malformed Config, benchmark names must be unique!") exit(1) self.logger.debug("Created result directory: {}".format(results_dir)) workload = self.__class__.__name__ self.workload_logger(results_dir, workload) return results_dir def introspection_workload(self, benchmark, run, results_dir, env_setup, conn): self.delete_stack(conn) self.wait_for_clean(env_setup, conn) test_start = datetime.datetime.utcnow() self.wait_for_clean(env_setup, conn) self.set_ironic_node_state("delete", env_setup, conn) while len(list(conn.bare_metal.nodes())) > 0: time.sleep(5) import_results = self.import_instackenv(benchmark['instackenv'], env_setup, conn) self.set_ironic_node_state("manage", env_setup, conn) self.set_ironic_node_state("off", env_setup, conn) if benchmark['method'] == "individual": introspection_results = self.introspection_individual(benchmark['batch_size'], benchmark['timeout'], env_setup, conn) elif benchmark['method'] == "bulk": introspection_results = self.introspection_bulk(benchmark['timeout'], env_setup, conn) else: self.logger.error("Malformed YODA configuration for " + benchmark['name']) exit(1) self.get_stats() # Combines dicts but mutates introspection_results rather than # returning a new value import_results.update(introspection_results) results = import_results results['total_nodes'] = len(list(map(lambda node: node.id, conn.bare_metal.nodes()))) # If maximum failure precentage is not set, we set it to 10% if 'max_fail_amnt' not in benchmark: benchmark['max_fail_amnt'] = .10 if results['failure_count'] >= results['total_nodes'] * benchmark['max_fail_amnt']: self.update_fail_tests() else: self.update_pass_tests() self.update_tests() self.dump_scenario_json(results_dir, json.dumps(results), test_start) if self.config['elasticsearch']['enabled']: self.elastic_insert(results, run, test_start, benchmark, results_dir) def overcloud_workload(self, benchmark, run, results_dir, env_setup, conn): if conn.orchestration.find_stack("overcloud") is None: self.set_ironic_node_state("provide", env_setup, conn) self.wait_for_clean(env_setup, conn) keep_stack = benchmark['keep_stack'] results = self.setup_nodes_dict(benchmark) changed = True while changed: changed = False # Can't scale from HA to non HA or back control_change = results['control'] != results['previous_control'] if keep_stack and not control_change: results['method'] = "update" else: self.delete_stack(conn) self.wait_for_clean(env_setup, conn) results['method'] = "new" start_time = datetime.datetime.utcnow() if 'node_pinning' in benchmark: if ostag is None: self.logger.error("ostag is not installed please run") self.logger.error(" pip install git+https://github.com/jkilpatr/ostag") self.logger.error("Pinning not used in this test!") elif benchmark['node_pinning']: ostag.clear_tags(conn) for node in benchmark['cloud']: ostag.mark_nodes("", node['node'], conn, False, "", node['end_scale']) else: ostag.clear_tags(conn) results = self.deploy_overcloud(start_time, results, benchmark['ntp_server'], conn, env_setup, benchmark) results['total_time'] = (datetime.datetime.utcnow() - start_time).total_seconds() results['result'] = str(conn.orchestration.find_stack("overcloud").status) results['result_reason'] = str(conn.orchestration.find_stack("overcloud").status_reason) results['total_nodes'] = len(list(map(lambda node: node.id, conn.bare_metal.nodes()))) if "COMPLETE" in results['result']: self.update_pass_tests() else: self.update_fail_tests() self.update_tests self.get_stats() self.tools.gather_metadata() self.dump_scenario_json(results_dir, json.dumps(results), start_time) if self.config['elasticsearch']['enabled']: self.elastic_insert(results, run, start_time, benchmark, results_dir) out = self.update_nodes_dict(benchmark, results, changed) results = out[0] changed = out[1] def start_workloads(self): """Iterates through all yoda scenarios in browbeat yaml config file""" self.logger.info("Starting YODA workloads") es_ts = datetime.datetime.utcnow() dir_ts = es_ts.strftime("%Y%m%d-%H%M%S") self.logger.debug("Time Stamp (Prefix): {}".format(dir_ts)) stackrc = self.config.get('yoda')['stackrc'] venv = self.config.get('yoda')['venv'] env_setup = "source {}; source {};".format(stackrc,venv) auth_vars = self.tools.load_stackrc(stackrc) if 'OS_AUTH_URL' not in auth_vars: self.logger.error("Please make sure your stackrc is configured correctly") exit(1) auth_args = { 'auth_url': auth_vars['OS_AUTH_URL'], 'project_name': 'admin', 'username': auth_vars['OS_USERNAME'], 'password': auth_vars['OS_PASSWORD'], 'verify': False } requests.packages.urllib3.disable_warnings() conn = connection.Connection(**auth_args) instackenv = self.config.get('yoda')['instackenv'] benchmarks = self.config.get('yoda')['benchmarks'] if (benchmarks is not None and len(benchmarks) > 0): for benchmark in benchmarks: if benchmark['enabled']: results_dir = self.setup_scenario(benchmark['name'], dir_ts) times = benchmark['times'] if 'instackenv' not in benchmark: benchmark['instackenv'] = instackenv for rerun in range(self.config['browbeat']['rerun']): for run in range(times): self.update_tests() if benchmark['type'] == "overcloud": self.overcloud_workload(benchmark, run, results_dir, env_setup, conn) elif benchmark['type'] == "introspection": self.introspection_workload(benchmark, run, results_dir, env_setup, conn) else: self.logger.error("Could not identify YODA workload!") exit(1) self.update_scenarios() else: self.logger.info( "Skipping {} benchmarks enabled: false".format(benchmark['name'])) else: self.logger.error("Config file contains no yoda benchmarks.")