Revise formatting of excel specs

The previous excel spec seemed to lack a consistent structure. This
change redesigns the spec to more simply and generically find where data
is located in the spreadsheet.

Change-Id: I98c1553531897e8c623caa8a1b9334ea915a3f49
This commit is contained in:
Ian H Pittwood 2019-07-02 14:39:32 -05:00
parent b0f7b952b4
commit b9da6d41eb
10 changed files with 660 additions and 366 deletions

View File

@ -18,45 +18,87 @@
specs:
# Design Spec file name: SiteDesignSpec_v0.1.xlsx
xl_spec:
ipmi_sheet_name: 'Site-Information'
start_row: 4
end_row: 15
hostname_col: 2
ipmi_address_col: 3
host_profile_col: 5
ipmi_gateway_col: 4
private_ip_sheet: 'Site-Information'
net_type_col: 1
vlan_col: 2
vlan_start_row: 19
vlan_end_row: 30
net_start_row: 33
net_end_row: 40
net_col: 2
net_vlan_col: 1
public_ip_sheet: 'Site-Information'
oam_vlan_col: 1
oam_ip_row: 43
oam_ip_col: 2
oob_net_row: 48
oob_net_start_col: 2
oob_net_end_col: 5
ingress_ip_row: 45
dns_ntp_ldap_sheet: 'Site-Information'
login_domain_row: 52
ldap_col: 2
global_group: 53
ldap_search_url_row: 54
ntp_row: 55
ntp_col: 2
dns_row: 56
dns_col: 2
domain_row: 51
domain_col: 2
location_sheet: 'Site-Information'
column: 2
corridor_row: 59
site_name_row: 58
state_name_row: 60
country_name_row: 61
clli_name_row: 62
ipmi:
# Series type data set that defines an iterable and the expected data
# for each index in the iterable
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 4
end: 15
data:
hostname: 2
ipmi_address: 3
ipmi_gateway: 4
host_profile: 5
private_vlan:
type: series
sheet_name: 'Site-Information'
no_sanitize: net_type
iter:
index-type: row
start: 19
end: 30
data:
net_type: 1
vlan: 2
private_net:
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 33
end: 40
data:
vlan: 1
ip: 2
public:
type: container
sheet_name: 'Site-Information'
data:
oam:
type: point
data:
vlan: [43, 1]
ip: [43, 2]
ingress:
type: point
data:
ip: [45, 2]
oob:
type: series
iter:
index-type: col
start: 2
end: 5
data:
ip: 48
site_info:
# Point type defines x, y (row, column) coordinates for where data can be found
type: point
sheet_name: 'Site-Information'
sanitize: false
data:
domain: [51, 2]
subdomain: [52, 2]
global_group: [53, 2]
ldap: [54, 2]
ntp: [55, 2]
dns: [56, 2]
location:
type: point
sheet_name: 'Site-Information'
sanitize: false
data:
sitename: [58, 2]
corridor: [59, 2]
state: [60, 2]
country: [61, 2]
clli: [62, 2]
...

View File

@ -12,15 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
import logging
from openpyxl import load_workbook
from openpyxl import Workbook
import pprint
import re
import sys
import yaml
from spyglass_plugin_xls.exceptions import NoSpecMatched
from spyglass_plugin_xls import exceptions
LOG = logging.getLogger(__name__)
@ -28,7 +29,8 @@ LOG = logging.getLogger(__name__)
class ExcelParser(object):
"""Parse data from excel into a dict"""
def __init__(self, file_name: str, excel_specs: str):
def __init__(
self, file_name: str, excel_specs: str, spec: str = 'xl_spec'):
"""Initializes an ExcelParser to extract data from the Excel workbook
:param file_name: path to the Excel workbook
@ -42,7 +44,11 @@ class ExcelParser(object):
# all the inputs excel specs
combined_design_spec = self.load_excel_data(file_name)
self.wb_combined = combined_design_spec
self.spec = "xl_spec"
self.spec = spec
self.loaded_spec = self.excel_specs['specs'][self.spec]
self.validate_sheet_names_with_spec()
self.loaded_data = self.extract_data_using_spec()
@staticmethod
def sanitize(string):
@ -65,72 +71,112 @@ class ExcelParser(object):
header_value = ws.cell(row=header_row, column=ipmi_column).value
return bool(self.compare(ipmi_header, header_value))
def find_correct_spec(self):
"""Find the correct spec"""
def _get_workbook(self, sheet_name, data=None):
sheet_name_to_use = sheet_name
if data and 'sheet_name' in data:
sheet_name_to_use = data['sheet_name']
for spec in self.excel_specs["specs"]:
sheet_name = self.excel_specs["specs"][spec]["ipmi_sheet_name"]
for sheet in self.wb_combined.sheetnames:
if self.compare(sheet_name, sheet):
self.excel_specs["specs"][spec]["ipmi_sheet_name"] = sheet
if self.validate_sheet(spec, sheet):
return spec
raise NoSpecMatched(excel_specs=self.excel_specs)
def _get_workbook(self):
provided_sheetname = self.excel_specs["specs"][
self.spec]["ipmi_sheet_name"]
workbook_object, extracted_sheetname = self.get_xl_obj_and_sheetname(
provided_sheetname)
sheet_name_to_use)
if workbook_object is not None:
return workbook_object[extracted_sheetname]
else:
return self.wb_combined[provided_sheetname]
return self.wb_combined[sheet_name_to_use]
@staticmethod
def _check_sanitize_settings(data):
no_sanitize_keys = []
sanitize_default = True
if 'sanitize' in data and not data['sanitize']:
sanitize_default = False
if 'no_sanitize' in data:
no_sanitize_keys = data['no_sanitize']
return sanitize_default, no_sanitize_keys
def extract_data_points(self, data, sheet_name=None):
extracted_data = {}
ws = self._get_workbook(sheet_name, data)
sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
data)
for key, coordinate in data['data'].items():
value = ws.cell(row=coordinate[0], column=coordinate[1]).value
if not sanitize_default or key in no_sanitize_keys:
extracted_data[key] = value
else:
extracted_data[key] = self.sanitize(value)
return extracted_data
def extract_data_series(self, data, sheet_name=None):
extracted_data = []
ws = self._get_workbook(sheet_name, data)
sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
data)
for x in range(data['iter']['start'], data['iter']['end'] + 1):
data_dict = {}
for key, y in data['data'].items():
if data['iter']['index-type'] == 'row':
value = ws.cell(row=x, column=y).value
elif data['iter']['index-type'] == 'col':
value = ws.cell(row=y, column=x).value
else:
raise exceptions.InvalidSpec()
if value:
if not sanitize_default or key in no_sanitize_keys:
data_dict[key] = value
else:
data_dict[key] = self.sanitize(value)
if data_dict:
extracted_data.append(data_dict)
return extracted_data
def extract_data_using_spec(self, spec_dict=None, sheet_name=None):
if not spec_dict:
spec_dict = self.loaded_spec
extracted_data = {}
for name, data in spec_dict.items():
data_type = self.sanitize(data['type'])
if data_type == 'series':
extracted_data[name] = self.extract_data_series(
data, sheet_name)
elif data_type == 'point':
extracted_data[name] = self.extract_data_points(
data, sheet_name)
elif data_type == 'container':
sheet = None
if 'sheet_name' in data:
sheet = data['sheet_name']
extracted_data[name] = self.extract_data_using_spec(
data['data'], sheet or sheet_name)
return extracted_data
def get_ipmi_data(self):
"""Read IPMI data from the sheet"""
ipmi_data = {}
hosts = []
ws = self._get_workbook()
row = self.excel_specs["specs"][self.spec]["start_row"]
end_row = self.excel_specs["specs"][self.spec]["end_row"]
hostname_col = self.excel_specs["specs"][self.spec]["hostname_col"]
ipmi_address_col = self.excel_specs["specs"][
self.spec]["ipmi_address_col"]
host_profile_col = self.excel_specs["specs"][
self.spec]["host_profile_col"]
ipmi_gateway_col = self.excel_specs["specs"][
self.spec]["ipmi_gateway_col"]
previous_server_gateway = None
while row <= end_row:
hostname = self.sanitize(
ws.cell(row=row, column=hostname_col).value)
for entry in self.loaded_data['ipmi']:
hostname = entry['hostname']
hosts.append(hostname)
ipmi_address = ws.cell(row=row, column=ipmi_address_col).value
if "/" in ipmi_address:
ipmi_address = ipmi_address.split("/")[0]
ipmi_gateway = ws.cell(row=row, column=ipmi_gateway_col).value
if ipmi_gateway:
previous_server_gateway = ipmi_gateway
ipmi_data[hostname] = deepcopy(entry)
ipmi_data[hostname].pop('hostname')
if "/" in ipmi_data[hostname]['ipmi_address']:
ipmi_data[hostname]['ipmi_address'] = ipmi_data[hostname][
'ipmi_address'].split("/")[0]
if ipmi_data[hostname]['ipmi_gateway']:
previous_server_gateway = ipmi_data[hostname]['ipmi_gateway']
else:
ipmi_gateway = previous_server_gateway
host_profile = ws.cell(row=row, column=host_profile_col).value
try:
if host_profile is None:
raise RuntimeError(
"No value read from {} ".format(self.file_name) +
"sheet:{} row:{}, col:{}".format(
self.spec, row, host_profile_col))
except RuntimeError as rerror:
LOG.critical(rerror)
sys.exit("Spyglass exited")
ipmi_data[hostname] = {
"ipmi_address": ipmi_address,
"ipmi_gateway": ipmi_gateway,
"host_profile": host_profile,
}
row += 1
ipmi_data[hostname]['ipmi_gateway'] = previous_server_gateway
if not ipmi_data[hostname]['host_profile']:
raise exceptions.MissingData(
missing_data='host_profile', section='host %s' % hostname)
LOG.debug(
"ipmi data extracted from excel:\n{}".format(
pprint.pformat(ipmi_data)))
@ -139,22 +185,15 @@ class ExcelParser(object):
pprint.pformat(hosts)))
return [ipmi_data, hosts]
def get_private_vlan_data(self, ws):
def get_private_vlan_data(self):
"""Get private vlan data from private IP sheet"""
vlan_data = {}
row = self.excel_specs["specs"][self.spec]["vlan_start_row"]
end_row = self.excel_specs["specs"][self.spec]["vlan_end_row"]
type_col = self.excel_specs["specs"][self.spec]["net_type_col"]
vlan_col = self.excel_specs["specs"][self.spec]["vlan_col"]
while row <= end_row:
cell_value = ws.cell(row=row, column=type_col).value
if cell_value:
vlan = ws.cell(row=row, column=vlan_col).value
if vlan:
vlan = vlan.lower()
vlan_data[vlan] = cell_value
row += 1
for entry in self.loaded_data['private_vlan']:
net_type = entry['net_type']
if net_type:
vlan = re.sub(r'\W+', '', entry['vlan']).lower()
vlan_data[vlan] = net_type
LOG.debug(
"vlan data extracted from excel:\n%s", pprint.pformat(vlan_data))
return vlan_data
@ -162,73 +201,42 @@ class ExcelParser(object):
def get_private_network_data(self):
"""Read network data from the private ip sheet"""
ws = self._get_workbook()
vlan_data = self.get_private_vlan_data(ws)
vlan_data = self.get_private_vlan_data()
network_data = {}
row = self.excel_specs["specs"][self.spec]["net_start_row"]
end_row = self.excel_specs["specs"][self.spec]["net_end_row"]
col = self.excel_specs["specs"][self.spec]["net_col"]
vlan_col = self.excel_specs["specs"][self.spec]["net_vlan_col"]
old_vlan = ""
while row <= end_row:
vlan = ws.cell(row=row, column=vlan_col).value
if vlan:
vlan = vlan.lower()
network = ws.cell(row=row, column=col).value
if vlan and network:
net_type = vlan_data[vlan]
if "vlan" not in network_data:
network_data[net_type] = {"vlan": vlan, "subnet": []}
elif not vlan and network:
# If vlan is not present then assign old vlan to vlan as vlan
# value is spread over several rows
vlan = old_vlan
else:
row += 1
continue
network_data[vlan_data[vlan]]["subnet"].append(network)
old_vlan = vlan
row += 1
for entry in self.loaded_data['private_net']:
vlan = re.sub(r'\W+', '', entry['vlan']).lower()
network = entry['ip']
net_type = vlan_data[vlan]
if net_type not in network_data:
network_data[net_type] = {"vlan": vlan, "subnet": []}
network_data[net_type]["subnet"].append(network)
for network in network_data:
network_data[network]["is_common"] = True
"""
if len(network_data[network]['subnet']) > 1:
network_data[network]['is_common'] = False
else:
network_data[network]['is_common'] = True
LOG.debug(
"private network data extracted from\
excel:\n%s", pprint.pformat(network_data))
"""
return network_data
def get_public_network_data(self):
"""Read public network data from public ip data"""
network_data = {}
ws = self._get_workbook()
oam_row = self.excel_specs["specs"][self.spec]["oam_ip_row"]
oam_col = self.excel_specs["specs"][self.spec]["oam_ip_col"]
oam_vlan_col = self.excel_specs["specs"][self.spec]["oam_vlan_col"]
ingress_row = self.excel_specs["specs"][self.spec]["ingress_ip_row"]
oob_row = self.excel_specs["specs"][self.spec]["oob_net_row"]
col = self.excel_specs["specs"][self.spec]["oob_net_start_col"]
end_col = self.excel_specs["specs"][self.spec]["oob_net_end_col"]
oam_net = self.loaded_data['public']['oam']['ip']
if type(oam_net) is str:
oam_net = [oam_net]
network_data = {
"oam": {
"subnet": [ws.cell(row=oam_row, column=oam_col).value],
"vlan": ws.cell(row=oam_row, column=oam_vlan_col).value,
'subnet': oam_net,
'vlan': re.sub(
r'\W+', '', self.loaded_data['public']['oam']['vlan'])
},
"ingress": ws.cell(row=ingress_row, column=oam_col).value,
"ingress": self.loaded_data['public']['ingress']['ip'],
"oob": {
"subnet": []
}
}
while col <= end_col:
cell_value = ws.cell(row=oob_row, column=col).value
if cell_value:
network_data["oob"]["subnet"].append(self.sanitize(cell_value))
col += 1
for entry in self.loaded_data['public']['oob']:
oob_net = entry['ip']
if oob_net:
network_data["oob"]["subnet"].append(self.sanitize(oob_net))
LOG.debug(
"public network data extracted from\
excel:\n%s",
@ -240,47 +248,22 @@ class ExcelParser(object):
"""Read location, dns, ntp and ldap data"""
site_info = {}
provided_sheetname = self.excel_specs["specs"][
self.spec]["ipmi_sheet_name"]
ws = self._get_workbook()
dns_row = self.excel_specs["specs"][self.spec]["dns_row"]
dns_col = self.excel_specs["specs"][self.spec]["dns_col"]
ntp_row = self.excel_specs["specs"][self.spec]["ntp_row"]
ntp_col = self.excel_specs["specs"][self.spec]["ntp_col"]
domain_row = self.excel_specs["specs"][self.spec]["domain_row"]
domain_col = self.excel_specs["specs"][self.spec]["domain_col"]
login_domain_row = self.excel_specs["specs"][
self.spec]["login_domain_row"]
ldap_col = self.excel_specs["specs"][self.spec]["ldap_col"]
global_group = self.excel_specs["specs"][self.spec]["global_group"]
ldap_search_url_row = self.excel_specs["specs"][
self.spec]["ldap_search_url_row"]
dns_servers = ws.cell(row=dns_row, column=dns_col).value
ntp_servers = ws.cell(row=ntp_row, column=ntp_col).value
try:
if dns_servers is None:
raise RuntimeError(
(
"No value for dns_server from:{} Sheet:'{}' ",
"Row:{} Col:{}",
).format(
self.file_name, provided_sheetname, dns_row, dns_col))
except RuntimeError as rerror:
LOG.critical(rerror)
sys.exit("Tugboat exited!!")
dns_servers = self.loaded_data['site_info']['dns']
ntp_servers = self.loaded_data['site_info']['ntp']
if dns_servers is None:
raise exceptions.MissingData(
missing_data='dns servers', section='site_info')
dns_servers = list(filter(None, re.split(" |,|\n", dns_servers)))
ntp_servers = list(filter(None, re.split(" |,|\n", ntp_servers)))
site_info = {
"location": self.get_location_data(),
"dns": dns_servers,
"ntp": ntp_servers,
"domain": ws.cell(row=domain_row, column=domain_col).value,
"domain": self.loaded_data['site_info']['domain'],
"ldap": {
"subdomain": ws.cell(row=login_domain_row,
column=ldap_col).value,
"common_name": ws.cell(row=global_group,
column=ldap_col).value,
"url": ws.cell(row=ldap_search_url_row, column=ldap_col).value,
"subdomain": self.loaded_data['site_info']['subdomain'],
"common_name": self.loaded_data['site_info']['global_group'],
"url": self.loaded_data['site_info']['ldap'],
},
}
LOG.debug(
@ -292,59 +275,37 @@ class ExcelParser(object):
def get_location_data(self):
"""Read location data from the site and zone sheet"""
ws = self._get_workbook()
corridor_row = self.excel_specs["specs"][self.spec]["corridor_row"]
column = self.excel_specs["specs"][self.spec]["column"]
site_name_row = self.excel_specs["specs"][self.spec]["site_name_row"]
state_name_row = self.excel_specs["specs"][self.spec]["state_name_row"]
country_name_row = self.excel_specs["specs"][
self.spec]["country_name_row"]
clli_name_row = self.excel_specs["specs"][self.spec]["clli_name_row"]
return {
"corridor": ws.cell(row=corridor_row, column=column).value,
"name": ws.cell(row=site_name_row, column=column).value,
"state": ws.cell(row=state_name_row, column=column).value,
"country": ws.cell(row=country_name_row, column=column).value,
"physical_location": ws.cell(row=clli_name_row,
column=column).value,
"corridor": self.loaded_data['location']['corridor'],
"name": self.loaded_data['location']['sitename'],
"state": self.loaded_data['location']['state'],
"country": self.loaded_data['location']['country'],
"physical_location": self.loaded_data['location']['clli'],
}
def validate_sheet_names_with_spec(self):
"""Checks is sheet name in spec file matches with excel file"""
spec = list(self.excel_specs["specs"].keys())[0]
spec_item = self.excel_specs["specs"][spec]
sheet_name_list = []
ipmi_header_sheet_name = spec_item["ipmi_sheet_name"]
sheet_name_list.append(ipmi_header_sheet_name)
private_ip_sheet_name = spec_item["private_ip_sheet"]
sheet_name_list.append(private_ip_sheet_name)
public_ip_sheet_name = spec_item["public_ip_sheet"]
sheet_name_list.append(public_ip_sheet_name)
dns_ntp_ldap_sheet_name = spec_item["dns_ntp_ldap_sheet"]
sheet_name_list.append(dns_ntp_ldap_sheet_name)
location_sheet_name = spec_item["location_sheet"]
sheet_name_list.append(location_sheet_name)
for sheetname in sheet_name_list:
for key, data in self.loaded_spec.items():
sheet_name_list.append(data['sheet_name'])
for sheet_name in sheet_name_list:
workbook_object, extracted_sheetname = (
self.get_xl_obj_and_sheetname(sheetname))
self.get_xl_obj_and_sheetname(sheet_name))
if workbook_object is not None:
wb = workbook_object
sheetname = extracted_sheetname
sheet_name = extracted_sheetname
else:
wb = self.wb_combined
if sheetname not in wb.sheetnames:
raise RuntimeError(
"SheetName '{}' not found ".format(sheetname))
if sheet_name not in wb.sheetnames:
raise exceptions.ExcelSheetNotFound(sheet_name=sheet_name)
LOG.info("Sheet names in excel spec validated")
def get_data(self):
"""Create a dict with combined data"""
self.validate_sheet_names_with_spec()
ipmi_data = self.get_ipmi_data()
network_data = self.get_private_network_data()
public_network_data = self.get_public_network_data()

View File

@ -25,3 +25,19 @@ class ExcelFileNotSpecified(SpyglassBaseException):
class ExcelSpecNotSpecified(SpyglassBaseException):
message = 'Engineering excel spec not specified'
class InvalidSpec(SpyglassBaseException):
message = (
'Series type dataset is missing iter index type. '
'Possible index types are "row" or "col".')
class MissingData(SpyglassBaseException):
message = 'No %(missing_data) specified for %(section)'
class ExcelSheetNotFound(SpyglassBaseException):
message = (
'Sheet name %(sheet_name) could not be resolved in the given '
'Excel files.')

View File

@ -15,6 +15,153 @@
import pytest
@pytest.fixture(scope='class')
def raw_excel_data(request):
request.cls.raw_excel_data = {
'ipmi': [
{
'hostname': 'cab2r72c12',
'ipmi_address': '10.0.220.138',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r72c13',
'ipmi_address': '10.0.220.139',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r72c14',
'ipmi_address': '10.0.220.140',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r72c15',
'ipmi_address': '10.0.220.141',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r72c16',
'ipmi_address': '10.0.220.142',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'cp-r720'
}, {
'hostname': 'cab2r72c17',
'ipmi_address': '10.0.220.143',
'ipmi_gateway': '10.0.220.129',
'host_profile': 'cp-r720'
}, {
'hostname': 'cab2r73c12',
'ipmi_address': '10.0.220.170',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r73c13',
'ipmi_address': '10.0.220.171',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r73c14',
'ipmi_address': '10.0.220.172',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r73c15',
'ipmi_address': '10.0.220.173',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'dp-r720'
}, {
'hostname': 'cab2r73c16',
'ipmi_address': '10.0.220.174',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'cp-r720'
}, {
'hostname': 'cab2r73c17',
'ipmi_address': '10.0.220.175',
'ipmi_gateway': '10.0.220.161',
'host_profile': 'cp-r720'
}
],
'private_vlan': [
{
'net_type': 'iSCSI/Storage',
'vlan': 'vlan23'
}, {
'net_type': 'PXE',
'vlan': 'vlan21'
}, {
'net_type': 'Calico BGP peering addresses',
'vlan': 'vlan22'
}, {
'net_type': 'Overlay',
'vlan': 'vlan24'
}, {
'net_type': 'CNI Pod addresses',
'vlan': 'n/a'
}
],
'private_net': [
{
'vlan': 'vlan23',
'ip': '30.31.1.0/25'
}, {
'vlan': 'vlan21',
'ip': '30.30.4.0/25'
}, {
'vlan': 'vlan21',
'ip': '30.30.4.128/25'
}, {
'vlan': 'vlan21',
'ip': '30.30.5.0/25'
}, {
'vlan': 'vlan21',
'ip': '30.30.5.128/25'
}, {
'vlan': 'vlan22',
'ip': '30.29.1.0/25'
}, {
'vlan': 'vlan24',
'ip': '30.19.0.0/25'
}
],
'public': {
'oam': {
'vlan': 'vlan-21',
'ip': '10.0.220.0/26'
},
'ingress': {
'ip': '10.0.220.72/29'
},
'oob': [
{
'ip': '10.0.220.128/27'
}, {
'ip': '10.0.220.160/27'
}, {
'ip': '10.0.220.192/27'
}, {
'ip': '10.0.220.224/27'
}
]
},
'site_info': {
'domain': 'dmy00.example.com',
'subdomain': 'testitservices',
'global_group': 'AA-AAA-dmy00',
'ldap': 'url: ldap://ldap.example.com',
'ntp': '150.234.210.5 (ns1.example.com)',
'dns': '40.40.40.40 (ntp1.example.com),'
'\n41.41.41.41 (ntp2.example.com)'
},
'location': {
'sitename': 'SampleSiteName',
'corridor': 'Corridor 1',
'state': 'New Jersey',
'country': 'SampleCountry',
'clli': 'XXXXXX21'
}
}
@pytest.fixture(scope='class')
def site_data(request):
request.cls.site_data = {
@ -99,12 +246,12 @@ def site_data(request):
'network_data': {
'private': {
'iSCSI/Storage': {
'vlan': 'vlan 23',
'vlan': 'vlan23',
'subnet': ['30.31.1.0/25'],
'is_common': True
},
'PXE': {
'vlan': 'vlan 21',
'vlan': 'vlan21',
'subnet': [
'30.30.4.0/25', '30.30.4.128/25', '30.30.5.0/25',
'30.30.5.128/25'
@ -112,12 +259,12 @@ def site_data(request):
'is_common': True
},
'Calico BGP peering addresses': {
'vlan': 'vlan 22',
'vlan': 'vlan22',
'subnet': ['30.29.1.0/25'],
'is_common': True
},
'Overlay': {
'vlan': 'vlan 24',
'vlan': 'vlan24',
'subnet': ['30.19.0.0/25'],
'is_common': True
}
@ -125,7 +272,7 @@ def site_data(request):
'public': {
'oam': {
'subnet': ['10.0.220.0/26'],
'vlan': 'VLAN-21'
'vlan': 'vlan21'
},
'ingress': '10.0.220.72/29',
'oob': {

View File

@ -18,47 +18,87 @@
specs:
# Design Spec file name: SiteDesignSpec_v0.1.xlsx
xl_spec:
header_row: 3
ipmi_address_header: "IPMI Address"
ipmi_sheet_name: 'Site-Information'
start_row: 4
end_row: 15
hostname_col: 2
ipmi_address_col: 3
host_profile_col: 5
ipmi_gateway_col: 4
private_ip_sheet: 'Site-Information'
net_type_col: 1
vlan_col: 2
vlan_start_row: 19
vlan_end_row: 30
net_start_row: 33
net_end_row: 40
net_col: 2
net_vlan_col: 1
public_ip_sheet: 'Site-Information'
oam_vlan_col: 1
oam_ip_row: 43
oam_ip_col: 2
oob_net_row: 48
oob_net_start_col: 2
oob_net_end_col: 5
ingress_ip_row: 45
dns_ntp_ldap_sheet: 'Site-Information'
login_domain_row: 52
ldap_col: 2
global_group: 53
ldap_search_url_row: 54
ntp_row: 55
ntp_col: 2
dns_row: 56
dns_col: 2
domain_row: 51
domain_col: 2
location_sheet: 'Site-Information'
column: 2
corridor_row: 59
site_name_row: 58
state_name_row: 60
country_name_row: 61
clli_name_row: 62
ipmi:
# Series type data set that defines an iterable and the expected data
# for each index in the iterable
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 4
end: 15
data:
hostname: 2
ipmi_address: 3
ipmi_gateway: 4
host_profile: 5
private_vlan:
type: series
sheet_name: 'Site-Information'
no_sanitize: net_type
iter:
index-type: row
start: 19
end: 30
data:
net_type: 1
vlan: 2
private_net:
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 33
end: 40
data:
vlan: 1
ip: 2
public:
type: container
sheet_name: 'Site-Information'
data:
oam:
type: point
data:
vlan: [43, 1]
ip: [43, 2]
ingress:
type: point
data:
ip: [45, 2]
oob:
type: series
iter:
index-type: col
start: 2
end: 5
data:
ip: 48
site_info:
# Point type defines x, y (row, column) coordinates for where data can be found
type: point
sheet_name: 'Site-Information'
sanitize: false
data:
domain: [51, 2]
subdomain: [52, 2]
global_group: [53, 2]
ldap: [54, 2]
ntp: [55, 2]
dns: [56, 2]
location:
type: point
sheet_name: 'Site-Information'
sanitize: false
data:
sitename: [58, 2]
corridor: [59, 2]
state: [60, 2]
country: [61, 2]
clli: [62, 2]
...

View File

@ -18,47 +18,85 @@
specs:
# Design Spec file name: SiteDesignSpec_v0.1.xlsx
xl_spec:
header_row: 3
ipmi_address_header: "IPMI Address"
ipmi_sheet_name: 'Sheet-DNE'
start_row: 4
end_row: 15
hostname_col: 2
ipmi_address_col: 2
host_profile_col: 5
ipmi_gateway_col: 7
private_ip_sheet: 'Site-Information'
net_type_col: 1
vlan_col: 2
vlan_start_row: 19
vlan_end_row: 30
net_start_row: 33
net_end_row: 40
net_col: 2
net_vlan_col: 1
public_ip_sheet: 'Site-Information'
oam_vlan_col: 1
oam_ip_row: 43
oam_ip_col: 2
oob_net_row: 48
oob_net_start_col: 2
oob_net_end_col: 5
ingress_ip_row: 45
dns_ntp_ldap_sheet: 'Site-Information'
login_domain_row: 52
ldap_col: 2
global_group: 53
ldap_search_url_row: 54
ntp_row: 55
ntp_col: 2
dns_row: 56
dns_col: 2
domain_row: 51
domain_col: 2
location_sheet: 'Site-Information'
column: 2
corridor_row: 59
site_name_row: 58
state_name_row: 60
country_name_row: 61
clli_name_row: 62
ipmi:
# Series type data set that defines an iterable and the expected data
# for each index in the iterable
type: series
sheet_name: 'Sheet DNE'
iter:
index-type: row
start: 4
end: 15
data:
hostname: 2
ipmi_address: 7
ipmi_gateway: 4
host_profile: 5
private_vlan:
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 19
end: 30
data:
net_type: 1
vlan: 2
private_net:
type: series
sheet_name: 'Site-Information'
iter:
index-type: row
start: 33
end: 40
data:
net_vlan: 1
net: 2
public:
type: container
sheet_name: 'Site-Information'
data:
oam:
type: point
data:
oam_vlan: [43, 1]
oam_ip: [43, 2]
ingress:
type: point
data:
ingress_ip: [45, 2]
oob:
type: series
iter:
index-type: col
start: 2
end: 5
rows:
oob_net: 48
site_info:
# Point type defines x, y (row, column) coordinates for where data can be found
type: point
sheet_name: 'Site-Information'
data:
domain: [51, 2]
subdomain: [52, 2]
global_group: [53, 2]
ldap: [54, 2]
ntp: [55, 2]
dns: [56, 2]
location:
type: point
sheet_name: 'Site-Information'
data:
sitename: [58, 2]
corridor: [59, 2]
state: [60, 2]
country: [61, 2]
clli: [62, 2]
...

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import copy
from copy import deepcopy
import os
import unittest
from unittest import mock
@ -89,7 +89,7 @@ class TestExcelPlugin(unittest.TestCase):
def test_get_racks(self):
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_racks(region)
self.assertEqual(2, len(result))
for rack in result:
@ -104,7 +104,7 @@ class TestExcelPlugin(unittest.TestCase):
def test_get_hosts(self):
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_hosts(region)
self.assertEqual(12, len(result))
for host in result:
@ -116,7 +116,7 @@ class TestExcelPlugin(unittest.TestCase):
def test_get_hosts_using_rack(self):
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_hosts(region, 'rack73')
self.assertEqual(6, len(result))
for host in result:
@ -158,10 +158,10 @@ class TestExcelPlugin(unittest.TestCase):
'name': 'iSCSI/Storage'
}
}
network_data = self.site_data['network_data']
network_data = deepcopy(self.site_data['network_data'])
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_networks(region)
self.assertEqual(7, len(result))
for vlan_data in result:
@ -179,7 +179,7 @@ class TestExcelPlugin(unittest.TestCase):
def test_get_ips(self):
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
host_name = 'cab2r72c15'
result = obj.get_ips(region, host_name)
self.assertIsInstance(result, models.IPList)
@ -188,20 +188,20 @@ class TestExcelPlugin(unittest.TestCase):
result.oob)
def test_get_ldap_information(self):
expected_ldap_data = copy(self.site_data['site_info']['ldap'])
expected_ldap_data = deepcopy(self.site_data['site_info']['ldap'])
expected_ldap_data['domain'] = 'example'
expected_ldap_data['url'] = expected_ldap_data['url'].split(' ')[1]
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_ldap_information(region)
self.assertDictEqual(expected_ldap_data, result)
def test_get_ntp_servers(self):
expected_ntp_servers = self.site_data['site_info']['ntp'][:1]
expected_ntp_servers = deepcopy(self.site_data['site_info']['ntp'][:1])
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_ntp_servers(region)
self.assertIsInstance(result, models.ServerList)
self.assertEqual(expected_ntp_servers, result.servers)
@ -213,7 +213,7 @@ class TestExcelPlugin(unittest.TestCase):
]
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_dns_servers(region)
self.assertIsInstance(result, models.ServerList)
self.assertEqual(expected_dns_servers, result.servers)
@ -221,36 +221,38 @@ class TestExcelPlugin(unittest.TestCase):
def test_get_domain_name(self):
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_domain_name(region)
self.assertEqual(self.site_data['site_info']['domain'], result)
def test_get_location_information(self):
expected_location_data = copy(self.site_data['site_info']['location'])
expected_location_data = deepcopy(
self.site_data['site_info']['location'])
expected_location_data['corridor'] = 'c1'
expected_location_data[
'physical_location_id'] = expected_location_data.pop(
'physical_location')
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj.get_location_information(region)
self.assertDictEqual(expected_location_data, result)
def test_get_site_info(self):
expected_ntp_servers = self.site_data['site_info']['ntp'][:1]
expected_ntp_servers = deepcopy(self.site_data['site_info']['ntp'][:1])
expected_dns_servers = [
self.site_data['site_info']['dns'][0],
self.site_data['site_info']['dns'][2]
]
expected_location_data = copy(self.site_data['site_info']['location'])
expected_location_data = deepcopy(
self.site_data['site_info']['location'])
expected_location_data['corridor'] = 'c1'
expected_location_data[
'physical_location_id'] = expected_location_data.pop(
'physical_location')
expected_ldap_data = copy(self.site_data['site_info']['ldap'])
expected_ldap_data = deepcopy(self.site_data['site_info']['ldap'])
expected_ldap_data['domain'] = 'example'
expected_ldap_data['url'] = expected_ldap_data['url'].split(' ')[1]
@ -342,7 +344,7 @@ class TestExcelPlugin(unittest.TestCase):
}
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj._get_rackwise_hosts()
self.assertDictEqual(expected_data, result)
@ -350,6 +352,6 @@ class TestExcelPlugin(unittest.TestCase):
expected_data = {'r72': 'rack72', 'r73': 'rack73'}
region = 'test_region'
obj = ExcelPlugin(region)
obj.parsed_xl_data = self.site_data
obj.parsed_xl_data = deepcopy(self.site_data)
result = obj._get_rack_data()
self.assertDictEqual(expected_data, result)

View File

@ -18,10 +18,10 @@ import unittest
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
import pytest
from spyglass_plugin_xls.exceptions import NoSpecMatched
import yaml
from spyglass_plugin_xls.excel_parser import ExcelParser
from spyglass_plugin_xls.exceptions import ExcelSheetNotFound
FIXTURE_DIR = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'shared')
@ -35,16 +35,23 @@ EXCEL_FILE_PATH = os.path.join(FIXTURE_DIR, 'SiteDesignSpec_v0.1.xlsx')
SITE_CONFIG_PATH = os.path.join(FIXTURE_DIR, 'site_config.yaml')
@pytest.mark.usefixtures('raw_excel_data')
@pytest.mark.usefixtures('site_data')
class TestExcelParser(unittest.TestCase):
"""Tests for ExcelParser"""
SPEC = 'xl_spec'
maxDiff = None
def test___init__(self):
with open(EXCEL_SPEC_PATH, 'r') as f:
loaded_spec = yaml.safe_load(f)
result = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH)
result = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
self.assertEqual(EXCEL_FILE_PATH, result.file_name)
self.assertDictEqual(loaded_spec, result.excel_specs)
self.assertDictEqual(
loaded_spec['specs'][self.SPEC], result.loaded_spec)
self.assertDictEqual(self.raw_excel_data, result.loaded_data)
self.assertIsInstance(result.wb_combined, Workbook)
self.assertEqual('xl_spec', result.spec)
@ -68,31 +75,73 @@ class TestExcelParser(unittest.TestCase):
result = obj.compare(test_string1, test_string2)
self.assertFalse(result)
@unittest.skip(
'Ian Pittwood: Not in use. Sheet validation will be redone separately.'
)
def test_validate_sheet(self):
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH)
result = obj.validate_sheet('xl_spec', 'Site-Information')
self.assertTrue(result)
@unittest.skip(
'Ian Pittwood: Not in use. Sheet validation will be redone separately.'
)
def test_validate_sheet_invalid(self):
obj = ExcelParser(EXCEL_FILE_PATH, INVALID_EXCEL_SPEC_PATH)
result = obj.validate_sheet('xl_spec', 'Site-Information')
self.assertFalse(result)
def test_find_correct_spec(self):
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH)
result = obj.find_correct_spec()
self.assertEqual('xl_spec', result)
def test_find_correct_spec_no_spec_matched(self):
obj = ExcelParser(EXCEL_FILE_PATH, INVALID_EXCEL_SPEC_PATH)
with self.assertRaises(NoSpecMatched):
obj.find_correct_spec()
def test__get_workbook(self):
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH)
result = obj._get_workbook()
result = obj._get_workbook('Site-Information')
self.assertIsInstance(result, Worksheet)
def test__check_sanitize_settings(self):
test_data_sanitize_only = {'sanitize': False}
sanitize, no_sanitize_keys = ExcelParser._check_sanitize_settings(
test_data_sanitize_only)
self.assertFalse(sanitize)
self.assertFalse(no_sanitize_keys)
test_data_no_sanitize_keys = {
'no_sanitize': ['here', 'is', 'some', 'keys']
}
sanitize, no_sanitize_keys = ExcelParser._check_sanitize_settings(
test_data_no_sanitize_keys)
self.assertTrue(sanitize)
self.assertEqual(
test_data_no_sanitize_keys['no_sanitize'], no_sanitize_keys)
def test_extract_data_points(self):
expected_data = self.raw_excel_data['public']['oam']
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
result = obj.extract_data_points(
obj.loaded_spec['public']['data']['oam'], 'Site-Information')
self.assertDictEqual(expected_data, result)
def test_extract_data_points_unsanitized(self):
expected_data = self.raw_excel_data['location']
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
result = obj.extract_data_points(obj.loaded_spec['location'])
self.assertDictEqual(expected_data, result)
def test_extract_data_series(self):
expected_data = self.raw_excel_data['ipmi']
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
result = obj.extract_data_series(obj.loaded_spec['ipmi'])
self.assertEqual(expected_data, result)
def test_extract_data_series_no_sanitize(self):
expected_data = self.raw_excel_data['private_vlan']
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
result = obj.extract_data_series(obj.loaded_spec['private_vlan'])
self.assertEqual(expected_data, result)
def test_extract_data_using_spec(self):
expected_data = self.raw_excel_data
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH, self.SPEC)
result = obj.extract_data_using_spec(obj.loaded_spec)
self.assertDictEqual(expected_data, result)
def test_get_ipmi_data(self):
expected_hosts = self.site_data['ipmi_data'][1]
expected_ipmi_data = self.site_data['ipmi_data'][0]
@ -103,14 +152,14 @@ class TestExcelParser(unittest.TestCase):
def test_get_private_vlan_data(self):
expected_vlan_data = {
'vlan 23': 'iSCSI/Storage',
'vlan 21': 'PXE',
'vlan 22': 'Calico BGP peering addresses',
'vlan 24': 'Overlay',
'n/a': 'CNI Pod addresses'
'vlan23': 'iSCSI/Storage',
'vlan21': 'PXE',
'vlan22': 'Calico BGP peering addresses',
'vlan24': 'Overlay',
'na': 'CNI Pod addresses'
}
obj = ExcelParser(EXCEL_FILE_PATH, EXCEL_SPEC_PATH)
result = obj.get_private_vlan_data(obj._get_workbook())
result = obj.get_private_vlan_data()
self.assertDictEqual(expected_vlan_data, result)
def test_get_private_network_data(self):
@ -142,9 +191,8 @@ class TestExcelParser(unittest.TestCase):
self.assertIsNone(obj.validate_sheet_names_with_spec())
def test_validate_sheet_names_with_spec_invalid(self):
obj = ExcelParser(EXCEL_FILE_PATH, INVALID_EXCEL_SPEC_PATH)
with self.assertRaises(RuntimeError):
obj.validate_sheet_names_with_spec()
with self.assertRaises(ExcelSheetNotFound):
ExcelParser(EXCEL_FILE_PATH, INVALID_EXCEL_SPEC_PATH)
def test_get_data(self):
expected_data = self.site_data