Refactor Engine to reduce delay

Change-Id: If6594a034db6e398e26dfdafafbae0c5fbfbf51f
This commit is contained in:
Gueyoung Jung 2017-03-10 11:49:53 -05:00 committed by Omar Rivera
parent 860554760b
commit ce1282b192
25 changed files with 250 additions and 3100 deletions

View File

@ -54,8 +54,6 @@ class AppHandler(object):
self.max_decision_history = 5000
self.min_decision_history = 1000
self.last_log_index = 0
self.status = "success"
# NOTE(GJ): do not cache migration decision
@ -123,7 +121,7 @@ class AppHandler(object):
action = _app["action"]
if action == "ping":
self.logger.debug("got ping")
self.logger.info("got ping")
elif action == "replan" or action == "migrate":
re_app = self._regenerate_app_topology(stack_id, _app,
app_topology, action)
@ -133,9 +131,9 @@ class AppHandler(object):
return None
if action == "replan":
self.logger.debug("got replan: " + stack_id)
self.logger.info("got replan: " + stack_id)
elif action == "migrate":
self.logger.debug("got migration: " + stack_id)
self.logger.info("got migration: " + stack_id)
app_id = app_topology.set_app_topology(re_app)
@ -148,9 +146,9 @@ class AppHandler(object):
app_id = app_topology.set_app_topology(_app)
if len(app_topology.candidate_list_map) > 0:
self.logger.debug("got ad-hoc placement: " + stack_id)
self.logger.info("got ad-hoc placement: " + stack_id)
else:
self.logger.debug("got placement: " + stack_id)
self.logger.info("got placement: " + stack_id)
if app_id is None:
self.logger.error(app_topology.status)
@ -322,8 +320,6 @@ class AppHandler(object):
exclusivity_groups[ex_id] = []
exclusivity_groups[ex_id].append(gk)
# NOTE: skip pipes in this version
for div_id, resource_list in diversity_groups.iteritems():
divk_level_name = div_id.split(":")
resources[divk_level_name[0]] = {}

View File

@ -30,31 +30,28 @@ class AppTopology(object):
"""Init App Topology Class."""
self.vgroups = {}
self.vms = {}
self.volumes = {}
""" for replan """
# for replan
self.old_vm_map = {}
self.planned_vm_map = {}
self.candidate_list_map = {}
""" for migration-tip """
# for migration-tip
self.exclusion_list_map = {}
self.resource = _resource
self.logger = _logger
""" restriction of host naming convention """
# restriction of host naming convention
high_level_allowed = True
if "none" in self.resource.datacenter.region_code_list:
high_level_allowed = False
self.parser = Parser(high_level_allowed, self.logger)
self.total_nw_bandwidth = 0
self.total_CPU = 0
self.total_mem = 0
self.total_local_vol = 0
self.total_vols = {}
self.optimization_priority = None
self.status = "success"
@ -65,22 +62,20 @@ class AppTopology(object):
Set app topology by calling parser to determine vgroups,
vms and volumes. Then return parsed stack_id, app_name and action.
"""
(vgroups, vms, volumes) = self.parser.set_topology(_app_graph)
(vgroups, vms) = self.parser.set_topology(_app_graph)
if len(self.parser.candidate_list_map) > 0:
self.candidate_list_map = self.parser.candidate_list_map
if len(vgroups) == 0 and len(vms) == 0 and len(volumes) == 0:
if len(vgroups) == 0 and len(vms) == 0:
self.status = self.parser.status
return None
""" cumulate virtual resources """
# cumulate virtual resources
for _, vgroup in vgroups.iteritems():
self.vgroups[vgroup.uuid] = vgroup
for _, vm in vms.iteritems():
self.vms[vm.uuid] = vm
for _, vol in volumes.iteritems():
self.volumes[vol.uuid] = vol
return self.parser.stack_id, self.parser.application_name, \
self.parser.action
@ -127,19 +122,6 @@ class AppTopology(object):
_v.local_volume_weight = 0.0
self.total_local_vol += _v.local_volume_size
bandwidth = _v.nw_bandwidth + _v.io_bandwidth
if self.resource.nw_bandwidth_avail > 0:
_v.bandwidth_weight = float(bandwidth) / \
float(self.resource.nw_bandwidth_avail)
else:
if bandwidth > 0:
_v.bandwidth_weight = 1.0
else:
_v.bandwidth_weight = 0.0
self.total_nw_bandwidth += bandwidth
def _set_vgroup_resource(self, _vg):
if isinstance(_vg, VM):
return
@ -178,17 +160,6 @@ class AppTopology(object):
else:
_vgroup.local_volume_weight = 0.0
bandwidth = _vgroup.nw_bandwidth + _vgroup.io_bandwidth
if self.resource.nw_bandwidth_avail > 0:
_vgroup.bandwidth_weight = float(bandwidth) / \
float(self.resource.nw_bandwidth_avail)
else:
if bandwidth > 0:
_vgroup.bandwidth_weight = 1.0
else:
_vgroup.bandwidth_weight = 0.0
for _, svg in _vgroup.subvgroups.iteritems():
if isinstance(svg, VGroup):
self._set_vgroup_weight(svg)
@ -200,20 +171,9 @@ class AppTopology(object):
and overall volume for an app. Then Sorts the results and sets
optimization order accordingly.
"""
if len(self.vgroups) == 0 and len(self.vms) == 0 and \
len(self.volumes) == 0:
if len(self.vgroups) == 0 and len(self.vms) == 0:
return
app_nw_bandwidth_weight = -1
if self.resource.nw_bandwidth_avail > 0:
app_nw_bandwidth_weight = float(self.total_nw_bandwidth) / \
float(self.resource.nw_bandwidth_avail)
else:
if self.total_nw_bandwidth > 0:
app_nw_bandwidth_weight = 1.0
else:
app_nw_bandwidth_weight = 0.0
app_CPU_weight = -1
if self.resource.CPU_avail > 0:
app_CPU_weight = float(self.total_CPU) / \
@ -244,25 +204,9 @@ class AppTopology(object):
else:
app_local_vol_weight = 0.0
total_vol_list = []
for vol_class in self.total_vols.keys():
total_vol_list.append(self.total_vols[vol_class])
app_vol_weight = -1
if self.resource.disk_avail > 0:
app_vol_weight = float(sum(total_vol_list)) / \
float(self.resource.disk_avail)
else:
if sum(total_vol_list) > 0:
app_vol_weight = 1.0
else:
app_vol_weight = 0.0
opt = [("bw", app_nw_bandwidth_weight),
("cpu", app_CPU_weight),
opt = [("cpu", app_CPU_weight),
("mem", app_mem_weight),
("lvol", app_local_vol_weight),
("vol", app_vol_weight)]
("lvol", app_local_vol_weight)]
self.optimization_priority = sorted(opt,
key=lambda resource: resource[1],

View File

@ -45,8 +45,6 @@ class VGroup(object):
self.survgroup = None # where this vgroup belong to
self.subvgroups = {} # child vgroups
self.vgroup_list = [] # a list of links to VMs or Volumes
self.diversity_groups = {} # cumulative diversity/exclusivity group
self.exclusivity_groups = {} # over this level. key=name, value=level
@ -56,15 +54,10 @@ class VGroup(object):
self.vCPUs = 0
self.mem = 0 # MB
self.local_volume_size = 0 # GB
self.volume_sizes = {} # key = volume_class_name, value = size
self.nw_bandwidth = 0 # Mbps
self.io_bandwidth = 0 # Mbps
self.vCPU_weight = -1
self.mem_weight = -1
self.local_volume_weight = -1
self.volume_weight = -1 # averge of all storage classes
self.bandwidth_weight = -1
self.host = None
@ -80,17 +73,12 @@ class VGroup(object):
for vk in self.subvgroups.keys():
subvgroup_list.append(vk)
link_list = []
for l in self.vgroup_list:
link_list.append(l.get_json_info())
return {'name': self.name,
'status': self.status,
'vgroup_type': self.vgroup_type,
'level': self.level,
'survgroup': survgroup_id,
'subvgroup_list': subvgroup_list,
'link_list': link_list,
'diversity_groups': self.diversity_groups,
'exclusivity_groups': self.exclusivity_groups,
'availability_zones': self.availability_zone_list,
@ -98,14 +86,9 @@ class VGroup(object):
'cpus': self.vCPUs,
'mem': self.mem,
'local_volume': self.local_volume_size,
'volumes': self.volume_sizes,
'nw_bandwidth': self.nw_bandwidth,
'io_bandwidth': self.io_bandwidth,
'cpu_weight': self.vCPU_weight,
'mem_weight': self.mem_weight,
'local_volume_weight': self.local_volume_weight,
'volume_weight': self.volume_weight,
'bandwidth_weight': self.bandwidth_weight,
'host': self.host}
@ -126,9 +109,6 @@ class VM(object):
self.survgroup = None # VGroup where this vm belongs to
self.volume_list = [] # a list of links to Volumes
self.vm_list = [] # a list of links to VMs
self.diversity_groups = {}
self.exclusivity_groups = {}
@ -139,13 +119,10 @@ class VM(object):
self.vCPUs = 0
self.mem = 0 # MB
self.local_volume_size = 0 # GB
self.nw_bandwidth = 0
self.io_bandwidth = 0
self.vCPU_weight = -1
self.mem_weight = -1
self.local_volume_weight = -1
self.bandwidth_weight = -1
self.host = None # where this vm is placed
@ -157,14 +134,6 @@ class VM(object):
else:
survgroup_id = self.survgroup.uuid
vm_list = []
for vml in self.vm_list:
vm_list.append(vml.get_json_info())
vol_list = []
for voll in self.volume_list:
vol_list.append(voll.get_json_info())
availability_zone = None
if self.availability_zone is None:
availability_zone = "none"
@ -174,8 +143,6 @@ class VM(object):
return {'name': self.name,
'status': self.status,
'survgroup': survgroup_id,
'vm_list': vm_list,
'volume_list': vol_list,
'diversity_groups': self.diversity_groups,
'exclusivity_groups': self.exclusivity_groups,
'availability_zones': availability_zone,
@ -184,128 +151,7 @@ class VM(object):
'cpus': self.vCPUs,
'mem': self.mem,
'local_volume': self.local_volume_size,
'nw_bandwidth': self.nw_bandwidth,
'io_bandwidth': self.io_bandwidth,
'cpu_weight': self.vCPU_weight,
'mem_weight': self.mem_weight,
'local_volume_weight': self.local_volume_weight,
'bandwidth_weight': self.bandwidth_weight,
'host': self.host}
class Volume(object):
"""Volume Class.
This class represents a volume, containing an app id and name, as well as
a list of links to VMs and the groups it belongs to. This also contains
data about the resources needed such as size, bandwidth and weight.
"""
def __init__(self, _app_uuid, _uuid):
"""Init Volume Class."""
self.app_uuid = _app_uuid
self.uuid = _uuid
self.name = None
self.status = "requested"
self.volume_class = None
self.survgroup = None # where this vm belongs to
self.vm_list = [] # a list of links to VMs
self.diversity_groups = {}
self.exclusivity_groups = {}
self.volume_size = 0 # GB
self.io_bandwidth = 0
self.volume_weight = -1
self.bandwidth_weight = -1
self.storage_host = None
def get_json_info(self):
"""Return JSON info for a Volume."""
survgroup_id = None
if self.survgroup is None:
survgroup_id = "none"
else:
survgroup_id = self.survgroup.uuid
volume_class = None
if self.volume_class is None:
volume_class = "none"
else:
volume_class = self.volume_class
vm_list = []
for vml in self.vm_list:
vm_list.append(vml.get_json_info())
return {'name': self.name,
'status': self.status,
'class': volume_class,
'survgroup': survgroup_id,
'vm_list': vm_list,
'diversity_groups': self.diversity_groups,
'exclusivity_groups': self.exclusivity_groups,
'volume': self.volume_size,
'io_bandwidth': self.io_bandwidth,
'volume_weight': self.volume_weight,
'bandwidth_weight': self.bandwidth_weight,
'host': self.storage_host}
class VGroupLink(object):
"""VGroup Link Class.
This class represents a link between VGroups.
"""
def __init__(self, _n):
"""Init VGroup Link."""
self.node = _n # target VM or Volume
self.nw_bandwidth = 0
self.io_bandwidth = 0
def get_json_info(self):
"""Return JSON info of VGroup Link Object."""
return {'target': self.node.uuid,
'nw_bandwidth': self.nw_bandwidth,
'io_bandwidth': self.io_bandwidth}
class VMLink(object):
"""VM Link Class.
This class represents a link between VMs.
"""
def __init__(self, _n):
"""Init VM Link."""
self.node = _n # target VM
self.nw_bandwidth = 0 # Mbps
def get_json_info(self):
"""Return JSON info of VM Link Object."""
return {'target': self.node.uuid,
'nw_bandwidth': self.nw_bandwidth}
class VolumeLink(object):
"""Volume Link Class.
This class represents a link between volumes.
"""
def __init__(self, _n):
"""Init Volume Link."""
self.node = _n # target Volume
self.io_bandwidth = 0 # Mbps
def get_json_info(self):
"""Return JSON info of Volume Link Object."""
return {'target': self.node.uuid,
'io_bandwidth': self.io_bandwidth}

View File

@ -35,6 +35,7 @@ class Parser(object):
This class handles parsing out the data related to the desired
topology from a template.
not supported OS::Nova::ServerGroup OS::Heat::AutoScalingGroup OS::Heat::Stack OS::Heat::ResourceGroup
"""
def __init__(self, _high_level_allowed, _logger):
@ -83,48 +84,35 @@ class Parser(object):
def _set_topology(self, _elements):
vgroups = {}
vgroup_captured = False
vms = {}
""" empty at this version """
volumes = {}
for rk, r in _elements.iteritems():
if r["type"] == "OS::Nova::Server":
vm = VM(self.stack_id, rk)
if "name" in r.keys():
vm.name = r["name"]
else:
vm.name = vm.uuid
flavor_id = r["properties"]["flavor"]
if isinstance(flavor_id, six.string_types):
vm.flavor = flavor_id
else:
vm.flavor = str(flavor_id)
if "availability_zone" in r["properties"].keys():
az = r["properties"]["availability_zone"]
# NOTE: do not allow to specify a certain host name
vm.availability_zone = az.split(":")[0]
if "locations" in r.keys():
if len(r["locations"]) > 0:
self.candidate_list_map[rk] = r["locations"]
vms[vm.uuid] = vm
self.logger.debug("vm = " + vm.uuid)
self.logger.info("vm = " + vm.uuid)
elif r["type"] == "OS::Cinder::Volume":
self.logger.warn("Parser: do nothing for volume at this "
"version")
elif r["type"] == "ATT::Valet::GroupAssignment":
vgroup = VGroup(self.stack_id, rk)
vgroup.vgroup_type = None
if "group_type" in r["properties"].keys():
if r["properties"]["group_type"] == "affinity":
@ -136,17 +124,17 @@ class Parser(object):
else:
self.status = "unknown group = " + \
r["properties"]["group_type"]
return {}, {}, {}
return {}, {}
else:
self.status = "no group type"
return {}, {}, {}
return {}, {}
if "group_name" in r["properties"].keys():
vgroup.name = r["properties"]["group_name"]
else:
if vgroup.vgroup_type == "EX":
self.status = "no exclusivity group identifier"
return {}, {}, {}
return {}, {}
else:
vgroup.name = "any"
@ -154,39 +142,21 @@ class Parser(object):
vgroup.level = r["properties"]["level"]
if vgroup.level != "host":
if self.high_level_allowed is False:
self.status = "only host level of affinity group " \
"allowed due to the mis-match of " \
"host naming convention"
return {}, {}, {}
self.status = "only host level of affinity group allowed " + \
"due to the mis-match of host naming convention"
return {}, {}
else:
self.status = "no grouping level"
return {}, {}, {}
return {}, {}
vgroups[vgroup.uuid] = vgroup
self.logger.info("group = " + vgroup.name + vgroup.name + ", type = " + vgroup.vgroup_type)
self.logger.debug("group = " + vgroup.name + vgroup.name + ", type = " + vgroup.vgroup_type)
vgroup_captured = True
self._set_vm_links(_elements, vms)
if self._set_volume_links(_elements, vms, volumes) is False:
return {}, {}, {}
self._set_total_link_capacities(vms, volumes)
self.logger.debug("all vms parsed")
if self._merge_diversity_groups(_elements, vgroups, vms, volumes) \
is False:
return {}, {}, {}
if self._merge_exclusivity_groups(_elements, vgroups, vms, volumes) \
is False:
return {}, {}, {}
if self._merge_affinity_groups(_elements, vgroups, vms, volumes) \
is False:
return {}, {}, {}
if self._merge_diversity_groups(_elements, vgroups, vms) is False:
return {}, {}
if self._merge_exclusivity_groups(_elements, vgroups, vms) is False:
return {}, {}
if self._merge_affinity_groups(_elements, vgroups, vms) is False:
return {}, {}
""" delete all EX and DIV vgroups after merging """
for vgk in vgroups.keys():
@ -194,222 +164,110 @@ class Parser(object):
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
del vgroups[vgk]
for vgk in vgroups.keys():
vgroup = vgroups[vgk]
self._set_vgroup_links(vgroup, vgroups, vms, volumes)
return vgroups, vms
if vgroup_captured is True:
self.logger.debug("all groups resolved")
return vgroups, vms, volumes
def _set_vm_links(self, _elements, _vms):
for _, r in _elements.iteritems():
if r["type"] == "ATT::CloudQoS::Pipe":
resources = r["properties"]["resources"]
for vk1 in resources:
if vk1 in _vms.keys():
vm = _vms[vk1]
for vk2 in resources:
if vk2 != vk1:
if vk2 in _vms.keys():
link = VMLink(_vms[vk2])
if "bandwidth" in r["properties"].keys():
link.nw_bandwidth = \
r["properties"]["bandwidth"]["min"]
vm.vm_list.append(link)
def _set_volume_links(self, _elements, _vms, _volumes):
for rk, r in _elements.iteritems():
if r["type"] == "OS::Cinder::VolumeAttachment":
self.logger.warn("Parser: do nothing for volume attachment at "
"this version")
return True
def _set_total_link_capacities(self, _vms, _volumes):
for _, vm in _vms.iteritems():
for vl in vm.vm_list:
vm.nw_bandwidth += vl.nw_bandwidth
for voll in vm.volume_list:
vm.io_bandwidth += voll.io_bandwidth
for _, volume in _volumes.iteritems():
for vl in volume.vm_list:
volume.io_bandwidth += vl.io_bandwidth
def _merge_diversity_groups(self, _elements, _vgroups, _vms, _volumes):
def _merge_diversity_groups(self, _elements, _vgroups, _vms):
for level in LEVELS:
for rk, r in _elements.iteritems():
if r["type"] == "ATT::Valet::GroupAssignment" and \
r["properties"]["group_type"] == "diversity" and \
r["properties"]["level"] == level:
vgroup = _vgroups[rk]
for vk in r["properties"]["resources"]:
if vk in _vms.keys():
vgroup.subvgroups[vk] = _vms[vk]
_vms[vk].diversity_groups[rk] = \
vgroup.level + ":" + vgroup.name
elif vk in _volumes.keys():
vgroup.subvgroups[vk] = _volumes[vk]
_volumes[vk].diversity_groups[rk] = \
vgroup.level + ":" + vgroup.name
_vms[vk].diversity_groups[rk] = vgroup.level + ":" + vgroup.name
elif vk in _vgroups.keys():
vg = _vgroups[vk]
if LEVELS.index(vg.level) > LEVELS.index(level):
self.status = "grouping scope: nested " \
"group's level is higher"
return False
if vg.vgroup_type == "DIV" or \
vg.vgroup_type == "EX":
self.status = "group type (" + \
vg.vgroup_type + ") not allowd " \
"to be nested in diversity " \
"group at this version"
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
self.status = vg.vgroup_type + " not allowd to be nested in diversity group"
return False
vgroup.subvgroups[vk] = vg
vg.diversity_groups[rk] = vgroup.level + ":" + \
vgroup.name
else:
self.status = "invalid resource = " + vk
return False
return True
def _merge_exclusivity_groups(self, _elements, _vgroups, _vms, _volumes):
def _merge_exclusivity_groups(self, _elements, _vgroups, _vms):
for level in LEVELS:
for rk, r in _elements.iteritems():
if r["type"] == "ATT::Valet::GroupAssignment" and \
r["properties"]["group_type"] == "exclusivity" and \
r["properties"]["level"] == level:
vgroup = _vgroups[rk]
for vk in r["properties"]["resources"]:
if vk in _vms.keys():
vgroup.subvgroups[vk] = _vms[vk]
_vms[vk].exclusivity_groups[rk] = \
vgroup.level + ":" + vgroup.name
elif vk in _volumes.keys():
vgroup.subvgroups[vk] = _volumes[vk]
_volumes[vk].exclusivity_groups[rk] = \
vgroup.level + ":" + vgroup.name
_vms[vk].exclusivity_groups[rk] = vgroup.level + ":" + vgroup.name
elif vk in _vgroups.keys():
vg = _vgroups[vk]
if LEVELS.index(vg.level) > LEVELS.index(level):
self.status = "grouping scope: nested " \
"group's level is higher"
return False
if vg.vgroup_type == "DIV" or \
vg.vgroup_type == "EX":
self.status = "group type (" + \
vg.vgroup_type + ") not allowd " \
"to be nested " \
"in " \
"exclusivity " \
"group at " \
"this version"
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
self.status = vg.vgroup_type + ") not allowd to be nested in exclusivity group"
return False
vgroup.subvgroups[vk] = vg
vg.exclusivity_groups[rk] = vgroup.level + ":" + \
vgroup.name
else:
self.status = "invalid resource = " + vk
return False
return True
def _merge_affinity_groups(self, _elements, _vgroups, _vms, _volumes):
# key is uuid of vm, volume, or vgroup & value is its parent vgroup
def _merge_affinity_groups(self, _elements, _vgroups, _vms):
# key is uuid of vm or vgroup & value is its parent vgroup
affinity_map = {}
for level in LEVELS:
for rk, r in _elements.iteritems():
if r["type"] == "ATT::Valet::GroupAssignment" and \
r["properties"]["group_type"] == "affinity" and \
r["properties"]["level"] == level:
vgroup = None
if rk in _vgroups.keys():
vgroup = _vgroups[rk]
else:
continue
self.logger.debug("Parser: merge for affinity = " +
vgroup.name)
for vk in r["properties"]["resources"]:
if vk in _vms.keys():
vgroup.subvgroups[vk] = _vms[vk]
_vms[vk].survgroup = vgroup
affinity_map[vk] = vgroup
self._add_implicit_diversity_groups(
vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
vgroup, _vms[vk].exclusivity_groups)
self._add_implicit_diversity_groups(vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(vgroup, _vms[vk].exclusivity_groups)
self._add_memberships(vgroup, _vms[vk])
del _vms[vk]
elif vk in _volumes.keys():
vgroup.subvgroups[vk] = _volumes[vk]
_volumes[vk].survgroup = vgroup
affinity_map[vk] = vgroup
self._add_implicit_diversity_groups(
vgroup, _volumes[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
vgroup, _volumes[vk].exclusivity_groups)
self._add_memberships(vgroup, _volumes[vk])
del _volumes[vk]
elif vk in _vgroups.keys():
vg = _vgroups[vk]
if LEVELS.index(vg.level) > LEVELS.index(level):
self.status = "grouping scope: nested " \
"group's level is higher"
return False
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
if self._merge_subgroups(
vgroup, vg.subvgroups, _vms, _volumes,
_vgroups, _elements, affinity_map) \
is False:
if self._merge_subgroups(vgroup, vg.subvgroups, _vms, _vgroups,
_elements, affinity_map) is False:
return False
del _vgroups[vk]
else:
if self._exist_in_subgroups(vk, vgroup) is None:
if self._get_subgroups(
vg, _elements, _vgroups, _vms,
_volumes, affinity_map) \
is False:
if self._get_subgroups(vg, _elements,
_vgroups, _vms,
affinity_map) is False:
return False
vgroup.subvgroups[vk] = vg
vg.survgroup = vgroup
affinity_map[vk] = vgroup
self._add_implicit_diversity_groups(
vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(
vgroup, vg.exclusivity_groups)
self._add_implicit_diversity_groups(vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(vgroup, vg.exclusivity_groups)
self._add_memberships(vgroup, vg)
del _vgroups[vk]
else:
# vk belongs to the other vgroup already
@ -417,7 +275,6 @@ class Parser(object):
if vk not in affinity_map.keys():
self.status = "invalid resource = " + vk
return False
if affinity_map[vk].uuid != vgroup.uuid:
if self._exist_in_subgroups(vk, vgroup) is None:
self._set_implicit_grouping(
@ -425,69 +282,38 @@ class Parser(object):
return True
def _merge_subgroups(self, _vgroup, _subgroups, _vms, _volumes, _vgroups,
_elements, _affinity_map):
def _merge_subgroups(self, _vgroup, _subgroups, _vms, _vgroups, _elements, _affinity_map):
for vk, _ in _subgroups.iteritems():
if vk in _vms.keys():
_vgroup.subvgroups[vk] = _vms[vk]
_vms[vk].survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, _vms[vk].exclusivity_groups)
self._add_implicit_diversity_groups(_vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(_vgroup, _vms[vk].exclusivity_groups)
self._add_memberships(_vgroup, _vms[vk])
del _vms[vk]
elif vk in _volumes.keys():
_vgroup.subvgroups[vk] = _volumes[vk]
_volumes[vk].survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, _volumes[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, _volumes[vk].exclusivity_groups)
self._add_memberships(_vgroup, _volumes[vk])
del _volumes[vk]
elif vk in _vgroups.keys():
vg = _vgroups[vk]
if LEVELS.index(vg.level) > LEVELS.index(_vgroup.level):
self.status = "grouping scope: nested group's level is " \
"higher"
return False
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
if self._merge_subgroups(_vgroup, vg.subvgroups,
_vms, _volumes, _vgroups,
_vms, _vgroups,
_elements, _affinity_map) is False:
return False
del _vgroups[vk]
else:
if self._exist_in_subgroups(vk, _vgroup) is None:
if self._get_subgroups(vg, _elements, _vgroups, _vms,
_volumes, _affinity_map) \
is False:
if self._get_subgroups(vg, _elements, _vgroups, _vms, _affinity_map) is False:
return False
_vgroup.subvgroups[vk] = vg
vg.survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, vg.exclusivity_groups)
self._add_implicit_diversity_groups(_vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(_vgroup, vg.exclusivity_groups)
self._add_memberships(_vgroup, vg)
del _vgroups[vk]
else:
# vk belongs to the other vgroup already
@ -495,89 +321,51 @@ class Parser(object):
if vk not in _affinity_map.keys():
self.status = "invalid resource = " + vk
return False
if _affinity_map[vk].uuid != _vgroup.uuid:
if self._exist_in_subgroups(vk, _vgroup) is None:
self._set_implicit_grouping(vk, _vgroup, _affinity_map, _vgroups)
return True
def _get_subgroups(self, _vgroup, _elements, _vgroups, _vms, _volumes,
_affinity_map):
def _get_subgroups(self, _vgroup, _elements, _vgroups, _vms, _affinity_map):
for vk in _elements[_vgroup.uuid]["properties"]["resources"]:
if vk in _vms.keys():
_vgroup.subvgroups[vk] = _vms[vk]
_vms[vk].survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, _vms[vk].exclusivity_groups)
self._add_implicit_diversity_groups(_vgroup, _vms[vk].diversity_groups)
self._add_implicit_exclusivity_groups(_vgroup, _vms[vk].exclusivity_groups)
self._add_memberships(_vgroup, _vms[vk])
del _vms[vk]
elif vk in _volumes.keys():
_vgroup.subvgroups[vk] = _volumes[vk]
_volumes[vk].survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, _volumes[vk].diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, _volumes[vk].exclusivity_groups)
self._add_memberships(_vgroup, _volumes[vk])
del _volumes[vk]
elif vk in _vgroups.keys():
vg = _vgroups[vk]
if LEVELS.index(vg.level) > LEVELS.index(_vgroup.level):
self.status = "grouping scope: nested group's level is " \
"higher"
return False
if vg.vgroup_type == "DIV" or vg.vgroup_type == "EX":
if self._merge_subgroups(_vgroup, vg.subvgroups,
_vms, _volumes, _vgroups,
_vms, _vgroups,
_elements, _affinity_map) is False:
return False
del _vgroups[vk]
else:
if self._exist_in_subgroups(vk, _vgroup) is None:
if self._get_subgroups(
vg, _elements, _vgroups, _vms, _volumes,
_affinity_map) is False:
if self._get_subgroups(vg, _elements, _vgroups, _vms, _affinity_map) is False:
return False
_vgroup.subvgroups[vk] = vg
vg.survgroup = _vgroup
_affinity_map[vk] = _vgroup
self._add_implicit_diversity_groups(
_vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(
_vgroup, vg.exclusivity_groups)
self._add_implicit_diversity_groups(_vgroup, vg.diversity_groups)
self._add_implicit_exclusivity_groups(_vgroup, vg.exclusivity_groups)
self._add_memberships(_vgroup, vg)
del _vgroups[vk]
else:
if vk not in _affinity_map.keys():
self.status = "invalid resource = " + vk
return False
if _affinity_map[vk].uuid != _vgroup.uuid:
if self._exist_in_subgroups(vk, _vgroup) is None:
self._set_implicit_grouping(
vk, _vgroup, _affinity_map, _vgroups)
self._set_implicit_grouping(vk, _vgroup, _affinity_map, _vgroups)
return True
def _add_implicit_diversity_groups(self, _vgroup, _diversity_groups):
@ -596,46 +384,31 @@ class Parser(object):
if isinstance(_v, VM) or isinstance(_v, VGroup):
for extra_specs in _v.extra_specs_list:
_vgroup.extra_specs_list.append(extra_specs)
if isinstance(_v, VM) and _v.availability_zone is not None:
if _v.availability_zone not in _vgroup.availability_zone_list:
_vgroup.availability_zone_list.append(_v.availability_zone)
if isinstance(_v, VGroup):
for az in _v.availability_zone_list:
if az not in _vgroup.availability_zone_list:
_vgroup.availability_zone_list.append(az)
'''
for hgk, hg in _v.host_aggregates.iteritems():
_vgroup.host_aggregates[hgk] = hg
'''
''' take vk's most top parent as a s_vg's child vgroup '''
def _set_implicit_grouping(self, _vk, _s_vg, _affinity_map, _vgroups):
t_vg = _affinity_map[_vk] # where _vk currently belongs to
# if the parent belongs to the other parent vgroup
if t_vg.uuid in _affinity_map.keys():
self._set_implicit_grouping(
t_vg.uuid, _s_vg, _affinity_map, _vgroups)
# if the parent belongs to the other parent vgroup
self._set_implicit_grouping(t_vg.uuid, _s_vg, _affinity_map, _vgroups)
else:
if LEVELS.index(t_vg.level) > LEVELS.index(_s_vg.level):
t_vg.level = _s_vg.level
if self._exist_in_subgroups(t_vg.uuid, _s_vg) is None:
_s_vg.subvgroups[t_vg.uuid] = t_vg
t_vg.survgroup = _s_vg
_affinity_map[t_vg.uuid] = _s_vg
self._add_implicit_diversity_groups(
_s_vg, t_vg.diversity_groups)
self._add_implicit_exclusivity_groups(
_s_vg, t_vg.exclusivity_groups)
self._add_implicit_diversity_groups(_s_vg, t_vg.diversity_groups)
self._add_implicit_exclusivity_groups(_s_vg, t_vg.exclusivity_groups)
self._add_memberships(_s_vg, t_vg)
del _vgroups[t_vg.uuid]
def _exist_in_subgroups(self, _vk, _vg):
@ -650,78 +423,3 @@ class Parser(object):
if containing_vg_uuid is not None:
break
return containing_vg_uuid
def _set_vgroup_links(self, _vgroup, _vgroups, _vms, _volumes):
for _, svg in _vgroup.subvgroups.iteritems():
# currently, not define vgroup itself in pipe
if isinstance(svg, VM):
for vml in svg.vm_list:
found = False
for _, tvgroup in _vgroups.iteritems():
containing_vg_uuid = self._exist_in_subgroups(
vml.node.uuid, tvgroup)
if containing_vg_uuid is not None:
found = True
if containing_vg_uuid != _vgroup.uuid and \
self._exist_in_subgroups(
containing_vg_uuid, _vgroup) is None:
self._add_nw_link(vml, _vgroup)
break
if found is False:
for tvk in _vms.keys():
if tvk == vml.node.uuid:
self._add_nw_link(vml, _vgroup)
break
for voll in svg.volume_list:
found = False
for _, tvgroup in _vgroups.iteritems():
containing_vg_uuid = self._exist_in_subgroups(
voll.node.uuid, tvgroup)
if containing_vg_uuid is not None:
found = True
if containing_vg_uuid != _vgroup.uuid and \
self._exist_in_subgroups(
containing_vg_uuid, _vgroup) is None:
self._add_io_link(voll, _vgroup)
break
if found is False:
for tvk in _volumes.keys():
if tvk == voll.node.uuid:
self._add_io_link(voll, _vgroup)
break
elif isinstance(svg, VGroup):
self._set_vgroup_links(svg, _vgroups, _vms, _volumes)
for svgl in svg.vgroup_list: # svgl is a link to VM or Volume
if self._exist_in_subgroups(svgl.node.uuid, _vgroup) \
is None:
self._add_nw_link(svgl, _vgroup)
self._add_io_link(svgl, _vgroup)
def _add_nw_link(self, _link, _vgroup):
_vgroup.nw_bandwidth += _link.nw_bandwidth
vgroup_link = self._get_vgroup_link(_link, _vgroup.vgroup_list)
if vgroup_link is not None:
vgroup_link.nw_bandwidth += _link.nw_bandwidth
else:
link = VGroupLink(_link.node) # _link.node is VM
link.nw_bandwidth = _link.nw_bandwidth
_vgroup.vgroup_list.append(link)
def _add_io_link(self, _link, _vgroup):
_vgroup.io_bandwidth += _link.io_bandwidth
vgroup_link = self._get_vgroup_link(_link, _vgroup.vgroup_list)
if vgroup_link is not None:
vgroup_link.io_bandwidth += _link.io_bandwidth
else:
link = VGroupLink(_link.node)
link.io_bandwidth = _link.io_bandwidth
_vgroup.vgroup_list.append(link)
def _get_vgroup_link(self, _link, _vgroup_link_list):
vgroup_link = None
for vgl in _vgroup_link_list:
if vgl.node.uuid == _link.node.uuid:
vgroup_link = vgl
break
return vgroup_link

View File

@ -34,7 +34,6 @@ class App(object):
self.vgroups = {}
self.vms = {}
self.volumes = {}
self.status = 'requested' # Moved to "scheduled" (and then "placed")
@ -44,12 +43,6 @@ class App(object):
self.vms[_vm.uuid].status = _status
self.vms[_vm.uuid].host = _host_name
def add_volume(self, _vol, _host_name):
"""Add volume to app, set status to scheduled."""
self.vms[_vol.uuid] = _vol
self.vms[_vol.uuid].status = "scheduled"
self.vms[_vol.uuid].storage_host = _host_name
def add_vgroup(self, _vg, _host_name):
"""Add vgroup to app, set status to scheduled."""
self.vgroups[_vg.uuid] = _vg
@ -62,10 +55,6 @@ class App(object):
for vmk, vm in self.vms.iteritems():
vms[vmk] = vm.get_json_info()
vols = {}
for volk, vol in self.volumes.iteritems():
vols[volk] = vol.get_json_info()
vgs = {}
for vgk, vg in self.vgroups.iteritems():
vgs[vgk] = vg.get_json_info()
@ -75,7 +64,6 @@ class App(object):
'stack_id': self.app_id,
'name': self.app_name,
'VMs': vms,
'Volumes': vols,
'VGroups': vgs}
def log_in_info(self):

View File

@ -159,7 +159,6 @@ class Event(object):
str_numa_topology = self.change_data['numa_topology']
try:
numa_topology = json.loads(str_numa_topology)
# print json.dumps(numa_topology, indent=4)
if 'nova_object.data' in numa_topology.keys():
if 'cells' in numa_topology['nova_object.data']:
@ -171,7 +170,6 @@ class Event(object):
except (ValueError, KeyError, TypeError):
pass
# print "error while parsing numa_topology"
elif self.method == 'build_and_run_instance':
if 'scheduler_hints' in self.args['filter_properties'].keys():

View File

@ -25,6 +25,7 @@ def ensurekey(d, k):
return d.setdefault(k, {})
# FIXME(GJ): make MUSIC as pluggable
class MusicHandler(object):
"""Music Handler Class.
@ -37,19 +38,14 @@ class MusicHandler(object):
self.config = _config
self.logger = _logger
self.music = None
if self.config.mode.startswith("sim"):
self.music = Music()
elif self.config.mode.startswith("live"):
self.music = Music(hosts=self.config.db_hosts,
replication_factor=self.config.replication_factor)
if self.config.db_hosts is not None and len(self.config.db_hosts) > 0:
for dbh in self.config.db_hosts:
self.logger.debug("DB: music host = " + dbh)
if self.config.replication_factor is not None:
self.logger.debug("DB: music replication factor = " + str(self.config.replication_factor))
self.music = Music(hosts=self.config.db_hosts, replication_factor=self.config.replication_factor)
if self.config.db_hosts is not None and len(self.config.db_hosts) > 0:
for dbh in self.config.db_hosts:
self.logger.info("DB: music host = " + dbh)
if self.config.replication_factor is not None:
self.logger.info("DB: music replication factor = " + str(self.config.replication_factor))
# FIXME(GJ): this may not need
def init_db(self):
"""Init Database.
@ -127,30 +123,6 @@ class MusicHandler(object):
self.logger.error("DB: " + str(e))
return False
schema = {
'site_name': 'text',
'app_log_index': 'text',
'PRIMARY KEY': '(site_name)'
}
try:
self.music.create_table(self.config.db_keyspace,
self.config.db_app_index_table, schema)
except Exception as e:
self.logger.error("DB: " + str(e))
return False
schema = {
'site_name': 'text',
'resource_log_index': 'text',
'PRIMARY KEY': '(site_name)'
}
try:
self.music.create_table(self.config.db_keyspace,
self.config.db_resource_index_table, schema)
except Exception as e:
self.logger.error("DB: " + str(e))
return False
schema = {
'uuid': 'text',
'h_uuid': 'text',
@ -515,20 +487,6 @@ class MusicHandler(object):
del json_resource['logical_groups'][lgk]
json_resource['logical_groups'][lgk] = lg
if 'storages' in _status.keys():
storages = _status['storages']
for stk, st in storages.iteritems():
if stk in ensurekey(json_resource, 'storages').keys():
del json_resource['storages'][stk]
json_resource['storages'][stk] = st
if 'switches' in _status.keys():
switches = _status['switches']
for sk, s in switches.iteritems():
if sk in ensurekey(json_resource, 'switches').keys():
del json_resource['switches'][sk]
json_resource['switches'][sk] = s
if 'hosts' in _status.keys():
hosts = _status['hosts']
for hk, h in hosts.iteritems():
@ -578,42 +536,6 @@ class MusicHandler(object):
return True
def update_resource_log_index(self, _k, _index):
"""Update resource log index in database and return True."""
data = {
'site_name': _k,
'resource_log_index': str(_index)
}
try:
self.music.update_row_eventually(
self.config.db_keyspace, self.config.db_resource_index_table,
'site_name', _k, data)
except Exception as e:
self.logger.error("MUSIC error while updating resource log "
"index: " + str(e))
return False
return True
def update_app_log_index(self, _k, _index):
"""Update app log index in database and return True."""
data = {
'site_name': _k,
'app_log_index': str(_index)
}
try:
self.music.update_row_eventually(self.config.db_keyspace,
self.config.db_app_index_table,
'site_name', _k, data)
except Exception as e:
self.logger.error("MUSIC error while updating app log index: " +
str(e))
return False
return True
def add_app(self, _k, _app_data):
"""Add app to database in music and return True."""
try:
@ -734,7 +656,7 @@ class MusicHandler(object):
if vmk == _h_uuid:
if vm["status"] != "deleted":
vm["status"] = "deleted"
self.logger.debug("DB: deleted marked")
self.logger.warn("DB: deleted marked")
updated = True
else:
self.logger.warn("DB: vm was already deleted")

View File

@ -113,6 +113,7 @@ class ConstraintSolver(object):
self.logger.error("ConstraintSolver: " + self.status)
return candidate_list
""" diversity constraint """
if len(_n.node.diversity_groups) > 0:
for _, diversity_id in _n.node.diversity_groups.iteritems():
if diversity_id.split(":")[0] == _level:
@ -190,8 +191,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def _constrain_diversity_with_others(self, _level, _diversity_id,
_candidate_list):
@ -202,9 +202,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def exist_group(self, _level, _id, _group_type, _candidate):
"""Check if group esists."""
@ -230,8 +228,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def conflict_diversity(self, _level, _n, _node_placements, _candidate):
"""Return True if the candidate has a placement conflict."""
@ -273,8 +270,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def conflict_exclusivity(self, _level, _candidate):
"""Check for an exculsivity conflict."""
@ -358,8 +354,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def check_host_aggregates(self, _level, _candidate, _v):
"""Check if the candidate passes the aggregate instance extra specs zone filter."""
@ -373,8 +368,7 @@ class ConstraintSolver(object):
if r not in conflict_list:
conflict_list.append(r)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def check_availability_zone(self, _level, _candidate, _v):
"""Check if the candidate passes the availability zone filter."""
@ -387,8 +381,7 @@ class ConstraintSolver(object):
if self.check_cpu_capacity(_level, _n.node, ch) is False:
conflict_list.append(ch)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def check_cpu_capacity(self, _level, _v, _candidate):
"""Check if the candidate passes the core filter."""
@ -401,8 +394,7 @@ class ConstraintSolver(object):
if self.check_mem_capacity(_level, _n.node, ch) is False:
conflict_list.append(ch)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def check_mem_capacity(self, _level, _v, _candidate):
"""Check if the candidate passes the RAM filter."""
@ -415,178 +407,8 @@ class ConstraintSolver(object):
if self.check_local_disk_capacity(_level, _n.node, ch) is False:
conflict_list.append(ch)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
_candidate_list[:] = [c for c in _candidate_list if c not in conflict_list]
def check_local_disk_capacity(self, _level, _v, _candidate):
"""Check if the candidate passes the disk filter."""
return self.openstack_D.host_passes(_level, _candidate, _v)
def _constrain_storage_capacity(self, _level, _n, _candidate_list):
conflict_list = []
for ch in _candidate_list:
if self.check_storage_availability(_level, _n.node, ch) is False:
conflict_list.append(ch)
# debug_resource_name = ch.get_resource_name(_level)
avail_storages = ch.get_avail_storages(_level)
avail_disks = []
volume_classes = []
volume_sizes = []
if isinstance(_n.node, VGroup):
for vck in _n.node.volume_sizes.keys():
volume_classes.append(vck)
volume_sizes.append(_n.node.volume_sizes[vck])
else:
volume_classes.append(_n.node.volume_class)
volume_sizes.append(_n.node.volume_size)
for vc in volume_classes:
for _, s in avail_storages.iteritems():
if vc == "any" or s.storage_class == vc:
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
def check_storage_availability(self, _level, _v, _ch):
"""Return True if there is sufficient storage availability."""
available = False
volume_sizes = []
if isinstance(_v, VGroup):
for vck in _v.volume_sizes.keys():
volume_sizes.append((vck, _v.volume_sizes[vck]))
else:
volume_sizes.append((_v.volume_class, _v.volume_size))
avail_storages = _ch.get_avail_storages(_level)
for vc, vs in volume_sizes:
for _, s in avail_storages.iteritems():
if vc == "any" or s.storage_class == vc:
if s.storage_avail_disk >= vs:
available = True
break
else:
available = False
if available is False:
break
return available
def _constrain_nw_bandwidth_capacity(self, _level, _n, _node_placements,
_candidate_list):
conflict_list = []
for cr in _candidate_list:
if self.check_nw_bandwidth_availability(
_level, _n, _node_placements, cr) is False:
if cr not in conflict_list:
conflict_list.append(cr)
_candidate_list[:] = [c for c in _candidate_list
if c not in conflict_list]
def check_nw_bandwidth_availability(self, _level, _n, _node_placements,
_cr):
"""Return True if there is sufficient network availability."""
# NOTE: 3rd entry for special node requiring bandwidth of out-going
# from spine switch
total_req_bandwidths = [0, 0, 0]
link_list = _n.get_all_links()
for vl in link_list:
bandwidth = _n.get_bandwidth_of_link(vl)
placement_level = None
if vl.node in _node_placements.keys(): # vl.node is VM or Volume
placement_level = \
_node_placements[vl.node].get_common_placement(_cr)
else: # in the open list
placement_level = \
_n.get_common_diversity(vl.node.diversity_groups)
if placement_level == "ANY":
implicit_diversity = self.get_implicit_diversity(_n.node,
link_list,
vl.node,
_level)
if implicit_diversity[0] is not None:
placement_level = implicit_diversity[1]
self.get_req_bandwidths(_level, placement_level, bandwidth,
total_req_bandwidths)
return self._check_nw_bandwidth_availability(_level,
total_req_bandwidths, _cr)
# to find any implicit diversity relation caused by the other links of _v
# (i.e., intersection between _v and _target_v)
def get_implicit_diversity(self, _v, _link_list, _target_v, _level):
"""Get the maximum implicit diversity between _v and _target_v."""
max_implicit_diversity = (None, 0)
for vl in _link_list:
diversity_level = _v.get_common_diversity(vl.node.diversity_groups)
if diversity_level != "ANY" \
and LEVELS.index(diversity_level) >= LEVELS.index(_level):
for dk, dl in vl.node.diversity_groups.iteritems():
if LEVELS.index(dl) > LEVELS.index(diversity_level):
if _target_v.uuid != vl.node.uuid:
if dk in _target_v.diversity_groups.keys():
if LEVELS.index(dl) > max_implicit_diversity[1]:
max_implicit_diversity = (dk, dl)
return max_implicit_diversity
def get_req_bandwidths(self, _level, _placement_level, _bandwidth,
_total_req_bandwidths):
"""Calculate and update total required bandwidths."""
if _level == "cluster" or _level == "rack":
if _placement_level == "cluster" or _placement_level == "rack":
_total_req_bandwidths[1] += _bandwidth
elif _level == "host":
if _placement_level == "cluster" or _placement_level == "rack":
_total_req_bandwidths[1] += _bandwidth
_total_req_bandwidths[0] += _bandwidth
elif _placement_level == "host":
_total_req_bandwidths[0] += _bandwidth
def _check_nw_bandwidth_availability(self, _level, _req_bandwidths,
_candidate_resource):
available = True
if _level == "cluster":
cluster_avail_bandwidths = []
for _, sr in _candidate_resource.cluster_avail_switches.iteritems():
cluster_avail_bandwidths.append(max(sr.avail_bandwidths))
if max(cluster_avail_bandwidths) < _req_bandwidths[1]:
available = False
elif _level == "rack":
rack_avail_bandwidths = []
for _, sr in _candidate_resource.rack_avail_switches.iteritems():
rack_avail_bandwidths.append(max(sr.avail_bandwidths))
if max(rack_avail_bandwidths) < _req_bandwidths[1]:
available = False
elif _level == "host":
host_avail_bandwidths = []
for _, sr in _candidate_resource.host_avail_switches.iteritems():
host_avail_bandwidths.append(max(sr.avail_bandwidths))
if max(host_avail_bandwidths) < _req_bandwidths[0]:
available = False
rack_avail_bandwidths = []
for _, sr in _candidate_resource.rack_avail_switches.iteritems():
rack_avail_bandwidths.append(max(sr.avail_bandwidths))
avail_bandwidth = min(max(host_avail_bandwidths),
max(rack_avail_bandwidths))
if avail_bandwidth < _req_bandwidths[1]:
available = False
return available

View File

@ -15,6 +15,8 @@
"""AggregateInstanceExtraSpecsFilter."""
import openstack_utils
import six
from valet.engine.optimizer.app_manager.app_topology_base import VM
@ -23,6 +25,7 @@ from valet.engine.optimizer.ostro import openstack_utils
_SCOPE = 'aggregate_instance_extra_specs'
# FIXME(GJ): make extensible
class AggregateInstanceExtraSpecsFilter(object):
"""AggregateInstanceExtraSpecsFilter works with InstanceType records."""
@ -192,7 +195,7 @@ class DiskFilter(object):
self.logger = _logger
def host_passes(self, _level, _host, _v):
"""Return True if the requested disk is less than the available disk."""
"""Filter based on disk usage."""
requested_disk = _v.local_volume_size
(_, usable_disk) = _host.get_local_disk(_level)

View File

@ -18,10 +18,12 @@
import time
from valet.engine.optimizer.app_manager.app_topology_base \
import VGroup, VM, Volume
import VGroup, VM
from valet.engine.optimizer.ostro.search import Search
# FIXME(GJ): make search algorithm pluggable
# NOTE(GJ): do not deal with Volume placements at this version
class Optimizer(object):
"""Optimizer."""
@ -42,8 +44,6 @@ class Optimizer(object):
uuid_map = None
place_type = None
start_ts = time.time()
if len(_app_topology.exclusion_list_map) > 0:
place_type = "migration"
else:
@ -70,24 +70,11 @@ class Optimizer(object):
else:
success = self.search.place_nodes(_app_topology, self.resource)
end_ts = time.time()
if success is True:
self.logger.debug("Optimizer: search running time = " +
str(end_ts - start_ts) + " sec")
self.logger.debug("Optimizer: total number of hosts = " +
str(self.search.num_of_hosts))
placement_map = {}
for v in self.search.node_placements.keys():
if isinstance(v, VM):
placement_map[v] = self.search.node_placements[v].host_name
elif isinstance(v, Volume):
placement_map[v] = \
self.search.node_placements[v].host_name + "@"
placement_map[v] += \
self.search.node_placements[v].storage.storage_name
elif isinstance(v, VGroup):
if v.level == "host":
placement_map[v] = \
@ -129,50 +116,18 @@ class Optimizer(object):
def _update_resource_status(self, _uuid_map):
for v, np in self.search.node_placements.iteritems():
uuid = "none"
if _uuid_map is not None:
if v.uuid in _uuid_map.keys():
uuid = _uuid_map[v.uuid]
if isinstance(v, VM):
uuid = "none"
if _uuid_map is not None:
if v.uuid in _uuid_map.keys():
uuid = _uuid_map[v.uuid]
self.resource.add_vm_to_host(np.host_name,
(v.uuid, v.name, uuid),
v.vCPUs, v.mem, v.local_volume_size)
self.resource.add_vm_to_host(np.host_name,
(v.uuid, v.name, uuid),
v.vCPUs, v.mem,
v.local_volume_size)
self._update_logical_grouping(v, self.search.avail_hosts[np.host_name], uuid)
for vl in v.vm_list:
tnp = self.search.node_placements[vl.node]
placement_level = np.get_common_placement(tnp)
self.resource.deduct_bandwidth(np.host_name,
placement_level,
vl.nw_bandwidth)
for voll in v.volume_list:
tnp = self.search.node_placements[voll.node]
placement_level = np.get_common_placement(tnp)
self.resource.deduct_bandwidth(np.host_name,
placement_level,
voll.io_bandwidth)
self._update_logical_grouping(
v, self.search.avail_hosts[np.host_name], uuid)
self.resource.update_host_time(np.host_name)
elif isinstance(v, Volume):
self.resource.add_vol_to_host(np.host_name,
np.storage.storage_name, v.name,
v.volume_size)
for vl in v.vm_list:
tnp = self.search.node_placements[vl.node]
placement_level = np.get_common_placement(tnp)
self.resource.deduct_bandwidth(np.host_name,
placement_level,
vl.io_bandwidth)
self.resource.update_storage_time(np.storage.storage_name)
self.resource.update_host_time(np.host_name)
def _update_logical_grouping(self, _v, _avail_host, _uuid):
for lgk, lg in _avail_host.host_memberships.iteritems():

View File

@ -21,7 +21,7 @@ import time
import traceback
from valet.engine.listener.listener_manager import ListenerManager
from valet.engine.optimizer.app_manager.app_handler import AppHandler
from valet.engine.optimizer.app_manager.app_topology_base import VM, Volume
from valet.engine.optimizer.app_manager.app_topology_base import VM
from valet.engine.optimizer.db_connect.music_handler import MusicHandler
from valet.engine.optimizer.ostro.optimizer import Optimizer
from valet.engine.resource_manager.compute_manager import ComputeManager
@ -42,33 +42,24 @@ class Ostro(object):
self.db = MusicHandler(self.config, self.logger)
if self.db.init_db() is False:
self.logger.error("error while initializing MUSIC database")
else:
self.logger.debug("done init music")
self.resource = Resource(self.db, self.config, self.logger)
self.logger.debug("done init resource")
self.app_handler = AppHandler(self.resource, self.db, self.config,
self.logger)
self.logger.debug("done init apphandler")
self.optimizer = Optimizer(self.resource, self.logger)
self.logger.debug("done init optimizer")
self.data_lock = threading.Lock()
self.thread_list = []
self.topology = TopologyManager(1, "Topology", self.resource,
self.data_lock, self.config,
self.logger)
self.logger.debug("done init topology")
self.data_lock, self.config, self.logger)
self.compute = ComputeManager(2, "Compute", self.resource,
self.data_lock, self.config, self.logger)
self.logger.debug("done init compute")
self.listener = ListenerManager(3, "Listener", CONF)
self.logger.debug("done init listener")
self.status = "success"
self.end_of_process = False
@ -269,8 +260,6 @@ class Ostro(object):
return query_result
def _get_vms_from_logical_group(self, _group_name):
self.logger.debug("query to see vms of " + _group_name)
vm_list = []
vm_id_list = []
@ -279,13 +268,9 @@ class Ostro(object):
lg.group_type == "DIV":
lg_id = lgk.split(":")
if lg_id[1] == _group_name:
self.logger.debug("found group in Ostro")
vm_id_list = lg.vm_list
break
if len(vm_id_list) == 0:
self.logger.debug("group does not exist in Ostro")
for vm_id in vm_id_list:
if vm_id[2] != "none": # if physical_uuid != 'none'
vm_list.append(vm_id[2])
@ -405,7 +390,7 @@ class Ostro(object):
if e.method == "build_and_run_instance":
# VM is created (from stack)
self.logger.debug("Ostro.handle_events: got build_and_run event for " + e.uuid)
self.logger.info("Ostro.handle_events: got build_and_run event for " + e.uuid)
if self.db.put_uuid(e) is False:
self.data_lock.release()
return False
@ -420,7 +405,7 @@ class Ostro(object):
return False
if e.vm_state == "active":
self.logger.debug("Ostro.handle_events: got instance_"
self.logger.info("Ostro.handle_events: got instance_"
"active event for " + e.uuid)
vm_info = self.app_handler.get_vm_info(orch_id[1], orch_id[0], e.host)
if vm_info is None:
@ -458,28 +443,20 @@ class Ostro(object):
self._update_h_uuid_in_logical_groups(
orch_id[0], e.uuid, e.host)
else:
self.logger.debug("Ostro.handle_events: vm "
"activated as planned")
self._update_uuid_in_host(orch_id[0],
e.uuid, e.host)
self._update_uuid_in_logical_groups(
orch_id[0], e.uuid, e.host)
self.logger.info("EVENT: vm activated as planned")
self._update_uuid_in_host(orch_id[0], e.uuid, e.host)
self._update_uuid_in_logical_groups(orch_id[0], e.uuid, e.host)
resource_updated = True
elif e.vm_state == "deleted":
self.logger.debug("Ostro.handle_events: got instance_"
"delete event for " + e.uuid)
self.logger.info("EVENT: got instance_delete for " + e.uuid)
self._remove_vm_from_host(e.uuid, orch_id[0], e.host,
e.vcpus, e.mem, e.local_disk)
self._remove_vm_from_logical_groups(e.uuid, orch_id[0],
e.host)
self._remove_vm_from_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk)
self._remove_vm_from_logical_groups(e.uuid, orch_id[0], e.host)
if self.app_handler.update_vm_info(orch_id[1],
orch_id[0]) is False:
self.logger.error("Ostro.handle_events: error "
"while updating app in MUSIC")
if self.app_handler.update_vm_info(orch_id[1], orch_id[0]) is False:
self.logger.error("EVENT: error while updating app in MUSIC")
self.data_lock.release()
return False
@ -495,7 +472,7 @@ class Ostro(object):
elif e.object_name == 'ComputeNode':
# Host resource is updated
self.logger.debug("EVENT: got compute for " + e.host)
self.logger.info("EVENT: got compute for " + e.host)
# NOTE: what if host is disabled?
if self.resource.update_host_resources(
e.host, e.status, e.vcpus, e.vcpus_used, e.mem,
@ -648,7 +625,7 @@ class Ostro(object):
if _status_type != "error":
applications = {}
for v in _map.keys():
if isinstance(v, VM) or isinstance(v, Volume):
if isinstance(v, VM):
resources = None
if v.app_uuid in applications.keys():
resources = applications[v.app_uuid]

File diff suppressed because it is too large Load Diff

View File

@ -28,40 +28,16 @@ class Resource(object):
self.level = None
self.host_name = None
self.host_memberships = {} # all mapped logical groups to host
self.host_vCPUs = 0 # original total vCPUs before overcommit
self.host_avail_vCPUs = 0 # remaining vCPUs after overcommit
self.host_mem = 0 # original total mem cap before overcommit
self.host_avail_mem = 0 # remaining mem cap after
self.host_local_disk = 0 # original total local disk cap before overcommit
self.host_avail_local_disk = 0 # remaining local disk cap after overcommit
self.host_num_of_placed_vms = 0 # the number of vms currently placed in this host
# all mapped logical groups to host
self.host_memberships = {}
# original total vCPUs before overcommit
self.host_vCPUs = 0
# remaining vCPUs after overcommit
self.host_avail_vCPUs = 0
# original total mem cap before overcommit
self.host_mem = 0
# remaining mem cap after
self.host_avail_mem = 0
# original total local disk cap before overcommit
self.host_local_disk = 0
# remaining local disk cap after overcommit
self.host_avail_local_disk = 0
# all mapped switches to host
self.host_avail_switches = {}
# all mapped storage_resources to host
self.host_avail_storages = {}
# the number of vms currently placed in this host
self.host_num_of_placed_vms = 0
# where this host is located
self.rack_name = None
self.rack_name = None # where this host is located
self.rack_memberships = {}
self.rack_vCPUs = 0
self.rack_avail_vCPUs = 0
@ -69,13 +45,6 @@ class Resource(object):
self.rack_avail_mem = 0
self.rack_local_disk = 0
self.rack_avail_local_disk = 0
# all mapped switches to rack
self.rack_avail_switches = {}
# all mapped storage_resources to rack
self.rack_avail_storages = {}
self.rack_num_of_placed_vms = 0
# where this host and rack are located
@ -88,19 +57,9 @@ class Resource(object):
self.cluster_avail_mem = 0
self.cluster_local_disk = 0
self.cluster_avail_local_disk = 0
# all mapped switches to cluster
self.cluster_avail_switches = {}
# all mapped storage_resources to cluster
self.cluster_avail_storages = {}
self.cluster_num_of_placed_vms = 0
# selected best storage for volume among host_avail_storages
self.storage = None
# order to place
self.sort_base = 0
self.sort_base = 0 # order to place
def get_common_placement(self, _resource):
"""Get common placement level."""
@ -232,32 +191,6 @@ class Resource(object):
return (mem, avail_mem)
def get_avail_storages(self, _level):
"""Get the available storages of this resource at the specified level."""
avail_storages = None
if _level == "cluster":
avail_storages = self.cluster_avail_storages
elif _level == "rack":
avail_storages = self.rack_avail_storages
elif _level == "host":
avail_storages = self.host_avail_storages
return avail_storages
def get_avail_switches(self, _level):
"""Get the available switches of this resource at the specified level."""
avail_switches = None
if _level == "cluster":
avail_switches = self.cluster_avail_switches
elif _level == "rack":
avail_switches = self.rack_avail_switches
elif _level == "host":
avail_switches = self.host_avail_switches
return avail_switches
class LogicalGroupResource(object):
"""LogicalGroupResource."""
@ -275,71 +208,13 @@ class LogicalGroupResource(object):
self.num_of_placed_vms_per_host = {}
class StorageResource(object):
"""StorageResource."""
def __init__(self):
"""Initialization."""
self.storage_name = None
self.storage_class = None
self.storage_avail_disk = 0
self.sort_base = 0
class SwitchResource(object):
"""SwitchResource."""
def __init__(self):
"""Initialization."""
self.switch_name = None
self.switch_type = None
self.avail_bandwidths = [] # out-bound bandwidths
self.sort_base = 0
class Node(object):
"""Node."""
def __init__(self):
"""Initialization."""
self.node = None # VM, Volume, or VGroup
self.node = None # VM or VGroup
self.sort_base = -1
def get_all_links(self):
"""Return a list of links for vms, volumes, and/or vgroups."""
link_list = []
if isinstance(self.node, VM):
for vml in self.node.vm_list:
link_list.append(vml)
for voll in self.node.volume_list:
link_list.append(voll)
elif isinstance(self.node, Volume):
for vml in self.node.vm_list: # vml is VolumeLink
link_list.append(vml)
elif isinstance(self.node, VGroup):
for vgl in self.node.vgroup_list:
link_list.append(vgl)
return link_list
def get_bandwidth_of_link(self, _link):
"""Return bandwidth of link."""
bandwidth = 0
if isinstance(self.node, VGroup) or isinstance(self.node, VM):
if isinstance(_link.node, VM):
bandwidth = _link.nw_bandwidth
elif isinstance(_link.node, Volume):
bandwidth = _link.io_bandwidth
else:
bandwidth = _link.io_bandwidth
return bandwidth
def get_common_diversity(self, _diversity_groups):
"""Return the common level of the given diversity groups."""
common_level = "ANY"
@ -365,15 +240,3 @@ class Node(object):
aff_id = self.node.level + ":" + self.node.name
return aff_id
def compute_reservation(_level, _placement_level, _bandwidth):
"""Compute and return the reservation."""
reservation = 0
if _placement_level != "ANY":
diff = LEVELS.index(_placement_level) - LEVELS.index(_level) + 1
if diff > 0:
reservation = _bandwidth * diff * 2
return reservation

View File

@ -50,8 +50,6 @@ class Config(object):
self.db_event_table = None
self.db_resource_table = None
self.db_app_table = None
self.db_resource_index_table = None
self.db_app_index_table = None
self.db_uuid_table = None
self.replication_factor = 3
self.db_hosts = []
@ -198,10 +196,6 @@ class Config(object):
self.db_app_table = CONF.music.app_table
self.db_resource_index_table = CONF.music.resource_index_table
self.db_app_index_table = CONF.music.app_index_table
self.db_uuid_table = CONF.music.uuid_table
self.replication_factor = CONF.music.replication_factor

View File

@ -86,9 +86,6 @@ class Daemon(object):
os.dup2(se.fileno(), sys.stderr.fileno())
atexit.register(self.delpid)
# write pidfile - moved to OstroDaemon.run
# pid = str(os.getpid())
# file(self.pidfile, 'w+').write("%s\n" % pid)
def delpid(self):
"""Remove pidfile."""
@ -151,7 +148,6 @@ class Daemon(object):
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
# print str(err)
sys.exit(1)
def status(self):

View File

@ -1,88 +0,0 @@
#
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Util."""
from os import listdir, stat
from os.path import isfile, join
def get_logfile(_loc, _max_log_size, _name):
"""Get logfile from location and return with file mode."""
files = [f for f in listdir(_loc) if isfile(join(_loc, f))]
logfile_index = 0
for f in files:
f_name_list = f.split(".")
f_type = f_name_list[len(f_name_list) - 1]
if f_type == "log":
f_id_list = f.split("_")
temp_f_id = f_id_list[len(f_id_list) - 1]
f_id = temp_f_id.split(".")[0]
f_index = int(f_id)
if f_index > logfile_index:
logfile_index = f_index
last_logfile = _name + "_" + str(logfile_index) + ".log"
mode = None
if isfile(_loc + last_logfile) is True:
statinfo = stat(_loc + last_logfile)
if statinfo.st_size > _max_log_size:
last_logfile = _name + "_" + str(logfile_index + 1) + ".log"
mode = 'w'
else:
mode = 'a'
else:
mode = 'w'
return (last_logfile, mode)
def get_last_logfile(_loc, _max_log_size, _max_num_of_logs,
_name, _last_index):
"""Return last logfile from location with index and mode."""
last_logfile = _name + "_" + str(_last_index) + ".log"
mode = None
if isfile(_loc + last_logfile) is True:
statinfo = stat(_loc + last_logfile)
if statinfo.st_size > _max_log_size:
if (_last_index + 1) < _max_num_of_logs:
_last_index = _last_index + 1
else:
_last_index = 0
last_logfile = _name + "_" + str(_last_index) + ".log"
mode = 'w'
else:
mode = 'a'
else:
mode = 'w'
return (last_logfile, _last_index, mode)
def adjust_json_string(_data):
"""Adjust data value formatting to be consistent and return."""
_data = _data.replace("None", '"none"')
_data = _data.replace("False", '"false"')
_data = _data.replace("True", '"true"')
_data = _data.replace('_"none"', "_none")
_data = _data.replace('_"false"', "_false")
_data = _data.replace('_"true"', "_true")
return _data

View File

@ -137,7 +137,6 @@ class Compute(object):
return "success"
# NOTE: do not set any info in _logical_groups
def _set_placed_vms(self, _hosts, _logical_groups):
error_status = None
@ -154,7 +153,7 @@ class Compute(object):
if result_status_detail == "success":
vm_id = ("none", vm_detail[0], vm_uuid)
_hosts[hk].vm_list.append(vm_id)
# FIXME(GJ): should track logical groups (e.g., AZ)?
else:
error_status = result_status_detail
break
@ -206,7 +205,8 @@ class Compute(object):
return "success"
def _set_resources(self, _hosts):
# Returns Hypervisor list
''' returns Hypervisor list '''
host_list = self.nova.hypervisors.list()
try:
@ -252,7 +252,8 @@ class Compute(object):
return error_status
def _set_flavors(self, _flavors):
# Get a list of all flavors
''' get a list of all flavors '''
flavor_list = self.nova.flavors.list()
try:
@ -280,9 +281,6 @@ class Compute(object):
swap_mb = float(sw)
flavor.disk_cap = root_gb + ephemeral_gb + swap_mb / float(1024)
# self.logger.info("adding flavor " + str(flavor.__dict__))
_flavors[flavor.name] = flavor
except (ValueError, KeyError, TypeError):

View File

@ -20,7 +20,6 @@ import time
from copy import deepcopy
from valet.engine.resource_manager.compute import Compute
from valet.engine.resource_manager.compute_simulator import SimCompute
from valet.engine.resource_manager.resource_base import Host
@ -93,12 +92,7 @@ class ComputeManager(threading.Thread):
hosts = {}
logical_groups = {}
compute = None
if self.config.mode.startswith("sim") is True or \
self.config.mode.startswith("test") is True:
compute = SimCompute(self.config)
else:
compute = Compute(self.logger)
compute = Compute(self.logger)
status = compute.set_hosts(hosts, logical_groups)
if status != "success":
@ -376,12 +370,7 @@ class ComputeManager(threading.Thread):
"""Return True if compute set flavors returns success."""
flavors = {}
compute = None
if self.config.mode.startswith("sim") is True or \
self.config.mode.startswith("test") is True:
compute = SimCompute(self.config)
else:
compute = Compute(self.logger)
compute = Compute(self.logger)
status = compute.set_flavors(flavors)
if status != "success":

View File

@ -1,127 +0,0 @@
#
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compute Simulator."""
from valet.engine.resource_manager.resource_base \
import Host, LogicalGroup, Flavor
class SimCompute(object):
"""Sim Compute Class.
This class simulates a compute datacenter using classes from resource_base.
"""
def __init__(self, _config):
"""Init Sim Compute class (object)."""
self.config = _config
self.datacenter_name = "sim"
def set_hosts(self, _hosts, _logical_groups):
"""Set hosts and logical groups using resource_base, return success."""
self._set_availability_zones(_hosts, _logical_groups)
self._set_aggregates(_hosts, _logical_groups)
self._set_placed_vms(_hosts, _logical_groups)
self._set_resources(_hosts)
return "success"
def _set_availability_zones(self, _hosts, _logical_groups):
logical_group = LogicalGroup("nova")
logical_group.group_type = "AZ"
_logical_groups[logical_group.name] = logical_group
for r_num in range(0, self.config.num_of_racks):
for h_num in range(0, self.config.num_of_hosts_per_rack):
host = Host(self.datacenter_name + "0r" + str(r_num) +
"c" + str(h_num))
host.tag.append("nova")
host.memberships["nova"] = logical_group
logical_group.vms_per_host[host.name] = []
_hosts[host.name] = host
def _set_aggregates(self, _hosts, _logical_groups):
for a_num in range(0, self.config.num_of_aggregates):
metadata = {}
metadata["cpu_allocation_ratio"] = "0.5"
aggregate = LogicalGroup("aggregate" + str(a_num))
aggregate.group_type = "AGGR"
aggregate.metadata = metadata
_logical_groups[aggregate.name] = aggregate
for a_num in range(0, self.config.num_of_aggregates):
aggregate = _logical_groups["aggregate" + str(a_num)]
for r_num in range(0, self.config.num_of_racks):
for h_num in range(0, self.config.num_of_hosts_per_rack):
host_name = self.datacenter_name + "0r" + str(r_num) +\
"c" + str(h_num)
if host_name in _hosts.keys():
if (h_num %
(self.config.aggregated_ratio + a_num)) == 0:
host = _hosts[host_name]
host.memberships[aggregate.name] = aggregate
aggregate.vms_per_host[host.name] = []
def _set_placed_vms(self, _hosts, _logical_groups):
pass
def _set_resources(self, _hosts):
for r_num in range(0, self.config.num_of_racks):
for h_num in range(0, self.config.num_of_hosts_per_rack):
host_name = self.datacenter_name + "0r" + str(r_num) +\
"c" + str(h_num)
if host_name in _hosts.keys():
host = _hosts[host_name]
host.original_vCPUs = float(self.config.cpus_per_host)
host.vCPUs_used = 0.0
host.original_mem_cap = float(self.config.mem_per_host)
host.free_mem_mb = host.original_mem_cap
host.original_local_disk_cap = \
float(self.config.disk_per_host)
host.free_disk_gb = host.original_local_disk_cap
host.disk_available_least = host.original_local_disk_cap
def set_flavors(self, _flavors):
"""Set flavors in compute sim, return success."""
for f_num in range(0, self.config.num_of_basic_flavors):
flavor = Flavor("bflavor" + str(f_num))
flavor.vCPUs = float(self.config.base_flavor_cpus * (f_num + 1))
flavor.mem_cap = float(self.config.base_flavor_mem * (f_num + 1))
flavor.disk_cap = \
float(self.config.base_flavor_disk * (f_num + 1)) + \
10.0 + 20.0 / 1024.0
_flavors[flavor.name] = flavor
for a_num in range(0, self.config.num_of_aggregates):
flavor = Flavor("sflavor" + str(a_num))
flavor.vCPUs = self.config.base_flavor_cpus * (a_num + 1)
flavor.mem_cap = self.config.base_flavor_mem * (a_num + 1)
flavor.disk_cap = self.config.base_flavor_disk * (a_num + 1)
flavor.extra_specs["cpu_allocation_ratio"] = "0.5"
_flavors[flavor.name] = flavor
return "success"

View File

@ -15,7 +15,6 @@
"""Resource - Handles data, metadata, status of resources."""
import sys
import time
import traceback
@ -45,8 +44,6 @@ class Resource(object):
self.datacenter = Datacenter(self.config.datacenter_name)
self.host_groups = {}
self.hosts = {}
self.switches = {}
self.storage_hosts = {}
""" metadata """
self.logical_groups = {}
@ -76,7 +73,6 @@ class Resource(object):
logical_group.status = lg.get("status")
logical_group.metadata = lg.get("metadata")
logical_group.vm_list = lg.get("vm_list")
logical_group.volume_list = lg.get("volume_list", [])
logical_group.vms_per_host = lg.get("vms_per_host")
self.logical_groups[lgk] = logical_group
@ -100,47 +96,6 @@ class Resource(object):
if len(self.flavors) == 0:
self.logger.error("fail loading flavors")
switches = _resource_status.get("switches")
if switches:
for sk, s in switches.iteritems():
switch = Switch(sk)
switch.switch_type = s.get("switch_type")
switch.status = s.get("status")
self.switches[sk] = switch
if len(self.switches) > 0:
for sk, s in switches.iteritems():
switch = self.switches[sk]
up_links = {}
uls = s.get("up_links")
for ulk, ul in uls.iteritems():
ulink = Link(ulk)
ulink.resource = self.switches[ul.get("resource")]
ulink.nw_bandwidth = ul.get("bandwidth")
ulink.avail_nw_bandwidth = ul.get("avail_bandwidth")
up_links[ulk] = ulink
switch.up_links = up_links
peer_links = {}
pls = s.get("peer_links")
for plk, pl in pls.iteritems():
plink = Link(plk)
plink.resource = self.switches[pl.get("resource")]
plink.nw_bandwidth = pl.get("bandwidth")
plink.avail_nw_bandwidth = pl.get("avail_bandwidth")
peer_links[plk] = plink
switch.peer_links = peer_links
else:
self.logger.error("fail loading switches")
# storage_hosts
hosts = _resource_status.get("hosts")
if hosts:
for hk, h in hosts.iteritems():
@ -162,16 +117,10 @@ class Resource(object):
host.free_disk_gb = h.get("free_disk_gb")
host.disk_available_least = h.get("disk_available_least")
host.vm_list = h.get("vm_list")
host.volume_list = h.get("volume_list", [])
for lgk in h["membership_list"]:
host.memberships[lgk] = self.logical_groups[lgk]
for sk in h.get("switch_list", []):
host.switches[sk] = self.switches[sk]
# host.storages
self.hosts[hk] = host
if len(self.hosts) == 0:
@ -194,16 +143,10 @@ class Resource(object):
hg.get("original_local_disk")
host_group.avail_local_disk_cap = hg.get("avail_local_disk")
host_group.vm_list = hg.get("vm_list")
host_group.volume_list = hg.get("volume_list", [])
for lgk in hg.get("membership_list"):
host_group.memberships[lgk] = self.logical_groups[lgk]
for sk in hg.get("switch_list", []):
host_group.switches[sk] = self.switches[sk]
# host.storages
self.host_groups[hgk] = host_group
if len(self.host_groups) == 0:
@ -225,16 +168,10 @@ class Resource(object):
dc.get("original_local_disk")
self.datacenter.avail_local_disk_cap = dc.get("avail_local_disk")
self.datacenter.vm_list = dc.get("vm_list")
self.datacenter.volume_list = dc.get("volume_list", [])
for lgk in dc.get("membership_list"):
self.datacenter.memberships[lgk] = self.logical_groups[lgk]
for sk in dc.get("switch_list", []):
self.datacenter.root_switches[sk] = self.switches[sk]
# host.storages
for ck in dc.get("children"):
if ck in self.host_groups.keys():
self.datacenter.resources[ck] = self.host_groups[ck]
@ -273,11 +210,9 @@ class Resource(object):
host.host_group = self.host_groups[pk]
self._update_compute_avail()
self._update_storage_avail()
self._update_nw_bandwidth_avail()
except Exception:
self.logger.error("Resource: bootstrap_from_db:" + traceback.format_exc())
self.logger.error("while bootstrap_from_db:" + traceback.format_exc())
return True
@ -286,8 +221,6 @@ class Resource(object):
self._update_topology()
self._update_compute_avail()
self._update_storage_avail()
self._update_nw_bandwidth_avail()
if store is False:
return True
@ -315,8 +248,6 @@ class Resource(object):
def _update_host_group_topology(self, _host_group):
_host_group.init_resources()
del _host_group.vm_list[:]
del _host_group.volume_list[:]
_host_group.storages.clear()
for _, host in _host_group.child_resources.iteritems():
if host.check_availability() is True:
@ -331,16 +262,9 @@ class Resource(object):
host.original_local_disk_cap
_host_group.avail_local_disk_cap += host.avail_local_disk_cap
for shk, storage_host in host.storages.iteritems():
if storage_host.status == "enabled":
_host_group.storages[shk] = storage_host
for vm_id in host.vm_list:
_host_group.vm_list.append(vm_id)
for vol_name in host.volume_list:
_host_group.volume_list.append(vol_name)
_host_group.init_memberships()
for _, host in _host_group.child_resources.iteritems():
@ -351,8 +275,6 @@ class Resource(object):
def _update_datacenter_topology(self):
self.datacenter.init_resources()
del self.datacenter.vm_list[:]
del self.datacenter.volume_list[:]
self.datacenter.storages.clear()
self.datacenter.memberships.clear()
for _, resource in self.datacenter.resources.iteritems():
@ -369,16 +291,9 @@ class Resource(object):
self.datacenter.avail_local_disk_cap += \
resource.avail_local_disk_cap
for shk, storage_host in resource.storages.iteritems():
if storage_host.status == "enabled":
self.datacenter.storages[shk] = storage_host
for vm_name in resource.vm_list:
self.datacenter.vm_list.append(vm_name)
for vol_name in resource.volume_list:
self.datacenter.volume_list.append(vol_name)
for mk in resource.memberships.keys():
self.datacenter.memberships[mk] = resource.memberships[mk]
@ -387,57 +302,10 @@ class Resource(object):
self.mem_avail = self.datacenter.avail_mem_cap
self.local_disk_avail = self.datacenter.avail_local_disk_cap
def _update_storage_avail(self):
self.disk_avail = 0
for _, storage_host in self.storage_hosts.iteritems():
if storage_host.status == "enabled":
self.disk_avail += storage_host.avail_disk_cap
def _update_nw_bandwidth_avail(self):
self.nw_bandwidth_avail = 0
level = "leaf"
for _, s in self.switches.iteritems():
if s.status == "enabled":
if level == "leaf":
if s.switch_type == "ToR" or s.switch_type == "spine":
level = s.switch_type
elif level == "ToR":
if s.switch_type == "spine":
level = s.switch_type
if level == "leaf":
self.nw_bandwidth_avail = sys.maxint
elif level == "ToR":
for _, h in self.hosts.iteritems():
if h.status == "enabled" and h.state == "up" and \
("nova" in h.tag) and ("infra" in h.tag):
avail_nw_bandwidth_list = [sys.maxint]
for sk, s in h.switches.iteritems():
if s.status == "enabled":
for ulk, ul in s.up_links.iteritems():
avail_nw_bandwidth_list.append(
ul.avail_nw_bandwidth)
self.nw_bandwidth_avail += min(avail_nw_bandwidth_list)
elif level == "spine":
for _, hg in self.host_groups.iteritems():
if hg.host_type == "rack" and hg.status == "enabled":
avail_nw_bandwidth_list = [sys.maxint]
for _, s in hg.switches.iteritems():
if s.status == "enabled":
for _, ul in s.up_links.iteritems():
avail_nw_bandwidth_list.append(
ul.avail_nw_bandwidth)
# NOTE: peer links?
self.nw_bandwidth_avail += min(avail_nw_bandwidth_list)
def store_topology_updates(self):
updated = False
flavor_updates = {}
logical_group_updates = {}
storage_updates = {}
switch_updates = {}
host_updates = {}
host_group_updates = {}
datacenter_update = None
@ -454,31 +322,17 @@ class Resource(object):
logical_group_updates[lgk] = lg.get_json_info()
updated = True
for shk, storage_host in self.storage_hosts.iteritems():
if storage_host.last_update >= self.curr_db_timestamp or \
storage_host.last_cap_update >= self.curr_db_timestamp:
storage_updates[shk] = storage_host.get_json_info()
updated = True
for sk, s in self.switches.iteritems():
if s.last_update >= self.curr_db_timestamp:
switch_updates[sk] = s.get_json_info()
updated = True
for hk, host in self.hosts.iteritems():
if host.last_update > self.current_timestamp or \
host.last_link_update > self.current_timestamp:
if host.last_update >= self.curr_db_timestamp:
host_updates[hk] = host.get_json_info()
updated = True
for hgk, host_group in self.host_groups.iteritems():
if host_group.last_update >= self.curr_db_timestamp or \
host_group.last_link_update >= self.curr_db_timestamp:
if host_group.last_update >= self.curr_db_timestamp:
host_group_updates[hgk] = host_group.get_json_info()
updated = True
if self.datacenter.last_update >= self.curr_db_timestamp or \
self.datacenter.last_link_update >= self.curr_db_timestamp:
if self.datacenter.last_update >= self.curr_db_timestamp:
datacenter_update = self.datacenter.get_json_info()
updated = True
@ -492,10 +346,6 @@ class Resource(object):
json_logging['flavors'] = flavor_updates
if len(logical_group_updates) > 0:
json_logging['logical_groups'] = logical_group_updates
if len(storage_updates) > 0:
json_logging['storages'] = storage_updates
if len(switch_updates) > 0:
json_logging['switches'] = switch_updates
if len(host_updates) > 0:
json_logging['hosts'] = host_updates
if len(host_group_updates) > 0:
@ -511,9 +361,10 @@ class Resource(object):
return True
def show_current_logical_groups(self):
self.logger.debug("--- track logical groups info ---")
for lgk, lg in self.logical_groups.iteritems():
if lg.status == "enabled":
self.logger.debug("TEST: lg name = " + lgk)
self.logger.debug("lg name = " + lgk)
self.logger.debug(" type = " + lg.group_type)
if lg.group_type == "AGGR":
for k in lg.metadata.keys():
@ -538,8 +389,9 @@ class Resource(object):
self.logger.error("TEST: membership missing")
def show_current_host_status(self):
self.logger.debug("--- track host info ---")
for hk, h in self.hosts.iteritems():
self.logger.debug("TEST: host name = " + hk)
self.logger.debug("host name = " + hk)
self.logger.debug(" status = " + h.status + ", " + h.state)
self.logger.debug(" vms = " + str(len(h.vm_list)))
self.logger.debug(" resources (org, total, avail, used)")
@ -637,55 +489,6 @@ class Resource(object):
host.free_disk_gb += _ldisk
host.disk_available_least += _ldisk
def add_vol_to_host(self, _host_name, _storage_name, _v_id, _disk):
"""Add volume to host and adjust available disk on host."""
host = self.hosts[_host_name]
host.volume_list.append(_v_id)
storage_host = self.storage_hosts[_storage_name]
storage_host.volume_list.append(_v_id)
storage_host.avail_disk_cap -= _disk
# NOTE: Assume the up-link of spine switch is not used except out-going
# from datacenter
# NOTE: What about peer-switches?
def deduct_bandwidth(self, _host_name, _placement_level, _bandwidth):
"""Deduct bandwidth at appropriate placement level."""
host = self.hosts[_host_name]
if _placement_level == "host":
self._deduct_host_bandwidth(host, _bandwidth)
elif _placement_level == "rack":
self._deduct_host_bandwidth(host, _bandwidth)
rack = host.host_group
if not isinstance(rack, Datacenter):
self._deduct_host_bandwidth(rack, _bandwidth)
elif _placement_level == "cluster":
self._deduct_host_bandwidth(host, _bandwidth)
rack = host.host_group
self._deduct_host_bandwidth(rack, _bandwidth)
cluster = rack.parent_resource
for _, s in cluster.switches.iteritems():
if s.switch_type == "spine":
for _, ul in s.up_links.iteritems():
ul.avail_nw_bandwidth -= _bandwidth
s.last_update = time.time()
def _deduct_host_bandwidth(self, _host, _bandwidth):
for _, hs in _host.switches.iteritems():
for _, ul in hs.up_links.iteritems():
ul.avail_nw_bandwidth -= _bandwidth
hs.last_update = time.time()
# called from handle_events
def update_host_resources(self, _hn, _st, _vcpus, _vcpus_used, _mem, _fmem,
_ldisk, _fldisk, _avail_least):
@ -695,7 +498,7 @@ class Resource(object):
if host.status != _st:
host.status = _st
self.logger.debug("Resource.update_host_resources: host(" + _hn +
self.logger.warn("Resource.update_host_resources: host(" + _hn +
") status changed")
updated = True
@ -713,12 +516,6 @@ class Resource(object):
host.last_update = time.time()
self.update_rack_resource(host)
def update_storage_time(self, _storage_name):
"""Update last storage update time."""
storage_host = self.storage_hosts[_storage_name]
storage_host.last_cap_update = time.time()
def add_logical_group(self, _host_name, _lg_name, _lg_type):
"""Add logical group to host memberships and update host resource."""
host = None

View File

@ -51,19 +51,12 @@ class Datacenter(object):
self.original_local_disk_cap = 0
self.avail_local_disk_cap = 0
self.root_switches = {}
self.storages = {}
self.resources = {}
# a list of placed vms, (ochestration_uuid, vm_name, physical_uuid)
self.vm_list = []
# a list of placed volumes
self.volume_list = []
self.last_update = 0
self.last_link_update = 0
def init_resources(self):
"""Init datacenter resources to 0."""
@ -83,14 +76,6 @@ class Datacenter(object):
for lgk in self.memberships.keys():
membership_list.append(lgk)
switch_list = []
for sk in self.root_switches.keys():
switch_list.append(sk)
storage_list = []
for shk in self.storages.keys():
storage_list.append(shk)
child_list = []
for ck in self.resources.keys():
child_list.append(ck)
@ -108,13 +93,9 @@ class Datacenter(object):
'local_disk': self.local_disk_cap,
'original_local_disk': self.original_local_disk_cap,
'avail_local_disk': self.avail_local_disk_cap,
'switch_list': switch_list,
'storage_list': storage_list,
'children': child_list,
'vm_list': self.vm_list,
'volume_list': self.volume_list,
'last_update': self.last_update,
'last_link_update': self.last_link_update}
'last_update': self.last_update}
class HostGroup(object):
@ -147,20 +128,13 @@ class HostGroup(object):
self.original_local_disk_cap = 0
self.avail_local_disk_cap = 0
self.switches = {} # ToRs
self.storages = {}
self.parent_resource = None # e.g., datacenter
self.child_resources = {} # e.g., hosting servers
# a list of placed vms, (ochestration_uuid, vm_name, physical_uuid)
self.vm_list = []
# a list of placed volumes
self.volume_list = []
self.last_update = 0
self.last_link_update = 0
def init_resources(self):
"""Init all host group resources to 0."""
@ -212,14 +186,6 @@ class HostGroup(object):
for lgk in self.memberships.keys():
membership_list.append(lgk)
switch_list = []
for sk in self.switches.keys():
switch_list.append(sk)
storage_list = []
for shk in self.storages.keys():
storage_list.append(shk)
child_list = []
for ck in self.child_resources.keys():
child_list.append(ck)
@ -236,14 +202,10 @@ class HostGroup(object):
'local_disk': self.local_disk_cap,
'original_local_disk': self.original_local_disk_cap,
'avail_local_disk': self.avail_local_disk_cap,
'switch_list': switch_list,
'storage_list': storage_list,
'parent': self.parent_resource.name,
'children': child_list,
'vm_list': self.vm_list,
'volume_list': self.volume_list,
'last_update': self.last_update,
'last_link_update': self.last_link_update}
'last_update': self.last_update}
class Host(object):
@ -282,19 +244,12 @@ class Host(object):
self.free_disk_gb = 0
self.disk_available_least = 0
self.switches = {} # leaf
self.storages = {}
self.host_group = None # e.g., rack
# a list of placed vms, (ochestration_uuid, vm_name, physical_uuid)
self.vm_list = []
# a list of placed volumes
self.volume_list = []
self.last_update = 0
self.last_link_update = 0
def clean_memberships(self):
"""Return True if host cleaned from logical group membership."""
@ -458,14 +413,6 @@ class Host(object):
for lgk in self.memberships.keys():
membership_list.append(lgk)
switch_list = []
for sk in self.switches.keys():
switch_list.append(sk)
storage_list = []
for shk in self.storages.keys():
storage_list.append(shk)
return {'tag': self.tag, 'status': self.status, 'state': self.state,
'membership_list': membership_list,
'vCPUs': self.vCPUs,
@ -481,13 +428,9 @@ class Host(object):
'free_mem_mb': self.free_mem_mb,
'free_disk_gb': self.free_disk_gb,
'disk_available_least': self.disk_available_least,
'switch_list': switch_list,
'storage_list': storage_list,
'parent': self.host_group.name,
'vm_list': self.vm_list,
'volume_list': self.volume_list,
'last_update': self.last_update,
'last_link_update': self.last_link_update}
'last_update': self.last_update}
class LogicalGroup(object):
@ -694,93 +637,10 @@ class LogicalGroup(object):
'group_type': self.group_type,
'metadata': self.metadata,
'vm_list': self.vm_list,
'volume_list': self.volume_list,
'vms_per_host': self.vms_per_host,
'last_update': self.last_update}
class Switch(object):
"""Switch class."""
def __init__(self, _switch_id):
"""Init Switch object."""
self.name = _switch_id
self.switch_type = "ToR" # root, spine, ToR, or leaf
self.status = "enabled"
self.up_links = {}
self.down_links = {} # currently, not used
self.peer_links = {}
self.last_update = 0
def get_json_info(self):
"""Return JSON info on Switch object."""
ulinks = {}
for ulk, ul in self.up_links.iteritems():
ulinks[ulk] = ul.get_json_info()
plinks = {}
for plk, pl in self.peer_links.iteritems():
plinks[plk] = pl.get_json_info()
return {'status': self.status,
'switch_type': self.switch_type,
'up_links': ulinks,
'peer_links': plinks,
'last_update': self.last_update}
class Link(object):
"""Link class."""
def __init__(self, _name):
"""Init Link object."""
self.name = _name # format: source + "-" + target
self.resource = None # switch beging connected to
self.nw_bandwidth = 0 # Mbps
self.avail_nw_bandwidth = 0
def get_json_info(self):
"""Return JSON info on Link object."""
return {'resource': self.resource.name,
'bandwidth': self.nw_bandwidth,
'avail_bandwidth': self.avail_nw_bandwidth}
class StorageHost(object):
"""Storage Host class."""
def __init__(self, _name):
"""Init Storage Host object."""
self.name = _name
self.storage_class = None # tiering, e.g., platinum, gold, silver
self.status = "enabled"
self.host_list = []
self.disk_cap = 0 # GB
self.avail_disk_cap = 0
self.volume_list = [] # list of volume names placed in this host
self.last_update = 0
self.last_cap_update = 0
def get_json_info(self):
"""Return JSON info on Storage Host object."""
return {'status': self.status,
'class': self.storage_class,
'host_list': self.host_list,
'disk': self.disk_cap,
'avail_disk': self.avail_disk_cap,
'volume_list': self.volume_list,
'last_update': self.last_update,
'last_cap_update': self.last_cap_update}
class Flavor(object):
"""Flavor class."""

View File

@ -16,14 +16,16 @@
"""Topology class - performs actual setting up of Topology object."""
import copy
import sys
from sre_parse import isdigit
from valet.engine.resource_manager.resource_base import HostGroup, Switch, Link
from valet.engine.resource_manager.resource_base import HostGroup
class Topology(object):
"""Topology class."""
"""
Topology class.
currently, using cannonical naming convention to find the topology
"""
def __init__(self, _config, _logger):
"""Init config and logger."""
@ -31,22 +33,16 @@ class Topology(object):
self.logger = _logger
# Triggered by rhosts change
def set_topology(self, _datacenter, _host_groups, _hosts, _rhosts,
_switches):
def set_topology(self, _datacenter, _host_groups, _hosts, _rhosts):
"""Return result status if setting host or network topology fails."""
result_status = self._set_host_topology(_datacenter, _host_groups,
_hosts, _rhosts)
if result_status != "success":
return result_status
result_status = self._set_network_topology(_datacenter, _host_groups,
_hosts, _switches)
if result_status != "success":
return result_status
# TODO(GJ): set network bandwidth links
return "success"
# NOTE: currently, the hosts are copied from Nova
def _set_host_topology(self, _datacenter, _host_groups, _hosts, _rhosts):
for rhk, rh in _rhosts.iteritems():
h = copy.deepcopy(rh)
@ -87,43 +83,6 @@ class Topology(object):
return "success"
# NOTE: this is just muck-ups
def _set_network_topology(self, _datacenter, _host_groups, _hosts,
_switches):
root_switch = Switch(_datacenter.name)
root_switch.switch_type = "root"
_datacenter.root_switches[root_switch.name] = root_switch
_switches[root_switch.name] = root_switch
for hgk, hg in _host_groups.iteritems():
switch = Switch(hgk)
switch.switch_type = "ToR"
up_link = Link(hgk + "-" + _datacenter.name)
up_link.resource = root_switch
up_link.nw_bandwidth = sys.maxint
up_link.avail_nw_bandwidth = up_link.nw_bandwidth
switch.up_links[up_link.name] = up_link
hg.switches[switch.name] = switch
_switches[switch.name] = switch
for hk, h in hg.child_resources.iteritems():
leaf_switch = Switch(hk)
leaf_switch.switch_type = "leaf"
l_up_link = Link(hk + "-" + hgk)
l_up_link.resource = switch
l_up_link.nw_bandwidth = sys.maxint
l_up_link.avail_nw_bandwidth = l_up_link.nw_bandwidth
leaf_switch.up_links[l_up_link.name] = l_up_link
h.switches[leaf_switch.name] = leaf_switch
_switches[leaf_switch.name] = leaf_switch
return "success"
def _set_layout_by_name(self, _host_name):
region_name = None
rack_name = None

View File

@ -24,7 +24,7 @@ import threading
import time
from valet.engine.resource_manager.resource_base \
import Datacenter, HostGroup, Host, Switch, Link
import Datacenter, HostGroup, Host
from valet.engine.resource_manager.topology import Topology
@ -43,7 +43,6 @@ class TopologyManager(threading.Thread):
self.resource = _resource
self.config = _config
self.logger = _logger
self.update_batch_wait = self.config.update_batch_wait
@ -79,58 +78,29 @@ class TopologyManager(threading.Thread):
self.logger.info("--- done topology status update ---")
def set_topology(self):
"""Return True if datacenter topology successfully setup."""
datacenter = None
host_groups = {}
hosts = {}
switches = {}
topology = None
if self.config.mode.startswith("sim") is True or \
self.config.mode.startswith("test") is True:
datacenter = Datacenter("sim")
else:
datacenter = Datacenter(self.config.datacenter_name)
# NOTE(GJ): do not consider switch topology at this version
datacenter = Datacenter(self.config.datacenter_name)
topology = Topology(self.config, self.logger)
status = topology.set_topology(datacenter, host_groups, hosts,
self.resource.hosts, switches)
self.resource.hosts)
if status != "success":
return False
self.data_lock.acquire()
if self._check_update(datacenter, host_groups, hosts, switches) is True:
if self._check_update(datacenter, host_groups, hosts) is True:
self.resource.update_topology(store=False)
self.data_lock.release()
return True
def _check_update(self, _datacenter, _host_groups, _hosts, _switches):
def _check_update(self, _datacenter, _host_groups, _hosts):
updated = False
for sk in _switches.keys():
if sk not in self.resource.switches.keys():
new_switch = self._create_new_switch(_switches[sk])
self.resource.switches[new_switch.name] = new_switch
new_switch.last_update = time.time()
self.logger.warn("TopologyManager: new switch (" +
new_switch.name + ") added")
updated = True
for rsk in self.resource.switches.keys():
if rsk not in _switches.keys():
switch = self.resource.switches[rsk]
switch.status = "disabled"
switch.last_update = time.time()
self.logger.warn("TopologyManager: switch (" +
switch.name + ") disabled")
updated = True
for hk in _hosts.keys():
if hk not in self.resource.hosts.keys():
new_host = self._create_new_host(_hosts[hk])
@ -176,45 +146,26 @@ class TopologyManager(threading.Thread):
host_group.name + ") disabled")
updated = True
for sk in _switches.keys():
switch = _switches[sk]
rswitch = self.resource.switches[sk]
if self._check_switch_update(switch, rswitch) is True:
rswitch.last_update = time.time()
updated = True
for hk in _hosts.keys():
host = _hosts[hk]
rhost = self.resource.hosts[hk]
(topology_updated, link_updated) = \
self._check_host_update(host, rhost)
topology_updated = self._check_host_update(host, rhost)
if topology_updated is True:
rhost.last_update = time.time()
updated = True
if link_updated is True:
rhost.last_link_update = time.time()
updated = True
for hgk in _host_groups.keys():
hg = _host_groups[hgk]
rhg = self.resource.host_groups[hgk]
(topology_updated, link_updated) = \
self._check_host_group_update(hg, rhg)
topology_updated = self._check_host_group_update(hg, rhg)
if topology_updated is True:
rhg.last_update = time.time()
updated = True
if link_updated is True:
rhg.last_link_update = time.time()
updated = True
(topology_updated, link_updated) = \
self._check_datacenter_update(_datacenter)
topology_updated = self._check_datacenter_update(_datacenter)
if topology_updated is True:
self.resource.datacenter.last_update = time.time()
updated = True
if link_updated is True:
self.resource.datacenter.last_link_update = time.time()
updated = True
for hk, host in self.resource.hosts.iteritems():
if host.last_update >= self.resource.current_timestamp:
@ -226,21 +177,6 @@ class TopologyManager(threading.Thread):
return updated
def _create_new_switch(self, _switch):
new_switch = Switch(_switch.name)
new_switch.switch_type = _switch.switch_type
return new_switch
def _create_new_link(self, _link):
new_link = Link(_link.name)
new_link.resource = self.resource.switches[_link.resource.name]
new_link.nw_bandwidth = _link.nw_bandwidth
new_link.avail_nw_bandwidth = new_link.nw_bandwidth
return new_link
def _create_new_host(self, _host):
new_host = Host(_host.name)
new_host.tag.append("infra")
@ -253,101 +189,8 @@ class TopologyManager(threading.Thread):
return new_hg
def _check_switch_update(self, _switch, _rswitch):
updated = False
if _switch.switch_type != _rswitch.switch_type:
_rswitch.switch_type = _switch.switch_type
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (switch type)")
if _rswitch.status == "disabled":
_rswitch.status = "enabled"
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (enabled)")
for ulk in _switch.up_links.keys():
exist = False
for rulk in _rswitch.up_links.keys():
if ulk == rulk:
exist = True
break
if exist is False:
new_link = self._create_new_link(_switch.up_links[ulk])
_rswitch.up_links[new_link.name] = new_link
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (new link)")
for rulk in _rswitch.up_links.keys():
exist = False
for ulk in _switch.up_links.keys():
if rulk == ulk:
exist = True
break
if exist is False:
del _rswitch.up_links[rulk]
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (link removed)")
for ulk in _rswitch.up_links.keys():
link = _switch.up_links[ulk]
rlink = _rswitch.up_links[ulk]
if self._check_link_update(link, rlink) is True:
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (bandwidth)")
for plk in _switch.peer_links.keys():
exist = False
for rplk in _rswitch.peer_links.keys():
if plk == rplk:
exist = True
break
if exist is False:
new_link = self._create_new_link(_switch.peer_links[plk])
_rswitch.peer_links[new_link.name] = new_link
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (new link)")
for rplk in _rswitch.peer_links.keys():
exist = False
for plk in _switch.peer_links.keys():
if rplk == plk:
exist = True
break
if exist is False:
del _rswitch.peer_links[rplk]
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (link removed)")
for plk in _rswitch.peer_links.keys():
link = _switch.peer_links[plk]
rlink = _rswitch.peer_links[plk]
if self._check_link_update(link, rlink) is True:
updated = True
self.logger.warn("TopologyManager: switch (" + _rswitch.name +
") updated (bandwidth)")
return updated
def _check_link_update(self, _link, _rlink):
updated = False
if _link.nw_bandwidth != _rlink.nw_bandwidth:
_rlink.nw_bandwidth = _link.nw_bandwidth
updated = True
return updated
def _check_host_update(self, _host, _rhost):
updated = False
link_updated = False
if "infra" not in _rhost.tag:
_rhost.tag.append("infra")
@ -367,35 +210,10 @@ class TopologyManager(threading.Thread):
self.logger.warn("TopologyManager: host (" + _rhost.name +
") updated (host_group)")
for sk in _host.switches.keys():
exist = False
for rsk in _rhost.switches.keys():
if sk == rsk:
exist = True
break
if exist is False:
_rhost.switches[sk] = self.resource.switches[sk]
link_updated = True
self.logger.warn("TopologyManager: host (" + _rhost.name +
") updated (new switch)")
for rsk in _rhost.switches.keys():
exist = False
for sk in _host.switches.keys():
if rsk == sk:
exist = True
break
if exist is False:
del _rhost.switches[rsk]
link_updated = True
self.logger.warn("TopologyManager: host (" + _rhost.name +
") updated (switch removed)")
return (updated, link_updated)
return updated
def _check_host_group_update(self, _hg, _rhg):
updated = False
link_updated = False
if _hg.host_type != _rhg.host_type:
_rhg.host_type = _hg.host_type
@ -445,35 +263,10 @@ class TopologyManager(threading.Thread):
self.logger.warn("TopologyManager: host_group (" + _rhg.name +
") updated (child host removed)")
for sk in _hg.switches.keys():
exist = False
for rsk in _rhg.switches.keys():
if sk == rsk:
exist = True
break
if exist is False:
_rhg.switches[sk] = self.resource.switches[sk]
link_updated = True
self.logger.warn("TopologyManager: host_group (" + _rhg.name +
") updated (new switch)")
for rsk in _rhg.switches.keys():
exist = False
for sk in _hg.switches.keys():
if rsk == sk:
exist = True
break
if exist is False:
del _rhg.switches[rsk]
link_updated = True
self.logger.warn("TopologyManager: host_group (" + _rhg.name +
") updated (switch removed)")
return (updated, link_updated)
return updated
def _check_datacenter_update(self, _datacenter):
updated = False
link_updated = False
for rc in _datacenter.region_code_list:
if rc not in self.resource.datacenter.region_code_list:
@ -521,29 +314,4 @@ class TopologyManager(threading.Thread):
self.logger.warn("TopologyManager: datacenter updated "
"(resource removed)")
for sk in _datacenter.root_switches.keys():
exist = False
for rsk in self.resource.datacenter.root_switches.keys():
if sk == rsk:
exist = True
break
if exist is False:
self.resource.datacenter.root_switches[sk] = \
self.resource.switches[sk]
link_updated = True
self.logger.warn("TopologyManager: datacenter updated "
"(new switch)")
for rsk in self.resource.datacenter.root_switches.keys():
exist = False
for sk in _datacenter.root_switches.keys():
if rsk == sk:
exist = True
break
if exist is False:
del self.resource.datacenter.root_switches[rsk]
link_updated = True
self.logger.warn("TopologyManager: datacenter updated "
"(switch removed)")
return (updated, link_updated)
return updated

View File

@ -1,156 +0,0 @@
#
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Simulate datacenter configurations (i.e., layout, cabling)."""
from valet.engine.resource_manager.resource_base \
import HostGroup, Host, Switch, Link
class SimTopology(object):
"""Simulate Topology class.
Sim network and host topology for datacenters.
"""
def __init__(self, _config):
"""Init."""
self.config = _config
def set_topology(self, _datacenter, _host_groups, _hosts, _switches):
"""Return success after setting network and host topology."""
self._set_network_topology(_switches)
self._set_host_topology(_datacenter, _host_groups, _hosts, _switches)
return "success"
def _set_network_topology(self, _switches):
root_switch = Switch("r0")
root_switch.switch_type = "root"
_switches[root_switch.name] = root_switch
if self.config.num_of_spine_switches > 0:
for s_num in range(0, self.config.num_of_spine_switches):
switch = Switch(root_switch.name + "s" + str(s_num))
switch.switch_type = "spine"
_switches[switch.name] = switch
for r_num in range(0, self.config.num_of_racks):
switch = Switch(root_switch.name + "t" + str(r_num))
switch.switch_type = "ToR"
_switches[switch.name] = switch
for h_num in range(0, self.config.num_of_hosts_per_rack):
leaf_switch = Switch(switch.name + "l" + str(h_num))
leaf_switch.switch_type = "leaf"
_switches[leaf_switch.name] = leaf_switch
if self.config.num_of_spine_switches > 0:
for s_num in range(0, self.config.num_of_spine_switches):
s = _switches[root_switch.name + "s" + str(s_num)]
up_link = Link(s.name + "-" + root_switch.name)
up_link.resource = root_switch
up_link.nw_bandwidth = self.config.bandwidth_of_spine
up_link.avail_nw_bandwidth = up_link.nw_bandwidth
s.up_links[up_link.name] = up_link
if self.config.num_of_spine_switches > 1:
ps = None
if (s_num % 2) == 0:
if (s_num + 1) < self.config.num_of_spine_switches:
ps = _switches[root_switch.name + "s" +
str(s_num + 1)]
else:
ps = _switches[root_switch.name + "s" + str(s_num - 1)]
if ps is not None:
peer_link = Link(s.name + "-" + ps.name)
peer_link.resource = ps
peer_link.nw_bandwidth = self.config.bandwidth_of_spine
peer_link.avail_nw_bandwidth = peer_link.nw_bandwidth
s.peer_links[peer_link.name] = peer_link
for r_num in range(0, self.config.num_of_racks):
s = _switches[root_switch.name + "t" + str(r_num)]
parent_switch_list = []
if self.config.num_of_spine_switches > 0:
for s_num in range(0, self.config.num_of_spine_switches):
parent_switch_list.append(_switches[root_switch.name +
"s" + str(s_num)])
else:
parent_switch_list.append(_switches[root_switch.name])
for parent_switch in parent_switch_list:
up_link = Link(s.name + "-" + parent_switch.name)
up_link.resource = parent_switch
up_link.nw_bandwidth = self.config.bandwidth_of_rack
up_link.avail_nw_bandwidth = up_link.nw_bandwidth
s.up_links[up_link.name] = up_link
if self.config.num_of_racks > 1:
ps = None
if (r_num % 2) == 0:
if (r_num + 1) < self.config.num_of_racks:
ps = _switches[root_switch.name + "t" + str(r_num + 1)]
else:
ps = _switches[root_switch.name + "t" + str(r_num - 1)]
if ps is not None:
peer_link = Link(s.name + "-" + ps.name)
peer_link.resource = ps
peer_link.nw_bandwidth = self.config.bandwidth_of_rack
peer_link.avail_nw_bandwidth = peer_link.nw_bandwidth
s.peer_links[peer_link.name] = peer_link
for h_num in range(0, self.config.num_of_hosts_per_rack):
ls = _switches[s.name + "l" + str(h_num)]
l_up_link = Link(ls.name + "-" + s.name)
l_up_link.resource = s
l_up_link.nw_bandwidth = self.config.bandwidth_of_host
l_up_link.avail_nw_bandwidth = l_up_link.nw_bandwidth
ls.up_links[l_up_link.name] = l_up_link
def _set_host_topology(self, _datacenter, _host_groups, _hosts, _switches):
root_switch = _switches["r0"]
for r_num in range(0, self.config.num_of_racks):
host_group = HostGroup(_datacenter.name + "r" + str(r_num))
host_group.host_type = "rack"
switch = _switches[root_switch.name + "t" + str(r_num)]
host_group.switches[switch.name] = switch
_host_groups[host_group.name] = host_group
for h_num in range(0, self.config.num_of_hosts_per_rack):
host = Host(host_group.name + "c" + str(h_num))
leaf_switch = _switches[switch.name + "l" + str(h_num)]
host.switches[leaf_switch.name] = leaf_switch
_hosts[host.name] = host
for r_num in range(0, self.config.num_of_racks):
host_group = _host_groups[_datacenter.name + "r" + str(r_num)]
host_group.parent_resource = _datacenter
for h_num in range(0, self.config.num_of_hosts_per_rack):
host = _hosts[host_group.name + "c" + str(h_num)]
host.host_group = host_group
host_group.child_resources[host.name] = host
_datacenter.root_switches[root_switch.name] = root_switch
for r_num in range(0, self.config.num_of_racks):
host_group = _host_groups[_datacenter.name + "r" + str(r_num)]
_datacenter.resources[host_group.name] = host_group