Disable anonymous result upload

This patch disables anonymous test result uploads. It also
fixes a python35 incompatability uncovered by more extensive
testing of signing uploads (bytes can no longer be decoded).
Also updated coverage tests to not include tests in coverage.

Co-Authored-By: megan guiney <meganmguiney@gmail.com>

Change-Id: I3f19f1399eb2948eaf8340b5c4be18b698c7e139
This commit is contained in:
Chris Hoge 2017-09-26 15:12:08 -07:00 committed by Megan
parent 2afff3094a
commit 4595c1f0b6
6 changed files with 163 additions and 26 deletions

View File

@ -154,6 +154,11 @@
# contents of that file. (string value)
#github_raw_base_url = https://raw.githubusercontent.com/openstack/interop/master/
# Enable or disable anonymous uploads. If set to False, all clients
# will need to authenticate and sign with a public/private keypair
# previously uploaded to their user account.
#enable_anonymous_upload = true
# Number of results for one page (integer value)
#results_per_page = 20

View File

@ -94,7 +94,14 @@ API_OPTS = [
help='This is the base URL that is used for retrieving '
'specific capability files. Capability file names will '
'be appended to this URL to get the contents of that file.'
)
),
cfg.BoolOpt('enable_anonymous_upload',
default=True,
help='Enable or disable anonymous uploads. If set to False, '
'all clients will need to authenticate and sign with a '
'public/private keypair previously uploaded to their '
'user account.'
)
]
CONF = cfg.CONF

View File

@ -103,6 +103,40 @@ class ResultsController(validation.BaseRestControllerWithValidation):
meta = MetadataController()
def _check_authentication(self):
x_public_key = pecan.request.headers.get('X-Public-Key')
if x_public_key:
public_key = x_public_key.strip().split()[1]
stored_public_key = db.get_pubkey(public_key)
if not stored_public_key:
pecan.abort(401, 'User with specified key not found. '
'Please log into the RefStack server to '
'upload your key.')
else:
stored_public_key = None
if not CONF.api.enable_anonymous_upload and not stored_public_key:
pecan.abort(401, 'Anonymous result uploads are disabled. '
'Please create a user account and an api '
'key at https://refstack.openstack.org/#/')
return stored_public_key
def _auto_version_associate(self, test, test_, pubkey):
if test.get('cpid'):
version = db.get_product_version_by_cpid(
test['cpid'], allowed_keys=['id', 'product_id'])
# Only auto-associate if there is a single product version
# with the given cpid.
if len(version) == 1:
is_foundation = api_utils.check_user_is_foundation_admin(
pubkey.openid)
is_product_admin = api_utils.check_user_is_product_admin(
version[0]['product_id'], pubkey.openid)
if is_foundation or is_product_admin:
test_['product_version_id'] = version[0]['id']
return test_
@pecan.expose('json')
@api_utils.check_permissions(level=const.ROLE_USER)
def get_one(self, test_id):
@ -125,7 +159,8 @@ class ResultsController(validation.BaseRestControllerWithValidation):
if user_role not in (const.ROLE_FOUNDATION, const.ROLE_OWNER):
# Don't expose product information if product is not public.
if (test_info.get('product_version') and
not test_info['product_version']['product_info']['public']):
not test_info['product_version']
['product_info']['public']):
test_info['product_version'] = None
@ -137,30 +172,16 @@ class ResultsController(validation.BaseRestControllerWithValidation):
def store_item(self, test):
"""Handler for storing item. Should return new item id."""
# If we need a key, or the key isn't available, this will throw
# an exception with a 401
pubkey = self._check_authentication()
test_ = test.copy()
if pecan.request.headers.get('X-Public-Key'):
key = pecan.request.headers.get('X-Public-Key').strip().split()[1]
if pubkey:
if 'meta' not in test_:
test_['meta'] = {}
pubkey = db.get_pubkey(key)
if not pubkey:
pecan.abort(400, 'User with specified key not found. '
'Please log into the RefStack server to '
'upload your key.')
test_['meta'][const.USER] = pubkey.openid
if test.get('cpid'):
version = db.get_product_version_by_cpid(
test['cpid'], allowed_keys=['id', 'product_id'])
# Only auto-associate if there is a single product version
# with the given cpid.
if len(version) == 1:
is_foundation = api_utils.check_user_is_foundation_admin(
pubkey.openid)
is_product_admin = api_utils.check_user_is_product_admin(
version[0]['product_id'], pubkey.openid)
if is_foundation or is_product_admin:
test_['product_version_id'] = version[0]['id']
test_ = self._auto_version_associate(test, test_, pubkey)
test_id = db.store_results(test_)
return {'test_id': test_id,
'url': parse.urljoin(CONF.ui_url,
@ -224,7 +245,8 @@ class ResultsController(validation.BaseRestControllerWithValidation):
# Don't expose product info if the product is not public.
if (result.get('product_version') and not
result['product_version']['product_info']['public']):
result['product_version']['product_info']
['public']):
result['product_version'] = None
# Only show all metadata if the user is the owner or a

View File

@ -323,7 +323,7 @@ def get_pubkey(key):
The md5 hash of the key is used for the query for quicker lookups.
"""
session = get_session()
md5_hash = hashlib.md5(base64.b64decode(key.encode('ascii'))).hexdigest()
md5_hash = hashlib.md5(base64.b64decode(key)).hexdigest()
pubkeys = session.query(models.PubKey).filter_by(md5_hash=md5_hash).all()
if len(pubkeys) == 1:
return pubkeys[0]
@ -342,7 +342,7 @@ def store_pubkey(pubkey_info):
pubkey.pubkey = pubkey_info['pubkey']
pubkey.md5_hash = hashlib.md5(
base64.b64decode(
pubkey_info['pubkey'].encode('ascii')
pubkey_info['pubkey']
)
).hexdigest()
pubkey.comment = pubkey_info['comment']

View File

@ -25,6 +25,14 @@ from refstack.api import validators
from refstack import db
from refstack.tests import api
import binascii
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
FAKE_TESTS_RESULT = {
'cpid': 'foo',
'duration_seconds': 10,
@ -407,3 +415,98 @@ class TestResultsEndpoint(api.FunctionalTest):
db.update_test({'id': test_id, 'verification_status': 0})
resp = self.delete(url, expect_errors=True)
self.assertEqual(204, resp.status_code)
class TestResultsEndpointNoAnonymous(api.FunctionalTest):
URL = '/v1/results/'
def _generate_keypair_(self):
return rsa.generate_private_key(
public_exponent=65537,
key_size=1024,
backend=default_backend()
)
def _sign_body_(self, keypair, body):
signer = keypair.signer(padding.PKCS1v15(), hashes.SHA256())
signer.update(body)
return signer.finalize()
def _get_public_key_(self, keypair):
pubkey = keypair.public_key().public_bytes(
serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH
)
return pubkey
def setUp(self):
super(TestResultsEndpointNoAnonymous, self).setUp()
self.config_fixture = config_fixture.Config()
self.CONF = self.useFixture(self.config_fixture).conf
self.CONF.api.enable_anonymous_upload = False
self.user_info = {
'openid': 'test-open-id',
'email': 'foo@bar.com',
'fullname': 'Foo Bar'
}
db.user_save(self.user_info)
good_key = self._generate_keypair_()
self.body = json.dumps(FAKE_TESTS_RESULT).encode()
signature = self._sign_body_(good_key, self.body)
pubkey = self._get_public_key_(good_key)
x_signature = binascii.b2a_hex(signature)
self.good_headers = {
'X-Signature': x_signature,
'X-Public-Key': pubkey
}
self.pubkey_info = {
'openid': 'test-open-id',
'format': 'ssh-rsa',
'pubkey': pubkey.split()[1],
'comment': 'comment'
}
db.store_pubkey(self.pubkey_info)
bad_key = self._generate_keypair_()
bad_signature = self._sign_body_(bad_key, self.body)
bad_pubkey = self._get_public_key_(bad_key)
x_bad_signature = binascii.b2a_hex(bad_signature)
self.bad_headers = {
'X-Signature': x_bad_signature,
'X-Public-Key': bad_pubkey
}
def test_post_with_no_token(self):
"""Test results endpoint with post request."""
results = json.dumps(FAKE_TESTS_RESULT)
actual_response = self.post_json(self.URL, expect_errors=True,
params=results)
self.assertEqual(actual_response.status_code, 401)
def test_post_with_valid_token(self):
"""Test results endpoint with post request."""
results = json.dumps(FAKE_TESTS_RESULT)
actual_response = self.post_json(self.URL,
headers=self.good_headers,
params=results)
self.assertIn('test_id', actual_response)
try:
uuid.UUID(actual_response.get('test_id'), version=4)
except ValueError:
self.fail("actual_response doesn't contain test_id")
def test_post_with_invalid_token(self):
results = json.dumps(FAKE_TESTS_RESULT)
actual_response = self.post_json(self.URL,
headers=self.bad_headers,
expect_errors=True,
params=results)
self.assertEqual(actual_response.status_code, 401)

View File

@ -57,7 +57,7 @@ commands = {posargs}
[testenv:gen-cover]
commands = python setup.py testr --coverage \
--omit='{toxinidir}/refstack/tests*,{toxinidir}/refstack/api/config.py,{toxinidir}/refstack/db/migrations/alembic/*,{toxinidir}/refstack/opts.py' \
--omit='{toxinidir}/refstack/tests/unit/*,{toxinidir}/refstack/tests/api/*,{toxinidir}/refstack/api/config.py,{toxinidir}/refstack/db/migrations/alembic/*,{toxinidir}/refstack/opts.py' \
--testr-args='{posargs}'
[testenv:cover]