diff --git a/.gitignore b/.gitignore
index 9a9a4ef..abb7f91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,9 @@
+# MacOS
+.DS_Store
+
+# IDEA / IDE
+.idea/
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
@@ -108,4 +114,3 @@ obj/
 [Rr]elease*/
 _ReSharper*/
 [Tt]est[Rr]esult*
-.idea/*
diff --git a/tools/conf.d/HAValet.conf b/tools/conf.d/HAValet.conf
deleted file mode 100644
index 6f9fc85..0000000
--- a/tools/conf.d/HAValet.conf
+++ /dev/null
@@ -1,6 +0,0 @@
-[program:HAValet]
-command=python /usr/local/lib/python2.7/dist-packages/valet/ha/ha_valet.py
-autostart=true
-autorestart=true
-stderr_logfile=/var/log/HAValet.err.log
-stdout_logfile=/var/log/HAValet.out.log
diff --git a/tools/conf.d/music.conf b/tools/conf.d/music.conf
deleted file mode 100644
index 34b3550..0000000
--- a/tools/conf.d/music.conf
+++ /dev/null
@@ -1,15 +0,0 @@
-[program:cassandra]
-command=/bin/bash -c '/opt/app/apache-cassandra-2.1.1/bin/cassandra -f'
-autostart=true
-autorestart=true
-stopsignal=KILL
-stderr_logfile=/var/log/cassandra.err.log
-stdout_logfile=/var/log/cassandra.out.log
-
-[program:Zookeeper]
-command=/opt/app/zookeeper-3.4.6/bin/zkServer.sh start-foreground
-autostart=true
-autorestart=true
-stopsignal=KILL
-stderr_logfile=/var/log/zookeeper.err.log
-stdout_logfile=/var/log/zookeeper.out.log
\ No newline at end of file
diff --git a/valet/engine/optimizer/app_manager/app_handler.py b/valet/engine/optimizer/app_manager/app_handler.py
index 6947231..6e11065 100755
--- a/valet/engine/optimizer/app_manager/app_handler.py
+++ b/valet/engine/optimizer/app_manager/app_handler.py
@@ -48,8 +48,6 @@ class AppHandler(object):
 
         app_topology = AppTopology(self.resource, self.logger)
 
-        self.logger.debug("AppHandler: parse app")
-
         stack_id = None
         if "stack_id" in _app.keys():
             stack_id = _app["stack_id"]
@@ -88,6 +86,11 @@
         else:
             app_id = app_topology.set_app_topology(_app)
 
+        if len(app_topology.candidate_list_map) > 0:
+            self.logger.debug("got ad-hoc placement: " + stack_id)
+        else:
+            self.logger.debug("got placement: " + stack_id)
+
         if app_id is None:
             self.logger.error(app_topology.status)
             self.status = app_topology.status
@@ -204,17 +207,9 @@
                     _app_topology.candidate_list_map[vmk] = \
                         _app["locations"]
 
-                    self.logger.debug("AppHandler: re-requested vm = " +
-                                      vm["name"] + " in")
-                    for hk in _app["locations"]:
-                        self.logger.debug("    " + hk)
-
                 elif vmk in _app["exclusions"]:
                     _app_topology.planned_vm_map[vmk] = vm["host"]
 
-                    self.logger.debug("AppHandler: exception from "
-                                      "replan = " + vm["name"])
-
             elif _action == "migrate":
                 if vmk == _app["orchestration_id"]:
                     _app_topology.exclusion_list_map[vmk] = _app[
diff --git a/valet/engine/optimizer/app_manager/app_topology.py b/valet/engine/optimizer/app_manager/app_topology.py
index 2704277..565e995 100755
--- a/valet/engine/optimizer/app_manager/app_topology.py
+++ b/valet/engine/optimizer/app_manager/app_topology.py
@@ -67,6 +67,9 @@
         """
        (vgroups, vms, volumes) = self.parser.set_topology(_app_graph)
 
+        if len(self.parser.candidate_list_map) > 0:
+            self.candidate_list_map = self.parser.candidate_list_map
+
         if len(vgroups) == 0 and len(vms) == 0 and len(volumes) == 0:
             self.status = self.parser.status
             return None
diff --git a/valet/engine/optimizer/app_manager/app_topology_parser.py b/valet/engine/optimizer/app_manager/app_topology_parser.py
index 1be6091..1aefc0f 100755
--- a/valet/engine/optimizer/app_manager/app_topology_parser.py
+++ b/valet/engine/optimizer/app_manager/app_topology_parser.py
@@ -48,6 +48,8 @@ class Parser(object):
         self.application_name = None
         self.action = None  # [create|update|ping]
 
+        self.candidate_list_map = {}
+
         self.status = "success"
 
     def set_topology(self, _graph):
@@ -72,6 +74,11 @@ class Parser(object):
         else:
             self.action = "any"
 
+        if "locations" in _graph.keys() and len(_graph["locations"]) > 0:
+            if len(_graph["resources"]) == 1:
+                v_uuid = _graph["resources"].keys()[0]
+                self.candidate_list_map[v_uuid] = _graph["locations"]
+
         return self._set_topology(_graph["resources"])
 
     def _set_topology(self, _elements):
@@ -103,6 +110,10 @@ class Parser(object):
                 # NOTE: do not allow to specify a certain host name
                 vm.availability_zone = az.split(":")[0]
 
+            if "locations" in r.keys():
+                if len(r["locations"]) > 0:
+                    self.candidate_list_map[rk] = r["locations"]
+
             vms[vm.uuid] = vm
 
             self.logger.debug("vm = " + vm.uuid)
diff --git a/valet/engine/optimizer/db_connect/client.cfg b/valet/engine/optimizer/db_connect/client.cfg
deleted file mode 100644
index 7b10c14..0000000
--- a/valet/engine/optimizer/db_connect/client.cfg
+++ /dev/null
@@ -1,14 +0,0 @@
-# Version 2.0.2:
-
-# Set database keyspace
-db_keyspace=valet_test
-db_request_table=placement_requests
-db_response_table=placement_results
-db_event_table=oslo_messages
-db_resource_table=resource_status
-db_resource_index_table=resource_log_index
-db_app_index_table=app_log_index
-db_app_table=app
-db_uuid_table=uuid_map
-
-#replication_factor=3
diff --git a/valet/engine/optimizer/db_connect/configuration.py b/valet/engine/optimizer/db_connect/configuration.py
deleted file mode 100644
index 1e0f2d5..0000000
--- a/valet/engine/optimizer/db_connect/configuration.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Copyright 2014-2017 AT&T Intellectual Property
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Configuration."""
-
-import sys
-
-
-class Config(object):
-    """Config Class.
-
-    This class consists of one function that reads client config options
-    from a file and sets the corresponding config variables of this class.
- """ - - def __init__(self): - """Init Config class.""" - self.mode = None - - self.db_keyspace = None - self.db_request_table = None - self.db_response_table = None - self.db_event_table = None - self.db_resource_table = None - self.db_app_table = None - self.db_resource_index_table = None - self.db_app_index_table = None - self.db_uuid_table = None - - def configure(self): - """Read client config file for config options and return success.""" - try: - f = open("./client.cfg", "r") - line = f.readline() - - while line: - if line.startswith("#") or line.startswith(" ") or line == "\n": - line = f.readline() - continue - - (rk, v) = line.split("=") - k = rk.strip() - - if k == "db_keyspace": - self.db_keyspace = v.strip() - elif k == "db_request_table": - self.db_request_table = v.strip() - elif k == "db_response_table": - self.db_response_table = v.strip() - elif k == "db_event_table": - self.db_event_table = v.strip() - elif k == "db_resource_table": - self.db_resource_table = v.strip() - elif k == "db_app_table": - self.db_app_table = v.strip() - elif k == "db_resource_index_table": - self.db_resource_index_table = v.strip() - elif k == "db_app_index_table": - self.db_app_index_table = v.strip() - elif k == "db_uuid_table": - self.db_uuid_table = v.strip() - - line = f.readline() - - f.close() - - return "success" - - except IOError as e: - return "I/O error({}): {}".format(e.errno, e.strerror) - except Exception: - return "Unexpected error: ", sys.exc_info()[0] diff --git a/valet/engine/optimizer/db_connect/music_handler.py b/valet/engine/optimizer/db_connect/music_handler.py index e7eb03e..5fea74d 100644 --- a/valet/engine/optimizer/db_connect/music_handler.py +++ b/valet/engine/optimizer/db_connect/music_handler.py @@ -17,6 +17,7 @@ import json import operator +import time from valet.common.music import Music from valet.engine.optimizer.db_connect.event import Event @@ -175,12 +176,14 @@ class MusicHandler(object): """ event_list = [] + ts = time.time() events = {} try: events = self.music.read_all_rows(self.config.db_keyspace, self.config.db_event_table) except Exception as e: self.logger.error("DB:event: " + str(e)) + self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts)) return None if len(events) > 0: @@ -330,6 +333,7 @@ class MusicHandler(object): if len(event_list) > 0: event_list.sort(key=operator.attrgetter('event_id')) + self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts)) return event_list def delete_event(self, _event_id): @@ -409,12 +413,14 @@ class MusicHandler(object): """Return list of requests that consists of all rows in a db table.""" request_list = [] + ts = time.time() requests = {} try: requests = self.music.read_all_rows(self.config.db_keyspace, self.config.db_request_table) except Exception as e: self.logger.error("DB: while reading requests: " + str(e)) + self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts)) return None if len(requests) > 0: @@ -428,6 +434,7 @@ class MusicHandler(object): for r in r_list: request_list.append(r) + self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts)) return request_list def put_result(self, _result): diff --git a/valet/engine/optimizer/ostro/constraint_solver.py b/valet/engine/optimizer/ostro/constraint_solver.py index 8d2b527..b319dfa 100755 --- a/valet/engine/optimizer/ostro/constraint_solver.py +++ b/valet/engine/optimizer/ostro/constraint_solver.py @@ -49,8 +49,6 @@ class ConstraintSolver(object): """When 
replanning.""" if _n.node.host is not None and len(_n.node.host) > 0: - self.logger.debug("ConstraintSolver: reconsider with given " - "candidates") for hk in _n.node.host: for ark, ar in _avail_resources.iteritems(): if hk == ark: @@ -275,10 +273,6 @@ class ConstraintSolver(object): if r not in conflict_list: conflict_list.append(r) - debug_resource_name = r.get_resource_name(_level) - self.logger.debug("ConstraintSolver: exclusivity defined " - "in resource = " + debug_resource_name) - _candidate_list[:] = [c for c in _candidate_list if c not in conflict_list] @@ -330,10 +324,6 @@ class ConstraintSolver(object): if self.exist_group(_level, _exclusivity_id, "EX", r) is True: if r not in candidate_list: candidate_list.append(r) - else: - debug_resource_name = r.get_resource_name(_level) - self.logger.debug("ConstraintSolver: exclusivity not exist in " - "resource = " + debug_resource_name) return candidate_list @@ -344,10 +334,6 @@ class ConstraintSolver(object): if self.check_hibernated(_level, r) is True: if r not in candidate_list: candidate_list.append(r) - else: - debug_resource_name = r.get_resource_name(_level) - self.logger.debug("ConstraintSolver: exclusivity not allowed " - "in resource = " + debug_resource_name) return candidate_list diff --git a/valet/engine/optimizer/ostro/openstack_filters.py b/valet/engine/optimizer/ostro/openstack_filters.py index 5b87546..9425877 100755 --- a/valet/engine/optimizer/ostro/openstack_filters.py +++ b/valet/engine/optimizer/ostro/openstack_filters.py @@ -89,17 +89,11 @@ class AggregateInstanceExtraSpecsFilter(object): aggregate_vals = _metadata.get(key, None) if not aggregate_vals: - self.logger.debug("key (" + key + ") not exists in logical_" - "group (" + _lg_name + ") " + - " of host (" + _h_name + ")") return False for aggregate_val in aggregate_vals: if openstack_utils.match(aggregate_val, req): break else: - self.logger.debug("key (" + key + ")'s value (" + req + ") not " - "exists in logical_group " + "(" + _lg_name + - ") " + " of host (" + _h_name + ")") return False return True @@ -138,8 +132,6 @@ class AvailabilityZoneFilter(object): for azr in az_request_list: if azr not in availability_zone_list: - self.logger.debug("AZ (" + azr + ") not exists in host " + "(" + - _host.get_resource_name(_level) + ")") return False return True @@ -160,17 +152,9 @@ class RamFilter(object): # Do not allow an instance to overcommit against itself, only against # other instances. if not total_ram >= requested_ram: - self.logger.debug("requested mem (" + str(requested_ram) + - ") more than total mem (" + - str(total_ram) + ") in host (" + - _host.get_resource_name(_level) + ")") return False if not usable_ram >= requested_ram: - self.logger.debug("requested mem (" + str(requested_ram) + - ") more than avail mem (" + - str(usable_ram) + ") in host (" + - _host.get_resource_name(_level) + ")") return False return True @@ -192,17 +176,9 @@ class CoreFilter(object): # Do not allow an instance to overcommit against itself, only against # other instances. 
         if instance_vCPUs > vCPUs:
-            self.logger.debug("requested vCPUs (" + str(instance_vCPUs) +
-                              ") more than total vCPUs (" +
-                              str(vCPUs) + ") in host (" +
-                              _host.get_resource_name(_level) + ")")
             return False
 
         if avail_vCPUs < instance_vCPUs:
-            self.logger.debug("requested vCPUs (" + str(instance_vCPUs) +
-                              ") more than avail vCPUs (" +
-                              str(avail_vCPUs) + ") in host (" +
-                              _host.get_resource_name(_level) + ")")
             return False
 
         return True
@@ -221,10 +197,6 @@
         (_, usable_disk) = _host.get_local_disk(_level)
 
         if not usable_disk >= requested_disk:
-            self.logger.debug("requested disk (" + str(requested_disk) +
-                              ") more than avail disk (" +
-                              str(usable_disk) + ") in host (" +
-                              _host.get_resource_name(_level) + ")")
             return False
 
         return True
diff --git a/valet/engine/optimizer/ostro/optimizer.py b/valet/engine/optimizer/ostro/optimizer.py
index e140365..154c70b 100755
--- a/valet/engine/optimizer/ostro/optimizer.py
+++ b/valet/engine/optimizer/ostro/optimizer.py
@@ -44,12 +44,14 @@ class Optimizer(object):
 
         start_ts = time.time()
 
-        if len(_app_topology.candidate_list_map) > 0:
-            place_type = "replan"
-        elif len(_app_topology.exclusion_list_map) > 0:
+        if len(_app_topology.exclusion_list_map) > 0:
             place_type = "migration"
         else:
-            place_type = "create"
+            if (len(_app_topology.old_vm_map) > 0 or len(_app_topology.planned_vm_map) > 0) and \
+                    len(_app_topology.candidate_list_map) > 0:
+                place_type = "replan"
+            else:
+                place_type = "create"
 
         if place_type == "migration":
             vm_id = _app_topology.exclusion_list_map.keys()[0]
@@ -65,9 +67,6 @@
 
             if len(_app_topology.old_vm_map) > 0:
                 uuid_map = self._delete_old_vms(_app_topology.old_vm_map)
                 self.resource.update_topology(store=False)
-
-                self.logger.debug("Optimizer: remove old placements for "
-                                  "replan")
         else:
             success = self.search.place_nodes(_app_topology, self.resource)
diff --git a/valet/engine/optimizer/ostro/ostro.py b/valet/engine/optimizer/ostro/ostro.py
index 8923053..5c8e0b6 100755
--- a/valet/engine/optimizer/ostro/ostro.py
+++ b/valet/engine/optimizer/ostro/ostro.py
@@ -74,7 +74,7 @@ class Ostro(object):
         self.end_of_process = False
 
         self.batch_store_trigger = 10  # sec
-        self.batch_events_count = 1
+        # self.batch_events_count = 1
 
    '''
    def run_ostro(self):
@@ -107,9 +107,12 @@
             else:
                 if self.resource.resource_updated is True and \
                         (time.time()-self.resource.curr_db_timestamp) >= self.batch_store_trigger:
+                    self.data_lock.acquire()
                     if self.resource.store_topology_updates() is False:
+                        self.data_lock.release()
                         break
                     self.resource.resource_updated = False
+                    self.data_lock.release()
                 else:
                     time.sleep(0.1)
 
@@ -158,9 +161,12 @@
             else:
                 if self.resource.resource_updated is True and \
                         (time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger:
+                    self.data_lock.acquire()
                     if self.resource.store_topology_updates() is False:
+                        self.data_lock.release()
                         break
                     self.resource.resource_updated = False
+                    self.data_lock.release()
 
         self.topology.end_of_process = True
         self.compute.end_of_process = True
@@ -281,7 +287,7 @@
         self.logger.info("done app placement")
 
         end_time = time.time()
-        self.logger.info("EVAL: total decision delay of request = " + str(end_time - start_time) + " sec")
+        self.logger.debug("EVAL: total decision delay of request = " + str(end_time - start_time))
 
         return True
 
@@ -424,8 +430,6 @@
         if self._set_flavors() is False:
             return False
 
-        self.resource.update_topology(store=False)
-
         flavor = self.resource.get_flavor(_vm.flavor)
         if flavor is None:
             return False
 
@@ -449,10 +453,12 @@
         """
         self.data_lock.acquire()
 
+        event_handler_start_time = time.time()
+
         resource_updated = False
 
-        events_count = 0
-        handled_event_list = []
+        # events_count = 0
+        # handled_event_list = []
         for e in _event_list:
             if e.host is not None and e.host != "none":
                 if self._check_host(e.host) is False:
@@ -587,15 +593,15 @@
                 self.logger.warn("Ostro.handle_events: unknown event "
                                  "method = " + e.method)
 
-            events_count += 1
-            handled_event_list.append(e)
-            if events_count >= self.batch_events_count:
-                break
+            # events_count += 1
+            # handled_event_list.append(e)
+            # if events_count >= self.batch_events_count:
+            #     break
 
         if resource_updated is True:
             self.resource.update_topology(store=False)
 
-        for e in handled_event_list:
+        for e in _event_list:
             if self.db.delete_event(e.event_id) is False:
                 self.data_lock.release()
                 return False
@@ -606,6 +612,8 @@
                 self.data_lock.release()
                 return False
 
+        self.logger.debug("EVAL: total delay for event handling = " + str(time.time() - event_handler_start_time))
+
         self.data_lock.release()
         return True
diff --git a/valet/engine/optimizer/ostro/search.py b/valet/engine/optimizer/ostro/search.py
index 58c13ec..72d687e 100755
--- a/valet/engine/optimizer/ostro/search.py
+++ b/valet/engine/optimizer/ostro/search.py
@@ -149,8 +149,6 @@ class Search(object):
 
         if len(self.app_topology.old_vm_map) > 0:
             self._adjust_resources()
-            self.logger.debug("Search: adjust resources by deducting prior "
-                              "placements")
 
         self._compute_resource_weights()
 
@@ -808,8 +806,6 @@
         """Already-planned vgroup."""
         planned_host = None
         if _n.node in self.planned_placements.keys():
-            self.logger.debug("Search: already determined node = " +
-                              _n.node.name)
             copied_host = self.planned_placements[_n.node]
             if _level == "host":
                 planned_host = _avail_resources[copied_host.host_name]
diff --git a/valet/engine/resource_manager/compute_manager.py b/valet/engine/resource_manager/compute_manager.py
index 07176d7..b52c8ca 100755
--- a/valet/engine/resource_manager/compute_manager.py
+++ b/valet/engine/resource_manager/compute_manager.py
@@ -99,19 +99,11 @@ class ComputeManager(threading.Thread):
                          "status update ---")
 
         triggered_host_updates = self.set_hosts()
+        if triggered_host_updates is not True:
+            self.logger.warn("fail to set hosts from nova")
         triggered_flavor_updates = self.set_flavors()
-
-        if triggered_host_updates is True and triggered_flavor_updates is True:
-            self.data_lock.acquire()
-            update_status = self.resource.update_topology(store=False)
-            self.data_lock.release()
-
-            if update_status is False:
-                # TODO(GY): error in MUSIC. ignore?
-                pass
-        else:
-            # TODO(GY): error handling, e.g., 3 times failure then stop Ostro?
-            pass
+        if triggered_flavor_updates is not True:
+            self.logger.warn("fail to set flavor from nova")
 
         self.logger.info("ComputeManager: --- done compute_nodes "
                          "status update ---")
@@ -138,8 +130,11 @@
         self._compute_avail_host_resources(hosts)
 
         self.data_lock.acquire()
-        self._check_logical_group_update(logical_groups)
-        self._check_host_update(hosts)
+        lg_updated = self._check_logical_group_update(logical_groups)
+        host_updated = self._check_host_update(hosts)
+
+        if lg_updated is True or host_updated is True:
+            self.resource.update_topology(store=False)
         self.data_lock.release()
 
         return True
@@ -149,6 +144,8 @@
             self.resource.compute_avail_resources(hk, host)
 
     def _check_logical_group_update(self, _logical_groups):
+        updated = False
+
         for lk in _logical_groups.keys():
             if lk not in self.resource.logical_groups.keys():
                 self.resource.logical_groups[lk] = deepcopy(_logical_groups[lk])
@@ -156,6 +153,7 @@
                 self.resource.logical_groups[lk].last_update = time.time()
                 self.logger.warn("ComputeManager: new logical group (" +
                                  lk + ") added")
+                updated = True
 
         for rlk in self.resource.logical_groups.keys():
             rl = self.resource.logical_groups[rlk]
@@ -167,6 +165,7 @@
                 self.resource.logical_groups[rlk].last_update = time.time()
                 self.logger.warn("ComputeManager: logical group (" +
                                  rlk + ") removed")
+                updated = True
 
         for lk in _logical_groups.keys():
             lg = _logical_groups[lk]
@@ -178,28 +177,42 @@
                 rlg.last_update = time.time()
                 self.logger.warn("ComputeManager: logical group (" +
                                  lk + ") updated")
+                updated = True
+
+        return updated
 
     def _check_logical_group_metadata_update(self, _lg, _rlg):
+        updated = False
+
         if _lg.status != _rlg.status:
             _rlg.status = _lg.status
+            updated = True
 
         for mdk in _lg.metadata.keys():
             if mdk not in _rlg.metadata.keys():
                 _rlg.metadata[mdk] = _lg.metadata[mdk]
+                updated = True
 
         for rmdk in _rlg.metadata.keys():
             if rmdk not in _lg.metadata.keys():
                 del _rlg.metadata[rmdk]
+                updated = True
 
         for hk in _lg.vms_per_host.keys():
             if hk not in _rlg.vms_per_host.keys():
                 _rlg.vms_per_host[hk] = deepcopy(_lg.vms_per_host[hk])
+                updated = True
 
         for rhk in _rlg.vms_per_host.keys():
             if rhk not in _lg.vms_per_host.keys():
                 del _rlg.vms_per_host[rhk]
+                updated = True
+
+        return updated
 
     def _check_host_update(self, _hosts):
+        updated = False
+
         for hk in _hosts.keys():
             if hk not in self.resource.hosts.keys():
                 new_host = Host(hk)
@@ -208,6 +221,7 @@
                 new_host.last_update = time.time()
                 self.logger.warn("ComputeManager: new host (" +
                                  new_host.name + ") added")
+                updated = True
 
         for rhk, rhost in self.resource.hosts.iteritems():
             if rhk not in _hosts.keys():
@@ -217,23 +231,28 @@
                 rhost.last_update = time.time()
                 self.logger.warn("ComputeManager: host (" +
                                  rhost.name + ") disabled")
+                updated = True
 
         for hk in _hosts.keys():
             host = _hosts[hk]
             rhost = self.resource.hosts[hk]
             if self._check_host_config_update(host, rhost) is True:
                 rhost.last_update = time.time()
+                updated = True
 
         for hk, h in self.resource.hosts.iteritems():
             if h.clean_memberships() is True:
                 h.last_update = time.time()
                 self.logger.warn("ComputeManager: host (" + h.name +
                                  ") updated (delete EX/AFF/DIV membership)")
+                updated = True
 
         for hk, host in self.resource.hosts.iteritems():
-            if host.last_update > self.resource.current_timestamp:
+            if host.last_update >= self.resource.current_timestamp:
                 self.resource.update_rack_resource(host)
 
+        return updated
+
     def _check_host_config_update(self, _host, _rhost):
         topology_updated = False
 
@@ -388,19 +407,23 @@
             return False
 
         self.data_lock.acquire()
-        self._check_flavor_update(flavors)
+        if self._check_flavor_update(flavors) is True:
+            self.resource.update_topology(store=False)
         self.data_lock.release()
 
         return True
 
     def _check_flavor_update(self, _flavors):
+        updated = False
+
         for fk in _flavors.keys():
             if fk not in self.resource.flavors.keys():
                 self.resource.flavors[fk] = deepcopy(_flavors[fk])
                 self.resource.flavors[fk].last_update = time.time()
                 self.logger.warn("ComputeManager: new flavor (" +
-                                 fk + ") added")
+                                 fk + ":" + _flavors[fk].flavor_id + ") added")
+                updated = True
 
         for rfk in self.resource.flavors.keys():
             rf = self.resource.flavors[rfk]
@@ -410,6 +433,7 @@
                 rf.last_update = time.time()
                 self.logger.warn("ComputeManager: flavor (" + rfk +
                                  ":" + rf.flavor_id + ") removed")
+                updated = True
 
         for fk in _flavors.keys():
             f = _flavors[fk]
@@ -419,6 +443,9 @@
                 rf.last_update = time.time()
                 self.logger.warn("ComputeManager: flavor (" + fk +
                                  ":" + rf.flavor_id + ") spec updated")
+                updated = True
+
+        return updated
 
     def _check_flavor_spec_update(self, _f, _rf):
         spec_updated = False
diff --git a/valet/engine/resource_manager/resource.py b/valet/engine/resource_manager/resource.py
index 457b08f..f4b1d4e 100755
--- a/valet/engine/resource_manager/resource.py
+++ b/valet/engine/resource_manager/resource.py
@@ -207,7 +207,7 @@ class Resource(object):
             self.host_groups[hgk] = host_group
 
         if len(self.host_groups) == 0:
-            self.logger.error("fail loading host_groups")
+            self.logger.warn("fail loading host_groups")
 
         dc = _resource_status.get("datacenter")
         if dc:
@@ -433,6 +433,8 @@
             self.nw_bandwidth_avail += min(avail_nw_bandwidth_list)
 
     def store_topology_updates(self):
+        store_start_time = time.time()
+
         updated = False
         flavor_updates = {}
         logical_group_updates = {}
@@ -536,6 +538,8 @@
         # self.show_current_logical_groups()
         # self.show_current_host_status()
 
+        self.logger.debug("EVAL: total delay for store resource status = " + str(time.time() - store_start_time))
+
         return True
 
     def show_current_logical_groups(self):
@@ -566,12 +570,28 @@
                     self.logger.error("TEST: membership missing")
 
     def show_current_host_status(self):
-        for hk, host in self.hosts.iteritems():
+        for hk, h in self.hosts.iteritems():
             self.logger.debug("TEST: host name = " + hk)
-            self.logger.debug("    status = " + host.status)
-            self.logger.debug("    vms = " + str(len(host.vm_list)))
+            self.logger.debug("    status = " + h.status + ", " + h.state)
+            self.logger.debug("    vms = " + str(len(h.vm_list)))
+            self.logger.debug("    resources (org, total, avail, used)")
+            cpu_org = str(h.original_vCPUs)
+            cpu_tot = str(h.vCPUs)
+            cpu_avail = str(h.avail_vCPUs)
+            cpu_used = str(h.vCPUs_used)
+            self.logger.debug("    cpu = " + cpu_org + ", " + cpu_tot + ", " + cpu_avail + ", " + cpu_used)
+            mem_org = str(h.original_mem_cap)
+            mem_tot = str(h.mem_cap)
+            mem_avail = str(h.avail_mem_cap)
+            mem_used = str(h.free_mem_mb)
+            self.logger.debug("    mem = " + mem_org + ", " + mem_tot + ", " + mem_avail + ", " + mem_used)
+            dsk_org = str(h.original_local_disk_cap)
+            dsk_tot = str(h.local_disk_cap)
+            dsk_avail = str(h.avail_local_disk_cap)
+            dsk_used = str(h.free_disk_gb)
", " + dsk_tot + ", " + dsk_avail + ", " + dsk_used) self.logger.debug(" memberships") - for mk in host.memberships.keys(): + for mk in h.memberships.keys(): self.logger.debug(" " + mk) if mk not in self.logical_groups.keys(): self.logger.error("TEST: lg missing") diff --git a/valet/engine/resource_manager/simulation/__init__.py b/valet/engine/resource_manager/simulation/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/valet/engine/resource_manager/simulation/compute_simulator.py b/valet/engine/resource_manager/simulation/compute_simulator.py deleted file mode 100644 index 96a7310..0000000 --- a/valet/engine/resource_manager/simulation/compute_simulator.py +++ /dev/null @@ -1,123 +0,0 @@ -# -# Copyright 2014-2017 AT&T Intellectual Property -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Simulate hosts and flavors.""" - -from valet.engine.resource_manager.resource_base \ - import Host, LogicalGroup, Flavor - - -class SimCompute(object): - """Simulate Compute class.""" - - def __init__(self, _config): - """Init.""" - self.config = _config - - def set_hosts(self, _hosts, _logical_groups): - """Return success after setting sim hosts and flavors.""" - self._set_availability_zones(_hosts, _logical_groups) - - self._set_aggregates(_hosts, _logical_groups) - - self._set_placed_vms(_hosts, _logical_groups) - - self._set_resources(_hosts) - - return "success" - - def _set_availability_zones(self, _hosts, _logical_groups): - logical_group = LogicalGroup("nova") - logical_group.group_type = "AZ" - _logical_groups[logical_group.name] = logical_group - - for r_num in range(0, self.config.num_of_racks): - for h_num in range(0, self.config.num_of_hosts_per_rack): - host = Host(self.config.mode + "0r" + str(r_num) + "c" + - str(h_num)) - host.tag.append("nova") - host.memberships["nova"] = logical_group - - logical_group.vms_per_host[host.name] = [] - - _hosts[host.name] = host - - def _set_aggregates(self, _hosts, _logical_groups): - for a_num in range(0, self.config.num_of_aggregates): - metadata = {} - metadata["cpu_allocation_ratio"] = "0.5" - - aggregate = LogicalGroup("aggregate" + str(a_num)) - aggregate.group_type = "AGGR" - aggregate.metadata = metadata - - _logical_groups[aggregate.name] = aggregate - - for a_num in range(0, self.config.num_of_aggregates): - aggregate = _logical_groups["aggregate" + str(a_num)] - for r_num in range(0, self.config.num_of_racks): - for h_num in range(0, self.config.num_of_hosts_per_rack): - host_name = self.config.mode + "0r" + str(r_num) + "c" + \ - str(h_num) - if host_name in _hosts.keys(): - if (h_num % - (self.config.aggregated_ratio + a_num)) == 0: - host = _hosts[host_name] - host.memberships[aggregate.name] = aggregate - - aggregate.vms_per_host[host.name] = [] - - def _set_placed_vms(self, _hosts, _logical_groups): - pass - - def _set_resources(self, _hosts): - for r_num in range(0, self.config.num_of_racks): - for h_num in range(0, self.config.num_of_hosts_per_rack): - host_name = self.config.mode + "0r" + str(r_num) + "c" + \ - 
-                    str(h_num)
-                if host_name in _hosts.keys():
-                    host = _hosts[host_name]
-                    host.original_vCPUs = float(self.config.cpus_per_host)
-                    host.vCPUs_used = 0.0
-                    host.original_mem_cap = float(self.config.mem_per_host)
-                    host.free_mem_mb = host.original_mem_cap
-                    host.original_local_disk_cap = \
-                        float(self.config.disk_per_host)
-                    host.free_disk_gb = host.original_local_disk_cap
-                    host.disk_available_least = host.original_local_disk_cap
-
-    def set_flavors(self, _flavors):
-        """Return success after setting passed in flavors."""
-        for f_num in range(0, self.config.num_of_basic_flavors):
-            flavor = Flavor("bflavor" + str(f_num))
-            flavor.vCPUs = float(self.config.base_flavor_cpus * (f_num + 1))
-            flavor.mem_cap = float(self.config.base_flavor_mem * (f_num + 1))
-            flavor.disk_cap = \
-                float(self.config.base_flavor_disk * (f_num + 1)) + \
-                10.0 + 20.0 / 1024.0
-
-            _flavors[flavor.name] = flavor
-
-        for a_num in range(0, self.config.num_of_aggregates):
-            flavor = Flavor("sflavor" + str(a_num))
-            flavor.vCPUs = self.config.base_flavor_cpus * (a_num + 1)
-            flavor.mem_cap = self.config.base_flavor_mem * (a_num + 1)
-            flavor.disk_cap = self.config.base_flavor_disk * (a_num + 1)
-
-            flavor.extra_specs["cpu_allocation_ratio"] = "0.5"
-
-            _flavors[flavor.name] = flavor
-
-        return "success"
diff --git a/valet/engine/resource_manager/simulation/topology_simulator.py b/valet/engine/resource_manager/simulation/topology_simulator.py
deleted file mode 100644
index d839f65..0000000
--- a/valet/engine/resource_manager/simulation/topology_simulator.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#
-# Copyright 2014-2017 AT&T Intellectual Property
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -"""Simulate datacenter configurations (i.e., layout, cabling).""" - -from valet.engine.resource_manager.resource_base \ - import HostGroup, Host, Switch, Link - - -class SimTopology(object): - """Simulate Network and Host Topology class.""" - - def __init__(self, _config): - """Init.""" - self.config = _config - - def set_topology(self, _datacenter, _host_groups, _hosts, _switches): - """Return success string after setting network and host topology.""" - self._set_network_topology(_switches) - self._set_host_topology(_datacenter, _host_groups, _hosts, _switches) - - return "success" - - def _set_network_topology(self, _switches): - root_switch = Switch("r0") - root_switch.switch_type = "root" - _switches[root_switch.name] = root_switch - - if self.config.num_of_spine_switches > 0: - for s_num in range(0, self.config.num_of_spine_switches): - switch = Switch(root_switch.name + "s" + str(s_num)) - switch.switch_type = "spine" - _switches[switch.name] = switch - - for r_num in range(0, self.config.num_of_racks): - switch = Switch(root_switch.name + "t" + str(r_num)) - switch.switch_type = "ToR" - _switches[switch.name] = switch - - for h_num in range(0, self.config.num_of_hosts_per_rack): - leaf_switch = Switch(switch.name + "l" + str(h_num)) - leaf_switch.switch_type = "leaf" - _switches[leaf_switch.name] = leaf_switch - - if self.config.num_of_spine_switches > 0: - for s_num in range(0, self.config.num_of_spine_switches): - s = _switches[root_switch.name + "s" + str(s_num)] - - up_link = Link(s.name + "-" + root_switch.name) - up_link.resource = root_switch - up_link.nw_bandwidth = self.config.bandwidth_of_spine - up_link.avail_nw_bandwidth = up_link.nw_bandwidth - s.up_links[up_link.name] = up_link - - if self.config.num_of_spine_switches > 1: - ps = None - if (s_num % 2) == 0: - if (s_num + 1) < self.config.num_of_spine_switches: - ps = _switches[root_switch.name + "s" + - str(s_num + 1)] - else: - ps = _switches[root_switch.name + "s" + str(s_num - 1)] - if ps is not None: - peer_link = Link(s.name + "-" + ps.name) - peer_link.resource = ps - peer_link.nw_bandwidth = self.config.bandwidth_of_spine - peer_link.avail_nw_bandwidth = peer_link.nw_bandwidth - s.peer_links[peer_link.name] = peer_link - - for r_num in range(0, self.config.num_of_racks): - s = _switches[root_switch.name + "t" + str(r_num)] - - parent_switch_list = [] - if self.config.num_of_spine_switches > 0: - for s_num in range(0, self.config.num_of_spine_switches): - parent_switch_list.append(_switches[root_switch.name + - "s" + str(s_num)]) - else: - parent_switch_list.append(_switches[root_switch.name]) - - for parent_switch in parent_switch_list: - up_link = Link(s.name + "-" + parent_switch.name) - up_link.resource = parent_switch - up_link.nw_bandwidth = self.config.bandwidth_of_rack - up_link.avail_nw_bandwidth = up_link.nw_bandwidth - s.up_links[up_link.name] = up_link - - if self.config.num_of_racks > 1: - ps = None - if (r_num % 2) == 0: - if (r_num + 1) < self.config.num_of_racks: - ps = _switches[root_switch.name + "t" + str(r_num + 1)] - else: - ps = _switches[root_switch.name + "t" + str(r_num - 1)] - if ps is not None: - peer_link = Link(s.name + "-" + ps.name) - peer_link.resource = ps - peer_link.nw_bandwidth = self.config.bandwidth_of_rack - peer_link.avail_nw_bandwidth = peer_link.nw_bandwidth - s.peer_links[peer_link.name] = peer_link - - for h_num in range(0, self.config.num_of_hosts_per_rack): - ls = _switches[s.name + "l" + str(h_num)] - - l_up_link = Link(ls.name + "-" + s.name) - l_up_link.resource 
-                l_up_link.resource = s
-                l_up_link.nw_bandwidth = self.config.bandwidth_of_host
-                l_up_link.avail_nw_bandwidth = l_up_link.nw_bandwidth
-                ls.up_links[l_up_link.name] = l_up_link
-
-    def _set_host_topology(self, _datacenter, _host_groups, _hosts, _switches):
-        root_switch = _switches["r0"]
-
-        for r_num in range(0, self.config.num_of_racks):
-            host_group = HostGroup(_datacenter.name + "r" + str(r_num))
-            host_group.host_type = "rack"
-            switch = _switches[root_switch.name + "t" + str(r_num)]
-            host_group.switches[switch.name] = switch
-            _host_groups[host_group.name] = host_group
-
-            for h_num in range(0, self.config.num_of_hosts_per_rack):
-                host = Host(host_group.name + "c" + str(h_num))
-                leaf_switch = _switches[switch.name + "l" + str(h_num)]
-                host.switches[leaf_switch.name] = leaf_switch
-                _hosts[host.name] = host
-
-        for r_num in range(0, self.config.num_of_racks):
-            host_group = _host_groups[_datacenter.name + "r" + str(r_num)]
-            host_group.parent_resource = _datacenter
-
-            for h_num in range(0, self.config.num_of_hosts_per_rack):
-                host = _hosts[host_group.name + "c" + str(h_num)]
-                host.host_group = host_group
-
-                host_group.child_resources[host.name] = host
-
-        _datacenter.root_switches[root_switch.name] = root_switch
-
-        for r_num in range(0, self.config.num_of_racks):
-            host_group = _host_groups[_datacenter.name + "r" + str(r_num)]
-            _datacenter.resources[host_group.name] = host_group
diff --git a/valet/engine/resource_manager/topology_manager.py b/valet/engine/resource_manager/topology_manager.py
index 5ff3482..2ab14eb 100755
--- a/valet/engine/resource_manager/topology_manager.py
+++ b/valet/engine/resource_manager/topology_manager.py
@@ -95,14 +95,8 @@ class TopologyManager(threading.Thread):
         self.logger.info("TopologyManager: --- start topology "
                          "status update ---")
 
-        if self.set_topology() is True:
-            self.data_lock.acquire()
-            update_status = self.resource.update_topology(store=False)
-            self.data_lock.release()
-
-            if update_status is False:
-                # TODO(GY): ignore?
-                pass
+        if self.set_topology() is not True:
+            self.logger.warn("fail to set topology")
 
         self.logger.info("--- done topology status update ---")
 
@@ -129,12 +123,15 @@
             return False
 
         self.data_lock.acquire()
-        self._check_update(datacenter, host_groups, hosts, switches)
+        if self._check_update(datacenter, host_groups, hosts, switches) is True:
+            self.resource.update_topology(store=False)
         self.data_lock.release()
 
         return True
 
     def _check_update(self, _datacenter, _host_groups, _hosts, _switches):
+        updated = False
+
         for sk in _switches.keys():
             if sk not in self.resource.switches.keys():
                 new_switch = self._create_new_switch(_switches[sk])
@@ -144,6 +141,7 @@
 
                 self.logger.warn("TopologyManager: new switch (" +
                                  new_switch.name + ") added")
+                updated = True
 
         for rsk in self.resource.switches.keys():
             if rsk not in _switches.keys():
@@ -154,6 +152,7 @@
 
                 self.logger.warn("TopologyManager: switch (" +
                                  switch.name + ") disabled")
+                updated = True
 
         for hk in _hosts.keys():
             if hk not in self.resource.hosts.keys():
@@ -164,6 +163,7 @@
 
                 self.logger.warn("TopologyManager: new host (" +
                                  new_host.name + ") added from configuration")
+                updated = True
 
         for rhk in self.resource.hosts.keys():
             if rhk not in _hosts.keys():
@@ -175,6 +175,7 @@
 
                 self.logger.warn("TopologyManager: host (" +
                                  host.name + ") removed from configuration")
+                updated = True
 
         for hgk in _host_groups.keys():
             if hgk not in self.resource.host_groups.keys():
@@ -185,6 +186,7 @@
 
                 self.logger.warn("TopologyManager: new host_group (" +
                                  new_host_group.name + ") added")
+                updated = True
 
         for rhgk in self.resource.host_groups.keys():
             if rhgk not in _host_groups.keys():
@@ -195,13 +197,14 @@
 
                 self.logger.warn("TopologyManager: host_group (" +
                                  host_group.name + ") disabled")
+                updated = True
 
         for sk in _switches.keys():
             switch = _switches[sk]
             rswitch = self.resource.switches[sk]
-            link_updated = self._check_switch_update(switch, rswitch)
-            if link_updated is True:
+            if self._check_switch_update(switch, rswitch) is True:
                 rswitch.last_update = time.time()
+                updated = True
 
         for hk in _hosts.keys():
             host = _hosts[hk]
@@ -210,8 +213,10 @@
                 self._check_host_update(host, rhost)
             if topology_updated is True:
                 rhost.last_update = time.time()
+                updated = True
             if link_updated is True:
                 rhost.last_link_update = time.time()
+                updated = True
 
         for hgk in _host_groups.keys():
             hg = _host_groups[hgk]
@@ -220,24 +225,30 @@
                 self._check_host_group_update(hg, rhg)
             if topology_updated is True:
                 rhg.last_update = time.time()
+                updated = True
             if link_updated is True:
                 rhg.last_link_update = time.time()
+                updated = True
 
         (topology_updated, link_updated) = \
             self._check_datacenter_update(_datacenter)
         if topology_updated is True:
             self.resource.datacenter.last_update = time.time()
+            updated = True
         if link_updated is True:
             self.resource.datacenter.last_link_update = time.time()
+            updated = True
 
         for hk, host in self.resource.hosts.iteritems():
-            if host.last_update > self.resource.current_timestamp:
+            if host.last_update >= self.resource.current_timestamp:
                 self.resource.update_rack_resource(host)
 
         for hgk, hg in self.resource.host_groups.iteritems():
-            if hg.last_update > self.resource.current_timestamp:
+            if hg.last_update >= self.resource.current_timestamp:
                 self.resource.update_cluster_resource(hg)
 
+        return updated
+
     def _create_new_switch(self, _switch):
         new_switch = Switch(_switch.name)
         new_switch.switch_type = _switch.switch_type