From 543ef67ed16cba0290caf176be538a63cfbb3057 Mon Sep 17 00:00:00 2001 From: Gueyoung Jung Date: Mon, 9 Jan 2017 08:43:19 -0500 Subject: [PATCH] Improve delay with fine-grained locking Change-Id: I5a8ef020e0c3dda50fa725c6b6509bc876593115 --- .../optimizer/app_manager/app_handler.py | 51 +++++- .../optimizer/db_connect/music_handler.py | 16 +- valet/engine/optimizer/ostro/ostro.py | 151 ++++++++---------- .../resource_manager/compute_manager.py | 30 ++-- .../resource_manager/topology_manager.py | 16 +- 5 files changed, 153 insertions(+), 111 deletions(-) diff --git a/valet/engine/optimizer/app_manager/app_handler.py b/valet/engine/optimizer/app_manager/app_handler.py index e83c409..8a729ab 100755 --- a/valet/engine/optimizer/app_manager/app_handler.py +++ b/valet/engine/optimizer/app_manager/app_handler.py @@ -45,7 +45,7 @@ class AppHandler(object): self.status = "success" - def add_app(self, _app_data): + def add_app(self, _app): """Add app and set or regenerate topology, return updated topology.""" self.apps.clear() @@ -101,6 +101,55 @@ class AppHandler(object): new_app = App(stack_id, application_name, action) self.apps[stack_id] = new_app +======= + self.logger.debug("AppHandler: parse app") + + stack_id = None + if "stack_id" in _app.keys(): + stack_id = _app["stack_id"] + else: + stack_id = "none" + + application_name = None + if "application_name" in _app.keys(): + application_name = _app["application_name"] + else: + application_name = "none" + + action = _app["action"] + if action == "ping": + self.logger.debug("AppHandler: got ping") + elif action == "replan" or action == "migrate": + re_app = self._regenerate_app_topology(stack_id, _app, app_topology, action) + if re_app is None: + self.apps[stack_id] = None + self.status = "cannot locate the original plan for stack = " + stack_id + return None + + if action == "replan": + self.logger.debug("AppHandler: got replan: " + stack_id) + elif action == "migrate": + self.logger.debug("AppHandler: got migration: " + stack_id) + + app_id = app_topology.set_app_topology(re_app) + + if app_id is None: + self.logger.error("AppHandler: " + app_topology.status) + self.status = app_topology.status + self.apps[stack_id] = None + return None + else: + app_id = app_topology.set_app_topology(_app) + + if app_id is None: + self.logger.error("AppHandler: " + app_topology.status) + self.status = app_topology.status + self.apps[stack_id] = None + return None + + new_app = App(stack_id, application_name, action) + self.apps[stack_id] = new_app +>>>>>>> c095458... Improve delay with fine-grained locking return app_topology diff --git a/valet/engine/optimizer/db_connect/music_handler.py b/valet/engine/optimizer/db_connect/music_handler.py index 0a1b9ca..766a012 100644 --- a/valet/engine/optimizer/db_connect/music_handler.py +++ b/valet/engine/optimizer/db_connect/music_handler.py @@ -21,6 +21,10 @@ from valet.common.music import Music from valet.engine.optimizer.db_connect.event import Event +def ensurekey(d, k): + return d.setdefault(k, {}) + + class MusicHandler(object): """Music Handler Class. @@ -509,42 +513,42 @@ class MusicHandler(object): if 'flavors' in _status.keys(): flavors = _status['flavors'] for fk, f in flavors.iteritems(): - if fk in json_resource['flavors'].keys(): + if fk in ensurekey(json_resource, 'flavors').keys(): del json_resource['flavors'][fk] json_resource['flavors'][fk] = f if 'logical_groups' in _status.keys(): logical_groups = _status['logical_groups'] for lgk, lg in logical_groups.iteritems(): - if lgk in json_resource['logical_groups'].keys(): + if lgk in ensurekey(json_resource, 'logical_groups').keys(): del json_resource['logical_groups'][lgk] json_resource['logical_groups'][lgk] = lg if 'storages' in _status.keys(): storages = _status['storages'] for stk, st in storages.iteritems(): - if stk in json_resource['storages'].keys(): + if stk in ensurekey(json_resource, 'storages').keys(): del json_resource['storages'][stk] json_resource['storages'][stk] = st if 'switches' in _status.keys(): switches = _status['switches'] for sk, s in switches.iteritems(): - if sk in json_resource['switches'].keys(): + if sk in ensurekey(json_resource, 'switches').keys(): del json_resource['switches'][sk] json_resource['switches'][sk] = s if 'hosts' in _status.keys(): hosts = _status['hosts'] for hk, h in hosts.iteritems(): - if hk in json_resource['hosts'].keys(): + if hk in ensurekey(json_resource, 'hosts').keys(): del json_resource['hosts'][hk] json_resource['hosts'][hk] = h if 'host_groups' in _status.keys(): host_groupss = _status['host_groups'] for hgk, hg in host_groupss.iteritems(): - if hgk in json_resource['host_groups'].keys(): + if hgk in ensurekey(json_resource, 'host_groups').keys(): del json_resource['host_groups'][hgk] json_resource['host_groups'][hgk] = hg diff --git a/valet/engine/optimizer/ostro/ostro.py b/valet/engine/optimizer/ostro/ostro.py index 7e8e5e8..f1e929f 100755 --- a/valet/engine/optimizer/ostro/ostro.py +++ b/valet/engine/optimizer/ostro/ostro.py @@ -41,10 +41,9 @@ class Ostro(object): self.db = MusicHandler(self.config, self.logger) if self.db.init_db() is False: - self.logger.error("Ostro.__init__: error while initializing MUSIC " - "database") + self.logger.error("Ostro: error while initializing MUSIC database") else: - self.logger.debug("Ostro.__init__: done init music") + self.logger.debug("Ostro: done init music") self.resource = Resource(self.db, self.config, self.logger) self.logger.debug("done init resource") @@ -112,7 +111,7 @@ class Ostro(object): for t in self.thread_list: t.join() - self.logger.info("Ostro.run_ostro: exit Ostro") + self.logger.info("Ostro: exit Ostro") def stop_ostro(self): """Stop main engine process.""" @@ -135,25 +134,22 @@ class Ostro(object): resource_status = self.db.get_resource_status( self.resource.datacenter.name) if resource_status is None: - self.logger.error("Ostro.bootstrap: failed to read from table: " + self.config.db_resource_table) + self.logger.error("Ostro: failed to read from table: " + self.config.db_resource_table) return False if len(resource_status) > 0: - self.logger.info("Ostro.bootstrap: bootstrap from db") + self.logger.info("Ostro: bootstrap from db") if not self.resource.bootstrap_from_db(resource_status): - self.logger.error("Ostro.bootstrap: failed to parse bootstrap data!") + self.logger.error("Ostro: failed to parse bootstrap data!") self.logger.info("read bootstrap data from OpenStack") if not self._set_hosts(): - self.logger.error('_set_hosts is false') return False if not self._set_flavors(): - self.logger.info("_set_flavors is false") return False if not self._set_topology(): - self.logger.error("_set_topology is false") return False self.resource.update_topology() @@ -162,7 +158,7 @@ class Ostro(object): self.logger.critical("Ostro.bootstrap failed: " + traceback.format_exc()) - self.logger.info("Ostro.bootstrap: done bootstrap") + self.logger.info("Ostro: done bootstrap") return True @@ -196,93 +192,77 @@ class Ostro(object): def place_app(self, _app_data): """Place results of query and placement requests in the db.""" - self.data_lock.acquire() - start_time = time.time() - query_request_list = [] - placement_request_list = [] for req in _app_data: if req["action"] == "query": - query_request_list.append(req) + self.logger.info("Ostro: start query") + + query_result = self._query(req) + result = self._get_json_results("query", "ok", + self.status, query_result) + + if self.db.put_result(result) is False: + return False + + self.logger.info("Ostro: done query") else: - placement_request_list.append(req) + self.logger.info("Ostro: start app placement") - if len(query_request_list) > 0: - self.logger.info("Ostro.place_app: start query") + result = None + placement_map = self._place_app(req) - query_results = self._query(query_request_list) + if placement_map is None: + result = self._get_json_results("placement", "error", + self.status, placement_map) + else: + result = self._get_json_results("placement", "ok", + "success", placement_map) - result = self._get_json_results("query", "ok", self.status, - query_results) + if self.db.put_result(result) is False: + return False - if self.db.put_result(result) is False: - self.data_lock.release() - return False - - self.logger.info("Ostro.place_app: done query") - - if len(placement_request_list) > 0: - - self.logger.info("Ostro.place_app: start app placement") - - result = None - - placement_map = self._place_app(placement_request_list) - - if placement_map is None: - result = self._get_json_results("placement", "error", - self.status, placement_map) - else: - result = self._get_json_results("placement", "ok", "success", - placement_map) - - if self.db.put_result(result) is False: - self.data_lock.release() - return False - - self.logger.info("Ostro.place_app: done app placement") + self.logger.info("Ostro: done app placement") end_time = time.time() + self.logger.info("Ostro: total decision delay of request = " + str(end_time - start_time) + " sec") - self.logger.info("Ostro.place_app: total decision delay of request = " + - str(end_time - start_time) + " sec") - - self.data_lock.release() return True - def _query(self, _query_list): - query_results = {} + def _query(self, _q): + query_result = {} - for q in _query_list: - if "type" in q.keys(): - if q["type"] == "group_vms": - if "parameters" in q.keys(): - params = q["parameters"] - if "group_name" in params.keys(): - vm_list = self._get_vms_from_logical_group( - params["group_name"]) - query_results[q["stack_id"]] = vm_list - else: - self.status = "unknown paramenter in query" - self.logger.warn("Ostro._query: unknown paramenter in query") - query_results[q["stack_id"]] = None + if "type" in _q.keys(): + if _q["type"] == "group_vms": + if "parameters" in _q.keys(): + params = _q["parameters"] + if "group_name" in params.keys(): + self.data_lock.acquire() + vm_list = self._get_vms_from_logical_group(params["group_name"]) + self.data_lock.release() + query_result[_q["stack_id"]] = vm_list else: - self.status = "no paramenter in query" - self.logger.warn("Ostro._query: no parameters in query") - query_results[q["stack_id"]] = None - elif q["type"] == "all_groups": - query_results[q["stack_id"]] = self._get_logical_groups() + self.status = "unknown paramenter in query" + self.logger.warn("Ostro: unknown paramenter in query") + query_result[_q["stack_id"]] = None else: - self.status = "unknown query type" - self.logger.warn("Ostro._query: unknown query type") - query_results[q["stack_id"]] = None + self.status = "no paramenter in query" + self.logger.warn("Ostro: no parameters in query") + query_result[_q["stack_id"]] = None + elif _q["type"] == "all_groups": + self.data_lock.acquire() + query_result[_q["stack_id"]] = self._get_logical_groups() + self.data_lock.release() else: - self.status = "unknown type in query" - self.logger.warn("Ostro._query: no type in query") - query_results[q["stack_id"]] = None + self.status = "unknown query type" + self.logger.warn("Ostro: unknown query type") + query_result[_q["stack_id"]] = None + else: + self.status = "unknown type in query" + self.logger.warn("Ostro: no type in query") + query_result[_q["stack_id"]] = None - return query_results + return query_result def _get_vms_from_logical_group(self, _group_name): vm_list = [] @@ -310,9 +290,9 @@ class Ostro(object): return logical_groups - def _place_app(self, _app_data): + def _place_app(self, _app): """Set application topology.""" - app_topology = self.app_handler.add_app(_app_data) + app_topology = self.app_handler.add_app(_app) if app_topology is None: self.status = self.app_handler.status self.logger.error("Ostro._place_app: error while register" @@ -323,14 +303,16 @@ class Ostro(object): for _, vm in app_topology.vms.iteritems(): if self._set_vm_flavor_information(vm) is False: self.status = "fail to set flavor information" - self.logger.error("Ostro._place_app: failed to set flavor information ") + self.logger.error("Ostro: failed to set flavor information ") return None for _, vg in app_topology.vgroups.iteritems(): if self._set_vm_flavor_information(vg) is False: self.status = "fail to set flavor information in a group" - self.logger.error("Ostro._place_app: failed to set flavor information in a group") + self.logger.error("Ostro: failed to set flavor information in a group") return None + self.data_lock.acquire() + """Set weights for optimization.""" app_topology.set_weight() app_topology.set_optimization_priority() @@ -341,6 +323,7 @@ class Ostro(object): self.status = self.optimizer.status self.logger.debug("Ostro._place_app: error while optimizing app " "placement: " + self.status) + self.data_lock.release() return None """Update resource and app information.""" @@ -354,6 +337,8 @@ class Ostro(object): if vk in placement_map.keys(): del placement_map[vk] + self.data_lock.release() + return placement_map def _set_vm_flavor_information(self, _v): diff --git a/valet/engine/resource_manager/compute_manager.py b/valet/engine/resource_manager/compute_manager.py index 0c19f94..3e42b59 100755 --- a/valet/engine/resource_manager/compute_manager.py +++ b/valet/engine/resource_manager/compute_manager.py @@ -98,23 +98,21 @@ class ComputeManager(threading.Thread): self.logger.info("ComputeManager: --- start compute_nodes " "status update ---") - self.data_lock.acquire() - try: - triggered_host_updates = self.set_hosts() - triggered_flavor_updates = self.set_flavors() + triggered_host_updates = self.set_hosts() + triggered_flavor_updates = self.set_flavors() - if triggered_host_updates is True and \ - triggered_flavor_updates is True: - if self.resource.update_topology() is False: - # TODO(UNKNOWN): error in MUSIC. ignore? - pass - else: - # TODO(UNKNOWN): error handling, e.g., - # 3 times failure then stop Ostro? - pass - finally: + if triggered_host_updates is True and triggered_flavor_updates is True: + self.data_lock.acquire() + update_status = self.resource.update_topology() self.data_lock.release() + if update_status is False: + # TODO(GY): error in MUSIC. ignore? + pass + else: + # TODO(GY): error handling, e.g., 3 times failure then stop Ostro? + pass + self.logger.info("ComputeManager: --- done compute_nodes " "status update ---") @@ -139,8 +137,10 @@ class ComputeManager(threading.Thread): self._compute_avail_host_resources(hosts) + self.data_lock.acquire() self._check_logical_group_update(logical_groups) self._check_host_update(hosts) + self.data_lock.release() return True @@ -387,7 +387,9 @@ class ComputeManager(threading.Thread): self.logger.error("ComputeManager: " + status) return False + self.data_lock.acquire() self._check_flavor_update(flavors) + self.data_lock.release() return True diff --git a/valet/engine/resource_manager/topology_manager.py b/valet/engine/resource_manager/topology_manager.py index 90015aa..559211a 100755 --- a/valet/engine/resource_manager/topology_manager.py +++ b/valet/engine/resource_manager/topology_manager.py @@ -95,15 +95,15 @@ class TopologyManager(threading.Thread): self.logger.info("TopologyManager: --- start topology " "status update ---") - self.data_lock.acquire() - try: - if self.set_topology() is True: - if self.resource.update_topology() is False: - # TODO(UNKOWN): ignore? - pass - finally: + if self.set_topology() is True: + self.data_lock.acquire() + update_status = self.resource.update_topology() self.data_lock.release() + if update_status is False: + # TODO(GY): ignore? + pass + self.logger.info("TopologyManager: --- done topology status update ---") def set_topology(self): @@ -128,7 +128,9 @@ class TopologyManager(threading.Thread): self.logger.error("TopologyManager: " + status) return False + self.data_lock.acquire() self._check_update(datacenter, host_groups, hosts, switches) + self.data_lock.release() return True