diff --git a/etc/valet/api/app.apache2 b/etc/valet/api/valet_apache.conf similarity index 68% rename from etc/valet/api/app.apache2 rename to etc/valet/api/valet_apache.conf index 185e510..3f84c31 100644 --- a/etc/valet/api/app.apache2 +++ b/etc/valet/api/valet_apache.conf @@ -1,19 +1,18 @@ # valet user/group required (or substitute as needed). # Place in /opt/apache2/sites-available, symlink from # /opt/apache2/sites-enabled, and run 'apachectl restart' as root. -# Optional: Append python-path=PATH_TO_VENV_PACKAGES to WSGIDaemonProcess +# Optional: Append python-path=PATH_TO_VENV_PACKAGES to WSGIDaemonProcess Listen 8090 -ServerName valet - ServerName valet - WSGIDaemonProcess valet user=m04060 group=m04060 threads=20 + + WSGIDaemonProcess valet user=valet_user group=valet_user threads=30 WSGIScriptAlias / /var/www/valet/app.wsgi - SetEnv APACHE_RUN_USER m04060 - SetEnv APACHE_RUN_GROUP m04060 + SetEnv APACHE_RUN_USER valet_user + SetEnv APACHE_RUN_GROUP valet_user WSGIProcessGroup valet @@ -23,7 +22,7 @@ ServerName valet Allow from all - ErrorLog /var/log/valet/api.log + ErrorLog /var/log/apache2/valet/error.log LogLevel warn - CustomLog /var/log/valet/access.log combined + CustomLog /var/log/apache2/valet/access.log combined diff --git a/etc/valet/valet.conf b/etc/valet/valet.conf index 70c827e..16d78b2 100644 --- a/etc/valet/valet.conf +++ b/etc/valet/valet.conf @@ -4,120 +4,23 @@ logging_default_format_string='%(asctime)s.%(msecs)03d [%(levelname)-5.5s] [%(na use_stderr=False log_dir=/var/log/valet -# __ -# /_\ |__| | -# / \ | | -# - -[server] -host = 0.0.0.0 -port = 8090 [messaging] -username = rabbitmq_username -password = rabbitmq_psw -host = rabbitmq_host -port = rabbitmq_port +username=rabbit_userid +password=rabbit_password +host=rabbit_host +port=rabbit_port [identity] -project_name = project_name -username = project_username -password = project_username_password -auth_url = http://keystone_host:5000/v2.0 -# interface = admin - -# _ _ -# | \ |_\ -# |_/ |_/ -# +project_name=identity_project +username=identity_user +password=identity_password +auth_url=auth_uri [music] -host = music_host -port = 8080 -keyspace = valet_keyspace -replication_factor = 3 -# tries = 100 -# interval = 0.1 -# request_table = placement_requests -# response_table = placement_results -# event_table = oslo_messages -# resource_table = resource_status -# app_table = app -# resource_index_table = resource_log_index -# app_index_table = app_log_index -# uuid_table = uuid_map - - -# __ __ __ -# |__ |\ | | | |\ | |__ -# |__ | \| |__T | | \| |__ -# +host=music_host +port=music_port +keyspace=db_keyspace [engine] -# Used for Ostro active/passive selection -priority = 1 - -# Set the location of daemon process id -pid = /var/run/valet/ostro-daemon.pid - -# Set IP of this Ostro -# ip = localhost - -# health_timeout = 10 - -#------------------------------------------------------------------------------------------------------------ -# Management configuration -#------------------------------------------------------------------------------------------------------------ -# Inform the name of datacenter (region name), where Valet/Ostro is deployed. -# datacenter_name = aic - -# Set the naming convention rules. -# Currently, 3 chars of CLLI + region number + 'r' + rack id number + 1 char of node type + node id number. -# For example, pdk15r05c001 indicates the first KVM compute server (i.e., 'c001') in the fifth rack -# (i.e., 'r05') in the fifteenth DeKalb-Peachtree Airport Region (i.e., 'pdk15'). 
- -# Set the number of chars that indicates the region code. The above example, 'pdk' is the region code. -# num_of_region_chars = 3 - -# Set 1 char of rack indicator. This should be 'r'. -# rack_code_list = r - -# Set all of chars, each of which indicates the node type. -# Currently, 'a' = network, 'c' = KVM compute, 'u' = ESXi compute, 'f' = ?, 'o' = operation, 'p' = power, -# 's' = storage. -# node_code_list = a,c,u,f,o,p,s - -# Set trigger time or frequency for checking compute hosting server status (i.e., call Nova) -# Note that currently, compute (Nova) should be triggered first then trigger topology. -# compute_trigger_time = 01:00 -# compute_trigger_frequency = 14400 - -# Set trigger time or frequency for checking datacenter topology -# topology_trigger_time = 01:40 -# topology_trigger_frequency = 28800 - -# Set default overbooking ratios. Note that each compute node can have its own ratios. -# default_cpu_allocation_ratio = 16 -# default_ram_allocation_ratio = 1.5 -# default_disk_allocation_ratio = 1 - -# Set static unused percentages of resources (i.e., standby) that are set aside for applications's workload spikes. -# static_cpu_standby_ratio = 0 -# static_mem_standby_ratio = 0 -# static_local_disk_standby_ratio = 0 - -# Set Ostro execution mode -# mode = [live|sim], sim will let Ostro simulate datacenter, while live will let it handle a real datacenter -# mode = live -# Set the location of simulation configuration file (i.e., ostro_sim.cfg). -# This is used only when the simulation mode -# sim_cfg_loc = /etc/valet/engine/ostro_sim.cfg - -# Inform whether network controller (i.e., Tegu) has been deployed. -# If it does, set its API, Otherwise ignore these parameters -# network_control = no -# network_control_api = 29444/tegu/api - -# Set RPC server ip and port if used. 
Otherwise, ignore these parameters -# rpc_server_ip = localhost -# rpc_server_port = 8002 +priority=engine_priority diff --git a/tools/utils/pecan_populate.sh b/tools/utils/pecan_populate.sh new file mode 100644 index 0000000..7fac570 --- /dev/null +++ b/tools/utils/pecan_populate.sh @@ -0,0 +1,13 @@ +PARSED_VAL=`grep keyspace /etc/valet/valet.conf |awk 'BEGIN{FS="="}{print $NF}'` +KEYSPACE_NAME="$PARSED_VAL" +sed -i -e "s/#VALET_KEYSPACE#/${KEYSPACE_NAME}/g" /opt/app/aic-valet-tools/populate.cql +/usr/bin/cqlsh -f /opt/app/aic-valet-tools/populate.cql +cassandra_cnt=`/usr/bin/cqlsh -e "describe KEYSPACE ${KEYSPACE_NAME};"|grep -c CREATE` +if [ $cassandra_cnt -gt 12 ]; then + exit $cassandra_cnt +fi +/usr/bin/cqlsh -e "drop KEYSPACE ${KEYSPACE_NAME};" +sleep 5 +pecan populate /var/www/valet/config.py >> /var/log/valet/pecan_populate.out +sleep 5 +exit $cassandra_cnt diff --git a/valet/api/v1/controllers/groups.py b/valet/api/v1/controllers/groups.py index 74887b6..0d4912f 100644 --- a/valet/api/v1/controllers/groups.py +++ b/valet/api/v1/controllers/groups.py @@ -74,16 +74,13 @@ def tenant_servers_in_group(tenant_id, group): server_list = server_list_for_group(group) nova = nova_client() for server_id in server_list: - if server_id == "none": - servers.append(server_id) - else: - try: - server = nova.servers.get(server_id) - if server.tenant_id == tenant_id: - servers.append(server_id) - except Exception as ex: # TODO(JD): update DB - api.LOG.error("Instance %s could not be found" % server_id) - api.LOG.error(ex) + try: + server = nova.servers.get(server_id) + if server.tenant_id == tenant_id: + servers.append(server_id) + except Exception as ex: # TODO(JD): update DB + api.LOG.error("Instance %s could not be found" % server_id) + api.LOG.error(ex) if len(servers) > 0: return servers diff --git a/valet/engine/optimizer/app_manager/app_topology_parser.py b/valet/engine/optimizer/app_manager/app_topology_parser.py index 1b8e7f1..1be6091 100755 --- a/valet/engine/optimizer/app_manager/app_topology_parser.py +++ b/valet/engine/optimizer/app_manager/app_topology_parser.py @@ -153,7 +153,7 @@ class Parser(object): vgroups[vgroup.uuid] = vgroup - self.logger.debug("group = " + vgroup.name) + self.logger.debug("group = " + vgroup.name + ", type = " + vgroup.vgroup_type) vgroup_captured = True self._set_vm_links(_elements, vms) diff --git a/valet/engine/optimizer/db_connect/music_handler.py b/valet/engine/optimizer/db_connect/music_handler.py index 5add75a..e7eb03e 100644 --- a/valet/engine/optimizer/db_connect/music_handler.py +++ b/valet/engine/optimizer/db_connect/music_handler.py @@ -42,9 +42,13 @@ class MusicHandler(object): if self.config.mode.startswith("sim"): self.music = Music() elif self.config.mode.startswith("live"): - self.music = Music( - hosts=self.config.db_hosts, - replication_factor=self.config.replication_factor) + self.music = Music(hosts=self.config.db_hosts, + replication_factor=self.config.replication_factor) + if self.config.db_hosts is not None and len(self.config.db_hosts) > 0: + for dbh in self.config.db_hosts: + self.logger.debug("DB: music host = " + dbh) + if self.config.replication_factor is not None: + self.logger.debug("DB: music replication factor = " + str(self.config.replication_factor)) def init_db(self): """Init Database.
diff --git a/valet/engine/optimizer/ostro/constraint_solver.py b/valet/engine/optimizer/ostro/constraint_solver.py index e4f28ec..8d2b527 100755 --- a/valet/engine/optimizer/ostro/constraint_solver.py +++ b/valet/engine/optimizer/ostro/constraint_solver.py @@ -77,9 +77,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done availability_" - "zone constraint " + str(len(candidate_list))) """Host aggregate constraint.""" if isinstance(_n.node, VGroup) or isinstance(_n.node, VM): @@ -90,9 +87,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done host_aggregate " - "constraint " + str(len(candidate_list))) """CPU capacity constraint.""" if isinstance(_n.node, VGroup) or isinstance(_n.node, VM): @@ -102,9 +96,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done cpu capacity " - "constraint " + str(len(candidate_list))) """Memory capacity constraint.""" if isinstance(_n.node, VGroup) or isinstance(_n.node, VM): @@ -114,9 +105,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done memory capacity " - "constraint " + str(len(candidate_list))) """Local disk capacity constraint.""" if isinstance(_n.node, VGroup) or isinstance(_n.node, VM): @@ -126,12 +114,7 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done local disk capacity " - "constraint " + str(len(candidate_list))) - - """Diversity constraint.""" if len(_n.node.diversity_groups) > 0: for _, diversity_id in _n.node.diversity_groups.iteritems(): if diversity_id.split(":")[0] == _level: @@ -154,9 +137,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done diversity_group " - "constraint " + str(len(candidate_list))) """Exclusivity constraint.""" exclusivities = self.get_exclusivities(_n.node.exclusivity_groups, @@ -177,9 +157,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done exclusivity " - "group constraint " + str(len(candidate_list))) else: self._constrain_non_exclusivity(_level, candidate_list) if len(candidate_list) == 0: @@ -187,9 +164,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done non-exclusivity_" - "group constraint " + str(len(candidate_list))) """Affinity constraint.""" affinity_id = _n.get_affinity_id() # level:name, except name == "any" @@ -203,9 +177,6 @@ class ConstraintSolver(object): "node = " + _n.node.name self.logger.error("ConstraintSolver: " + self.status) return candidate_list - else: - self.logger.debug("ConstraintSolver: done affinity_" - "group constraintt " + str(len(candidate_list))) return candidate_list diff --git 
a/valet/engine/optimizer/ostro/ostro.py b/valet/engine/optimizer/ostro/ostro.py index 39e4520..8923053 100755 --- a/valet/engine/optimizer/ostro/ostro.py +++ b/valet/engine/optimizer/ostro/ostro.py @@ -73,6 +73,55 @@ class Ostro(object): self.status = "success" self.end_of_process = False + self.batch_store_trigger = 10 # sec + self.batch_events_count = 1 + + ''' + def run_ostro(self): + self.logger.info("start Ostro ......") + + self.topology.start() + self.compute.start() + self.listener.start() + + self.thread_list.append(self.topology) + self.thread_list.append(self.compute) + self.thread_list.append(self.listener) + + while self.end_of_process is False: + request_list = self.db.get_requests() + if request_list is None: + break + + if len(request_list) > 0: + if self.place_app(request_list) is False: + break + else: + event_list = self.db.get_events() + if event_list is None: + break + + if len(event_list) > 0: + if self.handle_events(event_list) is False: + break + else: + if self.resource.resource_updated is True and \ + (time.time()-self.resource.curr_db_timestamp) >= self.batch_store_trigger: + if self.resource.store_topology_updates() is False: + break + self.resource.resource_updated = False + else: + time.sleep(0.1) + + self.topology.end_of_process = True + self.compute.end_of_process = True + + for t in self.thread_list: + t.join() + + self.logger.info("exit Ostro") + ''' + def run_ostro(self): """Start main engine process.""" """Start topology, compute, and listener processes. Start process of @@ -106,6 +155,12 @@ class Ostro(object): if len(event_list) > 0: if self.handle_events(event_list) is False: break + else: + if self.resource.resource_updated is True and \ + (time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger: + if self.resource.store_topology_updates() is False: + break + self.resource.resource_updated = False self.topology.end_of_process = True self.compute.end_of_process = True @@ -226,7 +281,7 @@ class Ostro(object): self.logger.info("done app placement") end_time = time.time() - self.logger.info("total decision delay of request = " + str(end_time - start_time) + " sec") + self.logger.info("EVAL: total decision delay of request = " + str(end_time - start_time) + " sec") return True @@ -266,6 +321,8 @@ class Ostro(object): return query_result def _get_vms_from_logical_group(self, _group_name): + self.logger.debug("query to see vms of " + _group_name) + vm_list = [] vm_id_list = [] @@ -274,11 +331,18 @@ class Ostro(object): lg.group_type == "DIV": lg_id = lgk.split(":") if lg_id[1] == _group_name: + self.logger.debug("found group in Ostro") vm_id_list = lg.vm_list break + if len(vm_id_list) == 0: + self.logger.debug("group does not exist in Ostro") + for vm_id in vm_id_list: - vm_list.append(vm_id[2]) + if vm_id[2] != "none": # if physical_uuid != 'none' + vm_list.append(vm_id[2]) + else: + self.logger.warn("found pending vms in this group while query") return vm_list @@ -326,7 +390,8 @@ class Ostro(object): """Update resource and app information.""" if len(placement_map) > 0: - self.resource.update_topology() + self.resource.update_topology(store=False) + self.app_handler.add_placement(placement_map, self.resource.current_timestamp) if len(app_topology.exclusion_list_map) > 0 and \ @@ -359,7 +424,7 @@ class Ostro(object): if self._set_flavors() is False: return False - self.resource.update_topology() + self.resource.update_topology(store=False) flavor = self.resource.get_flavor(_vm.flavor) if flavor is None: @@ -386,6 +451,8 @@ class 
Ostro(object): resource_updated = False + events_count = 0 + handled_event_list = [] for e in _event_list: if e.host is not None and e.host != "none": if self._check_host(e.host) is False: @@ -395,7 +462,7 @@ class Ostro(object): if e.method == "build_and_run_instance": # VM is created (from stack) - self.logger.debug("Ostro.handle_events: got build_and_run event") + self.logger.debug("Ostro.handle_events: got build_and_run event for " + e.uuid) if self.db.put_uuid(e) is False: self.data_lock.release() return False @@ -403,6 +470,7 @@ class Ostro(object): elif e.method == "object_action": if e.object_name == 'Instance': # VM became active or deleted + # h_uuid, stack_id orch_id = self.db.get_uuid(e.uuid) if orch_id is None: self.data_lock.release() @@ -410,7 +478,7 @@ class Ostro(object): if e.vm_state == "active": self.logger.debug("Ostro.handle_events: got instance_" - "active event") + "active event for " + e.uuid) vm_info = self.app_handler.get_vm_info(orch_id[1], orch_id[0], e.host) if vm_info is None: self.logger.error("Ostro.handle_events: error " @@ -476,7 +544,7 @@ class Ostro(object): elif e.vm_state == "deleted": self.logger.debug("Ostro.handle_events: got instance_" - "delete event") + "delete event for " + e.uuid) self._remove_vm_from_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk) @@ -500,6 +568,9 @@ class Ostro(object): # Host resource is updated self.logger.debug("Ostro.handle_events: got compute event") + elif e.object_name == 'ComputeNode': + # Host resource is updated + self.logger.debug("EVENT: got compute for " + e.host) # NOTE: what if host is disabled? if self.resource.update_host_resources( e.host, e.status, e.vcpus, e.vcpus_used, e.mem, @@ -516,10 +587,15 @@ class Ostro(object): self.logger.warn("Ostro.handle_events: unknown event " "method = " + e.method) - if resource_updated is True: - self.resource.update_topology() + events_count += 1 + handled_event_list.append(e) + if events_count >= self.batch_events_count: + break - for e in _event_list: + if resource_updated is True: + self.resource.update_topology(store=False) + + for e in handled_event_list: if self.db.delete_event(e.event_id) is False: self.data_lock.release() return False @@ -567,6 +643,8 @@ class Ostro(object): _vcpus, _mem, _local_disk) self.resource.update_host_time(_host_name) + else: + self.logger.warn("vm (" + _uuid + ") is missing while removing") def _remove_vm_from_logical_groups(self, _uuid, _h_uuid, _host_name): host = self.resource.hosts[_host_name] diff --git a/valet/engine/optimizer/ostro/search.py b/valet/engine/optimizer/ostro/search.py index 1d9076e..58c13ec 100755 --- a/valet/engine/optimizer/ostro/search.py +++ b/valet/engine/optimizer/ostro/search.py @@ -323,19 +323,11 @@ class Search(object): avail_resources = _avail_hosts _node_list.sort(key=operator.attrgetter("sort_base"), reverse=True) - self.logger.debug("level = " + _level) - for on in _node_list: - self.logger.debug("node = {}, value = {}".format(on.node.name, - on.sort_base)) while len(_node_list) > 0: n = _node_list.pop(0) best_resource = self._get_best_resource_for_planned(n, _level, avail_resources) if best_resource is not None: - debug_best_resource = best_resource.get_resource_name(_level) - self.logger.debug("Search: best resource = " + - debug_best_resource + " for node = " + - n.node.name) self._deduct_reservation(_level, best_resource, n) self._close_planned_placement(_level, best_resource, n.node) @@ -535,7 +527,7 @@ class Search(object): for lgk, lg in self.resource.logical_groups.iteritems(): if 
lg.status != "enabled": - self.logger.debug("group (" + lg.name + ") disabled") + self.logger.warn("group (" + lg.name + ") disabled") continue lgr = LogicalGroupResource() @@ -793,12 +785,6 @@ class Search(object): _open_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True) - self.logger.debug("Search: the order of open node list in level = " + - _level) - for on in _open_node_list: - self.logger.debug(" node = {}, value = {}".format(on.node.name, - on.sort_base)) - while len(_open_node_list) > 0: n = _open_node_list.pop(0) self.logger.debug("Search: level = " + _level + ", node = " + @@ -809,19 +795,12 @@ class Search(object): success = False break - debug_best_resource = best_resource.get_resource_name(_level) - self.logger.debug("Search: best resource = " + debug_best_resource + - " for node = " + n.node.name) - if n.node not in self.planned_placements.keys(): """for VM or Volume under host level only""" self._deduct_reservation(_level, best_resource, n) """close all types of nodes under any level, but VM or Volume with above host level""" self._close_node_placement(_level, best_resource, n.node) - else: - self.logger.debug("Search: node (" + n.node.name + - ") is already deducted") return success @@ -1838,7 +1817,7 @@ class Search(object): self._rollback_reservation(v) if _v in self.node_placements.keys(): - self.logger.debug("node (" + _v.name + ") rollbacked") + # self.logger.debug("node (" + _v.name + ") rollbacked") chosen_host = self.avail_hosts[self.node_placements[_v].host_name] level = self.node_placements[_v].level @@ -2139,8 +2118,6 @@ class Search(object): def _rollback_node_placement(self, _v): if _v in self.node_placements.keys(): del self.node_placements[_v] - self.logger.debug("Search: node (" + _v.name + - ") removed from placement") if isinstance(_v, VGroup): for _, sg in _v.subvgroups.iteritems(): diff --git a/valet/engine/optimizer/ostro_server/health_checker.py b/valet/engine/optimizer/ostro_server/health_checker.py index 5c978fa..7404d46 100644 --- a/valet/engine/optimizer/ostro_server/health_checker.py +++ b/valet/engine/optimizer/ostro_server/health_checker.py @@ -15,7 +15,7 @@ class HealthCheck(object): rest = None - def __init__(self, hosts=[], port='8080', keyspace='valet'): + def __init__(self, hosts=[]): self.tries = CONF.engine.health_timeout * 2 # default health_timeout=10 self.uuid = str(uuid.uuid4()) @@ -28,15 +28,15 @@ class HealthCheck(object): } self.rest = REST(**kwargs) - def ping(self, my_id): + def ping(self): - engine_alive = False + engine_id = None try: if self._send(): - engine_alive = self._read_response(my_id) + engine_id = self._read_response() finally: self._delete_result() - return engine_alive + return engine_id def _send(self): @@ -55,9 +55,9 @@ class HealthCheck(object): return response.status_code == 204 if response else False - def _read_response(self, my_id): + def _read_response(self): - found = False + engine_id = None path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % { 'keyspace': CONF.music.keyspace, 'table': CONF.music.response_table, @@ -72,52 +72,58 @@ class HealthCheck(object): if response.status_code == 200 and len(response.text) > 3: j = json.loads(response.text) - stack_id = j['row 0']['stack_id'] + if j['row 0']['stack_id'] != self.uuid: + continue + placement = json.loads(j['row 0']['placement']) engine_id = placement['resources']['id'] + break + except Exception as e: + logger.warn("HealthCheck exception in read response - " + str(e)) - if stack_id == self.uuid and engine_id == 
my_id: - found = True - break - except Exception: - pass - - return found + return engine_id def _delete_result(self): - # leave the table clean - delete from requests and responses + # leave a clean table - delete from requests and responses + data = { + "consistencyInfo": {"type": "eventual"} + } + try: path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % { 'keyspace': CONF.music.keyspace, 'table': CONF.music.request_table, 'uid': self.uuid } - data = { - "consistencyInfo": {"type": "eventual"} - } self.rest.request(method='delete', path=path, data=data) + except Exception as e: + logger.warn("HealthCheck exception in delete request - " + str(e)) + try: path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % { 'keyspace': CONF.music.keyspace, 'table': CONF.music.response_table, 'uid': self.uuid } self.rest.request(method='delete', path=path, data=data) - except Exception: - pass + except Exception as e: + logger.warn("HealthCheck exception in delete response - " + str(e)) if __name__ == "__main__": - alive = False + respondent_id = None code = 0 init_engine(default_config_files=['/etc/valet/valet.conf']) logger = get_logger("ostro_daemon") if os.path.exists(CONF.engine.pid): - alive = HealthCheck().ping(CONF.engine.priority) - if alive: - code = CONF.engine.priority - logger.info("HealthCheck - Alive, priority = {}".format(CONF.engine.priority)) + respondent_id = HealthCheck().ping() + if respondent_id == CONF.engine.priority: + code = CONF.engine.priority + logger.info("HealthCheck - Alive, respondent instance id: {}".format(respondent_id)) + else: + logger.warn("HealthCheck - pid file exists, engine {} did not respond in a timely manner (respondent id {})" + .format(CONF.engine.priority, respondent_id)) else: - logger.warn("HealthCheck - Engine is DEAD!") + logger.info("HealthCheck - no pid file, engine is not running!") sys.exit(code) diff --git a/valet/engine/resource_manager/compute_manager.py b/valet/engine/resource_manager/compute_manager.py index 9406d23..07176d7 100755 --- a/valet/engine/resource_manager/compute_manager.py +++ b/valet/engine/resource_manager/compute_manager.py @@ -103,7 +103,7 @@ class ComputeManager(threading.Thread): if triggered_host_updates is True and triggered_flavor_updates is True: self.data_lock.acquire() - update_status = self.resource.update_topology() + update_status = self.resource.update_topology(store=False) self.data_lock.release() if update_status is False: diff --git a/valet/engine/resource_manager/resource.py b/valet/engine/resource_manager/resource.py index 5230cfa..457b08f 100755 --- a/valet/engine/resource_manager/resource.py +++ b/valet/engine/resource_manager/resource.py @@ -53,7 +53,10 @@ class Resource(object): self.flavors = {} self.current_timestamp = 0 - self.last_log_index = 0 + self.curr_db_timestamp = 0 + # self.last_log_index = 0 + + self.resource_updated = False """ resource status aggregation """ self.CPU_avail = 0 @@ -62,33 +65,6 @@ class Resource(object): self.disk_avail = 0 self.nw_bandwidth_avail = 0 - def show_current_logical_groups(self): - for lgk, lg in self.logical_groups.iteritems(): - if lg.status == "enabled": - self.logger.debug("Resource: lg name = " + lgk) - self.logger.debug(" type = " + lg.group_type) - if lg.group_type == "AGGR": - for k in lg.metadata.keys(): - self.logger.debug(" key = " + k) - self.logger.debug(" vms") - for v in lg.vm_list: - self.logger.debug(" orch_id = " + v[0] + " uuid = " + v[2]) - self.logger.debug(" hosts") - for h, v in 
lg.vms_per_host.iteritems(): - self.logger.debug(" host = " + h) - self.logger.debug(" vms = " + str(len(lg.vms_per_host[h]))) - host = None - if h in self.hosts.keys(): - host = self.hosts[h] - elif h in self.host_groups.keys(): - host = self.host_groups[h] - else: - self.logger.error("Resource: lg member not exist") - if host is not None: - self.logger.debug(" status = " + host.status) - if lgk not in host.memberships.keys(): - self.logger.error("membership missing") - def bootstrap_from_db(self, _resource_status): """Return True if bootsrap resource from database successful.""" try: @@ -313,29 +289,28 @@ class Resource(object): self._update_storage_avail() self._update_nw_bandwidth_avail() - # for test - # self.show_current_logical_groups() - if store is False: return True - ct = self._store_topology_updates() - if ct is None: - return False - else: - self.current_timestamp = ct - return True + return self.store_topology_updates() def _update_topology(self): + updated = False for level in LEVELS: for _, host_group in self.host_groups.iteritems(): if host_group.host_type == level and \ host_group.check_availability() is True: if host_group.last_update > self.current_timestamp: self._update_host_group_topology(host_group) + updated = True - if self.datacenter.last_update > self.current_timestamp: + if self.datacenter.last_update >= self.current_timestamp: self._update_datacenter_topology() + updated = True + + if updated is True: + self.current_timestamp = time.time() + self.resource_updated = True def _update_host_group_topology(self, _host_group): _host_group.init_resources() @@ -457,9 +432,8 @@ class Resource(object): # NOTE: peer links? self.nw_bandwidth_avail += min(avail_nw_bandwidth_list) - def _store_topology_updates(self): - last_update_time = self.current_timestamp - + def store_topology_updates(self): + updated = False flavor_updates = {} logical_group_updates = {} storage_updates = {} @@ -468,87 +442,139 @@ class Resource(object): host_group_updates = {} datacenter_update = None - for fk, flavor in self.flavors.iteritems(): - if flavor.last_update > self.current_timestamp: - flavor_updates[fk] = flavor.get_json_info() + self.logger.info("check and store resource status") - last_update_time = flavor.last_update + for fk, flavor in self.flavors.iteritems(): + if flavor.last_update >= self.curr_db_timestamp: + flavor_updates[fk] = flavor.get_json_info() + # self.logger.debug("resource flavor(" + fk + ") stored") + updated = True for lgk, lg in self.logical_groups.iteritems(): - if lg.last_update > self.current_timestamp: + if lg.last_update >= self.curr_db_timestamp: logical_group_updates[lgk] = lg.get_json_info() - - last_update_time = lg.last_update + # self.logger.debug("resource lg(" + lgk + ") stored") + updated = True for shk, storage_host in self.storage_hosts.iteritems(): - if storage_host.last_update > self.current_timestamp or \ - storage_host.last_cap_update > self.current_timestamp: + if storage_host.last_update >= self.curr_db_timestamp or \ + storage_host.last_cap_update >= self.curr_db_timestamp: storage_updates[shk] = storage_host.get_json_info() - - if storage_host.last_update > self.current_time_stamp: - last_update_time = storage_host.last_update - if storage_host.last_cap_update > self.current_timestamp: - last_update_time = storage_host.last_cap_update + # self.logger.debug("resource storage(" + shk + ") stored") + updated = True for sk, s in self.switches.iteritems(): - if s.last_update > self.current_timestamp: + if s.last_update >= self.curr_db_timestamp: 
switch_updates[sk] = s.get_json_info() - - last_update_time = s.last_update + # self.logger.debug("resource switch(" + sk + ") stored") + updated = True for hk, host in self.hosts.iteritems(): if host.last_update > self.current_timestamp or \ host.last_link_update > self.current_timestamp: host_updates[hk] = host.get_json_info() - - if host.last_update > self.current_timestamp: - last_update_time = host.last_update - if host.last_link_update > self.current_timestamp: - last_update_time = host.last_link_update + # self.logger.debug("resource host(" + hk + ") stored") + updated = True for hgk, host_group in self.host_groups.iteritems(): - if host_group.last_update > self.current_timestamp or \ - host_group.last_link_update > self.current_timestamp: + if host_group.last_update >= self.curr_db_timestamp or \ + host_group.last_link_update >= self.curr_db_timestamp: host_group_updates[hgk] = host_group.get_json_info() + # self.logger.debug("resource hgroup(" + hgk + ") stored") + updated = True - if host_group.last_update > self.current_timestamp: - last_update_time = host_group.last_update - if host_group.last_link_update > self.current_timestamp: - last_update_time = host_group.last_link_update - - if self.datacenter.last_update > self.current_timestamp or \ - self.datacenter.last_link_update > self.current_timestamp: + if self.datacenter.last_update >= self.curr_db_timestamp or \ + self.datacenter.last_link_update >= self.curr_db_timestamp: datacenter_update = self.datacenter.get_json_info() + # self.logger.debug("resource datacenter stored") + updated = True - if self.datacenter.last_update > self.current_timestamp: - last_update_time = self.datacenter.last_update - if self.datacenter.last_link_update > self.current_timestamp: - last_update_time = self.datacenter.last_link_update + # (resource_logfile, last_index, mode) = util.get_last_logfile(self.config.resource_log_loc, + # self.config.max_log_size, + # self.config.max_num_of_logs, + # self.datacenter.name, + # self.last_log_index) + # self.last_log_index = last_index - json_logging = {} - json_logging['timestamp'] = last_update_time + # logging = open(self.config.resource_log_loc + resource_logfile, mode) - if len(flavor_updates) > 0: - json_logging['flavors'] = flavor_updates - if len(logical_group_updates) > 0: - json_logging['logical_groups'] = logical_group_updates - if len(storage_updates) > 0: - json_logging['storages'] = storage_updates - if len(switch_updates) > 0: - json_logging['switches'] = switch_updates - if len(host_updates) > 0: - json_logging['hosts'] = host_updates - if len(host_group_updates) > 0: - json_logging['host_groups'] = host_group_updates - if datacenter_update is not None: - json_logging['datacenter'] = datacenter_update + if updated is True: + json_logging = {} + json_logging['timestamp'] = self.curr_db_timestamp - if self.db is not None: - if self.db.update_resource_status(self.datacenter.name, - json_logging) is False: + if len(flavor_updates) > 0: + json_logging['flavors'] = flavor_updates + if len(logical_group_updates) > 0: + json_logging['logical_groups'] = logical_group_updates + if len(storage_updates) > 0: + json_logging['storages'] = storage_updates + if len(switch_updates) > 0: + json_logging['switches'] = switch_updates + if len(host_updates) > 0: + json_logging['hosts'] = host_updates + if len(host_group_updates) > 0: + json_logging['host_groups'] = host_group_updates + if datacenter_update is not None: + json_logging['datacenter'] = datacenter_update + + # logged_data = json.dumps(json_logging) 
+ + # logging.write(logged_data) + # logging.write("\n") + + # logging.close() + + # self.logger.info("log resource status in " + resource_logfile) + + # if self.db is not None: + if self.db.update_resource_status(self.datacenter.name, json_logging) is False: return None - return last_update_time + self.curr_db_timestamp = time.time() + + # for test + # self.show_current_logical_groups() + # self.show_current_host_status() + + return True + + def show_current_logical_groups(self): + for lgk, lg in self.logical_groups.iteritems(): + if lg.status == "enabled": + self.logger.debug("TEST: lg name = " + lgk) + self.logger.debug(" type = " + lg.group_type) + if lg.group_type == "AGGR": + for k in lg.metadata.keys(): + self.logger.debug(" metadata key = " + k) + self.logger.debug(" vms") + for v in lg.vm_list: + self.logger.debug(" orch_id = " + v[0] + " uuid = " + v[2]) + self.logger.debug(" hosts") + for h, v in lg.vms_per_host.iteritems(): + self.logger.debug(" host = " + h) + self.logger.debug(" vms = " + str(len(lg.vms_per_host[h]))) + host = None + if h in self.hosts.keys(): + host = self.hosts[h] + elif h in self.host_groups.keys(): + host = self.host_groups[h] + else: + self.logger.error("TEST: lg member not exist") + if host is not None: + self.logger.debug(" status = " + host.status) + if lgk not in host.memberships.keys(): + self.logger.error("TEST: membership missing") + + def show_current_host_status(self): + for hk, host in self.hosts.iteritems(): + self.logger.debug("TEST: host name = " + hk) + self.logger.debug(" status = " + host.status) + self.logger.debug(" vms = " + str(len(host.vm_list))) + self.logger.debug(" memberships") + for mk in host.memberships.keys(): + self.logger.debug(" " + mk) + if mk not in self.logical_groups.keys(): + self.logger.error("TEST: lg missing") def update_rack_resource(self, _host): """Update resources for rack (host), then update cluster.""" @@ -780,13 +806,16 @@ class Resource(object): """Remove vm by orchestration id from lgs. 
Update host and lgs.""" for lgk in _host.memberships.keys(): if lgk not in self.logical_groups.keys(): + self.logger.warn("logical group (" + lgk + ") missing while removing " + _h_uuid) continue lg = self.logical_groups[lgk] if isinstance(_host, Host): + # remove host from lg's membership if the host has no vms of lg if lg.remove_vm_by_h_uuid(_h_uuid, _host.name) is True: lg.last_update = time.time() + # remove lg from host's membership if lg does not have the host if _host.remove_membership(lg) is True: _host.last_update = time.time() @@ -816,13 +845,16 @@ class Resource(object): """Remove vm by uuid from lgs and update proper host and lgs.""" for lgk in _host.memberships.keys(): if lgk not in self.logical_groups.keys(): + self.logger.warn("logical group (" + lgk + ") missing while removing " + _uuid) continue lg = self.logical_groups[lgk] if isinstance(_host, Host): + # remove host from lg's membership if the host has no vms of lg if lg.remove_vm_by_uuid(_uuid, _host.name) is True: lg.last_update = time.time() + # remove lg from host's membership if lg does not have the host if _host.remove_membership(lg) is True: _host.last_update = time.time() diff --git a/valet/engine/resource_manager/topology_manager.py b/valet/engine/resource_manager/topology_manager.py index eb81fca..5ff3482 100755 --- a/valet/engine/resource_manager/topology_manager.py +++ b/valet/engine/resource_manager/topology_manager.py @@ -97,7 +97,7 @@ class TopologyManager(threading.Thread): if self.set_topology() is True: self.data_lock.acquire() - update_status = self.resource.update_topology() + update_status = self.resource.update_topology(store=False) self.data_lock.release() if update_status is False: diff --git a/valet/ha/__init__.py b/valet/ha/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/valet/ha/ha_valet.cfg b/valet/ha/ha_valet.cfg deleted file mode 100644 index 84afa6c..0000000 --- a/valet/ha/ha_valet.cfg +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# vi: sw=4 ts=4: -# -# --------------------------------------------------------------------------- -# Copyright 2014-2017 AT&T Intellectual Property -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# --------------------------------------------------------------------------- -# -# -# configuration file for havalet - valet processes monitoring tool. -# group name is a logical process name. -# properties MUST have the following properties: -# 'host' -# 'user' -# 'start' -# 'stop' -# 'test' -# -# IMPORTANT: -# "test" - MUST return a value != 0, this value should reflects -# the monitored process priority. -# -# "process's priority" - used for active/stand-by scenarios. -# MUST be greater than 0 - lower number means higher priority. -# e.g. instance which returns '1', as its response to "test" invocation, -# will get precedence over instance which returns '2' as its priority. -# priority 0 means thr process is down. -# -# "stand_by_list" - OPTIONAL property. comma delimited hosts list. -# used on active/stand-by scenarios. 
-# ha_valet will attempt to restart the instance with the lower priority value, -# only if the instance fails to start, ha_valet will try to restart the process -# on the following host in the list. - -[Ostro] -order=5 -priority=1 -host=valet1 -user=m04060 -stand_by_list=valet2 -start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c restart'" -stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c stop'" -test="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/health_checker.py ; exit $?'" - - -[ValetApi] -order=4 -priority=1 -host=valet1 -stand_by_list=valet2 -user=m04060 -start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo service apache2 restart'" -stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo apachectl stop'" -test="exit $(wget -T 1 -t 1 -qO- http://%(host)s:8090/v1 | grep CURRENT | wc -l)" - diff --git a/valet/ha/ha_valet.py b/valet/ha/ha_valet.py deleted file mode 100644 index dceff65..0000000 --- a/valet/ha/ha_valet.py +++ /dev/null @@ -1,555 +0,0 @@ -# -# Copyright 2014-2017 AT&T Intellectual Property -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""HA Valet. - -Mnemonic: ha_valet.py -Abstract: High availability script for valet processes. Starts it's - configured processes, and pings for their availability. If local - instances are not running, then makes the current instances - start. If it finds multiple instances running, then determines - which instance should be shut down based on priorities. - -Author: Amnon Sagiv based on ha_tegu by Kaustubh Joshi - - ------------------------------------------------------------------------------ - - Algorithm - ----------- - The ha_valet script runs on each valet node in a continuous loop checking for - heartbeats from all the valet nodes found in the "stand_by_list" conf property - once every 5 secs (default). A heartbeat is obtained by invoking the - "test_command" conf property. - If exactly one monitored process instance is running, the script does - nothing. If no instance is running, then the local instance is activated after - waiting for 5*priority seconds to let a higher priority valet take over - first. A valet monitored process's priority is determined by its conf. - If the current node's is running and another is found, then a - conflict resolution process is invoked whereby the priorities of both - processes are compared, and the instance with the higher value is deactivated. - - IMPORTANT: test_command must return a value != 0, this is value should reflects - the monitored process priority. 
- """ - -import logging.handlers -import os -from oslo_config import cfg -import socket -import subprocess -import threading -import time -# import argparse -# from oslo_log import log as logging - -CONF = cfg.CONF - -# Directory locations -LOG_DIR = os.getenv('HA_VALET_LOGD', '/var/log/havalet/') -ETC_DIR = os.getenv('HA_VALET_ETCD', '/etc/valet/ha/') -DEFAULT_CONF_FILE = ETC_DIR + 'ha_valet.cfg' - -# Set the maximum logfile size as Byte for time-series log files -max_log_size = 1000000 -# Set the maximum number of time-series log files -max_num_of_logs = 10 - -PRIMARY_SETUP = 1 -RETRY_COUNT = 3 # How many times to retry ping command -CONNECT_TIMEOUT = 3 # Ping timeout -MAX_QUICK_STARTS = 10 # we stop if there are > 10 restart in quick succession -QUICK_RESTART_SEC = 150 # we consider it a quick restart if less than this - -# HA Configuration -HEARTBEAT_SEC = 20 # Heartbeat interval in seconds - - -NAME = 'name' -ORDER = 'order' -HOST = 'host' -USER = 'user' -PRIORITY = 'priority' -START_COMMAND = 'start' -STOP_COMMAND = 'stop' -TEST_COMMAND = 'test' -STAND_BY_LIST = 'stand_by_list' - -ostro_group = cfg.OptGroup(name='Ostro', title='Valet Engine HA conf') -api_group = cfg.OptGroup(name='ValetApi', title='Valet Api HA conf') - -havalet_opts = [ - cfg.IntOpt(PRIORITY, default=1, help='master slave distinguish'), - cfg.IntOpt(ORDER, help='launching order'), - cfg.StrOpt(HOST, help='where the monitored process is running on'), - cfg.StrOpt(USER, help='linux user'), - cfg.ListOpt(STAND_BY_LIST, help='monitored hosts list'), - cfg.StrOpt(START_COMMAND, help='launch command'), - cfg.StrOpt(STOP_COMMAND, help='stop command'), - cfg.StrOpt(TEST_COMMAND, help='test command') -] - -# common.init_conf("havalet.log", grp2opt={api_group: havalet_opts, ostro_group: havalet_opts}) - -CONF.register_group(api_group) -CONF.register_opts(havalet_opts, api_group) - -CONF.register_group(ostro_group) -CONF.register_opts(havalet_opts, ostro_group) - - -def read_conf(): - """Return dictionary of configured processes.""" - return dict([ - ('Ostro', { - NAME: 'Ostro', - ORDER: CONF.Ostro.order, - HOST: CONF.Ostro.host, - USER: CONF.Ostro.user, - PRIORITY: CONF.Ostro.priority, - START_COMMAND: CONF.Ostro.start, - STOP_COMMAND: CONF.Ostro.stop, - TEST_COMMAND: CONF.Ostro.test, - STAND_BY_LIST: CONF.Ostro.stand_by_list - }), - - ('ValetApi', { - NAME: 'ValetApi', - ORDER: CONF.ValetApi.order, - HOST: CONF.ValetApi.host, - USER: CONF.ValetApi.user, - PRIORITY: CONF.ValetApi.priority, - START_COMMAND: CONF.ValetApi.start, - STOP_COMMAND: CONF.ValetApi.stop, - TEST_COMMAND: CONF.ValetApi.test, - STAND_BY_LIST: CONF.ValetApi.stand_by_list - })]) - - -def prepare_log(obj, name): - obj.log = logging.getLogger(name) - obj.log.setLevel(logging.DEBUG) - # logging.register_options(CONF) - # logging.setup(CONF, 'valet') - handler = logging.handlers.RotatingFileHandler(LOG_DIR + name + '.log', - maxBytes=max_log_size, - backupCount=max_num_of_logs) - fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - handler.setFormatter(fmt) - obj.log.addHandler(handler) - - -class HaValetThread (threading.Thread): - - def __init__(self, data, exit_event): - """Initialize HAValetThread.""" - threading.Thread.__init__(self) - self.data = data - self.log = None - - def run(self): - """Main function.""" - prepare_log(self, self.data[NAME]) - self.log.info('HA Valet - ' + self.data[NAME] + - ' Watcher Thread - starting') - - fqdn_list = [] - this_node = socket.getfqdn() - fqdn_list.append(this_node) - - # Read list of standby valet 
nodes and find us - standby_list = self.data.get(STAND_BY_LIST, None) - - while not len(standby_list) is 0: # loop until we find us - self.log.debug("stand by list: " + str(standby_list)) - try: - for fqdn in fqdn_list: - self.log.debug("fqdn_list: " + str(fqdn_list)) - if fqdn in standby_list: - this_node = fqdn - break - standby_list.remove(this_node) - self.data[STAND_BY_LIST] = standby_list - self.log.debug("modified stand by list: " + str(standby_list)) - except ValueError: - self.log.debug("host " + this_node + - " is not in standby list: %s - continue" - % str(standby_list)) - break - - # Loop forever sending pings - self._main_loop(this_node) - self.log.info("HA Valet Watcher Thread - going down!") - - def use(self, param): - pass - - def _main_loop(self, this_node): - """Main heartbeat and liveness check loop. - - :param this_node: host name - :type this_node: string - :return: None - :rtype: - """ - quick_start = 0 # number of restarts close together - last_start = 0 - priority_wait = False - - """ - DO NOT RENAME, DELETE, MOVE the following parameters, - they may be referenced from within the process commands - """ - host = self.data.get(HOST, 'localhost') - user = self.data.get(USER, None) - self.use(user) - my_priority = int(self.data.get(PRIORITY, 1)) - start_command = self.data.get(START_COMMAND, None) - stop_command = self.data.get(STOP_COMMAND, None) - test_command = self.data.get(TEST_COMMAND, None) - standby_list = self.data.get(STAND_BY_LIST) - - while True: - if not priority_wait: - # Normal heartbeat - time.sleep(HEARTBEAT_SEC) - else: - # No valet running. Wait for higher priority valet to activate. - time.sleep(HEARTBEAT_SEC / my_priority) - - self.log.info('checking status here - ' + host + - ', my priority: ' + str(my_priority)) - i_am_active, priority = self._is_active(eval(test_command)) - self.log.info(host + ': host_active = ' + str(i_am_active) + - ', ' + str(priority)) - any_active = i_am_active - self.log.info('any active = ' + str(any_active)) - - # Check for active valets - standby_list_is_empty = not standby_list - if not standby_list_is_empty: - self.log.debug('main loop: standby_list is not empty ' + - str(standby_list)) - for host_in_list in standby_list: - if host_in_list == this_node: - self.log.info('host_in_list is this_node - skipping') - continue - - self.log.info('checking status on - ' + host_in_list) -<<<<<<< HEAD - host = host_in_list - host_active, host_priority = \ - self._is_active(eval(test_command)) - host = self.data.get(HOST, 'localhost') - self.log.info(host_in_list + ' - host_active = ' + - str(host_active) + ', ' + str(host_priority)) -======= - # host = host_in_list - host_active, host_priority = self._is_active(test_command % {'host': host_in_list, 'user': user}) - # host = self.data.get(HOST, 'localhost') - self.log.info(host_in_list + ' - host_active = ' + str(host_active) + ', ' + str(host_priority)) ->>>>>>> da5d947... 
Resolve affinity group with flavor id - # Check for split brain: 2 valets active - if i_am_active and host_active: - self.log.info('found two live instances, ' - 'checking priorities') - should_be_active = self._should_be_active(host_priority, my_priority) - if should_be_active: -<<<<<<< HEAD - self.log.info('deactivate myself, ' + host_in_list + - ' already running') - # Deactivate myself - self._deactivate_process(eval(stop_command)) -======= - self.log.info('deactivate myself, ' + host_in_list + ' already running') - self._deactivate_process(stop_command % {'host': host, 'user': user}) # Deactivate myself ->>>>>>> da5d947... Resolve affinity group with flavor id - i_am_active = False - else: - self.log.info('deactivate ' + self.data[NAME] + - ' on ' + host_in_list + - ', already running here') -<<<<<<< HEAD - host = host_in_list - # Deactivate other valet - self._deactivate_process(eval(stop_command)) - host = self.data.get(HOST, 'localhost') -======= - # host = host_in_list - # Deactivate other valet - self._deactivate_process(stop_command % {'host': host_in_list, 'user': user}) - # host = self.data.get(HOST, 'localhost') ->>>>>>> da5d947... Resolve affinity group with flavor id - - # Track that at-least one valet is active - any_active = any_active or host_active - - # If no active process or I'm primary, then we must try to start one - if not any_active or \ - (not i_am_active and my_priority == PRIMARY_SETUP): - self.log.warn('there is no instance up') - self.log.info('Im primary instance: ' + - str(my_priority is PRIMARY_SETUP)) - if priority_wait or my_priority == PRIMARY_SETUP: - now = int(time.time()) - # quick restart (crash?) - if now - last_start < QUICK_RESTART_SEC: - quick_start += 1 - if quick_start > MAX_QUICK_STARTS: - self.log.critical("too many restarts " - "in quick succession.") - else: - # reset if it's been a while since last restart - quick_start = 0 - - if last_start == 0: - diff = "never by this instance" - else: - diff = "%d seconds ago" % (now - last_start) - - last_start = now - priority_wait = False -<<<<<<< HEAD - if (not i_am_active and my_priority == PRIMARY_SETUP) or \ - (standby_list is not None): - self.log.info('no running instance found, ' - 'starting here; last start %s' % diff) - self._activate_process(start_command, my_priority) - else: - # LIMITATION - supporting only 1 stand by host - host = standby_list[0] - self.log.info('no running instances found, starting ' - 'on %s; last start %s' % (host, diff)) - self._activate_process(start_command, my_priority) - host = self.data.get(HOST, 'localhost') -======= - if (not i_am_active and my_priority == PRIMARY_SETUP) or (standby_list is not None): - self.log.info('no running instance found, starting here; last start %s' % diff) - self._activate_process(start_command % {'host': host, 'user': user}, my_priority) - else: - # host = standby_list[0] # LIMITATION - supporting only 1 stand by host - self.log.info('no running instances found, starting on %s; last start %s' % (host, diff)) - self._activate_process(start_command % {'host': standby_list[0], 'user': user}, my_priority) - # host = self.data.get(HOST, 'localhost') ->>>>>>> da5d947... Resolve affinity group with flavor id - else: - priority_wait = True - else: - self.log.info("status: up and running") - # end loop - - def _should_be_active(self, host_priority, my_priority): - """Should Be Active. - - Returns True if host should be active as opposed to current node, - based on the hosts priorities. 
- - Lower value means higher Priority, - 0 (zero) - invalid priority (e.g. process is down) - - :param host_priority: other host's priority - :type host_priority: int - :param my_priority: my priority - :type my_priority: int - :return: True/False - :rtype: bool - """ - self.log.info('my priority is %d, remote priority is %d' % - (my_priority, host_priority)) - return host_priority < my_priority - - def _is_active(self, call): - """_is_active. - - Return 'True, Priority' if valet is running on host - 'False, None' Otherwise. - """ - # must use no-proxy to avoid proxy servers gumming up the works - for i in xrange(RETRY_COUNT): - try: - self.log.info('ping (retry %d): %s' % (i, call)) - proc = subprocess.Popen(call, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, shell=True) - priority = proc.wait() - if priority == 255: # no route to host - priority = 0 - out, err = proc.communicate() - self.log.debug('out: ' + out + ', err: ' + err) - self.log.info('ping result (should be > 0): %s' - % (str(priority))) - return (priority > 0), priority - except subprocess.CalledProcessError: - self.log.error('ping error: ' + - str(subprocess.CalledProcessError)) - continue - return False, None - - def _deactivate_process(self, deactivate_command): - """Deactivate Process. - - Deactivate valet on a given host. If host is omitted, local - valet is stopped. Returns True if successful, False on error. - """ - try: - # call = "'" + deactivate_command % (PROTO, host, port) + "'" - self.log.info('deactivate_command: ' + deactivate_command) - subprocess.check_call(deactivate_command, shell=True) - return True - except subprocess.CalledProcessError as e: - self.log.error(str(e)) - return False - - def _activate_process(self, activate_command, priority): - """Activate Process. - - Activate valet on a given host. If host is omitted, local - valet is started. Returns True if successful, False on error. - """ - try: - self.log.info('activate_command: ' + activate_command) - subprocess.check_call(activate_command, shell=True) - time.sleep(HEARTBEAT_SEC) # allow some grace period - return True - except subprocess.CalledProcessError as e: - self.log.error(str(e)) - return False - - -class HAValet(object): - """""" - - def __init__(self): - """Init HAValet object.""" - if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR) - self.log = None - -<<<<<<< HEAD - @DeprecationWarning - def _parse_valet_conf_v010(self, conf_file_name=DEFAULT_CONF_FILE, - process=''): - """Parse Valet Conf v010. 
- - This function reads the valet config file and returns configuration - attributes in key/value format - - :param conf_file_name: config file name - :type conf_file_name: string - :param process: specific process name - when not supplied - the module launches all the - processes in the configuration - :type process: string - :return: dictionary of configured monitored processes - :rtype: dict - """ - cdata = {} - section = '' - - try: - with open(conf_file_name, 'r') as valet_conf_file: - for line in valet_conf_file.readlines(): - if line.strip(' \t\r\n')[:1] == '#' or line.__len__() == 2: - continue - elif line.lstrip(' \t\r\n')[:1] == ':': - tokens = line.lstrip(' \t\n\r').split(' ') - section = tokens[0][1:].strip('\n\r\n') - cdata[section] = {} - cdata[section][NAME] = section - else: - if line[:1] == '\n': - continue - tokens = line.split('=') - key = tokens[0].strip(' \t\n\r') - value = tokens[1].strip(' \t\n\r') - cdata[section][key] = value - - # if need to run a specific process - # remove all others - if process is not '': - for key in cdata.keys(): - if key != process: - del cdata[key] - - return cdata - except OSError: - print('unable to open %s file for some reason' % conf_file_name) - return cdata - -======= ->>>>>>> da5d947... Resolve affinity group with flavor id - def _valid_process_conf_data(self, process_data): - """Valid Process conf data. - - verify all mandatory parameters are found in the monitored process - configuration only standby_list is optional - - :param process_data: specific process configuration parameters - :type process_data: dict - :return: are all mandatory parameters are found - :rtype: bool - """ - if (process_data.get(HOST) is not None and - process_data.get(PRIORITY) is not None and - process_data.get(ORDER) is not None and - process_data.get(START_COMMAND) is not None and - process_data.get(STOP_COMMAND) is not None and - process_data.get(TEST_COMMAND) is not None): - return True - else: - return False - - def start(self): - """Start valet HA - Main function.""" - prepare_log(self, 'havalet') - self.log.info('ha_valet v1.1 starting') - - conf_data = read_conf() - - if len(conf_data.keys()) is 0: - self.log.warn('Processes list is empty - leaving.') - return - - threads = [] - exit_event = threading.Event() - - # sort by launching order - proc_sorted = sorted(conf_data.values(), key=lambda d: int(d[ORDER])) - - for proc in proc_sorted: - if self._valid_process_conf_data(proc): - self.log.info('Launching: ' + proc[NAME] + ' - parameters: ' + - str(proc)) - thread = HaValetThread(proc, exit_event) - time.sleep(HEARTBEAT_SEC) - thread.start() - threads.append(thread) - else: - self.log.info(proc[NAME] + - " section is missing mandatory parameter.") - continue - - self.log.info('on air.') - - while not exit_event.isSet(): - time.sleep(HEARTBEAT_SEC) - - # Wait for all threads to complete - for thread in threads: - thread.join() - - self.log.info('ha_valet v1.1 exiting') - -if __name__ == '__main__' or __name__ == "main": - CONF(default_config_files=[DEFAULT_CONF_FILE]) - HAValet().start() diff --git a/valet/ha/ha_valet2.cfg b/valet/ha/ha_valet2.cfg deleted file mode 100644 index d5c6e30..0000000 --- a/valet/ha/ha_valet2.cfg +++ /dev/null @@ -1,66 +0,0 @@ - - -# -# --------------------------------------------------------------------------- -# Copyright 2014-2017 AT&T Intellectual Property -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# --------------------------------------------------------------------------- -# -# -# configuration file for havalet - valet processes monitoring tool. -# group name is a logical process name. -# properties MUST have the following properties: -# 'host' -# 'user' -# 'start' -# 'stop' -# 'test' -# -# IMPORTANT: -# "test" - MUST return a value != 0, this value should reflects -# the monitored process priority. -# -# "process's priority" - used for active/stand-by scenarios. -# MUST be greater than 0 - lower number means higher priority. -# e.g. instance which returns '1', as its response to "test" invocation, -# will get precedence over instance which returns '2' as its priority. -# priority 0 means thr process is down. -# -# "stand_by_list" - OPTIONAL property. comma delimited hosts list. -# used on active/stand-by scenarios. -# ha_valet will attempt to restart the instance with the lower priority value, -# only if the instance fails to start, ha_valet will try to restart the process -# on the following host in the list. - -[Ostro] -order=5 -priority=2 -host=valet2 -user=m04060 -stand_by_list=valet1 -start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c restart'" -stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c stop'" -test="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/health_checker.py ; exit $?'" - - -[ValetApi] -order=4 -priority=2 -host=valet2 -stand_by_list=valet1 -user=m04060 -start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo service apache2 restart' -stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo apachectl stop'" -test="exit $(wget -T 1 -t 1 -qO- http://%(host)s:8090/v1 | grep CURRENT | wc -l)" - diff --git a/valet/tests/tempest/scenario/analyzer.py b/valet/tests/tempest/scenario/analyzer.py index 0ad3dc9..aaee675 100644 --- a/valet/tests/tempest/scenario/analyzer.py +++ b/valet/tests/tempest/scenario/analyzer.py @@ -46,6 +46,7 @@ class Analyzer(object): self.log.log_info("Starting to check instances location") result = True + self.init_servers_list() self.init_resources(resources) ins_group = self.init_instances_for_group(resources) @@ -126,7 +127,6 @@ class Analyzer(object): """Return host of instance with matching name.""" hosts = [] - self.init_servers_list() self.log.log_debug("host - instance dictionary is: %s" % self.host_instance_dict) for res in res_name: @@ -180,6 +180,7 @@ class Analyzer(object): exclusivity_group_hosts = self.get_exclusivity_group_hosts() self.log.log_debug("exclusivity group hosts are: %s " % exclusivity_group_hosts) + self.log.log_debug("instances on host are: %s " % self.instances_on_host) # instances - all the instances on the exclusivity group hosts for host in exclusivity_group_hosts: diff --git a/valet/tests/tempest/scenario/general_logger.py b/valet/tests/tempest/scenario/general_logger.py index e7466e1..89dc2d1 100644 --- 
a/valet/tests/tempest/scenario/general_logger.py +++ b/valet/tests/tempest/scenario/general_logger.py @@ -16,9 +16,7 @@ """General Logger.""" from oslo_log import log as logging -from tempest import config -CONF = config.CONF LOG = logging.getLogger(__name__) COLORS = \ diff --git a/valet/tests/tempest/scenario/scenario_base.py b/valet/tests/tempest/scenario/scenario_base.py index 78301a3..782c9ef 100644 --- a/valet/tests/tempest/scenario/scenario_base.py +++ b/valet/tests/tempest/scenario/scenario_base.py @@ -138,6 +138,7 @@ class ScenarioTestCase(test.BaseTestCase): except Exception: self.log.log_error("Failed to create valet group", traceback.format_exc()) + raise def get_env_file(self, template): try: @@ -158,15 +159,24 @@ class ScenarioTestCase(test.BaseTestCase): self.log.log_error("Failed to load environment file", traceback.format_exc()) def _delete_group(self, group_id): - self.valet_client.delete_all_members(group_id) - self.valet_client.delete_group(group_id) + try: + self.valet_client.delete_all_members(group_id) + self.valet_client.delete_group(group_id) + except Exception: + self.log.log_error("Failed to delete group", traceback.format_exc()) + raise def delete_stack(self): """Use heat client to delete stack.""" - self.heat_client.delete_stack(self.stack_identifier) - self.heat_client.wait_for_stack_status( - self.stack_identifier, "DELETE_COMPLETE", - failure_pattern='^.*DELETE_FAILED$') + try: + self.heat_client.delete_stack(self.stack_identifier) + self.heat_client.wait_for_stack_status( + self.stack_identifier, "DELETE_COMPLETE", + failure_pattern='^.*DELETE_FAILED$') + + except Exception: + self.log.log_error("Failed to delete stack", traceback.format_exc()) + raise def show_stack(self, stack_id): """Return show stack with given id from heat client.""" diff --git a/valet/tests/unit/engine/test_ping.py b/valet/tests/unit/engine/test_ping.py index 635436f..877a981 100644 --- a/valet/tests/unit/engine/test_ping.py +++ b/valet/tests/unit/engine/test_ping.py @@ -21,7 +21,7 @@ class TestHealthCheck(Base): mock_send.return_value = True mock_read.return_value = True - self.validate_test(self.pingger.ping(1)) + self.validate_test(self.pingger.ping() == 1) @mock.patch.object(HealthCheck, '_send') @mock.patch.object(HealthCheck, '_read_response') @@ -29,7 +29,7 @@ class TestHealthCheck(Base): mock_send.return_value = False mock_read.return_value = True - self.validate_test(not self.pingger.ping(1)) + self.validate_test(self.pingger.ping() is None) @mock.patch.object(HealthCheck, '_send') @mock.patch.object(HealthCheck, '_read_response') @@ -37,7 +37,7 @@ class TestHealthCheck(Base): mock_send.return_value = True mock_read.return_value = False - self.validate_test(not self.pingger.ping(1)) + self.validate_test(not self.pingger.ping()) def test_send(self): self.pingger.rest.request.return_value.status_code = 204 @@ -48,29 +48,28 @@ class TestHealthCheck(Base): self.validate_test(not self.pingger._send()) def test_read_response(self): - id = 1 + mid = 1 self.pingger.rest.request.return_value.status_code = 200 - self.pingger.rest.request.return_value.text = json % (id, self.pingger.uuid) - self.validate_test(self.pingger._read_response(id)) + self.pingger.rest.request.return_value.text = json % (mid, self.pingger.uuid) + self.validate_test(self.pingger._read_response()) def test_read_response_from_other_engine(self): my_id = 1 - id = 2 self.pingger.rest.request.return_value.status_code = 200 - self.pingger.rest.request.return_value.text = json % (id, self.pingger.uuid) - 
self.validate_test(not self.pingger._read_response(my_id)) + self.pingger.rest.request.return_value.text = json % (my_id, self.pingger.uuid) + self.validate_test(not self.pingger._read_response() == 2) def test_read_response_unhappy_wrong_res_code(self): self.pingger.rest.request.return_value.status_code = 204 self.pingger.rest.request.return_value.text = self.pingger.uuid - self.validate_test(not self.pingger._read_response(1)) + self.validate_test(not self.pingger._read_response()) def test_read_response_unhappy_wrong_body(self): self.pingger.rest.request.return_value.status_code = 200 self.pingger.rest.request.return_value.text = "" - self.validate_test(not self.pingger._read_response(1)) + self.validate_test(not self.pingger._read_response()) def test_read_response_unhappy_wrong_both(self): self.pingger.rest.request.return_value.status_code = 204 self.pingger.rest.request.return_value.text = "" - self.validate_test(not self.pingger._read_response(1)) + self.validate_test(not self.pingger._read_response())
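
Note on the test_ping.py hunks above: the updated assertions imply that HealthCheck.ping() no longer takes an expected engine id and instead returns the id of the engine that answered (or None when the ping could not be sent), and that _read_response() now extracts that id from the response body itself rather than comparing against a caller-supplied value. The sketch below is only a reconstruction of that contract as implied by the tests; the real request payloads, table names, and the field names in the JSON body ('id', 'uuid') are assumptions, not Valet's actual implementation.

    # Sketch only -- reconstructs the behaviour implied by the updated unit
    # tests; rest.request() arguments and the response field names are assumed.
    import json as jsonlib


    class HealthCheck(object):

        def __init__(self, uuid, rest):
            self.uuid = uuid   # marker written with the ping so the reply can be recognised
            self.rest = rest   # REST client used to reach the Music store

        def ping(self):
            """Return the id of the engine that answered, or None if the ping failed."""
            if not self._send():
                return None              # ping request could not be written
            return self._read_response()

        def _send(self):
            # write the ping request; a 204 means it was accepted
            resp = self.rest.request()   # placeholder call, real arguments omitted
            return resp.status_code == 204

        def _read_response(self):
            # only a 200 with a non-empty body that echoes our uuid counts;
            # the engine id carried in that body is returned to the caller
            resp = self.rest.request()   # placeholder call, real arguments omitted
            if resp.status_code != 200 or not resp.text:
                return None
            try:
                body = jsonlib.loads(resp.text)
            except ValueError:
                return None
            if body.get('uuid') != self.uuid:   # assumed field name
                return None
            return body.get('id')               # assumed field name

Under this reading, test_read_response_from_other_engine passes because the body echoes our uuid with engine id 1, so _read_response() returns 1 rather than 2, and the wrong-status/empty-body cases fall through to None.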