Merge "Support to get resource by id"

This commit is contained in:
Jenkins 2017-08-29 14:17:27 +00:00 committed by Gerrit Code Review
commit de90e2bdbe
17 changed files with 385 additions and 1 deletions

View File

@ -2667,6 +2667,20 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_networks, name_or_id, filters)
def get_network_by_id(self, id):
""" Get a network by ID
:param id: ID of the network.
:returns: A network ``munch.Munch``.
"""
data = self._network_client.get(
'/networks/{id}'.format(id=id),
error_message="Error getting network with ID {id}".format(id=id)
)
network = self._get_and_munchify('network', data)
return network
def get_router(self, name_or_id, filters=None):
"""Get a router by name or ID.
@ -2713,6 +2727,20 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_subnets, name_or_id, filters)
def get_subnet_by_id(self, id):
""" Get a subnet by ID
:param id: ID of the subnet.
:returns: A subnet ``munch.Munch``.
"""
data = self._network_client.get(
'/subnets/{id}'.format(id=id),
error_message="Error getting subnet with ID {id}".format(id=id)
)
subnet = self._get_and_munchify('subnet', data)
return subnet
def get_port(self, name_or_id, filters=None):
"""Get a port by name or ID.
@ -2737,6 +2765,20 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_ports, name_or_id, filters)
def get_port_by_id(self, id):
""" Get a port by ID
:param id: ID of the port.
:returns: A port ``munch.Munch``.
"""
data = self._network_client.get(
'/ports/{id}'.format(id=id),
error_message="Error getting port with ID {id}".format(id=id)
)
port = self._get_and_munchify('port', data)
return port
def get_qos_policy(self, name_or_id, filters=None):
"""Get a QoS policy by name or ID.
@ -2788,6 +2830,21 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_volumes, name_or_id, filters)
def get_volume_by_id(self, id):
""" Get a volume by ID
:param id: ID of the volume.
:returns: A volume ``munch.Munch``.
"""
data = self._volume_client.get(
'/volumes/{id}'.format(id=id),
error_message="Error getting volume with ID {id}".format(id=id)
)
volume = self._normalize_volume(
self._get_and_munchify('volume', data))
return volume
def get_volume_type(self, name_or_id, filters=None):
"""Get a volume type by name or ID.
@ -2844,6 +2901,42 @@ class OpenStackCloud(
self.search_flavors, get_extra=get_extra)
return _utils._get_entity(search_func, name_or_id, filters)
def get_flavor_by_id(self, id, get_extra=True):
""" Get a flavor by ID
:param id: ID of the flavor.
:param get_extra:
Whether or not the list_flavors call should get the extra flavor
specs.
:returns: A flavor ``munch.Munch``.
"""
data = self._compute_client.get(
'/flavors/{id}'.format(id=id),
error_message="Error getting flavor with ID {id}".format(id=id)
)
flavor = self._normalize_flavor(
self._get_and_munchify('flavor', data))
if get_extra is None:
get_extra = self._extra_config['get_flavor_extra_specs']
if not flavor.extra_specs and get_extra:
endpoint = "/flavors/{id}/os-extra_specs".format(
id=flavor.id)
try:
data = self._compute_client.get(
endpoint,
error_message="Error fetching flavor extra specs")
flavor.extra_specs = self._get_and_munchify(
'extra_specs', data)
except OpenStackCloudHTTPError as e:
flavor.extra_specs = {}
self.log.debug(
'Fetching extra specs for flavor failed:'
' %(msg)s', {'msg': str(e)})
return flavor
def get_security_group(self, name_or_id, filters=None):
"""Get a security group by name or ID.
@ -2870,6 +2963,29 @@ class OpenStackCloud(
return _utils._get_entity(
self.search_security_groups, name_or_id, filters)
def get_security_group_by_id(self, id):
""" Get a security group by ID
:param id: ID of the security group.
:returns: A security group ``munch.Munch``.
"""
if not self._has_secgroups():
raise OpenStackCloudUnavailableFeature(
"Unavailable feature: security groups"
)
error_message = ("Error getting security group with"
" ID {id}".format(id=id))
if self._use_neutron_secgroups():
data = self._network_client.get(
'/security-groups/{id}'.format(id=id),
error_message=error_message)
else:
data = self._compute_client.get(
'/os-security-groups/{id}'.format(id=id),
error_message=error_message)
return self._normalize_secgroup(
self._get_and_munchify('security_group', data))
def get_server_console(self, server, length=None):
"""Get the console log for a server.
@ -2998,6 +3114,22 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_images, name_or_id, filters)
def get_image_by_id(self, id):
""" Get a image by ID
:param id: ID of the image.
:returns: An image ``munch.Munch``.
"""
data = self._image_client.get(
'/images/{id}'.format(id=id),
error_message="Error getting image with ID {id}".format(id=id)
)
key = 'image' if 'image' in data else None
image = self._normalize_image(
self._get_and_munchify(key, data))
return image
def download_image(
self, name_or_id, output_path=None, output_file=None,
chunk_size=1024):
@ -3073,6 +3205,27 @@ class OpenStackCloud(
"""
return _utils._get_entity(self.search_floating_ips, id, filters)
def get_floating_ip_by_id(self, id):
""" Get a floating ip by ID
:param id: ID of the floating ip.
:returns: A floating ip ``munch.Munch``.
"""
error_message = "Error getting floating ip with ID {id}".format(id=id)
if self._use_neutron_floating():
data = self._network_client.get(
'/floatingips/{id}'.format(id=id),
error_message=error_message)
return self._normalize_floating_ip(
self._get_and_munchify('floatingip', data))
else:
data = self._compute_client.get(
'/os-floating-ips/{id}'.format(id=id),
error_message=error_message)
return self._normalize_floating_ip(
self._get_and_munchify('floating_ip', data))
def get_stack(self, name_or_id, filters=None):
"""Get exactly one stack.

View File

@ -165,7 +165,7 @@ class TestFlavor(base.BaseFunctionalTestCase):
# Unset the 'foo' value
self.operator_cloud.unset_flavor_specs(mod_flavor['id'], ['foo'])
mod_flavor = self.operator_cloud.get_flavor(new_flavor['id'])
mod_flavor = self.operator_cloud.get_flavor_by_id(new_flavor['id'])
# Verify 'foo' is unset and 'bar' is still set
self.assertEqual({'bar': 'bbb'}, mod_flavor['extra_specs'])

View File

@ -275,3 +275,10 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
[fip.id for fip in self.user_cloud.search_floating_ips(
filters={"attached": True})]
)
def test_get_floating_ip_by_id(self):
fip_user = self.user_cloud.create_floating_ip()
self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
ret_fip = self.user_cloud.get_floating_ip_by_id(fip_user.id)
self.assertEqual(fip_user, ret_fip)

View File

@ -148,3 +148,23 @@ class TestImage(base.BaseFunctionalTestCase):
self.assertEqual(image.properties['foo'], 'bar')
finally:
self.user_cloud.delete_image(image_name, wait=True)
def test_get_image_by_id(self):
test_image = tempfile.NamedTemporaryFile(delete=False)
test_image.write(b'\0' * 1024 * 1024)
test_image.close()
image_name = self.getUniqueString('image')
try:
image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
container_format='bare',
min_disk=10,
min_ram=1024,
wait=True)
image = self.user_cloud.get_image_by_id(image.id)
self.assertEqual(image_name, image.name)
self.assertEqual('raw', image.disk_format)
finally:
self.user_cloud.delete_image(image_name, wait=True)

View File

@ -50,6 +50,21 @@ class TestNetwork(base.BaseFunctionalTestCase):
self.assertFalse(net1['router:external'])
self.assertTrue(net1['admin_state_up'])
def test_get_network_by_id(self):
net1 = self.operator_cloud.create_network(name=self.network_name)
self.assertIn('id', net1)
self.assertEqual(self.network_name, net1['name'])
self.assertFalse(net1['shared'])
self.assertFalse(net1['router:external'])
self.assertTrue(net1['admin_state_up'])
ret_net1 = self.operator_cloud.get_network_by_id(net1.id)
self.assertIn('id', ret_net1)
self.assertEqual(self.network_name, ret_net1['name'])
self.assertFalse(ret_net1['shared'])
self.assertFalse(ret_net1['router:external'])
self.assertTrue(ret_net1['admin_state_up'])
def test_create_network_advanced(self):
net1 = self.operator_cloud.create_network(
name=self.network_name,

View File

@ -91,6 +91,25 @@ class TestPort(base.BaseFunctionalTestCase):
del updated_port['extra_dhcp_opts']
self.assertEqual(port, updated_port)
def test_get_port_by_id(self):
port_name = self.new_port_name + '_get_by_id'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
port = self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
self.assertIsInstance(port, dict)
self.assertIn('id', port)
self.assertEqual(port.get('name'), port_name)
updated_port = self.operator_cloud.get_port_by_id(port['id'])
# extra_dhcp_opts is added later by Neutron...
if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port:
del updated_port['extra_dhcp_opts']
self.assertEqual(port, updated_port)
def test_update_port(self):
port_name = self.new_port_name + '_update'
new_port_name = port_name + '_new'

View File

@ -50,3 +50,10 @@ class TestSecurityGroups(base.BaseFunctionalTestCase):
sg_list = self.operator_cloud.list_security_groups(
filters={'all_tenants': 1})
self.assertIn(sg1['id'], [sg['id'] for sg in sg_list])
def test_get_security_group_by_id(self):
sg = self.user_cloud.create_security_group(name='sg', description='sg')
self.addCleanup(self.user_cloud.delete_security_group, sg['id'])
ret_sg = self.user_cloud.get_security_group_by_id(sg['id'])
self.assertEqual(sg, ret_sg)

View File

@ -47,6 +47,9 @@ class TestVolume(base.BaseFunctionalTestCase):
display_name=snapshot_name
)
ret_volume = self.user_cloud.get_volume_by_id(volume['id'])
self.assertEqual(volume['id'], ret_volume['id'])
volume_ids = [v['id'] for v in self.user_cloud.list_volumes()]
self.assertIn(volume['id'], volume_ids)

View File

@ -236,3 +236,23 @@ class TestFlavors(base.RequestsMockTestCase):
])
self.op_cloud.list_flavor_access('vanilla')
self.assert_calls()
def test_get_flavor_by_id(self):
flavor_uri = '{endpoint}/flavors/1'.format(
endpoint=fakes.COMPUTE_ENDPOINT)
flavor_extra_uri = '{endpoint}/flavors/1/os-extra_specs'.format(
endpoint=fakes.COMPUTE_ENDPOINT)
flavor_json = {'flavor': fakes.make_fake_flavor('1', 'vanilla')}
flavor_extra_json = {'extra_specs': {'name': 'test'}}
self.register_uris([
dict(method='GET', uri=flavor_uri, json=flavor_json),
dict(method='GET', uri=flavor_extra_uri, json=flavor_extra_json),
])
flavor1 = self.cloud.get_flavor_by_id('1')
self.assertEqual('1', flavor1['id'])
self.assertEqual({'name': 'test'}, flavor1.extra_specs)
flavor2 = self.cloud.get_flavor_by_id('1', get_extra=False)
self.assertEqual('1', flavor2['id'])
self.assertEqual({}, flavor2.extra_specs)

View File

@ -235,6 +235,29 @@ class TestFloatingIP(base.RequestsMockTestCase):
self.assertIsNone(floating_ip)
self.assert_calls()
def test_get_floating_ip_by_id(self):
fid = self.mock_floating_ip_new_rep['floatingip']['id']
self.register_uris([
dict(method='GET',
uri='https://network.example.com/v2.0/floatingips/'
'{id}'.format(id=fid),
json=self.mock_floating_ip_new_rep)])
floating_ip = self.cloud.get_floating_ip_by_id(id=fid)
self.assertIsInstance(floating_ip, dict)
self.assertEqual('172.24.4.229', floating_ip['floating_ip_address'])
self.assertEqual(
self.mock_floating_ip_new_rep['floatingip']['tenant_id'],
floating_ip['project_id']
)
self.assertEqual(
self.mock_floating_ip_new_rep['floatingip']['tenant_id'],
floating_ip['tenant_id']
)
self.assertIn('location', floating_ip)
self.assert_calls()
def test_create_floating_ip(self):
self.register_uris([
dict(method='GET',

View File

@ -142,6 +142,20 @@ class TestFloatingIP(base.RequestsMockTestCase):
self.assert_calls()
def test_get_floating_ip_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url('compute', append=['os-floating-ips',
'1']),
json={'floating_ip': self.mock_floating_ip_list_rep[0]}),
])
floating_ip = self.cloud.get_floating_ip_by_id(id='1')
self.assertIsInstance(floating_ip, dict)
self.assertEqual('203.0.113.1', floating_ip['floating_ip_address'])
self.assert_calls()
def test_create_floating_ip(self):
self.register_uris([
dict(method='POST',

View File

@ -672,6 +672,18 @@ class TestImage(BaseTestImage):
self.assertEqual(
self._munch_images(ret), self.cloud.list_images())
def test_get_image_by_id(self):
self.register_uris([
dict(method='GET',
uri='https://image.example.com/v2/images/{id}'.format(
id=self.image_id),
json=self.fake_image_dict)
])
self.assertEqual(
self.cloud._normalize_image(self.fake_image_dict),
self.cloud.get_image_by_id(self.image_id))
self.assert_calls()
class TestImageV1Only(base.RequestsMockTestCase):

View File

@ -234,3 +234,17 @@ class TestNetwork(base.RequestsMockTestCase):
self.assertRaises(shade.OpenStackCloudException,
self.cloud.delete_network, network_name)
self.assert_calls()
def test_get_network_by_id(self):
network_id = "test-net-id"
network_name = "network"
network = {'id': network_id, 'name': network_name}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks', "%s" % network_id]),
json={'network': network})
])
self.assertTrue(self.cloud.get_network_by_id(network_id))
self.assert_calls()

View File

@ -346,3 +346,18 @@ class TestPort(base.RequestsMockTestCase):
])
self.assertTrue(self.cloud.delete_port(name_or_id=port1['id']))
self.assert_calls()
def test_get_port_by_id(self):
fake_port = dict(id='123', name='456')
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0',
'ports',
fake_port['id']]),
json={'port': fake_port})
])
r = self.cloud.get_port_by_id(fake_port['id'])
self.assertIsNotNone(r)
self.assertDictEqual(fake_port, r)
self.assert_calls()

View File

@ -749,3 +749,36 @@ class TestSecurityGroups(base.RequestsMockTestCase):
self.assertFalse(ret)
self.assert_calls()
def test_get_security_group_by_id_neutron(self):
self.cloud.secgroup_source = 'neutron'
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0',
'security-groups',
neutron_grp_dict['id']]),
json={'security_group': neutron_grp_dict})
])
ret_sg = self.cloud.get_security_group_by_id(neutron_grp_dict['id'])
self.assertEqual(neutron_grp_dict['id'], ret_sg['id'])
self.assertEqual(neutron_grp_dict['name'], ret_sg['name'])
self.assertEqual(neutron_grp_dict['description'],
ret_sg['description'])
self.assert_calls()
def test_get_security_group_by_id_nova(self):
self.register_uris([
dict(method='GET',
uri='{endpoint}/os-security-groups/{id}'.format(
endpoint=fakes.COMPUTE_ENDPOINT,
id=nova_grp_dict['id']),
json={'security_group': nova_grp_dict}),
])
self.cloud.secgroup_source = 'nova'
self.has_neutron = False
ret_sg = self.cloud.get_security_group_by_id(nova_grp_dict['id'])
self.assertEqual(nova_grp_dict['id'], ret_sg['id'])
self.assertEqual(nova_grp_dict['name'], ret_sg['name'])
self.assert_calls()

View File

@ -69,6 +69,20 @@ class TestSubnet(base.RequestsMockTestCase):
self.assertDictEqual(self.mock_subnet_rep, r)
self.assert_calls()
def test_get_subnet_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0',
'subnets',
self.subnet_id]),
json={'subnet': self.mock_subnet_rep})
])
r = self.cloud.get_subnet_by_id(self.subnet_id)
self.assertIsNotNone(r)
self.assertDictEqual(self.mock_subnet_rep, r)
self.assert_calls()
def test_create_subnet(self):
pool = [{'start': '192.168.199.2', 'end': '192.168.199.254'}]
dns = ['8.8.8.8']

View File

@ -433,3 +433,18 @@ class TestVolume(base.RequestsMockTestCase):
[self.cloud._normalize_volume(vol1)],
self.cloud.list_volumes())
self.assert_calls()
def test_get_volume_by_id(self):
vol1 = meta.obj_to_munch(fakes.FakeVolume('01', 'available', 'vol1'))
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'volumev2', 'public',
append=['volumes', '01']),
json={'volume': vol1}
)
])
self.assertEqual(
self.cloud._normalize_volume(vol1),
self.cloud.get_volume_by_id('01'))
self.assert_calls()