diff --git a/zun/compute/compute_node_tracker.py b/zun/compute/compute_node_tracker.py index bb4a822ec..80c750899 100644 --- a/zun/compute/compute_node_tracker.py +++ b/zun/compute/compute_node_tracker.py @@ -81,19 +81,19 @@ class ComputeNodeTracker(object): """ # No memory and cpu specified, no need to claim resource now. if not (container.memory or container.cpu): - self._set_container_host(container) + self._set_container_host(context, container) return claims.NopClaim() # We should have the compute node created here, just get it. self.compute_node = self._get_compute_node(context) if self.disabled(hostname): - self._set_container_host(container) + self._set_container_host(context, container) return claims.NopClaim() claim = claims.Claim(context, container, self, self.compute_node, limits=limits) - self._set_container_host(container) + self._set_container_host(context, container) self._update_usage_from_container(container) # persist changes to the compute node: self._update(self.compute_node) @@ -103,14 +103,14 @@ class ComputeNodeTracker(object): def disabled(self, hostname): return not self.container_driver.node_is_available(hostname) - def _set_container_host(self, container): + def _set_container_host(self, context, container): """Tag the container as belonging to this host. This should be done while the COMPUTE_RESOURCES_SEMAPHORE is held so the resource claim will not be lost if the audit process starts. """ container.host = self.host - container.save() + container.save(context) def _update_usage_from_container(self, container, is_removed=False): """Update usage for a single container.""" diff --git a/zun/container/docker/driver.py b/zun/container/docker/driver.py index e11f353e4..e2dd22b1c 100644 --- a/zun/container/docker/driver.py +++ b/zun/container/docker/driver.py @@ -219,6 +219,8 @@ class DockerDriver(driver.ContainerDriver): raise def _cleanup_network_for_container(self, container, network_api): + if not container.addresses: + return for name in container.addresses: network_api.disconnect_container_from_network(container, name) diff --git a/zun/db/etcd/api.py b/zun/db/etcd/api.py index 392a6a995..da5ae6172 100644 --- a/zun/db/etcd/api.py +++ b/zun/db/etcd/api.py @@ -77,6 +77,8 @@ def translate_etcd_result(etcd_result, model_type): ret = models.Image(data) elif model_type == 'resource_class': ret = models.ResourceClass(data) + elif model_type == 'compute_node': + ret = models.ComputeNode(data) else: raise exception.InvalidParameterValue( _('The model_type value: %s is invalid.'), model_type) @@ -534,3 +536,105 @@ class EtcdAPI(object): six.text_type(e)) raise return translate_etcd_result(target, 'resource_class') + + def get_compute_node_by_hostname(self, context, hostname): + """Return a compute node. + + :param context: The security context + :param hostname: The hostname of a compute node. + :returns: A compute node. + """ + try: + compute_nodes = self.list_compute_nodes( + context, filters={'hostname': hostname}) + if compute_nodes: + return compute_nodes[0] + else: + raise exception.ComputeNodeNotFound(compute_node=hostname) + except Exception as e: + LOG.error('Error occurred while retrieving compute node: %s', + six.text_type(e)) + raise + + def _get_compute_node_by_uuid(self, context, uuid): + try: + compute_node = None + res = self.client.read('/compute_nodes/' + uuid) + compute_node = translate_etcd_result(res, 'compute_node') + except etcd.EtcdKeyNotFound: + raise exception.ComputeNodeNotFound(compute_node=uuid) + except Exception as e: + LOG.error( + 'Error occurred while retriving compute node: %s', + six.text_type(e)) + raise + return compute_node + + def get_compute_node(self, context, node_uuid): + try: + node = None + res = self.client.read('/compute_nodes/' + node_uuid) + node = translate_etcd_result(res, 'compute_node') + except etcd.EtcdKeyNotFound: + raise exception.ComputeNodeNotFound(compute_node=node_uuid) + except Exception as e: + LOG.error('Error occurred while retrieving zun compute nodes: %s', + six.text_type(e)) + raise + return node + + @lockutils.synchronized('etcd_computenode') + def update_compute_node(self, context, node_uuid, values): + if 'uuid' in values: + msg = _('Cannot overwrite UUID for an existing node.') + raise exception.InvalidParameterValue(err=msg) + + try: + target = self.client.read('/compute_nodes/' + node_uuid) + target_value = json.loads(target.value) + target_value.update(values) + target.value = json.dumps(target_value) + self.client.update(target) + except etcd.EtcdKeyNotFound: + raise exception.ComputeNodeNotFound(compute_node=node_uuid) + except Exception as e: + LOG.error( + 'Error occurred while updating compute node: %s', + six.text_type(e)) + raise + return translate_etcd_result(target, 'compute_node') + + @lockutils.synchronized('etcd_computenode') + def create_compute_node(self, context, values): + values['created_at'] = datetime.isoformat(timeutils.utcnow()) + if not values.get('uuid'): + values['uuid'] = uuidutils.generate_uuid() + compute_node = models.ComputeNode(values) + compute_node.save() + return compute_node + + @lockutils.synchronized('etcd_compute_node') + def destroy_compute_node(self, context, node_uuid): + compute_node = self._get_compute_node_by_uuid(context, node_uuid) + self.client.delete('/compute_nodes/' + compute_node.uuid) + + def list_compute_nodes(self, context, filters=None, limit=None, + marker=None, sort_key=None, sort_dir=None): + try: + res = getattr(self.client.read('/compute_nodes'), 'children', None) + except etcd.EtcdKeyNotFound: + return [] + except Exception as e: + LOG.error( + "Error occurred while reading from etcd server: %s", + six.text_type(e)) + raise + + compute_nodes = [] + for c in res: + if c.value is not None: + compute_nodes.append(translate_etcd_result(c, 'compute_node')) + if filters: + compute_nodes = self._filter_resources(compute_nodes, filters) + return self._process_list_result(compute_nodes, limit=limit, + sort_key=sort_key) diff --git a/zun/db/etcd/models.py b/zun/db/etcd/models.py index df24426d7..c5e6247b7 100644 --- a/zun/db/etcd/models.py +++ b/zun/db/etcd/models.py @@ -200,3 +200,46 @@ class Capsule(Base): @classmethod def fields(cls): return cls._fields + + +class ComputeNode(Base): + """Represents a compute node. """ + _path = '/compute_nodes' + + _fields = objects.ComputeNode.fields.keys() + + def __init__(self, compute_node_data): + self.path = ComputeNode.path() + for f in ComputeNode.fields(): + setattr(self, f, None) + self.cpus = 0 + self.cpu_used = 0 + self.mem_used = 0 + self.mem_total = 0 + self.mem_free = 0 + self.mem_available = 0 + self.total_containers = 0 + self.stopped_containers = 0 + self.paused_containers = 0 + self.running_containers = 0 + self.update(compute_node_data) + + @classmethod + def path(cls): + return cls._path + + @classmethod + def fields(cls): + return cls._fields + + def save(self, session=None): + if session is None: + session = db.api.get_connection() + client = session.client + path = self.etcd_path(self.uuid) + if self.path_already_exist(client, path): + raise exception.ComputeNodeAlreadyExists( + field='UUID', value=self.uuid) + + client.write(path, json.dump_as_bytes(self.as_dict())) + return diff --git a/zun/tests/unit/db/test_compute_host.py b/zun/tests/unit/db/test_compute_host.py index 443824294..f341dc51a 100644 --- a/zun/tests/unit/db/test_compute_host.py +++ b/zun/tests/unit/db/test_compute_host.py @@ -11,16 +11,21 @@ # under the License. """Tests for manipulating compute nodes via the DB API""" - +import json +import mock from oslo_config import cfg from oslo_utils import uuidutils import six +import etcd +from etcd import Client as etcd_client from zun.common import exception import zun.conf from zun.db import api as dbapi from zun.tests.unit.db import base from zun.tests.unit.db import utils +from zun.tests.unit.db.utils import FakeEtcdMultipleResult +from zun.tests.unit.db.utils import FakeEtcdResult CONF = zun.conf.CONF @@ -159,3 +164,151 @@ class DbComputeNodeTestCase(base.DbTestCase): self.assertRaises(exception.InvalidParameterValue, dbapi.update_compute_node, self.context, node.uuid, {'uuid': ''}) + + +class EtcdDbComputeNodeTestCase(base.DbTestCase): + + def setUp(self): + cfg.CONF.set_override('db_type', 'etcd') + super(EtcdDbComputeNodeTestCase, self).setUp() + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_create_compute_node(self, mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + utils.create_test_compute_node(context=self.context) + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_create_compute_node_already_exists(self, mock_write, + mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + utils.create_test_compute_node(context=self.context, hostname='123') + mock_read.side_effect = lambda *args: None + self.assertRaises(exception.ResourceExists, + utils.create_test_compute_node, + context=self.context, hostname='123') + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_get_compute_node_by_uuid(self, mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + compute_node = utils.create_test_compute_node( + context=self.context) + mock_read.side_effect = lambda *args: FakeEtcdResult( + compute_node.as_dict()) + res = dbapi.get_compute_node(self.context, compute_node.uuid) + self.assertEqual(compute_node.uuid, res.uuid) + self.assertEqual(compute_node.hostname, res.hostname) + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_get_compute_node_by_name(self, mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + node = utils.create_test_compute_node(context=self.context) + mock_read.side_effect = lambda *args: FakeEtcdResult( + node.as_dict()) + res = dbapi.get_compute_node(self.context, node.hostname) + self.assertEqual(node.uuid, res.uuid) + + @mock.patch.object(etcd_client, 'read') + def test_get_compute_node_that_does_not_exist(self, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + self.assertRaises(exception.ComputeNodeNotFound, + dbapi.get_compute_node, + self.context, 'fake-ident') + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_list_compute_nodes(self, mock_write, mock_read): + hostnames = [] + compute_nodes = [] + mock_read.side_effect = etcd.EtcdKeyNotFound + for i in range(1, 6): + res_class = utils.create_test_compute_node( + context=self.context, hostname='class'+str(i)) + compute_nodes.append(res_class.as_dict()) + hostnames.append(six.text_type(res_class['hostname'])) + mock_read.side_effect = lambda *args: FakeEtcdMultipleResult( + compute_nodes) + res = dbapi.list_compute_nodes(self.context) + res_names = [r.hostname for r in res] + self.assertEqual(sorted(hostnames), sorted(res_names)) + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_list_compute_nodes_sorted(self, mock_write, mock_read): + hostnames = [] + compute_nodes = [] + mock_read.side_effect = etcd.EtcdKeyNotFound + for i in range(1, 6): + res_class = utils.create_test_compute_node( + context=self.context, hostname='class'+str(i)) + compute_nodes.append(res_class.as_dict()) + hostnames.append(six.text_type(res_class['hostname'])) + mock_read.side_effect = lambda *args: FakeEtcdMultipleResult( + compute_nodes) + res = dbapi.list_compute_nodes(self.context, sort_key='hostname') + res_names = [r.hostname for r in res] + self.assertEqual(sorted(hostnames), res_names) + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + @mock.patch.object(etcd_client, 'delete') + def test_destroy_compute_node(self, mock_delete, + mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + compute_node = utils.create_test_compute_node( + context=self.context) + mock_read.side_effect = lambda *args: FakeEtcdResult( + compute_node.as_dict()) + dbapi.destroy_compute_node(self.context, compute_node.uuid) + mock_delete.assert_called_once_with( + '/compute_nodes/%s' % compute_node.uuid) + + @mock.patch.object(etcd_client, 'read') + def test_destroy_compute_node_that_does_not_exist(self, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + self.assertRaises(exception.ComputeNodeNotFound, + dbapi.destroy_compute_node, + self.context, + 'ca3e2a25-2901-438d-8157-de7ffd68d535') + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + @mock.patch.object(etcd_client, 'update') + def test_update_compute_node(self, mock_update, + mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + compute_node = utils.create_test_compute_node( + context=self.context) + old_name = compute_node.hostname + new_name = 'new-name' + self.assertNotEqual(old_name, new_name) + mock_read.side_effect = lambda *args: FakeEtcdResult( + compute_node.as_dict()) + dbapi.update_compute_node( + self.context, compute_node.uuid, {'hostname': new_name}) + self.assertEqual(new_name, json.loads( + mock_update.call_args_list[0][0][0].value)['hostname']) + + @mock.patch.object(etcd_client, 'read') + def test_update_compute_node_not_found(self, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + new_name = 'new-name' + self.assertRaises(exception.ComputeNodeNotFound, + dbapi.update_compute_node, + self.context, + 'ca3e2a25-2901-438d-8157-de7ffd68d535', + {'hostname': new_name}) + + @mock.patch.object(etcd_client, 'read') + @mock.patch.object(etcd_client, 'write') + def test_update_compute_node_uuid(self, mock_write, mock_read): + mock_read.side_effect = etcd.EtcdKeyNotFound + compute_node = utils.create_test_compute_node( + context=self.context) + self.assertRaises(exception.InvalidParameterValue, + dbapi.update_compute_node, + self.context, compute_node.uuid, + {'uuid': ''})