[Tempest]: Adding barbican cases

Check HTTPS traffic with brabican secrets and containers
Check cert and key updated on edge node and on MP
Check end to end connecitivity using barbican

Change-Id: I79b1ebe8bf162ff00a8caa50224c73a25b858c4f
This commit is contained in:
Shubhamk Kadam 2018-11-06 13:14:07 +00:00
parent 21d0729bbb
commit 55c7bfbb71
9 changed files with 1072 additions and 99 deletions

View File

@ -93,3 +93,12 @@ PEER_ADDRESS = "172.24.4.12"
SITE_CONNECTION_STATE = 'True'
PSK = "secret"
CIDR = "22.0.9.0/24"
#BARBICAN
SECRET_TYPE = "opaque"
ALGORITHM = "aes"
PAYLOAD_CONTENT_TYPE = "text/plain"
MODE = "cbc"
BIT_LENGTH = 256
CERT_FILE = "/root/server.crt"
KEY_FILE = "/root/server.key"
CONTAINER_TYPE = "certificate"

View File

@ -210,3 +210,12 @@ DNSGroup = [
default='',
help="DNS Nameserver IP address"),
]
barbican_group = cfg.OptGroup(name='barbican',
title="barbican Configuration Options")
BarbicanGroup = [
cfg.StrOpt('barbican_user_id',
default='',
help="barbican user id"),
]

View File

@ -86,6 +86,13 @@ class FeatureManager(traffic_manager.IperfManager,
cls.members_client = members_client.get_client(cls.os_primary)
cls.health_monitors_client = \
health_monitors_client.get_client(cls.os_primary)
cls.load_balancers_admin_client = \
load_balancers_client.get_client(cls.os_admin)
cls.listeners_admin_client = listeners_client.get_client(cls.os_admin)
cls.pools_admin_client = pools_client.get_client(cls.os_admin)
cls.members_admin_client = members_client.get_client(cls.os_admin)
cls.health_monitors_admin_client = \
health_monitors_client.get_client(cls.os_admin)
cls.fwaas_v2_client = openstack_network_clients.FwaasV2Client(
net_client.auth_provider,
net_client.service,
@ -130,6 +137,19 @@ class FeatureManager(traffic_manager.IperfManager,
net_client.region,
net_client.endpoint_type,
**_params)
net_client.service = 'key-manager'
cls.secret_client = openstack_network_clients.SecretClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
cls.container_client = openstack_network_clients.ContainerClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
#
# FwaasV2 base class
@ -590,10 +610,15 @@ class FeatureManager(traffic_manager.IperfManager,
protocol_port=protocol_port)
self.wait_for_load_balancer_status(lb_id)
def check_lbaas_project_weight_values(self, count=2):
def check_lbaas_project_weight_values(self, count=2, HTTPS=None,
member_count=None,
barbican_http=None):
vip = self.vip_ip_address
time.sleep(constants.SLEEP_BETWEEN_VIRTUAL_SEREVRS_OPEARTIONS)
self.do_http_request(vip=vip, send_counts=self.poke_counters)
if HTTPS is None:
self.do_http_request(vip=vip, send_counts=self.poke_counters)
else:
self.do_https_request(vip=vip, send_counts=self.poke_counters)
# ROUND_ROUBIN, so equal counts
if CONF.nsxv3.ens:
vms = len(self.topology_servers.keys())
@ -602,6 +627,21 @@ class FeatureManager(traffic_manager.IperfManager,
"LB fails with weighted values")
else:
pass
elif barbican_http:
no_of_vms = len(self.http_cnt)
if no_of_vms:
if (self.http_cnt['server_lbaas_2'] <
(self.poke_counters / no_of_vms)):
self.assertGreater(self.http_cnt['server_lbaas_3'],
self.poke_counters / no_of_vms)
elif (self.http_cnt['server_lbaas_3'] >
(self.poke_counters / no_of_vms)):
self.assertLess(self.http_cnt['server_lbaas_3'],
self.poke_counters / no_of_vms)
else:
self.assertEqual(self.http_cnt['server_lbaas_3'],
self.poke_counters / no_of_vms,
"LB fails with weighted values")
else:
no_of_vms = len(self.http_cnt)
if no_of_vms:
@ -610,7 +650,7 @@ class FeatureManager(traffic_manager.IperfManager,
self.assertGreater(self.http_cnt['server_lbaas_1'],
self.poke_counters / no_of_vms)
elif (self.http_cnt['server_lbaas_0'] >
(self.poke_counters / no_of_vms)):
(self.poke_counters / no_of_vms)):
self.assertLess(self.http_cnt['server_lbaas_1'],
self.poke_counters / no_of_vms)
else:
@ -618,7 +658,7 @@ class FeatureManager(traffic_manager.IperfManager,
self.poke_counters / no_of_vms,
"LB fails with weighted values")
def check_project_lbaas(self, count=2):
def check_project_lbaas(self, count=2, HTTPS=None):
i = 0
time.sleep(constants.SLEEP_BETWEEN_VIRTUAL_SEREVRS_OPEARTIONS)
vip = self.vip_ip_address
@ -655,74 +695,193 @@ class FeatureManager(traffic_manager.IperfManager,
self.pools_client.update_pool(self.pool['id'],
lb_algorithm=algo)
def create_project_lbaas(self, protocol_type, protocol_port, lb_algorithm,
hm_type, member_count=2, max_vms=None,
weight=None, fip_disassociate=None):
def create_project_lbaas(self, protocol_type,
protocol_port, lb_algorithm,
hm_type, member_count=2,
max_vms=None, weight=None,
fip_disassociate=None, barbican=False,
pool_protocol=None, pool_port=None,
vip_subnet_id=None, barbican_container=None,
lb_id=None, count=None,
clean_up=None):
count = 0
vip_subnet_id = self.topology_subnets["subnet_lbaas_1"]['id']
lb_name = data_utils.rand_name(self.namestart)
self.loadbalancer = self.load_balancers_client.create_load_balancer(
name=lb_name, vip_subnet_id=vip_subnet_id)['loadbalancer']
lb_id = self.loadbalancer['id']
self.wait_for_load_balancer_status(lb_id)
lb_name = None
if vip_subnet_id is None:
vip_subnet_id = self.topology_subnets["subnet_lbaas_1"]['id']
if lb_id is None:
lb_name = data_utils.rand_name(self.namestart)
if barbican:
self.loadbalancer = self.\
load_balancers_admin_client.\
create_load_balancer(name=lb_name,
vip_subnet_id=vip_subnet_id
)['loadbalancer']
lb_id = self.loadbalancer['id']
self.addCleanup(
self.load_balancers_admin_client.delete_load_balancer,
self.loadbalancer['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
else:
self.loadbalancer = self.\
load_balancers_client.\
create_load_balancer(name=lb_name,
vip_subnet_id=vip_subnet_id
)['loadbalancer']
lb_id = self.loadbalancer['id']
self.wait_for_load_balancer_status(lb_id)
self.listener = self.listeners_client.create_listener(
loadbalancer_id=lb_id, protocol=protocol_type,
protocol_port=protocol_port, name=lb_name)['listener']
self.wait_for_load_balancer_status(lb_id)
self.pool = self.pools_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=protocol_type,
name=lb_name)['pool']
self.wait_for_load_balancer_status(lb_id)
if barbican:
listener_name = data_utils.rand_name("tempest_lb")
self.listener = self.listeners_admin_client.\
create_listener(loadbalancer_id=lb_id, protocol=protocol_type,
protocol_port=protocol_port,
name=listener_name,
default_tls_container_ref=barbican_container
["container_ref"])['listener']
if clean_up is None:
self.addCleanup(
self.listeners_admin_client.delete_listener,
self.listener['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
else:
self.listener = self.listeners_client.create_listener(
loadbalancer_id=lb_id, protocol=protocol_type,
protocol_port=protocol_port, name=lb_name)['listener']
self.wait_for_load_balancer_status(lb_id)
if barbican:
if lb_name is not None:
self.pool = self.pools_admin_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=pool_protocol,
name=lb_name)['pool']
else:
self.pool = self.pools_admin_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=pool_protocol,
name=lb_id)['pool']
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
pool_id = self.pool['id']
if clean_up is None:
self.addCleanup(self.pools_admin_client.delete_pool, pool_id)
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
else:
self.pool = self.pools_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=protocol_type,
name=lb_name)['pool']
self.wait_for_load_balancer_status(lb_id)
pool_id = self.pool['id']
self.healthmonitor = (
self.health_monitors_client.create_health_monitor(
pool_id=pool_id, type=hm_type,
delay=self.hm_delay, max_retries=self.hm_max_retries,
timeout=self.hm_timeout))
self.wait_for_load_balancer_status(lb_id)
if barbican:
self.healthmonitor = (
self.health_monitors_admin_client.create_health_monitor(
pool_id=pool_id, type=hm_type,
delay=self.hm_delay, max_retries=self.hm_max_retries,
timeout=self.hm_timeout))['healthmonitor']
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
if clean_up is None:
self.addCleanup(
self.health_monitors_admin_client.delete_health_monitor,
self.healthmonitor['id'])
else:
self.healthmonitor = (
self.health_monitors_client.create_health_monitor(
pool_id=pool_id, type=hm_type,
delay=self.hm_delay, max_retries=self.hm_max_retries,
timeout=self.hm_timeout))
self.wait_for_load_balancer_status(lb_id)
self.members = []
for server_name in self.topology_servers.keys():
if count < member_count:
fip_data = self.servers_details[server_name].floating_ips[0]
fixed_ip_address = fip_data['fixed_ip_address']
if fip_disassociate is None:
self._disassociate_floating_ip(fip_data)
if barbican:
kwargs = dict(port_id=None)
self.cmgr_adm.floating_ips_client.\
update_floatingip(fip_data['id'],
**kwargs)['floatingip']
else:
self._disassociate_floating_ip(fip_data)
if weight:
weight += count
member = self.members_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port,
weight=weight)
if barbican:
member = self.members_admin_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=pool_port,
weight=weight)['member']
else:
member = self.members_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port,
weight=weight)
else:
member = self.members_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port)
self.wait_for_load_balancer_status(lb_id)
if barbican:
member = self.members_admin_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=pool_port)['member']
else:
member = self.members_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port)
if barbican:
self.load_balancers_admin_client.\
wait_for_load_balancer_status(lb_id)
else:
self.wait_for_load_balancer_status(lb_id)
if barbican:
if clean_up is None:
self.addCleanup(
self.members_admin_client.delete_member,
pool_id,
member['id'])
self.members.append(member)
self.server_names.append(server_name)
count += 1
else:
break
if not CONF.nsxv3.ens:
self.ports_client.update_port(
self.cmgr_adm.ports_client.update_port(
self.loadbalancer['vip_port_id'],
security_groups=[self.sg['id']])
# create lbaas public interface
vip_fip = \
self.create_floatingip(self.loadbalancer,
port_id=self.loadbalancer['vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
pools = self.pools_client.show_pool(
self.pool['id'])
return dict(lb_id=lb_id, pool=pools,
vip_port=self.loadbalancer['vip_port_id'],
vip_ip=self.vip_ip_address)
if barbican:
if not hasattr(self, 'vip_ip_address'):
self.cmgr_adm.ports_client.update_port(
self.loadbalancer['vip_port_id'],
security_groups=[
self.sg['id']])
vip_fip = self.create_floatingip(
self.loadbalancer,
client=self.cmgr_adm.floating_ips_client,
port_id=self.loadbalancer['vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
return dict(lb_id=lb_id,
vip_address=self.vip_ip_address,
pool_id=pool_id,
healthmonitor_id=self.healthmonitor['id'],
members=self.members,
listener_id=self.listener['id'])
else:
vip_fip = \
self.create_floatingip(self.loadbalancer,
port_id=self.loadbalancer['vip_port_id']
)
self.vip_ip_address = vip_fip['floating_ip_address']
pools = self.pools_client.show_pool(
self.pool['id'])
return dict(lb_id=lb_id, pool=pools,
vip_port=self.loadbalancer['vip_port_id'],
vip_ip=self.vip_ip_address)
def get_router_port(self, client):
"""List ports using admin creds """
@ -1041,3 +1200,92 @@ class FeatureManager(traffic_manager.IperfManager,
ptr_id = region + ":" + fip_id
body = self.ptr_client.show_ptr_record(ptr_id)
return body
def _get_uuid(self, href):
return href.split('/')[-1]
def create_barbican_secret(self, **kwargs):
"""
Create barbican secret
"""
result = self.secret_client.create_secret(**kwargs)
uuid = self._get_uuid(result['secret_ref'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.secret_client.delete_secret, uuid)
self.secret_client.add_acl_user_secret(
secret_id=uuid, user_id=CONF.barbican.barbican_user_id)
return result
def create_barbican_container(self, **kwargs):
"""
Create barbican secret container
"""
result = self.container_client.create_container(**kwargs)
uuid = self._get_uuid(result['container_ref'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.container_client.delete_container, uuid)
self.container_client.add_acl_user_containers(
secret_id=uuid, user_id=CONF.barbican.barbican_user_id)
return result
def create_barbican_secret_conatainer(self, cert_file, key_file):
"""
Create barbican secrets with provided
cert file and key file.
Create barbican secret container with
created secrets and return secrets and container
"""
cert_file = open(cert_file, "r")
cert_content = cert_file.read()
secret_name1 = data_utils.rand_name(name='tempest-cert-secret')
kwargs = {"secret_type": 'passphrase',
"algorithm": constants.ALGORITHM,
"payload_content_type": constants.PAYLOAD_CONTENT_TYPE,
"mode": constants.MODE,
"bit_length": constants.BIT_LENGTH,
"payload": cert_content,
"name": secret_name1}
barbican_secret1 = self.create_barbican_secret(**kwargs)
cert_file = open(key_file, "r")
cert_content = cert_file.read()
secret_name2 = data_utils.rand_name(name='tempest-key-secret')
kwargs = {"secret_type": 'passphrase',
"algorithm": constants.ALGORITHM,
"payload_content_type": constants.PAYLOAD_CONTENT_TYPE,
"mode": constants.MODE,
"bit_length": constants.BIT_LENGTH,
"payload": cert_content,
"name": secret_name2}
barbican_secret2 = self.create_barbican_secret(**kwargs)
container_name = data_utils.rand_name(name='tempest-container')
kwargs = {"type": constants.CONTAINER_TYPE,
"name": container_name,
"secret_refs": [{"secret_ref": barbican_secret1
['secret_ref'],
"name": 'certificate'},
{"secret_ref": barbican_secret2
['secret_ref'],
"name": 'private_key'}]}
barbican_container = self.create_barbican_container(**kwargs)
secret_container_dict = dict(secret_1=barbican_secret1,
secret_2=barbican_secret2,
secret_container=barbican_container)
return secret_container_dict
def check_certificate_at_backend(self, should_present=True,
cert_conent=None):
"""
Check barbican certificate at backend
"""
# check certificate at backend
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
# nsx api call to get certificates from backend
certs = self.nsx.get_certificates()
Present = "False"
for cert in certs:
if cert['pem_encoded'] == cert_conent:
Present = "True"
if should_present:
self.assertIn(Present, "True")
else:
self.assertIn(Present, "False")

View File

@ -176,9 +176,12 @@ class TrafficManager(appliance_manager.ApplianceManager):
stderr)
return stdout
def query_webserver(self, web_ip):
def query_webserver(self, web_ip, HTTPS=None):
try:
url_path = "http://{0}/".format(web_ip)
if HTTPS is None:
url_path = "http://{0}/".format(web_ip)
else:
url_path = "https://{0}/".format(web_ip)
# lbaas servers use nc, might be slower to response
http = urllib3.PoolManager(retries=10)
resp = http.request('GET', url_path)
@ -206,6 +209,19 @@ class TrafficManager(appliance_manager.ApplianceManager):
# count_response counts the no of requests made for each members
return self.http_cnt
def do_https_request(self, vip, start_path='', send_counts=None):
# http_cnt stores no of requests made for each members
self.http_cnt = {}
if not CONF.nsxv3.ens:
for x in range(send_counts):
resp = self.query_webserver(vip, HTTPS=True)
self.count_response(resp)
else:
for x in range(send_counts):
self.http_cnt = self.query_ens(vip, HTTPS=True)
# count_response counts the no of requests made for each members
return self.http_cnt
def start_web_server(self, protocol_port, server, server_name=None):
"""start server's web service which return its server_name."""
fip_data = server.get('floating_ips')[0]

View File

@ -28,7 +28,8 @@ _opts = [
(config_nsx.nsxv_group, config_nsx.NSXvGroup),
(config_nsx.l2gw_group, config_nsx.L2gwGroup),
(config_nsx.nsxv3_group, config_nsx.NSXv3Group),
(config_nsx.dns_group, config_nsx.DNSGroup)
(config_nsx.dns_group, config_nsx.DNSGroup),
(config_nsx.barbican_group, config_nsx.BarbicanGroup)
]

View File

@ -624,3 +624,35 @@ class NSXV3Client(object):
"""
response = self.delete(endpoint=endpoint)
return response.json()
def get_loadbalancers(self):
"""
Get Loadbalancers
:return: returns list of load balancers information.
"""
return self.get_logical_resources("/loadbalancer/services")
def get_certificates(self):
"""
Get Certificates
:return: returns list of certificates information.
"""
return self.get_logical_resources("/trust-management/certificates")
def get_virtual_server(self):
"""
Get virtual_server
:return: returns list of virtual servers information.
"""
return self.get_logical_resources("/loadbalancer/virtual-servers")
def get_monitors(self):
"""
Get health monitors
:return: returns list of health monitors information.
"""
return self.get_logical_resources("/loadbalancer/monitors")

View File

@ -13,6 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import six
from tempest.lib.common import rest_client
from tempest.lib.common.utils import data_utils
from oslo_log import log
from tempest import config
@ -497,3 +503,120 @@ class DesignatePtrClient(designate_base.DnsClientBase):
"""
resp, body = self._update_request(self.path, ptr_id, ptrd)
return resp, body
class SecretClient(rest_client.RestClient):
"""
Request resources via API for BarbicanContainerClient
Barbican Container create request
Barbican Container update request
Barbican Container show request
Barbican Container delete request
Barbican Container list all request
Add acl user rule to Barbican Container
"""
def create_secret(self, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = data_utils.rand_name("tempest-sec")
if 'payload' in kwargs and type(kwargs['payload']) is six.binary_type:
kwargs['payload'] = kwargs['payload'].decode('utf-8')
post_body = kwargs
body = json.dumps(post_body)
resp, body = self.post("v1/secrets", body)
self.expected_success(201, resp.status)
return self._parse_resp(body)
def delete_secret(self, secret_id):
resp, body = self.delete("v1/secrets/%s" % secret_id)
self.expected_success(204, resp.status)
return body
def list_secrets(self, **kwargs):
uri = "v1/secrets"
if kwargs is not None:
uri = '{base}?'.format(base=uri)
for key in kwargs.keys():
uri = '{base}&{name}={value}'.format(
base=uri,
name=key,
value=kwargs[key]
)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def put_secret_payload(self, secret_id, payload):
content_headers = {
"Content-Type": "application/octet-stream",
"Content-Encoding": "base64"
}
resp, body = self.put("v1/secrets/%s" % secret_id,
payload,
headers=content_headers)
self.expected_success(204, resp.status)
return body
def add_acl_user_secret(self, secret_id, user_id):
kwargs = {"read":
{"project-access": True,
"users": [user_id]
}
}
resp, body = self.put("v1/secrets/%s/acl" % secret_id,
json.dumps(kwargs))
self.expected_success(200, resp.status)
return body
class ContainerClient(rest_client.RestClient):
"""
Request resources via API for BarbicanContainerClient
Barbican Container create request
Barbican Container update request
Barbican Container show request
Barbican Container delete request
Barbican Container list all request
Add acl user rule to Barbican Container
"""
def list_containers(self):
uri = "v1/containers"
response, body = self.get(uri)
self.expected_success(200, response.status)
return json.loads(body.decode("utf-8"))
def get_container(self, container_id):
uri = "v1/containers/%s" % container_id
response, body = self.get(uri)
self.expected_success(200, response.status)
return json.loads(body.decode("utf-8"))
def create_container(self, **kwargs):
uri = "v1/containers"
response, body = self.post(uri, json.dumps(kwargs))
self.expected_success(201, response.status)
return json.loads(body.decode("utf-8"))
def delete_container(self, container_id):
uri = "v1/containers/%s" % container_id
response, _ = self.delete(uri)
self.expected_success(204, response.status)
return
def add_acl_user_containers(self, secret_id, user_id):
kwargs = {"read":
{"project-access": True,
"users": [user_id]
}
}
resp, body = self.put("v1/containers/%s/acl" % secret_id,
json.dumps(kwargs))
self.expected_success(200, resp.status)
return body

View File

@ -0,0 +1,144 @@
# Copyright 2019 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo_utils import uuidutils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.services import nsxv_client
CONF = config.CONF
class TestBarbican(feature_manager.FeatureManager):
"""Test Barbican APU cases
"""
@classmethod
def setup_clients(cls):
super(TestBarbican, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
cls.cmgr_alt = cls.get_client_manager('alt')
cls.cmgr_adm = cls.get_client_manager('admin')
cls.routers_client = cls.cmgr_adm.routers_client
cls.networks_client = cls.cmgr_adm.networks_client
cls.subnets_client = cls.cmgr_adm.subnets_client
cls.sec_rule_client = cls.cmgr_adm.security_group_rules_client
cls.sec_client = cls.cmgr_adm.security_groups_client
@classmethod
def resource_setup(cls):
super(TestBarbican, cls).resource_setup()
if CONF.network.backend == "nsxv3":
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
elif CONF.network.backend == "nsxv":
manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}",
CONF.nsxv.manager_uri).group(0)
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
cls.namestart = 'lbaas-ops'
cls.poke_counters = 12
cls.hm_delay = 4
cls.hm_max_retries = 3
cls.hm_timeout = 10
cls.server_names = []
cls.loadbalancer = None
cls.vip_fip = None
cls.web_service_start_delay = 2.5
@decorators.idempotent_id('2e13d4bb-54de-463a-a358-0fb9a221d8f3')
def test_barbican_multiple_secret_and_container(self):
"""
Create multiple barbican secets and container
"""
for i in range(1, 51):
self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
@decorators.idempotent_id('7d46a170-6b3b-4f4d-903a-b29aebb93289')
def test_barbican_secret_update(self):
"""
Update barbican secret
"""
cert_file = open(constants.CERT_FILE, "r")
cert_content = cert_file.read()
secret_name1 = data_utils.rand_name(name='tempest-cert-secret')
kwargs = {"secret_type": constants.SECRET_TYPE,
"name": secret_name1}
barbican_secret1 = self.create_barbican_secret(**kwargs)
uuid = self._get_uuid(barbican_secret1['secret_ref'])
self.secret_client.put_secret_payload(uuid, cert_content)
@decorators.idempotent_id('2b0c1707-afc3-4674-a6c6-4dc42f318117')
def test_barbican_secret_create_with_octet_stream(self):
"""
Create barbican secret with octet stream
"""
cert_file = open(constants.CERT_FILE, "r")
cert_content = cert_file.read()
secret_name1 = data_utils.rand_name(name='tempest-cert-secret')
kwargs = {"secret_type": "symmetric",
"algorithm": "binary",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
"mode": constants.MODE,
"bit_length": constants.BIT_LENGTH,
"payload": cert_content,
"name": secret_name1}
self.create_barbican_secret(**kwargs)
@decorators.idempotent_id('c5caa619-1e43-4724-8d94-a61ff7025a07')
def test_barbican_delete_secret_container_with_invalid_uuid(self):
"""
Delete barbican conrainer with
invalid barbican container id
"""
secert_id = uuidutils.generate_uuid()
self.assertRaises(exceptions.NotFound,
self.secret_client.delete_secret,
secert_id)
container_id = uuidutils.generate_uuid()
self.assertRaises(exceptions.NotFound,
self.container_client.delete_container,
container_id)
@decorators.idempotent_id('9aee2ad3-5b61-4451-8ccc-a727bbe4618a')
def test_barbican_secret_create_with_wrong_bit_length(self):
"""
Create barbican secret with wrong bit length
"""
cert_file = open(constants.CERT_FILE, "r")
cert_content = cert_file.read()
secret_name1 = data_utils.rand_name(name='tempest-cert-secret')
kwargs = {"secret_type": constants.SECRET_TYPE,
"algorithm": constants.ALGORITHM,
"payload_content_type": constants.PAYLOAD_CONTENT_TYPE,
"mode": constants.MODE,
"bit_length": 6382372,
"payload": cert_content,
"name": secret_name1}
self.assertRaises(exceptions.BadRequest,
self.create_barbican_secret, **kwargs
)

View File

@ -16,6 +16,7 @@ import re
import testtools
import time
from oslo_utils import uuidutils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@ -63,6 +64,16 @@ class TestNewCase(feature_manager.FeatureManager):
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
cls.namestart = 'lbaas-ops'
cls.poke_counters = 12
cls.hm_delay = 4
cls.hm_max_retries = 3
cls.hm_timeout = 10
cls.server_names = []
cls.loadbalancer = None
cls.vip_fip = None
cls.web_service_start_delay = 2.5
def create_topo_single_network(self, namestart, create_instance=True,
set_gateway=True, **kwargs):
"""
@ -114,19 +125,17 @@ class TestNewCase(feature_manager.FeatureManager):
network_state1 = self.create_topology_network(network_name1)
network_state2 = self.create_topology_network(network_name2)
subnet_state1 = self.create_topology_subnet(
subnet_name1,
network_state1,
router_id=router_state["id"])
subnet_state2 = self.create_topology_subnet(
subnet_name2,
network_state2,
router_id=router_state2["id"],
cidr=constants.CIDR)
subnet_name1, network_state1, router_id=router_state["id"])
subnet_state2 = self.create_topology_subnet(subnet_name2,
network_state2,
router_id=router_state2["\
id"],
cidr=constants.CIDR)
if create_instance:
self.create_topology_instance(
"server1", [network_state1])
self.create_topology_instance(
"server2", [network_state2])
self.create_topology_instance("server1",
[network_state1])
self.create_topology_instance("server2",
[network_state2])
topology_dict = dict(router_state=router_state,
router_state2=router_state2,
network_state1=network_state1,
@ -176,6 +185,30 @@ class TestNewCase(feature_manager.FeatureManager):
ping_cmd = "ping -c 1 %s " % fip
self.exec_cmd_on_server_using_fip(ping_cmd, ssh_client=client)
def create_single_network_topo_for_barbican(self, no_of_servers=None):
topology_dict = self.create_topo_single_network(
"test_secret", create_instance=False)
self.network_state = topology_dict['network_state']
self.subnet_state = topology_dict['subnet_state']
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=self.network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
lbaas_rules = [dict(direction='ingress', protocol='tcp',
port_range_min=constants.HTTP_PORT,
port_range_max=constants.HTTP_PORT, ),
dict(direction='ingress', protocol='tcp',
port_range_min=443, port_range_max=443, )]
for rule in lbaas_rules:
self.add_security_group_rule(
self.sg,
rule,
ruleclient=sec_rule_client,
secclient=sec_client,
tenant_id=self.network_state['tenant_id'])
@decorators.idempotent_id('1206127a-91cc-8905-b217-98844caa35b2')
def test_router_interface_port_update(self):
"""
@ -731,15 +764,15 @@ class TestNewCase(feature_manager.FeatureManager):
router_name = data_utils.rand_name(name='tempest-router')
net_name = data_utils.rand_name(name='tempest-net')
vlanid = int(CONF.nsxv.provider_vlan_id)
router = self.create_topology_router(
router_name,
routers_client=self.routers_client)
router = self.\
create_topology_router(router_name,
routers_client=self.routers_client)
body = {"provider:network_type": constants.VLAN_TYPE,
"admin_state_up": 'True', "provider:segmentation_id": vlanid}
network = self.create_topology_network(
net_name,
networks_client=self.networks_client,
**body)
network = self.\
create_topology_network(net_name,
networks_client=self.networks_client,
**body)
subnet_name = network['name'] + 'sub'
self.create_topology_subnet(subnet_name, network,
routers_client=self.routers_client,
@ -749,14 +782,16 @@ class TestNewCase(feature_manager.FeatureManager):
security_group_rules_client=self.sec_rule_client,
security_groups_client=self.sec_client)
self.sg = self.create_topology_security_group(**kwargs)
vm1 = self.create_topology_instance(
"server1", [network],
security_groups=[{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm2 = self.create_topology_instance(
"server2", [network],
security_groups=[{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm1 = self.\
create_topology_instance("server1", [network],
security_groups=[
{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm2 = self.\
create_topology_instance("server2", [network],
security_groups=[
{'name': self.sg['name']}],
clients=self.cmgr_adm)
ip_address = vm1['floating_ips'][0]['floating_ip_address']
ssh_source = self._get_remote_client(ip_address, use_password=True)
remote_ip = vm2.values()[1].values()[0][0]['addr']
@ -774,15 +809,15 @@ class TestNewCase(feature_manager.FeatureManager):
"""
router_name = data_utils.rand_name(name='tempest-rtr')
net_name = data_utils.rand_name(name='tempest-net')
router = self.create_topology_router(
router_name,
routers_client=self.routers_client)
router = self.\
create_topology_router(router_name,
routers_client=self.routers_client)
body = {"provider:network_type": 'vxlan',
"admin_state_up": 'True'}
network = self.create_topology_network(
net_name,
networks_client=self.networks_client,
**body)
network = self.\
create_topology_network(net_name,
networks_client=self.networks_client,
**body)
# Verify default physical network is vdnscope, not dvs
self.assertIn('vdnscope', network['provider:physical_network'])
subnet_name = network['name'] + 'sub'
@ -794,14 +829,16 @@ class TestNewCase(feature_manager.FeatureManager):
security_group_rules_client=self.sec_rule_client,
security_groups_client=self.sec_client)
self.sg = self.create_topology_security_group(**kwargs)
vm1 = self.create_topology_instance(
"server1", [network],
security_groups=[{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm2 = self.create_topology_instance(
"server2", [network],
security_groups=[{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm1 = self.\
create_topology_instance("server1", [network],
security_groups=[
{'name': self.sg['name']}],
clients=self.cmgr_adm)
vm2 = self.\
create_topology_instance("server2", [network],
security_groups=[
{'name': self.sg['name']}],
clients=self.cmgr_adm)
ip_address = vm1['floating_ips'][0]['floating_ip_address']
ssh_source = self._get_remote_client(ip_address, use_password=True)
remote_ip = vm2.values()[1].values()[0][0]['addr']
@ -821,10 +858,10 @@ class TestNewCase(feature_manager.FeatureManager):
"""
kwargs = {"distributed": "true",
"admin_state_up": "True"}
topology_dict = self.create_topo_two_routers_two_networks(
create_instance=False,
set_gateway=True,
**kwargs)
topology_dict = self.\
create_topo_two_routers_two_networks(create_instance=False,
set_gateway=True,
**kwargs)
router_id1 = topology_dict['router_state']['id']
router_id2 = topology_dict['router_state2']['id']
# Create Firewall1 and add it to the router1's interface
@ -864,3 +901,357 @@ class TestNewCase(feature_manager.FeatureManager):
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_2['firewall']['id'])
self.assertIn("ACTIVE", firewall_info['firewall']['status'])
@decorators.idempotent_id('f0603dfd-8b2c-44e2-8b0f-d65c87aab257')
def test_lbaas_https_traffic_with_barbican_secrets(self):
"""
Create lbaas, in it create listener
with barbican certificate and with terminated
https protocol and check https
traffic should work
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
barbican_container = barbican_secrets['secret_container']
self.create_single_network_topo_for_barbican()
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
self.create_project_lbaas(
protocol_type="TERMINATED_HTTPS",
protocol_port="443",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=2,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
count=0, barbican=True)
self.check_lbaas_project_weight_values(HTTPS=True)
@decorators.idempotent_id('74f022d6-a6ef-4458-96b7-541deadacf99')
def test_lbaas_http_https_traffic_with_barbican_secrets(self):
"""
Create lbaas, in it create listener
with barbican certificate and with terminated
https protocol and check https
traffic should work.
With same lb create http listener
with barbican certificate and check
http traffic should work.
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
barbican_container = barbican_secrets['secret_container']
self.create_single_network_topo_for_barbican()
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
topo_dict = self.create_project_lbaas(
protocol_type="TERMINATED_HTTPS",
protocol_port="443",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=2,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
count=0, barbican=True)
self.check_lbaas_project_weight_values(HTTPS=True)
no_of_servers = 4
self.topology_servers = {}
for instance in range(2, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
topo_dict = self.create_project_lbaas(
protocol_type="HTTP",
protocol_port="80",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=4,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
lb_id=topo_dict['lb_id'],
count=2, barbican=True)
self.check_lbaas_project_weight_values(barbican_http=True)
@decorators.idempotent_id('4343df3c-5553-40ea-8705-0cce73b297a9')
def test_barbican_multiple_listeners_with_secrets(self):
"""
Create multiple terminated https protocol
based listener with same loadbalancer
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
barbican_container = barbican_secrets['secret_container']
self.create_single_network_topo_for_barbican()
protocol_type = "TERMINATED_HTTPS"
protocol_port = 443
vip_subnet_id = self.subnet_state['id']
lb_name = data_utils.rand_name("tempest_lb")
self.loadbalancer = self.load_balancers_admin_client.\
create_load_balancer(name=lb_name,
vip_subnet_id=vip_subnet_id
)['loadbalancer']
lb_id = self.loadbalancer['id']
self.addCleanup(
self.load_balancers_admin_client.delete_load_balancer,
lb_id)
self.load_balancers_admin_client.wait_for_load_balancer_status(lb_id)
for i in range(1, 20):
listener_name = data_utils.rand_name("tempest_lb")
self.listener = self.listeners_admin_client.create_listener(
loadbalancer_id=lb_id,
protocol=protocol_type,
protocol_port=protocol_port,
name=listener_name,
default_tls_container_ref=barbican_container
["container_ref"])['listener']
self.addCleanup(
self.listeners_admin_client.delete_listener,
self.listener['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
protocol_port = protocol_port + 1
@decorators.idempotent_id('afe720b9-8b35-4a3c-8ff3-15841c2d3148')
def test_barbican_create_listener_with_empty_secrets(self):
"""
Try to create listener with terminated https
protocol and empty secret , it should fail.
"""
secret_name1 = data_utils.rand_name(name='tempest-cert-secret')
kwargs = {"secret_type": constants.SECRET_TYPE,
"algorithm": constants.ALGORITHM,
"name": secret_name1}
barbican_secret1 = self.create_barbican_secret(**kwargs)
secret_name2 = data_utils.rand_name(name='tempest-key-secret')
kwargs = {"secret_type": constants.SECRET_TYPE,
"algorithm": constants.ALGORITHM,
"name": secret_name2}
barbican_secret2 = self.create_barbican_secret(**kwargs)
container_name = data_utils.rand_name(name='tempest-container')
kwargs = {"type": constants.CONTAINER_TYPE,
"name": container_name,
"secret_refs": [{"secret_ref":
barbican_secret1['secret_ref'],
"name": 'certificate'},
{"secret_ref":
barbican_secret2['secret_ref'],
"name": 'private_key'}]}
barbican_container = self.create_barbican_container(**kwargs)
self.create_single_network_topo_for_barbican()
protocol_type = "TERMINATED_HTTPS"
protocol_port = 443
vip_subnet_id = self.subnet_state['id']
lb_name = data_utils.rand_name("tempest_lb")
self.loadbalancer = self.load_balancers_admin_client.\
create_load_balancer(name=lb_name,
vip_subnet_id=vip_subnet_id)['loadbalancer']
lb_id = self.loadbalancer['id']
self.addCleanup(
self.load_balancers_admin_client.delete_load_balancer,
lb_id)
self.load_balancers_admin_client.wait_for_load_balancer_status(lb_id)
listener_name = data_utils.rand_name("tempest_lb")
self.assertRaises(
exceptions.ServerFault,
self.listeners_admin_client.create_listener,
loadbalancer_id=lb_id,
protocol=protocol_type,
protocol_port=protocol_port,
name=listener_name,
default_tls_container_ref=barbican_container["container_ref"])
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('47ebc42b-0e53-4060-b1a1-55bee2c7c43f')
def test_barbican_check_certificate_at_backend(self):
"""
Create lbaas , listener with terminated https
protocol and barbican secret.
Check barbican certificate at backend using
nsx-manager api.
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
barbican_container = barbican_secrets['secret_container']
self.create_single_network_topo_for_barbican()
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
self.create_project_lbaas(
protocol_type="TERMINATED_HTTPS",
protocol_port="443",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=2,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
count=0, barbican=True)
cert_file = open(constants.CERT_FILE, "r")
cert_content = cert_file.read()
self.check_certificate_at_backend(cert_conent=cert_content.rstrip())
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('af10a78d-b1f8-440d-8b89-639861f16fd0')
def test_barbican_remove_listener_check_certificate_at_backend(self):
"""
Create lbaas , listener with terminated https
protocol and barbican secret.
Check barbican certificate at backend using
nsx-manager api.
Create one more listener with same barbican secret
and with terminated https protocol with another port,
Check certificate at backend it should present.
Delete one of them listener and check certificate at backend
it should present.
Delete last listener and check certificate at backend
it should not present.
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
barbican_container = barbican_secrets['secret_container']
self.create_single_network_topo_for_barbican()
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
topo_dict = self.create_project_lbaas(
protocol_type="TERMINATED_HTTPS",
protocol_port="443",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=2,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
count=0,
clean_up=False, barbican=True)
cert_file = open(constants.CERT_FILE, "r")
cert_content = cert_file.read()
cert_content = cert_content.rstrip()
self.check_certificate_at_backend(cert_conent=cert_content)
no_of_servers = 4
self.topology_servers = {}
for instance in range(2, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [self.network_state],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, clients=self.cmgr_adm)
self.start_web_servers(constants.HTTP_PORT)
topo_dict_1 = self.create_project_lbaas(
protocol_type="TERMINATED_HTTPS",
protocol_port="444",
lb_algorithm="ROUND_ROBIN",
hm_type='HTTP',
member_count=4,
weight=5,
pool_protocol='HTTP',
pool_port='80',
vip_subnet_id=self.subnet_state['id'],
barbican_container=barbican_container,
lb_id=topo_dict['lb_id'],
count=2,
clean_up=False, barbican=True)
self.check_certificate_at_backend(cert_conent=cert_content)
for member in topo_dict_1['members']:
self.members_admin_client.delete_member(
topo_dict_1['pool_id'], member['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.health_monitors_admin_client.delete_health_monitor(
topo_dict_1['healthmonitor_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.pools_admin_client.delete_pool(topo_dict_1['pool_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.listeners_admin_client.delete_listener(topo_dict_1['listener_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.check_certificate_at_backend(cert_conent=cert_content)
for member in topo_dict['members']:
self.members_admin_client.delete_member(
topo_dict['pool_id'], member['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.health_monitors_admin_client.delete_health_monitor(
topo_dict['healthmonitor_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.pools_admin_client.delete_pool(topo_dict['pool_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.listeners_admin_client.delete_listener(topo_dict['listener_id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
topo_dict['lb_id'])
self.check_certificate_at_backend(should_present=False,
cert_conent=cert_content)
@decorators.idempotent_id('79ec555d-215d-4006-bcf0-ab4c6cb0b9ff')
def test_barbican_create_lbaas_listener_with_invalid_container_uuid(self):
"""
Create lbaas listener with invalid container id
it should fail
"""
barbican_secrets = self.create_barbican_secret_conatainer(
constants.CERT_FILE, constants.KEY_FILE)
container_ref = barbican_secrets["secret_container"]['container_ref']\
.split('/')
container_ref.remove(container_ref[len(container_ref) - 1])
container_ref.append(uuidutils.generate_uuid())
container_ref = '/'.join(str(e) for e in container_ref)
self.create_single_network_topo_for_barbican()
lb_name = data_utils.rand_name("tempest_lb")
self.loadbalancer = self.load_balancers_admin_client.\
create_load_balancer(name=lb_name,
vip_subnet_id=self.subnet_state
['id'])['loadbalancer']
lb_id = self.loadbalancer['id']
self.addCleanup(
self.load_balancers_admin_client.delete_load_balancer,
self.loadbalancer['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
listener_name = data_utils.rand_name("tempest_lb")
self.assertRaises(exceptions.NotFound,
self.listeners_admin_client.create_listener,
loadbalancer_id=lb_id, protocol="TERMINATED_HTTPS",
protocol_port="443", name=listener_name,
default_tls_container_ref=container_ref)