Create JSON request for API

This commit adds a method that forms a JSON request content string
in the correct format containing a list of test IDs that have
passed Tempest. Consequently, a method was created that uses
the Keystone client to get the Keystone ID which will be used
as the cpid.

Change-Id: I445415af8f0ecdad68b3ad2860cbe8da8a0dfe0a
This commit is contained in:
Paul Van Eck 2014-07-30 19:24:50 -07:00
parent 9ce4f345d1
commit bc649a6aa4
2 changed files with 126 additions and 12 deletions

View File

@ -25,10 +25,17 @@ Tempest configuration file.
"""
import argparse
import ConfigParser
import json
import logging
import os
import requests
import subprocess
import time
from keystoneclient.v2_0 import client as ksclient
from subunit_processor import SubunitProcessor
class RefstackClient:
@ -59,20 +66,23 @@ class RefstackClient:
# Check that the config file exists.
if not os.path.isfile(args.conf_file):
self.logger.error("File not valid: %s" % args.conf_file)
exit()
exit(1)
self.conf_file = args.conf_file
self.conf = ConfigParser.SafeConfigParser()
self.conf.read(self.conf_file)
self.results_file = self._get_subunit_output_file()
self.cpid = self._get_cpid_from_keystone()
def post_results(self):
'''Post the combined results back to the server.'''
# TODO(cdiep): This method needs to generate results in a format
# defined in https://review.openstack.org/#/c/105901/, and post those
# results using the v1 API.
# TODO(cdiep): Post results once the API is available as outlined here:
# github.com/stackforge/refstack/blob/master/specs/approved/api-v1.md
self.logger.info('Sending the Tempest test results to the Refstack '
'API server.')
json_content = self._form_json_content()
self.logger.debug('API request content: %s ' % json_content)
def _get_subunit_output_file(self):
'''This method reads from the next-stream file in the .testrepository
@ -81,7 +91,7 @@ class RefstackClient:
try:
subunit_file = open(os.path.join(
self.tempest_dir, '.testrepository',
'next-stream'), 'r').read()
'next-stream'), 'r').read().rstrip()
except (IOError, OSError):
self.logger.debug('The .testrepository/next-stream file was not '
'found. Assuming subunit results will be stored '
@ -93,12 +103,52 @@ class RefstackClient:
return os.path.join(self.tempest_dir, '.testrepository', subunit_file)
def _get_cpid_from_keystone(self):
'''This will get the Keystone service ID which is used as the CPID.'''
try:
args = {'auth_url': self.conf.get('identity', 'uri'),
'username': self.conf.get('identity', 'admin_username'),
'password': self.conf.get('identity', 'admin_password')}
if self.conf.has_option('identity', 'admin_tenant_id'):
args['tenant_id'] = self.conf.get('identity',
'admin_tenant_id')
else:
args['tenant_name'] = self.conf.get('identity',
'admin_tenant_name')
client = ksclient.Client(**args)
services = client.services.list()
for service in services:
if service.type == "identity":
return service.id
except ConfigParser.Error as e:
# Most likely a missing section or option in the config file.
self.logger.error("Invalid Config File: %s" % e)
exit(1)
def _form_json_content(self):
'''This method will create the JSON content for the request.'''
subunit_processor = SubunitProcessor(self.results_file)
results = subunit_processor.process_stream()
self.logger.info("Number of passed tests: %d" % len(results))
content = {}
content['cpid'] = self.cpid
content['duration_seconds'] = self.duration
content['results'] = results
return json.dumps(content)
def run(self):
'''Execute tempest test against the cloud.'''
try:
self.logger.info("Starting Tempest test...")
start_time = time.time()
# Run the tempest script, specifying the output_conf file and the
# Run the tempest script, specifying the conf file and the
# flag telling it to not use a virtual environment.
cmd = (self.tempest_script, '-C', self.conf_file, '-N')
@ -118,12 +168,15 @@ class RefstackClient:
process = subprocess.Popen(cmd, stderr=stderr)
process.communicate()
# TODO(cdiep): Enable post_test_result once the code is ready.
# self.post_results()
end_time = time.time()
elapsed = end_time - start_time
self.duration = int(elapsed)
self.logger.info('Tempest test complete.')
self.logger.debug('Subunit results located in: %s' %
self.results_file)
self.logger.info('Subunit results located in: %s' %
self.results_file)
self.post_results()
except subprocess.CalledProcessError as e:
self.logger.error('%s failed to complete' % (e))

View File

@ -0,0 +1,61 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import re
import subunit
import testtools
import unittest
class TempestSubunitTestResultPassOnly(testtools.TestResult):
"""Class to process subunit stream.
This class maintains a list of test IDs that pass.
"""
def __init__(self, stream, descriptions, verbosity):
"""Initialize with super class signature."""
super(TempestSubunitTestResultPassOnly, self).__init__()
self.results = []
def addSuccess(self, testcase):
"""Overwrite super class method for additional data processing."""
super(TempestSubunitTestResultPassOnly, self).addSuccess(testcase)
# Remove any [] and () from the test ID before appending it.
self.results.append(re.sub('[\(\[].*[\]\)]', '', testcase.id()))
def get_results(self):
return self.results
class SubunitProcessor():
"""A class to replay subunit data from a stream."""
def __init__(self, in_stream,
result_class=TempestSubunitTestResultPassOnly):
self.in_stream = in_stream
self.result_class = result_class
def process_stream(self):
"""Read and process subunit data from a stream."""
with open(self.in_stream, 'r') as fin:
test = subunit.ProtocolTestCase(fin)
runner = unittest.TextTestRunner(stream=open(os.devnull, 'w'),
resultclass=self.result_class)
# Run (replay) the test from subunit stream.
test_result = runner.run(test)
return test_result.get_results()