spyglass-plugin-xls/spyglass_plugin_xls/excel_parser.py
Ryan Schroder a4c09b699f Allow excel plugin to take csv files as an input
Checks if file is csv and coverts csv data to a new workbook

Change-Id: I69f53268870f9293172ac3c50ab9553db0061352
2019-12-16 17:17:49 +00:00

366 lines
13 KiB
Python

# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
import csv
import logging
import os
import pprint
import re
from openpyxl import load_workbook
from openpyxl import Workbook
import yaml
from spyglass_plugin_xls import exceptions
LOG = logging.getLogger(__name__)
class ExcelParser(object):
"""Parse data from excel into a dict"""
def __init__(
self, file_name: str, excel_specs: str, spec: str = 'xl_spec'):
"""Initializes an ExcelParser to extract data from the Excel workbook
:param file_name: path to the Excel workbook
:param excel_specs: path to the Excel workbook spec
"""
self.file_name = file_name
with open(excel_specs, "r") as f:
spec_raw_data = f.read()
self.excel_specs = yaml.safe_load(spec_raw_data)
# A combined design spec, returns a workbook object after combining
# all the inputs excel specs
combined_design_spec = self.load_excel_data(file_name)
self.wb_combined = combined_design_spec
self.spec = spec
self.loaded_spec = self.excel_specs['specs'][self.spec]
self.validate_sheet_names_with_spec()
self.loaded_data = self.extract_data_using_spec()
@staticmethod
def sanitize(string):
"""Remove extra spaces and convert string to lower case"""
return string.replace(" ", "").lower()
def compare(self, string1, string2):
"""Compare the strings"""
return bool(re.search(self.sanitize(string1), self.sanitize(string2)))
def _get_workbook(self, sheet_name, data=None):
sheet_name_to_use = sheet_name
if data and 'sheet_name' in data:
sheet_name_to_use = data['sheet_name']
workbook_object, extracted_sheetname = self.get_xl_obj_and_sheetname(
sheet_name_to_use)
if workbook_object is not None:
return workbook_object[extracted_sheetname]
else:
return self.wb_combined[sheet_name_to_use]
@staticmethod
def _check_sanitize_settings(data):
no_sanitize_keys = []
sanitize_default = True
if 'sanitize' in data and not data['sanitize']:
sanitize_default = False
if 'no_sanitize' in data:
no_sanitize_keys = data['no_sanitize']
return sanitize_default, no_sanitize_keys
def extract_data_points(self, data, sheet_name=None):
extracted_data = {}
ws = self._get_workbook(sheet_name, data)
sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
data)
for key, coordinate in data['data'].items():
value = ws.cell(row=coordinate[0], column=coordinate[1]).value
if not sanitize_default or key in no_sanitize_keys:
extracted_data[key] = value
else:
extracted_data[key] = self.sanitize(value)
return extracted_data
def extract_data_series(self, data, sheet_name=None):
extracted_data = []
ws = self._get_workbook(sheet_name, data)
sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
data)
for x in range(data['iter']['start'], data['iter']['end'] + 1):
data_dict = {}
for key, y in data['data'].items():
if data['iter']['index-type'] == 'row':
value = ws.cell(row=x, column=y).value
elif data['iter']['index-type'] == 'col':
value = ws.cell(row=y, column=x).value
else:
raise exceptions.InvalidSpec()
if value:
if not sanitize_default or key in no_sanitize_keys:
data_dict[key] = value
else:
data_dict[key] = self.sanitize(value)
if data_dict:
extracted_data.append(data_dict)
return extracted_data
def extract_data_using_spec(self, spec_dict=None, sheet_name=None):
if not spec_dict:
spec_dict = self.loaded_spec
extracted_data = {}
for name, data in spec_dict.items():
data_type = self.sanitize(data['type'])
if data_type == 'series':
extracted_data[name] = self.extract_data_series(
data, sheet_name)
elif data_type == 'point':
extracted_data[name] = self.extract_data_points(
data, sheet_name)
elif data_type == 'container':
sheet = None
if 'sheet_name' in data:
sheet = data['sheet_name']
extracted_data[name] = self.extract_data_using_spec(
data['data'], sheet or sheet_name)
return extracted_data
def get_ipmi_data(self):
"""Read IPMI data from the sheet"""
ipmi_data = {}
hosts = []
previous_server_gateway = None
for entry in self.loaded_data['ipmi']:
hostname = entry['hostname']
hosts.append(hostname)
ipmi_data[hostname] = deepcopy(entry)
ipmi_data[hostname].pop('hostname')
if "/" in ipmi_data[hostname]['ipmi_address']:
ipmi_data[hostname]['ipmi_address'] = ipmi_data[hostname][
'ipmi_address'].split("/")[0]
if ipmi_data[hostname]['ipmi_gateway']:
previous_server_gateway = ipmi_data[hostname]['ipmi_gateway']
else:
ipmi_data[hostname]['ipmi_gateway'] = previous_server_gateway
if not ipmi_data[hostname]['host_profile']:
raise exceptions.MissingData(
missing_data='host_profile', section='host %s' % hostname)
LOG.debug(
"ipmi data extracted from excel:\n{}".format(
pprint.pformat(ipmi_data)))
LOG.debug(
"host data extracted from excel:\n{}".format(
pprint.pformat(hosts)))
return [ipmi_data, hosts]
def get_private_vlan_data(self):
"""Get private vlan data from private IP sheet"""
vlan_data = {}
for entry in self.loaded_data['private_vlan']:
net_type = entry['net_type']
if net_type:
vlan = re.sub(r'\W+', '', entry['vlan']).lower()
vlan_data[vlan] = net_type
LOG.debug(
"vlan data extracted from excel:\n%s", pprint.pformat(vlan_data))
return vlan_data
def get_private_network_data(self):
"""Read network data from the private ip sheet"""
vlan_data = self.get_private_vlan_data()
network_data = {}
for entry in self.loaded_data['private_net']:
vlan = re.sub(r'\W+', '', entry['vlan']).lower()
network = entry['ip']
net_type = vlan_data[vlan]
if net_type not in network_data:
network_data[net_type] = {"vlan": vlan, "subnet": []}
network_data[net_type]["subnet"].append(network)
for network in network_data:
network_data[network]["is_common"] = True
return network_data
def get_public_network_data(self):
"""Read public network data from public ip data"""
oam_net = self.loaded_data['public']['oam']['ip']
if type(oam_net) is str:
oam_net = [oam_net]
network_data = {
"oam": {
'subnet': oam_net,
'vlan': re.sub(
r'\W+', '', self.loaded_data['public']['oam']['vlan'])
},
"ingress": self.loaded_data['public']['ingress']['ip'],
"oob": {
"subnet": []
}
}
for entry in self.loaded_data['public']['oob']:
oob_net = entry['ip']
if oob_net:
network_data["oob"]["subnet"].append(self.sanitize(oob_net))
LOG.debug(
"public network data extracted from\
excel:\n%s",
pprint.pformat(network_data),
)
return network_data
def get_site_info(self):
"""Read location, dns, ntp and ldap data"""
site_info = {}
dns_servers = self.loaded_data['site_info']['dns']
ntp_servers = self.loaded_data['site_info']['ntp']
if dns_servers is None:
raise exceptions.MissingData(
missing_data='dns servers', section='site_info')
dns_servers = list(filter(None, re.split(" |,|\n", dns_servers)))
ntp_servers = list(filter(None, re.split(" |,|\n", ntp_servers)))
site_info = {
"location": self.get_location_data(),
"dns": dns_servers,
"ntp": ntp_servers,
"domain": self.loaded_data['site_info']['domain'],
"ldap": {
"subdomain": self.loaded_data['site_info']['subdomain'],
"common_name": self.loaded_data['site_info']['global_group'],
"url": self.loaded_data['site_info']['ldap'],
},
}
LOG.debug(
"Site Info extracted from\
excel:\n%s",
pprint.pformat(site_info),
)
return site_info
def get_location_data(self):
"""Read location data from the site and zone sheet"""
return {
"corridor": self.loaded_data['location']['corridor'],
"name": self.loaded_data['location']['sitename'],
"state": self.loaded_data['location']['state'],
"country": self.loaded_data['location']['country'],
"physical_location": self.loaded_data['location']['clli'],
}
def validate_sheet_names_with_spec(self):
"""Checks is sheet name in spec file matches with excel file"""
sheet_name_list = []
for key, data in self.loaded_spec.items():
sheet_name_list.append(data['sheet_name'])
for sheet_name in sheet_name_list:
workbook_object, extracted_sheetname = (
self.get_xl_obj_and_sheetname(sheet_name))
if workbook_object is not None:
wb = workbook_object
sheet_name = extracted_sheetname
else:
wb = self.wb_combined
if sheet_name not in wb.sheetnames:
raise exceptions.ExcelSheetNotFound(sheet_name=sheet_name)
LOG.info("Sheet names in excel spec validated")
def get_data(self):
"""Create a dict with combined data"""
ipmi_data = self.get_ipmi_data()
network_data = self.get_private_network_data()
public_network_data = self.get_public_network_data()
site_info_data = self.get_site_info()
data = {
"ipmi_data": ipmi_data,
"network_data": {
"private": network_data,
"public": public_network_data,
},
"site_info": site_info_data,
}
LOG.debug(
"Location data extracted from\
excel:\n%s",
pprint.pformat(data),
)
return data
@staticmethod
def load_excel_data(filename):
"""Combines multiple excel or csv files to a single design spec"""
design_spec = Workbook()
if os.path.splitext(filename)[1] == '.csv':
loaded_workbook = Workbook()
ws = loaded_workbook.active
with open(filename) as f:
reader = csv.reader(f, delimiter=',')
for row in reader:
ws.append(row)
ws.title = os.path.splitext(os.path.basename(filename))[0]
else:
loaded_workbook = load_workbook(filename, data_only=True)
for names in loaded_workbook.sheetnames:
design_spec_worksheet = design_spec.create_sheet(names)
loaded_workbook_ws = loaded_workbook[names]
for row in loaded_workbook_ws:
for cell in row:
design_spec_worksheet[cell.coordinate].value = cell.value
return design_spec
@staticmethod
def get_xl_obj_and_sheetname(sheetname):
"""The logic confirms if the sheetname is specified for example as:
'MTN57a_AEC_Network_Design_v1.6.xlsx:Public IPs'
"""
file_type = os.path.splitext(sheetname.split(':')[0])[1]
if file_type == '.xlsx' or file_type == '.xls':
# Extract file name
source_xl_file = sheetname.split(":")[0]
wb = load_workbook(source_xl_file, data_only=True)
return [wb, sheetname.split(":")[1]]
elif file_type == ".csv":
source_csv_file = sheetname.split(":")[0]
wb = Workbook()
ws = wb.active
with open(source_csv_file) as f:
reader = csv.reader(f, delimiter=',')
for row in reader:
ws.append(row)
ws.title = os.path.splitext(os.path.basename(source_csv_file))[0]
return [wb, sheetname.split(":")[1]]
else:
return [None, sheetname]