From 6ebf9b65a78bef10051f0191ed4021e08750c678 Mon Sep 17 00:00:00 2001 From: Hongbin Lu Date: Sat, 4 Jan 2020 23:40:28 +0000 Subject: [PATCH] Introduce a CRI driver Implements: blueprint add-support-cri-runtime Change-Id: Ia24e3aafd72ca6250c5c61f62352beb569aa7423 --- setup.cfg | 4 + zun/common/consts.py | 1 + zun/compute/compute_node_tracker.py | 3 +- zun/conf/container_driver.py | 1 + zun/container/cri/__init__.py | 0 zun/container/cri/driver.py | 299 ++++++++++++++++++++++++++++ zun/network/os_vif_util.py | 243 ++++++++++++++++++++++ 7 files changed, 550 insertions(+), 1 deletion(-) create mode 100644 zun/container/cri/__init__.py create mode 100644 zun/container/cri/driver.py create mode 100644 zun/network/os_vif_util.py diff --git a/setup.cfg b/setup.cfg index b0559e5c6..a06f3d9cf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -74,6 +74,7 @@ zun.container.driver = zun.capsule.driver = docker = zun.container.docker.driver:DockerDriver + cri = zun.container.cri.driver:CriDriver fake = zun.tests.unit.container.fake_driver:FakeDriver zun.image.driver = @@ -91,6 +92,9 @@ zun.cni.binding = VIFBridge = zun.cni.binding.bridge:BridgeDriver VIFOpenVSwitch = zun.cni.binding.bridge:VIFOpenVSwitchDriver +zun.vif_translators = + ovs = zun.network.os_vif_util:neutron_to_osvif_vif_ovs + [extras] osprofiler = osprofiler>=1.4.0 # Apache-2.0 diff --git a/zun/common/consts.py b/zun/common/consts.py index 115da3bbd..7f3299c3d 100644 --- a/zun/common/consts.py +++ b/zun/common/consts.py @@ -77,6 +77,7 @@ CUSTOM_TRAITS = ( # neutron related constants BINDING_PROFILE = 'binding:profile' BINDING_HOST_ID = 'binding:host_id' +DEVICE_OWNER_ZUN = 'compute:zun' # CNI constants CNI_EXCEPTION_CODE = 100 diff --git a/zun/compute/compute_node_tracker.py b/zun/compute/compute_node_tracker.py index c393ce4c2..7a7b55e6a 100644 --- a/zun/compute/compute_node_tracker.py +++ b/zun/compute/compute_node_tracker.py @@ -443,9 +443,10 @@ class ComputeNodeTracker(object): # Grab all containers assigned to this node: containers = objects.Container.list_by_host(context, self.host) + capsules = objects.Capsule.list_by_host(context, self.host) # Now calculate usage based on container utilization: - self._update_usage_from_containers(context, containers) + self._update_usage_from_containers(context, containers + capsules) # No migration for docker, is there will be orphan container? Nova has. diff --git a/zun/conf/container_driver.py b/zun/conf/container_driver.py index 1366fce0c..01306439b 100644 --- a/zun/conf/container_driver.py +++ b/zun/conf/container_driver.py @@ -35,6 +35,7 @@ Interdependencies to other options: Possible values: * ``docker`` +* ``cri`` Services which consume this: diff --git a/zun/container/cri/__init__.py b/zun/container/cri/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zun/container/cri/driver.py b/zun/container/cri/driver.py new file mode 100644 index 000000000..ab9b1beec --- /dev/null +++ b/zun/container/cri/driver.py @@ -0,0 +1,299 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from oslo_log import log as logging +import tenacity + +from zun.common import consts +from zun.common import context as zun_context +from zun.common import exception +from zun.common.i18n import _ +from zun.common import utils +import zun.conf +from zun.container import driver +from zun.criapi import api_pb2 +from zun.criapi import api_pb2_grpc +from zun.network import neutron +from zun.network import os_vif_util +from zun import objects + + +CONF = zun.conf.CONF +LOG = logging.getLogger(__name__) + + +class CriDriver(driver.BaseDriver, driver.CapsuleDriver): + """Implementation of container drivers for CRI runtime.""" + + # TODO(hongbin): define a list of capabilities of this driver. + capabilities = {} + + def __init__(self): + super(CriDriver, self).__init__() + channel = grpc.insecure_channel( + 'unix:///run/containerd/containerd.sock') + self.runtime_stub = api_pb2_grpc.RuntimeServiceStub(channel) + self.image_stub = api_pb2_grpc.ImageServiceStub(channel) + + def create_capsule(self, context, capsule, image, requested_networks, + requested_volumes): + + self._create_pod_sandbox(context, capsule, requested_networks) + + # TODO(hongbin): handle init containers + for container in capsule.init_containers: + self._create_container(context, capsule, container, + requested_networks, + requested_volumes) + self._wait_for_init_container(context, container) + + for container in capsule.containers: + self._create_container(context, capsule, container, + requested_networks, + requested_volumes) + + capsule.status = consts.RUNNING + return capsule + + def _create_pod_sandbox(self, context, capsule, requested_networks): + sandbox_config = self._get_sandbox_config(capsule) + + self._write_cni_metadata(context, capsule, requested_networks) + sandbox_resp = self.runtime_stub.RunPodSandbox( + api_pb2.RunPodSandboxRequest(config=sandbox_config)) + LOG.debug("podsandbox is created: %s", sandbox_resp) + capsule.container_id = sandbox_resp.pod_sandbox_id + + def _get_sandbox_config(self, capsule): + return api_pb2.PodSandboxConfig( + metadata=api_pb2.PodSandboxMetadata( + name=capsule.uuid, namespace="default", uid=capsule.uuid + ) + ) + + def _write_cni_metadata(self, context, capsule, requested_networks): + neutron_api = neutron.NeutronAPI(context) + security_group_ids = utils.get_security_group_ids( + context, capsule.security_groups) + # TODO(hongbin): handle multiple nics + requested_network = requested_networks[0] + network_id = requested_network['network'] + addresses, port = neutron_api.create_or_update_port( + capsule, network_id, requested_network, consts.DEVICE_OWNER_ZUN, + security_group_ids, set_binding_host=True) + capsule.addresses = {network_id: addresses} + + neutron_api = neutron.NeutronAPI(zun_context.get_admin_context()) + network = neutron_api.show_network(port['network_id'])['network'] + subnets = {} + for fixed_ip in port['fixed_ips']: + subnet_id = fixed_ip['subnet_id'] + subnets[subnet_id] = neutron_api.show_subnet(subnet_id)['subnet'] + vif_plugin = port.get('binding:vif_type') + vif_obj = os_vif_util.neutron_to_osvif_vif(vif_plugin, port, network, + subnets) + state = objects.vif.VIFState(default_vif=vif_obj) + state_dict = state.obj_to_primitive() + capsule.cni_metadata = {consts.CNI_METADATA_VIF: state_dict} + capsule.save(context) + + def _create_container(self, context, capsule, container, + requested_networks, requested_volumes): + # pull image + self._pull_image(context, container) + + sandbox_config = self._get_sandbox_config(capsule) + container_config = self._get_container_config(context, container, + requested_volumes) + response = self.runtime_stub.CreateContainer( + api_pb2.CreateContainerRequest( + pod_sandbox_id=capsule.container_id, + config=container_config, + sandbox_config=sandbox_config, + ) + ) + + LOG.debug("container is created: %s", response) + container.container_id = response.container_id + container.save(context) + + response = self.runtime_stub.StartContainer( + api_pb2.StartContainerRequest( + container_id=container.container_id + ) + ) + LOG.debug("container is started: %s", response) + + def _get_container_config(self, context, container, requested_volumes): + args = [] + if container.command: + args = [str(c) for c in container.command] + envs = [] + if container.environment: + envs = [api_pb2.KeyValue(key=str(k), value=str(v)) + for k, v in container.environment.items()] + mounts = [] + if container.uuid in requested_volumes: + req_volume = requested_volumes[container.uuid] + mounts = self._get_mounts(context, req_volume) + working_dir = container.workdir or "" + labels = container.labels or [] + + cpu = 0 + if container.cpu is not None: + cpu = int(1024 * container.cpu) + memory = 0 + if container.memory is not None: + memory = int(container.memory) * 1024 * 1024 + linux_config = api_pb2.LinuxContainerConfig( + security_context=api_pb2.LinuxContainerSecurityContext( + privileged=container.privileged + ), + resources={ + 'cpu_shares': cpu, + 'memory_limit_in_bytes': memory, + } + ) + + # TODO(hongbin): add support for entrypoint + return api_pb2.ContainerConfig( + metadata=api_pb2.ContainerMetadata(name=container.name), + image=api_pb2.ImageSpec(image=container.image), + tty=container.tty, + stdin=container.interactive, + args=args, + envs=envs, + working_dir=working_dir, + labels=labels, + mounts=mounts, + linux=linux_config, + ) + + def _pull_image(self, context, container): + # TODO(hongbin): add support for private registry + response = self.image_stub.PullImage( + api_pb2.PullImageRequest( + image=api_pb2.ImageSpec(image=container.image) + ) + ) + LOG.debug("image is pulled: %s", response) + + def _get_mounts(self, context, volmaps): + mounts = [] + for volume in volmaps: + volume_driver = self._get_volume_driver(volume) + source, destination = volume_driver.bind_mount(context, volume) + mounts.append(api_pb2.Mount(container_path=destination, + host_path=source)) + return mounts + + def _wait_for_init_container(self, context, container, timeout=3600): + def retry_if_result_is_false(result): + return result is False + + def check_init_container_stopped(): + status = self._show_container(context, container).status + if status == consts.STOPPED: + return True + elif status == consts.RUNNING: + return False + else: + raise exception.ZunException( + _("Container has unexpected status: %s") % status) + + r = tenacity.Retrying( + stop=tenacity.stop_after_delay(timeout), + wait=tenacity.wait_exponential(), + retry=tenacity.retry_if_result(retry_if_result_is_false)) + r.call(check_init_container_stopped) + + def _show_container(self, context, container): + container_id = container.container_id + if not container_id: + return + + response = self.runtime_stub.ListContainers( + api_pb2.ListContainersRequest( + filter={'id': container_id} + ) + ) + if not response.containers: + raise exception.ZunException( + "Container %s is not found in runtime", container_id) + + container_response = response.containers[0] + self._populate_container(container, container_response) + return container + + def _populate_container(self, container, response): + self._populate_container_state(container, response) + + def _populate_container_state(self, container, response): + state = response.state + if state == api_pb2.ContainerState.CONTAINER_CREATED: + container.status = consts.CREATED + elif state == api_pb2.ContainerState.CONTAINER_RUNNING: + container.status = consts.RUNNING + elif state == api_pb2.ContainerState.CONTAINER_EXITED: + container.status = consts.STOPPED + elif state == api_pb2.ContainerState.CONTAINER_UNKNOWN: + LOG.debug('State is unknown, status: %s', state) + container.status = consts.UNKNOWN + else: + LOG.warning('Receive unexpected state from CRI runtime: %s', state) + container.status = consts.UNKNOWN + container.status_reason = "container state unknown" + + def delete_capsule(self, context, capsule, force): + pod_id = capsule.container_id + if not pod_id: + return + + try: + response = self.runtime_stub.StopPodSandbox( + api_pb2.StopPodSandboxRequest( + pod_sandbox_id=capsule.container_id, + ) + ) + LOG.debug("podsandbox is stopped: %s", response) + response = self.runtime_stub.RemovePodSandbox( + api_pb2.RemovePodSandboxRequest( + pod_sandbox_id=capsule.container_id, + ) + ) + LOG.debug("podsandbox is removed: %s", response) + except exception.CommandError as e: + if 'error occurred when try to find sandbox' in str(e): + LOG.error("cannot find pod sandbox in runtime") + pass + else: + raise + + self._delete_neutron_ports(context, capsule) + + def _delete_neutron_ports(self, context, capsule): + if not capsule.addresses: + return + + neutron_ports = set() + all_ports = set() + for net_uuid, addrs_list in capsule.addresses.items(): + for addr in addrs_list: + all_ports.add(addr['port']) + if not addr['preserve_on_delete']: + port_id = addr['port'] + neutron_ports.add(port_id) + + neutron_api = neutron.NeutronAPI(context) + neutron_api.delete_or_unbind_ports(all_ports, neutron_ports) diff --git a/zun/network/os_vif_util.py b/zun/network/os_vif_util.py new file mode 100644 index 000000000..76f67428e --- /dev/null +++ b/zun/network/os_vif_util.py @@ -0,0 +1,243 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from kuryr.lib.binding.drivers import utils as kl_utils +from kuryr.lib import constants as kl_const +from os_vif.objects import fixed_ip as osv_fixed_ip +from os_vif.objects import network as osv_network +from os_vif.objects import route as osv_route +from os_vif.objects import subnet as osv_subnet +from os_vif.objects import vif as osv_vif +from oslo_config import cfg as oslo_cfg +from stevedore import driver as stv_driver + +from zun.common import exception +from zun.common import utils + + +# REVISIT(ivc): consider making this module part of kuryr-lib +_VIF_TRANSLATOR_NAMESPACE = "zun.vif_translators" +_VIF_MANAGERS = {} + + +def _neutron_to_osvif_network(os_network): + """Converts Neutron network to os-vif Subnet. + + :param os_network: openstack.network.v2.netwrork.Network object. + :return: an os-vif Network object + """ + + obj = osv_network.Network(id=os_network['id']) + + if os_network['name'] is not None: + obj.label = os_network['name'] + + if os_network['mtu'] is not None: + obj.mtu = os_network['mtu'] + + # Vlan information will be used later in Sriov binding driver + if os_network['provider:network_type'] == 'vlan': + obj.should_provide_vlan = True + obj.vlan = os_network['provider:segmentation_id'] + + return obj + + +def _neutron_to_osvif_subnet(os_subnet): + """Converts Neutron subnet to os-vif Subnet. + + :param os_subnet: openstack.network.v2.subnet.Subnet object + :return: an os-vif Subnet object + """ + + obj = osv_subnet.Subnet( + cidr=os_subnet['cidr'], + dns=os_subnet['dns_nameservers'], + routes=_neutron_to_osvif_routes(os_subnet['host_routes'])) + + if os_subnet['gateway_ip'] is not None: + obj.gateway = os_subnet['gateway_ip'] + + return obj + + +def _neutron_to_osvif_routes(neutron_routes): + """Converts Neutron host_routes to os-vif RouteList. + + :param neutron_routes: list of routes as returned by neutron client's + 'show_subnet' in 'host_routes' attribute + :return: an os-vif RouteList object + """ + + # NOTE(gryf): Nested attributes for OpenStackSDK objects are simple types, + # like dicts and lists, that's why neutron_routes is a list of dicts. + obj_list = [osv_route.Route(cidr=route['destination'], + gateway=route['nexthop']) + for route in neutron_routes] + + return osv_route.RouteList(objects=obj_list) + + +def _make_vif_subnet(subnets, subnet_id): + """Makes a copy of an os-vif Subnet from subnets mapping. + + :param subnets: subnet mapping + :param subnet_id: ID of the subnet to extract from 'subnets' mapping + :return: a copy of an os-vif Subnet object matching 'subnet_id' + """ + subnet = subnets[subnet_id] + + vif_subnet = _neutron_to_osvif_subnet(subnet) + vif_subnet.ips = osv_fixed_ip.FixedIPList(objects=[]) + return vif_subnet + + +def _make_vif_subnets(neutron_port, subnets): + """Gets a list of os-vif Subnet objects for port. + + :param neutron_port: dict containing port information as returned by + neutron client's 'show_port' + :param subnets: subnet mapping + :return: list of os-vif Subnet object + """ + + vif_subnets = {} + + for neutron_fixed_ip in neutron_port.get('fixed_ips', []): + subnet_id = neutron_fixed_ip['subnet_id'] + ip_address = neutron_fixed_ip['ip_address'] + + if subnet_id not in subnets: + continue + + try: + subnet = vif_subnets[subnet_id] + except KeyError: + subnet = _make_vif_subnet(subnets, subnet_id) + vif_subnets[subnet_id] = subnet + + subnet.ips.objects.append(osv_fixed_ip.FixedIP(address=ip_address)) + + if not vif_subnets: + raise exception.ZunException( + "No valid subnets found for port %(port_id)s" + % {'port_id': neutron_port.get('id')}) + + return list(vif_subnets.values()) + + +def _make_vif_network(neutron_port, network, subnets): + """Get an os-vif Network object for port. + + :param neutron_port: dict containing port information as returned by + neutron client's 'show_port' + :param subnets: subnet mapping + :return: os-vif Network object + """ + + vif_network = _neutron_to_osvif_network(network) + vif_network.subnets = osv_subnet.SubnetList( + objects=_make_vif_subnets(neutron_port, subnets)) + + return vif_network + + +def _get_vif_name(neutron_port): + """Gets a VIF device name for port. + + :param neutron_port: dict containing port information as returned by + neutron client's 'show_port' + """ + + vif_name, _ = kl_utils.get_veth_pair_names(neutron_port['id']) + return vif_name + + +def _get_ovs_hybrid_bridge_name(neutron_port): + """Gets a name of the Linux bridge name for hybrid OpenVSwitch port. + + :param neutron_port: dict containing port information as returned by + neutron client's 'show_port' + """ + return ('qbr' + neutron_port['id'])[:kl_const.NIC_NAME_LEN] + + +def neutron_to_osvif_vif_ovs(vif_plugin, neutron_port, network, subnets): + """Converts Neutron port to VIF object for os-vif 'ovs' plugin. + + :param vif_plugin: name of the os-vif plugin to use (i.e. 'ovs') + :param neutron_port: dict containing port information as returned by + neutron client's 'show_port' + :param subnets: subnet mapping + :return: os-vif VIF object + """ + + profile = osv_vif.VIFPortProfileOpenVSwitch( + interface_id=neutron_port['id']) + + details = neutron_port.get('binding:vif_details', {}) + ovs_bridge = details.get('bridge_name', + oslo_cfg.CONF.neutron.ovs_bridge) + if not ovs_bridge: + raise oslo_cfg.RequiredOptError('ovs_bridge', 'neutron_defaults') + + network = _make_vif_network(neutron_port, network, subnets) + network.bridge = ovs_bridge + + if details.get('ovs_hybrid_plug'): + vif = osv_vif.VIFBridge( + id=neutron_port['id'], + address=neutron_port['mac_address'], + network=network, + has_traffic_filtering=details.get('port_filter', False), + preserve_on_delete=False, + active=utils.is_port_active(neutron_port), + port_profile=profile, + plugin=vif_plugin, + vif_name=_get_vif_name(neutron_port), + bridge_name=_get_ovs_hybrid_bridge_name(neutron_port)) + else: + vif = osv_vif.VIFOpenVSwitch( + id=neutron_port['id'], + address=neutron_port['mac_address'], + network=network, + has_traffic_filtering=details.get('port_filter', False), + preserve_on_delete=False, + active=utils.is_port_active(neutron_port), + port_profile=profile, + plugin=vif_plugin, + vif_name=_get_vif_name(neutron_port), + bridge_name=network.bridge) + + return vif + + +def neutron_to_osvif_vif(vif_translator, neutron_port, network, subnets): + """Converts Neutron port to os-vif VIF object. + + :param vif_translator: name of the traslator for the os-vif plugin to use + :param neutron_port: dict containing port information as returned by + neutron client + :param subnets: subnet mapping + :return: os-vif VIF object + """ + + try: + mgr = _VIF_MANAGERS[vif_translator] + except KeyError: + mgr = stv_driver.DriverManager( + namespace=_VIF_TRANSLATOR_NAMESPACE, + name=vif_translator, invoke_on_load=False) + _VIF_MANAGERS[vif_translator] = mgr + + return mgr.driver(vif_translator, neutron_port, network, subnets)