diff --git a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py b/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py deleted file mode 100644 index 93a126bfda..0000000000 --- a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py +++ /dev/null @@ -1,577 +0,0 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Mock REST requests to Cisco Cloud Services Router.""" - -import re - -import functools -# TODO(pcm): Remove when switch to requests-mock package. Comment out, if use -# local copy of httmock.py source. Needed for PEP8. -import httmock -import requests -from requests import exceptions as r_exc - -from neutron.openstack.common import log as logging -# TODO(pcm) Remove once httmock package is added to test-requirements. For -# now, uncomment and include httmock source to unit test. -# from neutron.tests.unit.services.vpn.device_drivers import httmock - -LOG = logging.getLogger(__name__) - - -def repeat(n): - """Decorator to limit the number of times a handler is called. - - Will allow the wrapped function (handler) to be called 'n' times. - After that, this will return None for any additional calls, - allowing other handlers, if any, to be invoked. - """ - - class static: - retries = n - - def decorator(func): - @functools.wraps(func) - def wrapped(*args, **kwargs): - if static.retries == 0: - return None - static.retries -= 1 - return func(*args, **kwargs) - return wrapped - return decorator - - -def filter_request(methods, resource): - """Decorator to invoke handler once for a specific resource. - - This will call the handler only for a specific resource using - a specific method(s). Any other resource request or method will - return None, allowing other handlers, if any, to be invoked. - """ - - class static: - target_methods = [m.upper() for m in methods] - target_resource = resource - - def decorator(func): - @functools.wraps(func) - def wrapped(*args, **kwargs): - if (args[1].method in static.target_methods and - static.target_resource in args[0].path): - return func(*args, **kwargs) - else: - return None # Not for this resource - return wrapped - return decorator - - -@httmock.urlmatch(netloc=r'localhost') -def token(url, request): - if 'auth/token-services' in url.path: - return {'status_code': requests.codes.OK, - 'content': {'token-id': 'dummy-token'}} - - -@httmock.urlmatch(netloc=r'localhost') -def token_unauthorized(url, request): - if 'auth/token-services' in url.path: - return {'status_code': requests.codes.UNAUTHORIZED} - - -@httmock.urlmatch(netloc=r'wrong-host') -def token_wrong_host(url, request): - raise r_exc.ConnectionError() - - -@httmock.all_requests -def token_timeout(url, request): - raise r_exc.Timeout() - - -@filter_request(['get'], 'global/host-name') -@httmock.all_requests -def timeout(url, request): - """Simulated timeout of a normal request.""" - - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - raise r_exc.Timeout() - - -@httmock.urlmatch(netloc=r'localhost') -def no_such_resource(url, request): - """Indicate not found error, when invalid resource requested.""" - return {'status_code': requests.codes.NOT_FOUND} - - -@filter_request(['get'], 'global/host-name') -@repeat(1) -@httmock.urlmatch(netloc=r'localhost') -def expired_request(url, request): - """Simulate access denied failure on first request for this resource. - - Intent here is to simulate that the token has expired, by failing - the first request to the resource. Because of the repeat=1, this - will only be called once, and subsequent calls will not be handled - by this function, but instead will access the normal handler and - will pass. Currently configured for a GET request, but will work - with POST and PUT as well. For DELETE, would need to filter_request on a - different resource (e.g. 'global/local-users') - """ - - return {'status_code': requests.codes.UNAUTHORIZED} - - -@httmock.urlmatch(netloc=r'localhost') -def normal_get(url, request): - if request.method != 'GET': - return - LOG.debug("GET mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'global/host-name' in url.path: - content = {u'kind': u'object#host-name', - u'host-name': u'Router'} - return httmock.response(requests.codes.OK, content=content) - if 'global/local-users' in url.path: - content = {u'kind': u'collection#local-user', - u'users': ['peter', 'paul', 'mary']} - return httmock.response(requests.codes.OK, content=content) - if 'interfaces/GigabitEthernet' in url.path: - actual_interface = url.path.split('/')[-1] - ip = actual_interface[-1] - content = {u'kind': u'object#interface', - u'description': u'Changed description', - u'if-name': actual_interface, - u'proxy-arp': True, - u'subnet-mask': u'255.255.255.0', - u'icmp-unreachable': True, - u'nat-direction': u'', - u'icmp-redirects': True, - u'ip-address': u'192.168.200.%s' % ip, - u'verify-unicast-source': False, - u'type': u'ethernet'} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/policies/2' in url.path: - content = {u'kind': u'object#ike-policy', - u'priority-id': u'2', - u'version': u'v1', - u'local-auth-method': u'pre-share', - u'encryption': u'aes256', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/keyrings' in url.path: - content = {u'kind': u'object#ike-keyring', - u'keyring-name': u'5', - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20 255.255.255.0'} - ]} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ipsec/policies/' in url.path: - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'anti-replay-window-size': u'Disable', - u'lifetime-sec': 120, - u'pfs': u'group5', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/site-to-site/Tunnel' in url.path: - tunnel = url.path.split('/')[-1] - # Use same number, to allow mock to generate IPSec policy ID - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 1500, - u'local-device': { - u'ip-address': '10.3.0.1/24', - u'tunnel-ip-address': '10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': '10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/keepalive' in url.path: - content = {u'interval': 60, - u'retry': 4, - u'periodic': True} - return httmock.response(requests.codes.OK, content=content) - if 'routing-svc/static-routes' in url.path: - content = {u'destination-network': u'10.1.0.0/24', - u'kind': u'object#static-route', - u'next-hop-router': None, - u'outgoing-interface': u'GigabitEthernet1', - u'admin-distance': 1} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/site-to-site/active/sessions' in url.path: - # Only including needed fields for mock - content = {u'kind': u'collection#vpn-active-sessions', - u'items': [{u'status': u'DOWN-NEGOTIATING', - u'vpn-interface-name': u'Tunnel123'}, ]} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ike/keyrings') -@httmock.urlmatch(netloc=r'localhost') -def get_fqdn(url, request): - LOG.debug("GET FQDN mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'object#ike-keyring', - u'keyring-name': u'5', - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'cisco.com'} - ]} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ipsec/policies/') -@httmock.urlmatch(netloc=r'localhost') -def get_no_ah(url, request): - LOG.debug("GET No AH mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'anti-replay-window-size': u'128', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - - -@httmock.urlmatch(netloc=r'localhost') -def get_defaults(url, request): - if request.method != 'GET': - return - LOG.debug("GET mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'vpn-svc/ike/policies/2' in url.path: - content = {u'kind': u'object#ike-policy', - u'priority-id': u'2', - u'version': u'v1', - u'local-auth-method': u'pre-share', - u'encryption': u'des', - u'hash': u'sha', - u'dhGroup': 1, - u'lifetime': 86400} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ipsec/policies/' in url.path: - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': {}, - u'lifetime-sec': 3600, - u'pfs': u'Disable', - u'anti-replay-window-size': u'None', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def get_unnumbered(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - tunnel = url.path.split('/')[-1] - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 1500, - u'local-device': { - u'ip-address': u'GigabitEthernet3', - u'tunnel-ip-address': u'10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': u'10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel') -@httmock.urlmatch(netloc=r'localhost') -def get_admin_down(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # URI has .../Tunnel#/state, so get number from 2nd to last element - tunnel = url.path.split('/')[-2] - content = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': u'%s' % tunnel, - u'line-protocol-state': u'down', - u'enabled': False} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel') -@httmock.urlmatch(netloc=r'localhost') -def get_admin_up(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # URI has .../Tunnel#/state, so get number from 2nd to last element - tunnel = url.path.split('/')[-2] - content = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': u'%s' % tunnel, - u'line-protocol-state': u'down', - u'enabled': True} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def get_mtu(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - tunnel = url.path.split('/')[-1] - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 9192, - u'local-device': { - u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': u'10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ike/keepalive') -@httmock.urlmatch(netloc=r'localhost') -def get_not_configured(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.NOT_FOUND} - - -@filter_request(['get'], 'vpn-svc/site-to-site/active/sessions') -@httmock.urlmatch(netloc=r'localhost') -def get_none(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'collection#vpn-active-sessions', - u'items': []} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'interfaces/GigabitEthernet3') -@httmock.urlmatch(netloc=r'localhost') -def get_local_ip(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'object#interface', - u'subnet-mask': u'255.255.255.0', - u'ip-address': u'10.5.0.2'} - return httmock.response(requests.codes.OK, content=content) - - -@httmock.urlmatch(netloc=r'localhost') -def post(url, request): - if request.method != 'POST': - return - LOG.debug("POST mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'interfaces/GigabitEthernet' in url.path: - return {'status_code': requests.codes.NO_CONTENT} - if 'global/local-users' in url.path: - if 'username' not in request.body: - return {'status_code': requests.codes.BAD_REQUEST} - if '"privilege": 20' in request.body: - return {'status_code': requests.codes.BAD_REQUEST} - headers = {'location': '%s/test-user' % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/ike/policies' in url.path: - headers = {'location': "%s/2" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/ipsec/policies' in url.path: - m = re.search(r'"policy-id": "(\S+)"', request.body) - if m: - headers = {'location': "%s/%s" % (url.geturl(), m.group(1))} - return httmock.response(requests.codes.CREATED, headers=headers) - return {'status_code': requests.codes.BAD_REQUEST} - if 'vpn-svc/ike/keyrings' in url.path: - headers = {'location': "%s/5" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/site-to-site' in url.path: - m = re.search(r'"vpn-interface-name": "(\S+)"', request.body) - if m: - headers = {'location': "%s/%s" % (url.geturl(), m.group(1))} - return httmock.response(requests.codes.CREATED, headers=headers) - return {'status_code': requests.codes.BAD_REQUEST} - if 'routing-svc/static-routes' in url.path: - headers = {'location': - "%s/10.1.0.0_24_GigabitEthernet1" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - - -@filter_request(['post'], 'global/local-users') -@httmock.urlmatch(netloc=r'localhost') -def post_change_attempt(url, request): - LOG.debug("POST change value mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.NOT_FOUND, - 'content': { - u'error-code': -1, - u'error-message': u'user test-user already exists'}} - - -@httmock.urlmatch(netloc=r'localhost') -def post_duplicate(url, request): - LOG.debug("POST duplicate mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST, - 'content': { - u'error-code': -1, - u'error-message': u'policy 2 exist, not allow to ' - u'update policy using POST method'}} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_missing_ipsec_policy(url, request): - LOG.debug("POST missing ipsec policy mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_missing_ike_policy(url, request): - LOG.debug("POST missing ike policy mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_ip(url, request): - LOG.debug("POST bad IP mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_mtu(url, request): - LOG.debug("POST bad mtu mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/ipsec/policies') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_lifetime(url, request): - LOG.debug("POST bad lifetime mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/ipsec/policies') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_name(url, request): - LOG.debug("POST bad IPSec policy name for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@httmock.urlmatch(netloc=r'localhost') -def put(url, request): - if request.method != 'PUT': - return - LOG.debug("PUT mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NO_CONTENT} - - -@httmock.urlmatch(netloc=r'localhost') -def delete(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NO_CONTENT} - - -@httmock.urlmatch(netloc=r'localhost') -def delete_unknown(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE unknown mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NOT_FOUND, - 'content': { - u'error-code': -1, - u'error-message': 'user unknown not found'}} - - -@httmock.urlmatch(netloc=r'localhost') -def delete_not_allowed(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE not allowed mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.METHOD_NOT_ALLOWED} diff --git a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py b/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py deleted file mode 100644 index a89abe9445..0000000000 --- a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py +++ /dev/null @@ -1,1358 +0,0 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -#TODO(pcm): Rename this file to remove the "no" prefix, once httmock is -# approved and added to requirements.txt - -import random - -# TODO(pcm): Remove when update to requests-mock package. Comment out, if use -# local copy of httmock.py source. Needed for PEP8. -import httmock -import requests - -from neutron.openstack.common import log as logging -from neutron.services.vpn.device_drivers import ( - cisco_csr_rest_client as csr_client) -from neutron.tests import base -from neutron.tests.unit.services.vpn.device_drivers import ( - cisco_csr_mock as csr_request) -# TODO(pcm) Uncomment to run w/local copy of httmock.py source. Remove when -# update to requests-mock package. -# from neutron.tests.unit.services.vpn.device_drivers import httmock - - -LOG = logging.getLogger(__name__) -# Enables debug logging to console -if True: - logging.CONF.set_override('debug', True) - logging.setup('neutron') - -dummy_policy_id = 'dummy-ipsec-policy-id-name' - - -# Note: Helper functions to test reuse of IDs. -def generate_pre_shared_key_id(): - return random.randint(100, 200) - - -def generate_ike_policy_id(): - return random.randint(200, 300) - - -def generate_ipsec_policy_id(): - return random.randint(300, 400) - - -class TestCsrLoginRestApi(base.BaseTestCase): - - """Test logging into CSR to obtain token-id.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrLoginRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_get_token(self): - """Obtain the token and its expiration time.""" - with httmock.HTTMock(csr_request.token): - self.assertTrue(self.csr.authenticate()) - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIsNotNone(self.csr.token) - - def test_unauthorized_token_request(self): - """Negative test of invalid user/password.""" - self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(csr_request.token_unauthorized): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) - - def test_non_existent_host(self): - """Negative test of request to non-existent host.""" - self.csr.host = 'wrong-host' - self.csr.token = 'Set by some previously successful access' - with httmock.HTTMock(csr_request.token_wrong_host): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - self.assertIsNone(self.csr.token) - - def test_timeout_on_token_access(self): - """Negative test of a timeout on a request.""" - with httmock.HTTMock(csr_request.token_timeout): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) - self.assertIsNone(self.csr.token) - - -class TestCsrGetRestApi(base.BaseTestCase): - - """Test CSR GET REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrGetRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_valid_rest_gets(self): - """Simple GET requests. - - First request will do a post to get token (login). Assumes - that there are two interfaces on the CSR. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - content = self.csr.get_request('global/host-name') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('host-name', content) - self.assertNotEqual(None, content['host-name']) - - content = self.csr.get_request('global/local-users') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('users', content) - - -class TestCsrPostRestApi(base.BaseTestCase): - - """Test CSR POST REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrPostRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_post_requests(self): - """Simple POST requests (repeatable). - - First request will do a post to get token (login). Assumes - that there are two interfaces (Ge1 and Ge2) on the CSR. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.post): - content = self.csr.post_request( - 'interfaces/GigabitEthernet1/statistics', - payload={'action': 'clear'}) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.post_request( - 'interfaces/GigabitEthernet2/statistics', - payload={'action': 'clear'}) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - def test_post_with_location(self): - """Create a user and verify that location returned.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr.post_request( - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - - def test_post_missing_required_attribute(self): - """Negative test of POST with missing mandatory info.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - self.csr.post_request('global/local-users', - payload={'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_post_invalid_attribute(self): - """Negative test of POST with invalid info.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - self.csr.post_request('global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 20}) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_post_already_exists(self): - """Negative test of a duplicate POST. - - Uses the lower level _do_request() API to just perform the POST and - obtain the response, without any error processing. - """ - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr._do_request( - 'POST', - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}, - more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(csr_request.token, - csr_request.post_change_attempt): - self.csr._do_request( - 'POST', - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}, - more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) - # Note: For local-user, a 404 error is returned. For - # site-to-site connection a 400 is returned. - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_post_changing_value(self): - """Negative test of a POST trying to change a value.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr.post_request( - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(csr_request.token, - csr_request.post_change_attempt): - content = self.csr.post_request('global/local-users', - payload={'username': 'test-user', - 'password': 'changed', - 'privilege': 15}) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'user test-user already exists'} - self.assertDictContainsSubset(expected, content) - - -class TestCsrPutRestApi(base.BaseTestCase): - - """Test CSR PUT REST API.""" - - def _save_resources(self): - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - details = self.csr.get_request('global/host-name') - if self.csr.status != requests.codes.OK: - self.fail("Unable to save original host name") - self.original_host = details['host-name'] - details = self.csr.get_request('interfaces/GigabitEthernet1') - if self.csr.status != requests.codes.OK: - self.fail("Unable to save interface Ge1 description") - self.original_if = details - if details.get('description', ''): - self.original_if['description'] = '' - self.csr.token = None - - def _restore_resources(self, user, password): - """Restore the host name and itnerface description. - - Must restore the user and password, so that authentication - token can be obtained (as some tests corrupt auth info). - Will also clear token, so that it gets a fresh token. - """ - - self.csr.auth = (user, password) - self.csr.token = None - with httmock.HTTMock(csr_request.token, - csr_request.put): - payload = {'host-name': self.original_host} - self.csr.put_request('global/host-name', payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore host name after test") - payload = {'description': self.original_if['description'], - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - self.csr.put_request('interfaces/GigabitEthernet1', - payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore I/F Ge1 description after test") - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - """Prepare for PUT API tests.""" - super(TestCsrPutRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - self._save_resources() - self.addCleanup(self._restore_resources, 'stack', 'cisco') - - def test_put_requests(self): - """Simple PUT requests (repeatable). - - First request will do a post to get token (login). Assumes - that there are two interfaces on the CSR (Ge1 and Ge2). - """ - - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'host-name': 'TestHost'} - content = self.csr.put_request('global/host-name', - payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - payload = {'host-name': 'TestHost2'} - content = self.csr.put_request('global/host-name', - payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - def test_change_interface_description(self): - """Test that interface description can be changed. - - This was a problem with an earlier version of the CSR image and is - here to prevent regression. - """ - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'description': u'Changed description', - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - content = self.csr.put_request( - 'interfaces/GigabitEthernet1', payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.get_request('interfaces/GigabitEthernet1') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('description', content) - self.assertEqual(u'Changed description', - content['description']) - - def ignore_test_change_to_empty_interface_description(self): - """Test that interface description can be changed to empty string. - - This is a problem in the current version of the CSR image, which - rejects the change with a 400 error. This test is here to prevent - a regression (once it is fixed) Note that there is code in the - test setup to change the description to a non-empty string to - avoid failures in other tests. - """ - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'description': '', - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - content = self.csr.put_request( - 'interfaces/GigabitEthernet1', payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.get_request('interfaces/GigabitEthernet1') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('description', content) - self.assertEqual('', content['description']) - - -class TestCsrDeleteRestApi(base.BaseTestCase): - - """Test CSR DELETE REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrDeleteRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def _make_dummy_user(self): - """Create a user that will be later deleted.""" - self.csr.post_request('global/local-users', - payload={'username': 'dummy', - 'password': 'dummy', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - - def test_delete_requests(self): - """Simple DELETE requests (creating entry first).""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.delete): - self._make_dummy_user() - self.csr.token = None # Force login - self.csr.delete_request('global/local-users/dummy') - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - # Delete again, but without logging in this time - self._make_dummy_user() - self.csr.delete_request('global/local-users/dummy') - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - - def test_delete_non_existent_entry(self): - """Negative test of trying to delete a non-existent user.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete_unknown): - content = self.csr.delete_request('global/local-users/unknown') - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'user unknown not found'} - self.assertDictContainsSubset(expected, content) - - def test_delete_not_allowed(self): - """Negative test of trying to delete the host-name.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete_not_allowed): - self.csr.delete_request('global/host-name') - self.assertEqual(requests.codes.METHOD_NOT_ALLOWED, - self.csr.status) - - -class TestCsrRestApiFailures(base.BaseTestCase): - - """Test failure cases common for all REST APIs. - - Uses the lower level _do_request() to just perform the operation and get - the result, without any error handling. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1): - super(TestCsrRestApiFailures, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_request_for_non_existent_resource(self): - """Negative test of non-existent resource on REST request.""" - with httmock.HTTMock(csr_request.token, - csr_request.no_such_resource): - self.csr.post_request('no/such/request') - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - # The result is HTTP 404 message, so no error content to check - - def test_timeout_during_request(self): - """Negative test of timeout during REST request.""" - with httmock.HTTMock(csr_request.token, - csr_request.timeout): - self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) - - def test_token_expired_on_request(self): - """Token expired before trying a REST request. - - The mock is configured to return a 401 error on the first - attempt to reference the host name. Simulate expiration of - token by changing it. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.expired_request, - csr_request.normal_get): - self.csr.token = '123' # These are 44 characters, so won't match - content = self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('host-name', content) - self.assertNotEqual(None, content['host-name']) - - def test_failed_to_obtain_token_for_request(self): - """Negative test of unauthorized user for REST request.""" - self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(csr_request.token_unauthorized): - self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) - - -class TestCsrRestIkePolicyCreate(base.BaseTestCase): - - """Test IKE policy create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIkePolicyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_ike_policy(self): - """Create and then delete IKE policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id, - u'encryption': u'aes256', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-policy', - u'version': u'v1', - u'local-auth-method': u'pre-share'} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - # Now delete and verify the IKE policy is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_ike_policy(policy_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ike_policy_with_defaults(self): - """Create IKE policy using defaults for all optional values.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_defaults): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-policy', - u'version': u'v1', - u'encryption': u'des', - u'hash': u'sha', - u'dhGroup': 1, - u'lifetime': 86400, - # Lower level sets this, but it is the default - u'local-auth-method': u'pre-share'} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_create_duplicate_ike_policy(self): - """Negative test of trying to create a duplicate IKE policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id, - u'encryption': u'aes', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - with httmock.HTTMock(csr_request.token, - csr_request.post_duplicate): - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'policy 2 exist, not allow to ' - u'update policy using POST method'} - self.assertDictContainsSubset(expected, location) - - -class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): - - """Test IPSec policy create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIPSecPolicyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_ipsec_policy(self): - """Create and then delete IPSec policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - # CSR will respond with capitalized value - expected_policy[u'anti-replay-window-size'] = u'Disable' - self.assertEqual(expected_policy, content) - # Now delete and verify the IPSec policy is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_ipsec_policy(policy_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ipsec_policy_with_defaults(self): - """Create IPSec policy with default for all optional values.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_defaults): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'protection-suite': {}, - u'lifetime-sec': 3600, - u'pfs': u'Disable', - u'anti-replay-window-size': u'None', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_create_ipsec_policy_with_uuid(self): - """Create IPSec policy using UUID for id.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_info = { - u'policy-id': u'%s' % dummy_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_policy_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - # CSR will respond with capitalized value - expected_policy[u'anti-replay-window-size'] = u'Disable' - self.assertEqual(expected_policy, content) - - def test_create_ipsec_policy_without_ah(self): - """Create IPSec policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_no_ah): - policy_id = '10' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'128' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_invalid_ipsec_policy_lifetime(self): - """Failure test of IPSec policy with unsupported lifetime.""" - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_lifetime): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 119, - u'pfs': u'group5', - u'anti-replay-window-size': u'128' - } - self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_create_ipsec_policy_with_invalid_name(self): - """Failure test of creating IPSec policy with name too long.""" - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_name, - csr_request.get_defaults): - policy_id = 'policy-name-is-too-long-32-chars' - policy_info = { - u'policy-id': u'%s' % policy_id, - } - self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - -class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): - - """Test Pre-shared key (PSK) create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestPreSharedKeyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_pre_shared_key(self): - """Create and then delete a keyring entry for pre-shared key.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-keyring'} - expected_policy.update(psk_info) - # Note: the peer CIDR is returned as an IP and mask - expected_policy[u'pre-shared-key-list'][0][u'peer-address'] = ( - u'10.10.10.20 255.255.255.0') - self.assertEqual(expected_policy, content) - # Now delete and verify pre-shared key is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_pre_shared_key(psk_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_pre_shared_key_with_fqdn_peer(self): - """Create pre-shared key using FQDN for peer address.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_fqdn): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'cisco.com'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-keyring'} - expected_policy.update(psk_info) - self.assertEqual(expected_policy, content) - - def test_create_pre_shared_key_with_duplicate_peer_address(self): - """Negative test of creating a second pre-shared key with same peer.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - with httmock.HTTMock(csr_request.token, - csr_request.post_duplicate): - psk_id = u'6' - another_psk_info = {u'keyring-name': psk_id, - u'pre-shared-key-list': [ - {u'key': u'abc123def', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - self.csr.create_ike_policy(another_psk_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - -class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): - - """Test IPSec site-to-site connection REST requests. - - This requires us to have first created an IKE policy, IPSec policy, - and pre-shared key, so it's more of an itegration test, when used - with a real CSR (as we can't mock out these pre-conditions. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIPSecConnectionCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def _make_psk_for_test(self): - psk_id = generate_pre_shared_key_id() - self._remove_resource_for_test(self.csr.delete_pre_shared_key, - psk_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - psk_info = {u'keyring-name': u'%d' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - self.csr.create_pre_shared_key(psk_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create PSK for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_pre_shared_key, psk_id) - return psk_id - - def _make_ike_policy_for_test(self): - policy_id = generate_ike_policy_id() - self._remove_resource_for_test(self.csr.delete_ike_policy, - policy_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - policy_info = {u'priority-id': u'%d' % policy_id, - u'encryption': u'aes', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - self.csr.create_ike_policy(policy_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create IKE policy for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ike_policy, policy_id) - return policy_id - - def _make_ipsec_policy_for_test(self): - policy_id = generate_ipsec_policy_id() - self._remove_resource_for_test(self.csr.delete_ipsec_policy, - policy_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - policy_info = { - u'policy-id': u'%d' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - self.csr.create_ipsec_policy(policy_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create IPSec policy for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_policy, policy_id) - return policy_id - - def _remove_resource_for_test(self, delete_resource, resource_id): - with httmock.HTTMock(csr_request.token, - csr_request.delete): - delete_resource(resource_id) - - def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False, - skip_ipsec=False): - """Create the policies and PSK so can then create site conn.""" - if not skip_psk: - self._make_psk_for_test() - if not skip_ike: - self._make_ike_policy_for_test() - if not skip_ipsec: - ipsec_policy_id = self._make_ipsec_policy_for_test() - else: - ipsec_policy_id = generate_ipsec_policy_id() - # Note: Use same ID number for tunnel and IPSec policy, so that when - # GET tunnel info, the mocks can infer the IPSec policy ID from the - # tunnel number. - return (ipsec_policy_id, ipsec_policy_id) - - def test_create_delete_ipsec_connection(self): - """Create and then delete an IPSec connection.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - # Now delete and verify that site-to-site connection is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - # Only delete connection. Cleanup will take care of prerequisites - self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ipsec_connection_with_no_tunnel_subnet(self): - """Create an IPSec connection without an IP address on tunnel.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_unnumbered): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'GigabitEthernet3', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_no_pre_shared_key(self): - """Test of connection create without associated pre-shared key. - - The CSR will create the connection, but will not be able to pass - traffic without the pre-shared key. - """ - - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_psk=True) - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_with_default_ike_policy(self): - """Test of connection create without IKE policy (uses default). - - Without an IKE policy, the CSR will use a built-in default IKE - policy setting for the connection. - """ - - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_ike=True) - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_set_ipsec_connection_admin_state_changes(self): - """Create IPSec connection in admin down state.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - tunnel = u'Tunnel%d' % tunnel_id - with httmock.HTTMock(csr_request.token, - csr_request.post): - connection_info = { - u'vpn-interface-name': tunnel, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - tunnel) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/%s' % tunnel, location) - state_uri = location + "/state" - # Note: When created, the tunnel will be in admin 'up' state - # Note: Line protocol state will be down, unless have an active conn. - expected_state = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': tunnel, - u'line-protocol-state': u'down', - u'enabled': False} - with httmock.HTTMock(csr_request.put, - csr_request.get_admin_down): - self.csr.set_ipsec_connection_state(tunnel, admin_up=False) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(state_uri, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual(expected_state, content) - - with httmock.HTTMock(csr_request.put, - csr_request.get_admin_up): - self.csr.set_ipsec_connection_state(tunnel, admin_up=True) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(state_uri, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_state[u'enabled'] = True - self.assertEqual(expected_state, content) - - def test_create_ipsec_connection_missing_ipsec_policy(self): - """Negative test of connection create without IPSec policy.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_ipsec=True) - with httmock.HTTMock( - csr_request.token, - csr_request.post_missing_ipsec_policy): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def _determine_conflicting_ip(self): - with httmock.HTTMock(csr_request.token, - csr_request.get_local_ip): - details = self.csr.get_request('interfaces/GigabitEthernet3') - if self.csr.status != requests.codes.OK: - self.fail("Unable to obtain interface GigabitEthernet3's IP") - if_ip = details.get('ip-address') - if not if_ip: - self.fail("No IP address for GigabitEthernet3 interface") - return '.'.join(if_ip.split('.')[:3]) + '.10' - - def test_create_ipsec_connection_conficting_tunnel_ip(self): - """Negative test of connection create with conflicting tunnel IP. - - Find out the IP of a local interface (GigabitEthernet3) and create an - IP that is on the same subnet. Note: this interface needs to be up. - """ - - conflicting_ip = self._determine_conflicting_ip() - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_ip): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'%s/24' % conflicting_ip, - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_create_ipsec_connection_with_max_mtu(self): - """Create an IPSec connection with max MTU value.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_mtu): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 9192, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_with_bad_mtu(self): - """Negative test of connection create with unsupported MTU value.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_mtu): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 9193, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_status_when_no_tunnels_exist(self): - """Get status, when there are no tunnels.""" - with httmock.HTTMock(csr_request.token, - csr_request.get_none): - tunnels = self.csr.read_tunnel_statuses() - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual([], tunnels) - - def test_status_for_one_tunnel(self): - """Get status of one tunnel.""" - # Create the IPsec site-to-site connection first - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - tunnel_id = 123 # Must hard code to work with mock - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel123', - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - u'Tunnel123') - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - tunnels = self.csr.read_tunnel_statuses() - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels) - - -class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): - - """Test IKE keepalive REST requests. - - Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration - that applies to all VPN tunnels to specify Dead Peer Detection information. - As a result, this REST API is not used in the OpenStack device driver, and - the keepalive will default to zero (disabled). - """ - - def _save_dpd_info(self): - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - details = self.csr.get_request('vpn-svc/ike/keepalive') - if self.csr.status == requests.codes.OK: - self.dpd = details - self.addCleanup(self._restore_dpd_info) - elif self.csr.status != requests.codes.NOT_FOUND: - self.fail("Unable to save original DPD info") - - def _restore_dpd_info(self): - with httmock.HTTMock(csr_request.token, - csr_request.put): - payload = {'interval': self.dpd['interval'], - 'retry': self.dpd['retry']} - self.csr.put_request('vpn-svc/ike/keepalive', payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore DPD info after test") - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIkeKeepaliveCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - self._save_dpd_info() - self.csr.token = None - - def test_configure_ike_keepalive(self): - """Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - keepalive_info = {'interval': 60, 'retry': 4} - self.csr.configure_ike_keepalive(keepalive_info) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request('vpn-svc/ike/keepalive') - self.assertEqual(requests.codes.OK, self.csr.status) - expected = {'periodic': False} - expected.update(keepalive_info) - self.assertDictContainsSubset(expected, content) - - def test_disable_ike_keepalive(self): - """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.put, - csr_request.get_not_configured): - keepalive_info = {'interval': 0, 'retry': 4} - self.csr.configure_ike_keepalive(keepalive_info) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - - -class TestCsrRestStaticRoute(base.BaseTestCase): - - """Test static route REST requests. - - A static route is added for the peer's private network. Would create - a route for each of the peer CIDRs specified for the VPN connection. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestStaticRoute, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_static_route(self): - """Create and then delete a static route for the tunnel.""" - cidr = u'10.1.0.0/24' - interface = u'GigabitEthernet1' - expected_id = '10.1.0.0_24_GigabitEthernet1' - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - route_info = {u'destination-network': cidr, - u'outgoing-interface': interface} - location = self.csr.create_static_route(route_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('routing-svc/static-routes/%s' % expected_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_route = {u'kind': u'object#static-route', - u'next-hop-router': None, - u'admin-distance': 1} - expected_route.update(route_info) - self.assertEqual(expected_route, content) - # Now delete and verify that static route is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - route_id = csr_client.make_route_id(cidr, interface) - self.csr.delete_static_route(route_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) diff --git a/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py b/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py new file mode 100644 index 0000000000..b7092af457 --- /dev/null +++ b/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py @@ -0,0 +1,1626 @@ +# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import random +import re + +import requests +from requests import exceptions as r_exc +from requests_mock.contrib import fixture as mock_fixture + +from neutron.services.vpn.device_drivers import ( + cisco_csr_rest_client as csr_client) +from neutron.tests import base + + +dummy_policy_id = 'dummy-ipsec-policy-id-name' +BASE_URL = 'https://%s:55443/api/v1/' +LOCAL_URL = 'https://localhost:55443/api/v1/' + +URI_HOSTNAME = 'global/host-name' +URI_USERS = 'global/local-users' +URI_AUTH = 'auth/token-services' +URI_INTERFACE_GE1 = 'interfaces/GigabitEthernet1' +URI_PSK = 'vpn-svc/ike/keyrings' +URI_PSK_ID = URI_PSK + '/%s' +URI_IKE_POLICY = 'vpn-svc/ike/policies' +URI_IKE_POLICY_ID = URI_IKE_POLICY + '/%s' +URI_IPSEC_POLICY = 'vpn-svc/ipsec/policies' +URI_IPSEC_POLICY_ID = URI_IPSEC_POLICY + '/%s' +URI_IPSEC_CONN = 'vpn-svc/site-to-site' +URI_IPSEC_CONN_ID = URI_IPSEC_CONN + '/%s' +URI_KEEPALIVE = 'vpn-svc/ike/keepalive' +URI_ROUTES = 'routing-svc/static-routes' +URI_ROUTES_ID = URI_ROUTES + '/%s' +URI_SESSIONS = 'vpn-svc/site-to-site/active/sessions' + + +# Note: Helper functions to test reuse of IDs. +def generate_pre_shared_key_id(): + return random.randint(100, 200) + + +def generate_ike_policy_id(): + return random.randint(200, 300) + + +def generate_ipsec_policy_id(): + return random.randint(300, 400) + + +class CiscoCsrBaseTestCase(base.BaseTestCase): + + """Helper methods to register mock intercepts - used by child classes.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + super(CiscoCsrBaseTestCase, self).setUp() + self.base_url = BASE_URL % host + self.requests = self.useFixture(mock_fixture.Fixture()) + info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) + + def _register_local_get(self, uri, json=None, + result_code=requests.codes.OK): + self.requests.register_uri( + 'GET', + LOCAL_URL + uri, + status_code=result_code, + json=json) + + def _register_local_post(self, uri, resource_id, + result_code=requests.codes.CREATED): + self.requests.register_uri( + 'POST', + LOCAL_URL + uri, + status_code=result_code, + headers={'location': LOCAL_URL + uri + '/' + str(resource_id)}) + + def _register_local_delete(self, uri, resource_id, json=None, + result_code=requests.codes.NO_CONTENT): + self.requests.register_uri( + 'DELETE', + LOCAL_URL + uri + '/' + str(resource_id), + status_code=result_code, + json=json) + + def _register_local_delete_by_id(self, resource_id, + result_code=requests.codes.NO_CONTENT): + local_resource_re = re.compile(LOCAL_URL + '.+%s$' % resource_id) + self.requests.register_uri( + 'DELETE', + local_resource_re, + status_code=result_code) + + def _register_local_put(self, uri, resource_id, + result_code=requests.codes.NO_CONTENT): + self.requests.register_uri('PUT', + LOCAL_URL + uri + '/' + resource_id, + status_code=result_code) + + def _register_local_get_not_found(self, uri, resource_id, + result_code=requests.codes.NOT_FOUND): + self.requests.register_uri( + 'GET', + LOCAL_URL + uri + '/' + str(resource_id), + status_code=result_code) + + def _helper_register_auth_request(self): + self.requests.register_uri('POST', + LOCAL_URL + URI_AUTH, + status_code=requests.codes.OK, + json={'token-id': 'dummy-token'}) + + def _helper_register_psk_post(self, psk_id): + self._register_local_post(URI_PSK, psk_id) + + def _helper_register_ike_policy_post(self, policy_id): + self._register_local_post(URI_IKE_POLICY, policy_id) + + def _helper_register_ipsec_policy_post(self, policy_id): + self._register_local_post(URI_IPSEC_POLICY, policy_id) + + def _helper_register_tunnel_post(self, tunnel): + self._register_local_post(URI_IPSEC_CONN, tunnel) + + +class TestCsrLoginRestApi(CiscoCsrBaseTestCase): + + """Test logging into CSR to obtain token-id.""" + + def test_get_token(self): + """Obtain the token and its expiration time.""" + self._helper_register_auth_request() + self.assertTrue(self.csr.authenticate()) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIsNotNone(self.csr.token) + + def test_unauthorized_token_request(self): + """Negative test of invalid user/password.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_AUTH, + status_code=requests.codes.UNAUTHORIZED) + self.csr.auth = ('stack', 'bogus') + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) + + def _simulate_wrong_host(self, request): + if 'wrong-host' in request.url: + raise r_exc.ConnectionError() + + def test_non_existent_host(self): + """Negative test of request to non-existent host.""" + self.requests.add_matcher(self._simulate_wrong_host) + self.csr.host = 'wrong-host' + self.csr.token = 'Set by some previously successful access' + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + self.assertIsNone(self.csr.token) + + def _simulate_token_timeout(self, request): + raise r_exc.Timeout() + + def test_timeout_on_token_access(self): + """Negative test of a timeout on a request.""" + self.requests.add_matcher(self._simulate_token_timeout) + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) + self.assertIsNone(self.csr.token) + + +class TestCsrGetRestApi(CiscoCsrBaseTestCase): + + """Test CSR GET REST API.""" + + def test_valid_rest_gets(self): + """Simple GET requests. + + First request will do a post to get token (login). Assumes + that there are two interfaces on the CSR. + """ + + self._helper_register_auth_request() + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + self._register_local_get(URI_USERS, + json={u'kind': u'collection#local-user', + u'users': ['peter', 'paul', 'mary']}) + + actual = self.csr.get_request(URI_HOSTNAME) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('host-name', actual) + self.assertIsNotNone(actual['host-name']) + + actual = self.csr.get_request(URI_USERS) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('users', actual) + + +class TestCsrPostRestApi(CiscoCsrBaseTestCase): + + """Test CSR POST REST API.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrPostRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def test_post_requests(self): + """Simple POST requests (repeatable). + + First request will do a post to get token (login). Assumes + that there are two interfaces (Ge1 and Ge2) on the CSR. + """ + + interface_re = re.compile('https://localhost:55443/.*/interfaces/' + 'GigabitEthernet\d/statistics') + self.requests.register_uri('POST', + interface_re, + status_code=requests.codes.NO_CONTENT) + + actual = self.csr.post_request( + 'interfaces/GigabitEthernet1/statistics', + payload={'action': 'clear'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.post_request( + 'interfaces/GigabitEthernet2/statistics', + payload={'action': 'clear'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + def test_post_with_location(self): + """Create a user and verify that location returned.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + location = self.csr.post_request( + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + + def test_post_missing_required_attribute(self): + """Negative test of POST with missing mandatory info.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.BAD_REQUEST) + self.csr.post_request(URI_USERS, + payload={'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_post_invalid_attribute(self): + """Negative test of POST with invalid info.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.BAD_REQUEST) + self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 20}) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_post_already_exists(self): + """Negative test of a duplicate POST. + + Uses the lower level _do_request() API to just perform the POST and + obtain the response, without any error processing. + """ + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + + location = self.csr._do_request( + 'POST', + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}, + more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 20}) + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.NOT_FOUND, + json={u'error-code': -1, + u'error-message': u'user test-user already exists'}) + + self.csr._do_request( + 'POST', + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}, + more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) + # Note: For local-user, a 404 error is returned. For + # site-to-site connection a 400 is returned. + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_post_changing_value(self): + """Negative test of a POST trying to change a value.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + + location = self.csr.post_request( + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.NOT_FOUND, + json={u'error-code': -1, + u'error-message': u'user test-user already exists'}) + + actual = self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'changed', + 'privilege': 15}) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + expected = {u'error-code': -1, + u'error-message': u'user test-user already exists'} + self.assertDictSupersetOf(expected, actual) + + +class TestCsrPutRestApi(CiscoCsrBaseTestCase): + + """Test CSR PUT REST API.""" + + def _save_resources(self): + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + interface_info = {u'kind': u'object#interface', + u'description': u'Changed description', + u'if-name': 'interfaces/GigabitEthernet1', + u'proxy-arp': True, + u'subnet-mask': u'255.255.255.0', + u'icmp-unreachable': True, + u'nat-direction': u'', + u'icmp-redirects': True, + u'ip-address': u'192.168.200.1', + u'verify-unicast-source': False, + u'type': u'ethernet'} + self._register_local_get(URI_INTERFACE_GE1, + json=interface_info) + details = self.csr.get_request(URI_HOSTNAME) + if self.csr.status != requests.codes.OK: + self.fail("Unable to save original host name") + self.original_host = details['host-name'] + details = self.csr.get_request(URI_INTERFACE_GE1) + if self.csr.status != requests.codes.OK: + self.fail("Unable to save interface Ge1 description") + self.original_if = details + self.csr.token = None + + def _restore_resources(self, user, password): + """Restore the host name and interface description. + + Must restore the user and password, so that authentication + token can be obtained (as some tests corrupt auth info). + Will also clear token, so that it gets a fresh token. + """ + + self._register_local_put('global', 'host-name') + self._register_local_put('interfaces', 'GigabitEthernet1') + + self.csr.auth = (user, password) + self.csr.token = None + payload = {'host-name': self.original_host} + self.csr.put_request(URI_HOSTNAME, payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore host name after test") + payload = {'description': self.original_if['description'], + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + self.csr.put_request(URI_INTERFACE_GE1, + payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore I/F Ge1 description after test") + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. In + addition, resources are saved, before each test is run, and + restored, after each test completes. + """ + super(TestCsrPutRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + self._save_resources() + self.addCleanup(self._restore_resources, 'stack', 'cisco') + + def test_put_requests(self): + """Simple PUT requests (repeatable). + + First request will do a post to get token (login). Assumes + that there are two interfaces on the CSR (Ge1 and Ge2). + """ + + self._register_local_put('interfaces', 'GigabitEthernet1') + self._register_local_put('global', 'host-name') + + actual = self.csr.put_request(URI_HOSTNAME, + payload={'host-name': 'TestHost'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + actual = self.csr.put_request(URI_HOSTNAME, + payload={'host-name': 'TestHost2'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + def test_change_interface_description(self): + """Test that interface description can be changed. + + This was a problem with an earlier version of the CSR image and is + here to prevent regression. + """ + self._register_local_put('interfaces', 'GigabitEthernet1') + payload = {'description': u'Changed description', + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + actual = self.csr.put_request(URI_INTERFACE_GE1, payload=payload) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.get_request(URI_INTERFACE_GE1) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('description', actual) + self.assertEqual(u'Changed description', + actual['description']) + + def ignore_test_change_to_empty_interface_description(self): + """Test that interface description can be changed to empty string. + + This is here to prevent regression, where the CSR was rejecting + an attempt to set the description to an empty string. + """ + self._register_local_put('interfaces', 'GigabitEthernet1') + payload = {'description': '', + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + actual = self.csr.put_request(URI_INTERFACE_GE1, payload=payload) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.get_request(URI_INTERFACE_GE1) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('description', actual) + self.assertEqual('', actual['description']) + + +class TestCsrDeleteRestApi(CiscoCsrBaseTestCase): + + """Test CSR DELETE REST API.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrDeleteRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def _make_dummy_user(self): + """Create a user that will be later deleted.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/dummy'}) + self.csr.post_request(URI_USERS, + payload={'username': 'dummy', + 'password': 'dummy', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + + def test_delete_requests(self): + """Simple DELETE requests (creating entry first).""" + self._register_local_delete(URI_USERS, 'dummy') + self._make_dummy_user() + self.csr.token = None # Force login + self.csr.delete_request(URI_USERS + '/dummy') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + # Delete again, but without logging in this time + self._make_dummy_user() + self.csr.delete_request(URI_USERS + '/dummy') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + def test_delete_non_existent_entry(self): + """Negative test of trying to delete a non-existent user.""" + expected = {u'error-code': -1, + u'error-message': u'user unknown not found'} + self._register_local_delete(URI_USERS, 'unknown', + result_code=requests.codes.NOT_FOUND, + json=expected) + actual = self.csr.delete_request(URI_USERS + '/unknown') + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + self.assertDictSupersetOf(expected, actual) + + def test_delete_not_allowed(self): + """Negative test of trying to delete the host-name.""" + self._register_local_delete( + 'global', 'host-name', + result_code=requests.codes.METHOD_NOT_ALLOWED) + self.csr.delete_request(URI_HOSTNAME) + self.assertEqual(requests.codes.METHOD_NOT_ALLOWED, + self.csr.status) + + +class TestCsrRestApiFailures(CiscoCsrBaseTestCase): + + """Test failure cases common for all REST APIs. + + Uses the lower level _do_request() to just perform the operation and get + the result, without any error handling. + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrRestApiFailures, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def _simulate_timeout(self, request): + if URI_HOSTNAME in request.path_uri: + raise r_exc.Timeout() + + def test_request_for_non_existent_resource(self): + """Negative test of non-existent resource on REST request.""" + self.requests.register_uri('POST', + LOCAL_URL + 'no/such/request', + status_code=requests.codes.NOT_FOUND) + self.csr.post_request('no/such/request') + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + # The result is HTTP 404 message, so no error content to check + + def _simulate_get_timeout(self, request): + """Will raise exception for any host request to this resource.""" + if URI_HOSTNAME in request.path_url: + raise r_exc.Timeout() + + def test_timeout_during_request(self): + """Negative test of timeout during REST request.""" + self.requests.add_matcher(self._simulate_get_timeout) + self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) + + def _simulate_auth_failure(self, request): + """First time auth POST is done, re-report unauthorized.""" + if URI_AUTH in request.path_url and not self.called_once: + self.called_once = True + resp = requests.Response() + resp.status_code = requests.codes.UNAUTHORIZED + return resp + + def test_token_expired_on_request(self): + """Token expired before trying a REST request. + + First, the token is set to a bogus value, to force it to + try to authenticate on the GET request. Second, a mock that + runs once, will simulate an auth failure. Third, the normal + auth mock will simulate success. + """ + + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + self.called_once = False + self.requests.add_matcher(self._simulate_auth_failure) + self.csr.token = '123' # These are 44 characters, so won't match + actual = self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('host-name', actual) + self.assertIsNotNone(actual['host-name']) + + def test_failed_to_obtain_token_for_request(self): + """Negative test of unauthorized user for REST request.""" + self.csr.auth = ('stack', 'bogus') + self._register_local_get(URI_HOSTNAME, + result_code=requests.codes.UNAUTHORIZED) + self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) + + +class TestCsrRestIkePolicyCreate(CiscoCsrBaseTestCase): + + """Test IKE policy create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestIkePolicyCreate, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + self._helper_register_ike_policy_post(2) + + def _helper_register_ike_policy_get(self): + content = {u'kind': u'object#ike-policy', + u'priority-id': u'2', + u'version': u'v1', + u'local-auth-method': u'pre-share', + u'encryption': u'aes256', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + self._register_local_get(URI_IKE_POLICY_ID % '2', json=content) + + def test_create_delete_ike_policy(self): + """Create and then delete IKE policy.""" + self._helper_register_ike_policy_get() + policy_info = {u'priority-id': u'2', + u'encryption': u'aes256', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + # Check the hard-coded items that get set as well... + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-policy', + u'version': u'v1', + u'local-auth-method': u'pre-share'} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + # Now delete and verify the IKE policy is gone + self._register_local_delete(URI_IKE_POLICY, 2) + self._register_local_get_not_found(URI_IKE_POLICY, 2) + + self.csr.delete_ike_policy(2) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ike_policy_with_defaults(self): + """Create IKE policy using defaults for all optional values.""" + policy = {u'kind': u'object#ike-policy', + u'priority-id': u'2', + u'version': u'v1', + u'local-auth-method': u'pre-share', + u'encryption': u'des', + u'hash': u'sha', + u'dhGroup': 1, + u'lifetime': 86400} + self._register_local_get(URI_IKE_POLICY_ID % '2', json=policy) + policy_info = {u'priority-id': u'2'} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + + # Check the hard-coded items that get set as well... + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-policy', + u'version': u'v1', + u'encryption': u'des', + u'hash': u'sha', + u'dhGroup': 1, + u'lifetime': 86400, + # Lower level sets this, but it is the default + u'local-auth-method': u'pre-share'} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + def test_create_duplicate_ike_policy(self): + """Negative test of trying to create a duplicate IKE policy.""" + self._helper_register_ike_policy_get() + policy_info = {u'priority-id': u'2', + u'encryption': u'aes', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_IKE_POLICY, + status_code=requests.codes.BAD_REQUEST, + json={u'error-code': -1, + u'error-message': u'policy 2 exist, not allow to ' + u'update policy using POST method'}) + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + expected = {u'error-code': -1, + u'error-message': u'policy 2 exist, not allow to ' + u'update policy using POST method'} + self.assertDictSupersetOf(expected, location) + + +class TestCsrRestIPSecPolicyCreate(CiscoCsrBaseTestCase): + + """Test IPSec policy create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestIPSecPolicyCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_ipsec_policy_post(123) + + def _helper_register_ipsec_policy_get(self, override=None): + content = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'anti-replay-window-size': u'Disable', + u'lifetime-sec': 120, + u'pfs': u'group5', + u'lifetime-kb': 4608000, + u'idle-time': None} + if override: + content.update(override) + self._register_local_get(URI_IPSEC_POLICY + '/123', json=content) + + def test_create_delete_ipsec_policy(self): + """Create and then delete IPSec policy.""" + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY + '/123', location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_policy_get() + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + # CSR will respond with capitalized value + expected_policy[u'anti-replay-window-size'] = u'Disable' + self.assertEqual(expected_policy, actual) + + # Now delete and verify the IPSec policy is gone + self._register_local_delete(URI_IPSEC_POLICY, 123) + self._register_local_get_not_found(URI_IPSEC_POLICY, 123) + + self.csr.delete_ipsec_policy('123') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ipsec_policy_with_defaults(self): + """Create IPSec policy with default for all optional values.""" + policy_info = {u'policy-id': u'123'} + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY + '/123', location) + + # Check the hard-coded items that get set as well... + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'policy-id': u'123', + u'protection-suite': {}, + u'lifetime-sec': 3600, + u'pfs': u'Disable', + u'anti-replay-window-size': u'None', + u'lifetime-kb': 4608000, + u'idle-time': None} + self._register_local_get(URI_IPSEC_POLICY + '/123', + json=expected_policy) + + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_policy, actual) + + def test_create_ipsec_policy_with_uuid(self): + """Create IPSec policy using UUID for id.""" + # Override normal POST response w/one that has a different policy ID + self._helper_register_ipsec_policy_post(dummy_policy_id) + policy_info = { + u'policy-id': u'%s' % dummy_policy_id, + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY_ID % dummy_policy_id, location) + + # Check the hard-coded items that get set as well... + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + # CSR will respond with capitalized value + expected_policy[u'anti-replay-window-size'] = u'Disable' + self._register_local_get(URI_IPSEC_POLICY_ID % dummy_policy_id, + json=expected_policy) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_policy, actual) + + def test_create_ipsec_policy_without_ah(self): + """Create IPSec policy.""" + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'128' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY_ID % '123', location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_policy_get( + override={u'anti-replay-window-size': u'128', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac'}}) + + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + def test_invalid_ipsec_policy_lifetime(self): + """Failure test of IPSec policy with unsupported lifetime.""" + # Override normal POST response with one that indicates bad request + self.requests.register_uri('POST', + LOCAL_URL + URI_IPSEC_POLICY, + status_code=requests.codes.BAD_REQUEST) + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 119, + u'pfs': u'group5', + u'anti-replay-window-size': u'128' + } + self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_create_ipsec_policy_with_invalid_name(self): + """Failure test of creating IPSec policy with name too long.""" + # Override normal POST response with one that indicates bad request + self.requests.register_uri('POST', + LOCAL_URL + URI_IPSEC_POLICY, + status_code=requests.codes.BAD_REQUEST) + policy_info = {u'policy-id': u'policy-name-is-too-long-32-chars'} + self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + +class TestCsrRestPreSharedKeyCreate(CiscoCsrBaseTestCase): + + """Test Pre-shared key (PSK) create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestPreSharedKeyCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_psk_post(5) + + def _helper_register_psk_get(self, override=None): + content = {u'kind': u'object#ike-keyring', + u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20 255.255.255.0'} + ]} + if override: + content.update(override) + self._register_local_get(URI_PSK_ID % '5', json=content) + + def test_create_delete_pre_shared_key(self): + """Create and then delete a keyring entry for pre-shared key.""" + psk_info = {u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20/24'} + ]} + location = self.csr.create_pre_shared_key(psk_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_PSK_ID % '5', location) + + # Check the hard-coded items that get set as well... + self._helper_register_psk_get() + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-keyring'} + expected_policy.update(psk_info) + # Note: the peer CIDR is returned as an IP and mask + expected_policy[u'pre-shared-key-list'][0][u'peer-address'] = ( + u'10.10.10.20 255.255.255.0') + self.assertEqual(expected_policy, content) + + # Now delete and verify pre-shared key is gone + self._register_local_delete(URI_PSK, 5) + self._register_local_get_not_found(URI_PSK, 5) + + self.csr.delete_pre_shared_key('5') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_pre_shared_key_with_fqdn_peer(self): + """Create pre-shared key using FQDN for peer address.""" + psk_info = {u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'cisco.com'} + ]} + location = self.csr.create_pre_shared_key(psk_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_PSK_ID % '5', location) + + # Check the hard-coded items that get set as well... + self._helper_register_psk_get( + override={u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'cisco.com'} + ]} + ) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-keyring'} + expected_policy.update(psk_info) + self.assertEqual(expected_policy, content) + + +class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase): + + """Test IPSec site-to-site connection REST requests. + + This requires us to have first created an IKE policy, IPSec policy, + and pre-shared key, so it's more of an itegration test, when used + with a real CSR (as we can't mock out these pre-conditions). + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrRestIPSecConnectionCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self.route_id = '10.1.0.0_24_GigabitEthernet1' + + def _make_psk_for_test(self): + psk_id = generate_pre_shared_key_id() + self._remove_resource_for_test(self.csr.delete_pre_shared_key, + psk_id) + self._helper_register_psk_post(psk_id) + psk_info = {u'keyring-name': u'%d' % psk_id, + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20/24'} + ]} + self.csr.create_pre_shared_key(psk_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create PSK for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_pre_shared_key, psk_id) + return psk_id + + def _make_ike_policy_for_test(self): + policy_id = generate_ike_policy_id() + self._remove_resource_for_test(self.csr.delete_ike_policy, + policy_id) + self._helper_register_ike_policy_post(policy_id) + policy_info = {u'priority-id': u'%d' % policy_id, + u'encryption': u'aes', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + self.csr.create_ike_policy(policy_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create IKE policy for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ike_policy, policy_id) + return policy_id + + def _make_ipsec_policy_for_test(self): + policy_id = generate_ipsec_policy_id() + self._remove_resource_for_test(self.csr.delete_ipsec_policy, + policy_id) + self._helper_register_ipsec_policy_post(policy_id) + policy_info = { + u'policy-id': u'%d' % policy_id, + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + self.csr.create_ipsec_policy(policy_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create IPSec policy for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_policy, policy_id) + return policy_id + + def _remove_resource_for_test(self, delete_resource, resource_id): + self._register_local_delete_by_id(resource_id) + delete_resource(resource_id) + + def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False, + skip_ipsec=False): + """Create the policies and PSK so can then create site conn.""" + if not skip_psk: + ike_policy_id = self._make_psk_for_test() + else: + ike_policy_id = generate_ike_policy_id() + if not skip_ike: + self._make_ike_policy_for_test() + if not skip_ipsec: + ipsec_policy_id = self._make_ipsec_policy_for_test() + else: + ipsec_policy_id = generate_ipsec_policy_id() + # Note: Use same ID number for tunnel and IPSec policy, so that when + # GET tunnel info, the mocks can infer the IPSec policy ID from the + # tunnel number. + return (ike_policy_id, ipsec_policy_id, ipsec_policy_id) + + def _helper_register_ipsec_conn_get(self, tunnel, override=None): + # Use same number, to allow mock to generate IPSec policy ID + ipsec_policy_id = tunnel[6:] + content = {u'kind': u'object#vpn-site-to-site', + u'vpn-interface-name': u'%s' % tunnel, + u'ip-version': u'ipv4', + u'vpn-type': u'site-to-site', + u'ipsec-policy-id': u'%s' % ipsec_policy_id, + u'ike-profile-id': None, + u'mtu': 1500, + u'local-device': { + u'ip-address': '10.3.0.1/24', + u'tunnel-ip-address': '10.10.10.10' + }, + u'remote-device': { + u'tunnel-ip-address': '10.10.10.20' + }} + if override: + content.update(override) + self._register_local_get(URI_IPSEC_CONN_ID % tunnel, json=content) + + def test_create_delete_ipsec_connection(self): + """Create and then delete an IPSec connection.""" + ike_policy_id, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'mtu': 1500, + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + # Now delete and verify that site-to-site connection is gone + self._register_local_delete_by_id(tunnel_name) + self._register_local_delete_by_id(ipsec_policy_id) + self._register_local_delete_by_id(ike_policy_id) + self._register_local_get_not_found(URI_IPSEC_CONN, + tunnel_name) + # Only delete connection. Cleanup will take care of prerequisites + self.csr.delete_ipsec_connection(tunnel_name) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ipsec_connection_with_no_tunnel_subnet(self): + """Create an IPSec connection without an IP address on tunnel.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'GigabitEthernet3', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'mtu': 1500, + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn('vpn-svc/site-to-site/' + tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name, override={ + u'local-device': { + u'ip-address': u'GigabitEthernet3', + u'tunnel-ip-address': u'10.10.10.10' + }}) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_no_pre_shared_key(self): + """Test of connection create without associated pre-shared key. + + The CSR will create the connection, but will not be able to pass + traffic without the pre-shared key. + """ + + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_psk=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_with_default_ike_policy(self): + """Test of connection create without IKE policy (uses default). + + Without an IKE policy, the CSR will use a built-in default IKE + policy setting for the connection. + """ + + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_ike=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_set_ipsec_connection_admin_state_changes(self): + """Create IPSec connection in admin down state.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + state_url = location + "/state" + state_uri = URI_IPSEC_CONN_ID % tunnel_name + '/state' + # Note: When created, the tunnel will be in admin 'up' state + # Note: Line protocol state will be down, unless have an active conn. + expected_state = {u'kind': u'object#vpn-site-to-site-state', + u'vpn-interface-name': tunnel_name, + u'line-protocol-state': u'down', + u'enabled': False} + self._register_local_put(URI_IPSEC_CONN_ID % tunnel_name, 'state') + self.csr.set_ipsec_connection_state(tunnel_name, admin_up=False) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + self._register_local_get(state_uri, json=expected_state) + content = self.csr.get_request(state_url, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_state, content) + + self.csr.set_ipsec_connection_state(tunnel_name, admin_up=True) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + expected_state = {u'kind': u'object#vpn-site-to-site-state', + u'vpn-interface-name': tunnel_name, + u'line-protocol-state': u'down', + u'enabled': True} + self._register_local_get(state_uri, json=expected_state) + content = self.csr.get_request(state_url, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_state, content) + + def test_create_ipsec_connection_missing_ipsec_policy(self): + """Negative test of connection create without IPSec policy.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_ipsec=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + 'Tunnel%d' % tunnel_id) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def _determine_conflicting_ip(self): + content = {u'kind': u'object#interface', + u'subnet-mask': u'255.255.255.0', + u'ip-address': u'10.5.0.2'} + self._register_local_get('interfaces/GigabitEthernet3', json=content) + details = self.csr.get_request('interfaces/GigabitEthernet3') + if self.csr.status != requests.codes.OK: + self.fail("Unable to obtain interface GigabitEthernet3's IP") + if_ip = details.get('ip-address') + if not if_ip: + self.fail("No IP address for GigabitEthernet3 interface") + return '.'.join(if_ip.split('.')[:3]) + '.10' + + def test_create_ipsec_connection_conficting_tunnel_ip(self): + """Negative test of connection create with conflicting tunnel IP. + + Find out the IP of a local interface (GigabitEthernet3) and create an + IP that is on the same subnet. Note: this interface needs to be up. + """ + + conflicting_ip = self._determine_conflicting_ip() + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'%s/24' % conflicting_ip, + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_create_ipsec_connection_with_max_mtu(self): + """Create an IPSec connection with max MTU value.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 9192, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name, override={ + u'mtu': 9192}) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_with_bad_mtu(self): + """Negative test of connection create with unsupported MTU value.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 9193, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_status_when_no_tunnels_exist(self): + """Get status, when there are no tunnels.""" + content = {u'kind': u'collection#vpn-active-sessions', + u'items': []} + self._register_local_get(URI_SESSIONS, json=content) + tunnels = self.csr.read_tunnel_statuses() + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual([], tunnels) + + def test_status_for_one_tunnel(self): + """Get status of one tunnel.""" + # Create the IPsec site-to-site connection first + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Now, check the status + content = {u'kind': u'collection#vpn-active-sessions', + u'items': [{u'status': u'DOWN-NEGOTIATING', + u'vpn-interface-name': tunnel_name}, ]} + self._register_local_get(URI_SESSIONS, json=content) + self._helper_register_ipsec_conn_get(tunnel_name) + tunnels = self.csr.read_tunnel_statuses() + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual([(tunnel_name, u'DOWN-NEGOTIATING'), ], tunnels) + + +class TestCsrRestIkeKeepaliveCreate(CiscoCsrBaseTestCase): + + """Test IKE keepalive REST requests. + + Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration + that applies to all VPN tunnels to specify Dead Peer Detection information. + As a result, this REST API is not used in the OpenStack device driver, and + the keepalive will default to zero (disabled). + """ + + def _save_dpd_info(self): + details = self.csr.get_request(URI_KEEPALIVE) + if self.csr.status == requests.codes.OK: + self.dpd = details + self.addCleanup(self._restore_dpd_info) + elif self.csr.status != requests.codes.NOT_FOUND: + self.fail("Unable to save original DPD info") + + def _restore_dpd_info(self): + payload = {'interval': self.dpd['interval'], + 'retry': self.dpd['retry']} + self.csr.put_request(URI_KEEPALIVE, payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore DPD info after test") + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication, get, and put mock + responses registered, although the test may replace them, if needed. + Dead Peer Detection settions will be saved for each test, and + restored afterwards. + """ + super(TestCsrRestIkeKeepaliveCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_keepalive_get() + self._register_local_put('vpn-svc/ike', 'keepalive') + self._save_dpd_info() + self.csr.token = None + + def _helper_register_keepalive_get(self, override=None): + content = {u'interval': 60, + u'retry': 4, + u'periodic': True} + if override: + content.update(override) + self._register_local_get(URI_KEEPALIVE, json=content) + + def test_configure_ike_keepalive(self): + """Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" + keepalive_info = {'interval': 60, 'retry': 4} + self.csr.configure_ike_keepalive(keepalive_info) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(URI_KEEPALIVE) + self.assertEqual(requests.codes.OK, self.csr.status) + expected = {'periodic': False} + expected.update(keepalive_info) + self.assertDictSupersetOf(expected, content) + + def test_disable_ike_keepalive(self): + """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" + keepalive_info = {'interval': 0, 'retry': 4} + self.csr.configure_ike_keepalive(keepalive_info) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + +class TestCsrRestStaticRoute(CiscoCsrBaseTestCase): + + """Test static route REST requests. + + A static route is added for the peer's private network. Would create + a route for each of the peer CIDRs specified for the VPN connection. + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication mock response + registered, although the test may replace it, if needed. + """ + super(TestCsrRestStaticRoute, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def test_create_delete_static_route(self): + """Create and then delete a static route for the tunnel.""" + expected_id = '10.1.0.0_24_GigabitEthernet1' + self._register_local_post(URI_ROUTES, resource_id=expected_id) + cidr = u'10.1.0.0/24' + interface = u'GigabitEthernet1' + route_info = {u'destination-network': cidr, + u'outgoing-interface': interface} + location = self.csr.create_static_route(route_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_ROUTES_ID % expected_id, location) + + # Check the hard-coded items that get set as well... + expected_route = {u'destination-network': u'10.1.0.0/24', + u'kind': u'object#static-route', + u'next-hop-router': None, + u'outgoing-interface': u'GigabitEthernet1', + u'admin-distance': 1} + self._register_local_get(URI_ROUTES_ID % expected_id, + json=expected_route) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_route, content) + + # Now delete and verify that static route is gone + self._register_local_delete(URI_ROUTES, expected_id) + self._register_local_get_not_found(URI_ROUTES, expected_id) + route_id = csr_client.make_route_id(cidr, interface) + self.csr.delete_static_route(route_id) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)