Lazy resource status update to db

Client not catching exceptions
Engine Health Check refined

Change-Id: I1d5a5d385471815bfe0c2beb50bcf9ac57d9cae8
This commit is contained in:
Gueyoung Jung 2017-01-28 23:09:23 -05:00 committed by Omar Rivera
parent fc0d8fc3fc
commit 8083445865
21 changed files with 329 additions and 1028 deletions

View File

@ -1,19 +1,18 @@
# valet user/group required (or substitute as needed).
# Place in /opt/apache2/sites-available, symlink from
# /opt/apache2/sites-enabled, and run 'apachectl restart' as root.
# Optional: Append python-path=PATH_TO_VENV_PACKAGES to WSGIDaemonProcess
# Optional: Append python-path=PATH_TO_VENV_PACKAGES to WSGIDaemonProcess
Listen 8090
ServerName valet
<VirtualHost *:8090>
ServerName valet
WSGIDaemonProcess valet user=m04060 group=m04060 threads=20
WSGIDaemonProcess valet user=valet_user group=valet_user threads=30
WSGIScriptAlias / /var/www/valet/app.wsgi
SetEnv APACHE_RUN_USER m04060
SetEnv APACHE_RUN_GROUP m04060
SetEnv APACHE_RUN_USER valet_user
SetEnv APACHE_RUN_GROUP valet_user
WSGIProcessGroup valet
<Directory /var/www/valet/>
@ -23,7 +22,7 @@ ServerName valet
Allow from all
</Directory>
ErrorLog /var/log/valet/api.log
ErrorLog /var/log/apache2/valet/error.log
LogLevel warn
CustomLog /var/log/valet/access.log combined
CustomLog /var/log/apache2/valet/access.log combined
</VirtualHost>

View File

@ -4,120 +4,23 @@ logging_default_format_string='%(asctime)s.%(msecs)03d [%(levelname)-5.5s] [%(na
use_stderr=False
log_dir=/var/log/valet
# __
# /_\ |__| |
# / \ | |
#
[server]
host = 0.0.0.0
port = 8090
[messaging]
username = rabbitmq_username
password = rabbitmq_psw
host = rabbitmq_host
port = rabbitmq_port
username=rabbit_userid
password=rabbit_password
host=rabbit_host
port=rabbit_port
[identity]
project_name = project_name
username = project_username
password = project_username_password
auth_url = http://keystone_host:5000/v2.0
# interface = admin
# _ _
# | \ |_\
# |_/ |_/
#
project_name=identity_project
username=identity_user
password=identity_password
auth_url=auth_uri
[music]
host = music_host
port = 8080
keyspace = valet_keyspace
replication_factor = 3
# tries = 100
# interval = 0.1
# request_table = placement_requests
# response_table = placement_results
# event_table = oslo_messages
# resource_table = resource_status
# app_table = app
# resource_index_table = resource_log_index
# app_index_table = app_log_index
# uuid_table = uuid_map
# __ __ __
# |__ |\ | | | |\ | |__
# |__ | \| |__T | | \| |__
#
host=music_host
port=music_port
keyspace=db_keyspace
[engine]
# Used for Ostro active/passive selection
priority = 1
# Set the location of daemon process id
pid = /var/run/valet/ostro-daemon.pid
# Set IP of this Ostro
# ip = localhost
# health_timeout = 10
#------------------------------------------------------------------------------------------------------------
# Management configuration
#------------------------------------------------------------------------------------------------------------
# Inform the name of datacenter (region name), where Valet/Ostro is deployed.
# datacenter_name = aic
# Set the naming convention rules.
# Currently, 3 chars of CLLI + region number + 'r' + rack id number + 1 char of node type + node id number.
# For example, pdk15r05c001 indicates the first KVM compute server (i.e., 'c001') in the fifth rack
# (i.e., 'r05') in the fifteenth DeKalb-Peachtree Airport Region (i.e., 'pdk15').
# Set the number of chars that indicates the region code. The above example, 'pdk' is the region code.
# num_of_region_chars = 3
# Set 1 char of rack indicator. This should be 'r'.
# rack_code_list = r
# Set all of chars, each of which indicates the node type.
# Currently, 'a' = network, 'c' = KVM compute, 'u' = ESXi compute, 'f' = ?, 'o' = operation, 'p' = power,
# 's' = storage.
# node_code_list = a,c,u,f,o,p,s
# Set trigger time or frequency for checking compute hosting server status (i.e., call Nova)
# Note that currently, compute (Nova) should be triggered first then trigger topology.
# compute_trigger_time = 01:00
# compute_trigger_frequency = 14400
# Set trigger time or frequency for checking datacenter topology
# topology_trigger_time = 01:40
# topology_trigger_frequency = 28800
# Set default overbooking ratios. Note that each compute node can have its own ratios.
# default_cpu_allocation_ratio = 16
# default_ram_allocation_ratio = 1.5
# default_disk_allocation_ratio = 1
# Set static unused percentages of resources (i.e., standby) that are set aside for applications' workload spikes.
# static_cpu_standby_ratio = 0
# static_mem_standby_ratio = 0
# static_local_disk_standby_ratio = 0
# Set Ostro execution mode
# mode = [live|sim], sim will let Ostro simulate datacenter, while live will let it handle a real datacenter
# mode = live
# Set the location of simulation configuration file (i.e., ostro_sim.cfg).
# This is used only when running in simulation mode.
# sim_cfg_loc = /etc/valet/engine/ostro_sim.cfg
# Inform whether network controller (i.e., Tegu) has been deployed.
# If it has, set its API; otherwise ignore these parameters.
# network_control = no
# network_control_api = 29444/tegu/api
# Set RPC server ip and port if used. Otherwise, ignore these parameters
# rpc_server_ip = localhost
# rpc_server_port = 8002
priority=engine_priority

View File

@ -0,0 +1,13 @@
PARSED_VAL=`grep keyspace /etc/valet/valet.conf |awk 'BEGIN{FS="="}{print $NF}'`
KEYSPACE_NAME="$PARSED_VAL"
sed -i -e "s/#VALET_KEYSPACE#/${KEYSPACE_NAME}/g" /opt/app/aic-valet-tools/populate.cql
/usr/bin/cqlsh -f /opt/app/aic-valet-tools/populate.cql
cassandra_cnt=`/usr/bin/cqlsh -e "describe KEYSPACE ${KEYSPACE_NAME};"|grep -c CREATE`
if [ $cassandra_cnt -gt 12 ]; then
exit $cassandra_cnt
fi
/usr/bin/cqlsh -e "drop KEYSPACE ${KEYSPACE_NAME};"
sleep 5
pecan populate /var/www/valet/config.py >> /var/log/valet/pecan_populate.out
sleep 5
exit $cassandra_cnt

View File

@ -74,16 +74,13 @@ def tenant_servers_in_group(tenant_id, group):
server_list = server_list_for_group(group)
nova = nova_client()
for server_id in server_list:
if server_id == "none":
servers.append(server_id)
else:
try:
server = nova.servers.get(server_id)
if server.tenant_id == tenant_id:
servers.append(server_id)
except Exception as ex: # TODO(JD): update DB
api.LOG.error("Instance %s could not be found" % server_id)
api.LOG.error(ex)
try:
server = nova.servers.get(server_id)
if server.tenant_id == tenant_id:
servers.append(server_id)
except Exception as ex: # TODO(JD): update DB
api.LOG.error("Instance %s could not be found" % server_id)
api.LOG.error(ex)
if len(servers) > 0:
return servers

View File

@ -153,7 +153,7 @@ class Parser(object):
vgroups[vgroup.uuid] = vgroup
self.logger.debug("group = " + vgroup.name)
self.logger.debug("group = " + vgroup.name + vgroup.name + ", type = " + vgroup.vgroup_type)
vgroup_captured = True
self._set_vm_links(_elements, vms)

View File

@ -42,9 +42,13 @@ class MusicHandler(object):
if self.config.mode.startswith("sim"):
self.music = Music()
elif self.config.mode.startswith("live"):
self.music = Music(
hosts=self.config.db_hosts,
replication_factor=self.config.replication_factor)
self.music = Music(hosts=self.config.db_hosts,
replication_factor=self.config.replication_factor)
if self.config.db_hosts is not None and len(self.config.db_hosts) > 0:
for dbh in self.config.db_hosts:
self.logger.debug("DB: music host = " + dbh)
if self.config.replication_factor is not None:
self.logger.debug("DB: music replication factor = " + str(self.config.replication_factor))
def init_db(self):
"""Init Database.

View File

@ -77,9 +77,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done availability_"
"zone constraint " + str(len(candidate_list)))
"""Host aggregate constraint."""
if isinstance(_n.node, VGroup) or isinstance(_n.node, VM):
@ -90,9 +87,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done host_aggregate "
"constraint " + str(len(candidate_list)))
"""CPU capacity constraint."""
if isinstance(_n.node, VGroup) or isinstance(_n.node, VM):
@ -102,9 +96,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done cpu capacity "
"constraint " + str(len(candidate_list)))
"""Memory capacity constraint."""
if isinstance(_n.node, VGroup) or isinstance(_n.node, VM):
@ -114,9 +105,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done memory capacity "
"constraint " + str(len(candidate_list)))
"""Local disk capacity constraint."""
if isinstance(_n.node, VGroup) or isinstance(_n.node, VM):
@ -126,12 +114,7 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done local disk capacity "
"constraint " + str(len(candidate_list)))
"""Diversity constraint."""
if len(_n.node.diversity_groups) > 0:
for _, diversity_id in _n.node.diversity_groups.iteritems():
if diversity_id.split(":")[0] == _level:
@ -154,9 +137,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done diversity_group "
"constraint " + str(len(candidate_list)))
"""Exclusivity constraint."""
exclusivities = self.get_exclusivities(_n.node.exclusivity_groups,
@ -177,9 +157,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done exclusivity "
"group constraint " + str(len(candidate_list)))
else:
self._constrain_non_exclusivity(_level, candidate_list)
if len(candidate_list) == 0:
@ -187,9 +164,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done non-exclusivity_"
"group constraint " + str(len(candidate_list)))
"""Affinity constraint."""
affinity_id = _n.get_affinity_id() # level:name, except name == "any"
@ -203,9 +177,6 @@ class ConstraintSolver(object):
"node = " + _n.node.name
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
else:
self.logger.debug("ConstraintSolver: done affinity_"
"group constraintt " + str(len(candidate_list)))
return candidate_list

View File

@ -73,6 +73,55 @@ class Ostro(object):
self.status = "success"
self.end_of_process = False
self.batch_store_trigger = 10 # sec
self.batch_events_count = 1
'''
def run_ostro(self):
self.logger.info("start Ostro ......")
self.topology.start()
self.compute.start()
self.listener.start()
self.thread_list.append(self.topology)
self.thread_list.append(self.compute)
self.thread_list.append(self.listener)
while self.end_of_process is False:
request_list = self.db.get_requests()
if request_list is None:
break
if len(request_list) > 0:
if self.place_app(request_list) is False:
break
else:
event_list = self.db.get_events()
if event_list is None:
break
if len(event_list) > 0:
if self.handle_events(event_list) is False:
break
else:
if self.resource.resource_updated is True and \
(time.time()-self.resource.curr_db_timestamp) >= self.batch_store_trigger:
if self.resource.store_topology_updates() is False:
break
self.resource.resource_updated = False
else:
time.sleep(0.1)
self.topology.end_of_process = True
self.compute.end_of_process = True
for t in self.thread_list:
t.join()
self.logger.info("exit Ostro")
'''
def run_ostro(self):
"""Start main engine process."""
"""Start topology, compute, and listener processes. Start process of
@ -106,6 +155,12 @@ class Ostro(object):
if len(event_list) > 0:
if self.handle_events(event_list) is False:
break
else:
if self.resource.resource_updated is True and \
(time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger:
if self.resource.store_topology_updates() is False:
break
self.resource.resource_updated = False
self.topology.end_of_process = True
self.compute.end_of_process = True
@ -226,7 +281,7 @@ class Ostro(object):
self.logger.info("done app placement")
end_time = time.time()
self.logger.info("total decision delay of request = " + str(end_time - start_time) + " sec")
self.logger.info("EVAL: total decision delay of request = " + str(end_time - start_time) + " sec")
return True
@ -266,6 +321,8 @@ class Ostro(object):
return query_result
def _get_vms_from_logical_group(self, _group_name):
self.logger.debug("query to see vms of " + _group_name)
vm_list = []
vm_id_list = []
@ -274,11 +331,18 @@ class Ostro(object):
lg.group_type == "DIV":
lg_id = lgk.split(":")
if lg_id[1] == _group_name:
self.logger.debug("found group in Ostro")
vm_id_list = lg.vm_list
break
if len(vm_id_list) == 0:
self.logger.debug("group does not exist in Ostro")
for vm_id in vm_id_list:
vm_list.append(vm_id[2])
if vm_id[2] != "none": # if physical_uuid != 'none'
vm_list.append(vm_id[2])
else:
self.logger.warn("found pending vms in this group while query")
return vm_list
@ -326,7 +390,8 @@ class Ostro(object):
"""Update resource and app information."""
if len(placement_map) > 0:
self.resource.update_topology()
self.resource.update_topology(store=False)
self.app_handler.add_placement(placement_map,
self.resource.current_timestamp)
if len(app_topology.exclusion_list_map) > 0 and \
@ -359,7 +424,7 @@ class Ostro(object):
if self._set_flavors() is False:
return False
self.resource.update_topology()
self.resource.update_topology(store=False)
flavor = self.resource.get_flavor(_vm.flavor)
if flavor is None:
@ -386,6 +451,8 @@ class Ostro(object):
resource_updated = False
events_count = 0
handled_event_list = []
for e in _event_list:
if e.host is not None and e.host != "none":
if self._check_host(e.host) is False:
@ -395,7 +462,7 @@ class Ostro(object):
if e.method == "build_and_run_instance":
# VM is created (from stack)
self.logger.debug("Ostro.handle_events: got build_and_run event")
self.logger.debug("Ostro.handle_events: got build_and_run event for " + e.uuid)
if self.db.put_uuid(e) is False:
self.data_lock.release()
return False
@ -403,6 +470,7 @@ class Ostro(object):
elif e.method == "object_action":
if e.object_name == 'Instance':
# VM became active or deleted
# h_uuid, stack_id
orch_id = self.db.get_uuid(e.uuid)
if orch_id is None:
self.data_lock.release()
@ -410,7 +478,7 @@ class Ostro(object):
if e.vm_state == "active":
self.logger.debug("Ostro.handle_events: got instance_"
"active event")
"active event for " + e.uuid)
vm_info = self.app_handler.get_vm_info(orch_id[1], orch_id[0], e.host)
if vm_info is None:
self.logger.error("Ostro.handle_events: error "
@ -476,7 +544,7 @@ class Ostro(object):
elif e.vm_state == "deleted":
self.logger.debug("Ostro.handle_events: got instance_"
"delete event")
"delete event for " + e.uuid)
self._remove_vm_from_host(e.uuid, orch_id[0], e.host,
e.vcpus, e.mem, e.local_disk)
@ -500,6 +568,9 @@ class Ostro(object):
# Host resource is updated
self.logger.debug("Ostro.handle_events: got compute event")
elif e.object_name == 'ComputeNode':
# Host resource is updated
self.logger.debug("EVENT: got compute for " + e.host)
# NOTE: what if host is disabled?
if self.resource.update_host_resources(
e.host, e.status, e.vcpus, e.vcpus_used, e.mem,
@ -516,10 +587,15 @@ class Ostro(object):
self.logger.warn("Ostro.handle_events: unknown event "
"method = " + e.method)
if resource_updated is True:
self.resource.update_topology()
events_count += 1
handled_event_list.append(e)
if events_count >= self.batch_events_count:
break
for e in _event_list:
if resource_updated is True:
self.resource.update_topology(store=False)
for e in handled_event_list:
if self.db.delete_event(e.event_id) is False:
self.data_lock.release()
return False
@ -567,6 +643,8 @@ class Ostro(object):
_vcpus, _mem,
_local_disk)
self.resource.update_host_time(_host_name)
else:
self.logger.warn("vm (" + _uuid + ") is missing while removing")
def _remove_vm_from_logical_groups(self, _uuid, _h_uuid, _host_name):
host = self.resource.hosts[_host_name]

View File

@ -323,19 +323,11 @@ class Search(object):
avail_resources = _avail_hosts
_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True)
self.logger.debug("level = " + _level)
for on in _node_list:
self.logger.debug("node = {}, value = {}".format(on.node.name,
on.sort_base))
while len(_node_list) > 0:
n = _node_list.pop(0)
best_resource = self._get_best_resource_for_planned(n, _level,
avail_resources)
if best_resource is not None:
debug_best_resource = best_resource.get_resource_name(_level)
self.logger.debug("Search: best resource = " +
debug_best_resource + " for node = " +
n.node.name)
self._deduct_reservation(_level, best_resource, n)
self._close_planned_placement(_level, best_resource, n.node)
@ -535,7 +527,7 @@ class Search(object):
for lgk, lg in self.resource.logical_groups.iteritems():
if lg.status != "enabled":
self.logger.debug("group (" + lg.name + ") disabled")
self.logger.warn("group (" + lg.name + ") disabled")
continue
lgr = LogicalGroupResource()
@ -793,12 +785,6 @@ class Search(object):
_open_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True)
self.logger.debug("Search: the order of open node list in level = " +
_level)
for on in _open_node_list:
self.logger.debug(" node = {}, value = {}".format(on.node.name,
on.sort_base))
while len(_open_node_list) > 0:
n = _open_node_list.pop(0)
self.logger.debug("Search: level = " + _level + ", node = " +
@ -809,19 +795,12 @@ class Search(object):
success = False
break
debug_best_resource = best_resource.get_resource_name(_level)
self.logger.debug("Search: best resource = " + debug_best_resource +
" for node = " + n.node.name)
if n.node not in self.planned_placements.keys():
"""for VM or Volume under host level only"""
self._deduct_reservation(_level, best_resource, n)
"""close all types of nodes under any level, but VM or Volume
with above host level"""
self._close_node_placement(_level, best_resource, n.node)
else:
self.logger.debug("Search: node (" + n.node.name +
") is already deducted")
return success
@ -1838,7 +1817,7 @@ class Search(object):
self._rollback_reservation(v)
if _v in self.node_placements.keys():
self.logger.debug("node (" + _v.name + ") rollbacked")
# self.logger.debug("node (" + _v.name + ") rollbacked")
chosen_host = self.avail_hosts[self.node_placements[_v].host_name]
level = self.node_placements[_v].level
@ -2139,8 +2118,6 @@ class Search(object):
def _rollback_node_placement(self, _v):
if _v in self.node_placements.keys():
del self.node_placements[_v]
self.logger.debug("Search: node (" + _v.name +
") removed from placement")
if isinstance(_v, VGroup):
for _, sg in _v.subvgroups.iteritems():

View File

@ -15,7 +15,7 @@ class HealthCheck(object):
rest = None
def __init__(self, hosts=[], port='8080', keyspace='valet'):
def __init__(self, hosts=[]):
self.tries = CONF.engine.health_timeout * 2 # default health_timeout=10
self.uuid = str(uuid.uuid4())
@ -28,15 +28,15 @@ class HealthCheck(object):
}
self.rest = REST(**kwargs)
def ping(self, my_id):
def ping(self):
engine_alive = False
engine_id = None
try:
if self._send():
engine_alive = self._read_response(my_id)
engine_id = self._read_response()
finally:
self._delete_result()
return engine_alive
return engine_id
def _send(self):
@ -55,9 +55,9 @@ class HealthCheck(object):
return response.status_code == 204 if response else False
def _read_response(self, my_id):
def _read_response(self):
found = False
engine_id = None
path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % {
'keyspace': CONF.music.keyspace,
'table': CONF.music.response_table,
@ -72,52 +72,58 @@ class HealthCheck(object):
if response.status_code == 200 and len(response.text) > 3:
j = json.loads(response.text)
stack_id = j['row 0']['stack_id']
if j['row 0']['stack_id'] != self.uuid:
continue
placement = json.loads(j['row 0']['placement'])
engine_id = placement['resources']['id']
break
except Exception as e:
logger.warn("HealthCheck exception in read response - " + str(e))
if stack_id == self.uuid and engine_id == my_id:
found = True
break
except Exception:
pass
return found
return engine_id
def _delete_result(self):
# leave the table clean - delete from requests and responses
# leave a clean table - delete from requests and responses
data = {
"consistencyInfo": {"type": "eventual"}
}
try:
path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % {
'keyspace': CONF.music.keyspace,
'table': CONF.music.request_table,
'uid': self.uuid
}
data = {
"consistencyInfo": {"type": "eventual"}
}
self.rest.request(method='delete', path=path, data=data)
except Exception as e:
logger.warn("HealthCheck exception in delete request - " + str(e))
try:
path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows?stack_id=%(uid)s' % {
'keyspace': CONF.music.keyspace,
'table': CONF.music.response_table,
'uid': self.uuid
}
self.rest.request(method='delete', path=path, data=data)
except Exception:
pass
except Exception as e:
logger.warn("HealthCheck exception in delete response - " + str(e))
if __name__ == "__main__":
alive = False
respondent_id = None
code = 0
init_engine(default_config_files=['/etc/valet/valet.conf'])
logger = get_logger("ostro_daemon")
if os.path.exists(CONF.engine.pid):
alive = HealthCheck().ping(CONF.engine.priority)
if alive:
code = CONF.engine.priority
logger.info("HealthCheck - Alive, priority = {}".format(CONF.engine.priority))
respondent_id = HealthCheck().ping()
if respondent_id == CONF.engine.priority:
code = CONF.engine.priority
logger.info("HealthCheck - Alive, respondent instance id: {}".format(respondent_id))
else:
logger.warn("HealthCheck - pid file exists, engine {} did not respond in a timely manner (respondent id {})"
.format(CONF.engine.priority, respondent_id))
else:
logger.warn("HealthCheck - Engine is DEAD!")
logger.info("HealthCheck - no pid file, engine is not running!")
sys.exit(code)

View File

@ -103,7 +103,7 @@ class ComputeManager(threading.Thread):
if triggered_host_updates is True and triggered_flavor_updates is True:
self.data_lock.acquire()
update_status = self.resource.update_topology()
update_status = self.resource.update_topology(store=False)
self.data_lock.release()
if update_status is False:

View File

@ -53,7 +53,10 @@ class Resource(object):
self.flavors = {}
self.current_timestamp = 0
self.last_log_index = 0
self.curr_db_timestamp = 0
# self.last_log_index = 0
self.resource_updated = False
""" resource status aggregation """
self.CPU_avail = 0
@ -62,33 +65,6 @@ class Resource(object):
self.disk_avail = 0
self.nw_bandwidth_avail = 0
def show_current_logical_groups(self):
for lgk, lg in self.logical_groups.iteritems():
if lg.status == "enabled":
self.logger.debug("Resource: lg name = " + lgk)
self.logger.debug(" type = " + lg.group_type)
if lg.group_type == "AGGR":
for k in lg.metadata.keys():
self.logger.debug(" key = " + k)
self.logger.debug(" vms")
for v in lg.vm_list:
self.logger.debug(" orch_id = " + v[0] + " uuid = " + v[2])
self.logger.debug(" hosts")
for h, v in lg.vms_per_host.iteritems():
self.logger.debug(" host = " + h)
self.logger.debug(" vms = " + str(len(lg.vms_per_host[h])))
host = None
if h in self.hosts.keys():
host = self.hosts[h]
elif h in self.host_groups.keys():
host = self.host_groups[h]
else:
self.logger.error("Resource: lg member not exist")
if host is not None:
self.logger.debug(" status = " + host.status)
if lgk not in host.memberships.keys():
self.logger.error("membership missing")
def bootstrap_from_db(self, _resource_status):
"""Return True if bootsrap resource from database successful."""
try:
@ -313,29 +289,28 @@ class Resource(object):
self._update_storage_avail()
self._update_nw_bandwidth_avail()
# for test
# self.show_current_logical_groups()
if store is False:
return True
ct = self._store_topology_updates()
if ct is None:
return False
else:
self.current_timestamp = ct
return True
return self.store_topology_updates()
def _update_topology(self):
updated = False
for level in LEVELS:
for _, host_group in self.host_groups.iteritems():
if host_group.host_type == level and \
host_group.check_availability() is True:
if host_group.last_update > self.current_timestamp:
self._update_host_group_topology(host_group)
updated = True
if self.datacenter.last_update > self.current_timestamp:
if self.datacenter.last_update >= self.current_timestamp:
self._update_datacenter_topology()
updated = True
if updated is True:
self.current_timestamp = time.time()
self.resource_updated = True
def _update_host_group_topology(self, _host_group):
_host_group.init_resources()
@ -457,9 +432,8 @@ class Resource(object):
# NOTE: peer links?
self.nw_bandwidth_avail += min(avail_nw_bandwidth_list)
def _store_topology_updates(self):
last_update_time = self.current_timestamp
def store_topology_updates(self):
updated = False
flavor_updates = {}
logical_group_updates = {}
storage_updates = {}
@ -468,87 +442,139 @@ class Resource(object):
host_group_updates = {}
datacenter_update = None
for fk, flavor in self.flavors.iteritems():
if flavor.last_update > self.current_timestamp:
flavor_updates[fk] = flavor.get_json_info()
self.logger.info("check and store resource status")
last_update_time = flavor.last_update
for fk, flavor in self.flavors.iteritems():
if flavor.last_update >= self.curr_db_timestamp:
flavor_updates[fk] = flavor.get_json_info()
# self.logger.debug("resource flavor(" + fk + ") stored")
updated = True
for lgk, lg in self.logical_groups.iteritems():
if lg.last_update > self.current_timestamp:
if lg.last_update >= self.curr_db_timestamp:
logical_group_updates[lgk] = lg.get_json_info()
last_update_time = lg.last_update
# self.logger.debug("resource lg(" + lgk + ") stored")
updated = True
for shk, storage_host in self.storage_hosts.iteritems():
if storage_host.last_update > self.current_timestamp or \
storage_host.last_cap_update > self.current_timestamp:
if storage_host.last_update >= self.curr_db_timestamp or \
storage_host.last_cap_update >= self.curr_db_timestamp:
storage_updates[shk] = storage_host.get_json_info()
if storage_host.last_update > self.current_time_stamp:
last_update_time = storage_host.last_update
if storage_host.last_cap_update > self.current_timestamp:
last_update_time = storage_host.last_cap_update
# self.logger.debug("resource storage(" + shk + ") stored")
updated = True
for sk, s in self.switches.iteritems():
if s.last_update > self.current_timestamp:
if s.last_update >= self.curr_db_timestamp:
switch_updates[sk] = s.get_json_info()
last_update_time = s.last_update
# self.logger.debug("resource switch(" + sk + ") stored")
updated = True
for hk, host in self.hosts.iteritems():
if host.last_update > self.current_timestamp or \
host.last_link_update > self.current_timestamp:
host_updates[hk] = host.get_json_info()
if host.last_update > self.current_timestamp:
last_update_time = host.last_update
if host.last_link_update > self.current_timestamp:
last_update_time = host.last_link_update
# self.logger.debug("resource host(" + hk + ") stored")
updated = True
for hgk, host_group in self.host_groups.iteritems():
if host_group.last_update > self.current_timestamp or \
host_group.last_link_update > self.current_timestamp:
if host_group.last_update >= self.curr_db_timestamp or \
host_group.last_link_update >= self.curr_db_timestamp:
host_group_updates[hgk] = host_group.get_json_info()
# self.logger.debug("resource hgroup(" + hgk + ") stored")
updated = True
if host_group.last_update > self.current_timestamp:
last_update_time = host_group.last_update
if host_group.last_link_update > self.current_timestamp:
last_update_time = host_group.last_link_update
if self.datacenter.last_update > self.current_timestamp or \
self.datacenter.last_link_update > self.current_timestamp:
if self.datacenter.last_update >= self.curr_db_timestamp or \
self.datacenter.last_link_update >= self.curr_db_timestamp:
datacenter_update = self.datacenter.get_json_info()
# self.logger.debug("resource datacenter stored")
updated = True
if self.datacenter.last_update > self.current_timestamp:
last_update_time = self.datacenter.last_update
if self.datacenter.last_link_update > self.current_timestamp:
last_update_time = self.datacenter.last_link_update
# (resource_logfile, last_index, mode) = util.get_last_logfile(self.config.resource_log_loc,
# self.config.max_log_size,
# self.config.max_num_of_logs,
# self.datacenter.name,
# self.last_log_index)
# self.last_log_index = last_index
json_logging = {}
json_logging['timestamp'] = last_update_time
# logging = open(self.config.resource_log_loc + resource_logfile, mode)
if len(flavor_updates) > 0:
json_logging['flavors'] = flavor_updates
if len(logical_group_updates) > 0:
json_logging['logical_groups'] = logical_group_updates
if len(storage_updates) > 0:
json_logging['storages'] = storage_updates
if len(switch_updates) > 0:
json_logging['switches'] = switch_updates
if len(host_updates) > 0:
json_logging['hosts'] = host_updates
if len(host_group_updates) > 0:
json_logging['host_groups'] = host_group_updates
if datacenter_update is not None:
json_logging['datacenter'] = datacenter_update
if updated is True:
json_logging = {}
json_logging['timestamp'] = self.curr_db_timestamp
if self.db is not None:
if self.db.update_resource_status(self.datacenter.name,
json_logging) is False:
if len(flavor_updates) > 0:
json_logging['flavors'] = flavor_updates
if len(logical_group_updates) > 0:
json_logging['logical_groups'] = logical_group_updates
if len(storage_updates) > 0:
json_logging['storages'] = storage_updates
if len(switch_updates) > 0:
json_logging['switches'] = switch_updates
if len(host_updates) > 0:
json_logging['hosts'] = host_updates
if len(host_group_updates) > 0:
json_logging['host_groups'] = host_group_updates
if datacenter_update is not None:
json_logging['datacenter'] = datacenter_update
# logged_data = json.dumps(json_logging)
# logging.write(logged_data)
# logging.write("\n")
# logging.close()
# self.logger.info("log resource status in " + resource_logfile)
# if self.db is not None:
if self.db.update_resource_status(self.datacenter.name, json_logging) is False:
return None
return last_update_time
self.curr_db_timestamp = time.time()
# for test
# self.show_current_logical_groups()
# self.show_current_host_status()
return True
def show_current_logical_groups(self):
for lgk, lg in self.logical_groups.iteritems():
if lg.status == "enabled":
self.logger.debug("TEST: lg name = " + lgk)
self.logger.debug(" type = " + lg.group_type)
if lg.group_type == "AGGR":
for k in lg.metadata.keys():
self.logger.debug(" metadata key = " + k)
self.logger.debug(" vms")
for v in lg.vm_list:
self.logger.debug(" orch_id = " + v[0] + " uuid = " + v[2])
self.logger.debug(" hosts")
for h, v in lg.vms_per_host.iteritems():
self.logger.debug(" host = " + h)
self.logger.debug(" vms = " + str(len(lg.vms_per_host[h])))
host = None
if h in self.hosts.keys():
host = self.hosts[h]
elif h in self.host_groups.keys():
host = self.host_groups[h]
else:
self.logger.error("TEST: lg member not exist")
if host is not None:
self.logger.debug(" status = " + host.status)
if lgk not in host.memberships.keys():
self.logger.error("TEST: membership missing")
def show_current_host_status(self):
for hk, host in self.hosts.iteritems():
self.logger.debug("TEST: host name = " + hk)
self.logger.debug(" status = " + host.status)
self.logger.debug(" vms = " + str(len(host.vm_list)))
self.logger.debug(" memberships")
for mk in host.memberships.keys():
self.logger.debug(" " + mk)
if mk not in self.logical_groups.keys():
self.logger.error("TEST: lg missing")
def update_rack_resource(self, _host):
"""Update resources for rack (host), then update cluster."""
@ -780,13 +806,16 @@ class Resource(object):
"""Remove vm by orchestration id from lgs. Update host and lgs."""
for lgk in _host.memberships.keys():
if lgk not in self.logical_groups.keys():
self.logger.warn("logical group (" + lgk + ") missing while removing " + _h_uuid)
continue
lg = self.logical_groups[lgk]
if isinstance(_host, Host):
# remove host from lg's membership if the host has no vms of lg
if lg.remove_vm_by_h_uuid(_h_uuid, _host.name) is True:
lg.last_update = time.time()
# remove lg from host's membership if lg does not have the host
if _host.remove_membership(lg) is True:
_host.last_update = time.time()
@ -816,13 +845,16 @@ class Resource(object):
"""Remove vm by uuid from lgs and update proper host and lgs."""
for lgk in _host.memberships.keys():
if lgk not in self.logical_groups.keys():
self.logger.warn("logical group (" + lgk + ") missing while removing " + _uuid)
continue
lg = self.logical_groups[lgk]
if isinstance(_host, Host):
# remove host from lg's membership if the host has no vms of lg
if lg.remove_vm_by_uuid(_uuid, _host.name) is True:
lg.last_update = time.time()
# remove lg from host's membership if lg does not have the host
if _host.remove_membership(lg) is True:
_host.last_update = time.time()

View File

@ -97,7 +97,7 @@ class TopologyManager(threading.Thread):
if self.set_topology() is True:
self.data_lock.acquire()
update_status = self.resource.update_topology()
update_status = self.resource.update_topology(store=False)
self.data_lock.release()
if update_status is False:

View File

View File

@ -1,66 +0,0 @@
#!/usr/bin/env python
# vi: sw=4 ts=4:
#
# ---------------------------------------------------------------------------
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------
#
#
# configuration file for havalet - valet processes monitoring tool.
# group name is a logical process name.
# properties MUST have the following properties:
# 'host'
# 'user'
# 'start'
# 'stop'
# 'test'
#
# IMPORTANT:
# "test" - MUST return a value != 0, this value should reflect
# the monitored process priority.
#
# "process's priority" - used for active/stand-by scenarios.
# MUST be greater than 0 - lower number means higher priority.
# e.g. instance which returns '1', as its response to "test" invocation,
# will get precedence over instance which returns '2' as its priority.
# priority 0 means the process is down.
#
# "stand_by_list" - OPTIONAL property. comma delimited hosts list.
# used on active/stand-by scenarios.
# ha_valet will attempt to restart the instance with the lower priority value,
# only if the instance fails to start, ha_valet will try to restart the process
# on the following host in the list.
[Ostro]
order=5
priority=1
host=valet1
user=m04060
stand_by_list=valet2
start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c restart'"
stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c stop'"
test="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/health_checker.py ; exit $?'"
[ValetApi]
order=4
priority=1
host=valet1
stand_by_list=valet2
user=m04060
start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo service apache2 restart'"
stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo apachectl stop'"
test="exit $(wget -T 1 -t 1 -qO- http://%(host)s:8090/v1 | grep CURRENT | wc -l)"

View File

@ -1,555 +0,0 @@
#
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HA Valet.
Mnemonic: ha_valet.py
Abstract: High availability script for valet processes. Starts its
configured processes, and pings for their availability. If local
instances are not running, then makes the current instances
start. If it finds multiple instances running, then determines
which instance should be shut down based on priorities.
Author: Amnon Sagiv based on ha_tegu by Kaustubh Joshi
------------------------------------------------------------------------------
Algorithm
-----------
The ha_valet script runs on each valet node in a continuous loop checking for
heartbeats from all the valet nodes found in the "stand_by_list" conf property
once every 5 secs (default). A heartbeat is obtained by invoking the
"test_command" conf property.
If exactly one monitored process instance is running, the script does
nothing. If no instance is running, then the local instance is activated after
waiting for 5*priority seconds to let a higher priority valet take over
first. A valet monitored process's priority is determined by its conf.
If the current node's instance is running and another is found, then a
conflict resolution process is invoked whereby the priorities of both
processes are compared, and the instance with the higher value is deactivated.
IMPORTANT: test_command must return a value != 0; this value should reflect
the monitored process priority.
"""
import logging.handlers
import os
from oslo_config import cfg
import socket
import subprocess
import threading
import time
# import argparse
# from oslo_log import log as logging
CONF = cfg.CONF
# Directory locations
LOG_DIR = os.getenv('HA_VALET_LOGD', '/var/log/havalet/')
ETC_DIR = os.getenv('HA_VALET_ETCD', '/etc/valet/ha/')
DEFAULT_CONF_FILE = ETC_DIR + 'ha_valet.cfg'
# Set the maximum logfile size as Byte for time-series log files
max_log_size = 1000000
# Set the maximum number of time-series log files
max_num_of_logs = 10
PRIMARY_SETUP = 1
RETRY_COUNT = 3 # How many times to retry ping command
CONNECT_TIMEOUT = 3 # Ping timeout
MAX_QUICK_STARTS = 10 # we stop if there are > 10 restart in quick succession
QUICK_RESTART_SEC = 150 # we consider it a quick restart if less than this
# HA Configuration
HEARTBEAT_SEC = 20 # Heartbeat interval in seconds
NAME = 'name'
ORDER = 'order'
HOST = 'host'
USER = 'user'
PRIORITY = 'priority'
START_COMMAND = 'start'
STOP_COMMAND = 'stop'
TEST_COMMAND = 'test'
STAND_BY_LIST = 'stand_by_list'
ostro_group = cfg.OptGroup(name='Ostro', title='Valet Engine HA conf')
api_group = cfg.OptGroup(name='ValetApi', title='Valet Api HA conf')
havalet_opts = [
cfg.IntOpt(PRIORITY, default=1, help='master slave distinguish'),
cfg.IntOpt(ORDER, help='launching order'),
cfg.StrOpt(HOST, help='where the monitored process is running on'),
cfg.StrOpt(USER, help='linux user'),
cfg.ListOpt(STAND_BY_LIST, help='monitored hosts list'),
cfg.StrOpt(START_COMMAND, help='launch command'),
cfg.StrOpt(STOP_COMMAND, help='stop command'),
cfg.StrOpt(TEST_COMMAND, help='test command')
]
# common.init_conf("havalet.log", grp2opt={api_group: havalet_opts, ostro_group: havalet_opts})
CONF.register_group(api_group)
CONF.register_opts(havalet_opts, api_group)
CONF.register_group(ostro_group)
CONF.register_opts(havalet_opts, ostro_group)
def read_conf():
    """Return the monitored-process configuration as a dictionary.

    Builds one entry per oslo.config group ('Ostro' for the engine,
    'ValetApi' for the API) keyed by group name; each value is a dict of
    the HA properties (order, host, user, priority, start/stop/test
    commands, stand_by_list) read from CONF. CONF must already have been
    populated from the config file (see the __main__ guard).
    """
    return dict([
        ('Ostro', {
            NAME: 'Ostro',
            ORDER: CONF.Ostro.order,
            HOST: CONF.Ostro.host,
            USER: CONF.Ostro.user,
            PRIORITY: CONF.Ostro.priority,
            START_COMMAND: CONF.Ostro.start,
            STOP_COMMAND: CONF.Ostro.stop,
            TEST_COMMAND: CONF.Ostro.test,
            STAND_BY_LIST: CONF.Ostro.stand_by_list
        }),
        ('ValetApi', {
            NAME: 'ValetApi',
            ORDER: CONF.ValetApi.order,
            HOST: CONF.ValetApi.host,
            USER: CONF.ValetApi.user,
            PRIORITY: CONF.ValetApi.priority,
            START_COMMAND: CONF.ValetApi.start,
            STOP_COMMAND: CONF.ValetApi.stop,
            TEST_COMMAND: CONF.ValetApi.test,
            STAND_BY_LIST: CONF.ValetApi.stand_by_list
        })])
def prepare_log(obj, name):
    """Attach a DEBUG-level rotating-file logger to obj as obj.log.

    The log file is LOG_DIR/<name>.log, rotated at max_log_size bytes
    keeping max_num_of_logs backups.

    :param obj: any object; receives the configured logger as obj.log
    :param name: logger name, also used as the log file basename
    """
    # NOTE(review): calling this more than once with the same name adds
    # a duplicate handler to the shared logger -- callers appear to
    # invoke it once per thread/object; verify before reusing.
    obj.log = logging.getLogger(name)
    obj.log.setLevel(logging.DEBUG)
    # logging.register_options(CONF)
    # logging.setup(CONF, 'valet')
    handler = logging.handlers.RotatingFileHandler(LOG_DIR + name + '.log',
                                                   maxBytes=max_log_size,
                                                   backupCount=max_num_of_logs)
    fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(fmt)
    obj.log.addHandler(handler)
class HaValetThread (threading.Thread):
    """Watcher thread for one monitored valet process."""

    def __init__(self, data, exit_event):
        """Initialize HAValetThread.

        :param data: per-process configuration dict (see read_conf)
        :param exit_event: threading.Event for shutdown signalling
        """
        # NOTE(review): exit_event is accepted but never stored or read
        # here; the thread's loop runs unconditionally -- confirm whether
        # it was meant to be checked in _main_loop.
        threading.Thread.__init__(self)
        self.data = data
        # Configured lazily in run() via prepare_log().
        self.log = None
def run(self):
    """Thread entry point.

    Sets up this thread's logger, removes the local node from its own
    standby list (so the main loop only pings the *other* nodes), then
    enters the heartbeat loop via _main_loop(), which never returns in
    normal operation.
    """
    prepare_log(self, self.data[NAME])
    self.log.info('HA Valet - ' + self.data[NAME] +
                  ' Watcher Thread - starting')
    fqdn_list = []
    this_node = socket.getfqdn()
    fqdn_list.append(this_node)
    # Read list of standby valet nodes and find us
    standby_list = self.data.get(STAND_BY_LIST, None)
    # BUG FIX: the original condition was "while not len(standby_list)
    # is 0", which (a) raises TypeError when STAND_BY_LIST is missing
    # (standby_list is None) and (b) relies on CPython int-identity for
    # the comparison. Plain truthiness handles None and [] alike, and
    # matches how _main_loop itself tests the list ("not standby_list").
    while standby_list:  # loop until we find us
        self.log.debug("stand by list: " + str(standby_list))
        try:
            for fqdn in fqdn_list:
                self.log.debug("fqdn_list: " + str(fqdn_list))
                if fqdn in standby_list:
                    this_node = fqdn
                    break
            # Raises ValueError (breaking the loop) once this node is
            # no longer present in the list.
            standby_list.remove(this_node)
            self.data[STAND_BY_LIST] = standby_list
            self.log.debug("modified stand by list: " + str(standby_list))
        except ValueError:
            self.log.debug("host " + this_node +
                           " is not in standby list: %s - continue"
                           % str(standby_list))
            break
    # Loop forever sending pings
    self._main_loop(this_node)
    self.log.info("HA Valet Watcher Thread - going down!")
def use(self, param):
    # Intentional no-op: called (e.g. self.use(user) in _main_loop) to
    # mark a variable as deliberately referenced so linters do not flag
    # it as unused.
    pass
def _main_loop(self, this_node):
"""Main heartbeat and liveness check loop.
:param this_node: host name
:type this_node: string
:return: None
:rtype:
"""
quick_start = 0 # number of restarts close together
last_start = 0
priority_wait = False
"""
DO NOT RENAME, DELETE, MOVE the following parameters,
they may be referenced from within the process commands
"""
host = self.data.get(HOST, 'localhost')
user = self.data.get(USER, None)
self.use(user)
my_priority = int(self.data.get(PRIORITY, 1))
start_command = self.data.get(START_COMMAND, None)
stop_command = self.data.get(STOP_COMMAND, None)
test_command = self.data.get(TEST_COMMAND, None)
standby_list = self.data.get(STAND_BY_LIST)
while True:
if not priority_wait:
# Normal heartbeat
time.sleep(HEARTBEAT_SEC)
else:
# No valet running. Wait for higher priority valet to activate.
time.sleep(HEARTBEAT_SEC / my_priority)
self.log.info('checking status here - ' + host +
', my priority: ' + str(my_priority))
i_am_active, priority = self._is_active(eval(test_command))
self.log.info(host + ': host_active = ' + str(i_am_active) +
', ' + str(priority))
any_active = i_am_active
self.log.info('any active = ' + str(any_active))
# Check for active valets
standby_list_is_empty = not standby_list
if not standby_list_is_empty:
self.log.debug('main loop: standby_list is not empty ' +
str(standby_list))
for host_in_list in standby_list:
if host_in_list == this_node:
self.log.info('host_in_list is this_node - skipping')
continue
self.log.info('checking status on - ' + host_in_list)
<<<<<<< HEAD
host = host_in_list
host_active, host_priority = \
self._is_active(eval(test_command))
host = self.data.get(HOST, 'localhost')
self.log.info(host_in_list + ' - host_active = ' +
str(host_active) + ', ' + str(host_priority))
=======
# host = host_in_list
host_active, host_priority = self._is_active(test_command % {'host': host_in_list, 'user': user})
# host = self.data.get(HOST, 'localhost')
self.log.info(host_in_list + ' - host_active = ' + str(host_active) + ', ' + str(host_priority))
>>>>>>> da5d947... Resolve affinity group with flavor id
# Check for split brain: 2 valets active
if i_am_active and host_active:
self.log.info('found two live instances, '
'checking priorities')
should_be_active = self._should_be_active(host_priority, my_priority)
if should_be_active:
<<<<<<< HEAD
self.log.info('deactivate myself, ' + host_in_list +
' already running')
# Deactivate myself
self._deactivate_process(eval(stop_command))
=======
self.log.info('deactivate myself, ' + host_in_list + ' already running')
self._deactivate_process(stop_command % {'host': host, 'user': user}) # Deactivate myself
>>>>>>> da5d947... Resolve affinity group with flavor id
i_am_active = False
else:
self.log.info('deactivate ' + self.data[NAME] +
' on ' + host_in_list +
', already running here')
<<<<<<< HEAD
host = host_in_list
# Deactivate other valet
self._deactivate_process(eval(stop_command))
host = self.data.get(HOST, 'localhost')
=======
# host = host_in_list
# Deactivate other valet
self._deactivate_process(stop_command % {'host': host_in_list, 'user': user})
# host = self.data.get(HOST, 'localhost')
>>>>>>> da5d947... Resolve affinity group with flavor id
# Track that at-least one valet is active
any_active = any_active or host_active
# If no active process or I'm primary, then we must try to start one
if not any_active or \
(not i_am_active and my_priority == PRIMARY_SETUP):
self.log.warn('there is no instance up')
self.log.info('Im primary instance: ' +
str(my_priority is PRIMARY_SETUP))
if priority_wait or my_priority == PRIMARY_SETUP:
now = int(time.time())
# quick restart (crash?)
if now - last_start < QUICK_RESTART_SEC:
quick_start += 1
if quick_start > MAX_QUICK_STARTS:
self.log.critical("too many restarts "
"in quick succession.")
else:
# reset if it's been a while since last restart
quick_start = 0
if last_start == 0:
diff = "never by this instance"
else:
diff = "%d seconds ago" % (now - last_start)
last_start = now
priority_wait = False
<<<<<<< HEAD
if (not i_am_active and my_priority == PRIMARY_SETUP) or \
(standby_list is not None):
self.log.info('no running instance found, '
'starting here; last start %s' % diff)
self._activate_process(start_command, my_priority)
else:
# LIMITATION - supporting only 1 stand by host
host = standby_list[0]
self.log.info('no running instances found, starting '
'on %s; last start %s' % (host, diff))
self._activate_process(start_command, my_priority)
host = self.data.get(HOST, 'localhost')
=======
if (not i_am_active and my_priority == PRIMARY_SETUP) or (standby_list is not None):
self.log.info('no running instance found, starting here; last start %s' % diff)
self._activate_process(start_command % {'host': host, 'user': user}, my_priority)
else:
# host = standby_list[0] # LIMITATION - supporting only 1 stand by host
self.log.info('no running instances found, starting on %s; last start %s' % (host, diff))
self._activate_process(start_command % {'host': standby_list[0], 'user': user}, my_priority)
# host = self.data.get(HOST, 'localhost')
>>>>>>> da5d947... Resolve affinity group with flavor id
else:
priority_wait = True
else:
self.log.info("status: up and running")
# end loop
def _should_be_active(self, host_priority, my_priority):
    """Decide whether the remote host should stay active instead of us.

    Priorities are "lower value wins"; 0 is invalid (process down).

    :param host_priority: the other host's reported priority
    :type host_priority: int
    :param my_priority: this node's configured priority
    :type my_priority: int
    :return: True when the remote host outranks this node
    :rtype: bool
    """
    message = ('my priority is %d, remote priority is %d'
               % (my_priority, host_priority))
    self.log.info(message)
    # Strictly lower value means strictly higher rank; ties favor us.
    return host_priority < my_priority
def _is_active(self, call):
    """Probe a valet instance by running its test command.

    Runs `call` through the shell up to RETRY_COUNT times; the command's
    exit status is interpreted as the instance's priority (> 0 means
    alive, 255 -- ssh "no route to host" -- is coerced to 0/down).

    :param call: fully-substituted shell test command
    :return: (True, priority) when the instance responds with a positive
        priority, (False, priority-or-None) otherwise
    """
    # must use no-proxy to avoid proxy servers gumming up the works
    # NOTE: xrange is Python 2 only, consistent with this file.
    for i in xrange(RETRY_COUNT):
        try:
            self.log.info('ping (retry %d): %s' % (i, call))
            proc = subprocess.Popen(call, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, shell=True)
            # NOTE(review): wait() before communicate() can deadlock if
            # the command fills the stdout/stderr pipes -- presumably the
            # test commands emit little output; verify.
            priority = proc.wait()
            if priority == 255:  # no route to host
                priority = 0
            out, err = proc.communicate()
            self.log.debug('out: ' + out + ', err: ' + err)
            self.log.info('ping result (should be > 0): %s'
                          % (str(priority)))
            return (priority > 0), priority
        # NOTE(review): Popen/wait do not raise CalledProcessError (only
        # check_call/check_output do), so this handler looks unreachable;
        # it also logs the exception class, not an instance. Confirm.
        except subprocess.CalledProcessError:
            self.log.error('ping error: ' +
                           str(subprocess.CalledProcessError))
            continue
    return False, None
def _deactivate_process(self, deactivate_command):
    """Stop a valet instance by running its stop command via the shell.

    :param deactivate_command: fully-substituted shell stop command
    :return: True when the command exits 0, False on failure
    :rtype: bool
    """
    self.log.info('deactivate_command: ' + deactivate_command)
    try:
        subprocess.check_call(deactivate_command, shell=True)
    except subprocess.CalledProcessError as e:
        self.log.error(str(e))
        return False
    return True
def _activate_process(self, activate_command, priority):
    """Start a valet instance by running its start command via the shell.

    On success, sleeps HEARTBEAT_SEC as a grace period so the process
    can come up before the next liveness check.

    :param activate_command: fully-substituted shell start command
    :param priority: accepted for interface compatibility; not used here
    :return: True when the command exits 0, False on failure
    :rtype: bool
    """
    self.log.info('activate_command: ' + activate_command)
    try:
        subprocess.check_call(activate_command, shell=True)
    except subprocess.CalledProcessError as e:
        self.log.error(str(e))
        return False
    time.sleep(HEARTBEAT_SEC)  # allow some grace period
    return True
class HAValet(object):
    """Top-level HA driver: one watcher thread per configured process."""

    def __init__(self):
        """Init HAValet object; ensure the log directory exists."""
        if not os.path.exists(LOG_DIR):
            os.makedirs(LOG_DIR)
        # Configured in start() via prepare_log().
        self.log = None
<<<<<<< HEAD
@DeprecationWarning
def _parse_valet_conf_v010(self, conf_file_name=DEFAULT_CONF_FILE,
process=''):
"""Parse Valet Conf v010.
This function reads the valet config file and returns configuration
attributes in key/value format
:param conf_file_name: config file name
:type conf_file_name: string
:param process: specific process name
when not supplied - the module launches all the
processes in the configuration
:type process: string
:return: dictionary of configured monitored processes
:rtype: dict
"""
cdata = {}
section = ''
try:
with open(conf_file_name, 'r') as valet_conf_file:
for line in valet_conf_file.readlines():
if line.strip(' \t\r\n')[:1] == '#' or line.__len__() == 2:
continue
elif line.lstrip(' \t\r\n')[:1] == ':':
tokens = line.lstrip(' \t\n\r').split(' ')
section = tokens[0][1:].strip('\n\r\n')
cdata[section] = {}
cdata[section][NAME] = section
else:
if line[:1] == '\n':
continue
tokens = line.split('=')
key = tokens[0].strip(' \t\n\r')
value = tokens[1].strip(' \t\n\r')
cdata[section][key] = value
# if need to run a specific process
# remove all others
if process is not '':
for key in cdata.keys():
if key != process:
del cdata[key]
return cdata
except OSError:
print('unable to open %s file for some reason' % conf_file_name)
return cdata
=======
>>>>>>> da5d947... Resolve affinity group with flavor id
def _valid_process_conf_data(self, process_data):
    """Check that a process section defines every mandatory parameter.

    stand_by_list is the only optional property; host, priority, order
    and the start/stop/test commands must all be present.

    :param process_data: specific process configuration parameters
    :type process_data: dict
    :return: True when all mandatory parameters are present
    :rtype: bool
    """
    mandatory = (HOST, PRIORITY, ORDER,
                 START_COMMAND, STOP_COMMAND, TEST_COMMAND)
    return all(process_data.get(key) is not None for key in mandatory)
def start(self):
    """Start valet HA - main entry point.

    Reads the process configuration, launches one watcher thread per
    valid process section in ascending launch order (staggered by
    HEARTBEAT_SEC), then blocks until the exit event is set and all
    watcher threads have joined.
    """
    prepare_log(self, 'havalet')
    self.log.info('ha_valet v1.1 starting')
    conf_data = read_conf()
    # BUG FIX: original tested "len(conf_data.keys()) is 0" -- identity
    # comparison against an int is fragile CPython-specific behavior;
    # plain truthiness expresses "no configured processes" directly.
    if not conf_data:
        self.log.warn('Processes list is empty - leaving.')
        return
    threads = []
    exit_event = threading.Event()
    # sort by launching order
    proc_sorted = sorted(conf_data.values(), key=lambda d: int(d[ORDER]))
    for proc in proc_sorted:
        if self._valid_process_conf_data(proc):
            self.log.info('Launching: ' + proc[NAME] + ' - parameters: ' +
                          str(proc))
            thread = HaValetThread(proc, exit_event)
            # Stagger launches so each process gets a grace period
            # before the next one starts.
            time.sleep(HEARTBEAT_SEC)
            thread.start()
            threads.append(thread)
        else:
            self.log.info(proc[NAME] +
                          " section is missing mandatory parameter.")
            continue
    self.log.info('on air.')
    while not exit_event.isSet():
        time.sleep(HEARTBEAT_SEC)
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    self.log.info('ha_valet v1.1 exiting')
# Load the HA config file into oslo CONF, then run the HA driver.
if __name__ == '__main__' or __name__ == "main":
    CONF(default_config_files=[DEFAULT_CONF_FILE])
    HAValet().start()

View File

@ -1,66 +0,0 @@
#
# ---------------------------------------------------------------------------
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------
#
#
# configuration file for havalet - valet processes monitoring tool.
# group name is a logical process name.
# properties MUST have the following properties:
# 'host'
# 'user'
# 'start'
# 'stop'
# 'test'
#
# IMPORTANT:
# "test" - MUST return a value != 0, this value should reflect
# the monitored process priority.
#
# "process's priority" - used for active/stand-by scenarios.
# MUST be greater than 0 - lower number means higher priority.
# e.g. instance which returns '1', as its response to "test" invocation,
# will get precedence over instance which returns '2' as its priority.
# priority 0 means the process is down.
#
# "stand_by_list" - OPTIONAL property. comma delimited hosts list.
# used on active/stand-by scenarios.
# ha_valet will attempt to restart the instance with the lower priority value,
# only if the instance fails to start, ha_valet will try to restart the process
# on the following host in the list.
[Ostro]
order=5
priority=2
host=valet2
user=m04060
stand_by_list=valet1
start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c restart'"
stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/ostro_daemon.py -c stop'"
test="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'python /usr/local/lib/python2.7/dist-packages/valet/engine/optimizer/ostro_server/health_checker.py ; exit $?'"
[ValetApi]
order=4
priority=2
host=valet2
stand_by_list=valet1
user=m04060
start="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo service apache2 restart'"
stop="ssh -o ConnectTimeout=1 %(user)s@%(host)s 'sudo apachectl stop'"
test="exit $(wget -T 1 -t 1 -qO- http://%(host)s:8090/v1 | grep CURRENT | wc -l)"

View File

@ -46,6 +46,7 @@ class Analyzer(object):
self.log.log_info("Starting to check instances location")
result = True
self.init_servers_list()
self.init_resources(resources)
ins_group = self.init_instances_for_group(resources)
@ -126,7 +127,6 @@ class Analyzer(object):
"""Return host of instance with matching name."""
hosts = []
self.init_servers_list()
self.log.log_debug("host - instance dictionary is: %s" % self.host_instance_dict)
for res in res_name:
@ -180,6 +180,7 @@ class Analyzer(object):
exclusivity_group_hosts = self.get_exclusivity_group_hosts()
self.log.log_debug("exclusivity group hosts are: %s " % exclusivity_group_hosts)
self.log.log_debug("instances on host are: %s " % self.instances_on_host)
# instances - all the instances on the exclusivity group hosts
for host in exclusivity_group_hosts:

View File

@ -16,9 +16,7 @@
"""General Logger."""
from oslo_log import log as logging
from tempest import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
COLORS = \

View File

@ -138,6 +138,7 @@ class ScenarioTestCase(test.BaseTestCase):
except Exception:
self.log.log_error("Failed to create valet group",
traceback.format_exc())
raise
def get_env_file(self, template):
try:
@ -158,15 +159,24 @@ class ScenarioTestCase(test.BaseTestCase):
self.log.log_error("Failed to load environment file", traceback.format_exc())
def _delete_group(self, group_id):
self.valet_client.delete_all_members(group_id)
self.valet_client.delete_group(group_id)
try:
self.valet_client.delete_all_members(group_id)
self.valet_client.delete_group(group_id)
except Exception:
self.log.log_error("Failed to delete group", traceback.format_exc())
raise
def delete_stack(self):
"""Use heat client to delete stack."""
self.heat_client.delete_stack(self.stack_identifier)
self.heat_client.wait_for_stack_status(
self.stack_identifier, "DELETE_COMPLETE",
failure_pattern='^.*DELETE_FAILED$')
try:
self.heat_client.delete_stack(self.stack_identifier)
self.heat_client.wait_for_stack_status(
self.stack_identifier, "DELETE_COMPLETE",
failure_pattern='^.*DELETE_FAILED$')
except Exception:
self.log.log_error("Failed to delete stack", traceback.format_exc())
raise
def show_stack(self, stack_id):
"""Return show stack with given id from heat client."""

View File

@ -21,7 +21,7 @@ class TestHealthCheck(Base):
mock_send.return_value = True
mock_read.return_value = True
self.validate_test(self.pingger.ping(1))
self.validate_test(self.pingger.ping() == 1)
@mock.patch.object(HealthCheck, '_send')
@mock.patch.object(HealthCheck, '_read_response')
@ -29,7 +29,7 @@ class TestHealthCheck(Base):
mock_send.return_value = False
mock_read.return_value = True
self.validate_test(not self.pingger.ping(1))
self.validate_test(self.pingger.ping() is None)
@mock.patch.object(HealthCheck, '_send')
@mock.patch.object(HealthCheck, '_read_response')
@ -37,7 +37,7 @@ class TestHealthCheck(Base):
mock_send.return_value = True
mock_read.return_value = False
self.validate_test(not self.pingger.ping(1))
self.validate_test(not self.pingger.ping())
def test_send(self):
self.pingger.rest.request.return_value.status_code = 204
@ -48,29 +48,28 @@ class TestHealthCheck(Base):
self.validate_test(not self.pingger._send())
def test_read_response(self):
id = 1
mid = 1
self.pingger.rest.request.return_value.status_code = 200
self.pingger.rest.request.return_value.text = json % (id, self.pingger.uuid)
self.validate_test(self.pingger._read_response(id))
self.pingger.rest.request.return_value.text = json % (mid, self.pingger.uuid)
self.validate_test(self.pingger._read_response())
def test_read_response_from_other_engine(self):
my_id = 1
id = 2
self.pingger.rest.request.return_value.status_code = 200
self.pingger.rest.request.return_value.text = json % (id, self.pingger.uuid)
self.validate_test(not self.pingger._read_response(my_id))
self.pingger.rest.request.return_value.text = json % (my_id, self.pingger.uuid)
self.validate_test(not self.pingger._read_response() == 2)
def test_read_response_unhappy_wrong_res_code(self):
self.pingger.rest.request.return_value.status_code = 204
self.pingger.rest.request.return_value.text = self.pingger.uuid
self.validate_test(not self.pingger._read_response(1))
self.validate_test(not self.pingger._read_response())
def test_read_response_unhappy_wrong_body(self):
self.pingger.rest.request.return_value.status_code = 200
self.pingger.rest.request.return_value.text = ""
self.validate_test(not self.pingger._read_response(1))
self.validate_test(not self.pingger._read_response())
def test_read_response_unhappy_wrong_both(self):
self.pingger.rest.request.return_value.status_code = 204
self.pingger.rest.request.return_value.text = ""
self.validate_test(not self.pingger._read_response(1))
self.validate_test(not self.pingger._read_response())