Imply compute node related Etcd API

Change-Id: Idc979865485d8b644e34d2078ed8a4704681e3db
Closes-Bug: 1691345
This commit is contained in:
wangzhh 2017-07-31 09:09:52 -04:00
parent 7055afa44c
commit c41b899449
5 changed files with 308 additions and 6 deletions

View File

@ -81,19 +81,19 @@ class ComputeNodeTracker(object):
""" """
# No memory and cpu specified, no need to claim resource now. # No memory and cpu specified, no need to claim resource now.
if not (container.memory or container.cpu): if not (container.memory or container.cpu):
self._set_container_host(container) self._set_container_host(context, container)
return claims.NopClaim() return claims.NopClaim()
# We should have the compute node created here, just get it. # We should have the compute node created here, just get it.
self.compute_node = self._get_compute_node(context) self.compute_node = self._get_compute_node(context)
if self.disabled(hostname): if self.disabled(hostname):
self._set_container_host(container) self._set_container_host(context, container)
return claims.NopClaim() return claims.NopClaim()
claim = claims.Claim(context, container, self, self.compute_node, claim = claims.Claim(context, container, self, self.compute_node,
limits=limits) limits=limits)
self._set_container_host(container) self._set_container_host(context, container)
self._update_usage_from_container(container) self._update_usage_from_container(container)
# persist changes to the compute node: # persist changes to the compute node:
self._update(self.compute_node) self._update(self.compute_node)
@ -103,14 +103,14 @@ class ComputeNodeTracker(object):
def disabled(self, hostname): def disabled(self, hostname):
return not self.container_driver.node_is_available(hostname) return not self.container_driver.node_is_available(hostname)
def _set_container_host(self, container): def _set_container_host(self, context, container):
"""Tag the container as belonging to this host. """Tag the container as belonging to this host.
This should be done while the COMPUTE_RESOURCES_SEMAPHORE is held so This should be done while the COMPUTE_RESOURCES_SEMAPHORE is held so
the resource claim will not be lost if the audit process starts. the resource claim will not be lost if the audit process starts.
""" """
container.host = self.host container.host = self.host
container.save() container.save(context)
def _update_usage_from_container(self, container, is_removed=False): def _update_usage_from_container(self, container, is_removed=False):
"""Update usage for a single container.""" """Update usage for a single container."""

View File

@ -219,6 +219,8 @@ class DockerDriver(driver.ContainerDriver):
raise raise
def _cleanup_network_for_container(self, container, network_api): def _cleanup_network_for_container(self, container, network_api):
if not container.addresses:
return
for name in container.addresses: for name in container.addresses:
network_api.disconnect_container_from_network(container, name) network_api.disconnect_container_from_network(container, name)

View File

@ -77,6 +77,8 @@ def translate_etcd_result(etcd_result, model_type):
ret = models.Image(data) ret = models.Image(data)
elif model_type == 'resource_class': elif model_type == 'resource_class':
ret = models.ResourceClass(data) ret = models.ResourceClass(data)
elif model_type == 'compute_node':
ret = models.ComputeNode(data)
else: else:
raise exception.InvalidParameterValue( raise exception.InvalidParameterValue(
_('The model_type value: %s is invalid.'), model_type) _('The model_type value: %s is invalid.'), model_type)
@ -534,3 +536,105 @@ class EtcdAPI(object):
six.text_type(e)) six.text_type(e))
raise raise
return translate_etcd_result(target, 'resource_class') return translate_etcd_result(target, 'resource_class')
def get_compute_node_by_hostname(self, context, hostname):
"""Return a compute node.
:param context: The security context
:param hostname: The hostname of a compute node.
:returns: A compute node.
"""
try:
compute_nodes = self.list_compute_nodes(
context, filters={'hostname': hostname})
if compute_nodes:
return compute_nodes[0]
else:
raise exception.ComputeNodeNotFound(compute_node=hostname)
except Exception as e:
LOG.error('Error occurred while retrieving compute node: %s',
six.text_type(e))
raise
def _get_compute_node_by_uuid(self, context, uuid):
try:
compute_node = None
res = self.client.read('/compute_nodes/' + uuid)
compute_node = translate_etcd_result(res, 'compute_node')
except etcd.EtcdKeyNotFound:
raise exception.ComputeNodeNotFound(compute_node=uuid)
except Exception as e:
LOG.error(
'Error occurred while retriving compute node: %s',
six.text_type(e))
raise
return compute_node
def get_compute_node(self, context, node_uuid):
try:
node = None
res = self.client.read('/compute_nodes/' + node_uuid)
node = translate_etcd_result(res, 'compute_node')
except etcd.EtcdKeyNotFound:
raise exception.ComputeNodeNotFound(compute_node=node_uuid)
except Exception as e:
LOG.error('Error occurred while retrieving zun compute nodes: %s',
six.text_type(e))
raise
return node
@lockutils.synchronized('etcd_computenode')
def update_compute_node(self, context, node_uuid, values):
if 'uuid' in values:
msg = _('Cannot overwrite UUID for an existing node.')
raise exception.InvalidParameterValue(err=msg)
try:
target = self.client.read('/compute_nodes/' + node_uuid)
target_value = json.loads(target.value)
target_value.update(values)
target.value = json.dumps(target_value)
self.client.update(target)
except etcd.EtcdKeyNotFound:
raise exception.ComputeNodeNotFound(compute_node=node_uuid)
except Exception as e:
LOG.error(
'Error occurred while updating compute node: %s',
six.text_type(e))
raise
return translate_etcd_result(target, 'compute_node')
@lockutils.synchronized('etcd_computenode')
def create_compute_node(self, context, values):
values['created_at'] = datetime.isoformat(timeutils.utcnow())
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
compute_node = models.ComputeNode(values)
compute_node.save()
return compute_node
@lockutils.synchronized('etcd_compute_node')
def destroy_compute_node(self, context, node_uuid):
compute_node = self._get_compute_node_by_uuid(context, node_uuid)
self.client.delete('/compute_nodes/' + compute_node.uuid)
def list_compute_nodes(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
try:
res = getattr(self.client.read('/compute_nodes'), 'children', None)
except etcd.EtcdKeyNotFound:
return []
except Exception as e:
LOG.error(
"Error occurred while reading from etcd server: %s",
six.text_type(e))
raise
compute_nodes = []
for c in res:
if c.value is not None:
compute_nodes.append(translate_etcd_result(c, 'compute_node'))
if filters:
compute_nodes = self._filter_resources(compute_nodes, filters)
return self._process_list_result(compute_nodes, limit=limit,
sort_key=sort_key)

View File

@ -200,3 +200,46 @@ class Capsule(Base):
@classmethod @classmethod
def fields(cls): def fields(cls):
return cls._fields return cls._fields
class ComputeNode(Base):
"""Represents a compute node. """
_path = '/compute_nodes'
_fields = objects.ComputeNode.fields.keys()
def __init__(self, compute_node_data):
self.path = ComputeNode.path()
for f in ComputeNode.fields():
setattr(self, f, None)
self.cpus = 0
self.cpu_used = 0
self.mem_used = 0
self.mem_total = 0
self.mem_free = 0
self.mem_available = 0
self.total_containers = 0
self.stopped_containers = 0
self.paused_containers = 0
self.running_containers = 0
self.update(compute_node_data)
@classmethod
def path(cls):
return cls._path
@classmethod
def fields(cls):
return cls._fields
def save(self, session=None):
if session is None:
session = db.api.get_connection()
client = session.client
path = self.etcd_path(self.uuid)
if self.path_already_exist(client, path):
raise exception.ComputeNodeAlreadyExists(
field='UUID', value=self.uuid)
client.write(path, json.dump_as_bytes(self.as_dict()))
return

View File

@ -11,16 +11,21 @@
# under the License. # under the License.
"""Tests for manipulating compute nodes via the DB API""" """Tests for manipulating compute nodes via the DB API"""
import json
import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import uuidutils from oslo_utils import uuidutils
import six import six
import etcd
from etcd import Client as etcd_client
from zun.common import exception from zun.common import exception
import zun.conf import zun.conf
from zun.db import api as dbapi from zun.db import api as dbapi
from zun.tests.unit.db import base from zun.tests.unit.db import base
from zun.tests.unit.db import utils from zun.tests.unit.db import utils
from zun.tests.unit.db.utils import FakeEtcdMultipleResult
from zun.tests.unit.db.utils import FakeEtcdResult
CONF = zun.conf.CONF CONF = zun.conf.CONF
@ -159,3 +164,151 @@ class DbComputeNodeTestCase(base.DbTestCase):
self.assertRaises(exception.InvalidParameterValue, self.assertRaises(exception.InvalidParameterValue,
dbapi.update_compute_node, self.context, dbapi.update_compute_node, self.context,
node.uuid, {'uuid': ''}) node.uuid, {'uuid': ''})
class EtcdDbComputeNodeTestCase(base.DbTestCase):
def setUp(self):
cfg.CONF.set_override('db_type', 'etcd')
super(EtcdDbComputeNodeTestCase, self).setUp()
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_create_compute_node(self, mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
utils.create_test_compute_node(context=self.context)
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_create_compute_node_already_exists(self, mock_write,
mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
utils.create_test_compute_node(context=self.context, hostname='123')
mock_read.side_effect = lambda *args: None
self.assertRaises(exception.ResourceExists,
utils.create_test_compute_node,
context=self.context, hostname='123')
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_get_compute_node_by_uuid(self, mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
compute_node = utils.create_test_compute_node(
context=self.context)
mock_read.side_effect = lambda *args: FakeEtcdResult(
compute_node.as_dict())
res = dbapi.get_compute_node(self.context, compute_node.uuid)
self.assertEqual(compute_node.uuid, res.uuid)
self.assertEqual(compute_node.hostname, res.hostname)
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_get_compute_node_by_name(self, mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
node = utils.create_test_compute_node(context=self.context)
mock_read.side_effect = lambda *args: FakeEtcdResult(
node.as_dict())
res = dbapi.get_compute_node(self.context, node.hostname)
self.assertEqual(node.uuid, res.uuid)
@mock.patch.object(etcd_client, 'read')
def test_get_compute_node_that_does_not_exist(self, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
self.assertRaises(exception.ComputeNodeNotFound,
dbapi.get_compute_node,
self.context, 'fake-ident')
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_list_compute_nodes(self, mock_write, mock_read):
hostnames = []
compute_nodes = []
mock_read.side_effect = etcd.EtcdKeyNotFound
for i in range(1, 6):
res_class = utils.create_test_compute_node(
context=self.context, hostname='class'+str(i))
compute_nodes.append(res_class.as_dict())
hostnames.append(six.text_type(res_class['hostname']))
mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
compute_nodes)
res = dbapi.list_compute_nodes(self.context)
res_names = [r.hostname for r in res]
self.assertEqual(sorted(hostnames), sorted(res_names))
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_list_compute_nodes_sorted(self, mock_write, mock_read):
hostnames = []
compute_nodes = []
mock_read.side_effect = etcd.EtcdKeyNotFound
for i in range(1, 6):
res_class = utils.create_test_compute_node(
context=self.context, hostname='class'+str(i))
compute_nodes.append(res_class.as_dict())
hostnames.append(six.text_type(res_class['hostname']))
mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
compute_nodes)
res = dbapi.list_compute_nodes(self.context, sort_key='hostname')
res_names = [r.hostname for r in res]
self.assertEqual(sorted(hostnames), res_names)
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
@mock.patch.object(etcd_client, 'delete')
def test_destroy_compute_node(self, mock_delete,
mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
compute_node = utils.create_test_compute_node(
context=self.context)
mock_read.side_effect = lambda *args: FakeEtcdResult(
compute_node.as_dict())
dbapi.destroy_compute_node(self.context, compute_node.uuid)
mock_delete.assert_called_once_with(
'/compute_nodes/%s' % compute_node.uuid)
@mock.patch.object(etcd_client, 'read')
def test_destroy_compute_node_that_does_not_exist(self, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
self.assertRaises(exception.ComputeNodeNotFound,
dbapi.destroy_compute_node,
self.context,
'ca3e2a25-2901-438d-8157-de7ffd68d535')
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
@mock.patch.object(etcd_client, 'update')
def test_update_compute_node(self, mock_update,
mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
compute_node = utils.create_test_compute_node(
context=self.context)
old_name = compute_node.hostname
new_name = 'new-name'
self.assertNotEqual(old_name, new_name)
mock_read.side_effect = lambda *args: FakeEtcdResult(
compute_node.as_dict())
dbapi.update_compute_node(
self.context, compute_node.uuid, {'hostname': new_name})
self.assertEqual(new_name, json.loads(
mock_update.call_args_list[0][0][0].value)['hostname'])
@mock.patch.object(etcd_client, 'read')
def test_update_compute_node_not_found(self, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
new_name = 'new-name'
self.assertRaises(exception.ComputeNodeNotFound,
dbapi.update_compute_node,
self.context,
'ca3e2a25-2901-438d-8157-de7ffd68d535',
{'hostname': new_name})
@mock.patch.object(etcd_client, 'read')
@mock.patch.object(etcd_client, 'write')
def test_update_compute_node_uuid(self, mock_write, mock_read):
mock_read.side_effect = etcd.EtcdKeyNotFound
compute_node = utils.create_test_compute_node(
context=self.context)
self.assertRaises(exception.InvalidParameterValue,
dbapi.update_compute_node,
self.context, compute_node.uuid,
{'uuid': ''})