Introduce a CRI driver

Implements: blueprint add-support-cri-runtime
Change-Id: Ia24e3aafd72ca6250c5c61f62352beb569aa7423
This commit is contained in:
Hongbin Lu 2020-01-04 23:40:28 +00:00
parent 63735680c3
commit 6ebf9b65a7
7 changed files with 550 additions and 1 deletions

View File

@ -74,6 +74,7 @@ zun.container.driver =
zun.capsule.driver =
docker = zun.container.docker.driver:DockerDriver
cri = zun.container.cri.driver:CriDriver
fake = zun.tests.unit.container.fake_driver:FakeDriver
zun.image.driver =
@ -91,6 +92,9 @@ zun.cni.binding =
VIFBridge = zun.cni.binding.bridge:BridgeDriver
VIFOpenVSwitch = zun.cni.binding.bridge:VIFOpenVSwitchDriver
zun.vif_translators =
ovs = zun.network.os_vif_util:neutron_to_osvif_vif_ovs
[extras]
osprofiler =
osprofiler>=1.4.0 # Apache-2.0

View File

@ -77,6 +77,7 @@ CUSTOM_TRAITS = (
# neutron related constants
BINDING_PROFILE = 'binding:profile'
BINDING_HOST_ID = 'binding:host_id'
DEVICE_OWNER_ZUN = 'compute:zun'
# CNI constants
CNI_EXCEPTION_CODE = 100

View File

@ -443,9 +443,10 @@ class ComputeNodeTracker(object):
# Grab all containers assigned to this node:
containers = objects.Container.list_by_host(context, self.host)
capsules = objects.Capsule.list_by_host(context, self.host)
# Now calculate usage based on container utilization:
self._update_usage_from_containers(context, containers)
self._update_usage_from_containers(context, containers + capsules)
# There is no migration for docker, but can there still be orphan containers? Nova has them.

View File

@ -35,6 +35,7 @@ Interdependencies to other options:
Possible values:
* ``docker``
* ``cri``
Services which consume this:

View File

299
zun/container/cri/driver.py Normal file
View File

@ -0,0 +1,299 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
from oslo_log import log as logging
import tenacity
from zun.common import consts
from zun.common import context as zun_context
from zun.common import exception
from zun.common.i18n import _
from zun.common import utils
import zun.conf
from zun.container import driver
from zun.criapi import api_pb2
from zun.criapi import api_pb2_grpc
from zun.network import neutron
from zun.network import os_vif_util
from zun import objects
# Configuration object exposed by the zun.conf package.
CONF = zun.conf.CONF
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class CriDriver(driver.BaseDriver, driver.CapsuleDriver):
    """Implementation of container drivers for CRI runtime.

    A capsule is realized as a CRI pod sandbox and each container of the
    capsule is created inside that sandbox.  All runtime operations go
    through gRPC stubs (``RuntimeService`` / ``ImageService``) bound to the
    containerd unix socket.
    """

    # TODO(hongbin): define a list of capabilities of this driver.
    capabilities = {}

    def __init__(self):
        """Open the gRPC channel to containerd and build the CRI stubs."""
        super(CriDriver, self).__init__()
        channel = grpc.insecure_channel(
            'unix:///run/containerd/containerd.sock')
        self.runtime_stub = api_pb2_grpc.RuntimeServiceStub(channel)
        self.image_stub = api_pb2_grpc.ImageServiceStub(channel)

    def create_capsule(self, context, capsule, image, requested_networks,
                       requested_volumes):
        """Create the pod sandbox of a capsule and start its containers.

        Init containers are created one at a time and each one must reach
        the stopped state before the next container is created.  Regular
        containers are then created and started without waiting.

        :param context: request context
        :param capsule: capsule object to realize in the runtime
        :param image: not used by this driver
        :param requested_networks: list of requested networks; only the
                                   first entry is used
        :param requested_volumes: mapping of container uuid to the volumes
                                  requested for that container
        :return: the capsule, with its status set to RUNNING
        """
        self._create_pod_sandbox(context, capsule, requested_networks)

        # TODO(hongbin): handle init containers
        for container in capsule.init_containers:
            self._create_container(context, capsule, container,
                                   requested_networks,
                                   requested_volumes)
            self._wait_for_init_container(context, container)

        for container in capsule.containers:
            self._create_container(context, capsule, container,
                                   requested_networks,
                                   requested_volumes)

        capsule.status = consts.RUNNING
        return capsule

    def _create_pod_sandbox(self, context, capsule, requested_networks):
        """Run a pod sandbox for the capsule and record its id.

        The CNI metadata is written (and the capsule saved) before the
        sandbox is requested from the runtime.
        """
        sandbox_config = self._get_sandbox_config(capsule)
        self._write_cni_metadata(context, capsule, requested_networks)
        sandbox_resp = self.runtime_stub.RunPodSandbox(
            api_pb2.RunPodSandboxRequest(config=sandbox_config))
        LOG.debug("podsandbox is created: %s", sandbox_resp)
        capsule.container_id = sandbox_resp.pod_sandbox_id

    def _get_sandbox_config(self, capsule):
        """Build the PodSandboxConfig that identifies the capsule.

        The capsule uuid is used both as the sandbox name and as its uid.
        """
        return api_pb2.PodSandboxConfig(
            metadata=api_pb2.PodSandboxMetadata(
                name=capsule.uuid, namespace="default", uid=capsule.uuid
            )
        )

    def _write_cni_metadata(self, context, capsule, requested_networks):
        """Create the Neutron port and persist its VIF as CNI metadata.

        :param context: request context, used to create the port and to
                        save the capsule
        :param capsule: capsule whose ``addresses`` and ``cni_metadata``
                        are updated and saved
        :param requested_networks: list of requested networks; only the
                                   first entry is used
        """
        neutron_api = neutron.NeutronAPI(context)
        security_group_ids = utils.get_security_group_ids(
            context, capsule.security_groups)
        # TODO(hongbin): handle multiple nics
        requested_network = requested_networks[0]
        network_id = requested_network['network']
        addresses, port = neutron_api.create_or_update_port(
            capsule, network_id, requested_network, consts.DEVICE_OWNER_ZUN,
            security_group_ids, set_binding_host=True)
        capsule.addresses = {network_id: addresses}

        # Network and subnet details are looked up with an admin context
        # rather than with the caller's context.
        neutron_api = neutron.NeutronAPI(zun_context.get_admin_context())
        network = neutron_api.show_network(port['network_id'])['network']
        subnets = {}
        for fixed_ip in port['fixed_ips']:
            subnet_id = fixed_ip['subnet_id']
            subnets[subnet_id] = neutron_api.show_subnet(subnet_id)['subnet']

        # The port's VIF type selects the translator that builds the
        # os-vif object.
        vif_plugin = port.get('binding:vif_type')
        vif_obj = os_vif_util.neutron_to_osvif_vif(vif_plugin, port, network,
                                                   subnets)
        state = objects.vif.VIFState(default_vif=vif_obj)
        state_dict = state.obj_to_primitive()
        capsule.cni_metadata = {consts.CNI_METADATA_VIF: state_dict}
        capsule.save(context)

    def _create_container(self, context, capsule, container,
                          requested_networks, requested_volumes):
        """Pull the image, then create and start one container in the pod.

        The runtime-assigned id is stored on the container and saved
        before the container is started.
        """
        # pull image
        self._pull_image(context, container)

        sandbox_config = self._get_sandbox_config(capsule)
        container_config = self._get_container_config(context, container,
                                                      requested_volumes)
        response = self.runtime_stub.CreateContainer(
            api_pb2.CreateContainerRequest(
                pod_sandbox_id=capsule.container_id,
                config=container_config,
                sandbox_config=sandbox_config,
            )
        )
        LOG.debug("container is created: %s", response)
        container.container_id = response.container_id
        container.save(context)

        response = self.runtime_stub.StartContainer(
            api_pb2.StartContainerRequest(
                container_id=container.container_id
            )
        )
        LOG.debug("container is started: %s", response)

    def _get_container_config(self, context, container, requested_volumes):
        """Translate a container object into a CRI ContainerConfig.

        :param context: request context, forwarded to the volume drivers
        :param container: container object to translate
        :param requested_volumes: mapping of container uuid to the volumes
                                  requested for that container
        :return: an ``api_pb2.ContainerConfig`` message
        """
        args = []
        if container.command:
            args = [str(c) for c in container.command]

        envs = []
        if container.environment:
            envs = [api_pb2.KeyValue(key=str(k), value=str(v))
                    for k, v in container.environment.items()]

        mounts = []
        if container.uuid in requested_volumes:
            req_volume = requested_volumes[container.uuid]
            mounts = self._get_mounts(context, req_volume)

        working_dir = container.workdir or ""
        # Labels form a string-to-string map, so the empty default must be
        # a mapping rather than a list.
        labels = container.labels or {}

        # The CPU count is scaled by 1024 to obtain CPU shares; 0 is sent
        # when no CPU value is set.
        cpu = 0
        if container.cpu is not None:
            cpu = int(1024 * container.cpu)
        # The memory value is multiplied by 1024 * 1024 to obtain bytes;
        # 0 is sent when no memory value is set.
        memory = 0
        if container.memory is not None:
            memory = int(container.memory) * 1024 * 1024
        linux_config = api_pb2.LinuxContainerConfig(
            security_context=api_pb2.LinuxContainerSecurityContext(
                privileged=container.privileged
            ),
            resources={
                'cpu_shares': cpu,
                'memory_limit_in_bytes': memory,
            }
        )

        # TODO(hongbin): add support for entrypoint
        return api_pb2.ContainerConfig(
            metadata=api_pb2.ContainerMetadata(name=container.name),
            image=api_pb2.ImageSpec(image=container.image),
            tty=container.tty,
            stdin=container.interactive,
            args=args,
            envs=envs,
            working_dir=working_dir,
            labels=labels,
            mounts=mounts,
            linux=linux_config,
        )

    def _pull_image(self, context, container):
        """Ask the runtime to pull the image referenced by the container."""
        # TODO(hongbin): add support for private registry
        response = self.image_stub.PullImage(
            api_pb2.PullImageRequest(
                image=api_pb2.ImageSpec(image=container.image)
            )
        )
        LOG.debug("image is pulled: %s", response)

    def _get_mounts(self, context, volmaps):
        """Bind-mount every volume and return the matching CRI mounts.

        :param context: request context
        :param volmaps: iterable of volume mappings to mount
        :return: list of ``api_pb2.Mount`` messages
        """
        mounts = []
        for volume in volmaps:
            volume_driver = self._get_volume_driver(volume)
            source, destination = volume_driver.bind_mount(context, volume)
            mounts.append(api_pb2.Mount(container_path=destination,
                                        host_path=source))
        return mounts

    def _wait_for_init_container(self, context, container, timeout=3600):
        """Poll an init container until it is reported as stopped.

        :param context: request context
        :param container: init container to wait for
        :param timeout: maximum number of seconds to keep polling
        :raises ZunException: if the container is found in a state other
                              than RUNNING or STOPPED
        """
        def retry_if_result_is_false(result):
            return result is False

        def check_init_container_stopped():
            status = self._show_container(context, container).status
            if status == consts.STOPPED:
                return True
            elif status == consts.RUNNING:
                return False
            else:
                raise exception.ZunException(
                    _("Container has unexpected status: %s") % status)

        r = tenacity.Retrying(
            stop=tenacity.stop_after_delay(timeout),
            wait=tenacity.wait_exponential(),
            retry=tenacity.retry_if_result(retry_if_result_is_false))
        r.call(check_init_container_stopped)

    def _show_container(self, context, container):
        """Refresh the container status from the runtime.

        :param context: request context (unused)
        :param container: container object to refresh
        :return: the refreshed container, or None when the container has
                 no runtime id yet
        :raises ZunException: if the runtime does not know the container
        """
        container_id = container.container_id
        if not container_id:
            return

        response = self.runtime_stub.ListContainers(
            api_pb2.ListContainersRequest(
                filter={'id': container_id}
            )
        )
        if not response.containers:
            # The id has to be interpolated into the message here:
            # ZunException takes one positional message, so handing it the
            # id as a second positional argument is not valid.
            raise exception.ZunException(
                _("Container %s is not found in runtime") % container_id)

        container_response = response.containers[0]
        self._populate_container(container, container_response)
        return container

    def _populate_container(self, container, response):
        """Copy runtime-reported information onto the container object."""
        self._populate_container_state(container, response)

    def _populate_container_state(self, container, response):
        """Map the CRI container state onto the container status."""
        state = response.state
        if state == api_pb2.ContainerState.CONTAINER_CREATED:
            container.status = consts.CREATED
        elif state == api_pb2.ContainerState.CONTAINER_RUNNING:
            container.status = consts.RUNNING
        elif state == api_pb2.ContainerState.CONTAINER_EXITED:
            container.status = consts.STOPPED
        elif state == api_pb2.ContainerState.CONTAINER_UNKNOWN:
            LOG.debug('State is unknown, status: %s', state)
            container.status = consts.UNKNOWN
        else:
            LOG.warning('Receive unexpected state from CRI runtime: %s', state)
            container.status = consts.UNKNOWN
            container.status_reason = "container state unknown"

    def delete_capsule(self, context, capsule, force):
        """Stop and remove the pod sandbox, then release its ports.

        Nothing is done when the capsule has no sandbox id.  A sandbox
        that the runtime can no longer find is tolerated so that the
        Neutron ports are still cleaned up.

        :param context: request context
        :param capsule: capsule to delete
        :param force: not used by this driver
        """
        pod_id = capsule.container_id
        if not pod_id:
            return

        try:
            response = self.runtime_stub.StopPodSandbox(
                api_pb2.StopPodSandboxRequest(
                    pod_sandbox_id=capsule.container_id,
                )
            )
            LOG.debug("podsandbox is stopped: %s", response)
            response = self.runtime_stub.RemovePodSandbox(
                api_pb2.RemovePodSandboxRequest(
                    pod_sandbox_id=capsule.container_id,
                )
            )
            LOG.debug("podsandbox is removed: %s", response)
        except (exception.CommandError, grpc.RpcError) as e:
            # The stubs are gRPC stubs and report failures as
            # grpc.RpcError, so that type has to be caught for the
            # "sandbox already gone" case to be tolerated.
            if 'error occurred when try to find sandbox' not in str(e):
                raise
            LOG.error("cannot find pod sandbox in runtime")

        self._delete_neutron_ports(context, capsule)

    def _delete_neutron_ports(self, context, capsule):
        """Delete or unbind the Neutron ports recorded on the capsule.

        Every port is passed to Neutron; ports whose address entry is not
        flagged ``preserve_on_delete`` are additionally listed as ports
        owned by the capsule.
        """
        if not capsule.addresses:
            return

        neutron_ports = set()
        all_ports = set()
        for addrs_list in capsule.addresses.values():
            for addr in addrs_list:
                all_ports.add(addr['port'])
                if not addr['preserve_on_delete']:
                    port_id = addr['port']
                    neutron_ports.add(port_id)

        neutron_api = neutron.NeutronAPI(context)
        neutron_api.delete_or_unbind_ports(all_ports, neutron_ports)

243
zun/network/os_vif_util.py Normal file
View File

@ -0,0 +1,243 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr.lib.binding.drivers import utils as kl_utils
from kuryr.lib import constants as kl_const
from os_vif.objects import fixed_ip as osv_fixed_ip
from os_vif.objects import network as osv_network
from os_vif.objects import route as osv_route
from os_vif.objects import subnet as osv_subnet
from os_vif.objects import vif as osv_vif
from oslo_config import cfg as oslo_cfg
from stevedore import driver as stv_driver
from zun.common import exception
from zun.common import utils
# REVISIT(ivc): consider making this module part of kuryr-lib
# Stevedore namespace from which the VIF translator functions are loaded.
_VIF_TRANSLATOR_NAMESPACE = "zun.vif_translators"
# Cache of stevedore driver managers, keyed by translator name.
_VIF_MANAGERS = {}
def _neutron_to_osvif_network(os_network):
    """Converts Neutron network to os-vif Network.

    Optional attributes that are missing from the Neutron network (or set
    to None) are simply left unset on the resulting object.

    :param os_network: dict describing the Neutron network; 'id' is
                       required, 'name', 'mtu', 'provider:network_type'
                       and 'provider:segmentation_id' are optional
    :return: an os-vif Network object
    """
    obj = osv_network.Network(id=os_network['id'])

    name = os_network.get('name')
    if name is not None:
        obj.label = name

    mtu = os_network.get('mtu')
    if mtu is not None:
        obj.mtu = mtu

    # Vlan information will be used later in Sriov binding driver
    # NOTE: provider attributes may be absent from the network dict, so
    # the network type is read with .get() rather than indexed directly.
    if os_network.get('provider:network_type') == 'vlan':
        obj.should_provide_vlan = True
        obj.vlan = os_network['provider:segmentation_id']

    return obj
def _neutron_to_osvif_subnet(os_subnet):
    """Build an os-vif Subnet out of a Neutron subnet.

    :param os_subnet: Neutron subnet, indexable by 'cidr',
                      'dns_nameservers', 'host_routes' and 'gateway_ip'
    :return: an os-vif Subnet object; its gateway is only set when the
             Neutron subnet has one
    """
    cidr = os_subnet['cidr']
    nameservers = os_subnet['dns_nameservers']
    routes = _neutron_to_osvif_routes(os_subnet['host_routes'])
    vif_subnet = osv_subnet.Subnet(cidr=cidr, dns=nameservers, routes=routes)

    gateway_ip = os_subnet['gateway_ip']
    if gateway_ip is not None:
        vif_subnet.gateway = gateway_ip

    return vif_subnet
def _neutron_to_osvif_routes(neutron_routes):
    """Build an os-vif RouteList out of Neutron host routes.

    :param neutron_routes: list of route dicts, each carrying a
                           'destination' and a 'nexthop' entry, as found
                           in the 'host_routes' attribute of a subnet
    :return: an os-vif RouteList object
    """
    # NOTE(gryf): Nested attributes for OpenStackSDK objects are simple types,
    # like dicts and lists, that's why neutron_routes is a list of dicts.
    routes = []
    for entry in neutron_routes:
        routes.append(osv_route.Route(cidr=entry['destination'],
                                      gateway=entry['nexthop']))
    return osv_route.RouteList(objects=routes)
def _make_vif_subnet(subnets, subnet_id):
    """Create a fresh os-vif Subnet for one entry of the subnets mapping.

    :param subnets: mapping of subnet ID to Neutron subnet
    :param subnet_id: ID of the subnet to look up in 'subnets'
    :return: an os-vif Subnet object for 'subnet_id' with an empty
             list of fixed IPs
    """
    vif_subnet = _neutron_to_osvif_subnet(subnets[subnet_id])
    # Start from an empty IP list; callers append the port's addresses.
    vif_subnet.ips = osv_fixed_ip.FixedIPList(objects=[])
    return vif_subnet
def _make_vif_subnets(neutron_port, subnets):
    """Collect the os-vif Subnet objects a port has addresses on.

    Fixed IPs whose subnet is not present in 'subnets' are ignored; all
    addresses on the same subnet end up on one Subnet object.

    :param neutron_port: dict containing port information as returned by
                         neutron client's 'show_port'
    :param subnets: mapping of subnet ID to Neutron subnet
    :return: list of os-vif Subnet objects
    :raises ZunException: if none of the port's fixed IPs belongs to a
                          known subnet
    """
    by_subnet_id = {}

    for fixed_ip in neutron_port.get('fixed_ips', []):
        subnet_id = fixed_ip['subnet_id']
        ip_address = fixed_ip['ip_address']

        if subnet_id in subnets:
            # Build the Subnet lazily, the first time it is seen.
            if subnet_id not in by_subnet_id:
                by_subnet_id[subnet_id] = _make_vif_subnet(subnets,
                                                           subnet_id)
            by_subnet_id[subnet_id].ips.objects.append(
                osv_fixed_ip.FixedIP(address=ip_address))

    if by_subnet_id:
        return list(by_subnet_id.values())

    raise exception.ZunException(
        "No valid subnets found for port %(port_id)s"
        % {'port_id': neutron_port.get('id')})
def _make_vif_network(neutron_port, network, subnets):
    """Build the os-vif Network object a port is attached to.

    :param neutron_port: dict containing port information as returned by
                         neutron client's 'show_port'
    :param network: Neutron network the port belongs to
    :param subnets: mapping of subnet ID to Neutron subnet
    :return: os-vif Network object populated with the port's subnets
    """
    vif_network = _neutron_to_osvif_network(network)
    port_subnets = _make_vif_subnets(neutron_port, subnets)
    vif_network.subnets = osv_subnet.SubnetList(objects=port_subnets)
    return vif_network
def _get_vif_name(neutron_port):
    """Return the VIF device name derived from the port ID.

    :param neutron_port: dict containing port information as returned by
                         neutron client's 'show_port'
    :return: the first of the two names produced by kuryr-lib's
             'get_veth_pair_names' for the port ID
    """
    port_id = neutron_port['id']
    device_name, _peer_name = kl_utils.get_veth_pair_names(port_id)
    return device_name
def _get_ovs_hybrid_bridge_name(neutron_port):
    """Return the Linux bridge name used for a hybrid OpenVSwitch port.

    The name is 'qbr' followed by the port ID, truncated to kuryr-lib's
    NIC_NAME_LEN characters.

    :param neutron_port: dict containing port information as returned by
                         neutron client's 'show_port'
    """
    full_name = 'qbr' + neutron_port['id']
    return full_name[:kl_const.NIC_NAME_LEN]
def neutron_to_osvif_vif_ovs(vif_plugin, neutron_port, network, subnets):
    """Converts Neutron port to VIF object for os-vif 'ovs' plugin.

    A VIFBridge is produced when the port's binding details request
    hybrid plugging ('ovs_hybrid_plug'); otherwise a VIFOpenVSwitch is
    produced.

    :param vif_plugin: name of the os-vif plugin to use (i.e. 'ovs')
    :param neutron_port: dict containing port information as returned by
                         neutron client's 'show_port'
    :param network: Neutron network the port belongs to
    :param subnets: subnet mapping
    :return: os-vif VIF object
    :raises oslo_config.cfg.RequiredOptError: if neither the port binding
        details nor the '[neutron] ovs_bridge' option name a bridge
    """
    profile = osv_vif.VIFPortProfileOpenVSwitch(
        interface_id=neutron_port['id'])

    # 'binding:vif_details' may be present but set to None, so fall back
    # to an empty dict in that case as well.
    details = neutron_port.get('binding:vif_details') or {}
    ovs_bridge = details.get('bridge_name',
                             oslo_cfg.CONF.neutron.ovs_bridge)
    if not ovs_bridge:
        # The option is read from the [neutron] group, and the exception
        # expects a group object (it reads its name) rather than a string.
        raise oslo_cfg.RequiredOptError('ovs_bridge',
                                        oslo_cfg.OptGroup('neutron'))

    network = _make_vif_network(neutron_port, network, subnets)
    network.bridge = ovs_bridge

    # Attributes shared by both VIF flavours.
    vif_kwargs = {
        'id': neutron_port['id'],
        'address': neutron_port['mac_address'],
        'network': network,
        'has_traffic_filtering': details.get('port_filter', False),
        'preserve_on_delete': False,
        'active': utils.is_port_active(neutron_port),
        'port_profile': profile,
        'plugin': vif_plugin,
        'vif_name': _get_vif_name(neutron_port),
    }

    if details.get('ovs_hybrid_plug'):
        return osv_vif.VIFBridge(
            bridge_name=_get_ovs_hybrid_bridge_name(neutron_port),
            **vif_kwargs)

    return osv_vif.VIFOpenVSwitch(bridge_name=network.bridge, **vif_kwargs)
def neutron_to_osvif_vif(vif_translator, neutron_port, network, subnets):
    """Convert a Neutron port into an os-vif VIF object.

    The translator is loaded through stevedore from the VIF translator
    namespace; its driver manager is cached per translator name.

    :param vif_translator: name of the translator for the os-vif plugin
                           to use
    :param neutron_port: dict containing port information as returned by
                         neutron client
    :param network: Neutron network the port belongs to
    :param subnets: subnet mapping
    :return: os-vif VIF object
    """
    if vif_translator not in _VIF_MANAGERS:
        # First use of this translator: load it and remember the manager.
        _VIF_MANAGERS[vif_translator] = stv_driver.DriverManager(
            namespace=_VIF_TRANSLATOR_NAMESPACE,
            name=vif_translator, invoke_on_load=False)

    translate = _VIF_MANAGERS[vif_translator].driver
    return translate(vif_translator, neutron_port, network, subnets)