a4c09b699f
Checks if file is csv and coverts csv data to a new workbook Change-Id: I69f53268870f9293172ac3c50ab9553db0061352
366 lines
13 KiB
Python
366 lines
13 KiB
Python
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from copy import deepcopy
|
|
import csv
|
|
import logging
|
|
import os
|
|
import pprint
|
|
import re
|
|
|
|
from openpyxl import load_workbook
|
|
from openpyxl import Workbook
|
|
import yaml
|
|
|
|
from spyglass_plugin_xls import exceptions
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class ExcelParser(object):
    """Parse data from excel into a dict"""

    def __init__(
            self, file_name: str, excel_specs: str, spec: str = 'xl_spec'):
        """Initializes an ExcelParser to extract data from the Excel workbook

        :param file_name: path to the Excel workbook
        :param excel_specs: path to the Excel workbook spec
        :param spec: name of the spec entry to use from the spec file
        """
        self.file_name = file_name

        # Load the YAML spec that describes where data lives in the workbook.
        with open(excel_specs, "r") as spec_file:
            self.excel_specs = yaml.safe_load(spec_file.read())

        # A combined design spec: a single workbook object built by merging
        # all of the input excel/csv sources.
        self.wb_combined = self.load_excel_data(file_name)
        self.spec = spec

        self.loaded_spec = self.excel_specs['specs'][self.spec]
        self.validate_sheet_names_with_spec()
        self.loaded_data = self.extract_data_using_spec()
|
@staticmethod
|
|
def sanitize(string):
|
|
"""Remove extra spaces and convert string to lower case"""
|
|
|
|
return string.replace(" ", "").lower()
|
|
|
|
def compare(self, string1, string2):
    """Return True when sanitized string1 is found within sanitized string2.

    NOTE(review): string1 is used as a regular-expression pattern after
    sanitizing, so any regex metacharacters in it are interpreted rather
    than matched literally — confirm this is intended.
    """
    needle = self.sanitize(string1)
    haystack = self.sanitize(string2)
    return re.search(needle, haystack) is not None
|
def _get_workbook(self, sheet_name, data=None):
|
|
sheet_name_to_use = sheet_name
|
|
if data and 'sheet_name' in data:
|
|
sheet_name_to_use = data['sheet_name']
|
|
|
|
workbook_object, extracted_sheetname = self.get_xl_obj_and_sheetname(
|
|
sheet_name_to_use)
|
|
if workbook_object is not None:
|
|
return workbook_object[extracted_sheetname]
|
|
else:
|
|
return self.wb_combined[sheet_name_to_use]
|
|
|
|
@staticmethod
|
|
def _check_sanitize_settings(data):
|
|
no_sanitize_keys = []
|
|
sanitize_default = True
|
|
if 'sanitize' in data and not data['sanitize']:
|
|
sanitize_default = False
|
|
if 'no_sanitize' in data:
|
|
no_sanitize_keys = data['no_sanitize']
|
|
return sanitize_default, no_sanitize_keys
|
|
|
|
def extract_data_points(self, data, sheet_name=None):
    """Extract single-cell values described by ``data['data']``.

    :param data: spec fragment with a 'data' mapping of key -> [row, col]
                 coordinates, plus optional 'sheet_name', 'sanitize' and
                 'no_sanitize' settings
    :param sheet_name: fallback sheet name when the spec gives none
    :returns: dict mapping each spec key to its (possibly sanitized) value
    """
    extracted_data = {}
    ws = self._get_workbook(sheet_name, data)

    sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
        data)

    for key, coordinate in data['data'].items():
        value = ws.cell(row=coordinate[0], column=coordinate[1]).value
        if not sanitize_default or key in no_sanitize_keys:
            extracted_data[key] = value
        elif isinstance(value, str):
            extracted_data[key] = self.sanitize(value)
        else:
            # Empty cells yield None and numeric cells are not strings;
            # sanitize() would raise AttributeError on them (the series
            # extractor guards with `if value:`; this path did not), so
            # pass non-string values through unchanged.
            extracted_data[key] = value
    return extracted_data
|
def extract_data_series(self, data, sheet_name=None):
    """Extract a list of row/column records described by the spec.

    :param data: spec fragment with a 'data' mapping of key -> offset and
                 an 'iter' section (start, end, index-type of row|col)
    :param sheet_name: fallback sheet name when the spec gives none
    :returns: list of dicts, one per non-empty row/column in the range
    :raises exceptions.InvalidSpec: when 'index-type' is not row or col
    """
    extracted_data = []
    ws = self._get_workbook(sheet_name, data)

    sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
        data)

    iter_spec = data['iter']
    for position in range(iter_spec['start'], iter_spec['end'] + 1):
        record = {}
        for key, offset in data['data'].items():
            # 'row' walks down rows with fixed columns; 'col' walks
            # across columns with fixed rows.
            if iter_spec['index-type'] == 'row':
                cell_value = ws.cell(row=position, column=offset).value
            elif iter_spec['index-type'] == 'col':
                cell_value = ws.cell(row=offset, column=position).value
            else:
                raise exceptions.InvalidSpec()

            if not cell_value:
                continue
            if not sanitize_default or key in no_sanitize_keys:
                record[key] = cell_value
            else:
                record[key] = self.sanitize(cell_value)
        # Rows that produced no values at all are dropped entirely.
        if record:
            extracted_data.append(record)
    return extracted_data
|
def extract_data_using_spec(self, spec_dict=None, sheet_name=None):
    """Walk a spec dict and extract every series, point and container.

    :param spec_dict: spec to process; defaults to the loaded top-level spec
    :param sheet_name: sheet to read from when an entry names none
    :returns: dict keyed like the spec, holding the extracted data
    """
    spec_dict = spec_dict or self.loaded_spec

    extracted_data = {}
    for name, entry in spec_dict.items():
        entry_type = self.sanitize(entry['type'])
        if entry_type == 'series':
            extracted_data[name] = self.extract_data_series(
                entry, sheet_name)
        elif entry_type == 'point':
            extracted_data[name] = self.extract_data_points(
                entry, sheet_name)
        elif entry_type == 'container':
            # Containers nest another spec and may pin their own sheet,
            # which then applies to every nested entry.
            nested_sheet = entry.get('sheet_name') or sheet_name
            extracted_data[name] = self.extract_data_using_spec(
                entry['data'], nested_sheet)
    return extracted_data
|
def get_ipmi_data(self):
    """Read IPMI data from the sheet

    :returns: two-item list of (per-host ipmi dict, list of hostnames)
    :raises exceptions.MissingData: when a host has no host_profile
    """
    ipmi_data = {}
    hosts = []
    # Gateways may be filled in only on the first host of a group; carry
    # the most recent non-empty gateway forward for blank entries.
    previous_server_gateway = None
    for entry in self.loaded_data['ipmi']:
        hostname = entry['hostname']
        hosts.append(hostname)

        host_info = deepcopy(entry)
        host_info.pop('hostname')
        ipmi_data[hostname] = host_info

        # Addresses may carry a CIDR suffix ("a.b.c.d/26"); keep the IP.
        if "/" in host_info['ipmi_address']:
            host_info['ipmi_address'] = (
                host_info['ipmi_address'].split("/")[0])

        if host_info['ipmi_gateway']:
            previous_server_gateway = host_info['ipmi_gateway']
        else:
            host_info['ipmi_gateway'] = previous_server_gateway

        if not host_info['host_profile']:
            raise exceptions.MissingData(
                missing_data='host_profile', section='host %s' % hostname)
    LOG.debug(
        "ipmi data extracted from excel:\n{}".format(
            pprint.pformat(ipmi_data)))
    LOG.debug(
        "host data extracted from excel:\n{}".format(
            pprint.pformat(hosts)))
    return [ipmi_data, hosts]
|
def get_private_vlan_data(self):
    """Get private vlan data from private IP sheet

    :returns: dict mapping normalized vlan name -> network type
    """
    vlan_data = {}
    for entry in self.loaded_data['private_vlan']:
        net_type = entry['net_type']
        if not net_type:
            continue
        # Normalize e.g. "VLAN 21" -> "vlan21" so it is usable as a key.
        vlan = re.sub(r'\W+', '', entry['vlan']).lower()
        vlan_data[vlan] = net_type
    LOG.debug(
        "vlan data extracted from excel:\n%s", pprint.pformat(vlan_data))
    return vlan_data
|
def get_private_network_data(self):
    """Read network data from the private ip sheet

    :returns: dict mapping network type -> {'vlan', 'subnet', 'is_common'}
    """
    vlan_data = self.get_private_vlan_data()
    network_data = {}
    for entry in self.loaded_data['private_net']:
        vlan = re.sub(r'\W+', '', entry['vlan']).lower()
        net_type = vlan_data[vlan]
        # Group subnets by network type; the first entry fixes the vlan.
        network_data.setdefault(net_type, {"vlan": vlan, "subnet": []})
        network_data[net_type]["subnet"].append(entry['ip'])
    # Every private network is flagged as common across the site.
    for net_info in network_data.values():
        net_info["is_common"] = True
    return network_data
|
def get_public_network_data(self):
    """Read public network data from public ip data

    :returns: dict with 'oam', 'ingress' and 'oob' network information
    """
    oam_net = self.loaded_data['public']['oam']['ip']
    # A single subnet may be given as a bare string; normalize to a list.
    # (isinstance replaces the non-idiomatic `type(x) is str` check.)
    if isinstance(oam_net, str):
        oam_net = [oam_net]

    network_data = {
        "oam": {
            'subnet': oam_net,
            'vlan': re.sub(
                r'\W+', '', self.loaded_data['public']['oam']['vlan'])
        },
        "ingress": self.loaded_data['public']['ingress']['ip'],
        "oob": {
            "subnet": []
        }
    }

    for entry in self.loaded_data['public']['oob']:
        oob_net = entry['ip']
        if oob_net:
            network_data["oob"]["subnet"].append(self.sanitize(oob_net))
    # Single-line message: the original used a backslash line continuation
    # inside the string literal, embedding a run of spaces in the output.
    LOG.debug(
        "public network data extracted from excel:\n%s",
        pprint.pformat(network_data))
    return network_data
|
def get_site_info(self):
    """Read location, dns, ntp and ldap data

    :returns: dict with location, dns, ntp, domain and ldap site data
    :raises exceptions.MissingData: when dns or ntp servers are absent
    """
    dns_servers = self.loaded_data['site_info']['dns']
    ntp_servers = self.loaded_data['site_info']['ntp']
    if dns_servers is None:
        raise exceptions.MissingData(
            missing_data='dns servers', section='site_info')
    if ntp_servers is None:
        # Previously a missing ntp entry fell through to an opaque
        # TypeError inside re.split; raise the same explicit error
        # used for missing dns servers instead.
        raise exceptions.MissingData(
            missing_data='ntp servers', section='site_info')
    # Server lists may be separated by spaces, commas or newlines.
    dns_servers = list(filter(None, re.split(" |,|\n", dns_servers)))
    ntp_servers = list(filter(None, re.split(" |,|\n", ntp_servers)))
    site_info = {
        "location": self.get_location_data(),
        "dns": dns_servers,
        "ntp": ntp_servers,
        "domain": self.loaded_data['site_info']['domain'],
        "ldap": {
            "subdomain": self.loaded_data['site_info']['subdomain'],
            "common_name": self.loaded_data['site_info']['global_group'],
            "url": self.loaded_data['site_info']['ldap'],
        },
    }
    # Single-line message: the original used a backslash line continuation
    # inside the string literal, embedding a run of spaces in the output.
    LOG.debug(
        "Site Info extracted from excel:\n%s", pprint.pformat(site_info))
    return site_info
|
def get_location_data(self):
    """Read location data from the site and zone sheet

    :returns: dict of corridor, name, state, country, physical_location
    """
    location = self.loaded_data['location']
    return {
        "corridor": location['corridor'],
        "name": location['sitename'],
        "state": location['state'],
        "country": location['country'],
        # CLLI: the telecom location identifier for the site.
        "physical_location": location['clli'],
    }
|
def validate_sheet_names_with_spec(self):
    """Checks is sheet name in spec file matches with excel file

    :raises exceptions.ExcelSheetNotFound: when a spec sheet is absent
    """
    spec_sheet_names = [
        data['sheet_name'] for data in self.loaded_spec.values()
    ]
    for spec_sheet in spec_sheet_names:
        workbook_object, extracted_sheetname = (
            self.get_xl_obj_and_sheetname(spec_sheet))
        if workbook_object is not None:
            # Sheet lives in an explicitly referenced external workbook.
            workbook = workbook_object
            candidate = extracted_sheetname
        else:
            workbook = self.wb_combined
            candidate = spec_sheet

        if candidate not in workbook.sheetnames:
            raise exceptions.ExcelSheetNotFound(sheet_name=candidate)

    LOG.info("Sheet names in excel spec validated")
|
def get_data(self):
    """Create a dict with combined data

    :returns: dict with ipmi_data, network_data and site_info keys
    """
    ipmi_data = self.get_ipmi_data()
    network_data = self.get_private_network_data()
    public_network_data = self.get_public_network_data()
    site_info_data = self.get_site_info()
    data = {
        "ipmi_data": ipmi_data,
        "network_data": {
            "private": network_data,
            "public": public_network_data,
        },
        "site_info": site_info_data,
    }
    # Fixed log message: the original said "Location data" although the
    # full combined dict is logged, and a backslash continuation inside
    # the string literal embedded a run of spaces in the output.
    LOG.debug(
        "Combined data extracted from excel:\n%s", pprint.pformat(data))
    return data
|
@staticmethod
def load_excel_data(filename):
    """Combines multiple excel or csv files to a single design spec

    :param filename: path to an .xlsx/.xls workbook or a .csv file
    :returns: a Workbook holding a copy of every source sheet
    """
    if os.path.splitext(filename)[1] == '.csv':
        # Wrap the csv rows in a single-sheet workbook named after
        # the file (without its extension).
        loaded_workbook = Workbook()
        csv_sheet = loaded_workbook.active
        with open(filename) as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                csv_sheet.append(row)
        csv_sheet.title = os.path.splitext(os.path.basename(filename))[0]
    else:
        loaded_workbook = load_workbook(filename, data_only=True)

    design_spec = Workbook()
    for source_name in loaded_workbook.sheetnames:
        target_sheet = design_spec.create_sheet(source_name)
        # Cell-by-cell copy keeps values only (formulas were already
        # resolved by data_only=True on load).
        for row in loaded_workbook[source_name]:
            for cell in row:
                target_sheet[cell.coordinate].value = cell.value
    return design_spec
|
@staticmethod
def get_xl_obj_and_sheetname(sheetname):
    """The logic confirms if the sheetname is specified for example as:

    'MTN57a_AEC_Network_Design_v1.6.xlsx:Public IPs'
    """
    parts = sheetname.split(':')
    file_type = os.path.splitext(parts[0])[1]
    if file_type in ('.xlsx', '.xls'):
        # Sheet lives in a separate excel workbook.
        wb = load_workbook(parts[0], data_only=True)
        return [wb, parts[1]]
    elif file_type == ".csv":
        # Load the csv into a fresh single-sheet workbook named after
        # the file (without its extension).
        wb = Workbook()
        ws = wb.active
        with open(parts[0]) as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                ws.append(row)
        ws.title = os.path.splitext(os.path.basename(parts[0]))[0]
        return [wb, parts[1]]
    else:
        # No workbook reference embedded; caller falls back to the
        # combined workbook with the name unchanged.
        return [None, sheetname]
|