Improve delay with fine-grained locking
Change-Id: I5a8ef020e0c3dda50fa725c6b6509bc876593115
parent 125f5ccfab
commit 543ef67ed1
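The core pattern in this change, as a minimal sketch (illustrative names only, not taken from the Valet code): instead of holding one shared lock around an entire update pass, the slow work happens outside the lock and only the mutation of shared state is serialized, so placement and status-update threads block each other for much shorter intervals. The hunks below apply this with explicit acquire()/release() calls on the existing data_lock.

import threading
import time

data_lock = threading.Lock()
shared_state = {}

def expensive_transform(item):
    time.sleep(0.01)  # stand-in for slow, lock-free work (API calls, parsing)
    return item, item * 2

def coarse_update(items):
    # Coarse-grained: the lock is held for the whole pass, slow work included.
    with data_lock:
        for key, value in map(expensive_transform, items):
            shared_state[key] = value

def fine_grained_update(items):
    # Fine-grained: do the slow work first, then serialize only the mutation.
    prepared = [expensive_transform(i) for i in items]
    with data_lock:
        for key, value in prepared:
            shared_state[key] = value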
@ -45,7 +45,7 @@ class AppHandler(object):
         self.status = "success"

-    def add_app(self, _app_data):
+    def add_app(self, _app):
         """Add app and set or regenerate topology, return updated topology."""
         self.apps.clear()
@ -101,6 +101,55 @@ class AppHandler(object):

         new_app = App(stack_id, application_name, action)
         self.apps[stack_id] = new_app
+=======
+        self.logger.debug("AppHandler: parse app")
+
+        stack_id = None
+        if "stack_id" in _app.keys():
+            stack_id = _app["stack_id"]
+        else:
+            stack_id = "none"
+
+        application_name = None
+        if "application_name" in _app.keys():
+            application_name = _app["application_name"]
+        else:
+            application_name = "none"
+
+        action = _app["action"]
+        if action == "ping":
+            self.logger.debug("AppHandler: got ping")
+        elif action == "replan" or action == "migrate":
+            re_app = self._regenerate_app_topology(stack_id, _app, app_topology, action)
+            if re_app is None:
+                self.apps[stack_id] = None
+                self.status = "cannot locate the original plan for stack = " + stack_id
+                return None
+
+            if action == "replan":
+                self.logger.debug("AppHandler: got replan: " + stack_id)
+            elif action == "migrate":
+                self.logger.debug("AppHandler: got migration: " + stack_id)
+
+            app_id = app_topology.set_app_topology(re_app)
+
+            if app_id is None:
+                self.logger.error("AppHandler: " + app_topology.status)
+                self.status = app_topology.status
+                self.apps[stack_id] = None
+                return None
+        else:
+            app_id = app_topology.set_app_topology(_app)
+
+            if app_id is None:
+                self.logger.error("AppHandler: " + app_topology.status)
+                self.status = app_topology.status
+                self.apps[stack_id] = None
+                return None
+
+        new_app = App(stack_id, application_name, action)
+        self.apps[stack_id] = new_app
+>>>>>>> c095458... Improve delay with fine-grained locking

         return app_topology
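For reference only, not part of the commit: the stack_id / application_name lookups added above can also be written with dict.get defaults. A minimal sketch assuming the same _app payload keys:

def parse_app_identity(_app):
    # Equivalent to the added key checks: fall back to "none" when a key is absent.
    stack_id = _app.get("stack_id", "none")
    application_name = _app.get("application_name", "none")
    action = _app["action"]  # "ping", "replan", "migrate", or anything handled by the default branch
    return stack_id, application_name, action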
@ -21,6 +21,10 @@ from valet.common.music import Music
 from valet.engine.optimizer.db_connect.event import Event


+def ensurekey(d, k):
+    return d.setdefault(k, {})
+
+
 class MusicHandler(object):
     """Music Handler Class.
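What the new ensurekey helper buys: dict.setdefault returns the existing value for a key, or inserts and returns the supplied default, so the update loops below can index json_resource[...] without risking a KeyError on a missing top-level key. A standalone illustration, not part of the commit:

json_resource = {"hosts": {"h1": {"vcpus": 8}}}

def ensurekey(d, k):
    return d.setdefault(k, {})

print(ensurekey(json_resource, "hosts"))     # existing key: returns {"h1": {"vcpus": 8}}
ensurekey(json_resource, "flavors")["m1.small"] = {"vcpus": 1}
print(json_resource["flavors"])              # missing key was created, so no KeyError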
@ -509,42 +513,42 @@ class MusicHandler(object):
         if 'flavors' in _status.keys():
             flavors = _status['flavors']
             for fk, f in flavors.iteritems():
-                if fk in json_resource['flavors'].keys():
+                if fk in ensurekey(json_resource, 'flavors').keys():
                     del json_resource['flavors'][fk]
                 json_resource['flavors'][fk] = f

         if 'logical_groups' in _status.keys():
             logical_groups = _status['logical_groups']
             for lgk, lg in logical_groups.iteritems():
-                if lgk in json_resource['logical_groups'].keys():
+                if lgk in ensurekey(json_resource, 'logical_groups').keys():
                     del json_resource['logical_groups'][lgk]
                 json_resource['logical_groups'][lgk] = lg

         if 'storages' in _status.keys():
             storages = _status['storages']
             for stk, st in storages.iteritems():
-                if stk in json_resource['storages'].keys():
+                if stk in ensurekey(json_resource, 'storages').keys():
                     del json_resource['storages'][stk]
                 json_resource['storages'][stk] = st

         if 'switches' in _status.keys():
             switches = _status['switches']
             for sk, s in switches.iteritems():
-                if sk in json_resource['switches'].keys():
+                if sk in ensurekey(json_resource, 'switches').keys():
                     del json_resource['switches'][sk]
                 json_resource['switches'][sk] = s

         if 'hosts' in _status.keys():
             hosts = _status['hosts']
             for hk, h in hosts.iteritems():
-                if hk in json_resource['hosts'].keys():
+                if hk in ensurekey(json_resource, 'hosts').keys():
                     del json_resource['hosts'][hk]
                 json_resource['hosts'][hk] = h

         if 'host_groups' in _status.keys():
             host_groupss = _status['host_groups']
             for hgk, hg in host_groupss.iteritems():
-                if hgk in json_resource['host_groups'].keys():
+                if hgk in ensurekey(json_resource, 'host_groups').keys():
                     del json_resource['host_groups'][hgk]
                 json_resource['host_groups'][hgk] = hg
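A side note on this hunk: dict.iteritems() exists only on Python 2; the Python 3 spelling is items(). A one-line illustration with made-up data, not a change proposed here:

flavors = {"m1.small": {"vcpus": 1}, "m1.large": {"vcpus": 4}}
for fk, f in flavors.items():  # Python 3 equivalent of flavors.iteritems()
    print(fk, f["vcpus"])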
@ -41,10 +41,9 @@ class Ostro(object):

         self.db = MusicHandler(self.config, self.logger)
         if self.db.init_db() is False:
-            self.logger.error("Ostro.__init__: error while initializing MUSIC "
-                              "database")
+            self.logger.error("Ostro: error while initializing MUSIC database")
         else:
-            self.logger.debug("Ostro.__init__: done init music")
+            self.logger.debug("Ostro: done init music")

         self.resource = Resource(self.db, self.config, self.logger)
         self.logger.debug("done init resource")
@ -112,7 +111,7 @@ class Ostro(object):
         for t in self.thread_list:
             t.join()

-        self.logger.info("Ostro.run_ostro: exit Ostro")
+        self.logger.info("Ostro: exit Ostro")

     def stop_ostro(self):
         """Stop main engine process."""
@ -135,25 +134,22 @@ class Ostro(object):
             resource_status = self.db.get_resource_status(
                 self.resource.datacenter.name)
             if resource_status is None:
-                self.logger.error("Ostro.bootstrap: failed to read from table: " + self.config.db_resource_table)
+                self.logger.error("Ostro: failed to read from table: " + self.config.db_resource_table)
                 return False

             if len(resource_status) > 0:
-                self.logger.info("Ostro.bootstrap: bootstrap from db")
+                self.logger.info("Ostro: bootstrap from db")
                 if not self.resource.bootstrap_from_db(resource_status):
-                    self.logger.error("Ostro.bootstrap: failed to parse bootstrap data!")
+                    self.logger.error("Ostro: failed to parse bootstrap data!")

             self.logger.info("read bootstrap data from OpenStack")
             if not self._set_hosts():
                 self.logger.error('_set_hosts is false')
                 return False

             if not self._set_flavors():
                 self.logger.info("_set_flavors is false")
                 return False

             if not self._set_topology():
                 self.logger.error("_set_topology is false")
                 return False

             self.resource.update_topology()
@ -162,7 +158,7 @@ class Ostro(object):
             self.logger.critical("Ostro.bootstrap failed: " +
                                  traceback.format_exc())

-        self.logger.info("Ostro.bootstrap: done bootstrap")
+        self.logger.info("Ostro: done bootstrap")

         return True
@ -196,93 +192,77 @@ class Ostro(object):

     def place_app(self, _app_data):
         """Place results of query and placement requests in the db."""
-        self.data_lock.acquire()
-
         start_time = time.time()

-        query_request_list = []
-        placement_request_list = []
         for req in _app_data:
             if req["action"] == "query":
-                query_request_list.append(req)
+                self.logger.info("Ostro: start query")
+
+                query_result = self._query(req)
+                result = self._get_json_results("query", "ok",
+                                                self.status, query_result)
+
+                if self.db.put_result(result) is False:
+                    return False
+
+                self.logger.info("Ostro: done query")
             else:
-                placement_request_list.append(req)
+                self.logger.info("Ostro: start app placement")
+
+                result = None
+                placement_map = self._place_app(req)
+
+                if placement_map is None:
+                    result = self._get_json_results("placement", "error",
+                                                    self.status, placement_map)
+                else:
+                    result = self._get_json_results("placement", "ok",
+                                                    "success", placement_map)
+
+                if self.db.put_result(result) is False:
+                    return False
+
+                self.logger.info("Ostro: done app placement")

-        if len(query_request_list) > 0:
-            self.logger.info("Ostro.place_app: start query")
-
-            query_results = self._query(query_request_list)
-
-            result = self._get_json_results("query", "ok", self.status,
-                                            query_results)
-
-            if self.db.put_result(result) is False:
-                self.data_lock.release()
-                return False
-
-            self.logger.info("Ostro.place_app: done query")
-
-        if len(placement_request_list) > 0:
-            self.logger.info("Ostro.place_app: start app placement")
-
-            result = None
-
-            placement_map = self._place_app(placement_request_list)
-
-            if placement_map is None:
-                result = self._get_json_results("placement", "error",
-                                                self.status, placement_map)
-            else:
-                result = self._get_json_results("placement", "ok", "success",
-                                                placement_map)
-
-            if self.db.put_result(result) is False:
-                self.data_lock.release()
-                return False
-
-            self.logger.info("Ostro.place_app: done app placement")
-
         end_time = time.time()
-        self.logger.info("Ostro.place_app: total decision delay of request = " +
-                         str(end_time - start_time) + " sec")
+        self.logger.info("Ostro: total decision delay of request = " + str(end_time - start_time) + " sec")

-        self.data_lock.release()
         return True

-    def _query(self, _query_list):
-        query_results = {}
+    def _query(self, _q):
+        query_result = {}

-        for q in _query_list:
-            if "type" in q.keys():
-                if q["type"] == "group_vms":
-                    if "parameters" in q.keys():
-                        params = q["parameters"]
-                        if "group_name" in params.keys():
-                            vm_list = self._get_vms_from_logical_group(
-                                params["group_name"])
-                            query_results[q["stack_id"]] = vm_list
-                        else:
-                            self.status = "unknown paramenter in query"
-                            self.logger.warn("Ostro._query: unknown paramenter in query")
-                            query_results[q["stack_id"]] = None
-                    else:
-                        self.status = "no paramenter in query"
-                        self.logger.warn("Ostro._query: no parameters in query")
-                        query_results[q["stack_id"]] = None
-                elif q["type"] == "all_groups":
-                    query_results[q["stack_id"]] = self._get_logical_groups()
-                else:
-                    self.status = "unknown query type"
-                    self.logger.warn("Ostro._query: unknown query type")
-                    query_results[q["stack_id"]] = None
-            else:
-                self.status = "unknown type in query"
-                self.logger.warn("Ostro._query: no type in query")
-                query_results[q["stack_id"]] = None
+        if "type" in _q.keys():
+            if _q["type"] == "group_vms":
+                if "parameters" in _q.keys():
+                    params = _q["parameters"]
+                    if "group_name" in params.keys():
+                        self.data_lock.acquire()
+                        vm_list = self._get_vms_from_logical_group(params["group_name"])
+                        self.data_lock.release()
+                        query_result[_q["stack_id"]] = vm_list
+                    else:
+                        self.status = "unknown paramenter in query"
+                        self.logger.warn("Ostro: unknown paramenter in query")
+                        query_result[_q["stack_id"]] = None
+                else:
+                    self.status = "no paramenter in query"
+                    self.logger.warn("Ostro: no parameters in query")
+                    query_result[_q["stack_id"]] = None
+            elif _q["type"] == "all_groups":
+                self.data_lock.acquire()
+                query_result[_q["stack_id"]] = self._get_logical_groups()
+                self.data_lock.release()
+            else:
+                self.status = "unknown query type"
+                self.logger.warn("Ostro: unknown query type")
+                query_result[_q["stack_id"]] = None
+        else:
+            self.status = "unknown type in query"
+            self.logger.warn("Ostro: no type in query")
+            query_result[_q["stack_id"]] = None

-        return query_results
+        return query_result

     def _get_vms_from_logical_group(self, _group_name):
         vm_list = []
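Based on the keys the reworked _query checks, a request it accepts looks roughly like the following (hedged example; the stack_id values and group name are made up):

group_vms_query = {
    "action": "query",                 # place_app() routes "query" actions to _query()
    "stack_id": "0e2b10a4-example",    # made-up id
    "type": "group_vms",
    "parameters": {"group_name": "anti-affinity-group-1"},
}

all_groups_query = {
    "action": "query",
    "stack_id": "77c1a8bb-example",    # made-up id
    "type": "all_groups",
}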
@ -310,9 +290,9 @@ class Ostro(object):

         return logical_groups

-    def _place_app(self, _app_data):
+    def _place_app(self, _app):
         """Set application topology."""
-        app_topology = self.app_handler.add_app(_app_data)
+        app_topology = self.app_handler.add_app(_app)
         if app_topology is None:
             self.status = self.app_handler.status
             self.logger.error("Ostro._place_app: error while register"
@ -323,14 +303,16 @@ class Ostro(object):
         for _, vm in app_topology.vms.iteritems():
             if self._set_vm_flavor_information(vm) is False:
                 self.status = "fail to set flavor information"
-                self.logger.error("Ostro._place_app: failed to set flavor information ")
+                self.logger.error("Ostro: failed to set flavor information ")
                 return None
         for _, vg in app_topology.vgroups.iteritems():
             if self._set_vm_flavor_information(vg) is False:
                 self.status = "fail to set flavor information in a group"
-                self.logger.error("Ostro._place_app: failed to set flavor information in a group")
+                self.logger.error("Ostro: failed to set flavor information in a group")
                 return None

+        self.data_lock.acquire()
+
         """Set weights for optimization."""
         app_topology.set_weight()
         app_topology.set_optimization_priority()
@ -341,6 +323,7 @@ class Ostro(object):
             self.status = self.optimizer.status
             self.logger.debug("Ostro._place_app: error while optimizing app "
                               "placement: " + self.status)
+            self.data_lock.release()
             return None

         """Update resource and app information."""
@ -354,6 +337,8 @@ class Ostro(object):
             if vk in placement_map.keys():
                 del placement_map[vk]

+        self.data_lock.release()
+
         return placement_map

     def _set_vm_flavor_information(self, _v):
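One design note on the acquire()/release() pair added to _place_app above: the lock is released by hand on the optimizer-failure return and again after the placement map is cleaned up. A try/finally (or a with block) would guarantee the release even if an exception escapes in between. A hypothetical sketch under that assumption; set_weight, set_optimization_priority, and optimizer.status appear in the diff, but optimizer.place and the overall structure are assumed:

import threading

class PlacementSection(object):
    """Hypothetical sketch, not part of the commit."""

    def __init__(self, optimizer):
        self.data_lock = threading.Lock()
        self.optimizer = optimizer
        self.status = "success"

    def place_under_lock(self, app_topology):
        # finally releases the lock on every exit path, including exceptions,
        # which the diff performs by hand before each early return.
        self.data_lock.acquire()
        try:
            app_topology.set_weight()
            app_topology.set_optimization_priority()
            placement_map = self.optimizer.place(app_topology)  # assumed API
            if placement_map is None:
                self.status = self.optimizer.status
                return None
            return placement_map
        finally:
            self.data_lock.release()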
@ -98,23 +98,21 @@ class ComputeManager(threading.Thread):
         self.logger.info("ComputeManager: --- start compute_nodes "
                          "status update ---")

-        self.data_lock.acquire()
-        try:
-            triggered_host_updates = self.set_hosts()
-            triggered_flavor_updates = self.set_flavors()
+        triggered_host_updates = self.set_hosts()
+        triggered_flavor_updates = self.set_flavors()

-            if triggered_host_updates is True and \
-                    triggered_flavor_updates is True:
-                if self.resource.update_topology() is False:
-                    # TODO(UNKNOWN): error in MUSIC. ignore?
-                    pass
-            else:
-                # TODO(UNKNOWN): error handling, e.g.,
-                # 3 times failure then stop Ostro?
-                pass
-        finally:
+        if triggered_host_updates is True and triggered_flavor_updates is True:
+            self.data_lock.acquire()
+            update_status = self.resource.update_topology()
+            self.data_lock.release()
+
+            if update_status is False:
+                # TODO(GY): error in MUSIC. ignore?
+                pass
+        else:
+            # TODO(GY): error handling, e.g., 3 times failure then stop Ostro?
+            pass

         self.logger.info("ComputeManager: --- done compute_nodes "
                          "status update ---")
@ -139,8 +137,10 @@ class ComputeManager(threading.Thread):

         self._compute_avail_host_resources(hosts)

+        self.data_lock.acquire()
         self._check_logical_group_update(logical_groups)
         self._check_host_update(hosts)
+        self.data_lock.release()

         return True
@ -387,7 +387,9 @@ class ComputeManager(threading.Thread):
             self.logger.error("ComputeManager: " + status)
             return False

+        self.data_lock.acquire()
         self._check_flavor_update(flavors)
+        self.data_lock.release()

         return True
@ -95,15 +95,15 @@ class TopologyManager(threading.Thread):
         self.logger.info("TopologyManager: --- start topology "
                          "status update ---")

-        self.data_lock.acquire()
-        try:
-            if self.set_topology() is True:
-                if self.resource.update_topology() is False:
-                    # TODO(UNKOWN): ignore?
-                    pass
-        finally:
+        if self.set_topology() is True:
+            self.data_lock.acquire()
+            update_status = self.resource.update_topology()
+            self.data_lock.release()
+
+            if update_status is False:
+                # TODO(GY): ignore?
+                pass

         self.logger.info("TopologyManager: --- done topology status update ---")

     def set_topology(self):
@ -128,7 +128,9 @@ class TopologyManager(threading.Thread):
             self.logger.error("TopologyManager: " + status)
             return False

+        self.data_lock.acquire()
         self._check_update(datacenter, host_groups, hosts, switches)
+        self.data_lock.release()

         return True